[Pvfs2-cvs] commit by slang in pvfs2/src/io/trove/trove-dbpf: dbpf-bstream-aio.c dbpf-bstream-threaded.c dbpf.h

CVS commit program cvs at parl.clemson.edu
Thu Aug 3 12:06:56 EDT 2006


Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv7112/src/io/trove/trove-dbpf

Modified Files:
      Tag: kunkel-branch
	dbpf-bstream-aio.c dbpf-bstream-threaded.c dbpf.h 
Log Message:
cleanup of aio code, fix minor style issues.


Index: dbpf-bstream-aio.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-bstream-aio.c,v
diff -p -u -r1.24.6.5 -r1.24.6.6
--- dbpf-bstream-aio.c	2 Aug 2006 02:13:03 -0000	1.24.6.5
+++ dbpf-bstream-aio.c	3 Aug 2006 16:06:56 -0000	1.24.6.6
@@ -28,19 +28,7 @@
 
 /* bstream-aio functions */
 
-int dbpf_bstream_listio_convert(
-                int fd,
-                int op_type,
-                char **mem_offset_array,
-                TROVE_size *mem_size_array,
-                int mem_count,
-                TROVE_offset *stream_offset_array,
-                TROVE_size *stream_size_array,
-                int stream_count,
-                struct aiocb *aiocb_array,
-                int *aiocb_count,
-                struct bstream_listio_state *lio_state
-                );
+static int dbpf_bstream_aiocb_init(struct aiocb ** new_aiocb_p);
 
 /* dbpf_bstream_listio_convert()
  *
@@ -50,6 +38,41 @@ int dbpf_bstream_listio_convert(
  * Stored state in lio_state so that processing can
  * continue on subsequent calls.
  */
+static int dbpf_bstream_listio_convert(
+    int fd,
+    int op_type,
+    char **mem_offset_array,
+    TROVE_size *mem_size_array,
+    int mem_count,
+    TROVE_offset *stream_offset_array,
+    TROVE_size *stream_size_array,
+    int stream_count,
+    struct aiocb *aiocb_array,
+    int *aiocb_count,
+    struct bstream_listio_state *lio_state);
+
+static int dbpf_bstream_aio_issue_or_delay(
+    dbpf_queued_op_t * cur_op,
+    struct aiocb **aiocb_ptr_array,
+    int aiocb_inuse_count,
+    struct sigevent *sig,
+    int dec_first);
+
+/* defined later in this file; called from dbpf_bstream_aio_cleanup() */
+static void start_delayed_ops_if_any(
+    int dec_first);
+
+/* aio helpers defined at the end of this file */
+static int dbpf_bstream_aio_start_op(
+    dbpf_queued_op_t * q_op_p,
+    struct aiocb * aiocb_p);
+
+static int dbpf_bstream_aio_check_progress(
+    struct dbpf_op * op_p);
+
+static int dbpf_bstream_aio_cleanup(
+    struct dbpf_op * op_p);
+
 
 extern gen_mutex_t dbpf_attr_cache_mutex;
 
@@ -60,14 +71,14 @@ static int s_dbpf_ios_in_progress = 0;
 static gen_mutex_t s_dbpf_io_mutex = GEN_MUTEX_INITIALIZER;
 dbpf_op_queue_s  s_dbpf_io_ready_queue;
 
