[Pvfs2-cvs] commit by sson in pvfs2/src/server: io.sm pvfs2-server.h read.sm rw-sm.h write.sm

CVS commit program cvs at parl.clemson.edu
Sat Apr 18 17:59:24 EDT 2009


Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv15808/src/server

Modified Files:
      Tag: as-branch
	io.sm pvfs2-server.h read.sm rw-sm.h write.sm 
Log Message:
Implemented the initial version of the parallel read/write state machines (sms).



Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.2 -r1.73.6.3
--- io.sm	14 Apr 2009 22:18:53 -0000	1.73.6.2
+++ io.sm	18 Apr 2009 21:59:24 -0000	1.73.6.3
@@ -24,6 +24,8 @@
 extern struct PINT_state_machine_s pvfs2_read_sm;
 extern struct PINT_state_machine_s pvfs2_write_sm;
 
+#define NUM_OF_PARALLEL_RWS 8
+
 %%
 
 machine pvfs2_io_sm
@@ -156,7 +158,6 @@ static PINT_sm_action start_pipeline_sm(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    //int err = -PVFS_EIO;
     struct server_configuration_s *user_opts = get_server_config_struct();
     struct filesystem_configuration_s *fs_conf;
     struct PINT_server_op *flow_read_op;
@@ -270,48 +271,6 @@ static PINT_sm_action start_pipeline_sm(
     flow_data->parent = s_op->u.io.flow_d;
 
     /* setup the request processing states */
-#if 0 //////////////////
-    s_op->u.io.flow_d->file_req_state = 
-	PINT_new_request_state(s_op->u.io.flow_d->file_req);
-    if (!s_op->u.io.flow_d->file_req_state)
-    {
-	js_p->error_code = -PVFS_ENOMEM;
-	return SM_ACTION_COMPLETE;
-    }
-
-    /* only setup a memory datatype state if caller provided a memory datatype */
-    if(s_op->u.io.flow_d->mem_req)
-    {
-	s_op->u.io.flow_d->mem_req_state = 
-	    PINT_new_request_state(s_op->u.io.flow_d->mem_req);
-	if (!s_op->u.io.flow_d->mem_req_state)
-	{
-	    js_p->error_code = -PVFS_ENOMEM;
-	    return SM_ACTION_COMPLETE;
-	}
-    }
-
-    /* if a file datatype offset was specified, go ahead and skip ahead 
-     * before doing anything else
-     */
-    if(s_op->u.io.flow_d->file_req_offset)
-        PINT_REQUEST_STATE_SET_TARGET(s_op->u.io.flow_d->file_req_state,
-            s_op->u.io.flow_d->file_req_offset);
-
-    
-    /* set boundaries on file datatype */
-    if(s_op->u.io.flow_d->aggregate_size > -1)
-    {
-        PINT_REQUEST_STATE_SET_FINAL(s_op->u.io.flow_d->file_req_state,
-            s_op->u.io.flow_d->aggregate_size+s_op->u.io.flow_d->file_req_offset);
-    }
-    else
-    {
-        PINT_REQUEST_STATE_SET_FINAL(s_op->u.io.flow_d->file_req_state,
-            s_op->u.io.flow_d->file_req_offset +
-            PINT_REQUEST_TOTAL_BYTES(s_op->u.io.flow_d->mem_req));
-    }
-#endif //////////////////////
     gossip_debug(GOSSIP_IO_DEBUG, "%s: aggregate_size=%ld\n", __func__,
 		 s_op->u.io.flow_d->aggregate_size);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: file_data.fsize=%ld\n", __func__,
@@ -354,16 +313,13 @@ static PINT_sm_action start_pipeline_sm(
 				PINT_SP_SERVER_READ,
 				&seg_handle);
 
-	gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_init() called: ret=%d, seg_handle=%d\n", __func__, ret, seg_handle);
-	//for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
-	for(i=0; i<1; i++) {
+	s_op->u.io.seg_handle = seg_handle;
+	for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
 	    PINT_segpool_register(seg_handle, &id);
-	    gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_register() called: id=%d\n", __func__, id);
 	    flow_data->prealloc_array[i].result_chain.q_item =
 		&flow_data->prealloc_array[i];
 	    flow_read_op = malloc(sizeof(*flow_read_op));
 	    memset(flow_read_op, 0, sizeof(*flow_read_op));
-	    memcpy(flow_read_op, s_op, sizeof(*flow_read_op));
 	    flow_read_op->u.flow_read.q_item = &flow_data->prealloc_array[i];
 	    flow_read_op->u.flow_read.seg_handle = seg_handle;
 	    flow_read_op->u.flow_read.id = id;
@@ -372,7 +328,9 @@ static PINT_sm_action start_pipeline_sm(
 		js_p->error_code = -PVFS_ENOMEM;
 		return SM_ACTION_COMPLETE;
 	    }
-	    gossip_debug(GOSSIP_IO_DEBUG, "flow_read: ret=%d\n", ret);
+	    s_op->u.io.parallel_read_sms++;
+	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_read_sms=%d\n", 
+			 __func__, s_op->u.io.parallel_read_sms);
 	}
     }
     else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
