[PVFS2-CVS] commit by neill in pvfs2/src/io/trove/trove-dbpf: dbpf-bstream.c dbpf-dspace.c dbpf-keyval-db-cache.c dbpf-thread.c dbpf-thread.h dbpf.h
CVS commit program
pvfs2-internal@beowulf-underground.org
Mon, 2 Feb 2004 17:50:26 -0500
Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb.parl.clemson.edu:/tmp/cvs-serv21560/src/io/trove/trove-dbpf
Modified Files:
dbpf-bstream.c dbpf-dspace.c dbpf-keyval-db-cache.c
dbpf-thread.c dbpf-thread.h dbpf.h
Log Message:
- first cut at supporting threaded-aio trove cancellation
- we now better handle errors by reporting them if they occur in any of
the requests issued
- added a simple test-trove-cancel program as a sanity check
- misc cleanups
Index: dbpf-bstream.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c,v
diff -p -u -r1.29 -r1.30
--- dbpf-bstream.c 30 Jan 2004 20:12:13 -0000 1.29
+++ dbpf-bstream.c 2 Feb 2004 22:50:25 -0000 1.30
@@ -68,7 +68,7 @@ static void aio_progress_notification(si
{
dbpf_queued_op_t *cur_op = NULL;
struct dbpf_op *op_p = NULL;
- int ret, i, aiocb_inuse_count, state = 0;
+ int ret, i, aiocb_inuse_count, state = 0, error_code = 0;
struct aiocb *aiocb_p = NULL, *aiocb_ptr_array[AIOCB_ARRAY_SZ] = {0};
gen_mutex_t *context_mutex = NULL;
@@ -96,7 +96,6 @@ static void aio_progress_notification(si
error/return value of the op based on individual request
error/return values. they're ignored for now, however.
*/
-#if 0
for (i = 0; i < op_p->u.b_rw_list.aiocb_array_count; i++)
{
if (aiocb_p[i].aio_lio_opcode == LIO_NOP)
@@ -110,26 +109,40 @@ static void aio_progress_notification(si
{
/* aio_return gets the return value of the individual op */
ret = aio_return(&aiocb_p[i]);
+#if 0
gossip_debug(GOSSIP_TROVE_DEBUG,
" aio_return() says %d\n", ret);
-
- /* WHAT DO WE DO WITH PARTIAL READ/WRITES??? */
+#endif
/* mark as a NOP so we ignore it from now on */
aiocb_p[i].aio_lio_opcode = LIO_NOP;
}
+ else
+ {
+ gossip_debug(GOSSIP_TROVE_DEBUG, "error %d (%s) from "
+ "aio_error/aio_return on block %d; "
+ "skipping\n", ret, strerror(ret), i);
- /* we shouldn't get called until all ops completed */
- assert(ret != EINPROGRESS);
+ error_code = ret;
+ goto final_threaded_aio_cleanup;
+ }
}
-#endif
if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLPOSTED)
{
+ error_code = 0;
+
+ final_threaded_aio_cleanup:
gossip_debug(GOSSIP_TROVE_DEBUG, " aio_progress_notification: "
"op completed\n");
- /* TODO: HOW DO WE DO A SYNC IN HERE? WE DON'T HAVE THE FD */
+ if (op_p->flags & TROVE_SYNC)
+ {
+ if ((ret = DBPF_SYNC(op_p->u.b_rw_list.fd)) != 0)
+ {
+ error_code = -trove_errno_to_trove_error(ret);
+ }
+ }
dbpf_bstream_fdcache_put(op_p->coll_p->coll_id, op_p->handle);
/*
@@ -141,8 +154,9 @@ static void aio_progress_notification(si
op_p->u.b_rw_list.aiocb_array = NULL;
/* this is a macro defined in dbpf-thread.h */
- ret = 1;
- move_op_to_completion_queue(cur_op);
+ move_op_to_completion_queue(
+ cur_op, error_code,
+ ((error_code == ECANCELED) ? OP_CANCELED : OP_COMPLETED));
return;
}
else
@@ -988,7 +1002,9 @@ static int dbpf_bstream_rw_list_op_svc(s
gossip_debug(GOSSIP_TROVE_DEBUG, "error %d (%s) from "
"aio_error/aio_return on block %d; "
"skipping\n", ret, strerror(ret), i);
- aiocb_p[i].aio_lio_opcode = LIO_NOP;
+
+ ret = -trove_errno_to_trove_error(ret);
+ goto final_aio_cleanup;
}
else
{
@@ -999,7 +1015,7 @@ static int dbpf_bstream_rw_list_op_svc(s
}
/* if we're not done with the last set of operations, break out */
- if (op_in_progress_count > 0)
+ if (op_in_progress_count > 0)
{
return 0;
}
@@ -1010,10 +1026,13 @@ static int dbpf_bstream_rw_list_op_svc(s
done. free the aiocb array, release the FD, and mark the
whole op as complete
*/
+ ret = 1;
+
+ final_aio_cleanup:
free(aiocb_p);
dbpf_bstream_fdcache_put(op_p->coll_p->coll_id, op_p->handle);
- return 1;
+ return ret;
}
else
{
Index: dbpf-dspace.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-dspace.c,v
diff -p -u -r1.78 -r1.79
--- dbpf-dspace.c 30 Jan 2004 20:12:13 -0000 1.78
+++ dbpf-dspace.c 2 Feb 2004 22:50:25 -0000 1.79
@@ -48,10 +48,6 @@ static int dbpf_dspace_verify_op_svc(str
static int dbpf_dspace_setattr_op_svc(struct dbpf_op *op_p);
static int dbpf_dspace_getattr_op_svc(struct dbpf_op *op_p);
-/* dbpf_dspace_create()
- *
- * TODO: should this have a ds_attributes with it?
- */
static int dbpf_dspace_create(TROVE_coll_id coll_id,
TROVE_handle_extent_array *extent_array,
TROVE_handle *handle_p,
@@ -257,8 +253,6 @@ return_error:
return -TROVE_EINVAL;
}
-/* dbpf_dspace_remove()
- */
static int dbpf_dspace_remove(TROVE_coll_id coll_id,
TROVE_handle handle,
TROVE_ds_flags flags,
@@ -292,8 +286,6 @@ static int dbpf_dspace_remove(TROVE_coll
return 0;
}
-/* dbpf_dspace_remove_op_svc()
- */
static int dbpf_dspace_remove_op_svc(struct dbpf_op *op_p)
{
int error, ret, got_db = 0;
@@ -427,8 +419,6 @@ int dbpf_dspace_iterate_handles(TROVE_co
return 0;
}
-/* dbpf_dspace_iterate_handles_op_svc()
- */
static int dbpf_dspace_iterate_handles_op_svc(struct dbpf_op *op_p)
{
int ret = 0, i = 0, got_db = 0;
@@ -703,9 +693,6 @@ return_error:
return error;
}
-
-/* dbpf_dspace_getattr()
- */
static int dbpf_dspace_getattr(TROVE_coll_id coll_id,
TROVE_handle handle,
TROVE_ds_attributes_s *ds_attr_p,
@@ -759,8 +746,6 @@ static int dbpf_dspace_getattr(TROVE_col
return 0;
}
-/* dbpf_dspace_setattr()
- */
static int dbpf_dspace_setattr(TROVE_coll_id coll_id,
TROVE_handle handle,
TROVE_ds_attributes_s *ds_attr_p,
@@ -1048,6 +1033,103 @@ return_error:
return ret;
}
+/*
+ FIXME: it's possible to have a non-threaded version of this, but
+ it's not implemented right now
+*/
+static int dbpf_dspace_cancel(
+ TROVE_coll_id coll_id,
+ TROVE_op_id id,
+ TROVE_context_id context_id)
+{
+ int ret = -TROVE_ENOSYS;
+
+ gossip_debug(GOSSIP_TROVE_DEBUG, "dbpf_dspace_cancel called\n");
+
+#ifdef __PVFS2_TROVE_THREADED__
+ int state = 0;
+ gen_mutex_t *context_mutex = NULL;
+ dbpf_queued_op_t *cur_op = NULL;
+
+ assert(dbpf_completion_queue_array[context_id]);
+ context_mutex = dbpf_completion_queue_array_mutex[context_id];
+ assert(context_mutex);
+ cur_op = id_gen_fast_lookup(id);
+ if (cur_op == NULL)
+ {
+ gossip_err("Invalid operation to test against\n");
+ return -TROVE_EINVAL;
+ }
+
+ /* check the state of the current op to see if it's completed */
+ gen_mutex_lock(&cur_op->mutex);
+ state = cur_op->op.state;
+ gen_mutex_unlock(&cur_op->mutex);
+
+ gossip_debug(GOSSIP_TROVE_DEBUG, "got cur_op %p\n", cur_op);
+
+ switch(state)
+ {
+ case OP_QUEUED:
+ {
+ gossip_debug(GOSSIP_TROVE_DEBUG, "op is queued: handling\n");
+
+ /* try to quietly dequeue the op ; fail cancel otherwise */
+ dbpf_queued_op_put_and_dequeue(cur_op);
+ assert(cur_op->op.state == OP_DEQUEUED);
+
+ /* this is a macro defined in dbpf-thread.h */
+ move_op_to_completion_queue(cur_op, 0, OP_CANCELED);
+ ret = 0;
+ }
+ break;
+ case OP_IN_SERVICE:
+ {
+ /*
+ for bstream i/o op, try an aio_cancel. for other ops,
+ there's not much we can do other than let the op
+ complete normally
+ */
+ if ((cur_op->op.type == BSTREAM_READ_LIST) ||
+ (cur_op->op.type == BSTREAM_WRITE_LIST))
+ {
+ ret = aio_cancel(cur_op->op.u.b_rw_list.fd,
+ cur_op->op.u.b_rw_list.aiocb_array);
+ gossip_debug(
+ GOSSIP_TROVE_DEBUG, "aio_cancel returned %s\n",
+ ((ret == AIO_CANCELED) ? "CANCELED" :
+ "NOT CANCELED"));
+
+ /*
+ NOTE: the normal aio notification method takes care
+ of completing the op and moving it to the completion
+ queue
+ */
+ }
+ else
+ {
+ gossip_debug(
+ GOSSIP_TROVE_DEBUG, "op is in service: ignoring "
+ "operation type %d\n", cur_op->op.type);
+ }
+ ret = 0;
+ }
+ break;
+ case OP_COMPLETED:
+ /* easy cancelation case; do nothing */
+ gossip_debug(
+ GOSSIP_TROVE_DEBUG, "op is completed: ignoring\n");
+ ret = 0;
+ break;
+ default:
+ gossip_err("Invalid dbpf_op state found (%d)\n", state);
+ assert(0);
+ }
+#endif
+ return ret;
+}
+
+
/* dbpf_dspace_test()
*
* Returns 0 if not completed, 1 if completed (successfully or with error).
@@ -1059,14 +1141,15 @@ return_error:
*
* Removes completed operations from the queue.
*/
-static int dbpf_dspace_test(TROVE_coll_id coll_id,
- TROVE_op_id id,
- TROVE_context_id context_id,
- int *out_count_p,
- TROVE_vtag_s *vtag,
- void **returned_user_ptr_p,
- TROVE_ds_state *state_p,
- int max_idle_time_ms)
+static int dbpf_dspace_test(
+ TROVE_coll_id coll_id,
+ TROVE_op_id id,
+ TROVE_context_id context_id,
+ int *out_count_p,
+ TROVE_vtag_s *vtag,
+ void **returned_user_ptr_p,
+ TROVE_ds_state *state_p,
+ int max_idle_time_ms)
{
int ret = -1;
dbpf_queued_op_t *cur_op = NULL;
@@ -1379,10 +1462,11 @@ int dbpf_dspace_testcontext(
/* dbpf_dspace_testsome()
*
- * Returns 0 if nothing completed, 1 if something is completed (successfully
- * or with error).
+ * Returns 0 if nothing completed, 1 if something is completed
+ * (successfully or with error).
*
- * The error state of the completed operation is returned via the state_p.
+ * The error state of the completed operation is returned via the
+ * state_p.
*/
static int dbpf_dspace_testsome(
TROVE_coll_id coll_id,
@@ -1533,6 +1617,7 @@ struct TROVE_dspace_ops dbpf_dspace_ops
dbpf_dspace_verify,
dbpf_dspace_getattr,
dbpf_dspace_setattr,
+ dbpf_dspace_cancel,
dbpf_dspace_test,
dbpf_dspace_testsome,
dbpf_dspace_testcontext
Index: dbpf-keyval-db-cache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval-db-cache.c,v
diff -p -u -r1.27 -r1.28
--- dbpf-keyval-db-cache.c 30 Jan 2004 20:12:13 -0000 1.27
+++ dbpf-keyval-db-cache.c 2 Feb 2004 22:50:25 -0000 1.28
@@ -40,8 +40,6 @@ struct keyval_dbcache_entry
static struct keyval_dbcache_entry keyval_db_cache[DBCACHE_ENTRIES];
-/* dbpf_keyval_dbcache_initialize()
- */
void dbpf_keyval_dbcache_initialize(void)
{
int i;
@@ -54,8 +52,6 @@ void dbpf_keyval_dbcache_initialize(void
}
}
-/* dbpf_keyval_dbcache_finalize()
- */
void dbpf_keyval_dbcache_finalize(void)
{
int i, ret;
@@ -309,8 +305,6 @@ return_error:
return error;
}
-/* dbpf_keyval_dbcache_put()
- */
void dbpf_keyval_dbcache_put(TROVE_coll_id coll_id, TROVE_handle handle)
{
int i;
Index: dbpf-thread.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-thread.c,v
diff -p -u -r1.18 -r1.19
--- dbpf-thread.c 30 Jan 2004 20:12:13 -0000 1.18
+++ dbpf-thread.c 2 Feb 2004 22:50:25 -0000 1.19
@@ -103,14 +103,14 @@ void *dbpf_thread_function(void *ptr)
}
/*
- if we have no work to do, wait nicely until an
- operation to be serviced has entered the system.
+ if we have no work to do, wait nicely until an operation to
+ be serviced has entered the system.
- if the queue isn't empty, and the out_count is 0,
- that means that we're driving i/o operations without
- using the aio callback completion. we sleep between
- those calls to avoid busy waiting (i.e. the timedwait
- call is okay in those cases)
+ if the queue isn't empty, and the out_count is 0, that means
+ that we're driving i/o operations without using the aio
+ callback completion. we sleep between those calls to avoid
+ busy waiting (i.e. the timedwait call is okay in those
+ cases)
*/
if ((op_queued_empty) || (!op_queued_empty && (out_count == 0)))
{
@@ -186,7 +186,8 @@ int dbpf_do_one_work_cycle(int *out_coun
(*out_count)++;
/* this is a macro defined in dbpf-thread.h */
- move_op_to_completion_queue(cur_op);
+ move_op_to_completion_queue(
+ cur_op, ((ret == 1) ? 0 : ret), OP_COMPLETED);
}
else
{
Index: dbpf-thread.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-thread.h,v
diff -p -u -r1.5 -r1.6
--- dbpf-thread.h 11 Aug 2003 19:25:50 -0000 1.5
+++ dbpf-thread.h 2 Feb 2004 22:50:25 -0000 1.6
@@ -24,20 +24,20 @@ void *dbpf_thread_function(void *ptr);
int dbpf_do_one_work_cycle(int *out_count);
-#define move_op_to_completion_queue(cur_op) do { \
-TROVE_context_id cid = cur_op->op.context_id; \
-cur_op->state = (ret == 1) ? 0 : ret; \
+#define move_op_to_completion_queue(cur_op, ret_state, end_state) \
+do { TROVE_context_id cid = cur_op->op.context_id; \
+cur_op->state = ret_state; \
context_mutex = dbpf_completion_queue_array_mutex[cid]; \
assert(context_mutex); \
/* \
it's important to atomically place the op in the completion \
- queue and change the op state to completed so that dspace_test \
+ queue and change the op state to 'end_state' so that dspace_test \
and dspace_testcontext play nicely together \
*/ \
gen_mutex_lock(context_mutex); \
dbpf_op_queue_add(dbpf_completion_queue_array[cid],cur_op); \
gen_mutex_lock(&cur_op->mutex); \
-cur_op->op.state = OP_COMPLETED; \
+cur_op->op.state = end_state; \
gen_mutex_unlock(&cur_op->mutex); \
/* wake up one waiting thread, if any */ \
pthread_cond_signal(&dbpf_op_completed_cond); \
Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.32 -r1.33
--- dbpf.h 13 Jan 2004 20:06:43 -0000 1.32
+++ dbpf.h 2 Feb 2004 22:50:25 -0000 1.33
@@ -311,11 +311,11 @@ enum dbpf_op_state
OP_QUEUED,
OP_IN_SERVICE,
OP_COMPLETED,
- OP_DEQUEUED
+ OP_DEQUEUED,
+ OP_CANCELED
};
-
-/* Used to keep in-memory copy of parameters for queued operations */
+/* Used to store parameters for queued operations */
struct dbpf_op
{
enum dbpf_op_type type;