[Pvfs2-cvs] commit by slang in pvfs2/src/io/trove/trove-dbpf:
dbpf-bstream-direct.c dbpf-thread.c
CVS commit program
cvs at parl.clemson.edu
Tue Jul 29 18:49:09 EDT 2008
Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv10031/src/io/trove/trove-dbpf
Modified Files:
Tag: directio-branch
dbpf-bstream-direct.c dbpf-thread.c
Log Message:
adding threaded O_DIRECT implementation
Index: dbpf-bstream-direct.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/Attic/dbpf-bstream-direct.c,v
diff -p -u -r1.1.2.2 -r1.1.2.3
--- dbpf-bstream-direct.c 22 Jul 2008 22:34:21 -0000 1.1.2.2
+++ dbpf-bstream-direct.c 29 Jul 2008 22:49:09 -0000 1.1.2.3
@@ -28,6 +28,9 @@
#include "dbpf-attr-cache.h"
#include "dbpf-bstream.h"
#include "pint-mem.h"
+#include "pint-mgmt.h"
+#include "pint-context.h"
+#include "pint-op.h"
typedef struct
{
@@ -170,10 +173,22 @@ static size_t direct_write(int fd,
void * aligned_buf;
size_t aligned_size;
off_t aligned_offset, end_offset, aligned_end_offset;
+ struct flock writelock;
aligned_size = ALIGNED_SIZE(write_offset, size);
aligned_offset = ALIGNED_OFFSET(write_offset);
+ writelock.l_type = F_WRLCK;
+ writelock.l_whence = SEEK_SET;
+ writelock.l_start = (off_t)aligned_offset;
+ writelock.l_len = (off_t)aligned_size;
+ ret = fcntl(fd, F_SETLKW, &writelock);
+ if(ret < 0 && errno == EINTR)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+ writelock.l_type = F_UNLCK;
+
/* if the buffer passed in, the offsets, and the size are all
* aligned properly, just pass through directly
*/
@@ -181,8 +196,15 @@ static size_t direct_write(int fd,
ALIGNED_OFFSET(buf_offset) == buf_offset &&
aligned_size == size)
{
- return direct_aligned_write(fd, buf, buf_offset,
- size, write_offset, stream_size);
+ int unlck_ret;
+ ret = direct_aligned_write(fd, buf, buf_offset,
+ size, write_offset, stream_size);
+ unlck_ret = fcntl(fd, F_SETLK, &writelock);
+ if(unlck_ret < 0)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+ return ret;
}
gossip_debug(GOSSIP_DIRECTIO_DEBUG,
@@ -201,6 +223,12 @@ static size_t direct_write(int fd,
aligned_buf = PINT_mem_aligned_alloc(aligned_size, BLOCK_SIZE);
if(!aligned_buf)
{
+ ret = fcntl(fd, F_SETLK, &writelock);
+ if(ret < 0)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+
return -ENOMEM;
}
@@ -216,11 +244,18 @@ static size_t direct_write(int fd,
ret = dbpf_pread(fd, aligned_buf, BLOCK_SIZE, aligned_offset);
if(ret < 0)
{
+ int pread_errno = errno;
gossip_err(
"direct_memcpy_write: RMW failed at "
"beginning of request\n");
PINT_mem_aligned_free(aligned_buf);
- return -errno;
+ ret = fcntl(fd, F_SETLK, &writelock);
+ if(ret < 0)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+
+ return -trove_errno_to_trove_error(pread_errno);
}
}
else
@@ -243,10 +278,17 @@ static size_t direct_write(int fd,
aligned_end_offset - BLOCK_SIZE);
if(ret < 0)
{
+ int pread_errno = errno;
gossip_err(
"direct_memcpy_write: RMW failed at end of request\n");
PINT_mem_aligned_free(aligned_buf);
- return -errno;
+ ret = fcntl(fd, F_SETLK, &writelock);
+ if(ret < 0)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+
+ return -trove_errno_to_trove_error(pread_errno);
}
}
else
@@ -267,6 +309,12 @@ static size_t direct_write(int fd,
PINT_mem_aligned_free(aligned_buf);
+ ret = fcntl(fd, F_SETLK, &writelock);
+ if(ret < 0)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+
return size;
}
@@ -347,6 +395,7 @@ static size_t direct_read(int fd,
off_t aligned_offset;
size_t aligned_size, read_size;
size_t ret;
+ struct flock readlock;
if(file_offset > stream_size)
{
@@ -362,17 +411,43 @@ static size_t direct_read(int fd,
aligned_offset = ALIGNED_OFFSET(file_offset);
aligned_size = ALIGNED_SIZE(file_offset, read_size);
+ readlock.l_type = F_RDLCK;
+ readlock.l_whence = SEEK_SET;
+ readlock.l_start = (off_t)aligned_offset;
+ readlock.l_len = (off_t)aligned_size;
+ ret = fcntl(fd, F_SETLKW, &readlock);
+ if(ret < 0 && errno == EINTR)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+ readlock.l_type = F_UNLCK;
+
if(IS_ALIGNED_PTR(buf) &&
ALIGNED_OFFSET(buf_offset) == buf_offset &&
aligned_size == read_size)
{
- return direct_aligned_read(fd, buf, buf_offset, read_size,
- file_offset, stream_size);
+ int unlck_ret;
+
+ ret = direct_aligned_read(fd, buf, buf_offset, read_size,
+ file_offset, stream_size);
+
+ unlck_ret = fcntl(fd, F_SETLKW, &readlock);
+ if(unlck_ret < 0 && errno == EINTR)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+ return ret;
}
aligned_buf = PINT_mem_aligned_alloc(aligned_size, BLOCK_SIZE);
if(!aligned_buf)
{
+ ret = fcntl(fd, F_SETLK, &readlock);
+ if(ret < 0)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+
return -ENOMEM;
}
@@ -381,6 +456,13 @@ static size_t direct_read(int fd,
if(ret < 0)
{
PINT_mem_aligned_free(aligned_buf);
+
+ ret = fcntl(fd, F_SETLK, &readlock);
+ if(ret < 0)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+
return ret;
}
@@ -390,27 +472,33 @@ static size_t direct_read(int fd,
PINT_mem_aligned_free(aligned_buf);
+ ret = fcntl(fd, F_SETLK, &readlock);
+ if(ret < 0)
+ {
+ return -trove_errno_to_trove_error(errno);
+ }
+
return read_size;
}
-static int dbpf_bstream_direct_read_op_svc(struct dbpf_op *op_p)
+static int dbpf_bstream_direct_read_op_svc(void *ptr, PVFS_hint *hint)
{
int ret = -TROVE_EINVAL;
TROVE_object_ref ref;
TROVE_ds_attributes attr;
- dbpf_queued_op_t *q_op_p;
+ dbpf_queued_op_t *qop_p;
struct dbpf_bstream_rw_list_op *rw_op;
dbpf_stream_extents_t *stream_extents;
int i, extent_count;
- q_op_p = (dbpf_queued_op_t *)op_p->u.b_rw_list.queued_op_ptr;
- rw_op = &op_p->u.b_rw_list;
+ rw_op = (struct dbpf_bstream_rw_list_op *)ptr;
+ qop_p = (dbpf_queued_op_t *)rw_op->queued_op_ptr;
- ref.fs_id = op_p->coll_p->coll_id;
- ref.handle = op_p->handle;
+ ref.fs_id = qop_p->op.coll_p->coll_id;
+ ref.handle = qop_p->op.handle;
/* not in attribute cache. get the size from dspace */
- ret = dbpf_dspace_attr_get(op_p->coll_p, ref, &attr);
+ ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
if(ret != 0)
{
return ret;
@@ -468,24 +556,24 @@ static int dbpf_bstream_direct_read_op_s
return DBPF_OP_COMPLETE;
}
-static int dbpf_bstream_direct_write_op_svc(struct dbpf_op *op_p)
+static int dbpf_bstream_direct_write_op_svc(void *ptr, PVFS_hint *hint)
{
int ret = -TROVE_EINVAL;
TROVE_object_ref ref;
TROVE_ds_attributes attr;
dbpf_stream_extents_t *stream_extents;
int i, extent_count;
- dbpf_queued_op_t *q_op_p;
struct dbpf_bstream_rw_list_op *rw_op;
+ dbpf_queued_op_t *qop_p;
int eor = -1;
- q_op_p = (dbpf_queued_op_t *)(op_p->u.b_rw_list.queued_op_ptr);
- rw_op = &op_p->u.b_rw_list;
+ rw_op = (struct dbpf_bstream_rw_list_op *)ptr;
+ qop_p = (dbpf_queued_op_t *)rw_op->queued_op_ptr;
- ref.fs_id = op_p->coll_p->coll_id;
- ref.handle = op_p->handle;
+ ref.fs_id = qop_p->op.coll_p->coll_id;
+ ref.handle = qop_p->op.handle;
- ret = dbpf_dspace_attr_get(op_p->coll_p, ref, &attr);
+ ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
if(ret != 0)
{
return ret;
@@ -553,26 +641,28 @@ static int dbpf_bstream_direct_write_op_
/* set the size of the file */
attr.u.datafile.b_size = eor;
/* We want to hit the coalesce path, so we queue up the setattr */
- dbpf_queued_op_init(q_op_p,
+
+ dbpf_queued_op_init(qop_p,
DSPACE_SETATTR,
ref.handle,
- op_p->coll_p,
+ qop_p->op.coll_p,
dbpf_dspace_setattr_op_svc,
- q_op_p->op.user_ptr,
+ qop_p->op.user_ptr,
TROVE_SYNC,
- q_op_p->op.context_id);
- op_p->u.d_setattr.attr_p = malloc(sizeof(*op_p->u.d_setattr.attr_p));
- if(!op_p->u.d_setattr.attr_p)
+ qop_p->op.context_id);
+ qop_p->op.u.d_setattr.attr_p = malloc(sizeof(*qop_p->op.u.d_setattr.attr_p));
+ if(!qop_p->op.u.d_setattr.attr_p)
{
- dbpf_queued_op_free(q_op_p);
+ dbpf_queued_op_free(qop_p);
return -TROVE_ENOMEM;
}
- *op_p->u.d_setattr.attr_p = attr;
+ *qop_p->op.u.d_setattr.attr_p = attr;
+ dbpf_queued_op_queue(qop_p);
- return DBPF_OP_CONTINUE;
+ return PINT_MGMT_OP_CONTINUE;
}
- return DBPF_OP_COMPLETE;
+ return PINT_MGMT_OP_COMPLETED;
}
static int dbpf_bstream_direct_read_at(TROVE_coll_id coll_id,
@@ -603,6 +693,9 @@ static int dbpf_bstream_direct_write_at(
return -TROVE_ENOSYS;
}
+extern PINT_manager_t io_thread_mgr; /* defined in dbpf-thread.c; set up by PINT_manager_init() in dbpf_thread_initialize() */
+extern PINT_worker_id io_worker_id; /* worker the direct read/write list ops below are posted to via PINT_manager_id_post() */
+
static int dbpf_bstream_direct_read_list(TROVE_coll_id coll_id,
TROVE_handle handle,
char **mem_offset_array,
@@ -622,6 +715,7 @@ static int dbpf_bstream_direct_read_list
dbpf_queued_op_t *q_op_p = NULL;
struct dbpf_bstream_rw_list_op *op;
struct dbpf_collection *coll_p = NULL;
+ PINT_op_id mgr_id;
int ret;
coll_p = dbpf_collection_find_registered(coll_id);
@@ -641,7 +735,7 @@ static int dbpf_bstream_direct_read_list
BSTREAM_READ_LIST,
handle,
coll_p,
- dbpf_bstream_direct_read_op_svc,
+ NULL,
user_ptr,
flags,
context_id);
@@ -677,7 +771,14 @@ static int dbpf_bstream_direct_read_list
return ret;
}
- *out_op_id_p = dbpf_queued_op_queue(q_op_p);
+ *out_op_id_p = q_op_p->op.id;
+ ret = PINT_manager_id_post(
+ io_thread_mgr, q_op_p, &mgr_id,
+ dbpf_bstream_direct_read_op_svc, op, NULL, io_worker_id);
+ if(ret < 0)
+ {
+ return ret;
+ }
return DBPF_OP_CONTINUE;
}
@@ -702,6 +803,7 @@ static int dbpf_bstream_direct_write_lis
struct dbpf_bstream_rw_list_op *op;
struct dbpf_collection *coll_p = NULL;
int ret;
+ PINT_op_id mgr_id;
coll_p = dbpf_collection_find_registered(coll_id);
if (coll_p == NULL)
@@ -710,21 +812,20 @@ static int dbpf_bstream_direct_write_lis
}
q_op_p = dbpf_queued_op_alloc();
- if (q_op_p == NULL)
+ if(!q_op_p)
{
return -TROVE_ENOMEM;
}
-
- /* initialize all the common members */
dbpf_queued_op_init(q_op_p,
BSTREAM_WRITE_LIST,
handle,
coll_p,
- dbpf_bstream_direct_write_op_svc,
+ NULL,
user_ptr,
- flags,
+ 0,
context_id);
- op = (struct dbpf_bstream_rw_list_op *)(&q_op_p->op.u.b_rw_list);
+
+ op = &q_op_p->op.u.b_rw_list;
/* initialize the op-specific members */
op->stream_array_count = stream_count;
@@ -747,7 +848,10 @@ static int dbpf_bstream_direct_write_lis
return ret;
}
- *out_op_id_p = dbpf_queued_op_queue(q_op_p);
+ PINT_manager_id_post(
+ io_thread_mgr, q_op_p, &mgr_id,
+ dbpf_bstream_direct_write_op_svc, op, NULL, io_worker_id);
+ *out_op_id_p = q_op_p->op.id;
return DBPF_OP_CONTINUE;
}
Index: dbpf-thread.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-thread.c,v
diff -p -u -r1.39 -r1.39.10.1
--- dbpf-thread.c 19 Mar 2008 16:42:09 -0000 1.39
+++ dbpf-thread.c 29 Jul 2008 22:49:09 -0000 1.39.10.1
@@ -18,6 +18,8 @@
#include "dbpf-bstream.h"
#include "dbpf-op-queue.h"
#include "dbpf-sync.h"
+#include "pint-context.h"
+#include "pint-mgmt.h"
extern struct qlist_head dbpf_op_queue;
extern gen_mutex_t dbpf_op_queue_mutex;
@@ -31,11 +33,22 @@ pthread_cond_t dbpf_op_incoming_cond = P
pthread_cond_t dbpf_op_completed_cond = PTHREAD_COND_INITIALIZER;
#endif
+int PINT_dbpf_io_completion_callback(PINT_context_id ctx_id,
+ int count,
+ PINT_op_id *op_ids,
+ void **user_ptrs,
+ PVFS_error *errors); /* forward declaration: passed to PINT_open_context() in dbpf_thread_initialize() before its definition later in this file */
+/* Direct-I/O thread manager and its per-op worker. Filled in by PINT_manager_init() / PINT_manager_worker_add() in dbpf_thread_initialize(), and referenced via extern from dbpf-bstream-direct.c, which posts bstream read/write list ops to them. */
+PINT_manager_t io_thread_mgr;
+PINT_worker_id io_worker_id;
+
int dbpf_thread_initialize(void)
{
int ret = 0;
#ifdef __PVFS2_TROVE_THREADED__
ret = -1;
+ PINT_context_id io_ctx;
+ PINT_worker_attr_t io_worker_attrs;
pthread_cond_init(&dbpf_op_incoming_cond, NULL);
pthread_cond_init(&dbpf_op_completed_cond, NULL);
@@ -54,6 +67,32 @@ int dbpf_thread_initialize(void)
gossip_debug(
GOSSIP_TROVE_DEBUG, "dbpf_thread_initialize: failed (1)\n");
}
+
+ /* fire up the IO threads for direct IO */
+
+ ret = PINT_open_context(&io_ctx, PINT_dbpf_io_completion_callback);
+ if(ret < 0)
+ {
+ return ret;
+ }
+
+ ret = PINT_manager_init(&io_thread_mgr, io_ctx);
+ if(ret < 0)
+ {
+ PINT_close_context(io_ctx);
+ return ret;
+ }
+
+ io_worker_attrs.type = PINT_WORKER_TYPE_PER_OP;
+ io_worker_attrs.u.per_op.max_threads = 10000;
+ ret = PINT_manager_worker_add(io_thread_mgr, &io_worker_attrs, &io_worker_id);
+ if(ret < 0)
+ {
+ PINT_manager_destroy(io_thread_mgr);
+ PINT_close_context(io_ctx);
+ return ret;
+ }
+
#endif
return ret;
}
@@ -241,6 +280,27 @@ int dbpf_do_one_work_cycle(int *out_coun
} while(--max_num_ops_to_service);
#endif
+
+ return 0;
+}
+/* Completion callback for the direct-I/O context (registered via PINT_open_context() in dbpf_thread_initialize()). Of the count entries passed in, each one whose errors[i] equals PINT_MGMT_OP_COMPLETED has its dbpf_queued_op_t marked OP_COMPLETED. Always returns 0. NOTE(review): assumes user_ptrs[i] is the q_op_p passed to PINT_manager_id_post() and errors[i] is the op svc function's return value -- confirm against pint-mgmt. */
+int PINT_dbpf_io_completion_callback(PINT_context_id ctx_id,
+ int count,
+ PINT_op_id *op_ids,
+ void **user_ptrs,
+ PVFS_error *errors)
+{
+ int i;
+ dbpf_queued_op_t *qop_p;
+/* NOTE(review): ctx_id and op_ids are unused. Skipping PINT_MGMT_OP_CONTINUE is intended: the write svc re-queues the op as a DSPACE_SETATTR with dbpf_queued_op_queue(). Any other non-COMPLETED value is never passed to dbpf_queued_op_complete() here -- e.g. a negative error returned by a direct svc function, or DBPF_OP_COMPLETE from dbpf_bstream_direct_read_op_svc (unless it equals PINT_MGMT_OP_COMPLETED). Confirm such ops cannot be left pending forever. */
+ for(i = 0; i < count; ++i)
+ {
+ if(errors[i] == PINT_MGMT_OP_COMPLETED)
+ {
+ qop_p = (dbpf_queued_op_t *)(user_ptrs[i]);
+ dbpf_queued_op_complete(qop_p, OP_COMPLETED);
+ }
+ }
return 0;
}
More information about the Pvfs2-cvs mailing list