[PVFS2-CVS] commit by neill in pvfs2/src/io/job: job-desc-queue.c job.c job.h thread-mgr.c

CVS commit program cvs at parl.clemson.edu
Thu Jul 8 13:17:11 EDT 2004


Update of /projects/cvsroot/pvfs2/src/io/job
In directory parlweb:/tmp/cvs-serv12211/src/io/job

Modified Files:
	job-desc-queue.c job.c job.h thread-mgr.c 
Log Message:
- merging the pvfs2-nm-nb-branch into the main tree;
  see the ChangeLog for a summary, or browse the CVS history of the
  branch for full details


Index: job-desc-queue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job-desc-queue.c,v
diff -p -u -r1.10 -r1.11
--- job-desc-queue.c	18 Dec 2003 20:31:34 -0000	1.10
+++ job-desc-queue.c	8 Jul 2004 16:17:11 -0000	1.11
@@ -41,7 +41,7 @@ struct job_desc *alloc_job_desc(int type
     }
     memset(jd, 0, sizeof(struct job_desc));
 
-    if (id_gen_fast_register(&(jd->job_id), jd) < 0)
+    if (id_gen_safe_register(&(jd->job_id), jd) < 0)
     {
 	free(jd);
 	return (NULL);
@@ -59,6 +59,7 @@ struct job_desc *alloc_job_desc(int type
  */
 void dealloc_job_desc(struct job_desc *jd)
 {
+    id_gen_safe_unregister(jd->job_id);
     free(jd);
     return;
 }
@@ -135,12 +136,13 @@ void job_desc_q_add(job_desc_q_p jdqp,
         gen_mutex_lock(s_job_desc_q_mutex);
         if (jdqp)
         {
+            assert(desc);
+
             /* note that we are adding to tail to preserve fifo order */
             qlist_add_tail(&(desc->job_desc_q_link), jdqp);
         }
         gen_mutex_unlock(s_job_desc_q_mutex);
     }
-    return;
 }
 
 /* job_desc_q_remove()
@@ -151,8 +153,8 @@ void job_desc_q_add(job_desc_q_p jdqp,
  */
 void job_desc_q_remove(struct job_desc *desc)
 {
+    assert(desc);
     qlist_del(&(desc->job_desc_q_link));
-    return;
 }
 
 /* job_desc_q_empty()

Index: job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job.c,v
diff -p -u -r1.129 -r1.130
--- job.c	18 May 2004 15:30:46 -0000	1.129
+++ job.c	8 Jul 2004 16:17:11 -0000	1.130
@@ -298,6 +298,7 @@ int job_bmi_send(PVFS_BMI_addr_t addr,
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_BMI_SEND, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
 
@@ -309,6 +310,7 @@ int job_bmi_send(PVFS_BMI_addr_t addr,
 	out_status_p->actual_size = size;
         JOB_EVENT_END(PVFS_EVENT_BMI_SEND, size, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -395,6 +397,7 @@ int job_bmi_send_list(PVFS_BMI_addr_t ad
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_BMI_SEND, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
 
@@ -406,6 +409,7 @@ int job_bmi_send_list(PVFS_BMI_addr_t ad
 	out_status_p->actual_size = total_size;
         JOB_EVENT_END(PVFS_EVENT_BMI_SEND, total_size, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -472,6 +476,7 @@ int job_bmi_recv(PVFS_BMI_addr_t addr,
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_BMI_RECV, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
 
@@ -484,6 +489,7 @@ int job_bmi_recv(PVFS_BMI_addr_t addr,
         JOB_EVENT_END(PVFS_EVENT_BMI_RECV, out_status_p->actual_size, 
 	    jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -557,6 +563,7 @@ int job_bmi_recv_list(PVFS_BMI_addr_t ad
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_BMI_RECV, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
 
@@ -569,6 +576,7 @@ int job_bmi_recv_list(PVFS_BMI_addr_t ad
         JOB_EVENT_END(PVFS_EVENT_BMI_RECV, out_status_p->actual_size, 
 	    jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -638,6 +646,7 @@ int job_bmi_unexp(struct BMI_unexpected_
 	{
 	    /* error testing */
 	    dealloc_job_desc(jd);
+            jd = NULL;
 	    return (ret);
 	}
 
@@ -647,10 +656,13 @@ int job_bmi_unexp(struct BMI_unexpected_
 	    out_status_p->error_code = jd->u.bmi_unexp.info->error_code;
 	    out_status_p->status_user_tag = status_user_tag;
 	    dealloc_job_desc(jd);
+            jd = NULL;
 	    return (ret);
 	}
     }
 
+    PINT_thread_mgr_bmi_unexp_handler(bmi_thread_mgr_unexp_handler);
+
     /* if we fall through to this point, then there were not any
      * uenxpected receive's available; queue up to test later 
      */
