[PVFS2-CVS] commit by neill in pvfs2/src/io/trove/trove-dbpf: dbpf-bstream.c dbpf-dspace.c dbpf-keyval-db-cache.c dbpf-thread.c dbpf-thread.h dbpf.h

CVS commit program pvfs2-internal@beowulf-underground.org
Mon, 2 Feb 2004 17:50:26 -0500


Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb.parl.clemson.edu:/tmp/cvs-serv21560/src/io/trove/trove-dbpf

Modified Files:
	dbpf-bstream.c dbpf-dspace.c dbpf-keyval-db-cache.c 
	dbpf-thread.c dbpf-thread.h dbpf.h 
Log Message:
- first cut at supporting threaded-aio trove cancellation
- errors are now handled better: if an error occurs in any of the
  issued requests, it is reported
- added a simple test-trove-cancel program as a sanity check
- misc cleanups


Index: dbpf-bstream.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c,v
diff -p -u -r1.29 -r1.30
--- dbpf-bstream.c	30 Jan 2004 20:12:13 -0000	1.29
+++ dbpf-bstream.c	2 Feb 2004 22:50:25 -0000	1.30
@@ -68,7 +68,7 @@ static void aio_progress_notification(si
 {
     dbpf_queued_op_t *cur_op = NULL;
     struct dbpf_op *op_p = NULL;
-    int ret, i, aiocb_inuse_count, state = 0;
+    int ret, i, aiocb_inuse_count, state = 0, error_code = 0;
     struct aiocb *aiocb_p = NULL, *aiocb_ptr_array[AIOCB_ARRAY_SZ] = {0};
     gen_mutex_t *context_mutex = NULL;
 
@@ -96,7 +96,6 @@ static void aio_progress_notification(si
       error/return value of the op based on individual request
       error/return values.  they're ignored for now, however.
     */
-#if 0
     for (i = 0; i < op_p->u.b_rw_list.aiocb_array_count; i++)
     {
         if (aiocb_p[i].aio_lio_opcode == LIO_NOP)
@@ -110,26 +109,40 @@ static void aio_progress_notification(si
         {
             /* aio_return gets the return value of the individual op */
             ret = aio_return(&aiocb_p[i]);
+#if 0
             gossip_debug(GOSSIP_TROVE_DEBUG,
                          "  aio_return() says %d\n", ret);
-
-            /* WHAT DO WE DO WITH PARTIAL READ/WRITES??? */
+#endif
 
             /* mark as a NOP so we ignore it from now on */
             aiocb_p[i].aio_lio_opcode = LIO_NOP;
         }
+        else
+        {
+            gossip_debug(GOSSIP_TROVE_DEBUG, "error %d (%s) from "
+                         "aio_error/aio_return on block %d; "
+                         "skipping\n", ret, strerror(ret), i);
 
-        /* we shouldn't get called until all ops completed */
-        assert(ret != EINPROGRESS);
+            error_code = ret;
+            goto final_threaded_aio_cleanup;
+        }
     }
-#endif
 
     if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLPOSTED)
     {
+        error_code = 0;
+
+      final_threaded_aio_cleanup:
         gossip_debug(GOSSIP_TROVE_DEBUG, " aio_progress_notification: "
                      "op completed\n");
 
-        /* TODO: HOW DO WE DO A SYNC IN HERE?  WE DON'T HAVE THE FD */
+        if (op_p->flags & TROVE_SYNC)
+        {
+            if ((ret = DBPF_SYNC(op_p->u.b_rw_list.fd)) != 0)
+            {
+                error_code = -trove_errno_to_trove_error(ret);
+            }
+        }
         dbpf_bstream_fdcache_put(op_p->coll_p->coll_id, op_p->handle);
 
         /*
@@ -141,8 +154,9 @@ static void aio_progress_notification(si
         op_p->u.b_rw_list.aiocb_array = NULL;
 
         /* this is a macro defined in dbpf-thread.h */
-        ret = 1;
-        move_op_to_completion_queue(cur_op);
+        move_op_to_completion_queue(
+            cur_op, error_code,
+            ((error_code == ECANCELED) ? OP_CANCELED : OP_COMPLETED));
         return;
     }
     else
@@ -988,7 +1002,9 @@ static int dbpf_bstream_rw_list_op_svc(s
 		gossip_debug(GOSSIP_TROVE_DEBUG, "error %d (%s) from "
                              "aio_error/aio_return on block %d; "
                              "skipping\n", ret, strerror(ret), i);
-		aiocb_p[i].aio_lio_opcode = LIO_NOP;
+
+                ret = -trove_errno_to_trove_error(ret);
+                goto final_aio_cleanup;
 	    }
 	    else
             {
@@ -999,7 +1015,7 @@ static int dbpf_bstream_rw_list_op_svc(s
     }
 
     /* if we're not done with the last set of operations, break out */
-    if (op_in_progress_count > 0) 
+    if (op_in_progress_count > 0)
     {
         return 0;
     }
@@ -1010,10 +1026,13 @@ static int dbpf_bstream_rw_list_op_svc(s
           done.  free the aiocb array, release the FD, and mark the
           whole op as complete
         */
+        ret = 1;
+
+      final_aio_cleanup:
 	free(aiocb_p);
 
 	dbpf_bstream_fdcache_put(op_p->coll_p->coll_id, op_p->handle);
-	return 1;
+	return ret;
     }
     else
     {

Index: dbpf-dspace.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-dspace.c,v
diff -p -u -r1.78 -r1.79
--- dbpf-dspace.c	30 Jan 2004 20:12:13 -0000	1.78
+++ dbpf-dspace.c	2 Feb 2004 22:50:25 -0000	1.79
@@ -48,10 +48,6 @@ static int dbpf_dspace_verify_op_svc(str
 static int dbpf_dspace_setattr_op_svc(struct dbpf_op *op_p);
 static int dbpf_dspace_getattr_op_svc(struct dbpf_op *op_p);
 
-/* dbpf_dspace_create()
- *
- * TODO: should this have a ds_attributes with it?
- */
 static int dbpf_dspace_create(TROVE_coll_id coll_id,
 			      TROVE_handle_extent_array *extent_array,
 			      TROVE_handle *handle_p,
@@ -257,8 +253,6 @@ return_error:
     return -TROVE_EINVAL;
 }
 
-/* dbpf_dspace_remove()
- */
 static int dbpf_dspace_remove(TROVE_coll_id coll_id,
 			      TROVE_handle handle,
 			      TROVE_ds_flags flags,
@@ -292,8 +286,6 @@ static int dbpf_dspace_remove(TROVE_coll
     return 0;
 }
 
-/* dbpf_dspace_remove_op_svc()
- */
 static int dbpf_dspace_remove_op_svc(struct dbpf_op *op_p)
 {
     int error, ret, got_db = 0;
@@ -427,8 +419,6 @@ int dbpf_dspace_iterate_handles(TROVE_co
     return 0;
 }
 
-/* dbpf_dspace_iterate_handles_op_svc()
- */
 static int dbpf_dspace_iterate_handles_op_svc(struct dbpf_op *op_p)
 {
     int ret = 0, i = 0, got_db = 0;
@@ -703,9 +693,6 @@ return_error:
     return error;
 }
 
-
-/* dbpf_dspace_getattr()
- */
 static int dbpf_dspace_getattr(TROVE_coll_id coll_id,
 			       TROVE_handle handle,
 			       TROVE_ds_attributes_s *ds_attr_p,
@@ -759,8 +746,6 @@ static int dbpf_dspace_getattr(TROVE_col
     return 0;
 }
 
-/* dbpf_dspace_setattr()
- */
 static int dbpf_dspace_setattr(TROVE_coll_id coll_id,
 			       TROVE_handle handle,
 			       TROVE_ds_attributes_s *ds_attr_p,
@@ -1048,6 +1033,103 @@ return_error:
     return ret;
 }
 
+/*
+  FIXME: it's possible to have a non-threaded version of this, but
+  it's not implemented right now
+*/
+static int dbpf_dspace_cancel(
+    TROVE_coll_id coll_id,
+    TROVE_op_id id,
+    TROVE_context_id context_id)
+{
+    int ret = -TROVE_ENOSYS;
+
+    gossip_debug(GOSSIP_TROVE_DEBUG, "dbpf_dspace_cancel called\n");
+
+#ifdef __PVFS2_TROVE_THREADED__
+    int state = 0;
+    gen_mutex_t *context_mutex = NULL;
+    dbpf_queued_op_t *cur_op = NULL;
+
+    assert(dbpf_completion_queue_array[context_id]);
+    context_mutex = dbpf_completion_queue_array_mutex[context_id];
+    assert(context_mutex);
+    cur_op = id_gen_fast_lookup(id);
+    if (cur_op == NULL)
+    {
+        gossip_err("Invalid operation to test against\n");
+        return -TROVE_EINVAL;
+    }
+
+    /* check the state of the current op to see if it's completed */
+    gen_mutex_lock(&cur_op->mutex);
+    state = cur_op->op.state;
+    gen_mutex_unlock(&cur_op->mutex);
+
+    gossip_debug(GOSSIP_TROVE_DEBUG, "got cur_op %p\n", cur_op);
+
+    switch(state)
+    {
+        case OP_QUEUED:
+        {
+            gossip_debug(GOSSIP_TROVE_DEBUG, "op is queued: handling\n");
+
+            /* try to quietly dequeue the op ; fail cancel otherwise */
+            dbpf_queued_op_put_and_dequeue(cur_op);
+            assert(cur_op->op.state == OP_DEQUEUED);
+
+            /* this is a macro defined in dbpf-thread.h */
+            move_op_to_completion_queue(cur_op, 0, OP_CANCELED);
+            ret = 0;
+        }
+        break;
+        case OP_IN_SERVICE:
+        {
+            /*
+              for bstream i/o op, try an aio_cancel.  for other ops,
+              there's not much we can do other than let the op
+              complete normally
+            */
+            if ((cur_op->op.type == BSTREAM_READ_LIST) ||
+                (cur_op->op.type == BSTREAM_WRITE_LIST))
+            {
+                ret = aio_cancel(cur_op->op.u.b_rw_list.fd,
+                                 cur_op->op.u.b_rw_list.aiocb_array);
+                gossip_debug(
+                    GOSSIP_TROVE_DEBUG, "aio_cancel returned %s\n",
+                    ((ret == AIO_CANCELED) ? "CANCELED" :
+                     "NOT CANCELED"));
+
+                /*
+                  NOTE: the normal aio notification method takes care
+                  of completing the op and moving it to the completion
+                  queue
+                */
+            }
+            else
+            {
+                gossip_debug(
+                    GOSSIP_TROVE_DEBUG, "op is in service: ignoring "
+                    "operation type %d\n", cur_op->op.type);
+            }
+            ret = 0;
+        }
+        break;
+        case OP_COMPLETED:
+            /* easy cancelation case; do nothing */
+            gossip_debug(
+                GOSSIP_TROVE_DEBUG, "op is completed: ignoring\n");
+            ret = 0;
+            break;
+        default:
+            gossip_err("Invalid dbpf_op state found (%d)\n", state);
+            assert(0);
+    }
+#endif
+    return ret;
+}
+
+
 /* dbpf_dspace_test()
  *
  * Returns 0 if not completed, 1 if completed (successfully or with error).
@@ -1059,14 +1141,15 @@ return_error:
  *
  * Removes completed operations from the queue.
  */
-static int dbpf_dspace_test(TROVE_coll_id coll_id,
-			    TROVE_op_id id,
-                            TROVE_context_id context_id,
-			    int *out_count_p,
-			    TROVE_vtag_s *vtag,
-			    void **returned_user_ptr_p,
-			    TROVE_ds_state *state_p,
-                            int max_idle_time_ms)
+static int dbpf_dspace_test(
+    TROVE_coll_id coll_id,
+    TROVE_op_id id,
+    TROVE_context_id context_id,
+    int *out_count_p,
+    TROVE_vtag_s *vtag,
+    void **returned_user_ptr_p,
+    TROVE_ds_state *state_p,
+    int max_idle_time_ms)
 {
     int ret = -1;
     dbpf_queued_op_t *cur_op = NULL;
@@ -1379,10 +1462,11 @@ int dbpf_dspace_testcontext(
 
 /* dbpf_dspace_testsome()
  *
- * Returns 0 if nothing completed, 1 if something is completed (successfully
- * or with error).
+ * Returns 0 if nothing completed, 1 if something is completed
+ * (successfully or with error).
  *
- * The error state of the completed operation is returned via the state_p.
+ * The error state of the completed operation is returned via the
+ * state_p.
  */
 static int dbpf_dspace_testsome(
     TROVE_coll_id coll_id,
@@ -1533,6 +1617,7 @@ struct TROVE_dspace_ops dbpf_dspace_ops 
     dbpf_dspace_verify,
     dbpf_dspace_getattr,
     dbpf_dspace_setattr,
+    dbpf_dspace_cancel,
     dbpf_dspace_test,
     dbpf_dspace_testsome,
     dbpf_dspace_testcontext

Index: dbpf-keyval-db-cache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval-db-cache.c,v
diff -p -u -r1.27 -r1.28
--- dbpf-keyval-db-cache.c	30 Jan 2004 20:12:13 -0000	1.27
+++ dbpf-keyval-db-cache.c	2 Feb 2004 22:50:25 -0000	1.28
@@ -40,8 +40,6 @@ struct keyval_dbcache_entry
 
 static struct keyval_dbcache_entry keyval_db_cache[DBCACHE_ENTRIES];
 
-/* dbpf_keyval_dbcache_initialize()
- */
 void dbpf_keyval_dbcache_initialize(void)
 {
     int i;
@@ -54,8 +52,6 @@ void dbpf_keyval_dbcache_initialize(void
     }
 }
 
-/* dbpf_keyval_dbcache_finalize()
- */
 void dbpf_keyval_dbcache_finalize(void)
 {
     int i, ret;
@@ -309,8 +305,6 @@ return_error:
     return error;
 }
 
-/* dbpf_keyval_dbcache_put()
- */
 void dbpf_keyval_dbcache_put(TROVE_coll_id coll_id, TROVE_handle handle)
 {
     int i;

Index: dbpf-thread.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-thread.c,v
diff -p -u -r1.18 -r1.19
--- dbpf-thread.c	30 Jan 2004 20:12:13 -0000	1.18
+++ dbpf-thread.c	2 Feb 2004 22:50:25 -0000	1.19
@@ -103,14 +103,14 @@ void *dbpf_thread_function(void *ptr)
         }
 
         /*
-          if we have no work to do, wait nicely until an
-          operation to be serviced has entered the system.
+          if we have no work to do, wait nicely until an operation to
+          be serviced has entered the system.
 
-          if the queue isn't empty, and the out_count is 0,
-          that means that we're driving i/o operations without
-          using the aio callback completion.  we sleep between
-          those calls to avoid busy waiting (i.e. the timedwait
-          call is okay in those cases)
+          if the queue isn't empty, and the out_count is 0, that means
+          that we're driving i/o operations without using the aio
+          callback completion.  we sleep between those calls to avoid
+          busy waiting (i.e. the timedwait call is okay in those
+          cases)
         */
         if ((op_queued_empty) || (!op_queued_empty && (out_count == 0)))
         {
@@ -186,7 +186,8 @@ int dbpf_do_one_work_cycle(int *out_coun
             (*out_count)++;
 
             /* this is a macro defined in dbpf-thread.h */
-            move_op_to_completion_queue(cur_op);
+            move_op_to_completion_queue(
+                cur_op, ((ret == 1) ? 0 : ret), OP_COMPLETED);
         }
         else
         {

Index: dbpf-thread.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-thread.h,v
diff -p -u -r1.5 -r1.6
--- dbpf-thread.h	11 Aug 2003 19:25:50 -0000	1.5
+++ dbpf-thread.h	2 Feb 2004 22:50:25 -0000	1.6
@@ -24,20 +24,20 @@ void *dbpf_thread_function(void *ptr);
 
 int dbpf_do_one_work_cycle(int *out_count);
 
-#define move_op_to_completion_queue(cur_op) do {                   \
-TROVE_context_id cid = cur_op->op.context_id;                      \
-cur_op->state = (ret == 1) ? 0 : ret;                              \
+#define move_op_to_completion_queue(cur_op, ret_state, end_state)  \
+do { TROVE_context_id cid = cur_op->op.context_id;                 \
+cur_op->state = ret_state;                                         \
 context_mutex = dbpf_completion_queue_array_mutex[cid];            \
 assert(context_mutex);                                             \
 /*                                                                 \
   it's important to atomically place the op in the completion      \
-  queue and change the op state to completed so that dspace_test   \
+  queue and change the op state to 'end_state' so that dspace_test \
   and dspace_testcontext play nicely together                      \
 */                                                                 \
 gen_mutex_lock(context_mutex);                                     \
 dbpf_op_queue_add(dbpf_completion_queue_array[cid],cur_op);        \
 gen_mutex_lock(&cur_op->mutex);                                    \
-cur_op->op.state = OP_COMPLETED;                                   \
+cur_op->op.state = end_state;                                      \
 gen_mutex_unlock(&cur_op->mutex);                                  \
 /* wake up one waiting thread, if any */                           \
 pthread_cond_signal(&dbpf_op_completed_cond);                      \

Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.32 -r1.33
--- dbpf.h	13 Jan 2004 20:06:43 -0000	1.32
+++ dbpf.h	2 Feb 2004 22:50:25 -0000	1.33
@@ -311,11 +311,11 @@ enum dbpf_op_state
     OP_QUEUED,
     OP_IN_SERVICE,
     OP_COMPLETED,
-    OP_DEQUEUED
+    OP_DEQUEUED,
+    OP_CANCELED
 };
 
-
-/* Used to keep in-memory copy of parameters for queued operations */
+/* Used to store parameters for queued operations */
 struct dbpf_op
 {
     enum dbpf_op_type type;