[Pvfs2-cvs] commit by sson in pvfs2/src/server: read.sm io.sm
module.mk.in pvfs2-server.h
CVS commit program
cvs at parl.clemson.edu
Tue Apr 14 16:19:50 EDT 2009
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv13438/src/server
Modified Files:
Tag: as-branch
io.sm module.mk.in pvfs2-server.h
Added Files:
Tag: as-branch
read.sm
Log Message:
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ read.sm 2009-04-14 16:19:50.000000000 -0400
@@ -0,0 +1,513 @@
+/*
+ * (C) 2001 Clemson University and The University of Chicago
+ *
+ * See COPYING in top-level directory.
+ */
+
+/*
+ * PVFS2 server state machine for driving read I/O operations.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "pvfs2-request.h"
+#include "pint-distribution.h"
+#include "pint-request.h"
+#include "pvfs2-internal.h"
+#include "rw-sm.h"
+#include "trove.h"
+
+static void do_comp(struct fp_queue_item *);
+/*
+ * Nested state machine pvfs2_read_sm, defined below.
+ *
+ *   trove_read -> bmi_send -> check -> finished
+ *                                |
+ *                                +--(LOOP)--> trove_read
+ *
+ * trove_read runs trove_read_call() and bmi_send runs bmi_send_call(); both
+ * only have a default transition, so they always advance to the next state
+ * whatever js_p->error_code holds.  check runs check_done(), which sets
+ * js_p->error_code to LOOP to go around again or to 0 (success) to move on
+ * to finished.  finished runs epilog() and leaves the nested machine
+ * through its "return" transition.
+ */
+%%
+
+nested machine pvfs2_read_sm
+{
+ state trove_read
+ {
+ run trove_read_call;
+ default => bmi_send;
+ }
+
+ state bmi_send
+ {
+ run bmi_send_call;
+ default => check;
+ }
+
+ state check
+ {
+ run check_done;
+ success => finished;
+ LOOP => trove_read;
+ }
+
+ state finished
+ {
+ run epilog;
+ default => return;
+ }
+}
+
+%%
+
+/*
+ * trove_read_call()
+ *
+ * PINT_process_request() -> job_trove_bstream_read_list()
+ *
+ * First state of pvfs2_read_sm.  Allocates the BMI send buffer for the
+ * queue item on first use, asks the segment pool for the next segments
+ * (bytes is preset to the flow's buffer_size) and posts one list read of
+ * them into the buffer.  The segment list is saved in s_op->u.flow_read
+ * (offsets, sizes, segs) and the byte count in q_item->buffer_used so the
+ * following states can use them.
+ *
+ * Returns SM_ACTION_DEFERRED when the read was posted and has not completed
+ * yet, SM_ACTION_COMPLETE in every other case: no segments were handed out,
+ * the read completed immediately, or posting failed (js_p->error_code then
+ * holds the negative return value).
+ */
+static int trove_read_call(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
+    PINT_segpool_handle_t seg_handle = s_op->u.flow_read.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.flow_read.id;
+    struct server_configuration_s *user_opts = get_server_config_struct();
+    struct filesystem_configuration_s *fs_config;
+    PVFS_offset *offsets = NULL;
+    PVFS_size *sizes = NULL;
+    PVFS_size bytes;
+    int count = 0;
+    int ret;
+    job_id_t tmp_id;
+
+    q_item->buffer_used = 0;
+
+    if(!q_item->buffer) {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: !q_item->buffer\n", __func__);
+        /* if the q_item has not been used, allocate a buffer */
+        q_item->buffer = BMI_memalloc(
+            q_item->parent->dest.u.bmi.address,
+            q_item->parent->buffer_size, BMI_SEND);
+        /* TODO: error handling */
+        assert(q_item->buffer);
+    }
+
+    bytes = q_item->parent->buffer_size;
+    PINT_segpool_take_segments(seg_handle, id, &bytes, &count, &offsets, &sizes);
+    /* PVFS_size is printed through long long so the format matches on
+     * every platform */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%lld, count=%d\n", __func__,
+                 (long long)bytes, count);
+
+    q_item->buffer_used = bytes;
+    s_op->u.flow_read.offsets = offsets;
+    s_op->u.flow_read.sizes = sizes;
+    s_op->u.flow_read.segs = count;
+
+    if(count == 0) {
+        /* nothing was handed out for this pass */
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /*
+     * Look the file system up through the server configuration, the same
+     * way io.sm does.  The previous code passed the address of an
+     * uninitialized local to PINT_config_get_filesystems() and then read
+     * trove_sync_data through the result.
+     */
+    fs_config = PINT_config_find_fs_id(user_opts,
+                                       q_item->parent->src.u.trove.coll_id);
+
+    ret = job_trove_bstream_read_list(
+        q_item->parent->src.u.trove.coll_id,
+        q_item->parent->src.u.trove.handle,
+        &q_item->buffer,
+        &q_item->buffer_used,
+        1,
+        offsets,
+        sizes,
+        count,
+        &q_item->out_size,
+        fs_config ? fs_config->trove_sync_data : 0,
+        NULL,
+        smcb,
+        0,
+        js_p,
+        &tmp_id,
+        server_job_context,
+        q_item->parent->hints);
+
+    if(ret < 0)
+    {
+        gossip_err("%s: I/O error occurred\n", __func__);
+        /* FIXME: handle_io_error() equivalent is still missing */
+        /*
+         * No job was posted, so the machine must not wait for one.  The
+         * buffer was not filled either: clear the counts so that
+         * bmi_send_call's "nothing to send" check skips it.
+         * NOTE(review): check_done() resets js_p->error_code, so this
+         * error is not yet propagated to the caller of the machine.
+         */
+        q_item->buffer_used = 0;
+        s_op->u.flow_read.segs = 0;
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 1)
+    {
+        /* immediate completion; move on without waiting for a callback */
+        js_p->error_code = 1;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 0) {
+        /* posted; the state machine resumes when the read completes */
+        js_p->error_code = 0;
+        return SM_ACTION_DEFERRED;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * bmi_send_call()
+ *
+ * Second state of pvfs2_read_sm.  Does nothing when the previous state left
+ * no data (segs == 0 and buffer_used == 0).  Otherwise runs do_comp() on
+ * the buffer and, when the flow carries no operation (parent->op == 0),
+ * posts a BMI send of q_item->buffer_used bytes to the flow destination.
+ * When an operation is set the buffer is not sent from here.
+ *
+ * Returns SM_ACTION_DEFERRED when the send was posted and is still pending,
+ * SM_ACTION_COMPLETE otherwise.  js_p->error_code is 0 on the two skip
+ * paths and the job_bmi_send() return value after a post attempt.
+ */
+static int bmi_send_call(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
+    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+    struct server_configuration_s *user_opts = get_server_config_struct();
+    job_id_t tmp_id;
+    int ret;
+
+    if(s_op->u.flow_read.segs == 0 && q_item->buffer_used == 0) {
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%lld\n", __func__,
+                 (long long)q_item->buffer_used);
+    /* the buffer holds raw file data with no terminating NUL, so log its
+     * address rather than printing it as a string */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer=%p\n", __func__,
+                 q_item->buffer);
+
+    /***************************************************/
+    do_comp(q_item);
+    /***************************************************/
+
+    assert(q_item->buffer_used);
+
+    if(flow_data->parent->op != 0) { /* AS: when op is specified */
+        /* AS: skip sending if op is specified */
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    ret = job_bmi_send(q_item->parent->dest.u.bmi.address,
+                       q_item->buffer,
+                       q_item->buffer_used,
+                       q_item->parent->tag,
+                       BMI_PRE_ALLOC,
+                       0, /* send_unexpected */
+                       smcb, /* user_ptr */
+                       0, /* status_user_tag */
+                       js_p,
+                       &tmp_id,
+                       server_job_context,
+                       user_opts->server_job_flow_timeout,
+                       (bmi_hint)q_item->parent->hints);
+
+    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                 "%s: (post send time) ini posts: %d, pending: %d, last: %d\n",
+                 __func__,
+                 flow_data->initial_posts, flow_data->dest_pending,
+                 flow_data->dest_last_posted);
+
+    if(ret < 0)
+    {
+        gossip_err("%s: I/O error occurred\n", __func__);
+        /* FIXME !!!!!!! */
+        /* handle_io_error(ret, q_item, flow_data); */
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 1)
+    {
+        /* immediate completion */
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 0) {
+        /* posted; the state machine resumes when the send completes */
+        js_p->error_code = ret;
+        return SM_ACTION_DEFERRED;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * check_done()
+ *
+ * Third state of pvfs2_read_sm.  Adds the bytes handled in this pass
+ * (q_item->buffer_used) to the flow totals and then consults
+ * segpool_done() on the segment pool handle:
+ *   - not done: js_p->error_code = LOOP, which sends the machine back to
+ *     trove_read;
+ *   - done: js_p->error_code stays 0, the flow is marked FLOW_COMPLETE and
+ *     FLOW_CLEANUP() is run on the flow's private data.
+ *
+ * Always returns SM_ACTION_COMPLETE.
+ */
+static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
+    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+    PINT_segpool_handle_t h = s_op->u.flow_read.seg_handle;
+
+    js_p->error_code = 0;
+
+    flow_data->total_bytes_processed += q_item->buffer_used;
+    q_item->parent->total_transferred += q_item->buffer_used;
+    /* byte counters are printed through long long so the format matches
+     * their width */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_bytes_processed=%lld\n", __func__,
+                 (long long)flow_data->total_bytes_processed);
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
+                 s_op->u.flow_read.segs);
+
+    /* NOTE(review): %d assumes PINT_segpool_handle_t is int-sized - confirm */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: handle=%d\n", __func__, h);
+
+    if(!segpool_done(h)) {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: read: LOOP\n", __func__);
+        js_p->error_code = LOOP;
+    }
+    else {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: read: DONE\n", __func__);
+        flow_data->parent->state = FLOW_COMPLETE;
+        FLOW_CLEANUP(flow_data);
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * epilog()
+ *
+ * Last state of pvfs2_read_sm: emits a debug trace and reports success
+ * (js_p->error_code = 0) before the machine returns.
+ */
+static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: read: \n", __func__);
+
+    js_p->error_code = 0;
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * Datatype and operation codes as they arrive in the flow descriptor.  The
+ * datatype values were labelled MPI_INT / MPI_DOUBLE and the operations
+ * MAX / MIN / SUM where they were first used.
+ */
+enum
+{
+    DO_COMP_DT_INT    = 0x4c000405, /* MPI_INT */
+    DO_COMP_DT_DOUBLE = 0x4c00080b, /* MPI_DOUBLE */
+    DO_COMP_OP_MAX    = 0x58000001,
+    DO_COMP_OP_MIN    = 0x58000002,
+    DO_COMP_OP_SUM    = 0x58000003
+};
+
+/*
+ * Returns the flow's accumulator (tmp_buffer).  On the first chunk of a
+ * flow (total_transferred == 0) it is allocated if needed and zeroed.
+ * Returns NULL when no accumulator is available.
+ */
+static void *do_comp_accumulator(flow_descriptor *flow_d, size_t size)
+{
+    if (flow_d->total_transferred == 0) {
+        if (flow_d->tmp_buffer == NULL) {
+            flow_d->tmp_buffer = malloc(size);
+        }
+        if (flow_d->tmp_buffer != NULL) {
+            memset(flow_d->tmp_buffer, 0, size);
+        }
+    }
+    return flow_d->tmp_buffer;
+}
+
+/*
+ * Folds the count ints in a[] with op.  The chunk result is written to
+ * a[0] and merged into *acc; first_chunk makes MAX/MIN take the chunk
+ * result unconditionally.  Unknown operations leave everything untouched.
+ */
+static void do_comp_int(int *a, PVFS_size count, int op, int first_chunk,
+                        int *acc)
+{
+    PVFS_size i;
+    int result;
+
+    switch (op) {
+    case DO_COMP_OP_MAX:
+        result = a[0];
+        for (i = 1; i < count; i++) {
+            if (a[i] > result) {
+                result = a[i];
+            }
+        }
+        a[0] = result;
+        if (first_chunk || result > *acc) {
+            *acc = result;
+        }
+        break;
+    case DO_COMP_OP_MIN:
+        result = a[0];
+        for (i = 1; i < count; i++) {
+            if (a[i] < result) {
+                result = a[i];
+            }
+        }
+        a[0] = result;
+        if (first_chunk || result < *acc) {
+            *acc = result;
+        }
+        break;
+    case DO_COMP_OP_SUM:
+        result = 0;
+        for (i = 0; i < count; i++) {
+            if (i < 10) {
+                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%d\n",
+                             (long long)i, a[i]);
+            }
+            result += a[i];
+        }
+        a[0] = result;
+        *acc += result;
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%d\n", *acc);
+        break;
+    default:
+        break;
+    }
+}
+
+/*
+ * Same as do_comp_int() for doubles.
+ */
+static void do_comp_double(double *a, PVFS_size count, int op,
+                           int first_chunk, double *acc)
+{
+    PVFS_size i;
+    double result;
+
+    switch (op) {
+    case DO_COMP_OP_MAX:
+        result = a[0];
+        for (i = 1; i < count; i++) {
+            if (a[i] > result) {
+                result = a[i];
+            }
+        }
+        a[0] = result;
+        if (first_chunk || result > *acc) {
+            *acc = result;
+        }
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *acc);
+        break;
+    case DO_COMP_OP_MIN:
+        result = a[0];
+        for (i = 1; i < count; i++) {
+            if (a[i] < result) {
+                result = a[i];
+            }
+        }
+        a[0] = result;
+        if (first_chunk || result < *acc) {
+            *acc = result;
+        }
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n", *acc);
+        break;
+    case DO_COMP_OP_SUM:
+        result = 0;
+        for (i = 0; i < count; i++) {
+            if (i < 10) {
+                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%lld]=%lf\n",
+                             (long long)i, a[i]);
+            }
+            result += a[i];
+        }
+        a[0] = result;
+        *acc += result;
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "sum=%lf\n", *acc);
+        break;
+    default:
+        break;
+    }
+}
+
+/*
+ * do_comp()
+ *
+ * Applies the flow's reduction operation (parent->op) to the data in
+ * q_item->buffer, interpreted according to parent->datatype.  The result
+ * for this chunk is written to the first element of the buffer and merged
+ * into the per-flow accumulator parent->tmp_buffer, which is allocated and
+ * zeroed on the first chunk of the flow.
+ *
+ * Nothing is done when the queue item has no buffer, the datatype is not
+ * int or double, the accumulator cannot be allocated, or the chunk holds
+ * less than one whole element.
+ */
+static void do_comp(struct fp_queue_item *q_item)
+{
+    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+    flow_descriptor *flow_d = flow_data->parent;
+    int first_chunk;
+
+    if (!q_item->buffer) {
+        return;
+    }
+
+    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                 "%s: q_item->buffer_used=%lld, op=0x%x, datatype=0x%x\n",
+                 __func__, (long long)q_item->buffer_used, flow_d->op,
+                 flow_d->datatype); /* AS */
+
+    first_chunk = (flow_d->total_transferred == 0);
+
+    switch (flow_d->datatype) {
+    case DO_COMP_DT_INT:
+    {
+        PVFS_size count = (q_item->buffer_used)/((*PVFS_INT).ub);
+        int *acc = do_comp_accumulator(flow_d, sizeof(int));
+
+        if (acc == NULL) {
+            gossip_err("%s: no accumulator buffer available\n", __func__);
+            break;
+        }
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                     "total_transferred=%lld, total_bytes_processed=%lld\n",
+                     (long long)flow_d->total_transferred,
+                     (long long)flow_data->total_bytes_processed);
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%d\n",
+                     (long long)count, *acc);
+
+        /* a chunk without a whole element must not feed a stale a[0]
+         * into the accumulator */
+        if (count > 0) {
+            do_comp_int(q_item->buffer, count, flow_d->op, first_chunk, acc);
+        }
+        break;
+    }
+    case DO_COMP_DT_DOUBLE:
+    {
+        PVFS_size count = (q_item->buffer_used)/((*PVFS_DOUBLE).ub);
+        double *acc = do_comp_accumulator(flow_d, sizeof(double));
+
+        if (acc == NULL) {
+            gossip_err("%s: no accumulator buffer available\n", __func__);
+            break;
+        }
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
+                     "total_transferred=%lld, total_bytes_processed=%lld\n",
+                     (long long)flow_d->total_transferred,
+                     (long long)flow_data->total_bytes_processed);
+        gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%lf\n",
+                     (long long)count, *acc);
+
+        if (count > 0) {
+            do_comp_double(q_item->buffer, count, flow_d->op, first_chunk,
+                           acc);
+        }
+        break;
+    }
+    default:
+        break;
+    } /* end switch() */
+}
+
+/*
+ * Local variables:
+ * mode: c
+ * c-indent-level: 4
+ * c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */
Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73 -r1.73.6.1
--- io.sm 20 Nov 2008 01:17:10 -0000 1.73
+++ io.sm 14 Apr 2009 20:19:50 -0000 1.73.6.1
@@ -18,6 +18,11 @@
#include "pint-distribution.h"
#include "pint-request.h"
#include "pvfs2-internal.h"
+#include "rw-sm.h"
+#include "pint-segpool.h"
+
+extern struct PINT_state_machine_s pvfs2_read_sm;
+extern struct PINT_state_machine_s pvfs2_write_sm;
%%
@@ -33,7 +38,7 @@ machine pvfs2_io_sm
state send_positive_ack
{
run io_send_ack;
- success => start_flow;
+ success => start_pipelining;
default => release;
}
@@ -43,9 +48,13 @@ machine pvfs2_io_sm
default => release;
}
- state start_flow
+ state start_pipelining
{
- run io_start_flow;
+ pjmp start_pipeline_sm
+ {
+ DO_READ => pvfs2_read_sm;
+ DO_WRITE => pvfs2_write_sm;
+ }
success => send_completion_ack;
default => release;
}
@@ -106,6 +115,8 @@ static int io_send_ack(
err = PINT_encode(&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
s_op->addr, s_op->decoded.enc_type);
+
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: error=%d\n", __func__, err);
if (err < 0)
{
gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
@@ -124,7 +135,7 @@ static int io_send_ack(
}
/*
- * Function: io_start_flow()
+ * Function: start_pipeline_sm()
*
* Params: server_op *s_op,
* job_status_s* js_p
@@ -141,16 +152,21 @@ static int io_send_ack(
* carry out the data transfer
*
*/
-static PINT_sm_action io_start_flow(
+static PINT_sm_action start_pipeline_sm(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- int err = -PVFS_EIO;
- job_id_t tmp_id;
+ //int err = -PVFS_EIO;
struct server_configuration_s *user_opts = get_server_config_struct();
struct filesystem_configuration_s *fs_conf;
+ struct PINT_server_op *flow_read_op;
+ struct PINT_server_op *flow_write_op;
+ int i, ret;
+ struct fp_private_data *flow_data = NULL;
+ PINT_segpool_handle_t seg_handle;
s_op->u.io.flow_d = PINT_flow_alloc();
+
if (!s_op->u.io.flow_d)
{
js_p->error_code = -PVFS_ENOMEM;
@@ -171,14 +187,14 @@ static PINT_sm_action io_start_flow(
/* on writes, we allow the bstream to be extended at EOF */
if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
{
- gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
- "write data.\n");
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: issuing pipelining to "
+ "write data.\n", __func__);
s_op->u.io.flow_d->file_data.extend_flag = 1;
}
else
{
- gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
- "read data.\n");
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: issuing pipelining to "
+ "read data.\n", __func__);
s_op->u.io.flow_d->file_data.extend_flag = 0;
}
@@ -189,9 +205,15 @@ static PINT_sm_action io_start_flow(
s_op->u.io.flow_d->tag = s_op->tag;
s_op->u.io.flow_d->user_ptr = NULL;
s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
+ s_op->u.io.flow_d->op = s_op->req->u.io.op; /* AS: operation */
+ s_op->u.io.flow_d->datatype = s_op->req->u.io.datatype; /* AS: dtype */
+ s_op->u.io.flow_d->dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */
+
+ gossip_debug(GOSSIP_IO_DEBUG, "server_ct=%d\n", s_op->req->u.io.server_ct);
+ for(i=0; i<s_op->req->u.io.server_ct; i++)
+ gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%lu\n", i, s_op->u.io.flow_d->dfile_array[i]);
- fs_conf = PINT_config_find_fs_id(user_opts,
- s_op->req->u.io.fs_id);
+ fs_conf = PINT_config_find_fs_id(user_opts, s_op->req->u.io.fs_id);
if(fs_conf)
{
/* pick up any buffer settings overrides from fs conf */
@@ -235,11 +257,168 @@ static PINT_sm_action io_start_flow(
return SM_ACTION_COMPLETE;
}
- err = job_flow(s_op->u.io.flow_d, smcb, 0, js_p, &tmp_id,
- server_job_context, user_opts->server_job_flow_timeout
- , s_op->req->hints);
+ flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
+ if(!flow_data) {
+ js_p->error_code = -PVFS_ENOMEM;
+ return SM_ACTION_COMPLETE;
+ }
+
+ memset(flow_data, 0, sizeof(struct fp_private_data));
+
+ s_op->u.io.flow_d->flow_protocol_data = flow_data;
+ s_op->u.io.flow_d->state = FLOW_TRANSMITTING;
+ flow_data->parent = s_op->u.io.flow_d;
+
+ /* setup the request processing states */
+#if 0 //////////////////
+ s_op->u.io.flow_d->file_req_state =
+ PINT_new_request_state(s_op->u.io.flow_d->file_req);
+ if (!s_op->u.io.flow_d->file_req_state)
+ {
+ js_p->error_code = -PVFS_ENOMEM;
+ return SM_ACTION_COMPLETE;
+ }
+
+ /* only setup a memory datatype state if caller provided a memory datatype */
+ if(s_op->u.io.flow_d->mem_req)
+ {
+ s_op->u.io.flow_d->mem_req_state =
+ PINT_new_request_state(s_op->u.io.flow_d->mem_req);
+ if (!s_op->u.io.flow_d->mem_req_state)
+ {
+ js_p->error_code = -PVFS_ENOMEM;
+ return SM_ACTION_COMPLETE;
+ }
+ }
- return err;
+ /* if a file datatype offset was specified, go ahead and skip ahead
+ * before doing anything else
+ */
+ if(s_op->u.io.flow_d->file_req_offset)
+ PINT_REQUEST_STATE_SET_TARGET(s_op->u.io.flow_d->file_req_state,
+ s_op->u.io.flow_d->file_req_offset);
+
+
+ /* set boundaries on file datatype */
+ if(s_op->u.io.flow_d->aggregate_size > -1)
+ {
+ PINT_REQUEST_STATE_SET_FINAL(s_op->u.io.flow_d->file_req_state,
+ s_op->u.io.flow_d->aggregate_size+s_op->u.io.flow_d->file_req_offset);
+ }
+ else
+ {
+ PINT_REQUEST_STATE_SET_FINAL(s_op->u.io.flow_d->file_req_state,
+ s_op->u.io.flow_d->file_req_offset +
+ PINT_REQUEST_TOTAL_BYTES(s_op->u.io.flow_d->mem_req));
+ }
+#endif //////////////////////
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: aggregate_size=%ld\n", __func__,
+ s_op->u.io.flow_d->aggregate_size);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: file_data.fsize=%ld\n", __func__,
+ s_op->u.io.flow_d->file_data.fsize);
+
+ if(s_op->u.io.flow_d->buffer_size < 1)
+ s_op->u.io.flow_d->buffer_size = BUFFER_SIZE;
+ if(s_op->u.io.flow_d->buffers_per_flow < 1)
+ s_op->u.io.flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
+
+ flow_data->prealloc_array = (struct fp_queue_item*)
+ malloc(s_op->u.io.flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+ if(!flow_data->prealloc_array)
+ {
+ free(flow_data);
+ js_p->error_code = (-PVFS_ENOMEM);
+ return SM_ACTION_COMPLETE;
+ }
+
+ memset(flow_data->prealloc_array, 0,
+ s_op->u.io.flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+ for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++)
+ {
+ flow_data->prealloc_array[i].parent = s_op->u.io.flow_d;
+ }
+
+ /* remaining setup depends on the endpoints we intend to use */
+ if(s_op->req->u.io.io_type == PVFS_IO_READ) {
+ PINT_segpool_unit_id id;
+
+ PINT_dist_lookup(s_op->u.io.flow_d->file_data.dist);
+ ret = PINT_segpool_init(s_op->u.io.flow_d->mem_req,
+ s_op->u.io.flow_d->file_req,
+ s_op->u.io.flow_d->file_data.fsize,
+ s_op->u.io.flow_d->aggregate_size,
+ s_op->u.io.flow_d->file_data.server_nr,
+ s_op->u.io.flow_d->file_data.server_ct,
+ s_op->u.io.flow_d->file_data.dist, /* dist */
+ s_op->u.io.flow_d->file_req_offset, /* sson */
+ PINT_SP_SERVER_READ,
+ &seg_handle);
+
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_init() called: ret=%d, seg_handle=%d\n", __func__, ret, seg_handle);
+ //for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
+ for(i=0; i<1; i++) {
+ PINT_segpool_register(seg_handle, &id);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_register() called: id=%d\n", __func__, id);
+ flow_data->prealloc_array[i].result_chain.q_item =
+ &flow_data->prealloc_array[i];
+ flow_read_op = malloc(sizeof(*flow_read_op));
+ memset(flow_read_op, 0, sizeof(*flow_read_op));
+ memcpy(flow_read_op, s_op, sizeof(*flow_read_op));
+ flow_read_op->u.flow_read.q_item = &flow_data->prealloc_array[i];
+ flow_read_op->u.flow_read.seg_handle = seg_handle;
+ flow_read_op->u.flow_read.id = id;
+ ret = PINT_sm_push_frame(smcb, DO_READ, flow_read_op);
+ if(ret < 0) {
+ js_p->error_code = -PVFS_ENOMEM;
+ return SM_ACTION_COMPLETE;
+ }
+ gossip_debug(GOSSIP_IO_DEBUG, "flow_read: ret=%d\n", ret);
+ }
+ }
+ else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: IO_WRITE\n", __func__);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: file_size=%ld\n", __func__,
+ s_op->u.io.flow_d->aggregate_size);
+ PINT_segpool_unit_id id;
+
+ PINT_dist_lookup(s_op->u.io.flow_d->file_data.dist);
+ ret = PINT_segpool_init(s_op->u.io.flow_d->mem_req,
+ s_op->u.io.flow_d->file_req,
+ s_op->u.io.flow_d->file_data.fsize, /* FIXME?? */
+ s_op->u.io.flow_d->aggregate_size,
+ s_op->u.io.flow_d->file_data.server_nr,
+ s_op->u.io.flow_d->file_data.server_ct,
+ s_op->u.io.flow_d->file_data.dist, /* dist */
+ s_op->u.io.flow_d->file_req_offset, /* sson */
+ PINT_SP_SERVER_WRITE,
+ &seg_handle);
+
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_init() called: ret=%d, seg_handle=%d\n", __func__, ret, seg_handle);
+ /* only post one outstanding recv at a time; easier to manage */
+ /* FIXME: we will eventually want to post multiple of recvs */
+ //for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
+ for(i=0; i<1; i++) {
+ PINT_segpool_register(seg_handle, &id);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: PINT_segpool_register() called: id=%d\n", __func__, id);
+ flow_data->prealloc_array[i].result_chain.q_item =
+ &flow_data->prealloc_array[i];
+
+ flow_write_op = malloc(sizeof(*flow_write_op));
+ memset(flow_write_op, 0, sizeof(*flow_write_op));
+ memcpy(flow_write_op, s_op, sizeof(*flow_write_op));
+ flow_write_op->u.flow_write.q_item = &flow_data->prealloc_array[i];
+ flow_write_op->u.flow_write.seg_handle = seg_handle;
+ flow_write_op->u.flow_write.id = id;
+ ret = PINT_sm_push_frame(smcb, DO_WRITE, flow_write_op);
+ if(ret < 0) {
+ js_p->error_code = -PVFS_ENOMEM;
+ return SM_ACTION_COMPLETE;
+ }
+ }
+ }
+
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
}
/*
@@ -344,8 +523,12 @@ static PINT_sm_action io_send_completion
*/
if (s_op->req->u.io.io_type == PVFS_IO_READ)
{
- js_p->error_code = 0;
- return SM_ACTION_COMPLETE;
+ if(s_op->req->u.io.op == 0) { /* AS */
+ gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack()?\n");
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+ }
+ gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), IO_READ with op\n");
}
/* release encoding of the first ack that we sent */
@@ -354,19 +537,63 @@ static PINT_sm_action io_send_completion
/* zero size for safety */
s_op->encoded.total_size = 0;
+ if(s_op->req->u.io.io_type == PVFS_IO_READ) { /* AS */
+ switch(s_op->req->u.io.datatype) {
+ case (int)0x4c000405: /* MPI_INT */
+ s_op->encoded.total_size = sizeof(int); /* FIXME */
+ break;
+ case (int)0x4c00080b:
+ s_op->encoded.total_size = sizeof(double); /* FIXME */
+ break;
+ default:
+ break;
+ }
+ }
+
/*
fill in response -- status field is the only generic one we
should have to set
*/
- s_op->resp.op = PVFS_SERV_WRITE_COMPLETION; /* not IO */
+ if(s_op->req->u.io.op != 0 && s_op->req->u.io.io_type == PVFS_IO_READ) /* AS */
+ s_op->resp.op = PVFS_SERV_READ_COMPLETION; /* AS: read with op */
+ else
+ s_op->resp.op = PVFS_SERV_WRITE_COMPLETION; /* not IO */
s_op->resp.status = js_p->error_code;
- s_op->resp.u.write_completion.total_completed =
- s_op->u.io.flow_d->total_transferred;
+ if(s_op->req->u.io.io_type == PVFS_IO_READ) { /* AS: read with op */
+ s_op->resp.u.read_completion.total_completed =
+ s_op->u.io.flow_d->total_transferred;
+ gossip_debug(GOSSIP_IO_DEBUG, "total_transferred=%ld\n", s_op->u.io.flow_d->total_transferred); /* AS */
+ switch(s_op->req->u.io.datatype) {
+ case (int)0x4c000405: /* MPI_INT */
+ {
+ int *tmp;
+ tmp = s_op->u.io.flow_d->tmp_buffer;
+ gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), result (INT) to send=%d, s_op->resp.u.read_completion.total_completed=%lu\n", *tmp, s_op->resp.u.read_completion.total_completed);
+ s_op->resp.u.read_completion.result.buffer_sz = sizeof(int);
+ s_op->resp.u.read_completion.result.buffer = s_op->u.io.flow_d->tmp_buffer;
+ break;
+ }
+ case (int)0x4c00080b:
+ {
+ double *tmp;
+ tmp = s_op->u.io.flow_d->tmp_buffer;
+ gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), result (DOUBLE) to send=%lf, s_op->resp.u.read_completion.total_completed=%lu\n", *tmp, s_op->resp.u.read_completion.total_completed);
+ s_op->resp.u.read_completion.result.buffer_sz = sizeof(double);
+ s_op->resp.u.read_completion.result.buffer = s_op->u.io.flow_d->tmp_buffer;
+ break;
+ }
+ }
+ }
+ else /* AS */
+ s_op->resp.u.write_completion.total_completed =
+ s_op->u.io.flow_d->total_transferred; /* AS */
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__, s_op->u.io.flow_d->total_transferred);
err = PINT_encode(
&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
s_op->addr, s_op->decoded.enc_type);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: err=%d\n", __func__, err); // err = 0
if (err < 0)
{
gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
@@ -381,6 +608,8 @@ static PINT_sm_action io_send_completion
server_job_context, user_opts->server_job_bmi_timeout,
s_op->req->hints);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: err=%d\n", __func__, err); // err = 1
+
return err;
}
@@ -403,7 +632,7 @@ struct PINT_server_req_params pvfs2_io_p
.access_type = PINT_server_req_access_io,
.sched_policy = PINT_SERVER_REQ_SCHEDULE,
.get_object_ref = PINT_get_object_ref_io,
- .state_machine = &pvfs2_io_sm
+ .state_machine = &pvfs2_io_sm,
};
/*
Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/module.mk.in,v
diff -p -u -r1.55 -r1.55.6.1
--- module.mk.in 20 Nov 2008 01:17:10 -0000 1.55
+++ module.mk.in 14 Apr 2009 20:19:50 -0000 1.55.6.1
@@ -43,7 +43,9 @@ ifdef BUILD_SERVER
$(DIR)/list-eattr.c \
$(DIR)/unexpected.c \
$(DIR)/precreate-pool-refiller.c \
- $(DIR)/unstuff.c
+ $(DIR)/unstuff.c \
+ $(DIR)/read.c \
+ $(DIR)/write.c
# c files that should be added to server library
SERVERSRC += \
Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155 -r1.155.6.1
--- pvfs2-server.h 29 Jan 2009 05:24:04 -0000 1.155
+++ pvfs2-server.h 14 Apr 2009 20:19:50 -0000 1.155.6.1
@@ -32,6 +32,7 @@
#include "pvfs2-req-proto.h"
#include "state-machine.h"
#include "pint-event.h"
+#include "pint-segpool.h"
extern job_context_id server_job_context;
@@ -335,6 +336,32 @@ struct PINT_server_io_op
flow_descriptor* flow_d;
};
+/* Per-frame state of the read state machine (pvfs2_read_sm in read.sm). */
+/* not sure if this is the right place */
+struct PINT_server_flow_read_op
+{
+ struct fp_queue_item *q_item; /* queue item (buffer) this frame works on; io.sm points it at an entry of the flow's prealloc_array */
+ PINT_segpool_handle_t seg_handle; /* segment pool handle obtained from PINT_segpool_init() */
+ PINT_segpool_unit_id id; /* unit id obtained from PINT_segpool_register() */
+ PVFS_offset *offsets; /* offsets returned by the last PINT_segpool_take_segments() call */
+ PINT_Request_state *file_req_state; /* not referenced by the read.sm code shown here */
+ PVFS_size *sizes; /* sizes returned by the last PINT_segpool_take_segments() call */
+ int segs; /* number of segments in offsets/sizes; 0 when none were handed out */
+ int parallel_sms; /* not referenced by the read.sm code shown here */
+};
+
+struct PINT_server_flow_write_op /* write-side counterpart; same fields, same order as PINT_server_flow_read_op */
+{
+ struct fp_queue_item *q_item; /* queue item this frame works on; io.sm points it at an entry of the flow's prealloc_array */
+ PINT_segpool_handle_t seg_handle; /* segment pool handle obtained from PINT_segpool_init() */
+ PINT_segpool_unit_id id; /* unit id obtained from PINT_segpool_register() */
+ PVFS_offset *offsets; /* presumably the current segment offsets, as on the read side - TODO confirm against write.sm */
+ PINT_Request_state *file_req_state; /* use not visible in this commit mail */
+ PVFS_size *sizes; /* presumably the current segment sizes, as on the read side - TODO confirm against write.sm */
+ int segs; /* presumably the current segment count - TODO confirm against write.sm */
+ int parallel_sms; /* use not visible in this commit mail */
+};
+
struct PINT_server_small_io_op
{
PVFS_offset offsets[IO_MAX_REGIONS];
@@ -470,6 +497,8 @@ typedef struct PINT_server_op
struct PINT_server_rmdirent_op rmdirent;
struct PINT_server_io_op io;
struct PINT_server_small_io_op small_io;
+ struct PINT_server_flow_read_op flow_read; /* for read sm */
+ struct PINT_server_flow_write_op flow_write; /* for write sm */
struct PINT_server_flush_op flush;
struct PINT_server_truncate_op truncate;
struct PINT_server_mkdir_op mkdir;
More information about the Pvfs2-cvs
mailing list