[Pvfs2-cvs] commit by sson in pvfs2/src/server: io.sm pvfs2-server.h read.sm write.sm rw-sm.h

CVS commit program cvs at parl.clemson.edu
Tue Apr 28 12:17:01 EDT 2009


Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv17589/src/server

Modified Files:
      Tag: as-branch
	io.sm pvfs2-server.h read.sm write.sm 
Removed Files:
      Tag: as-branch
	rw-sm.h 
Log Message:
rw-sm.h has been renamed to pipeline.h.



Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.5 -r1.73.6.6
--- io.sm	22 Apr 2009 19:58:42 -0000	1.73.6.5
+++ io.sm	28 Apr 2009 16:17:00 -0000	1.73.6.6
@@ -19,12 +19,12 @@
 #include "pint-request.h"
 #include "pvfs2-internal.h"
 #include "pint-segpool.h"
-#include "rw-sm.h"
+#include "pipeline.h"
 
 extern struct PINT_state_machine_s pvfs2_read_sm;
 extern struct PINT_state_machine_s pvfs2_write_sm;
 
-#define NUM_OF_PARALLEL_RWS 8
+#define NUM_OF_PARALLEL_SMS 8
 
 %%
 
@@ -160,11 +160,11 @@ static PINT_sm_action start_pipelining_s
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct server_configuration_s *user_opts = get_server_config_struct();
     struct filesystem_configuration_s *fs_conf;
-    struct PINT_server_op *rwsm_op;
+    struct PINT_server_op *pipeline_op;
     int i, ret; 
     PINT_segpool_handle_t seg_handle;
-        
-    s_op->u.io.hints = s_op->req->hints;
+
+    s_op->u.io.parallel_sms = 0;
     s_op->u.io.total_transferred = 0;
 
     /* we still have the file size stored in the response structure 
@@ -194,7 +194,6 @@ static PINT_sm_action start_pipelining_s
     s_op->u.io.file_req_offset = s_op->req->u.io.file_req_offset;
     s_op->u.io.mem_req = NULL;
     s_op->u.io.aggregate_size = s_op->req->u.io.aggregate_size;
-    s_op->u.io.tag = s_op->tag;
     gossip_debug(GOSSIP_IO_DEBUG, "%s: tag=%d\n", __func__, s_op->tag);
     s_op->u.io.user_ptr = NULL;
     s_op->u.io.op = s_op->req->u.io.op; /* AS: operation */
@@ -226,26 +225,6 @@ static PINT_sm_action start_pipelining_s
         lld(s_op->u.io.aggregate_size),
         llu(s_op->req->u.io.handle));
 
-    /* FIXME: loop bodies are same, need to remove if_else??? */
-    if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
-    {
-        s_op->u.io.address = s_op->addr;
-        s_op->u.io.handle = s_op->req->u.io.handle;
-        s_op->u.io.coll_id = s_op->req->u.io.fs_id;
-    }
-    else if (s_op->req->u.io.io_type == PVFS_IO_READ)
-    {
-        s_op->u.io.handle = s_op->req->u.io.handle;
-        s_op->u.io.coll_id = s_op->req->u.io.fs_id;
-        s_op->u.io.address = s_op->addr;
-    }
-    else
-    {
-        gossip_lerr("Server: IO SM: unknown IO type requested.\n");
-        js_p->error_code = -PVFS_EINVAL;
-        return SM_ACTION_COMPLETE;
-    }
-
     /* setup the request processing states */
     gossip_debug(GOSSIP_IO_DEBUG, "%s: aggregate_size=%ld\n", __func__,
 		 s_op->u.io.aggregate_size);
@@ -255,9 +234,9 @@ static PINT_sm_action start_pipelining_s
     if(s_op->u.io.buffer_size < 1)
         s_op->u.io.buffer_size = BUFFER_SIZE;
     if(s_op->u.io.num_of_buffers < 1)
-        s_op->u.io.num_of_buffers = BUFFERS_PER_PIPELINING;
+        s_op->u.io.num_of_buffers = NUM_OF_PARALLEL_SMS;
 
