[PVFS2-CVS] commit by slang in pvfs2/src/io/trove/trove-dbpf: dbpf-attr-cache.c dbpf-keyval.c dbpf.h

CVS commit program cvs at parl.clemson.edu
Thu Aug 25 17:38:29 EDT 2005


Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb:/tmp/cvs-serv7520/src/io/trove/trove-dbpf

Modified Files:
      Tag: slang-event-changes-branch
	dbpf-attr-cache.c dbpf-keyval.c dbpf.h 
Log Message:
updates to my event changes to bring them in line with trunk


Index: dbpf-attr-cache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-attr-cache.c,v
diff -p -u -r1.15 -r1.15.6.1
--- dbpf-attr-cache.c	17 Sep 2004 21:15:42 -0000	1.15
+++ dbpf-attr-cache.c	25 Aug 2005 20:38:28 -0000	1.15.6.1
@@ -9,6 +9,7 @@
 #include "gossip.h"
 #include "dbpf-attr-cache.h"
 #include "gen-locks.h"
+#include "str-utils.h"
 
 /* these are based on code from src/server/request-scheduler.c */
 static int hash_key(void *key, int table_size);
@@ -19,9 +20,7 @@ static int s_cache_size = DBPF_ATTR_CACH
 static int s_max_num_cache_elems = 
 DBPF_ATTR_CACHE_DEFAULT_MAX_NUM_CACHE_ELEMS;
 static struct qhash_table *s_key_to_attr_table = NULL;
-static char *s_cacheable_keywords = NULL;
-static char *s_cacheable_keyword_array[
-    DBPF_ATTR_CACHE_MAX_NUM_KEYVALS] = {0};
+static char **s_cacheable_keyword_array = NULL;
 static int s_cacheable_keyword_array_size = 0;
 static int s_current_num_cache_elems = 0;
 
@@ -48,12 +47,9 @@ int dbpf_attr_cache_set_keywords(char *k
     gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "Setting dbpf_attr_cache "
                  "keywords to:\n%s\n", keywords);
 
-    if (s_cacheable_keywords)
-    {
-        free(s_cacheable_keywords);
-    }
-    s_cacheable_keywords = strdup(keywords);
-    return (s_cacheable_keywords ? 0 : -1);
+    s_cacheable_keyword_array_size = PINT_split_string_list(
+        &s_cacheable_keyword_array, keywords);
+    return (s_cacheable_keyword_array ? 0 : -1);
 }
 
 int dbpf_attr_cache_set_size(int cache_size)
@@ -76,47 +72,12 @@ int dbpf_attr_cache_set_max_num_elems(in
 */
 int dbpf_attr_cache_do_initialize(void)
 {
-    int ret = -1, num_keywords = 0;
-    char *ptr = NULL, *start = NULL, *end = NULL;
-    char *limit  = NULL, *tmp = NULL;
-
-    if (s_cacheable_keywords)
-    {
-        /* freed in finalize */
-        tmp = strdup(s_cacheable_keywords);
-        limit = (char *)(tmp + strlen(tmp));
-
-        /* break up keywords into an array here */
-        ptr = start = tmp;
-        for(; (ptr && (start != limit)); ptr++)
-        {
-            if ((*ptr == '\0') || (*ptr == ' ') || (*ptr == ','))
-            {
-                end = ptr;
-            }
-            if (start && end)
-            {
-                *end = '\0';
-                s_cacheable_keyword_array[num_keywords++] = start;
-
-                gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "Got cacheable "
-                             "attribute keyword %s\n",start);
-
-                start = ++end;
-                if (start >= limit)
-                {
-                    break;
-                }
-                end = NULL;
-            }
-        }
-    }
-
+    int ret = -1;
     gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "There are %d cacheable "
-                 "keywords registered\n", num_keywords);
+                 "keywords registered\n", s_cacheable_keyword_array_size);
     ret = dbpf_attr_cache_initialize(
         s_cache_size, s_max_num_cache_elems,
-        s_cacheable_keyword_array, num_keywords);
+        s_cacheable_keyword_array, s_cacheable_keyword_array_size);
 
     return ret;
 }
@@ -150,7 +111,6 @@ int dbpf_attr_cache_initialize(
                 goto return_error;
             }
 
