[Pvfs2-cvs] commit by sson in pvfs2/src/client/sysint: client-state-machine.h sys-io.sm

CVS commit program cvs at parl.clemson.edu
Tue Apr 14 16:19:48 EDT 2009


Update of /projects/cvsroot/pvfs2/src/client/sysint
In directory parlweb1:/tmp/cvs-serv13438/src/client/sysint

Modified Files:
      Tag: as-branch
	client-state-machine.h sys-io.sm 
Log Message:



Index: client-state-machine.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/client-state-machine.h,v
diff -p -u -r1.177 -r1.177.6.1
--- client-state-machine.h	2 Feb 2009 18:47:42 -0000	1.177
+++ client-state-machine.h	14 Apr 2009 20:19:48 -0000	1.177.6.1
@@ -161,6 +161,7 @@ typedef struct PINT_client_io_ctx
 
     PINT_sm_msgpair_state msg;
     PINT_client_sm_recv_state write_ack;
+    PINT_client_sm_recv_state read_ack; /* AS */
 
     /*
       all *_has_been_posted fields are used at io_analyze_results time
@@ -170,6 +171,7 @@ typedef struct PINT_client_io_ctx
     int msg_recv_has_been_posted;
     int flow_has_been_posted;
     int write_ack_has_been_posted;
+    int read_ack_has_been_posted; /* AS */
 
     /*
       all *_in_progress fields are used at cancellation time to
@@ -179,6 +181,7 @@ typedef struct PINT_client_io_ctx
     int msg_recv_in_progress;
     int flow_in_progress;
     int write_ack_in_progress;
+    int read_ack_in_progress; /* AS */
 
 } PINT_client_io_ctx;
 
@@ -190,6 +193,9 @@ struct PINT_client_io_sm
     PVFS_offset file_req_offset;
     void *buffer;
     PVFS_Request mem_req;
+    int op; /* AS */
+    int datatype; /* AS */
+    void *tmp_buffer; /* AS */
 
     /* output parameter */
     PVFS_sysresp_io *io_resp_p;
@@ -203,6 +209,7 @@ struct PINT_client_io_sm
     int msgpair_completion_count;
     int flow_completion_count;
     int write_ack_completion_count;
+    int read_ack_completion_count; /* AS */
 
     PINT_client_io_ctx *contexts;
     int context_count;

Index: sys-io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-io.sm,v
diff -p -u -r1.164 -r1.164.4.1
--- sys-io.sm	9 Feb 2009 16:36:08 -0000	1.164
+++ sys-io.sm	14 Apr 2009 20:19:48 -0000	1.164.4.1
@@ -26,6 +26,7 @@
 
 #define IO_MAX_SEGMENT_NUM 50 
 #define IO_ATTR_MASKS (PVFS_ATTR_META_ALL|PVFS_ATTR_COMMON_TYPE)
+#define PVFS2_SMALL_IO_OFF 1 /* AS: for now, ignore small_io */
 
 extern job_context_id pint_client_sm_context;
 
@@ -57,12 +58,15 @@ static inline int io_post_flow(
 static inline int io_post_write_ack_recv(
     PINT_smcb *smcb, PINT_client_io_ctx * cur_ctx);
 
+static inline int io_post_read_ack_recv(
+    PINT_smcb *smcb, PINT_client_io_ctx * cur_ctx); /* AS */
+
 static inline int io_process_context_recv(
     PINT_client_sm *sm_p, job_status_s *js_p, PINT_client_io_ctx **out_ctx);
 
 static inline int io_check_context_status(
     PINT_client_io_ctx *cur_ctx, int io_type,
-    PVFS_size *total_size);
+    PVFS_size *total_size, PINT_client_sm *sm_p /* AS */);
 
 static int io_find_target_datafiles(
     PVFS_Request mem_req,
@@ -347,6 +351,109 @@ PVFS_error PVFS_isys_io(
         smcb,  op_id, user_ptr);
 }
 
