[Pvfs2-cvs] commit by pcarns in pvfs2-1/src/io/job:
job-desc-queue.c job-desc-queue.h job.c job.h
CVS commit program
cvs at parl.clemson.edu
Mon Sep 8 11:42:43 EDT 2008
Update of /projects/cvsroot/pvfs2-1/src/io/job
In directory parlweb1:/tmp/cvs-serv32611/src/io/job
Modified Files:
job-desc-queue.c job-desc-queue.h job.c job.h
Log Message:
Merging small files branch to head. Includes server side precreation of
data files and file stuffing.
Index: job-desc-queue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job-desc-queue.c,v
diff -p -u -r1.18 -r1.19
--- job-desc-queue.c 11 Oct 2007 23:11:33 -0000 1.18
+++ job-desc-queue.c 8 Sep 2008 15:42:43 -0000 1.19
@@ -208,6 +208,9 @@ void job_desc_q_dump(job_desc_q_p jdqp)
case JOB_NULL:
gossip_err(" type: JOB_NULL.\n");
break;
+ case JOB_PRECREATE_POOL:
+ gossip_err(" type: JOB_PRECREATE_POOL.\n");
+ break;
}
}
Index: job-desc-queue.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job-desc-queue.h,v
diff -p -u -r1.26 -r1.27
--- job-desc-queue.h 7 Dec 2004 15:09:29 -0000 1.26
+++ job-desc-queue.h 8 Sep 2008 15:42:43 -0000 1.27
@@ -42,6 +42,30 @@ struct trove_desc
int count;
};
+/* describes precreate pool operations */
+struct precreate_pool_desc
+{
+ PVFS_handle precreate_pool;
+ PVFS_fs_id fsid;
+ PVFS_handle* precreate_handle_array;
+ int precreate_handle_count;
+ int precreate_handle_index;
+ int posted_count;
+ const char** servers;
+ struct qlist_head* current_pool;
+ int trove_pending;
+ int low_threshold;
+ void* data;
+ int first_callback_flag;
+ TROVE_keyval_s* key_array;
+ PVFS_ds_flags flags;
+ PVFS_ds_position position;
+ PVFS_ds_position pool_index;
+ int count;
+
+ PVFS_error error_code;
+};
+
/* describes unexpected BMI operations */
struct bmi_unexp_desc
{
@@ -85,6 +109,7 @@ enum job_type
JOB_REQ_SCHED,
JOB_DEV_UNEXP,
JOB_REQ_SCHED_TIMER,
+ JOB_PRECREATE_POOL,
JOB_NULL
};
@@ -111,6 +136,7 @@ struct job_desc
struct req_sched_desc req_sched;
struct dev_unexp_desc dev_unexp;
struct null_info_desc null_info;
+ struct precreate_pool_desc precreate_pool;
}
u;
Index: job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job.c,v
diff -p -u -r1.181 -r1.182
--- job.c 4 Apr 2008 17:57:37 -0000 1.181
+++ job.c 8 Sep 2008 15:42:43 -0000 1.182
@@ -25,6 +25,7 @@
#include "id-generator.h"
#include "pint-event.h"
#include "job-time-mgr.h"
+#include "pvfs2-internal.h"
#define JOB_EVENT_START(__op, __id) \
PINT_event_timestamp(PVFS_EVENT_API_JOB, __op, 0, __id, \
@@ -69,6 +70,36 @@ enum
thread_wait_timeout = 10000 /* usecs */
};
+/* cap how many keys we dump into trove at once when filling precreate pools
+ * so that it doesn't clog up trove queues
+ */
+#define PRECREATE_POOL_MAX_KEYS 32
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+static gen_mutex_t precreate_pool_mutex = GEN_MUTEX_INITIALIZER;
+static QLIST_HEAD(precreate_pool_list);
+static QLIST_HEAD(precreate_pool_check_level_list);
+static QLIST_HEAD(precreate_pool_get_handles_list);
+struct precreate_pool
+{
+ struct qlist_head list_link;
+ char* host;
+ PVFS_fs_id fsid;
+ PVFS_handle pool_handle;
+ uint32_t pool_count;
+};
+struct precreate_pool_get_trove
+{
+ struct job_desc* jd; /* parent job descriptor */
+ /* variables needed per keyval_iterate_keys() call */
+ PVFS_ds_position pos;
+ PVFS_ds_keyval key;
+ int count;
+ struct PINT_thread_mgr_trove_callback trove_callback;
+ struct precreate_pool* pool;
+};
+#endif /* __PVFS2_TROVE_SUPPORT__ */
+
/********************************************************
* function prototypes
*/
@@ -102,6 +133,21 @@ static void flow_callback(flow_descripto
static gen_mutex_t work_cycle_mutex = GEN_MUTEX_INITIALIZER;
static void do_one_work_cycle_all(int idle_time_ms);
#endif
+#ifdef __PVFS2_TROVE_SUPPORT__
+static void precreate_pool_get_thread_mgr_callback(
+ void* data,
+ PVFS_error error_code);
+static void precreate_pool_get_thread_mgr_callback_unlocked(
+ void* data,
+ PVFS_error error_code);
+static void precreate_pool_fill_thread_mgr_callback(
+ void* data,
+ PVFS_error error_code);
+static void precreate_pool_iterate_callback(
+ void* data,
+ PVFS_error error_code);
+static void precreate_pool_get_handles_try_post(struct job_desc* jd);
+#endif
/********************************************************
* public interface
@@ -2094,6 +2140,7 @@ int job_trove_keyval_write_list(PVFS_fs_
JOB_EVENT_START(PVFS_EVENT_TROVE_KEYVAL_WRITE_LIST, jd->job_id);
#ifdef __PVFS2_TROVE_SUPPORT__
+ gossip_debug(GOSSIP_JOB_DEBUG, "job_trove_keyval_write_list() posting trove_keyval_write_list()\n");
ret = trove_keyval_write_list(coll_id, handle,
key_array, val_array,
count, flags,
@@ -2138,6 +2185,88 @@ int job_trove_keyval_write_list(PVFS_fs_
return (0);
}
+int job_trove_keyval_remove_list(PVFS_fs_id coll_id,
+ PVFS_handle handle,
+ PVFS_ds_keyval * key_array,
+ PVFS_ds_keyval * val_array,
+ int * error_array,
+ int count,
+ PVFS_ds_flags flags,
+ PVFS_vtag * vtag,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id)
+{
+ int ret = -1;
+ struct job_desc *jd = NULL;
+ void* user_ptr_internal;
+
+ /* create the job desc first, even though we may not use it. This
+ * gives us somewhere to store the BMI id and user ptr
+ */
+ jd = alloc_job_desc(JOB_TROVE);
+ if (!jd)
+ {
+ out_status_p->error_code = -PVFS_ENOMEM;
+ return 1;
+ }
+ jd->job_user_ptr = user_ptr;
+ jd->u.trove.vtag = vtag;
+ jd->context_id = context_id;
+ jd->status_user_tag = status_user_tag;
+ jd->trove_callback.fn = trove_thread_mgr_callback;
+ jd->trove_callback.data = (void*)jd;
+ user_ptr_internal = &jd->trove_callback;
+ JOB_EVENT_START(PVFS_EVENT_TROVE_KEYVAL_REMOVE_LIST, jd->job_id);
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+ ret = trove_keyval_remove_list(coll_id, handle,
+ key_array, val_array, error_array,
+ count, flags,
+ jd->u.trove.vtag, user_ptr_internal,
+ global_trove_context,
+ &(jd->u.trove.id));
+#else
+ gossip_err("Error: Trove support not enabled.\n");
+ ret = -ENOSYS;
+#endif
+
+ if (ret < 0)
+ {
+ /* error posting trove operation */
+ JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_WRITE_LIST, 0, jd->job_id);
+ dealloc_job_desc(jd);
+ jd = NULL;
+ out_status_p->error_code = ret;
+ out_status_p->status_user_tag = status_user_tag;
+ return (1);
+ }
+
+ if (ret == 1)
+ {
+ /* immediate completion */
+ out_status_p->error_code = 0;
+ out_status_p->status_user_tag = status_user_tag;
+ out_status_p->vtag = jd->u.trove.vtag;
+ JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_WRITE_LIST, 0, jd->job_id);
+ dealloc_job_desc(jd);
+ jd = NULL;
+ return (ret);
+ }
+
+ /* if we fall through to this point, the job did not
+ * immediately complete and we must queue up to test later
+ */
+ *id = jd->job_id;
+ trove_pending_count++;
+ jd->event_type = PVFS_EVENT_TROVE_KEYVAL_WRITE_LIST;
+
+ return (0);
+}
+
+
/* job_trove_keyval_flush()
*
* ask the storage layer to flush keyvals to disk
@@ -3147,6 +3276,188 @@ int job_trove_dspace_create(PVFS_fs_id c
return (0);
}
+/* job_trove_dspace_create_list()
+ *
+ * create a new data space object
+ *
+ * returns 0 on success, 1 on immediate completion, and -errno on
+ * failure
+ */
+int job_trove_dspace_create_list(PVFS_fs_id coll_id,
+ PVFS_handle_extent_array *handle_extent_array,
+ PVFS_handle* out_handle_array,
+ int count,
+ PVFS_ds_type type,
+ void *hint,
+ PVFS_ds_flags flags,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id)
+{
+ /* post a dspace create list. If it completes (or fails) immediately, then
+ * return and fill in the status structure. If it needs to be tested
+ * for completion later, then queue up a job_desc structure.
+ */
+ int ret = -1;
+ struct job_desc *jd = NULL;
+ void* user_ptr_internal;
+
+ /* create the job desc first, even though we may not use it. This
+ * gives us somewhere to store the BMI id and user ptr
+ */
+ jd = alloc_job_desc(JOB_TROVE);
+ if (!jd)
+ {
+ out_status_p->error_code = -PVFS_ENOMEM;
+ return 1;
+ }
+ jd->job_user_ptr = user_ptr;
+ jd->u.trove.handle = PVFS_HANDLE_NULL;
+ jd->context_id = context_id;
+ jd->status_user_tag = status_user_tag;
+ jd->trove_callback.fn = trove_thread_mgr_callback;
+ jd->trove_callback.data = (void*)jd;
+ user_ptr_internal = &jd->trove_callback;
+ JOB_EVENT_START(PVFS_EVENT_TROVE_DSPACE_CREATE, jd->job_id);
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+ ret = trove_dspace_create_list(coll_id,
+ handle_extent_array,
+ out_handle_array,
+ count,
+ type,
+ hint, flags,
+ user_ptr_internal,
+ global_trove_context, &(jd->u.trove.id));
+#else
+ gossip_err("Error: Trove support not enabled.\n");
+ ret = -ENOSYS;
+#endif
+
+ if (ret < 0)
+ {
+ /* error posting trove operation */
+ JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_CREATE, 0, jd->job_id);
+ dealloc_job_desc(jd);
+ jd = NULL;
+ out_status_p->error_code = ret;
+ out_status_p->status_user_tag = status_user_tag;
+ return (1);
+ }
+
+ if (ret == 1)
+ {
+ /* immediate completion */
+ out_status_p->error_code = 0;
+ out_status_p->status_user_tag = status_user_tag;
+ JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_CREATE, 0, jd->job_id);
+ dealloc_job_desc(jd);
+ jd = NULL;
+ return (ret);
+ }
+
+ /* if we fall through to this point, the job did not
+ * immediately complete and we must queue up to test later
+ */
+ *id = jd->job_id;
+ trove_pending_count++;
+ jd->event_type = PVFS_EVENT_TROVE_DSPACE_CREATE;
+
+ return (0);
+}
+
+/* job_trove_dspace_remove_list()
+ *
+ * remove a list of data space objects (byte stream and key/value)
+ *
+ * returns 0 on success, 1 on immediate completion, and -errno on
+ * failure
+ */
+int job_trove_dspace_remove_list(PVFS_fs_id coll_id,
+ PVFS_handle* handle_array,
+ PVFS_error *out_error_array,
+ int count,
+ PVFS_ds_flags flags,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id)
+{
+ /* post a dspace remove_list. If it completes (or fails) immediately, then
+ * return and fill in the status structure. If it needs to be tested
+ * for completion later, then queue up a job_desc structure.
+ */
+ int ret = -1;
+ struct job_desc *jd = NULL;
+ void* user_ptr_internal;
+
+ /* create the job desc first, even though we may not use it. This
+ * gives us somewhere to store the BMI id and user ptr
+ */
+ jd = alloc_job_desc(JOB_TROVE);
+ if (!jd)
+ {
+ out_status_p->error_code = -PVFS_ENOMEM;
+ return 1;
+ }
+ jd->job_user_ptr = user_ptr;
+ jd->context_id = context_id;
+ jd->status_user_tag = status_user_tag;
+ jd->trove_callback.fn = trove_thread_mgr_callback;
+ jd->trove_callback.data = (void*)jd;
+ user_ptr_internal = &jd->trove_callback;
+ JOB_EVENT_START(PVFS_EVENT_TROVE_DSPACE_REMOVE, jd->job_id);
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+ ret = trove_dspace_remove_list(coll_id,
+ handle_array,
+ out_error_array,
+ count,
+ flags,
+ user_ptr_internal,
+ global_trove_context, &(jd->u.trove.id));
+#else
+ gossip_err("Error: Trove support not enabled.\n");
+ ret = -ENOSYS;
+#endif
+
+ if (ret < 0)
+ {
+ /* error posting trove operation */
+ JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_REMOVE, 0, jd->job_id);
+ dealloc_job_desc(jd);
+ jd = NULL;
+ out_status_p->error_code = ret;
+ out_status_p->status_user_tag = status_user_tag;
+ return (1);
+ }
+
+ if (ret == 1)
+ {
+ /* immediate completion */
+ out_status_p->error_code = 0;
+ out_status_p->status_user_tag = status_user_tag;
+ JOB_EVENT_END(PVFS_EVENT_TROVE_DSPACE_REMOVE, 0, jd->job_id);
+ dealloc_job_desc(jd);
+ jd = NULL;
+ return (ret);
+ }
+
+ /* if we fall through to this point, the job did not
+ * immediately complete and we must queue up to test later
+ */
+ *id = jd->job_id;
+ trove_pending_count++;
+ jd->event_type = PVFS_EVENT_TROVE_DSPACE_REMOVE;
+
+ return (0);
+}
+
+
+
/* job_trove_dspace_remove()
*
* remove an entire data space object (byte stream and key/value)
@@ -4286,20 +4597,21 @@ static void teardown_queues(void)
return;
}
-/* trove_thread_mgr_callback()
+#ifdef __PVFS2_TROVE_SUPPORT__
+
+/* precreate_pool_get_thread_mgr_callback_unlocked()
*
- * callback function executed by the thread manager for Trove when a Trove
- * job completes
+ * callback function executed by the thread manager for precreate pool get
+ * when a trove operation completes
*
* no return value
*/
-static void trove_thread_mgr_callback(
+static void precreate_pool_get_thread_mgr_callback_unlocked(
void* data,
PVFS_error error_code)
{
- struct job_desc* tmp_desc = (struct job_desc*)data;
- assert(tmp_desc);
-
+ struct precreate_pool_get_trove* tmp_trove = data;
+
gen_mutex_lock(&initialized_mutex);
if(initialized == 0)
{
@@ -4309,11 +4621,354 @@ static void trove_thread_mgr_callback(
}
gen_mutex_unlock(&initialized_mutex);
- gen_mutex_lock(&completion_mutex);
- if (tmp_desc->completed_flag == 0)
+ if(error_code == 0)
{
- /* set job descriptor fields and put into completion queue */
- tmp_desc->u.trove.state = error_code;
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "Got precreated handle: %llu\n",
+ llu(*((PVFS_handle*)tmp_trove->key.buffer)));
+ }
+
+ trove_pending_count--;
+ tmp_trove->jd->u.precreate_pool.trove_pending--;
+
+ /* don't overwrite error codes from other trove ops */
+ if(tmp_trove->jd->u.precreate_pool.error_code == 0)
+ {
+ tmp_trove->jd->u.precreate_pool.error_code = error_code;
+ }
+
+ /* is this job done? */
+ if(tmp_trove->jd->u.precreate_pool.trove_pending == 0)
+ {
+ gen_mutex_lock(&completion_mutex);
+
+ /* set job descriptor fields and put into completion queue */
+ tmp_trove->jd->u.precreate_pool.error_code = 0;
+ job_desc_q_add(completion_queue_array[tmp_trove->jd->context_id],
+ tmp_trove->jd);
+ /* set completed flag while holding queue lock */
+ tmp_trove->jd->completed_flag = 1;
+
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ free(tmp_trove->jd->u.precreate_pool.data);
+ gen_mutex_unlock(&completion_mutex);
+ return;
+ }
+
+ return;
+}
+
+
+/* precreate_pool_iterate_callback()
+ *
+ * callback function executed by the thread mgr when a trove iterate
+ * completes
+ *
+ * no return value
+ */
+static void precreate_pool_iterate_callback(
+ void* data,
+ PVFS_error error_code)
+{
+ struct job_desc* tmp_desc = (struct job_desc*)data;
+
+ gen_mutex_lock(&initialized_mutex);
+ if(initialized == 0)
+ {
+ /* The job interface has been shutdown. Silently ignore callback. */
+ gen_mutex_unlock(&initialized_mutex);
+ return;
+ }
+ gen_mutex_unlock(&initialized_mutex);
+
+ gen_mutex_lock(&completion_mutex);
+ if (tmp_desc->completed_flag == 0)
+ {
+ /* set job descriptor fields and put into completion queue */
+ tmp_desc->u.precreate_pool.error_code = error_code;
+ free(tmp_desc->u.precreate_pool.key_array);
+ job_desc_q_add(completion_queue_array[tmp_desc->context_id],
+ tmp_desc);
+ /* set completed flag while holding queue lock */
+ tmp_desc->completed_flag = 1;
+
+ trove_pending_count--;
+
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ }
+ gen_mutex_unlock(&completion_mutex);
+
+ return;
+}
+
+/* precreate_pool_get_thread_mgr_callback()
+ *
+ * callback function executed by the thread manager for precreate pool get
+ * when a trove operation completes
+ *
+ * no return value
+ */
+static void precreate_pool_get_thread_mgr_callback(
+ void* data,
+ PVFS_error error_code)
+{
+ gen_mutex_lock(&precreate_pool_mutex);
+ precreate_pool_get_thread_mgr_callback_unlocked(data, error_code);
+ gen_mutex_unlock(&precreate_pool_mutex);
+}
+
+/* precreate_pool_fill_thread_mgr_callback()
+ *
+ * callback function executed by the thread manager for precreate pool fill
+ * when a trove operation completes
+ *
+ * no return value
+ */
+static void precreate_pool_fill_thread_mgr_callback(
+ void* data,
+ PVFS_error error_code)
+{
+ struct job_desc* jd = (struct job_desc*)data;
+ struct job_desc* jd_checker;
+ int ret;
+ int count = 0;
+ int i;
+ struct qlist_head* iterator;
+ struct qlist_head* scratch;
+ struct precreate_pool* pool;
+ int awoken_count = 0;
+ QLIST_HEAD(tmp_list);
+ job_id_t tmp_id;
+ int extra_trove_flags = 0;
+
+ assert(jd);
+
+ gen_mutex_lock(&initialized_mutex);
+ if(initialized == 0)
+ {
+ /* The job interface has been shutdown. Silently ignore callback. */
+ gen_mutex_unlock(&initialized_mutex);
+ return;
+ }
+ gen_mutex_unlock(&initialized_mutex);
+
+ if(error_code != 0)
+ {
+ gossip_err("Error: unable to write all precreated handles to pool.\n");
+ gossip_err("Warning: fsck may be needed to recover stranded handles.\n");
+ free(jd->u.precreate_pool.key_array);
+ gen_mutex_lock(&completion_mutex);
+
+ /* set job descriptor fields and put into completion queue */
+ jd->u.precreate_pool.error_code = error_code;
+ job_desc_q_add(completion_queue_array[jd->context_id], jd);
+ /* set completed flag while holding queue lock */
+ jd->completed_flag = 1;
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ gen_mutex_unlock(&completion_mutex);
+ return;
+ }
+
+ if(jd->u.precreate_pool.first_callback_flag == 1)
+ {
+ /* this is the first post */
+ gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_fill_thread_mgr_callback() first post.\n");
+ jd->u.precreate_pool.first_callback_flag = 0;
+ }
+ else
+ {
+ gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_fill_thread_mgr_callback() completed trove op.\n");
+ /* a trove operation completed successfully */
+ jd->u.precreate_pool.precreate_handle_index +=
+ jd->u.precreate_pool.posted_count;
+ trove_pending_count--;
+
+ /* increment in-memory count for this pool */
+ gen_mutex_lock(&precreate_pool_mutex);
+ qlist_for_each(iterator, &precreate_pool_list)
+ {
+ pool = qlist_entry(iterator, struct precreate_pool,
+ list_link);
+ if(pool->pool_handle == jd->u.precreate_pool.precreate_pool &&
+ pool->fsid == jd->u.precreate_pool.fsid)
+ {
+ pool->pool_count += jd->u.precreate_pool.posted_count;
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "Pool count for handle %llu incremented to %d\n",
+ llu(pool->pool_handle),
+ pool->pool_count);
+ break;
+ }
+ }
+
+ /* find out if anyone was sleeping because a pool was empty */
+ gossip_debug(GOSSIP_JOB_DEBUG, "checking for get_handles() sleepers\n");
+ qlist_for_each_safe(iterator, scratch,
+ &precreate_pool_get_handles_list)
+ {
+ jd_checker = qlist_entry(iterator, struct job_desc,
+ job_desc_q_link);
+
+ awoken_count++;
+ /* put them on a new local queue */
+ qlist_del(&jd_checker->job_desc_q_link);
+ qlist_add(&jd_checker->job_desc_q_link, &tmp_list);
+ gossip_debug(GOSSIP_JOB_DEBUG, "Found someone waiting to get handles from precreate pool\n");
+
+ if(awoken_count == jd->u.precreate_pool.posted_count)
+ {
+ /* that's as many as we should wake up right now */
+ break;
+ }
+ }
+ gen_mutex_unlock(&precreate_pool_mutex);
+
+ /* now that we have collected the sleepers into our own private
+ * queue, we can push them without the precreate_pool_mutex held
+ */
+ gossip_debug(GOSSIP_JOB_DEBUG, "About to push on get_handles() sleepers.\n");
+ qlist_for_each_safe(iterator, scratch, &tmp_list)
+ {
+ jd_checker = qlist_entry(iterator, struct job_desc,
+ job_desc_q_link);
+ qlist_del(&jd_checker->job_desc_q_link);
+ gossip_debug(GOSSIP_JOB_DEBUG, "Pushing get_handles() sleeper for jd: %p.\n", jd_checker);
+ precreate_pool_get_handles_try_post(jd_checker);
+ }
+ }
+
+ /* are we done? */
+ if(jd->u.precreate_pool.precreate_handle_index >=
+ jd->u.precreate_pool.precreate_handle_count)
+ {
+ free(jd->u.precreate_pool.key_array);
+ gen_mutex_lock(&completion_mutex);
+
+ /* set job descriptor fields and put into completion queue */
+ jd->u.precreate_pool.error_code = 0;
+ job_desc_q_add(completion_queue_array[jd->context_id],
+ jd);
+ /* set completed flag while holding queue lock */
+ jd->completed_flag = 1;
+
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ gen_mutex_unlock(&completion_mutex);
+ return;
+ }
+
+ /* fill in information for next keyval write */
+ for(i=jd->u.precreate_pool.precreate_handle_index;
+ (i < jd->u.precreate_pool.precreate_handle_count &&
+ (i < (jd->u.precreate_pool.precreate_handle_index
+ + PRECREATE_POOL_MAX_KEYS)));
+ i++)
+ {
+ jd->u.precreate_pool.key_array[count].buffer =
+ &jd->u.precreate_pool.precreate_handle_array[i];
+ jd->u.precreate_pool.key_array[count].buffer_sz = sizeof(PVFS_handle);
+ count++;
+
+ /* always leave the values zeroed out */
+ }
+
+ jd->u.precreate_pool.posted_count = count;
+
+ if((jd->u.precreate_pool.posted_count
+ + jd->u.precreate_pool.precreate_handle_index)
+ >= jd->u.precreate_pool.precreate_handle_count)
+ {
+ /* this will be the last set written; sync db */
+ extra_trove_flags |= TROVE_SYNC;
+ }
+
+ gossip_debug(GOSSIP_JOB_DEBUG, "job_precreate_pool_fill() posting trove_keyval_write_list()\n");
+ ret = trove_keyval_write_list(jd->u.precreate_pool.fsid,
+ jd->u.precreate_pool.precreate_pool,
+ jd->u.precreate_pool.key_array,
+ NULL,
+ count,
+ (TROVE_BINARY_KEY|TROVE_NOOVERWRITE|
+ TROVE_KEYVAL_HANDLE_COUNT|extra_trove_flags),
+ NULL,
+ &jd->trove_callback,
+ global_trove_context,
+ &tmp_id);
+
+ trove_pending_count++;
+
+ if(ret < 0)
+ {
+ gossip_err("Error: unable to write all precreated handles to pool.\n");
+ gossip_err("Warning: fsck may be needed to recover stranded handles.\n");
+ gen_mutex_lock(&completion_mutex);
+
+ /* set job descriptor fields and put into completion queue */
+ jd->u.precreate_pool.error_code = ret;
+ job_desc_q_add(completion_queue_array[jd->context_id], jd);
+ /* set completed flag while holding queue lock */
+ jd->completed_flag = 1;
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ gen_mutex_unlock(&completion_mutex);
+ return;
+ }
+ else if(ret == 1)
+ {
+ gossip_debug(GOSSIP_JOB_DEBUG, "trove_keyval_write_list() immediate completion\n");
+ precreate_pool_fill_thread_mgr_callback(jd, 0);
+ }
+ else
+ {
+ gossip_debug(GOSSIP_JOB_DEBUG, "trove_keyval_write_list() returned zero\n");
+ }
+
+ return;
+}
+#endif /* __PVFS2_TROVE_SUPPORT__ */
+
+
+/* trove_thread_mgr_callback()
+ *
+ * callback function executed by the thread manager for Trove when a Trove
+ * job completes
+ *
+ * no return value
+ */
+static void trove_thread_mgr_callback(
+ void* data,
+ PVFS_error error_code)
+{
+ struct job_desc* tmp_desc = (struct job_desc*)data;
+ assert(tmp_desc);
+
+ gen_mutex_lock(&initialized_mutex);
+ if(initialized == 0)
+ {
+ /* The job interface has been shutdown. Silently ignore callback. */
+ gen_mutex_unlock(&initialized_mutex);
+ return;
+ }
+ gen_mutex_unlock(&initialized_mutex);
+
+ gen_mutex_lock(&completion_mutex);
+ if (tmp_desc->completed_flag == 0)
+ {
+ /* set job descriptor fields and put into completion queue */
+ tmp_desc->u.trove.state = error_code;
job_desc_q_add(completion_queue_array[tmp_desc->context_id],
tmp_desc);
/* set completed flag while holding queue lock */
@@ -4398,7 +5053,7 @@ static void bmi_thread_mgr_unexp_handler
gen_mutex_lock(&bmi_unexp_mutex);
/* remove the operation from the pending bmi_unexp queue */
tmp_desc = job_desc_q_shownext(bmi_unexp_queue);
- assert(tmp_desc != NULL); /* TODO: fix this */
+ assert(tmp_desc != NULL);
if (tmp_desc->completed_flag == 0)
{
job_desc_q_remove(tmp_desc);
@@ -4486,6 +5141,12 @@ static void fill_status(struct job_desc
assert(jd);
assert(status);
+#if 0
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "job fill_status() for id: %llu, type: %d\n",
+ llu(jd->job_id), jd->type);
+#endif
+
status->status_user_tag = jd->status_user_tag;
if (returned_user_ptr_p)
@@ -4512,9 +5173,6 @@ static void fill_status(struct job_desc
status->error_code = jd->u.req_sched.error_code;
break;
case JOB_TROVE:
- /* TODO: make this work out for whatever type of trove
- * operation this is...
- */
status->error_code = jd->u.trove.state;
status->actual_size = jd->u.trove.actual_size;
status->vtag = jd->u.trove.vtag;
@@ -4534,6 +5192,12 @@ static void fill_status(struct job_desc
case JOB_NULL:
status->error_code = jd->u.null_info.error_code;
break;
+ case JOB_PRECREATE_POOL:
+ status->error_code = jd->u.precreate_pool.error_code;
+ status->count = jd->u.precreate_pool.count;
+ status->position = jd->u.precreate_pool.pool_index << 32;
+ status->position |= jd->u.precreate_pool.position;
+ break;
}
if(jd->event_type)
@@ -4567,7 +5231,7 @@ static int do_one_test_cycle_req_sched(v
{
/* critical failure */
/* TODO: can I clean up anything else here? */
- gossip_lerr("Error: critical BMI failure.\n");
+ gossip_lerr("Error: critical request scheduler failure.\n");
return (ret);
}
@@ -4685,7 +5349,12 @@ static int completion_query_some(job_id_
return(1);
}
-/* TODO: fill in comment */
+/* completion_query_context()
+ *
+ * retrieves completed jobs from specified context
+ *
+ * returns 1 if anything completed, 0 otherwise
+ */
static int completion_query_context(job_id_t * out_id_array_p,
int *inout_count_p,
void **returned_user_ptr_array,
@@ -4829,6 +5498,721 @@ static void flow_callback(flow_descripto
return;
}
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+
+/* job_precreate_pool_fill_signal_error()
+ *
+ * used for the entity responsible for filling the pool to indicate when
+ * there are errors preventing it from making progress. The error_code will
+ * be propigated to get_handles() callers that are sleeping if the pool is
+ * empty
+ *
+ * returns 0 on success, 1 on immediate completion, and -PVFS_errno on
+ * failure
+ */
+int job_precreate_pool_fill_signal_error(
+ PVFS_handle precreate_pool,
+ PVFS_fs_id fsid,
+ int error_code,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id)
+{
+ struct job_desc* jd_checker;
+ struct qlist_head* iterator;
+ struct qlist_head* scratch;
+
+ gossip_debug(GOSSIP_FLOW_DEBUG, "job_precreate_pool_fill_signal_error() called.\n");
+ /* note: this function always processes immediately (returns 1) */
+
+ gen_mutex_lock(&precreate_pool_mutex);
+ /* see if anyone is waiting on pool handles */
+ qlist_for_each_safe(iterator, scratch, &precreate_pool_get_handles_list)
+ {
+ jd_checker = qlist_entry(iterator, struct job_desc,
+ job_desc_q_link);
+
+ qlist_del(&jd_checker->job_desc_q_link);
+
+ gossip_debug(GOSSIP_FLOW_DEBUG, "job_precreate_pool_fill_signal_error() waking up a get_handles() caller.\n");
+ gen_mutex_lock(&completion_mutex);
+
+ /* set job descriptor fields and put into completion queue */
+ jd_checker->u.precreate_pool.error_code = error_code;
+ job_desc_q_add(completion_queue_array[jd_checker->context_id],
+ jd_checker);
+ /* set completed flag while holding queue lock */
+ jd_checker->completed_flag = 1;
+
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ gen_mutex_unlock(&completion_mutex);
+ }
+ gen_mutex_unlock(&precreate_pool_mutex);
+
+ out_status_p->error_code = 0;
+ return(1);
+}
+
+/* job_precreate_pool_fill()
+ *
+ * fills in handles for a precreate pool
+ *
+ * returns 0 on success, 1 on immediate completion, and -PVFS_errno on
+ * failure
+ */
+int job_precreate_pool_fill(
+ PVFS_handle precreate_pool,
+ PVFS_fs_id fsid,
+ PVFS_handle* precreate_handle_array,
+ int precreate_handle_count,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id)
+{
+ struct job_desc *jd = NULL;
+
+ gossip_debug(GOSSIP_JOB_DEBUG, "job_precreate_pool_fill() called.\n");
+
+ /* create the job desc first, even though we may not use it. This
+ * gives us somewhere to store information
+ */
+ jd = alloc_job_desc(JOB_PRECREATE_POOL);
+ if (!jd)
+ {
+ return (-errno);
+ }
+ jd->job_user_ptr = user_ptr;
+ jd->context_id = context_id;
+ jd->status_user_tag = status_user_tag;
+ jd->trove_callback.fn = precreate_pool_fill_thread_mgr_callback;
+ jd->trove_callback.data = (void*)jd;
+ jd->u.precreate_pool.precreate_pool = precreate_pool;
+ jd->u.precreate_pool.precreate_handle_array = precreate_handle_array;
+ jd->u.precreate_pool.precreate_handle_count = precreate_handle_count;
+ jd->u.precreate_pool.precreate_handle_index = 0;
+ jd->u.precreate_pool.first_callback_flag = 1;
+ jd->u.precreate_pool.fsid = fsid;
+ jd->u.precreate_pool.key_array =
+ malloc(PRECREATE_POOL_MAX_KEYS*sizeof(TROVE_keyval_s));
+ if(!jd->u.precreate_pool.key_array)
+ {
+ dealloc_job_desc(jd);
+ out_status_p->error_code = -PVFS_ENOMEM;
+ return(1);
+ }
+
+ /* reuse the logic for trove op completion to get this started */
+ precreate_pool_fill_thread_mgr_callback(jd, 0);
+
+ /* for the moment, this type of job cannot immediately complete */
+
+ *id = jd->job_id;
+ return (0);
+}
+
+/* job_precreate_pool_lookup_server()
+ *
+ * resolves a string hostname into a pool handle
+ */
+int job_precreate_pool_lookup_server(
+ const char* host,
+ PVFS_fs_id fsid,
+ PVFS_handle* pool_handle)
+{
+ struct precreate_pool* pool;
+ struct qlist_head* iterator;
+
+ gen_mutex_lock(&precreate_pool_mutex);
+
+ /* check pool list, go back to sleep if any are empty */
+ qlist_for_each(iterator, &precreate_pool_list)
+ {
+ pool = qlist_entry(iterator, struct precreate_pool,
+ list_link);
+ if(!strcmp(pool->host, host) && fsid == pool->fsid)
+ {
+ *pool_handle = pool->pool_handle;
+ gen_mutex_unlock(&precreate_pool_mutex);
+ return(0);
+ }
+ }
+ gen_mutex_unlock(&precreate_pool_mutex);
+
+ return(-PVFS_ENOENT);
+}
+
+int job_precreate_pool_register_server(
+ const char* host,
+ PVFS_fs_id fsid,
+ PVFS_handle pool_handle,
+ int count)
+{
+ struct precreate_pool* tmp_pool;
+
+ /* create a little struct to track the pool information for this peer
+ * server
+ */
+ tmp_pool = malloc(sizeof(*tmp_pool));
+ if(!tmp_pool)
+ {
+ return(-ENOMEM);
+ }
+
+ tmp_pool->host = strdup(host);
+ if(!tmp_pool->host)
+ {
+ free(tmp_pool);
+ return(-ENOMEM);
+ }
+
+ tmp_pool->fsid = fsid;
+ tmp_pool->pool_handle = pool_handle;
+ tmp_pool->pool_count = count;
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "Pool count for handle %llu initially set to %d\n",
+ llu(tmp_pool->pool_handle),
+ tmp_pool->pool_count);
+
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "Initial pool count for host %s, fsid %d: %d\n", host, (int)fsid,
+ count);
+
+ /* stash the info where we can search and find it later */
+ qlist_add(&tmp_pool->list_link, &precreate_pool_list);
+
+#if 0
+ /* here are the steps to tear down these data structures if needed */
+ struct qlist_head* iterator;
+ struct qlist_head* scratch;
+ struct precreate_pool* pool;
+
+ qlist_for_each_safe(iterator, scratch, &precreate_pool_list)
+ {
+ pool = qlist_entry(iterator, struct precreate_pool,
+ list_link);
+ free(pool->host);
+ free(pool);
+ }
+#endif
+
+ return(1);
+}
+
+/* job_precreate_pool_check_level()
+ *
+ * checks to see if the current pool level is below a specified threshold
+ *
+ * returns 1 on immediate completion, 0 if level is not low enough yet
+ */
+int job_precreate_pool_check_level(
+ PVFS_handle precreate_pool,
+ PVFS_fs_id fsid,
+ int low_threshold,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id)
+{
+ struct qlist_head* iterator;
+ struct precreate_pool* pool;
+ struct job_desc *jd = NULL;
+
+ gen_mutex_lock(&precreate_pool_mutex);
+ qlist_for_each(iterator, &precreate_pool_list)
+ {
+ pool = qlist_entry(iterator, struct precreate_pool,
+ list_link);
+ if(pool->pool_handle == precreate_pool &&
+ pool->fsid == fsid)
+ {
+ if(pool->pool_count < low_threshold)
+ {
+ /* handle count is below the low threshold */
+ out_status_p->error_code = 0;
+ gen_mutex_unlock(&precreate_pool_mutex);
+ gossip_debug(GOSSIP_JOB_DEBUG, "found pool count low.\n");
+ return(1);
+ }
+ else
+ {
+ /* we are above threshold right now; queue up until it drops */
+ jd = alloc_job_desc(JOB_PRECREATE_POOL);
+ if (!jd)
+ {
+ out_status_p->error_code = -PVFS_ENOMEM;
+ gen_mutex_unlock(&precreate_pool_mutex);
+ return(1);
+ }
+ jd->job_user_ptr = user_ptr;
+ jd->context_id = context_id;
+ jd->status_user_tag = status_user_tag;
+ jd->u.precreate_pool.precreate_pool = precreate_pool;
+ jd->u.precreate_pool.fsid = fsid;
+ jd->u.precreate_pool.low_threshold = low_threshold;
+ *id = jd->job_id;
+
+ qlist_add(&jd->job_desc_q_link, &precreate_pool_check_level_list);
+ gen_mutex_unlock(&precreate_pool_mutex);
+ gossip_debug(GOSSIP_JOB_DEBUG, "found pool count high.\n");
+ return(0);
+ }
+ break;
+ }
+ }
+ gen_mutex_unlock(&precreate_pool_mutex);
+
+ return(-PVFS_EINVAL);
+}
+
+/* job_precreate_pool_get_handles()
+ *
+ * Retrieves a set of datafile handles from one or more precreate pools.
+ * Servers may be specified using bmi addresses in the servers array. If
+ * servers is NULL, then it will provide handles from pools in round robin
+ * manner.
+ *
+ * returns 0 on success, 1 on immediate completion, and -PVFS_errno on failure
+ */
+int job_precreate_pool_get_handles(
+ PVFS_fs_id fsid,
+ int count,
+ const char** servers,
+ PVFS_handle* handle_array,
+ PVFS_ds_flags flags,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id)
+{
+ struct job_desc *jd = NULL;
+
+ if(count < 0)
+ {
+ out_status_p->error_code = -PVFS_EINVAL;
+ return(1);
+ }
+
+ jd = alloc_job_desc(JOB_PRECREATE_POOL);
+ if (!jd)
+ {
+ out_status_p->error_code = -PVFS_ENOMEM;
+ return(1);
+ }
+ jd->job_user_ptr = user_ptr;
+ jd->context_id = context_id;
+ jd->status_user_tag = status_user_tag;
+ jd->u.precreate_pool.precreate_handle_array = handle_array;
+ jd->u.precreate_pool.precreate_handle_count = count;
+ jd->u.precreate_pool.precreate_handle_index = 0;
+ jd->u.precreate_pool.fsid = fsid;
+ jd->u.precreate_pool.servers = servers;
+ jd->u.precreate_pool.trove_pending = 0;
+ jd->u.precreate_pool.flags = flags;
+
+ precreate_pool_get_handles_try_post(jd);
+
+ /* for the moment, this type of job cannot immediately complete */
+ *id = jd->job_id;
+ return(0);
+}
+
+/* precreate_pool_get_handles_try_post()
+ *
+ * Internal function used by job_precreate_pool_get_handles(). This
+ * function will check to see if all pools are ready (at least one handle
+ * available) and then post all required trove operations
+ *
+ * no return value
+ */
+static void precreate_pool_get_handles_try_post(struct job_desc* jd)
+{
+ struct precreate_pool* pool;
+ TROVE_op_id tmp_id;
+ int ret;
+ struct precreate_pool_get_trove* tmp_trove_array;
+ struct qlist_head* iterator;
+ struct qlist_head* scratch;
+ struct job_desc* jd_checker;
+ int i;
+
+ gossip_debug(GOSSIP_JOB_DEBUG, "precreate_pool_get_handles_try_post\n");
+
+ gen_mutex_lock(&precreate_pool_mutex);
+
+ /* check pool list, go back to sleep if any are empty */
+ qlist_for_each(iterator, &precreate_pool_list)
+ {
+ pool = qlist_entry(iterator, struct precreate_pool,
+ list_link);
+ if(pool->pool_count < 1)
+ {
+ /* queue up until the count for this pool increases */
+ qlist_add(&jd->job_desc_q_link, &precreate_pool_get_handles_list);
+ gossip_debug(GOSSIP_JOB_DEBUG, "Found empty precreate pool %llu\n", llu(pool->pool_handle));
+
+ gen_mutex_unlock(&precreate_pool_mutex);
+ return;
+ }
+ }
+
+ /* if we get to this point, set up necessary information for all trove
+ * operations needed to service job
+ */
+ tmp_trove_array = malloc(jd->u.precreate_pool.precreate_handle_count *
+ sizeof(struct precreate_pool_get_trove));
+ if(!tmp_trove_array)
+ {
+ gen_mutex_unlock(&precreate_pool_mutex);
+ gen_mutex_lock(&completion_mutex);
+ jd->u.precreate_pool.error_code = -PVFS_ENOMEM;
+ job_desc_q_add(completion_queue_array[jd->context_id], jd);
+ jd->completed_flag = 1;
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ gen_mutex_unlock(&completion_mutex);
+ return;
+
+ }
+ jd->u.precreate_pool.data = tmp_trove_array;
+
+ /* translate reqested servers and set up necessary fields to post
+ * trove operations
+ */
+ for(i=0; i<jd->u.precreate_pool.precreate_handle_count; i++)
+ {
+ if(jd->u.precreate_pool.servers)
+ {
+ /* caller wanted specific servers ; search through list and
+ * set current pool to appropriate entry for this server
+ */
+ jd->u.precreate_pool.current_pool = NULL; /* sentinal */
+ qlist_for_each(iterator, &precreate_pool_list)
+ {
+ pool = qlist_entry(iterator, struct precreate_pool,
+ list_link);
+ if(!strcmp(pool->host, jd->u.precreate_pool.servers[i]))
+ {
+ jd->u.precreate_pool.current_pool = iterator;
+ break;
+ }
+ }
+ if(!jd->u.precreate_pool.current_pool)
+ {
+ gossip_err("Error: get_handles(): unknown server: %s\n",
+ jd->u.precreate_pool.servers[i]);
+
+ free(tmp_trove_array);
+ gen_mutex_unlock(&precreate_pool_mutex);
+
+ gen_mutex_lock(&completion_mutex);
+ jd->u.precreate_pool.error_code = -PVFS_EINVAL;
+ job_desc_q_add(completion_queue_array[jd->context_id], jd);
+ jd->completed_flag = 1;
+ #ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+ #endif
+ gen_mutex_unlock(&completion_mutex);
+ return;
+ }
+ }
+ else
+ {
+ /* caller wants whatever we hand out */
+ if(jd->u.precreate_pool.current_pool == NULL ||
+ jd->u.precreate_pool.current_pool->next == &precreate_pool_list)
+ {
+ /* either we are just starting, or we have wrapped around */
+ jd->u.precreate_pool.current_pool = precreate_pool_list.next;
+ }
+ else
+ {
+ /* normal case; cycle to next pool */
+ jd->u.precreate_pool.current_pool =
+ jd->u.precreate_pool.current_pool->next;
+ }
+ }
+
+ tmp_trove_array[i].pool = qlist_entry(jd->u.precreate_pool.current_pool,
+ struct precreate_pool, list_link);
+
+ tmp_trove_array[i].jd = jd;
+ tmp_trove_array[i].pos = PVFS_ITERATE_START;
+ tmp_trove_array[i].count = 1;
+ tmp_trove_array[i].key.buffer
+ = &jd->u.precreate_pool.precreate_handle_array[i];
+ tmp_trove_array[i].key.buffer_sz = sizeof(PVFS_handle);
+ tmp_trove_array[i].trove_callback.fn
+ = precreate_pool_get_thread_mgr_callback;
+ tmp_trove_array[i].trove_callback.data
+ = &tmp_trove_array[i];
+ }
+
+ /* post all trove operations at once */
+ for(i=0; i<jd->u.precreate_pool.precreate_handle_count; i++)
+ {
+ /* go ahead and decrement count to avoid races with other consumers */
+ tmp_trove_array[i].pool->pool_count--;
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "Pool count for handle %llu decremented to %d\n",
+ llu(tmp_trove_array[i].pool->pool_handle),
+ tmp_trove_array[i].pool->pool_count);
+
+ /* is anyone waiting to check the count of this pool? */
+ if(!qlist_empty(&precreate_pool_check_level_list))
+ {
+ qlist_for_each_safe(iterator, scratch,
+ &precreate_pool_check_level_list)
+ {
+ jd_checker = qlist_entry(iterator, struct job_desc,
+ job_desc_q_link);
+ if(jd_checker->u.precreate_pool.precreate_pool ==
+ tmp_trove_array[i].pool->pool_handle &&
+ tmp_trove_array[i].pool->pool_count <
+ jd_checker->u.precreate_pool.low_threshold)
+ {
+ /* the pool level is low */
+ gossip_debug(GOSSIP_JOB_DEBUG, "Pool count low, waking up waiter for handle %llu.\n", llu(jd_checker->u.precreate_pool.precreate_pool));
+ qlist_del(&jd_checker->job_desc_q_link);
+
+ /* move waiting job to completion queue */
+ gen_mutex_lock(&completion_mutex);
+ job_desc_q_add(completion_queue_array[jd->context_id], jd_checker);
+ jd->completed_flag = 1;
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ gen_mutex_unlock(&completion_mutex);
+ }
+ }
+ }
+
+ /* post trove operation to pull out a handle */
+ ret = trove_keyval_iterate_keys(
+ tmp_trove_array[i].pool->fsid,
+ tmp_trove_array[i].pool->pool_handle,
+ &tmp_trove_array[i].pos,
+ &tmp_trove_array[i].key,
+ &tmp_trove_array[i].count,
+ tmp_trove_array[i].jd->u.precreate_pool.flags|
+ TROVE_BINARY_KEY|
+ TROVE_KEYVAL_HANDLE_COUNT|
+ TROVE_KEYVAL_ITERATE_REMOVE,
+ NULL,
+ &tmp_trove_array[i].trove_callback,
+ global_trove_context,
+ &tmp_id);
+ if(ret < 0)
+ {
+ precreate_pool_get_thread_mgr_callback_unlocked(
+ &tmp_trove_array[i], ret);
+ }
+ else if(ret == 1)
+ {
+ precreate_pool_get_thread_mgr_callback_unlocked(
+ &tmp_trove_array[i], 0);
+ }
+ else
+ {
+ /* callback will be triggered later */
+ trove_pending_count++;
+ jd->u.precreate_pool.trove_pending++;
+ }
+ }
+ gen_mutex_unlock(&precreate_pool_mutex);
+}
+
+/* job_precreate_pool_iterate_handles()
+ *
+ * similar to the trove iterate handles function, but returns all handles
+ * stored in the precreate pools, including the handles for the pool objects
+ * themselves.
+ */
+int job_precreate_pool_iterate_handles(
+ PVFS_fs_id fsid,
+ PVFS_ds_position position,
+ PVFS_handle* handle_array,
+ int count,
+ PVFS_ds_flags flags,
+ PVFS_vtag* vtag,
+ void* user_ptr,
+ job_aint status_user_tag,
+ job_status_s* out_status_p,
+ job_id_t* id,
+ job_context_id context_id)
+{
+ PVFS_ds_position local_position;
+ PVFS_ds_position pool_index;
+ struct qlist_head* iterator;
+ PVFS_ds_position tmp_index = 1;
+ struct precreate_pool* pool = NULL;
+ int ret;
+ struct job_desc *jd = NULL;
+ void* user_ptr_internal;
+ TROVE_op_id tmp_id;
+ int i;
+
+ /* low order bits are the trove iterate position */
+ local_position = position & 0xffffffff;
+ /* high order bits tell us which pool we are on */
+ pool_index = position >> 32;
+
+ /* we start indexing at one and reserve 0 for the special start and end
+ * values for the entire set of pools
+ */
+ if(pool_index == 0)
+ {
+ if(local_position == PVFS_ITERATE_START)
+ {
+ pool_index = 1;
+ }
+ else
+ {
+ gossip_err("Error: invalid position given to job_precreate_pool_iterate_handles().\n");
+ out_status_p->error_code = -PVFS_EINVAL;
+ return(1);
+ }
+ }
+
+ gen_mutex_lock(&precreate_pool_mutex);
+
+ qlist_for_each(iterator, &precreate_pool_list)
+ {
+ if(tmp_index == pool_index)
+ {
+ pool = qlist_entry(iterator, struct precreate_pool,
+ list_link);
+ break;
+ }
+ tmp_index++;
+ }
+
+ if(!pool)
+ {
+ /* we ran out of pools; iteration is done */
+ gen_mutex_unlock(&precreate_pool_mutex);
+ out_status_p->error_code = 0;
+ out_status_p->count = 0;
+ out_status_p->position = PVFS_ITERATE_END;
+ return(1);
+ }
+
+ if(local_position == PVFS_ITERATE_END)
+ {
+ /* we got all of the handles out of the pool */
+ /* pass back pool handle by itself and go to next pool */
+ handle_array[0] = pool->pool_handle;
+ /* skip to next pool */
+ pool_index++;
+ out_status_p->position = pool_index << 32;
+ out_status_p->position |= PVFS_ITERATE_START;
+ out_status_p->count = 1;
+ out_status_p->error_code = 0;
+ gen_mutex_unlock(&precreate_pool_mutex);
+ return(1);
+ }
+
+ /* get ready to post a job to trove to find handles */
+ jd = alloc_job_desc(JOB_PRECREATE_POOL);
+ if (!jd)
+ {
+ gen_mutex_unlock(&precreate_pool_mutex);
+ out_status_p->error_code = -PVFS_ENOMEM;
+ return 1;
+ }
+ jd->u.precreate_pool.key_array = malloc(count * sizeof(*jd->u.precreate_pool.key_array));
+ if(!jd->u.precreate_pool.key_array)
+ {
+ gen_mutex_unlock(&precreate_pool_mutex);
+ dealloc_job_desc(jd);
+ out_status_p->error_code = -PVFS_ENOMEM;
+ return 1;
+ }
+ for(i=0; i<count; i++)
+ {
+ jd->u.precreate_pool.key_array[i].buffer = &handle_array[i];
+ jd->u.precreate_pool.key_array[i].buffer_sz = sizeof(handle_array[i]);
+ }
+ jd->job_user_ptr = user_ptr;
+ jd->u.precreate_pool.position = local_position;
+ jd->u.precreate_pool.count = count;
+ jd->u.precreate_pool.precreate_handle_array = handle_array;
+ jd->u.precreate_pool.pool_index = pool_index;
+ jd->context_id = context_id;
+ jd->status_user_tag = status_user_tag;
+ jd->trove_callback.fn = precreate_pool_iterate_callback;
+ jd->trove_callback.data = (void*)jd;
+ user_ptr_internal = &jd->trove_callback;
+ JOB_EVENT_START(PVFS_EVENT_TROVE_KEYVAL_ITERATE_KEYS, jd->job_id);
+
+#ifdef __PVFS2_TROVE_SUPPORT__
+ ret = trove_keyval_iterate_keys(fsid, pool->pool_handle,
+ &(jd->u.precreate_pool.position),
+ jd->u.precreate_pool.key_array,
+ &(jd->u.precreate_pool.count), flags, NULL,
+ user_ptr_internal,
+ global_trove_context, &tmp_id);
+#else
+ gossip_err("Error: Trove support not enabled.\n");
+ ret = -ENOSYS;
+#endif
+
+ if (ret < 0)
+ {
+ /* error posting trove operation */
+ JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_ITERATE_KEYS, 0, jd->job_id);
+ free(jd->u.precreate_pool.key_array);
+ dealloc_job_desc(jd);
+ jd = NULL;
+ out_status_p->error_code = ret;
+ out_status_p->status_user_tag = status_user_tag;
+ gen_mutex_unlock(&precreate_pool_mutex);
+ return (1);
+ }
+
+ if (ret == 1)
+ {
+ /* immediate completion */
+ out_status_p->error_code = 0;
+ out_status_p->status_user_tag = status_user_tag;
+ out_status_p->position = pool_index << 32;
+ out_status_p->position |= jd->u.precreate_pool.position;
+ out_status_p->count = jd->u.precreate_pool.count;
+ JOB_EVENT_END(PVFS_EVENT_TROVE_KEYVAL_ITERATE_KEYS, 0, jd->job_id);
+ free(jd->u.precreate_pool.key_array);
+ dealloc_job_desc(jd);
+ jd = NULL;
+ gen_mutex_unlock(&precreate_pool_mutex);
+ return (ret);
+ }
+
+ /* if we fall through to this point, the job did not
+ * immediately complete and we must queue up to test later
+ */
+ *id = jd->job_id;
+ trove_pending_count++;
+ jd->event_type = PVFS_EVENT_TROVE_KEYVAL_ITERATE_KEYS;
+ gen_mutex_unlock(&precreate_pool_mutex);
+
+ return (0);
+}
+
+
+
+#endif /* __PVFS2_TROVE_SUPPORT__ */
/*
* Local variables:
Index: job.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job.h,v
diff -p -u -r1.56 -r1.57
--- job.h 4 Apr 2008 17:57:37 -0000 1.56
+++ job.h 8 Sep 2008 15:42:43 -0000 1.57
@@ -434,6 +434,21 @@ int job_trove_keyval_remove(PVFS_fs_id c
job_id_t * id,
job_context_id context_id);
+/* remove a list of key/value entries */
+int job_trove_keyval_remove_list(PVFS_fs_id coll_id,
+ PVFS_handle handle,
+ PVFS_ds_keyval * key_a,
+ PVFS_ds_keyval * val_a,
+ int * error_a,
+ int count,
+ PVFS_ds_flags flags,
+ PVFS_vtag * vtag,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id);
+
/* check consistency of a key/value pair for a given vtag */
int job_trove_keyval_validate(PVFS_fs_id coll_id,
PVFS_handle handle,
@@ -498,6 +513,20 @@ int job_trove_dspace_create(PVFS_fs_id c
job_id_t * id,
job_context_id context_id);
+/* create a set of new data space objects */
+int job_trove_dspace_create_list(PVFS_fs_id coll_id,
+ PVFS_handle_extent_array *handle_extent_array,
+ PVFS_handle* out_handle_arry,
+ int count,
+ PVFS_ds_type type,
+ void *hint,
+ PVFS_ds_flags flags,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id);
+
/* remove an entire data space object (byte stream and key/value) */
int job_trove_dspace_remove(PVFS_fs_id coll_id,
PVFS_handle handle,
@@ -508,6 +537,18 @@ int job_trove_dspace_remove(PVFS_fs_id c
job_id_t * id,
job_context_id context_id);
+/* remove a list of data space objects (byte stream and key/value) */
+int job_trove_dspace_remove_list(PVFS_fs_id coll_id,
+ PVFS_handle* handle_array,
+ PVFS_error *out_error_array,
+ int count,
+ PVFS_ds_flags flags,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id);
+
/* verify that a given dataspace exists and discover its type */
int job_trove_dspace_verify(PVFS_fs_id coll_id,
PVFS_handle handle,
@@ -577,6 +618,73 @@ int job_null(
job_id_t * id,
job_context_id context_id);
+int job_precreate_pool_fill(
+ PVFS_handle precreate_pool,
+ PVFS_fs_id fsid,
+ PVFS_handle* precreate_handle_array,
+ int precreate_handle_count,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id);
+
+int job_precreate_pool_fill_signal_error(
+ PVFS_handle precreate_pool,
+ PVFS_fs_id fsid,
+ int error_code,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id);
+
+int job_precreate_pool_check_level(
+ PVFS_handle precreate_pool,
+ PVFS_fs_id fsid,
+ int low_threshold,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id);
+
+int job_precreate_pool_iterate_handles(
+ PVFS_fs_id fsid,
+ PVFS_ds_position position,
+ PVFS_handle* handle_array,
+ int count,
+ PVFS_ds_flags flags,
+ PVFS_vtag* vtag,
+ void* user_ptr,
+ job_aint status_user_tag,
+ job_status_s* out_status_p,
+ job_id_t* id,
+ job_context_id context_id);
+
+int job_precreate_pool_get_handles(
+ PVFS_fs_id fsid,
+ int count,
+ const char** servers,
+ PVFS_handle* handle_array,
+ PVFS_ds_flags flags,
+ void *user_ptr,
+ job_aint status_user_tag,
+ job_status_s * out_status_p,
+ job_id_t * id,
+ job_context_id context_id);
+
+int job_precreate_pool_register_server(
+ const char* host,
+ PVFS_fs_id fsid,
+ PVFS_handle pool_handle,
+ int count);
+
+int job_precreate_pool_lookup_server(
+ const char* host,
+ PVFS_fs_id fsid,
+ PVFS_handle* pool_handle);
+
/******************************************************************
* job test/wait for completion functions
*/
More information about the Pvfs2-cvs
mailing list