[Pvfs2-cvs] commit by sson in pvfs2/src/server: pipeline.sm io.sm
module.mk.in pipeline.h pvfs2-server.h
CVS commit program
cvs at parl.clemson.edu
Tue Apr 28 17:36:02 EDT 2009
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv27106/src/server
Modified Files:
Tag: as-branch
io.sm module.mk.in pipeline.h pvfs2-server.h
Added Files:
Tag: as-branch
pipeline.sm
Log Message:
Merged read.sm and write.sm into a single file, pipeline.sm.
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ pipeline.sm 2009-04-28 17:36:02.000000000 -0400
@@ -0,0 +1,501 @@
+/*
+ * (C) 2001 Clemson University and The University of Chicago
+ *
+ * See COPYING in top-level directory.
+ */
+
+/*
+ * PVFS2 server state machine for driving pipelined read and write I/O
+ * operations (replaces the former read.sm and write.sm).
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "pvfs2-request.h"
+#include "pint-distribution.h"
+#include "pint-request.h"
+#include "pvfs2-internal.h"
+//#include "pipeline.h"
+#include "trove.h"
+
+/* Applies the parent request's reduction op to a chunk of read data; defined below. */
+static void do_comp(struct PINT_server_op *);
+/*
+ * js_p->error_code value that check_done() sets to take the
+ * `LOOP => fetch` transition of the check state.  pipeline.h declares the
+ * same value as enum {LOOP=3}; that header is not included here, so keep
+ * the two definitions in sync.
+ */
+#define LOOP 3
+
+/*
+ * pvfs2_pipeline_sm: io.sm (start_pipelining_sm) pushes one frame per
+ * pipeline instance and jumps here.  Each instance repeats
+ * fetch -> dispatch -> check until check_done() stops returning LOOP,
+ * then runs epilog and returns.
+ *
+ * NOTE(review): fetch and dispatch leave on `default`, so a negative
+ * error_code from fetch_data()/dispatch_data() still proceeds to the next
+ * state, and check_done() overwrites error_code with 0 or LOOP.  I/O
+ * errors are therefore not reported to the parent machine -- confirm
+ * whether that is intended.
+ */
+%%
+
+nested machine pvfs2_pipeline_sm
+{
+ state fetch
+ {
+ run fetch_data;
+ default => dispatch;
+ }
+
+ state dispatch
+ {
+ run dispatch_data;
+ default => check;
+ }
+
+ state check
+ {
+ run check_done;
+ success => finished;
+ LOOP => fetch;
+ }
+
+ state finished
+ {
+ run epilog;
+ default => return;
+ }
+}
+
+%%
+
+/*
+ * fetch_data - first stage of one pipeline iteration.
+ *
+ * Takes the next batch of segments for this pipeline instance from the
+ * segment pool (PINT_segpool_take_segments) and starts the transfer that
+ * fills s_op->u.pipeline.buffer:
+ *   READ:  job_trove_bstream_read_list() reads the segments from TROVE;
+ *   WRITE: job_bmi_recv() receives the next chunk over BMI.
+ *
+ * js_p->error_code on return:
+ *   0   no segments were handed out (the per-iteration state is cleared so
+ *       dispatch_data()/check_done() see segs == 0), or the job was posted
+ *       and SM_ACTION_DEFERRED is returned;
+ *   1   the job completed immediately;
+ *   <0  missing configuration, unknown io_type, or job posting failure.
+ * The state machine moves on to `dispatch` for every code.
+ */
+static PINT_sm_action fetch_data(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.pipeline.id;
+    struct server_configuration_s *server_config = get_server_config_struct();
+    struct filesystem_configuration_s *fs_config;
+    int count = 0;
+    int ret;
+    PVFS_offset *offsets = NULL;
+    PVFS_size *sizes = NULL;
+    PVFS_size bytes;
+    job_id_t tmp_id;
+
+    s_op->u.pipeline.buffer_used = 0;
+    bytes = s_op->u.pipeline.buffer_size;
+
+    PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
+                               &offsets, &sizes);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: bytes=%lld, count=%d\n",
+                 __func__,
+                 (s_op->u.pipeline.io_type == PVFS_IO_READ ? "READ" : "WRITE"),
+                 (long long)bytes, count);
+
+    if(count == 0) {
+        /* Nothing left for this instance.  Clear the per-iteration state;
+         * otherwise dispatch_data() and check_done() would see the previous
+         * iteration's segment count and transfer sizes and act on them a
+         * second time. */
+        s_op->u.pipeline.segs = 0;
+        s_op->u.pipeline.out_size = 0;
+        js_p->actual_size = 0;
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    s_op->u.pipeline.buffer_used = bytes;
+    s_op->u.pipeline.offsets = offsets;
+    s_op->u.pipeline.sizes = sizes;
+    s_op->u.pipeline.segs = count;
+
+    /* both paths need the server config (sync flag / flow timeout) */
+    if(!server_config) {
+        gossip_err("%s: server config is NULL!\n", __func__);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
+        /* honour the file system's trove_sync_data setting */
+        fs_config = PINT_config_find_fs_id(server_config,
+                                           s_op->u.pipeline.coll_id);
+        if(!fs_config) {
+            gossip_err("%s: Failed to get filesystem "
+                       "config from fs_id of: %d\n", __func__,
+                       s_op->u.pipeline.coll_id);
+            js_p->error_code = -PVFS_EINVAL;
+            return SM_ACTION_COMPLETE;
+        }
+
+        ret = job_trove_bstream_read_list(
+            s_op->u.pipeline.coll_id,
+            s_op->u.pipeline.handle,
+            (char **)&s_op->u.pipeline.buffer,
+            (PVFS_size *)&s_op->u.pipeline.buffer_used,
+            1,
+            offsets,
+            sizes,
+            count,
+            &s_op->u.pipeline.out_size,
+            (fs_config->trove_sync_data ? TROVE_SYNC : 0),
+            NULL,
+            smcb,
+            0,
+            js_p,
+            &tmp_id,
+            server_job_context,
+            s_op->u.pipeline.hints);
+    }
+    else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
+        ret = job_bmi_recv(s_op->u.pipeline.address,
+                           (void *)s_op->u.pipeline.buffer,
+                           s_op->u.pipeline.buffer_size,
+                           s_op->u.pipeline.tag,
+                           BMI_PRE_ALLOC,
+                           smcb,
+                           1, /* status_user_tag */
+                           js_p,
+                           &tmp_id,
+                           server_job_context,
+                           server_config->server_job_flow_timeout,
+                           (bmi_hint)s_op->u.pipeline.hints);
+    }
+    else {
+        /* neither READ nor WRITE: there is no job to post */
+        gossip_err("%s: unknown io_type %d\n", __func__,
+                   (int)s_op->u.pipeline.io_type);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret < 0) {
+        gossip_err("%s: I/O error occurred\n", __func__);
+        /* FIXME: proper error handling (cf. handle_io_error()) */
+        js_p->error_code = -PVFS_EIO;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 1) {
+        /* the job completed immediately */
+        js_p->error_code = 1;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 0) {
+        /* the job was posted; wait for its completion */
+        js_p->error_code = 0;
+        return SM_ACTION_DEFERRED;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * dispatch_data - second stage of one pipeline iteration.
+ *
+ * Passes the data obtained by fetch_data() on to its destination:
+ *   READ:  job_bmi_send() of js_p->actual_size bytes to the client; when the
+ *          parent request carries a non-zero op, do_comp() reduces the
+ *          buffer instead and nothing is sent;
+ *   WRITE: job_trove_bstream_write_list() of js_p->actual_size bytes to the
+ *          segments recorded by fetch_data().
+ *
+ * Returns at once with error_code 0 when fetch_data() handed out no
+ * segments.  Otherwise error_code is 1 for immediate completion (including
+ * the reduction case), 0 together with SM_ACTION_DEFERRED for a posted job,
+ * and negative on error.
+ */
+static PINT_sm_action dispatch_data(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+    struct server_configuration_s *server_config = get_server_config_struct();
+    struct filesystem_configuration_s *fs_config;
+    int ret;
+    job_id_t tmp_id;
+
+    if(s_op->u.pipeline.segs == 0) {
+        js_p->error_code = 0;
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
+        return SM_ACTION_COMPLETE;
+    }
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: %s: buffer_used=%lld\n", __func__,
+                 (s_op->u.pipeline.io_type == PVFS_IO_READ ? "READ" : "WRITE"),
+                 (long long)s_op->u.pipeline.buffer_used);
+
+    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
+        assert(s_op->u.pipeline.buffer_used);
+
+        if(parent_s_op->u.io.op != 0) {
+            /* AS: a reduction op was requested -- reduce the chunk in place
+             * and skip sending it to the client. */
+            do_comp(s_op);
+            gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
+            js_p->error_code = 1;
+            return SM_ACTION_COMPLETE;
+        }
+    }
+
+    /* both remaining paths need the server config */
+    if(!server_config) {
+        gossip_err("%s: server config is NULL!\n", __func__);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
+        /* FIXME: what should be a proper value for status_user_tag? */
+        /* FIXME: sending s_op->u.pipeline.out_size bytes instead of
+         * js_p->actual_size might make the client complain about a size
+         * mismatch. */
+        ret = job_bmi_send(s_op->u.pipeline.address,
+                           s_op->u.pipeline.buffer,
+                           js_p->actual_size,
+                           s_op->u.pipeline.tag,
+                           BMI_PRE_ALLOC,
+                           0, /* send_unexpected */
+                           smcb, /* user_ptr */
+                           0, /* status_user_tag */
+                           js_p,
+                           &tmp_id,
+                           server_job_context,
+                           server_config->server_job_bmi_timeout,
+                           (bmi_hint)s_op->u.pipeline.hints);
+    }
+    else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
+        /* honour the file system's trove_sync_data setting */
+        fs_config = PINT_config_find_fs_id(server_config,
+                                           s_op->u.pipeline.coll_id);
+        if(!fs_config) {
+            gossip_err("%s: Failed to get filesystem "
+                       "config from fs_id of: %d\n", __func__,
+                       s_op->u.pipeline.coll_id);
+            js_p->error_code = -PVFS_EINVAL;
+            return SM_ACTION_COMPLETE;
+        }
+
+        ret = job_trove_bstream_write_list(
+            s_op->u.pipeline.coll_id,
+            s_op->u.pipeline.handle,
+            (char **)&s_op->u.pipeline.buffer,
+            (TROVE_size *)&js_p->actual_size,
+            1,
+            s_op->u.pipeline.offsets,
+            s_op->u.pipeline.sizes,
+            s_op->u.pipeline.segs,
+            &s_op->u.pipeline.out_size,
+            (fs_config->trove_sync_data ? TROVE_SYNC : 0),
+            NULL,
+            smcb,
+            0,
+            js_p,
+            &tmp_id,
+            server_job_context,
+            s_op->u.pipeline.hints);
+    }
+    else {
+        /* neither READ nor WRITE: there is no job to post */
+        gossip_err("%s: unknown io_type %d\n", __func__,
+                   (int)s_op->u.pipeline.io_type);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret < 0) {
+        gossip_err("%s: I/O error occurred\n", __func__);
+        /* FIXME: proper error handling (cf. handle_io_error()) */
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 1) {
+        /* the job completed immediately */
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 0) {
+        /* the job was posted; wait for its completion */
+        js_p->error_code = ret;
+        return SM_ACTION_DEFERRED;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * check_done - last stage of one pipeline iteration.
+ *
+ * Adds the bytes moved in this iteration to the parent's total_transferred
+ * while holding the parent's io mutex: js_p->actual_size for READ, the
+ * TROVE out_size for WRITE.  An iteration that got no segments adds
+ * nothing.  Sets error_code to LOOP (-> `fetch`) when the segment pool is
+ * not yet done and this iteration had segments; otherwise 0 (-> `finished`).
+ *
+ * NOTE(review): any error left in js_p->error_code by the previous states
+ * is overwritten here, so it never reaches the parent machine.
+ */
+static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+    PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
+    PVFS_size total;
+
+    js_p->error_code = 0;
+
+    gen_mutex_lock(&parent_s_op->u.io.mutex);
+    if(s_op->u.pipeline.segs != 0) {
+        if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
+            parent_s_op->u.io.total_transferred += js_p->actual_size;
+        }
+        else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
+            parent_s_op->u.io.total_transferred += s_op->u.pipeline.out_size;
+        }
+    }
+    /* snapshot under the lock so the debug output below does not race */
+    total = parent_s_op->u.io.total_transferred;
+    gen_mutex_unlock(&parent_s_op->u.io.mutex);
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%lld\n", __func__,
+                 (long long)total);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
+                 s_op->u.pipeline.segs);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%lld\n", __func__,
+                 (long long)js_p->actual_size);
+
+    /* FIXME: without the segs != 0 condition the server starts a new read
+     * request on one that is already done and falls into an infinite
+     * trove_read -> bmi_send -> check_done loop. */
+    if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: LOOP\n", __func__);
+        js_p->error_code = LOOP;
+    }
+    else {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__);
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * epilog - the `finished` state.
+ *
+ * Always clears js_p->error_code before the machine takes its
+ * `default => return` transition.  The control block is not consulted.
+ */
+static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    (void)smcb;
+    js_p->error_code = 0;
+    return SM_ACTION_COMPLETE;
+}
+
+/* Numeric values of the MPICH MPI_Datatype / MPI_Op handles that do_comp()
+ * recognises in parent_s_op->u.io.datatype and parent_s_op->u.io.op. */
+enum {
+    AS_MPI_INT    = 0x4c000405,
+    AS_MPI_DOUBLE = 0x4c00080b,
+    AS_MPI_MAX    = 0x58000001,
+    AS_MPI_MIN    = 0x58000002,
+    AS_MPI_SUM    = 0x58000003
+};
+
+/*
+ * Return the parent's running reduction result (elem_size bytes).  The
+ * buffer is allocated if missing and zeroed on the first chunk of the
+ * request (total_transferred == 0).  Returns NULL, after logging, when no
+ * buffer is available.
+ */
+static void *do_comp_accumulator(struct PINT_server_op *parent_s_op,
+                                 size_t elem_size)
+{
+    int first_chunk = (parent_s_op->u.io.total_transferred == 0);
+
+    if(first_chunk && parent_s_op->u.io.tmp_buffer == NULL) {
+        parent_s_op->u.io.tmp_buffer = malloc(elem_size);
+    }
+
+    if(parent_s_op->u.io.tmp_buffer == NULL) {
+        gossip_err("%s: no buffer for the reduction result\n", __func__);
+        return NULL;
+    }
+
+    if(first_chunk) {
+        memset(parent_s_op->u.io.tmp_buffer, 0, elem_size);
+    }
+
+    return parent_s_op->u.io.tmp_buffer;
+}
+
+/*
+ * do_comp - reduce the chunk just read, in place.
+ *
+ * Called by dispatch_data() on the READ path when the parent request has a
+ * non-zero op.  The buffer_used bytes in s_op->u.pipeline.buffer are
+ * treated as an array of int (AS_MPI_INT) or double (AS_MPI_DOUBLE) and
+ * reduced with MAX, MIN or SUM.
+ *
+ * The chunk's result is written to the first element of the buffer and
+ * folded into the running result in parent_s_op->u.io.tmp_buffer.  The
+ * first chunk of the request (total_transferred == 0) seeds MAX/MIN.
+ * Other datatypes and ops leave the buffer unchanged.
+ */
+static void do_comp(struct PINT_server_op *s_op)
+{
+    struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+    PVFS_size i;
+
+    if(!s_op->u.pipeline.buffer) {
+        return;
+    }
+
+    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                 "%s: buffer_used=%lld, op=0x%x, datatype=0x%x\n",
+                 __func__, (long long)s_op->u.pipeline.buffer_used,
+                 parent_s_op->u.io.op,
+                 parent_s_op->u.io.datatype); /* AS */
+
+    switch(parent_s_op->u.io.datatype) {
+    case AS_MPI_INT:
+    {
+        /* the pipeline buffer is a char *; view it as an int array */
+        int *a = (int *)s_op->u.pipeline.buffer;
+        PVFS_size count = s_op->u.pipeline.buffer_used / PVFS_INT->ub;
+        int *tmp = do_comp_accumulator(parent_s_op, sizeof(int));
+        int result;
+
+        if(tmp == NULL) {
+            return;
+        }
+
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%lld\n",
+                     (long long)parent_s_op->u.io.total_transferred);
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%d\n",
+                     (long long)count, *tmp);
+
+        switch(parent_s_op->u.io.op) {
+        case AS_MPI_MAX:
+            result = a[0];
+            for(i = 1; i < count; i++) {
+                if(a[i] > result) {
+                    result = a[i];
+                }
+            }
+            a[0] = result;
+            if(parent_s_op->u.io.total_transferred == 0 || result > *tmp) {
+                *tmp = result;
+            }
+            break;
+        case AS_MPI_MIN:
+            result = a[0];
+            for(i = 1; i < count; i++) {
+                if(a[i] < result) {
+                    result = a[i];
+                }
+            }
+            a[0] = result;
+            if(parent_s_op->u.io.total_transferred == 0 || result < *tmp) {
+                *tmp = result;
+            }
+            break;
+        case AS_MPI_SUM:
+            /* NOTE(review): int overflow is not detected here */
+            result = 0;
+            for(i = 0; i < count; i++) {
+                if(i < 10) {
+                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%d\n",
+                                 (long long)i, a[i]);
+                }
+                result += a[i];
+            }
+            a[0] = result;
+            *tmp += result;
+            gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%d\n", *tmp);
+            break;
+        default:
+            break;
+        }
+        break;
+    }
+
+    case AS_MPI_DOUBLE:
+    {
+        /* the pipeline buffer is a char *; view it as a double array */
+        double *a = (double *)s_op->u.pipeline.buffer;
+        PVFS_size count = s_op->u.pipeline.buffer_used / PVFS_DOUBLE->ub;
+        double *tmp = do_comp_accumulator(parent_s_op, sizeof(double));
+        double result;
+
+        if(tmp == NULL) {
+            return;
+        }
+
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                     "total_transferred=%lld\n",
+                     (long long)parent_s_op->u.io.total_transferred);
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%lf\n",
+                     (long long)count, *tmp);
+
+        switch(parent_s_op->u.io.op) {
+        case AS_MPI_MAX:
+            result = a[0];
+            for(i = 1; i < count; i++) {
+                if(a[i] > result) {
+                    result = a[i];
+                }
+            }
+            a[0] = result;
+            if(parent_s_op->u.io.total_transferred == 0 || result > *tmp) {
+                *tmp = result;
+            }
+            gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *tmp);
+            break;
+        case AS_MPI_MIN:
+            result = a[0];
+            for(i = 1; i < count; i++) {
+                if(a[i] < result) {
+                    result = a[i];
+                }
+            }
+            a[0] = result;
+            if(parent_s_op->u.io.total_transferred == 0 || result < *tmp) {
+                *tmp = result;
+            }
+            gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n", *tmp);
+            break;
+        case AS_MPI_SUM:
+            result = 0;
+            for(i = 0; i < count; i++) {
+                if(i < 10) {
+                    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%lf\n",
+                                 (long long)i, a[i]);
+                }
+                result += a[i];
+            }
+            a[0] = result;
+            *tmp += result;
+            gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%lf\n", *tmp);
+            break;
+        default:
+            break;
+        }
+        break;
+    }
+
+    default:
+        break;
+    }
+}
+
+/*
+ * Local variables:
+ * mode: c
+ * c-indent-level: 4
+ * c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */
Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.6 -r1.73.6.7
--- io.sm 28 Apr 2009 16:17:00 -0000 1.73.6.6
+++ io.sm 28 Apr 2009 21:36:02 -0000 1.73.6.7
@@ -19,12 +19,12 @@
#include "pint-request.h"
#include "pvfs2-internal.h"
#include "pint-segpool.h"
-#include "pipeline.h"
+//#include "pipeline.h"
-extern struct PINT_state_machine_s pvfs2_read_sm;
-extern struct PINT_state_machine_s pvfs2_write_sm;
+extern struct PINT_state_machine_s pvfs2_pipeline_sm;
-#define NUM_OF_PARALLEL_SMS 8
+#define NUM_OF_PARALLEL_SMS 4
+#define BUFFER_SIZE (256*1024)
%%
@@ -54,8 +54,7 @@ machine pvfs2_io_sm
{
pjmp start_pipelining_sm
{
- DO_READ => pvfs2_read_sm;
- DO_WRITE => pvfs2_write_sm;
+ success => pvfs2_pipeline_sm;
}
success => send_completion_ack;
default => release;
@@ -268,6 +267,7 @@ static PINT_sm_action start_pipelining_s
pipeline_op->u.pipeline.hints = s_op->req->hints;
pipeline_op->u.pipeline.tag = s_op->tag;
pipeline_op->u.pipeline.buffer_size = BUFFER_SIZE;
+ pipeline_op->u.pipeline.io_type = s_op->req->u.io.io_type;
if(s_op->req->u.io.io_type == PVFS_IO_READ) {
if(!pipeline_op->u.pipeline.buffer) {
/* if the buffer has not been used, allocate a buffer */
@@ -279,7 +279,7 @@ static PINT_sm_action start_pipelining_s
assert(pipeline_op->u.pipeline.buffer);
}
- ret = PINT_sm_push_frame(smcb, DO_READ, pipeline_op);
+ ret = PINT_sm_push_frame(smcb, 0, pipeline_op);
s_op->u.io.parallel_sms++;
gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_sms=%d\n",
__func__, s_op->u.io.parallel_sms);
@@ -295,7 +295,7 @@ static PINT_sm_action start_pipelining_s
assert(pipeline_op->u.pipeline.buffer);
}
- ret = PINT_sm_push_frame(smcb, DO_WRITE, pipeline_op);
+ ret = PINT_sm_push_frame(smcb, 0, pipeline_op);
s_op->u.io.parallel_sms++;
gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_sms=%d\n",
__func__, s_op->u.io.parallel_sms);
Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/module.mk.in,v
diff -p -u -r1.55.6.1 -r1.55.6.2
--- module.mk.in 14 Apr 2009 20:19:50 -0000 1.55.6.1
+++ module.mk.in 28 Apr 2009 21:36:02 -0000 1.55.6.2
@@ -44,8 +44,7 @@ ifdef BUILD_SERVER
$(DIR)/unexpected.c \
$(DIR)/precreate-pool-refiller.c \
$(DIR)/unstuff.c \
- $(DIR)/read.c \
- $(DIR)/write.c
+ $(DIR)/pipeline.c
# c files that should be added to server library
SERVERSRC += \
Index: pipeline.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/pipeline.h,v
diff -p -u -r1.1.2.1 -r1.1.2.2
--- pipeline.h 28 Apr 2009 16:17:57 -0000 1.1.2.1
+++ pipeline.h 28 Apr 2009 21:36:02 -0000 1.1.2.2
@@ -5,7 +5,7 @@
*/
#define BUFFER_SIZE (256*1024)
-enum {DO_READ=3, DO_WRITE, LOOP};
+enum {LOOP=3};
#if 0
Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.4 -r1.155.6.5
--- pvfs2-server.h 28 Apr 2009 16:17:00 -0000 1.155.6.4
+++ pvfs2-server.h 28 Apr 2009 21:36:02 -0000 1.155.6.5
@@ -365,6 +365,8 @@ struct PINT_server_pipeline_op
PVFS_handle handle;
PVFS_BMI_addr_t address;
+ enum PVFS_io_type io_type;
+
void *parent;
char *buffer;
PVFS_size buffer_size;
More information about the Pvfs2-cvs
mailing list