[PVFS2-CVS] commit by wujs in pvfs2/src/io/buffer: ncac-lru.c cache.c cache.h flags.h internal.c internal.h module.mk.in ncac-buf-job.c ncac-init.c ncac-interface.c ncac-job.c ncac-trove.c ncac-trove.h state.c state.h

CVS commit program cvs at parl.clemson.edu
Tue Sep 21 10:46:13 EDT 2004


Update of /projects/cvsroot/pvfs2/src/io/buffer
In directory parlweb:/tmp/cvs-serv23414

Modified Files:
	cache.c cache.h flags.h internal.c internal.h module.mk.in 
	ncac-buf-job.c ncac-init.c ncac-interface.c ncac-job.c 
	ncac-trove.c ncac-trove.h state.c state.h 
Added Files:
	ncac-lru.c 
Log Message:
Changes to the buffer code:
    1) Simplify the concurrency control in the buffer code. The upper
       layer is responsible for the write-sharing control.
    2) Separate cache management code from other code. "ncac-lru.c" is
       for LRU policy. "ncac-arc" is for ARC policy, which will be added soon.
    3) Bug fixes in the list access.

Still working on it for writes and more testing.



--- /dev/null	2003-01-30 05:24:37.000000000 -0500
+++ ncac-lru.c	2004-09-21 09:46:13.000000000 -0400
@@ -0,0 +1,81 @@
+/* Specific functions related to LRU cache policy */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "internal.h"
+#include "state.h"
+#include "flags.h"
+#include "cache.h"
+
+
+/* LRU_add_cache_item: link "extent" right after the head of
+ * cache->active_list, set its LRU flag, and increment the mapping's nrpages
+ * and the cache's nr_active. The caller should hold the lock of "cache". */
+void LRU_add_cache_item(struct cache_stack *cache,struct extent *extent)
+{
+    /* Insert an entry after the specified head "active_list". */
+    list_add(&extent->lru, &cache->active_list);
+    SetPageLRU(extent);
+    extent->mapping->nrpages++;
+    cache->nr_active++;
+}
+
+/* LRU_remove_cache_item: unlink "extent" from its LRU list and decrement the
+ * mapping's nrpages and the cache's nr_active. The caller should hold the
+ * lock of "cache". NOTE(review): the LRU flag is not cleared here. */
+void LRU_remove_cache_item(struct cache_stack *cache, struct extent *extent)
+{
+    list_del(&extent->lru);
+    extent->mapping->nrpages--;
+    cache->nr_active--;
+}
+
+/* LRU_shrink_cache: walk cache->active_list from its tail and move up to
+ * "expected" discardable extents onto cache->free_extent_list. The number
+ * actually moved is stored in "*shrinked". Returns 0, or an error code.
+ */
+int LRU_shrink_cache(struct cache_stack *cache, unsigned int expected,
+                unsigned int *shrinked)
+{
+    struct list_head *lru_head, *lru_tail;
+    struct extent *victim;
+    int ret = 0;
+
+    fprintf(stderr, "%s: expected:%u\n", __FUNCTION__, expected);
+
+    *shrinked = 0;
+    lru_head = &cache->active_list;
+    lru_tail = lru_head->prev;
+
+    while (*shrinked < expected && lru_tail != lru_head){
+        victim = list_entry(lru_tail, struct extent, lru);
+
+        if ( !PageLRU(victim) ){
+            NCAC_error("extent flag is wrong. LRU flag is expected\n");
+            ret = NCAC_INVAL_FLAGS;
+            break;
+        }
+        /* step to the previous entry now; victim may be unlinked below */
+        lru_tail = lru_tail->prev;
+
+        if (PageReadPending(victim) || PageWritePending(victim)){
+            ret = NCAC_check_ioreq(victim);
+            if (ret < 0){
+                NCAC_error("NCAC_check_ioreq error: index=%ld, ioreq=%Ld\n",
+                        victim->index, victim->ioreq);
+                break;
+            }
+            if (ret > 0) { /* completion */
+                list_set_clean_page(victim);
+                ret = 0; /* completion is not an error for the caller */
+            }
+        }
+
+        if ( is_extent_discardable(victim) ){
+            LRU_remove_cache_item(cache, victim);
+            list_add_tail(&victim->list, &cache->free_extent_list);
+            (*shrinked)++; /* bump the count, not the pointer */
+        }
+    }
+    return ret;
+}

Index: cache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/cache.c,v
diff -p -u -r1.1 -r1.2
--- cache.c	21 Aug 2003 18:57:27 -0000	1.1
+++ cache.c	21 Sep 2004 13:46:12 -0000	1.2
@@ -1,3 +1,6 @@
+/* common functions for cache management. These functions are shared
+ * by all cache policies.
+ */
 #include <stdio.h>
 #include <stdlib.h>
 
@@ -5,18 +8,13 @@
 #include "state.h"
 #include "flags.h"
 #include "cache.h"