-static int issue_or_delay_io_operation(
-    dbpf_queued_op_t * cur_op,
-    struct aiocb **aiocb_ptr_array,
-    int aiocb_inuse_count,
-    struct sigevent *sig,
-    int dec_first);
-static void start_delayed_ops_if_any(
-    int dec_first);
+/* printable names for the LIST_PROC_* states, indexed by list_proc_state
+ * in debug messages; the order must match the LIST_PROC_* enum values */
+static char *list_proc_state_strings[] = {
+    "LIST_PROC_INITIALIZED",
+    "LIST_PROC_INPROGRESS",
+    "LIST_PROC_ALLCONVERTED",
+    "LIST_PROC_ALLPOSTED",
+};
 
 static inline int dbpf_bstream_rw_list(
     TROVE_coll_id coll_id,
@@ -137,73 +146,29 @@ static void aio_progress_notification(
 
     assert(state != OP_COMPLETED);
 
-    /*
-       we should iterate through the ops here to determine the
-       error/return value of the op based on individual request
-       error/return values.  they're ignored for now, however.
-     */
-    for (i = 0; i < op_p->u.b_rw_list.aiocb_array_count; i++)
+    ret = dbpf_bstream_aio_check_progress(op_p);
+    if(ret < 0)
     {
-        if (aiocb_p[i].aio_lio_opcode == LIO_NOP)
-        {
-            continue;
-        }
-
-        /* aio_error gets the "errno" value of the individual op */
-        ret = aio_error(&aiocb_p[i]);
-        if (ret == 0)
-        {
-            /* aio_return gets the return value of the individual op */
-            ret = aio_return(&aiocb_p[i]);
-
-            gossip_debug(GOSSIP_TROVE_DEBUG, "%s: %s complete: "
-                         "aio_return() says %d [fd = %d]\n",
-                         __func__,
-                         ((op_p->type == BSTREAM_WRITE_LIST) ||
-                          (op_p->type == BSTREAM_WRITE_AT) ?
-                          "WRITE" : "READ"), ret, op_p->u.b_rw_list.fd);
-
-            *(op_p->u.b_rw_list.out_size_p) += ret;
-
-            /* mark as a NOP so we ignore it from now on */
-            aiocb_p[i].aio_lio_opcode = LIO_NOP;
-        }
-        else
-        {
-            gossip_debug(GOSSIP_TROVE_DEBUG, "error %d (%s) from "
-                         "aio_error/aio_return on block %d; "
-                         "skipping\n", ret, strerror(ret), i);
-
-            ret = -trove_errno_to_trove_error(ret);
-            goto final_threaded_aio_cleanup;
-        }
+        /* a request failed: release the I/O resources and complete the
+         * op with that error instead of falling through and posting or
+         * cleaning up a second time */
+        dbpf_bstream_aio_cleanup(op_p);
+        dbpf_move_op_to_completion_queue(cur_op, ret,
+                                         ((ret == -TROVE_ECANCEL) ?
+                                          OP_CANCELED : OP_COMPLETED));
+        return;
     }
 
     if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLPOSTED)
     {
         ret = 0;
 
-      final_threaded_aio_cleanup:
-        if ((op_p->type == BSTREAM_WRITE_AT) ||
-            (op_p->type == BSTREAM_WRITE_LIST))
-        {
-            DBPF_AIO_SYNC_IF_NECESSARY(op_p, op_p->u.b_rw_list.fd, ret);
-        }
-
-        dbpf_open_cache_put(&op_p->u.b_rw_list.open_ref);
-        op_p->u.b_rw_list.fd = -1;
-
-        gossip_debug(GOSSIP_TROVE_DEBUG, "*** starting delayed ops if any "
-                     "(state is %s)\n",
-                     list_proc_state_strings[op_p->u.b_rw_list.
-                                             list_proc_state]);
-
+        /* keep any error the cleanup reports (e.g. from the sync) */
+        ret = dbpf_bstream_aio_cleanup(op_p);
         dbpf_move_op_to_completion_queue(cur_op, ret,
                                          ((ret ==
                                            -TROVE_ECANCEL) ? OP_CANCELED :
                                           OP_COMPLETED));
-
-        start_delayed_ops_if_any(1);
     }
     else
     {
@@ -212,63 +169,17 @@ static void aio_progress_notification(
                      list_proc_state_strings[op_p->u.b_rw_list.
                                              list_proc_state]);
 
-        /* no operations in progress; convert and post some more */
-        op_p->u.b_rw_list.aiocb_array_count = AIOCB_ARRAY_SZ;
-        op_p->u.b_rw_list.aiocb_array = aiocb_p;
-
-        /* convert listio arguments into aiocb structures */
-        aiocb_inuse_count = op_p->u.b_rw_list.aiocb_array_count;
-        ret = dbpf_bstream_listio_convert(op_p->u.b_rw_list.fd,
-                                          op_p->u.b_rw_list.opcode,
-                                          op_p->u.b_rw_list.mem_offset_array,
-                                          op_p->u.b_rw_list.mem_size_array,
-                                          op_p->u.b_rw_list.mem_array_count,
-                                          op_p->u.b_rw_list.stream_offset_array,
-                                          op_p->u.b_rw_list.stream_size_array,
-                                          op_p->u.b_rw_list.stream_array_count,
-                                          aiocb_p,
-                                          &aiocb_inuse_count,
-                                          &op_p->u.b_rw_list.lio_state);
-
-        if (ret == 1)
-        {
-            op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLCONVERTED;
-        }
-
         op_p->u.b_rw_list.sigev.sigev_notify = SIGEV_THREAD;
         op_p->u.b_rw_list.sigev.sigev_notify_attributes = NULL;
         op_p->u.b_rw_list.sigev.sigev_notify_function =
             aio_progress_notification;
         op_p->u.b_rw_list.sigev.sigev_value.sival_ptr = (void *) cur_op;
 
-        /* mark the unused with LIO_NOPs */
-        for (i = aiocb_inuse_count;
-             i < op_p->u.b_rw_list.aiocb_array_count; i++)
-        {
-            /* mark these as NOPs and we'll ignore them */
-            aiocb_p[i].aio_lio_opcode = LIO_NOP;
-        }
-
-        for (i = 0; i < aiocb_inuse_count; i++)
-        {
-            aiocb_ptr_array[i] = &aiocb_p[i];
-        }
-
-        assert(cur_op == op_p->u.b_rw_list.sigev.sigev_value.sival_ptr);
-
-        if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLCONVERTED)
-        {
-            op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLPOSTED;
-        }
-
-        ret =
-            issue_or_delay_io_operation(cur_op, aiocb_ptr_array,
-                                        aiocb_inuse_count,
-                                        &op_p->u.b_rw_list.sigev, 1);
-
+        /* no operations in progress; convert and post some more */
+        ret = dbpf_bstream_aio_start_op(cur_op, aiocb_p);
         if (ret)
         {
-            gossip_lerr("issue_or_delay_io_operation() returned " "%d\n", ret);
+            gossip_lerr("dbpf_bstream_aio_start_op() returned " "%d\n", ret);
         }
     }
 }
