[Pvfs2-cvs] commit by elaine in pvfs2/src/server: create-file.sm create.sm

CVS commit program cvs at parl.clemson.edu
Fri May 30 15:27:08 EDT 2008


Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv17202/src/server

Modified Files:
      Tag: cu-sandbox-branch
	create-file.sm create.sm 
Log Message:



Index: create-file.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/Attic/create-file.sm,v
diff -p -u -r1.1.2.11 -r1.1.2.12
--- create-file.sm	27 May 2008 18:00:53 -0000	1.1.2.11
+++ create-file.sm	30 May 2008 19:27:08 -0000	1.1.2.12
@@ -36,8 +36,8 @@ extern job_context_id pint_client_sm_con
 
 enum
 {
-    LOCAL_ATTRS = 2,
-    REMOTE_ATTRS = 3
+    LOCAL_OPERATION = 2,
+    REMOTE_OPERATION = 3
 };
 
 enum
@@ -45,16 +45,6 @@ enum
     CREATE_RETRY = 170
 };
 
-/* completion function prototypes */
-#if 0
-static int create_datafiles_comp_fn(
-    void *v_p, struct PVFS_server_resp *resp_p, int index);
-static int create_crdirent_comp_fn(
-    void *v_p, struct PVFS_server_resp *resp_p, int index);
-#endif
-static int create_delete_handles_comp_fn(
-    void *v_p, struct PVFS_server_resp *resp_p, int index);
-
 %%
 
 /* this machine launches the main machine below */