@@ -393,19 +351,15 @@ static PINT_sm_action start_pipeline_sm(
 				 PINT_SP_SERVER_WRITE,
 				 &seg_handle);
 
-	gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_init() called: ret=%d, seg_handle=%d\n", __func__, ret, seg_handle);
-	/* only post one outstanding recv at a time; easier to manage */
+	s_op->u.io.seg_handle = seg_handle;
 	/* FIXME: we will eventually want to post multiple of recvs */
-	//for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
-	for(i=0; i<1; i++) {
+	for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
 	    PINT_segpool_register(seg_handle, &id);
-	    gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_register() called: id=%d\n", __func__, id);
 	    flow_data->prealloc_array[i].result_chain.q_item = 
 		&flow_data->prealloc_array[i];
 	
 	    flow_write_op = malloc(sizeof(*flow_write_op));
 	    memset(flow_write_op, 0, sizeof(*flow_write_op));
-	    memcpy(flow_write_op, s_op, sizeof(*flow_write_op));
 	    flow_write_op->u.flow_write.q_item = &flow_data->prealloc_array[i];
 	    flow_write_op->u.flow_write.seg_handle = seg_handle;
 	    flow_write_op->u.flow_write.id = id;
@@ -414,6 +368,9 @@ static PINT_sm_action start_pipeline_sm(
 		js_p->error_code = -PVFS_ENOMEM;
 		return SM_ACTION_COMPLETE;
 	    }
+	    s_op->u.io.parallel_write_sms++;
+	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_write_sms=%d\n", 
+			 __func__, s_op->u.io.parallel_write_sms);
 	}
     }
 
@@ -479,6 +436,11 @@ static PINT_sm_action io_cleanup(
     if (s_op->u.io.flow_d)
     {
         PINT_flow_free(s_op->u.io.flow_d);
+    }
+
+    if (s_op->u.io.seg_handle) {
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: freeing seg_handle\n", __func__);
+	PINT_segpool_destroy(s_op->u.io.seg_handle);
     }
 
     /* let go of our encoded response buffer, if we appear to have

Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.1 -r1.155.6.2
--- pvfs2-server.h	14 Apr 2009 20:19:50 -0000	1.155.6.1
+++ pvfs2-server.h	18 Apr 2009 21:59:24 -0000	1.155.6.2
@@ -334,6 +334,10 @@ struct PINT_server_getconfig_op
 struct PINT_server_io_op
 {
     flow_descriptor* flow_d;
+    //io_pipeline* iop; /* substitute for flow_d */
+    int parallel_write_sms;
+    int parallel_read_sms;
+    PINT_segpool_handle_t seg_handle;
 };
 
 /* This is for flow state machine */