@@ -376,7 +287,7 @@ static void start_delayed_ops_if_any(
     gen_mutex_unlock(& s_dbpf_io_mutex);
 }
 
-static int issue_or_delay_io_operation(
+static int dbpf_bstream_aio_issue_or_delay(
     dbpf_queued_op_t * cur_op,
     struct aiocb **aiocb_ptr_array,
     int aiocb_inuse_count,
@@ -1023,73 +934,20 @@ static inline int dbpf_bstream_rw_list(
     *out_op_id_p = dbpf_queued_op_queue(q_op_p, & dbpf_op_queue[OP_QUEUE_IO]);
 
 #else
-    op_p = &q_op_p->op;
-
-    /*
-       instead of queueing the op like most other trove operations,
-       we're going to issue the system aio calls here to begin being
-       serviced immediately.  We'll check progress in the
-       aio_progress_notification callback method; this array is freed
-       in dbpf-op.c:dbpf_queued_op_free
-     */
-    aiocb_p = (struct aiocb *) malloc((AIOCB_ARRAY_SZ * sizeof(struct aiocb)));
-    if (aiocb_p == NULL)
-    {
-        dbpf_open_cache_put(&q_op_p->op.u.b_rw_list.open_ref);
-        return -TROVE_ENOMEM;
-    }
-
-    memset(aiocb_p, 0, (AIOCB_ARRAY_SZ * sizeof(struct aiocb)));
-    for (i = 0; i < AIOCB_ARRAY_SZ; i++)
-    {
-        aiocb_p[i].aio_lio_opcode = LIO_NOP;
-        aiocb_p[i].aio_sigevent.sigev_notify = SIGEV_NONE;
-    }
-
-    op_p->u.b_rw_list.aiocb_array_count = AIOCB_ARRAY_SZ;
-    op_p->u.b_rw_list.aiocb_array = aiocb_p;
-    op_p->u.b_rw_list.list_proc_state = LIST_PROC_INPROGRESS;
 
-    /* convert listio arguments into aiocb structures */
-    aiocb_inuse_count = op_p->u.b_rw_list.aiocb_array_count;
-    ret = dbpf_bstream_listio_convert(op_p->u.b_rw_list.fd,
-                                      op_p->u.b_rw_list.opcode,
-                                      op_p->u.b_rw_list.mem_offset_array,
-                                      op_p->u.b_rw_list.mem_size_array,
-                                      op_p->u.b_rw_list.mem_array_count,
-                                      op_p->u.b_rw_list.stream_offset_array,
-                                      op_p->u.b_rw_list.stream_size_array,
-                                      op_p->u.b_rw_list.stream_array_count,
-                                      aiocb_p,
-                                      &aiocb_inuse_count,
-                                      &op_p->u.b_rw_list.lio_state);
+    /* setup aio event notification */
+    q_op_p->op.u.b_rw_list.sigev.sigev_notify = SIGEV_THREAD;
+    q_op_p->op.u.b_rw_list.sigev.sigev_notify_attributes = NULL;
+    q_op_p->op.u.b_rw_list.sigev.sigev_notify_function =
+        aio_progress_notification;
+    q_op_p->op.u.b_rw_list.sigev.sigev_value.sival_ptr = (void *) q_op_p;
 
-    if (ret == 1)
+    ret = dbpf_bstream_aiocb_init(&aiocb_p);
+    if(ret < 0)
     {
-        op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLCONVERTED;
-    }
-
-    op_p->u.b_rw_list.sigev.sigev_notify = SIGEV_THREAD;
-    op_p->u.b_rw_list.sigev.sigev_notify_attributes = NULL;
-    op_p->u.b_rw_list.sigev.sigev_notify_function = aio_progress_notification;
-    op_p->u.b_rw_list.sigev.sigev_value.sival_ptr = (void *) q_op_p;
-
-    /* mark unused with LIO_NOPs */
-    for (i = aiocb_inuse_count; i < op_p->u.b_rw_list.aiocb_array_count; i++)
-    {
-        aiocb_p[i].aio_lio_opcode = LIO_NOP;
-    }
-
-    for (i = 0; i < aiocb_inuse_count; i++)
-    {
-        aiocb_ptr_array[i] = &aiocb_p[i];
+        /* on failure, drop the open-cache reference held for this op */
+        dbpf_open_cache_put(&q_op_p->op.u.b_rw_list.open_ref);
+        return ret;
     }
 
-    assert(q_op_p == op_p->u.b_rw_list.sigev.sigev_value.sival_ptr);
-
-    if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLCONVERTED)
-    {
-        op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLPOSTED;
-    }
-
     dbpf_op_change_status(q_op_p, OP_IN_SERVICE);
@@ -1097,14 +955,16 @@ static inline int dbpf_bstream_rw_list(
     id_gen_safe_register(&q_op_p->op.id, q_op_p);
     *out_op_id_p = q_op_p->op.id;
 
-    ret =
-        issue_or_delay_io_operation(q_op_p, aiocb_ptr_array, aiocb_inuse_count,
-                                    &op_p->u.b_rw_list.sigev, 0);
-
-    if (ret)
-    {
-        return ret;
-    }
+    /* post the first batch only now that the op is in service and its id
+     * is registered: completions arrive through aio_progress_notification
+     * on a SIGEV_THREAD notification thread and may finish the op as soon
+     * as the I/O is issued */
+    ret = dbpf_bstream_aio_start_op(q_op_p, aiocb_p);
+    if(ret < 0)
+    {
+        dbpf_open_cache_put(&q_op_p->op.u.b_rw_list.open_ref);
+        return ret;
+    }
 #endif
     return 0;
 }
@@ -1150,174 +1005,56 @@ static inline int dbpf_bstream_rw_list(
 static int dbpf_bstream_rw_list_op_svc(
     struct dbpf_op *op_p)
 {
-    int ret = -TROVE_EINVAL, i = 0, aiocb_inuse_count = 0;
-    int op_in_progress_count = 0;
-    struct aiocb *aiocb_p = NULL, *aiocb_ptr_array[AIOCB_ARRAY_SZ];
+    int ret = -TROVE_EINVAL;
+    struct aiocb *aiocb_p = NULL;
 
     if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_INITIALIZED)
     {
-        /*
-           first call; need to allocate aiocb array and ptr array;
-           this array is freed in dbpf-op.c:dbpf_queued_op_free
+        /* The rw_list has just been posted and no lio_listio operations
+         * have been posted yet, so we do that and return
          */
-        aiocb_p = malloc(AIOCB_ARRAY_SZ * sizeof(struct aiocb));
-        if (aiocb_p == NULL)
-        {
-            return -TROVE_ENOMEM;
-        }
+        op_p->u.b_rw_list.sigev.sigev_notify = SIGEV_NONE;
 
-        memset(aiocb_p, 0, AIOCB_ARRAY_SZ * sizeof(struct aiocb));
-        for (i = 0; i < AIOCB_ARRAY_SZ; i++)
+        ret = dbpf_bstream_aiocb_init(&aiocb_p);
+        if(ret < 0)
         {
-            aiocb_p[i].aio_lio_opcode = LIO_NOP;
-            aiocb_p[i].aio_sigevent.sigev_notify = SIGEV_NONE;
+            return ret;
         }
 
-        op_p->u.b_rw_list.aiocb_array_count = AIOCB_ARRAY_SZ;
-        op_p->u.b_rw_list.aiocb_array = aiocb_p;
-        op_p->u.b_rw_list.list_proc_state = LIST_PROC_INPROGRESS;
-        op_p->u.b_rw_list.sigev.sigev_notify = SIGEV_NONE;
+        ret = dbpf_bstream_aio_start_op(
+            (dbpf_queued_op_t *)op_p->u.b_rw_list.queued_op_ptr, aiocb_p);
+        return ret;
     }
-    else
-    {
-        /* operations potentially in progress */
-        aiocb_p = op_p->u.b_rw_list.aiocb_array;
-
-        /* check to see how we're progressing on previous operations */
-        for (i = 0; i < op_p->u.b_rw_list.aiocb_array_count; i++)
-        {
-            if (aiocb_p[i].aio_lio_opcode == LIO_NOP)
-            {
-                continue;
-            }
-
-            /* gets the "errno" value of the individual op */
-            ret = aio_error(&aiocb_p[i]);
-            if (ret == 0)
-            {
-                /*
-                   this particular operation completed w/out error.
-                   gets the return value of the individual op
-                 */
-                ret = aio_return(&aiocb_p[i]);
-
-                gossip_debug(GOSSIP_TROVE_DEBUG, "%s: %s complete: "
-                             "aio_return() ret %d (fd %d)\n",
-                             __func__,
-                             ((op_p->type == BSTREAM_WRITE_LIST) ||
-                              (op_p->type == BSTREAM_WRITE_AT) ?
-                              "WRITE" : "READ"), ret, op_p->u.b_rw_list.fd);
-
-                /* aio_return doesn't seem to return bytes read/written if 
-                 * sigev_notify == SIGEV_NONE, so we set the out size 
-                 * from what's requested.  For reads we just leave as zero,
-                 * which ends up being OK,
-                 * since the amount read (if past EOF its less than requested)
-                 * is determined from the bstream size.
-                 */
-                if (op_p->type == BSTREAM_WRITE_LIST ||
-                    op_p->type == BSTREAM_WRITE_AT)
-                {
-                    *(op_p->u.b_rw_list.out_size_p) += aiocb_p[i].aio_nbytes;
-                }
 
-                /* mark as a NOP so we ignore it from now on */
-                aiocb_p[i].aio_lio_opcode = LIO_NOP;
-            }
-            else if (ret != EINPROGRESS)
-            {
-                gossip_err("%s: aio_error on block %d, skipping: %s\n",
-                           __func__, i, strerror(ret));
-                ret = -trove_errno_to_trove_error(ret);
-                goto final_aio_cleanup;
-            }
-            else
-            {
-                /* otherwise the operation is still in progress; skip it */
-                op_in_progress_count++;
-            }
-        }
-    }
+    /* operations potentially in progress */
+    aiocb_p = op_p->u.b_rw_list.aiocb_array;
 
-    /* if we're not done with the last set of operations, break out */
-    if (op_in_progress_count > 0)
+    ret = dbpf_bstream_aio_check_progress(op_p);
+    if(ret == EINPROGRESS)
     {
         return 0;
     }
-    else if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLPOSTED)
+    else if(ret < 0)
     {
-        /* we've posted everything, and it all completed */
-        ret = 1;
-
-      final_aio_cleanup:
-        if ((op_p->type == BSTREAM_WRITE_AT) ||
-            (op_p->type == BSTREAM_WRITE_LIST))
-        {
-            DBPF_AIO_SYNC_IF_NECESSARY(op_p, op_p->u.b_rw_list.fd, ret);
-        }
-
-        dbpf_open_cache_put(&op_p->u.b_rw_list.open_ref);
-        op_p->u.b_rw_list.fd = -1;
-
-        start_delayed_ops_if_any(1);
-        return ret;
+        /* a request failed: release the I/O resources, report the error */
+        dbpf_bstream_aio_cleanup(op_p);
+        return ret;
     }
-    else
-    {
-        /* no operations in progress; convert and post some more */
-        aiocb_inuse_count = op_p->u.b_rw_list.aiocb_array_count;
-        ret = dbpf_bstream_listio_convert(op_p->u.b_rw_list.fd,
-                                          op_p->u.b_rw_list.opcode,
-                                          op_p->u.b_rw_list.mem_offset_array,
-                                          op_p->u.b_rw_list.mem_size_array,
-                                          op_p->u.b_rw_list.mem_array_count,
-                                          op_p->u.b_rw_list.stream_offset_array,
-                                          op_p->u.b_rw_list.stream_size_array,
-                                          op_p->u.b_rw_list.stream_array_count,
-                                          aiocb_p,
-                                          &aiocb_inuse_count,
-                                          &op_p->u.b_rw_list.lio_state);
-
-        if (ret == 1)
-        {
-            op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLCONVERTED;
-        }
-
-        /* mark unused with LIO_NOPs */
-        for (i = aiocb_inuse_count;
-             i < op_p->u.b_rw_list.aiocb_array_count; i++)
-        {
-            aiocb_p[i].aio_lio_opcode = LIO_NOP;
-        }
-
-        for (i = 0; i < aiocb_inuse_count; i++)
-        {
-            aiocb_ptr_array[i] = &aiocb_p[i];
-        }
-
-        if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLCONVERTED)
-        {
-            op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLPOSTED;
-        }
-
-        /*
-           we use a reverse mapped ptr for I/O operations in order to
-           access the queued op from the op.  this is only useful for
-           the delayed io operation scheme.  it's initialized in
-           dbpf_bstream_rw_list
-         */
-        assert(op_p->u.b_rw_list.queued_op_ptr);
 
-        ret = issue_or_delay_io_operation((dbpf_queued_op_t *) op_p->u.
-                                          b_rw_list.queued_op_ptr,
-                                          aiocb_ptr_array, aiocb_inuse_count,
-                                          &op_p->u.b_rw_list.sigev, 1);
-
-        if (ret)
-        {
-            return ret;
-        }
-        return 0;
+    if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLPOSTED)
+    {
+        /* we've posted everything, and it all completed; an error
+         * reported by the cleanup (e.g. a failed sync) takes precedence */
+        ret = dbpf_bstream_aio_cleanup(op_p);
+        return (ret < 0) ? ret : 1;
     }
+
+    /* the previous batch completed but part of the list is still
+     * unposted: convert and post the next batch.  queued_op_ptr maps the
+     * op back to its queued op and is set up in dbpf_bstream_rw_list */
+    assert(op_p->u.b_rw_list.queued_op_ptr);
+    ret = dbpf_bstream_aio_start_op(
+        (dbpf_queued_op_t *)op_p->u.b_rw_list.queued_op_ptr, aiocb_p);
+    return ret;
 }
 #endif
