[Pvfs2-cvs] commit by slang in pvfs2/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c

CVS commit program cvs at parl.clemson.edu
Tue Aug 15 16:24:26 EDT 2006


Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb1:/tmp/cvs-serv18021/src/io/flow/flowproto-bmi-trove

Modified Files:
	flowproto-multiqueue.c 
Log Message:
[phil]: flow-proto-tuning: This patch adds "FlowBufferSizeBytes" and "FlowBuffersPerFlow" options to the configuration file format.  They allow you to specify the buffer size that the default flow protocol will use as well as the maximum number of buffers to use per flow.  Note that if you change either of these parameters, then you need to remount any active clients so that they pick up the configuration change before performing any I/O.

[phil]: max-aio: This patch adds "TroveMaxConcurrentIO" to the configuration file format.  It lets you specify the maximum number of I/O operations that trove will allow to proceed concurrently (the current value is 16).  As noted in the previous email about AIO, depending on your access pattern, AIO may queue all of your operations regardless of this setting.  The setting probably has little effect unless you are accessing more than one file at a time or are using an alternative to the stock AIO implementation.


Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.111 -r1.112
--- flowproto-multiqueue.c	11 Aug 2006 19:18:05 -0000	1.111
+++ flowproto-multiqueue.c	15 Aug 2006 20:24:26 -0000	1.112
@@ -22,8 +22,12 @@
 #include "pint-perf-counter.h"
 #include "pvfs2-internal.h"
 
+/* the following buffer settings are used by default if none are specified in
+ * the flow descriptor
+ */
 #define BUFFERS_PER_FLOW 8
 #define BUFFER_SIZE (256*1024)
+
 #define MAX_REGIONS 64
 
 #define FLOW_CLEANUP(__flow_data)                                     \
