[Pvfs2-cvs] commit by sson in pvfs2/src/server: io.sm
pvfs2-server.h read.sm rw-sm.h write.sm
CVS commit program
cvs at parl.clemson.edu
Sat Apr 18 17:59:24 EDT 2009
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv15808/src/server
Modified Files:
Tag: as-branch
io.sm pvfs2-server.h read.sm rw-sm.h write.sm
Log Message:
Initial version of parallel read/write sms is implemented.
Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.2 -r1.73.6.3
--- io.sm 14 Apr 2009 22:18:53 -0000 1.73.6.2
+++ io.sm 18 Apr 2009 21:59:24 -0000 1.73.6.3
@@ -24,6 +24,8 @@
extern struct PINT_state_machine_s pvfs2_read_sm;
extern struct PINT_state_machine_s pvfs2_write_sm;
+#define NUM_OF_PARALLEL_RWS 8
+
%%
machine pvfs2_io_sm
@@ -156,7 +158,6 @@ static PINT_sm_action start_pipeline_sm(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- //int err = -PVFS_EIO;
struct server_configuration_s *user_opts = get_server_config_struct();
struct filesystem_configuration_s *fs_conf;
struct PINT_server_op *flow_read_op;
@@ -270,48 +271,6 @@ static PINT_sm_action start_pipeline_sm(
flow_data->parent = s_op->u.io.flow_d;
/* setup the request processing states */
-#if 0 //////////////////
- s_op->u.io.flow_d->file_req_state =
- PINT_new_request_state(s_op->u.io.flow_d->file_req);
- if (!s_op->u.io.flow_d->file_req_state)
- {
- js_p->error_code = -PVFS_ENOMEM;
- return SM_ACTION_COMPLETE;
- }
-
- /* only setup a memory datatype state if caller provided a memory datatype */
- if(s_op->u.io.flow_d->mem_req)
- {
- s_op->u.io.flow_d->mem_req_state =
- PINT_new_request_state(s_op->u.io.flow_d->mem_req);
- if (!s_op->u.io.flow_d->mem_req_state)
- {
- js_p->error_code = -PVFS_ENOMEM;
- return SM_ACTION_COMPLETE;
- }
- }
-
- /* if a file datatype offset was specified, go ahead and skip ahead
- * before doing anything else
- */
- if(s_op->u.io.flow_d->file_req_offset)
- PINT_REQUEST_STATE_SET_TARGET(s_op->u.io.flow_d->file_req_state,
- s_op->u.io.flow_d->file_req_offset);
-
-
- /* set boundaries on file datatype */
- if(s_op->u.io.flow_d->aggregate_size > -1)
- {
- PINT_REQUEST_STATE_SET_FINAL(s_op->u.io.flow_d->file_req_state,
- s_op->u.io.flow_d->aggregate_size+s_op->u.io.flow_d->file_req_offset);
- }
- else
- {
- PINT_REQUEST_STATE_SET_FINAL(s_op->u.io.flow_d->file_req_state,
- s_op->u.io.flow_d->file_req_offset +
- PINT_REQUEST_TOTAL_BYTES(s_op->u.io.flow_d->mem_req));
- }
-#endif //////////////////////
gossip_debug(GOSSIP_IO_DEBUG, "%s: aggregate_size=%ld\n", __func__,
s_op->u.io.flow_d->aggregate_size);
gossip_debug(GOSSIP_IO_DEBUG, "%s: file_data.fsize=%ld\n", __func__,
@@ -354,16 +313,13 @@ static PINT_sm_action start_pipeline_sm(
PINT_SP_SERVER_READ,
&seg_handle);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_init() called: ret=%d, seg_handle=%d\n", __func__, ret, seg_handle);
- //for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
- for(i=0; i<1; i++) {
+ s_op->u.io.seg_handle = seg_handle;
+ for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
PINT_segpool_register(seg_handle, &id);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_register() called: id=%d\n", __func__, id);
flow_data->prealloc_array[i].result_chain.q_item =
&flow_data->prealloc_array[i];
flow_read_op = malloc(sizeof(*flow_read_op));
memset(flow_read_op, 0, sizeof(*flow_read_op));
- memcpy(flow_read_op, s_op, sizeof(*flow_read_op));
flow_read_op->u.flow_read.q_item = &flow_data->prealloc_array[i];
flow_read_op->u.flow_read.seg_handle = seg_handle;
flow_read_op->u.flow_read.id = id;
@@ -372,7 +328,9 @@ static PINT_sm_action start_pipeline_sm(
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
- gossip_debug(GOSSIP_IO_DEBUG, "flow_read: ret=%d\n", ret);
+ s_op->u.io.parallel_read_sms++;
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_read_sms=%d\n",
+ __func__, s_op->u.io.parallel_read_sms);
}
}
else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
@@ -393,19 +351,15 @@ static PINT_sm_action start_pipeline_sm(
PINT_SP_SERVER_WRITE,
&seg_handle);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_init() called: ret=%d, seg_handle=%d\n", __func__, ret, seg_handle);
- /* only post one outstanding recv at a time; easier to manage */
+ s_op->u.io.seg_handle = seg_handle;
/* FIXME: we will eventually want to post multiple of recvs */
- //for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
- for(i=0; i<1; i++) {
+ for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
PINT_segpool_register(seg_handle, &id);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_register() called: id=%d\n", __func__, id);
flow_data->prealloc_array[i].result_chain.q_item =
&flow_data->prealloc_array[i];
flow_write_op = malloc(sizeof(*flow_write_op));
memset(flow_write_op, 0, sizeof(*flow_write_op));
- memcpy(flow_write_op, s_op, sizeof(*flow_write_op));
flow_write_op->u.flow_write.q_item = &flow_data->prealloc_array[i];
flow_write_op->u.flow_write.seg_handle = seg_handle;
flow_write_op->u.flow_write.id = id;
@@ -414,6 +368,9 @@ static PINT_sm_action start_pipeline_sm(
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
+ s_op->u.io.parallel_write_sms++;
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_write_sms=%d\n",
+ __func__, s_op->u.io.parallel_write_sms);
}
}
@@ -479,6 +436,11 @@ static PINT_sm_action io_cleanup(
if (s_op->u.io.flow_d)
{
PINT_flow_free(s_op->u.io.flow_d);
+ }
+
+ if (s_op->u.io.seg_handle) {
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: freeing seg_handle\n", __func__);
+ PINT_segpool_destroy(s_op->u.io.seg_handle);
}
/* let go of our encoded response buffer, if we appear to have
Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.1 -r1.155.6.2
--- pvfs2-server.h 14 Apr 2009 20:19:50 -0000 1.155.6.1
+++ pvfs2-server.h 18 Apr 2009 21:59:24 -0000 1.155.6.2
@@ -334,6 +334,10 @@ struct PINT_server_getconfig_op
struct PINT_server_io_op
{
flow_descriptor* flow_d;
+ //io_pipeline* iop; /* substitute for flow_d */
+ int parallel_write_sms;
+ int parallel_read_sms;
+ PINT_segpool_handle_t seg_handle;
};
/* This is for flow state machine */
@@ -341,6 +345,8 @@ struct PINT_server_io_op
struct PINT_server_flow_read_op
{
struct fp_queue_item *q_item;
+ void *buffer; /* substitute for q_item */
+ PVFS_size buffer_used; /* substitute for q_item */
PINT_segpool_handle_t seg_handle;
PINT_segpool_unit_id id;
PVFS_offset *offsets;
@@ -353,6 +359,8 @@ struct PINT_server_flow_read_op
struct PINT_server_flow_write_op
{
struct fp_queue_item *q_item;
+ void *buffer; /* substitute for q_item */
+ PVFS_size buffer_used; /* substitute for q_item */
PINT_segpool_handle_t seg_handle;
PINT_segpool_unit_id id;
PVFS_offset *offsets;
Index: read.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/read.sm,v
diff -p -u -r1.1.2.1 -r1.1.2.2
--- read.sm 14 Apr 2009 20:19:50 -0000 1.1.2.1
+++ read.sm 18 Apr 2009 21:59:24 -0000 1.1.2.2
@@ -62,48 +62,19 @@ static int trove_read_call(struct PINT_s
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
- struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
- struct result_chain_entry *result_tmp = &q_item->result_chain;
- struct result_chain_entry *old_result_tmp;
PINT_segpool_handle_t seg_handle = s_op->u.flow_read.seg_handle;
PINT_segpool_unit_id id = s_op->u.flow_read.id;
- void *tmp_buffer;
- PVFS_size bytes_processed = 0;
- int err = -PVFS_EIO;
int ret;
- struct server_configuration_s *user_opts = get_server_config_struct();
struct filesystem_configuration_s *fs_config;
struct server_configuration_s *server_config;
int count;
PVFS_offset *offsets;
PVFS_size *sizes;
PVFS_size bytes;
- int tmp_id;
-
+ job_id_t tmp_id;
+
q_item->buffer_used = 0;
-#if 0
- gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "%s: error_code: %d, initial_call_flag: %d, flow: %p.\n",
- __func__, error_code, initial_call_flag,
- flow_data->parent);
-
- if(error_code != 0 || flow_data->parent->error_code != 0)
- {
- gossip_err("%s: I/O error occurred\n", __func__);
- handle_io_error(error_code, q_item, flow_data);
- if(flow_data->parent->state == FLOW_COMPLETE)
- return(1);
- else
- return(0);
- }
- /* if there are no more receives to post, just return */
- if(PINT_REQUEST_DONE(flow_data->parent->file_req_state))
- {
- js_p->error_code = 0;
- return SM_ACTION_COMPLETE;
- }
-#endif
if(!q_item->buffer) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: !q_item->buffer\n", __func__);
/* if the q_item has not been used, allocate a buffer */
@@ -115,8 +86,9 @@ static int trove_read_call(struct PINT_s
}
bytes = q_item->parent->buffer_size;
- PINT_segpool_take_segments(seg_handle, id, &bytes, &count, &offsets, &sizes);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%d, count=%d\n", __func__,
+ PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
+ &offsets, &sizes);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%ld, count=%d\n", __func__,
bytes, count);
q_item->buffer_used = bytes;
@@ -126,76 +98,50 @@ static int trove_read_call(struct PINT_s
if(count == 0) {
js_p->error_code = 0;
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
return SM_ACTION_COMPLETE;
}
-#if 0
- if(bytes == 0) {
- if(flow_data->parent->total_transferred ==
- flow_data->total_bytes_processed &&
- PINT_REQUEST_DONE(flow_data->parent->file_req_state)) {
- assert(q_item->parent->state != FLOW_COMPLETE);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: FLOW_COMPLETE?\n", __func__);
- q_item->parent->state = FLOW_COMPLETE; /* bmi_recv_call */
- }
- js_p->error_code = 0;
- return SM_ACTION_COMPLETE;
- }
-#endif
+ fs_config = PINT_config_get_filesystems(&server_config);
- //do{
- //assert(q_item->buffer_used);
- //assert(result_tmp->result.bytes);
- //result_tmp->q_item = q_item;
- /* XXX: can someone confirm this avoids a segfault in the immediate
- * completion case? */
- //assert(result_tmp->result.bytes);
-
- fs_config = PINT_config_get_filesystems(&server_config);
-
- ret = job_trove_bstream_read_list(
- q_item->parent->src.u.trove.coll_id,
- q_item->parent->src.u.trove.handle,
- //(char**)&result_tmp->buffer_offset,
- &q_item->buffer,
- &q_item->buffer_used,
- 1,
- offsets,
- sizes,
- count,
- &q_item->out_size,
- fs_config->trove_sync_data,
- NULL,
- smcb,
- 0,
- js_p,
- &tmp_id,
- server_job_context,
- q_item->parent->hints);
+ ret = job_trove_bstream_read_list(
+ q_item->parent->src.u.trove.coll_id,
+ q_item->parent->src.u.trove.handle,
+ &q_item->buffer,
+ (PVFS_size *)&q_item->buffer_used,
+ 1,
+ offsets,
+ sizes,
+ count,
+ &q_item->out_size,
+ fs_config->trove_sync_data,
+ NULL,
+ smcb,
+ 0,
+ js_p,
+ &tmp_id,
+ server_job_context,
+ q_item->parent->hints);
- if(ret < 0)
- {
- gossip_err("%s: I/O error occurred\n", __func__);
- /* FIXME */
- //handle_io_error(ret, q_item, flow_data);
- if(flow_data->parent->state == FLOW_COMPLETE)
- return (1);
- else
- return (0);
- }
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: ret=%d\n", __func__, ret);
+
+ if(ret < 0) {
+ gossip_err("%s: I/O error occurred\n", __func__);
+ /* FIXME */
+ //handle_io_error(ret, q_item, flow_data);
+ js_p->error_code = -PVFS_EIO;
+ return SM_ACTION_COMPLETE;
+ }
- if(ret == 1)
- {
- /* immediate completion; trigger callback ourselves */
- js_p->error_code = 1;
- return SM_ACTION_COMPLETE;
- }
+ if(ret == 1) {
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+ }
- if(ret == 0) {
- js_p->error_code = 0;
- return SM_ACTION_DEFERRED;
- }
- //} while(result_tmp);
+ if(ret == 0) {
+ js_p->error_code = 0;
+ return SM_ACTION_DEFERRED;
+ }
return SM_ACTION_COMPLETE;
}
@@ -205,53 +151,17 @@ static int bmi_send_call(struct PINT_smc
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
- struct result_chain_entry *result_tmp = &q_item->result_chain;
- struct result_chain_entry *old_result_tmp;
- int err = -PVFS_EIO;
- int done = 0;
int ret;
job_id_t tmp_id;
struct server_configuration_s *user_opts = get_server_config_struct();
- PINT_segpool_handle_t h = s_op->u.flow_write.seg_handle;
-
-#if 0
- gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "%s, error_code: %d, flow: %p.\n", __func__,
- error_code, flow_data->parent);
-
- result_tmp->posted_id = 0;
-
- if(error_code != 0 || flow_data->parent->error_code != 0)
- {
- gossip_err("%s: I/O error occurred\n", __func__);
- handle_io_error(error_code, q_item, flow_data);
- return;
- }
-#endif
if(s_op->u.flow_read.segs == 0 && q_item->buffer_used == 0) {
js_p->error_code = 0;
- return SM_ACTION_COMPLETE;
- }
-#if 0
- if(q_item->parent->state == FLOW_COMPLETE) {
- js_p->error_code = 0;
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
return SM_ACTION_COMPLETE;
}
- gossip_debug(GOSSIP_IO_DEBUG, "%s: result_tmp->result.bytes=%ld\n",
- __func__, result_tmp->result.bytes);
-
- result_tmp = &q_item->result_chain;
- do{
- old_result_tmp = result_tmp;
- result_tmp = result_tmp->next;
- if(old_result_tmp != &q_item->result_chain)
- free(old_result_tmp);
- } while(result_tmp);
- q_item->result_chain.next = NULL;
-#endif
- gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%d\n", __func__,
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%ld\n", __func__,
q_item->buffer_used);
gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
(char *)q_item->buffer);
@@ -280,7 +190,7 @@ static int bmi_send_call(struct PINT_smc
js_p,
&tmp_id,
server_job_context,
- user_opts->server_job_flow_timeout,
+ user_opts->server_job_bmi_timeout,
(bmi_hint)q_item->parent->hints);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
@@ -317,29 +227,25 @@ static int check_done(struct PINT_smcb *
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
- struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
- struct result_chain_entry *result_tmp = &q_item->result_chain;
- struct result_chain_entry *old_result_tmp;
PINT_segpool_handle_t h = s_op->u.flow_read.seg_handle;
js_p->error_code = 0;
- flow_data->total_bytes_processed += q_item->buffer_used;
q_item->parent->total_transferred += q_item->buffer_used;
- gossip_debug(GOSSIP_IO_DEBUG, "%s: total_bytes_processed=%d\n", __func__,
- flow_data->total_bytes_processed);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
+ q_item->parent->total_transferred);
gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
s_op->u.flow_read.segs);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: handle=%d\n", __func__, h);
- //if(!PINT_REQUEST_DONE(q_item->parent->file_req_state)) {
- if(!segpool_done(h)) {
+ /* FIXME: */
+ /* unless the second condition is set, the server starts
+ a new read request to one already done, and falls into
+ infinite trove_read->bmi_send->check_done loop */
+ if(!segpool_done(h) && s_op->u.flow_read.segs != 0) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: read: LOOP\n", __func__);
js_p->error_code = LOOP;
}
else {
gossip_debug(GOSSIP_IO_DEBUG, "%s: read: DONE\n", __func__);
- flow_data->parent->state = FLOW_COMPLETE;
- FLOW_CLEANUP(flow_data);
}
return SM_ACTION_COMPLETE;
@@ -347,10 +253,17 @@ static int check_done(struct PINT_smcb *
static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
{
+ struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+ struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
js_p->error_code = 0;
+ if(q_item->buffer) {
+ BMI_memfree(q_item->parent->src.u.bmi.address,
+ q_item->buffer, q_item->parent->buffer_size, BMI_SEND);
+ }
gossip_debug(GOSSIP_IO_DEBUG, "%s: read: \n", __func__);
return SM_ACTION_COMPLETE;
+ //return(server_state_machine_complete(smcb)); // why not this?
}
static void do_comp(struct fp_queue_item *q_item)
Index: rw-sm.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/rw-sm.h,v
diff -p -u -r1.1.2.1 -r1.1.2.2
--- rw-sm.h 14 Apr 2009 20:21:42 -0000 1.1.2.1
+++ rw-sm.h 18 Apr 2009 21:59:24 -0000 1.1.2.2
@@ -19,6 +19,7 @@ do {
free(__flow_data); \
} while(0)
+
struct result_chain_entry
{
PVFS_id_gen_t posted_id;
Index: write.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/write.sm,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- write.sm 14 Apr 2009 22:18:53 -0000 1.1.2.2
+++ write.sm 18 Apr 2009 21:59:24 -0000 1.1.2.3
@@ -61,13 +61,8 @@ static PINT_sm_action bmi_recv_call(stru
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
- struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
- struct result_chain_entry *result_tmp = &q_item->result_chain;
- struct result_chain_entry *old_result_tmp;
PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
PINT_segpool_unit_id id = s_op->u.flow_write.id;
- PVFS_size bytes_processed = 0;
- void *tmp_buffer;
int ret;
job_id_t tmp_id;
struct server_configuration_s *user_opts = get_server_config_struct();
@@ -77,26 +72,7 @@ static PINT_sm_action bmi_recv_call(stru
PVFS_size bytes;
q_item->buffer_used = 0;
-#if 0
- /* FIXME: not sure whether we need to check this
- * for now just commented out */
- if(js_p->error_code != 0 || flow_data->parent->error_code != 0)
- {
- gossip_err("%s: I/O error occurred\n", __func__);
- //handle_io_error(error_code, q_item, flow_data);
- js_p->error_code = error_code;
- return SM_ACTION_COMPLETE;
- }
-#endif
-
-#if 0
- /* if there are no more receives to post, just return */
- if(PINT_REQUEST_DONE(flow_data->parent->file_req_state))
- {
- js_p->error_code = 0;
- return SM_ACTION_COMPLETE;
- }
-#endif
+
if(!q_item->buffer){
/* if the q_item has not been used, allocate a buffer */
q_item->buffer = BMI_memalloc(
@@ -107,8 +83,10 @@ static PINT_sm_action bmi_recv_call(stru
}
bytes = q_item->parent->buffer_size;
- PINT_segpool_take_segments(seg_handle, id, &bytes, &count, &offsets, &sizes);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%d\n", __func__, count, bytes);
+ PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
+ &offsets, &sizes);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%ld\n",
+ __func__, count, bytes);
q_item->buffer_used = bytes;
s_op->u.flow_write.offsets = offsets;
s_op->u.flow_write.sizes = sizes;
@@ -118,25 +96,6 @@ static PINT_sm_action bmi_recv_call(stru
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
-#if 0
- if(count == 0) {
- if(flow_data->parent->total_transferred ==
- flow_data->total_bytes_processed &&
- PINT_REQUEST_DONE(flow_data->parent->file_req_state)) {
- gossip_debug(GOSSIP_IO_DEBUG, "%s: FLOW_COMPLETE?\n", __func__);
- /* never reach this point */
- assert(q_item->parent->state != FLOW_COMPLETE);
- q_item->parent->state = FLOW_COMPLETE; /* bmi_recv_call */
- }
- js_p->error_code = 0;
- return SM_ACTION_COMPLETE;
- }
-
- gossip_debug(GOSSIP_DIRECTIO_DEBUG,
- "offset %llu, buffer ptr: %p\n",
- llu(q_item->result_chain.result.offset_array[0]),
- q_item->buffer);
-#endif
/* TODO: what if we recv less than expected? */
ret = job_bmi_recv(q_item->parent->src.u.bmi.address,
@@ -161,7 +120,6 @@ static PINT_sm_action bmi_recv_call(stru
}
if(ret == 1) {
- /* immediate completion; trigger callback ourselves */
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
@@ -178,82 +136,60 @@ static PINT_sm_action trove_write_call(s
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
- struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
- PVFS_size tmp_actual_size;
- struct result_chain_entry *result_tmp = &q_item->result_chain;
- struct result_chain_entry *old_result_tmp;
- PVFS_size bytes_processed = 0;
- void *tmp_buffer;
- void *tmp_user_ptr;
int ret;
- struct server_configuration_s *user_opts = get_server_config_struct();
+ job_id_t tmp_id;
struct filesystem_configuration_s *fs_config;
struct server_configuration_s *server_config;
- PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
- gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%d\n", __func__,
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%ld\n", __func__,
q_item->buffer_used);
gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
(char *)q_item->buffer);
- if(s_op->u.flow_read.segs == 0 && q_item->buffer_used == 0) {
+ if(s_op->u.flow_write.segs == 0 && q_item->buffer_used == 0) {
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+ }
+
+ fs_config = PINT_config_get_filesystems(&server_config);
+
+ ret = job_trove_bstream_write_list(
+ q_item->parent->dest.u.trove.coll_id,
+ q_item->parent->dest.u.trove.handle,
+ (char**)&q_item->buffer,
+ (TROVE_size *)&q_item->buffer_used,
+ 1,
+ s_op->u.flow_write.offsets,
+ s_op->u.flow_write.sizes,
+ s_op->u.flow_write.segs,
+ &q_item->out_size,
+ fs_config->trove_sync_data,
+ NULL,
+ smcb,
+ 0,
+ js_p,
+ &tmp_id,
+ server_job_context,
+ q_item->parent->hints);
+
+ if(ret < 0) {
+ gossip_err("%s: I/O error occurred\n", __func__);
+ //handle_io_error(ret, q_item, flow_data);
+ /* FIXME ***************/
+ js_p->error_code = -PVFS_ENOMEM;
+ return SM_ACTION_COMPLETE;
+ }
+
+ if(ret == 1) {
+ /* immediate completion; trigger callback ourselves */
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
-#if 0
- gossip_debug(GOSSIP_IO_DEBUG, "%s: result_tmp->buffer_offset=%x\n",
- __func__, result_tmp->buffer_offset);
-
- if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
- {
- /* This is the last write operation for this flow. Set sync
- * flag if needed
- */
- fs_config = PINT_config_get_filesystems(&server_config);
- }
-#endif
- fs_config = PINT_config_get_filesystems(&server_config);
-
- ret = job_trove_bstream_write_list(
- q_item->parent->dest.u.trove.coll_id,
- q_item->parent->dest.u.trove.handle,
- (char**)&q_item->buffer,
- (TROVE_size *)&q_item->buffer_used,
- 1,
- s_op->u.flow_write.offsets,
- s_op->u.flow_write.sizes,
- s_op->u.flow_write.segs,
- &q_item->out_size,
- fs_config->trove_sync_data,
- NULL,
- smcb,
- 0,
- js_p,
- &result_tmp->posted_id,
- server_job_context,
- q_item->parent->hints);
-
- if(ret < 0)
- {
- gossip_err("%s: I/O error occurred\n", __func__);
- //handle_io_error(ret, q_item, flow_data);
- /* FIXME ***************/
- js_p->error_code = -PVFS_ENOMEM;
- return SM_ACTION_COMPLETE;
- }
-
- if(ret == 1)
- {
- /* immediate completion; trigger callback ourselves */
- js_p->error_code = 0;
- return SM_ACTION_COMPLETE;
- }
-
- /* FIXME: not sure if this is required */
- if(ret == 0) {
-
- return SM_ACTION_DEFERRED;
- }
+
+ /* FIXME: not sure if this is required */
+ if(ret == 0) {
+ return SM_ACTION_DEFERRED;
+ }
return SM_ACTION_COMPLETE;
}
@@ -262,16 +198,12 @@ static int check_done(struct PINT_smcb *
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
- struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
- struct result_chain_entry *result_tmp = &q_item->result_chain;
- struct result_chain_entry *old_result_tmp;
PINT_segpool_handle_t h = s_op->u.flow_write.seg_handle;
js_p->error_code = 0;
- flow_data->total_bytes_processed += q_item->buffer_used;
q_item->parent->total_transferred += q_item->buffer_used;
- gossip_debug(GOSSIP_IO_DEBUG, "%s: total_bytes_processed=%d\n", __func__,
- flow_data->total_bytes_processed);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
+ q_item->parent->total_transferred);
//if(!PINT_REQUEST_DONE(q_item->parent->file_req_state)) {
if(!segpool_done(h)) {
@@ -280,8 +212,6 @@ static int check_done(struct PINT_smcb *
}
else {
gossip_debug(GOSSIP_IO_DEBUG, "%s: write: DONE\n", __func__);
- flow_data->parent->state = FLOW_COMPLETE;
- FLOW_CLEANUP(flow_data);
}
return SM_ACTION_COMPLETE;
@@ -291,8 +221,8 @@ static int epilog(struct PINT_smcb *smcb
{
js_p->error_code = 0;
- gossip_debug(GOSSIP_IO_DEBUG, "%s: write: \n", __func__);
return SM_ACTION_COMPLETE;
+ //return(server_state_machine_complete(smcb)); // why not this?
}
/*
More information about the Pvfs2-cvs
mailing list