NOTE(review): this patch was recovered from a whitespace-collapsed copy.  The
targets of the bare "#include" lines were already missing and are left as
found; indentation and blank context lines are a best-effort reconstruction,
so verify the hunks against the real tree before applying.

diff -urN --exclude=.svn ../../2.6-old/pvfs2_src/src/io/trove/trove-dbpf/dbpf-open-cache.c ./src/io/trove/trove-dbpf/dbpf-open-cache.c
--- ../../2.6-old/pvfs2_src/src/io/trove/trove-dbpf/dbpf-open-cache.c 2008-10-06 16:29:29.000000000 -0500
+++ ./src/io/trove/trove-dbpf/dbpf-open-cache.c 2008-10-13 15:38:40.000000000 -0500
@@ -18,6 +18,7 @@
 #include
 #include
 #include
+#include
 #include
 
 #include "trove.h"
@@ -42,6 +43,27 @@
     struct qlist_head queue_link;
 };
 
+struct unlink_context
+{
+    pthread_t thread_id;            /* id stored by pthread_create() */
+    pthread_mutex_t mutex;          /* held while touching global_list */
+    pthread_cond_t data_available;  /* signalled by fast_unlink() after queuing */
+    struct qlist_head global_list;  /* queue of struct file_struct items */
+};
+
+struct file_struct
+{
+    struct qlist_head list_link;    /* link in dbpf_unlink_context.global_list */
+    char *pathname;                 /* malloc'd path; freed by unlink_bstream() */
+};
+
+static struct unlink_context dbpf_unlink_context;
+static void* unlink_bstream(void *context);
+static int fast_unlink(
+    const char *pathname,
+    TROVE_coll_id coll_id,
+    TROVE_handle handle);
+
 /* "used_list" is for active objects (ref_ct > 0) */
 static QLIST_HEAD(used_list);
 /* "unused_list" is for inactive objects (ref_ct == 0) that we are still
@@ -69,7 +91,7 @@
 
 void dbpf_open_cache_initialize(void)
 {
-    int i = 0;
+    int i = 0, ret = 0;
 
     gen_mutex_lock(&cache_mutex);
 
@@ -88,6 +110,17 @@
     }
 
     gen_mutex_unlock(&cache_mutex);
+
+    /* Initialize and create the worker thread for threaded deletes */
+    INIT_QLIST_HEAD(&dbpf_unlink_context.global_list);
+    pthread_mutex_init(&dbpf_unlink_context.mutex, NULL);
+    pthread_cond_init(&dbpf_unlink_context.data_available, NULL);
+    ret = pthread_create(&dbpf_unlink_context.thread_id, NULL, unlink_bstream, (void*)&dbpf_unlink_context);
+    if(ret)
+    {
+        gossip_err("dbpf_open_cache_initialize: failed [%d]\n", ret);
+        return;
+    }
 }
 
 static void dbpf_open_cache_entries_finalize(
@@ -104,6 +137,8 @@
     dbpf_open_cache_entries_finalize(&free_list);
 
     gen_mutex_unlock(&cache_mutex);
+
+    pthread_cancel(dbpf_unlink_context.thread_id); /* NOTE(review): runs even when pthread_create() failed in dbpf_open_cache_initialize(); confirm thread_id is valid here */
 }
 
 /**
@@ -321,8 +356,8 @@
     int found = 0;
     char filename[PATH_MAX];
     int ret = -1;
-    struct qlist_head* scratch;
     int tmp_error = 0;
+    struct qlist_head* scratch;
 
     gossip_debug(GOSSIP_DBPF_OPEN_CACHE_DEBUG,
                  "dbpf_open_cache_remove: called\n");
@@ -382,15 +417,13 @@
 
     DBPF_GET_BSTREAM_FILENAME(filename, PATH_MAX,
                               my_storage_p->name, coll_id, llu(handle));
-    ret = DBPF_UNLINK(filename);
+    ret = fast_unlink(filename, coll_id, handle);
+
     if ((ret != 0) && (errno != ENOENT))
     {
-        tmp_error = -trove_errno_to_trove_error(errno);
+        tmp_error = -trove_errno_to_trove_error(errno);
     }
 
-    gossip_debug(GOSSIP_DBPF_OPEN_CACHE_DEBUG, "Unlinked filename: "
-                 "(ret=%d, errno=%d)\n%s\n", ret, errno, filename);
-
     gen_mutex_unlock(&cache_mutex);
 
     gossip_debug(GOSSIP_DBPF_OPEN_CACHE_DEBUG,
@@ -446,6 +479,8 @@
         }
         qlist_del(&entry->queue_link);
     }
+    /* Cancel the deletion thread */
+    pthread_cancel(dbpf_unlink_context.thread_id); /* NOTE(review): another hunk of this patch cancels the same thread id; confirm both calls are intended */
 }
 
 inline static struct open_cache_entry * dbpf_open_cache_find_entry(
@@ -475,6 +510,127 @@
     return NULL;
 }
 