-            s_cacheable_keyword_array_size = num_cacheable_keywords;
             /*
               NOTE: our keyword array must have already
               been built by the do_initialize call.
@@ -235,19 +195,11 @@ int dbpf_attr_cache_finalize(void)
                      "dbpf_attr_cache_finalized\n");
     }
 
-    if (s_cacheable_keywords)
+    if (s_cacheable_keyword_array && s_cacheable_keyword_array_size)
     {
-        free(s_cacheable_keywords);
-        s_cacheable_keywords = NULL;
-
-        /*
-          NOTE: this array was not allocated; it pointed
-          into s_cacheable_keywords string above
-        */
-        for(i = 0; i < s_cacheable_keyword_array_size; i++)
-        {
-            s_cacheable_keyword_array[i] = NULL;
-        }
+        PINT_free_string_list(s_cacheable_keyword_array,
+                              s_cacheable_keyword_array_size);
+        s_cacheable_keyword_array = NULL;
         s_cacheable_keyword_array_size = 0;
     }
     return ret;
@@ -564,7 +516,7 @@ int dbpf_attr_cache_insert(
                 memset(cache_elem, 0, sizeof(dbpf_attr_cache_elem_t));
             }
 
-            if (s_cacheable_keywords)
+            if (s_cacheable_keyword_array)
             {
                 /* initialize all of the keyvals we're able to cache */
                 for(i = 0; i < s_cacheable_keyword_array_size; i++)
@@ -622,7 +574,7 @@ int dbpf_attr_cache_remove(TROVE_object_
                 "removing %Lu\n", Lu(key.handle));
 
             /* free any keyval data cached as well */
-            if (s_cacheable_keywords)
+            if (s_cacheable_keyword_array)
             {
                 /* free all of the keyvals we've cached */
                 assert(s_cacheable_keyword_array_size ==

Index: dbpf-keyval.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval.c,v
diff -p -u -r1.48 -r1.48.6.1
--- dbpf-keyval.c	13 Jan 2005 17:47:21 -0000	1.48
+++ dbpf-keyval.c	25 Aug 2005 20:38:28 -0000	1.48.6.1
@@ -4,6 +4,17 @@
  * See COPYING in top-level directory.
  */
 
+/*
+ * dbpf_keyval_read semantics:
+ *
+ * Value buffer is supplied by user with size of buffer
+ * If size is too small DB returns an error (Cannot allocate memory)
+ * and returns the size of the buffer needed in val_p->read_sz
+ *
+ * WBL 6/05
+ *
+ */
+
 #include <unistd.h>
 #include <errno.h>
 #include <sys/types.h>
@@ -24,6 +35,7 @@
 static int dbpf_keyval_read_op_svc(struct dbpf_op *op_p);
 static int dbpf_keyval_read_list_op_svc(struct dbpf_op *op_p);
 static int dbpf_keyval_write_op_svc(struct dbpf_op *op_p);
+static int dbpf_keyval_write_list_op_svc(struct dbpf_op *op_p);
 static int dbpf_keyval_remove_op_svc(struct dbpf_op *op_p);
 static int dbpf_keyval_iterate_op_svc(struct dbpf_op *op_p);
 static int dbpf_keyval_flush_op_svc(struct dbpf_op *op_p);
@@ -87,8 +99,8 @@ static int dbpf_keyval_read(TROVE_coll_i
                         context_id);
 
     /* initialize the op-specific members */
-    q_op_p->op.u.k_read.key = *key_p;
-    q_op_p->op.u.k_read.val = *val_p;
+    q_op_p->op.u.k_read.key = key_p;
+    q_op_p->op.u.k_read.val = val_p;
 
     *out_op_id_p = dbpf_queued_op_queue(q_op_p);
 
@@ -114,12 +126,12 @@ static int dbpf_keyval_read_op_svc(struc
     }
 
     memset(&key, 0, sizeof(key));
-    key.data = op_p->u.k_read.key.buffer;
-    key.size = op_p->u.k_read.key.buffer_sz;
+    key.data = op_p->u.k_read.key->buffer;
+    key.size = op_p->u.k_read.key->buffer_sz;
 
     memset(&data, 0, sizeof(data));
-    data.data = op_p->u.k_read.val.buffer;
-    data.ulen = op_p->u.k_read.val.buffer_sz;
+    data.data = op_p->u.k_read.val->buffer;
+    data.ulen = op_p->u.k_read.val->buffer_sz;
     data.flags = DB_DBT_USERMEM;
 
     ret = tmp_ref.db_p->get(tmp_ref.db_p, NULL, &key, &data, 0);
@@ -130,15 +142,25 @@ static int dbpf_keyval_read_op_svc(struc
                      "key=%s (%s)\n", Lu(op_p->handle),
                      (char *)key.data, db_strerror(ret));
 
+        /* if the data buffer is too small, the get returns a memory error */
+        if (data.ulen < data.size)
+        {
+            gossip_debug(GOSSIP_TROVE_DEBUG,
+                    "warning: Value buffer too small %d < %d\n",
+                    data.ulen, data.size);
+            /* let the user know */
+            op_p->u.k_read.val->read_sz = data.size;
+        }
+
         ret = -dbpf_db_error_to_trove_error(ret);
         goto return_error;
     }
-    op_p->u.k_read.val.read_sz = data.size;
+    op_p->u.k_read.val->read_sz = data.size;
 
     /* cache this data in the attr cache if we can */
     if (dbpf_attr_cache_elem_set_data_based_on_key(
-            ref, op_p->u.k_read.key.buffer,
-            op_p->u.k_read.val.buffer, data.size))
+            ref, op_p->u.k_read.key->buffer,
+            op_p->u.k_read.val->buffer, data.size))
     {
         /*
           NOTE: this can happen if the keyword isn't registered, or if
@@ -146,14 +168,14 @@ static int dbpf_keyval_read_op_svc(struc
         */
         gossip_debug(
             GOSSIP_DBPF_ATTRCACHE_DEBUG,"** CANNOT cache data retrieved "
-            "(key is %s)\n", (char *)op_p->u.k_read.key.buffer);
+            "(key is %s)\n", (char *)op_p->u.k_read.key->buffer);
     }
     else
     {
         gossip_debug(
             GOSSIP_DBPF_ATTRCACHE_DEBUG,"*** cached keyval data "
             "retrieved (key is %s)\n",
-            (char *)op_p->u.k_read.key.buffer);
+            (char *)op_p->u.k_read.key->buffer);
     }
 
     dbpf_open_cache_put(&tmp_ref);
