[PVFS2-CVS] commit by neill in pvfs2/src/server: io.sm
CVS commit program
cvs at parl.clemson.edu
Wed Mar 10 12:21:22 EST 2004
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb:/tmp/cvs-serv31075/src/server
Modified Files:
io.sm
Log Message:
- server and client side io sm cleanups
- give io it's own debugmask
- improved error handling on the client side
Index: io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/io.sm,v
diff -p -u -r1.40 -r1.41
--- io.sm 19 Jan 2004 15:56:22 -0000 1.40
+++ io.sm 10 Mar 2004 17:21:22 -0000 1.41
@@ -16,12 +16,16 @@
#include "pint-distribution.h"
#include "pint-request.h"
-static int io_send_ack(PINT_server_op *s_op, job_status_s* js_p);
-static int io_send_completion_ack(PINT_server_op *s_op,
- job_status_s* js_p);
-static int io_start_flow(PINT_server_op *s_op, job_status_s* js_p);
-static int io_release(PINT_server_op *s_op, job_status_s* js_p);
-static int io_cleanup(PINT_server_op *s_op, job_status_s* js_p);
+static int io_send_ack(
+ PINT_server_op *s_op, job_status_s* js_p);
+static int io_send_completion_ack(
+ PINT_server_op *s_op, job_status_s* js_p);
+static int io_start_flow(
+ PINT_server_op *s_op, job_status_s* js_p);
+static int io_release(
+ PINT_server_op *s_op, job_status_s* js_p);
+static int io_cleanup(
+ PINT_server_op *s_op, job_status_s* js_p);
extern PINT_server_trove_keys_s Trove_Common_Keys[];
@@ -32,52 +36,57 @@ extern PINT_server_trove_keys_s Trove_Co
%%
-machine pvfs2_io_sm(prelude, send_positive_ack, send_negative_ack,
- start_flow, cleanup, release, send_completion_ack)
+machine pvfs2_io_sm(prelude,
+ send_positive_ack,
+ send_negative_ack,
+ start_flow,
+ cleanup,
+ release,
+ send_completion_ack)
{
- state prelude
- {
- jump pvfs2_prelude_sm;
- success => send_positive_ack;
- default => send_negative_ack;
- }
-
- state send_positive_ack
- {
- run io_send_ack;
- success => start_flow;
- default => release;
- }
-
- state send_negative_ack
- {
- run io_send_ack;
- default => release;
- }
-
- state start_flow
- {
- run io_start_flow;
- default => send_completion_ack;
- }
-
- state send_completion_ack
- {
- run io_send_completion_ack;
- default => release;
- }
-
- state release
- {
- run io_release;
- default => cleanup;
- }
-
- state cleanup
- {
- run io_cleanup;
- default => terminate;
- }
+ state prelude
+ {
+ jump pvfs2_prelude_sm;
+ success => send_positive_ack;
+ default => send_negative_ack;
+ }
+
+ state send_positive_ack
+ {
+ run io_send_ack;
+ success => start_flow;
+ default => release;
+ }
+
+ state send_negative_ack
+ {
+ run io_send_ack;
+ default => release;
+ }
+
+ state start_flow
+ {
+ run io_start_flow;
+ default => send_completion_ack;
+ }
+
+ state send_completion_ack
+ {
+ run io_send_completion_ack;
+ default => release;
+ }
+
+ state release
+ {
+ run io_release;
+ default => cleanup;
+ }
+
+ state cleanup
+ {
+ run io_cleanup;
+ default => terminate;
+ }
}
%%
@@ -103,54 +112,44 @@ machine pvfs2_io_sm(prelude, send_positi
*/
static int io_send_ack(PINT_server_op *s_op, job_status_s* js_p)
{
- int err = -1;
- job_id_t tmp_id;
+ int err = -PVFS_EIO;
+ job_id_t tmp_id;
- PINT_STATE_DEBUG("send_ack");
+ PINT_STATE_DEBUG("send_ack");
- /* this is where we report the file size to the client before
- * starting the I/O transfer, or else report an error if we
- * failed to get the size, or failed for permission reasons
- */
- s_op->resp.status = js_p->error_code;
- s_op->resp.u.io.bstream_size = s_op->ds_attr.b_size;
-
- err = PINT_encode(
- &s_op->resp,
- PINT_ENCODE_RESP,
- &(s_op->encoded),
- s_op->addr,
- s_op->decoded.enc_type);
-
- if(err < 0)
- {
- /* critical error prior to job posting */
- gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
- /* handle by setting error code in job status structure and
- * returning 1
- */
- js_p->error_code = err;
- return(1);
- }
- else
- {
- err = job_bmi_send_list(
- s_op->addr,
- s_op->encoded.buffer_list,
- s_op->encoded.size_list,
- s_op->encoded.list_count,
- s_op->encoded.total_size,
- s_op->tag,
- s_op->encoded.buffer_type,
- 0,
- s_op,
- 0,
- js_p,
- &tmp_id,
- server_job_context);
- }
-
- return(err);
+ /* this is where we report the file size to the client before
+ * starting the I/O transfer, or else report an error if we
+ * failed to get the size, or failed for permission reasons
+ */
+ s_op->resp.status = js_p->error_code;
+ s_op->resp.u.io.bstream_size = s_op->ds_attr.b_size;
+
+ err = PINT_encode(&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
+ s_op->addr, s_op->decoded.enc_type);
+ if (err < 0)
+ {
+ gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
+ js_p->error_code = err;
+ return(1);
+ }
+ else
+ {
+ err = job_bmi_send_list(
+ s_op->addr,
+ s_op->encoded.buffer_list,
+ s_op->encoded.size_list,
+ s_op->encoded.list_count,
+ s_op->encoded.total_size,
+ s_op->tag,
+ s_op->encoded.buffer_type,
+ 0,
+ s_op,
+ 0,
+ js_p,
+ &tmp_id,
+ server_job_context);
+ }
+ return(err);
}
/*
@@ -173,78 +172,73 @@ static int io_send_ack(PINT_server_op *s
*/
static int io_start_flow(PINT_server_op *s_op, job_status_s* js_p)
{
- int err = -1;
- job_id_t tmp_id;
+ int err = -PVFS_EIO;
+ job_id_t tmp_id;
- PINT_STATE_DEBUG("start_flow");
+ PINT_STATE_DEBUG("start_flow");
- s_op->u.io.flow_d = PINT_flow_alloc();
- if(!s_op->u.io.flow_d)
- {
- /* critical error, jump to next error state */
- js_p->error_code = -PVFS_ENOMEM;
- return(1);
- }
-
- /* we still have the file size stored in the response structure
- * that we sent in the previous state, other details come from
- * request
- */
- s_op->u.io.flow_d->file_data.fsize = s_op->resp.u.io.bstream_size;
- s_op->u.io.flow_d->file_data.dist = s_op->req->u.io.io_dist;
- s_op->u.io.flow_d->file_data.server_nr = s_op->req->u.io.server_nr;
- s_op->u.io.flow_d->file_data.server_ct = s_op->req->u.io.server_ct;
-
- /* on writes, we allow the bstream to be extended at EOF */
- if(s_op->req->u.io.io_type == PVFS_IO_WRITE)
- s_op->u.io.flow_d->file_data.extend_flag = 1;
- else
- s_op->u.io.flow_d->file_data.extend_flag = 0;
-
- s_op->u.io.flow_d->file_req = s_op->req->u.io.file_req;
- s_op->u.io.flow_d->file_req_offset = s_op->req->u.io.file_req_offset;
- s_op->u.io.flow_d->mem_req = NULL;
- s_op->u.io.flow_d->aggregate_size = s_op->req->u.io.aggregate_size;
- s_op->u.io.flow_d->tag = s_op->tag;
- s_op->u.io.flow_d->user_ptr = NULL;
- s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
-
- /* set endpoints depending on type of io requested */
- if(s_op->req->u.io.io_type == PVFS_IO_WRITE)
- {
- s_op->u.io.flow_d->src.endpoint_id = BMI_ENDPOINT;
- s_op->u.io.flow_d->src.u.bmi.address = s_op->addr;
- s_op->u.io.flow_d->dest.endpoint_id = TROVE_ENDPOINT;
- s_op->u.io.flow_d->dest.u.trove.handle = s_op->req->u.io.handle;
- s_op->u.io.flow_d->dest.u.trove.coll_id = s_op->req->u.io.fs_id;
- }
- else if(s_op->req->u.io.io_type == PVFS_IO_READ)
- {
- s_op->u.io.flow_d->src.endpoint_id = TROVE_ENDPOINT;
- s_op->u.io.flow_d->src.u.trove.handle = s_op->req->u.io.handle;
- s_op->u.io.flow_d->src.u.trove.coll_id = s_op->req->u.io.fs_id;
- s_op->u.io.flow_d->dest.endpoint_id = BMI_ENDPOINT;
- s_op->u.io.flow_d->dest.u.bmi.address = s_op->addr;
- }
- else
- {
- /* critical error; jump out of state */
- gossip_lerr("Server: IO SM: unknown IO type requested.\n");
- js_p->error_code = -PVFS_EINVAL;
- return(1);
- }
-
-
- /* start the flow! */
- err = job_flow(
- s_op->u.io.flow_d,
- s_op,
- 0,
- js_p,
- &tmp_id,
- server_job_context);
+ s_op->u.io.flow_d = PINT_flow_alloc();
+ if (!s_op->u.io.flow_d)
+ {
+ js_p->error_code = -PVFS_ENOMEM;
+ return(1);
+ }
+
+ /* we still have the file size stored in the response structure
+ * that we sent in the previous state, other details come from
+ * request
+ */
+ s_op->u.io.flow_d->file_data.fsize = s_op->resp.u.io.bstream_size;
+ s_op->u.io.flow_d->file_data.dist = s_op->req->u.io.io_dist;
+ s_op->u.io.flow_d->file_data.server_nr = s_op->req->u.io.server_nr;
+ s_op->u.io.flow_d->file_data.server_ct = s_op->req->u.io.server_ct;
+
+ /* on writes, we allow the bstream to be extended at EOF */
+ if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
+ {
+ s_op->u.io.flow_d->file_data.extend_flag = 1;
+ }
+ else
+ {
+ s_op->u.io.flow_d->file_data.extend_flag = 0;
+ }
+
+ s_op->u.io.flow_d->file_req = s_op->req->u.io.file_req;
+ s_op->u.io.flow_d->file_req_offset = s_op->req->u.io.file_req_offset;
+ s_op->u.io.flow_d->mem_req = NULL;
+ s_op->u.io.flow_d->aggregate_size = s_op->req->u.io.aggregate_size;
+ s_op->u.io.flow_d->tag = s_op->tag;
+ s_op->u.io.flow_d->user_ptr = NULL;
+ s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
+
+ /* set endpoints depending on type of io requested */
+ if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
+ {
+ s_op->u.io.flow_d->src.endpoint_id = BMI_ENDPOINT;
+ s_op->u.io.flow_d->src.u.bmi.address = s_op->addr;
+ s_op->u.io.flow_d->dest.endpoint_id = TROVE_ENDPOINT;
+ s_op->u.io.flow_d->dest.u.trove.handle = s_op->req->u.io.handle;
+ s_op->u.io.flow_d->dest.u.trove.coll_id = s_op->req->u.io.fs_id;
+ }
+ else if (s_op->req->u.io.io_type == PVFS_IO_READ)
+ {
+ s_op->u.io.flow_d->src.endpoint_id = TROVE_ENDPOINT;
+ s_op->u.io.flow_d->src.u.trove.handle = s_op->req->u.io.handle;
+ s_op->u.io.flow_d->src.u.trove.coll_id = s_op->req->u.io.fs_id;
+ s_op->u.io.flow_d->dest.endpoint_id = BMI_ENDPOINT;
+ s_op->u.io.flow_d->dest.u.bmi.address = s_op->addr;
+ }
+ else
+ {
+ gossip_lerr("Server: IO SM: unknown IO type requested.\n");
+ js_p->error_code = -PVFS_EINVAL;
+ return(1);
+ }
- return(err);
+ err = job_flow(s_op->u.io.flow_d, s_op, 0, js_p, &tmp_id,
+ server_job_context);
+
+ return(err);
}
@@ -265,20 +259,21 @@ static int io_start_flow(PINT_server_op
*/
static int io_release(PINT_server_op *s_op, job_status_s* js_p)
{
+ int ret = 0;
+ job_id_t i;
- int ret=0;
- job_id_t i;
+ PINT_STATE_DEBUG("release");
- PINT_STATE_DEBUG("release");
+ /* tell the scheduler that we are done with this operation */
+ ret = job_req_sched_release(
+ s_op->scheduled_id,
+ s_op,
+ 0,
+ js_p,
+ &i,
+ server_job_context);
- /* tell the scheduler that we are done with this operation */
- ret = job_req_sched_release(s_op->scheduled_id,
- s_op,
- 0,
- js_p,
- &i,
- server_job_context);
- return ret;
+ return ret;
}
/*
@@ -298,22 +293,21 @@ static int io_release(PINT_server_op *s_
*/
static int io_cleanup(PINT_server_op *s_op, job_status_s* js_p)
{
- PINT_STATE_DEBUG("cleanup");
-
- if(s_op->u.io.flow_d)
- {
- PINT_flow_free(s_op->u.io.flow_d);
- }
-
- /* let go of our encoded response buffer,
- * if we appear to have made one
- */
- if(s_op->encoded.total_size)
- {
- PINT_encode_release(&s_op->encoded, PINT_ENCODE_RESP);
- }
+ PINT_STATE_DEBUG("cleanup");
- return(server_state_machine_complete(s_op));
+ if (s_op->u.io.flow_d)
+ {
+ PINT_flow_free(s_op->u.io.flow_d);
+ }
+
+ /* let go of our encoded response buffer,
+ * if we appear to have made one
+ */
+ if (s_op->encoded.total_size)
+ {
+ PINT_encode_release(&s_op->encoded, PINT_ENCODE_RESP);
+ }
+ return(server_state_machine_complete(s_op));
}
/*
@@ -334,69 +328,68 @@ static int io_cleanup(PINT_server_op *s_
* send either positive or negative acknowledgements.
*
*/
-static int io_send_completion_ack(PINT_server_op *s_op,
- job_status_s* js_p)
+static int io_send_completion_ack(
+ PINT_server_op *s_op, job_status_s* js_p)
{
- int err = -1;
- job_id_t tmp_id;
+ int err = -PVFS_EIO;
+ job_id_t tmp_id;
- PINT_STATE_DEBUG("send_completion_ack");
+ PINT_STATE_DEBUG("send_completion_ack");
- /* we only send this trailing ack if we are working on a write
- * operation; otherwise just cut out early
- */
- if(s_op->req->u.io.io_type == PVFS_IO_READ)
- {
- js_p->error_code = 0;
- return(1);
- }
-
- /* release encoding of the first ack that we sent */
- PINT_encode_release(&s_op->encoded, PINT_ENCODE_RESP);
- /* zero size for safety */
- s_op->encoded.total_size = 0;
-
- /* fill in response -- status field is the only generic one we should have to set */
- s_op->resp.op = PVFS_SERV_WRITE_COMPLETION; /* not IO */
- s_op->resp.status = js_p->error_code;
- s_op->resp.u.write_completion.total_completed = s_op->u.io.flow_d->total_transfered;
-
- err = PINT_encode(
- &s_op->resp,
- PINT_ENCODE_RESP,
- &(s_op->encoded),
- s_op->addr,
- s_op->decoded.enc_type);
-
- if(err < 0)
- {
- /* critical error prior to job posting */
- gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
- /* handle by setting error code in job status structure and
- * returning 1
- */
- js_p->error_code = err;
- return(1);
- }
- else
- {
- err = job_bmi_send_list(
- s_op->addr,
- s_op->encoded.buffer_list,
- s_op->encoded.size_list,
- s_op->encoded.list_count,
- s_op->encoded.total_size,
- s_op->tag,
- s_op->encoded.buffer_type,
- 0,
- s_op,
- 0,
- js_p,
- &tmp_id,
- server_job_context);
- }
-
- return(err);
+ /* we only send this trailing ack if we are working on a write
+ * operation; otherwise just cut out early
+ */
+ if (s_op->req->u.io.io_type == PVFS_IO_READ)
+ {
+ js_p->error_code = 0;
+ return(1);
+ }
+
+ /* release encoding of the first ack that we sent */
+ PINT_encode_release(&s_op->encoded, PINT_ENCODE_RESP);
+ /* zero size for safety */
+ s_op->encoded.total_size = 0;
+
+ /*
+ fill in response -- status field is the only generic one we
+ should have to set
+ */
+ s_op->resp.op = PVFS_SERV_WRITE_COMPLETION; /* not IO */
+ s_op->resp.status = js_p->error_code;
+ s_op->resp.u.write_completion.total_completed =
+ s_op->u.io.flow_d->total_transfered;
+
+ err = PINT_encode(
+ &s_op->resp,
+ PINT_ENCODE_RESP,
+ &(s_op->encoded),
+ s_op->addr,
+ s_op->decoded.enc_type);
+
+ if (err < 0)
+ {
+ gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
+ js_p->error_code = err;
+ return(1);
+ }
+ else
+ {
+ err = job_bmi_send_list(
+ s_op->addr,
+ s_op->encoded.buffer_list,
+ s_op->encoded.size_list,
+ s_op->encoded.list_count,
+ s_op->encoded.total_size,
+ s_op->tag,
+ s_op->encoded.buffer_type,
+ 0,
+ s_op,
+ 0,
+ js_p,
+ &tmp_id,
+ server_job_context);
+ }
+ return(err);
}
/*
@@ -408,4 +401,3 @@ static int io_send_completion_ack(PINT_s
*
* vim: ft=c ts=8 sts=4 sw=4 noexpandtab
*/
-
More information about the PVFS2-CVS
mailing list