[Pvfs2-developers] patches: tuning options

Sam Lang slang at mcs.anl.gov
Thu Aug 10 15:30:27 EDT 2006


On Aug 10, 2006, at 4:04 PM, Phil Carns wrote:

> flow-proto-tuning.patch:
> -----------
> This patch adds "FlowBufferSizeBytes" and "FlowBuffersPerFlow"  
> options to the configuration file format.  They allow you to  
> specify the buffer size that the default flow protocol will use as  
> well as the maximum number of buffers to use per flow.  Note that  
> if you change either of these parameters, then you need to remount  
> any active clients so that they pick up the configuration change  
> before performing any I/O.
>
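
For a sense of scale, the two flow options multiply into a per-flow buffer ceiling. The helper below is only an illustrative sketch: flow_buffer_bytes() is a made-up name, not something in the patch. It mirrors the fallback in the flowproto-multiqueue.c hunk (values below 1 fall back to the built-in 8 buffers of 256 KiB each) and returns the most buffer memory a single flow could hold. The buffers look to be allocated on first use, so treat the result as an upper bound.

```c
#include <assert.h>
#include <stddef.h>

/* Built-in defaults, copied from the #defines in flowproto-multiqueue.c. */
#define BUFFERS_PER_FLOW 8
#define BUFFER_SIZE (256*1024)

/* Hypothetical helper (not part of the patch): apply the same "< 1 means
 * use the default" rule as the flow protocol, then return the maximum
 * number of buffer bytes one flow could allocate.
 */
size_t flow_buffer_bytes(int buffer_size, int buffers_per_flow)
{
    if (buffer_size < 1)
        buffer_size = BUFFER_SIZE;
    if (buffers_per_flow < 1)
        buffers_per_flow = BUFFERS_PER_FLOW;

    return (size_t)buffer_size * (size_t)buffers_per_flow;
}
```

With the defaults this comes to 8 * 256 KiB = 2 MiB per flow; FlowBufferSizeBytes 65536 with FlowBuffersPerFlow 4 would cap it at 256 KiB.
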
> max-aio.patch:
> ----------
> This patch adds "TroveMaxConcurrentIO" to the configuration file  
> format.  It allows you to specify the maximum number of I/O  
> operations that trove will allow to proceed concurrently (currently  
> 16).  Note from the previous email regarding AIO that depending on  
> your access pattern, AIO may queue all of your operations anyway  
> regardless of this setting.  It probably doesn't have much effect  
> unless you are accessing more than one file at a time, or if you  
> are using an alternative to the stock AIO implementation.
>

I had made the same change in Julian's branch, but there are still a
couple of things that aren't clear to me about this max value.
First, it's a global value for all outstanding lio_listio calls the
pvfs server makes, but based on your previous email comments about
glibc's one-thread-per-fd oddity, it seems like we only want that
value to max out per datafile.  Also, after we hit the max we just
queue the operations and post them once current ops have completed.
If librt just queues ops and does them in FIFO order though, it's
pretty much the same thing.  Why not just let librt handle the
queuing?  If we were to do ordering of the operations based on
offsets, then it would make sense for us to queue, but we don't.
Are we better at queueing than librt?
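
To make the comparison concrete, the throttle under discussion boils down to roughly the following. This is a simplified, single-threaded sketch and not the dbpf-bstream.c code: struct throttle, throttle_submit() and throttle_complete() are invented names, MAX_OPS is an arbitrary capacity, and there is no locking or real I/O. It only shows the ordering: ops past the limit are parked and later started in arrival order, which is the same order a plain FIFO queue underneath would give.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_OPS 64  /* arbitrary queue capacity for this sketch */

/* Hypothetical stand-in for the server-side I/O throttle. */
struct throttle {
    int max_concurrent;   /* plays the role of TroveMaxConcurrentIO */
    int in_progress;      /* ops currently posted */
    int queue[MAX_OPS];   /* FIFO ring of delayed op ids */
    size_t head;          /* index of the oldest delayed op */
    size_t count;         /* number of delayed ops */
};

/* Returns 1 if the op starts right away, 0 if it was delayed,
 * -1 if the delay queue is full.
 */
int throttle_submit(struct throttle *t, int op_id)
{
    if (t->in_progress < t->max_concurrent) {
        t->in_progress++;
        return 1;
    }
    if (t->count == MAX_OPS)
        return -1;
    t->queue[(t->head + t->count) % MAX_OPS] = op_id;
    t->count++;
    return 0;
}

/* Called when one in-progress op finishes.  Starts the oldest delayed
 * op, if any, and returns its id; returns -1 if nothing was waiting.
 */
int throttle_complete(struct throttle *t)
{
    int next;

    t->in_progress--;
    if (t->count == 0)
        return -1;
    next = t->queue[t->head];
    t->head = (t->head + 1) % MAX_OPS;
    t->count--;
    t->in_progress++;
    return next;
}
```

Note that the limit here is one counter shared by every file, matching the global value described above; a per-datafile limit would need one such counter per fd.
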

I know Julian was looking at the performance of aio and found results
somewhere (I don't have a reference, sorry) that showed lio_listio
did better in cases where multiple fds were passed to one lio_listio
operation (right now we just pass one fd with multiple segments to one
lio_listio).  I wonder if that difference is due to the glibc
queuing behavior that you describe.  Just a curiosity, but I wonder
if the aio performance would change if we were to post multiple trove
operations in the same lio_listio call, or possibly even break up the
bstream into multiple files based on strip size... sounds crazy,
right? :-)
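
For anyone who wants to experiment with the multiple-fd case, here is a minimal sketch of one lio_listio call carrying requests for several descriptors. It is not how the pvfs server posts I/O today: write_batch(), demo_two_files(), MAX_BATCH and the /tmp file names are all invented for illustration. Only the POSIX AIO calls are real, and the sketch says nothing about whether this is actually faster.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_BATCH 16  /* arbitrary limit for this sketch */

/* Hypothetical helper: write one buffer to each of nfds descriptors using
 * a single lio_listio() call.  Returns 0 if every write completed in full.
 */
int write_batch(const int *fds, char *const *bufs, const size_t *lens,
                const off_t *offsets, int nfds)
{
    struct aiocb cbs[MAX_BATCH];
    struct aiocb *list[MAX_BATCH];
    int i, err, rc;

    if (nfds < 1 || nfds > MAX_BATCH)
        return -1;

    memset(cbs, 0, sizeof(cbs));
    for (i = 0; i < nfds; i++) {
        cbs[i].aio_fildes = fds[i];        /* a different fd per entry */
        cbs[i].aio_buf = bufs[i];
        cbs[i].aio_nbytes = lens[i];
        cbs[i].aio_offset = offsets[i];
        cbs[i].aio_lio_opcode = LIO_WRITE;
        cbs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
        list[i] = &cbs[i];
    }

    /* LIO_WAIT blocks until the requests in the list have finished. */
    rc = lio_listio(LIO_WAIT, list, nfds, NULL);

    /* Collect every result, waiting for stragglers if the call returned
     * early, so no request outlives the control blocks on this stack.
     */
    for (i = 0; i < nfds; i++) {
        const struct aiocb *one[1] = { &cbs[i] };

        while ((err = aio_error(&cbs[i])) == EINPROGRESS)
            aio_suspend(one, 1, NULL);
        if (err != 0 || aio_return(&cbs[i]) != (ssize_t)lens[i])
            rc = -1;
    }
    return rc == 0 ? 0 : -1;
}

/* Usage example: two temporary files, one batched call, then read back. */
int demo_two_files(void)
{
    char path_a[] = "/tmp/lio_demo_a_XXXXXX";
    char path_b[] = "/tmp/lio_demo_b_XXXXXX";
    char msg_a[] = "strip 0";
    char msg_b[] = "strip 1";
    char check[sizeof(msg_b)];
    int fds[2];
    int rc = -1;

    fds[0] = mkstemp(path_a);
    fds[1] = mkstemp(path_b);
    if (fds[0] >= 0 && fds[1] >= 0) {
        char *bufs[2] = { msg_a, msg_b };
        size_t lens[2] = { sizeof(msg_a), sizeof(msg_b) };
        off_t offsets[2] = { 0, 0 };

        rc = write_batch(fds, bufs, lens, offsets, 2);
        if (rc == 0 &&
            (pread(fds[1], check, sizeof(check), 0) != (ssize_t)sizeof(check) ||
             memcmp(check, msg_b, sizeof(check)) != 0))
            rc = -1;
    }
    if (fds[0] >= 0) { close(fds[0]); unlink(path_a); }
    if (fds[1] >= 0) { close(fds[1]); unlink(path_b); }
    return rc;
}
```

On glibc older than 2.34 this needs -lrt at link time. Timing a loop of write_batch() against the same writes issued one fd per call would be one way to check whether the difference Julian saw shows up on a given system.
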

-sam

> As a side note, I generally am not including any pvfs2-genconfig  
> changes with these types of patches, but I can make them available  
> if anyone is interested.  We typically add command line arguments  
> to pvfs2-genconfig for every parameter so that we can reproduce  
> consistent configuration files by calling pvfs2-genconfig from a  
> script, but I don't know if this is generally useful or just adds  
> clutter :)
>
> -Phil
> diff -Naur pvfs2/src/client/sysint/sys-io.sm pvfs2-new/src/client/sysint/sys-io.sm
> --- pvfs2/src/client/sysint/sys-io.sm	2006-06-16 19:22:47.000000000 +0200
> +++ pvfs2-new/src/client/sysint/sys-io.sm	2006-08-02 23:26:24.000000000 +0200
> @@ -1639,6 +1639,7 @@
>      PVFS_object_attr *attr = NULL;
>      struct server_configuration_s *server_config = NULL;
>      unsigned long status_user_tag = 0;
> +    struct filesystem_configuration_s * fs_config;
>
>      gossip_debug(GOSSIP_IO_DEBUG, "- build_context_flow called\n");
>
> @@ -1703,7 +1704,16 @@
>
>      status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);
>
> -    server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
> +    server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
> +
> +    fs_config = PINT_config_find_fs_id(server_config, cur_ctx->msg.fs_id);
> +    if(fs_config)
> +    {
> +        /* pick up any buffer settings overrides from fs conf */
> +        cur_ctx->flow_desc.buffer_size = fs_config->fp_buffer_size;
> +        cur_ctx->flow_desc.buffers_per_flow = fs_config->fp_buffers_per_flow;
> +    }
> +
>      ret = job_flow(
>          &cur_ctx->flow_desc, sm_p, status_user_tag,
>          &cur_ctx->flow_status, &cur_ctx->flow_job_id,
> diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/common/misc/server-config.c
> --- pvfs2/src/common/misc/server-config.c	2006-08-02 17:13:00.000000000 +0200
> +++ pvfs2-new/src/common/misc/server-config.c	2006-08-02 23:29:17.000000000 +0200
> @@ -67,6 +67,8 @@
>  static DOTCONF_CB(get_bmi_module_list);
>  static DOTCONF_CB(get_flow_module_list);
>  static DOTCONF_CB(get_handle_recycle_timeout_seconds);
> +static DOTCONF_CB(get_flow_buffer_size_bytes);
> +static DOTCONF_CB(get_flow_buffers_per_flow);
>  static DOTCONF_CB(get_attr_cache_keywords_list);
>  static DOTCONF_CB(get_attr_cache_size);
>  static DOTCONF_CB(get_attr_cache_max_num_elems);
> @@ -573,6 +575,14 @@
>      {"FlowModules",ARG_LIST, get_flow_module_list,NULL,
>          CTX_DEFAULTS|CTX_GLOBAL,"flowproto_multiqueue,"},
>
> +    /* buffer size to use for bulk data transfers */
> +    {"FlowBufferSizeBytes", ARG_INT,
> +         get_flow_buffer_size_bytes, NULL, CTX_FILESYSTEM,"262144"},
> +
> +    /* number of buffers to use for bulk data transfers */
> +    {"FlowBuffersPerFlow", ARG_INT,
> +         get_flow_buffers_per_flow, NULL, CTX_FILESYSTEM,"8"},
> +
>      /* The TROVE storage layer has a management component that deals with
>       * allocating handle values for new metafiles and datafiles.  The underlying
>       * trove module can be given a hint to tell it how long to wait before
> @@ -979,6 +989,8 @@
>      fs_conf->encoding = ENCODING_DEFAULT;
>      fs_conf->trove_sync_meta = TROVE_SYNC;
>      fs_conf->trove_sync_data = TROVE_SYNC;
> +    fs_conf->fp_buffer_size = -1;
> +    fs_conf->fp_buffers_per_flow = -1;
>
>      if (!config_s->file_systems)
>      {
> @@ -1382,6 +1394,31 @@
>  }
>
>
> +DOTCONF_CB(get_flow_buffer_size_bytes)
> +{
> +    struct filesystem_configuration_s *fs_conf = NULL;
> +    struct server_configuration_s *config_s =
> +        (struct server_configuration_s *)cmd->context;
> +
> +    fs_conf = (struct filesystem_configuration_s *)
> +        PINT_llist_head(config_s->file_systems);
> +    fs_conf->fp_buffer_size = cmd->data.value;
> +    return NULL;
> +}
> +
> +DOTCONF_CB(get_flow_buffers_per_flow)
> +{
> +    struct filesystem_configuration_s *fs_conf = NULL;
> +    struct server_configuration_s *config_s =
> +        (struct server_configuration_s *)cmd->context;
> +
> +    fs_conf = (struct filesystem_configuration_s *)
> +        PINT_llist_head(config_s->file_systems);
> +    fs_conf->fp_buffers_per_flow = cmd->data.value;
> +
> +    return NULL;
> +}
> +
>  DOTCONF_CB(get_attr_cache_keywords_list)
>  {
>      int i = 0, len = 0;
> @@ -2298,6 +2335,9 @@
>              src_fs->attr_cache_max_num_elems;
>          dest_fs->trove_sync_meta = src_fs->trove_sync_meta;
>          dest_fs->trove_sync_data = src_fs->trove_sync_data;
> +
> +        dest_fs->fp_buffer_size = src_fs->fp_buffer_size;
> +        dest_fs->fp_buffers_per_flow = src_fs->fp_buffers_per_flow;
>      }
>  }
>
> diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/common/misc/server-config.h
> --- pvfs2/src/common/misc/server-config.h	2006-07-13 07:11:40.000000000 +0200
> +++ pvfs2-new/src/common/misc/server-config.h	2006-08-02 23:28:17.000000000 +0200
> @@ -85,6 +85,9 @@
>      int coalescing_high_watermark;
>      int coalescing_low_watermark;
>
> +    int fp_buffer_size;
> +    int fp_buffers_per_flow;
> +
>  } filesystem_configuration_s;
>
>  typedef struct distribution_param_configuration_s
> diff -Naur pvfs2/src/io/flow/flow.c pvfs2-new/src/io/flow/flow.c
> --- pvfs2/src/io/flow/flow.c	2006-06-28 23:03:08.000000000 +0200
> +++ pvfs2-new/src/io/flow/flow.c	2006-08-02 23:26:24.000000000 +0200
> @@ -306,6 +306,8 @@
>      flow_d->aggregate_size = -1;
>      flow_d->state = FLOW_INITIAL;
>      flow_d->type = FLOWPROTO_DEFAULT;
> +    flow_d->buffers_per_flow = -1;
> +    flow_d->buffer_size = -1;
>
>      flow_d->flow_mutex = (tmp_mutex ? tmp_mutex : gen_mutex_build());
>      assert(flow_d->flow_mutex);
> diff -Naur pvfs2/src/io/flow/flow.h pvfs2-new/src/io/flow/flow.h
> --- pvfs2/src/io/flow/flow.h	2005-12-14 22:50:25.000000000 +0100
> +++ pvfs2-new/src/io/flow/flow.h	2006-08-02 23:26:24.000000000 +0200
> @@ -120,6 +120,10 @@
>      /* information about the datafile that this flow will access */
>      PINT_request_file_data file_data;
>
> +    /* the buffer settings may be ignored by some protocols */
> +    int buffer_size;            /* buffer size to use */
> +    int buffers_per_flow;       /* number of buffers to allow per flow */
> +
>  	/***********************************************************/
>      /* fields that can be read publicly upon completion */
>
> diff -Naur pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c
> --- pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c	2006-05-28 18:52:08.000000000 +0200
> +++ pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c	2006-08-02 23:26:24.000000000 +0200
> @@ -22,8 +22,12 @@
>  #include "pint-perf-counter.h"
>  #include "pvfs2-internal.h"
>
> +/* the following buffer settings are used by default if none are specified in
> + * the flow descriptor
> + */
>  #define BUFFERS_PER_FLOW 8
>  #define BUFFER_SIZE (256*1024)
> +
>  #define MAX_REGIONS 64
>
>  #define FLOW_CLEANUP(__flow_data)                                     \
> @@ -72,7 +76,7 @@
>  struct fp_private_data
>  {
>      flow_descriptor *parent;
> -    struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
> +    struct fp_queue_item* prealloc_array;
>      struct qlist_head list_link;
>      PVFS_size total_bytes_processed;
>      int next_seq;
> @@ -540,7 +544,21 @@
>              PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
>      }
>
> -    for(i=0; i<BUFFERS_PER_FLOW; i++)
> +    if(flow_d->buffer_size < 1)
> +        flow_d->buffer_size = BUFFER_SIZE;
> +    if(flow_d->buffers_per_flow < 1)
> +        flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
> +
> +    flow_data->prealloc_array = (struct fp_queue_item*)
> +        malloc(flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
> +    if(!flow_data->prealloc_array)
> +    {
> +        free(flow_data);
> +        return(-PVFS_ENOMEM);
> +    }
> +    memset(flow_data->prealloc_array, 0,
> +        flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
> +    for(i=0; i<flow_d->buffers_per_flow; i++)
>      {
>          flow_data->prealloc_array[i].parent = flow_d;
>          flow_data->prealloc_array[i].bmi_callback.data =
> @@ -557,7 +575,7 @@
>          /* put all of the buffers on empty list, we don't really do any
>           * queueing for this type of flow
>           */
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_d->buffers_per_flow; i++)
>          {
>              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
>                  &flow_data->empty_list);
> @@ -583,7 +601,7 @@
>          /* put all of the buffers on empty list, we don't really do any
>           * queueing for this type of flow
>           */
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_d->buffers_per_flow; i++)
>          {
>              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
>                  &flow_data->empty_list);
> @@ -604,9 +622,9 @@
>      else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
>          flow_d->dest.endpoint_id == BMI_ENDPOINT)
>      {
> -        flow_data->initial_posts = BUFFERS_PER_FLOW;
> +        flow_data->initial_posts = flow_d->buffers_per_flow;
>          gen_mutex_lock(flow_data->parent->flow_mutex);
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_d->buffers_per_flow; i++)
>          {
>              gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
>                  "flowproto-multiqueue forcing bmi_send_callback_fn.\n");
> @@ -635,7 +653,7 @@
>          flow_data->initial_posts = 1;
>
>          /* place remaining buffers on "empty" queue */
> -        for(i=1; i<BUFFERS_PER_FLOW; i++)
> +        for(i=1; i<flow_d->buffers_per_flow; i++)
>          {
>              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
>                  &flow_data->empty_list);
> @@ -760,7 +778,7 @@
>          {
>              /* if the q_item has not been used, allocate a buffer */
>              q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
> -                BUFFER_SIZE, BMI_RECV);
> +                q_item->parent->buffer_size, BMI_RECV);
>              /* TODO: error handling */
>              assert(q_item->buffer);
>              q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
> @@ -784,7 +802,8 @@
>                  result_tmp->offset_list;
>              result_tmp->result.size_array =
>                  result_tmp->size_list;
> -            result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
> +            result_tmp->result.bytemax = flow_data->parent->buffer_size -
> +                bytes_processed;
>              result_tmp->result.bytes = 0;
>              result_tmp->result.segmax = MAX_REGIONS;
>              result_tmp->result.segs = 0;
> @@ -813,10 +832,10 @@
>                  tmp_buffer = (void*)((char*)tmp_buffer + old_result_tmp->result.bytes);
>                  bytes_processed += old_result_tmp->result.bytes;
>              }
> -        }while(bytes_processed < BUFFER_SIZE &&
> +        }while(bytes_processed < flow_data->parent->buffer_size &&
>              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -        assert(bytes_processed <= BUFFER_SIZE);
> +        assert(bytes_processed <= flow_data->parent->buffer_size);
>          if(bytes_processed == 0)
>          {
>              qlist_del(&q_item->list_link);
> @@ -830,7 +849,7 @@
>          ret = BMI_post_recv(&q_item->posted_id,
>              q_item->parent->src.u.bmi.address,
>              q_item->buffer,
> -            BUFFER_SIZE,
> +            flow_data->parent->buffer_size,
>              &tmp_actual_size,
>              BMI_PRE_ALLOC,
>              q_item->parent->tag,
> @@ -1065,7 +1084,7 @@
>      {
>          /* if the q_item has not been used, allocate a buffer */
>          q_item->buffer = BMI_memalloc(q_item->parent->dest.u.bmi.address,
> -            BUFFER_SIZE, BMI_SEND);
> +            q_item->parent->buffer_size, BMI_SEND);
>          /* TODO: error handling */
>          assert(q_item->buffer);
>          q_item->bmi_callback.fn = bmi_send_callback_wrapper;
> @@ -1093,7 +1112,8 @@
>              result_tmp->offset_list;
>          result_tmp->result.size_array =
>              result_tmp->size_list;
> -        result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
> +        result_tmp->result.bytemax = q_item->parent->buffer_size
> +            - bytes_processed;
>          result_tmp->result.bytes = 0;
>          result_tmp->result.segmax = MAX_REGIONS;
>          result_tmp->result.segs = 0;
> @@ -1125,10 +1145,10 @@
>              q_item->buffer_used += old_result_tmp->result.bytes;
>          }
>
> -    }while(bytes_processed < BUFFER_SIZE &&
> +    }while(bytes_processed < flow_data->parent->buffer_size &&
>          !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -    assert(bytes_processed <= BUFFER_SIZE);
> +    assert(bytes_processed <= flow_data->parent->buffer_size);
>
>      /* important to update the sequence /after/ request processed */
>      q_item->seq = flow_data->next_seq;
> @@ -1328,7 +1348,7 @@
>      {
>          /* if the q_item has not been used, allocate a buffer */
>          q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
> -            BUFFER_SIZE, BMI_RECV);
> +            q_item->parent->buffer_size, BMI_RECV);
>          /* TODO: error handling */
>          assert(q_item->buffer);
>          q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
> @@ -1360,7 +1380,8 @@
>                  result_tmp->offset_list;
>              result_tmp->result.size_array =
>                  result_tmp->size_list;
> -            result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
> +            result_tmp->result.bytemax = flow_data->parent->buffer_size
> +                - bytes_processed;
>              result_tmp->result.bytes = 0;
>              result_tmp->result.segmax = MAX_REGIONS;
>              result_tmp->result.segs = 0;
> @@ -1391,10 +1412,10 @@
>                      ((char*)tmp_buffer + old_result_tmp->result.bytes);
>                  bytes_processed += old_result_tmp->result.bytes;
>              }
> -        }while(bytes_processed < BUFFER_SIZE &&
> +        }while(bytes_processed < flow_data->parent->buffer_size &&
>              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -        assert(bytes_processed <= BUFFER_SIZE);
> +        assert(bytes_processed <= flow_data->parent->buffer_size);
>
>          flow_data->total_bytes_processed += bytes_processed;
>
> @@ -1414,7 +1435,7 @@
>          ret = BMI_post_recv(&q_item->posted_id,
>              q_item->parent->src.u.bmi.address,
>              q_item->buffer,
> -            BUFFER_SIZE,
> +            flow_data->parent->buffer_size,
>              &tmp_actual_size,
>              BMI_PRE_ALLOC,
>              q_item->parent->tag,
> @@ -1458,13 +1479,13 @@
>      if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
>          flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
>      {
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
>          {
>              if(flow_data->prealloc_array[i].buffer)
>              {
>                  BMI_memfree(flow_data->parent->src.u.bmi.address,
>                      flow_data->prealloc_array[i].buffer,
> -                    BUFFER_SIZE,
> +                    flow_data->parent->buffer_size,
>                      BMI_RECV);
>              }
>              result_tmp = &(flow_data->prealloc_array[i].result_chain);
> @@ -1481,13 +1502,13 @@
>      else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
>          flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
>      {
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
>          {
>              if(flow_data->prealloc_array[i].buffer)
>              {
>                  BMI_memfree(flow_data->parent->dest.u.bmi.address,
>                      flow_data->prealloc_array[i].buffer,
> -                    BUFFER_SIZE,
> +                    flow_data->parent->buffer_size,
>                      BMI_SEND);
>              }
>              result_tmp = &(flow_data->prealloc_array[i].result_chain);
> @@ -1507,7 +1528,7 @@
>          if(flow_data->intermediate)
>          {
>              BMI_memfree(flow_data->parent->dest.u.bmi.address,
> -                flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
> +                flow_data->intermediate, flow_data->parent->buffer_size, BMI_SEND);
>          }
>      }
>      else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
> @@ -1516,9 +1537,11 @@
>          if(flow_data->intermediate)
>          {
>              BMI_memfree(flow_data->parent->src.u.bmi.address,
> -                flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
> +                flow_data->intermediate, flow_data->parent->buffer_size, BMI_RECV);
>          }
>      }
> +
> +    free(flow_data->prealloc_array);
>  }
>
>  /* mem_to_bmi_callback()
> @@ -1574,7 +1597,7 @@
>          q_item->result_chain.offset_list;
>      q_item->result_chain.result.size_array =
>          q_item->result_chain.size_list;
> -    q_item->result_chain.result.bytemax = BUFFER_SIZE;
> +    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
>      q_item->result_chain.result.bytes = 0;
>      q_item->result_chain.result.segmax = MAX_REGIONS;
>      q_item->result_chain.result.segs = 0;
> @@ -1590,14 +1613,14 @@
>
>      /* was MAX_REGIONS enough to satisfy this step? */
>      if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
> -        q_item->result_chain.result.bytes < BUFFER_SIZE)
> +        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
>      {
>          /* create an intermediate buffer */
>          if(!flow_data->intermediate)
>          {
>              flow_data->intermediate = BMI_memalloc(
>                  flow_data->parent->dest.u.bmi.address,
> -                BUFFER_SIZE, BMI_SEND);
> +                flow_data->parent->buffer_size, BMI_SEND);
>              /* TODO: error handling */
>              assert(flow_data->intermediate);
>          }
> @@ -1615,7 +1638,7 @@
>          do
>          {
>              q_item->result_chain.result.bytemax =
> -                (BUFFER_SIZE - bytes_processed);
> +                (flow_data->parent->buffer_size - bytes_processed);
>              q_item->result_chain.result.bytes = 0;
>              q_item->result_chain.result.segmax = MAX_REGIONS;
>              q_item->result_chain.result.segs = 0;
> @@ -1638,10 +1661,10 @@
>                  memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
>                  bytes_processed += q_item->result_chain.size_list[i];
>              }
> -        }while(bytes_processed < BUFFER_SIZE &&
> +        }while(bytes_processed < flow_data->parent->buffer_size &&
>              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -        assert (bytes_processed <= BUFFER_SIZE);
> +        assert (bytes_processed <= flow_data->parent->buffer_size);
>
>          /* setup for BMI operation */
>          flow_data->tmp_buffer_list[0] = flow_data->intermediate;
> @@ -1761,7 +1784,7 @@
>          do
>          {
>              q_item->result_chain.result.bytemax =
> -                (BUFFER_SIZE - bytes_processed);
> +                (q_item->parent->buffer_size - bytes_processed);
>              q_item->result_chain.result.bytes = 0;
>              q_item->result_chain.result.segmax = MAX_REGIONS;
>              q_item->result_chain.result.segs = 0;
> @@ -1785,10 +1808,10 @@
>                  memcpy(dest_ptr, src_ptr, region_size);
>                  bytes_processed += region_size;
>              }
> -        }while(bytes_processed < BUFFER_SIZE &&
> +        }while(bytes_processed < flow_data->parent->buffer_size &&
>              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -        assert(bytes_processed <= BUFFER_SIZE);
> +        assert(bytes_processed <= flow_data->parent->buffer_size);
>      }
>
>      /* are we done? */
> @@ -1807,7 +1830,7 @@
>          q_item->result_chain.offset_list;
>      q_item->result_chain.result.size_array =
>          q_item->result_chain.size_list;
> -    q_item->result_chain.result.bytemax = BUFFER_SIZE;
> +    q_item->result_chain.result.bytemax = flow_data->parent->buffer_size;
>      q_item->result_chain.result.bytes = 0;
>      q_item->result_chain.result.segmax = MAX_REGIONS;
>      q_item->result_chain.result.segs = 0;
> @@ -1822,22 +1845,22 @@
>
>      /* was MAX_REGIONS enough to satisfy this step? */
>      if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
> -        q_item->result_chain.result.bytes < BUFFER_SIZE)
> +        q_item->result_chain.result.bytes < flow_data->parent->buffer_size)
>      {
>          /* create an intermediate buffer */
>          if(!flow_data->intermediate)
>          {
>              flow_data->intermediate = BMI_memalloc(
>                  flow_data->parent->src.u.bmi.address,
> -                BUFFER_SIZE, BMI_RECV);
> +                flow_data->parent->buffer_size, BMI_RECV);
>              /* TODO: error handling */
>              assert(flow_data->intermediate);
>          }
>          /* setup for BMI operation */
>          flow_data->tmp_buffer_list[0] = flow_data->intermediate;
>          buffer_type = BMI_PRE_ALLOC;
> -        q_item->buffer_used = BUFFER_SIZE;
> -        total_size = BUFFER_SIZE;
> +        q_item->buffer_used = flow_data->parent->buffer_size;
> +        total_size = flow_data->parent->buffer_size;
>          size_array = &q_item->buffer_used;
>          segs = 1;
>          /* we will copy data out on next iteration */
> diff -Naur pvfs2/src/server/io.sm pvfs2-new/src/server/io.sm
> --- pvfs2/src/server/io.sm	2006-06-05 21:57:28.000000000 +0200
> +++ pvfs2-new/src/server/io.sm	2006-08-02 23:27:24.000000000 +0200
> @@ -162,6 +162,7 @@
>      int err = -PVFS_EIO;
>      job_id_t tmp_id;
>      struct server_configuration_s *user_opts = get_server_config_struct();
> +    struct filesystem_configuration_s *fs_conf;
>
>      s_op->u.io.flow_d = PINT_flow_alloc();
>      if (!s_op->u.io.flow_d)
> @@ -201,6 +202,15 @@
>      s_op->u.io.flow_d->user_ptr = NULL;
>      s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
>
> +    fs_conf = PINT_config_find_fs_id(user_opts,
> +        s_op->req->u.io.fs_id);
> +    if(fs_conf)
> +    {
> +        /* pick up any buffer settings overrides from fs conf */
> +        s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
> +        s_op->u.io.flow_d->buffers_per_flow = fs_conf->fp_buffers_per_flow;
> +    }
> +
>      gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, "
>          "server_nr: %d, server_ct: %d\n",
>          lld(s_op->u.io.flow_d->file_data.fsize),
> diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/common/misc/server-config.c
> --- pvfs2/src/common/misc/server-config.c	2006-08-02 17:13:00.000000000 +0200
> +++ pvfs2-new/src/common/misc/server-config.c	2006-08-03 23:10:28.000000000 +0200
> @@ -73,6 +73,7 @@
>  static DOTCONF_CB(get_trove_sync_meta);
>  static DOTCONF_CB(get_trove_sync_data);
>  static DOTCONF_CB(get_db_cache_size_bytes);
> +static DOTCONF_CB(get_trove_max_concurrent_io);
>  static DOTCONF_CB(get_db_cache_type);
>  static DOTCONF_CB(get_param);
>  static DOTCONF_CB(get_value);
> @@ -423,6 +424,12 @@
>      {"ID",ARG_INT, get_filesystem_collid,NULL,
>          CTX_FILESYSTEM,NULL},
>
> +    /* maximum number of AIO operations that Trove will allow to run
> +     * concurrently
> +     */
> +    {"TroveMaxConcurrentIO", ARG_INT, get_trove_max_concurrent_io, NULL,
> +        CTX_DEFAULTS|CTX_GLOBAL,"16"},
> +
>      /* The gossip interface in pvfs allows users to specify different
>       * levels of logging for the pvfs server.  The output of these
>       * different log levels is written to a file, which is specified in
> @@ -742,6 +749,7 @@
>      config_s->client_job_flow_timeout = PVFS2_CLIENT_JOB_FLOW_TIMEOUT_DEFAULT;
>      config_s->client_retry_limit = PVFS2_CLIENT_RETRY_LIMIT_DEFAULT;
>      config_s->client_retry_delay_ms = PVFS2_CLIENT_RETRY_DELAY_MS_DEFAULT;
> +    config_s->trove_max_concurrent_io = 16;
>
>      if (cache_config_files(
>              config_s, global_config_filename, server_config_filename))
> @@ -1548,6 +1556,14 @@
>      return NULL;
>  }
>
> +DOTCONF_CB(get_trove_max_concurrent_io)
> +{
> +    struct server_configuration_s *config_s =
> +        (struct server_configuration_s *)cmd->context;
> +    config_s->trove_max_concurrent_io = cmd->data.value;
> +    return NULL;
> +}
> +
>  DOTCONF_CB(get_db_cache_type)
>  {
>      struct server_configuration_s *config_s =
> diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/common/misc/server-config.h
> --- pvfs2/src/common/misc/server-config.h	2006-07-13 07:11:40.000000000 +0200
> +++ pvfs2-new/src/common/misc/server-config.h	2006-08-03 23:26:23.000000000 +0200
> @@ -147,6 +147,8 @@
>                                         if zero, use defaults */
>      char * db_cache_type;
>
> +    int trove_max_concurrent_io;    /* maximum number of simultaneous I/O ops */
> +
>  } server_configuration_s;
>
>  int PINT_parse_config(
> diff -Naur pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c pvfs2-new/src/io/trove/trove-dbpf/dbpf-bstream.c
> --- pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c	2006-06-23 22:59:29.000000000 +0200
> +++ pvfs2-new/src/io/trove/trove-dbpf/dbpf-bstream.c	2006-08-03 23:05:37.000000000 +0200
> @@ -31,7 +31,7 @@
>
>  #define AIOCB_ARRAY_SZ 64
>
> -#define DBPF_MAX_IOS_IN_PROGRESS  16
> +extern int TROVE_max_concurrent_io;
>  static int s_dbpf_ios_in_progress = 0;
>  static dbpf_op_queue_p s_dbpf_io_ready_queue = NULL;
>  static gen_mutex_t s_dbpf_io_mutex = GEN_MUTEX_INITIALIZER;
> @@ -277,9 +277,6 @@
>                 (cur_op->op.type == BSTREAM_WRITE_LIST));
>          dbpf_op_queue_remove(cur_op);
>
> -        assert(s_dbpf_ios_in_progress <
> -               (DBPF_MAX_IOS_IN_PROGRESS + 1));
> -
>          gossip_debug(GOSSIP_TROVE_DEBUG, "starting delayed I/O "
>                       "operation %p (%d in progress)\n", cur_op,
>                       s_dbpf_ios_in_progress);
> @@ -363,7 +360,7 @@
>      {
>          s_dbpf_ios_in_progress--;
>      }
> -    if (s_dbpf_ios_in_progress < DBPF_MAX_IOS_IN_PROGRESS)
> +    if (s_dbpf_ios_in_progress < TROVE_max_concurrent_io)
>      {
>          s_dbpf_ios_in_progress++;
>      }
> diff -Naur pvfs2/src/io/trove/trove.c pvfs2-new/src/io/trove/trove.c
> --- pvfs2/src/io/trove/trove.c	2006-06-16 23:01:13.000000000 +0200
> +++ pvfs2-new/src/io/trove/trove.c	2006-08-03 23:08:08.000000000 +0200
> @@ -31,6 +31,7 @@
>
>  int TROVE_db_cache_size_bytes = 0;
>  int TROVE_shm_key_hint = 0;
> +int TROVE_max_concurrent_io = 16;
>
>  /** Initiate reading from a contiguous region in a bstream into a
>   *  contiguous region in memory.
> @@ -964,6 +965,11 @@
>          TROVE_shm_key_hint = *((int*)parameter);
>  	return(0);
>      }
> +    if(option == TROVE_MAX_CONCURRENT_IO)
> +    {
> +        TROVE_max_concurrent_io = *((int*)parameter);
> +	return(0);
> +    }
>
>      method_id = map_coll_id_to_method(coll_id);
>      if (method_id < 0) {
> diff -Naur pvfs2/src/io/trove/trove.h pvfs2-new/src/io/trove/trove.h
> --- pvfs2/src/io/trove/trove.h	2006-07-13 07:11:41.000000000 +0200
> +++ pvfs2-new/src/io/trove/trove.h	2006-08-03 23:07:37.000000000 +0200
> @@ -72,6 +72,7 @@
>      TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS,
>      TROVE_COLLECTION_ATTR_CACHE_INITIALIZE,
>      TROVE_DB_CACHE_SIZE_BYTES,
> +    TROVE_MAX_CONCURRENT_IO,
>      TROVE_COLLECTION_COALESCING_HIGH_WATERMARK,
>      TROVE_COLLECTION_COALESCING_LOW_WATERMARK,
>      TROVE_COLLECTION_META_SYNC_MODE,
> diff -Naur pvfs2/src/server/pvfs2-server.c pvfs2-new/src/server/pvfs2-server.c
> --- pvfs2/src/server/pvfs2-server.c	2006-07-13 07:11:42.000000000 +0200
> +++ pvfs2-new/src/server/pvfs2-server.c	2006-08-03 23:07:01.000000000 +0200
> @@ -950,6 +950,10 @@
>                                      &server_config.db_cache_size_bytes);
>      /* this should never fail */
>      assert(ret == 0);
> +    ret = trove_collection_setinfo(0, 0, TROVE_MAX_CONCURRENT_IO,
> +        &server_config.trove_max_concurrent_io);
> +    /* this should never fail */
> +    assert(ret == 0);
>
>      /* parse port number and allow trove to use it to help differentiate
>       * shmem regions if needed
> _______________________________________________
> Pvfs2-developers mailing list
> Pvfs2-developers at beowulf-underground.org
> http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers


