[Pvfs2-cvs] commit by mtmoore in pvfs2/src/io/trove/trove-dbpf: dbpf-keyval.c dbpf-mgmt.c dbpf.h

CVS commit program cvs at parl.clemson.edu
Mon Aug 10 11:30:10 EDT 2009


Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv10659/pvfs2/src/io/trove/trove-dbpf

Modified Files:
      Tag: Orange-mtmoore
	dbpf-keyval.c dbpf-mgmt.c dbpf.h 
Log Message:
Merge range query changes and other fixes


Index: dbpf-keyval.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval.c,v
diff -p -u -r1.94.24.1 -r1.94.24.2
--- dbpf-keyval.c	14 Jul 2009 17:19:42 -0000	1.94.24.1
+++ dbpf-keyval.c	10 Aug 2009 15:30:09 -0000	1.94.24.2
@@ -24,6 +24,7 @@
 #include <db.h>
 #include <time.h>
 #include <stdlib.h>
+#include <ctype.h>
 #ifdef HAVE_MALLOC_H
 #include <malloc.h>
 #endif
@@ -146,6 +147,10 @@ enum dbpf_handle_info_action
 
 static int dbpf_keyval_handle_info_ops(struct dbpf_op * op_p,
                                        enum dbpf_handle_info_action action);
+
+static int dbpf_result_iterate_selector(char *a, char *b, 
+                                        uint32_t query);
+
 static int dbpf_build_path_of_handle(DBC *dbc_p, char *path, 
                                      TROVE_coll_id coll_id, 
                                      TROVE_handle handle);
@@ -322,9 +327,14 @@ return_error:
 
 static int dbpf_keyval_read_value(TROVE_coll_id coll_id,
                             TROVE_ds_position *position_p,
-                            PVFS_dirent *dirent_p,
+                            uint32_t type,
                             TROVE_keyval_s *key_p,
                             TROVE_keyval_s *val_p,
+                            PVFS_dirent *dirent_array,
+                            TROVE_keyval_s *key_array,
+                            TROVE_keyval_s *val_array,
+                            uint32_t *count,
+                            uint32_t *match_count,
                             TROVE_ds_flags flags,
                             TROVE_vtag_s *vtag,
                             void *user_ptr,
@@ -350,7 +360,7 @@ static int dbpf_keyval_read_value(TROVE_
         &op, &q_op_p,
         KEYVAL_READ_VALUE,
         coll_p,
-        dirent_p->handle,
+        dirent_array[0].handle, // at least initial element will have a handle
         dbpf_keyval_read_value_op_svc,
         flags,
         NULL,
@@ -371,10 +381,15 @@ static int dbpf_keyval_read_value(TROVE_
                      PINT_HINT_GET_OP_ID(hints));
 
     /* initialize the op-specific members */
-    op_p->u.v_read.dirent = dirent_p;
     op_p->u.v_read.key = key_p;
     op_p->u.v_read.val = val_p;
+    op_p->u.v_read.dirent_array = dirent_array;
+    op_p->u.v_read.key_array = key_array;
+    op_p->u.v_read.val_array = val_array;
+    op_p->u.v_read.count = count;
+    op_p->u.v_read.match_count = match_count;
     op_p->u.v_read.position_p = position_p;
+    op_p->u.v_read.query_type = type;
     op_p->hints = hints;
 
     return dbpf_queue_or_service(op_p, q_op_p, coll_p, out_op_id_p,
@@ -383,12 +398,14 @@ static int dbpf_keyval_read_value(TROVE_
 
 static int dbpf_keyval_read_value_op_svc(struct dbpf_op *op_p)
 {
-    int ret = -TROVE_EINVAL, key_size=0;
+    int ret = -TROVE_EINVAL, lookup_key_sz=0, i=0, record_count=0;
+    uint32_t cursor_flags = 0, get_flags = 0;
     struct dbpf_keyval_db_entry key_entry;
-    void *key_data, *value_data;
+    void *lookup_key, *val_datum, *original_key;
     TROVE_ds_position local_p = TROVE_ITERATE_START;
     DBT key, data, pkey;
-    DBC *dbc_p;
+    DBC *dbc_p=NULL, *dbcn_p=NULL, *query_p=NULL;
+    db_recno_t recno;
 
     memset(&key, 0, sizeof(key));
     memset(&data, 0, sizeof(data));
@@ -396,158 +413,353 @@ static int dbpf_keyval_read_value_op_svc
     memset(&key_entry, 0, sizeof(key_entry));
 
     /* size of key to lookup is length of the key and the value */
-    key_size = strlen(op_p->u.v_read.key->buffer) + 1 +
-        op_p->u.v_read.val->buffer_sz;
-    if( (key_data = malloc( key_size )) == 0 )
+    lookup_key_sz = op_p->u.v_read.key->buffer_sz + 
+                    op_p->u.v_read.val->buffer_sz;
+
+    if( (lookup_key = malloc( DBPF_MAX_KEY_LENGTH * 2 )) == 0 )
     { 
-        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: malloc for "
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: malloc for "
                      "key_data failed.\n");
         return -TROVE_ENOMEM;
     }
-    memset(key_data, 0, key_size );
-    memcpy(key_data, op_p->u.v_read.key->buffer, 
-       strlen(op_p->u.v_read.key->buffer));
-    memcpy((key_data+strlen(op_p->u.v_read.key->buffer)), 
-        op_p->u.v_read.val->buffer, op_p->u.v_read.val->buffer_sz);
-
-    if( (value_data = malloc(op_p->u.v_read.val->buffer_sz)) == 0)
-    {
-        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: malloc for "
-                   " value_data failed.\n");
-        free(key_data);
+
+    if( (original_key = malloc( DBPF_MAX_KEY_LENGTH * 2 )) == 0 )
+    { 
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: malloc for "
+                     "key_data failed.\n");
+        free(lookup_key);
+        return -TROVE_ENOMEM;
+    }
+    memset(lookup_key, 0, DBPF_MAX_KEY_LENGTH * 2 );
+    memset(original_key, 0, DBPF_MAX_KEY_LENGTH * 2 );
+
+    gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: key buffer: %d\n",
+                 op_p->u.v_read.key->buffer_sz);
+    /* only copy  data into key if buffer is greater than 1 (null-string) */
+    if( op_p->u.v_read.key->buffer_sz > 1 )
+    { 
+        memcpy(lookup_key, op_p->u.v_read.key->buffer, 
+            op_p->u.v_read.key->buffer_sz);
+        if( op_p->u.v_read.val->buffer_sz > 1 )
+        {
+            /* copy at the end of the last buffer but over-write the null 
+             * terminator */
+            memcpy((lookup_key+(op_p->u.v_read.key->buffer_sz-1)), 
+                op_p->u.v_read.val->buffer, op_p->u.v_read.val->buffer_sz);
+        }
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, 
+                     "[DBPF KEYVAL]: lookup_key: [%s]\n", (char *)lookup_key );
+    }
+    else
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                     "[DBPF KEYVAL]: returning, refusing to do empty lookup\n");
+        free(lookup_key);
+        free(original_key);
+        return -TROVE_EINVAL;
+    }
+    /* store the original lookup key based on key, val from v_read */
+    memcpy(original_key, lookup_key, 
+        (op_p->u.v_read.key->buffer_sz + op_p->u.v_read.val->buffer_sz - 1) );
+
+    /* malloc for largest possible datum as 'value' portion of query may be
+     * partial */
+    if( (val_datum = malloc(DBPF_MAX_KEY_LENGTH)) == 0)
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: malloc for "
+                   " val_datum failed.\n");
+        free(lookup_key);
+        free(original_key);
         return -TROVE_ENOMEM;
     
     }
