[PVFS2-CVS] commit by rbross in pvfs2/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c

CVS commit program cvs at parl.clemson.edu
Mon Feb 16 16:22:43 EST 2004


Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb:/tmp/cvs-serv32348/src/io/flow/flowproto-bmi-trove

Modified Files:
	flowproto-multiqueue.c 
Log Message:
combo PVFS_id_gen_t -> PVFS_BMI_addr_t, formatting, PVFS error code patch.
Ugly.  Sorry if my formatting pisses someone off; at least I'm no longer
using more than 80 columns :).


Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.65 -r1.66
--- flowproto-multiqueue.c	14 Feb 2004 23:19:25 -0000	1.65
+++ flowproto-multiqueue.c	16 Feb 2004 21:22:43 -0000	1.66
@@ -109,40 +109,41 @@ static bmi_context_id global_bmi_context
 #ifdef __PVFS2_TROVE_SUPPORT__
 static TROVE_context_id global_trove_context = -1;
 static void bmi_recv_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code);
+				 PVFS_size actual_size,
+				 PVFS_error error_code);
 
 static int bmi_send_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code,
-			 int initial_call_flag);
+				PVFS_size actual_size,
+				PVFS_error error_code,
+				int initial_call_flag);
 /* the above function is a special case; we need to look at a return
  * value when we invoke it directly, so we use the following function
  * to trigger it from a callback
  */
 static void bmi_send_callback_wrapper(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code)
+				      PVFS_size actual_size,
+				      PVFS_error error_code)
 {
     bmi_send_callback_fn(user_ptr, actual_size, error_code, 0);
     return;
 };
 static void trove_read_callback_fn(void *user_ptr,
-		           PVFS_error error_code);
+				   PVFS_error error_code);
 static void trove_write_callback_fn(void *user_ptr,
-		           PVFS_error error_code);
+				    PVFS_error error_code);
 #endif
 static void mem_to_bmi_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code);
+				   PVFS_size actual_size,
+				   PVFS_error error_code);
 static void bmi_to_mem_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code);
-static void cleanup_buffers(struct fp_private_data* flow_data);
-static void handle_io_error(PVFS_error error_code, struct fp_queue_item*
-    q_item, struct fp_private_data* flow_data);
-static int cancel_pending_bmi(struct qlist_head* list);
-static int cancel_pending_trove(struct qlist_head* list);
+				   PVFS_size actual_size,
+				   PVFS_error error_code);
+static void cleanup_buffers(struct fp_private_data *flow_data);
+static void handle_io_error(PVFS_error error_code,
+			    struct fp_queue_item *q_item,
+			    struct fp_private_data *flow_data);
+static int cancel_pending_bmi(struct qlist_head *list);
+static int cancel_pending_trove(struct qlist_head *list);
 
 
 /* interface prototypes */
@@ -151,12 +152,12 @@ static int fp_multiqueue_initialize(int 
 static int fp_multiqueue_finalize(void);
 
 static int fp_multiqueue_getinfo(flow_descriptor * flow_d,
-			       int option,
-			       void *parameter);
+				 int option,
+				 void *parameter);
 
 static int fp_multiqueue_setinfo(flow_descriptor * flow_d,
-			       int option,
-			       void *parameter);
+				 int option,
+				 void *parameter);
 
 static int fp_multiqueue_post(flow_descriptor * flow_d);
 
@@ -182,23 +183,23 @@ int fp_multiqueue_initialize(int flowpro
     int ret = -1;
 
     ret = PINT_thread_mgr_bmi_start();
-    if(ret < 0)
-	return(ret);
+    if (ret < 0)
+	return ret;
     PINT_thread_mgr_bmi_getcontext(&global_bmi_context);
 
 #ifdef __PVFS2_TROVE_SUPPORT__
     ret = PINT_thread_mgr_trove_start();
-    if(ret < 0)
+    if (ret < 0)
     {
 	PINT_thread_mgr_bmi_stop();
-	return(ret);
+	return ret;
     }
     PINT_thread_mgr_trove_getcontext(&global_trove_context);
 #endif
 
     fp_multiqueue_id = flowproto_id;
 
-    return(0);
+    return 0;
 }
 
 /* fp_multiqueue_finalize()
@@ -213,7 +214,7 @@ int fp_multiqueue_finalize(void)
 #ifdef __PVFS2_TROVE_SUPPORT__
     PINT_thread_mgr_trove_stop();
 #endif
-    return (0);
+    return 0;
 }
 
 /* fp_multiqueue_getinfo()
@@ -222,9 +223,9 @@ int fp_multiqueue_finalize(void)
  *
  * returns 0 on success, -PVFS_error on failure
  */
-int fp_multiqueue_getinfo(flow_descriptor * flow_d,
-			       int option,
-			       void *parameter)
+int fp_multiqueue_getinfo(flow_descriptor *flow_d,
+			  int option,
+			  void *parameter)
 {
     int* type;
 
@@ -233,11 +234,11 @@ int fp_multiqueue_getinfo(flow_descripto
 	case FLOWPROTO_TYPE_QUERY:
 	    type = parameter;
 	    if(*type == FLOWPROTO_MULTIQUEUE)
-		return(0);
+		return 0;
 	    else
-		return(-PVFS_ENOPROTOOPT);
+		return -PVFS_ENOPROTOOPT;
 	default:
-	    return(-PVFS_ENOSYS);
+	    return -PVFS_ENOSYS;
 	    break;
     }
 }
@@ -249,10 +250,10 @@ int fp_multiqueue_getinfo(flow_descripto
  * returns 0 on success, -PVFS_error on failure
  */
 int fp_multiqueue_setinfo(flow_descriptor * flow_d,
-			       int option,
-			       void *parameter)
+			  int option,
+			  void *parameter)
 {
-    return (-PVFS_ENOSYS);
+    return -PVFS_ENOSYS;
 }
 
 /* fp_multiqueue_post()
@@ -275,10 +276,10 @@ int fp_multiqueue_post(flow_descriptor *
 	   (flow_d->src.endpoint_id == BMI_ENDPOINT &&
 	    flow_d->dest.endpoint_id == MEM_ENDPOINT));
 
-    flow_data = (struct fp_private_data*)malloc(sizeof(struct
-	fp_private_data));
-    if(!flow_data)
-	return(-PVFS_ENOMEM);
+    flow_data = (struct fp_private_data *)
+	malloc(sizeof(struct fp_private_data));
+    if (flow_data == NULL)
+	return -PVFS_ENOMEM;
     memset(flow_data, 0, sizeof(struct fp_private_data));
     
     flow_d->flow_protocol_data = flow_data;
@@ -292,12 +293,14 @@ int fp_multiqueue_post(flow_descriptor *
     /* if a file datatype offset was specified, go ahead and skip ahead 
      * before doing anything else
      */
