[PVFS2-CVS]
commit by wujs in pvfs2/src/io/buffer: ncac-lru.c cache.c cache.h
flags.h internal.c internal.h module.mk.in ncac-buf-job.c
ncac-init.c ncac-interface.c ncac-job.c ncac-trove.c ncac-trove.h
state.c state.h
CVS commit program
cvs at parl.clemson.edu
Tue Sep 21 10:46:13 EDT 2004
Update of /projects/cvsroot/pvfs2/src/io/buffer
In directory parlweb:/tmp/cvs-serv23414
Modified Files:
cache.c cache.h flags.h internal.c internal.h module.mk.in
ncac-buf-job.c ncac-init.c ncac-interface.c ncac-job.c
ncac-trove.c ncac-trove.h state.c state.h
Added Files:
ncac-lru.c
Log Message:
Changes to the buffer code:
1) Simplify the concurrency control in the buffer code. The upper
layer is responsible for the write-sharing control.
2) Separate cache management code from other code. "ncac-lru.c" is
for the LRU policy. "ncac-arc" is for the ARC policy, which will be added soon.
3) Bug fixes in the list access.
The write path is still being worked on and more testing is needed.
--- /dev/null 2003-01-30 05:24:37.000000000 -0500
+++ ncac-lru.c 2004-09-21 09:46:13.000000000 -0400
@@ -0,0 +1,81 @@
+/* Specific functions related to LRU cache policy */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "internal.h"
+#include "state.h"
+#include "flags.h"
+#include "cache.h"
+
+
+/* Put "extent" at the head of the LRU list. Caller holds the "cache" lock. */
+void LRU_add_cache_item(struct cache_stack *cache,struct extent *extent)
+{
+    struct list_head *lru_head = &cache->active_list;
+
+    cache->nr_active++;
+    extent->mapping->nrpages++;
+    /* list_add() links the new entry right after the list head. */
+    list_add(&extent->lru, lru_head);
+    SetPageLRU(extent);
+}
+
+/* Take "extent" off the LRU list and drop the page counts.  The caller must
+ * hold the lock of "cache".
+ * NOTE(review): the LRU flag set on insertion is not cleared here - confirm. */
+void LRU_remove_cache_item(struct cache_stack *cache, struct extent *extent)
+{
+    cache->nr_active -= 1;
+    extent->mapping->nrpages -= 1;
+    list_del(&extent->lru);
+}
+
+/* LRU_shrink_cache: scan the LRU list from its tail and move up to "expected"
+ * discardable extents to the free extent list; the number really moved is
+ * stored in "*shrinked".  Returns 0, NCAC_INVAL_FLAGS, or the last status of
+ * NCAC_check_ioreq() (negative on error; NOTE(review): may be positive). */
+int LRU_shrink_cache(struct cache_stack *cache, unsigned int expected,
+                unsigned int *shrinked)
+{
+    struct list_head *lru_head, *lru_tail;
+    struct extent *victim;
+    int ret = 0;
+
+    fprintf(stderr, "%s: expected:%u\n", __FUNCTION__, expected);
+
+    *shrinked = 0;
+    lru_head = &cache->active_list;
+    lru_tail = lru_head->prev;
+
+    while (*shrinked < expected && lru_tail != lru_head){
+        victim = list_entry(lru_tail, struct extent, lru);
+
+        if ( !PageLRU(victim) ){
+            NCAC_error("extent flag is wrong. LRU flag is expected\n");
+            ret = NCAC_INVAL_FLAGS;
+            break;
+        }
+
+        /* Step to the predecessor first: the victim may be unlinked below. */
+        lru_tail = lru_tail->prev;
+
+        if (PageReadPending(victim) || PageWritePending(victim)){
+            ret = NCAC_check_ioreq(victim);
+            if (ret < 0){
+                NCAC_error("NCAC_check_ioreq error: index=%ld, ioreq=%Ld\n",
+                           victim->index, victim->ioreq);
+                break;
+            }
+            if (ret) { /* completion */
+                list_set_clean_page(victim);
+            }
+        }
+
+        if ( is_extent_discardable(victim) ){
+            LRU_remove_cache_item(cache, victim);
+            list_add_tail(&victim->list, &cache->free_extent_list);
+            (*shrinked)++; /* bump the counter, not the pointer */
+        }
+    }
+    return ret;
+}
Index: cache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/cache.c,v
diff -p -u -r1.1 -r1.2
--- cache.c 21 Aug 2003 18:57:27 -0000 1.1
+++ cache.c 21 Sep 2004 13:46:12 -0000 1.2
@@ -1,3 +1,6 @@
+/* common functions for cache management. These functions are shared
+ * by all cache policies.
+ */
#include <stdio.h>
#include <stdlib.h>
@@ -5,18 +8,13 @@
#include "state.h"
#include "flags.h"
#include "cache.h"
-
-/* contains core functions about cache */
-
-
+#include "ncac-lru.h"
/*
- * Given the index of an extent, look up whether this extent is cached
- * or not.
- * Cached: retured the extent
- * NOT cached: return NULL.
- *
- * No cache position management. NOT cache policy related.
+ * lookup_cache_item: Given the index of an extent, look up whether this
+ * extent is cached or not.
+ * Cached: return the extent
+ * NOT cached: return NULL.
*/
struct extent * lookup_cache_item(struct inode *mapping, unsigned long index)
{
@@ -27,19 +25,19 @@ struct extent * lookup_cache_item(struct
return extent;
}
-
/* ==================================================================
- * add an item into the cache: *
+ * add an item into the cache: *
* (1) add an item into the radix tree (add_cache_item_no_policy). *
- * (2) add an item into the cache (add_cache_item_with_policy). *
- * ==================================================================
+ * (2) add an item into the cache (add_cache_item_with_policy). *
+ * ==================================================================
*/
/* add_cache_item_no_policy(): add an extent into the cache tree.
* Each inode has a cache tree, protected by its "lock".
* NOT cache policy related.
*/
-int add_cache_item_no_policy(struct extent *extent, struct inode *mapping, unsigned long index)
+static inline int add_cache_item_no_policy(struct extent *extent,
+ struct inode *mapping, unsigned long index)
{
int error;
@@ -53,40 +51,42 @@ int add_cache_item_no_policy(struct exte
return error;
}
-
/* add an item into a cache list with certain policy.
* Current implementation is related to LRU. This function is
* cache policy related.
*/
-void add_cache_item_with_policy(struct extent *extent)
+static inline void add_cache_item_with_policy(struct extent *extent, int cache_policy)
{
struct cache_stack *cache_stack = NULL;
cache_stack = get_extent_cache_stack(extent);
- if ( TestSetPageLRU(extent) )
- NCAC_error("flag error");
-
- add_page_to_inactive_list(cache_stack, extent);
-
- return;
+ switch (cache_policy) {
+ case LRU_POLICY:
+ LRU_add_cache_item(cache_stack, extent);
+ break;
+
+ default:
+ NCAC_error("unknown cache policy");
+ break;
+ }
}
-
+/* add an extent into the cache. */
int add_cache_item(struct extent *extent, struct inode *mapping,
-unsigned long index)
+ unsigned long index, int policy)
{
+ /* 1. bookkeeping in the radix tree */
int ret = add_cache_item_no_policy(extent, mapping, index);
+ /* 2. put into cache list with respect to the cache policy */
if (ret == 0){
- add_cache_item_with_policy(extent);
+ add_cache_item_with_policy(extent, policy);
return ret;
}
return ret;
}
-
-
/* ==================================================================
* remove an item from the cache: *
* (1) remove it from the radix tree (remove_cache_item_no_policy). *
@@ -95,7 +95,7 @@ unsigned long index)
* ==================================================================
*/
-void remove_cache_item_no_policy(struct extent *extent)
+static void remove_cache_item_no_policy(struct extent *extent)
{
struct inode *mapping = extent->mapping;
@@ -104,29 +104,34 @@ void remove_cache_item_no_policy(struct
/* get this back if the "list" field is used */
//list_del(&page->list);
extent->mapping = NULL;
-
}
-void remove_cache_item_with_policy(struct extent *victim)
+static void remove_cache_item_with_policy(struct extent *victim, int policy)
{
- list_del(&victim->lru);
- victim->mapping->nrpages--;
+ struct cache_stack *cache;
+
+ cache = get_extent_cache_stack(victim);
+ if ( NULL == cache ){
+ NCAC_error("extent cache stack is NULL");
+ return;
+ }
+ switch (policy){
+ case LRU_POLICY:
+ LRU_remove_cache_item(cache, victim);
+ break;
+ default:
+ NCAC_error("unknown cache policy");
+ break;
+ }
}
-void remove_cache_item(struct extent *extent)
+void remove_cache_item(struct extent *extent, int policy)
{
- remove_cache_item_with_policy(extent);
+ remove_cache_item_with_policy(extent, policy);
remove_cache_item_no_policy(extent);
}
-
-void add_free_extent_list_item(struct list_head *head, struct extent *page)
-{
- list_add_tail(&page->list, head);
-}
-
-
struct extent * get_free_extent_list_item(struct list_head *list)
{
struct extent *new;
@@ -143,260 +148,49 @@ struct extent * get_free_extent_list_ite
}
-int shrink_extent_cache( struct cache_stack *cache_stack,
- unsigned int expected, unsigned int *scanned);
-int wakeup_dirty_flush(void);
-int shrink_extent_inactive( struct cache_stack *cache_stack, int max_scan, int expected );
-int refill_inactive_list( struct cache_stack *cache_stack, int expected);
-static inline int is_extent_movable(struct extent *victim);
-
-
-/* here is the main entry point part of cache replacement.
- * When we run out of free extents, someone should be discarded
- * from the cache.
- */
-
-/* try_to_discard_extents(): this function scans the inactive_list
- * and try to discard up to "expected" extents.
- * The discardable extents are clean and no referenced. If there is not
- * enough clean extents present, the dirty flush thread will be
- * waken up.
- *
- * this call return the number of extents which have been discarded.
- * if the return value is less than 0, error occurs.
- *
- */
-int try_to_discard_extents( struct cache_stack *cache_stack,
- unsigned int expected )
-{
- int ret=0;
- unsigned int scan;
-
- DPRINT("try_to_discard_extents: dirty=%ld, expected=%d\n",cache_stack->nr_dirty, expected);
-
- ret = shrink_extent_cache( cache_stack, expected, &scan);
-
- if ( ret < 0 ) {
- NCAC_error("try_to_discard_extents: error in shrink_extent_cache\n");
- return ret;
- }
-
- DPRINT("try_to_discard_extents: expected=%d, flushed=%d\n", expected, ret);
-
- if ( ret < expected && cache_stack->nr_dirty ) {
- wakeup_dirty_flush();
- }
-
- return ret;
-}
-
-/* if there is no pending write or read operations on this
- * extent, this extent is movable.
- */
-static inline int is_extent_movable(struct extent *victim)
-{
- if ( PageClean(victim) && victim->writes == victim->wcmp && victim->reads == victim->rcmp )
- return 1;
- else return 0;
-}
-
-
-
-/* shrink_extent_cache(): this function is to discard as many as
- * "expected" clean extents from the cache.
- *
- * This function is dependent on the cache replacement policy.
- * The current implementation is more and less a simplified
- * version of the LRU-2Q policy.
- *
- * return value is less than 0, if error. Otherwise, return
- * the number of extents shrinked.
- *
- */
-int shrink_extent_cache(struct cache_stack *cache_stack, unsigned int expected, unsigned int *scanned)
-{
- int ret;
- unsigned int nr_reclaimed = 0;
- unsigned int nr_refilled = 0;
-
- DPRINT("shrink_extent_cache: to shrink inactive list: max_scan=%ld, expected=%d\n",cache_stack->nr_inactive, expected);
-
- ret = shrink_extent_inactive(cache_stack, cache_stack->nr_inactive, expected);
- if ( ret < 0 ) {
- NCAC_error("shrink_extent_inactive error: error=%d\n", ret);
- return ret;
- }
-
- DPRINT("shrink_extent_cache: to shrink inactive list: expected=%d, shrinked=%d\n",expected, ret);
-
- nr_reclaimed += ret;
-
- if ( nr_reclaimed >= expected) return nr_reclaimed;
-
- /* how many extents are moved from active list to the inactive list?
- * In Linux, it tries to keep the active list of 2/3 size of the cache.
- */
- nr_refilled = expected * cache_stack->nr_active/
- ( (cache_stack->nr_inactive | 1) *2 );
-
- DPRINT("----------:refill inactive: num=%d, active=%ld, inactive=%ld\n", nr_refilled, cache_stack->nr_active, cache_stack->nr_inactive);
-
- if ( !nr_refilled ) return nr_reclaimed;
-
- /* Limit the number of refilled extents in one run. */
- if ( nr_refilled > 2*REFILL_CLUSTER_MAX )
- nr_refilled = 2*REFILL_CLUSTER_MAX;
-
- ret = refill_inactive_list(cache_stack, nr_refilled);
- if ( ret < 0 ) {
- NCAC_error("refill_inactive_list error: error=%d\n", ret);
- return ret;
- }
-
- ret = shrink_extent_inactive(cache_stack, cache_stack->nr_inactive, expected - nr_reclaimed);
- if ( ret < 0 ) {
- NCAC_error("shrink_extent_inactive error: error=%d\n", ret);
- return ret;
- }
-
- nr_reclaimed += ret;
-
- return nr_reclaimed;
-}
-
-
-/* shrink_extent_inactive(): dicards clean extents from the inactive_list.
- */
-int shrink_extent_inactive( struct cache_stack *cache_stack, int max_scan, int expected )
-{
- int nr_to_process;
- int error;
- int ret = 0;
- struct extent * victim;
- struct list_head * inactive_list, *tail;
-
- DPRINT("shrink_extent_inactive: max_scan=%d, expected=%d\n", max_scan, expected);
-
- nr_to_process = expected;
- if (nr_to_process < DISCARD_CLUSTER_MIN)
- nr_to_process = DISCARD_CLUSTER_MIN;
-
- inactive_list =&cache_stack->inactive_list;
-
- tail = inactive_list->prev;
- while ( nr_to_process && tail != inactive_list ) {
-
- victim = list_entry(tail, struct extent, lru);
- tail = tail->prev;
-
- if ( !PageLRU(victim) ) {
- NCAC_error("extent flag is wrong\n");
- return NCAC_INVAL_FLAGS;
- }
-
- DPRINT("victim.flags=%lx, wcnt=%d, rcnt=%d, wcmp=%d, rcmp=%d, index=%ld\n", victim->flags, victim->writes, victim->reads, victim->wcmp, victim->rcmp, victim->index);
-
- if ( is_extent_movable(victim) ) {
- remove_cache_item(victim);
- add_free_extent_list_item(&cache_stack->free_extent_list, victim);
-
- DPRINT("discard extent: %p\n", victim);
-
- nr_to_process --;
- ret ++;
- }
-
-
- if ( PageReadPending(victim) || PageWritePending(victim)) {
- error = NCAC_check_ioreq(victim);
- if (error <0) {
-
- NCAC_error("NCAC_check_ioreq error: index=%ld, flags=%lx\n", victim->index, victim->flags);
-
- return error;
- }
-
- if (error) { /* completion */
- /* set all other related extents */
- list_set_clean_page(victim);
- }
- }
- }
-
+/* shrink_cache: shrink a cache by the expected number of extents. The
+ * real number of extents which have been shrunk is returned through
+ * "shrinked". This number might be less than "expected". All shrunk
+ * extents are returned into the extent free list.
+ * Different cache policies take their own ways to do shrink.
+ */
+int shrink_cache(struct cache_stack *cache_stack,
+ unsigned int expected,
+ int policy,
+ unsigned int *shrinked)
+{
+ int ret=-1;
+
+ switch (policy){
+ case LRU_POLICY:
+ ret = LRU_shrink_cache(cache_stack, expected, shrinked);
+ break;
+
+ case ARC_POLICY:
+ ret = LRU_shrink_cache(cache_stack, expected, shrinked);
+ break;
+
+ default:
+ NCAC_error("unknown cache policy");
+ break;
+ }
return ret;
}
-/*
- * refill_inactive_list(): Try to move extents from "cache_stack" active
- * list to its inactive list.
- * If the extent is not movable, we move it to the head of the active
- * list.
- * TODO: to verify this does make sense.
- *
- * Returns how many extents moved, may be less than expected.
- */
-int refill_inactive_list( struct cache_stack *cache_stack, int expected)
+int is_extent_discardable(struct extent *victim)
{
- struct list_head *tail;
- struct list_head *active_list;
- int moved = 0;
- int error;
-
- DPRINT("refill_inactive_list: expected=%d\n", expected);
-
- active_list =&cache_stack->active_list;
-
- tail = active_list->prev;
- while ( expected && tail != active_list ) {
- struct extent * victim;
-
- victim = list_entry(tail, struct extent, lru);
- tail = tail->prev;
-
- if ( PageReadPending(victim) || PageWritePending(victim)) {
- error = NCAC_check_ioreq(victim);
- if (error <0) {
- NCAC_error("NCAC_check_ioreq error");
- return error;
- }
-
- if (error) { /* completion */
- /* set all other related extents */
- list_set_clean_page(victim);
- }
- }
-
- if ( !is_extent_movable(victim) ) { /* not movable */
- //list_del(&victim->lru);
- //list_add(&victim->lru, active_list);
- continue;
- }
-
- DPRINT("victim.flags=%lx, wcnt=%d, rcnt=%d, wcmp=%d, rcmp=%d\n", victim->flags, victim->writes, victim->reads, victim->wcmp, victim->rcmp);
-
- expected --;
- moved ++;
- list_move(&victim->lru, &cache_stack->inactive_list);
-
- /* set reference here to show that this extent was once "hot".
- * If there is a reference on it again when it is still in
- * inactive list, this extent will be quickly promoted into
- * the active list.
- */
- SetPageReferenced(victim);
- ClearPageActive(victim);
- }
-
- cache_stack->nr_active -= moved;
- cache_stack->nr_inactive += moved;
-
- DPRINT("************ move %d extents into inactive list\n", moved);
-
- return (moved);
+ if ( PageClean(victim) && 0 == victim->reads && 0 == victim->writes )
+ return 1;
+ else
+ return 0;
}
-int wakeup_dirty_flush()
+/* hit_cache_item: cache hit, change the position according to the policy */
+void hit_cache_item(struct extent *extent, int cache_policy)
{
- return 0;
+ remove_cache_item_with_policy(extent, cache_policy);
+ add_cache_item_with_policy(extent, cache_policy);
+ return;
}
+
Index: cache.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/cache.h,v
diff -p -u -r1.1 -r1.2
--- cache.h 21 Aug 2003 18:57:27 -0000 1.1
+++ cache.h 21 Sep 2004 13:46:12 -0000 1.2
@@ -7,16 +7,18 @@
#define DISCARD_CLUSTER_MIN 4
#define DELT_DISCARD_NUM 5
-struct extent * lookup_cache_item(struct inode *mapping, unsigned long offset);
-
-struct extent * get_free_extent_list_item(struct list_head *list);
-
-int add_cache_item(struct extent *page, struct inode *mapping, unsigned long offset);
-
-void list_set_clean_page(struct extent *page);
-int try_to_discard_extents( struct cache_stack *cache_stack, unsigned int num);
-
-
-
+#define LRU_POLICY 1
+#define ARC_POLICY 2
+#define TWOQ_POLICY 3
+
+struct extent *lookup_cache_item(struct inode *mapping, unsigned long offset);
+struct extent *get_free_extent_list_item(struct list_head *list);
+int add_cache_item(struct extent *page, struct inode *mapping,
+ unsigned long index, int policy);
+void remove_cache_item(struct extent *page, int policy);
+int shrink_cache(struct cache_stack *cache_stack, unsigned int expected,
+ int policy, unsigned int *shrinked);
+int is_extent_discardable(struct extent *victim);
+void hit_cache_item(struct extent *page, int policy);
#endif
Index: flags.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/flags.h,v
diff -p -u -r1.3 -r1.4
--- flags.h 22 Aug 2003 15:48:48 -0000 1.3
+++ flags.h 21 Sep 2004 13:46:12 -0000 1.4
@@ -34,11 +34,11 @@ static inline int test_and_set_bit(int n
#define PG_writecomm 10 /* Write communication */
#define PG_writepending 11 /* Write op pending */
-#define PG_referenced 12
-#define PG_blank 13 /* Blank page */
-
-
+#define PG_readpreparing 12 /* Preparing for reading */
+#define PG_writepreparing 13 /* Preparing for writing */
+#define PG_referenced 14
+#define PG_blank 15 /* Blank page */
/*
* Manipulation of state flags
@@ -87,9 +87,17 @@ static inline int test_and_set_bit(int n
#define SetPageReadPending(page) set_bit(PG_readpending, &(page)->flags)
#define ClearPageReadPending(page) clear_bit(PG_readpending, &(page)->flags)
+#define PageReadPreparing(page) test_bit(PG_readpreparing, &(page)->flags)
+#define SetPageReadPreparing(page) set_bit(PG_readpreparing, &(page)->flags)
+#define ClearPageReadPreparing(page) clear_bit(PG_readpreparing, &(page)->flags)
+
#define PageWritePending(page) test_bit(PG_writepending, &(page)->flags)
#define SetPageWritePending(page) set_bit(PG_writepending, &(page)->flags)
#define ClearPageWritePending(page) clear_bit(PG_writepending, &(page)->flags)
+
+#define PageWritePreparing(page) test_bit(PG_writepreparing, &(page)->flags)
+#define SetPageWritePreparing(page) set_bit(PG_writepreparing, &(page)->flags)
+#define ClearPageWritePreparing(page) clear_bit(PG_writepreparing, &(page)->flags)
#define PageClean(page) test_bit(PG_clean, &(page)->flags)
#define SetPageClean(page) set_bit(PG_clean, &(page)->flags)
Index: internal.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/internal.c,v
diff -p -u -r1.2 -r1.3
--- internal.c 23 Mar 2004 04:07:15 -0000 1.2
+++ internal.c 21 Sep 2004 13:46:12 -0000 1.3
@@ -10,14 +10,13 @@
#include "cache.h"
#include "ncac-job.h"
-extern struct inode *inode_arr[1000];
extern struct NCAC_dev NCAC_dev;
/* This file contains NCAC internal functions. */
-
-static inline struct inode *get_inode(PVFS_fs_id, PVFS_handle , PVFS_context_id);
-static inline int NCAC_rwjob_prepare_one_piece(PVFS_offset pos, PVFS_size size, char ** cbufoff, PVFS_size * cbufsize, struct extent **cbufhash);
+static inline struct inode *get_inode( PVFS_fs_id, PVFS_handle , PVFS_context_id);
+static inline int NCAC_rwjob_prepare_single(NCAC_req_t *ncac_req);
+static inline int NCAC_rwjob_prepare_list(NCAC_req_t *ncac_req);
/* get_internal_req(): get a internal request structure from the free
* list. To avoid dynamic allocation, for the timebeing, I hard code
@@ -33,7 +32,7 @@ static inline struct NCAC_req * get_inte
NCAC_req_t *req=NULL;
struct list_head *new;
- //list_lock(&NCAC_dev.req_list_lock);
+ list_lock(&NCAC_dev.req_list_lock);
if ( list_empty(&NCAC_dev.free_req_list) ) return NULL;
@@ -44,7 +43,7 @@ static inline struct NCAC_req * get_inte
}
list_del_init(new);
- //list_unlock(&NCAC_dev.req_list_lock);
+ list_unlock(&NCAC_dev.req_list_lock);
req = list_entry(new->prev, NCAC_req_t, list);
@@ -63,7 +62,7 @@ void NCAC_list_add_tail_lock(struct list
}
-/* del an entry from its list */
+/* delete an entry from its list */
void NCAC_list_del_lock(struct list_head *entry, NCAC_lock *lock)
{
list_lock(lock);
@@ -94,13 +93,12 @@ void NCAC_read_request_from_list_lock(st
*ncac_req_ptr = req;
}
-
-
/* build internal read/write requests */
NCAC_req_t *NCAC_rwreq_build( NCAC_desc_t *desc, NCAC_optype optype)
{
void *iovec;
NCAC_req_t *ncac_req;
+ int tmp_off, tmp_size;
ncac_req = get_internal_req_lock(desc->coll_id, desc->handle);
if (ncac_req == NULL) { /* run out of ncac request resources */
@@ -118,14 +116,14 @@ NCAC_req_t *NCAC_rwreq_build( NCAC_desc_
ncac_req->aiovec = &(ncac_req->mapping->aiovec);
ncac_req->nr_dirty = 0;
- if ( desc->buffer ) { /* buffer read or not */
+ if ( desc->buffer ) { /* copy data into the user's buffer */
if ( optype == NCAC_GEN_READ )
ncac_req->optype = NCAC_BUF_READ;
else ncac_req->optype = NCAC_BUF_WRITE;
ncac_req->usrbuf = desc->buffer;
ncac_req->usrlen = desc->len;
- }else{
+ }else{ /* use cache buffers for communication */
if ( optype == NCAC_GEN_READ )
ncac_req->optype = NCAC_READ;
else ncac_req->optype = NCAC_WRITE;
@@ -145,10 +143,10 @@ NCAC_req_t *NCAC_rwreq_build( NCAC_desc_
ncac_req->sizevec = NULL; /* no vector */
}else{ /* a list of <off, len> tuples */
+ tmp_off = desc->stream_array_count*sizeof(PVFS_offset);
+ tmp_size = desc->stream_array_count*sizeof(PVFS_size);
- /* I do want to avoid this malloc */
-
- iovec = (void*) malloc( desc->stream_array_count*(sizeof(PVFS_offset)+sizeof(PVFS_size)) );
+ iovec = (void*) malloc( tmp_off + tmp_size );
if (iovec == NULL ) {
ncac_req->status = NCAC_NO_MEM;
return ncac_req;
@@ -156,10 +154,11 @@ NCAC_req_t *NCAC_rwreq_build( NCAC_desc_
ncac_req->offcnt = desc->stream_array_count;
ncac_req->offvec = (PVFS_offset*)iovec;
- ncac_req->sizevec = (PVFS_size *)( (unsigned long)iovec + desc->stream_array_count*sizeof(PVFS_offset) );
-
- /* copy user reuqest's stuff here */
+ ncac_req->sizevec = (PVFS_size *)( (unsigned long)iovec + tmp_off );
+ /* copy the user request information into an internal request */
+ memcpy(ncac_req->offvec, desc->stream_offset_array, tmp_off);
+ memcpy(ncac_req->sizevec, desc->stream_size_array, tmp_size);
}
/* success */
@@ -170,163 +169,318 @@ NCAC_req_t *NCAC_rwreq_build( NCAC_desc_
/*
* NCAC_rwjob_prepare(): does three things:
* (1) allocate resource; caculate index, offset, and length;
- * (2) put the request in the interanl job list
+ * (2) put the request in the internal job list
* (3) make progress of the requests in the job list.
*/
int NCAC_rwjob_prepare(NCAC_req_t *ncac_req, NCAC_reply_t *reply )
{
- int bufcnt, cnt;
- char **cbufoff;
- PVFS_size *cbufsize;
- struct extent **cbufindex;
- int *cbufflag;
- int *cbufrcnt;
- int *cbufwcnt;
- int ret;
- int seg;
-
+ int ret;
- /* stream <off, len> --> page info. */
+ /* prepare the request */
+ if ( !ncac_req->offcnt ) { /* only one contiguous segment */
- if ( !ncac_req->offcnt ) { /* only one contiguous segment */
+ ret = NCAC_rwjob_prepare_single(ncac_req);
- /* bufcnt: the biggest number of buffers the data could be
- * placed in the cache.
- */
- bufcnt = (ncac_req->pos + ncac_req->size + NCAC_dev.extsize -1)/NCAC_dev.extsize - ncac_req->pos/NCAC_dev.extsize;
- }else {
- bufcnt = 0;
- for (seg = 0; seg < ncac_req->offcnt; seg ++)
- bufcnt += (ncac_req->offvec[seg]+ncac_req->sizevec[seg] + NCAC_dev.extsize -1)/NCAC_dev.extsize - ncac_req->offvec[seg]/NCAC_dev.extsize;
- }
-
- /* try to reuse buffer info. arrays if possible. "reserved_cbufcnt" is the
- * size of the previous request. If the size of the current request is
- * not larger than the previous one, we reuse the previous resource. Otherwise,
- * we free the old one and malloc the new one. */
-
- if ( ncac_req->reserved_cbufcnt < bufcnt ) {
- if ( ncac_req->cbufoff ) free( ncac_req->cbufoff);
-
- cbufoff =(char**) malloc( (2*sizeof(char*)+sizeof(PVFS_size)+3*sizeof(int))* bufcnt );
- cbufsize =(PVFS_size*) &cbufoff[bufcnt];
- cbufindex =(struct extent**) &cbufsize[bufcnt];
- cbufflag =(int*) &cbufindex[bufcnt];
- cbufrcnt =(int*) &cbufflag[bufcnt];
- cbufwcnt =(int*) &cbufrcnt[bufcnt];
-
-
- if ( cbufoff == NULL ) {
- ncac_req->error = -ENOMEM;
- return -ENOMEM;
- }
+ }else{ /* multiple segements */
- ncac_req->cbufoff = cbufoff;
- ncac_req->cbufsize = cbufsize;
- ncac_req->cbufhash = cbufindex;
- ncac_req->cbufflag = cbufflag;
- ncac_req->cbufrcnt = cbufrcnt;
- ncac_req->cbufwcnt = cbufwcnt;
-
- ncac_req->reserved_cbufcnt = bufcnt;
- }
-
- ncac_req->cbufcnt = bufcnt;
- memset(ncac_req->cbufoff, 0, (2*sizeof(char*)+sizeof(PVFS_size)+3*sizeof(int))*ncac_req->reserved_cbufcnt);
-
- if ( !ncac_req->offcnt ) { /* only one contiguous segment */
- ret = NCAC_rwjob_prepare_one_piece( ncac_req->pos,
- ncac_req->size,
- ncac_req->cbufoff,
- ncac_req->cbufsize,
- ncac_req->cbufhash);
- if ( ret != bufcnt) {
- fprintf(stderr, "Error: bufcnt error in prepare\n");
- ncac_req->error = NCAC_JOB_PREPARE_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
- return NCAC_JOB_PREPARE_ERR;
- }
- }else{
+ ret = NCAC_rwjob_prepare_list(ncac_req);
- /* multiple <off len> tuples. Handle each contiguous piece one
- * by one. */
-
- cnt = 0;
- for (seg = 0; seg < ncac_req->offcnt; seg ++) {
- ret = NCAC_rwjob_prepare_one_piece(ncac_req->offvec[seg],
- ncac_req->sizevec[seg],
- ncac_req->cbufoff + cnt,
- ncac_req->cbufsize + cnt,
- ncac_req->cbufhash + cnt);
- cnt += ret;
- }
- if (cnt > bufcnt) {
- fprintf(stderr, "Error: bufcnt error in prepare\n");
- ncac_req->error = NCAC_JOB_PREPARE_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
- return NCAC_JOB_PREPARE_ERR;
- }
}
-
- /* put the request in the internal job list: thread safe */
-
- NCAC_list_add_tail_lock(&ncac_req->list, &NCAC_dev.prepare_list, &NCAC_dev.req_list_lock);
-
- ncac_req->status = NCAC_REQ_SUBMITTED;
-
- DPRINT("NCAC_rwjob_prepare: %p submitted\n", ncac_req);
-
- /* make progress of jobs: thread safe.
- * Choices here are: 1) do one job; 2) scan the whole list. Choose 1) here. */
- //ret = NCAC_do_jobs(&(NCAC_dev.req_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
-
- ret = NCAC_do_a_job(ncac_req, &(NCAC_dev.prepare_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
-
- if ( ret < 0 ) {
- ncac_req->error = NCAC_JOB_DO_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
+ if ( ret < 0 ){
+ ncac_req->error = ret;
return ret;
}
- ncac_req->error = NCAC_OK;
+ /* put the request in the internal job list: thread safe */
+
+ NCAC_list_add_tail_lock(&ncac_req->list, &NCAC_dev.prepare_list,
+ &NCAC_dev.req_list_lock);
+
+ ncac_req->status = NCAC_REQ_SUBMITTED;
+
+ DPRINT("NCAC_rwjob_prepare: %p submitted\n", ncac_req);
+
+ /* make progress of jobs: thread safe.
+ * Choices here are: 1) do one job; 2) scan the whole list.
+ * Choose 1) here.
+ */
+ //ret = NCAC_do_jobs(&(NCAC_dev.req_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
+
+ ret = NCAC_do_a_job(ncac_req, &(NCAC_dev.prepare_list),
+ &(NCAC_dev.bufcomp_list),
+ &(NCAC_dev.comp_list),
+ &NCAC_dev.req_list_lock);
+
+ if ( ret < 0 ) {
+ ncac_req->error = NCAC_JOB_DO_ERR;
+ ncac_req->status = NCAC_ERR_STATUS;
+ return ret;
+ }
+
+ ncac_req->error = NCAC_OK;
+
+ return 0;
+}
+
+
+/* NCAC_rwjob_prepare_single: Given a request which accesses only one
+ * file region, we prepare needed resources for this request:
+ * 1) extent cache buffers;
+ * 2) Communication buffer address;
+ * 3) Communication buffer sizes;
+ * 4) Communication buffer flags;
+ * Given the extent size is 32768 bytes, if a request wants to
+ * read data 32768 bytes from 1024,
+ * (1) two extents: 0-32767, and 32768-65535
+ * (2) comm buffers: extent1.addr+1024, extent2.addr
+ * (3) comm buffer sizes: 31744, 1024
+ * (4) if data is ready, flag is set.
+ * In this case, the number of extents and the number of communication
+ * buffers are same.
+ */
+
+static inline int NCAC_rwjob_prepare_single(NCAC_req_t *ncac_req)
+{
+ int extcnt; /* cache extent count */
+ int comcnt; /* communication buffer count */
+ int allocsize;
+
+ PVFS_offset *foff;
+ char **cbufoff;
+ PVFS_size *cbufsize;
+ int *cbufflag;
+ unsigned long firstoff;
+
+ int i;
+
+ extcnt = (ncac_req->pos + ncac_req->size + NCAC_dev.extsize -1) /
+ NCAC_dev.extsize - ncac_req->pos/NCAC_dev.extsize;
+ comcnt = extcnt;
+
+ if ( ncac_req->reserved_cbufcnt < comcnt ) {
+ if ( ncac_req->cbufoff ) free( ncac_req->cbufoff);
+
+ allocsize = ( sizeof(PVFS_offset) + sizeof(char*) + sizeof(PVFS_size)
+ + sizeof(struct extent *) + 3*sizeof(int) ) * comcnt;
+
+ ncac_req->foff =(PVFS_offset*) malloc(allocsize);
+
+ if ( ncac_req->foff == NULL ) {
+ ncac_req->error = -ENOMEM;
+ return -ENOMEM;
+ }
+
+ ncac_req->cbufoff =(char**) & ncac_req->foff[comcnt];
+ ncac_req->cbufsize =(PVFS_size*) &ncac_req->cbufoff[comcnt];
+ ncac_req->cbufhash =(struct extent**)
+ &ncac_req->cbufsize[comcnt];
+ ncac_req->cbufflag =(int*) &ncac_req->cbufhash[comcnt];
+ ncac_req->cbufrcnt =(int*) &ncac_req->cbufflag[comcnt];
+ ncac_req->cbufwcnt =(int*) &ncac_req->cbufrcnt[comcnt];
+
+ ncac_req->reserved_cbufcnt = comcnt;
+
+ memset(ncac_req->foff, 0, allocsize);
+ }
+
+ ncac_req->cbufcnt = comcnt;
+
+ foff = ncac_req->foff;
+ cbufoff = ncac_req->cbufoff;
+ cbufsize = ncac_req->cbufsize;
+ cbufflag = ncac_req->cbufflag;
+
+ /* Setup the related values for foff, cbufoff, and cbufsize */
+
+ firstoff = (unsigned long) (ncac_req->pos & (NCAC_dev.extsize -1));
+ foff[0] = ncac_req->pos - firstoff;
+ cbufoff[0] = (char*)firstoff; /* offsize to the extent address */
+ cbufsize[0] = NCAC_dev.extsize - firstoff;
+ for ( i= 1; i < comcnt; i++){
+ foff[i] = foff[i-1] + cbufsize[i-1];
+ cbufoff[i] = 0;
+ cbufsize[i] = NCAC_dev.extsize;
+ cbufflag[i] = NCAC_COMM_NOT_READY;
+ }
+ cbufsize[comcnt-1] = (ncac_req->pos + ncac_req->size)% NCAC_dev.extsize;
+ fprintf(stderr, "[%s] exit %d comm buffers\n", __FUNCTION__, comcnt);
return 0;
}
+/* NCAC_rwjob_prepare_list: Given a request which accesses a list of
+ * file regions, we prepare needed resources for this request:
+ * 1) extent cache buffers;
+ * 2) Communication buffer address;
+ * 3) Communication buffer sizes;
+ * 4) Communication buffer flags;
+ * Given the extent size is 32768 bytes, if a request wants to
+ * read data from the following regions: (1024, 32768) and (65530, 32768)
+ * (1) Three extents: 0-32767, 32768-65535, 65536-98303
+ * (2) Communication buffers:
+ * extent1.addr+1024, extent2.addr,
+ * extent2.addr+32762, extent3.addr
+ * (3) Communication buffer size:
+ * 31744, 1024, 6, and 32762
+ * This example shows that:
+ * (A) For the underlying I/O system, we are going to read
+ * three extents;
+ * (B) For the upper communication system, we are going to
+ * use four different buffers.
+ * The number of communication buffers is equal to or larger
+ * than the number of needed extents.
+ */
+
+struct freg_tuple /* one <offset, size> file region of a list request */
+{
+ PVFS_offset fpos; /* file offset where the region starts */
+ PVFS_size size; /* size of the region */
+};
+
+int comp_pos(const PVFS_offset *num1, const PVFS_offset *num2)
+{
+    /* Three-way compare of two file offsets: -1, 0 or 1. */
+    const PVFS_offset lhs = *num1;
+    const PVFS_offset rhs = *num2;
+    return (lhs < rhs) ? -1 : (lhs > rhs) ? 1 : 0;
+}
-static inline int NCAC_rwjob_prepare_one_piece(PVFS_offset pos, PVFS_size size, char ** cbufoff, PVFS_size * cbufsize, struct extent **cbufhash)
+static inline int NCAC_rwjob_prepare_list(NCAC_req_t *ncac_req)
{
- unsigned long offset;
- unsigned long bufcnt, len;
- int seg;
+ int extcnt; /* cache extent count */
+ int comcnt; /* communication buffer count */
+ int allocsize;
+
+ PVFS_offset *foff;
+ char **cbufoff;
+ PVFS_size *cbufsize;
+ int *cbufflag;
+ unsigned long firstoff;
+
+ int i, j;
+ int cnt;
+
+ struct freg_tuple *fregions;
- bufcnt = (pos + size + NCAC_dev.extsize -1)/NCAC_dev.extsize - pos/NCAC_dev.extsize;
+ fregions = (struct freg_tuple *)malloc(ncac_req->offcnt *
+ sizeof(struct freg_tuple));
+ if ( NULL == fregions){
+ ncac_req->error = -ENOMEM;
+ return -ENOMEM;
+ }
+
+ extcnt = 0;
+ for (i = 0; i < ncac_req->offcnt; i ++) {
+ extcnt += (ncac_req->offvec[i] + ncac_req->sizevec[i] +
+ NCAC_dev.extsize -1)/NCAC_dev.extsize -
+ ncac_req->offvec[i]/NCAC_dev.extsize;
+
+ fregions[i].fpos = ncac_req->offvec[i];
+ fregions[i].size = ncac_req->sizevec[i];
+ }
+
+ /* Some extents counted by "extcnt" may be the same. Also, the
+ * number of communication buffers should be the same as
+ * extcnt. Use "comcnt" to over-provision resources.
+ */
+
+ comcnt = extcnt;
+
+ if ( ncac_req->reserved_cbufcnt < comcnt ) {
+ if ( ncac_req->cbufoff ) free( ncac_req->cbufoff);
- len = 0;
+ allocsize = ( sizeof(PVFS_offset) + sizeof(char*) + sizeof(PVFS_size)
+ + sizeof(struct extent *) + 3*sizeof(int) ) * comcnt;
- /* first one */
- offset = (unsigned long)pos & (NCAC_dev.extsize -1); /* within extent */
- cbufoff[0] =(char*) offset; /* add the extent address later */
- cbufsize[0] = NCAC_dev.extsize - offset;
- len += cbufsize[0];
-
- /* middle ones */
- for (seg = 1; seg < bufcnt - 1; seg ++ ) {
- /* add the extent address later */
- cbufoff[seg] = 0;
- cbufsize[seg] = NCAC_dev.extsize;
- len += cbufsize[seg];
+ ncac_req->foff =(PVFS_offset*) malloc(allocsize);
+
+ if ( ncac_req->foff == NULL ) {
+ ncac_req->error = -ENOMEM;
+
+ free(fregions);
+
+ return -ENOMEM;
+ }
+
+ ncac_req->cbufoff =(char**) & ncac_req->foff[comcnt];
+ ncac_req->cbufsize =(PVFS_size*) &ncac_req->cbufoff[comcnt];
+ ncac_req->cbufhash =(struct extent**)
+ &ncac_req->cbufsize[comcnt];
+ ncac_req->cbufflag =(int*) &ncac_req->cbufhash[comcnt];
+ ncac_req->cbufrcnt =(int*) &ncac_req->cbufflag[comcnt];
+ ncac_req->cbufwcnt =(int*) &ncac_req->cbufrcnt[comcnt];
+
+ ncac_req->reserved_cbufcnt = comcnt;
+
+ memset(ncac_req->foff, 0, allocsize);
}
+
+ foff = ncac_req->foff;
+ cbufoff = ncac_req->cbufoff;
+ cbufsize = ncac_req->cbufsize;
+ cbufflag = ncac_req->cbufflag;
+
+ /* How many different extents are needed? Put them in an
+ * ordered manner to be friendly to the underlying I/O system.
+ * What are communication buffers used for the upper layer?
+ * (offset to the related extent, size).
+ */
+ /* quick sort the list of file regions. If the upper layer
+ * can present the file regions in an ordered manner, we can
+ * eliminate this sorting.
+ */
+ qsort(fregions, ncac_req->offcnt, sizeof(struct freg_tuple), (void*)comp_pos);
+
+#if 1
+ for (i=0; i<ncac_req->offcnt; i++){
+ fprintf(stderr, "fpos:%Ld, size:%Ld\n", fregions[i].fpos,
+fregions[i].size);
+ }
+#endif
+
+ comcnt = 0;
+ for ( i =0; i <ncac_req->offcnt; i++){
+ cnt = (fregions[i].fpos+fregions[i].size+NCAC_dev.extsize-1)/
+ NCAC_dev.extsize - fregions[i].fpos/NCAC_dev.extsize;
+
+ firstoff=(unsigned long)(fregions[i].fpos & (NCAC_dev.extsize -1));
+
+ foff[comcnt] = fregions[i].fpos - firstoff;
+ cbufoff[comcnt] = (char*)firstoff;
+ cbufsize[comcnt] = NCAC_dev.extsize - firstoff;
+ cbufflag[comcnt] = NCAC_COMM_NOT_READY;
+
+ for ( j= 1; j < cnt; j++){
+ foff[comcnt+j] = foff[comcnt+j-1] + NCAC_dev.extsize;
+ cbufoff[comcnt+j] = 0;
+ cbufsize[comcnt+j] = NCAC_dev.extsize;
+ cbufflag[comcnt+j] = NCAC_COMM_NOT_READY;
+ }
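+ /* adjust the size of the last buffer in each segment. NOTE(review): this subtracts (end % extsize), not the unused tail of the extent; for region (1024, 32768) it yields 31744 instead of 1024 -- verify. */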
+ cbufsize[comcnt+cnt-1] -= (fregions[i].fpos+fregions[i].size) %
+ NCAC_dev.extsize;
- /* last ones */
- if ( bufcnt > 1 ){
- cbufoff[bufcnt-1] = 0;
- cbufsize[bufcnt-1] = size - len;
+ comcnt += cnt;
}
+ /* so far, in ncac_req->foff, some entries may refer to the same
+ * extent, but such duplicates are consecutive.
+ */
+
+ free(fregions);
+
+ ncac_req->cbufcnt = comcnt;
- return bufcnt;
+#if 1
+ fprintf(stderr, "[%s] exit %d comm buffers\n", __FUNCTION__, comcnt);
+ for (i=0; i<comcnt; i++){
+ fprintf(stderr, "fpos:%Ld, buf_off:%ld, size:%Ld\n", foff[i],
+(unsigned long)cbufoff[i], cbufsize[i]);
+ }
+#endif
+
+ return 0;
}
/* NCAC_do_jobs(): this is the workhorse of NCAC.
@@ -374,33 +528,35 @@ static inline int NCAC_rwjob_prepare_one
*
*/
-int NCAC_do_jobs(struct list_head *prep_list, struct list_head *bufcomp_list, struct list_head *comp_list, NCAC_lock *lock)
+int NCAC_do_jobs(struct list_head *prep_list, struct list_head *bufcomp_list,
+ struct list_head *comp_list, NCAC_lock *lock)
{
- int ret;
- NCAC_req_t *ncac_req;
-
+ int ret;
+ NCAC_req_t *ncac_req;
+
dojob:
- /* read a request from the prep_list job. When a job is read out
- * (NOT taken from the list), there is a flag to inidcate that
- * someone else has read this request out. So get_request_from_list
- * is always return a request which is not read out by others
- */
-
- NCAC_read_request_from_list_lock(prep_list, lock, &ncac_req);
- if (ncac_req) {
- ret = NCAC_do_a_job(ncac_req, prep_list, bufcomp_list, comp_list, lock);
+ /* read a request from the prep_list job. When a job is read out
+ * (NOT taken from the list), there is a flag to indicate that
+ * someone else has read this request out. So get_request_from_list
+ * always returns a request which is not read out by others
+ */
- ncac_req->read_out = 0;
- if ( ret < 0 )
- return ret;
+ NCAC_read_request_from_list_lock(prep_list, lock, &ncac_req);
- if ( ncac_req->status == NCAC_BUFFER_COMPLETE ||
- ncac_req->status == NCAC_COMPLETE )
- goto dojob;
- }
+ if (ncac_req) {
+ ret = NCAC_do_a_job(ncac_req, prep_list, bufcomp_list, comp_list, lock);
- return 0;
+ ncac_req->read_out = 0;
+ if ( ret < 0 )
+ return ret;
+
+ if ( ncac_req->status == NCAC_BUFFER_COMPLETE ||
+ ncac_req->status == NCAC_COMPLETE )
+ goto dojob;
+ }
+
+ return 0;
}
@@ -409,24 +565,28 @@ dojob:
* is called. All horseworkers are implemented in "ncac_job.c".
*/
-int NCAC_do_a_job(NCAC_req_t *ncac_req, struct list_head *prep_list, struct list_head *bufcomp_list, struct list_head *comp_list, NCAC_lock *lock)
+int NCAC_do_a_job(NCAC_req_t *ncac_req, struct list_head *prep_list,
+ struct list_head *bufcomp_list,
+ struct list_head *comp_list, NCAC_lock *lock)
{
- int ret;
-
- switch (ncac_req->optype){
+ int ret;
+
+ fprintf(stderr, "NCAC_do_a_job enter\n");
+
+ switch (ncac_req->optype){
- /* cached read */
+ /* cached read */
case NCAC_READ:
ret = NCAC_do_a_read_job(ncac_req);
break;
- /* cached write */
+ /* cached write */
case NCAC_WRITE:
ret = NCAC_do_a_write_job(ncac_req);
break;
-
+
/* cached buffer read */
case NCAC_BUF_READ:
@@ -438,7 +598,7 @@ int NCAC_do_a_job(NCAC_req_t *ncac_req,
ret = NCAC_do_a_bufwrite_job(ncac_req);
break;
-
+
case NCAC_QUERY:
ret = NCAC_do_a_query_job(ncac_req);
break;
@@ -451,52 +611,53 @@ int NCAC_do_a_job(NCAC_req_t *ncac_req,
ret = NCAC_do_a_sync_job(ncac_req);
break;
- default:
+ default:
ret = NCAC_JOB_OPTYPE_ERR;
- fprintf(stderr, "NCAC_do_a_job: unrecognize optype flag\n");
+ fprintf(stderr, "NCAC_do_a_job: unrecognize optype flag\n");
break;
}
- if ( ncac_req->status == NCAC_BUFFER_COMPLETE ) {
+ if ( ncac_req->status == NCAC_BUFFER_COMPLETE ) {
- NCAC_list_del_lock(&ncac_req->list, lock);
+ NCAC_list_del_lock(&ncac_req->list, lock);
- NCAC_list_add_tail_lock(&ncac_req->list, bufcomp_list, lock);
+ NCAC_list_add_tail_lock(&ncac_req->list, bufcomp_list, lock);
- }else if ( ncac_req->status == NCAC_COMPLETE )
- {
- NCAC_list_del_lock(&ncac_req->list, lock);
- NCAC_list_add_tail_lock(&ncac_req->list, comp_list, lock);
- }
+ }else if ( ncac_req->status == NCAC_COMPLETE )
+ {
+ NCAC_list_del_lock(&ncac_req->list, lock);
+ NCAC_list_add_tail_lock(&ncac_req->list, comp_list, lock);
+ }
+ fprintf(stderr, "NCAC_do_a_job exit\n");
return ret;
}
int NCAC_check_request( int id, struct NCAC_req **ncac_req )
{
- struct NCAC_req *req;
- int ret;
+ struct NCAC_req *req;
+ int ret;
- req = &NCAC_dev.free_req_src[id];
- if ( req->status == NCAC_COMPLETE || req->status == NCAC_BUFFER_COMPLETE ) {
- *ncac_req = req;
- return 0;
- }
+ req = &NCAC_dev.free_req_src[id];
+ if ( req->status == NCAC_COMPLETE || req->status == NCAC_BUFFER_COMPLETE ) {
+ *ncac_req = req;
+ return 0;
+ }
if ( req->status == NCAC_REQ_UNUSED ) {
- *ncac_req = NULL;
+ *ncac_req = NULL;
NCAC_error("NCAC_check_request:no such request");
- return -1;
+ return -1;
}
- ret = NCAC_do_a_job(req, &(NCAC_dev.prepare_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
+ ret = NCAC_do_a_job(req, &(NCAC_dev.prepare_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
- if ( ret < 0 ) {
+ if ( ret < 0 ) {
NCAC_error("NCAC_check_request:do a job error (%d)", req->error);
- }
- *ncac_req = req;
- return ret;
+ }
+ *ncac_req = req;
+ return ret;
}
/* done request(): mark a request is done. Several cases:
@@ -517,23 +678,23 @@ int NCAC_check_request( int id, struct N
int NCAC_done_request( int id )
{
- struct NCAC_req *ncac_req;
- int ret = 0;
+ struct NCAC_req *ncac_req;
+ int ret = 0;
- ncac_req = &NCAC_dev.free_req_src[id];
+ ncac_req = &NCAC_dev.free_req_src[id];
switch ( ncac_req->status ) {
- case NCAC_BUFFER_COMPLETE: /* pending communication is done */
-
- NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
+ case NCAC_BUFFER_COMPLETE: /* pending communication is done */
+
+ NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
ret = NCAC_extent_done_access( ncac_req );
-
+
break;
case NCAC_COMPLETE:
- NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
+ NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
break;
default: /* error. leaking here. */
@@ -542,65 +703,100 @@ int NCAC_done_request( int id )
return ret;
}
-
/* prepare to return this request to the free list.
* We cannot just zero the ncac_req for all cases.
* We want to reuse buffer inforation arrays to avoid
* allcations. */
-
+
if ( ncac_req->reserved_cbufcnt == 0 ) {
id = ncac_req->id;
memset( ncac_req, 0, sizeof(struct NCAC_req) );
ncac_req->id = id;
}else{ /* we want reuse buffer information arrays */
- ncac_req->cbufcnt = 0;
- ncac_req->mapping = 0;
+ ncac_req->cbufcnt = 0;
+ ncac_req->mapping = 0;
ncac_req->ioreq = INVAL_IOREQ;
- ncac_req->read_out = 0;
+ ncac_req->read_out = 0;
}
- ncac_req->status = NCAC_REQ_UNUSED;
- NCAC_list_add_tail_lock( &ncac_req->list, &NCAC_dev.free_req_list, &NCAC_dev.req_list_lock);
+ ncac_req->status = NCAC_REQ_UNUSED;
+
+ NCAC_list_add_tail_lock( &ncac_req->list, &NCAC_dev.free_req_list, &NCAC_dev.req_list_lock);
return ret;
}
+static inline struct inode *search_inode_list (PVFS_handle handle)
+{
+ int inode_index;
+ struct inode * cur;
+
+ inode_index = handle % MAX_INODE_NUM;
+
+ cur = inode_arr[inode_index];
+ while ( NULL != cur ) {
+ if ( cur->handle == handle ) return cur;
+ cur = cur->next;
+ }
+
+ return NULL;
+}
+
+/* get_inode: given a fs_id and a file handle, return the matching
+ * inode-like structure, allocating it if it is not found. Since the handle
+ * is an arbitrary number, it is mapped to an index into inode_arr.
+ * get_inode should be called under some lock because two callers may
+ * work on the same collision list.
+ */
static inline struct inode *get_inode(PVFS_fs_id coll_id,
- PVFS_handle handle, PVFS_context_id context_id)
+ PVFS_handle handle, PVFS_context_id context_id)
{
- struct inode *inode;
+ struct inode *inode;
+ int inode_index;
+
+ inode_index = handle % MAX_INODE_NUM;
+
+ /* search the inode list with the index of "inode_index" */
+ inode = search_inode_list (handle);
- if ( !inode_arr[handle] ) {
- inode=(struct inode*)malloc(sizeof(struct inode));
+ fprintf(stderr, "handle: %Ld, index: %d, inode:%p\n", handle, inode_index, inode);
- /* initialize it */
- memset(inode, 0, sizeof(struct inode));
+ if ( NULL == inode ){
+ inode=(struct inode*)malloc(sizeof(struct inode));
- inode->cache_stack = get_cache_stack();
+ /* initialize it */
+ memset(inode, 0, sizeof(struct inode));
- init_single_radix_tree(&inode->page_tree, NCAC_dev.get_value, NCAC_dev.max_b);
+ inode->cache_stack = get_cache_stack();
+ inode->nrpages = 0;
+ inode->nr_dirty = 0;
+ inode->coll_id = coll_id;
+ inode->handle = handle;
+ inode->context_id = context_id;
- inode_arr[handle] = inode;
- inode_arr[handle]->nrpages = 0;
- inode_arr[handle]->nr_dirty = 0;
- inode_arr[handle]->coll_id = coll_id;
- inode_arr[handle]->handle = handle;
- inode_arr[handle]->context_id = context_id;
+ init_single_radix_tree(&inode->page_tree, NCAC_dev.get_value, NCAC_dev.max_b);
spin_lock_init(&inode->lock);
- INIT_LIST_HEAD(&(inode->clean_pages));
- INIT_LIST_HEAD(&(inode->dirty_pages));
- }
- return inode_arr[handle];
+ INIT_LIST_HEAD(&(inode->clean_pages));
+ INIT_LIST_HEAD(&(inode->dirty_pages));
+
+ /* put the new inode to the head of the collision list */
+ inode->next = inode_arr[inode_index];
+ inode_arr[inode_index] = inode;
+ }
+
+ return inode;
}
+
+
static inline void extent_dump(struct extent *extent)
{
fprintf(stderr, "flags:%x\t status:%d\t index:%d\t\n", (int)extent->flags, extent->status, (int)extent->index);
- fprintf(stderr, "writes:%d\t reads:%d\t ioreq:%d\t\n", extent->writes, extent->reads, extent->ioreq);
+ fprintf(stderr, "writes:%d\t reads:%d\t ioreq:%Ld\t\n", extent->writes, extent->reads, extent->ioreq);
}
Index: internal.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/internal.h,v
diff -p -u -r1.2 -r1.3
--- internal.h 23 Mar 2004 04:07:15 -0000 1.2
+++ internal.h 21 Sep 2004 13:46:12 -0000 1.3
@@ -82,18 +82,19 @@ typedef struct NCAC_dev NCAC_dev_t;
struct NCAC_req{
- int id;
- int optype;
- int status;
- int error;
+ int id;
+ int optype;
+ int status;
+ int error;
PVFS_fs_id coll_id;
PVFS_handle handle;
PVFS_context_id context_id;
- PVFS_size usrlen;
- PVFS_size written;
- char *usrbuf;
+ PVFS_size usrlen;
+ PVFS_size written;
+ char *usrbuf;
+ PVFS_offset *foff;
char ** cbufoff;
PVFS_size *cbufsize;
int *cbufflag;
@@ -114,7 +115,7 @@ struct NCAC_req{
struct inode *mapping;
struct aiovec *aiovec;
- int ioreq;
+ PVFS_id_gen_t ioreq;
int read_out;
struct list_head list;
@@ -137,6 +138,8 @@ typedef struct NCAC_req NCAC_req_t;
/* this is an inode-like structure for each
* object <coll_id, handle>
*/
+#define MAX_INODE_NUM 10000
+
struct inode
{
NCAC_lock lock;
@@ -154,8 +157,11 @@ struct inode
struct aiovec aiovec;
struct cache_stack *cache_stack;
+ struct inode *next;
};
+extern struct inode *inode_arr[MAX_INODE_NUM];
+
struct extent {
unsigned long flags;
@@ -175,16 +181,21 @@ struct extent {
struct extent *next;
struct inode *mapping;
- int ioreq;
- struct extent *ioreq_next;
+ PVFS_id_gen_t ioreq;
+ /* for optimization. We can initiate one trove request for
+ * a list of extents. For doing that, all extents will share
+ * the same ioreq. If the ioreq is done, we follow ioreq_next
+ * to mark all other extents.
+ */
+ struct extent *ioreq_next;
};
-
#define MAX_DELT_REQ_NUM 10000
+
#define NCAC_OK 0
#define NCAC_REQ_BUILD_ERR -1000
#define NCAC_SUBMIT_ERR -1001
@@ -232,8 +243,15 @@ int NCAC_rwjob_prepare(NCAC_req_t *ncac_
int NCAC_do_jobs(struct list_head *list, struct list_head *bufcomp_list, struct list_head * comp_list, NCAC_lock *lock);
int NCAC_do_a_job(NCAC_req_t *req, struct list_head *list, struct list_head *bufcomp_list, struct list_head * comp_list, NCAC_lock *lock);
+#define NCAC_COMM_NOT_READY 0
+#define NCAC_READ_PREPARE 1
+#define NCAC_READING 2
+#define NCAC_READ_READY 3
+
int NCAC_do_one_piece_read(NCAC_req_t *ncac_req, PVFS_offset pos,
- PVFS_size size, char **cbufoff,
+ PVFS_size size,
+ PVFS_offset *foff,
+ char **cbufoff,
PVFS_size *cbufsize, struct extent *cbufhash[],
int *cbufflag, int *cbufrcnt, int *cbufwcnt, int *cnt);
Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/module.mk.in,v
diff -p -u -r1.1 -r1.2
--- module.mk.in 21 Aug 2003 18:57:27 -0000 1.1
+++ module.mk.in 21 Sep 2004 13:46:12 -0000 1.2
@@ -1,11 +1,13 @@
DIR := src/io/buffer
SERVERSRC += \
- $(DIR)/ncac-interface.c \
- $(DIR)/ncac-trove.c \
- $(DIR)/ncac-job.c \
- $(DIR)/ncac-buf-job.c \
- $(DIR)/ncac-init.c \
- $(DIR)/internal.c \
- $(DIR)/cache.c \
- $(DIR)/state.c \
- $(DIR)/radix.c
+ $(DIR)/ncac-interface.c \
+ $(DIR)/ncac-trove.c \
+ $(DIR)/ncac-job.c \
+ $(DIR)/ncac-buf-job.c \
+ $(DIR)/ncac-init.c \
+ $(DIR)/internal.c \
+ $(DIR)/cache.c \
+ $(DIR)/ncac-lru.c \
+ $(DIR)/state.c \
+ $(DIR)/radix.c
+
Index: ncac-buf-job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-buf-job.c,v
diff -p -u -r1.1 -r1.2
--- ncac-buf-job.c 21 Aug 2003 18:57:27 -0000 1.1
+++ ncac-buf-job.c 21 Sep 2004 13:46:12 -0000 1.2
@@ -17,6 +17,8 @@
int NCAC_do_a_bufread_job(struct NCAC_req *ncac_req)
{
+
+#if 0
int ret;
int seg, cnt;
int rcomm=0;
@@ -28,7 +30,9 @@ int NCAC_do_a_bufread_job(struct NCAC_re
/* only one contiguous segment */
if ( !ncac_req->offcnt ) {
ret = NCAC_do_one_piece_read( ncac_req, ncac_req->pos,
- ncac_req->size, ncac_req->cbufoff,
+ ncac_req->size,
+ ncac_req->foff,
+ ncac_req->cbufoff,
ncac_req->cbufsize, ncac_req->cbufhash,
ncac_req->cbufflag,
ncac_req->cbufrcnt,
@@ -47,6 +51,7 @@ int NCAC_do_a_bufread_job(struct NCAC_re
for (seg = 0; seg < ncac_req->offcnt; seg ++) {
ret = NCAC_do_one_piece_read( ncac_req, ncac_req->offvec[seg],
ncac_req->sizevec[seg],
+ ncac_req->foff + cnt,
ncac_req->cbufoff + cnt,
ncac_req->cbufsize + cnt,
ncac_req->cbufhash + cnt,
@@ -95,6 +100,7 @@ int NCAC_do_a_bufread_job(struct NCAC_re
}
}
+#endif
return 0;
@@ -103,6 +109,7 @@ int NCAC_do_a_bufread_job(struct NCAC_re
int NCAC_do_a_bufwrite_job(struct NCAC_req *ncac_req)
{
+#if 0
int ret;
int seg, cnt;
int rcomm=0;
@@ -181,6 +188,7 @@ int NCAC_do_a_bufwrite_job(struct NCAC_r
}
}
+#endif
return 0;
}
Index: ncac-init.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-init.c,v
diff -p -u -r1.3 -r1.4
--- ncac-init.c 23 Mar 2004 04:07:15 -0000 1.3
+++ ncac-init.c 21 Sep 2004 13:46:13 -0000 1.4
@@ -15,7 +15,7 @@ extern int posix_memalign(void **memptr,
/* global variable */
NCAC_dev_t NCAC_dev;
-struct inode *inode_arr[1000];
+struct inode *inode_arr[MAX_INODE_NUM];
static inline void init_free_extent_list(int num);
static inline void init_free_req_list(int num);
@@ -76,7 +76,7 @@ int cache_init(NCAC_info_t *info)
INIT_LIST_HEAD( &NCAC_dev.comp_list);
- memset( inode_arr, 0, sizeof(struct inode*)*1000 );
+ memset( inode_arr, 0, sizeof(struct inode*)*MAX_INODE_NUM );
NCAC_dev.get_value = radix_get_value;
NCAC_dev.max_b = RADIX_MAX_BITS;
Index: ncac-interface.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-interface.c,v
diff -p -u -r1.1 -r1.2
--- ncac-interface.c 21 Aug 2003 18:57:27 -0000 1.1
+++ ncac-interface.c 21 Sep 2004 13:46:13 -0000 1.2
@@ -222,9 +222,7 @@ int cache_req_test(cache_request_t *requ
else
*flag = 0;
-
return 0;
-
}
int cache_req_testsome(int count,
Index: ncac-job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-job.c,v
diff -p -u -r1.2 -r1.3
--- ncac-job.c 17 Nov 2003 19:19:29 -0000 1.2
+++ ncac-job.c 21 Sep 2004 13:46:13 -0000 1.3
@@ -1,4 +1,4 @@
-/* This file defines the horseworker for each particular type of jobs. */
+/* This file defines the horseworkers for each particular type of jobs. */
#include <stdio.h>
#include <stdlib.h>
@@ -12,347 +12,141 @@
#include "ncac-trove.h"
extern struct cache_stack global_cache_stack;
-extern struct inode *inode_arr[1000];
/* internal functions */
-static inline struct extent * NCAC_find_get_ext(NCAC_req_t *ncac_req, unsigned long index);
-static inline struct extent * NCAC_alloc_ext(NCAC_req_t *ncac_req);
-static inline struct extent * NCAC_alloc_ext_wait(NCAC_req_t *ncac_req);
-static inline int NCAC_add_to_cache(struct extent * extent,unsigned long index, NCAC_req_t *ncac_req);
-
+static inline struct extent *find_extent(NCAC_req_t *ncac_req,
+ unsigned long index);
+static inline struct extent *allocate_extent(NCAC_req_t *ncac_req,
+ int flag);
+static inline int free_extent(NCAC_req_t *ncac_req,
+ struct extent *extent);
+static inline int init_extent_read(NCAC_req_t *ncac_req,
+ struct extent *extent, PVFS_offset foffset, PVFS_size size);
+static inline void set_extent_read_pending(struct extent *extent);
+static inline int check_extent_read(NCAC_req_t *ncac_req, struct extent *extent);
+static inline void increase_read_reference(struct extent *extent);
+static inline int add_extent_to_cache(struct extent * extent,
+ unsigned long index, NCAC_req_t *ncac_req, int policy);
/* do a read job.
- * return: <0 error code
+ * return: < 0 error code
* 0: ok
* ncac_req->status shows the current status of the job
* ncac_req->error shows the current error of the job if any.
*
* Lock stuff: A design choice has been made to do locks as follows:
* 1) each inode has a lock;
- * 2) each cache stack has a lock (many inodes may share a same cache stack).
- * To avoid lock calls on each extent, we had a sort of "big" lock across jobs on an inode.
- * During a job processing, if the cache stack is touched, the job should acquire the cache
- * stack lock. So the lock order is:
- * inode lock
+ * 2) each cache stack has a lock (many inodes may share a same
+ * cache stack).
+ * To avoid lock calls on each extent, we had a sort of "big" lock
+ * across jobs on an inode. During a job processing, if the cache stack
+ * is touched, the job should acquire the cache stack lock. So the lock
+ * order is:
+ * inode lock
* ----> cache stack lock
* ----> release cache stack lock
- * release inode lock
+ * release inode lock
*
- * So, we make a tradeoff between the number of lock calls and the granularity of lock.
+ * So, we make a tradeoff between the number of lock calls and the
+ * granularity of lock.
*/
int NCAC_do_a_read_job(struct NCAC_req *ncac_req)
{
- int ret;
- int seg, cnt;
- int rcomm=0;
-
- inode_lock(&ncac_req->mapping->lock);
-
- /* only one contiguous segment */
- if ( !ncac_req->offcnt ) {
- ret = NCAC_do_one_piece_read( ncac_req, ncac_req->pos,
- ncac_req->size, ncac_req->cbufoff,
- ncac_req->cbufsize, ncac_req->cbufhash,
- ncac_req->cbufflag,
- ncac_req->cbufrcnt,
- ncac_req->cbufwcnt,
- &cnt);
- if ( ret < 0) {
- ncac_req->error = NCAC_JOB_PROCESS_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
-
- inode_unlock( &ncac_req->mapping->lock );
-
- return ret;
- }
- }else{
-
- /* Handle each contiguous piece one by one. */
-
- cnt = 0;
- for (seg = 0; seg < ncac_req->offcnt; seg ++) {
- ret = NCAC_do_one_piece_read( ncac_req, ncac_req->offvec[seg],
- ncac_req->sizevec[seg],
- ncac_req->cbufoff + cnt,
- ncac_req->cbufsize + cnt,
- ncac_req->cbufhash + cnt,
- ncac_req->cbufflag + cnt,
- ncac_req->cbufrcnt + cnt,
- ncac_req->cbufwcnt + cnt, &seg);
- if ( ret < 0) {
- ncac_req->error = NCAC_JOB_PROCESS_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
-
- inode_unlock( &ncac_req->mapping->lock );
-
- return ret;
- }
- cnt += seg;
- }
- }
+ int ret;
+ struct extent **cbufhash;
+ PVFS_offset *foff;
+ int *cbufflag;
+ struct extent *new_extent;
+ struct extent *last_extent;
- inode_unlock(&ncac_req->mapping->lock);
-
- for (seg = 0; seg < ncac_req->cbufcnt; seg ++)
- if (ncac_req->cbufflag[seg] == 1) rcomm++;
-
- if (rcomm == ncac_req->cbufcnt) ncac_req->status = NCAC_BUFFER_COMPLETE;
- else if (!rcomm) ncac_req->status = NCAC_REQ_SUBMITTED;
- else ncac_req->status = NCAC_PARTIAL_PROCESS;
-
- return 0;
-}
-
-/* NCAC_do_one_piece_read(): handle one contiguous block.
- * return:
- * < 0: error
- * = 0: no error
- * at the same time, ncac_req->error shows error no if any.
- * ncac_req->status shows the status of this one piece.
- *
- * TODO: 1) use gang lookup;
- * 2) allocate contiguous extents from a bigger buffer
- */
-
-int NCAC_do_one_piece_read(NCAC_req_t *ncac_req, PVFS_offset pos,
- PVFS_size size, char **cbufoff,
- PVFS_size *cbufsize, struct extent *cbufhash[],
- int *cbufflag,
- int *cbufrcnt,
- int *cbufwcnt,
- int *cnt)
-{
unsigned long index;
- unsigned int offset = 0, nr = 0;
- struct extent *cached_ext;
- struct extent *extent;
- int error;
- int ret;
-
- struct aiovec aiovec_arr, *aiovec;
- int ioreq;
-
- int cbufcnt;
- int toread=0;
- int slots;
- int i, j;
-
- PVFS_offset oldpos = pos;
-
-
- aiovec = &aiovec_arr;
- aiovec_init(aiovec);
-
- cbufcnt = (pos+size+ NCAC_dev.extsize -1)/NCAC_dev.extsize - pos/NCAC_dev.extsize;
- *cnt = cbufcnt;
-
- cached_ext = NULL;
- index = pos >> NCAC_dev.extlog2;
+ int comcnt, readcnt;
+ int i;
- DPRINT("one_piece_read: pos=%Ld, sindex=%ld cnt=%d\n", pos, index, cbufcnt);
- for (i=0; i< cbufcnt; i++) {
-
- if ( cbufhash[i] ) {
- DPRINT("Read recheck: cbufrcnt[%d]=%d, cbufwcnt[%d]=%d, e.rcmp=%d, e.wcmp=%d, extent flags=%lx (cbufflag=%d)\n", i, cbufrcnt[i], i, cbufwcnt[i], cbufhash[i]->rcmp, cbufhash[i]->wcmp, cbufhash[i]->flags, cbufflag[i]);
-
-
- /* still previous writes pending on this */
- if ( cbufwcnt[i] > cbufhash[i]->wcmp ) {
- index ++;
- pos += nr;
- continue;
- }
-
- if ( cbufwcnt[i] < cbufhash[i]->wcmp ) {
- NCAC_error("Error: wcnt should not be less than cmp\n");
- index ++;
- pos += nr;
- continue;
+ /* even though there are "comcnt" communication buffers, the
+ * number of extents needed may be smaller.
+ */
+ comcnt = ncac_req->cbufcnt;
+ fprintf(stderr, "NCAC_do_a_read_job: enter (comcnt=%d)\n", comcnt);
+
+ cbufhash = ncac_req->cbufhash;
+ foff = ncac_req->foff;
+ cbufflag = ncac_req->cbufflag;
+
+ inode_lock (&ncac_req->mapping->lock);
+
+ last_extent = NULL;
+ for (i=0; i<comcnt; i++){
+ if ( NULL == cbufhash[i] ){
+ index = foff[i] >> NCAC_dev.extlog2;
+ new_extent = find_extent(ncac_req, index);
+ if ( NULL == new_extent ){ /* not cached */
+ new_extent= allocate_extent(ncac_req,BLOCKING_EXTENT_ALLOC);
+ if ( new_extent ){
+ new_extent->index = index;
+ new_extent->mapping = ncac_req->mapping;
+ new_extent->ioreq = INVAL_IOREQ;
+
+ ret = init_extent_read(ncac_req, new_extent,
+ foff[i], NCAC_dev.extsize);
+ if ( ret < 0 ) {
+ NCAC_error("init_extent_read error ext:%p\n",
+ new_extent);
+
+ free_extent(ncac_req, new_extent);
+ inode_unlock (&ncac_req->mapping->lock);
+ return ret;
+ }
+ add_extent_to_cache(new_extent, index, ncac_req,
+ LRU_POLICY);
+ set_extent_read_pending(new_extent);
+ cbufhash[i] = new_extent;
+ }
+ }else{ /* cached */
+ cbufhash[i] = new_extent;
+ hit_cache_item(new_extent, LRU_POLICY);
}
- extent = cbufhash[i];
- offset = cbufoff[i] - extent->addr;
- nr = cbufsize[i];
-
- DPRINT("recheck: offset=%p, nr=%d extent=%p\n", cbufoff[i], nr, extent);
- error = NCAC_extent_read_access_recheck(ncac_req, extent, offset, nr);
- if (error < 0){
- ncac_req->error = error;
- NCAC_error("NCAC_extent_read_access_recheck error extent=%p\n", extent);
- return error;
- }
- cbufflag[i] = error;
-
- DPRINT("Read recheck: cbufrcnt[%d]=%d, cbufwcnt[%d]=%d, e.rcmp=%d, e.wcmp=%d, extent flags=%lx (cbufflag=%d)\n", i, cbufrcnt[i], i, cbufwcnt[i], cbufhash[i]->rcmp, cbufhash[i]->wcmp, cbufhash[i]->flags, cbufflag[i]);
-
- index ++;
- pos += nr;
- continue;
- }
-
-
- offset = (unsigned long)pos & (NCAC_dev.extsize -1);
- nr = cbufsize[i];
-
- /* try to find an cached extent. If cached, the reference count is
- * added.
- */
- extent = NCAC_find_get_ext(ncac_req, index);
-
- if (extent == NULL) {
- goto no_cached_extent;
+ /* only one reference for each request */
+ if ( cbufhash[i] && cbufhash[i] != last_extent )
+ increase_read_reference(cbufhash[i]);
+ last_extent = cbufhash[i];
}
-
- /* the extent is cached */
- error = NCAC_extent_read_access(ncac_req, extent, offset, nr);
- if (error < 0){
- extent_ref_release( extent );
- ncac_req->error = error;
- return error;
- }
-
- DPRINT("index=%ld is cached: extent flags:%lx reads=%d, writes=%d, rcmp=%d, wcmp=%d\n", index, extent->flags, extent->reads, extent->writes, extent->rcmp, extent->wcmp);
-
- cbufflag[i] = error; /* maybe ready, maybe not */
- cbufhash[i] = extent;
-
- cbufrcnt[i] = extent->reads;
- cbufwcnt[i] = extent->writes;
-
- cbufoff[i] += (unsigned long)extent->addr;
-
- /* prepare for the next extent */
- index += 1;
- pos += nr;
-
- continue; /* continue for the next extent */
-
-no_cached_extent:
- /* the extent was not cached. we need to create a new extent. */
-
- if (!cached_ext) {
- cached_ext = NCAC_alloc_ext_wait(ncac_req);
- if (cached_ext) {
- NCAC_extent_first_read_access(ncac_req, cached_ext);
- cached_ext->index = index;
- cached_ext->mapping = ncac_req->mapping;
- }
+ if ( cbufhash[i] ){
+ ret = 1;
+ if ( PageReadPending(cbufhash[i]) ){
+ fprintf(stderr, "extent:%p ioreq:%Ld\n", cbufhash[i], cbufhash[i]->ioreq);
+ ret = check_extent_read(ncac_req, cbufhash[i]);
+ if (ret < 0){
+ ncac_req->error = ret;
+ NCAC_error("check_read_pending extent=%p\n", cbufhash[i]);
+
+ inode_unlock (&ncac_req->mapping->lock);
+ return ret;
+ }
+ }
+ cbufflag[i] = ret;
}
-
- extent = cached_ext;
- cached_ext = NULL;
-
- cbufhash[i] = extent;
- if ( extent ){
- cbufoff[i] += (unsigned long)extent->addr;
- cbufflag[i] = 0; /* not ready for communication */
-
- cbufhash[i]->ioreq = INVAL_IOREQ;
-
- toread ++;
-
- cbufrcnt[i] = extent->reads;
- cbufwcnt[i] = extent->writes;
- }
-
- /* prepare for the next extent */
- index += 1;
- pos += nr;
}
- if ( !toread ) return 0;
-
- pos = oldpos;
- for (i = 0; i < cbufcnt; i++ ) {
-
- if ( cbufhash[i] && PageBlank(cbufhash[i]) ) {
-
- slots = aiovec_add(aiovec, cbufhash[i], pos, cbufsize[i], cbufoff[i], cbufsize[i]);
-
- DPRINT("do_a_job: going to read (%Ld %Ld) to %p\n", pos, cbufsize[i], cbufoff[i]);
- pos += cbufsize[i];
-
- if (!slots){
- ret = NCAC_aio_read_ext(ncac_req->coll_id, ncac_req->handle, ncac_req->context_id, aiovec, &ioreq);
- if ( ret < 0 ) {
- ncac_req->error = NCAC_TROVE_AIO_REQ_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
-
- NCAC_error("aio_read_ext error\n");
-
- aiovec_init(aiovec);
- return ret;
- }else{
- aiovec->extent_array[0]->ioreq = ioreq;
- extent = aiovec->extent_array[0];
- extent->ioreq_next = extent;
- for (j = 1; j < aiovec_count(aiovec); j ++) {
- aiovec->extent_array[j]->ioreq = ioreq;
-
- aiovec->extent_array[j-1]->ioreq_next = aiovec->extent_array[j];
- }
-
- aiovec->extent_array[aiovec_count(aiovec)-1]->ioreq_next = aiovec->extent_array[0];
- }
-
- DPRINT("do_a_job: aio_read cnt=%d\n", aiovec_count(aiovec));
- aiovec_init(aiovec);
- }
- }
- }
-
- DPRINT("do_one_piece_read: aio_read cbufcnt=%d, cnt=%d\n", cbufcnt, aiovec_count(aiovec));
-
- ioreq = INVAL_IOREQ;
-
- if (aiovec_count(aiovec)){
- ret = NCAC_aio_read_ext(ncac_req->coll_id, ncac_req->handle, ncac_req->context_id, aiovec, &ioreq);
- if ( ret < 0 ) {
- ncac_req->error = NCAC_TROVE_AIO_REQ_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
- aiovec_init(aiovec);
-
- NCAC_error("do_one_piece_read: NCAC_aio_read_ext error\n");
-
- return ret;
- }else{
- aiovec->extent_array[0]->ioreq = ioreq;
- extent = aiovec->extent_array[0];
- extent->ioreq_next = extent;
- for (i= 1; i < aiovec_count(aiovec); i ++) {
- aiovec->extent_array[i]->ioreq = ioreq;
-
- aiovec->extent_array[i-1]->ioreq_next = aiovec->extent_array[i];
- }
+ inode_unlock (&ncac_req->mapping->lock);
- aiovec->extent_array[aiovec_count(aiovec)-1]->ioreq_next = aiovec->extent_array[0];
- }
- aiovec_init(aiovec);
+ readcnt = 0;
+ for (i=0; i<comcnt; i++){
+ if (ncac_req->cbufflag[i]) readcnt++;
}
- /* add to cache */
- for (i = 0; i < cbufcnt; i++ ) {
- if ( cbufhash[i] && PageBlank(cbufhash[i]) ) {
- ClearPageBlank(cbufhash[i]);
- ret = NCAC_add_to_cache(cbufhash[i], cbufhash[i]->index, ncac_req);
-
- if ( ret < 0 ) {
- ncac_req->error = NCAC_CACHE_ERR;
-
- NCAC_error("do_one_piece_read: add_to_cache error: index=%ld\n", index);
-
- return ret;
- }
- }
- }
+ if (readcnt == ncac_req->cbufcnt) ncac_req->status = NCAC_BUFFER_COMPLETE;
+ else if (!readcnt) ncac_req->status = NCAC_REQ_SUBMITTED;
+ else ncac_req->status = NCAC_PARTIAL_PROCESS;
+ fprintf(stderr, "NCAC_do_a_read_job: exit\n");
return 0;
}
-
/* do a write job.
* return: <0 error code
* 0: ok
@@ -362,268 +156,11 @@ no_cached_extent:
int NCAC_do_a_write_job(struct NCAC_req *ncac_req)
{
- int ret;
- int seg, cnt;
- int rcomm=0;
-
- inode_lock(&ncac_req->mapping->lock);
-
- /* only one contiguous segment */
- if ( !ncac_req->offcnt ) {
- ret = NCAC_do_one_piece_write( ncac_req, ncac_req->pos,
- ncac_req->size,
- ncac_req->cbufoff, ncac_req->cbufsize,
- ncac_req->cbufhash, ncac_req->cbufflag,
- ncac_req->cbufrcnt,
- ncac_req->cbufwcnt,
- &cnt );
- if ( ret < 0) {
- ncac_req->error = NCAC_JOB_PROCESS_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
-
- inode_unlock(&ncac_req->mapping->lock);
-
- return ret;
- }
- }else{
-
- /* Handle each contiguous piece one by one. */
-
- cnt = 0;
- for (seg = 0; seg < ncac_req->offcnt; seg ++) {
- ret = NCAC_do_one_piece_write( ncac_req, ncac_req->offvec[seg],
- ncac_req->sizevec[seg],
- ncac_req->cbufoff + cnt,
- ncac_req->cbufsize + cnt,
- ncac_req->cbufhash + cnt,
- ncac_req->cbufflag + cnt,
- ncac_req->cbufrcnt + cnt,
- ncac_req->cbufwcnt + cnt,
- &seg );
- if ( ret < 0) {
- ncac_req->error = NCAC_JOB_PROCESS_ERR;
- ncac_req->status = NCAC_ERR_STATUS;
-
- inode_unlock(&ncac_req->mapping->lock);
-
- return ret;
- }
- cnt += seg;
- }
- }
-
- inode_unlock(&ncac_req->mapping->lock);
-
- for (seg = 0; seg < ncac_req->cbufcnt; seg ++)
- if (ncac_req->cbufflag[seg] == 1 ) rcomm++;
-
- if (rcomm == ncac_req->cbufcnt) ncac_req->status = NCAC_BUFFER_COMPLETE;
- else if (!rcomm) ncac_req->status = NCAC_REQ_SUBMITTED;
- else ncac_req->status = NCAC_PARTIAL_PROCESS;
-
return 0;
} /* end of do_a_write_job */
-/* NCAC_do_one_piece_write(): handle one contiguous block write.
- * return:
- * < 0: error
- * = 0: no error
- * at the same time, ncac_req->error shows error no if any.
- * ncac_req->status shows the status of this one piece.
- *
- * TODO: 1) use gang lookup;
- * 2) allocate contiguous extents from a bigger buffer
- */
-
-int NCAC_do_one_piece_write(NCAC_req_t *ncac_req, PVFS_offset pos,
- PVFS_size size, char **cbufoff,
- PVFS_size *cbufsize, struct extent *cbufhash[],
- int *cbufflag,
- int *cbufrcnt,
- int *cbufwcnt,
- int *cnt)
-{
- unsigned long index, offset, nr;
- struct extent *cached_ext;
- struct extent *extent;
- int error;
- int ret;
-
- struct aiovec *aiovec;
- int ioreq;
-
- int cbufcnt;
- int i;
-
- /* each inode has an aiovec */
- aiovec = get_aiovec(ncac_req);
- aiovec_init(aiovec);
-
- cbufcnt = (pos+size+ NCAC_dev.extsize -1)/NCAC_dev.extsize - pos/NCAC_dev.extsize;
- *cnt = cbufcnt;
-
- cached_ext = NULL;
- index = pos >> NCAC_dev.extlog2;
-
- for (i=0; i< cbufcnt; i++) {
- nr = cbufsize[i];
-
- if ( cbufhash[i] ) { /* extent is avaiable. */
-
- DPRINT("Write recheck: cbufrcnt[%d]=%d, cbufwcnt[%d]=%d, e.rcmp=%d, e.wcmp=%d, extent flags=%lx (cbufflag=%d)\n", i, cbufrcnt[i], i, cbufwcnt[i], cbufhash[i]->rcmp, cbufhash[i]->wcmp, cbufhash[i]->flags, cbufflag[i]);
-
- /* ugly here: 1: ok, 2: read-modify-write */
- if ( cbufflag[i] == 1 ) { /* has been assigned */
- index ++;
- pos += nr;
- continue;
- }
-
-
- /* Are previous reads and writes pending on this?
- * "+1" to exclude the request itself.
- */
- if ( cbufwcnt[i] > cbufhash[i]->wcmp + 1 ||
- cbufrcnt[i] > cbufhash[i]->rcmp ) {
- index ++;
- pos += nr;
- continue;
- }
-
- /* this is only for error check */
- if ( cbufwcnt[i] < cbufhash[i]->wcmp + 1 ||
- cbufrcnt[i] < cbufhash[i]->rcmp ) {
- NCAC_error("Error: r/wcnt should not be less than r/wcmp\n");
- index ++;
- pos += nr;
- continue;
- }
-
- /* no other pending read or writes on this extent */
- extent = cbufhash[i];
- offset = cbufoff[i] - extent->addr;
-
- error = NCAC_extent_write_access_recheck(ncac_req, extent, offset, nr);
- if (error < 0){
- ncac_req->error = error;
- fprintf(stderr, "NCAC_extent_read_access_recheck error extent=%p\n", extent);
- return error;
- }
- cbufflag[i] = error;
-
- DPRINT("Write recheck: cbufrcnt[%d]=%d, cbufwcnt[%d]=%d, e.rcmp=%d, e.wcmp=%d, extent flags=%lx (cbufflag=%d)\n", i, cbufrcnt[i], i, cbufwcnt[i], cbufhash[i]->rcmp, cbufhash[i]->wcmp, cbufhash[i]->flags, cbufflag[i]);
-
-
-
- index ++;
- pos += nr;
- continue;
- }
-
-
- offset = (unsigned long)pos & (NCAC_dev.extsize -1);
-
- extent = NCAC_find_get_ext(ncac_req, index);
-
- if (extent == NULL) {
- goto no_cached_extent;
- }
-
- /* the extent is cached */
- error = NCAC_extent_write_access(ncac_req, extent, offset, nr);
- if (error < 0){
- ncac_req->error = error;
- return error;
- }
-
- cbufflag[i] = error; /* 1 for ready, 0 for not ready */
- cbufhash[i] = extent;
-
-
- /* how many reads and writes pending on this extent before
- * this request */
- cbufrcnt[i] = extent->reads;
- cbufwcnt[i] = extent->writes;
-
- DPRINT("index=%ld is cached: extent flags:%lx cbufflag=%d, reads=%d, writes=%d, rcmp=%d, wcmp=%d\n", index, extent->flags, cbufflag[i], extent->reads, extent->writes, extent->rcmp, extent->wcmp);
-
- cbufoff[i] += (unsigned long)extent->addr;
-
- /* prepare for the next extent */
- index += 1;
- pos += nr;
-
- continue; /* continue for the next extent */
-
-no_cached_extent:
- /* the extent was not cached. we need to create a new extent. */
-
- if (!cached_ext) {
- cached_ext = NCAC_alloc_ext_wait(ncac_req);
- if (!cached_ext) {
- cbufflag[i] = 0;
- }else{
- NCAC_extent_first_write_access(ncac_req, cached_ext);
-
- cached_ext->index = index;
- cached_ext->mapping = ncac_req->mapping;
-
- cbufflag[i] = 1;
-
- cbufrcnt[i] = cached_ext->reads;
- cbufwcnt[i] = cached_ext->writes;
-
-
- /* deal with read, modify and write. In the case if the write size is
- * not the whole write unit, we should read it first, modify it, and
- * then write.
- */
- if ( cbufflag[i] && ( cbufoff[i] || cbufsize[i] < NCAC_dev.extsize )){
- DPRINT("--------do read-modify-write\n");
-
- do_read_for_rmw(ncac_req->coll_id,
- ncac_req->handle,
- ncac_req->context_id,
- cached_ext,
- pos,
- cbufoff[i],
- cbufsize[i],
- &ioreq);
- mark_extent_rmw_lock(cached_ext, ioreq);
- cbufflag[i] = 2;
- }
- cbufoff[i] += (unsigned long)cached_ext->addr;
-
- ret = NCAC_add_to_cache(cached_ext,index, ncac_req);
-
- if (ret) {
- cbufflag[i] = 0;
- cbufhash[i] = 0;
- ncac_req->error = NCAC_CACHE_ERR;
- return ret;
- }
- ncac_req->nr_dirty ++;
-
- DPRINT("index=%ld is NOT cached: extent flags:%lx reads=%d, writes=%d, rcmp=%d, wcmp=%d, cbufoff=%p, flag=%d, size=%Ld pos=%Ld\n", index, cached_ext->flags, cached_ext->reads, cached_ext->writes, cached_ext->rcmp, cached_ext->wcmp, cbufoff[i], cbufflag[i], cbufsize[i], pos);
- }
- }
-
- extent = cached_ext;
- cached_ext = NULL;
-
- cbufhash[i] = extent;
-
- /* prepare for the next extent */
- index += 1;
- pos += nr;
- }
-
- return 0;
-
-} /* end of do_one_piece_write */
-
int NCAC_do_a_query_job(struct NCAC_req *ncac_req)
{
NCAC_error("NCAC_do_a_query_job: not implemented yet\n");
@@ -642,73 +179,40 @@ int NCAC_do_a_sync_job(struct NCAC_req *
return 0;
}
-/* some internal functions */
-/* NCAC_find_get_ext(): try to find an extent from the inode cache tree.
- * This operation is protected by the inode lock. The caller should acquire
- * the inode lock.
+/*
+ * find_extent(): try to find an extent from the inode cache tree.
+ * This operation is protected by the inode lock. The caller should
+ * acquire the inode lock.
*/
-static inline struct extent * NCAC_find_get_ext(NCAC_req_t *ncac_req, unsigned long index)
+static inline struct extent *find_extent(NCAC_req_t *ncac_req,
+ unsigned long index)
{
struct extent *avail;
avail = lookup_cache_item(ncac_req->mapping, index);
-
-#if 0 /* take this back when we have finer lock */
- if ( avail ) { /* add its reference count to prevent disappearance */
- extent_ref_get( avail );
- }
-#endif
-
return avail;
-
}
-/* NCAC_alloc_ext(): get a new extent
- * The caller should have an inode lock
- */
-
-static inline struct extent * NCAC_alloc_ext(NCAC_req_t *ncac_req)
-{
- struct extent *new = NULL;
- struct cache_stack *cache;
- char *buf;
-
- cache = ncac_req->mapping->cache_stack;
-
- if ( !list_empty( &cache->free_extent_list ) ) {
-
- cache_lock( &cache->lock);
-
- new = get_free_extent_list_item( &(cache->free_extent_list) );
-
- cache_unlock(&cache->lock);
- }
-
- if (!new) return NULL;
-
- buf = new->addr;
- memset(new, 0, sizeof(struct extent));
- new->addr = buf;
- SetPageBlank(new);
- fprintf(stderr, "new extent:%p, flags:%lx\n", new, new->flags);
- return new;
-}
-
-
-/* NCAC_alloc_ext_wait(): if no extent is avaiable, discard some if possible.
- * Lock problem is a little more difficult than others since this funtion may
- * interact with the inode resource and the cache resource.
+/*
+ * allocate_extent(): get a new extent. The caller should have
+ * an inode lock. The flag is either BLOCKING_EXTENT_ALLOC or
+ * NONBLOCKING_EXTENT_ALLOC.
+ * If "flag" is BLOCKING_EXTENT_ALLOC and no extent is available,
+ * discard some if possible. Locking is a little more difficult here
+ * than elsewhere since this function may interact with both the inode
+ * resource and the cache resource.
*
* This function is called by functions which holds its inode lock,
* only cache stack lock is needed.
- * .
*/
-static inline struct extent * NCAC_alloc_ext_wait(NCAC_req_t *ncac_req)
+
+static inline struct extent *allocate_extent(NCAC_req_t *ncac_req, int flag)
{
struct extent *new = NULL;
struct cache_stack *cache;
+ int shrinked;
char *buf;
int ret;
@@ -716,12 +220,9 @@ static inline struct extent * NCAC_alloc
cache = ncac_req->mapping->cache_stack;
if ( !list_empty( &cache->free_extent_list ) ) {
-
cache_lock( &cache->lock);
-
new = get_free_extent_list_item( &(cache->free_extent_list) );
-
- cache_unlock(&cache->lock);
+ cache_unlock(&cache->lock);
if ( new ) {
buf = new->addr;
@@ -731,46 +232,102 @@ static inline struct extent * NCAC_alloc
DPRINT("new extent:%p, flags:%lx\n", new, new->flags);
return new;
}
+ }
+
+ /* No free extent so far */
+ if ( BLOCKING_EXTENT_ALLOC == flag ){
- }
+ cache_lock( &cache->lock);
+ ret = shrink_cache(cache, DELT_DISCARD_NUM, LRU_POLICY, &shrinked);
+ if ( ret < 0 ) {
+ ncac_req->error = ret;
+ cache_unlock(&cache->lock);
+ return NULL;
+ }
+ new = get_free_extent_list_item( &(ncac_req->mapping->cache_stack->free_extent_list) );
+ cache_unlock(&cache->lock);
- cache_lock( &cache->lock);
+ if ( !new ) return NULL;
+ else {
+ buf = new->addr;
+ memset(new, 0, sizeof(struct extent));
+ new->addr = buf;
+ SetPageBlank(new);
+ DPRINT("new extent:%p, flags:%lx\n", new, new->flags);
+ return new;
+ }
+ }
- ret = try_to_discard_extents(cache, DELT_DISCARD_NUM);
- if ( ret < 0 ) {
- ncac_req->error = ret;
- cache_unlock(&cache->lock);
- return NULL;
- }
+ return NULL;
+}
- new = get_free_extent_list_item( &(ncac_req->mapping->cache_stack->free_extent_list) );
+/* add it later
+ * free_extent: return an extent to a list
+ */
+static inline int free_extent(NCAC_req_t *ncac_req,struct extent *extent)
+{
+ return 0;
+}
- cache_unlock(&cache->lock);
+/*
+ * init_extent_read: initiate trove request to read an extent. The
+ * file offset is "foffset", and the size is "size".
+ */
+static inline int init_extent_read(NCAC_req_t *ncac_req,
+ struct extent *extent, PVFS_offset foffset, PVFS_size size)
+{
+ int ret;
+ PVFS_id_gen_t ioreq;
- if ( !new ) return NULL;
- buf = new->addr;
- memset(new, 0, sizeof(struct extent));
- new->addr = buf;
- new->ioreq = INVAL_IOREQ;
- SetPageBlank(new);
- DPRINT("new extent:%p, flags:%lx, ioreq=%d\n", new, new->flags, new->ioreq);
- return new;
+ ret = init_io_read(ncac_req->coll_id, ncac_req->handle,
+ ncac_req->context_id, foffset, size, extent->addr, &ioreq);
+ if ( ret < 0 ) {
+ NCAC_error("init_io_read error\n");
+ return ret;
+ }
+ extent->ioreq = ioreq;
+ fprintf(stderr, "init_extent_read: foff:%Ld, size:%Ld, extent:%p, opid:%Ld\n", foffset, size, extent, ioreq);
+ return 0;
}
+static inline void set_extent_read_pending(struct extent *extent)
+{
+ ClearPageBlank(extent);
+ SetPageReadPending(extent);
+}
-
-static inline int NCAC_add_to_cache(struct extent * extent,unsigned long index, NCAC_req_t *ncac_req)
+static inline int check_extent_read(NCAC_req_t *ncac_req, struct extent *extent)
{
int ret;
- ret = add_cache_item(extent, ncac_req->mapping, index);
+ ret = NCAC_check_ioreq(extent);
+ if ( ret > 0 ){
+ ClearPageReadPending(extent);
+ SetPageClean(extent);
+ return 1;
+ }
+ return 0;
+}
- return ret;
+static inline void increase_read_reference(struct extent *extent)
+{
+ extent->reads ++;
+ return;
+}
+
+/* Count one more write reference on "extent": bump writes, not reads. */
+static inline void increase_write_reference(struct extent *extent)
+{
+ extent->writes ++;
}
-static inline int NCAC_read_ext(struct extent *extent, PVFS_offset offset, unsigned long nr)
+static inline int add_extent_to_cache(struct extent * extent,
+ unsigned long index, NCAC_req_t *ncac_req, int policy)
{
- extent->ioreq = 0;
- return 0;
+ int ret;
+
+ ret = add_cache_item(extent, ncac_req->mapping, index, policy);
+
+ return ret;
}
Index: ncac-trove.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-trove.c,v
diff -p -u -r1.1 -r1.2
--- ncac-trove.c 21 Aug 2003 18:57:27 -0000 1.1
+++ ncac-trove.c 21 Sep 2004 13:46:13 -0000 1.2
@@ -271,7 +271,6 @@ int NCAC_check_ioreq(struct extent *exte
int count;
int ret;
-
op_id = extent->ioreq;
if ( op_id == INVAL_IOREQ ) {
@@ -287,8 +286,8 @@ int NCAC_check_ioreq(struct extent *exte
ret = trove_dspace_test(coll_id, op_id, context_id, &count, NULL, NULL, &state, TROVE_DEFAULT_TEST_TIMEOUT);
if ( ret > 0 ) {
- DPRINT("++++++++++++NCAC_check_ioreq: finished %Ld\n", op_id);
- extent->ioreq = INVAL_IOREQ;
+ fprintf(stderr, "++++++++++++NCAC_check_ioreq: finished %Ld\n", op_id);
+ extent->ioreq = INVAL_IOREQ;
}
return ret;
@@ -372,4 +371,29 @@ static inline void offset_shorten( int
*new_m_cnt = seg;
return;
+}
+
+int init_io_read( PVFS_fs_id coll_id, PVFS_handle handle,
+ PVFS_context_id context, PVFS_offset foffset,
+ PVFS_size size, void *buf, TROVE_op_id *ioreq)
+{
+ void *user_ptr_array[1] = { (char *) 13 };
+ int ret;
+
+ ret = trove_bstream_read_at(coll_id,
+ handle,
+ buf,
+ &size,
+ foffset,
+ 0, /* flags */
+ NULL, /* vtag */
+ user_ptr_array,
+ context,
+ ioreq);
+
+ if (ret < 0) {
+ NCAC_error("trove read at failed\n");
+ return -1;
+ }
+ return 0;
}
Index: ncac-trove.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-trove.h,v
diff -p -u -r1.1 -r1.2
--- ncac-trove.h 21 Aug 2003 18:57:27 -0000 1.1
+++ ncac-trove.h 21 Sep 2004 13:46:13 -0000 1.2
@@ -27,5 +27,8 @@ int do_read_for_rmw( PVFS_fs_id coll_id,
int size,
int *ioreq);
+int init_io_read( PVFS_fs_id coll_id, PVFS_handle handle,
+ PVFS_context_id context, PVFS_offset foffset,
+ PVFS_size size, void *buf, PVFS_id_gen_t *ioreq);
#endif /* __CACHE_STORAGE_H */
Index: state.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/state.c,v
diff -p -u -r1.4 -r1.5
--- state.c 17 Nov 2003 19:19:29 -0000 1.4
+++ state.c 21 Sep 2004 13:46:13 -0000 1.5
@@ -215,8 +215,9 @@ int NCAC_extent_done_access(NCAC_req_t *
NCAC_extent_read_comm_done (ncac_req->cbufhash[i]);
//DecReadCount(ncac_req->cbufhash[i]);
ncac_req->cbufhash[i]->rcmp++;
+ ncac_req->cbufhash[i]->reads--;
}
- }
+ }
}
if (ncac_req->optype == NCAC_WRITE)
Index: state.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/state.h,v
diff -p -u -r1.1 -r1.2
--- state.h 21 Aug 2003 18:57:27 -0000 1.1
+++ state.h 21 Sep 2004 13:46:13 -0000 1.2
@@ -1,6 +1,9 @@
#ifndef __STATE_H_
#define __STATE_H_
+#define BLOCKING_EXTENT_ALLOC 1
+#define NONBLOCKING_EXTENT_ALLOC 0
+
int NCAC_extent_read_access(NCAC_req_t *req, struct extent *page,
unsigned long offset, unsigned long size);
int NCAC_extent_write_access(NCAC_req_t *req, struct extent *page,
@@ -17,5 +20,6 @@ int NCAC_extent_write_access_recheck(NCA
unsigned int offset, unsigned int size);
int NCAC_extent_done_access(NCAC_req_t *ncac_req);
void mark_extent_rmw_lock(struct extent *extent, int ioreq);
+void list_set_clean_page(struct extent *page);
#endif
More information about the PVFS2-CVS
mailing list