[PVFS2-CVS] commit by rbross in pvfs2/src/io/flow/flowproto-bmi-cache: flowproto-bmi-cache-server.c

CVS commit program cvs at parl.clemson.edu
Mon Feb 16 16:22:42 EST 2004


Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-cache
In directory parlweb:/tmp/cvs-serv32348/src/io/flow/flowproto-bmi-cache

Modified Files:
	flowproto-bmi-cache-server.c 
Log Message:
Combined patch: PVFS_id_gen_t -> PVFS_BMI_addr_t type change, formatting
cleanup, and PVFS error code fixes.  Ugly.  Sorry if my formatting pisses
someone off; at least I'm not using more than 80 columns any more :).


Index: flowproto-bmi-cache-server.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-cache/flowproto-bmi-cache-server.c,v
diff -p -u -r1.4 -r1.5
--- flowproto-bmi-cache-server.c	3 Nov 2003 14:11:48 -0000	1.4
+++ flowproto-bmi-cache-server.c	16 Feb 2004 21:22:42 -0000	1.5
@@ -59,23 +59,23 @@ struct pint_req_entry
 
 struct cache_req_entry
 {
-	cache_request_t request;  /* cache request handle */
-	int 	errval;		/* error code */
-	int mem_cnt;			/* how many buffers */
+    cache_request_t request;  /* cache request handle */
+    int 	errval;	      /* error code */
+    int mem_cnt;	      /* how many buffers */
 
-	/* buffer size array, array space provided by the cache */
-	PVFS_size *msize_list; 
+    /* buffer size array, array space provided by the cache */
+    PVFS_size *msize_list; 
 
-	/* "total_size" is the sum of the size list */
-	PVFS_size total_size; 
+    /* "total_size" is the sum of the size list */
+    PVFS_size total_size; 
 
-	/* buffer offset array, provided by the cache */
-	PVFS_offset **moff_list; 
+    /* buffer offset array, provided by the cache */
+    PVFS_offset **moff_list; 
 
-	/* if this is not NULL, this buffer is supplied by the flow */
-	PVFS_offset *buffer;
+    /* if this is not NULL, this buffer is supplied by the flow */
+    PVFS_offset *buffer;
 
-	enum bmi_buffer_type buffer_type;
+    enum bmi_buffer_type buffer_type;
 };
 
 
@@ -83,22 +83,22 @@ struct cache_req_entry
  * A request --> a flow --> a list of fp_queue_items */
 struct fp_queue_item
 {
-	/* point to the flow descriptor */
-	flow_descriptor* 	parent;	
+    /* point to the flow descriptor */
+    flow_descriptor* 	parent;	
 
-	/* PINT request information */
-	struct pint_req_entry 	pint_req; 
+    /* PINT request information */
+    struct pint_req_entry 	pint_req; 
 
-	/* cache request information */
-	struct cache_req_entry 	cache_req; 
-	int int_state;
-
-	/* flag to show whether the callbacks are set up */
-	int 	callback_setup;
-	struct PINT_thread_mgr_bmi_callback bmi_callback;
-	struct PINT_thread_mgr_trove_callback cache_callback;
+    /* cache request information */
+    struct cache_req_entry 	cache_req; 
+    int int_state;
+
+    /* flag to show whether the callbacks are set up */
+    int 	callback_setup;
+    struct PINT_thread_mgr_bmi_callback bmi_callback;
+    struct PINT_thread_mgr_trove_callback cache_callback;
 
-	struct qlist_head list_link;
+    struct qlist_head list_link;
 };
 
 /* fp_private_data is information specific to this flow protocol, stored
@@ -106,22 +106,22 @@ struct fp_queue_item
  */
 struct fp_private_data
 {
-	/* point to the flow descriptor */
-	flow_descriptor* parent;       
+    /* point to the flow descriptor */
+    flow_descriptor* parent;       
 
-	/* PINT request done? */
-	int pint_request_done;	
+    /* PINT request done? */
+    int pint_request_done;	
 	
-	/* PINT request list */
-	struct qlist_head pint_req_list;
+    /* PINT request list */
+    struct qlist_head pint_req_list;
 
-	/* requests completed on the cache side */ 
-	struct qlist_head cache_req_done_list;  
+    /* requests completed on the cache side */ 
+    struct qlist_head cache_req_done_list;  
 
-	PVFS_size total_bytes_processed;
-	TROVE_context_id trove_context;
+    PVFS_size total_bytes_processed;
+    TROVE_context_id trove_context;
 
-	gen_mutex_t flow_mutex;
+    gen_mutex_t flow_mutex;
 };
 
 #define PRIVATE_FLOW(target_flow)\
@@ -134,12 +134,12 @@ static TROVE_context_id global_trove_con
 
 
 static void bmi_recv_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code);
+				 PVFS_size actual_size,
+				 PVFS_error error_code);
 
 static void bmi_send_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code);
+				 PVFS_size actual_size,
+				 PVFS_error error_code);
 
 #if 0
 /* the above function is a special case; we need to look at a return
@@ -147,8 +147,8 @@ static void bmi_send_callback_fn(void *u
  * to trigger it from a callback
  */
 static void bmi_send_callback_wrapper(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code)
+				      PVFS_size actual_size,
+				      PVFS_error error_code)
 {
     bmi_send_callback_fn(user_ptr, actual_size, error_code);
     return;
@@ -156,19 +156,19 @@ static void bmi_send_callback_wrapper(vo
 #endif
 
 static void cache_read_callback_fn(void *user_ptr,
-		           PVFS_error error_code);
+				   PVFS_error error_code);
 
 static void cache_write_callback_fn(void *user_ptr,
-		           PVFS_error error_code);
+				    PVFS_error error_code);
 
 /* protocol specific functions */
 static int  bmi_cache_request_init(struct fp_private_data *flow_data, 
-			int direction);
+				   int direction);
 
 static int  bmi_cache_progress_check(struct flow_descriptor *flow_d, 
-			int wait_flag, 
-			int mutex_flag,
-			int direction);
+				     int wait_flag, 
+				     int mutex_flag,
+				     int direction);
 
 
 static int bmi_cache_check_cache_req(struct fp_queue_item *q_item); 
@@ -182,12 +182,12 @@ static int fp_bmi_cache_initialize(int f
 static int fp_bmi_cache_finalize(void);
 
 static int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
-			       int option,
-			       void *parameter);
+				int option,
+				void *parameter);
 
 static int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
-			       int option,
-			       void *parameter);
+				int option,
+				void *parameter);
 
 static int fp_bmi_cache_post(flow_descriptor * flow_d);
 
