[PVFS2-CVS] commit by neill in pvfs2/src/client/sysint: acache.c acache.h client-state-machine.c client-state-machine.h finalize.c fs-add.c getattr-acache.sm initialize.c msgpairarray.sm ncache.c ncache.h pint-bucket.c shared-state-methods.c shared-state-methods.h sys-create.sm sys-getattr.sm sys-io.sm sys-lookup.sm sys-setattr.sm sys-symlink.sm

CVS commit program cvs at parl.clemson.edu
Thu Jul 8 13:17:07 EDT 2004


Update of /projects/cvsroot/pvfs2/src/client/sysint
In directory parlweb:/tmp/cvs-serv12211/src/client/sysint

Modified Files:
	acache.c acache.h client-state-machine.c 
	client-state-machine.h finalize.c fs-add.c getattr-acache.sm 
	initialize.c msgpairarray.sm ncache.c ncache.h pint-bucket.c 
	shared-state-methods.c shared-state-methods.h sys-create.sm 
	sys-getattr.sm sys-io.sm sys-lookup.sm sys-setattr.sm 
	sys-symlink.sm 
Log Message:
- merging in the pvfs2-nm-nb-branch with the main tree
  see ChangeLog for details, or browse the cvs history of the branch
  for full details


Index: acache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/acache.c,v
diff -p -u -r1.7 -r1.8
--- acache.c	17 May 2004 15:57:32 -0000	1.7
+++ acache.c	8 Jul 2004 16:17:06 -0000	1.8
@@ -18,13 +18,13 @@
 #include "acache.h"
 #include "quickhash.h"
 
-/* uncomment the following for verbose acache debugging */
-/* #define VERBOSE_ACACHE_DEBUG */
+/* comment out the following for non-verbose acache debugging */
+#define VERBOSE_ACACHE_DEBUG
 
 /*
-  uncomment the following for an experimental pinode
-  cleanup mechanism for trying to bound the number of
-  pinode entries in the acache at any given time
+  comment out the following to disable the experimental pinode
+  cleanup mechanism that tries to bound the number of pinode entries
+  in the acache at any given time
 */
 #define PINT_ACACHE_AUTO_CLEANUP
 
@@ -35,19 +35,13 @@
 #define acache_debug(...)
 #endif
 
-/*
-  what we could do is disable the use of the acache
-  if locks are not available on the system, but for now...
-*/
-#ifndef __GEN_POSIX_LOCKING__
-#error "Cannot use acache without functioning mutex locking"
-#endif
-
 static struct qhash_table *s_acache_htable = NULL;
 static gen_mutex_t *s_acache_htable_mutex = NULL;
 
+static gen_mutex_t s_acache_interface_mutex = GEN_MUTEX_INITIALIZER;
+
 static int s_acache_initialized = 0;
-static int s_acache_timeout_ms = (PINT_ACACHE_TIMEOUT * 1000);
+static int s_acache_timeout_ms = PINT_ACACHE_TIMEOUT_MS;
 static int s_acache_allocated_entries = 0;
 
 /* static internal helper methods */
@@ -58,7 +52,8 @@ static void pinode_free(PINT_pinode *pin
 static int pinode_status(PINT_pinode *pinode);
 static void pinode_update_timestamp(PINT_pinode **pinode);
 static void pinode_invalidate(PINT_pinode *pinode);
-
+static void acache_internal_release(PINT_pinode *pinode);
+static PINT_pinode *acache_internal_lookup(PVFS_object_ref refn);
 #ifdef PINT_ACACHE_AUTO_CLEANUP
 static void reclaim_pinode_entries(void);
 #endif
@@ -76,9 +71,12 @@ int PINT_acache_initialize()
 
     acache_debug("PINT_acache_initialize entered\n");
 
+    gen_mutex_lock(&s_acache_interface_mutex);
+
     s_acache_htable_mutex = gen_mutex_build();
     if (!s_acache_htable_mutex)
     {
+        gen_mutex_unlock(&s_acache_interface_mutex);
         return ret;
     }
 
@@ -91,11 +89,13 @@ int PINT_acache_initialize()
         goto error_exit;
     }
 
-    acache_debug("PINT_acache_initialize exiting\n");
     s_acache_initialized = 1;
+    gen_mutex_unlock(&s_acache_interface_mutex);
+    acache_debug("PINT_acache_initialize exiting\n");
     return 0;
 
   error_exit:
+    gen_mutex_unlock(&s_acache_interface_mutex);
     acache_debug("PINT_acache_initialize error exiting\n");
     PINT_acache_finalize();
     return ret;
@@ -108,6 +108,9 @@ void PINT_acache_finalize()
     struct qlist_head *link = NULL, *tmp = NULL;
 
     acache_debug("PINT_acache_finalize entered\n");
+
+    gen_mutex_lock(&s_acache_interface_mutex);
+
     if (s_acache_htable_mutex && s_acache_htable)
     {
         gen_mutex_lock(s_acache_htable_mutex);
@@ -139,10 +142,42 @@ void PINT_acache_finalize()
 
     s_acache_htable = NULL;
     s_acache_htable_mutex = NULL;
+
+    gen_mutex_unlock(&s_acache_interface_mutex);
     acache_debug("PINT_acache_finalize exiting\n");
 }
 
 /*
+  internal use only -- does a lookup without involving the pinode
+  locks or reference counts; the caller must hold the interface lock
+  and must not hold the htable mutex, which is acquired here
+*/
+static PINT_pinode *acache_internal_lookup(PVFS_object_ref refn)
+{
+    PINT_pinode *pinode = NULL;
+    struct qhash_head *link = NULL;
+
+    acache_debug("acache_internal_lookup entered\n");
+    assert(s_acache_initialized);
+
+    gen_mutex_lock(s_acache_htable_mutex);
+    link = qhash_search(s_acache_htable, &refn);
+    if (link)
+    {
+        pinode = qlist_entry(link, PINT_pinode, link);
+        assert(pinode);
+    }
+    gen_mutex_unlock(s_acache_htable_mutex);
+
+    if (pinode)
+    {
+        assert(pinode->flag = PINODE_INTERNAL_FLAG_HASHED);
+    }
+    acache_debug("acache_internal_lookup exiting\n");
+    return pinode;
+}
+
+/*
   returns the pinode matching the specified reference if
   present in the acache. returns NULL otherwise.
 
@@ -156,6 +191,8 @@ PINT_pinode *PINT_acache_lookup(PVFS_obj
     struct qhash_head *link = NULL;
 
     acache_debug("PINT_acache_lookup entered\n");
+
+    gen_mutex_lock(&s_acache_interface_mutex);
     assert(s_acache_initialized);
 
     gen_mutex_lock(s_acache_htable_mutex);
@@ -173,6 +210,7 @@ PINT_pinode *PINT_acache_lookup(PVFS_obj
         assert(pinode->flag = PINODE_INTERNAL_FLAG_HASHED);
         pinode->ref_cnt++;
     }
+    gen_mutex_unlock(&s_acache_interface_mutex);
     acache_debug("PINT_acache_lookup exiting\n");
     return pinode;
 }
@@ -180,6 +218,7 @@ PINT_pinode *PINT_acache_lookup(PVFS_obj
 PINT_pinode *PINT_acache_pinode_alloc()
 {
 #ifdef PINT_ACACHE_AUTO_CLEANUP
+    gen_mutex_lock(&s_acache_interface_mutex);
     /*
       PINT_ACACHE_NUM_FLUSH_ENTRIES is a soft limit that triggers
       an attempt to reclaim any expired or invalidated entries
@@ -190,6 +229,7 @@ PINT_pinode *PINT_acache_pinode_alloc()
     {
         reclaim_pinode_entries();
     }
+    gen_mutex_unlock(&s_acache_interface_mutex);
 #endif
     return pinode_alloc();
 }
@@ -210,6 +250,8 @@ void PINT_acache_set_valid(PINT_pinode *
 {
     acache_debug("PINT_acache_set_valid entered\n");
     assert(s_acache_initialized);
+
+    gen_mutex_lock(&s_acache_interface_mutex);
     if (pinode)
     {
         /* if we don't have the lock, acquire it */
@@ -236,6 +278,7 @@ void PINT_acache_set_valid(PINT_pinode *
         pinode_update_timestamp(&pinode);
         gen_mutex_unlock(pinode->mutex);
     }
+    gen_mutex_unlock(&s_acache_interface_mutex);
     acache_debug("PINT_acache_set_valid exiting\n");
 }
 