@@ -72,7 +76,7 @@ struct fp_queue_item
 struct fp_private_data
 {
     flow_descriptor *parent;
-    struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
+    struct fp_queue_item* prealloc_array;
     struct qlist_head list_link;
     PVFS_size total_bytes_processed;
     int next_seq;
@@ -543,7 +547,21 @@ int fp_multiqueue_post(flow_descriptor  
             PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
     }
 
-    for(i=0; i<BUFFERS_PER_FLOW; i++)
+    if(flow_d->buffer_size < 1)
+        flow_d->buffer_size = BUFFER_SIZE;
+    if(flow_d->buffers_per_flow < 1)
+        flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
+        
+    flow_data->prealloc_array = (struct fp_queue_item*)
+        malloc(flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    if(!flow_data->prealloc_array)
+    {
+        free(flow_data);
+        return(-PVFS_ENOMEM);
+    }
+    memset(flow_data->prealloc_array, 0,
+        flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
+    for(i=0; i<flow_d->buffers_per_flow; i++)
     {
         flow_data->prealloc_array[i].parent = flow_d;
         flow_data->prealloc_array[i].bmi_callback.data = 
@@ -560,7 +578,7 @@ int fp_multiqueue_post(flow_descriptor  
         /* put all of the buffers on empty list, we don't really do any
          * queueing for this type of flow
          */
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -586,7 +604,7 @@ int fp_multiqueue_post(flow_descriptor  
         /* put all of the buffers on empty list, we don't really do any
          * queueing for this type of flow
          */
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -607,9 +625,9 @@ int fp_multiqueue_post(flow_descriptor  
     else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
         flow_d->dest.endpoint_id == BMI_ENDPOINT)
     {
-        flow_data->initial_posts = BUFFERS_PER_FLOW;
+        flow_data->initial_posts = flow_d->buffers_per_flow;
         gen_mutex_lock(flow_data->parent->flow_mutex);
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
                 "flowproto-multiqueue forcing bmi_send_callback_fn.\n");
@@ -638,7 +656,7 @@ int fp_multiqueue_post(flow_descriptor  
         flow_data->initial_posts = 1;
 
         /* place remaining buffers on "empty" queue */
-        for(i=1; i<BUFFERS_PER_FLOW; i++)
+        for(i=1; i<flow_d->buffers_per_flow; i++)
         {
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
@@ -765,7 +783,7 @@ static void bmi_recv_callback_fn(void *u
         {
             /* if the q_item has not been used, allocate a buffer */
             q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
-                BUFFER_SIZE, BMI_RECV);
+                q_item->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(q_item->buffer);
             q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -789,7 +807,8 @@ static void bmi_recv_callback_fn(void *u
                 result_tmp->offset_list;
             result_tmp->result.size_array = 
                 result_tmp->size_list;
-            result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+            result_tmp->result.bytemax = flow_data->parent->buffer_size - 
+                bytes_processed;
             result_tmp->result.bytes = 0;
             result_tmp->result.segmax = MAX_REGIONS;
             result_tmp->result.segs = 0;
@@ -818,10 +837,10 @@ static void bmi_recv_callback_fn(void *u
                 tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
                 bytes_processed += old_result_tmp->result.bytes;
             }
-        }while(bytes_processed < BUFFER_SIZE && 
+        }while(bytes_processed < flow_data->parent->buffer_size && 
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
         if(bytes_processed == 0)
         {        
             qlist_del(&q_item->list_link);
@@ -835,7 +854,7 @@ static void bmi_recv_callback_fn(void *u
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
             q_item->buffer,
-            BUFFER_SIZE,
+            flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
@@ -1070,7 +1089,7 @@ static int bmi_send_callback_fn(void *us
     {
         /* if the q_item has not been used, allocate a buffer */
         q_item->buffer = BMI_memalloc(q_item->parent->dest.u.bmi.address,
-            BUFFER_SIZE, BMI_SEND);
+            q_item->parent->buffer_size, BMI_SEND);
         /* TODO: error handling */
         assert(q_item->buffer);
         q_item->bmi_callback.fn = bmi_send_callback_wrapper;
@@ -1098,7 +1117,8 @@ static int bmi_send_callback_fn(void *us
             result_tmp->offset_list;
         result_tmp->result.size_array = 
             result_tmp->size_list;
-        result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+        result_tmp->result.bytemax = q_item->parent->buffer_size 
+            - bytes_processed;
         result_tmp->result.bytes = 0;
         result_tmp->result.segmax = MAX_REGIONS;
         result_tmp->result.segs = 0;
@@ -1130,10 +1150,10 @@ static int bmi_send_callback_fn(void *us
             q_item->buffer_used += old_result_tmp->result.bytes;
         }
 
-    }while(bytes_processed < BUFFER_SIZE && 
+    }while(bytes_processed < flow_data->parent->buffer_size && 
         !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-    assert(bytes_processed <= BUFFER_SIZE);
+    assert(bytes_processed <= flow_data->parent->buffer_size);
 
     /* important to update the sequence /after/ request processed */
     q_item->seq = flow_data->next_seq;
@@ -1333,7 +1353,7 @@ static void trove_write_callback_fn(void
     {
         /* if the q_item has not been used, allocate a buffer */
         q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
-            BUFFER_SIZE, BMI_RECV);
+            q_item->parent->buffer_size, BMI_RECV);
         /* TODO: error handling */
         assert(q_item->buffer);
         q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
@@ -1365,7 +1385,8 @@ static void trove_write_callback_fn(void
                 result_tmp->offset_list;
             result_tmp->result.size_array = 
                 result_tmp->size_list;
-            result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
+            result_tmp->result.bytemax = flow_data->parent->buffer_size 
+                - bytes_processed;
             result_tmp->result.bytes = 0;
             result_tmp->result.segmax = MAX_REGIONS;
             result_tmp->result.segs = 0;
@@ -1396,10 +1417,10 @@ static void trove_write_callback_fn(void
                     ((char*)tmp_buffer + old_result_tmp->result.bytes);
                 bytes_processed += old_result_tmp->result.bytes;
             }
-        }while(bytes_processed < BUFFER_SIZE && 
+        }while(bytes_processed < flow_data->parent->buffer_size && 
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
  
         flow_data->total_bytes_processed += bytes_processed;
 
@@ -1419,7 +1440,7 @@ static void trove_write_callback_fn(void
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
             q_item->buffer,
-            BUFFER_SIZE,
+            flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
@@ -1463,13 +1484,13 @@ static void cleanup_buffers(struct fp_pr
     if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
         flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
     {
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
         {
             if(flow_data->prealloc_array[i].buffer)
             {
                 BMI_memfree(flow_data->parent->src.u.bmi.address,
                     flow_data->prealloc_array[i].buffer,
-                    BUFFER_SIZE,
+                    flow_data->parent->buffer_size,
                     BMI_RECV);
             }
             result_tmp = &(flow_data->prealloc_array[i].result_chain);
@@ -1486,13 +1507,13 @@ static void cleanup_buffers(struct fp_pr
     else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
         flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
     {
-        for(i=0; i<BUFFERS_PER_FLOW; i++)
+        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
         {
             if(flow_data->prealloc_array[i].buffer)
             {
                 BMI_memfree(flow_data->parent->dest.u.bmi.address,
                     flow_data->prealloc_array[i].buffer,
-                    BUFFER_SIZE,
+                    flow_data->parent->buffer_size,
                     BMI_SEND);
             }
             result_tmp = &(flow_data->prealloc_array[i].result_chain);
@@ -1512,7 +1533,7 @@ static void cleanup_buffers(struct fp_pr
         if(flow_data->intermediate)
         {
             BMI_memfree(flow_data->parent->dest.u.bmi.address,
-                flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
+                flow_data->intermediate, flow_data->parent->buffer_size, BMI_SEND);
         }
     }
     else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
@@ -1521,9 +1542,11 @@ static void cleanup_buffers(struct fp_pr
         if(flow_data->intermediate)
         {
             BMI_memfree(flow_data->parent->src.u.bmi.address,
-                flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
+                flow_data->intermediate, flow_data->parent->buffer_size, BMI_RECV);
         }
     }
+
+    free(flow_data->prealloc_array);
 }
 
 /* mem_to_bmi_callback()
@@ -1579,7 +1602,7 @@ static void mem_to_bmi_callback_fn(void 
         q_item->result_chain.offset_list;
     q_item->result_chain.result.size_array = 
         q_item->result_chain.size_list;
-    q_item->result_chain.result.bytemax = BUFFER_SIZE;
+    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
     q_item->result_chain.result.bytes = 0;
     q_item->result_chain.result.segmax = MAX_REGIONS;
     q_item->result_chain.result.segs = 0;
@@ -1595,14 +1618,14 @@ static void mem_to_bmi_callback_fn(void 
 
     /* was MAX_REGIONS enough to satisfy this step? */
     if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
-        q_item->result_chain.result.bytes < BUFFER_SIZE)
+        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
     {
         /* create an intermediate buffer */
         if(!flow_data->intermediate)
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->dest.u.bmi.address,
-                BUFFER_SIZE, BMI_SEND);
+                flow_data->parent->buffer_size, BMI_SEND);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
@@ -1620,7 +1643,7 @@ static void mem_to_bmi_callback_fn(void 
         do
         {
             q_item->result_chain.result.bytemax =
-                (BUFFER_SIZE - bytes_processed);
+                (flow_data->parent->buffer_size - bytes_processed);
             q_item->result_chain.result.bytes = 0;
             q_item->result_chain.result.segmax = MAX_REGIONS;
             q_item->result_chain.result.segs = 0;
@@ -1643,10 +1666,10 @@ static void mem_to_bmi_callback_fn(void 
                 memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
                 bytes_processed += q_item->result_chain.size_list[i];
             }
-        }while(bytes_processed < BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert (bytes_processed <= BUFFER_SIZE);
+        assert (bytes_processed <= flow_data->parent->buffer_size);
 
         /* setup for BMI operation */
         flow_data->tmp_buffer_list[0] = flow_data->intermediate;
@@ -1766,7 +1789,7 @@ static void bmi_to_mem_callback_fn(void 
         do
         {
             q_item->result_chain.result.bytemax =
-                (BUFFER_SIZE - bytes_processed);
+                (q_item->parent->buffer_size - bytes_processed);
             q_item->result_chain.result.bytes = 0;
             q_item->result_chain.result.segmax = MAX_REGIONS;
             q_item->result_chain.result.segs = 0;
@@ -1790,10 +1813,10 @@ static void bmi_to_mem_callback_fn(void 
                 memcpy(dest_ptr, src_ptr, region_size);
                 bytes_processed += region_size;
             }
-        }while(bytes_processed < BUFFER_SIZE &&
+        }while(bytes_processed < flow_data->parent->buffer_size &&
             !PINT_REQUEST_DONE(q_item->parent->file_req_state));
 
-        assert(bytes_processed <= BUFFER_SIZE);
+        assert(bytes_processed <= flow_data->parent->buffer_size);
     }
 
     /* are we done? */
@@ -1812,7 +1835,7 @@ static void bmi_to_mem_callback_fn(void 
         q_item->result_chain.offset_list;
     q_item->result_chain.result.size_array = 
         q_item->result_chain.size_list;
-    q_item->result_chain.result.bytemax = BUFFER_SIZE;
+    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
     q_item->result_chain.result.bytes = 0;
     q_item->result_chain.result.segmax = MAX_REGIONS;
     q_item->result_chain.result.segs = 0;
@@ -1827,22 +1850,22 @@ static void bmi_to_mem_callback_fn(void 
 
     /* was MAX_REGIONS enough to satisfy this step? */
     if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
-        q_item->result_chain.result.bytes < BUFFER_SIZE)
+        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
     {
         /* create an intermediate buffer */
         if(!flow_data->intermediate)
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->src.u.bmi.address,
-                BUFFER_SIZE, BMI_RECV);
+                flow_data->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
         /* setup for BMI operation */
         flow_data->tmp_buffer_list[0] = flow_data->intermediate;
         buffer_type = BMI_PRE_ALLOC;
-        q_item->buffer_used = BUFFER_SIZE;
-        total_size = BUFFER_SIZE;
+        q_item->buffer_used = flow_data->parent->buffer_size;
+        total_size = flow_data->parent->buffer_size;
         size_array = &q_item->buffer_used;
         segs = 1;
         /* we will copy data out on next iteration */



More information about the Pvfs2-cvs mailing list