@@ -230,7 +230,7 @@ int fp_bmi_cache_initialize(int flowprot
 
     fp_bmi_cache_id = flowproto_id;
 
-    return(0);
+    return 0;
 }
 
 /* fp_bmi_cache_finalize()
@@ -245,7 +245,7 @@ int fp_bmi_cache_finalize(void)
 #ifdef __PVFS2_TROVE_SUPPORT__
     PINT_thread_mgr_trove_stop();
 #endif
-    return (0);
+    return 0;
 }
 
 /* fp_bmi_cache_getinfo()
@@ -255,21 +255,21 @@ int fp_bmi_cache_finalize(void)
  * returns 0 on success, -PVFS_error on failure
  */
 int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
-			       int option,
-			       void *parameter)
+			 int option,
+			 void *parameter)
 {
     int* type;
 
-    switch(option)
+    switch (option)
     {
 	case FLOWPROTO_TYPE_QUERY:
 	    type = parameter;
 	    if(*type == FLOWPROTO_MULTIQUEUE)
 		return(0);
 	    else
-		return(-PVFS_ENOPROTOOPT);
+		return -PVFS_ENOPROTOOPT;
 	default:
-	    return(-PVFS_ENOSYS);
+	    return -PVFS_ENOSYS;
 	    break;
     }
 }
@@ -281,10 +281,10 @@ int fp_bmi_cache_getinfo(flow_descriptor
  * returns 0 on success, -PVFS_error on failure
  */
 int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
-			       int option,
-			       void *parameter)
+			 int option,
+			 void *parameter)
 {
-    return (-PVFS_ENOSYS);
+    return -PVFS_ENOSYS;
 }
 
 /* fp_bmi_cache_post()
@@ -295,143 +295,149 @@ int fp_bmi_cache_setinfo(flow_descriptor
  */
 int fp_bmi_cache_post(flow_descriptor * flow_d)
 {
-	struct fp_private_data* flow_data = NULL;
-	int ret;
+    struct fp_private_data* flow_data = NULL;
+    int ret;
 
-	/* on the server side: only two possible types */
-	assert( (flow_d->src.endpoint_id == BMI_ENDPOINT && 
-		flow_d->dest.endpoint_id == CACHE_ENDPOINT) ||
-		(flow_d->src.endpoint_id == CACHE_ENDPOINT &&
-		flow_d->dest.endpoint_id == BMI_ENDPOINT) );
-
-	flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
-	if(!flow_data)
-		return(-PVFS_ENOMEM);
-	memset(flow_data, 0, sizeof(struct fp_private_data));
-
-	flow_d->flow_protocol_data = flow_data;
-	flow_d->state = FLOW_TRANSMITTING;
-
-	/* protocol specific fields */
-	flow_data->parent = flow_d;
-	INIT_QLIST_HEAD(&flow_data->pint_req_list);
-	flow_data->pint_request_done = 0;
-	INIT_QLIST_HEAD(&flow_data->cache_req_done_list);
+    /* on the server side: only two possible types */
+    assert( (flow_d->src.endpoint_id == BMI_ENDPOINT && 
+	     flow_d->dest.endpoint_id == CACHE_ENDPOINT) ||
+	    (flow_d->src.endpoint_id == CACHE_ENDPOINT &&
+	     flow_d->dest.endpoint_id == BMI_ENDPOINT) );
 
-	flow_data->trove_context = global_trove_context;
-	gen_mutex_init(&flow_data->flow_mutex);
+    flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
+    if(flow_data == NULL)
+    {
+	return -PVFS_ENOMEM;
+    }
+    memset(flow_data, 0, sizeof(struct fp_private_data));
 
-	/* if a file datatype offset was specified, go ahead and skip ahead 
- 	 * before doing anything else
-	 */
-	if(flow_d->file_req_offset)
-		PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state, 
-					flow_d->file_req_offset);
+    flow_d->flow_protocol_data = flow_data;
+    flow_d->state = FLOW_TRANSMITTING;
 
