[Pvfs2-cvs] commit by slang in pvfs2/src/io/trove/trove-dbpf: dbpf-null-aio.c dbpf-keyval.c module.mk.in

CVS commit program cvs at parl.clemson.edu
Wed May 21 14:55:26 EDT 2008


Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv7682/src/io/trove/trove-dbpf

Modified Files:
      Tag: he-branch
	dbpf-keyval.c module.mk.in 
Added Files:
      Tag: he-branch
	dbpf-null-aio.c 
Log Message:
reverse merge of latest changes from HEAD to he branch.


--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ dbpf-null-aio.c	2008-05-21 14:55:26.000000000 -0400
@@ -0,0 +1,407 @@
+
+#include <string.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#ifdef HAVE_MALLOC_H
+#include <malloc.h>
+#endif
+#include <assert.h>
+#include <errno.h>
+#include <aio.h>
+
+#include "gossip.h"
+#include "pvfs2-debug.h"
+#include "trove.h"
+#include "trove-internal.h"
+#include "dbpf.h"
+#include "quicklist.h"
+#include "pthread.h"
+#include "dbpf.h"
+
+
+static int null_lio_listio(int mode, struct aiocb * const list[],
+                          int nent, struct sigevent *sig);
+static int null_aio_error(const struct aiocb *aiocbp);
+static ssize_t null_aio_return(struct aiocb *aiocbp);
+static int null_aio_cancel(int filedesc, struct aiocb * aiocbp);
+static int null_aio_suspend(const struct aiocb * const list[], int nent,
+                           const struct timespec * timeout);
+static int null_aio_read(struct aiocb * aiocbp);
+static int null_aio_write(struct aiocb * aiocbp);
+static int null_aio_fsync(int operation, struct aiocb * aiocbp);
+
+static struct dbpf_aio_ops null_aio_ops;
+
+struct null_aio_item
+{
+    struct aiocb *cb_p;
+    struct sigevent *sig;
+    struct qlist_head list_link;
+    int master;
+    pthread_t *tids;
+    int nent;
+};
+static void* null_lio_thread(void*);
+
+int null_lio_listio(int mode, struct aiocb * const list[], 
+                   int nent, struct sigevent *sig) 
+{
+    struct null_aio_item* tmp_item;
+    int ret, i;
+    pthread_t *tids;
+    pthread_attr_t attr;
+
+    tids = (pthread_t *)malloc(sizeof(pthread_t) * nent);
+    if(!tids)
+    {
+        return (-1);
+    }
+
+    for(i = 0; i < nent; ++i)
+    {
+	int spawnmode= PTHREAD_CREATE_JOINABLE;
+        tmp_item = (struct null_aio_item*)malloc(sizeof(struct null_aio_item)*nent);
+        if(!tmp_item)
+        {
+            return (-1);
+        }
+        memset(tmp_item, 0, sizeof(struct null_aio_item));
+
+        if(mode == LIO_NOWAIT && i == (nent - 1))
+        {
+            /* This is the master thread and needs to wait for the others.
+             * We make the master the last thread to get created, so that
+             * we don't end up in a race with the thread ids getting set
+             * properly 
+             */
+            tmp_item->master = 1;
+            tmp_item->tids = tids;
+            tmp_item->nent = nent;
+	    spawnmode= PTHREAD_CREATE_DETACHED;
+        }
+
+        tmp_item->cb_p = list[i];
+        tmp_item->sig = sig;
+
+        /* setup state */
+#ifdef HAVE_AIOCB_ERROR_CODE
+        tmp_item->cb_p->__error_code = EINPROGRESS;
+#endif
+
+        /* set detached state */
+        ret = pthread_attr_init(&attr);
+        if(ret != 0)
+        {
+            free(tmp_item);
+            errno = ret;
+
+            return(-1);
+        }
+        ret = pthread_attr_setdetachstate(
+            &attr,
+	    spawnmode
+        );
+        if(ret != 0)
+        {
+            free(tmp_item);
+            errno = ret;
+            return(-1);
+        }
+
+        /* create thread to perform I/O and trigger callback */
+        ret = pthread_create(&tids[i], &attr, null_lio_thread, tmp_item);
+        if(ret != 0)
+        {
+            int j = 0;
+
+            if(mode == LIO_WAIT)
+            {
+                for(; j < i; ++j)
+                {
+                    pthread_join(tids[j], NULL);
+                }
+            }
+
+            free(tmp_item);
+            free(tids);
+            errno = ret;
+            return(-1);
+        }
+        gossip_debug(GOSSIP_BSTREAM_DEBUG, 
+                     "[null-aio]: pthread_create completed:"
+                     " id: %d, thread_id: %p\n",
+                     i, (void *)tids[i]);
+    }
+
+    ret = 0;
+    if(mode == LIO_WAIT)
+    {
+        for(i = 0; i < nent; ++i)
+        {
+            pthread_join(tids[i], NULL);
+            if(ret != 0 && null_aio_error(list[i]) != 0)
+            {
+                /* for now we're just overwriting previous errors
+                 * since we have no way to store and return them
+                 * in the blocking case.
+                 * The caller should call aio_error to get the
+                 * element specific errors
+                 */
+                ret = null_aio_error(list[i]);
+            }
+        }
+
+        free(tids);
+    }
+    return(ret);
+}
+
+static int null_aio_error(const struct aiocb *aiocbp)
+{
+#ifdef HAVE_AIOCB_ERROR_CODE
+    return aiocbp->__error_code;
+#else
+    return 0;
+#endif
+}
+
+static ssize_t null_aio_return(struct aiocb *aiocbp)
+{
+#ifdef HAVE_AIOCB_RETURN_VALUE
+    return aiocbp->__return_value;
+#else
+    return 0;
+#endif
+}
+
+static int null_aio_cancel(int filedesc, struct aiocb *aiocbp)
+{
+    errno = ENOSYS;
+    return -1;
+}
+
+static int null_aio_suspend(const struct aiocb * const list[], int nent,
+                           const struct timespec * timeout)
+{
+    errno = ENOSYS;
+    return -1;
+}
+
+static int null_aio_read(struct aiocb * aiocbp)
+{
+    errno = ENOSYS;
+    return -1;
+}
+
+static int null_aio_write(struct aiocb * aiocbp)
+{
+    errno = ENOSYS;
+    return -1;
+}
+
+static int null_aio_fsync(int operation, struct aiocb * aiocbp)
+{
+    errno = ENOSYS;
+    return -1;
+}
+
+static void* null_lio_thread(void* foo)
+{
+    struct null_aio_item* tmp_item = (struct null_aio_item*)foo;
+    int ret = 0;
+    struct stat statbuf;
+
+    if(tmp_item->cb_p->aio_lio_opcode == LIO_READ)
+    {
+        ret = tmp_item->cb_p->aio_nbytes;
+    }
+    else if(tmp_item->cb_p->aio_lio_opcode == LIO_WRITE)
+    {
+        gossip_debug(GOSSIP_BSTREAM_DEBUG,
+                     "[null-aio]: pwrite: cb_p: %p, "
+                     "fd: %d, bufp: %p, size: %zd off:%llu\n",
+                     tmp_item->cb_p, tmp_item->cb_p->aio_fildes, 
+                     tmp_item->cb_p->aio_buf, tmp_item->cb_p->aio_nbytes,
+		     llu(tmp_item->cb_p->aio_offset));
+
+        /* check size of file */
+        /* note, if either fstat or ftruncate fail, then we let the ret and
+         * errno drop through to the logic below.  Otherwise we report the
+         * size that would have been written.
+         */
+        ret = fstat(tmp_item->cb_p->aio_fildes, &statbuf);
+        if(ret == 0)
+        {
+            if(statbuf.st_size < 
+                (tmp_item->cb_p->aio_nbytes + tmp_item->cb_p->aio_offset))
+            {
+                /* this write would extend the file */
+                ret = ftruncate(tmp_item->cb_p->aio_fildes, 
+                    (tmp_item->cb_p->aio_nbytes + tmp_item->cb_p->aio_offset));
+                if(ret == 0)
+                {
+                    ret = tmp_item->cb_p->aio_nbytes;
+                }
+            }
+            else
+            {
+                ret = tmp_item->cb_p->aio_nbytes;
+            }
+        }
+    }
+    else
+    {
+        /* this should have been caught already */
+        assert(0);
+    }
+
+    /* store error and return codes */
+    if(ret < 0)
+    {
+#ifdef HAVE_AIOCB_ERROR_CODE
+        tmp_item->cb_p->__error_code = errno;
+#endif
+    }
+    else
+    {
+#ifdef HAVE_AIOCB_ERROR_CODE
+        tmp_item->cb_p->__error_code = 0;
+#endif
+
+#ifdef HAVE_AIOCB_RETURN_VALUE
+        tmp_item->cb_p->__return_value = ret;
+#endif
+    }
+
+    if(tmp_item->master)
+    {
+        int i;
+        /* I'm the master, gotta wait for the others to call notify */
+
+        /* we skip the last one because that's us */
+        for(i = 0; i < (tmp_item->nent - 1); ++i)
+        {
+            ret = pthread_join(tmp_item->tids[i], NULL);
+            if(ret != 0)
+            {
+                gossip_err("pthread_join failed: %d (%s), i: %d, tid: %p\n",
+                           ret, strerror(ret), i, (void *)tmp_item->tids[i]);
+            }
+        }
+
+        free(tmp_item->tids);
+        /* run callback fn */
+        tmp_item->sig->sigev_notify_function(tmp_item->sig->sigev_value);
+    }
+
+    free(tmp_item);
+
+    pthread_exit(NULL);
+    return NULL;
+}
+
+static int null_aio_bstream_read_list(TROVE_coll_id coll_id,
+                                     TROVE_handle handle,
+                                     char **mem_offset_array, 
+                                     TROVE_size *mem_size_array,
+                                     int mem_count,
+                                     TROVE_offset *stream_offset_array, 
+                                     TROVE_size *stream_size_array,
+                                     int stream_count,
+                                     TROVE_size *out_size_p,
+                                     TROVE_ds_flags flags, 
+                                     TROVE_vtag_s *vtag,
+                                     void *user_ptr,
+                                     TROVE_context_id context_id,
+                                     TROVE_op_id *out_op_id_p,
+                                     PVFS_hint hints)
+{
+    return dbpf_bstream_rw_list(coll_id,
+                                handle,
+                                mem_offset_array, 
+                                mem_size_array,
+                                mem_count,
+                                stream_offset_array, 
+                                stream_size_array,
+                                stream_count,
+                                out_size_p,
+                                flags, 
+                                vtag,
+                                user_ptr,
+                                context_id,
+                                out_op_id_p,
+                                LIO_READ,
+                                &null_aio_ops,
+                                hints);
+}
+
+static int null_aio_bstream_write_list(TROVE_coll_id coll_id,
+                                      TROVE_handle handle,
+                                      char **mem_offset_array, 
+                                      TROVE_size *mem_size_array,
+                                      int mem_count,
+                                      TROVE_offset *stream_offset_array, 
+                                      TROVE_size *stream_size_array,
+                                      int stream_count,
+                                      TROVE_size *out_size_p,
+                                      TROVE_ds_flags flags, 
+                                      TROVE_vtag_s *vtag,
+                                      void *user_ptr,
+                                      TROVE_context_id context_id,
+                                      TROVE_op_id *out_op_id_p,
+                                      PVFS_hint hints)
+{
+    return dbpf_bstream_rw_list(coll_id,
+                                handle,
+                                mem_offset_array, 
+                                mem_size_array,
+                                mem_count,
+                                stream_offset_array, 
+                                stream_size_array,
+                                stream_count,
+                                out_size_p,
+                                flags, 
+                                vtag,
+                                user_ptr,
+                                context_id,
+                                out_op_id_p,
+                                LIO_WRITE,
+                                &null_aio_ops,
+                                hints);
+}
+
+static struct dbpf_aio_ops null_aio_ops =
+{
+    null_aio_read,
+    null_aio_write,
+    null_lio_listio,
+    null_aio_error,
+    null_aio_return,
+    null_aio_cancel,
+    null_aio_suspend,
+    null_aio_fsync
+};
+
+struct TROVE_bstream_ops null_aio_bstream_ops =
+{
+    dbpf_bstream_read_at,
+    dbpf_bstream_write_at,
+    dbpf_bstream_resize,
+    dbpf_bstream_validate,
+    null_aio_bstream_read_list,
+    null_aio_bstream_write_list,
+    dbpf_bstream_flush
+};
+
+/*
+ * Local variables:
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ts=8 sts=4 sw=4 expandtab
+ */