@@ -106,72 +96,26 @@ machine pvfs2_create_file_work_sm
 {
     state init
     {
-        run create_init;
-        default => locate_attribs;
-    }
-
-    state locate_attribs
-    {
-        run create_find_attrs;
-        LOCAL_ATTRS => parent_getattr_local_prelude;
-        REMOTE_ATTRS => parent_getattr_remote;
-        default => parent_getattr_remote;
+        run create_file_init;
+        default => parent_getattr;
     }
 
-    state parent_getattr_local_prelude
-    {
-        jump pvfs2_server_prelude_sm;
-        success => parent_getattr_local;
-        default => final_response;
-    }
-        
-    /* Retrieve the parent's attribs locally */
-    state parent_getattr_local
+    state parent_getattr
     {
         jump pvfs2_get_attr_work_sm;
-        success => parent_getattr_local_cleanup;
-/* TODO: error handling */
-        default => parent_getattr_remote;
-    }
-
-    /* Clean up the stack from local getattr */
-    state parent_getattr_local_cleanup
-    {
-        run create_parent_getattr_local_cleanup;
-        default => parent_getattr_inspect;
-    }
-
-    /* send request to Meta server with parent's attribs */
-    state parent_getattr_remote
-    {
-        /* this is a client SM for doing a getattr from a server */
-        jump pvfs2_server_getattr_sm;
-        success => parent_getattr_inspect;
+        success => parent_getattr_cleanup_and_inspect;
         default => cleanup_work;
     }
 
-    state parent_getattr_inspect
+    state parent_getattr_cleanup_and_inspect
     {
-        run create_parent_getattr_inspect;
-        success => dspace_create_setup;
+        run create_file_parent_getattr_cleanup_and_inspect;
+        LOCAL_OPERATION => do_dspace_create;
+        REMOTE_OPERATION => dspace_xfer_msgpair_array;
         default => cleanup_work;
     }
 
-    state dspace_create_setup
-    {
-        run create_dspace_setup;
-        success => dspace_create_prelude;
-        default => cleanup_work;
-    }
-
-    state dspace_create_prelude
-    {
-        jump pvfs2_server_prelude_sm;
-        success => dspace_create;
-        default => cleanup_work;
-    }
-
-    state dspace_create
+    state do_dspace_create
     {
         jump pvfs2_create_work_sm;
         success => dspace_create_cleanup;
@@ -180,140 +124,96 @@ machine pvfs2_create_file_work_sm
 
     state dspace_create_cleanup
     {
-        run create_dspace_cleanup;
-        success => datafiles_create_local_setup;
-        default => cleanup_work;
-    }
-
-
-    /* set up to create Metadata for new file */
-/*
-    state dspace_create
-    {
-        run create_dspace_create;
-//        success => datafiles_setup_msgpair_array;
-        success => datafiles_create_local_setup;
-        default => cleanup_work;
-    }
-*/
-
-    state datafiles_create_local_setup
-    {
-        run create_datafiles_setup;
-        success => datafiles_create_prelude;
-        default => cleanup_work;
-    }
-
-    state datafiles_create_prelude
-    {
-        jump pvfs2_server_prelude_sm;
-        success => datafiles_create_local;
+        run create_file_dspace_cleanup;
+        success => datafiles_setup;
         default => cleanup_work;
     }
 
-    state datafiles_create_local
+    state dspace_xfer_msgpair_array
     {
-        jump pvfs2_create_work_sm;
-        success => datafiles_create_cleanup;
+        jump pvfs2_msgpairarray_sm;
+        success => datafiles_setup;
         default => cleanup_work;
     }
 
-    state datafiles_create_cleanup
+    state datafiles_setup
     {
-        run create_datafiles_cleanup;
-        success => setattr_setup;
+        run create_file_datafiles_setup;
+        REMOTE_OPERATION => datafiles_remote_setup;
+        LOCAL_OPERATION => datafiles_local_setup;
         default => cleanup_work;
     }
 
-    /* set up to create N datafiles on various servers */
-    /* some of these might be local and should be handled locally */
-    /* this is where the tree-based stuff will go */
-/*
-    state datafiles_setup_msgpair_array
+    state datafiles_remote_setup
     {
-        run create_datafiles_setup_msgpair_array;
-        success => datafiles_xfer_msgpair_array;
-        default => cleanup_work;
+        run create_file_datafiles_remote_setup;
+        REMOTE_OPERATION => datafiles_xfer_msgpair_array;
+        default => datafiles_local_setup;
     }
-*/
 
-    /* execute messages */
-/*
     state datafiles_xfer_msgpair_array
     {
         jump pvfs2_msgpairarray_sm;
-        success => setattr_setup;
-        default => datafiles_failure;
+        success => datafiles_local_setup;
+        default => cleanup_work;
     }
 
-    state datafiles_failure
+    state datafiles_local_setup
     {
-        run create_datafiles_failure;
-        default => delete_handles_setup_msgpair_array;
+        run create_file_datafiles_local_setup;
+        default => do_datafiles_local;
     }
-*/
 
-    /* write datafile handles to metadata. This is local.*/
-    state setattr_setup
+    /* TODO: Needs to handle multiple local creates. */
+    state do_datafiles_local
     {
-        run create_setattr_setup;
-        success => setattr_prelude;
+        jump pvfs2_create_work_sm;
+        success => datafiles_local_cleanup_and_setattr_setup;
         default => cleanup_work;
     }
 
-    state setattr_prelude
+    state datafiles_local_cleanup_and_setattr_setup
     {
-        jump pvfs2_server_prelude_sm;
-        success => do_setattr;
+        run create_file_datafiles_local_cleanup_and_setattr_setup;
+        default => do_setattr;
+/*
+        LOCAL_OPERATION => do_setattr;
+        REMOTE_OPERATION => setattr_xfer_msgpair_array;
         default => cleanup_work;
+*/
     }
 
     state do_setattr
     {
         jump pvfs2_set_attr_work_sm;
-        success => setattr_cleanup;
-        default => cleanup_work;
+        default => setattr_cleanup;
     }
 
     state setattr_cleanup
     {
-        run create_setattr_cleanup;
+        run create_file_setattr_cleanup;
         success => crdirent_setup;
         default => cleanup_work;
     }
 
-    /* set up to create dir entry - could be local */
-/*
-    state crdirent_setup_msgpair
-    {
-        run create_crdirent_setup_msgpair;
-        success => crdirent_xfer_msgpair;
-        default => crdirent_failure;
-    }
-*/
-
-    /* execute messages */
-/*
-    state crdirent_xfer_msgpair
+    state setattr_xfer_msgpair_array
     {
         jump pvfs2_msgpairarray_sm;
-        success => cleanup_work;
-        default => crdirent_failure;
+        success => crdirent_setup;
+        default => cleanup_work;
     }
-*/
 
     state crdirent_setup
     {
-        run create_crdirent_setup;
-        success => crdirent_prelude;
+        run create_file_crdirent_setup;
+        success => build_obj_attr;
         default => cleanup_work;
     }
 
-    state crdirent_prelude
+    state build_obj_attr
     {
-        jump pvfs2_server_prelude_sm;
-        success => do_crdirent;
-        default => cleanup_work;
+        run build_obj_attr;
+        default => do_crdirent;
     }
 
     state do_crdirent
@@ -326,28 +226,6 @@ machine pvfs2_create_file_work_sm
     state crdirent_cleanup
     {
         run create_crdirent_cleanup;
-        success => cleanup_work;
-        default => crdirent_failure;
-    }
-
-    state crdirent_failure
-    {
-        run create_crdirent_failure;
-        default => delete_handles_setup_msgpair_array;
-    }
-
-    /* set up to send message to delete handles in event of failure */
-    state delete_handles_setup_msgpair_array
-    {
-        run create_delete_handles_setup_msgpair_array;
-        success => delete_handles_xfer_msgpair_array;
-        default => cleanup_work;
-    }
-
-    /* execute messages */
-    state delete_handles_xfer_msgpair_array
-    {
-        jump pvfs2_msgpairarray_sm;
         default => cleanup_work;
     }
 
@@ -436,11 +314,8 @@ static PINT_sm_action create_file_resp(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op;
-    int task_id;
-    int error_code;
 
-    PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
-    s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    s_op = PINT_sm_frame(smcb, PINT_FRAME_PREVIOUS);
     if (js_p->error_code == 0)
     {
         gossip_debug(GOSSIP_SERVER_DEBUG, "Handle created: %llu\n",
@@ -484,7 +359,11 @@ static PINT_sm_action create_file_cleanu
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int task_id;
+    int error_code;
+
     assert(sm_p);
+    PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
     free(sm_p);
     return (SM_ACTION_COMPLETE);
 }
@@ -492,13 +371,14 @@ static PINT_sm_action create_file_cleanu
 
 /****************************************************************/
 
-static PINT_sm_action create_init(
+static PINT_sm_action create_file_init(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     job_id_t tmp_id;
+    struct PINT_server_op *s_op;
 
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: init\n");
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "create file state: init\n");
 
     assert((js_p->error_code == 0) ||
            (js_p->error_code == CREATE_RETRY));
@@ -512,192 +392,216 @@ static PINT_sm_action create_init(
             server_job_context);
     }
 
-    PINT_SM_GETATTR_STATE_FILL(
-        sm_p->getattr,
-        sm_p->object_ref,
-        PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT, 
-        PVFS_TYPE_DIRECTORY,
-        0);
-
-   return SM_ACTION_COMPLETE;
-}
-
-#if 0
-static int create_datafiles_comp_fn(void *v_p,
-                                    struct PVFS_server_resp *resp_p,
-                                    int index)
-{
-    PINT_smcb *smcb = v_p;
-    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "create_datafiles_comp_fn[%d]\n",index);
-
-    assert(resp_p->op == PVFS_SERV_CREATE);
+    /* Prepare to retrieve the parent's attributes. */
+    s_op = malloc(sizeof(struct PINT_server_op));
 
-    if (resp_p->status != 0)
+    if(!s_op)
     {
-        gossip_err("%s: Failed to create data handle at server: %s "
-                   "for file: %s\n", 
-                   __func__, 
-                   BMI_addr_rev_lookup(
-                       sm_p->u.create_file.data_server_addrs[index]),
-                       sm_p->u.create_file.object_name);
-        PVFS_perror_gossip("Creation failure", resp_p->status);
-	return resp_p->status;
+        return -PVFS_ENOMEM;
     }
+    /* zero out all members */
+    memset(s_op, 0, sizeof(struct PINT_server_op));
 
-    /* allocate memory for the data handles if we haven't already */
-    if (sm_p->u.create_file.datafile_handles == NULL)
-    {
-        sm_p->u.create_file.datafile_handles = (PVFS_handle *)malloc(
-            sm_p->u.create_file.num_data_files * sizeof(PVFS_handle));
-
-        if (sm_p->u.create_file.datafile_handles == NULL)
-        {
-            gossip_err("create: Failed to allocate data handle array\n");
-            return -PVFS_ENOMEM;
-        }
-        memset(sm_p->u.create_file.datafile_handles, 0,
-               sm_p->u.create_file.num_data_files * sizeof(PVFS_handle));
-    }
+    s_op->op = PVFS_SERV_GETATTR;
+    s_op->u.getattr.handle = sm_p->parent_ref.handle;
+    s_op->u.getattr.fs_id = sm_p->parent_ref.fs_id;
+    s_op->u.getattr.attrmask = PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT;
+
+/*  TODO: Rather than faking s_op->attr.mask and s_op->attr.objtype, 
+          these need to be saved in sm_p. The prelude for the original
+          create-file call retrieved them */
+    s_op->attr.mask = PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT;
+    s_op->attr.objtype = PVFS_TYPE_DIRECTORY;
 
-    /* otherwise, just stash the newly created data file handle */
-    sm_p->u.create_file.datafile_handles[index] = resp_p->u.create_file.handle;
+    PINT_sm_push_frame(smcb, 0, s_op);
 
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "Datafile handle %d is %llu\n",
-                 index, llu(sm_p->u.create_file.datafile_handles[index]));
-    return 0;
+    return SM_ACTION_COMPLETE;
 }
-#endif
 
-#if 0
-static int create_crdirent_comp_fn(void *v_p,
-                                   struct PVFS_server_resp *resp_p,
-                                   int index)
+static PINT_sm_action create_file_parent_getattr_cleanup_and_inspect(
+        struct PINT_smcb *smcb, job_status_s *js_p)
 {
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create_crdirent_comp_fn\n");
+    int task_id;
+    int error_code;
+    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_PREVIOUS);
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PVFS_object_attr *attr = NULL;
+    int num_dfiles_requested_override = 0;
+    PINT_dist *current_dist; 
+    int ret = 0;
 
-    assert(resp_p->op == PVFS_SERV_CRDIRENT);
-    return resp_p->status;
-}
-#endif
+    gossip_debug(GOSSIP_CLIENT_DEBUG,
+        "create file state: parent_getattr_cleanup_and_inspect\n");
 
