[PVFS2-CVS] commit by wujs in pvfs2-1/src/io/flow/flowproto-bmi-cache: flowproto-bmi-cache-server.c

CVS commit program cvs at parl.clemson.edu
Mon Mar 22 23:02:20 EST 2004


Update of /projects/cvsroot/pvfs2-1/src/io/flow/flowproto-bmi-cache
In directory parlweb:/tmp/cvs-serv1811/pvfs2/src/io/flow/flowproto-bmi-cache

Modified Files:
	flowproto-bmi-cache-server.c 
Log Message:
Changes to cater for the changes in the buffer cache code.



Index: flowproto-bmi-cache-server.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/flow/flowproto-bmi-cache/flowproto-bmi-cache-server.c,v
diff -p -u -r1.7 -r1.8
--- flowproto-bmi-cache-server.c	21 Mar 2004 20:53:30 -0000	1.7
+++ flowproto-bmi-cache-server.c	23 Mar 2004 04:02:20 -0000	1.8
@@ -11,6 +11,8 @@
 #include <sys/time.h>
 #include <unistd.h>
 
+//#undef __PVFS2_TROVE_SUPPORT__
+
 #include "gossip.h"
 #include "quicklist.h"
 #include "flow.h"
@@ -57,23 +59,23 @@ struct pint_req_entry
 
 struct cache_req_entry
 {
-    cache_request_t request;  /* cache request handle */
-    int 	errval;	      /* error code */
-    int mem_cnt;	      /* how many buffers */
+	cache_request_t request;  /* cache request handle */
+	int 	errval;		/* error code */
+	int mem_cnt;			/* how many buffers */
 
-    /* buffer size array, array space provided by the cache */
-    PVFS_size *msize_list; 
+	/* buffer size array, array space provided by the cache */
+	PVFS_size *msize_list; 
 
-    /* "total_size" is the sum of the size list */
-    PVFS_size total_size; 
+	/* "total_size" is the sum of the size list */
+	PVFS_size total_size; 
 
-    /* buffer offset array, provided by the cache */
-    PVFS_offset **moff_list; 
+	/* buffer offset array, provided by the cache */
+	PVFS_offset **moff_list; 
 
-    /* if this is not NULL, this buffer is supplied by the flow */
-    PVFS_offset *buffer;
+	/* if this is not NULL, this buffer is supplied by the flow */
+	PVFS_offset *buffer;
 
-    enum bmi_buffer_type buffer_type;
+	enum bmi_buffer_type buffer_type;
 };
 
 
@@ -81,22 +83,22 @@ struct cache_req_entry
  * A request --> a flow --> a list of fp_queue_items */
 struct fp_queue_item
 {
-    /* point to the flow descriptor */
-    flow_descriptor* 	parent;	
+	/* point to the flow descriptor */
+	flow_descriptor* 	parent;	
 
-    /* PINT request information */
-    struct pint_req_entry 	pint_req; 
+	/* PINT request information */
+	struct pint_req_entry 	pint_req; 
 
-    /* cache request information */
-    struct cache_req_entry 	cache_req; 
-    int int_state;
-
-    /* flag to show whether the callbacks are set up */
-    int 	callback_setup;
-    struct PINT_thread_mgr_bmi_callback bmi_callback;
-    struct PINT_thread_mgr_trove_callback cache_callback;
+	/* cache request information */
+	struct cache_req_entry 	cache_req; 
+	int int_state;
+
+	/* flag to show whether the callbacks are set up */
+	int 	callback_setup;
+	struct PINT_thread_mgr_bmi_callback bmi_callback;
+	struct PINT_thread_mgr_trove_callback cache_callback;
 
-    struct qlist_head list_link;
+	struct qlist_head list_link;
 };
 
 /* fp_private_data is information specific to this flow protocol, stored
@@ -104,22 +106,22 @@ struct fp_queue_item
  */
 struct fp_private_data
 {
-    /* point to the flow descriptor */
-    flow_descriptor* parent;       
+	/* point to the flow descriptor */
+	flow_descriptor* parent;       
 
-    /* PINT request done? */
-    int pint_request_done;	
+	/* PINT request done? */
+	int pint_request_done;	
 	
-    /* PINT request list */
-    struct qlist_head pint_req_list;
+	/* PINT request list */
+	struct qlist_head pint_req_list;
 
-    /* requests completed on the cache side */ 
-    struct qlist_head cache_req_done_list;  
+	/* requests completed on the cache side */ 
+	struct qlist_head cache_req_done_list;  
 
-    PVFS_size total_bytes_processed;
-    TROVE_context_id trove_context;
+	PVFS_size total_bytes_processed;
+	TROVE_context_id trove_context;
 
-    gen_mutex_t flow_mutex;
+	gen_mutex_t flow_mutex;
 };
 
 #define PRIVATE_FLOW(target_flow)\
@@ -132,12 +134,12 @@ static TROVE_context_id global_trove_con
 
 
 static void bmi_recv_callback_fn(void *user_ptr,
-				 PVFS_size actual_size,
-				 PVFS_error error_code);
+		         PVFS_size actual_size,
+		         PVFS_error error_code);
 
 static void bmi_send_callback_fn(void *user_ptr,
-				 PVFS_size actual_size,
-				 PVFS_error error_code);
+		         PVFS_size actual_size,
+		         PVFS_error error_code);
 
 #if 0
 /* the above function is a special case; we need to look at a return
@@ -145,8 +147,8 @@ static void bmi_send_callback_fn(void *u
  * to trigger it from a callback
  */
 static void bmi_send_callback_wrapper(void *user_ptr,
-				      PVFS_size actual_size,
-				      PVFS_error error_code)
+		         PVFS_size actual_size,
+		         PVFS_error error_code)
 {
     bmi_send_callback_fn(user_ptr, actual_size, error_code);
     return;
@@ -154,19 +156,19 @@ static void bmi_send_callback_wrapper(vo
 #endif
 
 static void cache_read_callback_fn(void *user_ptr,
-				   PVFS_error error_code);
+		           PVFS_error error_code);
 
 static void cache_write_callback_fn(void *user_ptr,
-				    PVFS_error error_code);
+		           PVFS_error error_code);
 
 /* protocol specific functions */
 static int  bmi_cache_request_init(struct fp_private_data *flow_data, 
-				   int direction);
+			int direction);
 
 static int  bmi_cache_progress_check(struct flow_descriptor *flow_d, 
-				     int wait_flag, 
-				     int mutex_flag,
-				     int direction);
+			int wait_flag, 
+			int mutex_flag,
+			int direction);
 
 
 static int bmi_cache_check_cache_req(struct fp_queue_item *q_item); 
@@ -180,12 +182,12 @@ static int fp_bmi_cache_initialize(int f
 static int fp_bmi_cache_finalize(void);
 
 static int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
-				int option,
-				void *parameter);
+			       int option,
+			       void *parameter);
 
 static int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
-				int option,
-				void *parameter);
+			       int option,
+			       void *parameter);
 
 static int fp_bmi_cache_post(flow_descriptor * flow_d);
 
@@ -198,10 +200,15 @@ struct flowproto_ops fp_bmi_cache_ops = 
     fp_bmi_cache_finalize,
     fp_bmi_cache_getinfo,
     fp_bmi_cache_setinfo,
-    fp_bmi_cache_post,
-    NULL
+    fp_bmi_cache_post
 };
 
