[Pvfs2-cvs] commit by pcarns in pvfs2-1/src/io/job:
job-desc-queue.h job.c
CVS commit program
cvs at parl.clemson.edu
Thu Feb 7 11:56:17 EST 2008
Update of /projects/cvsroot/pvfs2-1/src/io/job
In directory parlweb1:/tmp/cvs-serv32105/src/io/job
Modified Files:
Tag: small-file-branch
job-desc-queue.h job.c
Log Message:
preliminary ability to hand out precreated handles. Some functionality
still missing:
- no signaling in place yet to wake up refiller
- no ability to place handles yet by server id
- some servers are chewing a lot of CPU on first launch (if more than 2
servers)
Index: job-desc-queue.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job-desc-queue.h,v
diff -p -u -r1.26.66.2 -r1.26.66.3
--- job-desc-queue.h 5 Feb 2008 21:18:27 -0000 1.26.66.2
+++ job-desc-queue.h 7 Feb 2008 16:56:17 -0000 1.26.66.3
@@ -56,9 +56,12 @@ struct precreate_pool_desc
int precreate_handle_count;
int precreate_handle_index;
int posted_count;
+ const char** servers;
+ struct qlist_head* current_pool;
+ int trove_pending;
+ /* TODO: does this make the job descriptor too big? */
TROVE_keyval_s key_array[PRECREATE_POOL_MAX_KEYS];
- TROVE_keyval_s val_array[PRECREATE_POOL_MAX_KEYS];
TROVE_op_id id;
Index: job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/job/job.c,v
diff -p -u -r1.177.2.6 -r1.177.2.7
--- job.c 6 Feb 2008 17:16:39 -0000 1.177.2.6
+++ job.c 7 Feb 2008 16:56:17 -0000 1.177.2.7
@@ -25,6 +25,7 @@
#include "id-generator.h"
#include "pint-event.h"
#include "job-time-mgr.h"
+#include "pvfs2-internal.h"
#define JOB_EVENT_START(__op, __id) \
PINT_event_timestamp(PVFS_EVENT_API_JOB, __op, 0, __id, \
@@ -80,6 +81,15 @@ struct precreate_pool
PVFS_handle pool_handle;
uint32_t pool_count;
};
+struct precreate_pool_get_trove
+{
+ struct job_desc* jd; /* parent job descriptor */
+ /* variables needed per keyval_iterate_keys() call */
+ PVFS_ds_position pos;
+ PVFS_ds_keyval key;
+ int count;
+ struct PINT_thread_mgr_trove_callback trove_callback;
+};
#endif /* __PVFS2_TROVE_SUPPORT__ */
/********************************************************
@@ -115,6 +125,15 @@ static void flow_callback(flow_descripto
static gen_mutex_t work_cycle_mutex = GEN_MUTEX_INITIALIZER;
static void do_one_work_cycle_all(int idle_time_ms);
#endif
+#ifdef __PVFS2_TROVE_SUPPORT__
+static void precreate_pool_get_thread_mgr_callback(
+ void* data,
+ PVFS_error error_code);
+static void precreate_pool_fill_thread_mgr_callback(
+ void* data,
+ PVFS_error error_code);
+static void precreate_pool_get_post_next(struct job_desc* jd);
+#endif
/********************************************************
* public interface
@@ -4291,14 +4310,72 @@ static void teardown_queues(void)
#ifdef __PVFS2_TROVE_SUPPORT__
-/* precreate_pool_thread_mgr_callback()
+/* precreate_pool_get_thread_mgr_callback()
*
- * callback function executed by the thread manager for precreate pool
+ * callback function executed by the thread manager for precreate pool get
* when a trove operation completes
*
* no return value
*/
-static void precreate_pool_thread_mgr_callback(
+static void precreate_pool_get_thread_mgr_callback(
+ void* data,
+ PVFS_error error_code)
+{
+ struct precreate_pool_get_trove* tmp_trove = data;
+
+ gen_mutex_lock(&initialized_mutex);
+ if(initialized == 0)
+ {
+ /* The job interface has been shutdown. Silently ignore callback. */
+ gen_mutex_unlock(&initialized_mutex);
+ return;
+ }
+ gen_mutex_unlock(&initialized_mutex);
+
+ /* TODO: handle this */
+ assert(error_code == 0);
+
+ trove_pending_count--;
+
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "Got precreated handle: %llu\n",
+ llu(*((PVFS_handle*)tmp_trove->key.buffer)));
+
+ /* TODO: lock this trove_pending variable? */
+ tmp_trove->jd->u.precreate_pool.trove_pending--;
+ if(tmp_trove->jd->u.precreate_pool.trove_pending == 0)
+ {
+ /* get_handles is complete */
+ gen_mutex_lock(&completion_mutex);
+
+ /* set job descriptor fields and put into completion queue */
+ tmp_trove->jd->u.precreate_pool.error_code = 0;
+ job_desc_q_add(completion_queue_array[tmp_trove->jd->context_id],
+ tmp_trove->jd);
+ /* set completed flag while holding queue lock */
+ tmp_trove->jd->completed_flag = 1;
+
+ free(tmp_trove);
+#ifdef __PVFS2_JOB_THREADED__
+ /* wake up anyone waiting for completion */
+ pthread_cond_signal(&completion_cond);
+#endif
+ gen_mutex_unlock(&completion_mutex);
+ return;
+ }
+
+ free(tmp_trove);
+ return;
+}
+
+/* precreate_pool_fill_thread_mgr_callback()
+ *
+ * callback function executed by the thread manager for precreate pool fill
+ * when a trove operation completes
+ *
+ * no return value
+ */
+static void precreate_pool_fill_thread_mgr_callback(
void* data,
PVFS_error error_code)
{
@@ -4334,6 +4411,9 @@ static void precreate_pool_thread_mgr_ca
trove_pending_count--;
/* increment in-memory count for this pool */
+ /* TODO: could track this now in current_pool variable to avoid
+ * searching again
+ */
gen_mutex_lock(&precreate_pool_mutex);
qlist_for_each(iterator, &precreate_pool_list)
{
@@ -4390,7 +4470,7 @@ static void precreate_pool_thread_mgr_ca
ret = trove_keyval_write_list(jd->u.precreate_pool.fsid,
jd->u.precreate_pool.precreate_pool,
jd->u.precreate_pool.key_array,
- jd->u.precreate_pool.val_array,
+ NULL,
count,
TROVE_BINARY_KEY|TROVE_SYNC|
TROVE_NOOVERWRITE|TROVE_KEYVAL_HANDLE_COUNT,
@@ -4410,7 +4490,7 @@ static void precreate_pool_thread_mgr_ca
* completion
*/
jd->u.precreate_pool.id = 1;
- precreate_pool_thread_mgr_callback(jd, 0);
+ precreate_pool_fill_thread_mgr_callback(jd, 0);
}
return;
@@ -4615,6 +4695,10 @@ static void fill_status(struct job_desc
assert(jd);
assert(status);
+ gossip_debug(GOSSIP_JOB_DEBUG,
+ "job fill_status() for id: %llu, type: %d\n",
+ llu(jd->job_id), jd->type);
+
status->status_user_tag = jd->status_user_tag;
if (returned_user_ptr_p)
@@ -4995,7 +5079,7 @@ int job_precreate_pool_fill(
jd->job_user_ptr = user_ptr;
jd->context_id = context_id;
jd->status_user_tag = status_user_tag;
- jd->trove_callback.fn = precreate_pool_thread_mgr_callback;
+ jd->trove_callback.fn = precreate_pool_fill_thread_mgr_callback;
jd->trove_callback.data = (void*)jd;
jd->u.precreate_pool.precreate_pool = precreate_pool;
jd->u.precreate_pool.precreate_handle_array = precreate_handle_array;
@@ -5005,7 +5089,7 @@ int job_precreate_pool_fill(
jd->u.precreate_pool.fsid = fsid;
/* reuse the logic for trove op completion to get this started */
- precreate_pool_thread_mgr_callback(jd, 0);
+ precreate_pool_fill_thread_mgr_callback(jd, 0);
/* for the moment, this type of job cannot immediately complete */
@@ -5041,7 +5125,7 @@ int job_precreate_pool_register_server(
tmp_pool->pool_handle = pool_handle;
tmp_pool->pool_count = count;
- gossip_debug(GOSSIP_SERVER_DEBUG,
+ gossip_debug(GOSSIP_JOB_DEBUG,
"Initial pool count for host %s, fsid %d: %d\n", host, (int)fsid,
count);
@@ -5092,14 +5176,14 @@ int job_precreate_pool_check_level(
/* handle count is below the threshold */
out_status_p->error_code = 0;
gen_mutex_unlock(&precreate_pool_mutex);
- gossip_debug(GOSSIP_SERVER_DEBUG, "found pool count low.\n");
+ gossip_debug(GOSSIP_JOB_DEBUG, "found pool count low.\n");
return(1);
}
else
{
/* TODO: finish this part; for now we just launch into space */
gen_mutex_unlock(&precreate_pool_mutex);
- gossip_debug(GOSSIP_SERVER_DEBUG, "found pool count high.\n");
+ gossip_debug(GOSSIP_JOB_DEBUG, "found pool count high.\n");
return(0);
}
break;
@@ -5122,14 +5206,123 @@ int job_precreate_pool_get_handles(
job_id_t * id,
job_context_id context_id)
{
- /* TODO: implement */
+ struct job_desc *jd = NULL;
- /* TODO: log the handle array on the way out */
+ /* create the job desc first, even though we may not use it. This
+ * gives us somewhere to store information
+ */
+ jd = alloc_job_desc(JOB_PRECREATE_POOL);
+ if (!jd)
+ {
+ return (-errno);
+ }
+ jd->job_user_ptr = user_ptr;
+ jd->context_id = context_id;
+ jd->status_user_tag = status_user_tag;
+ jd->u.precreate_pool.precreate_handle_array = handle_array;
+ jd->u.precreate_pool.precreate_handle_count = count;
+ jd->u.precreate_pool.precreate_handle_index = 0;
+ jd->u.precreate_pool.id = PVFS_OP_NULL;
+ jd->u.precreate_pool.fsid = fsid;
+ jd->u.precreate_pool.servers = servers;
+ jd->u.precreate_pool.trove_pending = count;
+
+ precreate_pool_get_post_next(jd);
- out_status_p->error_code = -PVFS_ENOSYS;
- return(1);
+ /* for the moment, this type of job cannot immediately complete */
+ *id = jd->job_id;
+ return(0);
}
+/* TODO: comment properly */
+static void precreate_pool_get_post_next(struct job_desc* jd)
+{
+ struct precreate_pool* pool;
+ TROVE_op_id tmp_id;
+ int ret;
+ struct precreate_pool_get_trove* tmp_trove;
+
+ /* we better still have handles to retrieve */
+ assert(jd->u.precreate_pool.precreate_handle_index <
+ jd->u.precreate_pool.precreate_handle_count);
+
+ gen_mutex_lock(&precreate_pool_mutex);
+ for(;
+ jd->u.precreate_pool.precreate_handle_index <
+ jd->u.precreate_pool.precreate_handle_count;
+ jd->u.precreate_pool.precreate_handle_index++)
+ {
+ if(jd->u.precreate_pool.servers)
+ {
+ /* caller wanted specific servers */
+ /* TODO: implement this */
+ assert(0);
+ }
+ else
+ {
+ /* caller wants whatever we hand out */
+ if(jd->u.precreate_pool.current_pool == NULL ||
+ jd->u.precreate_pool.current_pool->next == &precreate_pool_list)
+ {
+ /* either we are just starting, or we have wrapped around */
+ jd->u.precreate_pool.current_pool = precreate_pool_list.next;
+ }
+ else
+ {
+ /* normal case; cycle to next pool */
+ jd->u.precreate_pool.current_pool =
+ jd->u.precreate_pool.current_pool->next;
+ }
+ }
+
+ pool = qlist_entry(jd->u.precreate_pool.current_pool,
+ struct precreate_pool, list_link);
+
+ if(pool->pool_count < 1)
+ {
+ /* TODO: implement this */
+ /* need to rewind current_pool and queue to try again after
+ * refill
+ */
+ assert(0);
+ }
+ /* go ahead and decrement count to avoid races with other consumers */
+ /* TODO: remember to bump this up if trove op fails */
+ pool->pool_count--;
+
+ /* somewhere to stash trove variables per operation */
+ tmp_trove = malloc(sizeof(*tmp_trove));
+ if(!tmp_trove)
+ {
+ /* TODO: handle this */
+ assert(0);
+ }
+ tmp_trove->jd = jd;
+ tmp_trove->pos = PVFS_ITERATE_START;
+ tmp_trove->count = 1;
+ tmp_trove->key.buffer = &jd->u.precreate_pool.precreate_handle_array[jd->u.precreate_pool.precreate_handle_index];
+ tmp_trove->key.buffer_sz = sizeof(PVFS_handle);
+ tmp_trove->trove_callback.fn = precreate_pool_get_thread_mgr_callback;
+ tmp_trove->trove_callback.data = tmp_trove;
+
+ /* post trove operation to pull out a handle */
+ ret = trove_keyval_iterate_keys(
+ pool->fsid,
+ pool->pool_handle,
+ &tmp_trove->pos,
+ &tmp_trove->key,
+ &tmp_trove->count,
+ TROVE_BINARY_KEY|TROVE_SYNC|
+ TROVE_KEYVAL_HANDLE_COUNT|
+ TROVE_KEYVAL_ITERATE_REMOVE,
+ NULL,
+ &tmp_trove->trove_callback,
+ global_trove_context,
+ &tmp_id);
+ trove_pending_count++;
+ }
+ gen_mutex_unlock(&precreate_pool_mutex);
+}
#endif /* __PVFS2_TROVE_SUPPORT__ */
More information about the Pvfs2-cvs
mailing list