-static int create_delete_handles_comp_fn(void *v_p,
-                                         struct PVFS_server_resp *resp_p,
-                                         int index)
-{
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create_delete_handles_comp_fn\n");
+    assert(sm_p);
+    assert(s_op);
 
-    assert(resp_p->op == PVFS_SERV_REMOVE);
+    /* Copy the results before popping the stack. */
+    attr = &sm_p->getattr.attr;
+    assert(attr);
+    memcpy(attr, &s_op->attr, sizeof(attr));
 
-    if (resp_p->status != 0)
-    {
-        gossip_debug(GOSSIP_CLIENT_DEBUG,
-                     "Failed to remove handle number %d\n", index);
-    }
-    return resp_p->status;
-}
+    PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
+//    free(s_op);
 
-/* check to be sure everything referenced off off sm_p is either set */
-/* in the main machine, or generated in one of these state actions */
+    js_p->error_code = error_code;
 
-#if 0
-/* this is the metafile create - should be all local */
-static PINT_sm_action create_dspace_create(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    int ret = -PVFS_EINVAL;
-    job_id_t i;
+    if (error_code == 0) {
+        /** Look at the attributes of the parent directory and
+         *  decide if it impacts the file creation in any way.
+         */
+        gossip_debug(GOSSIP_CLIENT_DEBUG, "parent owner: %d, group: %d, perms: %d\n",
+            (int)attr->owner, (int)attr->group, (int)attr->perms);
+
+        /* do we have a setgid bit? */
+        if(attr->perms & PVFS_G_SGID)
+        {
+            gossip_debug(GOSSIP_CLIENT_DEBUG, "parent has setgid bit set.\n");
+            gossip_debug(GOSSIP_CLIENT_DEBUG, " - modifying requested attr "
+                                              "for new file.\n");
+            sm_p->u.create_file.attr.group = attr->group;
+            /* note that permission checking is left to server even in this case */
+        }
+        gossip_debug(GOSSIP_CLIENT_DEBUG, "create_parent_getattr: [%p] "
+            "dfile_count     = %d "
+            "dist_name_len   = %d "
+            "dist_params_len = %d\n",
+            attr,
+            attr->u.dir.hint.dfile_count,
+            attr->u.dir.hint.dist_name_len,
+            attr->u.dir.hint.dist_params_len);
+
+        num_dfiles_requested_override = attr->u.dir.hint.dfile_count;
+        /* override the # of data files for this create */
+        if (num_dfiles_requested_override > 0)
+        {
+            /* Determine the number of dfiles */
+            PINT_cached_config_get_num_dfiles(sm_p->object_ref.fs_id,
+                    sm_p->u.create_file.dist,
+                    num_dfiles_requested_override,
+                    &sm_p->u.create_file.num_data_files);
+        }
+        gossip_debug(GOSSIP_CLIENT_DEBUG, "Setting number of datafiles to %d [requested %d]\n", 
+            sm_p->u.create_file.num_data_files, num_dfiles_requested_override);
+        current_dist = sm_p->u.create_file.dist;
+        /* We have an overriding distribution name for this directory.. honor that */
+        if (attr->u.dir.hint.dist_name_len > 0)
+        {
+            /* switch it only if it is different! */
+            if (strcmp(attr->u.dir.hint.dist_name, current_dist->dist_name))
+            {
+                PINT_dist *new_dist = NULL;
+                new_dist = PINT_dist_create(attr->u.dir.hint.dist_name);
+                if (new_dist)
+                {
+                    gossip_debug(GOSSIP_CLIENT_DEBUG, "Overridding distribution name to %s instead of %s\n",
+                        attr->u.dir.hint.dist_name,
+                        current_dist->dist_name);
+                    PINT_dist_free(current_dist);
+                    sm_p->u.create_file.dist = new_dist;
+                    current_dist = new_dist;
+                }
+                else
+                {
+                    gossip_debug(GOSSIP_CLIENT_DEBUG, "Could not override distribution name with %s instead of %s\n",
+                        attr->u.dir.hint.dist_name,
+                        current_dist->dist_name);
+                }
+            }
+            else {
+                gossip_debug(GOSSIP_CLIENT_DEBUG, "retaining current distribution name %s\n",
+                    current_dist->dist_name);
+            }
+        }
 
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: dspace_create\n");
+        /* okay, we might need to override some dist params as well */
+        if (attr->u.dir.hint.dist_params_len > 0)
+        {
+            /* We have a series of comma separated key:val strings */
+            char **key, **val;
+            int64_t tmp_val;
+            int nparams = 0;
+
+            /* ignore parse errors! */
+            if (PINT_split_keyvals(attr->u.dir.hint.dist_params,
+                &nparams, &key, &val) == 0)
+            {
+                int i;
+                for (i = 0; i < nparams; i++)
+                {
+                    gossip_debug(GOSSIP_CLIENT_DEBUG, "distribution parameter %s, value %s\n",
+                        key[i], val[i]);
+                    /* NOTE: just as in server-config.c when parsing "Param" and
+                     * "Value" fields, we will assume that all values are 64 bit
+                     * integers.  The only difference here is that we scan
+                     * directly into a 64 bit integer, rather than converting
+                     * from the int format that dotconf supports.
+                     */
+                    ret = sscanf(val[i], SCANF_lld, &tmp_val);
+                    if(ret != 1)
+                    {
+                        gossip_err("Error: unsupported type for distribution parameter %s, value %s found in directory hints.\n", 
+                            key[i], val[i]);
+                        gossip_err("Error: continuing anyway.\n");
+                    }
+                    else
+                    {
+                        if(current_dist->methods->set_param(current_dist->dist_name,
+                            current_dist->params,
+                            key[i],
+                            &tmp_val))
+                        {
 
-    js_p->error_code = 0;
+                            gossip_err("Error: could not override hinted distribution parameter %s, value %s found in directory hints\n",
+                                key[i],
+                                val[i]);
+                        }
+                     }
+                     free(key[i]);
+                     free(val[i]);
+                }
+                free(key);
+                free(val);
+            }
+        }
 
-    if (sm_p->u.create_file.num_data_files > PVFS_REQ_LIMIT_DFILE_COUNT)
-    {
-        sm_p->u.create_file.num_data_files = PVFS_REQ_LIMIT_DFILE_COUNT;
-        gossip_err("Warning: reducing number of data "
-                     "files to PVFS_REQ_LIMIT_DFILE_COUNT\n");
-    }
+        /* TODO: Determine whether the dspace should be created locally
+         * or remotely and set up for the appropriate place. For now,
+         * just assume local. */
+       
+        /* Set up the frame for the nested create state machine */
+        js_p->error_code = LOCAL_OPERATION;
+        s_op = malloc(sizeof(struct PINT_server_op));
 
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "need to create %d datafiles\n",
-                 sm_p->u.create_file.num_data_files);
+        if(!s_op)
+        {
+            return -PVFS_ENOMEM;
+        }
+        /* zero out all members */
+        memset(s_op, 0, sizeof(struct PINT_server_op));
+        s_op->op = PVFS_SERV_CREATE;
+        s_op->u.create.fs_id = sm_p->object_ref.fs_id;
+        s_op->u.create.handle_extent_array = sm_p->u.create_file.meta_handle_extent_array;
+        s_op->u.create.object_type = PVFS_TYPE_METAFILE;
 
-    ret = job_trove_dspace_create(
-        sm_p->parent_ref.fs_id,
-        &sm_p->u.create_file.meta_handle_extent_array,
-        PVFS_TYPE_METAFILE,
-/*
-        sm_p->u.create_file.attr.objtype,
-*/
-        NULL,
-        TROVE_SYNC ,
-        smcb,
-        0,
-        js_p,
-        &i,
-        server_job_context);
 
-    if (js_p->error_code != 0)
-    {
-	return js_p->error_code;
+        PINT_sm_push_frame(smcb, 0, s_op);
+ 
     }
-
-    /* otherwise, just stash the newly created meta handle */
-    sm_p->u.create_file.metafile_handle = js_p->handle;
-
-    gossip_debug(
-        GOSSIP_CLIENT_DEBUG, "*** Got newly created handle %llu\n",
-        llu(sm_p->u.create_file.metafile_handle));
-
-    return SM_ACTION_COMPLETE;
-}
-#endif
-
-static PINT_sm_action create_dspace_setup(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct PINT_server_op *s_op;
     
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: dspace_create_setup\n");
-        
-    js_p->error_code = 0;
-
-    /* Set up the frame for the nested create state machine */
-    s_op = malloc(sizeof(struct PINT_server_op));
-        
-    if(!s_op)
-    {
-        return -PVFS_ENOMEM;
-    }
-    /* zero out all members */
-    memset(s_op, 0, sizeof(struct PINT_server_op));
-    s_op->op = PVFS_SERV_CREATE;
-    s_op->u.create.fs_id = sm_p->object_ref.fs_id;
-/*    s_op->u.create.handle = sm_p->u.create_file.meta_handle_extent_array.extent_array[0].first; */
-    s_op->u.create.handle_extent_array = sm_p->u.create_file.meta_handle_extent_array;
-    s_op->u.create.object_type = PVFS_TYPE_METAFILE;
-
-    s_op->target_fs_id = sm_p->object_ref.fs_id;
-/*    s_op->target_handle = sm_p->u.create_file.meta_handle_extent_array.extent_array[0].first; */
-    s_op->credentials.uid = sm_p->cred_p->uid;
-    s_op->credentials.gid = sm_p->cred_p->gid;
-        
-    PINT_sm_push_frame(smcb, 0, s_op);
-    return SM_ACTION_COMPLETE;
+    return(SM_ACTION_COMPLETE);
 }
 