@@ -247,18 +290,14 @@ void PINT_acache_invalidate(PVFS_object_
     acache_debug("PINT_acache_invalidate entered\n");
     assert(s_acache_initialized);
 
-    pinode = PINT_acache_lookup(refn);
+    gen_mutex_lock(&s_acache_interface_mutex);
+    pinode = acache_internal_lookup(refn);
+    gen_mutex_unlock(&s_acache_interface_mutex);
     if (pinode)
     {
-        /* drop the ref count we picked up in lookup */
-        pinode->ref_cnt--;
-
         /* forcefully expire the entry */
         pinode->status = PINODE_STATUS_EXPIRED;
 
-        /* we should have the lock at this point */
-        gen_mutex_unlock(pinode->mutex);
-
         PINT_acache_release(pinode);
     }
     acache_debug("PINT_acache_invalidate exiting\n");
@@ -266,18 +305,26 @@ void PINT_acache_invalidate(PVFS_object_
 
 void PINT_acache_release_refn(PVFS_object_ref refn)
 {
-    PINT_pinode *pinode = PINT_acache_lookup(refn);
+    PINT_pinode *pinode = NULL;
+
+    acache_debug("PINT_acache_release_refn entered\n");
+    gen_mutex_lock(&s_acache_interface_mutex);
+    pinode = acache_internal_lookup(refn);
     if (pinode)
     {
-        /* drop the ref count we picked up in lookup */
-        pinode->ref_cnt--;
-        PINT_acache_release(pinode);
+        acache_internal_release(pinode);
     }
+    gen_mutex_unlock(&s_acache_interface_mutex);
+    acache_debug("PINT_acache_release_refn exited\n");
 }
 
-void PINT_acache_release(PINT_pinode *pinode)
+/*
+  internal use only -- does a release without the interface lock and
+  without the htable mutex held
+*/
+static void acache_internal_release(PINT_pinode *pinode)
 {
-    acache_debug("PINT_acache_release entered\n");
+    acache_debug("acache_internal_release entered\n");
     assert(s_acache_initialized);
     if (pinode)
     {
@@ -297,6 +344,18 @@ void PINT_acache_release(PINT_pinode *pi
             gen_mutex_unlock(pinode->mutex);
         }
     }
+    acache_debug("acache_internal_release exited\n");
+}
+
+void PINT_acache_release(PINT_pinode *pinode)
+{
+    acache_debug("PINT_acache_release entered\n");
+    assert(s_acache_initialized);
+
+    gen_mutex_lock(&s_acache_interface_mutex);
+    acache_internal_release(pinode);
+    gen_mutex_unlock(&s_acache_interface_mutex);
+
     acache_debug("PINT_acache_release exited\n");
 }
 
@@ -539,6 +598,8 @@ static void pinode_free(PINT_pinode *pin
             gen_mutex_destroy(pinode->mutex);
             pinode->mutex = NULL;
         }
+
+        PINT_acache_object_attr_deep_free(&pinode->attr);
         free(pinode);
         s_acache_allocated_entries--;
         pinode = NULL;
@@ -570,7 +631,6 @@ static void pinode_invalidate(PINT_pinod
         gen_mutex_unlock(s_acache_htable_mutex);
         acache_debug("*** pinode_invalidate: removed from htable\n");
     }
-    PINT_acache_object_attr_deep_free(&pinode->attr);
     gen_mutex_unlock(pinode->mutex);
     pinode_free(pinode);
     acache_debug("*** pinode_invalidate: freed pinode\n");

Index: acache.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/acache.h,v
diff -p -u -r1.2 -r1.3
--- acache.h	24 Mar 2004 23:10:30 -0000	1.2
+++ acache.h	8 Jul 2004 16:17:06 -0000	1.3
@@ -19,11 +19,11 @@
   ================================
 
   PINT_acache_initialize must be the first called method, and
-  PINT_acache_finalize must be the last.  The default lifespan
-  of a valid acache entry is PINT_ACACHE_TIMEOUT seconds, but
-  you can set the timeout (at millisecond granularity) at runtime
-  by called PINT_acache_set_timeout. You can also retrieve the
-  acache timeout at any time by calling PINT_acache_get_timeout.
+  PINT_acache_finalize must be the last.  The default lifespan of a
+  valid acache entry is PINT_ACACHE_TIMEOUT_MS milliseconds, but you
+  can set the timeout (at millisecond granularity) at runtime by
+  calling PINT_acache_set_timeout. You can also retrieve the acache
+  timeout at any time by calling PINT_acache_get_timeout.
 
   How to use the acache in 5 steps of less:
   -----------------------------------------
@@ -45,14 +45,14 @@
   tips and tricks of the acache hacking gurus:
   --------------------------------------------
   update_timestamps internally bumps up the pinode reference count to
-  make sure it stays in the pinode cache.  on expiration, the ref count
-  is dropped.  these internal ref counts are separate from the user
-  influenced ref counts (i.e. lookup, invalidate, release)
+  make sure it stays in the pinode cache.  on expiration, the ref
+  count is dropped.  these internal ref counts are separate from the
+  user influenced ref counts (i.e. lookup, invalidate, release)
 
 
-  if you define PINT_ACACHE_AUTO_CLEANUP, the following applies:
-  since the number of pinodes in existance at any time is unbounded,
-  if the internal allocator sees that a multiple of
+  if you define PINT_ACACHE_AUTO_CLEANUP, the following applies: since
+  the number of pinodes in existence at any time is unbounded, if the
+  internal allocator sees that a multiple of
   PINT_ACACHE_NUM_FLUSH_ENTRIES pinodes exist, we secretly try to
   reclaim up to PINT_ACACHE_NUM_FLUSH_ENTRIES by scanning the htable
   of used pinodes and freeing any expired, invalid, or pinodes that
@@ -61,7 +61,7 @@
   this 'feature' is mostly untested and may not be beneficial.
 */
 
-#define PINT_ACACHE_TIMEOUT                                        5
+#define PINT_ACACHE_TIMEOUT_MS                                  5000
 #define PINT_ACACHE_NUM_ENTRIES                                 1024
 #define PINT_ACACHE_NUM_FLUSH_ENTRIES  (PINT_ACACHE_NUM_ENTRIES / 4)
 #define PINT_ACACHE_HTABLE_SIZE                                  511

Index: client-state-machine.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/client-state-machine.c,v
diff -p -u -r1.50 -r1.51
--- client-state-machine.c	21 May 2004 17:22:37 -0000	1.50
+++ client-state-machine.c	8 Jul 2004 16:17:06 -0000	1.51
@@ -3,7 +3,6 @@
  *
  * See COPYING in top-level directory.
  */
-
 #include <string.h>
 #include <assert.h>
 
@@ -25,11 +24,171 @@
 
 job_context_id pint_client_sm_context;
 
-static job_id_t job_id_array[MAX_RETURNED_JOBS];
-static void *client_sm_p_array[MAX_RETURNED_JOBS];
-static job_status_s job_status_array[MAX_RETURNED_JOBS];
-static int job_count = 0;
+static int got_context = 0;
+
+/*
+  used for locally storing completed operations from test() call so
+  that we can retrieve them in testsome() while still making progress
+  (and possibly completing operations) in the test() call
+*/
+static int s_completion_list_index = 0;
+static PINT_client_sm *s_completion_list[MAX_RETURNED_JOBS] = {NULL};
+static gen_mutex_t s_completion_list_mutex = GEN_MUTEX_INITIALIZER;
+
+#define CLIENT_SM_INIT_ONCE()                                        \
+do {                                                                 \
+    if (got_context == 0)                                            \
+    {                                                                \
+	/* get a context for our state machine operations */         \
+	job_open_context(&pint_client_sm_context);                   \
+	got_context = 1;                                             \
+    }                                                                \
+} while(0)
+
+#define CLIENT_SM_ASSERT_INITIALIZED()  \
+do { assert(got_context); } while(0)
+
+static int add_sm_to_completion_list(PINT_client_sm *sm_p)
+{
+    gen_mutex_lock(&s_completion_list_mutex);
+    assert(s_completion_list_index < MAX_RETURNED_JOBS);
+    s_completion_list[s_completion_list_index++] = sm_p;
+    gen_mutex_unlock(&s_completion_list_mutex);
+    return 0;
+}
+
+static int conditional_remove_sm_if_in_completion_list(
+    PINT_client_sm *sm_p)
+{
+    int found = 0, i = 0;
+
+    gen_mutex_lock(&s_completion_list_mutex);
+    for(i = 0; i < s_completion_list_index; i++)
+    {
+        if (s_completion_list[i] == sm_p)
+        {
+            s_completion_list[i] = NULL;
+            found = 1;
+            break;
+        }
+    }
+    gen_mutex_unlock(&s_completion_list_mutex);
+    return found;
+}
+
+static int completion_list_retrieve_completed(
+    PVFS_sys_op_id *op_id_array,
+    void **user_ptr_array,
+    int *error_code_array,
+    int limit,
+    int *out_count)
+{
+    int i = 0, new_list_index = 0;
+    PINT_client_sm *sm_p = NULL;
+    PINT_client_sm *tmp_completion_list[MAX_RETURNED_JOBS] = {NULL};
+
+    assert(op_id_array);
+    assert(error_code_array);
+    assert(out_count);
+
+    memset(tmp_completion_list, 0,
+           (MAX_RETURNED_JOBS * sizeof(PINT_client_sm *)));
+
+    gen_mutex_lock(&s_completion_list_mutex);
+    for(i = 0; i < s_completion_list_index; i++)
+    {
+        if (s_completion_list[i] == NULL)
+        {
+            continue;
+        }
+
+        sm_p = s_completion_list[i];
+        assert(sm_p);
+
+        if (i < limit)
+        {
+            op_id_array[i] = sm_p->sys_op_id;
+            error_code_array[i] = sm_p->error_code;
+
+            if (user_ptr_array)
+            {
+                user_ptr_array[i] = (void *)sm_p->user_ptr;
+            }
+            s_completion_list[i] = NULL;
+
+            PINT_sys_release(sm_p->sys_op_id);
+        }
+        else
+        {
+            tmp_completion_list[new_list_index++] = sm_p;
+        }
+    }
+    *out_count = i;
+
+    /* clean up and adjust the list and its bookkeeping */
+    s_completion_list_index = new_list_index;
+    memcpy(s_completion_list, tmp_completion_list,
+           (MAX_RETURNED_JOBS * sizeof(PINT_client_sm *)));
+    
+    gen_mutex_unlock(&s_completion_list_mutex);
+    return 0;
+}
+
+static inline int cancelled_io_jobs_are_pending(PINT_client_sm *sm_p)
+{
+    /*
+      NOTE: if the I/O cancellation has properly completed, the
+      cancelled contextual jobs within that I/O operation will be
+      popping out of the testcontext calls (in our testsome() or
+      test()).  to avoid passing out the same completed op multiple
+      times, do not add the operation to the completion list until all
+      cancellations on the I/O operation are accounted for
+    */
+    assert(sm_p);
+
+    /*
+      this *can* possibly be 0 in the case that the I/O has already
+      completed and no job cancellations were issued at I/O cancel time
+    */
+    if (sm_p->u.io.total_cancellations_remaining > 0)
+    {
+        sm_p->u.io.total_cancellations_remaining--;
+    }
+
+    gossip_debug(
+        GOSSIP_IO_DEBUG, "(%p) cancelled_io_jobs_are_pending: %d "
+        "remaining (op %s)\n", sm_p,
+        sm_p->u.io.total_cancellations_remaining,
+        (sm_p->op_complete ? "complete" : "NOT complete"));
+
+    return (sm_p->u.io.total_cancellations_remaining != 0);
+}
+
+/*
+  NOTE: important usage notes regarding post(), test(), and testsome()
 
+  thread safety: test() and testsome() can be called in any order by
+  the same thread.  if you need to call test() and testsome()
+  simultaneously from different threads, you need to serialize the
+  calls yourself.
+
+  calling semantics: the non-blocking calls (i.e. PVFS_isys_* or
+  PVFS_imgmt_* calls) allocate the state machine pointer (sm_p) used
+  for each operation.  the blocking calls DO NOT allocate this, but
+  call the non-blocking method (which does allocate it) and wait for
+  completion.  On completion, the blocking call frees the state
+  machine pointer (via PINT_sys_release).  the blocking calls only
+  ever call the test() function, which does not free the state machine
+  pointer on completion.
+
+  the testsome() function frees the state machine pointers allocated
+  from the non-blocking calls on completion because any caller of
+  testsome() *should* be using the non-blocking calls with it.  this
+  means that if you are calling test() with a non-blocking operation
+  that you manually issued (with a PVFS_isys* or PVFS_imgmt* call),
+  you need to call PVFS_sys_release on your own when the operation
+  completes.
+*/
 int PINT_client_state_machine_post(
     PINT_client_sm *sm_p,
     int pvfs_sys_op,
@@ -38,19 +197,20 @@ int PINT_client_state_machine_post(
 {
     int ret = -PVFS_EINVAL;
     job_status_s js;
-    static int got_context = 0;
+
+#if 0
+    gossip_debug(GOSSIP_CLIENT_DEBUG,
+                 "PINT_client_state_machine_post called\n");
+#endif
+
+    CLIENT_SM_INIT_ONCE();
 
     if (!sm_p)
     {
         return ret;
     }
 
-    if (got_context == 0)
-    {
-	/* get a context for our state machine operations */
-	job_open_context(&pint_client_sm_context);
-	got_context = 1;
-    }
+    memset(&js, 0, sizeof(js));
 
     /* save operation type; mark operation as unfinished */
     sm_p->user_ptr = user_ptr;
@@ -143,68 +303,227 @@ int PINT_client_state_machine_post(
 	    sm_p->current_state =
                 (pvfs2_client_job_timer_sm.state_machine + 1);
 	    break;
+        case PVFS_DEV_UNEXPECTED:
+            gossip_err("You should be using PINT_sys_dev_unexp for "
+                       "posting this type of operation!  Failing.\n");
+            return ret;
 	default:
             gossip_lerr("FIXME: Unrecognized sysint operation!\n");
             return ret;
     }
 
-    /* clear job status structure */
-    memset(&js, 0, sizeof(js));
+    if (op_id)
+    {
+        ret = PINT_id_gen_safe_register(op_id, (void *)sm_p);
+        sm_p->sys_op_id = *op_id;
+    }
 
-    /* call function, continue calling as long as we get immediate
-     * success.
-     */
+    /*
+      start state machine and continue advancing while we're getting
+      immediate completions
+    */
     ret = sm_p->current_state->state_action(sm_p, &js);
-    while (ret == 1)
+    while(ret == 1)
     {
-	/* PINT_state_machine_next() calls next function and
-	 * returns the result.
-	 */
-	ret = PINT_state_machine_next(sm_p, &js);
+        ret = PINT_state_machine_next(sm_p, &js);
     }
 
-    /* note: job_status_s pointed to by js_p is ok to use after
-     * we return regardless of whether or not we finished.
-     */
+    if (sm_p->op_complete)
+    {
+        ret = add_sm_to_completion_list(sm_p);
+        assert(ret == 0);
+    }
+    return ret;
+}
 
-    if (op_id)
+int PINT_sys_dev_unexp(
+    struct PINT_dev_unexp_info *info,
+    job_status_s *jstat,
+    PVFS_sys_op_id *op_id,
+    void *user_ptr)
+{
+    int ret = -PVFS_EINVAL;
+    job_id_t id;
+    PINT_client_sm *sm_p = NULL;
+
+    CLIENT_SM_INIT_ONCE();
+
+    /* we require more input args than the regular post method above */
+    if (!info || !jstat || !op_id)
+    {
+        return -PVFS_EINVAL;
+    }
+
+    sm_p = (PINT_client_sm *)malloc(sizeof(PINT_client_sm));
+    if (!sm_p)
+    {
+        return -PVFS_ENOMEM;
+    }
+    memset(sm_p, 0, sizeof(PINT_client_sm));
+    sm_p->user_ptr = user_ptr;
+    sm_p->op = PVFS_DEV_UNEXPECTED;
+    sm_p->op_complete = 0;
+    sm_p->cred_p = NULL;
+
+    memset(jstat, 0, sizeof(job_status_s));
+    ret = job_dev_unexp(info, (void *)sm_p, 0, jstat, &id,
+                        JOB_NO_IMMED_COMPLETE, pint_client_sm_context);
+    if (ret)
+    {
+        PVFS_perror_gossip("PINT_sys_dev_unexp failed", ret);
+        free(sm_p);
+    }
+    else
     {
         ret = PINT_id_gen_safe_register(op_id, (void *)sm_p);
+        sm_p->sys_op_id = *op_id;
     }
     return ret;
 }
 
 /* PINT_client_bmi_cancel()
  *
- * wrapper function for job_bmi_cancel that handles race conditions of
- * jobs that have completed in the job_testcontext() loop but have not
- * yet triggered a state transition
+ * wrapper function for job_bmi_cancel
  *
  * returns 0 on success, -PVFS_error on failure
  */
 int PINT_client_bmi_cancel(job_id_t id)
 {
-    int i;
+    return job_bmi_cancel(id, pint_client_sm_context);
+}
+
+/* PINT_client_io_cancel()
+ *
+ * cancels in progress I/O operations
+ *
+ * returns 0 on success, -PVFS_error on failure
+ */
+int PINT_client_io_cancel(PVFS_sys_op_id id)
+{
+    int ret = -PVFS_EINVAL, i = 0;
+    PINT_client_sm *sm_p = NULL;
 
-    /* TODO: this is not thread safe */
-    for(i=0; i<job_count; i++)
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "PINT_client_io_cancel called\n");
+
+    sm_p = PINT_id_gen_safe_lookup(id);
+    if (!sm_p)
     {
-	if(job_id_array[i] == id)
-	{
-	    /* job has already completed; do nothing */
-	    return(0);
-	}
+	/* if we can't find it, it may have already completed */
+        return 0;
+    }
+
+    /* we can't cancel any arbitrary operation */
+    assert(sm_p->op == PVFS_SYS_IO);
+
+    if (sm_p->op_complete)
+    {
+	/* op already completed; nothing to cancel. */
+        return 0;
     }
 
-    return(job_bmi_cancel(id, pint_client_sm_context));
+    /* if we fall to here, the I/O operation is still in flight */
+    /* first, set a flag informing the sys_io state machine that the
+     * operation has been cancelled so it doesn't post any new jobs 
+     */
+    sm_p->op_cancelled = 1;
+
+    /*
+      don't return an error if nothing is cancelled, because
+      everything may have completed already
+    */
+    ret = 0;
+
+    /* now run through and cancel the outstanding jobs */
+    for(i = 0; i < sm_p->u.io.datafile_count; i++)
+    {
+        PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
+        assert(cur_ctx);
+
+        if (cur_ctx->msg_send_in_progress)
+        {
+            gossip_debug(GOSSIP_IO_DEBUG,  "[%d] Posting "
+                         "cancellation of type: BMI Send "
+                         "(Request)\n",i);
+
+            ret = job_bmi_cancel(cur_ctx->msg->send_id,
+                                 pint_client_sm_context);
+            if (ret < 0)
+            {
+                PVFS_perror_gossip("job_bmi_cancel failed", ret);
+                break;
+            }
+            sm_p->u.io.total_cancellations_remaining++;
+        }
+
+        if (cur_ctx->msg_recv_in_progress)
+        {
+            gossip_debug(GOSSIP_IO_DEBUG,  "[%d] Posting "
+                         "cancellation of type: BMI Recv "
+                         "(Response)\n",i);
+
+            ret = job_bmi_cancel(cur_ctx->msg->recv_id,
+                                 pint_client_sm_context);
+            if (ret < 0)
+            {
+                PVFS_perror_gossip("job_bmi_cancel failed", ret);
+                break;
+            }
+            sm_p->u.io.total_cancellations_remaining++;
+        }
+
+        if (cur_ctx->flow_in_progress)
+        {
+            gossip_debug(GOSSIP_IO_DEBUG,
+                         "[%d] Posting cancellation of type: FLOW\n",i);
+
+            ret = job_flow_cancel(
+                cur_ctx->flow_job_id, pint_client_sm_context);
+            if (ret < 0)
+            {
+                PVFS_perror_gossip("job_flow_cancel failed", ret);
+                break;
+            }
+            sm_p->u.io.total_cancellations_remaining++;
+        }
+
+        if (cur_ctx->write_ack_in_progress)
+        {
+            gossip_debug(GOSSIP_IO_DEBUG,  "[%d] Posting "
+                         "cancellation of type: BMI Recv "
+                         "(Write Ack)\n",i);
+
+            ret = job_bmi_cancel(cur_ctx->write_ack.recv_id,
+                                 pint_client_sm_context);
+            if (ret < 0)
+            {
+                PVFS_perror_gossip("job_bmi_cancel failed", ret);
+                break;
+            }
+            sm_p->u.io.total_cancellations_remaining++;
+        }
+    }
+    gossip_debug(GOSSIP_IO_DEBUG, "(%p) Total cancellations "
+                 "remaining: %d\n", sm_p,
+                 sm_p->u.io.total_cancellations_remaining);
+    return ret;
 }
 
 int PINT_client_state_machine_test(
     PVFS_sys_op_id op_id,
     int *error_code)
 {
-    int ret = -PVFS_EINVAL, i = 0;
+    int ret = -PVFS_EINVAL, i = 0, job_count = 0;
     PINT_client_sm *sm_p, *tmp_sm_p = NULL;
+    job_id_t job_id_array[MAX_RETURNED_JOBS];
+    job_status_s job_status_array[MAX_RETURNED_JOBS];
+    void *client_sm_p_array[MAX_RETURNED_JOBS] = {NULL};
+
+#if 0
+    gossip_debug(GOSSIP_CLIENT_DEBUG,
+                 "PINT_client_state_machine_test called\n");
+#endif
+
+    CLIENT_SM_ASSERT_INITIALIZED();
 
     job_count = MAX_RETURNED_JOBS;
 
@@ -218,20 +537,19 @@ int PINT_client_state_machine_test(
     {
         return ret;
     }
-    assert(sm_p);
 
     if (sm_p->op_complete)
     {
         *error_code = sm_p->error_code;
+        conditional_remove_sm_if_in_completion_list(sm_p);
         return 0;
     }
 
-    /* TODO: this isn't thread safe... */
     ret = job_testcontext(job_id_array,
 			  &job_count, /* in/out parameter */
 			  client_sm_p_array,
 			  job_status_array,
-			  100, /* timeout? */
+			  10,
 			  pint_client_sm_context);
     assert(ret > -1);
 
@@ -239,47 +557,93 @@ int PINT_client_state_machine_test(
     for(i = 0; i < job_count; i++)
     {
 	tmp_sm_p = (PINT_client_sm *)client_sm_p_array[i];
+        assert(tmp_sm_p);
 
-        do
+        if (tmp_sm_p->op == PVFS_DEV_UNEXPECTED)
         {
-            ret = PINT_state_machine_next(tmp_sm_p, &job_status_array[i]);
+            tmp_sm_p->op_complete = 1;
+        }
 
-        } while (ret == 1);
+        if (!tmp_sm_p->op_complete)
+        {
+            do
+            {
+                ret = PINT_state_machine_next(
+                    tmp_sm_p, &job_status_array[i]);
 
-        /* (ret < 0) indicates a problem from the job system
-         * itself; the return value of the underlying operation
-         * is kept in the job status structure.
-         */
-        assert(ret == 0);
+            } while (ret == 1);
+
+            assert(ret == 0);
+        }
+
+        /* make sure we don't return internally cancelled I/O jobs */
+        if ((tmp_sm_p->op == PVFS_SYS_IO) && (tmp_sm_p->op_cancelled) &&
+            (cancelled_io_jobs_are_pending(tmp_sm_p)))
+        {
+            continue;
+        }
+
+        /*
+          if we've found a completed operation and it's NOT the op
+          being tested here, we add it to our local completion list so
+          that later calls to the sysint test/testsome can find it
+        */
+        if ((tmp_sm_p != sm_p) && (tmp_sm_p->op_complete == 1))
+        {
+            ret = add_sm_to_completion_list(tmp_sm_p);
+            assert(ret == 0);
+        }
     }
 
     if (sm_p->op_complete)
     {
         *error_code = sm_p->error_code;
+        conditional_remove_sm_if_in_completion_list(sm_p);
     }
     return 0;
 }
 
-int PINT_client_state_machine_testsome(PVFS_sys_op_id *op_id_array,
-                                       int *op_count) /* in/out */
+int PINT_client_state_machine_testsome(
+    PVFS_sys_op_id *op_id_array,
+    int *op_count, /* in/out */
+    void **user_ptr_array,
+    int *error_code_array,
+    int timeout_ms)
 {
-    int ret = -PVFS_EINVAL, i = 0, count = 0;
-    int limit = (*op_count - 1);
+    int ret = -PVFS_EINVAL, i = 0;
+    int limit = 0, job_count = 0;
     PINT_client_sm *sm_p = NULL;
+    job_id_t job_id_array[MAX_RETURNED_JOBS];
+    job_status_s job_status_array[MAX_RETURNED_JOBS];
+    void *client_sm_p_array[MAX_RETURNED_JOBS] = {NULL};
 
-    job_count = MAX_RETURNED_JOBS;
+#if 0
+    gossip_debug(GOSSIP_CLIENT_DEBUG,
+                 "PINT_client_state_machine_testsome called\n");
+#endif
+
+    CLIENT_SM_ASSERT_INITIALIZED();
+
+    if (!op_id_array || !op_count || !error_code_array)
+    {
+        PVFS_perror_gossip("PINT_client_state_machine_testsome", ret);
+        return ret;
+    }
 
-    if (!op_id_array || !op_count)
+    if ((*op_count < 1) || (*op_count > MAX_RETURNED_JOBS))
     {
+        PVFS_perror_gossip("testsome() got invalid op_count", ret);
         return ret;
     }
 
-    /* TODO: this isn't thread safe... */
+    job_count = MAX_RETURNED_JOBS;
+    limit = *op_count;
+
     ret = job_testcontext(job_id_array,
 			  &job_count, /* in/out parameter */
 			  client_sm_p_array,
 			  job_status_array,
-			  100, /* timeout? */
+			  timeout_ms,
 			  pint_client_sm_context);
     assert(ret > -1);
 
@@ -287,32 +651,56 @@ int PINT_client_state_machine_testsome(P
     for(i = 0; i < job_count; i++)
     {
 	sm_p = (PINT_client_sm *)client_sm_p_array[i];
+        assert(sm_p);
 
-        do
+        /*
+          note that dev unexp messages found here are treated as
+          complete since if we see them at all in here, they're ready
+          to be passed back to the caller
+        */
+        if (sm_p->op == PVFS_DEV_UNEXPECTED)
+        {
+            sm_p->op_complete = 1;
+        }
+
+        if (!sm_p->op_complete)
         {
-            ret = PINT_state_machine_next(sm_p, &job_status_array[i]);
+            do
+            {
+                ret = PINT_state_machine_next(
+                    sm_p, &job_status_array[i]);
 
-        } while (ret == 1);
+            } while (ret == 1);
 
-        /* (ret < 0) indicates a problem from the job system
-         * itself; the return value of the underlying operation
-         * is kept in the job status structure.
-         */
-        assert(ret == 0);
+            /* (ret < 0) indicates a problem from the job system
+             * itself; the return value of the underlying operation is
+             * kept in the job status structure.
+             */
+            assert(ret == 0);
+        }
+
+        /* make sure we don't return internally cancelled I/O jobs */
+        if ((sm_p->op == PVFS_SYS_IO) && (sm_p->op_cancelled) &&
+            (cancelled_io_jobs_are_pending(sm_p)))
+        {
+            continue;
+        }
 
+        /*
+          by adding the completed op to our completion list, we can
+          keep progressing on operations in progress here and just
+          grab all completed operations when we're finished
+          (i.e. outside of this loop).
+        */
         if (sm_p->op_complete)
         {
-            op_id_array[count++] = sm_p->sys_op_id;
-            if (count > limit)
-            {
-                break;
-            }
+            ret = add_sm_to_completion_list(sm_p);
+            assert(ret == 0);
         }
     }
 
-    *op_count = count;
-
-    return 0;
+    return completion_list_retrieve_completed(
+        op_id_array, user_ptr_array, error_code_array, limit, op_count);
 }
 
 int PINT_client_wait_internal(
@@ -357,8 +745,16 @@ void PINT_sys_release(PVFS_sys_op_id op_
     {
         PINT_id_gen_safe_unregister(op_id);
 
-        assert(sm_p->cred_p);
-        PVFS_util_release_credentials(sm_p->cred_p);
+        if (sm_p->op && sm_p->cred_p)
+        {
+            PVFS_util_release_credentials(sm_p->cred_p);
+            sm_p->cred_p = NULL;
+        }
+
+        if (sm_p->acache_hit)
+        {
+            PINT_acache_release(sm_p->pinode);
+        }
         free(sm_p);
     }
 }
@@ -413,20 +809,19 @@ int PINT_serv_free_msgpair_resources(
     PVFS_BMI_addr_t *svr_addr_p,
     int max_resp_sz)
 {
-    PINT_encode_release(encoded_req_p,
-			PINT_ENCODE_REQ);
+    int ret = -PVFS_EINVAL;
 
-    /* sm_p->req doesn't go anywhere; we'll use it again. */
+    if (encoded_req_p && decoded_resp_p && svr_addr_p)
+    {
+        PINT_encode_release(encoded_req_p, PINT_ENCODE_REQ);
 
-    PINT_decode_release(decoded_resp_p,
-			PINT_DECODE_RESP);
+        PINT_decode_release(decoded_resp_p, PINT_DECODE_RESP);
 
-    BMI_memfree(*svr_addr_p,
-		encoded_resp_p,
-		max_resp_sz,
-		BMI_RECV);
+        BMI_memfree(*svr_addr_p, encoded_resp_p, max_resp_sz, BMI_RECV);
 
-    return 0;
+        ret = 0;
+    }
+    return ret;
 }
 
 /* PINT_serv_msgpair_array_resolve_addrs()

Index: client-state-machine.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/client-state-machine.h,v
diff -p -u -r1.113 -r1.114
--- client-state-machine.h	21 May 2004 18:52:06 -0000	1.113
+++ client-state-machine.h	8 Jul 2004 16:17:06 -0000	1.114
@@ -7,10 +7,10 @@
 #ifndef __PVFS2_CLIENT_STATE_MACHINE_H
 #define __PVFS2_CLIENT_STATE_MACHINE_H
 
-/* NOTE: STATE-MACHINE.H IS INCLUDED AT THE BOTTOM!  THIS IS SO WE CAN
- * DEFINE ALL THE STRUCTURES WE NEED BEFORE WE INCLUDE IT.
- */
-
+/*
+  NOTE: state-machine.h is included at the bottom so we can define all
+  the client-sm structures before it's included
+*/
 #include "pvfs2-sysint.h"
 #include "pvfs2-types.h"
 #include "pvfs2-storage.h"
@@ -26,12 +26,16 @@
 #define MAX_LOOKUP_SEGMENTS PVFS_REQ_LIMIT_PATH_SEGMENT_COUNT
 #define MAX_LOOKUP_CONTEXTS PVFS_REQ_LIMIT_MAX_SYMLINK_RESOLUTION_COUNT
 
-/* jobs that send or receive request messages will timeout if they do not
- * complete within PVFS2_CLIENT_JOB_TIMEOUT seconds; flows will timeout if
- * they go for more than PVFS2_CLIENT_JOB_TIMEOUT seconds without moving any
- * data.
+/*
+  TODO: the following constants should be run-time configurable
+  eventually
+*/
+
+/* jobs that send or receive request messages will timeout if they do
+ * not complete within PVFS2_CLIENT_JOB_TIMEOUT seconds; flows will
+ * timeout if they go for more than PVFS2_CLIENT_JOB_TIMEOUT seconds
+ * without moving any data.
  */
-/* TODO: this should be configurable at runtime somehow */
 #define PVFS2_CLIENT_JOB_TIMEOUT 30
 
 /* the maximum number of times to retry restartable client operations */
@@ -60,12 +64,10 @@ typedef struct PINT_client_sm_msgpair_st
     /* don't use this -- internal msgpairarray use only */
     int retry_count;
 
-    /* comp_fn called after successful reception and decode of respone,
-     * if the msgpair state machine is used for processing.
+    /* comp_fn called after successful reception and decode of
+     * response, if the msgpair state machine is used for processing.
      */
-    int (* comp_fn)(void *sm_p, /* actually (struct PINT_client_sm *) */
-		    struct PVFS_server_resp *resp_p,
-		    int i);
+    int (* comp_fn)(void *sm_p, struct PVFS_server_resp *resp_p, int i);
 
     /* comp_ct used to keep up with number of operations remaining */
     int comp_ct;
@@ -86,10 +88,17 @@ typedef struct PINT_client_sm_msgpair_st
     /* send_status, recv_status used for error handling etc. */
     job_status_s send_status, recv_status;
 
-    /* op_status is the code returned from the server, if the operation
-     * was actually processed (recv_status.error_code == 0)
+    /* op_status is the code returned from the server, if the
+     * operation was actually processed (recv_status.error_code == 0)
      */
     PVFS_error op_status;
+
+    /*
+      used in the retry code path to know if we've already completed
+      or not (to avoid re-doing the work we've already done)
+    */
+    int complete;
+
 } PINT_client_sm_msgpair_state;
 
 /* PINT_client_sm_recv_state_s
@@ -106,7 +115,6 @@ typedef struct PINT_client_sm_recv_state
     PVFS_error op_status;
 } PINT_client_sm_recv_state;
 
-/* PINT_client_remove_sm */
 struct PINT_client_remove_sm
 {
     char *object_name;   /* input parameter */
@@ -114,155 +122,184 @@ struct PINT_client_remove_sm
     int	retry_count;
 };
 
-/* PINT_client_create_sm */
-struct PINT_client_create_sm {
-    char                         *object_name;    /* input parameter */
-    PVFS_sysresp_create          *create_resp;    /* in/out parameter*/
-    PVFS_sys_attr                sys_attr;        /* input parameter */
-    int                          num_data_files;
-    PVFS_BMI_addr_t              *data_server_addrs;
-    PVFS_handle_extent_array     *io_handle_extent_array;
-    PVFS_handle                  metafile_handle;
-    PVFS_handle                  *datafile_handles;
-    PINT_dist                    *dist;
-    int                           stored_error_code;
-    int                          retry_count;
-};
-
-/* PINT_client_mkdir_sm */
-struct PINT_client_mkdir_sm {
-    char                         *object_name;    /* input parameter  */
-    PVFS_sysresp_mkdir           *mkdir_resp;     /* in/out parameter */
-    PVFS_sys_attr                sys_attr;        /* input parameter  */
-    PVFS_handle                  metafile_handle;
-    int                          stored_error_code;
-    int                          retry_count;
-};
-
-/* PINT_client_symlink_sm */
-struct PINT_client_symlink_sm {
-    PVFS_object_ref        parent_ref;      /* input parameter */
-    char                         *link_name;      /* input parameter */
-    char                         *link_target;    /* input parameter */
-    PVFS_sysresp_symlink         *sym_resp;       /* in/out parameter*/
-    PVFS_sys_attr                sys_attr;        /* input parameter */
-    PVFS_handle                  symlink_handle;
-    int                          stored_error_code;
-    int                          retry_count;
-};
-
-/* PINT_client_getattr_sm */
-struct PINT_client_getattr_sm {
-    PVFS_object_ref object_ref;     /* input parameter */
-    uint32_t              attrmask;       /* input parameter */
-    int                   datafile_count; /* from object attribs */
-    PVFS_handle          *datafile_handles;
-    PINT_dist            *dist_p;
-    uint32_t              dist_size;
-    PVFS_size            *size_array;     /* from datafile attribs */
+struct PINT_client_create_sm
+{
+    char *object_name;                /* input parameter */
+    PVFS_sysresp_create *create_resp; /* in/out parameter*/
+    PVFS_sys_attr sys_attr;           /* input parameter */
+
+    int retry_count;
+    int num_data_files;
+    int stored_error_code;
+
+    PINT_dist *dist;
+    PVFS_handle metafile_handle;
+    PVFS_handle *datafile_handles;
+    PVFS_BMI_addr_t *data_server_addrs;
+    PVFS_handle_extent_array *io_handle_extent_array;
+};
+
+struct PINT_client_mkdir_sm
+{
+    char *object_name;              /* input parameter  */
+    PVFS_sysresp_mkdir *mkdir_resp; /* in/out parameter */
+    PVFS_sys_attr sys_attr;         /* input parameter  */
+
+    int retry_count;
+    int stored_error_code;
+    PVFS_handle metafile_handle;
+};
+
+struct PINT_client_symlink_sm
+{
+    PVFS_object_ref parent_ref;     /* input parameter */
+    char *link_name;                /* input parameter */
+    char *link_target;              /* input parameter */
+    PVFS_sysresp_symlink *sym_resp; /* in/out parameter*/
+    PVFS_sys_attr sys_attr;         /* input parameter */
+
+    int retry_count;
+    int stored_error_code;
+    PVFS_handle symlink_handle;
+};
+
+struct PINT_client_getattr_sm
+{
+    PVFS_object_ref object_ref;           /* input parameter */
+    uint32_t attrmask;                    /* input parameter */
+
+    PVFS_size *size_array;                /* from datafile attribs */
     PVFS_sysresp_getattr *getattr_resp_p; /* destination for output */
 };
 
-/* PINT_client_setattr_sm */
 struct PINT_client_setattr_sm
 {
-    PVFS_object_ref refn;          /* input parameter */
-    PVFS_sys_attr   sys_attr;      /* input parameter */
+    PVFS_object_ref refn;   /* input parameter */
+    PVFS_sys_attr sys_attr; /* input parameter */
 };
 
-/* PINT_client_io_sm
- *
- * Data specific to I/O operations on the client side.
- */
-struct PINT_client_io_sm {
+typedef struct
+{
+    /* the index of the current context (in the context array) */
+    int index;
+
+    /* the metafile's dfile server index we're communicating with */
+    int server_nr;
+
+    /* the data handle we're responsible for doing I/O on */
+    PVFS_handle data_handle;
+
+    /* a reference to the msgpair we're using for communication */
+    PINT_client_sm_msgpair_state *msg;
+
+    job_id_t flow_job_id;
+    job_status_s flow_status;
+    flow_descriptor flow_desc;
+    PVFS_msg_tag_t session_tag;
+
+    PINT_client_sm_recv_state write_ack;
+
+    /*
+      all *_has_been_posted fields are used at io_analyze_results time
+      to know if we should be checking for errors on particular fields
+    */
+    int msg_send_has_been_posted;
+    int msg_recv_has_been_posted;
+    int flow_has_been_posted;
+    int write_ack_has_been_posted;
+
+    /*
+      all *_in_progress fields are used at cancellation time to
+      determine what operations are currently in flight
+    */
+    int msg_send_in_progress;
+    int msg_recv_in_progress;
+    int flow_in_progress;
+    int write_ack_in_progress;
+
+} PINT_client_io_ctx;
+
+struct PINT_client_io_sm
+{
     /* input parameters */
-    enum PVFS_io_type     io_type;
-    PVFS_Request          file_req;
-    PVFS_offset           file_req_offset;
-    void                 *buffer;
-    PVFS_Request          mem_req;
-    int                   stored_error_code;
-    int                   retry_count;
-
-    /* cached from object attributes */
-    int                   orig_datafile_count;
-    int                   datafile_count;
-    PVFS_handle          *datafile_handles;
-    int			 *datafile_index_array;
-    PINT_dist            *dist_p;
-    uint32_t              dist_size;
-
-    /* data regarding flows */
-    int                     flow_comp_ct;
-    flow_descriptor        *flow_array;
-    job_status_s           *flow_status_array;
+    enum PVFS_io_type io_type;
+    PVFS_Request file_req;
+    PVFS_offset file_req_offset;
+    void *buffer;
+    PVFS_Request mem_req;
+
+    /* output parameter */
+    PVFS_sysresp_io *io_resp_p;
+
     enum PVFS_flowproto_type flowproto_type;
     enum PVFS_encoding_type encoding;
 
-    /* session tags, used in all messages */
-    PVFS_msg_tag_t         *session_tag_array;
+    int datafile_count;
+    int *datafile_index_array;
 
-    /* data regarding final acknowledgements (writes only) */
-    int                        ack_comp_ct;
-    PINT_client_sm_recv_state *ackarray;
+    int msgpair_completion_count;
+    int flow_completion_count;
+    int write_ack_completion_count;
 
-    /* output parameter */
-    PVFS_sysresp_io      *io_resp_p;
+    PINT_client_io_ctx *contexts;
+
+    int total_cancellations_remaining;
+
+    int retry_count;
+    int stored_error_code;
 };
 
-/* PINT_client_flush_sm */
-struct PINT_client_flush_sm {
+struct PINT_client_flush_sm
+{
 };
 
-/* PINT_client_readdir_sm */
-struct PINT_client_readdir_sm {
-    PVFS_object_ref         object_ref;     /* looked up */
-    PVFS_ds_position              pos_token;      /* input parameter */
-    int                           dirent_limit;   /* input parameter */
-    PVFS_sysresp_readdir          *readdir_resp;  /* in/out parameter*/
+struct PINT_client_readdir_sm
+{
+    PVFS_object_ref object_ref;         /* looked up */
+    PVFS_ds_position pos_token;         /* input parameter */
+    int dirent_limit;                   /* input parameter */
+    PVFS_sysresp_readdir *readdir_resp; /* in/out parameter*/
 };
 
 typedef struct
 {
-    char                         *seg_name;
-    char                         *seg_remaining;
-    PVFS_object_attr             seg_attr;
-    PVFS_object_ref        seg_starting_refn;
-    PVFS_object_ref        seg_resolved_refn;
+    char *seg_name;
+    char *seg_remaining;
+    PVFS_object_attr seg_attr;
+    PVFS_object_ref seg_starting_refn;
+    PVFS_object_ref seg_resolved_refn;
 } PINT_client_lookup_sm_segment;
 
 typedef struct
 {
-    int                           total_segments;
-    int                           current_segment;
+    int total_segments;
+    int current_segment;
     PINT_client_lookup_sm_segment segments[MAX_LOOKUP_SEGMENTS];
-    PVFS_object_ref         ctx_starting_refn;
-    PVFS_object_ref         ctx_resolved_refn;
+    PVFS_object_ref ctx_starting_refn;
+    PVFS_object_ref ctx_resolved_refn;
 } PINT_client_lookup_sm_ctx;
 
-/* PINT_client_lookup_sm */
 struct PINT_client_lookup_sm
 {
-    char                       *orig_pathname;/* input parameter */
-    PVFS_object_ref            starting_refn; /* input parameter */
-    PVFS_sysresp_lookup        *lookup_resp;  /* in/out parameter*/
-    int                        follow_link;   /* input parameter */
-    int                        current_context;
-    PINT_client_lookup_sm_ctx  contexts[MAX_LOOKUP_CONTEXTS];
-};
-
-/* PINT_client_rename_sm */
-struct PINT_client_rename_sm {
-    char            *entries[2];     /* old/new entry names;
-                                        input parameter */
-    PVFS_object_ref parent_refns[2]; /* old/new parent pinode refns;
-                                        input parameter */
-    PVFS_object_ref refns[2];        /* old/new pinode ref */
-    int             rmdirent_index;
-    int             target_dirent_exists;
-    PVFS_handle     old_dirent_handle;
-    int		    retry_count;
-    int             stored_error_code;
+    char *orig_pathname;              /* input parameter */
+    PVFS_object_ref starting_refn;    /* input parameter */
+    PVFS_sysresp_lookup *lookup_resp; /* in/out parameter*/
+    int follow_link;                  /* input parameter */
+    int current_context;
+    PINT_client_lookup_sm_ctx contexts[MAX_LOOKUP_CONTEXTS];
+};
+
+struct PINT_client_rename_sm
+{
+    char *entries[2];                /* old/new input entry names */
+    PVFS_object_ref parent_refns[2]; /* old/new input parent refns */
+
+    PVFS_object_ref refns[2];        /* old/new object refns */
+    int retry_count;
+    int stored_error_code;
+    int rmdirent_index;
+    int target_dirent_exists;
+    PVFS_handle old_dirent_handle;
 };
 
 struct PINT_client_mgmt_setparam_list_sm 
@@ -280,31 +317,31 @@ struct PINT_client_mgmt_setparam_list_sm
 struct PINT_client_mgmt_statfs_list_sm
 {
     PVFS_fs_id fs_id;
-    struct PVFS_mgmt_server_stat* stat_array;
+    struct PVFS_mgmt_server_stat *stat_array;
     int count; 
-    PVFS_id_gen_t* addr_array;
+    PVFS_id_gen_t *addr_array;
     PVFS_error_details *details;
 };
 
 struct PINT_client_mgmt_perf_mon_list_sm
 {
     PVFS_fs_id fs_id;
-    struct PVFS_mgmt_perf_stat** perf_matrix;
-    uint64_t* end_time_ms_array;
+    struct PVFS_mgmt_perf_stat **perf_matrix;
+    uint64_t *end_time_ms_array;
     int server_count; 
     int history_count; 
-    PVFS_id_gen_t* addr_array;
-    uint32_t* next_id_array;
+    PVFS_id_gen_t *addr_array;
+    uint32_t *next_id_array;
     PVFS_error_details *details;
 };
 
 struct PINT_client_mgmt_event_mon_list_sm
 {
     PVFS_fs_id fs_id;
-    struct PVFS_mgmt_event** event_matrix;
+    struct PVFS_mgmt_event **event_matrix;
     int server_count; 
     int event_count; 
-    PVFS_id_gen_t* addr_array;
+    PVFS_id_gen_t *addr_array;
     PVFS_error_details *details;
 };
 
@@ -312,10 +349,10 @@ struct PINT_client_mgmt_iterate_handles_
 {
     PVFS_fs_id fs_id;
     int server_count; 
-    PVFS_id_gen_t* addr_array;
-    PVFS_handle** handle_matrix;
-    int* handle_count_array;
-    PVFS_ds_position* position_array;
+    PVFS_id_gen_t *addr_array;
+    PVFS_handle **handle_matrix;
+    int *handle_count_array;
+    PVFS_ds_position *position_array;
     PVFS_error_details *details;
 };
 
@@ -325,56 +362,75 @@ struct PINT_client_mgmt_get_dfile_array_
     int dfile_count;
 };
 
-struct PINT_client_truncate_sm {
-    PVFS_size			size; /* new logical size of object*/
+struct PINT_client_truncate_sm
+{
+    PVFS_size size; /* new logical size of object*/
 };
 
-struct PINT_server_get_config_sm {
-    struct PVFS_sys_mntent* mntent;
+struct PINT_server_get_config_sm
+{
+    struct PVFS_sys_mntent *mntent;
     char *fs_config_buf;
-    uint32_t fs_config_buf_size;
     char *server_config_buf;
+    uint32_t fs_config_buf_size;
     uint32_t server_config_buf_size;
     int persist_config_buffers;
 };
 
-typedef struct PINT_client_sm {
-    /* STATE MACHINE VALUES */
-    int stackptr; /* stack of contexts for nested state machines */
-    union PINT_state_array_values *current_state; /* xxx */
+typedef struct PINT_client_sm
+{
+    /*
+      internal state machine values; the stack is used for tracking
+      movement through nested state machines
+    */
+    int stackptr;
+    union PINT_state_array_values *current_state;
     union PINT_state_array_values *state_stack[PINT_STATE_STACK_SIZE];
 
-    int op; /* holds pvfs system operation type, defined up above */
+    /* the system interface operation type (defined below) */
+    int op;
+
+    /* indicates when the operation as a whole is finished */
+    int op_complete;
+
+    /* indicates when the operation has been cancelled */
+    int op_cancelled;
+
+    /* stores the final operation error code on operation exit */
+    PVFS_error error_code;
 
-    /* CLIENT SM VALUES */
-    int op_complete; /* used to indicate that the operation as a 
-		      * whole is finished.
-		      */
-    PVFS_error error_code; /* used to hold final job status so client
-			    * can determine what finally happened
-			    */
     int comp_ct; /* used to keep up with completion of multiple
 		  * jobs for some states; typically set and
 		  * then decremented to zero as jobs complete */
 
-    int ncache_hit; /* set if last segment lookup was from ncache */
-    int acache_hit; /* set if pinode was from acache */
-    PINT_pinode *pinode; /* filled in on acache hit */
-    PVFS_object_attr acache_attr; /* a scratch attr space */
+    /* indicates that an ncache hit has been made */ 
+    int ncache_hit;
+
+    /* indicates that an acache hit has been made */ 
+    int acache_hit;
+
+    /* on acache hit, the attribute here can be used */
+    PINT_pinode *pinode;
+
+    /*
+      on acache miss, an attribute is typically stored here for later
+      insertion into the acache (if possible)
+    */
+    PVFS_object_attr acache_attr;
 
     /* generic msgpair used with msgpair substate */
     PINT_client_sm_msgpair_state msgpair;
 
-    /* msgpair array ptr used when operations can be performed concurrently.
-     * obviously this has to be allocated within the upper-level state
-     * machine.  used with msgpairarray substate typically.
+    /* msgpair array ptr used when operations can be performed
+     * concurrently.  this must be allocated within the upper-level
+     * state machine and is used with the msgpairarray sm.
      */
     int msgarray_count;
     PINT_client_sm_msgpair_state *msgarray;
 
     /*
       internal pvfs_object references; used in conjunction with the
-      sm_common state machine routines, or otherwise as scratch pinode
+      sm_common state machine routines, or otherwise as scratch object
       references during sm processing
     */
     PVFS_object_ref object_ref;
@@ -420,13 +476,35 @@ int PINT_client_state_machine_post(
     PVFS_sys_op_id *op_id,
     void *user_ptr);
 
+int PINT_sys_dev_unexp(
+    struct PINT_dev_unexp_info *info,
+    job_status_s *jstat,
+    PVFS_sys_op_id *op_id,
+    void *user_ptr);
+
 int PINT_client_state_machine_test(
     PVFS_sys_op_id op_id,
     int *error_code);
 
 int PINT_client_state_machine_testsome(
     PVFS_sys_op_id *op_id_array,
-    int *op_count); /* in/out */
+    int *op_count, /* in/out */
+    void **user_ptr_array,
+    int *error_code_array,
+    int timeout_ms);
+
+/* exposed wrapper around the client-state-machine testsome function */
+static inline int PINT_sys_testsome(
+    PVFS_sys_op_id *op_id_array,
+    int *op_count, /* in/out */
+    void **user_ptr_array,
+    int *error_code_array,
+    int timeout_ms)
+{
+    return PINT_client_state_machine_testsome(
+        op_id_array, op_count, user_ptr_array,
+        error_code_array, timeout_ms);
+}
 
 /* exposed wrappers around the id-generator code */
 static inline int PINT_id_gen_safe_register(
@@ -451,28 +529,30 @@ static inline int PINT_id_gen_safe_unreg
 /* used with post call to tell the system what state machine to use
  * when processing a new PINT_client_sm structure.
  */
-enum {
-    PVFS_SYS_REMOVE  = 1,
-    PVFS_SYS_CREATE  = 2,
-    PVFS_SYS_MKDIR   = 3,
-    PVFS_SYS_SYMLINK = 4,
-    PVFS_SYS_GETATTR = 5,
-    PVFS_SYS_IO      = 6,
-    PVFS_SYS_FLUSH   = 7,
-    PVFS_SYS_TRUNCATE= 8,
-    PVFS_SYS_READDIR = 9,
-    PVFS_SYS_SETATTR = 10,
-    PVFS_SYS_LOOKUP  = 11,
-    PVFS_SYS_RENAME  = 12,
-    PVFS_MGMT_SETPARAM_LIST = 70,
-    PVFS_MGMT_NOOP   = 71,
-    PVFS_MGMT_STATFS_LIST = 72,
-    PVFS_MGMT_PERF_MON_LIST = 73,
+enum
+{
+    PVFS_SYS_REMOVE                = 1,
+    PVFS_SYS_CREATE                = 2,
+    PVFS_SYS_MKDIR                 = 3,
+    PVFS_SYS_SYMLINK               = 4,
+    PVFS_SYS_GETATTR               = 5,
+    PVFS_SYS_IO                    = 6,
+    PVFS_SYS_FLUSH                 = 7,
+    PVFS_SYS_TRUNCATE              = 8,
+    PVFS_SYS_READDIR               = 9,
+    PVFS_SYS_SETATTR               = 10,
+    PVFS_SYS_LOOKUP                = 11,
+    PVFS_SYS_RENAME                = 12,
+    PVFS_MGMT_SETPARAM_LIST        = 70,
+    PVFS_MGMT_NOOP                 = 71,
+    PVFS_MGMT_STATFS_LIST          = 72,
+    PVFS_MGMT_PERF_MON_LIST        = 73,
     PVFS_MGMT_ITERATE_HANDLES_LIST = 74,
-    PVFS_MGMT_GET_DFILE_ARRAY = 75,
-    PVFS_MGMT_EVENT_MON_LIST = 76,
-    PVFS_SERVER_GET_CONFIG = 77,
-    PVFS_CLIENT_JOB_TIMER = 200
+    PVFS_MGMT_GET_DFILE_ARRAY      = 75,
+    PVFS_MGMT_EVENT_MON_LIST       = 76,
+    PVFS_SERVER_GET_CONFIG         = 77,
+    PVFS_CLIENT_JOB_TIMER          = 200,
+    PVFS_DEV_UNEXPECTED            = 300
 };
 
 /* prototypes of helper functions */
@@ -506,13 +586,7 @@ int PINT_serv_msgpairarray_resolve_addrs
 
 int PINT_client_bmi_cancel(job_id_t id);
 
-/* INCLUDE STATE-MACHINE.H DOWN HERE */
-#define PINT_OP_STATE       PINT_client_sm
-#if 0
-#define PINT_OP_STATE_TABLE PINT_server_op_table
-#endif
-
-#include "state-machine.h"
+int PINT_client_io_cancel(job_id_t id);
 
 /* internal non-blocking helper methods */
 int PINT_client_wait_internal(
@@ -533,21 +607,31 @@ PINT_client_wait_internal(op_id, in_op_s
 
 #define PINT_mgmt_release(op_id) PINT_sys_release(op_id)
 
-#define PINT_init_sysint_credentials(sm_p_cred_p, user_cred_p)  \
-do {                                                            \
-    sm_p_cred_p = PVFS_util_dup_credentials(user_cred_p);       \
-    if (!sm_p_cred_p)                                           \
-    {                                                           \
-        gossip_lerr("Failed to copy user credentials\n");       \
-        free(sm_p);                                             \
-        return -PVFS_ENOMEM;                                    \
-    }                                                           \
+#define PINT_init_sysint_credentials(sm_p_cred_p, user_cred_p)\
+do {                                                          \
+    sm_p_cred_p = PVFS_util_dup_credentials(user_cred_p);     \
+    if (!sm_p_cred_p)                                         \
+    {                                                         \
+        gossip_lerr("Failed to copy user credentials\n");     \
+        free(sm_p);                                           \
+        return -PVFS_ENOMEM;                                  \
+    }                                                         \
 } while(0)
 
 /* misc helper methods */
 struct server_configuration_s *PINT_get_server_config_struct(
     PVFS_fs_id fs_id);
 
+/************************************
+ * state-machine.h included here
+ ************************************/
+#define PINT_OP_STATE       PINT_client_sm
+#if 0
+#define PINT_OP_STATE_TABLE PINT_server_op_table
+#endif
+
+#include "state-machine.h"
+
 /* system interface function state machines */
 extern struct PINT_state_machine_s pvfs2_client_remove_sm;
 extern struct PINT_state_machine_s pvfs2_client_create_sm;
@@ -576,6 +660,7 @@ extern struct PINT_state_machine_s pvfs2
 extern struct PINT_state_machine_s pvfs2_client_getattr_acache_sm;
 extern struct PINT_state_machine_s pvfs2_client_lookup_ncache_sm;
 extern struct PINT_state_machine_s pvfs2_client_remove_helper_sm;
+#endif
 
 /*
  * Local variables:
@@ -585,5 +670,3 @@ extern struct PINT_state_machine_s pvfs2
  *
  * vim: ts=8 sts=4 sw=4 noexpandtab
  */
-
-#endif

Index: finalize.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/finalize.c,v
diff -p -u -r1.27 -r1.28
--- finalize.c	17 May 2004 19:48:26 -0000	1.27
+++ finalize.c	8 Jul 2004 16:17:06 -0000	1.28
@@ -27,6 +27,8 @@ extern job_context_id PVFS_sys_job_conte
 extern gen_mutex_t *g_session_tag_mt_lock;
 extern gen_mutex_t *g_server_config_mutex;
 
+extern PINT_client_sm *g_sm_p;
+
 /* PVFS_finalize
  *
  * shuts down the PVFS system interface
@@ -65,6 +67,8 @@ int PVFS_sys_finalize()
     PINT_release_pvfstab();
 
     gossip_disable();
+
+    free(g_sm_p);
 
     return 0;
 }

Index: fs-add.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/fs-add.c,v
diff -p -u -r1.15 -r1.16
--- fs-add.c	17 May 2004 21:06:51 -0000	1.15
+++ fs-add.c	8 Jul 2004 16:17:06 -0000	1.16
@@ -49,10 +49,12 @@ int PVFS_sys_fs_add(struct PVFS_sys_mnte
         return -PVFS_EEXIST;
     }
 
-    /* first make sure BMI knows how to handle this method, else fail quietly */
+    /* make sure BMI knows how to handle this method, else fail quietly */
     ret = BMI_addr_lookup(&test_addr, mntent->pvfs_config_server);
     if (ret == bmi_errno_to_pvfs(-ENOPROTOOPT))
+    {
 	goto error_exit;
+    }
 
     new_server_config = (struct server_configuration_s *)malloc(
         sizeof(struct server_configuration_s));

Index: getattr-acache.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/getattr-acache.sm,v
diff -p -u -r1.5 -r1.6
--- getattr-acache.sm	28 Apr 2004 16:32:41 -0000	1.5
+++ getattr-acache.sm	8 Jul 2004 16:17:06 -0000	1.6
@@ -181,15 +181,15 @@ static int getattr_acache_lookup(PINT_cl
             fake_resp.status = 0;
 
             /*
-              skip the deep fill (i.e. copy) of attributes
-              on cache hit.  by setting the acache_hit flag,
-              caller state machine won't try to read those
-              attributes out of this faked response in their
-              getattr completion functions
+              skip the deep fill (i.e. copy) of attributes on cache
+              hit.  by setting the acache_hit flag, caller state
+              machine won't try to read those attributes out of this
+              faked response in their getattr completion functions.
+              also, the caller MUST call PINT_acache_release or
+              PINT_acache_release_refn on acache hit.
             */
             sm_p->acache_hit = 1;
             sm_p->msgpair.comp_fn(sm_p, &fake_resp, 0);
-            PINT_acache_release(sm_p->pinode);
             return 1;
         }
       cache_miss:

Index: initialize.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/initialize.c,v
diff -p -u -r1.84 -r1.85
--- initialize.c	20 May 2004 17:27:48 -0000	1.84
+++ initialize.c	8 Jul 2004 16:17:06 -0000	1.85
@@ -29,6 +29,8 @@
 
 job_context_id PVFS_sys_job_context = -1;
 
+PINT_client_sm *g_sm_p = NULL;
+
 extern gen_mutex_t *g_session_tag_mt_lock;
 
 typedef enum
@@ -72,6 +74,9 @@ int PVFS_sys_initialize(int default_debu
     {
 	return(-PVFS_ENOMEM);
     }
+
+    /* keep track of this pointer for freeing on finalize */
+    g_sm_p = sm_p;
     memset(sm_p, 0, sizeof(*sm_p));
 
     gossip_enable_stderr();
@@ -167,7 +172,7 @@ int PVFS_sys_initialize(int default_debu
         gossip_lerr("Error initializing attribute cache\n");
         goto error_exit;        
     }
-    PINT_acache_set_timeout(PINT_ACACHE_TIMEOUT * 1000);
+    PINT_acache_set_timeout(PINT_ACACHE_TIMEOUT_MS);
     client_status_flag |= CLIENT_ACACHE_INIT;
 
     /* initialize the name lookup cache and set the default timeout */
@@ -177,7 +182,7 @@ int PVFS_sys_initialize(int default_debu
         gossip_lerr("Error initializing name lookup cache\n");
         goto error_exit;        
     }        
-    PINT_ncache_set_timeout(PINT_NCACHE_TIMEOUT * 1000);
+    PINT_ncache_set_timeout(PINT_NCACHE_TIMEOUT_MS);
     client_status_flag |= CLIENT_NCACHE_INIT;
 
     /* initialize the server configuration manager */

Index: msgpairarray.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/msgpairarray.sm,v
diff -p -u -r1.43 -r1.44
--- msgpairarray.sm	20 May 2004 17:27:49 -0000	1.43
+++ msgpairarray.sm	8 Jul 2004 16:17:06 -0000	1.44
@@ -41,6 +41,9 @@ static int msgpairarray_complete(
 static int msgpairarray_completion_fn(
     PINT_client_sm *sm_p, job_status_s *js_p);
 
+static int count_incomplete_msgs(
+    PINT_client_sm_msgpair_state *msgarray, int array_count);
+
 %%
 
 nested machine pvfs2_client_msgpairarray_sm(init,
@@ -92,8 +95,8 @@ static int msgpairarray_init(PINT_client
     PINT_client_sm_msgpair_state *msg_p = NULL;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "msgpairarray state: init (%d message(s))\n",
-                 sm_p->msgarray_count);
+                 "(%p) msgpairarray state: init (%d msgpair(s))\n",
+                 sm_p, sm_p->msgarray_count);
 
     assert(sm_p->msgarray_count > 0);
 
@@ -104,15 +107,19 @@ static int msgpairarray_init(PINT_client
       comp_ct in the first msgarray entry to keep up with the count
       for the entire array.
     */
-    sm_p->msgarray[0].comp_ct = 2 * sm_p->msgarray_count;
+    sm_p->msgarray[0].comp_ct = (2 * sm_p->msgarray_count);
 
     for(i = 0; i < sm_p->msgarray_count; i++)
     {
         msg_p = &sm_p->msgarray[i];
         assert(msg_p);
 
+        assert((msg_p->retry_flag == PVFS_MSGPAIR_RETRY) ||
+               (msg_p->retry_flag == PVFS_MSGPAIR_NO_RETRY));
+
         msg_p->encoded_resp_p = NULL;
         msg_p->retry_count = 0;
+        msg_p->complete = 0;
     }
     return 1;
 }
@@ -143,22 +150,31 @@ static int msgpairarray_post(PINT_client
     PVFS_msg_tag_t session_tag;
     PINT_client_sm_msgpair_state *msg_p = NULL;
     struct filesystem_configuration_s *cur_fs = NULL;
+    int num_incomplete_msgpairs = (sm_p->msgarray[0].comp_ct / 2);
 
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "msgpairarray state: post (%d message(s))\n",
-		 sm_p->msgarray_count);
-
-    assert(sm_p->msgarray_count > 0);
+    gossip_debug(
+        GOSSIP_CLIENT_DEBUG, "(%p) msgpairarray state: post "
+        "(%d total message(s) with %d incomplete)\n", sm_p,
+        (sm_p->msgarray_count * 2), (num_incomplete_msgpairs * 2));
 
     js_p->error_code = 0;
 
-    for (i = 0; i < sm_p->msgarray_count; i++)
+    assert(sm_p->msgarray_count > 0);
+    assert(num_incomplete_msgpairs && sm_p->msgarray[0].comp_ct);
+
+    for (i = 0; i < num_incomplete_msgpairs; i++)
     {
         msg_p = &sm_p->msgarray[i];
         assert(msg_p);
 
-        assert((msg_p->retry_flag == PVFS_MSGPAIR_RETRY) ||
-               (msg_p->retry_flag == PVFS_MSGPAIR_NO_RETRY));
+        /*
+          when re-posting on the retry code path, skip over the
+          msgpairs that were already marked complete
+        */
+        if (msg_p->complete)
+        {
+            continue;
+        }
 
         msg_p->op_status = 0;
 
@@ -189,7 +205,7 @@ static int msgpairarray_post(PINT_client
                 return 1;
             }
 
-            /* calculate maximum response message size and allocate space */
+            /* calculate max response msg size and allocate space */
             msg_p->max_resp_sz = PINT_encode_calc_max_size(
                 PINT_ENCODE_RESP, msg_p->req.op, enc_type);
 
@@ -205,6 +221,10 @@ static int msgpairarray_post(PINT_client
 
 	session_tag = get_next_session_tag();
 
+        gossip_debug(GOSSIP_CLIENT_DEBUG,
+                     "(%p) msgpair %d (%p): posting recv\n",
+                     sm_p, (i + 1), msg_p);
+
 	/* post receive of response; job_id stored in recv_id */
 	ret = job_bmi_recv(msg_p->svr_addr,
 			   msg_p->encoded_resp_p,
@@ -228,7 +248,7 @@ static int msgpairarray_post(PINT_client
 			   &msg_p->recv_status, 0, pint_client_sm_context);
 	}
 
-	if (ret < 0 || ret == 1)
+	if ((ret < 0) || (ret == 1))
         {
 	    /* it is impossible for this recv to complete at this point
 	     * without errors; we haven't sent the request yet!
@@ -260,6 +280,10 @@ static int msgpairarray_post(PINT_client
 	 */
 	assert(ret == 0);
 
+        gossip_debug(GOSSIP_CLIENT_DEBUG,
+                     "(%p) msgpair %d (%p): posting send\n",
+                     sm_p, (i + 1), msg_p);
+
 	/* post send of request; job_id stored in send_id */
 	ret = job_bmi_send_list(msg_p->encoded_req.dest,
 				msg_p->encoded_req.buffer_list,
@@ -275,13 +299,19 @@ static int msgpairarray_post(PINT_client
 				&msg_p->send_id,
 				pint_client_sm_context,
 				PVFS2_CLIENT_JOB_TIMEOUT);
-	if (ret < 0 || (ret == 1 && msg_p->send_status.error_code != 0))
+
+ 	if ((ret < 0) ||
+            ((ret == 1) && (msg_p->send_status.error_code != 0)))
 	{
-	    if(ret < 0)
+	    if (ret < 0)
+            {
 		PVFS_perror_gossip("Post of send failed", ret);
+            }
 	    else
+            {
 		PVFS_perror_gossip("Send immediately failed",
 		    msg_p->recv_status.error_code);
+            }
 
 	    gossip_err("Send error: cancelling recv.\n");
 
@@ -294,7 +324,7 @@ static int msgpairarray_post(PINT_client
 	    msg_p->send_id = 0;
 	    sm_p->msgarray[0].comp_ct--;
 	}
-	else if(ret == 1)
+	else if (ret == 1)
 	{
 	    /* immediate completion */
 	    msg_p->send_id = 0;
@@ -305,13 +335,10 @@ static int msgpairarray_post(PINT_client
 	     */
 	    sm_p->msgarray[0].comp_ct--;
 	}
-	else
-	{
-	    /* successful post, no immediate completion */
-	}
+        /* else: successful post, no immediate completion */
     }
 
-    if(sm_p->msgarray[0].comp_ct == 0)
+    if (sm_p->msgarray[0].comp_ct == 0)
     {
 	/* everything is completed already (could happen in some failure
 	 * cases); jump straight to final completion function.
@@ -319,14 +346,11 @@ static int msgpairarray_post(PINT_client
 	 js_p->error_code = MSGPAIRS_COMPLETE;
 	 return 1;
     }
-    else
-    {
-	/* we are still waiting on operations to complete, next state
-	 * transition will handle them
-	 */
-	return 0;
 
-    }
+    /* we are still waiting on operations to complete, next state
+     * transition will handle them
+     */
+    return 0;
 }
 
 static int msgpairarray_post_retry(PINT_client_sm *sm_p,
@@ -335,7 +359,7 @@ static int msgpairarray_post_retry(PINT_
     job_id_t tmp_id;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "msgpairarray state: post_retry\n");
+                 "(%p) msgpairarray state: post_retry\n", sm_p);
 
     return job_req_sched_post_timer(
         PVFS2_CLIENT_RETRY_DELAY, sm_p, 0, js_p, &tmp_id,
@@ -345,7 +369,8 @@ static int msgpairarray_post_retry(PINT_
 static int msgpairarray_complete(PINT_client_sm *sm_p,
                                  job_status_s *js_p)
 {
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "msgpairarray state: complete\n");
+    gossip_debug(GOSSIP_CLIENT_DEBUG,
+                 "(%p) msgpairarray state: complete\n", sm_p);
 
     /* match operation with something in the msgpair array */
     /* the first N tags are receives, the second N are sends */
@@ -410,13 +435,13 @@ static int msgpairarray_completion_fn(PI
 				      job_status_s *js_p)
 {
     int ret = -PVFS_EINVAL, i = 0;
-    struct PINT_decoded_msg decoded_resp; /* data about decoded resp */
+    struct PINT_decoded_msg decoded_resp;
     struct PVFS_server_resp *resp_p; /* response structure (decoded) */
 
     js_p->error_code = 0;
 
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "msgpairarray state: "
-                 "completion_fn\n");
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) msgpairarray state: "
+                 "completion_fn\n", sm_p);
 
     for (i = 0; i < sm_p->msgarray_count; i++)
     {
@@ -432,12 +457,23 @@ static int msgpairarray_completion_fn(PI
             {
                 msg_p->retry_count++;
 
-                gossip_debug(GOSSIP_CLIENT_DEBUG, "*** msgarray send "
-                             "failed -- retrying msgpair "
-                             "(count = %d)\n", msg_p->retry_count);
-
-                /* FIXME: only redo the failed ones, not all */
-                sm_p->msgarray[0].comp_ct = 2 * sm_p->msgarray_count;
+                gossip_debug(GOSSIP_CLIENT_DEBUG, "*** msgpairarray send "
+                             "failed -- retrying msgpair (count = %d)\n",
+                             msg_p->retry_count);
+
+                /*
+                  NOTE: we only retry msgpairs that haven't yet been
+                  completed
+                */
+                sm_p->msgarray[0].comp_ct = 2 * count_incomplete_msgs(
+                    sm_p->msgarray, sm_p->msgarray_count);
+
+                assert(sm_p->msgarray[0].comp_ct > 0);
+
+                gossip_debug(GOSSIP_CLIENT_DEBUG, "*** msgpairarray is "
+                             "retrying %d of %d total messages\n",
+                             sm_p->msgarray[0].comp_ct,
+                             (2 * sm_p->msgarray_count));
 
                 js_p->error_code = MSGPAIRS_RETRY;
                 return 1;
@@ -469,7 +505,7 @@ static int msgpairarray_completion_fn(PI
 	 * meaningful, so we save it.
 	 */
 	msg_p->op_status = resp_p->status;
-	
+
 	/* NOTE: we call the function associated with each message,
 	 *       not just the one from the first array element.  so
 	 *       there could in theory be different functions for each
@@ -512,19 +548,46 @@ static int msgpairarray_completion_fn(PI
 	}
 	
 	/* free all the resources that we used to send and receive. */
-	ret = PINT_serv_free_msgpair_resources(&msg_p->encoded_req,
-					       msg_p->encoded_resp_p,
-					       &decoded_resp,
-					       &msg_p->svr_addr,
-					       msg_p->max_resp_sz);
-	if (ret != 0)
+	ret = PINT_serv_free_msgpair_resources(
+            &msg_p->encoded_req, msg_p->encoded_resp_p, &decoded_resp,
+            &msg_p->svr_addr, msg_p->max_resp_sz);
+	if (ret)
         {
             PVFS_perror_gossip("Failed to free msgpair resources", ret);
             js_p->error_code = ret;
             return 1;
 	}
+
+        msg_p->encoded_resp_p = NULL;
+        msg_p->max_resp_sz = 0;
+
+        /*
+          mark that this msgpair has been completed and should not be
+          retried in the case of possible future retries
+        */
+        msg_p->complete = 1;
+
+        gossip_debug(GOSSIP_CLIENT_DEBUG,
+                     "(%p) msgpair %d (%p): marked complete\n",
+                     sm_p, (i + 1), msg_p);
     }
     return 1;
+}
+
+static int count_incomplete_msgs(
+    PINT_client_sm_msgpair_state *msgarray, int array_count)
+{
+    int count = 0, i = 0;
+    PINT_client_sm_msgpair_state *msg = NULL;
+
+    for(i = 0; i < array_count; i++)
+    {
+        msg = &msgarray[i];
+        assert(msg);
+
+        count += (msg->complete ? 0 : 1);
+    }
+    return count;
 }
     
 /*

Index: ncache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/ncache.c,v
diff -p -u -r1.3 -r1.4
--- ncache.c	24 Mar 2004 23:10:30 -0000	1.3
+++ ncache.c	8 Jul 2004 16:17:06 -0000	1.4
@@ -58,7 +58,7 @@ static int ncache_add_dentry(
 
 /* static globals required for ncache operation */
 static ncache *cache = NULL;
-static int s_pint_ncache_timeout_ms = (PINT_NCACHE_TIMEOUT * 1000);
+static int s_pint_ncache_timeout_ms = PINT_NCACHE_TIMEOUT_MS;
 
 
 /* compare

Index: ncache.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/ncache.h,v
diff -p -u -r1.2 -r1.3
--- ncache.h	24 Mar 2004 23:10:30 -0000	1.2
+++ ncache.h	8 Jul 2004 16:17:06 -0000	1.3
@@ -13,8 +13,8 @@
 /* number of entries allowed in the cache */
 #define PINT_NCACHE_MAX_ENTRIES 512
 
-/* number of seconds that cache entries will remain valid */
-#define PINT_NCACHE_TIMEOUT 5
+/* number of milliseconds that cache entries will remain valid */
+#define PINT_NCACHE_TIMEOUT_MS 5000
 
 /* TODO: replace later with real value from trove */
 /* value passed out to indicate lookups that didn't find a match */

Index: pint-bucket.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/pint-bucket.c,v
diff -p -u -r1.58 -r1.59
--- pint-bucket.c	1 Jun 2004 19:49:24 -0000	1.58
+++ pint-bucket.c	8 Jul 2004 16:17:06 -0000	1.59
@@ -8,6 +8,7 @@
 #include <string.h>
 #include <assert.h>
 #include <stdlib.h>
+#include <time.h>
 
 #include "pvfs2-types.h"
 #include "pvfs2-attr.h"

Index: shared-state-methods.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/shared-state-methods.c,v
diff -p -u -r1.17 -r1.18
--- shared-state-methods.c	4 May 2004 14:42:58 -0000	1.17
+++ shared-state-methods.c	8 Jul 2004 16:17:06 -0000	1.18
@@ -31,33 +31,32 @@ int PINT_sm_common_parent_getattr_setup_
 
     memset(&sm_p->msgpair, 0, sizeof(PINT_client_sm_msgpair_state));
 
-    assert(sm_p->parent_ref.fs_id != 0);
-    assert(sm_p->parent_ref.handle != 0);
+    sm_p->msgarray = &(sm_p->msgpair);
+    sm_p->msgarray_count = 1;
 
-    PINT_SERVREQ_GETATTR_FILL(sm_p->msgpair.req,
-			      *sm_p->cred_p,
-			      sm_p->parent_ref.fs_id,
-			      sm_p->parent_ref.handle,
-			      PVFS_ATTR_COMMON_ALL);
+    assert(sm_p->parent_ref.fs_id != PVFS_FS_ID_NULL);
+    assert(sm_p->parent_ref.handle != PVFS_HANDLE_NULL);
+
+    PINT_SERVREQ_GETATTR_FILL(
+        sm_p->msgpair.req,
+        *sm_p->cred_p,
+        sm_p->parent_ref.fs_id,
+        sm_p->parent_ref.handle,
+        PVFS_ATTR_COMMON_ALL);
 
-    /* fill in msgpair structure components */
     sm_p->msgpair.fs_id   = sm_p->parent_ref.fs_id;
     sm_p->msgpair.handle  = sm_p->parent_ref.handle;
     sm_p->msgpair.retry_flag = PVFS_MSGPAIR_RETRY;
-    sm_p->msgpair.comp_fn = PINT_sm_common_directory_getattr_comp_fn;
+    sm_p->msgpair.comp_fn = PINT_sm_common_object_getattr_comp_fn;
 
     ret = PINT_bucket_map_to_server(&sm_p->msgpair.svr_addr,
 				    sm_p->msgpair.handle,
 				    sm_p->msgpair.fs_id);
     if (ret)
     {
-        gossip_err("Failed to map meta server address\n");
+        PVFS_perror_gossip("Failed to map meta server address", ret);
         js_p->error_code = ret;
     }
-
-    sm_p->msgarray = &(sm_p->msgpair);
-    sm_p->msgarray_count = 1;
-
     return 1;
 }
 
@@ -81,8 +80,11 @@ int PINT_sm_common_object_getattr_setup_
 
     memset(&sm_p->msgpair, 0, sizeof(PINT_client_sm_msgpair_state));
 
-    assert(sm_p->object_ref.fs_id != 0);
-    assert(sm_p->object_ref.handle != 0);
+    sm_p->msgarray = &(sm_p->msgpair);
+    sm_p->msgarray_count = 1;
+
+    assert(sm_p->object_ref.fs_id != PVFS_FS_ID_NULL);
+    assert(sm_p->object_ref.handle != PVFS_HANDLE_NULL);
 
     PINT_SERVREQ_GETATTR_FILL(
         sm_p->msgpair.req,
@@ -91,7 +93,6 @@ int PINT_sm_common_object_getattr_setup_
         sm_p->object_ref.handle,
         (PVFS_ATTR_COMMON_ALL | PVFS_ATTR_META_ALL));
 
-    /* fill in msgpair structure components */
     sm_p->msgpair.fs_id = sm_p->object_ref.fs_id;
     sm_p->msgpair.handle = sm_p->object_ref.handle;
     sm_p->msgpair.retry_flag = PVFS_MSGPAIR_RETRY;
@@ -102,13 +103,9 @@ int PINT_sm_common_object_getattr_setup_
 				    sm_p->msgpair.fs_id);
     if (ret)
     {
-        gossip_err("Failed to map meta server address\n");
+        PVFS_perror_gossip("Failed to map meta server address", ret);
         js_p->error_code = ret;
     }
-
-    sm_p->msgarray = &(sm_p->msgpair);
-    sm_p->msgarray_count = 1;
-
     return 1;
 }
 
@@ -120,121 +117,25 @@ int PINT_sm_common_object_getattr_failur
     return 1;
 }
 
-int PINT_sm_common_directory_getattr_comp_fn(
+int PINT_sm_common_object_getattr_comp_fn(
     void *v_p,
     struct PVFS_server_resp *resp_p,
     int index)
 {
-    int ret = 0;
-    PVFS_object_attr *attr = NULL;
     PINT_client_sm *sm_p = (PINT_client_sm *) v_p;
     
-    assert(resp_p->op == PVFS_SERV_GETATTR);
-
-    assert(sm_p->msgarray == &sm_p->msgpair);
-    sm_p->msgarray = NULL;
-    sm_p->msgarray_count = 0;
-
     gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "PINT_sm_common_getattr_directory_comp_fn\n");
-
-    if (resp_p->status != 0)
-    {
-        gossip_err("Error: getattr failure\n");
-	return resp_p->status;
-    }
-
-    /*
-      if we didn't get a cache hit, we're making a copy of the
-      attributes here so that we can add a acache entry later in
-      cleanup.
-    */
-    if (!sm_p->acache_hit)
-    {
-        PINT_acache_object_attr_deep_copy(
-            &sm_p->acache_attr, &resp_p->u.getattr.attr);
-    }
-
-    /*
-      if we got a cache hit, use those attributes, otherwise use the
-      real server replied attrs
-    */
-    attr = (sm_p->acache_hit ?
-            &sm_p->pinode->attr :
-            &resp_p->u.getattr.attr);
-    assert(attr);
-
-    if (attr->objtype == PVFS_TYPE_DIRECTORY)
-    {
-        /*
-          check permissions against parent directory to determine
-          if we're allowed to create a new entry there
-        */
-        ret = PINT_check_perms(*attr, attr->perms,
-                          sm_p->cred_p->uid, sm_p->cred_p->gid);
-        if (ret < 0)
-        {
-            gossip_err("Error: Permission failure\n");
-            return -PVFS_EPERM;
-        }
-    }
-    else
-    {
-        gossip_err("Error: Parent is not a directory\n");
-	return -PVFS_ENOTDIR;
-    }
-
-    /*
-      if our parent directory attributes are good, and not present
-      int the acache, put them in the acache now
-    */
-    if (!sm_p->acache_hit)
-    {
-        int release_required = 1;
-        PINT_pinode *pinode =
-            PINT_acache_lookup(sm_p->u.getattr.object_ref);
-        if (!pinode)
-        {
-            pinode = PINT_acache_pinode_alloc();
-            assert(pinode);
-            release_required = 0;
-        }
-        pinode->refn = sm_p->u.getattr.object_ref;
-        pinode->size = ((attr->mask & PVFS_ATTR_DATA_ALL) ?
-                        attr->u.data.size : 0);
-
-        PINT_acache_object_attr_deep_copy(
-            &pinode->attr, attr);
-
-        PINT_acache_set_valid(pinode);
-
-        if (release_required)
-        {
-            PINT_acache_release(pinode);
-        }
-    }
-    return 0;
-}
+                 "PINT_sm_common_getattr_object_comp_fn\n");
 
-int PINT_sm_common_object_getattr_comp_fn(
-    void *v_p,
-    struct PVFS_server_resp *resp_p,
-    int index)
-{
-    PINT_client_sm *sm_p = (PINT_client_sm *) v_p;
-    
     assert(resp_p->op == PVFS_SERV_GETATTR);
-
     assert(sm_p->msgarray == &sm_p->msgpair);
+
     sm_p->msgarray = NULL;
     sm_p->msgarray_count = 0;
 
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-                 "PINT_sm_common_getattr_object_comp_fn\n");
-
-    if (resp_p->status != 0)
+    if (resp_p->status)
     {
-        gossip_err("Error: getattr failure\n");
+        PVFS_perror_gossip("Getattr failed", resp_p->status);
 	return resp_p->status;
     }
 

Index: shared-state-methods.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/shared-state-methods.h,v
diff -p -u -r1.3 -r1.4
--- shared-state-methods.h	4 Nov 2003 15:48:36 -0000	1.3
+++ shared-state-methods.h	8 Jul 2004 16:17:06 -0000	1.4
@@ -8,35 +8,22 @@
 #define __SHARED_STATE_METHODS_H
 
 /*
-  this file is for storing common methods that are shared
-  between the following client state machines implementations:
-
-  sys-create.sm
-  sys-mkdir.sm
-  sys-symlink.sm
+  this file is for storing common methods that are shared between
+  client state machines (such as create, mkdir, symlink)
 */
 
-/*
-  shared/common state operation functions
-*/
-int PINT_sm_common_parent_getattr_setup_msgpair(PINT_client_sm *sm_p,
-                                           job_status_s *js_p);
-int PINT_sm_common_parent_getattr_failure(PINT_client_sm *sm_p,
-                                     job_status_s *js_p);
-int PINT_sm_common_object_getattr_setup_msgpair(PINT_client_sm *sm_p,
-                                           job_status_s *js_p);
-int PINT_sm_common_object_getattr_failure(PINT_client_sm *sm_p,
-                                     job_status_s *js_p);
+/* shared/common state operation functions */
+int PINT_sm_common_parent_getattr_setup_msgpair(
+    PINT_client_sm *sm_p, job_status_s *js_p);
+int PINT_sm_common_parent_getattr_failure(
+    PINT_client_sm *sm_p, job_status_s *js_p);
+int PINT_sm_common_object_getattr_setup_msgpair(
+    PINT_client_sm *sm_p, job_status_s *js_p);
+int PINT_sm_common_object_getattr_failure(
+    PINT_client_sm *sm_p, job_status_s *js_p);
 
-/*
-  shared/common msgpair completion functions
-*/
-int PINT_sm_common_directory_getattr_comp_fn(void *v_p,
-                                        struct PVFS_server_resp *resp_p,
-                                        int index);
-int PINT_sm_common_object_getattr_comp_fn(void *v_p,
-                                     struct PVFS_server_resp *resp_p,
-                                     int index);
+int PINT_sm_common_object_getattr_comp_fn(
+    void *v_p, struct PVFS_server_resp *resp_p, int index);
 
 #endif /* __SHARED_STATE_METHODS_H */
 

Index: sys-create.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-create.sm,v
diff -p -u -r1.45 -r1.46
--- sys-create.sm	27 May 2004 22:11:47 -0000	1.45
+++ sys-create.sm	8 Jul 2004 16:17:06 -0000	1.46
@@ -406,13 +406,11 @@ static int create_datafiles_comp_fn(void
     }
 
     /* allocate memory for the data handles if we haven't already */
-    if (index == 0)
+    if (sm_p->u.create.datafile_handles == NULL)
     {
-        if (sm_p->u.create.datafile_handles == NULL)
-        {
-            sm_p->u.create.datafile_handles = (PVFS_handle *)malloc(
-                sm_p->u.create.num_data_files * sizeof(PVFS_handle));
-        }
+        sm_p->u.create.datafile_handles = (PVFS_handle *)malloc(
+            sm_p->u.create.num_data_files * sizeof(PVFS_handle));
+
         if (sm_p->u.create.datafile_handles == NULL)
         {
             gossip_err("create: Failed to allocate data handle array\n");
@@ -421,8 +419,6 @@ static int create_datafiles_comp_fn(void
         memset(sm_p->u.create.datafile_handles, 0,
                sm_p->u.create.num_data_files * sizeof(PVFS_handle));
     }
-
-    assert(sm_p->u.create.datafile_handles);
 
     /* otherwise, just stash the newly created data file handle */
     sm_p->u.create.datafile_handles[index] = resp_p->u.create.handle;

Index: sys-getattr.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-getattr.sm,v
diff -p -u -r1.57 -r1.58
--- sys-getattr.sm	21 May 2004 17:22:38 -0000	1.57
+++ sys-getattr.sm	8 Jul 2004 16:17:06 -0000	1.58
@@ -3,31 +3,6 @@
  *
  * See COPYING in top-level directory.
  */
-
-/* pvfs2_client_getattr_sm
- *
- * This state machine implements the getattr system interface function.
- *
- * The input parameters are held in sm_p->u.getattr.
- *
- * The sm_p->msgpair structure is used to get the attributes of the
- * object itself.  We convert the original attribute mask (in
- * sm_p->u.getattr.attrmask) to ask for datafile and distribution info
- * if the user asked for file size (PVFS_ATTR_SYS_SIZE).  This allows us
- * to obtain this information (if the object turns out to be a metafile)
- * so that we can later look up the datafile sizes and calculate the overall
- * file size.
- *
- * The datafile handles and distribution information are stored in
- * sm_p->u.getattr also, if the object turns out to be a metafile and the 
- * caller asked for the size.  These need to be freed once we are done with
- * them.
- *
- * The sm_p->msgpairarray is used to get datafile sizes, if it turns out that
- * we need them.  This space will also need to be freed, if we grab these
- * sizes.
- */
-
 #include <string.h>
 #include <assert.h>
 
@@ -43,6 +18,21 @@
 #include "pint-bucket.h"
 #include "PINT-reqproto-encode.h"
 
+/* pvfs2_client_getattr_sm
+ *
+ * The sm_p->msgpair structure is used to get the attributes of the
+ * object itself.  We convert the original attribute mask (in
+ * sm_p->u.getattr.attrmask) to ask for datafile and distribution info
+ * if the user asked for file size (PVFS_ATTR_SYS_SIZE).  This allows
+ * us to obtain this information (if the object turns out to be a
+ * metafile) so that we can later look up the datafile sizes and
+ * calculate the overall file size.
+ *
+ * The sm_p->msgpairarray is used to get datafile sizes, if it turns
+ * out that we need them.  This space will also need to be freed, if
+ * we grab these sizes.
+ */
+
 extern job_context_id pint_client_sm_context;
 
 enum
@@ -71,59 +61,60 @@ static int getattr_datafile_getattr_comp
 
 %%
 
-machine pvfs2_client_getattr_sm(object_getattr_setup_msgpair,
-				object_getattr_xfer_msgpair,
-				object_getattr_failure,
-				datafile_getattr_setup_msgpairarray,
-				datafile_getattr_xfer_msgpairarray,
-				datafile_getattr_failure,
-				cleanup)
+machine pvfs2_client_getattr_sm(
+    object_getattr_setup_msgpair,
+    object_getattr_xfer_msgpair,
+    object_getattr_failure,
+    datafile_getattr_setup_msgpairarray,
+    datafile_getattr_xfer_msgpairarray,
+    datafile_getattr_failure,
+    cleanup)
 {
     state object_getattr_setup_msgpair
     {
-	run getattr_object_getattr_setup_msgpair;
-	success => object_getattr_xfer_msgpair;
-	default => cleanup;
+        run getattr_object_getattr_setup_msgpair;
+        success => object_getattr_xfer_msgpair;
+        default => cleanup;
     }
 
     state object_getattr_xfer_msgpair
     {
-	jump pvfs2_client_getattr_acache_sm;
-	success => cleanup;
-	GETATTR_NEED_DATAFILE_SIZES => datafile_getattr_setup_msgpairarray;
-	default => object_getattr_failure;
+        jump pvfs2_client_getattr_acache_sm;
+        success => cleanup;
+        GETATTR_NEED_DATAFILE_SIZES => datafile_getattr_setup_msgpairarray;
+        default => object_getattr_failure;
     }
 
     state object_getattr_failure
     {
-	run getattr_object_getattr_failure;
-	default => cleanup;
+        run getattr_object_getattr_failure;
+        default => cleanup;
     }
 
     state datafile_getattr_setup_msgpairarray
     {
-	run getattr_datafile_getattr_setup_msgpairarray;
-	success => datafile_getattr_xfer_msgpairarray;
-	default => cleanup;
+        run getattr_datafile_getattr_setup_msgpairarray;
+        success => datafile_getattr_xfer_msgpairarray;
+        default => cleanup;
     }
 
     state datafile_getattr_xfer_msgpairarray
     {
-	jump pvfs2_client_msgpairarray_sm;
-	success => cleanup;
-	default => datafile_getattr_failure;
+        jump pvfs2_client_msgpairarray_sm;
+        success => cleanup;
+        default => datafile_getattr_failure;
     }
 
     state datafile_getattr_failure
     {
-	run getattr_datafile_getattr_failure;
-	default => cleanup;
+        run getattr_datafile_getattr_failure;
+        default => cleanup;
     }
 
     state cleanup
     {
-	run getattr_cleanup;
-	default => terminate;
+        run getattr_cleanup;
+        default => terminate;
     }
 }
 
@@ -207,13 +198,8 @@ int PVFS_sys_getattr(
 
 /*******************************************************************/
 
-/* getattr_object_getattr_setup_msgpair()
- *
- * Fills in sm_p->msgpair to perform a getattr using the getattr_acache
- * state machine.
- */
 static int getattr_object_getattr_setup_msgpair(PINT_client_sm *sm_p,
-						job_status_s *js_p)
+                                                job_status_s *js_p)
 {
     int ret = -PVFS_EINVAL;
     uint32_t attrmask;
@@ -231,19 +217,19 @@ static int getattr_object_getattr_setup_
     attrmask = sm_p->u.getattr.attrmask;
     if (attrmask & PVFS_ATTR_SYS_SIZE)
     {
-	/* need datafile handles and distribution in order to get 
-	 * datafile handles and know what function to call to get
-	 * the file size.
-	 */
-	attrmask &= ~PVFS_ATTR_SYS_SIZE;
+        /* need datafile handles and distribution in order to look
+         * up the datafile sizes and know what function to call to
+         * get the file size.
+         */
+        attrmask &= ~PVFS_ATTR_SYS_SIZE;
 
-	attrmask |= (PVFS_ATTR_META_DFILES | PVFS_ATTR_META_DIST);
+        attrmask |= (PVFS_ATTR_META_DFILES | PVFS_ATTR_META_DIST);
         attrmask |= PVFS_ATTR_DATA_SIZE;
     }
 
     if (attrmask & PVFS_ATTR_SYS_DFILE_COUNT)
     {
-	attrmask |= PVFS_ATTR_META_DFILES;
+        attrmask |= PVFS_ATTR_META_DFILES;
     }
 
     if (attrmask & PVFS_ATTR_SYS_LNK_TARGET)
@@ -256,13 +242,13 @@ static int getattr_object_getattr_setup_
                  "attrmask being passed to server: ");
     PINT_attrmask_print(GOSSIP_GETATTR_DEBUG, attrmask);
 
-    PINT_SERVREQ_GETATTR_FILL(sm_p->msgpair.req,
-			      *sm_p->cred_p,
-			      sm_p->u.getattr.object_ref.fs_id,
-			      sm_p->u.getattr.object_ref.handle,
-			      attrmask);
+    PINT_SERVREQ_GETATTR_FILL(
+        sm_p->msgpair.req,
+        *sm_p->cred_p,
+        sm_p->u.getattr.object_ref.fs_id,
+        sm_p->u.getattr.object_ref.handle,
+        attrmask);
 
-    /* fill in msgpair structure components */
     sm_p->msgpair.fs_id   = sm_p->u.getattr.object_ref.fs_id;
     sm_p->msgpair.handle  = sm_p->u.getattr.object_ref.handle;
     sm_p->msgpair.retry_flag = PVFS_MSGPAIR_RETRY;
@@ -279,46 +265,31 @@ static int getattr_object_getattr_setup_
     return 1;
 }
 
-/* getattr_object_getattr_comp_fn()
- *
- * Called to copy data from getattr response into the
- * getattr-specific portion of the PINT_client_sm structure,
- * so we can use the data after returning to this state
- * machine.
- *
- * Return value is returned in job status, so it affects the
- * resulting state coming back from the nested state machine.
- *
- * Returns 0 for directory, GETATTR_NEED_DATAFILE_SIZES for a
- * metafile (when appropriate).  Returns 0 for symlink.
- * Other types die right now.
- */
-static int getattr_object_getattr_comp_fn(void *v_p,
-					  struct PVFS_server_resp *resp_p,
-					  int index)
+/*
+  copies data from getattr response into the user supplied sys_attr
+  structure.  returns 0 for directories and symlinks, and
+  GETATTR_NEED_DATAFILE_SIZES for a metafile (when appropriate)
+*/
+static int getattr_object_getattr_comp_fn(
+    void *v_p,
+    struct PVFS_server_resp *resp_p,
+    int index)
 {
     int need_datafiles = 0;
     PVFS_object_attr *attr = NULL;
-
-    /* this is a little kludge to get around some struct definition
-     * issues in the headers.  maybe fix later?
-     */
     PINT_client_sm *sm_p = (PINT_client_sm *) v_p;
 
-    assert(resp_p->op == PVFS_SERV_GETATTR); /* sanity check */
+    assert(resp_p->op == PVFS_SERV_GETATTR);
 
-    /* if we get an error, just return immediately, don't try to
-     * actually fill anything in.
-     */
     if (resp_p->status != 0)
     {
-	return resp_p->status;
+        return resp_p->status;
     }
 
     /*
-      if we got a metafile that didn't get a cache hit *and* we
-      need the size, that should be only time we're going to have
-      to do a full data file fetch.  (that's expensive)
+      if we got a metafile that didn't get a cache hit *and* we need
+      the size, that should be the only time we're going to have to
+      do a full data file fetch.  (that's expensive)
     */
     if ((resp_p->u.getattr.attr.objtype == PVFS_TYPE_METAFILE) &&
         (!sm_p->acache_hit) &&
@@ -328,9 +299,9 @@ static int getattr_object_getattr_comp_f
     }
 
     /*
-      if we didn't get a cache hit, we're making a
-      copy of the attributes here so that we can add
-      a acache entry later in cleanup.
+      if we didn't get a cache hit, we're making a copy of the
+      attributes here so that we can add an acache entry later in
+      cleanup.
     */
     if (!sm_p->acache_hit)
     {
@@ -338,10 +309,6 @@ static int getattr_object_getattr_comp_f
             &sm_p->acache_attr, &resp_p->u.getattr.attr);
     }
 
-    /*
-      if we got a cache hit, use those attributes,
-      otherwise use the real server replied attrs
-    */
     attr = (sm_p->acache_hit ?
             &sm_p->pinode->attr :
             &resp_p->u.getattr.attr);
@@ -372,16 +339,11 @@ static int getattr_object_getattr_comp_f
     if ((attr->objtype == PVFS_TYPE_METAFILE) &&
         (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_DFILE_COUNT))
     {
-	sm_p->u.getattr.getattr_resp_p->attr.dfile_count = 
-	    attr->u.meta.dfile_count;
+        sm_p->u.getattr.getattr_resp_p->attr.dfile_count = 
+            attr->u.meta.dfile_count;
     }
 
-    /* transform the attributes from the response
-     *
-     * Note: the structure in sm_p is a PVFS_sys_attr, while the data
-     * returned is in a PVFS_object_attr.  So some translation is
-     * necessary.
-     */
+    /* copy outgoing sys_attr fields from returned object_attr */
     sm_p->u.getattr.getattr_resp_p->attr.owner = attr->owner;
     sm_p->u.getattr.getattr_resp_p->attr.group = attr->group;
     sm_p->u.getattr.getattr_resp_p->attr.perms = attr->perms;
@@ -400,55 +362,24 @@ static int getattr_object_getattr_comp_f
 
     switch (attr->objtype)
     {
-	case PVFS_TYPE_METAFILE:
-	    if (sm_p->msgpair.req.u.getattr.attrmask & PVFS_ATTR_META_DIST)
+        case PVFS_TYPE_METAFILE:
+            if (sm_p->msgpair.req.u.getattr.attrmask &
+                PVFS_ATTR_META_DIST)
             {
-		/* sanity checks */
-		assert(attr->mask & PVFS_ATTR_META_DIST);
-		assert(attr->u.meta.dist_size > 0);
-		
-		gossip_debug(GOSSIP_GETATTR_DEBUG,
-                             "getattr_object_getattr_comp_fn: "
-                             "copying %d bytes of dist.\n",
-                             attr->u.meta.dist_size);
+                assert(attr->mask & PVFS_ATTR_META_DIST);
+                assert(attr->u.meta.dist && (attr->u.meta.dist_size > 0));
+            }
 
-		/* here we make a copy of the distribution information. */
-		sm_p->u.getattr.dist_p = PINT_dist_copy(attr->u.meta.dist);
-		if (sm_p->u.getattr.dist_p == NULL) {
-		    assert(0);
-		    return -PVFS_ENOMEM;
-		}
-		sm_p->u.getattr.dist_size = attr->u.meta.dist_size;
-
-		/* nothing special about our return value here */
-	    }
-	    if (sm_p->msgpair.req.u.getattr.attrmask & PVFS_ATTR_META_DFILES)
+            if (sm_p->msgpair.req.u.getattr.attrmask &
+                PVFS_ATTR_META_DFILES)
             {
-		/* need to save datafile handles to calculate file size;
-		 * redirect us to those states.
-		 */
-		
-		/* sanity checks */
-		assert(attr->mask & PVFS_ATTR_META_DFILES);
-		assert(attr->u.meta.dfile_count > 0);
-		
-		gossip_debug(GOSSIP_GETATTR_DEBUG,
-			     "getattr_object_getattr_comp_fn: %d datafiles.\n",
-			     attr->u.meta.dfile_count);
-		
-		/* save the datafile handles prior to freeing up the
-		 * buffers we used for messages.  we could keep them around
-		 * i suppose, but we're not going to do that for now.  later
-		 * it is likely that this stuff will be stuck in the acache
-		 * anyway, so we'll be able to just reference it from there.
-		 */
-		sm_p->u.getattr.datafile_handles = (PVFS_handle *)
-                    malloc(attr->u.meta.dfile_count * sizeof(PVFS_handle));
-                assert(sm_p->u.getattr.datafile_handles);
-		sm_p->u.getattr.datafile_count = attr->u.meta.dfile_count;
-		memcpy(sm_p->u.getattr.datafile_handles,
-		   attr->u.meta.dfile_array,
-		   attr->u.meta.dfile_count * sizeof(PVFS_handle));
+                assert(attr->mask & PVFS_ATTR_META_DFILES);
+                assert(attr->u.meta.dfile_array &&
+                       (attr->u.meta.dfile_count > 0));
+
+                gossip_debug(GOSSIP_GETATTR_DEBUG,
+                             "getattr_object_getattr_comp_fn: "
+                             "%d datafiles.\n", attr->u.meta.dfile_count);
 
                 if (need_datafiles == 0)
                 {
@@ -458,39 +389,34 @@ static int getattr_object_getattr_comp_f
                     */
                     return 0;
                 }
-		else if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_SIZE)
+                else if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_SIZE)
                 {
-		    /* if caller asked for the size, AND we couldn't
+                    /* if caller asked for the size, AND we couldn't
                      * retrieve them from the acache, then we need to
-                     * go get the datafile sizes
-		     */
-		    return GETATTR_NEED_DATAFILE_SIZES;
-		}
-	    }
-	    else return 0;
-	    break;
-	case PVFS_TYPE_DIRECTORY:
-	    return 0;
-	    break;
-	case PVFS_TYPE_SYMLINK:
+                     * retrieve the datafile sizes
+                     */
+                    return GETATTR_NEED_DATAFILE_SIZES;
+                }
+            }
+            return 0;
+        case PVFS_TYPE_DIRECTORY:
+            return 0;
+        case PVFS_TYPE_SYMLINK:
             return 0;
-            break;
-	case PVFS_TYPE_DATAFILE:
-	    /* fall through */
-	case PVFS_TYPE_DIRDATA:
-	    /* fall through */
-	default:
-	    gossip_err("error: getattr_object_getattr_comp_fn: "
+        case PVFS_TYPE_DATAFILE:
+            /* fall through */
+        case PVFS_TYPE_DIRDATA:
+            /* fall through */
+        default:
+            gossip_err("error: getattr_object_getattr_comp_fn: "
                        "handle refers to invalid object type\n");
-	    return -PVFS_EINVAL;
     }
-
-    return -PVFS_EINVAL; /* should not get here */
+    return -PVFS_EINVAL;
 }
 
 
 static int getattr_object_getattr_failure(PINT_client_sm *sm_p,
-					  job_status_s *js_p)
+                                          job_status_s *js_p)
 {
     gossip_debug(
         GOSSIP_CLIENT_DEBUG,
@@ -499,109 +425,114 @@ static int getattr_object_getattr_failur
     if ((js_p->error_code != -PVFS_ENOENT) &&
         (js_p->error_code != -PVFS_EINVAL))
     {
-        gossip_err("getattr: Failed with unexpected error code %d\n",
-                   js_p->error_code);
+        PVFS_perror_gossip("getattr_object_getattr_failure ",
+                           js_p->error_code);
     }
     return 1;
 }
 
-static int getattr_datafile_getattr_setup_msgpairarray(PINT_client_sm *sm_p,
-						       job_status_s *js_p)
+static int getattr_datafile_getattr_setup_msgpairarray(
+    PINT_client_sm *sm_p, job_status_s *js_p)
 {
-    int i = 0;
-    int ret = -PVFS_EINVAL;
+    int ret = -PVFS_EINVAL, i = 0;
+    PVFS_object_attr *attr = NULL;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) getattr state: "
                  "getattr_datafile_getattr_setup_msgpairarray\n", sm_p);
 
     js_p->error_code = 0;
-    
-    sm_p->msgarray = (PINT_client_sm_msgpair_state *)
-	malloc(sm_p->u.getattr.datafile_count *
-               sizeof(PINT_client_sm_msgpair_state));
-    assert(sm_p->msgarray);
-
-    sm_p->u.getattr.size_array = (PVFS_size *)
-	malloc(sm_p->u.getattr.datafile_count * sizeof(PVFS_size));
-    assert(sm_p->u.getattr.size_array);
 
-    sm_p->msgarray_count = sm_p->u.getattr.datafile_count;
+    attr = (sm_p->acache_hit ?
+            &sm_p->pinode->attr :
+            &sm_p->acache_attr);
+    assert(attr);
+
+    sm_p->msgarray = (PINT_client_sm_msgpair_state *)malloc(
+        attr->u.meta.dfile_count * sizeof(PINT_client_sm_msgpair_state));
+    if (!sm_p->msgarray)
+    {
+        js_p->error_code = -PVFS_ENOMEM;
+        return 1;
+    }
+
+    sm_p->u.getattr.size_array = (PVFS_size *)malloc(
+        attr->u.meta.dfile_count * sizeof(PVFS_size));
+    if (!sm_p->u.getattr.size_array)
+    {
+        free(sm_p->msgarray);
+        sm_p->msgarray = NULL;
+
+        js_p->error_code = -PVFS_ENOMEM;
+        return 1;
+    }
+
+    sm_p->msgarray_count = attr->u.meta.dfile_count;
 
     /* for each datafile, post a send/recv pair to obtain the size */
     for (i=0; i < sm_p->msgarray_count; i++)
     {
-	PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
+        PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
+        assert(msg_p);
+
+        gossip_debug(GOSSIP_GETATTR_DEBUG,
+                     "  datafile_getattr: getting size for handle %Lu\n", 
+                     Lu(attr->u.meta.dfile_array[i]));
+
+        PINT_SERVREQ_GETATTR_FILL(
+            msg_p->req,
+            *sm_p->cred_p,
+            sm_p->u.getattr.object_ref.fs_id,
+            attr->u.meta.dfile_array[i],
+            PVFS_ATTR_DATA_SIZE);
 
-	gossip_debug(GOSSIP_GETATTR_DEBUG,
-		     "  datafile_getattr: getting size for handle %Lu\n", 
-		     Lu(sm_p->u.getattr.datafile_handles[i]));
-
-	/* fill in getattr request; all we care about is the size */
-	PINT_SERVREQ_GETATTR_FILL(msg_p->req,
-				  *sm_p->cred_p,
-				  sm_p->u.getattr.object_ref.fs_id,
-				  sm_p->u.getattr.datafile_handles[i],
-				  PVFS_ATTR_DATA_SIZE);
-	
-	/* fill in msgpair structure components */
-	msg_p->fs_id = sm_p->u.getattr.object_ref.fs_id;
-	msg_p->handle = sm_p->u.getattr.datafile_handles[i];
+        msg_p->fs_id = sm_p->u.getattr.object_ref.fs_id;
+        msg_p->handle = attr->u.meta.dfile_array[i];
         msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
-	msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
+        msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
     }
 
-    /* fill in address of each server to contact */
     ret = PINT_serv_msgpairarray_resolve_addrs(
         sm_p->msgarray_count, sm_p->msgarray);
     if (ret < 0)
     {
-	gossip_lerr("Error: failed to resolve meta server addresses.\n");
-	js_p->error_code = ret;
+        gossip_lerr("Error: failed to resolve meta server addresses.\n");
+        js_p->error_code = ret;
     }
     return 1;
 }
 
 static int getattr_datafile_getattr_comp_fn(
-    void *v_p,
-    struct PVFS_server_resp *resp_p,
-    int index)
+    void *v_p, struct PVFS_server_resp *resp_p, int index)
 {
-    PINT_client_sm *sm_p = (PINT_client_sm *) v_p;
+    PINT_client_sm *sm_p = (PINT_client_sm *)v_p;
 
     if (resp_p->status != 0)
     {
-	return resp_p->status;
+        return resp_p->status;
     }
 
-    /* sanity checks:
-     * - this is a getattr response
-     * - the attributes indicate that this was indeed a datafile
-     * - the datafile has a positive size
-     */
     assert(resp_p->op == PVFS_SERV_GETATTR);
-    assert(resp_p->u.getattr.attr.objtype == PVFS_TYPE_DATAFILE);
-    assert(resp_p->u.getattr.attr.u.data.size >= 0);
 
     gossip_debug(GOSSIP_GETATTR_DEBUG,
-		 "datafile_getattr: size of datafile %d is %Ld\n",
-		 index, Ld(resp_p->u.getattr.attr.u.data.size));
+                 "datafile_getattr: size of datafile %d is %Ld\n",
+                 index, Ld(resp_p->u.getattr.attr.u.data.size));
 
-    sm_p->u.getattr.size_array[index] = resp_p->u.getattr.attr.u.data.size;
+    sm_p->u.getattr.size_array[index] =
+        resp_p->u.getattr.attr.u.data.size;
 
     return 0;
 }
 
 static int getattr_datafile_getattr_failure(PINT_client_sm *sm_p,
-					    job_status_s *js_p)
+                                            job_status_s *js_p)
 {
-    gossip_debug(GOSSIP_CLIENT_DEBUG,
-		 "(%p) getattr state: getattr_datafile_getattr_failure\n",
-		 sm_p);
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) getattr state: "
+                 "getattr_datafile_getattr_failure\n", sm_p);
     return 1;
 }
 
 static int getattr_cleanup(PINT_client_sm *sm_p,
-			   job_status_s *js_p)
+                           job_status_s *js_p)
 {
     int ret = 0;
     uint32_t amask;
@@ -614,32 +545,36 @@ static int getattr_cleanup(PINT_client_s
 
     if (js_p->error_code == 0)
     {
-	/* calculate size of file if necessary */
-	assert(s_attr_p->size == 0);
+        /* calculate size of file if necessary */
+        assert(s_attr_p->size == 0);
 
-	amask = s_attr_p->mask; /* from object getattr */
+        amask = s_attr_p->mask; /* from object getattr */
 
         if (amask & PVFS_ATTR_META_DIST)
         {
             /*
-              recompute size if it's not a acache hit, or if it is
-              a acache hit on an entry that no longer has a valid size
+              recompute size if it's not a acache hit, or if it is a
+              acache hit on an entry that no longer has a valid size
             */
             if (!sm_p->acache_hit ||
                 (sm_p->acache_hit &&
                  !(sm_p->pinode->attr.mask & PVFS_ATTR_DATA_SIZE)))
             {
-                ret = PINT_dist_lookup(sm_p->u.getattr.dist_p);
-                if (ret < 0)
-                {
-                    assert(0);
-                }
-
-                s_attr_p->size =
-                    (sm_p->u.getattr.dist_p->methods->logical_file_size)
-                    (sm_p->u.getattr.dist_p->params,
-                     sm_p->u.getattr.datafile_count,
-                     sm_p->u.getattr.size_array);
+                PINT_dist *dist = NULL;
+                PVFS_object_attr *attr = (sm_p->acache_hit ?
+                                          &sm_p->pinode->attr :
+                                          &sm_p->acache_attr);
+                assert(attr && attr->u.meta.dist);
+
+                dist = attr->u.meta.dist;
+                assert(dist->methods);
+
+                ret = PINT_dist_lookup(dist);
+                assert(ret == 0);
+
+                s_attr_p->size = (dist->methods->logical_file_size)(
+                    dist->params, attr->u.meta.dfile_count,
+                    sm_p->u.getattr.size_array);
             }
             else
             {
@@ -647,58 +582,33 @@ static int getattr_cleanup(PINT_client_s
             }
         }
 
-	/* redo the mask */
-	s_attr_p->mask = PVFS_util_object_to_sys_attr_mask(s_attr_p->mask);
+        /* convert outgoing attribute mask based on what we got */
+        s_attr_p->mask = PVFS_util_object_to_sys_attr_mask(
+            s_attr_p->mask);
 
-	/* we know we grabbed the size if they asked for it */
-	if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_SIZE)
+        /* and add the size if they asked for it */
+        if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_SIZE)
         {
-	    s_attr_p->mask |= PVFS_ATTR_SYS_SIZE;
-	}
+            s_attr_p->mask |= PVFS_ATTR_SYS_SIZE;
+        }
 
-        /* if this is a symlink, we know we have the target */
+        /* if this is a symlink, add the link target */
         if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_LNK_TARGET)
         {
             s_attr_p->mask |= PVFS_ATTR_SYS_LNK_TARGET;
         }
 
-	/* free all the memory we allocated, which could include:
-	 * - size array
-	 * - msgpair array
-	 * - datafile handle array (if not from acache)
-	 * - distribution description (if not from acache)
-	 */
-	if (sm_p->u.getattr.size_array != NULL)
+        if (sm_p->u.getattr.size_array)
         {
-	    free(sm_p->u.getattr.size_array);
-	}
-
-	if (sm_p->msgarray != NULL && (sm_p->msgarray != &(sm_p->msgpair)))
-        {
-	    free(sm_p->msgarray);
-	}
-        sm_p->msgarray = NULL;
-        sm_p->msgarray_count = 0;
+            free(sm_p->u.getattr.size_array);
+            sm_p->u.getattr.size_array = NULL;
+        }
 
-        /*
-          only free dist and dfile memory if we didn't get a
-          cache hit;  if we got a acache hit, we're referencing
-          memory inside the acache entries, so no allocations we
-          made
-        */
-        if (!sm_p->acache_hit)
+        if (sm_p->msgarray && (sm_p->msgarray != &(sm_p->msgpair)))
         {
-            if ((amask & PVFS_ATTR_META_DFILES) &&
-                (sm_p->u.getattr.datafile_handles != NULL))
-            {
-                free(sm_p->u.getattr.datafile_handles);
-            }
-            if ((amask & PVFS_ATTR_META_DIST) &&
-                (sm_p->u.getattr.dist_p != NULL))
-            {
-                PINT_dist_free(sm_p->u.getattr.dist_p);
-            }
+            free(sm_p->msgarray);
         }
+        sm_p->msgarray = NULL;
 
         /* add entry to acache if not already present */
         if (!sm_p->acache_hit)
@@ -712,6 +622,10 @@ static int getattr_cleanup(PINT_client_s
                 assert(pinode);
                 release_required = 0;
             }
+
+            gossip_debug(GOSSIP_CLIENT_DEBUG, "trying to add object "
+                         "reference to acache (%s present)\n",
+                         (release_required ? "already" : "not"));
             pinode->refn = sm_p->u.getattr.object_ref;
 
             if (sm_p->acache_attr.objtype == PVFS_TYPE_SYMLINK)
@@ -726,7 +640,6 @@ static int getattr_cleanup(PINT_client_s
                   not apply at all to symlinks, so special case
                   symlinks above
                 */
-
                 pinode->size = s_attr_p->size;
                 /*
                   update the attr mask in the src object so that
@@ -753,19 +666,16 @@ static int getattr_cleanup(PINT_client_s
             {
                 PINT_acache_release(pinode);
             }
-            sm_p->acache_hit = 0;
         }
     }
     else
     {
         /* in case of failure, blank out response */ 
-	memset(sm_p->u.getattr.getattr_resp_p,
+        memset(sm_p->u.getattr.getattr_resp_p,
                0, sizeof(PVFS_sysresp_getattr));
     }
 
-    /* mark operation as complete */
     sm_p->op_complete = 1;
-
     return 0;
 }
 

Index: sys-io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-io.sm,v
diff -p -u -r1.77 -r1.78
--- sys-io.sm	22 Jun 2004 20:40:38 -0000	1.77
+++ sys-io.sm	8 Jul 2004 16:17:06 -0000	1.78
@@ -3,7 +3,6 @@
  *
  * See COPYING in top-level directory.
  */
-
 #include <string.h>
 #include <assert.h>
 
@@ -23,9 +22,9 @@ extern job_context_id pint_client_sm_con
 
 enum
 {
-    IO_NO_DATA = 1,
-    IO_DATAFILE_TRANSFERS_COMPLETE = 2,
-    IO_RETRY = 3
+    IO_NO_DATA = 132,
+    IO_DATAFILE_TRANSFERS_COMPLETE,
+    IO_RETRY
 };
 
 static int io_init(
@@ -36,57 +35,71 @@ static int io_datafile_setup_msgpairs(
     PINT_client_sm *sm_p, job_status_s *js_p);
 static int io_datafile_post_msgpairs(
     PINT_client_sm *sm_p, job_status_s *js_p);
-static int io_datafile_complete_msgpairs(
+static int io_datafile_complete_operations(
     PINT_client_sm *sm_p, job_status_s *js_p);
 static int io_analyze_results(
     PINT_client_sm *sm_p, job_status_s *js_p);
+static int io_cleanup(
+    PINT_client_sm *sm_p, job_status_s *js_p);
 
-/* helper functions */
+/* misc helper functions */
+static inline int complete_context_send_or_recv(
+    PINT_client_sm *sm_p, job_status_s *js_p);
+static inline int process_context_recv(
+    PINT_client_io_ctx *cur_ctx,
+    struct PINT_decoded_msg *decoded_resp,
+    struct PVFS_server_resp **resp);
+static inline int build_context_flow(
+    PINT_client_sm *sm_p, PINT_client_io_ctx *cur_ctx,
+    PVFS_object_attr *attr, struct PVFS_server_resp *resp);
+static inline int process_context_recv_and_post_flow(
+    PINT_client_sm *sm_p, job_status_s *js_p);
+static inline int check_context_status(
+    PINT_client_io_ctx *cur_ctx, int io_type,
+    PVFS_size *total_size);
 static int io_find_target_datafiles(
-    PVFS_Request mem_req,
-    PVFS_Request file_req,
-    PVFS_offset file_req_offset, 
-    PINT_dist *dist_p,
-    PVFS_handle *input_handle_array,
-    int input_handle_count,
-    PVFS_handle *output_handle_array, 
-    int *handle_index_array,
-    int *handle_count_out_p);
-
-#define CLEAN_PRIVATE_MEMBERS(iosm_p)       \
-do {                                        \
-    if (iosm_p->datafile_index_array)       \
-    {                                       \
-        free(iosm_p->datafile_index_array); \
-        iosm_p->datafile_index_array = NULL;\
-    }                                       \
-    if (sm_p->msgarray &&                   \
-        (sm_p->msgarray != &sm_p->msgpair)) \
-    {                                       \
-        free(sm_p->msgarray);               \
-        sm_p->msgarray = NULL;              \
-        sm_p->msgarray_count = 0;           \
-    }                                       \
-    if (iosm_p->flow_array)                 \
-    {                                       \
-        free(iosm_p->flow_array);           \
-        iosm_p->flow_array = NULL;          \
-    }                                       \
-    if (iosm_p->flow_status_array)          \
-    {                                       \
-        free(iosm_p->flow_status_array);    \
-        iosm_p->flow_status_array = NULL;   \
-    }                                       \
-    if (iosm_p->session_tag_array)          \
-    {                                       \
-        free(iosm_p->session_tag_array);    \
-        iosm_p->session_tag_array = NULL;   \
-    }                                       \
-    if (iosm_p->ackarray)                   \
-    {                                       \
-        free(iosm_p->ackarray);             \
-        iosm_p->ackarray = NULL;            \
-    }                                       \
+    PVFS_Request mem_req, PVFS_Request file_req,
+    PVFS_offset file_req_offset, PINT_dist *dist_p,
+    PVFS_handle *input_handle_array, int input_handle_count,
+    int *handle_index_array, int *handle_index_out_count);
+
+/* misc constants and helper macros */
+#define IO_RECV_COMPLETED                                   1
+
+/* possible I/O state machine phases (status_user_tag) */
+#define IO_SM_PHASE_REQ_MSGPAIR_RECV                        0
+#define IO_SM_PHASE_REQ_MSGPAIR_SEND                        1
+#define IO_SM_PHASE_FLOW                                    2
+#define IO_SM_PHASE_FINAL_ACK                               3
+#define IO_SM_NUM_PHASES                                    4
+
+#define STATUS_USER_TAG_TYPE(tag, type)                     \
+((tag % IO_SM_NUM_PHASES) == type)
+#define STATUS_USER_TAG_GET_INDEX(tag, type)                \
+(tag / IO_SM_NUM_PHASES)
+#define STATUS_USER_TAG_IS_SEND_OR_RECV(tag)                \
+(STATUS_USER_TAG_TYPE(tag, IO_SM_PHASE_REQ_MSGPAIR_RECV) || \
+ STATUS_USER_TAG_TYPE(tag, IO_SM_PHASE_REQ_MSGPAIR_SEND))
+
+#define CLEAN_PRIVATE_MEMBERS(sm_p)                          \
+do {                                                         \
+    if (sm_p->u.io.datafile_index_array)                     \
+    {                                                        \
+        free(sm_p->u.io.datafile_index_array);               \
+        sm_p->u.io.datafile_index_array = NULL;              \
+    }                                                        \
+    if (sm_p->msgarray &&                                    \
+        (sm_p->msgarray != &sm_p->msgpair))                  \
+    {                                                        \
+        free(sm_p->msgarray);                                \
+        sm_p->msgarray = NULL;                               \
+        sm_p->msgarray_count = 0;                            \
+    }                                                        \
+    if (sm_p->u.io.contexts)                                 \
+    {                                                        \
+        free(sm_p->u.io.contexts);                           \
+        sm_p->u.io.contexts = NULL;                          \
+    }                                                        \
 } while(0)
 
 %%
@@ -98,8 +111,9 @@ machine pvfs2_client_io_sm(
     io_getattr_failure,
     io_datafile_setup_msgpairs,
     io_datafile_post_msgpairs,
-    io_datafile_complete_msgpairs,
-    io_analyze_results)
+    io_datafile_complete_operations,
+    io_analyze_results,
+    io_cleanup)
 {
     state init
     {
@@ -124,34 +138,40 @@ machine pvfs2_client_io_sm(
     state io_getattr_failure
     {
         run io_object_getattr_failure;
-        default => io_analyze_results;
+        default => io_cleanup;
     }
 
     state io_datafile_setup_msgpairs
     {
         run io_datafile_setup_msgpairs;
+        IO_NO_DATA => io_cleanup;
         success => io_datafile_post_msgpairs;
-        default => io_analyze_results;
+        default => io_cleanup;
     }
 
     state io_datafile_post_msgpairs
     {
         run io_datafile_post_msgpairs;
-        success => io_datafile_complete_msgpairs;
-        default => io_analyze_results;
+        default => io_datafile_complete_operations;
     }
 
-    state io_datafile_complete_msgpairs
+    state io_datafile_complete_operations
     {
-        run io_datafile_complete_msgpairs;
+        run io_datafile_complete_operations;
         IO_DATAFILE_TRANSFERS_COMPLETE => io_analyze_results;
-        default => io_datafile_complete_msgpairs;
+        default => io_datafile_complete_operations;
     }
 
     state io_analyze_results
     {
         run io_analyze_results;
         IO_RETRY => init;
+        default => io_cleanup;
+    }
+
+    state io_cleanup
+    {
+        run io_cleanup;
         default => terminate;
     }
 }
@@ -227,17 +247,9 @@ int PVFS_isys_io(
     sm_p->u.io.encoding = cur_fs->encoding;
     sm_p->u.io.stored_error_code = 0;
     sm_p->u.io.retry_count = 0;
-    sm_p->u.io.datafile_handles = NULL;
-    sm_p->u.io.datafile_index_array = NULL;
     sm_p->msgarray = NULL;
-    sm_p->u.io.dist_p = NULL;
-    sm_p->u.io.dist_size = 0;
-    sm_p->u.io.datafile_handles = NULL;
+    sm_p->u.io.datafile_index_array = NULL;
     sm_p->u.io.datafile_count = 0;
-    sm_p->u.io.flow_array = NULL;
-    sm_p->u.io.flow_status_array = NULL;
-    sm_p->u.io.session_tag_array = NULL;
-    sm_p->u.io.ackarray = NULL;
 
     return PINT_client_state_machine_post(
         sm_p, PVFS_SYS_IO, op_id, user_ptr);
@@ -287,7 +299,7 @@ static int io_init(PINT_client_sm *sm_p,
 {
     job_id_t tmp_id;
 
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "io state: init\n");
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: io_init\n", sm_p);
 
     assert((js_p->error_code == 0) ||
            (js_p->error_code == IO_RETRY));
@@ -296,6 +308,12 @@ static int io_init(PINT_client_sm *sm_p,
     {
         js_p->error_code = 0;
 
+        if (sm_p->op_cancelled)
+        {
+            js_p->error_code = -PVFS_ECANCEL;
+            return(1);
+        }
+
         return job_req_sched_post_timer(
             PVFS2_CLIENT_RETRY_DELAY, sm_p, 0, js_p, &tmp_id,
             pint_client_sm_context);
@@ -303,34 +321,46 @@ static int io_init(PINT_client_sm *sm_p,
     return 1;
 }
 
+static int io_object_getattr_failure(PINT_client_sm *sm_p,
+                                     job_status_s *js_p)
+{
+    gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
+                 "io_object_getattr_failure\n", sm_p);
+
+    if (sm_p->op_cancelled)
+    {
+        js_p->error_code = -PVFS_ECANCEL;
+    }
+
+    /*
+      NOTE: this can happen if we're doing I/O on a file that was
+      removed by another process
+    */
+    if (js_p->error_code == 0)
+    {
+        js_p->error_code = -PVFS_ENOENT;
+    }
+
+    sm_p->u.io.stored_error_code = js_p->error_code;
+    return 1;
+}
 
-/* io_datafile_setup_msgpairs()
- *
- * Sets up msgpairs to send I/O requests to servers holding datafiles
- * that (might) have data for us (unless we hit EOF).
- *
- * This function swaps the original datafile_handles array for a new
- * array that just has datafiles that we think will have our data.  It
- * updates datafile_count appropriately as well.
- *
- * We use sm_p->msgarray for this purpose.
- *
- * NOTE: we could combine this with the post_msgpairs state, but this one
- *       has gotten pretty big already, so let's not.
- *
- */
 static int io_datafile_setup_msgpairs(PINT_client_sm *sm_p,
                                       job_status_s *js_p)
 {
     int ret = -PVFS_EINVAL, i = 0;
-    int target_datafile_count = 0;
     PVFS_object_attr *attr = NULL;
-    PVFS_handle *target_datafile_array = NULL;
-    struct PINT_client_io_sm *iosm_p = &sm_p->u.io;
+    int target_datafile_count = 0;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
                  "io_datafile_setup_msgpairs\n", sm_p);
 
+    if (sm_p->op_cancelled)
+    {
+        js_p->error_code = -PVFS_ECANCEL;
+        return 1;
+    }
+
     js_p->error_code = 0;
 
     attr = (sm_p->acache_hit ?
@@ -352,83 +382,47 @@ static int io_datafile_setup_msgpairs(PI
             js_p->error_code = -PVFS_EISDIR;
             return 1;
         default:
-	    gossip_err("Error: I/O attempted on invalid object type.\n");
-            js_p->error_code = -PVFS_EIO;
+            js_p->error_code = -PVFS_EBADF;
             return 1;
     }
 
-    /*
-      assign internal io ptrs for convenience here (without copying)
-      if unassigned
-    */
-    if (!sm_p->u.io.dist_p)
-    {
-        sm_p->u.io.dist_p = attr->u.meta.dist;
-    }
-    if (!sm_p->u.io.dist_size)
-    {
-        sm_p->u.io.dist_size = attr->u.meta.dist_size;
-    }
-    if (!sm_p->u.io.datafile_handles)
-    {
-        sm_p->u.io.datafile_handles = attr->u.meta.dfile_array;
-    }
-    if (!sm_p->u.io.datafile_count)
-    {
-        sm_p->u.io.datafile_count = attr->u.meta.dfile_count;
-    }
-
-    ret = PINT_dist_lookup(iosm_p->dist_p);
-    assert(ret == 0);
-
-    if (!target_datafile_array)
-    {
-        target_datafile_array = (PVFS_handle *)malloc(
-            (iosm_p->datafile_count * sizeof(PVFS_handle)));
-    }
-    if (!target_datafile_array)
+    ret = PINT_dist_lookup(attr->u.meta.dist);
+    if (ret)
     {
-        js_p->error_code = -PVFS_ENOMEM;
+        PVFS_perror_gossip("PINT_dist_lookup failed; aborting I/O", ret);
+        js_p->error_code = -PVFS_EBADF;
         return 1;
     }
 
-    if (!iosm_p->datafile_index_array)
-    {
-        iosm_p->datafile_index_array = (int *)malloc(
-            (iosm_p->datafile_count * sizeof(int)));
-    }
-    if (!iosm_p->datafile_index_array)
+    sm_p->u.io.datafile_index_array = (int *)malloc(
+        (attr->u.meta.dfile_count * sizeof(int)));
+    if (!sm_p->u.io.datafile_index_array)
     {
         goto malloc_error_exit;
     }
+    memset(sm_p->u.io.datafile_index_array, 0,
+           (attr->u.meta.dfile_count * sizeof(int)));
 
     ret = io_find_target_datafiles(
-        iosm_p->mem_req,
-        iosm_p->file_req,
-        iosm_p->file_req_offset,
-        iosm_p->dist_p,
-        iosm_p->datafile_handles,
-        iosm_p->datafile_count,
-        target_datafile_array,
-        iosm_p->datafile_index_array,
+        sm_p->u.io.mem_req,
+        sm_p->u.io.file_req,
+        sm_p->u.io.file_req_offset,
+        attr->u.meta.dist,
+        attr->u.meta.dfile_array,
+        attr->u.meta.dfile_count,
+        sm_p->u.io.datafile_index_array,
         &target_datafile_count);
+
     assert(ret == 0);
 
     if (target_datafile_count == 0)
     {
-        if (target_datafile_array)
-        {
-            free(target_datafile_array);
-            target_datafile_array = NULL;
-        }
+        free(sm_p->u.io.datafile_index_array);
+        sm_p->u.io.datafile_index_array = NULL;
 
-        if (iosm_p->datafile_index_array)
-        {
-            free(iosm_p->datafile_index_array);
-            iosm_p->datafile_index_array = NULL;
-        }
+        gossip_debug(GOSSIP_IO_DEBUG, "  datafile_setup_msgpairs: no "
+                     "datafiles have data; aborting\n");
 
-        /* the no data case should be caught earlier than this */
         js_p->error_code = IO_NO_DATA;
         return 1;
     }
@@ -437,123 +431,74 @@ static int io_datafile_setup_msgpairs(PI
                  "  datafile_setup_msgpairs: %d datafiles "
                  "might have data\n", target_datafile_count);
 
-    /* setup msgpair array */
-    if (!sm_p->msgarray)
-    {
-        sm_p->msgarray = (PINT_client_sm_msgpair_state *)malloc(
-            (target_datafile_count *
-             sizeof(PINT_client_sm_msgpair_state)));
-    }
-    if (!sm_p->msgarray)
+    sm_p->u.io.contexts = (PINT_client_io_ctx *)malloc(
+        (target_datafile_count * sizeof(PINT_client_io_ctx)));
+    if (!sm_p->u.io.contexts)
     {
         goto malloc_error_exit;
     }
-    sm_p->msgarray_count = target_datafile_count;
+    memset(sm_p->u.io.contexts, 0,
+           (target_datafile_count * sizeof(PINT_client_io_ctx)));
 
-    /* setup flow descriptor array */
-    if (!iosm_p->flow_array)
-    {
-        iosm_p->flow_array = (flow_descriptor *)malloc(
-            (target_datafile_count * sizeof(flow_descriptor)));
-    }
-    if (!iosm_p->flow_array)
+    sm_p->msgarray_count = target_datafile_count;
+    sm_p->msgarray = (PINT_client_sm_msgpair_state *)malloc(
+        (sm_p->msgarray_count * sizeof(PINT_client_sm_msgpair_state)));
+    if (!sm_p->msgarray)
     {
         goto malloc_error_exit;
     }
-    iosm_p->flow_comp_ct = 0;
+    memset(sm_p->msgarray, 0, (sm_p->msgarray_count *
+                               sizeof(PINT_client_sm_msgpair_state)));
 
-    /* set each flow descriptor to initial values */
-    for(i = 0; i < target_datafile_count; i++)
-    {
-        PINT_flow_reset(&iosm_p->flow_array[i]);
-    }
+    sm_p->u.io.total_cancellations_remaining = 0;
 
-    /* setup flow status array */
-    if (!iosm_p->flow_status_array)
-    {
-        iosm_p->flow_status_array = (job_status_s *)malloc(
-            (target_datafile_count * sizeof(job_status_s)));
-    }
-    if (!iosm_p->flow_status_array)
-    {
-        goto malloc_error_exit;
-    }
-
-    /* setup session tag array */
-    if (!iosm_p->session_tag_array)
-    {
-        iosm_p->session_tag_array = (PVFS_msg_tag_t *)malloc(
-            (target_datafile_count * sizeof(PVFS_msg_tag_t)));
-    }
-    if (!iosm_p->session_tag_array)
-    {
-        goto malloc_error_exit;
-    }
-
-    /* setup space for final ack if we're doing a write */
-    if (iosm_p->io_type == PVFS_IO_WRITE)
+    /* initialize all per server I/O operation contexts and requests */
+    for(i = 0; i < target_datafile_count; i++)
     {
-        /* setup the write acknowledgement array */
-        if (!iosm_p->ackarray)
-        {
-            iosm_p->ackarray = (PINT_client_sm_recv_state *)malloc(
-                (target_datafile_count *
-                 sizeof(PINT_client_sm_recv_state)));
-        }
-        if (!iosm_p->ackarray)
-        {
-            goto malloc_error_exit;
-        }
-        memset(iosm_p->ackarray, 0,
-            target_datafile_count*sizeof(PINT_client_sm_recv_state));
-        iosm_p->ack_comp_ct = 0;
-    }
+        PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
+        PINT_client_sm_msgpair_state *msg = &sm_p->msgarray[i];
 
-    /* fill in all the I/O requests */
-    for (i = 0; i < target_datafile_count; i++)
-    {
-        int orig_index = 0;
-        PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
-        assert(msg_p);
+        assert(msg && cur_ctx);
+        
+        memset(cur_ctx, 0, sizeof(PINT_client_io_ctx));
+        memset(msg, 0, sizeof(PINT_client_sm_msgpair_state));
+
+        gossip_debug(GOSSIP_IO_DEBUG, "initializing context[%d] %p\n",
+                     i, cur_ctx);
+
+        cur_ctx->msg = msg;
+        cur_ctx->index = i;
+        cur_ctx->server_nr = sm_p->u.io.datafile_index_array[i];
+        cur_ctx->data_handle =
+            attr->u.meta.dfile_array[cur_ctx->server_nr];
 
-        gossip_debug(GOSSIP_IO_DEBUG, "  sending I/O request "
-                     "for %Lu\n", Lu(target_datafile_array[i]));
+        PINT_flow_reset(&cur_ctx->flow_desc);
 
-        /* find the original index for the handle */
-        for (orig_index = 0; orig_index < iosm_p->datafile_count;
-             orig_index++)
-        {
-            if (target_datafile_array[i] ==
-                iosm_p->datafile_handles[orig_index])
-            {
-                break;
-            }
-        }
-        assert(orig_index < iosm_p->datafile_count);
+        gossip_debug(GOSSIP_IO_DEBUG, "  filling I/O request "
+                     "for %Lu\n", Lu(cur_ctx->data_handle));
 
         PINT_SERVREQ_IO_FILL(
-            msg_p->req,
+            msg->req,
             *sm_p->cred_p,
             sm_p->object_ref.fs_id,
-            target_datafile_array[i],
-            iosm_p->io_type,
-            iosm_p->flowproto_type,
-            orig_index,
-            iosm_p->datafile_count,
-            iosm_p->dist_p,
-            iosm_p->file_req,
-            iosm_p->file_req_offset,
-            PINT_REQUEST_TOTAL_BYTES(iosm_p->mem_req));
+            cur_ctx->data_handle,
+            sm_p->u.io.io_type,
+            sm_p->u.io.flowproto_type,
+            sm_p->u.io.datafile_index_array[i],
+            attr->u.meta.dfile_count,
+            attr->u.meta.dist,
+            sm_p->u.io.file_req,
+            sm_p->u.io.file_req_offset,
+            PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req));
 
         /* fill in msgpair structure components */
-        msg_p->fs_id = sm_p->object_ref.fs_id;
-        msg_p->handle = target_datafile_array[i];
-        msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
-        msg_p->comp_fn = NULL;
-
-        ret = PINT_bucket_map_to_server(&msg_p->svr_addr,
-                                        msg_p->handle,
-                                        msg_p->fs_id);
+        msg->fs_id = sm_p->object_ref.fs_id;
+        msg->handle = cur_ctx->data_handle;
+        msg->retry_flag = PVFS_MSGPAIR_NO_RETRY;
+        msg->comp_fn = NULL;
+
+        ret = PINT_bucket_map_to_server(
+               &msg->svr_addr, msg->handle, msg->fs_id);
         if (ret)
         {
             gossip_err("Failed to map meta server address\n");
@@ -562,690 +507,503 @@ static int io_datafile_setup_msgpairs(PI
         }
     }
 
-    /* swap the new list in for the old one, freeing the old list */
-    if (!sm_p->acache_hit && iosm_p->datafile_handles)
-    {
-        free(iosm_p->datafile_handles);
-    }
-
-    if (target_datafile_array)
-    {
-        iosm_p->datafile_handles = target_datafile_array;
-    }
-
-    /*
-      store the original datafile_count before it's modified,
-      as we'll need it later in io_datafile_complete_msgpairs
-    */
-    iosm_p->orig_datafile_count = iosm_p->datafile_count;
-    iosm_p->datafile_count = target_datafile_count;
+    sm_p->u.io.datafile_count = target_datafile_count;
 
+    js_p->error_code = 0;
     return 1;
 
   malloc_error_exit:
-    CLEAN_PRIVATE_MEMBERS(iosm_p);
+    CLEAN_PRIVATE_MEMBERS(sm_p);
 
     js_p->error_code = -PVFS_ENOMEM;
     return 1;
 }
 
-/* io_datafile_post_msgpairs()
- *
- * This is basically a copy of msgpairarray.c:msgpairarray_post().
- * We need to handle the rest of the process somewhat differently though,
- * so we're going to have our own versions here.
- *
- * We use the msgarray to keep up with the initial send/recv pairs.
- *
- */
+/*
+  This is based on msgpairarray_post() in msgpairarray.c.  It
+  differs in that we don't have to wait for all of the msgpairarray
+  operations to complete before posting flows; instead, we can post
+  the flow for each server individually as soon as it's ready.  This
+  avoids the sync point implicit in the msgpairarray design.
+*/
 static int io_datafile_post_msgpairs(PINT_client_sm *sm_p,
                                      job_status_s *js_p)
 {
     int ret = -PVFS_EINVAL, i = 0;
+    unsigned long status_user_tag = 0;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG, "io_datafile_post_msgpairs "
                  "state: post (%d message(s))\n", sm_p->msgarray_count);
 
+    if (sm_p->op_cancelled)
+    {
+        js_p->error_code = -PVFS_ECANCEL;
+        return 1;
+    }
+
     js_p->error_code = 0;
 
-    /* set number of operations that must complete.
-     *
-     * NOTE: we're using the comp_ct in the first msgpair
-     * entry to keep up with the count for the entire array.
-     */
     assert(sm_p->msgarray);
-    assert(sm_p->msgarray_count > 0);
+    assert(sm_p->msgarray_count == sm_p->u.io.datafile_count);
 
-    sm_p->msgarray[0].comp_ct = (2 * sm_p->msgarray_count);
+    /* completion count tracks sends/recvs separately */
+    sm_p->u.io.msgpair_completion_count = (2 * sm_p->u.io.datafile_count);
 
-    for (i = 0; i < sm_p->msgarray_count; i++)
+    for(i = 0; i < sm_p->u.io.datafile_count; i++)
     {
-        PVFS_msg_tag_t session_tag;
-        PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
-        assert(msg_p);
+        PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
+        PINT_client_sm_msgpair_state *msg = &sm_p->msgarray[i];
+
+        assert(cur_ctx && msg);
+        assert(cur_ctx->msg == msg);
 
-        ret = PINT_encode(&msg_p->req,
-                          PINT_ENCODE_REQ,
-                          &msg_p->encoded_req,
-                          msg_p->svr_addr,
-                          sm_p->u.io.encoding);
-        if (ret != 0)
+        ret = PINT_encode(&msg->req, PINT_ENCODE_REQ, &msg->encoded_req,
+                          msg->svr_addr, sm_p->u.io.encoding);
+        if (ret)
         {
-            gossip_lerr("pint_encode failed\n");
-            js_p->error_code = -PVFS_EIO;
+            /*
+              FIXME: make this a clean error transition by
+              adjusting the completion count and/or (not) exiting
+            */
+            PVFS_perror_gossip("PINT_encode failed", ret);
+            js_p->error_code = ret;
             return 1;
         }
 
-        /* calculate maximum response message size and allocate space.
-         * fills in max_resp_sz, encoded_resp_p
-         */
-        msg_p->max_resp_sz = PINT_encode_calc_max_size(
-            PINT_ENCODE_RESP, msg_p->req.op, sm_p->u.io.encoding);
-        msg_p->encoded_resp_p = BMI_memalloc(
-            msg_p->svr_addr, msg_p->max_resp_sz, BMI_RECV);
-        if (!msg_p->encoded_resp_p)
+        /* calculate maximum response message size and allocate it */
+        msg->max_resp_sz = PINT_encode_calc_max_size(
+            PINT_ENCODE_RESP, msg->req.op, sm_p->u.io.encoding);
+        msg->encoded_resp_p = BMI_memalloc(
+            msg->svr_addr, msg->max_resp_sz, BMI_RECV);
+        if (!msg->encoded_resp_p)
         {
+            /* FIXME: see above FIXME */
             js_p->error_code = -PVFS_ENOMEM;
             return 1;
         }
 
-        /* get session tag to associate with send and receive.
-         * session tag is kept in io part of client state machine
-         * structure for use in the flow and option final ack.
-         */
-        session_tag = get_next_session_tag();
-        sm_p->u.io.session_tag_array[i] = session_tag;
+        /*
+          recalculate the status user tag based on the progress of
+          the current context like this: status_user_tag = (4 *
+          (context index) + context phase)
+        */
+        assert(cur_ctx->index == i);
+        status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_RECV);
+
+        gossip_debug(GOSSIP_IO_DEBUG," posting recv with "
+                     "status_user_tag=%lu (max_size %d)\n",
+                     status_user_tag, msg->max_resp_sz);
+
+        cur_ctx->session_tag = get_next_session_tag();
 
-        /* post receive of response; job_id stored in recv_id */
         ret = job_bmi_recv(
-            msg_p->svr_addr,
-            msg_p->encoded_resp_p,
-            msg_p->max_resp_sz,
-            session_tag,
-            BMI_PRE_ALLOC,
-            sm_p,
-            i,
-            &msg_p->recv_status,
-            &msg_p->recv_id,
-            pint_client_sm_context,
-	    PVFS2_CLIENT_JOB_TIMEOUT);
+            msg->svr_addr, msg->encoded_resp_p, msg->max_resp_sz,
+            cur_ctx->session_tag, BMI_PRE_ALLOC, sm_p, status_user_tag,
+            &msg->recv_status, &msg->recv_id, pint_client_sm_context,
+            PVFS2_CLIENT_JOB_TIMEOUT);
+
+        cur_ctx->msg_recv_has_been_posted = 1;
+        cur_ctx->msg_recv_in_progress = 1;
+
+        if (ret == 0)
+        {
+            int tmp = 0;
+            /* perform a quick test to see if the recv failed before
+             * posting the send; if it reports an error quickly then
+             * we can save the confusion of sending a request for
+             * which we can't recv a response
+             */
+            ret = job_test(msg->recv_id, &tmp, NULL,
+                           &msg->recv_status, 0,
+                           pint_client_sm_context);
+        }
 
-        if (ret != 0)
+        if ((ret < 0) || (ret == 1))
         {
-            /* it shouldn't be possible for the receive to complete
-             * before we send the request.
-             */
-            assert(ret != 1);
+            /*
+              this recv can't complete yet without errors because we
+              haven't sent the request yet
+            */
+            assert((ret < 0) || (msg->recv_status.error_code != 0));
+            if (ret < 0)
+            {
+                PVFS_perror_gossip("Post of receive failed", ret);
+            }
+            else
+            {
+                PVFS_perror_gossip("Receive immediately failed",
+                                   msg->recv_status.error_code);
+            }
+
+            PVFS_perror_gossip("recv completed before send failure", ret);
 
-            gossip_lerr("post of receive failed (ret = %d)\n", ret);
-            js_p->error_code = -PVFS_EIO;
+            cur_ctx->msg_recv_in_progress = 0;
+            sm_p->u.io.msgpair_completion_count--;
+
+            /* FIXME: see above FIXME */
+            js_p->error_code = ret;
             return 1;
         }
+        assert(ret == 0);
+
+        assert(cur_ctx->index == i);
+        status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_SEND);
+
+        gossip_debug(GOSSIP_IO_DEBUG," posting send with "
+                     "status_user_tag=%lu\n", status_user_tag);
 
-        /* post send of request; job_id stored in send_id */
         ret = job_bmi_send_list(
-            msg_p->encoded_req.dest,
-            msg_p->encoded_req.buffer_list,
-            msg_p->encoded_req.size_list,
-            msg_p->encoded_req.list_count,
-            msg_p->encoded_req.total_size,
-            session_tag,
-            msg_p->encoded_req.buffer_type,
-            1,
-            sm_p,
-            (sm_p->msgarray_count + i),
-            &msg_p->send_status,
-            &msg_p->send_id,
-            pint_client_sm_context,
-	    PVFS2_CLIENT_JOB_TIMEOUT);
+            msg->encoded_req.dest, msg->encoded_req.buffer_list,
+            msg->encoded_req.size_list, msg->encoded_req.list_count,
+            msg->encoded_req.total_size, cur_ctx->session_tag,
+            msg->encoded_req.buffer_type, 1, sm_p, status_user_tag,
+            &msg->send_status, &msg->send_id, pint_client_sm_context,
+            PVFS2_CLIENT_JOB_TIMEOUT);
 
-        if (ret < 0)
+        cur_ctx->msg_send_has_been_posted = 1;
+        cur_ctx->msg_send_in_progress = 1;
+
+        if ((ret < 0) ||
+            ((ret == 1) && (msg->send_status.error_code != 0)))
         {
-            gossip_lerr("post of send failed\n");
-            js_p->error_code = -PVFS_EIO;
-            return 1;
+            if (ret < 0)
+            {
+                PVFS_perror_gossip("Post of send failed", ret);
+            }
+            else
+            {
+                PVFS_perror_gossip("Send immediately failed",
+                    msg->recv_status.error_code);
+            }
+
+            msg->op_status = msg->send_status.error_code;
+            msg->send_id = 0;
+
+            /*
+              cancel the recv and decrement the completion count, but
+              still wait for the recv to complete
+            */
+            gossip_err("Send error: canceling recv.\n");
+
+            PINT_client_bmi_cancel(msg->recv_id);
+            cur_ctx->msg_send_in_progress = 0;
+            sm_p->u.io.msgpair_completion_count--;
+
+            /* FIXME: see above FIXME */
         }
         else if (ret == 1)
         {
-            /*
-              send completed immediately; decrement the completion
-              counter
-            */
             gossip_debug(
                 GOSSIP_IO_DEBUG, "  io_datafile_post_msgpairs: "
                 "send completed immediately.\n");
 
             /* 0 is the valid "completed job id" value */
-            msg_p->send_id = 0;
+            msg->send_id = 0;
 
-            if (msg_p->send_status.error_code != 0)
+            cur_ctx->msg_send_in_progress = 0;
+            sm_p->u.io.msgpair_completion_count--;
+
+            if (msg->send_status.error_code != 0)
             {
-                PVFS_perror_gossip(
-                    "sys-io.sm: msg_p->send_status.error_code",
-                    msg_p->send_status.error_code);
-                js_p->error_code = msg_p->send_status.error_code;
+                PVFS_perror_gossip("send status failure",
+                                   msg->send_status.error_code);
+
+                /* FIXME: see above FIXME */
+                js_p->error_code = msg->send_status.error_code;
                 return 1;
             }
-
-            /* decrement our count, since send is already done.
-             *
-             * recall we're using the comp_ct in the first array
-             * element to keep up with our count for the entire
-             * array.
-             */
-            sm_p->msgarray[0].comp_ct--;
         }
     }
+
+    gossip_debug(GOSSIP_IO_DEBUG, "io_datafile_post_msgpairs: "
+                 "completion count is %d\n",
+                 sm_p->u.io.msgpair_completion_count);
+
+    js_p->error_code = 0;
     return 0;
 }
 
-/* io_datafile_complete_msgpairs()
- *
- * This started off as a copy of msgpairarray.c:msgpairarray_complete(),
- * but we need to post flow operations as the bmi operations complete,
- * so we have some additional work to do.
- */
-static int io_datafile_complete_msgpairs(PINT_client_sm *sm_p,
-                                         job_status_s *js_p)
+/*
+  This state allows us to make sure all posted operations complete and
+  are accounted for.  Since this handles ALL operation completions,
+  there's special case handling of completing the msgpair recv.  In
+  that case we post the flow operation as soon as we see the recv (the
+  main motivation for not using the common msgpairarray code).
+*/
+static int io_datafile_complete_operations(PINT_client_sm *sm_p,
+                                           job_status_s *js_p)
 {
-    int i = 0, ret = -1;
-    int matched_msgpair = 0, matched_flow = 0;
-    int matched_ack = 0, recv_match = -1;
-    job_id_t tmp_id;
-    PINT_client_sm_msgpair_state *msg_p = NULL;
+    int ret = -PVFS_EINVAL, index = 0;
+    unsigned long status_user_tag = (unsigned long)
+        js_p->status_user_tag;
+    PINT_client_io_ctx *cur_ctx = NULL;
+    int matched_send_or_recv = 0;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG, "io state: "
-                 "datafile_complete_msgpairs\n");
-
-    assert(sm_p->msgarray_count == sm_p->u.io.datafile_count);
-    assert(sm_p->msgarray[0].comp_ct >= 0);
-    assert(sm_p->u.io.flow_comp_ct >= 0);
-    assert(sm_p->u.io.ack_comp_ct >= 0);
-    assert(js_p->status_user_tag < (sm_p->msgarray_count*4));
-
-    /* if there are outstanding msgpairs to complete, try to match
-     * whatever completed with something in the msgpair array
-     */
-    if (sm_p->msgarray[0].comp_ct > 0)
-    {
-        if (js_p->status_user_tag < sm_p->msgarray_count)
-        {
-            /* first N user tags are for receives */
-            i = js_p->status_user_tag;
-            msg_p = &sm_p->msgarray[i];
-            matched_msgpair = 1;
-            recv_match = js_p->status_user_tag;
-            msg_p->recv_id     = 0;
-            msg_p->recv_status = *js_p;
-            assert(msg_p->recv_status.actual_size <= msg_p->max_resp_sz);
-            assert(msg_p->recv_status.error_code <= 0);
-        }
-        else if (js_p->status_user_tag < (sm_p->msgarray_count*2))
-        {
-            /* second N user tags are for sends */
-            i = js_p->status_user_tag - sm_p->msgarray_count;
-            msg_p = &sm_p->msgarray[i];
-            matched_msgpair = 1;
-            msg_p->send_id     = 0;
-            msg_p->send_status = *js_p;
-            assert(msg_p->send_status.error_code <= 0);
-        }
+                 "(%p) datafile_complete_operations "
+                 "(got user tag %lu)\n", sm_p, status_user_tag);
 
-        if (matched_msgpair)
+    if (js_p->error_code)
+    {
+        PVFS_perror_gossip("io_datafile_complete_operations failed",
+                           js_p->error_code);
+        /*
+          if we looped back here following an error, we need to figure
+          out what happened and cleanup properly
+        */
+        if ((sm_p->u.io.msgpair_completion_count == 0) &&
+            (sm_p->u.io.flow_completion_count == 0) &&
+            (sm_p->u.io.write_ack_completion_count == 0))
         {
-            /* decrement comp_ct until all operations have completed */
-            sm_p->msgarray[0].comp_ct--;
+            gossip_err("*** error path and all operations are "
+                       "complete\n");
 
-            if (sm_p->msgarray[0].comp_ct == 0)
-            {
-                gossip_debug(GOSSIP_IO_DEBUG, "all msgpairs complete.\n");
-            }
+            /* preserve this error code for io_analyze_results */
+            sm_p->u.io.stored_error_code = js_p->error_code;
+
+            js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
+            return 1;
         }
-    }
 
-    if (matched_msgpair && (recv_match == -1))
-    {
-        gossip_debug(GOSSIP_IO_DEBUG,
-                     "  matched send to %d; continuing.\n", i);
-        return 0;
+        gossip_err("*** error path with %d msgpairs pending, %d flows "
+                   "pending, %d write acks pending\n",
+                   sm_p->u.io.msgpair_completion_count,
+                   sm_p->u.io.flow_completion_count,
+                   sm_p->u.io.write_ack_completion_count);
     }
 
-    /* if we matched a receive, then we need to decode the receive,
-     * post the appropriate flow, and possibly post the receive of an ack
-     * BMI message (if a write operation).
-     *
-     * we use the same index into the various arrays in the io part of the
-     * state machine state that we used for this receive job (recv_match).
-     */
-    if (recv_match != -1)
-    {
-        struct PINT_decoded_msg decoded_resp;
-        struct PVFS_server_resp *resp_p = NULL;
-
-        gossip_debug(GOSSIP_IO_DEBUG, "  matched response from %d.\n", i);
-
-        ret = PINT_serv_decode_resp(
-	    msg_p->fs_id,
-            msg_p->encoded_resp_p, &decoded_resp, &msg_p->svr_addr,
-            msg_p->recv_status.actual_size, &resp_p);
-        if (ret != 0)
-        {
-            gossip_lerr("io_datafile_complete_msgpairs: decode error\n");
-            assert(0);
-        }
-
-        assert(resp_p->status <= 0);
-        msg_p->op_status = resp_p->status;
-
-        /* note: we saved the recv_status up above (before decoding) */
-        if ((msg_p->recv_status.error_code != 0) ||
-            (msg_p->op_status != 0))
+    assert(sm_p->msgarray_count == sm_p->u.io.datafile_count);
+    assert(sm_p->u.io.msgpair_completion_count > -1);
+    assert(sm_p->u.io.flow_completion_count > -1);
+    assert(sm_p->u.io.write_ack_completion_count > -1);
+
+    /* check if we're completing a send or recv msgpair */
+    if (STATUS_USER_TAG_IS_SEND_OR_RECV(status_user_tag))
+    {
+        assert(sm_p->u.io.msgpair_completion_count > -1);
+        /*
+          the completion count might be zero when recovering from a
+          cancellation
+        */
+        if (sm_p->u.io.msgpair_completion_count)
         {
-            if (msg_p->recv_status.error_code)
+            ret = complete_context_send_or_recv(sm_p, js_p);
+            if (ret < 0)
             {
-                PVFS_perror_gossip("io_datafile_complete_msgpairs",
-                                   msg_p->recv_status.error_code);
+                PVFS_perror_gossip(
+                    "complete_context_send_or_recv failed", ret);
+                js_p->error_code = ret;
+                return 1;
             }
-            gossip_debug(GOSSIP_IO_DEBUG, "  error %d with status %d "
-                         "related to response from %d; not submitting "
-                         "flow.\n", msg_p->recv_status.error_code,
-                         msg_p->op_status, recv_match);
-
-            if (msg_p->op_status)
+            else if (ret == 0)
             {
-		gossip_err("Error: op_status: %d, resetting to PVFS_EIO.\n",
-		    msg_p->op_status);
-                js_p->error_code = -PVFS_EIO;
-                return 1;
+                gossip_debug(GOSSIP_IO_DEBUG, "  matched send in context "
+                             "%d; continuing.\n", index);
+                js_p->error_code = 0;
+                return 0;
             }
+            assert(ret == IO_RECV_COMPLETED);
 
-            ret = PINT_serv_free_msgpair_resources(
-                &msg_p->encoded_req, msg_p->encoded_resp_p,
-                &decoded_resp, &msg_p->svr_addr, msg_p->max_resp_sz);
-            assert(ret == 0);
-
-            return 0;
+            matched_send_or_recv = 1;
         }
-        else
-        {
-            flow_descriptor *fl_p = NULL;
-            struct PINT_client_io_sm *iosm_p = &sm_p->u.io;
-
-            gossip_debug(GOSSIP_IO_DEBUG, "  building flow for %d.\n",
-                         recv_match);
+    }
 
-            /* find matching flow descriptor */
-            fl_p = &sm_p->u.io.flow_array[recv_match];
+    /* if we've just completed a recv above, post the flow here */
+    if (ret == IO_RECV_COMPLETED)
+    {
+        ret = process_context_recv_and_post_flow(sm_p, js_p);
+        if (ret < 0)
+        {
+            char buf[64] = {0};
+            PVFS_strerror_r(ret, buf, 64);
 
-            /* fill in file_data structure */
-            fl_p->file_data.fsize = resp_p->u.io.bstream_size;
-            fl_p->file_data.dist = iosm_p->dist_p;
-            fl_p->file_data.server_nr =
-                iosm_p->datafile_index_array[recv_match];
-            fl_p->file_data.server_ct = iosm_p->orig_datafile_count;
-
-            /* this is the file datatype */
-            fl_p->file_req = iosm_p->file_req;
-            fl_p->file_req_offset = iosm_p->file_req_offset;
+            gossip_err("process_context_recv_and_post_flow "
+                       "failed: %s (%d remaining msgpairs)\n", buf,
+                       sm_p->u.io.msgpair_completion_count);
 
-            /* this is the memory datatype */
-            fl_p->mem_req = iosm_p->mem_req;
+            js_p->error_code = ret;
+            return 1;
+        }
+    }
 
+    /* check if we've completed all msgpairs and posted all flows */
+    if (matched_send_or_recv)
+    {
+        if (sm_p->u.io.msgpair_completion_count == 0)
+        {
+            gossip_debug(GOSSIP_IO_DEBUG, "*** all msgpairs complete "
+                         "(all flows posted)\n");
+        }
+        else
+        {
             gossip_debug(
-                GOSSIP_IO_DEBUG, "    bstream_size = %Ld, datafile_nr = "
-                "%d, datafile_ct = %d, file_req_off = %Ld\n",
-                Ld(fl_p->file_data.fsize), fl_p->file_data.server_nr,
-                fl_p->file_data.server_ct, Ld(fl_p->file_req_offset));
-
-            /* session tag, same as the one used in the send/recv */
-            fl_p->tag = iosm_p->session_tag_array[recv_match];
-
-            /* this user_ptr should be left alone */
-            fl_p->user_ptr = NULL;
-
-            fl_p->type = iosm_p->flowproto_type;
-
-            /* done with msgpair resources; free them now */
-            ret = PINT_serv_free_msgpair_resources(
-                &msg_p->encoded_req, msg_p->encoded_resp_p,
-                &decoded_resp, &msg_p->svr_addr, msg_p->max_resp_sz);
-            assert(ret == 0);
-            
-            resp_p = NULL;
+                GOSSIP_IO_DEBUG, "*** %d msgpairs completions "
+                "pending\n", sm_p->u.io.msgpair_completion_count);
+        }
+        return 0;
+    }
 
-            if (iosm_p->io_type == PVFS_IO_READ)
-            {
-                /* Set up read-specific values
-                 * - don't keep going past EOF
-                 * - data is coming from BMI and going into memory
-                 * - data is coming from the same server we got the recv from
-                 */
-                fl_p->file_data.extend_flag = 0;
-                fl_p->src.endpoint_id = BMI_ENDPOINT;
-                fl_p->src.u.bmi.address = msg_p->svr_addr;
-                fl_p->dest.endpoint_id = MEM_ENDPOINT;
-                fl_p->dest.u.mem.buffer = iosm_p->buffer;
-            }
-            else
-            {
-                assert (iosm_p->io_type == PVFS_IO_WRITE);
+    /* at this point, we're either completing a flow or a write ack */
+    if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FLOW))
+    {
+        assert(sm_p->u.io.flow_completion_count);
 
-                /* do allow writes to extend the file */
-                fl_p->file_data.extend_flag = 1;
-                fl_p->src.endpoint_id = MEM_ENDPOINT;
-                fl_p->src.u.mem.buffer = iosm_p->buffer;
-                fl_p->dest.endpoint_id = BMI_ENDPOINT;
-                fl_p->dest.u.bmi.address = msg_p->svr_addr;
-
-                gossip_debug(GOSSIP_IO_DEBUG, "  preposting write "
-                             "ack for %d.\n", recv_match);
-
-                /* prepost the ack receive now */
-                iosm_p->ackarray[recv_match].max_resp_sz =
-                    PINT_encode_calc_max_size(
-                        PINT_ENCODE_RESP, PVFS_SERV_WRITE_COMPLETION,
-                        iosm_p->encoding);
-                iosm_p->ackarray[recv_match].encoded_resp_p =
-                    BMI_memalloc(
-                        msg_p->svr_addr,
-                        iosm_p->ackarray[recv_match].max_resp_sz,
-                        BMI_RECV);
-                assert(iosm_p->ackarray[recv_match].encoded_resp_p);
-
-                ret = job_bmi_recv(
-                    msg_p->svr_addr,
-                    iosm_p->ackarray[recv_match].encoded_resp_p,
-                    iosm_p->ackarray[recv_match].max_resp_sz,
-                    iosm_p->session_tag_array[recv_match],
-                    BMI_PRE_ALLOC,
-                    sm_p,
-                    (sm_p->msgarray_count*3 + recv_match),
-                    &iosm_p->ackarray[recv_match].recv_status,
-                    &iosm_p->ackarray[recv_match].recv_id,
-                    pint_client_sm_context,
-		    PVFS2_CLIENT_JOB_TIMEOUT);
+        index = STATUS_USER_TAG_GET_INDEX(
+            status_user_tag, IO_SM_PHASE_FLOW);
+        cur_ctx = &sm_p->u.io.contexts[index];
+        assert(cur_ctx);
 
-                if (ret < 0)
-                {
-                    gossip_lerr("post of write ack failed\n");
-                    assert(0);
-                }
-                assert(ret == 0);
-                
-                iosm_p->ack_comp_ct++;
-            }
+        cur_ctx->flow_status = *js_p;
 
-            /* post the flow:
-             * - pass sm_p in as user pointer, so we get called again
-             * - have status stored in flow status array, if we complete
-             *   immed.
-             * - save id in flow id array
-             * - use recv_match as index for all this
-             */
-            ret = job_flow(
-                fl_p, sm_p, (sm_p->msgarray_count*2 + recv_match),
-                &iosm_p->flow_status_array[recv_match], &tmp_id,
-                pint_client_sm_context, PVFS2_CLIENT_JOB_TIMEOUT);
+        gossip_debug(GOSSIP_IO_DEBUG, "  matched completed flow for "
+                     "context %p\n", cur_ctx);
 
-            if (ret < 0)
-            {
-                gossip_lerr("post of flow failed.\n");
-                assert(0);
-            }
-            else if (ret == 1)
-            {
-                /* flow completed immediately */
-                gossip_debug(GOSSIP_IO_DEBUG, "  flow for %d completed "
-                             "immediately!\n", recv_match);
-                assert(!iosm_p->flow_status_array[recv_match].error_code);
-            }
-            else
-            {
-                assert(ret == 0);
+        cur_ctx->flow_in_progress = 0;
+        sm_p->u.io.flow_completion_count--;
+        assert(sm_p->u.io.flow_completion_count > -1);
+    }
+    else if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FINAL_ACK))
+    {
+        assert(sm_p->u.io.write_ack_completion_count);
 
-                gossip_debug(GOSSIP_IO_DEBUG, "  posted flow for %d.\n",
-                             recv_match);
+        index = STATUS_USER_TAG_GET_INDEX(
+            status_user_tag, IO_SM_PHASE_FINAL_ACK);
+        cur_ctx = &sm_p->u.io.contexts[index];
+        assert(cur_ctx);
 
-                /* record that we have a flow that still needs to complete */
-                iosm_p->flow_comp_ct++;
-            }
-        }
-        return 0;
-    }
+        assert(cur_ctx->write_ack.recv_status.actual_size <=
+               cur_ctx->write_ack.max_resp_sz);
 
-    /* check to ensure we caught all exits above */
-    assert(matched_msgpair == 0);
+        cur_ctx->write_ack.recv_id = 0;
+        cur_ctx->write_ack.recv_status = *js_p;
 
-    /* if we didn't match to a message pair, then we completed either
-     * a flow or a write ack
-     */
-    gossip_debug(GOSSIP_IO_DEBUG, "  trying to match to a flow or ack.\n");
-
-    if ((js_p->status_user_tag >= (sm_p->msgarray_count*2)) &&
-        (js_p->status_user_tag < (sm_p->msgarray_count*3)))
-    {
-        /* the third set of N user tags correspond to flows */
-        matched_flow = 1;
-        i = js_p->status_user_tag - (sm_p->msgarray_count*2);
-        sm_p->u.io.flow_status_array[js_p->status_user_tag -
-                                     (sm_p->msgarray_count*2)] = *js_p;
-        sm_p->u.io.flow_comp_ct--;
-        gossip_debug(GOSSIP_IO_DEBUG,
-                     "  matched completed flow for %d\n", i);
+        gossip_debug(GOSSIP_IO_DEBUG, "  matched completed ack for "
+                     "context %p\n", cur_ctx);
 
-        assert(sm_p->u.io.flow_comp_ct >= 0);
+        cur_ctx->write_ack_in_progress = 0;
+        sm_p->u.io.write_ack_completion_count--;
+        assert(sm_p->u.io.write_ack_completion_count > -1);
     }
-    else if (js_p->status_user_tag >= (sm_p->msgarray_count*3))
-    {
-        /* the fourth set of N user tags correspond to final acks */
-        matched_ack = 1;
-        i = js_p->status_user_tag - (sm_p->msgarray_count*3);
-        sm_p->u.io.ackarray[i].recv_id      = 0;
-        sm_p->u.io.ackarray[i].recv_status = *js_p;
-        assert(sm_p->u.io.ackarray[i].recv_status.actual_size <=
-               sm_p->u.io.ackarray[i].max_resp_sz);
-        sm_p->u.io.ack_comp_ct--;
-        gossip_debug(GOSSIP_IO_DEBUG,
-                     "  matched completed ack for %d\n", i);
 
-        assert(sm_p->u.io.ack_comp_ct >= 0);
-    }
-    
-    if ((sm_p->u.io.flow_comp_ct == 0) &&
-        (sm_p->u.io.ack_comp_ct == 0) &&
-        (sm_p->msgarray[0].comp_ct == 0))
-    {
-        gossip_debug(GOSSIP_IO_DEBUG,
-                     "  all msgpairs, flows, ack completed.\n");
+    js_p->error_code = 0;
+
+    if ((sm_p->u.io.msgpair_completion_count == 0) &&
+        (sm_p->u.io.flow_completion_count == 0) &&
+        (sm_p->u.io.write_ack_completion_count == 0))
+    {
+        gossip_debug(GOSSIP_IO_DEBUG, "*** all operations %s "
+                     "(msgspairs, flows, and write acks)\n",
+                     (sm_p->op_cancelled ? "cancelled" : "completed"));
 
         js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
         return 1;
     }
 
-    /* there's still something left to transfer */
-    return 0;
-}
-
-static int io_object_getattr_failure(PINT_client_sm *sm_p,
-                                     job_status_s *js_p)
-{
-    gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
-                 "io_object_getattr_failure\n", sm_p);
-
-    /*
-      NOTE: this can easily happen if we're doing I/O on a file that
-      was removed by another process
-    */
-    if (js_p->error_code == 0)
+    if (sm_p->op_cancelled)
     {
-        js_p->error_code = -PVFS_ENOENT;
+        gossip_debug(GOSSIP_IO_DEBUG, "detected I/O cancellation with "
+                     "%d flow(s) and %d write ack(s) pending\n",
+                     sm_p->u.io.flow_completion_count,
+                     sm_p->u.io.write_ack_completion_count);
     }
-
-    sm_p->u.io.stored_error_code = js_p->error_code;
-    return 1;
+    else
+    {
+        gossip_debug(GOSSIP_IO_DEBUG, " %d flows pending, %d write acks "
+                     "pending\n", sm_p->u.io.flow_completion_count,
+                     sm_p->u.io.write_ack_completion_count);
+    }
+    return 0;
 }
 
 static int io_analyze_results(PINT_client_sm *sm_p,
                               job_status_s *js_p)
 {
-    int i = 0, ret = -PVFS_EINVAL, error = 0, error_count = 0;
+    int ret = -PVFS_EINVAL, i = 0;
     PVFS_size total_size = 0;
-    struct PINT_client_io_sm *iosm_p = &sm_p->u.io;
 
     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
-                 "analyze_results\n", sm_p);
+                 "io_analyze_results\n", sm_p);
 
     if (js_p->error_code != IO_DATAFILE_TRANSFERS_COMPLETE)
     {
-        /* some sort of error occurred early on */
-        js_p->error_code = (sm_p->u.io.stored_error_code ?
-                            sm_p->u.io.stored_error_code :
-                            js_p->error_code);
-        if (js_p->error_code == 0)
+        ret = (sm_p->u.io.stored_error_code ?
+               sm_p->u.io.stored_error_code :
+               js_p->error_code);
+
+        if (ret == 0)
         {
-	    gossip_err("Error: lost error_code; resetting to PVFS_EIO.\n");
-            js_p->error_code = -PVFS_EIO;
+            ret = (sm_p->op_cancelled ? -PVFS_ECANCEL : -PVFS_EIO);
         }
-        error = js_p->error_code;
     }
