[Pvfs2-cvs] commit by slang in pvfs2/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c

CVS commit program cvs at parl.clemson.edu
Fri Aug 18 01:12:08 EDT 2006


Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb1:/tmp/cvs-serv28668/src/io/flow/flowproto-bmi-trove

Modified Files:
      Tag: kunkel-branch
	flowproto-multiqueue.c 
Log Message:
Reverse merge from trunk. Working for now.


Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.110.6.3 -r1.110.6.4
--- flowproto-multiqueue.c	25 Jul 2006 19:20:42 -0000	1.110.6.3
+++ flowproto-multiqueue.c	18 Aug 2006 05:12:08 -0000	1.110.6.4
@@ -22,7 +22,13 @@
 #include "pint-perf-counter.h"
 #include "pvfs2-internal.h"
 
+/* Default buffer settings: used when the flow descriptor leaves buffer_size
+ * or buffers_per_flow unset (i.e. less than 1).
+ */
 #define BUFFERS_PER_FLOW 8
+#define BUFFER_SIZE (256*1024)
+
+#define MAX_REGIONS 64
 
 #define FLOW_CLEANUP(__flow_data)                                     \
 do {                                                                  \
@@ -39,7 +45,7 @@ do {                                    
 struct result_chain_entry
 {
     PVFS_id_gen_t posted_id;
-    void *buffer_offset;
+    char *buffer_offset;
     PINT_Request_result result;
     PVFS_size size_list[IO_MAX_REGIONS];
     PVFS_offset offset_list[IO_MAX_REGIONS];
@@ -70,7 +76,7 @@ struct fp_queue_item
 struct fp_private_data
 {
     flow_descriptor *parent;
-    struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
+    struct fp_queue_item* prealloc_array;
     struct qlist_head list_link;
     PVFS_size total_bytes_processed;
     int next_seq;
@@ -496,6 +502,9 @@ int fp_multiqueue_post(flow_descriptor  
     struct fp_private_data *flow_data = NULL;
     int i;
 
+    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "flowproto posting %p\n",
+                 flow_d);
+
     assert((flow_d->src.endpoint_id == BMI_ENDPOINT && 
             flow_d->dest.endpoint_id == TROVE_ENDPOINT) ||
            (flow_d->src.endpoint_id == TROVE_ENDPOINT &&
@@ -538,7 +547,21 @@ int fp_multiqueue_post(flow_descriptor  
             PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
     }
 
-    for(i=0; i<BUFFERS_PER_FLOW; i++)
+    if(flow_d->buffer_size < 1)
+        flow_d->buffer_size = BUFFER_SIZE;
+    if(flow_d->buffers_per_flow < 1)
+        flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
+        
+    flow_data->prealloc_array = (struct fp_queue_item*)
+        malloc(flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    if(!flow_data->prealloc_array)
+    {
+        free(flow_data);
+        return(-PVFS_ENOMEM);
+    }
+    memset(flow_data->prealloc_array, 0,
+        flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    for(i=0; i<flow_d->buffers_per_flow; i++)
     {
         flow_data->prealloc_array[i].parent = flow_d;
         flow_data->prealloc_array[i].bmi_callback.data = 
@@ -555,7 +578,7 @@ int fp_multiqueue_post(flow_descriptor  
         /* put all of the buffers on empty list, we don't really do any
          * queueing for this type of flow
          */
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -581,7 +604,7 @@ int fp_multiqueue_post(flow_descriptor  
         /* put all of the buffers on empty list, we don't really do any
          * queueing for this type of flow
          */
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -602,9 +625,9 @@ int fp_multiqueue_post(flow_descriptor  
     else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
         flow_d->dest.endpoint_id == BMI_ENDPOINT)
     {
-        flow_data->initial_posts = BUFFERS_PER_FLOW;
+        flow_data->initial_posts = flow_d->buffers_per_flow;
         gen_mutex_lock(flow_data->parent->flow_mutex);
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
                 "flowproto-multiqueue forcing bmi_send_callback_fn.\n");
@@ -633,7 +656,7 @@ int fp_multiqueue_post(flow_descriptor  
         flow_data->initial_posts = 1;
 
         /* place remaining buffers on "empty" queue */
-        for(i=1; i<BUFFERS_PER_FLOW; i++)
+        for(i=1; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -661,6 +684,8 @@ int fp_multiqueue_post(flow_descriptor  
         return(-ENOSYS);
     }
 
+    gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "flowproto posted %p\n",
+                 flow_d);
     return (0);
 }
 
@@ -758,7 +783,7 @@ static void bmi_recv_callback_fn(void *u
         {
             /* if the q_item has not been used, allocate a buffer */
             q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
-                IO_BUFFER_SIZE, BMI_RECV);
+                q_item->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(q_item->buffer);
             q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -782,7 +807,8 @@ static void bmi_recv_callback_fn(void *u
                 result_tmp->offset_list;
             result_tmp->result.size_array = 
                 result_tmp->size_list;
-            result_tmp->result.bytemax = IO_BUFFER_SIZE - bytes_processed;
+            result_tmp->result.bytemax = flow_data->parent->buffer_size - 
+                bytes_processed;
             result_tmp->result.bytes = 0;
             result_tmp->result.segmax = IO_MAX_REGIONS;
             result_tmp->result.segs = 0;
@@ -811,10 +837,10 @@ static void bmi_recv_callback_fn(void *u
                 tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
                 bytes_processed += old_result_tmp->result.bytes;
             }
-        }while(bytes_processed < IO_BUFFER_SIZE && 
+        }while(bytes_processed < flow_data->parent->buffer_size && 
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= IO_BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
         if(bytes_processed == 0)
         {        
             qlist_del(&q_item->list_link);
@@ -828,7 +854,7 @@ static void bmi_recv_callback_fn(void *u
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
             q_item->buffer,
-            IO_BUFFER_SIZE,
+            flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
@@ -1063,7 +1089,7 @@ static int bmi_send_callback_fn(void *us
     {
         /* if the q_item has not been used, allocate a buffer */
         q_item->buffer = BMI_memalloc(q_item->parent->dest.u.bmi.address,
-            IO_BUFFER_SIZE, BMI_SEND);
+            q_item->parent->buffer_size, BMI_SEND);
         /* TODO: error handling */
         assert(q_item->buffer);
         q_item->bmi_callback.fn = bmi_send_callback_wrapper;
@@ -1091,7 +1117,8 @@ static int bmi_send_callback_fn(void *us
             result_tmp->offset_list;
         result_tmp->result.size_array = 
             result_tmp->size_list;
-        result_tmp->result.bytemax = IO_BUFFER_SIZE - bytes_processed;
+        result_tmp->result.bytemax = q_item->parent->buffer_size 
+            - bytes_processed;
         result_tmp->result.bytes = 0;
         result_tmp->result.segmax = IO_MAX_REGIONS;
         result_tmp->result.segs = 0;
@@ -1123,10 +1150,10 @@ static int bmi_send_callback_fn(void *us
             q_item->buffer_used += old_result_tmp->result.bytes;
         }
 
-    }while(bytes_processed < IO_BUFFER_SIZE && 
+    }while(bytes_processed < flow_data->parent->buffer_size && 
         !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-    assert(bytes_processed <= IO_BUFFER_SIZE);
+    assert(bytes_processed <= flow_data->parent->buffer_size);
 
     /* important to update the sequence /after/ request processed */
     q_item->seq = flow_data->next_seq;
@@ -1326,7 +1353,7 @@ static void trove_write_callback_fn(void
     {
         /* if the q_item has not been used, allocate a buffer */
         q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
-            IO_BUFFER_SIZE, BMI_RECV);
+            q_item->parent->buffer_size, BMI_RECV);
         /* TODO: error handling */
         assert(q_item->buffer);
         q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -1358,7 +1385,8 @@ static void trove_write_callback_fn(void
                 result_tmp->offset_list;
             result_tmp->result.size_array = 
                 result_tmp->size_list;
-            result_tmp->result.bytemax = IO_BUFFER_SIZE - bytes_processed;
+            result_tmp->result.bytemax = flow_data->parent->buffer_size 
+                - bytes_processed;
             result_tmp->result.bytes = 0;
             result_tmp->result.segmax = IO_MAX_REGIONS;
             result_tmp->result.segs = 0;
@@ -1389,10 +1417,10 @@ static void trove_write_callback_fn(void
                     ((char*)tmp_buffer + old_result_tmp->result.bytes);
                 bytes_processed += old_result_tmp->result.bytes;
             }
-        }while(bytes_processed < IO_BUFFER_SIZE && 
+        }while(bytes_processed < flow_data->parent->buffer_size && 
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= IO_BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
  
         flow_data->total_bytes_processed += bytes_processed;
 
@@ -1412,7 +1440,7 @@ static void trove_write_callback_fn(void
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
             q_item->buffer,
-            IO_BUFFER_SIZE,
+            flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
@@ -1456,13 +1484,13 @@ static void cleanup_buffers(struct fp_pr
     if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
         flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
     {
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
         {
             if(flow_data->prealloc_array[i].buffer)
             {
                 BMI_memfree(flow_data->parent->src.u.bmi.address,
                     flow_data->prealloc_array[i].buffer,
-                    IO_BUFFER_SIZE,
+                    flow_data->parent->buffer_size,
                     BMI_RECV);
             }
             result_tmp = &(flow_data->prealloc_array[i].result_chain);
@@ -1479,13 +1507,13 @@ static void cleanup_buffers(struct fp_pr
     else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
         flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
     {
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
         {
             if(flow_data->prealloc_array[i].buffer)
             {
                 BMI_memfree(flow_data->parent->dest.u.bmi.address,
                     flow_data->prealloc_array[i].buffer,
-                    IO_BUFFER_SIZE,
+                    flow_data->parent->buffer_size,
                     BMI_SEND);
             }
             result_tmp = &(flow_data->prealloc_array[i].result_chain);
@@ -1505,7 +1533,7 @@ static void cleanup_buffers(struct fp_pr
         if(flow_data->intermediate)
         {
             BMI_memfree(flow_data->parent->dest.u.bmi.address,
-                flow_data->intermediate, IO_BUFFER_SIZE, BMI_SEND);
+                flow_data->intermediate, flow_data->parent->buffer_size, BMI_SEND);
         }
     }
     else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
@@ -1514,9 +1542,11 @@ static void cleanup_buffers(struct fp_pr
         if(flow_data->intermediate)
         {
             BMI_memfree(flow_data->parent->src.u.bmi.address,
-                flow_data->intermediate, IO_BUFFER_SIZE, BMI_RECV);
+                flow_data->intermediate, flow_data->parent->buffer_size, BMI_RECV);
         }
     }
+
+    free(flow_data->prealloc_array);
 }
 
 /* mem_to_bmi_callback()
@@ -1572,7 +1602,7 @@ static void mem_to_bmi_callback_fn(void 
         q_item->result_chain.offset_list;
     q_item->result_chain.result.size_array = 
         q_item->result_chain.size_list;
-    q_item->result_chain.result.bytemax = IO_BUFFER_SIZE;
+    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
     q_item->result_chain.result.bytes = 0;
     q_item->result_chain.result.segmax = IO_MAX_REGIONS;
     q_item->result_chain.result.segs = 0;
@@ -1588,14 +1618,14 @@ static void mem_to_bmi_callback_fn(void 
 
     /* was IO_MAX_REGIONS enough to satisfy this step? */
     if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
-        q_item->result_chain.result.bytes < IO_BUFFER_SIZE)
+        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
     {
         /* create an intermediate buffer */
         if(!flow_data->intermediate)
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->dest.u.bmi.address,
-                IO_BUFFER_SIZE, BMI_SEND);
+                flow_data->parent->buffer_size, BMI_SEND);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
@@ -1613,7 +1643,7 @@ static void mem_to_bmi_callback_fn(void 
         do
         {
             q_item->result_chain.result.bytemax =
-                (IO_BUFFER_SIZE - bytes_processed);
+                (flow_data->parent->buffer_size - bytes_processed);
             q_item->result_chain.result.bytes = 0;
             q_item->result_chain.result.segmax = IO_MAX_REGIONS;
             q_item->result_chain.result.segs = 0;
@@ -1636,10 +1666,10 @@ static void mem_to_bmi_callback_fn(void 
                 memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
                 bytes_processed += q_item->result_chain.size_list[i];
             }
-        }while(bytes_processed < IO_BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert (bytes_processed <= IO_BUFFER_SIZE);
+        assert (bytes_processed <= flow_data->parent->buffer_size);
 
         /* setup for BMI operation */
         flow_data->tmp_buffer_list[0] = flow_data->intermediate;
@@ -1759,7 +1789,7 @@ static void bmi_to_mem_callback_fn(void 
         do
         {
             q_item->result_chain.result.bytemax =
-                (IO_BUFFER_SIZE - bytes_processed);
+                (q_item->parent->buffer_size - bytes_processed);
             q_item->result_chain.result.bytes = 0;
             q_item->result_chain.result.segmax = IO_MAX_REGIONS;
             q_item->result_chain.result.segs = 0;
@@ -1783,10 +1813,10 @@ static void bmi_to_mem_callback_fn(void 
                 memcpy(dest_ptr, src_ptr, region_size);
                 bytes_processed += region_size;
             }
-        }while(bytes_processed < IO_BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= IO_BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
     }
 
     /* are we done? */
@@ -1805,7 +1835,7 @@ static void bmi_to_mem_callback_fn(void 
         q_item->result_chain.offset_list;
     q_item->result_chain.result.size_array = 
         q_item->result_chain.size_list;
-    q_item->result_chain.result.bytemax = IO_BUFFER_SIZE;
+    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
     q_item->result_chain.result.bytes = 0;
     q_item->result_chain.result.segmax = IO_MAX_REGIONS;
     q_item->result_chain.result.segs = 0;
@@ -1820,22 +1850,22 @@ static void bmi_to_mem_callback_fn(void 
 
     /* was IO_MAX_REGIONS enough to satisfy this step? */
     if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
-        q_item->result_chain.result.bytes < IO_BUFFER_SIZE)
+        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
     {
         /* create an intermediate buffer */
         if(!flow_data->intermediate)
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->src.u.bmi.address,
-                IO_BUFFER_SIZE, BMI_RECV);
+                flow_data->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
         /* setup for BMI operation */
         flow_data->tmp_buffer_list[0] = flow_data->intermediate;
         buffer_type = BMI_PRE_ALLOC;
-        q_item->buffer_used = IO_BUFFER_SIZE;
-        total_size = IO_BUFFER_SIZE;
+        q_item->buffer_used = flow_data->parent->buffer_size;
+        total_size = flow_data->parent->buffer_size;
         size_array = &q_item->buffer_used;
         segs = 1;
         /* we will copy data out on next iteration */



More information about the Pvfs2-cvs mailing list