[Pvfs2-cvs] commit by nlmills in pvfs2/src/server: batch-create.sm batch-remove.sm precreate-pool-refiller.sm unstuff.sm chdirent.sm check.c check.h crdirent.sm create.sm del-eattr.sm event-mon.sm final-response.sm flush.sm get-attr.sm get-cred.sm get-eattr.sm io.sm iterate-handles.sm list-attr.sm list-eattr.sm lookup.sm mgmt-get-dirdata-handle.sm mgmt-remove-dirent.sm mgmt-remove-object.sm mkdir.sm module.mk.in prelude.sm proto-error.sm pvfs2-server-req.c pvfs2-server.c pvfs2-server.h readdir.sm remove.sm rmdirent.sm set-attr.sm set-eattr.sm setparam.sm small-io.sm truncate.sm unexpected.sm

CVS commit program cvs at parl.clemson.edu
Tue Aug 25 13:56:32 EDT 2009


Update of /anoncvs/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv5511/src/server

Modified Files:
      Tag: cu-security-branch
	chdirent.sm check.c check.h crdirent.sm create.sm del-eattr.sm 
	event-mon.sm final-response.sm flush.sm get-attr.sm 
	get-cred.sm get-eattr.sm io.sm iterate-handles.sm list-attr.sm 
	list-eattr.sm lookup.sm mgmt-get-dirdata-handle.sm 
	mgmt-remove-dirent.sm mgmt-remove-object.sm mkdir.sm 
	module.mk.in prelude.sm proto-error.sm pvfs2-server-req.c 
	pvfs2-server.c pvfs2-server.h readdir.sm remove.sm rmdirent.sm 
	set-attr.sm set-eattr.sm setparam.sm small-io.sm truncate.sm 
	unexpected.sm 
Added Files:
      Tag: cu-security-branch
	batch-create.sm batch-remove.sm precreate-pool-refiller.sm 
	unstuff.sm 
Log Message:
merged in changes from summer at LANL


--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ batch-create.sm	2009-08-25 13:56:32.000000000 -0400
@@ -0,0 +1,184 @@
+/* 
+ * (C) 2001 Clemson University and The University of Chicago 
+ *
+ * See COPYING in top-level directory.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "gossip.h"
+#include "pvfs2-internal.h"
+#include "pint-security.h"
+
+%%
+
+machine pvfs2_batch_create_sm
+{
+    state prelude
+    {
+        jump pvfs2_prelude_sm;
+        success => create;
+        default => final_response;
+    }
+
+    state create
+    {
+        run batch_create_create;
+        default => final_response;
+    }
+
+    state final_response
+    {
+        jump pvfs2_final_response_sm;
+        default => cleanup;
+    }
+
+    state cleanup
+    {
+        run batch_create_cleanup;
+        default => terminate;
+    }
+}
+
+%%
+
+
+/*
+ * Function: batch_create_create
+ *
+ * Params:   server_op *s_op, 
+ *           job_status_s* js_p
+ *
+ * Pre:      None
+ *
+ * Post:     None
+ *
+ * Returns:  int
+ *
+ * Synopsis: Create a dataspace.
+ */
+static int batch_create_create(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -1;
+    job_id_t i;
+
+    if(s_op->req->u.batch_create.object_count < 1)
+    {
+        js_p->error_code = -PVFS_EINVAL;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    s_op->resp.u.batch_create.handle_count 
+        = s_op->req->u.batch_create.object_count;
+
+    /* allocate some space to hold the handles we create */
+    s_op->resp.u.batch_create.handle_array = 
+        malloc(s_op->req->u.batch_create.object_count * sizeof(PVFS_handle));
+    if(!s_op->resp.u.batch_create.handle_array)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    ret = job_trove_dspace_create_list(
+        s_op->req->u.batch_create.fs_id,
+        &s_op->req->u.batch_create.handle_extent_array,
+        s_op->resp.u.batch_create.handle_array,
+        s_op->req->u.batch_create.object_count,
+        s_op->req->u.batch_create.object_type,
+        NULL,
+        TROVE_SYNC,
+        smcb,
+        0,
+        js_p,
+        &i,
+        server_job_context,
+        s_op->req->hints);
+
+     return(ret);
+}
+
+/*
+ * Function: batch_create_cleanup
+ *
+ * Params:   server_op *b, 
+ *           job_status_s* js_p
+ *
+ * Pre:      None
+ *
+ * Post:     None
+ *
+ * Returns:  int
+ *
+ * Synopsis: free memory and return
+ *           
+ */
+static int batch_create_cleanup(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int i;
+
+    if(s_op->resp.status == 0)
+    {
+        for(i=0; i<s_op->resp.u.batch_create.handle_count; i++)
+        {
+            gossip_debug(
+                GOSSIP_SERVER_DEBUG, "Batch created: %llu\n",
+                llu(s_op->resp.u.batch_create.handle_array[i]));
+        }
+    }
+
+    if(s_op->resp.u.batch_create.handle_array)
+    {
+        free(s_op->resp.u.batch_create.handle_array);
+    }
+
+    return(server_state_machine_complete(smcb));
+}
+
+static int perm_batch_create(PINT_server_op *s_op)
+{
+    int ret;
+
+    if (s_op->req->capability.op_mask & PINT_CAP_BATCH_CREATE)
+    {
+        ret = 0;
+    }
+    else if ((s_op->req->capability.op_mask & PINT_CAP_CREATE) &&
+             (s_op->req->u.batch_create.object_count == 1))
+    {
+        /* create capability allows you to create only one object */
+        ret = 0;
+    }
+    else
+    {
+        ret = -PVFS_EACCES;
+    }
+
+    return ret;
+}
+
+struct PINT_server_req_params pvfs2_batch_create_params =
+{
+    .string_name = "batch_create",
+    .perm = perm_batch_create,
+    .access_type = PINT_server_req_modify,
+    .state_machine = &pvfs2_batch_create_sm
+};
+
+/*
+ * Local variables:
+ *  mode: c
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ batch-remove.sm	2009-08-25 13:56:32.000000000 -0400
@@ -0,0 +1,248 @@
+/* 
+ * (C) 2001 Clemson University and The University of Chicago 
+ *
+ * See COPYING in top-level directory.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "gossip.h"
+#include "pvfs2-internal.h"
+
+enum
+{
+    REMOVE_NEXT = 1
+};
+
+%%
+
+machine pvfs2_batch_remove_sm
+{
+    state setup_prelude
+    {
+        run setup_prelude;
+        default => prelude;
+    }
+
+    state prelude
+    {
+        jump pvfs2_prelude_work_sm;
+        success => setup_remove;
+        default => release;
+    }
+
+    state setup_remove
+    {
+        run setup_remove;
+        success => remove;
+        default => remove_complete;
+    }
+
+    state remove
+    {
+        jump pvfs2_remove_work_sm;
+        default => remove_complete;
+    }
+
+    state remove_complete
+    {
+        run remove_complete;
+        default => release;
+    }
+
+    state release
+    {
+        run release;
+        default => remove_next;
+    }
+
+    state remove_next
+    {
+        run remove_next;
+        REMOVE_NEXT => setup_prelude;
+        default => response;
+    }
+
+    state response
+    {
+        jump pvfs2_final_response_sm;
+        default => cleanup;
+    }
+
+    state cleanup
+    {
+        run cleanup;
+        default => terminate;
+    }
+}
+
+%%
+
+static PINT_sm_action setup_prelude(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    assert(s_op);
+
+    /* get the object to remove, the access and scheduling policies */
+    s_op->target_fs_id = s_op->req->u.batch_remove.fs_id;
+    s_op->target_handle =
+        s_op->req->u.batch_remove.handles[s_op->u.batch_remove.handle_index];
+
+    s_op->access_type = PINT_server_req_get_access_type(s_op->req);
+    s_op->sched_policy = PINT_server_req_get_sched_policy(s_op->req);
+
+    js_p->error_code = 0;
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action setup_remove(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct PINT_server_op *remove_op;
+    int ret;
+
+    remove_op = malloc(sizeof(*remove_op));
+    if(!remove_op)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+    memset(remove_op, 0, sizeof(*remove_op));
+
+    remove_op->u.remove.fs_id = s_op->target_fs_id;
+    remove_op->u.remove.handle = s_op->target_handle;
+
+    ret = PINT_sm_push_frame(smcb, 0, remove_op);
+    if(ret < 0)
+    {
+        js_p->error_code = ret;
+    }
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action remove_complete(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *remove_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct PINT_server_op *s_op;
+    int error_code;
+    int task_id;
+    int remaining;
+
+    s_op = PINT_sm_pop_frame(smcb, &task_id, &error_code, &remaining);
+
+    free(remove_op);
+
+    if(error_code != 0)
+    {
+        s_op->u.batch_remove.error_code = error_code;
+        return SM_ACTION_COMPLETE;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action release(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t tmp_id;
+    int ret;
+
+    /* we need to release the scheduled remove request on the target
+     * handle.  The schedule call occurred in the prelude_work sm */
+
+    if(!s_op->scheduled_id)
+    {
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(js_p->error_code)
+    {
+        s_op->u.batch_remove.error_code = js_p->error_code;
+    }
+
+    ret = job_req_sched_release(s_op->scheduled_id, smcb, 0, js_p, &tmp_id,
+                                server_job_context);
+    s_op->scheduled_id = 0;
+    return ret;
+}
+
+static PINT_sm_action remove_next(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    if(s_op->u.batch_remove.error_code != 0)
+    {
+        js_p->error_code = s_op->u.batch_remove.error_code;
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(js_p->error_code != 0)
+    {
+        return SM_ACTION_COMPLETE;
+    }
+
+    s_op->u.batch_remove.handle_index++;
+    if(s_op->u.batch_remove.handle_index < s_op->req->u.batch_remove.handle_count)
+    {
+        js_p->error_code = REMOVE_NEXT;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+
+static int cleanup(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    return(server_state_machine_complete(smcb));
+}
+
+static int perm_batch_remove(PINT_server_op *s_op)
+{
+    int ret;
+
+    if (s_op->req->capability.op_mask & PINT_CAP_BATCH_REMOVE)
+    {
+        ret = 0;
+    }
+    else if ((s_op->req->capability.op_mask & PINT_CAP_REMOVE) &&
+             (s_op->req->u.batch_remove.handle_count == 1))
+    {
+        /* remove capability allows you to remove only one object */
+        ret = 0;
+    }
+    else
+    {
+        ret = -PVFS_EACCES;
+    }
+
+    return ret;
+}
+
+struct PINT_server_req_params pvfs2_batch_remove_params =
+{
+    .string_name = "batch_remove",
+    .perm = perm_batch_remove,
+    .access_type = PINT_server_req_modify,
+    .state_machine = &pvfs2_batch_remove_sm
+};
+
+
+/*
+ * Local variables:
+ *  mode: c
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ precreate-pool-refiller.sm	2009-08-25 13:56:33.000000000 -0400
@@ -0,0 +1,337 @@
+/* 
+ * (C) 2009 Clemson University and The University of Chicago 
+ *
+ * See COPYING in top-level directory.
+ */
+#include <stdio.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/time.h>
+#include <assert.h>
+
+#include "pvfs2-server.h"
+#include "pint-perf-counter.h"
+#include "server-config.h"
+#include "pint-util.h"
+#include "security-util.h"
+
+static int batch_create_comp_fn(
+    void *v_p, struct PVFS_server_resp *resp_p, int index);
+
+%%
+
+machine pvfs2_precreate_pool_refiller_sm
+{
+        state setup
+        {
+                run setup_fn;
+                success => wait_for_threshold;
+                default => error_retry;
+        }
+
+	state wait_for_threshold 
+	{
+		run wait_for_threshold_fn;
+		success => setup_batch_create;
+		default => error_retry;
+	}
+
+	state setup_batch_create 
+	{
+		run setup_batch_create_fn;
+		success => msgpair_xfer_batch_create;
+		default => error_retry;
+	}
+
+        state msgpair_xfer_batch_create
+        {
+                jump pvfs2_msgpairarray_sm;
+                success => store_handles;
+                default => msgpair_retry;
+        }
+
+        state msgpair_retry
+        {
+                run msgpair_retry_fn;
+                default => setup_batch_create;
+        }
+
+        state store_handles
+        {
+                run store_handles_fn;
+                success => wait_for_threshold;
+                default => error_retry;
+        }
+
+        state error_retry
+        {
+                run error_fn;
+                success => setup;
+                default => terminate;
+        }
+}
+
+%%
+
+/* msgpair_retry_fn()
+ *
+ * handles anything that needs to happen between sets of msgpair retries
+ */
+static PINT_sm_action msgpair_retry_fn(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t tmp_id;
+
+    /* signal anyone waiting on get_handles() that we are having trouble */
+    return(job_precreate_pool_fill_signal_error(
+        s_op->u.precreate_pool_refiller.pool_handle,
+        s_op->u.precreate_pool_refiller.fsid,
+        js_p->error_code,
+	smcb,
+	0,
+	js_p,
+	&tmp_id,
+	server_job_context));
+}
+
+/* wait_for_threshold_fn()
+ *
+ * waits until the pool count has dropped below a low threshold before
+ * proceeding
+ */
+static PINT_sm_action wait_for_threshold_fn(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t tmp_id;
+    struct server_configuration_s *user_opts = get_server_config_struct();
+
+    return(job_precreate_pool_check_level(
+        s_op->u.precreate_pool_refiller.pool_handle,
+        s_op->u.precreate_pool_refiller.fsid,
+        user_opts->precreate_low_threshold,
+	smcb,
+	0,
+	js_p,
+	&tmp_id,
+	server_job_context));
+}
+
+ /* store_handles_fn()
+  *
+  * stores a set of precreated handles persistently within precreate pools
+  */
+static PINT_sm_action store_handles_fn(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t tmp_id;
+    struct server_configuration_s *user_opts = get_server_config_struct();
+
+    return(job_precreate_pool_fill(
+        s_op->u.precreate_pool_refiller.pool_handle,
+        s_op->u.precreate_pool_refiller.fsid,
+        s_op->u.precreate_pool_refiller.precreate_handle_array,
+        user_opts->precreate_batch_size,
+	smcb,
+	0,
+	js_p,
+	&tmp_id,
+	server_job_context,
+        NULL));
+}
+
+
+/* setup_batch_create_fn()
+ *
+ * prepares a req/resp pair to another server to precreate a batch of
+ * handles
+ */
+static PINT_sm_action setup_batch_create_fn(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PINT_sm_msgpair_state *msg_p = NULL;
+    struct server_configuration_s *user_opts = get_server_config_struct();
+
+    gossip_debug(GOSSIP_SERVER_DEBUG, "setting up msgpair to get precreated handles from %s and put store them in %llu.\n", 
+        s_op->u.precreate_pool_refiller.host,
+        llu(s_op->u.precreate_pool_refiller.pool_handle));
+
+    PINT_msgpair_init(&s_op->msgarray_op);
+    msg_p = &s_op->msgarray_op.msgpair;
+
+    /* note: we are acting like a client in this case, so use client timeout
+     * and delay values
+     */
+    s_op->msgarray_op.params.job_timeout = user_opts->client_job_bmi_timeout;
+    s_op->msgarray_op.params.retry_delay = user_opts->client_retry_delay_ms;
+    s_op->msgarray_op.params.retry_limit = user_opts->client_retry_limit;
+    s_op->msgarray_op.params.quiet_flag = 1;
+
+    msg_p->svr_addr = s_op->u.precreate_pool_refiller.host_addr;
+
+    PINT_SERVREQ_BATCH_CREATE_FILL(
+        msg_p->req,
+        s_op->u.precreate_pool_refiller.capability,
+        s_op->u.precreate_pool_refiller.fsid,
+        PVFS_TYPE_DATAFILE,
+        user_opts->precreate_batch_size,
+        s_op->u.precreate_pool_refiller.data_handle_extent_array,
+        NULL);
+
+    msg_p->fs_id = s_op->u.precreate_pool_refiller.fsid;
+    msg_p->handle = s_op->u.precreate_pool_refiller.data_handle_extent_array.extent_array[0].first;
+    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
+    msg_p->comp_fn = batch_create_comp_fn;
+
+    PINT_sm_push_frame(smcb, 0, &s_op->msgarray_op);
+    js_p->error_code = 0;
+    return(SM_ACTION_COMPLETE);
+}
+
+
+/* setup_fn()
+ *
+ * initial state to allocate memory for use through the remainder of the
+ * state machine's life
+ */
+static PINT_sm_action setup_fn(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct server_configuration_s *user_opts = get_server_config_struct();
+    PVFS_capability *cap = &s_op->u.precreate_pool_refiller.capability;
+    int ret;
+
+    /* nlmills: TODO: ensure this is freed in all cases */
+    ret = PINT_init_capability(cap);
+    if (ret < 0)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+
+    cap->fsid = s_op->u.precreate_pool_refiller.fsid;
+    cap->op_mask = PINT_CAP_BATCH_CREATE;
+    cap->num_handles = 0;
+    cap->handle_array = NULL;
+
+    ret = PINT_sign_capability(cap);
+    if (ret < 0)
+    {
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+        
+    s_op->u.precreate_pool_refiller.precreate_handle_array = 
+        malloc(user_opts->precreate_batch_size * sizeof(PVFS_handle));
+    if(!s_op->u.precreate_pool_refiller.precreate_handle_array)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    js_p->error_code = 0;
+    return(SM_ACTION_COMPLETE);
+}
+
+
+/* error_fn()
+ *
+ * handles error transitions
+ */
+static PINT_sm_action error_fn(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    job_id_t tmp_id;
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    gossip_err("Error: precreate_pool_refiller for %s encountered error.\n",
+        s_op->u.precreate_pool_refiller.host);
+    gossip_err("Error: sleeping for 30 seconds before retrying.\n");
+        
+    if(s_op->u.precreate_pool_refiller.precreate_handle_array)
+    {
+        free(s_op->u.precreate_pool_refiller.precreate_handle_array);
+    }
+
+    PINT_cleanup_capability(&s_op->u.precreate_pool_refiller.capability);
+
+    return(job_req_sched_post_timer(
+        (30*1000),
+	smcb,
+	0,
+	js_p,
+	&tmp_id,
+	server_job_context));
+}
+
+
+/* batch_create_comp_fn()
+ *
+ * msgpair completion function to handle processing batch create response i
+ * from another server
+ */
+static int batch_create_comp_fn(void *v_p,
+                                 struct PVFS_server_resp *resp_p,
+                                 int index)
+{
+    PINT_smcb *smcb = v_p;
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
+    int i;
+    
+    gossip_debug(GOSSIP_SERVER_DEBUG, "batch_create_comp_fn\n");
+
+    assert(resp_p->op == PVFS_SERV_BATCH_CREATE);
+
+    if (resp_p->status != 0)
+    {
+        PVFS_perror_gossip("batch_create request got", resp_p->status);
+	return resp_p->status;
+    }
+
+    for(i = 0; i<resp_p->u.batch_create.handle_count; i++)
+    {
+        s_op->u.precreate_pool_refiller.precreate_handle_array[i] = 
+            resp_p->u.batch_create.handle_array[i];
+
+        gossip_debug(GOSSIP_SERVER_DEBUG,
+            "Got batch created handle: %llu from: %s\n",
+            llu(resp_p->u.batch_create.handle_array[i]),
+            s_op->u.precreate_pool_refiller.host);
+    }
+
+    return 0;
+}
+
+static int perm_precreate_pool_refiller(PINT_server_op *s_op)
+{
+    int ret;
+
+    ret = -PVFS_EINVAL;
+
+    return ret;
+}
+
+struct PINT_server_req_params pvfs2_precreate_pool_refiller_params =
+{
+    .string_name = "precreate_pool_refiller",
+    .perm = perm_precreate_pool_refiller,
+    .state_machine = &pvfs2_precreate_pool_refiller_sm
+};
+
+/*
+ * Local variables:
+ *  mode: c
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */
+

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ unstuff.sm	2009-08-25 13:56:33.000000000 -0400
@@ -0,0 +1,717 @@
+/* 
+ * (C) 2009 Clemson University and The University of Chicago 
+ *
+ * See COPYING in top-level directory.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-attr.h"
+#include "pvfs2-types.h"
+#include "pvfs2-types-debug.h"
+#include "pvfs2-util.h"
+#include "pint-util.h"
+#include "pvfs2-internal.h"
+#include "pint-cached-config.h"
+#include "pint-security.h"
+#include "security-util.h"
+#include "check.h"
+
+enum {
+    STATE_UNSTUFF = 33,
+    STATE_CAPABILITY
+};
+
+%%
+
+machine pvfs2_unstuff_sm
+{
+    state prelude
+    {
+        jump pvfs2_prelude_sm;
+        success => getattr_setup;
+        default => final_response;
+    }
+
+    state getattr_setup
+    {
+        run getattr_setup;
+        success => getattr_do_work;
+        default => final_response;
+    }
+
+    state getattr_do_work
+    {
+        jump pvfs2_get_attr_work_sm;
+        default => getattr_interpret;
+    }
+
+    state getattr_interpret
+    {
+        run getattr_interpret;
+        STATE_UNSTUFF => get_keyvals;
+        default => final_response;
+    }
+
+    state get_keyvals
+    {
+        run get_keyvals;
+        success => inspect_keyvals;
+        default => final_response;
+    }
+
+    state inspect_keyvals
+    {
+        run inspect_keyvals;
+        success => get_handles;
+        default => final_response;
+    }
+
+    state get_handles
+    {
+        run get_handles;
+        default => set_handles_on_object;
+    }
+
+    state set_handles_on_object
+    {
+        run set_handles_on_object;
+        success => update_dfile_count;
+        default => final_response;
+    }
+
+    state update_dfile_count
+    {
+        run update_dfile_count;
+        success => remove_layout;
+        default => final_response;
+    }
+
+    state remove_layout
+    {
+        run remove_layout;
+        default => check_if_capability_required;
+    }
+
+    state check_if_capability_required
+    {
+        run check_if_capability_required;
+        STATE_CAPABILITY => read_acl;
+        default => final_response;
+    }
+
+    state read_acl
+    {
+        run read_acl;
+        default => create_capability;
+    }
+
+    state create_capability
+    {
+        run create_capability;
+        default => final_response;
+    }
+
+    state final_response
+    {
+        jump pvfs2_final_response_sm;
+        default => cleanup;
+    }
+
+    state cleanup
+    {
+        run cleanup;
+        default => terminate;
+    }
+}
+
+%%
+
+static PINT_sm_action get_keyvals(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int kind = 0;
+    int ret;
+    job_id_t job_id;
+
+    s_op->keyval_count = 2;
+    s_op->key_a = malloc(sizeof(*s_op->key_a) * s_op->keyval_count);
+    if(!s_op->key_a)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        goto error_exit;
+    }
+
+    s_op->val_a = malloc(sizeof(*s_op->val_a) * s_op->keyval_count);
+    if(!s_op->val_a)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        goto free_key_array;
+    }
+
+    s_op->error_a = malloc(sizeof(*s_op->error_a) * s_op->keyval_count);
+    if(!s_op->error_a)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        goto free_val_array;
+    }
+    memset(s_op->error_a, 0, sizeof(*s_op->error_a) * s_op->keyval_count);
+
+    s_op->u.unstuff.encoded_layout = malloc(PVFS_REQ_LIMIT_LAYOUT);
+    if(!s_op->u.unstuff.encoded_layout)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        goto free_encoded_layout;
+    }
+    memset(s_op->u.unstuff.encoded_layout, 0, PVFS_REQ_LIMIT_LAYOUT);
+
+    /* kind = 0 */
+    s_op->key_a[kind].buffer = Trove_Common_Keys[METAFILE_LAYOUT_KEY].key;
+    s_op->key_a[kind].buffer_sz = Trove_Common_Keys[METAFILE_LAYOUT_KEY].size;
+
+    s_op->val_a[kind].buffer = s_op->u.unstuff.encoded_layout;
+    s_op->val_a[kind].buffer_sz = PVFS_REQ_LIMIT_LAYOUT;
+
+    ++kind;
+    /* kind = 1 */
+    s_op->key_a[kind].buffer = Trove_Common_Keys[NUM_DFILES_REQ_KEY].key;
+    s_op->key_a[kind].buffer_sz = Trove_Common_Keys[NUM_DFILES_REQ_KEY].size;
+
+    s_op->val_a[kind].buffer = &s_op->u.unstuff.num_dfiles_req;
+    s_op->val_a[kind].buffer_sz = sizeof(s_op->u.unstuff.num_dfiles_req);
+
+    ret = job_trove_keyval_read_list(
+        s_op->req->u.unstuff.fs_id,
+        s_op->req->u.unstuff.handle,
+        s_op->key_a, s_op->val_a, s_op->error_a, s_op->keyval_count,
+        0, NULL, smcb, 0, js_p, &job_id, server_job_context,
+        s_op->req->hints);
+    return ret;
+
+free_encoded_layout:
+    free(s_op->u.unstuff.encoded_layout);
+    s_op->u.unstuff.encoded_layout = NULL;
+free_val_array:
+    free(s_op->val_a);
+    s_op->val_a = NULL;
+free_key_array:
+    free(s_op->key_a);
+    s_op->key_a = NULL;
+error_exit:
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action inspect_keyvals(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    char* tmpbuf;
+
+    if(js_p->error_code == 0)
+    {
+        /* check keys; we have a big problem if one of them is missing */
+        if(s_op->error_a[0])
+        {
+            js_p->error_code = s_op->error_a[0];
+        }
+        else if(s_op->error_a[1])
+        {
+            js_p->error_code = s_op->error_a[1];
+        }
+
+        if(js_p->error_code == 0)
+        {
+        
+            /* sanity check num dfiles */
+            if(s_op->u.unstuff.num_dfiles_req < 1)
+            {
+                js_p->error_code = -PVFS_EINVAL;
+            }
+
+            /* decode layout information */
+            tmpbuf = s_op->u.unstuff.encoded_layout;
+            decode_PVFS_sys_layout(&tmpbuf, &s_op->u.unstuff.layout);
+        }
+    }
+
+    /* pass along error code for next state to handle */
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action get_handles(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    int ret;
+    job_id_t j_id;
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    if(s_op->u.unstuff.layout.algorithm != PVFS_SYS_LAYOUT_ROUND_ROBIN)
+    {
+        /* see create.sm; for now the only layout that we use stuffing on is
+         * ROUND_ROBIN.  The storage format supports other layouts if we
+         * want to add support for others later
+         */
+        gossip_err("Error: unstuff doesn't support layout algorithm: %d\n", 
+            s_op->u.unstuff.layout.algorithm);
+        js_p->error_code = -PVFS_ENOSYS;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* allocate room for final number of handles we want */
+    s_op->u.unstuff.dfile_array = 
+        malloc(s_op->u.unstuff.num_dfiles_req * sizeof(PVFS_handle));
+    if(!s_op->u.unstuff.dfile_array)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* the very first handle should be our current stuffed handle */
+    s_op->u.unstuff.dfile_array[0] 
+        = s_op->resp.u.unstuff.attr.u.meta.dfile_array[0];
+
+    if(s_op->u.unstuff.num_dfiles_req == 1)
+    {
+        /* special case; we are unstuffing to 1 datafile.  There is no need
+         * to retrieve any additional handles
+         */
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* NOTE: we are leaving the existing resp attr structure alone until we
+     * get a successful answer from get_handles() and commit to disk
+     */
+    ret = job_precreate_pool_get_handles(
+        s_op->req->u.unstuff.fs_id,
+        (s_op->u.unstuff.num_dfiles_req-1),
+        NULL,
+        &s_op->u.unstuff.dfile_array[1],
+        0,
+        smcb,
+        0,
+        js_p,
+        &j_id,
+        server_job_context,
+        s_op->req->hints);
+    return ret;
+}
+
+static PINT_sm_action set_handles_on_object(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t j_id;
+
+    gossip_debug(GOSSIP_SERVER_DEBUG, "job_precreate_pool_get_handles() returned %d\n", js_p->error_code);
+
+    if(js_p->error_code < 0)
+    {
+        /* we failed to retrieve any handles */
+        if(s_op->u.unstuff.dfile_array)
+        {
+            free(s_op->u.unstuff.dfile_array);
+            /* preserve error code */
+            return SM_ACTION_COMPLETE;
+        }
+    }
+
+    /* replace dfile information in attr structure */
+    free(s_op->resp.u.unstuff.attr.u.meta.dfile_array);
+    s_op->resp.u.unstuff.attr.u.meta.dfile_array 
+        = s_op->u.unstuff.dfile_array;
+    s_op->resp.u.unstuff.attr.u.meta.dfile_count 
+        = s_op->u.unstuff.num_dfiles_req;
+
+    /* write new datafile handles to disk */
+    s_op->key.buffer = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
+    s_op->key.buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size;
+
+    s_op->val.buffer = 
+        s_op->resp.u.unstuff.attr.u.meta.dfile_array; 
+    s_op->val.buffer_sz =
+        s_op->resp.u.unstuff.attr.u.meta.dfile_count * sizeof(PVFS_handle);
+
+    return job_trove_keyval_write(
+        s_op->req->u.unstuff.fs_id,
+        s_op->req->u.unstuff.handle,
+        &s_op->key,
+        &s_op->val,
+        0, NULL, smcb, 0, js_p, &j_id, server_job_context,
+        s_op->req->hints);
+}
+
+static PINT_sm_action update_dfile_count(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t j_id;
+
+    /* take the current on-disk attribute in object_attr form
+     * (acquired from prelude) with the modified datafile array,
+     * and convert it to the dspace form for writing.
+     */
+    PVFS_object_attr_to_ds_attr(&s_op->resp.u.unstuff.attr, &s_op->ds_attr);
+
+    return job_trove_dspace_setattr(
+        s_op->req->u.unstuff.fs_id, s_op->req->u.unstuff.handle,
+        &s_op->ds_attr,
+        TROVE_SYNC,
+        smcb, 0, js_p, &j_id, server_job_context,
+        s_op->req->hints);
+}
+
+static PINT_sm_action remove_layout(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t j_id;
+
+    /* remove the layout and num_dfiles_req keyvals as the layout has
+     * now been chosen.
+     */
+    return job_trove_keyval_remove_list(
+        s_op->req->u.unstuff.fs_id, s_op->req->u.unstuff.handle,
+        s_op->key_a,
+        s_op->val_a,
+        s_op->error_a,
+        2,
+        TROVE_SYNC, NULL,
+        smcb, 0, js_p, &j_id, server_job_context,
+        s_op->req->hints);
+}
+
+/* nlmills: TODO: replace all this messy capability code with
+ * a call to the getattr nested state machine
+ */
+
+/* check_if_capability_required
+ *
+ * Branch point for capability creation.
+ */
+static PINT_sm_action check_if_capability_required(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    /* errors fall through */
+    if (js_p->error_code < 0)
+    {
+        return SM_ACTION_COMPLETE;
+    }
+
+    if (s_op->resp.u.unstuff.attr.mask & PVFS_ATTR_CAPABILITY)
+    {
+        /* get rid of the old capability */
+        PINT_cleanup_capability(&s_op->resp.u.unstuff.attr.capability);
+        js_p->error_code = STATE_CAPABILITY;
+    }
+    else
+    {
+        js_p->error_code = 0;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/* read_acl
+ * 
+ * Reads any ACL data from Trove if a capability is requested.
+ */
+static PINT_sm_action read_acl(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t tmp_id;
+    int ret;
+
+    gossip_debug(GOSSIP_SERVER_DEBUG,
+                 "(%p) %s (unstuff sm) state: read_acl\n",
+                 s_op, PINT_map_server_op_to_string(s_op->req->op));
+
+    assert(s_op->resp.u.unstuff.attr.mask & PVFS_ATTR_CAPABILITY);
+
+    /* nlmills: TODO: make key a constant */
+    s_op->key.buffer = "system.posix_acl_access";
+    s_op->key.buffer_sz = strlen(s_op->key.buffer) + 1;
+    if(s_op->free_val)
+    {
+        free(s_op->val.buffer);
+        s_op->free_val = 0;
+    }
+    s_op->val.buffer = malloc(PVFS_REQ_LIMIT_VAL_LEN);
+    if (!s_op->val.buffer)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+    s_op->val.buffer_sz = PVFS_REQ_LIMIT_VAL_LEN;
+    s_op->free_val = 1;
+    
+    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "About to retrieve acl keyvals "
+                     "for handle %llu\n", llu(s_op->target_handle));
+
+    ret = job_trove_keyval_read(
+        s_op->req->u.unstuff.fs_id,
+        s_op->req->u.unstuff.handle,
+        &s_op->key,
+        &s_op->val,
+        0,
+        NULL,
+        smcb,
+        0,
+        js_p,
+        &tmp_id,
+        server_job_context,
+        s_op->req->hints);
+
+    return ret;
+}
+
+/* create_capability
+ * 
+ * nlmills: TODO: document me
+ */
+static PINT_sm_action create_capability(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PVFS_object_attr *resp_attr = &s_op->resp.u.unstuff.attr;
+    void *acl_buf;
+    size_t acl_size;
+    int ret;
+
+    if (js_p->error_code == -TROVE_ENOENT)
+    {
+        acl_buf = NULL;
+        acl_size = 0;
+        js_p->error_code = 0;
+    }
+    else if (js_p->error_code < 0)
+    {
+        /* let fatal errors fall through */
+        return SM_ACTION_COMPLETE;
+    }
+    else
+    {
+        acl_buf = s_op->val.buffer;
+        acl_size = s_op->val.read_sz;
+    }
+
+    ret = PINT_init_capability(&resp_attr->capability);
+    if (ret < 0)
+    {
+        gossip_debug(GOSSIP_SERVER_DEBUG,
+                     "unstuff: capability init failed\n");
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    resp_attr->capability.num_handles = 1;
+
+    if ((resp_attr->objtype == PVFS_TYPE_METAFILE) &&
+        (resp_attr->mask & PVFS_ATTR_META_DFILES))
+    {
+        resp_attr->capability.num_handles += resp_attr->u.meta.dfile_count;
+    }
+
+    resp_attr->capability.handle_array = 
+        calloc(resp_attr->capability.num_handles, sizeof(PVFS_handle));
+    if (!resp_attr->capability.handle_array)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+
+    resp_attr->capability.handle_array[0] = s_op->req->u.unstuff.handle;
+    if ((resp_attr->objtype == PVFS_TYPE_METAFILE) &&
+        (resp_attr->mask & PVFS_ATTR_META_DFILES))
+    {
+        /* copy datafile handles after the metafile handle */
+        memcpy((resp_attr->capability.handle_array + 1),
+               resp_attr->u.meta.dfile_array,
+               resp_attr->u.meta.dfile_count * sizeof(PVFS_handle));
+    }
+
+    ret = PINT_get_capabilities(acl_buf,
+                                acl_size,
+                                s_op->req->u.unstuff.credential.userid,
+                                s_op->req->u.unstuff.credential.group_array,
+                                s_op->req->u.unstuff.credential.num_groups,
+                                resp_attr,
+                                &resp_attr->capability.op_mask);
+    if (ret < 0)
+    {
+        PINT_cleanup_capability(&resp_attr->capability);
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+    
+    resp_attr->capability.fsid = s_op->req->u.unstuff.fs_id;
+
+    ret = PINT_sign_capability(&resp_attr->capability);
+    if (ret < 0)
+    {
+        gossip_debug(GOSSIP_SERVER_DEBUG,
+                     "unstuff: failed to sign capability\n");
+        PINT_cleanup_capability(&resp_attr->capability);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+                                 
+    resp_attr->mask |= PVFS_ATTR_CAPABILITY;
+
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action cleanup(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    if(s_op->u.unstuff.layout.server_list.servers)
+    {
+        free(s_op->u.unstuff.layout.server_list.servers);
+    }
+    if(s_op->u.unstuff.encoded_layout)
+    {
+        free(s_op->u.unstuff.encoded_layout);
+    }
+    if(s_op->val_a)
+    {
+        free(s_op->val_a);
+    }
+    if(s_op->key_a)
+    {
+        free(s_op->key_a);
+    }
+    if(s_op->error_a)
+    {
+        free(s_op->error_a);
+    }
+
+    PINT_free_object_attr(&s_op->resp.u.getattr.attr);
+    return (server_state_machine_complete(smcb));
+}
+
+static PINT_sm_action getattr_setup(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    struct PINT_server_op *getattr_op;
+    int ret;
+
+    js_p->error_code = 0;
+
+    getattr_op = malloc(sizeof(*getattr_op));
+    if(!getattr_op)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+    memset(getattr_op, 0, sizeof(*getattr_op));
+
+    /* TODO: can we come up with a way to clean up and nail down what has 
+     * to be set in order to run this nested machine?  This seems fragile.
+     */
+
+    /* need attrs that the prelude read already */
+    getattr_op->attr = s_op->attr;
+    /* need a valid request structure for some generic features like access
+     * logging 
+     */
+    getattr_op->req = s_op->req;
+    /* need to fill in the input parameters to the getattr nested machine */
+    getattr_op->u.getattr.fs_id = s_op->req->u.unstuff.fs_id;
+    getattr_op->u.getattr.handle = s_op->req->u.unstuff.handle;
+    getattr_op->u.getattr.attrmask = s_op->req->u.unstuff.attrmask;
+
+    ret = PINT_sm_push_frame(smcb, 0, getattr_op);
+    if(ret < 0)
+    {
+        js_p->error_code = ret;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action getattr_interpret(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *getattr_op;
+    struct PINT_server_op *s_op;
+    int task_id;
+    int remaining;
+
+    getattr_op = PINT_sm_pop_frame(smcb, &task_id, &js_p->error_code, 
+        &remaining);
+    s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    s_op->resp.u.unstuff.attr = getattr_op->resp.u.getattr.attr;
+
+    free(getattr_op);
+
+    if(js_p->error_code)
+    {
+        gossip_debug(GOSSIP_SERVER_DEBUG,
+            "unstuff failed to retrieve existing attrs.\n");
+        return(SM_ACTION_COMPLETE);
+    }
+
+    if(s_op->resp.u.unstuff.attr.mask & PVFS_ATTR_META_UNSTUFFED)
+    {
+        gossip_debug(GOSSIP_SERVER_DEBUG,
+            "unstuff found file already unstuffed; return existing attrs.\n");
+        js_p->error_code = 0;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    gossip_debug(GOSSIP_SERVER_DEBUG,
+        "unstuff found stuffed file.\n");
+    js_p->error_code = STATE_UNSTUFF;
+    return SM_ACTION_COMPLETE;
+}
+
+static int perm_unstuff(PINT_server_op *s_op)
+{
+    int ret;
+
+    /* nlmills: TODO: probably just need write permission */
+    if (s_op->req->capability.op_mask & PINT_CAP_SETATTR)
+    {
+        ret = 0;
+    }
+    else
+    {
+        ret = -PVFS_EACCES;
+    }
+
+    return ret;
+}
+
+PINT_GET_OBJECT_REF_DEFINE(unstuff);
+
+struct PINT_server_req_params pvfs2_unstuff_params =
+{
+    .string_name = "unstuff",
+    .perm = perm_unstuff,
+    .access_type = PINT_server_req_modify,
+    .sched_policy = PINT_SERVER_REQ_SCHEDULE,
+    .get_object_ref = PINT_get_object_ref_unstuff,
+    .state_machine = &pvfs2_unstuff_sm
+};
+
+/*
+ * Local variables:
+ *  mode: c
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ft=c ts=8 sts=4 sw=4 expandtab
+ */
+

Index: chdirent.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/chdirent.sm,v
diff -p -u -r1.19.2.4 -r1.19.2.5
--- chdirent.sm	29 Jul 2008 22:29:38 -0000	1.19.2.4
+++ chdirent.sm	25 Aug 2009 17:56:25 -0000	1.19.2.5
@@ -133,7 +133,7 @@ static PINT_sm_action chdirent_verify_pa
         0,
         js_p,
         &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -166,7 +166,7 @@ static PINT_sm_action chdirent_read_dire
         0,
         js_p,
         &j_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -223,7 +223,7 @@ static PINT_sm_action chdirent_change_di
         &s_op->key, &s_op->val, 
         TROVE_SYNC |
         0,
-        NULL, smcb, 0, js_p, &j_id, server_job_context);
+        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     s_op->u.chdirent.dir_attr_update_required = 1;
     return ret;
@@ -271,7 +271,7 @@ static PINT_sm_action chdirent_update_di
         ds_attr, 
         TROVE_SYNC |
         0,
-        smcb, 0, js_p, &j_id, server_job_context);
+        smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: check.c
===================================================================
RCS file: /anoncvs/pvfs2/src/server/Attic/check.c,v
diff -p -u -r1.1.8.19 -r1.1.8.20
--- check.c	5 Aug 2008 20:01:54 -0000	1.1.8.19
+++ check.c	25 Aug 2009 17:56:25 -0000	1.1.8.20
@@ -14,432 +14,289 @@
  */
 #include <string.h>
 #include <assert.h>
-#include <pwd.h>
-#include <grp.h>
 
 #include "pvfs2-debug.h"
 #include "pvfs2-server.h"
 #include "pvfs2-attr.h"
 #include "server-config.h"
-#include "src/server/request-scheduler/request-scheduler.h"
 #include "trove.h"
 #include "pint-util.h"
 #include "pvfs2-internal.h"
 #include "pint-perf-counter.h"
-#include "gen-locks.h"
 #include "gossip.h"
 #include "bmi-byteswap.h"
 #include "check.h"
 #include "security-util.h"
 
-enum {
-    PRELUDE_RUN_ACL_CHECKS = 1,
+enum access_type
+{
+    READ_ACCESS,
+    WRITE_ACCESS,
+    EXEC_ACCESS
 };
 
-static gen_mutex_t check_group_mutex = GEN_MUTEX_INITIALIZER;
-static char* check_group_pw_buffer = NULL;
-static long check_group_pw_buffer_size = 0;
-static char* check_group_gr_buffer = NULL;
-static long check_group_gr_buffer_size = 0;
-static int PINT_check_group(uid_t uid, gid_t gid);
+static int check_mode(enum access_type access, PVFS_uid userid,
+    PVFS_gid group, const PVFS_object_attr *attr);
+static int check_acls(void *acl_buf, size_t acl_size, 
+    const PVFS_object_attr *attr, PVFS_uid uid, PVFS_gid *group_array, 
+    uint32_t num_groups, int want);
 static int iterate_ro_wildcards(struct filesystem_configuration_s *fsconfig, 
     PVFS_BMI_addr_t client_addr);
-static int iterate_root_squash_wildcards(struct filesystem_configuration_s *fsconfig,
-    PVFS_BMI_addr_t client_addr);
-static int iterate_all_squash_wildcards(struct filesystem_configuration_s *fsconfig,
-    PVFS_BMI_addr_t client_addr);
-static int translate_ids(PVFS_fs_id fsid, PVFS_uid uid, PVFS_gid gid, 
-    PVFS_uid *translated_uid, PVFS_gid *translated_gid, 
-    PVFS_BMI_addr_t client_addr);
-static void get_anon_ids(struct filesystem_configuration_s *fsconfig,
-    PVFS_uid *uid, PVFS_gid *gid);
 static int permit_operation(PVFS_fs_id fsid,
     enum PINT_server_req_access_type access_type, PVFS_BMI_addr_t client_addr);
 
-/* PINT_check_mode()
- *
- * checks to see if the type of access described by "access_type" is permitted 
- * for user "uid" of group "gid" on the object with attributes "attr"
- *
- * returns 0 on success, -PVFS_EACCES if permission is not granted
- */
-int PINT_check_mode(
-    PVFS_object_attr *attr,
-    PVFS_uid uid, PVFS_gid gid,
-    enum PINT_access_type access_type)
-{
-    int in_group_flag = 0;
-    int ret = 0;
-
-    /* if we don't have masks for the permission information that we
-     * need, then the system is broken
-     */
-    assert(attr->mask & PVFS_ATTR_COMMON_UID &&
-           attr->mask & PVFS_ATTR_COMMON_GID &&
-           attr->mask & PVFS_ATTR_COMMON_PERM);
-
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - check_mode called --- "
-                 "(uid=%d,gid=%d,access_type=%d)\n", uid, gid, access_type);
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - object attributes --- "
-                 "(uid=%d,gid=%d,mode=%d)\n", attr->owner, attr->group,
-                 attr->perms);
-
-    /* give root permission, no matter what */
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG,
-                 " - checking if uid (%d) is root ...\n", uid);
-    if (uid == 0)
-    {
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-        return 0;
-    }
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - no\n");
-
-    /* see if uid matches object owner */
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - checking if owner (%d) "
-        "matches uid (%d)...\n", attr->owner, uid);
-    if(attr->owner == uid)
-    {
-        /* see if object user permissions match access type */
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - checking if permissions "
-            "(%d) allows access type (%d) for user...\n", attr->perms, access_type);
-        if(access_type == PINT_ACCESS_READABLE && (attr->perms &
-            PVFS_U_READ))
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-            return(0);
-        }
-        if(access_type == PINT_ACCESS_WRITABLE && (attr->perms &
-            PVFS_U_WRITE))
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-            return(0);
-        }
-        if(access_type == PINT_ACCESS_EXECUTABLE && (attr->perms &
-            PVFS_U_EXECUTE))
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-            return(0);
-        }
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - no\n");
-    }
-    else
-    {
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - no\n");
-    }
-
-    /* see if other bits allow access */
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - checking if permissions "
-        "(%d) allows access type (%d) by others...\n", attr->perms, access_type);
-    if(access_type == PINT_ACCESS_READABLE && (attr->perms &
-        PVFS_O_READ))
-    {
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-        return(0);
-    }
-    if(access_type == PINT_ACCESS_WRITABLE && (attr->perms &
-        PVFS_O_WRITE))
-    {
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-        return(0);
-    }
-    if(access_type == PINT_ACCESS_EXECUTABLE && (attr->perms &
-        PVFS_O_EXECUTE))
-    {
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-        return(0);
-    }
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - no\n");
-
-    /* see if gid matches object group */
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - checking if group (%d) "
-        "matches gid (%d)...\n", attr->group, gid);
-    if(attr->group == gid)
-    {
-        /* default group match */
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-        in_group_flag = 1;
-    }
-    else
-    {
-        /* no default group match, check supplementary groups */
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - no\n");
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - checking for"
-            " supplementary group match...\n");
-        ret = PINT_check_group(uid, attr->group);
-        if(ret == 0)
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-            in_group_flag = 1;
-        }
-        else
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - no\n");
-            if(ret != -PVFS_ENOENT)
-            {
-                /* system error; not just failed match */
-                return(ret);
-            }
-        }
-    }
 
-    if(in_group_flag)
-    {
-        /* see if object group permissions match access type */
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - checking if permissions "
-            "(%d) allows access type (%d) for group...\n", attr->perms, access_type);
-        if(access_type == PINT_ACCESS_READABLE && (attr->perms &
-            PVFS_G_READ))
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-            return(0);
-        }
-        if(access_type == PINT_ACCESS_WRITABLE && (attr->perms &
-            PVFS_G_WRITE))
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-            return(0);
-        }
-        if(access_type == PINT_ACCESS_EXECUTABLE && (attr->perms &
-            PVFS_G_EXECUTE))
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - yes\n");
-            return(0);
-        }
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, " - no\n");
-    }
-  
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "******PINT_check_mode: denying access\n");
-    /* default case: access denied */
-    return -PVFS_EACCES;
-}
-
-
-/* PINT_getattr_check_perms()
- *
- * fills in "op_mask" for user "uid" of groups in "gid" on
- * the object with attributes "attr"
- * ACL has priority over standard file permissions.  Standard permissions
- * only used if ACL is not present or has no entry
+/* PINT_get_capabilities
  *
+ * nlmills: TODO: document me
+ * nlmills: TODO: add to header
  */