-    else
+    else if (!sm_p->op_cancelled)
     {
-        /* it's possible that some error occurred still.  look through
-         * all the messages, reporting errors, saving the first one to
-         * return, and adding up the size of the transfer (just in
-         * case things actually completed).
-         */
+        /*
+          look through all the contexts for errors, saving the first
+          one to return (if any) while adding up the size of the
+          transfer (in case things actually completed).
+        */
+        assert(sm_p->msgarray_count == sm_p->u.io.datafile_count);
         for(i = 0; i < sm_p->msgarray_count; i++)
         {
-            PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
+            PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
+            assert(cur_ctx);
 
-            if (msg_p->send_status.error_code != 0)
+            ret = check_context_status(
+                cur_ctx, sm_p->u.io.io_type, &total_size);
+            if (ret < 0)
             {
-                gossip_debug(GOSSIP_IO_DEBUG,
-                             "  error (%d) in msgpair send for %d.\n",
-                             msg_p->send_status.error_code, i);
-                error_count++;
-                if (error == 0)
+                if (ret == -PVFS_ECANCEL)
                 {
-                    error = msg_p->send_status.error_code;
+                    gossip_debug(GOSSIP_IO_DEBUG, "*** I/O operation "
+                                 "cancelled\n");
                 }
-            }
-            else if (msg_p->recv_status.error_code != 0)
-            {
-                gossip_debug(GOSSIP_IO_DEBUG,
-                             "  error (%d) in msgpair recv for %d.\n",
-                             msg_p->recv_status.error_code, i);
-                error_count++;
-                if (error == 0)
+                else
                 {
-                    error = msg_p->recv_status.error_code;
+                    PVFS_perror_gossip(
+                        "check_context_status found error", ret);
                 }
+                break;
             }
-            else if (iosm_p->flow_array[i].file_req != NULL)
-            {
-                /* NOTE: this is a little bit of a hack to determine if we
-                 * have actually used this flow descriptor; we don't
-                 * really care what the file_req is
-                 */
-                if (iosm_p->flow_status_array[i].error_code != 0)
-                {
-                    gossip_debug(
-                        GOSSIP_IO_DEBUG, "  error (%d) in flow for %d.\n",
-                        iosm_p->flow_status_array[i].error_code, i);
-                    error_count++;
-                    if (error == 0)
-                    {
-                        error = iosm_p->flow_status_array[i].error_code;
-                    }
-                }
-                else if (iosm_p->io_type == PVFS_IO_READ)
-                {
-                    /* size for reads is reported in the flow;
-                     * size for writes is reported in the final ack.
-                     */
-                    gossip_debug(
-                        GOSSIP_IO_DEBUG, "  %Ld bytes read from %d.\n",
-                        Ld(iosm_p->flow_array[i].total_transfered), i);
-                    total_size += iosm_p->flow_array[i].total_transfered;
-                }
-                else if (iosm_p->io_type == PVFS_IO_WRITE)
-                {
-                    if (iosm_p->ackarray[i].recv_status.error_code != 0)
-                    {
-                        gossip_debug(
-                            GOSSIP_IO_DEBUG,
-                            "  error (%d) in write ack for %d.\n",
-                            iosm_p->ackarray[i].recv_status.error_code,
-                            i);
-                        error_count++;
-                        if (error == 0)
-                        {
-                            error = iosm_p->ackarray[i].recv_status.error_code;
-                        }
-                    }
-                    else
-                    {
-                        struct PINT_decoded_msg decoded_resp;
-                        struct PVFS_server_resp *resp_p = NULL;
-
-                        /* we need to decode the write ack so we can
-                         * see how much data we were actually able to
-                         * write.
-                         *
-                         * note: we just use the svr_addr from the
-                         * msgpair; it's the same address.
-                         */
-                        ret = PINT_serv_decode_resp(
-			    msg_p->fs_id,
-                            iosm_p->ackarray[i].encoded_resp_p,
-                            &decoded_resp,
-                            &msg_p->svr_addr,
-                            iosm_p->ackarray[i].recv_status.actual_size,
-                            &resp_p);
-                        assert(ret == 0);
-
-                        gossip_debug(
-                            GOSSIP_IO_DEBUG, "%Ld bytes written to %d.\n",
-                            Ld(resp_p->u.write_completion.total_completed),
-                            i);
-                        total_size +=
-                            resp_p->u.write_completion.total_completed;
 
-                        PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
-                    }
-                }
+            gossip_debug(GOSSIP_IO_DEBUG, "[%d/%d] running size is %Ld\n",
+                         (i + 1), sm_p->msgarray_count, Ld(total_size));
+        }
 
-                /* free resources associated with this flow array
-                 * element: - write ack, if it was a write (we decoded
-                 * it if we needed to above, and got rid of that
-                 * already)
-                 */
-                PINT_flow_reset(&sm_p->u.io.flow_array[i]);
-                if (iosm_p->io_type == PVFS_IO_WRITE)
-                {
-                    BMI_memfree(msg_p->svr_addr,
-                                iosm_p->ackarray[i].encoded_resp_p,
-                                iosm_p->ackarray[i].max_resp_sz,
-                                BMI_RECV);
-                }
-            }
+        /*
+          at this point, we know an error occurred.  if we couldn't
+          find any errors in the context, use the preserved error code
+          from the complete_operations state
+        */
+        if (ret == 0)
+        {
+            char buf[64] = {0};
+
+            ret = (sm_p->op_cancelled ? -PVFS_ECANCEL :
+                   sm_p->u.io.stored_error_code);
+
+            PVFS_strerror_r(ret, buf, 64);
+            gossip_debug(GOSSIP_IO_DEBUG, "no context errors found; "
+                         "using: %s\n", buf);
         }
     }