+
+/* TODO: decide where the cache should be initialized. For the time being
+ * it is done here. This should be changed later.
+ */
+static int cache_initialized = 0;
+
 /* fp_bmi_cache_initialize()
  *
  * starts up the flow protocol
@@ -217,19 +224,9 @@ int fp_bmi_cache_initialize(int flowprot
 	return(ret);
     PINT_thread_mgr_bmi_getcontext(&global_bmi_context);
 
-#ifdef __PVFS2_TROVE_SUPPORT__
-    ret = PINT_thread_mgr_trove_start();
-    if(ret < 0)
-    {
-	PINT_thread_mgr_bmi_stop();
-	return(ret);
-    }
-    PINT_thread_mgr_trove_getcontext(&global_trove_context);
-#endif
-
     fp_bmi_cache_id = flowproto_id;
 
-    return 0;
+    return(0);
 }
 
 /* fp_bmi_cache_finalize()
@@ -241,10 +238,7 @@ int fp_bmi_cache_initialize(int flowprot
 int fp_bmi_cache_finalize(void)
 {
     PINT_thread_mgr_bmi_stop();
-#ifdef __PVFS2_TROVE_SUPPORT__
-    PINT_thread_mgr_trove_stop();
-#endif
-    return 0;
+    return (0);
 }
 
 /* fp_bmi_cache_getinfo()
@@ -254,21 +248,21 @@ int fp_bmi_cache_finalize(void)
  * returns 0 on success, -PVFS_error on failure
  */
 int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
-			 int option,
-			 void *parameter)
+			       int option,
+			       void *parameter)
 {
     int* type;
 
-    switch (option)
+    switch(option)
     {
 	case FLOWPROTO_TYPE_QUERY:
 	    type = parameter;
 	    if(*type == FLOWPROTO_MULTIQUEUE)
 		return(0);
 	    else
-		return -PVFS_ENOPROTOOPT;
+		return(-PVFS_ENOPROTOOPT);
 	default:
-	    return -PVFS_ENOSYS;
+	    return(-PVFS_ENOSYS);
 	    break;
     }
 }
@@ -280,10 +274,10 @@ int fp_bmi_cache_getinfo(flow_descriptor
  * returns 0 on success, -PVFS_error on failure
  */
 int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
-			 int option,
-			 void *parameter)
+			       int option,
+			       void *parameter)
 {
-    return -PVFS_ENOSYS;
+    return (-PVFS_ENOSYS);
 }
 
 /* fp_bmi_cache_post()
@@ -294,154 +288,188 @@ int fp_bmi_cache_setinfo(flow_descriptor
  */
 int fp_bmi_cache_post(flow_descriptor * flow_d)
 {
-    struct fp_private_data* flow_data = NULL;
-    int ret;
-
-    /* on the server side: only two possible types */
-    assert( (flow_d->src.endpoint_id == BMI_ENDPOINT && 
-	     flow_d->dest.endpoint_id == TROVE_ENDPOINT) ||
-	    (flow_d->src.endpoint_id == TROVE_ENDPOINT &&
-	     flow_d->dest.endpoint_id == BMI_ENDPOINT) );
-
-    flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
-    if(flow_data == NULL)
-    {
-	return -PVFS_ENOMEM;
-    }
-    memset(flow_data, 0, sizeof(struct fp_private_data));
+	struct fp_private_data* flow_data = NULL;
+	NCAC_info_t info;
+	int ret;
+
+	/* on the server side: only two possible types */
+	fprintf(stderr, "src.endpoint:%d, desc.endpoint:%d\n", flow_d->src.endpoint_id, flow_d->dest.endpoint_id);
+
+	assert( (flow_d->src.endpoint_id == BMI_ENDPOINT && 
+		flow_d->dest.endpoint_id == CACHE_ENDPOINT) ||
+		(flow_d->src.endpoint_id == CACHE_ENDPOINT &&
+		flow_d->dest.endpoint_id == BMI_ENDPOINT) );
+
+	/* TODO: this does not seem right here: coll_id --> trove_context id */
+	if ( !cache_initialized ) {
+		info.max_req_num = 1000;
+		info.extsize     = 32768;
+		info.cachesize   = 40*1048576;
+		info.cachespace = malloc(info.cachesize);
+		if (!info.cachespace) {
+			fprintf(stderr, "cannot allocate memory for the cache\n");
+			return(-PVFS_ENOMEM);
+		}
+		ret = trove_open_context(flow_d->dest.u.trove.coll_id, &global_trove_context);
+		if (ret < 0)
+		{
+			fprintf(stderr, "TROVE_open_context() failure.\n");
+			return (-1);
+		}
+		fprintf(stderr, "collid:%d, trove_context:%d\n", flow_d->dest.u.trove.coll_id, global_trove_context);
 
-    flow_d->flow_protocol_data = flow_data;
-    flow_d->state = FLOW_TRANSMITTING;
+		ret = cache_init(&info);
+		if ( ret < 0 )
+		{
+			fprintf(stderr, "init cache error\n");
+			return ret;
+		}
 
-    /* protocol specific fields */
-    flow_data->parent = flow_d;
-    INIT_QLIST_HEAD(&flow_data->pint_req_list);
-    flow_data->pint_request_done = 0;
-    INIT_QLIST_HEAD(&flow_data->cache_req_done_list);
-
-    flow_data->trove_context = global_trove_context;
-    gen_mutex_init(&flow_data->flow_mutex);
-
-    /* if a file datatype offset was specified, go ahead and skip ahead 
-     * before doing anything else
-     */
-    if (flow_d->file_req_offset)
-    {
-	PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state, 
-				      flow_d->file_req_offset);
-    }
+		cache_initialized = 1;
+		fprintf(stderr, "cache is initialized\n");
+	}
 
-    /* set boundaries on file datatype */
-    if(flow_d->aggregate_size > -1)
-    {
-	PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state, 
-				     flow_d->aggregate_size + 
-				     flow_d->file_req_offset);
-    }
-    else
-    {
-	PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
-				     flow_d->file_req_offset +
-				     PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
-    }
+	flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
+	if(!flow_data)
+		return(-PVFS_ENOMEM);
+	memset(flow_data, 0, sizeof(struct fp_private_data));
+
+	flow_d->flow_protocol_data = flow_data;
+	flow_d->state = FLOW_TRANSMITTING;
+
+	/* protocol specific fields */
+	flow_data->parent = flow_d;
+	INIT_QLIST_HEAD(&flow_data->pint_req_list);
+	flow_data->pint_request_done = 0;
+	INIT_QLIST_HEAD(&flow_data->cache_req_done_list);
 
-    /* (1) init requests; (2) check progress; later all progress checks 
-     * are driven by callbacks.
-     */
+	flow_data->trove_context = global_trove_context;
+	gen_mutex_init(&flow_data->flow_mutex);
 
-    if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
-       flow_d->dest.endpoint_id == BMI_ENDPOINT)
-    {
-	/* CACHE --> BMI flow: read from cache, then send to the 
-	 * network. When the cache supports callback, the cache 
-	 * callback triggers BMI operations; the BMI callback triggers 
-	 * the release of cache resources. 
-	 * When the cache does not support callback, as in the current
-	 * implementation, we use the following progress method:
-	 * (1) wait for the completion of the first cache request;
-	 * (2) initiate the BMI send operation;
-	 * (3) in the BMI send callback function, we wait for the 
-	 *     completion of the next cache request;  
-	 * (4) goto step (2).
-	 */ 
+	/* if a file datatype offset was specified, go ahead and skip ahead 
+ 	 * before doing anything else
+	 */
+	if(flow_d->file_req_offset)
+		PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state, 
+					flow_d->file_req_offset);
 