-void PINT_getattr_check_perms(struct PINT_smcb *smcb, PVFS_uid uid, PVFS_gid *gid, uint32_t num_groups, 
-               PVFS_object_attr attr, uint32_t *op_mask)
+int PINT_get_capabilities(void *acl_buf, 
+                          size_t acl_size, 
+                          PVFS_uid userid,
+                          PVFS_gid *group_array, 
+                          uint32_t num_groups,
+                          const PVFS_object_attr *attr, 
+                          uint32_t *op_mask)
 {
-    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PVFS_gid active_group;
+    int ret;
     int i;
-    int acl_error_code = 0;
 
     *op_mask = 0;
     
     /* root has every possible capability */
-    if (uid == 0)
+    if (userid == 0)
     {
-        *op_mask = ~0L;
-        return;
+        *op_mask = ~((uint32_t)0);
+        return 0;
     }
-    
-    /* do ACL checks here...kinda slow to do it this way, but works for now */
-    /* TODO:  Rewrite ACL checks entirely to handle multiple groups */
-    for (i = 0; i < num_groups; i++)
-    {
-        acl_error_code = PINT_check_acls(s_op->val.buffer, s_op->val.read_sz,
-                           &attr, uid, gid[i], PVFS2_ACL_READ);
-        if (!acl_error_code)
+
+    /* if acls are present then use them */
+    if (acl_size < 0)
+    {
+        assert(acl_buf);
+
+        /* nlmills: errors are ignored on purpose. we don't want to
+           give anyone free access.
+        */
+
+        ret = check_acls(acl_buf, acl_size, attr, userid, 
+                         group_array, num_groups, PVFS2_ACL_READ);
+        if (!ret)
         {
             *op_mask |= PINT_CAP_READ;
         }
-        else if (acl_error_code == -PVFS_EIO)
-        {
-            break;
-        }
-                           
-        acl_error_code = PINT_check_acls(s_op->val.buffer, s_op->val.read_sz,
-                           &attr, uid, gid[i], PVFS2_ACL_WRITE);
-        if (!acl_error_code)
+
+        ret = check_acls(acl_buf, acl_size, attr, userid,
+                         group_array, num_groups, PVFS2_ACL_WRITE);
+        if (!ret)
         {
             *op_mask |= PINT_CAP_WRITE;
         }
-                           
-        acl_error_code = PINT_check_acls(s_op->val.buffer, s_op->val.read_sz,
-                           &attr, uid, gid[i], PVFS2_ACL_EXECUTE);
-        if (!acl_error_code)
+
+        ret = check_acls(acl_buf, acl_size, attr, userid,
+                         group_array, num_groups, PVFS2_ACL_EXECUTE);
+        if (!ret)
         {
             *op_mask |= PINT_CAP_EXEC;
         }
     }
-    
-    /* only check standard permissions if ACL is not in place */
-    if (acl_error_code == -PVFS_EIO)
+    /* otherwise fall back to standard UNIX permissions */
+    else
     {
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "Getattr perm check: no ACL\n");
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "Using standard file perms\n");
+        gossip_debug(GOSSIP_PERMISSIONS_DEBUG,
+                     "PINT_get_capabilities: ACL unavailable, "
+                     "using UNIX permissions\n");
+        
+        assert(num_groups > 0);
+        assert(attr->mask & PVFS_ATTR_COMMON_GID);
+        
+        /* see if the user is a member of the object's group */
+        active_group = group_array[0];
         for (i = 0; i < num_groups; i++)
         {
-            if (attr.group == gid[i]) break;
-        }
-        /* reset to prevent overflow */
-        if (i == num_groups)
-        {
-            i = 0;
+            if (group_array[i] == attr->group)
+            {
+                active_group = group_array[i];
+                break;
+            }
         }
-    
-        if (!PINT_check_mode(&attr, uid, gid[i], PINT_ACCESS_READABLE))
+
+        if (check_mode(READ_ACCESS, userid, active_group, attr))
         {
             *op_mask |= PINT_CAP_READ;
         }
-        
-        if (!PINT_check_mode(&attr, uid, gid[i], PINT_ACCESS_WRITABLE))
+
+        if (check_mode(WRITE_ACCESS, userid, active_group, attr))
         {
             *op_mask |= PINT_CAP_WRITE;
         }
-        
-        if (!PINT_check_mode(&attr, uid, gid[i], PINT_ACCESS_EXECUTABLE))
+
+        if (check_mode(EXEC_ACCESS, userid, active_group, attr))
         {
             *op_mask |= PINT_CAP_EXEC;
         }
-    }   
-    
-    /* give setattr and remove/create caps based on uid and op_mask */
-    if (uid == attr.owner)
+    }
+
+    /* only the owner can set attributes */
+    if (userid == attr->owner)
     {
         *op_mask |= PINT_CAP_SETATTR;
     }
-    
-    /* write access to directories allows create and remove */
-    if (attr.objtype == PVFS_TYPE_DIRECTORY 
-        && *op_mask & PINT_ACCESS_WRITABLE)
-    {
-        *op_mask |= PINT_CAP_REMOVE | PINT_CAP_CREATE;
+
+    /* write and exec access to directories allows create and remove */
+    if (attr->objtype == PVFS_TYPE_DIRECTORY &&
+        *op_mask & PINT_CAP_WRITE &&
+        *op_mask & PINT_CAP_EXEC)
+    {
+        *op_mask |= PINT_CAP_CREATE | PINT_CAP_REMOVE;
     }
-    
-    /* If metafile is not initialized allow setattr to complete */
-    if (attr.u.meta.dfile_count == 0 && attr.u.meta.dist == 0 
-        && attr.owner == 0)
+
+    /* nlmills: TODO: replace this hack */
+    /* if metafile is not initialized allow setattr to complete */
+    if (attr->u.meta.dfile_count == 0 && attr->u.meta.dist == 0 
+        && attr->owner == 0)
     {
         *op_mask |= PINT_CAP_SETATTR;
     }
-}
 
+    return 0;
+}
 
-/* PINT_check_group()
- *
- * checks to see if uid is a member of gid
- * 
- * returns 0 on success, -PVFS_ENOENT if not a member, other PVFS error codes
- * on system failure
- */
-static int PINT_check_group(uid_t uid, gid_t gid)
+/* nlmills: TODO: document me */
+int PINT_perm_check(struct PINT_server_op *s_op)
 {
-    struct passwd pwd;
-    struct passwd* pwd_p = NULL;
-    struct group grp;
-    struct group* grp_p = NULL;
-    int i = 0;
-    int ret = -1;
-
-    /* Explanation: 
-     *
-     * We use the _r variants of getpwuid and getgrgid in order to insure
-     * thread safety; particularly if this function ever gets called in a
-     * client side situation in which we can't prevent the application from
-     * making conflicting calls.
-     *
-     * These _r functions require that a buffer be supplied for the user and
-     * group information, however.  These buffers may be unconfortably large
-     * for the stack, so we malloc them on a static pointer and then mutex
-     * lock this function so that it can still be reentrant.
-     */
+    PVFS_capability *cap = &s_op->req->capability;
+    PINT_server_req_perm_fun perm_fun;
+    int ret = -PVFS_EINVAL;
 
-    gen_mutex_lock(&check_group_mutex);
+    if (s_op->target_fs_id != PVFS_FS_ID_NULL)
+    {
+        /*
+         * if we are exporting a volume readonly, disallow any operation 
+         * that modifies the state of the file-system.
+         */
+        ret = permit_operation(s_op->target_fs_id, s_op->access_type,
+                               s_op->addr);
+        if (ret < 0)
+        {
+            return ret;
+        }
+    }
 
-    if(!check_group_pw_buffer)
+    if (s_op->target_handle != PVFS_HANDLE_NULL && 
+        !PINT_capability_is_null(cap))
     {
-        /* need to create a buffer for pw and grp entries */
-#if defined(_SC_GETGR_R_SIZE_MAX) && defined(_SC_GETPW_R_SIZE_MAX)
-        /* newish posix systems can tell us what the max buffer size is */
-        check_group_gr_buffer_size = sysconf(_SC_GETGR_R_SIZE_MAX);
-        check_group_pw_buffer_size = sysconf(_SC_GETPW_R_SIZE_MAX);
-#else
-        /* fall back for older systems */
-        check_group_pw_buffer_size = 1024;
-        check_group_gr_buffer_size = 1024;
-#endif
-        check_group_pw_buffer = (char*)malloc(check_group_pw_buffer_size);
-        check_group_gr_buffer = (char*)malloc(check_group_gr_buffer_size);
-        if(!check_group_pw_buffer || !check_group_gr_buffer)
+        int index;
+
+        /* ensure we have a capability for the target handle */
+        for (index = 0; index < cap->num_handles; index++)
         {
-            if(check_group_pw_buffer)
-            {
-                free(check_group_pw_buffer);
-                check_group_pw_buffer = NULL;
-            }
-            if(check_group_gr_buffer)
+            if (cap->handle_array[index] == s_op->target_handle)
             {
-                free(check_group_gr_buffer);
-                check_group_gr_buffer = NULL;
+                break;
             }
-            gen_mutex_unlock(&check_group_mutex);
-            return(-PVFS_ENOMEM);
+        }
+        if (index == cap->num_handles)
+        {
+            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "Attempted to perform "
+                         "an operation on target handle %llu that was "
+                         "not in the capability\n", llu(s_op->target_handle));
+            return -PVFS_EACCES;
         }
     }
 
-    /* get user information */
-    ret = getpwuid_r(uid, &pwd, check_group_pw_buffer,
-        check_group_pw_buffer_size,
-        &pwd_p);
-    if(ret != 0 || pwd_p == NULL)
+    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "PVFS operation \"%s\" got "
+                 "attr mask %d\n\t(capability mask = %d)\n",
+                 PINT_map_server_op_to_string(s_op->req->op),
+                 s_op->attr.mask, cap->op_mask);
+
+    perm_fun = PINT_server_req_get_perm_fun(s_op->req);
+    if (perm_fun)
     {
-        gen_mutex_unlock(&check_group_mutex);
-        return(-PVFS_EINVAL);
+        ret = perm_fun(s_op);
     }
-
-    /* check primary group */
-    if(pwd.pw_gid == gid)
+    else
     {
-        gen_mutex_unlock(&check_group_mutex);
-        return 0;
+        ret = -PVFS_EINVAL;
     }
 
-    /* get other group information */
-    ret = getgrgid_r(gid, &grp, check_group_gr_buffer,
-        check_group_gr_buffer_size,
-        &grp_p);
-    if(ret != 0)
+    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, 
+                 "Final permission check for \"%s\" set error code to %d\n", 
+                 PINT_map_server_op_to_string(s_op->req->op),
+                 ret);
+
+    return ret;
+}
+
+static int check_mode(enum access_type access, 
+               PVFS_uid userid, 
+               PVFS_gid group,
+               const PVFS_object_attr *attr)
+{
+    int user_other_access = 0;
+    int group_access = 0;
+    int mask;
+    
+    assert(attr->mask & PVFS_ATTR_COMMON_UID &&
+           attr->mask & PVFS_ATTR_COMMON_GID &&
+           attr->mask & PVFS_ATTR_COMMON_PERM);
+
+    mask = 0;
+    if (attr->owner == userid)
     {
-        gen_mutex_unlock(&check_group_mutex);
-        return(-PVFS_EINVAL);
+        if (access == READ_ACCESS)
+        {
+            mask = PVFS_U_READ;
+        }
+        else if (access == WRITE_ACCESS)
+        {
+            mask = PVFS_U_WRITE;
+        }
+        else if (access == EXEC_ACCESS)
+        {
+            mask = PVFS_U_EXECUTE;
+        }
     }
-
-    if(grp_p == NULL)
-    { 
-	gen_mutex_unlock(&check_group_mutex);
-	gossip_err("User (uid=%d) isn't in group %d on storage node.\n",
-		   uid, gid);
-        return(-PVFS_EACCES);
+    else
+    {
+        if (access == READ_ACCESS)
+        {
+            mask = PVFS_O_READ;
+        }
+        else if (access == WRITE_ACCESS)
+        {
+            mask = PVFS_O_WRITE;
+        }
+        else if (access == EXEC_ACCESS)
+        {
+            mask = PVFS_O_EXECUTE;
+        }
     }
+    
+    user_other_access = attr->perms & mask;
 
-    for(i = 0; grp.gr_mem[i] != NULL; i++)
+    mask = 0;
+    if (attr->group == group)
     {
-        if(0 == strcmp(pwd.pw_name, grp.gr_mem[i]) )
+        if (access == READ_ACCESS)
+        {
+            mask = PVFS_G_READ;
+        }
+        else if (access == WRITE_ACCESS)
         {
-            gen_mutex_unlock(&check_group_mutex);
-            return 0;
-        } 
+            mask = PVFS_G_WRITE;
+        }
+        else if (access == EXEC_ACCESS)
+        {
+            mask = PVFS_G_EXECUTE;
+        }
     }
 
-    gen_mutex_unlock(&check_group_mutex);
-    return(-PVFS_ENOENT);
-}
-
-/* Checks if a given user is part of any groups that matches the file gid */
-static int in_group_p(PVFS_uid uid, PVFS_gid gid, PVFS_gid attr_group)
-{
-    if (attr_group == gid)
-        return 1;
-    if (PINT_check_group(uid, attr_group) == 0)
-        return 1;
-    return 0;
+    group_access = attr->perms & mask;
+    
+    return (user_other_access || group_access);
 }
 
 /*
@@ -447,15 +304,22 @@ static int in_group_p(PVFS_uid uid, PVFS
  * by the acl. Returns -PVFS_EIO if ACL is invalid or not found,
  * returns -PVFS_EACCESS if access is denied
  */
