[Pvfs2-cvs] commit by kunkel in pvfs2/src/io/trove/trove-tas: tas.h tas-handle-manager.h red-black-tree-test.c tas-queue.c tas.c tas-queue.h tas-handle-manager.c red-black-tree.h module.mk.in red-black-tree.c README

CVS commit program cvs at parl.clemson.edu
Sun Oct 29 08:19:13 EST 2006


Update of /projects/cvsroot/pvfs2/src/io/trove/trove-tas
In directory parlweb1:/tmp/cvs-serv25603/src/io/trove/trove-tas

Added Files:
      Tag: pvfs2-kunkel-tas-branch
	tas.h tas-handle-manager.h red-black-tree-test.c tas-queue.c 
	tas.c tas-queue.h tas-handle-manager.c red-black-tree.h 
	module.mk.in red-black-tree.c README 
Log Message:
Trove TAS modul added.


--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ tas.h	2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,16 @@
+/*
+ * Author Julian Kunkel 
+ * Date: 2005
+ */
+ 
+#ifndef __TROVE_TAS_H
+#define __TROVE_TAS_H
+#include "gossip.h"
+#include "trove.h"
+#include "trove-internal.h"
+	
+#define DUMMY_OPNUM -1
+#define RETURN_IMMEDIATE_COMPLETE 1
+#define RETURN_TEST_COMPLETE 0
+
+#endif

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ tas-handle-manager.h	2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,28 @@
+#ifndef TASHANDLEMANAGER_H_
+#define TASHANDLEMANAGER_H_
+
+#include "red-black-tree.h"
+#include "trove.h"
+
+#include "gen-locks.h" 
+
+typedef PVFS_handle_extent tas_handleRange;
+
+void initialiseHandleManager(void);
+/*
+ * If handle manager should maintain only one handle range for a collection set
+ * lowerBound2 = 0
+ */
+void initialiseCollection(TROVE_coll_id coll_id,TROVE_handle lowerBound, 
+	TROVE_handle upperBound,TROVE_handle lowerBound2, TROVE_handle upperBound2);
+
+TROVE_handle getHandle(TROVE_coll_id coll_id,TROVE_handle_extent_array * extends);
+TROVE_handle getfixedHandle(TROVE_coll_id coll_id,TROVE_handle handle);
+TROVE_handle getArbitraryHandle(TROVE_coll_id coll_id,TROVE_handle handleSep);
+
+int lookupHandle(TROVE_coll_id coll_id,TROVE_handle handle);
+
+int freeHandle(TROVE_coll_id coll_id,TROVE_handle handle);
+int freeHandleRange(TROVE_coll_id coll_id,TROVE_handle lowerhandle,TROVE_handle upperhandle);
+
+#endif /*TASHANDLEMANAGER_H_*/

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ red-black-tree-test.c	2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,138 @@
+#include<stdlib.h>
+#include<stdio.h>
+#include<assert.h>
+#include<math.h>
+
+#include "red-black-tree.h"
+
+int inorderCheck(tree_node* node,int depth, int * nodes, int redNodes, int blackNodes, int * leafBlackNodes){
+	if(node == NULL) return 0;
+	int maxdepth = depth+1;
+	int retdepth =0;
+
+	if(depth == 0){ //initialise leaf black nodes
+		*leafBlackNodes = -1;
+	}	
+	
+	(*nodes)++;
+	if(node->color == RED)
+		redNodes++;
+	else
+		blackNodes++;
+	if(node->left != NULL){
+		if(node->color == RED && node->left->color == RED){
+			printf("WARNING two red nodes ...");
+			assert(0);
+		}
+		retdepth=inorderCheck(node->left,depth+1, nodes,redNodes,blackNodes, leafBlackNodes);
+		if(retdepth > maxdepth) maxdepth = retdepth;
+		if(node->left->parent != node){
+			printf("ERROR wrong parent node %lld parent %lld", *((int64_t*) node->left->data), *((int64_t*) node->data));
+			assert(0);
+		}
+		if(compareInt64(node->left->data,node->data) != +1){
+			printf("ERROR Left key!\n");
+			assert(0);
+		}
+	}
+	if(node->right != NULL){
+		if(node->color == RED && node->right->color == RED){
+			printf("WARNING two red nodes node and right son...");
+			assert(0);
+		}		
+		if(compareInt64(node->right->data,node->data) != -1){
+			printf("ERROR right key!\n");
+			assert(0);
+		}
+		if(node->right->parent != node){
+			printf("ERROR wrong parent node %lld parent %lld", *((int64_t*) node->right->data), *((int64_t*) node->data));
+			assert(0);			
+		}
+		
+		retdepth=inorderCheck(node->right,depth+1, nodes,redNodes,blackNodes,leafBlackNodes);
+		if(retdepth > maxdepth) maxdepth = retdepth;
+	}	
+	
+	if(node->left == NULL && node->right == NULL){ //leaf, check if blackNodeCondition is held
+		 if(*leafBlackNodes == -1){  //first leaf is reached
+		 	*leafBlackNodes = blackNodes;
+		 }
+		 if(blackNodes != *leafBlackNodes){
+		 	printf(" WARNING  blackNodes to leafs are different now:%d should be:%d condition not held\n",blackNodes, *leafBlackNodes);
+		 	assert(0);
+		 }
+	}
+	
+	if(depth == 0){
+		double nld2 = log((double) *nodes)/log(2.0)*2+1;
+		if( maxdepth > nld2 )
+			printf(" WARNING tree with %d nodes to deep should be max:%f is %d\n", *nodes, nld2, maxdepth);
+	}
+	return maxdepth;
+}
+
+int64_t numexists=0;
+
+void createRandomTreeNode(red_black_tree * tree){
+	int num=rand()%10000;
+	int64_t * data = malloc(sizeof(int64_t));
+	*data = num;
+    insertKeyIntoTree( (RBData*)data, tree);
+	numexists = num;
+}
+
+int main(int argc, char ** argv){
+	
+ 	red_black_tree * tree = newRedBlackTree(compareInt64,compareInt64);
+	int i;
+	int nodecount=0,nodecountA;
+	int blackNodes;
+	int depth=0,depthA;
+	
+	RBData * data = malloc(sizeof(int));
+	int one=1;
+ 	data = (RBData*) &one;
+ 	int two=2;
+	printf("CompareInt64 1,2 %d\n",compareInt64((RBData*) &one,(RBData*) &two));
+	printf("CompareInt64 2,1 %d\n",compareInt64((RBData*) &two,(RBData*) &one));
+	printf("CompareInt64 1,1 %d\n",compareInt64((RBData*) &one,(RBData*) &one));	
+	
+	int seed=3;
+	
+	if(argc == 2){
+		sscanf(argv[1],"%d",&seed);
+	}
+	
+	if(seed == 0){
+		printf("Syntax: %s randomSeedNum\n",argv[0]);
+		exit(1);
+	}
+	//for debugging purpose take argv as initialisation value
+	srand(seed);
+	
+	for(i=0; i < 5000; i++){
+		createRandomTreeNode(tree);
+	    nodecountA = 0;		
+	    depthA=inorderCheck(tree->head,0,& nodecountA,0,0,& blackNodes);
+	}
+	printf("depth for %d nodes is %d\n",nodecountA,depthA);
+	nodecount = -1;
+	depth = -1;
+	printf("Try to delete existing num %llu, %p\n", numexists, lookupTree((void*)&numexists,tree) );
+	
+	for(i=0; i < 5000; i++){
+		int64_t num=rand()%10000;
+		if(lookupTree((void*)&num,tree) == NULL) continue;
+		tree_node * node = lookupTree((void*)&num,tree);
+		free(node->data);
+    	deleteNodeFromTree(node,tree);
+	    nodecount = 0;
+	    depth=inorderCheck(tree->head,0,& nodecount,0,0,& blackNodes);
+	}
+
+	printf("Test correctly done\n");
+	printf("depth for %d nodes is %d\n",nodecountA,depthA);
+	printf("depth now for %d nodes is %d\n" ,nodecount,depth);
+
+ 	return 0;
+}

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ tas-queue.c	2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,96 @@
+#include<stdio.h>
+#include "tas-queue.h"
+
+tas_queue *newTasQueue(void){
+	tas_queue * ret=malloc(sizeof(tas_queue));
+	*ret = NULL;
+	return ret;
+}
+
+void relinkFront(queue_elem * elem, tas_queue* queue){
+		if(elem->prev != NULL){ 
+			//put found element at front of list...
+			queue_elem * tmp = elem->prev;				
+			(*queue)->prev = elem; //is NULL before
+			if(elem->next != NULL){
+				elem->next->prev = elem->prev;
+			}
+			tmp->next = elem->next;
+			//now make found the first element...
+			elem->next = *queue;
+			elem->prev = NULL;
+			*queue = elem;
+		}
+}
+
+void *  insertKeyIntoQueue(queueKey key, void * data, tas_queue* queue){
+	queue_elem * elem = lookupQueue(key, queue);
+	void * olddata=NULL;
+	if(elem == NULL){
+		pushFrontKey(key, data,queue);
+	}else{
+		//replace existing data
+		olddata = elem->data;
+		elem->data = data;
+		relinkFront(elem,queue);
+	}
+	return olddata;
+}
+
+queue_elem * pushFrontKey(queueKey key, void * data, tas_queue* queue){
+	queue_elem * elem = malloc(sizeof(queue_elem));
+	elem->data = data;
+	elem->key = key;
+	elem->prev = NULL;
+	elem->next = *queue;
+	if(*queue != NULL){
+		(*queue)->prev = elem;
+	}
+	*queue = elem;
+	return elem;
+}
+
+queue_elem * lookupQueue(queueKey key, tas_queue * queue){
+	queue_elem * found=*queue;
+	int i=0;
+	for(; found != NULL ; found = found->next){
+		if(found->key == key){
+			if( i > WARNING_NUM){
+				printf(" WARNING: lookupQueue takes %d iterations\n",i);
+			}
+			return found;
+		}
+		i++;
+	}
+	if( i > WARNING_NUM){
+		printf(" WARNING: lookupQueue takes %d iterations and did not find handle\n",i);
+	}	
+	return NULL;
+}
+
+void deleteElementFromList(queue_elem * found, tas_queue* queue){
+		if(found->prev == NULL){ //first entry
+			*queue = found->next;
+			if(found->next != NULL){
+				found->next->prev = NULL;
+			}
+		}else{ //not first entry
+			found->prev->next = found->next;
+			if(found->next != NULL){
+				found->next->prev = found->prev;
+			}
+		}
+}
+
+void * deleteKeyFromList(queueKey key, tas_queue* queue){
+	queue_elem * found=lookupQueue(key,queue);
+	if(found == NULL) return NULL;
+	deleteElementFromList(found,queue);
+	void * data;			
+	data = found->data;
+	free(found); 
+	return data;
+}
+
+
+

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ tas.c	2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,1765 @@
+/*
+ * Author Julian Kunkel
+ * Date: 2005
+ */
+
+#include <stdio.h>
+#include <string.h>
+/*for unlink:*/
+#include <unistd.h>
+#include <assert.h>
+
+#include "tas.h"
+#include "gen-locks.h"
+#include "pvfs2-internal.h"
+
+extern struct TROVE_dspace_ops dbpf_dspace_ops;
+extern struct TROVE_mgmt_ops dbpf_mgmt_ops;
+
+#define TRUE 1
+#define FALSE 0
+
+#if defined(TAS_USE_RED_BLACKTREE)
+#elif defined(TAS_USE_QUEUE)
+#else
+    #warning "No implementation for TROVE-TAS metadata selected.\n Choosing Red-Black-Tree"
+    #define TAS_USE_RED_BLACKTREE
+#endif
+    
+
+/*
+ * There are two different implementations which may be choosen while compile time:
+ * 1) A very fast queue implementation which pushes
+ * the last used handle in front of the queue (speedup for second lookup).
+ * However the queue implementation is for the creation of very much metafiles
+ * slow because all handles have to be checked. There are some optimizations for
+ * that implementation, but the creation optimization is not compatible to the
+ * normal trove behaviour !
+ * 2) a red black tree implementation. Takes more memory than the list,
+ * but faster lookup because the depth <= 40 for 100.000 filehandles....
+ * TAS_USE_QUEUE, TAS_USE_RED_BLACKTREE
+ */
+#if defined(TAS_USE_RED_BLACKTREE)
+	#include "red-black-tree.h"
+#elif defined(TAS_USE_QUEUE)
+
+	#include "tas-queue.h"
+	/*
+	 * maximum number of key/value pairs to be read until the functions returns
+	 * that the key/value pair does not exist. This is a optimization for the creation of
+	 * files. However if more files exist in a directory than that number these files cannot
+	 * be accessed !!!! So be warned if you turn on maximumKeyValReadOptimization
+	 */
+	#define maximumKeyValReadOptimization
+
+	//set this value to at least ( number of clients x 10 ),
+	//the number depends on the speed of the netwerk link.
+	#define maximumKeyValRead 100
+#endif
+
+//this optimizations can be turned off:
+//set the file size during IO operations, only small performance speedup.
+#define TAS_UPDATE_FILESIZE
+
+//Do not check if handle is already created in dspace_create...
+#define NO_SAVE_CREATION
+
+
+
+//tas metadata + dspace cache....
+struct keyvalpair_{
+	#if defined(TAS_USE_RED_BLACKTREE)
+		struct keyvalpair_ * next;
+		struct keyvalpair_ * prev; //needed for fast keyval_iteration...
+	#elif defined(TAS_USE_QUEUE)
+		struct keyvalpair_ * next;
+		int64_t keyHash;
+	#endif
+
+	TROVE_keyval_s key;
+	TROVE_keyval_s val;
+};
+
+
+typedef struct keyvalpair_ keyvalpair;
+
+
+struct storeAttributes_{
+    TROVE_ds_type type;
+    PVFS_uid uid;
+    PVFS_gid gid;
+    PVFS_permissions mode;
+    PVFS_time atime;
+    PVFS_time ctime;
+    PVFS_time mtime;
+    uint32_t dfile_count;
+    uint32_t dist_size;
+};
+typedef struct storeAttributes_ storeAttributes;
+
+
+
+ struct handle_ {
+
+	#if defined(TAS_USE_RED_BLACKTREE)
+	 	red_black_tree * pairTree;
+		keyvalpair * pairList;
+ 	#elif defined(TAS_USE_QUEUE)
+ 		keyvalpair * pairs;
+ 	#endif
+
+	PVFS_fs_id fs_id;
+	PVFS_handle handle;
+
+    PVFS_size b_size; /* bstream size */
+    PVFS_size k_size; /* keyval size; # of keys */
+    /* stored attributes are below here */
+
+	storeAttributes * attributes;
+ };
+
+
+//If some other values than only zero bytes should be returned, the
+// buffer of that values should be as large as transfered by the flow protocol.
+//This should be the same value as used by the used flow protocol:
+//currently the buffer_size is defined in the FLOWPROTOXY.c file
+//so this value can not be included here
+//#define BUFFER_SIZE (256*1024)
+
+typedef struct handle_ handleCache;
+
+
+struct handleFSID_{
+	PVFS_handle handle;
+	PVFS_fs_id fs_id;
+};
+typedef struct handleFSID_ handleFSID;
+
+
+static PVFS_size meta_count=0;
+static int initialised=0;
+
+static gen_mutex_t tas_meta_mutex = GEN_MUTEX_INITIALIZER;
+
+static char storageName[200] ="";
+static char storagePath[200] ="";
+
+TROVE_handle aktHandleFirst;
+TROVE_handle handleSeperator; //number which seperates first and second handle range, for example when this server is a meta and dataserver
+TROVE_handle aktHandleSecond;
+
+#if defined(TAS_USE_RED_BLACKTREE)
+	//there might be conflicts in the red-black-tree for keyHashValues
+	//a solution would be to use the full void* key and not a int64_t key
+	//a order might be established by a lexical sort.
+	//a other solution is to link
+	//compare function for the red-black-tree
+	static int compareKeyValPairs(RBData * data, RBKey * key){
+		TROVE_keyval_s ikey1=((keyvalpair*)data)->key;
+		TROVE_keyval_s * ikey2=(TROVE_keyval_s *) key;
+
+		if(ikey1.buffer_sz > ikey2->buffer_sz ){ //take right neigbour
+			return +1;
+		}else if(ikey1.buffer_sz < ikey2->buffer_sz){//take left neigbour
+			return -1;
+		}
+		//same length, compare memory
+		return memcmp(ikey1.buffer,ikey2->buffer, ikey1.buffer_sz);
+	}
+
+	static int compareKeyValPairs2(RBData * data, RBData * data2){
+		TROVE_keyval_s ikey1=((keyvalpair*)data)->key;
+		TROVE_keyval_s ikey2=((keyvalpair*)data2)->key;
+
+		if(ikey1.buffer_sz > ikey2.buffer_sz ){ //take right neigbour
+			return +1;
+		}else if(ikey1.buffer_sz < ikey2.buffer_sz){//take left neigbour
+			return -1;
+		}
+		//same length, compare memory
+		return memcmp(ikey1.buffer,ikey2.buffer, ikey1.buffer_sz);
+	}
+
+	//for comparision of handles:
+	static int compareHandle(RBData * data, RBKey * key){
+		handleCache * ikey1=(handleCache *) data;
+		handleFSID * ikey2=(handleFSID *) key;
+
+		if(ikey1->handle > ikey2->handle ){
+			return +1;
+		}else if(ikey1->handle < ikey2->handle){
+			return -1;
+		}
+
+		if(ikey1->fs_id > ikey2->fs_id ){
+			return +1;
+		}else if(ikey1->fs_id < ikey2->fs_id){
+			return -1;
+		}
+		return 0;
+	}
+
+	static int compareHandle2(RBData * data, RBData * data2){
+		handleCache * ikey1=(handleCache *) data;
+		handleCache * ikey2=(handleCache *) data2;
+
+		if(ikey1->handle > ikey2->handle ){
+			return +1;
+		}else if(ikey1->handle < ikey2->handle){
+			return -1;
+		}
+
+		if(ikey1->fs_id > ikey2->fs_id ){
+			return +1;
+		}else if(ikey1->fs_id < ikey2->fs_id){
+			return -1;
+		}
+		return 0;
+	}
+#endif
+
+#if defined(TAS_USE_RED_BLACKTREE)
+	static red_black_tree * tas_meta_cache = NULL;
+#elif defined(TAS_USE_QUEUE)
+	static tas_queue * tas_meta_cache = NULL;
+#endif
+
+//TROVE_handle minMetaHandle = 0;
+//TROVE_handle maxMetaHandle = 0;
+//TROVE_handle minDatafileHandle = 0;
+//TROVE_handle maxDatafileHandle = 0;
+
+//simple word sum is hashval
+static inline int64_t keyHashFunc(void *key, int count){
+	int64_t sum=0;
+	int i;
+	int64_t multiplier=1;
+	char * keyc = (char *) key;
+
+	for(i=0; i < count; i++){
+		sum = sum + multiplier*keyc[i];
+		multiplier=multiplier*10;
+	}
+  
+	return sum;
+}
+
+static inline int keysEqual(
+	#ifndef TAS_USE_RED_BLACKTREE
+		int keyHash1,int keyHash2,
+	#endif
+		int keyLen1,int keyLen2, void * key1, void *key2){
+	return
+	#ifndef TAS_USE_RED_BLACKTREE
+		keyHash1 == keyHash2 &&
+	#endif
+			keyLen1 == keyLen2
+			&& memcmp(key1,key2,keyLen1) == 0 ;
+}
+
+
+
+static handleCache *  allocateNewCacheElement(void){
+		handleCache * found = (handleCache *) malloc(sizeof(handleCache));
+		bzero(found,sizeof(handleCache));
+		found->attributes = (storeAttributes *) malloc(sizeof(storeAttributes));
+		bzero(found->attributes,sizeof(storeAttributes)); //this is neccessary because
+		// in prelude.sm it is expected for a datafile to have a 0 in some fields ..
+		return found;
+}
+
+static void insertCacheElement(handleCache* found,TROVE_handle new_handle, TROVE_coll_id coll_id,
+						TROVE_ds_type type){
+		found->fs_id = coll_id;  //
+		found->attributes->type = type;
+		found->handle = new_handle;
+
+		#if defined(TAS_USE_RED_BLACKTREE)
+			found->pairList = NULL;
+			found->pairTree = NULL;
+			insertKeyIntoTree((RBData*) found,tas_meta_cache);
+		#elif defined(TAS_USE_QUEUE)
+			found->pairs = NULL;
+			pushFrontKey(new_handle,found,tas_meta_cache);
+		#endif
+
+		meta_count++;
+}
+
+static keyvalpair * findPairForCacheEntry(handleCache * cacheEntry, TROVE_keyval_s * key_p){
+	#if defined(TAS_USE_RED_BLACKTREE)
+		if(cacheEntry->pairTree == NULL){
+			return FALSE;
+		}
+		tree_node* node = (tree_node*) (lookupTree(key_p ,cacheEntry->pairTree));
+
+		if(node == NULL)
+			return FALSE;
+		return (keyvalpair*) node->data;
+	#elif defined(TAS_USE_QUEUE)
+		keyvalpair * found_pair;
+		int i=0;
+		int64_t key_hash = keyHashFunc(key_p->buffer,key_p->buffer_sz);
+		found_pair=cacheEntry->pairs;
+		for(; found_pair != NULL ; found_pair = found_pair->next){
+			if( keysEqual(
+				#ifndef TAS_USE_RED_BLACKTREE
+				found_pair->keyHash, key_hash,
+				#endif
+		        found_pair->key.buffer_sz, key_p->buffer_sz ,
+				found_pair->key.buffer,key_p->buffer) ){
+				/* key exists */
+				if( i > 10 ){
+					gossip_err(" WARNING: readToMatchKEY/VALUE: %d\n",i);
+				}
+				return found_pair;
+			}
+			i++;
+		#ifdef maximumKeyValReadOptimization
+			if(i > maximumKeyValRead)
+			//	printf(" KILLED %s\n",(char*)key_p->buffer);
+				return NULL;
+		#endif
+		}
+
+		if( i > 10 ){
+			gossip_err(" WARNING: readToMatchKey did not find value for key %s needed iterations: %d\n", (char*)key_p->buffer,i);
+		}
+	#endif
+
+	return NULL;
+}
+
+static int removePairOfCacheEntry(handleCache * cacheEntry, TROVE_keyval_s * key_p){
+	keyvalpair * found_pair;
+
+
+#if defined(TAS_USE_RED_BLACKTREE)
+		if(cacheEntry->pairTree == NULL)
+			return FALSE;
+	    tree_node * node = lookupTree(key_p,cacheEntry->pairTree);
+		found_pair=(keyvalpair*) (node->data);
+		if(found_pair != NULL) {
+			//delete Node from Tree:
+			deleteNodeFromTree(node,cacheEntry->pairTree);
+
+			//relink pairList:
+			if(found_pair->next != NULL){
+				found_pair->next->prev = found_pair->prev;
+			}
+			if(found_pair->prev == NULL){
+				//this is currently the head of the list
+				cacheEntry->pairList = found_pair->next;
+				if(found_pair->next == NULL){
+					//The Tree is now empty
+					freeEmptyRedBlackTree(& cacheEntry->pairTree);
+				}else{
+					found_pair->next->prev = NULL;
+				}
+			}else{ //not head of cacheEntry->pairList
+				found_pair->prev->next = found_pair->next;
+			}
+			free(found_pair->key.buffer);
+			free(found_pair->val.buffer);
+			free(found_pair);
+			//free(node->key) is not necessary because data and key are equal :)
+
+			return TRUE;
+		}
+		return FALSE;
+	#elif defined(TAS_USE_QUEUE)
+		keyvalpair *old;
+		int i=0;
+		int key_hash = keyHashFunc(key_p->buffer,key_p->buffer_sz) ;
+		found_pair=cacheEntry->pairs;
+		old = found_pair;
+
+		for(; found_pair != NULL ; old=found_pair, found_pair = found_pair->next){
+			if(keysEqual(
+				#ifndef TAS_USE_RED_BLACKTREE
+				found_pair->keyHash, key_hash,
+				#endif
+				found_pair->key.buffer_sz, key_p->buffer_sz ,
+				found_pair->key.buffer,key_p->buffer)){
+				//key exists
+
+				if(old == found_pair){ //first entry
+					cacheEntry->pairs = found_pair->next;
+				}else{ //not first entry
+					old->next = found_pair->next;
+				}
+				free(found_pair->key.buffer);
+				free(found_pair->val.buffer);
+				free(found_pair);
+
+				if( i > 10 ){
+					gossip_err(" WARNING: readToMatchKEY/VALUE remove: %d\n",i);
+				}
+				return TRUE;
+			}
+			i++;
+		}
+		if( i > 10 ){
+			gossip_err(" WARNING: readToMatchKEY/VALUE remove did not find key: %d\n",i);
+		}
+		return FALSE;
+	#endif
+}
+
+
+static inline handleCache * findCacheEntry(TROVE_handle handle, PVFS_fs_id fs_id){
+	#if defined(TAS_USE_RED_BLACKTREE)
+		handleFSID key;
+		key.handle = handle;
+		key.fs_id = fs_id;
+		tree_node * node= lookupTree(& key,tas_meta_cache);
+		if(node == NULL){
+			return NULL;
+		}
+		return (handleCache*) node->data;
+	#elif defined(TAS_USE_QUEUE)
+		queue_elem * elem = lookupQueue(handle, tas_meta_cache);
+		if(elem == NULL){
+			return NULL;
+		}
+		relinkFront(elem, tas_meta_cache);  //push element to front of cache
+		return (handleCache*)elem->data;
+	#endif
+}
+
+static int removeCacheEntry(TROVE_handle handle, PVFS_fs_id fs_id){
+	void * result=NULL;
+
+
+	#if defined(TAS_USE_RED_BLACKTREE)
+		handleFSID key;
+		key.handle = handle;
+		key.fs_id = fs_id;
+		tree_node * node = lookupTree(&key,tas_meta_cache);
+		if(node == NULL) return FALSE;
+		result = node->data;
+		deleteNodeFromTree(node,tas_meta_cache);
+	#elif defined(TAS_USE_QUEUE)
+		result = deleteKeyFromList(handle,tas_meta_cache);
+	#endif
+	if(result != NULL){
+		free(((handleCache*) result)->attributes);
+		free((handleCache*) result);
+		meta_count--;
+		return TRUE;
+	}
+	return FALSE;
+}
+
+static inline void insertKeyValPair(handleCache * found, keyvalpair * pair){
+
+
+#if defined(TAS_USE_RED_BLACKTREE)
+		if(found->pairTree == NULL){ //create new RedBlackTree
+			found->pairTree=newRedBlackTree(compareKeyValPairs,compareKeyValPairs2);
+
+		    pair->next = NULL;
+		    found->pairList = pair;
+		}else{
+			pair->next = found->pairList;
+			found->pairList->prev = pair;
+		    found->pairList = pair;
+		}
+	    pair->prev = NULL;
+	    insertKeyIntoTree((void*) pair, found->pairTree);
+
+	#elif defined(TAS_USE_QUEUE)
+		int64_t keyHash = keyHashFunc(pair->key.buffer,pair->key.buffer_sz);
+		pair->next = found->pairs;
+		found->pairs = pair;
+ 		pair->keyHash = keyHash;
+	#endif
+}
+
+static int tas_initialize (char *stoname,
+		TROVE_ds_flags flags){
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas Initialize\n");
+  int ret = dbpf_mgmt_ops.initialize(stoname, flags);
+
+	FILE * file = fopen (stoname, "r");
+	if(file == 0 ){ /* if storage does not exist... */
+		return -1;
+	}
+	#if defined(TAS_USE_RED_BLACKTREE)
+		tas_meta_cache = newRedBlackTree(compareHandle,compareHandle2);
+	#elif defined(TAS_USE_QUEUE)
+		tas_meta_cache = newTasQueue();
+	#endif
+
+	fclose(file);
+
+	initialised = 1;
+	meta_count = 0;
+
+	gen_mutex_init(& tas_meta_mutex);
+
+	sprintf(storagePath, "%s/tas", stoname);
+	mkdir(storagePath,511);
+	sprintf(storageName, "%s/tas/metadata.txt", stoname);
+
+	printf("TAS options:\n");
+	printf("\tCachetype ");
+
+
+	#if defined(TAS_USE_RED_BLACKTREE)
+		printf("red-black-tree \n");
+	#elif defined(TAS_USE_QUEUE)
+		printf("queue\n");
+	#endif
+
+	#ifdef maximumKeyValReadOptimization
+		printf("\tWARNING maximumKeyValReadOptimization enabled with %d files\n"
+		"\t Some file operations will not be possible with older files...\n",maximumKeyValRead);
+	#endif
+	#ifdef TAS_UPDATE_FILESIZE
+		printf("\tFile write will set filesize properly\n");
+	#else
+		printf("\tFile write will NOT set filesize properly, only resize will resize files\n");
+	#endif
+
+  return ret;
+}
+
+
+struct tas_finalizeData_{
+	FILE * file;
+	int64_t handle_count;
+	int64_t keyval_count;
+	int64_t keyval_mem_size;
+};
+typedef struct tas_finalizeData_ tas_finalizeData;
+
+static void keyvalwrite(void* data, void* funcData){
+   		tas_finalizeData * fdata = (tas_finalizeData* ) funcData;
+		keyvalpair * pair = (keyvalpair*) data;
+   		FILE* file = fdata->file;
+
+		fprintf(file,"  Key-size:%d Val-size:%d  ", pair->key.buffer_sz, pair->val.buffer_sz);
+		fwrite(pair->key.buffer,1,pair->key.buffer_sz, file);
+		fprintf(file," Value:");
+		fwrite(pair->val.buffer,1,pair->val.buffer_sz, file);
+		fprintf(file,"\n");
+
+		/* count for memory usage */
+		fdata->keyval_count++;
+		fdata->keyval_mem_size+=pair->key.buffer_sz + pair->val.buffer_sz;
+}
+
+static void datawrite(void* data, void* funcData){
+   	tas_finalizeData * fdata = (tas_finalizeData* ) funcData;
+	handleCache * found = (handleCache*) data;
+
+  FILE* file = fdata->file;
+	fprintf(file,"Handle:%Lu Type:%u FS-ID:%d UID:%u GID:%u MODE:%u DFILE_COUNT:%u DIST_SIZE:%u BYTE_SIZE:%llu KEYS:%llu CTIME:%llu MTIME:%llu ATIME:%llu\n",
+		found->handle, found->attributes->type,found->fs_id,
+		found->attributes->uid,  found->attributes->gid, found->attributes->mode,
+		found->attributes->dfile_count, found->attributes->dist_size,
+		found->b_size, found->k_size,
+		found->attributes->ctime, found->attributes->mtime,found->attributes->atime );
+	fdata->handle_count++;
+
+	#if defined(TAS_USE_RED_BLACKTREE)
+	    if( found->pairTree != NULL){
+	    	fdata->keyval_mem_size+= sizeof(red_black_tree);
+			iterateRedBlackTree(keyvalwrite, found->pairTree, fdata );
+	    }
+	#elif defined(TAS_USE_QUEUE)
+		keyvalpair * pair = found->pairs;
+		for(; pair != NULL; pair = pair->next)
+			keyvalwrite(pair, NULL,fdata);
+	#endif
+}
+
+ /* calculate the approximate memory usage... */
+static int tas_finalize (void){
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas finalize and save metadata to disk ... \n");
+
+	if(! initialised || meta_count == 0 ) return RETURN_IMMEDIATE_COMPLETE;
+
+	FILE * file = fopen (storageName, "w");
+	if(file == 0){
+		gossip_debug(GOSSIP_TROVE_DEBUG," can not save metadata !!!!\n");
+	    return RETURN_IMMEDIATE_COMPLETE;
+	}
+	/* do not take care of memory managment because server is terminating... */
+
+	fprintf(file,"Count:%llu\n",meta_count);
+    tas_finalizeData * fdata = malloc(sizeof(tas_finalizeData));
+    fdata->file = file;
+    fdata->handle_count = 0;
+   	fdata->keyval_count = 0;
+   	fdata->keyval_mem_size = 0;
+
+	#if defined(TAS_USE_RED_BLACKTREE)
+		iterateRedBlackTree(datawrite, tas_meta_cache, fdata );
+	#elif defined(TAS_USE_QUEUE)
+		iterate_tas_queue(datawrite(elem->data, & elem->key,fdata) ,elem,tas_meta_cache);
+	#endif
+
+	fclose(file);
+
+	/* calculate memory_usage */
+	int64_t memory_usage;
+
+	#if defined(TAS_USE_RED_BLACKTREE)
+		memory_usage = fdata->handle_count*(sizeof(storeAttributes)+ sizeof(handleCache)+sizeof(tree_node))
+			+fdata->keyval_count*(sizeof(keyvalpair)+sizeof(tree_node))+fdata->keyval_mem_size;
+	#elif defined(TAS_USE_QUEUE)
+		memory_usage = fdata->handle_count*(storeAttributes)+sizeof(handleCache)+sizeof(queue_elem))
+			+fdata->keyval_count*(sizeof(keyvalpair))+fdata->keyval_mem_size;
+	#endif
+
+	printf(" TAS handles:%lld keyval-pairs:%lld with buffers:%lld\n"
+	       "     approximate metadata memory usage: %lld Bytes - %f MByte \n",
+		fdata->handle_count, fdata->keyval_count, fdata->keyval_mem_size, memory_usage,
+		(double)memory_usage/1024/1024 );
+
+	return dbpf_mgmt_ops.finalize();
+}
+
+static int tas_storage_create (char *stoname, 
+  void *user_ptr, TROVE_op_id * out_op_id_p){
+  int ret;
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_storage_create here: %s \n", 
+    stoname);
+  ret = dbpf_mgmt_ops.storage_create(stoname, user_ptr, out_op_id_p);
+	FILE * file = fopen (storageName, "w");
+	fclose(file);
+ 	return ret;
+}
+
+static int
+tas_storage_remove (char *stoname, void *user_ptr, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_storage_remove\n");
+	if(initialised){
+		unlink(storageName);
+	}
+  
+  return dbpf_mgmt_ops.storage_remove(stoname, user_ptr, out_op_id_p);;
+}
+
+static int tas_collection_create(char *collname,
+                                  TROVE_coll_id new_coll_id,
+                                  void *user_ptr,
+                                  TROVE_op_id *out_op_id_p)
+{
+  return dbpf_mgmt_ops.collection_create(collname, new_coll_id,
+    user_ptr, out_op_id_p);
+}
+
+static int
+tas_collection_remove (char *collname,
+		       void *user_ptr, TROVE_op_id * out_op_id_p)
+{
+  return dbpf_mgmt_ops.collection_remove(collname, user_ptr, out_op_id_p);
+}
+
+static int
+tas_collection_iterate (TROVE_ds_position * inout_position_p,
+			TROVE_keyval_s * name_array,
+			TROVE_coll_id * coll_id_array,
+			int *inout_count_p,
+			TROVE_ds_flags flags,
+			TROVE_vtag_s * vtag,
+			void *user_ptr, TROVE_op_id * out_op_id_p)
+{
+  return dbpf_mgmt_ops.collection_iterate(inout_position_p, name_array,
+    coll_id_array, inout_count_p, flags,  vtag, user_ptr, 
+    out_op_id_p);
+}
+
+static int
+tas_collection_lookup (char *collname,
+		       TROVE_coll_id * out_coll_id_p,
+		       void *user_ptr, TROVE_op_id * out_op_id_p)
+{
+  return dbpf_mgmt_ops.collection_lookup(collname, out_coll_id_p, 
+    user_ptr, out_op_id_p);
+}
+
+
+
+/*
+ * For all bytestream operations except resize do not lookup handle if
+ * resize of file is not selected.
+ * Directly discard all I/O operations
+ */
+
+
+static int //currently this function is not used within pvfs2 !!!
+tas_bstream_read_at (TROVE_coll_id coll_id,
+		     TROVE_handle handle,
+		     void *buffer,
+		     TROVE_size * inout_size_p,
+		     TROVE_offset offset,
+		     TROVE_ds_flags flags,
+		     TROVE_vtag_s * out_vtag,
+		     void *user_ptr,
+		     TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+  fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_bstream_read_at not implemented\n");
+  gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_bstream_read_at\n");
+  return 1;
+}
+
+static int //currently this function is not used within pvfs2 !!!
+tas_bstream_write_at (TROVE_coll_id coll_id,
+		      TROVE_handle handle,
+		      void *buffer,
+		      TROVE_size * inout_size_p,
+		      TROVE_offset offset,
+		      TROVE_ds_flags flags,
+		      TROVE_vtag_s * inout_vtag,
+		      void *user_ptr,
+		      TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+  fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_bstream_write_at not implemented\n");
+  gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_bstream_write_at\n");
+  return RETURN_IMMEDIATE_COMPLETE;
+}
+
+
+static int
+tas_bstream_resize (TROVE_coll_id coll_id,
+		    TROVE_handle handle,
+		    TROVE_size * inout_size_p,
+		    TROVE_ds_flags flags,
+		    TROVE_vtag_s * vtag,
+		    void *user_ptr,
+		    TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+  	gossip_debug(GOSSIP_TROVE_DEBUG, "TAS tas_bstream_resize\n");
+	handleCache * found=findCacheEntry(handle,coll_id);
+	if(found == NULL){ //abort
+		gen_mutex_unlock(&tas_meta_mutex);
+		return -TROVE_ENOENT;
+	}
+	found->b_size = *inout_size_p;
+	*out_op_id_p=DUMMY_OPNUM;
+  	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_bstream_validate (TROVE_coll_id coll_id,
+		      TROVE_handle handle,
+		      TROVE_ds_flags flags,
+		      TROVE_vtag_s * vtag,
+		      void *user_ptr,
+		      TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "TAS tas_bstream_validate\n");
+	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_bstream_read_list (TROVE_coll_id coll_id,
+                      TROVE_handle handle,
+
+                      char **mem_offset_array,   //put the real data here
+                      TROVE_size *mem_size_array, //size of the memory buffers
+                      int mem_count,//number of memory buffers
+
+                      TROVE_offset *stream_offset_array,
+                      TROVE_size *stream_size_array,
+                      int stream_count,//number of streams to read...
+                      TROVE_size *out_size_p,//read bytes....
+                      TROVE_ds_flags flags,
+                      TROVE_vtag_s *vtag,
+                      void *user_ptr,//contains the callback function trove_callback
+                       //the function is called from bmi_recv_callback_fn if
+                       //bsream_read_list returns 1
+                      TROVE_context_id context_id,
+                      TROVE_op_id *out_op_id_p)
+{
+	int i;
+	TROVE_size readBytes=0;
+	for(i=0; i < stream_count; i++){
+		readBytes+=stream_size_array[i];
+	}
+
+	gossip_debug(GOSSIP_TROVE_DEBUG, "TAS tas_bstream_read_list handle:%lld flags:%d readBytes:%lld\n",lld(handle),flags,lld(readBytes));
+
+	*out_size_p = readBytes;
+  	*out_op_id_p = DUMMY_OPNUM;
+
+  	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+//have a look to io/flow/bmi_recv_callback_fn, there this funktion is called...
+static int
+tas_bstream_write_list (TROVE_coll_id coll_id,
+                       TROVE_handle handle,
+
+                       char **mem_offset_array, //here comes the real data
+                       TROVE_size *mem_size_array, //size of the memory buffers
+                       int mem_count, //number of memory buffers
+
+                       TROVE_offset *stream_offset_array,
+                       TROVE_size *stream_size_array,
+                       int stream_count, //number of streams to write...
+
+                       TROVE_size *out_size_p, //written bytes....
+                       TROVE_ds_flags flags,
+                       TROVE_vtag_s *vtag,
+                       void *user_ptr, //contains the callback function trove_callback
+                       //the function is called from bmi_recv_callback_fn if
+                       //bsream_write_list returns 1
+                       TROVE_context_id context_id,
+                       TROVE_op_id *out_op_id_p)
+{
+	int i;
+
+	TROVE_size writtenBytes=0;
+	for(i=0; i < stream_count; i++){
+		writtenBytes+=stream_size_array[i];
+	}
+	gossip_debug(GOSSIP_TROVE_DEBUG, "TAS tas_bstream_write_list for handle:%lld flags:%d fake writtenBytes:%lld  mem_count:%d stream_count:%d SYNC:%d\n",lld(handle),flags, lld(writtenBytes),mem_count, stream_count, flags);
+	*out_size_p = writtenBytes;
+
+/* set the actual file size */
+#ifdef TAS_UPDATE_FILESIZE
+		gen_mutex_lock(&tas_meta_mutex);
+		handleCache * found=findCacheEntry(handle,coll_id);
+		if(found == NULL){ //abort
+			gen_mutex_unlock(&tas_meta_mutex);
+			return -TROVE_ENOENT;
+		}
+		//get the last written byte and enlarge the file if this byte is larger than filesize
+		for(i=0; i < stream_count; i++){
+			TROVE_size akt_pos = stream_size_array[i] + stream_offset_array[i];
+			if( akt_pos > found->b_size ){
+				found->b_size = akt_pos;
+			}
+		}
+		gen_mutex_unlock(&tas_meta_mutex);
+#endif
+
+  	*out_op_id_p = DUMMY_OPNUM;
+
+
+  	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+
+
+static int
+tas_bstream_flush (TROVE_coll_id coll_id,
+		   TROVE_handle handle,
+		   TROVE_ds_flags flags,
+		   void *user_ptr,
+		   TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_bstream_flush\n");
+
+  	*out_op_id_p = DUMMY_OPNUM;
+  	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+/*
+ * Keyval operations:
+ */
+
+static int
+tas_keyval_read (TROVE_coll_id coll_id,
+		 TROVE_handle handle,
+		 TROVE_keyval_s * key_p,
+		 TROVE_keyval_s * val_p,
+		 TROVE_ds_flags flags,
+		 TROVE_vtag_s * out_vtag,
+		 void *user_ptr,
+		 TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+
+	if(key_p->buffer_sz == 3 && strncmp(key_p->buffer,"de",3)==0){
+		gossip_debug(GOSSIP_TROVE_DEBUG,"lookup dir\n");
+	}
+  
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_read flags:%d handle:%lld key-length:%d key:%s CollID:%d\n", flags, lld(handle), key_p->buffer_sz, (char *)key_p->buffer, coll_id);
+	gen_mutex_lock(&tas_meta_mutex);
+    handleCache * found=findCacheEntry(handle,coll_id);
+	if(found == NULL){ /* abort */
+		gen_mutex_unlock(&tas_meta_mutex);
+		return -TROVE_ENOENT;
+	}
+
+	/* got handle; search for keyval */
+	keyvalpair * found_pair = findPairForCacheEntry(found,key_p);
+	if(found_pair == NULL){
+		gen_mutex_unlock(&tas_meta_mutex);
+		return -TROVE_ENOENT;
+	}
+
+	if( val_p->buffer_sz < found_pair->val.buffer_sz ){
+		gossip_debug(GOSSIP_TROVE_DEBUG, " WARNING buffer to small tas read-keyval got:%d need:%d \n",
+			val_p->buffer_sz, found_pair->val.buffer_sz);
+		memcpy(val_p->buffer,found_pair->val.buffer, val_p->buffer_sz);
+		val_p->read_sz = val_p->buffer_sz;
+	}else{
+		memcpy(val_p->buffer,found_pair->val.buffer, found_pair->val.buffer_sz);
+		val_p->read_sz = found_pair->val.buffer_sz;
+	}
+   	gen_mutex_unlock(&tas_meta_mutex);
+	*out_op_id_p = DUMMY_OPNUM;
+	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_keyval_write (TROVE_coll_id coll_id,
+		  TROVE_handle handle,
+		  TROVE_keyval_s * key_p,
+		  TROVE_keyval_s * val_p,
+		  TROVE_ds_flags flags, // see trove.h
+		  TROVE_vtag_s * inout_vtag,
+		  void *user_ptr,
+		  TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_write flags:%d handle:%lld val-length:%d key:%s CollID:%d\n",
+		 flags, lld(handle), val_p->buffer_sz, (char*) key_p->buffer, coll_id);
+
+	gen_mutex_lock(&tas_meta_mutex);
+    handleCache * found=findCacheEntry(handle,coll_id);
+	if(found == NULL){ //abort
+		gen_mutex_unlock(&tas_meta_mutex);
+		return -TROVE_ENOENT;
+	}
+
+	/* If TROVE_NOOVERWRITE flag was set, make sure that we don't create the
+     * key if it exists see dbpf-keyval.c*/
+    //TROVE_NOOVERWRITE
+    /* if TROVE_ONLYOVERWRITE flag was set, make sure that the key exists
+     * before overwriting it */
+    //TROVE_ONLYOVERWRITE
+	//TROVE_SYNC
+
+	//got handle; search for keyval
+	keyvalpair * found_pair = NULL;
+
+	/*
+	 * findPairForCacheEntry(found,key_p)
+	 * it should never happen that a keyval exist for the list implementation
+	 * The old value will be keeped and not be overwritten, advantages:
+	 * faster because possible existing old key need not to be found
+	 * history, see how keys are manipulated
+	 * Disadvantage: more memory usage, slow down search for other keys
+	 * 	if multiple exist....
+	 */
+
+
+    #if defined(TAS_USE_RED_BLACKTREE)
+		found_pair=findPairForCacheEntry(found,key_p);
+	#endif
+
+	if(found_pair == NULL){
+		found_pair = (keyvalpair *) malloc(sizeof(keyvalpair));
+		found_pair->key.buffer = malloc(key_p->buffer_sz);
+		found_pair->key.buffer_sz = key_p->buffer_sz;
+		memcpy(found_pair->key.buffer,key_p->buffer,key_p->buffer_sz);
+
+		insertKeyValPair(found,found_pair);
+
+		found->k_size++;
+	}else{ //free memory first:
+		free(found_pair->val.buffer);
+	}
+
+	found_pair->val.buffer = malloc(val_p->buffer_sz);
+	found_pair->val.buffer_sz = val_p->buffer_sz;
+	memcpy(found_pair->val.buffer,val_p->buffer,val_p->buffer_sz);
+
+
+	gen_mutex_unlock(&tas_meta_mutex);
+  return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_keyval_remove (TROVE_coll_id coll_id,
+			TROVE_handle handle,
+			TROVE_keyval_s *key_p,
+                        TROVE_keyval_s *val_p,
+			TROVE_ds_flags flags,
+			TROVE_vtag_s *vtag,
+			void *user_ptr,
+		        TROVE_context_id context_id,
+			TROVE_op_id *out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_remove\n");
+
+	gen_mutex_lock(&tas_meta_mutex);
+    handleCache * found=findCacheEntry(handle,coll_id);
+	if(found == NULL){ //abort
+		gen_mutex_unlock(&tas_meta_mutex);
+		return -TROVE_ENOENT;
+	}
+
+	//got handle; search for keyval
+	if(! removePairOfCacheEntry(found,key_p)){
+		gen_mutex_unlock(&tas_meta_mutex);
+		return -TROVE_ENOENT;
+	}
+
+	found->k_size--;
+
+   	gen_mutex_unlock(&tas_meta_mutex);
+	*out_op_id_p = DUMMY_OPNUM;
+	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_keyval_validate (TROVE_coll_id coll_id,
+		     TROVE_handle handle,
+		     TROVE_ds_flags flags,
+		     TROVE_vtag_s * inout_vtag,
+		     void *user_ptr,
+		     TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_validate\n");
+	fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_validate not implemented\n");
+	*out_op_id_p = DUMMY_OPNUM;
+	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+
+inline static void keyValCopy(TROVE_keyval_s * out_key_array,
+		    TROVE_keyval_s * out_val_array, int i, keyvalpair * pair){
+	int max_read_size = pair->key.buffer_sz;
+	if(out_key_array[i].buffer_sz < max_read_size){
+		max_read_size = out_key_array[i].buffer_sz;
+	}
+	out_key_array[i].read_sz = max_read_size;
+	memcpy(out_key_array[i].buffer,pair->key.buffer,
+			max_read_size);
+
+	max_read_size = pair->val.buffer_sz;
+	if(out_val_array[i].buffer_sz < max_read_size){
+		max_read_size = out_val_array[i].buffer_sz;
+	}
+	out_val_array[i].read_sz = max_read_size;
+	memcpy(out_val_array[i].buffer,pair->val.buffer,
+			max_read_size);
+}
+
+/*
+ * reads count keyword/value pairs from the provided logical position in the
+ * keyval space.
+ */
+static int tas_keyval_iterate (TROVE_coll_id coll_id,
+		    TROVE_handle handle,
+		    TROVE_ds_position * inout_position_p,
+		    TROVE_keyval_s * out_key_array,
+		    TROVE_keyval_s * out_val_array,
+		    int *inout_count_p,
+		    TROVE_ds_flags flags,
+		    TROVE_vtag_s * inout_vtag,
+		    void *user_ptr,
+		    TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_iterate\n");
+
+	if(*inout_position_p == TROVE_ITERATE_START){
+		*inout_position_p =0;
+	}
+
+	gen_mutex_lock(&tas_meta_mutex);
+    handleCache * found=findCacheEntry(handle,coll_id);
+	if(found == NULL){ //abort
+		gen_mutex_unlock(&tas_meta_mutex);
+		return -TROVE_ENOENT;
+	}
+
+	int32_t in_read_count = *inout_count_p;
+	int32_t in_read_position = *inout_position_p;
+
+	int32_t remainingKeyValues = found->k_size - in_read_position;
+	gossip_debug(GOSSIP_TROVE_DEBUG," read_count %d read_pos %d remain %d \n",in_read_count,in_read_position,remainingKeyValues);
+	//set read count, if less keys available
+	if(remainingKeyValues < in_read_count){
+		*inout_count_p = remainingKeyValues; //number of keypairs to read
+		in_read_count = remainingKeyValues;
+	}
+	*inout_position_p= in_read_position + *inout_count_p;
+	if(*inout_count_p <= 0){
+		gen_mutex_unlock(&tas_meta_mutex);
+		*inout_count_p = 0;
+		return RETURN_IMMEDIATE_COMPLETE;
+	}
+
+	gossip_debug(GOSSIP_TROVE_DEBUG," read_count %d read_pos %d remain %d \n",in_read_count,in_read_position,remainingKeyValues);
+
+	//skip to in_read_position...
+	keyvalpair * found_pair;
+
+
+	#if defined(TAS_USE_RED_BLACKTREE)
+		//go to leftmost node:
+		found_pair=found->pairList;
+	#elif defined(TAS_USE_QUEUE)
+		found_pair=found->pairs;
+	#endif
+
+	for(;  in_read_position > 0 ;
+	found_pair = found_pair->next,in_read_position--);
+
+	//fill the array...
+	int i=0;
+	for(	;  i < in_read_count ;
+		found_pair = found_pair->next,i++){
+		keyValCopy(out_key_array, out_val_array,i ,found_pair );
+	}
+
+	gen_mutex_unlock(&tas_meta_mutex);
+
+	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_keyval_iterate_keys (TROVE_coll_id coll_id,
+			 TROVE_handle handle,
+			 TROVE_ds_position * inout_position_p,
+			 TROVE_keyval_s * out_key_array,
+			 int *inout_count_p,
+			 TROVE_ds_flags flags,
+			 TROVE_vtag_s * vtag,
+			 void *user_ptr,
+			 TROVE_context_id context_id,
+			 TROVE_op_id * out_op_id_p)
+{
+	fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_iterate_keys not implemented\n");
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_iterate_keys\n");
+  	return 1;
+}
+
+static int
+tas_keyval_read_list (TROVE_coll_id coll_id,
+		      TROVE_handle handle,
+		      TROVE_keyval_s * key_array,
+		      TROVE_keyval_s * val_array,
+		      TROVE_ds_state *err_array,
+		      int count,
+		      TROVE_ds_flags flags,
+		      TROVE_vtag_s * out_vtag,
+		      void *user_ptr,
+		      TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_read_list not implemented\n");
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_read_list\n");
+	return 1;
+}
+
+static int
+tas_keyval_write_list (TROVE_coll_id coll_id,
+		       TROVE_handle handle,
+		       TROVE_keyval_s * key_array,
+		       TROVE_keyval_s * val_array,
+		       int count,
+		       TROVE_ds_flags flags,
+		       TROVE_vtag_s * inout_vtag,
+		       void *user_ptr,
+		       TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+  fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_write_list not implemented\n");
+  gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_write_list\n");
+  return 1;
+}
+
+static int
+tas_keyval_flush (TROVE_coll_id coll_id,
+		  TROVE_handle handle,
+		  TROVE_ds_flags flags,
+		  void *user_ptr,
+		  TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+  fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_flush not implemented\n");
+  gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_flush\n");
+  return 1;
+}
+
+
+
+static int
+tas_dspace_create (TROVE_coll_id coll_id,
+			TROVE_handle_extent_array * extent_array,
+			TROVE_handle * handle,
+			TROVE_ds_type type,
+			TROVE_keyval_s * hint,	/* TODO: figure out what this is! */
+		    TROVE_ds_flags flags,
+		    void *user_ptr,
+		    TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	if (!extent_array || (extent_array->extent_count < 1))
+    {
+        return -TROVE_EINVAL;
+    }
+
+   	TROVE_extent cur_extent= extent_array->extent_array[0];
+    TROVE_handle new_handle=TROVE_HANDLE_NULL;
+    /* check if we got a single specific handle */
+    if ((extent_array->extent_count == 1) &&
+        (cur_extent.first == cur_extent.last))
+    {
+        /*
+          check if we MUST use the exact handle value specified;
+          if caller requests a specific handle, honor it
+        */
+        if (flags & TROVE_FORCE_REQUESTED_HANDLE)
+        {
+			new_handle =  * handle;
+			if( new_handle < handleSeperator ){
+				if(aktHandleFirst <= new_handle){
+					aktHandleFirst = new_handle+1;
+				}
+			}else{
+				if(aktHandleSecond <= new_handle){
+					aktHandleSecond = new_handle+1;
+				}
+			}
+        }
+        else if (cur_extent.first == TROVE_HANDLE_NULL)
+        {
+            /*
+              if we got TROVE_HANDLE_NULL, the caller doesn't care
+              where the handle comes from
+            */
+            printf("%llu cur_extent.first == TROVE_HANDLE_NULL\n",*handle);
+			//new_handle = getArbitraryHandle(coll_id,*handle);
+        }
+    }
+    else
+    {
+        /*
+          otherwise, we have to try to allocate a handle from
+          the specified range that we're given
+          Ignore that circumstance for now !!!!
+        */
+
+			if( cur_extent.last < handleSeperator ){
+					new_handle = aktHandleFirst;
+					aktHandleFirst++;
+			}else{
+					new_handle = aktHandleSecond;
+					aktHandleSecond++;
+			}
+    }
+
+    *handle = new_handle;
+
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_create Type:%d flag:%d extent_count->%u new_handle->%lld cur_extent.first:%lld cur_extent.last:%lld\n",
+			type,   flags,
+			extent_array->extent_count,
+			lld(new_handle),lld(cur_extent.first),lld(cur_extent.last) );
+    /*
+      if we got a zero handle, we're either completely out of handles
+      -- or else something terrible has happened
+    */
+    if (new_handle == TROVE_HANDLE_NULL)
+    {
+        gossip_err("Error: handle allocator returned a zero handle.\n");
+        gen_mutex_unlock(&tas_meta_mutex);
+       return -TROVE_ENOSPC;
+    }
+
+    handleCache * found=NULL;
+	#ifndef NO_SAVE_CREATION
+		found = findCacheEntry(new_handle,coll_id);
+	#endif
+	if(found == NULL){ //create new metadata structure...
+		found = allocateNewCacheElement();
+		insertCacheElement(found,new_handle,coll_id, type);
+	}else{
+		gossip_debug(GOSSIP_TROVE_DEBUG, "ERROR handle exists already %lld\n",lld(new_handle));
+	}
+	gen_mutex_unlock(&tas_meta_mutex);
+
+	*out_op_id_p = DUMMY_OPNUM;
+  return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int tas_dspace_remove (TROVE_coll_id coll_id,
+		   TROVE_handle handle,
+		   TROVE_ds_flags flags,
+		   void *user_ptr,
+		   TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_remove\n");
+
+	gen_mutex_lock(&tas_meta_mutex);
+
+	if(! removeCacheEntry(handle,coll_id)){ //abort
+		gen_mutex_unlock(&tas_meta_mutex);
+		return -TROVE_ENOENT;
+	}
+
+	gen_mutex_unlock(&tas_meta_mutex);
+  	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_dspace_iterate_handles (TROVE_coll_id coll_id,
+			    TROVE_ds_position * position_p,
+			    TROVE_handle * handle_array,
+			    int *inout_count_p,
+			    TROVE_ds_flags flags,
+			    TROVE_vtag_s * vtag,
+			    void *user_ptr,
+			    TROVE_context_id context_id,
+			    TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_iterate_handles \n");
+ 	fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_dspace_iterate_handles not implemented\n");
+  	*out_op_id_p = DUMMY_OPNUM;
+  	return 1;
+}
+
+
+
+static int
+tas_dspace_verify (TROVE_coll_id coll_id, TROVE_handle handle, TROVE_ds_type * type,	/* TODO: define types! */
+		   TROVE_ds_flags flags,
+		   void *user_ptr,
+		   TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_verify\n");
+	*out_op_id_p = DUMMY_OPNUM;
+
+    handleCache * found=findCacheEntry(handle,coll_id);
+
+	if(found == NULL){
+		return -TROVE_ENOENT;
+	}
+
+    *type = found->attributes->type;
+  	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_dspace_getattr (TROVE_coll_id coll_id,
+		    TROVE_handle handle,
+		    TROVE_ds_attributes_s * ds_attr_p,
+		    TROVE_ds_flags flags,
+		    void *user_ptr,
+		    TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_getattr handle: %lld flags: %d \n", lld(handle), flags);
+
+	gen_mutex_lock(&tas_meta_mutex);
+
+    handleCache * found=findCacheEntry(handle,coll_id);
+
+	if(found != NULL){
+			*out_op_id_p = DUMMY_OPNUM;
+			ds_attr_p->type = found->attributes->type;
+			ds_attr_p->handle = handle;
+			ds_attr_p->fs_id = found->fs_id;
+		    ds_attr_p->uid= found->attributes->uid;
+		    ds_attr_p->gid= found->attributes->gid;
+		    ds_attr_p->mode= found->attributes->mode;
+		    ds_attr_p->ctime= found->attributes->ctime;
+		    ds_attr_p->mtime= found->attributes->mtime;
+		    ds_attr_p->atime= found->attributes->atime;
+		    ds_attr_p->dfile_count= found->attributes->dfile_count;
+		    ds_attr_p->dist_size= found->attributes->dist_size;
+
+		    ds_attr_p->b_size = found->b_size;
+		    //ds_attr_p->k_size = found->k_size; TODO FIXME
+
+   			gen_mutex_unlock(&tas_meta_mutex);
+			return RETURN_IMMEDIATE_COMPLETE;
+	}
+	gen_mutex_unlock(&tas_meta_mutex);
+
+  return -1;
+}
+
+static int 
+tas_dspace_getattr_list(
+              TROVE_coll_id coll_id,
+              int nhandles,
+              TROVE_handle *handle_array,
+              TROVE_ds_attributes_s *ds_attr_p,
+                          TROVE_ds_state *error_array,
+              TROVE_ds_flags flags,
+              void *user_ptr,
+              TROVE_context_id context_id,
+              TROVE_op_id *out_op_id_p)
+{
+  return -TROVE_EINVAL;
+}
+
+static int
+tas_dspace_setattr (TROVE_coll_id coll_id,
+              TROVE_handle handle,
+              TROVE_ds_attributes_s *ds_attr_p, 
+              TROVE_ds_flags flags,
+              void *user_ptr,
+              TROVE_context_id context_id,
+              TROVE_op_id *out_op_id_p)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_setattr handle:%lld uid:%d gid:%d\n",
+			lld(handle),ds_attr_p->uid,ds_attr_p->gid);
+
+	gen_mutex_lock(&tas_meta_mutex);
+    handleCache * found=findCacheEntry(handle,coll_id);
+
+	if(found != NULL){
+			found->attributes->type = ds_attr_p->type;
+			if( found->handle != handle){
+				//found->fs_id != ds_attr_p->fs_id ||
+				printf("Fatal ERROR tas_dspace_setattr it is not allowed "
+					"to change the handle number or filesystem id!!\n");
+				exit(1);
+			}
+		    found->attributes->uid=ds_attr_p->uid;
+		    found->attributes->gid=ds_attr_p->gid;
+		    found->attributes->mode = ds_attr_p->mode;
+		    found->attributes->ctime = ds_attr_p->ctime;
+		    found->attributes->mtime = ds_attr_p->mtime;
+		    found->attributes->atime = ds_attr_p->atime;
+	        found->attributes->dfile_count = ds_attr_p->dfile_count;
+		    found->attributes->dist_size = ds_attr_p->dist_size;
+
+			//take special care for:
+		    //found->b_size = ds_attr_p->b_size;
+		    //found->k_size = ds_attr_p->k_size;
+
+			gen_mutex_unlock(&tas_meta_mutex);
+
+			return RETURN_IMMEDIATE_COMPLETE;
+	}
+
+	gen_mutex_unlock(&tas_meta_mutex);
+	return -TROVE_EEXIST;
+}
+
+static int
+tas_dspace_cancel (TROVE_coll_id coll_id,
+               TROVE_op_id ds_id,
+               TROVE_context_id context_id)
+{
+  fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_dspace_cancel not implemented\n");
+  gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_cancel\n");
+  return 1;
+}
+
+static int
+tas_dspace_test (TROVE_coll_id coll_id,
+               TROVE_op_id ds_id,
+               TROVE_context_id context_id,
+               int *out_count_p,
+               TROVE_vtag_s *vtag,
+               void **returned_user_ptr_p,
+               TROVE_ds_state *out_state_p,
+               int max_idle_time_ms)
+{
+  return dbpf_dspace_ops.dspace_test(coll_id, ds_id, 
+    context_id, out_count_p, vtag, returned_user_ptr_p, 
+    out_state_p, max_idle_time_ms);
+}
+
+static int
+tas_dspace_testsome (TROVE_coll_id coll_id,
+               TROVE_context_id context_id,
+               TROVE_op_id *ds_id_array,
+               int *inout_count_p,
+               int *out_index_array,
+               TROVE_vtag_s *vtag_array,
+               void **returned_user_ptr_array,
+               TROVE_ds_state *out_state_array,
+               int max_idle_time_ms)
+{
+  return dbpf_dspace_ops.dspace_testsome(coll_id, context_id, ds_id_array, 
+    inout_count_p,out_index_array,vtag_array,returned_user_ptr_array, 
+    out_state_array,max_idle_time_ms);
+}
+
+static int
+tas_dspace_testcontext (TROVE_coll_id coll_id,
+                           TROVE_op_id *ds_id_array,
+                           int *inout_count_p,
+                           TROVE_ds_state *state_array,
+                           void** user_ptr_array,
+                           int max_idle_time_ms,
+                           TROVE_context_id context_id)
+{
+  return dbpf_dspace_ops.dspace_testcontext(coll_id, ds_id_array,
+    inout_count_p, state_array, user_ptr_array, max_idle_time_ms, context_id);
+}
+
+static int
+tas_collection_setinfo (TROVE_method_id method_id,
+                  TROVE_coll_id coll_id,
+                  TROVE_context_id context_id,
+                  int option,
+                  void *parameter)
+{
+		gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_setinfo %d, \n", option);
+    int ret = 1; //-TROVE_EINVAL;
+
+    switch(option)
+    {
+        case TROVE_COLLECTION_HANDLE_RANGES:{
+        	gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_HANDLE_RANGES: %s %d\n",(char *)parameter,context_id);
+
+			//set minimum handles according to handle range...
+		   TROVE_handle minMetaHandle,maxMetaHandle,minDatafileHandle,maxDatafileHandle;
+           ret = sscanf((char *)parameter, "%llu-%llu,%llu-%llu",
+	           		& minMetaHandle,& maxMetaHandle,& minDatafileHandle, & maxDatafileHandle );
+          if( ret == 4 ){ //set only once !!!
+					aktHandleFirst = minMetaHandle;
+					aktHandleSecond = minDatafileHandle;
+					handleSeperator = minDatafileHandle;
+					if( aktHandleSecond < aktHandleFirst ){
+						printf("error in parsing handle range !!! %s \n", (char*) parameter);
+						exit(1);
+					}
+           }else if(ret == 2){ //only datafiles OR metadata !
+          		ret = sscanf((char *)parameter, "%llu-%llu",
+           		 	& minDatafileHandle, & maxDatafileHandle );
+				aktHandleFirst = minDatafileHandle;
+				aktHandleSecond = 0;
+				handleSeperator = maxDatafileHandle+1;
+           }else{
+           		return 0;
+           }
+
+			FILE * file;
+			printf("read metadata from file: %s \n",storageName);
+			file = fopen (storageName, "r");
+			if(file == 0){
+				printf(" no metadata information available \n");
+			    return RETURN_IMMEDIATE_COMPLETE;
+			}
+
+			PVFS_size i;
+
+			handleCache * found=(handleCache *) malloc(sizeof(handleCache));
+			found->attributes = (storeAttributes *) malloc(sizeof(storeAttributes));
+
+			ret=fscanf(file,"Count:%llu\n",& meta_count);
+			if(ret != 1){
+				meta_count = 0;
+				free(found);
+
+				fclose(file);
+			    return RETURN_IMMEDIATE_COMPLETE;
+			}
+			for(i=0; i < meta_count; i++){
+				bzero(found->attributes,sizeof(storeAttributes));
+				ret=fscanf(file,"Handle:%llu Type:%u FS-ID:%d UID:%u GID:%u MODE:%u DFILE_COUNT:%u DIST_SIZE:%u BYTE_SIZE:%llu KEYS:%llu CTIME:%llu MTIME:%llu ATIME:%llu\n",
+				&(found->handle), (int*)&(found->attributes->type),&found->fs_id,
+				&found->attributes->uid,  &found->attributes->gid,
+				&found->attributes->mode, &found->attributes->dfile_count,
+				&found->attributes->dist_size, &found->b_size, &found->k_size,
+				&found->attributes->ctime, &found->attributes->mtime, &found->attributes->atime);
+
+				if(ret == 13){
+					if( found->handle < handleSeperator ){
+						if(aktHandleFirst <= found->handle){
+							aktHandleFirst = found->handle+1;
+						}
+					}else if(aktHandleSecond == 0) {
+						printf("ERROR aktHandleSecond = 0 \n");
+					}else{
+						if(aktHandleSecond <= found->handle){
+							aktHandleSecond = found->handle+1;
+						}
+					}
+
+					keyvalpair * akt_pair;
+
+					#if defined(TAS_USE_RED_BLACKTREE)
+						insertKeyIntoTree((RBData*) found,tas_meta_cache);
+						found->pairList = NULL;
+						found->pairTree = NULL;
+					#elif defined(TAS_USE_QUEUE)
+						found->pairs = NULL;
+						pushFrontKey(found->handle,found, tas_meta_cache);
+					#endif
+
+					akt_pair = NULL;
+
+					//read key->value pairs:
+					int i;
+					for(i=0; i < found->k_size; i++){
+						akt_pair = (keyvalpair *) malloc(sizeof(keyvalpair));
+
+						int key_sz, val_sz;
+						fscanf(file,"  Key-size:%d Val-size:%d  ",& key_sz, & val_sz);
+		         		akt_pair->key.buffer_sz = key_sz;
+		         		akt_pair->val.buffer_sz = val_sz;
+		         		//
+		         		akt_pair->key.buffer = malloc(key_sz);
+		         		akt_pair->val.buffer = malloc(val_sz);
+		         		fread(akt_pair->key.buffer,1,key_sz, file);
+						fscanf(file," Value:");
+						fread(akt_pair->val.buffer,1,val_sz, file);
+						fscanf(file,"\n");
+
+						insertKeyValPair(found,akt_pair);
+					}
+
+					found = (handleCache *) malloc(sizeof(handleCache));
+					found->attributes = (storeAttributes *) malloc(sizeof(storeAttributes));
+				}else{
+					printf(" Less elements than expected read %llu of %llu \n", i, meta_count);
+					break;
+				}
+			}
+			free(found->attributes);
+			free(found);
+
+			fclose(file);
+
+       	 	ret = 1;
+        	break;
+    	}case TROVE_COLLECTION_HANDLE_TIMEOUT:
+            ret = 1; // trove_set_handle_timeout(coll_id, context_id, (struct timeval *)parameter);
+        	gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_HANDLE_TIMEOUT\n");
+            break;
+        case TROVE_COLLECTION_ATTR_CACHE_KEYWORDS:
+            gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_ATTR_CACHE_KEYWORDS: %s\n", (char *) parameter);
+            ret = 1;
+            break;
+        case TROVE_COLLECTION_ATTR_CACHE_SIZE:
+            gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_ATTR_CACHE_SIZE: %d\n",*((int *)parameter));
+            ret = 1;
+            break;
+        case TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS:
+            gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS: %d\n",*((int *)parameter));
+            ret = 1;
+            break;
+        case TROVE_COLLECTION_ATTR_CACHE_INITIALIZE:
+            gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_ATTR_CACHE_INITIALIZE \n");
+            ret = 1;
+            break;
+    }
+    
+    return ret;
+}
+
+static int
+tas_collection_getinfo (TROVE_coll_id coll_id,
+                  TROVE_context_id context_id,
+                  TROVE_coll_getinfo_options opt,
+                  void *parameter)
+{
+  fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_collection_getinfo not implemented\n");
+  gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_getinfo\n");
+  return 1;
+}
+
+static int
+tas_collection_seteattr (TROVE_coll_id coll_id,
+			 TROVE_keyval_s * key_p,
+			 TROVE_keyval_s * val_p,
+			 TROVE_ds_flags flags,
+			 void *user_ptr,
+			 TROVE_context_id context_id,
+			 TROVE_op_id * out_op_id_p)
+{
+  gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_seteattr Key:%s \n", (char *)key_p->buffer);
+  if(key_p->buffer_sz == 12 && strncmp(key_p->buffer,"root_handle",12)==0){
+	   /* rootHandle = *((TROVE_handle *) val_p->buffer); */
+  }else
+	  fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_collection_seteattr not implemented\n");
+  return 1;
+}
+
+static int
+tas_collection_geteattr (TROVE_coll_id coll_id,
+			 TROVE_keyval_s * key_p,
+			 TROVE_keyval_s * val_p,
+			 TROVE_ds_flags flags,
+			 void *user_ptr,
+			 TROVE_context_id context_id,
+			 TROVE_op_id * out_op_id_p)
+{
+		gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_geteattr Key:%s\n", (char *)key_p->buffer);
+		fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_collection_geteattr not implemented\n");
+		return 1;
+}
+
+
+static int
+tas_open_context (TROVE_coll_id coll_id, TROVE_context_id * context_id)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_geteattr\n");
+	*context_id = 1;
+  	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_close_context (TROVE_coll_id coll_id, TROVE_context_id context_id)
+{
+	gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_close_context\n");
+  	return RETURN_IMMEDIATE_COMPLETE;
+}
+
+
+
+/*
+ * Function pointers to the tas implementation are set in trove-mgmt.c
+ */
+
+struct TROVE_mgmt_ops tas_mgmt_ops = {
+  .initialize = tas_initialize,
+  .finalize = tas_finalize,
+  tas_storage_create,
+  tas_storage_remove,
+  tas_collection_create,
+  tas_collection_remove,
+  tas_collection_lookup,
+  tas_collection_iterate,
+  tas_collection_setinfo,
+  tas_collection_getinfo,
+  tas_collection_seteattr,
+  tas_collection_geteattr
+};
+
+struct TROVE_dspace_ops tas_dspace_ops = {
+  tas_dspace_create,
+  tas_dspace_remove,
+  tas_dspace_iterate_handles,
+  tas_dspace_verify,
+  tas_dspace_getattr,
+  tas_dspace_getattr_list,
+  tas_dspace_setattr,
+  tas_dspace_cancel,
+  tas_dspace_test,
+  tas_dspace_testsome,
+  tas_dspace_testcontext
+};
+
+
+struct TROVE_keyval_ops tas_keyval_ops =
+{
+    tas_keyval_read,
+    tas_keyval_write,
+    tas_keyval_remove,
+    tas_keyval_validate,
+    tas_keyval_iterate,
+    tas_keyval_iterate_keys,
+    tas_keyval_read_list,
+    tas_keyval_write_list,
+    tas_keyval_flush
+};
+ 
+
+struct TROVE_bstream_ops tas_bstream_ops = {
+  tas_bstream_read_at,
+  tas_bstream_write_at,
+  tas_bstream_resize,
+  tas_bstream_validate,
+  tas_bstream_read_list,
+  tas_bstream_write_list,
+  tas_bstream_flush
+};
+
+struct TROVE_context_ops tas_context_ops = {
+  tas_open_context,
+  tas_close_context
+};

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ tas-queue.h	2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,59 @@
+/*
+ * Author Julian Kunkel
+ * Simple double linked list with puts object 
+ * 	to front of list when it is accessed.
+ */
+
+#ifndef TASQUEUE_H_
+#define TASQUEUE_H_
+
+#include<stdlib.h>
+
+/*
+ * Print a warning if lookup takes more than WARNING_NUM iterations
+ * Performance decreases the more elements need to be inspected...
+ */
+#define WARNING_NUM 40
+
+typedef int64_t queueKey;
+
+struct tas_queue_elem_{
+	struct tas_queue_elem_ * next;
+ 	struct tas_queue_elem_ * prev;
+
+	queueKey key;
+ 	void * data;
+};
+
+typedef struct tas_queue_elem_ queue_elem;
+
+//currently queue and queue_elem are equal,
+//but maybe some elements will be added later so:
+typedef struct tas_queue_elem_* tas_queue;
+
+
+//functions:
+tas_queue *newTasQueue(void);
+
+queue_elem * lookupQueue(queueKey key, tas_queue* queue);
+
+void deleteElementFromList(queue_elem * elem, tas_queue* queue);
+void * deleteKeyFromList(queueKey key, tas_queue* queue);
+//lookup key and replace data if possible, else pushfront.
+//returns replaced data (if key exists)
+//Queue element is put to head of queue
+void * insertKeyIntoQueue(queueKey key, void * data, tas_queue* queue);
+queue_elem * pushFrontKey(queueKey key, void * data, tas_queue* queue);
+
+//remove data from current possition and push it to head of queue,
+void relinkFront(queue_elem * elem, tas_queue* queue);
+
+
+#define iterate_tas_queue(func,elem,queue){ 		\
+	queue_elem * elem=*queue; 					    \
+	for(; elem != NULL ; elem = elem->next){        \
+		(func); 									\
+	} 											    \
+}                                         
+
+#endif /*TASQUEUE_H_*/

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ tas-handle-manager.c	2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,327 @@
+#include "tas-handle-manager.h"
+#include <stdio.h>
+#include <assert.h>
+
+static gen_mutex_t tas_handlemanager_mutex;
+red_black_tree * collectionTree=NULL;
+static int handleManagerInititalised = 0;
+
+struct tas_handle_manager_{
+	red_black_tree * handleTree;
+	
+	gen_mutex_t tree_mutex;
+};
+
+typedef struct tas_handle_manager_ tas_handle_manager;
+
+/*
+ * Tree for mapping the collection id to the handle_managers
+ */
+struct tas_handle_manager_collections_{
+	TROVE_coll_id coll_id;
+	TROVE_handle seperatorHandleNo;
+	tas_handle_manager * firstHandleType;
+	tas_handle_manager * secondHandleType;	
+};
+
+typedef struct tas_handle_manager_collections_ tas_handle_manager_collections;
+
+/*
+ * Comparision functions:
+ */
+static int compareCollections(RBData * data, RBKey * key){
+	TROVE_coll_id ikey1=((tas_handle_manager_collections*)data)->coll_id;
+	TROVE_coll_id ikey2=*((TROVE_coll_id *) key);	
+	if(ikey1 > ikey2 ){ //take right neigbour
+		return +1;
+	}else if(ikey1 < ikey2){//take left neigbour
+		return -1;
+	}
+	return 0;
+}
+
+static int compareCollections2(RBData * data, RBData * data2){
+	TROVE_coll_id ikey1=((tas_handle_manager_collections*)data)->coll_id;
+	TROVE_coll_id ikey2=((tas_handle_manager_collections*)data2)->coll_id;
+	if(ikey1 > ikey2 ){ //take right neigbour
+		return +1;
+	}else if(ikey1 < ikey2){//take left neigbour
+		return -1;
+	}
+	return 0;
+}
+
+//for comparision of handleRanges:
+static int compareRanges2(RBData * data, RBData * data2){
+	tas_handleRange * ikey1=(tas_handleRange *) data;
+	tas_handleRange * ikey2=(tas_handleRange *) data2;
+	
+	//is no overlapping possible ?
+	if(ikey1->last < ikey2->first ){ 
+		return -1;
+	}else if(ikey1->first > ikey2->last){
+		return +1;
+	}
+	//it does overlapp !!!
+	return 0;
+}
+
+/*
+ * Helper functions:
+ */
+static inline void insertHandleExtend(tas_handle_manager* manager,TROVE_handle lowerBound, TROVE_handle upperBound){
+	tas_handleRange* handles = (tas_handleRange*) malloc(sizeof(tas_handleRange));
+	handles->first = lowerBound;
+	handles->last = upperBound;
+	insertKeyIntoTree(handles,manager->handleTree);
+};
+
+static inline tas_handle_manager* insertHandleRangeManager(TROVE_handle lowerBound, TROVE_handle upperBound) {
+	tas_handle_manager* manager=(tas_handle_manager*) malloc(sizeof(tas_handle_manager));
+	gen_mutex_init(& manager->tree_mutex);
+	manager->handleTree = newRedBlackTree(compareRanges2,compareRanges2);
+	insertHandleExtend(manager, lowerBound,upperBound);
+	return manager;
+}
+
+static inline tas_handle_manager* lookupCollection(TROVE_coll_id coll_id, TROVE_handle handle){
+  	tree_node * node =lookupTree(& coll_id,collectionTree);
+  	if(node == NULL) return NULL;
+  	tas_handle_manager_collections * collection = (tas_handle_manager_collections*) node->data;
+  	if(handle > collection->seperatorHandleNo){
+  		return collection->secondHandleType;
+  	}else{
+  		return collection->firstHandleType;
+  	}
+}
+	
+/*
+ * 
+ */
+void initialiseHandleManager(void){
+	if(!handleManagerInititalised){
+		gen_mutex_init(& tas_handlemanager_mutex);
+		gen_mutex_lock(& tas_handlemanager_mutex);
+		collectionTree = newRedBlackTree(compareCollections,compareCollections2);
+		gen_mutex_unlock(& tas_handlemanager_mutex);	
+	}
+	handleManagerInititalised = 1;
+}
+
+
+void initialiseCollection(TROVE_coll_id coll_id,TROVE_handle lowerBound, 
+	TROVE_handle upperBound,TROVE_handle lowerBound2, TROVE_handle upperBound2){
+		
+	if(handleManagerInititalised != 1){
+		printf("ERROR Handle manager is not initialised !\n");
+		assert(0);
+	}
+	gen_mutex_lock(& tas_handlemanager_mutex);
+	if( lookupTree(& coll_id, collectionTree) != NULL){
+		printf("ERROR: collection already initialised !\n");
+		assert(0);
+	}
+			
+	tas_handle_manager_collections * coll_manager = 
+		(tas_handle_manager_collections*) malloc(sizeof(tas_handle_manager_collections));
+	coll_manager->coll_id = coll_id;
+	coll_manager->seperatorHandleNo = upperBound;
+
+	coll_manager->firstHandleType = insertHandleRangeManager(lowerBound, upperBound);
+	if(lowerBound2 == 0){
+		coll_manager->secondHandleType = NULL;
+	}else{
+		coll_manager->secondHandleType = insertHandleRangeManager(lowerBound2, upperBound2);		
+	}
+	coll_manager->coll_id = coll_id;
+
+  	insertKeyIntoTree(coll_manager,collectionTree);
+	gen_mutex_unlock(& tas_handlemanager_mutex);		
+}
+
+/*
+ * Remove handle from available handles and return it !
+ * Assume the handles will be never used up....
+ */
+TROVE_handle getHandle(TROVE_coll_id coll_id,TROVE_handle_extent_array * extends){
+	tas_handle_manager* manager = lookupCollection(coll_id, extends->extent_array[0].first);
+	if(manager == NULL) return TROVE_HANDLE_NULL;
+	
+	int i;
+	TROVE_handle handle=TROVE_HANDLE_NULL;
+	tree_node * node;
+	gen_mutex_lock(& manager->tree_mutex);
+	for(i=0; i < extends->extent_count ; i++){
+		 node= lookupTree(& extends->extent_array[i],manager->handleTree);
+		 if(node != NULL){
+		 	//found a overlapping extend, take first :)
+		 	
+		 	tas_handleRange * nodeRange = (tas_handleRange *) node->data;
+		 	handle=nodeRange->first;
+		 	nodeRange->first++;
+		 	if(nodeRange->first > nodeRange->last){ 
+		 		//remove handle range:
+		 		deleteNodeFromTree(node,manager->handleTree);
+		 		free(nodeRange);
+		 	}
+		 	
+		 	break;
+		 }
+	}
+	gen_mutex_unlock(& manager->tree_mutex);	
+	return handle;
+}
+
+TROVE_handle getArbitraryHandle(TROVE_coll_id coll_id,TROVE_handle handleSep){
+	tas_handle_manager* manager = lookupCollection(coll_id,handleSep);
+	if(manager == NULL) return TROVE_HANDLE_NULL;
+
+	TROVE_handle handle=TROVE_HANDLE_NULL;
+	gen_mutex_lock(& manager->tree_mutex);
+   	tas_handleRange * nodeRange = (tas_handleRange *) manager->handleTree->head->data;
+   	handle=nodeRange->first;
+ 	nodeRange->first++;
+ 	if(nodeRange->first > nodeRange->last){
+ 		//remove handle range:
+ 		deleteNodeFromTree(manager->handleTree->head,manager->handleTree);
+ 		free(nodeRange);
+ 	}
+	gen_mutex_unlock(& manager->tree_mutex);
+	return handle;
+}
+
+/*
+ * May split handle range into two parts
+ */
+TROVE_handle getfixedHandle(TROVE_coll_id coll_id,TROVE_handle handle){
+	tas_handle_manager* manager = lookupCollection(coll_id, handle);
+	if(manager == NULL) return TROVE_HANDLE_NULL;
+	PVFS_handle_extent handleExtend;
+	handleExtend.first = handle;
+	handleExtend.last = handle;
+
+	tree_node * node;
+	gen_mutex_lock(& manager->tree_mutex);
+	node= lookupTree(& handleExtend, manager->handleTree);
+	if(node != NULL){
+	 	//found a extend which contains handle
+	 	tas_handleRange * nodeRange = (tas_handleRange *) node->data;
+	 	
+	 	if(nodeRange->first < handle && nodeRange->last > handle){
+	 		//split handle range into two parts ...
+	 		//change data of current node (does not destroy tree order)
+	 		TROVE_handle tmpRangeFirst = nodeRange->first;
+	 		nodeRange->first = handle+1;
+	 		insertHandleExtend(manager, tmpRangeFirst, handle-1);
+	 	}else{
+		 	if(nodeRange->first == handle){
+		 		nodeRange->first++;
+		 	}else if(nodeRange->last == handle){
+		 		nodeRange->last--;
+		 	}
+			if(nodeRange->first > nodeRange->last){
+				//remove handle range:
+				deleteNodeFromTree(node,manager->handleTree);
+				free(nodeRange);
+			}
+	 	}
+ 		gen_mutex_unlock(& manager->tree_mutex);	
+ 		return handle;
+	}
+	gen_mutex_unlock(& manager->tree_mutex);	
+	return TROVE_HANDLE_NULL;
+}
+
+/*
+ * Free the handle by adding it to a handle range !
+ * Maybe fix entries together in handle manager...
+ */
+int freeHandle(TROVE_coll_id coll_id,TROVE_handle handle){
+	return freeHandleRange(coll_id, handle,handle);
+}
+
+int freeHandleRange(TROVE_coll_id coll_id,TROVE_handle lowerhandle,TROVE_handle upperhandle){
+	tas_handle_manager* manager = lookupCollection(coll_id, upperhandle);
+	if(manager == NULL) return -1;
+	
+	tree_node * node;
+	PVFS_handle_extent handleExtend;
+	handleExtend.first = lowerhandle;
+	handleExtend.last = upperhandle;
+	gen_mutex_lock(& manager->tree_mutex);
+	node= lookupTree(& handleExtend, manager->handleTree);
+	if(node != NULL){
+		gen_mutex_unlock(& manager->tree_mutex);	
+		//at least one handle in range is already free, this should not happen !
+		return -1;
+	}
+	
+	//try to find near handles +- 1...
+	handleExtend.first = lowerhandle-1;
+	handleExtend.last = upperhandle+1;
+	node= lookupTree(& handleExtend, manager->handleTree);
+	if(node != NULL){
+		//found one merge them
+	 	tas_handleRange * nodeRange = (tas_handleRange *) node->data;
+		if(nodeRange->first == upperhandle+1){
+			nodeRange->first = lowerhandle;			
+		}else if(nodeRange->last +1 == lowerhandle){
+			nodeRange->last = upperhandle;
+		}else{
+			assert(0);
+		}
+	 	//try to recursively merge entries
+	 	int changed = 1;
+	 	while(changed){
+			changed = 0;	 		
+	 		tree_node * newnode;	 			 		
+	 		//find left side extend:
+			handleExtend.first = nodeRange->first-1;
+			handleExtend.last  = nodeRange->first-1;
+			newnode= lookupTree(& handleExtend, manager->handleTree);
+			if(newnode != NULL){
+				changed = 1;
+			 	tas_handleRange * nodeRangeNew = (tas_handleRange *) newnode->data;
+				nodeRange->first = nodeRangeNew->first;
+				deleteNodeFromTree(newnode,manager->handleTree);				
+			}
+			
+			//find right side extend:
+			handleExtend.first = nodeRange->last+1;
+			handleExtend.last  = nodeRange->last+1;
+			newnode= lookupTree(& handleExtend, manager->handleTree);			
+			if(newnode != NULL){
+				changed = 1;
+			 	tas_handleRange * nodeRangeNew = (tas_handleRange *) newnode->data;
+				nodeRange->last = nodeRangeNew->last;
+				deleteNodeFromTree(newnode,manager->handleTree);				
+			}
+	 	}
+	 	
+	}else{
+		//create a new single entry...
+		//this should be optimized later
+		insertHandleExtend(manager, lowerhandle, upperhandle);
+	}
+	
+	gen_mutex_unlock(& manager->tree_mutex);	
+	
+	
+	return 0;
+}
+
+int lookupHandle(TROVE_coll_id coll_id,TROVE_handle handle){
+	tas_handle_manager* manager = lookupCollection(coll_id, handle);
+	if(manager == NULL) return 0;
+	
+	tree_node * node;
+	PVFS_handle_extent handleExtend;
+	handleExtend.first = handle;
+	handleExtend.last = handle;
+	gen_mutex_lock(& manager->tree_mutex);
+	node= lookupTree(& handleExtend, manager->handleTree);
+	gen_mutex_unlock(& manager->tree_mutex);
+	if(node == NULL) return 0;
+	return 1;
+}
+

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ red-black-tree.h	2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,60 @@
+/*
+ * Author: Julian Kunkel 2005
+ */
+#ifndef REDBLACKTREE_H_
+#define REDBLACKTREE_H_
+
+#include<stdlib.h>
+
+#ifndef TRUE
+	#define TRUE 1
+	#define FALSE 0
+#endif
+
+enum node_color{
+	RED=0,BLACK=1
+};
+
+typedef void RBData;
+typedef void RBKey;
+
+
+struct red_black_tree_node{
+	struct red_black_tree_node* parent;	
+	struct red_black_tree_node* left;
+	struct red_black_tree_node* right;
+
+	RBData * data;
+	char color;	
+};
+
+typedef struct red_black_tree_node tree_node;
+
+struct red_black_tree_{
+	tree_node * head;
+	int (*compare)(RBData * data, RBKey * key);
+	int (*compare2)(RBData * data, RBData * data2);	
+};
+
+typedef struct red_black_tree_ red_black_tree;
+
+//functions
+tree_node* lookupTree(RBKey *key, red_black_tree* tree);
+red_black_tree *newRedBlackTree(int (*compare)(RBData * data, RBKey * key),int (*compare2)(RBData * data, RBData* data2));
+void freeEmptyRedBlackTree(red_black_tree ** tree);
+ 
+tree_node * insertKeyIntoTree(RBData * data, red_black_tree* tree);
+
+void deleteNodeFromTree(tree_node*node , red_black_tree* tree);
+//this function returns NULL if the node is deleted, or the node which changed position
+//this is necessary if the data contains references to the node :)
+tree_node * deleteNodeFromTree2(tree_node*node , red_black_tree* tree);
+
+
+void iterateRedBlackTree(void (*callback)(RBData* data, void* funcData), red_black_tree* tree,void* funcData);
+
+//several compare functions:
+int compareInt64(RBData * data, RBKey * key);
+
+#endif /*REDBLACKTREE_H_*/
+

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ module.mk.in	2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,10 @@
+DIR := src/io/trove/trove-tas
+LIBSRC += \
+	$(DIR)/red-black-tree.c
+
+SERVERSRC += \
+	$(DIR)/tas.c \
+	$(DIR)/red-black-tree.c	\
+	$(DIR)/tas-handle-manager.c \
+	$(DIR)/tas-queue.c 
+	

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ red-black-tree.c	2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,368 @@
+/*
+ * Author: Julian Kunkel 2005
+ */
+
+#include "red-black-tree.h"
+
+int compareInt64(RBData * data, RBKey * key){
+		int64_t ikey1=*((int64_t *) data);
+		int64_t ikey2=*((int64_t *) key);		
+	
+		if(ikey1 > ikey2 ){ //take right neigbour
+			return +1;
+		}else if(ikey1 < ikey2){//take left neigbour
+			return -1;
+		}	
+		return 0;
+}
+
+static void inorderWalkthrough(void (*callback)(RBData* data, void* funcData),tree_node* node,void* funcData){
+	if(node->left != NULL){
+		inorderWalkthrough(callback,node->left,funcData);
+	}
+	(*callback)(node->data,funcData);
+	if(node->right != NULL){
+		inorderWalkthrough(callback,node->right,funcData);
+	}	
+}
+
+void iterateRedBlackTree(void (*callback)(RBData* data, void* funcData), red_black_tree* tree,void* funcData){
+	if(tree == NULL || tree->head == NULL) 
+		return;
+	inorderWalkthrough(callback,tree->head,funcData);
+}
+
+//take care of memory managment of the data by yourself !
+tree_node* lookupTree(RBKey *key, red_black_tree* tree){
+	tree_node* actNode = tree->head;
+	while(actNode != NULL){
+		int compret=(tree->compare)(actNode->data,key);
+		if(compret > 0){ //take right neigbour
+			actNode = actNode->right;
+		}else if(compret < 0){//take left neigbour
+			actNode = actNode->left;			
+		}else{ //found key
+			return actNode;
+		}
+	}
+	
+	return NULL;
+}
+
+
+red_black_tree *newRedBlackTree(int (*compare)(RBData * data, RBKey * key),int (*compare2)(RBData * data, RBData* data2)){
+ 	red_black_tree * tree=malloc(sizeof(red_black_tree));
+ 	tree->head = NULL;
+ 	tree->compare = compare;
+ 	tree->compare2 = compare2;
+ 	return tree;
+}
+
+void freeEmptyRedBlackTree(red_black_tree ** tree){
+	if(*tree != NULL){
+		free(*tree);
+		*tree = NULL;
+	}
+}
+
+inline tree_node* nodesibling(tree_node * node){
+	tree_node* parent = node->parent;
+	if(parent == NULL) return NULL;
+	return parent->left == node ? parent->right : parent->left;
+}
+
+inline void rotateleft(tree_node* node, red_black_tree* tree){
+	tree_node* parent = node->parent;
+	tree_node* sibling = node->right;
+	
+	if(parent != NULL){
+		if(parent->left == node){
+			parent->left = sibling;	
+		}else{
+			parent->right = sibling;	
+		}
+	}else{ //grandparent == NULL
+		tree->head = sibling;
+	}
+	sibling->parent = parent;
+	//connection to parent tree is now correct...
+	node->right = sibling->left;
+	if(sibling->left != NULL){
+		sibling->left->parent = node;
+	}
+
+	sibling->left = node;
+	node->parent = sibling;
+}
+
+inline void rotateright(tree_node* node, red_black_tree* tree){
+	tree_node* sibling = node->left;
+	if(node->parent != NULL){
+		tree_node* parent = node->parent;
+		
+		if(parent->left == node){
+			parent->left = sibling;	
+		}else{
+			parent->right = sibling;	
+		}
+	}else{ //node is head of tree
+		tree->head = sibling;
+	}
+	sibling->parent = node->parent;
+	//connection to parent tree is now correct...
+
+	node->left = sibling->right;
+	if(sibling->right != NULL)
+		sibling->right->parent = node;
+	
+	sibling->right = node;	
+	node->parent = sibling;
+}
+
+
+//recursive function to recreate RB tree condition
+static void rebuildRBTree(red_black_tree* tree, tree_node* node){
+	if(node == tree->head){
+		node->color = BLACK;
+		return;
+	}
+	tree_node* parent = node->parent;
+	//No path from the root to a leaf may contain two consecutive nodes colored red
+	if( parent->color == RED ){ //condition not held !
+		//check cases !!! Decision depends on the grandpa of the new node position...
+		tree_node* grandparent = parent->parent;
+		//printf("Condition not held key:%d parent:%d grandpa:%d !\n",node->key, parent->key, grandparent->key);
+		tree_node* uncle=grandparent->right;
+		if(parent == grandparent->right){
+			uncle = grandparent->left;
+		}
+		
+		if(uncle != NULL && uncle->color == RED){ 
+			//case LLr or LRb
+			grandparent->color = RED;
+			uncle->color = BLACK;
+			parent->color = BLACK;
+			//recursivly call function
+			rebuildRBTree(tree, grandparent);
+			return;
+		}		
+		if(parent == grandparent->left){ //grandparent->color must be BLACK because parent is RED
+			 if(parent->left == node){ //case LLb
+				rotateright(grandparent,tree);
+				grandparent->color = RED;
+				parent->color=BLACK;
+				return;
+			}else{//case LRb
+				rotateleft(parent,tree);
+				rotateright(grandparent,tree);
+				grandparent->color = RED;
+				node->color=BLACK;				
+				return;
+			}			
+		}else{
+			 if(parent->left == node){ //case RLb
+				rotateright(parent,tree);
+				rotateleft(grandparent,tree);				
+				grandparent->color = RED;
+				node->color=BLACK;
+				return;
+			}else{//case RRb
+				rotateleft(grandparent,tree);
+				grandparent->color = RED;
+				parent->color=BLACK;				
+				return;
+			}						
+		}
+		
+		
+	} 
+}
+
+tree_node * insertKeyIntoTree(RBData * data, red_black_tree* tree){
+	tree_node* new_node=(tree_node*) malloc(sizeof(tree_node));
+	new_node->data = data;
+	
+	if(tree->head == NULL){ // if the tree is empty 
+		tree->head = new_node;
+		new_node->parent = NULL;
+		new_node->left = NULL;
+		new_node->right = NULL;		
+		new_node->color = BLACK;
+		return new_node;
+	}
+	
+	tree_node* actNode = tree->head;
+	tree_node* parentNode = NULL;
+	
+
+	while(actNode != NULL){
+		parentNode = actNode;
+		int compret=(tree->compare2)(actNode->data,data);
+		if(compret > 0){ //take right neigbour
+			actNode = actNode->right;
+		}else if(compret < 0){//take left neigbour
+			actNode = actNode->left;			
+		}else{ //found key, whoops data already exits !!!!
+			return NULL;
+		}	
+	}
+	
+	//found position for insert !!!	
+	//binary tree insertion always at a leaf node:
+	if((tree->compare2)(parentNode->data,data) > 0){ //take right neigbour
+		parentNode->right = new_node;
+	}else{//take left neigbour
+		parentNode->left = new_node;
+	}	
+	new_node->parent = parentNode;
+	new_node->left = NULL;
+	new_node->right = NULL;
+	new_node->color = RED;
+	
+	//check if red-black-tree conditions are held	
+	rebuildRBTree(tree, new_node);
+	//insert RedBlack condition is now held.
+	return new_node;
+}
+
+static void rebuildTreeAfterDeletion(red_black_tree* tree, tree_node* node){
+	tree_node * parent = node->parent;
+	if(node->color == BLACK){
+		//complex cases:
+		//now the (possible existing) child is black 
+		//if(child == NULL  || child->color == BLACK){ 
+		if(parent != NULL){  
+			//delete case 2				
+			tree_node * sibling = nodesibling(node);
+			if(sibling != NULL && sibling->color == RED){
+				parent->color = RED;
+				sibling->color = BLACK;
+				if( node == parent->left ){
+					rotateleft(parent,tree);
+				}else{
+					rotateright(parent,tree);						
+				}
+				sibling = nodesibling(node);
+			}
+			//delete case 3
+			if(parent->color == BLACK && (sibling == NULL || (sibling->color == BLACK && 
+				(sibling->left == NULL || sibling->left->color == BLACK) && 
+				(sibling->right == NULL || sibling->right->color == BLACK)))
+																			){
+				if(sibling!= NULL) 
+					sibling->color = RED;
+				rebuildTreeAfterDeletion(tree,node->parent);
+			}else{ 
+				//delete case 4
+				if(parent->color == RED && (sibling == NULL || (sibling->color == BLACK && 
+				(sibling->left == NULL || sibling->left->color == BLACK) && 
+				(sibling->right == NULL || sibling->right->color == BLACK)))
+																		){
+					sibling->color = RED;
+					parent->color = BLACK;
+				}else{
+				//delete case 5
+					if(parent->left == node && sibling->color == BLACK &&
+						sibling->left != NULL && sibling->left->color == RED &&
+						(sibling->right == NULL || sibling->right->color == BLACK)
+					){
+						sibling->color = RED;
+						sibling->left->color = BLACK;
+						rotateright(sibling,tree);
+					}else if(parent->right == node && sibling->color == BLACK  && 
+						sibling->right != NULL && sibling->right->color == RED &&
+						(sibling->left == NULL || sibling->left->color == BLACK)
+																				){
+						sibling->color = RED;
+						sibling->right->color = BLACK;
+						rotateleft(sibling,tree);
+					}
+				//delete case 6
+					parent = node->parent;
+					sibling = nodesibling(node);
+					sibling->color = parent->color;
+					parent->color = BLACK;
+					if( node == parent->left){
+						if(sibling->right != NULL)
+							sibling->right->color = BLACK;
+						rotateleft(parent,tree);
+					}else{
+						if(sibling->left != NULL)
+							sibling->left->color = BLACK;
+						rotateright(parent,tree);
+					}
+				}
+			}	
+		}//else new node is tree root, do nothing...
+
+		
+	}// end complex cases!
+}
+
+void deleteNodeFromTree(tree_node*node , red_black_tree* tree){
+	tree_node * changed = deleteNodeFromTree2(node,tree);
+	if(changed != NULL){
+		deleteNodeFromTree2(changed,tree);
+	}
+}
+
+tree_node * deleteNodeFromTree2(tree_node*node , red_black_tree* tree){
+	if(node->left != NULL && node->right != NULL){	//node has two children...
+		//look for maximum value in the left subtree, O(log(N))
+		tree_node * act_node;
+		for(act_node=node->left; act_node->right != NULL; act_node=act_node->right);
+		node->data = act_node->data;
+
+		//delete old node which has only one child
+		return act_node;
+	}
+	
+	tree_node * child = node->left == NULL ? node->right : node->left;
+	int createdChild = FALSE;
+	//delete node which now has at most one child
+	//use node as dummy child
+	if(node->color == BLACK && node->parent != NULL && child == NULL){		
+			child=node;
+			child->data = NULL;
+			createdChild = TRUE;
+	}
+	
+	//relink nodes
+	if(node->parent != NULL){
+		if(node->parent->left == node){
+			node->parent->left = child;
+		}else{
+			node->parent->right = child;
+		}
+	}else{ //node was tree head !!!
+		tree->head = child;
+	} 
+	if(child != NULL){
+		child->parent = node->parent;
+		if(node->color == BLACK){
+			if(child->color == RED){
+				child->color = BLACK; 
+			}else{
+				//to start rebuild tree...
+				rebuildTreeAfterDeletion(tree, child);
+			}
+		}
+	}
+
+	if(createdChild){
+			tree_node * parent = child->parent;
+			
+			if(parent != NULL){
+				if(parent->left == child){
+					parent->left = NULL;
+				}else{
+					parent->right = NULL;
+				}
+			}
+	}
+
+	//give node memory free !!!
+	free(node);
+	return NULL;
+}
+

--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ README	2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,3 @@
+This is an implementation of the trove interface which will not save persitent data.
+TAS = Trove analysis stub should be used to measure the performance overhead of the 
+pvfs2 layers between application and storage space.



More information about the Pvfs2-cvs mailing list