@@ -660,8 +672,6 @@ int job_bmi_unexp(struct BMI_unexpected_
     bmi_unexp_pending_count++;
     gen_mutex_unlock(&bmi_unexp_mutex);
 
-    PINT_thread_mgr_bmi_unexp_handler(bmi_thread_mgr_unexp_handler);
-
     return (0);
 }
 
@@ -679,18 +689,19 @@ int job_bmi_cancel(job_id_t id, job_cont
 
     gen_mutex_lock(&completion_mutex);
 
-    query = id_gen_fast_lookup(id);
-    if(query->completed_flag)
+    query = id_gen_safe_lookup(id);
+    if (!query || query->completed_flag)
     {
 	/* job has already completed, no cancellation needed */
 	gen_mutex_unlock(&completion_mutex);
 	return(0);
     }
 
-    /* tell thread mgr to cancel operation.  This will result in normal
-     * completion path through thread mgr callbacks; no more work to do here */
-    ret = PINT_thread_mgr_bmi_cancel(query->u.bmi.id,
-	&(query->bmi_callback));
+    /* tell thread mgr to cancel operation.  This will result in
+     * normal completion path through thread mgr callbacks; no more
+     * work to do here */
+    ret = PINT_thread_mgr_bmi_cancel(
+        query->u.bmi.id, &(query->bmi_callback));
 
     gen_mutex_unlock(&completion_mutex);
 
@@ -705,18 +716,19 @@ int job_bmi_cancel(job_id_t id, job_cont
  * returns 0 on success, -errno on failure, and 1 on immediate
  * completion
  */
-int job_dev_unexp(struct PINT_dev_unexp_info* dev_unexp_d,
+int job_dev_unexp(
+    struct PINT_dev_unexp_info* dev_unexp_d,
     void* user_ptr,
     job_aint status_user_tag,
     job_status_s * out_status_p,
     job_id_t* id,
+    enum job_flags flags,
     job_context_id context_id)
 {
     /* post a dev recv for an unexpected message.  We will do a quick
      * test to see if an unexpected message is available.  If so, we
      * return the necessary info; if not we queue up to test again later
      */
-
     int ret = -1;
     struct job_desc *jd = NULL;
     int outcount = 0;
@@ -734,26 +746,36 @@ int job_dev_unexp(struct PINT_dev_unexp_
     jd->context_id = context_id;
     jd->status_user_tag = status_user_tag;
 
-    ret = PINT_dev_test_unexpected(1, &outcount, jd->u.dev_unexp.info, 0);
-
-    if (ret < 0)
+    /* only look for immediate completion if our flags allow it */
+    if (!(flags & JOB_NO_IMMED_COMPLETE))
     {
-	/* error testing */
-	dealloc_job_desc(jd);
-	return (ret);
-    }
+        ret = PINT_dev_test_unexpected(
+            1, &outcount, jd->u.dev_unexp.info, 0);
 
-    if (outcount == 1)
-    {
-	/* there was an unexpected job available */
-	out_status_p->error_code = 0;
-	out_status_p->status_user_tag = status_user_tag;
-	dealloc_job_desc(jd);
-	return (ret);
+        if (ret < 0)
+        {
+            /* error testing */
+            dealloc_job_desc(jd);
+            jd = NULL;
+            return (ret);
+        }
+
+        if (outcount == 1)
+        {
+            /* there was an unexpected job available */
+            out_status_p->error_code = 0;
+            out_status_p->status_user_tag = status_user_tag;
+            dealloc_job_desc(jd);
+            jd = NULL;
+            return (ret);
+        }
     }
 
+    PINT_thread_mgr_dev_unexp_handler(dev_thread_mgr_unexp_handler);
+
     /* if we fall through to this point, then there were not any
-     * uenxpected receive's available; queue up to test later 
+     * unexpected receives available (or none requested); queue up to
+     * test later
      */
     gen_mutex_lock(&dev_unexp_mutex);
     *id = jd->job_id;
@@ -761,8 +783,6 @@ int job_dev_unexp(struct PINT_dev_unexp_
     dev_unexp_pending_count++;
     gen_mutex_unlock(&dev_unexp_mutex);
 
-    PINT_thread_mgr_dev_unexp_handler(dev_thread_mgr_unexp_handler);
-
     return (0);
 }
 
@@ -893,6 +913,7 @@ int job_req_sched_post(struct PVFS_serve
     {
 	/* error posting */
 	dealloc_job_desc(jd);
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -956,6 +977,7 @@ int job_req_sched_post_timer(int msecs,
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
 
@@ -965,6 +987,7 @@ int job_req_sched_post_timer(int msecs,
 	out_status_p->error_code = 0;
 	out_status_p->status_user_tag = status_user_tag;
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
 
@@ -1009,13 +1032,14 @@ int job_req_sched_release(job_id_t in_co
     jd->context_id = context_id;
     jd->status_user_tag = status_user_tag;
 
-    match_jd = id_gen_fast_lookup(in_completed_id);
+    match_jd = id_gen_safe_lookup(in_completed_id);
 
     ret = PINT_req_sched_release(match_jd->u.req_sched.id, jd,
 				 &(jd->u.req_sched.id));
 
     /* delete the old req sched job desc; it is no longer needed */
     dealloc_job_desc(match_jd);
+    match_jd = NULL;
 
     /* NOTE: I am letting the return value propigate here, rather
      * than just setting the status.  Failure here is bad...
@@ -1024,6 +1048,7 @@ int job_req_sched_release(job_id_t in_co
     {
 	/* error posting */
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1033,6 +1058,7 @@ int job_req_sched_release(job_id_t in_co
 	out_status_p->error_code = 0;
 	out_status_p->status_user_tag = status_user_tag;
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
 
@@ -1088,6 +1114,7 @@ int job_flow(flow_descriptor * flow_d,
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_FLOW, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
     if (ret == 1)
@@ -1098,6 +1125,7 @@ int job_flow(flow_descriptor * flow_d,
 	out_status_p->actual_size = flow_d->total_transfered;
         JOB_EVENT_END(PVFS_EVENT_FLOW, flow_d->total_transfered, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (1);
     }
 
@@ -1125,9 +1153,9 @@ int job_flow_cancel(job_id_t id, job_con
 
     gen_mutex_lock(&completion_mutex);
 
-    query = id_gen_fast_lookup(id);
+    query = id_gen_safe_lookup(id);
 
-    if(query->completed_flag)
+    if (!query || query->completed_flag)
     {
 	/* job has already completed, no cancellation needed */
 	gen_mutex_unlock(&completion_mutex);
@@ -1207,6 +1235,7 @@ int job_trove_bstream_write_at(PVFS_fs_i
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_WRITE_AT, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -1223,6 +1252,7 @@ int job_trove_bstream_write_at(PVFS_fs_i
         JOB_EVENT_END(PVFS_EVENT_TROVE_WRITE_AT, out_status_p->actual_size, 
 	    jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1298,6 +1328,7 @@ int job_trove_bstream_read_at(PVFS_fs_id
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_READ_AT, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -1314,6 +1345,7 @@ int job_trove_bstream_read_at(PVFS_fs_id
         JOB_EVENT_END(PVFS_EVENT_TROVE_READ_AT, out_status_p->actual_size, 
 	    jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1377,6 +1409,7 @@ int job_trove_bstream_flush(PVFS_fs_id c
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_BSTREAM_FLUSH, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -1388,6 +1421,7 @@ int job_trove_bstream_flush(PVFS_fs_id c
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_BSTREAM_FLUSH, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
     /* if we fall through to this point, the job did not
@@ -1458,7 +1492,7 @@ int job_trove_keyval_read(PVFS_fs_id col
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_READ, 0, jd->job_id);
 	dealloc_job_desc(jd);
-
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -1472,6 +1506,7 @@ int job_trove_keyval_read(PVFS_fs_id col
 	out_status_p->vtag = jd->u.trove.vtag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_READ, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1545,7 +1580,7 @@ int job_trove_keyval_read_list(PVFS_fs_i
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_READ_LIST, 0, jd->job_id);
 	dealloc_job_desc(jd);
-
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -1559,6 +1594,7 @@ int job_trove_keyval_read_list(PVFS_fs_i
 	out_status_p->vtag = jd->u.trove.vtag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_READ_LIST, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1631,7 +1667,7 @@ int job_trove_keyval_write(PVFS_fs_id co
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_WRITE, 0, jd->job_id);
 	dealloc_job_desc(jd);
-
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -1645,6 +1681,7 @@ int job_trove_keyval_write(PVFS_fs_id co
 	out_status_p->vtag = jd->u.trove.vtag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_WRITE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1707,6 +1744,7 @@ int job_trove_keyval_flush(PVFS_fs_id co
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_FLUSH, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -1719,6 +1757,7 @@ int job_trove_keyval_flush(PVFS_fs_id co
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_FLUSH, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1790,6 +1829,7 @@ int job_trove_dspace_getattr(PVFS_fs_id 
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_GETATTR, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -1802,6 +1842,7 @@ int job_trove_dspace_getattr(PVFS_fs_id 
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_GETATTR, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1873,6 +1914,7 @@ int job_trove_dspace_setattr(PVFS_fs_id 
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_SETATTR, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -1885,6 +1927,7 @@ int job_trove_dspace_setattr(PVFS_fs_id 
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_SETATTR, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -1957,6 +2000,7 @@ int job_trove_bstream_resize(PVFS_fs_id 
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_BSTREAM_RESIZE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -1969,6 +2013,7 @@ int job_trove_bstream_resize(PVFS_fs_id 
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_BSTREAM_RESIZE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2059,7 +2104,7 @@ int job_trove_keyval_remove(PVFS_fs_id c
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_REMOVE, 0, jd->job_id);
 	dealloc_job_desc(jd);
-
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -2073,6 +2118,7 @@ int job_trove_keyval_remove(PVFS_fs_id c
 	out_status_p->vtag = jd->u.trove.vtag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_REMOVE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2170,7 +2216,7 @@ int job_trove_keyval_iterate(PVFS_fs_id 
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_ITERATE, 0, jd->job_id);
 	dealloc_job_desc(jd);
-
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -2186,6 +2232,7 @@ int job_trove_keyval_iterate(PVFS_fs_id 
 	out_status_p->count = jd->u.trove.count;
         JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_ITERATE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2260,7 +2307,7 @@ int job_trove_dspace_iterate_handles(PVF
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_ITERATE_HANDLES, 0, jd->job_id);
 	dealloc_job_desc(jd);
-
+        jd = NULL;
 	out_status_p->error_code = ret;
 	out_status_p->status_user_tag = status_user_tag;
 	return (1);
@@ -2276,6 +2323,7 @@ int job_trove_dspace_iterate_handles(PVF
 	out_status_p->count = jd->u.trove.count;
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_ITERATE_HANDLES, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2375,6 +2423,7 @@ int job_trove_dspace_create(PVFS_fs_id c
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_CREATE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -2389,6 +2438,7 @@ int job_trove_dspace_create(PVFS_fs_id c
 	out_status_p->handle = jd->u.trove.handle;
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_CREATE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2457,6 +2507,7 @@ int job_trove_dspace_remove(PVFS_fs_id c
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_REMOVE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -2470,6 +2521,7 @@ int job_trove_dspace_remove(PVFS_fs_id c
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_REMOVE, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2538,6 +2590,7 @@ int job_trove_dspace_verify(PVFS_fs_id c
 	/* error posting trove operation */
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_VERIFY, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -2551,6 +2604,7 @@ int job_trove_dspace_verify(PVFS_fs_id c
 	out_status_p->status_user_tag = status_user_tag;
         JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_VERIFY, 0, jd->job_id);
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2580,8 +2634,8 @@ int job_trove_dspace_cancel(PVFS_fs_id c
 
     gen_mutex_lock(&completion_mutex);
 
-    query = id_gen_fast_lookup(id);
-    if(query->completed_flag)
+    query = id_gen_safe_lookup(id);
+    if (!query || query->completed_flag)
     {
 	/* job has already completed, no cancellation needed */
 	gen_mutex_unlock(&completion_mutex);
@@ -2590,9 +2644,8 @@ int job_trove_dspace_cancel(PVFS_fs_id c
 
     /* tell thread mgr to cancel operation.  This will result in normal
      * completion path through thread mgr callbacks; no more work to do here */
-    ret = PINT_thread_mgr_trove_cancel(query->u.trove.id,
-	coll_id,
-	&(query->trove_callback));
+    ret = PINT_thread_mgr_trove_cancel(
+        query->u.trove.id, coll_id, &(query->trove_callback));
 
     gen_mutex_unlock(&completion_mutex);
 
@@ -2650,6 +2703,7 @@ int job_trove_fs_create(char *collname,
     {
 	/* error posting trove operation */
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -2662,6 +2716,7 @@ int job_trove_fs_create(char *collname,
 	out_status_p->error_code = 0;
 	out_status_p->status_user_tag = status_user_tag;
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2741,6 +2796,7 @@ int job_trove_fs_lookup(char *collname,
     {
 	/* error posting trove operation */
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -2754,6 +2810,7 @@ int job_trove_fs_lookup(char *collname,
 	out_status_p->status_user_tag = status_user_tag;
 	out_status_p->coll_id = jd->u.trove.fsid;
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2816,6 +2873,7 @@ int job_trove_fs_seteattr(PVFS_fs_id col
     {
 	/* error posting trove operation */
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -2828,6 +2886,7 @@ int job_trove_fs_seteattr(PVFS_fs_id col
 	out_status_p->error_code = 0;
 	out_status_p->status_user_tag = status_user_tag;
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2893,6 +2952,7 @@ int job_trove_fs_geteattr(PVFS_fs_id col
     {
 	/* error posting trove operation */
 	dealloc_job_desc(jd);
+        jd = NULL;
 	/* TODO: handle this correctly */
 	out_status_p->error_code = -EINVAL;
 	out_status_p->status_user_tag = status_user_tag;
@@ -2905,6 +2965,7 @@ int job_trove_fs_geteattr(PVFS_fs_id col
 	out_status_p->error_code = 0;
 	out_status_p->status_user_tag = status_user_tag;
 	dealloc_job_desc(jd);
+        jd = NULL;
 	return (ret);
     }
 
@@ -2970,7 +3031,7 @@ int job_test(job_id_t id,
 	gen_mutex_unlock(&completion_mutex);
 	return(completion_error);
     }
-    query = id_gen_fast_lookup(id);
+    query = id_gen_safe_lookup(id);
     if(query->completed_flag)
     {
 	job_desc_q_remove(query);
@@ -3036,7 +3097,7 @@ int job_test(job_id_t id,
 		gen_mutex_unlock(&completion_mutex);
 		return(completion_error);
 	    }
-	    query = id_gen_fast_lookup(id);
+	    query = id_gen_safe_lookup(id);
 	    if(query->completed_flag)
 	    {
 		job_desc_q_remove(query);
@@ -3075,6 +3136,7 @@ job_test_complete:
     else
     {
 	dealloc_job_desc(query);
+        query = NULL;
     }
     return(1);
 }
@@ -3300,11 +3362,11 @@ int job_testsome(job_id_t * id_array,
  * returns 0 on success, -errno on failure
  */
 int job_testcontext(job_id_t * out_id_array_p,
-		  int *inout_count_p,
-		  void **returned_user_ptr_array,
-		  job_status_s * out_status_array_p,
-		  int timeout_ms,
-		  job_context_id context_id)
+                    int *inout_count_p,
+                    void **returned_user_ptr_array,
+                    job_status_s * out_status_array_p,
+                    int timeout_ms,
+                    job_context_id context_id)
 {
     int ret = -1;
 #ifdef __PVFS2_JOB_THREADED__
@@ -3383,6 +3445,7 @@ int job_testcontext(job_id_t * out_id_ar
 	    do_one_work_cycle_all(10);
 	else
 	    do_one_work_cycle_all(0);
+
 	ret = 0;
 #endif
 	if (ret == ETIMEDOUT)
@@ -3425,7 +3488,7 @@ int job_testcontext(job_id_t * out_id_ar
 	}
 
 	timeout_remaining -= (end.tv_sec - start.tv_sec) * 1000 +
-	    (end.tv_usec - start.tv_usec) / 1000;
+	    ((end.tv_usec - start.tv_usec) / 1000);
 
     } while (timeout_remaining > 0);
 
@@ -3486,27 +3549,31 @@ static void teardown_queues(void)
  *
  * no return value
  */
-static void trove_thread_mgr_callback(void* data, 
+static void trove_thread_mgr_callback(
+    void* data, 
     PVFS_error error_code)
 {
     struct job_desc* tmp_desc = (struct job_desc*)data; 
+    assert(tmp_desc);
 
-    /* set job descriptor fields and put into completion queue */
-    tmp_desc->u.trove.state = error_code;
     gen_mutex_lock(&completion_mutex);
-    job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
-	tmp_desc);
-    /* set completed flag while holding queue lock */
-    tmp_desc->completed_flag = 1;
-    gen_mutex_unlock(&completion_mutex);
+    if (tmp_desc->completed_flag == 0)
+    {
+        /* set job descriptor fields and put into completion queue */
+        tmp_desc->u.trove.state = error_code;
+        job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
+                       tmp_desc);
+        /* set completed flag while holding queue lock */
+        tmp_desc->completed_flag = 1;
 
-    trove_pending_count--;
+        trove_pending_count--;
 
 #ifdef __PVFS2_JOB_THREADED__
-    /* wake up anyone waiting for completion */
-    pthread_cond_signal(&completion_cond);
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
 #endif
-    return;
+    }
+    gen_mutex_unlock(&completion_mutex);
 }
 
 /* bmi_thread_mgr_callback()
@@ -3516,30 +3583,33 @@ static void trove_thread_mgr_callback(vo
  *
  * no return value
  */
-static void bmi_thread_mgr_callback(void* data, 
+static void bmi_thread_mgr_callback(
+    void* data, 
     PVFS_size actual_size,
     PVFS_error error_code)
 {
-    struct job_desc* tmp_desc = (struct job_desc*)data; 
+    struct job_desc* tmp_desc = (struct job_desc*)data;
+    assert(tmp_desc);
 
-    /* set job descriptor fields and put into completion queue */
-    tmp_desc->u.bmi.error_code = error_code;
-    tmp_desc->u.bmi.actual_size = actual_size;
     gen_mutex_lock(&completion_mutex);
-    job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
-	tmp_desc);
-    /* set completed flag while holding queue lock */
-    tmp_desc->completed_flag = 1;
-    gen_mutex_unlock(&completion_mutex);
+    if (tmp_desc->completed_flag == 0)
+    {
+        /* set job descriptor fields and put into completion queue */
+        tmp_desc->u.bmi.error_code = error_code;
+        tmp_desc->u.bmi.actual_size = actual_size;
+        job_desc_q_add(completion_queue_array[tmp_desc->context_id],
+                       tmp_desc);
+        /* set completed flag while holding queue lock */
+        tmp_desc->completed_flag = 1;
 
-    bmi_pending_count--;
+        bmi_pending_count--;
 
 #ifdef __PVFS2_JOB_THREADED__
-    /* wake up anyone waiting for completion */
-    pthread_cond_signal(&completion_cond);
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
 #endif
-
-    return;
+    }
+    gen_mutex_unlock(&completion_mutex);
 }
 
 /* bmi_thread_mgr_unexp_handler()
@@ -3549,35 +3619,41 @@ static void bmi_thread_mgr_callback(void
  *
  * no return value
  */
-static void bmi_thread_mgr_unexp_handler(struct BMI_unexpected_info* unexp)
+static void bmi_thread_mgr_unexp_handler(
+    struct BMI_unexpected_info* unexp)
 {
     struct job_desc* tmp_desc = NULL; 
 
-    /* remove the operation from the pending bmi_unexp queue */
     gen_mutex_lock(&bmi_unexp_mutex);
+    /* remove the operation from the pending bmi_unexp queue */
     tmp_desc = job_desc_q_shownext(bmi_unexp_queue);
     assert(tmp_desc != NULL);	/* TODO: fix this */
-    job_desc_q_remove(tmp_desc);
-    bmi_unexp_pending_count--;
-    gen_mutex_unlock(&bmi_unexp_mutex);
-    /* set appropriate fields and store in completed queue */
-    *(tmp_desc->u.bmi_unexp.info) = *unexp;
-    gen_mutex_lock(&completion_mutex);
-    /* set completed flag while holding queue lock */
-    tmp_desc->completed_flag = 1;
-    if (completion_queue_array[tmp_desc->context_id] != 0)
+    if (tmp_desc->completed_flag == 0)
     {
-        job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
-                       tmp_desc);
-    }
-    gen_mutex_unlock(&completion_mutex);
+        job_desc_q_remove(tmp_desc);
+        bmi_unexp_pending_count--;
+        gen_mutex_unlock(&bmi_unexp_mutex);
+        /* set appropriate fields and store in completed queue */
+        *(tmp_desc->u.bmi_unexp.info) = *unexp;
+        gen_mutex_lock(&completion_mutex);
+        /* set completed flag while holding queue lock */
+        tmp_desc->completed_flag = 1;
+        if (completion_queue_array[tmp_desc->context_id])
+        {
+            job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
+                           tmp_desc);
+        }
+        gen_mutex_unlock(&completion_mutex);
 
 #ifdef __PVFS2_JOB_THREADED__
-    /* wake up anyone waiting for completion */
-    pthread_cond_signal(&completion_cond);
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
 #endif
-
-    return;
+    }
+    else
+    {
+        gen_mutex_unlock(&bmi_unexp_mutex);
+    }
 }
 
 /* dev_thread_mgr_unexp_handler()
@@ -3591,31 +3667,36 @@ static void dev_thread_mgr_unexp_handler
 {
     struct job_desc* tmp_desc = NULL; 
 
-    /* remove the operation from the pending dev_unexp queue */
     gen_mutex_lock(&dev_unexp_mutex);
+    /* remove the operation from the pending dev_unexp queue */
     tmp_desc = job_desc_q_shownext(dev_unexp_queue);
     assert(tmp_desc != NULL);	/* TODO: fix this */
-    job_desc_q_remove(tmp_desc);
-    dev_unexp_pending_count--;
-    gen_mutex_unlock(&dev_unexp_mutex);
-    /* set appropriate fields and store in completed queue */
-    *(tmp_desc->u.dev_unexp.info) = *unexp;
-    gen_mutex_lock(&completion_mutex);
-    /* set completed flag while holding queue lock */
-    tmp_desc->completed_flag = 1;
-    if (completion_queue_array[tmp_desc->context_id] != 0)
+    if (tmp_desc->completed_flag == 0)
     {
-        job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
-                       tmp_desc);
-    }
-    gen_mutex_unlock(&completion_mutex);
+        job_desc_q_remove(tmp_desc);
+        dev_unexp_pending_count--;
+        gen_mutex_unlock(&dev_unexp_mutex);
+        /* set appropriate fields and store in completed queue */
+        *(tmp_desc->u.dev_unexp.info) = *unexp;
+        gen_mutex_lock(&completion_mutex);
+        /* set completed flag while holding queue lock */
+        tmp_desc->completed_flag = 1;
+        if (completion_queue_array[tmp_desc->context_id])
+        {
+            job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
+                           tmp_desc);
+        }
+        gen_mutex_unlock(&completion_mutex);
 
 #ifdef __PVFS2_JOB_THREADED__
-    /* wake up anyone waiting for completion */
-    pthread_cond_signal(&completion_cond);
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
 #endif
-
-    return;
+    }
+    else
+    {
+        gen_mutex_unlock(&dev_unexp_mutex);
+    }
 }
 
 /* fill_status()
@@ -3628,6 +3709,9 @@ static void fill_status(struct job_desc 
 			void **returned_user_ptr_p,
 			job_status_s * status)
 {
+    assert(jd);
+    assert(status);
+
     status->status_user_tag = jd->status_user_tag;
 
     if (returned_user_ptr_p)
@@ -3750,7 +3834,7 @@ static int completion_query_some(job_id_
 
     for(i=0; i<incount; i++)
     {
-	tmp_desc = id_gen_fast_lookup(id_array[i]);
+	tmp_desc = id_gen_safe_lookup(id_array[i]);
 	if(tmp_desc && tmp_desc->completed_flag)
 	{
 	    if(returned_user_ptr_array)
@@ -3776,6 +3860,7 @@ static int completion_query_some(job_id_
 	    else
 	    {
 		dealloc_job_desc(tmp_desc);
+                tmp_desc = NULL;
 	    }
 	    out_index_array[*inout_count_p] = i;
 	    (*inout_count_p)++;
@@ -3807,6 +3892,8 @@ static int completion_query_context(job_
 					job_desc_q_shownext(
 					completion_queue_array[context_id])))
     {
+        assert(query);
+
 	if (returned_user_ptr_array)
 	{
 	    fill_status(query, &(returned_user_ptr_array[*inout_count_p]),
@@ -3830,6 +3917,7 @@ static int completion_query_context(job_
 	else
 	{
 	    dealloc_job_desc(query);
+            query = NULL;
 	}
     }
     gen_mutex_unlock(&completion_mutex);
@@ -3849,10 +3937,15 @@ static void do_one_work_cycle_all(int id
     int total_pending_count = bmi_pending_count + bmi_unexp_pending_count
 	+ flow_pending_count + dev_unexp_pending_count + trove_pending_count;
 
-    if(bmi_pending_count || bmi_unexp_pending_count || flow_pending_count)
+    if (bmi_pending_count || bmi_unexp_pending_count || flow_pending_count)
+    {
 	PINT_thread_mgr_bmi_push(idle_time_ms);
-    if(dev_unexp_pending_count)
+        idle_time_ms = 0;
+    }
+    if (dev_unexp_pending_count)
+    {
 	PINT_thread_mgr_dev_push(idle_time_ms);
+    }
 #ifdef __PVFS2_TROVE_SUPPORT__
     if(trove_pending_count || flow_pending_count)
 	PINT_thread_mgr_trove_push(idle_time_ms);

Index: job.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job.h,v
diff -p -u -r1.43 -r1.44
--- job.h	27 Apr 2004 17:46:17 -0000	1.43
+++ job.h	8 Jul 2004 16:17:11 -0000	1.44
@@ -140,6 +140,7 @@ int job_dev_unexp(struct PINT_dev_unexp_
 		  job_aint status_user_tag,
 		  job_status_s * out_status_p,
 		  job_id_t* id,
+		  enum job_flags flags,
 		  job_context_id context_id);
 
 /* device write */

Index: thread-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.c,v
diff -p -u -r1.20 -r1.21
--- thread-mgr.c	18 May 2004 15:12:24 -0000	1.20
+++ thread-mgr.c	8 Jul 2004 16:17:11 -0000	1.21
@@ -6,6 +6,7 @@
 
 #include <pthread.h>
 #include <stdlib.h>
+#include <string.h>
 #include <assert.h>
 
 #include "pvfs2-types.h"
@@ -111,24 +112,33 @@ static void *trove_thread_function(void 
 #endif
 	gen_mutex_unlock(&trove_test_mutex);
 
-	if(ret < 0)
+	if (ret < 0)
 	{
-	    /* critical error */
-	    /* TODO: how to handle this */
-	    assert(0);
-	    gossip_lerr("Error: critical Trove failure.\n");
+	    PVFS_perror_gossip("critical Trove failure.\n", ret);
+#ifdef __PVFS2_JOB_THREADED__
+            gossip_err("trove_thread_function thread terminating\n");
+            break;
+#endif
+            return NULL;
 	}
 
 	for(i=0; i<trove_test_count; i++)
 	{
-	    /* execute a callback function for each completed BMI operation */
-	    tmp_callback = 
-		(struct PINT_thread_mgr_trove_callback*)stat_trove_user_ptr_array[i];
-	    /* sanity check */
-	    assert(tmp_callback != NULL);
-	    assert(tmp_callback->fn != NULL);
-	   
-	    tmp_callback->fn(tmp_callback->data, stat_trove_error_code_array[i]);
+	    /* execute a callback for each completed BMI operation */
+	    tmp_callback =  (struct PINT_thread_mgr_trove_callback*)
+                stat_trove_user_ptr_array[i];
+
+            if (!tmp_callback || !tmp_callback->fn)
+            {
+                gossip_err("critical Trove failure (null callback)\n");
+#ifdef __PVFS2_JOB_THREADED__
+                gossip_err("trove_thread_function thread terminating\n");
+                break;
+#endif
+                return NULL;
+            }
+	    tmp_callback->fn(tmp_callback->data,
+                             stat_trove_error_code_array[i]);
 	}
     }
     return (NULL);
@@ -160,7 +170,8 @@ static void *bmi_thread_function(void *p
 		incount = THREAD_MGR_TEST_COUNT;
 	    gen_mutex_unlock(&bmi_mutex);
 
-	    ret = BMI_testunexpected(incount, &outcount, stat_bmi_unexp_array, 0);
+	    ret = BMI_testunexpected(
+                incount, &outcount, stat_bmi_unexp_array, 0);
 	    if(ret < 0)
 	    {
 		/* critical failure */
@@ -169,7 +180,7 @@ static void *bmi_thread_function(void *p
 		gossip_lerr("Error: critical BMI failure.\n");
 	    }
 
-	    /* execute callback function for each completed unexpected message */
+	    /* execute callback for each completed unexpected message */
 	    gen_mutex_lock(&bmi_mutex);
 	    for(i=0; i<outcount; i++)
 	    {
@@ -209,6 +220,9 @@ static void *bmi_thread_function(void *p
 	incount = THREAD_MGR_TEST_COUNT;
 	bmi_test_count = 0;
 
+        memset(stat_bmi_user_ptr_array, 0,
+               (THREAD_MGR_TEST_COUNT * sizeof(void *)));
+
 	ret = BMI_testcontext(incount, stat_bmi_id_array, &bmi_test_count,
 	    stat_bmi_error_code_array, stat_bmi_actual_size_array,
 	    stat_bmi_user_ptr_array, test_timeout, global_bmi_context);
@@ -222,23 +236,33 @@ static void *bmi_thread_function(void *p
 
 	if(ret < 0)
 	{
-	    /* critical error */
-	    /* TODO: how to handle this */
-	    assert(0);
-	    gossip_lerr("Error: critical BMI failure.\n");
+	    PVFS_perror_gossip("critical BMI failure.\n", ret);
+#ifdef __PVFS2_JOB_THREADED__
+            gossip_err("bmi_thread_function thread terminating\n");
+            break;
+#endif
+            return NULL;
 	}
 
 	for(i=0; i<bmi_test_count; i++)
 	{
-	    /* execute a callback function for each completed BMI operation */
-	    tmp_callback = 
-		(struct PINT_thread_mgr_bmi_callback*)stat_bmi_user_ptr_array[i];
-	    /* sanity check */
-	    assert(tmp_callback != NULL);
-	    assert(tmp_callback->fn != NULL);
-	
-	    tmp_callback->fn(tmp_callback->data, stat_bmi_actual_size_array[i],
-		stat_bmi_error_code_array[i]);
+	    /* execute a callback for each completed BMI operation */
+	    tmp_callback = (struct PINT_thread_mgr_bmi_callback*)
+                stat_bmi_user_ptr_array[i];
+
+            if (!tmp_callback || !tmp_callback->fn)
+            {
+                gossip_err("critical BMI failure (null callback)\n");
+#ifdef __PVFS2_JOB_THREADED__
+                gossip_err("bmi_thread_function thread terminating\n");
+                break;
+#endif
+                return NULL;
+            }
+
+	    tmp_callback->fn(tmp_callback->data,
+                             stat_bmi_actual_size_array[i],
+                             stat_bmi_error_code_array[i]);
 	}
     }
 
@@ -266,17 +290,20 @@ static void *dev_thread_function(void *p
 	    incount = THREAD_MGR_TEST_COUNT;
 	gen_mutex_unlock(&dev_mutex);
 
-	ret = PINT_dev_test_unexpected(incount, &outcount,
-	    stat_dev_unexp_array, timeout);
-	if(ret < 0)
+	ret = PINT_dev_test_unexpected(
+            incount, &outcount, stat_dev_unexp_array, timeout);
+
+	if (ret < 0)
 	{
-	    /* critical failure */
-	    /* TODO: how to handle this? */
-	    gossip_lerr("Error: critical device failure.\n");
-	    assert(0);
+            PVFS_perror_gossip("critical device failure", ret);
+#ifdef __PVFS2_JOB_THREADED__
+            gossip_err("dev_thread_function thread terminating\n");
+            break;
+#endif
+            return NULL;
 	}
 
-	/* execute callback function for each completed unexpected message */
+	/* execute callback for each completed unexpected message */
 	gen_mutex_lock(&dev_mutex);
 	for(i=0; i<outcount; i++)
 	{



More information about the PVFS2-CVS mailing list