-    memset(value_data, 0, op_p->u.v_read.val->buffer_sz );
+    memset(val_datum, 0, DBPF_MAX_KEY_LENGTH );
 
-    key.data = key_data;
-    key.ulen = key.size = strlen(key_data)+1;
+    key.data = lookup_key;
+    key.ulen = (2 * DBPF_MAX_KEY_LENGTH);
+    key.size = lookup_key_sz - 1;
 
-    data.data = value_data;
-    data.size = data.ulen = op_p->u.v_read.val->buffer_sz; 
+    data.data = val_datum;
+    data.size = data.ulen = DBPF_MAX_KEY_LENGTH; 
 
     pkey.data = &key_entry;
     pkey.size = pkey.ulen = sizeof( struct dbpf_keyval_db_entry );
     key.flags = data.flags = pkey.flags = DB_DBT_USERMEM;
 
+    /* store requested count number */
+    record_count = (*op_p->u.v_read.count);
+    (*op_p->u.v_read.count) = 0; 
+    (*op_p->u.v_read.match_count) = 0; 
+
     /* duplicates in secondary index require use of cursor */
     if( (op_p->coll_p->keyval_secondary_db->cursor(
          op_p->coll_p->keyval_secondary_db, NULL, &dbc_p, 0)) != 0 )
     {
         gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
-                     "[KEYVAL]: Error getting cursor for keyval_secondary "
-                     "key=%*s: %s\n", op_p->u.v_read.key->buffer_sz, 
-                     (char *)op_p->u.v_read.key->buffer, db_strerror(ret));
-        free(key_data);
-        free(value_data);
-        return TROVE_EFAULT;
+                     "[DBPF KEYVAL]: Error getting cursor for "
+                     "keyval_secondary: %s\n", db_strerror(ret));
+        goto return_error;
+        ret = -TROVE_EFAULT;
+    }
+    query_p = dbc_p;
+
+    if( PVFS_KEYVAL_QUERY_MASK_QUERY(op_p->u.v_read.query_type) ==
+        PVFS_KEYVAL_QUERY_NORM )
+
+    {
+        /* if normalized query, open normalized cursor and set pointer */
+        if( (op_p->coll_p->keyval_secondary_norm_db->cursor(
+           op_p->coll_p->keyval_secondary_norm_db, NULL, &dbcn_p, 0)) != 0 )
+        {
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                         "[DBPF KEYVAL]: Error getting cursor for "
+                         "keyval_secondary_norm: %s\n", db_strerror(ret));
+            ret = -TROVE_EFAULT;
+            goto return_error;
+        }
+        query_p = dbcn_p;
     }
 
+
     gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
-                 "[KEYVAL]: Doing pget with: %s/(%d)(%d), "
-                 "pkey (%d)(%d), data %s/(%d)(%d), initial position: %llu\n", 
+                 "[DBPF KEYVAL]: Doing pget with key: %s/(%d)(%d), "
+                 "pkey (%d)(%d), (%d)(%d), initial position: %llu on db"
+                 "%s\n", 
                  (char *)key.data, key.ulen, key.size, pkey.ulen, pkey.size,