-	/* set boundaries on file datatype */
-	if(flow_d->aggregate_size > -1)
-	{
-		PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state, 
-			flow_d->aggregate_size+flow_d->file_req_offset);
-	}
-	else
+    /* protocol specific fields */
+    flow_data->parent = flow_d;
+    INIT_QLIST_HEAD(&flow_data->pint_req_list);
+    flow_data->pint_request_done = 0;
+    INIT_QLIST_HEAD(&flow_data->cache_req_done_list);
+
+    flow_data->trove_context = global_trove_context;
+    gen_mutex_init(&flow_data->flow_mutex);
+
+    /* if a file datatype offset was specified, go ahead and skip ahead 
+     * before doing anything else
+     */
+    if (flow_d->file_req_offset)
+    {
+	PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state, 
+				      flow_d->file_req_offset);
+    }
+
+    /* set boundaries on file datatype */
+    if(flow_d->aggregate_size > -1)
+    {
+	PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state, 
+				     flow_d->aggregate_size + 
+				     flow_d->file_req_offset);
+    }
+    else
+    {
+	PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
+				     flow_d->file_req_offset +
+				     PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
+    }
+
+    /* (1) init requests; (2) check progress; later all progress checks 
+     * are driven by callbacks.
+     */
+
+    if(flow_d->src.endpoint_id == CACHE_ENDPOINT &&
+       flow_d->dest.endpoint_id == BMI_ENDPOINT)
+    {
+	/* CACHE --> BMI flow: read from cache, then send to the 
+	 * network. When the cache supports callback, the cache 
+	 * callback triggers BMI operations; the BMI callback triggers 
+	 * the release of cache resources. 
+	 * When the cache does not support callback, as in the current
+	 * implementation, we use the following progress method:
+	 * (1) wait for the completion of the first cache request;
+	 * (2) initiate the BMI send operation;
+	 * (3) in the BMI send callback function, we wait for the 
+	 *     completion of the next cache request;  
+	 * (4) goto step (2).
+	 */ 
+
+	ret = bmi_cache_request_init(flow_data, CACHE_TO_BMI);
+	if (ret < 0)
 	{
-		PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
-					flow_d->file_req_offset +
-			PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
+	    PVFS_perror_gossip("bmi_cache_request_init error: "
+			       "error_code", ret);
+	    return ret;
 	}
 
-	/* (1) init requests; (2) check progress; later all progress checks 
-	 * are driven by callbacks.
+	/* check progress. MUTEX_FLAG is needed because possible
+	 * callbacks may happen at the same time.
 	 */
-
-	if( flow_d->src.endpoint_id == CACHE_ENDPOINT &&
-		flow_d->dest.endpoint_id == BMI_ENDPOINT )
+	ret = bmi_cache_progress_check( flow_d, 
+					BLOCKING_FLAG,
+					MUTEX_FLAG,
+					CACHE_TO_BMI );
+	if (ret < 0)
 	{
-		/* CACHE --> BMI flow: read from cache, then send to the 
-		 * network. When the cache supports callback, the cache 
-		 * callback triggers BMI operations; the BMI callback triggers 
-		 * the release of cache resources. 
-		 * When the cache does not support callback, as in the current
-		 * implementation, we use the following progress method:
-		 * (1) wait for the completion of the first cache request;
-		 * (2) initiate the BMI send operation;
-		 * (3) in the BMI send callback function, we wait for the 
-		 *     completion of the next cache request;  
-		 * (4) goto step (2).
-		 */ 
-
-		ret = bmi_cache_request_init(flow_data, CACHE_TO_BMI);
-		if ( ret < 0 ) {
-			PVFS_perror_gossip("bmi_cache_request_init error: "
-                                           "error_code", ret);
-			return ret;
-		}
+	    PVFS_perror_gossip("bmi_cache_progress_check error: "
+			       "error_code", ret);
+	    return ret;
+	}
 
-		/* check progress. MUTEX_FLAG is needed because possible
-		 * callbacks may happen at the same time.
-		 */
-		ret = bmi_cache_progress_check( flow_d, 
-						BLOCKING_FLAG,
-						MUTEX_FLAG,
-						CACHE_TO_BMI );
-		if ( ret < 0 )
-		{
-			PVFS_perror_gossip("bmi_cache_progress_check error: "
-                                           "error_code", ret);
-			return ret;
-		}
+    }
+    else if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
+	    flow_d->dest.endpoint_id == CACHE_ENDPOINT)
+    {
+	/* BMI--->CACHE flow: (1) init requests; (2) check progress;
+	 * later all progress checks are driven by callbacks;
+	 *   cache callbacks to indicate that data buffers are 
+	 *   available;
+	 *	then trigger BMI receive operations to receive data 
+	 * 	into the cache buffers;
+	 *	the BMI call back function will trigger releasing 
+	 *	cache buffers.
+	 * In the current implementation, cache does not provide 
+	 * callback (we will add it later), the progress of cache 
+	 * is checked in the BMI call back function. That means, in 
+	 * the BMI call back function, we release cache buffers for 
+	 * the previous queue item; then we see if the cache buffers 
+	 * are available for another queue item (polling with time 
+	 * idle), if yes, initiate BMI receive operations.
+	 */ 
 
+	ret = bmi_cache_request_init(flow_data, BMI_TO_CACHE);
+	if ( ret < 0 ) {
+	    PVFS_perror_gossip("bmi_cache_request_init error: "
+			       "error_code", ret);
+	    return ret;
 	}
-	else if( flow_d->src.endpoint_id == BMI_ENDPOINT &&
-			 flow_d->dest.endpoint_id == CACHE_ENDPOINT )
-	{
-		/* BMI--->CACHE flow: (1) init requests; (2) check progress;
-		 * later all progress checks are driven by callbacks;
-		 *   cache callbacks to indicate that data buffers are 
-		 *   available;
-		 *	then trigger BMI receive operations to receive data 
-		 * 	into the cache buffers;
-		 *	the BMI call back function will trigger releasing 
-		 *	cache buffers.
-		 * In the current implementation, cache does not provide 
-		 * callback (we will add it later), the progress of cache 
-		 * is checked in the BMI call back function. That means, in 
-		 * the BMI call back function, we release cache buffers for 
-		 * the previous queue item; then we see if the cache buffers 
-		 * are available for another queue item (polling with time 
-		 * idle), if yes, initiate BMI receive operations.
-		 */ 
-
-		ret = bmi_cache_request_init(flow_data, BMI_TO_CACHE);
-		if ( ret < 0 ) {
-			PVFS_perror_gossip("bmi_cache_request_init error: "
-                                           "error_code", ret);
-			return ret;
-		}
-
-		/* check progress. MUTEX_FLAG is needed because possible
-		 * callbacks may happen at the same time.
-		 */
 
-		ret = bmi_cache_progress_check(flow_d, 
-					BLOCKING_FLAG, 
-					MUTEX_FLAG,
-					BMI_TO_CACHE);
-		if ( ret < 0 )
-		{
-			PVFS_perror_gossip("bmi_cache_progress_check error: "
-                                           "error_code", ret);
-			return ret;
-		}
+	/* check progress. MUTEX_FLAG is needed because possible
+	 * callbacks may happen at the same time.
+	 */
 
-	}
-	else
+	ret = bmi_cache_progress_check(flow_d, 
+				       BLOCKING_FLAG, 
+				       MUTEX_FLAG,
+				       BMI_TO_CACHE);
+	if ( ret < 0 )
 	{
-		return(-ENOSYS);
+	    PVFS_perror_gossip("bmi_cache_progress_check error: "
+			       "error_code", ret);
+	    return ret;
 	}
 
-	return (0);
+    }
+    else
+    {
+	return -PVFS_ENOSYS;
+    }
+
+    return (0);
 }
 
 