+/* AS */
+/** Initiate a read or write operation.
+ *
+ *  \param io_type specifies if the operation is a read or write.
+ */
+PVFS_error PVFS_isys_io_ex(
+    PVFS_object_ref ref,
+    PVFS_Request file_req,
+    PVFS_offset file_req_offset,
+    void *buffer,
+    PVFS_Request mem_req,
+    const PVFS_credentials *credentials,
+    PVFS_sysresp_io *resp_p,
+    enum PVFS_io_type io_type,
+    PVFS_sys_op_id *op_id,
+    int datatype, /* AS */
+    int op, /* AS */
+    PVFS_hint hints,
+    void *user_ptr)
+{
+    PVFS_error ret = -PVFS_EINVAL;
+    PINT_smcb *smcb = NULL;
+    PINT_client_sm *sm_p = NULL;
+    struct filesystem_configuration_s* cur_fs = NULL;
+    struct server_configuration_s *server_config = NULL;
+
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_io_ex entered [%llu]\n",
+                 llu(ref.handle));
+
+    if ((ref.handle == PVFS_HANDLE_NULL) ||
+        (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
+    {
+        gossip_err("invalid (NULL) required argument\n");
+        return ret;
+    }
+
+    if ((io_type != PVFS_IO_READ) && (io_type != PVFS_IO_WRITE))
+    {
+        gossip_err("invalid (unknown) I/O type specified\n");
+        return ret;
+    }
+
+    server_config = PINT_get_server_config_struct(ref.fs_id);
+    cur_fs = PINT_config_find_fs_id(server_config, ref.fs_id);
+    PINT_put_server_config_struct(server_config);
+
+    if (!cur_fs)
+    {
+        gossip_err("invalid (unknown) fs id specified\n");
+        return ret;
+    }
+
+    /* look for zero byte operations */
+    if ((PINT_REQUEST_TOTAL_BYTES(mem_req) == 0) ||
+        (PINT_REQUEST_TOTAL_BYTES(file_req) == 0))
+    {
+        gossip_ldebug(GOSSIP_IO_DEBUG, "Warning: 0 byte I/O operation "
+                      "attempted.\n");
+        resp_p->total_completed = 0;
+        return 1; 
+    }
+
+    PINT_smcb_alloc(&smcb, PVFS_SYS_IO,
+             sizeof(struct PINT_client_sm),
+             client_op_state_get_machine,
+             client_state_machine_terminate,
+             pint_client_sm_context);
+    if (smcb == NULL)
+    {
+        return -PVFS_ENOMEM;
+    }
+    sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    PINT_init_msgarray_params(sm_p, ref.fs_id);
+    PINT_init_sysint_credentials(sm_p->cred_p, credentials);
+    sm_p->u.io.io_type = io_type;
+    sm_p->u.io.file_req = file_req;
+    sm_p->u.io.file_req_offset = file_req_offset;
+    sm_p->u.io.io_resp_p = resp_p;
+    sm_p->u.io.mem_req = mem_req;
+    sm_p->u.io.buffer = buffer; 
+    sm_p->u.io.flowproto_type = cur_fs->flowproto;
+    sm_p->u.io.encoding = cur_fs->encoding;
+    sm_p->u.io.stored_error_code = 0;
+    sm_p->u.io.retry_count = 0;
+    sm_p->msgarray_op.msgarray = NULL;
+    sm_p->msgarray_op.count = 0;
+    sm_p->u.io.datafile_index_array = NULL;
+    sm_p->u.io.datafile_count = 0;
+    sm_p->u.io.total_size = 0;
+    sm_p->u.io.small_io = 0;
+    sm_p->u.io.op = op; /* AS */
+    sm_p->u.io.datatype = datatype; /* AS */
+    sm_p->object_ref = ref;
+    PVFS_hint_copy(hints, &sm_p->hints);
+    PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &ref.handle);
+
+    return PINT_client_state_machine_post(
+        smcb,  op_id, user_ptr);
+}
+/* AS */
+
+
 /** Perform a read or write operation.
  *
  *  \param type specifies if the operation is a read or write.
@@ -390,6 +497,57 @@ PVFS_error PVFS_sys_io(
     return error;
 }
 
+/* AS */
+/** Perform a read or write operation.
+ *
+ *  \param io_type specifies if the operation is a read or write.
+ */
+PVFS_error PVFS_sys_io_ex(
+    PVFS_object_ref ref,
+    PVFS_Request file_req,
+    PVFS_offset file_req_offset,
+    void *buffer,
+    PVFS_Request mem_req,
+    const PVFS_credentials *credentials,
+    int datatype, /* AS */
+    int op, /* AS */
+    PVFS_sysresp_io *resp_p,
+    enum PVFS_io_type io_type,
+    PVFS_hint hints)
+{
+    PVFS_error ret = -PVFS_EINVAL, error = 0;
+    PVFS_sys_op_id op_id;
+
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_io_ex entered\n");
+
+    /* AS */
+    ret = PVFS_isys_io_ex(ref, file_req, file_req_offset, buffer, mem_req,
+			  credentials, resp_p, io_type, &op_id, 
+			  datatype /* AS */, op /* AS */, hints, NULL);
+    /* AS */
+
+    if (ret == 1)
+        return 0;
+    else if (ret < 0)
+    {
+        PVFS_perror_gossip("PVFS_isys_io_ex call", ret);
+        error = ret;
+    }
+    else 
+    {
+        ret = PVFS_sys_wait(op_id, "io", &error);
+        if (ret)
+        {
+            PVFS_perror_gossip("PVFS_sys_wait call", ret);
+            error = ret;
+        }
+        PINT_sys_release(op_id);
+    } 
+
+    return error;
+}
+/* AS */
+
 /*******************************************************************/
 
 static PINT_sm_action io_init(
@@ -616,6 +774,11 @@ static PINT_sm_action io_datafile_setup_
                  "  %s: %d datafiles "
                  "might have data\n", __func__, target_datafile_count);
 