-                 (char *)data.data, data.ulen, data.size,
-                 llu(*op_p->u.v_read.position_p));
-    
-    ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_SET);
-    if( ret == DB_NOTFOUND ) /* no key at all */
+                 data.ulen, data.size, llu(*op_p->u.v_read.position_p),
+                 query_p->dbp->fname);
+   
+    /* figure out query type and set once */
+    if( (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) == 
+            PVFS_KEYVAL_QUERY_LT) ||
+        (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) == 
+            PVFS_KEYVAL_QUERY_LE) || 
+        (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) == 
+            PVFS_KEYVAL_QUERY_PEQ) )
+    {
+        cursor_flags = DB_FIRST;
+        get_flags = DB_NEXT;
+    }
+    else if( (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) == 
+                PVFS_KEYVAL_QUERY_GT) || 
+             (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) == 
+                PVFS_KEYVAL_QUERY_GE) )
+    {
+        cursor_flags = DB_SET_RANGE;
+        get_flags = DB_NEXT;
+    }
+    else if( (PVFS_KEYVAL_QUERY_MASK_NORM(op_p->u.v_read.query_type) == 
+                PVFS_KEYVAL_QUERY_NT) )
+    {
+        cursor_flags = DB_FIRST;
+        get_flags = DB_NEXT;
+    }
+    else
     {
-        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
-                 "value: No matching keys found in secondary index\n");
+        cursor_flags = DB_SET; 
+        get_flags = DB_NEXT_DUP;
+    }
+
+    /* do initial query to determine if any records exist */
+    ret = query_p->c_pget(query_p, &key, &pkey, &data, cursor_flags);
+    if( ret == DB_NOTFOUND )  /* no records matching request */
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: dbpf_keyval_"
+                     "read_value: No matching keys found in secondary index\n");
         (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
         ret = -dbpf_db_error_to_trove_error(ret);
         goto return_error;
     }
-    else if(  (*op_p->u.v_read.position_p) != TROVE_ITERATE_START )
+
+    /* get number of data items the cursor refers to */
+    if( (ret = query_p->c_count(query_p, &recno, 0)) != 0 )
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: dbpf_keyval_"
+                     "read_value: Error getting count of matches: %s\n",
+                     db_strerror(ret) );
+    }
+    else
     {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: dbpf_keyval_"
+                     "read_value: match count: %u\n",
+                     recno );
+        *op_p->u.v_read.match_count = recno;
+    }
+
+    if(  (*op_p->u.v_read.position_p) != TROVE_ITERATE_START )
+    {   /* if request came with position other than start, whip through them */
         local_p = 0;
         while( (ret == 0) && (local_p < (*op_p->u.v_read.position_p)) )
         {
-            ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_NEXT_DUP);
+            ret = query_p->c_pget(query_p, &key, &pkey, &data, get_flags);
+            if( ret == DB_NOTFOUND )
+            {
+                memset( op_p->u.v_read.key_array[i].buffer, 0, pkey.size);
+                op_p->u.v_read.key_array[i].buffer_sz = 0;
+                memset( op_p->u.v_read.val_array[i].buffer, 0, data.size);
+                op_p->u.v_read.val_array[i].buffer_sz = 0;
+                memset( &(op_p->u.v_read.dirent_array[i]), 0, 
+                    sizeof( PVFS_dirent ) );
+                gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, 
+                             "[DBPF KEYVAL]: dbpf_keyval_read_value: can't "
+                             "iterate to requested position\n" );
+                *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+            }
             local_p++;
         }
     }
 
-    if( (ret != 0) && (ret != DB_NOTFOUND) ) /* failure other than not found */
+    if( (ret != 0) && (ret != DB_NOTFOUND) )
     {
-        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
-                     "value: pget error in secondary index: %s\n",
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: dbpf_keyval_"
+                     "read_value: pget error in secondary index: %s\n",
                      db_strerror(ret));
         (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
         ret = -dbpf_db_error_to_trove_error(ret);
         goto return_error;
     }
-    else if( ((local_p) == (*op_p->u.v_read.position_p)) && ret == 0 )
-    {
-        /* got record for requested position, also handles first record */
-        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
-                     "[KEYVAL]: dbpf_keyval_read_value: Found key %llu/%s -> "
-                     "%s\n", llu(key_entry.handle), key_entry.key, 
-                     (char *)data.data);
-
-        /* the buffers that values are copied to must be big enough, passed in
-        * pointers have buffers set to max allowable size. */
-        assert(op_p->u.v_read.key->buffer_sz >= pkey.size);
-        assert(op_p->u.v_read.val->buffer_sz >= data.size);
-
-        memcpy(op_p->u.v_read.key->buffer, pkey.data, pkey.size);
-        op_p->u.v_read.key->read_sz = pkey.size;
-        memcpy(op_p->u.v_read.val->buffer, data.data, data.size);
-        op_p->u.v_read.val->read_sz = data.size;
-        op_p->u.v_read.dirent->handle = key_entry.handle;
 