-	ret = bmi_cache_request_init(flow_data, CACHE_TO_BMI);
-	if (ret < 0)
+	/* set boundaries on file datatype */
+	if(flow_d->aggregate_size > -1)
+	{
+		PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state, 
+			flow_d->aggregate_size+flow_d->file_req_offset);
+	}
+	else
 	{
-	    PVFS_perror_gossip("bmi_cache_request_init error: "
-			       "error_code", ret);
-	    return ret;
+		PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
+					flow_d->file_req_offset +
+			PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
 	}
 
-	/* check progress. MUTEX_FLAG is needed because possible
-	 * callbacks may happen at the same time.
+	/* (1) init requests; (2) check progress; later all progress checks 
+	 * are driven by callbacks.
 	 */
-	ret = bmi_cache_progress_check( flow_d, 
-					BLOCKING_FLAG,
-					MUTEX_FLAG,
-					CACHE_TO_BMI );
-	if (ret < 0)
+
+	if( flow_d->src.endpoint_id == CACHE_ENDPOINT &&
+		flow_d->dest.endpoint_id == BMI_ENDPOINT )
 	{
-	    PVFS_perror_gossip("bmi_cache_progress_check error: "
-			       "error_code", ret);
-	    return ret;
-	}
+		/* CACHE --> BMI flow: read from cache, then send to the 
+		 * network. When the cache supports callback, the cache 
+		 * callback triggers BMI operations; the BMI callback triggers 
+		 * the release of cache resources. 
+		 * When the cache does not support callback, as in the current
+		 * implementation, we use the following progress method:
+		 * (1) wait for the completion of the first cache request;
+		 * (2) initiate the BMI send operation;
+		 * (3) in the BMI send callback function, we wait for the 
+		 *     completion of the next cache request;  
+		 * (4) goto step (2).
+		 */ 
+
+		ret = bmi_cache_request_init(flow_data, CACHE_TO_BMI);
+		if ( ret < 0 ) {
+			PVFS_perror_gossip("bmi_cache_request_init error: "
+                                           "error_code", ret);
+			return ret;
+		}
 
-    }
-    else if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
-	    flow_d->dest.endpoint_id == TROVE_ENDPOINT)
-    {
-	/* BMI--->CACHE flow: (1) init requests; (2) check progress;
-	 * later all progress checks are driven by callbacks;
-	 *   cache callbacks to indicate that data buffers are 
-	 *   available;
-	 *	then trigger BMI receive operations to receive data 
-	 * 	into the cache buffers;
-	 *	the BMI call back function will trigger releasing 
-	 *	cache buffers.
-	 * In the current implementation, cache does not provide 
-	 * callback (we will add it later), the progress of cache 
-	 * is checked in the BMI call back function. That means, in 
-	 * the BMI call back function, we release cache buffers for 
-	 * the previous queue item; then we see if the cache buffers 
-	 * are available for another queue item (polling with time 
-	 * idle), if yes, initiate BMI receive operations.
-	 */ 
+		if ( ret == 1 ) { /* immediate completion */
+			return ret;
+		}
+		
+		/* check progress. MUTEX_FLAG is needed because possible
+		 * callbacks may happen at the same time.
+		 */
+		ret = bmi_cache_progress_check( flow_d, 
+						BLOCKING_FLAG,
+						MUTEX_FLAG,
+						CACHE_TO_BMI );
+		if ( ret < 0 )
+		{
+			PVFS_perror_gossip("bmi_cache_progress_check error: "
+                                           "error_code", ret);
+			return ret;
+		}
 
-	ret = bmi_cache_request_init(flow_data, BMI_TO_CACHE);
-	if ( ret < 0 ) {
-	    PVFS_perror_gossip("bmi_cache_request_init error: "
-			       "error_code", ret);
-	    return ret;
 	}
+	else if( flow_d->src.endpoint_id == BMI_ENDPOINT &&
+			 flow_d->dest.endpoint_id == CACHE_ENDPOINT )
+	{
+		/* BMI--->CACHE flow: (1) init requests; (2) check progress;
+		 * later all progress checks are driven by callbacks;
+		 *   cache callbacks to indicate that data buffers are 
+		 *   available;
+		 *	then trigger BMI receive operations to receive data 
+		 * 	into the cache buffers;
+		 *	the BMI call back function will trigger releasing 
+		 *	cache buffers.
+		 * In the current implementation, cache does not provide 
+		 * callback (we will add it later), the progress of cache 
+		 * is checked in the BMI call back function. That means, in 
+		 * the BMI call back function, we release cache buffers for 
+		 * the previous queue item; then we see if the cache buffers 
+		 * are available for another queue item (polling with time 
+		 * idle), if yes, initiate BMI receive operations.
+		 */ 
+
+		ret = bmi_cache_request_init(flow_data, BMI_TO_CACHE);
+		if ( ret < 0 ) {
+			PVFS_perror_gossip("bmi_cache_request_init error: "
+                                           "error_code", ret);
+			return ret;
+		}
 
-	/* check progress. MUTEX_FLAG is needed because possible
-	 * callbacks may happen at the same time.
-	 */
+		if ( ret == 1 ) { /* immediate completion */
+			return ret;
+		}
+
+		/* check progress. MUTEX_FLAG is needed because possible
+		 * callbacks may happen at the same time.
+		 */
 
-	ret = bmi_cache_progress_check(flow_d, 
-				       BLOCKING_FLAG, 
-				       MUTEX_FLAG,
-				       BMI_TO_CACHE);
-	if ( ret < 0 )
+		ret = bmi_cache_progress_check(flow_d, 
+					BLOCKING_FLAG, 
+					MUTEX_FLAG,
+					BMI_TO_CACHE);
+		if ( ret < 0 )
+		{
+			PVFS_perror_gossip("bmi_cache_progress_check error: "
+                                           "error_code", ret);
+			return ret;
+		}
+
+	}
+	else
 	{
-	    PVFS_perror_gossip("bmi_cache_progress_check error: "
-			       "error_code", ret);
-	    return ret;
+		return(-ENOSYS);
 	}
 
-    }
-    else
-    {
-	return -PVFS_ENOSYS;
-    }
-
-    return (0);
+	return (0);
 }
 
 
 /* bmi_cache_request_init(): get request information from "request component",
- * and inititate related cache requests
+ * and initiate the related cache requests
  * The basic idea in bmi_cache_request_init() is to chop a big request into 
  * several smaller requests (we called fp_queue_item) and link these small 
  * requests together. The direct purposes is to enable pipelining and to 
@@ -449,92 +477,99 @@ int fp_bmi_cache_post(flow_descriptor * 
  * return: 0: success;  <0 error; 1: complete;
  */ 
 
