[Pvfs2-cvs] commit by pcarns in pvfs2-1/src/io/job: job-desc-queue.h job.c

CVS commit program cvs at parl.clemson.edu
Thu Feb 7 11:56:17 EST 2008


Update of /projects/cvsroot/pvfs2-1/src/io/job
In directory parlweb1:/tmp/cvs-serv32105/src/io/job

Modified Files:
      Tag: small-file-branch
	job-desc-queue.h job.c 
Log Message:
Preliminary ability to hand out precreated handles.  Some functionality is
still missing:
- no signaling in place yet to wake up refiller
- no ability to place handles yet by server id
- some servers are chewing a lot of CPU on first launch (if more than 2
  servers)


Index: job-desc-queue.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job-desc-queue.h,v
diff -p -u -r1.26.66.2 -r1.26.66.3
--- job-desc-queue.h	5 Feb 2008 21:18:27 -0000	1.26.66.2
+++ job-desc-queue.h	7 Feb 2008 16:56:17 -0000	1.26.66.3
@@ -56,9 +56,12 @@ struct precreate_pool_desc
     int precreate_handle_count;
     int precreate_handle_index;
     int posted_count;
+    const char** servers;
+    struct qlist_head* current_pool;
+    int trove_pending;
 
+    /* TODO: does this make the job descriptor too big? */
     TROVE_keyval_s key_array[PRECREATE_POOL_MAX_KEYS];
-    TROVE_keyval_s val_array[PRECREATE_POOL_MAX_KEYS];
 
     TROVE_op_id id;
     

Index: job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job.c,v
diff -p -u -r1.177.2.6 -r1.177.2.7
--- job.c	6 Feb 2008 17:16:39 -0000	1.177.2.6
+++ job.c	7 Feb 2008 16:56:17 -0000	1.177.2.7
@@ -25,6 +25,7 @@
 #include "id-generator.h"
 #include "pint-event.h"
 #include "job-time-mgr.h"
+#include "pvfs2-internal.h"
 
 #define JOB_EVENT_START(__op, __id) \
  PINT_event_timestamp(PVFS_EVENT_API_JOB, __op, 0, __id, \
@@ -80,6 +81,15 @@ struct precreate_pool
     PVFS_handle pool_handle;
     uint32_t pool_count; 
 };
+struct precreate_pool_get_trove
+{
+    struct job_desc* jd; /* parent job descriptor */
+    /* variables needed per keyval_iterate_keys() call */
+    PVFS_ds_position pos;
+    PVFS_ds_keyval key;
+    int count;
+    struct PINT_thread_mgr_trove_callback trove_callback;
+};
 #endif /* __PVFS2_TROVE_SUPPORT__ */
 
 /********************************************************
@@ -115,6 +125,15 @@ static void flow_callback(flow_descripto
 static gen_mutex_t work_cycle_mutex = GEN_MUTEX_INITIALIZER;
 static void do_one_work_cycle_all(int idle_time_ms);
 #endif
+#ifdef __PVFS2_TROVE_SUPPORT__
+static void precreate_pool_get_thread_mgr_callback(
+    void* data, 
+    PVFS_error error_code);
+static void precreate_pool_fill_thread_mgr_callback(
+    void* data, 
+    PVFS_error error_code);
+static void precreate_pool_get_post_next(struct job_desc* jd);
+#endif
 
 /********************************************************
  * public interface 
@@ -4291,14 +4310,72 @@ static void teardown_queues(void)
 
 #ifdef __PVFS2_TROVE_SUPPORT__
 
-/* precreate_pool_thread_mgr_callback()
+/* precreate_pool_get_thread_mgr_callback()
  *
- * callback function executed by the thread manager for precreate pool 
+ * callback function executed by the thread manager for precreate pool get
  * when a trove operation completes
  *
  * no return value
  */