-int PINT_check_acls(void *acl_buf, size_t acl_size, 
-    PVFS_object_attr *attr,
-    PVFS_uid uid, PVFS_gid gid, int want)
+/* nlmills: TODO: this should be rewritten */
+static int check_acls(void *acl_buf, 
+                      size_t acl_size, 
+                      const PVFS_object_attr *attr,
+                      PVFS_uid uid,
+                      PVFS_gid *group_array, 
+                      uint32_t num_groups, 
+                      int want)
 {
     pvfs2_acl_entry pe, *pa;
     int i = 0, found = 0, count = 0;
+    int j;
     assert(attr->mask & PVFS_ATTR_COMMON_UID &&
            attr->mask & PVFS_ATTR_COMMON_GID &&
            attr->mask & PVFS_ATTR_COMMON_PERM);
+    assert(num_groups > 0);
 
     if (acl_size == 0)
     {
@@ -471,7 +335,7 @@ int PINT_check_acls(void *acl_buf, size_
         (int) acl_size, 
         (int) (acl_size / sizeof(pvfs2_acl_entry)));
     gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "uid = %d, gid = %d, want = %d\n",
-        uid, gid, want);
+        uid, group_array[0], want);
 
     assert(acl_buf);
     /* if the acl format doesn't look valid, then return an error rather than
@@ -510,18 +374,27 @@ int PINT_check_acls(void *acl_buf, size_
                     goto mask;
                 break;
             case PVFS2_ACL_GROUP_OBJ:
-                if (in_group_p(uid, gid, attr->group)) 
+                for (j = 0; j < num_groups; j++)
                 {
-                    found = 1;
-                    if ((pa->p_perm & want) == want)
-                        goto mask;
+                    if (group_array[j] == attr->group)
+                    {
+                        found = 1;
+                        if ((pa->p_perm & want) == want)
+                            goto mask;
+                        break;
+                    }
                 }
                 break;
             case PVFS2_ACL_GROUP:
-                if (in_group_p(uid, gid, pa->p_id)) {
-                    found = 1;
-                    if ((pa->p_perm & want) == want)
-                        goto mask;
+                for (j = 0; j < num_groups; j++)
+                {
+                    if (group_array[j] == pa->p_id)
+                    {
+                        found = 1;
+                        if ((pa->p_perm & want) == want)
+                            goto mask;
+                        break;
+                    }
                 }
                 break;
             case PVFS2_ACL_MASK:
@@ -579,322 +452,6 @@ check_perm:
     return -PVFS_EACCES;
 }
 
-
-int PINT_perm_check(struct PINT_server_op *s_op)
-{
-    PVFS_capability *cap = &s_op->req->capability;
-    PINT_server_req_perm_fun perm_fun;
-    int ret = -PVFS_EINVAL;
-
-    if (s_op->target_fs_id != PVFS_FS_ID_NULL)
-    {
-        /*
-         * if we are exporting a volume readonly, disallow any operation 
-         * that modifies the state of the file-system.
-         */
-        ret = permit_operation(s_op->target_fs_id, s_op->access_type,
-                               s_op->addr);
-        if (ret < 0)
-        {
-            return ret;
-        }
-
-        /* XXX: removed root squashing */
-    }
-
-    if (s_op->target_handle != PVFS_HANDLE_NULL && 
-        !PINT_capability_is_null(cap))
-    {
-        int index;
-
-        /* ensure we have a capability for the target handle */
-        for (index = 0; index < cap->num_handles; index++)
-        {
-            if (cap->handle_array[index] == s_op->target_handle)
-            {
-                break;
-            }
-        }
-        if (index == cap->num_handles)
-        {
-            gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "Attempted to perform "
-                         "an operation on target handle %llu that was "
-                         "not in the capability\n", llu(s_op->target_handle));
-            return -PVFS_EACCES;
-        }
-    }
-
-    /* XXX: removed positive error check */
-
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "PVFS operation \"%s\" got "
-                 "attr mask %d\n\t(capability mask = %d)\n",
-                 PINT_map_server_op_to_string(s_op->req->op),
-                 s_op->attr.mask, cap->op_mask);
-
-    perm_fun = PINT_server_req_get_perm_fun(s_op->req);
-    if (perm_fun)
-    {
-        ret = perm_fun(s_op);
-    }
-    else
-    {
-        ret = -PVFS_EINVAL;
-    }
-
-#if 0
-    switch (ret)
-    {
-    case PINT_SERVER_CHECK_WRITE:
-        ret = caps->op_mask & 0222 ? 0 : -PVFS_EACCES;
-        break;
-    case PINT_SERVER_CHECK_READ:
-        ret = caps->op_mask & 0444 ? 0 : -PVFS_EACCES;
-        break;
-    case PINT_SERVER_CHECK_CRDIRENT:
-        ret = caps->op_mask & 0333 ? 0 : -PVFS_EACCES;
-        break;
-    case PINT_SERVER_CHECK_ATTR:
-        /* let datafiles pass through attr check */
-        if (s_op->attr.objtype == PVFS_TYPE_DATAFILE)
-        {
-            ret = 0;
-        }
-        /* for now assume extended attribs are treated
-         * the same as regular attribs as far as permissions
-         */
-        else if (s_op->req->op == PVFS_SERV_GETATTR  ||
-                 s_op->req->op == PVFS_SERV_GETEATTR ||
-                 s_op->req->op == PVFS_SERV_LISTEATTR)
-        {
-            /* getting or listing attributes is always ok --- permission
-             * is checked on the parent directory at read time
-             */
-            ret = 0;
-        }
-        else
-        {
-            ret = -PVFS_EACCES;
-        }
-        break;
-    case PINT_SERVER_CHECK_NONE:
-        /* TODO: figure out how to do the root squash check */
-        /* maybe a new flag in cap->op_mask? */
-        /* for now just allow everything */
-        ret = 0;
-        break;
-    case PINT_SERVER_CHECK_INVALID:
-        ret = -PVFS_EINVAL;
-        break;
-    }
-#endif
-
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, 
-                 "Final permission check for \"%s\" set error code to %d\n", 
-                 PINT_map_server_op_to_string(s_op->req->op),
-                 ret);
-
-    return ret;
-}
-
-#if 0
-/* prelude_perm_check()
- *
- * this really just marks the spot where we would want to do
- * permission checking, it will be replaced by a couple of states that
- * actually perform this task later
- */
-PINT_sm_action prelude_perm_check(
-    struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    PVFS_object_attr *obj_attr = NULL;
-    PVFS_ds_attributes *ds_attr = NULL;
-    PVFS_uid translated_uid = s_op->req->credentials.uid;
-    PVFS_gid translated_gid = s_op->req->credentials.gid;
-    PVFS_fs_id  fsid = PVFS_FS_ID_NULL;
-    int squashed_flag = 0;
-    int skip_acl_flag = 0;
-
-    /* moved gossip server debug output to end of state, so we can report
-     * resulting status value.
-     */
-
-    /*
-      first we translate the dspace attributes into a more convenient
-      server use-able format.  i.e. a PVFS_object_attr
-    */
-    ds_attr = &s_op->ds_attr;
-    obj_attr = &s_op->attr;
-    PVFS_ds_attr_to_object_attr(ds_attr, obj_attr);
-    s_op->attr.mask = PVFS_ATTR_COMMON_ALL;
-    /* Set the target object attribute pointer.. used later by the acl check */
-    s_op->target_object_attr = obj_attr;
-
-    if (s_op->target_fs_id != PVFS_FS_ID_NULL)
-    {
-        /*
-         * if we are exporting a volume readonly, disallow any operation that modifies
-         * the state of the file-system.
-         */
-        if (permit_operation(
-                s_op->target_fs_id, s_op->access_type, s_op->addr) < 0)
-        {
-            js_p->error_code = -PVFS_EROFS;
-            return SM_ACTION_COMPLETE;
-        }
-        else 
-        {
-            /* Translate the uid and gid's in case we need to do some squashing based on the export and the client address */
-            if (translate_ids(fsid, s_op->req->credentials.uid, s_op->req->credentials.gid,
-                &translated_uid, &translated_gid, s_op->addr) == 1)
-            {
-                squashed_flag = 1;
-                s_op->req->credentials.uid = translated_uid;
-                s_op->req->credentials.gid = translated_gid;
-                /* in the case of a setattr, translate the ids as well right here */
-                if (s_op->req->op == PVFS_SERV_SETATTR)
-                {
-                    s_op->req->u.setattr.attr.owner = translated_uid;
-                    s_op->req->u.setattr.attr.group = translated_gid;
-                }
-                else if (s_op->req->op == PVFS_SERV_MKDIR)
-                {
-                    s_op->req->u.mkdir.attr.owner = translated_uid;
-                    s_op->req->u.mkdir.attr.group = translated_gid;
-                }
-            }
-       }
-    }
-
-    /* anything else we treat as a real error */
-    if (js_p->error_code)
-    {
-        js_p->error_code = -PVFS_ERROR_CODE(-js_p->error_code);
-        return SM_ACTION_COMPLETE;
-    }
-
-    gossip_debug(
-        GOSSIP_PERMISSIONS_DEBUG, "PVFS operation \"%s\" got "
-        "attr mask %d\n\t(attr_uid_valid? %s, attr_owner = "
-        "%d, credentials_uid = %d)\n\t(attr_gid_valid? %s, attr_group = "
-        "%d, credentials.gid = %d)\n",
-        PINT_map_server_op_to_string(s_op->req->op), s_op->attr.mask,
-        ((s_op->attr.mask & PVFS_ATTR_COMMON_UID) ? "yes" : "no"),
-        s_op->attr.owner, translated_uid,
-        ((s_op->attr.mask & PVFS_ATTR_COMMON_GID) ? "yes" : "no"),
-        s_op->attr.group, translated_gid);
-    
-    switch(PINT_server_req_get_perms(s_op->req))
-    {
-        case PINT_SERVER_CHECK_WRITE:
-            js_p->error_code = PINT_check_mode(
-                &(s_op->attr), translated_uid,
-                translated_gid, PINT_ACCESS_WRITABLE);
-            break;
-        case PINT_SERVER_CHECK_READ:
-            js_p->error_code = PINT_check_mode(
-                &(s_op->attr), translated_uid,
-                translated_gid, PINT_ACCESS_READABLE);
-            break;
-        case PINT_SERVER_CHECK_CRDIRENT:
-            /* must also check executable after writable */
-            js_p->error_code = PINT_check_mode(
-                &(s_op->attr), translated_uid,
-                translated_gid, PINT_ACCESS_WRITABLE);
-            if(js_p->error_code == 0)
-            {
-                js_p->error_code = PINT_check_mode(
-                    &(s_op->attr), translated_uid,
-                    translated_gid, PINT_ACCESS_EXECUTABLE);
-            }
-            break;
-        case PINT_SERVER_CHECK_ATTR:
-            /* let datafiles pass through the attr check */
-            if (s_op->attr.objtype == PVFS_TYPE_DATAFILE)
-            {
-                js_p->error_code = 0;
-            }
-            /* for now we'll assume extended attribs are treated
-             * the same as regular attribs as far as permissions
-             */
-	    else if (s_op->req->op == PVFS_SERV_GETATTR ||
-                    s_op->req->op == PVFS_SERV_GETEATTR ||
-                    s_op->req->op == PVFS_SERV_LISTEATTR)
-	    {
-		/* getting or listing attributes is always ok -- permission
-		 * is checked on the parent directory at read time
-		 */
-		js_p->error_code = 0;
-	    }
-            else /* setattr, seteattr, seteattr_list */
-            {
-                /*
-                  NOTE: on other file systems, setattr doesn't
-                  seem to require read permissions by the user, group
-                  OR other, so long as the user or group matches (or
-                  is root)
-                */
-                if (((s_op->attr.mask & PVFS_ATTR_COMMON_UID) &&
-                     ((s_op->attr.owner == 0) ||
-                      (s_op->attr.owner == translated_uid))) ||
-                    (((s_op->attr.mask & PVFS_ATTR_COMMON_GID) &&
-                      ((s_op->attr.group == 0) ||
-                       (s_op->attr.group == translated_gid)))) ||
-                    (translated_uid == 0))
-                {
-                    js_p->error_code = 0;
-                }
-                else
-                {
-                    js_p->error_code = -PVFS_EACCES;
-                }
-            }
-            break;
-        case PINT_SERVER_CHECK_NONE:
-            if(squashed_flag &&
-               PINT_server_req_get_access_type(s_op->req) == PINT_SERVER_REQ_MODIFY &&
-               ((s_op->req->op == PVFS_SERV_IO) ||
-                (s_op->req->op == PVFS_SERV_SMALL_IO) ||
-                (s_op->req->op == PVFS_SERV_TRUNCATE)))
-            {
-                /* special case:
-                 * If we have been squashed, deny write permission to the
-                 * file system.  At the datafile level we don't have enough
-                 * attribute information to figure out if the nobody/guest
-                 * user has permission to write or not, so we disallow all
-                 * writes to be safe.  Not perfect semantics, but better
-                 * than being too permissive.
-                 */
-                skip_acl_flag = 1;
-                js_p->error_code = -PVFS_EACCES;
-            }
-            else
-            {
-                js_p->error_code = 0;
-            }
-            break;
-        case PINT_SERVER_CHECK_INVALID:
-            js_p->error_code = -PVFS_EINVAL;
-            break;
-    }
-
-    gossip_debug(
-        GOSSIP_PERMISSIONS_DEBUG, "Final permission check for \"%s\" set "
-        "error code to %d\n", PINT_map_server_op_to_string(s_op->req->op),
-        js_p->error_code);
-
-    gossip_debug(GOSSIP_SERVER_DEBUG, 
-        "(%p) %s (prelude sm) state: perm_check (status = %d)\n",
-	s_op,
-        PINT_map_server_op_to_string(s_op->req->op),
-	js_p->error_code);
-    /* If regular checks fail, we need to run acl checks */
-    if (js_p->error_code == -PVFS_EACCES && !skip_acl_flag)
-        js_p->error_code = PRELUDE_RUN_ACL_CHECKS;
-    return SM_ACTION_COMPLETE;
-}
-#endif
-
 /*
  * Return zero if this operation should be allowed.
  */
@@ -939,114 +496,6 @@ static int permit_operation(PVFS_fs_id f
     return 0;
 }
 
-/* Translate_ids will return 1 if it did some uid/gid squashing, 0 otherwise */
-static int translate_ids(PVFS_fs_id fsid, PVFS_uid uid, PVFS_gid gid, 
-    PVFS_uid *translated_uid, PVFS_gid *translated_gid, PVFS_BMI_addr_t client_addr)
-{
-    int exp_flags = 0;
-    struct server_configuration_s *serv_config = NULL;
-    struct filesystem_configuration_s * fsconfig = NULL;
-
-    serv_config = PINT_get_server_config();
-    fsconfig = PINT_config_find_fs_id(serv_config, fsid);
-
-    if (fsconfig == NULL)
-    {
-        return 0;
-    }
-    exp_flags = fsconfig->exp_flags;
-    /* If all squash was set */
-    if (exp_flags & TROVE_EXP_ALL_SQUASH)
-    {
-        if (iterate_all_squash_wildcards(fsconfig, client_addr) == 1)
-        {
-            get_anon_ids(fsconfig, translated_uid, translated_gid);
-            gossip_debug(GOSSIP_SERVER_DEBUG,
-                "Translated ids from <%u:%u> to <%u:%u>\n",
-                uid, gid, *translated_uid, *translated_gid);
-            return 1;
-        }
-    }
-    /* if only root squash was set translate uids for root alone*/
-    if (exp_flags & TROVE_EXP_ROOT_SQUASH)
-    {
-        if (uid == 0 || gid == 0)
-        {
-            if (iterate_root_squash_wildcards(fsconfig, client_addr) == 1)
-            {
-                get_anon_ids(fsconfig, translated_uid, translated_gid);
-                gossip_debug(GOSSIP_SERVER_DEBUG,
-                    "Translated ids from <%u:%u> to <%u:%u>\n",
-                    uid, gid, *translated_uid, *translated_gid);
-                return 1;
-            }
-        }
-    }
-    /* no such translation required! */
-    *translated_uid = uid;
-    *translated_gid = gid;
-    return 0;
-}
-
-static void get_anon_ids(struct filesystem_configuration_s *fsconfig,
-    PVFS_uid *uid, PVFS_gid *gid)
-{
-    *uid = fsconfig->exp_anon_uid;
-    *gid = fsconfig->exp_anon_gid;
-    return;
-}
-
-static int iterate_all_squash_wildcards(struct filesystem_configuration_s *fsconfig,
-    PVFS_BMI_addr_t client_addr)
-{
-    int i;
-
-    for (i = 0; i < fsconfig->all_squash_count; i++)
-    {
-        gossip_debug(GOSSIP_SERVER_DEBUG, "BMI_query_addr_range %lld, %s\n",
-            lld(client_addr), fsconfig->all_squash_hosts[i]);
-        if (BMI_query_addr_range(client_addr, fsconfig->all_squash_hosts[i],
-                fsconfig->all_squash_netmasks[i]) == 1)
-        {
-            return 1;
-        }
-    }
-    return 0;
-}
-
-static int iterate_root_squash_wildcards(struct filesystem_configuration_s *fsconfig,
-    PVFS_BMI_addr_t client_addr)
-{
-    int i;
-
-    /* check exceptions first */
-    for (i = 0; i < fsconfig->root_squash_exceptions_count; i++)
-    {
-        gossip_debug(GOSSIP_SERVER_DEBUG, "BMI_query_addr_range %lld, %s, netmask: %i\n",
-            lld(client_addr), fsconfig->root_squash_exceptions_hosts[i],
-            fsconfig->root_squash_exceptions_netmasks[i]);
-        if (BMI_query_addr_range(client_addr, fsconfig->root_squash_exceptions_hosts[i], 
-                fsconfig->root_squash_exceptions_netmasks[i]) == 1)
-        {
-            /* in the exception list, do not squash */
-            return 0;
-        }
-    }
-
-    for (i = 0; i < fsconfig->root_squash_count; i++)
-    {
-        gossip_debug(GOSSIP_SERVER_DEBUG, "BMI_query_addr_range %lld, %s, netmask: %i\n",
-            lld(client_addr), fsconfig->root_squash_hosts[i],
-            fsconfig->root_squash_netmasks[i]);
-        if (BMI_query_addr_range(client_addr, fsconfig->root_squash_hosts[i], 
-                fsconfig->root_squash_netmasks[i]) == 1)
-        {
-            return 1;
-        }
-    }
-    return 0;
-}
-
 static int iterate_ro_wildcards(struct filesystem_configuration_s *fsconfig, 
                                 PVFS_BMI_addr_t client_addr)
 {
@@ -1066,127 +515,6 @@ static int iterate_ro_wildcards(struct f
     }
     return 0;
 }
-
-PINT_sm_action prelude_check_acls_if_needed(
-    struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    int ret = -PVFS_EINVAL;
-    job_id_t i;
-
-    gossip_debug(GOSSIP_SERVER_DEBUG,
-                 "(%p) %s (prelude sm) state: prelude_check_acls_if_needed\n",
-                 s_op, PINT_map_server_op_to_string(s_op->req->op));
-
-    /* If we get here with an invalid fsid and handle, we have to
-     * return -PVFS_EACCESS 
-     */
-    if (s_op->target_fs_id == PVFS_FS_ID_NULL
-        || s_op->target_handle == PVFS_HANDLE_NULL)
-    {
-        js_p->error_code = -PVFS_EACCES;
-        return SM_ACTION_COMPLETE;
-    }
-    js_p->error_code = 0;
-
-    memset(&s_op->key, 0, sizeof(PVFS_ds_keyval));
-    memset(&s_op->val, 0, sizeof(PVFS_ds_keyval));
-    s_op->key.buffer = "system.posix_acl_access";
-    s_op->key.buffer_sz = strlen(s_op->key.buffer) + 1;
-    s_op->val.buffer = (char *) malloc(PVFS_REQ_LIMIT_VAL_LEN);
-    if (!s_op->val.buffer)
-    {
-        js_p->error_code = -PVFS_ENOMEM;
-        return SM_ACTION_COMPLETE;
-    }
-    s_op->val.buffer_sz = PVFS_REQ_LIMIT_VAL_LEN;
-
-    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "About to retrieve acl keyvals "
-                 "for handle %llu\n", llu(s_op->target_handle));
-
-    /* Read acl keys */
-    ret = job_trove_keyval_read(
-        s_op->target_fs_id,
-        s_op->target_handle,
-        &s_op->key,
-        &s_op->val,
-        0,
-        NULL,
-        smcb,
-        0,
-        js_p,
-        &i,
-        server_job_context);
-    return ret;
-}
-
-#if 0
-PINT_sm_action prelude_check_acls(
-    struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    PVFS_object_attr *obj_attr = NULL;
-    int want = 0;
-
-    /* The dspace attr must have been read at this point */
-    obj_attr = s_op->target_object_attr;
-    assert(obj_attr);
-
-    /* anything non-zero we treat as a real error */
-    if (js_p->error_code)
-    {
-        goto cleanup;
-    }
-    /* make sure that we hit here only for metafiles, dirs and symlink objects */
-    if (obj_attr->objtype != PVFS_TYPE_METAFILE
-        && obj_attr->objtype != PVFS_TYPE_DIRECTORY
-        && obj_attr->objtype != PVFS_TYPE_SYMLINK)
-    {
-        gossip_err("prelude_check_acls hit invalid object type %d\n",
-            obj_attr->objtype);
-        js_p->error_code = -PVFS_EINVAL;
-        goto cleanup;
-    }
-    switch (PINT_server_req_get_perms(s_op->req))
-    {
-        case PINT_SERVER_CHECK_WRITE:
-        default:
-            want = PVFS2_ACL_WRITE;
-            break;
-        case PINT_SERVER_CHECK_READ:
-            want = PVFS2_ACL_READ;
-            break;
-        case PINT_SERVER_CHECK_CRDIRENT:
-            want = PVFS2_ACL_WRITE | PVFS2_ACL_EXECUTE;
-            break;
-        case PINT_SERVER_CHECK_NONE:
-            want = 0;
-            break;
-        case PINT_SERVER_CHECK_INVALID:
-            js_p->error_code = -PVFS_EINVAL;
-            goto cleanup;
-    }
-    js_p->error_code = PINT_check_acls(s_op->val.buffer,
-                        s_op->val.read_sz,
-                        obj_attr, 
-                        s_op->req->credentials.uid,
-                        s_op->req->credentials.gid,
-                        want);
-cleanup:
-    gossip_debug(
-        GOSSIP_PERMISSIONS_DEBUG, "Final permission check (after acls) \"%s\" set "
-        "error code to %d (want %x)\n",
-            PINT_map_server_op_to_string(s_op->req->op),
-            js_p->error_code, want);
-
-    if (s_op->val.buffer) 
-        free(s_op->val.buffer);
-    memset(&s_op->key, 0, sizeof(PVFS_ds_keyval));
-    memset(&s_op->val, 0, sizeof(PVFS_ds_keyval));
-    return SM_ACTION_COMPLETE;
-}
-#endif 
-
 
 /*
  * Local variables:

Index: check.h
===================================================================
RCS file: /anoncvs/pvfs2/src/server/Attic/check.h,v
diff -p -u -r1.1.8.6 -r1.1.8.7
--- check.h	18 Jul 2008 18:45:07 -0000	1.1.8.6
+++ check.h	25 Aug 2009 17:56:25 -0000	1.1.8.7
@@ -6,35 +6,10 @@
 #include "pvfs2-attr.h"
 #include "pvfs2-server.h"
 
-enum PINT_access_type
-{
-    PINT_ACCESS_EXECUTABLE = 1,
-    PINT_ACCESS_WRITABLE = 2,
-    PINT_ACCESS_READABLE = 4,
-};
-
-int PINT_check_mode(
-    PVFS_object_attr *attr,
-    PVFS_uid uid, PVFS_gid gid,
-    enum PINT_access_type access_type);
-
-int PINT_check_acls(void *acl_buf, size_t acl_size, 
-    PVFS_object_attr *attr,
-    PVFS_uid uid, PVFS_gid gid, int want);
-
 int PINT_perm_check(struct PINT_server_op *s_op);
-    
-PINT_sm_action prelude_perm_check(
-    struct PINT_smcb *smcb, job_status_s *js_p);
-    
-PINT_sm_action prelude_check_acls_if_needed(
-    struct PINT_smcb *smcb, job_status_s *js_p);
-    
-PINT_sm_action prelude_check_acls(
-    struct PINT_smcb *smcb, job_status_s *js_p);
-    
-void PINT_getattr_check_perms(struct PINT_smcb *smcb, PVFS_uid uid, PVFS_gid *gid, uint32_t num_groups, 
-               PVFS_object_attr attr, uint32_t *op_mask);
+
+int PINT_get_capabilities(void *acl_buf, size_t acl_size, PVFS_uid userid,
+    PVFS_gid *group_array, uint32_t num_groups, const PVFS_object_attr *attr, 
+    uint32_t *op_mask);
     
 #endif  /* __CHECK_H */
-

Index: crdirent.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/crdirent.sm,v
diff -p -u -r1.69.2.4 -r1.69.2.5
--- crdirent.sm	29 Jul 2008 20:01:15 -0000	1.69.2.4
+++ crdirent.sm	25 Aug 2009 17:56:25 -0000	1.69.2.5
@@ -203,7 +203,7 @@ static PINT_sm_action crdirent_read_dire
         0,
         js_p,
         &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -280,7 +280,7 @@ static PINT_sm_action crdirent_write_dir
         s_op->u.crdirent.fs_id, s_op->u.crdirent.dirent_handle,
         &s_op->key, &s_op->val, 
         keyval_flags,
