[Pvfs2-cvs] commit by mtmoore in pvfs2/src/io/trove/trove-dbpf:
dbpf-keyval.c dbpf-mgmt.c dbpf.h
CVS commit program
cvs at parl.clemson.edu
Mon Aug 10 11:30:10 EDT 2009
Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv10659/pvfs2/src/io/trove/trove-dbpf
Modified Files:
Tag: Orange-mtmoore
dbpf-keyval.c dbpf-mgmt.c dbpf.h
Log Message:
Merge range query changes and other fixes
Index: dbpf-keyval.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval.c,v
diff -p -u -r1.94.24.1 -r1.94.24.2
--- dbpf-keyval.c 14 Jul 2009 17:19:42 -0000 1.94.24.1
+++ dbpf-keyval.c 10 Aug 2009 15:30:09 -0000 1.94.24.2
@@ -24,6 +24,7 @@
#include <db.h>
#include <time.h>
#include <stdlib.h>
+#include <ctype.h>
#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif
@@ -146,6 +147,10 @@ enum dbpf_handle_info_action
static int dbpf_keyval_handle_info_ops(struct dbpf_op * op_p,
enum dbpf_handle_info_action action);
+
+static int dbpf_result_iterate_selector(char *a, char *b,
+ uint32_t query);
+
static int dbpf_build_path_of_handle(DBC *dbc_p, char *path,
TROVE_coll_id coll_id,
TROVE_handle handle);
@@ -322,9 +327,14 @@ return_error:
static int dbpf_keyval_read_value(TROVE_coll_id coll_id,
TROVE_ds_position *position_p,
- PVFS_dirent *dirent_p,
+ uint32_t type,
TROVE_keyval_s *key_p,
TROVE_keyval_s *val_p,
+ PVFS_dirent *dirent_array,
+ TROVE_keyval_s *key_array,
+ TROVE_keyval_s *val_array,
+ uint32_t *count,
+ uint32_t *match_count,
TROVE_ds_flags flags,
TROVE_vtag_s *vtag,
void *user_ptr,
@@ -350,7 +360,7 @@ static int dbpf_keyval_read_value(TROVE_
&op, &q_op_p,
KEYVAL_READ_VALUE,
coll_p,
- dirent_p->handle,
+ dirent_array[0].handle, // at least initial element will have a handle
dbpf_keyval_read_value_op_svc,
flags,
NULL,
@@ -371,10 +381,15 @@ static int dbpf_keyval_read_value(TROVE_
PINT_HINT_GET_OP_ID(hints));
/* initialize the op-specific members */
- op_p->u.v_read.dirent = dirent_p;
op_p->u.v_read.key = key_p;
op_p->u.v_read.val = val_p;
+ op_p->u.v_read.dirent_array = dirent_array;
+ op_p->u.v_read.key_array = key_array;
+ op_p->u.v_read.val_array = val_array;
+ op_p->u.v_read.count = count;
+ op_p->u.v_read.match_count = match_count;
op_p->u.v_read.position_p = position_p;
+ op_p->u.v_read.query_type = type;
op_p->hints = hints;
return dbpf_queue_or_service(op_p, q_op_p, coll_p, out_op_id_p,
@@ -383,12 +398,14 @@ static int dbpf_keyval_read_value(TROVE_
static int dbpf_keyval_read_value_op_svc(struct dbpf_op *op_p)
{
- int ret = -TROVE_EINVAL, key_size=0;
+ int ret = -TROVE_EINVAL, lookup_key_sz=0, i=0, record_count=0;
+ uint32_t cursor_flags = 0, get_flags = 0;
struct dbpf_keyval_db_entry key_entry;
- void *key_data, *value_data;
+ void *lookup_key, *val_datum, *original_key;
TROVE_ds_position local_p = TROVE_ITERATE_START;
DBT key, data, pkey;
- DBC *dbc_p;
+ DBC *dbc_p=NULL, *dbcn_p=NULL, *query_p=NULL;
+ db_recno_t recno;
memset(&key, 0, sizeof(key));
memset(&data, 0, sizeof(data));
@@ -396,158 +413,353 @@ static int dbpf_keyval_read_value_op_svc
memset(&key_entry, 0, sizeof(key_entry));
/* size of key to lookup is length of the key and the value */
- key_size = strlen(op_p->u.v_read.key->buffer) + 1 +
- op_p->u.v_read.val->buffer_sz;
- if( (key_data = malloc( key_size )) == 0 )
+ lookup_key_sz = op_p->u.v_read.key->buffer_sz +
+ op_p->u.v_read.val->buffer_sz;
+
+ if( (lookup_key = malloc( DBPF_MAX_KEY_LENGTH * 2 )) == 0 )
{
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: malloc for "
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: malloc for "
"key_data failed.\n");
return -TROVE_ENOMEM;
}
- memset(key_data, 0, key_size );
- memcpy(key_data, op_p->u.v_read.key->buffer,
- strlen(op_p->u.v_read.key->buffer));
- memcpy((key_data+strlen(op_p->u.v_read.key->buffer)),
- op_p->u.v_read.val->buffer, op_p->u.v_read.val->buffer_sz);
-
- if( (value_data = malloc(op_p->u.v_read.val->buffer_sz)) == 0)
- {
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: malloc for "
- " value_data failed.\n");
- free(key_data);
+
+ if( (original_key = malloc( DBPF_MAX_KEY_LENGTH * 2 )) == 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: malloc for "
+ "key_data failed.\n");
+ free(lookup_key);
+ return -TROVE_ENOMEM;
+ }
+ memset(lookup_key, 0, DBPF_MAX_KEY_LENGTH * 2 );
+ memset(original_key, 0, DBPF_MAX_KEY_LENGTH * 2 );
+
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: key buffer: %d\n",
+ op_p->u.v_read.key->buffer_sz);
+ /* only copy data into key if buffer is greater than 1 (null-string) */
+ if( op_p->u.v_read.key->buffer_sz > 1 )
+ {
+ memcpy(lookup_key, op_p->u.v_read.key->buffer,
+ op_p->u.v_read.key->buffer_sz);
+ if( op_p->u.v_read.val->buffer_sz > 1 )
+ {
+ /* copy at the end of the last buffer but over-write the null
+ * terminator */
+ memcpy((lookup_key+(op_p->u.v_read.key->buffer_sz-1)),
+ op_p->u.v_read.val->buffer, op_p->u.v_read.val->buffer_sz);
+ }
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: lookup_key: [%s]\n", (char *)lookup_key );
+ }
+ else
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: returning, refusing to do empty lookup\n");
+ free(lookup_key);
+ free(original_key);
+ return -TROVE_EINVAL;
+ }
+ /* store the original lookup key based on key, val from v_read */
+ memcpy(original_key, lookup_key,
+ (op_p->u.v_read.key->buffer_sz + op_p->u.v_read.val->buffer_sz - 1) );
+
+ /* malloc for largest possible datum as 'value' portion of query may be
+ * partial */
+ if( (val_datum = malloc(DBPF_MAX_KEY_LENGTH)) == 0)
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: malloc for "
+ " val_datum failed.\n");
+ free(lookup_key);
+ free(original_key);
return -TROVE_ENOMEM;
}
- memset(value_data, 0, op_p->u.v_read.val->buffer_sz );
+ memset(val_datum, 0, DBPF_MAX_KEY_LENGTH );
- key.data = key_data;
- key.ulen = key.size = strlen(key_data)+1;
+ key.data = lookup_key;
+ key.ulen = (2 * DBPF_MAX_KEY_LENGTH);
+ key.size = lookup_key_sz - 1;
- data.data = value_data;
- data.size = data.ulen = op_p->u.v_read.val->buffer_sz;
+ data.data = val_datum;
+ data.size = data.ulen = DBPF_MAX_KEY_LENGTH;
pkey.data = &key_entry;
pkey.size = pkey.ulen = sizeof( struct dbpf_keyval_db_entry );
key.flags = data.flags = pkey.flags = DB_DBT_USERMEM;
+ /* store requested count number */
+ record_count = (*op_p->u.v_read.count);
+ (*op_p->u.v_read.count) = 0;
+ (*op_p->u.v_read.match_count) = 0;
+
/* duplicates in secondary index require use of cursor */
if( (op_p->coll_p->keyval_secondary_db->cursor(
op_p->coll_p->keyval_secondary_db, NULL, &dbc_p, 0)) != 0 )
{
gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
- "[KEYVAL]: Error getting cursor for keyval_secondary "
- "key=%*s: %s\n", op_p->u.v_read.key->buffer_sz,
- (char *)op_p->u.v_read.key->buffer, db_strerror(ret));
- free(key_data);
- free(value_data);
- return TROVE_EFAULT;
+ "[DBPF KEYVAL]: Error getting cursor for "
+ "keyval_secondary: %s\n", db_strerror(ret));
+ goto return_error;
+ ret = -TROVE_EFAULT;
+ }
+ query_p = dbc_p;
+
+ if( PVFS_KEYVAL_QUERY_MASK_QUERY(op_p->u.v_read.query_type) ==
+ PVFS_KEYVAL_QUERY_NORM )
+
+ {
+ /* if normalized query, open normalized cursor and set pointer */
+ if( (op_p->coll_p->keyval_secondary_norm_db->cursor(
+ op_p->coll_p->keyval_secondary_norm_db, NULL, &dbcn_p, 0)) != 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: Error getting cursor for "
+ "keyval_secondary_norm: %s\n", db_strerror(ret));
+ ret = -TROVE_EFAULT;
+ goto return_error;
+ }
+ query_p = dbcn_p;
}
+
gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
- "[KEYVAL]: Doing pget with: %s/(%d)(%d), "
- "pkey (%d)(%d), data %s/(%d)(%d), initial position: %llu\n",
+ "[DBPF KEYVAL]: Doing pget with key: %s/(%d)(%d), "
+ "pkey (%d)(%d), (%d)(%d), initial position: %llu on db"
+ "%s\n",
(char *)key.data, key.ulen, key.size, pkey.ulen, pkey.size,
- (char *)data.data, data.ulen, data.size,
- llu(*op_p->u.v_read.position_p));
-
- ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_SET);
- if( ret == DB_NOTFOUND ) /* no key at all */
+ data.ulen, data.size, llu(*op_p->u.v_read.position_p),
+ query_p->dbp->fname);
+
+ /* figure out query type and set once */
+ if( (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) ==
+ PVFS_KEYVAL_QUERY_LT) ||
+ (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) ==
+ PVFS_KEYVAL_QUERY_LE) ||
+ (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) ==
+ PVFS_KEYVAL_QUERY_PEQ) )
+ {
+ cursor_flags = DB_FIRST;
+ get_flags = DB_NEXT;
+ }
+ else if( (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) ==
+ PVFS_KEYVAL_QUERY_GT) ||
+ (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) ==
+ PVFS_KEYVAL_QUERY_GE) )
+ {
+ cursor_flags = DB_SET_RANGE;
+ get_flags = DB_NEXT;
+ }
+ else if( (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) ==
+ PVFS_KEYVAL_QUERY_NT) )
+ {
+ cursor_flags = DB_FIRST;
+ get_flags = DB_NEXT;
+ }
+ else
{
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
- "value: No matching keys found in secondary index\n");
+ cursor_flags = DB_SET;
+ get_flags = DB_NEXT_DUP;
+ }
+
+ /* do initial query to determine if any records exist */
+ ret = query_p->c_pget(query_p, &key, &pkey, &data, cursor_flags);
+ if( ret == DB_NOTFOUND ) /* no records matching request */
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: dbpf_keyval_"
+ "read_value: No matching keys found in secondary index\n");
(*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
ret = -dbpf_db_error_to_trove_error(ret);
goto return_error;
}
- else if( (*op_p->u.v_read.position_p) != TROVE_ITERATE_START )
+
+ /* get number of data items the cursor refers to */
+ if( (ret = query_p->c_count(query_p, &recno, 0)) != 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: dbpf_keyval_"
+ "read_value: Error getting count of matches: %s\n",
+ db_strerror(ret) );
+ }
+ else
{
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: dbpf_keyval_"
+ "read_value: match count: %u\n",
+ recno );
+ *op_p->u.v_read.match_count = recno;
+ }
+
+ if( (*op_p->u.v_read.position_p) != TROVE_ITERATE_START )
+ { /* if request came with position other than start, whip through them */
local_p = 0;
while( (ret == 0) && (local_p < (*op_p->u.v_read.position_p)) )
{
- ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_NEXT_DUP);
+ ret = query_p->c_pget(query_p, &key, &pkey, &data, get_flags);
+ if( ret == DB_NOTFOUND )
+ {
+ memset( op_p->u.v_read.key_array[i].buffer, 0, pkey.size);
+ op_p->u.v_read.key_array[i].buffer_sz = 0;
+ memset( op_p->u.v_read.val_array[i].buffer, 0, data.size);
+ op_p->u.v_read.val_array[i].buffer_sz = 0;
+ memset( &(op_p->u.v_read.dirent_array[i]), 0,
+ sizeof( PVFS_dirent ) );
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: dbpf_keyval_read_value: can't "
+ "iterate to requested position\n" );
+ *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+ }
local_p++;
}
}
- if( (ret != 0) && (ret != DB_NOTFOUND) ) /* failure other than not found */
+ if( (ret != 0) && (ret != DB_NOTFOUND) )
{
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
- "value: pget error in secondary index: %s\n",
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: dbpf_keyval_"
+ "read_value: pget error in secondary index: %s\n",
db_strerror(ret));
(*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
ret = -dbpf_db_error_to_trove_error(ret);
goto return_error;
}
- else if( ((local_p) == (*op_p->u.v_read.position_p)) && ret == 0 )
- {
- /* got record for requested position, also handles first record */
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
- "[KEYVAL]: dbpf_keyval_read_value: Found key %llu/%s -> "
- "%s\n", llu(key_entry.handle), key_entry.key,
- (char *)data.data);
-
- /* the buffers that values are copied to must be big enough, passed in
- * pointers have buffers set to max allowable size. */
- assert(op_p->u.v_read.key->buffer_sz >= pkey.size);
- assert(op_p->u.v_read.val->buffer_sz >= data.size);
-
- memcpy(op_p->u.v_read.key->buffer, pkey.data, pkey.size);
- op_p->u.v_read.key->read_sz = pkey.size;
- memcpy(op_p->u.v_read.val->buffer, data.data, data.size);
- op_p->u.v_read.val->read_sz = data.size;
- op_p->u.v_read.dirent->handle = key_entry.handle;
- if( (*op_p->u.v_read.position_p) == TROVE_ITERATE_START )
+ /* cursor is now in position to return requested number of records */
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: dbpf_keyval_read_value: Cursor at key "
+ "%llu/%s -> %s\n", llu(key_entry.handle), key_entry.key,
+ (char *)data.data);
+
+ if( (*op_p->u.v_read.position_p) == TROVE_ITERATE_START )
+ {
+ (*op_p->u.v_read.position_p) = 0;
+ }
+
+ /* the buffers that values are copied to must be big enough, passed in
+ * pointers have buffers set to max allowable size. */
+ while(
+ ( (*op_p->u.v_read.count) < record_count ) &&
+ ( (*op_p->u.v_read.position_p) != TROVE_ITERATE_END ) &&
+ ( ret == 0 )
+ )
+ {
+ ret = dbpf_result_iterate_selector( original_key, key.data,
+ op_p->u.v_read.query_type);
+
+ if( ret == 0 ) /* should include record in return set */
+ {
+ memcpy(op_p->u.v_read.key_array[(*op_p->u.v_read.count)].buffer,
+ pkey.data, pkey.size);
+ op_p->u.v_read.key_array[(*op_p->u.v_read.count)].read_sz =
+ pkey.size;
+ memcpy(op_p->u.v_read.val_array[(*op_p->u.v_read.count)].buffer,
+ data.data, data.size);
+ op_p->u.v_read.val_array[(*op_p->u.v_read.count)].read_sz =
+ data.size;
+ op_p->u.v_read.dirent_array[(*op_p->u.v_read.count)].handle =
+ key_entry.handle;
+
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: dbpf_keyval_read_value: storing "
+ "count: %u, handle: %llu, key: %s, value: %s\n",
+ (*op_p->u.v_read.count),
+ llu(op_p->u.v_read.dirent_array[
+ (*op_p->u.v_read.count)].handle),
+ (char *) (op_p->u.v_read.key_array[
+ (*op_p->u.v_read.count)].buffer+sizeof(PVFS_handle)),
+ (char *)op_p->u.v_read.val_array[
+ (*op_p->u.v_read.count)].buffer);
+
+ (*op_p->u.v_read.count)++;
+ }
+ else if( ret == -1 ) /* end of what we need to add */
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF_KEYVAL]: dbpf_keyval_read_value: comp "
+ "function breaking on %s\n", (char *)key.data);
+ *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+ break;
+ }
+ /* otherwise (selector returned 1) it's likely junk (handle as attr), so skip over it */
+
+ (*op_p->u.v_read.position_p)++;
+
+ if( get_flags == DB_NEXT )
{
- (*op_p->u.v_read.position_p) = 1;
+ memset(key.data, 0, 2 * DBPF_MAX_KEY_LENGTH);
+ key.size = 2 * DBPF_MAX_KEY_LENGTH;
}
else
{
- (*op_p->u.v_read.position_p)++;
+ key.size = lookup_key_sz - 1;
}
- /* check if another key exists to prevent an additional call to find
- * the end. if the cursor ever stays open we'll need to return
- * current above, not next */
- ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_NEXT_DUP);
- if( ret == DB_NOTFOUND )
+ key.ulen = (2 * DBPF_MAX_KEY_LENGTH);
+ key.size = lookup_key_sz - 1;
+ data.ulen = data.size = DBPF_MAX_KEY_LENGTH;
+ pkey.size = pkey.ulen = sizeof( struct dbpf_keyval_db_entry );
+ key.flags = data.flags = pkey.flags = DB_DBT_USERMEM;
+ memset(data.data, 0, DBPF_MAX_KEY_LENGTH);
+ memset(pkey.data, 0, sizeof( struct dbpf_keyval_db_entry ));
+
+ /* if just iterating, clear out the key too */
+ if( get_flags == DB_NEXT )
{
- (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: pre-empting end"
- " iterator\n");
+ memset(key.data, 0, 2 * DBPF_MAX_KEY_LENGTH);
+ key.size = 2 * DBPF_MAX_KEY_LENGTH;
}
- ret = dbpf_build_path_of_handle( dbc_p, op_p->u.v_read.dirent->d_name,
- op_p->coll_p->coll_id, op_p->u.v_read.dirent->handle );
-
- if( ret != 0 )
+ ret = query_p->c_pget(query_p, &key, &pkey, &data, get_flags);
+ if( ret == DB_NOTFOUND )
{
- goto return_error;
+ /* trying to get next record ran us out of records, mark the
+ * end, we're out */
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: dbpf_keyval_read_value: reached "
+ "end of records before filling count. "
+ "%d / %d records\n", (*op_p->u.v_read.count),
+ record_count);
+ *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+ }
+ else if( ret != 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: dbpf_keyval_read_value: BDB error "
+ "before filling count: %d / %d records: %s\n",
+ i, *op_p->u.v_read.count, db_strerror(ret));
}
}
- else
+
+ /* have to build the path after finding matching values because the
+ * cursor position gets whacked when building the path */
+ for( i = 0; i < (*op_p->u.v_read.count); i++ )
{
- /* didn't find the record we wanted, but not for first position
- * (handle above) so don't return error, just set the end marker */
- memset( op_p->u.v_read.key->buffer, 0, pkey.size);
- op_p->u.v_read.key->buffer_sz = 0;
- memset( op_p->u.v_read.val->buffer, 0, data.size);
- op_p->u.v_read.val->buffer_sz = 0;
- memset( op_p->u.v_read.dirent, 0, sizeof( PVFS_dirent ) );
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
- "value: reached end of cursor with no record to give\n");
- *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+ /* build path of read handle, use un-normalized associated db */
+ ret = dbpf_build_path_of_handle( dbc_p,
+ op_p->u.v_read.dirent_array[i].d_name,
+ op_p->coll_p->coll_id, op_p->u.v_read.dirent_array[i].handle );
}
- dbc_p->c_close(dbc_p);
- free(key_data);
- free(value_data);
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF_KEYVAL]: dbpf_keyval_read_value: exiting: "
+ "token (%llu)\n", llu(*op_p->u.v_read.position_p));
+ if( dbcn_p != NULL )
+ {
+ dbcn_p->c_close(dbcn_p);
+ }
+ if( dbc_p != NULL )
+ {
+ dbc_p->c_close(dbc_p);
+ }
+ free(lookup_key);
+ free(original_key);
+ free(val_datum);
+
return 1;
return_error:
+ if( dbcn_p != NULL )
+ {
+ dbcn_p->c_close(dbcn_p);
+ }
dbc_p->c_close(dbc_p);
- free(key_data);
- free(value_data);
+ free(lookup_key);
+ free(val_datum);
return ret;
}
@@ -2334,6 +2546,125 @@ static int dbpf_keyval_handle_info_ops(s
return 0;
}
+/* return 0 if b is part of the result set for lookup key a and query, 1 to skip b and keep iterating, -1 to stop iterating */
+static int dbpf_result_iterate_selector(char *a, char *b,
+ uint32_t query)
+{
+
+ int max_len = (strlen(a)>strlen(b)?strlen(a):strlen(b));
+ if( strncmp(b, "user.", 5) != 0 )
+ {
+ /* if key doesn't begin with user. it's not a valid attribute
+ * if less than, just don't include it. if it's greater we're done */
+ if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_LT ||
+ PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_LE ||
+ PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_NT )
+ {
+ return 1;
+ }
+ else if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_GT ||
+ PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_GE ||
+ PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_EQ ||
+ PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_PEQ )
+ {
+ return -1;
+ }
+ }
+
+ if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_LT )
+ {
+ if( memcmp( b, a, max_len ) < 0 )
+ {
+ return 0;
+ }
+ else
+ { /* time to stop, we've passed the keys */
+ return -1;
+ }
+
+ }
+ else if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_LE )
+ {
+ if( memcmp( b, a, max_len) <= 0 )
+ {
+ return 0;
+ }
+ else
+ { /* time to stop, we've passed the keys */
+ return -1;
+ }
+ }
+ else if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_EQ )
+ {
+ if( memcmp( b, a, max_len) == 0 )
+ {
+ return 0;
+ }
+ else
+ { /* should only see equal keys in here */
+ return -1;
+ }
+
+ }
+ else if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_PEQ )
+ {
+ if( memcmp( b, a, strlen(a)) == 0 )
+ {
+ return 0;
+ }
+ else if( memcmp( b, a, strlen(a) ) > 0 )
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+
+ }
+ else if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_GE )
+ {
+ if( memcmp( b, a, max_len) >= 0 )
+ {
+ return 0;
+ }
+ else
+ { /* something funny (or a bug) happened*/
+ return -1;
+ }
+
+ }
+ else if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_GT )
+ {
+ /* will be called starting with equal keys */
+ if( memcmp( b, a, max_len) == 0 )
+ {
+ return 1;
+ }
+ else if( memcmp( b, a, max_len) > 0 )
+ {
+ return 0;
+ }
+ else
+ { /* something funny (or a bug) happened*/
+ return -1;
+ }
+
+ }
+ else if( PVFS_KEYVAL_QUERY_MASK_NORM(query) == PVFS_KEYVAL_QUERY_NT )
+ {
+ if( memcmp( b, a, max_len) != 0 )
+ {
+ return 0;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+ return 1;
+}
+
static int dbpf_build_path_of_handle( DBC *dbc_p,
char *path,
TROVE_coll_id coll_id,
@@ -2400,16 +2731,16 @@ static int dbpf_build_path_of_handle( DB
if( key_entry.handle == root_h )
{
gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
- "[KEYVAL]: Built path (%llu): %s\n",
- llu(key_entry.handle), path);
+ "[DBPF KEYVAL]: Built path (%s) for (%llu)\n",
+ path, llu(handle));
break;
}
}
else
{
gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
- "[KEYVAL]: Failed finding parent handle for: "
- "%llu: ulen: %d, size: %d, %s\n",
+ "[DBPF KEYVAL]: Failed finding parent handle for: "
+ "handle %llu, ulen: %d, size: %d, %s\n",
llu( key_entry.handle), data.ulen, data.size,
db_strerror(ret));
ret = -dbpf_db_error_to_trove_error(ret);
@@ -2433,28 +2764,33 @@ int PINT_trove_dbpf_keyval_secondary_cal
/* for attributes prefixed with user create a secondary key of the form
* <attribute><value> */
- if( (memcmp(k->key, "user.", 5) == 0) )
+ if( ( pkey->size > ((sizeof(PVFS_handle) + strlen("user."))) ) &&
+ ( memcmp(k->key, "user.", 5) == 0) )
{
/* size of new key is length of the attribute plus length of value */
- if( (key_data = malloc(strlen(k->key) + (pdata->size)) ) == 0 )
+ if( (key_data = malloc(strlen(k->key)+strlen(pdata->data) + 1) ) == 0 )
{
gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
"[DBPF KEYVAL]: malloc for secondary_callback "
"for new attribute/value key failed.\n");
return TROVE_ENOMEM;
}
- memset(key_data, 0, (strlen(k->key) + (pdata->size)));
+ memset(key_data, 0, (strlen(k->key) + strlen(pdata->data)+1));
/* copy attribute to start of key */
memcpy(key_data, k->key, strlen(k->key) );
/* copy value directly after key */
- memcpy((key_data + strlen(k->key)), pdata->data, pdata->size);
- skey->ulen = skey->size = strlen(k->key) + pdata->size;
+ memcpy((key_data + strlen(k->key)), pdata->data, strlen(pdata->data));
+ skey->ulen = skey->size = strlen(key_data) + 1;
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: CREATING SECONDARY "
- "INDEX (%s) (%d) -> (%s) (%d)\n",
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: CREATING "
+ "SECONDARY INDEX (%s) (%d) -> "
+ "[(%llu)(%s) (%d)]:[(%s) (%d)]\n",
(char *)key_data, skey->size,
+ llu(k->handle),
+ (char *)k->key,
+ pkey->size,
(char *)pdata->data, pdata->size);
}
else if((pdata->size == sizeof(TROVE_handle)) && (strcmp("dh", k->key)!=0))
@@ -2473,7 +2809,8 @@ int PINT_trove_dbpf_keyval_secondary_cal
memcpy(key_data, pdata->data, pdata->size );
memcpy(&h, pdata->data, pdata->size);
/* copy attribute to start of key */
- gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: CREATING SECONDARY "
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: CREATING SECONDARY "
"INDEX (%llu) (%d) -> (%llu) (%d)\n",
llu(h), pdata->size, llu(h), pdata->size);
skey->ulen = skey->size = pdata->size;
@@ -2488,6 +2825,61 @@ int PINT_trove_dbpf_keyval_secondary_cal
return 0;
}
+ /* constructs secondary key for keyval_secondary_norm db: the lower-cased
+ * <attribute><value> of "user." attribute entries; other entries are not indexed. */
+int PINT_trove_dbpf_keyval_secondary_norm_callback(
+ DB *secondary_norm, const DBT *pkey, const DBT *pdata, DBT *skey)
+{
+ struct dbpf_keyval_db_entry *k;
+ char *key_data;
+ int i = 0;
+
+ memset( skey, 0, sizeof(DBT));
+ k = (struct dbpf_keyval_db_entry *)pkey->data;
+
+ /* for attributes prefixed with user create a secondary key normalized
+ * of the form <attribute><value> */
+ if( ( pkey->size > ((sizeof(PVFS_handle) + strlen("user."))) ) &&
+ ( memcmp(k->key, "user.", 5) == 0) )
+ {
+ /* size of new key is length of the attribute plus length of value */
+ if( (key_data = malloc(strlen(k->key)+strlen(pdata->data)+1) ) == 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[DBPF KEYVAL]: malloc for secondary_callback "
+ "for new attribute/value key failed.\n");
+ return TROVE_ENOMEM;
+ }
+ memset(key_data, 0, (strlen(k->key) + strlen(pdata->data)+1));
+
+ for( i = 0; i < strlen(k->key); i++ )
+ {
+ key_data[i] = tolower( k->key[i] );
+ }
+
+ for( i = 0; i < strlen(pdata->data); i++ )
+ {
+ key_data[i+strlen(k->key)] = tolower( ((char *)pdata->data)[i] );
+ }
+ skey->ulen = skey->size = strlen(key_data) + 1;
+
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: CREATING "
+ "SECONDARY NORM INDEX (%s) (%d) -> "
+ "[(%llu)(%s) (%d)]:[(%s) (%d)]\n",
+ (char *)key_data, skey->size, llu(k->handle),
+ (char *)k->key, pkey->size,
+ (char *)pdata->data, pdata->size);
+ }
+ else
+ {
+ return DB_DONOTINDEX;
+ }
+
+ skey->data = key_data;
+ skey->flags = DB_DBT_APPMALLOC;
+ return 0;
+}
+
int PINT_trove_dbpf_keyval_compare(
DB * dbp, const DBT * a, const DBT * b)
@@ -2497,6 +2889,61 @@ int PINT_trove_dbpf_keyval_compare(
db_entry_a = (const struct dbpf_keyval_db_entry *) a->data;
db_entry_b = (const struct dbpf_keyval_db_entry *) b->data;
+
+ if(db_entry_a->handle != db_entry_b->handle)
+ {
+ return (db_entry_a->handle < db_entry_b->handle) ? -1 : 1;
+ }
+
+ if(a->size > b->size)
+ {
+ return 1;
+ }
+
+ if(a->size < b->size)
+ {
+ return -1;
+ }
+
+ /* must be equal */
+ return (memcmp(db_entry_a->key, db_entry_b->key,
+ DBPF_KEYVAL_DB_ENTRY_KEY_SIZE(a->size)));
+}
+
+int PINT_trove_dbpf_keyval_secondary_compare(
+ DB * dbp, const DBT * a, const DBT * b)
+{
+ const struct dbpf_keyval_db_entry * db_entry_a;
+ const struct dbpf_keyval_db_entry * db_entry_b;
+
+ db_entry_a = (const struct dbpf_keyval_db_entry *) a->data;
+ db_entry_b = (const struct dbpf_keyval_db_entry *) b->data;
+
+ if( a->size > 5 && b->size > 5 )
+ {
+ if( strncmp(a->data, "user.", 5) == 0 )
+ {
+ if( strncmp(b->data, "user.", 5) == 0 )
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+ "[KEYVAL]: comparing two user. strings: [%s]:[%s] "
+ "strcmp says: %d\n", (char *)a->data,
+ (char *)b->data, strcmp(a->data, b->data));
+ return strcoll(a->data, b->data); /* lexical comparison */
+ }
+ else
+ {
+ return -1; /* a is an attr, b is not (a is less) */
+ }
+ }
+ else
+ {
+ if( strncmp(b->data, "user.", 5) == 0 )
+ {
+ return 1; /* b is an attr, a is not (b is less) */
+ }
+ }
+ }
if(db_entry_a->handle != db_entry_b->handle)
{
Index: dbpf-mgmt.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-mgmt.c,v
diff -p -u -r1.109.24.1 -r1.109.24.2
--- dbpf-mgmt.c 14 Jul 2009 17:19:42 -0000 1.109.24.1
+++ dbpf-mgmt.c 10 Aug 2009 15:30:09 -0000 1.109.24.2
@@ -1157,6 +1157,23 @@ int dbpf_collection_create(char *collnam
}
}
+ DBPF_GET_KEYVAL_SECONDARY_NORM_DBNAME(path_name, PATH_MAX, sto_p->name,
+ new_coll_id);
+ ret = stat(path_name, &dbstat);
+ if(ret < 0 && errno != ENOENT)
+ {
+ gossip_err("failed to stat keyval_secondary_norm db: %s\n", path_name);
+ return -trove_errno_to_trove_error(errno);
+ }
+ if(ret < 0)
+ {
+ ret = dbpf_db_create(sto_p->name, path_name, NULL, (DB_DUP|DB_DUPSORT));
+ if (ret != 0)
+ {
+ gossip_err("dbpf_db_create failed on %s\n", path_name);
+ return ret;
+ }
+ }
DBPF_GET_BSTREAM_DIRNAME(path_name, PATH_MAX, sto_p->name, new_coll_id);
ret = mkdir(path_name, 0755);
@@ -1253,6 +1270,7 @@ int dbpf_collection_remove(char *collnam
/* Clean up properly by closing all db handles */
db_close(db_collection->coll_attr_db);
db_close(db_collection->ds_db);
+ db_close(db_collection->keyval_secondary_norm_db);
db_close(db_collection->keyval_secondary_db);
db_close(db_collection->keyval_db);
/* so that environment can also be cleaned up */
@@ -1288,6 +1306,14 @@ int dbpf_collection_remove(char *collnam
ret = -trove_errno_to_trove_error(errno);
}
+ DBPF_GET_KEYVAL_SECONDARY_NORM_DBNAME(path_name, PATH_MAX,
+ sto_p->name, db_data.coll_id);
+ if(unlink(path_name) != 0)
+ {
+ gossip_err("failure removing keyval secondary norm db\n");
+ ret = -trove_errno_to_trove_error(errno);
+ }
+
DBPF_GET_COLL_ATTRIB_DBNAME(path_name, PATH_MAX,
sto_p->name, db_data.coll_id);
if (unlink(path_name) != 0)
@@ -1612,6 +1638,18 @@ int dbpf_collection_clear(TROVE_coll_id
db_strerror(ret));
}
+ if ((ret = coll_p->keyval_secondary_norm_db->sync(coll_p->keyval_secondary_norm_db, 0)) != 0)
+ {
+ gossip_err("db_sync(coll_keyval_secondary_norm_db): %s\n",
+ db_strerror(ret));
+ }
+
+ if ((ret = db_close(coll_p->keyval_secondary_norm_db)) != 0)
+ {
+ gossip_lerr("db_close(coll_keyval_secondary_norm_db): %s\n",
+ db_strerror(ret));
+ }
+
if ((ret = coll_p->keyval_db->sync(coll_p->keyval_db, 0)) != 0)
{
gossip_err("db_sync(coll_keyval_db): %s\n", db_strerror(ret));
@@ -1917,7 +1955,8 @@ int dbpf_collection_lookup(char *collnam
/* secondary database file already exists, try to open */
coll_p->keyval_secondary_db = dbpf_db_open(sto_p->name, path_name,
- coll_p->coll_env, &ret, NULL, (DB_DUP|DB_DUPSORT) );
+ coll_p->coll_env, &ret, PINT_trove_dbpf_keyval_secondary_compare,
+ (DB_DUP|DB_DUPSORT) );
/* TODO: add check to ensure BDB thinks secondary index is consistent */
if(coll_p->keyval_secondary_db == NULL)
{
@@ -1937,6 +1976,63 @@ int dbpf_collection_lookup(char *collnam
PINT_trove_dbpf_keyval_secondary_callback,DB_CREATE);
if( ret != 0 )
{
+ db_close(coll_p->keyval_secondary_db);
+ db_close(coll_p->keyval_db);
+ db_close(coll_p->coll_attr_db);
+ db_close(coll_p->ds_db);
+ dbpf_putdb_env(coll_p->coll_env, coll_p->path_name);
+ free(coll_p->path_name);
+ free(coll_p->name);
+ free(coll_p);
+ return ret;
+ }
+
+ DBPF_GET_KEYVAL_SECONDARY_NORM_DBNAME(path_name, PATH_MAX,
+ sto_p->name, coll_p->coll_id);
+ /* if secondary normalized index doesn't exist, just re-create it */
+ ret = stat(path_name, &dbstat);
+ if(ret < 0 && errno != ENOENT)
+ {
+ gossip_err("failed to stat keyval_secondary_norm db: %s\n", path_name);
+ return -trove_errno_to_trove_error(errno);
+ }
+ if(ret < 0)
+ {
+ gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: Recreating secondary "
+ "normalized index.\n");
+ ret = dbpf_db_create(sto_p->name, path_name, NULL, (DB_DUP|DB_DUPSORT));
+ if (ret != 0)
+ {
+ gossip_err("dbpf_db_create failed on %s\n", path_name);
+ return ret;
+ }
+ }
+
+ /* secondary normalized database file already exists, try to open */
+ coll_p->keyval_secondary_norm_db = dbpf_db_open(sto_p->name, path_name,
+ coll_p->coll_env, &ret, PINT_trove_dbpf_keyval_secondary_compare,
+ (DB_DUP|DB_DUPSORT) );
+ /* TODO: add check to ensure BDB thinks secondary index is consistent */
+ if(coll_p->keyval_secondary_norm_db == NULL)
+ {
+ db_close(coll_p->keyval_secondary_db);
+ db_close(coll_p->keyval_db);
+ db_close(coll_p->coll_attr_db);
+ db_close(coll_p->ds_db);
+ dbpf_putdb_env(coll_p->coll_env, coll_p->path_name);
+ free(coll_p->path_name);
+ free(coll_p->name);
+ free(coll_p);
+ return ret;
+ }
+
+ /* associate the secondary index db with the primary and generate keys
+ * if it's empty */
+ ret = dbpf_db_associate(coll_p->keyval_db, coll_p->keyval_secondary_norm_db,
+ PINT_trove_dbpf_keyval_secondary_norm_callback,DB_CREATE);
+ if( ret != 0 )
+ {
+ db_close(coll_p->keyval_secondary_norm_db);
db_close(coll_p->keyval_secondary_db);
db_close(coll_p->keyval_db);
db_close(coll_p->coll_attr_db);
Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.96.24.1 -r1.96.24.2
--- dbpf.h 14 Jul 2009 17:19:42 -0000 1.96.24.1
+++ dbpf.h 10 Aug 2009 15:30:09 -0000 1.96.24.2
@@ -143,12 +143,21 @@ do {
} while (0)
#define KEYVAL_SECONDARY_DBNAME "keyval_secondary.db"
-#define DBPF_GET_KEYVAL_SECONDARY_DBNAME(__buf,__path_max,__stoname,__collid) \
+#define DBPF_GET_KEYVAL_SECONDARY_DBNAME(__buf,__path_max,__stoname, \
+__collid) \
do { \
snprintf(__buf, __path_max, "/%s/%08x/%s", __stoname, __collid, \
KEYVAL_SECONDARY_DBNAME); \
} while (0)
+#define KEYVAL_SECONDARY_NORM_DBNAME "keyval_secondary_norm.db"
+#define DBPF_GET_KEYVAL_SECONDARY_NORM_DBNAME(__buf,__path_max,__stoname,\
+__collid) \
+do { \
+ snprintf(__buf, __path_max, "/%s/%08x/%s", __stoname, __collid, \
+ KEYVAL_SECONDARY_NORM_DBNAME); \
+} while (0)
+
inline int dbpf_pread(int fd, void *buf, size_t count, off_t offset);
inline int dbpf_pwrite(int fd, const void *buf, size_t count, off_t offset);
@@ -218,6 +227,7 @@ struct dbpf_collection
DB *ds_db;
DB *keyval_db;
DB *keyval_secondary_db;
+ DB *keyval_secondary_norm_db;
DB_ENV *coll_env;
TROVE_coll_id coll_id;
TROVE_handle root_dir_handle;
@@ -251,8 +261,12 @@ struct dbpf_collection_db_entry
int PINT_trove_dbpf_keyval_compare(
DB * dbp, const DBT * a, const DBT * b);
+int PINT_trove_dbpf_keyval_secondary_compare(
+ DB * dbp, const DBT * a, const DBT * b);
int PINT_trove_dbpf_keyval_secondary_callback(
DB *secondary, const DBT *pkey, const DBT *pdata, DBT *skey);
+int PINT_trove_dbpf_keyval_secondary_norm_callback(
+ DB *secondary_norm, const DBT *pkey, const DBT *pdata, DBT *skey);
int PINT_trove_dbpf_ds_attr_compare(
DB * dbp, const DBT * a, const DBT * b);
int PINT_trove_dbpf_ds_attr_compare_reversed(
@@ -386,10 +400,15 @@ struct dbpf_keyval_iterate_keys_op
struct dbpf_keyval_read_value_op
{
- PVFS_dirent *dirent;
PVFS_ds_keyval *key;
PVFS_ds_keyval *val;
+ PVFS_dirent *dirent_array;
+ PVFS_ds_keyval *key_array;
+ PVFS_ds_keyval *val_array;
+ uint32_t *count;
+ uint32_t *match_count;
TROVE_ds_position *position_p;
+ uint32_t query_type;
/* vtag? */
};
More information about the Pvfs2-cvs
mailing list