-        if( (*op_p->u.v_read.position_p) == TROVE_ITERATE_START )
+    /* cursor is now in position to return requested number of records */
+    gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                 "[DBPF KEYVAL]: dbpf_keyval_read_value: Cursor at key "
+                 "%llu/%s -> %s\n", llu(key_entry.handle), key_entry.key, 
+                 (char *)data.data);
+
+    if( (*op_p->u.v_read.position_p) == TROVE_ITERATE_START )
+    {
+        (*op_p->u.v_read.position_p) = 0;
+    }
+
+    /* the buffers that values are copied to must be big enough, passed in
+    * pointers have buffers set to max allowable size. */
+    while( 
+            ( (*op_p->u.v_read.count) < record_count ) &&
+            ( (*op_p->u.v_read.position_p) != TROVE_ITERATE_END ) && 
+            ( ret == 0 )
+         )
+    {
+        ret = dbpf_result_iterate_selector( original_key, key.data, 
+                                            op_p->u.v_read.query_type);
+
+        if( ret == 0 ) /* should include record in return set */
+        {
+            memcpy(op_p->u.v_read.key_array[(*op_p->u.v_read.count)].buffer, 
+                pkey.data, pkey.size);
+            op_p->u.v_read.key_array[(*op_p->u.v_read.count)].read_sz = 
+                pkey.size;
+            memcpy(op_p->u.v_read.val_array[(*op_p->u.v_read.count)].buffer, 
+                data.data, data.size);
+            op_p->u.v_read.val_array[(*op_p->u.v_read.count)].read_sz = 
+                data.size;
+            op_p->u.v_read.dirent_array[(*op_p->u.v_read.count)].handle = 
+                key_entry.handle;
+    
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, 
+                 "[DBPF KEYVAL]: dbpf_keyval_read_value: storing "
+                 "count: %u, handle: %llu, key: %s, value: %s\n",
+                 (*op_p->u.v_read.count), 
+                 llu(op_p->u.v_read.dirent_array[
+                    (*op_p->u.v_read.count)].handle),
+                 (char *) (op_p->u.v_read.key_array[
+                    (*op_p->u.v_read.count)].buffer+sizeof(PVFS_handle)),
+                 (char *)op_p->u.v_read.val_array[
+                    (*op_p->u.v_read.count)].buffer);
+
+            (*op_p->u.v_read.count)++;
+        }
+        else if( ret == -1 ) /* end of what we need to add */
+        { 
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, 
+                         "[DBPF_KEYVAL]: dbpf_keyval_read_value: comp "
+                         "function breaking on %s\n", (char *)key.data);
+            *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+            break;
+        }
+        /* otherwise, it's likely junk (handle as attr) so iterate by it */
+
+        (*op_p->u.v_read.position_p)++; 
+
+        if( get_flags == DB_NEXT )
         {
-            (*op_p->u.v_read.position_p) = 1;
+            memset(key.data, 0, 2 * DBPF_MAX_KEY_LENGTH);
+            key.size = 2 * DBPF_MAX_KEY_LENGTH;
         }
         else
         {
-            (*op_p->u.v_read.position_p)++; 
+            key.size = lookup_key_sz - 1;
         }
 
-        /* check if another key exists to prevent an additional call to find
-         * the end. if the cursor ever stays open we'll need to return 
-         * current above, not next */
-        ret = dbc_p->c_pget(dbc_p, &key, &pkey, &data, DB_NEXT_DUP);
-        if( ret == DB_NOTFOUND )
+        key.ulen = (2 * DBPF_MAX_KEY_LENGTH);
+        key.size = lookup_key_sz - 1;
+        data.ulen = data.size = DBPF_MAX_KEY_LENGTH; 
+        pkey.size = pkey.ulen = sizeof( struct dbpf_keyval_db_entry );
+        key.flags = data.flags = pkey.flags = DB_DBT_USERMEM;
+        memset(data.data, 0, DBPF_MAX_KEY_LENGTH);
+        memset(pkey.data, 0, sizeof( struct dbpf_keyval_db_entry ));
+       
+        /* if just iterating, clear out the key too */
+        if( get_flags == DB_NEXT )
         {
-            (*op_p->u.v_read.position_p) = TROVE_ITERATE_END;
-            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: pre-empting end"
-                         " iterator\n");
+            memset(key.data, 0, 2 * DBPF_MAX_KEY_LENGTH);
+            key.size = 2 * DBPF_MAX_KEY_LENGTH;
         }
 
-        ret = dbpf_build_path_of_handle( dbc_p, op_p->u.v_read.dirent->d_name,
-            op_p->coll_p->coll_id, op_p->u.v_read.dirent->handle );
-        
-        if( ret != 0 )
+        ret = query_p->c_pget(query_p, &key, &pkey, &data, get_flags);
+        if( ret == DB_NOTFOUND )
         {
-            goto return_error;
+            /* trying to get next record ran us out of records, mark the 
+             * end, we're out */
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, 
+                         "[DBPF KEYVAL]: dbpf_keyval_read_value: reached "
+                         "end of records before filling count. "
+                         "%d / %d records\n", (*op_p->u.v_read.count), 
+                         record_count);
+            *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+        }
+        else if( ret != 0 )
+        {
+            gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, 
+                         "[DBPF KEYVAL]: dbpf_keyval_read_value: BDB error "
+                         "before filling count: %d / %d records: %s\n", 
+                         i, *op_p->u.v_read.count, db_strerror(ret));
         }
     }
-    else
+
+    /* have to build the path after finding matching values because the
+     * cursor position gets whacked when building the path */
+    for( i = 0; i < (*op_p->u.v_read.count); i++ )
     {
-        /* didn't find the record we wanted, but not for first position 
-         * (handle above) so don't return error, just set the end marker */
-        memset( op_p->u.v_read.key->buffer, 0, pkey.size);
-        op_p->u.v_read.key->buffer_sz = 0;
-        memset( op_p->u.v_read.val->buffer, 0, data.size);
-        op_p->u.v_read.val->buffer_sz = 0;
-        memset( op_p->u.v_read.dirent, 0, sizeof( PVFS_dirent ) );
-        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: dbpf_keyval_read_"
-                     "value: reached end of cursor with no record to give\n");
-        *op_p->u.v_read.position_p = TROVE_ITERATE_END;
+        /* build path of read handle, use un-normalized associated db */
+        ret = dbpf_build_path_of_handle( dbc_p, 
+            op_p->u.v_read.dirent_array[i].d_name,
+            op_p->coll_p->coll_id, op_p->u.v_read.dirent_array[i].handle );
     }
 
-    dbc_p->c_close(dbc_p);
-    free(key_data);
-    free(value_data);
+    gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, 
+                 "[DBPF_KEYVAL]: dbpf_keyval_read_value: exiting: "
+                 "token (%llu)\n", llu(*op_p->u.v_read.position_p));
+    if( dbcn_p != NULL )
+    {
+        dbcn_p->c_close(dbcn_p);
+    }
+    if( dbc_p != NULL )
+    {
+        dbc_p->c_close(dbc_p);
+    }
+    free(lookup_key);
+    free(original_key);
+    free(val_datum);
+
     return 1;
 
 return_error:
+    if( dbcn_p != NULL )
+    {
+        dbcn_p->c_close(dbcn_p);
+    }
     dbc_p->c_close(dbc_p);