@@ -444,91 +450,92 @@ int fp_bmi_cache_post(flow_descriptor * 
  * return: 0: success;  <0 error; 1: complete;
  */ 
 
-int  bmi_cache_request_init(struct fp_private_data *flow_data, int direction)
+int bmi_cache_request_init(struct fp_private_data *flow_data, int direction)
 {
-	struct flow_descriptor *flow_d = flow_data->parent;
-	struct fp_queue_item *new_qitem = NULL;
-	struct pint_req_entry* pint_req = NULL;
-	PVFS_size bytes_processed = 0;
-
-	int ret;
-
-	/* zero request. This is wrong, TO DO it later. */
-	if ( flow_d->total_transfered >= flow_data->total_bytes_processed ) {
-		flow_d->state = FLOW_COMPLETE;
-		free(flow_data);
-		flow_d->release(flow_d);
-		flow_d->callback(flow_d);
-		return(1);
-	}
+    struct flow_descriptor *flow_d = flow_data->parent;
+    struct fp_queue_item *new_qitem = NULL;
+    struct pint_req_entry* pint_req = NULL;
+    PVFS_size bytes_processed = 0;
 
-	/* get request information before launching cache request.
-	 * Basically, we need to know what the request is, including
-	 * file regions, each region offset, and each region length. 
-	 * A pipelining idea is included in the following steps:
-	 *    a large flow request ---> several requests with "MAX_REGIONS"
-	 *	   and "BUFFER_SIZE" ---> pint_req_list 
-	 * Later, we process each element in the pint_req_list:
-	 *  1) dequeue an element from the list;
-	 *	2) init cache request;
-	 *  3) drive BMI operations;
-	 *  4) enqueue the element into another list;
-	 * If all elements have been through the above steps,
-	 * the request is done.
-	 */
+    int ret;
 
-	assert ( flow_data->pint_request_done == 0 );
+    /* zero request. This is wrong, TO DO it later. */
+    if (flow_d->total_transfered >= flow_data->total_bytes_processed)
+    {
+	flow_d->state = FLOW_COMPLETE;
+	free(flow_data);
+	flow_d->release(flow_d);
+	flow_d->callback(flow_d);
+	return 1;
+    }
 
-	/* get PINT_requests and initiate related cache requests */
-	do {
-		new_qitem = (struct fp_queue_item *)malloc(sizeof(struct fp_queue_item));
-		assert(new_qitem);
-		memset(new_qitem, 0 , sizeof(struct fp_queue_item));
+    /* get request information before launching cache request.
+     * Basically, we need to know what the request is, including
+     * file regions, each region offset, and each region length. 
+     * A pipelining idea is included in the following steps:
+     *    a large flow request ---> several requests with "MAX_REGIONS"
+     *	   and "BUFFER_SIZE" ---> pint_req_list 
+     * Later, we process each element in the pint_req_list:
+     *  1) dequeue an element from the list;
+     *	2) init cache request;
+     *  3) drive BMI operations;
+     *  4) enqueue the element into another list;
+     * If all elements have been through the above steps,
+     * the request is done.
+     */
+
+    assert ( flow_data->pint_request_done == 0 );
+
+    /* get PINT_requests and initiate related cache requests */
+    do {
+	new_qitem = (struct fp_queue_item *)malloc(sizeof(struct fp_queue_item));
+	assert(new_qitem);
+	memset(new_qitem, 0 , sizeof(struct fp_queue_item));
 
-		new_qitem->parent = flow_d;
+	new_qitem->parent = flow_d;
 		
-		/* process request */
-		pint_req = & new_qitem->pint_req;
-		pint_req->result.offset_array = pint_req->offset_list;
-		pint_req->result.size_array = pint_req->size_list;
-		pint_req->result.bytemax = BUFFER_SIZE;
-		pint_req->result.bytes = 0;
-		pint_req->result.segmax = MAX_REGIONS;
-		pint_req->result.segs = 0;
-
-		ret = PINT_Process_request( flow_d->file_req_state,
-					flow_d->mem_req_state,
-					&flow_d->file_data,
-					&pint_req->result,
-					PINT_SERVER );
-		/* TODO: error handling */
-		assert(ret >= 0);
+	/* process request */
+	pint_req = & new_qitem->pint_req;
+	pint_req->result.offset_array = pint_req->offset_list;
+	pint_req->result.size_array = pint_req->size_list;
+	pint_req->result.bytemax = BUFFER_SIZE;
+	pint_req->result.bytes = 0;
+	pint_req->result.segmax = MAX_REGIONS;
+	pint_req->result.segs = 0;
+
+	ret = PINT_Process_request(flow_d->file_req_state,
+				   flow_d->mem_req_state,
+				   &flow_d->file_data,
+				   &pint_req->result,
+				   PINT_SERVER);
+	/* TODO: error handling */
+	assert(ret >= 0);
 
-		/* submit the cache request */
-		ret = bmi_cache_init_cache_req(new_qitem, direction);
+	/* submit the cache request */
+	ret = bmi_cache_init_cache_req(new_qitem, direction);
 
-		/* TODO: error handling */
-		assert(ret >= 0);
+	/* TODO: error handling */
+	assert(ret >= 0);
 
-		new_qitem->int_state = INT_REQ_PROCESSING;
+	new_qitem->int_state = INT_REQ_PROCESSING;
 
-		/* immediate completion on the cache request */
-		if ( ret == 1 ) 
-		{
-			new_qitem->int_state = INT_REQ_COMPLETE;
-		}
+	/* immediate completion on the cache request */
+	if (ret == 1) 
+	{
+	    new_qitem->int_state = INT_REQ_COMPLETE;
+	}
 		
-		/* put the request into the chain */
-		qlist_add_tail(&new_qitem->list_link, &flow_data->pint_req_list);
-		bytes_processed += pint_req->result.bytes;
+	/* put the request into the chain */
+	qlist_add_tail(&new_qitem->list_link, &flow_data->pint_req_list);
+	bytes_processed += pint_req->result.bytes;
 
-   	} while( !PINT_REQUEST_DONE(flow_d->file_req_state) );
+    } while( !PINT_REQUEST_DONE(flow_d->file_req_state) );
 	
-	flow_data->pint_request_done = 1;
+    flow_data->pint_request_done = 1;
 
-	flow_data->total_bytes_processed = bytes_processed;
+    flow_data->total_bytes_processed = bytes_processed;
 
-	return 0;
+    return 0;
 
 } /* end of bmi_cache_request_init() */
 
@@ -541,126 +548,129 @@ int  bmi_cache_request_init(struct fp_pr
  *	 (2) mutex_flag is off
  *	    -- this is called in other callbacks which hold mutex.
  *	wait_flag: 
-	 (1) non-blocking; (2) blocking	
+ (1) non-blocking; (2) blocking	
  *	direction:
  *	 (1) BMI_TO_CACHE; (2) CACHE_TO_BMI
  */
 
 int bmi_cache_progress_check(struct flow_descriptor *flow_d, 
-			int wait_flag, 
-			int mutex_flag,
-			int direction)
+			     int wait_flag, 
+			     int mutex_flag,
+			     int direction)
 {
-	struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
-	struct fp_queue_item *q_item = NULL;
-	struct qlist_head *tmp_link = NULL;
-	int doneflag = 0;
-	int ret;
+    struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
+    struct fp_queue_item *q_item = NULL;
+    struct qlist_head *tmp_link = NULL;
+    int doneflag = 0;
+    int ret;
 
-	assert ( flow_data->pint_request_done == 1 );
+    assert ( flow_data->pint_request_done == 1 );
 
-	if ( qlist_empty(&flow_data->pint_req_list) ) 
-		return 1; 
+    if ( qlist_empty(&flow_data->pint_req_list) ) 
+    {
+	return 1; 
+    }
 
-	doneflag = 0;
+    doneflag = 0;
 
-	if ( mutex_flag ) {
-		gen_mutex_lock(&flow_data->flow_mutex);
-	}
+    if (mutex_flag)
+    {
+	gen_mutex_lock(&flow_data->flow_mutex);
+    }
 
-check_again:
+ check_again:
 
-	qlist_for_each(tmp_link, &flow_data->pint_req_list)
+    qlist_for_each(tmp_link, &flow_data->pint_req_list)
 	{
-		q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
+	    q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
 
-		if ( q_item->int_state == INT_REQ_INIT )
-		{
-			/* wrong state */
-			PVFS_perror_gossip("bmi_cache_progress_check error: "
-                                           "wrong internal state:error code", -1);
-
-			if ( mutex_flag )
-				gen_mutex_unlock(&flow_data->flow_mutex);
+	    if ( q_item->int_state == INT_REQ_INIT )
+	    {
+		/* wrong state */
+		PVFS_perror_gossip("bmi_cache_progress_check error: "
+				   "wrong internal state:error code", -1);
+
+		if ( mutex_flag )
+		    gen_mutex_unlock(&flow_data->flow_mutex);
+
+		/* TODO: error code */
+		return -1;
+	    }
+
+	    /* internal reuqest has been issued, but in processing */
+	    if ( q_item->int_state == INT_REQ_PROCESSING ) 
+	    {
+		/* check the cache request */
+		ret = bmi_cache_check_cache_req(q_item); 
 
-			/* TODO: error code */
-			return -1;
-		}
-
-		/* internal reuqest has been issued, but in processing */
-		if ( q_item->int_state == INT_REQ_PROCESSING ) 
+		/* TODO: error handling */
+		if ( ret < 0 ) 
 		{
-			/* check the cache request */
-			ret = bmi_cache_check_cache_req(q_item); 
-
-			/* TODO: error handling */
-			if ( ret < 0 ) 
-			{
-				if ( mutex_flag ) 
-					gen_mutex_unlock(&flow_data->flow_mutex);
-				return -1;
-			}
+		    if ( mutex_flag ) 
+			gen_mutex_unlock(&flow_data->flow_mutex);
+		    return -1;
+		}
 	
-			if ( ret == 1 ) { 
-				q_item->int_state = INT_REQ_COMPLETE;
-			}
+		if ( ret == 1 ) { 
+		    q_item->int_state = INT_REQ_COMPLETE;
 		}
+	    }
+
+	    /* internal reuqest has been finished, buffers are available 
+	     * for BMI operations.
+	     */
+	    if ( q_item->int_state == INT_REQ_COMPLETE )
+	    {
+		/* move the item from pint_req_list to cache_req_done_list */
+		qlist_del(&q_item->list_link);
+		qlist_add_tail(&q_item->list_link, &flow_data->cache_req_done_list);
+
+		if ( direction == BMI_TO_CACHE ) 
+		{
+		    if ( mutex_flag )
+			gen_mutex_unlock(&flow_data->flow_mutex);
+		    cache_write_callback_fn(q_item, 0);
 
-		/* internal reuqest has been finished, buffers are available 
-		 * for BMI operations.
-		 */
-		if ( q_item->int_state == INT_REQ_COMPLETE )
+		    if ( mutex_flag )
+			gen_mutex_lock(&flow_data->flow_mutex);
+		} else
 		{
-			/* move the item from pint_req_list to cache_req_done_list */
-			qlist_del(&q_item->list_link);
-			qlist_add_tail(&q_item->list_link, &flow_data->cache_req_done_list);
-
-			if ( direction == BMI_TO_CACHE ) 
-			{
-				if ( mutex_flag )
-					gen_mutex_unlock(&flow_data->flow_mutex);
-				cache_write_callback_fn(q_item, 0);
-
-				if ( mutex_flag )
-					gen_mutex_lock(&flow_data->flow_mutex);
-			} else
-			{
-				if ( mutex_flag )
-					gen_mutex_unlock(&flow_data->flow_mutex);
-				cache_read_callback_fn(q_item, 0);
-
-				if ( mutex_flag )
-					gen_mutex_lock(&flow_data->flow_mutex);
-			}
-			doneflag = 1;
+		    if ( mutex_flag )
+			gen_mutex_unlock(&flow_data->flow_mutex);
+		    cache_read_callback_fn(q_item, 0);
+
+		    if ( mutex_flag )
+			gen_mutex_lock(&flow_data->flow_mutex);
 		}
+		doneflag = 1;
+	    }
 		
-		if ( !q_item->callback_setup ) 
-		{
-			q_item->bmi_callback.fn = bmi_recv_callback_fn;
-			q_item->bmi_callback.data = q_item;
+	    if ( !q_item->callback_setup ) 
+	    {
+		q_item->bmi_callback.fn = bmi_recv_callback_fn;
+		q_item->bmi_callback.data = q_item;
 #ifdef __CACHE_CALLBACK_SUPPORT__
-			q_item->cache_callback.fn = cache_write_callback_fn;
-			q_item->cache_callback.data = q_item;
+		q_item->cache_callback.fn = cache_write_callback_fn;
+		q_item->cache_callback.data = q_item;
 #else
-			q_item->cache_callback.fn = NULL;
-			q_item->cache_callback.data = NULL;
+		q_item->cache_callback.fn = NULL;
+		q_item->cache_callback.data = NULL;
 #endif
 
-			q_item->callback_setup = 1;
-		}
+		q_item->callback_setup = 1;
+	    }
 		
-		if ( doneflag ) break;
+	    if ( doneflag ) break;
 		
 	} /* qlist_for_each element request */
 	
-	if ( !doneflag && wait_flag == BLOCKING_FLAG )
-		goto check_again;
+    if ( !doneflag && wait_flag == BLOCKING_FLAG )
+	goto check_again;
 
-	if ( mutex_flag )
-		gen_mutex_unlock(&flow_data->flow_mutex);
+    if ( mutex_flag )
+	gen_mutex_unlock(&flow_data->flow_mutex);
 	
-	return 0;
+    return 0;
 }
 
 
@@ -675,70 +685,71 @@ check_again:
  * no return value
  */
 static void bmi_recv_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code)