@@ -341,6 +345,8 @@ struct PINT_server_io_op
 struct PINT_server_flow_read_op
 {
     struct fp_queue_item *q_item;
+    void *buffer; /* substitute for q_item */
+    PVFS_size buffer_used; /* substitute for q_item */
     PINT_segpool_handle_t seg_handle;
     PINT_segpool_unit_id id;
     PVFS_offset *offsets;
@@ -353,6 +359,8 @@ struct PINT_server_flow_read_op
 struct PINT_server_flow_write_op
 {
     struct fp_queue_item *q_item;
+    void *buffer; /* substitute for q_item */
+    PVFS_size buffer_used; /* substitute for q_item */
     PINT_segpool_handle_t seg_handle;
     PINT_segpool_unit_id id;
     PVFS_offset *offsets;

Index: read.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/read.sm,v
diff -p -u -r1.1.2.1 -r1.1.2.2
--- read.sm	14 Apr 2009 20:19:50 -0000	1.1.2.1
+++ read.sm	18 Apr 2009 21:59:24 -0000	1.1.2.2
@@ -62,48 +62,19 @@ static int trove_read_call(struct PINT_s
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
-    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
-    struct result_chain_entry *result_tmp = &q_item->result_chain;
-    struct result_chain_entry *old_result_tmp;
     PINT_segpool_handle_t seg_handle = s_op->u.flow_read.seg_handle;
     PINT_segpool_unit_id id = s_op->u.flow_read.id;
-    void *tmp_buffer;
-    PVFS_size bytes_processed = 0;
-    int err = -PVFS_EIO;
     int ret;
-    struct server_configuration_s *user_opts = get_server_config_struct();
     struct filesystem_configuration_s *fs_config;
     struct server_configuration_s *server_config;
     int count;
     PVFS_offset *offsets;
     PVFS_size *sizes;
     PVFS_size bytes;
-    int tmp_id;
-
+    job_id_t tmp_id;
+    
     q_item->buffer_used = 0;
-#if 0
-    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-		 "%s: error_code: %d, initial_call_flag: %d, flow: %p.\n", 
-		 __func__, error_code, initial_call_flag,
-        flow_data->parent);
-
-    if(error_code != 0 || flow_data->parent->error_code != 0)
-    {
-        gossip_err("%s: I/O error occurred\n", __func__);
-        handle_io_error(error_code, q_item, flow_data);
-        if(flow_data->parent->state == FLOW_COMPLETE)
-            return(1);
-        else
-            return(0);
-    }
 
-    /* if there are no more receives to post, just return */
-    if(PINT_REQUEST_DONE(flow_data->parent->file_req_state))
-    {
-	js_p->error_code = 0;
-        return SM_ACTION_COMPLETE;
-    }
-#endif
     if(!q_item->buffer) {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: !q_item->buffer\n", __func__);
         /* if the q_item has not been used, allocate a buffer */
@@ -115,8 +86,9 @@ static int trove_read_call(struct PINT_s
     }
     
     bytes = q_item->parent->buffer_size;
-    PINT_segpool_take_segments(seg_handle, id, &bytes, &count, &offsets, &sizes);
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%d, count=%d\n", __func__,
+    PINT_segpool_take_segments(seg_handle, id, &bytes, &count, 
+			       &offsets, &sizes);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%ld, count=%d\n", __func__,
 		 bytes, count);
     
     q_item->buffer_used = bytes;
@@ -126,76 +98,50 @@ static int trove_read_call(struct PINT_s
 
     if(count == 0) {
 	js_p->error_code = 0;
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
 	return SM_ACTION_COMPLETE;
     }
 
-#if 0
-    if(bytes == 0) {        
-	if(flow_data->parent->total_transferred ==
-	   flow_data->total_bytes_processed &&
-	   PINT_REQUEST_DONE(flow_data->parent->file_req_state)) {
-	    assert(q_item->parent->state != FLOW_COMPLETE);
-	    gossip_debug(GOSSIP_IO_DEBUG, "%s: FLOW_COMPLETE?\n", __func__);
-	    q_item->parent->state = FLOW_COMPLETE; /* bmi_recv_call */
-	}
-	js_p->error_code = 0;
-	return SM_ACTION_COMPLETE;
-    }
-#endif    
+    fs_config = PINT_config_get_filesystems(&server_config);
 
-    //do{
-        //assert(q_item->buffer_used);
-        //assert(result_tmp->result.bytes);
-        //result_tmp->q_item = q_item;
-        /* XXX: can someone confirm this avoids a segfault in the immediate
-         * completion case? */
-	//assert(result_tmp->result.bytes);
-
-	fs_config = PINT_config_get_filesystems(&server_config);
-
-        ret = job_trove_bstream_read_list(
-            q_item->parent->src.u.trove.coll_id,
-            q_item->parent->src.u.trove.handle,
-            //(char**)&result_tmp->buffer_offset,
-	    &q_item->buffer,
-	    &q_item->buffer_used,
-            1,
-	    offsets,
-	    sizes,
-	    count,
-            &q_item->out_size,
-	    fs_config->trove_sync_data,
-            NULL,
-	    smcb,
-	    0,
-	    js_p,
-	    &tmp_id,
-	    server_job_context,
-            q_item->parent->hints);
+    ret = job_trove_bstream_read_list(
+				      q_item->parent->src.u.trove.coll_id,
+				      q_item->parent->src.u.trove.handle,
+				      &q_item->buffer,
+				      (PVFS_size *)&q_item->buffer_used,
+				      1,
+				      offsets,
+				      sizes,
+				      count,
+				      &q_item->out_size,
+				      fs_config->trove_sync_data,
+				      NULL,
+				      smcb,
+				      0,
+				      js_p,
+				      &tmp_id,
+				      server_job_context,
+				      q_item->parent->hints);
 
-        if(ret < 0)
-        {
-            gossip_err("%s: I/O error occurred\n", __func__);
-	    /* FIXME */
-            //handle_io_error(ret, q_item, flow_data);
-            if(flow_data->parent->state == FLOW_COMPLETE)
-                return (1);
-            else
-                return (0);
-        }
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: ret=%d\n", __func__, ret);
+    
+    if(ret < 0) {
+	gossip_err("%s: I/O error occurred\n", __func__);
+	/* FIXME */
+	//handle_io_error(ret, q_item, flow_data);
+	js_p->error_code = -PVFS_EIO;
+	return SM_ACTION_COMPLETE;
+    }
 
-        if(ret == 1)
-        {
-            /* immediate completion; trigger callback ourselves */
-	    js_p->error_code = 1;
-	    return SM_ACTION_COMPLETE;
-        }
+    if(ret == 1) {
+	js_p->error_code = 0;
+	return SM_ACTION_COMPLETE;
+    }
 
-	if(ret == 0) {
-	    js_p->error_code = 0;
-	    return SM_ACTION_DEFERRED;
-	}
-	//} while(result_tmp);
+    if(ret == 0) {
+	js_p->error_code = 0;
+	return SM_ACTION_DEFERRED;
+    }
 
     return SM_ACTION_COMPLETE;
 }
@@ -205,53 +151,17 @@ static int bmi_send_call(struct PINT_smc
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
     struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
-    struct result_chain_entry *result_tmp = &q_item->result_chain;
-    struct result_chain_entry *old_result_tmp;
-    int err = -PVFS_EIO;
-    int done = 0;
     int ret;
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
-    PINT_segpool_handle_t h = s_op->u.flow_write.seg_handle;
-
-#if 0
-    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-		 "%s, error_code: %d, flow: %p.\n", __func__,
-		 error_code, flow_data->parent);
-
-    result_tmp->posted_id = 0;
-
-    if(error_code != 0 || flow_data->parent->error_code != 0)
-    {
-        gossip_err("%s: I/O error occurred\n", __func__);
-        handle_io_error(error_code, q_item, flow_data);
-        return;
-    }
-#endif
 
     if(s_op->u.flow_read.segs == 0 && q_item->buffer_used == 0) {
 	js_p->error_code = 0;
-	return SM_ACTION_COMPLETE;
-    }
-#if 0
-    if(q_item->parent->state == FLOW_COMPLETE) {
-	js_p->error_code = 0;
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
 	return SM_ACTION_COMPLETE;
     }
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: result_tmp->result.bytes=%ld\n", 
-		 __func__, result_tmp->result.bytes);
-
-    result_tmp = &q_item->result_chain;
-    do{
-        old_result_tmp = result_tmp;
-        result_tmp = result_tmp->next;
-        if(old_result_tmp != &q_item->result_chain)
-            free(old_result_tmp);
-    } while(result_tmp);
-    q_item->result_chain.next = NULL;
-#endif
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%d\n", __func__,
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%ld\n", __func__,
 		 q_item->buffer_used);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
 		 (char *)q_item->buffer);
