[PVFS2-CVS]
commit by rbross in pvfs2/src/io/flow/flowproto-bmi-cache:
flowproto-bmi-cache-server.c
CVS commit program
cvs at parl.clemson.edu
Mon Feb 16 16:22:42 EST 2004
Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-cache
In directory parlweb:/tmp/cvs-serv32348/src/io/flow/flowproto-bmi-cache
Modified Files:
flowproto-bmi-cache-server.c
Log Message:
combo PVFS_id_gen_t -> PVFS_BMI_addr_t, formatting, PVFS error code patch.
Ugly. Sorry if my formatting pisses someone off; at least I'm not using >
80 columns any more :).
Index: flowproto-bmi-cache-server.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-cache/flowproto-bmi-cache-server.c,v
diff -p -u -r1.4 -r1.5
--- flowproto-bmi-cache-server.c 3 Nov 2003 14:11:48 -0000 1.4
+++ flowproto-bmi-cache-server.c 16 Feb 2004 21:22:42 -0000 1.5
@@ -59,23 +59,23 @@ struct pint_req_entry
struct cache_req_entry
{
- cache_request_t request; /* cache request handle */
- int errval; /* error code */
- int mem_cnt; /* how many buffers */
+ cache_request_t request; /* cache request handle */
+ int errval; /* error code */
+ int mem_cnt; /* how many buffers */
- /* buffer size array, array space provided by the cache */
- PVFS_size *msize_list;
+ /* buffer size array, array space provided by the cache */
+ PVFS_size *msize_list;
- /* "total_size" is the sum of the size list */
- PVFS_size total_size;
+ /* "total_size" is the sum of the size list */
+ PVFS_size total_size;
- /* buffer offset array, provided by the cache */
- PVFS_offset **moff_list;
+ /* buffer offset array, provided by the cache */
+ PVFS_offset **moff_list;
- /* if this is not NULL, this buffer is supplied by the flow */
- PVFS_offset *buffer;
+ /* if this is not NULL, this buffer is supplied by the flow */
+ PVFS_offset *buffer;
- enum bmi_buffer_type buffer_type;
+ enum bmi_buffer_type buffer_type;
};
@@ -83,22 +83,22 @@ struct cache_req_entry
* A request --> a flow --> a list of fp_queue_items */
struct fp_queue_item
{
- /* point to the flow descriptor */
- flow_descriptor* parent;
+ /* point to the flow descriptor */
+ flow_descriptor* parent;
- /* PINT request information */
- struct pint_req_entry pint_req;
+ /* PINT request information */
+ struct pint_req_entry pint_req;
- /* cache request information */
- struct cache_req_entry cache_req;
- int int_state;
-
- /* flag to show whether the callbacks are set up */
- int callback_setup;
- struct PINT_thread_mgr_bmi_callback bmi_callback;
- struct PINT_thread_mgr_trove_callback cache_callback;
+ /* cache request information */
+ struct cache_req_entry cache_req;
+ int int_state;
+
+ /* flag to show whether the callbacks are set up */
+ int callback_setup;
+ struct PINT_thread_mgr_bmi_callback bmi_callback;
+ struct PINT_thread_mgr_trove_callback cache_callback;
- struct qlist_head list_link;
+ struct qlist_head list_link;
};
/* fp_private_data is information specific to this flow protocol, stored
@@ -106,22 +106,22 @@ struct fp_queue_item
*/
struct fp_private_data
{
- /* point to the flow descriptor */
- flow_descriptor* parent;
+ /* point to the flow descriptor */
+ flow_descriptor* parent;
- /* PINT request done? */
- int pint_request_done;
+ /* PINT request done? */
+ int pint_request_done;
- /* PINT request list */
- struct qlist_head pint_req_list;
+ /* PINT request list */
+ struct qlist_head pint_req_list;
- /* requests completed on the cache side */
- struct qlist_head cache_req_done_list;
+ /* requests completed on the cache side */
+ struct qlist_head cache_req_done_list;
- PVFS_size total_bytes_processed;
- TROVE_context_id trove_context;
+ PVFS_size total_bytes_processed;
+ TROVE_context_id trove_context;
- gen_mutex_t flow_mutex;
+ gen_mutex_t flow_mutex;
};
#define PRIVATE_FLOW(target_flow)\
@@ -134,12 +134,12 @@ static TROVE_context_id global_trove_con
static void bmi_recv_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code);
+ PVFS_size actual_size,
+ PVFS_error error_code);
static void bmi_send_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code);
+ PVFS_size actual_size,
+ PVFS_error error_code);
#if 0
/* the above function is a special case; we need to look at a return
@@ -147,8 +147,8 @@ static void bmi_send_callback_fn(void *u
* to trigger it from a callback
*/
static void bmi_send_callback_wrapper(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
+ PVFS_size actual_size,
+ PVFS_error error_code)
{
bmi_send_callback_fn(user_ptr, actual_size, error_code);
return;
@@ -156,19 +156,19 @@ static void bmi_send_callback_wrapper(vo
#endif
static void cache_read_callback_fn(void *user_ptr,
- PVFS_error error_code);
+ PVFS_error error_code);
static void cache_write_callback_fn(void *user_ptr,
- PVFS_error error_code);
+ PVFS_error error_code);
/* protocol specific functions */
static int bmi_cache_request_init(struct fp_private_data *flow_data,
- int direction);
+ int direction);
static int bmi_cache_progress_check(struct flow_descriptor *flow_d,
- int wait_flag,
- int mutex_flag,
- int direction);
+ int wait_flag,
+ int mutex_flag,
+ int direction);
static int bmi_cache_check_cache_req(struct fp_queue_item *q_item);
@@ -182,12 +182,12 @@ static int fp_bmi_cache_initialize(int f
static int fp_bmi_cache_finalize(void);
static int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
- int option,
- void *parameter);
+ int option,
+ void *parameter);
static int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
- int option,
- void *parameter);
+ int option,
+ void *parameter);
static int fp_bmi_cache_post(flow_descriptor * flow_d);
@@ -230,7 +230,7 @@ int fp_bmi_cache_initialize(int flowprot
fp_bmi_cache_id = flowproto_id;
- return(0);
+ return 0;
}
/* fp_bmi_cache_finalize()
@@ -245,7 +245,7 @@ int fp_bmi_cache_finalize(void)
#ifdef __PVFS2_TROVE_SUPPORT__
PINT_thread_mgr_trove_stop();
#endif
- return (0);
+ return 0;
}
/* fp_bmi_cache_getinfo()
@@ -255,21 +255,21 @@ int fp_bmi_cache_finalize(void)
* returns 0 on success, -PVFS_error on failure
*/
int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
- int option,
- void *parameter)
+ int option,
+ void *parameter)
{
int* type;
- switch(option)
+ switch (option)
{
case FLOWPROTO_TYPE_QUERY:
type = parameter;
if(*type == FLOWPROTO_MULTIQUEUE)
return(0);
else
- return(-PVFS_ENOPROTOOPT);
+ return -PVFS_ENOPROTOOPT;
default:
- return(-PVFS_ENOSYS);
+ return -PVFS_ENOSYS;
break;
}
}
@@ -281,10 +281,10 @@ int fp_bmi_cache_getinfo(flow_descriptor
* returns 0 on success, -PVFS_error on failure
*/
int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
- int option,
- void *parameter)
+ int option,
+ void *parameter)
{
- return (-PVFS_ENOSYS);
+ return -PVFS_ENOSYS;
}
/* fp_bmi_cache_post()
@@ -295,143 +295,149 @@ int fp_bmi_cache_setinfo(flow_descriptor
*/
int fp_bmi_cache_post(flow_descriptor * flow_d)
{
- struct fp_private_data* flow_data = NULL;
- int ret;
+ struct fp_private_data* flow_data = NULL;
+ int ret;
- /* on the server side: only two possible types */
- assert( (flow_d->src.endpoint_id == BMI_ENDPOINT &&
- flow_d->dest.endpoint_id == CACHE_ENDPOINT) ||
- (flow_d->src.endpoint_id == CACHE_ENDPOINT &&
- flow_d->dest.endpoint_id == BMI_ENDPOINT) );
-
- flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
- if(!flow_data)
- return(-PVFS_ENOMEM);
- memset(flow_data, 0, sizeof(struct fp_private_data));
-
- flow_d->flow_protocol_data = flow_data;
- flow_d->state = FLOW_TRANSMITTING;
-
- /* protocol specific fields */
- flow_data->parent = flow_d;
- INIT_QLIST_HEAD(&flow_data->pint_req_list);
- flow_data->pint_request_done = 0;
- INIT_QLIST_HEAD(&flow_data->cache_req_done_list);
+ /* on the server side: only two possible types */
+ assert( (flow_d->src.endpoint_id == BMI_ENDPOINT &&
+ flow_d->dest.endpoint_id == CACHE_ENDPOINT) ||
+ (flow_d->src.endpoint_id == CACHE_ENDPOINT &&
+ flow_d->dest.endpoint_id == BMI_ENDPOINT) );
- flow_data->trove_context = global_trove_context;
- gen_mutex_init(&flow_data->flow_mutex);
+ flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
+ if(flow_data == NULL)
+ {
+ return -PVFS_ENOMEM;
+ }
+ memset(flow_data, 0, sizeof(struct fp_private_data));
- /* if a file datatype offset was specified, go ahead and skip ahead
- * before doing anything else
- */
- if(flow_d->file_req_offset)
- PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state,
- flow_d->file_req_offset);
+ flow_d->flow_protocol_data = flow_data;
+ flow_d->state = FLOW_TRANSMITTING;
- /* set boundaries on file datatype */
- if(flow_d->aggregate_size > -1)
- {
- PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
- flow_d->aggregate_size+flow_d->file_req_offset);
- }
- else
+ /* protocol specific fields */
+ flow_data->parent = flow_d;
+ INIT_QLIST_HEAD(&flow_data->pint_req_list);
+ flow_data->pint_request_done = 0;
+ INIT_QLIST_HEAD(&flow_data->cache_req_done_list);
+
+ flow_data->trove_context = global_trove_context;
+ gen_mutex_init(&flow_data->flow_mutex);
+
+ /* if a file datatype offset was specified, go ahead and skip ahead
+ * before doing anything else
+ */
+ if (flow_d->file_req_offset)
+ {
+ PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state,
+ flow_d->file_req_offset);
+ }
+
+ /* set boundaries on file datatype */
+ if(flow_d->aggregate_size > -1)
+ {
+ PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
+ flow_d->aggregate_size +
+ flow_d->file_req_offset);
+ }
+ else
+ {
+ PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
+ flow_d->file_req_offset +
+ PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
+ }
+
+ /* (1) init requests; (2) check progress; later all progress checks
+ * are driven by callbacks.
+ */
+
+ if(flow_d->src.endpoint_id == CACHE_ENDPOINT &&
+ flow_d->dest.endpoint_id == BMI_ENDPOINT)
+ {
+ /* CACHE --> BMI flow: read from cache, then send to the
+ * network. When the cache supports callback, the cache
+ * callback triggers BMI operations; the BMI callback triggers
+ * the release of cache resources.
+ * When the cache does not support callback, as in the current
+ * implementation, we use the following progress method:
+ * (1) wait for the completion of the first cache request;
+ * (2) initiate the BMI send operation;
+ * (3) in the BMI send callback function, we wait for the
+ * completion of the next cache request;
+ * (4) goto step (2).
+ */
+
+ ret = bmi_cache_request_init(flow_data, CACHE_TO_BMI);
+ if (ret < 0)
{
- PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
- flow_d->file_req_offset +
- PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
+ PVFS_perror_gossip("bmi_cache_request_init error: "
+ "error_code", ret);
+ return ret;
}
- /* (1) init requests; (2) check progress; later all progress checks
- * are driven by callbacks.
+ /* check progress. MUTEX_FLAG is needed because possible
+ * callbacks may happen at the same time.
*/
-
- if( flow_d->src.endpoint_id == CACHE_ENDPOINT &&
- flow_d->dest.endpoint_id == BMI_ENDPOINT )
+ ret = bmi_cache_progress_check( flow_d,
+ BLOCKING_FLAG,
+ MUTEX_FLAG,
+ CACHE_TO_BMI );
+ if (ret < 0)
{
- /* CACHE --> BMI flow: read from cache, then send to the
- * network. When the cache supports callback, the cache
- * callback triggers BMI operations; the BMI callback triggers
- * the release of cache resources.
- * When the cache does not support callback, as in the current
- * implementation, we use the following progress method:
- * (1) wait for the completion of the first cache request;
- * (2) initiate the BMI send operation;
- * (3) in the BMI send callback function, we wait for the
- * completion of the next cache request;
- * (4) goto step (2).
- */
-
- ret = bmi_cache_request_init(flow_data, CACHE_TO_BMI);
- if ( ret < 0 ) {
- PVFS_perror_gossip("bmi_cache_request_init error: "
- "error_code", ret);
- return ret;
- }
+ PVFS_perror_gossip("bmi_cache_progress_check error: "
+ "error_code", ret);
+ return ret;
+ }
- /* check progress. MUTEX_FLAG is needed because possible
- * callbacks may happen at the same time.
- */
- ret = bmi_cache_progress_check( flow_d,
- BLOCKING_FLAG,
- MUTEX_FLAG,
- CACHE_TO_BMI );
- if ( ret < 0 )
- {
- PVFS_perror_gossip("bmi_cache_progress_check error: "
- "error_code", ret);
- return ret;
- }
+ }
+ else if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
+ flow_d->dest.endpoint_id == CACHE_ENDPOINT)
+ {
+ /* BMI--->CACHE flow: (1) init requests; (2) check progress;
+ * later all progress checks are driven by callbacks;
+ * cache callbacks to indicate that data buffers are
+ * available;
+ * then trigger BMI receive operations to receive data
+ * into the cache buffers;
+ * the BMI call back function will trigger releasing
+ * cache buffers.
+ * In the current implementation, cache does not provide
+ * callback (we will add it later), the progress of cache
+ * is checked in the BMI call back function. That means, in
+ * the BMI call back function, we release cache buffers for
+ * the previous queue item; then we see if the cache buffers
+ * are available for another queue item (polling with time
+ * idle), if yes, initiate BMI receive operations.
+ */
+ ret = bmi_cache_request_init(flow_data, BMI_TO_CACHE);
+ if ( ret < 0 ) {
+ PVFS_perror_gossip("bmi_cache_request_init error: "
+ "error_code", ret);
+ return ret;
}
- else if( flow_d->src.endpoint_id == BMI_ENDPOINT &&
- flow_d->dest.endpoint_id == CACHE_ENDPOINT )
- {
- /* BMI--->CACHE flow: (1) init requests; (2) check progress;
- * later all progress checks are driven by callbacks;
- * cache callbacks to indicate that data buffers are
- * available;
- * then trigger BMI receive operations to receive data
- * into the cache buffers;
- * the BMI call back function will trigger releasing
- * cache buffers.
- * In the current implementation, cache does not provide
- * callback (we will add it later), the progress of cache
- * is checked in the BMI call back function. That means, in
- * the BMI call back function, we release cache buffers for
- * the previous queue item; then we see if the cache buffers
- * are available for another queue item (polling with time
- * idle), if yes, initiate BMI receive operations.
- */
-
- ret = bmi_cache_request_init(flow_data, BMI_TO_CACHE);
- if ( ret < 0 ) {
- PVFS_perror_gossip("bmi_cache_request_init error: "
- "error_code", ret);
- return ret;
- }
-
- /* check progress. MUTEX_FLAG is needed because possible
- * callbacks may happen at the same time.
- */
- ret = bmi_cache_progress_check(flow_d,
- BLOCKING_FLAG,
- MUTEX_FLAG,
- BMI_TO_CACHE);
- if ( ret < 0 )
- {
- PVFS_perror_gossip("bmi_cache_progress_check error: "
- "error_code", ret);
- return ret;
- }
+ /* check progress. MUTEX_FLAG is needed because possible
+ * callbacks may happen at the same time.
+ */
- }
- else
+ ret = bmi_cache_progress_check(flow_d,
+ BLOCKING_FLAG,
+ MUTEX_FLAG,
+ BMI_TO_CACHE);
+ if ( ret < 0 )
{
- return(-ENOSYS);
+ PVFS_perror_gossip("bmi_cache_progress_check error: "
+ "error_code", ret);
+ return ret;
}
- return (0);
+ }
+ else
+ {
+ return -PVFS_ENOSYS;
+ }
+
+ return (0);
}
@@ -444,91 +450,92 @@ int fp_bmi_cache_post(flow_descriptor *
* return: 0: success; <0 error; 1: complete;
*/
-int bmi_cache_request_init(struct fp_private_data *flow_data, int direction)
+int bmi_cache_request_init(struct fp_private_data *flow_data, int direction)
{
- struct flow_descriptor *flow_d = flow_data->parent;
- struct fp_queue_item *new_qitem = NULL;
- struct pint_req_entry* pint_req = NULL;
- PVFS_size bytes_processed = 0;
-
- int ret;
-
- /* zero request. This is wrong, TO DO it later. */
- if ( flow_d->total_transfered >= flow_data->total_bytes_processed ) {
- flow_d->state = FLOW_COMPLETE;
- free(flow_data);
- flow_d->release(flow_d);
- flow_d->callback(flow_d);
- return(1);
- }
+ struct flow_descriptor *flow_d = flow_data->parent;
+ struct fp_queue_item *new_qitem = NULL;
+ struct pint_req_entry* pint_req = NULL;
+ PVFS_size bytes_processed = 0;
- /* get request information before launching cache request.
- * Basically, we need to know what the request is, including
- * file regions, each region offset, and each region length.
- * A pipelining idea is included in the following steps:
- * a large flow request ---> several requests with "MAX_REGIONS"
- * and "BUFFER_SIZE" ---> pint_req_list
- * Later, we process each element in the pint_req_list:
- * 1) dequeue an element from the list;
- * 2) init cache request;
- * 3) drive BMI operations;
- * 4) enqueue the element into another list;
- * If all elements have been through the above steps,
- * the request is done.
- */
+ int ret;
- assert ( flow_data->pint_request_done == 0 );
+ /* zero request. This is wrong, TO DO it later. */
+ if (flow_d->total_transfered >= flow_data->total_bytes_processed)
+ {
+ flow_d->state = FLOW_COMPLETE;
+ free(flow_data);
+ flow_d->release(flow_d);
+ flow_d->callback(flow_d);
+ return 1;
+ }
- /* get PINT_requests and initiate related cache requests */
- do {
- new_qitem = (struct fp_queue_item *)malloc(sizeof(struct fp_queue_item));
- assert(new_qitem);
- memset(new_qitem, 0 , sizeof(struct fp_queue_item));
+ /* get request information before launching cache request.
+ * Basically, we need to know what the request is, including
+ * file regions, each region offset, and each region length.
+ * A pipelining idea is included in the following steps:
+ * a large flow request ---> several requests with "MAX_REGIONS"
+ * and "BUFFER_SIZE" ---> pint_req_list
+ * Later, we process each element in the pint_req_list:
+ * 1) dequeue an element from the list;
+ * 2) init cache request;
+ * 3) drive BMI operations;
+ * 4) enqueue the element into another list;
+ * If all elements have been through the above steps,
+ * the request is done.
+ */
+
+ assert ( flow_data->pint_request_done == 0 );
+
+ /* get PINT_requests and initiate related cache requests */
+ do {
+ new_qitem = (struct fp_queue_item *)malloc(sizeof(struct fp_queue_item));
+ assert(new_qitem);
+ memset(new_qitem, 0 , sizeof(struct fp_queue_item));
- new_qitem->parent = flow_d;
+ new_qitem->parent = flow_d;
- /* process request */
- pint_req = & new_qitem->pint_req;
- pint_req->result.offset_array = pint_req->offset_list;
- pint_req->result.size_array = pint_req->size_list;
- pint_req->result.bytemax = BUFFER_SIZE;
- pint_req->result.bytes = 0;
- pint_req->result.segmax = MAX_REGIONS;
- pint_req->result.segs = 0;
-
- ret = PINT_Process_request( flow_d->file_req_state,
- flow_d->mem_req_state,
- &flow_d->file_data,
- &pint_req->result,
- PINT_SERVER );
- /* TODO: error handling */
- assert(ret >= 0);
+ /* process request */
+ pint_req = & new_qitem->pint_req;
+ pint_req->result.offset_array = pint_req->offset_list;
+ pint_req->result.size_array = pint_req->size_list;
+ pint_req->result.bytemax = BUFFER_SIZE;
+ pint_req->result.bytes = 0;
+ pint_req->result.segmax = MAX_REGIONS;
+ pint_req->result.segs = 0;
+
+ ret = PINT_Process_request(flow_d->file_req_state,
+ flow_d->mem_req_state,
+ &flow_d->file_data,
+ &pint_req->result,
+ PINT_SERVER);
+ /* TODO: error handling */
+ assert(ret >= 0);
- /* submit the cache request */
- ret = bmi_cache_init_cache_req(new_qitem, direction);
+ /* submit the cache request */
+ ret = bmi_cache_init_cache_req(new_qitem, direction);
- /* TODO: error handling */
- assert(ret >= 0);
+ /* TODO: error handling */
+ assert(ret >= 0);
- new_qitem->int_state = INT_REQ_PROCESSING;
+ new_qitem->int_state = INT_REQ_PROCESSING;
- /* immediate completion on the cache request */
- if ( ret == 1 )
- {
- new_qitem->int_state = INT_REQ_COMPLETE;
- }
+ /* immediate completion on the cache request */
+ if (ret == 1)
+ {
+ new_qitem->int_state = INT_REQ_COMPLETE;
+ }
- /* put the request into the chain */
- qlist_add_tail(&new_qitem->list_link, &flow_data->pint_req_list);
- bytes_processed += pint_req->result.bytes;
+ /* put the request into the chain */
+ qlist_add_tail(&new_qitem->list_link, &flow_data->pint_req_list);
+ bytes_processed += pint_req->result.bytes;
- } while( !PINT_REQUEST_DONE(flow_d->file_req_state) );
+ } while( !PINT_REQUEST_DONE(flow_d->file_req_state) );
- flow_data->pint_request_done = 1;
+ flow_data->pint_request_done = 1;
- flow_data->total_bytes_processed = bytes_processed;
+ flow_data->total_bytes_processed = bytes_processed;
- return 0;
+ return 0;
} /* end of bmi_cache_request_init() */
@@ -541,126 +548,129 @@ int bmi_cache_request_init(struct fp_pr
* (2) mutex_flag is off
* -- this is called in other callbacks which hold mutex.
* wait_flag:
- (1) non-blocking; (2) blocking
+ (1) non-blocking; (2) blocking
* direction:
* (1) BMI_TO_CACHE; (2) CACHE_TO_BMI
*/
int bmi_cache_progress_check(struct flow_descriptor *flow_d,
- int wait_flag,
- int mutex_flag,
- int direction)
+ int wait_flag,
+ int mutex_flag,
+ int direction)
{
- struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
- struct fp_queue_item *q_item = NULL;
- struct qlist_head *tmp_link = NULL;
- int doneflag = 0;
- int ret;
+ struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
+ struct fp_queue_item *q_item = NULL;
+ struct qlist_head *tmp_link = NULL;
+ int doneflag = 0;
+ int ret;
- assert ( flow_data->pint_request_done == 1 );
+ assert ( flow_data->pint_request_done == 1 );
- if ( qlist_empty(&flow_data->pint_req_list) )
- return 1;
+ if ( qlist_empty(&flow_data->pint_req_list) )
+ {
+ return 1;
+ }
- doneflag = 0;
+ doneflag = 0;
- if ( mutex_flag ) {
- gen_mutex_lock(&flow_data->flow_mutex);
- }
+ if (mutex_flag)
+ {
+ gen_mutex_lock(&flow_data->flow_mutex);
+ }
-check_again:
+ check_again:
- qlist_for_each(tmp_link, &flow_data->pint_req_list)
+ qlist_for_each(tmp_link, &flow_data->pint_req_list)
{
- q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
+ q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
- if ( q_item->int_state == INT_REQ_INIT )
- {
- /* wrong state */
- PVFS_perror_gossip("bmi_cache_progress_check error: "
- "wrong internal state:error code", -1);
-
- if ( mutex_flag )
- gen_mutex_unlock(&flow_data->flow_mutex);
+ if ( q_item->int_state == INT_REQ_INIT )
+ {
+ /* wrong state */
+ PVFS_perror_gossip("bmi_cache_progress_check error: "
+ "wrong internal state:error code", -1);
+
+ if ( mutex_flag )
+ gen_mutex_unlock(&flow_data->flow_mutex);
+
+ /* TODO: error code */
+ return -1;
+ }
+
+ /* internal reuqest has been issued, but in processing */
+ if ( q_item->int_state == INT_REQ_PROCESSING )
+ {
+ /* check the cache request */
+ ret = bmi_cache_check_cache_req(q_item);
- /* TODO: error code */
- return -1;
- }
-
- /* internal reuqest has been issued, but in processing */
- if ( q_item->int_state == INT_REQ_PROCESSING )
+ /* TODO: error handling */
+ if ( ret < 0 )
{
- /* check the cache request */
- ret = bmi_cache_check_cache_req(q_item);
-
- /* TODO: error handling */
- if ( ret < 0 )
- {
- if ( mutex_flag )
- gen_mutex_unlock(&flow_data->flow_mutex);
- return -1;
- }
+ if ( mutex_flag )
+ gen_mutex_unlock(&flow_data->flow_mutex);
+ return -1;
+ }
- if ( ret == 1 ) {
- q_item->int_state = INT_REQ_COMPLETE;
- }
+ if ( ret == 1 ) {
+ q_item->int_state = INT_REQ_COMPLETE;
}
+ }
+
+ /* internal reuqest has been finished, buffers are available
+ * for BMI operations.
+ */
+ if ( q_item->int_state == INT_REQ_COMPLETE )
+ {
+ /* move the item from pint_req_list to cache_req_done_list */
+ qlist_del(&q_item->list_link);
+ qlist_add_tail(&q_item->list_link, &flow_data->cache_req_done_list);
+
+ if ( direction == BMI_TO_CACHE )
+ {
+ if ( mutex_flag )
+ gen_mutex_unlock(&flow_data->flow_mutex);
+ cache_write_callback_fn(q_item, 0);
- /* internal reuqest has been finished, buffers are available
- * for BMI operations.
- */
- if ( q_item->int_state == INT_REQ_COMPLETE )
+ if ( mutex_flag )
+ gen_mutex_lock(&flow_data->flow_mutex);
+ } else
{
- /* move the item from pint_req_list to cache_req_done_list */
- qlist_del(&q_item->list_link);
- qlist_add_tail(&q_item->list_link, &flow_data->cache_req_done_list);
-
- if ( direction == BMI_TO_CACHE )
- {
- if ( mutex_flag )
- gen_mutex_unlock(&flow_data->flow_mutex);
- cache_write_callback_fn(q_item, 0);
-
- if ( mutex_flag )
- gen_mutex_lock(&flow_data->flow_mutex);
- } else
- {
- if ( mutex_flag )
- gen_mutex_unlock(&flow_data->flow_mutex);
- cache_read_callback_fn(q_item, 0);
-
- if ( mutex_flag )
- gen_mutex_lock(&flow_data->flow_mutex);
- }
- doneflag = 1;
+ if ( mutex_flag )
+ gen_mutex_unlock(&flow_data->flow_mutex);
+ cache_read_callback_fn(q_item, 0);
+
+ if ( mutex_flag )
+ gen_mutex_lock(&flow_data->flow_mutex);
}
+ doneflag = 1;
+ }
- if ( !q_item->callback_setup )
- {
- q_item->bmi_callback.fn = bmi_recv_callback_fn;
- q_item->bmi_callback.data = q_item;
+ if ( !q_item->callback_setup )
+ {
+ q_item->bmi_callback.fn = bmi_recv_callback_fn;
+ q_item->bmi_callback.data = q_item;
#ifdef __CACHE_CALLBACK_SUPPORT__
- q_item->cache_callback.fn = cache_write_callback_fn;
- q_item->cache_callback.data = q_item;
+ q_item->cache_callback.fn = cache_write_callback_fn;
+ q_item->cache_callback.data = q_item;
#else
- q_item->cache_callback.fn = NULL;
- q_item->cache_callback.data = NULL;
+ q_item->cache_callback.fn = NULL;
+ q_item->cache_callback.data = NULL;
#endif
- q_item->callback_setup = 1;
- }
+ q_item->callback_setup = 1;
+ }
- if ( doneflag ) break;
+ if ( doneflag ) break;
} /* qlist_for_each element request */
- if ( !doneflag && wait_flag == BLOCKING_FLAG )
- goto check_again;
+ if ( !doneflag && wait_flag == BLOCKING_FLAG )
+ goto check_again;
- if ( mutex_flag )
- gen_mutex_unlock(&flow_data->flow_mutex);
+ if ( mutex_flag )
+ gen_mutex_unlock(&flow_data->flow_mutex);
- return 0;
+ return 0;
}
@@ -675,70 +685,71 @@ check_again:
* no return value
*/
static void bmi_recv_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
+ PVFS_size actual_size,
+ PVFS_error error_code)
{
- struct fp_queue_item* q_item = user_ptr;
- struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
- struct flow_descriptor * flow_d = flow_data->parent;
- int ret;
+ struct fp_queue_item* q_item = user_ptr;
+ struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+ struct flow_descriptor * flow_d = flow_data->parent;
+ int ret;
- /* TODO: error handling */
- if(error_code != 0)
- {
- PVFS_perror_gossip("bmi_recv_callback_fn error_code",
- error_code);
- assert(0);
- }
+ /* TODO: error handling */
+ if (error_code != 0)
+ {
+ PVFS_perror_gossip("bmi_recv_callback_fn error_code",
+ error_code);
+ assert(0);
+ }
- gen_mutex_lock(&flow_data->flow_mutex);
+ gen_mutex_lock(&flow_data->flow_mutex);
- /* remove from current queue */
- qlist_del(&q_item->list_link);
+ /* remove from current queue */
+ qlist_del(&q_item->list_link);
- flow_d->total_transfered += actual_size;
+ flow_d->total_transfered += actual_size;
- /* release cache resource, since data has been received into
- * the cache buffers
- */
+ /* release cache resource, since data has been received into
+ * the cache buffers
+ */
- ret = bmi_cache_release_cache_src(q_item);
+ ret = bmi_cache_release_cache_src(q_item);
- /* TODO: error handling */
- if ( ret < 0 ) {
- PVFS_perror_gossip("bmi_recv_callback_fn: "
- "error from bmi_cache_release_cache_src:error_code", ret);
- gen_mutex_unlock(&flow_data->flow_mutex);
- return;
- }
+ /* TODO: error handling */
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("bmi_recv_callback_fn: "
+ "error from bmi_cache_release_cache_src:error_code", ret);
+ gen_mutex_unlock(&flow_data->flow_mutex);
+ return;
+ }
- /* when no callback support from the cache, we take advantage of
- * the BMI callback to make progress. That is, in the BMI callback
- * function, we wait for the completion of another request from
- * the cache component, then initiate BMI recv function. All these
- * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
- * Be careful of "mutex" stuff.
- */
+ /* when no callback support from the cache, we take advantage of
+ * the BMI callback to make progress. That is, in the BMI callback
+ * function, we wait for the completion of another request from
+ * the cache component, then initiate BMI recv function. All these
+ * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
+ * Be careful of "mutex" stuff.
+ */
- /* no callback support from the cache */
- if ( !q_item->cache_callback.fn )
+ /* no callback support from the cache */
+ if (!q_item->cache_callback.fn)
+ {
+ ret = bmi_cache_progress_check(flow_d,
+ BLOCKING_FLAG,
+ NO_MUTEX_FLAG,
+ BMI_TO_CACHE);
+ if (ret < 0)
{
- ret = bmi_cache_progress_check( flow_d,
- BLOCKING_FLAG,
- NO_MUTEX_FLAG,
- BMI_TO_CACHE);
- if ( ret < 0 )
- {
- PVFS_perror_gossip("bmi_recv_callback_fn: "
- "error from bmi_cache_progress_check:error_code", ret);
- gen_mutex_unlock(&flow_data->flow_mutex);
- return;
- }
+ PVFS_perror_gossip("bmi_recv_callback_fn: "
+ "error from bmi_cache_progress_check:error_code", ret);
+ gen_mutex_unlock(&flow_data->flow_mutex);
+ return;
}
+ }
- gen_mutex_unlock(&flow_data->flow_mutex);
- return;
+ gen_mutex_unlock(&flow_data->flow_mutex);
+ return;
}
/* cache_write_callback_fn()
@@ -751,66 +762,66 @@ static void bmi_recv_callback_fn(void *u
* no return value
*/
static void cache_write_callback_fn(void *user_ptr,
- PVFS_error error_code)
+ PVFS_error error_code)
{
- PVFS_size tmp_actual_size;
+ PVFS_size tmp_actual_size;
#ifdef __CACHE_CALLBACK_SUPPORT__
- struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+ struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
#endif
- struct fp_queue_item* q_item = user_ptr;
- int ret;
- PVFS_id_gen_t bmi_reqid;
+ struct fp_queue_item* q_item = user_ptr;
+ int ret;
+ PVFS_id_gen_t bmi_reqid;
- /* TODO: error handling */
- assert(error_code == 0);
+ /* TODO: error handling */
+ assert(error_code == 0);
- /* if cache supports callback, this function will called
- * independently. Otherwise, it is called inside of mutex.
- * Thus, there is no need holding mutex.
- */
+ /* if cache supports callback, this function will called
+ * independently. Otherwise, it is called inside of mutex.
+ * Thus, there is no need holding mutex.
+ */
#ifdef __CACHE_CALLBACK_SUPPORT__
- gen_mutex_lock(&flow_data->flow_mutex);
+ gen_mutex_lock(&flow_data->flow_mutex);
#endif
- q_item->int_state = INT_REQ_COMPLETE;
+ q_item->int_state = INT_REQ_COMPLETE;
- /* TODO: what if we recv less than expected? */
- ret = BMI_post_recv_list(&bmi_reqid,
- q_item->parent->src.u.bmi.address,
- (void **)q_item->cache_req.moff_list,
- q_item->cache_req.msize_list,
- q_item->cache_req.mem_cnt,
- q_item->cache_req.total_size,
- &tmp_actual_size,
- q_item->cache_req.buffer_type,
- q_item->parent->tag,
- &q_item->bmi_callback,
- global_bmi_context);
+ /* TODO: what if we recv less than expected? */
+ ret = BMI_post_recv_list(&bmi_reqid,
+ q_item->parent->src.u.bmi.address,
+ (void **)q_item->cache_req.moff_list,
+ q_item->cache_req.msize_list,
+ q_item->cache_req.mem_cnt,
+ q_item->cache_req.total_size,
+ &tmp_actual_size,
+ q_item->cache_req.buffer_type,
+ q_item->parent->tag,
+ &q_item->bmi_callback,
+ global_bmi_context);
- /* TODO: error handling */
- assert(ret >= 0);
+ /* TODO: error handling */
+ assert(ret >= 0);
- if(ret == 1)
- {
+ if (ret == 1)
+ {
#ifdef __CACHE_CALLBACK_SUPPORT__
- gen_mutex_unlock(&flow_data->flow_mutex);
+ gen_mutex_unlock(&flow_data->flow_mutex);
#endif
- /* immediate completion; trigger callback ourselves */
- bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
+ /* immediate completion; trigger callback ourselves */
+ bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
#ifdef __CACHE_CALLBACK_SUPPORT__
- gen_mutex_lock(&flow_data->flow_mutex);
+ gen_mutex_lock(&flow_data->flow_mutex);
#endif
- }
+ }
#ifdef __CACHE_CALLBACK_SUPPORT__
- gen_mutex_unlock(&flow_data->flow_mutex);
+ gen_mutex_unlock(&flow_data->flow_mutex);
#endif
- return;
+ return;
};
@@ -829,61 +840,61 @@ static void cache_write_callback_fn(void
* no return value
*/
static void cache_read_callback_fn(void *user_ptr,
- PVFS_error error_code)
+ PVFS_error error_code)
{
- PVFS_size sent_size;
- struct fp_queue_item* q_item = user_ptr;
+ PVFS_size sent_size;
+ struct fp_queue_item* q_item = user_ptr;
#ifdef __CACHE_CALLBACK_SUPPORT
- struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+ struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
#endif
- PVFS_id_gen_t bmi_reqid;
- int ret;
+ PVFS_id_gen_t bmi_reqid;
+ int ret;
- /* TODO: error handling */
- assert(error_code == 0);
+ /* TODO: error handling */
+ assert(error_code == 0);
#ifdef __CACHE_CALLBACK_SUPPORT__
- gen_mutex_lock(&flow_data->flow_mutex);
+ gen_mutex_lock(&flow_data->flow_mutex);
#endif
- q_item->int_state = INT_REQ_COMPLETE;
+ q_item->int_state = INT_REQ_COMPLETE;
- ret = BMI_post_send_list(&bmi_reqid,
- q_item->parent->dest.u.bmi.address,
- (const void **)q_item->cache_req.moff_list,
- q_item->cache_req.msize_list,
- q_item->cache_req.mem_cnt,
- q_item->cache_req.total_size,
- q_item->cache_req.buffer_type,
- q_item->parent->tag,
- &q_item->bmi_callback,
- global_bmi_context);
+ ret = BMI_post_send_list(&bmi_reqid,
+ q_item->parent->dest.u.bmi.address,
+ (const void **)q_item->cache_req.moff_list,
+ q_item->cache_req.msize_list,
+ q_item->cache_req.mem_cnt,
+ q_item->cache_req.total_size,
+ q_item->cache_req.buffer_type,
+ q_item->parent->tag,
+ &q_item->bmi_callback,
+ global_bmi_context);
- /* TODO: error handling */
- assert(ret >= 0);
+ /* TODO: error handling */
+ assert(ret >= 0);
- if(ret == 1)
- {
+ if (ret == 1)
+ {
#ifdef __CACHE_CALLBACK_SUPPORT__
- gen_mutex_unlock(&flow_data->flow_mutex);
+ gen_mutex_unlock(&flow_data->flow_mutex);
#endif
- /* immediate completion; trigger callback ourselves */
- sent_size = q_item->cache_req.total_size;
- bmi_send_callback_fn(q_item, sent_size, 0);
+ /* immediate completion; trigger callback ourselves */
+ sent_size = q_item->cache_req.total_size;
+ bmi_send_callback_fn(q_item, sent_size, 0);
#ifdef __CACHE_CALLBACK_SUPPORT__
- gen_mutex_lock(&flow_data->flow_mutex);
+ gen_mutex_lock(&flow_data->flow_mutex);
#endif
- }
+ }
#ifdef __CACHE_CALLBACK_SUPPORT__
- gen_mutex_unlock(&flow_data->flow_mutex);
+ gen_mutex_unlock(&flow_data->flow_mutex);
#endif
- return;
+ return;
};
/* bmi_send_callback_fn()
@@ -897,169 +908,179 @@ static void cache_read_callback_fn(void
* no return value
*/
static void bmi_send_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
+ PVFS_size actual_size,
+ PVFS_error error_code)
{
- struct fp_queue_item* q_item = user_ptr;
- struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
- struct flow_descriptor * flow_d = flow_data->parent;
- int ret;
+ struct fp_queue_item* q_item = user_ptr;
+ struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
+ struct flow_descriptor * flow_d = flow_data->parent;
+ int ret;
- /* TODO: error handling */
- if(error_code != 0)
- {
- PVFS_perror_gossip("bmi_send_callback_fn: error", error_code);
- assert(0);
- }
+ /* TODO: error handling */
+ if (error_code != 0)
+ {
+ PVFS_perror_gossip("bmi_send_callback_fn: error", error_code);
+ assert(0);
+ }
- gen_mutex_lock(&flow_data->flow_mutex);
+ gen_mutex_lock(&flow_data->flow_mutex);
- flow_d->total_transfered += actual_size;
+ flow_d->total_transfered += actual_size;
- /* release cache resource */
+ /* release cache resource */
- ret = bmi_cache_release_cache_src(q_item);
+ ret = bmi_cache_release_cache_src(q_item);
- /* TODO: error handling */
- if ( ret < 0 ) {
- PVFS_perror_gossip("bmi_send_callback_fn: "
- "from bmi_cache_release_cache_src:error_code", ret);
- gen_mutex_unlock(&flow_data->flow_mutex);
- return;
- }
+ /* TODO: error handling */
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("bmi_send_callback_fn: "
+ "from bmi_cache_release_cache_src:error_code", ret);
+ gen_mutex_unlock(&flow_data->flow_mutex);
+ return;
+ }
- /* remove from current queue. q_item must be in the
- * cache_req_done_list when coming here. */
+ /* remove from current queue. q_item must be in the
+ * cache_req_done_list when coming here. */
- qlist_del(&q_item->list_link);
+ qlist_del(&q_item->list_link);
- /* when no callback support from the cache, we take advantage of
- * the BMI callback to make progress. That is, in the BMI callback
- * function, we wait for the completion of another request from
- * the cache component, then initiate BMI send function. All these
- * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
- */
+ /* when no callback support from the cache, we take advantage of
+ * the BMI callback to make progress. That is, in the BMI callback
+ * function, we wait for the completion of another request from
+ * the cache component, then initiate BMI send function. All these
+ * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
+ */
- /* no callback support from the cache. */
- if ( !q_item->cache_callback.fn )
+ /* no callback support from the cache. */
+ if (!q_item->cache_callback.fn)
+ {
+ ret = bmi_cache_progress_check(flow_d,
+ BLOCKING_FLAG,
+ NO_MUTEX_FLAG,
+ CACHE_TO_BMI);
+ if (ret < 0)
{
- ret = bmi_cache_progress_check( flow_d,
- BLOCKING_FLAG,
- NO_MUTEX_FLAG,
- CACHE_TO_BMI );
- if ( ret < 0 )
- {
- PVFS_perror_gossip("bmi_send_callback_fn: "
- "from bmi_cache_progress_check:error_code", ret);
- gen_mutex_unlock(&flow_data->flow_mutex);
- return;
- }
+ PVFS_perror_gossip("bmi_send_callback_fn: "
+ "from bmi_cache_progress_check:error_code", ret);
+ gen_mutex_unlock(&flow_data->flow_mutex);
+ return;
}
+ }
- gen_mutex_unlock(&flow_data->flow_mutex);
+ gen_mutex_unlock(&flow_data->flow_mutex);
- free(q_item);
+ free(q_item);
- return;
+ return;
}
static int bmi_cache_check_cache_req(struct fp_queue_item *qitem)
{
- cache_reply_t reply;
- int flag = 0;
- int ret = -1;
+ cache_reply_t reply;
+ int flag = 0;
+ int ret = -1;
- ret = cache_req_test( &(qitem->cache_req.request), &flag, &reply, NULL);
- if ( ret < 0 )
- {
- PVFS_perror_gossip("bmi_cache_check_cache_req: "
- "error_code", ret);
- return ret;
- }
+ ret = cache_req_test( &(qitem->cache_req.request), &flag, &reply, NULL);
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("bmi_cache_check_cache_req: "
+ "error_code", ret);
+ return ret;
+ }
- /* a request is finished. */
- if ( flag )
- {
- qitem->cache_req.mem_cnt = reply.count;
- qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
- qitem->cache_req.msize_list = reply.cbuf_size_array;
- qitem->cache_req.errval = reply.errval;
+ /* a request is finished. */
+ if (flag)
+ {
+ qitem->cache_req.mem_cnt = reply.count;
+ qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
+ qitem->cache_req.msize_list = reply.cbuf_size_array;
+ qitem->cache_req.errval = reply.errval;
- return 1;
- }
- return 0;
+ return 1;
+ }
+ return 0;
}
static int bmi_cache_release_cache_src(struct fp_queue_item *qitem)
{
- int ret;
+ int ret;
- ret = cache_req_done( &(qitem->cache_req.request) );
- if ( ret < 0 )
- {
- PVFS_perror_gossip("bmi_cache_release_cache_src: "
- "error_code", ret);
- return ret;
- }
+ ret = cache_req_done( &(qitem->cache_req.request) );
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("bmi_cache_release_cache_src: "
+ "error_code", ret);
+ return ret;
+ }
- return 0;
+ return 0;
}
-static int bmi_cache_init_cache_req(struct fp_queue_item *qitem, int op )
+static int bmi_cache_init_cache_req(struct fp_queue_item *qitem, int op)
{
- int ret = 0;
- cache_read_desc_t desc1;
- cache_write_desc_t desc2;
- cache_reply_t reply;
+ int ret = 0;
+ cache_read_desc_t desc1;
+ cache_write_desc_t desc2;
+ cache_reply_t reply;
- if ( op == BMI_TO_CACHE ) /* write */
- {
- desc2.coll_id = qitem->parent->dest.u.trove.coll_id;
- desc2.handle = qitem->parent->dest.u.trove.handle;
- desc2.context_id = global_trove_context;
-
- /* TODO: if we use intemediate buffer, change here */
- desc2.buffer = NULL;
- desc2.len = 0;
+ if (op == BMI_TO_CACHE) /* write */
+ {
+ desc2.coll_id = qitem->parent->dest.u.trove.coll_id;
+ desc2.handle = qitem->parent->dest.u.trove.handle;
+ desc2.context_id = global_trove_context;
+
+ /* TODO: if we use intemediate buffer, change here */
+ desc2.buffer = NULL;
+ desc2.len = 0;
- desc2.stream_array_count = qitem->pint_req.result.segs;
- desc2.stream_offset_array = qitem->pint_req.result.offset_array;
- desc2.stream_size_array = qitem->pint_req.result.size_array;
-
- ret = cache_write_post( &desc2,
- &qitem->cache_req.request,
- &reply,
- NULL );
- if ( ret < 0 )
- {
- PVFS_perror_gossip("bmi_cache_init_cache_req: "
- "error_code", ret);
- }
- }
- else /* read */
+ desc2.stream_array_count = qitem->pint_req.result.segs;
+ desc2.stream_offset_array = qitem->pint_req.result.offset_array;
+ desc2.stream_size_array = qitem->pint_req.result.size_array;
+
+ ret = cache_write_post(&desc2,
+ &qitem->cache_req.request,
+ &reply,
+ NULL);
+ if (ret < 0)
{
- desc1.coll_id = qitem->parent->dest.u.trove.coll_id;
- desc1.handle = qitem->parent->dest.u.trove.handle;
- desc1.context_id = global_trove_context;
-
- /* TODO: if we use intemediate buffer, change here */
- desc1.buffer = NULL;
- desc1.len = 0;
+ PVFS_perror_gossip("bmi_cache_init_cache_req: "
+ "error_code", ret);
+ }
+ }
+ else /* read */
+ {
+ desc1.coll_id = qitem->parent->dest.u.trove.coll_id;
+ desc1.handle = qitem->parent->dest.u.trove.handle;
+ desc1.context_id = global_trove_context;
+
+ /* TODO: if we use intemediate buffer, change here */
+ desc1.buffer = NULL;
+ desc1.len = 0;
- desc1.stream_array_count = qitem->pint_req.result.segs;
- desc1.stream_offset_array = qitem->pint_req.result.offset_array;
- desc1.stream_size_array = qitem->pint_req.result.size_array;
-
- ret = cache_read_post( &desc1,
- &qitem->cache_req.request,
- &reply,
- NULL );
- if ( ret < 0 )
- {
- PVFS_perror_gossip("bmi_cache_init_cache_req: "
- "error_code", ret);
- }
+ desc1.stream_array_count = qitem->pint_req.result.segs;
+ desc1.stream_offset_array = qitem->pint_req.result.offset_array;
+ desc1.stream_size_array = qitem->pint_req.result.size_array;
+
+ ret = cache_read_post(&desc1,
+ &qitem->cache_req.request,
+ &reply,
+ NULL );
+ if (ret < 0)
+ {
+ PVFS_perror_gossip("bmi_cache_init_cache_req: "
+ "error_code", ret);
}
- return ret;
+ }
+ return ret;
}
+
+/*
+ * Local variables:
+ * c-indent-level: 4
+ * c-basic-offset: 4
+ * End:
+ *
+ * vim: ts=8 sts=4 sw=4 noexpandtab
+ */
More information about the PVFS2-CVS
mailing list