[Pvfs2-cvs] commit by sson in pvfs2/src/server: io.sm
pvfs2-server.h read.sm write.sm rw-sm.h
CVS commit program
cvs at parl.clemson.edu
Tue Apr 28 12:17:01 EDT 2009
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv17589/src/server
Modified Files:
Tag: as-branch
io.sm pvfs2-server.h read.sm write.sm
Removed Files:
Tag: as-branch
rw-sm.h
Log Message:
rw-sm.h has been renamed to pipeline.h.
Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.5 -r1.73.6.6
--- io.sm 22 Apr 2009 19:58:42 -0000 1.73.6.5
+++ io.sm 28 Apr 2009 16:17:00 -0000 1.73.6.6
@@ -19,12 +19,12 @@
#include "pint-request.h"
#include "pvfs2-internal.h"
#include "pint-segpool.h"
-#include "rw-sm.h"
+#include "pipeline.h"
extern struct PINT_state_machine_s pvfs2_read_sm;
extern struct PINT_state_machine_s pvfs2_write_sm;
-#define NUM_OF_PARALLEL_RWS 8
+#define NUM_OF_PARALLEL_SMS 8
%%
@@ -160,11 +160,11 @@ static PINT_sm_action start_pipelining_s
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct server_configuration_s *user_opts = get_server_config_struct();
struct filesystem_configuration_s *fs_conf;
- struct PINT_server_op *rwsm_op;
+ struct PINT_server_op *pipeline_op;
int i, ret;
PINT_segpool_handle_t seg_handle;
-
- s_op->u.io.hints = s_op->req->hints;
+
+ s_op->u.io.parallel_sms = 0;
s_op->u.io.total_transferred = 0;
/* we still have the file size stored in the response structure
@@ -194,7 +194,6 @@ static PINT_sm_action start_pipelining_s
s_op->u.io.file_req_offset = s_op->req->u.io.file_req_offset;
s_op->u.io.mem_req = NULL;
s_op->u.io.aggregate_size = s_op->req->u.io.aggregate_size;
- s_op->u.io.tag = s_op->tag;
gossip_debug(GOSSIP_IO_DEBUG, "%s: tag=%d\n", __func__, s_op->tag);
s_op->u.io.user_ptr = NULL;
s_op->u.io.op = s_op->req->u.io.op; /* AS: operation */
@@ -226,26 +225,6 @@ static PINT_sm_action start_pipelining_s
lld(s_op->u.io.aggregate_size),
llu(s_op->req->u.io.handle));
- /* FIXME: loop bodies are same, need to remove if_else??? */
- if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
- {
- s_op->u.io.address = s_op->addr;
- s_op->u.io.handle = s_op->req->u.io.handle;
- s_op->u.io.coll_id = s_op->req->u.io.fs_id;
- }
- else if (s_op->req->u.io.io_type == PVFS_IO_READ)
- {
- s_op->u.io.handle = s_op->req->u.io.handle;
- s_op->u.io.coll_id = s_op->req->u.io.fs_id;
- s_op->u.io.address = s_op->addr;
- }
- else
- {
- gossip_lerr("Server: IO SM: unknown IO type requested.\n");
- js_p->error_code = -PVFS_EINVAL;
- return SM_ACTION_COMPLETE;
- }
-
/* setup the request processing states */
gossip_debug(GOSSIP_IO_DEBUG, "%s: aggregate_size=%ld\n", __func__,
s_op->u.io.aggregate_size);
@@ -255,9 +234,9 @@ static PINT_sm_action start_pipelining_s
if(s_op->u.io.buffer_size < 1)
s_op->u.io.buffer_size = BUFFER_SIZE;
if(s_op->u.io.num_of_buffers < 1)
- s_op->u.io.num_of_buffers = BUFFERS_PER_PIPELINING;
+ s_op->u.io.num_of_buffers = NUM_OF_PARALLEL_SMS;
- PINT_segpool_unit_id id;
+ PINT_segpool_unit_id id[s_op->u.io.num_of_buffers];
PINT_dist_lookup(s_op->u.io.file_data.dist);
ret = PINT_segpool_init(s_op->u.io.mem_req,
@@ -273,24 +252,53 @@ static PINT_sm_action start_pipelining_s
&seg_handle);
s_op->u.io.seg_handle = seg_handle;
+ gen_mutex_init(&s_op->u.io.mutex);
for(i=0; i<s_op->u.io.num_of_buffers; i++) {
- PINT_segpool_register(seg_handle, &id);
- rwsm_op = malloc(sizeof(*rwsm_op));
- memset(rwsm_op, 0, sizeof(*rwsm_op));
- rwsm_op->u.rwsm.seg_handle = seg_handle;
- rwsm_op->u.rwsm.id = id;
- rwsm_op->u.rwsm.parent = s_op;
+ PINT_segpool_register(seg_handle, &id[i]);
+ pipeline_op = malloc(sizeof(*pipeline_op));
+ memset(pipeline_op, 0, sizeof(*pipeline_op));
+
+ pipeline_op->u.pipeline.address = s_op->addr;
+ pipeline_op->u.pipeline.handle = s_op->req->u.io.handle;
+ pipeline_op->u.pipeline.coll_id = s_op->req->u.io.fs_id;
+
+ pipeline_op->u.pipeline.seg_handle = seg_handle;
+ pipeline_op->u.pipeline.id = id[i];
+ pipeline_op->u.pipeline.parent = s_op;
+ pipeline_op->u.pipeline.hints = s_op->req->hints;
+ pipeline_op->u.pipeline.tag = s_op->tag;
+ pipeline_op->u.pipeline.buffer_size = BUFFER_SIZE;
if(s_op->req->u.io.io_type == PVFS_IO_READ) {
- ret = PINT_sm_push_frame(smcb, DO_READ, rwsm_op);
- s_op->u.io.parallel_read_sms++;
- gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_read_sms=%d\n",
- __func__, s_op->u.io.parallel_read_sms);
+ if(!pipeline_op->u.pipeline.buffer) {
+ /* if the buffer has not been used, allocate a buffer */
+ pipeline_op->u.pipeline.buffer =
+ BMI_memalloc(pipeline_op->u.pipeline.address,
+ pipeline_op->u.pipeline.buffer_size,
+ BMI_SEND);
+ /* TODO: error handling */
+ assert(pipeline_op->u.pipeline.buffer);
+ }
+
+ ret = PINT_sm_push_frame(smcb, DO_READ, pipeline_op);
+ s_op->u.io.parallel_sms++;
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_sms=%d\n",
+ __func__, s_op->u.io.parallel_sms);
}
else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
- ret = PINT_sm_push_frame(smcb, DO_WRITE, rwsm_op);
- s_op->u.io.parallel_write_sms++;
- gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_write_sms=%d\n",
- __func__, s_op->u.io.parallel_write_sms);
+ if(!pipeline_op->u.pipeline.buffer){
+ /* if the buffer has not been used, allocate a buffer */
+ pipeline_op->u.pipeline.buffer =
+ BMI_memalloc(pipeline_op->u.pipeline.address,
+ pipeline_op->u.pipeline.buffer_size,
+ BMI_RECV);
+ /* TODO: error handling */
+ assert(pipeline_op->u.pipeline.buffer);
+ }
+
+ ret = PINT_sm_push_frame(smcb, DO_WRITE, pipeline_op);
+ s_op->u.io.parallel_sms++;
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_sms=%d\n",
+ __func__, s_op->u.io.parallel_sms);
}
if(ret < 0) {
js_p->error_code = -PVFS_ENOMEM;
@@ -353,6 +361,11 @@ static PINT_sm_action io_cleanup(
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
char status_string[64] = {0};
+ int i;
+ struct PINT_server_op *pipeline_op;
+ int task_id;
+ int remaining;
+ PVFS_error tmp_err;
PVFS_strerror_r(s_op->resp.status, status_string, 64);
PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "finish (%s)\n", status_string);
@@ -362,6 +375,28 @@ static PINT_sm_action io_cleanup(
PINT_segpool_destroy(s_op->u.io.seg_handle);
}
+ for(i=0; i<s_op->u.io.parallel_sms; i++)
+ {
+ pipeline_op = PINT_sm_pop_frame(smcb, &task_id, &tmp_err,
+ &remaining);
+ gossip_debug(GOSSIP_SERVER_DEBUG,
+ "io: nested sm returned error code: %d\n", tmp_err);
+ if(pipeline_op->u.pipeline.buffer) {
+ if(s_op->req->u.io.io_type == PVFS_IO_READ) {
+ BMI_memfree(pipeline_op->u.pipeline.address,
+ pipeline_op->u.pipeline.buffer,
+ s_op->u.io.buffer_size, BMI_SEND);
+ }
+ else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
+ BMI_memfree(pipeline_op->u.pipeline.address,
+ pipeline_op->u.pipeline.buffer,
+ s_op->u.io.buffer_size, BMI_RECV);
+ }
+ }
+ free(pipeline_op);
+ }
+
+ gen_mutex_destroy(&s_op->u.io.mutex);
/* let go of our encoded response buffer, if we appear to have
* made one
*/
@@ -398,7 +433,9 @@ static PINT_sm_action io_send_completion
int err = -PVFS_EIO;
job_id_t tmp_id;
struct server_configuration_s *user_opts = get_server_config_struct();
-
+
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: s_op->u.io.parallel_sms=%d\n",
+ __func__, s_op->u.io.parallel_sms);
/* we only send this trailing ack if we are working on a write
* operation; otherwise just cut out early
*/
@@ -406,11 +443,11 @@ static PINT_sm_action io_send_completion
{
/* normal READ (i.e., without op and datatype) */
if(s_op->req->u.io.op == 0) { /* AS */
- gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack()?\n");
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: normal READ\n", __func__);
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
- gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), IO_READ with op\n");
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: IO_READ with op\n", __func__);
}
/* release encoding of the first ack that we sent */
Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.3 -r1.155.6.4
--- pvfs2-server.h 20 Apr 2009 21:10:50 -0000 1.155.6.3
+++ pvfs2-server.h 28 Apr 2009 16:17:00 -0000 1.155.6.4
@@ -333,15 +333,12 @@ struct PINT_server_getconfig_op
struct PINT_server_io_op
{
- //void *parent;
- PVFS_fs_id coll_id;
- PVFS_handle handle;
- PVFS_BMI_addr_t address;
-
+ gen_mutex_t mutex;
+ PINT_segpool_handle_t seg_handle;
PINT_Request *file_req;
PVFS_offset file_req_offset;
PINT_Request *mem_req;
- PVFS_msg_tag_t tag;
+
void *user_ptr;
int op;
@@ -358,26 +355,28 @@ struct PINT_server_io_op
PVFS_size total_transferred;
- int parallel_write_sms;
- int parallel_read_sms;
- PINT_segpool_handle_t seg_handle;
-
- PVFS_hint hints;
+ int parallel_sms;
};
/* substitute for flow */
-struct PINT_server_rwsm_op
+struct PINT_server_pipeline_op
{
+ PVFS_fs_id coll_id;
+ PVFS_handle handle;
+ PVFS_BMI_addr_t address;
+
void *parent;
- void *buffer;
+ char *buffer;
+ PVFS_size buffer_size;
PVFS_size buffer_used;
PVFS_size out_size;
PINT_segpool_handle_t seg_handle;
PINT_segpool_unit_id id;
PVFS_offset *offsets;
- PINT_Request_state *file_req_state;
PVFS_size *sizes;
int segs;
+ PVFS_hint hints;
+ PVFS_msg_tag_t tag;
};
struct PINT_server_small_io_op
@@ -515,7 +514,7 @@ typedef struct PINT_server_op
struct PINT_server_rmdirent_op rmdirent;
struct PINT_server_io_op io;
struct PINT_server_small_io_op small_io;
- struct PINT_server_rwsm_op rwsm;
+ struct PINT_server_pipeline_op pipeline;
struct PINT_server_flush_op flush;
struct PINT_server_truncate_op truncate;
struct PINT_server_mkdir_op mkdir;
Index: read.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/read.sm,v
diff -p -u -r1.1.2.5 -r1.1.2.6
--- read.sm 22 Apr 2009 19:58:42 -0000 1.1.2.5
+++ read.sm 28 Apr 2009 16:17:00 -0000 1.1.2.6
@@ -18,7 +18,7 @@
#include "pint-distribution.h"
#include "pint-request.h"
#include "pvfs2-internal.h"
-#include "rw-sm.h"
+#include "pipeline.h"
#include "trove.h"
static void do_comp(struct PINT_server_op *);
@@ -58,11 +58,11 @@ nested machine pvfs2_read_sm
/*
* PINT_process_request() -> job_trove_bstream_read_list()
*/
-static int trove_read_call(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action trove_read_call(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- PINT_segpool_handle_t seg_handle = s_op->u.rwsm.seg_handle;
- PINT_segpool_unit_id id = s_op->u.rwsm.id;
+ PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
+ PINT_segpool_unit_id id = s_op->u.pipeline.id;
int ret;
struct filesystem_configuration_s *fs_config;
struct server_configuration_s *server_config;
@@ -71,30 +71,15 @@ static int trove_read_call(struct PINT_s
PVFS_size *sizes;
PVFS_size bytes;
job_id_t tmp_id;
- struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- s_op->u.rwsm.buffer_used = 0;
+ s_op->u.pipeline.buffer_used = 0;
+ s_op->u.pipeline.segs = 0;
+ bytes = s_op->u.pipeline.buffer_size;
- if(!s_op->u.rwsm.buffer) {
- gossip_debug(GOSSIP_IO_DEBUG, "%s: !buffer\n", __func__);
- /* if the buffer has not been used, allocate a buffer */
- s_op->u.rwsm.buffer = BMI_memalloc(
- parent_s_op->u.io.address,
- parent_s_op->u.io.buffer_size, BMI_SEND);
- /* TODO: error handling */
- assert(s_op->u.rwsm.buffer);
- }
-
- bytes = parent_s_op->u.io.buffer_size;
PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
&offsets, &sizes);
gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%ld, count=%d\n", __func__,
bytes, count);
-
- s_op->u.rwsm.buffer_used = bytes;
- s_op->u.rwsm.offsets = offsets;
- s_op->u.rwsm.sizes = sizes;
- s_op->u.rwsm.segs = count;
if(count == 0) {
js_p->error_code = 0;
@@ -102,29 +87,50 @@ static int trove_read_call(struct PINT_s
return SM_ACTION_COMPLETE;
}
- fs_config = PINT_config_get_filesystems(&server_config);
+ s_op->u.pipeline.buffer_used = bytes;
+ s_op->u.pipeline.offsets = offsets;
+ s_op->u.pipeline.sizes = sizes;
+ s_op->u.pipeline.segs = count;
+
+ /* figure out if the fs config has trove data sync turned on or off
+ */
+ server_config = get_server_config_struct();
+ if(!server_config)
+ {
+ js_p->error_code = -PVFS_EINVAL;
+ return SM_ACTION_COMPLETE;
+ }
+
+ fs_config = PINT_config_find_fs_id(
+ server_config, s_op->u.pipeline.coll_id);
+ if(!fs_config)
+ {
+ gossip_err("%s: Failed to get filesystem "
+ "config from fs_id of: %d\n", __func__,
+ s_op->u.pipeline.coll_id);
+ js_p->error_code = -PVFS_EINVAL;
+ return SM_ACTION_COMPLETE;
+ }
ret = job_trove_bstream_read_list(
- parent_s_op->u.io.coll_id,
- parent_s_op->u.io.handle,
- (char **)&s_op->u.rwsm.buffer,
- (PVFS_size *)&s_op->u.rwsm.buffer_used,
+ s_op->u.pipeline.coll_id,
+ s_op->u.pipeline.handle,
+ (char **)&s_op->u.pipeline.buffer,
+ (PVFS_size *)&s_op->u.pipeline.buffer_used,
1,
offsets,
sizes,
count,
- &s_op->u.rwsm.out_size,
- fs_config->trove_sync_data,
+ &s_op->u.pipeline.out_size,
+ (fs_config->trove_sync_data ? TROVE_SYNC : 0),
NULL,
smcb,
0,
js_p,
&tmp_id,
server_job_context,
- parent_s_op->u.io.hints);
+ s_op->u.pipeline.hints);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: ret=%d\n", __func__, ret);
-
if(ret < 0) {
gossip_err("%s: I/O error occurred\n", __func__);
/* FIXME */
@@ -147,30 +153,32 @@ static int trove_read_call(struct PINT_s
return SM_ACTION_COMPLETE;
}
-static int bmi_send_call(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action bmi_send_call(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret;
job_id_t tmp_id;
struct server_configuration_s *user_opts = get_server_config_struct();
- struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
+ struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
- if(s_op->u.rwsm.segs == 0 && s_op->u.rwsm.buffer_used == 0) {
+ if(s_op->u.pipeline.segs == 0) {
js_p->error_code = 0;
gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
return SM_ACTION_COMPLETE;
}
gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__,
- s_op->u.rwsm.buffer_used);
+ s_op->u.pipeline.buffer_used);
+#if 0
gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
- (char *)s_op->u.rwsm.buffer);
+ (char *)s_op->u.pipeline.buffer);
+#endif
/***************************************************/
if(parent_s_op->u.io.op != 0)
do_comp(s_op);
/***************************************************/
- assert(s_op->u.rwsm.buffer_used);
+ assert(s_op->u.pipeline.buffer_used);
if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */
ret = 1; /* AS: skip sending if op is specified */
gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
@@ -179,10 +187,12 @@ static int bmi_send_call(struct PINT_smc
}
else
/* FIXME: what should be a proper value for status_user_tag? */
- ret = job_bmi_send(parent_s_op->u.io.address,
- s_op->u.rwsm.buffer,
- s_op->u.rwsm.buffer_used,
- parent_s_op->u.io.tag,
+ /* FIXME: 3rd parameter, if s_op->u.pipeline.out_size is used,
+ client might complain about size mismatch */
+ ret = job_bmi_send(s_op->u.pipeline.address,
+ s_op->u.pipeline.buffer,
+ js_p->actual_size,
+ s_op->u.pipeline.tag,
BMI_PRE_ALLOC,
0, /* send_unexpected */
smcb, /* user_ptr */
@@ -191,7 +201,7 @@ static int bmi_send_call(struct PINT_smc
&tmp_id,
server_job_context,
user_opts->server_job_bmi_timeout,
- (bmi_hint)parent_s_op->u.io.hints);
+ (bmi_hint)s_op->u.pipeline.hints);
if(ret < 0) {
gossip_err("%s: I/O error occurred\n", __func__);
@@ -215,24 +225,29 @@ static int bmi_send_call(struct PINT_smc
return SM_ACTION_COMPLETE;
}
-static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- PINT_segpool_handle_t h = s_op->u.rwsm.seg_handle;
+ PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
js_p->error_code = 0;
- struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
-
- parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;
+ struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+
+ gen_mutex_lock(&parent_s_op->u.io.mutex);
+ parent_s_op->u.io.total_transferred += js_p->actual_size;
+ gen_mutex_unlock(&parent_s_op->u.io.mutex);
gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
parent_s_op->u.io.total_transferred);
gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
- s_op->u.rwsm.segs);
+ s_op->u.pipeline.segs);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%ld\n", __func__,
+ js_p->actual_size);
+
/* FIXME: */
/* unless the second condition is set, the server starts
a new read request to one already done, and falls into
infinite trove_read->bmi_send->check_done loop */
- if(!segpool_done(h) && s_op->u.rwsm.segs != 0) {
+ if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: read: LOOP\n", __func__);
js_p->error_code = LOOP;
}
@@ -243,39 +258,31 @@ static int check_done(struct PINT_smcb *
return SM_ACTION_COMPLETE;
}
-static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
{
- struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
js_p->error_code = 0;
- struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
-
- if(s_op->u.rwsm.buffer) {
- BMI_memfree(parent_s_op->u.io.address, s_op->u.rwsm.buffer,
- parent_s_op->u.io.buffer_size, BMI_SEND);
- }
- gossip_debug(GOSSIP_IO_DEBUG, "%s: read: \n", __func__);
return SM_ACTION_COMPLETE;
}
static void do_comp(struct PINT_server_op *s_op)
{
- struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
+ struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
- if(s_op->u.rwsm.buffer) {
+ if(s_op->u.pipeline.buffer) {
PVFS_size i;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"%s: buffer_used=%ld, op=0x%x, datatype=0x%x\n",
- __func__, s_op->u.rwsm.buffer_used,
+ __func__, s_op->u.pipeline.buffer_used,
parent_s_op->u.io.op,
parent_s_op->u.io.datatype); /* AS */
switch(parent_s_op->u.io.datatype) {
case ((int)0x4c000405): /* MPI_INT */
{
- int *a = s_op->u.rwsm.buffer;
+ int *a = s_op->u.pipeline.buffer;
int result;
- PVFS_size count = (s_op->u.rwsm.buffer_used)/((*PVFS_INT).ub);
+ PVFS_size count = (s_op->u.pipeline.buffer_used)/((*PVFS_INT).ub);
int *tmp;
if (parent_s_op->u.io.total_transferred == 0) {
@@ -329,16 +336,16 @@ static void do_comp(struct PINT_server_o
default:
break;
}
- s_op->u.rwsm.buffer = (void *)a;
+ s_op->u.pipeline.buffer = (void *)a;
parent_s_op->u.io.tmp_buffer = (void *)tmp;
}
break;
case ((int)0x4c00080b): /* MPI_DOUBLE */
{
- double *a = s_op->u.rwsm.buffer;
+ double *a = s_op->u.pipeline.buffer;
double result;
- PVFS_size count = (s_op->u.rwsm.buffer_used)/((*PVFS_DOUBLE).ub);
+ PVFS_size count = (s_op->u.pipeline.buffer_used)/((*PVFS_DOUBLE).ub);
double *tmp;
if (parent_s_op->u.io.total_transferred == 0) {
@@ -396,7 +403,7 @@ static void do_comp(struct PINT_server_o
default:
break;
} /* end inner switch */
- s_op->u.rwsm.buffer = (void *)a;
+ s_op->u.pipeline.buffer = (void *)a;
parent_s_op->u.io.tmp_buffer = (void *)tmp;
}
Index: write.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/write.sm,v
diff -p -u -r1.1.2.4 -r1.1.2.5
--- write.sm 20 Apr 2009 21:10:50 -0000 1.1.2.4
+++ write.sm 28 Apr 2009 16:17:00 -0000 1.1.2.5
@@ -18,7 +18,7 @@
#include "pint-distribution.h"
#include "pint-request.h"
#include "pvfs2-internal.h"
-#include "rw-sm.h"
+#include "pipeline.h"
#include "trove.h"
#include "pint-segpool.h"
@@ -60,8 +60,8 @@ nested machine pvfs2_write_sm
static PINT_sm_action bmi_recv_call(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- PINT_segpool_handle_t seg_handle = s_op->u.rwsm.seg_handle;
- PINT_segpool_unit_id id = s_op->u.rwsm.id;
+ PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
+ PINT_segpool_unit_id id = s_op->u.pipeline.id;
int ret;
job_id_t tmp_id;
struct server_configuration_s *user_opts = get_server_config_struct();
@@ -69,47 +69,37 @@ static PINT_sm_action bmi_recv_call(stru
PVFS_offset *offsets;
PVFS_size *sizes;
PVFS_size bytes;
- struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- s_op->u.rwsm.buffer_used = 0;
-
- if(!s_op->u.rwsm.buffer){
- /* if the buffer has not been used, allocate a buffer */
- s_op->u.rwsm.buffer = BMI_memalloc(
- parent_s_op->u.io.address,
- parent_s_op->u.io.buffer_size, BMI_RECV);
- /* TODO: error handling */
- assert(s_op->u.rwsm.buffer);
- }
+ s_op->u.pipeline.buffer_used = 0;
+ bytes = s_op->u.pipeline.buffer_size;
- bytes = parent_s_op->u.io.buffer_size;
PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
&offsets, &sizes);
gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%ld\n",
__func__, count, bytes);
- s_op->u.rwsm.buffer_used = bytes;
- s_op->u.rwsm.offsets = offsets;
- s_op->u.rwsm.sizes = sizes;
- s_op->u.rwsm.segs = count;
if(count == 0) {
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
-
- /* TODO: what if we recv less than expected? */
- ret = job_bmi_recv(parent_s_op->u.io.address,
- (char *)s_op->u.rwsm.buffer,
- parent_s_op->u.io.buffer_size,
- parent_s_op->u.io.tag,
+
+ s_op->u.pipeline.buffer_used = bytes;
+ s_op->u.pipeline.offsets = offsets;
+ s_op->u.pipeline.sizes = sizes;
+ s_op->u.pipeline.segs = count;
+
+ ret = job_bmi_recv(s_op->u.pipeline.address,
+ (void *)s_op->u.pipeline.buffer,
+ s_op->u.pipeline.buffer_size,
+ s_op->u.pipeline.tag,
BMI_PRE_ALLOC,
smcb,
- 0, /* unsigned long status_user_tag = 0 */
+ 1, /* unsigned long status_user_tag = 0 */
js_p,
&tmp_id,
server_job_context,
user_opts->server_job_flow_timeout,
- (bmi_hint)parent_s_op->u.io.hints);
+ (bmi_hint)s_op->u.pipeline.hints);
if(ret < 0) {
gossip_err("%s: I/O error occurred\n", __func__);
@@ -118,7 +108,7 @@ static PINT_sm_action bmi_recv_call(stru
js_p->error_code = -PVFS_EIO;
return SM_ACTION_COMPLETE;
}
-
+ /* immediate return */
if(ret == 1) {
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
@@ -128,7 +118,6 @@ static PINT_sm_action bmi_recv_call(stru
return SM_ACTION_DEFERRED;
}
- js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
@@ -139,39 +128,62 @@ static PINT_sm_action trove_write_call(s
job_id_t tmp_id;
struct filesystem_configuration_s *fs_config;
struct server_configuration_s *server_config;
- struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__,
- s_op->u.rwsm.buffer_used);
+ s_op->u.pipeline.buffer_used);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%ld\n", __func__,
+ js_p->actual_size);
+#if 0
gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
- (char *)s_op->u.rwsm.buffer);
+ (char *)s_op->u.pipeline.buffer);
+#endif
- if(s_op->u.rwsm.segs == 0 && s_op->u.rwsm.buffer_used == 0) {
+ if(s_op->u.pipeline.buffer_used == 0) {
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
+
+ /* figure out if the fs config has trove data sync turned on or off
+ */
+ server_config = get_server_config_struct();
+ if(!server_config)
+ {
+ gossip_err("write_io: server config is NULL!\n");
+ js_p->error_code = -PVFS_EINVAL;
+ return SM_ACTION_COMPLETE;
+ }
- fs_config = PINT_config_get_filesystems(&server_config);
+ fs_config = PINT_config_find_fs_id(
+ server_config, s_op->u.pipeline.coll_id);
+ if(!fs_config)
+ {
+ gossip_err("write_io: Failed to get filesystem "
+ "config from fs_id of: %d\n",
+ s_op->u.pipeline.coll_id);
+ js_p->error_code = -PVFS_EINVAL;
+ return SM_ACTION_COMPLETE;
+ }
ret = job_trove_bstream_write_list(
- parent_s_op->u.io.coll_id,
- parent_s_op->u.io.handle,
- (char**)&s_op->u.rwsm.buffer,
- (TROVE_size *)&s_op->u.rwsm.buffer_used,
+ s_op->u.pipeline.coll_id,
+ s_op->u.pipeline.handle,
+ (char **)&s_op->u.pipeline.buffer,
+ (TROVE_size *)&js_p->actual_size,
1,
- s_op->u.rwsm.offsets,
- s_op->u.rwsm.sizes,
- s_op->u.rwsm.segs,
- &s_op->u.rwsm.out_size,
- fs_config->trove_sync_data,
+ s_op->u.pipeline.offsets,
+ s_op->u.pipeline.sizes,
+ s_op->u.pipeline.segs,
+ &s_op->u.pipeline.out_size,
+ (fs_config->trove_sync_data ? TROVE_SYNC : 0),
NULL,
smcb,
0,
js_p,
&tmp_id,
server_job_context,
- parent_s_op->u.io.hints);
+ s_op->u.pipeline.hints);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: ret=%d\n", __func__, ret);
if(ret < 0) {
gossip_err("%s: I/O error occurred\n", __func__);
//handle_io_error(ret, q_item, flow_data);
@@ -181,29 +193,33 @@ static PINT_sm_action trove_write_call(s
}
if(ret == 1) {
- /* immediate completion; trigger callback ourselves */
+ /* immediate completion */
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
- /* FIXME: not sure if this is required */
if(ret == 0) {
+ js_p->error_code = 0;
return SM_ACTION_DEFERRED;
}
return SM_ACTION_COMPLETE;
}
-static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- PINT_segpool_handle_t h = s_op->u.rwsm.seg_handle;
+ PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
js_p->error_code = 0;
- struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
+ struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
- parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;
+ parent_s_op->u.io.total_transferred += s_op->u.pipeline.out_size;
gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
parent_s_op->u.io.total_transferred);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__,
+ s_op->u.pipeline.buffer_used);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: out_size=%ld\n", __func__,
+ s_op->u.pipeline.out_size);
if(!segpool_done(h)) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: write: LOOP\n", __func__);
@@ -212,16 +228,15 @@ static int check_done(struct PINT_smcb *
else {
gossip_debug(GOSSIP_IO_DEBUG, "%s: write: DONE\n", __func__);
}
-
+
return SM_ACTION_COMPLETE;
}
-static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
{
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
- //return(server_state_machine_complete(smcb)); // why not this?
}
/*
More information about the Pvfs2-cvs
mailing list