-    free(key_data);
-    free(value_data);
+    free(lookup_key);
+    free(val_datum);
     return ret;
 }
 
@@ -2334,6 +2546,125 @@ static int dbpf_keyval_handle_info_ops(s
     return 0;
 }
 
+/* classify cursor key b against the lookup key a for the given query type.
+ *
+ * a:     lookup key built from the request (NUL-terminated string)
+ * b:     secondary index key at the current cursor position
+ *        (NUL-terminated string)
+ * query: query type, only PVFS_KEYVAL_QUERY_MASK_NORM(query) is examined
+ *
+ * returns:
+ *    0 - b is part of the result set
+ *    1 - b is not part of the result set, keep iterating past it
+ *   -1 - b is not part of the result set and iteration should stop
+ *
+ * strcmp()/strncmp() are used instead of memcmp() over the longer of the
+ * two lengths so that nothing beyond the terminator of the shorter string
+ * is read, the ordering is the same for NUL-terminated strings. */
+static int dbpf_result_iterate_selector(char *a, char *b, 
+                                        uint32_t query)
+{
+    /* NOTE(review): assumes the masked query type fits in uint32_t like
+     * query itself, confirm against the macro definition */
+    const uint32_t op = PVFS_KEYVAL_QUERY_MASK_NORM(query);
+    int cmp = 0;
+
+    if( strncmp(b, "user.", 5) != 0 )
+    {
+        /* if key doesn't begin with user. it's not a valid attribute
+         * if less than, just don't include it. if it's greater we're done */
+        if( op == PVFS_KEYVAL_QUERY_LT ||
+            op == PVFS_KEYVAL_QUERY_LE ||
+            op == PVFS_KEYVAL_QUERY_NT )
+        {
+            return 1;
+        }
+        else if( op == PVFS_KEYVAL_QUERY_GT ||
+                 op == PVFS_KEYVAL_QUERY_GE ||
+                 op == PVFS_KEYVAL_QUERY_EQ ||
+                 op == PVFS_KEYVAL_QUERY_PEQ )
+        {
+            return -1;
+        }
+        /* any other query type falls through to the comparison below */
+    }
+
+    /* PEQ only compares the first strlen(a) bytes of b, every other query
+     * type orders the two complete strings */
+    if( op == PVFS_KEYVAL_QUERY_PEQ )
+    {
+        cmp = strncmp( b, a, strlen(a) );
+    }
+    else
+    {
+        cmp = strcmp( b, a );
+    }
+
+    if( op == PVFS_KEYVAL_QUERY_LT )
+    {
+        if( cmp < 0 )
+        {
+            return 0;
+        }
+        /* time to stop, we've passed the keys */
+        return -1;
+    }
+    else if( op == PVFS_KEYVAL_QUERY_LE )
+    {
+        if( cmp <= 0 )
+        {
+            return 0;
+        }
+        /* time to stop, we've passed the keys */
+        return -1;
+    }
+    else if( op == PVFS_KEYVAL_QUERY_EQ )
+    {
+        if( cmp == 0 )
+        {
+            return 0;
+        }
+        /* should only see equal keys in here */
+        return -1;
+    }
+    else if( op == PVFS_KEYVAL_QUERY_PEQ )
+    {
+        if( cmp == 0 )
+        {
+            return 0;
+        }
+        /* past the lookup key means done, before it means keep looking */
+        return (cmp > 0) ? -1 : 1;
+    }
+    else if( op == PVFS_KEYVAL_QUERY_GE )
+    {
+        if( cmp >= 0 )
+        {
+            return 0;
+        }
+        /* something funny (or a bug) happened*/
+        return -1;
+    }
+    else if( op == PVFS_KEYVAL_QUERY_GT )
+    {
+        /* will be called starting with equal keys, skip over those */
+        if( cmp == 0 )
+        {
+            return 1;
+        }
+        /* a smaller key means something funny (or a bug) happened */
+        return (cmp > 0) ? 0 : -1;
+    }
+    else if( op == PVFS_KEYVAL_QUERY_NT )
+    {
+        /* anything that differs from the lookup key is a match */
+        return (cmp != 0) ? 0 : 1;
+    }
+
+    /* unrecognized query type, skip the record */
+    return 1;
+}
+
 static int dbpf_build_path_of_handle( DBC *dbc_p,
     char *path, 
     TROVE_coll_id coll_id, 
@@ -2400,16 +2731,16 @@ static int dbpf_build_path_of_handle( DB
             if( key_entry.handle == root_h )
             {
                 gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
-                             "[KEYVAL]: Built path (%llu): %s\n",
-                             llu(key_entry.handle), path);
+                             "[DBPF KEYVAL]: Built path (%s) for (%llu)\n",
+                             path, llu(handle));
                 break;
             }
         }
         else
         {
             gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
-                         "[KEYVAL]: Failed finding parent handle for: "
-                         "%llu: ulen: %d, size: %d, %s\n", 
+                         "[DBPF KEYVAL]: Failed finding parent handle for: "
+                         "handle %llu, ulen: %d, size: %d, %s\n", 
                          llu( key_entry.handle), data.ulen, data.size,
                          db_strerror(ret));
             ret = -dbpf_db_error_to_trove_error(ret);
@@ -2433,28 +2764,33 @@ int PINT_trove_dbpf_keyval_secondary_cal
 
     /* for attributes prefixed with user create a secondary key of the form
      * <attribute><value> */
