[PVFS2-CVS]
commit by pcarns in pvfs2/src/io/flow/flowproto-bmi-trove:
flowproto-multiqueue.c
CVS commit program
cvs at parl.clemson.edu
Fri Feb 13 14:06:42 EST 2004
Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb:/tmp/cvs-serv14811
Modified Files:
flowproto-multiqueue.c
Log Message:
restructured trove completion callbacks slightly to retrieve more
information
Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.58 -r1.59
--- flowproto-multiqueue.c 13 Feb 2004 15:50:00 -0000 1.58
+++ flowproto-multiqueue.c 13 Feb 2004 19:06:42 -0000 1.59
@@ -58,6 +58,8 @@ struct result_chain_entry
PVFS_size size_list[MAX_REGIONS];
PVFS_offset offset_list[MAX_REGIONS];
struct result_chain_entry* next;
+ struct fp_queue_item* q_item;
+ struct PINT_thread_mgr_trove_callback trove_callback;
};
/* fp_queue_item describes an individual buffer being used within the flow */
@@ -73,7 +75,6 @@ struct fp_queue_item
struct qlist_head list_link;
flow_descriptor* parent;
struct PINT_thread_mgr_bmi_callback bmi_callback;
- struct PINT_thread_mgr_trove_callback trove_callback;
};
/* fp_private_data is information specific to this flow protocol, stored
@@ -312,8 +313,6 @@ int fp_multiqueue_post(flow_descriptor *
flow_data->prealloc_array[i].parent = flow_d;
flow_data->prealloc_array[i].bmi_callback.data =
&(flow_data->prealloc_array[i]);
- flow_data->prealloc_array[i].trove_callback.data =
- &(flow_data->prealloc_array[i]);
}
/* remaining setup depends on the endpoints we intend to use */
@@ -354,7 +353,9 @@ int fp_multiqueue_post(flow_descriptor *
&flow_data->empty_list);
}
- trove_write_callback_fn(&(flow_data->prealloc_array[0]), 0);
+ flow_data->prealloc_array[0].result_chain.q_item =
+ &flow_data->prealloc_array[0];
+ trove_write_callback_fn(&(flow_data->prealloc_array[0].result_chain), 0);
}
#endif
else
@@ -384,6 +385,7 @@ static void bmi_recv_callback_fn(void *u
struct result_chain_entry* old_result_tmp;
PVFS_size bytes_processed = 0;
void* tmp_buffer;
+ void* tmp_user_ptr;
q_item->posted_id = 0;
@@ -402,6 +404,10 @@ static void bmi_recv_callback_fn(void *u
result_tmp = &q_item->result_chain;
do{
assert(result_tmp->result.bytes);
+ result_tmp->q_item = q_item;
+ result_tmp->trove_callback.data = result_tmp;
+ result_tmp->trove_callback.fn = trove_write_callback_fn;
+ tmp_user_ptr = &result_tmp->trove_callback;
ret = trove_bstream_write_list(q_item->parent->dest.u.trove.coll_id,
q_item->parent->dest.u.trove.handle,
(char**)&result_tmp->buffer_offset,
@@ -413,7 +419,7 @@ static void bmi_recv_callback_fn(void *u
&result_tmp->result.bytes,
0,
NULL,
- &q_item->trove_callback,
+ &result_tmp->trove_callback,
global_trove_context,
&result_tmp->posted_id);
result_tmp = result_tmp->next;
@@ -428,7 +434,7 @@ static void bmi_recv_callback_fn(void *u
{
gen_mutex_unlock(&flow_data->flow_mutex);
/* immediate completion; trigger callback ourselves */
- trove_write_callback_fn(q_item, 0);
+ trove_write_callback_fn(tmp_user_ptr, 0);
gen_mutex_lock(&flow_data->flow_mutex);
}
}while(result_tmp);
@@ -452,7 +458,6 @@ static void bmi_recv_callback_fn(void *u
/* TODO: error handling */
assert(q_item->buffer);
q_item->bmi_callback.fn = bmi_recv_callback_fn;
- q_item->trove_callback.fn = trove_write_callback_fn;
}
result_tmp = &q_item->result_chain;
@@ -543,14 +548,16 @@ static void bmi_recv_callback_fn(void *u
static void trove_read_callback_fn(void *user_ptr,
PVFS_error error_code)
{
- struct fp_queue_item* q_item = user_ptr;
int ret;
+ struct result_chain_entry* result_tmp = user_ptr;
+ struct fp_queue_item* q_item = result_tmp->q_item;
struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
- struct result_chain_entry* result_tmp;
struct result_chain_entry* old_result_tmp;
int done = 0;
struct qlist_head* tmp_link;
+ q_item = result_tmp->q_item;
+
gen_mutex_lock(&flow_data->flow_mutex);
if(error_code != 0 || flow_data->parent->error_code != 0)
@@ -654,6 +661,7 @@ static int bmi_send_callback_fn(void *us
struct result_chain_entry* old_result_tmp;
void* tmp_buffer;
PVFS_size bytes_processed = 0;
+ void* tmp_user_ptr = NULL;
q_item->posted_id = 0;
@@ -714,7 +722,6 @@ static int bmi_send_callback_fn(void *us
/* TODO: error handling */
assert(q_item->buffer);
q_item->bmi_callback.fn = bmi_send_callback_wrapper;
- q_item->trove_callback.fn = trove_read_callback_fn;
}
/* add to src queue */
@@ -785,6 +792,10 @@ static int bmi_send_callback_fn(void *us
do{
assert(q_item->buffer_used);
assert(result_tmp->result.bytes);
+ result_tmp->q_item = q_item;
+ result_tmp->trove_callback.data = result_tmp;
+ result_tmp->trove_callback.fn = trove_read_callback_fn;
+ tmp_user_ptr = &result_tmp->trove_callback;
ret = trove_bstream_read_list(q_item->parent->src.u.trove.coll_id,
q_item->parent->src.u.trove.handle,
(char**)&result_tmp->buffer_offset,
@@ -796,7 +807,7 @@ static int bmi_send_callback_fn(void *us
&result_tmp->result.bytes,
0,
NULL,
- &q_item->trove_callback,
+ &result_tmp->trove_callback,
global_trove_context,
&result_tmp->posted_id);
result_tmp = result_tmp->next;
@@ -822,7 +833,7 @@ static int bmi_send_callback_fn(void *us
{
/* immediate completion; trigger callback ourselves */
gen_mutex_unlock(&flow_data->flow_mutex);
- trove_read_callback_fn(q_item, 0);
+ trove_read_callback_fn(tmp_user_ptr, 0);
gen_mutex_lock(&flow_data->flow_mutex);
}
}while(result_tmp);
@@ -841,14 +852,15 @@ static void trove_write_callback_fn(void
PVFS_error error_code)
{
PVFS_size tmp_actual_size;
- struct fp_queue_item* q_item = user_ptr;
int ret;
+ struct result_chain_entry* result_tmp = user_ptr;
+ struct fp_queue_item* q_item = result_tmp->q_item;
struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
- struct result_chain_entry* result_tmp;
struct result_chain_entry* old_result_tmp;
void* tmp_buffer;
PVFS_size bytes_processed = 0;
+
gen_mutex_lock(&flow_data->flow_mutex);
if(error_code != 0 || flow_data->parent->error_code != 0)
@@ -910,7 +922,6 @@ static void trove_write_callback_fn(void
/* TODO: error handling */
assert(q_item->buffer);
q_item->bmi_callback.fn = bmi_recv_callback_fn;
- q_item->trove_callback.fn = trove_write_callback_fn;
}
/* if src list is empty, then post new recv; otherwise just queue
More information about the PVFS2-CVS
mailing list