[PVFS2-CVS]
commit by neill in pvfs2/src/io/job: job-time-mgr.c job.c thread-mgr.c
CVS commit program
cvs at parl.clemson.edu
Mon Jul 12 18:12:42 EDT 2004
Update of /projects/cvsroot/pvfs2/src/io/job
In directory parlweb:/tmp/cvs-serv19097/src/io/job
Modified Files:
job-time-mgr.c job.c thread-mgr.c
Log Message:
- add some error handling; null out pointers that are no longer in use
(fixes several crash bugs found in the job expiration case); add
some sanity checks, etc.
- make the server continue processing, rather than abort, on
BMI_testcontext errors (these are recoverable)
Index: job-time-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job-time-mgr.c,v
diff -p -u -r1.7 -r1.8
--- job-time-mgr.c 12 Jul 2004 17:04:40 -0000 1.7
+++ job-time-mgr.c 12 Jul 2004 21:12:41 -0000 1.8
@@ -15,7 +15,7 @@
#include "gen-locks.h"
#include "gossip.h"
-QLIST_HEAD(bucket_queue);
+static QLIST_HEAD(bucket_queue);
static gen_mutex_t bucket_mutex = GEN_MUTEX_INITIALIZER;
struct time_bucket
@@ -33,6 +33,7 @@ struct time_bucket
*/
int job_time_mgr_init(void)
{
+ INIT_QLIST_HEAD(&bucket_queue);
return(0);
}
@@ -56,6 +57,7 @@ int job_time_mgr_finalize(void)
qlist_for_each_safe(iterator, scratch, &bucket_queue)
{
tmp_bucket = qlist_entry(iterator, struct time_bucket, bucket_link);
+ assert(tmp_bucket);
qlist_del(&tmp_bucket->bucket_link);
qlist_for_each_safe(iterator2, scratch2, &tmp_bucket->jd_queue)
@@ -64,9 +66,10 @@ int job_time_mgr_finalize(void)
qlist_del(&jd->job_time_link);
jd->time_bucket = NULL;
}
-
+ INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
free(tmp_bucket);
}
+ INIT_QLIST_HEAD(&bucket_queue);
gen_mutex_unlock(&bucket_mutex);
@@ -104,7 +107,9 @@ static int __job_time_mgr_add(struct job
/* look for a bucket matching the desired seconds value */
qlist_for_each(tmp_link, &bucket_queue)
{
- tmp_bucket = qlist_entry(tmp_link, struct time_bucket, bucket_link);
+ tmp_bucket = qlist_entry(
+ tmp_link, struct time_bucket, bucket_link);
+ assert(tmp_bucket);
if(tmp_bucket->expire_time_sec >= expire_time_sec)
{
@@ -118,8 +123,8 @@ static int __job_time_mgr_add(struct job
if(!tmp_bucket || tmp_bucket->expire_time_sec != expire_time_sec)
{
/* make a new bucket, we didn't find an exact match */
- new_bucket = (struct time_bucket*)malloc(sizeof(struct
- time_bucket));
+ new_bucket = (struct time_bucket*)
+ malloc(sizeof(struct time_bucket));
assert(new_bucket);
memset(new_bucket, 0, sizeof(struct time_bucket));
new_bucket->expire_time_sec = expire_time_sec;
@@ -179,14 +184,15 @@ int job_time_mgr_rem(struct job_desc* jd
{
struct time_bucket* tmp_bucket = NULL;
+ gen_mutex_lock(&bucket_mutex);
+
if(jd->time_bucket == NULL)
{
/* nothing to do, it is already removed */
+ gen_mutex_unlock(&bucket_mutex);
return(0);
}
- gen_mutex_lock(&bucket_mutex);
-
qlist_del(&jd->job_time_link);
tmp_bucket = (struct time_bucket*)jd->time_bucket;
@@ -194,9 +200,9 @@ int job_time_mgr_rem(struct job_desc* jd
{
/* no need for this bucket any longer; it is empty */
qlist_del(&tmp_bucket->bucket_link);
+ INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
free(tmp_bucket);
}
-
jd->time_bucket = NULL;
gen_mutex_unlock(&bucket_mutex);
@@ -229,14 +235,15 @@ int job_time_mgr_expire(void)
qlist_for_each_safe(iterator, scratch, &bucket_queue)
{
tmp_bucket = qlist_entry(iterator, struct time_bucket, bucket_link);
+ assert(tmp_bucket);
+
/* stop when we see the first bucket that has not expired */
if(tmp_bucket->expire_time_sec > tv.tv_sec)
{
break;
}
- /* remove the bucket and cancel the associated jobs */
- qlist_del(&tmp_bucket->bucket_link);
+ /* cancel the associated jobs and remove the bucket */
qlist_for_each_safe(iterator2, scratch2, &tmp_bucket->jd_queue)
{
jd = qlist_entry(iterator2, struct job_desc, job_time_link);
@@ -253,7 +260,7 @@ int job_time_mgr_expire(void)
case JOB_FLOW:
/* have we made any progress since last time we checked? */
PINT_flow_getinfo(jd->u.flow.flow_d,
- FLOW_AMT_COMPLETE_QUERY, &tmp_size);
+ FLOW_AMT_COMPLETE_QUERY, &tmp_size);
if(tmp_size > jd->u.flow.last_amt_complete)
{
/* if so, then update progress and reset timer */
@@ -264,25 +271,28 @@ int job_time_mgr_expire(void)
else
{
/* otherwise kill the flow */
- gossip_debug(GOSSIP_JOB_DEBUG, "Job timer: cancelling flow.\n");
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "Job timer: cancelling flow.\n");
ret = job_flow_cancel(jd->job_id, jd->context_id);
}
break;
case JOB_TROVE:
- gossip_debug(GOSSIP_JOB_DEBUG, "Job timer: cancelling trove.\n");
- ret = job_trove_dspace_cancel(jd->u.trove.fsid, jd->job_id,
- jd->context_id);
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "Job timer: cancelling trove.\n");
+ ret = job_trove_dspace_cancel(
+ jd->u.trove.fsid, jd->job_id, jd->context_id);
default:
ret = 0;
break;
}
- /* TODO: error handling */
+ /* FIXME: error handling */
assert(ret == 0);
jd->time_bucket = NULL;
}
-
+ qlist_del(&tmp_bucket->bucket_link);
+ INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
free(tmp_bucket);
}
Index: job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job.c,v
diff -p -u -r1.130 -r1.131
--- job.c 8 Jul 2004 16:17:11 -0000 1.130
+++ job.c 12 Jul 2004 21:12:41 -0000 1.131
@@ -3032,7 +3032,7 @@ int job_test(job_id_t id,
return(completion_error);
}
query = id_gen_safe_lookup(id);
- if(query->completed_flag)
+ if(query && query->completed_flag)
{
job_desc_q_remove(query);
gen_mutex_unlock(&completion_mutex);
@@ -3098,7 +3098,7 @@ int job_test(job_id_t id,
return(completion_error);
}
query = id_gen_safe_lookup(id);
- if(query->completed_flag)
+ if(query && query->completed_flag)
{
job_desc_q_remove(query);
gen_mutex_unlock(&completion_mutex);
Index: thread-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.c,v
diff -p -u -r1.21 -r1.22
--- thread-mgr.c 8 Jul 2004 16:17:11 -0000 1.21
+++ thread-mgr.c 12 Jul 2004 21:12:41 -0000 1.22
@@ -172,12 +172,13 @@ static void *bmi_thread_function(void *p
ret = BMI_testunexpected(
incount, &outcount, stat_bmi_unexp_array, 0);
- if(ret < 0)
+ if (ret < 0)
{
- /* critical failure */
- /* TODO: how to handle this? */
- assert(0);
- gossip_lerr("Error: critical BMI failure.\n");
+ PVFS_perror_gossip("critical BMI failure", ret);
+#ifdef __PVFS2_JOB_THREADED__
+ continue;
+#endif
+ return NULL;
}
/* execute callback for each completed unexpected message */
More information about the PVFS2-CVS mailing list