+				 PVFS_size actual_size,
+				 PVFS_error error_code)
 {
-	struct fp_queue_item* q_item = user_ptr;
-	struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
-	struct flow_descriptor * flow_d = flow_data->parent;
-	int ret;
+    struct fp_queue_item* q_item = user_ptr;
+    struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+    struct flow_descriptor * flow_d = flow_data->parent;
+    int ret;
 
-	/* TODO: error handling */
-	if(error_code != 0)
-	{
-		PVFS_perror_gossip("bmi_recv_callback_fn error_code", 
-					error_code);
-		assert(0);
-	}
+    /* TODO: error handling */
+    if (error_code != 0)
+    {
+	PVFS_perror_gossip("bmi_recv_callback_fn error_code", 
+			   error_code);
+	assert(0);
+    }
 
-	gen_mutex_lock(&flow_data->flow_mutex);
+    gen_mutex_lock(&flow_data->flow_mutex);
 
-	/* remove from current queue */
-	qlist_del(&q_item->list_link);
+    /* remove from current queue */
+    qlist_del(&q_item->list_link);
 
-	flow_d->total_transfered += actual_size;
+    flow_d->total_transfered += actual_size;
 
 
-	/* release cache resource, since data has been received into 
-	 * the cache buffers 
-	 */
+    /* release cache resource, since data has been received into 
+     * the cache buffers 
+     */
 
-	ret = bmi_cache_release_cache_src(q_item);
+    ret = bmi_cache_release_cache_src(q_item);
 
-	/* TODO: error handling */
-	if ( ret < 0 ) {
-		PVFS_perror_gossip("bmi_recv_callback_fn: "
-                                   "error from  bmi_cache_release_cache_src:error_code", ret);
-		gen_mutex_unlock(&flow_data->flow_mutex);
-		return;
-	}
+    /* TODO: error handling */
+    if (ret < 0)
+    {
+	PVFS_perror_gossip("bmi_recv_callback_fn: "
+			   "error from  bmi_cache_release_cache_src:error_code", ret);
+	gen_mutex_unlock(&flow_data->flow_mutex);
+	return;
+    }
 
-	/* when no callback support from the cache, we take advantage of 
-	 * the BMI callback to make progress. That is, in the BMI callback 
-	 * function, we wait for the completion of another request from 
-	 * the cache component, then initiate BMI recv function. All these 
-	 * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
-	 * Be careful of "mutex" stuff. 
-	 */
+    /* when no callback support from the cache, we take advantage of 
+     * the BMI callback to make progress. That is, in the BMI callback 
+     * function, we wait for the completion of another request from 
+     * the cache component, then initiate BMI recv function. All these 
+     * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
+     * Be careful of "mutex" stuff. 
+     */
 
-	/* no callback support from the cache */
-	if ( !q_item->cache_callback.fn )  
+    /* no callback support from the cache */
+    if (!q_item->cache_callback.fn)  
+    {
+	ret = bmi_cache_progress_check(flow_d, 
+				       BLOCKING_FLAG, 
+				       NO_MUTEX_FLAG,
+				       BMI_TO_CACHE);
+	if (ret < 0)
 	{
-		ret = bmi_cache_progress_check( flow_d, 
-						BLOCKING_FLAG, 
-						NO_MUTEX_FLAG,
-						BMI_TO_CACHE);
-		if ( ret < 0 )
-		{
-			PVFS_perror_gossip("bmi_recv_callback_fn: "
-                                           "error from bmi_cache_progress_check:error_code", ret);
-			gen_mutex_unlock(&flow_data->flow_mutex);
-			return;
-		}
+	    PVFS_perror_gossip("bmi_recv_callback_fn: "
+			       "error from bmi_cache_progress_check:error_code", ret);
+	    gen_mutex_unlock(&flow_data->flow_mutex);
+	    return;
 	}
+    }
 
-	gen_mutex_unlock(&flow_data->flow_mutex);
-	return;
+    gen_mutex_unlock(&flow_data->flow_mutex);
+    return;
 }
 	
 /* cache_write_callback_fn()
@@ -751,66 +762,66 @@ static void bmi_recv_callback_fn(void *u
  * no return value
  */
 static void cache_write_callback_fn(void *user_ptr,
-		           PVFS_error error_code)
+				    PVFS_error error_code)
 {
-	PVFS_size tmp_actual_size;
+    PVFS_size tmp_actual_size;
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+    struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
 #endif
 
-	struct fp_queue_item* q_item = user_ptr;
-	int ret;
-	PVFS_id_gen_t bmi_reqid;
+    struct fp_queue_item* q_item = user_ptr;
+    int ret;
+    PVFS_id_gen_t bmi_reqid;
 
-	/* TODO: error handling */
-	assert(error_code == 0);
+    /* TODO: error handling */
+    assert(error_code == 0);
 	
 
-	/* if cache supports callback, this function will called
-	 * independently. Otherwise, it is called inside of mutex.
-	 * Thus, there is no need holding mutex.
-	 */ 
+    /* if the cache supports callbacks, this function will be called
+     * independently. Otherwise, it is called with the mutex already held.
+     * Thus, there is no need to take the mutex in that case.
+     */ 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	gen_mutex_lock(&flow_data->flow_mutex);
+    gen_mutex_lock(&flow_data->flow_mutex);
 #endif
 
-	q_item->int_state = INT_REQ_COMPLETE;
+    q_item->int_state = INT_REQ_COMPLETE;
 
-	/* TODO: what if we recv less than expected? */
-	ret = BMI_post_recv_list(&bmi_reqid,
-		q_item->parent->src.u.bmi.address,
-		(void **)q_item->cache_req.moff_list,
-		q_item->cache_req.msize_list,
-		q_item->cache_req.mem_cnt,
-		q_item->cache_req.total_size,
-        	&tmp_actual_size,
-		q_item->cache_req.buffer_type,
-        	q_item->parent->tag,
-        	&q_item->bmi_callback,
-        	global_bmi_context);
+    /* TODO: what if we recv less than expected? */
+    ret = BMI_post_recv_list(&bmi_reqid,
+			     q_item->parent->src.u.bmi.address,
+			     (void **)q_item->cache_req.moff_list,
+			     q_item->cache_req.msize_list,
+			     q_item->cache_req.mem_cnt,
+			     q_item->cache_req.total_size,
+			     &tmp_actual_size,
+			     q_item->cache_req.buffer_type,
+			     q_item->parent->tag,
+			     &q_item->bmi_callback,
+			     global_bmi_context);
 
-	/* TODO: error handling */
-	assert(ret >= 0);
+    /* TODO: error handling */
+    assert(ret >= 0);
 
-	if(ret == 1)
-	{
+    if (ret == 1)
+    {
 #ifdef __CACHE_CALLBACK_SUPPORT__
-		gen_mutex_unlock(&flow_data->flow_mutex);
+	gen_mutex_unlock(&flow_data->flow_mutex);
 #endif
-		/* immediate completion; trigger callback ourselves */
-		bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
+	/* immediate completion; trigger callback ourselves */
+	bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-		gen_mutex_lock(&flow_data->flow_mutex);
+	gen_mutex_lock(&flow_data->flow_mutex);
 #endif
-	}
+    }
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	gen_mutex_unlock(&flow_data->flow_mutex);
+    gen_mutex_unlock(&flow_data->flow_mutex);
 #endif
 
-	return;
+    return;
 };
 
 
