[PVFS2-CVS] commit by neill in pvfs2/src/io/job: job-time-mgr.c job.c thread-mgr.c

CVS commit program cvs at parl.clemson.edu
Mon Jul 12 18:12:42 EDT 2004


Update of /projects/cvsroot/pvfs2/src/io/job
In directory parlweb:/tmp/cvs-serv19097/src/io/job

Modified Files:
	job-time-mgr.c job.c thread-mgr.c 
Log Message:
- add some error handling; null out pointers that are no longer in use
  (fixes several crash bugs found in the job expiration case); add
  some sanity checks, etc.
- make the server continue processing, rather than abort, on
  BMI_testunexpected errors (these are recoverable)


Index: job-time-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job-time-mgr.c,v
diff -p -u -r1.7 -r1.8
--- job-time-mgr.c	12 Jul 2004 17:04:40 -0000	1.7
+++ job-time-mgr.c	12 Jul 2004 21:12:41 -0000	1.8
@@ -15,7 +15,7 @@
 #include "gen-locks.h"
 #include "gossip.h"
 
-QLIST_HEAD(bucket_queue);
+static QLIST_HEAD(bucket_queue);
 static gen_mutex_t bucket_mutex = GEN_MUTEX_INITIALIZER;
 
 struct time_bucket
@@ -33,6 +33,7 @@ struct time_bucket
  */
 int job_time_mgr_init(void)
 {
+    INIT_QLIST_HEAD(&bucket_queue);
     return(0);
 }
 
@@ -56,6 +57,7 @@ int job_time_mgr_finalize(void)
     qlist_for_each_safe(iterator, scratch, &bucket_queue)
     {
 	tmp_bucket = qlist_entry(iterator, struct time_bucket, bucket_link);
+        assert(tmp_bucket);
 
 	qlist_del(&tmp_bucket->bucket_link);
 	qlist_for_each_safe(iterator2, scratch2, &tmp_bucket->jd_queue)
@@ -64,9 +66,10 @@ int job_time_mgr_finalize(void)
 	    qlist_del(&jd->job_time_link);
 	    jd->time_bucket = NULL;
 	}
-
+	INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
 	free(tmp_bucket);
     }
+    INIT_QLIST_HEAD(&bucket_queue);
 
     gen_mutex_unlock(&bucket_mutex);
 