-static PINT_sm_action create_dspace_cleanup(
+static PINT_sm_action create_file_dspace_cleanup(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, 1);
+    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_PREVIOUS);
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     int task_id;
     int error_code;
 
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "create file state: dspace_cleanup\n");
+
     /* stash the newly created meta file handle */
     sm_p->u.create_file.metafile_handle = s_op->resp.u.create.handle;
 
@@ -705,22 +609,20 @@ static PINT_sm_action create_dspace_clea
                  llu(sm_p->u.create_file.metafile_handle));
 
     PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
+//    free(s_op);
     js_p->error_code = error_code;
  
     return SM_ACTION_COMPLETE;
 }
 
-
-static PINT_sm_action create_datafiles_setup(
+static PINT_sm_action create_file_datafiles_setup(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct server_configuration_s *server_config = NULL;
-    PVFS_handle_extent_array *io_handle_extent_array;
     int ret = -PVFS_EINVAL;
-    struct PINT_server_op *s_op;
     
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: datafiles_create_setup\n");
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "create file state: datafiles_setup\n");
         
     js_p->error_code = 0;
 
@@ -736,10 +638,10 @@ static PINT_sm_action create_datafiles_s
     
     /* allocate handle extent array objects */
 /* TODO: need to free this */
-    io_handle_extent_array = (PVFS_handle_extent_array *)
+    sm_p->u.create_file.io_handle_extent_array = (PVFS_handle_extent_array *)
         malloc(sm_p->u.create_file.num_data_files *
                sizeof(PVFS_handle_extent_array));
-    if (!io_handle_extent_array)
+    if (!sm_p->u.create_file.io_handle_extent_array)
     {
         gossip_err("create: failed to allocate handle_extent_array\n"); 
         js_p->error_code = -PVFS_ENOMEM;
@@ -767,7 +669,7 @@ static PINT_sm_action create_datafiles_s
         &sm_p->u.create_file.num_data_files,
         &sm_p->u.create_file.layout,
         sm_p->u.create_file.data_server_addrs,
-        io_handle_extent_array);
+        sm_p->u.create_file.io_handle_extent_array);
     if(ret < 0)
     {
         gossip_err("create: failed to map the layout to a set of IO servers\n");
@@ -782,9 +684,48 @@ static PINT_sm_action create_datafiles_s
         return SM_ACTION_COMPLETE;
     }
 
-/* TODO: make it work for more than 1 datafile */
+    /* TODO: Set up for remote creates. For now skip them. */
+    js_p->error_code = LOCAL_OPERATION;
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action create_file_datafiles_remote_setup(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+
+    /* TODO: Handle setup for any datafiles that are to be created remotely. */
+    js_p->error_code = 0;
+
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action create_file_datafiles_local_setup(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct PINT_server_op *s_op;
+    job_id_t tmp_id;
+    int ret = -PVFS_EINVAL;
+    
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "create file state: datafiles_local_setup\n");
+
+    /* allocate memory for the data handles if we haven't already */
+    if (sm_p->u.create_file.datafile_handles == NULL)
+    {
+        sm_p->u.create_file.datafile_handles = (PVFS_handle *)malloc(
+            sm_p->u.create_file.num_data_files * sizeof(PVFS_handle));
+
+        if (sm_p->u.create_file.datafile_handles == NULL)
+        {
+            gossip_err("create: Failed to allocate data handle array\n");
+            return -PVFS_ENOMEM;
+        }
+        memset(sm_p->u.create_file.datafile_handles, 0,
+               sm_p->u.create_file.num_data_files * sizeof(PVFS_handle));
+    }
+    js_p->error_code = 0;
 
-    /* Set up the frame for the nested create state machine */
+    /* Set up the frame for local creates */
     s_op = malloc(sizeof(struct PINT_server_op));
         
     if(!s_op)
@@ -795,7 +736,7 @@ static PINT_sm_action create_datafiles_s
     memset(s_op, 0, sizeof(struct PINT_server_op));
     s_op->op = PVFS_SERV_CREATE;
     s_op->u.create.fs_id = sm_p->object_ref.fs_id;
-    s_op->u.create.handle_extent_array = io_handle_extent_array[0];
+    s_op->u.create.handle_extent_array = sm_p->u.create_file.io_handle_extent_array[0];
     s_op->u.create.object_type = PVFS_TYPE_DATAFILE;
         
     s_op->target_fs_id = sm_p->object_ref.fs_id;
@@ -804,289 +745,111 @@ static PINT_sm_action create_datafiles_s
     s_op->credentials.gid = sm_p->cred_p->gid;
         
     PINT_sm_push_frame(smcb, 0, s_op);
+
+    /* TODO: Use a centralized method of obtaining ds_attr
+             that can be called from other places as well.  */
+    memset(&(s_op->ds_attr), 0, sizeof(PVFS_ds_attributes));
+    ret = job_trove_dspace_getattr(
+        sm_p->object_ref.fs_id, sm_p->object_ref.handle, smcb, &(s_op->ds_attr),
+        0, js_p, &tmp_id, server_job_context);
+
  
-    /* allocate memory for the data handles if we haven't already */
-    if (sm_p->u.create_file.datafile_handles == NULL)
-    {
-        sm_p->u.create_file.datafile_handles = (PVFS_handle *)malloc(
-            sm_p->u.create_file.num_data_files * sizeof(PVFS_handle));
+    return ret;
+//    return SM_ACTION_COMPLETE;
+}
 
-        if (sm_p->u.create_file.datafile_handles == NULL)
-        {
-            gossip_err("create: Failed to allocate data handle array\n");
-            return -PVFS_ENOMEM;
-        }
-        memset(sm_p->u.create_file.datafile_handles, 0,
-               sm_p->u.create_file.num_data_files * sizeof(PVFS_handle));
-    }
-    return SM_ACTION_COMPLETE;
-}
-
-static PINT_sm_action create_datafiles_cleanup(
+static PINT_sm_action create_file_datafiles_local_cleanup_and_setattr_setup(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, 1);
+    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_PREVIOUS);
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     int task_id;
     int error_code;