+    
+    for (i=0; i < target_datafile_count; i++) /* AS */
+	gossip_debug(GOSSIP_IO_DEBUG, "  %s: dfile_array[%d]=%lu\n",
+		     __func__, target_datafile_count, attr->u.meta.dfile_array[i]); /* AS */
+
     /* look at sio_array and sio_count to see if there are any
      * servers that we can do small I/O to, instead of setting up
      * flows.  For now, we're going to stick with the semantics that
@@ -628,6 +791,7 @@ static PINT_sm_action io_datafile_setup_
     if(sio_count == target_datafile_count)
     {
         gossip_debug(GOSSIP_IO_DEBUG, "  %s: doing small I/O\n", __func__);
+	gossip_debug(GOSSIP_IO_DEBUG, "  %s: sio_count=%d, target_datafile_count=%d\n", __func__, sio_count, target_datafile_count); /* AS */
 
         sm_p->u.io.small_io = 1;
         js_p->error_code = IO_DO_SMALL_IO;
@@ -662,6 +826,9 @@ static PINT_sm_action io_datafile_setup_
             sm_p->u.io.file_req,
             sm_p->u.io.file_req_offset,
             PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
+	    sm_p->u.io.op, /* AS:  */
+	    sm_p->u.io.datatype, /* AS */
+	    attr->u.meta.dfile_array, /* AS */
             sm_p->hints);
     }
 
@@ -924,6 +1091,7 @@ static PINT_sm_action io_datafile_post_m
      * go sleep then try the remaining msgpairs again */
     if (sm_p->u.io.msgpair_completion_count
      || sm_p->u.io.flow_completion_count
+	|| sm_p->u.io.read_ack_completion_count /* AS */
      || sm_p->u.io.write_ack_completion_count)
         return SM_ACTION_DEFERRED;  /* means go find another machine to run */
     else {
@@ -983,6 +1151,7 @@ static PINT_sm_action io_datafile_comple
     assert(sm_p->u.io.msgpair_completion_count > -1);
     assert(sm_p->u.io.flow_completion_count > -1);
     assert(sm_p->u.io.write_ack_completion_count > -1);
+    assert(sm_p->u.io.read_ack_completion_count > -1); /* AS */
 
     attr = &sm_p->getattr.attr;
     assert(attr);
@@ -1043,7 +1212,18 @@ static PINT_sm_action io_datafile_comple
             goto check_next_step;
         }
 
