[PVFS2-CVS] commit by pcarns in pvfs2/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c

CVS commit program cvs at parl.clemson.edu
Fri Feb 13 14:06:42 EST 2004


Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb:/tmp/cvs-serv14811

Modified Files:
	flowproto-multiqueue.c 
Log Message:
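Restructured the trove completion callbacks slightly so that they can
retrieve more information: each callback now receives its
result_chain_entry, which in turn points back to the owning fp_queue_item.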


Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.58 -r1.59
--- flowproto-multiqueue.c	13 Feb 2004 15:50:00 -0000	1.58
+++ flowproto-multiqueue.c	13 Feb 2004 19:06:42 -0000	1.59
@@ -58,6 +58,8 @@ struct result_chain_entry
     PVFS_size size_list[MAX_REGIONS];
     PVFS_offset offset_list[MAX_REGIONS];
     struct result_chain_entry* next;
+    struct fp_queue_item* q_item;
+    struct PINT_thread_mgr_trove_callback trove_callback;
 };
 
 /* fp_queue_item describes an individual buffer being used within the flow */
@@ -73,7 +75,6 @@ struct fp_queue_item
     struct qlist_head list_link;
     flow_descriptor* parent;
     struct PINT_thread_mgr_bmi_callback bmi_callback;
-    struct PINT_thread_mgr_trove_callback trove_callback;
 };
 
 /* fp_private_data is information specific to this flow protocol, stored
@@ -312,8 +313,6 @@ int fp_multiqueue_post(flow_descriptor *
 	flow_data->prealloc_array[i].parent = flow_d;
 	flow_data->prealloc_array[i].bmi_callback.data = 
 	    &(flow_data->prealloc_array[i]);
-	flow_data->prealloc_array[i].trove_callback.data = 
-	    &(flow_data->prealloc_array[i]);
     }
 
     /* remaining setup depends on the endpoints we intend to use */
@@ -354,7 +353,9 @@ int fp_multiqueue_post(flow_descriptor *
 		&flow_data->empty_list);
 	}
 
-	trove_write_callback_fn(&(flow_data->prealloc_array[0]), 0);
+	flow_data->prealloc_array[0].result_chain.q_item = 
+	    &flow_data->prealloc_array[0];
+	trove_write_callback_fn(&(flow_data->prealloc_array[0].result_chain), 0);
     }
 #endif
     else
@@ -384,6 +385,7 @@ static void bmi_recv_callback_fn(void *u
     struct result_chain_entry* old_result_tmp;
     PVFS_size bytes_processed = 0;
     void* tmp_buffer;
+    void* tmp_user_ptr;
 
     q_item->posted_id = 0;
 
@@ -402,6 +404,10 @@ static void bmi_recv_callback_fn(void *u
     result_tmp = &q_item->result_chain;
     do{
 	assert(result_tmp->result.bytes);
+	result_tmp->q_item = q_item;
+	result_tmp->trove_callback.data = result_tmp;
+	result_tmp->trove_callback.fn = trove_write_callback_fn;
+	tmp_user_ptr = &result_tmp->trove_callback;
 	ret = trove_bstream_write_list(q_item->parent->dest.u.trove.coll_id,
 	    q_item->parent->dest.u.trove.handle,
 	    (char**)&result_tmp->buffer_offset,
@@ -413,7 +419,7 @@ static void bmi_recv_callback_fn(void *u
 	    &result_tmp->result.bytes,
 	    0,
 	    NULL,
-	    &q_item->trove_callback,
+	    &result_tmp->trove_callback,
 	    global_trove_context,
 	    &result_tmp->posted_id);
 	result_tmp = result_tmp->next;
@@ -428,7 +434,7 @@ static void bmi_recv_callback_fn(void *u
 	{
 	    gen_mutex_unlock(&flow_data->flow_mutex);
 	    /* immediate completion; trigger callback ourselves */
-	    trove_write_callback_fn(q_item, 0);
+	    trove_write_callback_fn(tmp_user_ptr, 0);
 	    gen_mutex_lock(&flow_data->flow_mutex);
 	}
     }while(result_tmp);
@@ -452,7 +458,6 @@ static void bmi_recv_callback_fn(void *u
 	    /* TODO: error handling */
 	    assert(q_item->buffer);
 	    q_item->bmi_callback.fn = bmi_recv_callback_fn;
-	    q_item->trove_callback.fn = trove_write_callback_fn;
 	}
 	
 	result_tmp = &q_item->result_chain;
@@ -543,14 +548,16 @@ static void bmi_recv_callback_fn(void *u
 static void trove_read_callback_fn(void *user_ptr,
 		           PVFS_error error_code)
 {
-    struct fp_queue_item* q_item = user_ptr;
     int ret;
+    struct result_chain_entry* result_tmp = user_ptr;
+    struct fp_queue_item* q_item = result_tmp->q_item;
     struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
-    struct result_chain_entry* result_tmp;
     struct result_chain_entry* old_result_tmp;
     int done = 0;
     struct qlist_head* tmp_link;
 
+    q_item = result_tmp->q_item;
+
     gen_mutex_lock(&flow_data->flow_mutex);
 
     if(error_code != 0 || flow_data->parent->error_code != 0)
@@ -654,6 +661,7 @@ static int bmi_send_callback_fn(void *us
     struct result_chain_entry* old_result_tmp;
     void* tmp_buffer;
     PVFS_size bytes_processed = 0;
+    void* tmp_user_ptr = NULL;
 
     q_item->posted_id = 0;
 
@@ -714,7 +722,6 @@ static int bmi_send_callback_fn(void *us
 	/* TODO: error handling */
 	assert(q_item->buffer);
 	q_item->bmi_callback.fn = bmi_send_callback_wrapper;
-	q_item->trove_callback.fn = trove_read_callback_fn;
     }
     
     /* add to src queue */
@@ -785,6 +792,10 @@ static int bmi_send_callback_fn(void *us
     do{
 	assert(q_item->buffer_used);
 	assert(result_tmp->result.bytes);
+	result_tmp->q_item = q_item;
+	result_tmp->trove_callback.data = result_tmp;
+	result_tmp->trove_callback.fn = trove_read_callback_fn;
+	tmp_user_ptr = &result_tmp->trove_callback;
 	ret = trove_bstream_read_list(q_item->parent->src.u.trove.coll_id,
 	    q_item->parent->src.u.trove.handle,
 	    (char**)&result_tmp->buffer_offset,
@@ -796,7 +807,7 @@ static int bmi_send_callback_fn(void *us
 	    &result_tmp->result.bytes,
 	    0,
 	    NULL,
-	    &q_item->trove_callback,
+	    &result_tmp->trove_callback,
 	    global_trove_context,
 	    &result_tmp->posted_id);
 	result_tmp = result_tmp->next;
@@ -822,7 +833,7 @@ static int bmi_send_callback_fn(void *us
 	{
 	    /* immediate completion; trigger callback ourselves */
 	    gen_mutex_unlock(&flow_data->flow_mutex);
-	    trove_read_callback_fn(q_item, 0);
+	    trove_read_callback_fn(tmp_user_ptr, 0);
 	    gen_mutex_lock(&flow_data->flow_mutex);
 	}
     }while(result_tmp);
@@ -841,14 +852,15 @@ static void trove_write_callback_fn(void
 		           PVFS_error error_code)
 {
     PVFS_size tmp_actual_size;
-    struct fp_queue_item* q_item = user_ptr;
     int ret;
+    struct result_chain_entry* result_tmp = user_ptr;
+    struct fp_queue_item* q_item = result_tmp->q_item;
     struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
-    struct result_chain_entry* result_tmp;
     struct result_chain_entry* old_result_tmp;
     void* tmp_buffer;
     PVFS_size bytes_processed = 0;
 
+
     gen_mutex_lock(&flow_data->flow_mutex);
 
     if(error_code != 0 || flow_data->parent->error_code != 0)
@@ -910,7 +922,6 @@ static void trove_write_callback_fn(void
 	/* TODO: error handling */
 	assert(q_item->buffer);
 	q_item->bmi_callback.fn = bmi_recv_callback_fn;
-	q_item->trove_callback.fn = trove_write_callback_fn;
     }
 
     /* if src list is empty, then post new recv; otherwise just queue



More information about the PVFS2-CVS mailing list