-    if( (memcmp(k->key, "user.", 5) == 0) )
+    if( ( pkey->size > ((sizeof(PVFS_handle) + strlen("user."))) ) &&
+        ( memcmp(k->key, "user.", 5) == 0) )
     {
         /* size of new key is length of the attribute plus length of value */
-        if( (key_data = malloc(strlen(k->key) + (pdata->size)) ) == 0 )
+        if( (key_data = malloc(strlen(k->key)+strlen(pdata->data) + 1) ) == 0 )
         {
             gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
                          "[DBPF KEYVAL]: malloc for secondary_callback "
                          "for new attribute/value key failed.\n");
             return TROVE_ENOMEM;
         }
-        memset(key_data, 0, (strlen(k->key) + (pdata->size)));
+        memset(key_data, 0, (strlen(k->key) + strlen(pdata->data)+1));
     
         /* copy attribute to start of key */
         memcpy(key_data, k->key, strlen(k->key) );
     
         /* copy value directly after key */
-        memcpy((key_data + strlen(k->key)), pdata->data, pdata->size);
-        skey->ulen = skey->size = strlen(k->key) + pdata->size;
+        memcpy((key_data + strlen(k->key)), pdata->data, strlen(pdata->data));
+        skey->ulen = skey->size = strlen(key_data) + 1;
 
-        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: CREATING SECONDARY "
-                     "INDEX (%s) (%d) -> (%s) (%d)\n", 
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: CREATING "
+                     "SECONDARY INDEX (%s) (%d) -> "
+                     "[(%llu)(%s) (%d)]:[(%s) (%d)]\n", 
                      (char *)key_data, skey->size, 
+                     llu(k->handle),
+                     (char *)k->key,
+                     pkey->size,
                      (char *)pdata->data, pdata->size); 
     }
     else if((pdata->size == sizeof(TROVE_handle)) && (strcmp("dh", k->key)!=0))
@@ -2473,7 +2809,8 @@ int PINT_trove_dbpf_keyval_secondary_cal
         memcpy(key_data, pdata->data, pdata->size );
         memcpy(&h, pdata->data, pdata->size);
         /* copy attribute to start of key */
-        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: CREATING SECONDARY "
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, 
+                     "[DBPF KEYVAL]: CREATING SECONDARY "
                      "INDEX (%llu) (%d) -> (%llu) (%d)\n", 
                      llu(h), pdata->size, llu(h), pdata->size); 
         skey->ulen = skey->size = pdata->size;
@@ -2488,6 +2825,61 @@ int PINT_trove_dbpf_keyval_secondary_cal
     return 0;
 }
 
+ /* constructs the secondary key for the keyval_secondary_norm db: a newly
+  * allocated lower-cased copy of <attribute><value> for "user." attributes
+  * (returns 0), DB_DONOTINDEX otherwise, TROVE_ENOMEM if allocation fails */
+int PINT_trove_dbpf_keyval_secondary_norm_callback(
+    DB *secondary_norm, const DBT *pkey, const DBT *pdata, DBT *skey)
+{
+    struct dbpf_keyval_db_entry *k;
+    const char *value;
+    char *key_data;
+    size_t attr_len, value_len, i;
+
+    memset( skey, 0, sizeof(DBT));
+    k = (struct dbpf_keyval_db_entry *)pkey->data;
+
+    /* only attributes prefixed with user. get a normalized secondary key */
+    if( ( pkey->size <= ((sizeof(PVFS_handle) + strlen("user."))) ) ||
+        ( memcmp(k->key, "user.", 5) != 0) )
+    {
+        return DB_DONOTINDEX;
+    }
+
+    /* NOTE(review): attribute and value are assumed NUL-terminated, as before */
+    value = (const char *)pdata->data;
+    attr_len = strlen(k->key);
+    value_len = strlen(value);
+    /* calloc zeroes the buffer, the extra byte is the terminator */
+    if( (key_data = calloc(1, attr_len + value_len + 1)) == 0 )
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                     "[DBPF KEYVAL]: malloc for secondary_callback "
+                     "for new attribute/value key failed.\n");
+        return TROVE_ENOMEM;
+    }
+    /* tolower needs an unsigned char value, a negative char is undefined */
+    for( i = 0; i < attr_len; i++ )
+    {
+        key_data[i] = tolower( (unsigned char)k->key[i] );
+    }
+    for( i = 0; i < value_len; i++ )
+    {
+        key_data[attr_len + i] = tolower( (unsigned char)value[i] );
+    }
+    skey->ulen = skey->size = attr_len + value_len + 1;
+
+    gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[DBPF KEYVAL]: CREATING "
+                 "SECONDARY NORM INDEX (%s) (%d) -> "
+                 "[(%llu)(%s) (%d)]:[(%s) (%d)]\n", 
+                 (char *)key_data, skey->size, llu(k->handle), 
+                 (char *)k->key, pkey->size,
+                 (char *)pdata->data, pdata->size); 
+    skey->data = key_data;
+    skey->flags = DB_DBT_APPMALLOC;
+    return 0;
+}
+
 
 int PINT_trove_dbpf_keyval_compare(
     DB * dbp, const DBT * a, const DBT * b)