-
-    if (!sm_p->acache_hit)
+    else
     {
-        free(sm_p->u.io.datafile_handles);
-        sm_p->u.io.datafile_handles = NULL;
+        ret = (sm_p->op_cancelled ? -PVFS_ECANCEL : -PVFS_EIO);
     }
 
+    /* be sure there are no jobs still laying around */
+    assert((sm_p->u.io.msgpair_completion_count == 0) &&
+           (sm_p->u.io.flow_completion_count == 0) &&
+           (sm_p->u.io.write_ack_completion_count == 0));
+
     /*
       FIXME: non bmi errors pop out in flow failures above -- they are
       not properly marked as flow errors either, so we check for them
       explicitly here (but not all -- fix it for real).
     */
-    if (((PVFS_ERROR_CLASS(-error) == PVFS_ERROR_BMI) ||
-         (PVFS_ERROR_CLASS(-error) == PVFS_ERROR_FLOW) ||
-         (error == -ECONNRESET)) &&
+    if (((PVFS_ERROR_CLASS(-ret) == PVFS_ERROR_BMI) ||
+         (PVFS_ERROR_CLASS(-ret) == PVFS_ERROR_FLOW) ||
+         (ret == -ECONNRESET) || (ret == -PVFS_EPROTO)) &&
         (sm_p->u.io.retry_count < PVFS2_CLIENT_RETRY_LIMIT))
     {
         if (sm_p->acache_hit || sm_p->pinode)
@@ -1255,6 +1013,8 @@ static int io_analyze_results(PINT_clien
             sm_p->pinode = NULL;
         }
 
+        assert(!sm_p->op_cancelled);
+
         sm_p->u.io.stored_error_code = 0;
         sm_p->u.io.retry_count++;
 
@@ -1265,44 +1025,483 @@ static int io_analyze_results(PINT_clien
         return 1;
     }
 
-    CLEAN_PRIVATE_MEMBERS(iosm_p);
+    gossip_debug(GOSSIP_IO_DEBUG, "total bytes transferred is %Ld\n",
+                 Ld(total_size));
 
-    /* return size, error, and set operation as complete */
     sm_p->u.io.io_resp_p->total_completed = total_size;
-    sm_p->error_code = (sm_p->u.io.stored_error_code ?
-                        sm_p->u.io.stored_error_code : error);
-    sm_p->op_complete = 1;
+    js_p->error_code = ret;
+
+    return 1;
+}
+
+static int io_cleanup(PINT_client_sm *sm_p,
+                      job_status_s *js_p)
+{
+    gossip_debug(GOSSIP_CLIENT_DEBUG,
+                 "(%p) io state: io_cleanup\n", sm_p);
+
+    CLEAN_PRIVATE_MEMBERS(sm_p);
+
+    sm_p->error_code = js_p->error_code;
 
+    if (sm_p->error_code)
+    {
+        char buf[64] = {0};
+
+        PVFS_strerror_r(sm_p->error_code, buf, 64);
+        gossip_debug(GOSSIP_IO_DEBUG,
+                     "*** Final I/O operation error is %s\n", buf);
+    }
+
+    sm_p->op_complete = 1;
     return 0;
 }
 
 /********************************************************************/
 
-/* io_find_target_datafiles()
- *
- * determines what subset of the datafiles actually contain data that
- * we are interested in for this request
- *
- * returns 0 on success, -pvfs_error on failure
- *
- */
-static int io_find_target_datafiles(PVFS_Request mem_req,
-                                    PVFS_Request file_req,
-                                    PVFS_offset file_req_offset,
-                                    PINT_dist *dist_p,
-                                    PVFS_handle *input_handle_array,
-                                    int input_handle_count,
-                                    PVFS_handle *output_handle_array,
-                                    int* handle_index_array,
-                                    int *handle_count_out_p)
+/*
+  returns 0 on send completion, IO_RECV_COMPLETED on recv completion,
+  and -PVFS_EINVAL if the tag is neither a msgpair send nor a recv
+*/
+static inline int complete_context_send_or_recv(
+    PINT_client_sm *sm_p,
+    job_status_s *js_p)
 {
-    int i, ret;
+    int ret = -PVFS_EINVAL, index = 0;
+    unsigned long status_user_tag = 0;
+    PINT_client_io_ctx *cur_ctx = NULL;
+    PINT_client_sm_msgpair_state *msg = NULL;
+
+    gossip_debug(GOSSIP_IO_DEBUG,
+                 "- complete_context_send_or_recv called\n");
+
+    assert(sm_p && js_p);
+
+    status_user_tag = (unsigned long)js_p->status_user_tag;
+
+    if (STATUS_USER_TAG_TYPE(
+            status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV))
+    {
+        index = STATUS_USER_TAG_GET_INDEX(
+            status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
+
+        gossip_debug(GOSSIP_IO_DEBUG, "got a recv completion with "
+                     "context index %d\n", index);
+
+        cur_ctx = &sm_p->u.io.contexts[index];
+        assert(cur_ctx);
+
+        msg = &sm_p->msgarray[index];
+        msg->recv_id = 0;
+        msg->recv_status = *js_p;
+
+        assert(msg->recv_status.error_code <= 0);
+        assert(msg->recv_status.actual_size <= msg->max_resp_sz);
+
+        cur_ctx->msg_recv_in_progress = 0;
+        sm_p->u.io.msgpair_completion_count--;
+
+        ret = IO_RECV_COMPLETED;
+    }
+    else if (STATUS_USER_TAG_TYPE(
+                 status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_SEND))
+    {
+        index = STATUS_USER_TAG_GET_INDEX(
+            status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_SEND);
+
+        gossip_debug(GOSSIP_IO_DEBUG, "got a send completion with "
+                     "context index %d\n", index);
+
+        cur_ctx = &sm_p->u.io.contexts[index];
+        assert(cur_ctx);
+
+        msg = &sm_p->msgarray[index];
+        msg->send_id = 0;
+        msg->send_status = *js_p;
+
+        assert(msg->send_status.error_code <= 0);
+
+        cur_ctx->msg_send_in_progress = 0;
+        sm_p->u.io.msgpair_completion_count--;
+
+        ret = 0;
+    }
+    return ret;
+}
+
+static inline int process_context_recv(
+    PINT_client_io_ctx *cur_ctx,
+    struct PINT_decoded_msg *decoded_resp,
+    struct PVFS_server_resp **resp)
+{
+    int ret = -PVFS_EINVAL;
+
+    gossip_debug(GOSSIP_IO_DEBUG, "- process_context_recv called\n");
+
+    assert(cur_ctx && cur_ctx->msg && decoded_resp && resp);
+
+    ret = PINT_serv_decode_resp(
+        cur_ctx->msg->fs_id, cur_ctx->msg->encoded_resp_p, decoded_resp,
+        &cur_ctx->msg->svr_addr,
+        cur_ctx->msg->recv_status.actual_size, resp);
+
+    if (ret)
+    {
+        PVFS_perror_gossip("PINT_serv_decode_resp failed", ret);
+        return ret;
+    }
+
+    assert((*resp)->status < 1);
+    cur_ctx->msg->op_status = (*resp)->status;
+
+    if (cur_ctx->msg->recv_status.error_code || cur_ctx->msg->op_status)
+    {
+        gossip_err("  error %d with status %d related to response "
+                   "from context %p; not submitting flow.\n",
+                   cur_ctx->msg->recv_status.error_code,
+                   cur_ctx->msg->op_status, cur_ctx);
+
+        if (cur_ctx->msg->recv_status.error_code)
+        {
+            PVFS_perror_gossip(
+                "process_context_recv (recv_status.error_code)",
+                cur_ctx->msg->recv_status.error_code);
+            ret = cur_ctx->msg->recv_status.error_code;
+        }
+        else if (cur_ctx->msg->op_status)
+        {
+            PVFS_perror_gossip("process_context_recv (op_status)",
+                               cur_ctx->msg->op_status);
+            ret = cur_ctx->msg->op_status;
+        }
+
+        PINT_serv_free_msgpair_resources(
+            &cur_ctx->msg->encoded_req, cur_ctx->msg->encoded_resp_p,
+            decoded_resp, &cur_ctx->msg->svr_addr,
+            cur_ctx->msg->max_resp_sz);
+    }
+    return ret;
+}
+
+static inline int build_context_flow(
+    PINT_client_sm *sm_p,
+    PINT_client_io_ctx *cur_ctx,
+    PVFS_object_attr *attr,
+    struct PVFS_server_resp *resp)
+{
+    gossip_debug(GOSSIP_IO_DEBUG, "- build_context_flow called\n");
+
+    if (!sm_p || !cur_ctx || !attr || !resp)
+    {
+        return -PVFS_EINVAL;
+    }
+
+    cur_ctx->flow_desc.file_data.fsize = resp->u.io.bstream_size;
+    cur_ctx->flow_desc.file_data.dist = attr->u.meta.dist;
+    cur_ctx->flow_desc.file_data.server_nr = cur_ctx->server_nr;
+    cur_ctx->flow_desc.file_data.server_ct = attr->u.meta.dfile_count;
+
+    cur_ctx->flow_desc.file_req = sm_p->u.io.file_req;
+    cur_ctx->flow_desc.file_req_offset = sm_p->u.io.file_req_offset;
+
+    cur_ctx->flow_desc.mem_req = sm_p->u.io.mem_req;
+
+    gossip_debug(
+        GOSSIP_IO_DEBUG, "    bstream_size = %Ld, datafile_nr = "
+        "%d, datafile_ct = %d, file_req_off = %Ld\n",
+        Ld(cur_ctx->flow_desc.file_data.fsize),
+        cur_ctx->flow_desc.file_data.server_nr,
+        cur_ctx->flow_desc.file_data.server_ct,
+        Ld(cur_ctx->flow_desc.file_req_offset));
+
+    cur_ctx->flow_desc.tag = cur_ctx->session_tag;
+    cur_ctx->flow_desc.type = sm_p->u.io.flowproto_type;
+    cur_ctx->flow_desc.user_ptr = NULL;
+
+    if (sm_p->u.io.io_type == PVFS_IO_READ)
+    {
+        cur_ctx->flow_desc.file_data.extend_flag = 0;
+        cur_ctx->flow_desc.src.endpoint_id = BMI_ENDPOINT;
+        cur_ctx->flow_desc.src.u.bmi.address = cur_ctx->msg->svr_addr;
+        cur_ctx->flow_desc.dest.endpoint_id = MEM_ENDPOINT;
+        cur_ctx->flow_desc.dest.u.mem.buffer = sm_p->u.io.buffer;
+    }
+    else
+    {
+        assert(sm_p->u.io.io_type == PVFS_IO_WRITE);
+
+        cur_ctx->flow_desc.file_data.extend_flag = 1;
+        cur_ctx->flow_desc.src.endpoint_id = MEM_ENDPOINT;
+        cur_ctx->flow_desc.src.u.mem.buffer = sm_p->u.io.buffer;
+        cur_ctx->flow_desc.dest.endpoint_id = BMI_ENDPOINT;
+        cur_ctx->flow_desc.dest.u.bmi.address = cur_ctx->msg->svr_addr;
+    }
+    return 0;
+}
+
+static inline int process_context_recv_and_post_flow(
+    PINT_client_sm *sm_p,
+    job_status_s *js_p)
+{
+    int ret = -PVFS_EINVAL, index = 0;
+    unsigned long status_user_tag = 0;
+    struct PINT_decoded_msg decoded_resp;
+    struct PVFS_server_resp *resp = NULL;
+    PVFS_object_attr *attr = NULL;
+    PINT_client_io_ctx *cur_ctx = NULL;
+
+    gossip_debug(GOSSIP_IO_DEBUG,
+                 "- process_context_recv_and_post_flow called\n");
+
+    assert(sm_p && js_p);
+
+    status_user_tag = (unsigned long)js_p->status_user_tag;
+    assert(STATUS_USER_TAG_TYPE(
+               status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV));
+
+    index = STATUS_USER_TAG_GET_INDEX(
+        status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
+
+    cur_ctx = &sm_p->u.io.contexts[index];
+    assert(cur_ctx && cur_ctx->msg);
+
+    if (js_p->error_code)
+    {
+        PVFS_perror_gossip("pre-process_context_recv failed",
+                           js_p->error_code);
+        return js_p->error_code;
+    }
+
+    ret = process_context_recv(cur_ctx, &decoded_resp, &resp);
+    if (ret)
+    {
+        PVFS_perror_gossip("process_context_recv failed", ret);
+        return ret;
+    }
+
+    attr = (sm_p->acache_hit ? &sm_p->pinode->attr : &sm_p->acache_attr);
+    assert(attr);
+
+    ret = build_context_flow(sm_p, cur_ctx, attr, resp);
+    if (ret < 0)
+    {
+        PVFS_perror_gossip("build_context_flow failed", ret);
+        return ret;
+    }
+
+    ret = PINT_serv_free_msgpair_resources(
+        &cur_ctx->msg->encoded_req, cur_ctx->msg->encoded_resp_p,
+        &decoded_resp, &cur_ctx->msg->svr_addr,
+        cur_ctx->msg->max_resp_sz);
+
+    if (ret)
+    {
+        PVFS_perror_gossip("PINT_serv_free_msgpair_resources "
+                           "failed", ret);
+        return ret;
+    }
+
+    if (sm_p->u.io.io_type == PVFS_IO_WRITE)
+    {
+        gossip_debug(GOSSIP_IO_DEBUG, "  preposting write "
+                     "ack for context %p.\n", cur_ctx);
+
+        cur_ctx->write_ack.max_resp_sz = PINT_encode_calc_max_size(
+            PINT_ENCODE_RESP, PVFS_SERV_WRITE_COMPLETION,
+            sm_p->u.io.encoding);
+        cur_ctx->write_ack.encoded_resp_p = BMI_memalloc(
+            cur_ctx->msg->svr_addr, cur_ctx->write_ack.max_resp_sz,
+            BMI_RECV);
+
+        if (!cur_ctx->write_ack.encoded_resp_p)
+        {
+            gossip_err("BMI_memalloc (for write ack) failed\n");
+            return -PVFS_ENOMEM;
+        }
+
+        /*
+          we're pre-posting the final write ack here, even though it's
+          ahead of the flow phase; reads are at the flow phase.
+        */
+        status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FINAL_ACK);
+
+        ret = job_bmi_recv(
+            cur_ctx->msg->svr_addr, cur_ctx->write_ack.encoded_resp_p,
+            cur_ctx->write_ack.max_resp_sz, cur_ctx->session_tag,
+            BMI_PRE_ALLOC, sm_p, status_user_tag,
+            &cur_ctx->write_ack.recv_status, &cur_ctx->write_ack.recv_id,
+            pint_client_sm_context, PVFS2_CLIENT_JOB_TIMEOUT);
+
+        if (ret < 0)
+        {
+            gossip_err("job_bmi_recv (write ack) failed\n");
+            return ret;
+        }
+
+        assert(ret == 0);
+        cur_ctx->write_ack_has_been_posted = 1;
+        cur_ctx->write_ack_in_progress = 1;
+        sm_p->u.io.write_ack_completion_count++;
+    }
+
+    status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);
+
+    ret = job_flow(
+        &cur_ctx->flow_desc, sm_p, status_user_tag,
+        &cur_ctx->flow_status, &cur_ctx->flow_job_id,
+        pint_client_sm_context, PVFS2_CLIENT_JOB_TIMEOUT);
+
+    if (ret < 0)
+    {
+        gossip_err("job_flow failed\n");
+        return ret;
+    }
+    else if (ret == 1)
+    {
+        gossip_debug(GOSSIP_IO_DEBUG, "  flow for context %p "
+                     "completed immediately\n", cur_ctx);
+        assert(cur_ctx->flow_status.error_code == 0);
+    }
+    else
+    {
+        gossip_debug(GOSSIP_IO_DEBUG, "  posted flow for "
+                     "context %p\n", cur_ctx);
+
+        cur_ctx->flow_has_been_posted = 1;
+        cur_ctx->flow_in_progress = 1;
+        sm_p->u.io.flow_completion_count++;
+    }
+    return ret;
+}
+
+static inline int check_context_status(
+    PINT_client_io_ctx *cur_ctx,
+    int io_type,
+    PVFS_size *total_size)
+{
+    int ret = 0;
+
+    gossip_debug(GOSSIP_IO_DEBUG, "- check_context_status called\n");
+
+    assert(cur_ctx && cur_ctx->msg && total_size);
+
+    if (cur_ctx->msg->send_status.error_code)
+    {
+        gossip_debug(GOSSIP_IO_DEBUG,
+                     "  error (%d) in msgpair send for context %p\n",
+                     cur_ctx->msg->send_status.error_code, cur_ctx);
+
+        assert(cur_ctx->msg_send_has_been_posted);
+        ret = cur_ctx->msg->send_status.error_code;
+    }
+    else if (cur_ctx->msg->recv_status.error_code)
+    {
+        gossip_debug(GOSSIP_IO_DEBUG,
+                     "  error (%d) in msgpair recv for context %p\n",
+                     cur_ctx->msg->recv_status.error_code, cur_ctx);
+
+        assert(cur_ctx->msg_recv_has_been_posted);
+        ret = cur_ctx->msg->recv_status.error_code;
+    }
+    else if (cur_ctx->flow_status.error_code)
+    {
+        gossip_debug(GOSSIP_IO_DEBUG,
+                     "  error (%d) in flow for context %p\n",
+                     cur_ctx->flow_status.error_code, cur_ctx);
+
+        assert(cur_ctx->flow_has_been_posted);
+        PINT_flow_reset(&cur_ctx->flow_desc);
+        ret = cur_ctx->flow_status.error_code;
+    }
+    else if (io_type == PVFS_IO_READ)
+    {
+        gossip_debug(
+            GOSSIP_IO_DEBUG, "  %Ld bytes read from context %p\n",
+            Ld(cur_ctx->flow_desc.total_transfered), cur_ctx);
+
+        /* size for reads are reported in the flow */
+        *total_size += cur_ctx->flow_desc.total_transfered;
+
+        PINT_flow_reset(&cur_ctx->flow_desc);
+    }
+    else if (io_type == PVFS_IO_WRITE)
+    {
+        if (cur_ctx->write_ack.recv_status.error_code)
+        {
+            gossip_debug(
+                GOSSIP_IO_DEBUG,
+                "  error (%d) in final ack for context %p\n",
+                cur_ctx->write_ack.recv_status.error_code, cur_ctx);
+
+            assert(cur_ctx->write_ack_has_been_posted);
+            ret = cur_ctx->write_ack.recv_status.error_code;
+        }
+        else if (cur_ctx->write_ack_has_been_posted)
+        {
+            struct PINT_decoded_msg decoded_resp;
+            struct PVFS_server_resp *resp = NULL;
+            /*
+              size for writes are reported in the final ack, but we
+              have to decode it first
+            */
+            ret = PINT_serv_decode_resp(
+                cur_ctx->msg->fs_id, cur_ctx->write_ack.encoded_resp_p,
+                &decoded_resp, &cur_ctx->msg->svr_addr,
+                cur_ctx->write_ack.recv_status.actual_size, &resp);
+            if (ret == 0)
+            {
+                gossip_debug(
+                    GOSSIP_IO_DEBUG,
+                    "  %Ld bytes written to context %p\n",
+                    Ld(resp->u.write_completion.total_completed),
+                    cur_ctx);
+
+                *total_size += resp->u.write_completion.total_completed;
+
+                PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
+            }
+            else
+            {
+                PVFS_perror_gossip("PINT_serv_decode_resp failed", ret);
+            }
+
+            PINT_flow_reset(&cur_ctx->flow_desc);
+            BMI_memfree(cur_ctx->msg->svr_addr,
+                        cur_ctx->write_ack.encoded_resp_p,
+                        cur_ctx->write_ack.max_resp_sz, BMI_RECV);
+        }
+    }
+    return ret;
+}
+
+/*
+  determines what subset of the datafiles actually contain data that
+  we are interested in for this request. returns 0 on success,
+  -PVFS_error on failure
+*/
+static int io_find_target_datafiles(
+    PVFS_Request mem_req,
+    PVFS_Request file_req,
+    PVFS_offset file_req_offset,
+    PINT_dist *dist_p,
+    PVFS_handle *input_handle_array,
+    int input_handle_count,
+    int *handle_index_array,
+    int *handle_index_out_count)
+{
+    int ret = -PVFS_EINVAL, i = 0;
     struct PINT_Request_state *req_state = NULL;
     struct PINT_Request_state *mem_req_state = NULL;
     PINT_Request_file_data tmp_file_data;
     PINT_Request_result tmp_result;
 
-    *handle_count_out_p = 0;
+    gossip_debug(GOSSIP_IO_DEBUG, "- io_find_target_datafiles called\n");
+
+    if (!handle_index_array || !handle_index_out_count)
+    {
+        return ret;
+    }
+    *handle_index_out_count = 0;
 
     req_state = PINT_New_request_state(file_req);
     if (!req_state)
@@ -1358,18 +1557,16 @@ static int io_find_target_datafiles(PVFS
             return ret;
         }
 
-        /* did we find that any data belongs to this handle? */
+        /* check if we found data that belongs to this handle */
         if (tmp_result.bytes != 0)
         {
             assert(tmp_result.bytes > 0);
 
-            gossip_debug(GOSSIP_IO_DEBUG, "io_find_target_dfiles: "
-                         "datafile %d might have some data.\n", i);
+            handle_index_array[(*handle_index_out_count)++] = i;
 
-            output_handle_array[*handle_count_out_p] =
-                input_handle_array[i]; 
-            handle_index_array[*handle_count_out_p] = i;
-            (*handle_count_out_p)++;
+            gossip_debug(GOSSIP_IO_DEBUG, "io_find_target_dfiles: "
+                         "datafile %d might have some data (out_count "
+                         "is %d).\n", i, *handle_index_out_count);
         }
     }
     PINT_Free_request_state(req_state);

Index: sys-lookup.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-lookup.sm,v
diff -p -u -r1.36 -r1.37
--- sys-lookup.sm	21 May 2004 17:22:38 -0000	1.36
+++ sys-lookup.sm	8 Jul 2004 16:17:06 -0000	1.37
@@ -1159,7 +1159,7 @@ static int lookup_segment_getattr_comp_f
         gossip_debug(GOSSIP_CLIENT_DEBUG, " getattr failed with code %d\n",
                      resp_p->status);
         return resp_p->status;
-    }            
+    }
 
     cur_seg = GET_CURRENT_SEGMENT();
     assert(cur_seg);
