[Pvfs2-cvs] commit by sson in pvfs2/src/server: io.sm pvfs2-server.h read.sm rw-sm.h write.sm

cvs at parl.clemson.edu (CVS commit program)
Mon Apr 20 17:10:50 EDT 2009


Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv30835/src/server

Modified Files:
      Tag: as-branch
	io.sm pvfs2-server.h read.sm rw-sm.h write.sm 
Log Message:
Cleaned up the data structures related to flow_d: the fields formerly carried in the flow descriptor now live directly in struct PINT_server_io_op, and the separate flow_read/flow_write ops are merged into a single struct PINT_server_rwsm_op.




Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.3 -r1.73.6.4
--- io.sm	18 Apr 2009 21:59:24 -0000	1.73.6.3
+++ io.sm	20 Apr 2009 21:10:50 -0000	1.73.6.4
@@ -18,8 +18,8 @@
 #include "pint-distribution.h"
 #include "pint-request.h"
 #include "pvfs2-internal.h"
-#include "rw-sm.h"
 #include "pint-segpool.h"
+#include "rw-sm.h"
 
 extern struct PINT_state_machine_s pvfs2_read_sm;
 extern struct PINT_state_machine_s pvfs2_write_sm;
@@ -52,7 +52,7 @@ machine pvfs2_io_sm
 
     state start_pipelining
     {
-	pjmp start_pipeline_sm
+	pjmp start_pipelining_sm
         {
             DO_READ => pvfs2_read_sm;
 	    DO_WRITE => pvfs2_write_sm;
@@ -137,7 +137,7 @@ static int io_send_ack(
 }
 
 /*
- * Function: start_pipeline_sm()
+ * Function: start_pipelining_sm()
  *
  * Params:   server_op *s_op, 
  *           job_status_s* js_p
@@ -150,106 +150,94 @@ static int io_send_ack(
  * Returns:  int
  *
  * Synopsis: this is the most important part of the state machine.
- *           we setup the flow descriptor and post it in order to 
- *           carry out the data transfer
+ *           we set up multiple read/write state machines
+ *           to carry out the data transfer
  *           
  */
-static PINT_sm_action start_pipeline_sm(
+static PINT_sm_action start_pipelining_sm(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct server_configuration_s *user_opts = get_server_config_struct();
     struct filesystem_configuration_s *fs_conf;
-    struct PINT_server_op *flow_read_op;
-    struct PINT_server_op *flow_write_op;
+    struct PINT_server_op *rwsm_op;
     int i, ret; 
-    struct fp_private_data *flow_data = NULL;
     PINT_segpool_handle_t seg_handle;
         
-    s_op->u.io.flow_d = PINT_flow_alloc();
-    
-    if (!s_op->u.io.flow_d)
-    {
-        js_p->error_code = -PVFS_ENOMEM;
-        return SM_ACTION_COMPLETE;
-    }
-
-    s_op->u.io.flow_d->hints = s_op->req->hints;
+    s_op->u.io.hints = s_op->req->hints;
+    s_op->u.io.total_transferred = 0;
 
     /* we still have the file size stored in the response structure 
      * that we sent in the previous state, other details come from
      * request
      */
-    s_op->u.io.flow_d->file_data.fsize = s_op->resp.u.io.bstream_size;
-    s_op->u.io.flow_d->file_data.dist = s_op->req->u.io.io_dist;
-    s_op->u.io.flow_d->file_data.server_nr = s_op->req->u.io.server_nr;
-    s_op->u.io.flow_d->file_data.server_ct = s_op->req->u.io.server_ct;
+    s_op->u.io.file_data.fsize = s_op->resp.u.io.bstream_size;
+    s_op->u.io.file_data.dist = s_op->req->u.io.io_dist;
+    s_op->u.io.file_data.server_nr = s_op->req->u.io.server_nr;
+    s_op->u.io.file_data.server_ct = s_op->req->u.io.server_ct;
 
     /* on writes, we allow the bstream to be extended at EOF */
     if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
     {
         gossip_debug(GOSSIP_IO_DEBUG, "%s: issuing pipelining to "
                      "write data.\n", __func__);
-        s_op->u.io.flow_d->file_data.extend_flag = 1;
+        s_op->u.io.file_data.extend_flag = 1;
     }
     else
     {
         gossip_debug(GOSSIP_IO_DEBUG, "%s: issuing pipelining to "
                      "read data.\n", __func__);
-        s_op->u.io.flow_d->file_data.extend_flag = 0;
+        s_op->u.io.file_data.extend_flag = 0;
     }
 
-    s_op->u.io.flow_d->file_req = s_op->req->u.io.file_req;
-    s_op->u.io.flow_d->file_req_offset = s_op->req->u.io.file_req_offset;
-    s_op->u.io.flow_d->mem_req = NULL;
-    s_op->u.io.flow_d->aggregate_size = s_op->req->u.io.aggregate_size;
-    s_op->u.io.flow_d->tag = s_op->tag;
-    s_op->u.io.flow_d->user_ptr = NULL;
-    s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
-    s_op->u.io.flow_d->op = s_op->req->u.io.op; /* AS: operation */
-    s_op->u.io.flow_d->datatype = s_op->req->u.io.datatype; /* AS: dtype */
-    s_op->u.io.flow_d->dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */
+    s_op->u.io.file_req = s_op->req->u.io.file_req;
+    s_op->u.io.file_req_offset = s_op->req->u.io.file_req_offset;
+    s_op->u.io.mem_req = NULL;
+    s_op->u.io.aggregate_size = s_op->req->u.io.aggregate_size;
+    s_op->u.io.tag = s_op->tag;
+    s_op->u.io.user_ptr = NULL;
+    s_op->u.io.op = s_op->req->u.io.op; /* AS: operation */
+    s_op->u.io.datatype = s_op->req->u.io.datatype; /* AS: dtype */
+    s_op->u.io.dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */
     
     gossip_debug(GOSSIP_IO_DEBUG, "server_ct=%d\n", s_op->req->u.io.server_ct);
     for(i=0; i<s_op->req->u.io.server_ct; i++)
-	gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%lu\n", i, s_op->u.io.flow_d->dfile_array[i]);
+	gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%lu\n", i, 
+		     s_op->u.io.dfile_array[i]);
 
     fs_conf = PINT_config_find_fs_id(user_opts, s_op->req->u.io.fs_id);
     if(fs_conf)
     {
         /* pick up any buffer settings overrides from fs conf */
-        s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
-        s_op->u.io.flow_d->buffers_per_flow = fs_conf->fp_buffers_per_flow;
+        s_op->u.io.buffer_size = fs_conf->fp_buffer_size;
+        s_op->u.io.num_of_buffers = fs_conf->fp_buffers_per_flow;
     }
 
-    gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, " 
-        "server_nr: %d, server_ct: %d\n",
-        lld(s_op->u.io.flow_d->file_data.fsize),
-        (int)s_op->u.io.flow_d->file_data.server_nr,
-        (int)s_op->u.io.flow_d->file_data.server_ct);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: fsize: %lld, " 
+		 "server_nr: %d, server_ct: %d\n", __func__,
+        lld(s_op->u.io.file_data.fsize),
+        (int)s_op->u.io.file_data.server_nr,
+        (int)s_op->u.io.file_data.server_ct);
 
     gossip_debug(GOSSIP_IO_DEBUG, "      file_req_offset: %lld, "
         "aggregate_size: %lld, handle: %llu\n", 
-        lld(s_op->u.io.flow_d->file_req_offset),
-        lld(s_op->u.io.flow_d->aggregate_size),
+        lld(s_op->u.io.file_req_offset),
+        lld(s_op->u.io.aggregate_size),
         llu(s_op->req->u.io.handle));
 
+    /* FIXME: both branches set the same fields; fold the if/else? */
     /* set endpoints depending on type of io requested */
     if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
     {
-        s_op->u.io.flow_d->src.endpoint_id = BMI_ENDPOINT;
-        s_op->u.io.flow_d->src.u.bmi.address = s_op->addr;
-        s_op->u.io.flow_d->dest.endpoint_id = TROVE_ENDPOINT;
-        s_op->u.io.flow_d->dest.u.trove.handle = s_op->req->u.io.handle;
-        s_op->u.io.flow_d->dest.u.trove.coll_id = s_op->req->u.io.fs_id;
+        s_op->u.io.address = s_op->addr;
+        s_op->u.io.handle = s_op->req->u.io.handle;
+        s_op->u.io.coll_id = s_op->req->u.io.fs_id;
     }
     else if (s_op->req->u.io.io_type == PVFS_IO_READ)
     {
-        s_op->u.io.flow_d->src.endpoint_id = TROVE_ENDPOINT;
-        s_op->u.io.flow_d->src.u.trove.handle = s_op->req->u.io.handle;
-        s_op->u.io.flow_d->src.u.trove.coll_id = s_op->req->u.io.fs_id;
-        s_op->u.io.flow_d->dest.endpoint_id = BMI_ENDPOINT;
-        s_op->u.io.flow_d->dest.u.bmi.address = s_op->addr;
+        s_op->u.io.handle = s_op->req->u.io.handle;
+        s_op->u.io.coll_id = s_op->req->u.io.fs_id;
+        s_op->u.io.address = s_op->addr;
     }
     else
     {
@@ -258,120 +246,56 @@ static PINT_sm_action start_pipeline_sm(
         return SM_ACTION_COMPLETE;
     }
 
-    flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
-    if(!flow_data) {
-	js_p->error_code = -PVFS_ENOMEM;
-	return SM_ACTION_COMPLETE;
-    }
-
-    memset(flow_data, 0, sizeof(struct fp_private_data));
-    
-    s_op->u.io.flow_d->flow_protocol_data = flow_data;
-    s_op->u.io.flow_d->state = FLOW_TRANSMITTING;
-    flow_data->parent = s_op->u.io.flow_d;
-
     /* setup the request processing states */
     gossip_debug(GOSSIP_IO_DEBUG, "%s: aggregate_size=%ld\n", __func__,
-		 s_op->u.io.flow_d->aggregate_size);
+		 s_op->u.io.aggregate_size);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: file_data.fsize=%ld\n", __func__,
-		 s_op->u.io.flow_d->file_data.fsize);
+		 s_op->u.io.file_data.fsize);
 
-    if(s_op->u.io.flow_d->buffer_size < 1)
-        s_op->u.io.flow_d->buffer_size = BUFFER_SIZE;
-    if(s_op->u.io.flow_d->buffers_per_flow < 1)
-        s_op->u.io.flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
-        
-    flow_data->prealloc_array = (struct fp_queue_item*)
-        malloc(s_op->u.io.flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
-    if(!flow_data->prealloc_array)
-    {
-        free(flow_data);
-        js_p->error_code = (-PVFS_ENOMEM);
-	return SM_ACTION_COMPLETE;
-    }
-
-    memset(flow_data->prealloc_array, 0,
-        s_op->u.io.flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
-    for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++)
-    {
-        flow_data->prealloc_array[i].parent = s_op->u.io.flow_d;
-    }
-
-    /* remaining setup depends on the endpoints we intend to use */
-    if(s_op->req->u.io.io_type == PVFS_IO_READ) {
-	PINT_segpool_unit_id id;
-
-	PINT_dist_lookup(s_op->u.io.flow_d->file_data.dist);
-	ret = PINT_segpool_init(s_op->u.io.flow_d->mem_req,
-				s_op->u.io.flow_d->file_req,
-				s_op->u.io.flow_d->file_data.fsize,
-				s_op->u.io.flow_d->file_req_offset,
-				s_op->u.io.flow_d->aggregate_size,
-				s_op->u.io.flow_d->file_data.server_nr,
-				s_op->u.io.flow_d->file_data.server_ct,
-				s_op->u.io.flow_d->file_data.dist, /* dist */
-				PINT_SP_SERVER_READ,
-				&seg_handle);
-
-	s_op->u.io.seg_handle = seg_handle;
-	for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
-	    PINT_segpool_register(seg_handle, &id);
-	    flow_data->prealloc_array[i].result_chain.q_item =
-		&flow_data->prealloc_array[i];
-	    flow_read_op = malloc(sizeof(*flow_read_op));
-	    memset(flow_read_op, 0, sizeof(*flow_read_op));
-	    flow_read_op->u.flow_read.q_item = &flow_data->prealloc_array[i];
-	    flow_read_op->u.flow_read.seg_handle = seg_handle;
-	    flow_read_op->u.flow_read.id = id;
-	    ret = PINT_sm_push_frame(smcb, DO_READ, flow_read_op);
-	    if(ret < 0) {
-		js_p->error_code = -PVFS_ENOMEM;
-		return SM_ACTION_COMPLETE;
-	    }
+    if(s_op->u.io.buffer_size < 1)
+        s_op->u.io.buffer_size = BUFFER_SIZE;
+    if(s_op->u.io.num_of_buffers < 1)
+        s_op->u.io.num_of_buffers = BUFFERS_PER_PIPELINING;
+
+    PINT_segpool_unit_id id;
+
+    PINT_dist_lookup(s_op->u.io.file_data.dist);
+    ret = PINT_segpool_init(s_op->u.io.mem_req,
+			    s_op->u.io.file_req,
+			    s_op->u.io.file_data.fsize,
+			    s_op->u.io.file_req_offset,
+			    s_op->u.io.aggregate_size,
+			    s_op->u.io.file_data.server_nr,
+			    s_op->u.io.file_data.server_ct,
+			    s_op->u.io.file_data.dist, /* dist */
+			    (s_op->req->u.io.io_type == PVFS_IO_READ ? 
+			     PINT_SP_SERVER_READ: PINT_SP_SERVER_WRITE),
+			    &seg_handle);
+
+    s_op->u.io.seg_handle = seg_handle;
+    for(i=0; i<s_op->u.io.num_of_buffers; i++) {
+	PINT_segpool_register(seg_handle, &id);
+	rwsm_op = malloc(sizeof(*rwsm_op));
+	memset(rwsm_op, 0, sizeof(*rwsm_op));
+	rwsm_op->u.rwsm.seg_handle = seg_handle;
+	rwsm_op->u.rwsm.id = id;
+	rwsm_op->u.rwsm.parent = s_op;
+	if(s_op->req->u.io.io_type == PVFS_IO_READ) {
+	    ret = PINT_sm_push_frame(smcb, DO_READ, rwsm_op);
 	    s_op->u.io.parallel_read_sms++;
 	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_read_sms=%d\n", 
 			 __func__, s_op->u.io.parallel_read_sms);
 	}
-    }
-    else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
-	gossip_debug(GOSSIP_IO_DEBUG, "%s: IO_WRITE\n", __func__);
-	gossip_debug(GOSSIP_IO_DEBUG, "%s: file_size=%ld\n", __func__,
-		     s_op->u.io.flow_d->aggregate_size);
-	PINT_segpool_unit_id id;
-
-	PINT_dist_lookup(s_op->u.io.flow_d->file_data.dist);
-	ret = PINT_segpool_init(s_op->u.io.flow_d->mem_req,
-				 s_op->u.io.flow_d->file_req,
-				s_op->u.io.flow_d->file_data.fsize,
-				s_op->u.io.flow_d->file_req_offset, /* sson */
-				s_op->u.io.flow_d->aggregate_size,
-				 s_op->u.io.flow_d->file_data.server_nr,
-				 s_op->u.io.flow_d->file_data.server_ct,
-				 s_op->u.io.flow_d->file_data.dist, /* dist */
-				 PINT_SP_SERVER_WRITE,
-				 &seg_handle);
-
-	s_op->u.io.seg_handle = seg_handle;
-	/* FIXME: we will eventually want to post multiple of recvs */
-	for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
-	    PINT_segpool_register(seg_handle, &id);
-	    flow_data->prealloc_array[i].result_chain.q_item = 
-		&flow_data->prealloc_array[i];
-	
-	    flow_write_op = malloc(sizeof(*flow_write_op));
-	    memset(flow_write_op, 0, sizeof(*flow_write_op));
-	    flow_write_op->u.flow_write.q_item = &flow_data->prealloc_array[i];
-	    flow_write_op->u.flow_write.seg_handle = seg_handle;
-	    flow_write_op->u.flow_write.id = id;
-	    ret = PINT_sm_push_frame(smcb, DO_WRITE, flow_write_op);
-	    if(ret < 0) {
-		js_p->error_code = -PVFS_ENOMEM;
-		return SM_ACTION_COMPLETE;
-	    }
+	else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
+	    ret = PINT_sm_push_frame(smcb, DO_WRITE, rwsm_op);
 	    s_op->u.io.parallel_write_sms++;
 	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_write_sms=%d\n", 
 			 __func__, s_op->u.io.parallel_write_sms);
 	}
+	if(ret < 0) {
+	    js_p->error_code = -PVFS_ENOMEM;
+	    return SM_ACTION_COMPLETE;
+	}
     }
 
     js_p->error_code = 0;
@@ -433,11 +357,6 @@ static PINT_sm_action io_cleanup(
     PVFS_strerror_r(s_op->resp.status, status_string, 64);
     PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "finish (%s)\n", status_string);
 
-    if (s_op->u.io.flow_d)
-    {
-        PINT_flow_free(s_op->u.io.flow_d);
-    }
-
     if (s_op->u.io.seg_handle) {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: freeing seg_handle\n", __func__);
 	PINT_segpool_destroy(s_op->u.io.seg_handle);
@@ -460,7 +379,7 @@ static PINT_sm_action io_cleanup(
  * Params:   server_op *s_op, 
  *           job_status_s* js_p
  *
- * Pre:      flow is completed so that we can report its status
+ * Pre:      I/O has completed so that we can report its status
  *
  * Post:     if this is a write, response has been sent to client
  *           if this is a read, do nothing
@@ -523,34 +442,34 @@ static PINT_sm_action io_send_completion
     s_op->resp.status = js_p->error_code;
     if(s_op->req->u.io.io_type == PVFS_IO_READ) { /* AS: read with op */
 	s_op->resp.u.read_completion.total_completed =
-	    s_op->u.io.flow_d->total_transferred;
-	gossip_debug(GOSSIP_IO_DEBUG, "total_transferred=%ld\n", s_op->u.io.flow_d->total_transferred); /* AS */
+	    s_op->u.io.total_transferred;
+	gossip_debug(GOSSIP_IO_DEBUG, "total_transferred=%ld\n", s_op->u.io.total_transferred); /* AS */
 	switch(s_op->req->u.io.datatype) {
 	case (int)0x4c000405: /* MPI_INT */
 	    {
 		int *tmp;
-		tmp = s_op->u.io.flow_d->tmp_buffer;
+		tmp = s_op->u.io.tmp_buffer;
 		gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), result (INT) to send=%d, s_op->resp.u.read_completion.total_completed=%lu\n", *tmp, s_op->resp.u.read_completion.total_completed);
 		s_op->resp.u.read_completion.result.buffer_sz = sizeof(int);
-		s_op->resp.u.read_completion.result.buffer = s_op->u.io.flow_d->tmp_buffer;
+		s_op->resp.u.read_completion.result.buffer = s_op->u.io.tmp_buffer;
 		break;
 	    }
 	case (int)0x4c00080b:
 	    {
 		double *tmp;
-		tmp = s_op->u.io.flow_d->tmp_buffer;
+		tmp = s_op->u.io.tmp_buffer;
 		gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), result (DOUBLE) to send=%lf, s_op->resp.u.read_completion.total_completed=%lu\n", *tmp, s_op->resp.u.read_completion.total_completed);
 		s_op->resp.u.read_completion.result.buffer_sz = sizeof(double);
-		s_op->resp.u.read_completion.result.buffer = s_op->u.io.flow_d->tmp_buffer;
+		s_op->resp.u.read_completion.result.buffer = s_op->u.io.tmp_buffer;
 		break;
 	    }
 	}
     }
     else /* AS */
 	s_op->resp.u.write_completion.total_completed = 
-	    s_op->u.io.flow_d->total_transferred; /* AS */
+	    s_op->u.io.total_transferred; /* AS */
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__, s_op->u.io.flow_d->total_transferred);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__, s_op->u.io.total_transferred);
     err = PINT_encode(
         &s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
         s_op->addr, s_op->decoded.enc_type);
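
For reference, the new setup path in start_pipelining_sm boils down to:
register one segpool unit per buffer, then push one nested read/write
state machine frame per buffer. A condensed, hypothetical sketch of that
loop (stub types stand in for the real PVFS2 declarations; error paths
and gossip output elided):

    #include <stdlib.h>

    enum { DO_READ = 3, DO_WRITE };        /* as defined in rw-sm.h */

    struct seg_pool { int next_id; };      /* stands in for PINT_segpool_handle_t */

    struct rwsm_op {                       /* models struct PINT_server_rwsm_op */
        void            *parent;           /* back-pointer to the parent io op */
        struct seg_pool *seg_handle;       /* segment pool shared by all workers */
        int              id;               /* this worker's segpool unit id */
    };

    /* stand-in for PINT_sm_push_frame(): queue one nested machine */
    static int push_frame(int task, struct rwsm_op *w)
    {
        (void)task; (void)w;
        return 0;
    }

    static int start_pipelining_sketch(void *io_op, struct seg_pool *pool,
                                       int num_of_buffers, int is_read)
    {
        int i;
        for (i = 0; i < num_of_buffers; i++) {
            struct rwsm_op *w = calloc(1, sizeof(*w));
            if (!w)
                return -1;                 /* -PVFS_ENOMEM in the real code */
            w->id = pool->next_id++;       /* PINT_segpool_register() */
            w->parent = io_op;
            w->seg_handle = pool;
            if (push_frame(is_read ? DO_READ : DO_WRITE, w) < 0)
                return -1;
        }
        return 0;
    }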

Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.2 -r1.155.6.3
--- pvfs2-server.h	18 Apr 2009 21:59:24 -0000	1.155.6.2
+++ pvfs2-server.h	20 Apr 2009 21:10:50 -0000	1.155.6.3
@@ -333,43 +333,53 @@ struct PINT_server_getconfig_op
 
 struct PINT_server_io_op
 {
-    flow_descriptor* flow_d;
-    //io_pipeline* iop; /* substitute for flow_d */
+    //void *parent;
+    PVFS_fs_id coll_id;
+    PVFS_handle handle;
+    PVFS_BMI_addr_t address;
+
+    PINT_Request *file_req;
+    PVFS_offset file_req_offset;
+    PINT_Request *mem_req;
+    PVFS_msg_tag_t tag;
+    void *user_ptr;
+
+    int op;
+    int datatype;
+    void *tmp_buffer;
+    PVFS_handle *dfile_array;
+
+    PVFS_size aggregate_size;
+
+    PINT_request_file_data file_data;
+  
+    int buffer_size;
+    int num_of_buffers;
+    
+    PVFS_size total_transferred;
+
     int parallel_write_sms;
     int parallel_read_sms;
     PINT_segpool_handle_t seg_handle;
+    
+    PVFS_hint hints;
 };
 
-/* This is for flow state machine */
-/* not sure if this is the right place */
-struct PINT_server_flow_read_op
-{
-    struct fp_queue_item *q_item;
-    void *buffer; /* substitute for q_item */
-    PVFS_size buffer_used; /* substitute for q_item */
-    PINT_segpool_handle_t seg_handle;
-    PINT_segpool_unit_id id;
-    PVFS_offset *offsets;
-    PINT_Request_state *file_req_state;
-    PVFS_size *sizes;
-    int segs;
-    int parallel_sms;
-};
- 
-struct PINT_server_flow_write_op
+/* substitute for flow */
+struct PINT_server_rwsm_op
 {
-    struct fp_queue_item *q_item;
-    void *buffer; /* substitute for q_item */
-    PVFS_size buffer_used; /* substitute for q_item */
+    void *parent;
+    void *buffer; 
+    PVFS_size buffer_used; 
+    PVFS_size out_size;
     PINT_segpool_handle_t seg_handle;
     PINT_segpool_unit_id id;
     PVFS_offset *offsets;
     PINT_Request_state *file_req_state;
     PVFS_size *sizes;
     int segs;
-    int parallel_sms;
 };
-
+ 
 struct PINT_server_small_io_op
 {
     PVFS_offset offsets[IO_MAX_REGIONS];
@@ -505,8 +515,7 @@ typedef struct PINT_server_op
 	struct PINT_server_rmdirent_op rmdirent;
 	struct PINT_server_io_op io;
         struct PINT_server_small_io_op small_io;
-	struct PINT_server_flow_read_op flow_read; /* for read sm */
-	struct PINT_server_flow_write_op flow_write; /* for write sm */
+	struct PINT_server_rwsm_op rwsm;
 	struct PINT_server_flush_op flush;
 	struct PINT_server_truncate_op truncate;
 	struct PINT_server_mkdir_op mkdir;
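
The void *parent back-pointer in PINT_server_rwsm_op is what replaces
q_item->parent: each nested read/write frame keeps per-worker state in
s_op->u.rwsm and reaches the shared per-request state through the parent
frame's u.io. The access pattern, abridged from the read.sm and write.sm
hunks below:

    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;

    /* per-worker state lives in the child frame ... */
    s_op->u.rwsm.buffer_used = 0;

    /* ... shared per-request state lives in the parent's u.io */
    bytes = parent_s_op->u.io.buffer_size;
    parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;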

Index: read.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/read.sm,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- read.sm	18 Apr 2009 21:59:24 -0000	1.1.2.2
+++ read.sm	20 Apr 2009 21:10:50 -0000	1.1.2.3
@@ -21,7 +21,7 @@
 #include "rw-sm.h"
 #include "trove.h"
 
-static void do_comp(struct fp_queue_item *);
+static void do_comp(struct PINT_server_op *);
 
 %%
 
@@ -61,9 +61,8 @@ nested machine pvfs2_read_sm
 static int trove_read_call(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
-    PINT_segpool_handle_t seg_handle = s_op->u.flow_read.seg_handle;
-    PINT_segpool_unit_id id = s_op->u.flow_read.id;
+    PINT_segpool_handle_t seg_handle = s_op->u.rwsm.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.rwsm.id;
     int ret;
     struct filesystem_configuration_s *fs_config;
     struct server_configuration_s *server_config;
@@ -72,29 +71,30 @@ static int trove_read_call(struct PINT_s
     PVFS_size *sizes;
     PVFS_size bytes;
     job_id_t tmp_id;
+    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
     
-    q_item->buffer_used = 0;
+    s_op->u.rwsm.buffer_used = 0;
 
-    if(!q_item->buffer) {
-	gossip_debug(GOSSIP_IO_DEBUG, "%s: !q_item->buffer\n", __func__);
-        /* if the q_item has not been used, allocate a buffer */
-        q_item->buffer = BMI_memalloc(
-            q_item->parent->dest.u.bmi.address,
-            q_item->parent->buffer_size, BMI_SEND);
+    if(!s_op->u.rwsm.buffer) {
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: !buffer\n", __func__);
+        /* if the buffer has not been allocated yet, allocate it now */
+        s_op->u.rwsm.buffer = BMI_memalloc(
+	    parent_s_op->u.io.address,
+            parent_s_op->u.io.buffer_size, BMI_SEND);
         /* TODO: error handling */
-        assert(q_item->buffer);
+        assert(s_op->u.rwsm.buffer);
     }
     
-    bytes = q_item->parent->buffer_size;
+    bytes = parent_s_op->u.io.buffer_size;
     PINT_segpool_take_segments(seg_handle, id, &bytes, &count, 
 			       &offsets, &sizes);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%ld, count=%d\n", __func__,
 		 bytes, count);
     
-    q_item->buffer_used = bytes;
-    s_op->u.flow_read.offsets = offsets;
-    s_op->u.flow_read.sizes = sizes;
-    s_op->u.flow_read.segs = count;
+    s_op->u.rwsm.buffer_used = bytes;
+    s_op->u.rwsm.offsets = offsets;
+    s_op->u.rwsm.sizes = sizes;
+    s_op->u.rwsm.segs = count;
 
     if(count == 0) {
 	js_p->error_code = 0;
@@ -105,15 +105,15 @@ static int trove_read_call(struct PINT_s
     fs_config = PINT_config_get_filesystems(&server_config);
 
     ret = job_trove_bstream_read_list(
-				      q_item->parent->src.u.trove.coll_id,
-				      q_item->parent->src.u.trove.handle,
-				      &q_item->buffer,
-				      (PVFS_size *)&q_item->buffer_used,
+				      parent_s_op->u.io.coll_id,
+				      parent_s_op->u.io.handle,
+				      &s_op->u.rwsm.buffer,
+				      (PVFS_size *)&s_op->u.rwsm.buffer_used,
 				      1,
 				      offsets,
 				      sizes,
 				      count,
-				      &q_item->out_size,
+				      &s_op->u.rwsm.out_size,
 				      fs_config->trove_sync_data,
 				      NULL,
 				      smcb,
@@ -121,7 +121,7 @@ static int trove_read_call(struct PINT_s
 				      js_p,
 				      &tmp_id,
 				      server_job_context,
-				      q_item->parent->hints);
+				      parent_s_op->u.io.hints);
 
     gossip_debug(GOSSIP_IO_DEBUG, "%s: ret=%d\n", __func__, ret);
     
@@ -149,40 +149,38 @@ static int trove_read_call(struct PINT_s
 static int bmi_send_call(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
-    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
     int ret;
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
+    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
 
-    if(s_op->u.flow_read.segs == 0 && q_item->buffer_used == 0) {
+    if(s_op->u.rwsm.segs == 0 && s_op->u.rwsm.buffer_used == 0) {
 	js_p->error_code = 0;
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
 	return SM_ACTION_COMPLETE;
     }
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%ld\n", __func__,
-		 q_item->buffer_used);
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
-		 (char *)q_item->buffer);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__,
+		 s_op->u.rwsm.buffer_used);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
+		 (char *)s_op->u.rwsm.buffer);
     /***************************************************/
-    do_comp(q_item); 
+    do_comp(s_op); 
     /***************************************************/
 
-    /* while we hold dest lock, look for next seq no. to send */
     //do{
-	assert(q_item->buffer_used);
-	if(flow_data->parent->op != 0) { /* AS: when op is specified */
+	assert(s_op->u.rwsm.buffer_used);
+	if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */
 	    //ret = 1; /* AS: skip sending if op is specified */ 
 	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
 	    js_p->error_code = 0;
 	    return SM_ACTION_COMPLETE;
 	} 
 	else 
-	    ret = job_bmi_send(q_item->parent->dest.u.bmi.address,
-			       q_item->buffer,
-			       q_item->buffer_used,
-			       q_item->parent->tag,
+	    ret = job_bmi_send(parent_s_op->u.io.address,
+			       s_op->u.rwsm.buffer,
+			       s_op->u.rwsm.buffer_used,
+			       parent_s_op->u.io.tag,
 			       BMI_PRE_ALLOC,
 			       0, /* send_unexpected */
 			       smcb, /* user_ptr */
@@ -191,14 +189,14 @@ static int bmi_send_call(struct PINT_smc
 			       &tmp_id,
 			       server_job_context,
 			       user_opts->server_job_bmi_timeout,
-			       (bmi_hint)q_item->parent->hints);
-
+			       (bmi_hint)parent_s_op->u.io.hints);
+#if 0
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
                 "%s: (post send time) ini posts: %d, pending: %d, last: %d\n",
                 __func__,
                 flow_data->initial_posts, flow_data->dest_pending,
                 flow_data->dest_last_posted);
-
+#endif
         if(ret < 0)
         {
             gossip_err("%s: I/O error occurred\n", __func__);
@@ -226,21 +224,21 @@ static int bmi_send_call(struct PINT_smc
 static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
-    PINT_segpool_handle_t h = s_op->u.flow_read.seg_handle;
+    PINT_segpool_handle_t h = s_op->u.rwsm.seg_handle;
     js_p->error_code = 0;
+    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
     
-    q_item->parent->total_transferred += q_item->buffer_used;
+    parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;
     gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
-		 q_item->parent->total_transferred);
+		 parent_s_op->u.io.total_transferred);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
-		 s_op->u.flow_read.segs);
+		 s_op->u.rwsm.segs);
 
     /* FIXME: */
     /* unless the second condition is set, the server starts
        a new read request to one already done, and falls into
        infinite trove_read->bmi_send->check_done loop */
-    if(!segpool_done(h) && s_op->u.flow_read.segs != 0) {
+    if(!segpool_done(h) && s_op->u.rwsm.segs != 0) {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: read: LOOP\n", __func__);
 	js_p->error_code = LOOP;
     }
@@ -254,47 +252,48 @@ static int check_done(struct PINT_smcb *
 static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
     js_p->error_code = 0;
+    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
 
-    if(q_item->buffer) {
-	BMI_memfree(q_item->parent->src.u.bmi.address, 
-		    q_item->buffer, q_item->parent->buffer_size, BMI_SEND);
+    if(s_op->u.rwsm.buffer) {
+	BMI_memfree(parent_s_op->u.io.address, s_op->u.rwsm.buffer, 
+		    parent_s_op->u.io.buffer_size, BMI_SEND);
     }
+
     gossip_debug(GOSSIP_IO_DEBUG, "%s: read: \n", __func__);
     return SM_ACTION_COMPLETE;
-    //return(server_state_machine_complete(smcb)); // why not this?
 }
 
-static void do_comp(struct fp_queue_item *q_item)
+static void do_comp(struct PINT_server_op *s_op)
 {
-    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
 
-    if(q_item->buffer) {
+    if(s_op->u.rwsm.buffer) {
 	PVFS_size i;
 	gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-		     "%s: q_item->buffer_used=%ld, op=0x%x, datatype=0x%x\n", 
-		     __func__, q_item->buffer_used, flow_data->parent->op, 
-		     flow_data->parent->datatype); /* AS */
+		     "%s: buffer_used=%ld, op=0x%x, datatype=0x%x\n", 
+		     __func__, s_op->u.rwsm.buffer_used, 
+		     parent_s_op->u.io.op, 
+		     parent_s_op->u.io.datatype); /* AS */
 
-	switch(flow_data->parent->datatype) {
+	switch(parent_s_op->u.io.datatype) {
 	case ((int)0x4c000405): /* MPI_INT */
 	    {
-		int *a = q_item->buffer;
+		int *a = s_op->u.rwsm.buffer;
 		int result;
-		PVFS_size count = (q_item->buffer_used)/((*PVFS_INT).ub);
+		PVFS_size count = (s_op->u.rwsm.buffer_used)/((*PVFS_INT).ub);
 		int *tmp;
 		
-		if (flow_data->parent->total_transferred == 0) {
-		     if (flow_data->parent->tmp_buffer == NULL)
-			 flow_data->parent->tmp_buffer = (void *)malloc(1*sizeof(int));
-		     memset(flow_data->parent->tmp_buffer, 0, sizeof(int));
+		if (parent_s_op->u.io.total_transferred == 0) {
+		    if (parent_s_op->u.io.tmp_buffer == NULL)
+			parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(int));
+		    memset(parent_s_op->u.io.tmp_buffer, 0, sizeof(int));
 		}
-		tmp = flow_data->parent->tmp_buffer;
-		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%ld, total_bytes_processed=%ld\n", flow_data->parent->total_transferred, flow_data->total_bytes_processed);
+		tmp = parent_s_op->u.io.tmp_buffer;
+		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%ld\n", parent_s_op->u.io.total_transferred);
 		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%ld, tmp=%d\n", count, *tmp);
 
-		switch(flow_data->parent->op) {
+		switch(parent_s_op->u.io.op) {
 		case 0x58000001: /* MAX */
 		    result = *a;
 		    for (i=1; i<count; i++ ) {
@@ -303,7 +302,7 @@ static void do_comp(struct fp_queue_item
 			}
 		    }
 		    a[0] = result;
-		    if (flow_data->parent->total_transferred == 0 ||
+		    if (parent_s_op->u.io.total_transferred == 0 ||
 			result > *tmp)
 			*tmp = result;
 		    break;
@@ -315,7 +314,7 @@ static void do_comp(struct fp_queue_item
 			}
 		    }
 		    a[0] = result;
-		    if (flow_data->parent->total_transferred == 0 ||
+		    if (parent_s_op->u.io.total_transferred == 0 ||
 			result < *tmp)
 			*tmp = result;
 		    break;
@@ -336,31 +335,30 @@ static void do_comp(struct fp_queue_item
 		default:
 		    break;
 		}
-		q_item->buffer = (void *)a;
-		flow_data->parent->tmp_buffer = (void *)tmp;
+		s_op->u.rwsm.buffer = (void *)a;
+		parent_s_op->u.io.tmp_buffer = (void *)tmp;
 	    }
 	    break;
 
 	case ((int)0x4c00080b): /* MPI_DOUBLE */
 	    {
-		double *a = q_item->buffer;
+		double *a = s_op->u.rwsm.buffer;
 		double result;
-		PVFS_size count = (q_item->buffer_used)/((*PVFS_DOUBLE).ub);
+		PVFS_size count = (s_op->u.rwsm.buffer_used)/((*PVFS_DOUBLE).ub);
 		double *tmp;
 
-		if (flow_data->parent->total_transferred == 0) {
-		    if (flow_data->parent->tmp_buffer == NULL)
-			flow_data->parent->tmp_buffer = (void *)malloc(1*sizeof(double));
-		    memset(flow_data->parent->tmp_buffer, 0, sizeof(double));
+		if (parent_s_op->u.io.total_transferred == 0) {
+		    if (parent_s_op->u.io.tmp_buffer == NULL)
+			parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(double));
+		    memset(parent_s_op->u.io.tmp_buffer, 0, sizeof(double));
 		}
-		tmp = flow_data->parent->tmp_buffer;
+		tmp = parent_s_op->u.io.tmp_buffer;
 		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, 
-			     "total_transferred=%ld, total_bytes_processed=%ld\n", 
-			     flow_data->parent->total_transferred, 
-			     flow_data->total_bytes_processed);
+			     "total_transferred=%ld\n",
+			     parent_s_op->u.io.total_transferred);
 		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%ld, tmp=%lf\n", 
 			     count, *tmp);
-		switch(flow_data->parent->op) {
+		switch(parent_s_op->u.io.op) {
 		case 0x58000001: /* MAX */
 		    result = *a;
 		    for (i=1; i<count; i++ ) {
@@ -369,7 +367,7 @@ static void do_comp(struct fp_queue_item
 			}
 		    }
 		    a[0] = result;
-		    if (flow_data->parent->total_transferred == 0 ||
+		    if (parent_s_op->u.io.total_transferred == 0 ||
 			result > *tmp)
 			*tmp = result;
 		    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *tmp);
@@ -383,7 +381,7 @@ static void do_comp(struct fp_queue_item
 		    }
 
 		    a[0] = result;
-		    if (flow_data->parent->total_transferred == 0 ||
+		    if (parent_s_op->u.io.total_transferred == 0 ||
 			result < *tmp)
 			*tmp = result;
 		    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n", 
@@ -404,8 +402,8 @@ static void do_comp(struct fp_queue_item
 		default:
 		    break;
 		} /* end inner switch */
-		q_item->buffer = (void *)a;
-		flow_data->parent->tmp_buffer = (void *)tmp;
+		s_op->u.rwsm.buffer = (void *)a;
+		parent_s_op->u.io.tmp_buffer = (void *)tmp;
 	    }
 	    
 	    break;
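
Flattening the state transitions above, each read worker now runs the
equivalent of the following loop until the shared segpool is drained.
This is a hypothetical, synchronous rendering only; the real path is
asynchronous and job-driven, and segpool_take, trove_read, bmi_send and
segpool_done are stand-ins for the PINT_segpool/job calls in the hunks:

    for (;;) {
        long bytes = io->buffer_size;
        int count = segpool_take(pool, w->id, &bytes, offsets, sizes);
        if (count == 0)
            break;                    /* nothing left to claim */
        trove_read(io->coll_id, io->handle, w->buffer, &bytes,
                   offsets, sizes, count);
        do_comp(w);                   /* folds into tmp_buffer if io->op set */
        if (io->op == 0)              /* op set: keep result, skip the send */
            bmi_send(io->address, w->buffer, bytes, io->tag);
        io->total_transferred += bytes;
        if (segpool_done(pool))
            break;                    /* otherwise take the LOOP edge */
    }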

Index: rw-sm.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/rw-sm.h,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- rw-sm.h	18 Apr 2009 21:59:24 -0000	1.1.2.2
+++ rw-sm.h	20 Apr 2009 21:10:50 -0000	1.1.2.3
@@ -1,81 +1,12 @@
-#include "src/io/flow/flowproto-support.h"
-#include "thread-mgr.h"
+#ifndef __RW_SM_H
+#define __RW_SM_H
 
-/* the following buffer settings are used by default if none are specified in
- * the flow descriptor
+/* default buffer settings, used unless the fs config overrides them
  */
-#define BUFFERS_PER_FLOW 8
+#define BUFFERS_PER_PIPELINING 8
 #define BUFFER_SIZE (256*1024)
 
-#define MAX_REGIONS 64
-
-#define FLOW_CLEANUP(__flow_data)                                     \
-do {                                                                  \
-    struct flow_descriptor *__flow_d = (__flow_data)->parent;         \
-    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "flowproto completing %p\n",\
-                 __flow_d);                                           \
-    cleanup_buffers(__flow_data);                                     \
-    __flow_d = (__flow_data)->parent;                                 \
-    free(__flow_data);                                                \
-} while(0)
-
-
-struct result_chain_entry
-{
-    PVFS_id_gen_t posted_id;
-    char *buffer_offset;
-    PINT_Request_result result;
-    PVFS_size size_list[MAX_REGIONS];
-    PVFS_offset offset_list[MAX_REGIONS];
-    struct result_chain_entry *next;
-    struct fp_queue_item *q_item;
-    struct PINT_thread_mgr_trove_callback trove_callback;
-};
-
-/* fp_queue_item describes an individual buffer being used within the flow */
-struct fp_queue_item
-{
-    PVFS_id_gen_t posted_id;
-    int last;
-    int seq;
-    void *buffer;
-    PVFS_size buffer_used;
-    PVFS_size out_size;
-    struct result_chain_entry result_chain;
-    int result_chain_count;
-  //struct qlist_head list_link;
-    flow_descriptor *parent;
-    struct PINT_thread_mgr_bmi_callback bmi_callback;
-};
-
-/* fp_private_data is information specific to this flow protocol, stored
- * in flow descriptor but hidden from caller
- */
-struct fp_private_data
-{
-    flow_descriptor *parent;
-    struct fp_queue_item* prealloc_array;
-    struct qlist_head list_link;
-    PVFS_size total_bytes_processed;
-    int next_seq;
-    int next_seq_to_send;
-    int dest_pending;
-    int dest_last_posted;
-    int initial_posts;
-    void *tmp_buffer_list[MAX_REGIONS];
-    void *intermediate;
-    int cleanup_pending_count;
-    int req_proc_done;
-  
-  //struct qlist_head src_list;
-  //struct qlist_head dest_list;
-  //struct qlist_head empty_list;
-};
-#define PRIVATE_FLOW(target_flow)\
-    ((struct fp_private_data*)(target_flow->flow_protocol_data))
-
-static void cleanup_buffers(
-    struct fp_private_data *flow_data);
+enum {DO_READ=3, DO_WRITE, LOOP};
 
 #if 0
 
@@ -192,86 +123,5 @@ static void handle_io_error(
 
 #endif
 
-/* cleanup_buffers()
- *
- * releases any resources consumed during flow processing
- *
- * no return value
- */
-static void cleanup_buffers(struct fp_private_data *flow_data)
-{
-    int i;
-    struct result_chain_entry *result_tmp;
-    struct result_chain_entry *old_result_tmp;
-
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: \n", __func__);
-    if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
-        flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
-    {
-        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
-        {
-            if(flow_data->prealloc_array[i].buffer)
-            {
-	      BMI_memfree(flow_data->parent->src.u.bmi.address,
-			  flow_data->prealloc_array[i].buffer,
-			  flow_data->parent->buffer_size,
-			  BMI_RECV);
-            }
-            result_tmp = &(flow_data->prealloc_array[i].result_chain);
-            do{
-                old_result_tmp = result_tmp;
-                result_tmp = result_tmp->next;
-                if(old_result_tmp !=
-                    &(flow_data->prealloc_array[i].result_chain))
-                    free(old_result_tmp);
-            }while(result_tmp);
-            flow_data->prealloc_array[i].result_chain.next = NULL;
-        }
-    }
-    else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
-        flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
-    {
-        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
-        {
-            if(flow_data->prealloc_array[i].buffer)
-            {
-                BMI_memfree(flow_data->parent->dest.u.bmi.address,
-                    flow_data->prealloc_array[i].buffer,
-                    flow_data->parent->buffer_size,
-                    BMI_SEND);
-            }
-            result_tmp = &(flow_data->prealloc_array[i].result_chain);
-            do{
-                old_result_tmp = result_tmp;
-                result_tmp = result_tmp->next;
-                if(old_result_tmp !=
-                    &(flow_data->prealloc_array[i].result_chain))
-                    free(old_result_tmp);
-            }while(result_tmp);
-            flow_data->prealloc_array[i].result_chain.next = NULL;
-        }
-    }
-    else if(flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
-        flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
-    {
-        if(flow_data->intermediate)
-        {
-            BMI_memfree(flow_data->parent->dest.u.bmi.address,
-                flow_data->intermediate, flow_data->parent->buffer_size, BMI_SEND);
-        }
-    }
-    else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
-        flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
-    {
-        if(flow_data->intermediate)
-        {
-            BMI_memfree(flow_data->parent->src.u.bmi.address,
-                flow_data->intermediate, flow_data->parent->buffer_size, BMI_RECV);
-        }
-    }
-
-    free(flow_data->prealloc_array);
-}
-
 
-enum {DO_READ=3, DO_WRITE, LOOP};
+#endif /* __RW_SM_H */
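
With these defaults, each I/O request pins up to
BUFFERS_PER_PIPELINING × BUFFER_SIZE = 8 × 256 KiB = 2 MiB of BMI buffer
space per server (one lazily allocated buffer per nested read/write
machine), unless the fs config overrides fp_buffer_size or
fp_buffers_per_flow, as picked up in io.sm above.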

Index: write.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/write.sm,v
diff -p -u -r1.1.2.3 -r1.1.2.4
--- write.sm	18 Apr 2009 21:59:24 -0000	1.1.2.3
+++ write.sm	20 Apr 2009 21:10:50 -0000	1.1.2.4
@@ -60,9 +60,8 @@ nested machine pvfs2_write_sm
 static PINT_sm_action bmi_recv_call(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
-    PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
-    PINT_segpool_unit_id id = s_op->u.flow_write.id;
+    PINT_segpool_handle_t seg_handle = s_op->u.rwsm.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.rwsm.id;
     int ret;
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
@@ -70,27 +69,28 @@ static PINT_sm_action bmi_recv_call(stru
     PVFS_offset *offsets;
     PVFS_size *sizes;
     PVFS_size bytes;
+    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
 
-    q_item->buffer_used = 0;
+    s_op->u.rwsm.buffer_used = 0;
     
-    if(!q_item->buffer){
-        /* if the q_item has not been used, allocate a buffer */
-        q_item->buffer = BMI_memalloc(
-            q_item->parent->src.u.bmi.address,
-            q_item->parent->buffer_size, BMI_RECV);
+    if(!s_op->u.rwsm.buffer){
+        /* if the buffer has not been allocated yet, allocate it now */
+        s_op->u.rwsm.buffer = BMI_memalloc(
+            parent_s_op->u.io.address,
+            parent_s_op->u.io.buffer_size, BMI_RECV);
         /* TODO: error handling */
-        assert(q_item->buffer);
+        assert(s_op->u.rwsm.buffer);
     }
 
-    bytes = q_item->parent->buffer_size;
+    bytes = parent_s_op->u.io.buffer_size;
     PINT_segpool_take_segments(seg_handle, id, &bytes, &count, 
 			       &offsets, &sizes);
     gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%ld\n", 
 		 __func__, count, bytes);
-    q_item->buffer_used = bytes;
-    s_op->u.flow_write.offsets = offsets;
-    s_op->u.flow_write.sizes = sizes;
-    s_op->u.flow_write.segs = count;
+    s_op->u.rwsm.buffer_used = bytes;
+    s_op->u.rwsm.offsets = offsets;
+    s_op->u.rwsm.sizes = sizes;
+    s_op->u.rwsm.segs = count;
 
     if(count == 0) {
 	js_p->error_code = 0;
@@ -98,10 +98,10 @@ static PINT_sm_action bmi_recv_call(stru
     }
     
     /* TODO: what if we recv less than expected? */
-    ret = job_bmi_recv(q_item->parent->src.u.bmi.address,
-		       (char *)q_item->buffer,
-		       q_item->parent->buffer_size,
-		       q_item->parent->tag,
+    ret = job_bmi_recv(parent_s_op->u.io.address,
+		       (char *)s_op->u.rwsm.buffer,
+		       parent_s_op->u.io.buffer_size,
+		       parent_s_op->u.io.tag,
 		       BMI_PRE_ALLOC, 
 		       smcb, 
 		       0, /* unsigned long status_user_tag = 0 */
@@ -109,7 +109,7 @@ static PINT_sm_action bmi_recv_call(stru
 		       &tmp_id,
 		       server_job_context,
 		       user_opts->server_job_flow_timeout,
-		       (bmi_hint)q_item->parent->hints);
+		       (bmi_hint)parent_s_op->u.io.hints);
     
     if(ret < 0) {
 	gossip_err("%s: I/O error occurred\n", __func__);
@@ -135,18 +135,18 @@ static PINT_sm_action bmi_recv_call(stru
 static PINT_sm_action trove_write_call(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
     int ret;
     job_id_t tmp_id;
     struct filesystem_configuration_s *fs_config;
     struct server_configuration_s *server_config;
+    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
 
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%ld\n", __func__, 
-		 q_item->buffer_used);
-    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
-		 (char *)q_item->buffer);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__, 
+		 s_op->u.rwsm.buffer_used);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
+		 (char *)s_op->u.rwsm.buffer);
 
-    if(s_op->u.flow_write.segs == 0 && q_item->buffer_used == 0) {
+    if(s_op->u.rwsm.segs == 0 && s_op->u.rwsm.buffer_used == 0) {
 	js_p->error_code = 0;
 	return SM_ACTION_COMPLETE;
     }
@@ -154,15 +154,15 @@ static PINT_sm_action trove_write_call(s
     fs_config = PINT_config_get_filesystems(&server_config);
     
     ret = job_trove_bstream_write_list(
-				       q_item->parent->dest.u.trove.coll_id,
-				       q_item->parent->dest.u.trove.handle,
-				       (char**)&q_item->buffer,
-				       (TROVE_size *)&q_item->buffer_used,
+				       parent_s_op->u.io.coll_id,
+				       parent_s_op->u.io.handle,
+				       (char**)&s_op->u.rwsm.buffer,
+				       (TROVE_size *)&s_op->u.rwsm.buffer_used,
 				       1,
-				       s_op->u.flow_write.offsets,
-				       s_op->u.flow_write.sizes,
-				       s_op->u.flow_write.segs,
-				       &q_item->out_size,
+				       s_op->u.rwsm.offsets,
+				       s_op->u.rwsm.sizes,
+				       s_op->u.rwsm.segs,
+				       &s_op->u.rwsm.out_size,
 				       fs_config->trove_sync_data,
 				       NULL,
 				       smcb,
@@ -170,7 +170,7 @@ static PINT_sm_action trove_write_call(s
 				       js_p,
 				       &tmp_id,
 				       server_job_context,
-				       q_item->parent->hints);
+				       parent_s_op->u.io.hints);
 
     if(ret < 0) {
 	gossip_err("%s: I/O error occurred\n", __func__);
@@ -197,15 +197,14 @@ static PINT_sm_action trove_write_call(s
 static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
-    PINT_segpool_handle_t h = s_op->u.flow_write.seg_handle;
+    PINT_segpool_handle_t h = s_op->u.rwsm.seg_handle;
     js_p->error_code = 0;
+    struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
 
-    q_item->parent->total_transferred += q_item->buffer_used;
+    parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;
     gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
-		 q_item->parent->total_transferred);
+		 parent_s_op->u.io.total_transferred);
 
-    //if(!PINT_REQUEST_DONE(q_item->parent->file_req_state)) {
     if(!segpool_done(h)) {
 	gossip_debug(GOSSIP_IO_DEBUG, "%s: write: LOOP\n", __func__);
 	js_p->error_code = LOOP;