@@ -2497,6 +2889,61 @@ int PINT_trove_dbpf_keyval_compare(
 
     db_entry_a = (const struct dbpf_keyval_db_entry *) a->data;
     db_entry_b = (const struct dbpf_keyval_db_entry *) b->data;
+
+    if(db_entry_a->handle != db_entry_b->handle)
+    {
+        return (db_entry_a->handle < db_entry_b->handle) ? -1 : 1;
+    }
+
+    if(a->size > b->size)
+    {
+        return 1;
+    }
+
+    if(a->size < b->size)
+    {
+        return -1;
+    }
+
+    /* must be equal */
+    return (memcmp(db_entry_a->key, db_entry_b->key, 
+                    DBPF_KEYVAL_DB_ENTRY_KEY_SIZE(a->size)));
+}
+
+int PINT_trove_dbpf_keyval_secondary_compare(
+    DB * dbp, const DBT * a, const DBT * b)
+{
+    const struct dbpf_keyval_db_entry * db_entry_a;
+    const struct dbpf_keyval_db_entry * db_entry_b;
+
+    db_entry_a = (const struct dbpf_keyval_db_entry *) a->data;
+    db_entry_b = (const struct dbpf_keyval_db_entry *) b->data;
+
+    if( a->size > 5 && b->size > 5 )
+    {
+        if( strncmp(a->data, "user.", 5) == 0 )
+        {
+            if( strncmp(b->data, "user.", 5) == 0 )
+            {
+                gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG,
+                             "[KEYVAL]: comparing two user. strings: [%s]:[%s] "
+                             "strcmp says: %d\n", (char *)a->data, 
+                             (char *)b->data, strcmp(a->data, b->data));
+                return strcoll(a->data, b->data); /* lexical comparison */
+            }
+            else
+            {
+                return -1; /* a is an attr, b is not (a is less) */
+            }
+        }
+        else 
+        {
+            if( strncmp(b->data, "user.", 5) == 0 )
+            {
+                return 1; /* b is an attr, a is not (b is greater) */
+            }
+        }
+    }
 
     if(db_entry_a->handle != db_entry_b->handle)
     {

Index: dbpf-mgmt.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-mgmt.c,v
diff -p -u -r1.109.24.1 -r1.109.24.2
--- dbpf-mgmt.c	14 Jul 2009 17:19:42 -0000	1.109.24.1
+++ dbpf-mgmt.c	10 Aug 2009 15:30:09 -0000	1.109.24.2
@@ -1157,6 +1157,23 @@ int dbpf_collection_create(char *collnam
         }
     }
 
+    DBPF_GET_KEYVAL_SECONDARY_NORM_DBNAME(path_name, PATH_MAX, sto_p->name, 
+        new_coll_id);
+    ret = stat(path_name, &dbstat);
+    if(ret < 0 && errno != ENOENT)
+    {
+        gossip_err("failed to stat keyval_secondary_norm db: %s\n", path_name);
+        return -trove_errno_to_trove_error(errno);
+    }
+    if(ret < 0)
+    {
+        ret = dbpf_db_create(sto_p->name, path_name, NULL, (DB_DUP|DB_DUPSORT));
+        if (ret != 0)
+        {
+            gossip_err("dbpf_db_create failed on %s\n", path_name);
+            return ret;
+        }
+    }
 
     DBPF_GET_BSTREAM_DIRNAME(path_name, PATH_MAX, sto_p->name, new_coll_id);
     ret = mkdir(path_name, 0755);
@@ -1253,6 +1270,7 @@ int dbpf_collection_remove(char *collnam
         /* Clean up properly by closing all db handles */
         db_close(db_collection->coll_attr_db);
         db_close(db_collection->ds_db);
+        db_close(db_collection->keyval_secondary_norm_db);
         db_close(db_collection->keyval_secondary_db);
         db_close(db_collection->keyval_db);
         /* so that environment can also be cleaned up */
@@ -1288,6 +1306,14 @@ int dbpf_collection_remove(char *collnam
         ret = -trove_errno_to_trove_error(errno);
     }
 
+    DBPF_GET_KEYVAL_SECONDARY_NORM_DBNAME(path_name, PATH_MAX,
+                                          sto_p->name, db_data.coll_id);
+    if(unlink(path_name) != 0)
+    {
+        gossip_err("failure removing keyval secondary norm db\n");
+        ret = -trove_errno_to_trove_error(errno);
+    }
+
     DBPF_GET_COLL_ATTRIB_DBNAME(path_name, PATH_MAX,
                                 sto_p->name, db_data.coll_id);
     if (unlink(path_name) != 0)
@@ -1612,6 +1638,18 @@ int dbpf_collection_clear(TROVE_coll_id 
             db_strerror(ret));
     }
 