-        NULL, smcb, 0, js_p, &i, server_job_context);
+        NULL, smcb, 0, js_p, &i, server_job_context, s_op->req->hints);
 
     /*
      * creating an entry will cause directory times to be updated.
@@ -330,7 +330,7 @@ static PINT_sm_action crdirent_update_di
         s_op->req->u.crdirent.fs_id, s_op->req->u.crdirent.handle,
         ds_attr, 
         TROVE_SYNC,
-        smcb, 0, js_p, &j_id, server_job_context);
+        smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: create.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/create.sm,v
diff -p -u -r1.47.2.5 -r1.47.2.6
--- create.sm	12 Sep 2008 15:19:39 -0000	1.47.2.5
+++ create.sm	25 Aug 2009 17:56:25 -0000	1.47.2.6
@@ -12,8 +12,12 @@
 #include "pvfs2-attr.h"
 #include "gossip.h"
 #include "pvfs2-internal.h"
+#include "pint-util.h"
+#include "pint-cached-config.h"
 #include "pint-security.h"
 
+#define REPLACE_DONE 100
+
 %%
 
 machine pvfs2_create_sm
@@ -21,19 +25,94 @@ machine pvfs2_create_sm
     state prelude
     {
         jump pvfs2_prelude_sm;
-        success => create;
-        default => final_response;
+        success => create_metafile;
+        default => setup_final_response;
+    }
+
+    state create_metafile
+    {
+        run create_metafile;
+        success => check_stuffed;
+        default => setup_final_response;
+    }
+
+    state check_stuffed
+    {
+        run check_stuffed;
+        success => create_local_datafiles;
+        default => setup_final_response;
+    }
+
+    state create_local_datafiles
+    {
+        run create_local_datafiles;
+        success => setup_local_datafile_handles;
+        default => remove_metafile_object;
+    }
+
+    state setup_local_datafile_handles
+    {
+        run setup_local_datafile_handles;
+        success => request_datafiles;
+        default => remove_local_datafile_handles;
+    }
+
+    state request_datafiles
+    {
+        run request_datafiles;
+        success => write_keyvals;
+        default => remove_local_datafile_handles;
+    }
+
+    state write_keyvals
+    {
+        run write_keyvals;
+        success => setobj_attribs;
+        default => replace_remote_datafile_handles;
     }
 
-    state create
+    state setobj_attribs
     {
-        run create_create;
-        default => setup_resp;
+        run setattr_setobj_attribs;
+        success => setup_resp;
+        default => remove_keyvals;
     }
 
     state setup_resp
     {
-        run create_setup_resp;
+        run setup_resp;
+        default => setup_final_response;
+    }
+
+    state remove_local_datafile_handles
+    {
+        run remove_local_datafile_handles;
+        default => remove_metafile_object;
+    }
+
+    state replace_remote_datafile_handles
+    {
+        run replace_remote_datafile_handles;
+        REPLACE_DONE => remove_local_datafile_handles;
+        default => replace_remote_datafile_handles;
+    }
+
+    state remove_metafile_object
+    {
+        run remove_metafile_object;
+        default => setup_final_response;
+    }
+
+    state remove_keyvals
+    {
+        run remove_keyvals;
+        success => replace_remote_datafile_handles;
+        default => setup_final_response;
+    }
+
+    state setup_final_response
+    {
+        run setup_final_response;
         default => final_response;
     }
 
@@ -45,95 +124,649 @@ machine pvfs2_create_sm
 
     state cleanup
     {
-        run create_cleanup;
+        run cleanup;
         default => terminate;
     }
 }
 
 %%
 
-/*
- * Function: create_create
- *
- * Params:   server_op *s_op, 
- *           job_status_s* js_p
- *
- * Pre:      None
- *
- * Post:     None
- *
- * Returns:  int
- *
- * Synopsis: Create the new dataspace with the values provided in the response.
- *           
- */
-static int create_create(
+static int setup_final_response(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    /* retrieve original error code if present */
+    if(s_op->u.create.saved_error_code)
+    {
+        js_p->error_code = s_op->u.create.saved_error_code;
+    }
+
+    /* otherwise propigate the js_p->error code */
+    return(SM_ACTION_COMPLETE);
+}
+
+static int create_metafile(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     int ret = -1;
     job_id_t i;
+    PVFS_handle_extent_array meta_handle_ext_array;
+    server_configuration_s *config = get_server_config_struct();
+
+    ret = PINT_cached_config_get_server(
+        s_op->req->u.create.fs_id,
+        config->host_id,
+        PINT_SERVER_TYPE_META,
+        &meta_handle_ext_array);
 
     ret = job_trove_dspace_create(
         s_op->req->u.create.fs_id,
-        &s_op->req->u.create.handle_extent_array,
-        s_op->req->u.create.object_type,
+        &meta_handle_ext_array,
+        PVFS_TYPE_METAFILE,
         NULL,
-        TROVE_SYNC ,
+        0,
         smcb,
         0,
         js_p,
         &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return(ret);
 }
 
-/* create_setup_resp()
- *
- * fills in the response structure based on results of previous operation
- */
-static int create_setup_resp(
+static int check_stuffed(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int server_type;
+    server_configuration_s *config = get_server_config_struct();
+    struct filesystem_configuration_s *fs_conf;
+    PVFS_BMI_addr_t myaddr;
+    PVFS_sys_layout *layout;
+    int ret;
+    const char* svr_name;
+    int i;
+
+    s_op->resp.u.create.metafile_handle = js_p->handle;
+    gossip_debug(
+        GOSSIP_SERVER_DEBUG, "Metafile handle created: %llu\n",
+        llu(js_p->handle));
+
+    assert(config);
+
+    layout = &s_op->req->u.create.layout;
+
+    if(layout->algorithm == PVFS_SYS_LAYOUT_LIST)
+    {
+        for(i=0; i<layout->server_list.count; i++)
+        {
+            gossip_debug(GOSSIP_SERVER_DEBUG, "layout list server %d: %lld\n", 
+                i, lld(layout->server_list.servers[i])); 
+        }
+    }
+
+    fs_conf = PINT_config_find_fs_id(config, 
+        s_op->req->u.create.fs_id);
+    if(!fs_conf)
+    {
+        js_p->error_code = -PVFS_EINVAL;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    ret = BMI_addr_lookup(&myaddr, config->host_id);
+    if(ret != 0)
+    {
+        /* we can't get our own address? */
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* is this metadata server also IO? */
+    svr_name = PINT_cached_config_map_addr(s_op->req->u.create.fs_id,
+                                myaddr, &server_type);
+    if(!svr_name)
+    {
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* for now only support stuffing of ROUND_ROBIN layouts */
+    if((server_type & PINT_SERVER_TYPE_IO) && fs_conf->file_stuffing && layout->algorithm == PVFS_SYS_LAYOUT_ROUND_ROBIN)
+    {    
+        /* we can do a stuffed create here, only one datafile */
+        s_op->req->u.create.attr.u.meta.dfile_count = 1;
+        s_op->resp.u.create.datafile_count = 1;
+        s_op->resp.u.create.datafile_handles = malloc(sizeof(PVFS_handle));
+        s_op->u.create.handle_array_local = malloc(sizeof(PVFS_handle));
+        if(!s_op->resp.u.create.datafile_handles || !s_op->u.create.handle_array_local)
+        {
+            js_p->error_code = -PVFS_ENOMEM;
+            return SM_ACTION_COMPLETE;
+        }
+
+        s_op->resp.u.create.stuffed = 1;
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* file will not be stuffed; need to allocate all datafiles */
+    s_op->u.create.num_io_servers = s_op->req->u.create.num_dfiles_req;
+    s_op->resp.u.create.datafile_handles = malloc(
+        sizeof(*s_op->resp.u.create.datafile_handles) *
+        s_op->u.create.num_io_servers);
+    s_op->u.create.handle_array_local = malloc(
+        sizeof(*s_op->u.create.handle_array_local) *
+        s_op->u.create.num_io_servers);
+    s_op->u.create.handle_array_remote = malloc(
+        sizeof(*s_op->u.create.handle_array_remote) *
+        s_op->u.create.num_io_servers);
+    s_op->u.create.remote_io_servers = malloc(
+        sizeof(char*) *
+        s_op->u.create.num_io_servers);
+    if(!s_op->resp.u.create.datafile_handles || 
+        !s_op->u.create.handle_array_local ||
+        !s_op->u.create.handle_array_remote ||
+        !s_op->u.create.remote_io_servers)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* gather list of servers to use, may include local server */
+    ret = PINT_cached_config_get_server_list(
+        s_op->req->u.create.fs_id,
+        s_op->req->u.create.attr.u.meta.dist,
+        s_op->req->u.create.num_dfiles_req,
+        &s_op->req->u.create.layout,
+        &s_op->u.create.io_servers,
+        &s_op->u.create.num_io_servers);
+    if(ret < 0)
+    {
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    /* layout may have adjusted number of datafiles */
+    s_op->req->u.create.attr.u.meta.dfile_count
+        = s_op->u.create.num_io_servers;
+    s_op->resp.u.create.datafile_count 
+        = s_op->u.create.num_io_servers;
+    for(i=0; i<s_op->u.create.num_io_servers; i++)
+    {
+        gossip_debug(GOSSIP_SERVER_DEBUG, "io_server %d: %s\n", 
+            i, s_op->u.create.io_servers[i]); 
+    }
+
+    s_op->resp.u.create.stuffed = 0;
+    js_p->error_code = 0;
+    return SM_ACTION_COMPLETE;
+}
+
+static int create_local_datafiles(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    if (js_p->error_code == 0)
+    int ret = -1;
+    job_id_t tmp_id;
+    PVFS_handle_extent_array data_handle_ext_array;
+    server_configuration_s *config = get_server_config_struct();
+    int i;
+    int tmp_index = 0;
+
+    if(s_op->resp.u.create.stuffed)
     {
-	gossip_debug(GOSSIP_SERVER_DEBUG, "Handle created: %llu\n",
-                     llu(js_p->handle));
-	s_op->resp.u.create.handle = js_p->handle;
-        switch(s_op->req->u.create.object_type)
-        {
-            case PVFS_TYPE_NONE:
-                PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, 
-                    "new handle: %llu, type unknown.\n", llu(js_p->handle));
-                break;
-            case PVFS_TYPE_METAFILE:
-                PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, 
-                    "new handle: %llu, type metafile.\n", llu(js_p->handle));
-                break;
-            case PVFS_TYPE_DATAFILE:
-                PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG,    
-                    "new handle: %llu, type datafile.\n", llu(js_p->handle));
-                break;
-            case PVFS_TYPE_DIRECTORY:
-                PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, 
-                    "new handle: %llu, type directory.\n", llu(js_p->handle));
-                break;
-            case PVFS_TYPE_SYMLINK:
-                PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG,    
-                    "new handle: %llu, type symlink.\n", llu(js_p->handle));
-                break;
-            case PVFS_TYPE_DIRDATA:
-                PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG,    
-                    "new handle: %llu, type dirdata.\n", llu(js_p->handle));
-                break;
+        /* only one datafile, and it is local */
+        s_op->u.create.handle_array_local_count = 1;
+        s_op->u.create.handle_array_remote_count = 0;
+    }
+    else
+    {
+        /* figure out how many datafiles need to be local vs. remote */
+        s_op->u.create.handle_array_local_count = 0;
+        s_op->u.create.handle_array_remote_count = 0;
+        for(i=0; i<s_op->u.create.num_io_servers; i++)
+        {
+            if(!strcmp(s_op->u.create.io_servers[i], config->host_id))
+            {
+                s_op->u.create.handle_array_local_count++;
+            }
+            else
+            {
+                s_op->u.create.handle_array_remote_count++;
+                s_op->u.create.remote_io_servers[tmp_index] = 
+                    s_op->u.create.io_servers[i];
+                tmp_index++;
+            }
         }
     }
 
-    /* NOTE: we _deliberately_ leave the error_code unchanged so that it
-     * can be used by the next state.
+    gossip_debug(GOSSIP_SERVER_DEBUG, "creating %d local data files\n", 
+        s_op->u.create.handle_array_local_count);
+    gossip_debug(GOSSIP_SERVER_DEBUG, "creating %d remote data files\n", 
+        s_op->u.create.handle_array_remote_count);
+
+    if(s_op->u.create.handle_array_local_count == 0)
+    {
+        /* no local work to do */
+        js_p->error_code = 0;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    /* find local extent array */
+    ret = PINT_cached_config_get_server(
+        s_op->req->u.create.fs_id,
+        config->host_id,
+        PINT_SERVER_TYPE_IO,
+        &data_handle_ext_array);
+    if(ret < 0)
+    {
+        js_p->error_code = ret;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    /* deliberately not setting SYNC flag, because both the attrs and
+     * keyvals will be synced in later states
      */
+    ret = job_trove_dspace_create_list(
+        s_op->req->u.create.fs_id,
+        &data_handle_ext_array,
+        s_op->u.create.handle_array_local,
+        s_op->u.create.handle_array_local_count,
+        PVFS_TYPE_DATAFILE,
+        NULL,
+        0,
+        smcb,
+        0,
+        js_p,
+        &tmp_id,
+        server_job_context,
+        s_op->req->hints);
+
+    return(ret);
+}
+
+static PINT_sm_action request_datafiles(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -1;
+    job_id_t j_id;
+
+    if(s_op->u.create.handle_array_remote_count == 0)
+    {
+        js_p->error_code = 0;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    ret = job_precreate_pool_get_handles(
+        s_op->req->u.create.fs_id,
+        s_op->u.create.handle_array_remote_count,
+        s_op->u.create.remote_io_servers,
+        s_op->u.create.handle_array_remote,
+        0,
+        smcb,
+        0,
+        js_p,
+        &j_id,
+        server_job_context,
+        s_op->req->hints);
+    return ret;
+}
+
+static PINT_sm_action remove_metafile_object(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -1;
+    job_id_t j_id;
+
+    /* save the error code before we begin cleanup */
+    if(!s_op->u.create.saved_error_code)
+    {
+        s_op->u.create.saved_error_code = js_p->error_code;
+    }
+
+    free(s_op->resp.u.create.datafile_handles);
+
+    ret = job_trove_dspace_remove(
+        s_op->req->u.create.fs_id,
+        s_op->resp.u.create.metafile_handle,
+        0,
+        smcb,
+        0,
+        js_p,
+        &j_id,
+        server_job_context,
+        s_op->req->hints);
+    return ret;
+}
+
+static PINT_sm_action remove_local_datafile_handles(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -1;
+    job_id_t j_id;
+
+    /* save the error code before we begin cleanup */
+    if(!s_op->u.create.saved_error_code)
+    {
+        s_op->u.create.saved_error_code = js_p->error_code;
+    }
+
+    if(s_op->u.create.handle_array_local_count == 0)
+    {
+        /* nothing to do */
+        js_p->error_code = 0;
+        return(SM_ACTION_COMPLETE);
+    }
+
+    ret = job_trove_dspace_remove_list(s_op->req->u.create.fs_id,
+        s_op->u.create.handle_array_local,
+        NULL,
+        s_op->u.create.handle_array_local_count,
+        0,
+        smcb,
+        0,
+        js_p,
+        &j_id,
+        server_job_context,
+        s_op->req->hints);
+
+    return ret;
+}
+
+static PINT_sm_action replace_remote_datafile_handles(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -1;
+    job_id_t j_id;
+    PVFS_handle pool_handle;
+
+    /* save the error code before we begin cleanup */
+    if(!s_op->u.create.saved_error_code)
+    {
+        s_op->u.create.saved_error_code = js_p->error_code;
+    }
+
+    if(s_op->u.create.handle_index < s_op->u.create.handle_array_remote_count)
+    {
+        /* find pool that this handle belongs to */
+        ret = job_precreate_pool_lookup_server(
+            s_op->u.create.remote_io_servers[s_op->u.create.handle_index],
+            s_op->req->u.create.fs_id,
+            &pool_handle);
+        if(ret < 0)
+        {
+            s_op->u.create.handle_index++;
+            js_p->error_code = ret;
+            return(SM_ACTION_COMPLETE);
+        }
+
+        /* return handle to pool */
+        ret = job_precreate_pool_fill(
+            pool_handle,
+            s_op->req->u.create.fs_id,
+            &s_op->u.create.handle_array_remote[s_op->u.create.handle_index],
+            1,
+            smcb,
+            0,
+            js_p,
+            &j_id,
+            server_job_context,
+            s_op->req->hints);
+
+        s_op->u.create.handle_index++;
+        return(ret);
+    }
+    else
+    {
+        /* all handles have been replaced */
+        js_p->error_code = REPLACE_DONE;
+        return(SM_ACTION_COMPLETE);
+    }
+}
+
+static PINT_sm_action remove_keyvals(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -1;
+    job_id_t j_id;
+
+    /* save the error code before we begin cleanup */
+    if(!s_op->u.create.saved_error_code)
+    {
+        s_op->u.create.saved_error_code = js_p->error_code;
+    }
+
+    /* the keyval keys and vals should still be valid here */
+    ret = job_trove_keyval_remove_list(
+        s_op->req->u.create.fs_id,
+        s_op->resp.u.create.metafile_handle,
+        s_op->key_a, s_op->val_a, s_op->error_a,
+        2, TROVE_SYNC, NULL, smcb, 0, js_p, &j_id, server_job_context,
+        s_op->req->hints);
+
+    return ret;
+}
+
+static PINT_sm_action setup_local_datafile_handles(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int i;
+    int tmp_index = 0;
+    server_configuration_s *config = get_server_config_struct();
+
+    if(s_op->resp.u.create.stuffed)
+    {
+        s_op->resp.u.create.datafile_handles[0] = 
+            s_op->u.create.handle_array_local[0];
+        js_p->error_code = 0;
+        return(SM_ACTION_COMPLETE);
+    }
+    else
+    {
+        for(i=0; i<s_op->u.create.num_io_servers; i++)
+        {
+            /* find local server positions and set handles */
+            if(!strcmp(s_op->u.create.io_servers[i], config->host_id))
+            {
+                s_op->resp.u.create.datafile_handles[i] = 
+                    s_op->u.create.handle_array_local[tmp_index];
+                tmp_index++;
+            }
+        }
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action write_keyvals(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -1;
+    job_id_t j_id;
+    int i;
+    int tmp_index = 0;
+    server_configuration_s *config = get_server_config_struct();
+    char* tmpbuf;
+
+    if(s_op->u.create.handle_array_remote_count)
+    {
+        for(i=0; i<s_op->u.create.num_io_servers; i++)
+        {
+            /* find remote server positions and set handles */
+            if(strcmp(s_op->u.create.io_servers[i], config->host_id))
+            {
+                s_op->resp.u.create.datafile_handles[i] = 
+                    s_op->u.create.handle_array_remote[tmp_index];
+                tmp_index++;
+            }
+        }
+    }
+
+    /* start with 2 keyvals: the distribution and the datafile handles */
+    int keyval_count = 2;
+
+    if(s_op->resp.u.create.stuffed)
+    {
+        /* also need to set the layout as a keyval */
+        keyval_count+= 2;
+    }
+
+    s_op->key_a = malloc(sizeof(PVFS_ds_keyval) * keyval_count);
+    if(!s_op->key_a)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+
+    s_op->val_a = malloc(sizeof(PVFS_ds_keyval) * keyval_count);
+    if(!s_op->val_a)
+    {
+        free(s_op->key_a);
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+    memset(s_op->val_a, 0, sizeof(PVFS_ds_keyval) * keyval_count);
+
+    s_op->key_a[0].buffer = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
+    s_op->key_a[0].buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size;
+
+    s_op->val_a[0].buffer = s_op->resp.u.create.datafile_handles;
+    s_op->val_a[0].buffer_sz =
+        s_op->resp.u.create.datafile_count * sizeof(PVFS_handle);
+
+    s_op->key_a[1].buffer = Trove_Common_Keys[METAFILE_DIST_KEY].key;
+    s_op->key_a[1].buffer_sz = Trove_Common_Keys[METAFILE_DIST_KEY].size;
+
+    s_op->val_a[1].buffer_sz =
+        s_op->req->u.create.attr.u.meta.dist_size;
+    s_op->val_a[1].buffer = malloc(s_op->val_a[1].buffer_sz);
+    if(!s_op->val_a[1].buffer)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+    PINT_dist_encode(s_op->val_a[1].buffer,
+                     s_op->req->u.create.attr.u.meta.dist);
+
+    if(s_op->resp.u.create.stuffed)
+    {
+        s_op->key_a[2].buffer = Trove_Common_Keys[METAFILE_LAYOUT_KEY].key;
+        s_op->key_a[2].buffer_sz = Trove_Common_Keys[METAFILE_LAYOUT_KEY].size;
+
+        s_op->val_a[2].buffer = malloc(PVFS_REQ_LIMIT_LAYOUT);
+        if(!s_op->val_a[2].buffer)
+        {
+            js_p->error_code = -PVFS_ENOMEM;
+            return SM_ACTION_COMPLETE;
+        }
+        tmpbuf = s_op->val_a[2].buffer;
+        encode_PVFS_sys_layout(&tmpbuf, &s_op->req->u.create.layout);
+
+        s_op->val_a[2].buffer_sz = (tmpbuf - (char*)s_op->val_a[2].buffer);
+
+        gossip_debug(GOSSIP_SERVER_DEBUG, 
+            "create storing layout of size: %d\n", 
+            s_op->val_a[2].buffer_sz);
+
+        s_op->key_a[3].buffer = Trove_Common_Keys[NUM_DFILES_REQ_KEY].key;
+        s_op->key_a[3].buffer_sz = Trove_Common_Keys[NUM_DFILES_REQ_KEY].size;
+
+        gossip_debug(
+            GOSSIP_SERVER_DEBUG, "create storing NUM_DFILES_REQ_KEY value of %d\n",
+            s_op->req->u.create.num_dfiles_req);
+        s_op->val_a[3].buffer = &s_op->req->u.create.num_dfiles_req;
+        s_op->val_a[3].buffer_sz = sizeof(s_op->req->u.create.num_dfiles_req);
+    }
+
+    ret = job_trove_keyval_write_list(
+        s_op->req->u.create.fs_id,
+        s_op->resp.u.create.metafile_handle,
+        s_op->key_a, s_op->val_a,
+        keyval_count, TROVE_SYNC, NULL, smcb,
+        0, js_p, &j_id, server_job_context,
+        s_op->req->hints);
+    return ret;
+}
+
+static PINT_sm_action setattr_setobj_attribs(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret = -1;
+    job_id_t j_id;
+    PVFS_object_attr *a_p = NULL;
+    PVFS_object_attr *dspace_a_p = NULL;
+    PVFS_ds_attributes *ds_attr = NULL;
+
+    dspace_a_p = &s_op->attr;
+    a_p = &s_op->req->u.create.attr;
+
+     /* 
+      * Remember that mtime is versioned on disk! so convert it here..
+      * It is better to do it here than change the PVFS_object_attr_overwrite_setable
+      * macro, since there are many more users of it, I think.
+      */
+     if (a_p->mask & PVFS_ATTR_COMMON_MTIME_SET)
+     {
+         PVFS_time orig_mtime = a_p->mtime;
+         a_p->mtime = PINT_util_mktime_version(orig_mtime);
+         gossip_debug(GOSSIP_SETATTR_DEBUG, "setting version "
+                 "to %llu\n\tmtime is %llu\n",
+                 llu(a_p->mtime), llu(orig_mtime));
+     }
+
+     /*
+        we have the attribs stored in the dspace, as well as the
+        requested attribs to store.  overwrite the ones that are setable
+        and specified by the mask value in the request; macro defined in
+        pvfs2-storage.h
+        */
+     PVFS_object_attr_overwrite_setable(dspace_a_p, a_p);
+
+     gossip_debug(
+         GOSSIP_SERVER_DEBUG,
+         "[STUFFED CREATE]: WRITING attrs: [owner = %d, group = %d\n\t"
+         "perms = %o, type = %d, atime = %llu, mtime = %llu\n\t"
+         "ctime = %llu | dfile_count = %d | dist_size = %d]\n",
+         dspace_a_p->owner, dspace_a_p->group, dspace_a_p->perms,
+         dspace_a_p->objtype, llu(dspace_a_p->atime),
+         llu(PINT_util_mkversion_time(dspace_a_p->mtime)), llu(dspace_a_p->ctime),
+         (int)dspace_a_p->u.meta.dfile_count,
+         (int)dspace_a_p->u.meta.dist_size);
+
+     /* translate attrs to storage attr format */
+     ds_attr = &(s_op->ds_attr);
+     PVFS_object_attr_to_ds_attr(dspace_a_p, ds_attr);
+
+     ret = job_trove_dspace_setattr(
+         s_op->req->u.create.fs_id, s_op->resp.u.create.metafile_handle,
+         ds_attr,
+         TROVE_SYNC,
+         smcb, 0, js_p, &j_id, server_job_context,
+         s_op->req->hints);
+
+     return ret;
+}
+
+static int setup_resp(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    if (js_p->error_code == 0)
+    {
+        PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG,
+                          "create: new metadata handle: %llu.\n",
+                          llu(s_op->resp.u.create.metafile_handle));
+    }
+
     return SM_ACTION_COMPLETE;
 }
 
@@ -153,9 +786,54 @@ static int create_setup_resp(
  * Synopsis: free memory and return
  *           
  */
-static int create_cleanup(
+static int cleanup(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    if(s_op->key_a)
+    {
+        free(s_op->key_a);
+    }
+
+    if(s_op->val_a)
+    {
+        if(s_op->val_a[1].buffer)
+        {
+            free(s_op->val_a[1].buffer);
+        }
+        if(s_op->resp.u.create.stuffed && s_op->val_a[2].buffer)
+        {
+            free(s_op->val_a[2].buffer);
+        }
+        free(s_op->val_a);
+    }
+
+    if(s_op->resp.u.create.datafile_handles)
+    {
+        free(s_op->resp.u.create.datafile_handles);
+    }
+
+    if(s_op->u.create.handle_array_remote)
+    {
+        free(s_op->u.create.handle_array_remote);
+    }
+
+    if(s_op->u.create.handle_array_local)
+    {
+        free(s_op->u.create.handle_array_local);
+    }
+
+    if(s_op->u.create.io_servers)
+    {
+        free(s_op->u.create.io_servers);
+    }
+    
+    if(s_op->u.create.remote_io_servers)
+    {
+        free(s_op->u.create.remote_io_servers);
+    }
+
     return(server_state_machine_complete(smcb));
 }
 
@@ -165,22 +843,49 @@ static inline int PINT_get_object_ref_cr
     *fs_id = req->u.create.fs_id;
     *handle = PVFS_HANDLE_NULL;
     return 0;
-}
+};
 
 static int perm_create(PINT_server_op *s_op)
 {
-    int ret;
+    PVFS_object_attr *attr = &s_op->req->u.create.attr;
 
-    if (s_op->req->capability.op_mask & PINT_CAP_CREATE)
+    if (!(s_op->req->capability.op_mask & PINT_CAP_CREATE))
     {
-        ret = 0;
+        return -PVFS_EACCES;
     }
-    else
+
+    if (attr->mask & PVFS_ATTR_COMMON_UID)
     {
-        ret = -PVFS_EACCES;
+        if (!(s_op->req->capability.op_mask & PINT_CAP_ADMIN) &&
+            (s_op->req->u.create.credential.userid != attr->owner))
+        {
+            return -PVFS_EPERM;
+        }
     }
 
-    return ret;
+    if (attr->mask & PVFS_ATTR_COMMON_GID)
+    {
+        PVFS_credential *cred = &s_op->req->u.create.credential;
+        int i;
+
+        if (!(s_op->req->capability.op_mask & PINT_CAP_ADMIN))
+        {
+            for (i = 0; i < cred->num_groups; i++)
+            {
+                if (cred->group_array[i] == attr->group)
+                {
+                    break;
+                }
+            }
+            /* no group matches */
+            if (i >= cred->num_groups)
+            {
+                return -PVFS_EPERM;
+            }
+        }
+    }
+
+    return 0;
 }
 
 struct PINT_server_req_params pvfs2_create_params =

Index: del-eattr.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/del-eattr.sm,v
diff -p -u -r1.15.2.3 -r1.15.2.4
--- del-eattr.sm	13 Jun 2008 19:59:40 -0000	1.15.2.3
+++ del-eattr.sm	25 Aug 2009 17:56:26 -0000	1.15.2.4
@@ -167,7 +167,7 @@ static PINT_sm_action deleattr_delobj_ea
         0,
         js_p,
         &j_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: event-mon.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/event-mon.sm,v
diff -p -u -r1.10.2.2 -r1.10.2.3
--- event-mon.sm	13 Jun 2008 19:59:40 -0000	1.10.2.2
+++ event-mon.sm	25 Aug 2009 17:56:26 -0000	1.10.2.3
@@ -82,10 +82,6 @@ static PINT_sm_action event_mon_do_work(
     s_op->resp.u.mgmt_event_mon.event_count = 
 	s_op->req->u.mgmt_event_mon.event_count;
 
-    /* get events */
-    PINT_event_retrieve(s_op->resp.u.mgmt_event_mon.event_array,
-	s_op->req->u.mgmt_event_mon.event_count);
-
     js_p->error_code = 0;
     return SM_ACTION_COMPLETE;
 }

Index: final-response.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/final-response.sm,v
diff -p -u -r1.38 -r1.38.2.1
--- final-response.sm	11 Feb 2008 17:25:29 -0000	1.38
+++ final-response.sm	25 Aug 2009 17:56:27 -0000	1.38.2.1
@@ -136,7 +136,8 @@ static PINT_sm_action final_response_sen
         s_op->addr, s_op->encoded.buffer_list, s_op->encoded.size_list,
         s_op->encoded.list_count, s_op->encoded.total_size, s_op->tag,
         s_op->encoded.buffer_type, 0, smcb, 0, js_p, &tmp_id,
-        server_job_context, user_opts->server_job_bmi_timeout);
+        server_job_context, user_opts->server_job_bmi_timeout,
+        s_op->req->hints);
 
     return ret;
 }
@@ -224,6 +225,9 @@ static void PINT_gossip_err_server_resp(
                         break;
                     case PVFS_TYPE_DIRDATA:
                         gossip_err("DIRDATA [ n/a ]\n");
+                        break;
+                    case PVFS_TYPE_INTERNAL:
+                        gossip_err("INTERNAL [ n/a ]\n");
                         break;
                     case PVFS_TYPE_NONE:
                         gossip_err("NONE [ n/a ]\n");

Index: flush.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/flush.sm,v
diff -p -u -r1.23.2.2 -r1.23.2.3
--- flush.sm	13 Jun 2008 19:59:40 -0000	1.23.2.2
+++ flush.sm	25 Aug 2009 17:56:27 -0000	1.23.2.3
@@ -134,7 +134,7 @@ static PINT_sm_action flush_keyval_flush
         0,
         js_p,
         &tmp_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -185,7 +185,7 @@ static PINT_sm_action flush_bstream_flus
         0,
         js_p,
         &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: get-attr.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/get-attr.sm,v
diff -p -u -r1.93.2.23 -r1.93.2.24
--- get-attr.sm	8 Aug 2008 20:25:46 -0000	1.93.2.23
+++ get-attr.sm	25 Aug 2009 17:56:27 -0000	1.93.2.24
@@ -12,7 +12,7 @@
  * This state machine handles incoming server getattr operations.  These
  * are the operations sent by PVFS_sys_getattr() among others.
  *
- * The pvfs2_prelude_sm is responsible for reading the actual metadata
+ * The pvfs2_prelude_sm is respsonsible for reading the actual metadata
  * to begin with, because it does this as part of the permission checking
  * process.
  */
@@ -28,6 +28,7 @@
 #include "pvfs2-util.h"
 #include "pint-util.h"
 #include "pvfs2-internal.h"
+#include "pint-cached-config.h"
 #include "pint-security.h"
 #include "security-util.h"
 #include "check.h"
@@ -42,11 +43,12 @@ PINT_server_trove_keys_s Trove_Special_K
 
 enum
 {
-    STATE_METAFILE = 7,
-    STATE_SYMLINK  = 9,
-    STATE_DIR      = 10,
-    STATE_DIR_HINT = 11,
-    STATE_DONE     = 12
+    STATE_METAFILE   = 7,
+    STATE_SYMLINK    = 9,
+    STATE_DIR        = 10,
+    STATE_DIR_HINT   = 11,
+    STATE_DONE       = 12,
+    STATE_CAPABILITY = 13
 };
 
 static void free_nested_getattr_data(struct PINT_server_op *s_op);
@@ -61,13 +63,13 @@ nested machine pvfs2_get_attr_work_sm
         STATE_SYMLINK => read_symlink_target;
         STATE_METAFILE => read_metafile_hint;
         STATE_DIR => get_dirdata_handle;
-        default => setup_acl;
+        default => check_if_capability_required;
     }
 
     state read_symlink_target
     {
         run getattr_read_symlink_target;
-        default => setup_acl;
+        default => check_if_capability_required;
     }
 
     state read_metafile_hint
@@ -80,40 +82,60 @@ nested machine pvfs2_get_attr_work_sm
     {
         run getattr_interpret_metafile_hint;
         STATE_METAFILE => read_metafile_datafile_handles_if_required;
-        default => setup_acl;
+        default => check_if_capability_required;
     }
 
     state read_metafile_datafile_handles_if_required
     {
         run getattr_read_metafile_datafile_handles_if_required;
         success => datafile_handles_safety_check;
-        default => setup_acl;
+        default => check_if_capability_required;
     }
 
     state datafile_handles_safety_check
     {
         run getattr_datafile_handles_safety_check;
         success => read_metafile_distribution_if_required; 
-        default => setup_acl;
+        default => check_if_capability_required;
     }
-
+ 
     state read_metafile_distribution_if_required
     {
         run getattr_read_metafile_distribution_if_required;
-        default => distribution_safety_check;
+        default => interpret_metafile_distribution;
+    }
+
+    state interpret_metafile_distribution
+    {
+        run interpret_metafile_distribution;
+        success => detect_stuffed;
+        default => check_if_capability_required;
+    }
+
+    state detect_stuffed
+    {
+        run getattr_detect_stuffed;
+        default => read_stuffed_size;
     }
 
-    state distribution_safety_check
+    state read_stuffed_size
     {
-        run getattr_distribution_safety_check;
-        default => setup_acl;
+        run getattr_read_stuffed_size;
+        success => interpret_stuffed_size;
+        default => check_if_capability_required;
+    }
+
+    state interpret_stuffed_size
+    {
+        run getattr_interpret_stuffed_size;
+        default => check_if_capability_required;
     }
 
     state get_dirdata_handle
     {
         run getattr_get_dirdata_handle;
         success => get_dirent_count;
-        default => setup_acl;
+        default => check_if_capability_required;
     }
 
     state get_dirent_count
@@ -132,19 +154,32 @@ nested machine pvfs2_get_attr_work_sm
     state get_dir_hint
     {
         run getattr_get_dir_hint;
-        STATE_DONE => setup_resp;
+        STATE_DONE => check_if_capability_required;
         default => interpret_dir_hint;
     }
 
     state interpret_dir_hint
     {
         run getattr_interpret_dir_hint;
-        default => setup_acl;
+        default => check_if_capability_required;
     }
-    
-    state setup_acl
+
+    state check_if_capability_required
     {
-        run getattr_setup_acl;
+        run getattr_check_if_capability_required;
+        STATE_CAPABILITY => read_acl;
+        default => setup_resp;
+    }
+
+    state read_acl
+    {
+        run getattr_read_acl;
+        default => create_capability;
+    }
+
+    state create_capability
+    {
+        run getattr_create_capability;
         default => setup_resp;
     }
 
@@ -167,7 +202,8 @@ machine pvfs2_get_attr_sm
     state setup_op
     {
         run getattr_setup_op;
-        default => do_work;
+        success => do_work;
+        default => final_response;
     }
 
     state do_work
@@ -191,60 +227,6 @@ machine pvfs2_get_attr_sm
 
 %%
 
-
-/* getattr_setup_acl
- *
- * Grabs the ACL from trove when the client requests a capability
- */
-PINT_sm_action getattr_setup_acl(
-    struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    int ret = -PVFS_EINVAL;
-    job_id_t i;
-    js_p->error_code = 0;
-
-    gossip_debug(GOSSIP_SERVER_DEBUG,
-                 "(%p) %s (getattr sm) state: getattr_setup_acl\n",
-                 s_op, PINT_map_server_op_to_string(s_op->req->op));
-
-    if (s_op->u.getattr.attrmask & PVFS_ATTR_CAPABILITY)
-    {
-        memset(&s_op->key2, 0, sizeof(PVFS_ds_keyval));
-        memset(&s_op->val2, 0, sizeof(PVFS_ds_keyval));
-        s_op->key2.buffer = "system.posix_acl_access";
-        s_op->key2.buffer_sz = strlen(s_op->key2.buffer) + 1;
-        s_op->val2.buffer = (char *) malloc(PVFS_REQ_LIMIT_VAL_LEN);
-        if (!s_op->val2.buffer)
-        {
-            js_p->error_code = -PVFS_ENOMEM;
-            return SM_ACTION_COMPLETE;
-        }
-        s_op->val2.buffer_sz = PVFS_REQ_LIMIT_VAL_LEN;
-    
-        gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "About to retrieve acl keyvals "
-                     "for handle %llu\n", llu(s_op->target_handle));
-    
-        /* Read acl keys */
-        ret = job_trove_keyval_read(
-            s_op->target_fs_id,
-            s_op->target_handle,
-            &s_op->key2,
-            &s_op->val2,
-            0,
-            NULL,
-            smcb,
-            0,
-            js_p,
-            &i,
-            server_job_context);
-        return ret;
-    }
-    else 
-        return SM_ACTION_COMPLETE;
-}
-
-
 /* getattr_verify_attribs()
  *
  * We initialize the attribute mask that will be returned in this
@@ -324,128 +306,130 @@ static PINT_sm_action getattr_verify_att
       have the original client request attr mask
       (s_op->u.getattr.attrmask).
     */
-    if (resp_attr->objtype == PVFS_TYPE_METAFILE)
+    switch(resp_attr->objtype)
     {
-        PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: metafile\n");
-        gossip_debug(GOSSIP_GETATTR_DEBUG,
-                     "  Req handle %llu refers to a metafile\n",
-                     llu(s_op->u.getattr.handle));
-
-        if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES)
-        {
+        case PVFS_TYPE_METAFILE:
+            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: metafile\n");
             gossip_debug(GOSSIP_GETATTR_DEBUG,
-                         " dspace has dfile_count of %d\n",
-                         resp_attr->u.meta.dfile_count);
-            resp_attr->mask |= PVFS_ATTR_META_DFILES;
-        }
-        else
-        {
-            gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
-                         "dfile info, clearing response attr mask\n");
-            resp_attr->mask &= ~PVFS_ATTR_META_DFILES;
-        }
+                         "  Req handle %llu refers to a metafile\n",
+                         llu(s_op->u.getattr.handle));
 
-        if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
-        {
-            gossip_debug(GOSSIP_GETATTR_DEBUG,
-                         " dspace has dist size of %d\n",
-                         resp_attr->u.meta.dist_size);
+            if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES)
+            {
+                gossip_debug(GOSSIP_GETATTR_DEBUG,
+                             " dspace has dfile_count of %d\n",
+                             resp_attr->u.meta.dfile_count);
+                resp_attr->mask |= PVFS_ATTR_META_DFILES;
+            }
+            else
+            {
+                gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
+                             "dfile info, clearing response attr mask\n");
+                resp_attr->mask &= ~PVFS_ATTR_META_DFILES;
+            }
 
