[Pvfs2-cvs] commit by aching in pvfs2-1/src/common/gen-locks:
lock-storage.c
CVS commit program
cvs at parl.clemson.edu
Thu Apr 5 12:31:36 EDT 2007
Update of /projects/cvsroot/pvfs2-1/src/common/gen-locks
In directory parlweb1:/tmp/cvs-serv6057/src/common/gen-locks
Modified Files:
Tag: version-lock-actual-branch
lock-storage.c
Log Message:
Handle more cases properly including partial locks.
Index: lock-storage.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/common/gen-locks/Attic/lock-storage.c,v
diff -p -u -r1.1.2.4 -r1.1.2.5
--- lock-storage.c 11 Feb 2007 04:05:05 -0000 1.1.2.4
+++ lock-storage.c 5 Apr 2007 16:31:36 -0000 1.1.2.5
@@ -7,11 +7,47 @@
*/
#include "lock-storage.h"
+#include "../server/pvfs2-server.h"
+#include "../io/job/job-desc-queue.h"
/* Processing scratch space */
-#define MAX_BYTES
#define MAX_OL_PAIRS 64
-static PINT_Request_result lock_result;
+
+PINT_Request_result *PINT_Request_result_init(void)
+{
+ PINT_Request_result * tmp_result_p;
+
+ tmp_result_p = malloc(sizeof(PINT_Request_result));
+ if (!tmp_result_p)
+ {
+ gossip_err("malloc PINT_Request_result failed\n");
+ return NULL;
+ }
+
+ tmp_result_p->offset_array = malloc(MAX_OL_PAIRS*sizeof(PVFS_offset));
+ if (!tmp_result_p->offset_array)
+ {
+ free(tmp_result_p);
+ gossip_err("malloc offset_array failed\n");
+ return NULL;
+ }
+
+ tmp_result_p->size_array = malloc(MAX_OL_PAIRS*sizeof(PVFS_size));
+ if (!tmp_result_p->size_array)
+ {
+ free(tmp_result_p);
+ free(tmp_result_p->offset_array);
+ gossip_err("malloc size_array failed\n");
+ return NULL;
+ }
+
+ tmp_result_p->segmax = MAX_OL_PAIRS;
+ tmp_result_p->segs = 0;
+ tmp_result_p->bytemax = __LONG_LONG_MAX__;
+ tmp_result_p->bytes = 0;
+
+ return tmp_result_p;
+}
/* qlist_swap - Swap out one's nodes pointers for another's. There
* are 3 distinct cases. 1) node dest is before node src. 2) node dest
@@ -46,39 +82,6 @@ static void qlist_swap(struct qlist_head
b_p->prev = a_prev_p;
}
-/* qlist_replace - Replace one node with another. Since this node
- * which is replacing the other may be part of a list, it must first
- * be deleted from its list, then replace the other node. Also, in
- * the event that src_p is empty, then dest_p should simply be emptied. */
-static void qlist_replace(struct qlist_head * dest_p,
- struct qlist_head * src_p)
-{
- if (qlist_empty(src_p))
- {
- INIT_QLIST_HEAD(dest_p);
- }
- else
- {
- qlist_del(dest_p);
-
- /* Fix dest_p's links */
- dest_p->next = src_p->next;
- dest_p->prev = src_p->prev;
-
- assert(src_p->next != NULL);
- assert(src_p->prev != NULL);
-
- /* Fix dest_p's neighbors' links */
- src_p->next->prev = dest_p;
- src_p->prev->next = dest_p;
- }
-
- /* Clean up so you can no longer mess up the list by setting
- * src_p's links to point back to itself */
- src_p->next = src_p;
- src_p->prev = src_p;
-}
-
/* Sentinel creation for both itree.h and rbtree.h */
ITREE_NIL(ITREE_NIL);
@@ -138,7 +141,8 @@ int rbtree_lock_req_cpy_fn(rbtree_t *des
{
PINT_Request_state *tmp_file_req_state_p = NULL;
PINT_Request *tmp_file_req_p = NULL;
-
+ PINT_Request_result *tmp_result_p = NULL;
+
lock_req_t *tmp_dest_p =
rbtree_entry(dest_p, lock_req_t, granted_req_link);
lock_req_t *tmp_src_p =
@@ -149,6 +153,7 @@ int rbtree_lock_req_cpy_fn(rbtree_t *des
tmp_file_req_state_p = tmp_dest_p->file_req_state;
tmp_file_req_p = tmp_dest_p->file_req;
+ tmp_result_p = tmp_dest_p->lock_result_p;
qlist_swap(&(tmp_dest_p->lock_head), &(tmp_src_p->lock_head));
tmp_dest_p->lock_req_status = tmp_src_p->lock_req_status;
@@ -158,25 +163,35 @@ int rbtree_lock_req_cpy_fn(rbtree_t *des
&(tmp_src_p->all_req_link));
tmp_dest_p->req_id = tmp_src_p->req_id;
tmp_dest_p->io_type = tmp_src_p->io_type;
-
+ tmp_dest_p->lock_result_p = tmp_src_p->lock_result_p;
tmp_dest_p->actual_locked_bytes = tmp_src_p->actual_locked_bytes;
tmp_dest_p->file_req_state = tmp_src_p->file_req_state;
- tmp_dest_p->target_offset = tmp_src_p->target_offset;
+ tmp_dest_p->seg_num = tmp_src_p->seg_num;
+ tmp_dest_p->seg_bytes_used = tmp_src_p->seg_bytes_used;
+ qlist_swap(&(tmp_dest_p->removed_link),
+ &(tmp_src_p->removed_link));
+ tmp_dest_p->wait_abs_offset = tmp_src_p->wait_abs_offset;
tmp_dest_p->aggregate_size = tmp_src_p->aggregate_size;
- tmp_dest_p->wait_size = tmp_src_p->wait_size;
tmp_dest_p->file_req = tmp_src_p->file_req;
tmp_dest_p->file_req_offset = tmp_src_p->file_req_offset;
- /* lock_callback is not necessary */
+ /* lock_callback is not necessary to swap since anything in the
+ * granted list shouldn't have any callback information set up
+ * with it! */
/* Need to swap the file_req_state and file_req so that it can be
* freed. */
tmp_src_p->file_req_state = tmp_file_req_state_p;
tmp_src_p->file_req = tmp_file_req_p;
+ tmp_src_p->lock_result_p = tmp_result_p;
+
return 0;
}
void free_lock_req(lock_req_t *lock_req_p)
{
+ free(lock_req_p->lock_result_p->offset_array);
+ free(lock_req_p->lock_result_p->size_array);
+ free(lock_req_p->lock_result_p);
PINT_free_request_state(lock_req_p->file_req_state);
PVFS_Request_free(&lock_req_p->file_req);
free(lock_req_p);
@@ -233,33 +248,9 @@ int init_lock_file_table(void)
lock_file_table = NULL;
return -PVFS_ENOMEM;
}
-
- lock_result.segmax = MAX_OL_PAIRS;
- lock_result.bytemax = INT_MAX;
- lock_result.offset_array = (PVFS_offset *)
- calloc(MAX_OL_PAIRS, sizeof(PVFS_offset));
- if (!lock_result.offset_array)
- {
- qhash_finalize(lock_file_table);
- lock_file_table = NULL;
- gen_mutex_destroy(lock_file_table_mutex);
- lock_file_table_mutex = NULL;
- return -PVFS_ENOMEM;
- }
- lock_result.size_array = (PVFS_size *)
- calloc(MAX_OL_PAIRS, sizeof(PVFS_size));
- if (!lock_result.size_array)
- {
- qhash_finalize(lock_file_table);
- lock_file_table = NULL;
- gen_mutex_destroy(lock_file_table_mutex);
- lock_file_table_mutex = NULL;
- free(lock_result.offset_array);
- return -PVFS_ENOMEM;
- }
}
else
- fprintf(stdout, "init_lock_file_table: already exists!\n");
+ gossip_err("init_lock_file_table: already exists!\n");
return (lock_file_table ? 0 : -PVFS_ENOMEM);
}
@@ -274,11 +265,9 @@ void free_lock_file_table(void)
gen_mutex_unlock(lock_file_table_mutex);
gen_mutex_destroy(lock_file_table_mutex);
lock_file_table_mutex = NULL;
- free(lock_result.offset_array);
- free(lock_result.size_array);
}
else
- fprintf(stdout, "free_lock_file_table_all: Already NULL!\n");
+ gossip_err("free_lock_file_table_all: Already NULL!\n");
}
void free_lock_file_table_all(void)
@@ -350,7 +339,9 @@ void print_lock_file_table_all_info(void
if (LOCK_FILE_TABLE_INITIALIZED())
{
+#if 0
gen_mutex_lock(lock_file_table_mutex);
+#endif
fprintf(stdout, "---------------------------------------------\n");
fprintf(stdout, "lock legend {start,end,max,color(red or black)}\n\n");
for (i = 0; i < lock_file_table->table_size; i++)
@@ -402,7 +393,7 @@ void print_lock_file_table_all_info(void
&ITREE_NIL,
linked_itree_print_fn);
/* for now */
-#if 1
+#if 0
fprintf(stdout, "\n read_itree: ");
itree_breadth_print_fn(lock_node_p->read_itree,
&ITREE_NIL,
@@ -415,205 +406,472 @@ void print_lock_file_table_all_info(void
}
fprintf(stdout, "\n");
}
+#if 0
gen_mutex_unlock(lock_file_table_mutex);
+#endif
}
else
- fprintf(stdout, "print_lock_file_table_all: NULL!\n");
+ gossip_err("print_lock_file_table_all: NULL!\n");
}
-/* add_locks - Add as many of the locks that are requested (up to
- * actual bytes). It is assumed that the request already has the
- * file_req_state all setup (i.e. PINT_REQUEST_STATE_SET_TARGET and
+/* add_locks - Add as many of the locks that are requested until the
+ * final_abs_offset is reached. i.e. if final_abs_offset is 5, then
+ * lock bytes 0, 1, 2, 3, and 4, but NOT 5. If final_abs_offset is
+ * __LONG_LONG_MAX__, then try to do the full request. It is assumed
+ * that the request already has the file_req_state all setup
+ * (i.e. PINT_REQUEST_STATE_SET_TARGET and
* PINT_REQUEST_STATE_SET_FINAL) and that the file_req_state will be
- * reset later on. Returns 1 if req_actual_bytes matches the amount of
- * locks added. Returns 0 for locks added when req_actual_bytes is
- * -1. Returns an error if any lock adding failed. */
+ * reset later on. Returns 0 if all locks for the request were
+ * added. Returns 1 if none or some locks were acquired. Returns -1
+ * for an error. */
static inline int add_locks(lock_req_t *lock_req_p,
lock_node_t *lock_node_p,
- PVFS_size req_actual_bytes,
- int chk_rwlock)
+ PVFS_offset final_abs_offset,
+ PVFS_offset *last_abs_offset_locked_p,
+ PVFS_offset *next_abs_offset_p,
+ PVFS_size *bytes_locked_p)
{
- int ret = -1, i;
- PVFS_offset max_offset = -1;
- PVFS_size tmp_locked_bytes = 0;
+ enum PVFS_lock_origin lock_origin;
+ int ret = 0;
+ PVFS_offset tmp_start_offset = - 1,
+ tmp_end_offset = -1, tmp_abs_offset = -1, *offset_arr;
+ PVFS_size *size_arr;
+ PINT_request_file_data *fdata_p = &lock_node_p->fdata;
itree_t *itree_p = &ITREE_NIL;
linked_itree_t *linked_itree_p = NULL;
+ lock_t *tmp_removed_lock_p;
- do {
- lock_result.segs = 0;
- lock_result.bytes = 0;
- /* AC - I think it's okay to set this to an obscenely high
- * value, but it could cause problems. =) */
- lock_result.bytemax = INT_MAX;
+ /* Initialize return values */
+ *last_abs_offset_locked_p = -1;
+ *next_abs_offset_p = -1;
+ *bytes_locked_p = 0;
+
+ lock_req_p->wait_abs_offset = final_abs_offset;
+ offset_arr = lock_req_p->lock_result_p->offset_array;
+ size_arr = lock_req_p->lock_result_p->size_array;
+
+ gossip_debug(GOSSIP_LOCK_DEBUG, "add_locks: actual_locked_bytes = %Ld\n",
+ lock_req_p->actual_locked_bytes);
+
+ while (ret == 0)
+ {
+ /* Check for whether any locks need to be added from the
+ * removed list */
+ if (!qlist_empty(&(lock_req_p->removed_link)))
+ {
+ tmp_removed_lock_p =
+ qlist_entry(lock_req_p->removed_link.prev, lock_t,
+ lock_link);
+ tmp_start_offset = tmp_removed_lock_p->start;
+ tmp_end_offset = tmp_removed_lock_p->end;
+ lock_origin = REMOVED_LIST;
+
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "add_locks: Getting (start offset=%Ld,end_offset=%Ld)"
+ " from removed list\n", tmp_start_offset,
+ tmp_end_offset);
+ }
+ else /* Process request if necessary then get the next piece */
+ {
+ if ((lock_req_p->seg_num ==
+ (lock_req_p->lock_result_p->segs - 1)) &&
+ (lock_req_p->seg_bytes_used == size_arr[lock_req_p->seg_num]))
+ {
+ /* We're all done! */
+ if (PINT_REQUEST_DONE(lock_req_p->file_req_state))
+ break;
+ lock_req_p->lock_result_p->segs = 0;
+ lock_req_p->lock_result_p->bytes = 0;
- ret = PINT_process_request(lock_req_p->file_req_state,
- NULL,
- &lock_node_p->fdata,
- &lock_result,
- PINT_SERVER);
- if (ret < 0)
+ ret = PINT_process_request(lock_req_p->file_req_state,
+ NULL,
+ &lock_node_p->fdata,
+ lock_req_p->lock_result_p,
+ PINT_SERVER);
+ if (ret < 0)
+ {
+ gossip_err("add_locks: Failed to process file request\n");
+ ret = -PVFS_EINVAL;
+ break;
+ }
+ lock_req_p->seg_num = 0;
+ lock_req_p->seg_bytes_used = 0;
+ }
+
+ tmp_start_offset =
+ offset_arr[lock_req_p->seg_num] + lock_req_p->seg_bytes_used;
+ tmp_end_offset = tmp_start_offset + size_arr[lock_req_p->seg_num] -
+ lock_req_p->seg_bytes_used - 1;
+ lock_origin = LOCK_REQUEST_SEG;
+ }
+
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "add_locks: Lock (local off=%Ld,size=%Ld) "
+ "max_abs_off=%Ld...\n",
+ tmp_start_offset,
+ tmp_end_offset,
+ final_abs_offset);
+
+ /* Ensure that the lock requests are valid */
+#if 1
+ if ((tmp_start_offset < 0) || (tmp_end_offset < 0))
{
- gossip_err("add_locks: Failed to process file request\n");
- return -PVFS_EINVAL;
+ gossip_err("add_locks: Lock offset=%Ld with end offset=%Ld "
+ "invalid\n", tmp_start_offset, tmp_end_offset);
+ ret = -PVFS_EINVAL;
+ break;
}
-
- for (i = 0; i < lock_result.segs; i++)
+#endif
+ /* Convert the PINT_process_request output to the absolute
+ * logical offset */
+ tmp_abs_offset =
+ (*fdata_p->dist->methods->physical_to_logical_offset)
+ (fdata_p->dist->params,
+ fdata_p,
+ tmp_start_offset);
+
+ /* Don't go past the final_abs_offset */
+ if (tmp_abs_offset >= final_abs_offset)
{
- /* Ensure that the lock requests do not go beyond what the
- * request specifies. */
- if (lock_result.size_array[i] <= 0)
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "add_locks: Lock not added since "
+ "(start=%Ld,end=%Ld) >= final_abs_offset=%Ld\n",
+ tmp_abs_offset,
+ tmp_end_offset - tmp_start_offset + 1,
+ final_abs_offset);
+
+ *next_abs_offset_p = tmp_abs_offset;
+ ret = 1;
+ break;
+ }
+
+ /* Check whether this lock should be trimmed */
+ if (final_abs_offset <
+ (tmp_abs_offset + tmp_end_offset - tmp_start_offset + 1))
+ {
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "add_locks: Trimming lock from (start=%Ld,end=%Ld) to"
+ " (start=%Ld,end=%Ld)\n",
+ tmp_abs_offset,
+ tmp_end_offset,
+ tmp_abs_offset,
+ tmp_start_offset + (final_abs_offset -
+ tmp_abs_offset - 1));
+ tmp_end_offset =
+ tmp_start_offset + (final_abs_offset - tmp_abs_offset - 1);
+ }
+
+ itree_p = itree_interval_search(
+ lock_node_p->write_itree, &ITREE_NIL,
+ tmp_start_offset,
+ tmp_end_offset);
+ if (itree_p != &ITREE_NIL)
+ {
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "itree_interval_search WRITE (int=%Ld,%Ld,rw=%s)"
+ " failed\n", tmp_start_offset, tmp_end_offset,
+ ((lock_req_p->io_type ==
+ PVFS_IO_READ) ? "r" : "w"));
+ /* Still may be able to add part of this lock depending on
+ * the overlap */
+ if (itree_p->start > tmp_start_offset)
{
- gossip_err("add_locks: Lock offset %Ld with size %Ld "
- "invalid\n", lock_result.offset_array[i],
- lock_result.size_array[i]);
- continue;
+ if ((itree_p->left == &ITREE_NIL) ||
+ (itree_p->left->max < tmp_start_offset))
+ {
+ tmp_end_offset = itree_p->start - 1;
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "add_locks: WRITE reset interval to "
+ "(%Ld,%Ld)\n", tmp_start_offset,
+ tmp_end_offset);
+ }
+ else
+ {
+ *next_abs_offset_p = tmp_abs_offset;
+ ret = 1;
+ break;
+
+ }
}
else
{
- if ((req_actual_bytes == -1) ||
- (req_actual_bytes >=
- lock_result.size_array[i] + tmp_locked_bytes))
- max_offset = lock_result.offset_array[i] +
- lock_result.size_array[i] - 1;
- else
- max_offset = lock_result.offset_array[i] +
- req_actual_bytes - tmp_locked_bytes - 1;
+ *next_abs_offset_p = tmp_abs_offset;
+ ret = 1;
+ break;
}
-
+ }
+
+ /* Writes have to check both the write and read interval
+ * trees. Reads have to check both if a write in the * queue
+ * was already found. Reads need to see whether any * writes
+ * are ahead of them before just going ahead (if * you want
+ * some kind of guarantee to remove * starvation). */
+ if (lock_req_p->io_type == PVFS_IO_WRITE)
+ {
itree_p = itree_interval_search(
- lock_node_p->write_itree, &ITREE_NIL,
- lock_result.offset_array[i],
- max_offset);
+ lock_node_p->read_itree, &ITREE_NIL,
+ tmp_start_offset,
+ tmp_end_offset);
if (itree_p != &ITREE_NIL)
- return -PVFS_EINVAL;
-
- /* Writes have to check both the write and read interval
- * trees. Reads have to check both if a write in the
- * queue was already found. Reads need to see whether any
- * writes are ahead of them before just going ahead (if
- * you want some kind of guarantee to remove
- * starvation). */
- if (lock_req_p->io_type == PVFS_IO_WRITE || chk_rwlock)
{
- itree_p = itree_interval_search(
- lock_node_p->read_itree, &ITREE_NIL,
- lock_result.offset_array[i],
- max_offset);
- if (itree_p != &ITREE_NIL)
- return -PVFS_EINVAL;
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "itree_interval_search READ (int=%Ld,%Ld,rw=%s)"
+ " failed\n", tmp_start_offset, tmp_end_offset,
+ ((lock_req_p->io_type ==
+ PVFS_IO_READ) ? "r" : "w"));
+ /* Still may be able to add part of this lock depending on
+ * the overlap */
+ if (itree_p->start > tmp_start_offset)
+ {
+ if ((itree_p->left == &ITREE_NIL) ||
+ (itree_p->left->max < tmp_start_offset))
+ {
+ tmp_end_offset = itree_p->start - 1;
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "add_locks: READ reset interval to "
+ "(%Ld,%Ld)\n", tmp_start_offset,
+ tmp_end_offset);
+ }
+ else
+ {
+ *next_abs_offset_p = tmp_abs_offset;
+ ret = 1;
+ break;
+
+ }
+ }
+ else
+ {
+ *next_abs_offset_p = tmp_abs_offset;
+ ret = 1;
+ break;
+ }
}
+ }
+
+ /* Add the lock to the write/read interval tree */
+ if ((linked_itree_p = (linked_itree_t *)
+ calloc(1, sizeof(linked_itree_t))) == NULL)
+ {
+ gossip_err("add_locks: calloc linked_itree_p failed\n");
+ ret = -PVFS_ENOMEM;
+ break;
+ }
+
+ linked_itree_p->lock_id = lock_req_p->req_id;
+ linked_itree_p->itree_link.start = tmp_start_offset;
+ linked_itree_p->itree_link.end = tmp_end_offset;
+ if (linked_itree_p->itree_link.start >
+ linked_itree_p->itree_link.end)
+ {
+ gossip_err("Invalid lock (start=%Ld,end=%Ld)\n",
+ linked_itree_p->itree_link.start,
+ linked_itree_p->itree_link.end);
+ break;
+ }
+
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "Inserting (int=%Ld,%Ld,rw=%s)\n",
+ linked_itree_p->itree_link.start,
+ linked_itree_p->itree_link.end,
+ ((lock_req_p->io_type ==
+ PVFS_IO_READ) ? "r" : "w"));
- /* Add the lock to the write/read interval tree */
- if ((linked_itree_p = (linked_itree_t *)
- calloc(1, sizeof(linked_itree_t))) == NULL)
+ if (lock_req_p->io_type == PVFS_IO_WRITE)
+ ret = itree_insert(&(lock_node_p->write_itree), &ITREE_NIL,
+ &(linked_itree_p->itree_link));
+ else
+ ret = itree_insert(&(lock_node_p->read_itree), &ITREE_NIL,
+ &(linked_itree_p->itree_link));
+ if (ret != 0)
+ {
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "itree_insert of lock (int=%Ld,%Ld,rw=%s) "
+ "failed\n", linked_itree_p->itree_link.start,
+ linked_itree_p->itree_link.end,
+ ((lock_req_p->io_type ==
+ PVFS_IO_READ) ? "r" : "w"));
+ *next_abs_offset_p = linked_itree_p->itree_link.start;
+ break;
+ }
+
+ /* Remove the lock from the lock origin */
+ if (lock_origin == REMOVED_LIST)
+ {
+ if (tmp_removed_lock_p->end == tmp_end_offset)
{
- gossip_err("add_locks: calloc linked_itree_p failed\n");
- return -PVFS_ENOMEM;
+ qlist_del(&(tmp_removed_lock_p->lock_link));
+ free(tmp_removed_lock_p);
}
-
- linked_itree_p->itree_link.start = lock_result.offset_array[i];
- linked_itree_p->itree_link.end =
- max_offset;
- if (linked_itree_p->itree_link.start >
- linked_itree_p->itree_link.end)
+ else
{
- fprintf(stdout, "Invalid lock (start=%Ld,end=%Ld)\n",
- linked_itree_p->itree_link.start,
- linked_itree_p->itree_link.end);
- return -EINVAL;
+ tmp_removed_lock_p->start = tmp_end_offset + 1;
}
-
- if (lock_req_p->io_type == PVFS_IO_WRITE)
- ret = itree_insert(&(lock_node_p->write_itree), &ITREE_NIL,
- &(linked_itree_p->itree_link));
- else
- ret = itree_insert(&(lock_node_p->read_itree), &ITREE_NIL,
- &(linked_itree_p->itree_link));
- if (ret != 0)
+ }
+ else if (lock_origin == LOCK_REQUEST_SEG)
+ {
+ lock_req_p->seg_bytes_used +=
+ tmp_end_offset - tmp_start_offset + 1;
+ assert(lock_req_p->seg_bytes_used <=
+ size_arr[lock_req_p->seg_num]);
+ if ((lock_req_p->seg_bytes_used ==
+ size_arr[lock_req_p->seg_num]) &&
+ (lock_req_p->seg_num !=
+ (lock_req_p->lock_result_p->segs - 1)))
{
- gossip_debug(GOSSIP_LOCK_DEBUG,
- "itree_insert of lock (int=%Ld,%Ld,rw=%s)"
- "failed\n", linked_itree_p->itree_link.start,
- linked_itree_p->itree_link.end,
- ((lock_req_p->io_type ==
- PVFS_IO_READ) ? "r" : "w"));
- return -EINVAL;
+ lock_req_p->seg_num++;
+ lock_req_p->seg_bytes_used = 0;
}
-
- /* Add the lock to the tmp_lock_head (later properly
- * linked to the lock request)*/
- qlist_add_tail(&(linked_itree_p->list_link),
- &(lock_req_p->lock_head));
-
- lock_req_p->actual_locked_bytes += lock_result.size_array[i];
- tmp_locked_bytes += lock_result.size_array[i];
-
- /* All waiting locks were granted! */
- if (tmp_locked_bytes == req_actual_bytes)
- return 1;
}
- } while ((ret == 0) && (!PINT_REQUEST_DONE(lock_req_p->file_req_state)));
+ else
+ {
+ gossip_err("add_locks: Invalid lock_origin\n");
+ ret = -PVFS_EINVAL;
+ }
- return 0;
+ /* Add the lock to the lock_req */
+ qlist_add_tail(&(linked_itree_p->list_link),
+ &(lock_req_p->lock_head));
+
+ *bytes_locked_p += linked_itree_p->itree_link.end -
+ linked_itree_p->itree_link.start + 1;
+ }
+
+ lock_req_p->actual_locked_bytes += *bytes_locked_p;
+
+ /* Check that locks were added */
+ if (lock_req_p->lock_head.prev != &lock_req_p->lock_head)
+ {
+ linked_itree_t *tmp_linked_itree_p =
+ qlist_entry(lock_req_p->lock_head.prev, linked_itree_t,
+ list_link);
+ *last_abs_offset_locked_p =
+ (*fdata_p->dist->methods->physical_to_logical_offset)
+ (fdata_p->dist->params,
+ fdata_p,
+ tmp_linked_itree_p->itree_link.end);
+ }
+
+ gossip_debug(GOSSIP_LOCK_DEBUG, "add_locks: granted %Ld actual bytes "
+ "(total %Ld) of %Ld requested bytes and reached "
+ "logical offset %Ld, last_abs_offset %Ld, "
+ "next_abs_offset %Ld ret=%d\n",
+ *bytes_locked_p,
+ lock_req_p->actual_locked_bytes, lock_req_p->aggregate_size,
+ (*fdata_p->dist->methods->physical_to_logical_offset)
+ (fdata_p->dist->params,
+ fdata_p,
+ tmp_end_offset),
+ *last_abs_offset_locked_p,
+ *next_abs_offset_p, ret);
+
+ return ret;
}
/* check_lock_reqs - Basically goes through the queued_reqs and tries
* to add the rest of the locks that they are waiting for. It
* processes the reqs in the order in which they were received. */
-
int check_lock_reqs(lock_node_t *lock_node_p)
{
int ret = -1, chk_rwlock = 0;
- struct qlist_head *pos = NULL;
+ struct qlist_head *pos = NULL, *scratch;
lock_req_t *tmp_lock_req_p = NULL;
+ PVFS_size tmp_bytes_locked = 0;
+ struct PVFS_servresp_lock *resp_lock_p;
- qlist_for_each(pos, &lock_node_p->queued_req)
+ qlist_for_each_safe(pos, scratch, &lock_node_p->queued_req)
{
tmp_lock_req_p = qlist_entry(pos, lock_req_t, queued_req_link);
- assert(tmp_lock_req_p->wait_size >= 0);
+ assert(tmp_lock_req_p->wait_abs_offset >= -1);
/* Add as many blocking locks as we can, then use callback
* function if its done */
- if (tmp_lock_req_p->wait_size != 0)
+ if (tmp_lock_req_p->wait_abs_offset >= 0)
{
+ assert(((struct job_desc *)
+ tmp_lock_req_p->lock_callback.data)->job_user_ptr != NULL);
+ resp_lock_p =
+ &((PINT_server_op *)
+ ((struct job_desc *) tmp_lock_req_p->lock_callback.data)
+ ->job_user_ptr)->resp.u.lock;
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "check_lock_reqs: Trying lock id=%Ld...\n",
+ tmp_lock_req_p->req_id);
+
ret = add_locks(tmp_lock_req_p, lock_node_p,
- tmp_lock_req_p->wait_size, chk_rwlock);
- if (ret == 1)
+ tmp_lock_req_p->wait_abs_offset,
+ &resp_lock_p->last_abs_offset_locked,
+ &resp_lock_p->next_abs_offset,
+ &tmp_bytes_locked);
+
+ /* Keep track of how many bytes have been accessed
+ * in possibly multiple add_lock calls */
+ resp_lock_p->bytes_accessed += tmp_bytes_locked;
+
+ /* There are two cases to return: (1) The next lock is
+ * beyond the wait_abs_offset. Return incomplete. (2) All
+ * locks are granted. Return complete. */
+
+ /* At this point, the request must be INCOMPLETE */
+ assert(tmp_lock_req_p->lock_req_status == INCOMPLETE);
+ if (ret == 0)
+ {
+ resp_lock_p->request_finished = 1;
+
+ /* If this request is now done, send it to the granted list. */
+ if (resp_lock_p->next_abs_offset == -1)
+ {
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "check_lock_req: Deleting req_id="
+ "%Ld from the queued list\n",
+ tmp_lock_req_p->req_id);
+ qlist_del_init(&tmp_lock_req_p->queued_req_link);
+ tmp_lock_req_p->lock_req_status = ALL_LOCKS_GRANTED;
+ tmp_lock_req_p->granted_req_link.key =
+ tmp_lock_req_p->req_id;
+ rbtree_insert(&(lock_node_p->granted_req), &RBTREE_NIL,
+ &(tmp_lock_req_p->granted_req_link));
+
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "check_lock_req: all %Ld aggregate "
+ "bytes granted to req_id=%Ld "
+ "(req_id=%Ld added to granted list)\n",
+ tmp_lock_req_p->aggregate_size,
+ tmp_lock_req_p->req_id,
+ tmp_lock_req_p->req_id);
+ }
tmp_lock_req_p->lock_callback.fn(
tmp_lock_req_p->lock_callback.data);
- else
+ }
+ else if (ret == 1)
{
- linked_itree_t *last_lock_p = NULL;
- PINT_request_file_data *fdata_p =
- &lock_node_p->fdata;
- PVFS_offset last_lock_logical_offset = -1;
-
- PINT_REQUEST_STATE_RESET(tmp_lock_req_p->file_req_state);
- if (tmp_lock_req_p->actual_locked_bytes > 0)
+ resp_lock_p->request_finished = 0;
+ /* This request has gone as far as it can, while not
+ * being completed. Otherwise, it will wait in the
+ * queue. */
+ if (resp_lock_p->next_abs_offset >
+ tmp_lock_req_p->wait_abs_offset)
{
- last_lock_p = qlist_entry(
- &tmp_lock_req_p->lock_head.prev, linked_itree_t,
- list_link);
- last_lock_logical_offset =
- (*fdata_p->dist->methods->physical_to_logical_offset)
- (fdata_p->dist->params,
- fdata_p,
- last_lock_p->itree_link.end + 1);
-
- PINT_REQUEST_STATE_SET_TARGET(
- tmp_lock_req_p->file_req_state,
- last_lock_logical_offset);
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "check_lock_req: Returning req %Ld since "
+ "next_abs_offset %Ld is beyond the "
+ "wait_abs_offset %Ld (resetting "
+ "wait_abs_offset)\n",
+ tmp_lock_req_p->req_id,
+ resp_lock_p->next_abs_offset,
+ tmp_lock_req_p->wait_abs_offset);
+
+ tmp_lock_req_p->wait_abs_offset = -1;
+ tmp_lock_req_p->lock_callback.fn(
+ tmp_lock_req_p->lock_callback.data);
}
- else
- PINT_REQUEST_STATE_SET_TARGET(
- tmp_lock_req_p->file_req_state, 0);
-
- PINT_REQUEST_STATE_SET_FINAL(tmp_lock_req_p->file_req_state,
- tmp_lock_req_p->file_req_offset +
- tmp_lock_req_p->aggregate_size);
}
+
}
+
+ /* Todo: Implement some way to handle reads and writes so that
+ * writes don't get starved. */
if (tmp_lock_req_p->io_type == PVFS_IO_WRITE)
chk_rwlock = 1;
}
@@ -630,25 +888,24 @@ int check_lock_reqs(lock_node_t *lock_no
* error, 0 for pending, and 1 for all locks acquired */
int add_lock_req(PVFS_object_ref *object_ref_p,
enum PVFS_io_type io_type,
+ enum PVFS_server_lock_type lock_type,
+ PVFS_id_gen_t client_lock_req_id,
+ PINT_request_file_data *fdata_p,
PINT_Request *file_req,
PVFS_offset file_req_offset,
- PINT_request_file_data *fdata_p,
- PVFS_size nb_bytes,
- PVFS_size bb_bytes,
+ PVFS_offset final_offset,
PVFS_size aggregate_size,
- PVFS_id_gen_t *req_id,
- PVFS_size *granted_bytes_p,
+ struct PVFS_servresp_lock *resp_lock_p,
lock_req_t **lock_req_p_p)
{
- int i, ret = 0;
+ int ret = 0;
struct qhash_head *hash_link_p = NULL;
lock_node_t *lock_node_p = NULL;
lock_req_t *lock_req_p = NULL;
- linked_itree_t *linked_itree_p = NULL;
- itree_t *itree_p = &ITREE_NIL;
- PVFS_size nb_bytes_granted = 0;
PINT_Request_state *file_req_state;
- PVFS_offset final_physical_off = 0;
+
+ assert((lock_type != PVFS_SERVER_RELEASE_SOME) &&
+ (lock_type != PVFS_SERVER_RELEASE_ALL));
QLIST_HEAD(tmp_lock_head);
@@ -683,224 +940,183 @@ int add_lock_req(PVFS_object_ref *object
/* Copy fdata over */
memcpy(&lock_node_p->fdata, fdata_p, sizeof(PINT_request_file_data));
lock_node_p->fdata.dist = PINT_dist_copy(fdata_p->dist);
-
+ lock_node_p->fdata.fsize = __LONG_LONG_MAX__;
+ lock_node_p->fdata.extend_flag = 1;
+
qhash_add(lock_file_table, &(lock_node_p->refn),
&(lock_node_p->hash_link));
}
else
lock_node_p = qlist_entry(hash_link_p, lock_node_t, hash_link);
- /* Add all the locks to the correct lock tree - nb style. First,
- * setup the PVFS state. */
- file_req_state = PINT_new_request_state(file_req);
- if (!file_req_state)
- {
- ret = -PVFS_EINVAL;
- gossip_err("PINT_new_request_state of file_req_state failed\n");
- goto add_unlock_exit;
- }
- PINT_REQUEST_STATE_SET_TARGET(file_req_state,
- file_req_offset);
- PINT_REQUEST_STATE_SET_FINAL(file_req_state,
- file_req_offset + aggregate_size);
-
- do {
- lock_result.segs = 0;
- lock_result.bytes = 0;
- lock_result.bytemax = aggregate_size - nb_bytes_granted;
-
- ret = PINT_process_request(file_req_state,
- NULL,
- fdata_p,
- &lock_result,
- PINT_SERVER);
- if (ret < 0)
+ /* If this is a new request, it should be set up properly */
+ if ((lock_type == PVFS_SERVER_ACQUIRE_NEW_NONBLOCK) ||
+ (lock_type == PVFS_SERVER_ACQUIRE_NEW_BLOCK))
+ {
+ file_req_state = PINT_new_request_state(file_req);
+ if (!file_req_state)
{
- gossip_err("add_lock_req: Failed to process file request\n");
+ ret = -PVFS_EINVAL;
+ gossip_err("PINT_new_request_state of file_req_state failed\n");
goto add_unlock_exit;
}
+ PINT_REQUEST_STATE_SET_TARGET(file_req_state,
+ file_req_offset);
+ PINT_REQUEST_STATE_SET_FINAL(file_req_state,
+ file_req_offset + aggregate_size);
- for (i = 0; i < lock_result.segs; i++)
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "file_req_offset = %Ld aggregate_size = %Ld\n",
+ file_req_offset, aggregate_size);
+
+ /* Set up new lock request */
+ lock_req_p = (lock_req_t *) calloc(1, sizeof(lock_req_t));
+ INIT_QLIST_HEAD(&lock_req_p->lock_head);
+ lock_req_p->lock_req_status = NEW;
+ INIT_QLIST_HEAD(&lock_req_p->queued_req_link);
+ INIT_QLIST_HEAD(&lock_req_p->all_req_link);
+ lock_req_p->req_id = lock_req_id++;
+ lock_req_p->io_type = io_type;
+ lock_req_p->actual_locked_bytes = 0;
+ lock_req_p->lock_result_p = PINT_Request_result_init();
+ lock_req_p->file_req_state = file_req_state;
+ lock_req_p->seg_num = 0;
+ lock_req_p->seg_bytes_used = 0;
+ INIT_QLIST_HEAD(&lock_req_p->removed_link);
+ lock_req_p->aggregate_size = aggregate_size;
+ lock_req_p->file_req = file_req;
+ lock_req_p->file_req_offset = file_req_offset;
+
+ /* Do some initial request processing */
+ ret = PINT_process_request(lock_req_p->file_req_state,
+ NULL,
+ &lock_node_p->fdata,
+ lock_req_p->lock_result_p,
+ PINT_SERVER);
+
+ qlist_add_tail(&(lock_req_p->all_req_link), &(lock_node_p->all_req));
+ }
+ else /* Find the lock req - look in the queued list (this could be
+ * optimized by placing it in a hash table */
+ {
+ struct qlist_head *tmp_qlist_head = NULL;
+ qlist_for_each(tmp_qlist_head, &lock_node_p->queued_req)
{
- itree_p = itree_interval_search(
- lock_node_p->write_itree, &ITREE_NIL,
- lock_result.offset_array[i],
- lock_result.offset_array[i] + lock_result.size_array[i]);
- if (itree_p != &ITREE_NIL)
- {
- ret = -PVFS_EINVAL;
+ lock_req_p = qlist_entry(tmp_qlist_head,
+ lock_req_t, queued_req_link);
+ if (lock_req_p->req_id == client_lock_req_id)
break;
- }
-
- /* Writes have to check both the write and read interval
- * trees */
- if (io_type == PVFS_IO_WRITE)
- {
- itree_p = itree_interval_search(
- lock_node_p->read_itree, &ITREE_NIL,
- lock_result.offset_array[i],
- lock_result.offset_array[i] + lock_result.size_array[i]);
- if (itree_p != &ITREE_NIL)
- {
- ret = -PVFS_EINVAL;
- break;
- }
- }
- else
- {
- /* Reads need to see whether any writes are ahead of
- * them before just going ahead (if you want some kind
- * of guarantee to remove starvation). */
- }
-
- /* Add the lock to the write/read interval tree */
- if ((linked_itree_p = (linked_itree_t *)
- calloc(1, sizeof(linked_itree_t))) == NULL)
- {
- ret = -PVFS_ENOMEM;
- goto add_unlock_exit;
- }
-
- linked_itree_p->itree_link.start = lock_result.offset_array[i];
- linked_itree_p->itree_link.end =
- lock_result.offset_array[i] +
- lock_result.size_array[i] - 1;
- if (linked_itree_p->itree_link.start >
- linked_itree_p->itree_link.end)
- {
- fprintf(stdout, "Invalid lock (start=%Ld,end=%Ld)\n",
- linked_itree_p->itree_link.start,
- linked_itree_p->itree_link.end);
- ret = -EINVAL;
- goto add_unlock_exit;
- }
-
- if (io_type == PVFS_IO_WRITE)
- ret = itree_insert(&(lock_node_p->write_itree), &ITREE_NIL,
- &(linked_itree_p->itree_link));
- else
- ret = itree_insert(&(lock_node_p->read_itree), &ITREE_NIL,
- &(linked_itree_p->itree_link));
- if (ret != 0)
- {
- fprintf(stdout, "itree_insert of lock (int=%Ld,%Ld,rw=%s) "
- "failed\n", linked_itree_p->itree_link.start,
- linked_itree_p->itree_link.end,
- ((io_type == PVFS_IO_READ) ? "r" : "w"));
- ret = -EINVAL;
- goto add_unlock_exit;
- }
-
- /* Add the lock to the tmp_lock_head (later properly
- * linked to the lock request)*/
- qlist_add_tail(&(linked_itree_p->list_link), &tmp_lock_head);
- final_physical_off =
- lock_result.offset_array[i] + lock_result.size_array[i];
+ }
- nb_bytes_granted += lock_result.size_array[i];
+ if (lock_req_p == NULL ||
+ lock_req_p->req_id != client_lock_req_id)
+ {
+ gossip_err("add_lock_req: Request with id=%Ld should have been "
+ "found in the queued list!\n",
+ client_lock_req_id);
+ goto add_unlock_exit;
}
- } while ((ret == 0) && (!PINT_REQUEST_DONE(file_req_state)));
-
- /* Add the req to the all_req queue and the secondary appropriate
- * queue (queued or granted) */
- *granted_bytes_p = nb_bytes_granted;
- gossip_debug(GOSSIP_LOCK_DEBUG, "add_lock_req: granted %Ld actual bytes "
- "of %Ld requested bytes and reached logical offset %Ld\n",
- *granted_bytes_p, aggregate_size,
- (*fdata_p->dist->methods->physical_to_logical_offset)
- (fdata_p->dist->params,
- fdata_p,
- final_physical_off));
- if (nb_bytes_granted > nb_bytes)
- {
- gossip_err("Error - nb_bytes_granted (%Ld) > nb_bytes (%Ld)\n",
- nb_bytes_granted, nb_bytes);
- ret = -1;
- goto add_unlock_exit;
}
- /* Set up new lock request */
- lock_req_p = (lock_req_t *) calloc(1, sizeof(lock_req_t));
- INIT_QLIST_HEAD(&lock_req_p->lock_head);
- lock_req_p->lock_req_status = INCOMPLETE;
-#if 1
- /*RBTREE_HEAD_PTR_INIT(lock_req_p->granted_req_link, RBTREE_NIL);*/
- INIT_QLIST_HEAD(&lock_req_p->queued_req_link);
- INIT_QLIST_HEAD(&lock_req_p->all_req_link);
-#endif
- lock_req_p->req_id = lock_req_id++;
- *req_id = lock_req_p->req_id;
- lock_req_p->io_type = io_type;
-
- lock_req_p->actual_locked_bytes = nb_bytes_granted;
- lock_req_p->file_req_state = file_req_state;
- lock_req_p->target_offset = file_req_offset;
- lock_req_p->aggregate_size = aggregate_size;
- lock_req_p->file_req = file_req;
- lock_req_p->file_req_offset = file_req_offset;
-
- /* Debugging code for setting lock information */
+ /* Add the locks */
+ ret = add_locks(lock_req_p,
+ lock_node_p,
+ final_offset,
+ &resp_lock_p->last_abs_offset_locked,
+ &resp_lock_p->next_abs_offset,
+ &resp_lock_p->bytes_accessed);
+
+ /* Set up the rest of the response for the client. Not necessary
+ * in check_lock_req since it's set here. */
+ resp_lock_p->lock_id = lock_req_p->req_id;
+
+ /* Add the lock_req to either the granted tree or the queued
+ * linked list */
+ if (ret == 0)
+ {
+ if (lock_req_p->lock_req_status == INCOMPLETE)
{
- struct qlist_head *pos = NULL;
- qhash_for_each(pos, &(tmp_lock_head))
- {
- linked_itree_p = qlist_entry(pos, linked_itree_t, list_link);
- linked_itree_p->lock_id = lock_req_p->req_id;
- }
+ gossip_debug(GOSSIP_LOCK_DEBUG, "add_lock_req: Deleting req_id="
+ "%Ld from the queued list\n", lock_req_p->req_id);
+ qlist_del_init(&lock_req_p->queued_req_link);
}
- qlist_add_tail(&(lock_req_p->all_req_link), &(lock_node_p->all_req));
- qlist_replace(&(lock_req_p->lock_head), &tmp_lock_head);
- if (ret == 0 && (PINT_REQUEST_DONE(file_req_state)))
- {
+ lock_req_p->lock_req_status = ALL_LOCKS_GRANTED;
lock_req_p->granted_req_link.key = lock_req_p->req_id;
rbtree_insert(&(lock_node_p->granted_req), &RBTREE_NIL,
&(lock_req_p->granted_req_link));
- lock_req_p->lock_req_status = ALL_LOCKS_GRANTED;
+
gossip_debug(GOSSIP_LOCK_DEBUG, "add_lock_req: all %Ld aggregate "
- "bytes granted to req_id=%Ld\n",
+ "bytes granted to req_id=%Ld "
+ "(req_id=%Ld added to granted list)\n",
lock_req_p->aggregate_size,
- lock_req_p->req_id);
+ lock_req_p->req_id, lock_req_p->req_id);
+ *lock_req_p_p = NULL;
ret = 1;
+ resp_lock_p->request_finished = 1;
}
- else
+ else if (ret == 1)
{
- /* Reset the state to where we last left off */
- PVFS_offset final_logical_off =
- (*fdata_p->dist->methods->physical_to_logical_offset)
- (fdata_p->dist->params,
- fdata_p,
- final_physical_off);
- PINT_REQUEST_STATE_RESET(lock_req_p->file_req_state);
- PINT_REQUEST_STATE_SET_TARGET(lock_req_p->file_req_state,
- final_logical_off);
-
- qlist_add_tail(&(lock_req_p->queued_req_link),
- &(lock_node_p->queued_req));
+ if (lock_req_p->lock_req_status == NEW)
+ {
+ gossip_debug(GOSSIP_LOCK_DEBUG, "add_lock_req: Adding req_id="
+ "%Ld to the queued list\n", lock_req_p->req_id);
+
+ qlist_add_tail(&(lock_req_p->queued_req_link),
+ &(lock_node_p->queued_req));
+ }
+
lock_req_p->lock_req_status = INCOMPLETE;
- gossip_debug(GOSSIP_LOCK_DEBUG, "add_lock_req: (blocking) %Ld of %Ld "
- "aggregate bytes granted to req_id=%Ld\n",
+
+ gossip_debug(GOSSIP_LOCK_DEBUG, "add_lock_req: some %Ld of %Ld "
+ "aggregate bytes granted to req_id=%Ld (waiting "
+ "til %Ld)\n",
lock_req_p->actual_locked_bytes,
lock_req_p->aggregate_size,
- lock_req_p->req_id);
+ lock_req_p->req_id, lock_req_p->wait_abs_offset);
*lock_req_p_p = lock_req_p;
- ret = 0;
+
+ if ((lock_type == PVFS_SERVER_ACQUIRE_NEW_BLOCK) ||
+ (lock_type == PVFS_SERVER_ACQUIRE_NEW_NONBLOCK) ||
+ (lock_type == PVFS_SERVER_ACQUIRE_NONBLOCK) ||
+ (resp_lock_p->next_abs_offset >= final_offset))
+ {
+ lock_req_p->wait_abs_offset = -1;
+ ret = 1;
+ gossip_debug(GOSSIP_LOCK_DEBUG, "Returning immediately (wait=%Ld)"
+ " since next offset=%Ld is beyond final_offset=%Ld "
+ "or request is nonblocking\n",
+ lock_req_p->wait_abs_offset,
+ resp_lock_p->next_abs_offset, final_offset);
+ }
+ else
+ {
+ lock_req_p->wait_abs_offset = final_offset;
+ ret = 0;
+ gossip_debug(GOSSIP_LOCK_DEBUG, "Waiting until %Ld.\n",
+ lock_req_p->wait_abs_offset);
+ }
+ resp_lock_p->request_finished = 0;
+ }
+ else
+ {
+ gossip_err("add_lock_req: add_locks() returned an error!\n");
}
add_unlock_exit:
gen_mutex_unlock(lock_file_table_mutex);
- *lock_req_p_p = NULL;
return ret;
}
-/* revise_lock_req - Remove a certain amount of nb_bytes from the lock
- * req. If nb_bytes == -1, remove the entire lock request. Return -1
- * for en. If all n*/
-
+/* revise_lock_req - Removes all locked bytes up to final_offset from
+ * the lock req. If lock_type == PVFS_SERVER_REMOVE_ALL, remove the
+ * entire lock request. Return -1 for an error. */
int revise_lock_req(PVFS_object_ref *object_ref_p,
+ enum PVFS_server_lock_type lock_type,
PVFS_id_gen_t req_id,
- PVFS_size req_release_bytes,
- PVFS_size *total_released_bytes_p)
+ PVFS_offset final_offset,
+ struct PVFS_servresp_lock *resp_lock_p,
+ lock_node_t **lock_node_p_p)
{
int ret = 0;
enum PVFS_io_type io_type;
@@ -912,10 +1128,13 @@ int revise_lock_req(PVFS_object_ref *obj
PVFS_size released_bytes = 0;
linked_itree_t *linked_itree_p = NULL;
lock_node_t *lock_node_p = NULL;
+ PVFS_offset final_local_offset;
+ PINT_request_file_data *fdata_p;
+ lock_t *tmp_lock_p;
if (!LOCK_FILE_TABLE_INITIALIZED())
{
- fprintf(stdout, "del_lock_req: Impossible lock_file_table not "
+ fprintf(stdout, "revise_lock_req: Impossible lock_file_table not "
"initialized\n");
return -1;
}
@@ -927,18 +1146,19 @@ int revise_lock_req(PVFS_object_ref *obj
if (!hash_link_p)
{
gossip_debug(GOSSIP_LOCK_DEBUG,
- "del_lock_req: Lock node not found!\n");
+ "revise_lock_req: Lock node not found!\n");
ret = -1;
goto del_unlock_exit;
}
lock_node_p = qlist_entry(hash_link_p, lock_node_t, hash_link);
+ *lock_node_p_p = lock_node_p;
+ fdata_p = &lock_node_p->fdata;
/* Since the actual lock req could be either in the queued_req
* list or the granted_req rbtree, look in the granted_req_rbtree
* first, then the queued_req_list. Maintain this note through
* the call */
-
rbtree_p = rbtree_search(lock_node_p->granted_req,
&RBTREE_NIL, req_id);
if (rbtree_p != &RBTREE_NIL)
@@ -947,7 +1167,7 @@ int revise_lock_req(PVFS_object_ref *obj
{
lock_req_t *tmp_lock_req_p = NULL;
- gossip_debug(GOSSIP_LOCK_DEBUG, "del_lock_req: req_id=%Ld not "
+ gossip_debug(GOSSIP_LOCK_DEBUG, "revise_lock_req: req_id=%Ld not "
"found in granted list\n", req_id);
qhash_for_each(pos, &(lock_node_p->queued_req))
@@ -958,8 +1178,8 @@ int revise_lock_req(PVFS_object_ref *obj
}
if (lock_req_p == NULL)
{
- fprintf(stdout, "del_lock_req: req_id=%Ld not found in "
- "granted or queued list\n", req_id);
+ gossip_debug(GOSSIP_LOCK_DEBUG, "revise_lock_req: req_id=%Ld not "
+ "found in granted or queued list\n", req_id);
ret = -1;
goto del_unlock_exit;
}
@@ -967,31 +1187,24 @@ int revise_lock_req(PVFS_object_ref *obj
lock_head_p = &(lock_req_p->lock_head);
io_type = lock_req_p->io_type;
-#if 0
- {
- /* debug...is number 2 okay?*/
- tmp_rbtree_p =
- rbtree_search(lock_node_p->granted_req, &RBTREE_NIL, 2);
- tmp_lock_req_p =
- rbtree_entry(tmp_rbtree_p, lock_req_t, granted_req_link);
- printf("test");
- }
-#endif
-
/* Delete and/or change locks in the correct lock tree by
- * req_release_bytes. */
- if (req_release_bytes == -1)
+ * final_offset. */
+ if (lock_type == PVFS_SERVER_RELEASE_ALL) /* Release all bytes. */
{
qlist_for_each_safe(pos, scratch, lock_head_p)
{
linked_itree_p = itree_entry(pos, linked_itree_t, list_link);
+
gossip_debug(GOSSIP_LOCK_DEBUG,
- "removing lock from %Ld to %Ld - req %d "
+ "removing (ALL) lock from %Ld to %Ld - req %d "
"(should be %Ld)\n",
linked_itree_p->itree_link.start,
linked_itree_p->itree_link.end,
linked_itree_p->lock_id, req_id);
+ released_bytes += linked_itree_p->itree_link.end -
+ linked_itree_p->itree_link.start + 1;
+
del_itree_p = itree_delete(
((io_type == PVFS_IO_READ) ?
&(lock_node_p->read_itree) : &(lock_node_p->write_itree)),
@@ -1004,22 +1217,97 @@ int revise_lock_req(PVFS_object_ref *obj
free(linked_itree_p);
}
}
- else
+ else /* Remove some of the locks (possibly all) */
{
- struct qlist_head *last_lock_p = lock_head_p->prev;
- while ((released_bytes < req_release_bytes) &&
- (last_lock_p != lock_head_p))
+ struct qlist_head *last_lock_p;
+
+ /* Convert the final_offset to a local offset */
+ final_local_offset =
+ (*fdata_p->dist->methods->logical_to_physical_offset)
+ (fdata_p->dist->params,
+ fdata_p,
+ final_offset);
+
+ while (1)
{
+ last_lock_p = lock_head_p->prev;
+
+ if (last_lock_p == lock_head_p)
+ {
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "All locks (SOME) were removed!\n");
+ break;
+ }
+
linked_itree_p =
itree_entry(last_lock_p, linked_itree_t, list_link);
- if ((req_release_bytes - released_bytes) >=
- (linked_itree_p->itree_link.end -
- linked_itree_p->itree_link.start + 1))
-
+ /* 3 cases
+ * 1 - final_local_offset is now safe - not a problem
+ * 2 - Need to break a lock and add a piece to the removed_link
+ * 3 - Remove lock completely */
+ if (final_local_offset > linked_itree_p->itree_link.end)
+ break;
+ else if ((final_local_offset <= linked_itree_p->itree_link.end) &&
+ (final_local_offset > linked_itree_p->itree_link.start))
{
+ tmp_lock_p = malloc(sizeof(lock_t));
+ if (!tmp_lock_p)
+ {
+ gossip_err("revise_lock_req: malloc tmp_lock_p failed\n");
+ break;
+ }
+ tmp_lock_p->start = final_local_offset;
+ tmp_lock_p->end = linked_itree_p->itree_link.end;
+
+ qlist_add_tail(&(tmp_lock_p->lock_link),
+ &(lock_req_p->removed_link));
+
+ released_bytes += linked_itree_p->itree_link.end -
+ (final_local_offset - 1);
+
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "removing (SOME) partial lock from %Ld to %Ld - "
+ "req %d (should be %Ld) - final_local_off=%Ld\n",
+ final_local_offset,
+ linked_itree_p->itree_link.end,
+ linked_itree_p->lock_id, req_id,
+ final_local_offset);
+
+ linked_itree_p->itree_link.end = final_local_offset - 1;
+
+ /* Now, fix your own max and go up the tree */
+ itree_max_update_self(&(linked_itree_p->itree_link),
+ &ITREE_NIL);
+ itree_max_update_parent(&(linked_itree_p->itree_link),
+ &ITREE_NIL);
+ break;
+ }
+ else
+ {
+ tmp_lock_p = malloc(sizeof(lock_t));
+ if (!tmp_lock_p)
+ {
+ gossip_err("revise_lock_req: malloc tmp_lock_p failed\n");
+ break;
+ }
+ tmp_lock_p->start = linked_itree_p->itree_link.start;
+ tmp_lock_p->end = linked_itree_p->itree_link.end;
+
+ qlist_add_tail(&(tmp_lock_p->lock_link),
+ &(lock_req_p->removed_link));
+
released_bytes += linked_itree_p->itree_link.end -
linked_itree_p->itree_link.start + 1;
+
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "removing (SOME) lock from %Ld to %Ld - req %d "
+ "(should be %Ld) - final_local_off=%Ld\n",
+ linked_itree_p->itree_link.start,
+ linked_itree_p->itree_link.end,
+ linked_itree_p->lock_id, req_id,
+ final_local_offset);
+
qlist_del(last_lock_p);
del_itree_p = itree_delete(
((io_type == PVFS_IO_READ) ? &(lock_node_p->read_itree) :
@@ -1032,32 +1320,35 @@ int revise_lock_req(PVFS_object_ref *obj
free(linked_itree_p);
last_lock_p = lock_head_p->prev;
}
- else
- {
- linked_itree_p->itree_link.end -=
- req_release_bytes - released_bytes;
-
- /* Now, fix your own max and go up the tree */
- itree_max_update_self(&(linked_itree_p->itree_link),
- &ITREE_NIL);
- itree_max_update_parent(&(linked_itree_p->itree_link),
- &ITREE_NIL);
- released_bytes = req_release_bytes;
- }
}
}
- /* Remove lock_req from its respective queue if req_release_bytes == -1.
+ lock_req_p->actual_locked_bytes -= released_bytes;
+ /* Don't try to add locks for it in the future */
+ lock_req_p->wait_abs_offset = -1;
+
+ /* Remove lock_req from its respective queue if final_offset == -1.
* If the lock_req was ALL_LOCKS_GRANTED, change its status to
* INCOMPLETE, and add it to the queued_list in the proper
* location (i.e. search the all_req queue to see where to add
* it). */
- if (req_release_bytes == -1)
+ if (lock_type == PVFS_SERVER_RELEASE_ALL)
{
if (lock_req_p->lock_req_status == INCOMPLETE)
- qlist_del(&(lock_req_p->queued_req_link));
+ {
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "Remove all locks - req %Ld removed from the"
+ " queued list.\n",
+ lock_req_p->req_id);
+ qlist_del_init(&(lock_req_p->queued_req_link));
+ }
else
{
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "Remove all locks - req %Ld removed from the"
+ " granted list.\n",
+ lock_req_p->req_id);
+
del_rbtree_p =
rbtree_delete(&lock_node_p->granted_req, &RBTREE_NIL,
&(lock_req_p->granted_req_link),
@@ -1069,14 +1360,18 @@ int revise_lock_req(PVFS_object_ref *obj
/* Delete the lock req from the all_req queue and possibly
* delete the lock_node if there are no more entries for
* it. */
- qlist_del(&(lock_req_p->all_req_link));
+ qlist_del_init(&(lock_req_p->all_req_link));
free_lock_req(lock_req_p);
if (qlist_empty(&(lock_node_p->all_req)))
{
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "No lock request in node, removing lock node\n");
+
qlist_del(&(lock_node_p->hash_link));
PINT_dist_free(lock_node_p->fdata.dist);
free(lock_node_p);
+ *lock_node_p_p = NULL;
}
/* Todo: Should also be a way to check if the table is empty to free
* that as well. */
@@ -1089,6 +1384,10 @@ int revise_lock_req(PVFS_object_ref *obj
lock_req_t *tmp_lock_req_p = NULL, *found_lock_req_p = NULL;
struct qlist_head *tmp_req_p = NULL;
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "Remove some locks - req %Ld removed from the"
+ " granted list.\n",
+ lock_req_p->req_id);
del_rbtree_p =
rbtree_delete(&(lock_node_p->granted_req), &RBTREE_NIL,
&(lock_req_p->granted_req_link),
@@ -1097,6 +1396,11 @@ int revise_lock_req(PVFS_object_ref *obj
rbtree_entry(del_rbtree_p, lock_req_t, granted_req_link);
tmp_req_p = &(lock_req_p->all_req_link);
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "Remove some locks - req %Ld added back to the"
+ " queued list.\n",
+ lock_req_p->req_id);
+
/* Find out where in the queued list it belongs. Search
* the all_req queue after it to see what is next and
* where it should be added. If the next one can't be
@@ -1115,28 +1419,105 @@ int revise_lock_req(PVFS_object_ref *obj
lock_req_p->lock_req_status = INCOMPLETE;
if (found_lock_req_p)
qlist_add(&(lock_req_p->queued_req_link),
- &(lock_node_p->queued_req));
+ found_lock_req_p->queued_req_link.prev);
else
qlist_add_tail(&(lock_req_p->queued_req_link),
&(lock_node_p->queued_req));
}
+ else {
+ gossip_debug(GOSSIP_LOCK_DEBUG,
+ "Remove some locks - req %Ld already in "
+ "queued list.\n",
+ lock_req_p->req_id);
+ }
}
-
+
+ /* Set the resp fields for last_abs_offset and next_abs_offset */
+ if (lock_type == PVFS_SERVER_RELEASE_ALL)
+ {
+ resp_lock_p->last_abs_offset_locked = -1;
+ resp_lock_p->next_abs_offset = -1;
+ }
+ else
+ {
+ resp_lock_p->lock_id = lock_req_p->req_id;
+
+ /* Since at least one lock should be removed, this shouldn't happen */
+ if (lock_req_p->lock_head.prev == &lock_req_p->lock_head)
+ resp_lock_p->last_abs_offset_locked = -1;
+ else
+ {
+ linked_itree_p = qlist_entry(lock_req_p->lock_head.prev,
+ linked_itree_t, list_link);
+ resp_lock_p->last_abs_offset_locked =
+ (*fdata_p->dist->methods->physical_to_logical_offset)
+ (fdata_p->dist->params,
+ fdata_p,
+ linked_itree_p->itree_link.end);
+ }
+
+ if (lock_req_p->removed_link.prev == &lock_req_p->removed_link)
+ {
+ /* search the request and process if necessary */
+ if ((lock_req_p->seg_num ==
+ (lock_req_p->lock_result_p->segs - 1)) &&
+ (lock_req_p->seg_bytes_used ==
+ lock_req_p->lock_result_p->offset_array[lock_req_p->seg_num]))
+ {
+ if (!PINT_REQUEST_DONE(lock_req_p->file_req_state))
+ {
+ lock_req_p->lock_result_p->segs = 0;
+ lock_req_p->lock_result_p->bytes = 0;
+
+ ret = PINT_process_request(lock_req_p->file_req_state,
+ NULL,
+ &lock_node_p->fdata,
+ lock_req_p->lock_result_p,
+ PINT_SERVER);
+
+ }
+ }
+
+ if (PINT_REQUEST_DONE(lock_req_p->file_req_state))
+ resp_lock_p->next_abs_offset = -1;
+ else
+ {
+ resp_lock_p->next_abs_offset =
+ (*fdata_p->dist->methods->physical_to_logical_offset)
+ (fdata_p->dist->params,
+ fdata_p,
+ lock_req_p->lock_result_p->offset_array[
+ lock_req_p->seg_num] + lock_req_p->seg_bytes_used);
+ }
+ }
+ else
+ {
+ tmp_lock_p = qlist_entry(lock_req_p->removed_link.prev,
+ lock_t, lock_link);
+ resp_lock_p->next_abs_offset =
+ (*fdata_p->dist->methods->physical_to_logical_offset)
+ (fdata_p->dist->params,
+ fdata_p,
+ tmp_lock_p->start);
+ }
+ }
+
del_unlock_exit:
gen_mutex_unlock(lock_file_table_mutex);
- if (req_release_bytes == -1 && ret == 0)
- *total_released_bytes_p = -1;
- else
- *total_released_bytes_p = released_bytes;
+ resp_lock_p->bytes_accessed = -released_bytes;
+ if (released_bytes == 0)
+ gossip_err("revise_lock_req: Warning..0 bytes released!\n");
+
+ if (ret == 0)
+ resp_lock_p->request_finished = 1;
+ else /* There was a problem. */
+ resp_lock_p->request_finished = 0;
/* If any bytes have been revoked, all queued locks need to be
* progress as far as possible (advance their blocking bytes and
- * possibly set jobs to complete)*/
- if (req_release_bytes > 0)
- {
- /* This function will make more sense in PVFS2 when it is written. */
- }
+ * possibly set jobs to complete), which happens in a later state
+ * machine */
+
return ret;
}
-
More information about the Pvfs2-cvs
mailing list