@@ -829,61 +840,61 @@ static void cache_write_callback_fn(void
  * no return value
  */
 static void cache_read_callback_fn(void *user_ptr,
-		           PVFS_error error_code)
+				   PVFS_error error_code)
 {
-	PVFS_size sent_size;
-	struct fp_queue_item* q_item = user_ptr;
+    PVFS_size sent_size;
+    struct fp_queue_item* q_item = user_ptr;
 
 #ifdef __CACHE_CALLBACK_SUPPORT
-	struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+    struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
 #endif
 
-	PVFS_id_gen_t bmi_reqid;
-	int ret;
+    PVFS_id_gen_t bmi_reqid;
+    int ret;
 
-	/* TODO: error handling */
-	assert(error_code == 0);
+    /* TODO: error handling */
+    assert(error_code == 0);
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	gen_mutex_lock(&flow_data->flow_mutex);
+    gen_mutex_lock(&flow_data->flow_mutex);
 #endif
 
-	q_item->int_state = INT_REQ_COMPLETE;
+    q_item->int_state = INT_REQ_COMPLETE;
 
-	ret = BMI_post_send_list(&bmi_reqid,
-		q_item->parent->dest.u.bmi.address,
-		(const void **)q_item->cache_req.moff_list,
-		q_item->cache_req.msize_list,
-		q_item->cache_req.mem_cnt,
-		q_item->cache_req.total_size,
-		q_item->cache_req.buffer_type,
-		q_item->parent->tag,
-		&q_item->bmi_callback,
-		global_bmi_context);
+    ret = BMI_post_send_list(&bmi_reqid,
+			     q_item->parent->dest.u.bmi.address,
+			     (const void **)q_item->cache_req.moff_list,
+			     q_item->cache_req.msize_list,
+			     q_item->cache_req.mem_cnt,
+			     q_item->cache_req.total_size,
+			     q_item->cache_req.buffer_type,
+			     q_item->parent->tag,
+			     &q_item->bmi_callback,
+			     global_bmi_context);
 		
-	/* TODO: error handling */
-	assert(ret >= 0);
+    /* TODO: error handling */
+    assert(ret >= 0);
 
-	if(ret == 1)
-	{
+    if (ret == 1)
+    {
 #ifdef __CACHE_CALLBACK_SUPPORT__
-		gen_mutex_unlock(&flow_data->flow_mutex);
+	gen_mutex_unlock(&flow_data->flow_mutex);
 #endif
 
-		/* immediate completion; trigger callback ourselves */
-		sent_size = q_item->cache_req.total_size;
-		bmi_send_callback_fn(q_item, sent_size, 0);
+	/* immediate completion; trigger callback ourselves */
+	sent_size = q_item->cache_req.total_size;
+	bmi_send_callback_fn(q_item, sent_size, 0);
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-		gen_mutex_lock(&flow_data->flow_mutex);
+	gen_mutex_lock(&flow_data->flow_mutex);
 #endif
-	}
+    }
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	gen_mutex_unlock(&flow_data->flow_mutex);
+    gen_mutex_unlock(&flow_data->flow_mutex);
 #endif
 
-	return;
+    return;
 };
 
 /* bmi_send_callback_fn()
@@ -897,169 +908,179 @@ static void cache_read_callback_fn(void 
  * no return value
  */
 static void bmi_send_callback_fn(void *user_ptr,
-		         PVFS_size actual_size,
-		         PVFS_error error_code)
+				 PVFS_size actual_size,
+				 PVFS_error error_code)
 {
-	struct fp_queue_item* q_item = user_ptr;
-	struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
-	struct flow_descriptor * flow_d = flow_data->parent;
-	int ret;
+    struct fp_queue_item* q_item = user_ptr;
+    struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+    struct flow_descriptor * flow_d = flow_data->parent;
+    int ret;
 
-	/* TODO: error handling */
-	if(error_code != 0)
-	{
-		PVFS_perror_gossip("bmi_send_callback_fn: error", error_code);
-		assert(0);
-	}
+    /* TODO: error handling */
+    if (error_code != 0)
+    {
+	PVFS_perror_gossip("bmi_send_callback_fn: error", error_code);
+	assert(0);
+    }
 
-	gen_mutex_lock(&flow_data->flow_mutex);
+    gen_mutex_lock(&flow_data->flow_mutex);
 
 
-	flow_d->total_transfered += actual_size;
+    flow_d->total_transfered += actual_size;
 
-	/* release cache resource */
+    /* release cache resource */
 
-	ret = bmi_cache_release_cache_src(q_item);
+    ret = bmi_cache_release_cache_src(q_item);
 
-	/* TODO: error handling */
-	if ( ret < 0 ) {
-		PVFS_perror_gossip("bmi_send_callback_fn: "
-                                   "from bmi_cache_release_cache_src:error_code", ret);
-		gen_mutex_unlock(&flow_data->flow_mutex);
-		return;
-	}
+    /* TODO: error handling */
+    if (ret < 0)
+    {
+	PVFS_perror_gossip("bmi_send_callback_fn: "
+			   "from bmi_cache_release_cache_src:error_code", ret);
+	gen_mutex_unlock(&flow_data->flow_mutex);
+	return;
+    }
 
-	/* remove from current queue. q_item must be in the 
-	 * cache_req_done_list when coming here.  */
+    /* remove from current queue. q_item must be in the 
+     * cache_req_done_list when coming here.  */
 
-	qlist_del(&q_item->list_link);
+    qlist_del(&q_item->list_link);
 
-	/* when no callback support from the cache, we take advantage of 
-	 * the BMI callback to make progress. That is, in the BMI callback 
-	 * function, we wait for the completion of another request from 
-	 * the cache component, then initiate BMI send function. All these 
-	 * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
-	 */
+    /* when no callback support from the cache, we take advantage of 
+     * the BMI callback to make progress. That is, in the BMI callback 
+     * function, we wait for the completion of another request from 
+     * the cache component, then initiate BMI send function. All these 
+     * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
+     */
 
-	/* no callback support from the cache. */
-	if ( !q_item->cache_callback.fn )  
+    /* no callback support from the cache. */
+    if (!q_item->cache_callback.fn)  
+    {
+	ret = bmi_cache_progress_check(flow_d, 
+				       BLOCKING_FLAG, 
+				       NO_MUTEX_FLAG,
+				       CACHE_TO_BMI);
+	if (ret < 0)
 	{
-		ret = bmi_cache_progress_check( flow_d, 
-						BLOCKING_FLAG, 
-						NO_MUTEX_FLAG,
-						CACHE_TO_BMI );
-		if ( ret < 0 )
-		{
-			PVFS_perror_gossip("bmi_send_callback_fn: "
-                                           "from bmi_cache_progress_check:error_code", ret);
-			gen_mutex_unlock(&flow_data->flow_mutex);
-			return;
-		}
+	    PVFS_perror_gossip("bmi_send_callback_fn: "
+			       "from bmi_cache_progress_check:error_code", ret);
+	    gen_mutex_unlock(&flow_data->flow_mutex);
+	    return;
 	}
+    }
 
-	gen_mutex_unlock(&flow_data->flow_mutex);
+    gen_mutex_unlock(&flow_data->flow_mutex);
 
-	free(q_item);
+    free(q_item);
 
-	return;
+    return;
 }
 
 static int bmi_cache_check_cache_req(struct fp_queue_item *qitem) 
 {
-	cache_reply_t reply;
-	int flag = 0;
-	int ret = -1;
+    cache_reply_t reply;
+    int flag = 0;
+    int ret = -1;
 		
-	ret = cache_req_test( &(qitem->cache_req.request), &flag, &reply, NULL);
-	if ( ret < 0 ) 
-	{
-		PVFS_perror_gossip("bmi_cache_check_cache_req: "
-                        "error_code", ret);
-		return ret;
-	}
+    ret = cache_req_test( &(qitem->cache_req.request), &flag, &reply, NULL);
+    if (ret < 0) 
+    {
+	PVFS_perror_gossip("bmi_cache_check_cache_req: "
+			   "error_code", ret);
+	return ret;
+    }
 
-	/* a request is finished. */
-	if ( flag )
-	{
-		qitem->cache_req.mem_cnt = reply.count;
-		qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
-		qitem->cache_req.msize_list = reply.cbuf_size_array;
-		qitem->cache_req.errval = reply.errval;
+    /* a request is finished. */
+    if (flag)
+    {
+	qitem->cache_req.mem_cnt = reply.count;
+	qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
+	qitem->cache_req.msize_list = reply.cbuf_size_array;
+	qitem->cache_req.errval = reply.errval;
 
-		return 1;
-	}
-	return 0;
+	return 1;
+    }
+    return 0;
 }
 
 static int bmi_cache_release_cache_src(struct fp_queue_item *qitem)
 {
-	int ret;
+    int ret;
 	
-	ret = cache_req_done( &(qitem->cache_req.request) );	
-	if ( ret < 0 ) 
-	{
-		PVFS_perror_gossip("bmi_cache_release_cache_src: "
-                        "error_code", ret);
-		return ret;
-	}
+    ret = cache_req_done( &(qitem->cache_req.request) );	
+    if (ret < 0) 
+    {
+	PVFS_perror_gossip("bmi_cache_release_cache_src: "
+			   "error_code", ret);
+	return ret;
+    }
 	
-	return 0;
+    return 0;
 }
 