-            resp_attr->mask |= PVFS_ATTR_META_DIST;
-        }
-        else
-        {
-            gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
-                         "dist info, clearing response attr mask\n");
+            if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
+            {
+                gossip_debug(GOSSIP_GETATTR_DEBUG,
+                             " dspace has dist size of %d\n",
+                             resp_attr->u.meta.dist_size);
 
-            resp_attr->mask &= ~PVFS_ATTR_META_DIST;
-        }
-        js_p->error_code = STATE_METAFILE;
-    }
-    else if (resp_attr->objtype == PVFS_TYPE_DATAFILE)
-    {
-        PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: datafile\n");
-        /*
-          note: the prelude already retrieved the size for us, so
-          there's no special action that needs to be taken if we have
-          a datafile here (other than adjusting our mask to include
-          the data information and copying the retrieved size from the
-          ds_attribute the prelude used)
-        */
-        resp_attr->u.data.size = s_op->ds_attr.b_size;
-        resp_attr->mask |= PVFS_ATTR_DATA_ALL;
+                resp_attr->mask |= PVFS_ATTR_META_DIST;
+            }
+            else
+            {
+                gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
+                             "dist info, clearing response attr mask\n");
 
-    gossip_debug(GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
-                     "a datafile (size = %lld).\n",
-                     llu(s_op->u.getattr.handle),
-                     lld(resp_attr->u.data.size));
-    }
-    else if (resp_attr->objtype == PVFS_TYPE_DIRECTORY)
-    {
-        PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: directory\n");
-        if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
-        {
-            gossip_debug(GOSSIP_GETATTR_DEBUG,
-                         " getattr: dirent_count needed.\n");
-            assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
-            resp_attr->mask |= PVFS_ATTR_DIR_DIRENT_COUNT;
-            js_p->error_code = STATE_DIR;
-        }
-        else
-        {
-            gossip_debug(GOSSIP_GETATTR_DEBUG,
-                         " getattr: dirent_count not needed.\n");
-            js_p->error_code = 0;
-            assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
-        }
-        if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_HINT)
-        {
-            gossip_debug(GOSSIP_GETATTR_DEBUG,
-                        " getattr: dfile_count needed.\n");
-            assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
-            resp_attr->mask |= PVFS_ATTR_DIR_HINT;
-            js_p->error_code = STATE_DIR;
-        }
-        else
-        {
-            gossip_debug(GOSSIP_GETATTR_DEBUG,
-                        " getattr: dfile_count not needed\n");
+                resp_attr->mask &= ~PVFS_ATTR_META_DIST;
+            }
+            js_p->error_code = STATE_METAFILE;
+            break;
+        case PVFS_TYPE_DATAFILE:
+            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: datafile\n");
+            /*
+              note: the prelude already retrieved the size for us, so
+              there's no special action that needs to be taken if we have
+              a datafile here (other than adjusting our mask to include
+              the data information and copying the retrieved size from the
+              ds_attribute the prelude used)
+            */
+            resp_attr->u.data.size = s_op->ds_attr.u.datafile.b_size;
+            resp_attr->mask |= PVFS_ATTR_DATA_ALL;
+
+            gossip_debug(GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
+                         "a datafile (size = %lld).\n",
+                         llu(s_op->u.getattr.handle),
+                         lld(resp_attr->u.data.size));
+            break;
+        case PVFS_TYPE_DIRECTORY:
+            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: directory\n");
+            if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
+            {
+                gossip_debug(GOSSIP_GETATTR_DEBUG,
+                             " getattr: dirent_count needed.\n");
+                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
+                resp_attr->mask |= PVFS_ATTR_DIR_DIRENT_COUNT;
+                js_p->error_code = STATE_DIR;
+            }
+            else
+            {
+                gossip_debug(GOSSIP_GETATTR_DEBUG,
+                             " getattr: dirent_count not needed.\n");
+                js_p->error_code = 0;
+                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
+            }
+            if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_HINT)
+            {
+                gossip_debug(GOSSIP_GETATTR_DEBUG,
+                            " getattr: dfile_count needed.\n");
+                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
+                resp_attr->mask |= PVFS_ATTR_DIR_HINT;
+                js_p->error_code = STATE_DIR;
+            }
+            else
+            {
+                gossip_debug(GOSSIP_GETATTR_DEBUG,
+                            " getattr: dfile_count not needed\n");
+                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
+            }
+            break;
+        case PVFS_TYPE_DIRDATA:
+            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: dirdata\n");
+            gossip_debug(
+                GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
+                "a dirdata object. doing nothing special\n",
+                llu(s_op->u.getattr.handle));
             assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
-        }
-    }
-    else if (resp_attr->objtype == PVFS_TYPE_DIRDATA)
-    {
-        PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: dirdata\n");
-    gossip_debug(
-            GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
-            "a dirdata object. doing nothing special\n",
-            llu(s_op->u.getattr.handle));
-        assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
-    }
-    else if (resp_attr->objtype == PVFS_TYPE_SYMLINK)
-    {
-        PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
-    gossip_debug(
-            GOSSIP_GETATTR_DEBUG, "  handle %llu refers to a symlink.\n",
-            llu(s_op->u.getattr.handle));
+            break;
+        case PVFS_TYPE_SYMLINK:
+            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
+            gossip_debug(
+                GOSSIP_GETATTR_DEBUG, "  handle %llu refers to a symlink.\n",
+                llu(s_op->u.getattr.handle));
 
-        /*
-          we'll definitely have to fetch the symlink target in this
-          case, as the prelude will never retrieve it for us
-        */
-        js_p->error_code = STATE_SYMLINK;
-    }
-    else
-    {
-        /* if we don't understand the object type, then it probably indicates
-         * a bug or some data corruption.  All trove objects should have a
-         * type set.
-         */
-        gossip_err(
-            "Error: got unknown type when verifying attributes for handle %llu.\n", 
-            llu(s_op->u.getattr.handle));
-        js_p->error_code = -PVFS_ENXIO;
+            /*
+              we'll definitely have to fetch the symlink target in this
+              case, as the prelude will never retrieve it for us
+            */
+            js_p->error_code = STATE_SYMLINK;
+            break;
+        case PVFS_TYPE_INTERNAL:
+            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
+            /* nothing interesting to add; this is meaningless to a client */
+            break;
+        default:
+            /* if we don't understand the object type, then it probably indicates
+             * a bug or some data corruption.  All trove objects should have a
+             * type set.
+             */
+            gossip_err(
+                "Error: got unknown type when verifying attributes for handle %llu.\n", 
+                llu(s_op->u.getattr.handle));
+            js_p->error_code = -PVFS_ENXIO;
+            break;
     }
+
     return SM_ACTION_COMPLETE;
 }
 
@@ -475,11 +459,11 @@ static PINT_sm_action getattr_read_symli
 
     s_op->resp.u.getattr.attr.u.sym.target_path_len = PVFS_NAME_MAX;
     s_op->resp.u.getattr.attr.u.sym.target_path =
-    malloc(s_op->resp.u.getattr.attr.u.sym.target_path_len);
+	malloc(s_op->resp.u.getattr.attr.u.sym.target_path_len);
     if (!s_op->resp.u.getattr.attr.u.sym.target_path)
     {
-    js_p->error_code = -PVFS_ENOMEM;
-    return SM_ACTION_COMPLETE;
+	js_p->error_code = -PVFS_ENOMEM;
+	return SM_ACTION_COMPLETE;
     }
 
     if(s_op->free_val)
@@ -493,10 +477,11 @@ static PINT_sm_action getattr_read_symli
 
     ret = job_trove_keyval_read(
         s_op->u.getattr.fs_id, s_op->u.getattr.handle,
-        &(s_op->key), &(s_op->val), 
-        0, 
+        &s_op->key, &s_op->val,
+        0,
         NULL, smcb, 0, js_p,
-        &i, server_job_context);
+        &i, server_job_context, s_op->req->hints);
+
 
     return ret;
 }
@@ -535,7 +520,7 @@ static PINT_sm_action getattr_interpret_
         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
     }
-    return 1;
+    return SM_ACTION_COMPLETE;
 }
 
 static PINT_sm_action getattr_read_metafile_hint(
@@ -571,19 +556,19 @@ static PINT_sm_action getattr_read_metaf
     s_op->free_val = 1;
 
     gossip_debug(GOSSIP_GETATTR_DEBUG,
-         "  reading metafile hint (coll_id = %d, "
+		 "  reading metafile hint (coll_id = %d, "
                  "handle = %llu, key = %s (%d), val_buf = %p (%d))\n",
-         s_op->u.getattr.fs_id,
-         llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
-         s_op->key.buffer_sz, s_op->val.buffer,
-         s_op->val.buffer_sz);
+		 s_op->u.getattr.fs_id,
+		 llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
+		 s_op->key.buffer_sz, s_op->val.buffer,
+		 s_op->val.buffer_sz);
 
     ret = job_trove_keyval_read(
         s_op->u.getattr.fs_id, s_op->u.getattr.handle,
         &s_op->key, &s_op->val, 
         0, 
         NULL, smcb, 0, js_p,
-        &i, server_job_context);
+        &i, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -619,7 +604,7 @@ static PINT_sm_action getattr_read_metaf
     {
         gossip_err("The requested dfile count of %d is invalid; "
                    "aborting operation.\n", dfile_count);
-    gossip_err(
+	gossip_err(
             "+ attrs read from dspace: (owner = %d, group = %d, "
             "perms = %o, type = %d\n   atime = %lld, mtime = %lld, "
             "ctime = %lld |\n   dfile_count = %d | dist_size = %d)\n",
@@ -633,16 +618,16 @@ static PINT_sm_action getattr_read_metaf
             (int)s_op->resp.u.getattr.attr.u.meta.dfile_count,
             (int)s_op->resp.u.getattr.attr.u.meta.dist_size);
 
-    gossip_err("handle: %llu (%llx), fsid: %d.\n",
-        llu(s_op->u.getattr.handle), llu(s_op->u.getattr.handle),
-        (int)s_op->u.getattr.fs_id);
+	gossip_err("handle: %llu (%llx), fsid: %d.\n",
+	    llu(s_op->u.getattr.handle), llu(s_op->u.getattr.handle),
+	    (int)s_op->u.getattr.fs_id);
 
         /*If we hit an error the DIST & DFILES are no longer valid*/
         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
         
-    js_p->error_code = -PVFS_EOVERFLOW;
-    return SM_ACTION_COMPLETE;
+	js_p->error_code = -PVFS_EOVERFLOW;
+	return SM_ACTION_COMPLETE;
     }
 
     s_op->key.buffer = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
@@ -657,8 +642,8 @@ static PINT_sm_action getattr_read_metaf
     {
         gossip_err("Cannot allocate dfile array of count %d\n",
                    dfile_count);
-    js_p->error_code = -PVFS_ENOMEM;
-    return SM_ACTION_COMPLETE;
+	js_p->error_code = -PVFS_ENOMEM;
+	return SM_ACTION_COMPLETE;
     }
 
     if(s_op->free_val)
@@ -671,19 +656,19 @@ static PINT_sm_action getattr_read_metaf
     s_op->free_val = 0;
 
     gossip_debug(GOSSIP_GETATTR_DEBUG,
-         "  reading %d datafile handles (coll_id = %d, "
+		 "  reading %d datafile handles (coll_id = %d, "
                  "handle = %llu, key = %s (%d), val_buf = %p (%d))\n",
-         dfile_count, s_op->u.getattr.fs_id,
-         llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
-         s_op->key.buffer_sz, s_op->val.buffer,
-         s_op->val.buffer_sz);
+		 dfile_count, s_op->u.getattr.fs_id,
+		 llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
+		 s_op->key.buffer_sz, s_op->val.buffer,
+		 s_op->val.buffer_sz);
 
     ret = job_trove_keyval_read(
         s_op->u.getattr.fs_id, s_op->u.getattr.handle,
-        &s_op->key, &s_op->val, 
-        0, 
+        &s_op->key, &s_op->val,
+        0,
         NULL, smcb, 0, js_p,
-        &i, server_job_context);
+        &i, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -738,94 +723,315 @@ static PINT_sm_action getattr_read_metaf
     {
         gossip_err("Cannot allocate dist of size %d\n",
                    s_op->val.buffer_sz);
-    js_p->error_code = -PVFS_ENOMEM;
-    return SM_ACTION_COMPLETE;
+	js_p->error_code = -PVFS_ENOMEM;
+	return SM_ACTION_COMPLETE;
     }
     s_op->free_val = 1;
 
     ret = job_trove_keyval_read(
         s_op->u.getattr.fs_id, s_op->u.getattr.handle,
-        &(s_op->key), &(s_op->val), 
-        0, 
+        &(s_op->key), &(s_op->val),
+        0,
         NULL,
-        smcb, 0, js_p, &i, server_job_context);
+        smcb, 0, js_p, &i, server_job_context, s_op->req->hints);
 
     return ret;
 }
 
