[Pvfs2-developers] patches: tuning options

Sam Lang slang at mcs.anl.gov
Tue Aug 15 16:28:56 EDT 2006


Phil,

I just went ahead and committed these changes to trunk.  Thanks for  
the patches!

-sam

On Aug 10, 2006, at 4:04 PM, Phil Carns wrote:

> flow-proto-tuning.patch:
> -----------
> This patch adds "FlowBufferSizeBytes" and "FlowBuffersPerFlow"  
> options to the configuration file format.  They allow you to  
> specify the buffer size that the default flow protocol will use as  
> well as the maximum number of buffers to use per flow.  Note that  
> if you change either of these parameters, then you need to remount  
> any active clients so that they pick up the configuration change  
> before performing any I/O.
>
> max-aio.patch:
> ----------
> This patch adds "TroveMaxConcurrentIO" to the configuration file  
> format.  It allows you to specify the maximum number of I/O  
> operations that trove will allow to proceed concurrently (the  
> default is 16, which was previously hard-coded).  As noted in the  
> previous email regarding AIO, depending on your access pattern AIO  
> may queue all of your operations anyway, regardless of this  
> setting.  It probably doesn't have much effect unless you are  
> accessing more than one file at a time, or you are using an  
> alternative to the stock AIO implementation.
>
> As a side note, I generally do not include any pvfs2-genconfig  
> changes with this type of patch, but I can make them available  
> if anyone is interested.  We typically add command line arguments  
> to pvfs2-genconfig for every parameter so that we can reproduce  
> consistent configuration files by calling pvfs2-genconfig from a  
> script, but I don't know if this is generally useful or just adds  
> clutter :)
>
> -Phil
> diff -Naur pvfs2/src/client/sysint/sys-io.sm pvfs2-new/src/client/ 
> sysint/sys-io.sm
> --- pvfs2/src/client/sysint/sys-io.sm	2006-06-16 19:22:47.000000000  
> +0200
> +++ pvfs2-new/src/client/sysint/sys-io.sm	2006-08-02  
> 23:26:24.000000000 +0200
> @@ -1639,6 +1639,7 @@
>      PVFS_object_attr *attr = NULL;
>      struct server_configuration_s *server_config = NULL;
>      unsigned long status_user_tag = 0;
> +    struct filesystem_configuration_s * fs_config;
>
>      gossip_debug(GOSSIP_IO_DEBUG, "- build_context_flow called\n");
>
> @@ -1703,7 +1704,16 @@
>
>      status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);
>
> -    server_config = PINT_get_server_config_struct(sm_p- 
> >object_ref.fs_id);
> +    server_config = PINT_get_server_config_struct(sm_p- 
> >object_ref.fs_id);
> +
> +    fs_config = PINT_config_find_fs_id(server_config, cur_ctx- 
> >msg.fs_id);
> +    if(fs_config)
> +    {
> +        /* pick up any buffer settings overrides from fs conf */
> +        cur_ctx->flow_desc.buffer_size = fs_config->fp_buffer_size;
> +        cur_ctx->flow_desc.buffers_per_flow = fs_config- 
> >fp_buffers_per_flow;
> +    }
> +
>      ret = job_flow(
>          &cur_ctx->flow_desc, sm_p, status_user_tag,
>          &cur_ctx->flow_status, &cur_ctx->flow_job_id,
> diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/ 
> common/misc/server-config.c
> --- pvfs2/src/common/misc/server-config.c	2006-08-02  
> 17:13:00.000000000 +0200
> +++ pvfs2-new/src/common/misc/server-config.c	2006-08-02  
> 23:29:17.000000000 +0200
> @@ -67,6 +67,8 @@
>  static DOTCONF_CB(get_bmi_module_list);
>  static DOTCONF_CB(get_flow_module_list);
>  static DOTCONF_CB(get_handle_recycle_timeout_seconds);
> +static DOTCONF_CB(get_flow_buffer_size_bytes);
> +static DOTCONF_CB(get_flow_buffers_per_flow);
>  static DOTCONF_CB(get_attr_cache_keywords_list);
>  static DOTCONF_CB(get_attr_cache_size);
>  static DOTCONF_CB(get_attr_cache_max_num_elems);
> @@ -573,6 +575,14 @@
>      {"FlowModules",ARG_LIST, get_flow_module_list,NULL,
>          CTX_DEFAULTS|CTX_GLOBAL,"flowproto_multiqueue,"},
>
> +    /* buffer size to use for bulk data transfers */
> +    {"FlowBufferSizeBytes", ARG_INT,
> +         get_flow_buffer_size_bytes, NULL, CTX_FILESYSTEM,"262144"},
> +
> +    /* number of buffers to use for bulk data transfers */
> +    {"FlowBuffersPerFlow", ARG_INT,
> +         get_flow_buffers_per_flow, NULL, CTX_FILESYSTEM,"8"},
> +
>      /* The TROVE storage layer has a management component that  
> deals with
>       * allocating handle values for new metafiles and datafiles.   
> The underlying
>       * trove module can be given a hint to tell it how long to  
> wait before
> @@ -979,6 +989,8 @@
>      fs_conf->encoding = ENCODING_DEFAULT;
>      fs_conf->trove_sync_meta = TROVE_SYNC;
>      fs_conf->trove_sync_data = TROVE_SYNC;
> +    fs_conf->fp_buffer_size = -1;
> +    fs_conf->fp_buffers_per_flow = -1;
>
>      if (!config_s->file_systems)
>      {
> @@ -1382,6 +1394,31 @@
>  }
>
>
> +DOTCONF_CB(get_flow_buffer_size_bytes)
> +{
> +    struct filesystem_configuration_s *fs_conf = NULL;
> +    struct server_configuration_s *config_s =
> +        (struct server_configuration_s *)cmd->context;
> +
> +    fs_conf = (struct filesystem_configuration_s *)
> +        PINT_llist_head(config_s->file_systems);
> +    fs_conf->fp_buffer_size = cmd->data.value;
> +    return NULL;
> +}
> +
> +DOTCONF_CB(get_flow_buffers_per_flow)
> +{
> +    struct filesystem_configuration_s *fs_conf = NULL;
> +    struct server_configuration_s *config_s =
> +        (struct server_configuration_s *)cmd->context;
> +
> +    fs_conf = (struct filesystem_configuration_s *)
> +        PINT_llist_head(config_s->file_systems);
> +    fs_conf->fp_buffers_per_flow = cmd->data.value;
> +
> +    return NULL;
> +}
> +
>  DOTCONF_CB(get_attr_cache_keywords_list)
>  {
>      int i = 0, len = 0;
> @@ -2298,6 +2335,9 @@
>              src_fs->attr_cache_max_num_elems;
>          dest_fs->trove_sync_meta = src_fs->trove_sync_meta;
>          dest_fs->trove_sync_data = src_fs->trove_sync_data;
> +
> +        dest_fs->fp_buffer_size = src_fs->fp_buffer_size;
> +        dest_fs->fp_buffers_per_flow = src_fs->fp_buffers_per_flow;
>      }
>  }
>
> diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/ 
> common/misc/server-config.h
> --- pvfs2/src/common/misc/server-config.h	2006-07-13  
> 07:11:40.000000000 +0200
> +++ pvfs2-new/src/common/misc/server-config.h	2006-08-02  
> 23:28:17.000000000 +0200
> @@ -85,6 +85,9 @@
>      int coalescing_high_watermark;
>      int coalescing_low_watermark;
>
> +    int fp_buffer_size;
> +    int fp_buffers_per_flow;
> +
>  } filesystem_configuration_s;
>
>  typedef struct distribution_param_configuration_s
> diff -Naur pvfs2/src/io/flow/flow.c pvfs2-new/src/io/flow/flow.c
> --- pvfs2/src/io/flow/flow.c	2006-06-28 23:03:08.000000000 +0200
> +++ pvfs2-new/src/io/flow/flow.c	2006-08-02 23:26:24.000000000 +0200
> @@ -306,6 +306,8 @@
>      flow_d->aggregate_size = -1;
>      flow_d->state = FLOW_INITIAL;
>      flow_d->type = FLOWPROTO_DEFAULT;
> +    flow_d->buffers_per_flow = -1;
> +    flow_d->buffer_size = -1;
>
>      flow_d->flow_mutex = (tmp_mutex ? tmp_mutex : gen_mutex_build());
>      assert(flow_d->flow_mutex);
> diff -Naur pvfs2/src/io/flow/flow.h pvfs2-new/src/io/flow/flow.h
> --- pvfs2/src/io/flow/flow.h	2005-12-14 22:50:25.000000000 +0100
> +++ pvfs2-new/src/io/flow/flow.h	2006-08-02 23:26:24.000000000 +0200
> @@ -120,6 +120,10 @@
>      /* information about the datafile that this flow will access */
>      PINT_request_file_data file_data;
>
> +    /* the buffer settings may be ignored by some protocols */
> +    int buffer_size;            /* buffer size to use */
> +    int buffers_per_flow;       /* number of buffers to allow per  
> flow */
> +
>  	/***********************************************************/
>      /* fields that can be read publicly upon completion */
>
> diff -Naur pvfs2/src/io/flow/flowproto-bmi-trove/flowproto- 
> multiqueue.c pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto- 
> multiqueue.c
> --- pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c	 
> 2006-05-28 18:52:08.000000000 +0200
> +++ pvfs2-new/src/io/flow/flowproto-bmi-trove/flowproto- 
> multiqueue.c	2006-08-02 23:26:24.000000000 +0200
> @@ -22,8 +22,12 @@
>  #include "pint-perf-counter.h"
>  #include "pvfs2-internal.h"
>
> +/* the following buffer settings are used by default if none are  
> specified in
> + * the flow descriptor
> + */
>  #define BUFFERS_PER_FLOW 8
>  #define BUFFER_SIZE (256*1024)
> +
>  #define MAX_REGIONS 64
>
>  #define FLOW_CLEANUP 
> (__flow_data)                                     \
> @@ -72,7 +76,7 @@
>  struct fp_private_data
>  {
>      flow_descriptor *parent;
> -    struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
> +    struct fp_queue_item* prealloc_array;
>      struct qlist_head list_link;
>      PVFS_size total_bytes_processed;
>      int next_seq;
> @@ -540,7 +544,21 @@
>              PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
>      }
>
> -    for(i=0; i<BUFFERS_PER_FLOW; i++)
> +    if(flow_d->buffer_size < 1)
> +        flow_d->buffer_size = BUFFER_SIZE;
> +    if(flow_d->buffers_per_flow < 1)
> +        flow_d->buffers_per_flow = BUFFERS_PER_FLOW;
> +
> +    flow_data->prealloc_array = (struct fp_queue_item*)
> +        malloc(flow_d->buffers_per_flow*sizeof(struct  
> fp_queue_item));
> +    if(!flow_data->prealloc_array)
> +    {
> +        free(flow_data);
> +        return(-PVFS_ENOMEM);
> +    }
> +    memset(flow_data->prealloc_array, 0,
> +        flow_d->buffers_per_flow*sizeof(struct fp_queue_item));
> +    for(i=0; i<flow_d->buffers_per_flow; i++)
>      {
>          flow_data->prealloc_array[i].parent = flow_d;
>          flow_data->prealloc_array[i].bmi_callback.data =
> @@ -557,7 +575,7 @@
>          /* put all of the buffers on empty list, we don't really  
> do any
>           * queueing for this type of flow
>           */
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_d->buffers_per_flow; i++)
>          {
>              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
>                  &flow_data->empty_list);
> @@ -583,7 +601,7 @@
>          /* put all of the buffers on empty list, we don't really  
> do any
>           * queueing for this type of flow
>           */
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_d->buffers_per_flow; i++)
>          {
>              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
>                  &flow_data->empty_list);
> @@ -604,9 +622,9 @@
>      else if(flow_d->src.endpoint_id == TROVE_ENDPOINT &&
>          flow_d->dest.endpoint_id == BMI_ENDPOINT)
>      {
> -        flow_data->initial_posts = BUFFERS_PER_FLOW;
> +        flow_data->initial_posts = flow_d->buffers_per_flow;
>          gen_mutex_lock(flow_data->parent->flow_mutex);
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_d->buffers_per_flow; i++)
>          {
>              gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
>                  "flowproto-multiqueue forcing bmi_send_callback_fn. 
> \n");
> @@ -635,7 +653,7 @@
>          flow_data->initial_posts = 1;
>
>          /* place remaining buffers on "empty" queue */
> -        for(i=1; i<BUFFERS_PER_FLOW; i++)
> +        for(i=1; i<flow_d->buffers_per_flow; i++)
>          {
>              qlist_add_tail(&flow_data->prealloc_array[i].list_link,
>                  &flow_data->empty_list);
> @@ -760,7 +778,7 @@
>          {
>              /* if the q_item has not been used, allocate a buffer */
>              q_item->buffer = BMI_memalloc(q_item->parent- 
> >src.u.bmi.address,
> -                BUFFER_SIZE, BMI_RECV);
> +                q_item->parent->buffer_size, BMI_RECV);
>              /* TODO: error handling */
>              assert(q_item->buffer);
>              q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
> @@ -784,7 +802,8 @@
>                  result_tmp->offset_list;
>              result_tmp->result.size_array =
>                  result_tmp->size_list;
> -            result_tmp->result.bytemax = BUFFER_SIZE -  
> bytes_processed;
> +            result_tmp->result.bytemax = flow_data->parent- 
> >buffer_size -
> +                bytes_processed;
>              result_tmp->result.bytes = 0;
>              result_tmp->result.segmax = MAX_REGIONS;
>              result_tmp->result.segs = 0;
> @@ -813,10 +832,10 @@
>                  tmp_buffer = (void*)((char*)tmp_buffer +  
> old_result_tmp->result.bytes);
>                  bytes_processed += old_result_tmp->result.bytes;
>              }
> -        }while(bytes_processed < BUFFER_SIZE &&
> +        }while(bytes_processed < flow_data->parent->buffer_size &&
>              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -        assert(bytes_processed <= BUFFER_SIZE);
> +        assert(bytes_processed <= flow_data->parent->buffer_size);
>          if(bytes_processed == 0)
>          {
>              qlist_del(&q_item->list_link);
> @@ -830,7 +849,7 @@
>          ret = BMI_post_recv(&q_item->posted_id,
>              q_item->parent->src.u.bmi.address,
>              q_item->buffer,
> -            BUFFER_SIZE,
> +            flow_data->parent->buffer_size,
>              &tmp_actual_size,
>              BMI_PRE_ALLOC,
>              q_item->parent->tag,
> @@ -1065,7 +1084,7 @@
>      {
>          /* if the q_item has not been used, allocate a buffer */
>          q_item->buffer = BMI_memalloc(q_item->parent- 
> >dest.u.bmi.address,
> -            BUFFER_SIZE, BMI_SEND);
> +            q_item->parent->buffer_size, BMI_SEND);
>          /* TODO: error handling */
>          assert(q_item->buffer);
>          q_item->bmi_callback.fn = bmi_send_callback_wrapper;
> @@ -1093,7 +1112,8 @@
>              result_tmp->offset_list;
>          result_tmp->result.size_array =
>              result_tmp->size_list;
> -        result_tmp->result.bytemax = BUFFER_SIZE - bytes_processed;
> +        result_tmp->result.bytemax = q_item->parent->buffer_size
> +            - bytes_processed;
>          result_tmp->result.bytes = 0;
>          result_tmp->result.segmax = MAX_REGIONS;
>          result_tmp->result.segs = 0;
> @@ -1125,10 +1145,10 @@
>              q_item->buffer_used += old_result_tmp->result.bytes;
>          }
>
> -    }while(bytes_processed < BUFFER_SIZE &&
> +    }while(bytes_processed < flow_data->parent->buffer_size &&
>          !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -    assert(bytes_processed <= BUFFER_SIZE);
> +    assert(bytes_processed <= flow_data->parent->buffer_size);
>
>      /* important to update the sequence /after/ request processed */
>      q_item->seq = flow_data->next_seq;
> @@ -1328,7 +1348,7 @@
>      {
>          /* if the q_item has not been used, allocate a buffer */
>          q_item->buffer = BMI_memalloc(q_item->parent- 
> >src.u.bmi.address,
> -            BUFFER_SIZE, BMI_RECV);
> +            q_item->parent->buffer_size, BMI_RECV);
>          /* TODO: error handling */
>          assert(q_item->buffer);
>          q_item->bmi_callback.fn = bmi_recv_callback_wrapper;
> @@ -1360,7 +1380,8 @@
>                  result_tmp->offset_list;
>              result_tmp->result.size_array =
>                  result_tmp->size_list;
> -            result_tmp->result.bytemax = BUFFER_SIZE -  
> bytes_processed;
> +            result_tmp->result.bytemax = flow_data->parent- 
> >buffer_size
> +                - bytes_processed;
>              result_tmp->result.bytes = 0;
>              result_tmp->result.segmax = MAX_REGIONS;
>              result_tmp->result.segs = 0;
> @@ -1391,10 +1412,10 @@
>                      ((char*)tmp_buffer + old_result_tmp- 
> >result.bytes);
>                  bytes_processed += old_result_tmp->result.bytes;
>              }
> -        }while(bytes_processed < BUFFER_SIZE &&
> +        }while(bytes_processed < flow_data->parent->buffer_size &&
>              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -        assert(bytes_processed <= BUFFER_SIZE);
> +        assert(bytes_processed <= flow_data->parent->buffer_size);
>
>          flow_data->total_bytes_processed += bytes_processed;
>
> @@ -1414,7 +1435,7 @@
>          ret = BMI_post_recv(&q_item->posted_id,
>              q_item->parent->src.u.bmi.address,
>              q_item->buffer,
> -            BUFFER_SIZE,
> +            flow_data->parent->buffer_size,
>              &tmp_actual_size,
>              BMI_PRE_ALLOC,
>              q_item->parent->tag,
> @@ -1458,13 +1479,13 @@
>      if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
>          flow_data->parent->dest.endpoint_id == TROVE_ENDPOINT)
>      {
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
>          {
>              if(flow_data->prealloc_array[i].buffer)
>              {
>                  BMI_memfree(flow_data->parent->src.u.bmi.address,
>                      flow_data->prealloc_array[i].buffer,
> -                    BUFFER_SIZE,
> +                    flow_data->parent->buffer_size,
>                      BMI_RECV);
>              }
>              result_tmp = &(flow_data->prealloc_array 
> [i].result_chain);
> @@ -1481,13 +1502,13 @@
>      else if(flow_data->parent->src.endpoint_id == TROVE_ENDPOINT &&
>          flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
>      {
> -        for(i=0; i<BUFFERS_PER_FLOW; i++)
> +        for(i=0; i<flow_data->parent->buffers_per_flow; i++)
>          {
>              if(flow_data->prealloc_array[i].buffer)
>              {
>                  BMI_memfree(flow_data->parent->dest.u.bmi.address,
>                      flow_data->prealloc_array[i].buffer,
> -                    BUFFER_SIZE,
> +                    flow_data->parent->buffer_size,
>                      BMI_SEND);
>              }
>              result_tmp = &(flow_data->prealloc_array 
> [i].result_chain);
> @@ -1507,7 +1528,7 @@
>          if(flow_data->intermediate)
>          {
>              BMI_memfree(flow_data->parent->dest.u.bmi.address,
> -                flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
> +                flow_data->intermediate, flow_data->parent- 
> >buffer_size, BMI_SEND);
>          }
>      }
>      else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
> @@ -1516,9 +1537,11 @@
>          if(flow_data->intermediate)
>          {
>              BMI_memfree(flow_data->parent->src.u.bmi.address,
> -                flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
> +                flow_data->intermediate, flow_data->parent- 
> >buffer_size, BMI_RECV);
>          }
>      }
> +
> +    free(flow_data->prealloc_array);
>  }
>
>  /* mem_to_bmi_callback()
> @@ -1574,7 +1597,7 @@
>          q_item->result_chain.offset_list;
>      q_item->result_chain.result.size_array =
>          q_item->result_chain.size_list;
> -    q_item->result_chain.result.bytemax = BUFFER_SIZE;
> +    q_item->result_chain.result.bytemax = flow_data->parent- 
> >buffer_size;
>      q_item->result_chain.result.bytes = 0;
>      q_item->result_chain.result.segmax = MAX_REGIONS;
>      q_item->result_chain.result.segs = 0;
> @@ -1590,14 +1613,14 @@
>
>      /* was MAX_REGIONS enough to satisfy this step? */
>      if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
> -        q_item->result_chain.result.bytes < BUFFER_SIZE)
> +        q_item->result_chain.result.bytes < flow_data->parent- 
> >buffer_size)
>      {
>          /* create an intermediate buffer */
>          if(!flow_data->intermediate)
>          {
>              flow_data->intermediate = BMI_memalloc(
>                  flow_data->parent->dest.u.bmi.address,
> -                BUFFER_SIZE, BMI_SEND);
> +                flow_data->parent->buffer_size, BMI_SEND);
>              /* TODO: error handling */
>              assert(flow_data->intermediate);
>          }
> @@ -1615,7 +1638,7 @@
>          do
>          {
>              q_item->result_chain.result.bytemax =
> -                (BUFFER_SIZE - bytes_processed);
> +                (flow_data->parent->buffer_size - bytes_processed);
>              q_item->result_chain.result.bytes = 0;
>              q_item->result_chain.result.segmax = MAX_REGIONS;
>              q_item->result_chain.result.segs = 0;
> @@ -1638,10 +1661,10 @@
>                  memcpy(dest_ptr, src_ptr, q_item- 
> >result_chain.size_list[i]);
>                  bytes_processed += q_item->result_chain.size_list[i];
>              }
> -        }while(bytes_processed < BUFFER_SIZE &&
> +        }while(bytes_processed < flow_data->parent->buffer_size &&
>              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -        assert (bytes_processed <= BUFFER_SIZE);
> +        assert (bytes_processed <= flow_data->parent->buffer_size);
>
>          /* setup for BMI operation */
>          flow_data->tmp_buffer_list[0] = flow_data->intermediate;
> @@ -1761,7 +1784,7 @@
>          do
>          {
>              q_item->result_chain.result.bytemax =
> -                (BUFFER_SIZE - bytes_processed);
> +                (q_item->parent->buffer_size - bytes_processed);
>              q_item->result_chain.result.bytes = 0;
>              q_item->result_chain.result.segmax = MAX_REGIONS;
>              q_item->result_chain.result.segs = 0;
> @@ -1785,10 +1808,10 @@
>                  memcpy(dest_ptr, src_ptr, region_size);
>                  bytes_processed += region_size;
>              }
> -        }while(bytes_processed < BUFFER_SIZE &&
> +        }while(bytes_processed < flow_data->parent->buffer_size &&
>              !PINT_REQUEST_DONE(q_item->parent->file_req_state));
>
> -        assert(bytes_processed <= BUFFER_SIZE);
> +        assert(bytes_processed <= flow_data->parent->buffer_size);
>      }
>
>      /* are we done? */
> @@ -1807,7 +1830,7 @@
>          q_item->result_chain.offset_list;
>      q_item->result_chain.result.size_array =
>          q_item->result_chain.size_list;
> -    q_item->result_chain.result.bytemax = BUFFER_SIZE;
> +    q_item->result_chain.result.bytemax = flow_data->parent- 
> >buffer_size;
>      q_item->result_chain.result.bytes = 0;
>      q_item->result_chain.result.segmax = MAX_REGIONS;
>      q_item->result_chain.result.segs = 0;
> @@ -1822,22 +1845,22 @@
>
>      /* was MAX_REGIONS enough to satisfy this step? */
>      if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
> -        q_item->result_chain.result.bytes < BUFFER_SIZE)
> +        q_item->result_chain.result.bytes < flow_data->parent- 
> >buffer_size)
>      {
>          /* create an intermediate buffer */
>          if(!flow_data->intermediate)
>          {
>              flow_data->intermediate = BMI_memalloc(
>                  flow_data->parent->src.u.bmi.address,
> -                BUFFER_SIZE, BMI_RECV);
> +                flow_data->parent->buffer_size, BMI_RECV);
>              /* TODO: error handling */
>              assert(flow_data->intermediate);
>          }
>          /* setup for BMI operation */
>          flow_data->tmp_buffer_list[0] = flow_data->intermediate;
>          buffer_type = BMI_PRE_ALLOC;
> -        q_item->buffer_used = BUFFER_SIZE;
> -        total_size = BUFFER_SIZE;
> +        q_item->buffer_used = flow_data->parent->buffer_size;
> +        total_size = flow_data->parent->buffer_size;
>          size_array = &q_item->buffer_used;
>          segs = 1;
>          /* we will copy data out on next iteration */
> diff -Naur pvfs2/src/server/io.sm pvfs2-new/src/server/io.sm
> --- pvfs2/src/server/io.sm	2006-06-05 21:57:28.000000000 +0200
> +++ pvfs2-new/src/server/io.sm	2006-08-02 23:27:24.000000000 +0200
> @@ -162,6 +162,7 @@
>      int err = -PVFS_EIO;
>      job_id_t tmp_id;
>      struct server_configuration_s *user_opts =  
> get_server_config_struct();
> +    struct filesystem_configuration_s *fs_conf;
>
>      s_op->u.io.flow_d = PINT_flow_alloc();
>      if (!s_op->u.io.flow_d)
> @@ -201,6 +202,15 @@
>      s_op->u.io.flow_d->user_ptr = NULL;
>      s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
>
> +    fs_conf = PINT_config_find_fs_id(user_opts,
> +        s_op->req->u.io.fs_id);
> +    if(fs_conf)
> +    {
> +        /* pick up any buffer settings overrides from fs conf */
> +        s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
> +        s_op->u.io.flow_d->buffers_per_flow = fs_conf- 
> >fp_buffers_per_flow;
> +    }
> +
>      gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, "
>          "server_nr: %d, server_ct: %d\n",
>          lld(s_op->u.io.flow_d->file_data.fsize),
> diff -Naur pvfs2/src/common/misc/server-config.c pvfs2-new/src/ 
> common/misc/server-config.c
> --- pvfs2/src/common/misc/server-config.c	2006-08-02  
> 17:13:00.000000000 +0200
> +++ pvfs2-new/src/common/misc/server-config.c	2006-08-03  
> 23:10:28.000000000 +0200
> @@ -73,6 +73,7 @@
>  static DOTCONF_CB(get_trove_sync_meta);
>  static DOTCONF_CB(get_trove_sync_data);
>  static DOTCONF_CB(get_db_cache_size_bytes);
> +static DOTCONF_CB(get_trove_max_concurrent_io);
>  static DOTCONF_CB(get_db_cache_type);
>  static DOTCONF_CB(get_param);
>  static DOTCONF_CB(get_value);
> @@ -423,6 +424,12 @@
>      {"ID",ARG_INT, get_filesystem_collid,NULL,
>          CTX_FILESYSTEM,NULL},
>
> +    /* maximum number of AIO operations that Trove will allow to run
> +     * concurrently
> +     */
> +    {"TroveMaxConcurrentIO", ARG_INT, get_trove_max_concurrent_io,  
> NULL,
> +        CTX_DEFAULTS|CTX_GLOBAL,"16"},
> +
>      /* The gossip interface in pvfs allows users to specify different
>       * levels of logging for the pvfs server.  The output of these
>       * different log levels is written to a file, which is  
> specified in
> @@ -742,6 +749,7 @@
>      config_s->client_job_flow_timeout =  
> PVFS2_CLIENT_JOB_FLOW_TIMEOUT_DEFAULT;
>      config_s->client_retry_limit = PVFS2_CLIENT_RETRY_LIMIT_DEFAULT;
>      config_s->client_retry_delay_ms =  
> PVFS2_CLIENT_RETRY_DELAY_MS_DEFAULT;
> +    config_s->trove_max_concurrent_io = 16;
>
>      if (cache_config_files(
>              config_s, global_config_filename,  
> server_config_filename))
> @@ -1548,6 +1556,14 @@
>      return NULL;
>  }
>
> +DOTCONF_CB(get_trove_max_concurrent_io)
> +{
> +    struct server_configuration_s *config_s =
> +        (struct server_configuration_s *)cmd->context;
> +    config_s->trove_max_concurrent_io = cmd->data.value;
> +    return NULL;
> +}
> +
>  DOTCONF_CB(get_db_cache_type)
>  {
>      struct server_configuration_s *config_s =
> diff -Naur pvfs2/src/common/misc/server-config.h pvfs2-new/src/ 
> common/misc/server-config.h
> --- pvfs2/src/common/misc/server-config.h	2006-07-13  
> 07:11:40.000000000 +0200
> +++ pvfs2-new/src/common/misc/server-config.h	2006-08-03  
> 23:26:23.000000000 +0200
> @@ -147,6 +147,8 @@
>                                         if zero, use defaults */
>      char * db_cache_type;
>
> +    int trove_max_concurrent_io;    /* maximum number of  
> simultaneous I/O ops */
> +
>  } server_configuration_s;
>
>  int PINT_parse_config(
> diff -Naur pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c pvfs2-new/ 
> src/io/trove/trove-dbpf/dbpf-bstream.c
> --- pvfs2/src/io/trove/trove-dbpf/dbpf-bstream.c	2006-06-23  
> 22:59:29.000000000 +0200
> +++ pvfs2-new/src/io/trove/trove-dbpf/dbpf-bstream.c	2006-08-03  
> 23:05:37.000000000 +0200
> @@ -31,7 +31,7 @@
>
>  #define AIOCB_ARRAY_SZ 64
>
> -#define DBPF_MAX_IOS_IN_PROGRESS  16
> +extern int TROVE_max_concurrent_io;
>  static int s_dbpf_ios_in_progress = 0;
>  static dbpf_op_queue_p s_dbpf_io_ready_queue = NULL;
>  static gen_mutex_t s_dbpf_io_mutex = GEN_MUTEX_INITIALIZER;
> @@ -277,9 +277,6 @@
>                 (cur_op->op.type == BSTREAM_WRITE_LIST));
>          dbpf_op_queue_remove(cur_op);
>
> -        assert(s_dbpf_ios_in_progress <
> -               (DBPF_MAX_IOS_IN_PROGRESS + 1));
> -
>          gossip_debug(GOSSIP_TROVE_DEBUG, "starting delayed I/O "
>                       "operation %p (%d in progress)\n", cur_op,
>                       s_dbpf_ios_in_progress);
> @@ -363,7 +360,7 @@
>      {
>          s_dbpf_ios_in_progress--;
>      }
> -    if (s_dbpf_ios_in_progress < DBPF_MAX_IOS_IN_PROGRESS)
> +    if (s_dbpf_ios_in_progress < TROVE_max_concurrent_io)
>      {
>          s_dbpf_ios_in_progress++;
>      }
> diff -Naur pvfs2/src/io/trove/trove.c pvfs2-new/src/io/trove/trove.c
> --- pvfs2/src/io/trove/trove.c	2006-06-16 23:01:13.000000000 +0200
> +++ pvfs2-new/src/io/trove/trove.c	2006-08-03 23:08:08.000000000 +0200
> @@ -31,6 +31,7 @@
>
>  int TROVE_db_cache_size_bytes = 0;
>  int TROVE_shm_key_hint = 0;
> +int TROVE_max_concurrent_io = 16;
>
>  /** Initiate reading from a contiguous region in a bstream into a
>   *  contiguous region in memory.
> @@ -964,6 +965,11 @@
>          TROVE_shm_key_hint = *((int*)parameter);
>  	return(0);
>      }
> +    if(option == TROVE_MAX_CONCURRENT_IO)
> +    {
> +        TROVE_max_concurrent_io = *((int*)parameter);
> +	return(0);
> +    }
>
>      method_id = map_coll_id_to_method(coll_id);
>      if (method_id < 0) {
> diff -Naur pvfs2/src/io/trove/trove.h pvfs2-new/src/io/trove/trove.h
> --- pvfs2/src/io/trove/trove.h	2006-07-13 07:11:41.000000000 +0200
> +++ pvfs2-new/src/io/trove/trove.h	2006-08-03 23:07:37.000000000 +0200
> @@ -72,6 +72,7 @@
>      TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS,
>      TROVE_COLLECTION_ATTR_CACHE_INITIALIZE,
>      TROVE_DB_CACHE_SIZE_BYTES,
> +    TROVE_MAX_CONCURRENT_IO,
>      TROVE_COLLECTION_COALESCING_HIGH_WATERMARK,
>      TROVE_COLLECTION_COALESCING_LOW_WATERMARK,
>      TROVE_COLLECTION_META_SYNC_MODE,
> diff -Naur pvfs2/src/server/pvfs2-server.c pvfs2-new/src/server/ 
> pvfs2-server.c
> --- pvfs2/src/server/pvfs2-server.c	2006-07-13 07:11:42.000000000  
> +0200
> +++ pvfs2-new/src/server/pvfs2-server.c	2006-08-03  
> 23:07:01.000000000 +0200
> @@ -950,6 +950,10 @@
>                                      
> &server_config.db_cache_size_bytes);
>      /* this should never fail */
>      assert(ret == 0);
> +    ret = trove_collection_setinfo(0, 0, TROVE_MAX_CONCURRENT_IO,
> +        &server_config.trove_max_concurrent_io);
> +    /* this should never fail */
> +    assert(ret == 0);
>
>      /* parse port number and allow trove to use it to help  
> differentiate
>       * shmem regions if needed
> _______________________________________________
> Pvfs2-developers mailing list
> Pvfs2-developers at beowulf-underground.org
> http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers



More information about the Pvfs2-developers mailing list