+    job_id_t tmp_id;
+    int ret = -PVFS_EINVAL;
+
+    assert(sm_p);
+    assert(s_op);
 
-/* TODO: make it work for more than 1 datafile */
     /* stash the newly created data file handle */
-    sm_p->u.create_file.datafile_handles[0] = s_op->resp.u.create.handle;
+/*    sm_p->u.create_file.datafile_handles[0] = s_op->resp.u.create.handle; */
+    sm_p->u.create_file.datafile_handles[0] = js_p->handle;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG, "Datafile handle %d is %llu\n",
                  0, llu(sm_p->u.create_file.datafile_handles[0]));
 
     PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
     js_p->error_code = error_code;
- 
-    return SM_ACTION_COMPLETE;
-}
-
-#if 0
-/* sets up an array to create N datafiles */
-/* some of these might be local and should be */
-/* handled differently  - this will e re-written */
-/* to do tree-based collective communication */
-static PINT_sm_action create_datafiles_setup_msgpair_array(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    int ret = -PVFS_EINVAL, i = 0;
-    struct server_configuration_s *server_config = NULL;
-    PVFS_handle_extent_array *io_handle_extent_array;
-    struct PINT_server_op *s_op = malloc(sizeof(struct PINT_server_op));
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: "
-                 "datafiles_setup_msgpair_array\n");
-
-    if(!s_op)
-    {
-        return -PVFS_ENOMEM;
-    }
-    /* zero out all members */
-    memset(s_op, 0, sizeof(struct PINT_server_op));
-    s_op->req = malloc(sizeof(struct PVFS_server_req));
-    if(!s_op->req)
-    {
-        return -PVFS_ENOMEM;
-    }
-    PINT_sm_push_frame(smcb, 0, s_op);
-
-/* TODO: Use create_file instead. */
-    s_op->op = PVFS_SERV_CREATE;
-    s_op->req->u.create.fs_id = sm_p->object_ref.fs_id;
-    s_op->req->u.create.object_type = PVFS_TYPE_DATAFILE;
-//    s_op->req->u.create.handle_extent_array =  io_handle_extent_array;
-
-    js_p->error_code = 0;
-
-    /* allocate handle extent array objects */
-/* TODO: need to free io_handle_extent_array */
-    io_handle_extent_array = (PVFS_handle_extent_array *)
-        malloc(sm_p->u.create_file.num_data_files *
-               sizeof(PVFS_handle_extent_array));
-    if (!io_handle_extent_array)
-    {
-        gossip_err("create: failed to allocate handle_extent_array\n"); 
-        js_p->error_code = -PVFS_ENOMEM;
-        return SM_ACTION_COMPLETE;
-    }
-
-    /* allocate data server bmi address array */
-    if (sm_p->u.create_file.data_server_addrs == NULL)
-    {
-        sm_p->u.create_file.data_server_addrs = (PVFS_BMI_addr_t *)malloc(
-            sm_p->u.create_file.num_data_files * sizeof(PVFS_BMI_addr_t));
-    }
-    if (!sm_p->u.create_file.data_server_addrs)
-    {
-        gossip_err("create: failed to allocate data server addrs\n"); 
-        js_p->error_code = -PVFS_ENOMEM;
-        return SM_ACTION_COMPLETE;
-    }
-
-    server_config = get_server_config_struct();
-
-    ret = PINT_cached_config_map_servers(
-        server_config,
-        sm_p->object_ref.fs_id,
-        &sm_p->u.create_file.num_data_files,
-        &sm_p->u.create_file.layout,
-        sm_p->u.create_file.data_server_addrs,
-        io_handle_extent_array);
-    if(ret < 0)
-    {
-        gossip_err("create: failed to map the layout to a set of IO servers\n");
-        js_p->error_code = ret;
-        return 1;
-    }
-
-    if (ret)
-    {
-        gossip_err("Failed to retrieve data server addresses\n");
-        js_p->error_code = ret;
-        return SM_ACTION_COMPLETE;
-    }
-
-    memset(&sm_p->msgpair, 0, sizeof(PINT_sm_msgpair_state));
 