-static PINT_sm_action getattr_setup_resp(
+static PINT_sm_action getattr_read_stuffed_size(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t job_id;
+
+    if(js_p->error_code == -TROVE_ENOENT)
+    {
+        gossip_debug(
+            GOSSIP_GETATTR_DEBUG, "Getattr detected non-stuffed file.\n");
+        /* this means that the keyval fields used to indicate a file is
+         * stuffed are not present.  Set mask accordingly and continue.
+         */
+        s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_UNSTUFFED;
+        js_p->error_code = 0;
+        return SM_ACTION_COMPLETE;
+    }
+    if(js_p->error_code)
+    {
+        /* any other error code here is just a normal error case */
+        /* preserve error code and catch next error transition */
+        return SM_ACTION_COMPLETE;
+    }
+
+    gossip_debug(
+        GOSSIP_GETATTR_DEBUG, "Getattr detected stuffed file.\n");
+    /* otherwise, we found keyval fields indicating that the file is
+     * stuffed.  It does not matter if the client asked for the size or not;
+     * we must retrieve a valid stuffed_size value for the attrs.
+     */
+    s_op->resp.u.getattr.attr.mask &= (~(PVFS_ATTR_META_UNSTUFFED));
+
+    return(job_trove_dspace_getattr(
+        s_op->u.getattr.fs_id,
+        s_op->resp.u.getattr.attr.u.meta.dfile_array[0],
+        smcb,
+        &s_op->ds_attr,
+        0,
+        js_p,
+        &job_id,
+        server_job_context,
+        s_op->req->hints));
+}
+
+static PINT_sm_action getattr_interpret_stuffed_size(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    if(js_p->error_code == 0)
+    {
+        s_op->resp.u.getattr.attr.u.meta.stuffed_size = s_op->ds_attr.u.datafile.b_size;
+    }
+
+    /* deliberately leave error_code unchanged so that any errors get 
+     * handled in the next state
+     */
+    return SM_ACTION_COMPLETE;
+}
+
+
+/* interpret_metafile_distribution()
+ *
+ * capture and encode results of reading distribution
+ */
+static PINT_sm_action interpret_metafile_distribution(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
     PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
     
-    /* initial filling out and signing of capability structure */
-    if (s_op->u.getattr.attrmask & PVFS_ATTR_CAPABILITY)
+    if(js_p->error_code < 0)
     {
-    	resp_attr->mask |= PVFS_ATTR_CAPABILITY;
-        js_p->error_code = PINT_init_capability(&(resp_attr->capability));
-        if (js_p->error_code)
+        return SM_ACTION_COMPLETE;
+    }
+
+    if(s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
+    {
+        /* successfully read dist key; make sure we got something valid */
+        if(s_op->val.read_sz != s_op->val.buffer_sz)
         {
-            gossip_debug(GOSSIP_GETATTR_DEBUG,
-                "Getattr: Capability init failed\n");
+            gossip_err("Error: %s key found val size: %d when expecting val size: %d\n",
+                Trove_Common_Keys[METAFILE_DIST_KEY].key,
+                s_op->val.read_sz,
+                s_op->val.buffer_sz);
+
+            /* clear bitmask to prevent double free between setup_resp and
+             * PINT_free_object_attr()
+             */
+            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
+
+            js_p->error_code = -PVFS_EIO;
             return SM_ACTION_COMPLETE;
         }
-        if (s_op->resp.u.getattr.attr.u.meta.dfile_array)
-        {
-            /* Metafile is in position 0, datafiles if requested in the rest */
-            resp_attr->capability.num_handles = 
-                s_op->resp.u.getattr.attr.u.meta.dfile_count + 1;
-            resp_attr->capability.handle_array = (PVFS_handle *)malloc(
-                sizeof(PVFS_handle) * resp_attr->capability.num_handles);
-            if (resp_attr->capability.handle_array == NULL)
-            {
-                gossip_debug(GOSSIP_GETATTR_DEBUG,
-                    "Getattr: Malloc for handle array failed\n");
-                js_p->error_code = -PVFS_ENOMEM;
-                return SM_ACTION_COMPLETE;
-            }
-            memcpy(resp_attr->capability.handle_array + 1, 
-                s_op->resp.u.getattr.attr.u.meta.dfile_array,
-                sizeof(PVFS_handle) *
-                s_op->resp.u.getattr.attr.u.meta.dfile_count);
-        }
-        else
-        {
-            resp_attr->capability.handle_array = (PVFS_handle *)malloc(
-                sizeof(PVFS_handle));
-            if (resp_attr->capability.handle_array == NULL)
-            {
-                gossip_debug(GOSSIP_GETATTR_DEBUG,
-                    "Getattr: Malloc for handle array failed\n");
-                js_p->error_code = -PVFS_ENOMEM;
-                return SM_ACTION_COMPLETE;
-            }
-            resp_attr->capability.num_handles = 1;
-        }
-        resp_attr->capability.handle_array[0] = s_op->u.getattr.handle;
-        
-        /* TODO:  Optimize function for speed */
-        PINT_getattr_check_perms(smcb, s_op->req->u.getattr.credential.userid, 
-            s_op->req->u.getattr.credential.group_array, 
-            s_op->req->u.getattr.credential.num_groups, 
-            s_op->attr, &resp_attr->capability.op_mask);
-            
-        /* free ACL buffer */
-        free(s_op->val2.buffer);
-
-        resp_attr->capability.fsid = s_op->u.getattr.fs_id;
-            
-        if (PINT_sign_capability(&(resp_attr->capability)))
-        {
-            /* sign failed */
-            gossip_debug(GOSSIP_GETATTR_DEBUG,
-                "Getattr: Capability sign failed\n");
-            PINT_release_capability(&(resp_attr->capability));
-            js_p->error_code = -PVFS_EINVAL;
+
+        assert(s_op->val.buffer);
+        PINT_dist_decode(&resp_attr->u.meta.dist, s_op->val.buffer);
+
+        if(resp_attr->u.meta.dist == 0) {
+            gossip_err("Found dist of 0 for handle %llu,%d\n",
+                    llu(s_op->u.getattr.handle), s_op->u.getattr.fs_id);
+            PVFS_perror("Metafile getattr_setup_resp",js_p->error_code);
+            js_p->error_code = -PVFS_EIO;
             return SM_ACTION_COMPLETE;
         }
     }
 
+    js_p->error_code = 0;
+    return SM_ACTION_COMPLETE;
+}
+
+/* check_if_capability_required
+ *
+ * Branch point for capability creation.
+ */
+static PINT_sm_action getattr_check_if_capability_required(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    /* errors fall through */
+    if (js_p->error_code < 0)
+    {
+        return SM_ACTION_COMPLETE;
+    }
+
+    if (s_op->u.getattr.attrmask & PVFS_ATTR_CAPABILITY)
+    {
+        js_p->error_code = STATE_CAPABILITY;
+    }
+    else
+    {
+        js_p->error_code = 0;
+    }
+
+    return SM_ACTION_COMPLETE;
+}
+
+/* getattr_read_acl
+ * 
+ * Reads any ACL data from Trove if a capability is requested.
+ */
+static PINT_sm_action getattr_read_acl(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t tmp_id;
+    int ret;
+
+    gossip_debug(GOSSIP_SERVER_DEBUG,
+                 "(%p) %s (getattr sm) state: getattr_read_acl\n",
+                 s_op, PINT_map_server_op_to_string(s_op->req->op));
+
+    assert(s_op->u.getattr.attrmask & PVFS_ATTR_CAPABILITY);
+
+    /* nlmills: TODO: make key a constant */
+    s_op->key.buffer = "system.posix_acl_access";
+    s_op->key.buffer_sz = strlen(s_op->key.buffer) + 1;
+    if(s_op->free_val)
+    {
+        free(s_op->val.buffer);
+        s_op->free_val = 0;
+    }
+    s_op->val.buffer = malloc(PVFS_REQ_LIMIT_VAL_LEN);
+    if (!s_op->val.buffer)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+    s_op->val.buffer_sz = PVFS_REQ_LIMIT_VAL_LEN;
+    s_op->free_val = 1;
+    
+    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "About to retrieve acl keyvals "
+                     "for handle %llu\n", llu(s_op->target_handle));
+
+    ret = job_trove_keyval_read(
+        s_op->u.getattr.fs_id,
+        s_op->u.getattr.handle,
+        &s_op->key,
+        &s_op->val,
+        0,
+        NULL,
+        smcb,
+        0,
+        js_p,
+        &tmp_id,
+        server_job_context,
+        s_op->req->hints);
+
+    return ret;
+}
+
+/* getattr_create_capability
+ * 
+ * nlmills: TODO: document me
+ */
+static PINT_sm_action getattr_create_capability(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
+    void *acl_buf;
+    size_t acl_size;
+    int ret;
+
+    if (js_p->error_code == -TROVE_ENOENT)
+    {
+        acl_buf = NULL;
+        acl_size = 0;
+        js_p->error_code = 0;
+    }
+    else if (js_p->error_code < 0)
+    {
+        /* let fatal errors fall through */
+        return SM_ACTION_COMPLETE;
+    }
+    else
+    {
+        acl_buf = s_op->val.buffer;
+        acl_size = s_op->val.read_sz;
+    }
+
+    ret = PINT_init_capability(&resp_attr->capability);
+    if (ret < 0)
+    {
+        gossip_debug(GOSSIP_GETATTR_DEBUG,
+                     "getattr: capability init failed\n");
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    resp_attr->capability.num_handles = 1;
+
+    if ((resp_attr->objtype == PVFS_TYPE_METAFILE) &&
+        (resp_attr->mask & PVFS_ATTR_META_DFILES))
+    {
+        resp_attr->capability.num_handles += resp_attr->u.meta.dfile_count;
+    }
+
+    resp_attr->capability.handle_array = 
+        calloc(resp_attr->capability.num_handles, sizeof(PVFS_handle));
+    if (!resp_attr->capability.handle_array)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return SM_ACTION_COMPLETE;
+    }
+
+    resp_attr->capability.handle_array[0] = s_op->u.getattr.handle;
+    if ((resp_attr->objtype == PVFS_TYPE_METAFILE) &&
+        (resp_attr->mask & PVFS_ATTR_META_DFILES))
+    {
+        /* copy datafile handles after the metafile handle */
+        memcpy((resp_attr->capability.handle_array + 1),
+               resp_attr->u.meta.dfile_array,
+               resp_attr->u.meta.dfile_count * sizeof(PVFS_handle));
+    }
+
+    ret = PINT_get_capabilities(acl_buf,
+                                acl_size,
+                                s_op->u.getattr.credential.userid,
+                                s_op->u.getattr.credential.group_array,
+                                s_op->u.getattr.credential.num_groups,
+                                resp_attr,
+                                &resp_attr->capability.op_mask);
+    if (ret < 0)
+    {
+        PINT_cleanup_capability(&resp_attr->capability);
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+    
+    resp_attr->capability.fsid = s_op->u.getattr.fs_id;
+
+    ret = PINT_sign_capability(&resp_attr->capability);
+    if (ret < 0)
+    {
+        gossip_debug(GOSSIP_GETATTR_DEBUG,
+                     "getattr: failed to sign capability\n");
+        PINT_cleanup_capability(&resp_attr->capability);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+                                 
+    resp_attr->mask |= PVFS_ATTR_CAPABILITY;
+
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action getattr_setup_resp(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
+
     if(js_p->error_code > 0)
     {
         /* if we reach this state with a positive error code it means that
@@ -848,6 +1054,7 @@ static PINT_sm_action getattr_setup_resp
         resp_attr->owner, resp_attr->group, resp_attr->perms,
         resp_attr->objtype, llu(resp_attr->atime),
         llu(resp_attr->mtime), llu(resp_attr->ctime),
+
         (int)resp_attr->u.meta.dist_size);
 
     if (resp_attr->objtype == PVFS_TYPE_METAFILE)
@@ -865,18 +1072,7 @@ static PINT_sm_action getattr_setup_resp
 
         if (resp_attr->mask & PVFS_ATTR_META_DIST)
         {
-            assert(s_op->val.buffer);
-            PINT_dist_decode(&resp_attr->u.meta.dist, s_op->val.buffer);
-
-            if(resp_attr->u.meta.dist == 0) {
-                gossip_err("Found dist of 0 for handle %llu,%d\n",
-                        llu(s_op->u.getattr.handle), s_op->u.getattr.fs_id);
-                PVFS_perror("Metafile getattr_setup_resp",js_p->error_code);
-                free_nested_getattr_data(s_op);
-                js_p->error_code = -PVFS_EIO;
-                return SM_ACTION_COMPLETE;
-            }
-            
+            /* we have already gathered the dist field in an earlier state */
             gossip_debug(GOSSIP_GETATTR_DEBUG,
                          "  also returning dist size of %d\n",
                          resp_attr->u.meta.dist_size);
@@ -892,33 +1088,19 @@ static PINT_sm_action getattr_setup_resp
     else if ((resp_attr->objtype == PVFS_TYPE_SYMLINK) &&
              (resp_attr->mask & PVFS_ATTR_SYMLNK_TARGET))
     {
-        if (js_p->error_code == 0)
-        {
-            assert(resp_attr->u.sym.target_path);
-            assert(resp_attr->u.sym.target_path_len);
-            /*
-              adjust target path len down to actual size ; always
-              include the null termination char in the target_path_len
-            */
-            resp_attr->u.sym.target_path_len =
-                (strlen(resp_attr->u.sym.target_path) + 1);
-
-            gossip_debug(GOSSIP_GETATTR_DEBUG,
-                         "  also returning link target of %s (len %d)\n",
-                         resp_attr->u.sym.target_path,
-                         resp_attr->u.sym.target_path_len);
-        }
-        else
-        {
-            gossip_err("Failed to retrieve symlink target path for "
-                       "handle %llu,%d\n",llu(s_op->u.getattr.handle),
-                       s_op->u.getattr.fs_id);
-            PVFS_perror("Symlink retrieval failure",js_p->error_code);
-
-            free_nested_getattr_data(s_op);
-            js_p->error_code = -PVFS_EINVAL;
-            return SM_ACTION_COMPLETE;
-        }
+        assert(resp_attr->u.sym.target_path);
+        assert(resp_attr->u.sym.target_path_len);
+        /*
+          adjust target path len down to actual size ; always
+          include the null termination char in the target_path_len
+        */
+        resp_attr->u.sym.target_path_len =
+            (strlen(resp_attr->u.sym.target_path) + 1);
+        
+        gossip_debug(GOSSIP_GETATTR_DEBUG,
+                     "  also returning link target of %s (len %d)\n",
+                     resp_attr->u.sym.target_path,
+                     resp_attr->u.sym.target_path_len);
     }
     else if ((resp_attr->objtype == PVFS_TYPE_DIRECTORY) &&
             (resp_attr->mask & PVFS_ATTR_DIR_HINT))
@@ -934,46 +1116,23 @@ static PINT_sm_action getattr_setup_resp
             resp_attr->u.dir.hint.dist_params_len);
     }
 
+    if (resp_attr->mask & PVFS_ATTR_CAPABILITY)
+    {
+        gossip_debug(GOSSIP_GETATTR_DEBUG,
+                     "  also returning capability with mask %#x\n",
+                     resp_attr->capability.op_mask);
+    }
+
     gossip_debug(GOSSIP_GETATTR_DEBUG,"@ End %s attributes: sending "
                  "status %d (error = %d)\n",
                  PINT_util_get_object_type(resp_attr->objtype),
-         s_op->resp.status, js_p->error_code);
+		 s_op->resp.status, js_p->error_code);
 
 #if 0
     gossip_debug(GOSSIP_GETATTR_DEBUG, "returning attrmask ");
     PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,
                         s_op->resp.u.getattr.attr.mask);
 #endif
-    if (s_op->u.getattr.attrmask & PVFS_ATTR_CAPABILITY)
-    {
-        gossip_debug(GOSSIP_GETATTR_DEBUG,"Owner: %u\n", 
-            (unsigned int) resp_attr->capability.owner);
-        gossip_debug(GOSSIP_GETATTR_DEBUG,"Op mask: %d\n", 
-            resp_attr->capability.op_mask);
-        gossip_debug(GOSSIP_GETATTR_DEBUG,"Fsid: %d\n", 
-            resp_attr->capability.fsid);
-        gossip_debug(GOSSIP_GETATTR_DEBUG,"Handles: %d\n", 
-            resp_attr->capability.num_handles);
-        if (resp_attr->capability.num_handles > 0) 
-            gossip_debug(GOSSIP_GETATTR_DEBUG,"First handle: %llu\n", 
-                (long long unsigned int) resp_attr->capability.handle_array[0]);
-        gossip_debug(GOSSIP_GETATTR_DEBUG,"Sig size: %d\n", 
-            resp_attr->capability.sig_size);
-        if (resp_attr->capability.signature)
-        {
-            gossip_debug(GOSSIP_GETATTR_DEBUG,"Sig: %d%d%d%d%d%d%d%d%d%d\n", 
-                resp_attr->capability.signature[0],
-                resp_attr->capability.signature[1],
-                resp_attr->capability.signature[2],
-                resp_attr->capability.signature[3],
-                resp_attr->capability.signature[4],
-                resp_attr->capability.signature[5],
-                resp_attr->capability.signature[6],
-                resp_attr->capability.signature[7],
-                resp_attr->capability.signature[8],
-                resp_attr->capability.signature[9]);
-        }
-    }
 
     free_nested_getattr_data(s_op);
     return SM_ACTION_COMPLETE;
@@ -1011,7 +1170,9 @@ static PINT_sm_action getattr_cleanup(
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
 
+    PINT_cleanup_credential(&s_op->u.getattr.credential);
     PINT_free_object_attr(&s_op->resp.u.getattr.attr);
+
     return(server_state_machine_complete(smcb));
 }
 
@@ -1019,11 +1180,18 @@ static PINT_sm_action getattr_setup_op(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int ret;
+
+    memset(&s_op->resp.u.getattr.attr, 0, sizeof(PVFS_object_attr));
+
     s_op->u.getattr.handle = s_op->req->u.getattr.handle;
     s_op->u.getattr.fs_id = s_op->req->u.getattr.fs_id;
     s_op->u.getattr.attrmask = s_op->req->u.getattr.attrmask;
 
-    js_p->error_code = 0;
+    ret = PINT_copy_credential(&s_op->req->u.getattr.credential,
+                               &s_op->u.getattr.credential);
+
+    js_p->error_code = ret;
     return SM_ACTION_COMPLETE;
 }
 
@@ -1057,36 +1225,6 @@ static PINT_sm_action getattr_datafile_h
     return SM_ACTION_COMPLETE;
 }
 
-static PINT_sm_action getattr_distribution_safety_check(
-        struct PINT_smcb *smcb, job_status_s *js_p)
-{
-    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-
-    if((js_p->error_code == 0) &&
-        (s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST))
-    {
-        /* successfully read dist key; make sure we got something valid */
-        if(s_op->val.read_sz != s_op->val.buffer_sz)
-        {
-            gossip_err("Error: %s key found val size: %d when expecting val size: %d\n",
-                Trove_Common_Keys[METAFILE_DIST_KEY].key,
-                s_op->val.read_sz,
-                s_op->val.buffer_sz);
-
-            /* clear bitmask to prevent double free between setup_resp and
-             * PINT_free_object_attr()
-             */
-            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
-
-            js_p->error_code = -PVFS_EIO;
-            return SM_ACTION_COMPLETE;
-        }
-    }
-
-    /* otherwise deliberately preserve existing error code */
-    return SM_ACTION_COMPLETE;
-}
-
 static PINT_sm_action getattr_get_dirdata_handle(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
@@ -1113,7 +1251,7 @@ static PINT_sm_action getattr_get_dirdat
         0,
         js_p,
         &tmp_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -1143,7 +1281,7 @@ static PINT_sm_action getattr_get_dirent
         0,
         js_p,
         &tmp_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -1270,7 +1408,7 @@ static PINT_sm_action getattr_get_dir_hi
         s_op->u.getattr.handle,
         s_op->key_a, s_op->val_a, s_op->u.getattr.err_array, NUM_SPECIAL_KEYS,
         0, NULL, smcb, 0, js_p, &tmp_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -1351,6 +1489,39 @@ static PINT_sm_action getattr_interpret_
         js_p->error_code = 0;
     }
     return SM_ACTION_COMPLETE;
+}
+
+/* getattr_detect_stuffed()
+ *
+ * determine if a file is stuffed or not
+ */
+static PINT_sm_action getattr_detect_stuffed(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    job_id_t tmp_id;
+
+    /* we can determine stuffedness by the presence of the dfiles req key */
+
+    s_op->key.buffer = Trove_Common_Keys[NUM_DFILES_REQ_KEY].key;
+    s_op->key.buffer_sz = Trove_Common_Keys[NUM_DFILES_REQ_KEY].size;
+    if(s_op->free_val)
+    {
+        free(s_op->val.buffer);
+    }
+    s_op->val.buffer =  &s_op->u.getattr.num_dfiles_req;
+    s_op->val.buffer_sz =  sizeof(s_op->u.getattr.num_dfiles_req);
+    s_op->free_val = 0;
+
+    return(job_trove_keyval_read(
+        s_op->u.getattr.fs_id, 
+        s_op->u.getattr.handle,
+        &(s_op->key), 
+        &(s_op->val), 
+        0, 
+        NULL, smcb, 0, js_p,
+        &tmp_id, server_job_context,
+        s_op->req->hints));
 }
 
 static int perm_getattr(PINT_server_op *s_op)

Index: get-cred.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/Attic/get-cred.sm,v
diff -p -u -r1.1.2.9 -r1.1.2.10
--- get-cred.sm	7 Dec 2008 00:56:52 -0000	1.1.2.9
+++ get-cred.sm	25 Aug 2009 17:56:27 -0000	1.1.2.10
@@ -1,5 +1,5 @@
 /* 
- * Copyright 2008 Clemson University and The University of Chicago
+ * Copyright 2009 Clemson University and The University of Chicago
  *
  * See COPYING in top-level directory.
  */
@@ -17,7 +17,6 @@
 
 %%
 
-/* TODO: lots of work is done here. figure out a way to subdivide it */
 nested machine pvfs2_get_cred_work_sm
 {
     state verify_cert
@@ -151,7 +150,10 @@ static PINT_sm_action get_cred_make_cred
         return SM_ACTION_COMPLETE;
     }
 
-    /* TODO: remove the serial field if it is never used */
+    /* nlmills: TODO: implement fine-grained (per filesystem) security */
+    s_op->u.getcred.credential.fsid = s_op->target_fs_id;
+
+    /* nlmills: TODO: implement serial numbers */
     s_op->u.getcred.credential.serial = 0;
 
     return SM_ACTION_COMPLETE;
@@ -164,7 +166,7 @@ static PINT_sm_action get_cred_sign_cred
     int ret;
 
     ret = PINT_sign_credential(&s_op->u.getcred.credential);
-    if (ret == -1)
+    if (ret < 0)
     {
         gossip_err("Unable to sign credential\n");
         js_p->error_code = -PVFS_EINVAL;
@@ -193,6 +195,16 @@ static PINT_sm_action get_cred_cleanup(s
     return server_state_machine_complete(smcb);
 }
 
+static int get_object_ref_getcred(struct PVFS_server_req *req, 
+                                  PVFS_fs_id *fs_id, 
+                                  PVFS_handle *handle)
+{
+    *fs_id = req->u.getcred.fs_id;
+    *handle = PVFS_HANDLE_NULL;
+
+    return 0;
+}
+
 static int perm_getcred(PINT_server_op *s_op)
 {
     int ret;
@@ -205,6 +217,7 @@ static int perm_getcred(PINT_server_op *
 struct PINT_server_req_params pvfs2_get_cred_params =
 {
     .string_name = "getcred",
+    .get_object_ref = get_object_ref_getcred,
     .perm = perm_getcred,
     .access_type = PINT_server_req_readonly,
     .state_machine = &pvfs2_get_cred_sm

Index: get-eattr.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/get-eattr.sm,v
diff -p -u -r1.20.2.3 -r1.20.2.4
--- get-eattr.sm	13 Jun 2008 19:59:40 -0000	1.20.2.3
+++ get-eattr.sm	25 Aug 2009 17:56:28 -0000	1.20.2.4
@@ -167,7 +167,7 @@ static PINT_sm_action geteattr_read_eatt
         0,
         js_p,
         &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: io.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/io.sm,v
diff -p -u -r1.71.2.2 -r1.71.2.3
--- io.sm	13 Jun 2008 19:59:40 -0000	1.71.2.2
+++ io.sm	25 Aug 2009 17:56:28 -0000	1.71.2.3
@@ -103,7 +103,7 @@ static int io_send_ack(
      * failed to get the size, or failed for permission reasons
      */
     s_op->resp.status = js_p->error_code;
-    s_op->resp.u.io.bstream_size = s_op->ds_attr.b_size;
+    s_op->resp.u.io.bstream_size = s_op->ds_attr.u.datafile.b_size;
 
     err = PINT_encode(&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
                       s_op->addr, s_op->decoded.enc_type);
@@ -118,7 +118,8 @@ static int io_send_ack(
         s_op->addr, s_op->encoded.buffer_list, s_op->encoded.size_list,
         s_op->encoded.list_count, s_op->encoded.total_size,
         s_op->tag, s_op->encoded.buffer_type, 0, smcb, 0, js_p,
-        &tmp_id, server_job_context, user_opts->server_job_bmi_timeout);
+        &tmp_id, server_job_context, user_opts->server_job_bmi_timeout,
+        s_op->req->hints);
 
     return err;
 }
@@ -157,6 +158,8 @@ static PINT_sm_action io_start_flow(
         return SM_ACTION_COMPLETE;
     }
 
+    s_op->u.io.flow_d->hints = s_op->req->hints;
+
     /* we still have the file size stored in the response structure 
      * that we sent in the previous state, other details come from
      * request
@@ -234,7 +237,8 @@ static PINT_sm_action io_start_flow(
     }
 
     err = job_flow(s_op->u.io.flow_d, smcb, 0, js_p, &tmp_id,
-                   server_job_context, user_opts->server_job_flow_timeout);
+                   server_job_context, user_opts->server_job_flow_timeout
+                   , s_op->req->hints);
 
     return err;
 }
@@ -375,7 +379,8 @@ static PINT_sm_action io_send_completion
         s_op->addr, s_op->encoded.buffer_list, s_op->encoded.size_list,
         s_op->encoded.list_count, s_op->encoded.total_size, s_op->tag,
         s_op->encoded.buffer_type, 0, smcb, 0, js_p, &tmp_id,
-        server_job_context, user_opts->server_job_bmi_timeout);
+        server_job_context, user_opts->server_job_bmi_timeout,
+        s_op->req->hints);
 
     return err;
 }

Index: iterate-handles.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/iterate-handles.sm,v
diff -p -u -r1.11.2.2 -r1.11.2.3
--- iterate-handles.sm	13 Jun 2008 19:59:40 -0000	1.11.2.2
+++ iterate-handles.sm	25 Aug 2009 17:56:28 -0000	1.11.2.3
@@ -91,18 +91,48 @@ static PINT_sm_action iterate_handles_do
     s_op->resp.u.mgmt_iterate_handles.position
 	= s_op->req->u.mgmt_iterate_handles.position;
 
-    ret = job_trove_dspace_iterate_handles(
-	s_op->req->u.mgmt_iterate_handles.fs_id,
-	s_op->resp.u.mgmt_iterate_handles.position,
-	s_op->resp.u.mgmt_iterate_handles.handle_array,
-	s_op->req->u.mgmt_iterate_handles.handle_count,
-	0,
-	NULL,
-	smcb,
-	0,
-	js_p,
-	&tmp_id,
-	server_job_context);
+    if(s_op->req->u.mgmt_iterate_handles.flags == PVFS_MGMT_RESERVED)
+    {
+        /* for now the only special case reserved handles are those that are
+         * allocated by precreate
+         */
+        ret = job_precreate_pool_iterate_handles(
+            s_op->req->u.mgmt_iterate_handles.fs_id,
+            s_op->resp.u.mgmt_iterate_handles.position,
+            s_op->resp.u.mgmt_iterate_handles.handle_array,
+            s_op->req->u.mgmt_iterate_handles.handle_count,
+            0,
+            NULL,
+            smcb,
+            0,
+            js_p,
+            &tmp_id,
+            server_job_context,
+            s_op->req->hints);
+    }
+    else if(s_op->req->u.mgmt_iterate_handles.flags == 0)
+    {
+        ret = job_trove_dspace_iterate_handles(
+            s_op->req->u.mgmt_iterate_handles.fs_id,
+            s_op->resp.u.mgmt_iterate_handles.position,
+            s_op->resp.u.mgmt_iterate_handles.handle_array,
+            s_op->req->u.mgmt_iterate_handles.handle_count,
+            0,
+            NULL,
+            smcb,
+            0,
+            js_p,
+            &tmp_id,
+            server_job_context);
+    }
+    else
+    {
+        gossip_err("Error: unsupported mgmt_iterate_handles flags: %d\n",
+            s_op->req->u.mgmt_iterate_handles.flags);
+        js_p->error_code = -PVFS_EINVAL;
+        return SM_ACTION_COMPLETE;
+    }
+
     if (ret < 0)
         return ret;  /* error */
     if (ret == 1)

Index: list-attr.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/list-attr.sm,v
diff -p -u -r1.10.2.4 -r1.10.2.5
--- list-attr.sm	14 Jun 2008 22:44:45 -0000	1.10.2.4
+++ list-attr.sm	25 Aug 2009 17:56:28 -0000	1.10.2.5
@@ -114,7 +114,8 @@ static PINT_sm_action listattr_read_basi
             0,
             js_p,
             &tmp_id,
-            server_job_context);
+            server_job_context,
+            s_op->req->hints);
 
     return ret;
 }
@@ -273,7 +274,6 @@ static PINT_sm_action listattr_cleanup(s
 
     return(server_state_machine_complete(smcb));
 }
-
 
 static inline int PINT_get_object_ref_listattr(
     struct PVFS_server_req *req, PVFS_fs_id *fs_id, PVFS_handle *handle)

Index: list-eattr.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/list-eattr.sm,v
diff -p -u -r1.13.2.4 -r1.13.2.5
--- list-eattr.sm	14 Jun 2008 22:44:45 -0000	1.13.2.4
+++ list-eattr.sm	25 Aug 2009 17:56:28 -0000	1.13.2.5
@@ -142,7 +142,7 @@ static PINT_sm_action listeattr_list_eat
         0,
         js_p,
         &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: lookup.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/lookup.sm,v
diff -p -u -r1.57.2.11 -r1.57.2.12
--- lookup.sm	31 Jul 2008 14:41:23 -0000	1.57.2.11
+++ lookup.sm	25 Aug 2009 17:56:28 -0000	1.57.2.12
@@ -135,7 +135,7 @@ static PINT_sm_action lookup_read_direct
     ret = job_trove_keyval_read(
         s_op->req->u.lookup_path.fs_id, handle, &s_op->key, &s_op->val,
         0, 
-        NULL, smcb, 0, js_p, &j_id, server_job_context);
+        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -179,27 +179,12 @@ static PINT_sm_action lookup_read_direct
         &s_op->key, &s_op->val, 
         0, 
         NULL, smcb, 0, js_p, &j_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
 
 