@@ -1171,7 +1171,6 @@ static int lookup_segment_getattr_comp_f
     PINT_acache_object_attr_deep_copy(
         &(cur_seg->seg_attr),
         &(resp_p->u.getattr.attr));
-
     return 0;
 }
 

Index: sys-setattr.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-setattr.sm,v
diff -p -u -r1.17 -r1.18
--- sys-setattr.sm	21 May 2004 18:52:07 -0000	1.17
+++ sys-setattr.sm	8 Jul 2004 16:17:06 -0000	1.18
@@ -217,6 +217,9 @@ static int setattr_msg_setup_msgpair(PIN
     msg_p = &sm_p->msgpair;
     memset(msg_p, 0, sizeof(PINT_client_sm_msgpair_state));
 
+    sm_p->msgarray = msg_p;
+    sm_p->msgarray_count = 1;
+
     /*
       if this lookup wasn't a acache hit, we must have a local
       copy stored in the sm object for later acache insertion
@@ -270,11 +273,6 @@ static int setattr_msg_setup_msgpair(PIN
         gossip_err("Failed to map meta server address\n");
         js_p->error_code = ret;
     }
-
-    /* let 'msgpairarray' handle the 'msgpair' case */
-    sm_p->msgarray = msg_p;
-    sm_p->msgarray_count = 1;
-
     return 1;
 }
 

Index: sys-symlink.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-symlink.sm,v
diff -p -u -r1.29 -r1.30
--- sys-symlink.sm	21 May 2004 18:52:07 -0000	1.29
+++ sys-symlink.sm	8 Jul 2004 16:17:07 -0000	1.30
@@ -215,7 +215,7 @@ int PVFS_isys_symlink(
         return -PVFS_ENAMETOOLONG;
     }
 
-    sm_p = (PINT_client_sm *) malloc(sizeof(*sm_p));
+    sm_p = (PINT_client_sm *)malloc(sizeof(*sm_p));
     if (sm_p == NULL)
     {
         return -PVFS_ENOMEM;



More information about the PVFS2-CVS mailing list