-int bmi_cache_request_init(struct fp_private_data *flow_data, int direction)
+int  bmi_cache_request_init(struct fp_private_data *flow_data, int direction)
 {
-    struct flow_descriptor *flow_d = flow_data->parent;
-    struct fp_queue_item *new_qitem = NULL;
-    struct pint_req_entry* pint_req = NULL;
-    PVFS_size bytes_processed = 0;
+	struct flow_descriptor *flow_d = flow_data->parent;
+	struct fp_queue_item *new_qitem = NULL;
+	struct pint_req_entry* pint_req = NULL;
+	PVFS_size bytes_processed = 0;
 
-    int ret;
+	int ret;
 
-    /* zero request. This is wrong, TO DO it later. */
-    if (flow_d->total_transfered >= flow_data->total_bytes_processed)
-    {
-	flow_d->state = FLOW_COMPLETE;
-	free(flow_data);
-	flow_d->release(flow_d);
-	flow_d->callback(flow_d);
-	return 1;
-    }
+	fprintf(stderr, "bmi_cache_request_init: enter\n");
+
+	/* Handle a zero request. TODO: check whether this is right or not. */
+	if ( flow_d->file_data.fsize == 0 ) 
+	{
+		flow_d->state = FLOW_COMPLETE;
+		free(flow_data);
+		flow_d->release(flow_d);
+		flow_d->callback(flow_d);
+		fprintf(stderr, "bmi_cache_request_init: exit with return 1. zero request.\n");
+		return(1);
+	}
+
+	/* get request information before launching cache request.
+	 * Basically, we need to know what the request is, including
+	 * file regions, each region offset, and each region length. 
+	 * A pipelining idea is included in the following steps:
+	 *    a large flow request ---> several requests with "MAX_REGIONS"
+	 *	   and "BUFFER_SIZE" ---> pint_req_list 
+	 * Later, we process each element in the pint_req_list:
+	 *  1) dequeue an element from the list;
+	 *	2) init cache request;
+	 *  3) drive BMI operations;
+	 *  4) enqueue the element into another list;
+	 * If all elements have been through the above steps,
+	 * the request is done.
+	 */
+
+	assert ( flow_data->pint_request_done == 0 );
 
-    /* get request information before launching cache request.
-     * Basically, we need to know what the request is, including
-     * file regions, each region offset, and each region length. 
-     * A pipelining idea is included in the following steps:
-     *    a large flow request ---> several requests with "MAX_REGIONS"
-     *	   and "BUFFER_SIZE" ---> pint_req_list 
-     * Later, we process each element in the pint_req_list:
-     *  1) dequeue an element from the list;
-     *	2) init cache request;
-     *  3) drive BMI operations;
-     *  4) enqueue the element into another list;
-     * If all elements have been through the above steps,
-     * the request is done.
-     */
-
-    assert ( flow_data->pint_request_done == 0 );
-
-    /* get PINT_requests and initiate related cache requests */
-    do {
-	new_qitem = (struct fp_queue_item *)malloc(sizeof(struct fp_queue_item));
-	assert(new_qitem);
-	memset(new_qitem, 0 , sizeof(struct fp_queue_item));
+	/* get PINT_requests and initiate related cache requests */
+	do {
+		new_qitem = (struct fp_queue_item *)malloc(sizeof(struct fp_queue_item));
+		assert(new_qitem);
+		memset(new_qitem, 0 , sizeof(struct fp_queue_item));
 
-	new_qitem->parent = flow_d;
+		new_qitem->parent = flow_d;
 		
-	/* process request */
-	pint_req = & new_qitem->pint_req;
-	pint_req->result.offset_array = pint_req->offset_list;
-	pint_req->result.size_array = pint_req->size_list;
-	pint_req->result.bytemax = BUFFER_SIZE;
-	pint_req->result.bytes = 0;
-	pint_req->result.segmax = MAX_REGIONS;
-	pint_req->result.segs = 0;
-
-	ret = PINT_Process_request(flow_d->file_req_state,
-				   flow_d->mem_req_state,
-				   &flow_d->file_data,
-				   &pint_req->result,
-				   PINT_SERVER);
-	/* TODO: error handling */
-	assert(ret >= 0);
+		/* process request */
+		pint_req = & new_qitem->pint_req;
+		pint_req->result.offset_array = pint_req->offset_list;
+		pint_req->result.size_array = pint_req->size_list;
+		pint_req->result.bytemax = BUFFER_SIZE;
+		pint_req->result.bytes = 0;
+		pint_req->result.segmax = MAX_REGIONS;
+		pint_req->result.segs = 0;
+
+		ret = PINT_Process_request( flow_d->file_req_state,
+					flow_d->mem_req_state,
+					&flow_d->file_data,
+					&pint_req->result,
+					PINT_SERVER );
+		/* TODO: error handling */
+		assert(ret >= 0);
 
-	/* submit the cache request */
-	ret = bmi_cache_init_cache_req(new_qitem, direction);
+		/* submit the cache request */
+		ret = bmi_cache_init_cache_req(new_qitem, direction);
 
-	/* TODO: error handling */
-	assert(ret >= 0);
+		/* TODO: error handling */
+		assert(ret >= 0);
 
-	new_qitem->int_state = INT_REQ_PROCESSING;
+		new_qitem->int_state = INT_REQ_PROCESSING;
 
-	/* immediate completion on the cache request */
-	if (ret == 1) 
-	{
-	    new_qitem->int_state = INT_REQ_COMPLETE;
-	}
+		/* immediate completion on the cache request */
+		if ( ret == 1 ) 
+		{
+			fprintf(stderr, "bmi_cache_init_cache_req: immediate completion\n");
+			new_qitem->int_state = INT_REQ_COMPLETE;
+		}
 		
-	/* put the request into the chain */
-	qlist_add_tail(&new_qitem->list_link, &flow_data->pint_req_list);
-	bytes_processed += pint_req->result.bytes;
+		/* put the request into the chain */
+		qlist_add_tail(&new_qitem->list_link, &flow_data->pint_req_list);
+		bytes_processed += pint_req->result.bytes;
+
+   	} while( !PINT_REQUEST_DONE(flow_d->file_req_state) );
 
-    } while( !PINT_REQUEST_DONE(flow_d->file_req_state) );
 	
-    flow_data->pint_request_done = 1;
+	flow_data->pint_request_done = 1;
+
+	flow_data->total_bytes_processed = bytes_processed;
 
-    flow_data->total_bytes_processed = bytes_processed;
+	fprintf(stderr, "bmi_cache_request_init: exit with return 0 (bytes_processed=%Ld)\n", bytes_processed);
 
-    return 0;
+	return 0;
 
 } /* end of bmi_cache_request_init() */
 
