[Pvfs2-cvs] commit by pcarns in pvfs2-1/src/io/job: job-desc-queue.c job-desc-queue.h job.c job.h

CVS commit program cvs at parl.clemson.edu
Mon Sep 8 11:42:43 EDT 2008


Update of /projects/cvsroot/pvfs2-1/src/io/job
In directory parlweb1:/tmp/cvs-serv32611/src/io/job

Modified Files:
	job-desc-queue.c job-desc-queue.h job.c job.h 
Log Message:
Merging small files branch to head.  Includes server side precreation of
data files and file stuffing.


Index: job-desc-queue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job-desc-queue.c,v
diff -p -u -r1.18 -r1.19
--- job-desc-queue.c	11 Oct 2007 23:11:33 -0000	1.18
+++ job-desc-queue.c	8 Sep 2008 15:42:43 -0000	1.19
@@ -208,6 +208,9 @@ void job_desc_q_dump(job_desc_q_p jdqp)
 	case JOB_NULL:
 	    gossip_err("    type: JOB_NULL.\n");
 	    break;
+	case JOB_PRECREATE_POOL:
+	    gossip_err("    type: JOB_PRECREATE_POOL.\n");
+	    break;
 	}
     }
 

Index: job-desc-queue.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job-desc-queue.h,v
diff -p -u -r1.26 -r1.27
--- job-desc-queue.h	7 Dec 2004 15:09:29 -0000	1.26
+++ job-desc-queue.h	8 Sep 2008 15:42:43 -0000	1.27
@@ -42,6 +42,30 @@ struct trove_desc
     int count;
 };
 
+/* state for a JOB_PRECREATE_POOL job (u.precreate_pool of struct job_desc) */
+struct precreate_pool_desc
+{
+    PVFS_handle precreate_pool; /* handle whose keyvals hold the pool */
+    PVFS_fs_id fsid;
+    PVFS_handle* precreate_handle_array; /* handles to write into the pool */
+    int precreate_handle_count; /* entries in precreate_handle_array */
+    int precreate_handle_index; /* next array entry still to be written */
+    int posted_count; /* keys in the currently posted keyval write */
+    const char** servers;
+    struct qlist_head* current_pool;
+    int trove_pending; /* trove ops still outstanding for this job */
+    int low_threshold;
+    void* data; /* freed by the get callback when the job completes */
+    int first_callback_flag; /* 1 until the first fill callback has run */
+    TROVE_keyval_s* key_array; /* keys for the current trove batch */
+    PVFS_ds_flags flags;
+    PVFS_ds_position position; /* low bits of status->position */
+    PVFS_ds_position pool_index; /* shifted into status->position << 32 */
+    int count; /* copied to status->count by fill_status() */
+    /* reported to the caller as status->error_code by fill_status() */
+    PVFS_error error_code;
+};
+
 /* describes unexpected BMI operations */
 struct bmi_unexp_desc
 {
@@ -85,6 +109,7 @@ enum job_type
     JOB_REQ_SCHED,
     JOB_DEV_UNEXP,
     JOB_REQ_SCHED_TIMER,
+    JOB_PRECREATE_POOL,
     JOB_NULL
 };
 
@@ -111,6 +136,7 @@ struct job_desc
 	struct req_sched_desc req_sched;
 	struct dev_unexp_desc dev_unexp;
 	struct null_info_desc null_info;
+        struct precreate_pool_desc precreate_pool;
     }
     u;
 

Index: job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job.c,v
diff -p -u -r1.181 -r1.182
--- job.c	4 Apr 2008 17:57:37 -0000	1.181
+++ job.c	8 Sep 2008 15:42:43 -0000	1.182
@@ -25,6 +25,7 @@
 #include "id-generator.h"
 #include "pint-event.h"
 #include "job-time-mgr.h"
+#include "pvfs2-internal.h"
 
 #define JOB_EVENT_START(__op, __id) \
  PINT_event_timestamp(PVFS_EVENT_API_JOB, __op, 0, __id, \
@@ -69,6 +70,36 @@ enum
     thread_wait_timeout = 10000        /* usecs */
 };
 
+/* max handle keys written per trove_keyval_write_list() when filling a
+ * precreate pool; batching keeps one fill from clogging the trove queues
+ */
+#define PRECREATE_POOL_MAX_KEYS 32
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+static gen_mutex_t precreate_pool_mutex = GEN_MUTEX_INITIALIZER; /* held while walking the precreate pool lists */
+static QLIST_HEAD(precreate_pool_list); /* struct precreate_pool entries */
+static QLIST_HEAD(precreate_pool_check_level_list);
+static QLIST_HEAD(precreate_pool_get_handles_list); /* job_descs waiting for pool handles */
+struct precreate_pool
+{
+    struct qlist_head list_link;
+    char* host;
+    PVFS_fs_id fsid;
+    PVFS_handle pool_handle;
+    uint32_t pool_count; /* in-memory count of handles in this pool */
+};
+struct precreate_pool_get_trove
+{
+    struct job_desc* jd; /* parent job descriptor */
+    /* variables needed per keyval_iterate_keys() call */
+    PVFS_ds_position pos;
+    PVFS_ds_keyval key;
+    int count;
+    struct PINT_thread_mgr_trove_callback trove_callback;
+    struct precreate_pool* pool;
+};
+#endif /* __PVFS2_TROVE_SUPPORT__ */
+
 /********************************************************
  * function prototypes
  */
