[Pvfs2-cvs] commit by slang in pvfs2/src/io/trove/trove-dbpf: dbpf-bstream-direct.c dbpf-thread.c

CVS commit program cvs at parl.clemson.edu
Tue Jul 29 18:49:09 EDT 2008


Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv10031/src/io/trove/trove-dbpf

Modified Files:
      Tag: directio-branch
	dbpf-bstream-direct.c dbpf-thread.c 
Log Message:
adding threaded O_DIRECT (direct I/O) implementation


Index: dbpf-bstream-direct.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/Attic/dbpf-bstream-direct.c,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- dbpf-bstream-direct.c	22 Jul 2008 22:34:21 -0000	1.1.2.2
+++ dbpf-bstream-direct.c	29 Jul 2008 22:49:09 -0000	1.1.2.3
@@ -28,6 +28,9 @@
 #include "dbpf-attr-cache.h"
 #include "dbpf-bstream.h"
 #include "pint-mem.h"
+#include "pint-mgmt.h"
+#include "pint-context.h"
+#include "pint-op.h"
 
 typedef struct
 {
@@ -170,10 +173,22 @@ static size_t direct_write(int fd,
     void * aligned_buf;
     size_t aligned_size;
     off_t aligned_offset, end_offset, aligned_end_offset;
+    struct flock writelock;
 
     aligned_size = ALIGNED_SIZE(write_offset, size);
     aligned_offset = ALIGNED_OFFSET(write_offset);
 
+    writelock.l_type = F_WRLCK;
+    writelock.l_whence = SEEK_SET;
+    writelock.l_start = (off_t)aligned_offset;
+    writelock.l_len = (off_t)aligned_size;
+    ret = fcntl(fd, F_SETLKW, &writelock);
+    if(ret < 0 && errno == EINTR)
+    {
+        return -trove_errno_to_trove_error(errno);
+    }
+    writelock.l_type = F_UNLCK;
+
     /* if the buffer passed in, the offsets, and the size are all
      * aligned properly, just pass through directly
      */
@@ -181,8 +196,15 @@ static size_t direct_write(int fd,
        ALIGNED_OFFSET(buf_offset) == buf_offset &&
        aligned_size == size)
     {
-        return direct_aligned_write(fd, buf, buf_offset, 
-                                     size, write_offset, stream_size);
+        int unlck_ret;
+        ret = direct_aligned_write(fd, buf, buf_offset, 
+                                   size, write_offset, stream_size);
+        unlck_ret = fcntl(fd, F_SETLK, &writelock);
+        if(unlck_ret < 0)
+        {
+            return -trove_errno_to_trove_error(errno);
+        }
+        return ret;
     }
 
     gossip_debug(GOSSIP_DIRECTIO_DEBUG, 
@@ -201,6 +223,12 @@ static size_t direct_write(int fd,
     aligned_buf = PINT_mem_aligned_alloc(aligned_size, BLOCK_SIZE);
     if(!aligned_buf)
     {
+        ret = fcntl(fd, F_SETLK, &writelock);
+        if(ret < 0)
+        {
+            return -trove_errno_to_trove_error(errno);
+        }
+
         return -ENOMEM;
     }
 
@@ -216,11 +244,18 @@ static size_t direct_write(int fd,
             ret = dbpf_pread(fd, aligned_buf, BLOCK_SIZE, aligned_offset);
             if(ret < 0)
             {
+                int pread_errno = errno;
                 gossip_err(
                     "direct_memcpy_write: RMW failed at "
                     "beginning of request\n");
                 PINT_mem_aligned_free(aligned_buf);
-                return -errno;
+                ret = fcntl(fd, F_SETLK, &writelock);
+                if(ret < 0)
+                {
+                    return -trove_errno_to_trove_error(errno);
+                }
+
+                return -trove_errno_to_trove_error(pread_errno);
             }
         }
         else
@@ -243,10 +278,17 @@ static size_t direct_write(int fd,
                             aligned_end_offset - BLOCK_SIZE);
             if(ret < 0)
             {
+                int pread_errno = errno;
                 gossip_err(
                     "direct_memcpy_write: RMW failed at end of request\n");
                 PINT_mem_aligned_free(aligned_buf);
-                return -errno;
+                ret = fcntl(fd, F_SETLK, &writelock);
+                if(ret < 0)
+                {
+                    return -trove_errno_to_trove_error(errno);
+                }
+
+                return -trove_errno_to_trove_error(pread_errno);
             }
         }
         else
@@ -267,6 +309,12 @@ static size_t direct_write(int fd,
 
     PINT_mem_aligned_free(aligned_buf);
 
+    ret = fcntl(fd, F_SETLK, &writelock);
+    if(ret < 0)
+    {
+        return -trove_errno_to_trove_error(errno);
+    }
+
     return size;
 }
 
@@ -347,6 +395,7 @@ static size_t direct_read(int fd,
     off_t aligned_offset;
     size_t aligned_size, read_size;
     size_t ret;
+    struct flock readlock;
 
     if(file_offset > stream_size)
     {
@@ -362,17 +411,43 @@ static size_t direct_read(int fd,
     aligned_offset = ALIGNED_OFFSET(file_offset);
     aligned_size = ALIGNED_SIZE(file_offset, read_size);
 
+    readlock.l_type = F_RDLCK;
+    readlock.l_whence = SEEK_SET;
+    readlock.l_start = (off_t)aligned_offset;
+    readlock.l_len = (off_t)aligned_size;
+    ret = fcntl(fd, F_SETLKW, &readlock);
+    if(ret < 0 && errno == EINTR)
+    {
+        return -trove_errno_to_trove_error(errno);
+    }
+    readlock.l_type = F_UNLCK;
+
     if(IS_ALIGNED_PTR(buf) &&
        ALIGNED_OFFSET(buf_offset) == buf_offset &&
        aligned_size == read_size)
     {
-        return direct_aligned_read(fd, buf, buf_offset, read_size, 
-                                    file_offset, stream_size);
+        int unlck_ret;
+
+        ret = direct_aligned_read(fd, buf, buf_offset, read_size, 
+                                  file_offset, stream_size);
+
+        unlck_ret = fcntl(fd, F_SETLKW, &readlock);
+        if(unlck_ret < 0 && errno == EINTR)
+        {
+            return -trove_errno_to_trove_error(errno);
+        }
+        return ret;
     }
 
     aligned_buf = PINT_mem_aligned_alloc(aligned_size, BLOCK_SIZE);
     if(!aligned_buf)
     {
+        ret = fcntl(fd, F_SETLK, &readlock);
+        if(ret < 0)
+        {
+            return -trove_errno_to_trove_error(errno);
+        }
+
         return -ENOMEM;
     }
 
@@ -381,6 +456,13 @@ static size_t direct_read(int fd,
     if(ret < 0)
     {
         PINT_mem_aligned_free(aligned_buf);
+
+        ret = fcntl(fd, F_SETLK, &readlock);
+        if(ret < 0)
+        {
+            return -trove_errno_to_trove_error(errno);
+        }
+
         return ret;
     }
 
@@ -390,27 +472,33 @@ static size_t direct_read(int fd,
 
     PINT_mem_aligned_free(aligned_buf);
 
+    ret = fcntl(fd, F_SETLK, &readlock);
+    if(ret < 0)
+    {
+        return -trove_errno_to_trove_error(errno);
+    }
+
     return read_size;
 }
 
-static int dbpf_bstream_direct_read_op_svc(struct dbpf_op *op_p)
+static int dbpf_bstream_direct_read_op_svc(void *ptr, PVFS_hint *hint)
 {
     int ret = -TROVE_EINVAL;
     TROVE_object_ref ref;
     TROVE_ds_attributes attr;
-    dbpf_queued_op_t *q_op_p;
+    dbpf_queued_op_t *qop_p;
     struct dbpf_bstream_rw_list_op *rw_op;
     dbpf_stream_extents_t *stream_extents;
     int i, extent_count;
 
-    q_op_p = (dbpf_queued_op_t *)op_p->u.b_rw_list.queued_op_ptr;
-    rw_op = &op_p->u.b_rw_list;
+    rw_op = (struct dbpf_bstream_rw_list_op *)ptr;
+    qop_p = (dbpf_queued_op_t *)rw_op->queued_op_ptr;
 
-    ref.fs_id = op_p->coll_p->coll_id;
-    ref.handle = op_p->handle;
+    ref.fs_id = qop_p->op.coll_p->coll_id;
+    ref.handle = qop_p->op.handle;
 
     /* not in attribute cache.  get the size from dspace */
-    ret = dbpf_dspace_attr_get(op_p->coll_p, ref, &attr);
+    ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
     if(ret != 0)
     {
         return ret;
@@ -468,24 +556,24 @@ static int dbpf_bstream_direct_read_op_s
     return DBPF_OP_COMPLETE;
 }
 
-static int dbpf_bstream_direct_write_op_svc(struct dbpf_op *op_p)
+static int dbpf_bstream_direct_write_op_svc(void *ptr, PVFS_hint *hint)
 {
     int ret = -TROVE_EINVAL;
     TROVE_object_ref ref;
     TROVE_ds_attributes attr;
     dbpf_stream_extents_t *stream_extents;
     int i, extent_count;
-    dbpf_queued_op_t *q_op_p;
     struct dbpf_bstream_rw_list_op *rw_op;
+    dbpf_queued_op_t *qop_p;
     int eor = -1;
 
-    q_op_p = (dbpf_queued_op_t *)(op_p->u.b_rw_list.queued_op_ptr);
-    rw_op = &op_p->u.b_rw_list;
+    rw_op = (struct dbpf_bstream_rw_list_op *)ptr;
+    qop_p = (dbpf_queued_op_t *)rw_op->queued_op_ptr;
 
-    ref.fs_id = op_p->coll_p->coll_id;
-    ref.handle = op_p->handle;
+    ref.fs_id = qop_p->op.coll_p->coll_id;
+    ref.handle = qop_p->op.handle;
 
-    ret = dbpf_dspace_attr_get(op_p->coll_p, ref, &attr);
+    ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
     if(ret != 0)
     {
         return ret;
@@ -553,26 +641,28 @@ static int dbpf_bstream_direct_write_op_
         /* set the size of the file */
         attr.u.datafile.b_size = eor;
         /* We want to hit the coalesce path, so we queue up the setattr */
-        dbpf_queued_op_init(q_op_p,
+
+        dbpf_queued_op_init(qop_p,
                             DSPACE_SETATTR,
                             ref.handle,
-                            op_p->coll_p,
+                            qop_p->op.coll_p,
                             dbpf_dspace_setattr_op_svc,
-                            q_op_p->op.user_ptr,
+                            qop_p->op.user_ptr,
                             TROVE_SYNC,
-                            q_op_p->op.context_id);
-        op_p->u.d_setattr.attr_p = malloc(sizeof(*op_p->u.d_setattr.attr_p));
-        if(!op_p->u.d_setattr.attr_p)
+                            qop_p->op.context_id);
+        qop_p->op.u.d_setattr.attr_p = malloc(sizeof(*qop_p->op.u.d_setattr.attr_p));
+        if(!qop_p->op.u.d_setattr.attr_p)
         {
-            dbpf_queued_op_free(q_op_p);
+            dbpf_queued_op_free(qop_p);
             return -TROVE_ENOMEM;
         }
-        *op_p->u.d_setattr.attr_p = attr;
+        *qop_p->op.u.d_setattr.attr_p = attr;
+        dbpf_queued_op_queue(qop_p);
 
-        return DBPF_OP_CONTINUE;
+        return PINT_MGMT_OP_CONTINUE;
     }
 
-    return DBPF_OP_COMPLETE;
+    return PINT_MGMT_OP_COMPLETED;
 }
 
 static int dbpf_bstream_direct_read_at(TROVE_coll_id coll_id,
@@ -603,6 +693,9 @@ static int dbpf_bstream_direct_write_at(
     return -TROVE_ENOSYS;
 }
 
+extern PINT_manager_t io_thread_mgr;
+extern PINT_worker_id io_worker_id;
+
 static int dbpf_bstream_direct_read_list(TROVE_coll_id coll_id,
                                          TROVE_handle handle,
                                          char **mem_offset_array, 
@@ -622,6 +715,7 @@ static int dbpf_bstream_direct_read_list
     dbpf_queued_op_t *q_op_p = NULL;
     struct dbpf_bstream_rw_list_op *op;
     struct dbpf_collection *coll_p = NULL;
+    PINT_op_id mgr_id;
     int ret;
 
     coll_p = dbpf_collection_find_registered(coll_id);
@@ -641,7 +735,7 @@ static int dbpf_bstream_direct_read_list
                         BSTREAM_READ_LIST,
                         handle,
                         coll_p,
-                        dbpf_bstream_direct_read_op_svc,
+                        NULL,
                         user_ptr,
                         flags,
                         context_id);
@@ -677,7 +771,14 @@ static int dbpf_bstream_direct_read_list
         return ret;
     }
 
-    *out_op_id_p = dbpf_queued_op_queue(q_op_p);
+    *out_op_id_p = q_op_p->op.id;
+    ret = PINT_manager_id_post(
+        io_thread_mgr, q_op_p, &mgr_id,
+        dbpf_bstream_direct_read_op_svc, op, NULL, io_worker_id);
+    if(ret < 0)
+    {
+        return ret;
+    }
 
     return DBPF_OP_CONTINUE;
 }
@@ -702,6 +803,7 @@ static int dbpf_bstream_direct_write_lis
     struct dbpf_bstream_rw_list_op *op;
     struct dbpf_collection *coll_p = NULL;
     int ret;
+    PINT_op_id mgr_id;
 
     coll_p = dbpf_collection_find_registered(coll_id);
     if (coll_p == NULL)
@@ -710,21 +812,20 @@ static int dbpf_bstream_direct_write_lis
     }
 
     q_op_p = dbpf_queued_op_alloc();
-    if (q_op_p == NULL)
+    if(!q_op_p)
     {
         return -TROVE_ENOMEM;
     }
-
-    /* initialize all the common members */
     dbpf_queued_op_init(q_op_p,
                         BSTREAM_WRITE_LIST,
                         handle,
                         coll_p,
-                        dbpf_bstream_direct_write_op_svc,
+                        NULL,
                         user_ptr,
-                        flags,
+                        0,
                         context_id);
-    op = (struct dbpf_bstream_rw_list_op *)(&q_op_p->op.u.b_rw_list);
+
+    op = &q_op_p->op.u.b_rw_list;
 
     /* initialize the op-specific members */
     op->stream_array_count = stream_count;
@@ -747,7 +848,10 @@ static int dbpf_bstream_direct_write_lis
         return ret;
     }
 
-    *out_op_id_p = dbpf_queued_op_queue(q_op_p);
+    PINT_manager_id_post(
+        io_thread_mgr, q_op_p, &mgr_id,
+        dbpf_bstream_direct_write_op_svc, op, NULL, io_worker_id);
+    *out_op_id_p = q_op_p->op.id;
 
     return DBPF_OP_CONTINUE;
 }

Index: dbpf-thread.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-thread.c,v
diff -p -u -r1.39 -r1.39.10.1
--- dbpf-thread.c	19 Mar 2008 16:42:09 -0000	1.39
+++ dbpf-thread.c	29 Jul 2008 22:49:09 -0000	1.39.10.1
@@ -18,6 +18,8 @@
 #include "dbpf-bstream.h"
 #include "dbpf-op-queue.h"
 #include "dbpf-sync.h"
+#include "pint-context.h"
+#include "pint-mgmt.h"
 
 extern struct qlist_head dbpf_op_queue;
 extern gen_mutex_t dbpf_op_queue_mutex;
@@ -31,11 +33,22 @@ pthread_cond_t dbpf_op_incoming_cond = P
 pthread_cond_t dbpf_op_completed_cond = PTHREAD_COND_INITIALIZER;
 #endif
 
+int PINT_dbpf_io_completion_callback(PINT_context_id ctx_id,
+                                     int count,
+                                     PINT_op_id *op_ids,
+                                     void **user_ptrs,
+                                     PVFS_error *errors);
+
+PINT_manager_t io_thread_mgr;
+PINT_worker_id io_worker_id;
+
 int dbpf_thread_initialize(void)
 {
     int ret = 0;
 #ifdef __PVFS2_TROVE_THREADED__
     ret = -1;
+    PINT_context_id io_ctx;
+    PINT_worker_attr_t io_worker_attrs;
 
     pthread_cond_init(&dbpf_op_incoming_cond, NULL);
     pthread_cond_init(&dbpf_op_completed_cond, NULL);
@@ -54,6 +67,32 @@ int dbpf_thread_initialize(void)
         gossip_debug(
             GOSSIP_TROVE_DEBUG, "dbpf_thread_initialize: failed (1)\n");
     }
+
+    /* fire up the IO threads for direct IO */
+
+    ret = PINT_open_context(&io_ctx, PINT_dbpf_io_completion_callback);
+    if(ret < 0)
+    {
+    return ret;
+    }
+
+    ret = PINT_manager_init(&io_thread_mgr, io_ctx);
+    if(ret < 0)
+    {
+        PINT_close_context(io_ctx);
+        return ret;
+    }
+
+    io_worker_attrs.type = PINT_WORKER_TYPE_PER_OP;
+    io_worker_attrs.u.per_op.max_threads = 10000;
+    ret = PINT_manager_worker_add(io_thread_mgr, &io_worker_attrs, &io_worker_id);
+    if(ret < 0)
+    {
+        PINT_manager_destroy(io_thread_mgr);
+        PINT_close_context(io_ctx);
+        return ret;
+    }
+
 #endif
     return ret;
 }
@@ -241,6 +280,27 @@ int dbpf_do_one_work_cycle(int *out_coun
 
     } while(--max_num_ops_to_service);
 #endif
+
+    return 0;
+}
+
+int PINT_dbpf_io_completion_callback(PINT_context_id ctx_id,
+                                     int count,
+                                     PINT_op_id *op_ids,
+                                     void **user_ptrs,
+                                     PVFS_error *errors)
+{
+    int i;
+    dbpf_queued_op_t *qop_p;
+
+    for(i = 0; i < count; ++i)
+    {
+        if(errors[i] == PINT_MGMT_OP_COMPLETED)
+        {
+            qop_p = (dbpf_queued_op_t *)(user_ptrs[i]);
+            dbpf_queued_op_complete(qop_p, OP_COMPLETED);
+        }
+    }
 
     return 0;
 }



More information about the Pvfs2-cvs mailing list