+    if ((ret = coll_p->keyval_secondary_norm_db->sync(coll_p->keyval_secondary_norm_db, 0)) != 0)
+    {
+        gossip_err("db_sync(coll_keyval_secondary_norm_db): %s\n", 
+            db_strerror(ret));
+    }
+
+    if ((ret = db_close(coll_p->keyval_secondary_norm_db)) != 0) 
+    {
+        gossip_lerr("db_close(coll_keyval_secondary_norm_db): %s\n", 
+            db_strerror(ret));
+    }
+
     if ((ret = coll_p->keyval_db->sync(coll_p->keyval_db, 0)) != 0)
     {
         gossip_err("db_sync(coll_keyval_db): %s\n", db_strerror(ret));
@@ -1917,7 +1955,8 @@ int dbpf_collection_lookup(char *collnam
 
     /* secondary database file already exists, try to open */
     coll_p->keyval_secondary_db = dbpf_db_open(sto_p->name, path_name, 
-        coll_p->coll_env, &ret, NULL, (DB_DUP|DB_DUPSORT) );
+        coll_p->coll_env, &ret, PINT_trove_dbpf_keyval_secondary_compare, 
+        (DB_DUP|DB_DUPSORT) );
     /* TODO: add check to ensure BDB thinks secondary index is consistent */
     if(coll_p->keyval_secondary_db == NULL)
     {
@@ -1937,6 +1976,63 @@ int dbpf_collection_lookup(char *collnam
           PINT_trove_dbpf_keyval_secondary_callback,DB_CREATE);
     if( ret != 0 )
     {
+        db_close(coll_p->keyval_secondary_db);
+        db_close(coll_p->keyval_db);
+        db_close(coll_p->coll_attr_db);
+        db_close(coll_p->ds_db);
+        dbpf_putdb_env(coll_p->coll_env, coll_p->path_name);
+        free(coll_p->path_name);
+        free(coll_p->name);
+        free(coll_p);
+        return ret;
+    }
+
+    DBPF_GET_KEYVAL_SECONDARY_NORM_DBNAME(path_name, PATH_MAX,
+                           sto_p->name, coll_p->coll_id);
+    /* if secondary normalized index doesn't exist, just re-create it */
+    ret = stat(path_name, &dbstat);
+    if(ret < 0 && errno != ENOENT)
+    {
+        gossip_err("failed to stat keyval_secondary_norm db: %s\n", path_name);
+        return -trove_errno_to_trove_error(errno);
+    }
+    if(ret < 0)
+    {
+        gossip_debug(GOSSIP_DBPF_KEYVAL_DEBUG, "[KEYVAL]: Recreating secondary "
+                     "normalized index.\n");
+        ret = dbpf_db_create(sto_p->name, path_name, NULL, (DB_DUP|DB_DUPSORT));
+        if (ret != 0)
+        {
+            gossip_err("dbpf_db_create failed on %s\n", path_name);
+            return ret;
+        }
+    }
+
+    /* secondary normalized database file already exists, try to open */
+    coll_p->keyval_secondary_norm_db = dbpf_db_open(sto_p->name, path_name, 
+        coll_p->coll_env, &ret, PINT_trove_dbpf_keyval_secondary_compare, 
+        (DB_DUP|DB_DUPSORT) );
+    /* TODO: add check to ensure BDB thinks secondary index is consistent */
+    if(coll_p->keyval_secondary_norm_db == NULL)
+    {
+        db_close(coll_p->keyval_secondary_db);
+        db_close(coll_p->keyval_db);
+        db_close(coll_p->coll_attr_db);
+        db_close(coll_p->ds_db);
+        dbpf_putdb_env(coll_p->coll_env, coll_p->path_name);
+        free(coll_p->path_name);
+        free(coll_p->name);
+        free(coll_p);
+        return ret;
+    }
+
+    /* associate the secondary index db with the primary and generate keys
+     * if it's empty */
+    ret = dbpf_db_associate(coll_p->keyval_db, coll_p->keyval_secondary_norm_db,
+          PINT_trove_dbpf_keyval_secondary_norm_callback,DB_CREATE);
+    if( ret != 0 )
+    {
+        db_close(coll_p->keyval_secondary_norm_db);
         db_close(coll_p->keyval_secondary_db);
         db_close(coll_p->keyval_db);
         db_close(coll_p->coll_attr_db);

Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.96.24.1 -r1.96.24.2
--- dbpf.h	14 Jul 2009 17:19:42 -0000	1.96.24.1
+++ dbpf.h	10 Aug 2009 15:30:09 -0000	1.96.24.2
@@ -143,12 +143,21 @@ do {                                    
 } while (0)
 
 #define KEYVAL_SECONDARY_DBNAME "keyval_secondary.db"
-#define DBPF_GET_KEYVAL_SECONDARY_DBNAME(__buf,__path_max,__stoname,__collid)      \
+#define DBPF_GET_KEYVAL_SECONDARY_DBNAME(__buf,__path_max,__stoname,     \
+__collid)                                                                \
 do {                                                                     \
   snprintf(__buf, __path_max, "/%s/%08x/%s", __stoname, __collid,        \
            KEYVAL_SECONDARY_DBNAME);                                     \
 } while (0)
 
+#define KEYVAL_SECONDARY_NORM_DBNAME "keyval_secondary_norm.db" /* file name of the normalized secondary keyval index db */
+#define DBPF_GET_KEYVAL_SECONDARY_NORM_DBNAME(__buf,__path_max,__stoname,\
+__collid)                                                                \
+do { /* writes "/<__stoname>/<__collid as 8 hex digits>/<db file name>" into __buf, at most __path_max bytes */ \
+  snprintf(__buf, __path_max, "/%s/%08x/%s", __stoname, __collid,        \
+           KEYVAL_SECONDARY_NORM_DBNAME);                                \
+} while (0)
+
 inline int dbpf_pread(int fd, void *buf, size_t count, off_t offset);
 inline int dbpf_pwrite(int fd, const void *buf, size_t count, off_t offset);
 
@@ -218,6 +227,7 @@ struct dbpf_collection
     DB *ds_db;
     DB *keyval_db;
     DB *keyval_secondary_db;
+    DB *keyval_secondary_norm_db;
     DB_ENV *coll_env;
     TROVE_coll_id coll_id;
     TROVE_handle root_dir_handle;
@@ -251,8 +261,12 @@ struct dbpf_collection_db_entry
 
 int PINT_trove_dbpf_keyval_compare(
     DB * dbp, const DBT * a, const DBT * b);
+int PINT_trove_dbpf_keyval_secondary_compare(
+    DB * dbp, const DBT * a, const DBT * b);
 int PINT_trove_dbpf_keyval_secondary_callback(
     DB *secondary, const DBT *pkey, const DBT *pdata, DBT *skey);
+int PINT_trove_dbpf_keyval_secondary_norm_callback(
+    DB *secondary_norm, const DBT *pkey, const DBT *pdata, DBT *skey);
 int PINT_trove_dbpf_ds_attr_compare(
     DB * dbp, const DBT * a, const DBT * b);
 int PINT_trove_dbpf_ds_attr_compare_reversed(
@@ -386,10 +400,15 @@ struct dbpf_keyval_iterate_keys_op
 
 struct dbpf_keyval_read_value_op
 {
-    PVFS_dirent *dirent;
     PVFS_ds_keyval *key;
     PVFS_ds_keyval *val;
+    PVFS_dirent *dirent_array;
+    PVFS_ds_keyval *key_array;
+    PVFS_ds_keyval *val_array;
+    uint32_t *count;
+    uint32_t *match_count;
     TROVE_ds_position *position_p;
+    uint32_t query_type;
     /* vtag? */
 };
 



More information about the Pvfs2-cvs mailing list