[PVFS2-CVS]
commit by neill in pvfs2/src/client/sysint: acache.c acache.h
client-state-machine.c client-state-machine.h finalize.c fs-add.c
getattr-acache.sm initialize.c msgpairarray.sm ncache.c ncache.h
pint-bucket.c shared-state-methods.c shared-state-methods.h
sys-create.sm sys-getattr.sm sys-io.sm sys-lookup.sm sys-setattr.sm
sys-symlink.sm
CVS commit program
cvs at parl.clemson.edu
Thu Jul 8 13:17:07 EDT 2004
Update of /projects/cvsroot/pvfs2/src/client/sysint
In directory parlweb:/tmp/cvs-serv12211/src/client/sysint
Modified Files:
acache.c acache.h client-state-machine.c
client-state-machine.h finalize.c fs-add.c getattr-acache.sm
initialize.c msgpairarray.sm ncache.c ncache.h pint-bucket.c
shared-state-methods.c shared-state-methods.h sys-create.sm
sys-getattr.sm sys-io.sm sys-lookup.sm sys-setattr.sm
sys-symlink.sm
Log Message:
- merging in the pvfs2-nm-nb-branch with the main tree
see ChangeLog for details, or browse the cvs history of the branch
for full details
Index: acache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/acache.c,v
diff -p -u -r1.7 -r1.8
--- acache.c 17 May 2004 15:57:32 -0000 1.7
+++ acache.c 8 Jul 2004 16:17:06 -0000 1.8
@@ -18,13 +18,13 @@
#include "acache.h"
#include "quickhash.h"
-/* uncomment the following for verbose acache debugging */
-/* #define VERBOSE_ACACHE_DEBUG */
+/* comment out the following to disable verbose acache debugging */
+#define VERBOSE_ACACHE_DEBUG
/*
- uncomment the following for an experimental pinode
- cleanup mechanism for trying to bound the number of
- pinode entries in the acache at any given time
+ comment out the following to disable the experimental pinode
+ cleanup mechanism that tries to bound the number of pinode entries
+ in the acache at any given time
*/
#define PINT_ACACHE_AUTO_CLEANUP
@@ -35,19 +35,13 @@
#define acache_debug(...)
#endif
-/*
- what we could do is disable the use of the acache
- if locks are not available on the system, but for now...
-*/
-#ifndef __GEN_POSIX_LOCKING__
-#error "Cannot use acache without functioning mutex locking"
-#endif
-
static struct qhash_table *s_acache_htable = NULL;
static gen_mutex_t *s_acache_htable_mutex = NULL;
+static gen_mutex_t s_acache_interface_mutex = GEN_MUTEX_INITIALIZER;
+
static int s_acache_initialized = 0;
-static int s_acache_timeout_ms = (PINT_ACACHE_TIMEOUT * 1000);
+static int s_acache_timeout_ms = PINT_ACACHE_TIMEOUT_MS;
static int s_acache_allocated_entries = 0;
/* static internal helper methods */
@@ -58,7 +52,8 @@ static void pinode_free(PINT_pinode *pin
static int pinode_status(PINT_pinode *pinode);
static void pinode_update_timestamp(PINT_pinode **pinode);
static void pinode_invalidate(PINT_pinode *pinode);
-
+static void acache_internal_release(PINT_pinode *pinode);
+static PINT_pinode *acache_internal_lookup(PVFS_object_ref refn);
#ifdef PINT_ACACHE_AUTO_CLEANUP
static void reclaim_pinode_entries(void);
#endif
@@ -76,9 +71,12 @@ int PINT_acache_initialize()
acache_debug("PINT_acache_initialize entered\n");
+ gen_mutex_lock(&s_acache_interface_mutex);
+
s_acache_htable_mutex = gen_mutex_build();
if (!s_acache_htable_mutex)
{
+ gen_mutex_unlock(&s_acache_interface_mutex);
return ret;
}
@@ -91,11 +89,13 @@ int PINT_acache_initialize()
goto error_exit;
}
- acache_debug("PINT_acache_initialize exiting\n");
s_acache_initialized = 1;
+ gen_mutex_unlock(&s_acache_interface_mutex);
+ acache_debug("PINT_acache_initialize exiting\n");
return 0;
error_exit:
+ gen_mutex_unlock(&s_acache_interface_mutex);
acache_debug("PINT_acache_initialize error exiting\n");
PINT_acache_finalize();
return ret;
@@ -108,6 +108,9 @@ void PINT_acache_finalize()
struct qlist_head *link = NULL, *tmp = NULL;
acache_debug("PINT_acache_finalize entered\n");
+
+ gen_mutex_lock(&s_acache_interface_mutex);
+
if (s_acache_htable_mutex && s_acache_htable)
{
gen_mutex_lock(s_acache_htable_mutex);
@@ -139,10 +142,42 @@ void PINT_acache_finalize()
s_acache_htable = NULL;
s_acache_htable_mutex = NULL;
+
+ gen_mutex_unlock(&s_acache_interface_mutex);
acache_debug("PINT_acache_finalize exiting\n");
}
/*
+ internal use only -- does a lookup without involving the pinode
+ locks or reference counts; always done with the interface lock and
+ without the htable mutex held
+*/
+static PINT_pinode *acache_internal_lookup(PVFS_object_ref refn)
+{
+ PINT_pinode *pinode = NULL;
+ struct qhash_head *link = NULL;
+
+ acache_debug("acache_internal_lookup entered\n");
+ assert(s_acache_initialized);
+
+ gen_mutex_lock(s_acache_htable_mutex);
+ link = qhash_search(s_acache_htable, &refn);
+ if (link)
+ {
+ pinode = qlist_entry(link, PINT_pinode, link);
+ assert(pinode);
+ }
+ gen_mutex_unlock(s_acache_htable_mutex);
+
+ if (pinode)
+ {
+ assert(pinode->flag = PINODE_INTERNAL_FLAG_HASHED);
+ }
+ acache_debug("acache_internal_lookup exiting\n");
+ return pinode;
+}
+
+/*
returns the pinode matching the specified reference if
present in the acache. returns NULL otherwise.
@@ -156,6 +191,8 @@ PINT_pinode *PINT_acache_lookup(PVFS_obj
struct qhash_head *link = NULL;
acache_debug("PINT_acache_lookup entered\n");
+
+ gen_mutex_lock(&s_acache_interface_mutex);
assert(s_acache_initialized);
gen_mutex_lock(s_acache_htable_mutex);
@@ -173,6 +210,7 @@ PINT_pinode *PINT_acache_lookup(PVFS_obj
assert(pinode->flag = PINODE_INTERNAL_FLAG_HASHED);
pinode->ref_cnt++;
}
+ gen_mutex_unlock(&s_acache_interface_mutex);
acache_debug("PINT_acache_lookup exiting\n");
return pinode;
}
@@ -180,6 +218,7 @@ PINT_pinode *PINT_acache_lookup(PVFS_obj
PINT_pinode *PINT_acache_pinode_alloc()
{
#ifdef PINT_ACACHE_AUTO_CLEANUP
+ gen_mutex_lock(&s_acache_interface_mutex);
/*
PINT_ACACHE_NUM_FLUSH_ENTRIES is a soft limit that triggers
an attempt to reclaim any expired or invalidated entries
@@ -190,6 +229,7 @@ PINT_pinode *PINT_acache_pinode_alloc()
{
reclaim_pinode_entries();
}
+ gen_mutex_unlock(&s_acache_interface_mutex);
#endif
return pinode_alloc();
}
@@ -210,6 +250,8 @@ void PINT_acache_set_valid(PINT_pinode *
{
acache_debug("PINT_acache_set_valid entered\n");
assert(s_acache_initialized);
+
+ gen_mutex_lock(&s_acache_interface_mutex);
if (pinode)
{
/* if we don't have the lock, acquire it */
@@ -236,6 +278,7 @@ void PINT_acache_set_valid(PINT_pinode *
pinode_update_timestamp(&pinode);
gen_mutex_unlock(pinode->mutex);
}
+ gen_mutex_unlock(&s_acache_interface_mutex);
acache_debug("PINT_acache_set_valid exiting\n");
}
@@ -247,18 +290,14 @@ void PINT_acache_invalidate(PVFS_object_
acache_debug("PINT_acache_invalidate entered\n");
assert(s_acache_initialized);
- pinode = PINT_acache_lookup(refn);
+ gen_mutex_lock(&s_acache_interface_mutex);
+ pinode = acache_internal_lookup(refn);
+ gen_mutex_unlock(&s_acache_interface_mutex);
if (pinode)
{
- /* drop the ref count we picked up in lookup */
- pinode->ref_cnt--;
-
/* forcefully expire the entry */
pinode->status = PINODE_STATUS_EXPIRED;
- /* we should have the lock at this point */
- gen_mutex_unlock(pinode->mutex);
-
PINT_acache_release(pinode);
}
acache_debug("PINT_acache_invalidate exiting\n");
@@ -266,18 +305,26 @@ void PINT_acache_invalidate(PVFS_object_
void PINT_acache_release_refn(PVFS_object_ref refn)
{
- PINT_pinode *pinode = PINT_acache_lookup(refn);
+ PINT_pinode *pinode = NULL;
+
+ acache_debug("PINT_acache_release_refn entered\n");
+ gen_mutex_lock(&s_acache_interface_mutex);
+ pinode = acache_internal_lookup(refn);
if (pinode)
{
- /* drop the ref count we picked up in lookup */
- pinode->ref_cnt--;
- PINT_acache_release(pinode);
+ acache_internal_release(pinode);
}
+ gen_mutex_unlock(&s_acache_interface_mutex);
+ acache_debug("PINT_acache_release_refn exited\n");
}
-void PINT_acache_release(PINT_pinode *pinode)
+/*
+ internal use only -- does a release without acquiring the interface
+ lock (callers hold it) and without the htable mutex held
+*/
+static void acache_internal_release(PINT_pinode *pinode)
{
- acache_debug("PINT_acache_release entered\n");
+ acache_debug("acache_internal_release entered\n");
assert(s_acache_initialized);
if (pinode)
{
@@ -297,6 +344,18 @@ void PINT_acache_release(PINT_pinode *pi
gen_mutex_unlock(pinode->mutex);
}
}
+ acache_debug("acache_internal_release exited\n");
+}
+
+void PINT_acache_release(PINT_pinode *pinode)
+{
+ acache_debug("PINT_acache_release entered\n");
+ assert(s_acache_initialized);
+
+ gen_mutex_lock(&s_acache_interface_mutex);
+ acache_internal_release(pinode);
+ gen_mutex_unlock(&s_acache_interface_mutex);
+
acache_debug("PINT_acache_release exited\n");
}
@@ -539,6 +598,8 @@ static void pinode_free(PINT_pinode *pin
gen_mutex_destroy(pinode->mutex);
pinode->mutex = NULL;
}
+
+ PINT_acache_object_attr_deep_free(&pinode->attr);
free(pinode);
s_acache_allocated_entries--;
pinode = NULL;
@@ -570,7 +631,6 @@ static void pinode_invalidate(PINT_pinod
gen_mutex_unlock(s_acache_htable_mutex);
acache_debug("*** pinode_invalidate: removed from htable\n");
}
- PINT_acache_object_attr_deep_free(&pinode->attr);
gen_mutex_unlock(pinode->mutex);
pinode_free(pinode);
acache_debug("*** pinode_invalidate: freed pinode\n");
Index: acache.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/acache.h,v
diff -p -u -r1.2 -r1.3
--- acache.h 24 Mar 2004 23:10:30 -0000 1.2
+++ acache.h 8 Jul 2004 16:17:06 -0000 1.3
@@ -19,11 +19,11 @@
================================
PINT_acache_initialize must be the first called method, and
- PINT_acache_finalize must be the last. The default lifespan
- of a valid acache entry is PINT_ACACHE_TIMEOUT seconds, but
- you can set the timeout (at millisecond granularity) at runtime
- by called PINT_acache_set_timeout. You can also retrieve the
- acache timeout at any time by calling PINT_acache_get_timeout.
+ PINT_acache_finalize must be the last. The default lifespan of a
+ valid acache entry is PINT_ACACHE_TIMEOUT_MS milliseconds, but you
+ can set the timeout (at millisecond granularity) at runtime by
+ calling PINT_acache_set_timeout. You can also retrieve the acache
+ timeout at any time by calling PINT_acache_get_timeout.
How to use the acache in 5 steps or less:
-----------------------------------------
@@ -45,14 +45,14 @@
tips and tricks of the acache hacking gurus:
--------------------------------------------
update_timestamps internally bumps up the pinode reference count to
- make sure it stays in the pinode cache. on expiration, the ref count
- is dropped. these internal ref counts are separate from the user
- influenced ref counts (i.e. lookup, invalidate, release)
+ make sure it stays in the pinode cache. on expiration, the ref
+ count is dropped. these internal ref counts are separate from the
+ user influenced ref counts (i.e. lookup, invalidate, release)
- if you define PINT_ACACHE_AUTO_CLEANUP, the following applies:
- since the number of pinodes in existance at any time is unbounded,
- if the internal allocator sees that a multiple of
+ if you define PINT_ACACHE_AUTO_CLEANUP, the following applies: since
+ the number of pinodes in existence at any time is unbounded, if the
+ internal allocator sees that a multiple of
PINT_ACACHE_NUM_FLUSH_ENTRIES pinodes exist, we secretly try to
reclaim up to PINT_ACACHE_NUM_FLUSH_ENTRIES by scanning the htable
of used pinodes and freeing any expired, invalid, or pinodes that
@@ -61,7 +61,7 @@
this 'feature' is mostly untested and may not be beneficial.
*/
-#define PINT_ACACHE_TIMEOUT 5
+#define PINT_ACACHE_TIMEOUT_MS 5000
#define PINT_ACACHE_NUM_ENTRIES 1024
#define PINT_ACACHE_NUM_FLUSH_ENTRIES (PINT_ACACHE_NUM_ENTRIES / 4)
#define PINT_ACACHE_HTABLE_SIZE 511
Index: client-state-machine.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/client-state-machine.c,v
diff -p -u -r1.50 -r1.51
--- client-state-machine.c 21 May 2004 17:22:37 -0000 1.50
+++ client-state-machine.c 8 Jul 2004 16:17:06 -0000 1.51
@@ -3,7 +3,6 @@
*
* See COPYING in top-level directory.
*/
-
#include <string.h>
#include <assert.h>
@@ -25,11 +24,171 @@
job_context_id pint_client_sm_context;
-static job_id_t job_id_array[MAX_RETURNED_JOBS];
-static void *client_sm_p_array[MAX_RETURNED_JOBS];
-static job_status_s job_status_array[MAX_RETURNED_JOBS];
-static int job_count = 0;
+static int got_context = 0;
+
+/*
+ used for locally storing completed operations from test() call so
+ that we can retrieve them in testsome() while still making progress
+ (and possibly completing operations in the test() call)
+*/
+static int s_completion_list_index = 0;
+static PINT_client_sm *s_completion_list[MAX_RETURNED_JOBS] = {NULL};
+static gen_mutex_t s_completion_list_mutex = GEN_MUTEX_INITIALIZER;
+
+#define CLIENT_SM_INIT_ONCE() \
+do { \
+ if (got_context == 0) \
+ { \
+ /* get a context for our state machine operations */ \
+ job_open_context(&pint_client_sm_context); \
+ got_context = 1; \
+ } \
+} while(0)
+
+#define CLIENT_SM_ASSERT_INITIALIZED() \
+do { assert(got_context); } while(0)
+
+static int add_sm_to_completion_list(PINT_client_sm *sm_p)
+{
+ gen_mutex_lock(&s_completion_list_mutex);
+ assert(s_completion_list_index < MAX_RETURNED_JOBS);
+ s_completion_list[s_completion_list_index++] = sm_p;
+ gen_mutex_unlock(&s_completion_list_mutex);
+ return 0;
+}
+
+static int conditional_remove_sm_if_in_completion_list(
+ PINT_client_sm *sm_p)
+{
+ int found = 0, i = 0;
+
+ gen_mutex_lock(&s_completion_list_mutex);
+ for(i = 0; i < s_completion_list_index; i++)
+ {
+ if (s_completion_list[i] == sm_p)
+ {
+ s_completion_list[i] = NULL;
+ found = 1;
+ break;
+ }
+ }
+ gen_mutex_unlock(&s_completion_list_mutex);
+ return found;
+}
+
+static int completion_list_retrieve_completed(
+ PVFS_sys_op_id *op_id_array,
+ void **user_ptr_array,
+ int *error_code_array,
+ int limit,
+ int *out_count)
+{
+ int i = 0, new_list_index = 0;
+ PINT_client_sm *sm_p = NULL;
+ PINT_client_sm *tmp_completion_list[MAX_RETURNED_JOBS] = {NULL};
+
+ assert(op_id_array);
+ assert(error_code_array);
+ assert(out_count);
+
+ memset(tmp_completion_list, 0,
+ (MAX_RETURNED_JOBS * sizeof(PINT_client_sm *)));
+
+ gen_mutex_lock(&s_completion_list_mutex);
+ for(i = 0; i < s_completion_list_index; i++)
+ {
+ if (s_completion_list[i] == NULL)
+ {
+ continue;
+ }
+
+ sm_p = s_completion_list[i];
+ assert(sm_p);
+
+ if (i < limit)
+ {
+ op_id_array[i] = sm_p->sys_op_id;
+ error_code_array[i] = sm_p->error_code;
+
+ if (user_ptr_array)
+ {
+ user_ptr_array[i] = (void *)sm_p->user_ptr;
+ }
+ s_completion_list[i] = NULL;
+
+ PINT_sys_release(sm_p->sys_op_id);
+ }
+ else
+ {
+ tmp_completion_list[new_list_index++] = sm_p;
+ }
+ }
+ *out_count = i;
+
+ /* clean up and adjust the list and its bookkeeping */
+ s_completion_list_index = new_list_index;
+ memcpy(s_completion_list, tmp_completion_list,
+ (MAX_RETURNED_JOBS * sizeof(PINT_client_sm *)));
+
+ gen_mutex_unlock(&s_completion_list_mutex);
+ return 0;
+}
+
+static inline int cancelled_io_jobs_are_pending(PINT_client_sm *sm_p)
+{
+ /*
+ NOTE: if the I/O cancellation has properly completed, the
+ cancelled contextual jobs within that I/O operation will be
+ popping out of the testcontext calls (in our testsome() or
+ test()). to avoid passing out the same completed op multiple
+ times, do not add the operation to the completion list until all
+ cancellations on the I/O operation are accounted for
+ */
+ assert(sm_p);
+
+ /*
+ this *can* possibly be 0 in the case that the I/O has already
+ completed and no job cancellations were issued at I/O cancel time
+ */
+ if (sm_p->u.io.total_cancellations_remaining > 0)
+ {
+ sm_p->u.io.total_cancellations_remaining--;
+ }
+
+ gossip_debug(
+ GOSSIP_IO_DEBUG, "(%p) cancelled_io_jobs_are_pending: %d "
+ "remaining (op %s)\n", sm_p,
+ sm_p->u.io.total_cancellations_remaining,
+ (sm_p->op_complete ? "complete" : "NOT complete"));
+
+ return (sm_p->u.io.total_cancellations_remaining != 0);
+}
+
+/*
+ NOTE: important usage notes regarding post(), test(), and testsome()
+ thread safety: test() and testsome() can be called in any order by
+ the same thread. if you need to call test() and testsome()
+ simultaneously from different threads, you need to serialize the
+ calls yourself.
+
+ calling semantics: the non-blocking calls (i.e. PVFS_isys_* or
+ PVFS_imgmt_* calls) allocate the state machine pointer (sm_p) used
+ for each operation. the blocking calls DO NOT allocate this, but
+ call the non-blocking method (which does allocate it) and wait for
+ completion. On completion, the blocking call frees the state
+ machine pointer (via PINT_sys_release). the blocking calls only
+ ever call the test() function, which does not free the state machine
+ pointer on completion.
+
+ the testsome() function frees the state machine pointers allocated
+ from the non-blocking calls on completion because any caller of
+ testsome() *should* be using the non-blocking calls with it. this
+ means that if you are calling test() with a non-blocking operation
+ that you manually issued (with a PVFS_isys* or PVFS_imgmt* call),
+ you need to call PVFS_sys_release on your own when the operation
+ completes.
+*/
int PINT_client_state_machine_post(
PINT_client_sm *sm_p,
int pvfs_sys_op,
@@ -38,19 +197,20 @@ int PINT_client_state_machine_post(
{
int ret = -PVFS_EINVAL;
job_status_s js;
- static int got_context = 0;
+
+#if 0
+ gossip_debug(GOSSIP_CLIENT_DEBUG,
+ "PINT_client_state_machine_post called\n");
+#endif
+
+ CLIENT_SM_INIT_ONCE();
if (!sm_p)
{
return ret;
}
- if (got_context == 0)
- {
- /* get a context for our state machine operations */
- job_open_context(&pint_client_sm_context);
- got_context = 1;
- }
+ memset(&js, 0, sizeof(js));
/* save operation type; mark operation as unfinished */
sm_p->user_ptr = user_ptr;
@@ -143,68 +303,227 @@ int PINT_client_state_machine_post(
sm_p->current_state =
(pvfs2_client_job_timer_sm.state_machine + 1);
break;
+ case PVFS_DEV_UNEXPECTED:
+ gossip_err("You should be using PINT_sys_dev_unexp for "
+ "posting this type of operation! Failing.\n");
+ return ret;
default:
gossip_lerr("FIXME: Unrecognized sysint operation!\n");
return ret;
}
- /* clear job status structure */
- memset(&js, 0, sizeof(js));
+ if (op_id)
+ {
+ ret = PINT_id_gen_safe_register(op_id, (void *)sm_p);
+ sm_p->sys_op_id = *op_id;
+ }
- /* call function, continue calling as long as we get immediate
- * success.
- */
+ /*
+ start state machine and continue advancing while we're getting
+ immediate completions
+ */
ret = sm_p->current_state->state_action(sm_p, &js);
- while (ret == 1)
+ while(ret == 1)
{
- /* PINT_state_machine_next() calls next function and
- * returns the result.
- */
- ret = PINT_state_machine_next(sm_p, &js);
+ ret = PINT_state_machine_next(sm_p, &js);
}
- /* note: job_status_s pointed to by js_p is ok to use after
- * we return regardless of whether or not we finished.
- */
+ if (sm_p->op_complete)
+ {
+ ret = add_sm_to_completion_list(sm_p);
+ assert(ret == 0);
+ }
+ return ret;
+}
- if (op_id)
+int PINT_sys_dev_unexp(
+ struct PINT_dev_unexp_info *info,
+ job_status_s *jstat,
+ PVFS_sys_op_id *op_id,
+ void *user_ptr)
+{
+ int ret = -PVFS_EINVAL;
+ job_id_t id;
+ PINT_client_sm *sm_p = NULL;
+
+ CLIENT_SM_INIT_ONCE();
+
+ /* we require more input args than the regular post method above */
+ if (!info || !jstat || !op_id)
+ {
+ return -PVFS_EINVAL;
+ }
+
+ sm_p = (PINT_client_sm *)malloc(sizeof(PINT_client_sm));
+ if (!sm_p)
+ {
+ return -PVFS_ENOMEM;
+ }
+ memset(sm_p, 0, sizeof(PINT_client_sm));
+ sm_p->user_ptr = user_ptr;
+ sm_p->op = PVFS_DEV_UNEXPECTED;
+ sm_p->op_complete = 0;
+ sm_p->cred_p = NULL;
+
+ memset(jstat, 0, sizeof(job_status_s));
+ ret = job_dev_unexp(info, (void *)sm_p, 0, jstat, &id,
+ JOB_NO_IMMED_COMPLETE, pint_client_sm_context);
+ if (ret)
+ {
+ PVFS_perror_gossip("PINT_sys_dev_unexp failed", ret);
+ free(sm_p);
+ }
+ else
{
ret = PINT_id_gen_safe_register(op_id, (void *)sm_p);
+ sm_p->sys_op_id = *op_id;
}
return ret;
}
/* PINT_client_bmi_cancel()
*
- * wrapper function for job_bmi_cancel that handles race conditions of
- * jobs that have completed in the job_testcontext() loop but have not
- * yet triggered a state transition
+ * wrapper function for job_bmi_cancel
*
* returns 0 on success, -PVFS_error on failure
*/
int PINT_client_bmi_cancel(job_id_t id)
{
- int i;
+ return job_bmi_cancel(id, pint_client_sm_context);
+}
+
+/* PINT_client_io_cancel()
+ *
+ * cancels in progress I/O operations
+ *
+ * returns 0 on success, -PVFS_error on failure
+ */
+int PINT_client_io_cancel(PVFS_sys_op_id id)
+{
+ int ret = -PVFS_EINVAL, i = 0;
+ PINT_client_sm *sm_p = NULL;
- /* TODO: this is not thread safe */
- for(i=0; i<job_count; i++)
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "PINT_client_io_cancel called\n");
+
+ sm_p = PINT_id_gen_safe_lookup(id);
+ if (!sm_p)
{
- if(job_id_array[i] == id)
- {
- /* job has already completed; do nothing */
- return(0);
- }
+ /* if we can't find it, it may have already completed */
+ return 0;
+ }
+
+ /* we can't cancel any arbitrary operation */
+ assert(sm_p->op == PVFS_SYS_IO);
+
+ if (sm_p->op_complete)
+ {
+ /* op already completed; nothing to cancel. */
+ return 0;
}
- return(job_bmi_cancel(id, pint_client_sm_context));
+ /* if we fall to here, the I/O operation is still in flight */
+ /* first, set a flag informing the sys_io state machine that the
+ * operation has been cancelled so it doesn't post any new jobs
+ */
+ sm_p->op_cancelled = 1;
+
+ /*
+ don't return an error if nothing is cancelled, because
+ everything may have completed already
+ */
+ ret = 0;
+
+ /* now run through and cancel the outstanding jobs */
+ for(i = 0; i < sm_p->u.io.datafile_count; i++)
+ {
+ PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
+ assert(cur_ctx);
+
+ if (cur_ctx->msg_send_in_progress)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, "[%d] Posting "
+ "cancellation of type: BMI Send "
+ "(Request)\n",i);
+
+ ret = job_bmi_cancel(cur_ctx->msg->send_id,
+ pint_client_sm_context);
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("job_bmi_cancel failed", ret);
+ break;
+ }
+ sm_p->u.io.total_cancellations_remaining++;
+ }
+
+ if (cur_ctx->msg_recv_in_progress)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, "[%d] Posting "
+ "cancellation of type: BMI Recv "
+ "(Response)\n",i);
+
+ ret = job_bmi_cancel(cur_ctx->msg->recv_id,
+ pint_client_sm_context);
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("job_bmi_cancel failed", ret);
+ break;
+ }
+ sm_p->u.io.total_cancellations_remaining++;
+ }
+
+ if (cur_ctx->flow_in_progress)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "[%d] Posting cancellation of type: FLOW\n",i);
+
+ ret = job_flow_cancel(
+ cur_ctx->flow_job_id, pint_client_sm_context);
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("job_flow_cancel failed", ret);
+ break;
+ }
+ sm_p->u.io.total_cancellations_remaining++;
+ }
+
+ if (cur_ctx->write_ack_in_progress)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, "[%d] Posting "
+ "cancellation of type: BMI Recv "
+ "(Write Ack)\n",i);
+
+ ret = job_bmi_cancel(cur_ctx->write_ack.recv_id,
+ pint_client_sm_context);
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("job_bmi_cancel failed", ret);
+ break;
+ }
+ sm_p->u.io.total_cancellations_remaining++;
+ }
+ }
+ gossip_debug(GOSSIP_IO_DEBUG, "(%p) Total cancellations "
+ "remaining: %d\n", sm_p,
+ sm_p->u.io.total_cancellations_remaining);
+ return ret;
}
int PINT_client_state_machine_test(
PVFS_sys_op_id op_id,
int *error_code)
{
- int ret = -PVFS_EINVAL, i = 0;
+ int ret = -PVFS_EINVAL, i = 0, job_count = 0;
PINT_client_sm *sm_p, *tmp_sm_p = NULL;
+ job_id_t job_id_array[MAX_RETURNED_JOBS];
+ job_status_s job_status_array[MAX_RETURNED_JOBS];
+ void *client_sm_p_array[MAX_RETURNED_JOBS] = {NULL};
+
+#if 0
+ gossip_debug(GOSSIP_CLIENT_DEBUG,
+ "PINT_client_state_machine_test called\n");
+#endif
+
+ CLIENT_SM_ASSERT_INITIALIZED();
job_count = MAX_RETURNED_JOBS;
@@ -218,20 +537,19 @@ int PINT_client_state_machine_test(
{
return ret;
}
- assert(sm_p);
if (sm_p->op_complete)
{
*error_code = sm_p->error_code;
+ conditional_remove_sm_if_in_completion_list(sm_p);
return 0;
}
- /* TODO: this isn't thread safe... */
ret = job_testcontext(job_id_array,
&job_count, /* in/out parameter */
client_sm_p_array,
job_status_array,
- 100, /* timeout? */
+ 10,
pint_client_sm_context);
assert(ret > -1);
@@ -239,47 +557,93 @@ int PINT_client_state_machine_test(
for(i = 0; i < job_count; i++)
{
tmp_sm_p = (PINT_client_sm *)client_sm_p_array[i];
+ assert(tmp_sm_p);
- do
+ if (tmp_sm_p->op == PVFS_DEV_UNEXPECTED)
{
- ret = PINT_state_machine_next(tmp_sm_p, &job_status_array[i]);
+ tmp_sm_p->op_complete = 1;
+ }
- } while (ret == 1);
+ if (!tmp_sm_p->op_complete)
+ {
+ do
+ {
+ ret = PINT_state_machine_next(
+ tmp_sm_p, &job_status_array[i]);
- /* (ret < 0) indicates a problem from the job system
- * itself; the return value of the underlying operation
- * is kept in the job status structure.
- */
- assert(ret == 0);
+ } while (ret == 1);
+
+ assert(ret == 0);
+ }
+
+ /* make sure we don't return internally cancelled I/O jobs */
+ if ((tmp_sm_p->op == PVFS_SYS_IO) && (tmp_sm_p->op_cancelled) &&
+ (cancelled_io_jobs_are_pending(tmp_sm_p)))
+ {
+ continue;
+ }
+
+ /*
+ if we've found a completed operation and it's NOT the op
+ being tested here, we add it to our local completion list so
+ that later calls to the sysint test/testsome can find it
+ */
+ if ((tmp_sm_p != sm_p) && (tmp_sm_p->op_complete == 1))
+ {
+ ret = add_sm_to_completion_list(tmp_sm_p);
+ assert(ret == 0);
+ }
}
if (sm_p->op_complete)
{
*error_code = sm_p->error_code;
+ conditional_remove_sm_if_in_completion_list(sm_p);
}
return 0;
}
-int PINT_client_state_machine_testsome(PVFS_sys_op_id *op_id_array,
- int *op_count) /* in/out */
+int PINT_client_state_machine_testsome(
+ PVFS_sys_op_id *op_id_array,
+ int *op_count, /* in/out */
+ void **user_ptr_array,
+ int *error_code_array,
+ int timeout_ms)
{
- int ret = -PVFS_EINVAL, i = 0, count = 0;
- int limit = (*op_count - 1);
+ int ret = -PVFS_EINVAL, i = 0;
+ int limit = 0, job_count = 0;
PINT_client_sm *sm_p = NULL;
+ job_id_t job_id_array[MAX_RETURNED_JOBS];
+ job_status_s job_status_array[MAX_RETURNED_JOBS];
+ void *client_sm_p_array[MAX_RETURNED_JOBS] = {NULL};
- job_count = MAX_RETURNED_JOBS;
+#if 0
+ gossip_debug(GOSSIP_CLIENT_DEBUG,
+ "PINT_client_state_machine_testsome called\n");
+#endif
+
+ CLIENT_SM_ASSERT_INITIALIZED();
+
+ if (!op_id_array || !op_count || !error_code_array)
+ {
+ PVFS_perror_gossip("PINT_client_state_machine_testsome", ret);
+ return ret;
+ }
- if (!op_id_array || !op_count)
+ if ((*op_count < 1) || (*op_count > MAX_RETURNED_JOBS))
{
+ PVFS_perror_gossip("testsome() got invalid op_count", ret);
return ret;
}
- /* TODO: this isn't thread safe... */
+ job_count = MAX_RETURNED_JOBS;
+ limit = *op_count;
+
ret = job_testcontext(job_id_array,
&job_count, /* in/out parameter */
client_sm_p_array,
job_status_array,
- 100, /* timeout? */
+ timeout_ms,
pint_client_sm_context);
assert(ret > -1);
@@ -287,32 +651,56 @@ int PINT_client_state_machine_testsome(P
for(i = 0; i < job_count; i++)
{
sm_p = (PINT_client_sm *)client_sm_p_array[i];
+ assert(sm_p);
- do
+ /*
+ note that dev unexp messages found here are treated as
+ complete since if we see them at all in here, they're ready
+ to be passed back to the caller
+ */
+ if (sm_p->op == PVFS_DEV_UNEXPECTED)
+ {
+ sm_p->op_complete = 1;
+ }
+
+ if (!sm_p->op_complete)
{
- ret = PINT_state_machine_next(sm_p, &job_status_array[i]);
+ do
+ {
+ ret = PINT_state_machine_next(
+ sm_p, &job_status_array[i]);
- } while (ret == 1);
+ } while (ret == 1);
- /* (ret < 0) indicates a problem from the job system
- * itself; the return value of the underlying operation
- * is kept in the job status structure.
- */
- assert(ret == 0);
+ /* (ret < 0) indicates a problem from the job system
+ * itself; the return value of the underlying operation is
+ * kept in the job status structure.
+ */
+ assert(ret == 0);
+ }
+
+ /* make sure we don't return internally cancelled I/O jobs */
+ if ((sm_p->op == PVFS_SYS_IO) && (sm_p->op_cancelled) &&
+ (cancelled_io_jobs_are_pending(sm_p)))
+ {
+ continue;
+ }
+ /*
+ by adding the completed op to our completion list, we can
+ keep progressing on operations in progress here and just
+ grab all completed operations when we're finished
+ (i.e. outside of this loop).
+ */
if (sm_p->op_complete)
{
- op_id_array[count++] = sm_p->sys_op_id;
- if (count > limit)
- {
- break;
- }
+ ret = add_sm_to_completion_list(sm_p);
+ assert(ret == 0);
}
}
- *op_count = count;
-
- return 0;
+ return completion_list_retrieve_completed(
+ op_id_array, user_ptr_array, error_code_array, limit, op_count);
}
int PINT_client_wait_internal(
@@ -357,8 +745,16 @@ void PINT_sys_release(PVFS_sys_op_id op_
{
PINT_id_gen_safe_unregister(op_id);
- assert(sm_p->cred_p);
- PVFS_util_release_credentials(sm_p->cred_p);
+ if (sm_p->op && sm_p->cred_p)
+ {
+ PVFS_util_release_credentials(sm_p->cred_p);
+ sm_p->cred_p = NULL;
+ }
+
+ if (sm_p->acache_hit)
+ {
+ PINT_acache_release(sm_p->pinode);
+ }
free(sm_p);
}
}
@@ -413,20 +809,19 @@ int PINT_serv_free_msgpair_resources(
PVFS_BMI_addr_t *svr_addr_p,
int max_resp_sz)
{
- PINT_encode_release(encoded_req_p,
- PINT_ENCODE_REQ);
+ int ret = -PVFS_EINVAL;
- /* sm_p->req doesn't go anywhere; we'll use it again. */
+ if (encoded_req_p && decoded_resp_p && svr_addr_p)
+ {
+ PINT_encode_release(encoded_req_p, PINT_ENCODE_REQ);
- PINT_decode_release(decoded_resp_p,
- PINT_DECODE_RESP);
+ PINT_decode_release(decoded_resp_p, PINT_DECODE_RESP);
- BMI_memfree(*svr_addr_p,
- encoded_resp_p,
- max_resp_sz,
- BMI_RECV);
+ BMI_memfree(*svr_addr_p, encoded_resp_p, max_resp_sz, BMI_RECV);
- return 0;
+ ret = 0;
+ }
+ return ret;
}
/* PINT_serv_msgpair_array_resolve_addrs()
Index: client-state-machine.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/client-state-machine.h,v
diff -p -u -r1.113 -r1.114
--- client-state-machine.h 21 May 2004 18:52:06 -0000 1.113
+++ client-state-machine.h 8 Jul 2004 16:17:06 -0000 1.114
@@ -7,10 +7,10 @@
#ifndef __PVFS2_CLIENT_STATE_MACHINE_H
#define __PVFS2_CLIENT_STATE_MACHINE_H
-/* NOTE: STATE-MACHINE.H IS INCLUDED AT THE BOTTOM! THIS IS SO WE CAN
- * DEFINE ALL THE STRUCTURES WE NEED BEFORE WE INCLUDE IT.
- */
-
+/*
+ NOTE: state-machine.h is included at the bottom so we can define all
+ the client-sm structures before it's included
+*/
#include "pvfs2-sysint.h"
#include "pvfs2-types.h"
#include "pvfs2-storage.h"
@@ -26,12 +26,16 @@
#define MAX_LOOKUP_SEGMENTS PVFS_REQ_LIMIT_PATH_SEGMENT_COUNT
#define MAX_LOOKUP_CONTEXTS PVFS_REQ_LIMIT_MAX_SYMLINK_RESOLUTION_COUNT
-/* jobs that send or receive request messages will timeout if they do not
- * complete within PVFS2_CLIENT_JOB_TIMEOUT seconds; flows will timeout if
- * they go for more than PVFS2_CLIENT_JOB_TIMEOUT seconds without moving any
- * data.
+/*
+ TODO: the following constants should be run-time configurable
+ eventually
+*/
+
+/* jobs that send or receive request messages will timeout if they do
+ * not complete within PVFS2_CLIENT_JOB_TIMEOUT seconds; flows will
+ * timeout if they go for more than PVFS2_CLIENT_JOB_TIMEOUT seconds
+ * without moving any data.
*/
-/* TODO: this should be configurable at runtime somehow */
#define PVFS2_CLIENT_JOB_TIMEOUT 30
/* the maximum number of times to retry restartable client operations */
@@ -60,12 +64,10 @@ typedef struct PINT_client_sm_msgpair_st
/* don't use this -- internal msgpairarray use only */
int retry_count;
- /* comp_fn called after successful reception and decode of respone,
- * if the msgpair state machine is used for processing.
+ /* comp_fn called after successful reception and decode of
+ * response, if the msgpair state machine is used for processing.
*/
- int (* comp_fn)(void *sm_p, /* actually (struct PINT_client_sm *) */
- struct PVFS_server_resp *resp_p,
- int i);
+ int (* comp_fn)(void *sm_p, struct PVFS_server_resp *resp_p, int i);
/* comp_ct used to keep up with number of operations remaining */
int comp_ct;
@@ -86,10 +88,17 @@ typedef struct PINT_client_sm_msgpair_st
/* send_status, recv_status used for error handling etc. */
job_status_s send_status, recv_status;
- /* op_status is the code returned from the server, if the operation
- * was actually processed (recv_status.error_code == 0)
+ /* op_status is the code returned from the server, if the
+ * operation was actually processed (recv_status.error_code == 0)
*/
PVFS_error op_status;
+
+ /*
+ used in the retry code path to know if we've already completed
+ or not (to avoid re-doing the work we've already done)
+ */
+ int complete;
+
} PINT_client_sm_msgpair_state;
/* PINT_client_sm_recv_state_s
@@ -106,7 +115,6 @@ typedef struct PINT_client_sm_recv_state
PVFS_error op_status;
} PINT_client_sm_recv_state;
-/* PINT_client_remove_sm */
struct PINT_client_remove_sm
{
char *object_name; /* input parameter */
@@ -114,155 +122,184 @@ struct PINT_client_remove_sm
int retry_count;
};
-/* PINT_client_create_sm */
-struct PINT_client_create_sm {
- char *object_name; /* input parameter */
- PVFS_sysresp_create *create_resp; /* in/out parameter*/
- PVFS_sys_attr sys_attr; /* input parameter */
- int num_data_files;
- PVFS_BMI_addr_t *data_server_addrs;
- PVFS_handle_extent_array *io_handle_extent_array;
- PVFS_handle metafile_handle;
- PVFS_handle *datafile_handles;
- PINT_dist *dist;
- int stored_error_code;
- int retry_count;
-};
-
-/* PINT_client_mkdir_sm */
-struct PINT_client_mkdir_sm {
- char *object_name; /* input parameter */
- PVFS_sysresp_mkdir *mkdir_resp; /* in/out parameter */
- PVFS_sys_attr sys_attr; /* input parameter */
- PVFS_handle metafile_handle;
- int stored_error_code;
- int retry_count;
-};
-
-/* PINT_client_symlink_sm */
-struct PINT_client_symlink_sm {
- PVFS_object_ref parent_ref; /* input parameter */
- char *link_name; /* input parameter */
- char *link_target; /* input parameter */
- PVFS_sysresp_symlink *sym_resp; /* in/out parameter*/
- PVFS_sys_attr sys_attr; /* input parameter */
- PVFS_handle symlink_handle;
- int stored_error_code;
- int retry_count;
-};
-
-/* PINT_client_getattr_sm */
-struct PINT_client_getattr_sm {
- PVFS_object_ref object_ref; /* input parameter */
- uint32_t attrmask; /* input parameter */
- int datafile_count; /* from object attribs */
- PVFS_handle *datafile_handles;
- PINT_dist *dist_p;
- uint32_t dist_size;
- PVFS_size *size_array; /* from datafile attribs */
+struct PINT_client_create_sm
+{
+ char *object_name; /* input parameter */
+ PVFS_sysresp_create *create_resp; /* in/out parameter*/
+ PVFS_sys_attr sys_attr; /* input parameter */
+
+ int retry_count;
+ int num_data_files;
+ int stored_error_code;
+
+ PINT_dist *dist;
+ PVFS_handle metafile_handle;
+ PVFS_handle *datafile_handles;
+ PVFS_BMI_addr_t *data_server_addrs;
+ PVFS_handle_extent_array *io_handle_extent_array;
+};
+
+struct PINT_client_mkdir_sm
+{
+ char *object_name; /* input parameter */
+ PVFS_sysresp_mkdir *mkdir_resp; /* in/out parameter */
+ PVFS_sys_attr sys_attr; /* input parameter */
+
+ int retry_count;
+ int stored_error_code;
+ PVFS_handle metafile_handle;
+};
+
+struct PINT_client_symlink_sm
+{
+ PVFS_object_ref parent_ref; /* input parameter */
+ char *link_name; /* input parameter */
+ char *link_target; /* input parameter */
+ PVFS_sysresp_symlink *sym_resp; /* in/out parameter*/
+ PVFS_sys_attr sys_attr; /* input parameter */
+
+ int retry_count;
+ int stored_error_code;
+ PVFS_handle symlink_handle;
+};
+
+struct PINT_client_getattr_sm
+{
+ PVFS_object_ref object_ref; /* input parameter */
+ uint32_t attrmask; /* input parameter */
+
+ PVFS_size *size_array; /* from datafile attribs */
PVFS_sysresp_getattr *getattr_resp_p; /* destination for output */
};
-/* PINT_client_setattr_sm */
struct PINT_client_setattr_sm
{
- PVFS_object_ref refn; /* input parameter */
- PVFS_sys_attr sys_attr; /* input parameter */
+ PVFS_object_ref refn; /* input parameter */
+ PVFS_sys_attr sys_attr; /* input parameter */
};
-/* PINT_client_io_sm
- *
- * Data specific to I/O operations on the client side.
- */
-struct PINT_client_io_sm {
+typedef struct
+{
+ /* the index of the current context (in the context array) */
+ int index;
+
+ /* the metafile's dfile server index we're communicating with */
+ int server_nr;
+
+ /* the data handle we're responsible for doing I/O on */
+ PVFS_handle data_handle;
+
+ /* a reference to the msgpair we're using for communication */
+ PINT_client_sm_msgpair_state *msg;
+
+ job_id_t flow_job_id;
+ job_status_s flow_status;
+ flow_descriptor flow_desc;
+ PVFS_msg_tag_t session_tag;
+
+ PINT_client_sm_recv_state write_ack;
+
+ /*
+ all *_has_been_posted fields are used at io_analyze_results time
+ to know if we should be checking for errors on particular fields
+ */
+ int msg_send_has_been_posted;
+ int msg_recv_has_been_posted;
+ int flow_has_been_posted;
+ int write_ack_has_been_posted;
+
+ /*
+ all *_in_progress fields are used at cancellation time to
+ determine what operations are currently in flight
+ */
+ int msg_send_in_progress;
+ int msg_recv_in_progress;
+ int flow_in_progress;
+ int write_ack_in_progress;
+
+} PINT_client_io_ctx;
+
+struct PINT_client_io_sm
+{
/* input parameters */
- enum PVFS_io_type io_type;
- PVFS_Request file_req;
- PVFS_offset file_req_offset;
- void *buffer;
- PVFS_Request mem_req;
- int stored_error_code;
- int retry_count;
-
- /* cached from object attributes */
- int orig_datafile_count;
- int datafile_count;
- PVFS_handle *datafile_handles;
- int *datafile_index_array;
- PINT_dist *dist_p;
- uint32_t dist_size;
-
- /* data regarding flows */
- int flow_comp_ct;
- flow_descriptor *flow_array;
- job_status_s *flow_status_array;
+ enum PVFS_io_type io_type;
+ PVFS_Request file_req;
+ PVFS_offset file_req_offset;
+ void *buffer;
+ PVFS_Request mem_req;
+
+ /* output parameter */
+ PVFS_sysresp_io *io_resp_p;
+
enum PVFS_flowproto_type flowproto_type;
enum PVFS_encoding_type encoding;
- /* session tags, used in all messages */
- PVFS_msg_tag_t *session_tag_array;
+ int datafile_count;
+ int *datafile_index_array;
- /* data regarding final acknowledgements (writes only) */
- int ack_comp_ct;
- PINT_client_sm_recv_state *ackarray;
+ int msgpair_completion_count;
+ int flow_completion_count;
+ int write_ack_completion_count;
- /* output parameter */
- PVFS_sysresp_io *io_resp_p;
+ PINT_client_io_ctx *contexts;
+
+ int total_cancellations_remaining;
+
+ int retry_count;
+ int stored_error_code;
};
-/* PINT_client_flush_sm */
-struct PINT_client_flush_sm {
+struct PINT_client_flush_sm
+{
};
-/* PINT_client_readdir_sm */
-struct PINT_client_readdir_sm {
- PVFS_object_ref object_ref; /* looked up */
- PVFS_ds_position pos_token; /* input parameter */
- int dirent_limit; /* input parameter */
- PVFS_sysresp_readdir *readdir_resp; /* in/out parameter*/
+struct PINT_client_readdir_sm
+{
+ PVFS_object_ref object_ref; /* looked up */
+ PVFS_ds_position pos_token; /* input parameter */
+ int dirent_limit; /* input parameter */
+ PVFS_sysresp_readdir *readdir_resp; /* in/out parameter*/
};
typedef struct
{
- char *seg_name;
- char *seg_remaining;
- PVFS_object_attr seg_attr;
- PVFS_object_ref seg_starting_refn;
- PVFS_object_ref seg_resolved_refn;
+ char *seg_name;
+ char *seg_remaining;
+ PVFS_object_attr seg_attr;
+ PVFS_object_ref seg_starting_refn;
+ PVFS_object_ref seg_resolved_refn;
} PINT_client_lookup_sm_segment;
typedef struct
{
- int total_segments;
- int current_segment;
+ int total_segments;
+ int current_segment;
PINT_client_lookup_sm_segment segments[MAX_LOOKUP_SEGMENTS];
- PVFS_object_ref ctx_starting_refn;
- PVFS_object_ref ctx_resolved_refn;
+ PVFS_object_ref ctx_starting_refn;
+ PVFS_object_ref ctx_resolved_refn;
} PINT_client_lookup_sm_ctx;
-/* PINT_client_lookup_sm */
struct PINT_client_lookup_sm
{
- char *orig_pathname;/* input parameter */
- PVFS_object_ref starting_refn; /* input parameter */
- PVFS_sysresp_lookup *lookup_resp; /* in/out parameter*/
- int follow_link; /* input parameter */
- int current_context;
- PINT_client_lookup_sm_ctx contexts[MAX_LOOKUP_CONTEXTS];
-};
-
-/* PINT_client_rename_sm */
-struct PINT_client_rename_sm {
- char *entries[2]; /* old/new entry names;
- input parameter */
- PVFS_object_ref parent_refns[2]; /* old/new parent pinode refns;
- input parameter */
- PVFS_object_ref refns[2]; /* old/new pinode ref */
- int rmdirent_index;
- int target_dirent_exists;
- PVFS_handle old_dirent_handle;
- int retry_count;
- int stored_error_code;
+ char *orig_pathname; /* input parameter */
+ PVFS_object_ref starting_refn; /* input parameter */
+ PVFS_sysresp_lookup *lookup_resp; /* in/out parameter*/
+ int follow_link; /* input parameter */
+ int current_context;
+ PINT_client_lookup_sm_ctx contexts[MAX_LOOKUP_CONTEXTS];
+};
+
+struct PINT_client_rename_sm
+{
+ char *entries[2]; /* old/new input entry names */
+ PVFS_object_ref parent_refns[2]; /* old/new input parent refns */
+
+ PVFS_object_ref refns[2]; /* old/new object refns */
+ int retry_count;
+ int stored_error_code;
+ int rmdirent_index;
+ int target_dirent_exists;
+ PVFS_handle old_dirent_handle;
};
struct PINT_client_mgmt_setparam_list_sm
@@ -280,31 +317,31 @@ struct PINT_client_mgmt_setparam_list_sm
struct PINT_client_mgmt_statfs_list_sm
{
PVFS_fs_id fs_id;
- struct PVFS_mgmt_server_stat* stat_array;
+ struct PVFS_mgmt_server_stat *stat_array;
int count;
- PVFS_id_gen_t* addr_array;
+ PVFS_id_gen_t *addr_array;
PVFS_error_details *details;
};
struct PINT_client_mgmt_perf_mon_list_sm
{
PVFS_fs_id fs_id;
- struct PVFS_mgmt_perf_stat** perf_matrix;
- uint64_t* end_time_ms_array;
+ struct PVFS_mgmt_perf_stat **perf_matrix;
+ uint64_t *end_time_ms_array;
int server_count;
int history_count;
- PVFS_id_gen_t* addr_array;
- uint32_t* next_id_array;
+ PVFS_id_gen_t *addr_array;
+ uint32_t *next_id_array;
PVFS_error_details *details;
};
struct PINT_client_mgmt_event_mon_list_sm
{
PVFS_fs_id fs_id;
- struct PVFS_mgmt_event** event_matrix;
+ struct PVFS_mgmt_event **event_matrix;
int server_count;
int event_count;
- PVFS_id_gen_t* addr_array;
+ PVFS_id_gen_t *addr_array;
PVFS_error_details *details;
};
@@ -312,10 +349,10 @@ struct PINT_client_mgmt_iterate_handles_
{
PVFS_fs_id fs_id;
int server_count;
- PVFS_id_gen_t* addr_array;
- PVFS_handle** handle_matrix;
- int* handle_count_array;
- PVFS_ds_position* position_array;
+ PVFS_id_gen_t *addr_array;
+ PVFS_handle **handle_matrix;
+ int *handle_count_array;
+ PVFS_ds_position *position_array;
PVFS_error_details *details;
};
@@ -325,56 +362,75 @@ struct PINT_client_mgmt_get_dfile_array_
int dfile_count;
};
-struct PINT_client_truncate_sm {
- PVFS_size size; /* new logical size of object*/
+struct PINT_client_truncate_sm
+{
+ PVFS_size size; /* new logical size of object*/
};
-struct PINT_server_get_config_sm {
- struct PVFS_sys_mntent* mntent;
+struct PINT_server_get_config_sm
+{
+ struct PVFS_sys_mntent *mntent;
char *fs_config_buf;
- uint32_t fs_config_buf_size;
char *server_config_buf;
+ uint32_t fs_config_buf_size;
uint32_t server_config_buf_size;
int persist_config_buffers;
};
-typedef struct PINT_client_sm {
- /* STATE MACHINE VALUES */
- int stackptr; /* stack of contexts for nested state machines */
- union PINT_state_array_values *current_state; /* xxx */
+typedef struct PINT_client_sm
+{
+ /*
+ internal state machine values; the stack is used for tracking
+ movement through nested state machines
+ */
+ int stackptr;
+ union PINT_state_array_values *current_state;
union PINT_state_array_values *state_stack[PINT_STATE_STACK_SIZE];
- int op; /* holds pvfs system operation type, defined up above */
+ /* the system interface operation type (defined below) */
+ int op;
+
+ /* indicates when the operation as a whole is finished */
+ int op_complete;
+
+ /* indicates when the operation has been cancelled */
+ int op_cancelled;
+
+ /* stores the final operation error code on operation exit */
+ PVFS_error error_code;
- /* CLIENT SM VALUES */
- int op_complete; /* used to indicate that the operation as a
- * whole is finished.
- */
- PVFS_error error_code; /* used to hold final job status so client
- * can determine what finally happened
- */
int comp_ct; /* used to keep up with completion of multiple
* jobs for some states; typically set and
* then decremented to zero as jobs complete */
- int ncache_hit; /* set if last segment lookup was from ncache */
- int acache_hit; /* set if pinode was from acache */
- PINT_pinode *pinode; /* filled in on acache hit */
- PVFS_object_attr acache_attr; /* a scratch attr space */
+ /* indicates that an ncache hit has been made */
+ int ncache_hit;
+
+ /* indicates that an acache hit has been made */
+ int acache_hit;
+
+ /* on acache hit, set to the cached pinode whose attributes can be used */
+ PINT_pinode *pinode;
+
+ /*
+ on acache miss, an attribute is typically stored here for later
+ insertion into the acache (if possible)
+ */
+ PVFS_object_attr acache_attr;
/* generic msgpair used with msgpair substate */
PINT_client_sm_msgpair_state msgpair;
- /* msgpair array ptr used when operations can be performed concurrently.
- * obviously this has to be allocated within the upper-level state
- * machine. used with msgpairarray substate typically.
+ /* msgpair array ptr used when operations can be performed
+ * concurrently. this must be allocated within the upper-level
+ * state machine and is used with the msgpairarray sm.
*/
int msgarray_count;
PINT_client_sm_msgpair_state *msgarray;
/*
internal pvfs_object references; used in conjunction with the
- sm_common state machine routines, or otherwise as scratch pinode
+ sm_common state machine routines, or otherwise as scratch object
references during sm processing
*/
PVFS_object_ref object_ref;
@@ -420,13 +476,35 @@ int PINT_client_state_machine_post(
PVFS_sys_op_id *op_id,
void *user_ptr);
+int PINT_sys_dev_unexp(
+ struct PINT_dev_unexp_info *info,
+ job_status_s *jstat,
+ PVFS_sys_op_id *op_id,
+ void *user_ptr);
+
int PINT_client_state_machine_test(
PVFS_sys_op_id op_id,
int *error_code);
int PINT_client_state_machine_testsome(
PVFS_sys_op_id *op_id_array,
- int *op_count); /* in/out */
+ int *op_count, /* in/out */
+ void **user_ptr_array,
+ int *error_code_array,
+ int timeout_ms);
+
+/* exposed wrapper around the client-state-machine testsome function */
+static inline int PINT_sys_testsome(
+ PVFS_sys_op_id *op_id_array,
+ int *op_count, /* in/out */
+ void **user_ptr_array,
+ int *error_code_array,
+ int timeout_ms)
+{
+ return PINT_client_state_machine_testsome(
+ op_id_array, op_count, user_ptr_array,
+ error_code_array, timeout_ms);
+}
/* exposed wrappers around the id-generator code */
static inline int PINT_id_gen_safe_register(
@@ -451,28 +529,30 @@ static inline int PINT_id_gen_safe_unreg
/* used with post call to tell the system what state machine to use
* when processing a new PINT_client_sm structure.
*/
-enum {
- PVFS_SYS_REMOVE = 1,
- PVFS_SYS_CREATE = 2,
- PVFS_SYS_MKDIR = 3,
- PVFS_SYS_SYMLINK = 4,
- PVFS_SYS_GETATTR = 5,
- PVFS_SYS_IO = 6,
- PVFS_SYS_FLUSH = 7,
- PVFS_SYS_TRUNCATE= 8,
- PVFS_SYS_READDIR = 9,
- PVFS_SYS_SETATTR = 10,
- PVFS_SYS_LOOKUP = 11,
- PVFS_SYS_RENAME = 12,
- PVFS_MGMT_SETPARAM_LIST = 70,
- PVFS_MGMT_NOOP = 71,
- PVFS_MGMT_STATFS_LIST = 72,
- PVFS_MGMT_PERF_MON_LIST = 73,
+enum
+{
+ PVFS_SYS_REMOVE = 1,
+ PVFS_SYS_CREATE = 2,
+ PVFS_SYS_MKDIR = 3,
+ PVFS_SYS_SYMLINK = 4,
+ PVFS_SYS_GETATTR = 5,
+ PVFS_SYS_IO = 6,
+ PVFS_SYS_FLUSH = 7,
+ PVFS_SYS_TRUNCATE = 8,
+ PVFS_SYS_READDIR = 9,
+ PVFS_SYS_SETATTR = 10,
+ PVFS_SYS_LOOKUP = 11,
+ PVFS_SYS_RENAME = 12,
+ PVFS_MGMT_SETPARAM_LIST = 70,
+ PVFS_MGMT_NOOP = 71,
+ PVFS_MGMT_STATFS_LIST = 72,
+ PVFS_MGMT_PERF_MON_LIST = 73,
PVFS_MGMT_ITERATE_HANDLES_LIST = 74,
- PVFS_MGMT_GET_DFILE_ARRAY = 75,
- PVFS_MGMT_EVENT_MON_LIST = 76,
- PVFS_SERVER_GET_CONFIG = 77,
- PVFS_CLIENT_JOB_TIMER = 200
+ PVFS_MGMT_GET_DFILE_ARRAY = 75,
+ PVFS_MGMT_EVENT_MON_LIST = 76,
+ PVFS_SERVER_GET_CONFIG = 77,
+ PVFS_CLIENT_JOB_TIMER = 200,
+ PVFS_DEV_UNEXPECTED = 300
};
/* prototypes of helper functions */
@@ -506,13 +586,7 @@ int PINT_serv_msgpairarray_resolve_addrs
int PINT_client_bmi_cancel(job_id_t id);
-/* INCLUDE STATE-MACHINE.H DOWN HERE */
-#define PINT_OP_STATE PINT_client_sm
-#if 0
-#define PINT_OP_STATE_TABLE PINT_server_op_table
-#endif
-
-#include "state-machine.h"
+int PINT_client_io_cancel(job_id_t id);
/* internal non-blocking helper methods */
int PINT_client_wait_internal(
@@ -533,21 +607,31 @@ PINT_client_wait_internal(op_id, in_op_s
#define PINT_mgmt_release(op_id) PINT_sys_release(op_id)
-#define PINT_init_sysint_credentials(sm_p_cred_p, user_cred_p) \
-do { \
- sm_p_cred_p = PVFS_util_dup_credentials(user_cred_p); \
- if (!sm_p_cred_p) \
- { \
- gossip_lerr("Failed to copy user credentials\n"); \
- free(sm_p); \
- return -PVFS_ENOMEM; \
- } \
+#define PINT_init_sysint_credentials(sm_p_cred_p, user_cred_p)\
+do { \
+ sm_p_cred_p = PVFS_util_dup_credentials(user_cred_p); \
+ if (!sm_p_cred_p) \
+ { \
+ gossip_lerr("Failed to copy user credentials\n"); \
+ free(sm_p); \
+ return -PVFS_ENOMEM; \
+ } \
} while(0)
/* misc helper methods */
struct server_configuration_s *PINT_get_server_config_struct(
PVFS_fs_id fs_id);
+/************************************
+ * state-machine.h included here
+ ************************************/
+#define PINT_OP_STATE PINT_client_sm
+#if 0
+#define PINT_OP_STATE_TABLE PINT_server_op_table
+#endif
+
+#include "state-machine.h"
+
/* system interface function state machines */
extern struct PINT_state_machine_s pvfs2_client_remove_sm;
extern struct PINT_state_machine_s pvfs2_client_create_sm;
@@ -576,6 +660,7 @@ extern struct PINT_state_machine_s pvfs2
extern struct PINT_state_machine_s pvfs2_client_getattr_acache_sm;
extern struct PINT_state_machine_s pvfs2_client_lookup_ncache_sm;
extern struct PINT_state_machine_s pvfs2_client_remove_helper_sm;
+#endif
/*
* Local variables:
@@ -585,5 +670,3 @@ extern struct PINT_state_machine_s pvfs2
*
* vim: ts=8 sts=4 sw=4 noexpandtab
*/
-
-#endif
Index: finalize.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/finalize.c,v
diff -p -u -r1.27 -r1.28
--- finalize.c 17 May 2004 19:48:26 -0000 1.27
+++ finalize.c 8 Jul 2004 16:17:06 -0000 1.28
@@ -27,6 +27,8 @@ extern job_context_id PVFS_sys_job_conte
extern gen_mutex_t *g_session_tag_mt_lock;
extern gen_mutex_t *g_server_config_mutex;
+extern PINT_client_sm *g_sm_p;
+
/* PVFS_finalize
*
* shuts down the PVFS system interface
@@ -65,6 +67,8 @@ int PVFS_sys_finalize()
PINT_release_pvfstab();
gossip_disable();
+
+ free(g_sm_p);
return 0;
}
Index: fs-add.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/fs-add.c,v
diff -p -u -r1.15 -r1.16
--- fs-add.c 17 May 2004 21:06:51 -0000 1.15
+++ fs-add.c 8 Jul 2004 16:17:06 -0000 1.16
@@ -49,10 +49,12 @@ int PVFS_sys_fs_add(struct PVFS_sys_mnte
return -PVFS_EEXIST;
}
- /* first make sure BMI knows how to handle this method, else fail quietly */
+ /* make sure BMI knows how to handle this method, else fail quietly */
ret = BMI_addr_lookup(&test_addr, mntent->pvfs_config_server);
if (ret == bmi_errno_to_pvfs(-ENOPROTOOPT))
+ {
goto error_exit;
+ }
new_server_config = (struct server_configuration_s *)malloc(
sizeof(struct server_configuration_s));
Index: getattr-acache.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/getattr-acache.sm,v
diff -p -u -r1.5 -r1.6
--- getattr-acache.sm 28 Apr 2004 16:32:41 -0000 1.5
+++ getattr-acache.sm 8 Jul 2004 16:17:06 -0000 1.6
@@ -181,15 +181,15 @@ static int getattr_acache_lookup(PINT_cl
fake_resp.status = 0;
/*
- skip the deep fill (i.e. copy) of attributes
- on cache hit. by setting the acache_hit flag,
- caller state machine won't try to read those
- attributes out of this faked response in their
- getattr completion functions
+ skip the deep fill (i.e. copy) of attributes on cache
+ hit. by setting the acache_hit flag, caller state
+ machine won't try to read those attributes out of this
+ faked response in their getattr completion functions.
+ also, the caller MUST call PINT_acache_release or
+ PINT_acache_release_refn on acache hit.
*/
sm_p->acache_hit = 1;
sm_p->msgpair.comp_fn(sm_p, &fake_resp, 0);
- PINT_acache_release(sm_p->pinode);
return 1;
}
cache_miss:
Index: initialize.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/initialize.c,v
diff -p -u -r1.84 -r1.85
--- initialize.c 20 May 2004 17:27:48 -0000 1.84
+++ initialize.c 8 Jul 2004 16:17:06 -0000 1.85
@@ -29,6 +29,8 @@
job_context_id PVFS_sys_job_context = -1;
+PINT_client_sm *g_sm_p = NULL;
+
extern gen_mutex_t *g_session_tag_mt_lock;
typedef enum
@@ -72,6 +74,9 @@ int PVFS_sys_initialize(int default_debu
{
return(-PVFS_ENOMEM);
}
+
+ /* keep track of this pointer for freeing on finalize */
+ g_sm_p = sm_p;
memset(sm_p, 0, sizeof(*sm_p));
gossip_enable_stderr();
@@ -167,7 +172,7 @@ int PVFS_sys_initialize(int default_debu
gossip_lerr("Error initializing attribute cache\n");
goto error_exit;
}
- PINT_acache_set_timeout(PINT_ACACHE_TIMEOUT * 1000);
+ PINT_acache_set_timeout(PINT_ACACHE_TIMEOUT_MS);
client_status_flag |= CLIENT_ACACHE_INIT;
/* initialize the name lookup cache and set the default timeout */
@@ -177,7 +182,7 @@ int PVFS_sys_initialize(int default_debu
gossip_lerr("Error initializing name lookup cache\n");
goto error_exit;
}
- PINT_ncache_set_timeout(PINT_NCACHE_TIMEOUT * 1000);
+ PINT_ncache_set_timeout(PINT_NCACHE_TIMEOUT_MS);
client_status_flag |= CLIENT_NCACHE_INIT;
/* initialize the server configuration manager */
Index: msgpairarray.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/msgpairarray.sm,v
diff -p -u -r1.43 -r1.44
--- msgpairarray.sm 20 May 2004 17:27:49 -0000 1.43
+++ msgpairarray.sm 8 Jul 2004 16:17:06 -0000 1.44
@@ -41,6 +41,9 @@ static int msgpairarray_complete(
static int msgpairarray_completion_fn(
PINT_client_sm *sm_p, job_status_s *js_p);
+static int count_incomplete_msgs(
+ PINT_client_sm_msgpair_state *msgarray, int array_count);
+
%%
nested machine pvfs2_client_msgpairarray_sm(init,
@@ -92,8 +95,8 @@ static int msgpairarray_init(PINT_client
PINT_client_sm_msgpair_state *msg_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG,
- "msgpairarray state: init (%d message(s))\n",
- sm_p->msgarray_count);
+ "(%p) msgpairarray state: init (%d msgpair(s))\n",
+ sm_p, sm_p->msgarray_count);
assert(sm_p->msgarray_count > 0);
@@ -104,15 +107,19 @@ static int msgpairarray_init(PINT_client
comp_ct in the first msgarray entry to keep up with the count
for the entire array.
*/
- sm_p->msgarray[0].comp_ct = 2 * sm_p->msgarray_count;
+ sm_p->msgarray[0].comp_ct = (2 * sm_p->msgarray_count);
for(i = 0; i < sm_p->msgarray_count; i++)
{
msg_p = &sm_p->msgarray[i];
assert(msg_p);
+ assert((msg_p->retry_flag == PVFS_MSGPAIR_RETRY) ||
+ (msg_p->retry_flag == PVFS_MSGPAIR_NO_RETRY));
+
msg_p->encoded_resp_p = NULL;
msg_p->retry_count = 0;
+ msg_p->complete = 0;
}
return 1;
}
@@ -143,22 +150,31 @@ static int msgpairarray_post(PINT_client
PVFS_msg_tag_t session_tag;
PINT_client_sm_msgpair_state *msg_p = NULL;
struct filesystem_configuration_s *cur_fs = NULL;
+ int num_incomplete_msgpairs = (sm_p->msgarray[0].comp_ct / 2);
- gossip_debug(GOSSIP_CLIENT_DEBUG,
- "msgpairarray state: post (%d message(s))\n",
- sm_p->msgarray_count);
-
- assert(sm_p->msgarray_count > 0);
+ gossip_debug(
+ GOSSIP_CLIENT_DEBUG, "(%p) msgpairarray state: post "
+ "(%d total message(s) with %d incomplete)\n", sm_p,
+ (sm_p->msgarray_count * 2), (num_incomplete_msgpairs * 2));
js_p->error_code = 0;
- for (i = 0; i < sm_p->msgarray_count; i++)
+ assert(sm_p->msgarray_count > 0);
+ assert(num_incomplete_msgpairs && sm_p->msgarray[0].comp_ct);
+
+ for (i = 0; i < num_incomplete_msgpairs; i++)
{
msg_p = &sm_p->msgarray[i];
assert(msg_p);
- assert((msg_p->retry_flag == PVFS_MSGPAIR_RETRY) ||
- (msg_p->retry_flag == PVFS_MSGPAIR_NO_RETRY));
+ /*
+ skip msgpairs that have already completed; this happens on the
+ retry code path, where only incomplete msgpairs are reposted
+ */
+ if (msg_p->complete)
+ {
+ continue;
+ }
msg_p->op_status = 0;
@@ -189,7 +205,7 @@ static int msgpairarray_post(PINT_client
return 1;
}
- /* calculate maximum response message size and allocate space */
+ /* calculate max response msg size and allocate space */
msg_p->max_resp_sz = PINT_encode_calc_max_size(
PINT_ENCODE_RESP, msg_p->req.op, enc_type);
@@ -205,6 +221,10 @@ static int msgpairarray_post(PINT_client
session_tag = get_next_session_tag();
+ gossip_debug(GOSSIP_CLIENT_DEBUG,
+ "(%p) msgpair %d (%p): posting recv\n",
+ sm_p, (i + 1), msg_p);
+
/* post receive of response; job_id stored in recv_id */
ret = job_bmi_recv(msg_p->svr_addr,
msg_p->encoded_resp_p,
@@ -228,7 +248,7 @@ static int msgpairarray_post(PINT_client
&msg_p->recv_status, 0, pint_client_sm_context);
}
- if (ret < 0 || ret == 1)
+ if ((ret < 0) || (ret == 1))
{
/* it is impossible for this recv to complete at this point
* without errors; we haven't sent the request yet!
@@ -260,6 +280,10 @@ static int msgpairarray_post(PINT_client
*/
assert(ret == 0);
+ gossip_debug(GOSSIP_CLIENT_DEBUG,
+ "(%p) msgpair %d (%p): posting send\n",
+ sm_p, (i + 1), msg_p);
+
/* post send of request; job_id stored in send_id */
ret = job_bmi_send_list(msg_p->encoded_req.dest,
msg_p->encoded_req.buffer_list,
@@ -275,13 +299,19 @@ static int msgpairarray_post(PINT_client
&msg_p->send_id,
pint_client_sm_context,
PVFS2_CLIENT_JOB_TIMEOUT);
- if (ret < 0 || (ret == 1 && msg_p->send_status.error_code != 0))
+
+ if ((ret < 0) ||
+ ((ret == 1) && (msg_p->send_status.error_code != 0)))
{
- if(ret < 0)
+ if (ret < 0)
+ {
PVFS_perror_gossip("Post of send failed", ret);
+ }
else
+ {
PVFS_perror_gossip("Send immediately failed",
msg_p->recv_status.error_code);
+ }
gossip_err("Send error: cancelling recv.\n");
@@ -294,7 +324,7 @@ static int msgpairarray_post(PINT_client
msg_p->send_id = 0;
sm_p->msgarray[0].comp_ct--;
}
- else if(ret == 1)
+ else if (ret == 1)
{
/* immediate completion */
msg_p->send_id = 0;
@@ -305,13 +335,10 @@ static int msgpairarray_post(PINT_client
*/
sm_p->msgarray[0].comp_ct--;
}
- else
- {
- /* successful post, no immediate completion */
- }
+ /* else: successful post, no immediate completion */
}
- if(sm_p->msgarray[0].comp_ct == 0)
+ if (sm_p->msgarray[0].comp_ct == 0)
{
/* everything is completed already (could happen in some failure
* cases); jump straight to final completion function.
@@ -319,14 +346,11 @@ static int msgpairarray_post(PINT_client
js_p->error_code = MSGPAIRS_COMPLETE;
return 1;
}
- else
- {
- /* we are still waiting on operations to complete, next state
- * transition will handle them
- */
- return 0;
- }
+ /* we are still waiting on operations to complete, next state
+ * transition will handle them
+ */
+ return 0;
}
static int msgpairarray_post_retry(PINT_client_sm *sm_p,
@@ -335,7 +359,7 @@ static int msgpairarray_post_retry(PINT_
job_id_t tmp_id;
gossip_debug(GOSSIP_CLIENT_DEBUG,
- "msgpairarray state: post_retry\n");
+ "(%p) msgpairarray state: post_retry\n", sm_p);
return job_req_sched_post_timer(
PVFS2_CLIENT_RETRY_DELAY, sm_p, 0, js_p, &tmp_id,
@@ -345,7 +369,8 @@ static int msgpairarray_post_retry(PINT_
static int msgpairarray_complete(PINT_client_sm *sm_p,
job_status_s *js_p)
{
- gossip_debug(GOSSIP_CLIENT_DEBUG, "msgpairarray state: complete\n");
+ gossip_debug(GOSSIP_CLIENT_DEBUG,
+ "(%p) msgpairarray state: complete\n", sm_p);
/* match operation with something in the msgpair array */
/* the first N tags are receives, the second N are sends */
@@ -410,13 +435,13 @@ static int msgpairarray_completion_fn(PI
job_status_s *js_p)
{
int ret = -PVFS_EINVAL, i = 0;
- struct PINT_decoded_msg decoded_resp; /* data about decoded resp */
+ struct PINT_decoded_msg decoded_resp;
struct PVFS_server_resp *resp_p; /* response structure (decoded) */
js_p->error_code = 0;
- gossip_debug(GOSSIP_CLIENT_DEBUG, "msgpairarray state: "
- "completion_fn\n");
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) msgpairarray state: "
+ "completion_fn\n", sm_p);
for (i = 0; i < sm_p->msgarray_count; i++)
{
@@ -432,12 +457,23 @@ static int msgpairarray_completion_fn(PI
{
msg_p->retry_count++;
- gossip_debug(GOSSIP_CLIENT_DEBUG, "*** msgarray send "
- "failed -- retrying msgpair "
- "(count = %d)\n", msg_p->retry_count);
-
- /* FIXME: only redo the failed ones, not all */
- sm_p->msgarray[0].comp_ct = 2 * sm_p->msgarray_count;
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "*** msgpairarray send "
+ "failed -- retrying msgpair (count = %d)\n",
+ msg_p->retry_count);
+
+ /*
+ NOTE: we only retry msgpairs that haven't yet been
+ completed
+ */
+ sm_p->msgarray[0].comp_ct = 2 * count_incomplete_msgs(
+ sm_p->msgarray, sm_p->msgarray_count);
+
+ assert(sm_p->msgarray[0].comp_ct > 0);
+
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "*** msgpairarray is "
+ "retrying %d of %d total messages\n",
+ sm_p->msgarray[0].comp_ct,
+ (2 * sm_p->msgarray_count));
js_p->error_code = MSGPAIRS_RETRY;
return 1;
@@ -469,7 +505,7 @@ static int msgpairarray_completion_fn(PI
* meaningful, so we save it.
*/
msg_p->op_status = resp_p->status;
-
+
/* NOTE: we call the function associated with each message,
* not just the one from the first array element. so
* there could in theory be different functions for each
@@ -512,19 +548,46 @@ static int msgpairarray_completion_fn(PI
}
/* free all the resources that we used to send and receive. */
- ret = PINT_serv_free_msgpair_resources(&msg_p->encoded_req,
- msg_p->encoded_resp_p,
- &decoded_resp,
- &msg_p->svr_addr,
- msg_p->max_resp_sz);
- if (ret != 0)
+ ret = PINT_serv_free_msgpair_resources(
+ &msg_p->encoded_req, msg_p->encoded_resp_p, &decoded_resp,
+ &msg_p->svr_addr, msg_p->max_resp_sz);
+ if (ret)
{
PVFS_perror_gossip("Failed to free msgpair resources", ret);
js_p->error_code = ret;
return 1;
}
+
+ msg_p->encoded_resp_p = NULL;
+ msg_p->max_resp_sz = 0;
+
+ /*
+ mark that this msgpair has been completed and should not be
+ retried in the case of possible future retries
+ */
+ msg_p->complete = 1;
+
+ gossip_debug(GOSSIP_CLIENT_DEBUG,
+ "(%p) msgpair %d (%p): marked complete\n",
+ sm_p, (i + 1), msg_p);
}
return 1;
+}
+
+static int count_incomplete_msgs(
+ PINT_client_sm_msgpair_state *msgarray, int array_count)
+{
+ int count = 0, i = 0;
+ PINT_client_sm_msgpair_state *msg = NULL;
+
+ for(i = 0; i < array_count; i++)
+ {
+ msg = &msgarray[i];
+ assert(msg);
+
+ count += (msg->complete ? 0 : 1);
+ }
+ return count;
}
/*
Index: ncache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/ncache.c,v
diff -p -u -r1.3 -r1.4
--- ncache.c 24 Mar 2004 23:10:30 -0000 1.3
+++ ncache.c 8 Jul 2004 16:17:06 -0000 1.4
@@ -58,7 +58,7 @@ static int ncache_add_dentry(
/* static globals required for ncache operation */
static ncache *cache = NULL;
-static int s_pint_ncache_timeout_ms = (PINT_NCACHE_TIMEOUT * 1000);
+static int s_pint_ncache_timeout_ms = PINT_NCACHE_TIMEOUT_MS;
/* compare
Index: ncache.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/ncache.h,v
diff -p -u -r1.2 -r1.3
--- ncache.h 24 Mar 2004 23:10:30 -0000 1.2
+++ ncache.h 8 Jul 2004 16:17:06 -0000 1.3
@@ -13,8 +13,8 @@
/* number of entries allowed in the cache */
#define PINT_NCACHE_MAX_ENTRIES 512
-/* number of seconds that cache entries will remain valid */
-#define PINT_NCACHE_TIMEOUT 5
+/* number of milliseconds that cache entries will remain valid */
+#define PINT_NCACHE_TIMEOUT_MS 5000
/* TODO: replace later with real value from trove */
/* value passed out to indicate lookups that didn't find a match */
Index: pint-bucket.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/pint-bucket.c,v
diff -p -u -r1.58 -r1.59
--- pint-bucket.c 1 Jun 2004 19:49:24 -0000 1.58
+++ pint-bucket.c 8 Jul 2004 16:17:06 -0000 1.59
@@ -8,6 +8,7 @@
#include <string.h>
#include <assert.h>
#include <stdlib.h>
+#include <time.h>
#include "pvfs2-types.h"
#include "pvfs2-attr.h"
Index: shared-state-methods.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/shared-state-methods.c,v
diff -p -u -r1.17 -r1.18
--- shared-state-methods.c 4 May 2004 14:42:58 -0000 1.17
+++ shared-state-methods.c 8 Jul 2004 16:17:06 -0000 1.18
@@ -31,33 +31,32 @@ int PINT_sm_common_parent_getattr_setup_
memset(&sm_p->msgpair, 0, sizeof(PINT_client_sm_msgpair_state));
- assert(sm_p->parent_ref.fs_id != 0);
- assert(sm_p->parent_ref.handle != 0);
+ sm_p->msgarray = &(sm_p->msgpair);
+ sm_p->msgarray_count = 1;
- PINT_SERVREQ_GETATTR_FILL(sm_p->msgpair.req,
- *sm_p->cred_p,
- sm_p->parent_ref.fs_id,
- sm_p->parent_ref.handle,
- PVFS_ATTR_COMMON_ALL);
+ assert(sm_p->parent_ref.fs_id != PVFS_FS_ID_NULL);
+ assert(sm_p->parent_ref.handle != PVFS_HANDLE_NULL);
+
+ PINT_SERVREQ_GETATTR_FILL(
+ sm_p->msgpair.req,
+ *sm_p->cred_p,
+ sm_p->parent_ref.fs_id,
+ sm_p->parent_ref.handle,
+ PVFS_ATTR_COMMON_ALL);
- /* fill in msgpair structure components */
sm_p->msgpair.fs_id = sm_p->parent_ref.fs_id;
sm_p->msgpair.handle = sm_p->parent_ref.handle;
sm_p->msgpair.retry_flag = PVFS_MSGPAIR_RETRY;
- sm_p->msgpair.comp_fn = PINT_sm_common_directory_getattr_comp_fn;
+ sm_p->msgpair.comp_fn = PINT_sm_common_object_getattr_comp_fn;
ret = PINT_bucket_map_to_server(&sm_p->msgpair.svr_addr,
sm_p->msgpair.handle,
sm_p->msgpair.fs_id);
if (ret)
{
- gossip_err("Failed to map meta server address\n");
+ PVFS_perror_gossip("Failed to map meta server address", ret);
js_p->error_code = ret;
}
-
- sm_p->msgarray = &(sm_p->msgpair);
- sm_p->msgarray_count = 1;
-
return 1;
}
@@ -81,8 +80,11 @@ int PINT_sm_common_object_getattr_setup_
memset(&sm_p->msgpair, 0, sizeof(PINT_client_sm_msgpair_state));
- assert(sm_p->object_ref.fs_id != 0);
- assert(sm_p->object_ref.handle != 0);
+ sm_p->msgarray = &(sm_p->msgpair);
+ sm_p->msgarray_count = 1;
+
+ assert(sm_p->object_ref.fs_id != PVFS_FS_ID_NULL);
+ assert(sm_p->object_ref.handle != PVFS_HANDLE_NULL);
PINT_SERVREQ_GETATTR_FILL(
sm_p->msgpair.req,
@@ -91,7 +93,6 @@ int PINT_sm_common_object_getattr_setup_
sm_p->object_ref.handle,
(PVFS_ATTR_COMMON_ALL | PVFS_ATTR_META_ALL));
- /* fill in msgpair structure components */
sm_p->msgpair.fs_id = sm_p->object_ref.fs_id;
sm_p->msgpair.handle = sm_p->object_ref.handle;
sm_p->msgpair.retry_flag = PVFS_MSGPAIR_RETRY;
@@ -102,13 +103,9 @@ int PINT_sm_common_object_getattr_setup_
sm_p->msgpair.fs_id);
if (ret)
{
- gossip_err("Failed to map meta server address\n");
+ PVFS_perror_gossip("Failed to map meta server address", ret);
js_p->error_code = ret;
}
-
- sm_p->msgarray = &(sm_p->msgpair);
- sm_p->msgarray_count = 1;
-
return 1;
}
@@ -120,121 +117,25 @@ int PINT_sm_common_object_getattr_failur
return 1;
}
-int PINT_sm_common_directory_getattr_comp_fn(
+int PINT_sm_common_object_getattr_comp_fn(
void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
- int ret = 0;
- PVFS_object_attr *attr = NULL;
PINT_client_sm *sm_p = (PINT_client_sm *) v_p;
- assert(resp_p->op == PVFS_SERV_GETATTR);
-
- assert(sm_p->msgarray == &sm_p->msgpair);
- sm_p->msgarray = NULL;
- sm_p->msgarray_count = 0;
-
gossip_debug(GOSSIP_CLIENT_DEBUG,
- "PINT_sm_common_getattr_directory_comp_fn\n");
-
- if (resp_p->status != 0)
- {
- gossip_err("Error: getattr failure\n");
- return resp_p->status;
- }
-
- /*
- if we didn't get a cache hit, we're making a copy of the
- attributes here so that we can add a acache entry later in
- cleanup.
- */
- if (!sm_p->acache_hit)
- {
- PINT_acache_object_attr_deep_copy(
- &sm_p->acache_attr, &resp_p->u.getattr.attr);
- }
-
- /*
- if we got a cache hit, use those attributes, otherwise use the
- real server replied attrs
- */
- attr = (sm_p->acache_hit ?
- &sm_p->pinode->attr :
- &resp_p->u.getattr.attr);
- assert(attr);
-
- if (attr->objtype == PVFS_TYPE_DIRECTORY)
- {
- /*
- check permissions against parent directory to determine
- if we're allowed to create a new entry there
- */
- ret = PINT_check_perms(*attr, attr->perms,
- sm_p->cred_p->uid, sm_p->cred_p->gid);
- if (ret < 0)
- {
- gossip_err("Error: Permission failure\n");
- return -PVFS_EPERM;
- }
- }
- else
- {
- gossip_err("Error: Parent is not a directory\n");
- return -PVFS_ENOTDIR;
- }
-
- /*
- if our parent directory attributes are good, and not present
- int the acache, put them in the acache now
- */
- if (!sm_p->acache_hit)
- {
- int release_required = 1;
- PINT_pinode *pinode =
- PINT_acache_lookup(sm_p->u.getattr.object_ref);
- if (!pinode)
- {
- pinode = PINT_acache_pinode_alloc();
- assert(pinode);
- release_required = 0;
- }
- pinode->refn = sm_p->u.getattr.object_ref;
- pinode->size = ((attr->mask & PVFS_ATTR_DATA_ALL) ?
- attr->u.data.size : 0);
-
- PINT_acache_object_attr_deep_copy(
- &pinode->attr, attr);
-
- PINT_acache_set_valid(pinode);
-
- if (release_required)
- {
- PINT_acache_release(pinode);
- }
- }
- return 0;
-}
+ "PINT_sm_common_getattr_object_comp_fn\n");
-int PINT_sm_common_object_getattr_comp_fn(
- void *v_p,
- struct PVFS_server_resp *resp_p,
- int index)
-{
- PINT_client_sm *sm_p = (PINT_client_sm *) v_p;
-
assert(resp_p->op == PVFS_SERV_GETATTR);
-
assert(sm_p->msgarray == &sm_p->msgpair);
+
sm_p->msgarray = NULL;
sm_p->msgarray_count = 0;
- gossip_debug(GOSSIP_CLIENT_DEBUG,
- "PINT_sm_common_getattr_object_comp_fn\n");
-
- if (resp_p->status != 0)
+ if (resp_p->status)
{
- gossip_err("Error: getattr failure\n");
+ PVFS_perror_gossip("Getattr failed", resp_p->status);
return resp_p->status;
}
Index: shared-state-methods.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/shared-state-methods.h,v
diff -p -u -r1.3 -r1.4
--- shared-state-methods.h 4 Nov 2003 15:48:36 -0000 1.3
+++ shared-state-methods.h 8 Jul 2004 16:17:06 -0000 1.4
@@ -8,35 +8,22 @@
#define __SHARED_STATE_METHODS_H
/*
- this file is for storing common methods that are shared
- between the following client state machines implementations:
-
- sys-create.sm
- sys-mkdir.sm
- sys-symlink.sm
+ this file is for storing common methods that are shared between
+ client state machines (such as create, mkdir, symlink)
*/
-/*
- shared/common state operation functions
-*/
-int PINT_sm_common_parent_getattr_setup_msgpair(PINT_client_sm *sm_p,
- job_status_s *js_p);
-int PINT_sm_common_parent_getattr_failure(PINT_client_sm *sm_p,
- job_status_s *js_p);
-int PINT_sm_common_object_getattr_setup_msgpair(PINT_client_sm *sm_p,
- job_status_s *js_p);
-int PINT_sm_common_object_getattr_failure(PINT_client_sm *sm_p,
- job_status_s *js_p);
+/* shared/common state operation functions */
+int PINT_sm_common_parent_getattr_setup_msgpair(
+ PINT_client_sm *sm_p, job_status_s *js_p);
+int PINT_sm_common_parent_getattr_failure(
+ PINT_client_sm *sm_p, job_status_s *js_p);
+int PINT_sm_common_object_getattr_setup_msgpair(
+ PINT_client_sm *sm_p, job_status_s *js_p);
+int PINT_sm_common_object_getattr_failure(
+ PINT_client_sm *sm_p, job_status_s *js_p);
-/*
- shared/common msgpair completion functions
-*/
-int PINT_sm_common_directory_getattr_comp_fn(void *v_p,
- struct PVFS_server_resp *resp_p,
- int index);
-int PINT_sm_common_object_getattr_comp_fn(void *v_p,
- struct PVFS_server_resp *resp_p,
- int index);
+int PINT_sm_common_object_getattr_comp_fn(
+ void *v_p, struct PVFS_server_resp *resp_p, int index);
#endif /* __SHARED_STATE_METHODS_H */
Index: sys-create.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-create.sm,v
diff -p -u -r1.45 -r1.46
--- sys-create.sm 27 May 2004 22:11:47 -0000 1.45
+++ sys-create.sm 8 Jul 2004 16:17:06 -0000 1.46
@@ -406,13 +406,11 @@ static int create_datafiles_comp_fn(void
}
/* allocate memory for the data handles if we haven't already */
- if (index == 0)
+ if (sm_p->u.create.datafile_handles == NULL)
{
- if (sm_p->u.create.datafile_handles == NULL)
- {
- sm_p->u.create.datafile_handles = (PVFS_handle *)malloc(
- sm_p->u.create.num_data_files * sizeof(PVFS_handle));
- }
+ sm_p->u.create.datafile_handles = (PVFS_handle *)malloc(
+ sm_p->u.create.num_data_files * sizeof(PVFS_handle));
+
if (sm_p->u.create.datafile_handles == NULL)
{
gossip_err("create: Failed to allocate data handle array\n");
@@ -421,8 +419,6 @@ static int create_datafiles_comp_fn(void
memset(sm_p->u.create.datafile_handles, 0,
sm_p->u.create.num_data_files * sizeof(PVFS_handle));
}
-
- assert(sm_p->u.create.datafile_handles);
/* otherwise, just stash the newly created data file handle */
sm_p->u.create.datafile_handles[index] = resp_p->u.create.handle;
Index: sys-getattr.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-getattr.sm,v
diff -p -u -r1.57 -r1.58
--- sys-getattr.sm 21 May 2004 17:22:38 -0000 1.57
+++ sys-getattr.sm 8 Jul 2004 16:17:06 -0000 1.58
@@ -3,31 +3,6 @@
*
* See COPYING in top-level directory.
*/
-
-/* pvfs2_client_getattr_sm
- *
- * This state machine implements the getattr system interface function.
- *
- * The input parameters are held in sm_p->u.getattr.
- *
- * The sm_p->msgpair structure is used to get the attributes of the
- * object itself. We convert the original attribute mask (in
- * sm_p->u.getattr.attrmask) to ask for datafile and distribution info
- * if the user asked for file size (PVFS_ATTR_SYS_SIZE). This allows us
- * to obtain this information (if the object turns out to be a metafile)
- * so that we can later look up the datafile sizes and calculate the overall
- * file size.
- *
- * The datafile handles and distribution information are stored in
- * sm_p->u.getattr also, if the object turns out to be a metafile and the
- * caller asked for the size. These need to be freed once we are done with
- * them.
- *
- * The sm_p->msgpairarray is used to get datafile sizes, if it turns out that
- * we need them. This space will also need to be freed, if we grab these
- * sizes.
- */
-
#include <string.h>
#include <assert.h>
@@ -43,6 +18,21 @@
#include "pint-bucket.h"
#include "PINT-reqproto-encode.h"
+/* pvfs2_client_getattr_sm
+ *
+ * The sm_p->msgpair structure is used to get the attributes of the
+ * object itself. We convert the original attribute mask (in
+ * sm_p->u.getattr.attrmask) to ask for datafile and distribution info
+ * if the user asked for file size (PVFS_ATTR_SYS_SIZE). This allows
+ * us to obtain this information (if the object turns out to be a
+ * metafile) so that we can later look up the datafile sizes and
+ * calculate the overall file size.
+ *
+ * The sm_p->msgpairarray is used to get datafile sizes, if it turns
+ * out that we need them. This space will also need to be freed, if
+ * we grab these sizes.
+ */
+
extern job_context_id pint_client_sm_context;
enum
@@ -71,59 +61,60 @@ static int getattr_datafile_getattr_comp
%%
-machine pvfs2_client_getattr_sm(object_getattr_setup_msgpair,
- object_getattr_xfer_msgpair,
- object_getattr_failure,
- datafile_getattr_setup_msgpairarray,
- datafile_getattr_xfer_msgpairarray,
- datafile_getattr_failure,
- cleanup)
+machine pvfs2_client_getattr_sm(
+ object_getattr_setup_msgpair,
+ object_getattr_xfer_msgpair,
+ object_getattr_failure,
+ datafile_getattr_setup_msgpairarray,
+ datafile_getattr_xfer_msgpairarray,
+ datafile_getattr_failure,
+ cleanup)
{
state object_getattr_setup_msgpair
{
- run getattr_object_getattr_setup_msgpair;
- success => object_getattr_xfer_msgpair;
- default => cleanup;
+ run getattr_object_getattr_setup_msgpair;
+ success => object_getattr_xfer_msgpair;
+ default => cleanup;
}
state object_getattr_xfer_msgpair
{
- jump pvfs2_client_getattr_acache_sm;
- success => cleanup;
- GETATTR_NEED_DATAFILE_SIZES => datafile_getattr_setup_msgpairarray;
- default => object_getattr_failure;
+ jump pvfs2_client_getattr_acache_sm;
+ success => cleanup;
+ GETATTR_NEED_DATAFILE_SIZES => datafile_getattr_setup_msgpairarray;
+ default => object_getattr_failure;
}
state object_getattr_failure
{
- run getattr_object_getattr_failure;
- default => cleanup;
+ run getattr_object_getattr_failure;
+ default => cleanup;
}
state datafile_getattr_setup_msgpairarray
{
- run getattr_datafile_getattr_setup_msgpairarray;
- success => datafile_getattr_xfer_msgpairarray;
- default => cleanup;
+ run getattr_datafile_getattr_setup_msgpairarray;
+ success => datafile_getattr_xfer_msgpairarray;
+ default => cleanup;
}
state datafile_getattr_xfer_msgpairarray
{
- jump pvfs2_client_msgpairarray_sm;
- success => cleanup;
- default => datafile_getattr_failure;
+ jump pvfs2_client_msgpairarray_sm;
+ success => cleanup;
+ default => datafile_getattr_failure;
}
state datafile_getattr_failure
{
- run getattr_datafile_getattr_failure;
- default => cleanup;
+ run getattr_datafile_getattr_failure;
+ default => cleanup;
}
state cleanup
{
- run getattr_cleanup;
- default => terminate;
+ run getattr_cleanup;
+ default => terminate;
}
}
@@ -207,13 +198,8 @@ int PVFS_sys_getattr(
/*******************************************************************/
-/* getattr_object_getattr_setup_msgpair()
- *
- * Fills in sm_p->msgpair to perform a getattr using the getattr_acache
- * state machine.
- */
static int getattr_object_getattr_setup_msgpair(PINT_client_sm *sm_p,
- job_status_s *js_p)
+ job_status_s *js_p)
{
int ret = -PVFS_EINVAL;
uint32_t attrmask;
@@ -231,19 +217,19 @@ static int getattr_object_getattr_setup_
attrmask = sm_p->u.getattr.attrmask;
if (attrmask & PVFS_ATTR_SYS_SIZE)
{
- /* need datafile handles and distribution in order to get
- * datafile handles and know what function to call to get
- * the file size.
- */
- attrmask &= ~PVFS_ATTR_SYS_SIZE;
+ /* need datafile handles and distribution in order to get
+ * datafile handles and know what function to call to get
+ * the file size.
+ */
+ attrmask &= ~PVFS_ATTR_SYS_SIZE;
- attrmask |= (PVFS_ATTR_META_DFILES | PVFS_ATTR_META_DIST);
+ attrmask |= (PVFS_ATTR_META_DFILES | PVFS_ATTR_META_DIST);
attrmask |= PVFS_ATTR_DATA_SIZE;
}
if (attrmask & PVFS_ATTR_SYS_DFILE_COUNT)
{
- attrmask |= PVFS_ATTR_META_DFILES;
+ attrmask |= PVFS_ATTR_META_DFILES;
}
if (attrmask & PVFS_ATTR_SYS_LNK_TARGET)
@@ -256,13 +242,13 @@ static int getattr_object_getattr_setup_
"attrmask being passed to server: ");
PINT_attrmask_print(GOSSIP_GETATTR_DEBUG, attrmask);
- PINT_SERVREQ_GETATTR_FILL(sm_p->msgpair.req,
- *sm_p->cred_p,
- sm_p->u.getattr.object_ref.fs_id,
- sm_p->u.getattr.object_ref.handle,
- attrmask);
+ PINT_SERVREQ_GETATTR_FILL(
+ sm_p->msgpair.req,
+ *sm_p->cred_p,
+ sm_p->u.getattr.object_ref.fs_id,
+ sm_p->u.getattr.object_ref.handle,
+ attrmask);
- /* fill in msgpair structure components */
sm_p->msgpair.fs_id = sm_p->u.getattr.object_ref.fs_id;
sm_p->msgpair.handle = sm_p->u.getattr.object_ref.handle;
sm_p->msgpair.retry_flag = PVFS_MSGPAIR_RETRY;
@@ -279,46 +265,31 @@ static int getattr_object_getattr_setup_
return 1;
}
-/* getattr_object_getattr_comp_fn()
- *
- * Called to copy data from getattr response into the
- * getattr-specific portion of the PINT_client_sm structure,
- * so we can use the data after returning to this state
- * machine.
- *
- * Return value is returned in job status, so it affects the
- * resulting state coming back from the nested state machine.
- *
- * Returns 0 for directory, GETATTR_NEED_DATAFILE_SIZES for a
- * metafile (when appropriate). Returns 0 for symlink.
- * Other types die right now.
- */
-static int getattr_object_getattr_comp_fn(void *v_p,
- struct PVFS_server_resp *resp_p,
- int index)
+/*
+ copies data from getattr response into the user supplied sys_attr
+ structure. returns 0 for directories and symlinks, and
+ GETATTR_NEED_DATAFILE_SIZES for a metafile (when appropriate)
+*/
+static int getattr_object_getattr_comp_fn(
+ void *v_p,
+ struct PVFS_server_resp *resp_p,
+ int index)
{
int need_datafiles = 0;
PVFS_object_attr *attr = NULL;
-
- /* this is a little kludge to get around some struct definition
- * issues in the headers. maybe fix later?
- */
PINT_client_sm *sm_p = (PINT_client_sm *) v_p;
- assert(resp_p->op == PVFS_SERV_GETATTR); /* sanity check */
+ assert(resp_p->op == PVFS_SERV_GETATTR);
- /* if we get an error, just return immediately, don't try to
- * actually fill anything in.
- */
if (resp_p->status != 0)
{
- return resp_p->status;
+ return resp_p->status;
}
/*
- if we got a metafile that didn't get a cache hit *and* we
- need the size, that should be only time we're going to have
- to do a full data file fetch. (that's expensive)
+ if we got a metafile that didn't get a cache hit *and* we need
+ the size, that should be only time we're going to have to do a
+ full data file fetch. (that's expensive)
*/
if ((resp_p->u.getattr.attr.objtype == PVFS_TYPE_METAFILE) &&
(!sm_p->acache_hit) &&
@@ -328,9 +299,9 @@ static int getattr_object_getattr_comp_f
}
/*
- if we didn't get a cache hit, we're making a
- copy of the attributes here so that we can add
- a acache entry later in cleanup.
+ if we didn't get a cache hit, we're making a copy of the
+ attributes here so that we can add a acache entry later in
+ cleanup.
*/
if (!sm_p->acache_hit)
{
@@ -338,10 +309,6 @@ static int getattr_object_getattr_comp_f
&sm_p->acache_attr, &resp_p->u.getattr.attr);
}
- /*
- if we got a cache hit, use those attributes,
- otherwise use the real server replied attrs
- */
attr = (sm_p->acache_hit ?
&sm_p->pinode->attr :
&resp_p->u.getattr.attr);
@@ -372,16 +339,11 @@ static int getattr_object_getattr_comp_f
if ((attr->objtype == PVFS_TYPE_METAFILE) &&
(sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_DFILE_COUNT))
{
- sm_p->u.getattr.getattr_resp_p->attr.dfile_count =
- attr->u.meta.dfile_count;
+ sm_p->u.getattr.getattr_resp_p->attr.dfile_count =
+ attr->u.meta.dfile_count;
}
- /* transform the attributes from the response
- *
- * Note: the structure in sm_p is a PVFS_sys_attr, while the data
- * returned is in a PVFS_object_attr. So some translation is
- * necessary.
- */
+ /* copy outgoing sys_attr fields from returned object_attr */
sm_p->u.getattr.getattr_resp_p->attr.owner = attr->owner;
sm_p->u.getattr.getattr_resp_p->attr.group = attr->group;
sm_p->u.getattr.getattr_resp_p->attr.perms = attr->perms;
@@ -400,55 +362,24 @@ static int getattr_object_getattr_comp_f
switch (attr->objtype)
{
- case PVFS_TYPE_METAFILE:
- if (sm_p->msgpair.req.u.getattr.attrmask & PVFS_ATTR_META_DIST)
+ case PVFS_TYPE_METAFILE:
+ if (sm_p->msgpair.req.u.getattr.attrmask &
+ PVFS_ATTR_META_DIST)
{
- /* sanity checks */
- assert(attr->mask & PVFS_ATTR_META_DIST);
- assert(attr->u.meta.dist_size > 0);
-
- gossip_debug(GOSSIP_GETATTR_DEBUG,
- "getattr_object_getattr_comp_fn: "
- "copying %d bytes of dist.\n",
- attr->u.meta.dist_size);
+ assert(attr->mask & PVFS_ATTR_META_DIST);
+ assert(attr->u.meta.dist && (attr->u.meta.dist_size > 0));
+ }
- /* here we make a copy of the distribution information. */
- sm_p->u.getattr.dist_p = PINT_dist_copy(attr->u.meta.dist);
- if (sm_p->u.getattr.dist_p == NULL) {
- assert(0);
- return -PVFS_ENOMEM;
- }
- sm_p->u.getattr.dist_size = attr->u.meta.dist_size;
-
- /* nothing special about our return value here */
- }
- if (sm_p->msgpair.req.u.getattr.attrmask & PVFS_ATTR_META_DFILES)
+ if (sm_p->msgpair.req.u.getattr.attrmask &
+ PVFS_ATTR_META_DFILES)
{
- /* need to save datafile handles to calculate file size;
- * redirect us to those states.
- */
-
- /* sanity checks */
- assert(attr->mask & PVFS_ATTR_META_DFILES);
- assert(attr->u.meta.dfile_count > 0);
-
- gossip_debug(GOSSIP_GETATTR_DEBUG,
- "getattr_object_getattr_comp_fn: %d datafiles.\n",
- attr->u.meta.dfile_count);
-
- /* save the datafile handles prior to freeing up the
- * buffers we used for messages. we could keep them around
- * i suppose, but we're not going to do that for now. later
- * it is likely that this stuff will be stuck in the acache
- * anyway, so we'll be able to just reference it from there.
- */
- sm_p->u.getattr.datafile_handles = (PVFS_handle *)
- malloc(attr->u.meta.dfile_count * sizeof(PVFS_handle));
- assert(sm_p->u.getattr.datafile_handles);
- sm_p->u.getattr.datafile_count = attr->u.meta.dfile_count;
- memcpy(sm_p->u.getattr.datafile_handles,
- attr->u.meta.dfile_array,
- attr->u.meta.dfile_count * sizeof(PVFS_handle));
+ assert(attr->mask & PVFS_ATTR_META_DFILES);
+ assert(attr->u.meta.dfile_array &&
+ (attr->u.meta.dfile_count > 0));
+
+ gossip_debug(GOSSIP_GETATTR_DEBUG,
+ "getattr_object_getattr_comp_fn: "
+ "%d datafiles.\n", attr->u.meta.dfile_count);
if (need_datafiles == 0)
{
@@ -458,39 +389,34 @@ static int getattr_object_getattr_comp_f
*/
return 0;
}
- else if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_SIZE)
+ else if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_SIZE)
{
- /* if caller asked for the size, AND we couldn't
+ /* if caller asked for the size, AND we couldn't
* retrieve them from the acache, then we need to
- * go get the datafile sizes
- */
- return GETATTR_NEED_DATAFILE_SIZES;
- }
- }
- else return 0;
- break;
- case PVFS_TYPE_DIRECTORY:
- return 0;
- break;
- case PVFS_TYPE_SYMLINK:
+ * retrieve the datafile sizes
+ */
+ return GETATTR_NEED_DATAFILE_SIZES;
+ }
+ }
+ return 0;
+ case PVFS_TYPE_DIRECTORY:
+ return 0;
+ case PVFS_TYPE_SYMLINK:
return 0;
- break;
- case PVFS_TYPE_DATAFILE:
- /* fall through */
- case PVFS_TYPE_DIRDATA:
- /* fall through */
- default:
- gossip_err("error: getattr_object_getattr_comp_fn: "
+ case PVFS_TYPE_DATAFILE:
+ /* fall through */
+ case PVFS_TYPE_DIRDATA:
+ /* fall through */
+ default:
+ gossip_err("error: getattr_object_getattr_comp_fn: "
"handle refers to invalid object type\n");
- return -PVFS_EINVAL;
}
-
- return -PVFS_EINVAL; /* should not get here */
+ return -PVFS_EINVAL;
}
static int getattr_object_getattr_failure(PINT_client_sm *sm_p,
- job_status_s *js_p)
+ job_status_s *js_p)
{
gossip_debug(
GOSSIP_CLIENT_DEBUG,
@@ -499,109 +425,114 @@ static int getattr_object_getattr_failur
if ((js_p->error_code != -PVFS_ENOENT) &&
(js_p->error_code != -PVFS_EINVAL))
{
- gossip_err("getattr: Failed with unexpected error code %d\n",
- js_p->error_code);
+ PVFS_perror_gossip("getattr_object_getattr_failure ",
+ js_p->error_code);
}
return 1;
}
-static int getattr_datafile_getattr_setup_msgpairarray(PINT_client_sm *sm_p,
- job_status_s *js_p)
+static int getattr_datafile_getattr_setup_msgpairarray(
+ PINT_client_sm *sm_p, job_status_s *js_p)
{
- int i = 0;
- int ret = -PVFS_EINVAL;
+ int ret = -PVFS_EINVAL, i = 0;
+ PVFS_object_attr *attr = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) getattr state: "
"getattr_datafile_getattr_setup_msgpairarray\n", sm_p);
js_p->error_code = 0;
-
- sm_p->msgarray = (PINT_client_sm_msgpair_state *)
- malloc(sm_p->u.getattr.datafile_count *
- sizeof(PINT_client_sm_msgpair_state));
- assert(sm_p->msgarray);
-
- sm_p->u.getattr.size_array = (PVFS_size *)
- malloc(sm_p->u.getattr.datafile_count * sizeof(PVFS_size));
- assert(sm_p->u.getattr.size_array);
- sm_p->msgarray_count = sm_p->u.getattr.datafile_count;
+ attr = (sm_p->acache_hit ?
+ &sm_p->pinode->attr :
+ &sm_p->acache_attr);
+ assert(attr);
+
+ sm_p->msgarray = (PINT_client_sm_msgpair_state *)malloc(
+ attr->u.meta.dfile_count * sizeof(PINT_client_sm_msgpair_state));
+ if (!sm_p->msgarray)
+ {
+ js_p->error_code = -PVFS_ENOMEM;
+ return 1;
+ }
+
+ sm_p->u.getattr.size_array = (PVFS_size *)malloc(
+ attr->u.meta.dfile_count * sizeof(PVFS_size));
+ if (!sm_p->u.getattr.size_array)
+ {
+ free(sm_p->msgarray);
+ sm_p->msgarray = NULL;
+
+ js_p->error_code = -PVFS_ENOMEM;
+ return 1;
+ }
+
+ sm_p->msgarray_count = attr->u.meta.dfile_count;
/* for each datafile, post a send/recv pair to obtain the size */
for (i=0; i < sm_p->msgarray_count; i++)
{
- PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
+ PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
+ assert(msg_p);
+
+ gossip_debug(GOSSIP_GETATTR_DEBUG,
+ " datafile_getattr: getting size for handle %Lu\n",
+ Lu(attr->u.meta.dfile_array[i]));
+
+ PINT_SERVREQ_GETATTR_FILL(
+ msg_p->req,
+ *sm_p->cred_p,
+ sm_p->u.getattr.object_ref.fs_id,
+ attr->u.meta.dfile_array[i],
+ PVFS_ATTR_DATA_SIZE);
- gossip_debug(GOSSIP_GETATTR_DEBUG,
- " datafile_getattr: getting size for handle %Lu\n",
- Lu(sm_p->u.getattr.datafile_handles[i]));
-
- /* fill in getattr request; all we care about is the size */
- PINT_SERVREQ_GETATTR_FILL(msg_p->req,
- *sm_p->cred_p,
- sm_p->u.getattr.object_ref.fs_id,
- sm_p->u.getattr.datafile_handles[i],
- PVFS_ATTR_DATA_SIZE);
-
- /* fill in msgpair structure components */
- msg_p->fs_id = sm_p->u.getattr.object_ref.fs_id;
- msg_p->handle = sm_p->u.getattr.datafile_handles[i];
+ msg_p->fs_id = sm_p->u.getattr.object_ref.fs_id;
+ msg_p->handle = attr->u.meta.dfile_array[i];
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
- msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
+ msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
}
- /* fill in address of each server to contact */
ret = PINT_serv_msgpairarray_resolve_addrs(
sm_p->msgarray_count, sm_p->msgarray);
if (ret < 0)
{
- gossip_lerr("Error: failed to resolve meta server addresses.\n");
- js_p->error_code = ret;
+ gossip_lerr("Error: failed to resolve meta server addresses.\n");
+ js_p->error_code = ret;
}
return 1;
}
static int getattr_datafile_getattr_comp_fn(
- void *v_p,
- struct PVFS_server_resp *resp_p,
- int index)
+ void *v_p, struct PVFS_server_resp *resp_p, int index)
{
- PINT_client_sm *sm_p = (PINT_client_sm *) v_p;
+ PINT_client_sm *sm_p = (PINT_client_sm *)v_p;
if (resp_p->status != 0)
{
- return resp_p->status;
+ return resp_p->status;
}
- /* sanity checks:
- * - this is a getattr response
- * - the attributes indicate that this was indeed a datafile
- * - the datafile has a positive size
- */
assert(resp_p->op == PVFS_SERV_GETATTR);
- assert(resp_p->u.getattr.attr.objtype == PVFS_TYPE_DATAFILE);
- assert(resp_p->u.getattr.attr.u.data.size >= 0);
gossip_debug(GOSSIP_GETATTR_DEBUG,
- "datafile_getattr: size of datafile %d is %Ld\n",
- index, Ld(resp_p->u.getattr.attr.u.data.size));
+ "datafile_getattr: size of datafile %d is %Ld\n",
+ index, Ld(resp_p->u.getattr.attr.u.data.size));
- sm_p->u.getattr.size_array[index] = resp_p->u.getattr.attr.u.data.size;
+ sm_p->u.getattr.size_array[index] =
+ resp_p->u.getattr.attr.u.data.size;
return 0;
}
static int getattr_datafile_getattr_failure(PINT_client_sm *sm_p,
- job_status_s *js_p)
+ job_status_s *js_p)
{
- gossip_debug(GOSSIP_CLIENT_DEBUG,
- "(%p) getattr state: getattr_datafile_getattr_failure\n",
- sm_p);
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) getattr state: "
+ "getattr_datafile_getattr_failure\n", sm_p);
return 1;
}
static int getattr_cleanup(PINT_client_sm *sm_p,
- job_status_s *js_p)
+ job_status_s *js_p)
{
int ret = 0;
uint32_t amask;
@@ -614,32 +545,36 @@ static int getattr_cleanup(PINT_client_s
if (js_p->error_code == 0)
{
- /* calculate size of file if necessary */
- assert(s_attr_p->size == 0);
+ /* calculate size of file if necessary */
+ assert(s_attr_p->size == 0);
- amask = s_attr_p->mask; /* from object getattr */
+ amask = s_attr_p->mask; /* from object getattr */
if (amask & PVFS_ATTR_META_DIST)
{
/*
- recompute size if it's not a acache hit, or if it is
- a acache hit on an entry that no longer has a valid size
+ recompute size if it's not a acache hit, or if it is a
+ acache hit on an entry that no longer has a valid size
*/
if (!sm_p->acache_hit ||
(sm_p->acache_hit &&
!(sm_p->pinode->attr.mask & PVFS_ATTR_DATA_SIZE)))
{
- ret = PINT_dist_lookup(sm_p->u.getattr.dist_p);
- if (ret < 0)
- {
- assert(0);
- }
-
- s_attr_p->size =
- (sm_p->u.getattr.dist_p->methods->logical_file_size)
- (sm_p->u.getattr.dist_p->params,
- sm_p->u.getattr.datafile_count,
- sm_p->u.getattr.size_array);
+ PINT_dist *dist = NULL;
+ PVFS_object_attr *attr = (sm_p->acache_hit ?
+ &sm_p->pinode->attr :
+ &sm_p->acache_attr);
+ assert(attr && attr->u.meta.dist);
+
+ dist = attr->u.meta.dist;
+ assert(dist->methods);
+
+ ret = PINT_dist_lookup(dist);
+ assert(ret == 0);
+
+ s_attr_p->size = (dist->methods->logical_file_size)(
+ dist->params, attr->u.meta.dfile_count,
+ sm_p->u.getattr.size_array);
}
else
{
@@ -647,58 +582,33 @@ static int getattr_cleanup(PINT_client_s
}
}
- /* redo the mask */
- s_attr_p->mask = PVFS_util_object_to_sys_attr_mask(s_attr_p->mask);
+ /* convert outgoing attribute mask based on what we got */
+ s_attr_p->mask = PVFS_util_object_to_sys_attr_mask(
+ s_attr_p->mask);
- /* we know we grabbed the size if they asked for it */
- if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_SIZE)
+ /* and add the size if they asked for it */
+ if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_SIZE)
{
- s_attr_p->mask |= PVFS_ATTR_SYS_SIZE;
- }
+ s_attr_p->mask |= PVFS_ATTR_SYS_SIZE;
+ }
- /* if this is a symlink, we know we have the target */
+ /* if this is a symlink, add the link target */
if (sm_p->u.getattr.attrmask & PVFS_ATTR_SYS_LNK_TARGET)
{
s_attr_p->mask |= PVFS_ATTR_SYS_LNK_TARGET;
}
- /* free all the memory we allocated, which could include:
- * - size array
- * - msgpair array
- * - datafile handle array (if not from acache)
- * - distribution description (if not from acache)
- */
- if (sm_p->u.getattr.size_array != NULL)
+ if (sm_p->u.getattr.size_array)
{
- free(sm_p->u.getattr.size_array);
- }
-
- if (sm_p->msgarray != NULL && (sm_p->msgarray != &(sm_p->msgpair)))
- {
- free(sm_p->msgarray);
- }
- sm_p->msgarray = NULL;
- sm_p->msgarray_count = 0;
+ free(sm_p->u.getattr.size_array);
+ sm_p->u.getattr.size_array = NULL;
+ }
- /*
- only free dist and dfile memory if we didn't get a
- cache hit; if we got a acache hit, we're referencing
- memory inside the acache entries, so no allocations we
- made
- */
- if (!sm_p->acache_hit)
+ if (sm_p->msgarray && (sm_p->msgarray != &(sm_p->msgpair)))
{
- if ((amask & PVFS_ATTR_META_DFILES) &&
- (sm_p->u.getattr.datafile_handles != NULL))
- {
- free(sm_p->u.getattr.datafile_handles);
- }
- if ((amask & PVFS_ATTR_META_DIST) &&
- (sm_p->u.getattr.dist_p != NULL))
- {
- PINT_dist_free(sm_p->u.getattr.dist_p);
- }
+ free(sm_p->msgarray);
}
+ sm_p->msgarray = NULL;
/* add entry to acache if not already present */
if (!sm_p->acache_hit)
@@ -712,6 +622,10 @@ static int getattr_cleanup(PINT_client_s
assert(pinode);
release_required = 0;
}
+
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "trying to add object "
+ "reference to acache (%s present)\n",
+ (release_required ? "already" : "not"));
pinode->refn = sm_p->u.getattr.object_ref;
if (sm_p->acache_attr.objtype == PVFS_TYPE_SYMLINK)
@@ -726,7 +640,6 @@ static int getattr_cleanup(PINT_client_s
not apply at all to symlinks, so special case
symlinks above
*/
-
pinode->size = s_attr_p->size;
/*
update the attr mask in the src object so that
@@ -753,19 +666,16 @@ static int getattr_cleanup(PINT_client_s
{
PINT_acache_release(pinode);
}
- sm_p->acache_hit = 0;
}
}
else
{
/* in case of failure, blank out response */
- memset(sm_p->u.getattr.getattr_resp_p,
+ memset(sm_p->u.getattr.getattr_resp_p,
0, sizeof(PVFS_sysresp_getattr));
}
- /* mark operation as complete */
sm_p->op_complete = 1;
-
return 0;
}
Index: sys-io.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-io.sm,v
diff -p -u -r1.77 -r1.78
--- sys-io.sm 22 Jun 2004 20:40:38 -0000 1.77
+++ sys-io.sm 8 Jul 2004 16:17:06 -0000 1.78
@@ -3,7 +3,6 @@
*
* See COPYING in top-level directory.
*/
-
#include <string.h>
#include <assert.h>
@@ -23,9 +22,9 @@ extern job_context_id pint_client_sm_con
enum
{
- IO_NO_DATA = 1,
- IO_DATAFILE_TRANSFERS_COMPLETE = 2,
- IO_RETRY = 3
+ IO_NO_DATA = 132,
+ IO_DATAFILE_TRANSFERS_COMPLETE,
+ IO_RETRY
};
static int io_init(
@@ -36,57 +35,71 @@ static int io_datafile_setup_msgpairs(
PINT_client_sm *sm_p, job_status_s *js_p);
static int io_datafile_post_msgpairs(
PINT_client_sm *sm_p, job_status_s *js_p);
-static int io_datafile_complete_msgpairs(
+static int io_datafile_complete_operations(
PINT_client_sm *sm_p, job_status_s *js_p);
static int io_analyze_results(
PINT_client_sm *sm_p, job_status_s *js_p);
+static int io_cleanup(
+ PINT_client_sm *sm_p, job_status_s *js_p);
-/* helper functions */
+/* misc helper functions for per-server I/O context handling */
+static inline int complete_context_send_or_recv(
+ PINT_client_sm *sm_p, job_status_s *js_p);
+static inline int process_context_recv(
+ PINT_client_io_ctx *cur_ctx,
+ struct PINT_decoded_msg *decoded_resp,
+ struct PVFS_server_resp **resp);
+static inline int build_context_flow(
+ PINT_client_sm *sm_p, PINT_client_io_ctx *cur_ctx,
+ PVFS_object_attr *attr, struct PVFS_server_resp *resp);
+static inline int process_context_recv_and_post_flow(
+ PINT_client_sm *sm_p, job_status_s *js_p);
+static inline int check_context_status(
+ PINT_client_io_ctx *cur_ctx, int io_type,
+ PVFS_size *total_size);
static int io_find_target_datafiles(
- PVFS_Request mem_req,
- PVFS_Request file_req,
- PVFS_offset file_req_offset,
- PINT_dist *dist_p,
- PVFS_handle *input_handle_array,
- int input_handle_count,
- PVFS_handle *output_handle_array,
- int *handle_index_array,
- int *handle_count_out_p);
-
-#define CLEAN_PRIVATE_MEMBERS(iosm_p) \
-do { \
- if (iosm_p->datafile_index_array) \
- { \
- free(iosm_p->datafile_index_array); \
- iosm_p->datafile_index_array = NULL;\
- } \
- if (sm_p->msgarray && \
- (sm_p->msgarray != &sm_p->msgpair)) \
- { \
- free(sm_p->msgarray); \
- sm_p->msgarray = NULL; \
- sm_p->msgarray_count = 0; \
- } \
- if (iosm_p->flow_array) \
- { \
- free(iosm_p->flow_array); \
- iosm_p->flow_array = NULL; \
- } \
- if (iosm_p->flow_status_array) \
- { \
- free(iosm_p->flow_status_array); \
- iosm_p->flow_status_array = NULL; \
- } \
- if (iosm_p->session_tag_array) \
- { \
- free(iosm_p->session_tag_array); \
- iosm_p->session_tag_array = NULL; \
- } \
- if (iosm_p->ackarray) \
- { \
- free(iosm_p->ackarray); \
- iosm_p->ackarray = NULL; \
- } \
+ PVFS_Request mem_req, PVFS_Request file_req,
+ PVFS_offset file_req_offset, PINT_dist *dist_p,
+ PVFS_handle *input_handle_array, int input_handle_count,
+ int *handle_index_array, int *handle_index_out_count); /* NOTE(review): last two appear to be outputs */
+
+/* complete_context_send_or_recv() returns this when a recv completed */
+#define IO_RECV_COMPLETED 1
+
+/* I/O sm phases; status_user_tag = (index * IO_SM_NUM_PHASES) + phase */
+#define IO_SM_PHASE_REQ_MSGPAIR_RECV 0
+#define IO_SM_PHASE_REQ_MSGPAIR_SEND 1
+#define IO_SM_PHASE_FLOW 2
+#define IO_SM_PHASE_FINAL_ACK 3
+#define IO_SM_NUM_PHASES 4
+/* decode a status_user_tag; macro arguments are fully parenthesized */
+#define STATUS_USER_TAG_TYPE(tag, type) \
+(((tag) % IO_SM_NUM_PHASES) == (type))
+#define STATUS_USER_TAG_GET_INDEX(tag, type) \
+((tag) / IO_SM_NUM_PHASES)
+#define STATUS_USER_TAG_IS_SEND_OR_RECV(tag) \
+(STATUS_USER_TAG_TYPE((tag), IO_SM_PHASE_REQ_MSGPAIR_RECV) || \
+ STATUS_USER_TAG_TYPE((tag), IO_SM_PHASE_REQ_MSGPAIR_SEND))
+/* free the per-I/O arrays (datafile index array, msgarray, contexts) */
+#define CLEAN_PRIVATE_MEMBERS(sm_p) \
+do { \
+ if ((sm_p)->u.io.datafile_index_array) \
+ { \
+ free((sm_p)->u.io.datafile_index_array); \
+ (sm_p)->u.io.datafile_index_array = NULL; \
+ } \
+ if ((sm_p)->msgarray && \
+ ((sm_p)->msgarray != &(sm_p)->msgpair)) \
+ { \
+ free((sm_p)->msgarray); \
+ (sm_p)->msgarray = NULL; \
+ (sm_p)->msgarray_count = 0; \
+ } \
+ if ((sm_p)->u.io.contexts) \
+ { \
+ free((sm_p)->u.io.contexts); \
+ (sm_p)->u.io.contexts = NULL; \
+ } \
} while(0)
%%
@@ -98,8 +111,9 @@ machine pvfs2_client_io_sm(
io_getattr_failure,
io_datafile_setup_msgpairs,
io_datafile_post_msgpairs,
- io_datafile_complete_msgpairs,
- io_analyze_results)
+ io_datafile_complete_operations,
+ io_analyze_results,
+ io_cleanup)
{
state init
{
@@ -124,34 +138,40 @@ machine pvfs2_client_io_sm(
state io_getattr_failure
{
run io_object_getattr_failure;
- default => io_analyze_results;
+ default => io_cleanup;
}
state io_datafile_setup_msgpairs
{
run io_datafile_setup_msgpairs;
+ IO_NO_DATA => io_cleanup;
success => io_datafile_post_msgpairs;
- default => io_analyze_results;
+ default => io_cleanup;
}
state io_datafile_post_msgpairs
{
run io_datafile_post_msgpairs;
- success => io_datafile_complete_msgpairs;
- default => io_analyze_results;
+ default => io_datafile_complete_operations;
}
- state io_datafile_complete_msgpairs
+ state io_datafile_complete_operations
{
- run io_datafile_complete_msgpairs;
+ run io_datafile_complete_operations;
IO_DATAFILE_TRANSFERS_COMPLETE => io_analyze_results;
- default => io_datafile_complete_msgpairs;
+ default => io_datafile_complete_operations;
}
state io_analyze_results
{
run io_analyze_results;
IO_RETRY => init;
+ default => io_cleanup;
+ }
+
+ state io_cleanup
+ {
+ run io_cleanup;
default => terminate;
}
}
@@ -227,17 +247,9 @@ int PVFS_isys_io(
sm_p->u.io.encoding = cur_fs->encoding;
sm_p->u.io.stored_error_code = 0;
sm_p->u.io.retry_count = 0;
- sm_p->u.io.datafile_handles = NULL;
- sm_p->u.io.datafile_index_array = NULL;
sm_p->msgarray = NULL;
- sm_p->u.io.dist_p = NULL;
- sm_p->u.io.dist_size = 0;
- sm_p->u.io.datafile_handles = NULL;
+ sm_p->u.io.datafile_index_array = NULL;
sm_p->u.io.datafile_count = 0;
- sm_p->u.io.flow_array = NULL;
- sm_p->u.io.flow_status_array = NULL;
- sm_p->u.io.session_tag_array = NULL;
- sm_p->u.io.ackarray = NULL;
return PINT_client_state_machine_post(
sm_p, PVFS_SYS_IO, op_id, user_ptr);
@@ -287,7 +299,7 @@ static int io_init(PINT_client_sm *sm_p,
{
job_id_t tmp_id;
- gossip_debug(GOSSIP_CLIENT_DEBUG, "io state: init\n");
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: io_init\n", sm_p);
assert((js_p->error_code == 0) ||
(js_p->error_code == IO_RETRY));
@@ -296,6 +308,12 @@ static int io_init(PINT_client_sm *sm_p,
{
js_p->error_code = 0;
+ if (sm_p->op_cancelled)
+ {
+ js_p->error_code = -PVFS_ECANCEL;
+ return(1);
+ }
+
return job_req_sched_post_timer(
PVFS2_CLIENT_RETRY_DELAY, sm_p, 0, js_p, &tmp_id,
pint_client_sm_context);
@@ -303,34 +321,46 @@ static int io_init(PINT_client_sm *sm_p,
return 1;
}
+static int io_object_getattr_failure(PINT_client_sm *sm_p,
+ job_status_s *js_p)
+{
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
+ "io_object_getattr_failure\n", sm_p);
+ /* record the failure (cancel > existing error > ENOENT) */
+ if (sm_p->op_cancelled)
+ {
+ js_p->error_code = -PVFS_ECANCEL;
+ }
+ /*
+ NOTE: treat a zero error code as a missing object; this can
+ happen if we're doing I/O on a file that was removed by
+ another process
+ */
+ if (js_p->error_code == 0)
+ {
+ js_p->error_code = -PVFS_ENOENT;
+ }
+ /* preserve the final error code for later states */
+ sm_p->u.io.stored_error_code = js_p->error_code;
+ return 1;
+}
-/* io_datafile_setup_msgpairs()
- *
- * Sets up msgpairs to send I/O requests to servers holding datafiles
- * that (might) have data for us (unless we hit EOF).
- *
- * This function swaps the original datafile_handles array for a new
- * array that just has datafiles that we think will have our data. It
- * updates datafile_count appropriately as well.
- *
- * We use sm_p->msgarray for this purpose.
- *
- * NOTE: we could combine this with the post_msgpairs state, but this one
- * has gotten pretty big already, so let's not.
- *
- */
static int io_datafile_setup_msgpairs(PINT_client_sm *sm_p,
job_status_s *js_p)
{
int ret = -PVFS_EINVAL, i = 0;
- int target_datafile_count = 0;
PVFS_object_attr *attr = NULL;
- PVFS_handle *target_datafile_array = NULL;
- struct PINT_client_io_sm *iosm_p = &sm_p->u.io;
+ int target_datafile_count = 0;
gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
"io_datafile_setup_msgpairs\n", sm_p);
+ if (sm_p->op_cancelled)
+ {
+ js_p->error_code = -PVFS_ECANCEL;
+ return 1;
+ }
+
js_p->error_code = 0;
attr = (sm_p->acache_hit ?
@@ -352,83 +382,47 @@ static int io_datafile_setup_msgpairs(PI
js_p->error_code = -PVFS_EISDIR;
return 1;
default:
- gossip_err("Error: I/O attempted on invalid object type.\n");
- js_p->error_code = -PVFS_EIO;
+ js_p->error_code = -PVFS_EBADF;
return 1;
}
- /*
- assign internal io ptrs for convenience here (without copying)
- if unassigned
- */
- if (!sm_p->u.io.dist_p)
- {
- sm_p->u.io.dist_p = attr->u.meta.dist;
- }
- if (!sm_p->u.io.dist_size)
- {
- sm_p->u.io.dist_size = attr->u.meta.dist_size;
- }
- if (!sm_p->u.io.datafile_handles)
- {
- sm_p->u.io.datafile_handles = attr->u.meta.dfile_array;
- }
- if (!sm_p->u.io.datafile_count)
- {
- sm_p->u.io.datafile_count = attr->u.meta.dfile_count;
- }
-
- ret = PINT_dist_lookup(iosm_p->dist_p);
- assert(ret == 0);
-
- if (!target_datafile_array)
- {
- target_datafile_array = (PVFS_handle *)malloc(
- (iosm_p->datafile_count * sizeof(PVFS_handle)));
- }
- if (!target_datafile_array)
+ ret = PINT_dist_lookup(attr->u.meta.dist);
+ if (ret)
{
- js_p->error_code = -PVFS_ENOMEM;
+ PVFS_perror_gossip("PINT_dist_lookup failed; aborting I/O", ret);
+ js_p->error_code = -PVFS_EBADF;
return 1;
}
- if (!iosm_p->datafile_index_array)
- {
- iosm_p->datafile_index_array = (int *)malloc(
- (iosm_p->datafile_count * sizeof(int)));
- }
- if (!iosm_p->datafile_index_array)
+ sm_p->u.io.datafile_index_array = (int *)malloc(
+ (attr->u.meta.dfile_count * sizeof(int)));
+ if (!sm_p->u.io.datafile_index_array)
{
goto malloc_error_exit;
}
+ memset(sm_p->u.io.datafile_index_array, 0,
+ (attr->u.meta.dfile_count * sizeof(int)));
ret = io_find_target_datafiles(
- iosm_p->mem_req,
- iosm_p->file_req,
- iosm_p->file_req_offset,
- iosm_p->dist_p,
- iosm_p->datafile_handles,
- iosm_p->datafile_count,
- target_datafile_array,
- iosm_p->datafile_index_array,
+ sm_p->u.io.mem_req,
+ sm_p->u.io.file_req,
+ sm_p->u.io.file_req_offset,
+ attr->u.meta.dist,
+ attr->u.meta.dfile_array,
+ attr->u.meta.dfile_count,
+ sm_p->u.io.datafile_index_array,
&target_datafile_count);
+
assert(ret == 0);
if (target_datafile_count == 0)
{
- if (target_datafile_array)
- {
- free(target_datafile_array);
- target_datafile_array = NULL;
- }
+ free(sm_p->u.io.datafile_index_array);
+ sm_p->u.io.datafile_index_array = NULL;
- if (iosm_p->datafile_index_array)
- {
- free(iosm_p->datafile_index_array);
- iosm_p->datafile_index_array = NULL;
- }
+ gossip_debug(GOSSIP_IO_DEBUG, " datafile_setup_msgpairs: no "
+ "datafiles have data; aborting\n");
- /* the no data case should be caught earlier than this */
js_p->error_code = IO_NO_DATA;
return 1;
}
@@ -437,123 +431,74 @@ static int io_datafile_setup_msgpairs(PI
" datafile_setup_msgpairs: %d datafiles "
"might have data\n", target_datafile_count);
- /* setup msgpair array */
- if (!sm_p->msgarray)
- {
- sm_p->msgarray = (PINT_client_sm_msgpair_state *)malloc(
- (target_datafile_count *
- sizeof(PINT_client_sm_msgpair_state)));
- }
- if (!sm_p->msgarray)
+ sm_p->u.io.contexts = (PINT_client_io_ctx *)malloc(
+ (target_datafile_count * sizeof(PINT_client_io_ctx)));
+ if (!sm_p->u.io.contexts)
{
goto malloc_error_exit;
}
- sm_p->msgarray_count = target_datafile_count;
+ memset(sm_p->u.io.contexts, 0,
+ (target_datafile_count * sizeof(PINT_client_io_ctx)));
- /* setup flow descriptor array */
- if (!iosm_p->flow_array)
- {
- iosm_p->flow_array = (flow_descriptor *)malloc(
- (target_datafile_count * sizeof(flow_descriptor)));
- }
- if (!iosm_p->flow_array)
+ sm_p->msgarray_count = target_datafile_count;
+ sm_p->msgarray = (PINT_client_sm_msgpair_state *)malloc(
+ (sm_p->msgarray_count * sizeof(PINT_client_sm_msgpair_state)));
+ if (!sm_p->msgarray)
{
goto malloc_error_exit;
}
- iosm_p->flow_comp_ct = 0;
+ memset(sm_p->msgarray, 0, (sm_p->msgarray_count *
+ sizeof(PINT_client_sm_msgpair_state)));
- /* set each flow descriptor to initial values */
- for(i = 0; i < target_datafile_count; i++)
- {
- PINT_flow_reset(&iosm_p->flow_array[i]);
- }
+ sm_p->u.io.total_cancellations_remaining = 0;
- /* setup flow status array */
- if (!iosm_p->flow_status_array)
- {
- iosm_p->flow_status_array = (job_status_s *)malloc(
- (target_datafile_count * sizeof(job_status_s)));
- }
- if (!iosm_p->flow_status_array)
- {
- goto malloc_error_exit;
- }
-
- /* setup session tag array */
- if (!iosm_p->session_tag_array)
- {
- iosm_p->session_tag_array = (PVFS_msg_tag_t *)malloc(
- (target_datafile_count * sizeof(PVFS_msg_tag_t)));
- }
- if (!iosm_p->session_tag_array)
- {
- goto malloc_error_exit;
- }
-
- /* setup space for final ack if we're doing a write */
- if (iosm_p->io_type == PVFS_IO_WRITE)
+ /* initialize all per server I/O operation contexts and requests */
+ for(i = 0; i < target_datafile_count; i++)
{
- /* setup the write acknowledgement array */
- if (!iosm_p->ackarray)
- {
- iosm_p->ackarray = (PINT_client_sm_recv_state *)malloc(
- (target_datafile_count *
- sizeof(PINT_client_sm_recv_state)));
- }
- if (!iosm_p->ackarray)
- {
- goto malloc_error_exit;
- }
- memset(iosm_p->ackarray, 0,
- target_datafile_count*sizeof(PINT_client_sm_recv_state));
- iosm_p->ack_comp_ct = 0;
- }
+ PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
+ PINT_client_sm_msgpair_state *msg = &sm_p->msgarray[i];
- /* fill in all the I/O requests */
- for (i = 0; i < target_datafile_count; i++)
- {
- int orig_index = 0;
- PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
- assert(msg_p);
+ assert(msg && cur_ctx);
+
+ memset(cur_ctx, 0, sizeof(PINT_client_io_ctx));
+ memset(msg, 0, sizeof(PINT_client_sm_msgpair_state));
+
+ gossip_debug(GOSSIP_IO_DEBUG, "initializing context[%d] %p\n",
+ i, cur_ctx);
+
+ cur_ctx->msg = msg;
+ cur_ctx->index = i;
+ cur_ctx->server_nr = sm_p->u.io.datafile_index_array[i];
+ cur_ctx->data_handle =
+ attr->u.meta.dfile_array[cur_ctx->server_nr];
- gossip_debug(GOSSIP_IO_DEBUG, " sending I/O request "
- "for %Lu\n", Lu(target_datafile_array[i]));
+ PINT_flow_reset(&cur_ctx->flow_desc);
- /* find the original index for the handle */
- for (orig_index = 0; orig_index < iosm_p->datafile_count;
- orig_index++)
- {
- if (target_datafile_array[i] ==
- iosm_p->datafile_handles[orig_index])
- {
- break;
- }
- }
- assert(orig_index < iosm_p->datafile_count);
+ gossip_debug(GOSSIP_IO_DEBUG, " filling I/O request "
+ "for %Lu\n", Lu(cur_ctx->data_handle));
PINT_SERVREQ_IO_FILL(
- msg_p->req,
+ msg->req,
*sm_p->cred_p,
sm_p->object_ref.fs_id,
- target_datafile_array[i],
- iosm_p->io_type,
- iosm_p->flowproto_type,
- orig_index,
- iosm_p->datafile_count,
- iosm_p->dist_p,
- iosm_p->file_req,
- iosm_p->file_req_offset,
- PINT_REQUEST_TOTAL_BYTES(iosm_p->mem_req));
+ cur_ctx->data_handle,
+ sm_p->u.io.io_type,
+ sm_p->u.io.flowproto_type,
+ sm_p->u.io.datafile_index_array[i],
+ attr->u.meta.dfile_count,
+ attr->u.meta.dist,
+ sm_p->u.io.file_req,
+ sm_p->u.io.file_req_offset,
+ PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req));
/* fill in msgpair structure components */
- msg_p->fs_id = sm_p->object_ref.fs_id;
- msg_p->handle = target_datafile_array[i];
- msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
- msg_p->comp_fn = NULL;
-
- ret = PINT_bucket_map_to_server(&msg_p->svr_addr,
- msg_p->handle,
- msg_p->fs_id);
+ msg->fs_id = sm_p->object_ref.fs_id;
+ msg->handle = cur_ctx->data_handle;
+ msg->retry_flag = PVFS_MSGPAIR_NO_RETRY;
+ msg->comp_fn = NULL;
+
+ ret = PINT_bucket_map_to_server(
+ &msg->svr_addr, msg->handle, msg->fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
@@ -562,690 +507,503 @@ static int io_datafile_setup_msgpairs(PI
}
}
- /* swap the new list in for the old one, freeing the old list */
- if (!sm_p->acache_hit && iosm_p->datafile_handles)
- {
- free(iosm_p->datafile_handles);
- }
-
- if (target_datafile_array)
- {
- iosm_p->datafile_handles = target_datafile_array;
- }
-
- /*
- store the original datafile_count before it's modified,
- as we'll need it later in io_datafile_complete_msgpairs
- */
- iosm_p->orig_datafile_count = iosm_p->datafile_count;
- iosm_p->datafile_count = target_datafile_count;
+ sm_p->u.io.datafile_count = target_datafile_count;
+ js_p->error_code = 0;
return 1;
malloc_error_exit:
- CLEAN_PRIVATE_MEMBERS(iosm_p);
+ CLEAN_PRIVATE_MEMBERS(sm_p);
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
-/* io_datafile_post_msgpairs()
- *
- * This is basically a copy of msgpairarray.c:msgpairarray_post().
- * We need to handle the rest of the process somewhat differently though,
- * so we're going to have our own versions here.
- *
- * We use the msgarray to keep up with the initial send/recv pairs.
- *
- */
+/*
+ Post the recv and send for each server's I/O request. Based on
+ msgpairarray_post() in msgpairarray.c, but each server's flow is
+ posted as soon as its response arrives instead of after all
+ msgpairs complete, avoiding that sync point. Returns 0 after all
+ posts, or 1 with js_p->error_code set on an early failure.
+*/
static int io_datafile_post_msgpairs(PINT_client_sm *sm_p,
job_status_s *js_p)
{
int ret = -PVFS_EINVAL, i = 0;
+ unsigned long status_user_tag = 0;
gossip_debug(GOSSIP_CLIENT_DEBUG, "io_datafile_post_msgpairs "
"state: post (%d message(s))\n", sm_p->msgarray_count);
+ if (sm_p->op_cancelled)
+ {
+ js_p->error_code = -PVFS_ECANCEL;
+ return 1;
+ }
+
js_p->error_code = 0;
- /* set number of operations that must complete.
- *
- * NOTE: we're using the comp_ct in the first msgpair
- * entry to keep up with the count for the entire array.
- */
assert(sm_p->msgarray);
- assert(sm_p->msgarray_count > 0);
+ assert(sm_p->msgarray_count == sm_p->u.io.datafile_count);
- sm_p->msgarray[0].comp_ct = (2 * sm_p->msgarray_count);
+ /* completion count tracks sends/recvs separately */
+ sm_p->u.io.msgpair_completion_count = (2 * sm_p->u.io.datafile_count);
- for (i = 0; i < sm_p->msgarray_count; i++)
+ for(i = 0; i < sm_p->u.io.datafile_count; i++)
{
- PVFS_msg_tag_t session_tag;
- PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
- assert(msg_p);
+ PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
+ PINT_client_sm_msgpair_state *msg = &sm_p->msgarray[i];
+
+ assert(cur_ctx && msg);
+ assert(cur_ctx->msg == msg);
- ret = PINT_encode(&msg_p->req,
- PINT_ENCODE_REQ,
- &msg_p->encoded_req,
- msg_p->svr_addr,
- sm_p->u.io.encoding);
- if (ret != 0)
+ ret = PINT_encode(&msg->req, PINT_ENCODE_REQ, &msg->encoded_req,
+ msg->svr_addr, sm_p->u.io.encoding);
+ if (ret)
{
- gossip_lerr("pint_encode failed\n");
- js_p->error_code = -PVFS_EIO;
+ /*
+ FIXME: make this a clean error transition by
+ adjusting the completion count and/or (not) exiting
+ */
+ PVFS_perror_gossip("PINT_encode failed", ret);
+ js_p->error_code = ret;
return 1;
}
- /* calculate maximum response message size and allocate space.
- * fills in max_resp_sz, encoded_resp_p
- */
- msg_p->max_resp_sz = PINT_encode_calc_max_size(
- PINT_ENCODE_RESP, msg_p->req.op, sm_p->u.io.encoding);
- msg_p->encoded_resp_p = BMI_memalloc(
- msg_p->svr_addr, msg_p->max_resp_sz, BMI_RECV);
- if (!msg_p->encoded_resp_p)
+ /* calculate maximum response message size and allocate it */
+ msg->max_resp_sz = PINT_encode_calc_max_size(
+ PINT_ENCODE_RESP, msg->req.op, sm_p->u.io.encoding);
+ msg->encoded_resp_p = BMI_memalloc(
+ msg->svr_addr, msg->max_resp_sz, BMI_RECV);
+ if (!msg->encoded_resp_p)
{
+ /* FIXME: see above FIXME */
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
- /* get session tag to associate with send and receive.
- * session tag is kept in io part of client state machine
- * structure for use in the flow and option final ack.
- */
- session_tag = get_next_session_tag();
- sm_p->u.io.session_tag_array[i] = session_tag;
+ /*
+ recalculate the status user tag based on the progress of the
+ current context like this: status_user_tag =
+ ((context index * IO_SM_NUM_PHASES) + context phase)
+ */
+ assert(cur_ctx->index == i);
+ status_user_tag = ((i * IO_SM_NUM_PHASES) + IO_SM_PHASE_REQ_MSGPAIR_RECV);
+
+ gossip_debug(GOSSIP_IO_DEBUG," posting recv with "
+ "status_user_tag=%lu (max_size %d)\n",
+ status_user_tag, msg->max_resp_sz);
+
+ cur_ctx->session_tag = get_next_session_tag();
- /* post receive of response; job_id stored in recv_id */
ret = job_bmi_recv(
- msg_p->svr_addr,
- msg_p->encoded_resp_p,
- msg_p->max_resp_sz,
- session_tag,
- BMI_PRE_ALLOC,
- sm_p,
- i,
- &msg_p->recv_status,
- &msg_p->recv_id,
- pint_client_sm_context,
- PVFS2_CLIENT_JOB_TIMEOUT);
+ msg->svr_addr, msg->encoded_resp_p, msg->max_resp_sz,
+ cur_ctx->session_tag, BMI_PRE_ALLOC, sm_p, status_user_tag,
+ &msg->recv_status, &msg->recv_id, pint_client_sm_context,
+ PVFS2_CLIENT_JOB_TIMEOUT);
+
+ cur_ctx->msg_recv_has_been_posted = 1;
+ cur_ctx->msg_recv_in_progress = 1;
+
+ if (ret == 0)
+ {
+ int tmp = 0;
+ /* perform a quick test to see if the recv failed before
+ * posting the send; if it reports an error quickly then
+ * we can save the confusion of sending a request for
+ * which we can't recv a response
+ */
+ ret = job_test(msg->recv_id, &tmp, NULL,
+ &msg->recv_status, 0,
+ pint_client_sm_context);
+ }
- if (ret != 0)
+ if ((ret < 0) || (ret == 1))
{
- /* it shouldn't be possible for the receive to complete
- * before we send the request.
- */
- assert(ret != 1);
+ /*
+ this recv can't complete yet without errors because we
+ haven't sent the request yet
+ */
+ assert((ret < 0) || (msg->recv_status.error_code != 0));
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("Post of receive failed", ret);
+ }
+ else
+ {
+ PVFS_perror_gossip("Receive immediately failed",
+ msg->recv_status.error_code);
+ }
+
+ gossip_err("Recv error: not posting send.\n");
- gossip_lerr("post of receive failed (ret = %d)\n", ret);
- js_p->error_code = -PVFS_EIO;
+ cur_ctx->msg_recv_in_progress = 0;
+ sm_p->u.io.msgpair_completion_count--;
+
+ /* FIXME: see above FIXME */
+ js_p->error_code = ((ret < 0) ? ret : msg->recv_status.error_code);
return 1;
}
+ assert(ret == 0);
+
+ assert(cur_ctx->index == i);
+ status_user_tag = ((i * IO_SM_NUM_PHASES) + IO_SM_PHASE_REQ_MSGPAIR_SEND);
+
+ gossip_debug(GOSSIP_IO_DEBUG," posting send with "
+ "status_user_tag=%lu\n", status_user_tag);
- /* post send of request; job_id stored in send_id */
ret = job_bmi_send_list(
- msg_p->encoded_req.dest,
- msg_p->encoded_req.buffer_list,
- msg_p->encoded_req.size_list,
- msg_p->encoded_req.list_count,
- msg_p->encoded_req.total_size,
- session_tag,
- msg_p->encoded_req.buffer_type,
- 1,
- sm_p,
- (sm_p->msgarray_count + i),
- &msg_p->send_status,
- &msg_p->send_id,
- pint_client_sm_context,
- PVFS2_CLIENT_JOB_TIMEOUT);
+ msg->encoded_req.dest, msg->encoded_req.buffer_list,
+ msg->encoded_req.size_list, msg->encoded_req.list_count,
+ msg->encoded_req.total_size, cur_ctx->session_tag,
+ msg->encoded_req.buffer_type, 1, sm_p, status_user_tag,
+ &msg->send_status, &msg->send_id, pint_client_sm_context,
+ PVFS2_CLIENT_JOB_TIMEOUT);
- if (ret < 0)
+ cur_ctx->msg_send_has_been_posted = 1;
+ cur_ctx->msg_send_in_progress = 1;
+
+ if ((ret < 0) ||
+ ((ret == 1) && (msg->send_status.error_code != 0)))
{
- gossip_lerr("post of send failed\n");
- js_p->error_code = -PVFS_EIO;
- return 1;
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("Post of send failed", ret);
+ }
+ else
+ {
+ PVFS_perror_gossip("Send immediately failed",
+ msg->send_status.error_code);
+ }
+
+ msg->op_status = ((ret < 0) ? ret : msg->send_status.error_code);
+ msg->send_id = 0;
+
+ /*
+ cancel the recv and decrement the completion count, but
+ still wait for the recv to complete
+ */
+ gossip_err("Send error: canceling recv.\n");
+
+ PINT_client_bmi_cancel(msg->recv_id);
+ cur_ctx->msg_send_in_progress = 0;
+ sm_p->u.io.msgpair_completion_count--;
+
+ /* FIXME: see above FIXME */
}
else if (ret == 1)
{
- /*
- send completed immediately; decrement the completion
- counter
- */
gossip_debug(
GOSSIP_IO_DEBUG, " io_datafile_post_msgpairs: "
"send completed immediately.\n");
/* 0 is the valid "completed job id" value */
- msg_p->send_id = 0;
+ msg->send_id = 0;
- if (msg_p->send_status.error_code != 0)
+ cur_ctx->msg_send_in_progress = 0;
+ sm_p->u.io.msgpair_completion_count--;
+ /* NOTE(review): unreachable; ret == 1 send errors are handled above */
+ if (msg->send_status.error_code != 0)
{
- PVFS_perror_gossip(
- "sys-io.sm: msg_p->send_status.error_code",
- msg_p->send_status.error_code);
- js_p->error_code = msg_p->send_status.error_code;
+ PVFS_perror_gossip("send status failure",
+ msg->send_status.error_code);
+
+ /* FIXME: see above FIXME */
+ js_p->error_code = msg->send_status.error_code;
return 1;
}
-
- /* decrement our count, since send is already done.
- *
- * recall we're using the comp_ct in the first array
- * element to keep up with our count for the entire
- * array.
- */
- sm_p->msgarray[0].comp_ct--;
}
}
+
+ gossip_debug(GOSSIP_IO_DEBUG, "io_datafile_post_msgpairs: "
+ "completion count is %d\n",
+ sm_p->u.io.msgpair_completion_count);
+
+ js_p->error_code = 0;
return 0;
}
-/* io_datafile_complete_msgpairs()
- *
- * This started off as a copy of msgpairarray.c:msgpairarray_complete(),
- * but we need to post flow operations as the bmi operations complete,
- * so we have some additional work to do.
- */
-static int io_datafile_complete_msgpairs(PINT_client_sm *sm_p,
- job_status_s *js_p)
+/*
+ This state allows us to make sure all posted operations complete and
+ are accounted for. since this handles ALL operation completions,
+ there's special case handling of completing the msgpair recv. in
+ this case we post the flow operations as soon as we see them (the
+ main motivation for not using the common msgpairarray code).
+*/
+static int io_datafile_complete_operations(PINT_client_sm *sm_p,
+ job_status_s *js_p)
{
- int i = 0, ret = -1;
- int matched_msgpair = 0, matched_flow = 0;
- int matched_ack = 0, recv_match = -1;
- job_id_t tmp_id;
- PINT_client_sm_msgpair_state *msg_p = NULL;
+ int ret = -PVFS_EINVAL, index = 0;
+ unsigned long status_user_tag = (unsigned long)
+ js_p->status_user_tag;
+ PINT_client_io_ctx *cur_ctx = NULL;
+ int matched_send_or_recv = 0;
gossip_debug(GOSSIP_CLIENT_DEBUG, "io state: "
- "datafile_complete_msgpairs\n");
-
- assert(sm_p->msgarray_count == sm_p->u.io.datafile_count);
- assert(sm_p->msgarray[0].comp_ct >= 0);
- assert(sm_p->u.io.flow_comp_ct >= 0);
- assert(sm_p->u.io.ack_comp_ct >= 0);
- assert(js_p->status_user_tag < (sm_p->msgarray_count*4));
-
- /* if there are outstanding msgpairs to complete, try to match
- * whatever completed with something in the msgpair array
- */
- if (sm_p->msgarray[0].comp_ct > 0)
- {
- if (js_p->status_user_tag < sm_p->msgarray_count)
- {
- /* first N user tags are for receives */
- i = js_p->status_user_tag;
- msg_p = &sm_p->msgarray[i];
- matched_msgpair = 1;
- recv_match = js_p->status_user_tag;
- msg_p->recv_id = 0;
- msg_p->recv_status = *js_p;
- assert(msg_p->recv_status.actual_size <= msg_p->max_resp_sz);
- assert(msg_p->recv_status.error_code <= 0);
- }
- else if (js_p->status_user_tag < (sm_p->msgarray_count*2))
- {
- /* second N user tags are for sends */
- i = js_p->status_user_tag - sm_p->msgarray_count;
- msg_p = &sm_p->msgarray[i];
- matched_msgpair = 1;
- msg_p->send_id = 0;
- msg_p->send_status = *js_p;
- assert(msg_p->send_status.error_code <= 0);
- }
+ "(%p) datafile_complete_operations "
+ "(got user tag %lu)\n", sm_p, status_user_tag);
- if (matched_msgpair)
+ if (js_p->error_code)
+ {
+ PVFS_perror_gossip("io_datafile_complete_operations failed",
+ js_p->error_code);
+ /*
+ if we looped back here following an error, we need to figure
+ out what happened and cleanup properly
+ */
+ if ((sm_p->u.io.msgpair_completion_count == 0) &&
+ (sm_p->u.io.flow_completion_count == 0) &&
+ (sm_p->u.io.write_ack_completion_count == 0))
{
- /* decrement comp_ct until all operations have completed */
- sm_p->msgarray[0].comp_ct--;
+ gossip_err("*** error path and all operations are "
+ "complete\n");
- if (sm_p->msgarray[0].comp_ct == 0)
- {
- gossip_debug(GOSSIP_IO_DEBUG, "all msgpairs complete.\n");
- }
+ /* preserve this error code for io_analyze_results */
+ sm_p->u.io.stored_error_code = js_p->error_code;
+
+ js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
+ return 1;
}
- }
- if (matched_msgpair && (recv_match == -1))
- {
- gossip_debug(GOSSIP_IO_DEBUG,
- " matched send to %d; continuing.\n", i);
- return 0;
+ gossip_err("*** error path with %d msgpairs pending, %d flows "
+ "pending, %d write acks pending\n",
+ sm_p->u.io.msgpair_completion_count,
+ sm_p->u.io.flow_completion_count,
+ sm_p->u.io.write_ack_completion_count);
}
- /* if we matched a receive, then we need to decode the receive,
- * post the appropriate flow, and possibly post the receive of an ack
- * BMI message (if a write operation).
- *
- * we use the same index into the various arrays in the io part of the
- * state machine state that we used for this receive job (recv_match).
- */
- if (recv_match != -1)
- {
- struct PINT_decoded_msg decoded_resp;
- struct PVFS_server_resp *resp_p = NULL;
-
- gossip_debug(GOSSIP_IO_DEBUG, " matched response from %d.\n", i);
-
- ret = PINT_serv_decode_resp(
- msg_p->fs_id,
- msg_p->encoded_resp_p, &decoded_resp, &msg_p->svr_addr,
- msg_p->recv_status.actual_size, &resp_p);
- if (ret != 0)
- {
- gossip_lerr("io_datafile_complete_msgpairs: decode error\n");
- assert(0);
- }
-
- assert(resp_p->status <= 0);
- msg_p->op_status = resp_p->status;
-
- /* note: we saved the recv_status up above (before decoding) */
- if ((msg_p->recv_status.error_code != 0) ||
- (msg_p->op_status != 0))
+ assert(sm_p->msgarray_count == sm_p->u.io.datafile_count);
+ assert(sm_p->u.io.msgpair_completion_count > -1);
+ assert(sm_p->u.io.flow_completion_count > -1);
+ assert(sm_p->u.io.write_ack_completion_count > -1);
+
+ /* check if we're completing a send or recv msgpair */
+ if (STATUS_USER_TAG_IS_SEND_OR_RECV(status_user_tag))
+ {
+ assert(sm_p->u.io.msgpair_completion_count > -1);
+ /*
+ the completion count might be zero when recovering from a
+ cancellation
+ */
+ if (sm_p->u.io.msgpair_completion_count)
{
- if (msg_p->recv_status.error_code)
+ ret = complete_context_send_or_recv(sm_p, js_p);
+ if (ret < 0)
{
- PVFS_perror_gossip("io_datafile_complete_msgpairs",
- msg_p->recv_status.error_code);
+ PVFS_perror_gossip(
+ "complete_context_send_or_recv failed", ret);
+ js_p->error_code = ret;
+ return 1;
}
- gossip_debug(GOSSIP_IO_DEBUG, " error %d with status %d "
- "related to response from %d; not submitting "
- "flow.\n", msg_p->recv_status.error_code,
- msg_p->op_status, recv_match);
-
- if (msg_p->op_status)
+ else if (ret == 0)
{
- gossip_err("Error: op_status: %d, resetting to PVFS_EIO.\n",
- msg_p->op_status);
- js_p->error_code = -PVFS_EIO;
- return 1;
+ gossip_debug(GOSSIP_IO_DEBUG, " matched send in context "
+ "%d; continuing.\n", index);
+ js_p->error_code = 0;
+ return 0;
}
+ assert(ret == IO_RECV_COMPLETED);
- ret = PINT_serv_free_msgpair_resources(
- &msg_p->encoded_req, msg_p->encoded_resp_p,
- &decoded_resp, &msg_p->svr_addr, msg_p->max_resp_sz);
- assert(ret == 0);
-
- return 0;
+ matched_send_or_recv = 1;
}
- else
- {
- flow_descriptor *fl_p = NULL;
- struct PINT_client_io_sm *iosm_p = &sm_p->u.io;
-
- gossip_debug(GOSSIP_IO_DEBUG, " building flow for %d.\n",
- recv_match);
+ }
- /* find matching flow descriptor */
- fl_p = &sm_p->u.io.flow_array[recv_match];
+ /* if we've just completed a recv above, post the flow here */
+ if (ret == IO_RECV_COMPLETED)
+ {
+ ret = process_context_recv_and_post_flow(sm_p, js_p);
+ if (ret < 0)
+ {
+ char buf[64] = {0};
+ PVFS_strerror_r(ret, buf, 64);
- /* fill in file_data structure */
- fl_p->file_data.fsize = resp_p->u.io.bstream_size;
- fl_p->file_data.dist = iosm_p->dist_p;
- fl_p->file_data.server_nr =
- iosm_p->datafile_index_array[recv_match];
- fl_p->file_data.server_ct = iosm_p->orig_datafile_count;
-
- /* this is the file datatype */
- fl_p->file_req = iosm_p->file_req;
- fl_p->file_req_offset = iosm_p->file_req_offset;
+ gossip_err("process_context_recv_and_post_flow "
+ "failed: %s (%d remaining msgpairs)\n", buf,
+ sm_p->u.io.msgpair_completion_count);
- /* this is the memory datatype */
- fl_p->mem_req = iosm_p->mem_req;
+ js_p->error_code = ret;
+ return 1;
+ }
+ }
+ /* check if we've completed all msgpairs and posted all flows */
+ if (matched_send_or_recv)
+ {
+ if (sm_p->u.io.msgpair_completion_count == 0)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, "*** all msgpairs complete "
+ "(all flows posted)\n");
+ }
+ else
+ {
gossip_debug(
- GOSSIP_IO_DEBUG, " bstream_size = %Ld, datafile_nr = "
- "%d, datafile_ct = %d, file_req_off = %Ld\n",
- Ld(fl_p->file_data.fsize), fl_p->file_data.server_nr,
- fl_p->file_data.server_ct, Ld(fl_p->file_req_offset));
-
- /* session tag, same as the one used in the send/recv */
- fl_p->tag = iosm_p->session_tag_array[recv_match];
-
- /* this user_ptr should be left alone */
- fl_p->user_ptr = NULL;
-
- fl_p->type = iosm_p->flowproto_type;
-
- /* done with msgpair resources; free them now */
- ret = PINT_serv_free_msgpair_resources(
- &msg_p->encoded_req, msg_p->encoded_resp_p,
- &decoded_resp, &msg_p->svr_addr, msg_p->max_resp_sz);
- assert(ret == 0);
-
- resp_p = NULL;
+ GOSSIP_IO_DEBUG, "*** %d msgpairs completions "
+ "pending\n", sm_p->u.io.msgpair_completion_count);
+ }
+ return 0;
+ }
- if (iosm_p->io_type == PVFS_IO_READ)
- {
- /* Set up read-specific values
- * - don't keep going past EOF
- * - data is coming from BMI and going into memory
- * - data is coming from the same server we got the recv from
- */
- fl_p->file_data.extend_flag = 0;
- fl_p->src.endpoint_id = BMI_ENDPOINT;
- fl_p->src.u.bmi.address = msg_p->svr_addr;
- fl_p->dest.endpoint_id = MEM_ENDPOINT;
- fl_p->dest.u.mem.buffer = iosm_p->buffer;
- }
- else
- {
- assert (iosm_p->io_type == PVFS_IO_WRITE);
+ /* at this point, we're either completing a flow or a write ack */
+ if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FLOW))
+ {
+ assert(sm_p->u.io.flow_completion_count);
- /* do allow writes to extend the file */
- fl_p->file_data.extend_flag = 1;
- fl_p->src.endpoint_id = MEM_ENDPOINT;
- fl_p->src.u.mem.buffer = iosm_p->buffer;
- fl_p->dest.endpoint_id = BMI_ENDPOINT;
- fl_p->dest.u.bmi.address = msg_p->svr_addr;
-
- gossip_debug(GOSSIP_IO_DEBUG, " preposting write "
- "ack for %d.\n", recv_match);
-
- /* prepost the ack receive now */
- iosm_p->ackarray[recv_match].max_resp_sz =
- PINT_encode_calc_max_size(
- PINT_ENCODE_RESP, PVFS_SERV_WRITE_COMPLETION,
- iosm_p->encoding);
- iosm_p->ackarray[recv_match].encoded_resp_p =
- BMI_memalloc(
- msg_p->svr_addr,
- iosm_p->ackarray[recv_match].max_resp_sz,
- BMI_RECV);
- assert(iosm_p->ackarray[recv_match].encoded_resp_p);
-
- ret = job_bmi_recv(
- msg_p->svr_addr,
- iosm_p->ackarray[recv_match].encoded_resp_p,
- iosm_p->ackarray[recv_match].max_resp_sz,
- iosm_p->session_tag_array[recv_match],
- BMI_PRE_ALLOC,
- sm_p,
- (sm_p->msgarray_count*3 + recv_match),
- &iosm_p->ackarray[recv_match].recv_status,
- &iosm_p->ackarray[recv_match].recv_id,
- pint_client_sm_context,
- PVFS2_CLIENT_JOB_TIMEOUT);
+ index = STATUS_USER_TAG_GET_INDEX(
+ status_user_tag, IO_SM_PHASE_FLOW);
+ cur_ctx = &sm_p->u.io.contexts[index];
+ assert(cur_ctx);
- if (ret < 0)
- {
- gossip_lerr("post of write ack failed\n");
- assert(0);
- }
- assert(ret == 0);
-
- iosm_p->ack_comp_ct++;
- }
+ cur_ctx->flow_status = *js_p;
- /* post the flow:
- * - pass sm_p in as user pointer, so we get called again
- * - have status stored in flow status array, if we complete
- * immed.
- * - save id in flow id array
- * - use recv_match as index for all this
- */
- ret = job_flow(
- fl_p, sm_p, (sm_p->msgarray_count*2 + recv_match),
- &iosm_p->flow_status_array[recv_match], &tmp_id,
- pint_client_sm_context, PVFS2_CLIENT_JOB_TIMEOUT);
+ gossip_debug(GOSSIP_IO_DEBUG, " matched completed flow for "
+ "context %p\n", cur_ctx);
- if (ret < 0)
- {
- gossip_lerr("post of flow failed.\n");
- assert(0);
- }
- else if (ret == 1)
- {
- /* flow completed immediately */
- gossip_debug(GOSSIP_IO_DEBUG, " flow for %d completed "
- "immediately!\n", recv_match);
- assert(!iosm_p->flow_status_array[recv_match].error_code);
- }
- else
- {
- assert(ret == 0);
+ cur_ctx->flow_in_progress = 0;
+ sm_p->u.io.flow_completion_count--;
+ assert(sm_p->u.io.flow_completion_count > -1);
+ }
+ else if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FINAL_ACK))
+ {
+ assert(sm_p->u.io.write_ack_completion_count);
- gossip_debug(GOSSIP_IO_DEBUG, " posted flow for %d.\n",
- recv_match);
+ index = STATUS_USER_TAG_GET_INDEX(
+ status_user_tag, IO_SM_PHASE_FINAL_ACK);
+ cur_ctx = &sm_p->u.io.contexts[index];
+ assert(cur_ctx);
- /* record that we have a flow that still needs to complete */
- iosm_p->flow_comp_ct++;
- }
- }
- return 0;
- }
+ assert(cur_ctx->write_ack.recv_status.actual_size <=
+ cur_ctx->write_ack.max_resp_sz);
- /* check to ensure we caught all exits above */
- assert(matched_msgpair == 0);
+ cur_ctx->write_ack.recv_id = 0;
+ cur_ctx->write_ack.recv_status = *js_p;
- /* if we didn't match to a message pair, then we completed either
- * a flow or a write ack
- */
- gossip_debug(GOSSIP_IO_DEBUG, " trying to match to a flow or ack.\n");
-
- if ((js_p->status_user_tag >= (sm_p->msgarray_count*2)) &&
- (js_p->status_user_tag < (sm_p->msgarray_count*3)))
- {
- /* the third set of N user tags correspond to flows */
- matched_flow = 1;
- i = js_p->status_user_tag - (sm_p->msgarray_count*2);
- sm_p->u.io.flow_status_array[js_p->status_user_tag -
- (sm_p->msgarray_count*2)] = *js_p;
- sm_p->u.io.flow_comp_ct--;
- gossip_debug(GOSSIP_IO_DEBUG,
- " matched completed flow for %d\n", i);
+ gossip_debug(GOSSIP_IO_DEBUG, " matched completed ack for "
+ "context %p\n", cur_ctx);
- assert(sm_p->u.io.flow_comp_ct >= 0);
+ cur_ctx->write_ack_in_progress = 0;
+ sm_p->u.io.write_ack_completion_count--;
+ assert(sm_p->u.io.write_ack_completion_count > -1);
}
- else if (js_p->status_user_tag >= (sm_p->msgarray_count*3))
- {
- /* the fourth set of N user tags correspond to final acks */
- matched_ack = 1;
- i = js_p->status_user_tag - (sm_p->msgarray_count*3);
- sm_p->u.io.ackarray[i].recv_id = 0;
- sm_p->u.io.ackarray[i].recv_status = *js_p;
- assert(sm_p->u.io.ackarray[i].recv_status.actual_size <=
- sm_p->u.io.ackarray[i].max_resp_sz);
- sm_p->u.io.ack_comp_ct--;
- gossip_debug(GOSSIP_IO_DEBUG,
- " matched completed ack for %d\n", i);
- assert(sm_p->u.io.ack_comp_ct >= 0);
- }
-
- if ((sm_p->u.io.flow_comp_ct == 0) &&
- (sm_p->u.io.ack_comp_ct == 0) &&
- (sm_p->msgarray[0].comp_ct == 0))
- {
- gossip_debug(GOSSIP_IO_DEBUG,
- " all msgpairs, flows, ack completed.\n");
+ js_p->error_code = 0;
+
+ if ((sm_p->u.io.msgpair_completion_count == 0) &&
+ (sm_p->u.io.flow_completion_count == 0) &&
+ (sm_p->u.io.write_ack_completion_count == 0))
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, "*** all operations %s "
+ "(msgspairs, flows, and write acks)\n",
+ (sm_p->op_cancelled ? "cancelled" : "completed"));
js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
return 1;
}
- /* there's still something left to transfer */
- return 0;
-}
-
-static int io_object_getattr_failure(PINT_client_sm *sm_p,
- job_status_s *js_p)
-{
- gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
- "io_object_getattr_failure\n", sm_p);
-
- /*
- NOTE: this can easily happen if we're doing I/O on a file that
- was removed by another process
- */
- if (js_p->error_code == 0)
+ if (sm_p->op_cancelled)
{
- js_p->error_code = -PVFS_ENOENT;
+ gossip_debug(GOSSIP_IO_DEBUG, "detected I/O cancellation with "
+ "%d flow(s) and %d write ack(s) pending\n",
+ sm_p->u.io.flow_completion_count,
+ sm_p->u.io.write_ack_completion_count);
}
-
- sm_p->u.io.stored_error_code = js_p->error_code;
- return 1;
+ else
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, " %d flows pending, %d write acks "
+ "pending\n", sm_p->u.io.flow_completion_count,
+ sm_p->u.io.write_ack_completion_count);
+ }
+ return 0;
}
static int io_analyze_results(PINT_client_sm *sm_p,
job_status_s *js_p)
{
- int i = 0, ret = -PVFS_EINVAL, error = 0, error_count = 0;
+ int ret = -PVFS_EINVAL, i = 0;
PVFS_size total_size = 0;
- struct PINT_client_io_sm *iosm_p = &sm_p->u.io;
gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
- "analyze_results\n", sm_p);
+ "io_analyze_results\n", sm_p);
if (js_p->error_code != IO_DATAFILE_TRANSFERS_COMPLETE)
{
- /* some sort of error occurred early on */
- js_p->error_code = (sm_p->u.io.stored_error_code ?
- sm_p->u.io.stored_error_code :
- js_p->error_code);
- if (js_p->error_code == 0)
+ ret = (sm_p->u.io.stored_error_code ?
+ sm_p->u.io.stored_error_code :
+ js_p->error_code);
+
+ if (ret == 0)
{
- gossip_err("Error: lost error_code; resetting to PVFS_EIO.\n");
- js_p->error_code = -PVFS_EIO;
+ ret = (sm_p->op_cancelled ? -PVFS_ECANCEL : -PVFS_EIO);
}
- error = js_p->error_code;
}
- else
+ else if (!sm_p->op_cancelled)
{
- /* it's possible that some error occurred still. look through
- * all the messages, reporting errors, saving the first one to
- * return, and adding up the size of the transfer (just in
- * case things actually completed).
- */
+ /*
+ look through all the contexts for errors, saving the first
+ one to return (if any) while adding up the size of the
+ transfer (in case things actually completed).
+ */
+ assert(sm_p->msgarray_count == sm_p->u.io.datafile_count);
for(i = 0; i < sm_p->msgarray_count; i++)
{
- PINT_client_sm_msgpair_state *msg_p = &sm_p->msgarray[i];
+ PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
+ assert(cur_ctx);
- if (msg_p->send_status.error_code != 0)
+ ret = check_context_status(
+ cur_ctx, sm_p->u.io.io_type, &total_size);
+ if (ret < 0)
{
- gossip_debug(GOSSIP_IO_DEBUG,
- " error (%d) in msgpair send for %d.\n",
- msg_p->send_status.error_code, i);
- error_count++;
- if (error == 0)
+ if (ret == -PVFS_ECANCEL)
{
- error = msg_p->send_status.error_code;
+ gossip_debug(GOSSIP_IO_DEBUG, "*** I/O operation "
+ "cancelled\n");
}
- }
- else if (msg_p->recv_status.error_code != 0)
- {
- gossip_debug(GOSSIP_IO_DEBUG,
- " error (%d) in msgpair recv for %d.\n",
- msg_p->recv_status.error_code, i);
- error_count++;
- if (error == 0)
+ else
{
- error = msg_p->recv_status.error_code;
+ PVFS_perror_gossip(
+ "check_context_status found error", ret);
}
+ break;
}
- else if (iosm_p->flow_array[i].file_req != NULL)
- {
- /* NOTE: this is a little bit of a hack to determine if we
- * have actually used this flow descriptor; we don't
- * really care what the file_req is
- */
- if (iosm_p->flow_status_array[i].error_code != 0)
- {
- gossip_debug(
- GOSSIP_IO_DEBUG, " error (%d) in flow for %d.\n",
- iosm_p->flow_status_array[i].error_code, i);
- error_count++;
- if (error == 0)
- {
- error = iosm_p->flow_status_array[i].error_code;
- }
- }
- else if (iosm_p->io_type == PVFS_IO_READ)
- {
- /* size for reads is reported in the flow;
- * size for writes is reported in the final ack.
- */
- gossip_debug(
- GOSSIP_IO_DEBUG, " %Ld bytes read from %d.\n",
- Ld(iosm_p->flow_array[i].total_transfered), i);
- total_size += iosm_p->flow_array[i].total_transfered;
- }
- else if (iosm_p->io_type == PVFS_IO_WRITE)
- {
- if (iosm_p->ackarray[i].recv_status.error_code != 0)
- {
- gossip_debug(
- GOSSIP_IO_DEBUG,
- " error (%d) in write ack for %d.\n",
- iosm_p->ackarray[i].recv_status.error_code,
- i);
- error_count++;
- if (error == 0)
- {
- error = iosm_p->ackarray[i].recv_status.error_code;
- }
- }
- else
- {
- struct PINT_decoded_msg decoded_resp;
- struct PVFS_server_resp *resp_p = NULL;
-
- /* we need to decode the write ack so we can
- * see how much data we were actually able to
- * write.
- *
- * note: we just use the svr_addr from the
- * msgpair; it's the same address.
- */
- ret = PINT_serv_decode_resp(
- msg_p->fs_id,
- iosm_p->ackarray[i].encoded_resp_p,
- &decoded_resp,
- &msg_p->svr_addr,
- iosm_p->ackarray[i].recv_status.actual_size,
- &resp_p);
- assert(ret == 0);
-
- gossip_debug(
- GOSSIP_IO_DEBUG, "%Ld bytes written to %d.\n",
- Ld(resp_p->u.write_completion.total_completed),
- i);
- total_size +=
- resp_p->u.write_completion.total_completed;
- PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
- }
- }
+ gossip_debug(GOSSIP_IO_DEBUG, "[%d/%d] running size is %Ld\n",
+ (i + 1), sm_p->msgarray_count, Ld(total_size));
+ }
- /* free resources associated with this flow array
- * element: - write ack, if it was a write (we decoded
- * it if we needed to above, and got rid of that
- * already)
- */
- PINT_flow_reset(&sm_p->u.io.flow_array[i]);
- if (iosm_p->io_type == PVFS_IO_WRITE)
- {
- BMI_memfree(msg_p->svr_addr,
- iosm_p->ackarray[i].encoded_resp_p,
- iosm_p->ackarray[i].max_resp_sz,
- BMI_RECV);
- }
- }
+ /*
+ at this point, we know an error occurred. if we couldn't
+ find any errors in the context, use the preserved error code
+ from the complete_operations state
+ */
+ if (ret == 0)
+ {
+ char buf[64] = {0};
+
+ ret = (sm_p->op_cancelled ? -PVFS_ECANCEL :
+ sm_p->u.io.stored_error_code);
+
+ PVFS_strerror_r(ret, buf, 64);
+ gossip_debug(GOSSIP_IO_DEBUG, "no context errors found; "
+ "using: %s\n", buf);
}
}
-
- if (!sm_p->acache_hit)
+ else
{
- free(sm_p->u.io.datafile_handles);
- sm_p->u.io.datafile_handles = NULL;
+ ret = (sm_p->op_cancelled ? -PVFS_ECANCEL : -PVFS_EIO);
}
+ /* be sure there are no jobs still laying around */
+ assert((sm_p->u.io.msgpair_completion_count == 0) &&
+ (sm_p->u.io.flow_completion_count == 0) &&
+ (sm_p->u.io.write_ack_completion_count == 0));
+
/*
FIXME: non bmi errors pop out in flow failures above -- they are
not properly marked as flow errors either, so we check for them
explicitly here (but not all -- fix it for real).
*/
- if (((PVFS_ERROR_CLASS(-error) == PVFS_ERROR_BMI) ||
- (PVFS_ERROR_CLASS(-error) == PVFS_ERROR_FLOW) ||
- (error == -ECONNRESET)) &&
+ if (((PVFS_ERROR_CLASS(-ret) == PVFS_ERROR_BMI) ||
+ (PVFS_ERROR_CLASS(-ret) == PVFS_ERROR_FLOW) ||
+ (ret == -ECONNRESET) || (ret == -PVFS_EPROTO)) &&
(sm_p->u.io.retry_count < PVFS2_CLIENT_RETRY_LIMIT))
{
if (sm_p->acache_hit || sm_p->pinode)
@@ -1255,6 +1013,8 @@ static int io_analyze_results(PINT_clien
sm_p->pinode = NULL;
}
+ assert(!sm_p->op_cancelled);
+
sm_p->u.io.stored_error_code = 0;
sm_p->u.io.retry_count++;
@@ -1265,44 +1025,483 @@ static int io_analyze_results(PINT_clien
return 1;
}
- CLEAN_PRIVATE_MEMBERS(iosm_p);
+ gossip_debug(GOSSIP_IO_DEBUG, "total bytes transferred is %Ld\n",
+ Ld(total_size));
- /* return size, error, and set operation as complete */
sm_p->u.io.io_resp_p->total_completed = total_size;
- sm_p->error_code = (sm_p->u.io.stored_error_code ?
- sm_p->u.io.stored_error_code : error);
- sm_p->op_complete = 1;
+ js_p->error_code = ret;
+
+ return 1;
+}
+
+static int io_cleanup(PINT_client_sm *sm_p,
+ job_status_s *js_p)
+{
+ gossip_debug(GOSSIP_CLIENT_DEBUG,
+ "(%p) io state: io_cleanup\n", sm_p);
+
+ CLEAN_PRIVATE_MEMBERS(sm_p);
+
+ sm_p->error_code = js_p->error_code;
+ if (sm_p->error_code)
+ {
+ char buf[64] = {0};
+
+ PVFS_strerror_r(sm_p->error_code, buf, 64);
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "*** Final I/O operation error is %s\n", buf);
+ }
+
+ sm_p->op_complete = 1;
return 0;
}
/********************************************************************/
-/* io_find_target_datafiles()
- *
- * determines what subset of the datafiles actually contain data that
- * we are interested in for this request
- *
- * returns 0 on success, -pvfs_error on failure
- *
- */
-static int io_find_target_datafiles(PVFS_Request mem_req,
- PVFS_Request file_req,
- PVFS_offset file_req_offset,
- PINT_dist *dist_p,
- PVFS_handle *input_handle_array,
- int input_handle_count,
- PVFS_handle *output_handle_array,
- int* handle_index_array,
- int *handle_count_out_p)
+/*
+ returns 0 on send completion; IO_RECV_COMPLETED on recv completion,
+ and -PVFS_EINVAL if the user tag is not a msgpair send/recv tag
+*/
+static inline int complete_context_send_or_recv(
+ PINT_client_sm *sm_p,
+ job_status_s *js_p)
{
- int i, ret;
+ int ret = -PVFS_EINVAL, index = 0;
+ unsigned long status_user_tag = 0;
+ PINT_client_io_ctx *cur_ctx = NULL;
+ PINT_client_sm_msgpair_state *msg = NULL;
+
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "- complete_context_send_or_recv called\n");
+
+ assert(sm_p && js_p);
+
+ status_user_tag = (unsigned long)js_p->status_user_tag;
+
+ if (STATUS_USER_TAG_TYPE(
+ status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV))
+ {
+ index = STATUS_USER_TAG_GET_INDEX(
+ status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
+
+ gossip_debug(GOSSIP_IO_DEBUG, "got a recv completion with "
+ "context index %d\n", index);
+
+ cur_ctx = &sm_p->u.io.contexts[index];
+ assert(cur_ctx);
+
+ msg = &sm_p->msgarray[index];
+ msg->recv_id = 0;
+ msg->recv_status = *js_p;
+
+ assert(msg->recv_status.error_code <= 0);
+ assert(msg->recv_status.actual_size <= msg->max_resp_sz);
+
+ cur_ctx->msg_recv_in_progress = 0;
+ sm_p->u.io.msgpair_completion_count--;
+
+ ret = IO_RECV_COMPLETED;
+ }
+ else if (STATUS_USER_TAG_TYPE(
+ status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_SEND))
+ {
+ index = STATUS_USER_TAG_GET_INDEX(
+ status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_SEND);
+
+ gossip_debug(GOSSIP_IO_DEBUG, "got a send completion with "
+ "context index %d\n", index);
+
+ cur_ctx = &sm_p->u.io.contexts[index];
+ assert(cur_ctx);
+
+ msg = &sm_p->msgarray[index];
+ msg->send_id = 0;
+ msg->send_status = *js_p;
+
+ assert(msg->send_status.error_code <= 0);
+
+ cur_ctx->msg_send_in_progress = 0;
+ sm_p->u.io.msgpair_completion_count--;
+
+ ret = 0;
+ }
+ return ret;
+}
+
+static inline int process_context_recv(
+ PINT_client_io_ctx *cur_ctx,
+ struct PINT_decoded_msg *decoded_resp,
+ struct PVFS_server_resp **resp)
+{
+ int ret = -PVFS_EINVAL;
+
+ gossip_debug(GOSSIP_IO_DEBUG, "- process_context_recv called\n");
+
+ assert(cur_ctx && cur_ctx->msg && decoded_resp && resp);
+
+ ret = PINT_serv_decode_resp(
+ cur_ctx->msg->fs_id, cur_ctx->msg->encoded_resp_p, decoded_resp,
+ &cur_ctx->msg->svr_addr,
+ cur_ctx->msg->recv_status.actual_size, resp);
+
+ if (ret)
+ {
+ PVFS_perror("PINT_server_decode_resp failed", ret);
+ return ret;
+ }
+
+ assert((*resp)->status < 1);
+ cur_ctx->msg->op_status = (*resp)->status;
+
+ if (cur_ctx->msg->recv_status.error_code || cur_ctx->msg->op_status)
+ {
+ gossip_err(" error %d with status %d related to response "
+ "from context %p; not submitting flow.\n",
+ cur_ctx->msg->recv_status.error_code,
+ cur_ctx->msg->op_status, cur_ctx);
+
+ if (cur_ctx->msg->recv_status.error_code)
+ {
+ PVFS_perror_gossip(
+ "process_context_recv (recv_status.error_code)",
+ cur_ctx->msg->recv_status.error_code);
+ ret = cur_ctx->msg->recv_status.error_code;
+ }
+ else if (cur_ctx->msg->op_status)
+ {
+ PVFS_perror_gossip("process_context_recv (op_status)",
+ cur_ctx->msg->op_status);
+ ret = cur_ctx->msg->op_status;
+ }
+
+ PINT_serv_free_msgpair_resources(
+ &cur_ctx->msg->encoded_req, cur_ctx->msg->encoded_resp_p,
+ decoded_resp, &cur_ctx->msg->svr_addr,
+ cur_ctx->msg->max_resp_sz);
+ }
+ return ret;
+}
+
+static inline int build_context_flow(
+ PINT_client_sm *sm_p,
+ PINT_client_io_ctx *cur_ctx,
+ PVFS_object_attr *attr,
+ struct PVFS_server_resp *resp)
+{
+ gossip_debug(GOSSIP_IO_DEBUG, "- build_context_flow called\n");
+
+ if (!sm_p || !cur_ctx || !attr || !resp)
+ {
+ return -PVFS_EINVAL;
+ }
+
+ cur_ctx->flow_desc.file_data.fsize = resp->u.io.bstream_size;
+ cur_ctx->flow_desc.file_data.dist = attr->u.meta.dist;
+ cur_ctx->flow_desc.file_data.server_nr = cur_ctx->server_nr;
+ cur_ctx->flow_desc.file_data.server_ct = attr->u.meta.dfile_count;
+
+ cur_ctx->flow_desc.file_req = sm_p->u.io.file_req;
+ cur_ctx->flow_desc.file_req_offset = sm_p->u.io.file_req_offset;
+
+ cur_ctx->flow_desc.mem_req = sm_p->u.io.mem_req;
+
+ gossip_debug(
+ GOSSIP_IO_DEBUG, " bstream_size = %Ld, datafile_nr = "
+ "%d, datafile_ct = %d, file_req_off = %Ld\n",
+ Ld(cur_ctx->flow_desc.file_data.fsize),
+ cur_ctx->flow_desc.file_data.server_nr,
+ cur_ctx->flow_desc.file_data.server_ct,
+ Ld(cur_ctx->flow_desc.file_req_offset));
+
+ cur_ctx->flow_desc.tag = cur_ctx->session_tag;
+ cur_ctx->flow_desc.type = sm_p->u.io.flowproto_type;
+ cur_ctx->flow_desc.user_ptr = NULL;
+
+ if (sm_p->u.io.io_type == PVFS_IO_READ)
+ {
+ cur_ctx->flow_desc.file_data.extend_flag = 0;
+ cur_ctx->flow_desc.src.endpoint_id = BMI_ENDPOINT;
+ cur_ctx->flow_desc.src.u.bmi.address = cur_ctx->msg->svr_addr;
+ cur_ctx->flow_desc.dest.endpoint_id = MEM_ENDPOINT;
+ cur_ctx->flow_desc.dest.u.mem.buffer = sm_p->u.io.buffer;
+ }
+ else
+ {
+ assert(sm_p->u.io.io_type == PVFS_IO_WRITE);
+
+ cur_ctx->flow_desc.file_data.extend_flag = 1;
+ cur_ctx->flow_desc.src.endpoint_id = MEM_ENDPOINT;
+ cur_ctx->flow_desc.src.u.mem.buffer = sm_p->u.io.buffer;
+ cur_ctx->flow_desc.dest.endpoint_id = BMI_ENDPOINT;
+ cur_ctx->flow_desc.dest.u.bmi.address = cur_ctx->msg->svr_addr;
+ }
+ return 0;
+}
+
+static inline int process_context_recv_and_post_flow(
+ PINT_client_sm *sm_p,
+ job_status_s *js_p)
+{
+ int ret = -PVFS_EINVAL, index = 0;
+ unsigned long status_user_tag = 0;
+ struct PINT_decoded_msg decoded_resp;
+ struct PVFS_server_resp *resp = NULL;
+ PVFS_object_attr *attr = NULL;
+ PINT_client_io_ctx *cur_ctx = NULL;
+
+ gossip_debug(GOSSIP_IO_DEBUG,
+ "- process_context_recv_and_post_flow called\n");
+
+ assert(sm_p && js_p);
+ status_user_tag = (unsigned long)js_p->status_user_tag;
+ /* read the tag before checking it; only recv completions belong here */
+ assert(STATUS_USER_TAG_TYPE(
+ status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV));
+
+ index = STATUS_USER_TAG_GET_INDEX(
+ status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
+
+ cur_ctx = &sm_p->u.io.contexts[index];
+ assert(cur_ctx && cur_ctx->msg);
+
+ if (js_p->error_code)
+ {
+ PVFS_perror_gossip("pre-process_context_recv failed",
+ js_p->error_code);
+ return js_p->error_code;
+ }
+
+ ret = process_context_recv(cur_ctx, &decoded_resp, &resp);
+ if (ret)
+ {
+ PVFS_perror_gossip("process_context_recv failed", ret);
+ return ret;
+ }
+
+ attr = (sm_p->acache_hit ? &sm_p->pinode->attr : &sm_p->acache_attr);
+ assert(attr);
+
+ ret = build_context_flow(sm_p, cur_ctx, attr, resp);
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("build_context_flow failed", ret);
+ return ret;
+ }
+
+ ret = PINT_serv_free_msgpair_resources(
+ &cur_ctx->msg->encoded_req, cur_ctx->msg->encoded_resp_p,
+ &decoded_resp, &cur_ctx->msg->svr_addr,
+ cur_ctx->msg->max_resp_sz);
+
+ if (ret)
+ {
+ PVFS_perror_gossip("PINT_serv_free_msgpair_resources "
+ "failed", ret);
+ return ret;
+ }
+
+ if (sm_p->u.io.io_type == PVFS_IO_WRITE)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, " preposting write "
+ "ack for context %p.\n", cur_ctx);
+
+ cur_ctx->write_ack.max_resp_sz = PINT_encode_calc_max_size(
+ PINT_ENCODE_RESP, PVFS_SERV_WRITE_COMPLETION,
+ sm_p->u.io.encoding);
+ cur_ctx->write_ack.encoded_resp_p = BMI_memalloc(
+ cur_ctx->msg->svr_addr, cur_ctx->write_ack.max_resp_sz,
+ BMI_RECV);
+
+ if (!cur_ctx->write_ack.encoded_resp_p)
+ {
+ gossip_err("BMI_memalloc (for write ack) failed\n");
+ return -PVFS_ENOMEM;
+ }
+
+ /*
+ we're pre-posting the final write ack here, even though it's
+ ahead of the flow phase; reads are at the flow phase.
+ */
+ status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FINAL_ACK);
+
+ ret = job_bmi_recv(
+ cur_ctx->msg->svr_addr, cur_ctx->write_ack.encoded_resp_p,
+ cur_ctx->write_ack.max_resp_sz, cur_ctx->session_tag,
+ BMI_PRE_ALLOC, sm_p, status_user_tag,
+ &cur_ctx->write_ack.recv_status, &cur_ctx->write_ack.recv_id,
+ pint_client_sm_context, PVFS2_CLIENT_JOB_TIMEOUT);
+ if (ret < 0)
+ {
+ gossip_err("job_bmi_recv (write ack) failed\n");
+ BMI_memfree(cur_ctx->msg->svr_addr, cur_ctx->write_ack.encoded_resp_p,
+ cur_ctx->write_ack.max_resp_sz, BMI_RECV);
+ return ret;
+ }
+ assert(ret == 0);
+ cur_ctx->write_ack_has_been_posted = 1;
+ cur_ctx->write_ack_in_progress = 1;
+ sm_p->u.io.write_ack_completion_count++;
+ }
+
+ status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);
+
+ ret = job_flow(
+ &cur_ctx->flow_desc, sm_p, status_user_tag,
+ &cur_ctx->flow_status, &cur_ctx->flow_job_id,
+ pint_client_sm_context, PVFS2_CLIENT_JOB_TIMEOUT);
+
+ if (ret < 0)
+ {
+ gossip_err("job_flow failed\n");
+ return ret;
+ }
+ else if (ret == 1)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, " flow for context %p "
+ "completed immediately\n", cur_ctx);
+ assert(cur_ctx->flow_status.error_code == 0);
+ }
+ else
+ {
+ gossip_debug(GOSSIP_IO_DEBUG, " posted flow for "
+ "context %p\n", cur_ctx);
+
+ cur_ctx->flow_has_been_posted = 1;
+ cur_ctx->flow_in_progress = 1;
+ sm_p->u.io.flow_completion_count++;
+ }
+ return ret;
+}
+
+static inline int check_context_status(
+ PINT_client_io_ctx *cur_ctx,
+ int io_type,
+ PVFS_size *total_size)
+{
+ int ret = 0;
+
+ gossip_debug(GOSSIP_IO_DEBUG, "- check_context_status called\n");
+
+ assert(cur_ctx && cur_ctx->msg && total_size);
+
+ if (cur_ctx->msg->send_status.error_code)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG,
+ " error (%d) in msgpair send for context %p\n",
+ cur_ctx->msg->send_status.error_code, cur_ctx);
+
+ assert(cur_ctx->msg_send_has_been_posted);
+ ret = cur_ctx->msg->send_status.error_code;
+ }
+ else if (cur_ctx->msg->recv_status.error_code)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG,
+ " error (%d) in msgpair recv for context %p\n",
+ cur_ctx->msg->recv_status.error_code, cur_ctx);
+
+ assert(cur_ctx->msg_recv_has_been_posted);
+ ret = cur_ctx->msg->recv_status.error_code;
+ }
+ else if (cur_ctx->flow_status.error_code)
+ {
+ gossip_debug(GOSSIP_IO_DEBUG,
+ " error (%d) in flow for context %p\n",
+ cur_ctx->flow_status.error_code, cur_ctx);
+
+ assert(cur_ctx->flow_has_been_posted);
+ PINT_flow_reset(&cur_ctx->flow_desc);
+ ret = cur_ctx->flow_status.error_code;
+ }
+ else if (io_type == PVFS_IO_READ)
+ {
+ gossip_debug(
+ GOSSIP_IO_DEBUG, " %Ld bytes read from context %p\n",
+ Ld(cur_ctx->flow_desc.total_transfered), cur_ctx);
+
+ /* size for reads are reported in the flow */
+ *total_size += cur_ctx->flow_desc.total_transfered;
+
+ PINT_flow_reset(&cur_ctx->flow_desc);
+ }
+ else if (io_type == PVFS_IO_WRITE)
+ {
+ if (cur_ctx->write_ack.recv_status.error_code)
+ {
+ gossip_debug(
+ GOSSIP_IO_DEBUG,
+ " error (%d) in final ack for context %p\n",
+ cur_ctx->write_ack.recv_status.error_code, cur_ctx);
+
+ assert(cur_ctx->write_ack_has_been_posted);
+ ret = cur_ctx->write_ack.recv_status.error_code;
+ }
+ else if (cur_ctx->write_ack_has_been_posted)
+ {
+ struct PINT_decoded_msg decoded_resp;
+ struct PVFS_server_resp *resp = NULL;
+ /*
+ size for writes are reported in the final ack, but we
+ have to decode it first
+ */
+ ret = PINT_serv_decode_resp(
+ cur_ctx->msg->fs_id, cur_ctx->write_ack.encoded_resp_p,
+ &decoded_resp, &cur_ctx->msg->svr_addr,
+ cur_ctx->write_ack.recv_status.actual_size, &resp);
+ if (ret == 0)
+ {
+ gossip_debug(
+ GOSSIP_IO_DEBUG,
+ " %Ld bytes written to context %p\n",
+ Ld(resp->u.write_completion.total_completed),
+ cur_ctx);
+
+ *total_size += resp->u.write_completion.total_completed;
+
+ PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
+ }
+ else
+ {
+ PVFS_perror_gossip("PINT_serv_decode_resp failed", ret);
+ }
+
+ PINT_flow_reset(&cur_ctx->flow_desc);
+ BMI_memfree(cur_ctx->msg->svr_addr,
+ cur_ctx->write_ack.encoded_resp_p,
+ cur_ctx->write_ack.max_resp_sz, BMI_RECV);
+ }
+ }
+ return ret;
+}
+
+/*
+ determines what subset of the datafiles actually contain data that
+ we are interested in for this request. returns 0 on success,
+ -PVFS_error on failure
+*/
+static int io_find_target_datafiles(
+ PVFS_Request mem_req,
+ PVFS_Request file_req,
+ PVFS_offset file_req_offset,
+ PINT_dist *dist_p,
+ PVFS_handle *input_handle_array,
+ int input_handle_count,
+ int *handle_index_array,
+ int *handle_index_out_count)
+{
+ int ret = -PVFS_EINVAL, i = 0;
struct PINT_Request_state *req_state = NULL;
struct PINT_Request_state *mem_req_state = NULL;
PINT_Request_file_data tmp_file_data;
PINT_Request_result tmp_result;
- *handle_count_out_p = 0;
+ gossip_debug(GOSSIP_IO_DEBUG, "- io_find_target_datafiles called\n");
+
+ if (!handle_index_array || !handle_index_out_count)
+ {
+ return ret;
+ }
+ *handle_index_out_count = 0;
req_state = PINT_New_request_state(file_req);
if (!req_state)
@@ -1358,18 +1557,16 @@ static int io_find_target_datafiles(PVFS
return ret;
}
- /* did we find that any data belongs to this handle? */
+ /* check if we found data that belongs to this handle */
if (tmp_result.bytes != 0)
{
assert(tmp_result.bytes > 0);
- gossip_debug(GOSSIP_IO_DEBUG, "io_find_target_dfiles: "
- "datafile %d might have some data.\n", i);
+ handle_index_array[(*handle_index_out_count)++] = i;
- output_handle_array[*handle_count_out_p] =
- input_handle_array[i];
- handle_index_array[*handle_count_out_p] = i;
- (*handle_count_out_p)++;
+ gossip_debug(GOSSIP_IO_DEBUG, "io_find_target_dfiles: "
+ "datafile %d might have some data (out_count "
+ "is %d).\n", i, *handle_index_out_count);
}
}
PINT_Free_request_state(req_state);
Index: sys-lookup.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-lookup.sm,v
diff -p -u -r1.36 -r1.37
--- sys-lookup.sm 21 May 2004 17:22:38 -0000 1.36
+++ sys-lookup.sm 8 Jul 2004 16:17:06 -0000 1.37
@@ -1159,7 +1159,7 @@ static int lookup_segment_getattr_comp_f
gossip_debug(GOSSIP_CLIENT_DEBUG, " getattr failed with code %d\n",
resp_p->status);
return resp_p->status;
- }
+ }
cur_seg = GET_CURRENT_SEGMENT();
assert(cur_seg);
@@ -1171,7 +1171,6 @@ static int lookup_segment_getattr_comp_f
PINT_acache_object_attr_deep_copy(
&(cur_seg->seg_attr),
&(resp_p->u.getattr.attr));
-
return 0;
}
Index: sys-setattr.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-setattr.sm,v
diff -p -u -r1.17 -r1.18
--- sys-setattr.sm 21 May 2004 18:52:07 -0000 1.17
+++ sys-setattr.sm 8 Jul 2004 16:17:06 -0000 1.18
@@ -217,6 +217,9 @@ static int setattr_msg_setup_msgpair(PIN
msg_p = &sm_p->msgpair;
memset(msg_p, 0, sizeof(PINT_client_sm_msgpair_state));
+ sm_p->msgarray = msg_p;
+ sm_p->msgarray_count = 1;
+
/*
if this lookup wasn't a acache hit, we must have a local
copy stored in the sm object for later acache insertion
@@ -270,11 +273,6 @@ static int setattr_msg_setup_msgpair(PIN
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
-
- /* let 'msgpairarray' handle the 'msgpair' case */
- sm_p->msgarray = msg_p;
- sm_p->msgarray_count = 1;
-
return 1;
}
Index: sys-symlink.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-symlink.sm,v
diff -p -u -r1.29 -r1.30
--- sys-symlink.sm 21 May 2004 18:52:07 -0000 1.29
+++ sys-symlink.sm 8 Jul 2004 16:17:07 -0000 1.30
@@ -215,7 +215,7 @@ int PVFS_isys_symlink(
return -PVFS_ENAMETOOLONG;
}
- sm_p = (PINT_client_sm *) malloc(sizeof(*sm_p));
+ sm_p = (PINT_client_sm *)malloc(sizeof(*sm_p));
if (sm_p == NULL)
{
return -PVFS_ENOMEM;
More information about the PVFS2-CVS
mailing list