[PVFS2-CVS] commit by pcarns in pvfs2/src/io/job: thread-mgr.c thread-mgr.h

CVS commit program cvs at parl.clemson.edu
Thu Feb 12 14:02:07 EST 2004


Update of /projects/cvsroot/pvfs2/src/io/job
In directory parlweb:/tmp/cvs-serv9949/src/io/job

Modified Files:
	thread-mgr.c thread-mgr.h 
Log Message:
Where to begin?  We need to proxy BMI cancellation requests (and eventually
Trove requests) through the thread-mgr, because the thread-mgr may have
already completed whatever it is we want to cancel before we find out;
hence the addition of PINT_thread_mgr_bmi_cancel().  This function is
complicated by races with the thread that is actually completing
operations, and we can't solve them with simple locks because NPTL punishes
us for holding a lock for the duration of a testcontext call, so we use an
odd-looking condition variable technique to protect the critical regions.

Index: thread-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.c,v
diff -p -u -r1.10 -r1.11
--- thread-mgr.c	28 Jan 2004 16:25:37 -0000	1.10
+++ thread-mgr.c	12 Feb 2004 19:02:07 -0000	1.11
@@ -49,8 +49,19 @@ static struct PINT_dev_unexp_info stat_d
 static pthread_t bmi_thread_id;
 static pthread_t trove_thread_id;
 static pthread_t dev_thread_id;
+
+static pthread_cond_t bmi_test_cond = PTHREAD_COND_INITIALIZER;
 #endif /* __PVFS2_JOB_THREADED__ */
 
+/* used to indicate that a bmi testcontext is in progress; we can't simply
+ * hold a lock while calling bmi testcontext for performance reasons
+ * (particularly under NPTL)
+ */
+static gen_mutex_t bmi_test_mutex = GEN_MUTEX_INITIALIZER;
+static int bmi_test_flag = 0;
+static int bmi_test_count = 0;
+static int bmi_test_index = 0;
+
 static int bmi_thread_running = 0;
 static int trove_thread_running = 0;
 static int dev_thread_running = 0;
@@ -175,10 +186,26 @@ static void *bmi_thread_function(void *p
 	    test_timeout = thread_mgr_test_timeout;
 	}
 
+	/* indicate that a test is in progress */
+	gen_mutex_lock(&bmi_test_mutex);
+	bmi_test_flag = 1;
+	gen_mutex_unlock(&bmi_test_mutex);
+	
 	incount = THREAD_MGR_TEST_COUNT;
-	ret = BMI_testcontext(incount, stat_bmi_id_array, &outcount,
+	bmi_test_count = 0;
+	bmi_test_index = 0;
+
+	ret = BMI_testcontext(incount, stat_bmi_id_array, &bmi_test_count,
 	    stat_bmi_error_code_array, stat_bmi_actual_size_array,
 	    stat_bmi_user_ptr_array, test_timeout, global_bmi_context);
+
+	gen_mutex_lock(&bmi_test_mutex);
+	bmi_test_flag = 0;
+#ifdef __PVFS2_JOB_THREADED__
+	pthread_cond_signal(&bmi_test_cond);
+#endif
+	gen_mutex_unlock(&bmi_test_mutex);
+
 	if(ret < 0)
 	{
 	    /* critical error */
@@ -187,7 +214,7 @@ static void *bmi_thread_function(void *p
 	    gossip_lerr("Error: critical BMI failure.\n");
 	}
 
-	for(i=0; i<outcount; i++)
+	for(i=0; i<bmi_test_count; i++)
 	{
 	    /* execute a callback function for each completed BMI operation */
 	    tmp_callback = 
@@ -195,8 +222,25 @@ static void *bmi_thread_function(void *p
 	    /* sanity check */
 	    assert(tmp_callback != NULL);
 	    assert(tmp_callback->fn != NULL);
+
+	    /* yuck, another critical region; we can't execute callbacks
+	     * while a cancel() is in progress, but we also can't hold a
+	     * lock while executing the callback.  
+	     */
+	    gen_mutex_lock(&bmi_test_mutex);
+	    bmi_test_flag = 1;
+	    gen_mutex_unlock(&bmi_test_mutex);
+	
+	    bmi_test_index++;
 	    tmp_callback->fn(tmp_callback->data, stat_bmi_actual_size_array[i],
 		stat_bmi_error_code_array[i]);
+
+	    gen_mutex_lock(&bmi_test_mutex);
+	    bmi_test_flag = 0;
+#ifdef __PVFS2_JOB_THREADED__
+	    pthread_cond_signal(&bmi_test_cond);
+#endif
+	    gen_mutex_unlock(&bmi_test_mutex);
 	}
     }
 
@@ -415,6 +459,51 @@ int PINT_thread_mgr_dev_stop(void)
     return(0);
 }
 
+/* PINT_thread_mgr_bmi_cancel()
+ *
+ * cancels a pending BMI operation for which the callback function has not
+ * yet been executed
+ *
+ * returns 0 on success, -PVFS_error on failure
+ */
+int PINT_thread_mgr_bmi_cancel(PVFS_id_gen_t id, void* user_ptr)
+{
+    int i;
+    int ret;
+
+    /* wait until we can guarantee that a BMI_testcontext() is not in
+     * progress
+     */
+    gen_mutex_lock(&bmi_test_mutex);
+    while(bmi_test_flag == 1)
+    {
+#ifdef __PVFS2_JOB_THREADED__
+	pthread_cond_wait(&bmi_test_cond, &bmi_test_mutex);
+#else
+	/* this condition shouldn't be possible without threads */
+	assert(0);
+#endif
+    }
+
+    /* iterate down list of pending completions, to see if the caller is
+     * trying to cancel one of them
+     */
+    for(i=bmi_test_index; i<bmi_test_count; i++)
+    {
+	if(stat_bmi_id_array[i] == id && stat_bmi_user_ptr_array[i] ==
+	    user_ptr)
+	{
+	    /* match; no steps needed to cancel, the op is already done */
+	    gen_mutex_unlock(&bmi_test_mutex);
+	    return(0);
+	}
+    }
+
+    /* tell BMI to cancel the operation */
+    ret = BMI_cancel(id, global_bmi_context);
+    gen_mutex_unlock(&bmi_test_mutex);
+    return(ret);
+}
 
 /* PINT_thread_mgr_trove_stop()
  *

Index: thread-mgr.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.h,v
diff -p -u -r1.4 -r1.5
--- thread-mgr.h	12 Jan 2004 23:15:04 -0000	1.4
+++ thread-mgr.h	12 Feb 2004 19:02:07 -0000	1.5
@@ -19,6 +19,7 @@ struct PINT_thread_mgr_bmi_callback
     void* data;
 };
 
+int PINT_thread_mgr_bmi_cancel(PVFS_id_gen_t id, void* user_ptr);
 int PINT_thread_mgr_bmi_start(void);
 int PINT_thread_mgr_bmi_stop(void);
 int PINT_thread_mgr_bmi_getcontext(PVFS_context_id *context);


