[PVFS2-CVS]
commit by rbross in pvfs2/src/io/flow/flowproto-bmi-trove:
flowproto-multiqueue.c
CVS commit program
cvs at parl.clemson.edu
Mon Feb 16 16:22:43 EST 2004
Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb:/tmp/cvs-serv32348/src/io/flow/flowproto-bmi-trove
Modified Files:
flowproto-multiqueue.c
Log Message:
Combo patch: PVFS_id_gen_t -> PVFS_BMI_addr_t, formatting, and PVFS error code
changes. Ugly. Sorry if my formatting pisses someone off; at least I'm not
using more than 80 columns any more :).
Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.65 -r1.66
--- flowproto-multiqueue.c 14 Feb 2004 23:19:25 -0000 1.65
+++ flowproto-multiqueue.c 16 Feb 2004 21:22:43 -0000 1.66
@@ -109,40 +109,41 @@ static bmi_context_id global_bmi_context
#ifdef __PVFS2_TROVE_SUPPORT__
static TROVE_context_id global_trove_context = -1;
static void bmi_recv_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code);
+ PVFS_size actual_size,
+ PVFS_error error_code);
static int bmi_send_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code,
- int initial_call_flag);
+ PVFS_size actual_size,
+ PVFS_error error_code,
+ int initial_call_flag);
/* the above function is a special case; we need to look at a return
* value when we invoke it directly, so we use the following function
* to trigger it from a callback
*/
static void bmi_send_callback_wrapper(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
+ PVFS_size actual_size,
+ PVFS_error error_code)
{
bmi_send_callback_fn(user_ptr, actual_size, error_code, 0);
return;
};
static void trove_read_callback_fn(void *user_ptr,
- PVFS_error error_code);
+ PVFS_error error_code);
static void trove_write_callback_fn(void *user_ptr,
- PVFS_error error_code);
+ PVFS_error error_code);
#endif
static void mem_to_bmi_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code);
+ PVFS_size actual_size,
+ PVFS_error error_code);
static void bmi_to_mem_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code);
-static void cleanup_buffers(struct fp_private_data* flow_data);
-static void handle_io_error(PVFS_error error_code, struct fp_queue_item*
- q_item, struct fp_private_data* flow_data);
-static int cancel_pending_bmi(struct qlist_head* list);
-static int cancel_pending_trove(struct qlist_head* list);
+ PVFS_size actual_size,
+ PVFS_error error_code);
+static void cleanup_buffers(struct fp_private_data *flow_data);
+static void handle_io_error(PVFS_error error_code,
+ struct fp_queue_item *q_item,
+ struct fp_private_data *flow_data);
+static int cancel_pending_bmi(struct qlist_head *list);
+static int cancel_pending_trove(struct qlist_head *list);
/* interface prototypes */
@@ -151,12 +152,12 @@ static int fp_multiqueue_initialize(int
static int fp_multiqueue_finalize(void);
static int fp_multiqueue_getinfo(flow_descriptor * flow_d,
- int option,
- void *parameter);
+ int option,
+ void *parameter);
static int fp_multiqueue_setinfo(flow_descriptor * flow_d,
- int option,
- void *parameter);
+ int option,
+ void *parameter);
static int fp_multiqueue_post(flow_descriptor * flow_d);
@@ -182,23 +183,23 @@ int fp_multiqueue_initialize(int flowpro
int ret = -1;
ret = PINT_thread_mgr_bmi_start();
- if(ret < 0)
- return(ret);
+ if (ret < 0)
+ return ret;
PINT_thread_mgr_bmi_getcontext(&global_bmi_context);
#ifdef __PVFS2_TROVE_SUPPORT__
ret = PINT_thread_mgr_trove_start();
- if(ret < 0)
+ if (ret < 0)
{
PINT_thread_mgr_bmi_stop();
- return(ret);
+ return ret;
}
PINT_thread_mgr_trove_getcontext(&global_trove_context);
#endif
fp_multiqueue_id = flowproto_id;
- return(0);
+ return 0;
}
/* fp_multiqueue_finalize()
@@ -213,7 +214,7 @@ int fp_multiqueue_finalize(void)
#ifdef __PVFS2_TROVE_SUPPORT__
PINT_thread_mgr_trove_stop();
#endif
- return (0);
+ return 0;
}
/* fp_multiqueue_getinfo()
@@ -222,9 +223,9 @@ int fp_multiqueue_finalize(void)
*
* returns 0 on success, -PVFS_error on failure
*/
-int fp_multiqueue_getinfo(flow_descriptor * flow_d,
- int option,
- void *parameter)
+int fp_multiqueue_getinfo(flow_descriptor *flow_d,
+ int option,
+ void *parameter)
{
int* type;
@@ -233,11 +234,11 @@ int fp_multiqueue_getinfo(flow_descripto
case FLOWPROTO_TYPE_QUERY:
type = parameter;
if(*type == FLOWPROTO_MULTIQUEUE)
- return(0);
+ return 0;
else
- return(-PVFS_ENOPROTOOPT);
+ return -PVFS_ENOPROTOOPT;
default:
- return(-PVFS_ENOSYS);
+ return -PVFS_ENOSYS;
break;
}
}
@@ -249,10 +250,10 @@ int fp_multiqueue_getinfo(flow_descripto
* returns 0 on success, -PVFS_error on failure
*/
int fp_multiqueue_setinfo(flow_descriptor * flow_d,
- int option,
- void *parameter)
+ int option,
+ void *parameter)
{
- return (-PVFS_ENOSYS);
+ return -PVFS_ENOSYS;
}
/* fp_multiqueue_post()
@@ -275,10 +276,10 @@ int fp_multiqueue_post(flow_descriptor *
(flow_d->src.endpoint_id == BMI_ENDPOINT &&
flow_d->dest.endpoint_id == MEM_ENDPOINT));
- flow_data = (struct fp_private_data*)malloc(sizeof(struct
- fp_private_data));
- if(!flow_data)
- return(-PVFS_ENOMEM);
+ flow_data = (struct fp_private_data *)
+ malloc(sizeof(struct fp_private_data));
+ if (flow_data == NULL)
+ return -PVFS_ENOMEM;
memset(flow_data, 0, sizeof(struct fp_private_data));
flow_d->flow_protocol_data = flow_data;
@@ -292,12 +293,14 @@ int fp_multiqueue_post(flow_descriptor *
/* if a file datatype offset was specified, go ahead and skip ahead
* before doing anything else
*/
- if(flow_d->file_req_offset)
+ if (flow_d->file_req_offset)
+ {
PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state,
flow_d->file_req_offset);
+ }
/* set boundaries on file datatype */
- if(flow_d->aggregate_size > -1)
+ if (flow_d->aggregate_size > -1)
{
PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
flow_d->aggregate_size+flow_d->file_req_offset);
@@ -309,7 +312,7 @@ int fp_multiqueue_post(flow_descriptor *
PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
}
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for (i=0; i < BUFFERS_PER_FLOW; i++)
{
flow_data->prealloc_array[i].parent = flow_d;
flow_data->prealloc_array[i].bmi_callback.data =
@@ -317,26 +320,26 @@ int fp_multiqueue_post(flow_descriptor *
}
/* remaining setup depends on the endpoints we intend to use */
- if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
+ if (flow_d->src.endpoint_id == BMI_ENDPOINT &&
flow_d->dest.endpoint_id == MEM_ENDPOINT)
{
flow_data->prealloc_array[0].buffer = flow_d->dest.u.mem.buffer;
flow_data->prealloc_array[0].bmi_callback.fn = bmi_to_mem_callback_fn;
bmi_to_mem_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
}
- else if(flow_d->src.endpoint_id == MEM_ENDPOINT &&
- flow_d->dest.endpoint_id == BMI_ENDPOINT)
+ else if (flow_d->src.endpoint_id == MEM_ENDPOINT &&
+ flow_d->dest.endpoint_id == BMI_ENDPOINT)
{
flow_data->prealloc_array[0].buffer = flow_d->src.u.mem.buffer;
flow_data->prealloc_array[0].bmi_callback.fn = mem_to_bmi_callback_fn;
mem_to_bmi_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
}
#ifdef __PVFS2_TROVE_SUPPORT__
- else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
- flow_d->dest.endpoint_id == BMI_ENDPOINT)
+ else if (flow_d->src.endpoint_id == TROVE_ENDPOINT &&
+ flow_d->dest.endpoint_id == BMI_ENDPOINT)
{
flow_data->initial_posts = BUFFERS_PER_FLOW;
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for (i=0; i < BUFFERS_PER_FLOW; i++)
{
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue forcing bmi_send_callback_fn.\n");
@@ -344,14 +347,14 @@ int fp_multiqueue_post(flow_descriptor *
bmi_send_callback_fn(&(flow_data->prealloc_array[i]), 0, 0, 1);
}
}
- else if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
- flow_d->dest.endpoint_id == TROVE_ENDPOINT)
+ else if (flow_d->src.endpoint_id == BMI_ENDPOINT &&
+ flow_d->dest.endpoint_id == TROVE_ENDPOINT)
{
/* only post one outstanding recv at a time; easier to manage */
flow_data->initial_posts = 1;
/* place remaining buffers on "empty" queue */
- for(i=1; i<BUFFERS_PER_FLOW; i++)
+ for (i=1; i < BUFFERS_PER_FLOW; i++)
{
qlist_add_tail(&flow_data->prealloc_array[i].list_link,
&flow_data->empty_list);
@@ -380,8 +383,8 @@ int fp_multiqueue_post(flow_descriptor *
* no return value
*/
static void bmi_recv_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
+ PVFS_size actual_size,
+ PVFS_error error_code)
{
struct fp_queue_item* q_item = user_ptr;
int ret;
@@ -411,46 +414,46 @@ static void bmi_recv_callback_fn(void *u
/* add to dest queue */
qlist_add_tail(&q_item->list_link, &flow_data->dest_list);
result_tmp = &q_item->result_chain;
- do{
+ do {
assert(result_tmp->result.bytes);
result_tmp->q_item = q_item;
result_tmp->trove_callback.data = result_tmp;
result_tmp->trove_callback.fn = trove_write_callback_fn;
tmp_user_ptr = &result_tmp->trove_callback;
ret = trove_bstream_write_list(q_item->parent->dest.u.trove.coll_id,
- q_item->parent->dest.u.trove.handle,
- (char**)&result_tmp->buffer_offset,
- &result_tmp->result.bytes,
- 1,
- result_tmp->result.offset_array,
- result_tmp->result.size_array,
- result_tmp->result.segs,
- &result_tmp->result.bytes,
- 0,
- NULL,
- &result_tmp->trove_callback,
- global_trove_context,
- &result_tmp->posted_id);
+ q_item->parent->dest.u.trove.handle,
+ (char**)&result_tmp->buffer_offset,
+ &result_tmp->result.bytes,
+ 1,
+ result_tmp->result.offset_array,
+ result_tmp->result.size_array,
+ result_tmp->result.segs,
+ &result_tmp->result.bytes,
+ 0,
+ NULL,
+ &result_tmp->trove_callback,
+ global_trove_context,
+ &result_tmp->posted_id);
result_tmp = result_tmp->next;
- if(ret < 0)
+ if (ret < 0)
{
handle_io_error(ret, q_item, flow_data);
ERROR_CLEANUP(flow_data);
}
- if(ret == 1)
+ if (ret == 1)
{
gen_mutex_unlock(&flow_data->flow_mutex);
/* immediate completion; trigger callback ourselves */
trove_write_callback_fn(tmp_user_ptr, 0);
gen_mutex_lock(&flow_data->flow_mutex);
}
- }while(result_tmp);
+ } while(result_tmp);
/* do we need to repost another recv? */
- if((!PINT_REQUEST_DONE(q_item->parent->file_req_state))
+ if ((!PINT_REQUEST_DONE(q_item->parent->file_req_state))
&& qlist_empty(&flow_data->src_list)
&& !qlist_empty(&flow_data->empty_list))
{
@@ -459,7 +462,7 @@ static void bmi_recv_callback_fn(void *u
qlist_del(&q_item->list_link);
qlist_add_tail(&q_item->list_link, &flow_data->src_list);
- if(!q_item->buffer)
+ if (!q_item->buffer)
{
/* if the q_item has not been used, allocate a buffer */
q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
@@ -474,7 +477,7 @@ static void bmi_recv_callback_fn(void *u
tmp_buffer = q_item->buffer;
do{
q_item->result_chain_count++;
- if(!result_tmp)
+ if (!result_tmp)
{
result_tmp = (struct result_chain_entry*)malloc(
sizeof(struct result_chain_entry));
@@ -504,10 +507,10 @@ static void bmi_recv_callback_fn(void *u
result_tmp = result_tmp->next;
tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
bytes_processed += old_result_tmp->result.bytes;
- }while(bytes_processed < BUFFER_SIZE &&
+ } while(bytes_processed < BUFFER_SIZE &&
!PINT_REQUEST_DONE(q_item->parent->file_req_state));
- if(bytes_processed == 0)
+ if (bytes_processed == 0)
{
gen_mutex_unlock(&flow_data->flow_mutex);
qlist_del(&q_item->list_link);
@@ -528,13 +531,13 @@ static void bmi_recv_callback_fn(void *u
&q_item->bmi_callback,
global_bmi_context);
- if(ret < 0)
+ if (ret < 0)
{
handle_io_error(ret, q_item, flow_data);
ERROR_CLEANUP(flow_data);
}
- if(ret == 1)
+ if (ret == 1)
{
/* immediate completion; trigger callback ourselves */
gen_mutex_unlock(&flow_data->flow_mutex);
@@ -555,7 +558,7 @@ static void bmi_recv_callback_fn(void *u
* no return value
*/
static void trove_read_callback_fn(void *user_ptr,
- PVFS_error error_code)
+ PVFS_error error_code)
{
int ret;
struct result_chain_entry* result_tmp = user_ptr;
@@ -574,14 +577,14 @@ static void trove_read_callback_fn(void
result_tmp->posted_id = 0;
- if(error_code != 0 || flow_data->parent->error_code != 0)
+ if (error_code != 0 || flow_data->parent->error_code != 0)
{
handle_io_error(error_code, q_item, flow_data);
ERROR_CLEANUP(flow_data);
}
/* don't do anything until the last read completes */
- if(q_item->result_chain_count > 1)
+ if (q_item->result_chain_count > 1)
{
q_item->result_chain_count--;
gen_mutex_unlock(&flow_data->flow_mutex);
@@ -594,26 +597,29 @@ static void trove_read_callback_fn(void
qlist_add_tail(&q_item->list_link, &flow_data->dest_list);
result_tmp = &q_item->result_chain;
- do{
+ do {
old_result_tmp = result_tmp;
result_tmp = result_tmp->next;
if(old_result_tmp != &q_item->result_chain)
free(old_result_tmp);
- }while(result_tmp);
+ } while(result_tmp);
q_item->result_chain.next = NULL;
q_item->result_chain_count = 0;
/* while we hold dest lock, look for next seq no. to send */
- do{
+ do {
qlist_for_each(tmp_link, &flow_data->dest_list)
{
- q_item = qlist_entry(tmp_link, struct fp_queue_item,
- list_link);
- if(q_item->seq == flow_data->next_seq_to_send)
+ q_item = qlist_entry(tmp_link,
+ struct fp_queue_item,
+ list_link);
+ if (q_item->seq == flow_data->next_seq_to_send)
+ {
break;
+ }
}
- if(q_item->seq == flow_data->next_seq_to_send)
+ if (q_item->seq == flow_data->next_seq_to_send)
{
flow_data->dest_pending++;
assert(q_item->buffer_used);
@@ -626,8 +632,10 @@ static void trove_read_callback_fn(void
&q_item->bmi_callback,
global_bmi_context);
flow_data->next_seq_to_send++;
- if(q_item->last)
+ if (q_item->last)
+ {
flow_data->dest_last_posted = 1;
+ }
}
else
{
@@ -635,13 +643,13 @@ static void trove_read_callback_fn(void
done = 1;
}
- if(ret < 0)
+ if (ret < 0)
{
handle_io_error(ret, q_item, flow_data);
ERROR_CLEANUP(flow_data);
}
- if(ret == 1)
+ if (ret == 1)
{
gen_mutex_unlock(&flow_data->flow_mutex);
/* immediate completion; trigger callback ourselves */
@@ -665,9 +673,9 @@ static void trove_read_callback_fn(void
* returns 1 if flow completes, 0 otherwise
*/
static int bmi_send_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code,
- int initial_call_flag)
+ PVFS_size actual_size,
+ PVFS_error error_code,
+ int initial_call_flag)
{
struct fp_queue_item* q_item = user_ptr;
struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
@@ -678,10 +686,10 @@ static int bmi_send_callback_fn(void *us
PVFS_size bytes_processed = 0;
void* tmp_user_ptr = NULL;
- if(flow_data->parent->error_code != 0 && initial_call_flag)
+ if (flow_data->parent->error_code != 0 && initial_call_flag)
{
/* cleanup path already triggered, don't do anything more */
- return(1);
+ return 1;
}
gen_mutex_lock(&flow_data->flow_mutex);
@@ -691,7 +699,7 @@ static int bmi_send_callback_fn(void *us
q_item->posted_id = 0;
- if(flow_data->parent->error_code != 0 && initial_call_flag)
+ if (flow_data->parent->error_code != 0 && initial_call_flag)
{
/* cleanup path already triggered, don't do anything more */
/* TODO: there is a race here; flow_mutex may be freed already? */
@@ -699,22 +707,22 @@ static int bmi_send_callback_fn(void *us
* other callbacks; dropping mutex in the process. Need to fix.
*/
gen_mutex_unlock(&flow_data->flow_mutex);
- return(1);
+ return 1;
}
- if(error_code != 0 || flow_data->parent->error_code != 0)
+ if (error_code != 0 || flow_data->parent->error_code != 0)
{
handle_io_error(error_code, q_item, flow_data);
- if(flow_data->parent->state == FLOW_COMPLETE)
+ if (flow_data->parent->state == FLOW_COMPLETE)
{
gen_mutex_unlock(&flow_data->flow_mutex);
FLOW_CLEANUP(flow_data);
- return(1);
+ return 1;
}
else
{
gen_mutex_unlock(&flow_data->flow_mutex);
- return(0);
+ return 0;
}
}
@@ -722,7 +730,7 @@ static int bmi_send_callback_fn(void *us
flow_data->parent->total_transfered += actual_size;
- if(q_item->buffer)
+ if (q_item->buffer)
{
flow_data->dest_pending--;
}
@@ -732,17 +740,18 @@ static int bmi_send_callback_fn(void *us
}
/* if this was the last operation, then mark the flow as done */
- if(flow_data->initial_posts == 0 &&
+ if (flow_data->initial_posts == 0 &&
flow_data->dest_pending == 0 &&
flow_data->dest_last_posted)
{
q_item->parent->state = FLOW_COMPLETE;
gen_mutex_unlock(&flow_data->flow_mutex);
FLOW_CLEANUP(flow_data);
- return(1);
+
+ return 1;
}
- if(q_item->buffer)
+ if (q_item->buffer)
{
/* if this q_item has been used before, remove it from its
* current queue */
@@ -765,13 +774,13 @@ static int bmi_send_callback_fn(void *us
old_result_tmp = result_tmp;
tmp_buffer = q_item->buffer;
q_item->buffer_used = 0;
- do{
+ do {
q_item->result_chain_count++;
- if(!result_tmp)
+ if (!result_tmp)
{
- result_tmp = (struct result_chain_entry*)malloc(
- sizeof(struct result_chain_entry));
- assert(result_tmp);
+ result_tmp = (struct result_chain_entry *)
+ malloc(sizeof(struct result_chain_entry));
+ assert(result_tmp != NULL);
memset(result_tmp, 0 , sizeof(struct result_chain_entry));
old_result_tmp->next = result_tmp;
}
@@ -788,10 +797,10 @@ static int bmi_send_callback_fn(void *us
q_item->seq = flow_data->next_seq;
flow_data->next_seq++;
ret = PINT_Process_request(q_item->parent->file_req_state,
- q_item->parent->mem_req_state,
- &q_item->parent->file_data,
- &result_tmp->result,
- PINT_SERVER);
+ q_item->parent->mem_req_state,
+ &q_item->parent->file_data,
+ &result_tmp->result,
+ PINT_SERVER);
/* TODO: error handling */
assert(ret >= 0);
@@ -800,30 +809,32 @@ static int bmi_send_callback_fn(void *us
tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
bytes_processed += old_result_tmp->result.bytes;
q_item->buffer_used += old_result_tmp->result.bytes;
- }while(bytes_processed < BUFFER_SIZE &&
- !PINT_REQUEST_DONE(q_item->parent->file_req_state));
+ } while(bytes_processed < BUFFER_SIZE &&
+ !PINT_REQUEST_DONE(q_item->parent->file_req_state));
flow_data->total_bytes_processed += bytes_processed;
- if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+ if (PINT_REQUEST_DONE(q_item->parent->file_req_state))
{
q_item->last = 1;
/* special case, we never have a "last" operation when there
* is no work to do, trigger manually
*/
- if(flow_data->total_bytes_processed == 0)
+ if (flow_data->total_bytes_processed == 0)
+ {
flow_data->dest_last_posted = 1;
+ }
}
- if(bytes_processed == 0)
+ if (bytes_processed == 0)
{
gen_mutex_unlock(&flow_data->flow_mutex);
- return(0);
+ return 0;
}
assert(q_item->buffer_used);
result_tmp = &q_item->result_chain;
- do{
+ do {
assert(q_item->buffer_used);
assert(result_tmp->result.bytes);
result_tmp->q_item = q_item;
@@ -846,33 +857,33 @@ static int bmi_send_callback_fn(void *us
&result_tmp->posted_id);
result_tmp = result_tmp->next;
- if(ret < 0)
+ if (ret < 0)
{
handle_io_error(ret, q_item, flow_data);
if(flow_data->parent->state == FLOW_COMPLETE)
{
gen_mutex_unlock(&flow_data->flow_mutex);
FLOW_CLEANUP(flow_data);
- return(1);
+ return 1;
}
else
{
gen_mutex_unlock(&flow_data->flow_mutex);
- return(0);
+ return 0;
}
}
- if(ret == 1)
+ if (ret == 1)
{
/* immediate completion; trigger callback ourselves */
gen_mutex_unlock(&flow_data->flow_mutex);
trove_read_callback_fn(tmp_user_ptr, 0);
gen_mutex_lock(&flow_data->flow_mutex);
}
- }while(result_tmp);
+ } while(result_tmp);
gen_mutex_unlock(&flow_data->flow_mutex);
- return(0);
+ return 0;
};
/* trove_write_callback_fn()
@@ -882,7 +893,7 @@ static int bmi_send_callback_fn(void *us
* no return value
*/
static void trove_write_callback_fn(void *user_ptr,
- PVFS_error error_code)
+ PVFS_error error_code)
{
PVFS_size tmp_actual_size;
int ret;
@@ -896,18 +907,18 @@ static void trove_write_callback_fn(void
gen_mutex_lock(&flow_data->flow_mutex);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue trove_write_callback_fn.\n");
+ "flowproto-multiqueue trove_write_callback_fn.\n");
result_tmp->posted_id = 0;
- if(error_code != 0 || flow_data->parent->error_code != 0)
+ if (error_code != 0 || flow_data->parent->error_code != 0)
{
handle_io_error(error_code, q_item, flow_data);
ERROR_CLEANUP(flow_data);
}
/* don't do anything until the last write completes */
- if(q_item->result_chain_count > 1)
+ if (q_item->result_chain_count > 1)
{
q_item->result_chain_count--;
gen_mutex_unlock(&flow_data->flow_mutex);
@@ -915,15 +926,17 @@ static void trove_write_callback_fn(void
}
result_tmp = &q_item->result_chain;
- do{
+ do {
q_item->parent->total_transfered += result_tmp->result.bytes;
PINT_perf_count(PINT_PERF_WRITE, result_tmp->result.bytes,
PINT_PERF_ADD);
old_result_tmp = result_tmp;
result_tmp = result_tmp->next;
- if(old_result_tmp != &q_item->result_chain)
+ if (old_result_tmp != &q_item->result_chain)
+ {
free(old_result_tmp);
- }while(result_tmp);
+ }
+ } while(result_tmp);
q_item->result_chain.next = NULL;
q_item->result_chain_count = 0;
@@ -939,13 +952,13 @@ static void trove_write_callback_fn(void
}
/* if there are no more receives to post, just return */
- if(PINT_REQUEST_DONE(flow_data->parent->file_req_state))
+ if (PINT_REQUEST_DONE(flow_data->parent->file_req_state))
{
gen_mutex_unlock(&flow_data->flow_mutex);
return;
}
- if(q_item->buffer)
+ if (q_item->buffer)
{
/* if this q_item has been used before, remove it from its
* current queue */
@@ -964,7 +977,7 @@ static void trove_write_callback_fn(void
/* if src list is empty, then post new recv; otherwise just queue
* in empty list
*/
- if(qlist_empty(&flow_data->src_list))
+ if (qlist_empty(&flow_data->src_list))
{
/* ready to post new recv! */
qlist_add_tail(&q_item->list_link, &flow_data->src_list);
@@ -972,13 +985,13 @@ static void trove_write_callback_fn(void
result_tmp = &q_item->result_chain;
old_result_tmp = result_tmp;
tmp_buffer = q_item->buffer;
- do{
+ do {
q_item->result_chain_count++;
- if(!result_tmp)
+ if (!result_tmp)
{
- result_tmp = (struct result_chain_entry*)malloc(
- sizeof(struct result_chain_entry));
- assert(result_tmp);
+ result_tmp = (struct result_chain_entry *)
+ malloc(sizeof(struct result_chain_entry));
+ assert(result_tmp != NULL);
memset(result_tmp, 0 , sizeof(struct result_chain_entry));
old_result_tmp->next = result_tmp;
}
@@ -993,10 +1006,10 @@ static void trove_write_callback_fn(void
result_tmp->result.segs = 0;
result_tmp->buffer_offset = tmp_buffer;
ret = PINT_Process_request(q_item->parent->file_req_state,
- q_item->parent->mem_req_state,
- &q_item->parent->file_data,
- &result_tmp->result,
- PINT_SERVER);
+ q_item->parent->mem_req_state,
+ &q_item->parent->file_data,
+ &result_tmp->result,
+ PINT_SERVER);
/* TODO: error handling */
assert(ret >= 0);
@@ -1004,12 +1017,12 @@ static void trove_write_callback_fn(void
result_tmp = result_tmp->next;
tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
bytes_processed += old_result_tmp->result.bytes;
- }while(bytes_processed < BUFFER_SIZE &&
- !PINT_REQUEST_DONE(q_item->parent->file_req_state));
+ } while(bytes_processed < BUFFER_SIZE &&
+ !PINT_REQUEST_DONE(q_item->parent->file_req_state));
flow_data->total_bytes_processed += bytes_processed;
- if(bytes_processed == 0)
+ if (bytes_processed == 0)
{
gen_mutex_unlock(&flow_data->flow_mutex);
return;
@@ -1017,22 +1030,22 @@ static void trove_write_callback_fn(void
/* TODO: what if we recv less than expected? */
ret = BMI_post_recv(&q_item->posted_id,
- q_item->parent->src.u.bmi.address,
- q_item->buffer,
- BUFFER_SIZE,
- &tmp_actual_size,
- BMI_PRE_ALLOC,
- q_item->parent->tag,
- &q_item->bmi_callback,
- global_bmi_context);
+ q_item->parent->src.u.bmi.address,
+ q_item->buffer,
+ BUFFER_SIZE,
+ &tmp_actual_size,
+ BMI_PRE_ALLOC,
+ q_item->parent->tag,
+ &q_item->bmi_callback,
+ global_bmi_context);
- if(ret < 0)
+ if (ret < 0)
{
handle_io_error(ret, q_item, flow_data);
ERROR_CLEANUP(flow_data);
}
- if(ret == 1)
+ if (ret == 1)
{
gen_mutex_unlock(&flow_data->flow_mutex);
/* immediate completion; trigger callback ourselves */
@@ -1043,7 +1056,7 @@ static void trove_write_callback_fn(void
else
{
qlist_add_tail(&q_item->list_link,
- &(flow_data->empty_list));
+ &(flow_data->empty_list));
}
gen_mutex_unlock(&flow_data->flow_mutex);
@@ -1063,68 +1076,77 @@ static void cleanup_buffers(struct fp_pr
struct result_chain_entry* result_tmp;
struct result_chain_entry* old_result_tmp;
- if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
+ if (flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
{
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for (i=0; i < BUFFERS_PER_FLOW; i++)
{
- if(flow_data->prealloc_array[i].buffer)
+ if (flow_data->prealloc_array[i].buffer)
{
BMI_memfree(flow_data->parent->src.u.bmi.address,
- flow_data->prealloc_array[i].buffer,
- BUFFER_SIZE,
- BMI_RECV);
+ flow_data->prealloc_array[i].buffer,
+ BUFFER_SIZE,
+ BMI_RECV);
}
result_tmp = &(flow_data->prealloc_array[i].result_chain);
- do{
+ do {
old_result_tmp = result_tmp;
result_tmp = result_tmp->next;
- if(old_result_tmp !=
+
+ if (old_result_tmp !=
&(flow_data->prealloc_array[i].result_chain))
+ {
free(old_result_tmp);
- }while(result_tmp);
+ }
+ } while(result_tmp);
flow_data->prealloc_array[i].result_chain.next = NULL;
}
}
- else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
+ else if (flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
+ flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
{
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for (i=0; i < BUFFERS_PER_FLOW; i++)
{
- if(flow_data->prealloc_array[i].buffer)
+ if (flow_data->prealloc_array[i].buffer)
{
BMI_memfree(flow_data->parent->dest.u.bmi.address,
- flow_data->prealloc_array[i].buffer,
- BUFFER_SIZE,
- BMI_SEND);
+ flow_data->prealloc_array[i].buffer,
+ BUFFER_SIZE,
+ BMI_SEND);
}
result_tmp = &(flow_data->prealloc_array[i].result_chain);
- do{
+ do {
old_result_tmp = result_tmp;
result_tmp = result_tmp->next;
if(old_result_tmp !=
- &(flow_data->prealloc_array[i].result_chain))
+ &(flow_data->prealloc_array[i].result_chain))
+ {
free(old_result_tmp);
- }while(result_tmp);
+ }
+ } while(result_tmp);
flow_data->prealloc_array[i].result_chain.next = NULL;
}
}
- else if(flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
+ else if (flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
+ flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
{
- if(flow_data->intermediate)
+ if (flow_data->intermediate)
{
BMI_memfree(flow_data->parent->dest.u.bmi.address,
- flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
+ flow_data->intermediate,
+ BUFFER_SIZE,
+ BMI_SEND);
}
}
- else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
+ else if (flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
+ flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
{
- if(flow_data->intermediate)
+ if (flow_data->intermediate)
{
BMI_memfree(flow_data->parent->src.u.bmi.address,
- flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
+ flow_data->intermediate,
+ BUFFER_SIZE,
+ BMI_RECV);
}
}
@@ -1139,8 +1161,8 @@ static void cleanup_buffers(struct fp_pr
* no return value
*/
static void mem_to_bmi_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
+ PVFS_size actual_size,
+ PVFS_error error_code)
{
struct fp_queue_item* q_item = user_ptr;
int ret;
@@ -1153,11 +1175,11 @@ static void mem_to_bmi_callback_fn(void
gen_mutex_lock(&flow_data->flow_mutex);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue mem_to_bmi_callback_fn.\n");
+ "flowproto-multiqueue mem_to_bmi_callback_fn.\n");
q_item->posted_id = 0;
- if(error_code != 0 || flow_data->parent->error_code != 0)
+ if (error_code != 0 || flow_data->parent->error_code != 0)
{
handle_io_error(error_code, q_item, flow_data);
ERROR_CLEANUP(flow_data);
@@ -1166,7 +1188,7 @@ static void mem_to_bmi_callback_fn(void
flow_data->parent->total_transfered += actual_size;
/* are we done? */
- if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+ if (PINT_REQUEST_DONE(q_item->parent->file_req_state))
{
q_item->parent->state = FLOW_COMPLETE;
gen_mutex_unlock(&flow_data->flow_mutex);
@@ -1185,20 +1207,20 @@ static void mem_to_bmi_callback_fn(void
q_item->result_chain.result.segs = 0;
q_item->result_chain.buffer_offset = NULL;
ret = PINT_Process_request(q_item->parent->file_req_state,
- q_item->parent->mem_req_state,
- &q_item->parent->file_data,
- &q_item->result_chain.result,
- PINT_CLIENT);
-
+ q_item->parent->mem_req_state,
+ &q_item->parent->file_data,
+ &q_item->result_chain.result,
+ PINT_CLIENT);
+
/* TODO: error handling */
assert(ret >= 0);
/* was MAX_REGIONS enough to satisfy this step? */
- if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
+ if (!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
q_item->result_chain.result.bytes < BUFFER_SIZE)
{
/* create an intermediate buffer */
- if(!flow_data->intermediate)
+ if (!flow_data->intermediate)
{
flow_data->intermediate = BMI_memalloc(
flow_data->parent->dest.u.bmi.address,
@@ -1208,10 +1230,10 @@ static void mem_to_bmi_callback_fn(void
}
/* copy what we have so far into intermediate buffer */
- for(i=0; i<q_item->result_chain.result.segs; i++)
+ for (i=0; i < q_item->result_chain.result.segs; i++)
{
src_ptr = ((char*)q_item->parent->src.u.mem.buffer +
- q_item->result_chain.offset_list[i]);
+ q_item->result_chain.offset_list[i]);
dest_ptr = ((char*)flow_data->intermediate + bytes_processed);
memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
bytes_processed += q_item->result_chain.size_list[i];
@@ -1234,16 +1256,16 @@ static void mem_to_bmi_callback_fn(void
assert(ret >= 0);
/* copy what we have so far into intermediate buffer */
- for(i=0; i<q_item->result_chain.result.segs; i++)
+ for(i=0; i < q_item->result_chain.result.segs; i++)
{
src_ptr = ((char*)q_item->parent->src.u.mem.buffer +
- q_item->result_chain.offset_list[i]);
+ q_item->result_chain.offset_list[i]);
dest_ptr = ((char*)flow_data->intermediate + bytes_processed);
memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
bytes_processed += q_item->result_chain.size_list[i];
}
- }while(bytes_processed < BUFFER_SIZE &&
- !PINT_REQUEST_DONE(q_item->parent->file_req_state));
+ } while(bytes_processed < BUFFER_SIZE &&
+ !PINT_REQUEST_DONE(q_item->parent->file_req_state));
/* setup for BMI operation */
flow_data->tmp_buffer_list[0] = flow_data->intermediate;
@@ -1264,7 +1286,7 @@ static void mem_to_bmi_callback_fn(void
}
/* convert offsets to memory addresses */
- for(i=0; i<q_item->result_chain.result.segs; i++)
+ for(i=0; i < q_item->result_chain.result.segs; i++)
{
flow_data->tmp_buffer_list[i] =
(void*)(q_item->result_chain.result.offset_array[i] +
@@ -1273,17 +1295,17 @@ static void mem_to_bmi_callback_fn(void
}
ret = BMI_post_send_list(&q_item->posted_id,
- q_item->parent->dest.u.bmi.address,
- (const void**)flow_data->tmp_buffer_list,
- q_item->result_chain.result.size_array,
- q_item->result_chain.result.segs,
- q_item->result_chain.result.bytes,
- buffer_type,
- q_item->parent->tag,
- &q_item->bmi_callback,
- global_bmi_context);
+ q_item->parent->dest.u.bmi.address,
+ (const void**)flow_data->tmp_buffer_list,
+ q_item->result_chain.result.size_array,
+ q_item->result_chain.result.segs,
+ q_item->result_chain.result.bytes,
+ buffer_type,
+ q_item->parent->tag,
+ &q_item->bmi_callback,
+ global_bmi_context);
- if(ret < 0)
+ if (ret < 0)
{
handle_io_error(ret, q_item, flow_data);
ERROR_CLEANUP(flow_data);
@@ -1291,10 +1313,11 @@ static void mem_to_bmi_callback_fn(void
gen_mutex_unlock(&flow_data->flow_mutex);
- if(ret == 1)
+ if (ret == 1)
{
mem_to_bmi_callback_fn(q_item,
- q_item->result_chain.result.bytes, 0);
+ q_item->result_chain.result.bytes,
+ 0);
}
return;
@@ -1309,8 +1332,8 @@ static void mem_to_bmi_callback_fn(void
* no return value
*/
static void bmi_to_mem_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
+ PVFS_size actual_size,
+ PVFS_error error_code)
{
struct fp_queue_item* q_item = user_ptr;
int ret;
@@ -1332,7 +1355,7 @@ static void bmi_to_mem_callback_fn(void
q_item->posted_id = 0;
- if(error_code != 0 || flow_data->parent->error_code != 0)
+ if (error_code != 0 || flow_data->parent->error_code != 0)
{
handle_io_error(error_code, q_item, flow_data);
ERROR_CLEANUP(flow_data);
@@ -1342,11 +1365,11 @@ static void bmi_to_mem_callback_fn(void
/* if this is the result of a receive into an intermediate buffer,
* then we must copy out */
- if(flow_data->tmp_buffer_list[0] == flow_data->intermediate &&
+ if (flow_data->tmp_buffer_list[0] == flow_data->intermediate &&
flow_data->intermediate != NULL)
{
/* copy out what we have so far */
- for(i=0; i<q_item->result_chain.result.segs; i++)
+ for (i=0; i < q_item->result_chain.result.segs; i++)
{
region_size = q_item->result_chain.size_list[i];
src_ptr = (char*)(flow_data->intermediate +
@@ -1366,14 +1389,14 @@ static void bmi_to_mem_callback_fn(void
q_item->result_chain.buffer_offset = NULL;
/* process ahead */
ret = PINT_Process_request(q_item->parent->file_req_state,
- q_item->parent->mem_req_state,
- &q_item->parent->file_data,
- &q_item->result_chain.result,
- PINT_CLIENT);
+ q_item->parent->mem_req_state,
+ &q_item->parent->file_data,
+ &q_item->result_chain.result,
+ PINT_CLIENT);
/* TODO: error handling */
assert(ret >= 0);
/* copy out what we have so far */
- for(i=0; i<q_item->result_chain.result.segs; i++)
+ for (i=0; i<q_item->result_chain.result.segs; i++)
{
region_size = q_item->result_chain.size_list[i];
src_ptr = (char*)(flow_data->intermediate +
@@ -1383,12 +1406,12 @@ static void bmi_to_mem_callback_fn(void
memcpy(dest_ptr, src_ptr, region_size);
bytes_processed += region_size;
}
- }while(bytes_processed < BUFFER_SIZE &&
- !PINT_REQUEST_DONE(q_item->parent->file_req_state));
+ } while(bytes_processed < BUFFER_SIZE &&
+ !PINT_REQUEST_DONE(q_item->parent->file_req_state));
}
/* are we done? */
- if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+ if (PINT_REQUEST_DONE(q_item->parent->file_req_state))
{
q_item->parent->state = FLOW_COMPLETE;
gen_mutex_unlock(&flow_data->flow_mutex);
@@ -1415,11 +1438,11 @@ static void bmi_to_mem_callback_fn(void
assert(ret >= 0);
/* was MAX_REGIONS enough to satisfy this step? */
- if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
+ if (!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
q_item->result_chain.result.bytes < BUFFER_SIZE)
{
/* create an intermediate buffer */
- if(!flow_data->intermediate)
+ if (!flow_data->intermediate)
{
flow_data->intermediate = BMI_memalloc(
flow_data->parent->src.u.bmi.address,
@@ -1444,7 +1467,7 @@ static void bmi_to_mem_callback_fn(void
total_size = q_item->result_chain.result.bytes;
/* convert offsets to memory addresses */
- for(i=0; i<q_item->result_chain.result.segs; i++)
+ for (i=0; i < q_item->result_chain.result.segs; i++)
{
flow_data->tmp_buffer_list[i] =
(void*)(q_item->result_chain.result.offset_array[i] +
@@ -1452,7 +1475,7 @@ static void bmi_to_mem_callback_fn(void
}
/* go ahead and return if there is nothing to do */
- if(q_item->result_chain.result.bytes == 0)
+ if (q_item->result_chain.result.bytes == 0)
{
q_item->parent->state = FLOW_COMPLETE;
gen_mutex_unlock(&flow_data->flow_mutex);
@@ -1462,18 +1485,18 @@ static void bmi_to_mem_callback_fn(void
}
ret = BMI_post_recv_list(&q_item->posted_id,
- q_item->parent->src.u.bmi.address,
- flow_data->tmp_buffer_list,
- size_array,
- segs,
- total_size,
- &tmp_actual_size,
- BMI_EXT_ALLOC,
- q_item->parent->tag,
- &q_item->bmi_callback,
- global_bmi_context);
-
- if(ret < 0)
+ q_item->parent->src.u.bmi.address,
+ flow_data->tmp_buffer_list,
+ size_array,
+ segs,
+ total_size,
+ &tmp_actual_size,
+ BMI_EXT_ALLOC,
+ q_item->parent->tag,
+ &q_item->bmi_callback,
+ global_bmi_context);
+
+ if (ret < 0)
{
handle_io_error(ret, q_item, flow_data);
ERROR_CLEANUP(flow_data);
@@ -1481,7 +1504,7 @@ static void bmi_to_mem_callback_fn(void
gen_mutex_unlock(&flow_data->flow_mutex);
- if(ret == 1)
+ if (ret == 1)
{
bmi_to_mem_callback_fn(q_item, tmp_actual_size, 0);
}
@@ -1499,16 +1522,17 @@ static void bmi_to_mem_callback_fn(void
*
* no return value
*/
-static void handle_io_error(PVFS_error error_code, struct fp_queue_item*
- q_item, struct fp_private_data* flow_data)
+static void handle_io_error(PVFS_error error_code,
+ struct fp_queue_item *q_item,
+ struct fp_private_data *flow_data)
{
int ret;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
- "flowproto-multiqueue error cleanup path.\n");
+ "flowproto-multiqueue error cleanup path.\n");
/* is this the first error registered for this particular flow? */
- if(flow_data->parent->error_code == 0)
+ if (flow_data->parent->error_code == 0)
{
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue first failure.\n");
@@ -1517,7 +1541,7 @@ static void handle_io_error(PVFS_error e
flow_data->cleanup_pending_count = 0;
/* cleanup depending on what endpoints are in use */
- if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
+ if (flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
{
ret = cancel_pending_bmi(&flow_data->src_list);
@@ -1525,16 +1549,16 @@ static void handle_io_error(PVFS_error e
"flowproto-multiqueue canceling %d BMI ops.\n", ret);
flow_data->cleanup_pending_count += ret;
}
- else if(flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
+ else if (flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
+ flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
{
ret = cancel_pending_bmi(&flow_data->dest_list);
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue canceling %d BMI ops.\n", ret);
flow_data->cleanup_pending_count += ret;
}
- else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
+ else if (flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
+ flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
{
ret = cancel_pending_trove(&flow_data->src_list);
flow_data->cleanup_pending_count += ret;
@@ -1545,7 +1569,7 @@ static void handle_io_error(PVFS_error e
"flowproto-multiqueue canceling %d BMI ops.\n", ret);
flow_data->cleanup_pending_count += ret;
}
- else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
+ else if (flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
{
ret = cancel_pending_bmi(&flow_data->src_list);
@@ -1572,7 +1596,7 @@ static void handle_io_error(PVFS_error e
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "cleanup_pending_count: %d\n",
flow_data->cleanup_pending_count);
- if(flow_data->cleanup_pending_count == 0)
+ if (flow_data->cleanup_pending_count == 0)
{
/* we are finished, make sure error is marked and state is set */
assert(flow_data->parent->error_code);
@@ -1602,14 +1626,14 @@ static int cancel_pending_bmi(struct qli
q_item = qlist_entry(tmp_link, struct fp_queue_item,
list_link);
/* skip anything that is in the queue but not actually posted */
- if(q_item->posted_id)
+ if (q_item->posted_id)
{
count++;
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowprotocol cleanup: unposting BMI operation.\n");
ret = PINT_thread_mgr_bmi_cancel(q_item->posted_id,
&q_item->bmi_callback);
- if(ret < 0)
+ if (ret < 0)
{
gossip_err("WARNING: BMI thread mgr cancel failed, proceeding anyway.\n");
}
@@ -1641,7 +1665,7 @@ static int cancel_pending_trove(struct q
list_link);
result_tmp = &q_item->result_chain;
- do{
+ do {
old_result_tmp = result_tmp;
result_tmp = result_tmp->next;
@@ -1651,17 +1675,16 @@ static int cancel_pending_trove(struct q
ret = PINT_thread_mgr_trove_cancel(old_result_tmp->posted_id,
q_item->parent->src.u.trove.coll_id,
&old_result_tmp->trove_callback);
- if(ret < 0)
+ if (ret < 0)
{
gossip_err("WARNING: Trove thread mgr cancel failed, proceeding anyway.\n");
}
}
- }while(result_tmp);
+ } while(result_tmp);
}
- return (count);
+ return count;
}
-
/*
* Local variables:
More information about the PVFS2-CVS mailing list