[PVFS2-CVS]
commit by slang in pvfs2/src/io/trove/trove-dbpf: dbpf-attr-cache.c
dbpf-keyval.c dbpf.h
CVS commit program
cvs at parl.clemson.edu
Thu Aug 25 17:38:29 EDT 2005
Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb:/tmp/cvs-serv7520/src/io/trove/trove-dbpf
Modified Files:
Tag: slang-event-changes-branch
dbpf-attr-cache.c dbpf-keyval.c dbpf.h
Log Message:
updates to my event changes to bring them inline with trunk
Index: dbpf-attr-cache.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-attr-cache.c,v
diff -p -u -r1.15 -r1.15.6.1
--- dbpf-attr-cache.c 17 Sep 2004 21:15:42 -0000 1.15
+++ dbpf-attr-cache.c 25 Aug 2005 20:38:28 -0000 1.15.6.1
@@ -9,6 +9,7 @@
#include "gossip.h"
#include "dbpf-attr-cache.h"
#include "gen-locks.h"
+#include "str-utils.h"
/* these are based on code from src/server/request-scheduler.c */
static int hash_key(void *key, int table_size);
@@ -19,9 +20,7 @@ static int s_cache_size = DBPF_ATTR_CACH
static int s_max_num_cache_elems =
DBPF_ATTR_CACHE_DEFAULT_MAX_NUM_CACHE_ELEMS;
static struct qhash_table *s_key_to_attr_table = NULL;
-static char *s_cacheable_keywords = NULL;
-static char *s_cacheable_keyword_array[
- DBPF_ATTR_CACHE_MAX_NUM_KEYVALS] = {0};
+static char **s_cacheable_keyword_array = NULL;
static int s_cacheable_keyword_array_size = 0;
static int s_current_num_cache_elems = 0;
@@ -48,12 +47,9 @@ int dbpf_attr_cache_set_keywords(char *k
gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "Setting dbpf_attr_cache "
"keywords to:\n%s\n", keywords);
- if (s_cacheable_keywords)
- {
- free(s_cacheable_keywords);
- }
- s_cacheable_keywords = strdup(keywords);
- return (s_cacheable_keywords ? 0 : -1);
+ s_cacheable_keyword_array_size = PINT_split_string_list(
+ &s_cacheable_keyword_array, keywords);
+ return (s_cacheable_keyword_array ? 0 : -1);
}
int dbpf_attr_cache_set_size(int cache_size)
@@ -76,47 +72,12 @@ int dbpf_attr_cache_set_max_num_elems(in
*/
int dbpf_attr_cache_do_initialize(void)
{
- int ret = -1, num_keywords = 0;
- char *ptr = NULL, *start = NULL, *end = NULL;
- char *limit = NULL, *tmp = NULL;
-
- if (s_cacheable_keywords)
- {
- /* freed in finalize */
- tmp = strdup(s_cacheable_keywords);
- limit = (char *)(tmp + strlen(tmp));
-
- /* break up keywords into an array here */
- ptr = start = tmp;
- for(; (ptr && (start != limit)); ptr++)
- {
- if ((*ptr == '\0') || (*ptr == ' ') || (*ptr == ','))
- {
- end = ptr;
- }
- if (start && end)
- {
- *end = '\0';
- s_cacheable_keyword_array[num_keywords++] = start;
-
- gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "Got cacheable "
- "attribute keyword %s\n",start);
-
- start = ++end;
- if (start >= limit)
- {
- break;
- }
- end = NULL;
- }
- }
- }
-
+ int ret = -1;
gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "There are %d cacheable "
- "keywords registered\n", num_keywords);
+ "keywords registered\n", s_cacheable_keyword_array_size);
ret = dbpf_attr_cache_initialize(
s_cache_size, s_max_num_cache_elems,
- s_cacheable_keyword_array, num_keywords);
+ s_cacheable_keyword_array, s_cacheable_keyword_array_size);
return ret;
}
@@ -150,7 +111,6 @@ int dbpf_attr_cache_initialize(
goto return_error;
}
- s_cacheable_keyword_array_size = num_cacheable_keywords;
/*
NOTE: our keyword array must have already
been built by the do_initialize call.
@@ -235,19 +195,11 @@ int dbpf_attr_cache_finalize(void)
"dbpf_attr_cache_finalized\n");
}
- if (s_cacheable_keywords)
+ if (s_cacheable_keyword_array && s_cacheable_keyword_array_size)
{
- free(s_cacheable_keywords);
- s_cacheable_keywords = NULL;
-
- /*
- NOTE: this array was not allocated; it pointed
- into s_cacheable_keywords string above
- */
- for(i = 0; i < s_cacheable_keyword_array_size; i++)
- {
- s_cacheable_keyword_array[i] = NULL;
- }
+ PINT_free_string_list(s_cacheable_keyword_array,
+ s_cacheable_keyword_array_size);
+ s_cacheable_keyword_array = NULL;
s_cacheable_keyword_array_size = 0;
}
return ret;
@@ -564,7 +516,7 @@ int dbpf_attr_cache_insert(
memset(cache_elem, 0, sizeof(dbpf_attr_cache_elem_t));
}
- if (s_cacheable_keywords)
+ if (s_cacheable_keyword_array)
{
/* initialize all of the keyvals we're able to cache */
for(i = 0; i < s_cacheable_keyword_array_size; i++)
@@ -622,7 +574,7 @@ int dbpf_attr_cache_remove(TROVE_object_
"removing %Lu\n", Lu(key.handle));
/* free any keyval data cached as well */
- if (s_cacheable_keywords)
+ if (s_cacheable_keyword_array)
{
/* free all of the keyvals we've cached */
assert(s_cacheable_keyword_array_size ==
Index: dbpf-keyval.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-keyval.c,v
diff -p -u -r1.48 -r1.48.6.1
--- dbpf-keyval.c 13 Jan 2005 17:47:21 -0000 1.48
+++ dbpf-keyval.c 25 Aug 2005 20:38:28 -0000 1.48.6.1
@@ -4,6 +4,17 @@
* See COPYING in top-level directory.
*/
+/*
+ * dbpf_keyval_write semantics:
+ *
+ * Value buffer is supplied by user with size of buffer
+ * If size is too small DB returns an error (Cannot allocated memory)
+ * and return the size of the buffer needed in val_p->read_sz
+ *
+ * WBL 6/05
+ *
+ */
+
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
@@ -24,6 +35,7 @@
static int dbpf_keyval_read_op_svc(struct dbpf_op *op_p);
static int dbpf_keyval_read_list_op_svc(struct dbpf_op *op_p);
static int dbpf_keyval_write_op_svc(struct dbpf_op *op_p);
+static int dbpf_keyval_write_list_op_svc(struct dbpf_op *op_p);
static int dbpf_keyval_remove_op_svc(struct dbpf_op *op_p);
static int dbpf_keyval_iterate_op_svc(struct dbpf_op *op_p);
static int dbpf_keyval_flush_op_svc(struct dbpf_op *op_p);
@@ -87,8 +99,8 @@ static int dbpf_keyval_read(TROVE_coll_i
context_id);
/* initialize the op-specific members */
- q_op_p->op.u.k_read.key = *key_p;
- q_op_p->op.u.k_read.val = *val_p;
+ q_op_p->op.u.k_read.key = key_p;
+ q_op_p->op.u.k_read.val = val_p;
*out_op_id_p = dbpf_queued_op_queue(q_op_p);
@@ -114,12 +126,12 @@ static int dbpf_keyval_read_op_svc(struc
}
memset(&key, 0, sizeof(key));
- key.data = op_p->u.k_read.key.buffer;
- key.size = op_p->u.k_read.key.buffer_sz;
+ key.data = op_p->u.k_read.key->buffer;
+ key.size = op_p->u.k_read.key->buffer_sz;
memset(&data, 0, sizeof(data));
- data.data = op_p->u.k_read.val.buffer;
- data.ulen = op_p->u.k_read.val.buffer_sz;
+ data.data = op_p->u.k_read.val->buffer;
+ data.ulen = op_p->u.k_read.val->buffer_sz;
data.flags = DB_DBT_USERMEM;
ret = tmp_ref.db_p->get(tmp_ref.db_p, NULL, &key, &data, 0);
@@ -130,15 +142,25 @@ static int dbpf_keyval_read_op_svc(struc
"key=%s (%s)\n", Lu(op_p->handle),
(char *)key.data, db_strerror(ret));
+ /* if data buffer is too small returns a memory error */
+ if (data.ulen < data.size)
+ {
+ gossip_debug(GOSSIP_TROVE_DEBUG,
+ "warning: Value buffer too small %d < %d\n",
+ data.ulen, data.size);
+ /* let the user know */
+ op_p->u.k_read.val->read_sz = data.size;
+ }
+
ret = -dbpf_db_error_to_trove_error(ret);
goto return_error;
}
- op_p->u.k_read.val.read_sz = data.size;
+ op_p->u.k_read.val->read_sz = data.size;
/* cache this data in the attr cache if we can */
if (dbpf_attr_cache_elem_set_data_based_on_key(
- ref, op_p->u.k_read.key.buffer,
- op_p->u.k_read.val.buffer, data.size))
+ ref, op_p->u.k_read.key->buffer,
+ op_p->u.k_read.val->buffer, data.size))
{
/*
NOTE: this can happen if the keyword isn't registered, or if
@@ -146,14 +168,14 @@ static int dbpf_keyval_read_op_svc(struc
*/
gossip_debug(
GOSSIP_DBPF_ATTRCACHE_DEBUG,"** CANNOT cache data retrieved "
- "(key is %s)\n", (char *)op_p->u.k_read.key.buffer);
+ "(key is %s)\n", (char *)op_p->u.k_read.key->buffer);
}
else
{
gossip_debug(
GOSSIP_DBPF_ATTRCACHE_DEBUG,"*** cached keyval data "
"retrieved (key is %s)\n",
- (char *)op_p->u.k_read.key.buffer);
+ (char *)op_p->u.k_read.key->buffer);
}
dbpf_open_cache_put(&tmp_ref);
@@ -219,6 +241,7 @@ static int dbpf_keyval_write_op_svc(stru
TROVE_object_ref ref = {op_p->handle, op_p->coll_p->coll_id};
TROVE_size k_size;
DB_BTREE_STAT *k_stat_p = NULL;
+ u_int32_t dbflags = 0;
ret = dbpf_open_cache_get(
op_p->coll_p->coll_id, op_p->handle, 1, DBPF_OPEN_DB, &tmp_ref);
@@ -238,7 +261,47 @@ static int dbpf_keyval_write_op_svc(stru
data.data = op_p->u.k_write.val.buffer;
data.size = op_p->u.k_write.val.buffer_sz;
- ret = tmp_ref.db_p->put(tmp_ref.db_p, NULL, &key, &data, 0);
+ /* If TROVE_NOOVERWRITE flag was set, make sure that we don't create the
+ * key if it exists */
+ if ((op_p->flags & TROVE_NOOVERWRITE))
+ {
+ dbflags |= DB_NOOVERWRITE;
+ }
+ /* if TROVE_ONLYOVERWRITE flag was set, make sure that the key exists
+ * before overwriting it */
+ else if ((op_p->flags & TROVE_ONLYOVERWRITE))
+ {
+ DBT tmpdata;
+
+ memset(&tmpdata, 0, sizeof(tmpdata));
+ tmpdata.ulen = op_p->u.k_write.val.buffer_sz;
+ tmpdata.data = (void *) malloc(tmpdata.ulen);
+ tmpdata.flags = DB_DBT_USERMEM;
+ ret = tmp_ref.db_p->get(tmp_ref.db_p, NULL, &key, &tmpdata, 0);
+ /* A failed get implies that keys possibly did not exist */
+ if (ret != 0)
+ {
+ /* The only case where we are ok is val buffer
+ * is too small */
+ if (tmpdata.ulen < tmpdata.size)
+ {
+ ret = 0;
+ }
+ else
+ {
+ ret = -dbpf_db_error_to_trove_error(ret);
+ }
+ }
+ free(tmpdata.data);
+ /* If there was an error, we need to return right here */
+ if (ret != 0)
+ {
+ goto return_error;
+ }
+ }
+
+ ret = tmp_ref.db_p->put(tmp_ref.db_p, NULL, &key, &data, dbflags);
+ /* Either a put error or key already exists */
if (ret != 0)
{
tmp_ref.db_p->err(tmp_ref.db_p, ret, "DB->put");
@@ -816,7 +879,176 @@ static int dbpf_keyval_write_list(TROVE_
TROVE_context_id context_id,
TROVE_op_id *out_op_id_p)
{
- return -TROVE_ENOSYS;
+ dbpf_queued_op_t *q_op_p = NULL;
+ struct dbpf_collection *coll_p = NULL;
+
+ coll_p = dbpf_collection_find_registered(coll_id);
+ if (coll_p == NULL)
+ {
+ return -TROVE_EINVAL;
+ }
+ q_op_p = dbpf_queued_op_alloc();
+ if (q_op_p == NULL)
+ {
+ return -TROVE_ENOMEM;
+ }
+
+ /* initialize all the common members */
+ dbpf_queued_op_init(q_op_p,
+ KEYVAL_WRITE_LIST,
+ handle,
+ coll_p,
+ dbpf_keyval_write_list_op_svc,
+ user_ptr,
+ flags,
+ context_id);
+
+ /* initialize the op-specific members */
+ q_op_p->op.u.k_write_list.key_array = key_array;
+ q_op_p->op.u.k_write_list.val_array = val_array;
+ q_op_p->op.u.k_write_list.count = count;
+
+ *out_op_id_p = dbpf_queued_op_queue(q_op_p);
+
+ return 0;
+}
+
+static int dbpf_keyval_write_list_op_svc(struct dbpf_op *op_p)
+{
+ int ret = -TROVE_EINVAL, got_db = 0;
+ struct open_cache_ref tmp_ref;
+ DBT key, data;
+ dbpf_attr_cache_elem_t *cache_elem = NULL;
+ TROVE_object_ref ref = {op_p->handle, op_p->coll_p->coll_id};
+ TROVE_size k_size;
+ DB_BTREE_STAT *k_stat_p = NULL;
+ int k;
+
+ ret = dbpf_open_cache_get(
+ op_p->coll_p->coll_id, op_p->handle, 1, DBPF_OPEN_DB, &tmp_ref);
+ if (ret < 0)
+ {
+ goto return_error;
+ }
+ else
+ {
+ got_db = 1;
+ }
+
+ if ((op_p->flags & TROVE_NOOVERWRITE)
+ || (op_p->flags & TROVE_ONLYOVERWRITE))
+ {
+ /* read each key to see if it is present */
+ for (k = 0; k < op_p->u.k_write_list.count; k++)
+ {
+ memset(&key, 0, sizeof(key));
+ memset(&data, 0, sizeof(data));
+ key.data = op_p->u.k_write_list.key_array[k].buffer;
+ key.size = op_p->u.k_write_list.key_array[k].buffer_sz;
+
+ ret = tmp_ref.db_p->get(tmp_ref.db_p, NULL, &key, &data, 0);
+ if (ret != 0)
+ {
+ if ((op_p->flags & TROVE_NOOVERWRITE) && (ret == DB_NOTFOUND))
+ {
+ /* this means key is not in DB, which is what we
+ * want for the no-overwrite case - so go to the next key
+ */
+ continue;
+ }
+ tmp_ref.db_p->err(tmp_ref.db_p, ret, "DB->get");
+ ret = -dbpf_db_error_to_trove_error(ret);
+ goto return_error;
+ }
+ }
+ }
+
+ for (k = 0; k < op_p->u.k_write_list.count; k++)
+ {
+ memset(&key, 0, sizeof(key));
+ memset(&data, 0, sizeof(data));
+ key.data = op_p->u.k_write_list.key_array[k].buffer;
+ key.size = op_p->u.k_write_list.key_array[k].buffer_sz;
+ data.data = op_p->u.k_write_list.val_array[k].buffer;
+ data.size = op_p->u.k_write_list.val_array[k].buffer_sz;
+
+ ret = tmp_ref.db_p->put(tmp_ref.db_p, NULL, &key, &data, 0);
+ if (ret != 0)
+ {
+ tmp_ref.db_p->err(tmp_ref.db_p, ret, "DB->put");
+ ret = -dbpf_db_error_to_trove_error(ret);
+ goto return_error;
+ }
+
+ gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "*** Trove KeyVal Write "
+ "of %s\n", (char *)op_p->u.k_write_list.key_array[k].buffer);
+
+ /*
+ now that the data is written to disk, update the cache if it's
+ an attr keyval we manage.
+ */
+ cache_elem = dbpf_attr_cache_elem_lookup(ref);
+ if (cache_elem)
+ {
+ if (dbpf_attr_cache_elem_set_data_based_on_key(
+ ref, op_p->u.k_write_list.key_array[k].buffer,
+ op_p->u.k_write_list.val_array[k].buffer, data.size))
+ {
+ /*
+ NOTE: this can happen if the keyword isn't registered,
+ or if there is no associated cache_elem for this key
+ */
+ gossip_debug(
+ GOSSIP_DBPF_ATTRCACHE_DEBUG,"** CANNOT cache data written "
+ "(key is %s)\n", (char *)op_p->u.k_write_list.key_array[k].buffer);
+ }
+ else
+ {
+ gossip_debug(
+ GOSSIP_DBPF_ATTRCACHE_DEBUG,"*** cached keyval data "
+ "written (key is %s)\n",
+ (char *)op_p->u.k_write_list.key_array[k].buffer);
+ }
+ }
+ }
+
+ /* this may have increased the number of keyvals stored in this
+ * object; update the k_size in the cache
+ */
+ ret = tmp_ref.db_p->stat(tmp_ref.db_p,
+#ifdef HAVE_TXNID_PARAMETER_TO_DB_STAT
+ (DB_TXN *) NULL,
+#endif
+ &k_stat_p,
+#ifdef HAVE_UNKNOWN_PARAMETER_TO_DB_STAT
+ NULL,
+#endif
+ 0);
+ if (ret == 0)
+ {
+ k_size = (TROVE_size)k_stat_p->bt_ndata;
+ free(k_stat_p);
+
+ dbpf_attr_cache_ds_attr_update_cached_data_ksize(ref, k_size);
+ }
+ else
+ {
+ tmp_ref.db_p->err(tmp_ref.db_p, ret, "DB->stat");
+ ret = -dbpf_db_error_to_trove_error(ret);
+ goto return_error;
+ }
+
+ DBPF_DB_SYNC_IF_NECESSARY(op_p, tmp_ref.db_p);
+
+ dbpf_open_cache_put(&tmp_ref);
+ return 1;
+
+ return_error:
+ if (got_db)
+ {
+ dbpf_open_cache_put(&tmp_ref);
+ }
+ return ret;
}
static int dbpf_keyval_flush(TROVE_coll_id coll_id,
Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.54.6.2 -r1.54.6.3
--- dbpf.h 7 Jun 2005 21:59:13 -0000 1.54.6.2
+++ dbpf.h 25 Aug 2005 20:38:28 -0000 1.54.6.3
@@ -214,8 +214,8 @@ struct dbpf_dspace_getattr_op
struct dbpf_keyval_read_op
{
- TROVE_keyval_s key;
- TROVE_keyval_s val;
+ TROVE_keyval_s *key;
+ TROVE_keyval_s *val;
/* vtag? */
};
@@ -233,6 +233,13 @@ struct dbpf_keyval_write_op
/* vtag? */
};
+struct dbpf_keyval_write_list_op
+{
+ TROVE_keyval_s *key_array;
+ TROVE_keyval_s *val_array;
+ int count; /* TODO: MAKE INOUT? */
+};
+
struct dbpf_keyval_remove_op
{
TROVE_keyval_s key;
@@ -385,6 +392,7 @@ struct dbpf_op
struct dbpf_keyval_remove_op k_remove;
struct dbpf_keyval_iterate_op k_iterate;
struct dbpf_keyval_read_list_op k_read_list;
+ struct dbpf_keyval_read_list_op k_write_list;
} u;
};
More information about the PVFS2-CVS
mailing list