-        if(sm_p->u.io.io_type == PVFS_IO_WRITE)
+	/* AS */
+	if(sm_p->u.io.io_type == PVFS_IO_READ && sm_p->u.io.op != 0)
+	    {
+		ret = io_post_read_ack_recv(smcb, cur_ctx);
+		if(ret < 0)
+		    {
+			PVFS_perror_gossip("Post of read-ack recv failed", ret);
+			js_p->error_code = ret;
+			goto check_next_step;
+		    }
+	    } /* AS */
+        else if(sm_p->u.io.io_type == PVFS_IO_WRITE) /* AS */
         {
             /* we expect this write to _not_ succeed immediately, because we
              * have not posted the flow yet.
@@ -1126,6 +1306,22 @@ static PINT_sm_action io_datafile_comple
             */
             assert((ret == 0) || (ret == -PVFS_EINVAL));
         }
+	else if (cur_ctx->read_ack_in_progress) /* AS */
+	    {
+		int ret = 0;
+		
+		assert(sm_p->u.io.read_ack_completion_count);
+		server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
+		ret = job_reset_timeout(cur_ctx->read_ack.recv_id,
+					server_config->client_job_bmi_timeout);
+		PINT_put_server_config_struct(server_config);
+		
+		/*
+		  allow -PVFS_EINVAL errors in case the recv has already
+		  completed (before we've processed it)
+		*/
+		assert((ret == 0) || (ret == -PVFS_EINVAL));
+	    } /* AS */
 
         gossip_debug(GOSSIP_IO_DEBUG, "  matched completed flow for "
                      "context %p%s\n", cur_ctx,
@@ -1164,25 +1360,45 @@ static PINT_sm_action io_datafile_comple
     }
     else if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FINAL_ACK))
     {
-        assert(sm_p->u.io.write_ack_completion_count);
-
+	if(sm_p->u.io.io_type == PVFS_IO_READ && sm_p->u.io.op != 0) /* AS */
+	    assert(sm_p->u.io.read_ack_completion_count); /* AS */
+	else if (sm_p->u.io.io_type == PVFS_IO_WRITE) /* AS */
+	    assert(sm_p->u.io.write_ack_completion_count); /* AS */
+	
         index = STATUS_USER_TAG_GET_INDEX(
             status_user_tag, IO_SM_PHASE_FINAL_ACK);
         cur_ctx = &sm_p->u.io.contexts[index];
         assert(cur_ctx);
 
-        assert(cur_ctx->write_ack.recv_status.actual_size <=
-               cur_ctx->write_ack.max_resp_sz);
-
-        cur_ctx->write_ack.recv_id = 0;
-        cur_ctx->write_ack.recv_status = *js_p;
+	if(sm_p->u.io.io_type == PVFS_IO_WRITE) /* AS */
+	    assert(cur_ctx->write_ack.recv_status.actual_size <=
+		   cur_ctx->write_ack.max_resp_sz);
+	else /* AS */
+	    assert(cur_ctx->read_ack.recv_status.actual_size <=
+		   cur_ctx->read_ack.max_resp_sz); /* AS */
+
+	if(sm_p->u.io.io_type == PVFS_IO_WRITE) { /* AS */
+	    cur_ctx->write_ack.recv_id = 0; /* AS */
+	    cur_ctx->write_ack.recv_status = *js_p; /* AS */
+	}
+	else { /* AS */
+	    cur_ctx->read_ack.recv_id = 0;
+            cur_ctx->read_ack.recv_status = *js_p;
+        } /* AS */
 
         gossip_debug(GOSSIP_IO_DEBUG, "  matched completed ack for "
                      "context %p\n", cur_ctx);
 
-        cur_ctx->write_ack_in_progress = 0;
-        sm_p->u.io.write_ack_completion_count--;
-        assert(sm_p->u.io.write_ack_completion_count > -1);
+	if(sm_p->u.io.io_type == PVFS_IO_WRITE) { /* AS */
+	    cur_ctx->write_ack_in_progress = 0;
+	    sm_p->u.io.write_ack_completion_count--;
+	    assert(sm_p->u.io.write_ack_completion_count > -1);
+	}
+	else if(sm_p->u.io.io_type == PVFS_IO_READ) { /* AS */
+            cur_ctx->read_ack_in_progress = 0;
+            sm_p->u.io.read_ack_completion_count--;
+            assert(sm_p->u.io.read_ack_completion_count > -1);
+        }
 
         if (js_p->error_code < 0) {
             gossip_debug(GOSSIP_IO_DEBUG,
@@ -1216,6 +1432,7 @@ static PINT_sm_action io_datafile_comple
      */
     if (sm_p->u.io.msgpair_completion_count
      || sm_p->u.io.flow_completion_count
+	|| sm_p->u.io.read_ack_completion_count /* AS */
      || sm_p->u.io.write_ack_completion_count) {
         if (PINT_smcb_cancelled(smcb))
             gossip_debug(GOSSIP_IO_DEBUG, "detected I/O cancellation with "
@@ -1303,7 +1520,7 @@ static PINT_sm_action io_analyze_results
                 assert(cur_ctx);
 
                 ret = io_check_context_status(
-                    cur_ctx, sm_p->u.io.io_type, &sm_p->u.io.total_size);
+					      cur_ctx, sm_p->u.io.io_type, &sm_p->u.io.total_size, sm_p /* AS */); /* AS */
                 if (ret < 0)
                 {
                     if (ret == -PVFS_ECANCEL)
@@ -1356,7 +1573,8 @@ static PINT_sm_action io_analyze_results
     /* be sure there are no jobs still laying around */
     assert((sm_p->u.io.msgpair_completion_count == 0) &&
            (sm_p->u.io.flow_completion_count == 0) &&
-           (sm_p->u.io.write_ack_completion_count == 0));
+           (sm_p->u.io.write_ack_completion_count == 0) &&
+	   (sm_p->u.io.read_ack_completion_count == 0)); /* AS */
 
     /*
       FIXME: non bmi errors pop out in flow failures above -- they are
@@ -1422,6 +1640,26 @@ static PINT_sm_action io_analyze_results
         goto analyze_results_exit;
     }
 
+    if(sm_p->u.io.io_type == PVFS_IO_READ && sm_p->u.io.op != 0) /* AS */
+    {
+        js_p->error_code = 0;
+        sm_p->u.io.io_resp_p->total_completed = sm_p->u.io.total_size;
+
+        /* NOTE(review): this block mirrors the write path.  The size
+         * invalidation below was written for writes, where the file size
+         * may have changed (e.g. a write past eof, or into what was
+         * previously a hole) and recomputing it from all the datafile
+         * sizes is too expensive for just a cached size update.  This
+         * branch handles reads with op != 0; confirm it is still needed.
+         */
+        PINT_acache_invalidate_size(sm_p->object_ref);
+
+        /* skip the check for holes below.  NOTE(review): that check is
+         * for reads, and this branch is a read with op != 0 -- confirm
+         */
+        goto analyze_results_exit;
+    } /* AS */
+
     /* In order to give the sysint caller the correct value for length
      * of bytes read, we have to check for holes in the logical file.  
      * The algorithm is as follows:
@@ -1849,6 +2087,7 @@ static inline int io_post_flow(
         cur_ctx->flow_desc.src.u.bmi.address = cur_ctx->msg.svr_addr;
         cur_ctx->flow_desc.dest.endpoint_id = MEM_ENDPOINT;
         cur_ctx->flow_desc.dest.u.mem.buffer = sm_p->u.io.buffer;
+	cur_ctx->flow_desc.op = sm_p->u.io.op; /* AS: FIXME? */
     }
     else
     {
@@ -1983,6 +2222,66 @@ static inline int io_post_write_ack_recv
     return 0;
 }
 
+/* AS */
+static inline int io_post_read_ack_recv(
+    PINT_smcb *smcb,
+    PINT_client_io_ctx * cur_ctx)
+{
+    PINT_client_sm * sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret;
+    unsigned long status_user_tag;
+
+    gossip_debug(GOSSIP_IO_DEBUG, "  preposting read "
+                 "ack for context %p.\n", cur_ctx);
+
+    cur_ctx->read_ack.max_resp_sz = PINT_encode_calc_max_size(
+        PINT_ENCODE_RESP, PVFS_SERV_READ_COMPLETION,
+        sm_p->u.io.encoding);
+    cur_ctx->read_ack.encoded_resp_p = BMI_memalloc(
+        cur_ctx->msg.svr_addr, cur_ctx->read_ack.max_resp_sz,
+        BMI_RECV);
+
+    if (!cur_ctx->read_ack.encoded_resp_p)
+    {
+        gossip_err("BMI_memalloc (for read ack) failed\n");
+        return -PVFS_ENOMEM;
+    }
+
+    /*
+       we're pre-posting the final read ack here, even though it's
+       ahead of the flow phase; reads are at the flow phase.
+
+       the timeout used here is a scaling one that needs to be long
+       enough for the entire flow to occur
+       */
+    status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FINAL_ACK);
+
+    /*
+       pre-post this recv with an infinite timeout and adjust it
+       after the flow completes since we don't know how long a flow
+       can take at this point
+       */ 
+    ret = job_bmi_recv(
+        cur_ctx->msg.svr_addr, cur_ctx->read_ack.encoded_resp_p,
+        cur_ctx->read_ack.max_resp_sz, cur_ctx->session_tag,
+        BMI_PRE_ALLOC, smcb, status_user_tag,
+        &cur_ctx->read_ack.recv_status, &cur_ctx->read_ack.recv_id,
+        pint_client_sm_context, JOB_TIMEOUT_INF, sm_p->hints /* AS */);
+
+    if (ret < 0)
+    {
+        gossip_err("job_bmi_recv (read ack) failed\n");
+        return ret;
+    }
+
+    assert(ret == 0);
+    cur_ctx->read_ack_has_been_posted = 1;
+    cur_ctx->read_ack_in_progress = 1;
+    sm_p->u.io.read_ack_completion_count++;
+
+    return 0;
+} /* AS */
+
 /*
   returns 0 on send completion; IO_RECV_COMPLETED on recv completion,
   and -PVFS_error on failure
@@ -2136,7 +2435,8 @@ static inline int io_process_context_rec
 static inline int io_check_context_status(
     PINT_client_io_ctx *cur_ctx,
     int io_type,
-    PVFS_size *total_size)
+    PVFS_size *total_size,
+    PINT_client_sm *sm_p /* AS */)
 {
     int ret = 0;
 
@@ -2158,6 +2458,141 @@ static inline int io_check_context_statu
                      cur_ctx->msg.recv_status.error_code, cur_ctx);
         ret = cur_ctx->msg.recv_status.error_code;
     }