@@ -104,7 +107,9 @@ static int __job_time_mgr_add(struct job
     /* look for a bucket matching the desired seconds value */
     qlist_for_each(tmp_link, &bucket_queue)
     {
-	tmp_bucket = qlist_entry(tmp_link, struct time_bucket, bucket_link);
+	tmp_bucket = qlist_entry(
+            tmp_link, struct time_bucket, bucket_link);
+        assert(tmp_bucket);
 
 	if(tmp_bucket->expire_time_sec >= expire_time_sec)
 	{
@@ -118,8 +123,8 @@ static int __job_time_mgr_add(struct job
     if(!tmp_bucket || tmp_bucket->expire_time_sec != expire_time_sec)
     {
 	/* make a new bucket, we didn't find an exact match */
-	new_bucket = (struct time_bucket*)malloc(sizeof(struct
-	    time_bucket));
+	new_bucket = (struct time_bucket*)
+            malloc(sizeof(struct time_bucket));
 	assert(new_bucket);
 	memset(new_bucket, 0, sizeof(struct time_bucket));
 	new_bucket->expire_time_sec = expire_time_sec;
@@ -179,14 +184,15 @@ int job_time_mgr_rem(struct job_desc* jd
 {
     struct time_bucket* tmp_bucket = NULL;
 
+    gen_mutex_lock(&bucket_mutex);
+
     if(jd->time_bucket == NULL)
     {
 	/* nothing to do, it is already removed */
+        gen_mutex_unlock(&bucket_mutex);
 	return(0);
     }
 
-    gen_mutex_lock(&bucket_mutex);
-
     qlist_del(&jd->job_time_link);
     tmp_bucket = (struct time_bucket*)jd->time_bucket;
 
@@ -194,9 +200,9 @@ int job_time_mgr_rem(struct job_desc* jd
     {
 	/* no need for this bucket any longer; it is empty */
 	qlist_del(&tmp_bucket->bucket_link);
+	INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
 	free(tmp_bucket);
     }
-
     jd->time_bucket = NULL;
 
     gen_mutex_unlock(&bucket_mutex);
@@ -229,14 +235,15 @@ int job_time_mgr_expire(void)
     qlist_for_each_safe(iterator, scratch, &bucket_queue)
     {
 	tmp_bucket = qlist_entry(iterator, struct time_bucket, bucket_link);
+        assert(tmp_bucket);
+
 	/* stop when we see the first bucket that has not expired */
 	if(tmp_bucket->expire_time_sec > tv.tv_sec)
 	{
 	    break;
 	}
 
-	/* remove the bucket and cancel the associated jobs */
-	qlist_del(&tmp_bucket->bucket_link);
+	/* cancel the associated jobs and remove the bucket */
 	qlist_for_each_safe(iterator2, scratch2, &tmp_bucket->jd_queue)
 	{
 	    jd = qlist_entry(iterator2, struct job_desc, job_time_link);
@@ -253,7 +260,7 @@ int job_time_mgr_expire(void)
 	    case JOB_FLOW:
 		/* have we made any progress since last time we checked? */
 		PINT_flow_getinfo(jd->u.flow.flow_d,
-		    FLOW_AMT_COMPLETE_QUERY, &tmp_size);
+                                  FLOW_AMT_COMPLETE_QUERY, &tmp_size);
 		if(tmp_size > jd->u.flow.last_amt_complete)
 		{
 		    /* if so, then update progress and reset timer */
@@ -264,25 +271,28 @@ int job_time_mgr_expire(void)
 		else
 		{
 		    /* otherwise kill the flow */
-		    gossip_debug(GOSSIP_JOB_DEBUG, "Job timer: cancelling flow.\n");
+		    gossip_debug(GOSSIP_JOB_DEBUG,
+                                 "Job timer: cancelling flow.\n");
 		    ret = job_flow_cancel(jd->job_id, jd->context_id);
 		}
 		break;
 	    case JOB_TROVE:
-		gossip_debug(GOSSIP_JOB_DEBUG, "Job timer: cancelling trove.\n");
-		ret = job_trove_dspace_cancel(jd->u.trove.fsid, jd->job_id,
-		    jd->context_id);
+		gossip_debug(GOSSIP_JOB_DEBUG,
+                             "Job timer: cancelling trove.\n");
+		ret = job_trove_dspace_cancel(
+                    jd->u.trove.fsid, jd->job_id, jd->context_id);
 	    default:
 		ret = 0;
 		break;
 	    }
 
-	    /* TODO: error handling */
+	    /* FIXME: error handling */
 	    assert(ret == 0);
 
 	    jd->time_bucket = NULL;
 	}
-
+	qlist_del(&tmp_bucket->bucket_link);
+        INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
 	free(tmp_bucket);
     }
 

Index: job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/job.c,v
diff -p -u -r1.130 -r1.131
--- job.c	8 Jul 2004 16:17:11 -0000	1.130
+++ job.c	12 Jul 2004 21:12:41 -0000	1.131
@@ -3032,7 +3032,7 @@ int job_test(job_id_t id,
 	return(completion_error);
     }
     query = id_gen_safe_lookup(id);
-    if(query->completed_flag)
+    if(query && query->completed_flag)
     {
 	job_desc_q_remove(query);
 	gen_mutex_unlock(&completion_mutex);
@@ -3098,7 +3098,7 @@ int job_test(job_id_t id,
 		return(completion_error);
 	    }
 	    query = id_gen_safe_lookup(id);
-	    if(query->completed_flag)
+	    if(query && query->completed_flag)
 	    {
 		job_desc_q_remove(query);
 		gen_mutex_unlock(&completion_mutex);

Index: thread-mgr.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/job/thread-mgr.c,v
diff -p -u -r1.21 -r1.22
--- thread-mgr.c	8 Jul 2004 16:17:11 -0000	1.21
+++ thread-mgr.c	12 Jul 2004 21:12:41 -0000	1.22
@@ -172,12 +172,13 @@ static void *bmi_thread_function(void *p
 
 	    ret = BMI_testunexpected(
                 incount, &outcount, stat_bmi_unexp_array, 0);
-	    if(ret < 0)
+	    if (ret < 0)
 	    {
-		/* critical failure */
-		/* TODO: how to handle this? */
-		assert(0);
-		gossip_lerr("Error: critical BMI failure.\n");
+                PVFS_perror_gossip("critical BMI failure", ret);
+#ifdef __PVFS2_JOB_THREADED__
+                continue;
+#endif
+                return NULL;
 	    }
 
 	    /* execute callback for each completed unexpected message */



More information about the PVFS2-CVS mailing list