[Pvfs2-cvs] commit by slang in pvfs2/src/io/trove/trove-dbpf: dbpf-bstream-aio.c dbpf-bstream-threaded.c dbpf.h
CVS commit program
cvs at parl.clemson.edu
Thu Aug 3 12:06:56 EDT 2006
Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv7112/src/io/trove/trove-dbpf
Modified Files:
Tag: kunkel-branch
dbpf-bstream-aio.c dbpf-bstream-threaded.c dbpf.h
Log Message:
cleanup of aio code, fix minor style issues.
Index: dbpf-bstream-aio.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-bstream-aio.c,v
diff -p -u -r1.24.6.5 -r1.24.6.6
--- dbpf-bstream-aio.c 2 Aug 2006 02:13:03 -0000 1.24.6.5
+++ dbpf-bstream-aio.c 3 Aug 2006 16:06:56 -0000 1.24.6.6
@@ -28,19 +28,7 @@
/* bstream-aio functions */
-int dbpf_bstream_listio_convert(
- int fd,
- int op_type,
- char **mem_offset_array,
- TROVE_size *mem_size_array,
- int mem_count,
- TROVE_offset *stream_offset_array,
- TROVE_size *stream_size_array,
- int stream_count,
- struct aiocb *aiocb_array,
- int *aiocb_count,
- struct bstream_listio_state *lio_state
- );
+static int dbpf_bstream_aiocb_init(struct aiocb ** new_aiocb_p);
/* dbpf_bstream_listio_convert()
*
@@ -50,6 +38,29 @@ int dbpf_bstream_listio_convert(
* Stored state in lio_state so that processing can
* continue on subsequent calls.
*/
+static int dbpf_bstream_listio_convert(
+ int fd,
+ int op_type,
+ char **mem_offset_array,
+ TROVE_size *mem_size_array,
+ int mem_count,
+ TROVE_offset *stream_offset_array,
+ TROVE_size *stream_size_array,
+ int stream_count,
+ struct aiocb *aiocb_array,
+ int *aiocb_count,
+ struct bstream_listio_state *lio_state);
+
+static int dbpf_bstream_aio_issue_or_delay(
+ dbpf_queued_op_t * cur_op,
+ struct aiocb **aiocb_ptr_array,
+ int aiocb_inuse_count,
+ struct sigevent *sig,
+ int dec_first);
+
+static void dbpf_bstream_aio_start_delayed(
+ int dec_first);
+
extern gen_mutex_t dbpf_attr_cache_mutex;
@@ -60,14 +71,12 @@ static int s_dbpf_ios_in_progress = 0;
static gen_mutex_t s_dbpf_io_mutex = GEN_MUTEX_INITIALIZER;
dbpf_op_queue_s s_dbpf_io_ready_queue;
-static int issue_or_delay_io_operation(
- dbpf_queued_op_t * cur_op,
- struct aiocb **aiocb_ptr_array,
- int aiocb_inuse_count,
- struct sigevent *sig,
- int dec_first);
-static void start_delayed_ops_if_any(
- int dec_first);
+static char *list_proc_state_strings[] = {
+ "LIST_PROC_INITIALIZED",
+ "LIST_PROC_INPROGRESS ",
+ "LIST_PROC_ALLCONVERTED",
+ "LIST_PROC_ALLPOSTED",
+};
static inline int dbpf_bstream_rw_list(
TROVE_coll_id coll_id,
@@ -137,73 +146,21 @@ static void aio_progress_notification(
assert(state != OP_COMPLETED);
- /*
- we should iterate through the ops here to determine the
- error/return value of the op based on individual request
- error/return values. they're ignored for now, however.
- */
- for (i = 0; i < op_p->u.b_rw_list.aiocb_array_count; i++)
+ ret = dbpf_bstream_aio_check_progress(op_p);
+ if(ret < 0)
{
- if (aiocb_p[i].aio_lio_opcode == LIO_NOP)
- {
- continue;
- }
-
- /* aio_error gets the "errno" value of the individual op */
- ret = aio_error(&aiocb_p[i]);
- if (ret == 0)
- {
- /* aio_return gets the return value of the individual op */
- ret = aio_return(&aiocb_p[i]);
-
- gossip_debug(GOSSIP_TROVE_DEBUG, "%s: %s complete: "
- "aio_return() says %d [fd = %d]\n",
- __func__,
- ((op_p->type == BSTREAM_WRITE_LIST) ||
- (op_p->type == BSTREAM_WRITE_AT) ?
- "WRITE" : "READ"), ret, op_p->u.b_rw_list.fd);
-
- *(op_p->u.b_rw_list.out_size_p) += ret;
-
- /* mark as a NOP so we ignore it from now on */
- aiocb_p[i].aio_lio_opcode = LIO_NOP;
- }
- else
- {
- gossip_debug(GOSSIP_TROVE_DEBUG, "error %d (%s) from "
- "aio_error/aio_return on block %d; "
- "skipping\n", ret, strerror(ret), i);
-
- ret = -trove_errno_to_trove_error(ret);
- goto final_threaded_aio_cleanup;
- }
+ dbpf_bstream_aio_cleanup(op_p);
}
if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLPOSTED)
{
ret = 0;
- final_threaded_aio_cleanup:
- if ((op_p->type == BSTREAM_WRITE_AT) ||
- (op_p->type == BSTREAM_WRITE_LIST))
- {
- DBPF_AIO_SYNC_IF_NECESSARY(op_p, op_p->u.b_rw_list.fd, ret);
- }
-
- dbpf_open_cache_put(&op_p->u.b_rw_list.open_ref);
- op_p->u.b_rw_list.fd = -1;
-
- gossip_debug(GOSSIP_TROVE_DEBUG, "*** starting delayed ops if any "
- "(state is %s)\n",
- list_proc_state_strings[op_p->u.b_rw_list.
- list_proc_state]);
-
+ dbpf_bstream_aio_cleanup(op_p);
dbpf_move_op_to_completion_queue(cur_op, ret,
((ret ==
-TROVE_ECANCEL) ? OP_CANCELED :
OP_COMPLETED));
-
- start_delayed_ops_if_any(1);
}
else
{
@@ -212,63 +169,17 @@ static void aio_progress_notification(
list_proc_state_strings[op_p->u.b_rw_list.
list_proc_state]);
- /* no operations in progress; convert and post some more */
- op_p->u.b_rw_list.aiocb_array_count = AIOCB_ARRAY_SZ;
- op_p->u.b_rw_list.aiocb_array = aiocb_p;
-
- /* convert listio arguments into aiocb structures */
- aiocb_inuse_count = op_p->u.b_rw_list.aiocb_array_count;
- ret = dbpf_bstream_listio_convert(op_p->u.b_rw_list.fd,
- op_p->u.b_rw_list.opcode,
- op_p->u.b_rw_list.mem_offset_array,
- op_p->u.b_rw_list.mem_size_array,
- op_p->u.b_rw_list.mem_array_count,
- op_p->u.b_rw_list.stream_offset_array,
- op_p->u.b_rw_list.stream_size_array,
- op_p->u.b_rw_list.stream_array_count,
- aiocb_p,
- &aiocb_inuse_count,
- &op_p->u.b_rw_list.lio_state);
-
- if (ret == 1)
- {
- op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLCONVERTED;
- }
-
op_p->u.b_rw_list.sigev.sigev_notify = SIGEV_THREAD;
op_p->u.b_rw_list.sigev.sigev_notify_attributes = NULL;
op_p->u.b_rw_list.sigev.sigev_notify_function =
aio_progress_notification;
op_p->u.b_rw_list.sigev.sigev_value.sival_ptr = (void *) cur_op;
- /* mark the unused with LIO_NOPs */
- for (i = aiocb_inuse_count;
- i < op_p->u.b_rw_list.aiocb_array_count; i++)
- {
- /* mark these as NOPs and we'll ignore them */
- aiocb_p[i].aio_lio_opcode = LIO_NOP;
- }
-
- for (i = 0; i < aiocb_inuse_count; i++)
- {
- aiocb_ptr_array[i] = &aiocb_p[i];
- }
-
- assert(cur_op == op_p->u.b_rw_list.sigev.sigev_value.sival_ptr);
-
- if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLCONVERTED)
- {
- op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLPOSTED;
- }
-
- ret =
- issue_or_delay_io_operation(cur_op, aiocb_ptr_array,
- aiocb_inuse_count,
- &op_p->u.b_rw_list.sigev, 1);
-
+ /* no operations in progress; convert and post some more */
+ dbpf_bstream_aio_start_op(op_p->u.b_rw_list.queued_op_ptr, aiocb_p);
if (ret)
{
- gossip_lerr("issue_or_delay_io_operation() returned " "%d\n", ret);
+ gossip_lerr("dbpf_bstream_start_op() returned " "%d\n", ret);
}
}
}
@@ -376,7 +287,7 @@ static void start_delayed_ops_if_any(
gen_mutex_unlock(& s_dbpf_io_mutex);
}
-static int issue_or_delay_io_operation(
+static int dbpf_bstream_aio_issue_or_delay(
dbpf_queued_op_t * cur_op,
struct aiocb **aiocb_ptr_array,
int aiocb_inuse_count,
@@ -1023,73 +934,25 @@ static inline int dbpf_bstream_rw_list(
*out_op_id_p = dbpf_queued_op_queue(q_op_p, & dbpf_op_queue[OP_QUEUE_IO]);
#else
- op_p = &q_op_p->op;
-
- /*
- instead of queueing the op like most other trove operations,
- we're going to issue the system aio calls here to begin being
- serviced immediately. We'll check progress in the
- aio_progress_notification callback method; this array is freed
- in dbpf-op.c:dbpf_queued_op_free
- */
- aiocb_p = (struct aiocb *) malloc((AIOCB_ARRAY_SZ * sizeof(struct aiocb)));
- if (aiocb_p == NULL)
- {
- dbpf_open_cache_put(&q_op_p->op.u.b_rw_list.open_ref);
- return -TROVE_ENOMEM;
- }
-
- memset(aiocb_p, 0, (AIOCB_ARRAY_SZ * sizeof(struct aiocb)));
- for (i = 0; i < AIOCB_ARRAY_SZ; i++)
- {
- aiocb_p[i].aio_lio_opcode = LIO_NOP;
- aiocb_p[i].aio_sigevent.sigev_notify = SIGEV_NONE;
- }
-
- op_p->u.b_rw_list.aiocb_array_count = AIOCB_ARRAY_SZ;
- op_p->u.b_rw_list.aiocb_array = aiocb_p;
- op_p->u.b_rw_list.list_proc_state = LIST_PROC_INPROGRESS;
- /* convert listio arguments into aiocb structures */
- aiocb_inuse_count = op_p->u.b_rw_list.aiocb_array_count;
- ret = dbpf_bstream_listio_convert(op_p->u.b_rw_list.fd,
- op_p->u.b_rw_list.opcode,
- op_p->u.b_rw_list.mem_offset_array,
- op_p->u.b_rw_list.mem_size_array,
- op_p->u.b_rw_list.mem_array_count,
- op_p->u.b_rw_list.stream_offset_array,
- op_p->u.b_rw_list.stream_size_array,
- op_p->u.b_rw_list.stream_array_count,
- aiocb_p,
- &aiocb_inuse_count,
- &op_p->u.b_rw_list.lio_state);
+ /* setup aio event notification */
+ q_op_p->op.u.b_rw_list.sigev.sigev_notify = SIGEV_THREAD;
+ q_op_p->op.u.b_rw_list.sigev.sigev_notify_attributes = NULL;
+ q_op_p->op.u.b_rw_list.sigev.sigev_notify_function =
+ aio_progress_notification;
+ q_op_p->op.u.b_rw_list.sigev.sigev_value.sival_ptr = (void *) q_op_p;
- if (ret == 1)
+ ret = dbpf_bstream_aiocb_init(&aiocb_p);
+ if(ret < 0)
{
- op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLCONVERTED;
- }
-
- op_p->u.b_rw_list.sigev.sigev_notify = SIGEV_THREAD;
- op_p->u.b_rw_list.sigev.sigev_notify_attributes = NULL;
- op_p->u.b_rw_list.sigev.sigev_notify_function = aio_progress_notification;
- op_p->u.b_rw_list.sigev.sigev_value.sival_ptr = (void *) q_op_p;
-
- /* mark unused with LIO_NOPs */
- for (i = aiocb_inuse_count; i < op_p->u.b_rw_list.aiocb_array_count; i++)
- {
- aiocb_p[i].aio_lio_opcode = LIO_NOP;
- }
-
- for (i = 0; i < aiocb_inuse_count; i++)
- {
- aiocb_ptr_array[i] = &aiocb_p[i];
+ return ret;
}
- assert(q_op_p == op_p->u.b_rw_list.sigev.sigev_value.sival_ptr);
-
- if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLCONVERTED)
+ ret = dbpf_bstream_aio_start_op(q_op_p, aiocb_p);
+ if(ret < 0)
{
- op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLPOSTED;
+ dbpf_open_cache_put(&q_op_p->op.u.b_rw_list.open_ref);
+ return ret;
}
dbpf_op_change_status(q_op_p, OP_IN_SERVICE);
@@ -1097,14 +960,6 @@ static inline int dbpf_bstream_rw_list(
id_gen_safe_register(&q_op_p->op.id, q_op_p);
*out_op_id_p = q_op_p->op.id;
- ret =
- issue_or_delay_io_operation(q_op_p, aiocb_ptr_array, aiocb_inuse_count,
- &op_p->u.b_rw_list.sigev, 0);
-
- if (ret)
- {
- return ret;
- }
#endif
return 0;
}
@@ -1150,174 +1005,46 @@ static inline int dbpf_bstream_rw_list(
static int dbpf_bstream_rw_list_op_svc(
struct dbpf_op *op_p)
{
- int ret = -TROVE_EINVAL, i = 0, aiocb_inuse_count = 0;
+ int ret = -TROVE_EINVAL;
int op_in_progress_count = 0;
- struct aiocb *aiocb_p = NULL, *aiocb_ptr_array[AIOCB_ARRAY_SZ];
if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_INITIALIZED)
{
- /*
- first call; need to allocate aiocb array and ptr array;
- this array is freed in dbpf-op.c:dbpf_queued_op_free
+ /* The rw_list has just been posted and no lio_listio operations
+ * have been posted yet, so we do that and return
*/
- aiocb_p = malloc(AIOCB_ARRAY_SZ * sizeof(struct aiocb));
- if (aiocb_p == NULL)
- {
- return -TROVE_ENOMEM;
- }
+ op_p->u.b_rw_list.sigev.sigev_notify = SIGEV_NONE;
- memset(aiocb_p, 0, AIOCB_ARRAY_SZ * sizeof(struct aiocb));
- for (i = 0; i < AIOCB_ARRAY_SZ; i++)
+ ret = dbpf_bstream_aiocb_init(&aiocb_p);
+ if(ret < 0)
{
- aiocb_p[i].aio_lio_opcode = LIO_NOP;
- aiocb_p[i].aio_sigevent.sigev_notify = SIGEV_NONE;
+ return ret;
}
- op_p->u.b_rw_list.aiocb_array_count = AIOCB_ARRAY_SZ;
- op_p->u.b_rw_list.aiocb_array = aiocb_p;
- op_p->u.b_rw_list.list_proc_state = LIST_PROC_INPROGRESS;
- op_p->u.b_rw_list.sigev.sigev_notify = SIGEV_NONE;
+ ret = dbpf_bstream_aio_start_op(
+ (dbpf_queued_op_t *)op_p->u.b_rw_list.queued_op_ptr, aiocb_p);
+ return ret;
}
- else
- {
- /* operations potentially in progress */
- aiocb_p = op_p->u.b_rw_list.aiocb_array;
-
- /* check to see how we're progressing on previous operations */
- for (i = 0; i < op_p->u.b_rw_list.aiocb_array_count; i++)
- {
- if (aiocb_p[i].aio_lio_opcode == LIO_NOP)
- {
- continue;
- }
-
- /* gets the "errno" value of the individual op */
- ret = aio_error(&aiocb_p[i]);
- if (ret == 0)
- {
- /*
- this particular operation completed w/out error.
- gets the return value of the individual op
- */
- ret = aio_return(&aiocb_p[i]);
-
- gossip_debug(GOSSIP_TROVE_DEBUG, "%s: %s complete: "
- "aio_return() ret %d (fd %d)\n",
- __func__,
- ((op_p->type == BSTREAM_WRITE_LIST) ||
- (op_p->type == BSTREAM_WRITE_AT) ?
- "WRITE" : "READ"), ret, op_p->u.b_rw_list.fd);
-
- /* aio_return doesn't seem to return bytes read/written if
- * sigev_notify == SIGEV_NONE, so we set the out size
- * from what's requested. For reads we just leave as zero,
- * which ends up being OK,
- * since the amount read (if past EOF its less than requested)
- * is determined from the bstream size.
- */
- if (op_p->type == BSTREAM_WRITE_LIST ||
- op_p->type == BSTREAM_WRITE_AT)
- {
- *(op_p->u.b_rw_list.out_size_p) += aiocb_p[i].aio_nbytes;
- }
- /* mark as a NOP so we ignore it from now on */
- aiocb_p[i].aio_lio_opcode = LIO_NOP;
- }
- else if (ret != EINPROGRESS)
- {
- gossip_err("%s: aio_error on block %d, skipping: %s\n",
- __func__, i, strerror(ret));
- ret = -trove_errno_to_trove_error(ret);
- goto final_aio_cleanup;
- }
- else
- {
- /* otherwise the operation is still in progress; skip it */
- op_in_progress_count++;
- }
- }
- }
+ /* operations potentially in progress */
+ aiocb_p = op_p->u.b_rw_list.aiocb_array;
- /* if we're not done with the last set of operations, break out */
- if (op_in_progress_count > 0)
+ ret = dbpf_bstream_aio_check_progress(aiocb_p);
+ if(ret == EINPROGRESS)
{
return 0;
}
- else if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLPOSTED)
+ else if(ret < 0)
{
- /* we've posted everything, and it all completed */
- ret = 1;
-
- final_aio_cleanup:
- if ((op_p->type == BSTREAM_WRITE_AT) ||
- (op_p->type == BSTREAM_WRITE_LIST))
- {
- DBPF_AIO_SYNC_IF_NECESSARY(op_p, op_p->u.b_rw_list.fd, ret);
- }
-
- dbpf_open_cache_put(&op_p->u.b_rw_list.open_ref);
- op_p->u.b_rw_list.fd = -1;
-
- start_delayed_ops_if_any(1);
- return ret;
+ dbpf_bstream_aio_cleanup(op_p);
+ return 1;
}
- else
- {
- /* no operations in progress; convert and post some more */
- aiocb_inuse_count = op_p->u.b_rw_list.aiocb_array_count;
- ret = dbpf_bstream_listio_convert(op_p->u.b_rw_list.fd,
- op_p->u.b_rw_list.opcode,
- op_p->u.b_rw_list.mem_offset_array,
- op_p->u.b_rw_list.mem_size_array,
- op_p->u.b_rw_list.mem_array_count,
- op_p->u.b_rw_list.stream_offset_array,
- op_p->u.b_rw_list.stream_size_array,
- op_p->u.b_rw_list.stream_array_count,
- aiocb_p,
- &aiocb_inuse_count,
- &op_p->u.b_rw_list.lio_state);
-
- if (ret == 1)
- {
- op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLCONVERTED;
- }
-
- /* mark unused with LIO_NOPs */
- for (i = aiocb_inuse_count;
- i < op_p->u.b_rw_list.aiocb_array_count; i++)
- {
- aiocb_p[i].aio_lio_opcode = LIO_NOP;
- }
-
- for (i = 0; i < aiocb_inuse_count; i++)
- {
- aiocb_ptr_array[i] = &aiocb_p[i];
- }
-
- if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLCONVERTED)
- {
- op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLPOSTED;
- }
-
- /*
- we use a reverse mapped ptr for I/O operations in order to
- access the queued op from the op. this is only useful for
- the delayed io operation scheme. it's initialized in
- dbpf_bstream_rw_list
- */
- assert(op_p->u.b_rw_list.queued_op_ptr);
- ret = issue_or_delay_io_operation((dbpf_queued_op_t *) op_p->u.
- b_rw_list.queued_op_ptr,
- aiocb_ptr_array, aiocb_inuse_count,
- &op_p->u.b_rw_list.sigev, 1);
-
- if (ret)
- {
- return ret;
- }
- return 0;
+ if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLPOSTED)
+ {
+ /* we've posted everything, and it all completed */
+ dbpf_bstream_aio_cleanup(op_p);
+ return 1;
}
}
#endif
@@ -1510,6 +1237,176 @@ struct TROVE_bstream_ops dbpf_bstream_op
dbpf_bstream_write_list,
dbpf_bstream_flush
};
+
+static int dbpf_bstream_aiocb_init(struct aiocb ** new_aiocb_p)
+{
+ struct aiocb * aiocb_p;
+
+ aiocb_p = malloc(AIOCB_ARRAY_SZ * sizeof(struct aiocb));
+ if(aiocb_p == NULL)
+ {
+ return -TROVE_ENOMEM;
+ }
+ memset(aiocb_p, 0, AIOCB_ARRAY_SZ * sizeof(struct aiocb));
+ for (i = 0; i < AIOCB_ARRAY_SZ; i++)
+ {
+ aiocb_p[i].aio_lio_opcode = LIO_NOP;
+ aiocb_p[i].aio_sigevent.sigev_notify = SIGEV_NONE;
+ }
+
+ return 0;
+}
+
+static int dbpf_bstream_aio_start_op(dbpf_queued_op_t * q_op_p,
+ struct aiocb * aiocb_p)
+{
+ struct aiocb * aiocb_ptr_array[AIOCB_ARRAY_SZ];
+ int aiocb_inuse_count;
+ int ret, i;
+ struct dbpf_op * op_p;
+
+ op_p = &q_op_p->op;
+
+ op_p->u.b_rw_list.aiocb_array_count = AIOCB_ARRAY_SZ;
+ op_p->u.b_rw_list.aiocb_array = aiocb_p;
+ op_p->u.b_rw_list.list_proc_state = LIST_PROC_INPROGRESS;
+
+ /* convert listio arguments into aiocb structures */
+ aiocb_inuse_count = op_p->u.b_rw_list.aiocb_array_count;
+ ret = dbpf_bstream_listio_convert(op_p->u.b_rw_list.fd,
+ op_p->u.b_rw_list.opcode,
+ op_p->u.b_rw_list.mem_offset_array,
+ op_p->u.b_rw_list.mem_size_array,
+ op_p->u.b_rw_list.mem_array_count,
+ op_p->u.b_rw_list.stream_offset_array,
+ op_p->u.b_rw_list.stream_size_array,
+ op_p->u.b_rw_list.stream_array_count,
+ aiocb_p,
+ &aiocb_inuse_count,
+ &op_p->u.b_rw_list.lio_state);
+
+ if (ret == 1)
+ {
+ op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLCONVERTED;
+ }
+
+ /* mark unused with LIO_NOPs */
+ for (i = aiocb_inuse_count; i < op_p->u.b_rw_list.aiocb_array_count; i++)
+ {
+ aiocb_p[i].aio_lio_opcode = LIO_NOP;
+ }
+
+ for (i = 0; i < aiocb_inuse_count; i++)
+ {
+ aiocb_ptr_array[i] = &aiocb_p[i];
+ }
+
+ if (op_p->u.b_rw_list.list_proc_state == LIST_PROC_ALLCONVERTED)
+ {
+ op_p->u.b_rw_list.list_proc_state = LIST_PROC_ALLPOSTED;
+ }
+
+ ret =
+ dbpf_bstream_aio_issue_or_delay(
+ q_op_p, aiocb_ptr_array, aiocb_inuse_count,
+ &op_p->u.b_rw_list.sigev, 0);
+
+ return ret;
+}
+
+static int dbpf_bstream_aio_check_progress(struct dbpf_op * op_p)
+{
+ struct aiocb * aiocb_p;
+
+ /* operations potentially in progress */
+ aiocb_p = op_p->u.b_rw_list.aiocb_array;
+
+ /*
+ we should iterate through the ops here to determine the
+ error/return value of the op based on individual request
+ error/return values. they're ignored for now, however.
+ */
+ for (i = 0; i < op_p->u.b_rw_list.aiocb_array_count; i++)
+ {
+ if (aiocb_p[i].aio_lio_opcode == LIO_NOP)
+ {
+ continue;
+ }
+
+ /* aio_error gets the "errno" value of the individual op */
+ ret = aio_error(&aiocb_p[i]);
+ if(ret == EINPROGRESS)
+ {
+ op_in_progress_count++;
+ continue;
+ }
+ else if(ret < 0)
+ {
+ gossip_debug(GOSSIP_TROVE_DEBUG, "error %d (%s) from "
+ "aio_error/aio_return on block %d; "
+ "skipping\n", ret, strerror(ret), i);
+
+ ret = -trove_errno_to_trove_error(ret);
+ return ret;
+ }
+
+ /* aio_return gets the return value of the individual op */
+ ret = aio_return(&aiocb_p[i]);
+
+ gossip_debug(GOSSIP_TROVE_DEBUG, "%s: %s complete: "
+ "aio_return() says %d [fd = %d]\n",
+ __func__,
+ ((op_p->type == BSTREAM_WRITE_LIST) ||
+ (op_p->type == BSTREAM_WRITE_AT) ?
+ "WRITE" : "READ"), ret, op_p->u.b_rw_list.fd);
+
+ if(op_p->u.b_rw_list.sigev.sigev_notify == SIGEV_NONE)
+ {
+ /* aio_return doesn't seem to return bytes read/written if
+ * sigev_notify == SIGEV_NONE, so we set the out size
+ * from what's requested. For reads we just leave as zero,
+ * which ends up being OK,
+ * since the amount read (if past EOF its less than requested)
+ * is determined from the bstream size.
+ */
+ if (op_p->type == BSTREAM_WRITE_LIST ||
+ op_p->type == BSTREAM_WRITE_AT)
+ {
+ *(op_p->u.b_rw_list.out_size_p) += aiocb_p[i].aio_nbytes;
+ }
+ }
+ else
+ {
+ *(op_p->u.b_rw_list.out_size_p) += ret;
+ }
+
+ /* mark as a NOP so we ignore it from now on */
+ aiocb_p[i].aio_lio_opcode = LIO_NOP;
+ }
+
+ if(op_in_progress_count)
+ {
+ return EINPROGRESS;
+ }
+ return 0;
+}
+
+static int dbpf_bstream_aio_cleanup(struct dbpf_op * op_p)
+{
+ DBPF_AIO_SYNC_IF_NECESSARY(op_p, op_p->u.b_rw_list.fd, ret);
+
+ dbpf_open_cache_put(&op_p->u.b_rw_list.open_ref);
+ op_p->u.b_rw_list.fd = -1;
+
+ gossip_debug(GOSSIP_TROVE_DEBUG, "*** starting delayed ops if any "
+ "(state is %s)\n",
+ list_proc_state_strings[op_p->u.b_rw_list.
+ list_proc_state]);
+ start_delayed_ops_if_any(1);
+ return 0;
+}
+
+
#endif /* use AIO */
/*
* Local variables:
Index: dbpf-bstream-threaded.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/Attic/dbpf-bstream-threaded.c,v
diff -p -u -r1.1.2.12 -r1.1.2.13
--- dbpf-bstream-threaded.c 3 Aug 2006 13:52:07 -0000 1.1.2.12
+++ dbpf-bstream-threaded.c 3 Aug 2006 16:06:56 -0000 1.1.2.13
@@ -4,6 +4,8 @@
* See COPYING in top-level directory.
*/
+#include "dbpf.h"
+
/*
* Needed for O_DIRECT disk access
*/
@@ -28,7 +30,6 @@
#include "pvfs2-debug.h"
#include "trove.h"
#include "trove-internal.h"
-#include "dbpf.h"
#include "dbpf-op-queue.h"
#include "dbpf-attr-cache.h"
#include "pint-event.h"
Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.74.2.4 -r1.74.2.5
--- dbpf.h 24 Jul 2006 20:04:11 -0000 1.74.2.4
+++ dbpf.h 3 Aug 2006 16:06:56 -0000 1.74.2.5
@@ -11,6 +11,7 @@
extern "C" {
#endif
+#include <sys/types.h>
#include <db.h>
#include <aio.h>
#include "trove.h"
@@ -542,7 +543,9 @@ PVFS_error dbpf_db_error_to_trove_error(
#define DBPF_AIO_SYNC_IF_NECESSARY(dbpf_op_ptr, fd, ret) \
do { \
int tmp_ret, tmp_errno; \
- if (dbpf_op_ptr->flags & TROVE_SYNC) \
+ if ((dbpf_op_ptr->flags & TROVE_SYNC) && \
+ ((dbpf_op_ptr->type == BSTREAM_WRITE_AT) || \
+ (dbpf_op_ptr->type == BSTREAM_WRITE_LIST))) \
{ \
if ((tmp_ret = DBPF_SYNC(fd)) != 0) \
{ \
More information about the Pvfs2-cvs mailing list