[Pvfs2-cvs] commit by sson in pvfs2/src/server: read.sm io.sm module.mk.in pvfs2-server.h

CVS commit program cvs at parl.clemson.edu
Tue Apr 14 16:19:50 EDT 2009


Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv13438/src/server

Modified Files:
      Tag: as-branch
	io.sm module.mk.in pvfs2-server.h 
Added Files:
      Tag: as-branch
	read.sm 
Log Message:



--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ read.sm	2009-04-14 16:19:50.000000000 -0400
@@ -0,0 +1,513 @@
+/* 
+ * (C) 2001 Clemson University and The University of Chicago 
+ *
+ * See COPYING in top-level directory.
+ */
+
+/*
+ *  PVFS2 server state machine for driving read I/O operations.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "pvfs2-request.h"
+#include "pint-distribution.h"
+#include "pint-request.h"
+#include "pvfs2-internal.h"
+#include "rw-sm.h"
+#include "trove.h"
+
+static void do_comp(struct fp_queue_item *);
+
+/*
+ * pvfs2_read_sm: nested machine pushed by io.sm for PVFS_IO_READ.
+ * trove_read posts a trove list read for the next batch of segments,
+ * bmi_send forwards (or reduces) that data, and check either loops back
+ * to trove_read (LOOP) or finishes.  check also carries a default
+ * transition so that a return code other than success/LOOP still has
+ * somewhere to go.
+ */
+%%
+
+nested machine pvfs2_read_sm
+{
+    state trove_read
+    {
+        run trove_read_call;
+        default => bmi_send;
+    }
+
+    state bmi_send
+    {
+        run bmi_send_call;
+        default => check;
+    }
+
+    state check
+    {
+        run check_done;
+        success => finished;
+        LOOP => trove_read;
+        default => finished;
+    }
+
+    state finished
+    {
+        run epilog;
+        default => return;
+    }
+}
+
+%%
+
+/*
+ * trove_read_call: first state of pvfs2_read_sm.
+ *
+ * Takes the next batch of file segments (at most one buffer's worth) from
+ * the segment pool and posts job_trove_bstream_read_list() to read them
+ * into q_item->buffer.  The buffer is allocated lazily on first use.
+ *
+ * On return js_p->error_code is:
+ *   0    - the segment pool had nothing left (nothing posted; segs == 0);
+ *   <0   - buffer allocation or posting the read failed;
+ *   else - the status filled in by the job layer on immediate completion.
+ * When the read is posted and pending, SM_ACTION_DEFERRED is returned.
+ */
+static int trove_read_call(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
+    struct server_configuration_s *user_opts = get_server_config_struct();
+    struct filesystem_configuration_s *fs_config;
+    PINT_segpool_handle_t seg_handle = s_op->u.flow_read.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.flow_read.id;
+    PVFS_offset *offsets;
+    PVFS_size *sizes;
+    PVFS_size bytes;
+    int count;
+    int sync_flag = 0;
+    int ret;
+    /* must be a job_id_t: the job layer stores a full job id through it */
+    job_id_t tmp_id;
+
+    /* reset per-round state so an early error below leaves "nothing read"
+     * behind for bmi_send_call */
+    q_item->buffer_used = 0;
+    s_op->u.flow_read.segs = 0;
+
+    if(!q_item->buffer) {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: !q_item->buffer\n", __func__);
+        /* first use of this q_item: allocate a BMI send buffer */
+        q_item->buffer = BMI_memalloc(
+            q_item->parent->dest.u.bmi.address,
+            q_item->parent->buffer_size, BMI_SEND);
+        if(!q_item->buffer) {
+            gossip_err("%s: BMI_memalloc() failed\n", __func__);
+            js_p->error_code = -PVFS_ENOMEM;
+            return SM_ACTION_COMPLETE;
+        }
+    }
+
+    bytes = q_item->parent->buffer_size;
+    PINT_segpool_take_segments(seg_handle, id, &bytes, &count, &offsets, &sizes);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%lld, count=%d\n", __func__,
+                 (long long)bytes, count);
+
+    q_item->buffer_used = bytes;
+    s_op->u.flow_read.offsets = offsets;
+    s_op->u.flow_read.sizes = sizes;
+    s_op->u.flow_read.segs = count;
+
+    if(count == 0) {
+        /* no segments handed out this round; nothing to read */
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* pick up the data-sync setting of the file system being read */
+    fs_config = PINT_config_find_fs_id(user_opts,
+                                       q_item->parent->src.u.trove.coll_id);
+    if(fs_config)
+        sync_flag = fs_config->trove_sync_data;
+
+    ret = job_trove_bstream_read_list(
+        q_item->parent->src.u.trove.coll_id,
+        q_item->parent->src.u.trove.handle,
+        &q_item->buffer,
+        &q_item->buffer_used,
+        1,
+        offsets,
+        sizes,
+        count,
+        &q_item->out_size,
+        sync_flag,
+        NULL,
+        smcb,
+        0,
+        js_p,
+        &tmp_id,
+        server_job_context,
+        q_item->parent->hints);
+
+    if(ret < 0) {
+        gossip_err("%s: I/O error occurred\n", __func__);
+        /* FIXME: handle_io_error() is not wired into this state machine
+         * yet.  Report the error through the job status rather than
+         * deferring with no job posted, which would never resume. */
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 1) {
+        /* immediate completion: js_p was filled in by the job layer
+         * (including any posting error), so pass it through unchanged */
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 0) {
+        /* read posted; the machine resumes when the job completes */
+        return SM_ACTION_DEFERRED;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * bmi_send_call: second state of pvfs2_read_sm.
+ *
+ * Runs the optional reduction (do_comp) over the data trove_read_call
+ * placed in q_item->buffer.  When no reduction op was requested, it posts
+ * a BMI send of that buffer to the client.  When an op was requested, the
+ * raw data is not sent; the reduced value goes back in the completion ack.
+ *
+ * A negative status arriving from trove_read is passed through unchanged
+ * and nothing is sent.  Returns SM_ACTION_DEFERRED while a send is pending.
+ */
+static int bmi_send_call(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
+    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+    struct server_configuration_s *user_opts = get_server_config_struct();
+    job_id_t tmp_id;
+    int ret;
+
+    if(js_p->error_code < 0) {
+        /* the read failed; do not ship whatever is left in the buffer */
+        gossip_err("%s: trove read failed: %d\n", __func__,
+                   (int)js_p->error_code);
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* nothing was read this round */
+    if(s_op->u.flow_read.segs == 0 && q_item->buffer_used == 0) {
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* the buffer holds raw file data (not a C string), so only its
+     * length is logged */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%lld\n", __func__,
+                 (long long)q_item->buffer_used);
+
+    /* fold this chunk into the running reduction, if any */
+    do_comp(q_item);
+
+    assert(q_item->buffer_used);
+
+    if(flow_data->parent->op != 0) {
+        /* AS: when op is specified the raw data is not sent */
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    ret = job_bmi_send(q_item->parent->dest.u.bmi.address,
+                       q_item->buffer,
+                       q_item->buffer_used,
+                       q_item->parent->tag,
+                       BMI_PRE_ALLOC,
+                       0, /* send_unexpected */
+                       smcb, /* user_ptr */
+                       0, /* status_user_tag */
+                       js_p,
+                       &tmp_id,
+                       server_job_context,
+                       user_opts->server_job_flow_timeout,
+                       (bmi_hint)q_item->parent->hints);
+
+    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                 "%s: (post send time) ini posts: %d, pending: %d, last: %d\n",
+                 __func__,
+                 flow_data->initial_posts, flow_data->dest_pending,
+                 flow_data->dest_last_posted);
+
+    if(ret < 0) {
+        gossip_err("%s: I/O error occurred\n", __func__);
+        /* FIXME: handle_io_error(ret, q_item, flow_data); */
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 1) {
+        /* immediate completion: keep the status the job layer reported */
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 0) {
+        /* send posted; the machine resumes when the job completes */
+        return SM_ACTION_DEFERRED;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * check_done: third state of pvfs2_read_sm.
+ *
+ * Adds this round's bytes to the flow totals, then either loops back to
+ * trove_read (js_p->error_code = LOOP) or finishes (js_p->error_code = 0).
+ * The flow is finished when the segment pool reports done, or when the
+ * read/send that just completed failed, since looping would only repeat
+ * the failure.  In both cases the flow is marked FLOW_COMPLETE and
+ * FLOW_CLEANUP() is invoked.  On failure, the error is recorded in the
+ * flow descriptor's error_code.
+ */
+static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
+    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+    PINT_segpool_handle_t h = s_op->u.flow_read.seg_handle;
+    int status = js_p->error_code;
+
+    js_p->error_code = 0;
+
+    if(status < 0) {
+        gossip_err("%s: read pipeline failed: %d\n", __func__, status);
+        if(flow_data->parent->error_code == 0)
+            flow_data->parent->error_code = status;
+        flow_data->parent->state = FLOW_COMPLETE;
+        FLOW_CLEANUP(flow_data);
+        return SM_ACTION_COMPLETE;
+    }
+
+    flow_data->total_bytes_processed += q_item->buffer_used;
+    q_item->parent->total_transferred += q_item->buffer_used;
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_bytes_processed=%lld\n", __func__,
+                 (long long)flow_data->total_bytes_processed);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
+                 s_op->u.flow_read.segs);
+    /* NOTE(review): %d assumes PINT_segpool_handle_t is an int -- confirm */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: handle=%d\n", __func__, h);
+
+    if(!segpool_done(h)) {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: read: LOOP\n", __func__);
+        js_p->error_code = LOOP;
+        return SM_ACTION_COMPLETE;
+    }
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: read: DONE\n", __func__);
+    flow_data->parent->state = FLOW_COMPLETE;
+    FLOW_CLEANUP(flow_data);
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * epilog: final state of pvfs2_read_sm.  Logs, clears the status code and
+ * completes, after which the machine takes its "return" transition.
+ */
+static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: read: \n", __func__);
+
+    js_p->error_code = 0;
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * Raw MPI handle values carried in the io request's datatype/op fields
+ * (io.sm compares against the same numbers).
+ * NOTE(review): these are MPICH's encodings of MPI_INT, MPI_DOUBLE,
+ * MPI_MAX, MPI_MIN and MPI_SUM; other MPI implementations differ.
+ */
+enum {
+    AS_DT_MPI_INT    = 0x4c000405,
+    AS_DT_MPI_DOUBLE = 0x4c00080b,
+    AS_OP_MAX        = 0x58000001,
+    AS_OP_MIN        = 0x58000002,
+    AS_OP_SUM        = 0x58000003
+};
+
+/*
+ * Reduce the count (> 0) ints in a[] with op, leave the chunk's result in
+ * a[0], and fold it into *acc.  first != 0 means *acc holds no earlier
+ * chunk's result: MAX/MIN then take the chunk result outright, while SUM
+ * relies on the caller having zeroed *acc.  Unknown ops are ignored.
+ */
+static void do_comp_int(int op, int *a, PVFS_size count, int *acc, int first)
+{
+    PVFS_size i;
+    int result = a[0];
+
+    switch(op) {
+    case AS_OP_MAX:
+        for(i = 1; i < count; i++)
+            if(a[i] > result)
+                result = a[i];
+        a[0] = result;
+        if(first || result > *acc)
+            *acc = result;
+        break;
+    case AS_OP_MIN:
+        for(i = 1; i < count; i++)
+            if(a[i] < result)
+                result = a[i];
+        a[0] = result;
+        if(first || result < *acc)
+            *acc = result;
+        break;
+    case AS_OP_SUM:
+    {
+        /* add in unsigned arithmetic so overflow wraps instead of being
+         * undefined; the conversion back to int is implementation-defined
+         * (wraps on two's-complement compilers) */
+        unsigned int sum = 0;
+
+        for(i = 0; i < count; i++) {
+            if(i < 10)
+                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%d\n",
+                             (long long)i, a[i]);
+            sum += (unsigned int)a[i];
+        }
+        a[0] = (int)sum;
+        *acc = (int)((unsigned int)*acc + sum);
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%d\n", *acc);
+        break;
+    }
+    default:
+        break;
+    }
+}
+
+/* Same contract as do_comp_int(), for doubles. */
+static void do_comp_double(int op, double *a, PVFS_size count, double *acc,
+                           int first)
+{
+    PVFS_size i;
+    double result = a[0];
+
+    switch(op) {
+    case AS_OP_MAX:
+        for(i = 1; i < count; i++)
+            if(a[i] > result)
+                result = a[i];
+        a[0] = result;
+        if(first || result > *acc)
+            *acc = result;
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *acc);
+        break;
+    case AS_OP_MIN:
+        for(i = 1; i < count; i++)
+            if(a[i] < result)
+                result = a[i];
+        a[0] = result;
+        if(first || result < *acc)
+            *acc = result;
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n", *acc);
+        break;
+    case AS_OP_SUM:
+        result = 0;
+        for(i = 0; i < count; i++) {
+            if(i < 10)
+                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%lf\n",
+                             (long long)i, a[i]);
+            result += a[i];
+        }
+        a[0] = result;
+        *acc += result;
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%lf\n", *acc);
+        break;
+    default:
+        break;
+    }
+}
+
+/*
+ * do_comp: apply the io request's reduction op (if any) to the chunk just
+ * read into q_item->buffer.
+ *
+ * Only MPI_INT and MPI_DOUBLE data are handled; other datatypes are left
+ * untouched.  The running result across chunks lives in
+ * flow_data->parent->tmp_buffer.  It is allocated on demand and reset on
+ * the flow's first chunk (total_transferred == 0).  The chunk's own result
+ * overwrites the first element of q_item->buffer.
+ */
+static void do_comp(struct fp_queue_item *q_item)
+{
+    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+    int first;
+
+    if(!q_item->buffer)
+        return;
+
+    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                 "%s: q_item->buffer_used=%lld, op=0x%x, datatype=0x%x\n",
+                 __func__, (long long)q_item->buffer_used,
+                 flow_data->parent->op, flow_data->parent->datatype); /* AS */
+
+    /* check_done() has not accounted any bytes yet: start a new result */
+    first = (flow_data->parent->total_transferred == 0);
+
+    switch(flow_data->parent->datatype) {
+    case AS_DT_MPI_INT:
+    {
+        PVFS_size count = q_item->buffer_used / (*PVFS_INT).ub;
+        int *acc;
+
+        if(flow_data->parent->tmp_buffer == NULL) {
+            flow_data->parent->tmp_buffer = malloc(sizeof(int));
+            if(flow_data->parent->tmp_buffer == NULL) {
+                gossip_err("%s: cannot allocate reduction result\n", __func__);
+                return;
+            }
+            /* no stored result exists to compare against */
+            first = 1;
+        }
+        acc = flow_data->parent->tmp_buffer;
+        if(first)
+            *acc = 0;
+
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                     "total_transferred=%lld, total_bytes_processed=%lld\n",
+                     (long long)flow_data->parent->total_transferred,
+                     (long long)flow_data->total_bytes_processed);
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%d\n",
+                     (long long)count, *acc);
+
+        /* a chunk shorter than one element has nothing to reduce */
+        if(count > 0)
+            do_comp_int(flow_data->parent->op, q_item->buffer, count, acc,
+                        first);
+        break;
+    }
+    case AS_DT_MPI_DOUBLE:
+    {
+        PVFS_size count = q_item->buffer_used / (*PVFS_DOUBLE).ub;
+        double *acc;
+
+        if(flow_data->parent->tmp_buffer == NULL) {
+            flow_data->parent->tmp_buffer = malloc(sizeof(double));
+            if(flow_data->parent->tmp_buffer == NULL) {
+                gossip_err("%s: cannot allocate reduction result\n", __func__);
+                return;
+            }
+            /* no stored result exists to compare against */
+            first = 1;
+        }
+        acc = flow_data->parent->tmp_buffer;
+        if(first)
+            *acc = 0.0;
+
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                     "total_transferred=%lld, total_bytes_processed=%lld\n",
+                     (long long)flow_data->parent->total_transferred,
+                     (long long)flow_data->total_bytes_processed);
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%lf\n",
+                     (long long)count, *acc);
+
+        /* a chunk shorter than one element has nothing to reduce */
+        if(count > 0)
+            do_comp_double(flow_data->parent->op, q_item->buffer, count, acc,
+                           first);
+        break;
+    }
+    default:
+        break;
+    }
+}
+
+/*
+ * Local variables:
+ *  mode: c
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */

Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73 -r1.73.6.1
--- io.sm	20 Nov 2008 01:17:10 -0000	1.73
+++ io.sm	14 Apr 2009 20:19:50 -0000	1.73.6.1
@@ -18,6 +18,11 @@
 #include "pint-distribution.h"
 #include "pint-request.h"
 #include "pvfs2-internal.h"