-static int bmi_cache_init_cache_req(struct fp_queue_item *qitem, int op )
+static int bmi_cache_init_cache_req(struct fp_queue_item *qitem, int op)
 {
-	int ret = 0;
-	cache_read_desc_t desc1;
-	cache_write_desc_t desc2;
-	cache_reply_t reply;
+    int ret = 0;
+    cache_read_desc_t desc1;
+    cache_write_desc_t desc2;
+    cache_reply_t reply;
 
-	if ( op == BMI_TO_CACHE )  /* write */
-	{
-		desc2.coll_id = qitem->parent->dest.u.trove.coll_id;
-		desc2.handle = qitem->parent->dest.u.trove.handle;
-		desc2.context_id = global_trove_context;
-
-		/* TODO: if we use intemediate buffer, change here */
-		desc2.buffer = NULL;
-		desc2.len = 0;
+    if (op == BMI_TO_CACHE)  /* write */
+    {
+	desc2.coll_id = qitem->parent->dest.u.trove.coll_id;
+	desc2.handle = qitem->parent->dest.u.trove.handle;
+	desc2.context_id = global_trove_context;
+
+	/* TODO: if we use intermediate buffer, change here */
+	desc2.buffer = NULL;
+	desc2.len = 0;
 		
-		desc2.stream_array_count  = qitem->pint_req.result.segs; 
-		desc2.stream_offset_array = qitem->pint_req.result.offset_array;
-		desc2.stream_size_array = qitem->pint_req.result.size_array;
-
-		ret = cache_write_post( &desc2, 
-					&qitem->cache_req.request, 
-					&reply,
-					NULL );
-		if ( ret < 0 ) 
-		{
-			PVFS_perror_gossip("bmi_cache_init_cache_req: "
-                                     "error_code", ret);
-		}
-	}
-	else /* read */
+	desc2.stream_array_count  = qitem->pint_req.result.segs; 
+	desc2.stream_offset_array = qitem->pint_req.result.offset_array;
+	desc2.stream_size_array = qitem->pint_req.result.size_array;
+
+	ret = cache_write_post(&desc2, 
+			       &qitem->cache_req.request, 
+			       &reply,
+			       NULL);
+	if (ret < 0)
 	{
-		desc1.coll_id = qitem->parent->dest.u.trove.coll_id;
-		desc1.handle = qitem->parent->dest.u.trove.handle;
-		desc1.context_id = global_trove_context;
-
-		/* TODO: if we use intemediate buffer, change here */
-		desc1.buffer = NULL;
-		desc1.len = 0;
+	    PVFS_perror_gossip("bmi_cache_init_cache_req: "
+			       "error_code", ret);
+	}
+    }
+    else /* read */
+    {
+	desc1.coll_id = qitem->parent->dest.u.trove.coll_id;
+	desc1.handle = qitem->parent->dest.u.trove.handle;
+	desc1.context_id = global_trove_context;
+
+	/* TODO: if we use intermediate buffer, change here */
+	desc1.buffer = NULL;
+	desc1.len = 0;
 		
-		desc1.stream_array_count  = qitem->pint_req.result.segs; 
-		desc1.stream_offset_array = qitem->pint_req.result.offset_array;
-		desc1.stream_size_array = qitem->pint_req.result.size_array;
-
-		ret = cache_read_post( &desc1, 
-					&qitem->cache_req.request, 
-					&reply,
-					NULL );
-		if ( ret < 0 ) 
-		{
-			PVFS_perror_gossip("bmi_cache_init_cache_req: "
-                                     "error_code", ret);
-		}
+	desc1.stream_array_count  = qitem->pint_req.result.segs; 
+	desc1.stream_offset_array = qitem->pint_req.result.offset_array;
+	desc1.stream_size_array = qitem->pint_req.result.size_array;
+
+	ret = cache_read_post(&desc1, 
+			      &qitem->cache_req.request, 
+			      &reply,
+			      NULL );
+	if (ret < 0) 
+	{
+	    PVFS_perror_gossip("bmi_cache_init_cache_req: "
+			       "error_code", ret);
 	}
-	return ret;
+    }
+    return ret;
 }
+
+/*
+ * Local variables:
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ts=8 sts=4 sw=4 noexpandtab
+ */



More information about the PVFS2-CVS mailing list