[Pvfs2-cvs] commit by sson in pvfs2/src/server: pipeline.sm io.sm module.mk.in pipeline.h pvfs2-server.h

CVS commit program cvs at parl.clemson.edu
Tue Apr 28 17:36:02 EDT 2009


Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv27106/src/server

Modified Files:
      Tag: as-branch
	io.sm module.mk.in pipeline.h pvfs2-server.h 
Added Files:
      Tag: as-branch
	pipeline.sm 
Log Message:
Merged read.sm and write.sm into a single file, pipeline.sm.



--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ pipeline.sm	2009-04-28 17:36:02.000000000 -0400
@@ -0,0 +1,501 @@
+/* 
+ * (C) 2001 Clemson University and The University of Chicago 
+ *
+ * See COPYING in top-level directory.
+ */
+
+/*
+ *  PVFS2 server state machine for driving pipelined read and write I/O operations.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "pvfs2-request.h"
+#include "pint-distribution.h"
+#include "pint-request.h"
+#include "pvfs2-internal.h"
+//#include "pipeline.h"
+#include "trove.h"
+
+static void do_comp(struct PINT_server_op *); /* reduction helper; dispatch_data calls it for reads that carry an op */
+#define LOOP 3 /* error_code that check_done sets to take the "LOOP => fetch" transition */
+
+%%
+
+nested machine pvfs2_pipeline_sm /* one pipeline: fetch -> dispatch -> check, repeated while check_done reports LOOP */
+{
+    state fetch /* take segments, then READ: trove read, WRITE: BMI receive */
+    {
+        run fetch_data;
+        default => dispatch;
+    }
+
+    state dispatch /* READ: BMI send (or do_comp), WRITE: trove write */
+    {
+        run dispatch_data;
+        default => check;
+    }
+
+    state check /* add the transferred bytes to the parent op, then loop or finish */
+    {
+        run check_done;
+	success => finished;
+        LOOP => fetch;
+    }
+
+    state finished /* epilog sets error_code to 0 before the machine returns */
+    {
+        run epilog;
+        default => return;
+    }
+}
+
+%%
+
+/*
+ * fetch_data: pull the next chunk of this pipeline's work into its buffer.
+ *   PINT_segpool_take_segments()   (asks for buffer_size bytes)
+ *     => READ: job_trove_bstream_read_list()
+ *     => WRITE: job_bmi_recv()
+ * js_p->error_code on return: 0 when no segments were taken or the job was
+ * posted, 1 when the job completed immediately, negative on failure.
+ */
+static PINT_sm_action fetch_data(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.pipeline.id;
+    struct filesystem_configuration_s *fs_config;
+    struct server_configuration_s *server_config = get_server_config_struct();
+    int count, ret = -PVFS_EINVAL; /* stays an error for an unknown io_type */
+    PVFS_offset *offsets;
+    PVFS_size *sizes;
+    PVFS_size bytes;
+    job_id_t tmp_id;
+    
+    s_op->u.pipeline.buffer_used = 0;
+    bytes = s_op->u.pipeline.buffer_size;
+
+    PINT_segpool_take_segments(seg_handle, id, &bytes, &count, 
+			       &offsets, &sizes);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: bytes=%ld, count=%d\n", 
+		 __func__, 
+		 (s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
+		 bytes, count);
+
+    /* record the count before the early return: dispatch_data and check_done
+     * test segs == 0 and must not see the previous iteration's value */
+    s_op->u.pipeline.segs = count;
+    if(count == 0) {
+	js_p->error_code = 0;
+	return SM_ACTION_COMPLETE;
+    }
+
+    s_op->u.pipeline.buffer_used = bytes;
+    s_op->u.pipeline.offsets = offsets;
+    s_op->u.pipeline.sizes = sizes;
+
+    /* both paths need the server config (READ: fs lookup, WRITE: timeout) */
+    if(!server_config) {
+	js_p->error_code = -PVFS_EINVAL;
+	return SM_ACTION_COMPLETE;
+    }
+
+    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
+	/* figure out if the fs config has trove data sync turned on or off
+	 */
+	fs_config = PINT_config_find_fs_id(server_config, 
+					   s_op->u.pipeline.coll_id);
+	if(!fs_config) {
+	    gossip_err("%s: Failed to get filesystem "
+		       "config from fs_id of: %d\n", __func__,
+		       s_op->u.pipeline.coll_id);
+	    js_p->error_code = -PVFS_EINVAL;
+	    return SM_ACTION_COMPLETE;
+	}
+
+	ret = job_trove_bstream_read_list
+	    (s_op->u.pipeline.coll_id,
+	     s_op->u.pipeline.handle,
+	     (char **)&s_op->u.pipeline.buffer,
+	     (PVFS_size *)&s_op->u.pipeline.buffer_used,
+	     1,
+	     offsets,
+	     sizes,
+	     count,
+	     &s_op->u.pipeline.out_size,
+	     (fs_config->trove_sync_data ? TROVE_SYNC : 0),
+	     NULL,
+	     smcb,
+	     0,
+	     js_p,
+	     &tmp_id,
+	     server_job_context,
+	     s_op->u.pipeline.hints);
+    }
+    else if (s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
+	ret = job_bmi_recv(s_op->u.pipeline.address,
+		       (void *)s_op->u.pipeline.buffer,
+		       s_op->u.pipeline.buffer_size,
+		       s_op->u.pipeline.tag,
+		       BMI_PRE_ALLOC, 
+		       smcb, 
+		       1, /* status_user_tag */
+		       js_p,
+		       &tmp_id,
+		       server_job_context,
+		       server_config->server_job_flow_timeout,
+		       (bmi_hint)s_op->u.pipeline.hints);
+    }
+
+    if(ret < 0) {
+	gossip_err("%s: I/O error occurred\n", __func__);
+	/* FIXME: error cleanup is still missing here, i.e. an equivalent of
+	 * handle_io_error(ret, q_item, flow_data) */
+	js_p->error_code = -PVFS_EIO;
+	return SM_ACTION_COMPLETE;
+    }
+
+    /* ret == 0: the job was posted and has not completed yet */
+    if(ret == 0) {
+	js_p->error_code = 0;
+	return SM_ACTION_DEFERRED;
+    }
+
+    /* ret == 1: the job completed immediately */
+    if(ret == 1) {
+	js_p->error_code = 1;
+    }
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * dispatch_data: push the buffer filled by fetch_data to its destination.
+ *
+ *   => READ: job_bmi_send(); if the parent request carries a reduction op,
+ *      do_comp() is run on the buffer instead and nothing is sent from here
+ *   => WRITE: job_trove_bstream_write_list()
+ *
+ * Both transfers use js_p->actual_size as their length.
+ * js_p->error_code on return: 0 when there was nothing to do or the job was
+ * posted, 1 when it completed immediately (or the send was skipped for an
+ * op), negative on failure.
+ */
+static PINT_sm_action dispatch_data(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -PVFS_EINVAL; /* stays an error for an unknown io_type */
+    job_id_t tmp_id;
+    struct filesystem_configuration_s *fs_config; /* for write */
+    struct server_configuration_s *server_config = get_server_config_struct();
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+
+    if(s_op->u.pipeline.segs == 0) {
+	js_p->error_code = 0;
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
+	return SM_ACTION_COMPLETE;
+    }
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: buffer_used=%ld\n", __func__,
+		 (s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
+		 s_op->u.pipeline.buffer_used);
+
+    /* both paths need the server config (READ: timeout, WRITE: fs lookup) */
+    if(!server_config) {
+	gossip_err("%s: server config is NULL!\n", __func__);
+	js_p->error_code = -PVFS_EINVAL;
+	return SM_ACTION_COMPLETE;
+    }
+
+    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
+	/* AS: when op is specified, reduce the buffer before anything else */
+	if(parent_s_op->u.io.op != 0)
+	    do_comp(s_op); 
+	
+	assert(s_op->u.pipeline.buffer_used);
+	if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */
+	    ret = 1; /* AS: skip sending if op is specified */ 
+	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
+	    js_p->error_code = ret;
+	    return SM_ACTION_COMPLETE;
+	} 
+
+	/* FIXME: what should be a proper value for status_user_tag? */
+	/* FIXME: 3rd parameter, if s_op->u.pipeline.out_size is used,
+	   client might complain about size mismatch */
+	ret = job_bmi_send(s_op->u.pipeline.address,
+			   s_op->u.pipeline.buffer,
+			   js_p->actual_size, 
+			   s_op->u.pipeline.tag,
+			   BMI_PRE_ALLOC,
+			   0, /* send_unexpected */
+			   smcb, /* user_ptr */
+			   0, /* status_user_tag */
+			   js_p,
+			   &tmp_id,
+			   server_job_context,
+			   server_config->server_job_bmi_timeout,
+			   (bmi_hint)s_op->u.pipeline.hints);
+    }
+    else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
+	/* figure out if the fs config has trove data sync turned on or off
+	 */
+	fs_config = PINT_config_find_fs_id(server_config, 
+					   s_op->u.pipeline.coll_id);
+	if(!fs_config) {
+	    gossip_err("write_io: Failed to get filesystem "
+		       "config from fs_id of: %d\n",
+		       s_op->u.pipeline.coll_id);
+	    js_p->error_code = -PVFS_EINVAL;
+	    return SM_ACTION_COMPLETE;
+	}
+
+	/* NOTE(review): the size array below points at js_p->actual_size,
+	 * and js_p is also the status output of this job -- confirm the
+	 * value cannot change while a deferred write is in flight */
+	ret = job_trove_bstream_write_list
+	    (s_op->u.pipeline.coll_id,
+	     s_op->u.pipeline.handle,
+	     (char **)&s_op->u.pipeline.buffer,
+	     (TROVE_size *)&js_p->actual_size,
+	     1,
+	     s_op->u.pipeline.offsets,
+	     s_op->u.pipeline.sizes,
+	     s_op->u.pipeline.segs,
+	     &s_op->u.pipeline.out_size,
+	     (fs_config->trove_sync_data ? TROVE_SYNC : 0),
+	     NULL,
+	     smcb,
+	     0,
+	     js_p,
+	     &tmp_id,
+	     server_job_context,
+	     s_op->u.pipeline.hints);
+    }
+
+    if(ret < 0) {
+	gossip_err("%s: I/O error occurred\n", __func__);
+	/* FIXME !!!!!!! */
+	/* handle_io_error(ret, q_item, flow_data); */
+	js_p->error_code = ret;
+	return SM_ACTION_COMPLETE;
+    }
+
+    /* ret == 0: the job was posted and has not completed yet */
+    if(ret == 0) {
+	js_p->error_code = ret;
+	return SM_ACTION_DEFERRED;
+    }
+
+    /* ret == 1: the job completed immediately */
+    if(ret == 1) {
+	js_p->error_code = ret;
+    }
+    return SM_ACTION_COMPLETE;
+}
+/* check_done: add this iteration's byte count to the parent I/O op, then select LOOP (fetch again) or done. */
+static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
+    js_p->error_code = 0; /* also overwrites whatever error_code dispatch_data left behind */
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+    /* total_transferred belongs to the parent op and is updated under the parent's mutex */
+    gen_mutex_lock(&parent_s_op->u.io.mutex);
+    if(s_op->u.pipeline.io_type == PVFS_IO_READ)
+	parent_s_op->u.io.total_transferred += js_p->actual_size; /* size from the job status passed in */
+    else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE)
+	parent_s_op->u.io.total_transferred += s_op->u.pipeline.out_size; /* out_size was handed to the trove write */
+    gen_mutex_unlock(&parent_s_op->u.io.mutex);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
+		 parent_s_op->u.io.total_transferred); /* NOTE(review): read after the mutex was released */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
+		 s_op->u.pipeline.segs);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%ld\n", __func__,
+		 js_p->actual_size);
+
+    /* FIXME: */
+    /* Loop back to fetch only while the segment pool is not done AND this
+       iteration has a non-zero segment count (segs != 0).  Without the second
+       condition the server starts a new read for a request that is already
+       done and falls into an infinite trove_read->bmi_send->check_done loop */
+    if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: LOOP\n", __func__);
+	js_p->error_code = LOOP; /* matches the "LOOP => fetch" transition */
+    }
+    else {
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__); /* error_code stays 0: "success => finished" */
+    }
+    
+    return SM_ACTION_COMPLETE;
+}
+/* epilog: action of the "finished" state; sets error_code to 0 and completes. */
+static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    js_p->error_code = 0;
+    /* smcb is not used by this action */
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * do_comp: apply the parent request's reduction op (MAX, MIN or SUM) to the
+ * MPI_INT or MPI_DOUBLE elements in this pipeline's buffer.  The chunk's
+ * result is left in buffer[0] and folded into parent->u.io.tmp_buffer, which
+ * is allocated if needed and zeroed while total_transferred is still 0.
+ */
+static void do_comp(struct PINT_server_op *s_op)
+{
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+
+    if(s_op->u.pipeline.buffer) {
+	PVFS_size i;
+	gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+		     "%s: buffer_used=%ld, op=0x%x, datatype=0x%x\n", 
+		     __func__, s_op->u.pipeline.buffer_used, 
+		     parent_s_op->u.io.op, 
+		     parent_s_op->u.io.datatype); /* AS */
+
+	switch(parent_s_op->u.io.datatype) {
+	case ((int)0x4c000405): /* MPI_INT */
+	    {
+		int *a = (int *)s_op->u.pipeline.buffer;
+		int result;
+		PVFS_size count = (s_op->u.pipeline.buffer_used)/((*PVFS_INT).ub);
+		int *tmp;
+		if (parent_s_op->u.io.total_transferred == 0 &&
+		    parent_s_op->u.io.tmp_buffer == NULL)
+		    parent_s_op->u.io.tmp_buffer = malloc(sizeof(int));
+		tmp = parent_s_op->u.io.tmp_buffer;
+		if (tmp == NULL) { /* malloc failed, or no accumulator was set up */
+		    gossip_err("%s: no accumulator buffer\n", __func__);
+		    return;
+		}
+		if (parent_s_op->u.io.total_transferred == 0)
+		    *tmp = 0; /* first chunk: start from a clean accumulator */
+		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%ld\n", parent_s_op->u.io.total_transferred);
+		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%ld, tmp=%d\n", count, *tmp);
+
+		switch(parent_s_op->u.io.op) {
+		case 0x58000001: /* MAX */
+		    result = *a;
+		    for (i=1; i<count; i++ ) {
+			if (a[i] > result)
+			    result = a[i];
+		    }
+		    a[0] = result;
+		    if (parent_s_op->u.io.total_transferred == 0 ||
+			result > *tmp)
+			*tmp = result;
+		    break;
+		case 0x58000002: /* MIN */
+		    result = *a;
+		    for (i=1; i<count; i++ ) {
+			if (a[i] < result)
+			    result = a[i];
+		    }
+		    a[0] = result;
+		    if (parent_s_op->u.io.total_transferred == 0 ||
+			result < *tmp)
+			*tmp = result;
+		    break;
+		case 0x58000003: /* SUM */
+		    result = 0;
+		    for (i=0; i<count; i++ ) {
+			if (i<10) 
+			    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%ld]=%d\n", 
+					 i, a[i]);
+			result += a[i];
+		    }
+		    a[0] = result;
+		    *tmp += result;
+		    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%d\n", 
+				 *tmp);
+		    break;
+		default:
+		    break;
+		}
+	    }
+	    break;
+
+	case ((int)0x4c00080b): /* MPI_DOUBLE */
+	    {
+		double *a = (double *)s_op->u.pipeline.buffer;
+		double result;
+		PVFS_size count = (s_op->u.pipeline.buffer_used)/((*PVFS_DOUBLE).ub);
+		double *tmp;
+		if (parent_s_op->u.io.total_transferred == 0 &&
+		    parent_s_op->u.io.tmp_buffer == NULL)
+		    parent_s_op->u.io.tmp_buffer = malloc(sizeof(double));
+		tmp = parent_s_op->u.io.tmp_buffer;
+		if (tmp == NULL) { /* malloc failed, or no accumulator was set up */
+		    gossip_err("%s: no accumulator buffer\n", __func__);
+		    return;
+		}
+		if (parent_s_op->u.io.total_transferred == 0)
+		    *tmp = 0.0; /* first chunk: start from a clean accumulator */
+		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, 
+			     "total_transferred=%ld\n",
+			     parent_s_op->u.io.total_transferred);
+		gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%ld, tmp=%lf\n", 
+			     count, *tmp);
+		switch(parent_s_op->u.io.op) {
+		case 0x58000001: /* MAX */
+		    result = *a;
+		    for (i=1; i<count; i++ ) {
+			if (a[i] > result)
+			    result = a[i];
+		    }
+		    a[0] = result;
+		    if (parent_s_op->u.io.total_transferred == 0 ||
+			result > *tmp)
+			*tmp = result;
+		    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *tmp);
+		    break;
+		case 0x58000002: /* MIN */
+		    result = *a;
+		    for (i=1; i<count; i++ ) {
+			if (a[i] < result)
+			    result = a[i];
+		    }
+		    a[0] = result;
+		    if (parent_s_op->u.io.total_transferred == 0 ||
+			result < *tmp)
+			*tmp = result;
+		    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n", 
+				 *tmp);
+		    break;
+		case 0x58000003: /* SUM */
+		    result = 0;
+		    for (i=0; i<count; i++ ) {
+			if (i<10) gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%ld]=%lf\n", i, a[i]);
+			result += a[i];
+		    }
+		    a[0] = result;
+		    *tmp += result;
+		    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%lf\n", 
+				 *tmp);
+		    break;    
+		default:
+		    break;
+		} /* end inner switch */
+	    }
+	    break;
+	default:
+	    break;
+	} /* end switch() */
+    } /* end if() */
+}
+
+/*
+ * Local variables:
+ *  mode: c
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */

Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.6 -r1.73.6.7
--- io.sm	28 Apr 2009 16:17:00 -0000	1.73.6.6
+++ io.sm	28 Apr 2009 21:36:02 -0000	1.73.6.7
@@ -19,12 +19,12 @@
 #include "pint-request.h"
 #include "pvfs2-internal.h"
 #include "pint-segpool.h"