+#include "rw-sm.h"
+#include "pint-segpool.h"
+
+extern struct PINT_state_machine_s pvfs2_read_sm;
+extern struct PINT_state_machine_s pvfs2_write_sm;
 
 %%
 
@@ -33,7 +38,7 @@ machine pvfs2_io_sm
     state send_positive_ack
     {
         run io_send_ack;
-        success => start_flow;
+        success => start_pipelining;
         default => release;
     }
 
@@ -43,9 +48,13 @@ machine pvfs2_io_sm
         default => release;
     }
 
-    state start_flow
+    state start_pipelining
     {
-        run io_start_flow;
+	pjmp start_pipeline_sm
+        {
+            DO_READ => pvfs2_read_sm;
+	    DO_WRITE => pvfs2_write_sm;
+        }
         success => send_completion_ack;
         default => release;
     }
@@ -106,6 +115,8 @@ static int io_send_ack(
 
     err = PINT_encode(&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
                       s_op->addr, s_op->decoded.enc_type);
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: error=%d\n", __func__, err);
     if (err < 0)
     {
         gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
@@ -124,7 +135,7 @@ static int io_send_ack(
 }
 
 /*
- * Function: io_start_flow()
+ * Function: start_pipeline_sm()
  *
  * Params:   server_op *s_op, 
  *           job_status_s* js_p
@@ -141,16 +152,21 @@ static int io_send_ack(
  *           carry out the data transfer
  *           
  */
-static PINT_sm_action io_start_flow(
+static PINT_sm_action start_pipeline_sm(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    int err = -PVFS_EIO;
-    job_id_t tmp_id;
+    //int err = -PVFS_EIO;
     struct server_configuration_s *user_opts = get_server_config_struct();
     struct filesystem_configuration_s *fs_conf;
+    struct PINT_server_op *flow_read_op;
+    struct PINT_server_op *flow_write_op;
+    int i, ret; 
+    struct fp_private_data *flow_data = NULL;
+    PINT_segpool_handle_t seg_handle;
         
     s_op->u.io.flow_d = PINT_flow_alloc();
+    
     if (!s_op->u.io.flow_d)
     {
         js_p->error_code = -PVFS_ENOMEM;
@@ -171,14 +187,14 @@ static PINT_sm_action io_start_flow(
     /* on writes, we allow the bstream to be extended at EOF */
     if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
     {
-        gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
-                     "write data.\n");
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: issuing pipelining to "
+                     "write data.\n", __func__);
         s_op->u.io.flow_d->file_data.extend_flag = 1;
     }
     else
     {
-        gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
-                     "read data.\n");
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: issuing pipelining to "
+                     "read data.\n", __func__);
         s_op->u.io.flow_d->file_data.extend_flag = 0;
     }
 
@@ -189,9 +205,15 @@ static PINT_sm_action io_start_flow(
     s_op->u.io.flow_d->tag = s_op->tag;
     s_op->u.io.flow_d->user_ptr = NULL;
     s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
+    s_op->u.io.flow_d->op = s_op->req->u.io.op; /* AS: operation */
+    s_op->u.io.flow_d->datatype = s_op->req->u.io.datatype; /* AS: dtype */
+    s_op->u.io.flow_d->dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */
+    
+    gossip_debug(GOSSIP_IO_DEBUG, "server_ct=%d\n", s_op->req->u.io.server_ct);
+    for(i=0; i<s_op->req->u.io.server_ct; i++)
+	gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%lu\n", i, s_op->u.io.flow_d->dfile_array[i]);
 
-    fs_conf = PINT_config_find_fs_id(user_opts, 
-        s_op->req->u.io.fs_id);
+    fs_conf = PINT_config_find_fs_id(user_opts, s_op->req->u.io.fs_id);
     if(fs_conf)
     {
         /* pick up any buffer settings overrides from fs conf */
@@ -235,11 +257,168 @@ static PINT_sm_action io_start_flow(
         return SM_ACTION_COMPLETE;
     }
 
-    err = job_flow(s_op->u.io.flow_d, smcb, 0, js_p, &tmp_id,
-                   server_job_context, user_opts->server_job_flow_timeout
-                   , s_op->req->hints);
+    flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
+    if(!flow_data) {
+	js_p->error_code = -PVFS_ENOMEM;
+	return SM_ACTION_COMPLETE;
+    }
+
+    memset(flow_data, 0, sizeof(struct fp_private_data));
+    
+    s_op->u.io.flow_d->flow_protocol_data = flow_data;
+    s_op->u.io.flow_d->state = FLOW_TRANSMITTING;
+    flow_data->parent = s_op->u.io.flow_d;
+
+    /* setup the request processing states */
+#if 0 //////////////////
+    s_op->u.io.flow_d->file_req_state = 
+	PINT_new_request_state(s_op->u.io.flow_d->file_req);
+    if (!s_op->u.io.flow_d->file_req_state)
+    {
+	js_p->error_code = -PVFS_ENOMEM;
+	return SM_ACTION_COMPLETE;
+    }
+
+    /* only setup a memory datatype state if caller provided a memory datatype */
+    if(s_op->u.io.flow_d->mem_req)
+    {
+	s_op->u.io.flow_d->mem_req_state = 
+	    PINT_new_request_state(s_op->u.io.flow_d->mem_req);
+	if (!s_op->u.io.flow_d->mem_req_state)
+	{
+	    js_p->error_code = -PVFS_ENOMEM;
+	    return SM_ACTION_COMPLETE;
+	}
+    }
 
-    return err;
+    /* if a file datatype offset was specified, go ahead and skip ahead 
+     * before doing anything else
+     */
+    if(s_op->u.io.flow_d->file_req_offset)
+        PINT_REQUEST_STATE_SET_TARGET(s_op->u.io.flow_d->file_req_state,
+            s_op->u.io.flow_d->file_req_offset);
+
+    
+    /* set boundaries on file datatype */
+    if(s_op->u.io.flow_d->aggregate_size > -1)
+    {
+        PINT_REQUEST_STATE_SET_FINAL(s_op->u.io.flow_d->file_req_state,
+            s_op->u.io.flow_d->aggregate_size+s_op->u.io.flow_d->file_req_offset);
+    }
+    else
+    {
+        PINT_REQUEST_STATE_SET_FINAL(s_op->u.io.flow_d->file_req_state,
+            s_op->u.io.flow_d->file_req_offset +
+            PINT_REQUEST_TOTAL_BYTES(s_op->u.io.flow_d->mem_req));
+    }
+#endif //////////////////////
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: aggregate_size=%ld\n", __func__,
+		 s_op->u.io.flow_d->aggregate_size);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: file_data.fsize=%ld\n", __func__,
+		 s_op->u.io.flow_d->file_data.fsize);
+
+    if(s_op->u.io.flow_d->buffer_size < 1)
+        s_op->u.io.flow_d->buffer_size = BUFFER_SIZE;
+    if(s_op->u.io.flow_d->buffers_per_flow < 1)
+        s_op->u.io.flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
+        
+    flow_data->prealloc_array = (struct fp_queue_item*)
+        malloc(s_op->u.io.flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    if(!flow_data->prealloc_array)
+    {
+        free(flow_data);
+        js_p->error_code = (-PVFS_ENOMEM);
+	return SM_ACTION_COMPLETE;
+    }
+
+    memset(flow_data->prealloc_array, 0,
+        s_op->u.io.flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++)
+    {
+        flow_data->prealloc_array[i].parent = s_op->u.io.flow_d;
+    }
+
+    /* remaining setup depends on the endpoints we intend to use */
+    if(s_op->req->u.io.io_type == PVFS_IO_READ) {
+	PINT_segpool_unit_id id;
+
+	PINT_dist_lookup(s_op->u.io.flow_d->file_data.dist);
+	ret = PINT_segpool_init(s_op->u.io.flow_d->mem_req,
+				s_op->u.io.flow_d->file_req,
+				s_op->u.io.flow_d->file_data.fsize,
+				s_op->u.io.flow_d->aggregate_size,
+				s_op->u.io.flow_d->file_data.server_nr,
+				s_op->u.io.flow_d->file_data.server_ct,
+				s_op->u.io.flow_d->file_data.dist, /* dist */
+				s_op->u.io.flow_d->file_req_offset, /* sson */
+				PINT_SP_SERVER_READ,
+				&seg_handle);
+
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_init() called: ret=%d, seg_handle=%d\n", __func__, ret, seg_handle);
+	//for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
+	for(i=0; i<1; i++) {
+	    PINT_segpool_register(seg_handle, &id);
+	    gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_register() called: id=%d\n", __func__, id);
+	    flow_data->prealloc_array[i].result_chain.q_item =
+		&flow_data->prealloc_array[i];
+	    flow_read_op = malloc(sizeof(*flow_read_op));
+	    memset(flow_read_op, 0, sizeof(*flow_read_op));
+	    memcpy(flow_read_op, s_op, sizeof(*flow_read_op));
+	    flow_read_op->u.flow_read.q_item = &flow_data->prealloc_array[i];
+	    flow_read_op->u.flow_read.seg_handle = seg_handle;
+	    flow_read_op->u.flow_read.id = id;
+	    ret = PINT_sm_push_frame(smcb, DO_READ, flow_read_op);
+	    if(ret < 0) {
+		js_p->error_code = -PVFS_ENOMEM;
+		return SM_ACTION_COMPLETE;
+	    }
+	    gossip_debug(GOSSIP_IO_DEBUG, "flow_read: ret=%d\n", ret);
+	}
+    }
+    else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: IO_WRITE\n", __func__);
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: file_size=%ld\n", __func__,
+		     s_op->u.io.flow_d->aggregate_size);
+	PINT_segpool_unit_id id;
+
+	PINT_dist_lookup(s_op->u.io.flow_d->file_data.dist);
+	ret = PINT_segpool_init(s_op->u.io.flow_d->mem_req,
+				 s_op->u.io.flow_d->file_req,
+				s_op->u.io.flow_d->file_data.fsize, /* FIXME?? */
+				s_op->u.io.flow_d->aggregate_size,
+				 s_op->u.io.flow_d->file_data.server_nr,
+				 s_op->u.io.flow_d->file_data.server_ct,
+				 s_op->u.io.flow_d->file_data.dist, /* dist */
+				s_op->u.io.flow_d->file_req_offset, /* sson */
+				 PINT_SP_SERVER_WRITE,
+				 &seg_handle);
+
+	gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_init() called: ret=%d, seg_handle=%d\n", __func__, ret, seg_handle);
+	/* only post one outstanding recv at a time; easier to manage */
+	/* FIXME: we will eventually want to post multiple of recvs */
+	//for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
+	for(i=0; i<1; i++) {
+	    PINT_segpool_register(seg_handle, &id);
+	    gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_register() called: id=%d\n", __func__, id);
+	    flow_data->prealloc_array[i].result_chain.q_item = 
+		&flow_data->prealloc_array[i];
+	
+	    flow_write_op = malloc(sizeof(*flow_write_op));
+	    memset(flow_write_op, 0, sizeof(*flow_write_op));
+	    memcpy(flow_write_op, s_op, sizeof(*flow_write_op));
+	    flow_write_op->u.flow_write.q_item = &flow_data->prealloc_array[i];
+	    flow_write_op->u.flow_write.seg_handle = seg_handle;
+	    flow_write_op->u.flow_write.id = id;
+	    ret = PINT_sm_push_frame(smcb, DO_WRITE, flow_write_op);
+	    if(ret < 0) {
+		js_p->error_code = -PVFS_ENOMEM;
+		return SM_ACTION_COMPLETE;
+	    }
+	}
+    }
+
+    js_p->error_code = 0;
+    return SM_ACTION_COMPLETE;
 }
 
 /*
@@ -344,8 +523,12 @@ static PINT_sm_action io_send_completion
      */
     if (s_op->req->u.io.io_type == PVFS_IO_READ)
     {
-        js_p->error_code = 0;
-        return SM_ACTION_COMPLETE;
+	if(s_op->req->u.io.op == 0) { /* AS */
+            gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack()?\n");
+            js_p->error_code = 0;
+            return SM_ACTION_COMPLETE;
+        }
+        gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), IO_READ with op\n");
     }
 
     /* release encoding of the first ack that we sent */