-static void precreate_pool_thread_mgr_callback(
+static void precreate_pool_get_thread_mgr_callback(
+    void* data, 
+    PVFS_error error_code)
+{
+    struct precreate_pool_get_trove* tmp_trove = data;
+    
+    gen_mutex_lock(&initialized_mutex);
+    if(initialized == 0)
+    {
+        /* The job interface has been shutdown.  Silently ignore callback. */
+        gen_mutex_unlock(&initialized_mutex);
+        return;
+    }
+    gen_mutex_unlock(&initialized_mutex);
+
+    /* TODO: handle this */
+    assert(error_code == 0);
+
+    trove_pending_count--;
+
+    gossip_debug(GOSSIP_JOB_DEBUG,
+        "Got precreated handle: %llu\n",
+        llu(*((PVFS_handle*)tmp_trove->key.buffer)));
+
+    /* TODO: lock this trove_pending variable? */
+    tmp_trove->jd->u.precreate_pool.trove_pending--;
+    if(tmp_trove->jd->u.precreate_pool.trove_pending == 0)
+    {
+        /* get_handles is complete */
+        gen_mutex_lock(&completion_mutex);
+
+        /* set job descriptor fields and put into completion queue */
+        tmp_trove->jd->u.precreate_pool.error_code = 0;
+        job_desc_q_add(completion_queue_array[tmp_trove->jd->context_id], 
+                       tmp_trove->jd);
+        /* set completed flag while holding queue lock */
+        tmp_trove->jd->completed_flag = 1;
+
+        free(tmp_trove);
+#ifdef __PVFS2_JOB_THREADED__
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
+#endif
+        gen_mutex_unlock(&completion_mutex);
+        return;
+    }
+
+    free(tmp_trove);
+    return;
+}
+
+/* precreate_pool_fill_thread_mgr_callback()
+ *
+ * callback function executed by the thread manager for precreate pool fill
+ * when a trove operation completes
+ *
+ * no return value
+ */
+static void precreate_pool_fill_thread_mgr_callback(
     void* data, 
     PVFS_error error_code)
 {
@@ -4334,6 +4411,9 @@ static void precreate_pool_thread_mgr_ca
         trove_pending_count--;
 
         /* increment in-memory count for this pool */
+        /* TODO: could track this now in current_pool variable to avoid
+         * searching again
+         */
         gen_mutex_lock(&precreate_pool_mutex);
         qlist_for_each(iterator, &precreate_pool_list)
         {
@@ -4390,7 +4470,7 @@ static void precreate_pool_thread_mgr_ca
     ret = trove_keyval_write_list(jd->u.precreate_pool.fsid, 
                             jd->u.precreate_pool.precreate_pool,
                             jd->u.precreate_pool.key_array, 
-                            jd->u.precreate_pool.val_array, 
+                            NULL, 
                             count, 
                             TROVE_BINARY_KEY|TROVE_SYNC|
                             TROVE_NOOVERWRITE|TROVE_KEYVAL_HANDLE_COUNT,
@@ -4410,7 +4490,7 @@ static void precreate_pool_thread_mgr_ca
          * completion
          */
         jd->u.precreate_pool.id = 1;
-        precreate_pool_thread_mgr_callback(jd, 0);
+        precreate_pool_fill_thread_mgr_callback(jd, 0);
     }
 
     return;
@@ -4615,6 +4695,10 @@ static void fill_status(struct job_desc 
     assert(jd);
     assert(status);
 
+    gossip_debug(GOSSIP_JOB_DEBUG,
+        "job fill_status() for id: %llu, type: %d\n",
+        llu(jd->job_id), jd->type);
+
     status->status_user_tag = jd->status_user_tag;
 
     if (returned_user_ptr_p)
@@ -4995,7 +5079,7 @@ int job_precreate_pool_fill(
     jd->job_user_ptr = user_ptr;
     jd->context_id = context_id;
     jd->status_user_tag = status_user_tag;
-    jd->trove_callback.fn = precreate_pool_thread_mgr_callback;
+    jd->trove_callback.fn = precreate_pool_fill_thread_mgr_callback;
     jd->trove_callback.data = (void*)jd;
     jd->u.precreate_pool.precreate_pool = precreate_pool;
     jd->u.precreate_pool.precreate_handle_array = precreate_handle_array;
@@ -5005,7 +5089,7 @@ int job_precreate_pool_fill(
     jd->u.precreate_pool.fsid = fsid;
 
     /* reuse the logic for trove op completion to get this started */
-    precreate_pool_thread_mgr_callback(jd, 0);
+    precreate_pool_fill_thread_mgr_callback(jd, 0);
 
     /* for the moment, this type of job cannot immediately complete */
 
@@ -5041,7 +5125,7 @@ int job_precreate_pool_register_server(
     tmp_pool->pool_handle = pool_handle;
     tmp_pool->pool_count = count;
 
-    gossip_debug(GOSSIP_SERVER_DEBUG,
+    gossip_debug(GOSSIP_JOB_DEBUG,
         "Initial pool count for host %s, fsid %d: %d\n", host, (int)fsid,
         count);
 
@@ -5092,14 +5176,14 @@ int job_precreate_pool_check_level(
                 /* handle count is below the threshold */
                 out_status_p->error_code = 0;
                 gen_mutex_unlock(&precreate_pool_mutex);
-                gossip_debug(GOSSIP_SERVER_DEBUG, "found pool count low.\n");
+                gossip_debug(GOSSIP_JOB_DEBUG, "found pool count low.\n");
                 return(1);
             }
             else
             {
                 /* TODO: finish this part; for now we just launch into space */
                 gen_mutex_unlock(&precreate_pool_mutex);
-                gossip_debug(GOSSIP_SERVER_DEBUG, "found pool count high.\n");
+                gossip_debug(GOSSIP_JOB_DEBUG, "found pool count high.\n");
                 return(0);
             }
             break;
@@ -5122,14 +5206,123 @@ int job_precreate_pool_get_handles(
     job_id_t * id,
     job_context_id context_id)
 {
-    /* TODO: implement */
+    struct job_desc *jd = NULL;
 
-    /* TODO: log the handle array on the way out */
+    /* create the job desc first, even though we may not use it.  This
+     * gives us somewhere to store information
+     */
+    jd = alloc_job_desc(JOB_PRECREATE_POOL);
+    if (!jd)
+    {
+        return (-errno);
+    }
+    jd->job_user_ptr = user_ptr;
+    jd->context_id = context_id;
+    jd->status_user_tag = status_user_tag;
+    jd->u.precreate_pool.precreate_handle_array = handle_array;
+    jd->u.precreate_pool.precreate_handle_count = count;
+    jd->u.precreate_pool.precreate_handle_index = 0;
+    jd->u.precreate_pool.id = PVFS_OP_NULL;
+    jd->u.precreate_pool.fsid = fsid;
+    jd->u.precreate_pool.servers = servers;
+    jd->u.precreate_pool.trove_pending = count;
+    
+    precreate_pool_get_post_next(jd);
 
-    out_status_p->error_code = -PVFS_ENOSYS;
-    return(1);
+    /* for the moment, this type of job cannot immediately complete */
+    *id = jd->job_id;
+    return(0);
 }
 
+/* posts one trove key iterate/remove op per remaining handle slot, cycling through the pool list; TODO: comment properly */
+static void precreate_pool_get_post_next(struct job_desc* jd)
+{
+    struct precreate_pool* pool;
+    TROVE_op_id tmp_id;
+    int ret;
+    struct precreate_pool_get_trove* tmp_trove;
+
+    /* we better still have handles to retrieve */
+    assert(jd->u.precreate_pool.precreate_handle_index < 
+        jd->u.precreate_pool.precreate_handle_count);
+
+    gen_mutex_lock(&precreate_pool_mutex);
+    for(;
+        jd->u.precreate_pool.precreate_handle_index <
+        jd->u.precreate_pool.precreate_handle_count;
+        jd->u.precreate_pool.precreate_handle_index++)
+    {
+        if(jd->u.precreate_pool.servers)
+        {
+            /* caller wanted specific servers */
+            /* TODO: implement this */
+            assert(0);
+        }
+        else
+        {
+            /* caller wants whatever we hand out */
+            if(jd->u.precreate_pool.current_pool == NULL ||
+                jd->u.precreate_pool.current_pool->next == &precreate_pool_list)
+            {
+                /* either we are just starting, or we have wrapped around */
+                jd->u.precreate_pool.current_pool = precreate_pool_list.next;
+            }
+            else
+            {
+                /* normal case; cycle to next pool */
+                jd->u.precreate_pool.current_pool =
+                    jd->u.precreate_pool.current_pool->next;
+            }
+        }
+
+        pool = qlist_entry(jd->u.precreate_pool.current_pool, 
+            struct precreate_pool, list_link);
+
+        if(pool->pool_count < 1)
+        {
+            /* TODO: implement this */
+            /* need to rewind current_pool and queue to try again after
+             * refill
+             */
+            assert(0);
+        }
+        /* go ahead and decrement count to avoid races with other consumers */
+        /* TODO: remember to bump this up if trove op fails */
+        pool->pool_count--;
+
+        /* somewhere to stash trove variables per operation */
+        tmp_trove = malloc(sizeof(*tmp_trove));
+        if(!tmp_trove)
+        {
+            /* TODO: handle this */
+            assert(0);
+        }
+        tmp_trove->jd = jd;
+        tmp_trove->pos = PVFS_ITERATE_START;
+        tmp_trove->count = 1;
+        tmp_trove->key.buffer = &jd->u.precreate_pool.precreate_handle_array[jd->u.precreate_pool.precreate_handle_index];
+        tmp_trove->key.buffer_sz = sizeof(PVFS_handle);
+        tmp_trove->trove_callback.fn = precreate_pool_get_thread_mgr_callback;
+        tmp_trove->trove_callback.data = tmp_trove;
+
+        /* post trove operation to pull out a handle; TODO: ret is never checked below */
+        ret = trove_keyval_iterate_keys(
+                            pool->fsid, 
+                            pool->pool_handle,
+                            &tmp_trove->pos,
+                            &tmp_trove->key,
+                            &tmp_trove->count,
+                            TROVE_BINARY_KEY|TROVE_SYNC|
+                            TROVE_KEYVAL_HANDLE_COUNT|
+                            TROVE_KEYVAL_ITERATE_REMOVE,
+                            NULL, 
+                            &tmp_trove->trove_callback, 
+                            global_trove_context,
+                            &tmp_id);
+        trove_pending_count++;
+    }
+    gen_mutex_unlock(&precreate_pool_mutex);
+}
 
 #endif /* __PVFS2_TROVE_SUPPORT__ */
 



More information about the Pvfs2-cvs mailing list