[Pvfs2-cvs] commit by sson in pvfs2/src/server: io.sm pipeline.sm
pvfs2-server-req.c pvfs2-server.h small-io.sm
CVS commit program
cvs at parl.clemson.edu
Fri May 22 18:30:06 EDT 2009
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv21440/src/server
Modified Files:
Tag: as-branch
io.sm pipeline.sm pvfs2-server-req.c pvfs2-server.h
small-io.sm
Log Message:
Added the server to server communication module within pipeline.sm
to pull the small portion of data when the data is not aligned on strip boundaries.
Changed the default file stripe size to 256KB (originally 64KB)
so that pipeline unit is equal to it.
Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.73.6.8 -r1.73.6.9
--- io.sm 29 Apr 2009 18:55:03 -0000 1.73.6.8
+++ io.sm 22 May 2009 22:30:06 -0000 1.73.6.9
@@ -21,10 +21,9 @@
#include "pint-segpool.h"
//#include "pipeline.h"
-extern struct PINT_state_machine_s pvfs2_pipeline_sm;
-
-#define NUM_OF_PARALLEL_SMS 4
-#define BUFFER_SIZE (256*1024)
+#define NUM_OF_PARALLEL_SMS 8
+#define BUFFER_SIZE (256*1024) /* 256KB */
+//#define BUFFER_SIZE (64*1024) /* 64KB */
%%
@@ -158,9 +157,9 @@ static PINT_sm_action start_pipelining_s
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct server_configuration_s *user_opts = get_server_config_struct();
- struct filesystem_configuration_s *fs_conf;
+ struct filesystem_configuration_s *fs_config;
struct PINT_server_op *pipeline_op;
- int i, ret;
+ int i, ret, dfile_index;
PINT_segpool_handle_t seg_handle;
s_op->u.io.parallel_sms = 0;
@@ -197,19 +196,26 @@ static PINT_sm_action start_pipelining_s
s_op->u.io.user_ptr = NULL;
s_op->u.io.op = s_op->req->u.io.op; /* AS: operation */
s_op->u.io.datatype = s_op->req->u.io.datatype; /* AS: dtype */
- s_op->u.io.dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */
+ s_op->u.io.dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */
gossip_debug(GOSSIP_IO_DEBUG, "server_ct=%d\n", s_op->req->u.io.server_ct);
- for(i=0; i<s_op->req->u.io.server_ct; i++)
+ gossip_debug(GOSSIP_IO_DEBUG, "fs_id=%d\n", s_op->req->u.io.fs_id);
+ for(i=0; i<s_op->req->u.io.server_ct; i++) {
gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%lu\n", i,
s_op->u.io.dfile_array[i]);
+ if(s_op->u.io.dfile_array[i] == s_op->req->u.io.handle) {
+ dfile_index = i;
+ gossip_debug(GOSSIP_IO_DEBUG, "dfile_index=%d\n", i);
+ }
+ }
- fs_conf = PINT_config_find_fs_id(user_opts, s_op->req->u.io.fs_id);
- if(fs_conf)
+ fs_config = PINT_config_find_fs_id(user_opts, s_op->req->u.io.fs_id);
+ if(fs_config)
{
/* pick up any buffer settings overrides from fs conf */
- s_op->u.io.buffer_size = fs_conf->fp_buffer_size;
- s_op->u.io.num_of_buffers = fs_conf->fp_buffers_per_flow;
+ s_op->u.io.buffer_size = fs_config->fp_buffer_size;
+ //s_op->u.io.num_of_buffers = fs_config->fp_buffers_per_flow;
+ s_op->u.io.num_of_buffers = NUM_OF_PARALLEL_SMS; /* FIXME */
}
gossip_debug(GOSSIP_IO_DEBUG, "%s: fsize: %lld, "
@@ -235,6 +241,26 @@ static PINT_sm_action start_pipelining_s
if(s_op->u.io.num_of_buffers < 1)
s_op->u.io.num_of_buffers = NUM_OF_PARALLEL_SMS;
+#if 0
+ /* figure out if the fs config has trove data sync turned on or off
+ */
+ server_config = get_server_config_struct();
+ if(!server_config) {
+ js_p->error_code = -PVFS_EINVAL;
+ return SM_ACTION_COMPLETE;
+ }
+
+ fs_config = PINT_config_find_fs_id(server_config,
+ s_op->u.pipeline.fs_id);
+ if(!fs_config) {
+ gossip_err("%s: Failed to get filesystem "
+ "config from fs_id of: %d\n", __func__,
+ s_op->u.pipeline.fs_id);
+ js_p->error_code = -PVFS_EINVAL;
+ return SM_ACTION_COMPLETE;
+ }
+#endif
+
PINT_segpool_unit_id id[s_op->u.io.num_of_buffers];
PINT_dist_lookup(s_op->u.io.file_data.dist);
@@ -259,7 +285,7 @@ static PINT_sm_action start_pipelining_s
pipeline_op->u.pipeline.address = s_op->addr;
pipeline_op->u.pipeline.handle = s_op->req->u.io.handle;
- pipeline_op->u.pipeline.coll_id = s_op->req->u.io.fs_id;
+ pipeline_op->u.pipeline.fs_id = s_op->req->u.io.fs_id;
pipeline_op->u.pipeline.seg_handle = seg_handle;
pipeline_op->u.pipeline.id = id[i];
@@ -268,6 +294,16 @@ static PINT_sm_action start_pipelining_s
pipeline_op->u.pipeline.tag = s_op->tag;
pipeline_op->u.pipeline.buffer_size = BUFFER_SIZE;
pipeline_op->u.pipeline.io_type = s_op->req->u.io.io_type;
+
+ pipeline_op->u.pipeline.dfile_index = dfile_index;
+ pipeline_op->u.pipeline.dfile_count = s_op->u.io.file_data.server_ct;
+ pipeline_op->u.pipeline.dist = s_op->u.io.file_data.dist;
+
+ /* figure out if the fs config has trove data sync turned on or off
+ */
+ pipeline_op->u.pipeline.trove_sync_flag =
+ (fs_config->trove_sync_data ? TROVE_SYNC : 0);
+
if(s_op->req->u.io.io_type == PVFS_IO_READ) {
if(!pipeline_op->u.pipeline.buffer) {
/* if the buffer has not been used, allocate a buffer */
Index: pipeline.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/pipeline.sm,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- pipeline.sm 29 Apr 2009 18:55:03 -0000 1.1.2.2
+++ pipeline.sm 22 May 2009 22:30:06 -0000 1.1.2.3
@@ -18,11 +18,16 @@
#include "pint-distribution.h"
#include "pint-request.h"
#include "pvfs2-internal.h"
-//#include "pipeline.h"
#include "trove.h"
-static void do_comp(struct PINT_server_op *);
#define LOOP 3
+#define UNALIGNED 4
+#define UNALIGNED_LOOP 5
+#define UNALIGNED_DONE 6
+#define DO_COMP 7
+
+static int s2s_comp_fn(
+ void *v_p, struct PVFS_server_resp *resp_p, int index);
%%
@@ -37,9 +42,35 @@ nested machine pvfs2_pipeline_sm
state dispatch
{
run dispatch_data;
+ DO_COMP => check_align;
default => check;
}
+ state check_align
+ {
+ run check_align_fn;
+ UNALIGNED => setup_s2s;
+ success => do_comp;
+ }
+
+ state setup_s2s
+ {
+ run setup_s2s_msg;
+ success => s2s_exchange;
+ }
+
+ state s2s_exchange
+ {
+ jump pvfs2_msgpairarray_sm;
+ success => do_comp;
+ }
+
+ state do_comp
+ {
+ run do_comp_fn;
+ success => check;
+ }
+
state check
{
run check_done;
@@ -68,10 +99,8 @@ static PINT_sm_action fetch_data(struct
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_segpool_handle_t seg_handle = s_op->u.pipeline.seg_handle;
PINT_segpool_unit_id id = s_op->u.pipeline.id;
- struct filesystem_configuration_s *fs_config;
- struct server_configuration_s *server_config;
- struct server_configuration_s *user_opts = get_server_config_struct(); /* for write */
- int count, ret;
+ struct server_configuration_s *user_opts = get_server_config_struct();
+ int count, ret, i;
PVFS_offset *offsets;
PVFS_size *sizes;
PVFS_size bytes;
@@ -87,6 +116,11 @@ static PINT_sm_action fetch_data(struct
(s_op->u.pipeline.io_type==PVFS_IO_READ?"READ":"WRITE"),
bytes, count);
+ for(i=0; i<count; i++) {
+ gossip_debug(GOSSIP_IO_DEBUG, "offsets[%d]=%ld, sizes[%d]=%ld\n",
+ i, offsets[i], i, sizes[i]);
+ }
+
if(count == 0) {
js_p->error_code = 0;
//gossip_debug(GOSSIP_IO_DEBUG, "%s: count==0?\n", __func__);
@@ -99,26 +133,9 @@ static PINT_sm_action fetch_data(struct
s_op->u.pipeline.segs = count;
if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
- /* figure out if the fs config has trove data sync turned on or off
- */
- server_config = get_server_config_struct();
- if(!server_config) {
- js_p->error_code = -PVFS_EINVAL;
- return SM_ACTION_COMPLETE;
- }
-
- fs_config = PINT_config_find_fs_id(server_config,
- s_op->u.pipeline.coll_id);
- if(!fs_config) {
- gossip_err("%s: Failed to get filesystem "
- "config from fs_id of: %d\n", __func__,
- s_op->u.pipeline.coll_id);
- js_p->error_code = -PVFS_EINVAL;
- return SM_ACTION_COMPLETE;
- }
-
+
ret = job_trove_bstream_read_list
- (s_op->u.pipeline.coll_id,
+ (s_op->u.pipeline.fs_id,
s_op->u.pipeline.handle,
(char **)&s_op->u.pipeline.buffer,
(PVFS_size *)&s_op->u.pipeline.buffer_used,
@@ -127,7 +144,7 @@ static PINT_sm_action fetch_data(struct
sizes,
count,
&s_op->u.pipeline.out_size,
- (fs_config->trove_sync_data ? TROVE_SYNC : 0),
+ s_op->u.pipeline.trove_sync_flag,
NULL,
smcb,
0,
@@ -143,7 +160,7 @@ static PINT_sm_action fetch_data(struct
s_op->u.pipeline.tag,
BMI_PRE_ALLOC,
smcb,
- 1, /* unsigned long status_user_tag = 0 */
+ 0, /* unsigned long status_user_tag = 0 */
js_p,
&tmp_id,
server_job_context,
@@ -184,9 +201,7 @@ static PINT_sm_action dispatch_data(stru
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret;
job_id_t tmp_id;
- struct filesystem_configuration_s *fs_config; /* for write */
- struct server_configuration_s *server_config; /* for write */
- struct server_configuration_s *user_opts = get_server_config_struct(); /* for read */
+ struct server_configuration_s *user_opts = get_server_config_struct();
struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
if(s_op->u.pipeline.segs == 0) {
@@ -205,22 +220,15 @@ static PINT_sm_action dispatch_data(stru
if(s_op->u.pipeline.io_type == PVFS_IO_READ) {
- /***************************************************/
- if(parent_s_op->u.io.op != 0)
- do_comp(s_op);
- /***************************************************/
-
assert(s_op->u.pipeline.buffer_used);
+
if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */
- ret = 1; /* AS: skip sending if op is specified */
+ ret = DO_COMP; /* AS: skip sending if op is specified */
gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
else
- /* FIXME: what should be a proper value for status_user_tag? */
- /* FIXME: 3rd parameter, if s_op->u.pipeline.out_size is used,
- client might complain about size mismatch */
ret = job_bmi_send(s_op->u.pipeline.address,
s_op->u.pipeline.buffer,
js_p->actual_size,
@@ -237,27 +245,8 @@ static PINT_sm_action dispatch_data(stru
}
else if(s_op->u.pipeline.io_type == PVFS_IO_WRITE) {
- /* figure out if the fs config has trove data sync turned on or off
- */
- server_config = get_server_config_struct();
- if(!server_config) {
- gossip_err("write_io: server config is NULL!\n");
- js_p->error_code = -PVFS_EINVAL;
- return SM_ACTION_COMPLETE;
- }
-
- fs_config = PINT_config_find_fs_id(server_config,
- s_op->u.pipeline.coll_id);
- if(!fs_config) {
- gossip_err("write_io: Failed to get filesystem "
- "config from fs_id of: %d\n",
- s_op->u.pipeline.coll_id);
- js_p->error_code = -PVFS_EINVAL;
- return SM_ACTION_COMPLETE;
- }
-
ret = job_trove_bstream_write_list
- (s_op->u.pipeline.coll_id,
+ (s_op->u.pipeline.fs_id,
s_op->u.pipeline.handle,
(char **)&s_op->u.pipeline.buffer,
(TROVE_size *)&js_p->actual_size,
@@ -266,7 +255,7 @@ static PINT_sm_action dispatch_data(stru
s_op->u.pipeline.sizes,
s_op->u.pipeline.segs,
&s_op->u.pipeline.out_size,
- (fs_config->trove_sync_data ? TROVE_SYNC : 0),
+ s_op->u.pipeline.trove_sync_flag,
NULL,
smcb,
0,
@@ -298,67 +287,180 @@ static PINT_sm_action dispatch_data(stru
return SM_ACTION_COMPLETE;
}
-static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+
+static PINT_sm_action check_align_fn(struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
- PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
- js_p->error_code = 0;
struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+ js_p->error_code = 0;
+ PVFS_size file_req_offset = parent_s_op->u.io.file_req_offset;
+ PINT_request_file_data fdata = parent_s_op->u.io.file_data;
+ PVFS_size count;
+ PVFS_offset strip_boundary;
- /* FIMXE: do we really need this lock? */
- gen_mutex_lock(&parent_s_op->u.io.mutex);
- parent_s_op->u.io.total_transferred += js_p->actual_size;
- gen_mutex_unlock(&parent_s_op->u.io.mutex);
+ PVFS_offset loff = fdata.dist->methods->physical_to_logical_offset(fdata.dist->params, &fdata, s_op->u.pipeline.offsets[0]);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
- parent_s_op->u.io.total_transferred);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: count=%d\n", __func__,
- s_op->u.pipeline.segs);
- gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%ld\n", __func__,
- js_p->actual_size);
+ s_op->u.pipeline.loff = loff;
+ gossip_debug(GOSSIP_IO_DEBUG, "loff=%ld, file_req_offset=%ld\n", loff, file_req_offset);
+ switch(parent_s_op->u.io.datatype) {
+ case ((int)0x4c000405): /* MPI_INT */
+ count = (PVFS_size)(s_op->u.pipeline.buffer_used-file_req_offset)/((*PVFS_INT).ub);
+ s_op->u.pipeline.unaligned_size = (s_op->u.pipeline.buffer_used-file_req_offset)%((*PVFS_INT).ub);
+ s_op->u.pipeline.unaligned_size = ((*PVFS_INT).ub) - s_op->u.pipeline.unaligned_size;
- /* FIXME: */
- /* unless the second condition is set, the server starts
- a new read request to one already done, and falls into
- infinite trove_read->bmi_send->check_done loop */
- if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
- gossip_debug(GOSSIP_IO_DEBUG, "%s: LOOP\n", __func__);
- js_p->error_code = LOOP;
- }
- else {
- gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__);
+ if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
+ js_p->error_code = UNALIGNED;
+ }
+ case ((int)0x4c00080b): /* MPI_DOUBLE */
+ count = (PVFS_size)(s_op->u.pipeline.buffer_used -
+ file_req_offset)/((*PVFS_DOUBLE).ub);
+ gossip_debug(GOSSIP_IO_DEBUG, "count=%ld\n", count);
+ strip_boundary = ((int)(loff/262144))*262144; /* FIXME */
+ s_op->u.pipeline.unaligned_size = loff-strip_boundary;
+
+ if (loff == strip_boundary && file_req_offset != 0) {
+ s_op->u.pipeline.unaligned_size = file_req_offset;
+ }
+
+ if (s_op->u.pipeline.unaligned_size != 0 && count != 0) {
+ js_p->error_code = UNALIGNED;
+ gossip_debug(GOSSIP_IO_DEBUG, "unaligned_size=%ld\n",
+ s_op->u.pipeline.unaligned_size);
+ }
}
-
+
return SM_ACTION_COMPLETE;
}
-static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+
+static PINT_sm_action setup_s2s_msg(struct PINT_smcb *smcb, job_status_s *js_p)
{
+ struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+ struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+ PINT_sm_msgpair_state *msg_p = NULL;
+ struct server_configuration_s *user_opts = get_server_config_struct();
+ int regions;
+ int ret;
+ PVFS_credentials creds;
+ int next_server_index;
+ PVFS_handle next_server_handle;
+
+ /* init msgpair */
+ PINT_msgpair_init(&s_op->msgarray_op);
+ msg_p = &s_op->msgarray_op.msgpair;
+
+ s_op->msgarray_op.params.job_timeout = user_opts->client_job_bmi_timeout;
+ s_op->msgarray_op.params.retry_delay = user_opts->client_retry_delay_ms;
+ s_op->msgarray_op.params.retry_limit = user_opts->client_retry_limit;
+ s_op->msgarray_op.params.quiet_flag = 1;
+
+ PINT_util_gen_credentials(&creds);
+ gossip_debug(GOSSIP_IO_DEBUG, "loff=%ld, buffer_used=%ld\n", s_op->u.pipeline.loff, s_op->u.pipeline.buffer_used);
+
+ /* determine which server we need to talk to */
+ next_server_index = (s_op->u.pipeline.dfile_index + 1)%(s_op->u.pipeline.dfile_count);
+ next_server_handle = parent_s_op->u.io.dfile_array[next_server_index];
+
+ /* build a request */
+ ret = PVFS_Request_contiguous(s_op->u.pipeline.unaligned_size,
+ PVFS_BYTE, &s_op->u.pipeline.file_req);
+
+ s_op->u.pipeline.file_req_offset = (((int)(s_op->u.pipeline.loff/262144))+1)*262144; /* FIXME */
+
+ regions = 1;
+ gossip_debug(GOSSIP_IO_DEBUG, "s_op->u.pipeline.file_req_offset=%ld\n", s_op->u.pipeline.file_req_offset);
+
+ PINT_SERVREQ_SMALL_IO_FILL(msg_p->req,
+ creds,
+ s_op->u.pipeline.fs_id,
+ next_server_handle,
+ s_op->u.pipeline.io_type,
+ next_server_index,
+ s_op->u.pipeline.dfile_count,
+ s_op->u.pipeline.dist,
+ s_op->u.pipeline.file_req,
+ s_op->u.pipeline.file_req_offset,
+ regions,
+ s_op->u.pipeline.unaligned_size,
+ NULL /* s_op->hints */);
+
+ msg_p->fs_id = s_op->u.pipeline.fs_id;
+ msg_p->handle = next_server_handle;
+ msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
+ msg_p->comp_fn = s2s_comp_fn;
+
+ ret = PINT_cached_config_map_to_server(&msg_p->svr_addr,
+ next_server_handle, //s_op->u.pipeline.handle,
+ s_op->u.pipeline.fs_id);
+ if(ret < 0) {
+ gossip_err("Failed to map meta server address\n");
+ js_p->error_code = ret;
+ return SM_ACTION_COMPLETE;
+ }
+
js_p->error_code = 0;
+ PINT_sm_push_frame(smcb, 0, &s_op->msgarray_op);
return SM_ACTION_COMPLETE;
}
-static void do_comp(struct PINT_server_op *s_op)
+static int s2s_comp_fn(void *v_p, struct PVFS_server_resp *resp_p,
+ int index)
{
+ PINT_smcb *smcb = v_p;
+ struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
+
+ gossip_debug(GOSSIP_IO_DEBUG, "%s called\n", __func__);
+ gossip_debug(GOSSIP_IO_DEBUG, "resp_p->status=%d\n", resp_p->status);
+ gossip_debug(GOSSIP_IO_DEBUG, "small_io: result_size=%ld\n",
+ resp_p->u.small_io.result_size);
+
+ assert(resp_p->op == PVFS_SERV_SMALL_IO);
+
+ if (resp_p->status != 0) {
+ return resp_p->status;
+ }
+
+ if(resp_p->u.small_io.result_size != 0) {
+ memcpy(s_op->u.pipeline.tmp_buf, resp_p->u.small_io.buffer,
+ resp_p->u.small_io.result_size);
+ }
+
+ return 0;
+}
+
+
+static PINT_sm_action do_comp_fn(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+ struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+ js_p->error_code = 0;
if(s_op->u.pipeline.buffer) {
PVFS_size i;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "%s: buffer_used=%ld, op=0x%x, datatype=0x%x\n",
+ "%s: buffer_used=%ld, op=0x%x, datatype=0x%x, actual_size=%ld\n",
__func__, s_op->u.pipeline.buffer_used,
parent_s_op->u.io.op,
- parent_s_op->u.io.datatype); /* AS */
+ parent_s_op->u.io.datatype,
+ js_p->actual_size); /* AS */
switch(parent_s_op->u.io.datatype) {
case ((int)0x4c000405): /* MPI_INT */
{
int *a = s_op->u.pipeline.buffer;
int result;
- PVFS_size count = (s_op->u.pipeline.buffer_used)/((*PVFS_INT).ub);
+ PVFS_size count = (s_op->u.pipeline.buffer_used-s_op->u.pipeline.unaligned_size)/((*PVFS_INT).ub);
int *tmp;
+
+ if(count == 0)
+ return SM_ACTION_COMPLETE;
+ /* data is not aligned perfectly, so adjust it */
+ if(s_op->u.pipeline.unaligned_size != 0) {
+ memcpy(((char*)&a[count])+(((*PVFS_INT).ub)-s_op->u.pipeline.unaligned_size), s_op->u.pipeline.tmp_buf, s_op->u.pipeline.unaligned_size);
+ count++;
+ }
if (parent_s_op->u.io.total_transferred == 0) {
if (parent_s_op->u.io.tmp_buffer == NULL)
@@ -420,13 +522,42 @@ static void do_comp(struct PINT_server_o
{
double *a = s_op->u.pipeline.buffer;
double result;
- PVFS_size count = (s_op->u.pipeline.buffer_used)/((*PVFS_DOUBLE).ub);
+ PVFS_size count = (s_op->u.pipeline.buffer_used-s_op->u.pipeline.unaligned_size)/((*PVFS_DOUBLE).ub);
double *tmp;
+ PVFS_offset strip_boundary = (int)(s_op->u.pipeline.loff/262144)*262144; /* FIXME */
+
+ if (s_op->u.pipeline.buffer_used < 262144 &&
+ s_op->u.pipeline.loff != strip_boundary) { /* FIXME */
+ count = (s_op->u.pipeline.buffer_used)/((*PVFS_DOUBLE).ub);
+ }
+ gossip_debug(GOSSIP_IO_DEBUG, "count=%ld\n", count);
+ if(count < 1)
+ return SM_ACTION_COMPLETE;
+
+ /* data is not aligned perfectly, so adjust it within the memory*/
+ if(s_op->u.pipeline.unaligned_size != 0) {
+ PVFS_size adj_sz = s_op->u.pipeline.unaligned_size;
+ PVFS_size tmp_sz = ((*PVFS_DOUBLE).ub) - adj_sz;
+
+ if(s_op->u.pipeline.loff == strip_boundary) {
+ memcpy(a, (char*)&a[0]+adj_sz,
+ s_op->u.pipeline.buffer_used-adj_sz);
+ }
+
+ memcpy(((char*)&a[count])+tmp_sz, s_op->u.pipeline.tmp_buf,
+ s_op->u.pipeline.unaligned_size);
+
+
+ count++;
+
+ if(s_op->u.pipeline.buffer_used < (262144-((*PVFS_DOUBLE).ub)))
+ count--;
+ }
if (parent_s_op->u.io.total_transferred == 0) {
if (parent_s_op->u.io.tmp_buffer == NULL)
parent_s_op->u.io.tmp_buffer = (void *)malloc(1*sizeof(double));
- memset(parent_s_op->u.io.tmp_buffer, 0, sizeof(double));
+ memset( parent_s_op->u.io.tmp_buffer, 0, sizeof(double));
}
tmp = parent_s_op->u.io.tmp_buffer;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
@@ -466,7 +597,7 @@ static void do_comp(struct PINT_server_o
case 0x58000003: /* SUM */
result = 0;
for (i=0; i<count; i++ ) {
- if (i<10) gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%ld]=%lf\n", i, a[i]);
+ if (i<10 || i== (count-1) || i == count) gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "a[%ld]=%lf\n", i, a[i]);
result += a[i];
}
@@ -487,6 +618,49 @@ static void do_comp(struct PINT_server_o
break;
} /* end switch() */
} /* end if() */
+
+ return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+ struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+ struct PINT_server_op *parent_s_op = s_op->u.pipeline.parent;
+ PINT_segpool_handle_t h = s_op->u.pipeline.seg_handle;
+ js_p->error_code = 0;
+
+ /* FIMXE: do we really need this lock? */
+ gen_mutex_lock(&parent_s_op->u.io.mutex);
+ //parent_s_op->u.io.total_transferred += js_p->actual_size;
+ parent_s_op->u.io.total_transferred += s_op->u.pipeline.buffer_used;
+ gen_mutex_unlock(&parent_s_op->u.io.mutex);
+
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%ld\n", __func__,
+ parent_s_op->u.io.total_transferred);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: actual_size=%ld\n", __func__,
+ js_p->actual_size);
+
+
+ /* FIXME: */
+ /* unless the second condition is set, the server starts
+ a new read request to one already done, and falls into
+ infinite trove_read->bmi_send->check_done loop */
+ if(!segpool_done(h) && s_op->u.pipeline.segs != 0) {
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: LOOP\n", __func__);
+ js_p->error_code = LOOP;
+ }
+ else {
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__);
+ }
+
+ return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+ js_p->error_code = 0;
+
+ return SM_ACTION_COMPLETE;
}
/*
Index: pvfs2-server-req.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server-req.c,v
diff -p -u -r1.5 -r1.5.6.1
--- pvfs2-server-req.c 20 Nov 2008 01:17:11 -0000 1.5
+++ pvfs2-server-req.c 22 May 2009 22:30:06 -0000 1.5.6.1
@@ -47,6 +47,7 @@ extern struct PINT_server_req_params pvf
extern struct PINT_server_req_params pvfs2_unstuff_params;
extern struct PINT_server_req_params pvfs2_stuffed_create_params;
extern struct PINT_server_req_params pvfs2_precreate_pool_refiller_params;
+//extern struct PINT_server_req_params pvfs2_s2s_params; /* sson */
/* table of incoming request types and associated parameters */
struct PINT_server_req_entry PINT_server_req_table[] =
@@ -90,6 +91,8 @@ struct PINT_server_req_entry PINT_server
/* 36 */ {PVFS_SERV_BATCH_REMOVE, &pvfs2_batch_remove_params},
/* 37 */ {PVFS_SERV_PRECREATE_POOL_REFILLER, &pvfs2_precreate_pool_refiller_params},
/* 38 */ {PVFS_SERV_UNSTUFF, &pvfs2_unstuff_params},
+ // /* 39 */ {PVFS_SERV_S2S_COMM, &pvfs2_s2s_comm_params}, /* sson */
+ ///* 40 */ {PVFS_SERV_S2S, &pvfs2_s2s_params}, /* sson */
};
#define CHECK_OP(_op_) assert(_op_ == PINT_server_req_table[_op_].op_type)
Index: pvfs2-server.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.155.6.5 -r1.155.6.6
--- pvfs2-server.h 28 Apr 2009 21:36:02 -0000 1.155.6.5
+++ pvfs2-server.h 22 May 2009 22:30:06 -0000 1.155.6.6
@@ -361,10 +361,21 @@ struct PINT_server_io_op
/* substibute for flow */
struct PINT_server_pipeline_op
{
- PVFS_fs_id coll_id;
+ PVFS_fs_id fs_id;
PVFS_handle handle;
PVFS_BMI_addr_t address;
+ int dfile_index;
+ int dfile_count;
+ struct PINT_dist_s *dist;
+
+ PINT_Request *file_req;
+ PVFS_offset file_req_offset;
+ PINT_Request *mem_req;
+
+ char tmp_buf[128]; /* FIXME */
+ PVFS_size unaligned_size;
+
enum PVFS_io_type io_type;
void *parent;
@@ -379,6 +390,9 @@ struct PINT_server_pipeline_op
int segs;
PVFS_hint hints;
PVFS_msg_tag_t tag;
+ int trove_sync_flag;
+ int adjust_flag; /* FIXME */
+ PVFS_offset loff;
};
struct PINT_server_small_io_op
@@ -558,6 +572,7 @@ extern struct PINT_state_machine_s pvfs2
extern struct PINT_state_machine_s pvfs2_remove_work_sm;
extern struct PINT_state_machine_s pvfs2_mkdir_work_sm;
extern struct PINT_state_machine_s pvfs2_unexpected_sm;
+extern struct PINT_state_machine_s pvfs2_pipeline_sm; /* sson */
/* Exported Prototypes */
struct server_configuration_s *get_server_config_struct(void);
Index: small-io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/small-io.sm,v
diff -p -u -r1.23 -r1.23.6.1
--- small-io.sm 20 Nov 2008 01:17:11 -0000 1.23
+++ small-io.sm 22 May 2009 22:30:06 -0000 1.23.6.1
@@ -169,6 +169,7 @@ static PINT_sm_action small_io_start_job
}
else
{
+ gossip_debug(GOSSIP_IO_DEBUG, "result.bytes=%d\n", result.bytes); /* sson */
/* allocate space for the read in the response buffer */
s_op->resp.u.small_io.buffer = BMI_memalloc(
s_op->addr, result.bytes, BMI_SEND);
More information about the Pvfs2-cvs
mailing list