Index: dbpf-keyval.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval.c,v
diff -p -u -r1.86.4.1 -r1.86.4.2
--- dbpf-keyval.c	7 Apr 2008 16:31:28 -0000	1.86.4.1
+++ dbpf-keyval.c	21 May 2008 18:55:26 -0000	1.86.4.2
@@ -1506,7 +1506,7 @@ static int dbpf_keyval_iterate_skip_to_p
         /* strip the session out of the position; we need to use a true
          * integer offset if we get past the cache
          */
-        pos = pos & 0xffff;
+        pos = pos & 0xffffffff;
         return dbpf_keyval_iterate_step_to_position(handle, pos, dbc_p);
     }
 

Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/module.mk.in,v
diff -p -u -r1.20 -r1.20.20.1
--- module.mk.in	18 Oct 2006 16:01:12 -0000	1.20
+++ module.mk.in	21 May 2008 18:55:26 -0000	1.20.20.1
@@ -15,7 +15,8 @@ SERVERSRC += \
 	$(DIR)/dbpf-mgmt.c \
 	$(DIR)/dbpf-keyval-pcache.c \
 	$(DIR)/dbpf-sync.c \
-	$(DIR)/dbpf-alt-aio.c 
+	$(DIR)/dbpf-alt-aio.c \
+	$(DIR)/dbpf-null-aio.c 
 
 # Grab trove-ledger.h from handle-mgmt.  Also make _GNU_SOURCE definition 
 # required for access to pread/pwrite on Linux.  _XOPEN_SOURCE seems to be



More information about the Pvfs2-cvs mailing list