[Pvfs2-cvs] commit by mtmoore in pvfs2/src/io/trove/trove-dbpf:
dbpf-keyval.c dbpf-mgmt.c dbpf.h
CVS commit program
cvs at parl.clemson.edu
Tue Jul 14 13:19:43 EDT 2009
Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv2435/src/io/trove/trove-dbpf
Modified Files:
Tag: Orange-mtmoore
dbpf-keyval.c dbpf-mgmt.c dbpf.h
Log Message:
Initial import of branch supporting keyval attribute/value lookup
Index: dbpf-keyval.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval.c,v
diff -p -u -r1.94 -r1.94.24.1
--- dbpf-keyval.c 30 Jan 2009 15:41:08 -0000 1.94
+++ dbpf-keyval.c 14 Jul 2009 17:19:42 -0000 1.94.24.1
@@ -37,6 +37,8 @@
#include "gossip.h"
#include "pvfs2-internal.h"
#include "pint-perf-counter.h"
+#include "pint-cached-config.h"
+#include "dbpf-keyval.h"
static uint32_t readdir_session = 0;
@@ -56,20 +58,6 @@ extern int synccount;
* likely sizeof(struct dbpf_keyval_db_entry).
*/
-#define DBPF_MAX_KEY_LENGTH PVFS_NAME_MAX
-
-struct dbpf_keyval_db_entry
-{
- TROVE_handle handle;
- char key[DBPF_MAX_KEY_LENGTH];
-};
-
-#define DBPF_KEYVAL_DB_ENTRY_TOTAL_SIZE(_size) \
- (sizeof(TROVE_handle) + _size)
-
-#define DBPF_KEYVAL_DB_ENTRY_KEY_SIZE(_size) \
- (_size - sizeof(TROVE_handle))
-
/**
* The keyval database contains attributes for pvfs2 handles
* (files, directories, symlinks, etc.) that are not considered
@@ -115,6 +103,7 @@ static int dbpf_keyval_do_remove(
DB *db_p, TROVE_handle handle, TROVE_keyval_s *key, TROVE_keyval_s *val);
static int dbpf_keyval_read_op_svc(struct dbpf_op *op_p);
+static int dbpf_keyval_read_value_op_svc(struct dbpf_op *op_p);
static int dbpf_keyval_read_list_op_svc(struct dbpf_op *op_p);
static int dbpf_keyval_write_op_svc(struct dbpf_op *op_p);
static int dbpf_keyval_write_list_op_svc(struct dbpf_op *op_p);
@@ -157,6 +146,9 @@ enum dbpf_handle_info_action
static int dbpf_keyval_handle_info_ops(struct dbpf_op * op_p,
enum dbpf_handle_info_action action);
+static int dbpf_build_path_of_handle(DBC *dbc_p, char *path,
+ TROVE_coll_id coll_id,
+ TROVE_handle handle);
static int dbpf_keyval_read(TROVE_coll_id coll_id,
TROVE_handle handle,
@@ -328,6 +320,237 @@ return_error:
return ret;
}
+static int dbpf_keyval_read_value(TROVE_coll_id coll_id,
+ TROVE_ds_position *position_p,
+ PVFS_dirent *dirent_p,
+ TROVE_keyval_s *key_p,
+ TROVE_keyval_s *val_p,
+ TROVE_ds_flags flags,
+ TROVE_vtag_s *vtag,
+ void *user_ptr,
+ TROVE_context_id context_id,
+ TROVE_op_id *out_op_id_p,
+ PVFS_hint hints)
+{
+ int ret;
+ dbpf_queued_op_t *q_op_p = NULL;
+ struct dbpf_op op;
+ struct dbpf_op *op_p;
+ struct dbpf_collection *coll_p = NULL;
+ PINT_event_id event_id = 0;
+ PINT_event_type event_type;
+
+ coll_p = dbpf_collection_find_registered(coll_id);
+ if (coll_p == NULL)
+ {
+ return -TROVE_EINVAL;
+ }
+
+ ret = dbpf_op_init_queued_or_immediate(
+ &op, &q_op_p,
+ KEYVAL_READ_VALUE,
+ coll_p,
+ dirent_p->handle,
+ dbpf_keyval_read_value_op_svc,
+ flags,
+ NULL,
+ user_ptr,
+ context_id,
+ &op_p);
+ if(ret < 0)
+ {
+ return ret;
+ }
+
+ event_type = trove_dbpf_keyval_read_event_id;
+ DBPF_EVENT_START(coll_p, q_op_p, event_type, &event_id,
+ PINT_HINT_GET_CLIENT_ID(hints),
+ PINT_HINT_GET_REQUEST_ID(hints),
+ PINT_HINT_GET_RANK(hints),
+ dirent->handle,
+ PINT_HINT_GET_OP_ID(hints));
+
+ /* initialize the op-specific members */
+ op_p->u.v_read.dirent = dirent_p;
+ op_p->u.v_read.key = key_p;
+ op_p->u.v_read.val = val_p;
+ op_p->u.v_read.position_p = position_p;
+ op_p->hints = hints;
+
+ return dbpf_queue_or_service(op_p, q_op_p, coll_p, out_op_id_p,
+ event_type, event_id);
+}
+
+static int dbpf_keyval_read_value_op_svc(struct dbpf_op *op_p)
+{
+ int ret = -TROVE_EINVAL, key_size=0;
+ struct dbpf_keyval_db_entry key_entry;
+ void *key_data, *value_data;
+ TROVE_ds_position local_p = TROVE_ITERATE_START;
+ DBT key, data, pkey;
+ DBC *dbc_p;
+
+ memset(&key, 0, sizeof(key));
+ memset(&data, 0, sizeof(data));
+ memset(&pkey, 0, sizeof(pkey));
+ memset(&key_entry, 0, sizeof(key_entry));
+
+ /* size of key to lookup is length of the key and the value */
+ key_size = strlen(op_p->u.v_read.key->buffer) + 1 +
+ op_p->u.v_read.val->buffer_sz;
+ if( (key_data = malloc( key_size )) == 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: malloc for "
+ "key_data failed.\n");
+ return -TROVE_ENOMEM;
+ }
+ memset(key_data, 0, key_size );
+ memcpy(key_data, op_p->u.v_read.key->buffer,
+ strlen(op_p->u.v_read.key->buffer));
+ memcpy((key_data+strlen(op_p->u.v_read.key->buffer)),
+ op_p->u.v_read.val->buffer, op_p->u.v_read.val->buffer_sz);
+
+ if( (value_data = malloc(op_p->u.v_read.val->buffer_sz)) == 0)
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: malloc for "
+ " value_data failed.\n");
+ free(key_data);
+ return -TROVE_ENOMEM;
+
+ }
+ memset(value_data, 0, op_p->u.v_read.val->buffer_sz );
+
+ key.data = key_data;
+ key.ulen = key.size = strlen(key_data)+1;
+
+ data.data = value_data;
+ data.size = data.ulen = op_p->u.v_read.val->buffer_sz;
+
+ pkey.data = &key_entry;
+ pkey.size = pkey.ulen = sizeof( struct dbpf_keyval_db_entry );
+ key.flags = data.flags = pkey.flags = DB_DBT_USERMEM;
+
+ /* duplicates in secondary index require use of cursor */
+ if( (op_p->coll_p->keyval_secondary_db->cursor(
+ op_p->coll_p->keyval_secondary_db, NULL, &dbc_p, 0)) != 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[KEYVAL]: Error getting cursor for keyval_secondary "
+ "key=%*s: %s\n", op_p->u.v_read.key->buffer_sz,
+ (char *)op_p->u.v_read.key->buffer, db_strerror(ret));
+ free(key_data);
+ free(value_data);
+ return TROVE_EFAULT;
+ }
+
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[KEYVAL]: Doing pget with: %s/(%d)(%d), "
+ "pkey (%d)(%d), data %s/(%d)(%d), initial position: %llu\n",
+ (char *)key.data, key.ulen, key.size, pkey.ulen, pkey.size,
+ (char *)data.data, data.ulen, data.size,
+ llu(*op_p->u.v_read.position_p));
+
+ ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_SET);
+ if( ret == DB_NOTFOUND ) /* no key at all */
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
+ "value: No matching keys found in secondary index\n");
+ (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
+ ret = -dbpf_db_error_to_trove_error(ret);
+ goto return_error;
+ }
+ else if( (*op_p->u.v_read.position_p) != TROVE_ITERATE_START )
+ {
+ local_p = 0;
+ while( (ret == 0) && (local_p < (*op_p->u.v_read.position_p)) )
+ {
+ ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_NEXT_DUP);
+ local_p++;
+ }
+ }
+
+ if( (ret != 0) && (ret != DB_NOTFOUND) ) /* failure other than not found */
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
+ "value: pget error in secondary index: %s\n",
+ db_strerror(ret));
+ (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
+ ret = -dbpf_db_error_to_trove_error(ret);
+ goto return_error;
+ }
+ else if( ((local_p) == (*op_p->u.v_read.position_p)) && ret == 0 )
+ {
+ /* got record for requested position, also handles first record */
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[KEYVAL]: dbpf_keyval_read_value: Found key %llu/%s -> "
+ "%s\n", llu(key_entry.handle), key_entry.key,
+ (char *)data.data);
+
+ /* the buffers that values are copied to must be big enough, passed in
+ * pointers have buffers set to max allowable size. */
+ assert(op_p->u.v_read.key->buffer_sz >= pkey.size);
+ assert(op_p->u.v_read.val->buffer_sz >= data.size);
+
+ memcpy(op_p->u.v_read.key->buffer, pkey.data, pkey.size);
+ op_p->u.v_read.key->read_sz = pkey.size;
+ memcpy(op_p->u.v_read.val->buffer, data.data, data.size);
+ op_p->u.v_read.val->read_sz = data.size;
+ op_p->u.v_read.dirent->handle = key_entry.handle;
+
+ if( (*op_p->u.v_read.position_p) == TROVE_ITERATE_START )
+ {
+ (*op_p->u.v_read.position_p) = 1;
+ }
+ else
+ {
+ (*op_p->u.v_read.position_p)++;
+ }
+
+ /* check if another key exists to prevent an additional call to find
+ * the end. if the cursor ever stays open we'll need to return
+ * current above, not next */
+ ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_NEXT_DUP);
+ if( ret == DB_NOTFOUND )
+ {
+ (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: pre-empting end"
+ " iterator\n");
+ }
+
+ ret = dbpf_build_path_of_handle( dbc_p, op_p->u.v_read.dirent->d_name,
+ op_p->coll_p->coll_id, op_p->u.v_read.dirent->handle );
+
+ if( ret != 0 )
+ {
+ goto return_error;
+ }
+ }
+ else
+ {
+ /* didn't find the record we wanted, but not for first position
+ * (handle above) so don't return error, just set the end marker */
+ memset( op_p->u.v_read.key->buffer, 0, pkey.size);
+ op_p->u.v_read.key->buffer_sz = 0;
+ memset( op_p->u.v_read.val->buffer, 0, data.size);
+ op_p->u.v_read.val->buffer_sz = 0;
+ memset( op_p->u.v_read.dirent, 0, sizeof( PVFS_dirent ) );
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
+ "value: reached end of cursor with no record to give\n");
+ *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+ }
+
+ dbc_p->c_close(dbc_p);
+ free(key_data);
+ free(value_data);
+ return 1;
+
+return_error:
+ dbc_p->c_close(dbc_p);
+ free(key_data);
+ free(value_data);
+ return ret;
+}
+
static int dbpf_keyval_write(TROVE_coll_id coll_id,
TROVE_handle handle,
TROVE_keyval_s *key_p,
@@ -2111,6 +2334,161 @@ static int dbpf_keyval_handle_info_ops(s
return 0;
}
+static int dbpf_build_path_of_handle( DBC *dbc_p,
+ char *path,
+ TROVE_coll_id coll_id,
+ TROVE_handle handle)
+{
+ char *tmp_path = NULL;
+ int ret = 0, path_len = 0;
+ DBT key, data, pkey;
+ TROVE_handle root_h=0, value_h=0, key_h=0;
+ struct dbpf_keyval_db_entry key_entry;
+
+ PINT_cached_config_get_root_handle(coll_id, &root_h);
+ key_h = handle;
+
+ memset(&key, 0, sizeof(DBT));
+ memset(&data, 0, sizeof(DBT));
+ memset(&pkey, 0, sizeof(DBT));
+ memset(&key_entry, 0, sizeof( struct dbpf_keyval_db_entry ));
+
+ pkey.data = &(key_entry);
+ pkey.ulen = pkey.size = sizeof( struct dbpf_keyval_db_entry );
+
+ key.data = &(key_h);
+ key.ulen = key.size = sizeof(TROVE_handle);
+
+ data.data = &(value_h);
+ data.ulen = data.size = sizeof(TROVE_handle);
+
+ data.flags = key.flags = pkey.flags = DB_DBT_USERMEM;
+
+ while( ret == 0 )
+ {
+ ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_SET);
+ if( ret == 0 )
+ {
+ memcpy(key.data, &(key_entry.handle), sizeof(TROVE_handle));
+ key.ulen = key.size = (sizeof(TROVE_handle));
+
+ if( strncmp("de", key_entry.key, 3) != 0 )
+ {
+ /* set next query for handle just looked up */
+ path_len = strlen(path) + strlen(key_entry.key) + 2;
+ tmp_path = malloc( path_len );
+ if( tmp_path == 0 )
+ {
+ ret = -TROVE_ENOMEM;
+ return ret;
+ }
+ memset(tmp_path, 0, path_len);
+
+ /* copy / and key, prefix to existing path */
+ strncpy( tmp_path, "/", 2 );
+ strncpy( (tmp_path+sizeof(char)), key_entry.key,
+ strlen(key_entry.key)+1);
+ strncpy( (tmp_path+strlen(key_entry.key)+1), path,
+ strlen(path));
+
+ memset(path, 0, path_len+1);
+ strncpy(path, tmp_path, path_len+1);
+ free(tmp_path);
+ }
+
+ /* if the root handle has been reached, break */
+ if( key_entry.handle == root_h )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[KEYVAL]: Built path (%llu): %s\n",
+ llu(key_entry.handle), path);
+ break;
+ }
+ }
+ else
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[KEYVAL]: Failed finding parent handle for: "
+ "%llu: ulen: %d, size: %d, %s\n",
+ llu( key_entry.handle), data.ulen, data.size,
+ db_strerror(ret));
+ ret = -dbpf_db_error_to_trove_error(ret);
+ return ret;
+ }
+ }
+ return 0;
+}
+
+ /* constructs secondary key for keyval_secondary db. the value of the
+ * primary data is returned. */
+int PINT_trove_dbpf_keyval_secondary_callback(
+ DB *secondary, const DBT *pkey, const DBT *pdata, DBT *skey)
+{
+ TROVE_handle h;
+ struct dbpf_keyval_db_entry *k;
+ void *key_data;
+
+ memset( skey, 0, sizeof(DBT));
+ k = (struct dbpf_keyval_db_entry *)pkey->data;
+
+ /* for attributes prefixed with user create a secondary key of the form
+ * <attribute><value> */
+ if( (memcmp(k->key, "user.", 5) == 0) )
+ {
+ /* size of new key is length of the attribute plus length of value */
+ if( (key_data = malloc(strlen(k->key) + (pdata->size)) ) == 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: malloc for secondary_callback "
+ "for new attribute/value key failed.\n");
+ return TROVE_ENOMEM;
+ }
+ memset(key_data, 0, (strlen(k->key) + (pdata->size)));
+
+ /* copy attribute to start of key */
+ memcpy(key_data, k->key, strlen(k->key) );
+
+ /* copy value directly after key */
+ memcpy((key_data + strlen(k->key)), pdata->data, pdata->size);
+ skey->ulen = skey->size = strlen(k->key) + pdata->size;
+
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: CREATING SECONDARY "
+ "INDEX (%s) (%d) -> (%s) (%d)\n",
+ (char *)key_data, skey->size,
+ (char *)pdata->data, pdata->size);
+ }
+ else if((pdata->size == sizeof(TROVE_handle)) && (strcmp("dh", k->key)!=0))
+ {
+ /* for items with a value size of 8 and attribute not "dh"
+ * (an initial pass at only adding<dir handle><filename> -> <handle> )
+ * create a key of only the value (handle) */
+ if( (key_data = malloc(pdata->size) ) == 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: malloc for secondary_callback "
+ "failed when creating handle key.\n");
+ return TROVE_ENOMEM;
+ }
+ memset(key_data, 0, (pdata->size));
+ memcpy(key_data, pdata->data, pdata->size );
+ memcpy(&h, pdata->data, pdata->size);
+ /* copy attribute to start of key */
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: CREATING SECONDARY "
+ "INDEX (%llu) (%d) -> (%llu) (%d)\n",
+ llu(h), pdata->size, llu(h), pdata->size);
+ skey->ulen = skey->size = pdata->size;
+ }
+ else
+ {
+ return DB_DONOTINDEX;
+ }
+
+ skey->data = key_data;
+ skey->flags = DB_DBT_APPMALLOC;
+ return 0;
+}
+
+
int PINT_trove_dbpf_keyval_compare(
DB * dbp, const DBT * a, const DBT * b)
{
@@ -2143,6 +2521,7 @@ int PINT_trove_dbpf_keyval_compare(
struct TROVE_keyval_ops dbpf_keyval_ops =
{
dbpf_keyval_read,
+ dbpf_keyval_read_value,
dbpf_keyval_write,
dbpf_keyval_remove,
dbpf_keyval_remove_list,
Index: dbpf-mgmt.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-mgmt.c,v
diff -p -u -r1.109 -r1.109.24.1
--- dbpf-mgmt.c 29 Jan 2009 22:00:03 -0000 1.109
+++ dbpf-mgmt.c 14 Jul 2009 17:19:42 -0000 1.109.24.1
@@ -298,6 +298,11 @@ static int dbpf_db_create(const char *st
static DB *dbpf_db_open(
const char *sto_path, char *dbname, DB_ENV *envp, int *err_p,
int (*compare_fn) (DB *db, const DBT *dbt1, const DBT *dbt2), uint32_t flags);
+
+static int dbpf_db_associate( DB *p, DB *s,
+ int (*callback) (DB *db, const DBT *k, const DBT *v, DBT *r),
+ uint32_t flags);
+
static int dbpf_mkpath(char *pathname, mode_t mode);
@@ -1134,6 +1139,25 @@ int dbpf_collection_create(char *collnam
}
}
+ DBPF_GET_KEYVAL_SECONDARY_DBNAME(path_name, PATH_MAX, sto_p->name,
+ new_coll_id);
+ ret = stat(path_name, &dbstat);
+ if(ret < 0 && errno != ENOENT)
+ {
+ gossip_err("failed to stat keyval_secondary db: %s\n", path_name);
+ return -trove_errno_to_trove_error(errno);
+ }
+ if(ret < 0)
+ {
+ ret = dbpf_db_create(sto_p->name, path_name, NULL, (DB_DUP|DB_DUPSORT));
+ if (ret != 0)
+ {
+ gossip_err("dbpf_db_create failed on %s\n", path_name);
+ return ret;
+ }
+ }
+
+
DBPF_GET_BSTREAM_DIRNAME(path_name, PATH_MAX, sto_p->name, new_coll_id);
ret = mkdir(path_name, 0755);
if(ret != 0)
@@ -1229,6 +1253,7 @@ int dbpf_collection_remove(char *collnam
/* Clean up properly by closing all db handles */
db_close(db_collection->coll_attr_db);
db_close(db_collection->ds_db);
+ db_close(db_collection->keyval_secondary_db);
db_close(db_collection->keyval_db);
/* so that environment can also be cleaned up */
dbpf_putdb_env(db_collection->coll_env, db_collection->path_name);
@@ -1255,6 +1280,14 @@ int dbpf_collection_remove(char *collnam
ret = -trove_errno_to_trove_error(errno);
}
+ DBPF_GET_KEYVAL_SECONDARY_DBNAME(path_name, PATH_MAX,
+ sto_p->name, db_data.coll_id);
+ if(unlink(path_name) != 0)
+ {
+ gossip_err("failure removing keyval secondary db\n");
+ ret = -trove_errno_to_trove_error(errno);
+ }
+
DBPF_GET_COLL_ATTRIB_DBNAME(path_name, PATH_MAX,
sto_p->name, db_data.coll_id);
if (unlink(path_name) != 0)
@@ -1567,6 +1600,18 @@ int dbpf_collection_clear(TROVE_coll_id
gossip_lerr("db_close(coll_ds_db): %s\n", db_strerror(ret));
}
+ if ((ret = coll_p->keyval_secondary_db->sync(coll_p->keyval_secondary_db,
+ 0)) != 0)
+ {
+ gossip_err("db_sync(coll_keyval_secondary_db): %s\n", db_strerror(ret));
+ }
+
+ if ((ret = db_close(coll_p->keyval_secondary_db)) != 0)
+ {
+ gossip_lerr("db_close(coll_keyval_secondary_db): %s\n",
+ db_strerror(ret));
+ }
+
if ((ret = coll_p->keyval_db->sync(coll_p->keyval_db, 0)) != 0)
{
gossip_err("db_sync(coll_keyval_db): %s\n", db_strerror(ret));
@@ -1624,6 +1669,7 @@ int dbpf_collection_lookup(char *collnam
char path_name[PATH_MAX];
char trove_dbpf_version[32] = {0};
int sto_major, sto_minor, sto_inc, major, minor, inc;
+ struct stat dbstat;
gossip_debug(GOSSIP_TROVE_DEBUG, "dbpf_collection_lookup of coll: %s\n",
collname);
@@ -1821,8 +1867,7 @@ int dbpf_collection_lookup(char *collnam
return ret;
}
- DBPF_GET_KEYVAL_DBNAME(path_name, PATH_MAX,
- sto_p->name, coll_p->coll_id);
+ DBPF_GET_KEYVAL_DBNAME(path_name, PATH_MAX, sto_p->name, coll_p->coll_id);
coll_p->keyval_db = dbpf_db_open(sto_p->name, path_name, coll_p->coll_env,
&ret, PINT_trove_dbpf_keyval_compare, 0);
if(coll_p->keyval_db == NULL)
@@ -1849,6 +1894,60 @@ int dbpf_collection_lookup(char *collnam
return -TROVE_ENOMEM;
}
+ DBPF_GET_KEYVAL_SECONDARY_DBNAME(path_name, PATH_MAX,
+ sto_p->name, coll_p->coll_id);
+ /* if secondary index doesn't exist, just re-create it, don't error out */
+ ret = stat(path_name, &dbstat);
+ if(ret < 0 && errno != ENOENT)
+ {
+ gossip_err("failed to stat keyval_secondary db: %s\n", path_name);
+ return -trove_errno_to_trove_error(errno);
+ }
+ if(ret < 0)
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: Recreating secondary "
+ "index.\n");
+ ret = dbpf_db_create(sto_p->name, path_name, NULL, (DB_DUP|DB_DUPSORT));
+ if (ret != 0)
+ {
+ gossip_err("dbpf_db_create failed on %s\n", path_name);
+ return ret;
+ }
+ }
+
+ /* secondary database file already exists, try to open */
+ coll_p->keyval_secondary_db = dbpf_db_open(sto_p->name, path_name,
+ coll_p->coll_env, &ret, NULL, (DB_DUP|DB_DUPSORT) );
+ /* TODO: add check to ensure BDB thinks secondary index is consistent */
+ if(coll_p->keyval_secondary_db == NULL)
+ {
+ db_close(coll_p->keyval_db);
+ db_close(coll_p->coll_attr_db);
+ db_close(coll_p->ds_db);
+ dbpf_putdb_env(coll_p->coll_env, coll_p->path_name);
+ free(coll_p->path_name);
+ free(coll_p->name);
+ free(coll_p);
+ return ret;
+ }
+
+ /* associate the secondary index db with the primary and generate keys
+ * if it's empty */
+ ret = dbpf_db_associate(coll_p->keyval_db, coll_p->keyval_secondary_db,
+ PINT_trove_dbpf_keyval_secondary_callback,DB_CREATE);
+ if( ret != 0 )
+ {
+ db_close(coll_p->keyval_secondary_db);
+ db_close(coll_p->keyval_db);
+ db_close(coll_p->coll_attr_db);
+ db_close(coll_p->ds_db);
+ dbpf_putdb_env(coll_p->coll_env, coll_p->path_name);
+ free(coll_p->path_name);
+ free(coll_p->name);
+ free(coll_p);
+ return ret;
+ }
+
coll_p->next_p = NULL;
/*
@@ -2168,6 +2267,33 @@ static DB *dbpf_db_open(
return db_p;
}
+
+/* dbpf_db_associate()
+ *
+ * Internal function for associating a database with the primary
+ * database to function as a secondary index.
+ *
+ * Returns 0 on success, negative value otherwise
+ */
+static int dbpf_db_associate( DB *p, DB *s,
+ int (*callback) (DB *db, const DBT *k, const DBT *d, DBT *r),
+ uint32_t flags)
+{
+ int ret = -TROVE_EINVAL;
+
+ ret = p->associate( p,
+#ifdef HAVE_TXNID_PARAMETER_TO_DB_OPEN
+ NULL,
+#endif
+ s,
+ callback,
+ flags );
+ if( ret != 0 )
+ return -dbpf_db_error_to_trove_error(ret);
+
+ return 0;
+}
+
static void dbpf_db_error_callback(
#ifdef HAVE_DBENV_PARAMETER_TO_DB_ERROR_CALLBACK
const DB_ENV *dbenv,
@@ -2250,6 +2376,7 @@ static __dbpf_op_type_str_map_t s_dbpf_o
{ KEYVAL_WRITE_LIST, "KEYVAL_WRITE_LIST" },
{ KEYVAL_FLUSH, "KEYVAL_FLUSH" },
{ KEYVAL_GET_HANDLE_INFO, "KEYVAL_GET_HANDLE_INFO" },
+ { KEYVAL_READ_VALUE, "KEYVAL_READ_VALUE" },
{ DSPACE_CREATE, "DSPACE_CREATE" },
{ DSPACE_REMOVE, "DSPACE_REMOVE" },
{ DSPACE_ITERATE_HANDLES, "DSPACE_ITERATE_HANDLES" },
Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.96 -r1.96.24.1
--- dbpf.h 29 Jan 2009 23:39:52 -0000 1.96
+++ dbpf.h 14 Jul 2009 17:19:42 -0000 1.96.24.1
@@ -142,6 +142,13 @@ do {
KEYVAL_DBNAME); \
} while (0)
+#define KEYVAL_SECONDARY_DBNAME "keyval_secondary.db"
+#define DBPF_GET_KEYVAL_SECONDARY_DBNAME(__buf,__path_max,__stoname,__collid) \
+do { \
+ snprintf(__buf, __path_max, "/%s/%08x/%s", __stoname, __collid, \
+ KEYVAL_SECONDARY_DBNAME); \
+} while (0)
+
inline int dbpf_pread(int fd, void *buf, size_t count, off_t offset);
inline int dbpf_pwrite(int fd, const void *buf, size_t count, off_t offset);
@@ -210,6 +217,7 @@ struct dbpf_collection
DB *coll_attr_db;
DB *ds_db;
DB *keyval_db;
+ DB *keyval_secondary_db;
DB_ENV *coll_env;
TROVE_coll_id coll_id;
TROVE_handle root_dir_handle;
@@ -243,6 +251,8 @@ struct dbpf_collection_db_entry
int PINT_trove_dbpf_keyval_compare(
DB * dbp, const DBT * a, const DBT * b);
+int PINT_trove_dbpf_keyval_secondary_callback(
+ DB *secondary, const DBT *pkey, const DBT *pdata, DBT *skey);
int PINT_trove_dbpf_ds_attr_compare(
DB * dbp, const DBT * a, const DBT * b);
int PINT_trove_dbpf_ds_attr_compare_reversed(
@@ -374,6 +384,15 @@ struct dbpf_keyval_iterate_keys_op
/* vtag? */
};
+struct dbpf_keyval_read_value_op
+{
+ PVFS_dirent *dirent;
+ PVFS_ds_keyval *key;
+ PVFS_ds_keyval *val;
+ TROVE_ds_position *position_p;
+ /* vtag? */
+};
+
struct dbpf_bstream_resize_op
{
TROVE_size size;
@@ -484,6 +503,7 @@ enum dbpf_op_type
KEYVAL_WRITE_LIST,
KEYVAL_FLUSH,
KEYVAL_GET_HANDLE_INFO,
+ KEYVAL_READ_VALUE,
DSPACE_CREATE = DSPACE_OP_TYPE,
DSPACE_REMOVE,
DSPACE_ITERATE_HANDLES,
@@ -565,6 +585,7 @@ struct dbpf_op
struct dbpf_dspace_getattr_list_op d_getattr_list;
struct dbpf_dspace_remove_list_op d_remove_list;
struct dbpf_keyval_get_handle_info_op k_get_handle_info;
+ struct dbpf_keyval_read_value_op v_read;
} u;
};
More information about the Pvfs2-cvs
mailing list