-    if(flow_d->file_req_offset)
+    if (flow_d->file_req_offset)
+    {
 	PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state,
 	    flow_d->file_req_offset);
+    }
 
     /* set boundaries on file datatype */
-    if(flow_d->aggregate_size > -1)
+    if (flow_d->aggregate_size > -1)
     {
 	PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
 	    flow_d->aggregate_size+flow_d->file_req_offset);
@@ -309,7 +312,7 @@ int fp_multiqueue_post(flow_descriptor *
 	    PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
     }
 
-    for(i=0; i<BUFFERS_PER_FLOW; i++)
+    for (i=0; i < BUFFERS_PER_FLOW; i++)
     {
 	flow_data->prealloc_array[i].parent = flow_d;
 	flow_data->prealloc_array[i].bmi_callback.data = 
@@ -317,26 +320,26 @@ int fp_multiqueue_post(flow_descriptor *
     }
 
     /* remaining setup depends on the endpoints we intend to use */
-    if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
+    if (flow_d->src.endpoint_id == BMI_ENDPOINT &&
 	flow_d->dest.endpoint_id == MEM_ENDPOINT)
     {
 	flow_data->prealloc_array[0].buffer = flow_d->dest.u.mem.buffer;
 	flow_data->prealloc_array[0].bmi_callback.fn = bmi_to_mem_callback_fn;
 	bmi_to_mem_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
     }
-    else if(flow_d->src.endpoint_id == MEM_ENDPOINT &&
-	flow_d->dest.endpoint_id == BMI_ENDPOINT)
+    else if (flow_d->src.endpoint_id == MEM_ENDPOINT &&
+	     flow_d->dest.endpoint_id == BMI_ENDPOINT)
     {
 	flow_data->prealloc_array[0].buffer = flow_d->src.u.mem.buffer;
 	flow_data->prealloc_array[0].bmi_callback.fn = mem_to_bmi_callback_fn;
 	mem_to_bmi_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
     }
 #ifdef __PVFS2_TROVE_SUPPORT__
-    else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
-	flow_d->dest.endpoint_id == BMI_ENDPOINT)
+    else if (flow_d->src.endpoint_id == TROVE_ENDPOINT &&
+	     flow_d->dest.endpoint_id == BMI_ENDPOINT)
     {
 	flow_data->initial_posts = BUFFERS_PER_FLOW;
-	for(i=0; i<BUFFERS_PER_FLOW; i++)
+	for (i=0; i < BUFFERS_PER_FLOW; i++)
 	{
 	    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
 		"flowproto-multiqueue forcing bmi_send_callback_fn.\n");
@@ -344,14 +347,14 @@ int fp_multiqueue_post(flow_descriptor *
 	    bmi_send_callback_fn(&(flow_data->prealloc_array[i]), 0, 0, 1);
 	}
     }