@@ -280,7 +190,7 @@ static int bmi_send_call(struct PINT_smc
 			       js_p,
 			       &tmp_id,
 			       server_job_context,
-			       user_opts->server_job_flow_timeout,
+			       user_opts->server_job_bmi_timeout,
 			       (bmi_hint)q_item->parent->hints);
 
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
@@ -317,29 +227,25 @@ static int check_done(struct PINT_smcb *
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
-    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
-    struct result_chain_entry *result_tmp = &q_item->result_chain;
-    struct result_chain_entry *old_result_tmp;
     PINT_segpool_handle_t h = s_op->u.flow_read.seg_handle;
     js_p->error_code = 0;
     
-    flow_data->total_bytes_processed += q_item->buffer_used;
     q_item->parent->total_transferred += q_item->buffer_used;
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_bytes_processed=%d\n", __func__,
-		 flow_data->total_bytes_processed);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
+		 q_item->parent->total_transferred);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
 		 s_op->u.flow_read.segs);
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: handle=%d\n", __func__, h);
-    //if(!PINT_REQUEST_DONE(q_item->parent->file_req_state)) {
-    if(!segpool_done(h)) {
+    /* FIXME: */
+    /* unless the second condition is set, the server starts
+       a new read request to one already done, and falls into
+       infinite trove_read->bmi_send->check_done loop */
+    if(!segpool_done(h) && s_op->u.flow_read.segs != 0) {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: read: LOOP\n", __func__);
 	js_p->error_code = LOOP;
     }
     else {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: read: DONE\n", __func__);
-	flow_data->parent->state = FLOW_COMPLETE;
-	FLOW_CLEANUP(flow_data);
     }
     
     return SM_ACTION_COMPLETE;
@@ -347,10 +253,17 @@ static int check_done(struct PINT_smcb *
 
 static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
 {
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
     js_p->error_code = 0;
 
+    if(q_item->buffer) {
+	BMI_memfree(q_item->parent->src.u.bmi.address, 
+		    q_item->buffer, q_item->parent->buffer_size, BMI_SEND);
+    }
     gossip_debug(GOSSIP_IO_DEBUG, "%s: read: \n", __func__);
     return SM_ACTION_COMPLETE;
+    //return(server_state_machine_complete(smcb)); // why not this?
 }
 
 static void do_comp(struct fp_queue_item *q_item)

Index: rw-sm.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/rw-sm.h,v
diff -p -u -r1.1.2.1 -r1.1.2.2
--- rw-sm.h	14 Apr 2009 20:21:42 -0000	1.1.2.1
+++ rw-sm.h	18 Apr 2009 21:59:24 -0000	1.1.2.2
@@ -19,6 +19,7 @@ do {                                    
     free(__flow_data);                                                \
 } while(0)
 
+
 struct result_chain_entry
 {
     PVFS_id_gen_t posted_id;

Index: write.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/write.sm,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- write.sm	14 Apr 2009 22:18:53 -0000	1.1.2.2
+++ write.sm	18 Apr 2009 21:59:24 -0000	1.1.2.3
@@ -61,13 +61,8 @@ static PINT_sm_action bmi_recv_call(stru
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
-    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
-    struct result_chain_entry *result_tmp = &q_item->result_chain;
-    struct result_chain_entry *old_result_tmp;
     PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
     PINT_segpool_unit_id id = s_op->u.flow_write.id;
-    PVFS_size bytes_processed = 0;
-    void *tmp_buffer;
     int ret;
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
@@ -77,26 +72,7 @@ static PINT_sm_action bmi_recv_call(stru
     PVFS_size bytes;
 
     q_item->buffer_used = 0;
-#if 0
-    /* FIXME: not sure whether we need to check this 
-     * for now just commented out */
-    if(js_p->error_code != 0 || flow_data->parent->error_code != 0)
-    {
-        gossip_err("%s: I/O error occurred\n", __func__);
-        //handle_io_error(error_code, q_item, flow_data);
-	js_p->error_code = error_code;
-        return SM_ACTION_COMPLETE;
-    }
-#endif
-    
-#if 0
-    /* if there are no more receives to post, just return */
-    if(PINT_REQUEST_DONE(flow_data->parent->file_req_state))
-    {
-	js_p->error_code = 0;
-        return SM_ACTION_COMPLETE;
-    }
-#endif
+    
     if(!q_item->buffer){
         /* if the q_item has not been used, allocate a buffer */
         q_item->buffer = BMI_memalloc(
@@ -107,8 +83,10 @@ static PINT_sm_action bmi_recv_call(stru
     }
 
     bytes = q_item->parent->buffer_size;
-    PINT_segpool_take_segments(seg_handle, id, &bytes, &count, &offsets, &sizes);
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%d\n", __func__, count, bytes);
+    PINT_segpool_take_segments(seg_handle, id, &bytes, &count, 
+			       &offsets, &sizes);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%ld\n", 
+		 __func__, count, bytes);
     q_item->buffer_used = bytes;
     s_op->u.flow_write.offsets = offsets;
     s_op->u.flow_write.sizes = sizes;
@@ -118,25 +96,6 @@ static PINT_sm_action bmi_recv_call(stru
 	js_p->error_code = 0;
 	return SM_ACTION_COMPLETE;
     }
-#if 0
-    if(count == 0) {
-	if(flow_data->parent->total_transferred ==
-	   flow_data->total_bytes_processed &&
-	   PINT_REQUEST_DONE(flow_data->parent->file_req_state)) {
-	    gossip_debug(GOSSIP_IO_DEBUG, "%s: FLOW_COMPLETE?\n", __func__);
-	    /* never reach this point */
-	    assert(q_item->parent->state != FLOW_COMPLETE);
-	    q_item->parent->state = FLOW_COMPLETE; /* bmi_recv_call */
-	}
-	js_p->error_code = 0;
-	return SM_ACTION_COMPLETE;
-    }
-
-    gossip_debug(GOSSIP_DIRECTIO_DEBUG,
-		 "offset %llu, buffer ptr: %p\n",
-		 llu(q_item->result_chain.result.offset_array[0]),
-		 q_item->buffer);
-#endif
     
     /* TODO: what if we recv less than expected? */
     ret = job_bmi_recv(q_item->parent->src.u.bmi.address,
@@ -161,7 +120,6 @@ static PINT_sm_action bmi_recv_call(stru
     }
 
     if(ret == 1) {
-	/* immediate completion; trigger callback ourselves */
 	js_p->error_code = 0;
 	return SM_ACTION_COMPLETE;
     }
@@ -178,82 +136,60 @@ static PINT_sm_action trove_write_call(s
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
-    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
-    PVFS_size tmp_actual_size;
-    struct result_chain_entry *result_tmp = &q_item->result_chain;
-    struct result_chain_entry *old_result_tmp;
-    PVFS_size bytes_processed = 0;
-    void *tmp_buffer;
-    void *tmp_user_ptr;
     int ret;
-    struct server_configuration_s *user_opts = get_server_config_struct();
+    job_id_t tmp_id;
     struct filesystem_configuration_s *fs_config;
     struct server_configuration_s *server_config;
-    PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%d\n", __func__, 
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%ld\n", __func__, 
 		 q_item->buffer_used);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
 		 (char *)q_item->buffer);
 
-    if(s_op->u.flow_read.segs == 0 && q_item->buffer_used == 0) {
+    if(s_op->u.flow_write.segs == 0 && q_item->buffer_used == 0) {
+	js_p->error_code = 0;
+	return SM_ACTION_COMPLETE;
+    }
+    
+    fs_config = PINT_config_get_filesystems(&server_config);
+    
+    ret = job_trove_bstream_write_list(
+				       q_item->parent->dest.u.trove.coll_id,
+				       q_item->parent->dest.u.trove.handle,
+				       (char**)&q_item->buffer,
+				       (TROVE_size *)&q_item->buffer_used,
+				       1,
+				       s_op->u.flow_write.offsets,
+				       s_op->u.flow_write.sizes,
+				       s_op->u.flow_write.segs,
+				       &q_item->out_size,
+				       fs_config->trove_sync_data,
+				       NULL,
+				       smcb,
+				       0,
+				       js_p,
+				       &tmp_id,
+				       server_job_context,
+				       q_item->parent->hints);
+
+    if(ret < 0) {
+	gossip_err("%s: I/O error occurred\n", __func__);
+	//handle_io_error(ret, q_item, flow_data);
+	/* FIXME ***************/
+	js_p->error_code = -PVFS_ENOMEM;
+	return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 1) {
+	/* immediate completion; trigger callback ourselves */
 	js_p->error_code = 0;
 	return SM_ACTION_COMPLETE;
     }
-#if 0
-	gossip_debug(GOSSIP_IO_DEBUG, "%s: result_tmp->buffer_offset=%x\n", 
-			__func__, result_tmp->buffer_offset);
-
-        if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
-        {
-            /* This is the last write operation for this flow.  Set sync
-             * flag if needed
-             */ 
-	    fs_config = PINT_config_get_filesystems(&server_config);
-        }
-#endif
-	fs_config = PINT_config_get_filesystems(&server_config);
-
-        ret = job_trove_bstream_write_list(
-            q_item->parent->dest.u.trove.coll_id,
-            q_item->parent->dest.u.trove.handle,
-            (char**)&q_item->buffer,
-            (TROVE_size *)&q_item->buffer_used,
-            1,
-	    s_op->u.flow_write.offsets,
-	    s_op->u.flow_write.sizes,
-	    s_op->u.flow_write.segs,
-            &q_item->out_size,
-	    fs_config->trove_sync_data,
-            NULL,
-            smcb,
-	    0,
-	    js_p,
-            &result_tmp->posted_id,
-	    server_job_context,
-            q_item->parent->hints);
-
-        if(ret < 0)
-        {
-            gossip_err("%s: I/O error occurred\n", __func__);
-            //handle_io_error(ret, q_item, flow_data);
-	    /* FIXME ***************/
-	    js_p->error_code = -PVFS_ENOMEM;
-            return SM_ACTION_COMPLETE;
-        }
-
-        if(ret == 1)
-        {
-            /* immediate completion; trigger callback ourselves */
-	    js_p->error_code = 0;
-	    return SM_ACTION_COMPLETE;
-        }
-
-	/* FIXME: not sure if this is required */
-	if(ret == 0) {
-  
-	    return SM_ACTION_DEFERRED;
-	}
+
+    /* FIXME: not sure if this is required */
+    if(ret == 0) {
+	return SM_ACTION_DEFERRED;
+    }
 
     return SM_ACTION_COMPLETE;
 }
@@ -262,16 +198,12 @@ static int check_done(struct PINT_smcb *
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
-    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
-    struct result_chain_entry *result_tmp = &q_item->result_chain;
-    struct result_chain_entry *old_result_tmp;
     PINT_segpool_handle_t h = s_op->u.flow_write.seg_handle;
     js_p->error_code = 0;
 
-    flow_data->total_bytes_processed += q_item->buffer_used;
     q_item->parent->total_transferred += q_item->buffer_used;
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_bytes_processed=%d\n", __func__,
-		 flow_data->total_bytes_processed);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
+		 q_item->parent->total_transferred);
 
     //if(!PINT_REQUEST_DONE(q_item->parent->file_req_state)) {
     if(!segpool_done(h)) {
@@ -280,8 +212,6 @@ static int check_done(struct PINT_smcb *
     }
     else {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: write: DONE\n", __func__);
-	flow_data->parent->state = FLOW_COMPLETE;
-	FLOW_CLEANUP(flow_data);
     }
     
     return SM_ACTION_COMPLETE;
@@ -291,8 +221,8 @@ static int epilog(struct PINT_smcb *smcb
 {
     js_p->error_code = 0;
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: write: \n", __func__);
     return SM_ACTION_COMPLETE;
+    //return(server_state_machine_complete(smcb)); // why not this?
 }
 
 /*



More information about the Pvfs2-cvs mailing list