-#include "pipeline.h"
+//#include "pipeline.h"
 
-extern struct PINT_state_machine_s pvfs2_read_sm;
-extern struct PINT_state_machine_s pvfs2_write_sm;
+extern struct PINT_state_machine_s pvfs2_pipeline_sm;
 
-#define NUM_OF_PARALLEL_SMS 8
+#define NUM_OF_PARALLEL_SMS 4
+#define BUFFER_SIZE (256*1024)
 
 %%
 
@@ -54,8 +54,7 @@ machine pvfs2_io_sm
     {
 	pjmp start_pipelining_sm
         {
-            DO_READ => pvfs2_read_sm;
-	    DO_WRITE => pvfs2_write_sm;
+            success => pvfs2_pipeline_sm;
         }
         success => send_completion_ack;
         default => release;
@@ -268,6 +267,7 @@ static PINT_sm_action start_pipelining_s
 	pipeline_op->u.pipeline.hints = s_op->req->hints;
 	pipeline_op->u.pipeline.tag = s_op->tag;
 	pipeline_op->u.pipeline.buffer_size = BUFFER_SIZE;
+	pipeline_op->u.pipeline.io_type = s_op->req->u.io.io_type;
 	if(s_op->req->u.io.io_type == PVFS_IO_READ) {
 	    if(!pipeline_op->u.pipeline.buffer) {
 		/* if the buffer has not been used, allocate a buffer */
@@ -279,7 +279,7 @@ static PINT_sm_action start_pipelining_s
 		assert(pipeline_op->u.pipeline.buffer);
 	    }
 
-	    ret = PINT_sm_push_frame(smcb, DO_READ, pipeline_op);
+	    ret = PINT_sm_push_frame(smcb, 0, pipeline_op);
 	    s_op->u.io.parallel_sms++;
 	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_sms=%d\n", 
 			 __func__, s_op->u.io.parallel_sms);
@@ -295,7 +295,7 @@ static PINT_sm_action start_pipelining_s
 		assert(pipeline_op->u.pipeline.buffer);
 	    }
 
