[PVFS2-CVS]
commit by pcarns in pvfs2/src/io/job: thread-mgr.c thread-mgr.h
CVS commit program
cvs at parl.clemson.edu
Thu Feb 12 15:23:56 EST 2004
Update of /projects/cvsroot/pvfs2/src/io/job
In directory parlweb:/tmp/cvs-serv10305/src/io/job
Modified Files:
thread-mgr.c thread-mgr.h
Log Message:
thread-mgr cancel hook for trove operations
Index: thread-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.c,v
diff -p -u -r1.11 -r1.12
--- thread-mgr.c 12 Feb 2004 19:02:07 -0000 1.11
+++ thread-mgr.c 12 Feb 2004 20:23:56 -0000 1.12
@@ -51,6 +51,7 @@ static pthread_t trove_thread_id;
static pthread_t dev_thread_id;
static pthread_cond_t bmi_test_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t trove_test_cond = PTHREAD_COND_INITIALIZER;
#endif /* __PVFS2_JOB_THREADED__ */
/* used to indicate that a bmi testcontext is in progress; we can't simply
@@ -61,6 +62,10 @@ static gen_mutex_t bmi_test_mutex = GEN_
static int bmi_test_flag = 0;
static int bmi_test_count = 0;
static int bmi_test_index = 0;
+static gen_mutex_t trove_test_mutex = GEN_MUTEX_INITIALIZER;
+static int trove_test_flag = 0;
+static int trove_test_count = 0;
+static int trove_test_index = 0;
static int bmi_thread_running = 0;
static int trove_thread_running = 0;
@@ -73,7 +78,6 @@ static int dev_thread_running = 0;
static void *trove_thread_function(void *ptr)
{
int ret = -1;
- int count;
int i=0;
struct PINT_thread_mgr_trove_callback *tmp_callback;
int timeout = thread_mgr_test_timeout;
@@ -82,11 +86,17 @@ static void *trove_thread_function(void
while (trove_thread_running)
#endif
{
- count = THREAD_MGR_TEST_COUNT;
+ /* indicate that a test is in progress */
+ gen_mutex_lock(&trove_test_mutex);
+ trove_test_flag = 1;
+ gen_mutex_unlock(&trove_test_mutex);
+
+ trove_test_count = THREAD_MGR_TEST_COUNT;
+ trove_test_index = 0;
#ifdef __PVFS2_TROVE_SUPPORT__
ret = trove_dspace_testcontext(HACK_fs_id,
stat_trove_id_array,
- &count,
+ &trove_test_count,
stat_trove_error_code_array,
stat_trove_user_ptr_array,
timeout,
@@ -97,6 +107,13 @@ static void *trove_thread_function(void
HACK_fs_id = 0;
assert(0);
#endif
+ gen_mutex_lock(&trove_test_mutex);
+ trove_test_flag = 0;
+#ifdef __PVFS2_JOB_THREADED__
+ pthread_cond_signal(&trove_test_cond);
+#endif
+ gen_mutex_unlock(&trove_test_mutex);
+
if(ret < 0)
{
/* critical error */
@@ -105,7 +122,7 @@ static void *trove_thread_function(void
gossip_lerr("Error: critical Trove failure.\n");
}
- for(i=0; i<count; i++)
+ for(i=0; i<trove_test_count; i++)
{
/* execute a callback function for each completed BMI operation */
tmp_callback =
@@ -113,7 +130,25 @@ static void *trove_thread_function(void
/* sanity check */
assert(tmp_callback != NULL);
assert(tmp_callback->fn != NULL);
+
+ /* yuck, another critical region; we can't execute callbacks
+ * while a cancel() is in progress, but we also can't hold a
+ * lock while executing the callback.
+ */
+ gen_mutex_lock(&trove_test_mutex);
+ trove_test_flag = 1;
+ gen_mutex_unlock(&trove_test_mutex);
+
+ trove_test_index++;
tmp_callback->fn(tmp_callback->data, stat_trove_error_code_array[i]);
+
+ gen_mutex_lock(&trove_test_mutex);
+ trove_test_flag = 0;
+#ifdef __PVFS2_JOB_THREADED__
+ pthread_cond_signal(&trove_test_cond);
+#endif
+ gen_mutex_unlock(&trove_test_mutex);
+
}
}
return (NULL);
@@ -576,6 +611,58 @@ int PINT_thread_mgr_trove_getcontext(PVF
gen_mutex_unlock(&trove_mutex);
return(-PVFS_EINVAL);
+}
+
+/* PINT_thread_mgr_trove_cancel()
+ *
+ * cancels a pending Trove operation for which the callback function has not
+ * yet been executed
+ *
+ * returns 0 on success, -PVFS_error on failure
+ */
+int PINT_thread_mgr_trove_cancel(PVFS_id_gen_t id, PVFS_fs_id fs_id,
+ void* user_ptr)
+{
+ int i;
+ int ret;
+
+ /* wait until we can guarantee that a trove_testcontext() is not in
+ * progress
+ */
+ gen_mutex_lock(&trove_test_mutex);
+ while(trove_test_flag == 1)
+ {
+#ifdef __PVFS2_JOB_THREADED__
+ pthread_cond_wait(&trove_test_cond, &trove_test_mutex);
+#else
+ /* this condition shouldn't be possible without threads */
+ assert(0);
+#endif
+ }
+
+ /* iterate down list of pending completions, to see if the caller is
+ * trying to cancel one of them
+ */
+ for(i=trove_test_index; i<trove_test_count; i++)
+ {
+ if(stat_trove_id_array[i] == id && stat_trove_user_ptr_array ==
+ user_ptr)
+ {
+ /* match; no steps needed to cancel, the op is already done */
+ gen_mutex_unlock(&trove_test_mutex);
+ return(0);
+ }
+ }
+
+ /* tell Trove to cancel the operation */
+#ifdef __PVFS2_TROVE_SUPPORT__
+ ret = trove_dspace_cancel(fs_id, id, global_trove_context);
+#else
+ ret = 0;
+ assert(0);
+#endif
+ gen_mutex_unlock(&trove_test_mutex);
+ return(ret);
}
/* PINT_thread_mgr_bmi_getcontext()
Index: thread-mgr.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.h,v
diff -p -u -r1.5 -r1.6
--- thread-mgr.h 12 Feb 2004 19:02:07 -0000 1.5
+++ thread-mgr.h 12 Feb 2004 20:23:56 -0000 1.6
@@ -37,6 +37,8 @@ struct PINT_thread_mgr_trove_callback
int PINT_thread_mgr_trove_start(void);
int PINT_thread_mgr_trove_stop(void);
int PINT_thread_mgr_trove_getcontext(PVFS_context_id *context);
+int PINT_thread_mgr_trove_cancel(PVFS_id_gen_t id, PVFS_fs_id fs_id,
+ void* user_ptr);
/* dev thread */
More information about the PVFS2-CVS
mailing list