@@ -547,129 +582,134 @@ int bmi_cache_request_init(struct fp_pri
  *	 (2) mutex_flag is off
  *	    -- this is called in other callbacks which hold mutex.
  *	wait_flag: 
- (1) non-blocking; (2) blocking	
+ *	 (1) non-blocking; (2) blocking
  *	direction:
  *	 (1) BMI_TO_CACHE; (2) CACHE_TO_BMI
+ *
+ * return values:
+ *      < 0: error
  */
 
 int bmi_cache_progress_check(struct flow_descriptor *flow_d, 
-			     int wait_flag, 
-			     int mutex_flag,
-			     int direction)
+			int wait_flag, 
+			int mutex_flag,
+			int direction)
 {
-    struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
-    struct fp_queue_item *q_item = NULL;
-    struct qlist_head *tmp_link = NULL;
-    int doneflag = 0;
-    int ret;
+	struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
+	struct fp_queue_item *q_item = NULL;
+	struct qlist_head *tmp_link = NULL;
+	int doneflag = 0;
+	int ret;
+
+	assert ( flow_data->pint_request_done == 1 );
 
-    assert ( flow_data->pint_request_done == 1 );
+	if ( qlist_empty(&flow_data->pint_req_list) ) 
+		return 1; 
 
-    if ( qlist_empty(&flow_data->pint_req_list) ) 
-    {
-	return 1; 
-    }
-
-    doneflag = 0;
+	fprintf(stderr, "bmi_cache_progress_check: enter (direction:%d)\n", direction);
+	doneflag = 0;
 
-    if (mutex_flag)
-    {
-	gen_mutex_lock(&flow_data->flow_mutex);
-    }
+	if ( mutex_flag ) {
+		gen_mutex_lock(&flow_data->flow_mutex);
+	}
 
- check_again:
+check_again:
 
-    qlist_for_each(tmp_link, &flow_data->pint_req_list)
+	qlist_for_each(tmp_link, &flow_data->pint_req_list)
 	{
-	    q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
-
-	    if ( q_item->int_state == INT_REQ_INIT )
-	    {
-		/* wrong state */
-		PVFS_perror_gossip("bmi_cache_progress_check error: "
-				   "wrong internal state:error code", -1);
-
-		if ( mutex_flag )
-		    gen_mutex_unlock(&flow_data->flow_mutex);
-
-		/* TODO: error code */
-		return -1;
-	    }
-
-	    /* internal reuqest has been issued, but in processing */
-	    if ( q_item->int_state == INT_REQ_PROCESSING ) 
-	    {
-		/* check the cache request */
-		ret = bmi_cache_check_cache_req(q_item); 
+		q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
 
-		/* TODO: error handling */
-		if ( ret < 0 ) 
+		if ( q_item->int_state == INT_REQ_INIT )
 		{
-		    if ( mutex_flag ) 
-			gen_mutex_unlock(&flow_data->flow_mutex);
-		    return -1;
-		}
-	
-		if ( ret == 1 ) { 
-		    q_item->int_state = INT_REQ_COMPLETE;
-		}
-	    }
+			/* wrong state */
+			PVFS_perror_gossip("bmi_cache_progress_check error: "
+                                           "wrong internal state:error code", -1);
 
-	    /* internal reuqest has been finished, buffers are available 
-	     * for BMI operations.
-	     */
-	    if ( q_item->int_state == INT_REQ_COMPLETE )
-	    {
-		/* move the item from pint_req_list to cache_req_done_list */
-		qlist_del(&q_item->list_link);
-		qlist_add_tail(&q_item->list_link, &flow_data->cache_req_done_list);
+			if ( mutex_flag )
+				gen_mutex_unlock(&flow_data->flow_mutex);
 
-		if ( direction == BMI_TO_CACHE ) 
+			/* TODO: error code */
+			return -1;
+		}
+
+		/* internal request has been issued, but is still being processed */
+		if ( q_item->int_state == INT_REQ_PROCESSING ) 
 		{
-		    if ( mutex_flag )
-			gen_mutex_unlock(&flow_data->flow_mutex);
-		    cache_write_callback_fn(q_item, 0);
+			/* check the cache request */
+			ret = bmi_cache_check_cache_req(q_item); 
+
+			/* TODO: error handling */
+			if ( ret < 0 ) 
+			{
+				if ( mutex_flag ) 
+					gen_mutex_unlock(&flow_data->flow_mutex);
+				PVFS_perror_gossip("bmi_cache_progress_check error: check cache request error:error code", -1);
 
-		    if ( mutex_flag )
-			gen_mutex_lock(&flow_data->flow_mutex);
-		} else
+				return -1;
+			}
+	
+			if ( ret == 1 ) { 
+				q_item->int_state = INT_REQ_COMPLETE;
+			}
+		}
+
+		/* internal request has finished; buffers are available
+		 * for BMI operations.
+		 */
+		if ( q_item->int_state == INT_REQ_COMPLETE )
 		{
-		    if ( mutex_flag )
-			gen_mutex_unlock(&flow_data->flow_mutex);
-		    cache_read_callback_fn(q_item, 0);
+			fprintf(stderr, "bmi_cache_progress_check: cache req done\n");
 
-		    if ( mutex_flag )
-			gen_mutex_lock(&flow_data->flow_mutex);
+			/* move the item from pint_req_list to cache_req_done_list */
+			qlist_del(&q_item->list_link);
+			qlist_add_tail(&q_item->list_link, &flow_data->cache_req_done_list);
+
+			if ( direction == BMI_TO_CACHE ) 
+			{
+				if ( mutex_flag )
+					gen_mutex_unlock(&flow_data->flow_mutex);
+				cache_write_callback_fn(q_item, 0);
+
+				if ( mutex_flag )
+					gen_mutex_lock(&flow_data->flow_mutex);
+			} else
+			{
+				if ( mutex_flag )
+					gen_mutex_unlock(&flow_data->flow_mutex);
+				cache_read_callback_fn(q_item, 0);
+
+				if ( mutex_flag )
+					gen_mutex_lock(&flow_data->flow_mutex);
+			}
+			doneflag = 1;
 		}
-		doneflag = 1;
-	    }
 		
-	    if ( !q_item->callback_setup ) 
-	    {
-		q_item->bmi_callback.fn = bmi_recv_callback_fn;
-		q_item->bmi_callback.data = q_item;
+		if ( !q_item->callback_setup ) 
+		{
+			q_item->bmi_callback.fn = bmi_recv_callback_fn;
+			q_item->bmi_callback.data = q_item;
 #ifdef __CACHE_CALLBACK_SUPPORT__
-		q_item->cache_callback.fn = cache_write_callback_fn;
-		q_item->cache_callback.data = q_item;
+			q_item->cache_callback.fn = cache_write_callback_fn;
+			q_item->cache_callback.data = q_item;
 #else
-		q_item->cache_callback.fn = NULL;
-		q_item->cache_callback.data = NULL;
+			q_item->cache_callback.fn = NULL;
+			q_item->cache_callback.data = NULL;
 #endif
 
-		q_item->callback_setup = 1;
-	    }
+			q_item->callback_setup = 1;
+		}
 		
-	    if ( doneflag ) break;
+		if ( doneflag ) break;
 		
 	} /* qlist_for_each element request */
 	
-    if ( !doneflag && wait_flag == BLOCKING_FLAG )
-	goto check_again;
+	if ( !doneflag && wait_flag == BLOCKING_FLAG )
+		goto check_again;
 
-    if ( mutex_flag )
-	gen_mutex_unlock(&flow_data->flow_mutex);
+	if ( mutex_flag )
+		gen_mutex_unlock(&flow_data->flow_mutex);
 	
-    return 0;
+	return 0;
 }
 
 