-	    ret = PINT_sm_push_frame(smcb, DO_WRITE, pipeline_op);
+	    ret = PINT_sm_push_frame(smcb, 0, pipeline_op);
 	    s_op->u.io.parallel_sms++;
 	    gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_sms=%d\n", 
 			 __func__, s_op->u.io.parallel_sms);

Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/module.mk.in,v
diff -p -u -r1.55.6.1 -r1.55.6.2
--- module.mk.in	14 Apr 2009 20:19:50 -0000	1.55.6.1
+++ module.mk.in	28 Apr 2009 21:36:02 -0000	1.55.6.2
@@ -44,8 +44,7 @@ ifdef BUILD_SERVER
 		$(DIR)/unexpected.c \
 		$(DIR)/precreate-pool-refiller.c \
 		$(DIR)/unstuff.c \
-		$(DIR)/read.c \
-		$(DIR)/write.c
+		$(DIR)/pipeline.c
 
 	# c files that should be added to server library
 	SERVERSRC += \

Index: pipeline.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/pipeline.h,v
diff -p -u -r1.1.2.1 -r1.1.2.2
--- pipeline.h	28 Apr 2009 16:17:57 -0000	1.1.2.1
+++ pipeline.h	28 Apr 2009 21:36:02 -0000	1.1.2.2
@@ -5,7 +5,7 @@
  */
 #define BUFFER_SIZE (256*1024)
 
-enum {DO_READ=3, DO_WRITE, LOOP};
+enum {LOOP=3};
 
 #if 0
 

Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.4 -r1.155.6.5
--- pvfs2-server.h	28 Apr 2009 16:17:00 -0000	1.155.6.4
+++ pvfs2-server.h	28 Apr 2009 21:36:02 -0000	1.155.6.5
@@ -365,6 +365,8 @@ struct PINT_server_pipeline_op
     PVFS_handle handle;
     PVFS_BMI_addr_t address;
 
+    enum PVFS_io_type io_type;
+
     void *parent;
     char *buffer; 
     PVFS_size buffer_size;



More information about the Pvfs2-cvs mailing list