-/*
- * Function: lookup_cleanup
- *
- * Synopsis: Free memory allocated during request processing.
- *
- * There are a bunch of regions that must be freed after processing
- * completes:
- * - decoded request (s_op->decoded)
- * - encoded request (s_op->unexp_bmi_buff.buffer)
- * - encoded response (s_op->encoded)
- * - original (decoded) response (s_op->resp)
- * - dynamically allocated space (in this case 
- *   s_op->resp.u.lookup_path.handle_array)
- * - the server operation structure itself
- */
 static PINT_sm_action lookup_cleanup(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {

Index: mgmt-get-dirdata-handle.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/mgmt-get-dirdata-handle.sm,v
diff -p -u -r1.10.2.3 -r1.10.2.4
--- mgmt-get-dirdata-handle.sm	29 Jul 2008 22:29:38 -0000	1.10.2.3
+++ mgmt-get-dirdata-handle.sm	25 Aug 2009 17:56:28 -0000	1.10.2.4
@@ -82,7 +82,7 @@ static int mgmt_get_dirdata_handle_from_
         &s_op->key, &s_op->val, 
         0, 
         NULL, smcb, 0, js_p, &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: mgmt-remove-dirent.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/mgmt-remove-dirent.sm,v
diff -p -u -r1.14.2.3 -r1.14.2.4
--- mgmt-remove-dirent.sm	29 Jul 2008 22:29:38 -0000	1.14.2.3
+++ mgmt-remove-dirent.sm	25 Aug 2009 17:56:28 -0000	1.14.2.4
@@ -83,7 +83,7 @@ static int mgmt_remove_dirent_get_dirdat
         &s_op->key, &s_op->val, 
         0, 
         NULL, smcb, 0, js_p, &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -115,7 +115,7 @@ static PINT_sm_action mgmt_remove_dirent
         0,
         js_p,
         &j_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: mgmt-remove-object.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/mgmt-remove-object.sm,v
diff -p -u -r1.14.2.3 -r1.14.2.4
--- mgmt-remove-object.sm	29 Jul 2008 22:29:38 -0000	1.14.2.3
+++ mgmt-remove-object.sm	25 Aug 2009 17:56:28 -0000	1.14.2.4
@@ -67,7 +67,7 @@ static PINT_sm_action mgmt_remove_dspace
         0,
         js_p,
         &j_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: mkdir.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/mkdir.sm,v
diff -p -u -r1.51.2.2 -r1.51.2.3
--- mkdir.sm	13 Jun 2008 19:49:58 -0000	1.51.2.2
+++ mkdir.sm	25 Aug 2009 17:56:28 -0000	1.51.2.3
@@ -125,7 +125,7 @@ static PINT_sm_action mkdir_create(
         PVFS_TYPE_DIRECTORY, NULL,
         TROVE_SYNC, 
         smcb, 0, js_p, &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -166,7 +166,7 @@ static PINT_sm_action mkdir_setattrib(
         s_op->u.mkdir.fs_id, s_op->resp.u.mkdir.handle,
         ds_attr, 
         TROVE_SYNC,
-        smcb, 0, js_p, &j_id, server_job_context);
+        smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -198,7 +198,7 @@ static PINT_sm_action mkdir_create_dirda
         s_op->u.mkdir.fs_id, &extent_array, PVFS_TYPE_DIRDATA, NULL,
         TROVE_SYNC,
         smcb, 0, js_p, &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     free(extent_array.extent_array);
     extent_array.extent_array = NULL;
@@ -243,7 +243,7 @@ static PINT_sm_action mkdir_write_dirdat
         s_op->u.mkdir.fs_id, s_op->resp.u.mkdir.handle,
         &s_op->key, &s_op->val, 
         0,
-        NULL, smcb, 0, js_p, &i, server_job_context);
+        NULL, smcb, 0, js_p, &i, server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: module.mk.in
===================================================================
RCS file: /anoncvs/pvfs2/src/server/module.mk.in,v
diff -p -u -r1.53.2.1 -r1.53.2.2
--- module.mk.in	2 Dec 2008 03:35:52 -0000	1.53.2.1
+++ module.mk.in	25 Aug 2009 17:56:28 -0000	1.53.2.2
@@ -9,6 +9,8 @@ ifdef BUILD_SERVER
 		$(DIR)/setparam.c \
 		$(DIR)/lookup.c \
 		$(DIR)/create.c \
+		$(DIR)/batch-create.c \
+		$(DIR)/batch-remove.c \
 		$(DIR)/crdirent.c \
 		$(DIR)/set-attr.c \
 		$(DIR)/mkdir.c \
@@ -29,7 +31,6 @@ ifdef BUILD_SERVER
 		$(DIR)/final-response.c \
 		$(DIR)/perf-update.c \
 		$(DIR)/perf-mon.c \
-		$(DIR)/event-mon.c \
 		$(DIR)/iterate-handles.c \
 		$(DIR)/job-timer.c \
 		$(DIR)/proto-error.c \
@@ -41,6 +42,8 @@ ifdef BUILD_SERVER
 		$(DIR)/del-eattr.c \
 		$(DIR)/list-eattr.c \
 		$(DIR)/unexpected.c \
+		$(DIR)/precreate-pool-refiller.c \
+		$(DIR)/unstuff.c \
 		$(DIR)/get-cred.c
 
 	# c files that should be added to server library

Index: prelude.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/prelude.sm,v
diff -p -u -r1.74.2.8 -r1.74.2.9
--- prelude.sm	28 Jul 2008 20:50:55 -0000	1.74.2.8
+++ prelude.sm	25 Aug 2009 17:56:29 -0000	1.74.2.9
@@ -17,6 +17,7 @@
 #include "pint-perf-counter.h"
 #include "check.h"
 
+/* nlmills: TODO: fix comment */
 /* prelude state machine:
  * This is a nested state machine that performs initial setup 
  * steps that are common to many server operations.
@@ -142,7 +143,7 @@ static PINT_sm_action prelude_getattr_if
 
     ret = job_trove_dspace_getattr(
         s_op->target_fs_id, s_op->target_handle, smcb, &(s_op->ds_attr),
-        0, js_p, &tmp_id, server_job_context);
+        0, js_p, &tmp_id, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -161,6 +162,61 @@ static PINT_sm_action prelude_validate(s
     s_op->attr.mask = PVFS_ATTR_COMMON_ALL;
     s_op->target_object_attr = &s_op->attr;
 
+    gossip_debug(GOSSIP_SECURITY_DEBUG, "Received capability with: \n"
+                 "issuer: %s\n"
+                 "fsid: %u\n"
+                 "sig_size: %u\n"
+                 "signature: %p\n"
+                 "timeout: %d\n"
+                 "op_mask: %x\n"
+                 "num_handles: %u\n"
+                 "handle_array: %p\n",
+                 s_op->req->capability.issuer,
+                 s_op->req->capability.fsid,
+                 s_op->req->capability.sig_size,
+                 s_op->req->capability.signature,
+                 (int)s_op->req->capability.timeout,
+                 s_op->req->capability.op_mask,
+                 s_op->req->capability.num_handles,
+                 s_op->req->capability.handle_array
+                 );
+
+    /* nlmills: TODO: find a more secure way to verify credential */
+    if (s_op->op == PVFS_SERV_CREATE)
+    {
+        ret = PINT_verify_credential(&s_op->req->u.create.credential);
+    }
+    else if (s_op->op == PVFS_SERV_MKDIR)
+    {
+        ret = PINT_verify_credential(&s_op->req->u.mkdir.credential);
+    }
+    else if (s_op->op == PVFS_SERV_GETATTR)
+    {
+        ret = PINT_verify_credential(&s_op->req->u.getattr.credential);
+    }
+    else if (s_op->op == PVFS_SERV_SETATTR)
+    {
+        ret = PINT_verify_credential(&s_op->req->u.setattr.credential);
+    }
+    else if (s_op->op == PVFS_SERV_UNSTUFF)
+    {
+        ret = PINT_verify_credential(&s_op->req->u.unstuff.credential);
+    }
+    else
+    {
+        /* pass through all ops that don't take a credential */
+        ret = 1;
+    }
+
+    if (!ret)
+    {
+        gossip_debug(GOSSIP_SECURITY_DEBUG, 
+                     "Credential failed verification.\n");
+        /* nlmills: TODO: find a better error code */
+        js_p->error_code = -PVFS_EAGAIN;
+        return SM_ACTION_COMPLETE;
+    }
+    
     ret = PINT_verify_capability(&s_op->req->capability);
     if (ret)
     {
@@ -168,8 +224,10 @@ static PINT_sm_action prelude_validate(s
     }
     else
     {
-        /* TODO: find a better way to the client 'try again' */
-        ret = -PVFS_EAGAIN;
+        /* nlmills: TODO: find a better way to the client 'try again' */
+        gossip_debug(GOSSIP_SECURITY_DEBUG, "Capability failed verification.\n");
+        js_p->error_code = -PVFS_EAGAIN;
+        return SM_ACTION_COMPLETE;
     }
 
     js_p->error_code = ret;

Index: proto-error.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/proto-error.sm,v
diff -p -u -r1.8.2.3 -r1.8.2.4
--- proto-error.sm	14 Jun 2008 22:44:45 -0000	1.8.2.3
+++ proto-error.sm	25 Aug 2009 17:56:29 -0000	1.8.2.4
@@ -45,8 +45,6 @@ static PINT_sm_action proto_error_init(
     job_id_t tmp_id;
     struct server_configuration_s *user_opts = get_server_config_struct();
 
-    BMI_set_info(s_op->addr, BMI_INC_ADDR_REF, NULL);
-
     s_op->resp.op = PVFS_SERV_PROTO_ERROR;
     s_op->resp.status = -PVFS_EPROTO;
 
@@ -80,7 +78,7 @@ static PINT_sm_action proto_error_init(
 			    js_p,
 			    &tmp_id,
 			    server_job_context,
-			    user_opts->server_job_bmi_timeout);
+			    user_opts->server_job_bmi_timeout, s_op->req->hints);
 
     return ret;
 }

Index: pvfs2-server-req.c
===================================================================
RCS file: /anoncvs/pvfs2/src/server/pvfs2-server-req.c,v
diff -p -u -r1.3.4.2 -r1.3.4.3
--- pvfs2-server-req.c	2 Dec 2008 03:35:52 -0000	1.3.4.2
+++ pvfs2-server-req.c	25 Aug 2009 17:56:29 -0000	1.3.4.3
@@ -35,7 +35,6 @@ extern struct PINT_server_req_params pvf
 extern struct PINT_server_req_params pvfs2_job_timer_params;
 extern struct PINT_server_req_params pvfs2_proto_error_params;
 extern struct PINT_server_req_params pvfs2_perf_mon_params;
-extern struct PINT_server_req_params pvfs2_event_mon_params;
 extern struct PINT_server_req_params pvfs2_iterate_handles_params;
 extern struct PINT_server_req_params pvfs2_get_eattr_params;
 extern struct PINT_server_req_params pvfs2_get_eattr_list_params;
@@ -43,6 +42,11 @@ extern struct PINT_server_req_params pvf
 extern struct PINT_server_req_params pvfs2_set_eattr_list_params;
 extern struct PINT_server_req_params pvfs2_del_eattr_params;
 extern struct PINT_server_req_params pvfs2_list_eattr_params;
+extern struct PINT_server_req_params pvfs2_batch_create_params;
+extern struct PINT_server_req_params pvfs2_batch_remove_params;
+extern struct PINT_server_req_params pvfs2_unstuff_params;
+extern struct PINT_server_req_params pvfs2_stuffed_create_params;
+extern struct PINT_server_req_params pvfs2_precreate_pool_refiller_params;
 extern struct PINT_server_req_params pvfs2_get_cred_params;
 
 /* table of incoming request types and associated parameters */
@@ -71,7 +75,7 @@ struct PINT_server_req_entry PINT_server
     /* 20 */ {PVFS_SERV_MGMT_PERF_MON, &pvfs2_perf_mon_params},
     /* 21 */ {PVFS_SERV_MGMT_ITERATE_HANDLES, &pvfs2_iterate_handles_params},
     /* 22 */ {PVFS_SERV_MGMT_DSPACE_INFO_LIST, NULL},
-    /* 23 */ {PVFS_SERV_MGMT_EVENT_MON, &pvfs2_event_mon_params},
+    /* 23 */ {PVFS_SERV_MGMT_EVENT_MON, NULL},
     /* 24 */ {PVFS_SERV_MGMT_REMOVE_OBJECT, &pvfs2_mgmt_remove_object_params},
     /* 25 */ {PVFS_SERV_MGMT_REMOVE_DIRENT, &pvfs2_mgmt_remove_dirent_params},
     /* 26 */ {PVFS_SERV_MGMT_GET_DIRDATA_HANDLE, &pvfs2_mgmt_get_dirdata_handle_params},
@@ -83,7 +87,12 @@ struct PINT_server_req_entry PINT_server
     /* 32 */ {PVFS_SERV_LISTEATTR, &pvfs2_list_eattr_params},
     /* 33 */ {PVFS_SERV_SMALL_IO, &pvfs2_small_io_params},
     /* 34 */ {PVFS_SERV_LISTATTR, &pvfs2_list_attr_params},
-    /* 35 */ {PVFS_SERV_GETCRED, &pvfs2_get_cred_params}
+    /* 35 */ {PVFS_SERV_BATCH_CREATE, &pvfs2_batch_create_params},
+    /* 36 */ {PVFS_SERV_BATCH_REMOVE, &pvfs2_batch_remove_params},
+    /* 37 */ {PVFS_SERV_PRECREATE_POOL_REFILLER, &pvfs2_precreate_pool_refiller_params},
+    /* 38 */ {PVFS_SERV_UNSTUFF, &pvfs2_unstuff_params},
+    /* nlmills: TODO: does this list need to be in a specific order? */
+    /* 39 */ {PVFS_SERV_GETCRED, &pvfs2_get_cred_params}
 };
 
 #define CHECK_OP(_op_) assert(_op_ == PINT_server_req_table[_op_].op_type)

Index: pvfs2-server.c
===================================================================
RCS file: /anoncvs/pvfs2/src/server/pvfs2-server.c,v
diff -p -u -r1.251.2.5 -r1.251.2.6
--- pvfs2-server.c	21 Jul 2008 22:02:52 -0000	1.251.2.5
+++ pvfs2-server.c	25 Aug 2009 17:56:29 -0000	1.251.2.6
@@ -23,6 +23,8 @@
 #include <ucontext.h>
 #endif
 
+#define __PINT_REQPROTO_ENCODE_FUNCS_C
+
 #include "bmi.h"
 #include "gossip.h"
 #include "job.h"
@@ -37,12 +39,12 @@
 #include "quicklist.h"
 #include "pint-dist-utils.h"
 #include "pint-perf-counter.h"
-#include "pint-event.h"
 #include "id-generator.h"
 #include "job-time-mgr.h"
 #include "pint-cached-config.h"
 #include "pvfs2-internal.h"
 #include "src/server/request-scheduler/request-scheduler.h"
+#include "pint-event.h"
 #include "pint-util.h"
 #include "pint-security.h"
 
@@ -94,6 +96,8 @@ static struct server_configuration_s ser
 static int signal_recvd_flag = 0;
 static pid_t server_controlling_pid = 0;
 
+static PINT_event_id PINT_sm_event_id;
+
 /* A list of all serv_op's posted for unexpected message alone */
 QLIST_HEAD(posted_sop_list);
 /* A list of all serv_op's posted for expected messages alone */
@@ -126,7 +130,9 @@ PINT_server_trove_keys_s Trove_Common_Ke
     {DIRECTORY_ENTRY_KEYSTR, DIRECTORY_ENTRY_KEYLEN},
     {DATAFILE_HANDLES_KEYSTR, DATAFILE_HANDLES_KEYLEN},
     {METAFILE_DIST_KEYSTR, METAFILE_DIST_KEYLEN},
-    {SYMLINK_TARGET_KEYSTR, SYMLINK_TARGET_KEYLEN}
+    {SYMLINK_TARGET_KEYSTR, SYMLINK_TARGET_KEYLEN},
+    {METAFILE_LAYOUT_KEYSTR, METAFILE_LAYOUT_KEYLEN},
+    {NUM_DFILES_REQ_KEYSTR, NUM_DFILES_REQ_KEYLEN}
 };
 
 /* These three are used continuously in our wait loop.  They could be
@@ -157,10 +163,20 @@ static void bt_sighandler(int sig, sigin
 static int create_pidfile(char *pidfile);
 static void write_pidfile(int fd);
 static void remove_pidfile(void);
-static int generate_shm_key_hint(void);
+static int generate_shm_key_hint(int* server_index);
+
+static void precreate_pool_finalize(void);
+static int precreate_pool_initialize(int server_index);
+static int precreate_pool_setup_server(const char* host, PVFS_fs_id fsid,
+    PVFS_handle* pool_handle);
+static int precreate_pool_launch_refiller(const char* host, 
+    PVFS_BMI_addr_t addr, PVFS_fs_id fsid, PVFS_handle pool_handle);
+static int precreate_pool_count(
+    PVFS_fs_id fsid, PVFS_handle pool_handle, int* count);
 
 static TROVE_method_id trove_coll_to_method_callback(TROVE_coll_id);
 
+
 struct server_configuration_s *PINT_get_server_config(void)
 {
     return &server_config;
@@ -420,10 +436,20 @@ static void write_pidfile(int fd)
     pid_t pid = getpid();
     char pid_str[16] = {0};
     int len;
+    int ret;
 
     snprintf(pid_str, 16, "%d\n", pid);
     len = strlen(pid_str);
-    write(fd, pid_str, len);
+    ret = write(fd, pid_str, len);
+    if(ret < len)
+    {
+        gossip_err("Error: failed to write pid file.\n");
+        close(fd);
+        remove_pidfile();
+        return;
+    }
+    close(fd);
+    return;
 }
 
 static void remove_pidfile(void)
@@ -469,9 +495,12 @@ static int server_initialize(
     /* redirect gossip to specified target if backgrounded */
     if (s_server_options.server_background)
     {
-        freopen("/dev/null", "r", stdin);
-        freopen("/dev/null", "w", stdout);
-        freopen("/dev/null", "w", stderr);
+        if(!freopen("/dev/null", "r", stdin))
+            gossip_err("Error: failed to reopen stdin.\n");
+        if(!freopen("/dev/null", "w", stdout))
+            gossip_err("Error: failed to reopen stdout.\n");
+        if(!freopen("/dev/null", "w", stderr))
+            gossip_err("Error: failed to reopen stderr.\n");
 
         if(!strcmp(server_config.logtype, "syslog"))
         {
@@ -617,8 +646,8 @@ static int server_setup_process_environm
     }
     if (pid_fd >= 0)
     {
+        /* note: pid_fd closed by write_pidfile() */
         write_pidfile(pid_fd);
-        close(pid_fd);
         atexit(remove_pidfile);
     }
     server_controlling_pid = getpid();
@@ -654,6 +683,26 @@ static int server_initialize_subsystems(
     PVFS_ds_flags init_flags = 0;
     int bmi_flags = BMI_INIT_SERVER;
     int shm_key_hint;
+    int server_index;
+
+    if(server_config.enable_events)
+    {
+        ret = PINT_event_init(PINT_EVENT_TRACE_TAU);
+        if (ret < 0)
+        {
+            gossip_err("Error initializing event interface.\n");
+            return (ret);
+        }
+
+        /* Define the state machine event:
+         *   START: (client_id, request_id, rank, handle, op_id)
+         *   STOP: ()
+         */
+        PINT_event_define_event(
+            NULL, "sm", "%d%d%d%llu%d", "", &PINT_sm_event_id);
+
+        *server_status_flag |= SERVER_EVENT_INIT;
+    }
 
     /* Initialize distributions */
     ret = PINT_dist_initialize(0);
@@ -683,6 +732,12 @@ static int server_initialize_subsystems(
         bmi_flags |= BMI_TCP_BIND_SPECIFIC;
     }
 
+    /* Have bmi automatically increment reference count on addresses any
+     * time a new unexpected message appears.  The server will decrement it
+     * once it has completed processing related to that request.
+     */
+    bmi_flags |= BMI_AUTO_REF_COUNT;
+
     ret = BMI_initialize(server_config.bmi_modules, 
                          server_config.host_id,
                          bmi_flags);
@@ -711,7 +766,7 @@ static int server_initialize_subsystems(
     assert(ret == 0);
 
     /* help trove chose a differentiating shm key if needed for Berkeley DB */
-    shm_key_hint = generate_shm_key_hint();
+    shm_key_hint = generate_shm_key_hint(&server_index);
     gossip_debug(GOSSIP_SERVER_DEBUG, "Server using shm key hint: %d\n", shm_key_hint);
     ret = trove_collection_setinfo(0, 0, TROVE_SHM_KEY_HINT, &shm_key_hint);
     assert(ret == 0);
@@ -787,6 +842,38 @@ static int server_initialize_subsystems(
             return(ret);
         }
 
+        /*
+           set storage hints if any.  if any of these fail, we
+           can't error out since they're just hints.  thus, we
+           complain in logging and continue.
+           */
+        ret = trove_collection_setinfo(
+            cur_fs->coll_id, 0,
+            TROVE_DIRECTIO_THREADS_NUM,
+            (void *)&cur_fs->directio_thread_num);
+        if (ret < 0)
+        {
+            gossip_err("Error setting directio threads num\n");
+        }
+
+        ret = trove_collection_setinfo(
+            cur_fs->coll_id, 0,
+            TROVE_DIRECTIO_OPS_PER_QUEUE,
+            (void *)&cur_fs->directio_ops_per_queue);
+        if (ret < 0)
+        {
+            gossip_err("Error setting directio ops per queue\n");
+        }
+
+        ret = trove_collection_setinfo(
+            cur_fs->coll_id, 0,
+            TROVE_DIRECTIO_TIMEOUT,
+            (void *)&cur_fs->directio_timeout);
+        if (ret < 0)
+        {
+            gossip_err("Error setting directio threads num\n");
+        }
+
         orig_fsid = cur_fs->coll_id;
         ret = trove_collection_lookup(
             cur_fs->trove_method,
@@ -971,12 +1058,9 @@ static int server_initialize_subsystems(
                           "yes" : "no"));
 
             gossip_debug(GOSSIP_SERVER_DEBUG, "Export options for "
-                         "%s:\n RootSquash %s\n AllSquash %s\n ReadOnly %s\n"
-                         " AnonUID %u\n AnonGID %u\n", cur_fs->file_system_name,
-                         (cur_fs->exp_flags & TROVE_EXP_ROOT_SQUASH) ? "yes" : "no",
-                         (cur_fs->exp_flags & TROVE_EXP_ALL_SQUASH)  ? "yes" : "no",
-                         (cur_fs->exp_flags & TROVE_EXP_READ_ONLY)   ? "yes" : "no",
-                         cur_fs->exp_anon_uid, cur_fs->exp_anon_gid);
+                         "%s:\n ReadOnly %s\n", cur_fs->file_system_name,
+                         (cur_fs->exp_flags & TROVE_EXP_READ_ONLY) ? 
+                         "yes" : "no");
 
             /* format and pass sync mode to the flow implementation */
             snprintf(buf, 16, "%d,%d", cur_fs->coll_id,
@@ -997,6 +1081,16 @@ static int server_initialize_subsystems(
     gossip_debug(GOSSIP_SERVER_DEBUG, "%d filesystem(s) initialized\n",
                  PINT_llist_count(server_config.file_systems));
 
+    /*
+     * Migrate database if needed
+     */
+    ret = trove_migrate(server_config.trove_method,server_config.storage_path);
+    if (ret < 0)
+    {
+        gossip_err("trove_migrate failed: ret=%d\n", ret);
+        return(ret);
+    }
+
     ret = job_time_mgr_init();
     if(ret < 0)
     {
@@ -1043,13 +1137,14 @@ static int server_initialize_subsystems(
     *server_status_flag |= SERVER_PERF_COUNTER_INIT;
 #endif
 
-    ret = PINT_event_initialize(PINT_EVENT_DEFAULT_RING_SIZE);
+    ret = precreate_pool_initialize(server_index);
     if (ret < 0)
     {
-        gossip_err("Error initializing event interface.\n");
+        gossip_err("Error initializing precreate pool.\n");
         return (ret);
     }
-    *server_status_flag |= SERVER_EVENT_INIT;
+
+    *server_status_flag |= SERVER_PRECREATE_INIT;
 
     return ret;
 }
@@ -1158,6 +1253,15 @@ static int server_shutdown(
     gossip_debug(GOSSIP_SERVER_DEBUG,
                  "*** server shutdown in progress ***\n");
 
+    if (status & SERVER_PRECREATE_INIT)
+    {
+        gossip_debug(GOSSIP_SERVER_DEBUG, "[+] halting precreate pool "
+                     "           [   ...   ]\n");
+        precreate_pool_finalize();
+        gossip_debug(GOSSIP_SERVER_DEBUG, "[-]         precreate pool "
+                     "           [ stopped ]\n");
+    }
+
     if (status & SERVER_STATE_MACHINE_INIT)
     {
         gossip_debug(GOSSIP_SERVER_DEBUG, "[+] halting state machine "
@@ -1237,14 +1341,30 @@ static int server_shutdown(
 
     if (status & SERVER_TROVE_INIT)
     {
+        PINT_llist *cur;
+        struct filesystem_configuration_s *cur_fs;
         gossip_debug(GOSSIP_SERVER_DEBUG, "[+] halting storage "
                      "interface         [   ...   ]\n");
+
+        cur = server_config.file_systems;
+        while(cur)
+        {
+            cur_fs = PINT_llist_head(cur);
+            if (!cur_fs)
+            {
+                break;
+            }
+            trove_collection_clear(cur_fs->trove_method, cur_fs->coll_id);
+
+            cur = PINT_llist_next(cur);
+        }
+
         trove_finalize(server_config.trove_method);
         gossip_debug(GOSSIP_SERVER_DEBUG, "[-]         storage "
                      "interface         [ stopped ]\n");
     }
 
-    /* XXX: this the right place ? */
+    /* nlmills: TODO: this the right place ? */
     if (status & SERVER_SECURITY_INIT)
     {
         gossip_debug(GOSSIP_SERVER_DEBUG, "[+] halting security "
@@ -1381,7 +1501,7 @@ static int server_parse_cmd_line_args(in
         {0,0,0,0}
     };
 
-    while ((ret = getopt_long(argc, argv,"dfhrvp:a:",
+    while ((ret = getopt_long(argc, argv,"dfhrvp:a:e",
                               long_opts, &option_index)) != -1)
     {
         total_arguments++;
@@ -1447,7 +1567,7 @@ static int server_parse_cmd_line_args(in
                 }
                 break;
             case 'a':
-           do_alias:
+          do_alias:
                 total_arguments++;
                 s_server_options.server_alias = strdup(optarg);
                 break;
@@ -1568,6 +1688,9 @@ static int server_purge_unexpected_recv_
 
         /* mark the message for cancellation */
         s_op->op_cancelled = 1;
+
+        /* cancel the pending job_bmi_unexp operation */
+        job_bmi_unexp_cancel(s_op->unexp_id);
     }
     return 0;
 }
@@ -1613,6 +1736,8 @@ int server_state_machine_start(
         s_op->req  = (struct PVFS_server_req *)s_op->decoded.buffer;
         ret = PINT_smcb_set_op(smcb, s_op->req->op);
         s_op->op = s_op->req->op;
+        PVFS_hint_add(&s_op->req->hints, PVFS_HINT_SERVER_ID_NAME, sizeof(uint32_t), &server_config.host_index);
+        PVFS_hint_add(&s_op->req->hints, PVFS_HINT_OP_ID_NAME, sizeof(uint32_t), &s_op->req->op);
     }
     else
     {
@@ -1628,8 +1753,17 @@ int server_state_machine_start(
 
     if(s_op->req)
     {
-        PINT_event_timestamp(PVFS_EVENT_API_SM, (int32_t)s_op->req->op,
-                             0, tmp_id, PVFS_EVENT_FLAG_START);
+        gossip_debug(GOSSIP_SERVER_DEBUG, "client:%d, reqid:%d, rank:%d\n",
+                     PINT_HINT_GET_CLIENT_ID(s_op->req->hints),
+                     PINT_HINT_GET_REQUEST_ID(s_op->req->hints),
+                     PINT_HINT_GET_RANK(s_op->req->hints));
+        PINT_EVENT_START(PINT_sm_event_id, server_controlling_pid,
+                         NULL, &s_op->event_id,
+                         PINT_HINT_GET_CLIENT_ID(s_op->req->hints),
+                         PINT_HINT_GET_REQUEST_ID(s_op->req->hints),
+                         PINT_HINT_GET_RANK(s_op->req->hints),
+                         PINT_HINT_GET_HANDLE(s_op->req->hints),
+                         s_op->req->op);
         s_op->resp.op = s_op->req->op;
     }
 
@@ -1715,6 +1849,7 @@ int server_state_machine_start_noreq(str
 
     if (new_op)
     {
+
         /* add to list of state machines started without a request */
         qlist_add_tail(&new_op->next, &noreq_sop_list);
 
@@ -1748,12 +1883,18 @@ int server_state_machine_complete(PINT_s
 
     /* set a timestamp on the completion of the state machine */
     id_gen_fast_register(&tmp_id, s_op);
-    PINT_event_timestamp(PVFS_EVENT_API_SM, (int32_t)s_op->req->op,
-                         0, tmp_id, PVFS_EVENT_FLAG_END);
+
+    if(s_op->req)
+    {
+        PINT_EVENT_END(PINT_sm_event_id, server_controlling_pid,
+                       NULL, s_op->event_id, 0);
+    }
 
     /* release the decoding of the unexpected request */
     if (ENCODING_IS_VALID(s_op->decoded.enc_type))
     {
+        PVFS_hint_free(s_op->decoded.stub_dec.req.hints);
+
         PINT_decode_release(&(s_op->decoded),PINT_DECODE_REQ);
     }
 
@@ -1826,6 +1967,8 @@ static TROVE_method_id trove_coll_to_met
     return fs_config->trove_method;
 }
 
+/* nlmills: TODO: find a way to make this work with capabilities */
+#if 0
 #ifndef GOSSIP_DISABLE_DEBUG
 void PINT_server_access_debug(PINT_server_op * s_op,
                               int64_t debug_mask,
@@ -1833,6 +1976,8 @@ void PINT_server_access_debug(PINT_serve
                               ...)
 {
     static char pint_access_buffer[GOSSIP_BUF_SIZE];
+    struct passwd* pw;
+    struct group* gr;
     va_list ap;
 
     if ((gossip_debug_on) &&
@@ -1841,8 +1986,12 @@ void PINT_server_access_debug(PINT_serve
     {
         va_start(ap, format);
 
+        pw = getpwuid(s_op->req->credentials.uid);
+        gr = getgrgid(s_op->req->credentials.gid);
         snprintf(pint_access_buffer, GOSSIP_BUF_SIZE,
-            "@%s H=%llu S=%p: %s: %s",
+            "%s.%s@%s H=%llu S=%p: %s: %s",
+            ((pw) ? pw->pw_name : "UNKNOWN"),
+            ((gr) ? gr->gr_name : "UNKNOWN"),
             BMI_addr_rev_lookup_unexpected(s_op->addr),
             llu(s_op->target_handle),
             s_op,
@@ -1855,6 +2004,7 @@ void PINT_server_access_debug(PINT_serve
     }
 }
 #endif
+#endif
 
 /* generate_shm_key_hint()
  *
@@ -1864,10 +2014,12 @@ void PINT_server_access_debug(PINT_serve
  *
  * returns integer key
  */
-static int generate_shm_key_hint(void)
+static int generate_shm_key_hint(int* server_index)
 {
-    int server_index = 1;
     struct host_alias_s *cur_alias = NULL;
+    struct filesystem_configuration_s *first_fs;
+
+    *server_index = 1;
 
     PINT_llist *cur = server_config.host_aliases;
 
@@ -1885,10 +2037,11 @@ static int generate_shm_key_hint(void)
             /* space the shm keys out by 10 to allow for Berkeley DB using 
              * using more than one key on each server
              */
-            return(server_index*10);        
+            first_fs = PINT_llist_head(server_config.file_systems);
+            return(first_fs->coll_id + (*server_index)*10);
         }
 
-        server_index++;
+        (*server_index)++;
         cur = PINT_llist_next(cur);
     }
     
@@ -1898,6 +2051,353 @@ static int generate_shm_key_hint(void)
      */
     srand((unsigned int)time(NULL));
     return(rand());
+}
+
+/* precreate_pool_initialize()
+ * 
+ * starts the infrastructure for managing pools of precreated handles
+ *
+ * returns 0 on success, -PVFS_error on failure
+ */
+static int precreate_pool_initialize(int server_index)
+{
+    PINT_llist *cur_f = server_config.file_systems;
+    struct filesystem_configuration_s *cur_fs;
+    int ret = -1;
+    PVFS_handle pool_handle;
+    int server_count;
+    PVFS_BMI_addr_t* addr_array;
+    const char* host;
+    int i;
+    int server_type;
+    int handle_count = 0;
+    int fs_count = 0;
+
+    /* iterate through list of file systems */
+    while(cur_f)
+    {
+        cur_fs = PINT_llist_head(cur_f);
+        if (!cur_fs)
+        {
+            break;
+        }
+
+        fs_count++;
+
+        /* am I a meta server in this file system? */
+        ret = PINT_cached_config_check_type(
+            cur_fs->coll_id,
+            server_config.host_id,
+            &server_type);
+        if(ret < 0)
+        {
+            gossip_err("Error: %s not found in configuration file.\n", 
+                server_config.host_id);
+            gossip_err("Error: configuration file is inconsistent.\n");
+            return(ret);
+        }
+        if(!(server_type & PINT_SERVER_TYPE_META))
+        {
+            /* This server is not a meta server for this file system; 
+             * skip doing any precreate setup steps.
+             */
+            cur_f = PINT_llist_next(cur_f);
+            continue;
+        }
+
+        /* how many I/O servers do we have? */
+        ret = PINT_cached_config_count_servers(
+            cur_fs->coll_id, PINT_SERVER_TYPE_IO, &server_count);
+        if(ret < 0)
+        {
+            gossip_err("Error: unable to count servers for fsid: %d\n", 
+                (int)cur_fs->coll_id);
+            return(ret);
+        }
+        
+        addr_array = malloc(server_count*sizeof(PVFS_BMI_addr_t));
+        if(!addr_array)
+        {
+            gossip_err("Error: unable to allocate book keeping information for precreate pools.\n");
+            return(-PVFS_ENOMEM);
+        }
+
+        /* resolve addrs for each I/O server */
+        ret = PINT_cached_config_get_server_array(
+            cur_fs->coll_id, PINT_SERVER_TYPE_IO,
+            addr_array, &server_count);
+        if(ret < 0)
+        {
+            gossip_err("Error: unable retrieve servers for fsid: %d\n", 
+                (int)cur_fs->coll_id);
+            return(ret);
+        }
+
+        for(i=0; i<server_count; i++)
+        {
+            host = PINT_cached_config_map_addr(
+                cur_fs->coll_id, addr_array[i], &server_type);
+            if(!strcmp(host, server_config.host_id) == 0)
+            {
+                /* this is a peer server */
+                /* make sure a pool exists for that server,fsid pair */
+                ret = precreate_pool_setup_server(host, 
+                    cur_fs->coll_id, &pool_handle);
+                if(ret < 0)
+                {
+                    gossip_err("Error: precreate_pool_initialize failed to setup pool for %s\n", server_config.host_id);
+                    return(ret);
+                }
+
+                /* count current handles */
+                ret = precreate_pool_count(cur_fs->coll_id, pool_handle, 
+                    &handle_count);
+                if(ret < 0)
+                {
+                    gossip_err("Error: precreate_pool_initialize failed to count pool for %s\n", server_config.host_id);
+                    return(ret);
+                }
+
+                /* prepare the job interface to use this pool */
+                ret = job_precreate_pool_register_server(host, 
+                    cur_fs->coll_id, pool_handle, handle_count);
+                assert(ret != 0);
+                if(ret < 0)
+                {
+                    gossip_err("Error: precreate_pool_initialize failed to register pool for %s\n", server_config.host_id);
+                    return(ret);
+                }
+
+                /* launch sm to take care of refilling */
+                ret = precreate_pool_launch_refiller(host, addr_array[i],
+                    cur_fs->coll_id, pool_handle);
+                if(ret < 0)
+                {
+                    gossip_err("Error: precreate_pool_initialize failed to launch refiller SM for %s\n", server_config.host_id);
+                    return(ret);
+                }
+            }
+        }
+
+        job_precreate_pool_set_index(server_index);
+
+        cur_f = PINT_llist_next(cur_f);
+
+    }
+
+    return(0);
+}
+
+/* precreate_pool_finalize()
+ *
+ * shuts down infrastructure for managing pools of precreated handles
+ */
+static void precreate_pool_finalize(void)
+{
+    /* TODO: anything to do here? */
+    /* TODO: maybe try to stop pending refiller sms? */
+    return;
+}
+
+/* precreate_pool_setup_server()
+ *  
+ * This function makes sure that a pool is present for the specified server
+ *
+ */
+static int precreate_pool_setup_server(const char* host, PVFS_fs_id fsid,
+    PVFS_handle* pool_handle)
+{
+    job_status_s js;
+    job_id_t job_id;
+    int ret;
+    int outcount;
+    PVFS_handle_extent_array ext_array;
+
+    PVFS_ds_keyval key;
+    PVFS_ds_keyval val;
+
+    /* look for the pool handle for this server */
+    key.buffer_sz = strlen(host) + strlen("precreate-pool-") + 1;
+    key.buffer = malloc(key.buffer_sz);
+    if(!key.buffer)
+    {
+        return(-ENOMEM);
+    }
+    snprintf((char*)key.buffer, key.buffer_sz, "precreate-pool-%s", host);
+    key.read_sz = 0;
+
+    val.buffer = pool_handle;
+    val.buffer_sz = sizeof(*pool_handle);
+    val.read_sz = 0;
+
+    ret = job_trove_fs_geteattr(fsid, &key, &val, 0, NULL, 0, &js, 
+        &job_id, server_job_context, NULL);
+    while(ret == 0)
+    {
+        ret = job_test(job_id, &outcount, NULL, &js, 
+            PVFS2_SERVER_DEFAULT_TIMEOUT_MS, server_job_context);
+    }
+    if(ret < 0)
+    {
+        gossip_err("Error: precreate_pool failed to read fs eattrs.\n");
+        free(key.buffer);
+        return(ret);
+    }
+    if(js.error_code && js.error_code != -TROVE_ENOENT)
+    {
+        gossip_err("Error: precreate_pool failed to read fs eattrs.\n");
+        free(key.buffer);
+        return(js.error_code);
+    }
+    else if(js.error_code == -TROVE_ENOENT)
+    {
+        /* handle doesn't exist yet; let's create it */
+        gossip_debug(GOSSIP_SERVER_DEBUG, "precreate_pool didn't find handle for %s; creating now.\n", host);
+
+        /* find extent array for ourselves */
+        ret = PINT_cached_config_get_server(
+            fsid, server_config.host_id, PINT_SERVER_TYPE_META, &ext_array);
+        if(ret < 0)
+        {
+            gossip_err("Error: PINT_cached_config_get_meta() failure.\n");
+            free(key.buffer);
+            return(ret);
+        }
+
+        /* create a trove object for the pool */
+        ret = job_trove_dspace_create(fsid, &ext_array, PVFS_TYPE_INTERNAL,
+            NULL, TROVE_SYNC, NULL, 0, &js, &job_id, server_job_context, NULL);
+        while(ret == 0)
+        {
+            ret = job_test(job_id, &outcount, NULL, &js, 
+                PVFS2_SERVER_DEFAULT_TIMEOUT_MS, server_job_context);
+        }
+        if(ret < 0 || js.error_code)
+        {
+            gossip_err("Error: precreate_pool failed to create pool.\n");
+            free(key.buffer);
+            return(ret < 0 ? ret : js.error_code);
+        }
+
+        *pool_handle = js.handle;
+
+        /* store reference to pool handle as collection eattr */
+        ret = job_trove_fs_seteattr(fsid, &key, &val, TROVE_SYNC, NULL, 0, &js, 
+            &job_id, server_job_context, NULL);
+        while(ret == 0)
+        {
+            ret = job_test(job_id, &outcount, NULL, &js, 
+                PVFS2_SERVER_DEFAULT_TIMEOUT_MS, server_job_context);
+        }
+        if(ret < 0 || js.error_code)
+        {
+            gossip_err("Error: failed to record precreate pool handle.\n");
+            gossip_err("Warning: fsck may be needed to recover lost handle.\n");
+            free(key.buffer);
+            return(ret < 0 ? ret : js.error_code);
+        }
+        gossip_debug(GOSSIP_SERVER_DEBUG, "precreate_pool created handle %llu for %s.\n", llu(*pool_handle), host);
+
+    }
+    else
+    {
+        /* handle already exists */
+        gossip_debug(GOSSIP_SERVER_DEBUG, "precreate_pool found handle %llu for %s.\n", llu(*pool_handle), host);
+    }
+
+    free(key.buffer);
+    return(0);
+}
+
+/* precreate_pool_count()
+ *
+ * counts the number of handles stored in a persistent precreate pool
+ */
+static int precreate_pool_count(
+    PVFS_fs_id fsid, PVFS_handle pool_handle, int* count)
+{
+    int ret;
+    job_status_s js;
+    job_id_t job_id;
+    int outcount;
+    PVFS_ds_keyval_handle_info handle_info;
+
+    /* try to get the current number of handles from the pool */
+    ret = job_trove_keyval_get_handle_info(
+        fsid, pool_handle, TROVE_KEYVAL_HANDLE_COUNT, &handle_info,
+        NULL, 0, &js, &job_id, server_job_context, NULL);
+    while(ret == 0)
+    {
+        ret = job_test(job_id, &outcount, NULL, &js, 
+            PVFS2_SERVER_DEFAULT_TIMEOUT_MS, server_job_context);
+    }
+    if(ret < 0)
+    {
+        return(ret);
+    }
+    
+    if(js.error_code == -TROVE_ENOENT)
+    {
+        /* this really means there aren't any keyvals there yet */
+        handle_info.count = 0;
+    }
+    else if(js.error_code != 0)
+    {
+        return(js.error_code);
+    }
+
+    *count = handle_info.count;
+
+    return(0);
+}
+
+static int precreate_pool_launch_refiller(const char* host, 
+    PVFS_BMI_addr_t addr, PVFS_fs_id fsid, PVFS_handle pool_handle)
+{
+    struct PINT_smcb *tmp_smcb = NULL;
+    struct PINT_server_op *s_op;
+    int ret;
+
+    /* allocate smcb */
+    ret = server_state_machine_alloc_noreq(PVFS_SERV_PRECREATE_POOL_REFILLER,
+        &(tmp_smcb));
+    if (ret < 0)
+    {
+        return(ret);
+    }
+
+    s_op = PINT_sm_frame(tmp_smcb, PINT_FRAME_CURRENT);
+    s_op->u.precreate_pool_refiller.host = strdup(host);
+    if(!s_op->u.precreate_pool_refiller.host)
+    {
+        PINT_smcb_free(tmp_smcb);
+        return(ret);
+    }
+
+    ret = PINT_cached_config_get_server(
+        fsid, host, PINT_SERVER_TYPE_IO, 
+        &s_op->u.precreate_pool_refiller.data_handle_extent_array);
+    if(ret < 0)
+    {
+        free(s_op->u.precreate_pool_refiller.host);
+        PINT_smcb_free(tmp_smcb);
+        return(ret);
+    }
+
+    s_op->u.precreate_pool_refiller.pool_handle = pool_handle;
+    s_op->u.precreate_pool_refiller.fsid = fsid;
+    s_op->u.precreate_pool_refiller.host_addr = addr;
+
+    /* start sm */
+    ret = server_state_machine_start_noreq(tmp_smcb);
+    if (ret < 0)
+    {
+        free(s_op->u.precreate_pool_refiller.host);
+        PINT_smcb_free(tmp_smcb);
+        return(ret);
+    }
+
+    return(0);
 }
 
 /*

Index: pvfs2-server.h
===================================================================
RCS file: /anoncvs/pvfs2/src/server/pvfs2-server.h,v
diff -p -u -r1.150.2.9 -r1.150.2.10
--- pvfs2-server.h	2 Dec 2008 03:35:52 -0000	1.150.2.9
+++ pvfs2-server.h	25 Aug 2009 17:56:30 -0000	1.150.2.10
@@ -31,6 +31,8 @@
 #include "msgpairarray.h"
 #include "pvfs2-req-proto.h"
 #include "state-machine.h"
+#include "pint-event.h"
+#include "security-types.h"
 
 extern job_context_id server_job_context;
 
@@ -55,6 +57,13 @@ extern job_context_id server_job_context
 /* number of milliseconds that clients will delay between retries */
 #define PVFS2_CLIENT_RETRY_DELAY_MS_DEFAULT  2000
 
+/* Specifies the number of handles to be preceated at a time from each
+ * server using the batch create request.
+ */
+#define PVFS2_PRECREATE_BATCH_SIZE_DEFAULT 512
+/* precreate pools will be topped off if they fall below this value */
+#define PVFS2_PRECREATE_LOW_THRESHOLD_DEFAULT 256
+
 /* types of permission checking that a server may need to perform for
  * incoming requests
  */
@@ -70,15 +79,6 @@ enum PINT_server_req_permissions
                                       needs write and execute */
 };
 
-#define PINT_GET_OBJECT_REF_DEFINE(req_name)                             \
-static inline int PINT_get_object_ref_##req_name(                        \
-    struct PVFS_server_req *req, PVFS_fs_id *fs_id, PVFS_handle *handle) \
-{                                                                        \
-    *fs_id = req->u.req_name.fs_id;                                      \
-    *handle = req->u.req_name.handle;                                    \
-    return 0;                                                            \
-}
-
 /* used to keep a random, but handy, list of keys around */
 typedef struct PINT_server_trove_keys
 {
@@ -95,6 +95,8 @@ enum 
     METAFILE_HANDLES_KEY = 2,
     METAFILE_DIST_KEY    = 3,
     SYMLINK_TARGET_KEY   = 4,
+    METAFILE_LAYOUT_KEY  = 5,
+    NUM_DFILES_REQ_KEY   = 6
 };
 
 /* optional; user-settable keys */
@@ -128,9 +130,23 @@ typedef enum
     SERVER_JOB_TIME_MGR_INIT   = (1 << 15),
     SERVER_DIST_INIT           = (1 << 16),
     SERVER_CACHED_CONFIG_INIT  = (1 << 17),
-    SERVER_SECURITY_INIT       = (1 << 18)
+    SERVER_PRECREATE_INIT      = (1 << 18),
+    SERVER_SECURITY_INIT       = (1 << 19)
 } PINT_server_status_flag;
 
+struct PINT_server_create_op
+{
+    const char **io_servers;
+    const char **remote_io_servers;
+    int num_io_servers;
+    PVFS_handle* handle_array_local; 
+    PVFS_handle* handle_array_remote; 
+    int handle_array_local_count;
+    int handle_array_remote_count;
+    PVFS_error saved_error_code;
+    int handle_index;
+};
+
 /* struct PINT_server_lookup_op
  *
  * All the data needed during lookup processing:
@@ -202,6 +218,29 @@ struct PINT_server_mgmt_remove_dirent_op
     PVFS_handle dirdata_handle;
 };
 
+struct PINT_server_precreate_pool_refiller_op
+{
+    PVFS_handle pool_handle;
+    PVFS_handle* precreate_handle_array;
+    PVFS_fs_id fsid;
+    char* host;
+    PVFS_BMI_addr_t host_addr;
+    PVFS_handle_extent_array data_handle_extent_array;
+    PVFS_capability capability;
+};
+
+struct PINT_server_batch_create_op
+{
+    int saved_error_code;
+    int batch_index;
+};
+
+struct PINT_server_batch_remove_op
+{
+    int handle_index;
+    int error_code;
+};
+
 struct PINT_server_mgmt_get_dirdata_op
 {
     PVFS_handle dirdata_handle;
@@ -249,11 +288,12 @@ struct PINT_server_getattr_op
 {
     PVFS_handle handle;
     PVFS_fs_id fs_id;
-    PVFS_ds_attributes dirdata_ds_attr;
     uint32_t attrmask;
     PVFS_error* err_array;
     PVFS_ds_keyval_handle_info keyval_handle_info;
     PVFS_handle dirent_handle;
+    int num_dfiles_req;
+    PVFS_credential credential;
 };
 
 struct PINT_server_listattr_op
@@ -268,7 +308,7 @@ struct PINT_server_getcred_op
 {
     char *certificate;
     uint32_t sig_size;
-    PVFS_sig signature;
+    PVFS_signature signature;
     PVFS_credential credential;
 };
 
@@ -277,7 +317,15 @@ struct PINT_server_eattr_op
 {
     void *buffer;
 };
-    
+
+struct PINT_server_unstuff_op
+{
+    PVFS_handle* dfile_array;
+    int num_dfiles_req;
+    PVFS_sys_layout layout;
+    void* encoded_layout;
+};
+
 /* This structure is passed into the void *ptr 
  * within the job interface.  Used to tell us where
  * to go next in our state machine.
@@ -286,16 +334,21 @@ typedef struct PINT_server_op
 {
     struct qlist_head   next; /* used to queue structures used for unexp style messages */
     int op_cancelled; /* indicates unexp message was cancelled */
+    job_id_t unexp_id;
+
     enum PVFS_server_op op;  /* type of operation that we are servicing */
 
+    PINT_event_id event_id;
+
     /* holds id from request scheduler so we can release it later */
     job_id_t scheduled_id; 
 
     /* generic structures used in most server operations */
     PVFS_ds_keyval key, val;
-    PVFS_ds_keyval key2, val2; 
     PVFS_ds_keyval *key_a;
     PVFS_ds_keyval *val_a;
+    int *error_a;
+    int keyval_count;
 
     int free_val;
 
@@ -334,6 +387,7 @@ typedef struct PINT_server_op
     union
     {
 	/* request-specific scratch spaces for use during processing */
+        struct PINT_server_create_op create;
         struct PINT_server_eattr_op eattr;
         struct PINT_server_getattr_op getattr;
         struct PINT_server_listattr_op listattr;
@@ -351,15 +405,21 @@ typedef struct PINT_server_op
 	struct PINT_server_mkdir_op mkdir;
         struct PINT_server_mgmt_remove_dirent_op mgmt_remove_dirent;
         struct PINT_server_mgmt_get_dirdata_op mgmt_get_dirdata_handle;
+        struct PINT_server_precreate_pool_refiller_op
+            precreate_pool_refiller;
+        struct PINT_server_batch_create_op batch_create;
+        struct PINT_server_batch_remove_op batch_remove;
+        struct PINT_server_unstuff_op unstuff;
         struct PINT_server_getcred_op getcred;
     } u;
 
 } PINT_server_op;
 
-/* TODO: consider passing request only */
-/* in that case we can move this whole block back to the file's top */
+/* nlmills: TODO: consider passing request only */
+/* nlmills: in that case we can move this whole block back to the file's top */
 typedef int (*PINT_server_req_perm_fun)(PINT_server_op *s_op);
 
+/* nlmills: TODO: remove these? */
 /* default checks that should work for most ops */
 extern int PINT_server_perm_read(PINT_server_op *s_op);
 extern int PINT_server_perm_write(PINT_server_op *s_op);
@@ -438,14 +498,16 @@ PINT_server_req_get_sched_policy(struct 
 const char* PINT_map_server_op_to_string(enum PVFS_server_op op);
 
 
+/* nlmills: TODO: fix this to work with capabilities */
 /* PINT_ACCESS_DEBUG()
  *
  * macro for consistent printing of access records
  *
  * no return value
  */
-#ifdef GOSSIP_DISABLE_DEBUG
+/*#ifdef GOSSIP_DISABLE_DEBUG*/
 #define PINT_ACCESS_DEBUG(__s_op, __mask, format, f...) do {} while (0)
+/*
 #else
 #define PINT_ACCESS_DEBUG(__s_op, __mask, format, f...)                     \
     PINT_server_access_debug(__s_op, __mask, format, ##f)
@@ -455,6 +517,7 @@ void PINT_server_access_debug(PINT_serve
                               int64_t debug_mask,
                               const char * format,
                               ...) __attribute__((format(printf, 3, 4)));
+*/
 
 /* nested state machines */
 extern struct PINT_state_machine_s pvfs2_get_attr_work_sm;

Index: readdir.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/readdir.sm,v
diff -p -u -r1.52.2.2 -r1.52.2.3
--- readdir.sm	13 Jun 2008 19:49:58 -0000	1.52.2.2
+++ readdir.sm	25 Aug 2009 17:56:30 -0000	1.52.2.3
@@ -125,7 +125,7 @@ static PINT_sm_action readdir_read_dirda
         &s_op->key, &s_op->val, 
         0, 
         NULL, smcb, 0, js_p, &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -205,8 +205,8 @@ static PINT_sm_action readdir_iterate_on
         s_op->req->u.readdir.token, s_op->key_a, s_op->val_a,
         s_op->req->u.readdir.dirent_count, 
         0, 
-        NULL, smcb, 0, js_p, 
-        &j_id, server_job_context);
+        NULL, smcb, 0, js_p,
+        &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: remove.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/remove.sm,v
diff -p -u -r1.60.2.4 -r1.60.2.5
--- remove.sm	31 Jul 2008 17:54:36 -0000	1.60.2.4
+++ remove.sm	25 Aug 2009 17:56:30 -0000	1.60.2.5
@@ -242,10 +242,10 @@ static PINT_sm_action remove_read_dirdat
 
     ret = job_trove_keyval_read(
         s_op->u.remove.fs_id, s_op->u.remove.handle,
-        &s_op->key, &s_op->val, 
-        0, 
+        &s_op->key, &s_op->val,
+        0,
         NULL, smcb, 0, js_p,
-        &j_id, server_job_context);
+        &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -276,7 +276,7 @@ static PINT_sm_action remove_get_dirent_
         0,
         js_p,
         &tmp_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -333,7 +333,7 @@ static PINT_sm_action remove_remove_dird
         0,
         js_p,
         &j_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -356,9 +356,9 @@ static PINT_sm_action remove_remove_dspa
 
     ret = job_trove_dspace_remove(
         s_op->u.remove.fs_id, s_op->u.remove.handle,
-        TROVE_SYNC, 
+        TROVE_SYNC,
         smcb, 0, js_p,
-        &j_id, server_job_context);
+        &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -389,7 +389,7 @@ static int perm_remove(PINT_server_op *s
 {
     int ret;
 
-    /* TODO: find a way to check permissions on the parent directory */
+    /* nlmills: TODO: find a way to check permissions on the parent directory */
     ret = 0;
 
     return ret;

Index: rmdirent.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/rmdirent.sm,v
diff -p -u -r1.56.2.3 -r1.56.2.4
--- rmdirent.sm	29 Jul 2008 22:29:38 -0000	1.56.2.3
+++ rmdirent.sm	25 Aug 2009 17:56:30 -0000	1.56.2.4
@@ -126,7 +126,7 @@ static int rmdirent_verify_parent_metada
         0,
         js_p,
         &i,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -170,7 +170,7 @@ static int rmdirent_remove_directory_ent
         &s_op->key, 
         &s_op->val,
         flags,
-        NULL, smcb, 0, js_p, &j_id, server_job_context);
+        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     /* 
      * Removing an entry causes an update of directory timestamps
@@ -220,7 +220,7 @@ static PINT_sm_action rmdirent_update_di
         s_op->req->u.rmdirent.fs_id, s_op->req->u.rmdirent.handle,
         ds_attr, 
         TROVE_SYNC,
-        smcb, 0, js_p, &j_id, server_job_context);
+        smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: set-attr.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/set-attr.sm,v
diff -p -u -r1.73.2.6 -r1.73.2.7
--- set-attr.sm	18 Sep 2008 21:12:33 -0000	1.73.2.6
+++ set-attr.sm	25 Aug 2009 17:56:30 -0000	1.73.2.7
@@ -344,7 +344,7 @@ static PINT_sm_action setattr_setobj_att
         s_op->req->u.setattr.fs_id, s_op->req->u.setattr.handle,
         ds_attr, 
         TROVE_SYNC,
-        smcb, 0, js_p, &j_id, server_job_context);
+        smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -398,7 +398,7 @@ static PINT_sm_action setattr_write_meta
         s_op->req->u.setattr.fs_id, s_op->req->u.setattr.handle,
         &(s_op->key), &(s_op->val),
         0,
-        NULL, smcb, 0, js_p, &j_id, server_job_context);
+        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -449,7 +449,7 @@ static PINT_sm_action setattr_write_meta
         s_op->req->u.setattr.fs_id, s_op->req->u.setattr.handle,
         &(s_op->key), &(s_op->val),
         TROVE_SYNC,
-        NULL, smcb, 0, js_p, &j_id, server_job_context);
+        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }
@@ -494,7 +494,7 @@ static PINT_sm_action setattr_write_syml
         s_op->req->u.setattr.fs_id, s_op->req->u.setattr.handle,
         &(s_op->key), &(s_op->val),
         TROVE_SYNC,
-        NULL, smcb, 0, js_p, &j_id, server_job_context);
+        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: set-eattr.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/set-eattr.sm,v
diff -p -u -r1.20.2.4 -r1.20.2.5
--- set-eattr.sm	14 Jun 2008 22:44:45 -0000	1.20.2.4
+++ set-eattr.sm	25 Aug 2009 17:56:30 -0000	1.20.2.5
@@ -143,7 +143,7 @@ static int seteattr_setobj_eattribs(
         0,
         js_p,
         &j_id,
-        server_job_context);
+        server_job_context, s_op->req->hints);
 
     return ret;
 }

Index: setparam.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/setparam.sm,v
diff -p -u -r1.34.2.5 -r1.34.2.6
--- setparam.sm	29 Jul 2008 22:29:38 -0000	1.34.2.5
+++ setparam.sm	25 Aug 2009 17:56:30 -0000	1.34.2.6
@@ -62,22 +62,20 @@ static PINT_sm_action setparam_work(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
-    int ret = -1, tmp_on = 0, old_event_on = 0;
+    int ret = -1, tmp_on = 0;
     job_id_t tmp_id;
     uint64_t tmp_mask = 0;
-    int32_t old_api_mask = 0, old_op_mask = 0;
     PVFS_handle tmp_handle = PVFS_HANDLE_NULL;
     struct server_configuration_s *user_opts;
     struct filesystem_configuration_s *fs_conf;
     char buf[16] = {0};
- 
+
     switch(s_op->req->u.mgmt_setparam.param)
     {
         case PVFS_SERV_PARAM_GOSSIP_MASK:
             gossip_get_debug_mask(&tmp_on, &tmp_mask);
-            s_op->resp.u.mgmt_setparam.old_value = tmp_mask;
             gossip_set_debug_mask(
-                1, s_op->req->u.mgmt_setparam.value);
+                1, s_op->req->u.mgmt_setparam.value.u.value);
             js_p->error_code = 0;
             return SM_ACTION_COMPLETE;
         case PVFS_SERV_PARAM_INVALID:
@@ -86,39 +84,26 @@ static PINT_sm_action setparam_work(
             js_p->error_code = -PVFS_ENOSYS;
             return SM_ACTION_COMPLETE;
         case PVFS_SERV_PARAM_FSID_CHECK:
-            s_op->resp.u.mgmt_setparam.old_value = 0;
             js_p->error_code = check_fs_id(
-                (PVFS_fs_id)s_op->req->u.mgmt_setparam.value);
+                (PVFS_fs_id)s_op->req->u.mgmt_setparam.value.u.value);
             return SM_ACTION_COMPLETE;
         case PVFS_SERV_PARAM_ROOT_CHECK:
-            tmp_handle = (PVFS_handle)s_op->req->u.mgmt_setparam.value;
-            s_op->resp.u.mgmt_setparam.old_value = 0;
+            tmp_handle = (PVFS_handle)s_op->req->u.mgmt_setparam.value.u.value;
             gossip_debug(GOSSIP_SERVER_DEBUG, " - ROOT_CHECK looking for"
                          " handle %llu, on fs_id %d\n", llu(tmp_handle),
                          s_op->req->u.mgmt_setparam.fs_id);
             ret = job_trove_dspace_verify(
                 s_op->req->u.mgmt_setparam.fs_id, tmp_handle,
                 0, 
-                smcb, 0, js_p, &tmp_id, server_job_context);
+                smcb, 0, js_p, &tmp_id, server_job_context, s_op->req->hints);
             return(ret);
-        case PVFS_SERV_PARAM_EVENT_ON:
+        case PVFS_SERV_PARAM_EVENT_ENABLE:
             ret = 0;
-            PINT_event_get_masks(
-                &old_event_on, &old_api_mask, &old_op_mask);
-            PINT_event_set_masks(
-                (int)s_op->req->u.mgmt_setparam.value,
-                old_api_mask, old_op_mask);
-            s_op->resp.u.mgmt_setparam.old_value = old_event_on;
+            PINT_event_enable(s_op->req->u.mgmt_setparam.value.u.string_value);
             js_p->error_code = ret;
             return SM_ACTION_COMPLETE;
-        case PVFS_SERV_PARAM_EVENT_MASKS:
-            PINT_event_get_masks(
-                &old_event_on, &old_api_mask, &old_op_mask);
-            PINT_event_set_masks(old_event_on,
-                (int32_t)(s_op->req->u.mgmt_setparam.value & 0x0FFFFFFFF),
-                (int32_t)(s_op->req->u.mgmt_setparam.value >> 32));
-            s_op->resp.u.mgmt_setparam.old_value = old_api_mask +
-                ((int64_t)old_op_mask << 32);
+        case PVFS_SERV_PARAM_EVENT_DISABLE:
+            PINT_event_disable(s_op->req->u.mgmt_setparam.value.u.string_value);
             js_p->error_code = 0;
             return SM_ACTION_COMPLETE;
         case PVFS_SERV_PARAM_SYNC_META:
@@ -127,7 +112,7 @@ static PINT_sm_action setparam_work(
                 s_op->req->u.mgmt_setparam.fs_id);
             if(fs_conf)
             {
-                if(s_op->req->u.mgmt_setparam.value)
+                if(s_op->req->u.mgmt_setparam.value.u.value)
                     fs_conf->trove_sync_meta = TROVE_SYNC;
                 else
                     fs_conf->trove_sync_meta = 0;
@@ -140,7 +125,7 @@ static PINT_sm_action setparam_work(
                 s_op->req->u.mgmt_setparam.fs_id);
             if(fs_conf)
             {
-                if(s_op->req->u.mgmt_setparam.value)
+                if(s_op->req->u.mgmt_setparam.value.u.value)
                 {
                     snprintf(buf, 16, "%d,%d", s_op->req->u.mgmt_setparam.fs_id,
                              TROVE_SYNC);
@@ -159,10 +144,10 @@ static PINT_sm_action setparam_work(
             return SM_ACTION_COMPLETE;
         case PVFS_SERV_PARAM_MODE:
 
-            s_op->resp.u.mgmt_setparam.old_value = PINT_req_sched_get_mode();
-            ret = job_req_sched_change_mode(s_op->req->u.mgmt_setparam.value,
-                                            NULL, 0, js_p, &s_op->scheduled_id,
-                                            server_job_context);
+            ret = job_req_sched_change_mode(
+                s_op->req->u.mgmt_setparam.value.u.value,
+                NULL, 0, js_p, &s_op->scheduled_id,
+                server_job_context);
 
             js_p->error_code = 0;
             return ret;

Index: small-io.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/small-io.sm,v
diff -p -u -r1.21.2.3 -r1.21.2.4
--- small-io.sm	14 Jun 2008 22:44:45 -0000	1.21.2.3
+++ small-io.sm	25 Aug 2009 17:56:31 -0000	1.21.2.4
@@ -78,7 +78,7 @@ static PINT_sm_action small_io_start_job
     s_op->resp.u.small_io.io_type = s_op->req->u.small_io.io_type;
 
     if(s_op->req->u.small_io.io_type == PVFS_IO_READ &&
-       s_op->ds_attr.b_size == 0)
+       s_op->ds_attr.u.datafile.b_size == 0)
     {
         /* nothing to read.  return SM_ACTION_DEFERRED */
         js_p->error_code = 0;
@@ -103,8 +103,8 @@ static PINT_sm_action small_io_start_job
                                  s_op->req->u.small_io.file_req_offset +
                                  s_op->req->u.small_io.aggregate_size);
 
-    s_op->resp.u.small_io.bstream_size = s_op->ds_attr.b_size;
-    fdata.fsize = s_op->ds_attr.b_size;
+    s_op->resp.u.small_io.bstream_size = s_op->ds_attr.u.datafile.b_size;
+    fdata.fsize = s_op->ds_attr.u.datafile.b_size;
     fdata.extend_flag = 
         (s_op->req->u.small_io.io_type == PVFS_IO_READ) ? 0 : 1;
 
@@ -161,7 +161,8 @@ static PINT_sm_action small_io_start_job
            0,
            js_p,
            &tmp_id,
-           server_job_context);
+           server_job_context,
+           s_op->req->hints);
         if(ret < 0)
         {
             gossip_err("small_io: Failed to post trove bstream write\n");
@@ -195,7 +196,8 @@ static PINT_sm_action small_io_start_job
             0,
             js_p,
             &tmp_id,
-            server_job_context);
+            server_job_context,
+            s_op->req->hints);
         if(ret < 0)
         {
             gossip_err("small-io: Failed to post trove bstream read\n");

Index: truncate.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/truncate.sm,v
diff -p -u -r1.9.2.3 -r1.9.2.4
--- truncate.sm	13 Jun 2008 19:49:58 -0000	1.9.2.3
+++ truncate.sm	25 Aug 2009 17:56:31 -0000	1.9.2.4
@@ -63,7 +63,8 @@ static PINT_sm_action truncate_resize(
     ret = job_trove_bstream_resize(
         s_op->req->u.truncate.fs_id, s_op->req->u.truncate.handle,
         s_op->req->u.truncate.size, s_op->req->u.truncate.flags,
-        NULL, smcb, 0, js_p, &i, server_job_context);
+        NULL, smcb, 0, js_p, &i, server_job_context,
+        s_op->req->hints);
 
     return ret;
 }

Index: unexpected.sm
===================================================================
RCS file: /anoncvs/pvfs2/src/server/unexpected.sm,v
diff -p -u -r1.5 -r1.5.2.1
--- unexpected.sm	26 Feb 2008 19:32:26 -0000	1.5
+++ unexpected.sm	25 Aug 2009 17:56:31 -0000	1.5.2.1
@@ -40,7 +40,6 @@ static PINT_sm_action unexpected_post(
         struct PINT_smcb *smcb, job_status_s *js_p)
 {
     int ret = -PVFS_EINVAL;
-    job_id_t j_id;
     struct PINT_server_op *s_op =
             (struct PINT_server_op *)PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
 
@@ -53,7 +52,7 @@ static PINT_sm_action unexpected_post(
        unexpected message (it is an output parameter).
      */
     ret = job_bmi_unexp(&s_op->unexp_bmi_buff, smcb, 0,
-                        js_p, &j_id, JOB_NO_IMMED_COMPLETE,
+                        js_p, &s_op->unexp_id, JOB_NO_IMMED_COMPLETE,
                         server_job_context);
     if(ret == SM_ACTION_COMPLETE)
     {
@@ -80,7 +79,6 @@ static PINT_sm_action unexpected_map(
     /* If op was cancelled, kill the SM */
     if (s_op->op_cancelled)
     {
-        /* is there a reason to do any cleanup? */
         return SM_ACTION_TERMINATE;
     }
     /* Else move it to the inprogress_sop_list */
@@ -97,9 +95,6 @@ static PINT_sm_action unexpected_map(
          */
         gossip_lerr("Error: post unexpected failure when restarting.\n");
     }
-
-    /* Bump up the reference count on the bmi address that we are using */
-    BMI_set_info(s_op->unexp_bmi_buff.addr, BMI_INC_ADDR_REF, NULL);
 
     /* restart as new request state machine */
     memset(js_p, 0, sizeof(job_status_s));



More information about the Pvfs2-cvs mailing list