+/*
+ * fast_unlink()
+ *
+ * Renames the bstream file at pathname to the name produced by
+ * DBPF_GET_STRANDED_BSTREAM_FILENAME for (coll_id, handle) and queues that
+ * new name on dbpf_unlink_context so the unlink_bstream() thread performs
+ * the actual DBPF_UNLINK.
+ *
+ * Returns 0 once the file is renamed and queued, -TROVE_ENOMEM if a queue
+ * item cannot be allocated, or the non-zero result of rename() with errno
+ * left as rename() set it (the caller inspects errno).
+ */
+static int fast_unlink(const char *pathname, TROVE_coll_id coll_id, TROVE_handle handle)
+{
+    int ret;
+    int saved_errno;
+    struct file_struct *tmp_item;
+
+    tmp_item = (struct file_struct *) malloc(sizeof(struct file_struct));
+    if(!tmp_item)
+    {
+        gossip_err("Unable to allocate memory for file_struct [%d].\n", errno);
+        return -TROVE_ENOMEM;
+    }
+    tmp_item->pathname = malloc(PATH_MAX);
+    if(!tmp_item->pathname)
+    {
+        gossip_err("Unable to allocate memory for pathname[%d].\n", errno);
+        free(tmp_item);
+        return -TROVE_ENOMEM;
+    }
+    DBPF_GET_STRANDED_BSTREAM_FILENAME(tmp_item->pathname, PATH_MAX,
+                                       my_storage_p->name,
+                                       coll_id,
+                                       llu(handle));
+
+    gossip_debug(GOSSIP_DBPF_OPEN_CACHE_DEBUG,
+                 "Renaming [%s] to [%s] for threaded delete.\n", pathname, tmp_item->pathname);
+
+    ret = rename(pathname, tmp_item->pathname);
+    if(ret != 0)
+    {
+        /* logging and free() may overwrite errno; keep rename()'s value */
+        saved_errno = errno;
+        gossip_debug(GOSSIP_DBPF_OPEN_CACHE_DEBUG,
+                     "Warning: During unlink, the rename failed on file [%s] with errno [%d] strerr [%s].\n",
+                     pathname, saved_errno, strerror(saved_errno));
+        free(tmp_item->pathname);
+        free(tmp_item);
+        errno = saved_errno;
+        return ret;
+    }
+
+    /* Add to the queue */
+    pthread_mutex_lock(&dbpf_unlink_context.mutex);
+    qlist_add_tail(&tmp_item->list_link, &dbpf_unlink_context.global_list);
+    /* Log before unlocking: once the mutex is released the worker thread
+     * may dequeue and free tmp_item, so it must not be touched afterwards.
+     */
+    gossip_debug(GOSSIP_DBPF_OPEN_CACHE_DEBUG,
+                 "Added [%s] to the queue.\n", tmp_item->pathname);
+    pthread_cond_signal(&dbpf_unlink_context.data_available);
+    pthread_mutex_unlock(&dbpf_unlink_context.mutex);
+
+    return(0);
+}
+
+/*
+ * unlink_bstream()
+ *
+ * Thread body started by dbpf_open_cache_initialize().  Loops forever:
+ * waits on data_available when global_list is empty, removes the first
+ * queued file_struct, calls DBPF_UNLINK on its pathname and frees it.
+ * context is the struct unlink_context passed to pthread_create().
+ */
+static void* unlink_bstream(void *context)
+{
+    struct unlink_context *loc_context = (struct unlink_context *) context;
+    int ret;
+    time_t start_time;
+    struct qlist_head *tmp_item;
+    struct file_struct *tmp_st;
+
+    while(1)
+    {
+        pthread_mutex_lock(&loc_context->mutex);
+        /* If there is no work to do, go into a condition wait */
+        if(qlist_empty(&loc_context->global_list))
+        {
+            pthread_cond_wait(&loc_context->data_available, &loc_context->mutex);
+        }
+
+        if(!qlist_empty(&loc_context->global_list))
+        {
+            tmp_item = loc_context->global_list.next;
+            qlist_del(tmp_item);
+            pthread_mutex_unlock(&loc_context->mutex);
+        }
+        else /* Condition triggered without items in qlist */
+        {
+            pthread_mutex_unlock(&loc_context->mutex);
+            gossip_debug(GOSSIP_DBPF_OPEN_CACHE_DEBUG,
+                         "Unlink condition triggered when qlist empty\n");
+            continue; /* Enter while loop again */
+        }
+
+        tmp_st = qlist_entry(tmp_item, struct file_struct, list_link);
+        time(&start_time);
+        ret = DBPF_UNLINK(tmp_st->pathname);
+        gossip_debug(GOSSIP_DBPF_OPEN_CACHE_DEBUG,
+                     "Unlinked filename: (ret=%d, errno=%d, elapsed-time=%ld(secs) )\n%s\n",
+                     ret, errno, (long)(time(NULL) - start_time), tmp_st->pathname);
+        free(tmp_st->pathname);
+        free(tmp_st);
+    }
+
+    pthread_exit(&loc_context->thread_id);
+    return NULL;
+}
+
+
 /*
  * Local variables:
  * c-indent-level: 4