-    PINT_segpool_unit_id id;
+    PINT_segpool_unit_id id[s_op->u.io.num_of_buffers];
 
     PINT_dist_lookup(s_op->u.io.file_data.dist);
     ret = PINT_segpool_init(s_op->u.io.mem_req,
@@ -273,24 +252,53 @@ static PINT_sm_action start_pipelining_s
 			    &seg_handle);
 
     s_op->u.io.seg_handle = seg_handle;
+    gen_mutex_init(&s_op->u.io.mutex);
     for(i=0; i<s_op->u.io.num_of_buffers; i++) {
-	PINT_segpool_register(seg_handle, &id);
-	rwsm_op = malloc(sizeof(*rwsm_op));
-	memset(rwsm_op, 0, sizeof(*rwsm_op));
-	rwsm_op->u.rwsm.seg_handle = seg_handle;
-	rwsm_op->u.rwsm.id = id;
-	rwsm_op->u.rwsm.parent = s_op;
+	PINT_segpool_register(seg_handle, &id[i]);
+	pipeline_op = malloc(sizeof(*pipeline_op));
+	memset(pipeline_op, 0, sizeof(*pipeline_op));
+
+	pipeline_op->u.pipeline.address = s_op->addr;
+	pipeline_op->u.pipeline.handle = s_op->req->u.io.handle;
+        pipeline_op->u.pipeline.coll_id = s_op->req->u.io.fs_id;
+
+	pipeline_op->u.pipeline.seg_handle = seg_handle;
+	pipeline_op->u.pipeline.id = id[i];
+	pipeline_op->u.pipeline.parent = s_op;
+	pipeline_op->u.pipeline.hints = s_op->req->hints;
+	pipeline_op->u.pipeline.tag = s_op->tag;
+	pipeline_op->u.pipeline.buffer_size = BUFFER_SIZE;
 	if(s_op->req->u.io.io_type == PVFS_IO_READ) {
-	    ret = PINT_sm_push_frame(smcb, DO_READ, rwsm_op);
-	    s_op->u.io.parallel_read_sms++;
-	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_read_sms=%d\n", 
-			 __func__, s_op->u.io.parallel_read_sms);
+	    if(!pipeline_op->u.pipeline.buffer) {
+		/* if the buffer has not been used, allocate a buffer */
+		pipeline_op->u.pipeline.buffer = 
+		    BMI_memalloc(pipeline_op->u.pipeline.address,
+				 pipeline_op->u.pipeline.buffer_size, 
+				 BMI_SEND);
+		/* TODO: error handling */
+		assert(pipeline_op->u.pipeline.buffer);
+	    }
+
+	    ret = PINT_sm_push_frame(smcb, DO_READ, pipeline_op);
+	    s_op->u.io.parallel_sms++;
+	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_sms=%d\n", 
+			 __func__, s_op->u.io.parallel_sms);
 	}
 	else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