-    /* allocate msgarray and set msgarray_count */
-    if (sm_p->msgarray && (sm_p->msgarray != &(sm_p->msgpair)))
-    {
-        free(sm_p->msgarray);
-    }
-    s_op->msgarray = (PINT_sm_msgpair_state *)malloc(
-        (sm_p->u.create_file.num_data_files * sizeof(PINT_sm_msgpair_state)));
-    if (s_op->msgarray == NULL)
-    {
-        gossip_err("create: failed to allocate msgarray\n");
-        js_p->error_code = -PVFS_ENOMEM;
-        return SM_ACTION_COMPLETE;
-    }
-    s_op->msgarray_count = sm_p->u.create_file.num_data_files;
-
-    /* for each datafile, prepare to post a create send/recv pair */
-    for(i = 0; i < sm_p->u.create_file.num_data_files; i++)
-    {
-        PINT_sm_msgpair_state *msg_p = &s_op->msgarray[i];
-
-        PINT_SERVREQ_CREATE_FILL(
-            msg_p->req,
-            *sm_p->cred_p,
-            sm_p->object_ref.fs_id,
-            PVFS_TYPE_DATAFILE,     /* this creates a datafile */
-            io_handle_extent_array[i]);
-//            s_op->req->u.create.handle_extent_array.extent_array[i]);
-
-        gossip_debug(GOSSIP_CLIENT_DEBUG,  "posting datafile[%d] create "
-                     "with extents %llu-%llu\n", i,
-                     llu(io_handle_extent_array[i].
-                         extent_array[0].first),
-                     llu(io_handle_extent_array[i].
-                         extent_array[0].last));
-
-        msg_p->fs_id = sm_p->object_ref.fs_id;
-        msg_p->handle = io_handle_extent_array[i].extent_array[0].first;
-        msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
-        msg_p->comp_fn = create_datafiles_comp_fn;
-        msg_p->svr_addr = sm_p->u.create_file.data_server_addrs[i];
-    }
-    return SM_ACTION_COMPLETE;
-}
-#endif
-
-#if 0
-static PINT_sm_action create_datafiles_failure(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    sm_p->u.create_file.stored_error_code = js_p->error_code;
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "create state: datafiles_failure\n");
-    return SM_ACTION_COMPLETE;
-}
-#endif
-
-/* this writes metadata to the metafile - this should now be local */
-
-/* setup and push a new frame for calling the nested set-attr state machine */
-static PINT_sm_action create_setattr_setup(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct PINT_server_op *s_op = malloc(sizeof(struct PINT_server_op));
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "create state: setattr_setup\n");
-
-    if(!s_op)
-    {
-        return -PVFS_ENOMEM;
+    if (error_code != 0) {
+//        free(s_op);
     }
-    /* zero out all members */
-    memset(s_op, 0, sizeof(struct PINT_server_op));
-    PINT_sm_push_frame(smcb, 0, s_op);
-
-    s_op->op = PVFS_SERV_SETATTR;
-/*
-    s_op->credentials.uid = sm_p->cred_p->uid;
-    s_op->credentials.gid = sm_p->cred_p->gid;
-*/
-    s_op->u.setattr.handle = sm_p->object_ref.handle;
-    s_op->u.setattr.fs_id = sm_p->object_ref.fs_id;
-    s_op->u.setattr.attr = sm_p->u.create_file.attr;
-    s_op->u.setattr.attr.objtype = PVFS_TYPE_METAFILE;
-    s_op->u.setattr.attr.u.meta.dfile_array =
-        sm_p->u.create_file.datafile_handles;
-    s_op->u.setattr.attr.u.meta.dfile_count =
-        sm_p->u.create_file.num_data_files;
-    s_op->u.setattr.attr.u.meta.dist =
-        sm_p->u.create_file.dist;
-    s_op->u.setattr.attr.u.meta.dist_size =
-        PINT_DIST_PACK_SIZE(sm_p->u.create_file.dist);
-    s_op->attr = s_op->u.setattr.attr;
+    else {
+        /* TODO: Determine whether setattr is local or remote and set
+                 up for the appropriate one. For now just assume local. */
+        /* We will just reuse the s_op that was just popped to set up for setattr. */
+        memset(s_op, 0, sizeof(struct PINT_server_op));
+        PINT_sm_push_frame(smcb, 0, s_op);
 
-    s_op->target_fs_id = sm_p->object_ref.fs_id;
-    s_op->target_handle = sm_p->u.create.metafile_handle;
-    s_op->credentials.uid = sm_p->cred_p->uid;
-    s_op->credentials.gid = sm_p->cred_p->gid;
+        s_op->op = PVFS_SERV_SETATTR;
+    /*
+        s_op->credentials.uid = sm_p->cred_p->uid;
+        s_op->credentials.gid = sm_p->cred_p->gid;
+    */
+        s_op->u.setattr.handle = sm_p->object_ref.handle;
+        s_op->u.setattr.fs_id = sm_p->object_ref.fs_id;
+        s_op->u.setattr.attr = sm_p->u.create_file.attr;
+        s_op->u.setattr.attr.objtype = PVFS_TYPE_METAFILE;
+        s_op->u.setattr.attr.u.meta.dfile_array =
+            sm_p->u.create_file.datafile_handles;
+        s_op->u.setattr.attr.u.meta.dfile_count =
+            sm_p->u.create_file.num_data_files;
+        s_op->u.setattr.attr.u.meta.dist =
+            sm_p->u.create_file.dist;
+        s_op->u.setattr.attr.u.meta.dist_size =
+            PINT_DIST_PACK_SIZE(sm_p->u.create_file.dist);
+        s_op->attr = s_op->u.setattr.attr;
 
-#if 0
-    PINT_SERVREQ_SETATTR_FILL(
-        msg_p->req,
-        *sm_p->cred_p,
-        sm_p->object_ref.fs_id,
-        sm_p->u.create_file.metafile_handle,
-        PVFS_TYPE_METAFILE,
-        sm_p->u.create_file.attr,
-        PVFS_ATTR_META_ALL);
-
-    msg_p->req.u.setattr.attr.u.meta.dfile_array =
-        sm_p->u.create_file.datafile_handles;
-    msg_p->req.u.setattr.attr.u.meta.dfile_count =
-        sm_p->u.create_file.num_data_files;
-    msg_p->req.u.setattr.attr.u.meta.dist =
-        sm_p->u.create_file.dist;
-    msg_p->req.u.setattr.attr.u.meta.dist_size =
-        PINT_DIST_PACK_SIZE(sm_p->u.create_file.dist);
-
-    msg_p->fs_id = sm_p->object_ref.fs_id;
-    msg_p->handle = sm_p->u.create_file.metafile_handle;
-    msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
-    msg_p->comp_fn = create_setattr_comp_fn;
+        s_op->credentials.uid = sm_p->cred_p->uid;
+        s_op->credentials.gid = sm_p->cred_p->gid;
 
-    ret = PINT_cached_config_map_to_server(
-        &msg_p->svr_addr, msg_p->handle, msg_p->fs_id);
+        /* TODO: Use a centralized method of obtaining ds_attr
+                 that can be called from other places as well.  */
+        memset(&(s_op->ds_attr), 0, sizeof(PVFS_ds_attributes));
+        ret = job_trove_dspace_getattr(
+            sm_p->object_ref.fs_id, sm_p->u.create.metafile_handle, smcb, &(s_op->ds_attr),
+            0, js_p, &tmp_id, server_job_context);
 
-    if (ret)
-    {
-        gossip_err("Failed to map meta server address\n");
-        js_p->error_code = ret;
+        js_p->error_code = LOCAL_OPERATION;
+        return (ret);
     }
-#endif
+ 
     return SM_ACTION_COMPLETE;
 }
 
-static PINT_sm_action create_setattr_cleanup(
+static PINT_sm_action create_file_setattr_cleanup(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     int task_id;
     int error_code;
 
+    assert(s_op);
     PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
+//    free(s_op);
     js_p->error_code = error_code;
  
     return SM_ACTION_COMPLETE;
 }
 
 /* setup and push a new frame for calling the nested crdirent state machine */
-static PINT_sm_action create_crdirent_setup(
+static PINT_sm_action create_file_crdirent_setup(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     struct PINT_server_op *s_op = malloc(sizeof(struct PINT_server_op));
+    job_id_t tmp_id;
+    int ret = -PVFS_EINVAL;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG,
                  "create state: crdirent_setup\n");
@@ -1107,199 +870,43 @@ static PINT_sm_action create_crdirent_se
     s_op->u.crdirent.fs_id = sm_p->object_ref.fs_id;
     s_op->u.crdirent.dir_attr_update_required = 0;
 
-    s_op->target_fs_id = sm_p->object_ref.fs_id;
-/*    s_op->target_handle = sm_p->u.create_file.metafile_handle; */
-    s_op->target_handle = sm_p->parent_ref.handle;
     s_op->credentials.uid = sm_p->cred_p->uid;
     s_op->credentials.gid = sm_p->cred_p->gid;
 
-    js_p->error_code = 0;
-    return SM_ACTION_COMPLETE;
-}
+    /* TODO: Use a centralized method of obtaining ds_attr
+             that can be called from other places as well.  */
+    memset(&(s_op->ds_attr), 0, sizeof(PVFS_ds_attributes));
+    ret = job_trove_dspace_getattr(
+        sm_p->object_ref.fs_id, sm_p->parent_ref.handle, smcb, &(s_op->ds_attr),
+        0, js_p, &tmp_id, server_job_context);
 
-static PINT_sm_action create_crdirent_cleanup(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    int task_id;
-    int error_code;
-
-    PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
-    js_p->error_code = error_code;
- 
-    return SM_ACTION_COMPLETE;
-}
-
-#if 0
-/* this creates a directory entry - this may or may not be local */
-static PINT_sm_action create_crdirent_setup_msgpair(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    int ret = -1;
-    PINT_sm_msgpair_state *msg_p = NULL;
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "create state: crdirent_setup_msgpair\n");
 
     js_p->error_code = 0;
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "create: %s: posting crdirent req: parent handle: %llu, "
-                 "name: %s, handle: %llu\n",
-                 __func__,
-                 llu(sm_p->object_ref.handle), sm_p->u.create_file.object_name,
-                 llu(sm_p->u.create_file.metafile_handle));
-
-    PINT_init_msgpair(sm_p, msg_p);
-
-    PINT_SERVREQ_CRDIRENT_FILL(
-        msg_p->req,
-        *sm_p->cred_p,
-        sm_p->u.create_file.object_name,
-        sm_p->u.create_file.metafile_handle,
-        sm_p->object_ref.handle,
-        sm_p->object_ref.fs_id);
-
-    msg_p->fs_id = sm_p->object_ref.fs_id;
-    msg_p->handle = sm_p->object_ref.handle;
-    msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
-    msg_p->comp_fn = create_crdirent_comp_fn;
-
-    ret = PINT_cached_config_map_to_server(
-        &msg_p->svr_addr, sm_p->object_ref.handle,
-        sm_p->object_ref.fs_id);
-
-    if (ret)
-    {
-        gossip_err("Failed to map meta server address\n");
-        js_p->error_code = ret;
-    }
-    return SM_ACTION_COMPLETE;
+    return ret;
 }
