[Pvfs2-cvs] commit by sson in pvfs2/src/server: write.sm
CVS commit program
cvs at parl.clemson.edu
Tue Apr 14 16:20:43 EDT 2009
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv13617/src/server
Added Files:
Tag: as-branch
write.sm
Log Message:
This is a sm for write operations.
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ write.sm 2009-04-14 16:20:43.000000000 -0400
@@ -0,0 +1,301 @@
+ /*
+ * (C) 2001 Clemson University and The University of Chicago
+ *
+ * See COPYING in top-level directory.
+ */
+
+/*
+ * PVFS2 server state machine for driving write I/O operations.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "pvfs2-request.h"
+#include "pint-distribution.h"
+#include "pint-request.h"
+#include "pvfs2-internal.h"
+#include "rw-sm.h"
+#include "trove.h"
+#include "pint-segpool.h"
+
+%%
+
+nested machine pvfs2_write_sm
+{
+ state bmi_recv
+ {
+ run bmi_recv_call;
+ default => trove_write;
+ }
+
+ state trove_write
+ {
+ run trove_write_call;
+ default => check;
+ }
+
+ state check
+ {
+ run check_done;
+ success => finished;
+ LOOP => bmi_recv;
+ }
+
+ state finished
+ {
+ run epilog;
+ default => return;
+ }
+}
+
+%%
+
+/*
+ * PINT_process_request() -> job_bmi_recv()
+ */
+static PINT_sm_action bmi_recv_call(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+ struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+ struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
+ struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+ struct result_chain_entry *result_tmp = &q_item->result_chain;
+ struct result_chain_entry *old_result_tmp;
+ PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
+ PINT_segpool_unit_id id = s_op->u.flow_write.id;
+ PVFS_size bytes_processed = 0;
+ void *tmp_buffer;
+ int ret;
+ job_id_t tmp_id;
+ struct server_configuration_s *user_opts = get_server_config_struct();
+ int count;
+ PVFS_offset *offsets;
+ PVFS_size *sizes;
+ PVFS_size bytes;
+
+ q_item->buffer_used = 0;
+#if 0
+ /* FIXME: not sure whether we need to check this
+ * for now just commented out */
+ if(js_p->error_code != 0 || flow_data->parent->error_code != 0)
+ {
+ gossip_err("%s: I/O error occurred\n", __func__);
+ //handle_io_error(error_code, q_item, flow_data);
+ js_p->error_code = error_code;
+ return SM_ACTION_COMPLETE;
+ }
+#endif
+
+#if 0
+ /* if there are no more receives to post, just return */
+ if(PINT_REQUEST_DONE(flow_data->parent->file_req_state))
+ {
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+ }
+#endif
+ if(!q_item->buffer){
+ /* if the q_item has not been used, allocate a buffer */
+ q_item->buffer = BMI_memalloc(
+ q_item->parent->src.u.bmi.address,
+ q_item->parent->buffer_size, BMI_RECV);
+ /* TODO: error handling */
+ assert(q_item->buffer);
+ }
+
+ bytes = q_item->parent->buffer_size;
+ PINT_segpool_take_segments(seg_handle, id, &bytes, &count, &offsets, &sizes);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%d\n", __func__, count, bytes);
+ if(count == 0) {
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+ }
+ q_item->buffer_used = bytes;
+ s_op->u.flow_write.offsets = offsets;
+ s_op->u.flow_write.sizes = sizes;
+ s_op->u.flow_write.segs = count;
+
+ if(count == 0) {
+ if(flow_data->parent->total_transferred ==
+ flow_data->total_bytes_processed &&
+ PINT_REQUEST_DONE(flow_data->parent->file_req_state)) {
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: FLOW_COMPLETE?\n", __func__);
+ /* never reach this point */
+ assert(q_item->parent->state != FLOW_COMPLETE);
+ q_item->parent->state = FLOW_COMPLETE; /* bmi_recv_call */
+ }
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+ }
+#if 0
+ gossip_debug(GOSSIP_DIRECTIO_DEBUG,
+ "offset %llu, buffer ptr: %p\n",
+ llu(q_item->result_chain.result.offset_array[0]),
+ q_item->buffer);
+#endif
+
+ /* TODO: what if we recv less than expected? */
+ ret = job_bmi_recv(q_item->parent->src.u.bmi.address,
+ (char *)q_item->buffer,
+ q_item->parent->buffer_size,
+ q_item->parent->tag,
+ BMI_PRE_ALLOC,
+ smcb,
+ 0, /* unsigned long status_user_tag = 0 */
+ js_p,
+ &tmp_id,
+ server_job_context,
+ user_opts->server_job_flow_timeout,
+ (bmi_hint)q_item->parent->hints);
+
+ if(ret < 0) {
+ gossip_err("%s: I/O error occurred\n", __func__);
+ /* FIXME: need to handle I/O error */
+ //handle_io_error(ret, q_item, flow_data);
+ js_p->error_code = -PVFS_EIO;
+ return SM_ACTION_COMPLETE;
+ }
+
+ if(ret == 1) {
+ /* immediate completion; trigger callback ourselves */
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+ }
+ if(ret == 0) {
+ js_p->error_code = 0;
+ return SM_ACTION_DEFERRED;
+ }
+
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action trove_write_call(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+ struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+ struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
+ struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+ PVFS_size tmp_actual_size;
+ struct result_chain_entry *result_tmp = &q_item->result_chain;
+ struct result_chain_entry *old_result_tmp;
+ PVFS_size bytes_processed = 0;
+ void *tmp_buffer;
+ void *tmp_user_ptr;
+ int ret;
+ struct server_configuration_s *user_opts = get_server_config_struct();
+ struct filesystem_configuration_s *fs_config;
+ struct server_configuration_s *server_config;
+ PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
+
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%d\n", __func__,
+ q_item->buffer_used);
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer[0]=%s\n", __func__,
+ (char *)q_item->buffer);
+
+#if 0
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: result_tmp->buffer_offset=%x\n",
+ __func__, result_tmp->buffer_offset);
+
+ if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+ {
+ /* This is the last write operation for this flow. Set sync
+ * flag if needed
+ */
+ fs_config = PINT_config_get_filesystems(&server_config);
+ }
+#endif
+ fs_config = PINT_config_get_filesystems(&server_config);
+
+ ret = job_trove_bstream_write_list(
+ q_item->parent->dest.u.trove.coll_id,
+ q_item->parent->dest.u.trove.handle,
+ (char**)&q_item->buffer,
+ (TROVE_size *)&q_item->buffer_used,
+ 1,
+ s_op->u.flow_write.offsets,
+ s_op->u.flow_write.sizes,
+ s_op->u.flow_write.segs,
+ &q_item->out_size,
+ fs_config->trove_sync_data,
+ NULL,
+ smcb,
+ 0,
+ js_p,
+ &result_tmp->posted_id,
+ server_job_context,
+ q_item->parent->hints);
+
+ if(ret < 0)
+ {
+ gossip_err("%s: I/O error occurred\n", __func__);
+ //handle_io_error(ret, q_item, flow_data);
+ /* FIXME ***************/
+ js_p->error_code = -PVFS_ENOMEM;
+ return SM_ACTION_COMPLETE;
+ }
+
+ if(ret == 1)
+ {
+ /* immediate completion; trigger callback ourselves */
+ js_p->error_code = 0;
+ return SM_ACTION_COMPLETE;
+ }
+
+ /* FIXME: not sure if this is required */
+ if(ret == 0) {
+
+ return SM_ACTION_DEFERRED;
+ }
+
+ return SM_ACTION_COMPLETE;
+}
+
+static int check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+ struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+ struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
+ struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+ struct result_chain_entry *result_tmp = &q_item->result_chain;
+ struct result_chain_entry *old_result_tmp;
+ PINT_segpool_handle_t h = s_op->u.flow_write.seg_handle;
+ js_p->error_code = 0;
+
+ flow_data->total_bytes_processed += q_item->buffer_used;
+ q_item->parent->total_transferred += q_item->buffer_used;
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: total_bytes_processed=%d\n", __func__,
+ flow_data->total_bytes_processed);
+
+ //if(!PINT_REQUEST_DONE(q_item->parent->file_req_state)) {
+ if(!segpool_done(h)) {
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: write: LOOP\n", __func__);
+ js_p->error_code = LOOP;
+ }
+ else {
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: write: DONE\n", __func__);
+ flow_data->parent->state = FLOW_COMPLETE;
+ FLOW_CLEANUP(flow_data);
+ }
+
+ return SM_ACTION_COMPLETE;
+}
+
+static int epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+ js_p->error_code = 0;
+
+ gossip_debug(GOSSIP_IO_DEBUG, "%s: write: \n", __func__);
+ return SM_ACTION_COMPLETE;
+}
+
+/*
+ * Local variables:
+ * mode: c
+ * c-indent-level: 4
+ * c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */
More information about the Pvfs2-cvs
mailing list