[Pvfs2-cvs] commit by nlmills in
pvfs2/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c
CVS commit program
cvs at parl.clemson.edu
Tue Aug 25 13:56:16 EDT 2009
Update of /anoncvs/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb1:/tmp/cvs-serv5511/src/io/flow/flowproto-bmi-trove
Modified Files:
Tag: cu-security-branch
flowproto-multiqueue.c
Log Message:
merged in changes from summer at LANL
Index: flowproto-multiqueue.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.117.10.1 -r1.117.10.2
--- flowproto-multiqueue.c 16 May 2008 15:15:41 -0000 1.117.10.1
+++ flowproto-multiqueue.c 25 Aug 2009 17:56:16 -0000 1.117.10.2
@@ -106,7 +106,8 @@ static void handle_io_error(
static int cancel_pending_bmi(
struct qlist_head *list);
static int cancel_pending_trove(
- struct qlist_head *list);
+ struct qlist_head *list,
+ TROVE_coll_id coll_id);
#ifdef __PVFS2_TROVE_SUPPORT__
typedef struct
@@ -473,6 +474,7 @@ int fp_multiqueue_cancel(flow_descriptor
/* NOTE: set flow error class bit so that system interface understands
* that this may be a retry-able error
*/
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(-(PVFS_ECANCEL|PVFS_ERROR_FLOW), NULL, flow_data);
if(flow_data->parent->state == FLOW_COMPLETE)
{
@@ -709,6 +711,7 @@ static void bmi_recv_callback_fn(void *u
PVFS_size bytes_processed = 0;
void *tmp_buffer;
void *tmp_user_ptr;
+ int sync_mode = 0;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue bmi_recv_callback_fn, error code: %d, flow: %p.\n",
@@ -718,6 +721,7 @@ static void bmi_recv_callback_fn(void *u
if(error_code != 0 || flow_data->parent->error_code != 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(error_code, q_item, flow_data);
return;
}
@@ -737,6 +741,15 @@ static void bmi_recv_callback_fn(void *u
tmp_user_ptr = result_tmp;
assert(result_tmp->result.bytes);
+ if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+ {
+ /* This is the last write operation for this flow. Set sync
+ * flag if needed
+ */
+ sync_mode = get_data_sync_mode(
+ q_item->parent->dest.u.trove.coll_id);
+ }
+
ret = trove_bstream_write_list(
q_item->parent->dest.u.trove.coll_id,
q_item->parent->dest.u.trove.handle,
@@ -747,16 +760,18 @@ static void bmi_recv_callback_fn(void *u
result_tmp->result.size_array,
result_tmp->result.segs,
&q_item->out_size,
- get_data_sync_mode(q_item->parent->dest.u.trove.coll_id),
+ sync_mode,
NULL,
&result_tmp->trove_callback,
global_trove_context,
- &result_tmp->posted_id);
+ &result_tmp->posted_id,
+ q_item->parent->hints);
result_tmp = result_tmp->next;
if(ret < 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(ret, q_item, flow_data);
return;
}
@@ -782,7 +797,8 @@ static void bmi_recv_callback_fn(void *u
if(!q_item->buffer)
{
/* if the q_item has not been used, allocate a buffer */
- q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
+ q_item->buffer = BMI_memalloc(
+ q_item->parent->src.u.bmi.address,
q_item->parent->buffer_size, BMI_RECV);
/* TODO: error handling */
assert(q_item->buffer);
@@ -850,19 +866,26 @@ static void bmi_recv_callback_fn(void *u
flow_data->total_bytes_processed += bytes_processed;
+ gossip_debug(GOSSIP_DIRECTIO_DEBUG,
+ "offset %llu, buffer ptr: %p\n",
+ llu(q_item->result_chain.result.offset_array[0]),
+ q_item->buffer);
+
/* TODO: what if we recv less than expected? */
ret = BMI_post_recv(&q_item->posted_id,
- q_item->parent->src.u.bmi.address,
- q_item->buffer,
- flow_data->parent->buffer_size,
- &tmp_actual_size,
+ q_item->parent->src.u.bmi.address,
+ ((char *)q_item->buffer),
+ flow_data->parent->buffer_size,
+ &tmp_actual_size,
BMI_PRE_ALLOC,
q_item->parent->tag,
&q_item->bmi_callback,
- global_bmi_context);
-
+ global_bmi_context,
+ (bmi_hint)q_item->parent->hints);
+
if(ret < 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(ret, q_item, flow_data);
return;
}
@@ -873,7 +896,7 @@ static void bmi_recv_callback_fn(void *u
bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
}
}
-
+
return;
}
@@ -905,6 +928,7 @@ static void trove_read_callback_fn(void
if(error_code != 0 || flow_data->parent->error_code != 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(error_code, q_item, flow_data);
return;
}
@@ -952,7 +976,8 @@ static void trove_read_callback_fn(void
BMI_PRE_ALLOC,
q_item->parent->tag,
&q_item->bmi_callback,
- global_bmi_context);
+ global_bmi_context,
+ (bmi_hint)q_item->parent->hints);
flow_data->next_seq_to_send++;
if(q_item->last)
{
@@ -973,6 +998,7 @@ static void trove_read_callback_fn(void
if(ret < 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(ret, q_item, flow_data);
return;
}
@@ -1026,6 +1052,7 @@ static int bmi_send_callback_fn(void *us
if(error_code != 0 || flow_data->parent->error_code != 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(error_code, q_item, flow_data);
if(flow_data->parent->state == FLOW_COMPLETE)
return(1);
@@ -1091,7 +1118,8 @@ static int bmi_send_callback_fn(void *us
else
{
/* if the q_item has not been used, allocate a buffer */
- q_item->buffer = BMI_memalloc(q_item->parent->dest.u.bmi.address,
+ q_item->buffer = BMI_memalloc(
+ q_item->parent->dest.u.bmi.address,
q_item->parent->buffer_size, BMI_SEND);
/* TODO: error handling */
assert(q_item->buffer);
@@ -1254,12 +1282,14 @@ static int bmi_send_callback_fn(void *us
NULL,
&result_tmp->trove_callback,
global_trove_context,
- &result_tmp->posted_id);
+ &result_tmp->posted_id,
+ flow_data->parent->hints);
result_tmp = result_tmp->next;
if(ret < 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(ret, q_item, flow_data);
if(flow_data->parent->state == FLOW_COMPLETE)
return(1);
@@ -1304,6 +1334,7 @@ static void trove_write_callback_fn(void
if(error_code != 0 || flow_data->parent->error_code != 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(error_code, q_item, flow_data);
return;
}
@@ -1359,7 +1390,8 @@ static void trove_write_callback_fn(void
else
{
/* if the q_item has not been used, allocate a buffer */
- q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
+ q_item->buffer = BMI_memalloc(
+ q_item->parent->src.u.bmi.address,
q_item->parent->buffer_size, BMI_RECV);
/* TODO: error handling */
assert(q_item->buffer);
@@ -1443,19 +1475,25 @@ static void trove_write_callback_fn(void
return;
}
+ gossip_debug(GOSSIP_DIRECTIO_DEBUG,
+ "offset %llu, buffer ptr: %p\n",
+ llu(q_item->result_chain.result.offset_array[0]),
+ q_item->buffer);
/* TODO: what if we recv less than expected? */
ret = BMI_post_recv(&q_item->posted_id,
q_item->parent->src.u.bmi.address,
- q_item->buffer,
+ ((char *)q_item->buffer),
flow_data->parent->buffer_size,
&tmp_actual_size,
BMI_PRE_ALLOC,
q_item->parent->tag,
&q_item->bmi_callback,
- global_bmi_context);
-
+ global_bmi_context,
+ (bmi_hint)q_item->parent->hints);
+
if(ret < 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(ret, q_item, flow_data);
return;
}
@@ -1495,10 +1533,10 @@ static void cleanup_buffers(struct fp_pr
{
if(flow_data->prealloc_array[i].buffer)
{
- BMI_memfree(flow_data->parent->src.u.bmi.address,
- flow_data->prealloc_array[i].buffer,
- flow_data->parent->buffer_size,
- BMI_RECV);
+ BMI_memfree(flow_data->parent->src.u.bmi.address,
+ flow_data->prealloc_array[i].buffer,
+ flow_data->parent->buffer_size,
+ BMI_RECV);
}
result_tmp = &(flow_data->prealloc_array[i].result_chain);
do{
@@ -1583,6 +1621,7 @@ static void mem_to_bmi_callback_fn(void
if(error_code != 0 || flow_data->parent->error_code != 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(error_code, q_item, flow_data);
return;
}
@@ -1632,7 +1671,8 @@ static void mem_to_bmi_callback_fn(void
{
flow_data->intermediate = BMI_memalloc(
flow_data->parent->dest.u.bmi.address,
- flow_data->parent->buffer_size, BMI_SEND);
+ flow_data->parent->buffer_size,
+ BMI_SEND);
/* TODO: error handling */
assert(flow_data->intermediate);
}
@@ -1718,10 +1758,12 @@ static void mem_to_bmi_callback_fn(void
buffer_type,
q_item->parent->tag,
&q_item->bmi_callback,
- global_bmi_context);
+ global_bmi_context,
+ (bmi_hint)q_item->parent->hints);
if(ret < 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(ret, q_item, flow_data);
return;
}
@@ -1766,6 +1808,7 @@ static void bmi_to_mem_callback_fn(void
if(error_code != 0 || flow_data->parent->error_code != 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(error_code, q_item, flow_data);
return;
}
@@ -1864,7 +1907,8 @@ static void bmi_to_mem_callback_fn(void
{
flow_data->intermediate = BMI_memalloc(
flow_data->parent->src.u.bmi.address,
- flow_data->parent->buffer_size, BMI_RECV);
+ flow_data->parent->buffer_size,
+ BMI_RECV);
/* TODO: error handling */
assert(flow_data->intermediate);
}
@@ -1915,10 +1959,12 @@ static void bmi_to_mem_callback_fn(void
buffer_type,
q_item->parent->tag,
&q_item->bmi_callback,
- global_bmi_context);
+ global_bmi_context,
+ (bmi_hint)q_item->parent->hints);
if(ret < 0)
{
+ gossip_err("%s: I/O error occurred\n", __func__);
handle_io_error(ret, q_item, flow_data);
return;
}
@@ -1988,7 +2034,7 @@ static void handle_io_error(
}
else if (src == TROVE_ENDPOINT && dest == BMI_ENDPOINT)
{
- ret = cancel_pending_trove(&flow_data->src_list);
+ ret = cancel_pending_trove(&flow_data->src_list, flow_data->parent->src.u.trove.coll_id);
flow_data->cleanup_pending_count += ret;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue canceled %d trove-bmi Trove ops.\n", ret);
@@ -2003,7 +2049,7 @@ static void handle_io_error(
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue canceled %d bmi-trove BMI ops.\n", ret);
flow_data->cleanup_pending_count += ret;
- ret = cancel_pending_trove(&flow_data->dest_list);
+ ret = cancel_pending_trove(&flow_data->dest_list, flow_data->parent->dest.u.trove.coll_id);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue canceled %d bmi-trove Trove ops.\n", ret);
flow_data->cleanup_pending_count += ret;
@@ -2086,7 +2132,7 @@ static int cancel_pending_bmi(struct qli
*
* returns the number of operations that were canceled
*/
-static int cancel_pending_trove(struct qlist_head *list)
+static int cancel_pending_trove(struct qlist_head *list, TROVE_coll_id coll_id)
{
struct qlist_head *tmp_link;
struct fp_queue_item *q_item = NULL;
@@ -2111,7 +2157,7 @@ static int cancel_pending_trove(struct q
count++;
ret = PINT_thread_mgr_trove_cancel(
old_result_tmp->posted_id,
- q_item->parent->src.u.trove.coll_id,
+ coll_id,
&old_result_tmp->trove_callback);
if(ret < 0)
{
More information about the Pvfs2-cvs
mailing list