[Pvfs2-cvs] commit by mtmoore in pvfs2/src/io/trove/trove-dbpf: dbpf-keyval.c dbpf-mgmt.c dbpf.h

CVS commit program cvs at parl.clemson.edu
Tue Jul 14 13:19:43 EDT 2009


Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv2435/src/io/trove/trove-dbpf

Modified Files:
      Tag: Orange-mtmoore
	dbpf-keyval.c dbpf-mgmt.c dbpf.h 
Log Message:
Initial import of branch supporting keyval attribute/value lookup


Index: dbpf-keyval.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval.c,v
diff -p -u -r1.94 -r1.94.24.1
--- dbpf-keyval.c	30 Jan 2009 15:41:08 -0000	1.94
+++ dbpf-keyval.c	14 Jul 2009 17:19:42 -0000	1.94.24.1
@@ -37,6 +37,8 @@
 #include "gossip.h"
 #include "pvfs2-internal.h"
 #include "pint-perf-counter.h"
+#include "pint-cached-config.h"
+#include "dbpf-keyval.h"
 
 static uint32_t readdir_session = 0;
 
@@ -56,20 +58,6 @@ extern int synccount;
  * likely sizeof(struct dbpf_keyval_db_entry).
  */
 
-#define DBPF_MAX_KEY_LENGTH PVFS_NAME_MAX
-
-struct dbpf_keyval_db_entry
-{
-    TROVE_handle handle;
-    char key[DBPF_MAX_KEY_LENGTH];
-};
-
-#define DBPF_KEYVAL_DB_ENTRY_TOTAL_SIZE(_size) \
-    (sizeof(TROVE_handle) + _size)
-
-#define DBPF_KEYVAL_DB_ENTRY_KEY_SIZE(_size) \
-    (_size - sizeof(TROVE_handle))
-
 /**
  * The keyval database contains attributes for pvfs2 handles
  * (files, directories, symlinks, etc.) that are not considered
@@ -115,6 +103,7 @@ static int dbpf_keyval_do_remove(
     DB *db_p, TROVE_handle handle, TROVE_keyval_s *key, TROVE_keyval_s *val);
 
 static int dbpf_keyval_read_op_svc(struct dbpf_op *op_p);
+static int dbpf_keyval_read_value_op_svc(struct dbpf_op *op_p);
 static int dbpf_keyval_read_list_op_svc(struct dbpf_op *op_p);
 static int dbpf_keyval_write_op_svc(struct dbpf_op *op_p);
 static int dbpf_keyval_write_list_op_svc(struct dbpf_op *op_p);
@@ -157,6 +146,9 @@ enum dbpf_handle_info_action
 
 static int dbpf_keyval_handle_info_ops(struct dbpf_op * op_p,
                                        enum dbpf_handle_info_action action);
+static int dbpf_build_path_of_handle(DBC *dbc_p, char *path, 
+                                     TROVE_coll_id coll_id, 
+                                     TROVE_handle handle);
 
 static int dbpf_keyval_read(TROVE_coll_id coll_id,
                             TROVE_handle handle,
@@ -328,6 +320,237 @@ return_error:
     return ret;
 }
 
+/**
+ * Post a lookup of objects by attribute/value pair, answered from the
+ * keyval secondary index by dbpf_keyval_read_value_op_svc().
+ *
+ * key_p (attribute name), val_p (value to match), dirent_p and position_p
+ * are stored in the op; the service routine reads and overwrites them.
+ * vtag is accepted but not used here.
+ *
+ * Returns -TROVE_EINVAL for an unregistered coll_id, the negative code
+ * from op initialization, or the result of dbpf_queue_or_service().
+ */
+static int dbpf_keyval_read_value(TROVE_coll_id coll_id,
+                            TROVE_ds_position *position_p,
+                            PVFS_dirent *dirent_p,
+                            TROVE_keyval_s *key_p, TROVE_keyval_s *val_p,
+                            TROVE_ds_flags flags, TROVE_vtag_s *vtag,
+                            void *user_ptr, TROVE_context_id context_id,
+                            TROVE_op_id *out_op_id_p, PVFS_hint hints)
+{
+    int ret;
+    dbpf_queued_op_t *q_op_p = NULL;
+    struct dbpf_op op;
+    struct dbpf_op *op_p;
+    struct dbpf_collection *coll_p = NULL;
+    PINT_event_id event_id = 0;
+    PINT_event_type event_type;
+
+    coll_p = dbpf_collection_find_registered(coll_id);
+    if (coll_p == NULL)
+    {
+        return -TROVE_EINVAL;
+    }
+
+    ret = dbpf_op_init_queued_or_immediate(
+        &op, &q_op_p, KEYVAL_READ_VALUE, coll_p, dirent_p->handle,
+        dbpf_keyval_read_value_op_svc, flags, NULL, user_ptr, context_id,
+        &op_p);
+    if(ret < 0)
+    {
+        return ret;
+    }
+
+    event_type = trove_dbpf_keyval_read_event_id;
+    DBPF_EVENT_START(coll_p, q_op_p, event_type, &event_id,
+                     PINT_HINT_GET_CLIENT_ID(hints),
+                     PINT_HINT_GET_REQUEST_ID(hints),
+                     PINT_HINT_GET_RANK(hints),
+                     dirent_p->handle,
+                     PINT_HINT_GET_OP_ID(hints));
+
+    /* initialize the op-specific members */
+    op_p->u.v_read.dirent = dirent_p;
+    op_p->u.v_read.key = key_p;
+    op_p->u.v_read.val = val_p;
+    op_p->u.v_read.position_p = position_p;
+    op_p->hints = hints;
+
+    return dbpf_queue_or_service(op_p, q_op_p, coll_p, out_op_id_p,
+                                 event_type, event_id);
+}
+
+/*
+ * Service routine for KEYVAL_READ_VALUE.  Builds <attribute><value> from
+ * key and val, looks it up in the keyval secondary index and returns the
+ * duplicate at *position_p: the primary key goes to key, the data to val,
+ * the owning handle and its path to dirent.  *position_p is advanced, or
+ * set to TROVE_ITERATE_END when no further duplicate exists.
+ *
+ * Returns 1 once serviced, or a negative TROVE error code on failure.
+ */
+static int dbpf_keyval_read_value_op_svc(struct dbpf_op *op_p)
+{
+    int ret = -TROVE_EINVAL, key_size=0;
+    size_t attr_len = strlen(op_p->u.v_read.key->buffer);
+    struct dbpf_keyval_db_entry key_entry;
+    char *key_data, *value_data;
+    TROVE_ds_position local_p = TROVE_ITERATE_START;
+    DBT key, data, pkey;
+    DBC *dbc_p;
+
+    memset(&key, 0, sizeof(key));
+    memset(&data, 0, sizeof(data));
+    memset(&pkey, 0, sizeof(pkey));
+    memset(&key_entry, 0, sizeof(key_entry));
+
+    /* size of key to lookup is length of the key and the value */
+    key_size = attr_len + 1 + op_p->u.v_read.val->buffer_sz;
+    if( (key_data = calloc( 1, key_size )) == NULL )
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: malloc for "
+                     "key_data failed.\n");
+        return -TROVE_ENOMEM;
+    }
+    memcpy(key_data, op_p->u.v_read.key->buffer, attr_len);
+    memcpy(key_data + attr_len, op_p->u.v_read.val->buffer,
+        op_p->u.v_read.val->buffer_sz);
+
+    if( (value_data = calloc(1, op_p->u.v_read.val->buffer_sz)) == NULL )
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: malloc for "
+                   " value_data failed.\n");
+        free(key_data);
+        return -TROVE_ENOMEM;
+    }
+
+    key.data = key_data;
+    key.ulen = key.size = strlen(key_data)+1;
+    data.data = value_data;
+    data.size = data.ulen = op_p->u.v_read.val->buffer_sz;
+    pkey.data = &key_entry;
+    pkey.size = pkey.ulen = sizeof( struct dbpf_keyval_db_entry );
+    key.flags = data.flags = pkey.flags = DB_DBT_USERMEM;
+
+    /* duplicates in secondary index require use of cursor */
+    ret = op_p->coll_p->keyval_secondary_db->cursor(
+        op_p->coll_p->keyval_secondary_db, NULL, &dbc_p, 0);
+    if( ret != 0 )
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                     "[KEYVAL]: Error getting cursor for keyval_secondary "
+                     "key=%.*s: %s\n", op_p->u.v_read.key->buffer_sz,
+                     (char *)op_p->u.v_read.key->buffer, db_strerror(ret));
+        free(key_data);
+        free(value_data);
+        return -TROVE_EFAULT;
+    }
+
+    gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                 "[KEYVAL]: Doing pget with: %s/(%d)(%d), "
+                 "pkey (%d)(%d), data %s/(%d)(%d), initial position: %llu\n",
+                 (char *)key.data, key.ulen, key.size, pkey.ulen, pkey.size,
+                 (char *)data.data, data.ulen, data.size,
+                 llu(*op_p->u.v_read.position_p));
+
+    ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_SET);
+    if( ret == DB_NOTFOUND ) /* no key at all */
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
+                 "value: No matching keys found in secondary index\n");
+        (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
+        ret = -dbpf_db_error_to_trove_error(ret);
+        goto cleanup;
+    }
+    else if(  (*op_p->u.v_read.position_p) != TROVE_ITERATE_START )
+    {
+        local_p = 0;
+        while( (ret == 0) && (local_p < (*op_p->u.v_read.position_p)) )
+        {
+            ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_NEXT_DUP);
+            local_p++;
+        }
+    }
+
+    if( (ret != 0) && (ret != DB_NOTFOUND) ) /* failure other than not found */
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
+                     "value: pget error in secondary index: %s\n",
+                     db_strerror(ret));
+        (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
+        ret = -dbpf_db_error_to_trove_error(ret);
+        goto cleanup;
+    }
+    else if( ((local_p) == (*op_p->u.v_read.position_p)) && ret == 0 )
+    {
+        /* got record for requested position, also handles first record */
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                     "[KEYVAL]: dbpf_keyval_read_value: Found key %llu/%s -> "
+                     "%s\n", llu(key_entry.handle), key_entry.key,
+                     (char *)data.data);
+
+        /* the buffers that values are copied to must be big enough, passed in
+        * pointers have buffers set to max allowable size. */
+        assert(op_p->u.v_read.key->buffer_sz >= pkey.size);
+        assert(op_p->u.v_read.val->buffer_sz >= data.size);
+
+        memcpy(op_p->u.v_read.key->buffer, pkey.data, pkey.size);
+        op_p->u.v_read.key->read_sz = pkey.size;
+        memcpy(op_p->u.v_read.val->buffer, data.data, data.size);
+        op_p->u.v_read.val->read_sz = data.size;
+        op_p->u.v_read.dirent->handle = key_entry.handle;
+
+        if( (*op_p->u.v_read.position_p) == TROVE_ITERATE_START )
+        {
+            (*op_p->u.v_read.position_p) = 1;
+        }
+        else
+        {
+            (*op_p->u.v_read.position_p)++;
+        }
+
+        /* check if another key exists to prevent an additional call to find
+         * the end. if the cursor ever stays open we'll need to return
+         * current above, not next */
+        ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_NEXT_DUP);
+        if( ret == DB_NOTFOUND )
+        {
+            (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: pre-empting end"
+                         " iterator\n");
+        }
+
+        ret = dbpf_build_path_of_handle( dbc_p, op_p->u.v_read.dirent->d_name,
+            op_p->coll_p->coll_id, op_p->u.v_read.dirent->handle );
+        if( ret != 0 )
+        {
+            goto cleanup;
+        }
+    }
+    else
+    {
+        /* no record at the requested position: report the end, not an error.
+         * NOTE(review): the memset sizes below are not bounded by buffer_sz */
+        memset( op_p->u.v_read.key->buffer, 0, pkey.size);
+        op_p->u.v_read.key->buffer_sz = 0;
+        memset( op_p->u.v_read.val->buffer, 0, data.size);
+        op_p->u.v_read.val->buffer_sz = 0;
+        memset( op_p->u.v_read.dirent, 0, sizeof( PVFS_dirent ) );
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
+                     "value: reached end of cursor with no record to give\n");
+        *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+    }
+
+    ret = 1;
+
+cleanup:
+    dbc_p->c_close(dbc_p);
+    free(key_data);
+    free(value_data);
+    return ret;
+}
+
 static int dbpf_keyval_write(TROVE_coll_id coll_id,
                              TROVE_handle handle,
                              TROVE_keyval_s *key_p,
@@ -2111,6 +2334,161 @@ static int dbpf_keyval_handle_info_ops(s
     return 0;
 }
 
+/*
+ * Builds the path of 'handle' by walking the keyval secondary index from
+ * the handle up towards the collection's root handle, prepending
+ * "/<entry name>" to 'path' for every entry found whose key is not "de".
+ *
+ * dbc_p: open cursor on the secondary index; it is repositioned here.
+ * path:  NUL-terminated string that is extended in place.
+ *        NOTE(review): no capacity is passed in, so the caller must supply
+ *        a buffer large enough for the full path -- confirm against caller.
+ *
+ * Returns 0 on success, or a negative TROVE error code if a lookup fails.
+ */
+static int dbpf_build_path_of_handle( DBC *dbc_p,
+    char *path,
+    TROVE_coll_id coll_id,
+    TROVE_handle handle)
+{
+    int ret = 0;
+    size_t name_len;
+    DBT key, data, pkey;
+    TROVE_handle root_h=0, value_h=0, key_h=0;
+    struct dbpf_keyval_db_entry key_entry;
+
+    PINT_cached_config_get_root_handle(coll_id, &root_h);
+    key_h = handle;
+
+    memset(&key, 0, sizeof(DBT));
+    memset(&data, 0, sizeof(DBT));
+    memset(&pkey, 0, sizeof(DBT));
+    memset(&key_entry, 0, sizeof( struct dbpf_keyval_db_entry ));
+
+    pkey.data = &(key_entry);
+    pkey.ulen = pkey.size = sizeof( struct dbpf_keyval_db_entry );
+
+    key.data = &(key_h);
+    key.ulen = key.size  = sizeof(TROVE_handle);
+
+    data.data = &(value_h);
+    data.ulen = data.size = sizeof(TROVE_handle);
+
+    data.flags = key.flags = pkey.flags = DB_DBT_USERMEM;
+
+    while( ret == 0 )
+    {
+        ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_SET);
+        if( ret == 0 )
+        {
+            /* the next lookup is keyed by the handle just found */
+            memcpy(key.data, &(key_entry.handle), sizeof(TROVE_handle));
+            key.ulen = key.size = (sizeof(TROVE_handle));
+
+            /* entries keyed "de" add no component to the path */
+            if( strncmp("de", key_entry.key, 3) != 0 )
+            {
+                /* prepend "/<name>" in place; memmove because the old and
+                 * new positions of the existing path overlap */
+                name_len = strlen(key_entry.key);
+                memmove(path + name_len + 1, path, strlen(path) + 1);
+                path[0] = '/';
+                memcpy(path + 1, key_entry.key, name_len);
+            }
+
+            /* if the root handle has been reached, break */
+            if( key_entry.handle == root_h )
+            {
+                gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                             "[KEYVAL]: Built path (%llu): %s\n",
+                             llu(key_entry.handle), path);
+                break;
+            }
+        }
+        else
+        {
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                         "[KEYVAL]: Failed finding parent handle for: "
+                         "%llu: ulen: %d, size: %d, %s\n",
+                         llu( key_entry.handle), data.ulen, data.size,
+                         db_strerror(ret));
+            ret = -dbpf_db_error_to_trove_error(ret);
+            return ret;
+        }
+    }
+    return 0;
+}
+
+/*
+ * Key extractor passed to DB->associate() for the keyval secondary index.
+ * "user." attributes are indexed as <attribute><value>; other entries with
+ * a handle-sized value and a key other than "dh" are indexed by that value
+ * alone; everything else returns DB_DONOTINDEX.  skey->data is malloc'd
+ * here and flagged DB_DBT_APPMALLOC.
+ * Returns 0 on success, TROVE_ENOMEM if the key cannot be allocated.
+ */
+int PINT_trove_dbpf_keyval_secondary_callback(
+    DB *secondary, const DBT *pkey, const DBT *pdata, DBT *skey)
+{
+    TROVE_handle h;
+    struct dbpf_keyval_db_entry *k;
+    char *key_data;
+    size_t attr_len;
+
+    memset( skey, 0, sizeof(DBT));
+    k = (struct dbpf_keyval_db_entry *)pkey->data;
+
+    /* strncmp stops at the NUL of a key shorter than "user." */
+    if( (strncmp(k->key, "user.", 5) == 0) )
+    {
+        /* size of new key is length of the attribute plus length of value */
+        attr_len = strlen(k->key);
+        if( (key_data = malloc(attr_len + pdata->size)) == NULL )
+        {
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                         "[DBPF KEYVAL]: malloc for secondary_callback "
+                         "for new attribute/value key failed.\n");
+            return TROVE_ENOMEM;
+        }
+        /* attribute first, value directly after it */
+        memcpy(key_data, k->key, attr_len);
+        memcpy(key_data + attr_len, pdata->data, pdata->size);
+        skey->ulen = skey->size = attr_len + pdata->size;
+
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: CREATING SECONDARY "
+                     "INDEX (%s) (%d) -> (%s) (%d)\n",
+                     (char *)key_data, skey->size,
+                     (char *)pdata->data, pdata->size);
+    }
+    else if((pdata->size == sizeof(TROVE_handle)) && (strcmp("dh", k->key)!=0))
+    {
+        /* value is handle-sized and the attribute is not "dh": index the
+         * entry under the value (handle) alone */
+        if( (key_data = malloc(pdata->size) ) == NULL )
+        {
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                         "[DBPF KEYVAL]: malloc for secondary_callback "
+                         "failed when creating handle key.\n");
+            return TROVE_ENOMEM;
+        }
+        memcpy(key_data, pdata->data, pdata->size );
+        memcpy(&h, pdata->data, pdata->size);
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: CREATING SECONDARY "
+                     "INDEX (%llu) (%d) -> (%llu) (%d)\n",
+                     llu(h), pdata->size, llu(h), pdata->size);
+        skey->ulen = skey->size = pdata->size;
+    }
+    else
+    {
+        return DB_DONOTINDEX;
+    }
+
+    skey->data = key_data;
+    skey->flags = DB_DBT_APPMALLOC;
+    return 0;
+}
+
+
 int PINT_trove_dbpf_keyval_compare(
     DB * dbp, const DBT * a, const DBT * b)
 {
@@ -2143,6 +2521,7 @@ int PINT_trove_dbpf_keyval_compare(
 struct TROVE_keyval_ops dbpf_keyval_ops =
 {
     dbpf_keyval_read,
+    dbpf_keyval_read_value,
     dbpf_keyval_write,
     dbpf_keyval_remove,
     dbpf_keyval_remove_list,

Index: dbpf-mgmt.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-mgmt.c,v
diff -p -u -r1.109 -r1.109.24.1
--- dbpf-mgmt.c	29 Jan 2009 22:00:03 -0000	1.109
+++ dbpf-mgmt.c	14 Jul 2009 17:19:42 -0000	1.109.24.1
@@ -298,6 +298,11 @@ static int dbpf_db_create(const char *st
 static DB *dbpf_db_open(
     const char *sto_path, char *dbname, DB_ENV *envp, int *err_p,
     int (*compare_fn) (DB *db, const DBT *dbt1, const DBT *dbt2), uint32_t flags);
+
+static int dbpf_db_associate( DB *p, DB *s,
+    int (*callback) (DB *db, const DBT *k, const DBT *v, DBT *r),
+    uint32_t flags);
+
 static int dbpf_mkpath(char *pathname, mode_t mode);
 
 
@@ -1134,6 +1139,25 @@ int dbpf_collection_create(char *collnam
         }
     }
 
+    DBPF_GET_KEYVAL_SECONDARY_DBNAME(path_name, PATH_MAX, sto_p->name, 
+        new_coll_id);
+    ret = stat(path_name, &dbstat);
+    if(ret < 0 && errno != ENOENT)
+    {
+        gossip_err("failed to stat keyval_secondary db: %s\n", path_name);
+        return -trove_errno_to_trove_error(errno);
+    }
+    if(ret < 0)
+    {
+        ret = dbpf_db_create(sto_p->name, path_name, NULL, (DB_DUP|DB_DUPSORT));
+        if (ret != 0)
+        {
+            gossip_err("dbpf_db_create failed on %s\n", path_name);
+            return ret;
+        }
+    }
+
+
     DBPF_GET_BSTREAM_DIRNAME(path_name, PATH_MAX, sto_p->name, new_coll_id);
     ret = mkdir(path_name, 0755);
     if(ret != 0)
@@ -1229,6 +1253,7 @@ int dbpf_collection_remove(char *collnam
         /* Clean up properly by closing all db handles */
         db_close(db_collection->coll_attr_db);
         db_close(db_collection->ds_db);
+        db_close(db_collection->keyval_secondary_db);
         db_close(db_collection->keyval_db);
         /* so that environment can also be cleaned up */
         dbpf_putdb_env(db_collection->coll_env, db_collection->path_name);
@@ -1255,6 +1280,14 @@ int dbpf_collection_remove(char *collnam
         ret = -trove_errno_to_trove_error(errno);
     }
 
+    DBPF_GET_KEYVAL_SECONDARY_DBNAME(path_name, PATH_MAX,
+                                     sto_p->name, db_data.coll_id);
+    if(unlink(path_name) != 0)
+    {
+        gossip_err("failure removing keyval secondary db\n");
+        ret = -trove_errno_to_trove_error(errno);
+    }
+
     DBPF_GET_COLL_ATTRIB_DBNAME(path_name, PATH_MAX,
                                 sto_p->name, db_data.coll_id);
     if (unlink(path_name) != 0)
@@ -1567,6 +1600,18 @@ int dbpf_collection_clear(TROVE_coll_id 
         gossip_lerr("db_close(coll_ds_db): %s\n", db_strerror(ret));
     }
 
+    if ((ret = coll_p->keyval_secondary_db->sync(coll_p->keyval_secondary_db, 
+        0)) != 0)
+    {
+        gossip_err("db_sync(coll_keyval_secondary_db): %s\n", db_strerror(ret));
+    }
+
+    if ((ret = db_close(coll_p->keyval_secondary_db)) != 0) 
+    {
+        gossip_lerr("db_close(coll_keyval_secondary_db): %s\n", 
+            db_strerror(ret));
+    }
+
     if ((ret = coll_p->keyval_db->sync(coll_p->keyval_db, 0)) != 0)
     {
         gossip_err("db_sync(coll_keyval_db): %s\n", db_strerror(ret));
@@ -1624,6 +1669,7 @@ int dbpf_collection_lookup(char *collnam
     char path_name[PATH_MAX];
     char trove_dbpf_version[32] = {0};
     int sto_major, sto_minor, sto_inc, major, minor, inc;
+    struct stat dbstat;
 
     gossip_debug(GOSSIP_TROVE_DEBUG, "dbpf_collection_lookup of coll: %s\n", 
                  collname);
@@ -1821,8 +1867,7 @@ int dbpf_collection_lookup(char *collnam
         return ret;
     }
 
-    DBPF_GET_KEYVAL_DBNAME(path_name, PATH_MAX,
-                           sto_p->name, coll_p->coll_id);
+    DBPF_GET_KEYVAL_DBNAME(path_name, PATH_MAX, sto_p->name, coll_p->coll_id);
     coll_p->keyval_db = dbpf_db_open(sto_p->name, path_name, coll_p->coll_env,
                                      &ret, PINT_trove_dbpf_keyval_compare, 0);
     if(coll_p->keyval_db == NULL)
@@ -1849,6 +1894,60 @@ int dbpf_collection_lookup(char *collnam
         return -TROVE_ENOMEM;
     }
 
+    DBPF_GET_KEYVAL_SECONDARY_DBNAME(path_name, PATH_MAX,
+                           sto_p->name, coll_p->coll_id);
+    /* if secondary index doesn't exist, just re-create it, don't error out */
+    ret = stat(path_name, &dbstat);
+    if(ret < 0 && errno != ENOENT)
+    {
+        gossip_err("failed to stat keyval_secondary db: %s\n", path_name);
+        return -trove_errno_to_trove_error(errno);
+    }
+    if(ret < 0)
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: Recreating secondary "
+                     "index.\n");
+        ret = dbpf_db_create(sto_p->name, path_name, NULL, (DB_DUP|DB_DUPSORT));
+        if (ret != 0)
+        {
+            gossip_err("dbpf_db_create failed on %s\n", path_name);
+            return ret;
+        }
+    }
+
+    /* secondary database file already exists, try to open */
+    coll_p->keyval_secondary_db = dbpf_db_open(sto_p->name, path_name, 
+        coll_p->coll_env, &ret, NULL, (DB_DUP|DB_DUPSORT) );
+    /* TODO: add check to ensure BDB thinks secondary index is consistent */
+    if(coll_p->keyval_secondary_db == NULL)
+    {
+        db_close(coll_p->keyval_db);
+        db_close(coll_p->coll_attr_db);
+        db_close(coll_p->ds_db);
+        dbpf_putdb_env(coll_p->coll_env, coll_p->path_name);
+        free(coll_p->path_name);
+        free(coll_p->name);
+        free(coll_p);
+        return ret;
+    }
+
+    /* associate the secondary index db with the primary and generate keys
+     * if it's empty */
+    ret = dbpf_db_associate(coll_p->keyval_db, coll_p->keyval_secondary_db, 
+          PINT_trove_dbpf_keyval_secondary_callback,DB_CREATE);
+    if( ret != 0 )
+    {
+        db_close(coll_p->keyval_secondary_db);
+        db_close(coll_p->keyval_db);
+        db_close(coll_p->coll_attr_db);
+        db_close(coll_p->ds_db);
+        dbpf_putdb_env(coll_p->coll_env, coll_p->path_name);
+        free(coll_p->path_name);
+        free(coll_p->name);
+        free(coll_p);
+        return ret;
+    }
+
     coll_p->next_p = NULL;
 
     /*
@@ -2168,6 +2267,33 @@ static DB *dbpf_db_open(
     return db_p;
 }
 
+
+/* dbpf_db_associate()
+ *
+ * Internal helper that attaches database 's' to primary database 'p' as a
+ * secondary index.  'callback' (which builds the secondary key from a
+ * primary key/data pair) and 'flags' are handed to the primary's
+ * associate() method unchanged.
+ *
+ * Returns 0 on success; otherwise the db error is translated with
+ * dbpf_db_error_to_trove_error() and returned negated.
+ */
+static int dbpf_db_associate( DB *p, DB *s,
+    int (*callback) (DB *db, const DBT *k, const DBT *d, DBT *r),
+    uint32_t flags)
+{
+    int db_ret;
+
+    /* NULL txnid only when HAVE_TXNID_PARAMETER_TO_DB_OPEN is defined */
+#ifdef HAVE_TXNID_PARAMETER_TO_DB_OPEN
+    db_ret = p->associate(p, NULL, s, callback, flags);
+#else
+    db_ret = p->associate(p, s, callback, flags);
+#endif
+
+    return (db_ret == 0) ? 0 : -dbpf_db_error_to_trove_error(db_ret);
+}
+
 static void dbpf_db_error_callback(
 #ifdef HAVE_DBENV_PARAMETER_TO_DB_ERROR_CALLBACK
     const DB_ENV *dbenv, 
@@ -2250,6 +2376,7 @@ static __dbpf_op_type_str_map_t s_dbpf_o
     { KEYVAL_WRITE_LIST, "KEYVAL_WRITE_LIST" },
     { KEYVAL_FLUSH, "KEYVAL_FLUSH" },
     { KEYVAL_GET_HANDLE_INFO, "KEYVAL_GET_HANDLE_INFO" },
+    { KEYVAL_READ_VALUE, "KEYVAL_READ_VALUE" },
     { DSPACE_CREATE, "DSPACE_CREATE" },
     { DSPACE_REMOVE, "DSPACE_REMOVE" },
     { DSPACE_ITERATE_HANDLES, "DSPACE_ITERATE_HANDLES" },

Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.96 -r1.96.24.1
--- dbpf.h	29 Jan 2009 23:39:52 -0000	1.96
+++ dbpf.h	14 Jul 2009 17:19:42 -0000	1.96.24.1
@@ -142,6 +142,13 @@ do {                                    
            KEYVAL_DBNAME);                                               \
 } while (0)
 
+/* Secondary keyval index: the db file name, and a helper that formats
+ * "/<__stoname>/<__collid as %08x>/<file name>" into __buf. */
+#define KEYVAL_SECONDARY_DBNAME "keyval_secondary.db"
+#define DBPF_GET_KEYVAL_SECONDARY_DBNAME(__buf,__path_max,__stoname,__collid) \
+    do { snprintf(__buf, __path_max, "/%s/%08x/%s", __stoname, __collid,      \
+                  KEYVAL_SECONDARY_DBNAME); } while (0)
+
 inline int dbpf_pread(int fd, void *buf, size_t count, off_t offset);
 inline int dbpf_pwrite(int fd, const void *buf, size_t count, off_t offset);
 
@@ -210,6 +217,7 @@ struct dbpf_collection
     DB *coll_attr_db;
     DB *ds_db;
     DB *keyval_db;
+    DB *keyval_secondary_db;
     DB_ENV *coll_env;
     TROVE_coll_id coll_id;
     TROVE_handle root_dir_handle;
@@ -243,6 +251,8 @@ struct dbpf_collection_db_entry
 
 int PINT_trove_dbpf_keyval_compare(
     DB * dbp, const DBT * a, const DBT * b);
+int PINT_trove_dbpf_keyval_secondary_callback(
+    DB *secondary, const DBT *pkey, const DBT *pdata, DBT *skey);
 int PINT_trove_dbpf_ds_attr_compare(
     DB * dbp, const DBT * a, const DBT * b);
 int PINT_trove_dbpf_ds_attr_compare_reversed(
@@ -374,6 +384,15 @@ struct dbpf_keyval_iterate_keys_op
     /* vtag? */
 };
 
+struct dbpf_keyval_read_value_op /* per-op state of KEYVAL_READ_VALUE */
+{
+    PVFS_dirent *dirent;           /* out: handle and d_name of the match */
+    PVFS_ds_keyval *key;           /* in: attribute name, out: primary key */
+    PVFS_ds_keyval *val;           /* in: value to match, out: stored data */
+    TROVE_ds_position *position_p; /* in/out: position among the matches */
+    /* vtag? */
+};
+
 struct dbpf_bstream_resize_op
 {
     TROVE_size size;
@@ -484,6 +503,7 @@ enum dbpf_op_type
     KEYVAL_WRITE_LIST,
     KEYVAL_FLUSH,
     KEYVAL_GET_HANDLE_INFO,
+    KEYVAL_READ_VALUE,
     DSPACE_CREATE = DSPACE_OP_TYPE,
     DSPACE_REMOVE,
     DSPACE_ITERATE_HANDLES,
@@ -565,6 +585,7 @@ struct dbpf_op
         struct dbpf_dspace_getattr_list_op d_getattr_list;
         struct dbpf_dspace_remove_list_op d_remove_list;
         struct dbpf_keyval_get_handle_info_op k_get_handle_info;
+        struct dbpf_keyval_read_value_op v_read;
     } u;
 };
 



More information about the Pvfs2-cvs mailing list