@@ -102,6 +133,21 @@ static void flow_callback(flow_descripto
 static gen_mutex_t work_cycle_mutex = GEN_MUTEX_INITIALIZER;
 static void do_one_work_cycle_all(int idle_time_ms);
 #endif
+#ifdef __PVFS2_TROVE_SUPPORT__
+static void precreate_pool_get_thread_mgr_callback(
+    void* data, 
+    PVFS_error error_code);
+static void precreate_pool_get_thread_mgr_callback_unlocked(
+    void* data, 
+    PVFS_error error_code);
+static void precreate_pool_fill_thread_mgr_callback(
+    void* data, 
+    PVFS_error error_code);
+static void precreate_pool_iterate_callback(
+    void* data, 
+    PVFS_error error_code);
+static void precreate_pool_get_handles_try_post(struct job_desc* jd);
+#endif
 
 /********************************************************
  * public interface 
@@ -2094,6 +2140,7 @@ int job_trove_keyval_write_list(PVFS_fs_
     JOB_EVENT_START(PVFS_EVENT_TROVE_KEYVAL_WRITE_LIST, jd->job_id);
 
 #ifdef __PVFS2_TROVE_SUPPORT__
+    gossip_debug(GOSSIP_JOB_DEBUG, "job_trove_keyval_write_list() posting trove_keyval_write_list()\n");
     ret = trove_keyval_write_list(coll_id, handle,
                              key_array, val_array,
                              count, flags,
@@ -2138,6 +2185,88 @@ int job_trove_keyval_write_list(PVFS_fs_
     return (0);
 }
 
+int job_trove_keyval_remove_list(PVFS_fs_id coll_id,
+                                  PVFS_handle handle,
+                                  PVFS_ds_keyval * key_array,
+                                  PVFS_ds_keyval * val_array,
+                                  int * error_array,
+                                  int count,
+                                  PVFS_ds_flags flags,
+                                  PVFS_vtag * vtag,
+                                  void *user_ptr,
+                                  job_aint status_user_tag,
+                                  job_status_s * out_status_p,
+                                  job_id_t * id,
+                                  job_context_id context_id)
+{
+    int ret = -1;
+    struct job_desc *jd = NULL;
+    void* user_ptr_internal;
+    /* returns 0 once posted (test id later) or 1 on immediate completion,
+     * with any error in out_status_p.  The job desc is created first, even
+     * though we may not use it, to hold the trove op id and user ptr
+     */
+    jd = alloc_job_desc(JOB_TROVE);
+    if (!jd)
+    {
+        out_status_p->error_code = -PVFS_ENOMEM;
+        return 1;
+    }
+    jd->job_user_ptr = user_ptr;
+    jd->u.trove.vtag = vtag;
+    jd->context_id = context_id;
+    jd->status_user_tag = status_user_tag;
+    jd->trove_callback.fn = trove_thread_mgr_callback;
+    jd->trove_callback.data = (void*)jd;
+    user_ptr_internal = &jd->trove_callback;
+    JOB_EVENT_START(PVFS_EVENT_TROVE_KEYVAL_REMOVE_LIST, jd->job_id);
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+    ret = trove_keyval_remove_list(coll_id, handle,
+                             key_array, val_array, error_array,
+                             count, flags,
+                             jd->u.trove.vtag, user_ptr_internal, 
+                             global_trove_context,
+                             &(jd->u.trove.id));
+#else
+    gossip_err("Error: Trove support not enabled.\n");
+    ret = -ENOSYS;
+#endif
+
+    if (ret < 0)
+    {
+        /* error posting trove operation; end the REMOVE_LIST event started above */
+        JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_REMOVE_LIST, 0, jd->job_id);
+        dealloc_job_desc(jd);
+        jd = NULL;
+        out_status_p->error_code = ret;
+        out_status_p->status_user_tag = status_user_tag;
+        return (1);
+    }
+
+    if (ret == 1)
+    {
+        /* immediate completion */
+        out_status_p->error_code = 0;
+        out_status_p->status_user_tag = status_user_tag;
+        out_status_p->vtag = jd->u.trove.vtag;
+        JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_REMOVE_LIST, 0, jd->job_id);
+        dealloc_job_desc(jd);
+        jd = NULL;
+        return (ret);
+    }
+
+    /* if we fall through to this point, the job did not
+     * immediately complete and we must queue up to test later
+     */
+    *id = jd->job_id;
+    trove_pending_count++;
+    jd->event_type = PVFS_EVENT_TROVE_KEYVAL_REMOVE_LIST;
+
+    return (0);
+}
+
+
 /* job_trove_keyval_flush()
  *
  * ask the storage layer to flush keyvals to disk
@@ -3147,6 +3276,188 @@ int job_trove_dspace_create(PVFS_fs_id c
     return (0);
 }
 
+/* job_trove_dspace_create_list()
+ *
+ * create count data space objects (new handles presumably in out_handle_array)
+ *
+ * returns 0 if posted (test id for completion later) or 1 if it completed
+ * immediately, successfully or not (see out_status_p->error_code)
+ */
+int job_trove_dspace_create_list(PVFS_fs_id coll_id,
+                            PVFS_handle_extent_array *handle_extent_array,
+                            PVFS_handle* out_handle_array,
+                            int count,
+                            PVFS_ds_type type,
+                            void *hint,
+                            PVFS_ds_flags flags,
+                            void *user_ptr,
+                            job_aint status_user_tag,
+                            job_status_s * out_status_p,
+                            job_id_t * id,
+                            job_context_id context_id)
+{
+    /* post a dspace create list.  If it completes (or fails) immediately, then
+     * return and fill in the status structure.  If it needs to be tested
+     * for completion later, then queue up a job_desc structure.
+     */
+    int ret = -1;
+    struct job_desc *jd = NULL;
+    void* user_ptr_internal;
+
+    /* create the job desc first, even though we may not use it.  This
+     * gives us somewhere to store the trove op id and user ptr
+     */
+    jd = alloc_job_desc(JOB_TROVE);
+    if (!jd)
+    {
+        out_status_p->error_code = -PVFS_ENOMEM;
+        return 1;
+    }
+    jd->job_user_ptr = user_ptr;
+    jd->u.trove.handle = PVFS_HANDLE_NULL;
+    jd->context_id = context_id;
+    jd->status_user_tag = status_user_tag;
+    jd->trove_callback.fn = trove_thread_mgr_callback;
+    jd->trove_callback.data = (void*)jd;
+    user_ptr_internal = &jd->trove_callback;
+    JOB_EVENT_START(PVFS_EVENT_TROVE_DSPACE_CREATE, jd->job_id);
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+    ret = trove_dspace_create_list(coll_id,
+                              handle_extent_array,
+                              out_handle_array,
+                              count,
+                              type,
+                              hint, flags,
+                              user_ptr_internal, 
+                              global_trove_context, &(jd->u.trove.id));
+#else
+    gossip_err("Error: Trove support not enabled.\n");
+    ret = -ENOSYS;
+#endif
+
+    if (ret < 0)
+    {
+        /* error posting trove operation */
+        JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_CREATE, 0, jd->job_id);
+        dealloc_job_desc(jd);
+        jd = NULL;
+        out_status_p->error_code = ret;
+        out_status_p->status_user_tag = status_user_tag;
+        return (1);
+    }
+
+    if (ret == 1)
+    {
+        /* immediate completion */
+        out_status_p->error_code = 0;
+        out_status_p->status_user_tag = status_user_tag;
+        JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_CREATE, 0, jd->job_id);
+        dealloc_job_desc(jd);
+        jd = NULL;
+        return (ret);
+    }
+
+    /* if we fall through to this point, the job did not
+     * immediately complete and we must queue up to test later
+     */
+    *id = jd->job_id;
+    trove_pending_count++;
+    jd->event_type = PVFS_EVENT_TROVE_DSPACE_CREATE;
+
+    return (0);
+}
+
+/* job_trove_dspace_remove_list()
+ *
+ * remove the count data space objects (byte stream and key/value) in handle_array
+ *
+ * returns 0 if posted (test id for completion later) or 1 if it completed
+ * immediately, successfully or not (see out_status_p->error_code)
+ */
+int job_trove_dspace_remove_list(PVFS_fs_id coll_id,
+			    PVFS_handle* handle_array,
+                            PVFS_error *out_error_array,
+                            int count,
+                            PVFS_ds_flags flags,
+			    void *user_ptr,
+			    job_aint status_user_tag,
+			    job_status_s * out_status_p,
+			    job_id_t * id,
+			    job_context_id context_id)
+{
+    /* post a dspace remove_list.  If it completes (or fails) immediately, then
+     * return and fill in the status structure.  If it needs to be tested
+     * for completion later, then queue up a job_desc structure.
+     */
+    int ret = -1;
+    struct job_desc *jd = NULL;
+    void* user_ptr_internal;
+
+    /* create the job desc first, even though we may not use it.  This
+     * gives us somewhere to store the trove op id and user ptr
+     */
+    jd = alloc_job_desc(JOB_TROVE);
+    if (!jd)
+    {
+        out_status_p->error_code = -PVFS_ENOMEM;
+        return 1;
+    }
+    jd->job_user_ptr = user_ptr;
+    jd->context_id = context_id;
+    jd->status_user_tag = status_user_tag;
+    jd->trove_callback.fn = trove_thread_mgr_callback;
+    jd->trove_callback.data = (void*)jd;
+    user_ptr_internal = &jd->trove_callback;
+    JOB_EVENT_START(PVFS_EVENT_TROVE_DSPACE_REMOVE, jd->job_id);
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+    ret = trove_dspace_remove_list(coll_id,
+                              handle_array, 
+                              out_error_array,
+                              count,
+                              flags,
+                              user_ptr_internal, 
+                              global_trove_context, &(jd->u.trove.id));
+#else
+    gossip_err("Error: Trove support not enabled.\n");
+    ret = -ENOSYS;
+#endif
+
+    if (ret < 0)
+    {
+        /* error posting trove operation */
+        JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_REMOVE, 0, jd->job_id);
+        dealloc_job_desc(jd);
+        jd = NULL;
+        out_status_p->error_code = ret;
+        out_status_p->status_user_tag = status_user_tag;
+        return (1);
+    }
+
+    if (ret == 1)
+    {
+        /* immediate completion */
+        out_status_p->error_code = 0;
+        out_status_p->status_user_tag = status_user_tag;
+        JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_REMOVE, 0, jd->job_id);
+        dealloc_job_desc(jd);
+        jd = NULL;
+        return (ret);
+    }
+
+    /* if we fall through to this point, the job did not
+     * immediately complete and we must queue up to test later
+     */
+    *id = jd->job_id;
+    trove_pending_count++;
+    jd->event_type = PVFS_EVENT_TROVE_DSPACE_REMOVE;
+
+    return (0);
+}
+
+
+
 /* job_trove_dspace_remove()
  *
  * remove an entire data space object (byte stream and key/value) 
@@ -4286,20 +4597,21 @@ static void teardown_queues(void)
     return;
 }
 
-/* trove_thread_mgr_callback()
+#ifdef __PVFS2_TROVE_SUPPORT__
+
+/* precreate_pool_get_thread_mgr_callback_unlocked()
  *
- * callback function executed by the thread manager for Trove when a Trove 
- * job completes
+ * handles completion of one trove op of a precreate pool get; expects
+ * precreate_pool_mutex held (as precreate_pool_get_thread_mgr_callback does)
  *
  * no return value
  */