@@ -1510,6 +1237,228 @@ struct TROVE_bstream_ops dbpf_bstream_op
     dbpf_bstream_write_list,
     dbpf_bstream_flush
 };
+
+/* dbpf_bstream_aiocb_init()
+ *
+ * Allocates AIOCB_ARRAY_SZ aiocbs, zeroed and marked LIO_NOP with
+ * SIGEV_NONE notification, and returns the array via *new_aiocb_p.
+ * dbpf_bstream_aio_start_op() attaches it to the op, after which it is
+ * freed with the op (dbpf-op.c:dbpf_queued_op_free).
+ *
+ * Returns 0 on success or -TROVE_ENOMEM if the allocation fails.
+ */
+static int dbpf_bstream_aiocb_init(struct aiocb ** new_aiocb_p)
+{
+    struct aiocb * aiocb_p;
+    int i;
+
+    aiocb_p = malloc(AIOCB_ARRAY_SZ * sizeof(struct aiocb));
+    if(aiocb_p == NULL)
+    {
+        return -TROVE_ENOMEM;
+    }
+    memset(aiocb_p, 0, AIOCB_ARRAY_SZ * sizeof(struct aiocb));
+    for (i = 0; i < AIOCB_ARRAY_SZ; i++)
+    {
+        aiocb_p[i].aio_lio_opcode = LIO_NOP;
+        aiocb_p[i].aio_sigevent.sigev_notify = SIGEV_NONE;
+    }
+
+    *new_aiocb_p = aiocb_p;
+    return 0;
+}
+
+/* dbpf_bstream_aio_start_op()
+ *
+ * Attaches aiocb_p to the op, converts the next portion of the op's
+ * memory/stream lists into aiocbs (resuming from lio_state), marks the
+ * unused slots LIO_NOP and hands the batch to
+ * dbpf_bstream_aio_issue_or_delay().  Once the whole list has been
+ * converted, list_proc_state is advanced to LIST_PROC_ALLPOSTED.
+ *
+ * Returns the result of dbpf_bstream_aio_issue_or_delay().
+ */
+static int dbpf_bstream_aio_start_op(dbpf_queued_op_t * q_op_p,
+                                     struct aiocb * aiocb_p)
+{
+    struct aiocb * aiocb_ptr_array[AIOCB_ARRAY_SZ];
+    int aiocb_inuse_count;
+    int ret, i;
+    struct dbpf_op * op_p;
+
+    op_p = &q_op_p->op;
+
+    op_p->u.b_rw_list.aiocb_array_count = AIOCB_ARRAY_SZ;
+    op_p->u.b_rw_list.aiocb_array = aiocb_p;
+    op_p->u.b_rw_list.list_proc_state = LIST_PROC_INPROGRESS;
+
+    /* convert listio arguments into aiocb structures */
+    aiocb_inuse_count = op_p->u.b_rw_list.aiocb_array_count;
+    ret = dbpf_bstream_listio_convert(op_p->u.b_rw_list.fd,
+                                      op_p->u.b_rw_list.opcode,
+                                      op_p->u.b_rw_list.mem_offset_array,
+                                      op_p->u.b_rw_list.mem_size_array,
+                                      op_p->u.b_rw_list.mem_array_count,
+                                      op_p->u.b_rw_list.stream_offset_array,
+                                      op_p->u.b_rw_list.stream_size_array,
+                                      op_p->u.b_rw_list.stream_array_count,
+                                      aiocb_p,
+                                      &aiocb_inuse_count,
+                                      &op_p->u.b_rw_list.lio_state);
+
+    /* a return of 1 means the whole list has now been converted */
+    if (ret == 1)
+    {
+        op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLCONVERTED;
+    }
+
+    /* mark unused with LIO_NOPs */
+    for (i = aiocb_inuse_count; i < op_p->u.b_rw_list.aiocb_array_count; i++)
+    {
+        aiocb_p[i].aio_lio_opcode = LIO_NOP;
+    }
+
+    for (i = 0; i < aiocb_inuse_count; i++)
+    {
+        aiocb_ptr_array[i] = &aiocb_p[i];
+    }
+
+    if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLCONVERTED)
+    {
+        op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLPOSTED;
+    }
+
+    /* NOTE(review): dec_first is always 0 here.  Before this refactoring
+     * the re-posting paths (aio_progress_notification and
+     * dbpf_bstream_rw_list_op_svc) passed 1 and only the initial post
+     * from dbpf_bstream_rw_list passed 0; confirm the in-progress
+     * accounting in dbpf_bstream_aio_issue_or_delay() still balances. */
+    ret =
+        dbpf_bstream_aio_issue_or_delay(
+            q_op_p, aiocb_ptr_array, aiocb_inuse_count,
+            &op_p->u.b_rw_list.sigev, 0);
+
+    return ret;
+}
+
+/* dbpf_bstream_aio_check_progress()
+ *
+ * Polls each aiocb of the op that is not marked LIO_NOP.  Each request
+ * that has completed is added to *out_size_p (see the SIGEV_NONE note
+ * below) and re-marked LIO_NOP so later calls skip it.
+ *
+ * Returns 0 if no request is still running, EINPROGRESS (a positive
+ * value) if at least one is, or a negative trove error for the first
+ * request found to have failed.
+ */
+static int dbpf_bstream_aio_check_progress(struct dbpf_op * op_p)
+{
+    struct aiocb * aiocb_p;
+    int op_in_progress_count = 0;
+    int ret, i;
+
+    /* operations potentially in progress */
+    aiocb_p = op_p->u.b_rw_list.aiocb_array;
+
+    for (i = 0; i < op_p->u.b_rw_list.aiocb_array_count; i++)
+    {
+        if (aiocb_p[i].aio_lio_opcode == LIO_NOP)
+        {
+            continue;
+        }
+
+        /* aio_error returns 0 on success, EINPROGRESS while the request
+         * runs, a positive errno value if it failed, or -1 with errno set
+         * if the request could not be queried at all */
+        ret = aio_error(&aiocb_p[i]);
+        if(ret == EINPROGRESS)
+        {
+            op_in_progress_count++;
+            continue;
+        }
+        else if(ret != 0)
+        {
+            if(ret < 0)
+            {
+                ret = errno;
+            }
+            gossip_debug(GOSSIP_TROVE_DEBUG, "error %d (%s) from "
+                         "aio_error/aio_return on block %d; "
+                         "skipping\n", ret, strerror(ret), i);
+
+            ret = -trove_errno_to_trove_error(ret);
+            return ret;
+        }
+
+        /* aio_return gets the return value of the individual op */
+        ret = aio_return(&aiocb_p[i]);
+
+        gossip_debug(GOSSIP_TROVE_DEBUG, "%s: %s complete: "
+                     "aio_return() says %d [fd = %d]\n",
+                     __func__,
+                     ((op_p->type == BSTREAM_WRITE_LIST) ||
+                      (op_p->type == BSTREAM_WRITE_AT) ?
+                      "WRITE" : "READ"), ret, op_p->u.b_rw_list.fd);
+
+        if(op_p->u.b_rw_list.sigev.sigev_notify == SIGEV_NONE)
+        {
+            /* aio_return doesn't seem to return bytes read/written if
+             * sigev_notify == SIGEV_NONE, so we set the out size
+             * from what's requested.  For reads we just leave as zero,
+             * which ends up being OK,
+             * since the amount read (if past EOF its less than requested)
+             * is determined from the bstream size.
+             */
+            if (op_p->type == BSTREAM_WRITE_LIST ||
+                op_p->type == BSTREAM_WRITE_AT)
+            {
+                *(op_p->u.b_rw_list.out_size_p) += aiocb_p[i].aio_nbytes;
+            }
+        }
+        else
+        {
+            *(op_p->u.b_rw_list.out_size_p) += ret;
+        }
+
+        /* mark as a NOP so we ignore it from now on */
+        aiocb_p[i].aio_lio_opcode = LIO_NOP;
+    }
+
+    if(op_in_progress_count)
+    {
+        return EINPROGRESS;
+    }
+    return 0;
+}
+
+/* dbpf_bstream_aio_cleanup()
+ *
+ * Finishes the I/O side of an rw_list op: applies
+ * DBPF_AIO_SYNC_IF_NECESSARY to the bstream fd, releases the
+ * open-cache reference, invalidates the fd and starts any delayed ops.
+ *
+ * Returns the ret value passed through DBPF_AIO_SYNC_IF_NECESSARY; it
+ * starts at 0 (NOTE(review): confirm the macro stores sync errors in
+ * it, since callers may propagate a negative result).
+ */
+static int dbpf_bstream_aio_cleanup(struct dbpf_op * op_p)
+{
+    int ret = 0;
+
+    /* the macro itself limits the sync to TROVE_SYNC writes */
+    DBPF_AIO_SYNC_IF_NECESSARY(op_p, op_p->u.b_rw_list.fd, ret);
+
+    dbpf_open_cache_put(&op_p->u.b_rw_list.open_ref);
+    op_p->u.b_rw_list.fd = -1;
+
+    gossip_debug(GOSSIP_TROVE_DEBUG, "*** starting delayed ops if any "
+                 "(state is %s)\n",
+                 list_proc_state_strings[op_p->u.b_rw_list.
+                 list_proc_state]);
+    start_delayed_ops_if_any(1);
+    return ret;
+}
+
 #endif /* use AIO */
 /*
  * Local variables:

Index: dbpf-bstream-threaded.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/Attic/dbpf-bstream-threaded.c,v
diff -p -u -r1.1.2.12 -r1.1.2.13
--- dbpf-bstream-threaded.c	3 Aug 2006 13:52:07 -0000	1.1.2.12
+++ dbpf-bstream-threaded.c	3 Aug 2006 16:06:56 -0000	1.1.2.13
@@ -4,6 +4,8 @@
  * See COPYING in top-level directory.
  */
  