-    else if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
-	flow_d->dest.endpoint_id == TROVE_ENDPOINT)
+    else if (flow_d->src.endpoint_id == BMI_ENDPOINT &&
+	     flow_d->dest.endpoint_id == TROVE_ENDPOINT)
     {
 	/* only post one outstanding recv at a time; easier to manage */
 	flow_data->initial_posts = 1;
 
 	/* place remaining buffers on "empty" queue */
-	for(i=1; i<BUFFERS_PER_FLOW; i++)
+	for (i=1; i < BUFFERS_PER_FLOW; i++)
 	{
 	    qlist_add_tail(&flow_data->prealloc_array[i].list_link,
 		&flow_data->empty_list);
@@ -380,8 +383,8 @@ int fp_multiqueue_post(flow_descriptor *
  * no return value
  */
 static void bmi_recv_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code)
+				 PVFS_size actual_size,
+				 PVFS_error error_code)
 {
     struct fp_queue_item* q_item = user_ptr;
     int ret;
@@ -411,46 +414,46 @@ static void bmi_recv_callback_fn(void *u
     /* add to dest queue */
     qlist_add_tail(&q_item->list_link, &flow_data->dest_list);
     result_tmp = &q_item->result_chain;
-    do{
+    do {
 	assert(result_tmp->result.bytes);
 	result_tmp->q_item = q_item;
 	result_tmp->trove_callback.data = result_tmp;
 	result_tmp->trove_callback.fn = trove_write_callback_fn;
 	tmp_user_ptr = &result_tmp->trove_callback;
 	ret = trove_bstream_write_list(q_item->parent->dest.u.trove.coll_id,
-	    q_item->parent->dest.u.trove.handle,
-	    (char**)&result_tmp->buffer_offset,
-	    &result_tmp->result.bytes,
-	    1,
-	    result_tmp->result.offset_array,
-	    result_tmp->result.size_array,
-	    result_tmp->result.segs,
-	    &result_tmp->result.bytes,
-	    0,
-	    NULL,
-	    &result_tmp->trove_callback,
-	    global_trove_context,
-	    &result_tmp->posted_id);
+				       q_item->parent->dest.u.trove.handle,
+				       (char**)&result_tmp->buffer_offset,
+				       &result_tmp->result.bytes,
+				       1,
+				       result_tmp->result.offset_array,
+				       result_tmp->result.size_array,
+				       result_tmp->result.segs,
+				       &result_tmp->result.bytes,
+				       0,
+				       NULL,
+				       &result_tmp->trove_callback,
+				       global_trove_context,
+				       &result_tmp->posted_id);
 	result_tmp = result_tmp->next;
 
-	if(ret < 0)
+	if (ret < 0)
 	{
 	    handle_io_error(ret, q_item, flow_data);
 	    ERROR_CLEANUP(flow_data);
 	}
 
-	if(ret == 1)
+	if (ret == 1)
 	{
 	    gen_mutex_unlock(&flow_data->flow_mutex);
 	    /* immediate completion; trigger callback ourselves */
 	    trove_write_callback_fn(tmp_user_ptr, 0);
 	    gen_mutex_lock(&flow_data->flow_mutex);
 	}
-    }while(result_tmp);
+    } while(result_tmp);
 
     /* do we need to repost another recv? */
 
-    if((!PINT_REQUEST_DONE(q_item->parent->file_req_state)) 
+    if ((!PINT_REQUEST_DONE(q_item->parent->file_req_state)) 
 	&& qlist_empty(&flow_data->src_list) 
 	&& !qlist_empty(&flow_data->empty_list))
     {
@@ -459,7 +462,7 @@ static void bmi_recv_callback_fn(void *u
 	qlist_del(&q_item->list_link);
 	qlist_add_tail(&q_item->list_link, &flow_data->src_list);
 
-	if(!q_item->buffer)
+	if (!q_item->buffer)
 	{
 	    /* if the q_item has not been used, allocate a buffer */
 	    q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
@@ -474,7 +477,7 @@ static void bmi_recv_callback_fn(void *u
 	tmp_buffer = q_item->buffer;
 	do{
 	    q_item->result_chain_count++;
-	    if(!result_tmp)
+	    if (!result_tmp)
 	    {
 		result_tmp = (struct result_chain_entry*)malloc(
 		    sizeof(struct result_chain_entry));
@@ -504,10 +507,10 @@ static void bmi_recv_callback_fn(void *u
 	    result_tmp = result_tmp->next;
 	    tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
 	    bytes_processed += old_result_tmp->result.bytes;
-	}while(bytes_processed < BUFFER_SIZE && 
+	} while(bytes_processed < BUFFER_SIZE && 
 	    !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-	if(bytes_processed == 0)
+	if (bytes_processed == 0)
 	{	
 	    gen_mutex_unlock(&flow_data->flow_mutex);
 	    qlist_del(&q_item->list_link);
@@ -528,13 +531,13 @@ static void bmi_recv_callback_fn(void *u
 	    &q_item->bmi_callback,
 	    global_bmi_context);
 	
-	if(ret < 0)
+	if (ret < 0)
 	{
 	    handle_io_error(ret, q_item, flow_data);
 	    ERROR_CLEANUP(flow_data);
 	}
 
-	if(ret == 1)
+	if (ret == 1)
 	{
 	    /* immediate completion; trigger callback ourselves */
 	    gen_mutex_unlock(&flow_data->flow_mutex);
@@ -555,7 +558,7 @@ static void bmi_recv_callback_fn(void *u
  * no return value
  */
 static void trove_read_callback_fn(void *user_ptr,
-		           PVFS_error error_code)
+				   PVFS_error error_code)
 {
     int ret;
     struct result_chain_entry* result_tmp = user_ptr;
@@ -574,14 +577,14 @@ static void trove_read_callback_fn(void 
 
     result_tmp->posted_id = 0;
 
-    if(error_code != 0 || flow_data->parent->error_code != 0)
+    if (error_code != 0 || flow_data->parent->error_code != 0)
     {
 	handle_io_error(error_code, q_item, flow_data);
 	ERROR_CLEANUP(flow_data);
     }
 
     /* don't do anything until the last read completes */
-    if(q_item->result_chain_count > 1)
+    if (q_item->result_chain_count > 1)
     {
 	q_item->result_chain_count--;
 	gen_mutex_unlock(&flow_data->flow_mutex);
@@ -594,26 +597,29 @@ static void trove_read_callback_fn(void 
     qlist_add_tail(&q_item->list_link, &flow_data->dest_list);
 
     result_tmp = &q_item->result_chain;
-    do{
+    do {
 	old_result_tmp = result_tmp;
 	result_tmp = result_tmp->next;
 	if(old_result_tmp != &q_item->result_chain)
 	    free(old_result_tmp);
-    }while(result_tmp);
+    } while(result_tmp);
     q_item->result_chain.next = NULL;
     q_item->result_chain_count = 0;
 
     /* while we hold dest lock, look for next seq no. to send */
-    do{
+    do {
 	qlist_for_each(tmp_link, &flow_data->dest_list)
 	{
-	    q_item = qlist_entry(tmp_link, struct fp_queue_item,
-		list_link);
-	    if(q_item->seq == flow_data->next_seq_to_send)
+	    q_item = qlist_entry(tmp_link,
+				 struct fp_queue_item,
+				 list_link);
+	    if (q_item->seq == flow_data->next_seq_to_send)
+	    {
 		break;
+	    }
 	}
 
-	if(q_item->seq == flow_data->next_seq_to_send)
+	if (q_item->seq == flow_data->next_seq_to_send)
 	{
 	    flow_data->dest_pending++;
             assert(q_item->buffer_used);
@@ -626,8 +632,10 @@ static void trove_read_callback_fn(void 
 		&q_item->bmi_callback,
 		global_bmi_context);
 	    flow_data->next_seq_to_send++;
-	    if(q_item->last)
+	    if (q_item->last)
+	    {
 		flow_data->dest_last_posted = 1;
+	    }
 	}
 	else
 	{
@@ -635,13 +643,13 @@ static void trove_read_callback_fn(void 
 	    done = 1;
 	}	
 
-	if(ret < 0)
+	if (ret < 0)
 	{
 	    handle_io_error(ret, q_item, flow_data);
 	    ERROR_CLEANUP(flow_data);
 	}
 
-	if(ret == 1)
+	if (ret == 1)
 	{
 	    gen_mutex_unlock(&flow_data->flow_mutex);
 	    /* immediate completion; trigger callback ourselves */
@@ -665,9 +673,9 @@ static void trove_read_callback_fn(void 
  * returns 1 if flow completes, 0 otherwise
  */
 static int bmi_send_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code,
-			 int initial_call_flag)
+				PVFS_size actual_size,
+				PVFS_error error_code,
+				int initial_call_flag)
 {
     struct fp_queue_item* q_item = user_ptr;
     struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
@@ -678,10 +686,10 @@ static int bmi_send_callback_fn(void *us
     PVFS_size bytes_processed = 0;
     void* tmp_user_ptr = NULL;
 
-    if(flow_data->parent->error_code != 0 && initial_call_flag)
+    if (flow_data->parent->error_code != 0 && initial_call_flag)
     {
 	/* cleanup path already triggered, don't do anything more */
-	return(1);
+	return 1;
     }
 
     gen_mutex_lock(&flow_data->flow_mutex);
@@ -691,7 +699,7 @@ static int bmi_send_callback_fn(void *us
 
     q_item->posted_id = 0;
 
-    if(flow_data->parent->error_code != 0 && initial_call_flag)
+    if (flow_data->parent->error_code != 0 && initial_call_flag)
     {
 	/* cleanup path already triggered, don't do anything more */
 	/* TODO: there is a race here; flow_mutex may be freed already? */
@@ -699,22 +707,22 @@ static int bmi_send_callback_fn(void *us
 	 * other callbacks; dropping mutex in the process.  Need to fix.
 	 */
 	gen_mutex_unlock(&flow_data->flow_mutex);
-	return(1);
+	return 1;
     }
 
-    if(error_code != 0 || flow_data->parent->error_code != 0)
+    if (error_code != 0 || flow_data->parent->error_code != 0)
     {
 	handle_io_error(error_code, q_item, flow_data);
-	if(flow_data->parent->state == FLOW_COMPLETE)
+	if (flow_data->parent->state == FLOW_COMPLETE)
 	{
 	    gen_mutex_unlock(&flow_data->flow_mutex);
 	    FLOW_CLEANUP(flow_data);
-	    return(1);
+	    return 1;
 	}
 	else
 	{
 	    gen_mutex_unlock(&flow_data->flow_mutex);
-	    return(0);
+	    return 0;
 	}
     }
 
@@ -722,7 +730,7 @@ static int bmi_send_callback_fn(void *us
 
     flow_data->parent->total_transfered += actual_size;
 
-    if(q_item->buffer)
+    if (q_item->buffer)
     {
 	flow_data->dest_pending--;
     }
@@ -732,17 +740,18 @@ static int bmi_send_callback_fn(void *us
     }
 
     /* if this was the last operation, then mark the flow as done */
-    if(flow_data->initial_posts == 0 &&
+    if (flow_data->initial_posts == 0 &&
 	flow_data->dest_pending == 0 && 
 	flow_data->dest_last_posted)
     {
 	q_item->parent->state = FLOW_COMPLETE;
 	gen_mutex_unlock(&flow_data->flow_mutex);
 	FLOW_CLEANUP(flow_data);
-	return(1);
+
+	return 1;
     }
 
-    if(q_item->buffer)
+    if (q_item->buffer)
     {
 	/* if this q_item has been used before, remove it from its 
 	 * current queue */
@@ -765,13 +774,13 @@ static int bmi_send_callback_fn(void *us
     old_result_tmp = result_tmp;
     tmp_buffer = q_item->buffer;
     q_item->buffer_used = 0;
-    do{
+    do {
 	q_item->result_chain_count++;
-	if(!result_tmp)
+	if (!result_tmp)
 	{
-	    result_tmp = (struct result_chain_entry*)malloc(
-		sizeof(struct result_chain_entry));
-	    assert(result_tmp);
+	    result_tmp = (struct result_chain_entry *)
+		malloc(sizeof(struct result_chain_entry));
+	    assert(result_tmp != NULL);
 	    memset(result_tmp, 0 , sizeof(struct result_chain_entry));
 	    old_result_tmp->next = result_tmp;
 	}
@@ -788,10 +797,10 @@ static int bmi_send_callback_fn(void *us
 	q_item->seq = flow_data->next_seq;
 	flow_data->next_seq++;
 	ret = PINT_Process_request(q_item->parent->file_req_state,
-	    q_item->parent->mem_req_state,
-	    &q_item->parent->file_data,
-	    &result_tmp->result,
-	    PINT_SERVER);
+				   q_item->parent->mem_req_state,
+				   &q_item->parent->file_data,
+				   &result_tmp->result,
+				   PINT_SERVER);
 	/* TODO: error handling */ 
 	assert(ret >= 0);
 	
@@ -800,30 +809,32 @@ static int bmi_send_callback_fn(void *us
 	tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
 	bytes_processed += old_result_tmp->result.bytes;
 	q_item->buffer_used += old_result_tmp->result.bytes;
-    }while(bytes_processed < BUFFER_SIZE && 
-	!PINT_REQUEST_DONE(q_item->parent->file_req_state));
+    } while(bytes_processed < BUFFER_SIZE && 
+	    !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
     flow_data->total_bytes_processed += bytes_processed;
-    if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+    if (PINT_REQUEST_DONE(q_item->parent->file_req_state))
     {
 	q_item->last = 1;
 	/* special case, we never have a "last" operation when there
 	 * is no work to do, trigger manually
 	 */
-	if(flow_data->total_bytes_processed == 0)
+	if (flow_data->total_bytes_processed == 0)
+	{
 	    flow_data->dest_last_posted = 1;
+	}
     }
 
-    if(bytes_processed == 0)
+    if (bytes_processed == 0)
     {	
 	gen_mutex_unlock(&flow_data->flow_mutex);
-	return(0);
+	return 0;
     }
 
     assert(q_item->buffer_used);
 
     result_tmp = &q_item->result_chain;
-    do{
+    do {
 	assert(q_item->buffer_used);
 	assert(result_tmp->result.bytes);
 	result_tmp->q_item = q_item;
@@ -846,33 +857,33 @@ static int bmi_send_callback_fn(void *us
 	    &result_tmp->posted_id);
 	result_tmp = result_tmp->next;
 
-	if(ret < 0)
+	if (ret < 0)
 	{
 	    handle_io_error(ret, q_item, flow_data);
 	    if(flow_data->parent->state == FLOW_COMPLETE)
 	    {
 		gen_mutex_unlock(&flow_data->flow_mutex);
 		FLOW_CLEANUP(flow_data);
-		return(1);
+		return 1;
 	    }
 	    else
 	    {
 		gen_mutex_unlock(&flow_data->flow_mutex);
-		return(0);
+		return 0;
 	    }
 	}
 
-	if(ret == 1)
+	if (ret == 1)
 	{
 	    /* immediate completion; trigger callback ourselves */
 	    gen_mutex_unlock(&flow_data->flow_mutex);
 	    trove_read_callback_fn(tmp_user_ptr, 0);
 	    gen_mutex_lock(&flow_data->flow_mutex);
 	}
-    }while(result_tmp);
+    } while(result_tmp);
 
     gen_mutex_unlock(&flow_data->flow_mutex);
-    return(0);
+    return 0;
 };
 
 /* trove_write_callback_fn()
@@ -882,7 +893,7 @@ static int bmi_send_callback_fn(void *us
  * no return value
  */
 static void trove_write_callback_fn(void *user_ptr,
-		           PVFS_error error_code)
+				    PVFS_error error_code)
 {
     PVFS_size tmp_actual_size;
     int ret;
@@ -896,18 +907,18 @@ static void trove_write_callback_fn(void
     gen_mutex_lock(&flow_data->flow_mutex);
 
     gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-	"flowproto-multiqueue trove_write_callback_fn.\n");
+		 "flowproto-multiqueue trove_write_callback_fn.\n");
 
     result_tmp->posted_id = 0;
 
-    if(error_code != 0 || flow_data->parent->error_code != 0)
+    if (error_code != 0 || flow_data->parent->error_code != 0)
     {
 	handle_io_error(error_code, q_item, flow_data);
 	ERROR_CLEANUP(flow_data);
     }
 
     /* don't do anything until the last write completes */
-    if(q_item->result_chain_count > 1)
+    if (q_item->result_chain_count > 1)
     {
 	q_item->result_chain_count--;
 	gen_mutex_unlock(&flow_data->flow_mutex);
@@ -915,15 +926,17 @@ static void trove_write_callback_fn(void
     }
 
     result_tmp = &q_item->result_chain;
-    do{
+    do {
 	q_item->parent->total_transfered += result_tmp->result.bytes;
 	PINT_perf_count(PINT_PERF_WRITE, result_tmp->result.bytes, 
 	    PINT_PERF_ADD);
 	old_result_tmp = result_tmp;
 	result_tmp = result_tmp->next;
-	if(old_result_tmp != &q_item->result_chain)
+	if (old_result_tmp != &q_item->result_chain)
+	{
 	    free(old_result_tmp);
-    }while(result_tmp);
+	}
+    } while(result_tmp);
     q_item->result_chain.next = NULL;
     q_item->result_chain_count = 0;
 
@@ -939,13 +952,13 @@ static void trove_write_callback_fn(void
     }
 
     /* if there are no more receives to post, just return */
-    if(PINT_REQUEST_DONE(flow_data->parent->file_req_state))
+    if (PINT_REQUEST_DONE(flow_data->parent->file_req_state))
     {
 	gen_mutex_unlock(&flow_data->flow_mutex);
 	return;
     }
 
-    if(q_item->buffer)
+    if (q_item->buffer)
     {
 	/* if this q_item has been used before, remove it from its 
 	 * current queue */
@@ -964,7 +977,7 @@ static void trove_write_callback_fn(void
     /* if src list is empty, then post new recv; otherwise just queue
      * in empty list
      */
-    if(qlist_empty(&flow_data->src_list))
+    if (qlist_empty(&flow_data->src_list))
     {
 	/* ready to post new recv! */
 	qlist_add_tail(&q_item->list_link, &flow_data->src_list);
@@ -972,13 +985,13 @@ static void trove_write_callback_fn(void
 	result_tmp = &q_item->result_chain;
 	old_result_tmp = result_tmp;
 	tmp_buffer = q_item->buffer;
-	do{
+	do {
 	    q_item->result_chain_count++;
-	    if(!result_tmp)
+	    if (!result_tmp)
 	    {
-		result_tmp = (struct result_chain_entry*)malloc(
-		    sizeof(struct result_chain_entry));
-		assert(result_tmp);
+		result_tmp = (struct result_chain_entry *)
+		    malloc(sizeof(struct result_chain_entry));
+		assert(result_tmp != NULL);
 		memset(result_tmp, 0 , sizeof(struct result_chain_entry));
 		old_result_tmp->next = result_tmp;
 	    }
@@ -993,10 +1006,10 @@ static void trove_write_callback_fn(void
 	    result_tmp->result.segs = 0;
 	    result_tmp->buffer_offset = tmp_buffer;
 	    ret = PINT_Process_request(q_item->parent->file_req_state,
-		q_item->parent->mem_req_state,
-		&q_item->parent->file_data,
-		&result_tmp->result,
-		PINT_SERVER);
+				       q_item->parent->mem_req_state,
+				       &q_item->parent->file_data,
+				       &result_tmp->result,
+				       PINT_SERVER);
 	    /* TODO: error handling */ 
 	    assert(ret >= 0);
 	    
@@ -1004,12 +1017,12 @@ static void trove_write_callback_fn(void
 	    result_tmp = result_tmp->next;
 	    tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
 	    bytes_processed += old_result_tmp->result.bytes;
-	}while(bytes_processed < BUFFER_SIZE && 
-	    !PINT_REQUEST_DONE(q_item->parent->file_req_state));
+	} while(bytes_processed < BUFFER_SIZE && 
+		!PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
 	flow_data->total_bytes_processed += bytes_processed;
 
-	if(bytes_processed == 0)
+	if (bytes_processed == 0)
 	{	
 	    gen_mutex_unlock(&flow_data->flow_mutex);
 	    return;
@@ -1017,22 +1030,22 @@ static void trove_write_callback_fn(void
 
 	/* TODO: what if we recv less than expected? */
 	ret = BMI_post_recv(&q_item->posted_id,
-	    q_item->parent->src.u.bmi.address,
-	    q_item->buffer,
-	    BUFFER_SIZE,
-	    &tmp_actual_size,
-	    BMI_PRE_ALLOC,
-	    q_item->parent->tag,
-	    &q_item->bmi_callback,
-	    global_bmi_context);
+			    q_item->parent->src.u.bmi.address,
+			    q_item->buffer,
+			    BUFFER_SIZE,
+			    &tmp_actual_size,
+			    BMI_PRE_ALLOC,
+			    q_item->parent->tag,
+			    &q_item->bmi_callback,
+			    global_bmi_context);
 	
-	if(ret < 0)
+	if (ret < 0)
 	{
 	    handle_io_error(ret, q_item, flow_data);
 	    ERROR_CLEANUP(flow_data);
 	}
 
-	if(ret == 1)
+	if (ret == 1)
 	{
 	    gen_mutex_unlock(&flow_data->flow_mutex);
 	    /* immediate completion; trigger callback ourselves */
@@ -1043,7 +1056,7 @@ static void trove_write_callback_fn(void
     else
     {
 	qlist_add_tail(&q_item->list_link, 
-	    &(flow_data->empty_list));
+		       &(flow_data->empty_list));
     }
 
     gen_mutex_unlock(&flow_data->flow_mutex);
@@ -1063,68 +1076,77 @@ static void cleanup_buffers(struct fp_pr
     struct result_chain_entry* result_tmp;
     struct result_chain_entry* old_result_tmp;
 
-    if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
+    if (flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
 	flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
     {
-	for(i=0; i<BUFFERS_PER_FLOW; i++)
+	for (i=0; i < BUFFERS_PER_FLOW; i++)
 	{
-	    if(flow_data->prealloc_array[i].buffer)
+	    if (flow_data->prealloc_array[i].buffer)
 	    {
 		BMI_memfree(flow_data->parent->src.u.bmi.address,
-		    flow_data->prealloc_array[i].buffer,
-		    BUFFER_SIZE,
-		    BMI_RECV);
+			    flow_data->prealloc_array[i].buffer,
+			    BUFFER_SIZE,
+			    BMI_RECV);
 	    }
 	    result_tmp = &(flow_data->prealloc_array[i].result_chain);
-	    do{
+	    do {
 		old_result_tmp = result_tmp;
 		result_tmp = result_tmp->next;
-		if(old_result_tmp !=
+
+		if (old_result_tmp !=
 		    &(flow_data->prealloc_array[i].result_chain))
+		{
 		    free(old_result_tmp);
-	    }while(result_tmp);
+		}
+	    } while(result_tmp);
 	    flow_data->prealloc_array[i].result_chain.next = NULL;
 	}
     }
-    else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
-	flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
+    else if (flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
+	     flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
     {
-	for(i=0; i<BUFFERS_PER_FLOW; i++)
+	for (i=0; i < BUFFERS_PER_FLOW; i++)
 	{
-	    if(flow_data->prealloc_array[i].buffer)
+	    if (flow_data->prealloc_array[i].buffer)
 	    {
 		BMI_memfree(flow_data->parent->dest.u.bmi.address,
-		    flow_data->prealloc_array[i].buffer,
-		    BUFFER_SIZE,
-		    BMI_SEND);
+			    flow_data->prealloc_array[i].buffer,
+			    BUFFER_SIZE,
+			    BMI_SEND);
 	    }
 	    result_tmp = &(flow_data->prealloc_array[i].result_chain);
-	    do{
+	    do {
 		old_result_tmp = result_tmp;
 		result_tmp = result_tmp->next;
 		if(old_result_tmp !=
-		    &(flow_data->prealloc_array[i].result_chain))
+		   &(flow_data->prealloc_array[i].result_chain))
+		{
 		    free(old_result_tmp);
-	    }while(result_tmp);
+		}
+	    } while(result_tmp);
 	    flow_data->prealloc_array[i].result_chain.next = NULL;
 	}
     }
-    else if(flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
-	flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
+    else if (flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
+	     flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
     {
-	if(flow_data->intermediate)
+	if (flow_data->intermediate)
 	{
 	    BMI_memfree(flow_data->parent->dest.u.bmi.address,
-		flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
+			flow_data->intermediate,
+			BUFFER_SIZE,
+			BMI_SEND);
 	}
     }
-    else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
-	flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
+    else if (flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
+	     flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
     {
-	if(flow_data->intermediate)
+	if (flow_data->intermediate)
 	{
 	    BMI_memfree(flow_data->parent->src.u.bmi.address,
-		flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
+			flow_data->intermediate,
+			BUFFER_SIZE,
+			BMI_RECV);
 	}
     }
 
@@ -1139,8 +1161,8 @@ static void cleanup_buffers(struct fp_pr
  * no return value
  */
 static void mem_to_bmi_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code)
+				   PVFS_size actual_size,
+				   PVFS_error error_code)
 {
     struct fp_queue_item* q_item = user_ptr;
     int ret;
@@ -1153,11 +1175,11 @@ static void mem_to_bmi_callback_fn(void 
     gen_mutex_lock(&flow_data->flow_mutex);
     
     gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-	"flowproto-multiqueue mem_to_bmi_callback_fn.\n");
+		 "flowproto-multiqueue mem_to_bmi_callback_fn.\n");
 
     q_item->posted_id = 0;
 
-    if(error_code != 0 || flow_data->parent->error_code != 0)
+    if (error_code != 0 || flow_data->parent->error_code != 0)
     {
 	handle_io_error(error_code, q_item, flow_data);
 	ERROR_CLEANUP(flow_data);
@@ -1166,7 +1188,7 @@ static void mem_to_bmi_callback_fn(void 
     flow_data->parent->total_transfered += actual_size;
 
     /* are we done? */
-    if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+    if (PINT_REQUEST_DONE(q_item->parent->file_req_state))
     {
 	q_item->parent->state = FLOW_COMPLETE;
 	gen_mutex_unlock(&flow_data->flow_mutex);
@@ -1185,20 +1207,20 @@ static void mem_to_bmi_callback_fn(void 
     q_item->result_chain.result.segs = 0;
     q_item->result_chain.buffer_offset = NULL;
     ret = PINT_Process_request(q_item->parent->file_req_state,
-	q_item->parent->mem_req_state,
-	&q_item->parent->file_data,
-	&q_item->result_chain.result,
-	PINT_CLIENT);
-
+			       q_item->parent->mem_req_state,
+			       &q_item->parent->file_data,
+			       &q_item->result_chain.result,
+			       PINT_CLIENT);
+    
     /* TODO: error handling */ 
     assert(ret >= 0);
 
     /* was MAX_REGIONS enough to satisfy this step? */
-    if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
+    if (!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
 	q_item->result_chain.result.bytes < BUFFER_SIZE)
     {
 	/* create an intermediate buffer */
-	if(!flow_data->intermediate)
+	if (!flow_data->intermediate)
 	{
 	    flow_data->intermediate = BMI_memalloc(
 		flow_data->parent->dest.u.bmi.address,
@@ -1208,10 +1230,10 @@ static void mem_to_bmi_callback_fn(void 
 	}
 
 	/* copy what we have so far into intermediate buffer */
-	for(i=0; i<q_item->result_chain.result.segs; i++)
+	for (i=0; i < q_item->result_chain.result.segs; i++)
 	{
 	    src_ptr = ((char*)q_item->parent->src.u.mem.buffer + 
-		q_item->result_chain.offset_list[i]);
+		       q_item->result_chain.offset_list[i]);
 	    dest_ptr = ((char*)flow_data->intermediate + bytes_processed);
 	    memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
 	    bytes_processed += q_item->result_chain.size_list[i];
@@ -1234,16 +1256,16 @@ static void mem_to_bmi_callback_fn(void 
 	    assert(ret >= 0);
 
 	    /* copy what we have so far into intermediate buffer */
-	    for(i=0; i<q_item->result_chain.result.segs; i++)
+	    for(i=0; i < q_item->result_chain.result.segs; i++)
 	    {
 		src_ptr = ((char*)q_item->parent->src.u.mem.buffer + 
-		    q_item->result_chain.offset_list[i]);
+			   q_item->result_chain.offset_list[i]);
 		dest_ptr = ((char*)flow_data->intermediate + bytes_processed);
 		memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
 		bytes_processed += q_item->result_chain.size_list[i];
 	    }
-	}while(bytes_processed < BUFFER_SIZE &&
-	    !PINT_REQUEST_DONE(q_item->parent->file_req_state));
+	} while(bytes_processed < BUFFER_SIZE &&
+		!PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
 	/* setup for BMI operation */
 	flow_data->tmp_buffer_list[0] = flow_data->intermediate;
@@ -1264,7 +1286,7 @@ static void mem_to_bmi_callback_fn(void 
 	}
 
 	/* convert offsets to memory addresses */
-	for(i=0; i<q_item->result_chain.result.segs; i++)
+	for(i=0; i < q_item->result_chain.result.segs; i++)
 	{
 	    flow_data->tmp_buffer_list[i] = 
 		(void*)(q_item->result_chain.result.offset_array[i] +
@@ -1273,17 +1295,17 @@ static void mem_to_bmi_callback_fn(void 
     }
 
     ret = BMI_post_send_list(&q_item->posted_id,
-	q_item->parent->dest.u.bmi.address,
-	(const void**)flow_data->tmp_buffer_list,
-	q_item->result_chain.result.size_array,
-	q_item->result_chain.result.segs,
-	q_item->result_chain.result.bytes,
-	buffer_type,
-	q_item->parent->tag,
-	&q_item->bmi_callback,
-	global_bmi_context);
+			     q_item->parent->dest.u.bmi.address,
+			     (const void**)flow_data->tmp_buffer_list,
+			     q_item->result_chain.result.size_array,
+			     q_item->result_chain.result.segs,
+			     q_item->result_chain.result.bytes,
+			     buffer_type,
+			     q_item->parent->tag,
+			     &q_item->bmi_callback,
+			     global_bmi_context);
 
-    if(ret < 0)
+    if (ret < 0)
     {
 	handle_io_error(ret, q_item, flow_data);
 	ERROR_CLEANUP(flow_data);
@@ -1291,10 +1313,11 @@ static void mem_to_bmi_callback_fn(void 
 
     gen_mutex_unlock(&flow_data->flow_mutex);
 
-    if(ret == 1)
+    if (ret == 1)
     {
 	mem_to_bmi_callback_fn(q_item, 
-	    q_item->result_chain.result.bytes, 0);
+			       q_item->result_chain.result.bytes,
+			       0);
     }
 
     return;
@@ -1309,8 +1332,8 @@ static void mem_to_bmi_callback_fn(void 
  * no return value
  */
 static void bmi_to_mem_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code)
+				   PVFS_size actual_size,
+				   PVFS_error error_code)
 {
     struct fp_queue_item* q_item = user_ptr;
     int ret;
@@ -1332,7 +1355,7 @@ static void bmi_to_mem_callback_fn(void 
 
     q_item->posted_id = 0;
 
-    if(error_code != 0 || flow_data->parent->error_code != 0)
+    if (error_code != 0 || flow_data->parent->error_code != 0)
     {
 	handle_io_error(error_code, q_item, flow_data);
 	ERROR_CLEANUP(flow_data);
@@ -1342,11 +1365,11 @@ static void bmi_to_mem_callback_fn(void 
 
     /* if this is the result of a receive into an intermediate buffer,
      * then we must copy out */
-    if(flow_data->tmp_buffer_list[0] == flow_data->intermediate &&
+    if (flow_data->tmp_buffer_list[0] == flow_data->intermediate &&
 	flow_data->intermediate != NULL)
     {
 	/* copy out what we have so far */
-	for(i=0; i<q_item->result_chain.result.segs; i++)
+	for (i=0; i < q_item->result_chain.result.segs; i++)
 	{
 	    region_size = q_item->result_chain.size_list[i];
 	    src_ptr = (char*)(flow_data->intermediate + 
@@ -1366,14 +1389,14 @@ static void bmi_to_mem_callback_fn(void 
 	    q_item->result_chain.buffer_offset = NULL;
 	    /* process ahead */
 	    ret = PINT_Process_request(q_item->parent->file_req_state,
-		q_item->parent->mem_req_state,
-		&q_item->parent->file_data,
-		&q_item->result_chain.result,
-		PINT_CLIENT);
+				       q_item->parent->mem_req_state,
+				       &q_item->parent->file_data,
+				       &q_item->result_chain.result,
+				       PINT_CLIENT);
 	    /* TODO: error handling */
 	    assert(ret >= 0);
 	    /* copy out what we have so far */
-	    for(i=0; i<q_item->result_chain.result.segs; i++)
+	    for (i=0; i<q_item->result_chain.result.segs; i++)
 	    {
 		region_size = q_item->result_chain.size_list[i];
 		src_ptr = (char*)(flow_data->intermediate + 
@@ -1383,12 +1406,12 @@ static void bmi_to_mem_callback_fn(void 
 		memcpy(dest_ptr, src_ptr, region_size);
 		bytes_processed += region_size;
 	    }
-	}while(bytes_processed < BUFFER_SIZE &&
-	    !PINT_REQUEST_DONE(q_item->parent->file_req_state));
+	} while(bytes_processed < BUFFER_SIZE &&
+		!PINT_REQUEST_DONE(q_item->parent->file_req_state));
     }
 
     /* are we done? */
-    if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+    if (PINT_REQUEST_DONE(q_item->parent->file_req_state))
     {
 	q_item->parent->state = FLOW_COMPLETE;
 	gen_mutex_unlock(&flow_data->flow_mutex);
@@ -1415,11 +1438,11 @@ static void bmi_to_mem_callback_fn(void 
     assert(ret >= 0);
 
     /* was MAX_REGIONS enough to satisfy this step? */
-    if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
+    if (!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
 	q_item->result_chain.result.bytes < BUFFER_SIZE)
     {
 	/* create an intermediate buffer */
-	if(!flow_data->intermediate)
+	if (!flow_data->intermediate)
 	{
 	    flow_data->intermediate = BMI_memalloc(
 		flow_data->parent->src.u.bmi.address,
@@ -1444,7 +1467,7 @@ static void bmi_to_mem_callback_fn(void 
 	total_size = q_item->result_chain.result.bytes;
 
 	/* convert offsets to memory addresses */
-	for(i=0; i<q_item->result_chain.result.segs; i++)
+	for (i=0; i < q_item->result_chain.result.segs; i++)
 	{
 	    flow_data->tmp_buffer_list[i] = 
 		(void*)(q_item->result_chain.result.offset_array[i] +
@@ -1452,7 +1475,7 @@ static void bmi_to_mem_callback_fn(void 
 	}
 
 	/* go ahead and return if there is nothing to do */
-	if(q_item->result_chain.result.bytes == 0)
+	if (q_item->result_chain.result.bytes == 0)
 	{	
 	    q_item->parent->state = FLOW_COMPLETE;
 	    gen_mutex_unlock(&flow_data->flow_mutex);
@@ -1462,18 +1485,18 @@ static void bmi_to_mem_callback_fn(void 
     }
 
     ret = BMI_post_recv_list(&q_item->posted_id,
-	q_item->parent->src.u.bmi.address,
-	flow_data->tmp_buffer_list,
-	size_array,
-	segs,
-	total_size,
-	&tmp_actual_size,
-	BMI_EXT_ALLOC,
-	q_item->parent->tag,
-	&q_item->bmi_callback,
-	global_bmi_context);
-
-    if(ret < 0)
+			     q_item->parent->src.u.bmi.address,
+			     flow_data->tmp_buffer_list,
+			     size_array,
+			     segs,
+			     total_size,
+			     &tmp_actual_size,
+			     BMI_EXT_ALLOC,
+			     q_item->parent->tag,
+			     &q_item->bmi_callback,
+			     global_bmi_context);
+    
+    if (ret < 0)
     {
 	handle_io_error(ret, q_item, flow_data);
 	ERROR_CLEANUP(flow_data);
@@ -1481,7 +1504,7 @@ static void bmi_to_mem_callback_fn(void 
 
     gen_mutex_unlock(&flow_data->flow_mutex);
 
-    if(ret == 1)
+    if (ret == 1)
     {
 	bmi_to_mem_callback_fn(q_item, tmp_actual_size, 0);
     }
@@ -1499,16 +1522,17 @@ static void bmi_to_mem_callback_fn(void 
  *
  * no return value
  */
-static void handle_io_error(PVFS_error error_code, struct fp_queue_item*
-    q_item, struct fp_private_data* flow_data)
+static void handle_io_error(PVFS_error error_code,
+			    struct fp_queue_item *q_item,
+			    struct fp_private_data *flow_data)
 {
     int ret;
 
     gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, 
-	"flowproto-multiqueue error cleanup path.\n");
+		 "flowproto-multiqueue error cleanup path.\n");
 
     /* is this the first error registered for this particular flow? */
-    if(flow_data->parent->error_code == 0)
+    if (flow_data->parent->error_code == 0)
     {
 	gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
 	    "flowproto-multiqueue first failure.\n");
@@ -1517,7 +1541,7 @@ static void handle_io_error(PVFS_error e
 	flow_data->cleanup_pending_count = 0;
 
 	/* cleanup depending on what endpoints are in use */
-	if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
+	if (flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
 	    flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
 	{
 	    ret = cancel_pending_bmi(&flow_data->src_list);
@@ -1525,16 +1549,16 @@ static void handle_io_error(PVFS_error e
 		"flowproto-multiqueue canceling %d BMI ops.\n", ret);
 	    flow_data->cleanup_pending_count += ret;
 	}
-	else if(flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
-	    flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
+	else if (flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
+		 flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
 	{
 	    ret = cancel_pending_bmi(&flow_data->dest_list);
 	    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
 		"flowproto-multiqueue canceling %d BMI ops.\n", ret);
 	    flow_data->cleanup_pending_count += ret;
 	}
-	else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
-	    flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
+	else if (flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
+		 flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
 	{
 	    ret = cancel_pending_trove(&flow_data->src_list);
 	    flow_data->cleanup_pending_count += ret;
@@ -1545,7 +1569,7 @@ static void handle_io_error(PVFS_error e
 		"flowproto-multiqueue canceling %d BMI ops.\n", ret);
 	    flow_data->cleanup_pending_count += ret;
 	}
-	else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
+	else if (flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
 	    flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
 	{
 	    ret = cancel_pending_bmi(&flow_data->src_list);
@@ -1572,7 +1596,7 @@ static void handle_io_error(PVFS_error e
     gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "cleanup_pending_count: %d\n",
 	flow_data->cleanup_pending_count);
 
-    if(flow_data->cleanup_pending_count == 0)
+    if (flow_data->cleanup_pending_count == 0)
     {
 	/* we are finished, make sure error is marked and state is set */
 	assert(flow_data->parent->error_code);
@@ -1602,14 +1626,14 @@ static int cancel_pending_bmi(struct qli
 	q_item = qlist_entry(tmp_link, struct fp_queue_item,
 	    list_link);
 	/* skip anything that is in the queue but not actually posted */
-	if(q_item->posted_id)
+	if (q_item->posted_id)
 	{
 	    count++;
 	    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
 		"flowprotocol cleanup: unposting BMI operation.\n");
 	    ret = PINT_thread_mgr_bmi_cancel(q_item->posted_id,
 		&q_item->bmi_callback);
-	    if(ret < 0)
+	    if (ret < 0)
 	    {
 		gossip_err("WARNING: BMI thread mgr cancel failed, proceeding anyway.\n");
 	    }
@@ -1641,7 +1665,7 @@ static int cancel_pending_trove(struct q
 	    list_link);
 
 	result_tmp = &q_item->result_chain;
-	do{
+	do {
 	    old_result_tmp = result_tmp;
 	    result_tmp = result_tmp->next;
 
@@ -1651,17 +1675,16 @@ static int cancel_pending_trove(struct q
 		ret = PINT_thread_mgr_trove_cancel(old_result_tmp->posted_id,
 		    q_item->parent->src.u.trove.coll_id,
 		    &old_result_tmp->trove_callback);
-		if(ret < 0)
+		if (ret < 0)
 		{
 		    gossip_err("WARNING: Trove thread mgr cancel failed, proceeding anyway.\n");
 		}
 	    }
-	}while(result_tmp);
+	} while(result_tmp);
     }
 
-    return (count);
+    return count;
 }
-
 
 /*
  * Local variables:



More information about the PVFS2-CVS mailing list