+    /* AS *************** */
+    else if (io_type == PVFS_IO_READ && cur_ctx->flow_desc.op != 0) /* AS */
+    {
+        /* we check the read ack status before the flow status so that the
+         * error code that the server reported in the ack takes precedence
+         */
+        if (cur_ctx->read_ack.recv_status.error_code)
+        {
+            gossip_debug(
+                GOSSIP_IO_DEBUG,
+                "  error (%d) in final ack for context %p\n",
+                cur_ctx->read_ack.recv_status.error_code, cur_ctx);
+
+            assert(cur_ctx->read_ack_has_been_posted);
+            ret = cur_ctx->read_ack.recv_status.error_code;
+        }
+        else if (cur_ctx->read_ack_has_been_posted)
+        {
+            struct PINT_decoded_msg decoded_resp;
+            struct PVFS_server_resp *resp = NULL;
+            /*
+              the completed size for these reads is reported in the
+              final ack, but we have to decode it first
+            */
+            ret = PINT_serv_decode_resp(
+                cur_ctx->msg.fs_id, cur_ctx->read_ack.encoded_resp_p,
+                &decoded_resp, &cur_ctx->msg.svr_addr,
+                cur_ctx->read_ack.recv_status.actual_size, &resp);
+            if (ret == 0)
+            {
+                gossip_debug(
+                    GOSSIP_IO_DEBUG,
+                    "  %lld bytes read to context %p\n",
+                    lld(resp->u.read_completion.total_completed),
+                    cur_ctx);
+		switch(sm_p->u.io.datatype) {
+		case (int)0x4c000405: /* MPI_INT */
+		    {
+			if (sm_p->u.io.tmp_buffer == NULL) {
+			    sm_p->u.io.tmp_buffer = (void *)malloc(sizeof(int)); /* AS */
+			    memset(sm_p->u.io.tmp_buffer, 0, sizeof(int));
+			}
+			int old, new, final;
+			memcpy(&old, sm_p->u.io.tmp_buffer, sizeof(int));
+			memcpy(&new, resp->u.read_completion.result.buffer, sizeof(int));
+			switch(sm_p->u.io.op) {
+			case 0x58000001: /* MAX */
+			    if(new > old)
+				final = new;
+			    gossip_debug(GOSSIP_IO_DEBUG, 
+					 "result(INT): old=%d, new=%d, final=%d\n", old, new, final); /* AS */
+			    break;
+			case 0x58000002: /* MIN */
+			    if(new < old)
+				final = new;
+			    gossip_debug(GOSSIP_IO_DEBUG, 
+					 "result(INT): old=%d, new=%d, final=%d\n", old, new, final); /* AS */
+			    break;
+			case 0x58000003: /* SUM */
+			    final = old + new;
+			    gossip_debug(GOSSIP_IO_DEBUG,
+					 "result(INT): old=%d, new=%d, final=%d\n", old, new, final); /* AS */
+			    break;
+			default:
+			    break;
+			}
+			
+			sm_p->u.io.tmp_buffer = (void *)&final;
+			memcpy(cur_ctx->flow_desc.dest.u.mem.buffer, &final, sizeof(int)); /* AS */
+		    }
+		    break;
+		case (int)0x4c00080b: /* MPI_DOUBLE */
+		    {
+			if (sm_p->u.io.tmp_buffer == NULL) {
+			    sm_p->u.io.tmp_buffer = (void *)malloc(sizeof(double)); /* AS */
+			    memset(sm_p->u.io.tmp_buffer, 0, sizeof(double));
+			}
+			double old, new, final;
+			memcpy(&old, sm_p->u.io.tmp_buffer, sizeof(double)); /* old value */
+			memcpy(&new, resp->u.read_completion.result.buffer, sizeof(double)); /* new value */
+			switch(sm_p->u.io.op) {
+			case 0x58000001: /* MAX */
+			    if(new > old)
+				final = new;
+			    gossip_debug(GOSSIP_IO_DEBUG, 
+					 "result(DOUBLE): old=%lf, new=%lf, final=%lf\n", old, new, final); /* AS */
+			    break;
+			case 0x58000002: /* MIN */
+			    if(new < old)
+				final = new;
+			    gossip_debug(GOSSIP_IO_DEBUG, 
+					 "result(DOUBLE): old=%lf, new=%lf, final=%lf\n", old, new, final); /* AS */
+			    break;
+			case 0x58000003: /* SUM */
+			    final = old + new;
+			    gossip_debug(GOSSIP_IO_DEBUG,
+					 "result(DOUBLE): old=%lf, new=%lf, final=%lf\n", old, new, final); /* AS */
+			    break;
+			default:
+			    break;
+			}
+
+			sm_p->u.io.tmp_buffer = (void *)&final;
+			memcpy(cur_ctx->flow_desc.dest.u.mem.buffer, &final, sizeof(double)); /* AS */
+		    }
+		    break;
+		default:
+		    break;
+		}
+		*total_size += resp->u.read_completion.total_completed;
+                
+                /* pass along the error code from the server as well */
+                ret = resp->status;
+
+                PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
+            }
+            else
+            {
+                PVFS_perror_gossip("PINT_serv_decode_resp failed", ret);
+            }
+
+            PINT_flow_reset(&cur_ctx->flow_desc);
+            BMI_memfree(cur_ctx->msg.svr_addr,
+                        cur_ctx->read_ack.encoded_resp_p,
+                        cur_ctx->read_ack.max_resp_sz, BMI_RECV);
+        }
+        else if (cur_ctx->flow_status.error_code)
+        {
+            gossip_debug(GOSSIP_IO_DEBUG,
+                         "  error (%d) in flow for context %p\n",
+                         cur_ctx->flow_status.error_code, cur_ctx);
+            PINT_flow_reset(&cur_ctx->flow_desc);
+            ret = cur_ctx->flow_status.error_code;
+        }
+    } /* AS */
     else if (io_type == PVFS_IO_WRITE)
     {
         /* we check the write ack status before the flow status so that the



More information about the Pvfs2-cvs mailing list