-static void trove_thread_mgr_callback(
+static void precreate_pool_get_thread_mgr_callback_unlocked(
     void* data, 
     PVFS_error error_code)
 {
-    struct job_desc* tmp_desc = (struct job_desc*)data; 
-    assert(tmp_desc);
-
+    struct precreate_pool_get_trove* tmp_trove = data;
+    
     gen_mutex_lock(&initialized_mutex);
     if(initialized == 0)
     {
@@ -4309,11 +4621,354 @@ static void trove_thread_mgr_callback(
     }
     gen_mutex_unlock(&initialized_mutex);
 
-    gen_mutex_lock(&completion_mutex);
-    if (tmp_desc->completed_flag == 0)
+    if(error_code == 0)
     {
-        /* set job descriptor fields and put into completion queue */
-        tmp_desc->u.trove.state = error_code;
+        gossip_debug(GOSSIP_JOB_DEBUG,
+            "Got precreated handle: %llu\n",
+            llu(*((PVFS_handle*)tmp_trove->key.buffer)));
+    }
+
+    trove_pending_count--;
+    tmp_trove->jd->u.precreate_pool.trove_pending--;
+
+    /* don't overwrite error codes from other trove ops */
+    if(tmp_trove->jd->u.precreate_pool.error_code == 0)
+    {
+        tmp_trove->jd->u.precreate_pool.error_code = error_code;
+    }
+
+    /* is this job done? */
+    if(tmp_trove->jd->u.precreate_pool.trove_pending == 0)
+    {
+        gen_mutex_lock(&completion_mutex);
+
+        /* set job descriptor fields and put into completion queue */
+        /* keep error_code as is: it already holds the first failure, if any */
+        job_desc_q_add(completion_queue_array[tmp_trove->jd->context_id], 
+                       tmp_trove->jd);
+        /* set completed flag while holding queue lock */
+        tmp_trove->jd->completed_flag = 1;
+
+#ifdef __PVFS2_JOB_THREADED__
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
+#endif
+        free(tmp_trove->jd->u.precreate_pool.data);
+        gen_mutex_unlock(&completion_mutex);
+        return;
+    }
+
+    return;
+}
+
+
+/* precreate_pool_iterate_callback()
+ *
+ * thread mgr callback for a trove iterate done for a precreate pool job:
+ * records error_code, frees key_array, decrements trove_pending_count and
+ * queues the job for completion (ignored after job interface shutdown or
+ * if the job is already marked complete); no return value
+ */
+static void precreate_pool_iterate_callback(
+    void* data, 
+    PVFS_error error_code)
+{    
+    struct job_desc* tmp_desc = (struct job_desc*)data; 
+
+    gen_mutex_lock(&initialized_mutex);
+    if(initialized == 0)
+    {
+        /* The job interface has been shutdown.  Silently ignore callback. */
+        gen_mutex_unlock(&initialized_mutex);
+        return;
+    }
+    gen_mutex_unlock(&initialized_mutex);
+
+    gen_mutex_lock(&completion_mutex);
+    if (tmp_desc->completed_flag == 0)
+    {
+        /* set job descriptor fields and put into completion queue */
+        tmp_desc->u.precreate_pool.error_code = error_code;
+        free(tmp_desc->u.precreate_pool.key_array);
+        job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
+                       tmp_desc);
+        /* set completed flag while holding queue lock */
+        tmp_desc->completed_flag = 1;
+        /* NOTE(review): pending count only drops if the job was not yet complete */
+        trove_pending_count--;
+
+#ifdef __PVFS2_JOB_THREADED__
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
+#endif
+    }
+    gen_mutex_unlock(&completion_mutex);
+
+    return;
+}
+
+/* precreate_pool_get_thread_mgr_callback()
+ *
+ * thread manager callback for precreate pool get trove operations: runs
+ * precreate_pool_get_thread_mgr_callback_unlocked() under precreate_pool_mutex
+ *
+ * no return value
+ */
+static void precreate_pool_get_thread_mgr_callback(
+    void* data, 
+    PVFS_error error_code)
+{
+    gen_mutex_lock(&precreate_pool_mutex);
+    precreate_pool_get_thread_mgr_callback_unlocked(data, error_code);
+    gen_mutex_unlock(&precreate_pool_mutex);
+}
+
+/* precreate_pool_fill_thread_mgr_callback()
+ *
+ * drives a precreate pool fill: each call accounts for the previously posted
+ * batch (pool count, waking get_handles() sleepers), then posts the next
+ * trove_keyval_write_list() of up to PRECREATE_POOL_MAX_KEYS handles; the
+ * job completes once every handle is written or an error occurs
+ */
+static void precreate_pool_fill_thread_mgr_callback(
+    void* data, 
+    PVFS_error error_code)
+{
+    struct job_desc* jd = (struct job_desc*)data; 
+    struct job_desc* jd_checker; 
+    int ret;
+    int count = 0;
+    int i;
+    struct qlist_head* iterator;
+    struct qlist_head* scratch;
+    struct precreate_pool* pool;
+    int awoken_count = 0;
+    QLIST_HEAD(tmp_list);
+    job_id_t tmp_id;
+    int extra_trove_flags = 0;
+
+    assert(jd);
+
+    gen_mutex_lock(&initialized_mutex);
+    if(initialized == 0)
+    {
+        /* The job interface has been shutdown.  Silently ignore callback. */
+        gen_mutex_unlock(&initialized_mutex);
+        return;
+    }
+    gen_mutex_unlock(&initialized_mutex);
+
+    if(error_code != 0)
+    {
+        gossip_err("Error: unable to write all precreated handles to pool.\n");
+        gossip_err("Warning: fsck may be needed to recover stranded handles.\n");
+        free(jd->u.precreate_pool.key_array);
+        gen_mutex_lock(&completion_mutex);
+        /* a failed posted trove op is no longer pending; complete with error */
+        if(jd->u.precreate_pool.first_callback_flag == 0) { trove_pending_count--; }
+        jd->u.precreate_pool.error_code = error_code;
+        job_desc_q_add(completion_queue_array[jd->context_id], jd);
+        /* set completed flag while holding queue lock */
+        jd->completed_flag = 1;
+#ifdef __PVFS2_JOB_THREADED__
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
+#endif
+        gen_mutex_unlock(&completion_mutex);
+        return;
+    }
+
+    if(jd->u.precreate_pool.first_callback_flag == 1)
+    {
+        /* this is the first post */
+        gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_fill_thread_mgr_callback() first post.\n");
+        jd->u.precreate_pool.first_callback_flag = 0;
+    }
+    else
+    {
+        gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_fill_thread_mgr_callback() completed trove op.\n");
+        /* a trove operation completed successfully */
+        jd->u.precreate_pool.precreate_handle_index += 
+            jd->u.precreate_pool.posted_count;
+        trove_pending_count--;
+
+        /* increment in-memory count for this pool */
+        gen_mutex_lock(&precreate_pool_mutex);
+        qlist_for_each(iterator, &precreate_pool_list)
+        {
+            pool = qlist_entry(iterator, struct precreate_pool,
+                list_link);
+            if(pool->pool_handle == jd->u.precreate_pool.precreate_pool &&
+                pool->fsid == jd->u.precreate_pool.fsid)
+            {
+                pool->pool_count += jd->u.precreate_pool.posted_count;
+                gossip_debug(GOSSIP_JOB_DEBUG, 
+                    "Pool count for handle %llu incremented to %u\n", 
+                    llu(pool->pool_handle), 
+                    pool->pool_count);
+                break;
+            }
+        }
+
+        /* find out if anyone was sleeping because a pool was empty */
+        gossip_debug(GOSSIP_JOB_DEBUG, "checking for get_handles() sleepers\n");
+        qlist_for_each_safe(iterator, scratch, 
+            &precreate_pool_get_handles_list)
+        {
+            jd_checker = qlist_entry(iterator, struct job_desc,
+                job_desc_q_link);
+
+            awoken_count++;
+            /* put them on a new local queue */
+            qlist_del(&jd_checker->job_desc_q_link);
+            qlist_add(&jd_checker->job_desc_q_link, &tmp_list);
+            gossip_debug(GOSSIP_JOB_DEBUG, "Found someone waiting to get handles from precreate pool\n");
+
+            if(awoken_count == jd->u.precreate_pool.posted_count)
+            {
+                /* that's as many as we should wake up right now */
+                break;
+            }
+        }
+        gen_mutex_unlock(&precreate_pool_mutex);
+
+        /* now that we have collected the sleepers into our own private
+         * queue, we can push them without the precreate_pool_mutex held
+         */
+        gossip_debug(GOSSIP_JOB_DEBUG, "About to push on get_handles() sleepers.\n");
+        qlist_for_each_safe(iterator, scratch, &tmp_list)
+        {
+            jd_checker = qlist_entry(iterator, struct job_desc,
+                job_desc_q_link);
+            qlist_del(&jd_checker->job_desc_q_link);
+            gossip_debug(GOSSIP_JOB_DEBUG, "Pushing get_handles() sleeper for jd: %p.\n", jd_checker);
+            precreate_pool_get_handles_try_post(jd_checker);
+        }
+    }
+
+    /* are we done? */
+    if(jd->u.precreate_pool.precreate_handle_index >= 
+        jd->u.precreate_pool.precreate_handle_count)
+    {
+        free(jd->u.precreate_pool.key_array);
+        gen_mutex_lock(&completion_mutex);
+
+        /* set job descriptor fields and put into completion queue */
+        jd->u.precreate_pool.error_code = 0;
+        job_desc_q_add(completion_queue_array[jd->context_id], 
+                       jd);
+        /* set completed flag while holding queue lock */
+        jd->completed_flag = 1;
+
+#ifdef __PVFS2_JOB_THREADED__
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
+#endif
+        gen_mutex_unlock(&completion_mutex);
+        return;
+    }
+
+    /* fill in information for next keyval write */
+    for(i=jd->u.precreate_pool.precreate_handle_index; 
+        (i < jd->u.precreate_pool.precreate_handle_count &&
+        (i < (jd->u.precreate_pool.precreate_handle_index
+           + PRECREATE_POOL_MAX_KEYS)));
+        i++)
+    {
+        jd->u.precreate_pool.key_array[count].buffer = 
+            &jd->u.precreate_pool.precreate_handle_array[i];
+        jd->u.precreate_pool.key_array[count].buffer_sz = sizeof(PVFS_handle);
+        count++;
+
+        /* always leave the values zeroed out */
+    }
+
+    jd->u.precreate_pool.posted_count = count;
+
+    if((jd->u.precreate_pool.posted_count 
+        + jd->u.precreate_pool.precreate_handle_index) 
+        >= jd->u.precreate_pool.precreate_handle_count)
+    {
+        /* this will be the last set written; sync db */
+        extra_trove_flags |= TROVE_SYNC;
+    }
+
+    gossip_debug(GOSSIP_JOB_DEBUG, "job_precreate_pool_fill() posting trove_keyval_write_list()\n");
+    ret = trove_keyval_write_list(jd->u.precreate_pool.fsid, 
+                            jd->u.precreate_pool.precreate_pool,
+                            jd->u.precreate_pool.key_array, 
+                            NULL, 
+                            count, 
+                            (TROVE_BINARY_KEY|TROVE_NOOVERWRITE|
+                            TROVE_KEYVAL_HANDLE_COUNT|extra_trove_flags),
+                            NULL, 
+                            &jd->trove_callback, 
+                            global_trove_context,
+                            &tmp_id);
+    /* only count posted ops; the ret == 1 recursion below decrements it */
+    if(ret >= 0) { trove_pending_count++; }
+
+    if(ret < 0)
+    {
+        gossip_err("Error: unable to write all precreated handles to pool.\n");
+        gossip_err("Warning: fsck may be needed to recover stranded handles.\n");
+        free(jd->u.precreate_pool.key_array);
+        gen_mutex_lock(&completion_mutex);
+        /* set job descriptor fields and put into completion queue */
+        jd->u.precreate_pool.error_code = ret;
+        job_desc_q_add(completion_queue_array[jd->context_id], jd);
+        /* set completed flag while holding queue lock */
+        jd->completed_flag = 1;
+#ifdef __PVFS2_JOB_THREADED__
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
+#endif
+        gen_mutex_unlock(&completion_mutex);
+        return;
+    }
+    else if(ret == 1)
+    {
+        gossip_debug(GOSSIP_JOB_DEBUG, "trove_keyval_write_list() immediate completion\n");
+        precreate_pool_fill_thread_mgr_callback(jd, 0);
+    }
+    else
+    {
+        gossip_debug(GOSSIP_JOB_DEBUG, "trove_keyval_write_list() returned zero\n");
+    }
+
+    return;
+}
+#endif /* __PVFS2_TROVE_SUPPORT__ */
+
+
+/* trove_thread_mgr_callback()
+ *
+ * callback function executed by the thread manager for Trove when a Trove 
+ * job completes
+ *
+ * no return value
+ */
+static void trove_thread_mgr_callback(
+    void* data, 
+    PVFS_error error_code)
+{
+    struct job_desc* tmp_desc = (struct job_desc*)data; 
+    assert(tmp_desc);
+
+    gen_mutex_lock(&initialized_mutex);
+    if(initialized == 0)
+    {
+        /* The job interface has been shutdown.  Silently ignore callback. */
+        gen_mutex_unlock(&initialized_mutex);
+        return;
+    }
+    gen_mutex_unlock(&initialized_mutex);
+
+    gen_mutex_lock(&completion_mutex);
+    if (tmp_desc->completed_flag == 0)
+    {
+        /* set job descriptor fields and put into completion queue */
+        tmp_desc->u.trove.state = error_code;
         job_desc_q_add(completion_queue_array[tmp_desc->context_id], 
                        tmp_desc);
         /* set completed flag while holding queue lock */
@@ -4398,7 +5053,7 @@ static void bmi_thread_mgr_unexp_handler
     gen_mutex_lock(&bmi_unexp_mutex);
     /* remove the operation from the pending bmi_unexp queue */
     tmp_desc = job_desc_q_shownext(bmi_unexp_queue);
-    assert(tmp_desc != NULL);        /* TODO: fix this */
+    assert(tmp_desc != NULL);
     if (tmp_desc->completed_flag == 0)
     {
         job_desc_q_remove(tmp_desc);
@@ -4486,6 +5141,12 @@ static void fill_status(struct job_desc 
     assert(jd);
     assert(status);
 
+#if 0
+    gossip_debug(GOSSIP_JOB_DEBUG,
+        "job fill_status() for id: %llu, type: %d\n",
+        llu(jd->job_id), jd->type);
+#endif
+
     status->status_user_tag = jd->status_user_tag;
 
     if (returned_user_ptr_p)
@@ -4512,9 +5173,6 @@ static void fill_status(struct job_desc 
         status->error_code = jd->u.req_sched.error_code;
         break;
     case JOB_TROVE:
-        /* TODO: make this work out for whatever type of trove
-         * operation this is...
-         */
         status->error_code = jd->u.trove.state;
         status->actual_size = jd->u.trove.actual_size;
         status->vtag = jd->u.trove.vtag;
@@ -4534,6 +5192,12 @@ static void fill_status(struct job_desc 
     case JOB_NULL:
         status->error_code = jd->u.null_info.error_code;
         break;
+    case JOB_PRECREATE_POOL:
+        status->error_code = jd->u.precreate_pool.error_code;
+        status->count = jd->u.precreate_pool.count;
+        status->position = jd->u.precreate_pool.pool_index << 32;
+        status->position |= jd->u.precreate_pool.position;
+        break;
     }
 
     if(jd->event_type)
@@ -4567,7 +5231,7 @@ static int do_one_test_cycle_req_sched(v
     {
         /* critical failure */
         /* TODO: can I clean up anything else here? */
-        gossip_lerr("Error: critical BMI failure.\n");
+        gossip_lerr("Error: critical request scheduler failure.\n");
         return (ret);
     }
 
@@ -4685,7 +5349,12 @@ static int completion_query_some(job_id_
     return(1);
 }
 
-/* TODO: fill in comment */
+/* completion_query_context()
+ *
+ * retrieves completed jobs from specified context
+ * 
+ * returns 1 if anything completed, 0 otherwise 
+ */
 static int completion_query_context(job_id_t * out_id_array_p,
                                   int *inout_count_p,
                                   void **returned_user_ptr_array,
@@ -4829,6 +5498,721 @@ static void flow_callback(flow_descripto
 
     return;
 }
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+
+/* job_precreate_pool_fill_signal_error()
+ *
+ * used by the entity responsible for filling the pool to indicate that
+ * errors are preventing it from making progress.  Every get_handles()
+ * caller currently sleeping on an empty pool is moved to the completion
+ * queue of its own context with its error_code set to the given value.
+ *
+ * precreate_pool and fsid are accepted for symmetry with the other
+ * precreate pool jobs but are not used to filter waiters: all sleeping
+ * get_handles() callers are released.
+ *
+ * this job always completes immediately: returns 1 with
+ * out_status_p->error_code set to 0
+ */
+int job_precreate_pool_fill_signal_error(
+    PVFS_handle precreate_pool,
+    PVFS_fs_id fsid,
+    int error_code,
+    void *user_ptr,
+    job_aint status_user_tag,
+    job_status_s * out_status_p,
+    job_id_t * id,
+    job_context_id context_id)
+{
+    struct job_desc* jd_checker; 
+    struct qlist_head* iterator;
+    struct qlist_head* scratch;
+
+    gossip_debug(GOSSIP_JOB_DEBUG,
+        "job_precreate_pool_fill_signal_error() called.\n");
+
+    gen_mutex_lock(&precreate_pool_mutex);
+    /* see if anyone is waiting on pool handles */
+    qlist_for_each_safe(iterator, scratch, &precreate_pool_get_handles_list)
+    {
+        jd_checker = qlist_entry(iterator, struct job_desc,
+            job_desc_q_link);
+
+        qlist_del(&jd_checker->job_desc_q_link);
+
+        gossip_debug(GOSSIP_JOB_DEBUG,
+            "job_precreate_pool_fill_signal_error() waking up a get_handles() caller.\n");
+        gen_mutex_lock(&completion_mutex);
+
+        /* set job descriptor fields and put into completion queue */
+        jd_checker->u.precreate_pool.error_code = error_code;
+        job_desc_q_add(completion_queue_array[jd_checker->context_id], 
+                       jd_checker);
+        /* set completed flag while holding queue lock */
+        jd_checker->completed_flag = 1;
+
+#ifdef __PVFS2_JOB_THREADED__
+        /* wake up anyone waiting for completion */
+        pthread_cond_signal(&completion_cond);
+#endif
+        gen_mutex_unlock(&completion_mutex);
+    }
+    gen_mutex_unlock(&precreate_pool_mutex);
+
+    /* the signal itself always succeeds */
+    out_status_p->error_code = 0;
+    return(1);
+}
+
+/* job_precreate_pool_fill()
+ *
+ * posts a job that fills the precreate pool identified by
+ * precreate_pool/fsid with the precreate_handle_count handles in
+ * precreate_handle_array.  The array pointer is stored in the job
+ * descriptor, so the array must remain valid until the job completes.
+ *
+ * returns 0 if the job was posted, or 1 on immediate completion (only
+ * when memory cannot be allocated; out_status_p->error_code is then
+ * -PVFS_ENOMEM)
+ */
+int job_precreate_pool_fill(
+    PVFS_handle precreate_pool,
+    PVFS_fs_id fsid,
+    PVFS_handle* precreate_handle_array,
+    int precreate_handle_count,
+    void *user_ptr,
+    job_aint status_user_tag,
+    job_status_s * out_status_p,
+    job_id_t * id,
+    job_context_id context_id)
+{
+    struct job_desc *jd = NULL;
+
+    gossip_debug(GOSSIP_JOB_DEBUG, "job_precreate_pool_fill() called.\n");
+
+    /* the job descriptor carries all state for the fill state machine */
+    jd = alloc_job_desc(JOB_PRECREATE_POOL);
+    if (!jd)
+    {
+        /* report through the status structure like the other precreate
+         * pool jobs do; a raw -errno would not be a PVFS-encoded error
+         */
+        out_status_p->error_code = -PVFS_ENOMEM;
+        return(1);
+    }
+    jd->job_user_ptr = user_ptr;
+    jd->context_id = context_id;
+    jd->status_user_tag = status_user_tag;
+    jd->trove_callback.fn = precreate_pool_fill_thread_mgr_callback;
+    jd->trove_callback.data = (void*)jd;
+    jd->u.precreate_pool.precreate_pool = precreate_pool;
+    jd->u.precreate_pool.precreate_handle_array = precreate_handle_array;
+    jd->u.precreate_pool.precreate_handle_count = precreate_handle_count;
+    jd->u.precreate_pool.precreate_handle_index = 0;
+    jd->u.precreate_pool.first_callback_flag = 1;
+    jd->u.precreate_pool.fsid = fsid;
+    /* scratch keyval array, presumably consumed (and released) by
+     * precreate_pool_fill_thread_mgr_callback() -- confirm there
+     */
+    jd->u.precreate_pool.key_array = 
+        malloc(PRECREATE_POOL_MAX_KEYS*sizeof(TROVE_keyval_s));
+    if(!jd->u.precreate_pool.key_array)
+    {
+        dealloc_job_desc(jd);
+        out_status_p->error_code = -PVFS_ENOMEM;
+        return(1);
+    }
+
+    /* hand the id back before starting the state machine: once the
+     * callback runs, the descriptor belongs to the completion machinery
+     */
+    *id = jd->job_id;
+
+    /* reuse the logic for trove op completion to get this started */
+    precreate_pool_fill_thread_mgr_callback(jd, 0);
+
+    /* for the moment, this type of job cannot immediately complete */
+    return (0);
+}
+  
+/* job_precreate_pool_lookup_server()
+ *
+ * resolves a server host string plus file system id into the handle of
+ * the precreate pool registered for that server (see
+ * job_precreate_pool_register_server()).
+ *
+ * returns 0 and stores the handle in *pool_handle on success, or
+ * -PVFS_ENOENT when no matching pool is registered
+ */
+int job_precreate_pool_lookup_server(
+    const char* host, 
+    PVFS_fs_id fsid, 
+    PVFS_handle* pool_handle)
+{
+    struct qlist_head* link;
+    struct precreate_pool* match = NULL;
+
+    gen_mutex_lock(&precreate_pool_mutex);
+
+    /* linear scan for the first (host, fsid) match */
+    qlist_for_each(link, &precreate_pool_list)
+    {
+        struct precreate_pool* candidate =
+            qlist_entry(link, struct precreate_pool, list_link);
+
+        if(strcmp(candidate->host, host) == 0 && candidate->fsid == fsid)
+        {
+            match = candidate;
+            break;
+        }
+    }
+
+    if(match)
+    {
+        *pool_handle = match->pool_handle;
+    }
+
+    gen_mutex_unlock(&precreate_pool_mutex);
+
+    return(match ? 0 : -PVFS_ENOENT);
+}
+  
+/* job_precreate_pool_register_server()
+ *
+ * records the precreate pool handle, and its initial handle count, used
+ * for objects on the given peer server host and file system so that later
+ * lookups and get_handles() calls can find it.  The host string is copied;
+ * the caller keeps ownership of its argument.
+ *
+ * returns 1 on success, -PVFS_ENOMEM on allocation failure
+ */
+int job_precreate_pool_register_server(
+    const char* host, 
+    PVFS_fs_id fsid, 
+    PVFS_handle pool_handle, 
+    int count)
+{
+    struct precreate_pool* tmp_pool;
+
+    /* create a little struct to track the pool information for this peer
+     * server 
+     */
+    tmp_pool = malloc(sizeof(*tmp_pool));
+    if(!tmp_pool)
+    {
+        /* use the PVFS error encoding like the rest of the job interface */
+        return(-PVFS_ENOMEM);
+    }
+
+    tmp_pool->host = strdup(host);
+    if(!tmp_pool->host)
+    {
+        free(tmp_pool);
+        return(-PVFS_ENOMEM);
+    }
+
+    tmp_pool->fsid = fsid;
+    tmp_pool->pool_handle = pool_handle;
+    tmp_pool->pool_count = count;
+    gossip_debug(GOSSIP_JOB_DEBUG, 
+        "Pool count for handle %llu initially set to %d\n", 
+        llu(tmp_pool->pool_handle), 
+        tmp_pool->pool_count);
+
+    gossip_debug(GOSSIP_JOB_DEBUG,
+        "Initial pool count for host %s, fsid %d: %d\n", host, (int)fsid,
+        count);
+
+    /* stash the info where we can search and find it later.  Every other
+     * reader/writer of precreate_pool_list holds precreate_pool_mutex, so
+     * the insertion must too.
+     */
+    gen_mutex_lock(&precreate_pool_mutex);
+    qlist_add(&tmp_pool->list_link, &precreate_pool_list);
+    gen_mutex_unlock(&precreate_pool_mutex);
+
+#if 0
+    /* here are the steps to tear down these data structures if needed */
+    struct qlist_head* iterator;
+    struct qlist_head* scratch;
+    struct precreate_pool* pool;
+
+    qlist_for_each_safe(iterator, scratch, &precreate_pool_list)
+    {
+        pool = qlist_entry(iterator, struct precreate_pool,
+            list_link);
+        free(pool->host);
+        free(pool);
+    }
+#endif
+
+    return(1);
+}
+   
+/* job_precreate_pool_check_level()
+ *
+ * checks whether the handle count of the precreate pool identified by
+ * precreate_pool/fsid is below low_threshold.  If it already is, the job
+ * completes immediately; otherwise a job descriptor is parked on
+ * precreate_pool_check_level_list and completes once the count drops
+ * below the threshold.
+ *
+ * returns 1 on immediate completion (out_status_p->error_code is 0, or
+ * -PVFS_ENOMEM if no job descriptor could be allocated), 0 if the job was
+ * queued, and -PVFS_EINVAL if no registered pool matches
+ */
+int job_precreate_pool_check_level(
+    PVFS_handle precreate_pool,
+    PVFS_fs_id fsid,
+    int low_threshold,
+    void *user_ptr,
+    job_aint status_user_tag,
+    job_status_s * out_status_p,
+    job_id_t * id,
+    job_context_id context_id)
+{
+    struct qlist_head* link;
+    struct precreate_pool* found = NULL;
+    struct job_desc *waiter = NULL;
+
+    gen_mutex_lock(&precreate_pool_mutex);
+
+    /* locate the pool by (handle, fsid) */
+    qlist_for_each(link, &precreate_pool_list)
+    {
+        struct precreate_pool* candidate =
+            qlist_entry(link, struct precreate_pool, list_link);
+
+        if(candidate->pool_handle == precreate_pool &&
+            candidate->fsid == fsid)
+        {
+            found = candidate;
+            break;
+        }
+    }
+
+    if(!found)
+    {
+        /* unknown pool */
+        gen_mutex_unlock(&precreate_pool_mutex);
+        return(-PVFS_EINVAL);
+    }
+
+    if(found->pool_count < low_threshold)
+    {
+        /* already below the low threshold; nothing to wait for */
+        out_status_p->error_code = 0;
+        gen_mutex_unlock(&precreate_pool_mutex);
+        gossip_debug(GOSSIP_JOB_DEBUG, "found pool count low.\n");
+        return(1);
+    }
+
+    /* at or above the threshold: park a job until the level drops */
+    waiter = alloc_job_desc(JOB_PRECREATE_POOL);
+    if(!waiter)
+    {
+        out_status_p->error_code = -PVFS_ENOMEM;
+        gen_mutex_unlock(&precreate_pool_mutex);
+        return(1);
+    }
+    waiter->job_user_ptr = user_ptr;
+    waiter->context_id = context_id;
+    waiter->status_user_tag = status_user_tag;
+    waiter->u.precreate_pool.precreate_pool = precreate_pool;
+    waiter->u.precreate_pool.fsid = fsid;
+    waiter->u.precreate_pool.low_threshold = low_threshold;
+    *id = waiter->job_id;
+
+    qlist_add(&waiter->job_desc_q_link, &precreate_pool_check_level_list);
+    gen_mutex_unlock(&precreate_pool_mutex);
+    gossip_debug(GOSSIP_JOB_DEBUG, "found pool count high.\n");
+    return(0);
+}
+ 
+/* job_precreate_pool_get_handles()
+ *
+ * Retrieves a set of datafile handles from one or more precreate pools.
+ * If servers is non-NULL it holds count host strings (as registered with
+ * job_precreate_pool_register_server()); servers[i] selects the pool that
+ * handle_array[i] is taken from.  If servers is NULL, handles are taken
+ * from the pools in round robin manner.
+ *
+ * handle_array (and servers, if given) are referenced by the job until it
+ * completes and must stay valid until then.
+ *
+ * returns 0 if the job was posted, or 1 on immediate completion (negative
+ * count, missing handle array, nothing requested, or allocation failure;
+ * see out_status_p->error_code)
+ */
+int job_precreate_pool_get_handles(
+    PVFS_fs_id fsid,
+    int count,
+    const char** servers,
+    PVFS_handle* handle_array,
+    PVFS_ds_flags flags,
+    void *user_ptr,
+    job_aint status_user_tag,
+    job_status_s * out_status_p,
+    job_id_t * id,
+    job_context_id context_id)
+{
+    struct job_desc *jd = NULL;
+
+    if(count < 0 || (count > 0 && !handle_array))
+    {
+        out_status_p->error_code = -PVFS_EINVAL;
+        return(1);
+    }
+
+    if(count == 0)
+    {
+        /* nothing to retrieve.  A posted job would never complete here,
+         * because no trove operation would ever be issued on its behalf.
+         */
+        out_status_p->error_code = 0;
+        out_status_p->count = 0;
+        return(1);
+    }
+
+    jd = alloc_job_desc(JOB_PRECREATE_POOL);
+    if (!jd)
+    {
+        out_status_p->error_code = -PVFS_ENOMEM;
+        return(1);
+    }
+    jd->job_user_ptr = user_ptr;
+    jd->context_id = context_id;
+    jd->status_user_tag = status_user_tag;
+    jd->u.precreate_pool.precreate_handle_array = handle_array;
+    jd->u.precreate_pool.precreate_handle_count = count;
+    jd->u.precreate_pool.precreate_handle_index = 0;
+    jd->u.precreate_pool.fsid = fsid;
+    jd->u.precreate_pool.servers = servers;
+    jd->u.precreate_pool.trove_pending = 0;
+    jd->u.precreate_pool.flags = flags;
+    /* round robin selection starts from the head of the pool list; set
+     * these explicitly rather than relying on how alloc_job_desc()
+     * initializes the union
+     */
+    jd->u.precreate_pool.current_pool = NULL;
+    jd->u.precreate_pool.error_code = 0;
+
+    /* hand back the id before posting: try_post() may complete the job
+     * right away, after which another thread may reap the descriptor
+     */
+    *id = jd->job_id;
+
+    precreate_pool_get_handles_try_post(jd);
+
+    /* for the moment, this type of job cannot immediately complete */
+    return(0);
+}
+
+/* precreate_pool_move_to_completion()
+ *
+ * Internal helper: places a precreate pool job descriptor on the
+ * completion queue of its own context, marks it completed and, in
+ * threaded builds, signals completion_cond.  Any error_code must already
+ * be stored in jd->u.precreate_pool.  Acquires completion_mutex; it may be
+ * called with precreate_pool_mutex held (matching the precreate ->
+ * completion lock nesting used by the other precreate pool functions).
+ */
+static void precreate_pool_move_to_completion(struct job_desc* jd)
+{
+    gen_mutex_lock(&completion_mutex);
+    job_desc_q_add(completion_queue_array[jd->context_id], jd);
+    /* set completed flag while holding queue lock */
+    jd->completed_flag = 1;
+#ifdef __PVFS2_JOB_THREADED__
+    /* wake up anyone waiting for completion */
+    pthread_cond_signal(&completion_cond);
+#endif
+    gen_mutex_unlock(&completion_mutex);
+}
+
+/* precreate_pool_next_for_fsid()
+ *
+ * Internal helper for round robin pool selection: returns the list link
+ * of the next pool after cur (or the first pool when cur is NULL) that
+ * belongs to fsid, wrapping around the end of precreate_pool_list.
+ * Returns NULL if no registered pool belongs to fsid.  The caller must
+ * hold precreate_pool_mutex.
+ */
+static struct qlist_head* precreate_pool_next_for_fsid(
+    struct qlist_head* cur,
+    PVFS_fs_id fsid)
+{
+    struct qlist_head* start = cur ? cur : &precreate_pool_list;
+    struct qlist_head* link = start;
+    struct precreate_pool* pool;
+
+    /* visit every pool at most once, ending back at the starting point */
+    do
+    {
+        link = link->next;
+        if(link == &precreate_pool_list)
+        {
+            /* the list head is not a pool; step over it when wrapping */
+            continue;
+        }
+        pool = qlist_entry(link, struct precreate_pool, list_link);
+        if(pool->fsid == fsid)
+        {
+            return(link);
+        }
+    } while(link != start);
+
+    return(NULL);
+}
+
+/* precreate_pool_get_handles_try_post()
+ *
+ * Internal function used by job_precreate_pool_get_handles().  If every
+ * precreate pool of the job's file system has at least one handle
+ * available, each requested handle is mapped to a pool (the named
+ * server's pool, or round robin when no servers were given) and one trove
+ * keyval iterate/remove operation is posted per handle.  Otherwise the
+ * job is parked on precreate_pool_get_handles_list;
+ * job_precreate_pool_fill_signal_error() completes such waiters with an
+ * error, and the pool fill path presumably retries them (not visible
+ * here).  Setup failures (allocation, unknown server, no pools for the
+ * file system) complete the job with an error.
+ *
+ * no return value
+ */
+static void precreate_pool_get_handles_try_post(struct job_desc* jd)
+{
+    struct precreate_pool* pool;
+    TROVE_op_id tmp_id;
+    int ret;
+    struct precreate_pool_get_trove* tmp_trove_array;
+    struct qlist_head* iterator;
+    struct qlist_head* scratch;
+    struct job_desc* jd_checker;
+    PVFS_fs_id fsid = jd->u.precreate_pool.fsid;
+    /* cached: the posting loop below must not read jd once the last
+     * callback may have completed the job
+     */
+    int handle_count = jd->u.precreate_pool.precreate_handle_count;
+    int i;
+
+    gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_get_handles_try_post\n");
+
+    gen_mutex_lock(&precreate_pool_mutex);
+
+    /* check this file system's pools, go back to sleep if any are empty */
+    qlist_for_each(iterator, &precreate_pool_list)
+    {
+        pool = qlist_entry(iterator, struct precreate_pool,
+            list_link);
+        if(pool->fsid == fsid && pool->pool_count < 1)
+        {
+            /* queue up until the count for this pool increases */
+            qlist_add(&jd->job_desc_q_link, &precreate_pool_get_handles_list);
+            gossip_debug(GOSSIP_JOB_DEBUG, "Found empty precreate pool %llu\n",
+                llu(pool->pool_handle));
+            gen_mutex_unlock(&precreate_pool_mutex);
+            return;
+        }
+    }
+
+    /* if we get to this point, set up necessary information for all trove 
+     * operations needed to service job
+     */
+    tmp_trove_array = malloc(handle_count * sizeof(*tmp_trove_array));
+    if(!tmp_trove_array)
+    {
+        gen_mutex_unlock(&precreate_pool_mutex);
+        jd->u.precreate_pool.error_code = -PVFS_ENOMEM;
+        precreate_pool_move_to_completion(jd);
+        return;
+    }
+    jd->u.precreate_pool.data = tmp_trove_array;
+    jd->u.precreate_pool.current_pool = NULL;
+
+    /* translate requested servers (or round robin) into pools and set up
+     * the fields needed to post the trove operations
+     */
+    for(i=0; i<handle_count; i++)
+    {
+        if(jd->u.precreate_pool.servers)
+        {
+            /* caller wanted specific servers; find the pool registered
+             * for this server on this file system
+             */
+            jd->u.precreate_pool.current_pool = NULL; /* sentinel */
+            qlist_for_each(iterator, &precreate_pool_list)
+            {
+                pool = qlist_entry(iterator, struct precreate_pool,
+                    list_link);
+                if(pool->fsid == fsid &&
+                    !strcmp(pool->host, jd->u.precreate_pool.servers[i]))
+                {
+                    jd->u.precreate_pool.current_pool = iterator;
+                    break;
+                }
+            }
+            if(!jd->u.precreate_pool.current_pool)
+            {
+                gossip_err("Error: get_handles(): unknown server: %s\n",
+                    jd->u.precreate_pool.servers[i]);
+                break;
+            }
+        }
+        else
+        {
+            /* caller wants whatever we hand out; cycle through this file
+             * system's pools, wrapping at the end of the list
+             */
+            jd->u.precreate_pool.current_pool = precreate_pool_next_for_fsid(
+                jd->u.precreate_pool.current_pool, fsid);
+            if(!jd->u.precreate_pool.current_pool)
+            {
+                gossip_err("Error: get_handles(): no precreate pools for fsid %d\n",
+                    (int)fsid);
+                break;
+            }
+        }
+
+        tmp_trove_array[i].pool = qlist_entry(jd->u.precreate_pool.current_pool, 
+            struct precreate_pool, list_link);
+
+        tmp_trove_array[i].jd = jd;
+        tmp_trove_array[i].pos = PVFS_ITERATE_START;
+        tmp_trove_array[i].count = 1;
+        tmp_trove_array[i].key.buffer 
+            = &jd->u.precreate_pool.precreate_handle_array[i];
+        tmp_trove_array[i].key.buffer_sz = sizeof(PVFS_handle);
+        tmp_trove_array[i].trove_callback.fn 
+            = precreate_pool_get_thread_mgr_callback;
+        tmp_trove_array[i].trove_callback.data 
+            = &tmp_trove_array[i];
+    }
+
+    if(i < handle_count)
+    {
+        /* could not map every handle to a pool; fail the whole job before
+         * any pool counts are touched.  Clear data so nothing later frees
+         * the array a second time.
+         */
+        free(tmp_trove_array);
+        jd->u.precreate_pool.data = NULL;
+        gen_mutex_unlock(&precreate_pool_mutex);
+        jd->u.precreate_pool.error_code = -PVFS_EINVAL;
+        precreate_pool_move_to_completion(jd);
+        return;
+    }
+
+    /* post all trove operations at once */
+    for(i=0; i<handle_count; i++)
+    { 
+        pool = tmp_trove_array[i].pool;
+
+        /* go ahead and decrement count to avoid races with other consumers */
+        pool->pool_count--;
+        gossip_debug(GOSSIP_JOB_DEBUG, 
+            "Pool count for handle %llu decremented to %d\n", 
+            llu(pool->pool_handle), 
+            pool->pool_count);
+
+        /* is anyone waiting for the level of this pool to drop? */
+        qlist_for_each_safe(iterator, scratch, 
+            &precreate_pool_check_level_list)
+        {
+            jd_checker = qlist_entry(iterator, struct job_desc,
+                job_desc_q_link);
+            if(jd_checker->u.precreate_pool.precreate_pool == 
+                pool->pool_handle &&
+                jd_checker->u.precreate_pool.fsid == pool->fsid &&
+                pool->pool_count < 
+                jd_checker->u.precreate_pool.low_threshold)
+            {
+                /* the pool level is low */
+                gossip_debug(GOSSIP_JOB_DEBUG,
+                    "Pool count low, waking up waiter for handle %llu.\n",
+                    llu(jd_checker->u.precreate_pool.precreate_pool));
+                qlist_del(&jd_checker->job_desc_q_link);
+
+                /* complete the waiter in its own context and flag the
+                 * waiter (not this get_handles job) as completed
+                 */
+                precreate_pool_move_to_completion(jd_checker);
+            }
+        }
+
+        /* post trove operation to pull out a handle */
+        ret = trove_keyval_iterate_keys(
+            pool->fsid, 
+            pool->pool_handle,
+            &tmp_trove_array[i].pos,
+            &tmp_trove_array[i].key,
+            &tmp_trove_array[i].count,
+            jd->u.precreate_pool.flags|
+            TROVE_BINARY_KEY|
+            TROVE_KEYVAL_HANDLE_COUNT|
+            TROVE_KEYVAL_ITERATE_REMOVE,
+            NULL, 
+            &tmp_trove_array[i].trove_callback, 
+            global_trove_context,
+            &tmp_id);
+        if(ret < 0)
+        {
+            precreate_pool_get_thread_mgr_callback_unlocked(
+                &tmp_trove_array[i], ret);
+        }
+        else if(ret == 1)
+        {
+            precreate_pool_get_thread_mgr_callback_unlocked(
+                &tmp_trove_array[i], 0);
+        }
+        else
+        {
+            /* callback will be triggered later */
+            trove_pending_count++;
+            jd->u.precreate_pool.trove_pending++;
+        }
+    }
+    gen_mutex_unlock(&precreate_pool_mutex);
+}
+
+/* job_precreate_pool_iterate_handles()
+ * 
+ * similar to the trove iterate handles function, but returns all handles
+ * stored in the precreate pools of file system fsid, including the handles
+ * for the pool objects themselves.
+ *
+ * position packs two values: the high 32 bits select the pool (1-based
+ * index among this file system's pools; 0 is reserved for the start and
+ * end markers of the whole iteration) and the low 32 bits hold the trove
+ * iterate position within that pool.  Start with PVFS_ITERATE_START;
+ * iteration is finished when PVFS_ITERATE_END is returned.  Once a pool's
+ * keys are exhausted, the next call returns just that pool's own handle
+ * (count 1) and advances to the next pool.
+ *
+ * handle_array must have room for count (>= 1) handles.  vtag is currently
+ * unused.
+ *
+ * returns 0 if a trove operation was posted (status is reported on
+ * completion), or 1 on immediate completion (see out_status_p)
+ */
+int job_precreate_pool_iterate_handles(
+    PVFS_fs_id fsid,
+    PVFS_ds_position position,
+    PVFS_handle* handle_array,
+    int count,
+    PVFS_ds_flags flags,
+    PVFS_vtag* vtag,
+    void* user_ptr,
+    job_aint status_user_tag,
+    job_status_s* out_status_p,
+    job_id_t* id,
+    job_context_id context_id)
+{
+    PVFS_ds_position local_position;
+    PVFS_ds_position pool_index;
+    struct qlist_head* iterator;
+    PVFS_ds_position tmp_index = 1;
+    struct precreate_pool* pool = NULL;
+    struct precreate_pool* candidate;
+    int ret;
+    struct job_desc *jd = NULL;
+    void* user_ptr_internal;
+    TROVE_op_id tmp_id;
+    int i;
+
+    /* every reply carries at least one handle (a drained pool returns its
+     * own handle), so the caller must provide room for at least one
+     */
+    if(count < 1 || !handle_array)
+    {
+        gossip_err("Error: invalid count given to job_precreate_pool_iterate_handles().\n");
+        out_status_p->error_code = -PVFS_EINVAL;
+        return(1);
+    }
+
+    /* low order bits are the trove iterate position */
+    local_position = position & 0xffffffff;
+    /* high order bits tell us which pool we are on */
+    pool_index = position >> 32;
+
+    /* we start indexing at one and reserve 0 for the special start and end
+     * values for the entire set of pools
+     */
+    if(pool_index == 0)
+    {
+        if(local_position == PVFS_ITERATE_START)
+        {
+            pool_index = 1;
+        }
+        else
+        {
+            gossip_err("Error: invalid position given to job_precreate_pool_iterate_handles().\n");
+            out_status_p->error_code = -PVFS_EINVAL;
+            return(1);
+        }
+    }
+
+    gen_mutex_lock(&precreate_pool_mutex);
+
+    /* find the pool_index'th pool (1-based) belonging to this file system;
+     * pools of other file systems are not part of this iteration
+     */
+    qlist_for_each(iterator, &precreate_pool_list)
+    {
+        candidate = qlist_entry(iterator, struct precreate_pool,
+            list_link);
+        if(candidate->fsid != fsid)
+        {
+            continue;
+        }
+        if(tmp_index == pool_index)
+        {
+            pool = candidate;
+            break;
+        }
+        tmp_index++;
+    }
+
+    if(!pool)
+    {
+        /* we ran out of pools; iteration is done */
+        gen_mutex_unlock(&precreate_pool_mutex);
+        out_status_p->error_code = 0;
+        out_status_p->count = 0;
+        out_status_p->position = PVFS_ITERATE_END;
+        return(1);
+    }
+
+    if(local_position == PVFS_ITERATE_END)
+    {
+        /* we got all of the handles out of the pool */
+        /* pass back pool handle by itself and go to next pool */
+        handle_array[0] = pool->pool_handle;
+        /* skip to next pool */
+        pool_index++;
+        out_status_p->position = pool_index << 32;
+        out_status_p->position |= PVFS_ITERATE_START;
+        out_status_p->count = 1;
+        out_status_p->error_code = 0;
+        gen_mutex_unlock(&precreate_pool_mutex);
+        return(1);
+    }
+
+    /* get ready to post a job to trove to find handles */
+    jd = alloc_job_desc(JOB_PRECREATE_POOL);
+    if (!jd)
+    {
+        gen_mutex_unlock(&precreate_pool_mutex);
+        out_status_p->error_code = -PVFS_ENOMEM;
+        return 1;
+    }
+    jd->u.precreate_pool.key_array = malloc(count * sizeof(*jd->u.precreate_pool.key_array));
+    if(!jd->u.precreate_pool.key_array)
+    {
+        gen_mutex_unlock(&precreate_pool_mutex);
+        dealloc_job_desc(jd);
+        out_status_p->error_code = -PVFS_ENOMEM;
+        return 1;
+    }
+    /* trove writes each key (a handle) straight into the caller's array */
+    for(i=0; i<count; i++)
+    {
+        jd->u.precreate_pool.key_array[i].buffer = &handle_array[i];
+        jd->u.precreate_pool.key_array[i].buffer_sz = sizeof(handle_array[i]);
+    }
+    jd->job_user_ptr = user_ptr;
+    jd->u.precreate_pool.fsid = fsid;
+    jd->u.precreate_pool.position = local_position;
+    jd->u.precreate_pool.count = count;
+    jd->u.precreate_pool.precreate_handle_array = handle_array;
+    jd->u.precreate_pool.pool_index = pool_index;
+    jd->context_id = context_id;
+    jd->status_user_tag = status_user_tag;
+    jd->trove_callback.fn = precreate_pool_iterate_callback;
+    jd->trove_callback.data = (void*)jd;
+    user_ptr_internal = &jd->trove_callback;
+    JOB_EVENT_START(PVFS_EVENT_TROVE_KEYVAL_ITERATE_KEYS, jd->job_id);
+
+    /* this whole section is compiled only with __PVFS2_TROVE_SUPPORT__, so
+     * the trove call needs no further guard
+     */
+    ret = trove_keyval_iterate_keys(fsid, pool->pool_handle,
+                               &(jd->u.precreate_pool.position), 
+                               jd->u.precreate_pool.key_array, 
+                               &(jd->u.precreate_pool.count), flags, NULL,
+                               user_ptr_internal, 
+                               global_trove_context, &tmp_id);
+
+    if (ret < 0)
+    {
+        /* error posting trove operation */
+        JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_ITERATE_KEYS, 0, jd->job_id);
+        free(jd->u.precreate_pool.key_array);
+        dealloc_job_desc(jd);
+        jd = NULL;
+        out_status_p->error_code = ret;
+        out_status_p->status_user_tag = status_user_tag;
+        gen_mutex_unlock(&precreate_pool_mutex);
+        return (1);
+    }
+
+    if (ret == 1)
+    {
+        /* immediate completion */
+        out_status_p->error_code = 0;
+        out_status_p->status_user_tag = status_user_tag;
+        out_status_p->position = pool_index << 32;
+        out_status_p->position |= jd->u.precreate_pool.position;
+        out_status_p->count = jd->u.precreate_pool.count;
+        JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_ITERATE_KEYS, 0, jd->job_id);
+        free(jd->u.precreate_pool.key_array);
+        dealloc_job_desc(jd);
+        jd = NULL;
+        gen_mutex_unlock(&precreate_pool_mutex);
+        return (ret);
+    }
+
+    /* if we fall through to this point, the job did not
+     * immediately complete and we must queue up to test later
+     */
+    *id = jd->job_id;
+    trove_pending_count++;
+    jd->event_type = PVFS_EVENT_TROVE_KEYVAL_ITERATE_KEYS;
+    gen_mutex_unlock(&precreate_pool_mutex);
+
+    return (0);
+}
+
+
+
+#endif /* __PVFS2_TROVE_SUPPORT__ */
 
 /*
  * Local variables:

Index: job.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job.h,v
diff -p -u -r1.56 -r1.57
--- job.h	4 Apr 2008 17:57:37 -0000	1.56
+++ job.h	8 Sep 2008 15:42:43 -0000	1.57
@@ -434,6 +434,21 @@ int job_trove_keyval_remove(PVFS_fs_id c
 			    job_id_t * id,
 			    job_context_id context_id);
 
+/* remove a list of key/value entries */
+int job_trove_keyval_remove_list(PVFS_fs_id coll_id,
+                                  PVFS_handle handle,
+                                  PVFS_ds_keyval * key_a,
+                                  PVFS_ds_keyval * val_a,
+                                  int * error_a,
+                                  int count,
+                                  PVFS_ds_flags flags,
+                                  PVFS_vtag * vtag,
+                                  void *user_ptr,
+                                  job_aint status_user_tag,
+                                  job_status_s * out_status_p,
+                                  job_id_t * id,
+                                  job_context_id context_id);
+
 /* check consistency of a key/value pair for a given vtag */
 int job_trove_keyval_validate(PVFS_fs_id coll_id,
 			      PVFS_handle handle,
@@ -498,6 +513,20 @@ int job_trove_dspace_create(PVFS_fs_id c
 			    job_id_t * id,
 			    job_context_id context_id);
 
+/* create a set of new data space objects; count handles are written to
+ * out_handle_array
+ */
+int job_trove_dspace_create_list(PVFS_fs_id coll_id,
+                            PVFS_handle_extent_array *handle_extent_array,
+                            PVFS_handle* out_handle_array,
+                            int count,
+                            PVFS_ds_type type,
+                            void *hint,
+                            PVFS_ds_flags flags,
+                            void *user_ptr,
+                            job_aint status_user_tag,
+                            job_status_s * out_status_p,
+                            job_id_t * id,
+                            job_context_id context_id);
+
 /* remove an entire data space object (byte stream and key/value) */
 int job_trove_dspace_remove(PVFS_fs_id coll_id,
 			    PVFS_handle handle,
@@ -508,6 +537,18 @@ int job_trove_dspace_remove(PVFS_fs_id c
 			    job_id_t * id,
 			    job_context_id context_id);
 
+/* remove a list of data space objects (byte stream and key/value) */
+int job_trove_dspace_remove_list(PVFS_fs_id coll_id,
+			    PVFS_handle* handle_array,
+                            PVFS_error *out_error_array,
+                            int count,
+                            PVFS_ds_flags flags,
+			    void *user_ptr,
+			    job_aint status_user_tag,
+			    job_status_s * out_status_p,
+			    job_id_t * id,
+			    job_context_id context_id);
+
 /* verify that a given dataspace exists and discover its type */
 int job_trove_dspace_verify(PVFS_fs_id coll_id,
 			    PVFS_handle handle,
@@ -577,6 +618,73 @@ int job_null(
     job_id_t * id,
     job_context_id context_id);
 
+/* post a job that fills a precreate pool with precreate_handle_count
+ * handles from precreate_handle_array (the array pointer is stored in the
+ * job descriptor, so keep it valid until completion)
+ */
+int job_precreate_pool_fill(
+    PVFS_handle precreate_pool,
+    PVFS_fs_id fsid,
+    PVFS_handle* precreate_handle_array,
+    int precreate_handle_count,
+    void *user_ptr,
+    job_aint status_user_tag,
+    job_status_s * out_status_p,
+    job_id_t * id,
+    job_context_id context_id);
+ 
+/* report a pool fill error: every get_handles() caller sleeping on an
+ * empty pool is completed with error_code.  Always completes immediately.
+ */
+int job_precreate_pool_fill_signal_error(
+    PVFS_handle precreate_pool,
+    PVFS_fs_id fsid,
+    int error_code,
+    void *user_ptr,
+    job_aint status_user_tag,
+    job_status_s * out_status_p,
+    job_id_t * id,
+    job_context_id context_id);
+
+/* complete when the handle count of a precreate pool drops below
+ * low_threshold (immediately if it already is); returns -PVFS_EINVAL for
+ * an unknown pool
+ */
+int job_precreate_pool_check_level(
+    PVFS_handle precreate_pool,
+    PVFS_fs_id fsid,
+    int low_threshold,
+    void *user_ptr,
+    job_aint status_user_tag,
+    job_status_s * out_status_p,
+    job_id_t * id,
+    job_context_id context_id);
+
+/* iterate over all handles held in the precreate pools, including the pool
+ * objects themselves; position packs a pool index (high 32 bits) and a
+ * trove iterate position (low 32 bits)
+ */
+int job_precreate_pool_iterate_handles(
+    PVFS_fs_id fsid,
+    PVFS_ds_position position,
+    PVFS_handle* handle_array,
+    int count,
+    PVFS_ds_flags flags,
+    PVFS_vtag* vtag,
+    void* user_ptr,
+    job_aint status_user_tag,
+    job_status_s* out_status_p,
+    job_id_t* id,
+    job_context_id context_id);
+
+/* take count precreated handles out of the pools, from the pools of the
+ * named servers or round robin when servers is NULL
+ */
+int job_precreate_pool_get_handles(
+    PVFS_fs_id fsid,
+    int count,
+    const char** servers,
+    PVFS_handle* handle_array,
+    PVFS_ds_flags flags,
+    void *user_ptr,
+    job_aint status_user_tag,
+    job_status_s * out_status_p,
+    job_id_t * id,
+    job_context_id context_id);
+
+/* record the precreate pool handle and initial handle count for a peer
+ * server host and file system; returns 1 on success
+ */
+int job_precreate_pool_register_server(
+    const char* host, 
+    PVFS_fs_id fsid, 
+    PVFS_handle pool_handle, 
+    int count);
+ 
+/* look up the precreate pool handle registered for host and fsid; returns
+ * 0 on success or -PVFS_ENOENT if none is registered
+ */
+int job_precreate_pool_lookup_server(
+    const char* host, 
+    PVFS_fs_id fsid, 
+    PVFS_handle* pool_handle);
+  
 /******************************************************************
  * job test/wait for completion functions 
  */



More information about the Pvfs2-cvs mailing list