@@ -684,71 +724,95 @@ int bmi_cache_progress_check(struct flow
  * no return value
  */
 static void bmi_recv_callback_fn(void *user_ptr,
-				 PVFS_size actual_size,
-				 PVFS_error error_code)
+		         PVFS_size actual_size,
+		         PVFS_error error_code)
 {
-    struct fp_queue_item* q_item = user_ptr;
-    struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
-    struct flow_descriptor * flow_d = flow_data->parent;
-    int ret;
+	struct fp_queue_item* q_item = user_ptr;
+	struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+	struct flow_descriptor * flow_d = flow_data->parent;
 
-    /* TODO: error handling */
-    if (error_code != 0)
-    {
-	PVFS_perror_gossip("bmi_recv_callback_fn error_code", 
-			   error_code);
-	assert(0);
-    }
+	struct qlist_head *tmp_link = NULL;
 
-    gen_mutex_lock(&flow_data->flow_mutex);
+	int ret;
 
-    /* remove from current queue */
-    qlist_del(&q_item->list_link);
+	/* TODO: error handling */
+	if(error_code != 0)
+	{
+		PVFS_perror_gossip("bmi_recv_callback_fn error_code", 
+					error_code);
+		assert(0);
+	}
 
-    flow_d->total_transfered += actual_size;
+	gen_mutex_lock(&flow_data->flow_mutex);
 
+	/* remove from current queue */
+	qlist_del(&q_item->list_link);
 
-    /* release cache resource, since data has been received into 
-     * the cache buffers 
-     */
+	/* WRONG: Add to another list for resource reclaim */
 
-    ret = bmi_cache_release_cache_src(q_item);
+	flow_d->total_transfered += actual_size;
 
-    /* TODO: error handling */
-    if (ret < 0)
-    {
-	PVFS_perror_gossip("bmi_recv_callback_fn: "
-			   "error from  bmi_cache_release_cache_src:error_code", ret);
-	gen_mutex_unlock(&flow_data->flow_mutex);
-	return;
-    }
 
-    /* when no callback support from the cache, we take advantage of 
-     * the BMI callback to make progress. That is, in the BMI callback 
-     * function, we wait for the completion of another request from 
-     * the cache component, then initiate BMI recv function. All these 
-     * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
-     * Be careful of "mutex" stuff. 
-     */
+	/* release cache resource, since data has been received into 
+	 * the cache buffers 
+	 */
 
-    /* no callback support from the cache */
-    if (!q_item->cache_callback.fn)  
-    {
-	ret = bmi_cache_progress_check(flow_d, 
-				       BLOCKING_FLAG, 
-				       NO_MUTEX_FLAG,
-				       BMI_TO_CACHE);
-	if (ret < 0)
+	ret = bmi_cache_release_cache_src(q_item);
+
+	/* TODO: error handling */
+	if ( ret < 0 ) {
+		PVFS_perror_gossip("bmi_recv_callback_fn: "
+                                   "error from  bmi_cache_release_cache_src:error_code", ret);
+		gen_mutex_unlock(&flow_data->flow_mutex);
+		return;
+	}
+
+	/* when no callback support from the cache, we take advantage of 
+	 * the BMI callback to make progress. That is, in the BMI callback 
+	 * function, we wait for the completion of another request from 
+	 * the cache component, then initiate BMI recv function. All these 
+	 * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
+	 * Be careful of "mutex" stuff. 
+	 */
+
+	/* no callback support from the cache */
+	if ( !q_item->cache_callback.fn )  
 	{
-	    PVFS_perror_gossip("bmi_recv_callback_fn: "
-			       "error from bmi_cache_progress_check:error_code", ret);
-	    gen_mutex_unlock(&flow_data->flow_mutex);
-	    return;
+		ret = bmi_cache_progress_check( flow_d, 
+						BLOCKING_FLAG, 
+						NO_MUTEX_FLAG,
+						BMI_TO_CACHE);
+		if ( ret < 0 )
+		{
+			PVFS_perror_gossip("bmi_recv_callback_fn: "
+                                           "error from bmi_cache_progress_check:error_code", ret);
+			gen_mutex_unlock(&flow_data->flow_mutex);
+			return;
+		}
+		/* the whole flow request is finished */
+		if ( ret == 1 ){
+			gen_mutex_unlock(&flow_data->flow_mutex);
+
+			/* free fp_queue_item */
+			qlist_for_each(tmp_link, &flow_data->cache_req_done_list) 
+			{
+				q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
+				qlist_del(&q_item->list_link);
+				free(q_item);
+				fprintf(stderr, "bmi_recv_callback_fn: free fp_queue_item\n");
+				
+			}
+			free(flow_data);
+			flow_d->state = FLOW_COMPLETE;
+			flow_d->release(flow_d);
+			flow_d->callback(flow_d);
+			fprintf(stderr, "bmi_recv_callback_fn: request is done\n");
+			return;
+		}
 	}
-    }
 
-    gen_mutex_unlock(&flow_data->flow_mutex);
-    return;
+	gen_mutex_unlock(&flow_data->flow_mutex);
+	return;
 }
 	
 /* cache_write_callback_fn()
@@ -761,66 +825,78 @@ static void bmi_recv_callback_fn(void *u
  * no return value
  */
 static void cache_write_callback_fn(void *user_ptr,
-				    PVFS_error error_code)
+		           PVFS_error error_code)
 {
-    PVFS_size tmp_actual_size;
+	PVFS_size tmp_actual_size;
+	PVFS_size total_size;
+	int i;
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-    struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+	struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
 #endif
 
-    struct fp_queue_item* q_item = user_ptr;
-    int ret;
-    PVFS_id_gen_t bmi_reqid;
+	struct fp_queue_item* q_item = user_ptr;
+	int ret;
+	PVFS_id_gen_t bmi_reqid;
 
-    /* TODO: error handling */
-    assert(error_code == 0);
+	/* TODO: error handling */
+	assert(error_code == 0);
 	
+	fprintf(stderr, "cache_write_callback_fn: enter\n");
 
-    /* if cache supports callback, this function will called
-     * independently. Otherwise, it is called inside of mutex.
-     * Thus, there is no need holding mutex.
-     */ 
+	/* if the cache supports callbacks, this function will be called
+	 * independently. Otherwise, it is called inside of the mutex.
+	 * Thus, there is no need to hold the mutex.
+	 */ 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-    gen_mutex_lock(&flow_data->flow_mutex);
+	gen_mutex_lock(&flow_data->flow_mutex);
 #endif
 
-    q_item->int_state = INT_REQ_COMPLETE;
+	q_item->int_state = INT_REQ_COMPLETE;
 
-    /* TODO: what if we recv less than expected? */
-    ret = BMI_post_recv_list(&bmi_reqid,
-			     q_item->parent->src.u.bmi.address,
-			     (void **)q_item->cache_req.moff_list,
-			     q_item->cache_req.msize_list,
-			     q_item->cache_req.mem_cnt,
-			     q_item->cache_req.total_size,
-			     &tmp_actual_size,
-			     q_item->cache_req.buffer_type,
-			     q_item->parent->tag,
-			     &q_item->bmi_callback,
-			     global_bmi_context);
+	/* TODO: cache_req.total_size in the buffer code */
+	total_size = 0;
+	for ( i=0;  i<q_item->cache_req.mem_cnt; i++ ) {
+		total_size += q_item->cache_req.msize_list[i];
+		fprintf(stderr, "cache_write_callback_fn:recv buff [%d] len:%Ld\n", i, q_item->cache_req.msize_list[i]);
+		q_item->cache_req.total_size = total_size;
+	}
+	fprintf(stderr, "cache_write_callback_fn:to recv %Ld\n", q_item->cache_req.total_size);
 
-    /* TODO: error handling */
-    assert(ret >= 0);
+	/* TODO: what if we recv less than expected? */
+	ret = BMI_post_recv_list(&bmi_reqid,
+		q_item->parent->src.u.bmi.address,
+		(void **)q_item->cache_req.moff_list,
+		q_item->cache_req.msize_list,
+		q_item->cache_req.mem_cnt,
+		q_item->cache_req.total_size,
+        	&tmp_actual_size,
+		q_item->cache_req.buffer_type,
+        	q_item->parent->tag,
+        	&q_item->bmi_callback,
+        	global_bmi_context);
 
-    if (ret == 1)
-    {
+	/* TODO: error handling */
+	assert(ret >= 0);
+
+	if(ret == 1)
+	{
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	gen_mutex_unlock(&flow_data->flow_mutex);
+		gen_mutex_unlock(&flow_data->flow_mutex);
 #endif
-	/* immediate completion; trigger callback ourselves */
-	bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
+		/* immediate completion; trigger callback ourselves */
+		bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	gen_mutex_lock(&flow_data->flow_mutex);
+		gen_mutex_lock(&flow_data->flow_mutex);
 #endif
-    }
+	}
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-    gen_mutex_unlock(&flow_data->flow_mutex);
+	gen_mutex_unlock(&flow_data->flow_mutex);
 #endif
 
-    return;
+	return;
 };
 
 