-	    ret = PINT_sm_push_frame(smcb, DO_WRITE, rwsm_op);
-	    s_op->u.io.parallel_write_sms++;
-	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_write_sms=%d\n", 
-			 __func__, s_op->u.io.parallel_write_sms);
+	    if(!pipeline_op->u.pipeline.buffer){
+		/* if the buffer has not been used, allocate a buffer */
+		pipeline_op->u.pipeline.buffer = 
+		    BMI_memalloc(pipeline_op->u.pipeline.address,
+				 pipeline_op->u.pipeline.buffer_size, 
+				 BMI_RECV);
+		/* TODO: error handling */
+		assert(pipeline_op->u.pipeline.buffer);
+	    }
+
+	    ret = PINT_sm_push_frame(smcb, DO_WRITE, pipeline_op);
+	    s_op->u.io.parallel_sms++;
+	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_sms=%d\n", 
+			 __func__, s_op->u.io.parallel_sms);
 	}
 	if(ret < 0) {
 	    js_p->error_code = -PVFS_ENOMEM;
@@ -353,6 +361,11 @@ static PINT_sm_action io_cleanup(
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     char status_string[64] = {0};
+    int i;
+    struct PINT_server_op *pipeline_op;
+    int task_id;
+    int remaining;
+    PVFS_error tmp_err;
 
     PVFS_strerror_r(s_op->resp.status, status_string, 64);
     PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "finish (%s)\n", status_string);
@@ -362,6 +375,28 @@ static PINT_sm_action io_cleanup(
 	PINT_segpool_destroy(s_op->u.io.seg_handle);
     }
 
+    for(i=0; i<s_op->u.io.parallel_sms; i++)
+    {
+	pipeline_op = PINT_sm_pop_frame(smcb, &task_id, &tmp_err, 
+            &remaining);
+        gossip_debug(GOSSIP_SERVER_DEBUG, 
+		     "io: nested sm returned error code: %d\n", tmp_err);
+	if(pipeline_op->u.pipeline.buffer) {
+	    if(s_op->req->u.io.io_type == PVFS_IO_READ) {
+		BMI_memfree(pipeline_op->u.pipeline.address, 
+			    pipeline_op->u.pipeline.buffer, 
+			    s_op->u.io.buffer_size, BMI_SEND);
+	    }
+	    else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
+		BMI_memfree(pipeline_op->u.pipeline.address, 
+			    pipeline_op->u.pipeline.buffer, 
+			    s_op->u.io.buffer_size, BMI_RECV);
+	    }
+	}
+	free(pipeline_op);
+    }
+
+    gen_mutex_destroy(&s_op->u.io.mutex);
     /* let go of our encoded response buffer, if we appear to have
      * made one
      */
@@ -398,7 +433,9 @@ static PINT_sm_action io_send_completion
     int err = -PVFS_EIO;
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
-        
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: s_op->u.io.parallel_sms=%d\n",
+		 __func__, s_op->u.io.parallel_sms);
     /* we only send this trailing ack if we are working on a write
      * operation; otherwise just cut out early
      */
@@ -406,11 +443,11 @@ static PINT_sm_action io_send_completion
     {
 	/* normal READ (i.e., without op and datatype) */
 	if(s_op->req->u.io.op == 0) { /* AS */
-            gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack()?\n");
+            gossip_debug(GOSSIP_IO_DEBUG, "%s: normal READ\n", __func__);
             js_p->error_code = 0;
             return SM_ACTION_COMPLETE;
         }
-        gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), IO_READ with op\n");
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: IO_READ with op\n", __func__);
     }
 
     /* release encoding of the first ack that we sent */

Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.3 -r1.155.6.4
--- pvfs2-server.h	20 Apr 2009 21:10:50 -0000	1.155.6.3
+++ pvfs2-server.h	28 Apr 2009 16:17:00 -0000	1.155.6.4
@@ -333,15 +333,12 @@ struct PINT_server_getconfig_op
 
 struct PINT_server_io_op
 {
-    //void *parent;
-    PVFS_fs_id coll_id;
-    PVFS_handle handle;
-    PVFS_BMI_addr_t address;
-
+    gen_mutex_t mutex;
+    PINT_segpool_handle_t seg_handle;
     PINT_Request *file_req;
     PVFS_offset file_req_offset;
     PINT_Request *mem_req;
-    PVFS_msg_tag_t tag;
+    
     void *user_ptr;
 
     int op;
@@ -358,26 +355,28 @@ struct PINT_server_io_op
     
     PVFS_size total_transferred;
 
-    int parallel_write_sms;
-    int parallel_read_sms;
-    PINT_segpool_handle_t seg_handle;
-    
-    PVFS_hint hints;
+    int parallel_sms;
 };
 
 /* substibute for flow */
-struct PINT_server_rwsm_op
+struct PINT_server_pipeline_op
 {
+    PVFS_fs_id coll_id;
+    PVFS_handle handle;
+    PVFS_BMI_addr_t address;
+
     void *parent;
-    void *buffer; 
+    char *buffer; 
+    PVFS_size buffer_size;
     PVFS_size buffer_used; 
     PVFS_size out_size;
     PINT_segpool_handle_t seg_handle;
     PINT_segpool_unit_id id;
     PVFS_offset *offsets;
-    PINT_Request_state *file_req_state;
     PVFS_size *sizes;
     int segs;
+    PVFS_hint hints;
+    PVFS_msg_tag_t tag;
 };
  
 struct PINT_server_small_io_op