-#endif
 
-static PINT_sm_action create_crdirent_failure(
+static PINT_sm_action build_obj_attr(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: crdirent_failure\n");
-
-    sm_p->u.create_file.stored_error_code = js_p->error_code;
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
 
-    if (sm_p->u.create_file.stored_error_code == -PVFS_EEXIST)
-    {
-        gossip_debug(GOSSIP_CLIENT_DEBUG, "crdirent failed: "
-                     "dirent already exists!\n");
-    }
+    PVFS_ds_attr_to_object_attr(&s_op->ds_attr, &s_op->attr);
+    s_op->attr.mask = PVFS_ATTR_COMMON_ALL;
     return SM_ACTION_COMPLETE;
 }
 
-/* delete the newly created meta and data handles */
-static PINT_sm_action create_delete_handles_setup_msgpair_array(
+static PINT_sm_action create_crdirent_cleanup(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    int ret = -PVFS_EINVAL, i = 0, actual_count = 0;
-    PVFS_BMI_addr_t metafile_server_addr;
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: "
-                 "delete_handles_setup_msgpair_array\n");
-
-    js_p->error_code = 0;
-
-    memset(&sm_p->msgpair, 0, sizeof(PINT_sm_msgpair_state));
-
-    ret = PINT_cached_config_map_to_server(
-        &metafile_server_addr, sm_p->u.create_file.metafile_handle,
-        sm_p->object_ref.fs_id);
-
-    if (ret)
-    {
-        gossip_err("Failed to map meta server address\n");
-        js_p->error_code = ret;
-        return SM_ACTION_COMPLETE;
-    }
-
-    /*
-      in the case that all datafiles have already been created,
-      actual_count will be (sm_p->u.create_file.num_data_files + 1).
-      otherwise, it will be somewhere between 1 (for the metafile) and
-      1 + the number of data files
-    */
-    actual_count = 1;
-    for(i = 0; i < sm_p->u.create_file.num_data_files; i++)
-    {
-        if (sm_p->u.create_file.datafile_handles &&
-            (sm_p->u.create_file.datafile_handles[i] != PVFS_HANDLE_NULL))
-        {
-            actual_count++;
-        }
-    }
-
-    if (sm_p->msgarray && (sm_p->msgarray != &(sm_p->msgpair)))
-    {
-        free(sm_p->msgarray);
-    }
-    sm_p->msgarray = (PINT_sm_msgpair_state *)malloc(
-        (actual_count * sizeof(PINT_sm_msgpair_state)));
-
-    if (sm_p->msgarray == NULL)
-    {
-        gossip_err("create: failed to allocate msgarray\n"); 
-        js_p->error_code = -PVFS_ENOMEM;
-        return SM_ACTION_COMPLETE;
-    }
-    sm_p->msgarray_count = actual_count;
-
-    assert(sm_p->u.create_file.data_server_addrs);
-
-    /*
-      for the metafile and each datafile, prepare to post a remove
-      send/recv pair
-    */
-    for(i = 0; i < actual_count; i++)
-    {
-        PINT_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
-
-        gossip_debug(GOSSIP_CLIENT_DEBUG,
-                     "create: posting data file remove req %d\n",i);
-
-        /* arbitrarily handle deletion of the metafile last */
-        if (i == (actual_count - 1))
-        {
-            PINT_SERVREQ_REMOVE_FILL(
-                msg_p->req,
-                *sm_p->cred_p,
-                sm_p->object_ref.fs_id,
-                sm_p->u.create_file.metafile_handle);
-
-            msg_p->fs_id = sm_p->object_ref.fs_id;
-            msg_p->handle = sm_p->u.create_file.metafile_handle;
-            msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
-            msg_p->comp_fn = create_delete_handles_comp_fn;
-            msg_p->svr_addr = metafile_server_addr;
-
-            gossip_debug(GOSSIP_CLIENT_DEBUG, " Preparing to remove "
-                         "metafile handle %llu\n", llu(msg_p->handle));
-        }
-        else
-        {
-            PINT_SERVREQ_REMOVE_FILL(
-                msg_p->req,
-                *sm_p->cred_p,
-                sm_p->object_ref.fs_id,
-                sm_p->u.create_file.datafile_handles[i]);
-
-            msg_p->fs_id = sm_p->object_ref.fs_id;
-            msg_p->handle = sm_p->u.create_file.datafile_handles[i];
-            msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
-            msg_p->comp_fn = create_delete_handles_comp_fn;
-            msg_p->svr_addr = sm_p->u.create_file.data_server_addrs[i];
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int task_id;
+    int error_code;
 
-            gossip_debug(GOSSIP_CLIENT_DEBUG, " Preparing to remove "
-                         "datafile handle %llu\n", llu(msg_p->handle));
-        }
-    }
+    assert(s_op);
+    PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
+//    free(s_op);
+    js_p->error_code = error_code;
+ 
     return SM_ACTION_COMPLETE;
 }
 
@@ -1408,216 +1015,6 @@ static PINT_sm_action create_file_work_c
     return SM_ACTION_TERMINATE;
 }
 
