[Pvfs2-cvs] commit by rzhong in pvfs2/src/server: db-rep-send.sm

CVS commit program cvs at parl.clemson.edu
Thu Jul 24 17:29:44 EDT 2008


Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv24255/src/server

Added Files:
      Tag: rongrong
	db-rep-send.sm 
Log Message:
db-rep-send state machine added


--- /dev/null	2004-06-24 14:04:38.000000000 -0400
+++ db-rep-send.sm	2008-07-24 17:29:44.000000000 -0400
@@ -0,0 +1,290 @@
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-debug.h"
+#include "pvfs2-types.h"
+#include "job.h"
+#include "gossip.h"
+#include "str-utils.h"
+#include "pint-cached-config.h"
+#include "pint-util.h"
+#include "pvfs2-util.h"
+#include "PINT-reqproto-encode.h"
+#include "pvfs2-internal.h"
+#include "db.h"
+
+//extern job_context_id server_job_context;
+extern struct server_configuration_s server_config;
+
+typedef struct PINT_db_rep_send_sm
+{
+    PVFS_fs_id fs_id;
+
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+
+    PVFS_error error_code;
+    int comp_ct;
+
+    PINT_sm_msgarray_op msgarray_op;
+
+    PVFS_object_ref object_ref;
+    PVFS_object_ref parent_ref;
+
+    DB_ENV *dbenv;
+    const DBT *control;
+    const DBT *rec;
+    const DB_LSN *lsnp;
+    int eid;
+    u_int32_t flags;
+    PVFS_BMI_addr_t *server_addrs;
+    int site_count;
+}PINT_db_rep_send_sm;
+
+static int db_rep_send_state_machine_terminate(struct PINT_smcb *smcb, job_status_s *js_p);
+static struct PINT_state_machine_s *db_rep_send_state_get_machine(int op);
+
+%%
+
+machine pvfs2_db_rep_send_sm
+{
+    state setup_msgpairarray
+    {
+	run db_rep_setup_msgpairarray;
+	success => xfer_msgpairarray;
+        default => cleanup;
+    }
+
+    state xfer_msgpairarray
+    {
+	jump pvfs2_msgpairarray_sm;
+        default => cleanup;
+    }
+
+    state cleanup
+    {
+	run db_rep_cleanup;
+        default => terminate;
+    }
+}
+
+%%
+
+struct PINT_state_machine_s *db_rep_send_state_get_machine(int op)
+{
+    return &pvfs2_db_rep_send_sm;
+}
+
+int db_rep_send_state_machine_terminate(
+        struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    PINT_db_rep_send_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    pthread_mutex_lock(&sm_p->mutex);
+    pthread_cond_signal(&sm_p->cond);
+    pthread_mutex_unlock(&sm_p->mutex);
+    return 0;
+}
+
+/*The function has a "i" but it's actually blocking. I can't come up with a name... */
+PVFS_error PVFS_idb_rep_send(
+    DB_ENV *dbenv,
+    const DBT *control,
+    const DBT *rec,
+    const DB_LSN *lsnp,
+    int eid,
+    u_int32_t flags)
+{
+    PINT_smcb *smcb = NULL;
+    PINT_db_rep_send_sm *sm_p = NULL;
+    //job_context_id db_rep_send_context;
+    PINT_sm_msgpair_params *mpp;
+    server_configuration_s *server_config;
+    PINT_sm_action sm_ret;
+    job_status_s js;
+    //PVFS_error ret;
+
+    gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_idb_rep_send entered\n");
+    /*ret = job_open_context(&db_rep_send_context);
+    if(ret)
+    {
+	gossip_err("PVFS_idb_rep_send: can't open new job context\n");
+	return ret;
+	}*/
+
+    PINT_smcb_alloc(&smcb, 0, 
+		    sizeof(struct PINT_db_rep_send_sm), 
+		    db_rep_send_state_get_machine,
+		    db_rep_send_state_machine_terminate,
+		    server_job_context);
+    if(smcb == NULL)
+    {
+	return -PVFS_ENOMEM;
+    }
+
+    sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+		    
+    /*init msgarray params*/
+    mpp = &sm_p->msgarray_op.params;
+    mpp->job_context = server_job_context;
+    mpp->job_timeout = PVFS2_CLIENT_JOB_BMI_TIMEOUT_DEFAULT;
+    mpp->retry_limit = PVFS2_CLIENT_RETRY_LIMIT_DEFAULT;
+    mpp->retry_delay = PVFS2_CLIENT_RETRY_DELAY_MS_DEFAULT;
+
+
+    /*fill the sm*/
+    pthread_mutex_init(&sm_p->mutex, NULL);
+    pthread_cond_init(&sm_p->cond, NULL);
+    sm_p->dbenv = dbenv;
+    sm_p->control = control;
+    sm_p->rec = rec;
+    sm_p->lsnp = lsnp;
+    sm_p->eid = eid;
+    sm_p->flags = flags;
+
+    /*get fs_id and the list of BMI addrs*/
+    sm_p->server_addrs = 
+	(PVFS_BMI_addr_t *)malloc(sizeof(PVFS_BMI_addr_t) * 4/*fixed number for test*/);
+    memset(&sm_p->server_addrs, 0, sizeof(PVFS_BMI_addr_t) * 4);
+    server_config = get_server_config_struct();
+    if(server_config)
+    {
+	int i = 0;
+	PINT_llist *alias;
+	struct filesystem_configuration_s *fs_conf = NULL;
+	struct host_alias_s *cur_alias = NULL;
+	struct host_alias_s *rep_group = NULL;
+	/*get fs_id, BASED ON THE ASSUMPTION THAT THERE'S ONLY *ONE* FS IN SERVER_CONFIG*/
+	fs_conf = PINT_llist_head(server_config->file_systems);
+	sm_p->fs_id = fs_conf->coll_id;
+	/*get the list of BMI addrs*/
+	/* !!! the implementation need to be changed according to the format of the conf file !!! */
+	alias = server_config->host_aliases;
+	rep_group = PINT_llist_head(server_config->rep_groups);
+	while(alias)
+	{
+	    cur_alias = PINT_llist_head(alias);
+	    if(!cur_alias)
+	    {
+		break;
+	    }
+	    assert(cur_alias->host_alias);
+	    assert(cur_alias->bmi_address);
+	    if(!strcmp(rep_group->host_alias, cur_alias->host_alias) || 
+	       !strcmp(rep_group->bmi_address, cur_alias->host_alias))
+	    {
+		assert(BMI_addr_lookup(&sm_p->server_addrs[i++], cur_alias->bmi_address));
+		gossip_debug(GOSSIP_DB_REP_DEBUG, "BMI address: %s, BMI_addr_t: %lx\n", 
+			     cur_alias->bmi_address, sm_p->server_addrs[i-1]);
+	    }
+	    alias = PINT_llist_next(alias);
+	}
+	sm_p->site_count = i;
+	gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_idb_rep_send site_count = %d\n", sm_p->site_count);
+    }
+
+    sm_p->error_code = 0;
+    memset(&js, 0, sizeof(js));
+    sm_ret = PINT_state_machine_start(smcb, &js);
+    assert(SM_ACTION_ISVALID(sm_ret));
+    if(PINT_smcb_complete(smcb))
+    {
+	assert(sm_ret == SM_ACTION_TERMINATE);
+	PINT_smcb_free(smcb);
+	gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_idb_rep_send sm terminated\n");
+    }
+    else
+    {
+	pthread_mutex_lock(&sm_p->mutex);
+	pthread_cond_wait(&sm_p->cond, &sm_p->mutex);
+	assert(PINT_smcb_complete(smcb));
+	pthread_mutex_unlock(&sm_p->mutex);
+	gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_idb_rep_send sm terminated after wait\n");
+    }
+    return js.error_code;
+}
+
+int PVFS_db_rep_send(
+    DB_ENV *dbenv, 
+    const DBT *control, 
+    const DBT *rec,
+    const DB_LSN *lsnp,
+    int eid,
+    u_int32_t flags)
+{
+    int ret;
+
+    gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_db_rep_send entered\n");
+    ret = PVFS_idb_rep_send(dbenv, control, rec, lsnp, eid, flags);
+    if(ret)
+    {
+	PVFS_perror_gossip("PVFS_idb_rep_send call", ret);
+    }
+    return ret;
+}
+
+static PINT_sm_action db_rep_setup_msgpairarray(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_db_rep_send_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int count = 0;
+    int ret = -PVFS_EINVAL;
+    PINT_sm_msgpair_state *msg_p;
+    PVFS_ds_keyval control, rec;
+    int index;
+
+    js_p->error_code = 0;
+
+    if(sm_p->eid == DB_EID_BROADCAST)
+    {
+	count = sm_p->site_count;
+    }
+    else
+    {
+	count = 1;
+    }
+    ret = PINT_msgpairarray_init(&sm_p->msgarray_op, count);
+    if(ret != 0)
+    {
+	gossip_err("Failed to initialize %d msgpairs\n", count);
+	js_p->error_code = ret;
+	return SM_ACTION_COMPLETE;
+    }
+
+    control.buffer_sz = sm_p->control->size;
+    control.buffer = sm_p->control->data;
+    rec.buffer_sz = sm_p->rec->size;
+    rec.buffer = sm_p->rec->data;
+    foreach_msgpair(&sm_p->msgarray_op, msg_p, index)
+    {
+	assert(msg_p);
+	PINT_SERVREQ_DBREP_FILL(msg_p->req, control, rec);
+	msg_p->fs_id = sm_p->fs_id;
+	msg_p->handle = 0;
+	msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
+	msg_p->comp_fn = NULL;
+	if(sm_p->eid >= 0)
+	{
+	    msg_p->svr_addr = sm_p->server_addrs[sm_p->eid];
+	}
+	else
+	{
+	    msg_p->svr_addr = sm_p->server_addrs[index];
+	}
+    }
+
+    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
+    return SM_ACTION_COMPLETE;
+}
+
+static PINT_sm_action db_rep_cleanup(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_db_rep_send_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    sm_p->error_code = js_p->error_code;
+    free(sm_p->server_addrs);
+    PINT_SET_OP_COMPLETE;
+    return SM_ACTION_TERMINATE;
+}



More information about the Pvfs2-cvs mailing list