[PVFS2-CVS] commit by neill in pvfs2/src/io/job: job-desc-queue.c job.c job.h thread-mgr.c
CVS commit program
cvs at parl.clemson.edu
Thu Jul 8 13:17:11 EDT 2004
Update of /projects/cvsroot/pvfs2/src/io/job
In directory parlweb:/tmp/cvs-serv12211/src/io/job
Modified Files:
job-desc-queue.c job.c job.h thread-mgr.c
Log Message:
- merging the pvfs2-nm-nb-branch into the main tree;
see the ChangeLog for a summary, or browse the CVS history of the branch
for full details
Index: job-desc-queue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job-desc-queue.c,v
diff -p -u -r1.10 -r1.11
--- job-desc-queue.c 18 Dec 2003 20:31:34 -0000 1.10
+++ job-desc-queue.c 8 Jul 2004 16:17:11 -0000 1.11
@@ -41,7 +41,7 @@ struct job_desc *alloc_job_desc(int type
}
memset(jd, 0, sizeof(struct job_desc));
- if (id_gen_fast_register(&(jd->job_id), jd) < 0)
+ if (id_gen_safe_register(&(jd->job_id), jd) < 0)
{
free(jd);
return (NULL);
@@ -59,6 +59,7 @@ struct job_desc *alloc_job_desc(int type
*/
void dealloc_job_desc(struct job_desc *jd)
{
+ id_gen_safe_unregister(jd->job_id);
free(jd);
return;
}
@@ -135,12 +136,13 @@ void job_desc_q_add(job_desc_q_p jdqp,
gen_mutex_lock(s_job_desc_q_mutex);
if (jdqp)
{
+ assert(desc);
+
/* note that we are adding to tail to preserve fifo order */
qlist_add_tail(&(desc->job_desc_q_link), jdqp);
}
gen_mutex_unlock(s_job_desc_q_mutex);
}
- return;
}
/* job_desc_q_remove()
@@ -151,8 +153,8 @@ void job_desc_q_add(job_desc_q_p jdqp,
*/
void job_desc_q_remove(struct job_desc *desc)
{
+ assert(desc);
qlist_del(&(desc->job_desc_q_link));
- return;
}
/* job_desc_q_empty()
Index: job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job.c,v
diff -p -u -r1.129 -r1.130
--- job.c 18 May 2004 15:30:46 -0000 1.129
+++ job.c 8 Jul 2004 16:17:11 -0000 1.130
@@ -298,6 +298,7 @@ int job_bmi_send(PVFS_BMI_addr_t addr,
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_BMI_SEND, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
@@ -309,6 +310,7 @@ int job_bmi_send(PVFS_BMI_addr_t addr,
out_status_p->actual_size = size;
JOB_EVENT_END(PVFS_EVENT_BMI_SEND, size, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -395,6 +397,7 @@ int job_bmi_send_list(PVFS_BMI_addr_t ad
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_BMI_SEND, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
@@ -406,6 +409,7 @@ int job_bmi_send_list(PVFS_BMI_addr_t ad
out_status_p->actual_size = total_size;
JOB_EVENT_END(PVFS_EVENT_BMI_SEND, total_size, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -472,6 +476,7 @@ int job_bmi_recv(PVFS_BMI_addr_t addr,
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_BMI_RECV, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
@@ -484,6 +489,7 @@ int job_bmi_recv(PVFS_BMI_addr_t addr,
JOB_EVENT_END(PVFS_EVENT_BMI_RECV, out_status_p->actual_size,
jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -557,6 +563,7 @@ int job_bmi_recv_list(PVFS_BMI_addr_t ad
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_BMI_RECV, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
@@ -569,6 +576,7 @@ int job_bmi_recv_list(PVFS_BMI_addr_t ad
JOB_EVENT_END(PVFS_EVENT_BMI_RECV, out_status_p->actual_size,
jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -638,6 +646,7 @@ int job_bmi_unexp(struct BMI_unexpected_
{
/* error testing */
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -647,10 +656,13 @@ int job_bmi_unexp(struct BMI_unexpected_
out_status_p->error_code = jd->u.bmi_unexp.info->error_code;
out_status_p->status_user_tag = status_user_tag;
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
}
+ PINT_thread_mgr_bmi_unexp_handler(bmi_thread_mgr_unexp_handler);
+
/* if we fall through to this point, then there were not any
* uenxpected receive's available; queue up to test later
*/
@@ -660,8 +672,6 @@ int job_bmi_unexp(struct BMI_unexpected_
bmi_unexp_pending_count++;
gen_mutex_unlock(&bmi_unexp_mutex);
- PINT_thread_mgr_bmi_unexp_handler(bmi_thread_mgr_unexp_handler);
-
return (0);
}
@@ -679,18 +689,19 @@ int job_bmi_cancel(job_id_t id, job_cont
gen_mutex_lock(&completion_mutex);
- query = id_gen_fast_lookup(id);
- if(query->completed_flag)
+ query = id_gen_safe_lookup(id);
+ if (!query || query->completed_flag)
{
/* job has already completed, no cancellation needed */
gen_mutex_unlock(&completion_mutex);
return(0);
}
- /* tell thread mgr to cancel operation. This will result in normal
- * completion path through thread mgr callbacks; no more work to do here */
- ret = PINT_thread_mgr_bmi_cancel(query->u.bmi.id,
- &(query->bmi_callback));
+ /* tell thread mgr to cancel operation. This will result in
+ * normal completion path through thread mgr callbacks; no more
+ * work to do here */
+ ret = PINT_thread_mgr_bmi_cancel(
+ query->u.bmi.id, &(query->bmi_callback));
gen_mutex_unlock(&completion_mutex);
@@ -705,18 +716,19 @@ int job_bmi_cancel(job_id_t id, job_cont
* returns 0 on success, -errno on failure, and 1 on immediate
* completion
*/
-int job_dev_unexp(struct PINT_dev_unexp_info* dev_unexp_d,
+int job_dev_unexp(
+ struct PINT_dev_unexp_info* dev_unexp_d,
void* user_ptr,
job_aint status_user_tag,
job_status_s * out_status_p,
job_id_t* id,
+ enum job_flags flags,
job_context_id context_id)
{
/* post a dev recv for an unexpected message. We will do a quick
* test to see if an unexpected message is available. If so, we
* return the necessary info; if not we queue up to test again later
*/
-
int ret = -1;
struct job_desc *jd = NULL;
int outcount = 0;
@@ -734,26 +746,36 @@ int job_dev_unexp(struct PINT_dev_unexp_
jd->context_id = context_id;
jd->status_user_tag = status_user_tag;
- ret = PINT_dev_test_unexpected(1, &outcount, jd->u.dev_unexp.info, 0);
-
- if (ret < 0)
+ /* only look for immediate completion if our flags alow it */
+ if (!(flags & JOB_NO_IMMED_COMPLETE))
{
- /* error testing */
- dealloc_job_desc(jd);
- return (ret);
- }
+ ret = PINT_dev_test_unexpected(
+ 1, &outcount, jd->u.dev_unexp.info, 0);
- if (outcount == 1)
- {
- /* there was an unexpected job available */
- out_status_p->error_code = 0;
- out_status_p->status_user_tag = status_user_tag;
- dealloc_job_desc(jd);
- return (ret);
+ if (ret < 0)
+ {
+ /* error testing */
+ dealloc_job_desc(jd);
+ jd = NULL;
+ return (ret);
+ }
+
+ if (outcount == 1)
+ {
+ /* there was an unexpected job available */
+ out_status_p->error_code = 0;
+ out_status_p->status_user_tag = status_user_tag;
+ dealloc_job_desc(jd);
+ jd = NULL;
+ return (ret);
+ }
}
+ PINT_thread_mgr_dev_unexp_handler(dev_thread_mgr_unexp_handler);
+
/* if we fall through to this point, then there were not any
- * uenxpected receive's available; queue up to test later
+ * uenxpected receive's available (or none requested); queue up to
+ * test later
*/
gen_mutex_lock(&dev_unexp_mutex);
*id = jd->job_id;
@@ -761,8 +783,6 @@ int job_dev_unexp(struct PINT_dev_unexp_
dev_unexp_pending_count++;
gen_mutex_unlock(&dev_unexp_mutex);
- PINT_thread_mgr_dev_unexp_handler(dev_thread_mgr_unexp_handler);
-
return (0);
}
@@ -893,6 +913,7 @@ int job_req_sched_post(struct PVFS_serve
{
/* error posting */
dealloc_job_desc(jd);
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -956,6 +977,7 @@ int job_req_sched_post_timer(int msecs,
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
@@ -965,6 +987,7 @@ int job_req_sched_post_timer(int msecs,
out_status_p->error_code = 0;
out_status_p->status_user_tag = status_user_tag;
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
@@ -1009,13 +1032,14 @@ int job_req_sched_release(job_id_t in_co
jd->context_id = context_id;
jd->status_user_tag = status_user_tag;
- match_jd = id_gen_fast_lookup(in_completed_id);
+ match_jd = id_gen_safe_lookup(in_completed_id);
ret = PINT_req_sched_release(match_jd->u.req_sched.id, jd,
&(jd->u.req_sched.id));
/* delete the old req sched job desc; it is no longer needed */
dealloc_job_desc(match_jd);
+ match_jd = NULL;
/* NOTE: I am letting the return value propigate here, rather
* than just setting the status. Failure here is bad...
@@ -1024,6 +1048,7 @@ int job_req_sched_release(job_id_t in_co
{
/* error posting */
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1033,6 +1058,7 @@ int job_req_sched_release(job_id_t in_co
out_status_p->error_code = 0;
out_status_p->status_user_tag = status_user_tag;
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
@@ -1088,6 +1114,7 @@ int job_flow(flow_descriptor * flow_d,
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_FLOW, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
if (ret == 1)
@@ -1098,6 +1125,7 @@ int job_flow(flow_descriptor * flow_d,
out_status_p->actual_size = flow_d->total_transfered;
JOB_EVENT_END(PVFS_EVENT_FLOW, flow_d->total_transfered, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (1);
}
@@ -1125,9 +1153,9 @@ int job_flow_cancel(job_id_t id, job_con
gen_mutex_lock(&completion_mutex);
- query = id_gen_fast_lookup(id);
+ query = id_gen_safe_lookup(id);
- if(query->completed_flag)
+ if (!query || query->completed_flag)
{
/* job has already completed, no cancellation needed */
gen_mutex_unlock(&completion_mutex);
@@ -1207,6 +1235,7 @@ int job_trove_bstream_write_at(PVFS_fs_i
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_WRITE_AT, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -1223,6 +1252,7 @@ int job_trove_bstream_write_at(PVFS_fs_i
JOB_EVENT_END(PVFS_EVENT_TROVE_WRITE_AT, out_status_p->actual_size,
jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1298,6 +1328,7 @@ int job_trove_bstream_read_at(PVFS_fs_id
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_READ_AT, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -1314,6 +1345,7 @@ int job_trove_bstream_read_at(PVFS_fs_id
JOB_EVENT_END(PVFS_EVENT_TROVE_READ_AT, out_status_p->actual_size,
jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1377,6 +1409,7 @@ int job_trove_bstream_flush(PVFS_fs_id c
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_BSTREAM_FLUSH, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -1388,6 +1421,7 @@ int job_trove_bstream_flush(PVFS_fs_id c
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_TROVE_BSTREAM_FLUSH, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
/* if we fall through to this point, the job did not
@@ -1458,7 +1492,7 @@ int job_trove_keyval_read(PVFS_fs_id col
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_READ, 0, jd->job_id);
dealloc_job_desc(jd);
-
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -1472,6 +1506,7 @@ int job_trove_keyval_read(PVFS_fs_id col
out_status_p->vtag = jd->u.trove.vtag;
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_READ, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1545,7 +1580,7 @@ int job_trove_keyval_read_list(PVFS_fs_i
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_READ_LIST, 0, jd->job_id);
dealloc_job_desc(jd);
-
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -1559,6 +1594,7 @@ int job_trove_keyval_read_list(PVFS_fs_i
out_status_p->vtag = jd->u.trove.vtag;
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_READ_LIST, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1631,7 +1667,7 @@ int job_trove_keyval_write(PVFS_fs_id co
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_WRITE, 0, jd->job_id);
dealloc_job_desc(jd);
-
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -1645,6 +1681,7 @@ int job_trove_keyval_write(PVFS_fs_id co
out_status_p->vtag = jd->u.trove.vtag;
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_WRITE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1707,6 +1744,7 @@ int job_trove_keyval_flush(PVFS_fs_id co
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_FLUSH, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -1719,6 +1757,7 @@ int job_trove_keyval_flush(PVFS_fs_id co
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_FLUSH, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1790,6 +1829,7 @@ int job_trove_dspace_getattr(PVFS_fs_id
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_GETATTR, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -1802,6 +1842,7 @@ int job_trove_dspace_getattr(PVFS_fs_id
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_GETATTR, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1873,6 +1914,7 @@ int job_trove_dspace_setattr(PVFS_fs_id
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_SETATTR, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -1885,6 +1927,7 @@ int job_trove_dspace_setattr(PVFS_fs_id
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_SETATTR, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -1957,6 +2000,7 @@ int job_trove_bstream_resize(PVFS_fs_id
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_BSTREAM_RESIZE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -1969,6 +2013,7 @@ int job_trove_bstream_resize(PVFS_fs_id
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_TROVE_BSTREAM_RESIZE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2059,7 +2104,7 @@ int job_trove_keyval_remove(PVFS_fs_id c
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_REMOVE, 0, jd->job_id);
dealloc_job_desc(jd);
-
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -2073,6 +2118,7 @@ int job_trove_keyval_remove(PVFS_fs_id c
out_status_p->vtag = jd->u.trove.vtag;
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_REMOVE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2170,7 +2216,7 @@ int job_trove_keyval_iterate(PVFS_fs_id
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_ITERATE, 0, jd->job_id);
dealloc_job_desc(jd);
-
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -2186,6 +2232,7 @@ int job_trove_keyval_iterate(PVFS_fs_id
out_status_p->count = jd->u.trove.count;
JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_ITERATE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2260,7 +2307,7 @@ int job_trove_dspace_iterate_handles(PVF
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_ITERATE_HANDLES, 0, jd->job_id);
dealloc_job_desc(jd);
-
+ jd = NULL;
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
return (1);
@@ -2276,6 +2323,7 @@ int job_trove_dspace_iterate_handles(PVF
out_status_p->count = jd->u.trove.count;
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_ITERATE_HANDLES, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2375,6 +2423,7 @@ int job_trove_dspace_create(PVFS_fs_id c
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_CREATE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -2389,6 +2438,7 @@ int job_trove_dspace_create(PVFS_fs_id c
out_status_p->handle = jd->u.trove.handle;
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_CREATE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2457,6 +2507,7 @@ int job_trove_dspace_remove(PVFS_fs_id c
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_REMOVE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -2470,6 +2521,7 @@ int job_trove_dspace_remove(PVFS_fs_id c
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_REMOVE, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2538,6 +2590,7 @@ int job_trove_dspace_verify(PVFS_fs_id c
/* error posting trove operation */
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_VERIFY, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -2551,6 +2604,7 @@ int job_trove_dspace_verify(PVFS_fs_id c
out_status_p->status_user_tag = status_user_tag;
JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_VERIFY, 0, jd->job_id);
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2580,8 +2634,8 @@ int job_trove_dspace_cancel(PVFS_fs_id c
gen_mutex_lock(&completion_mutex);
- query = id_gen_fast_lookup(id);
- if(query->completed_flag)
+ query = id_gen_safe_lookup(id);
+ if (!query || query->completed_flag)
{
/* job has already completed, no cancellation needed */
gen_mutex_unlock(&completion_mutex);
@@ -2590,9 +2644,8 @@ int job_trove_dspace_cancel(PVFS_fs_id c
/* tell thread mgr to cancel operation. This will result in normal
* completion path through thread mgr callbacks; no more work to do here */
- ret = PINT_thread_mgr_trove_cancel(query->u.trove.id,
- coll_id,
- &(query->trove_callback));
+ ret = PINT_thread_mgr_trove_cancel(
+ query->u.trove.id, coll_id, &(query->trove_callback));
gen_mutex_unlock(&completion_mutex);
@@ -2650,6 +2703,7 @@ int job_trove_fs_create(char *collname,
{
/* error posting trove operation */
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -2662,6 +2716,7 @@ int job_trove_fs_create(char *collname,
out_status_p->error_code = 0;
out_status_p->status_user_tag = status_user_tag;
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2741,6 +2796,7 @@ int job_trove_fs_lookup(char *collname,
{
/* error posting trove operation */
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -2754,6 +2810,7 @@ int job_trove_fs_lookup(char *collname,
out_status_p->status_user_tag = status_user_tag;
out_status_p->coll_id = jd->u.trove.fsid;
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2816,6 +2873,7 @@ int job_trove_fs_seteattr(PVFS_fs_id col
{
/* error posting trove operation */
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -2828,6 +2886,7 @@ int job_trove_fs_seteattr(PVFS_fs_id col
out_status_p->error_code = 0;
out_status_p->status_user_tag = status_user_tag;
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2893,6 +2952,7 @@ int job_trove_fs_geteattr(PVFS_fs_id col
{
/* error posting trove operation */
dealloc_job_desc(jd);
+ jd = NULL;
/* TODO: handle this correctly */
out_status_p->error_code = -EINVAL;
out_status_p->status_user_tag = status_user_tag;
@@ -2905,6 +2965,7 @@ int job_trove_fs_geteattr(PVFS_fs_id col
out_status_p->error_code = 0;
out_status_p->status_user_tag = status_user_tag;
dealloc_job_desc(jd);
+ jd = NULL;
return (ret);
}
@@ -2970,7 +3031,7 @@ int job_test(job_id_t id,
gen_mutex_unlock(&completion_mutex);
return(completion_error);
}
- query = id_gen_fast_lookup(id);
+ query = id_gen_safe_lookup(id);
if(query->completed_flag)
{
job_desc_q_remove(query);
@@ -3036,7 +3097,7 @@ int job_test(job_id_t id,
gen_mutex_unlock(&completion_mutex);
return(completion_error);
}
- query = id_gen_fast_lookup(id);
+ query = id_gen_safe_lookup(id);
if(query->completed_flag)
{
job_desc_q_remove(query);
@@ -3075,6 +3136,7 @@ job_test_complete:
else
{
dealloc_job_desc(query);
+ query = NULL;
}
return(1);
}
@@ -3300,11 +3362,11 @@ int job_testsome(job_id_t * id_array,
* returns 0 on success, -errno on failure
*/
int job_testcontext(job_id_t * out_id_array_p,
- int *inout_count_p,
- void **returned_user_ptr_array,
- job_status_s * out_status_array_p,
- int timeout_ms,
- job_context_id context_id)
+ int *inout_count_p,
+ void **returned_user_ptr_array,
+ job_status_s * out_status_array_p,
+ int timeout_ms,
+ job_context_id context_id)
{
int ret = -1;
#ifdef __PVFS2_JOB_THREADED__
@@ -3383,6 +3445,7 @@ int job_testcontext(job_id_t * out_id_ar
do_one_work_cycle_all(10);
else
do_one_work_cycle_all(0);
+
ret = 0;
#endif
if (ret == ETIMEDOUT)
@@ -3425,7 +3488,7 @@ int job_testcontext(job_id_t * out_id_ar
}
timeout_remaining -= (end.tv_sec - start.tv_sec) * 1000 +
- (end.tv_usec - start.tv_usec) / 1000;
+ ((end.tv_usec - start.tv_usec) / 1000);
} while (timeout_remaining > 0);
@@ -3486,27 +3549,31 @@ static void teardown_queues(void)
*
* no return value
*/
-static void trove_thread_mgr_callback(void* data,
+static void trove_thread_mgr_callback(
+ void* data,
PVFS_error error_code)
{
struct job_desc* tmp_desc = (struct job_desc*)data;
+ assert(tmp_desc);
- /* set job descriptor fields and put into completion queue */
- tmp_desc->u.trove.state = error_code;
gen_mutex_lock(&completion_mutex);
- job_desc_q_add(completion_queue_array[tmp_desc->context_id],
- tmp_desc);
- /* set completed flag while holding queue lock */
- tmp_desc->completed_flag = 1;
- gen_mutex_unlock(&completion_mutex);
+ if (tmp_desc->completed_flag == 0)
+ {
+ /* set job descriptor fields and put into completion queue */
+ tmp_desc->u.trove.state = error_code;
+ job_desc_q_add(completion_queue_array[tmp_desc->context_id],
+ tmp_desc);
+ /* set completed flag while holding queue lock */
+ tmp_desc->completed_flag = 1;
- trove_pending_count--;
+ trove_pending_count--;
#ifdef __PVFS2_JOB_THREADED__
- /* wake up anyone waiting for completion */
- pthread_cond_signal(&completion_cond);
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
#endif
- return;
+ }
+ gen_mutex_unlock(&completion_mutex);
}
/* bmi_thread_mgr_callback()
@@ -3516,30 +3583,33 @@ static void trove_thread_mgr_callback(vo
*
* no return value
*/
-static void bmi_thread_mgr_callback(void* data,
+static void bmi_thread_mgr_callback(
+ void* data,
PVFS_size actual_size,
PVFS_error error_code)
{
- struct job_desc* tmp_desc = (struct job_desc*)data;
+ struct job_desc* tmp_desc = (struct job_desc*)data;
+ assert(tmp_desc);
- /* set job descriptor fields and put into completion queue */
- tmp_desc->u.bmi.error_code = error_code;
- tmp_desc->u.bmi.actual_size = actual_size;
gen_mutex_lock(&completion_mutex);
- job_desc_q_add(completion_queue_array[tmp_desc->context_id],
- tmp_desc);
- /* set completed flag while holding queue lock */
- tmp_desc->completed_flag = 1;
- gen_mutex_unlock(&completion_mutex);
+ if (tmp_desc->completed_flag == 0)
+ {
+ /* set job descriptor fields and put into completion queue */
+ tmp_desc->u.bmi.error_code = error_code;
+ tmp_desc->u.bmi.actual_size = actual_size;
+ job_desc_q_add(completion_queue_array[tmp_desc->context_id],
+ tmp_desc);
+ /* set completed flag while holding queue lock */
+ tmp_desc->completed_flag = 1;
- bmi_pending_count--;
+ bmi_pending_count--;
#ifdef __PVFS2_JOB_THREADED__
- /* wake up anyone waiting for completion */
- pthread_cond_signal(&completion_cond);
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
#endif
-
- return;
+ }
+ gen_mutex_unlock(&completion_mutex);
}
/* bmi_thread_mgr_unexp_handler()
@@ -3549,35 +3619,41 @@ static void bmi_thread_mgr_callback(void
*
* no return value
*/
-static void bmi_thread_mgr_unexp_handler(struct BMI_unexpected_info* unexp)
+static void bmi_thread_mgr_unexp_handler(
+ struct BMI_unexpected_info* unexp)
{
struct job_desc* tmp_desc = NULL;
- /* remove the operation from the pending bmi_unexp queue */
gen_mutex_lock(&bmi_unexp_mutex);
+ /* remove the operation from the pending bmi_unexp queue */
tmp_desc = job_desc_q_shownext(bmi_unexp_queue);
assert(tmp_desc != NULL); /* TODO: fix this */
- job_desc_q_remove(tmp_desc);
- bmi_unexp_pending_count--;
- gen_mutex_unlock(&bmi_unexp_mutex);
- /* set appropriate fields and store in completed queue */
- *(tmp_desc->u.bmi_unexp.info) = *unexp;
- gen_mutex_lock(&completion_mutex);
- /* set completed flag while holding queue lock */
- tmp_desc->completed_flag = 1;
- if (completion_queue_array[tmp_desc->context_id] != 0)
+ if (tmp_desc->completed_flag == 0)
{
- job_desc_q_add(completion_queue_array[tmp_desc->context_id],
- tmp_desc);
- }
- gen_mutex_unlock(&completion_mutex);
+ job_desc_q_remove(tmp_desc);
+ bmi_unexp_pending_count--;
+ gen_mutex_unlock(&bmi_unexp_mutex);
+ /* set appropriate fields and store in completed queue */
+ *(tmp_desc->u.bmi_unexp.info) = *unexp;
+ gen_mutex_lock(&completion_mutex);
+ /* set completed flag while holding queue lock */
+ tmp_desc->completed_flag = 1;
+ if (completion_queue_array[tmp_desc->context_id])
+ {
+ job_desc_q_add(completion_queue_array[tmp_desc->context_id],
+ tmp_desc);
+ }
+ gen_mutex_unlock(&completion_mutex);
#ifdef __PVFS2_JOB_THREADED__
- /* wake up anyone waiting for completion */
- pthread_cond_signal(&completion_cond);
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
#endif
-
- return;
+ }
+ else
+ {
+ gen_mutex_unlock(&bmi_unexp_mutex);
+ }
}
/* dev_thread_mgr_unexp_handler()
@@ -3591,31 +3667,36 @@ static void dev_thread_mgr_unexp_handler
{
struct job_desc* tmp_desc = NULL;
- /* remove the operation from the pending dev_unexp queue */
gen_mutex_lock(&dev_unexp_mutex);
+ /* remove the operation from the pending dev_unexp queue */
tmp_desc = job_desc_q_shownext(dev_unexp_queue);
assert(tmp_desc != NULL); /* TODO: fix this */
- job_desc_q_remove(tmp_desc);
- dev_unexp_pending_count--;
- gen_mutex_unlock(&dev_unexp_mutex);
- /* set appropriate fields and store in completed queue */
- *(tmp_desc->u.dev_unexp.info) = *unexp;
- gen_mutex_lock(&completion_mutex);
- /* set completed flag while holding queue lock */
- tmp_desc->completed_flag = 1;
- if (completion_queue_array[tmp_desc->context_id] != 0)
+ if (tmp_desc->completed_flag == 0)
{
- job_desc_q_add(completion_queue_array[tmp_desc->context_id],
- tmp_desc);
- }
- gen_mutex_unlock(&completion_mutex);
+ job_desc_q_remove(tmp_desc);
+ dev_unexp_pending_count--;
+ gen_mutex_unlock(&dev_unexp_mutex);
+ /* set appropriate fields and store in completed queue */
+ *(tmp_desc->u.dev_unexp.info) = *unexp;
+ gen_mutex_lock(&completion_mutex);
+ /* set completed flag while holding queue lock */
+ tmp_desc->completed_flag = 1;
+ if (completion_queue_array[tmp_desc->context_id])
+ {
+ job_desc_q_add(completion_queue_array[tmp_desc->context_id],
+ tmp_desc);
+ }
+ gen_mutex_unlock(&completion_mutex);
#ifdef __PVFS2_JOB_THREADED__
- /* wake up anyone waiting for completion */
- pthread_cond_signal(&completion_cond);
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
#endif
-
- return;
+ }
+ else
+ {
+ gen_mutex_unlock(&dev_unexp_mutex);
+ }
}
/* fill_status()
@@ -3628,6 +3709,9 @@ static void fill_status(struct job_desc
void **returned_user_ptr_p,
job_status_s * status)
{
+ assert(jd);
+ assert(status);
+
status->status_user_tag = jd->status_user_tag;
if (returned_user_ptr_p)
@@ -3750,7 +3834,7 @@ static int completion_query_some(job_id_
for(i=0; i<incount; i++)
{
- tmp_desc = id_gen_fast_lookup(id_array[i]);
+ tmp_desc = id_gen_safe_lookup(id_array[i]);
if(tmp_desc && tmp_desc->completed_flag)
{
if(returned_user_ptr_array)
@@ -3776,6 +3860,7 @@ static int completion_query_some(job_id_
else
{
dealloc_job_desc(tmp_desc);
+ tmp_desc = NULL;
}
out_index_array[*inout_count_p] = i;
(*inout_count_p)++;
@@ -3807,6 +3892,8 @@ static int completion_query_context(job_
job_desc_q_shownext(
completion_queue_array[context_id])))
{
+ assert(query);
+
if (returned_user_ptr_array)
{
fill_status(query, &(returned_user_ptr_array[*inout_count_p]),
@@ -3830,6 +3917,7 @@ static int completion_query_context(job_
else
{
dealloc_job_desc(query);
+ query = NULL;
}
}
gen_mutex_unlock(&completion_mutex);
@@ -3849,10 +3937,15 @@ static void do_one_work_cycle_all(int id
int total_pending_count = bmi_pending_count + bmi_unexp_pending_count
+ flow_pending_count + dev_unexp_pending_count + trove_pending_count;
- if(bmi_pending_count || bmi_unexp_pending_count || flow_pending_count)
+ if (bmi_pending_count || bmi_unexp_pending_count || flow_pending_count)
+ {
PINT_thread_mgr_bmi_push(idle_time_ms);
- if(dev_unexp_pending_count)
+ idle_time_ms = 0;
+ }
+ if (dev_unexp_pending_count)
+ {
PINT_thread_mgr_dev_push(idle_time_ms);
+ }
#ifdef __PVFS2_TROVE_SUPPORT__
if(trove_pending_count || flow_pending_count)
PINT_thread_mgr_trove_push(idle_time_ms);
Index: job.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job.h,v
diff -p -u -r1.43 -r1.44
--- job.h 27 Apr 2004 17:46:17 -0000 1.43
+++ job.h 8 Jul 2004 16:17:11 -0000 1.44
@@ -140,6 +140,7 @@ int job_dev_unexp(struct PINT_dev_unexp_
job_aint status_user_tag,
job_status_s * out_status_p,
job_id_t* id,
+ enum job_flags flags,
job_context_id context_id);
/* device write */
Index: thread-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.c,v
diff -p -u -r1.20 -r1.21
--- thread-mgr.c 18 May 2004 15:12:24 -0000 1.20
+++ thread-mgr.c 8 Jul 2004 16:17:11 -0000 1.21
@@ -6,6 +6,7 @@
#include <pthread.h>
#include <stdlib.h>
+#include <string.h>
#include <assert.h>
#include "pvfs2-types.h"
@@ -111,24 +112,33 @@ static void *trove_thread_function(void
#endif
gen_mutex_unlock(&trove_test_mutex);
- if(ret < 0)
+ if (ret < 0)
{
- /* critical error */
- /* TODO: how to handle this */
- assert(0);
- gossip_lerr("Error: critical Trove failure.\n");
+ PVFS_perror_gossip("critical Trove failure.\n", ret);
+#ifdef __PVFS2_JOB_THREADED__
+ gossip_err("trove_thread_function thread terminating\n");
+ break;
+#endif
+ return NULL;
}
for(i=0; i<trove_test_count; i++)
{
- /* execute a callback function for each completed BMI operation */
- tmp_callback =
- (struct PINT_thread_mgr_trove_callback*)stat_trove_user_ptr_array[i];
- /* sanity check */
- assert(tmp_callback != NULL);
- assert(tmp_callback->fn != NULL);
-
- tmp_callback->fn(tmp_callback->data, stat_trove_error_code_array[i]);
+ /* execute a callback for each completed BMI operation */
+ tmp_callback = (struct PINT_thread_mgr_trove_callback*)
+ stat_trove_user_ptr_array[i];
+
+ if (!tmp_callback || !tmp_callback->fn)
+ {
+ gossip_err("critical Trove failure (null callback)\n");
+#ifdef __PVFS2_JOB_THREADED__
+ gossip_err("trove_thread_function thread terminating\n");
+ break;
+#endif
+ return NULL;
+ }
+ tmp_callback->fn(tmp_callback->data,
+ stat_trove_error_code_array[i]);
}
}
return (NULL);
@@ -160,7 +170,8 @@ static void *bmi_thread_function(void *p
incount = THREAD_MGR_TEST_COUNT;
gen_mutex_unlock(&bmi_mutex);
- ret = BMI_testunexpected(incount, &outcount, stat_bmi_unexp_array, 0);
+ ret = BMI_testunexpected(
+ incount, &outcount, stat_bmi_unexp_array, 0);
if(ret < 0)
{
/* critical failure */
@@ -169,7 +180,7 @@ static void *bmi_thread_function(void *p
gossip_lerr("Error: critical BMI failure.\n");
}
- /* execute callback function for each completed unexpected message */
+ /* execute callback for each completed unexpected message */
gen_mutex_lock(&bmi_mutex);
for(i=0; i<outcount; i++)
{
@@ -209,6 +220,9 @@ static void *bmi_thread_function(void *p
incount = THREAD_MGR_TEST_COUNT;
bmi_test_count = 0;
+ memset(stat_bmi_user_ptr_array, 0,
+ (THREAD_MGR_TEST_COUNT * sizeof(void *)));
+
ret = BMI_testcontext(incount, stat_bmi_id_array, &bmi_test_count,
stat_bmi_error_code_array, stat_bmi_actual_size_array,
stat_bmi_user_ptr_array, test_timeout, global_bmi_context);
@@ -222,23 +236,33 @@ static void *bmi_thread_function(void *p
if(ret < 0)
{
- /* critical error */
- /* TODO: how to handle this */
- assert(0);
- gossip_lerr("Error: critical BMI failure.\n");
+ PVFS_perror_gossip("critical BMI failure.\n", ret);
+#ifdef __PVFS2_JOB_THREADED__
+ gossip_err("bmi_thread_function thread terminating\n");
+ break;
+#endif
+ return NULL;
}
for(i=0; i<bmi_test_count; i++)
{
- /* execute a callback function for each completed BMI operation */
- tmp_callback =
- (struct PINT_thread_mgr_bmi_callback*)stat_bmi_user_ptr_array[i];
- /* sanity check */
- assert(tmp_callback != NULL);
- assert(tmp_callback->fn != NULL);
-
- tmp_callback->fn(tmp_callback->data, stat_bmi_actual_size_array[i],
- stat_bmi_error_code_array[i]);
+ /* execute a callback for each completed BMI operation */
+ tmp_callback = (struct PINT_thread_mgr_bmi_callback*)
+ stat_bmi_user_ptr_array[i];
+
+ if (!tmp_callback || !tmp_callback->fn)
+ {
+ gossip_err("critical BMI failure (null callback)\n");
+#ifdef __PVFS2_JOB_THREADED__
+ gossip_err("bmi_thread_function thread terminating\n");
+ break;
+#endif
+ return NULL;
+ }
+
+ tmp_callback->fn(tmp_callback->data,
+ stat_bmi_actual_size_array[i],
+ stat_bmi_error_code_array[i]);
}
}
@@ -266,17 +290,20 @@ static void *dev_thread_function(void *p
incount = THREAD_MGR_TEST_COUNT;
gen_mutex_unlock(&dev_mutex);
- ret = PINT_dev_test_unexpected(incount, &outcount,
- stat_dev_unexp_array, timeout);
- if(ret < 0)
+ ret = PINT_dev_test_unexpected(
+ incount, &outcount, stat_dev_unexp_array, timeout);
+
+ if (ret < 0)
{
- /* critical failure */
- /* TODO: how to handle this? */
- gossip_lerr("Error: critical device failure.\n");
- assert(0);
+ PVFS_perror_gossip("critical device failure", ret);
+#ifdef __PVFS2_JOB_THREADED__
+ gossip_err("dev_thread_function thread terminating\n");
+ break;
+#endif
+ return NULL;
}
- /* execute callback function for each completed unexpected message */
+ /* execute callback for each completed unexpected message */
gen_mutex_lock(&dev_mutex);
for(i=0; i<outcount; i++)
{
More information about the PVFS2-CVS mailing list