@@ -839,61 +915,61 @@ static void cache_write_callback_fn(void
  * no return value
  */
 static void cache_read_callback_fn(void *user_ptr,
-				   PVFS_error error_code)
+		           PVFS_error error_code)
 {
-    PVFS_size sent_size;
-    struct fp_queue_item* q_item = user_ptr;
+	PVFS_size sent_size;
+	struct fp_queue_item* q_item = user_ptr;
 
 #ifdef __CACHE_CALLBACK_SUPPORT
-    struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+	struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
 #endif
 
-    PVFS_id_gen_t bmi_reqid;
-    int ret;
+	PVFS_id_gen_t bmi_reqid;
+	int ret;
 
-    /* TODO: error handling */
-    assert(error_code == 0);
+	/* TODO: error handling */
+	assert(error_code == 0);
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-    gen_mutex_lock(&flow_data->flow_mutex);
+	gen_mutex_lock(&flow_data->flow_mutex);
 #endif
 
-    q_item->int_state = INT_REQ_COMPLETE;
+	q_item->int_state = INT_REQ_COMPLETE;
 
-    ret = BMI_post_send_list(&bmi_reqid,
-			     q_item->parent->dest.u.bmi.address,
-			     (const void **)q_item->cache_req.moff_list,
-			     q_item->cache_req.msize_list,
-			     q_item->cache_req.mem_cnt,
-			     q_item->cache_req.total_size,
-			     q_item->cache_req.buffer_type,
-			     q_item->parent->tag,
-			     &q_item->bmi_callback,
-			     global_bmi_context);
+	ret = BMI_post_send_list(&bmi_reqid,
+		q_item->parent->dest.u.bmi.address,
+		(const void **)q_item->cache_req.moff_list,
+		q_item->cache_req.msize_list,
+		q_item->cache_req.mem_cnt,
+		q_item->cache_req.total_size,
+		q_item->cache_req.buffer_type,
+		q_item->parent->tag,
+		&q_item->bmi_callback,
+		global_bmi_context);
 		
-    /* TODO: error handling */
-    assert(ret >= 0);
+	/* TODO: error handling */
+	assert(ret >= 0);
 
-    if (ret == 1)
-    {
+	if(ret == 1)
+	{
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	gen_mutex_unlock(&flow_data->flow_mutex);
+		gen_mutex_unlock(&flow_data->flow_mutex);
 #endif
 
-	/* immediate completion; trigger callback ourselves */
-	sent_size = q_item->cache_req.total_size;
-	bmi_send_callback_fn(q_item, sent_size, 0);
+		/* immediate completion; trigger callback ourselves */
+		sent_size = q_item->cache_req.total_size;
+		bmi_send_callback_fn(q_item, sent_size, 0);
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-	gen_mutex_lock(&flow_data->flow_mutex);
+		gen_mutex_lock(&flow_data->flow_mutex);
 #endif
-    }
+	}
 
 #ifdef __CACHE_CALLBACK_SUPPORT__
-    gen_mutex_unlock(&flow_data->flow_mutex);
+	gen_mutex_unlock(&flow_data->flow_mutex);
 #endif
 
-    return;
+	return;
 };
 
 /* bmi_send_callback_fn()
@@ -907,179 +983,187 @@ static void cache_read_callback_fn(void 
  * no return value
  */
 static void bmi_send_callback_fn(void *user_ptr,
-				 PVFS_size actual_size,
-				 PVFS_error error_code)
+		         PVFS_size actual_size,
+		         PVFS_error error_code)
 {
-    struct fp_queue_item* q_item = user_ptr;
-    struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
-    struct flow_descriptor * flow_d = flow_data->parent;
-    int ret;
+	struct fp_queue_item* q_item = user_ptr;
+	struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+	struct flow_descriptor * flow_d = flow_data->parent;
+	int ret;
 
-    /* TODO: error handling */
-    if (error_code != 0)
-    {
-	PVFS_perror_gossip("bmi_send_callback_fn: error", error_code);
-	assert(0);
-    }
+	/* TODO: error handling */
+	if(error_code != 0)
+	{
+		PVFS_perror_gossip("bmi_send_callback_fn: error", error_code);
+		assert(0);
+	}
 
-    gen_mutex_lock(&flow_data->flow_mutex);
+	gen_mutex_lock(&flow_data->flow_mutex);
 
 
-    flow_d->total_transfered += actual_size;
+	flow_d->total_transfered += actual_size;
 
-    /* release cache resource */
+	/* release cache resource */
 
-    ret = bmi_cache_release_cache_src(q_item);
+	ret = bmi_cache_release_cache_src(q_item);
 
-    /* TODO: error handling */
-    if (ret < 0)
-    {
-	PVFS_perror_gossip("bmi_send_callback_fn: "
-			   "from bmi_cache_release_cache_src:error_code", ret);
-	gen_mutex_unlock(&flow_data->flow_mutex);
-	return;
-    }
+	/* TODO: error handling */
+	if ( ret < 0 ) {
+		PVFS_perror_gossip("bmi_send_callback_fn: "
+                                   "from bmi_cache_release_cache_src:error_code", ret);
+		gen_mutex_unlock(&flow_data->flow_mutex);
+		return;
+	}
 
-    /* remove from current queue. q_item must be in the 
-     * cache_req_done_list when coming here.  */
+	/* remove from current queue. q_item must be in the 
+	 * cache_req_done_list when coming here.  */
 
-    qlist_del(&q_item->list_link);
+	qlist_del(&q_item->list_link);
 
-    /* when no callback support from the cache, we take advantage of 
-     * the BMI callback to make progress. That is, in the BMI callback 
-     * function, we wait for the completion of another request from 
-     * the cache component, then initiate BMI send function. All these 
-     * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
-     */
+	/* when no callback support from the cache, we take advantage of 
+	 * the BMI callback to make progress. That is, in the BMI callback 
+	 * function, we wait for the completion of another request from 
+	 * the cache component, then initiate BMI send function. All these 
+	 * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
+	 */
 
-    /* no callback support from the cache. */
-    if (!q_item->cache_callback.fn)  
-    {
-	ret = bmi_cache_progress_check(flow_d, 
-				       BLOCKING_FLAG, 
-				       NO_MUTEX_FLAG,
-				       CACHE_TO_BMI);
-	if (ret < 0)
+	/* no callback support from the cache. */
+	if ( !q_item->cache_callback.fn )  
 	{
-	    PVFS_perror_gossip("bmi_send_callback_fn: "
-			       "from bmi_cache_progress_check:error_code", ret);
-	    gen_mutex_unlock(&flow_data->flow_mutex);
-	    return;
+		ret = bmi_cache_progress_check( flow_d, 
+						BLOCKING_FLAG, 
+						NO_MUTEX_FLAG,
+						CACHE_TO_BMI );
+		if ( ret < 0 )
+		{
+			PVFS_perror_gossip("bmi_send_callback_fn: "
+                                           "from bmi_cache_progress_check:error_code", ret);
+			gen_mutex_unlock(&flow_data->flow_mutex);
+			return;
+		}
 	}
-    }
 
-    gen_mutex_unlock(&flow_data->flow_mutex);
+	gen_mutex_unlock(&flow_data->flow_mutex);
 
-    free(q_item);
+	free(q_item);
 
-    return;
+	return;
 }
 
 static int bmi_cache_check_cache_req(struct fp_queue_item *qitem) 
 {
-    cache_reply_t reply;
-    int flag = 0;
-    int ret = -1;
+	cache_reply_t reply;
+	int flag = 0;
+	int ret = -1;
 		
-    ret = cache_req_test( &(qitem->cache_req.request), &flag, &reply, NULL);
-    if (ret < 0) 
-    {
-	PVFS_perror_gossip("bmi_cache_check_cache_req: "
-			   "error_code", ret);
-	return ret;
-    }
+	fprintf(stderr, "bmi_cache_check_cache_req:enter (req:%d\n", qitem->cache_req.request.internal_id);
 
-    /* a request is finished. */
-    if (flag)
-    {
-	qitem->cache_req.mem_cnt = reply.count;
-	qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
-	qitem->cache_req.msize_list = reply.cbuf_size_array;
-	qitem->cache_req.errval = reply.errval;
+	ret = cache_req_test( &(qitem->cache_req.request), &flag, &reply, NULL);
+	if ( ret < 0 ) 
+	{
+		PVFS_perror_gossip("bmi_cache_check_cache_req: "
+                        "error_code", ret);
+		return ret;
+	}
 
-	return 1;
-    }
-    return 0;
+	/* a request is finished. */
+	if ( flag )
+	{
+		qitem->cache_req.mem_cnt = reply.count;
+		qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
+		qitem->cache_req.msize_list = reply.cbuf_size_array;
+		qitem->cache_req.errval = reply.errval;
+
+		fprintf(stderr, "bmi_cache_check_cache_req:exit req done count %d\n", reply.count);
+		return 1;
+	}
+	fprintf(stderr, "bmi_cache_check_cache_req:exit req no done\n");
+
+	return 0;
 }
 
 static int bmi_cache_release_cache_src(struct fp_queue_item *qitem)
 {
-    int ret;
+	int ret;
 	
-    ret = cache_req_done( &(qitem->cache_req.request) );	
-    if (ret < 0) 
-    {
-	PVFS_perror_gossip("bmi_cache_release_cache_src: "
-			   "error_code", ret);
-	return ret;
-    }
+	ret = cache_req_done( &(qitem->cache_req.request) );	
+	if ( ret < 0 ) 
+	{
+		PVFS_perror_gossip("bmi_cache_release_cache_src: "
+                        "error_code", ret);
+		return ret;
+	}
 	
-    return 0;
+	return 0;
 }
 
