[Pvfs2-cvs] commit by sson in pvfs2/src/server: io.sm
pvfs2-server.h read.sm rw-sm.h write.sm
CVS commit program
cvs at parl.clemson.edu
Mon Apr 20 17:10:50 EDT 2009
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv30835/src/server
Modified Files:
Tag: as-branch
io.sm pvfs2-server.h read.sm rw-sm.h write.sm
Log Message:
Cleaned the data structures related to flow_d.
Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.3 -r1.73.6.4
--- io.sm 18 Apr 2009 21:59:24 -0000 1.73.6.3
+++ io.sm 20 Apr 2009 21:10:50 -0000 1.73.6.4
@@ -18,8 +18,8 @@
#include "pint-distribution.h"
#include "pint-request.h"
#include "pvfs2-internal.h"
-#include "rw-sm.h"
#include "pint-segpool.h"
+#include "rw-sm.h"
extern struct PINT_state_machine_s pvfs2_read_sm;
extern struct PINT_state_machine_s pvfs2_write_sm;
@@ -52,7 +52,7 @@ machine pvfs2_io_sm
state start_pipelining
{
- pjmp start_pipeline_sm
+ pjmp start_pipelining_sm
{
DO_READ => pvfs2_read_sm;
DO_WRITE => pvfs2_write_sm;
@@ -137,7 +137,7 @@ static int io_send_ack(
}
/*
- * Function: start_pipeline_sm()
+ * Function: start_pipelining_sm()
*
* Params: server_op *s_op,
* job_status_s* js_p
@@ -150,106 +150,94 @@ static int io_send_ack(
* Returns: int
*
* Synopsis: this is the most important part of the state machine.
- * we setup the flow descriptor and post it in order to
- * carry out the data transfer
+ * we setup the multiple read/write state machines
+ * to carry out the data transfer
*
*/
-static PINT_sm_action start_pipeline_sm(
+static PINT_sm_action start_pipelining_sm(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct server_configuration_s *user_opts = get_server_config_struct();
struct filesystem_configuration_s *fs_conf;
- struct PINT_server_op *flow_read_op;
- struct PINT_server_op *flow_write_op;
+ struct PINT_server_op *rwsm_op;
int i, ret;
- struct fp_private_data *flow_data = NULL;
PINT_segpool_handle_t seg_handle;
- s_op->u.io.flow_d = PINT_flow_alloc();
-
- if (!s_op->u.io.flow_d)
- {
- js_p->error_code = -PVFS_ENOMEM;
- return SM_ACTION_COMPLETE;
- }
-
- s_op->u.io.flow_d->hints = s_op->req->hints;
+ s_op->u.io.hints = s_op->req->hints;
+ s_op->u.io.total_transferred = 0;
/* we still have the file size stored in the response structure
* that we sent in the previous state, other details come from
* request
*/
- s_op->u.io.flow_d->file_data.fsize = s_op->resp.u.io.bstream_size;
- s_op->u.io.flow_d->file_data.dist = s_op->req->u.io.io_dist;
- s_op->u.io.flow_d->file_data.server_nr = s_op->req->u.io.server_nr;
- s_op->u.io.flow_d->file_data.server_ct = s_op->req->u.io.server_ct;
+ s_op->u.io.file_data.fsize = s_op->resp.u.io.bstream_size;
+ s_op->u.io.file_data.dist = s_op->req->u.io.io_dist;
+ s_op->u.io.file_data.server_nr = s_op->req->u.io.server_nr;
+ s_op->u.io.file_data.server_ct = s_op->req->u.io.server_ct;
/* on writes, we allow the bstream to be extended at EOF */
if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
{
gossip_debug(GOSSIP_IO_DEBUG, "%s: issuing pipelining to "
"write data.\n", __func__);
- s_op->u.io.flow_d->file_data.extend_flag = 1;
+ s_op->u.io.file_data.extend_flag = 1;
}
else
{
gossip_debug(GOSSIP_IO_DEBUG, "%s: issuing pipelining to "
"read data.\n", __func__);
- s_op->u.io.flow_d->file_data.extend_flag = 0;
+ s_op->u.io.file_data.extend_flag = 0;
}
- s_op->u.io.flow_d->file_req = s_op->req->u.io.file_req;
- s_op->u.io.flow_d->file_req_offset = s_op->req->u.io.file_req_offset;
- s_op->u.io.flow_d->mem_req = NULL;
- s_op->u.io.flow_d->aggregate_size = s_op->req->u.io.aggregate_size;
- s_op->u.io.flow_d->tag = s_op->tag;
- s_op->u.io.flow_d->user_ptr = NULL;
- s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
- s_op->u.io.flow_d->op = s_op->req->u.io.op; /* AS: operation */
- s_op->u.io.flow_d->datatype = s_op->req->u.io.datatype; /* AS: dtype */
- s_op->u.io.flow_d->dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */
+ s_op->u.io.file_req = s_op->req->u.io.file_req;
+ s_op->u.io.file_req_offset = s_op->req->u.io.file_req_offset;
+ s_op->u.io.mem_req = NULL;
+ s_op->u.io.aggregate_size = s_op->req->u.io.aggregate_size;
+ s_op->u.io.tag = s_op->tag;
+ s_op->u.io.user_ptr = NULL;
+ s_op->u.io.op = s_op->req->u.io.op; /* AS: operation */
+ s_op->u.io.datatype = s_op->req->u.io.datatype; /* AS: dtype */
+ s_op->u.io.dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */
gossip_debug(GOSSIP_IO_DEBUG, "server_ct=%d\n", s_op->req->u.io.server_ct);
for(i=0; i<s_op->req->u.io.server_ct; i++)
- gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%lu\n", i, s_op->u.io.flow_d->dfile_array[i]);
+ gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%lu\n", i,
+ s_op->u.io.dfile_array[i]);
fs_conf = PINT_config_find_fs_id(user_opts, s_op->req->u.io.fs_id);
if(fs_conf)
{
/* pick up any buffer settings overrides from fs conf */
- s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
- s_op->u.io.flow_d->buffers_per_flow = fs_conf->fp_buffers_per_flow;
+ s_op->u.io.buffer_size = fs_conf->fp_buffer_size;
+ s_op->u.io.num_of_buffers = fs_conf->fp_buffers_per_flow;
}
- gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, "
- "server_nr: %d, server_ct: %d\n",
- lld(s_op->u.io.flow_d->file_data.fsize),
- (int)s_op->u.io.flow_d->file_data.server_nr,
- (int)s_op->u.io.flow_d->file_data.server_ct);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: fsize: %lld, "
+ "server_nr: %d, server_ct: %d\n", __func__,
+ lld(s_op->u.io.file_data.fsize),
+ (int)s_op->u.io.file_data.server_nr,
+ (int)s_op->u.io.file_data.server_ct);
gossip_debug(GOSSIP_IO_DEBUG, " file_req_offset: %lld, "
"aggregate_size: %lld, handle: %llu\n",
- lld(s_op->u.io.flow_d->file_req_offset),
- lld(s_op->u.io.flow_d->aggregate_size),
+ lld(s_op->u.io.file_req_offset),
+ lld(s_op->u.io.aggregate_size),
llu(s_op->req->u.io.handle));
+ /* FIXME: loop bodies are same, need to remove if_else??? */
/* set endpoints depending on type of io requested */
if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
{
- s_op->u.io.flow_d->src.endpoint_id = BMI_ENDPOINT;
- s_op->u.io.flow_d->src.u.bmi.address = s_op->addr;
- s_op->u.io.flow_d->dest.endpoint_id = TROVE_ENDPOINT;
- s_op->u.io.flow_d->dest.u.trove.handle = s_op->req->u.io.handle;
- s_op->u.io.flow_d->dest.u.trove.coll_id = s_op->req->u.io.fs_id;
+ s_op->u.io.address = s_op->addr;
+ s_op->u.io.handle = s_op->req->u.io.handle;
+ s_op->u.io.coll_id = s_op->req->u.io.fs_id;
}
else if (s_op->req->u.io.io_type == PVFS_IO_READ)
{
- s_op->u.io.flow_d->src.endpoint_id = TROVE_ENDPOINT;
- s_op->u.io.flow_d->src.u.trove.handle = s_op->req->u.io.handle;
- s_op->u.io.flow_d->src.u.trove.coll_id = s_op->req->u.io.fs_id;
- s_op->u.io.flow_d->dest.endpoint_id = BMI_ENDPOINT;
- s_op->u.io.flow_d->dest.u.bmi.address = s_op->addr;
+ s_op->u.io.handle = s_op->req->u.io.handle;
+ s_op->u.io.coll_id = s_op->req->u.io.fs_id;
+ s_op->u.io.address = s_op->addr;
}
else
{
@@ -258,120 +246,56 @@ static PINT_sm_action start_pipeline_sm(
return SM_ACTION_COMPLETE;
}
- flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
- if(!flow_data) {
- js_p->error_code = -PVFS_ENOMEM;
- return SM_ACTION_COMPLETE;
- }
-
- memset(flow_data, 0, sizeof(struct fp_private_data));
-
- s_op->u.io.flow_d->flow_protocol_data = flow_data;
- s_op->u.io.flow_d->state = FLOW_TRANSMITTING;
- flow_data->parent = s_op->u.io.flow_d;
-
/* setup the request processing states */
gossip_debug(GOSSIP_IO_DEBUG, "%s: aggregate_size=%ld\n", __func__,
- s_op->u.io.flow_d->aggregate_size);
+ s_op->u.io.aggregate_size);
gossip_debug(GOSSIP_IO_DEBUG, "%s: file_data.fsize=%ld\n", __func__,
- s_op->u.io.flow_d->file_data.fsize);
+ s_op->u.io.file_data.fsize);
- if(s_op->u.io.flow_d->buffer_size < 1)
- s_op->u.io.flow_d->buffer_size = BUFFER_SIZE;
- if(s_op->u.io.flow_d->buffers_per_flow < 1)
- s_op->u.io.flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
-
- flow_data->prealloc_array = (struct fp_queue_item*)
- malloc(s_op->u.io.flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
- if(!flow_data->prealloc_array)
- {
- free(flow_data);
- js_p->error_code = (-PVFS_ENOMEM);
- return SM_ACTION_COMPLETE;
- }
-
- memset(flow_data->prealloc_array, 0,
- s_op->u.io.flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
- for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++)
- {
- flow_data->prealloc_array[i].parent = s_op->u.io.flow_d;
- }
-
- /* remaining setup depends on the endpoints we intend to use */
- if(s_op->req->u.io.io_type == PVFS_IO_READ) {
- PINT_segpool_unit_id id;
-
- PINT_dist_lookup(s_op->u.io.flow_d->file_data.dist);
- ret = PINT_segpool_init(s_op->u.io.flow_d->mem_req,
- s_op->u.io.flow_d->file_req,
- s_op->u.io.flow_d->file_data.fsize,
- s_op->u.io.flow_d->file_req_offset,
- s_op->u.io.flow_d->aggregate_size,
- s_op->u.io.flow_d->file_data.server_nr,
- s_op->u.io.flow_d->file_data.server_ct,
- s_op->u.io.flow_d->file_data.dist, /* dist */
- PINT_SP_SERVER_READ,
- &seg_handle);
-
- s_op->u.io.seg_handle = seg_handle;
- for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
- PINT_segpool_register(seg_handle, &id);
- flow_data->prealloc_array[i].result_chain.q_item =
- &flow_data->prealloc_array[i];
- flow_read_op = malloc(sizeof(*flow_read_op));
- memset(flow_read_op, 0, sizeof(*flow_read_op));
- flow_read_op->u.flow_read.q_item = &flow_data->prealloc_array[i];
- flow_read_op->u.flow_read.seg_handle = seg_handle;
- flow_read_op->u.flow_read.id = id;
- ret = PINT_sm_push_frame(smcb, DO_READ, flow_read_op);
- if(ret < 0) {
- js_p->error_code = -PVFS_ENOMEM;
- return SM_ACTION_COMPLETE;
- }
+ if(s_op->u.io.buffer_size < 1)
+ s_op->u.io.buffer_size = BUFFER_SIZE;
+ if(s_op->u.io.num_of_buffers < 1)
+ s_op->u.io.num_of_buffers = BUFFERS_PER_PIPELINING;
+
+ PINT_segpool_unit_id id;
+
+ PINT_dist_lookup(s_op->u.io.file_data.dist);
+ ret = PINT_segpool_init(s_op->u.io.mem_req,
+ s_op->u.io.file_req,
+ s_op->u.io.file_data.fsize,
+ s_op->u.io.file_req_offset,
+ s_op->u.io.aggregate_size,
+ s_op->u.io.file_data.server_nr,
+ s_op->u.io.file_data.server_ct,
+ s_op->u.io.file_data.dist, /* dist */
+ (s_op->req->u.io.io_type == PVFS_IO_READ ?
+ PINT_SP_SERVER_READ: PINT_SP_SERVER_WRITE),
+ &seg_handle);
+
+ s_op->u.io.seg_handle = seg_handle;
+ for(i=0; i<s_op->u.io.num_of_buffers; i++) {
+ PINT_segpool_register(seg_handle, &id);
+ rwsm_op = malloc(sizeof(*rwsm_op));
+ memset(rwsm_op, 0, sizeof(*rwsm_op));
+ rwsm_op->u.rwsm.seg_handle = seg_handle;
+ rwsm_op->u.rwsm.id = id;
+ rwsm_op->u.rwsm.parent = s_op;
+ if(s_op->req->u.io.io_type == PVFS_IO_READ) {
+ ret = PINT_sm_push_frame(smcb, DO_READ, rwsm_op);
s_op->u.io.parallel_read_sms++;
gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_read_sms=%d\n",
__func__, s_op->u.io.parallel_read_sms);
}
- }
- else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
- gossip_debug(GOSSIP_IO_DEBUG, "%s: IO_WRITE\n", __func__);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: file_size=%ld\n", __func__,
- s_op->u.io.flow_d->aggregate_size);
- PINT_segpool_unit_id id;
-
- PINT_dist_lookup(s_op->u.io.flow_d->file_data.dist);
- ret = PINT_segpool_init(s_op->u.io.flow_d->mem_req,
- s_op->u.io.flow_d->file_req,
- s_op->u.io.flow_d->file_data.fsize,
- s_op->u.io.flow_d->file_req_offset, /* sson */
- s_op->u.io.flow_d->aggregate_size,
- s_op->u.io.flow_d->file_data.server_nr,
- s_op->u.io.flow_d->file_data.server_ct,
- s_op->u.io.flow_d->file_data.dist, /* dist */
- PINT_SP_SERVER_WRITE,
- &seg_handle);
-
- s_op->u.io.seg_handle = seg_handle;
- /* FIXME: we will eventually want to post multiple of recvs */
- for(i=0; i<s_op->u.io.flow_d->buffers_per_flow; i++) {
- PINT_segpool_register(seg_handle, &id);
- flow_data->prealloc_array[i].result_chain.q_item =
- &flow_data->prealloc_array[i];
-
- flow_write_op = malloc(sizeof(*flow_write_op));
- memset(flow_write_op, 0, sizeof(*flow_write_op));
- flow_write_op->u.flow_write.q_item = &flow_data->prealloc_array[i];
- flow_write_op->u.flow_write.seg_handle = seg_handle;
- flow_write_op->u.flow_write.id = id;
- ret = PINT_sm_push_frame(smcb, DO_WRITE, flow_write_op);
- if(ret < 0) {
- js_p->error_code = -PVFS_ENOMEM;
- return SM_ACTION_COMPLETE;
- }
+ else if(s_op->req->u.io.io_type == PVFS_IO_WRITE) {
+ ret = PINT_sm_push_frame(smcb, DO_WRITE, rwsm_op);
s_op->u.io.parallel_write_sms++;
gossip_debug(GOSSIP_IO_DEBUG, "%s: parallel_write_sms=%d\n",
__func__, s_op->u.io.parallel_write_sms);
}
+ if(ret < 0) {
+ js_p->error_code = -PVFS_ENOMEM;
+ return SM_ACTION_COMPLETE;
+ }
}
js_p->error_code = 0;
@@ -433,11 +357,6 @@ static PINT_sm_action io_cleanup(
PVFS_strerror_r(s_op->resp.status, status_string, 64);
PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "finish (%s)\n", status_string);
- if (s_op->u.io.flow_d)
- {
- PINT_flow_free(s_op->u.io.flow_d);
- }
-
if (s_op->u.io.seg_handle) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: freeing seg_handle\n", __func__);
PINT_segpool_destroy(s_op->u.io.seg_handle);
@@ -460,7 +379,7 @@ static PINT_sm_action io_cleanup(
* Params: server_op *s_op,
* job_status_s* js_p
*
- * Pre: flow is completed so that we can report its status
+ * Pre: IO is completed so that we can report its status
*
* Post: if this is a write, response has been sent to client
* if this is a read, do nothing
@@ -523,34 +442,34 @@ static PINT_sm_action io_send_completion
s_op->resp.status = js_p->error_code;
if(s_op->req->u.io.io_type == PVFS_IO_READ) { /* AS: read with op */
s_op->resp.u.read_completion.total_completed =
- s_op->u.io.flow_d->total_transferred;
- gossip_debug(GOSSIP_IO_DEBUG, "total_transferred=%ld\n", s_op->u.io.flow_d->total_transferred); /* AS */
+ s_op->u.io.total_transferred;
+ gossip_debug(GOSSIP_IO_DEBUG, "total_transferred=%ld\n", s_op->u.io.total_transferred); /* AS */
switch(s_op->req->u.io.datatype) {
case (int)0x4c000405: /* MPI_INT */
{
int *tmp;
- tmp = s_op->u.io.flow_d->tmp_buffer;
+ tmp = s_op->u.io.tmp_buffer;
gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), result (INT) to send=%d, s_op->resp.u.read_completion.total_completed=%lu\n", *tmp, s_op->resp.u.read_completion.total_completed);
s_op->resp.u.read_completion.result.buffer_sz = sizeof(int);
- s_op->resp.u.read_completion.result.buffer = s_op->u.io.flow_d->tmp_buffer;
+ s_op->resp.u.read_completion.result.buffer = s_op->u.io.tmp_buffer;
break;
}
case (int)0x4c00080b:
{
double *tmp;
- tmp = s_op->u.io.flow_d->tmp_buffer;
+ tmp = s_op->u.io.tmp_buffer;
gossip_debug(GOSSIP_IO_DEBUG, "io_send_completion_ack(), result (DOUBLE) to send=%lf, s_op->resp.u.read_completion.total_completed=%lu\n", *tmp, s_op->resp.u.read_completion.total_completed);
s_op->resp.u.read_completion.result.buffer_sz = sizeof(double);
- s_op->resp.u.read_completion.result.buffer = s_op->u.io.flow_d->tmp_buffer;
+ s_op->resp.u.read_completion.result.buffer = s_op->u.io.tmp_buffer;
break;
}
}
}
else /* AS */
s_op->resp.u.write_completion.total_completed =
- s_op->u.io.flow_d->total_transferred; /* AS */
+ s_op->u.io.total_transferred; /* AS */
- gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__, s_op->u.io.flow_d->total_transferred);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__, s_op->u.io.total_transferred);
err = PINT_encode(
&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
s_op->addr, s_op->decoded.enc_type);
Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.2 -r1.155.6.3
--- pvfs2-server.h 18 Apr 2009 21:59:24 -0000 1.155.6.2
+++ pvfs2-server.h 20 Apr 2009 21:10:50 -0000 1.155.6.3
@@ -333,43 +333,53 @@ struct PINT_server_getconfig_op
struct PINT_server_io_op
{
- flow_descriptor* flow_d;
- //io_pipeline* iop; /* substitute for flow_d */
+ //void *parent;
+ PVFS_fs_id coll_id;
+ PVFS_handle handle;
+ PVFS_BMI_addr_t address;
+
+ PINT_Request *file_req;
+ PVFS_offset file_req_offset;
+ PINT_Request *mem_req;
+ PVFS_msg_tag_t tag;
+ void *user_ptr;
+
+ int op;
+ int datatype;
+ void *tmp_buffer;
+ PVFS_handle *dfile_array;
+
+ PVFS_size aggregate_size;
+
+ PINT_request_file_data file_data;
+
+ int buffer_size;
+ int num_of_buffers;
+
+ PVFS_size total_transferred;
+
int parallel_write_sms;
int parallel_read_sms;
PINT_segpool_handle_t seg_handle;
+
+ PVFS_hint hints;
};
-/* This is for flow state machine */
-/* not sure if this is the right place */
-struct PINT_server_flow_read_op
-{
- struct fp_queue_item *q_item;
- void *buffer; /* substitute for q_item */
- PVFS_size buffer_used; /* substitute for q_item */
- PINT_segpool_handle_t seg_handle;
- PINT_segpool_unit_id id;
- PVFS_offset *offsets;
- PINT_Request_state *file_req_state;
- PVFS_size *sizes;
- int segs;
- int parallel_sms;
-};
-
-struct PINT_server_flow_write_op
+/* substibute for flow */
+struct PINT_server_rwsm_op
{
- struct fp_queue_item *q_item;
- void *buffer; /* substitute for q_item */
- PVFS_size buffer_used; /* substitute for q_item */
+ void *parent;
+ void *buffer;
+ PVFS_size buffer_used;
+ PVFS_size out_size;
PINT_segpool_handle_t seg_handle;
PINT_segpool_unit_id id;
PVFS_offset *offsets;
PINT_Request_state *file_req_state;
PVFS_size *sizes;
int segs;
- int parallel_sms;
};
-
+
struct PINT_server_small_io_op
{
PVFS_offset offsets[IO_MAX_REGIONS];
@@ -505,8 +515,7 @@ typedef struct PINT_server_op
struct PINT_server_rmdirent_op rmdirent;
struct PINT_server_io_op io;
struct PINT_server_small_io_op small_io;
- struct PINT_server_flow_read_op flow_read; /* for read sm */
- struct PINT_server_flow_write_op flow_write; /* for write sm */
+ struct PINT_server_rwsm_op rwsm;
struct PINT_server_flush_op flush;
struct PINT_server_truncate_op truncate;
struct PINT_server_mkdir_op mkdir;
Index: read.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/read.sm,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- read.sm 18 Apr 2009 21:59:24 -0000 1.1.2.2
+++ read.sm 20 Apr 2009 21:10:50 -0000 1.1.2.3
@@ -21,7 +21,7 @@
#include "rw-sm.h"
#include "trove.h"
-static void do_comp(struct fp_queue_item *);
+static void do_comp(struct PINT_server_op *);
%%
@@ -61,9 +61,8 @@ nested machine pvfs2_read_sm
static int trove_read_call(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
- PINT_segpool_handle_t seg_handle = s_op->u.flow_read.seg_handle;
- PINT_segpool_unit_id id = s_op->u.flow_read.id;
+ PINT_segpool_handle_t seg_handle = s_op->u.rwsm.seg_handle;
+ PINT_segpool_unit_id id = s_op->u.rwsm.id;
int ret;
struct filesystem_configuration_s *fs_config;
struct server_configuration_s *server_config;
@@ -72,29 +71,30 @@ static int trove_read_call(struct PINT_s
PVFS_size *sizes;
PVFS_size bytes;
job_id_t tmp_id;
+ struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- q_item->buffer_used = 0;
+ s_op->u.rwsm.buffer_used = 0;
- if(!q_item->buffer) {
- gossip_debug(GOSSIP_IO_DEBUG, "%s: !q_item->buffer\n", __func__);
- /* if the q_item has not been used, allocate a buffer */
- q_item->buffer = BMI_memalloc(
- q_item->parent->dest.u.bmi.address,
- q_item->parent->buffer_size, BMI_SEND);
+ if(!s_op->u.rwsm.buffer) {
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: !buffer\n", __func__);
+ /* if the buffer has not been used, allocate a buffer */
+ s_op->u.rwsm.buffer = BMI_memalloc(
+ parent_s_op->u.io.address,
+ parent_s_op->u.io.buffer_size, BMI_SEND);
/* TODO: error handling */
- assert(q_item->buffer);
+ assert(s_op->u.rwsm.buffer);
}
- bytes = q_item->parent->buffer_size;
+ bytes = parent_s_op->u.io.buffer_size;
PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
&offsets, &sizes);
gossip_debug(GOSSIP_IO_DEBUG, "%s: bytes=%ld, count=%d\n", __func__,
bytes, count);
- q_item->buffer_used = bytes;
- s_op->u.flow_read.offsets = offsets;
- s_op->u.flow_read.sizes = sizes;
- s_op->u.flow_read.segs = count;
+ s_op->u.rwsm.buffer_used = bytes;
+ s_op->u.rwsm.offsets = offsets;
+ s_op->u.rwsm.sizes = sizes;
+ s_op->u.rwsm.segs = count;
if(count == 0) {
js_p->error_code = 0;
@@ -105,15 +105,15 @@ static int trove_read_call(struct PINT_s
fs_config = PINT_config_get_filesystems(&server_config);
ret = job_trove_bstream_read_list(
- q_item->parent->src.u.trove.coll_id,
- q_item->parent->src.u.trove.handle,
- &q_item->buffer,
- (PVFS_size *)&q_item->buffer_used,
+ parent_s_op->u.io.coll_id,
+ parent_s_op->u.io.handle,
+ &s_op->u.rwsm.buffer,
+ (PVFS_size *)&s_op->u.rwsm.buffer_used,
1,
offsets,
sizes,
count,
- &q_item->out_size,
+ &s_op->u.rwsm.out_size,
fs_config->trove_sync_data,
NULL,
smcb,
@@ -121,7 +121,7 @@ static int trove_read_call(struct PINT_s
js_p,
&tmp_id,
server_job_context,
- q_item->parent->hints);
+ parent_s_op->u.io.hints);
gossip_debug(GOSSIP_IO_DEBUG, "%s: ret=%d\n", __func__, ret);
@@ -149,40 +149,38 @@ static int trove_read_call(struct PINT_s
static int bmi_send_call(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
- struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
int ret;
job_id_t tmp_id;
struct server_configuration_s *user_opts = get_server_config_struct();
+ struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- if(s_op->u.flow_read.segs == 0 && q_item->buffer_used == 0) {
+ if(s_op->u.rwsm.segs == 0 && s_op->u.rwsm.buffer_used == 0) {
js_p->error_code = 0;
gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
return SM_ACTION_COMPLETE;
}
- gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%ld\n", __func__,
- q_item->buffer_used);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
- (char *)q_item->buffer);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__,
+ s_op->u.rwsm.buffer_used);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
+ (char *)s_op->u.rwsm.buffer);
/***************************************************/
- do_comp(q_item);
+ do_comp(s_op);
/***************************************************/
- /* while we hold dest lock, look for next seq no. to send */
//do{
- assert(q_item->buffer_used);
- if(flow_data->parent->op != 0) { /* AS: when op is specified */
+ assert(s_op->u.rwsm.buffer_used);
+ if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */
//ret = 1; /* AS: skip sending if op is specified */
gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
else
- ret = job_bmi_send(q_item->parent->dest.u.bmi.address,
- q_item->buffer,
- q_item->buffer_used,
- q_item->parent->tag,
+ ret = job_bmi_send(parent_s_op->u.io.address,
+ s_op->u.rwsm.buffer,
+ s_op->u.rwsm.buffer_used,
+ parent_s_op->u.io.tag,
BMI_PRE_ALLOC,
0, /* send_unexpected */
smcb, /* user_ptr */
@@ -191,14 +189,14 @@ static int bmi_send_call(struct PINT_smc
&tmp_id,
server_job_context,
user_opts->server_job_bmi_timeout,
- (bmi_hint)q_item->parent->hints);
-
+ (bmi_hint)parent_s_op->u.io.hints);
+#if 0
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"%s: (post send time) ini posts: %d, pending: %d, last: %d\n",
__func__,
flow_data->initial_posts, flow_data->dest_pending,
flow_data->dest_last_posted);
-
+#endif
if(ret < 0)
{
gossip_err("%s: I/O error occurred\n", __func__);
@@ -226,21 +224,21 @@ static int bmi_send_call(struct PINT_smc
static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
- PINT_segpool_handle_t h = s_op->u.flow_read.seg_handle;
+ PINT_segpool_handle_t h = s_op->u.rwsm.seg_handle;
js_p->error_code = 0;
+ struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- q_item->parent->total_transferred += q_item->buffer_used;
+ parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;
gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
- q_item->parent->total_transferred);
+ parent_s_op->u.io.total_transferred);
gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
- s_op->u.flow_read.segs);
+ s_op->u.rwsm.segs);
/* FIXME: */
/* unless the second condition is set, the server starts
a new read request to one already done, and falls into
infinite trove_read->bmi_send->check_done loop */
- if(!segpool_done(h) && s_op->u.flow_read.segs != 0) {
+ if(!segpool_done(h) && s_op->u.rwsm.segs != 0) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: read: LOOP\n", __func__);
js_p->error_code = LOOP;
}
@@ -254,47 +252,48 @@ static int check_done(struct PINT_smcb *
static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- struct fp_queue_item *q_item = s_op->u.flow_read.q_item;
js_p->error_code = 0;
+ struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- if(q_item->buffer) {
- BMI_memfree(q_item->parent->src.u.bmi.address,
- q_item->buffer, q_item->parent->buffer_size, BMI_SEND);
+ if(s_op->u.rwsm.buffer) {
+ BMI_memfree(parent_s_op->u.io.address, s_op->u.rwsm.buffer,
+ parent_s_op->u.io.buffer_size, BMI_SEND);
}
+
gossip_debug(GOSSIP_IO_DEBUG, "%s: read: \n", __func__);
return SM_ACTION_COMPLETE;
- //return(server_state_machine_complete(smcb)); // why not this?
}
-static void do_comp(struct fp_queue_item *q_item)
+static void do_comp(struct PINT_server_op *s_op)
{
- struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+ struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- if(q_item->buffer) {
+ if(s_op->u.rwsm.buffer) {
PVFS_size i;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "%s: q_item->buffer_used=%ld, op=0x%x, datatype=0x%x\n",
- __func__, q_item->buffer_used, flow_data->parent->op,
- flow_data->parent->datatype); /* AS */
+ "%s: buffer_used=%ld, op=0x%x, datatype=0x%x\n",
+ __func__, s_op->u.rwsm.buffer_used,
+ parent_s_op->u.io.op,
+ parent_s_op->u.io.datatype); /* AS */
- switch(flow_data->parent->datatype) {
+ switch(parent_s_op->u.io.datatype) {
case ((int)0x4c000405): /* MPI_INT */
{
- int *a = q_item->buffer;
+ int *a = s_op->u.rwsm.buffer;
int result;
- PVFS_size count = (q_item->buffer_used)/((*PVFS_INT).ub);
+ PVFS_size count = (s_op->u.rwsm.buffer_used)/((*PVFS_INT).ub);
int *tmp;
- if (flow_data->parent->total_transferred == 0) {
- if (flow_data->parent->tmp_buffer == NULL)
- flow_data->parent->tmp_buffer = (void *)malloc(1*sizeof(int));
- memset(flow_data->parent->tmp_buffer, 0, sizeof(int));
+ if (parent_s_op->u.io.total_transferred == 0) {
+ if (parent_s_op->u.io.tmp_buffer == NULL)
+ parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(int));
+ memset(parent_s_op->u.io.tmp_buffer, 0, sizeof(int));
}
- tmp = flow_data->parent->tmp_buffer;
- gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%ld, total_bytes_processed=%ld\n", flow_data->parent->total_transferred, flow_data->total_bytes_processed);
+ tmp = parent_s_op->u.io.tmp_buffer;
+ gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "total_transferred=%ld\n", parent_s_op->u.io.total_transferred);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%ld, tmp=%d\n", count, *tmp);
- switch(flow_data->parent->op) {
+ switch(parent_s_op->u.io.op) {
case 0x58000001: /* MAX */
result = *a;
for (i=1; i<count; i++ ) {
@@ -303,7 +302,7 @@ static void do_comp(struct fp_queue_item
}
}
a[0] = result;
- if (flow_data->parent->total_transferred == 0 ||
+ if (parent_s_op->u.io.total_transferred == 0 ||
result > *tmp)
*tmp = result;
break;
@@ -315,7 +314,7 @@ static void do_comp(struct fp_queue_item
}
}
a[0] = result;
- if (flow_data->parent->total_transferred == 0 ||
+ if (parent_s_op->u.io.total_transferred == 0 ||
result < *tmp)
*tmp = result;
break;
@@ -336,31 +335,30 @@ static void do_comp(struct fp_queue_item
default:
break;
}
- q_item->buffer = (void *)a;
- flow_data->parent->tmp_buffer = (void *)tmp;
+ s_op->u.rwsm.buffer = (void *)a;
+ parent_s_op->u.io.tmp_buffer = (void *)tmp;
}
break;
case ((int)0x4c00080b): /* MPI_DOUBLE */
{
- double *a = q_item->buffer;
+ double *a = s_op->u.rwsm.buffer;
double result;
- PVFS_size count = (q_item->buffer_used)/((*PVFS_DOUBLE).ub);
+ PVFS_size count = (s_op->u.rwsm.buffer_used)/((*PVFS_DOUBLE).ub);
double *tmp;
- if (flow_data->parent->total_transferred == 0) {
- if (flow_data->parent->tmp_buffer == NULL)
- flow_data->parent->tmp_buffer = (void *)malloc(1*sizeof(double));
- memset(flow_data->parent->tmp_buffer, 0, sizeof(double));
+ if (parent_s_op->u.io.total_transferred == 0) {
+ if (parent_s_op->u.io.tmp_buffer == NULL)
+ parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(double));
+ memset(parent_s_op->u.io.tmp_buffer, 0, sizeof(double));
}
- tmp = flow_data->parent->tmp_buffer;
+ tmp = parent_s_op->u.io.tmp_buffer;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "total_transferred=%ld, total_bytes_processed=%ld\n",
- flow_data->parent->total_transferred,
- flow_data->total_bytes_processed);
+ "total_transferred=%ld\n",
+ parent_s_op->u.io.total_transferred);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%ld, tmp=%lf\n",
count, *tmp);
- switch(flow_data->parent->op) {
+ switch(parent_s_op->u.io.op) {
case 0x58000001: /* MAX */
result = *a;
for (i=1; i<count; i++ ) {
@@ -369,7 +367,7 @@ static void do_comp(struct fp_queue_item
}
}
a[0] = result;
- if (flow_data->parent->total_transferred == 0 ||
+ if (parent_s_op->u.io.total_transferred == 0 ||
result > *tmp)
*tmp = result;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "max=%lf\n", *tmp);
@@ -383,7 +381,7 @@ static void do_comp(struct fp_queue_item
}
a[0] = result;
- if (flow_data->parent->total_transferred == 0 ||
+ if (parent_s_op->u.io.total_transferred == 0 ||
result < *tmp)
*tmp = result;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "min=%lf\n",
@@ -404,8 +402,8 @@ static void do_comp(struct fp_queue_item
default:
break;
} /* end inner switch */
- q_item->buffer = (void *)a;
- flow_data->parent->tmp_buffer = (void *)tmp;
+ s_op->u.rwsm.buffer = (void *)a;
+ parent_s_op->u.io.tmp_buffer = (void *)tmp;
}
break;
Index: rw-sm.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/rw-sm.h,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- rw-sm.h 18 Apr 2009 21:59:24 -0000 1.1.2.2
+++ rw-sm.h 20 Apr 2009 21:10:50 -0000 1.1.2.3
@@ -1,81 +1,12 @@
-#include "src/io/flow/flowproto-support.h"
-#include "thread-mgr.h"
+#ifndef __RW_SM_H
+#define __RW_SM_H
-/* the following buffer settings are used by default if none are specified in
- * the flow descriptor
+/* the following buffer settings are used by default
*/
-#define BUFFERS_PER_FLOW 8
+#define BUFFERS_PER_PIPELINING 8
#define BUFFER_SIZE (256*1024)
-#define MAX_REGIONS 64
-
-#define FLOW_CLEANUP(__flow_data) \
-do { \
- struct flow_descriptor *__flow_d = (__flow_data)->parent; \
- gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "flowproto completing %p\n",\
- __flow_d); \
- cleanup_buffers(__flow_data); \
- __flow_d = (__flow_data)->parent; \
- free(__flow_data); \
-} while(0)
-
-
-struct result_chain_entry
-{
- PVFS_id_gen_t posted_id;
- char *buffer_offset;
- PINT_Request_result result;
- PVFS_size size_list[MAX_REGIONS];
- PVFS_offset offset_list[MAX_REGIONS];
- struct result_chain_entry *next;
- struct fp_queue_item *q_item;
- struct PINT_thread_mgr_trove_callback trove_callback;
-};
-
-/* fp_queue_item describes an individual buffer being used within the flow */
-struct fp_queue_item
-{
- PVFS_id_gen_t posted_id;
- int last;
- int seq;
- void *buffer;
- PVFS_size buffer_used;
- PVFS_size out_size;
- struct result_chain_entry result_chain;
- int result_chain_count;
- //struct qlist_head list_link;
- flow_descriptor *parent;
- struct PINT_thread_mgr_bmi_callback bmi_callback;
-};
-
-/* fp_private_data is information specific to this flow protocol, stored
- * in flow descriptor but hidden from caller
- */
-struct fp_private_data
-{
- flow_descriptor *parent;
- struct fp_queue_item* prealloc_array;
- struct qlist_head list_link;
- PVFS_size total_bytes_processed;
- int next_seq;
- int next_seq_to_send;
- int dest_pending;
- int dest_last_posted;
- int initial_posts;
- void *tmp_buffer_list[MAX_REGIONS];
- void *intermediate;
- int cleanup_pending_count;
- int req_proc_done;
-
- //struct qlist_head src_list;
- //struct qlist_head dest_list;
- //struct qlist_head empty_list;
-};
-#define PRIVATE_FLOW(target_flow)\
- ((struct fp_private_data*)(target_flow->flow_protocol_data))
-
-static void cleanup_buffers(
- struct fp_private_data *flow_data);
+enum {DO_READ=3, DO_WRITE, LOOP};
#if 0
@@ -192,86 +123,5 @@ static void handle_io_error(
#endif
-/* cleanup_buffers()
- *
- * releases any resources consumed during flow processing
- *
- * no return value
- */
-static void cleanup_buffers(struct fp_private_data *flow_data)
-{
- int i;
- struct result_chain_entry *result_tmp;
- struct result_chain_entry *old_result_tmp;
-
- gossip_debug(GOSSIP_IO_DEBUG, "%s: \n", __func__);
- if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
- {
- for(i=0; i<flow_data->parent->buffers_per_flow; i++)
- {
- if(flow_data->prealloc_array[i].buffer)
- {
- BMI_memfree(flow_data->parent->src.u.bmi.address,
- flow_data->prealloc_array[i].buffer,
- flow_data->parent->buffer_size,
- BMI_RECV);
- }
- result_tmp = &(flow_data->prealloc_array[i].result_chain);
- do{
- old_result_tmp = result_tmp;
- result_tmp = result_tmp->next;
- if(old_result_tmp !=
- &(flow_data->prealloc_array[i].result_chain))
- free(old_result_tmp);
- }while(result_tmp);
- flow_data->prealloc_array[i].result_chain.next = NULL;
- }
- }
- else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
- {
- for(i=0; i<flow_data->parent->buffers_per_flow; i++)
- {
- if(flow_data->prealloc_array[i].buffer)
- {
- BMI_memfree(flow_data->parent->dest.u.bmi.address,
- flow_data->prealloc_array[i].buffer,
- flow_data->parent->buffer_size,
- BMI_SEND);
- }
- result_tmp = &(flow_data->prealloc_array[i].result_chain);
- do{
- old_result_tmp = result_tmp;
- result_tmp = result_tmp->next;
- if(old_result_tmp !=
- &(flow_data->prealloc_array[i].result_chain))
- free(old_result_tmp);
- }while(result_tmp);
- flow_data->prealloc_array[i].result_chain.next = NULL;
- }
- }
- else if(flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
- {
- if(flow_data->intermediate)
- {
- BMI_memfree(flow_data->parent->dest.u.bmi.address,
- flow_data->intermediate, flow_data->parent->buffer_size, BMI_SEND);
- }
- }
- else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
- {
- if(flow_data->intermediate)
- {
- BMI_memfree(flow_data->parent->src.u.bmi.address,
- flow_data->intermediate, flow_data->parent->buffer_size, BMI_RECV);
- }
- }
-
- free(flow_data->prealloc_array);
-}
-
-enum {DO_READ=3, DO_WRITE, LOOP};
+#endif /* __RW_SM_H */
Index: write.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/write.sm,v
diff -p -u -r1.1.2.3 -r1.1.2.4
--- write.sm 18 Apr 2009 21:59:24 -0000 1.1.2.3
+++ write.sm 20 Apr 2009 21:10:50 -0000 1.1.2.4
@@ -60,9 +60,8 @@ nested machine pvfs2_write_sm
static PINT_sm_action bmi_recv_call(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
- PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
- PINT_segpool_unit_id id = s_op->u.flow_write.id;
+ PINT_segpool_handle_t seg_handle = s_op->u.rwsm.seg_handle;
+ PINT_segpool_unit_id id = s_op->u.rwsm.id;
int ret;
job_id_t tmp_id;
struct server_configuration_s *user_opts = get_server_config_struct();
@@ -70,27 +69,28 @@ static PINT_sm_action bmi_recv_call(stru
PVFS_offset *offsets;
PVFS_size *sizes;
PVFS_size bytes;
+ struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- q_item->buffer_used = 0;
+ s_op->u.rwsm.buffer_used = 0;
- if(!q_item->buffer){
- /* if the q_item has not been used, allocate a buffer */
- q_item->buffer = BMI_memalloc(
- q_item->parent->src.u.bmi.address,
- q_item->parent->buffer_size, BMI_RECV);
+ if(!s_op->u.rwsm.buffer){
+ /* if the buffer has not been used, allocate a buffer */
+ s_op->u.rwsm.buffer = BMI_memalloc(
+ parent_s_op->u.io.address,
+ parent_s_op->u.io.buffer_size, BMI_RECV);
/* TODO: error handling */
- assert(q_item->buffer);
+ assert(s_op->u.rwsm.buffer);
}
- bytes = q_item->parent->buffer_size;
+ bytes = parent_s_op->u.io.buffer_size;
PINT_segpool_take_segments(seg_handle, id, &bytes, &count,
&offsets, &sizes);
gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%ld\n",
__func__, count, bytes);
- q_item->buffer_used = bytes;
- s_op->u.flow_write.offsets = offsets;
- s_op->u.flow_write.sizes = sizes;
- s_op->u.flow_write.segs = count;
+ s_op->u.rwsm.buffer_used = bytes;
+ s_op->u.rwsm.offsets = offsets;
+ s_op->u.rwsm.sizes = sizes;
+ s_op->u.rwsm.segs = count;
if(count == 0) {
js_p->error_code = 0;
@@ -98,10 +98,10 @@ static PINT_sm_action bmi_recv_call(stru
}
/* TODO: what if we recv less than expected? */
- ret = job_bmi_recv(q_item->parent->src.u.bmi.address,
- (char *)q_item->buffer,
- q_item->parent->buffer_size,
- q_item->parent->tag,
+ ret = job_bmi_recv(parent_s_op->u.io.address,
+ (char *)s_op->u.rwsm.buffer,
+ parent_s_op->u.io.buffer_size,
+ parent_s_op->u.io.tag,
BMI_PRE_ALLOC,
smcb,
0, /* unsigned long status_user_tag = 0 */
@@ -109,7 +109,7 @@ static PINT_sm_action bmi_recv_call(stru
&tmp_id,
server_job_context,
user_opts->server_job_flow_timeout,
- (bmi_hint)q_item->parent->hints);
+ (bmi_hint)parent_s_op->u.io.hints);
if(ret < 0) {
gossip_err("%s: I/O error occurred\n", __func__);
@@ -135,18 +135,18 @@ static PINT_sm_action bmi_recv_call(stru
static PINT_sm_action trove_write_call(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
int ret;
job_id_t tmp_id;
struct filesystem_configuration_s *fs_config;
struct server_configuration_s *server_config;
+ struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%ld\n", __func__,
- q_item->buffer_used);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
- (char *)q_item->buffer);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer_used=%ld\n", __func__,
+ s_op->u.rwsm.buffer_used);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: buffer[0]=%s\n", __func__,
+ (char *)s_op->u.rwsm.buffer);
- if(s_op->u.flow_write.segs == 0 && q_item->buffer_used == 0) {
+ if(s_op->u.rwsm.segs == 0 && s_op->u.rwsm.buffer_used == 0) {
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
@@ -154,15 +154,15 @@ static PINT_sm_action trove_write_call(s
fs_config = PINT_config_get_filesystems(&server_config);
ret = job_trove_bstream_write_list(
- q_item->parent->dest.u.trove.coll_id,
- q_item->parent->dest.u.trove.handle,
- (char**)&q_item->buffer,
- (TROVE_size *)&q_item->buffer_used,
+ parent_s_op->u.io.coll_id,
+ parent_s_op->u.io.handle,
+ (char**)&s_op->u.rwsm.buffer,
+ (TROVE_size *)&s_op->u.rwsm.buffer_used,
1,
- s_op->u.flow_write.offsets,
- s_op->u.flow_write.sizes,
- s_op->u.flow_write.segs,
- &q_item->out_size,
+ s_op->u.rwsm.offsets,
+ s_op->u.rwsm.sizes,
+ s_op->u.rwsm.segs,
+ &s_op->u.rwsm.out_size,
fs_config->trove_sync_data,
NULL,
smcb,
@@ -170,7 +170,7 @@ static PINT_sm_action trove_write_call(s
js_p,
&tmp_id,
server_job_context,
- q_item->parent->hints);
+ parent_s_op->u.io.hints);
if(ret < 0) {
gossip_err("%s: I/O error occurred\n", __func__);
@@ -197,15 +197,14 @@ static PINT_sm_action trove_write_call(s
static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
- PINT_segpool_handle_t h = s_op->u.flow_write.seg_handle;
+ PINT_segpool_handle_t h = s_op->u.rwsm.seg_handle;
js_p->error_code = 0;
+ struct PINT_server_op *parent_s_op = s_op->u.rwsm.parent;
- q_item->parent->total_transferred += q_item->buffer_used;
+ parent_s_op->u.io.total_transferred += s_op->u.rwsm.buffer_used;
gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
- q_item->parent->total_transferred);
+ parent_s_op->u.io.total_transferred);
- //if(!PINT_REQUEST_DONE(q_item->parent->file_req_state)) {
if(!segpool_done(h)) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: write: LOOP\n", __func__);
js_p->error_code = LOOP;
More information about the Pvfs2-cvs
mailing list