[Pvfs2-cvs] commit by kunkel in pvfs2/src/io/trove/trove-tas: tas.h
tas-handle-manager.h red-black-tree-test.c tas-queue.c tas.c
tas-queue.h tas-handle-manager.c red-black-tree.h
module.mk.in red-black-tree.c README
CVS commit program
cvs at parl.clemson.edu
Sun Oct 29 08:19:13 EST 2006
Update of /projects/cvsroot/pvfs2/src/io/trove/trove-tas
In directory parlweb1:/tmp/cvs-serv25603/src/io/trove/trove-tas
Added Files:
Tag: pvfs2-kunkel-tas-branch
tas.h tas-handle-manager.h red-black-tree-test.c tas-queue.c
tas.c tas-queue.h tas-handle-manager.c red-black-tree.h
module.mk.in red-black-tree.c README
Log Message:
Trove TAS modul added.
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ tas.h 2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,16 @@
+/*
+ * Author Julian Kunkel
+ * Date: 2005
+ */
+
+#ifndef __TROVE_TAS_H
+#define __TROVE_TAS_H
+#include "gossip.h"
+#include "trove.h"
+#include "trove-internal.h"
+
+#define DUMMY_OPNUM -1
+#define RETURN_IMMEDIATE_COMPLETE 1
+#define RETURN_TEST_COMPLETE 0
+
+#endif
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ tas-handle-manager.h 2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,28 @@
+#ifndef TASHANDLEMANAGER_H_
+#define TASHANDLEMANAGER_H_
+
+#include "red-black-tree.h"
+#include "trove.h"
+
+#include "gen-locks.h"
+
+typedef PVFS_handle_extent tas_handleRange;
+
+void initialiseHandleManager(void);
+/*
+ * If handle manager should maintain only one handle range for a collection set
+ * lowerBound2 = 0
+ */
+void initialiseCollection(TROVE_coll_id coll_id,TROVE_handle lowerBound,
+ TROVE_handle upperBound,TROVE_handle lowerBound2, TROVE_handle upperBound2);
+
+TROVE_handle getHandle(TROVE_coll_id coll_id,TROVE_handle_extent_array * extends);
+TROVE_handle getfixedHandle(TROVE_coll_id coll_id,TROVE_handle handle);
+TROVE_handle getArbitraryHandle(TROVE_coll_id coll_id,TROVE_handle handleSep);
+
+int lookupHandle(TROVE_coll_id coll_id,TROVE_handle handle);
+
+int freeHandle(TROVE_coll_id coll_id,TROVE_handle handle);
+int freeHandleRange(TROVE_coll_id coll_id,TROVE_handle lowerhandle,TROVE_handle upperhandle);
+
+#endif /*TASHANDLEMANAGER_H_*/
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ red-black-tree-test.c 2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,138 @@
+#include<stdlib.h>
+#include<stdio.h>
+#include<assert.h>
+#include<math.h>
+
+#include "red-black-tree.h"
+
+int inorderCheck(tree_node* node,int depth, int * nodes, int redNodes, int blackNodes, int * leafBlackNodes){
+ if(node == NULL) return 0;
+ int maxdepth = depth+1;
+ int retdepth =0;
+
+ if(depth == 0){ //initialise leaf black nodes
+ *leafBlackNodes = -1;
+ }
+
+ (*nodes)++;
+ if(node->color == RED)
+ redNodes++;
+ else
+ blackNodes++;
+ if(node->left != NULL){
+ if(node->color == RED && node->left->color == RED){
+ printf("WARNING two red nodes ...");
+ assert(0);
+ }
+ retdepth=inorderCheck(node->left,depth+1, nodes,redNodes,blackNodes, leafBlackNodes);
+ if(retdepth > maxdepth) maxdepth = retdepth;
+ if(node->left->parent != node){
+ printf("ERROR wrong parent node %lld parent %lld", *((int64_t*) node->left->data), *((int64_t*) node->data));
+ assert(0);
+ }
+ if(compareInt64(node->left->data,node->data) != +1){
+ printf("ERROR Left key!\n");
+ assert(0);
+ }
+ }
+ if(node->right != NULL){
+ if(node->color == RED && node->right->color == RED){
+ printf("WARNING two red nodes node and right son...");
+ assert(0);
+ }
+ if(compareInt64(node->right->data,node->data) != -1){
+ printf("ERROR right key!\n");
+ assert(0);
+ }
+ if(node->right->parent != node){
+ printf("ERROR wrong parent node %lld parent %lld", *((int64_t*) node->right->data), *((int64_t*) node->data));
+ assert(0);
+ }
+
+ retdepth=inorderCheck(node->right,depth+1, nodes,redNodes,blackNodes,leafBlackNodes);
+ if(retdepth > maxdepth) maxdepth = retdepth;
+ }
+
+ if(node->left == NULL && node->right == NULL){ //leaf, check if blackNodeCondition is held
+ if(*leafBlackNodes == -1){ //first leaf is reached
+ *leafBlackNodes = blackNodes;
+ }
+ if(blackNodes != *leafBlackNodes){
+ printf(" WARNING blackNodes to leafs are different now:%d should be:%d condition not held\n",blackNodes, *leafBlackNodes);
+ assert(0);
+ }
+ }
+
+ if(depth == 0){
+ double nld2 = log((double) *nodes)/log(2.0)*2+1;
+ if( maxdepth > nld2 )
+ printf(" WARNING tree with %d nodes to deep should be max:%f is %d\n", *nodes, nld2, maxdepth);
+ }
+ return maxdepth;
+}
+
+int64_t numexists=0;
+
+void createRandomTreeNode(red_black_tree * tree){
+ int num=rand()%10000;
+ int64_t * data = malloc(sizeof(int64_t));
+ *data = num;
+ insertKeyIntoTree( (RBData*)data, tree);
+ numexists = num;
+}
+
+int main(int argc, char ** argv){
+
+ red_black_tree * tree = newRedBlackTree(compareInt64,compareInt64);
+ int i;
+ int nodecount=0,nodecountA;
+ int blackNodes;
+ int depth=0,depthA;
+
+ RBData * data = malloc(sizeof(int));
+ int one=1;
+ data = (RBData*) &one;
+ int two=2;
+ printf("CompareInt64 1,2 %d\n",compareInt64((RBData*) &one,(RBData*) &two));
+ printf("CompareInt64 2,1 %d\n",compareInt64((RBData*) &two,(RBData*) &one));
+ printf("CompareInt64 1,1 %d\n",compareInt64((RBData*) &one,(RBData*) &one));
+
+ int seed=3;
+
+ if(argc == 2){
+ sscanf(argv[1],"%d",&seed);
+ }
+
+ if(seed == 0){
+ printf("Syntax: %s randomSeedNum\n",argv[0]);
+ exit(1);
+ }
+ //for debugging purpose take argv as initialisation value
+ srand(seed);
+
+ for(i=0; i < 5000; i++){
+ createRandomTreeNode(tree);
+ nodecountA = 0;
+ depthA=inorderCheck(tree->head,0,& nodecountA,0,0,& blackNodes);
+ }
+ printf("depth for %d nodes is %d\n",nodecountA,depthA);
+ nodecount = -1;
+ depth = -1;
+ printf("Try to delete existing num %llu, %p\n", numexists, lookupTree((void*)&numexists,tree) );
+
+ for(i=0; i < 5000; i++){
+ int64_t num=rand()%10000;
+ if(lookupTree((void*)&num,tree) == NULL) continue;
+ tree_node * node = lookupTree((void*)&num,tree);
+ free(node->data);
+ deleteNodeFromTree(node,tree);
+ nodecount = 0;
+ depth=inorderCheck(tree->head,0,& nodecount,0,0,& blackNodes);
+ }
+
+ printf("Test correctly done\n");
+ printf("depth for %d nodes is %d\n",nodecountA,depthA);
+ printf("depth now for %d nodes is %d\n" ,nodecount,depth);
+
+ return 0;
+}
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ tas-queue.c 2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,96 @@
+#include<stdio.h>
+#include "tas-queue.h"
+
+tas_queue *newTasQueue(void){
+ tas_queue * ret=malloc(sizeof(tas_queue));
+ *ret = NULL;
+ return ret;
+}
+
+void relinkFront(queue_elem * elem, tas_queue* queue){
+ if(elem->prev != NULL){
+ //put found element at front of list...
+ queue_elem * tmp = elem->prev;
+ (*queue)->prev = elem; //is NULL before
+ if(elem->next != NULL){
+ elem->next->prev = elem->prev;
+ }
+ tmp->next = elem->next;
+ //now make found the first element...
+ elem->next = *queue;
+ elem->prev = NULL;
+ *queue = elem;
+ }
+}
+
+void * insertKeyIntoQueue(queueKey key, void * data, tas_queue* queue){
+ queue_elem * elem = lookupQueue(key, queue);
+ void * olddata=NULL;
+ if(elem == NULL){
+ pushFrontKey(key, data,queue);
+ }else{
+ //replace existing data
+ olddata = elem->data;
+ elem->data = data;
+ relinkFront(elem,queue);
+ }
+ return olddata;
+}
+
+queue_elem * pushFrontKey(queueKey key, void * data, tas_queue* queue){
+ queue_elem * elem = malloc(sizeof(queue_elem));
+ elem->data = data;
+ elem->key = key;
+ elem->prev = NULL;
+ elem->next = *queue;
+ if(*queue != NULL){
+ (*queue)->prev = elem;
+ }
+ *queue = elem;
+ return elem;
+}
+
+queue_elem * lookupQueue(queueKey key, tas_queue * queue){
+ queue_elem * found=*queue;
+ int i=0;
+ for(; found != NULL ; found = found->next){
+ if(found->key == key){
+ if( i > WARNING_NUM){
+ printf(" WARNING: lookupQueue takes %d iterations\n",i);
+ }
+ return found;
+ }
+ i++;
+ }
+ if( i > WARNING_NUM){
+ printf(" WARNING: lookupQueue takes %d iterations and did not find handle\n",i);
+ }
+ return NULL;
+}
+
+void deleteElementFromList(queue_elem * found, tas_queue* queue){
+ if(found->prev == NULL){ //first entry
+ *queue = found->next;
+ if(found->next != NULL){
+ found->next->prev = NULL;
+ }
+ }else{ //not first entry
+ found->prev->next = found->next;
+ if(found->next != NULL){
+ found->next->prev = found->prev;
+ }
+ }
+}
+
+void * deleteKeyFromList(queueKey key, tas_queue* queue){
+ queue_elem * found=lookupQueue(key,queue);
+ if(found == NULL) return NULL;
+ deleteElementFromList(found,queue);
+ void * data;
+ data = found->data;
+ free(found);
+ return data;
+}
+
+
+
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ tas.c 2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,1765 @@
+/*
+ * Author Julian Kunkel
+ * Date: 2005
+ */
+
+#include <stdio.h>
+#include <string.h>
+/*for unlink:*/
+#include <unistd.h>
+#include <assert.h>
+
+#include "tas.h"
+#include "gen-locks.h"
+#include "pvfs2-internal.h"
+
+extern struct TROVE_dspace_ops dbpf_dspace_ops;
+extern struct TROVE_mgmt_ops dbpf_mgmt_ops;
+
+#define TRUE 1
+#define FALSE 0
+
+#if defined(TAS_USE_RED_BLACKTREE)
+#elif defined(TAS_USE_QUEUE)
+#else
+ #warning "No implementation for TROVE-TAS metadata selected.\n Choosing Red-Black-Tree"
+ #define TAS_USE_RED_BLACKTREE
+#endif
+
+
+/*
+ * There are two different implementations which may be choosen while compile time:
+ * 1) A very fast queue implementation which pushes
+ * the last used handle in front of the queue (speedup for second lookup).
+ * However the queue implementation is for the creation of very much metafiles
+ * slow because all handles have to be checked. There are some optimizations for
+ * that implementation, but the creation optimization is not compatible to the
+ * normal trove behaviour !
+ * 2) a red black tree implementation. Takes more memory than the list,
+ * but faster lookup because the depth <= 40 for 100.000 filehandles....
+ * TAS_USE_QUEUE, TAS_USE_RED_BLACKTREE
+ */
+#if defined(TAS_USE_RED_BLACKTREE)
+ #include "red-black-tree.h"
+#elif defined(TAS_USE_QUEUE)
+
+ #include "tas-queue.h"
+ /*
+ * maximum number of key/value pairs to be read until the functions returns
+ * that the key/value pair does not exist. This is a optimization for the creation of
+ * files. However if more files exist in a directory than that number these files cannot
+ * be accessed !!!! So be warned if you turn on maximumKeyValReadOptimization
+ */
+ #define maximumKeyValReadOptimization
+
+ //set this value to at least ( number of clients x 10 ),
+ //the number depends on the speed of the netwerk link.
+ #define maximumKeyValRead 100
+#endif
+
+//this optimizations can be turned off:
+//set the file size during IO operations, only small performance speedup.
+#define TAS_UPDATE_FILESIZE
+
+//Do not check if handle is already created in dspace_create...
+#define NO_SAVE_CREATION
+
+
+
+//tas metadata + dspace cache....
+struct keyvalpair_{
+ #if defined(TAS_USE_RED_BLACKTREE)
+ struct keyvalpair_ * next;
+ struct keyvalpair_ * prev; //needed for fast keyval_iteration...
+ #elif defined(TAS_USE_QUEUE)
+ struct keyvalpair_ * next;
+ int64_t keyHash;
+ #endif
+
+ TROVE_keyval_s key;
+ TROVE_keyval_s val;
+};
+
+
+typedef struct keyvalpair_ keyvalpair;
+
+
+struct storeAttributes_{
+ TROVE_ds_type type;
+ PVFS_uid uid;
+ PVFS_gid gid;
+ PVFS_permissions mode;
+ PVFS_time atime;
+ PVFS_time ctime;
+ PVFS_time mtime;
+ uint32_t dfile_count;
+ uint32_t dist_size;
+};
+typedef struct storeAttributes_ storeAttributes;
+
+
+
+ struct handle_ {
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ red_black_tree * pairTree;
+ keyvalpair * pairList;
+ #elif defined(TAS_USE_QUEUE)
+ keyvalpair * pairs;
+ #endif
+
+ PVFS_fs_id fs_id;
+ PVFS_handle handle;
+
+ PVFS_size b_size; /* bstream size */
+ PVFS_size k_size; /* keyval size; # of keys */
+ /* stored attributes are below here */
+
+ storeAttributes * attributes;
+ };
+
+
+//If some other values than only zero bytes should be returned, the
+// buffer of that values should be as large as transfered by the flow protocol.
+//This should be the same value as used by the used flow protocol:
+//currently the buffer_size is defined in the FLOWPROTOXY.c file
+//so this value can not be included here
+//#define BUFFER_SIZE (256*1024)
+
+typedef struct handle_ handleCache;
+
+
+struct handleFSID_{
+ PVFS_handle handle;
+ PVFS_fs_id fs_id;
+};
+typedef struct handleFSID_ handleFSID;
+
+
+static PVFS_size meta_count=0;
+static int initialised=0;
+
+static gen_mutex_t tas_meta_mutex = GEN_MUTEX_INITIALIZER;
+
+static char storageName[200] ="";
+static char storagePath[200] ="";
+
+TROVE_handle aktHandleFirst;
+TROVE_handle handleSeperator; //number which seperates first and second handle range, for example when this server is a meta and dataserver
+TROVE_handle aktHandleSecond;
+
+#if defined(TAS_USE_RED_BLACKTREE)
+ //there might be conflicts in the red-black-tree for keyHashValues
+ //a solution would be to use the full void* key and not a int64_t key
+ //a order might be established by a lexical sort.
+ //a other solution is to link
+ //compare function for the red-black-tree
+ static int compareKeyValPairs(RBData * data, RBKey * key){
+ TROVE_keyval_s ikey1=((keyvalpair*)data)->key;
+ TROVE_keyval_s * ikey2=(TROVE_keyval_s *) key;
+
+ if(ikey1.buffer_sz > ikey2->buffer_sz ){ //take right neigbour
+ return +1;
+ }else if(ikey1.buffer_sz < ikey2->buffer_sz){//take left neigbour
+ return -1;
+ }
+ //same length, compare memory
+ return memcmp(ikey1.buffer,ikey2->buffer, ikey1.buffer_sz);
+ }
+
+ static int compareKeyValPairs2(RBData * data, RBData * data2){
+ TROVE_keyval_s ikey1=((keyvalpair*)data)->key;
+ TROVE_keyval_s ikey2=((keyvalpair*)data2)->key;
+
+ if(ikey1.buffer_sz > ikey2.buffer_sz ){ //take right neigbour
+ return +1;
+ }else if(ikey1.buffer_sz < ikey2.buffer_sz){//take left neigbour
+ return -1;
+ }
+ //same length, compare memory
+ return memcmp(ikey1.buffer,ikey2.buffer, ikey1.buffer_sz);
+ }
+
+ //for comparision of handles:
+ static int compareHandle(RBData * data, RBKey * key){
+ handleCache * ikey1=(handleCache *) data;
+ handleFSID * ikey2=(handleFSID *) key;
+
+ if(ikey1->handle > ikey2->handle ){
+ return +1;
+ }else if(ikey1->handle < ikey2->handle){
+ return -1;
+ }
+
+ if(ikey1->fs_id > ikey2->fs_id ){
+ return +1;
+ }else if(ikey1->fs_id < ikey2->fs_id){
+ return -1;
+ }
+ return 0;
+ }
+
+ static int compareHandle2(RBData * data, RBData * data2){
+ handleCache * ikey1=(handleCache *) data;
+ handleCache * ikey2=(handleCache *) data2;
+
+ if(ikey1->handle > ikey2->handle ){
+ return +1;
+ }else if(ikey1->handle < ikey2->handle){
+ return -1;
+ }
+
+ if(ikey1->fs_id > ikey2->fs_id ){
+ return +1;
+ }else if(ikey1->fs_id < ikey2->fs_id){
+ return -1;
+ }
+ return 0;
+ }
+#endif
+
+#if defined(TAS_USE_RED_BLACKTREE)
+ static red_black_tree * tas_meta_cache = NULL;
+#elif defined(TAS_USE_QUEUE)
+ static tas_queue * tas_meta_cache = NULL;
+#endif
+
+//TROVE_handle minMetaHandle = 0;
+//TROVE_handle maxMetaHandle = 0;
+//TROVE_handle minDatafileHandle = 0;
+//TROVE_handle maxDatafileHandle = 0;
+
+//simple word sum is hashval
+static inline int64_t keyHashFunc(void *key, int count){
+ int64_t sum=0;
+ int i;
+ int64_t multiplier=1;
+ char * keyc = (char *) key;
+
+ for(i=0; i < count; i++){
+ sum = sum + multiplier*keyc[i];
+ multiplier=multiplier*10;
+ }
+
+ return sum;
+}
+
+static inline int keysEqual(
+ #ifndef TAS_USE_RED_BLACKTREE
+ int keyHash1,int keyHash2,
+ #endif
+ int keyLen1,int keyLen2, void * key1, void *key2){
+ return
+ #ifndef TAS_USE_RED_BLACKTREE
+ keyHash1 == keyHash2 &&
+ #endif
+ keyLen1 == keyLen2
+ && memcmp(key1,key2,keyLen1) == 0 ;
+}
+
+
+
+static handleCache * allocateNewCacheElement(void){
+ handleCache * found = (handleCache *) malloc(sizeof(handleCache));
+ bzero(found,sizeof(handleCache));
+ found->attributes = (storeAttributes *) malloc(sizeof(storeAttributes));
+ bzero(found->attributes,sizeof(storeAttributes)); //this is neccessary because
+ // in prelude.sm it is expected for a datafile to have a 0 in some fields ..
+ return found;
+}
+
+static void insertCacheElement(handleCache* found,TROVE_handle new_handle, TROVE_coll_id coll_id,
+ TROVE_ds_type type){
+ found->fs_id = coll_id; //
+ found->attributes->type = type;
+ found->handle = new_handle;
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ found->pairList = NULL;
+ found->pairTree = NULL;
+ insertKeyIntoTree((RBData*) found,tas_meta_cache);
+ #elif defined(TAS_USE_QUEUE)
+ found->pairs = NULL;
+ pushFrontKey(new_handle,found,tas_meta_cache);
+ #endif
+
+ meta_count++;
+}
+
+static keyvalpair * findPairForCacheEntry(handleCache * cacheEntry, TROVE_keyval_s * key_p){
+ #if defined(TAS_USE_RED_BLACKTREE)
+ if(cacheEntry->pairTree == NULL){
+ return FALSE;
+ }
+ tree_node* node = (tree_node*) (lookupTree(key_p ,cacheEntry->pairTree));
+
+ if(node == NULL)
+ return FALSE;
+ return (keyvalpair*) node->data;
+ #elif defined(TAS_USE_QUEUE)
+ keyvalpair * found_pair;
+ int i=0;
+ int64_t key_hash = keyHashFunc(key_p->buffer,key_p->buffer_sz);
+ found_pair=cacheEntry->pairs;
+ for(; found_pair != NULL ; found_pair = found_pair->next){
+ if( keysEqual(
+ #ifndef TAS_USE_RED_BLACKTREE
+ found_pair->keyHash, key_hash,
+ #endif
+ found_pair->key.buffer_sz, key_p->buffer_sz ,
+ found_pair->key.buffer,key_p->buffer) ){
+ /* key exists */
+ if( i > 10 ){
+ gossip_err(" WARNING: readToMatchKEY/VALUE: %d\n",i);
+ }
+ return found_pair;
+ }
+ i++;
+ #ifdef maximumKeyValReadOptimization
+ if(i > maximumKeyValRead)
+ // printf(" KILLED %s\n",(char*)key_p->buffer);
+ return NULL;
+ #endif
+ }
+
+ if( i > 10 ){
+ gossip_err(" WARNING: readToMatchKey did not find value for key %s needed iterations: %d\n", (char*)key_p->buffer,i);
+ }
+ #endif
+
+ return NULL;
+}
+
+static int removePairOfCacheEntry(handleCache * cacheEntry, TROVE_keyval_s * key_p){
+ keyvalpair * found_pair;
+
+
+#if defined(TAS_USE_RED_BLACKTREE)
+ if(cacheEntry->pairTree == NULL)
+ return FALSE;
+ tree_node * node = lookupTree(key_p,cacheEntry->pairTree);
+ found_pair=(keyvalpair*) (node->data);
+ if(found_pair != NULL) {
+ //delete Node from Tree:
+ deleteNodeFromTree(node,cacheEntry->pairTree);
+
+ //relink pairList:
+ if(found_pair->next != NULL){
+ found_pair->next->prev = found_pair->prev;
+ }
+ if(found_pair->prev == NULL){
+ //this is currently the head of the list
+ cacheEntry->pairList = found_pair->next;
+ if(found_pair->next == NULL){
+ //The Tree is now empty
+ freeEmptyRedBlackTree(& cacheEntry->pairTree);
+ }else{
+ found_pair->next->prev = NULL;
+ }
+ }else{ //not head of cacheEntry->pairList
+ found_pair->prev->next = found_pair->next;
+ }
+ free(found_pair->key.buffer);
+ free(found_pair->val.buffer);
+ free(found_pair);
+ //free(node->key) is not necessary because data and key are equal :)
+
+ return TRUE;
+ }
+ return FALSE;
+ #elif defined(TAS_USE_QUEUE)
+ keyvalpair *old;
+ int i=0;
+ int key_hash = keyHashFunc(key_p->buffer,key_p->buffer_sz) ;
+ found_pair=cacheEntry->pairs;
+ old = found_pair;
+
+ for(; found_pair != NULL ; old=found_pair, found_pair = found_pair->next){
+ if(keysEqual(
+ #ifndef TAS_USE_RED_BLACKTREE
+ found_pair->keyHash, key_hash,
+ #endif
+ found_pair->key.buffer_sz, key_p->buffer_sz ,
+ found_pair->key.buffer,key_p->buffer)){
+ //key exists
+
+ if(old == found_pair){ //first entry
+ cacheEntry->pairs = found_pair->next;
+ }else{ //not first entry
+ old->next = found_pair->next;
+ }
+ free(found_pair->key.buffer);
+ free(found_pair->val.buffer);
+ free(found_pair);
+
+ if( i > 10 ){
+ gossip_err(" WARNING: readToMatchKEY/VALUE remove: %d\n",i);
+ }
+ return TRUE;
+ }
+ i++;
+ }
+ if( i > 10 ){
+ gossip_err(" WARNING: readToMatchKEY/VALUE remove did not find key: %d\n",i);
+ }
+ return FALSE;
+ #endif
+}
+
+
+static inline handleCache * findCacheEntry(TROVE_handle handle, PVFS_fs_id fs_id){
+ #if defined(TAS_USE_RED_BLACKTREE)
+ handleFSID key;
+ key.handle = handle;
+ key.fs_id = fs_id;
+ tree_node * node= lookupTree(& key,tas_meta_cache);
+ if(node == NULL){
+ return NULL;
+ }
+ return (handleCache*) node->data;
+ #elif defined(TAS_USE_QUEUE)
+ queue_elem * elem = lookupQueue(handle, tas_meta_cache);
+ if(elem == NULL){
+ return NULL;
+ }
+ relinkFront(elem, tas_meta_cache); //push element to front of cache
+ return (handleCache*)elem->data;
+ #endif
+}
+
+static int removeCacheEntry(TROVE_handle handle, PVFS_fs_id fs_id){
+ void * result=NULL;
+
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ handleFSID key;
+ key.handle = handle;
+ key.fs_id = fs_id;
+ tree_node * node = lookupTree(&key,tas_meta_cache);
+ if(node == NULL) return FALSE;
+ result = node->data;
+ deleteNodeFromTree(node,tas_meta_cache);
+ #elif defined(TAS_USE_QUEUE)
+ result = deleteKeyFromList(handle,tas_meta_cache);
+ #endif
+ if(result != NULL){
+ free(((handleCache*) result)->attributes);
+ free((handleCache*) result);
+ meta_count--;
+ return TRUE;
+ }
+ return FALSE;
+}
+
+static inline void insertKeyValPair(handleCache * found, keyvalpair * pair){
+
+
+#if defined(TAS_USE_RED_BLACKTREE)
+ if(found->pairTree == NULL){ //create new RedBlackTree
+ found->pairTree=newRedBlackTree(compareKeyValPairs,compareKeyValPairs2);
+
+ pair->next = NULL;
+ found->pairList = pair;
+ }else{
+ pair->next = found->pairList;
+ found->pairList->prev = pair;
+ found->pairList = pair;
+ }
+ pair->prev = NULL;
+ insertKeyIntoTree((void*) pair, found->pairTree);
+
+ #elif defined(TAS_USE_QUEUE)
+ int64_t keyHash = keyHashFunc(pair->key.buffer,pair->key.buffer_sz);
+ pair->next = found->pairs;
+ found->pairs = pair;
+ pair->keyHash = keyHash;
+ #endif
+}
+
+static int tas_initialize (char *stoname,
+ TROVE_ds_flags flags){
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas Initialize\n");
+ int ret = dbpf_mgmt_ops.initialize(stoname, flags);
+
+ FILE * file = fopen (stoname, "r");
+ if(file == 0 ){ /* if storage does not exist... */
+ return -1;
+ }
+ #if defined(TAS_USE_RED_BLACKTREE)
+ tas_meta_cache = newRedBlackTree(compareHandle,compareHandle2);
+ #elif defined(TAS_USE_QUEUE)
+ tas_meta_cache = newTasQueue();
+ #endif
+
+ fclose(file);
+
+ initialised = 1;
+ meta_count = 0;
+
+ gen_mutex_init(& tas_meta_mutex);
+
+ sprintf(storagePath, "%s/tas", stoname);
+ mkdir(storagePath,511);
+ sprintf(storageName, "%s/tas/metadata.txt", stoname);
+
+ printf("TAS options:\n");
+ printf("\tCachetype ");
+
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ printf("red-black-tree \n");
+ #elif defined(TAS_USE_QUEUE)
+ printf("queue\n");
+ #endif
+
+ #ifdef maximumKeyValReadOptimization
+ printf("\tWARNING maximumKeyValReadOptimization enabled with %d files\n"
+ "\t Some file operations will not be possible with older files...\n",maximumKeyValRead);
+ #endif
+ #ifdef TAS_UPDATE_FILESIZE
+ printf("\tFile write will set filesize properly\n");
+ #else
+ printf("\tFile write will NOT set filesize properly, only resize will resize files\n");
+ #endif
+
+ return ret;
+}
+
+
+struct tas_finalizeData_{
+ FILE * file;
+ int64_t handle_count;
+ int64_t keyval_count;
+ int64_t keyval_mem_size;
+};
+typedef struct tas_finalizeData_ tas_finalizeData;
+
+static void keyvalwrite(void* data, void* funcData){
+ tas_finalizeData * fdata = (tas_finalizeData* ) funcData;
+ keyvalpair * pair = (keyvalpair*) data;
+ FILE* file = fdata->file;
+
+ fprintf(file," Key-size:%d Val-size:%d ", pair->key.buffer_sz, pair->val.buffer_sz);
+ fwrite(pair->key.buffer,1,pair->key.buffer_sz, file);
+ fprintf(file," Value:");
+ fwrite(pair->val.buffer,1,pair->val.buffer_sz, file);
+ fprintf(file,"\n");
+
+ /* count for memory usage */
+ fdata->keyval_count++;
+ fdata->keyval_mem_size+=pair->key.buffer_sz + pair->val.buffer_sz;
+}
+
+static void datawrite(void* data, void* funcData){
+ tas_finalizeData * fdata = (tas_finalizeData* ) funcData;
+ handleCache * found = (handleCache*) data;
+
+ FILE* file = fdata->file;
+ fprintf(file,"Handle:%Lu Type:%u FS-ID:%d UID:%u GID:%u MODE:%u DFILE_COUNT:%u DIST_SIZE:%u BYTE_SIZE:%llu KEYS:%llu CTIME:%llu MTIME:%llu ATIME:%llu\n",
+ found->handle, found->attributes->type,found->fs_id,
+ found->attributes->uid, found->attributes->gid, found->attributes->mode,
+ found->attributes->dfile_count, found->attributes->dist_size,
+ found->b_size, found->k_size,
+ found->attributes->ctime, found->attributes->mtime,found->attributes->atime );
+ fdata->handle_count++;
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ if( found->pairTree != NULL){
+ fdata->keyval_mem_size+= sizeof(red_black_tree);
+ iterateRedBlackTree(keyvalwrite, found->pairTree, fdata );
+ }
+ #elif defined(TAS_USE_QUEUE)
+ keyvalpair * pair = found->pairs;
+ for(; pair != NULL; pair = pair->next)
+ keyvalwrite(pair, NULL,fdata);
+ #endif
+}
+
+ /* calculate the approximate memory usage... */
+static int tas_finalize (void){
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas finalize and save metadata to disk ... \n");
+
+ if(! initialised || meta_count == 0 ) return RETURN_IMMEDIATE_COMPLETE;
+
+ FILE * file = fopen (storageName, "w");
+ if(file == 0){
+ gossip_debug(GOSSIP_TROVE_DEBUG," can not save metadata !!!!\n");
+ return RETURN_IMMEDIATE_COMPLETE;
+ }
+ /* do not take care of memory managment because server is terminating... */
+
+ fprintf(file,"Count:%llu\n",meta_count);
+ tas_finalizeData * fdata = malloc(sizeof(tas_finalizeData));
+ fdata->file = file;
+ fdata->handle_count = 0;
+ fdata->keyval_count = 0;
+ fdata->keyval_mem_size = 0;
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ iterateRedBlackTree(datawrite, tas_meta_cache, fdata );
+ #elif defined(TAS_USE_QUEUE)
+ iterate_tas_queue(datawrite(elem->data, & elem->key,fdata) ,elem,tas_meta_cache);
+ #endif
+
+ fclose(file);
+
+ /* calculate memory_usage */
+ int64_t memory_usage;
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ memory_usage = fdata->handle_count*(sizeof(storeAttributes)+ sizeof(handleCache)+sizeof(tree_node))
+ +fdata->keyval_count*(sizeof(keyvalpair)+sizeof(tree_node))+fdata->keyval_mem_size;
+ #elif defined(TAS_USE_QUEUE)
+ memory_usage = fdata->handle_count*(storeAttributes)+sizeof(handleCache)+sizeof(queue_elem))
+ +fdata->keyval_count*(sizeof(keyvalpair))+fdata->keyval_mem_size;
+ #endif
+
+ printf(" TAS handles:%lld keyval-pairs:%lld with buffers:%lld\n"
+ " approximate metadata memory usage: %lld Bytes - %f MByte \n",
+ fdata->handle_count, fdata->keyval_count, fdata->keyval_mem_size, memory_usage,
+ (double)memory_usage/1024/1024 );
+
+ return dbpf_mgmt_ops.finalize();
+}
+
+static int tas_storage_create (char *stoname,
+ void *user_ptr, TROVE_op_id * out_op_id_p){
+ int ret;
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_storage_create here: %s \n",
+ stoname);
+ ret = dbpf_mgmt_ops.storage_create(stoname, user_ptr, out_op_id_p);
+ FILE * file = fopen (storageName, "w");
+ fclose(file);
+ return ret;
+}
+
+static int
+tas_storage_remove (char *stoname, void *user_ptr, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_storage_remove\n");
+ if(initialised){
+ unlink(storageName);
+ }
+
+ return dbpf_mgmt_ops.storage_remove(stoname, user_ptr, out_op_id_p);;
+}
+
+static int tas_collection_create(char *collname,
+ TROVE_coll_id new_coll_id,
+ void *user_ptr,
+ TROVE_op_id *out_op_id_p)
+{
+ return dbpf_mgmt_ops.collection_create(collname, new_coll_id,
+ user_ptr, out_op_id_p);
+}
+
+static int
+tas_collection_remove (char *collname,
+ void *user_ptr, TROVE_op_id * out_op_id_p)
+{
+ return dbpf_mgmt_ops.collection_remove(collname, user_ptr, out_op_id_p);
+}
+
+static int
+tas_collection_iterate (TROVE_ds_position * inout_position_p,
+ TROVE_keyval_s * name_array,
+ TROVE_coll_id * coll_id_array,
+ int *inout_count_p,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * vtag,
+ void *user_ptr, TROVE_op_id * out_op_id_p)
+{
+ return dbpf_mgmt_ops.collection_iterate(inout_position_p, name_array,
+ coll_id_array, inout_count_p, flags, vtag, user_ptr,
+ out_op_id_p);
+}
+
+static int
+tas_collection_lookup (char *collname,
+ TROVE_coll_id * out_coll_id_p,
+ void *user_ptr, TROVE_op_id * out_op_id_p)
+{
+ return dbpf_mgmt_ops.collection_lookup(collname, out_coll_id_p,
+ user_ptr, out_op_id_p);
+}
+
+
+
+/*
+ * For all bytestream operations except resize do not lookup handle if
+ * resize of file is not selected.
+ * Directly discard all I/O operations
+ */
+
+
+static int //currently this function is not used within pvfs2 !!!
+tas_bstream_read_at (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ void *buffer,
+ TROVE_size * inout_size_p,
+ TROVE_offset offset,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * out_vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_bstream_read_at not implemented\n");
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_bstream_read_at\n");
+ return 1;
+}
+
+static int //currently this function is not used within pvfs2 !!!
+tas_bstream_write_at (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ void *buffer,
+ TROVE_size * inout_size_p,
+ TROVE_offset offset,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * inout_vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_bstream_write_at not implemented\n");
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_bstream_write_at\n");
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+
+static int
+tas_bstream_resize (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_size * inout_size_p,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "TAS tas_bstream_resize\n");
+ handleCache * found=findCacheEntry(handle,coll_id);
+ if(found == NULL){ //abort
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+ found->b_size = *inout_size_p;
+ *out_op_id_p=DUMMY_OPNUM;
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_bstream_validate (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "TAS tas_bstream_validate\n");
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_bstream_read_list (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+
+ char **mem_offset_array, //put the real data here
+ TROVE_size *mem_size_array, //size of the memory buffers
+ int mem_count,//number of memory buffers
+
+ TROVE_offset *stream_offset_array,
+ TROVE_size *stream_size_array,
+ int stream_count,//number of streams to read...
+ TROVE_size *out_size_p,//read bytes....
+ TROVE_ds_flags flags,
+ TROVE_vtag_s *vtag,
+ void *user_ptr,//contains the callback function trove_callback
+ //the function is called from bmi_recv_callback_fn if
+ //bsream_read_list returns 1
+ TROVE_context_id context_id,
+ TROVE_op_id *out_op_id_p)
+{
+ int i;
+ TROVE_size readBytes=0;
+ for(i=0; i < stream_count; i++){
+ readBytes+=stream_size_array[i];
+ }
+
+ gossip_debug(GOSSIP_TROVE_DEBUG, "TAS tas_bstream_read_list handle:%lld flags:%d readBytes:%lld\n",lld(handle),flags,lld(readBytes));
+
+ *out_size_p = readBytes;
+ *out_op_id_p = DUMMY_OPNUM;
+
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+//have a look to io/flow/bmi_recv_callback_fn, there this funktion is called...
+static int
+tas_bstream_write_list (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+
+ char **mem_offset_array, //here comes the real data
+ TROVE_size *mem_size_array, //size of the memory buffers
+ int mem_count, //number of memory buffers
+
+ TROVE_offset *stream_offset_array,
+ TROVE_size *stream_size_array,
+ int stream_count, //number of streams to write...
+
+ TROVE_size *out_size_p, //written bytes....
+ TROVE_ds_flags flags,
+ TROVE_vtag_s *vtag,
+ void *user_ptr, //contains the callback function trove_callback
+ //the function is called from bmi_recv_callback_fn if
+ //bsream_write_list returns 1
+ TROVE_context_id context_id,
+ TROVE_op_id *out_op_id_p)
+{
+ int i;
+
+ TROVE_size writtenBytes=0;
+ for(i=0; i < stream_count; i++){
+ writtenBytes+=stream_size_array[i];
+ }
+ gossip_debug(GOSSIP_TROVE_DEBUG, "TAS tas_bstream_write_list for handle:%lld flags:%d fake writtenBytes:%lld mem_count:%d stream_count:%d SYNC:%d\n",lld(handle),flags, lld(writtenBytes),mem_count, stream_count, flags);
+ *out_size_p = writtenBytes;
+
+/* set the actual file size */
+#ifdef TAS_UPDATE_FILESIZE
+ gen_mutex_lock(&tas_meta_mutex);
+ handleCache * found=findCacheEntry(handle,coll_id);
+ if(found == NULL){ //abort
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+ //get the last written byte and enlarge the file if this byte is larger than filesize
+ for(i=0; i < stream_count; i++){
+ TROVE_size akt_pos = stream_size_array[i] + stream_offset_array[i];
+ if( akt_pos > found->b_size ){
+ found->b_size = akt_pos;
+ }
+ }
+ gen_mutex_unlock(&tas_meta_mutex);
+#endif
+
+ *out_op_id_p = DUMMY_OPNUM;
+
+
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+
+
+static int
+tas_bstream_flush (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_bstream_flush\n");
+
+ *out_op_id_p = DUMMY_OPNUM;
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+/*
+ * Keyval operations:
+ */
+
+static int
+tas_keyval_read (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_keyval_s * key_p,
+ TROVE_keyval_s * val_p,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * out_vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+
+ if(key_p->buffer_sz == 3 && strncmp(key_p->buffer,"de",3)==0){
+ gossip_debug(GOSSIP_TROVE_DEBUG,"lookup dir\n");
+ }
+
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_read flags:%d handle:%lld key-length:%d key:%s CollID:%d\n", flags, lld(handle), key_p->buffer_sz, (char *)key_p->buffer, coll_id);
+ gen_mutex_lock(&tas_meta_mutex);
+ handleCache * found=findCacheEntry(handle,coll_id);
+ if(found == NULL){ /* abort */
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+
+ /* got handle; search for keyval */
+ keyvalpair * found_pair = findPairForCacheEntry(found,key_p);
+ if(found_pair == NULL){
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+
+ if( val_p->buffer_sz < found_pair->val.buffer_sz ){
+ gossip_debug(GOSSIP_TROVE_DEBUG, " WARNING buffer to small tas read-keyval got:%d need:%d \n",
+ val_p->buffer_sz, found_pair->val.buffer_sz);
+ memcpy(val_p->buffer,found_pair->val.buffer, val_p->buffer_sz);
+ val_p->read_sz = val_p->buffer_sz;
+ }else{
+ memcpy(val_p->buffer,found_pair->val.buffer, found_pair->val.buffer_sz);
+ val_p->read_sz = found_pair->val.buffer_sz;
+ }
+ gen_mutex_unlock(&tas_meta_mutex);
+ *out_op_id_p = DUMMY_OPNUM;
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_keyval_write (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_keyval_s * key_p,
+ TROVE_keyval_s * val_p,
+ TROVE_ds_flags flags, // see trove.h
+ TROVE_vtag_s * inout_vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_write flags:%d handle:%lld val-length:%d key:%s CollID:%d\n",
+ flags, lld(handle), val_p->buffer_sz, (char*) key_p->buffer, coll_id);
+
+ gen_mutex_lock(&tas_meta_mutex);
+ handleCache * found=findCacheEntry(handle,coll_id);
+ if(found == NULL){ //abort
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+
+ /* If TROVE_NOOVERWRITE flag was set, make sure that we don't create the
+ * key if it exists see dbpf-keyval.c*/
+ //TROVE_NOOVERWRITE
+ /* if TROVE_ONLYOVERWRITE flag was set, make sure that the key exists
+ * before overwriting it */
+ //TROVE_ONLYOVERWRITE
+ //TROVE_SYNC
+
+ //got handle; search for keyval
+ keyvalpair * found_pair = NULL;
+
+ /*
+ * findPairForCacheEntry(found,key_p)
+ * it should never happen that a keyval exist for the list implementation
+ * The old value will be keeped and not be overwritten, advantages:
+ * faster because possible existing old key need not to be found
+ * history, see how keys are manipulated
+ * Disadvantage: more memory usage, slow down search for other keys
+ * if multiple exist....
+ */
+
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ found_pair=findPairForCacheEntry(found,key_p);
+ #endif
+
+ if(found_pair == NULL){
+ found_pair = (keyvalpair *) malloc(sizeof(keyvalpair));
+ found_pair->key.buffer = malloc(key_p->buffer_sz);
+ found_pair->key.buffer_sz = key_p->buffer_sz;
+ memcpy(found_pair->key.buffer,key_p->buffer,key_p->buffer_sz);
+
+ insertKeyValPair(found,found_pair);
+
+ found->k_size++;
+ }else{ //free memory first:
+ free(found_pair->val.buffer);
+ }
+
+ found_pair->val.buffer = malloc(val_p->buffer_sz);
+ found_pair->val.buffer_sz = val_p->buffer_sz;
+ memcpy(found_pair->val.buffer,val_p->buffer,val_p->buffer_sz);
+
+
+ gen_mutex_unlock(&tas_meta_mutex);
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_keyval_remove (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_keyval_s *key_p,
+ TROVE_keyval_s *val_p,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s *vtag,
+ void *user_ptr,
+ TROVE_context_id context_id,
+ TROVE_op_id *out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_remove\n");
+
+ gen_mutex_lock(&tas_meta_mutex);
+ handleCache * found=findCacheEntry(handle,coll_id);
+ if(found == NULL){ //abort
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+
+ //got handle; search for keyval
+ if(! removePairOfCacheEntry(found,key_p)){
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+
+ found->k_size--;
+
+ gen_mutex_unlock(&tas_meta_mutex);
+ *out_op_id_p = DUMMY_OPNUM;
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_keyval_validate (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * inout_vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_validate\n");
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_validate not implemented\n");
+ *out_op_id_p = DUMMY_OPNUM;
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+
+inline static void keyValCopy(TROVE_keyval_s * out_key_array,
+ TROVE_keyval_s * out_val_array, int i, keyvalpair * pair){
+ int max_read_size = pair->key.buffer_sz;
+ if(out_key_array[i].buffer_sz < max_read_size){
+ max_read_size = out_key_array[i].buffer_sz;
+ }
+ out_key_array[i].read_sz = max_read_size;
+ memcpy(out_key_array[i].buffer,pair->key.buffer,
+ max_read_size);
+
+ max_read_size = pair->val.buffer_sz;
+ if(out_val_array[i].buffer_sz < max_read_size){
+ max_read_size = out_val_array[i].buffer_sz;
+ }
+ out_val_array[i].read_sz = max_read_size;
+ memcpy(out_val_array[i].buffer,pair->val.buffer,
+ max_read_size);
+}
+
+/*
+ * reads count keyword/value pairs from the provided logical position in the
+ * keyval space.
+ */
+static int tas_keyval_iterate (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_position * inout_position_p,
+ TROVE_keyval_s * out_key_array,
+ TROVE_keyval_s * out_val_array,
+ int *inout_count_p,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * inout_vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_iterate\n");
+
+ if(*inout_position_p == TROVE_ITERATE_START){
+ *inout_position_p =0;
+ }
+
+ gen_mutex_lock(&tas_meta_mutex);
+ handleCache * found=findCacheEntry(handle,coll_id);
+ if(found == NULL){ //abort
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+
+ int32_t in_read_count = *inout_count_p;
+ int32_t in_read_position = *inout_position_p;
+
+ int32_t remainingKeyValues = found->k_size - in_read_position;
+ gossip_debug(GOSSIP_TROVE_DEBUG," read_count %d read_pos %d remain %d \n",in_read_count,in_read_position,remainingKeyValues);
+ //set read count, if less keys available
+ if(remainingKeyValues < in_read_count){
+ *inout_count_p = remainingKeyValues; //number of keypairs to read
+ in_read_count = remainingKeyValues;
+ }
+ *inout_position_p= in_read_position + *inout_count_p;
+ if(*inout_count_p <= 0){
+ gen_mutex_unlock(&tas_meta_mutex);
+ *inout_count_p = 0;
+ return RETURN_IMMEDIATE_COMPLETE;
+ }
+
+ gossip_debug(GOSSIP_TROVE_DEBUG," read_count %d read_pos %d remain %d \n",in_read_count,in_read_position,remainingKeyValues);
+
+ //skip to in_read_position...
+ keyvalpair * found_pair;
+
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ //go to leftmost node:
+ found_pair=found->pairList;
+ #elif defined(TAS_USE_QUEUE)
+ found_pair=found->pairs;
+ #endif
+
+ for(; in_read_position > 0 ;
+ found_pair = found_pair->next,in_read_position--);
+
+ //fill the array...
+ int i=0;
+ for( ; i < in_read_count ;
+ found_pair = found_pair->next,i++){
+ keyValCopy(out_key_array, out_val_array,i ,found_pair );
+ }
+
+ gen_mutex_unlock(&tas_meta_mutex);
+
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_keyval_iterate_keys (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_position * inout_position_p,
+ TROVE_keyval_s * out_key_array,
+ int *inout_count_p,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * vtag,
+ void *user_ptr,
+ TROVE_context_id context_id,
+ TROVE_op_id * out_op_id_p)
+{
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_iterate_keys not implemented\n");
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_iterate_keys\n");
+ return 1;
+}
+
+static int
+tas_keyval_read_list (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_keyval_s * key_array,
+ TROVE_keyval_s * val_array,
+ TROVE_ds_state *err_array,
+ int count,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * out_vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_read_list not implemented\n");
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_read_list\n");
+ return 1;
+}
+
+static int
+tas_keyval_write_list (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_keyval_s * key_array,
+ TROVE_keyval_s * val_array,
+ int count,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * inout_vtag,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_write_list not implemented\n");
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_write_list\n");
+ return 1;
+}
+
+static int
+tas_keyval_flush (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_keyval_flush not implemented\n");
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_keyval_flush\n");
+ return 1;
+}
+
+
+
+static int
+tas_dspace_create (TROVE_coll_id coll_id,
+ TROVE_handle_extent_array * extent_array,
+ TROVE_handle * handle,
+ TROVE_ds_type type,
+ TROVE_keyval_s * hint, /* TODO: figure out what this is! */
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ if (!extent_array || (extent_array->extent_count < 1))
+ {
+ return -TROVE_EINVAL;
+ }
+
+ TROVE_extent cur_extent= extent_array->extent_array[0];
+ TROVE_handle new_handle=TROVE_HANDLE_NULL;
+ /* check if we got a single specific handle */
+ if ((extent_array->extent_count == 1) &&
+ (cur_extent.first == cur_extent.last))
+ {
+ /*
+ check if we MUST use the exact handle value specified;
+ if caller requests a specific handle, honor it
+ */
+ if (flags & TROVE_FORCE_REQUESTED_HANDLE)
+ {
+ new_handle = * handle;
+ if( new_handle < handleSeperator ){
+ if(aktHandleFirst <= new_handle){
+ aktHandleFirst = new_handle+1;
+ }
+ }else{
+ if(aktHandleSecond <= new_handle){
+ aktHandleSecond = new_handle+1;
+ }
+ }
+ }
+ else if (cur_extent.first == TROVE_HANDLE_NULL)
+ {
+ /*
+ if we got TROVE_HANDLE_NULL, the caller doesn't care
+ where the handle comes from
+ */
+ printf("%llu cur_extent.first == TROVE_HANDLE_NULL\n",*handle);
+ //new_handle = getArbitraryHandle(coll_id,*handle);
+ }
+ }
+ else
+ {
+ /*
+ otherwise, we have to try to allocate a handle from
+ the specified range that we're given
+ Ignore that circumstance for now !!!!
+ */
+
+ if( cur_extent.last < handleSeperator ){
+ new_handle = aktHandleFirst;
+ aktHandleFirst++;
+ }else{
+ new_handle = aktHandleSecond;
+ aktHandleSecond++;
+ }
+ }
+
+ *handle = new_handle;
+
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_create Type:%d flag:%d extent_count->%u new_handle->%lld cur_extent.first:%lld cur_extent.last:%lld\n",
+ type, flags,
+ extent_array->extent_count,
+ lld(new_handle),lld(cur_extent.first),lld(cur_extent.last) );
+ /*
+ if we got a zero handle, we're either completely out of handles
+ -- or else something terrible has happened
+ */
+ if (new_handle == TROVE_HANDLE_NULL)
+ {
+ gossip_err("Error: handle allocator returned a zero handle.\n");
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOSPC;
+ }
+
+ handleCache * found=NULL;
+ #ifndef NO_SAVE_CREATION
+ found = findCacheEntry(new_handle,coll_id);
+ #endif
+ if(found == NULL){ //create new metadata structure...
+ found = allocateNewCacheElement();
+ insertCacheElement(found,new_handle,coll_id, type);
+ }else{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "ERROR handle exists already %lld\n",lld(new_handle));
+ }
+ gen_mutex_unlock(&tas_meta_mutex);
+
+ *out_op_id_p = DUMMY_OPNUM;
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int tas_dspace_remove (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_remove\n");
+
+ gen_mutex_lock(&tas_meta_mutex);
+
+ if(! removeCacheEntry(handle,coll_id)){ //abort
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_ENOENT;
+ }
+
+ gen_mutex_unlock(&tas_meta_mutex);
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_dspace_iterate_handles (TROVE_coll_id coll_id,
+ TROVE_ds_position * position_p,
+ TROVE_handle * handle_array,
+ int *inout_count_p,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s * vtag,
+ void *user_ptr,
+ TROVE_context_id context_id,
+ TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_iterate_handles \n");
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_dspace_iterate_handles not implemented\n");
+ *out_op_id_p = DUMMY_OPNUM;
+ return 1;
+}
+
+
+
+static int
+tas_dspace_verify (TROVE_coll_id coll_id, TROVE_handle handle, TROVE_ds_type * type, /* TODO: define types! */
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_verify\n");
+ *out_op_id_p = DUMMY_OPNUM;
+
+ handleCache * found=findCacheEntry(handle,coll_id);
+
+ if(found == NULL){
+ return -TROVE_ENOENT;
+ }
+
+ *type = found->attributes->type;
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_dspace_getattr (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_attributes_s * ds_attr_p,
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id, TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_getattr handle: %lld flags: %d \n", lld(handle), flags);
+
+ gen_mutex_lock(&tas_meta_mutex);
+
+ handleCache * found=findCacheEntry(handle,coll_id);
+
+ if(found != NULL){
+ *out_op_id_p = DUMMY_OPNUM;
+ ds_attr_p->type = found->attributes->type;
+ ds_attr_p->handle = handle;
+ ds_attr_p->fs_id = found->fs_id;
+ ds_attr_p->uid= found->attributes->uid;
+ ds_attr_p->gid= found->attributes->gid;
+ ds_attr_p->mode= found->attributes->mode;
+ ds_attr_p->ctime= found->attributes->ctime;
+ ds_attr_p->mtime= found->attributes->mtime;
+ ds_attr_p->atime= found->attributes->atime;
+ ds_attr_p->dfile_count= found->attributes->dfile_count;
+ ds_attr_p->dist_size= found->attributes->dist_size;
+
+ ds_attr_p->b_size = found->b_size;
+ //ds_attr_p->k_size = found->k_size; TODO FIXME
+
+ gen_mutex_unlock(&tas_meta_mutex);
+ return RETURN_IMMEDIATE_COMPLETE;
+ }
+ gen_mutex_unlock(&tas_meta_mutex);
+
+ return -1;
+}
+
+static int
+tas_dspace_getattr_list(
+ TROVE_coll_id coll_id,
+ int nhandles,
+ TROVE_handle *handle_array,
+ TROVE_ds_attributes_s *ds_attr_p,
+ TROVE_ds_state *error_array,
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id,
+ TROVE_op_id *out_op_id_p)
+{
+ return -TROVE_EINVAL;
+}
+
+static int
+tas_dspace_setattr (TROVE_coll_id coll_id,
+ TROVE_handle handle,
+ TROVE_ds_attributes_s *ds_attr_p,
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id,
+ TROVE_op_id *out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_setattr handle:%lld uid:%d gid:%d\n",
+ lld(handle),ds_attr_p->uid,ds_attr_p->gid);
+
+ gen_mutex_lock(&tas_meta_mutex);
+ handleCache * found=findCacheEntry(handle,coll_id);
+
+ if(found != NULL){
+ found->attributes->type = ds_attr_p->type;
+ if( found->handle != handle){
+ //found->fs_id != ds_attr_p->fs_id ||
+ printf("Fatal ERROR tas_dspace_setattr it is not allowed "
+ "to change the handle number or filesystem id!!\n");
+ exit(1);
+ }
+ found->attributes->uid=ds_attr_p->uid;
+ found->attributes->gid=ds_attr_p->gid;
+ found->attributes->mode = ds_attr_p->mode;
+ found->attributes->ctime = ds_attr_p->ctime;
+ found->attributes->mtime = ds_attr_p->mtime;
+ found->attributes->atime = ds_attr_p->atime;
+ found->attributes->dfile_count = ds_attr_p->dfile_count;
+ found->attributes->dist_size = ds_attr_p->dist_size;
+
+ //take special care for:
+ //found->b_size = ds_attr_p->b_size;
+ //found->k_size = ds_attr_p->k_size;
+
+ gen_mutex_unlock(&tas_meta_mutex);
+
+ return RETURN_IMMEDIATE_COMPLETE;
+ }
+
+ gen_mutex_unlock(&tas_meta_mutex);
+ return -TROVE_EEXIST;
+}
+
+static int
+tas_dspace_cancel (TROVE_coll_id coll_id,
+ TROVE_op_id ds_id,
+ TROVE_context_id context_id)
+{
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_dspace_cancel not implemented\n");
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_dspace_cancel\n");
+ return 1;
+}
+
+static int
+tas_dspace_test (TROVE_coll_id coll_id,
+ TROVE_op_id ds_id,
+ TROVE_context_id context_id,
+ int *out_count_p,
+ TROVE_vtag_s *vtag,
+ void **returned_user_ptr_p,
+ TROVE_ds_state *out_state_p,
+ int max_idle_time_ms)
+{
+ return dbpf_dspace_ops.dspace_test(coll_id, ds_id,
+ context_id, out_count_p, vtag, returned_user_ptr_p,
+ out_state_p, max_idle_time_ms);
+}
+
+static int
+tas_dspace_testsome (TROVE_coll_id coll_id,
+ TROVE_context_id context_id,
+ TROVE_op_id *ds_id_array,
+ int *inout_count_p,
+ int *out_index_array,
+ TROVE_vtag_s *vtag_array,
+ void **returned_user_ptr_array,
+ TROVE_ds_state *out_state_array,
+ int max_idle_time_ms)
+{
+ return dbpf_dspace_ops.dspace_testsome(coll_id, context_id, ds_id_array,
+ inout_count_p,out_index_array,vtag_array,returned_user_ptr_array,
+ out_state_array,max_idle_time_ms);
+}
+
+static int
+tas_dspace_testcontext (TROVE_coll_id coll_id,
+ TROVE_op_id *ds_id_array,
+ int *inout_count_p,
+ TROVE_ds_state *state_array,
+ void** user_ptr_array,
+ int max_idle_time_ms,
+ TROVE_context_id context_id)
+{
+ return dbpf_dspace_ops.dspace_testcontext(coll_id, ds_id_array,
+ inout_count_p, state_array, user_ptr_array, max_idle_time_ms, context_id);
+}
+
+static int
+tas_collection_setinfo (TROVE_method_id method_id,
+ TROVE_coll_id coll_id,
+ TROVE_context_id context_id,
+ int option,
+ void *parameter)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_setinfo %d, \n", option);
+ int ret = 1; //-TROVE_EINVAL;
+
+ switch(option)
+ {
+ case TROVE_COLLECTION_HANDLE_RANGES:{
+ gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_HANDLE_RANGES: %s %d\n",(char *)parameter,context_id);
+
+ //set minimum handles according to handle range...
+ TROVE_handle minMetaHandle,maxMetaHandle,minDatafileHandle,maxDatafileHandle;
+ ret = sscanf((char *)parameter, "%llu-%llu,%llu-%llu",
+ & minMetaHandle,& maxMetaHandle,& minDatafileHandle, & maxDatafileHandle );
+ if( ret == 4 ){ //set only once !!!
+ aktHandleFirst = minMetaHandle;
+ aktHandleSecond = minDatafileHandle;
+ handleSeperator = minDatafileHandle;
+ if( aktHandleSecond < aktHandleFirst ){
+ printf("error in parsing handle range !!! %s \n", (char*) parameter);
+ exit(1);
+ }
+ }else if(ret == 2){ //only datafiles OR metadata !
+ ret = sscanf((char *)parameter, "%llu-%llu",
+ & minDatafileHandle, & maxDatafileHandle );
+ aktHandleFirst = minDatafileHandle;
+ aktHandleSecond = 0;
+ handleSeperator = maxDatafileHandle+1;
+ }else{
+ return 0;
+ }
+
+ FILE * file;
+ printf("read metadata from file: %s \n",storageName);
+ file = fopen (storageName, "r");
+ if(file == 0){
+ printf(" no metadata information available \n");
+ return RETURN_IMMEDIATE_COMPLETE;
+ }
+
+ PVFS_size i;
+
+ handleCache * found=(handleCache *) malloc(sizeof(handleCache));
+ found->attributes = (storeAttributes *) malloc(sizeof(storeAttributes));
+
+ ret=fscanf(file,"Count:%llu\n",& meta_count);
+ if(ret != 1){
+ meta_count = 0;
+ free(found);
+
+ fclose(file);
+ return RETURN_IMMEDIATE_COMPLETE;
+ }
+ for(i=0; i < meta_count; i++){
+ bzero(found->attributes,sizeof(storeAttributes));
+ ret=fscanf(file,"Handle:%llu Type:%u FS-ID:%d UID:%u GID:%u MODE:%u DFILE_COUNT:%u DIST_SIZE:%u BYTE_SIZE:%llu KEYS:%llu CTIME:%llu MTIME:%llu ATIME:%llu\n",
+ &(found->handle), (int*)&(found->attributes->type),&found->fs_id,
+ &found->attributes->uid, &found->attributes->gid,
+ &found->attributes->mode, &found->attributes->dfile_count,
+ &found->attributes->dist_size, &found->b_size, &found->k_size,
+ &found->attributes->ctime, &found->attributes->mtime, &found->attributes->atime);
+
+ if(ret == 13){
+ if( found->handle < handleSeperator ){
+ if(aktHandleFirst <= found->handle){
+ aktHandleFirst = found->handle+1;
+ }
+ }else if(aktHandleSecond == 0) {
+ printf("ERROR aktHandleSecond = 0 \n");
+ }else{
+ if(aktHandleSecond <= found->handle){
+ aktHandleSecond = found->handle+1;
+ }
+ }
+
+ keyvalpair * akt_pair;
+
+ #if defined(TAS_USE_RED_BLACKTREE)
+ insertKeyIntoTree((RBData*) found,tas_meta_cache);
+ found->pairList = NULL;
+ found->pairTree = NULL;
+ #elif defined(TAS_USE_QUEUE)
+ found->pairs = NULL;
+ pushFrontKey(found->handle,found, tas_meta_cache);
+ #endif
+
+ akt_pair = NULL;
+
+ //read key->value pairs:
+ int i;
+ for(i=0; i < found->k_size; i++){
+ akt_pair = (keyvalpair *) malloc(sizeof(keyvalpair));
+
+ int key_sz, val_sz;
+ fscanf(file," Key-size:%d Val-size:%d ",& key_sz, & val_sz);
+ akt_pair->key.buffer_sz = key_sz;
+ akt_pair->val.buffer_sz = val_sz;
+ //
+ akt_pair->key.buffer = malloc(key_sz);
+ akt_pair->val.buffer = malloc(val_sz);
+ fread(akt_pair->key.buffer,1,key_sz, file);
+ fscanf(file," Value:");
+ fread(akt_pair->val.buffer,1,val_sz, file);
+ fscanf(file,"\n");
+
+ insertKeyValPair(found,akt_pair);
+ }
+
+ found = (handleCache *) malloc(sizeof(handleCache));
+ found->attributes = (storeAttributes *) malloc(sizeof(storeAttributes));
+ }else{
+ printf(" Less elements than expected read %llu of %llu \n", i, meta_count);
+ break;
+ }
+ }
+ free(found->attributes);
+ free(found);
+
+ fclose(file);
+
+ ret = 1;
+ break;
+ }case TROVE_COLLECTION_HANDLE_TIMEOUT:
+ ret = 1; // trove_set_handle_timeout(coll_id, context_id, (struct timeval *)parameter);
+ gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_HANDLE_TIMEOUT\n");
+ break;
+ case TROVE_COLLECTION_ATTR_CACHE_KEYWORDS:
+ gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_ATTR_CACHE_KEYWORDS: %s\n", (char *) parameter);
+ ret = 1;
+ break;
+ case TROVE_COLLECTION_ATTR_CACHE_SIZE:
+ gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_ATTR_CACHE_SIZE: %d\n",*((int *)parameter));
+ ret = 1;
+ break;
+ case TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS:
+ gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_ATTR_CACHE_MAX_NUM_ELEMS: %d\n",*((int *)parameter));
+ ret = 1;
+ break;
+ case TROVE_COLLECTION_ATTR_CACHE_INITIALIZE:
+ gossip_debug(GOSSIP_TROVE_DEBUG,"TROVE_COLLECTION_ATTR_CACHE_INITIALIZE \n");
+ ret = 1;
+ break;
+ }
+
+ return ret;
+}
+
+static int
+tas_collection_getinfo (TROVE_coll_id coll_id,
+ TROVE_context_id context_id,
+ TROVE_coll_getinfo_options opt,
+ void *parameter)
+{
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_collection_getinfo not implemented\n");
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_getinfo\n");
+ return 1;
+}
+
+static int
+tas_collection_seteattr (TROVE_coll_id coll_id,
+ TROVE_keyval_s * key_p,
+ TROVE_keyval_s * val_p,
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id,
+ TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_seteattr Key:%s \n", (char *)key_p->buffer);
+ if(key_p->buffer_sz == 12 && strncmp(key_p->buffer,"root_handle",12)==0){
+ /* rootHandle = *((TROVE_handle *) val_p->buffer); */
+ }else
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_collection_seteattr not implemented\n");
+ return 1;
+}
+
+static int
+tas_collection_geteattr (TROVE_coll_id coll_id,
+ TROVE_keyval_s * key_p,
+ TROVE_keyval_s * val_p,
+ TROVE_ds_flags flags,
+ void *user_ptr,
+ TROVE_context_id context_id,
+ TROVE_op_id * out_op_id_p)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_geteattr Key:%s\n", (char *)key_p->buffer);
+ fprintf(stderr, "TROVE MODULE TAS: FUNCTION tas_collection_geteattr not implemented\n");
+ return 1;
+}
+
+
+static int
+tas_open_context (TROVE_coll_id coll_id, TROVE_context_id * context_id)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_collection_geteattr\n");
+ *context_id = 1;
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+static int
+tas_close_context (TROVE_coll_id coll_id, TROVE_context_id context_id)
+{
+ gossip_debug(GOSSIP_TROVE_DEBUG, "Tas tas_close_context\n");
+ return RETURN_IMMEDIATE_COMPLETE;
+}
+
+
+
+/*
+ * Function pointers to the tas implementation are set in trove-mgmt.c
+ */
+
+struct TROVE_mgmt_ops tas_mgmt_ops = {
+ .initialize = tas_initialize,
+ .finalize = tas_finalize,
+ tas_storage_create,
+ tas_storage_remove,
+ tas_collection_create,
+ tas_collection_remove,
+ tas_collection_lookup,
+ tas_collection_iterate,
+ tas_collection_setinfo,
+ tas_collection_getinfo,
+ tas_collection_seteattr,
+ tas_collection_geteattr
+};
+
+struct TROVE_dspace_ops tas_dspace_ops = {
+ tas_dspace_create,
+ tas_dspace_remove,
+ tas_dspace_iterate_handles,
+ tas_dspace_verify,
+ tas_dspace_getattr,
+ tas_dspace_getattr_list,
+ tas_dspace_setattr,
+ tas_dspace_cancel,
+ tas_dspace_test,
+ tas_dspace_testsome,
+ tas_dspace_testcontext
+};
+
+
+struct TROVE_keyval_ops tas_keyval_ops =
+{
+ tas_keyval_read,
+ tas_keyval_write,
+ tas_keyval_remove,
+ tas_keyval_validate,
+ tas_keyval_iterate,
+ tas_keyval_iterate_keys,
+ tas_keyval_read_list,
+ tas_keyval_write_list,
+ tas_keyval_flush
+};
+
+
+struct TROVE_bstream_ops tas_bstream_ops = {
+ tas_bstream_read_at,
+ tas_bstream_write_at,
+ tas_bstream_resize,
+ tas_bstream_validate,
+ tas_bstream_read_list,
+ tas_bstream_write_list,
+ tas_bstream_flush
+};
+
+struct TROVE_context_ops tas_context_ops = {
+ tas_open_context,
+ tas_close_context
+};
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ tas-queue.h 2006-10-29 08:19:13.000000000 -0500
@@ -0,0 +1,59 @@
+/*
+ * Author Julian Kunkel
+ * Simple double linked list with puts object
+ * to front of list when it is accessed.
+ */
+
+#ifndef TASQUEUE_H_
+#define TASQUEUE_H_
+
+#include<stdlib.h>
+
+/*
+ * Print a warning if lookup takes more than WARNING_NUM iterations
+ * Performance decreases the more elements need to be inspected...
+ */
+#define WARNING_NUM 40
+
+typedef int64_t queueKey;
+
+struct tas_queue_elem_{
+ struct tas_queue_elem_ * next;
+ struct tas_queue_elem_ * prev;
+
+ queueKey key;
+ void * data;
+};
+
+typedef struct tas_queue_elem_ queue_elem;
+
+//currently queue and queue_elem are equal,
+//but maybe some elements will be added later so:
+typedef struct tas_queue_elem_* tas_queue;
+
+
+//functions:
+tas_queue *newTasQueue(void);
+
+queue_elem * lookupQueue(queueKey key, tas_queue* queue);
+
+void deleteElementFromList(queue_elem * elem, tas_queue* queue);
+void * deleteKeyFromList(queueKey key, tas_queue* queue);
+//lookup key and replace data if possible, else pushfront.
+//returns replaced data (if key exists)
+//Queue element is put to head of queue
+void * insertKeyIntoQueue(queueKey key, void * data, tas_queue* queue);
+queue_elem * pushFrontKey(queueKey key, void * data, tas_queue* queue);
+
+//remove data from current possition and push it to head of queue,
+void relinkFront(queue_elem * elem, tas_queue* queue);
+
+
+#define iterate_tas_queue(func,elem,queue){ \
+ queue_elem * elem=*queue; \
+ for(; elem != NULL ; elem = elem->next){ \
+ (func); \
+ } \
+}
+
+#endif /*TASQUEUE_H_*/
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ tas-handle-manager.c 2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,327 @@
+#include "tas-handle-manager.h"
+#include <stdio.h>
+#include <assert.h>
+
+static gen_mutex_t tas_handlemanager_mutex;
+red_black_tree * collectionTree=NULL;
+static int handleManagerInititalised = 0;
+
+struct tas_handle_manager_{
+ red_black_tree * handleTree;
+
+ gen_mutex_t tree_mutex;
+};
+
+typedef struct tas_handle_manager_ tas_handle_manager;
+
+/*
+ * Tree for mapping the collection id to the handle_managers
+ */
+struct tas_handle_manager_collections_{
+ TROVE_coll_id coll_id;
+ TROVE_handle seperatorHandleNo;
+ tas_handle_manager * firstHandleType;
+ tas_handle_manager * secondHandleType;
+};
+
+typedef struct tas_handle_manager_collections_ tas_handle_manager_collections;
+
+/*
+ * Comparision functions:
+ */
+static int compareCollections(RBData * data, RBKey * key){
+ TROVE_coll_id ikey1=((tas_handle_manager_collections*)data)->coll_id;
+ TROVE_coll_id ikey2=*((TROVE_coll_id *) key);
+ if(ikey1 > ikey2 ){ //take right neigbour
+ return +1;
+ }else if(ikey1 < ikey2){//take left neigbour
+ return -1;
+ }
+ return 0;
+}
+
+static int compareCollections2(RBData * data, RBData * data2){
+ TROVE_coll_id ikey1=((tas_handle_manager_collections*)data)->coll_id;
+ TROVE_coll_id ikey2=((tas_handle_manager_collections*)data2)->coll_id;
+ if(ikey1 > ikey2 ){ //take right neigbour
+ return +1;
+ }else if(ikey1 < ikey2){//take left neigbour
+ return -1;
+ }
+ return 0;
+}
+
+//for comparision of handleRanges:
+static int compareRanges2(RBData * data, RBData * data2){
+ tas_handleRange * ikey1=(tas_handleRange *) data;
+ tas_handleRange * ikey2=(tas_handleRange *) data2;
+
+ //is no overlapping possible ?
+ if(ikey1->last < ikey2->first ){
+ return -1;
+ }else if(ikey1->first > ikey2->last){
+ return +1;
+ }
+ //it does overlapp !!!
+ return 0;
+}
+
+/*
+ * Helper functions:
+ */
+static inline void insertHandleExtend(tas_handle_manager* manager,TROVE_handle lowerBound, TROVE_handle upperBound){
+ tas_handleRange* handles = (tas_handleRange*) malloc(sizeof(tas_handleRange));
+ handles->first = lowerBound;
+ handles->last = upperBound;
+ insertKeyIntoTree(handles,manager->handleTree);
+};
+
+static inline tas_handle_manager* insertHandleRangeManager(TROVE_handle lowerBound, TROVE_handle upperBound) {
+ tas_handle_manager* manager=(tas_handle_manager*) malloc(sizeof(tas_handle_manager));
+ gen_mutex_init(& manager->tree_mutex);
+ manager->handleTree = newRedBlackTree(compareRanges2,compareRanges2);
+ insertHandleExtend(manager, lowerBound,upperBound);
+ return manager;
+}
+
+static inline tas_handle_manager* lookupCollection(TROVE_coll_id coll_id, TROVE_handle handle){
+ tree_node * node =lookupTree(& coll_id,collectionTree);
+ if(node == NULL) return NULL;
+ tas_handle_manager_collections * collection = (tas_handle_manager_collections*) node->data;
+ if(handle > collection->seperatorHandleNo){
+ return collection->secondHandleType;
+ }else{
+ return collection->firstHandleType;
+ }
+}
+
+/*
+ *
+ */
+void initialiseHandleManager(void){
+ if(!handleManagerInititalised){
+ gen_mutex_init(& tas_handlemanager_mutex);
+ gen_mutex_lock(& tas_handlemanager_mutex);
+ collectionTree = newRedBlackTree(compareCollections,compareCollections2);
+ gen_mutex_unlock(& tas_handlemanager_mutex);
+ }
+ handleManagerInititalised = 1;
+}
+
+
+void initialiseCollection(TROVE_coll_id coll_id,TROVE_handle lowerBound,
+ TROVE_handle upperBound,TROVE_handle lowerBound2, TROVE_handle upperBound2){
+
+ if(handleManagerInititalised != 1){
+ printf("ERROR Handle manager is not initialised !\n");
+ assert(0);
+ }
+ gen_mutex_lock(& tas_handlemanager_mutex);
+ if( lookupTree(& coll_id, collectionTree) != NULL){
+ printf("ERROR: collection already initialised !\n");
+ assert(0);
+ }
+
+ tas_handle_manager_collections * coll_manager =
+ (tas_handle_manager_collections*) malloc(sizeof(tas_handle_manager_collections));
+ coll_manager->coll_id = coll_id;
+ coll_manager->seperatorHandleNo = upperBound;
+
+ coll_manager->firstHandleType = insertHandleRangeManager(lowerBound, upperBound);
+ if(lowerBound2 == 0){
+ coll_manager->secondHandleType = NULL;
+ }else{
+ coll_manager->secondHandleType = insertHandleRangeManager(lowerBound2, upperBound2);
+ }
+ coll_manager->coll_id = coll_id;
+
+ insertKeyIntoTree(coll_manager,collectionTree);
+ gen_mutex_unlock(& tas_handlemanager_mutex);
+}
+
+/*
+ * Remove handle from available handles and return it !
+ * Assume the handles will be never used up....
+ */
+TROVE_handle getHandle(TROVE_coll_id coll_id,TROVE_handle_extent_array * extends){
+ tas_handle_manager* manager = lookupCollection(coll_id, extends->extent_array[0].first);
+ if(manager == NULL) return TROVE_HANDLE_NULL;
+
+ int i;
+ TROVE_handle handle=TROVE_HANDLE_NULL;
+ tree_node * node;
+ gen_mutex_lock(& manager->tree_mutex);
+ for(i=0; i < extends->extent_count ; i++){
+ node= lookupTree(& extends->extent_array[i],manager->handleTree);
+ if(node != NULL){
+ //found a overlapping extend, take first :)
+
+ tas_handleRange * nodeRange = (tas_handleRange *) node->data;
+ handle=nodeRange->first;
+ nodeRange->first++;
+ if(nodeRange->first > nodeRange->last){
+ //remove handle range:
+ deleteNodeFromTree(node,manager->handleTree);
+ free(nodeRange);
+ }
+
+ break;
+ }
+ }
+ gen_mutex_unlock(& manager->tree_mutex);
+ return handle;
+}
+
+TROVE_handle getArbitraryHandle(TROVE_coll_id coll_id,TROVE_handle handleSep){
+ tas_handle_manager* manager = lookupCollection(coll_id,handleSep);
+ if(manager == NULL) return TROVE_HANDLE_NULL;
+
+ TROVE_handle handle=TROVE_HANDLE_NULL;
+ gen_mutex_lock(& manager->tree_mutex);
+ tas_handleRange * nodeRange = (tas_handleRange *) manager->handleTree->head->data;
+ handle=nodeRange->first;
+ nodeRange->first++;
+ if(nodeRange->first > nodeRange->last){
+ //remove handle range:
+ deleteNodeFromTree(manager->handleTree->head,manager->handleTree);
+ free(nodeRange);
+ }
+ gen_mutex_unlock(& manager->tree_mutex);
+ return handle;
+}
+
+/*
+ * May split handle range into two parts
+ */
+TROVE_handle getfixedHandle(TROVE_coll_id coll_id,TROVE_handle handle){
+ tas_handle_manager* manager = lookupCollection(coll_id, handle);
+ if(manager == NULL) return TROVE_HANDLE_NULL;
+ PVFS_handle_extent handleExtend;
+ handleExtend.first = handle;
+ handleExtend.last = handle;
+
+ tree_node * node;
+ gen_mutex_lock(& manager->tree_mutex);
+ node= lookupTree(& handleExtend, manager->handleTree);
+ if(node != NULL){
+ //found a extend which contains handle
+ tas_handleRange * nodeRange = (tas_handleRange *) node->data;
+
+ if(nodeRange->first < handle && nodeRange->last > handle){
+ //split handle range into two parts ...
+ //change data of current node (does not destroy tree order)
+ TROVE_handle tmpRangeFirst = nodeRange->first;
+ nodeRange->first = handle+1;
+ insertHandleExtend(manager, tmpRangeFirst, handle-1);
+ }else{
+ if(nodeRange->first == handle){
+ nodeRange->first++;
+ }else if(nodeRange->last == handle){
+ nodeRange->last--;
+ }
+ if(nodeRange->first > nodeRange->last){
+ //remove handle range:
+ deleteNodeFromTree(node,manager->handleTree);
+ free(nodeRange);
+ }
+ }
+ gen_mutex_unlock(& manager->tree_mutex);
+ return handle;
+ }
+ gen_mutex_unlock(& manager->tree_mutex);
+ return TROVE_HANDLE_NULL;
+}
+
+/*
+ * Free the handle by adding it to a handle range !
+ * Maybe fix entries together in handle manager...
+ */
+int freeHandle(TROVE_coll_id coll_id,TROVE_handle handle){
+ return freeHandleRange(coll_id, handle,handle);
+}
+
+int freeHandleRange(TROVE_coll_id coll_id,TROVE_handle lowerhandle,TROVE_handle upperhandle){
+ tas_handle_manager* manager = lookupCollection(coll_id, upperhandle);
+ if(manager == NULL) return -1;
+
+ tree_node * node;
+ PVFS_handle_extent handleExtend;
+ handleExtend.first = lowerhandle;
+ handleExtend.last = upperhandle;
+ gen_mutex_lock(& manager->tree_mutex);
+ node= lookupTree(& handleExtend, manager->handleTree);
+ if(node != NULL){
+ gen_mutex_unlock(& manager->tree_mutex);
+ //at least one handle in range is already free, this should not happen !
+ return -1;
+ }
+
+ //try to find near handles +- 1...
+ handleExtend.first = lowerhandle-1;
+ handleExtend.last = upperhandle+1;
+ node= lookupTree(& handleExtend, manager->handleTree);
+ if(node != NULL){
+ //found one merge them
+ tas_handleRange * nodeRange = (tas_handleRange *) node->data;
+ if(nodeRange->first == upperhandle+1){
+ nodeRange->first = lowerhandle;
+ }else if(nodeRange->last +1 == lowerhandle){
+ nodeRange->last = upperhandle;
+ }else{
+ assert(0);
+ }
+ //try to recursively merge entries
+ int changed = 1;
+ while(changed){
+ changed = 0;
+ tree_node * newnode;
+ //find left side extend:
+ handleExtend.first = nodeRange->first-1;
+ handleExtend.last = nodeRange->first-1;
+ newnode= lookupTree(& handleExtend, manager->handleTree);
+ if(newnode != NULL){
+ changed = 1;
+ tas_handleRange * nodeRangeNew = (tas_handleRange *) newnode->data;
+ nodeRange->first = nodeRangeNew->first;
+ deleteNodeFromTree(newnode,manager->handleTree);
+ }
+
+ //find right side extend:
+ handleExtend.first = nodeRange->last+1;
+ handleExtend.last = nodeRange->last+1;
+ newnode= lookupTree(& handleExtend, manager->handleTree);
+ if(newnode != NULL){
+ changed = 1;
+ tas_handleRange * nodeRangeNew = (tas_handleRange *) newnode->data;
+ nodeRange->last = nodeRangeNew->last;
+ deleteNodeFromTree(newnode,manager->handleTree);
+ }
+ }
+
+ }else{
+ //create a new single entry...
+ //this should be optimized later
+ insertHandleExtend(manager, lowerhandle, upperhandle);
+ }
+
+ gen_mutex_unlock(& manager->tree_mutex);
+
+
+ return 0;
+}
+
+int lookupHandle(TROVE_coll_id coll_id,TROVE_handle handle){
+ tas_handle_manager* manager = lookupCollection(coll_id, handle);
+ if(manager == NULL) return 0;
+
+ tree_node * node;
+ PVFS_handle_extent handleExtend;
+ handleExtend.first = handle;
+ handleExtend.last = handle;
+ gen_mutex_lock(& manager->tree_mutex);
+ node= lookupTree(& handleExtend, manager->handleTree);
+ gen_mutex_unlock(& manager->tree_mutex);
+ if(node == NULL) return 0;
+ return 1;
+}
+
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ red-black-tree.h 2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,60 @@
+/*
+ * Author: Julian Kunkel 2005
+ */
+#ifndef REDBLACKTREE_H_
+#define REDBLACKTREE_H_
+
+#include<stdlib.h>
+
+#ifndef TRUE
+ #define TRUE 1
+ #define FALSE 0
+#endif
+
+enum node_color{
+ RED=0,BLACK=1
+};
+
+typedef void RBData;
+typedef void RBKey;
+
+
+struct red_black_tree_node{
+ struct red_black_tree_node* parent;
+ struct red_black_tree_node* left;
+ struct red_black_tree_node* right;
+
+ RBData * data;
+ char color;
+};
+
+typedef struct red_black_tree_node tree_node;
+
+struct red_black_tree_{
+ tree_node * head;
+ int (*compare)(RBData * data, RBKey * key);
+ int (*compare2)(RBData * data, RBData * data2);
+};
+
+typedef struct red_black_tree_ red_black_tree;
+
+//functions
+tree_node* lookupTree(RBKey *key, red_black_tree* tree);
+red_black_tree *newRedBlackTree(int (*compare)(RBData * data, RBKey * key),int (*compare2)(RBData * data, RBData* data2));
+void freeEmptyRedBlackTree(red_black_tree ** tree);
+
+tree_node * insertKeyIntoTree(RBData * data, red_black_tree* tree);
+
+void deleteNodeFromTree(tree_node*node , red_black_tree* tree);
+//this function returns NULL if the node is deleted, or the node which changed position
+//this is necessary if the data contains references to the node :)
+tree_node * deleteNodeFromTree2(tree_node*node , red_black_tree* tree);
+
+
+void iterateRedBlackTree(void (*callback)(RBData* data, void* funcData), red_black_tree* tree,void* funcData);
+
+//several compare functions:
+int compareInt64(RBData * data, RBKey * key);
+
+#endif /*REDBLACKTREE_H_*/
+
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ module.mk.in 2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,10 @@
+DIR := src/io/trove/trove-tas
+LIBSRC += \
+ $(DIR)/red-black-tree.c
+
+SERVERSRC += \
+ $(DIR)/tas.c \
+ $(DIR)/red-black-tree.c \
+ $(DIR)/tas-handle-manager.c \
+ $(DIR)/tas-queue.c
+
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ red-black-tree.c 2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,368 @@
+/*
+ * Author: Julian Kunkel 2005
+ */
+
+#include "red-black-tree.h"
+
+int compareInt64(RBData * data, RBKey * key){
+ int64_t ikey1=*((int64_t *) data);
+ int64_t ikey2=*((int64_t *) key);
+
+ if(ikey1 > ikey2 ){ //take right neigbour
+ return +1;
+ }else if(ikey1 < ikey2){//take left neigbour
+ return -1;
+ }
+ return 0;
+}
+
+static void inorderWalkthrough(void (*callback)(RBData* data, void* funcData),tree_node* node,void* funcData){
+ if(node->left != NULL){
+ inorderWalkthrough(callback,node->left,funcData);
+ }
+ (*callback)(node->data,funcData);
+ if(node->right != NULL){
+ inorderWalkthrough(callback,node->right,funcData);
+ }
+}
+
+void iterateRedBlackTree(void (*callback)(RBData* data, void* funcData), red_black_tree* tree,void* funcData){
+ if(tree == NULL || tree->head == NULL)
+ return;
+ inorderWalkthrough(callback,tree->head,funcData);
+}
+
+//take care of memory managment of the data by yourself !
+tree_node* lookupTree(RBKey *key, red_black_tree* tree){
+ tree_node* actNode = tree->head;
+ while(actNode != NULL){
+ int compret=(tree->compare)(actNode->data,key);
+ if(compret > 0){ //take right neigbour
+ actNode = actNode->right;
+ }else if(compret < 0){//take left neigbour
+ actNode = actNode->left;
+ }else{ //found key
+ return actNode;
+ }
+ }
+
+ return NULL;
+}
+
+
+red_black_tree *newRedBlackTree(int (*compare)(RBData * data, RBKey * key),int (*compare2)(RBData * data, RBData* data2)){
+ red_black_tree * tree=malloc(sizeof(red_black_tree));
+ tree->head = NULL;
+ tree->compare = compare;
+ tree->compare2 = compare2;
+ return tree;
+}
+
+void freeEmptyRedBlackTree(red_black_tree ** tree){
+ if(*tree != NULL){
+ free(*tree);
+ *tree = NULL;
+ }
+}
+
+inline tree_node* nodesibling(tree_node * node){
+ tree_node* parent = node->parent;
+ if(parent == NULL) return NULL;
+ return parent->left == node ? parent->right : parent->left;
+}
+
+inline void rotateleft(tree_node* node, red_black_tree* tree){
+ tree_node* parent = node->parent;
+ tree_node* sibling = node->right;
+
+ if(parent != NULL){
+ if(parent->left == node){
+ parent->left = sibling;
+ }else{
+ parent->right = sibling;
+ }
+ }else{ //grandparent == NULL
+ tree->head = sibling;
+ }
+ sibling->parent = parent;
+ //connection to parent tree is now correct...
+ node->right = sibling->left;
+ if(sibling->left != NULL){
+ sibling->left->parent = node;
+ }
+
+ sibling->left = node;
+ node->parent = sibling;
+}
+
+inline void rotateright(tree_node* node, red_black_tree* tree){
+ tree_node* sibling = node->left;
+ if(node->parent != NULL){
+ tree_node* parent = node->parent;
+
+ if(parent->left == node){
+ parent->left = sibling;
+ }else{
+ parent->right = sibling;
+ }
+ }else{ //node is head of tree
+ tree->head = sibling;
+ }
+ sibling->parent = node->parent;
+ //connection to parent tree is now correct...
+
+ node->left = sibling->right;
+ if(sibling->right != NULL)
+ sibling->right->parent = node;
+
+ sibling->right = node;
+ node->parent = sibling;
+}
+
+
+//recursive function to recreate RB tree condition
+static void rebuildRBTree(red_black_tree* tree, tree_node* node){
+ if(node == tree->head){
+ node->color = BLACK;
+ return;
+ }
+ tree_node* parent = node->parent;
+ //No path from the root to a leaf may contain two consecutive nodes colored red
+ if( parent->color == RED ){ //condition not held !
+ //check cases !!! Decision depends on the grandpa of the new node position...
+ tree_node* grandparent = parent->parent;
+ //printf("Condition not held key:%d parent:%d grandpa:%d !\n",node->key, parent->key, grandparent->key);
+ tree_node* uncle=grandparent->right;
+ if(parent == grandparent->right){
+ uncle = grandparent->left;
+ }
+
+ if(uncle != NULL && uncle->color == RED){
+ //case LLr or LRb
+ grandparent->color = RED;
+ uncle->color = BLACK;
+ parent->color = BLACK;
+ //recursivly call function
+ rebuildRBTree(tree, grandparent);
+ return;
+ }
+ if(parent == grandparent->left){ //grandparent->color must be BLACK because parent is RED
+ if(parent->left == node){ //case LLb
+ rotateright(grandparent,tree);
+ grandparent->color = RED;
+ parent->color=BLACK;
+ return;
+ }else{//case LRb
+ rotateleft(parent,tree);
+ rotateright(grandparent,tree);
+ grandparent->color = RED;
+ node->color=BLACK;
+ return;
+ }
+ }else{
+ if(parent->left == node){ //case RLb
+ rotateright(parent,tree);
+ rotateleft(grandparent,tree);
+ grandparent->color = RED;
+ node->color=BLACK;
+ return;
+ }else{//case RRb
+ rotateleft(grandparent,tree);
+ grandparent->color = RED;
+ parent->color=BLACK;
+ return;
+ }
+ }
+
+
+ }
+}
+
+tree_node * insertKeyIntoTree(RBData * data, red_black_tree* tree){
+ tree_node* new_node=(tree_node*) malloc(sizeof(tree_node));
+ new_node->data = data;
+
+ if(tree->head == NULL){ // if the tree is empty
+ tree->head = new_node;
+ new_node->parent = NULL;
+ new_node->left = NULL;
+ new_node->right = NULL;
+ new_node->color = BLACK;
+ return new_node;
+ }
+
+ tree_node* actNode = tree->head;
+ tree_node* parentNode = NULL;
+
+
+ while(actNode != NULL){
+ parentNode = actNode;
+ int compret=(tree->compare2)(actNode->data,data);
+ if(compret > 0){ //take right neigbour
+ actNode = actNode->right;
+ }else if(compret < 0){//take left neigbour
+ actNode = actNode->left;
+ }else{ //found key, whoops data already exits !!!!
+ return NULL;
+ }
+ }
+
+ //found position for insert !!!
+ //binary tree insertion always at a leaf node:
+ if((tree->compare2)(parentNode->data,data) > 0){ //take right neigbour
+ parentNode->right = new_node;
+ }else{//take left neigbour
+ parentNode->left = new_node;
+ }
+ new_node->parent = parentNode;
+ new_node->left = NULL;
+ new_node->right = NULL;
+ new_node->color = RED;
+
+ //check if red-black-tree conditions are held
+ rebuildRBTree(tree, new_node);
+ //insert RedBlack condition is now held.
+ return new_node;
+}
+
+static void rebuildTreeAfterDeletion(red_black_tree* tree, tree_node* node){
+ tree_node * parent = node->parent;
+ if(node->color == BLACK){
+ //complex cases:
+ //now the (possible existing) child is black
+ //if(child == NULL || child->color == BLACK){
+ if(parent != NULL){
+ //delete case 2
+ tree_node * sibling = nodesibling(node);
+ if(sibling != NULL && sibling->color == RED){
+ parent->color = RED;
+ sibling->color = BLACK;
+ if( node == parent->left ){
+ rotateleft(parent,tree);
+ }else{
+ rotateright(parent,tree);
+ }
+ sibling = nodesibling(node);
+ }
+ //delete case 3
+ if(parent->color == BLACK && (sibling == NULL || (sibling->color == BLACK &&
+ (sibling->left == NULL || sibling->left->color == BLACK) &&
+ (sibling->right == NULL || sibling->right->color == BLACK)))
+ ){
+ if(sibling!= NULL)
+ sibling->color = RED;
+ rebuildTreeAfterDeletion(tree,node->parent);
+ }else{
+ //delete case 4
+ if(parent->color == RED && (sibling == NULL || (sibling->color == BLACK &&
+ (sibling->left == NULL || sibling->left->color == BLACK) &&
+ (sibling->right == NULL || sibling->right->color == BLACK)))
+ ){
+ sibling->color = RED;
+ parent->color = BLACK;
+ }else{
+ //delete case 5
+ if(parent->left == node && sibling->color == BLACK &&
+ sibling->left != NULL && sibling->left->color == RED &&
+ (sibling->right == NULL || sibling->right->color == BLACK)
+ ){
+ sibling->color = RED;
+ sibling->left->color = BLACK;
+ rotateright(sibling,tree);
+ }else if(parent->right == node && sibling->color == BLACK &&
+ sibling->right != NULL && sibling->right->color == RED &&
+ (sibling->left == NULL || sibling->left->color == BLACK)
+ ){
+ sibling->color = RED;
+ sibling->right->color = BLACK;
+ rotateleft(sibling,tree);
+ }
+ //delete case 6
+ parent = node->parent;
+ sibling = nodesibling(node);
+ sibling->color = parent->color;
+ parent->color = BLACK;
+ if( node == parent->left){
+ if(sibling->right != NULL)
+ sibling->right->color = BLACK;
+ rotateleft(parent,tree);
+ }else{
+ if(sibling->left != NULL)
+ sibling->left->color = BLACK;
+ rotateright(parent,tree);
+ }
+ }
+ }
+ }//else new node is tree root, do nothing...
+
+
+ }// end complex cases!
+}
+
+void deleteNodeFromTree(tree_node*node , red_black_tree* tree){
+ tree_node * changed = deleteNodeFromTree2(node,tree);
+ if(changed != NULL){
+ deleteNodeFromTree2(changed,tree);
+ }
+}
+
+tree_node * deleteNodeFromTree2(tree_node*node , red_black_tree* tree){
+ if(node->left != NULL && node->right != NULL){ //node has two children...
+ //look for maximum value in the left subtree, O(log(N))
+ tree_node * act_node;
+ for(act_node=node->left; act_node->right != NULL; act_node=act_node->right);
+ node->data = act_node->data;
+
+ //delete old node which has only one child
+ return act_node;
+ }
+
+ tree_node * child = node->left == NULL ? node->right : node->left;
+ int createdChild = FALSE;
+ //delete node which now has at most one child
+ //use node as dummy child
+ if(node->color == BLACK && node->parent != NULL && child == NULL){
+ child=node;
+ child->data = NULL;
+ createdChild = TRUE;
+ }
+
+ //relink nodes
+ if(node->parent != NULL){
+ if(node->parent->left == node){
+ node->parent->left = child;
+ }else{
+ node->parent->right = child;
+ }
+ }else{ //node was tree head !!!
+ tree->head = child;
+ }
+ if(child != NULL){
+ child->parent = node->parent;
+ if(node->color == BLACK){
+ if(child->color == RED){
+ child->color = BLACK;
+ }else{
+ //to start rebuild tree...
+ rebuildTreeAfterDeletion(tree, child);
+ }
+ }
+ }
+
+ if(createdChild){
+ tree_node * parent = child->parent;
+
+ if(parent != NULL){
+ if(parent->left == child){
+ parent->left = NULL;
+ }else{
+ parent->right = NULL;
+ }
+ }
+ }
+
+ //give node memory free !!!
+ free(node);
+ return NULL;
+}
+
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ README 2006-10-29 08:19:14.000000000 -0500
@@ -0,0 +1,3 @@
+This is an implementation of the trove interface which will not save persitent data.
+TAS = Trove analysis stub should be used to measure the performance overhead of the
+pvfs2 layers between application and storage space.
More information about the Pvfs2-cvs
mailing list