-static int bmi_cache_init_cache_req(struct fp_queue_item *qitem, int op)
+static int bmi_cache_init_cache_req(struct fp_queue_item *qitem, int op )
 {
-    int ret = 0;
-    cache_read_desc_t desc1;
-    cache_write_desc_t desc2;
-    cache_reply_t reply;
+	int ret = 0;
+	cache_read_desc_t desc1;
+	cache_write_desc_t desc2;
+	cache_reply_t reply;
 
-    if (op == BMI_TO_CACHE)  /* write */
-    {
-	desc2.coll_id = qitem->parent->dest.u.trove.coll_id;
-	desc2.handle = qitem->parent->dest.u.trove.handle;
-	desc2.context_id = global_trove_context;
-
-	/* TODO: if we use intemediate buffer, change here */
-	desc2.buffer = NULL;
-	desc2.len = 0;
-		
-	desc2.stream_array_count  = qitem->pint_req.result.segs; 
-	desc2.stream_offset_array = qitem->pint_req.result.offset_array;
-	desc2.stream_size_array = qitem->pint_req.result.size_array;
-
-	ret = cache_write_post(&desc2, 
-			       &qitem->cache_req.request, 
-			       &reply,
-			       NULL);
-	if (ret < 0)
+	if ( op == BMI_TO_CACHE )  /* write */
 	{
-	    PVFS_perror_gossip("bmi_cache_init_cache_req: "
-			       "error_code", ret);
+		desc2.coll_id = qitem->parent->dest.u.trove.coll_id;
+		desc2.handle = qitem->parent->dest.u.trove.handle;
+		desc2.context_id = global_trove_context;
+
+		/* TODO: if we use intermediate buffer, change here */
+		desc2.buffer = NULL;
+		desc2.len = 0;
+		
+		desc2.stream_array_count  = qitem->pint_req.result.segs; 
+		desc2.stream_offset_array = qitem->pint_req.result.offset_array;
+		desc2.stream_size_array = qitem->pint_req.result.size_array;
+
+		ret = cache_write_post( &desc2, 
+					&qitem->cache_req.request, 
+					&reply,
+					NULL );
+		if ( ret < 0 ) 
+		{
+			PVFS_perror_gossip("bmi_cache_init_cache_req: "
+                                     "error_code", ret);
+		}
+
 	}
-    }
-    else /* read */
-    {
-	desc1.coll_id = qitem->parent->dest.u.trove.coll_id;
-	desc1.handle = qitem->parent->dest.u.trove.handle;
-	desc1.context_id = global_trove_context;
-
-	/* TODO: if we use intemediate buffer, change here */
-	desc1.buffer = NULL;
-	desc1.len = 0;
+	else /* read */
+	{
+		desc1.coll_id = qitem->parent->dest.u.trove.coll_id;
+		desc1.handle = qitem->parent->dest.u.trove.handle;
+		desc1.context_id = global_trove_context;
+
+		/* TODO: if we use intermediate buffer, change here */
+		desc1.buffer = NULL;
+		desc1.len = 0;
 		
-	desc1.stream_array_count  = qitem->pint_req.result.segs; 
-	desc1.stream_offset_array = qitem->pint_req.result.offset_array;
-	desc1.stream_size_array = qitem->pint_req.result.size_array;
-
-	ret = cache_read_post(&desc1, 
-			      &qitem->cache_req.request, 
-			      &reply,
-			      NULL );
-	if (ret < 0) 
+		desc1.stream_array_count  = qitem->pint_req.result.segs; 
+		desc1.stream_offset_array = qitem->pint_req.result.offset_array;
+		desc1.stream_size_array = qitem->pint_req.result.size_array;
+
+		ret = cache_read_post( &desc1, 
+					&qitem->cache_req.request, 
+					&reply,
+					NULL );
+		if ( ret < 0 ) 
+		{
+			PVFS_perror_gossip("bmi_cache_init_cache_req: "
+                                     "error_code", ret);
+		}
+	}
+
+	/* immediate completion */
+	if ( ret == 1 ) 
 	{
-	    PVFS_perror_gossip("bmi_cache_init_cache_req: "
-			       "error_code", ret);
+		qitem->cache_req.mem_cnt = reply.count;
+		qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
+		qitem->cache_req.msize_list = reply.cbuf_size_array;
+		qitem->cache_req.errval = reply.errval;
+
+		fprintf(stderr, "bmi_cache_init_cache_req: immediate completion mcnt:%d\n", reply.count);
 	}
-    }
-    return ret;
-}
 
-/*
- * Local variables:
- *  c-indent-level: 4
- *  c-basic-offset: 4
- * End:
- *
- * vim: ts=8 sts=4 sw=4 noexpandtab
- */
+	return ret;
+}



More information about the PVFS2-CVS mailing list