@@ -354,19 +537,63 @@ static PINT_sm_action io_send_completion
     /* zero size for safety */
     s_op->encoded.total_size = 0;
 
+    if(s_op->req->u.io.io_type == PVFS_IO_READ) { /* AS */
+	switch(s_op->req->u.io.datatype) {
+	case (int)0x4c000405: /* MPI_INT */
+	    s_op->encoded.total_size = sizeof(int); /* FIXME */
+	    break;
+	case (int)0x4c00080b:
+	    s_op->encoded.total_size = sizeof(double); /* FIXME */
+	    break;
+	default:
+	    break;
+	}
+    }
+
     /*
       fill in response -- status field is the only generic one we
       should have to set
     */
-    s_op->resp.op = PVFS_SERV_WRITE_COMPLETION;  /* not IO */
+    if(s_op->req->u.io.op != 0 && s_op->req->u.io.io_type == PVFS_IO_READ) /* AS */
+	s_op->resp.op = PVFS_SERV_READ_COMPLETION;  /* AS: read with op */
+    else
+	s_op->resp.op = PVFS_SERV_WRITE_COMPLETION;  /* not IO */
     s_op->resp.status = js_p->error_code;
-    s_op->resp.u.write_completion.total_completed =
-        s_op->u.io.flow_d->total_transferred;
+    if(s_op->req->u.io.io_type == PVFS_IO_READ) { /* AS: read with op */
+	s_op->resp.u.read_completion.total_completed =
+	    s_op->u.io.flow_d->total_transferred;
+	gossip_debug(GOSSIP_IO_DEBUG, "total_transferred=%ld\n", s_op->u.io.flow_d->total_transferred); /* AS */
+	switch(s_op->req->u.io.datatype) {
+	case (int)0x4c000405: /* MPI_INT */
+	    {
+		int *tmp;
+		tmp = s_op->u.io.flow_d->tmp_buffer;
+		gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), result (INT) to send=%d, s_op->resp.u.read_completion.total_completed=%lu\n", *tmp, s_op->resp.u.read_completion.total_completed);
+		s_op->resp.u.read_completion.result.buffer_sz = sizeof(int);
+		s_op->resp.u.read_completion.result.buffer = s_op->u.io.flow_d->tmp_buffer;
+		break;
+	    }
+	case (int)0x4c00080b:
+	    {
+		double *tmp;
+		tmp = s_op->u.io.flow_d->tmp_buffer;
+		gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), result (DOUBLE) to send=%lf, s_op->resp.u.read_completion.total_completed=%lu\n", *tmp, s_op->resp.u.read_completion.total_completed);
+		s_op->resp.u.read_completion.result.buffer_sz = sizeof(double);
+		s_op->resp.u.read_completion.result.buffer = s_op->u.io.flow_d->tmp_buffer;
+		break;
+	    }
+	}
+    }
+    else /* AS */
+	s_op->resp.u.write_completion.total_completed = 
+	    s_op->u.io.flow_d->total_transferred; /* AS */
 
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__, s_op->u.io.flow_d->total_transferred);
     err = PINT_encode(
         &s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
         s_op->addr, s_op->decoded.enc_type);
 
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: err=%d\n", __func__, err); // err = 0
     if (err < 0)
     {
         gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
@@ -381,6 +608,8 @@ static PINT_sm_action io_send_completion
         server_job_context, user_opts->server_job_bmi_timeout,
         s_op->req->hints);
 
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: err=%d\n", __func__, err); // err = 1
+
     return err;
 }
 