@@ -219,6 +241,7 @@ static int dbpf_keyval_write_op_svc(stru
     TROVE_object_ref ref = {op_p->handle, op_p->coll_p->coll_id};
     TROVE_size k_size;
     DB_BTREE_STAT *k_stat_p = NULL;
+    u_int32_t dbflags = 0;
 
     ret = dbpf_open_cache_get(
         op_p->coll_p->coll_id, op_p->handle, 1, DBPF_OPEN_DB, &tmp_ref);
@@ -238,7 +261,47 @@ static int dbpf_keyval_write_op_svc(stru
     data.data = op_p->u.k_write.val.buffer;
     data.size = op_p->u.k_write.val.buffer_sz;
 
-    ret = tmp_ref.db_p->put(tmp_ref.db_p, NULL, &key, &data, 0);
+    /* If TROVE_NOOVERWRITE flag was set, make sure that we don't create the
+     * key if it exists */
+    if ((op_p->flags & TROVE_NOOVERWRITE))
+    {
+        dbflags |= DB_NOOVERWRITE;
+    }
+    /* if TROVE_ONLYOVERWRITE flag was set, make sure that the key exists
+     * before overwriting it */
+    else if ((op_p->flags & TROVE_ONLYOVERWRITE))
+    {
+        DBT tmpdata;
+
+        memset(&tmpdata, 0, sizeof(tmpdata));
+        tmpdata.ulen = op_p->u.k_write.val.buffer_sz;
+        tmpdata.data = (void *) malloc(tmpdata.ulen);
+        tmpdata.flags = DB_DBT_USERMEM;
+        ret = tmp_ref.db_p->get(tmp_ref.db_p, NULL, &key, &tmpdata, 0);
+        /* A failed get implies that the key possibly does not exist */
+        if (ret != 0)
+        {
+            /* The only case where we are ok is when the val buffer
+             * is too small */
+            if (tmpdata.ulen < tmpdata.size)
+            {
+                ret = 0;
+            }
+            else
+            {
+                ret = -dbpf_db_error_to_trove_error(ret);
+            }
+        }
+        free(tmpdata.data);
+        /* If there was an error, we need to return right here */
+        if (ret != 0)
+        {
+            goto return_error;
+        }
+    }
+
+    ret = tmp_ref.db_p->put(tmp_ref.db_p, NULL, &key, &data, dbflags);
+    /* Either a put error or key already exists */
     if (ret != 0)
     {
         tmp_ref.db_p->err(tmp_ref.db_p, ret, "DB->put");
@@ -816,7 +879,176 @@ static int dbpf_keyval_write_list(TROVE_
                                   TROVE_context_id context_id,
                                   TROVE_op_id *out_op_id_p)
 {
-    return -TROVE_ENOSYS;
+    dbpf_queued_op_t *q_op_p = NULL;
+    struct dbpf_collection *coll_p = NULL;
+
+    coll_p = dbpf_collection_find_registered(coll_id);
+    if (coll_p == NULL)
+    {
+        return -TROVE_EINVAL;
+    }
+    q_op_p = dbpf_queued_op_alloc();
+    if (q_op_p == NULL)
+    {
+        return -TROVE_ENOMEM;
+    }
+
+    /* initialize all the common members */
+    dbpf_queued_op_init(q_op_p,
+                        KEYVAL_WRITE_LIST,
+                        handle,
+                        coll_p,
+                        dbpf_keyval_write_list_op_svc,
+                        user_ptr,
+                        flags,
+                        context_id);
+
+    /* initialize the op-specific members */
+    q_op_p->op.u.k_write_list.key_array = key_array;
+    q_op_p->op.u.k_write_list.val_array = val_array;
+    q_op_p->op.u.k_write_list.count = count;
+
+    *out_op_id_p = dbpf_queued_op_queue(q_op_p);
+
+    return 0;
+}
+
+static int dbpf_keyval_write_list_op_svc(struct dbpf_op *op_p)
+{
+    int ret = -TROVE_EINVAL, got_db = 0;
+    struct open_cache_ref tmp_ref;
+    DBT key, data;
+    dbpf_attr_cache_elem_t *cache_elem = NULL;
+    TROVE_object_ref ref = {op_p->handle, op_p->coll_p->coll_id};
+    TROVE_size k_size;
+    DB_BTREE_STAT *k_stat_p = NULL;
+    int k;
+
+    ret = dbpf_open_cache_get(
+        op_p->coll_p->coll_id, op_p->handle, 1, DBPF_OPEN_DB, &tmp_ref);
+    if (ret < 0)
+    {
+        goto return_error;
+    }
+    else
+    {
+        got_db = 1;
+    }
+
+    if ((op_p->flags & TROVE_NOOVERWRITE) 
+            || (op_p->flags & TROVE_ONLYOVERWRITE))
+    {
+        /* read each key to see if it is present */
+        for (k = 0; k < op_p->u.k_write_list.count; k++)
+        {
+            memset(&key, 0, sizeof(key));
+            memset(&data, 0, sizeof(data));
+            key.data = op_p->u.k_write_list.key_array[k].buffer;
+            key.size = op_p->u.k_write_list.key_array[k].buffer_sz;
+
+            ret = tmp_ref.db_p->get(tmp_ref.db_p, NULL, &key, &data, 0);
+            if (ret != 0)
+            {
+                if ((op_p->flags & TROVE_NOOVERWRITE) &&  (ret == DB_NOTFOUND))
+                {
+                    /* this means key is not in DB, which is what we
+                     * want for the no-overwrite case - so go to the next key
+                     */
+                    continue;
+                }
+                tmp_ref.db_p->err(tmp_ref.db_p, ret, "DB->get");
+                ret = -dbpf_db_error_to_trove_error(ret);
+                goto return_error;
+            }
+        }
+    }
+
+    for (k = 0; k < op_p->u.k_write_list.count; k++)
+    {
+        memset(&key, 0, sizeof(key));
+        memset(&data, 0, sizeof(data));
+        key.data = op_p->u.k_write_list.key_array[k].buffer;
+        key.size = op_p->u.k_write_list.key_array[k].buffer_sz;
+        data.data = op_p->u.k_write_list.val_array[k].buffer;
+        data.size = op_p->u.k_write_list.val_array[k].buffer_sz;
+
+        ret = tmp_ref.db_p->put(tmp_ref.db_p, NULL, &key, &data, 0);
+        if (ret != 0)
+        {
+            tmp_ref.db_p->err(tmp_ref.db_p, ret, "DB->put");
+            ret = -dbpf_db_error_to_trove_error(ret);
+            goto return_error;
+        }
+
+        gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "*** Trove KeyVal Write "
+                 "of %s\n", (char *)op_p->u.k_write_list.key_array[k].buffer);
+
+        /*
+         now that the data is written to disk, update the cache if it's
+         an attr keyval we manage.
+        */
+        cache_elem = dbpf_attr_cache_elem_lookup(ref);
+        if (cache_elem)
+        {
+            if (dbpf_attr_cache_elem_set_data_based_on_key(
+                    ref, op_p->u.k_write_list.key_array[k].buffer,
+                    op_p->u.k_write_list.val_array[k].buffer, data.size))
+            {
+                /*
+                NOTE: this can happen if the keyword isn't registered,
+                or if there is no associated cache_elem for this key
+                */
+                gossip_debug(
+                    GOSSIP_DBPF_ATTRCACHE_DEBUG,"** CANNOT cache data written "
+                    "(key is %s)\n", (char *)op_p->u.k_write_list.key_array[k].buffer);
+            }
+            else
+            {
+                gossip_debug(
+                    GOSSIP_DBPF_ATTRCACHE_DEBUG,"*** cached keyval data "
+                    "written (key is %s)\n",
+                    (char *)op_p->u.k_write_list.key_array[k].buffer);
+            }
+        }
+    }
+
+    /* this may have increased the number of keyvals stored in this
+     * object; update the k_size in the cache
+     */
+    ret = tmp_ref.db_p->stat(tmp_ref.db_p,
+#ifdef HAVE_TXNID_PARAMETER_TO_DB_STAT
+		        (DB_TXN *) NULL,
+#endif
+                         &k_stat_p,
+#ifdef HAVE_UNKNOWN_PARAMETER_TO_DB_STAT
+                          NULL,
+#endif 
+                           0);
+    if (ret == 0)
+    {
+        k_size = (TROVE_size)k_stat_p->bt_ndata;
+        free(k_stat_p);
+
+        dbpf_attr_cache_ds_attr_update_cached_data_ksize(ref, k_size);
+    }
+    else
+    {
+        tmp_ref.db_p->err(tmp_ref.db_p, ret, "DB->stat");
+        ret = -dbpf_db_error_to_trove_error(ret);
+        goto return_error;
+    }
+
+    DBPF_DB_SYNC_IF_NECESSARY(op_p, tmp_ref.db_p);
+
+    dbpf_open_cache_put(&tmp_ref);
+    return 1;
+
+ return_error:
+    if (got_db)
+    {
+        dbpf_open_cache_put(&tmp_ref);
+    }
+    return ret;
 }
 
 static int dbpf_keyval_flush(TROVE_coll_id coll_id,

Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.54.6.2 -r1.54.6.3
--- dbpf.h	7 Jun 2005 21:59:13 -0000	1.54.6.2
+++ dbpf.h	25 Aug 2005 20:38:28 -0000	1.54.6.3
@@ -214,8 +214,8 @@ struct dbpf_dspace_getattr_op
 
 struct dbpf_keyval_read_op
 {
-    TROVE_keyval_s key;
-    TROVE_keyval_s val;
+    TROVE_keyval_s *key;
+    TROVE_keyval_s *val;
     /* vtag? */
 };
 
@@ -233,6 +233,13 @@ struct dbpf_keyval_write_op
     /* vtag? */
 };
 
+struct dbpf_keyval_write_list_op
+{
+    TROVE_keyval_s *key_array;
+    TROVE_keyval_s *val_array;
+    int count; /* TODO: MAKE INOUT? */
+};
+
 struct dbpf_keyval_remove_op
 {
     TROVE_keyval_s key;
@@ -385,6 +392,7 @@ struct dbpf_op
         struct dbpf_keyval_remove_op k_remove;
         struct dbpf_keyval_iterate_op k_iterate;
         struct dbpf_keyval_read_list_op k_read_list;
+        struct dbpf_keyval_read_list_op k_write_list;
     } u;
 };
 



More information about the PVFS2-CVS mailing list