[Pvfs2-cvs] commit by aching in pvfs2-1/src/client/sysint:
sys-lock.sm
CVS commit program
cvs at parl.clemson.edu
Thu Apr 5 12:32:24 EDT 2007
Update of /projects/cvsroot/pvfs2-1/src/client/sysint
In directory parlweb1:/tmp/cvs-serv6153/src/client/sysint
Modified Files:
Tag: version-lock-actual-branch
sys-lock.sm
Log Message:
Revised state machine to process multiple rounds of lock requests.
Index: sys-lock.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/client/sysint/Attic/sys-lock.sm,v
diff -p -u -r1.1.2.4 -r1.1.2.5
--- sys-lock.sm 31 Jan 2007 05:10:47 -0000 1.1.2.4
+++ sys-lock.sm 5 Apr 2007 16:32:24 -0000 1.1.2.5
@@ -23,6 +23,7 @@
#include "PINT-reqproto-encode.h"
#include "pint-util.h"
#include "pvfs2-internal.h"
+#include "heap.h"
#define LOCK_MAX_SEGMENT_NUM 64
@@ -32,18 +33,31 @@ enum
{
LOCK_NO_DATA = 232,
LOCK_RETRY,
+ LOCK_INCOMPLETE,
LOCK_TRANSFERS_COMPLETE,
+ LOCK_NO_MSGPAIRS
+};
+
+const char *PVFS_lock_type_mapping[] = /* debug-print names, indexed by a lock type/method value such as lock_server_cur_method; NOTE(review): relies on entry order matching the lock-type enum values -- confirm */
+{
+ "PVFS_CLIENT_ACQUIRE_TWO_PHASE",
+ "PVFS_CLIENT_ACQUIRE_ONE_TRY",
+ "PVFS_CLIENT_ACQUIRE_ALT_TRY",
+ "PVFS_CLIENT_RELEASE",
+ "PVFS_SERVER_LOCK_INIT",
+ "PVFS_SERVER_ACQUIRE_NEW_NONBLOCK",
+ "PVFS_SERVER_ACQUIRE_NEW_BLOCK",
+ "PVFS_SERVER_ACQUIRE_NONBLOCK",
+ "PVFS_SERVER_ACQUIRE_BLOCK",
+ "PVFS_SERVER_RELEASE_SOME",
+ "PVFS_SERVER_RELEASE_ALL"
};
static int lock_init(
PINT_client_sm *sm_p, job_status_s *js_p);
-static int lock_setup_msgpairs(
- PINT_client_sm *sm_p, job_status_s *js_p);
-static int lock_post_msgpairs(
+static int lock_init_server_info(
PINT_client_sm *sm_p, job_status_s *js_p);
-static int lock_post_msgpairs_retry(
- PINT_client_sm *sm_p, job_status_s *js_p);
-static int lock_complete_operations(
+static int lock_setup_msgpairs(
PINT_client_sm *sm_p, job_status_s *js_p);
static int lock_analyze_results(
PINT_client_sm *sm_p, job_status_s *js_p);
@@ -52,22 +66,9 @@ static int lock_cleanup(
/* Helper functions local to sys-lock.sm */
-static inline int lock_complete_context_send_or_recv(
- PINT_client_sm *sm_p, job_status_s *js_p);
-
-static inline int lock_decode_ack_response(
- PINT_client_lock_ctx *cur_ctx,
- struct PINT_decoded_msg *decoded_resp,
- struct PVFS_server_resp **resp);
-
-static inline int lock_check_context_status(
- PINT_client_lock_ctx *cur_ctx, int lock_type,
- PVFS_size *total_size);
-
-static inline int lock_process_context_recv(
- PINT_client_sm *sm_p,
- job_status_s *js_p,
- PINT_client_lock_ctx **out_ctx);
+static int lock_completion_fn(void *user_args,
+ struct PVFS_server_resp *resp,
+ int index);
static int lock_find_target_datafiles(
PVFS_Request mem_req,
@@ -79,40 +80,6 @@ static int lock_find_target_datafiles(
int *handle_index_array,
int *handle_index_out_count);
-static int lock_find_total_size(
- PINT_client_sm * sm_p,
- PVFS_offset final_offset,
- PVFS_size * total_return_size);
-
-static int lock_find_offset(
- PINT_client_sm * sm_p,
- PVFS_size contig_size,
- PVFS_size * total_return_offset);
-
-static int lock_contexts_init(
- PINT_client_sm *sm_p,
- int context_count,
- PVFS_object_attr *attr);
-
-static void lock_contexts_destroy(PINT_client_sm *sm_p);
-
-/* misc constants and helper macros */
-#define LOCK_RECV_COMPLETED 1
-
-/* possible lock state machine phases (status_user_tag) */
-#define LOCK_SM_PHASE_REQ_MSGPAIR_RECV 0
-#define LOCK_SM_PHASE_REQ_MSGPAIR_SEND 1
-#define LOCK_SM_PHASE_FINAL_ACK 2
-#define LOCK_SM_NUM_PHASES 3
-
-#define STATUS_USER_TAG_TYPE(tag, type) \
-((tag % LOCK_SM_NUM_PHASES) == type)
-#define STATUS_USER_TAG_GET_INDEX(tag, type) \
-(tag / LOCK_SM_NUM_PHASES)
-#define STATUS_USER_TAG_IS_SEND_OR_RECV(tag) \
-(STATUS_USER_TAG_TYPE(tag, LOCK_SM_PHASE_REQ_MSGPAIR_RECV) || \
- STATUS_USER_TAG_TYPE(tag, LOCK_SM_PHASE_REQ_MSGPAIR_SEND))
-
static int lock_datafile_index_array_init(
PINT_client_sm *sm_p,
int datafile_count);
@@ -120,15 +87,48 @@ static int lock_datafile_index_array_ini
static void lock_datafile_index_array_destroy(
PINT_client_sm *sm_p);
+static void print_server_lock_info_arr(
+ PINT_client_sm *sm_p);
+
+static int lock_choose_method(
+ PINT_client_sm *sm_p);
+
+static int lock_fill_msgpair_array(
+ PINT_client_sm *sm_p);
+
+/* Heap helper functions */
+
+void heap_cpy_fn(heap_node_t *dest_p,
+ heap_node_t *src_p);
+
+void heap_swap_fn(heap_node_t *dest_p,
+ heap_node_t *src_p);
+
+void heap_print_fn(heap_node_t *node_p);
+
+void lock_heap_insert(heap_t *heap_p, int64_t key,
+ PINT_server_lock_info *lock_p ,
+ void (*cpy_fn) (heap_node_t *dest_p,
+ heap_node_t *src_p));
+
+void lock_heap_extract_min(heap_t *heap_p, int64_t *key_p,
+ int *proc_p,
+ void (*cpy_fn) (heap_node_t *dest_p,
+ heap_node_t *src_p),
+ void (*swap_fn) (heap_node_t *dest_p,
+ heap_node_t *src_p));
+
+void lock_heap_min(heap_t *heap_p, int64_t *key_p,
+ int *proc_p);
+
%%
machine pvfs2_client_lock_sm(
init,
lock_getattr,
+ lock_init_server_info,
lock_setup_msgpairs,
- lock_post_msgpairs,
- lock_post_msgpairs_retry,
- lock_complete_operations,
+ lock_xfer_msgpairs,
lock_analyze_results,
lock_cleanup)
{
@@ -141,44 +141,34 @@ machine pvfs2_client_lock_sm(
state lock_getattr
{
jump pvfs2_client_getattr_sm;
- success => lock_setup_msgpairs;
+ success => lock_init_server_info;
default => lock_cleanup;
}
- state lock_setup_msgpairs
- {
- run lock_setup_msgpairs;
- LOCK_NO_DATA => lock_cleanup;
- success => lock_post_msgpairs;
- default => lock_cleanup;
- }
-
- state lock_post_msgpairs
+ state lock_init_server_info
{
- run lock_post_msgpairs;
- LOCK_RETRY => lock_post_msgpairs_retry;
- default => lock_complete_operations;
+ run lock_init_server_info;
+ default => lock_setup_msgpairs;
}
- state lock_post_msgpairs_retry
+ state lock_setup_msgpairs
{
- run lock_post_msgpairs_retry;
- LOCK_TRANSFERS_COMPLETE => lock_complete_operations;
- default => lock_post_msgpairs;
+ run lock_setup_msgpairs;
+ LOCK_NO_MSGPAIRS => lock_setup_msgpairs;
+ success => lock_xfer_msgpairs;
+ default => lock_cleanup;
}
- state lock_complete_operations
+ state lock_xfer_msgpairs
{
- run lock_complete_operations;
- LOCK_TRANSFERS_COMPLETE => lock_analyze_results;
- LOCK_RETRY => lock_post_msgpairs_retry;
- default => lock_complete_operations;
+ jump pvfs2_msgpairarray_sm;
+ success => lock_analyze_results;
}
state lock_analyze_results
{
run lock_analyze_results;
- LOCK_RETRY => init;
+ LOCK_INCOMPLETE => lock_setup_msgpairs;
default => lock_cleanup;
}
@@ -203,10 +193,9 @@ PVFS_error PVFS_isys_lock(
PVFS_Request mem_req,
PVFS_credentials *credentials,
PVFS_sysresp_lock *resp_p,
- PVFS_id_gen_t **lock_id_arr_p,
- int *lock_id_arr_count_p,
+ struct qlist_head *lock_id_list_head_p,
enum PVFS_io_type io_type,
- enum PVFS_lock_type lock_type,
+ enum PVFS_client_lock_type lock_type,
PVFS_sys_op_id *op_id,
void *user_ptr)
{
@@ -225,7 +214,10 @@ PVFS_error PVFS_isys_lock(
return ret;
}
- if ((lock_type != PVFS_ACQUIRE) && (lock_type != PVFS_RELEASE))
+ if ((lock_type != PVFS_CLIENT_ACQUIRE_TWO_PHASE) &&
+ (lock_type != PVFS_CLIENT_ACQUIRE_ONE_TRY) &&
+ (lock_type != PVFS_CLIENT_ACQUIRE_ALT_TRY) &&
+ (lock_type != PVFS_CLIENT_RELEASE))
{
gossip_err("invalid (unknown) lock type specified\n");
return ret;
@@ -246,10 +238,10 @@ PVFS_error PVFS_isys_lock(
{
gossip_ldebug(GOSSIP_LOCK_DEBUG, "Warning: 0 byte lock operation "
"attempted.\n");
- resp_p->granted_bytes = 0;
+ resp_p->bytes_accessed = 0;
return 1;
}
- resp_p->granted_bytes = 0;
+ resp_p->bytes_accessed = 0;
sm_p = (PINT_client_sm *)malloc(sizeof(*sm_p));
if (sm_p == NULL)
@@ -263,12 +255,12 @@ PVFS_error PVFS_isys_lock(
sm_p->u.lock.io_type = io_type;
sm_p->u.lock.lock_type = lock_type;
+ sm_p->u.lock.lock_server_cur_method = PVFS_SERVER_LOCK_INIT;
sm_p->u.lock.mem_req = mem_req;
sm_p->u.lock.file_req = file_req;
sm_p->u.lock.file_req_offset = file_req_offset;
sm_p->u.lock.lock_resp_p = resp_p;
- sm_p->u.lock.lock_id_arr_p = lock_id_arr_p;
- sm_p->u.lock.lock_id_arr_count_p = lock_id_arr_count_p;
+ sm_p->u.lock.lock_id_list_head_p = lock_id_list_head_p;
sm_p->u.lock.encoding = cur_fs->encoding;
sm_p->u.lock.stored_error_code = 0;
sm_p->u.lock.retry_count = 0;
@@ -294,10 +286,9 @@ PVFS_error PVFS_sys_lock(
PVFS_Request mem_req,
PVFS_credentials *credentials,
PVFS_sysresp_lock *resp_p,
- PVFS_id_gen_t **lock_id_arr_p,
- int *lock_id_arr_count_p,
+ struct qlist_head *lock_id_list_head_p,
enum PVFS_io_type io_type,
- enum PVFS_lock_type lock_type)
+ enum PVFS_client_lock_type lock_type)
{
PVFS_error ret = -PVFS_EINVAL, error = 0;
PVFS_sys_op_id op_id;
@@ -306,7 +297,7 @@ PVFS_error PVFS_sys_lock(
ret = PVFS_isys_lock(ref, file_req, file_req_offset, mem_req,
credentials, resp_p,
- lock_id_arr_p, lock_id_arr_count_p, io_type,
+ lock_id_list_head_p, io_type,
lock_type, &op_id, NULL);
if (ret == 1)
return 0;
@@ -353,7 +344,6 @@ static int lock_init(PINT_client_sm *sm_
js_p->error_code = 0;
lock_datafile_index_array_destroy(sm_p);
- lock_contexts_destroy(sm_p);
if (sm_p->op_cancelled)
{
@@ -369,15 +359,15 @@ static int lock_init(PINT_client_sm *sm_
return 1;
}
-static int lock_setup_msgpairs(PINT_client_sm *sm_p,
- job_status_s *js_p)
+/* lock_init_server_info - Set up the structures for keeping track of
+ * how far each server has progressed with respect to its own locks */
+static int lock_init_server_info(
+ PINT_client_sm *sm_p, job_status_s *js_p)
{
- int ret = -PVFS_EINVAL, i = 0;
+ int ret = -PVFS_EINVAL, i;
PVFS_object_attr *attr = NULL;
- int target_datafile_count = 0;
-
gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) lock state: "
- "lock_setup_msgpairs\n", sm_p);
+ "lock_init_server_info\n", sm_p);
if (sm_p->op_cancelled)
{
@@ -426,6 +416,25 @@ static int lock_setup_msgpairs(PINT_clie
&(sm_p->u.lock.dfile_size_array),
attr->u.meta.dfile_count);
+ /* No need to do these calculations if its a release*/
+ if (sm_p->u.lock.lock_type == PVFS_CLIENT_RELEASE)
+ {
+ /* Setup the first lock array to be removed */
+ assert(!qlist_empty(sm_p->u.lock.lock_id_list_head_p));
+ sm_p->u.lock.cur_lock_id_list_p =
+ qlist_entry(sm_p->u.lock.lock_id_list_head_p->next,
+ PVFS_lock_id_list, lock_link);
+
+ /* Initialize msgarray to the largest possible size */
+ sm_p->msgarray_count = attr->u.meta.dfile_count;
+ ret = PINT_msgpairarray_init(&sm_p->msgarray,
+ attr->u.meta.dfile_count);
+ sm_p->u.lock.datafile_count = attr->u.meta.dfile_count;
+ if (ret < 0)
+ js_p->error_code = ret;
+ return 1;
+ }
+
ret = lock_find_target_datafiles(
sm_p->u.lock.mem_req,
sm_p->u.lock.file_req,
@@ -434,7 +443,7 @@ static int lock_setup_msgpairs(PINT_clie
attr->u.meta.dfile_array,
attr->u.meta.dfile_count,
sm_p->u.lock.datafile_index_array,
- &target_datafile_count);
+ &sm_p->u.lock.datafile_count);
if (ret < 0)
{
gossip_debug(GOSSIP_LOCK_DEBUG, " lock_find_target_datafiles: ret "
@@ -443,8 +452,7 @@ static int lock_setup_msgpairs(PINT_clie
goto error_exit;
}
- sm_p->u.lock.datafile_count = target_datafile_count;
- if (target_datafile_count == 0)
+ if (sm_p->u.lock.datafile_count == 0)
{
gossip_debug(GOSSIP_LOCK_DEBUG, " datafile_setup_msg_pairs: no "
"datafiles have data; aborting\n");
@@ -454,516 +462,119 @@ static int lock_setup_msgpairs(PINT_clie
gossip_debug(GOSSIP_LOCK_DEBUG,
" %s: %d datafiles might have had data\n", __func__,
- target_datafile_count);
+ sm_p->u.lock.datafile_count);
- ret = lock_contexts_init(sm_p, target_datafile_count, attr);
- if (ret < 0)
- {
- js_p->error_code = ret;
- goto error_exit;
- }
+ /* Allocate a new node for the client's lock_id_arr and set its
+ * size if this is an acquire call. */
+ if (sm_p->u.lock.lock_type != PVFS_CLIENT_RELEASE)
+ {
+ sm_p->u.lock.cur_lock_id_list_p = malloc(sizeof(PVFS_lock_id_list));
+ if (!sm_p->u.lock.cur_lock_id_list_p)
+ {
+ js_p->error_code = -PVFS_ENOMEM;
+ goto error_exit;
+ }
- sm_p->u.lock.total_cancellations_remaining = 0;
+ sm_p->u.lock.cur_lock_id_list_p->lock_id_arr = (PVFS_id_gen_t *)
+ malloc(sm_p->u.lock.datafile_count * sizeof(PVFS_id_gen_t));
+ if (!sm_p->u.lock.cur_lock_id_list_p->lock_id_arr)
+ {
+ js_p->error_code = -PVFS_ENOMEM;
+ goto error_exit;
+ }
- /* Allocate the client's lock_id_arr and set its size if this is
- * an acquire call. */
- if (sm_p->u.lock.lock_type == PVFS_ACQUIRE)
- {
- *(sm_p->u.lock.lock_id_arr_p) = (PVFS_id_gen_t *)
- calloc(target_datafile_count, sizeof(PVFS_id_gen_t));
- if (*(sm_p->u.lock.lock_id_arr_p) == NULL)
+ sm_p->u.lock.cur_lock_id_list_p->datafile_handle_arr = (PVFS_handle *)
+ malloc(sm_p->u.lock.datafile_count * sizeof(PVFS_handle));
+ if (!sm_p->u.lock.cur_lock_id_list_p->datafile_handle_arr)
{
js_p->error_code = -PVFS_ENOMEM;
goto error_exit;
}
- *(sm_p->u.lock.lock_id_arr_count_p) = target_datafile_count;
+ sm_p->u.lock.cur_lock_id_list_p->lock_id_arr_count =
+ sm_p->u.lock.datafile_count;
+
+ qlist_add(&(sm_p->u.lock.cur_lock_id_list_p->lock_link),
+ sm_p->u.lock.lock_id_list_head_p);
}
- for (i = 0; i < target_datafile_count; i++)
+ /* Initialize msgarray */
+ sm_p->msgarray_count = sm_p->u.lock.datafile_count;
+ ret = PINT_msgpairarray_init(&sm_p->msgarray, sm_p->u.lock.datafile_count);
+ if (ret < 0)
{
- gossip_debug(GOSSIP_LOCK_DEBUG, " filling lock request for %llu\n",
- llu(sm_p->u.lock.contexts[i].data_handle));
-
- if (sm_p->u.lock.lock_type == PVFS_ACQUIRE)
- PINT_SERVREQ_LOCK_FILL(
- sm_p->u.lock.contexts[i].msg.req,
- *(sm_p->cred_p),
- sm_p->object_ref.fs_id,
- sm_p->u.lock.contexts[i].data_handle,
- sm_p->u.lock.io_type,
- sm_p->u.lock.lock_type,
- sm_p->u.lock.datafile_index_array[i],
- attr->u.meta.dfile_count,
- attr->u.meta.dist,
- sm_p->u.lock.file_req,
- sm_p->u.lock.file_req_offset,
- PINT_REQUEST_TOTAL_BYTES(sm_p->u.lock.mem_req),
- 0,
- PINT_REQUEST_TOTAL_BYTES(sm_p->u.lock.mem_req),
- -1);
- else
- PINT_SERVREQ_LOCK_FILL(
- sm_p->u.lock.contexts[i].msg.req,
- *(sm_p->cred_p),
- sm_p->object_ref.fs_id,
- sm_p->u.lock.contexts[i].data_handle,
- sm_p->u.lock.io_type,
- sm_p->u.lock.lock_type,
- sm_p->u.lock.datafile_index_array[i],
- attr->u.meta.dfile_count,
- attr->u.meta.dist,
- PVFS_BYTE,
- 0,
- -1,
- -1,
- -1,
- (*(sm_p->u.lock.lock_id_arr_p))[i]);
+ js_p->error_code = ret;
+ return -1;
+ }
+
+ /* Initialize the server lock info array, which keeps track of how
+ * far each server has gotten with this lock request. */
+ sm_p->u.lock.server_lock_info_arr =
+ calloc(sm_p->u.lock.datafile_count, sizeof(PINT_server_lock_info));
+ for (i = 0; i < sm_p->u.lock.datafile_count; i++)
+ sm_p->u.lock.server_lock_info_arr[i].index = i;
+ sm_p->u.lock.server_incomplete_count = sm_p->u.lock.datafile_count;
+
+ /* Initialize the heap. */
+ ret = create_heap(&sm_p->u.lock.server_heap, sm_p->u.lock.datafile_count);
+ if (ret == -1) {
+ gossip_err("sys-lock: create_heap of size %d failed!",
+ sm_p->u.lock.datafile_count);
+ return ret;
}
-
- js_p->error_code = 0;
- lock_datafile_index_array_destroy(sm_p);
error_exit:
exit:
return 1;
}
-/*
- This is based on msgpairarray_post() in msgpairarray.c. It's
- different enough in that we don't have to wait on the msgpairarray
- operations to all complete before posting flows as we can do so for each
- server individually when we're ready. this avoids the msgpairarray
- sync point implicit in the design
-*/
-
-static int lock_post_msgpairs(PINT_client_sm *sm_p,
- job_status_s *js_p)
+static int lock_setup_msgpairs(PINT_client_sm *sm_p,
+ job_status_s *js_p)
{
- int ret = -PVFS_EINVAL, i = 0;
- unsigned long status_user_tag = 0;
- int must_loop_encodings = 0;
- struct server_configuration_s *server_config = NULL;
+ int ret = -PVFS_EINVAL;
+ PVFS_object_attr *attr = NULL;
- gossip_debug(GOSSIP_CLIENT_DEBUG, "lock_post_msgpairs "
- "state: post (%d message(s))\n", sm_p->u.lock.datafile_count);
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) lock state: "
+ "lock_setup_msgpairs\n", sm_p);
if (sm_p->op_cancelled)
{
- js_p->error_code = -PVFS_ECANCEL;
- return 1;
- }
-
- js_p->error_code = 0;
-
- /* completion count tracks sends/recvs separately, will increment
- * as we go through the loop to maintain a count of outstanding msgpairs */
-
- sm_p->u.lock.msgpair_completion_count = 0;
-
- for(i = 0; i < sm_p->u.lock.context_count; i++)
- {
- PINT_client_lock_ctx *cur_ctx = &sm_p->u.lock.contexts[i];
- PINT_sm_msgpair_state *msg = &cur_ctx->msg;
-
- /* do not do this one again in retry case */
- if (cur_ctx->msg_recv_has_been_posted &&
- cur_ctx->msg_recv_in_progress)
- {
- ++sm_p->u.lock.msgpair_completion_count;
- goto recv_already_posted;
- }
-
- if (!ENCODING_IS_VALID(sm_p->u.lock.encoding))
- {
- PRINT_ENCODING_ERROR("supported", sm_p->u.lock.encoding);
- must_loop_encodings = 1;
- sm_p->u.lock.encoding = (ENCODING_INVALID_MIN + 1);
- }
- else if (!ENCODING_IS_SUPPORTED(sm_p->u.lock.encoding))
- {
- PRINT_ENCODING_ERROR("supported", sm_p->u.lock.encoding);
- must_loop_encodings = 1;
- sm_p->u.lock.encoding = ENCODING_SUPPORTED_MIN;
- }
-
- try_next_encoding:
- assert(ENCODING_IS_VALID(sm_p->u.lock.encoding));
-
- ret = PINT_encode(&msg->req, PINT_ENCODE_REQ, &msg->encoded_req,
- msg->svr_addr, sm_p->u.lock.encoding);
- if (ret)
- {
- if (must_loop_encodings)
- {
- gossip_debug(GOSSIP_CLIENT_DEBUG, "Looping through "
- "encodings [%d/%d]\n", sm_p->u.lock.encoding,
- ENCODING_INVALID_MAX);
-
- sm_p->u.lock.encoding++;
- if (ENCODING_IS_VALID(sm_p->u.lock.encoding))
- {
- goto try_next_encoding;
- }
- }
- /*
- FIXME: make this a clean error transition by adjusting
- the completion count and/or (not) exiting
- */
- PVFS_perror_gossip("PINT_encode failed", ret);
- js_p->error_code = ret;
- return 1;
- }
-
- /* calculate maximum response message size and allocate it */
- msg->max_resp_sz = PINT_encode_calc_max_size(
- PINT_ENCODE_RESP, msg->req.op, sm_p->u.lock.encoding);
- msg->encoded_resp_p = BMI_memalloc(
- msg->svr_addr, msg->max_resp_sz, BMI_RECV);
- if (!msg->encoded_resp_p)
- {
- /* FIXME: see above FIXME */
- js_p->error_code = -PVFS_ENOMEM;
- return 1;
- }
-
- /*
- recalculate the status user tag based on this the progress
- of the current context like this: status_user_tag = (num_phases *
- (context index) + context phase)
- */
- assert(cur_ctx->index == i);
- status_user_tag = ((LOCK_SM_NUM_PHASES * i) +
- LOCK_SM_PHASE_REQ_MSGPAIR_RECV);
-
- gossip_debug(GOSSIP_LOCK_DEBUG," posting recv with "
- "status_user_tag=%lu (max_size %d)\n",
- status_user_tag, msg->max_resp_sz);
-
- cur_ctx->session_tag = PINT_util_get_next_tag();
-
- cur_ctx->msg_recv_has_been_posted = 0;
- cur_ctx->msg_recv_in_progress = 0;
-
- server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
- ret = job_bmi_recv(
- msg->svr_addr, msg->encoded_resp_p, msg->max_resp_sz,
- cur_ctx->session_tag, BMI_PRE_ALLOC, sm_p, status_user_tag,
- &msg->recv_status, &msg->recv_id, pint_client_sm_context,
- server_config->client_job_bmi_timeout);
- PINT_put_server_config_struct(server_config);
-
- /* ret -1: problem, do not look at msg recv_status */
- /* ret 1: immediate completion, see status */
- /* ret 0: okay */
-
- if (ret < 0) {
- PVFS_perror_gossip("Post of receive failed", ret);
- js_p->error_code = ret;
- continue;
-
- }
-
- if (ret == 0) {
- int tmp = 0;
- /* perform a quick test to see if the recv failed before
- * posting the send; if it reports an error quickly then
- * we can save the confusion of sending a request for
- * which we can't recv a response
- */
- ret = job_test(msg->recv_id, &tmp, NULL,
- &msg->recv_status, 0,
- pint_client_sm_context);
- if (ret < 0) {
- PVFS_perror_gossip("Post of receive failed", ret);
- js_p->error_code = ret;
- continue;
- }
- }
-
- /* either from job_bmi_recv or from job_test finding something */
- if (ret == 1) {
- /*
- * This recv must have completed with an error because the
- * server has not yet been sent our request.
- */
- PVFS_perror_gossip("Receive immediately failed",
- msg->recv_status.error_code);
-
- ret = msg->recv_status.error_code;
- js_p->error_code = ret;
- continue;
- }
-
- cur_ctx->msg_recv_has_been_posted = 1;
- cur_ctx->msg_recv_in_progress = 1;
-
- /* posted the receive okay */
- ++sm_p->u.lock.msgpair_completion_count;
-
- recv_already_posted:
-
- if (cur_ctx->msg_send_has_been_posted &&
- cur_ctx->msg_send_in_progress)
- {
- ++sm_p->u.lock.msgpair_completion_count;
- continue;
- }
-
- status_user_tag = ((LOCK_SM_NUM_PHASES * i) +
- LOCK_SM_PHASE_REQ_MSGPAIR_SEND);
-
- cur_ctx->msg_send_has_been_posted = 0;
- cur_ctx->msg_send_in_progress = 0;
-
- gossip_debug(GOSSIP_LOCK_DEBUG," posting send with "
- "status_user_tag=%lu\n", status_user_tag);
-
- server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
- ret = job_bmi_send_list(
- msg->encoded_req.dest, msg->encoded_req.buffer_list,
- msg->encoded_req.size_list, msg->encoded_req.list_count,
- msg->encoded_req.total_size, cur_ctx->session_tag,
- msg->encoded_req.buffer_type, 1, sm_p, status_user_tag,
- &msg->send_status, &msg->send_id, pint_client_sm_context,
- server_config->client_job_bmi_timeout);
- PINT_put_server_config_struct(server_config);
-
- if (ret < 0) {
- PVFS_perror_gossip("Post of send failed, cancelling recv", ret);
- msg->op_status = msg->send_status.error_code;
- msg->send_id = 0;
- job_bmi_cancel(msg->recv_id, pint_client_sm_context);
-
- js_p->error_code = ret;
- continue;
- }
-
- if (ret == 1) {
- if (msg->send_status.error_code == 0) {
- gossip_debug(GOSSIP_LOCK_DEBUG,
- " lock_post_msgpairs: "
- "send completed immediately.\n");
-
- /* 0 is the valid "completed job id" value */
- cur_ctx->msg_send_has_been_posted = 1;
- msg->send_id = 0;
-
- } else {
- PVFS_perror_gossip("Send immediately failed, cancelling recv",
- msg->recv_status.error_code);
-
- msg->op_status = msg->send_status.error_code;
- msg->send_id = 0;
-
- /* still wait for the recv to complete */
- job_bmi_cancel(msg->recv_id, pint_client_sm_context);
-
- js_p->error_code = msg->send_status.error_code;
- continue;
- }
- } else {
- /* posted the send */
- cur_ctx->msg_send_in_progress = 1;
- cur_ctx->msg_send_has_been_posted = 1;
- ++sm_p->u.lock.msgpair_completion_count;
- }
+ js_p->error_code = -PVFS_ECANCEL;
+ goto exit;
}
- gossip_debug(GOSSIP_LOCK_DEBUG, "lock_post_msgpairs: "
- "completion count is %d, ack_completion count is %d\n",
- sm_p->u.lock.msgpair_completion_count,
- sm_p->u.lock.ack_completion_count);
-
- /* if anything posted, just wait for that to complete, else
- * go sleep then try the remaining msgpairs again */
- if (sm_p->u.lock.msgpair_completion_count
- || sm_p->u.lock.ack_completion_count)
- return 0; /* means go find another machine to run */
- else {
- js_p->error_code = LOCK_RETRY;
- return 1; /* means look at error_code and run my machine again */
- }
-}
-
-/*
- * For lock retry, come here to sleep a bit then go back and post
- * some more msgpairs.
- */
-
-static int lock_post_msgpairs_retry(PINT_client_sm *sm_p,
- job_status_s *js_p)
-{
- /* give up if beyond retry limit */
- ++sm_p->u.lock.retry_count;
- if (sm_p->u.lock.retry_count > sm_p->msgarray_params.retry_limit) {
- gossip_debug(GOSSIP_CLIENT_DEBUG, "%s: retry %d exceeds limit %d\n",
- __func__, sm_p->u.lock.retry_count,
- sm_p->msgarray_params.retry_delay);
- js_p->error_code = LOCK_TRANSFERS_COMPLETE;
- return 1;
- }
-
- gossip_debug(GOSSIP_CLIENT_DEBUG, "%s: retry %d, wait %d ms\n", __func__,
- sm_p->u.lock.retry_count, sm_p->msgarray_params.retry_delay);
-
- return job_req_sched_post_timer(sm_p->msgarray_params.retry_delay,
- sm_p, 0, js_p, NULL, pint_client_sm_context);
-}
-
-/*
- This state allows us to make sure all posted operations complete and
- are accounted for. since this handles ALL operation completions,
- there's special case handling of completing the msgpair recv. in
- this case we post the flow operations as soon as we see them (the
- main motivation for not using the common msgpairarray code).
-*/
-
-static int lock_complete_operations(
- PINT_client_sm *sm_p, job_status_s *js_p)
-{
- int ret = -PVFS_EINVAL, i;
- unsigned long status_user_tag = (unsigned long)
- js_p->status_user_tag;
- PINT_client_lock_ctx *cur_ctx = NULL;
- PVFS_object_attr * attr;
- int matched_send_or_recv = 0;
-
- gossip_debug(
- GOSSIP_CLIENT_DEBUG, "(%p) lock_datafile_complete_operations "
- "(tag %lu)\n", sm_p, status_user_tag);
-
- assert(sm_p->u.lock.msgpair_completion_count > -1);
- assert(sm_p->u.lock.ack_completion_count > -1);
+ js_p->error_code = 0;
- attr = &sm_p->getattr.attr;
+ attr = &(sm_p->getattr.attr);
assert(attr);
- /* check if we're completing a send or recv msgpair */
- if (STATUS_USER_TAG_IS_SEND_OR_RECV(status_user_tag))
- {
- /*
- * The completion count might validly be zero when recovering from
- * a cancellation.
- */
- if (sm_p->u.lock.msgpair_completion_count)
- {
- ret = lock_complete_context_send_or_recv(sm_p, js_p);
- if (ret < 0) {
- /* problem */
- PVFS_perror_gossip(
- "io_complete_context_send_or_recv failed", ret);
- js_p->error_code = ret;
- return 1;
- } else if (ret == 0) {
- /* is a send */
- gossip_debug(GOSSIP_LOCK_DEBUG, " matched send in context "
- "%lu; continuing.\n",
- STATUS_USER_TAG_GET_INDEX(
- status_user_tag,
- LOCK_SM_PHASE_REQ_MSGPAIR_SEND));
- js_p->error_code = 0;
- /* If send had problem, BMI will apparently ensure that the
- * recv will fail too, so handle the retry stuff there.
- */
- return 0;
- } else {
- /* is a recv */
- assert(ret == LOCK_RECV_COMPLETED);
- matched_send_or_recv = 1;
- }
- }
- }
-
- /* if we've just completed a recv above, process the receive
- * and post the flow if we're doing a read
- */
- if (ret == LOCK_RECV_COMPLETED)
- {
- ret = lock_process_context_recv(sm_p, js_p, &cur_ctx);
- if (ret < 0)
- {
- char buf[64] = {0};
- PVFS_strerror_r(ret, buf, 64);
-
- gossip_debug(GOSSIP_LOCK_DEBUG,
- "%s: lock_process_context_recv failed: "
- "%s (%d remaining msgpairs)\n",
- __func__, buf, sm_p->u.lock.msgpair_completion_count);
-
- js_p->error_code = ret;
- /* if recv failed, probably have to do the send again too */
- cur_ctx->msg_send_has_been_posted = 0;
- cur_ctx->msg_recv_has_been_posted = 0;
- goto check_next_step;
- }
- }
-
- /* check if we've completed all msgpairs and posted all flows */
- if (matched_send_or_recv)
- {
- if (sm_p->u.lock.msgpair_completion_count == 0)
- {
- gossip_debug(GOSSIP_LOCK_DEBUG, "*** all msgpairs complete "
- "\n");
- }
- else
- {
- gossip_debug(
- GOSSIP_LOCK_DEBUG, "*** %d msgpair completions "
- "pending\n", sm_p->u.lock.msgpair_completion_count);
- }
- }
+ /* Reset the msgpair state for each round. */
+ memset(sm_p->msgarray, 0,
+ (sm_p->u.lock.datafile_count * sizeof(PINT_sm_msgpair_state)));
- check_next_step:
+ /* Algorithm for handling lock acquiring is done here. Basically,
+ * we have 2 approaches, from which we can build hybrid methods.
+ *
+ * 1) Optimistic - grab as many locks as possible.
+ * 2) Two-phase - get locks in order
+ *
+ * Based on the client_lock_type, we will figure out which method
+ * to use here at the appropriate time.*/
+ lock_choose_method(sm_p);
+ ret = lock_fill_msgpair_array(sm_p);
- /*
- * If something is pending, return 0 to let SM find the next thing
- * to do.
- */
+ js_p->error_code = ret;
- if (sm_p->u.lock.msgpair_completion_count
- || sm_p->u.lock.ack_completion_count) {
- if (sm_p->op_cancelled)
- gossip_debug(GOSSIP_LOCK_DEBUG, "detected lock cancellation with "
- " %d write acks pending\n",
- sm_p->u.lock.ack_completion_count);
- else
- gossip_debug(GOSSIP_LOCK_DEBUG, " %d acks "
- "pending, %d msgpair\n",
- sm_p->u.lock.ack_completion_count,
- sm_p->u.lock.msgpair_completion_count);
- return 0;
- }
-
- /*
- * Else either we've finished it all or have some msgpairs to retry
- * that failed earlier.
- */
- for (i=0; i < sm_p->u.lock.datafile_count; i++) {
- PINT_client_lock_ctx *cur_ctx = &sm_p->u.lock.contexts[i];
- if (!cur_ctx->msg_recv_has_been_posted)
- break;
- if (!cur_ctx->msg_send_has_been_posted)
- break;
- }
- if (i < sm_p->u.lock.datafile_count && !sm_p->op_cancelled) {
- gossip_debug(GOSSIP_LOCK_DEBUG,
- "*** %s: some msgpairs to repost\n", __func__);
- js_p->error_code = LOCK_RETRY;
- } else {
- gossip_debug(GOSSIP_LOCK_DEBUG, "*** all operations %s "
- "(msgpairs, acks)\n",
- (sm_p->op_cancelled ? "cancelled" : "completed"));
- js_p->error_code = LOCK_TRANSFERS_COMPLETE;
- }
+ exit:
return 1;
}
-
static int lock_analyze_results(PINT_client_sm *sm_p,
job_status_s *js_p)
{
+ gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) lock state: "
+ "lock_analyze_results\n", sm_p);
+
/* Now that we have all the datafile sizes,
* this state allows us to finish our check that the file request
* is not beyond EOF, and return the appropriate value for bytes
@@ -978,60 +589,103 @@ static int lock_analyze_results(PINT_cli
* the datafiles (the actual EOF).
*/
- PVFS_offset eof = 0;
- PVFS_offset eor;
- PVFS_offset filereq_ub_offset;
- int ret;
PVFS_object_attr * attr;
attr = &sm_p->getattr.attr;
assert(attr);
- ret = lock_find_offset(
- sm_p,
- PINT_REQUEST_TOTAL_BYTES(sm_p->u.lock.mem_req),
- &filereq_ub_offset);
- if(ret < 0)
- {
- js_p->error_code = ret;
- goto error_exit;
- }
-
- eor = filereq_ub_offset + sm_p->u.lock.file_req_offset;
-
- eof = attr->u.meta.dist->methods->
- logical_file_size(
- attr->u.meta.dist->params,
- attr->u.meta.dfile_count,
- sm_p->u.lock.dfile_size_array);
- if(eof > eor)
+ js_p->error_code = 0;
+
+ /* Not sure about the rest of the code, but here is a nice spot to
+ * check whether the necessary locks have been acquired or
+ * released. */
+ if (sm_p->u.lock.lock_type == PVFS_CLIENT_RELEASE)
{
- eor = eof;
-
- /* we found a logical offset that is past the end of the
- * request, so we know the request is not past EOF
- */
-/* sm_p->u.lock.lock_resp_p->granted_bytes =
- PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req); */
+ if (sm_p->u.lock.server_incomplete_count != 0)
+ {
+ gossip_err("PVFS_sys_lock: Error only released %d of %d\n",
+ sm_p->u.lock.server_incomplete_count,
+ sm_p->msgarray_count);
+ js_p->error_code = -PVFS_EINVAL;
+ }
+ else
+ {
+ qlist_del(&(sm_p->u.lock.cur_lock_id_list_p->lock_link));
+ free(sm_p->u.lock.cur_lock_id_list_p->lock_id_arr);
+ free(sm_p->u.lock.cur_lock_id_list_p->datafile_handle_arr);
+ free(sm_p->u.lock.cur_lock_id_list_p);
+ sm_p->u.lock.cur_lock_id_list_p = NULL;
+ if (!qlist_empty(sm_p->u.lock.lock_id_list_head_p))
+ {
+ /* Get the information for the next lock to release */
+ sm_p->u.lock.cur_lock_id_list_p =
+ qlist_entry(sm_p->u.lock.lock_id_list_head_p->next,
+ PVFS_lock_id_list, lock_link);
+
+ gossip_debug(
+ GOSSIP_LOCK_DEBUG,
+ "PVFS_sys_lock: Continuing to next lock release "
+ "of size %d\n",
+ sm_p->u.lock.cur_lock_id_list_p->lock_id_arr_count);
+
+ js_p->error_code = LOCK_INCOMPLETE;
+ }
+ }
}
else
{
- PVFS_size total_size;
-
- ret = lock_find_total_size(sm_p, eof, &total_size);
- if(ret < 0)
- {
- js_p->error_code = ret;
- goto error_exit;
- }
-
- /* sm_p->u.lock.lock_resp_p->total_completed = total_size;*/
+ if (sm_p->u.lock.server_incomplete_count != 0)
+ {
+ /* Build the heap, now that everything has been updated */
+ if (sm_p->u.lock.lock_server_cur_method ==
+ PVFS_SERVER_RELEASE_SOME)
+ {
+ build_heap(&(sm_p->u.lock.server_heap),
+ heap_swap_fn);
+ }
+ else if (sm_p->u.lock.lock_server_cur_method ==
+ PVFS_SERVER_ACQUIRE_NONBLOCK)
+ {
+ int64_t server_offset, max_offset;
+ int server_index, block_server_index;
+
+ /* Build the heap and get rid of completed requests. */
+
+ build_heap(&(sm_p->u.lock.server_heap),
+ heap_swap_fn);
+ print_heap(&(sm_p->u.lock.server_heap), heap_print_fn);
+ build_heap(&(sm_p->u.lock.server_heap),
+ heap_swap_fn);
+
+ do {
+ lock_heap_min(&sm_p->u.lock.server_heap, &max_offset,
+ &block_server_index);
+ if (max_offset == -1) {
+ gossip_debug(GOSSIP_LOCK_DEBUG, "%s delete "
+ "server=%d, max_offset=%Ld, heap_size=%d"
+ ",total_size = %d\n",
+ PVFS_lock_type_mapping[
+ sm_p->u.lock.lock_server_cur_method],
+ block_server_index,
+ max_offset, sm_p->u.lock.server_heap.size,
+ sm_p->u.lock.server_heap.max_size);
+ lock_heap_extract_min(&sm_p->u.lock.server_heap,
+ &server_offset, &server_index,
+ heap_cpy_fn, heap_swap_fn);
+ }
+ else {
+ gossip_debug(GOSSIP_LOCK_DEBUG, "%s: min = %Ld\n",
+ PVFS_lock_type_mapping[
+ sm_p->u.lock.lock_server_cur_method],
+ max_offset);
+ }
+ } while (max_offset == -1);
+ }
+
+ js_p->error_code = LOCK_INCOMPLETE;
+ }
}
-
-
- js_p->error_code = 0;
-
- error_exit:
+
return 1;
}
@@ -1041,22 +695,28 @@ static int lock_cleanup(PINT_client_sm *
gossip_debug(GOSSIP_CLIENT_DEBUG,
"(%p) lock state: lock_cleanup\n", sm_p);
- if (sm_p->u.lock.lock_type == PVFS_RELEASE)
+ if (sm_p->u.lock.lock_type != PVFS_CLIENT_RELEASE)
+ gossip_debug(GOSSIP_LOCK_DEBUG, "Client response final - acquired "
+ "%Ld byte(s).\n",
+ sm_p->u.lock.lock_resp_p->bytes_accessed);
+ else
+ gossip_debug(GOSSIP_LOCK_DEBUG, "Client response final - released "
+ "%Ld byte(s).\n",
+ sm_p->u.lock.lock_resp_p->bytes_accessed);
+
+ /* Clean up the lock id array. */
+ if (sm_p->u.lock.lock_type != PVFS_CLIENT_RELEASE)
{
- if (sm_p->u.lock.lock_resp_p->granted_bytes == -1 *
- (*(sm_p->u.lock.lock_id_arr_count_p)))
- free(*(sm_p->u.lock.lock_id_arr_p));
- else
- gossip_err("lock_cleanup: Did not free lock_id_arr since "
- "releasing was unsuccessful\n");
+ free(sm_p->u.lock.server_lock_info_arr);
+ free_heap(&sm_p->u.lock.server_heap);
}
-
- lock_contexts_destroy(sm_p);
-
+
lock_datafile_index_array_destroy(sm_p);
PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
+ PINT_msgpairarray_destroy(sm_p->msgarray);
+
if(sm_p->u.lock.dfile_size_array)
{
PINT_SM_DATAFILE_SIZE_ARRAY_DESTROY(&sm_p->u.lock.dfile_size_array);
@@ -1078,277 +738,129 @@ static int lock_cleanup(PINT_client_sm *
return 0;
}
-/*******************************************************************/
-
-/*
- returns 0 on send completion; IO_RECV_COMPLETED on recv completion,
- and -PVFS_error on failure
-*/
-static inline int lock_complete_context_send_or_recv(
- PINT_client_sm *sm_p,
- job_status_s *js_p)
+/* comp_fn for lock msgpairs: fold one server response into sm_p->u.lock. */
+static int lock_completion_fn(void *user_args,
+                              struct PVFS_server_resp *resp_p, int index)
 {
-    int ret = -PVFS_EINVAL, index = 0;
-    unsigned long status_user_tag = 0;
-    PINT_client_lock_ctx *cur_ctx = NULL;
-    PINT_sm_msgpair_state *msg = NULL;
-
-    gossip_debug(GOSSIP_LOCK_DEBUG,
-                 "- lock_complete_context_send_or_recv called\n");
-
-    assert(sm_p && js_p);
-    assert(sm_p->op == PVFS_SYS_LOCK);
-
-    status_user_tag = (unsigned long)js_p->status_user_tag;
-
-    if (STATUS_USER_TAG_TYPE(
-            status_user_tag, LOCK_SM_PHASE_REQ_MSGPAIR_RECV))
-    {
-        index = STATUS_USER_TAG_GET_INDEX(
-            status_user_tag, LOCK_SM_PHASE_REQ_MSGPAIR_RECV);
-
-        gossip_debug(GOSSIP_LOCK_DEBUG, "got a recv completion with "
-                     "context index %d\n", index);
-
-        cur_ctx = &sm_p->u.lock.contexts[index];
-        assert(cur_ctx);
-
-        msg = &cur_ctx->msg;
-
-        msg->recv_id = 0;
-        msg->recv_status = *js_p;
-
-        assert(msg->recv_status.error_code <= 0);
-        assert(msg->recv_status.actual_size <= msg->max_resp_sz);
-
-        cur_ctx->msg_recv_in_progress = 0;
-        sm_p->u.lock.msgpair_completion_count--;
-
-        ret = LOCK_RECV_COMPLETED;
-    }
-    else if (STATUS_USER_TAG_TYPE(
-                 status_user_tag, LOCK_SM_PHASE_REQ_MSGPAIR_SEND))
-    {
-        index = STATUS_USER_TAG_GET_INDEX(
-            status_user_tag, LOCK_SM_PHASE_REQ_MSGPAIR_RECV);
-
-        gossip_debug(GOSSIP_LOCK_DEBUG, "got a send completion with "
-                     "context index %d\n", index);
-
-        cur_ctx = &sm_p->u.lock.contexts[index];
-        assert(cur_ctx);
-
-        msg = &cur_ctx->msg;
-
-        msg->send_id = 0;
-        msg->send_status = *js_p;
-
-        assert(msg->send_status.error_code <= 0);
-
-        cur_ctx->msg_send_in_progress = 0;
-        sm_p->u.lock.msgpair_completion_count--;
-
-        ret = 0;
-    }
-    return ret;
-}
+    PVFS_object_attr *attr = NULL;
+    int done = -1;  /* set to 1 once this server's request has finished */
+    PINT_client_sm *sm_p = (PINT_client_sm *)user_args;
-static inline int lock_decode_ack_response(
-    PINT_client_lock_ctx *cur_ctx,
-    struct PINT_decoded_msg *decoded_resp,
-    struct PVFS_server_resp **resp)
-{
-    int ret = -PVFS_EINVAL;
-
-    gossip_debug(GOSSIP_LOCK_DEBUG, "- lock_decode_ack_response called\n");
-    assert(cur_ctx && decoded_resp && resp);
-
-    ret = PINT_serv_decode_resp(
-        cur_ctx->msg.fs_id, cur_ctx->msg.encoded_resp_p, decoded_resp,
-        &cur_ctx->msg.svr_addr,
-        cur_ctx->msg.recv_status.actual_size, resp);
+    attr = &(sm_p->getattr.attr);
+    assert(attr);
+    assert(resp_p->op == PVFS_SERV_LOCK);
-    if (ret)
+    if(resp_p->status != 0)
     {
-        PVFS_perror("PINT_server_decode_resp failed", ret);
-        return ret;
+        return resp_p->status;
     }
-    assert((*resp)->status < 1);
-    cur_ctx->msg.op_status = (*resp)->status;
+    gossip_debug(GOSSIP_LOCK_DEBUG,
+                 "*Response from server=%d (index=%d) next_abs_off=%lld\n",
+                 sm_p->msgarray[index].req.u.lock.server_nr,
+                 sm_p->msgarray[index].datafile_index,
+                 (long long) resp_p->u.lock.next_abs_offset);
+
+    /* As index refers to the position in the msgarray, fix this to
+     * the actual server */
+    index = sm_p->msgarray[index].datafile_index;
-    if (cur_ctx->msg.recv_status.error_code || cur_ctx->msg.op_status)
+    /* Modify the client response based on the server response. */
+    if (sm_p->u.lock.lock_type != PVFS_CLIENT_RELEASE)
     {
-        gossip_debug(
-            GOSSIP_LOCK_DEBUG, "  error %d with status %d related "
-            "to response from context %p.\n",
-            cur_ctx->msg.recv_status.error_code,
-            cur_ctx->msg.op_status, cur_ctx);
-
-        if (cur_ctx->msg.recv_status.error_code)
-        {
-            PVFS_perror_gossip(
-                "lock_decode_ack_response (recv_status.error_code)",
-                cur_ctx->msg.recv_status.error_code);
-            ret = cur_ctx->msg.recv_status.error_code;
-        }
-        else if (cur_ctx->msg.op_status)
-        {
-            PVFS_perror_gossip("lock_decode_ask_response (op_status)",
-                               cur_ctx->msg.op_status);
-            ret = cur_ctx->msg.op_status;
-        }
+        (sm_p->u.lock.cur_lock_id_list_p->lock_id_arr)[index] =
+            resp_p->u.lock.lock_id;
+        (sm_p->u.lock.cur_lock_id_list_p->datafile_handle_arr)[index] =
+            attr->u.meta.dfile_array[
+                sm_p->u.lock.datafile_index_array[index]];
+        sm_p->u.lock.lock_resp_p->bytes_accessed +=
+            resp_p->u.lock.bytes_accessed;
+        sm_p->u.lock.server_lock_info_arr[index].last_abs_offset_locked =
+            resp_p->u.lock.last_abs_offset_locked;
+        sm_p->u.lock.server_lock_info_arr[index].next_abs_offset =
+            resp_p->u.lock.next_abs_offset;
-        PINT_serv_free_msgpair_resources(
-            &cur_ctx->msg.encoded_req, cur_ctx->msg.encoded_resp_p,
-            decoded_resp, &cur_ctx->msg.svr_addr,
-            cur_ctx->msg.max_resp_sz);
-    }
-    return ret;
-}
-
-static inline int lock_check_context_status(
-    PINT_client_lock_ctx *cur_ctx, int lock_type,
-    PVFS_size *total_size)
-{
-    int ret = 0;
-
-    gossip_debug(GOSSIP_LOCK_DEBUG, "- lock_check_context_status called\n");
-
-    assert(cur_ctx && total_size);
-
-    if (cur_ctx->msg.send_status.error_code)
-    {
-        gossip_debug(GOSSIP_LOCK_DEBUG,
-                     "  error (%d) in msgpair send for context %p\n",
-                     cur_ctx->msg.send_status.error_code, cur_ctx);
-        ret = cur_ctx->msg.send_status.error_code;
-    }
-    else if (cur_ctx->msg.recv_status.error_code)
-    {
-        gossip_debug(GOSSIP_LOCK_DEBUG,
-                     "  error (%d) in msgpair recv for context %p\n",
-                     cur_ctx->msg.recv_status.error_code, cur_ctx);
-        ret = cur_ctx->msg.recv_status.error_code;
-    }
-    else if (lock_type == PVFS_ACQUIRE)
-    {
-        gossip_debug(
-            GOSSIP_LOCK_DEBUG, "  %lld bytes acquired from context %p\n",
-            cur_ctx->bytes_granted, cur_ctx);
-        *total_size += cur_ctx->bytes_granted;
-    }
-    else if (lock_type == PVFS_RELEASE)
-    {
-        gossip_debug(
-            GOSSIP_LOCK_DEBUG, "  %lld bytes released from context %p\n",
-            cur_ctx->bytes_granted, cur_ctx);
+        if ((resp_p->u.lock.request_finished == 1) &&
+            (sm_p->u.lock.server_lock_info_arr[index].next_abs_offset == -1))
+        {
+            done = 1;
+            sm_p->u.lock.server_incomplete_count--;
+        }
-        *total_size += cur_ctx->bytes_granted;
-    }
-
-    return ret;
-}
-
-/**
- * process_context_recv handles the ack or nack from the server
- * in response to the I/O request. This is called for each I/O context
- * i.e. each specific server response for each datafile.
- */
-static inline int lock_process_context_recv(
-    PINT_client_sm *sm_p,
-    job_status_s *js_p,
-    PINT_client_lock_ctx **out_ctx)
-{
-    int ret = -PVFS_EINVAL, index = 0;
-    unsigned long status_user_tag = 0;
-    struct PINT_decoded_msg decoded_resp;
-    struct PVFS_server_resp *resp = NULL;
-    PINT_client_lock_ctx *cur_ctx = NULL;
-
-    gossip_debug(GOSSIP_LOCK_DEBUG,
-                 "- io_process_context_recv called\n");
-
-    assert(sm_p && js_p);
-    assert(STATUS_USER_TAG_TYPE(
-               status_user_tag, LOCK_SM_PHASE_REQ_MSGPAIR_RECV));
-
-    status_user_tag = (unsigned long)js_p->status_user_tag;
-
-    index = STATUS_USER_TAG_GET_INDEX(
-        status_user_tag, LOCK_SM_PHASE_REQ_MSGPAIR_RECV);
-
-    cur_ctx = &sm_p->u.lock.contexts[index];
-    assert(cur_ctx);
-    *out_ctx = cur_ctx;
-
-    if (js_p->error_code)
-    {
-        {
-            char buf[1024];
-            PVFS_strerror_r(js_p->error_code, buf, sizeof(buf));
-            buf[sizeof(buf)-1] = '\0';
-            gossip_debug(GOSSIP_LOCK_DEBUG, "%s: entered with error: %s\n",
-                         __func__, buf);
+        /* Modify the heap */
+        if (((sm_p->u.lock.lock_server_cur_method ==
+              PVFS_SERVER_ACQUIRE_NEW_NONBLOCK) ||
+             (sm_p->u.lock.lock_server_cur_method ==
+              PVFS_SERVER_ACQUIRE_NEW_BLOCK) ||
+             (sm_p->u.lock.lock_server_cur_method ==
+              PVFS_SERVER_ACQUIRE_BLOCK)) &&
+            (done != 1))
+        {
+            gossip_debug(GOSSIP_LOCK_DEBUG, "inserting server=%d,next_abs_off "
+                         "= %lld into heap (size=%d,max_size=%d).\n", index,
+                         (long long) resp_p->u.lock.next_abs_offset,
+                         sm_p->u.lock.server_heap.size,
+                         sm_p->u.lock.server_heap.max_size);
+            lock_heap_insert(&sm_p->u.lock.server_heap,
+                             resp_p->u.lock.next_abs_offset,
+                             &sm_p->u.lock.server_lock_info_arr[index],
+                             heap_cpy_fn);
         }
-        return js_p->error_code;
-    }
-
-    /* decode the response from the server */
-    ret = lock_decode_ack_response(cur_ctx, &decoded_resp, &resp);
-    if (ret)
-    {
+        else if (sm_p->u.lock.lock_server_cur_method ==
+                 PVFS_SERVER_ACQUIRE_NONBLOCK)
         {
-            char buf[1024];
-            PVFS_strerror_r(js_p->error_code, buf, sizeof(buf));
-            buf[sizeof(buf)-1] = '\0';
-            gossip_debug(GOSSIP_LOCK_DEBUG, "%s: failed: %s\n", __func__, buf);
+            assert(sm_p->u.lock.server_lock_info_arr[index].heap_node_p
+                   != NULL);
+            /* Update the heap_node associated with the server.  If it
+             * is done, these heap_nodes will be extracted in the
+             * analyze stage. */
+            sm_p->u.lock.server_lock_info_arr[index].heap_node_p->key =
+                resp_p->u.lock.next_abs_offset;
+        }
+        else if (sm_p->u.lock.lock_server_cur_method ==
+                 PVFS_SERVER_RELEASE_SOME)
+        {
+            /* If a req was finished, move it back to the heap and add
+             * it back to the count, otherwise just update the heap
+             * entries for these responses and then build the heap in
+             * the analyze stage */
+            if (sm_p->u.lock.server_lock_info_arr[index].heap_node_p == NULL)
+            {
+                sm_p->u.lock.server_incomplete_count++;
+                lock_heap_insert(&sm_p->u.lock.server_heap,
+                                 resp_p->u.lock.next_abs_offset,
+                                 &sm_p->u.lock.server_lock_info_arr[index],
+                                 heap_cpy_fn);
+            }
+            else
+            {
+                assert(resp_p->u.lock.next_abs_offset != -1);
+                sm_p->u.lock.server_lock_info_arr[index].heap_node_p->key =
+                    resp_p->u.lock.next_abs_offset;
+            }
         }
-        return ret;
-    }
-
-    /* Modify the client response based on the server response. */
-    if (sm_p->u.lock.lock_type == PVFS_ACQUIRE)
-    {
-        (*(sm_p->u.lock.lock_id_arr_p))[index] = resp->u.lock.lock_id;
-        sm_p->u.lock.lock_resp_p->lock_id = resp->u.lock.lock_id;
-        sm_p->u.lock.lock_resp_p->bstream_size = resp->u.lock.bstream_size;
-        sm_p->u.lock.lock_resp_p->granted_bytes +=resp->u.lock.granted_bytes;
-    }
-    else
-    {
-        if (resp->u.lock.granted_bytes != -1)
-            gossip_err("PVFS_RELEASE: Failed to fully release all locks "
-                       "with req. Released %Ld bytes.\n",
-                       resp->u.lock.granted_bytes);
-        sm_p->u.lock.lock_resp_p->granted_bytes += resp->u.lock.granted_bytes;
     }
-
-    /* save the datafile size */
-    sm_p->u.lock.dfile_size_array[cur_ctx->index] = resp->u.io.bstream_size;
-
-    /* now we can destroy I/O request response resources */
-    ret = PINT_serv_free_msgpair_resources(
-        &cur_ctx->msg.encoded_req, cur_ctx->msg.encoded_resp_p,
-        &decoded_resp, &cur_ctx->msg.svr_addr,
-        cur_ctx->msg.max_resp_sz);
-
-    if (ret)
+    else /* Release all*/
     {
-        PVFS_perror_gossip("PINT_serv_free_msgpair_resources "
-                           "failed", ret);
-        return ret;
-    }
-
-    return ret;
-}
+        sm_p->u.lock.lock_resp_p->bytes_accessed +=
+            resp_p->u.lock.bytes_accessed;
+        if (resp_p->u.lock.request_finished == 1)
+            sm_p->u.lock.server_incomplete_count--;
+
+        gossip_debug(GOSSIP_LOCK_DEBUG, "Response from server %d - released "
+                     "%lld bytes (%d left).\n", index,
+                     (long long) resp_p->u.lock.bytes_accessed,
+                     sm_p->u.lock.server_incomplete_count);
+    }
+
+#if 0
+    if (sm_p->u.lock.lock_type != PVFS_CLIENT_RELEASE)
+        print_server_lock_info_arr(sm_p);
+#endif
+    return 0;
+}
-/*
- determines what subset of the datafiles actually contain data that
- we are interested in for this request. returns 0 on success,
- -PVFS_error on failure
-*/
static int lock_find_target_datafiles(
PVFS_Request mem_req,
@@ -1453,178 +965,6 @@ static int lock_find_target_datafiles(
return 0;
}
-
-/* If there are no datafiles that have a logical
- * offset past the upper bound of the file request, we know that
- * the request is beyond the EOF of the file. We compute
- * the return value for bytes read by finding the upper bound of the
- * memory request *within* the logical file (before EOF). This is
- * the end of the contiguous segment in the file request < EOF.
- * The number of bytes read is then the length of the file request
- * from start to this point.
- */
-
-static int lock_find_total_size(
- PINT_client_sm * sm_p,
- PVFS_offset final_offset,
- PVFS_size * total_return_size)
-{
- int res;
- PVFS_offset current_offset;
- PVFS_offset offsets[LOCK_MAX_SEGMENT_NUM];
- PVFS_size sizes[LOCK_MAX_SEGMENT_NUM];
- PINT_Request_state * filereq_state;
- PINT_Request_state * memreq_state;
- PINT_request_file_data rfdata;
- PINT_Request_result result;
- PVFS_size total_size = 0;
- PVFS_object_attr * attr;
- int index = 0;
-
- /* if the final offset is zero, then the file size is zero */
- if(final_offset == 0)
- {
- *total_return_size = 0;
- return 0;
- }
-
- attr = &sm_p->getattr.attr;
-
- filereq_state = PINT_new_request_state(sm_p->u.lock.file_req);
- memreq_state = PINT_new_request_state(sm_p->u.lock.mem_req);
-
- rfdata.server_nr = 0;
- rfdata.server_ct = 1;
- rfdata.fsize = final_offset;
- rfdata.dist = attr->u.meta.dist;
- rfdata.extend_flag = 0;
-
- result.offset_array = offsets;
- result.size_array = sizes;
- result.segmax = LOCK_MAX_SEGMENT_NUM;
- result.bytemax = final_offset;
-
- PINT_REQUEST_STATE_SET_FINAL(
- filereq_state, final_offset);
-
- do
- {
- result.segs = 0;
- result.bytes = 0;
-
- res = PINT_process_request(filereq_state, memreq_state,
- &rfdata, &result, PINT_SERVER);
- if(res < 0)
- {
- goto exit;
- }
-
- for(index = 0; index < result.segs; ++index)
- {
- current_offset = sm_p->u.lock.file_req_offset + offsets[index];
- if((final_offset >= current_offset) &&
- (final_offset <= (current_offset + sizes[index])))
- {
- total_size += (final_offset - current_offset);
- break;
- }
- else if(final_offset < current_offset)
- {
- break;
- }
- else
- {
- total_size += sizes[index];
- }
- }
-
- } while(!PINT_REQUEST_DONE(filereq_state) && result.segs);
-
- *total_return_size = total_size;
-
- exit:
-
- PINT_free_request_state(filereq_state);
- PINT_free_request_state(memreq_state);
- return 0;
-}
-
-/* computes the logical offset in the file request from the size
- * of contiguous buffer. This function acts only on the file request
- * since the actual size of the file doesn't matter.
- */
-
-static int lock_find_offset(
- PINT_client_sm * sm_p,
- PVFS_size contig_size,
- PVFS_offset * total_return_offset)
-{
- PINT_Request_state * filereq_state;
- PINT_Request_state * memreq_state;
- PINT_request_file_data rfdata;
- PINT_Request_result result;
- int res;
- PVFS_offset offsets[LOCK_MAX_SEGMENT_NUM];
- PVFS_size sizes[LOCK_MAX_SEGMENT_NUM];
- PVFS_offset total_offset = 0;
- PVFS_size total_size = 0;
- PVFS_object_attr * attr;
- int index = 0;
-
- attr = &sm_p->getattr.attr;
-
- filereq_state = PINT_new_request_state(sm_p->u.lock.file_req);
- memreq_state = PINT_new_request_state(sm_p->u.lock.mem_req);
-
- rfdata.server_nr = 0;
- rfdata.server_ct = 1;
- rfdata.fsize = 0;
- rfdata.dist = attr->u.meta.dist;
- rfdata.extend_flag = 1;
-
- result.offset_array = offsets;
- result.size_array = sizes;
- result.segmax = LOCK_MAX_SEGMENT_NUM;
- result.bytemax = contig_size;
-
- PINT_REQUEST_STATE_SET_FINAL(
- filereq_state, contig_size);
- do
- {
- result.segs = 0;
- result.bytes = 0;
-
- res = PINT_process_request(filereq_state, memreq_state,
- &rfdata, &result, PINT_SERVER);
- if(res < 0)
- {
- PINT_free_request_state(filereq_state);
- PINT_free_request_state(memreq_state);
- return res;
- }
-
- for(index = 0; index < result.segs; ++index)
- {
- if(contig_size <= (total_size + sizes[index]))
- {
- total_offset = offsets[index] + (contig_size - total_size);
- break;
- }
- else
- {
- total_offset = offsets[index] + sizes[index];
- total_size += sizes[index];
- }
- }
-
- } while(!PINT_REQUEST_DONE(filereq_state) && result.segs);
-
- *total_return_offset = total_offset;
-
- PINT_free_request_state(filereq_state);
- PINT_free_request_state(memreq_state);
- return 0;
-}
static int lock_datafile_index_array_init(
PINT_client_sm *sm_p,
@@ -1651,63 +991,464 @@ static void lock_datafile_index_array_de
sm_p->u.lock.datafile_count = 0;
}
-static int lock_contexts_init(
- PINT_client_sm *sm_p,
- int context_count,
- PVFS_object_attr *attr)
+/* Debug dump of server_lock_info_arr to stdout, one row per datafile. */
+static void print_server_lock_info_arr(PINT_client_sm *sm_p)
 {
-    int ret;
+    PINT_server_lock_info *lock_info_p = sm_p->u.lock.server_lock_info_arr;
     int i = 0;
-    sm_p->u.lock.contexts = (PINT_client_lock_ctx *)malloc(
-        context_count * sizeof(PINT_client_lock_ctx));
-    if(!sm_p->u.lock.contexts)
+    fprintf(stdout,
+            "server_lock_info_arr: %d of %d valid\n"
+            "server | last_abs_offset_locked | next_abs_offset | heap_node\n",
+            sm_p->u.lock.server_incomplete_count,
+            sm_p->u.lock.datafile_count);
+    for (i = 0; i < sm_p->u.lock.datafile_count; i++)
     {
-        return -PVFS_ENOMEM;
+        fprintf(stdout, "%6d | %22lld | %15lld | %s\n",
+                sm_p->u.lock.datafile_index_array[i],
+                (long long) lock_info_p[i].last_abs_offset_locked,
+                (long long) lock_info_p[i].next_abs_offset,
+                (lock_info_p[i].heap_node_p == NULL) ? "n" : "y");
     }
+    fprintf(stdout, "\n");
+    fflush(stdout);
+}
- memset(sm_p->u.lock.contexts, 0,
- (context_count * sizeof(PINT_client_lock_ctx)));
- sm_p->u.lock.context_count = context_count;
-
- for(i = 0; i < context_count; ++i)
- {
- PINT_client_lock_ctx * cur_ctx = &sm_p->u.lock.contexts[i];
- PINT_sm_msgpair_state *msg = &cur_ctx->msg;
-
- msg->fs_id = sm_p->object_ref.fs_id;
- msg->handle =
- attr->u.meta.dfile_array[
- sm_p->u.lock.datafile_index_array[i]];
- msg->retry_flag = PVFS_MSGPAIR_NO_RETRY;
- msg->comp_fn = NULL;
-
- ret = PINT_cached_config_map_to_server(
- &msg->svr_addr, msg->handle, msg->fs_id);
- if(ret)
- {
- gossip_err("Failed to map meta server address\n");
- free(sm_p->u.lock.contexts);
- return ret;
- }
+static int lock_choose_method(
+    PINT_client_sm *sm_p)  /* step u.lock.lock_server_cur_method forward */
+{
+    switch(sm_p->u.lock.lock_type)
+    {
+    case PVFS_CLIENT_ACQUIRE_TWO_PHASE:  /* NEW_BLOCK first, then BLOCK */
+        if (sm_p->u.lock.lock_server_cur_method == PVFS_SERVER_LOCK_INIT)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_ACQUIRE_NEW_BLOCK;
+        else
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_ACQUIRE_BLOCK;
+        break;
+    case PVFS_CLIENT_ACQUIRE_ONE_TRY:  /* NEW_NONBLOCK, RELEASE_SOME, BLOCK */
+        if (sm_p->u.lock.lock_server_cur_method == PVFS_SERVER_LOCK_INIT)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_ACQUIRE_NEW_NONBLOCK;
+        else if (sm_p->u.lock.lock_server_cur_method == 
+                 PVFS_SERVER_ACQUIRE_NEW_NONBLOCK)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_RELEASE_SOME;
+        else if (sm_p->u.lock.lock_server_cur_method == 
+                 PVFS_SERVER_RELEASE_SOME)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_ACQUIRE_BLOCK;
+        break;  /* any other current method is left unchanged */
+    case PVFS_CLIENT_ACQUIRE_ALT_TRY:  /* NEW_NONBLOCK, then cycle RELEASE_SOME -> BLOCK -> NONBLOCK */
+        if (sm_p->u.lock.lock_server_cur_method == PVFS_SERVER_LOCK_INIT)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_ACQUIRE_NEW_NONBLOCK;
+        else if (sm_p->u.lock.lock_server_cur_method == 
+                 PVFS_SERVER_ACQUIRE_NEW_NONBLOCK)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_RELEASE_SOME;
+        else if (sm_p->u.lock.lock_server_cur_method == 
+                 PVFS_SERVER_RELEASE_SOME)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_ACQUIRE_BLOCK;
+        else if (sm_p->u.lock.lock_server_cur_method == 
+                 PVFS_SERVER_ACQUIRE_BLOCK)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_ACQUIRE_NONBLOCK;
+        else if (sm_p->u.lock.lock_server_cur_method == 
+                 PVFS_SERVER_ACQUIRE_NONBLOCK)
+            sm_p->u.lock.lock_server_cur_method = 
+                PVFS_SERVER_RELEASE_SOME;
+        break;
+    case PVFS_CLIENT_RELEASE:  /* releases always use RELEASE_ALL */
+        sm_p->u.lock.lock_server_cur_method = PVFS_SERVER_RELEASE_ALL;
+        break;
+    default:  /* NOTE(review): error is only logged; 0 is still returned */
+        gossip_err("invalid client request type!\n");
+    }
-        gossip_debug(GOSSIP_LOCK_DEBUG, "initializing context[%d] %p\n",
-                     i, cur_ctx);
+    return 0;
+}
+
+/* Fill sm_p->msgarray for the current lock_server_cur_method. Returns 0,
+ * LOCK_NO_MSGPAIRS (RELEASE_SOME with nothing to send), or a negative error. */
+static int lock_fill_msgpair_array(PINT_client_sm *sm_p)
+{
+    int i, ret = -1;
+    PVFS_object_attr *attr = NULL;
+    PVFS_handle datafile_handle;
+    PVFS_Request fill_file_req = PVFS_BYTE;
+    PVFS_offset fill_file_req_offset = -1;
+    PVFS_offset fill_final_offset = -1;
+    PVFS_size fill_aggregate_size = -1;
+    PVFS_id_gen_t fill_lock_id = -1;
+
+    attr = &(sm_p->getattr.attr);
+    assert(attr);
+    /* NEW_* acquire / RELEASE_ALL: one request per datafile or lock id */
+    if ((sm_p->u.lock.lock_server_cur_method == 
+         PVFS_SERVER_ACQUIRE_NEW_NONBLOCK ) ||
+        (sm_p->u.lock.lock_server_cur_method == 
+         PVFS_SERVER_ACQUIRE_NEW_BLOCK) ||
+        (sm_p->u.lock.lock_server_cur_method ==
+         PVFS_SERVER_RELEASE_ALL))
+    {
+        if (sm_p->u.lock.lock_type != PVFS_CLIENT_RELEASE)
+            sm_p->msgarray_count = sm_p->u.lock.datafile_count;
+        else
+        {
+            sm_p->msgarray_count = 
+                sm_p->u.lock.cur_lock_id_list_p->lock_id_arr_count;
+            sm_p->u.lock.server_incomplete_count = sm_p->msgarray_count;
+        }
+
+        for (i = 0; i < sm_p->msgarray_count; i++)
+        {
+            if (sm_p->u.lock.lock_type != PVFS_CLIENT_RELEASE)
+                datafile_handle = attr->u.meta.dfile_array[
+                    sm_p->u.lock.datafile_index_array[i]];
+            else
+                datafile_handle = 
+                    sm_p->u.lock.cur_lock_id_list_p->datafile_handle_arr[i];
+
+            gossip_debug(GOSSIP_LOCK_DEBUG,
+                         "  filling ALL %s lock requests for %llu\n",
+                         PVFS_lock_type_mapping[
+                             sm_p->u.lock.lock_server_cur_method],
+                         llu(datafile_handle));
+
+            if (sm_p->u.lock.lock_server_cur_method ==
+                PVFS_SERVER_ACQUIRE_NEW_NONBLOCK)
+            {
+                fill_file_req = sm_p->u.lock.file_req;
+                fill_file_req_offset = sm_p->u.lock.file_req_offset;
+                fill_final_offset = __LONG_LONG_MAX__;
+                fill_aggregate_size = 
+                    PINT_REQUEST_TOTAL_BYTES(sm_p->u.lock.mem_req);
+                fill_lock_id = -1;
+            }
+            else if (sm_p->u.lock.lock_server_cur_method ==
+                     PVFS_SERVER_ACQUIRE_NEW_BLOCK)
+            {
+                fill_file_req = sm_p->u.lock.file_req;
+                fill_file_req_offset = sm_p->u.lock.file_req_offset;
+                fill_final_offset = 0;
+                fill_aggregate_size = 
+                    PINT_REQUEST_TOTAL_BYTES(sm_p->u.lock.mem_req);
+                fill_lock_id = -1;
+            }
+            else if (sm_p->u.lock.lock_server_cur_method ==
+                     PVFS_SERVER_RELEASE_ALL)
+            {
+                fill_file_req = PVFS_BYTE;
+                fill_file_req_offset = -1;
+                fill_final_offset = -1;
+                fill_aggregate_size = -1;
+                fill_lock_id = 
+                    (sm_p->u.lock.cur_lock_id_list_p->lock_id_arr)[i];
+            }
-        cur_ctx->index = i;
-        cur_ctx->server_nr = sm_p->u.lock.datafile_index_array[i];
-        cur_ctx->data_handle =
-            attr->u.meta.dfile_array[cur_ctx->server_nr];
+            PINT_SERVREQ_LOCK_FILL(
+                sm_p->msgarray[i].req,
+                *sm_p->cred_p,
+                sm_p->object_ref.fs_id,
+                datafile_handle,
+                sm_p->u.lock.io_type,
+                sm_p->u.lock.lock_server_cur_method,
+                sm_p->u.lock.datafile_index_array[i],
+                attr->u.meta.dfile_count,
+                attr->u.meta.dist,
+                fill_file_req,
+                fill_file_req_offset,
+                fill_final_offset,
+                fill_aggregate_size,
+                fill_lock_id);
+
+            sm_p->msgarray[i].fs_id = sm_p->object_ref.fs_id;
+            sm_p->msgarray[i].handle = sm_p->object_ref.handle;
+            sm_p->msgarray[i].retry_flag = PVFS_MSGPAIR_RETRY;
+            sm_p->msgarray[i].comp_fn = lock_completion_fn;
+            sm_p->msgarray[i].datafile_index = i;
+
+            ret = PINT_cached_config_map_to_server(
+                &sm_p->msgarray[i].svr_addr, datafile_handle,
+                sm_p->object_ref.fs_id);
+            if(ret < 0)
+            {
+                gossip_err("Failed to map meta server address\n");
+                return ret;
+            }
+        }
     }
+    else if (sm_p->u.lock.lock_server_cur_method ==
+             PVFS_SERVER_ACQUIRE_NONBLOCK)
+    {
+        int msg_index = 0, server_index;
+
+        /* Figure out which servers need to acquire more locks.
+         * Modify the heap nodes associated in the completion_fn */
+        for (i = 0; i < sm_p->u.lock.datafile_count; i++)
+        {
+            if (sm_p->u.lock.server_lock_info_arr[i].next_abs_offset != -1)
+            {
+                server_index = sm_p->u.lock.server_lock_info_arr[i].index;
+
+                datafile_handle = attr->u.meta.dfile_array[
+                    sm_p->u.lock.datafile_index_array[server_index]];
+
+                PINT_SERVREQ_LOCK_FILL(
+                    sm_p->msgarray[msg_index].req,
+                    *sm_p->cred_p,
+                    sm_p->object_ref.fs_id,
+                    datafile_handle,
+                    sm_p->u.lock.io_type,
+                    sm_p->u.lock.lock_server_cur_method,
+                    sm_p->u.lock.datafile_index_array[server_index],
+                    attr->u.meta.dfile_count,
+                    attr->u.meta.dist,
+                    PVFS_BYTE,
+                    -1,
+                    __LONG_LONG_MAX__,
+                    -1,
+                    (sm_p->u.lock.cur_lock_id_list_p->
+                     lock_id_arr)[server_index]);
+
+                sm_p->msgarray[msg_index].fs_id = sm_p->object_ref.fs_id;
+                sm_p->msgarray[msg_index].handle = sm_p->object_ref.handle;
+                sm_p->msgarray[msg_index].retry_flag = PVFS_MSGPAIR_RETRY;
+                sm_p->msgarray[msg_index].comp_fn = lock_completion_fn;
+                sm_p->msgarray[msg_index].datafile_index = server_index;
+
+                ret = PINT_cached_config_map_to_server(
+                    &sm_p->msgarray[msg_index].svr_addr, datafile_handle,
+                    sm_p->object_ref.fs_id);
+                if(ret < 0)
+                {
+                    gossip_err("Failed to map meta server address\n");
+                    return ret;
+                }
+
+                msg_index++;
+            }
+        }
+
+        sm_p->msgarray_count = msg_index;
+        assert(sm_p->msgarray_count > 0);
+        gossip_debug(GOSSIP_LOCK_DEBUG, "ACQUIRE_NON_BLOCK: "
+                     "(%d of %d)\n",
+                     msg_index, sm_p->u.lock.datafile_count);
+    }
+    else if (sm_p->u.lock.lock_server_cur_method ==
+             PVFS_SERVER_RELEASE_SOME)
+    {
+        int64_t max_offset;
+        int server_index, block_server_index, msg_index = 0;
+
+        lock_heap_min(&sm_p->u.lock.server_heap, &max_offset,
+                      &block_server_index);
+        /* Figure out which servers need to release. */
+        for (i = 0; i < sm_p->u.lock.datafile_count; i++)
+        {
+            if ((sm_p->u.lock.server_lock_info_arr[i].last_abs_offset_locked >
+                 max_offset) && (i != block_server_index))
+            {
+                server_index = sm_p->u.lock.server_lock_info_arr[i].index;
+
+                datafile_handle = attr->u.meta.dfile_array[
+                    sm_p->u.lock.datafile_index_array[server_index]];
+
+                PINT_SERVREQ_LOCK_FILL(
+                    sm_p->msgarray[msg_index].req,
+                    *sm_p->cred_p,
+                    sm_p->object_ref.fs_id,
+                    datafile_handle,
+                    sm_p->u.lock.io_type,
+                    sm_p->u.lock.lock_server_cur_method,
+                    sm_p->u.lock.datafile_index_array[server_index],
+                    attr->u.meta.dfile_count,
+                    attr->u.meta.dist,
+                    PVFS_BYTE,
+                    -1,
+                    max_offset,
+                    -1,
+                    (sm_p->u.lock.cur_lock_id_list_p->
+                     lock_id_arr)[server_index]);
+
+                sm_p->msgarray[msg_index].fs_id = sm_p->object_ref.fs_id;
+                sm_p->msgarray[msg_index].handle = sm_p->object_ref.handle;
+                sm_p->msgarray[msg_index].retry_flag = PVFS_MSGPAIR_RETRY;
+                sm_p->msgarray[msg_index].comp_fn = lock_completion_fn;
+                sm_p->msgarray[msg_index].datafile_index = server_index;
+
+                ret = PINT_cached_config_map_to_server(
+                    &sm_p->msgarray[msg_index].svr_addr, datafile_handle,
+                    sm_p->object_ref.fs_id);
+                if(ret < 0)
+                {
+                    gossip_err("Failed to map meta server address\n");
+                    return ret;
+                }
+
+                msg_index++;
+            }
+        }
+
+        sm_p->msgarray_count = msg_index;
+        assert(sm_p->msgarray_count >= 0);
+        if (sm_p->msgarray_count > 0)
+            gossip_debug(GOSSIP_LOCK_DEBUG, "RELEASE_SOME: "
+                         "max_offset = %lld (%d of %d)\n", (long long) max_offset,
+                         msg_index, sm_p->u.lock.datafile_count);
+        else {
+            gossip_debug(GOSSIP_LOCK_DEBUG, "RELEASE_SOME: No msgpairs this "
+                         "round - max_offset = %lld (%d of %d)\n",
+                         (long long) max_offset, msg_index,
+                         sm_p->u.lock.datafile_count);
+            return LOCK_NO_MSGPAIRS;
+        }
+    }
+    else if (sm_p->u.lock.lock_server_cur_method ==
+             PVFS_SERVER_ACQUIRE_BLOCK)
+    {
+        int64_t server_offset, max_offset;
+        int server_index, block_server_index;
+
+        /* Figure out which server needs to get the next blocking lock */
+        lock_heap_extract_min(&sm_p->u.lock.server_heap, &server_offset,
+                              &server_index, heap_cpy_fn, heap_swap_fn);
+
+        gossip_debug(GOSSIP_LOCK_DEBUG, "ACQUIRE_BLOCK: extracted "
+                     "offset=%lld,server=%d,lock_id=%lld\n",
+                     (long long) server_offset, server_index,
+                     (long long) (sm_p->u.lock.cur_lock_id_list_p->
+                                  lock_id_arr)[server_index]);
+
+        /* How far is that server allowed to process the lock? */
+        if (sm_p->u.lock.server_heap.size == 0)
+            max_offset = __LONG_LONG_MAX__;
+        else
+            lock_heap_min(&sm_p->u.lock.server_heap, &max_offset,
+                          &block_server_index);
+
+        datafile_handle = attr->u.meta.dfile_array[
+            sm_p->u.lock.datafile_index_array[server_index]];
+
+        PINT_SERVREQ_LOCK_FILL(
+            sm_p->msgarray[0].req,
+            *sm_p->cred_p,
+            sm_p->object_ref.fs_id,
+            datafile_handle,
+            sm_p->u.lock.io_type,
+            sm_p->u.lock.lock_server_cur_method,
+            sm_p->u.lock.datafile_index_array[server_index],
+            attr->u.meta.dfile_count,
+            attr->u.meta.dist,
+            PVFS_BYTE,
+            -1,
+            max_offset,
+            -1,
+            (sm_p->u.lock.cur_lock_id_list_p->lock_id_arr)[server_index]);
+
+        sm_p->msgarray[0].fs_id = sm_p->object_ref.fs_id;
+        sm_p->msgarray[0].handle = sm_p->object_ref.handle;
+        sm_p->msgarray[0].retry_flag = PVFS_MSGPAIR_RETRY;
+        sm_p->msgarray[0].comp_fn = lock_completion_fn;
+        sm_p->msgarray[0].datafile_index = server_index;
+
+        sm_p->msgarray_count = 1;
+
+        ret = PINT_cached_config_map_to_server(
+            &sm_p->msgarray[0].svr_addr, datafile_handle,
+            sm_p->object_ref.fs_id);
+        if(ret < 0)
+        {
+            gossip_err("Failed to map meta server address\n");
+            return ret;
+        }
+    }
+
     return 0;
 }
-static void lock_contexts_destroy(PINT_client_sm *sm_p)
+/* Heap helper functions */
+
+/* Copy src_p over dest_p, then re-point the owning lock info at dest_p. */
+void heap_cpy_fn(heap_node_t *dest_p, heap_node_t *src_p)
 {
-    free(sm_p->u.lock.contexts);
-    sm_p->u.lock.contexts = NULL;
-    sm_p->u.lock.context_count = 0;
+    PINT_server_lock_info *owner_p =
+        (PINT_server_lock_info *) src_p->user_p;
+
+    *dest_p = *src_p;
+    owner_p->heap_node_p = dest_p;
+}
+
+/* Exchange two heap slots and fix both owners' heap_node_p back-links. */
+void heap_swap_fn(heap_node_t *dest_p, heap_node_t *src_p)
+{
+    heap_node_t saved_node = *dest_p;
+    PINT_server_lock_info *owner_p = NULL;
+
+    *dest_p = *src_p;
+    *src_p = saved_node;
+
+    /* Each node moved, so its lock info must point at the new slot. */
+    owner_p = (PINT_server_lock_info *) dest_p->user_p;
+    owner_p->heap_node_p = dest_p;
+
+    owner_p = (PINT_server_lock_info *) src_p->user_p;
+    owner_p->heap_node_p = src_p;
+}
+
+/* Debug helper: print one heap node as (key, server index) to stdout. */
+void heap_print_fn(heap_node_t *node_p)
+{
+    PINT_server_lock_info *lock_p =
+        (PINT_server_lock_info *) node_p->user_p;
+    fprintf(stdout, "(k=%lld,p=%d) ", (long long) node_p->key, lock_p->index);
+    fflush(stdout);
+}
+
+/* Insert key into heap_p and cross-link the new node with lock_p.
+ * cpy_fn is handed to heap_insert unchanged. */
+void lock_heap_insert(heap_t *heap_p, int64_t key,
+                      PINT_server_lock_info *lock_p,
+                      void (*cpy_fn) (heap_node_t *dest_p,
+                                      heap_node_t *src_p))
+{
+    heap_node_t *new_node_p = heap_insert(heap_p, key, cpy_fn);
+
+    /* Link both ways: node -> owning lock info, lock info -> node. */
+    new_node_p->user_p = (void *) lock_p;
+    lock_p->heap_node_p = new_node_p;
+}
+
+/* Pop the root of heap_p via heap_extract_min (key into *key_p); report the
+ * owner's server index in *proc_p and clear its heap_node_p link. */
+void lock_heap_extract_min(heap_t *heap_p, int64_t *key_p, int *proc_p,
+                           void (*cpy_fn) (heap_node_t *dest_p,
+                                           heap_node_t *src_p),
+                           void (*swap_fn) (heap_node_t *dest_p,
+                                            heap_node_t *src_p))
+{
+    PINT_server_lock_info *root_owner_p;
+
+    assert(heap_p->size > 0);
+    root_owner_p = (PINT_server_lock_info *) heap_p->nodes[0].user_p;
+    *proc_p = root_owner_p->index;
+    heap_extract_min(heap_p, key_p, cpy_fn, swap_fn);
+    root_owner_p->heap_node_p = NULL;
+}
+
+/* Read the root of heap_p: key via heap_min(), owner index into *proc_p. */
+void lock_heap_min(heap_t *heap_p, int64_t *key_p, int *proc_p)
+{
+    PINT_server_lock_info *root_owner_p;
+    assert(heap_p->size > 0);
+    root_owner_p = (PINT_server_lock_info *) heap_p->nodes[0].user_p;
+    *proc_p = root_owner_p->index;
+    heap_min(heap_p, key_p);
 }
/*
More information about the Pvfs2-cvs
mailing list