@@ -515,7 +514,7 @@ typedef struct PINT_server_op
 	struct PINT_server_rmdirent_op rmdirent;
 	struct PINT_server_io_op io;
         struct PINT_server_small_io_op small_io;
-	struct PINT_server_rwsm_op rwsm;
+	struct PINT_server_pipeline_op pipeline;
 	struct PINT_server_flush_op flush;
 	struct PINT_server_truncate_op truncate;
 	struct PINT_server_mkdir_op mkdir;

Index: read.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/read.sm,v
diff -p -u -r1.1.2.5 -r1.1.2.6
--- read.sm	22 Apr 2009 19:58:42 -0000	1.1.2.5
+++ read.sm	28 Apr 2009 16:17:00 -0000	1.1.2.6
@@ -18,7 +18,7 @@
 #include "pint-distribution.h"
 #include "pint-request.h"
 #include "pvfs2-internal.h"
-#include "rw-sm.h"
+#include "pipeline.h"
 #include "trove.h"
 
 static void do_comp(struct PINT_server_op *);
@@ -58,11 +58,11 @@ nested machine pvfs2_read_sm
 /*
  * PINT_process_request() -> job_trove_bstream_read_list()
  */
-static int trove_read_call(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action trove_read_call(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    PINT_segpool_handle_t seg_handle = s_op->u.rwsm.seg_handle;
-    PINT_segpool_unit_id id = s_op->u.rwsm.id;
+    PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.pipeline.id;
     int ret;
     struct filesystem_configuration_s *fs_config;
     struct server_configuration_s *server_config;
@@ -71,30 +71,15 @@ static int trove_read_call(struct PINT_s
     PVFS_size *sizes;
     PVFS_size bytes;
     job_id_t tmp_id;
-    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
     
-    s_op->u.rwsm.buffer_used = 0;
+    s_op->u.pipeline.buffer_used = 0;
+    s_op->u.pipeline.segs = 0;
+    bytes = s_op->u.pipeline.buffer_size;
 
-    if(!s_op->u.rwsm.buffer) {
-	gossip_debug(GOSSIP_IO_DEBUG, "%s: !buffer\n", __func__);
-        /* if the buffer has not been used, allocate a buffer */
-        s_op->u.rwsm.buffer = BMI_memalloc(
-	    parent_s_op->u.io.address,
-            parent_s_op->u.io.buffer_size, BMI_SEND);
-        /* TODO: error handling */
-        assert(s_op->u.rwsm.buffer);
-    }
-    
-    bytes = parent_s_op->u.io.buffer_size;
     PINT_segpool_take_segments(seg_handle, id, &bytes, &count, 
 			       &offsets, &sizes);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%ld, count=%d\n", __func__,
 		 bytes, count);
-    
-    s_op->u.rwsm.buffer_used = bytes;
-    s_op->u.rwsm.offsets = offsets;
-    s_op->u.rwsm.sizes = sizes;
-    s_op->u.rwsm.segs = count;
 
     if(count == 0) {
 	js_p->error_code = 0;
@@ -102,29 +87,50 @@ static int trove_read_call(struct PINT_s
 	return SM_ACTION_COMPLETE;
     }
 
-    fs_config = PINT_config_get_filesystems(&server_config);
+    s_op->u.pipeline.buffer_used = bytes;
+    s_op->u.pipeline.offsets = offsets;
+    s_op->u.pipeline.sizes = sizes;
+    s_op->u.pipeline.segs = count;
+
+    /* figure out if the fs config has trove data sync turned on or off
+     */
+    server_config = get_server_config_struct();
+    if(!server_config)
+    {
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+    
+    fs_config = PINT_config_find_fs_id(
+        server_config, s_op->u.pipeline.coll_id);
+    if(!fs_config)
+    {
+        gossip_err("%s: Failed to get filesystem "
+                   "config from fs_id of: %d\n", __func__,
+                   s_op->u.pipeline.coll_id);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
 
     ret = job_trove_bstream_read_list(
-				      parent_s_op->u.io.coll_id,
-				      parent_s_op->u.io.handle,
-				      (char **)&s_op->u.rwsm.buffer,
-				      (PVFS_size *)&s_op->u.rwsm.buffer_used,
+				      s_op->u.pipeline.coll_id,
+				      s_op->u.pipeline.handle,
+				      (char **)&s_op->u.pipeline.buffer,
+				      (PVFS_size *)&s_op->u.pipeline.buffer_used,
 				      1,
 				      offsets,
 				      sizes,
 				      count,
-				      &s_op->u.rwsm.out_size,
-				      fs_config->trove_sync_data,
+				      &s_op->u.pipeline.out_size,
+				      (fs_config->trove_sync_data ? TROVE_SYNC : 0),
 				      NULL,
 				      smcb,
 				      0,
 				      js_p,
 				      &tmp_id,
 				      server_job_context,
-				      parent_s_op->u.io.hints);
+				      s_op->u.pipeline.hints);
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: ret=%d\n", __func__, ret);
-    
     if(ret < 0) {
 	gossip_err("%s: I/O error occurred\n", __func__);
 	/* FIXME */
@@ -147,30 +153,32 @@ static int trove_read_call(struct PINT_s
     return SM_ACTION_COMPLETE;
 }
 
-static int bmi_send_call(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action bmi_send_call(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     int ret;
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
-    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
 
-    if(s_op->u.rwsm.segs == 0 && s_op->u.rwsm.buffer_used == 0) {
+    if(s_op->u.pipeline.segs == 0) {
 	js_p->error_code = 0;
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
 	return SM_ACTION_COMPLETE;
     }
 
     gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__,
-		 s_op->u.rwsm.buffer_used);
+		 s_op->u.pipeline.buffer_used);
+#if 0
     gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
-		 (char *)s_op->u.rwsm.buffer);
+		 (char *)s_op->u.pipeline.buffer);
+#endif
     /***************************************************/
     if(parent_s_op->u.io.op != 0)
 	do_comp(s_op); 
     /***************************************************/
 
-    assert(s_op->u.rwsm.buffer_used);
+    assert(s_op->u.pipeline.buffer_used);
     if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */
 	ret = 1; /* AS: skip sending if op is specified */ 
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
@@ -179,10 +187,12 @@ static int bmi_send_call(struct PINT_smc
     } 
     else 
 	/* FIXME: what should be a proper value for status_user_tag? */
-	ret = job_bmi_send(parent_s_op->u.io.address,
-			   s_op->u.rwsm.buffer,
-			   s_op->u.rwsm.buffer_used,
-			   parent_s_op->u.io.tag,
+	/* FIXME: 3rd parameter, if s_op->u.pipeline.out_size is used,
+	   client might complain about size mismatch */
+	ret = job_bmi_send(s_op->u.pipeline.address,
+			   s_op->u.pipeline.buffer,
+			   js_p->actual_size, 
+			   s_op->u.pipeline.tag,
 			   BMI_PRE_ALLOC,
 			   0, /* send_unexpected */
 			   smcb, /* user_ptr */
@@ -191,7 +201,7 @@ static int bmi_send_call(struct PINT_smc
 			   &tmp_id,
 			   server_job_context,
 			   user_opts->server_job_bmi_timeout,
-			   (bmi_hint)parent_s_op->u.io.hints);
+			   (bmi_hint)s_op->u.pipeline.hints);
 
     if(ret < 0) {
 	gossip_err("%s: I/O error occurred\n", __func__);
@@ -215,24 +225,29 @@ static int bmi_send_call(struct PINT_smc
     return SM_ACTION_COMPLETE;
 }
 
-static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    PINT_segpool_handle_t h = s_op->u.rwsm.seg_handle;
+    PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
     js_p->error_code = 0;
-    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
-    
-    parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+
+    gen_mutex_lock(&parent_s_op->u.io.mutex);
+    parent_s_op->u.io.total_transferred += js_p->actual_size;
+    gen_mutex_unlock(&parent_s_op->u.io.mutex);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
 		 parent_s_op->u.io.total_transferred);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
-		 s_op->u.rwsm.segs);
+		 s_op->u.pipeline.segs);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%ld\n", __func__,
+		 js_p->actual_size);
 
+    
     /* FIXME: */
     /* unless the second condition is set, the server starts
        a new read request to one already done, and falls into
        infinite trove_read->bmi_send->check_done loop */
-    if(!segpool_done(h) && s_op->u.rwsm.segs != 0) {
+    if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: read: LOOP\n", __func__);
 	js_p->error_code = LOOP;
     }
@@ -243,39 +258,31 @@ static int check_done(struct PINT_smcb *
     return SM_ACTION_COMPLETE;
 }
 
-static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
 {
-    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     js_p->error_code = 0;
-    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
-
-    if(s_op->u.rwsm.buffer) {
-	BMI_memfree(parent_s_op->u.io.address, s_op->u.rwsm.buffer, 
-		    parent_s_op->u.io.buffer_size, BMI_SEND);
-    }
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: read: \n", __func__);
     return SM_ACTION_COMPLETE;
 }
 
 static void do_comp(struct PINT_server_op *s_op)
 {
-    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
 
-    if(s_op->u.rwsm.buffer) {
+    if(s_op->u.pipeline.buffer) {
 	PVFS_size i;
 	gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
 		     "%s: buffer_used=%ld, op=0x%x, datatype=0x%x\n", 
-		     __func__, s_op->u.rwsm.buffer_used, 
+		     __func__, s_op->u.pipeline.buffer_used, 
 		     parent_s_op->u.io.op, 
 		     parent_s_op->u.io.datatype); /* AS */
 
 	switch(parent_s_op->u.io.datatype) {
 	case ((int)0x4c000405): /* MPI_INT */
 	    {
-		int *a = s_op->u.rwsm.buffer;
+		int *a = s_op->u.pipeline.buffer;
 		int result;
-		PVFS_size count = (s_op->u.rwsm.buffer_used)/((*PVFS_INT).ub);
+		PVFS_size count = (s_op->u.pipeline.buffer_used)/((*PVFS_INT).ub);
 		int *tmp;
 		
 		if (parent_s_op->u.io.total_transferred == 0) {
@@ -329,16 +336,16 @@ static void do_comp(struct PINT_server_o
 		default:
 		    break;
 		}
-		s_op->u.rwsm.buffer = (void *)a;
+		s_op->u.pipeline.buffer = (void *)a;
 		parent_s_op->u.io.tmp_buffer = (void *)tmp;
 	    }
 	    break;
 
 	case ((int)0x4c00080b): /* MPI_DOUBLE */
 	    {
-		double *a = s_op->u.rwsm.buffer;
+		double *a = s_op->u.pipeline.buffer;
 		double result;
-		PVFS_size count = (s_op->u.rwsm.buffer_used)/((*PVFS_DOUBLE).ub);
+		PVFS_size count = (s_op->u.pipeline.buffer_used)/((*PVFS_DOUBLE).ub);
 		double *tmp;
 
 		if (parent_s_op->u.io.total_transferred == 0) {
@@ -396,7 +403,7 @@ static void do_comp(struct PINT_server_o
 		default:
 		    break;
 		} /* end inner switch */
-		s_op->u.rwsm.buffer = (void *)a;
+		s_op->u.pipeline.buffer = (void *)a;
 		parent_s_op->u.io.tmp_buffer = (void *)tmp;
 	    }
 	    

Index: write.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/write.sm,v
diff -p -u -r1.1.2.4 -r1.1.2.5
--- write.sm	20 Apr 2009 21:10:50 -0000	1.1.2.4
+++ write.sm	28 Apr 2009 16:17:00 -0000	1.1.2.5
@@ -18,7 +18,7 @@
 #include "pint-distribution.h"
 #include "pint-request.h"
 #include "pvfs2-internal.h"
-#include "rw-sm.h"
+#include "pipeline.h"
 #include "trove.h"
 #include "pint-segpool.h"
 
@@ -60,8 +60,8 @@ nested machine pvfs2_write_sm
 static PINT_sm_action bmi_recv_call(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    PINT_segpool_handle_t seg_handle = s_op->u.rwsm.seg_handle;
-    PINT_segpool_unit_id id = s_op->u.rwsm.id;
+    PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.pipeline.id;
     int ret;
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
@@ -69,47 +69,37 @@ static PINT_sm_action bmi_recv_call(stru
     PVFS_offset *offsets;
     PVFS_size *sizes;
     PVFS_size bytes;
-    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
 
-    s_op->u.rwsm.buffer_used = 0;
-    
-    if(!s_op->u.rwsm.buffer){
-        /* if the buffer has not been used, allocate a buffer */
-        s_op->u.rwsm.buffer = BMI_memalloc(
-            parent_s_op->u.io.address,
-            parent_s_op->u.io.buffer_size, BMI_RECV);
-        /* TODO: error handling */
-        assert(s_op->u.rwsm.buffer);
-    }
+    s_op->u.pipeline.buffer_used = 0;
+    bytes = s_op->u.pipeline.buffer_size;
 
-    bytes = parent_s_op->u.io.buffer_size;
     PINT_segpool_take_segments(seg_handle, id, &bytes, &count, 
 			       &offsets, &sizes);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%ld\n", 
 		 __func__, count, bytes);
-    s_op->u.rwsm.buffer_used = bytes;
-    s_op->u.rwsm.offsets = offsets;
-    s_op->u.rwsm.sizes = sizes;
-    s_op->u.rwsm.segs = count;
 
     if(count == 0) {
 	js_p->error_code = 0;
 	return SM_ACTION_COMPLETE;
     }
-    
-    /* TODO: what if we recv less than expected? */
-    ret = job_bmi_recv(parent_s_op->u.io.address,
-		       (char *)s_op->u.rwsm.buffer,
-		       parent_s_op->u.io.buffer_size,
-		       parent_s_op->u.io.tag,
+
+    s_op->u.pipeline.buffer_used = bytes;
+    s_op->u.pipeline.offsets = offsets;
+    s_op->u.pipeline.sizes = sizes;
+    s_op->u.pipeline.segs = count;
+
+    ret = job_bmi_recv(s_op->u.pipeline.address,
+		       (void *)s_op->u.pipeline.buffer,
+		       s_op->u.pipeline.buffer_size,
+		       s_op->u.pipeline.tag,
 		       BMI_PRE_ALLOC, 
 		       smcb, 
-		       0, /* unsigned long status_user_tag = 0 */
+		       1, /* unsigned long status_user_tag = 0 */
 		       js_p,
 		       &tmp_id,
 		       server_job_context,
 		       user_opts->server_job_flow_timeout,
-		       (bmi_hint)parent_s_op->u.io.hints);
+		       (bmi_hint)s_op->u.pipeline.hints);
     
     if(ret < 0) {
 	gossip_err("%s: I/O error occurred\n", __func__);
@@ -118,7 +108,7 @@ static PINT_sm_action bmi_recv_call(stru
 	js_p->error_code = -PVFS_EIO;
 	return SM_ACTION_COMPLETE;
     }
-
+    /* immediate return */
     if(ret == 1) {
 	js_p->error_code = 0;
 	return SM_ACTION_COMPLETE;
@@ -128,7 +118,6 @@ static PINT_sm_action bmi_recv_call(stru
 	return SM_ACTION_DEFERRED;
     }
 
-    js_p->error_code = 0;
     return SM_ACTION_COMPLETE;
 }
 
@@ -139,39 +128,62 @@ static PINT_sm_action trove_write_call(s
     job_id_t tmp_id;
     struct filesystem_configuration_s *fs_config;
     struct server_configuration_s *server_config;
-    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
 
     gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__, 
-		 s_op->u.rwsm.buffer_used);
+		 s_op->u.pipeline.buffer_used);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%ld\n", __func__,
+		 js_p->actual_size);
+#if 0
     gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
-		 (char *)s_op->u.rwsm.buffer);
+		 (char *)s_op->u.pipeline.buffer);
+#endif
 
-    if(s_op->u.rwsm.segs == 0 && s_op->u.rwsm.buffer_used == 0) {
+    if(s_op->u.pipeline.buffer_used == 0) {
 	js_p->error_code = 0;
 	return SM_ACTION_COMPLETE;
     }
+
+    /* figure out if the fs config has trove data sync turned on or off
+     */
+    server_config = get_server_config_struct();
+    if(!server_config)
+    {
+        gossip_err("write_io: server config is NULL!\n");
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
     
-    fs_config = PINT_config_get_filesystems(&server_config);
+    fs_config = PINT_config_find_fs_id(
+        server_config, s_op->u.pipeline.coll_id);
+    if(!fs_config)
+    {
+        gossip_err("write_io: Failed to get filesystem "
+                   "config from fs_id of: %d\n",
+                   s_op->u.pipeline.coll_id);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
     
     ret = job_trove_bstream_write_list(
-				       parent_s_op->u.io.coll_id,
-				       parent_s_op->u.io.handle,
-				       (char**)&s_op->u.rwsm.buffer,
-				       (TROVE_size *)&s_op->u.rwsm.buffer_used,
+				       s_op->u.pipeline.coll_id,
+				       s_op->u.pipeline.handle,
+				       (char **)&s_op->u.pipeline.buffer,
+				       (TROVE_size *)&js_p->actual_size,
 				       1,
-				       s_op->u.rwsm.offsets,
-				       s_op->u.rwsm.sizes,
-				       s_op->u.rwsm.segs,
-				       &s_op->u.rwsm.out_size,
-				       fs_config->trove_sync_data,
+				       s_op->u.pipeline.offsets,
+				       s_op->u.pipeline.sizes,
+				       s_op->u.pipeline.segs,
+				       &s_op->u.pipeline.out_size,
+				       (fs_config->trove_sync_data ? TROVE_SYNC : 0),
 				       NULL,
 				       smcb,
 				       0,
 				       js_p,
 				       &tmp_id,
 				       server_job_context,
-				       parent_s_op->u.io.hints);
+				       s_op->u.pipeline.hints);
 
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: ret=%d\n", __func__, ret);
     if(ret < 0) {
 	gossip_err("%s: I/O error occurred\n", __func__);
 	//handle_io_error(ret, q_item, flow_data);
@@ -181,29 +193,33 @@ static PINT_sm_action trove_write_call(s
     }
 
     if(ret == 1) {
-	/* immediate completion; trigger callback ourselves */
+	/* immediate completion */
 	js_p->error_code = 0;
 	return SM_ACTION_COMPLETE;
     }
 
-    /* FIXME: not sure if this is required */
     if(ret == 0) {
+	js_p->error_code = 0;
 	return SM_ACTION_DEFERRED;
     }
 
     return SM_ACTION_COMPLETE;
 }
 
-static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    PINT_segpool_handle_t h = s_op->u.rwsm.seg_handle;
+    PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
     js_p->error_code = 0;
-    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
 
-    parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;
+    parent_s_op->u.io.total_transferred += s_op->u.pipeline.out_size;
     gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
 		 parent_s_op->u.io.total_transferred);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__,
+		 s_op->u.pipeline.buffer_used);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: out_size=%ld\n", __func__,
+		 s_op->u.pipeline.out_size);
 
     if(!segpool_done(h)) {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: write: LOOP\n", __func__);
@@ -212,16 +228,15 @@ static int check_done(struct PINT_smcb *
     else {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: write: DONE\n", __func__);
     }
-    
+
     return SM_ACTION_COMPLETE;
 }
 
-static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     js_p->error_code = 0;
 
     return SM_ACTION_COMPLETE;
-    //return(server_state_machine_complete(smcb)); // why not this?
 }
 
 /*




More information about the Pvfs2-cvs mailing list