[PVFS2-CVS] commit by pcarns in pvfs2/src/io/job: thread-mgr.c thread-mgr.h

CVS commit program cvs at parl.clemson.edu
Thu Feb 12 15:23:56 EST 2004


Update of /projects/cvsroot/pvfs2/src/io/job
In directory parlweb:/tmp/cvs-serv10305/src/io/job

Modified Files:
	thread-mgr.c thread-mgr.h 
Log Message:
thread-mgr cancel hook for trove operations


Index: thread-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.c,v
diff -p -u -r1.11 -r1.12
--- thread-mgr.c	12 Feb 2004 19:02:07 -0000	1.11
+++ thread-mgr.c	12 Feb 2004 20:23:56 -0000	1.12
@@ -51,6 +51,7 @@ static pthread_t trove_thread_id;
 static pthread_t dev_thread_id;
 
 static pthread_cond_t bmi_test_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t trove_test_cond = PTHREAD_COND_INITIALIZER;
 #endif /* __PVFS2_JOB_THREADED__ */
 
 /* used to indicate that a bmi testcontext is in progress; we can't simply
@@ -61,6 +62,10 @@ static gen_mutex_t bmi_test_mutex = GEN_
 static int bmi_test_flag = 0;
 static int bmi_test_count = 0;
 static int bmi_test_index = 0;
+static gen_mutex_t trove_test_mutex = GEN_MUTEX_INITIALIZER;
+static int trove_test_flag = 0;
+static int trove_test_count = 0;
+static int trove_test_index = 0;
 
 static int bmi_thread_running = 0;
 static int trove_thread_running = 0;
@@ -73,7 +78,6 @@ static int dev_thread_running = 0;
 static void *trove_thread_function(void *ptr)
 {
     int ret = -1;
-    int count;
     int i=0;
     struct PINT_thread_mgr_trove_callback *tmp_callback;
     int timeout = thread_mgr_test_timeout;
@@ -82,11 +86,17 @@ static void *trove_thread_function(void 
     while (trove_thread_running)
 #endif
     {
-	count = THREAD_MGR_TEST_COUNT;
+	/* indicate that a test is in progress */
+	gen_mutex_lock(&trove_test_mutex);
+	trove_test_flag = 1;
+	gen_mutex_unlock(&trove_test_mutex);
+	
+	trove_test_count = THREAD_MGR_TEST_COUNT;
+	trove_test_index = 0;
 #ifdef __PVFS2_TROVE_SUPPORT__
 	ret = trove_dspace_testcontext(HACK_fs_id,
 	    stat_trove_id_array,
-	    &count,
+	    &trove_test_count,
 	    stat_trove_error_code_array,
 	    stat_trove_user_ptr_array,
 	    timeout,
@@ -97,6 +107,13 @@ static void *trove_thread_function(void 
 	HACK_fs_id = 0;
 	assert(0);
 #endif
+	gen_mutex_lock(&trove_test_mutex);
+	trove_test_flag = 0;
+#ifdef __PVFS2_JOB_THREADED__
+	pthread_cond_signal(&trove_test_cond);
+#endif
+	gen_mutex_unlock(&trove_test_mutex);
+
 	if(ret < 0)
 	{
 	    /* critical error */
@@ -105,7 +122,7 @@ static void *trove_thread_function(void 
 	    gossip_lerr("Error: critical Trove failure.\n");
 	}
 
-	for(i=0; i<count; i++)
+	for(i=0; i<trove_test_count; i++)
 	{
 	    /* execute a callback function for each completed BMI operation */
 	    tmp_callback = 
@@ -113,7 +130,25 @@ static void *trove_thread_function(void 
 	    /* sanity check */
 	    assert(tmp_callback != NULL);
 	    assert(tmp_callback->fn != NULL);
+	    
+	    /* yuck, another critical region; we can't execute callbacks
+	     * while a cancel() is in progress, but we also can't hold a
+	     * lock while executing the callback.  
+	     */
+	    gen_mutex_lock(&trove_test_mutex);
+	    trove_test_flag = 1;
+	    gen_mutex_unlock(&trove_test_mutex);
+	
+	    trove_test_index++;
 	    tmp_callback->fn(tmp_callback->data, stat_trove_error_code_array[i]);
+
+	    gen_mutex_lock(&trove_test_mutex);
+	    trove_test_flag = 0;
+#ifdef __PVFS2_JOB_THREADED__
+	    pthread_cond_signal(&trove_test_cond);
+#endif
+	    gen_mutex_unlock(&trove_test_mutex);
+
 	}
     }
     return (NULL);
@@ -576,6 +611,58 @@ int PINT_thread_mgr_trove_getcontext(PVF
     gen_mutex_unlock(&trove_mutex);
 
     return(-PVFS_EINVAL);
+}
+
+/* PINT_thread_mgr_trove_cancel()
+ *
+ * cancels a pending Trove operation for which the callback function has not
+ * yet been executed
+ *
+ * returns 0 on success, -PVFS_error on failure
+ */
+int PINT_thread_mgr_trove_cancel(PVFS_id_gen_t id, PVFS_fs_id fs_id,
+    void* user_ptr)
+{
+    int i;
+    int ret;
+
+    /* wait until we can guarantee that a trove_testcontext() is not in
+     * progress
+     */
+    gen_mutex_lock(&trove_test_mutex);
+    while(trove_test_flag == 1)
+    {
+#ifdef __PVFS2_JOB_THREADED__
+	pthread_cond_wait(&trove_test_cond, &trove_test_mutex);
+#else
+	/* this condition shouldn't be possible without threads */
+	assert(0);
+#endif
+    }
+
+    /* iterate down list of pending completions, to see if the caller is
+     * trying to cancel one of them
+     */
+    for(i=trove_test_index; i<trove_test_count; i++)
+    {
+	if(stat_trove_id_array[i] == id && stat_trove_user_ptr_array ==
+	    user_ptr)
+	{
+	    /* match; no steps needed to cancel, the op is already done */
+	    gen_mutex_unlock(&trove_test_mutex);
+	    return(0);
+	}
+    }
+
+    /* tell Trove to cancel the operation */
+#ifdef __PVFS2_TROVE_SUPPORT__
+    ret = trove_dspace_cancel(fs_id, id, global_trove_context);
+#else
+    ret = 0;
+    assert(0);
+#endif
+    gen_mutex_unlock(&trove_test_mutex);
+    return(ret);
 }
 
 /* PINT_thread_mgr_bmi_getcontext()

Index: thread-mgr.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.h,v
diff -p -u -r1.5 -r1.6
--- thread-mgr.h	12 Feb 2004 19:02:07 -0000	1.5
+++ thread-mgr.h	12 Feb 2004 20:23:56 -0000	1.6
@@ -37,6 +37,8 @@ struct PINT_thread_mgr_trove_callback
 int PINT_thread_mgr_trove_start(void);
 int PINT_thread_mgr_trove_stop(void);
 int PINT_thread_mgr_trove_getcontext(PVFS_context_id *context);
+int PINT_thread_mgr_trove_cancel(PVFS_id_gen_t id, PVFS_fs_id fs_id, 
+    void* user_ptr);
 
 /* dev thread */
 



More information about the PVFS2-CVS mailing list