[Pvfs2-cvs] commit by sson in pvfs2/src/server: write.sm

CVS commit program cvs at parl.clemson.edu
Tue Apr 14 16:20:43 EDT 2009


Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv13617/src/server

Added Files:
      Tag: as-branch
	write.sm 
Log Message:
This is a state machine (sm) for write operations.


--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ write.sm	2009-04-14 16:20:43.000000000 -0400
@@ -0,0 +1,301 @@
+ /* 
+ * (C) 2001 Clemson University and The University of Chicago 
+ *
+ * See COPYING in top-level directory.
+ */
+
+/*
+ *  PVFS2 server state machine for driving write I/O operations.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "pvfs2-request.h"
+#include "pint-distribution.h"
+#include "pint-request.h"
+#include "pvfs2-internal.h"
+#include "rw-sm.h"
+#include "trove.h"
+#include "pint-segpool.h"
+
+%%
+
+nested machine pvfs2_write_sm       /* write loop: bmi_recv -> trove_write -> check */
+{
+    state bmi_recv                  /* first state listed; runs bmi_recv_call */
+    {
+        run bmi_recv_call;
+        default => trove_write;     /* only transition: taken for every error_code */
+    }
+
+    state trove_write               /* runs trove_write_call on the received buffer */
+    {
+        run trove_write_call;
+        default => check;           /* only transition: taken for every error_code */
+    }
+
+    state check                     /* check_done leaves error_code at 0 or LOOP */
+    {
+        run check_done;
+	success => finished;        /* check_done left error_code at 0 */
+        LOOP => bmi_recv;           /* check_done set LOOP: segpool not done yet */
+    }
+
+    state finished                  /* runs epilog, which only logs */
+    {
+        run epilog;
+        default => return;          /* leave this nested machine */
+    }
+}
+
+%%
+
+/*
+ * bmi_recv_call()
+ *
+ * State action for "bmi_recv":
+ * PINT_segpool_take_segments() -> job_bmi_recv()
+ *
+ * Takes the next set of segments from the segment pool and posts a BMI
+ * receive into the queue item's buffer.
+ *
+ * smcb - state machine control block; its current frame is the
+ *        PINT_server_op that carries the flow_write state
+ * js_p - job status; error_code is set to -PVFS_EIO when job_bmi_recv()
+ *        fails and to 0 on every other path
+ *
+ * Side effects: allocates q_item->buffer while it is still NULL, sets
+ * q_item->buffer_used, and stores the taken segments (offsets, sizes,
+ * segs) in s_op->u.flow_write.
+ *
+ * Returns SM_ACTION_DEFERRED when job_bmi_recv() returned 0 (receive
+ * posted, not yet complete) and SM_ACTION_COMPLETE otherwise.
+ */
+static PINT_sm_action bmi_recv_call(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
+    PINT_segpool_handle_t seg_handle = s_op->u.flow_write.seg_handle;
+    PINT_segpool_unit_id id = s_op->u.flow_write.id;
+    struct server_configuration_s *user_opts = get_server_config_struct();
+    job_id_t tmp_id;
+    int ret;
+    int count;
+    PVFS_offset *offsets;
+    PVFS_size *sizes;
+    PVFS_size bytes;
+
+    q_item->buffer_used = 0;
+
+    /* FIXME: an error already present in js_p->error_code or in the
+     * parent flow is not checked before another receive is posted */
+
+    if(!q_item->buffer)
+    {
+        /* if the q_item has not been used, allocate a buffer */
+        q_item->buffer = BMI_memalloc(
+            q_item->parent->src.u.bmi.address,
+            q_item->parent->buffer_size, BMI_RECV);
+        /* TODO: error handling */
+        assert(q_item->buffer);
+    }
+
+    /* request at most one buffer's worth; bytes, count, offsets and sizes
+     * are passed by address and used below as the result of the take */
+    bytes = q_item->parent->buffer_size;
+    PINT_segpool_take_segments(seg_handle, id, &bytes, &count, &offsets, &sizes);
+    /* PVFS_size is printed through lld(); %d has the wrong width for it */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: 3: count=%d, bytes=%lld\n", __func__,
+                 count, lld(bytes));
+    if(count == 0)
+    {
+        /* Nothing to receive.
+         * NOTE(review): buffer_used stays 0 and the segment fields in
+         * s_op->u.flow_write keep their previous values, yet the machine's
+         * only transition from this state leads to trove_write -- confirm
+         * that writing with those values is harmless. */
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    q_item->buffer_used = bytes;
+    s_op->u.flow_write.offsets = offsets;
+    s_op->u.flow_write.sizes = sizes;
+    s_op->u.flow_write.segs = count;
+
+    /* TODO: what if we recv less than expected? */
+    ret = job_bmi_recv(q_item->parent->src.u.bmi.address,
+                       (char *)q_item->buffer,
+                       q_item->parent->buffer_size,
+                       q_item->parent->tag,
+                       BMI_PRE_ALLOC,
+                       smcb,
+                       0, /* unsigned long status_user_tag = 0 */
+                       js_p,
+                       &tmp_id,
+                       server_job_context,
+                       user_opts->server_job_flow_timeout,
+                       (bmi_hint)q_item->parent->hints);
+
+    if(ret < 0)
+    {
+        gossip_err("%s: I/O error occurred\n", __func__);
+        /* FIXME: need to handle I/O error */
+        js_p->error_code = -PVFS_EIO;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* 0: the receive is pending and the state is deferred; any positive
+     * value is treated as immediate completion */
+    js_p->error_code = 0;
+    return (ret == 0) ? SM_ACTION_DEFERRED : SM_ACTION_COMPLETE;
+}
+
+/*
+ * trove_write_call()
+ *
+ * State action for "trove_write": posts a Trove bstream list write of
+ * q_item->buffer (q_item->buffer_used bytes, one memory region) to the
+ * flow's destination collection/handle, using the stream offsets, sizes
+ * and segment count held in s_op->u.flow_write.
+ *
+ * smcb - state machine control block; its current frame is the
+ *        PINT_server_op that carries the flow_write state
+ * js_p - job status; error_code is set to -PVFS_ENOMEM when posting the
+ *        write fails and to 0 when the job call returns 1
+ *
+ * Returns SM_ACTION_DEFERRED when job_trove_bstream_write_list() returned
+ * 0 (write posted, not yet complete) and SM_ACTION_COMPLETE otherwise.
+ */
+static PINT_sm_action trove_write_call(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
+    struct result_chain_entry *result_tmp = &q_item->result_chain;
+    struct server_configuration_s *user_opts = get_server_config_struct();
+    struct filesystem_configuration_s *fs_config;
+    int ret;
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer_used=%lld\n", __func__,
+                 lld(q_item->buffer_used));
+    /* the buffer holds raw received data that need not be NUL-terminated,
+     * so only its address is logged, never its contents as a string */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: q_item->buffer=%p\n", __func__,
+                 (void *)q_item->buffer);
+
+    /* look up the file system this collection belongs to so that its
+     * data-sync setting can be applied to the write */
+    fs_config = PINT_config_find_fs_id(
+        user_opts, q_item->parent->dest.u.trove.coll_id);
+    if(!fs_config)
+    {
+        /* without a configuration entry, err on the side of syncing */
+        gossip_err("%s: no file system configuration found; "
+                   "falling back to synchronous write\n", __func__);
+    }
+
+    ret = job_trove_bstream_write_list(
+        q_item->parent->dest.u.trove.coll_id,
+        q_item->parent->dest.u.trove.handle,
+        (char **)&q_item->buffer,
+        (TROVE_size *)&q_item->buffer_used,
+        1,
+        s_op->u.flow_write.offsets,
+        s_op->u.flow_write.sizes,
+        s_op->u.flow_write.segs,
+        &q_item->out_size,
+        fs_config ? fs_config->trove_sync_data : TROVE_SYNC,
+        NULL,
+        smcb,
+        0,
+        js_p,
+        &result_tmp->posted_id,
+        server_job_context,
+        q_item->parent->hints);
+
+    if(ret < 0)
+    {
+        gossip_err("%s: I/O error occurred\n", __func__);
+        /* FIXME: -PVFS_ENOMEM is a placeholder; the code returned by the
+         * job call (ret) is not reported, and the I/O error itself is
+         * not handled */
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(ret == 0)
+    {
+        /* write posted; completion arrives later */
+        return SM_ACTION_DEFERRED;
+    }
+
+    if(ret == 1)
+    {
+        /* immediate completion */
+        js_p->error_code = 0;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * check_done()
+ *
+ * State action for "check": adds q_item->buffer_used to the flow's byte
+ * counters and decides whether another receive/write round is needed.
+ *
+ * smcb - state machine control block; its current frame is the
+ *        PINT_server_op that carries the flow_write state
+ * js_p - job status; error_code is set to LOOP while segpool_done()
+ *        reports false, and to 0 once it reports true
+ *
+ * When the segment pool is done the flow is marked FLOW_COMPLETE and
+ * FLOW_CLEANUP() is invoked on it.
+ *
+ * Always returns SM_ACTION_COMPLETE.
+ */
+static PINT_sm_action check_done(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct fp_queue_item *q_item = s_op->u.flow_write.q_item;
+    struct fp_private_data *flow_data = PRIVATE_FLOW(q_item->parent);
+    PINT_segpool_handle_t h = s_op->u.flow_write.seg_handle;
+
+    /* NOTE(review): the error code left by the previous state is
+     * overwritten without being examined, so the bytes below are counted
+     * even if the receive or the write failed */
+    js_p->error_code = 0;
+
+    flow_data->total_bytes_processed += q_item->buffer_used;
+    q_item->parent->total_transferred += q_item->buffer_used;
+    /* the counter is printed through lld(); %d has the wrong width */
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_bytes_processed=%lld\n", __func__,
+                 lld(flow_data->total_bytes_processed));
+
+    if(!segpool_done(h))
+    {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: write: LOOP\n", __func__);
+        js_p->error_code = LOOP;
+    }
+    else
+    {
+        gossip_debug(GOSSIP_IO_DEBUG, "%s: write: DONE\n", __func__);
+        flow_data->parent->state = FLOW_COMPLETE;
+        FLOW_CLEANUP(flow_data);
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * epilog()
+ *
+ * State action for "finished": clears js_p->error_code and emits a debug
+ * message; it performs no other work.
+ *
+ * smcb - state machine control block (not used here)
+ * js_p - job status; error_code is set to 0
+ *
+ * Always returns SM_ACTION_COMPLETE.
+ */
+static PINT_sm_action epilog(struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    js_p->error_code = 0;
+
+    gossip_debug(GOSSIP_IO_DEBUG, "%s: write: \n", __func__);
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * Local variables:
+ *  mode: c
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */



More information about the Pvfs2-cvs mailing list