[PVFS2-CVS]
commit by wujs in pvfs2/src/io/flow/flowproto-bmi-cache:
flowproto-bmi-cache-client.c
CVS commit program
cvs at parl.clemson.edu
Tue Mar 23 13:51:32 EST 2004
Update of /projects/cvsroot/pvfs2/src/io/flow/flowproto-bmi-cache
In directory parlweb:/tmp/cvs-serv6296/src/io/flow/flowproto-bmi-cache
Removed Files:
flowproto-bmi-cache-client.c
Log Message:
remove flowproto-bmi-cache-client.c. At this moment, the client
still uses "flowproto_multiqueue", no matter what flow protocol is
on the server side.
--- flowproto-bmi-cache-client.c 2004-03-23 13:51:32.000000000 -0500
+++ /dev/null 2003-01-30 05:24:37.000000000 -0500
@@ -1,667 +0,0 @@
-/*
- * (C) 2001 Clemson University and The University of Chicago
- *
- * See COPYING in top-level directory.
- */
-
-#include <errno.h>
-#include <assert.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/time.h>
-#include <unistd.h>
-
-#include "gossip.h"
-#include "quicklist.h"
-#include "flow.h"
-#include "flowproto-support.h"
-#include "gen-locks.h"
-#include "bmi.h"
-#include "trove.h"
-#include "thread-mgr.h"
-#include "pint-perf-counter.h"
-
-#define BUFFERS_PER_FLOW 8
-#define BUFFER_SIZE (256*1024)
-#define MAX_REGIONS 16
-
-struct result_chain_entry
-{
- void* buffer_offset;
- PINT_Request_result result;
- PVFS_size size_list[MAX_REGIONS];
- PVFS_offset offset_list[MAX_REGIONS];
- struct result_chain_entry* next;
-};
-
-/* fp_queue_item describes an individual buffer being used within the flow */
-struct fp_queue_item
-{
- int last;
- int seq;
- void* buffer;
- PVFS_size buffer_used;
- struct result_chain_entry result_chain;
- int result_chain_count;
- struct qlist_head list_link;
- flow_descriptor* parent;
- struct PINT_thread_mgr_bmi_callback bmi_callback;
- struct PINT_thread_mgr_trove_callback trove_callback;
-};
-
-/* fp_private_data is information specific to this flow protocol, stored
- * in flow descriptor but hidden from caller
- */
-struct fp_private_data
-{
- flow_descriptor* parent;
- struct fp_queue_item prealloc_array[BUFFERS_PER_FLOW];
- struct qlist_head list_link;
- PVFS_size total_bytes_processed;
- int next_seq;
- int next_seq_to_send;
- int dest_pending;
- int dest_last_posted;
- int initial_posts;
- gen_mutex_t flow_mutex;
- void* tmp_buffer_list[MAX_REGIONS];
- void* intermediate;
-
- struct qlist_head src_list;
- struct qlist_head dest_list;
- struct qlist_head empty_list;
-};
-#define PRIVATE_FLOW(target_flow)\
- ((struct fp_private_data*)(target_flow->flow_protocol_data))
-
-static int fp_bmi_cache_id = -1;
-static bmi_context_id global_bmi_context = -1;
-static void mem_to_bmi_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code);
-static void bmi_to_mem_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code);
-static void cleanup_buffers(struct fp_private_data* flow_data);
-
-
-/* interface prototypes */
-static int fp_bmi_cache_initialize(int flowproto_id);
-
-static int fp_bmi_cache_finalize(void);
-
-static int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
- int option,
- void *parameter);
-
-static int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
- int option,
- void *parameter);
-
-static int fp_bmi_cache_post(flow_descriptor * flow_d);
-
-static char fp_bmi_cache_name[] = "flowproto_bmi_cache";
-
-struct flowproto_ops fp_bmi_cache_ops = {
- fp_bmi_cache_name,
- fp_bmi_cache_initialize,
- fp_bmi_cache_finalize,
- fp_bmi_cache_getinfo,
- fp_bmi_cache_setinfo,
- fp_bmi_cache_post
-};
-
-/* fp_bmi_cache_initialize()
- *
- * starts up the flow protocol
- *
- * returns 0 on succes, -PVFS_error on failure
- */
-int fp_bmi_cache_initialize(int flowproto_id)
-{
- int ret = -1;
-
- ret = PINT_thread_mgr_bmi_start();
- if(ret < 0)
- return(ret);
- PINT_thread_mgr_bmi_getcontext(&global_bmi_context);
-
- fp_bmi_cache_id = flowproto_id;
-
- return(0);
-}
-
-/* fp_bmi_cache_finalize()
- *
- * shuts down the flow protocol
- *
- * returns 0 on success, -PVFS_error on failure
- */
-int fp_bmi_cache_finalize(void)
-{
- PINT_thread_mgr_bmi_stop();
- return (0);
-}
-
-/* fp_bmi_cache_getinfo()
- *
- * retrieves runtime parameters from flow protocol
- *
- * returns 0 on success, -PVFS_error on failure
- */
-int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
- int option,
- void *parameter)
-{
- int* type;
-
- switch(option)
- {
- case FLOWPROTO_TYPE_QUERY:
- type = parameter;
- if(*type == FLOWPROTO_MULTIQUEUE)
- return(0);
- else
- return(-PVFS_ENOPROTOOPT);
- default:
- return(-PVFS_ENOSYS);
- break;
- }
-}
-
-/* fp_bmi_cache_setinfo()
- *
- * sets runtime parameters in flow protocol
- *
- * returns 0 on success, -PVFS_error on failure
- */
-int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
- int option,
- void *parameter)
-{
- return (-PVFS_ENOSYS);
-}
-
-/* fp_bmi_cache_post()
- *
- * posts a flow descriptor to begin work
- *
- * returns 0 on success, 1 on immediate completion, -PVFS_error on failure
- */
-int fp_bmi_cache_post(flow_descriptor * flow_d)
-{
- struct fp_private_data* flow_data = NULL;
- int i;
-
- assert( (flow_d->src.endpoint_id == MEM_ENDPOINT &&
- flow_d->dest.endpoint_id == BMI_ENDPOINT) ||
- (flow_d->src.endpoint_id == BMI_ENDPOINT &&
- flow_d->dest.endpoint_id == MEM_ENDPOINT) );
-
- flow_data = (struct fp_private_data*)malloc(sizeof(struct
- fp_private_data));
- if(!flow_data)
- return(-PVFS_ENOMEM);
- memset(flow_data, 0, sizeof(struct fp_private_data));
-
- flow_d->flow_protocol_data = flow_data;
- flow_d->state = FLOW_TRANSMITTING;
- flow_data->parent = flow_d;
- INIT_QLIST_HEAD(&flow_data->src_list);
- INIT_QLIST_HEAD(&flow_data->dest_list);
- INIT_QLIST_HEAD(&flow_data->empty_list);
- gen_mutex_init(&flow_data->flow_mutex);
-
- /* if a file datatype offset was specified, go ahead and skip ahead
- * before doing anything else
- */
- if(flow_d->file_req_offset)
- PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state,
- flow_d->file_req_offset);
-
- /* set boundaries on file datatype */
- if(flow_d->aggregate_size > -1)
- {
- PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
- flow_d->aggregate_size+flow_d->file_req_offset);
- }
- else
- {
- PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
- flow_d->file_req_offset +
- PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
- }
-
- for(i=0; i<BUFFERS_PER_FLOW; i++)
- {
- flow_data->prealloc_array[i].parent = flow_d;
- flow_data->prealloc_array[i].bmi_callback.data =
- &(flow_data->prealloc_array[i]);
- flow_data->prealloc_array[i].trove_callback.data =
- &(flow_data->prealloc_array[i]);
- }
-
- /* remaining setup depends on the endpoints we intend to use */
- if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
- flow_d->dest.endpoint_id == MEM_ENDPOINT)
- {
- flow_data->prealloc_array[0].buffer = flow_d->dest.u.mem.buffer;
- flow_data->prealloc_array[0].bmi_callback.fn = bmi_to_mem_callback_fn;
- bmi_to_mem_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
- }
- else if(flow_d->src.endpoint_id == MEM_ENDPOINT &&
- flow_d->dest.endpoint_id == BMI_ENDPOINT)
- {
- flow_data->prealloc_array[0].buffer = flow_d->src.u.mem.buffer;
- flow_data->prealloc_array[0].bmi_callback.fn = mem_to_bmi_callback_fn;
- mem_to_bmi_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
- }
- else
- {
- return(-ENOSYS);
- }
-
- return (0);
-}
-
-
-/* cleanup_buffers()
- *
- * releases any resources consumed during flow processing
- *
- * no return value
- */
-static void cleanup_buffers(struct fp_private_data* flow_data)
-{
- if(flow_data->parent->src.endpoint_id == MEM_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == BMI_ENDPOINT)
- {
- if(flow_data->intermediate)
- {
- BMI_memfree(flow_data->parent->dest.u.bmi.address,
- flow_data->intermediate, BUFFER_SIZE, BMI_SEND);
- }
- }
- else if(flow_data->parent->src.endpoint_id == BMI_ENDPOINT &&
- flow_data->parent->dest.endpoint_id == MEM_ENDPOINT)
- {
- if(flow_data->intermediate)
- {
- BMI_memfree(flow_data->parent->src.u.bmi.address,
- flow_data->intermediate, BUFFER_SIZE, BMI_RECV);
- }
- }
-
- return;
-}
-
-/* mem_to_bmi_callback()
- *
- * function to be called upon completion of bmi operations in memory to
- * bmi transfers
- *
- * no return value
- */
-static void mem_to_bmi_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
-{
- struct fp_queue_item* q_item = user_ptr;
- int ret;
- struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
- int i;
- PVFS_id_gen_t tmp_id;
- PVFS_size bytes_processed = 0;
- char *src_ptr, *dest_ptr;
- enum bmi_buffer_type buffer_type = BMI_EXT_ALLOC;
- struct flow_descriptor* flow_d;
-
- /* TODO: error handling */
- if(error_code != 0)
- {
- PVFS_perror_gossip("bmi_recv_callback_fn error_code", error_code);
- assert(0);
- }
-
- gen_mutex_lock(&flow_data->flow_mutex);
-
- flow_data->parent->total_transfered += actual_size;
-
- /* are we done? */
- if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
- {
- q_item->parent->state = FLOW_COMPLETE;
- gen_mutex_unlock(&flow_data->flow_mutex);
- cleanup_buffers(flow_data);
- flow_d = flow_data->parent;
- free(flow_data);
- flow_d->release(flow_d);
- flow_d->callback(flow_d);
- return;
- }
-
- /* process request */
- q_item->result_chain.result.offset_array =
- q_item->result_chain.offset_list;
- q_item->result_chain.result.size_array =
- q_item->result_chain.size_list;
- q_item->result_chain.result.bytemax = BUFFER_SIZE;
- q_item->result_chain.result.bytes = 0;
- q_item->result_chain.result.segmax = MAX_REGIONS;
- q_item->result_chain.result.segs = 0;
- q_item->result_chain.buffer_offset = NULL;
- ret = PINT_Process_request(q_item->parent->file_req_state,
- q_item->parent->mem_req_state,
- &q_item->parent->file_data,
- &q_item->result_chain.result,
- PINT_CLIENT);
-
- /* TODO: error handling */
- assert(ret >= 0);
-
- /* was MAX_REGIONS enough to satisfy this step? */
- if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
- q_item->result_chain.result.bytes < BUFFER_SIZE)
- {
- /* create an intermediate buffer */
- if(!flow_data->intermediate)
- {
- flow_data->intermediate = BMI_memalloc(
- flow_data->parent->dest.u.bmi.address,
- BUFFER_SIZE, BMI_SEND);
- /* TODO: error handling */
- assert(flow_data->intermediate);
- }
-
- /* copy what we have so far into intermediate buffer */
- for(i=0; i<q_item->result_chain.result.segs; i++)
- {
- src_ptr = ((char*)q_item->parent->src.u.mem.buffer +
- q_item->result_chain.offset_list[i]);
- dest_ptr = ((char*)flow_data->intermediate + bytes_processed);
- memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
- bytes_processed += q_item->result_chain.size_list[i];
- }
-
- do
- {
- q_item->result_chain.result.bytemax = BUFFER_SIZE - bytes_processed;
- q_item->result_chain.result.bytes = 0;
- q_item->result_chain.result.segmax = MAX_REGIONS;
- q_item->result_chain.result.segs = 0;
- q_item->result_chain.buffer_offset = NULL;
- /* process ahead */
- ret = PINT_Process_request(q_item->parent->file_req_state,
- q_item->parent->mem_req_state,
- &q_item->parent->file_data,
- &q_item->result_chain.result,
- PINT_CLIENT);
- /* TODO: error handling */
- assert(ret >= 0);
-
- /* copy what we have so far into intermediate buffer */
- for(i=0; i<q_item->result_chain.result.segs; i++)
- {
- src_ptr = ((char*)q_item->parent->src.u.mem.buffer +
- q_item->result_chain.offset_list[i]);
- dest_ptr = ((char*)flow_data->intermediate + bytes_processed);
- memcpy(dest_ptr, src_ptr, q_item->result_chain.size_list[i]);
- bytes_processed += q_item->result_chain.size_list[i];
- }
- }while(bytes_processed < BUFFER_SIZE &&
- !PINT_REQUEST_DONE(q_item->parent->file_req_state));
-
- /* setup for BMI operation */
- flow_data->tmp_buffer_list[0] = flow_data->intermediate;
- q_item->result_chain.result.size_array[0] = bytes_processed;
- q_item->result_chain.result.bytes = bytes_processed;
- q_item->result_chain.result.segs = 1;
- buffer_type = BMI_PRE_ALLOC;
- }
- else
- {
- /* go ahead and return if there is nothing to do */
- if(q_item->result_chain.result.bytes == 0)
- {
- q_item->parent->state = FLOW_COMPLETE;
- gen_mutex_unlock(&flow_data->flow_mutex);
- cleanup_buffers(flow_data);
- flow_d = flow_data->parent;
- free(flow_data);
- flow_d->release(flow_d);
- flow_d->callback(flow_d);
- return;
- }
-
- /* convert offsets to memory addresses */
- for(i=0; i<q_item->result_chain.result.segs; i++)
- {
- flow_data->tmp_buffer_list[i] =
- (void*)(q_item->result_chain.result.offset_array[i] +
- q_item->buffer);
- }
- }
-
- ret = BMI_post_send_list(&tmp_id,
- q_item->parent->dest.u.bmi.address,
- (const void**)flow_data->tmp_buffer_list,
- q_item->result_chain.result.size_array,
- q_item->result_chain.result.segs,
- q_item->result_chain.result.bytes,
- buffer_type,
- q_item->parent->tag,
- &q_item->bmi_callback,
- global_bmi_context);
- /* TODO: error handling */
- assert(ret >= 0);
-
- gen_mutex_unlock(&flow_data->flow_mutex);
-
- if(ret == 1)
- {
- mem_to_bmi_callback_fn(q_item,
- q_item->result_chain.result.bytes, 0);
- }
-
- return;
-}
-
-
-/* bmi_to_mem_callback()
- *
- * function to be called upon completion of bmi operations in bmi to memory
- * transfers
- *
- * no return value
- */
-static void bmi_to_mem_callback_fn(void *user_ptr,
- PVFS_size actual_size,
- PVFS_error error_code)
-{
- struct fp_queue_item* q_item = user_ptr;
- int ret;
- struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
- int i;
- PVFS_id_gen_t tmp_id;
- PVFS_size tmp_actual_size;
- PVFS_size* size_array;
- int segs;
- PVFS_size total_size;
- enum bmi_buffer_type buffer_type = BMI_EXT_ALLOC;
- PVFS_size bytes_processed = 0;
- char *src_ptr, *dest_ptr;
- PVFS_size region_size;
- struct flow_descriptor* flow_d;
-
- /* TODO: error handling */
- if(error_code != 0)
- {
- PVFS_perror_gossip("bmi_recv_callback_fn error_code", error_code);
- assert(0);
- }
-
- gen_mutex_lock(&flow_data->flow_mutex);
-
- flow_data->parent->total_transfered += actual_size;
-
- /* if this is the result of a receive into an intermediate buffer,
- * then we must copy out */
- if(flow_data->tmp_buffer_list[0] == flow_data->intermediate &&
- flow_data->intermediate != NULL)
- {
- /* copy out what we have so far */
- for(i=0; i<q_item->result_chain.result.segs; i++)
- {
- region_size = q_item->result_chain.size_list[i];
- src_ptr = (char*)(flow_data->intermediate +
- bytes_processed);
- dest_ptr = (char*)(q_item->result_chain.offset_list[i]
- + q_item->parent->dest.u.mem.buffer);
- memcpy(dest_ptr, src_ptr, region_size);
- bytes_processed += region_size;
- }
-
- do
- {
- q_item->result_chain.result.bytemax = BUFFER_SIZE - bytes_processed;
- q_item->result_chain.result.bytes = 0;
- q_item->result_chain.result.segmax = MAX_REGIONS;
- q_item->result_chain.result.segs = 0;
- q_item->result_chain.buffer_offset = NULL;
- /* process ahead */
- ret = PINT_Process_request(q_item->parent->file_req_state,
- q_item->parent->mem_req_state,
- &q_item->parent->file_data,
- &q_item->result_chain.result,
- PINT_CLIENT);
- /* TODO: error handling */
- assert(ret >= 0);
- /* copy out what we have so far */
- for(i=0; i<q_item->result_chain.result.segs; i++)
- {
- region_size = q_item->result_chain.size_list[i];
- src_ptr = (char*)(flow_data->intermediate +
- bytes_processed);
- dest_ptr = (char*)(q_item->result_chain.offset_list[i]
- + q_item->parent->dest.u.mem.buffer);
- memcpy(dest_ptr, src_ptr, region_size);
- bytes_processed += region_size;
- }
- }while(bytes_processed < BUFFER_SIZE &&
- !PINT_REQUEST_DONE(q_item->parent->file_req_state));
- }
-
- /* are we done? */
- if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
- {
- q_item->parent->state = FLOW_COMPLETE;
- gen_mutex_unlock(&flow_data->flow_mutex);
- cleanup_buffers(flow_data);
- flow_d = flow_data->parent;
- free(flow_data);
- flow_d->release(flow_d);
- flow_d->callback(flow_d);
- return;
- }
-
- /* process request */
- q_item->result_chain.result.offset_array =
- q_item->result_chain.offset_list;
- q_item->result_chain.result.size_array =
- q_item->result_chain.size_list;
- q_item->result_chain.result.bytemax = BUFFER_SIZE;
- q_item->result_chain.result.bytes = 0;
- q_item->result_chain.result.segmax = MAX_REGIONS;
- q_item->result_chain.result.segs = 0;
- q_item->result_chain.buffer_offset = NULL;
- ret = PINT_Process_request(q_item->parent->file_req_state,
- q_item->parent->mem_req_state,
- &q_item->parent->file_data,
- &q_item->result_chain.result,
- PINT_CLIENT);
- /* TODO: error handling */
- assert(ret >= 0);
-
- /* was MAX_REGIONS enough to satisfy this step? */
- if(!PINT_REQUEST_DONE(flow_data->parent->file_req_state) &&
- q_item->result_chain.result.bytes < BUFFER_SIZE)
- {
- /* create an intermediate buffer */
- if(!flow_data->intermediate)
- {
- flow_data->intermediate = BMI_memalloc(
- flow_data->parent->src.u.bmi.address,
- BUFFER_SIZE, BMI_RECV);
- /* TODO: error handling */
- assert(flow_data->intermediate);
- }
- /* setup for BMI operation */
- flow_data->tmp_buffer_list[0] = flow_data->intermediate;
- buffer_type = BMI_PRE_ALLOC;
- q_item->buffer_used = BUFFER_SIZE;
- total_size = BUFFER_SIZE;
- size_array = &q_item->buffer_used;
- segs = 1;
- /* we will copy data out on next iteration */
- }
- else
- {
- /* normal case */
- segs = q_item->result_chain.result.segs;
- size_array = q_item->result_chain.result.size_array;
- total_size = q_item->result_chain.result.bytes;
-
- /* convert offsets to memory addresses */
- for(i=0; i<q_item->result_chain.result.segs; i++)
- {
- flow_data->tmp_buffer_list[i] =
- (void*)(q_item->result_chain.result.offset_array[i] +
- q_item->buffer);
- }
-
- /* go ahead and return if there is nothing to do */
- if(q_item->result_chain.result.bytes == 0)
- {
- q_item->parent->state = FLOW_COMPLETE;
- gen_mutex_unlock(&flow_data->flow_mutex);
- cleanup_buffers(flow_data);
- flow_d = flow_data->parent;
- free(flow_data);
- flow_d->release(flow_d);
- flow_d->callback(flow_d);
- return;
- }
- }
-
- ret = BMI_post_recv_list(&tmp_id,
- q_item->parent->src.u.bmi.address,
- flow_data->tmp_buffer_list,
- size_array,
- segs,
- total_size,
- &tmp_actual_size,
- BMI_EXT_ALLOC,
- q_item->parent->tag,
- &q_item->bmi_callback,
- global_bmi_context);
- /* TODO: error handling */
- assert(ret >= 0);
-
- gen_mutex_unlock(&flow_data->flow_mutex);
-
- if(ret == 1)
- {
- bmi_to_mem_callback_fn(q_item, tmp_actual_size, 0);
- }
-
- return;
-}
-
-/*
- * Local variables:
- * c-indent-level: 4
- * c-basic-offset: 4
- * End:
- *
- * vim: ts=8 sts=4 sw=4 noexpandtab
- */
More information about the PVFS2-CVS
mailing list