[Pvfs2-cvs] commit by slang in pvfs2/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c
CVS commit program
cvs at parl.clemson.edu
Fri Aug 18 01:12:08 EDT 2006
Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb1:/tmp/cvs-serv28668/src/io/flow/flowproto-bmi-trove
Modified Files:
Tag: kunkel-branch
flowproto-multiqueue.c
Log Message:
Reverse merge from trunk. Working for now.
Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.110.6.3 -r1.110.6.4
--- flowproto-multiqueue.c 25 Jul 2006 19:20:42 -0000 1.110.6.3
+++ flowproto-multiqueue.c 18 Aug 2006 05:12:08 -0000 1.110.6.4
@@ -22,7 +22,13 @@
#include "pint-perf-counter.h"
#include "pvfs2-internal.h"
+/* the following buffer settings are used by default if none are specified in
+ * the flow descriptor
+ */
#define BUFFERS_PER_FLOW 8
+#define BUFFER_SIZE (256*1024)
+
+#define MAX_REGIONS 64
#define FLOW_CLEANUP(__flow_data) \
do { \
@@ -39,7 +45,7 @@ do {
struct result_chain_entry
{
PVFS_id_gen_t posted_id;
- void *buffer_offset;
+ char *buffer_offset;
PINT_Request_result result;
PVFS_size size_list[IO_MAX_REGIONS];
PVFS_offset offset_list[IO_MAX_REGIONS];
@@ -70,7 +76,7 @@ struct fp_queue_item
struct fp_private_data
{
flow_descriptor *parent;
- struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
+ struct fp_queue_item* prealloc_array;
struct qlist_head list_link;
PVFS_size total_bytes_processed;
int next_seq;
@@ -496,6 +502,9 @@ int fp_multiqueue_post(flow_descriptor
struct fp_private_data *flow_data = NULL;
int i;
+ gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "flowproto posting %p\n",
+ flow_d);
+
assert((flow_d->src.endpoint_id == BMI_ENDPOINT &&
flow_d->dest.endpoint_id == TROVE_ENDPOINT) ||
(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
@@ -538,7 +547,21 @@ int fp_multiqueue_post(flow_descriptor
PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
}
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ if(flow_d->buffer_size < 1)
+ flow_d->buffer_size = BUFFER_SIZE;
+ if(flow_d->buffers_per_flow < 1)
+ flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
+
+ flow_data->prealloc_array = (struct fp_queue_item*)
+ malloc(flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+ if(!flow_data->prealloc_array)
+ {
+ free(flow_data);
+ return(-PVFS_ENOMEM);
+ }
+ memset(flow_data->prealloc_array, 0,
+ flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+ for(i=0; i<flow_d->buffers_per_flow; i++)
{
flow_data->prealloc_array[i].parent = flow_d;
flow_data->prealloc_array[i].bmi_callback.data =
@@ -555,7 +578,7 @@ int fp_multiqueue_post(flow_descriptor
/* put all of the buffers on empty list, we don't really do any
* queueing for this type of flow
*/
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for(i=0; i<flow_d->buffers_per_flow; i++)
{
qlist_add_tail(&flow_data->prealloc_array[i].list_link,
&flow_data->empty_list);
@@ -581,7 +604,7 @@ int fp_multiqueue_post(flow_descriptor
/* put all of the buffers on empty list, we don't really do any
* queueing for this type of flow
*/
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for(i=0; i<flow_d->buffers_per_flow; i++)
{
qlist_add_tail(&flow_data->prealloc_array[i].list_link,
&flow_data->empty_list);
@@ -602,9 +625,9 @@ int fp_multiqueue_post(flow_descriptor
else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
flow_d->dest.endpoint_id == BMI_ENDPOINT)
{
- flow_data->initial_posts = BUFFERS_PER_FLOW;
+ flow_data->initial_posts = flow_d->buffers_per_flow;
gen_mutex_lock(flow_data->parent->flow_mutex);
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for(i=0; i<flow_d->buffers_per_flow; i++)
{
gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
"flowproto-multiqueue forcing bmi_send_callback_fn.\n");
@@ -633,7 +656,7 @@ int fp_multiqueue_post(flow_descriptor
flow_data->initial_posts = 1;
/* place remaining buffers on "empty" queue */
- for(i=1; i<BUFFERS_PER_FLOW; i++)
+ for(i=1; i<flow_d->buffers_per_flow; i++)
{
qlist_add_tail(&flow_data->prealloc_array[i].list_link,
&flow_data->empty_list);
@@ -661,6 +684,8 @@ int fp_multiqueue_post(flow_descriptor
return(-ENOSYS);
}
+ gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "flowproto posted %p\n",
+ flow_d);
return (0);
}
@@ -758,7 +783,7 @@ static void bmi_recv_callback_fn(void *u
{
/* if the q_item has not been used, allocate a buffer */
q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
- IO_BUFFER_SIZE, BMI_RECV);
+ q_item->parent->buffer_size, BMI_RECV);
/* TODO: error handling */
assert(q_item->buffer);
q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -782,7 +807,8 @@ static void bmi_recv_callback_fn(void *u
result_tmp->offset_list;
result_tmp->result.size_array =
result_tmp->size_list;
- result_tmp->result.bytemax = IO_BUFFER_SIZE - bytes_processed;
+ result_tmp->result.bytemax = flow_data->parent->buffer_size -
+ bytes_processed;
result_tmp->result.bytes = 0;
result_tmp->result.segmax = IO_MAX_REGIONS;
result_tmp->result.segs = 0;
@@ -811,10 +837,10 @@ static void bmi_recv_callback_fn(void *u
tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
bytes_processed += old_result_tmp->result.bytes;
}
- }while(bytes_processed < IO_BUFFER_SIZE &&
+ }while(bytes_processed < flow_data->parent->buffer_size &&
!PINT_REQUEST_DONE(q_item->parent->file_req_state));
- assert(bytes_processed <= IO_BUFFER_SIZE);
+ assert(bytes_processed <= flow_data->parent->buffer_size);
if(bytes_processed == 0)
{
qlist_del(&q_item->list_link);
@@ -828,7 +854,7 @@ static void bmi_recv_callback_fn(void *u
ret = BMI_post_recv(&q_item->posted_id,
q_item->parent->src.u.bmi.address,
q_item->buffer,
- IO_BUFFER_SIZE,
+ flow_data->parent->buffer_size,
&tmp_actual_size,
BMI_PRE_ALLOC,
q_item->parent->tag,
@@ -1063,7 +1089,7 @@ static int bmi_send_callback_fn(void *us
{
/* if the q_item has not been used, allocate a buffer */
q_item->buffer = BMI_memalloc(q_item->parent->dest.u.bmi.address,
- IO_BUFFER_SIZE, BMI_SEND);
+ q_item->parent->buffer_size, BMI_SEND);
/* TODO: error handling */
assert(q_item->buffer);
q_item->bmi_callback.fn = bmi_send_callback_wrapper;
@@ -1091,7 +1117,8 @@ static int bmi_send_callback_fn(void *us
result_tmp->offset_list;
result_tmp->result.size_array =
result_tmp->size_list;
- result_tmp->result.bytemax = IO_BUFFER_SIZE - bytes_processed;
+ result_tmp->result.bytemax = q_item->parent->buffer_size
+ - bytes_processed;
result_tmp->result.bytes = 0;
result_tmp->result.segmax = IO_MAX_REGIONS;
result_tmp->result.segs = 0;
@@ -1123,10 +1150,10 @@ static int bmi_send_callback_fn(void *us
q_item->buffer_used += old_result_tmp->result.bytes;
}
- }while(bytes_processed < IO_BUFFER_SIZE &&
+ }while(bytes_processed < flow_data->parent->buffer_size &&
!PINT_REQUEST_DONE(q_item->parent->file_req_state));
- assert(bytes_processed <= IO_BUFFER_SIZE);
+ assert(bytes_processed <= flow_data->parent->buffer_size);
/* important to update the sequence /after/ request processed */
q_item->seq = flow_data->next_seq;
@@ -1326,7 +1353,7 @@ static void trove_write_callback_fn(void
{
/* if the q_item has not been used, allocate a buffer */
q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
- IO_BUFFER_SIZE, BMI_RECV);
+ q_item->parent->buffer_size, BMI_RECV);
/* TODO: error handling */
assert(q_item->buffer);
q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -1358,7 +1385,8 @@ static void trove_write_callback_fn(void
result_tmp->offset_list;
result_tmp->result.size_array =
result_tmp->size_list;
- result_tmp->result.bytemax = IO_BUFFER_SIZE - bytes_processed;
+ result_tmp->result.bytemax = flow_data->parent->buffer_size
+ - bytes_processed;
result_tmp->result.bytes = 0;
result_tmp->result.segmax = IO_MAX_REGIONS;
result_tmp->result.segs = 0;
@@ -1389,10 +1417,10 @@ static void trove_write_callback_fn(void
((char*)tmp_buffer + old_result_tmp->result.bytes);
bytes_processed += old_result_tmp->result.bytes;
}
- }while(bytes_processed < IO_BUFFER_SIZE &&
+ }while(bytes_processed < flow_data->parent->buffer_size &&
!PINT_REQUEST_DONE(q_item->parent->file_req_state));
- assert(bytes_processed <= IO_BUFFER_SIZE);
+ assert(bytes_processed <= flow_data->parent->buffer_size);
flow_data->total_bytes_processed += bytes_processed;
@@ -1412,7 +1440,7 @@ static void trove_write_callback_fn(void
ret = BMI_post_recv(&q_item->posted_id,
q_item->parent->src.u.bmi.address,
q_item->buffer,
- IO_BUFFER_SIZE,
+ flow_data->parent->buffer_size,
&tmp_actual_size,
BMI_PRE_ALLOC,
q_item->parent->tag,
@@ -1456,13 +1484,13 @@ static void cleanup_buffers(struct fp_pr
if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
{
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for(i=0; i<flow_data->parent->buffers_per_flow; i++)
{
if(flow_data->prealloc_array[i].buffer)
{
BMI_memfree(flow_data->parent->src.u.bmi.address,
flow_data->prealloc_array[i].buffer,
- IO_BUFFER_SIZE,
+ flow_data->parent->buffer_size,
BMI_RECV);
}
result_tmp = &(flow_data->prealloc_array[i].result_chain);
@@ -1479,13 +1507,13 @@ static void cleanup_buffers(struct fp_pr
else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
{
- for(i=0; i<BUFFERS_PER_FLOW; i++)
+ for(i=0; i<flow_data->parent->buffers_per_flow; i++)
{
if(flow_data->prealloc_array[i].buffer)
{
BMI_memfree(flow_data->parent->dest.u.bmi.address,
flow_data->prealloc_array[i].buffer,
- IO_BUFFER_SIZE,
+ flow_data->parent->buffer_size,
BMI_SEND);
}
result_tmp = &(flow_data->prealloc_array[i].result_chain);
@@ -1505,7 +1533,7 @@ static void cleanup_buffers(struct fp_pr
if(flow_data->intermediate)
{
BMI_memfree(flow_data->parent->dest.u.bmi.address,
- flow_data->intermediate, IO_BUFFER_SIZE, BMI_SEND);
+ flow_data->intermediate, flow_data->parent->buffer_size, BMI_SEND);
}
}
else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
@@ -1514,9 +1542,11 @@ static void cleanup_buffers(struct fp_pr
if(flow_data->intermediate)
{
BMI_memfree(flow_data->parent->src.u.bmi.address,
- flow_data->intermediate, IO_BUFFER_SIZE, BMI_RECV);
+ flow_data->intermediate, flow_data->parent->buffer_size, BMI_RECV);
}
}
+
+ free(flow_data->prealloc_array);
}
/* mem_to_bmi_callback()
@@ -1572,7 +1602,7 @@ static void mem_to_bmi_callback_fn(void
q_item->result_chain.offset_list;
q_item->result_chain.result.size_array =
q_item->result_chain.size_list;
- q_item->result_chain.result.bytemax = IO_BUFFER_SIZE;
+ q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
q_item->result_chain.result.bytes = 0;
q_item->result_chain.result.segmax = IO_MAX_REGIONS;
q_item->result_chain.result.segs = 0;
@@ -1588,14 +1618,14 @@ static void mem_to_bmi_callback_fn(void
/* was IO_MAX_REGIONS enough to satisfy this step? */
if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
- q_item->result_chain.result.bytes < IO_BUFFER_SIZE)
+ q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
{
/* create an intermediate buffer */
if(!flow_data->intermediate)
{
flow_data->intermediate = BMI_memalloc(
flow_data->parent->dest.u.bmi.address,
- IO_BUFFER_SIZE, BMI_SEND);
+ flow_data->parent->buffer_size, BMI_SEND);
/* TODO: error handling */
assert(flow_data->intermediate);
}
@@ -1613,7 +1643,7 @@ static void mem_to_bmi_callback_fn(void
do
{
q_item->result_chain.result.bytemax =
- (IO_BUFFER_SIZE - bytes_processed);
+ (flow_data->parent->buffer_size - bytes_processed);
q_item->result_chain.result.bytes = 0;
q_item->result_chain.result.segmax = IO_MAX_REGIONS;
q_item->result_chain.result.segs = 0;
@@ -1636,10 +1666,10 @@ static void mem_to_bmi_callback_fn(void
memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
bytes_processed += q_item->result_chain.size_list[i];
}
- }while(bytes_processed < IO_BUFFER_SIZE &&
+ }while(bytes_processed < flow_data->parent->buffer_size &&
!PINT_REQUEST_DONE(q_item->parent->file_req_state));
- assert (bytes_processed <= IO_BUFFER_SIZE);
+ assert (bytes_processed <= flow_data->parent->buffer_size);
/* setup for BMI operation */
flow_data->tmp_buffer_list[0] = flow_data->intermediate;
@@ -1759,7 +1789,7 @@ static void bmi_to_mem_callback_fn(void
do
{
q_item->result_chain.result.bytemax =
- (IO_BUFFER_SIZE - bytes_processed);
+ (q_item->parent->buffer_size - bytes_processed);
q_item->result_chain.result.bytes = 0;
q_item->result_chain.result.segmax = IO_MAX_REGIONS;
q_item->result_chain.result.segs = 0;
@@ -1783,10 +1813,10 @@ static void bmi_to_mem_callback_fn(void
memcpy(dest_ptr, src_ptr, region_size);
bytes_processed += region_size;
}
- }while(bytes_processed < IO_BUFFER_SIZE &&
+ }while(bytes_processed < flow_data->parent->buffer_size &&
!PINT_REQUEST_DONE(q_item->parent->file_req_state));
- assert(bytes_processed <= IO_BUFFER_SIZE);
+ assert(bytes_processed <= flow_data->parent->buffer_size);
}
/* are we done? */
@@ -1805,7 +1835,7 @@ static void bmi_to_mem_callback_fn(void
q_item->result_chain.offset_list;
q_item->result_chain.result.size_array =
q_item->result_chain.size_list;
- q_item->result_chain.result.bytemax = IO_BUFFER_SIZE;
+ q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
q_item->result_chain.result.bytes = 0;
q_item->result_chain.result.segmax = IO_MAX_REGIONS;
q_item->result_chain.result.segs = 0;
@@ -1820,22 +1850,22 @@ static void bmi_to_mem_callback_fn(void
/* was IO_MAX_REGIONS enough to satisfy this step? */
if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
- q_item->result_chain.result.bytes < IO_BUFFER_SIZE)
+ q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
{
/* create an intermediate buffer */
if(!flow_data->intermediate)
{
flow_data->intermediate = BMI_memalloc(
flow_data->parent->src.u.bmi.address,
- IO_BUFFER_SIZE, BMI_RECV);
+ flow_data->parent->buffer_size, BMI_RECV);
/* TODO: error handling */
assert(flow_data->intermediate);
}
/* setup for BMI operation */
flow_data->tmp_buffer_list[0] = flow_data->intermediate;
buffer_type = BMI_PRE_ALLOC;
- q_item->buffer_used = IO_BUFFER_SIZE;
- total_size = IO_BUFFER_SIZE;
+ q_item->buffer_used = flow_data->parent->buffer_size;
+ total_size = flow_data->parent->buffer_size;
size_array = &q_item->buffer_used;
segs = 1;
/* we will copy data out on next iteration */
More information about the Pvfs2-cvs mailing list