[Pvfs2-cvs] commit by sson in pvfs2/src/client/sysint:
client-state-machine.h sys-io.sm
CVS commit program
cvs at parl.clemson.edu
Tue Apr 14 16:19:48 EDT 2009
Update of /projects/cvsroot/pvfs2/src/client/sysint
In directory parlweb1:/tmp/cvs-serv13438/src/client/sysint
Modified Files:
Tag: as-branch
client-state-machine.h sys-io.sm
Log Message:
Index: client-state-machine.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/client-state-machine.h,v
diff -p -u -r1.177 -r1.177.6.1
--- client-state-machine.h 2 Feb 2009 18:47:42 -0000 1.177
+++ client-state-machine.h 14 Apr 2009 20:19:48 -0000 1.177.6.1
@@ -161,6 +161,7 @@ typedef struct PINT_client_io_ctx
PINT_sm_msgpair_state msg;
PINT_client_sm_recv_state write_ack;
+ PINT_client_sm_recv_state read_ack; /* AS */
/*
all *_has_been_posted fields are used at io_analyze_results time
@@ -170,6 +171,7 @@ typedef struct PINT_client_io_ctx
int msg_recv_has_been_posted;
int flow_has_been_posted;
int write_ack_has_been_posted;
+ int read_ack_has_been_posted; /* AS */
/*
all *_in_progress fields are used at cancellation time to
@@ -179,6 +181,7 @@ typedef struct PINT_client_io_ctx
int msg_recv_in_progress;
int flow_in_progress;
int write_ack_in_progress;
+ int read_ack_in_progress; /* AS */
} PINT_client_io_ctx;
@@ -190,6 +193,9 @@ struct PINT_client_io_sm
PVFS_offset file_req_offset;
void *buffer;
PVFS_Request mem_req;
+ int op; /* AS */
+ int datatype; /* AS */
+ void *tmp_buffer; /* AS */
/* output parameter */
PVFS_sysresp_io *io_resp_p;
@@ -203,6 +209,7 @@ struct PINT_client_io_sm
int msgpair_completion_count;
int flow_completion_count;
int write_ack_completion_count;
+ int read_ack_completion_count; /* AS */
PINT_client_io_ctx *contexts;
int context_count;
Index: sys-io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-io.sm,v
diff -p -u -r1.164 -r1.164.4.1
--- sys-io.sm 9 Feb 2009 16:36:08 -0000 1.164
+++ sys-io.sm 14 Apr 2009 20:19:48 -0000 1.164.4.1
@@ -26,6 +26,7 @@
#define IO_MAX_SEGMENT_NUM 50
#define IO_ATTR_MASKS (PVFS_ATTR_META_ALL|PVFS_ATTR_COMMON_TYPE)
+#define PVFS2_SMALL_IO_OFF 1 /* AS: for now, ignore small_io */
extern job_context_id pint_client_sm_context;
@@ -57,12 +58,15 @@ static inline int io_post_flow(
static inline int io_post_write_ack_recv(
PINT_smcb *smcb, PINT_client_io_ctx * cur_ctx);
+static inline int io_post_read_ack_recv(
+ PINT_smcb *smcb, PINT_client_io_ctx * cur_ctx); /* AS */
+
static inline int io_process_context_recv(
PINT_client_sm *sm_p, job_status_s *js_p, PINT_client_io_ctx **out_ctx);
static inline int io_check_context_status(
PINT_client_io_ctx *cur_ctx, int io_type,
- PVFS_size *total_size);
+ PVFS_size *total_size, PINT_client_sm *sm_p /* AS */);
static int io_find_target_datafiles(
PVFS_Request mem_req,
@@ -347,6 +351,109 @@ PVFS_error PVFS_isys_io(
smcb, op_id, user_ptr);
}
+/* AS */
+/** Post a non-blocking read or write that also carries a datatype and an operation code (both stored in sm_p->u.io).
+ * Returns -PVFS_EINVAL for a null handle, fs_id or resp_p, an unknown io_type, or an unknown fs id; 1 for a zero-byte request; -PVFS_ENOMEM when no smcb is allocated; otherwise the PINT_client_state_machine_post() result.
+ * \param io_type specifies if the operation is a read or write.
+ */
+PVFS_error PVFS_isys_io_ex(
+ PVFS_object_ref ref,
+ PVFS_Request file_req,
+ PVFS_offset file_req_offset,
+ void *buffer,
+ PVFS_Request mem_req,
+ const PVFS_credentials *credentials,
+ PVFS_sysresp_io *resp_p,
+ enum PVFS_io_type io_type,
+ PVFS_sys_op_id *op_id,
+ int datatype, /* AS: copied to sm_p->u.io.datatype */
+ int op, /* AS: copied to sm_p->u.io.op */
+ PVFS_hint hints,
+ void *user_ptr)
+{
+ PVFS_error ret = -PVFS_EINVAL;
+ PINT_smcb *smcb = NULL;
+ PINT_client_sm *sm_p = NULL;
+ struct filesystem_configuration_s* cur_fs = NULL;
+ struct server_configuration_s *server_config = NULL;
+
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_io_ex entered [%llu]\n",
+ llu(ref.handle));
+
+ if ((ref.handle == PVFS_HANDLE_NULL) ||
+ (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
+ {
+ gossip_err("invalid (NULL) required argument\n");
+ return ret; /* ret is still -PVFS_EINVAL here */
+ }
+
+ if ((io_type != PVFS_IO_READ) && (io_type != PVFS_IO_WRITE))
+ {
+ gossip_err("invalid (unknown) I/O type specified\n");
+ return ret; /* ret is still -PVFS_EINVAL here */
+ }
+
+ server_config = PINT_get_server_config_struct(ref.fs_id);
+ cur_fs = PINT_config_find_fs_id(server_config, ref.fs_id);
+ PINT_put_server_config_struct(server_config); /* NOTE(review): cur_fs is dereferenced further down, after this put -- confirm that is safe */
+
+ if (!cur_fs)
+ {
+ gossip_err("invalid (unknown) fs id specified\n");
+ return ret; /* ret is still -PVFS_EINVAL here */
+ }
+
+ /* zero-byte requests finish here: report 0 bytes completed and return 1 without allocating or posting a state machine */
+ if ((PINT_REQUEST_TOTAL_BYTES(mem_req) == 0) ||
+ (PINT_REQUEST_TOTAL_BYTES(file_req) == 0))
+ {
+ gossip_ldebug(GOSSIP_IO_DEBUG, "Warning: 0 byte I/O operation "
+ "attempted.\n");
+ resp_p->total_completed = 0;
+ return 1;
+ }
+
+ PINT_smcb_alloc(&smcb, PVFS_SYS_IO,
+ sizeof(struct PINT_client_sm),
+ client_op_state_get_machine,
+ client_state_machine_terminate,
+ pint_client_sm_context);
+ if (smcb == NULL)
+ {
+ return -PVFS_ENOMEM;
+ }
+ sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+ PINT_init_msgarray_params(sm_p, ref.fs_id);
+ PINT_init_sysint_credentials(sm_p->cred_p, credentials);
+ sm_p->u.io.io_type = io_type;
+ sm_p->u.io.file_req = file_req;
+ sm_p->u.io.file_req_offset = file_req_offset;
+ sm_p->u.io.io_resp_p = resp_p;
+ sm_p->u.io.mem_req = mem_req;
+ sm_p->u.io.buffer = buffer;
+ sm_p->u.io.flowproto_type = cur_fs->flowproto;
+ sm_p->u.io.encoding = cur_fs->encoding;
+ sm_p->u.io.stored_error_code = 0;
+ sm_p->u.io.retry_count = 0;
+ sm_p->msgarray_op.msgarray = NULL;
+ sm_p->msgarray_op.count = 0;
+ sm_p->u.io.datafile_index_array = NULL;
+ sm_p->u.io.datafile_count = 0;
+ sm_p->u.io.total_size = 0;
+ sm_p->u.io.small_io = 0;
+ sm_p->u.io.op = op; /* AS: caller-supplied operation code */
+ sm_p->u.io.datatype = datatype; /* AS: NOTE(review): u.io.tmp_buffer is not set in this function -- confirm the frame starts zeroed */
+ sm_p->object_ref = ref;
+ PVFS_hint_copy(hints, &sm_p->hints);
+ PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &ref.handle);
+
+ return PINT_client_state_machine_post(
+ smcb, op_id, user_ptr);
+}
+/* AS */
+
+
/** Perform a read or write operation.
*
* \param type specifies if the operation is a read or write.
@@ -390,6 +497,57 @@ PVFS_error PVFS_sys_io(
return error;
}
+/* AS */
+/** Blocking wrapper around PVFS_isys_io_ex(): posts the operation, then waits for it with PVFS_sys_wait().
+ * Returns 0 when the post returns 1, the negative post error, a nonzero PVFS_sys_wait() result, or otherwise the error value PVFS_sys_wait() stored.
+ * \param io_type specifies if the operation is a read or write.
+ */
+PVFS_error PVFS_sys_io_ex(
+ PVFS_object_ref ref,
+ PVFS_Request file_req,
+ PVFS_offset file_req_offset,
+ void *buffer,
+ PVFS_Request mem_req,
+ const PVFS_credentials *credentials,
+ int datatype, /* AS: forwarded unchanged to PVFS_isys_io_ex */
+ int op, /* AS: forwarded unchanged to PVFS_isys_io_ex */
+ PVFS_sysresp_io *resp_p,
+ enum PVFS_io_type io_type,
+ PVFS_hint hints)
+{
+ PVFS_error ret = -PVFS_EINVAL, error = 0;
+ PVFS_sys_op_id op_id;
+
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_io_ex entered\n");
+
+ /* AS: datatype and op sit after op_id in this call, unlike their position in this function's own parameter list */
+ ret = PVFS_isys_io_ex(ref, file_req, file_req_offset, buffer, mem_req,
+ credentials, resp_p, io_type, &op_id,
+ datatype /* AS */, op /* AS */, hints, NULL);
+ /* AS */
+
+ if (ret == 1) /* nothing to wait for: report success without calling PVFS_sys_wait */
+ return 0;
+ else if (ret < 0)
+ {
+ PVFS_perror_gossip("PVFS_isys_io_ex call", ret);
+ error = ret;
+ }
+ else
+ {
+ ret = PVFS_sys_wait(op_id, "io", &error);
+ if (ret)
+ {
+ PVFS_perror_gossip("PVFS_sys_wait call", ret);
+ error = ret; /* a wait failure replaces whatever PVFS_sys_wait stored in error */
+ }
+ PINT_sys_release(op_id); /* released whether or not the wait succeeded */
+ }
+
+ return error;
+}
+/* AS */
+
/*******************************************************************/
static PINT_sm_action io_init(
@@ -616,6 +774,11 @@ static PINT_sm_action io_datafile_setup_
" %s: %d datafiles "
"might have data\n", __func__, target_datafile_count);
+
+ for (i=0; i < target_datafile_count; i++) /* AS */
+ gossip_debug(GOSSIP_IO_DEBUG, " %s: dfile_array[%d]=%lu\n",
+ __func__, target_datafile_count, attr->u.meta.dfile_array[i]); /* AS */
+
/* look at sio_array and sio_count to see if there are any
* servers that we can do small I/O to, instead of setting up
* flows. For now, we're going to stick with the semantics that
@@ -628,6 +791,7 @@ static PINT_sm_action io_datafile_setup_
if(sio_count == target_datafile_count)
{
gossip_debug(GOSSIP_IO_DEBUG, " %s: doing small I/O\n", __func__);
+ gossip_debug(GOSSIP_IO_DEBUG, " %s: sio_count=%d, target_datafile_count=%d\n", __func__, sio_count, target_datafile_count); /* AS */
sm_p->u.io.small_io = 1;
js_p->error_code = IO_DO_SMALL_IO;
@@ -662,6 +826,9 @@ static PINT_sm_action io_datafile_setup_
sm_p->u.io.file_req,
sm_p->u.io.file_req_offset,
PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
+ sm_p->u.io.op, /* AS: */
+ sm_p->u.io.datatype, /* AS */
+ attr->u.meta.dfile_array, /* AS */
sm_p->hints);
}
@@ -924,6 +1091,7 @@ static PINT_sm_action io_datafile_post_m
* go sleep then try the remaining msgpairs again */
if (sm_p->u.io.msgpair_completion_count
|| sm_p->u.io.flow_completion_count
+ || sm_p->u.io.read_ack_completion_count /* AS */
|| sm_p->u.io.write_ack_completion_count)
return SM_ACTION_DEFERRED; /* means go find another machine to run */
else {
@@ -983,6 +1151,7 @@ static PINT_sm_action io_datafile_comple
assert(sm_p->u.io.msgpair_completion_count > -1);
assert(sm_p->u.io.flow_completion_count > -1);
assert(sm_p->u.io.write_ack_completion_count > -1);
+ assert(sm_p->u.io.read_ack_completion_count > -1); /* AS */
attr = &sm_p->getattr.attr;
assert(attr);
@@ -1043,7 +1212,18 @@ static PINT_sm_action io_datafile_comple
goto check_next_step;
}
- if(sm_p->u.io.io_type == PVFS_IO_WRITE)
+ /* AS */
+ if(sm_p->u.io.io_type == PVFS_IO_READ && sm_p->u.io.op != 0)
+ {
+ ret = io_post_read_ack_recv(smcb, cur_ctx);
+ if(ret < 0)
+ {
+ PVFS_perror_gossip("Post of read-ack recv failed", ret);
+ js_p->error_code = ret;
+ goto check_next_step;
+ }
+ } /* AS */
+ else if(sm_p->u.io.io_type == PVFS_IO_WRITE) /* AS */
{
/* we expect this write to _not_ succeed immediately, because we
* have not posted the flow yet.
@@ -1126,6 +1306,22 @@ static PINT_sm_action io_datafile_comple
*/
assert((ret == 0) || (ret == -PVFS_EINVAL));
}
+ else if (cur_ctx->read_ack_in_progress) /* AS */
+ {
+ int ret = 0;
+
+ assert(sm_p->u.io.read_ack_completion_count);
+ server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
+ ret = job_reset_timeout(cur_ctx->read_ack.recv_id,
+ server_config->client_job_bmi_timeout);
+ PINT_put_server_config_struct(server_config);
+
+ /*
+ allow -PVFS_EINVAL errors in case the recv has already
+ completed (before we've processed it)
+ */
+ assert((ret == 0) || (ret == -PVFS_EINVAL));
+ } /* AS */
gossip_debug(GOSSIP_IO_DEBUG, " matched completed flow for "
"context %p%s\n", cur_ctx,
@@ -1164,25 +1360,45 @@ static PINT_sm_action io_datafile_comple
}
else if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FINAL_ACK))
{
- assert(sm_p->u.io.write_ack_completion_count);
-
+ if(sm_p->u.io.io_type == PVFS_IO_READ && sm_p->u.io.op != 0) /* AS */
+ assert(sm_p->u.io.read_ack_completion_count); /* AS */
+ else if (sm_p->u.io.io_type == PVFS_IO_WRITE) /* AS */
+ assert(sm_p->u.io.write_ack_completion_count); /* AS */
+
index = STATUS_USER_TAG_GET_INDEX(
status_user_tag, IO_SM_PHASE_FINAL_ACK);
cur_ctx = &sm_p->u.io.contexts[index];
assert(cur_ctx);
- assert(cur_ctx->write_ack.recv_status.actual_size <=
- cur_ctx->write_ack.max_resp_sz);
-
- cur_ctx->write_ack.recv_id = 0;
- cur_ctx->write_ack.recv_status = *js_p;
+ if(sm_p->u.io.io_type == PVFS_IO_WRITE) /* AS */
+ assert(cur_ctx->write_ack.recv_status.actual_size <=
+ cur_ctx->write_ack.max_resp_sz);
+ else /* AS */
+ assert(cur_ctx->read_ack.recv_status.actual_size <=
+ cur_ctx->read_ack.max_resp_sz); /* AS */
+
+ if(sm_p->u.io.io_type == PVFS_IO_WRITE) { /* AS */
+ cur_ctx->write_ack.recv_id = 0; /* AS */
+ cur_ctx->write_ack.recv_status = *js_p; /* AS */
+ }
+ else { /* AS */
+ cur_ctx->read_ack.recv_id = 0;
+ cur_ctx->read_ack.recv_status = *js_p;
+ } /* AS */
gossip_debug(GOSSIP_IO_DEBUG, " matched completed ack for "
"context %p\n", cur_ctx);
- cur_ctx->write_ack_in_progress = 0;
- sm_p->u.io.write_ack_completion_count--;
- assert(sm_p->u.io.write_ack_completion_count > -1);
+ if(sm_p->u.io.io_type == PVFS_IO_WRITE) { /* AS */
+ cur_ctx->write_ack_in_progress = 0;
+ sm_p->u.io.write_ack_completion_count--;
+ assert(sm_p->u.io.write_ack_completion_count > -1);
+ }
+ else if(sm_p->u.io.io_type == PVFS_IO_READ) { /* AS */
+ cur_ctx->read_ack_in_progress = 0;
+ sm_p->u.io.read_ack_completion_count--;
+ assert(sm_p->u.io.read_ack_completion_count > -1);
+ }
if (js_p->error_code < 0) {
gossip_debug(GOSSIP_IO_DEBUG,
@@ -1216,6 +1432,7 @@ static PINT_sm_action io_datafile_comple
*/
if (sm_p->u.io.msgpair_completion_count
|| sm_p->u.io.flow_completion_count
+ || sm_p->u.io.read_ack_completion_count /* AS */
|| sm_p->u.io.write_ack_completion_count) {
if (PINT_smcb_cancelled(smcb))
gossip_debug(GOSSIP_IO_DEBUG, "detected I/O cancellation with "
@@ -1303,7 +1520,7 @@ static PINT_sm_action io_analyze_results
assert(cur_ctx);
ret = io_check_context_status(
- cur_ctx, sm_p->u.io.io_type, &sm_p->u.io.total_size);
+ cur_ctx, sm_p->u.io.io_type, &sm_p->u.io.total_size, sm_p /* AS */); /* AS */
if (ret < 0)
{
if (ret == -PVFS_ECANCEL)
@@ -1356,7 +1573,8 @@ static PINT_sm_action io_analyze_results
/* be sure there are no jobs still laying around */
assert((sm_p->u.io.msgpair_completion_count == 0) &&
(sm_p->u.io.flow_completion_count == 0) &&
- (sm_p->u.io.write_ack_completion_count == 0));
+ (sm_p->u.io.write_ack_completion_count == 0) &&
+ (sm_p->u.io.read_ack_completion_count == 0)); /* AS */
/*
FIXME: non bmi errors pop out in flow failures above -- they are
@@ -1422,6 +1640,26 @@ static PINT_sm_action io_analyze_results
goto analyze_results_exit;
}
+ if(sm_p->u.io.io_type == PVFS_IO_READ && sm_p->u.io.op != 0) /* AS */
+ {
+ js_p->error_code = 0;
+ sm_p->u.io.io_resp_p->total_completed = sm_p->u.io.total_size;
+
+ /* we don't know if the file size has changed here so we invalidate
+ * the size in the attribute cache. The only sure-fire way to
+ * recompute the file size (if our write was past eof) is to
+ * get all the datafile sizes (we could have written to what was
+ * previously a hole). That's too expensive for just a cached
+ * size update.
+ */
+ PINT_acache_invalidate_size(sm_p->object_ref);
+
+ /* we can skip the check for holes since its only needed in the
+ * case of reads
+ */
+ goto analyze_results_exit;
+ } /* AS */
+
/* In order to give the sysint caller the correct value for length
* of bytes read, we have to check for holes in the logical file.
* The algorithm is as follows:
@@ -1849,6 +2087,7 @@ static inline int io_post_flow(
cur_ctx->flow_desc.src.u.bmi.address = cur_ctx->msg.svr_addr;
cur_ctx->flow_desc.dest.endpoint_id = MEM_ENDPOINT;
cur_ctx->flow_desc.dest.u.mem.buffer = sm_p->u.io.buffer;
+ cur_ctx->flow_desc.op = sm_p->u.io.op; /* AS: FIXME? */
}
else
{
@@ -1983,6 +2222,66 @@ static inline int io_post_write_ack_recv
return 0;
}
+/* AS: allocate a BMI receive buffer for cur_ctx's read ack and post the receive; returns 0 on success, -PVFS_ENOMEM if the buffer cannot be allocated, or the negative job_bmi_recv() result */
+static inline int io_post_read_ack_recv(
+ PINT_smcb *smcb,
+ PINT_client_io_ctx * cur_ctx)
+{
+ PINT_client_sm * sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+ int ret;
+ unsigned long status_user_tag;
+
+ gossip_debug(GOSSIP_IO_DEBUG, " preposting read "
+ "ack for context %p.\n", cur_ctx);
+
+ cur_ctx->read_ack.max_resp_sz = PINT_encode_calc_max_size(
+ PINT_ENCODE_RESP, PVFS_SERV_READ_COMPLETION,
+ sm_p->u.io.encoding);
+ cur_ctx->read_ack.encoded_resp_p = BMI_memalloc(
+ cur_ctx->msg.svr_addr, cur_ctx->read_ack.max_resp_sz,
+ BMI_RECV);
+
+ if (!cur_ctx->read_ack.encoded_resp_p)
+ {
+ gossip_err("BMI_memalloc (for read ack) failed\n");
+ return -PVFS_ENOMEM;
+ }
+
+ /*
+ we're pre-posting the receive for the final read ack here,
+ ahead of the flow phase.
+
+ the user tag below combines the context index with
+ IO_SM_PHASE_FINAL_ACK (presumably so the completion can be mapped back to cur_ctx)
+ */
+ status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FINAL_ACK);
+
+ /*
+ pre-post this recv with an infinite timeout (JOB_TIMEOUT_INF);
+ the intent is to adjust it after the flow completes, since we
+ don't know how long a flow can take at this point
+ */
+ ret = job_bmi_recv(
+ cur_ctx->msg.svr_addr, cur_ctx->read_ack.encoded_resp_p,
+ cur_ctx->read_ack.max_resp_sz, cur_ctx->session_tag,
+ BMI_PRE_ALLOC, smcb, status_user_tag,
+ &cur_ctx->read_ack.recv_status, &cur_ctx->read_ack.recv_id,
+ pint_client_sm_context, JOB_TIMEOUT_INF, sm_p->hints /* AS */);
+
+ if (ret < 0)
+ {
+ gossip_err("job_bmi_recv (read ack) failed\n");
+ return ret; /* NOTE(review): encoded_resp_p is not freed on this path -- confirm it is released elsewhere */
+ }
+
+ assert(ret == 0); /* negative values returned above, so any positive return trips this assert */
+ cur_ctx->read_ack_has_been_posted = 1;
+ cur_ctx->read_ack_in_progress = 1;
+ sm_p->u.io.read_ack_completion_count++; /* one more read ack outstanding for this state machine */
+
+ return 0;
+} /* AS */
+
/*
returns 0 on send completion; IO_RECV_COMPLETED on recv completion,
and -PVFS_error on failure
@@ -2136,7 +2435,8 @@ static inline int io_process_context_rec
static inline int io_check_context_status(
PINT_client_io_ctx *cur_ctx,
int io_type,
- PVFS_size *total_size)
+ PVFS_size *total_size,
+ PINT_client_sm *sm_p /* AS */)
{
int ret = 0;
@@ -2158,6 +2458,141 @@ static inline int io_check_context_statu
cur_ctx->msg.recv_status.error_code, cur_ctx);
ret = cur_ctx->msg.recv_status.error_code;
}
+ /* AS *************** */
+ else if (io_type == PVFS_IO_READ && cur_ctx->flow_desc.op != 0) /* AS */
+ {
+ /* we check the read ack status before the flow status so that the
+ * error code that the server reported in the ack takes precedence
+ */
+ if (cur_ctx->read_ack.recv_status.error_code)
+ {
+ gossip_debug(
+ GOSSIP_IO_DEBUG,
+ " error (%d) in final ack for context %p\n",
+ cur_ctx->read_ack.recv_status.error_code, cur_ctx);
+
+ assert(cur_ctx->read_ack_has_been_posted);
+ ret = cur_ctx->read_ack.recv_status.error_code;
+ }
+ else if (cur_ctx->read_ack_has_been_posted)
+ {
+ struct PINT_decoded_msg decoded_resp;
+ struct PVFS_server_resp *resp = NULL;
+ /*
+ size for writes are reported in the final ack, but we
+ have to decode it first
+ */
+ ret = PINT_serv_decode_resp(
+ cur_ctx->msg.fs_id, cur_ctx->read_ack.encoded_resp_p,
+ &decoded_resp, &cur_ctx->msg.svr_addr,
+ cur_ctx->read_ack.recv_status.actual_size, &resp);
+ if (ret == 0)
+ {
+ gossip_debug(
+ GOSSIP_IO_DEBUG,
+ " %lld bytes read to context %p\n",
+ lld(resp->u.read_completion.total_completed),
+ cur_ctx);
+ switch(sm_p->u.io.datatype) {
+ case (int)0x4c000405: /* MPI_INT */
+ {
+ if (sm_p->u.io.tmp_buffer == NULL) {
+ sm_p->u.io.tmp_buffer = (void *)malloc(sizeof(int)); /* AS */
+ memset(sm_p->u.io.tmp_buffer, 0, sizeof(int));
+ }
+ int old, new, final;
+ memcpy(&old, sm_p->u.io.tmp_buffer, sizeof(int));
+ memcpy(&new, resp->u.read_completion.result.buffer, sizeof(int));
+ switch(sm_p->u.io.op) {
+ case 0x58000001: /* MAX */
+ if(new > old)
+ final = new;
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "result(INT): old=%d, new=%d, final=%d\n", old, new, final); /* AS */
+ break;
+ case 0x58000002: /* MIN */
+ if(new < old)
+ final = new;
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "result(INT): old=%d, new=%d, final=%d\n", old, new, final); /* AS */
+ break;
+ case 0x58000003: /* SUM */
+ final = old + new;
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "result(INT): old=%d, new=%d, final=%d\n", old, new, final); /* AS */
+ break;
+ default:
+ break;
+ }
+
+ sm_p->u.io.tmp_buffer = (void *)&final;
+ memcpy(cur_ctx->flow_desc.dest.u.mem.buffer, &final, sizeof(int)); /* AS */
+ }
+ break;
+ case (int)0x4c00080b: /* MPI_DOUBLE */
+ {
+ if (sm_p->u.io.tmp_buffer == NULL) {
+ sm_p->u.io.tmp_buffer = (void *)malloc(sizeof(double)); /* AS */
+ memset(sm_p->u.io.tmp_buffer, 0, sizeof(double));
+ }
+ double old, new, final;
+ memcpy(&old, sm_p->u.io.tmp_buffer, sizeof(double)); /* old value */
+ memcpy(&new, resp->u.read_completion.result.buffer, sizeof(double)); /* new value */
+ switch(sm_p->u.io.op) {
+ case 0x58000001: /* MAX */
+ if(new > old)
+ final = new;
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "result(DOUBLE): old=%lf, new=%lf, final=%lf\n", old, new, final); /* AS */
+ break;
+ case 0x58000002: /* MIN */
+ if(new < old)
+ final = new;
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "result(DOUBLE): old=%lf, new=%lf, final=%lf\n", old, new, final); /* AS */
+ break;
+ case 0x58000003: /* SUM */
+ final = old + new;
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "result(DOUBLE): old=%lf, new=%lf, final=%lf\n", old, new, final); /* AS */
+ break;
+ default:
+ break;
+ }
+
+ sm_p->u.io.tmp_buffer = (void *)&final;
+ memcpy(cur_ctx->flow_desc.dest.u.mem.buffer, &final, sizeof(double)); /* AS */
+ }
+ break;
+ default:
+ break;
+ }
+ *total_size += resp->u.read_completion.total_completed;
+
+ /* pass along the error code from the server as well */
+ ret = resp->status;
+
+ PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
+ }
+ else
+ {
+ PVFS_perror_gossip("PINT_serv_decode_resp failed", ret);
+ }
+
+ PINT_flow_reset(&cur_ctx->flow_desc);
+ BMI_memfree(cur_ctx->msg.svr_addr,
+ cur_ctx->read_ack.encoded_resp_p,
+ cur_ctx->read_ack.max_resp_sz, BMI_RECV);
+ }
+ else if (cur_ctx->flow_status.error_code)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG,
+ " error (%d) in flow for context %p\n",
+ cur_ctx->flow_status.error_code, cur_ctx);
+ PINT_flow_reset(&cur_ctx->flow_desc);
+ ret = cur_ctx->flow_status.error_code;
+ }
+ } /* AS */
else if (io_type == PVFS_IO_WRITE)
{
/* we check the write ack status before the flow status so that the
More information about the Pvfs2-cvs
mailing list