+#include "dbpf.h"
+
 /*
  * Needed for O_DIRECT disk access
  */
@@ -28,7 +30,6 @@
 #include "pvfs2-debug.h"
 #include "trove.h"
 #include "trove-internal.h"
-#include "dbpf.h"
 #include "dbpf-op-queue.h"
 #include "dbpf-attr-cache.h"
 #include "pint-event.h"

Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.74.2.4 -r1.74.2.5
--- dbpf.h	24 Jul 2006 20:04:11 -0000	1.74.2.4
+++ dbpf.h	3 Aug 2006 16:06:56 -0000	1.74.2.5
@@ -11,6 +11,7 @@
 extern "C" {
 #endif
 
+#include <sys/types.h>
 #include <db.h>
 #include <aio.h>
 #include "trove.h"
@@ -542,7 +543,9 @@ PVFS_error dbpf_db_error_to_trove_error(
 #define DBPF_AIO_SYNC_IF_NECESSARY(dbpf_op_ptr, fd, ret)  \
 do {                                                      \
     int tmp_ret, tmp_errno;                               \
-    if (dbpf_op_ptr->flags & TROVE_SYNC)                  \
+    if ((dbpf_op_ptr->flags & TROVE_SYNC) &&              \
+        ((dbpf_op_ptr->type == BSTREAM_WRITE_AT) ||       \
+         (dbpf_op_ptr->type == BSTREAM_WRITE_LIST)))      \
     {                                                     \
         if ((tmp_ret = DBPF_SYNC(fd)) != 0)               \
         {                                                 \



More information about the Pvfs2-cvs mailing list