[Pvfs2-cvs] commit by aching in pvfs2-1/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c
CVS commit program
cvs at parl.clemson.edu
Mon Jul 21 14:20:11 EDT 2008
Update of /projects/cvsroot/pvfs2-1/src/io/flow/flowproto-bmi-trove
In directory parlweb1:/tmp/cvs-serv19729/io/flow/flowproto-bmi-trove
Modified Files:
Tag: locking-branch
flowproto-multiqueue.c
Log Message:
Reverse merged and ported to HEAD.
Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.113 -r1.113.10.1
--- flowproto-multiqueue.c 16 Aug 2006 01:23:19 -0000 1.113
+++ flowproto-multiqueue.c 21 Jul 2008 18:20:11 -0000 1.113.10.1
@@ -144,17 +144,17 @@ static inline void bmi_send_callback_wra
{
struct fp_private_data *flow_data =
PRIVATE_FLOW(((struct fp_queue_item*)user_ptr)->parent);
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
bmi_send_callback_fn(user_ptr, actual_size, error_code, 0);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
@@ -164,16 +164,16 @@ static inline void bmi_recv_callback_wra
{
struct fp_private_data *flow_data =
PRIVATE_FLOW(((struct fp_queue_item*)user_ptr)->parent);
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
bmi_recv_callback_fn(user_ptr, actual_size, error_code);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
@@ -183,16 +183,16 @@ static inline void trove_read_callback_w
struct fp_private_data *flow_data =
PRIVATE_FLOW(((struct
result_chain_entry*)user_ptr)->q_item->parent);
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
trove_read_callback_fn(user_ptr, error_code);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
@@ -202,16 +202,16 @@ static inline void trove_write_callback_
struct fp_private_data *flow_data =
PRIVATE_FLOW(((struct
result_chain_entry*)user_ptr)->q_item->parent);
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
trove_write_callback_fn(user_ptr, error_code);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
@@ -233,16 +233,16 @@ static void mem_to_bmi_callback_wrapper(
{
struct fp_private_data *flow_data =
PRIVATE_FLOW(((struct fp_queue_item*)user_ptr)->parent);
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
mem_to_bmi_callback_fn(user_ptr, actual_size, error_code);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
@@ -255,18 +255,17 @@ static void bmi_to_mem_callback_wrapper(
assert(flow_data);
assert(flow_data->parent);
- assert(flow_data->parent->flow_mutex);
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
bmi_to_mem_callback_fn(user_ptr, actual_size, error_code);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
@@ -459,8 +458,8 @@ int fp_multiqueue_cancel(flow_descriptor
{
struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
- gossip_err("Flow proto cancel called on %p\n", flow_d);
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gossip_err("%s: flow proto cancel called on %p\n", __func__, flow_d);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
/*
if the flow is already marked as complete, then there is nothing
to do
@@ -468,14 +467,16 @@ int fp_multiqueue_cancel(flow_descriptor
if(flow_d->state != FLOW_COMPLETE)
{
gossip_debug(GOSSIP_CANCEL_DEBUG,
- "PINT_flow_cancel() called on active flow, %lld "
- "bytes transferred.\n",
- lld(flow_d->total_transferred));
+ "%s: called on active flow, %lld bytes transferred.\n",
+ __func__, lld(flow_d->total_transferred));
assert(flow_d->state == FLOW_TRANSMITTING);
- handle_io_error(-PVFS_ECANCEL, NULL, flow_data);
+ /* NOTE: set flow error class bit so that system interface understands
+ * that this may be a retry-able error
+ */
+ handle_io_error(-(PVFS_ECANCEL|PVFS_ERROR_FLOW), NULL, flow_data);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
return(0);
}
@@ -483,10 +484,10 @@ int fp_multiqueue_cancel(flow_descriptor
else
{
gossip_debug(GOSSIP_CANCEL_DEBUG,
- "PINT_flow_cancel() called on already completed "
- "flow; doing nothing.\n");
+ "%s: called on already completed flow; doing nothing.\n",
+ __func__);
}
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
return(0);
}
@@ -583,16 +584,16 @@ int fp_multiqueue_post(flow_descriptor
qlist_add_tail(&flow_data->prealloc_array[i].list_link,
&flow_data->empty_list);
}
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
bmi_to_mem_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
else if(flow_d->src.endpoint_id == MEM_ENDPOINT &&
@@ -609,16 +610,16 @@ int fp_multiqueue_post(flow_descriptor
qlist_add_tail(&flow_data->prealloc_array[i].list_link,
&flow_data->empty_list);
}
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
mem_to_bmi_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
#ifdef __PVFS2_TROVE_SUPPORT__
@@ -626,7 +627,7 @@ int fp_multiqueue_post(flow_descriptor
flow_d->dest.endpoint_id == BMI_ENDPOINT)
{
flow_data->initial_posts = flow_d->buffers_per_flow;
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
for(i=0; i<flow_d->buffers_per_flow; i++)
{
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
@@ -635,18 +636,17 @@ int fp_multiqueue_post(flow_descriptor
bmi_send_callback_fn(&(flow_data->prealloc_array[i]), 0, 0, 1);
if(flow_data->dest_last_posted)
{
- flow_data->initial_posts = 0;
break;
}
}
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
else if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
@@ -666,16 +666,16 @@ int fp_multiqueue_post(flow_descriptor
&flow_data->prealloc_array[0];
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue forcing trove_write_callback_fn.\n");
- gen_mutex_lock(flow_data->parent->flow_mutex);
+ gen_mutex_lock(&flow_data->parent->flow_mutex);
trove_write_callback_fn(&(flow_data->prealloc_array[0].result_chain), 0);
if(flow_data->parent->state == FLOW_COMPLETE)
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
FLOW_CLEANUP(flow_data);
}
else
{
- gen_mutex_unlock(flow_data->parent->flow_mutex);
+ gen_mutex_unlock(&flow_data->parent->flow_mutex);
}
}
#endif
@@ -709,6 +709,7 @@ static void bmi_recv_callback_fn(void *u
PVFS_size bytes_processed = 0;
void *tmp_buffer;
void *tmp_user_ptr;
+ int sync_mode = 0;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue bmi_recv_callback_fn, error code: %d, flow: %p.\n",
@@ -737,6 +738,15 @@ static void bmi_recv_callback_fn(void *u
tmp_user_ptr = result_tmp;
assert(result_tmp->result.bytes);
+ if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+ {
+ /* This is the last write operation for this flow. Set sync
+ * flag if needed
+ */
+ sync_mode = get_data_sync_mode(
+ q_item->parent->dest.u.trove.coll_id);
+ }
+
ret = trove_bstream_write_list(
q_item->parent->dest.u.trove.coll_id,
q_item->parent->dest.u.trove.handle,
@@ -747,7 +757,7 @@ static void bmi_recv_callback_fn(void *u
result_tmp->result.size_array,
result_tmp->result.segs,
&q_item->out_size,
- get_data_sync_mode(q_item->parent->dest.u.trove.coll_id),
+ sync_mode,
NULL,
&result_tmp->trove_callback,
global_trove_context,
@@ -955,7 +965,10 @@ static void trove_read_callback_fn(void
global_bmi_context);
flow_data->next_seq_to_send++;
if(q_item->last)
+ {
+ flow_data->initial_posts = 0;
flow_data->dest_last_posted = 1;
+ }
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"%s: (post send time) ini posts: %d, pending: %d, last: %d\n",
__func__,
@@ -1169,7 +1182,10 @@ static int bmi_send_callback_fn(void *us
* is no work to do, trigger manually
*/
if(flow_data->total_bytes_processed == 0)
+ {
+ flow_data->initial_posts = 0;
flow_data->dest_last_posted = 1;
+ }
}
if(bytes_processed == 0)
@@ -1213,6 +1229,7 @@ static int bmi_send_callback_fn(void *us
* to prevent further trying to start other qitems from being
* posted
*/
+ flow_data->initial_posts = 0;
flow_data->dest_last_posted = 1;
return 0;
}
@@ -1940,6 +1957,7 @@ static void handle_io_error(
struct fp_private_data *flow_data)
{
int ret;
+ char buf[64] = {0};
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue handle_io_error() called for flow %p.\n",
@@ -1949,8 +1967,9 @@ static void handle_io_error(
if(flow_data->parent->error_code == 0)
{
enum flow_endpoint_type src, dest;
-
- gossip_err("Flow proto error cleanup started on %p, error_code: %d\n", flow_data->parent, error_code);
+
+ PVFS_strerror_r(error_code, buf, 64);
+ gossip_err("%s: flow proto error cleanup started on %p: %s\n", __func__, flow_data->parent, buf);
flow_data->parent->error_code = error_code;
if(q_item)
@@ -1967,14 +1986,14 @@ static void handle_io_error(
{
ret = cancel_pending_bmi(&flow_data->src_list);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue canceling %d BMI-mem BMI ops.\n", ret);
+ "flowproto-multiqueue canceled %d bmi-mem BMI ops.\n", ret);
flow_data->cleanup_pending_count += ret;
}
else if (src == MEM_ENDPOINT && dest == BMI_ENDPOINT)
{
ret = cancel_pending_bmi(&flow_data->dest_list);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue canceling %d mem-BMI BMI ops.\n", ret);
+ "flowproto-multiqueue canceled %d mem-bmi BMI ops.\n", ret);
flow_data->cleanup_pending_count += ret;
}
else if (src == TROVE_ENDPOINT && dest == BMI_ENDPOINT)
@@ -1982,21 +2001,21 @@ static void handle_io_error(
ret = cancel_pending_trove(&flow_data->src_list);
flow_data->cleanup_pending_count += ret;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue canceling %d trove-bmi Trove ops.\n", ret);
+ "flowproto-multiqueue canceled %d trove-bmi Trove ops.\n", ret);
ret = cancel_pending_bmi(&flow_data->dest_list);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue canceling %d trove-bmi BMI ops.\n", ret);
+ "flowproto-multiqueue canceled %d trove-bmi BMI ops.\n", ret);
flow_data->cleanup_pending_count += ret;
}
else if (src == BMI_ENDPOINT && dest == TROVE_ENDPOINT)
{
ret = cancel_pending_bmi(&flow_data->src_list);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue canceling %d bmi-trove BMI ops.\n", ret);
+ "flowproto-multiqueue canceled %d bmi-trove BMI ops.\n", ret);
flow_data->cleanup_pending_count += ret;
ret = cancel_pending_trove(&flow_data->dest_list);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue canceling %d bmi-trove Trove ops.\n", ret);
+ "flowproto-multiqueue canceled %d bmi-trove Trove ops.\n", ret);
flow_data->cleanup_pending_count += ret;
}
else
@@ -2004,8 +2023,9 @@ static void handle_io_error(
/* impossible condition */
assert(0);
}
- gossip_err("Flow proto %p canceling a total of %d BMI or Trove operations\n",
- flow_data->parent, flow_data->cleanup_pending_count);
+ gossip_err("%s: flow proto %p canceled %d operations, will clean up.\n",
+ __func__, flow_data->parent,
+ flow_data->cleanup_pending_count);
}
else
{
@@ -2019,8 +2039,9 @@ static void handle_io_error(
if(flow_data->cleanup_pending_count == 0)
{
- gossip_err("Flow proto error cleanup finished %p, error_code: %d\n",
- flow_data->parent, flow_data->parent->error_code);
+ PVFS_strerror_r(flow_data->parent->error_code, buf, 64);
+ gossip_err("%s: flow proto %p error cleanup finished: %s\n",
+ __func__, flow_data->parent, buf);
/* we are finished, make sure error is marked and state is set */
assert(flow_data->parent->error_code);
More information about the Pvfs2-cvs mailing list