-/* In this function determine whether the parent's attribs    */
-/* are stored locally. If they are, set up the frame to call  */
-/* the nested get_attr state machine.                         */
-static PINT_sm_action create_find_attrs(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    struct server_configuration_s *config = get_server_config_struct();
-    char server_name[1024];
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "create-file state: create_find_attrs\n");
-
-    js_p->error_code = 0;
-
-    PINT_cached_config_get_server_name(server_name, 1024, sm_p->parent_ref.handle,
-        sm_p->parent_ref.fs_id);
-
-    if (! strcmp(config->host_id, server_name)) {
-        /* Set up the frame for the nested get_attr state machine */
-        struct PINT_server_op *s_op = malloc(sizeof(struct PINT_server_op));
-
-        if(!s_op)
-        {
-            return -PVFS_ENOMEM;
-        }
-        /* zero out all members */
-        memset(s_op, 0, sizeof(struct PINT_server_op));
-
-        s_op->op = PVFS_SERV_GETATTR;
-        s_op->u.getattr.handle = sm_p->parent_ref.handle;
-        s_op->u.getattr.fs_id = sm_p->parent_ref.fs_id;
-        s_op->u.getattr.attrmask = PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT;
-        s_op->attr.mask = PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT;
-        s_op->attr.objtype = PVFS_TYPE_DIRECTORY;
-        PINT_sm_push_frame(smcb, 0, s_op);
-        js_p->error_code = LOCAL_ATTRS;
-
-        s_op->target_fs_id = sm_p->parent_ref.fs_id;
-        s_op->target_handle = sm_p->parent_ref.handle;
-        s_op->credentials.uid = sm_p->cred_p->uid;
-        s_op->credentials.gid = sm_p->cred_p->gid;
-/*
-        s_op->access_type = PINT_server_req_table[s_op->op].params->access_type(NULL);
-        s_op->sched_policy = PINT_server_req_table[s_op->op].params->sched_policy;
-*/
-        return(SM_ACTION_COMPLETE);
-    }
-    else
-    {
-        /* it's not on this server. Fall through to another */
-        /* state to do the lookup on the correct server.    */
-        /* TODO:  set up machine for remote lookup here. I  */
-        /* think PINT_SM_GETATTR_STATE_FILL should move from*/
-        /* create_init to here.  */
-        sm_p->getattr.object_ref = sm_p->parent_ref;
-        sm_p->getattr.req_attrmask = PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT;
-        js_p->error_code = REMOTE_ATTRS;
-        return(SM_ACTION_COMPLETE);
-    }
-}
-
-static PINT_sm_action create_parent_getattr_local_cleanup(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    int task_id;
-    int error_code;
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, 1);
-    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-
-    memcpy(&sm_p->getattr.attr, &s_op->attr, sizeof(sm_p->getattr.attr));
-
-    PINT_sm_pop_frame(smcb, &task_id, &error_code, NULL);
-    js_p->error_code = error_code;
-    return(SM_ACTION_COMPLETE);
-}
-
-/** looks at the attributes of the parent directory and decides if it impacts
- *  the file creation in any way
- */
-static PINT_sm_action create_parent_getattr_inspect(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    PVFS_object_attr *attr = NULL;
-    int num_dfiles_requested_override = 0;
-    PINT_dist *current_dist; 
-    int ret = 0;
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: parent_getattr_inspect\n");
-
-    attr = &sm_p->getattr.attr;
-    assert(attr);
-
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "parent owner: %d, group: %d, perms: %d\n",
-        (int)attr->owner, (int)attr->group, (int)attr->perms);
-
-    /* do we have a setgid bit? */
-    if(attr->perms & PVFS_G_SGID)
-    {
-        gossip_debug(GOSSIP_CLIENT_DEBUG, "parent has setgid bit set.\n");
-        gossip_debug(GOSSIP_CLIENT_DEBUG, " - modifying requested attr "
-                                          "for new file.\n");
-        sm_p->u.create_file.attr.group = attr->group;
-        /* note that permission checking is left to server even in this case */
-    }
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "create_parent_getattr: [%p] "
-        "dfile_count     = %d "
-        "dist_name_len   = %d "
-        "dist_params_len = %d\n",
-        attr,
-        attr->u.dir.hint.dfile_count,
-        attr->u.dir.hint.dist_name_len,
-        attr->u.dir.hint.dist_params_len);
-
-    num_dfiles_requested_override = attr->u.dir.hint.dfile_count;
-    /* override the # of data files for this create */
-    if (num_dfiles_requested_override > 0)
-    {
-        /* Determine the number of dfiles */
-        PINT_cached_config_get_num_dfiles(sm_p->object_ref.fs_id,
-                sm_p->u.create_file.dist,
-                num_dfiles_requested_override,
-                &sm_p->u.create_file.num_data_files);
-    }
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "Setting number of datafiles to %d [requested %d]\n", 
-        sm_p->u.create_file.num_data_files, num_dfiles_requested_override);
-    current_dist = sm_p->u.create_file.dist;
-    /* We have an overriding distribution name for this directory.. honor that */
-    if (attr->u.dir.hint.dist_name_len > 0)
-    {
-        /* switch it only if it is different! */
-        if (strcmp(attr->u.dir.hint.dist_name, current_dist->dist_name))
-        {
-            PINT_dist *new_dist = NULL;
-            new_dist = PINT_dist_create(attr->u.dir.hint.dist_name);
-            if (new_dist)
-            {
-                gossip_debug(GOSSIP_CLIENT_DEBUG, "Overridding distribution name to %s instead of %s\n",
-                    attr->u.dir.hint.dist_name,
-                    current_dist->dist_name);
-                PINT_dist_free(current_dist);
-                sm_p->u.create_file.dist = new_dist;
-                current_dist = new_dist;
-            }
-            else
-            {
-                gossip_debug(GOSSIP_CLIENT_DEBUG, "Could not override distribution name with %s instead of %s\n",
-                    attr->u.dir.hint.dist_name,
-                    current_dist->dist_name);
-            }
-        }
-        else {
-            gossip_debug(GOSSIP_CLIENT_DEBUG, "retaining current distribution name %s\n",
-                current_dist->dist_name);
-        }
-    }
-
-    /* okay, we might need to override some dist params as well */
-    if (attr->u.dir.hint.dist_params_len > 0)
-    {
-        /* We have a series of comma separated key:val strings */
-        char **key, **val;
-        int64_t tmp_val;
-        int nparams = 0;
-
-        /* ignore parse errors! */
-        if (PINT_split_keyvals(attr->u.dir.hint.dist_params,
-            &nparams, &key, &val) == 0)
-        {
-            int i;
-            for (i = 0; i < nparams; i++)
-            {
-                gossip_debug(GOSSIP_CLIENT_DEBUG, "distribution parameter %s, value %s\n",
-                    key[i], val[i]);
-                /* NOTE: just as in server-config.c when parsing "Param" and
-                 * "Value" fields, we will assume that all values are 64 bit
-                 * integers.  The only difference here is that we scan
-                 * directly into a 64 bit integer, rather than converting
-                 * from the int format that dotconf supports.
-                 */
-                ret = sscanf(val[i], SCANF_lld, &tmp_val);
-                if(ret != 1)
-                {
-                    gossip_err("Error: unsupported type for distribution parameter %s, value %s found in directory hints.\n", 
-                        key[i], val[i]);
-                    gossip_err("Error: continuing anyway.\n");
-                }
-                else
-                {
-                    if(current_dist->methods->set_param(current_dist->dist_name,
-                        current_dist->params,
-                        key[i],
-                        &tmp_val))
-                    {
-
-                        gossip_err("Error: could not override hinted distribution parameter %s, value %s found in directory hints\n",
-                            key[i],
-                            val[i]);
-                    }
-                 }
-                 free(key[i]);
-                 free(val[i]);
-            }
-            free(key);
-            free(val);
-        }
-    }
-    return SM_ACTION_COMPLETE;
-}
 
 static inline int PINT_get_object_ref_create_file(
     struct PVFS_server_req *req, PVFS_fs_id *fs_id, PVFS_handle *handle)

Index: create.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/create.sm,v
diff -p -u -r1.46.4.3 -r1.46.4.4
--- create.sm	27 May 2008 18:00:53 -0000	1.46.4.3
+++ create.sm	30 May 2008 19:27:08 -0000	1.46.4.4
@@ -88,6 +88,7 @@ static int create_create(
     int ret = -1;
     job_id_t i;
 
+    js_p->error_code = 0;
     ret = job_trove_dspace_create(
         s_op->u.create.fs_id,
         &s_op->u.create.handle_extent_array,



More information about the Pvfs2-cvs mailing list