-
-/* contains core functions about cache */
-
-
+#include "ncac-lru.h"
 
 /*
- * Given the index of an extent, look up whether this extent is cached
- * or not.
- * Cached: retured the extent 
- * NOT cached: return NULL.
- *
- * No cache position management. NOT cache policy related.
+ * lookup_cache_item: Given the index of an extent, look up whether this 
+ * extent is cached or not.
+ *     Cached: return the extent 
+ *     NOT cached: return NULL.
  */
 struct extent * lookup_cache_item(struct inode *mapping, unsigned long index)
 {
@@ -27,19 +25,19 @@ struct extent * lookup_cache_item(struct
     return extent;
 }
 
-
 /* ==================================================================
- *                add an item into the cache:						*
+ * add an item into the cache:                                      *
  * (1) add an item into the radix tree (add_cache_item_no_policy).  *
- * (2) add an item into the cache (add_cache_item_with_policy).		*
- * ================================================================== 
+ * (2) add an item into the cache (add_cache_item_with_policy).     *
+ * ==================================================================
  */
 
 /* add_cache_item_no_policy(): add an extent into the cache tree.
  * Each inode has a cache tree, protected by its "lock". 
  * NOT cache policy related.
  */
-int add_cache_item_no_policy(struct extent *extent, struct inode *mapping, unsigned long index)
+static inline int add_cache_item_no_policy(struct extent *extent, 
+         struct inode *mapping, unsigned long index)
 {
 	int error;
 
@@ -53,40 +51,42 @@ int add_cache_item_no_policy(struct exte
 	return error;
 }
 
-
 /* add an item into a cache list with certain policy.
  * Current implementation is related to LRU. This function is
  * cache policy related.
  */
-void add_cache_item_with_policy(struct extent *extent)
+static inline void add_cache_item_with_policy(struct extent *extent, int cache_policy)
 {
     struct cache_stack *cache_stack = NULL;
 
     cache_stack = get_extent_cache_stack(extent);
 
-    if ( TestSetPageLRU(extent) )
-		NCAC_error("flag error");
-
-    add_page_to_inactive_list(cache_stack, extent);
-
-	return;
+    switch (cache_policy) {
+        case LRU_POLICY:
+            LRU_add_cache_item(cache_stack, extent);
+            break;
+
+        default:
+		    NCAC_error("unknown cache policy");
+            break;
+    }
 }
 
-
+/* add an extent into the cache. */
 int add_cache_item(struct extent *extent, struct inode *mapping,
-unsigned long index)
+            unsigned long index, int policy)
 {
+    /* 1. bookkeeping in the radix tree */
     int ret = add_cache_item_no_policy(extent, mapping, index);
 
+    /* 2. put into cache list with respect to the cache policy */ 
 	if (ret == 0){
-		add_cache_item_with_policy(extent);
+		add_cache_item_with_policy(extent, policy);
 		return ret;
 	}
     return ret;    
 }
 
-
-
 /* ==================================================================
  *                remove an item from the cache:					*
  * (1) remove it from the radix tree (remove_cache_item_no_policy). *
@@ -95,7 +95,7 @@ unsigned long index)
  * ================================================================== 
  */
 
-void remove_cache_item_no_policy(struct extent *extent)
+static void remove_cache_item_no_policy(struct extent *extent)
 {
     struct inode *mapping = extent->mapping;
 
@@ -104,29 +104,34 @@ void remove_cache_item_no_policy(struct 
     /* get this back if the "list" field is used */
     //list_del(&page->list);
     extent->mapping = NULL;
-
 }
 
-void remove_cache_item_with_policy(struct extent *victim)
+static void remove_cache_item_with_policy(struct extent *victim, int policy)
 {
-    list_del(&victim->lru);
-    victim->mapping->nrpages--;
+    struct cache_stack *cache;
+
+    cache = get_extent_cache_stack(victim);
+    if ( NULL == cache ){
+        NCAC_error("extent cache stack is NULL");
+        return;
+    }
+    switch (policy){
+        case LRU_POLICY:
+            LRU_remove_cache_item(cache, victim);
+            break;
+        default:
+		    NCAC_error("unknown cache policy");
+            break;
+    }
 }
 
-void remove_cache_item(struct extent *extent)
+void remove_cache_item(struct extent *extent, int policy)
 {
 
-    remove_cache_item_with_policy(extent);
+    remove_cache_item_with_policy(extent, policy);
     remove_cache_item_no_policy(extent);
 }
 
-
-void add_free_extent_list_item(struct list_head *head, struct extent *page)
-{
-     list_add_tail(&page->list, head);
-}
-
-
 struct extent * get_free_extent_list_item(struct list_head *list)
 {
     struct extent *new;
@@ -143,260 +148,49 @@ struct extent * get_free_extent_list_ite
 }
 
 
-int shrink_extent_cache( struct cache_stack *cache_stack, 
-						 unsigned int expected, unsigned int *scanned);
-int wakeup_dirty_flush(void);
-int shrink_extent_inactive( struct cache_stack *cache_stack, int max_scan, int expected );
-int refill_inactive_list( struct cache_stack *cache_stack, int expected);
-static inline int is_extent_movable(struct extent *victim);
-
-
-/* here is the main entry point part of cache replacement. 
- * When we run out of free extents, someone should be discarded 
- * from the cache. 
- */
-
-/* try_to_discard_extents(): this function scans the inactive_list
- * and try to discard up to  "expected" extents.
- * The discardable extents are clean and no referenced. If there is not
- * enough clean extents present, the dirty flush thread will be
- * waken up.
- *
- * this call return the number of extents which have been discarded.
- * if the return value is less than 0, error occurs.
- *
- */
-int  try_to_discard_extents( struct cache_stack *cache_stack, 
-			     unsigned int expected ) 
-{
-   int ret=0;
-   unsigned int scan;
-
-   DPRINT("try_to_discard_extents: dirty=%ld, expected=%d\n",cache_stack->nr_dirty, expected);
-
-   ret = shrink_extent_cache( cache_stack, expected, &scan);
-
-   if ( ret < 0 ) {
-	   NCAC_error("try_to_discard_extents: error in shrink_extent_cache\n");
-	   return ret;
-   }
-
-   DPRINT("try_to_discard_extents: expected=%d, flushed=%d\n", expected, ret);
-
-   if ( ret < expected && cache_stack->nr_dirty ) {
-       wakeup_dirty_flush();
-   }
-   
-   return ret;
-}
-
-/* if there is no pending write or read operations on this
- * extent, this extent is movable.
- */
-static inline int is_extent_movable(struct extent *victim)
-{
-    if ( PageClean(victim) && victim->writes == victim->wcmp && victim->reads == victim->rcmp )
-        return 1;
-    else return 0;
-}
-
-
-
-/* shrink_extent_cache(): this function is to discard as many as
- * "expected" clean extents from the cache.
- *
- * This function is dependent on the cache replacement policy. 
- * The current implementation is more and less a simplified
- * version of the LRU-2Q policy.
- *
- * return value is less than 0, if error. Otherwise, return
- * the number of extents shrinked.
- *
- */
-int shrink_extent_cache(struct cache_stack *cache_stack, unsigned int expected, unsigned int *scanned)
-{
-   int ret;
-   unsigned int nr_reclaimed = 0;
-   unsigned int nr_refilled = 0;
-   
-   DPRINT("shrink_extent_cache: to shrink inactive list: max_scan=%ld, expected=%d\n",cache_stack->nr_inactive, expected);
-
-   ret = shrink_extent_inactive(cache_stack, cache_stack->nr_inactive, expected);
-   if ( ret < 0 ) {
-       NCAC_error("shrink_extent_inactive error: error=%d\n", ret);
-       return ret;
-   }
-
-   DPRINT("shrink_extent_cache: to shrink inactive list: expected=%d, shrinked=%d\n",expected, ret);
-
-   nr_reclaimed += ret;
-
-   if ( nr_reclaimed >= expected) return nr_reclaimed;
-
-   /* how many extents are moved from active list to the inactive list? 
-    * In Linux, it tries to keep the active list of 2/3 size of the cache.
-    */
-   nr_refilled = expected * cache_stack->nr_active/ 
-					( (cache_stack->nr_inactive | 1) *2 );			
-
-   DPRINT("----------:refill inactive: num=%d, active=%ld, inactive=%ld\n", nr_refilled, cache_stack->nr_active, cache_stack->nr_inactive);
-
-   if ( !nr_refilled ) return nr_reclaimed;
-
-   /* Limit the number of refilled extents in one run. */
-   if ( nr_refilled > 2*REFILL_CLUSTER_MAX )
-       nr_refilled = 2*REFILL_CLUSTER_MAX;
-
-   ret = refill_inactive_list(cache_stack, nr_refilled);
-   if ( ret < 0 ) {
-       NCAC_error("refill_inactive_list error: error=%d\n", ret);
-       return ret;
-   }
-
-   ret = shrink_extent_inactive(cache_stack, cache_stack->nr_inactive, expected - nr_reclaimed);
-   if ( ret < 0 ) {
-       NCAC_error("shrink_extent_inactive error: error=%d\n", ret);
-       return ret;
-   }
-
-   nr_reclaimed += ret;
-
-   return nr_reclaimed;
-}
-
-
-/* shrink_extent_inactive(): dicards clean extents from the inactive_list.
- */
-int shrink_extent_inactive( struct cache_stack *cache_stack, int max_scan, int expected )
-{
-    int nr_to_process;
-	int error;
-    int ret = 0;
-    struct extent * victim;
-	struct list_head * inactive_list, *tail;
-
-    DPRINT("shrink_extent_inactive: max_scan=%d, expected=%d\n", max_scan, expected);
-
-    nr_to_process = expected;
-    if (nr_to_process < DISCARD_CLUSTER_MIN)
-        nr_to_process = DISCARD_CLUSTER_MIN;
-	
-    inactive_list =&cache_stack->inactive_list; 
-
-    tail  = inactive_list->prev;
-    while ( nr_to_process && tail != inactive_list ) {
-
-        victim = list_entry(tail, struct extent, lru);
-        tail = tail->prev;
-
-        if ( !PageLRU(victim) ) {
-			NCAC_error("extent flag is wrong\n");
-            return NCAC_INVAL_FLAGS;
-        }
-
-		DPRINT("victim.flags=%lx, wcnt=%d, rcnt=%d, wcmp=%d, rcmp=%d, index=%ld\n", victim->flags, victim->writes, victim->reads, victim->wcmp, victim->rcmp, victim->index);
-
-        if ( is_extent_movable(victim) ) {
-			remove_cache_item(victim);
-            add_free_extent_list_item(&cache_stack->free_extent_list, victim);
-
-            DPRINT("discard extent: %p\n", victim);
-
-			nr_to_process --; 
-			ret ++;
-        }            
-
-
-		if ( PageReadPending(victim) || PageWritePending(victim)) {
-			error = NCAC_check_ioreq(victim);
-			if (error <0) {
-
-				NCAC_error("NCAC_check_ioreq error: index=%ld, flags=%lx\n", victim->index, victim->flags);
-
-				return error;
-			}
-
-			if (error) { /* completion */
-                /* set all other related extents */
-				list_set_clean_page(victim);
-			}
-		}
-	}
-
+/* shrink_cache: shrink a cache by the expected number of extents. The
+ * real number of extents which have been shrunk is returned through
+ * "shrinked". This number might be less than "expected". All shrunk
+ * extents are returned to the extent free list.
+ * Different cache policies take their own ways to do the shrinking.
+ */
+int shrink_cache(struct cache_stack *cache_stack, 
+                 unsigned int expected, 
+                 int policy, 
+                 unsigned int *shrinked)
+{
+    int ret=-1;
+
+    switch (policy){
+        case LRU_POLICY:
+            ret = LRU_shrink_cache(cache_stack, expected, shrinked);
+            break;
+
+        case ARC_POLICY: 
+            ret = LRU_shrink_cache(cache_stack, expected, shrinked);
+            break;
+        
+        default:
+		    NCAC_error("unknown cache policy");
+            break;
+    }
     return ret;
 }
 
 
-/*
- * refill_inactive_list(): Try to move extents from "cache_stack" active
- * list to its inactive list.
- * If the extent is not movable, we move it to the head of the active
- * list. 
- * TODO: to verify this does make sense.
- * 
- * Returns how many extents moved, may be less than expected.
- */
-int refill_inactive_list( struct cache_stack *cache_stack, int expected)
+int is_extent_discardable(struct extent *victim)
 {
-    struct list_head *tail;
-    struct list_head *active_list;
-    int  moved = 0;
-	int  error;
-
-    DPRINT("refill_inactive_list: expected=%d\n", expected);
-
-    active_list =&cache_stack->active_list; 
-
-    tail  = active_list->prev;
-    while ( expected && tail != active_list ) {
-        struct extent * victim;
-
-        victim = list_entry(tail, struct extent, lru);
-        tail = tail->prev;
-
-		if ( PageReadPending(victim) || PageWritePending(victim)) {
-			error = NCAC_check_ioreq(victim);
-			if (error <0) {
-				NCAC_error("NCAC_check_ioreq error");
-				return error;
-			}
-
-			if (error) { /* completion */
-                /* set all other related extents */
-				list_set_clean_page(victim);
-			}
-		}
-
-        if ( !is_extent_movable(victim) ) {  /* not movable */
-            //list_del(&victim->lru);
-            //list_add(&victim->lru, active_list);
-            continue;
-        }
-
-		DPRINT("victim.flags=%lx, wcnt=%d, rcnt=%d, wcmp=%d, rcmp=%d\n", victim->flags, victim->writes, victim->reads, victim->wcmp, victim->rcmp);
-
-        expected --;
-        moved ++;
-        list_move(&victim->lru, &cache_stack->inactive_list);
-
-        /* set reference here to show that this extent was once "hot".
-         * If there is a reference on it again when it is still in
-         * inactive list, this extent will be quickly promoted into
-         * the active list.
-         */
-        SetPageReferenced(victim);
-		ClearPageActive(victim);
-    }
-
-    cache_stack->nr_active -= moved;
-    cache_stack->nr_inactive += moved;
-
-    DPRINT("************ move %d extents into inactive list\n", moved);
-
-    return (moved);
+    if ( PageClean(victim) && 0 == victim->reads && 0 == victim->writes ) 
+        return 1;
+    else 
+        return 0;
 }
 
-int wakeup_dirty_flush()
+/* hit_cache_item: cache hit, change the position according to the policy */
+void hit_cache_item(struct extent *extent, int cache_policy)
 {
-   return 0;
+    remove_cache_item_with_policy(extent, cache_policy);
+    add_cache_item_with_policy(extent, cache_policy);
+    return;
 }
+

Index: cache.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/cache.h,v
diff -p -u -r1.1 -r1.2
--- cache.h	21 Aug 2003 18:57:27 -0000	1.1
+++ cache.h	21 Sep 2004 13:46:12 -0000	1.2
@@ -7,16 +7,18 @@
 #define DISCARD_CLUSTER_MIN 4
 #define DELT_DISCARD_NUM    5
 
-struct extent * lookup_cache_item(struct inode *mapping, unsigned long offset);
-
-struct extent * get_free_extent_list_item(struct list_head *list);
-
-int add_cache_item(struct extent *page, struct inode *mapping, unsigned long offset);
-
-void list_set_clean_page(struct extent *page);
-int  try_to_discard_extents( struct cache_stack *cache_stack, unsigned int num);
-
-
-
+#define LRU_POLICY      1
+#define ARC_POLICY      2
+#define TWOQ_POLICY       3
+
+struct extent *lookup_cache_item(struct inode *mapping, unsigned long offset);
+struct extent *get_free_extent_list_item(struct list_head *list);
+int add_cache_item(struct extent *page, struct inode *mapping, 
+                    unsigned long index, int policy);
+void remove_cache_item(struct extent *page, int policy);
+int shrink_cache(struct cache_stack *cache_stack, unsigned int expected, 
+                    int policy, unsigned int *shrinked);
+int is_extent_discardable(struct extent *victim);
+void hit_cache_item(struct extent *page, int policy);
 
 #endif

Index: flags.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/flags.h,v
diff -p -u -r1.3 -r1.4
--- flags.h	22 Aug 2003 15:48:48 -0000	1.3
+++ flags.h	21 Sep 2004 13:46:12 -0000	1.4
@@ -34,11 +34,11 @@ static inline int test_and_set_bit(int n
 #define PG_writecomm		10	/* Write communication */
 #define PG_writepending		11	/* Write op pending */
 
-#define PG_referenced		12 
-#define PG_blank		13	/* Blank page */
-
-
+#define PG_readpreparing	12	/* Preparing for reading */
+#define PG_writepreparing	13	/* Preparing for writing */
 
+#define PG_referenced		14 
+#define PG_blank		    15	/* Blank page */
 
 /*
  * Manipulation of state flags
@@ -87,9 +87,17 @@ static inline int test_and_set_bit(int n
 #define SetPageReadPending(page) set_bit(PG_readpending, &(page)->flags)
 #define ClearPageReadPending(page) clear_bit(PG_readpending, &(page)->flags)
 
+#define PageReadPreparing(page)	 test_bit(PG_readpreparing, &(page)->flags)
+#define SetPageReadPreparing(page) set_bit(PG_readpreparing, &(page)->flags)
+#define ClearPageReadPreparing(page) clear_bit(PG_readpreparing, &(page)->flags)
+
 #define PageWritePending(page)	 test_bit(PG_writepending, &(page)->flags)
 #define SetPageWritePending(page) set_bit(PG_writepending, &(page)->flags)
 #define ClearPageWritePending(page) clear_bit(PG_writepending, &(page)->flags)
+
+#define PageWritePreparing(page)	 test_bit(PG_writepreparing, &(page)->flags)
+#define SetPageWritePreparing(page) set_bit(PG_writepreparing, &(page)->flags)
+#define ClearPageWritePreparing(page) clear_bit(PG_writepreparing, &(page)->flags)
 
 #define PageClean(page)	 test_bit(PG_clean, &(page)->flags)
 #define SetPageClean(page) set_bit(PG_clean, &(page)->flags)

Index: internal.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/internal.c,v
diff -p -u -r1.2 -r1.3
--- internal.c	23 Mar 2004 04:07:15 -0000	1.2
+++ internal.c	21 Sep 2004 13:46:12 -0000	1.3
@@ -10,14 +10,13 @@
 #include "cache.h"
 #include "ncac-job.h"
 
-extern struct inode *inode_arr[1000];
 extern struct NCAC_dev  NCAC_dev;
 
 /* This file contains NCAC internal functions. */
 
-
-static inline struct inode *get_inode(PVFS_fs_id, PVFS_handle , PVFS_context_id);
-static inline int NCAC_rwjob_prepare_one_piece(PVFS_offset pos, PVFS_size size, char ** cbufoff, PVFS_size * cbufsize, struct extent **cbufhash);
+static inline struct inode *get_inode( PVFS_fs_id, PVFS_handle , PVFS_context_id);
+static inline int NCAC_rwjob_prepare_single(NCAC_req_t *ncac_req);
+static inline int NCAC_rwjob_prepare_list(NCAC_req_t *ncac_req);
 
 /* get_internal_req(): get a internal request structure from the free
  * list. To avoid dynamic allocation, for the timebeing, I hard code
@@ -33,7 +32,7 @@ static inline struct NCAC_req * get_inte
 	NCAC_req_t *req=NULL;
 	struct list_head *new;
 
-	//list_lock(&NCAC_dev.req_list_lock); 
+	list_lock(&NCAC_dev.req_list_lock); 
 
 	if ( list_empty(&NCAC_dev.free_req_list) ) return NULL;
 
@@ -44,7 +43,7 @@ static inline struct NCAC_req * get_inte
 	}
 	list_del_init(new);
 
-	//list_unlock(&NCAC_dev.req_list_lock); 
+	list_unlock(&NCAC_dev.req_list_lock); 
 
 	req = list_entry(new->prev, NCAC_req_t, list);
 
@@ -63,7 +62,7 @@ void NCAC_list_add_tail_lock(struct list
 
 }
 
-/* del an entry from its list */
+/* delete an entry from its list */
 void NCAC_list_del_lock(struct list_head *entry, NCAC_lock *lock)
 {
     list_lock(lock);
@@ -94,13 +93,12 @@ void NCAC_read_request_from_list_lock(st
     *ncac_req_ptr = req;
 }
 
-
-
 /* build internal read/write requests */
 NCAC_req_t *NCAC_rwreq_build( NCAC_desc_t *desc, NCAC_optype optype)
 {
     void *iovec;
     NCAC_req_t *ncac_req;
+	int tmp_off, tmp_size;
 
     ncac_req = get_internal_req_lock(desc->coll_id, desc->handle);
     if (ncac_req == NULL) { /* run out of ncac request resources */
@@ -118,14 +116,14 @@ NCAC_req_t *NCAC_rwreq_build( NCAC_desc_
     ncac_req->aiovec  = &(ncac_req->mapping->aiovec);
     ncac_req->nr_dirty = 0;
 
-    if ( desc->buffer ) { /* buffer read or not */
+    if ( desc->buffer ) { /* copy data into the user's buffer */
         if ( optype == NCAC_GEN_READ ) 
             ncac_req->optype = NCAC_BUF_READ;
         else ncac_req->optype = NCAC_BUF_WRITE;
 
         ncac_req->usrbuf  = desc->buffer;
         ncac_req->usrlen  = desc->len;
-    }else{
+    }else{ /* use cache buffers for communication */
         if ( optype == NCAC_GEN_READ ) 
             ncac_req->optype = NCAC_READ;
         else ncac_req->optype = NCAC_WRITE;
@@ -145,10 +143,10 @@ NCAC_req_t *NCAC_rwreq_build( NCAC_desc_
         ncac_req->sizevec = NULL; /* no vector */
 
     }else{ /* a list of <off, len> tuples */
+		tmp_off = desc->stream_array_count*sizeof(PVFS_offset);
+		tmp_size = desc->stream_array_count*sizeof(PVFS_size);
 
-        /* I do want to avoid this malloc */
-
-        iovec = (void*) malloc( desc->stream_array_count*(sizeof(PVFS_offset)+sizeof(PVFS_size)) );
+        iovec = (void*) malloc( tmp_off + tmp_size );
         if (iovec == NULL ) {
             ncac_req->status = NCAC_NO_MEM;
             return ncac_req;
@@ -156,10 +154,11 @@ NCAC_req_t *NCAC_rwreq_build( NCAC_desc_
 
         ncac_req->offcnt = desc->stream_array_count;
         ncac_req->offvec  = (PVFS_offset*)iovec;
-        ncac_req->sizevec = (PVFS_size *)( (unsigned long)iovec + desc->stream_array_count*sizeof(PVFS_offset) );
-
-        /* copy user reuqest's stuff here */
+        ncac_req->sizevec = (PVFS_size *)( (unsigned long)iovec + tmp_off );
 
+        /* copy the user request information into an internal request */
+		memcpy(ncac_req->offvec, desc->stream_offset_array, tmp_off);
+		memcpy(ncac_req->sizevec, desc->stream_size_array, tmp_size);
     }
 
     /* success */
@@ -170,163 +169,318 @@ NCAC_req_t *NCAC_rwreq_build( NCAC_desc_
 /*
  * NCAC_rwjob_prepare(): does three things:
  * (1) allocate resource; caculate index, offset, and length; 
- * (2) put the request in the interanl job list
+ * (2) put the request in the internal job list
  * (3) make progress of the requests in the job list.
  */
 int NCAC_rwjob_prepare(NCAC_req_t *ncac_req, NCAC_reply_t *reply )
 {
-    int bufcnt, cnt;
-    char **cbufoff;
-    PVFS_size     *cbufsize;
-    struct extent **cbufindex;
-    int           *cbufflag;
-    int 		  *cbufrcnt;
-    int			  *cbufwcnt;
-    int ret;
-    int seg;
-    
+	int ret;
 
-    /* stream <off, len> --> page info. */
+    /* prepare the request */
+	if ( !ncac_req->offcnt ) { /* only one contiguous segment */
 
-    if ( !ncac_req->offcnt ) { /* only one contiguous segment */
+        ret = NCAC_rwjob_prepare_single(ncac_req);
 
-        /* bufcnt: the biggest number of buffers the data could be
-         * placed in the cache.
-         */
-        bufcnt = (ncac_req->pos + ncac_req->size + NCAC_dev.extsize -1)/NCAC_dev.extsize - ncac_req->pos/NCAC_dev.extsize; 
-    }else {
-        bufcnt = 0;
-        for (seg = 0; seg < ncac_req->offcnt; seg ++) 
-            bufcnt += (ncac_req->offvec[seg]+ncac_req->sizevec[seg] + NCAC_dev.extsize -1)/NCAC_dev.extsize - ncac_req->offvec[seg]/NCAC_dev.extsize;
-    }
-   
-    /* try to reuse buffer info. arrays if possible. "reserved_cbufcnt" is the
-	 * size of the previous request. If the size of the current request is
-	 * not larger than the previous one, we reuse the previous resource. Otherwise,
-	 * we free the old one and malloc the new one. */
-
-    if ( ncac_req->reserved_cbufcnt < bufcnt ) {
-        if ( ncac_req->cbufoff ) free( ncac_req->cbufoff);
-
-        cbufoff  =(char**) malloc( (2*sizeof(char*)+sizeof(PVFS_size)+3*sizeof(int))* bufcnt ); 
-        cbufsize =(PVFS_size*)  &cbufoff[bufcnt];
-        cbufindex =(struct extent**) &cbufsize[bufcnt];
-        cbufflag =(int*) &cbufindex[bufcnt];
-        cbufrcnt =(int*) &cbufflag[bufcnt];
-        cbufwcnt =(int*) &cbufrcnt[bufcnt];
-        
-
-        if ( cbufoff == NULL ) {
-            ncac_req->error = -ENOMEM;
-            return -ENOMEM;
-        }
+    }else{      /* multiple segments */
 
-        ncac_req->cbufoff  = cbufoff;
-        ncac_req->cbufsize = cbufsize;
-        ncac_req->cbufhash = cbufindex;
-        ncac_req->cbufflag = cbufflag;
-        ncac_req->cbufrcnt = cbufrcnt;
-        ncac_req->cbufwcnt = cbufwcnt;
-
-        ncac_req->reserved_cbufcnt = bufcnt;
-    }
-
-    ncac_req->cbufcnt = bufcnt;
-    memset(ncac_req->cbufoff, 0, (2*sizeof(char*)+sizeof(PVFS_size)+3*sizeof(int))*ncac_req->reserved_cbufcnt);
-
-    if ( !ncac_req->offcnt ) { /* only one contiguous segment */
-        ret = NCAC_rwjob_prepare_one_piece( ncac_req->pos, 
-											ncac_req->size, 
-											ncac_req->cbufoff, 
-											ncac_req->cbufsize, 
-											ncac_req->cbufhash);
-        if ( ret != bufcnt) {
-            fprintf(stderr, "Error: bufcnt error in prepare\n");
-            ncac_req->error = NCAC_JOB_PREPARE_ERR;
-            ncac_req->status = NCAC_ERR_STATUS;
-            return NCAC_JOB_PREPARE_ERR;
-        }
-    }else{
+        ret = NCAC_rwjob_prepare_list(ncac_req);
 
-        /* multiple <off len> tuples. Handle each contiguous piece one
-         * by one. */
-        
-        cnt = 0;
-        for (seg = 0; seg < ncac_req->offcnt; seg ++) {
-            ret = NCAC_rwjob_prepare_one_piece(ncac_req->offvec[seg],
-                                               ncac_req->sizevec[seg],
-                                               ncac_req->cbufoff + cnt, 
-                                               ncac_req->cbufsize + cnt, 
-                                               ncac_req->cbufhash + cnt);
-            cnt += ret;
-        }
-        if (cnt > bufcnt) {
-            fprintf(stderr, "Error: bufcnt error in prepare\n");
-            ncac_req->error = NCAC_JOB_PREPARE_ERR;
-        	ncac_req->status = NCAC_ERR_STATUS;
-            return NCAC_JOB_PREPARE_ERR;
-        }
     }
-         
-    /* put the request in the internal job list: thread safe */
-   
-    NCAC_list_add_tail_lock(&ncac_req->list, &NCAC_dev.prepare_list, &NCAC_dev.req_list_lock);
-
-    ncac_req->status = NCAC_REQ_SUBMITTED;
-
-    DPRINT("NCAC_rwjob_prepare: %p submitted\n", ncac_req);
-     
-    /* make progress of jobs: thread safe.
-	 * Choices here are: 1) do one job; 2) scan the whole list. Choose 1) here. */
-    //ret = NCAC_do_jobs(&(NCAC_dev.req_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock); 
-
-    ret = NCAC_do_a_job(ncac_req, &(NCAC_dev.prepare_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
-
-    if ( ret < 0 ) {
-        ncac_req->error = NCAC_JOB_DO_ERR;
-        ncac_req->status = NCAC_ERR_STATUS;
+    if ( ret < 0 ){
+        ncac_req->error = ret;
         return ret;
     }
 
-    ncac_req->error  = NCAC_OK;
+	/* put the request in the internal job list: thread safe */
+
+	NCAC_list_add_tail_lock(&ncac_req->list, &NCAC_dev.prepare_list, 
+                            &NCAC_dev.req_list_lock);
+
+	ncac_req->status = NCAC_REQ_SUBMITTED;
+
+	DPRINT("NCAC_rwjob_prepare: %p submitted\n", ncac_req);
+
+	/* make progress of jobs: thread safe.
+	 * Choices here are: 1) do one job; 2) scan the whole list. 
+	 * Choose 1) here. 
+     */
+	//ret = NCAC_do_jobs(&(NCAC_dev.req_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock); 
+
+	ret = NCAC_do_a_job(ncac_req, &(NCAC_dev.prepare_list), 
+                    &(NCAC_dev.bufcomp_list), 
+                    &(NCAC_dev.comp_list), 
+                    &NCAC_dev.req_list_lock);
+
+	if ( ret < 0 ) {
+		ncac_req->error = NCAC_JOB_DO_ERR;
+		ncac_req->status = NCAC_ERR_STATUS;
+		return ret;
+	}
+
+	ncac_req->error  = NCAC_OK;
+
+	return 0;
+}
+
+
+/* NCAC_rwjob_prepare_single: Given a request which accesses only one
+ *      file region, we prepare needed resources for this request:
+ *  1) extent cache buffers;
+ *  2) Communication buffer address;    
+ *  3) Communication buffer sizes;
+ *  4) Communication buffer flags;
+ *  Given an extent size of 32768 bytes, if a request wants to
+ *  read 32768 bytes starting at offset 1024, it needs:
+ *      (1) two extents: 0-32767 and 32768-65535
+ *      (2) comm buffers: extent1.addr+1024, extent2.addr
+ *      (3) comm buffer sizes: 31744, 1024
+ *      (4) if data is ready, flag is set.
+ *  In this case, the number of extents and the number of communication
+ *  buffers are the same.
+ */
+
+static inline int NCAC_rwjob_prepare_single(NCAC_req_t *ncac_req)
+{
+	int extcnt;  /* cache extent count */
+    int comcnt;  /* communication buffer count */
+    int allocsize;
+
+    PVFS_offset   *foff;
+	char          **cbufoff;
+	PVFS_size     *cbufsize;
+    int           *cbufflag;
+    unsigned long firstoff;
+    
+    int i;
+
+    extcnt = (ncac_req->pos + ncac_req->size + NCAC_dev.extsize -1) /
+                NCAC_dev.extsize - ncac_req->pos/NCAC_dev.extsize; 
+    comcnt = extcnt;
+
+	if ( ncac_req->reserved_cbufcnt < comcnt ) {
+		if ( ncac_req->cbufoff ) free( ncac_req->cbufoff);
+
+		allocsize = ( sizeof(PVFS_offset) + sizeof(char*) + sizeof(PVFS_size)
+            + sizeof(struct extent *) + 3*sizeof(int) ) * comcnt; 
+
+		ncac_req->foff  =(PVFS_offset*) malloc(allocsize); 
+
+		if ( ncac_req->foff == NULL ) {
+			ncac_req->error = -ENOMEM;
+			return -ENOMEM;
+		}
+
+		ncac_req->cbufoff  =(char**) & ncac_req->foff[comcnt];
+		ncac_req->cbufsize =(PVFS_size*)  &ncac_req->cbufoff[comcnt];
+		ncac_req->cbufhash =(struct extent**)
+                            &ncac_req->cbufsize[comcnt];
+		ncac_req->cbufflag =(int*) &ncac_req->cbufhash[comcnt];
+		ncac_req->cbufrcnt =(int*) &ncac_req->cbufflag[comcnt];
+		ncac_req->cbufwcnt =(int*) &ncac_req->cbufrcnt[comcnt];
+
+		ncac_req->reserved_cbufcnt = comcnt;
+
+	    memset(ncac_req->foff, 0, allocsize);
+    }
+
+	ncac_req->cbufcnt = comcnt;
+
+    foff = ncac_req->foff;
+    cbufoff = ncac_req->cbufoff;
+    cbufsize = ncac_req->cbufsize;
+    cbufflag = ncac_req->cbufflag;
+
+    /* Setup the related values for foff, cbufoff, and cbufsize */
+
+    firstoff = (unsigned long) (ncac_req->pos & (NCAC_dev.extsize -1)); 
+    foff[0] = ncac_req->pos - firstoff;
+    cbufoff[0] = (char*)firstoff;     /* offset from the extent address */
+    cbufsize[0] = NCAC_dev.extsize - firstoff;
 
+    for ( i= 1; i < comcnt; i++){
+        foff[i] = foff[i-1] + cbufsize[i-1];
+        cbufoff[i] = 0;
+        cbufsize[i] = NCAC_dev.extsize;
+        cbufflag[i] = NCAC_COMM_NOT_READY;
+    }
+    cbufsize[comcnt-1] = (ncac_req->pos + ncac_req->size)% NCAC_dev.extsize;
 
+    fprintf(stderr, "[%s] exit %d comm buffers\n", __FUNCTION__, comcnt);
     return 0;
 }
 
+/* NCAC_rwjob_prepare_list: Given a request which accesses a list of
+ *      file regions, we prepare needed resources for this request:
+ *  1) extent cache buffers;
+ *  2) Communication buffer address;    
+ *  3) Communication buffer sizes;
+ *  4) Communication buffer flags;
+ *  Given an extent size of 32768 bytes, if a request wants to
+ *  read the following regions: (1024, 32768) and (65530, 32768)
+ *      (1) Three extents: 0-32767, 32768-65535, 65536-98303
+ *      (2) Communication buffers:
+ *            extent1.addr+1024, extent2.addr,
+ *            extent2.addr+32762, extent3.addr
+ *      (3) Communication buffer size:
+ *                31744, 1024, 6, and 32762
+ *  This example shows that:
+ *    (A) For the underlying I/O system, we are going to read
+ *        three extents;
+ *    (B) For the upper communication system, we are going to
+ *        use four different buffers.
+ *   The number of communication buffers is equal to or larger 
+ *   than the number of needed extents.
+ */ 
+
+struct freg_tuple
+{
+    PVFS_offset fpos;
+    PVFS_size   size;
+};
+
+int comp_pos(const PVFS_offset *num1, const PVFS_offset *num2)
+{
+    if (*num1 <  *num2) return -1;
+    if (*num1 == *num2) return  0;
+    if (*num1 >  *num2) return  1;
+    return 0;
+}
 
-static inline int NCAC_rwjob_prepare_one_piece(PVFS_offset pos, PVFS_size size, char ** cbufoff, PVFS_size * cbufsize, struct extent **cbufhash)
+static inline int NCAC_rwjob_prepare_list(NCAC_req_t *ncac_req)
 {
-    unsigned long offset;
-    unsigned long bufcnt, len;
-    int seg;
+	int extcnt;  /* cache extent count */
+    int comcnt;  /* communication buffer count */
+    int allocsize;
+
+    PVFS_offset   *foff;
+	char          **cbufoff;
+	PVFS_size     *cbufsize;
+    int           *cbufflag;
+    unsigned long   firstoff;
+
+    int i, j;
+    int cnt;
+
+    struct freg_tuple *fregions;
     
-    bufcnt = (pos + size + NCAC_dev.extsize -1)/NCAC_dev.extsize - pos/NCAC_dev.extsize; 
+    fregions = (struct freg_tuple *)malloc(ncac_req->offcnt *
+                    sizeof(struct freg_tuple));
+    if ( NULL == fregions){
+		ncac_req->error = -ENOMEM;
+        return -ENOMEM;
+    }
+
+    extcnt = 0;
+    for (i = 0; i < ncac_req->offcnt; i ++) {
+        extcnt += (ncac_req->offvec[i] + ncac_req->sizevec[i] +
+                NCAC_dev.extsize -1)/NCAC_dev.extsize - 
+                ncac_req->offvec[i]/NCAC_dev.extsize;
+
+        fregions[i].fpos = ncac_req->offvec[i];
+        fregions[i].size = ncac_req->sizevec[i];
+    }
+
+    /* Some extents counted by "extcnt" may be the same. Also, the
+     * number of communication buffers should be the same as
+     * "extcnt". Use "comcnt" to overprovision resources.
+     */
+
+    comcnt = extcnt;
+
+	if ( ncac_req->reserved_cbufcnt < comcnt ) {
+		if ( ncac_req->cbufoff ) free( ncac_req->cbufoff);
 
-    len = 0;
+		allocsize = ( sizeof(PVFS_offset) + sizeof(char*) + sizeof(PVFS_size)
+            + sizeof(struct extent *) + 3*sizeof(int) ) * comcnt; 
 
-    /* first one */
-    offset = (unsigned long)pos & (NCAC_dev.extsize -1); /* within extent */
-    cbufoff[0] =(char*) offset; /* add the extent address later */
-    cbufsize[0] = NCAC_dev.extsize - offset;
-    len += cbufsize[0];
-        
-    /* middle ones */
-    for (seg = 1; seg < bufcnt - 1; seg ++ ) {
-        /* add the extent address later */
-        cbufoff[seg] = 0;
-        cbufsize[seg] = NCAC_dev.extsize;
-        len += cbufsize[seg];
+		ncac_req->foff  =(PVFS_offset*) malloc(allocsize); 
+
+		if ( ncac_req->foff == NULL ) {
+			ncac_req->error = -ENOMEM;
+
+            free(fregions);
+
+			return -ENOMEM;
+		}
+
+		ncac_req->cbufoff  =(char**) & ncac_req->foff[comcnt];
+		ncac_req->cbufsize =(PVFS_size*)  &ncac_req->cbufoff[comcnt];
+		ncac_req->cbufhash =(struct extent**)
+                            &ncac_req->cbufsize[comcnt];
+		ncac_req->cbufflag =(int*) &ncac_req->cbufhash[comcnt];
+		ncac_req->cbufrcnt =(int*) &ncac_req->cbufflag[comcnt];
+		ncac_req->cbufwcnt =(int*) &ncac_req->cbufrcnt[comcnt];
+
+		ncac_req->reserved_cbufcnt = comcnt;
+
+	    memset(ncac_req->foff, 0, allocsize);
     }
+    
+    foff = ncac_req->foff;
+    cbufoff = ncac_req->cbufoff;
+    cbufsize = ncac_req->cbufsize;
+    cbufflag = ncac_req->cbufflag;
+
+    /* How many different extents are needed? Put them in an
+     * ordered manner to be friendly to the underlying I/O system.
+     * What are communication buffers used for the upper layer?
+     * (offset to the related extent, size).
+     */
+    /* quick sort the list of file regions. If the upper layer
+     * can present the file regions in an ordered manner, we can
+     * eliminate this sorting.
+     */
+    qsort(fregions, ncac_req->offcnt, sizeof(struct freg_tuple), (void*)comp_pos);
+
+#if  1
+    for (i=0; i<ncac_req->offcnt; i++){
+        fprintf(stderr, "fpos:%Ld, size:%Ld\n", fregions[i].fpos,
+fregions[i].size);
+    }
+#endif
+
+    comcnt = 0;
+    for ( i =0; i <ncac_req->offcnt; i++){
+        cnt = (fregions[i].fpos+fregions[i].size+NCAC_dev.extsize-1)/
+            NCAC_dev.extsize - fregions[i].fpos/NCAC_dev.extsize;
+
+        firstoff=(unsigned long)(fregions[i].fpos & (NCAC_dev.extsize -1)); 
+
+        foff[comcnt] = fregions[i].fpos - firstoff;
+        cbufoff[comcnt] = (char*)firstoff;
+        cbufsize[comcnt] = NCAC_dev.extsize - firstoff;
+        cbufflag[comcnt] = NCAC_COMM_NOT_READY;
+
+        for ( j= 1; j < cnt; j++){
+            foff[comcnt+j] = foff[comcnt+j-1] + NCAC_dev.extsize;
+            cbufoff[comcnt+j] = 0;
+            cbufsize[comcnt+j] = NCAC_dev.extsize;
+            cbufflag[comcnt+j] = NCAC_COMM_NOT_READY;
+        }
+        /* adjust the size of the last buffer in each segment. */
+        cbufsize[comcnt+cnt-1] -= (fregions[i].fpos+fregions[i].size) % 
+                    NCAC_dev.extsize;
 
-    /* last ones */
-    if ( bufcnt > 1 ){
-        cbufoff[bufcnt-1] = 0;
-        cbufsize[bufcnt-1] = size - len;
+        comcnt += cnt;
     }
 
+    /* So far, some extents in ncac_req->foff may be the same,
+     * but any such duplicates are consecutive.
+     */
+
+    free(fregions);
+
+    ncac_req->cbufcnt = comcnt;
 
-    return bufcnt;
+#if 1
+    fprintf(stderr, "[%s] exit %d comm buffers\n", __FUNCTION__, comcnt);
+    for (i=0; i<comcnt; i++){
+        fprintf(stderr, "fpos:%Ld, buf_off:%ld, size:%Ld\n", foff[i],
+(unsigned long)cbufoff[i], cbufsize[i]);
+    }
+#endif
+
+    return 0;
 }
 
 /* NCAC_do_jobs(): this is the workhorse of NCAC.
@@ -374,33 +528,35 @@ static inline int NCAC_rwjob_prepare_one
  *    
  */
 
-int NCAC_do_jobs(struct list_head *prep_list, struct list_head *bufcomp_list, struct list_head *comp_list, NCAC_lock *lock) 
+int NCAC_do_jobs(struct list_head *prep_list, struct list_head *bufcomp_list,
+				 struct list_head *comp_list, NCAC_lock *lock)
 {
-    int ret; 
-    NCAC_req_t *ncac_req;
-        
+	int ret; 
+	NCAC_req_t *ncac_req;
+
 dojob:
-    /* read a request from the prep_list job. When a job is read out 
-     * (NOT taken from the list), there is a flag to inidcate that 
-     * someone else has read this request out. So get_request_from_list 
-     * is always return a request which is not read out by others 
-     */    
-    
-    NCAC_read_request_from_list_lock(prep_list, lock, &ncac_req);
 
-    if (ncac_req) {
-        ret = NCAC_do_a_job(ncac_req, prep_list, bufcomp_list, comp_list, lock); 
+	/* read a request from the prep_list job. When a job is read out 
+	 * (NOT taken from the list), there is a flag to indicate that 
+	 * someone else has read this request out. So get_request_from_list 
+	 * always returns a request which is not read out by others 
+	 */    
 
-        ncac_req->read_out = 0;
-        if ( ret < 0 ) 
-            return ret;
+	NCAC_read_request_from_list_lock(prep_list, lock, &ncac_req);
 
-        if ( ncac_req->status == NCAC_BUFFER_COMPLETE || 
-             ncac_req->status == NCAC_COMPLETE ) 
-            goto dojob; 
-    }
+	if (ncac_req) {
+		ret = NCAC_do_a_job(ncac_req, prep_list, bufcomp_list, comp_list, lock); 
 
-    return 0; 
+		ncac_req->read_out = 0;
+		if ( ret < 0 ) 
+			return ret;
+
+		if ( ncac_req->status == NCAC_BUFFER_COMPLETE || 
+			ncac_req->status == NCAC_COMPLETE ) 
+		goto dojob; 
+	}
+
+	return 0; 
 }
 
 
@@ -409,24 +565,28 @@ dojob:
  * is called. All horseworkers are implemented in "ncac_job.c".
  */
 
-int NCAC_do_a_job(NCAC_req_t *ncac_req, struct list_head *prep_list, struct list_head *bufcomp_list, struct list_head *comp_list, NCAC_lock *lock)
+int NCAC_do_a_job(NCAC_req_t *ncac_req, struct list_head *prep_list, 
+				struct list_head *bufcomp_list, 
+                struct list_head *comp_list, NCAC_lock *lock)
 {
-    int ret;
-   
-    switch (ncac_req->optype){
+	int ret;
+
+    fprintf(stderr, "NCAC_do_a_job enter\n");
+
+	switch (ncac_req->optype){
 
-        /* cached read */
+		/* cached read */
 		case NCAC_READ: 
 
 			ret = NCAC_do_a_read_job(ncac_req);
 			break;
 
-        /* cached write */
+		/* cached write */
 		case NCAC_WRITE: 
 
 			ret = NCAC_do_a_write_job(ncac_req);
 			break;
-        
+
 		/* cached buffer read */
 		case NCAC_BUF_READ:
 
@@ -438,7 +598,7 @@ int NCAC_do_a_job(NCAC_req_t *ncac_req, 
 
 			ret = NCAC_do_a_bufwrite_job(ncac_req);
 			break;
-       
+
 		case NCAC_QUERY:
 			ret = NCAC_do_a_query_job(ncac_req);
 			break;
@@ -451,52 +611,53 @@ int NCAC_do_a_job(NCAC_req_t *ncac_req, 
 			ret = NCAC_do_a_sync_job(ncac_req);
 			break;
 
-        default:
+		default:
 			ret = NCAC_JOB_OPTYPE_ERR;
-            fprintf(stderr, "NCAC_do_a_job: unrecognize optype flag\n");
+			fprintf(stderr, "NCAC_do_a_job: unrecognize optype flag\n");
 			break;
 	}
 
-    if ( ncac_req->status == NCAC_BUFFER_COMPLETE ) {
+	if ( ncac_req->status == NCAC_BUFFER_COMPLETE ) {
 
-        NCAC_list_del_lock(&ncac_req->list, lock);
+		NCAC_list_del_lock(&ncac_req->list, lock);
 
-        NCAC_list_add_tail_lock(&ncac_req->list, bufcomp_list, lock); 
+		NCAC_list_add_tail_lock(&ncac_req->list, bufcomp_list, lock); 
 
-    }else if ( ncac_req->status == NCAC_COMPLETE ) 
-    {
-        NCAC_list_del_lock(&ncac_req->list, lock);
-        NCAC_list_add_tail_lock(&ncac_req->list, comp_list, lock); 
-    }
+	}else if ( ncac_req->status == NCAC_COMPLETE ) 
+	{
+		NCAC_list_del_lock(&ncac_req->list, lock);
+		NCAC_list_add_tail_lock(&ncac_req->list, comp_list, lock); 
+	}
 
+    fprintf(stderr, "NCAC_do_a_job exit\n");
 	return ret;
 }
 
 
 int NCAC_check_request( int id, struct NCAC_req **ncac_req )
 {
-    struct NCAC_req *req;
-    int ret;
+	struct NCAC_req *req;
+	int ret;
 
-    req = &NCAC_dev.free_req_src[id]; 
-    if ( req->status == NCAC_COMPLETE || req->status == NCAC_BUFFER_COMPLETE ) {
-    	*ncac_req = req;
-        return 0;
-    }
+	req = &NCAC_dev.free_req_src[id]; 
+	if ( req->status == NCAC_COMPLETE || req->status == NCAC_BUFFER_COMPLETE ) {
+		*ncac_req = req;
+		return 0;
+	}
 
 	if ( req->status == NCAC_REQ_UNUSED ) {
-    	*ncac_req = NULL;
+		*ncac_req = NULL;
 		NCAC_error("NCAC_check_request:no such request");
-        return -1;
+		return -1;
 	}
 
-    ret = NCAC_do_a_job(req, &(NCAC_dev.prepare_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
+	ret = NCAC_do_a_job(req, &(NCAC_dev.prepare_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
 
-    if ( ret < 0 ) {
+	if ( ret < 0 ) {
 		NCAC_error("NCAC_check_request:do a job error (%d)", req->error);
-    }
-    *ncac_req = req;
-    return ret;
+	}
+	*ncac_req = req;
+	return ret;
 }
 
 /* done request(): mark a request is done. Several cases:
@@ -517,23 +678,23 @@ int NCAC_check_request( int id, struct N
 
 int NCAC_done_request( int id )
 {
-    struct NCAC_req *ncac_req;
-    int ret = 0;
+	struct NCAC_req *ncac_req;
+	int ret = 0;
 
-    ncac_req = &NCAC_dev.free_req_src[id]; 
+	ncac_req = &NCAC_dev.free_req_src[id]; 
 
 	switch ( ncac_req->status ) {
 
-	    case NCAC_BUFFER_COMPLETE:  /* pending communication is done */
-			
-            NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
+		case NCAC_BUFFER_COMPLETE:  /* pending communication is done */
+
+			NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
 			ret = NCAC_extent_done_access( ncac_req );
-			
+
 			break;
 
 		case NCAC_COMPLETE:
 
-            NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
+			NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
 			break;
 
 		default: /* error. leaking here. */
@@ -542,65 +703,100 @@ int NCAC_done_request( int id )
 			return ret;
 	}
 
-
 	/* prepare to return this request to the free list.
 	 * We cannot just zero the ncac_req for all cases. 
 	 * We want to reuse buffer inforation arrays to avoid
 	 * allcations. */
-	 
+
 	if ( ncac_req->reserved_cbufcnt == 0 ) {
 		id = ncac_req->id;
 		memset( ncac_req, 0, sizeof(struct NCAC_req) );
 		ncac_req->id = id;
 
 	}else{ /* we want reuse buffer information arrays */
-	    ncac_req->cbufcnt = 0;
-	    ncac_req->mapping = 0;
+		ncac_req->cbufcnt = 0;
+		ncac_req->mapping = 0;
 		ncac_req->ioreq   = INVAL_IOREQ;
-        ncac_req->read_out = 0;
+		ncac_req->read_out = 0;
 	}
-    ncac_req->status  = NCAC_REQ_UNUSED; 
 
-    NCAC_list_add_tail_lock( &ncac_req->list, &NCAC_dev.free_req_list, &NCAC_dev.req_list_lock); 
+	ncac_req->status  = NCAC_REQ_UNUSED; 
+
+	NCAC_list_add_tail_lock( &ncac_req->list, &NCAC_dev.free_req_list, &NCAC_dev.req_list_lock); 
 
 
 	return ret;
 }
 
+static inline struct inode *search_inode_list (PVFS_handle handle)
+{
+	int inode_index;
+	struct inode * cur;
+
+	inode_index = handle % MAX_INODE_NUM;
+
+	cur = inode_arr[inode_index]; 
+	while ( NULL != cur ) {
+		if ( cur->handle == handle ) return cur;	
+		cur = cur->next;
+	}	
+
+	return NULL;
+}
+
+/* get_inode: given a fs_id and a file handle, return the inode-like
+ *            structure for it, allocating one if none exists. A handle is
+ *            an arbitrary number, so it is hashed into an inode_arr index.
+ * get_inode should be called under some lock because two callers may
+ *     work on the same collision list.
+ */
 static inline struct inode *get_inode(PVFS_fs_id coll_id, 
-									PVFS_handle handle, PVFS_context_id context_id)
+				PVFS_handle handle, PVFS_context_id context_id)
 {
-    struct inode *inode;
+	struct inode *inode;
+	int inode_index;
+
+	inode_index = handle % MAX_INODE_NUM;
+
+	/* search the inode list with the index of "inode_index" */
+	inode = search_inode_list (handle);
 
-    if ( !inode_arr[handle] ) {
-        inode=(struct inode*)malloc(sizeof(struct inode));
+	fprintf(stderr, "handle: %Ld, index: %d, inode:%p\n", handle, inode_index, inode);
 
-                /* initialize it */
-                memset(inode, 0, sizeof(struct inode));
+	if ( NULL == inode ){
+		inode=(struct inode*)malloc(sizeof(struct inode));
 
-        inode->cache_stack = get_cache_stack();
+		/* initialize it */
+		memset(inode, 0, sizeof(struct inode));
 
-	init_single_radix_tree(&inode->page_tree, NCAC_dev.get_value, NCAC_dev.max_b);
+		inode->cache_stack = get_cache_stack();
+		inode->nrpages = 0;
+		inode->nr_dirty = 0;
+		inode->coll_id = coll_id;
+		inode->handle = handle;
+		inode->context_id = context_id;
 
-        inode_arr[handle] = inode;
-		inode_arr[handle]->nrpages = 0;
-		inode_arr[handle]->nr_dirty = 0;
-		inode_arr[handle]->coll_id = coll_id;
-		inode_arr[handle]->handle = handle;
-		inode_arr[handle]->context_id = context_id;
+		init_single_radix_tree(&inode->page_tree, NCAC_dev.get_value, NCAC_dev.max_b);
 
 		spin_lock_init(&inode->lock);
 
-        INIT_LIST_HEAD(&(inode->clean_pages));
-        INIT_LIST_HEAD(&(inode->dirty_pages));
-    }
-    return inode_arr[handle];
+		INIT_LIST_HEAD(&(inode->clean_pages));
+		INIT_LIST_HEAD(&(inode->dirty_pages));
+
+		/* put the new inode to the head of the collision list */
+		inode->next = inode_arr[inode_index];
+		inode_arr[inode_index] = inode;
+	}
+
+	return inode;
 }
 
+
+
 static inline void extent_dump(struct extent *extent)
 {
 	fprintf(stderr, "flags:%x\t status:%d\t	index:%d\t\n", (int)extent->flags, extent->status, (int)extent->index);
-	fprintf(stderr, "writes:%d\t reads:%d\t	ioreq:%d\t\n", extent->writes, extent->reads, extent->ioreq);
+	fprintf(stderr, "writes:%d\t reads:%d\t	ioreq:%Ld\t\n", extent->writes, extent->reads, extent->ioreq);
 
 }
 

Index: internal.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/internal.h,v
diff -p -u -r1.2 -r1.3
--- internal.h	23 Mar 2004 04:07:15 -0000	1.2
+++ internal.h	21 Sep 2004 13:46:12 -0000	1.3
@@ -82,18 +82,19 @@ typedef struct NCAC_dev NCAC_dev_t;
 
 
 struct NCAC_req{
-   int  id;
-   int  optype;
-   int  status;
-   int  error;
+   int              id;
+   int              optype;
+   int              status;
+   int              error;
    PVFS_fs_id   	coll_id;
    PVFS_handle  	handle;
    PVFS_context_id  context_id;
 
-   PVFS_size usrlen;
-   PVFS_size written;
-   char *usrbuf;
+   PVFS_size        usrlen;
+   PVFS_size        written;
+   char             *usrbuf;
 
+   PVFS_offset *foff;
    char ** cbufoff;
    PVFS_size *cbufsize;
    int *cbufflag;
@@ -114,7 +115,7 @@ struct NCAC_req{
 
    struct inode *mapping;
    struct aiovec *aiovec;
-   int ioreq;
+   PVFS_id_gen_t ioreq;
 
    int read_out;
    struct list_head list;
@@ -137,6 +138,8 @@ typedef struct NCAC_req NCAC_req_t;
 /* this is an inode-like structure for each
  * object <coll_id, handle>
  */
+#define MAX_INODE_NUM 	10000
+
 struct inode
 {
     NCAC_lock  lock;
@@ -154,8 +157,11 @@ struct inode
 
     struct aiovec aiovec;
     struct cache_stack *cache_stack;
+	struct inode *next;
 };
 
+extern struct inode *inode_arr[MAX_INODE_NUM];
+
 
 struct extent {
    unsigned long   flags;
@@ -175,16 +181,21 @@ struct extent {
    struct extent *next;
    struct inode *mapping;
 
-   int  	ioreq;
-   struct extent *ioreq_next;
+   PVFS_id_gen_t  	ioreq;
 
+   /* for optimization. We can initiate one trove request for
+    * a list of extents. For doing that, all extents will share
+    * the same ioreq. If the ioreq is done, we follow ioreq_next
+    * to mark all other extents.
+    */
+   struct extent    *ioreq_next; 
 };
 
 
-
 #define MAX_DELT_REQ_NUM 10000
 
 
+
 #define NCAC_OK			0
 #define NCAC_REQ_BUILD_ERR    -1000
 #define NCAC_SUBMIT_ERR     -1001
@@ -232,8 +243,15 @@ int NCAC_rwjob_prepare(NCAC_req_t *ncac_
 int NCAC_do_jobs(struct list_head *list, struct list_head *bufcomp_list, struct list_head * comp_list, NCAC_lock *lock);
 int NCAC_do_a_job(NCAC_req_t *req, struct list_head *list, struct list_head *bufcomp_list, struct list_head * comp_list, NCAC_lock *lock);
 
+#define NCAC_COMM_NOT_READY    0
+#define NCAC_READ_PREPARE   1
+#define NCAC_READING        2
+#define NCAC_READ_READY     3
+
 int NCAC_do_one_piece_read(NCAC_req_t *ncac_req, PVFS_offset pos,
-                           PVFS_size size, char **cbufoff,
+                           PVFS_size size, 
+                            PVFS_offset *foff,
+                            char **cbufoff,
                            PVFS_size *cbufsize, struct extent *cbufhash[],
                            int *cbufflag, int *cbufrcnt, int *cbufwcnt, int *cnt);
 

Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/module.mk.in,v
diff -p -u -r1.1 -r1.2
--- module.mk.in	21 Aug 2003 18:57:27 -0000	1.1
+++ module.mk.in	21 Sep 2004 13:46:12 -0000	1.2
@@ -1,11 +1,13 @@
 DIR := src/io/buffer
 SERVERSRC += \
-	$(DIR)/ncac-interface.c \
-	$(DIR)/ncac-trove.c \
-	$(DIR)/ncac-job.c \
-	$(DIR)/ncac-buf-job.c \
-	$(DIR)/ncac-init.c \
-	$(DIR)/internal.c \
-	$(DIR)/cache.c \
-	$(DIR)/state.c \
-	$(DIR)/radix.c
+    $(DIR)/ncac-interface.c \
+    $(DIR)/ncac-trove.c \
+    $(DIR)/ncac-job.c \
+    $(DIR)/ncac-buf-job.c \
+    $(DIR)/ncac-init.c \
+    $(DIR)/internal.c \
+    $(DIR)/cache.c \
+    $(DIR)/ncac-lru.c \
+    $(DIR)/state.c \
+    $(DIR)/radix.c
+

Index: ncac-buf-job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-buf-job.c,v
diff -p -u -r1.1 -r1.2
--- ncac-buf-job.c	21 Aug 2003 18:57:27 -0000	1.1
+++ ncac-buf-job.c	21 Sep 2004 13:46:12 -0000	1.2
@@ -17,6 +17,8 @@
 
 int NCAC_do_a_bufread_job(struct NCAC_req *ncac_req)
 {
+
+#if 0
     int ret;
     int seg, cnt;
     int rcomm=0;
@@ -28,7 +30,9 @@ int NCAC_do_a_bufread_job(struct NCAC_re
     /* only one contiguous segment */
     if ( !ncac_req->offcnt ) { 
         ret = NCAC_do_one_piece_read( ncac_req, ncac_req->pos, 
-									  ncac_req->size, ncac_req->cbufoff, 
+									  ncac_req->size, 
+									  ncac_req->foff, 
+                                        ncac_req->cbufoff, 
 									  ncac_req->cbufsize, ncac_req->cbufhash,
  									  ncac_req->cbufflag, 
 									  ncac_req->cbufrcnt,
@@ -47,6 +51,7 @@ int NCAC_do_a_bufread_job(struct NCAC_re
         for (seg = 0; seg < ncac_req->offcnt; seg ++) {
             ret = NCAC_do_one_piece_read( ncac_req, ncac_req->offvec[seg],
                                           ncac_req->sizevec[seg],
+									        ncac_req->foff + cnt, 
                                           ncac_req->cbufoff + cnt, 
                                           ncac_req->cbufsize + cnt, 
                                           ncac_req->cbufhash + cnt, 
@@ -95,6 +100,7 @@ int NCAC_do_a_bufread_job(struct NCAC_re
 		}
 		
 	}
+#endif
 
     return 0;
 
@@ -103,6 +109,7 @@ int NCAC_do_a_bufread_job(struct NCAC_re
 int NCAC_do_a_bufwrite_job(struct NCAC_req *ncac_req)
 {
 	
+#if 0
     int ret;
     int seg, cnt;
     int rcomm=0;
@@ -181,6 +188,7 @@ int NCAC_do_a_bufwrite_job(struct NCAC_r
 		}
 		
 	}
+#endif
 
     return 0;
 }

Index: ncac-init.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-init.c,v
diff -p -u -r1.3 -r1.4
--- ncac-init.c	23 Mar 2004 04:07:15 -0000	1.3
+++ ncac-init.c	21 Sep 2004 13:46:13 -0000	1.4
@@ -15,7 +15,7 @@ extern int posix_memalign(void **memptr,
 
 /* global variable */
 NCAC_dev_t NCAC_dev;
-struct inode *inode_arr[1000];
+struct inode *inode_arr[MAX_INODE_NUM];
 
 static inline void init_free_extent_list(int num);
 static inline void init_free_req_list(int num);
@@ -76,7 +76,7 @@ int cache_init(NCAC_info_t *info)
     INIT_LIST_HEAD( &NCAC_dev.comp_list);
 
 
-    memset( inode_arr, 0, sizeof(struct inode*)*1000 );
+    memset( inode_arr, 0, sizeof(struct inode*)*MAX_INODE_NUM );
 
     NCAC_dev.get_value = radix_get_value;
     NCAC_dev.max_b     = RADIX_MAX_BITS;

Index: ncac-interface.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-interface.c,v
diff -p -u -r1.1 -r1.2
--- ncac-interface.c	21 Aug 2003 18:57:27 -0000	1.1
+++ ncac-interface.c	21 Sep 2004 13:46:13 -0000	1.2
@@ -222,9 +222,7 @@ int cache_req_test(cache_request_t *requ
     else 
 		*flag = 0;
 
-
     return 0;
-
 }
 
 int cache_req_testsome(int count, 

Index: ncac-job.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-job.c,v
diff -p -u -r1.2 -r1.3
--- ncac-job.c	17 Nov 2003 19:19:29 -0000	1.2
+++ ncac-job.c	21 Sep 2004 13:46:13 -0000	1.3
@@ -1,4 +1,4 @@
-/* This file defines the horseworker for each particular type of jobs.  */
+/* This file defines the workhorses for each particular type of job.  */
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -12,347 +12,141 @@
 #include "ncac-trove.h"
 
 extern struct cache_stack global_cache_stack;
-extern struct inode *inode_arr[1000];
 
 /* internal functions */
-static inline struct extent * NCAC_find_get_ext(NCAC_req_t *ncac_req, unsigned long index);
-static inline struct extent * NCAC_alloc_ext(NCAC_req_t *ncac_req);
-static inline struct extent * NCAC_alloc_ext_wait(NCAC_req_t *ncac_req);
-static inline int NCAC_add_to_cache(struct extent * extent,unsigned long index, NCAC_req_t *ncac_req);
-
+static inline struct extent *find_extent(NCAC_req_t *ncac_req,
+                       unsigned long index);
+static inline struct extent *allocate_extent(NCAC_req_t *ncac_req, 
+                       int flag);
+static inline int free_extent(NCAC_req_t *ncac_req,
+                       struct extent *extent);
+static inline int init_extent_read(NCAC_req_t *ncac_req,
+                struct extent *extent, PVFS_offset foffset, PVFS_size size);
+static inline void set_extent_read_pending(struct extent *extent);
+static inline int check_extent_read(NCAC_req_t *ncac_req, struct extent *extent);
+static inline void increase_read_reference(struct extent *extent);
+static inline int add_extent_to_cache(struct extent * extent,
+         unsigned long index, NCAC_req_t *ncac_req, int policy);
 
 /* do a read job.
- * return: <0 error code
+ * return: < 0 error code
  *         0: ok
  * ncac_req->status shows the current status of the job
  * ncac_req->error shows the current error of the job if any.
  *
  * Lock stuff: A design choice has been made to do locks as follows:
  *     1) each inode has a lock;
- *     2) each cache stack has a lock (many inodes may share a same cache stack).
- * To avoid lock calls on each extent, we had a sort of "big" lock across jobs on an inode.
- * During a job processing, if the cache stack is touched, the job should acquire the cache
- * stack lock. So the lock order is:
- *     inode lock
+ *     2) each cache stack has a lock (many inodes may share a same 
+ *	cache stack).
+ * To avoid lock calls on each extent, we had a sort of "big" lock 
+ * across jobs on an inode. During a job processing, if the cache stack 
+ * is touched, the job should acquire the cache stack lock. So the lock 
+ * order is:
+ *	inode lock
  *             ----> cache stack lock
  *             ----> release cache stack lock
- *     release inode lock
+ *  release inode lock
  * 
- *  So, we make a tradeoff between the number of lock calls and the granularity of lock.
+ *  So, we make a tradeoff between the number of lock calls and the 
+ *  granularity of lock.
  */
 
 int NCAC_do_a_read_job(struct NCAC_req *ncac_req)
 {
-    int ret;
-    int seg, cnt;
-    int rcomm=0;
-
-	inode_lock(&ncac_req->mapping->lock);
-	
-    /* only one contiguous segment */
-    if ( !ncac_req->offcnt ) { 
-        ret = NCAC_do_one_piece_read( ncac_req, ncac_req->pos, 
-									  ncac_req->size, ncac_req->cbufoff, 
-									  ncac_req->cbufsize, ncac_req->cbufhash,
- 									  ncac_req->cbufflag, 
-									  ncac_req->cbufrcnt,
-									  ncac_req->cbufwcnt,
-									  &cnt);
-        if ( ret < 0) {
-            ncac_req->error = NCAC_JOB_PROCESS_ERR;
-            ncac_req->status = NCAC_ERR_STATUS;
-
-			inode_unlock( &ncac_req->mapping->lock );
-
-            return ret;
-        }
-    }else{
-
-        /* Handle each contiguous piece one by one. */
-        
-        cnt = 0;
-        for (seg = 0; seg < ncac_req->offcnt; seg ++) {
-            ret = NCAC_do_one_piece_read( ncac_req, ncac_req->offvec[seg],
-                                          ncac_req->sizevec[seg],
-                                          ncac_req->cbufoff + cnt, 
-                                          ncac_req->cbufsize + cnt, 
-                                          ncac_req->cbufhash + cnt, 
-                                          ncac_req->cbufflag + cnt,
-									      ncac_req->cbufrcnt + cnt,
-									      ncac_req->cbufwcnt + cnt, &seg);
-            if ( ret < 0) {
-            	ncac_req->error = NCAC_JOB_PROCESS_ERR;
-            	ncac_req->status = NCAC_ERR_STATUS;
-
-			    inode_unlock( &ncac_req->mapping->lock );
-
-            	return ret;
-            }
-            cnt += seg;
-        }
-    }
+	int ret;
+    struct extent **cbufhash;
+    PVFS_offset *foff;
+    int *cbufflag;
+    struct extent *new_extent;
+    struct extent *last_extent;
 
-	inode_unlock(&ncac_req->mapping->lock);
-
-    for (seg = 0; seg < ncac_req->cbufcnt; seg ++)
-         if (ncac_req->cbufflag[seg] == 1) rcomm++;
-         
-    if (rcomm == ncac_req->cbufcnt) ncac_req->status = NCAC_BUFFER_COMPLETE;
-    else if (!rcomm) ncac_req->status = NCAC_REQ_SUBMITTED;
-    else ncac_req->status = NCAC_PARTIAL_PROCESS;
-
-    return 0;
-}
-
-/* NCAC_do_one_piece_read(): handle one contiguous block.
- * return:
- *    < 0: error
- *    = 0: no error
- *    at the same time, ncac_req->error shows error no if any.
- *    ncac_req->status shows the status of this one piece.
- *
- *    TODO: 1) use gang lookup;
- *          2) allocate contiguous extents from a bigger buffer
- */ 
-
-int NCAC_do_one_piece_read(NCAC_req_t *ncac_req, PVFS_offset pos, 
-                           PVFS_size size, char **cbufoff, 
-                           PVFS_size *cbufsize, struct extent *cbufhash[],
-                           int *cbufflag,
-                           int *cbufrcnt,
-                           int *cbufwcnt,
-                           int *cnt)
-{
     unsigned long index;
-    unsigned int offset = 0, nr = 0;
-    struct extent *cached_ext;
-    struct extent *extent;
-    int error;
-    int ret;
-
-    struct aiovec aiovec_arr, *aiovec;
-    int ioreq;
-
-    int cbufcnt;
-    int toread=0;
-    int slots;
-    int i, j;
-    
-    PVFS_offset oldpos = pos;
-
-
-    aiovec = &aiovec_arr;
-    aiovec_init(aiovec);
-
-    cbufcnt = (pos+size+ NCAC_dev.extsize -1)/NCAC_dev.extsize - pos/NCAC_dev.extsize;
-    *cnt = cbufcnt;
-
-    cached_ext = NULL;
-    index =  pos >> NCAC_dev.extlog2;
+    int comcnt, readcnt;
+    int i;
     
-    DPRINT("one_piece_read: pos=%Ld, sindex=%ld  cnt=%d\n", pos, index, cbufcnt);
-    for (i=0; i< cbufcnt; i++) {
-
-        if ( cbufhash[i] ) {
 
-            DPRINT("Read recheck: cbufrcnt[%d]=%d, cbufwcnt[%d]=%d, e.rcmp=%d, e.wcmp=%d, extent flags=%lx (cbufflag=%d)\n", i, cbufrcnt[i], i, cbufwcnt[i], cbufhash[i]->rcmp, cbufhash[i]->wcmp, cbufhash[i]->flags, cbufflag[i]);
-
-
-            /* still previous writes pending  on this */
-            if ( cbufwcnt[i] >  cbufhash[i]->wcmp )  {
-	            index ++;
-				pos += nr;
-				continue;
-            }
-
-            if ( cbufwcnt[i] <  cbufhash[i]->wcmp ) {
-                NCAC_error("Error: wcnt should not be less than cmp\n");
-	            index ++;
-				pos += nr;
-				continue;
+    /* Even though there are "comcnt" communication buffers, the
+     * number of extents needed may be smaller.
+     */
+    comcnt = ncac_req->cbufcnt;
+    fprintf(stderr, "NCAC_do_a_read_job: enter (comcnt=%d)\n", comcnt);
+
+    cbufhash = ncac_req->cbufhash;
+    foff = ncac_req->foff;
+    cbufflag = ncac_req->cbufflag;
+
+    inode_lock (&ncac_req->mapping->lock);
+
+    last_extent = NULL;
+    for (i=0; i<comcnt; i++){
+        if ( NULL == cbufhash[i] ){
+            index = foff[i] >> NCAC_dev.extlog2;
+		    new_extent = find_extent(ncac_req, index);
+            if ( NULL == new_extent ){ /* not cached */
+			    new_extent= allocate_extent(ncac_req,BLOCKING_EXTENT_ALLOC);
+			    if ( new_extent ){
+				    new_extent->index = index;
+				    new_extent->mapping = ncac_req->mapping;
+                    new_extent->ioreq = INVAL_IOREQ;
+
+				    ret = init_extent_read(ncac_req, new_extent,
+                                         foff[i], NCAC_dev.extsize);
+                    if ( ret < 0 ) {
+				        NCAC_error("init_extent_read error ext:%p\n",
+                                 new_extent);
+
+                        free_extent(ncac_req, new_extent);
+	                    inode_unlock (&ncac_req->mapping->lock);
+                        return ret;
+                    }
+                    add_extent_to_cache(new_extent, index, ncac_req,
+                                        LRU_POLICY);
+                    set_extent_read_pending(new_extent);
+                    cbufhash[i] = new_extent;
+                }
+            }else{ /* cached */
+                cbufhash[i] = new_extent;
+                hit_cache_item(new_extent, LRU_POLICY);
             }
 
-            extent = cbufhash[i];
-            offset = cbufoff[i] - extent->addr;
-			nr = cbufsize[i];
-
-            DPRINT("recheck: offset=%p, nr=%d extent=%p\n", cbufoff[i], nr, extent);
-		    error = NCAC_extent_read_access_recheck(ncac_req, extent, offset, nr);
-		    if (error < 0){
-			    ncac_req->error = error;	
-                NCAC_error("NCAC_extent_read_access_recheck error  extent=%p\n", extent);
-			    return error;
-		    }
-            cbufflag[i] = error;
-
-            DPRINT("Read recheck: cbufrcnt[%d]=%d, cbufwcnt[%d]=%d, e.rcmp=%d, e.wcmp=%d, extent flags=%lx (cbufflag=%d)\n", i, cbufrcnt[i], i, cbufwcnt[i], cbufhash[i]->rcmp, cbufhash[i]->wcmp, cbufhash[i]->flags, cbufflag[i]);
-            
-            index ++;
-            pos += nr;
-            continue;
-        }
-
-
-        offset = (unsigned long)pos & (NCAC_dev.extsize -1);
-        nr = cbufsize[i];
-
-		/* try to find an cached extent. If cached, the reference count is
-		 * added.
-		 */
-        extent = NCAC_find_get_ext(ncac_req, index);
-
-        if (extent == NULL) {
-            goto no_cached_extent;
+            /* only one reference for each request */
+            if ( cbufhash[i] && cbufhash[i] != last_extent )
+                increase_read_reference(cbufhash[i]); 
+            last_extent = cbufhash[i];
         }
 
-
-		/* the extent is cached */
-		error = NCAC_extent_read_access(ncac_req, extent, offset, nr);
-		if (error < 0){
-			extent_ref_release( extent );	
-			ncac_req->error = error;	
-			return error;
-		}
-
-        DPRINT("index=%ld is cached: extent flags:%lx reads=%d, writes=%d, rcmp=%d, wcmp=%d\n", index, extent->flags, extent->reads, extent->writes, extent->rcmp, extent->wcmp);
-
-        cbufflag[i] = error; /* maybe ready, maybe not */
-        cbufhash[i] = extent;
-
-        cbufrcnt[i] = extent->reads;
-        cbufwcnt[i] = extent->writes;
-
-        cbufoff[i] += (unsigned long)extent->addr;
-
-		/* prepare for the next extent */
-		index += 1;
-        pos += nr;
-
-        continue; /* continue for the next extent */
-
-no_cached_extent:
-        /* the extent was not cached. we need to create a new extent. */
-
-        if (!cached_ext) {
-            cached_ext = NCAC_alloc_ext_wait(ncac_req);
-            if (cached_ext) {
-            	NCAC_extent_first_read_access(ncac_req, cached_ext);
-				cached_ext->index = index;
-				cached_ext->mapping = ncac_req->mapping;
-			}
+        if ( cbufhash[i] ){
+            ret = 1;
+            if ( PageReadPending(cbufhash[i]) ){
+                fprintf(stderr, "extent:%p ioreq:%Ld\n", cbufhash[i], cbufhash[i]->ioreq);
+                ret = check_extent_read(ncac_req, cbufhash[i]);
+                if (ret < 0){
+				    ncac_req->error = ret;	
+				    NCAC_error("check_read_pending extent=%p\n", cbufhash[i]);
+
+                    inode_unlock (&ncac_req->mapping->lock);
+				    return ret;
+			    }
+            }
+			cbufflag[i] = ret;
 		}
-
-        extent = cached_ext;
-        cached_ext = NULL;
-
-        cbufhash[i] = extent;
-        if ( extent ){
-            cbufoff[i] += (unsigned long)extent->addr;
-            cbufflag[i] = 0; /* not ready for communication */
-
-            cbufhash[i]->ioreq = INVAL_IOREQ;
-
-            toread ++;
-
-            cbufrcnt[i] = extent->reads;
-            cbufwcnt[i] = extent->writes;
-        }
-     
-		/* prepare for the next extent */
-		index += 1;
-        pos += nr;
     }
 
-    if ( !toread ) return 0;
-
-    pos = oldpos;
-    for (i = 0; i < cbufcnt; i++ ) {
-
-        if ( cbufhash[i] && PageBlank(cbufhash[i]) ) {
-
-	    	 slots = aiovec_add(aiovec, cbufhash[i], pos, cbufsize[i], cbufoff[i], cbufsize[i]);
-
-             DPRINT("do_a_job: going to read (%Ld %Ld) to %p\n", pos, cbufsize[i], cbufoff[i]);
-             pos += cbufsize[i];
-
-             if (!slots){
-                 ret = NCAC_aio_read_ext(ncac_req->coll_id, ncac_req->handle, ncac_req->context_id, aiovec, &ioreq);
-		         if ( ret < 0 ) {
-                     ncac_req->error = NCAC_TROVE_AIO_REQ_ERR;
-			         ncac_req->status = NCAC_ERR_STATUS;
-					
-					 NCAC_error("aio_read_ext error\n");
-
-        		     aiovec_init(aiovec);
-                     return ret;
-		         }else{
-                     aiovec->extent_array[0]->ioreq = ioreq;
-                     extent = aiovec->extent_array[0];
-                     extent->ioreq_next = extent;
-			         for (j = 1; j < aiovec_count(aiovec); j ++) {
-                         aiovec->extent_array[j]->ioreq = ioreq;
-
-                         aiovec->extent_array[j-1]->ioreq_next = aiovec->extent_array[j];
-			         }
-
-			         aiovec->extent_array[aiovec_count(aiovec)-1]->ioreq_next = aiovec->extent_array[0]; 
-                 }
-
-                 DPRINT("do_a_job: aio_read cnt=%d\n", aiovec_count(aiovec));
-        		 aiovec_init(aiovec);
-             }
-		}
-	}
-
-    DPRINT("do_one_piece_read: aio_read cbufcnt=%d, cnt=%d\n", cbufcnt, aiovec_count(aiovec));
-
-    ioreq = INVAL_IOREQ;
-
-    if (aiovec_count(aiovec)){
-        ret = NCAC_aio_read_ext(ncac_req->coll_id, ncac_req->handle, ncac_req->context_id, aiovec, &ioreq);
-	    if ( ret < 0 ) {
-            ncac_req->error = NCAC_TROVE_AIO_REQ_ERR;
-	        ncac_req->status = NCAC_ERR_STATUS;
-            aiovec_init(aiovec);
-
-            NCAC_error("do_one_piece_read: NCAC_aio_read_ext error\n");
-
-            return ret;
-	    }else{
-            aiovec->extent_array[0]->ioreq = ioreq;
-            extent = aiovec->extent_array[0];
-            extent->ioreq_next = extent;
-	        for (i= 1; i < aiovec_count(aiovec); i ++) {
-                aiovec->extent_array[i]->ioreq = ioreq;
-
-                aiovec->extent_array[i-1]->ioreq_next = aiovec->extent_array[i];
-            }
+	inode_unlock (&ncac_req->mapping->lock);
 
-            aiovec->extent_array[aiovec_count(aiovec)-1]->ioreq_next = aiovec->extent_array[0]; 
-        }
-        aiovec_init(aiovec);
+    readcnt = 0;
+    for (i=0; i<comcnt; i++){
+        if (ncac_req->cbufflag[i]) readcnt++;
     }
 
-    /* add to cache */
-    for (i = 0; i < cbufcnt; i++ ) {
-        if ( cbufhash[i] && PageBlank(cbufhash[i]) ) {
-             ClearPageBlank(cbufhash[i]);
-           	 ret = NCAC_add_to_cache(cbufhash[i], cbufhash[i]->index, ncac_req);
-
-           	 if ( ret < 0 ) {
-                 ncac_req->error = NCAC_CACHE_ERR;
-
-                 NCAC_error("do_one_piece_read: add_to_cache error: index=%ld\n", index);
-
-               	 return ret;
-           	}
-		}
-	}
+    if (readcnt == ncac_req->cbufcnt) ncac_req->status = NCAC_BUFFER_COMPLETE;
+    else if (!readcnt) ncac_req->status = NCAC_REQ_SUBMITTED;
+    else ncac_req->status = NCAC_PARTIAL_PROCESS;
 
+    fprintf(stderr, "NCAC_do_a_read_job: exit\n");
 	return 0;
 }
 
-
 /* do a write job.
  * return: <0 error code
  *         0: ok
@@ -362,268 +156,11 @@ no_cached_extent:
 
 int NCAC_do_a_write_job(struct NCAC_req *ncac_req)
 {
-    int ret;
-    int seg, cnt;
-    int rcomm=0;
-
-	inode_lock(&ncac_req->mapping->lock);
-
-    /* only one contiguous segment */
-    if ( !ncac_req->offcnt ) { 
-        ret = NCAC_do_one_piece_write(  ncac_req, ncac_req->pos, 
-										ncac_req->size, 
-										ncac_req->cbufoff, ncac_req->cbufsize, 
-										ncac_req->cbufhash, ncac_req->cbufflag, 
-									    ncac_req->cbufrcnt,
-									    ncac_req->cbufwcnt,
-										&cnt );
-        if ( ret < 0) {
-            ncac_req->error = NCAC_JOB_PROCESS_ERR;
-            ncac_req->status = NCAC_ERR_STATUS;
-
-			inode_unlock(&ncac_req->mapping->lock);
-
-            return ret;
-        }
-    }else{
-
-        /* Handle each contiguous piece one by one. */
-        
-        cnt = 0;
-        for (seg = 0; seg < ncac_req->offcnt; seg ++) {
-            ret = NCAC_do_one_piece_write( ncac_req, ncac_req->offvec[seg],
-                                                ncac_req->sizevec[seg],
-                                                ncac_req->cbufoff + cnt, 
-                                                ncac_req->cbufsize + cnt, 
-                                                ncac_req->cbufhash + cnt, 
-                                                ncac_req->cbufflag + cnt, 
-									    		ncac_req->cbufrcnt + cnt,
-									    		ncac_req->cbufwcnt + cnt,
-												&seg );
-            if ( ret < 0) {
-            	ncac_req->error = NCAC_JOB_PROCESS_ERR;
-            	ncac_req->status = NCAC_ERR_STATUS;
-
-				inode_unlock(&ncac_req->mapping->lock);
-
-            	return ret;
-            }
-            cnt += seg;
-        }
-    }
-
-	inode_unlock(&ncac_req->mapping->lock);
-
-    for (seg = 0; seg < ncac_req->cbufcnt; seg ++)
-         if (ncac_req->cbufflag[seg] == 1 ) rcomm++;
-         
-    if (rcomm == ncac_req->cbufcnt) ncac_req->status = NCAC_BUFFER_COMPLETE;
-    else if (!rcomm) ncac_req->status = NCAC_REQ_SUBMITTED;
-    else ncac_req->status = NCAC_PARTIAL_PROCESS;
-
     return 0;
 
 } /* end of  do_a_write_job */
 
 
-/* NCAC_do_one_piece_write(): handle one contiguous block write.
- * return:
- *    < 0: error
- *    = 0: no error
- *    at the same time, ncac_req->error shows error no if any.
- *    ncac_req->status shows the status of this one piece.
- *
- *    TODO: 1) use gang lookup;
- *          2) allocate contiguous extents from a bigger buffer
- */ 
-
-int NCAC_do_one_piece_write(NCAC_req_t *ncac_req, PVFS_offset pos, 
-                            PVFS_size size, char **cbufoff, 
-                            PVFS_size *cbufsize, struct extent *cbufhash[],
-                            int *cbufflag, 
-							int *cbufrcnt,
-							int *cbufwcnt,
-							int *cnt)
-{
-    unsigned long index, offset, nr;
-    struct extent *cached_ext;
-    struct extent *extent;
-    int error;
-    int ret;
-
-    struct aiovec *aiovec;
-    int ioreq;
-
-    int cbufcnt;
-    int i;
-
-    /* each inode has an aiovec */
-    aiovec = get_aiovec(ncac_req);
-    aiovec_init(aiovec);
-
-    cbufcnt = (pos+size+ NCAC_dev.extsize -1)/NCAC_dev.extsize - pos/NCAC_dev.extsize;
-    *cnt = cbufcnt;
-
-    cached_ext = NULL;
-    index =  pos >> NCAC_dev.extlog2;
-    
-    for (i=0; i< cbufcnt; i++) {
-		nr = cbufsize[i];
-
-        if ( cbufhash[i] ) { /* extent is avaiable. */
-
-            DPRINT("Write recheck: cbufrcnt[%d]=%d, cbufwcnt[%d]=%d, e.rcmp=%d, e.wcmp=%d, extent flags=%lx (cbufflag=%d)\n", i, cbufrcnt[i], i, cbufwcnt[i], cbufhash[i]->rcmp, cbufhash[i]->wcmp, cbufhash[i]->flags, cbufflag[i]);
-
-			/* ugly here: 1: ok, 2: read-modify-write */
-            if ( cbufflag[i] == 1 ) { /* has been assigned */
-	            index ++;
-				pos += nr;
-				continue;
-            }
-
-
-            /* Are previous reads and writes pending on this? 
-             * "+1" to exclude the request itself.
-             */
-            if ( cbufwcnt[i] >  cbufhash[i]->wcmp + 1 || 
-                 cbufrcnt[i] >  cbufhash[i]->rcmp  )  {
-	            index ++;
-				pos += nr;
-				continue;
-            }
-
-            /* this is only for error check */
-            if ( cbufwcnt[i] <  cbufhash[i]->wcmp + 1 ||
-                 cbufrcnt[i] <  cbufhash[i]->rcmp ) {
-                NCAC_error("Error: r/wcnt should not be less than r/wcmp\n");
-	            index ++;
-				pos += nr;
-				continue;
-            }
-
-            /* no other pending read or writes on this extent */
-            extent = cbufhash[i];
-            offset = cbufoff[i] - extent->addr;
-
-		    error = NCAC_extent_write_access_recheck(ncac_req, extent, offset, nr);
-		    if (error < 0){
-			    ncac_req->error = error;	
-                fprintf(stderr, "NCAC_extent_read_access_recheck error  extent=%p\n", extent);
-			    return error;
-		    }
-            cbufflag[i] = error;
-
-            DPRINT("Write recheck: cbufrcnt[%d]=%d, cbufwcnt[%d]=%d, e.rcmp=%d, e.wcmp=%d, extent flags=%lx (cbufflag=%d)\n", i, cbufrcnt[i], i, cbufwcnt[i], cbufhash[i]->rcmp, cbufhash[i]->wcmp, cbufhash[i]->flags, cbufflag[i]);
-
-			
-            
-            index ++;
-            pos += nr;
-            continue;
-        }
-
-
-        offset = (unsigned long)pos & (NCAC_dev.extsize -1);
-
-        extent = NCAC_find_get_ext(ncac_req, index);
-
-        if (extent == NULL) {
-            goto no_cached_extent;
-        }
-
-		/* the extent is cached */
-		error = NCAC_extent_write_access(ncac_req, extent, offset, nr);
-		if (error < 0){
-			ncac_req->error = error;	
-			return error;
-		}
-
-        cbufflag[i] = error; /* 1 for ready, 0 for not ready */
-        cbufhash[i] = extent;
-
-
-        /* how many reads and writes pending on this extent before
-         * this request */
-        cbufrcnt[i] = extent->reads;
-        cbufwcnt[i] = extent->writes;
-
-        DPRINT("index=%ld is cached: extent flags:%lx cbufflag=%d, reads=%d, writes=%d, rcmp=%d, wcmp=%d\n", index, extent->flags, cbufflag[i], extent->reads, extent->writes, extent->rcmp, extent->wcmp);
-
-        cbufoff[i] += (unsigned long)extent->addr;
-
-		/* prepare for the next extent */
-		index += 1;
-        pos += nr;
-
-        continue; /* continue for the next extent */
-
-no_cached_extent:
-        /* the extent was not cached. we need to create a new extent. */
-
-        if (!cached_ext) {
-            cached_ext = NCAC_alloc_ext_wait(ncac_req);
-            if (!cached_ext) {
-		        cbufflag[i] = 0;
-            }else{
-            	NCAC_extent_first_write_access(ncac_req, cached_ext);
-
-				cached_ext->index = index;
-				cached_ext->mapping = ncac_req->mapping;
-
-		        cbufflag[i] = 1;
-
-		        cbufrcnt[i] = cached_ext->reads;
-        		cbufwcnt[i] = cached_ext->writes;
-
-
-	            /* deal with read, modify and write. In the case if the write size is
-		         * not the whole write unit, we should read it first, modify it, and 
-		         * then write.
-		         */
-                if ( cbufflag[i] && ( cbufoff[i] || cbufsize[i] <  NCAC_dev.extsize )){
-			        DPRINT("--------do read-modify-write\n");
-
-			        do_read_for_rmw(ncac_req->coll_id, 
-									ncac_req->handle, 
-									ncac_req->context_id, 
-									cached_ext, 
-									pos, 
-									cbufoff[i], 
-									cbufsize[i], 
-									&ioreq);
-			        mark_extent_rmw_lock(cached_ext, ioreq); 
-			        cbufflag[i] = 2;
-		        }
-                cbufoff[i] += (unsigned long)cached_ext->addr;
-
-            	ret = NCAC_add_to_cache(cached_ext,index, ncac_req);
-
-            	if (ret) {
-		            cbufflag[i] = 0;
-                    cbufhash[i] = 0;
-               		ncac_req->error = NCAC_CACHE_ERR;
-                	return ret;
-            	}
-				ncac_req->nr_dirty ++;
-
-                DPRINT("index=%ld is NOT cached: extent flags:%lx reads=%d, writes=%d, rcmp=%d, wcmp=%d, cbufoff=%p, flag=%d, size=%Ld pos=%Ld\n", index, cached_ext->flags, cached_ext->reads, cached_ext->writes, cached_ext->rcmp, cached_ext->wcmp, cbufoff[i], cbufflag[i], cbufsize[i], pos);
-			}
-		}
-
-        extent = cached_ext;
-        cached_ext = NULL;
-
-        cbufhash[i] = extent;
-     
-		/* prepare for the next extent */
-		index += 1;
-        pos += nr;
-    }
-
-	return 0;
-
-} /* end of do_one_piece_write */
-
 int NCAC_do_a_query_job(struct NCAC_req *ncac_req)
 {
     NCAC_error("NCAC_do_a_query_job: not implemented yet\n");
@@ -642,73 +179,40 @@ int NCAC_do_a_sync_job(struct NCAC_req *
     return 0;
 }
 
-/* some internal functions */
 
-/* NCAC_find_get_ext(): try to find an extent from the inode cache tree.
- * This operation is protected by the inode lock. The caller should acquire
- * the inode lock.
+/* 
+ * find_extent(): try to find an extent from the inode cache tree.
+ * This operation is protected by the inode lock. The caller should 
+ * acquire the inode lock.
  */
-static inline struct extent * NCAC_find_get_ext(NCAC_req_t *ncac_req, unsigned long index)
+static inline struct extent *find_extent(NCAC_req_t *ncac_req, 
+                                        unsigned long index)
 {
     struct extent *avail;
 
     avail = lookup_cache_item(ncac_req->mapping, index);
-
-#if 0 /* take this back when we have finer lock */
-	if ( avail ) { /* add its reference count to prevent disappearance */
-		extent_ref_get( avail );
-	}
-#endif
-
     return avail;
-
 }
 
 
-/* NCAC_alloc_ext(): get a new extent
- * The caller should have an inode lock
- */
-
-static inline struct extent * NCAC_alloc_ext(NCAC_req_t *ncac_req)
-{
-    struct extent *new = NULL;
-	struct cache_stack *cache;
-    char *buf;
-
-	cache = ncac_req->mapping->cache_stack;
-
-    if ( !list_empty( &cache->free_extent_list ) ) {
-
-		cache_lock( &cache->lock);
-
-        new = get_free_extent_list_item( &(cache->free_extent_list) );
-
-		cache_unlock(&cache->lock);
-	}	
-
-    if (!new) return NULL;
-
-    buf = new->addr;
-    memset(new, 0, sizeof(struct extent));
-    new->addr = buf;
-    SetPageBlank(new);
-    fprintf(stderr, "new extent:%p, flags:%lx\n", new, new->flags);
-	return new;
-}
-
-
-/* NCAC_alloc_ext_wait(): if no extent is avaiable, discard some if possible. 
- * Lock problem is a little more difficult than others since this funtion may
- * interact with the inode resource and the cache resource.
+/* 
+ * allocate_extent(): get a new extent. The caller should have 
+ * an inode lock. The flag is either BLOCKING_EXTENT_ALLOC or
+ * NONBLOCKING_EXTENT_ALLOC.
+ * If "flag" is BLOCKING_EXTENT_ALLOC and no extent is available,
+ * discard some if possible. Locking is a little more difficult here
+ * than elsewhere since this function may interact with both the inode
+ * resource and the cache resource.
  * 
  * This function is called by functions which holds its inode lock,
  * only cache stack lock is needed.
- * .
  */
-static inline struct extent * NCAC_alloc_ext_wait(NCAC_req_t *ncac_req)
+
+static inline struct extent *allocate_extent(NCAC_req_t *ncac_req, int flag)
 {
     struct extent *new = NULL;
 	struct cache_stack *cache;
+    int shrinked;
 
     char *buf;
     int ret;
@@ -716,12 +220,9 @@ static inline struct extent * NCAC_alloc
 	cache = ncac_req->mapping->cache_stack;
 
     if ( !list_empty( &cache->free_extent_list ) ) {
-
 		cache_lock( &cache->lock);
-
         new = get_free_extent_list_item( &(cache->free_extent_list) );
-
-		cache_unlock(&cache->lock);
+	    cache_unlock(&cache->lock);
 
         if ( new ) {
     		buf = new->addr;
@@ -731,46 +232,102 @@ static inline struct extent * NCAC_alloc
     		DPRINT("new extent:%p, flags:%lx\n", new, new->flags);
 			return new;
 		}
+    }
+
+    /* No free extent so far */
+    if ( BLOCKING_EXTENT_ALLOC == flag ){
 
-	}
+		cache_lock( &cache->lock);
+        ret = shrink_cache(cache, DELT_DISCARD_NUM, LRU_POLICY, &shrinked); 
+        if ( ret < 0 ) {
+            ncac_req->error = ret;
+		    cache_unlock(&cache->lock);
+            return NULL;
+        }
+        new = get_free_extent_list_item( &(ncac_req->mapping->cache_stack->free_extent_list) );
+	    cache_unlock(&cache->lock);
 
-	cache_lock( &cache->lock);
+	    if ( !new ) return NULL;
+        else {
+    		buf = new->addr;
+   			memset(new, 0, sizeof(struct extent));
+    		new->addr = buf;
+    		SetPageBlank(new);
+    		DPRINT("new extent:%p, flags:%lx\n", new, new->flags);
+			return new;
+	    }
+    }
 
-    ret = try_to_discard_extents(cache, DELT_DISCARD_NUM); 
-    if ( ret < 0 ) {
-        ncac_req->error = ret;
-		cache_unlock(&cache->lock);
-        return NULL;
-	}
+    return NULL;
+}
 
-    new = get_free_extent_list_item( &(ncac_req->mapping->cache_stack->free_extent_list) );
+/* add it later 
+ * free_extent: return an extent to a list
+ */
+static inline int free_extent(NCAC_req_t *ncac_req,struct extent *extent)
+{
+    return 0;
+}
 
-	cache_unlock(&cache->lock);
+/* 
+ * init_extent_read: initiate trove request to read an extent. The
+ * file offset is "foffset", and the size is "size".
+ */
+static inline int init_extent_read(NCAC_req_t *ncac_req, 
+                   struct extent *extent, PVFS_offset foffset, PVFS_size size)
+{
+    int ret;
+    PVFS_id_gen_t ioreq;
 
-	if ( !new ) return NULL;
 
-    buf = new->addr;
-    memset(new, 0, sizeof(struct extent));
-    new->addr = buf;
-    new->ioreq = INVAL_IOREQ;
-    SetPageBlank(new);
-    DPRINT("new extent:%p, flags:%lx, ioreq=%d\n", new, new->flags, new->ioreq);
-	return new;
+    ret = init_io_read(ncac_req->coll_id, ncac_req->handle, 
+            ncac_req->context_id, foffset, size, extent->addr, &ioreq);
+    if ( ret < 0 ) {
+        NCAC_error("init_io_read error\n");
+        return ret;
+    }
+    extent->ioreq = ioreq;
+    fprintf(stderr, "init_extent_read: foff:%Ld, size:%Ld, extent:%p, opid:%Ld\n", foffset, size, extent, ioreq);
+    return 0;
 }
 
+static inline void set_extent_read_pending(struct extent *extent)
+{
+    ClearPageBlank(extent);
+	SetPageReadPending(extent);
+}
 
-
-static inline int NCAC_add_to_cache(struct extent * extent,unsigned long index, NCAC_req_t *ncac_req)
+/* check_extent_read: poll a pending read; 1 = done, 0 = pending, < 0 = error */
+static inline int check_extent_read(NCAC_req_t *ncac_req, struct extent *extent)
 {
     int ret;
 
-    ret = add_cache_item(extent, ncac_req->mapping, index);
+    ret = NCAC_check_ioreq(extent);
+    if ( ret <= 0 )
+        return ret; /* 0: still pending; < 0: hand the I/O error to the caller */
+    ClearPageReadPending(extent);
+    SetPageClean(extent);
+    return 1;
+}
 
-    return ret;
+static inline void increase_read_reference(struct extent *extent)
+{
+    extent->reads ++;
+    return;
+}
+
+/* increase_write_reference: take one more write reference on "extent". */
+static inline void increase_write_reference(struct extent *extent)
+{
+    extent->writes ++;
+}
 
-static inline int NCAC_read_ext(struct extent *extent, PVFS_offset offset, unsigned long nr)
+static inline int add_extent_to_cache(struct extent * extent,
+            unsigned long index, NCAC_req_t *ncac_req, int policy)
 {
-   extent->ioreq = 0;
-   return 0;
+    int ret;
+
+    ret = add_cache_item(extent, ncac_req->mapping, index, policy);
+
+    return ret;
 }

Index: ncac-trove.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-trove.c,v
diff -p -u -r1.1 -r1.2
--- ncac-trove.c	21 Aug 2003 18:57:27 -0000	1.1
+++ ncac-trove.c	21 Sep 2004 13:46:13 -0000	1.2
@@ -271,7 +271,6 @@ int NCAC_check_ioreq(struct extent *exte
     int count;
     int ret;
 
-
     op_id = extent->ioreq;
 
     if ( op_id == INVAL_IOREQ ) {
@@ -287,8 +286,8 @@ int NCAC_check_ioreq(struct extent *exte
     ret = trove_dspace_test(coll_id, op_id, context_id, &count, NULL, NULL, &state, TROVE_DEFAULT_TEST_TIMEOUT);
 
     if ( ret > 0 ) {
-    	DPRINT("++++++++++++NCAC_check_ioreq: finished %Ld\n", op_id);
-	extent->ioreq = INVAL_IOREQ;
+    	fprintf(stderr, "++++++++++++NCAC_check_ioreq: finished %Ld\n", op_id);
+        extent->ioreq = INVAL_IOREQ;
     }
 
     return ret;
@@ -372,4 +371,29 @@ static inline void  offset_shorten( int 
     *new_m_cnt = seg;
 
     return;
+}
+
+int init_io_read( PVFS_fs_id coll_id, PVFS_handle handle, 
+        PVFS_context_id context, PVFS_offset foffset, 
+        PVFS_size size, void *buf, TROVE_op_id *ioreq)
+{   /* Start a read through trove_bstream_read_at() using buf, size and foffset; returns 0 or -1. */
+    void *user_ptr_array[1] = { (char *) 13 };   /* NOTE(review): 13 is an unexplained placeholder value */
+    int ret;
+
+    ret = trove_bstream_read_at(coll_id,
+                               handle,
+                               buf,
+                                &size,   /* address of the by-value parameter: any size written back is not visible to the caller */
+                                foffset,
+                                  0, /* flags */
+                                  NULL, /* vtag */
+                                  user_ptr_array,   /* NOTE(review): local array -- confirm trove does not keep this pointer after return */
+                                  context,
+                                  ioreq);   /* presumably receives the operation id -- confirm against the trove API */
+
+    if (ret < 0) {
+        NCAC_error("trove read at failed\n");
+        return -1;   /* the specific negative code held in ret is dropped */
+    }
+    return 0;   /* every non-negative ret maps to 0 */
 }

Index: ncac-trove.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/ncac-trove.h,v
diff -p -u -r1.1 -r1.2
--- ncac-trove.h	21 Aug 2003 18:57:27 -0000	1.1
+++ ncac-trove.h	21 Sep 2004 13:46:13 -0000	1.2
@@ -27,5 +27,8 @@ int do_read_for_rmw( PVFS_fs_id coll_id,
 					 int size, 
 					 int *ioreq);
 
+int init_io_read( PVFS_fs_id coll_id, PVFS_handle handle,
+        PVFS_context_id context, PVFS_offset foffset,
+        PVFS_size size, void *buf, PVFS_id_gen_t *ioreq);   /* NOTE(review): ncac-trove.c declares this parameter as TROVE_op_id * -- confirm both names are the same type */
 
 #endif  /* __CACHE_STORAGE_H */

Index: state.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/state.c,v
diff -p -u -r1.4 -r1.5
--- state.c	17 Nov 2003 19:19:29 -0000	1.4
+++ state.c	21 Sep 2004 13:46:13 -0000	1.5
@@ -215,8 +215,9 @@ int NCAC_extent_done_access(NCAC_req_t *
 				NCAC_extent_read_comm_done (ncac_req->cbufhash[i]);
 				//DecReadCount(ncac_req->cbufhash[i]);
 				ncac_req->cbufhash[i]->rcmp++;
+				ncac_req->cbufhash[i]->reads--;
 			}
-      		}
+        }
   	}
 
 	if (ncac_req->optype == NCAC_WRITE)

Index: state.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/buffer/state.h,v
diff -p -u -r1.1 -r1.2
--- state.h	21 Aug 2003 18:57:27 -0000	1.1
+++ state.h	21 Sep 2004 13:46:13 -0000	1.2
@@ -1,6 +1,9 @@
 #ifndef __STATE_H_
 #define __STATE_H_
 
+#define BLOCKING_EXTENT_ALLOC       1
+#define NONBLOCKING_EXTENT_ALLOC    0
+
 int NCAC_extent_read_access(NCAC_req_t *req, struct extent *page,
                         unsigned long offset, unsigned long size);
 int NCAC_extent_write_access(NCAC_req_t *req, struct extent *page,
@@ -17,5 +20,6 @@ int NCAC_extent_write_access_recheck(NCA
                         unsigned int offset, unsigned int size);
 int NCAC_extent_done_access(NCAC_req_t *ncac_req);
 void mark_extent_rmw_lock(struct extent *extent, int ioreq);
+void list_set_clean_page(struct extent *page);
 
 #endif



More information about the PVFS2-CVS mailing list