@@ -403,7 +632,7 @@ struct PINT_server_req_params pvfs2_io_p
     .access_type = PINT_server_req_access_io,
     .sched_policy = PINT_SERVER_REQ_SCHEDULE,
     .get_object_ref = PINT_get_object_ref_io,
-    .state_machine = &pvfs2_io_sm
+    .state_machine = &pvfs2_io_sm,
 };
 
 /*

Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/module.mk.in,v
diff -p -u -r1.55 -r1.55.6.1
--- module.mk.in	20 Nov 2008 01:17:10 -0000	1.55
+++ module.mk.in	14 Apr 2009 20:19:50 -0000	1.55.6.1
@@ -43,7 +43,9 @@ ifdef BUILD_SERVER
 		$(DIR)/list-eattr.c \
 		$(DIR)/unexpected.c \
 		$(DIR)/precreate-pool-refiller.c \
-		$(DIR)/unstuff.c
+		$(DIR)/unstuff.c \
+		$(DIR)/read.c \
+		$(DIR)/write.c
 
 	# c files that should be added to server library
 	SERVERSRC += \

Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155 -r1.155.6.1
--- pvfs2-server.h	29 Jan 2009 05:24:04 -0000	1.155
+++ pvfs2-server.h	14 Apr 2009 20:19:50 -0000	1.155.6.1
@@ -32,6 +32,7 @@
 #include "pvfs2-req-proto.h"
 #include "state-machine.h"
 #include "pint-event.h"
+#include "pint-segpool.h"
 
 extern job_context_id server_job_context;
 
@@ -335,6 +336,32 @@ struct PINT_server_io_op
     flow_descriptor* flow_d;
 };
 
+/*
+ * Per-frame state for the nested read state machine (pvfs2_read_sm),
+ * stored in the PINT_server_op union.  It mirrors
+ * PINT_server_flow_write_op field for field.
+ * NOTE(review): the original author was unsure this header is the right
+ * place for these definitions.
+ */
+struct PINT_server_flow_read_op
+{
+    struct fp_queue_item *q_item;      /* queue item whose buffer this frame fills and sends */
+    PINT_segpool_handle_t seg_handle;  /* segment pool set up by io.sm for the whole request */
+    PINT_segpool_unit_id id;           /* this frame's registration id in that pool */
+    PVFS_offset *offsets;              /* offsets of the current batch of segments */
+    PINT_Request_state *file_req_state; /* NOTE(review): not referenced in read.sm/io.sm */
+    PVFS_size *sizes;                  /* lengths of the current batch of segments */
+    int segs;                          /* number of entries in offsets/sizes */
+    int parallel_sms;                  /* NOTE(review): not referenced in read.sm/io.sm */
+};
+ 
+/*
+ * Per-frame state for the nested write state machine (pvfs2_write_sm),
+ * pushed by io.sm for PVFS_IO_WRITE and stored in the PINT_server_op
+ * union.  It mirrors PINT_server_flow_read_op field for field.
+ */
+struct PINT_server_flow_write_op
+{
+    struct fp_queue_item *q_item;      /* queue item this frame works on */
+    PINT_segpool_handle_t seg_handle;  /* segment pool set up by io.sm for the whole request */
+    PINT_segpool_unit_id id;           /* this frame's registration id in that pool */
+    PVFS_offset *offsets;              /* offsets of the current batch of segments */
+    PINT_Request_state *file_req_state; /* NOTE(review): not referenced in read.sm/io.sm */
+    PVFS_size *sizes;                  /* lengths of the current batch of segments */
+    int segs;                          /* number of entries in offsets/sizes */
+    int parallel_sms;                  /* NOTE(review): not referenced in read.sm/io.sm */
+};
+
 struct PINT_server_small_io_op
 {
     PVFS_offset offsets[IO_MAX_REGIONS];
@@ -470,6 +497,8 @@ typedef struct PINT_server_op
 	struct PINT_server_rmdirent_op rmdirent;
 	struct PINT_server_io_op io;
         struct PINT_server_small_io_op small_io;
+	struct PINT_server_flow_read_op flow_read; /* for read sm */
+	struct PINT_server_flow_write_op flow_write; /* for write sm */
 	struct PINT_server_flush_op flush;
 	struct PINT_server_truncate_op truncate;
 	struct PINT_server_mkdir_op mkdir;



More information about the Pvfs2-cvs mailing list