[Pvfs2-cvs] commit by rzhong in pvfs2/src/server: db-rep-send.sm
CVS commit program
cvs at parl.clemson.edu
Thu Jul 24 17:29:44 EDT 2008
Update of /projects/cvsroot/pvfs2/src/server
In directory parlweb1:/tmp/cvs-serv24255/src/server
Added Files:
Tag: rongrong
db-rep-send.sm
Log Message:
db-rep-send state machine added
--- /dev/null 2004-06-24 14:04:38.000000000 -0400
+++ db-rep-send.sm 2008-07-24 17:29:44.000000000 -0400
@@ -0,0 +1,290 @@
+#include <string.h>
+#include <assert.h>
+
+#include "server-config.h"
+#include "pvfs2-server.h"
+#include "pvfs2-debug.h"
+#include "pvfs2-types.h"
+#include "job.h"
+#include "gossip.h"
+#include "str-utils.h"
+#include "pint-cached-config.h"
+#include "pint-util.h"
+#include "pvfs2-util.h"
+#include "PINT-reqproto-encode.h"
+#include "pvfs2-internal.h"
+#include "db.h"
+
+//extern job_context_id server_job_context;
+extern struct server_configuration_s server_config;
+
+/*
+ * Per-operation frame for the pvfs2_db_rep_send_sm state machine.
+ * One instance is allocated as the state machine frame by
+ * PVFS_idb_rep_send() and read back by the state functions through
+ * PINT_sm_frame().
+ */
+typedef struct PINT_db_rep_send_sm
+{
+ /* collection id of the file system; copied into every msgpair */
+ PVFS_fs_id fs_id;
+
+ /* mutex/cond pair: signalled by the terminate callback, waited on by
+  * PVFS_idb_rep_send() when the machine does not finish immediately */
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+
+ /* final status, copied from the job status in db_rep_cleanup */
+ PVFS_error error_code;
+ /* not referenced by the code in this file */
+ int comp_ct;
+
+ /* msgpair array pushed as the frame for pvfs2_msgpairarray_sm */
+ PINT_sm_msgarray_op msgarray_op;
+
+ /* not referenced by the code in this file */
+ PVFS_object_ref object_ref;
+ PVFS_object_ref parent_ref;
+
+ /* arguments of the send call, stored verbatim; only control, rec and
+  * eid are read back by the state functions in this file */
+ DB_ENV *dbenv;
+ const DBT *control;
+ const DBT *rec;
+ const DB_LSN *lsnp;
+ /* DB_EID_BROADCAST selects every site, a value >= 0 is used as an
+  * index into server_addrs */
+ int eid;
+ u_int32_t flags;
+ /* array allocated in PVFS_idb_rep_send(), freed in db_rep_cleanup */
+ PVFS_BMI_addr_t *server_addrs;
+ /* number of entries of server_addrs that were filled in */
+ int site_count;
+}PINT_db_rep_send_sm;
+
+/* Callbacks handed to PINT_smcb_alloc() by PVFS_idb_rep_send(). */
+static int db_rep_send_state_machine_terminate(struct PINT_smcb *smcb, job_status_s *js_p);
+static struct PINT_state_machine_s *db_rep_send_state_get_machine(int op);
+
+/*
+ * State machine pvfs2_db_rep_send_sm, defined between the %% markers:
+ *
+ *   setup_msgpairarray - runs db_rep_setup_msgpairarray; "success" moves
+ *                        on to xfer_msgpairarray, anything else to cleanup
+ *   xfer_msgpairarray  - jumps into pvfs2_msgpairarray_sm, then goes to
+ *                        cleanup
+ *   cleanup            - runs db_rep_cleanup, then terminates
+ */
+%%
+
+machine pvfs2_db_rep_send_sm
+{
+ state setup_msgpairarray
+ {
+ run db_rep_setup_msgpairarray;
+ success => xfer_msgpairarray;
+ default => cleanup;
+ }
+
+ state xfer_msgpairarray
+ {
+ jump pvfs2_msgpairarray_sm;
+ default => cleanup;
+ }
+
+ state cleanup
+ {
+ run db_rep_cleanup;
+ default => terminate;
+ }
+}
+
+%%
+
+/*
+ * Machine lookup callback passed to PINT_smcb_alloc().  The op argument
+ * is ignored: the only machine ever returned is pvfs2_db_rep_send_sm.
+ */
+struct PINT_state_machine_s *db_rep_send_state_get_machine(int op)
+{
+    struct PINT_state_machine_s *machine = &pvfs2_db_rep_send_sm;
+
+    return machine;
+}
+
+/*
+ * Terminate callback passed to PINT_smcb_alloc().  Signals the condition
+ * variable stored in the current frame while holding the frame's mutex,
+ * which is what PVFS_idb_rep_send() waits on.  Always returns 0.
+ */
+int db_rep_send_state_machine_terminate(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    PINT_db_rep_send_sm *frame = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    pthread_mutex_t *lock = &frame->mutex;
+
+    pthread_mutex_lock(lock);
+    pthread_cond_signal(&frame->cond);
+    pthread_mutex_unlock(lock);
+
+    return 0;
+}
+
+/*
+ * PVFS_idb_rep_send
+ *
+ * Despite the "i" in its name this call is blocking: it builds a
+ * pvfs2_db_rep_send_sm state machine for one replication message, starts
+ * it, and does not return until the machine has completed.
+ *
+ * dbenv, control, rec, lsnp, eid, flags - arguments of the replication
+ *     send request; they are stored in the state machine frame unchanged.
+ *
+ * Returns the error code recorded by the state machine (0 on success),
+ * -PVFS_ENOMEM if the control block or the address array cannot be
+ * allocated, -PVFS_EINVAL if the server configuration is unusable, or the
+ * error returned by BMI_addr_lookup().
+ */
+PVFS_error PVFS_idb_rep_send(
+    DB_ENV *dbenv,
+    const DBT *control,
+    const DBT *rec,
+    const DB_LSN *lsnp,
+    int eid,
+    u_int32_t flags)
+{
+    /* fixed number of replication sites, for test */
+    enum { DB_REP_MAX_SITES = 4 };
+
+    PINT_smcb *smcb = NULL;
+    PINT_db_rep_send_sm *sm_p = NULL;
+    PINT_sm_msgpair_params *mpp;
+    server_configuration_s *server_config;
+    PINT_sm_action sm_ret;
+    job_status_s js;
+    PVFS_error ret;
+
+    gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_idb_rep_send entered\n");
+
+    PINT_smcb_alloc(&smcb, 0,
+        sizeof(struct PINT_db_rep_send_sm),
+        db_rep_send_state_get_machine,
+        db_rep_send_state_machine_terminate,
+        server_job_context);
+    if(smcb == NULL)
+    {
+        return -PVFS_ENOMEM;
+    }
+
+    sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+
+    /*init msgarray params*/
+    mpp = &sm_p->msgarray_op.params;
+    mpp->job_context = server_job_context;
+    mpp->job_timeout = PVFS2_CLIENT_JOB_BMI_TIMEOUT_DEFAULT;
+    mpp->retry_limit = PVFS2_CLIENT_RETRY_LIMIT_DEFAULT;
+    mpp->retry_delay = PVFS2_CLIENT_RETRY_DELAY_MS_DEFAULT;
+
+    /*fill the sm*/
+    pthread_mutex_init(&sm_p->mutex, NULL);
+    pthread_cond_init(&sm_p->cond, NULL);
+    sm_p->dbenv = dbenv;
+    sm_p->control = control;
+    sm_p->rec = rec;
+    sm_p->lsnp = lsnp;
+    sm_p->eid = eid;
+    sm_p->flags = flags;
+    sm_p->site_count = 0;
+
+    /*get fs_id and the list of BMI addrs*/
+    sm_p->server_addrs =
+        malloc(sizeof(*sm_p->server_addrs) * DB_REP_MAX_SITES);
+    if(sm_p->server_addrs == NULL)
+    {
+        ret = -PVFS_ENOMEM;
+        goto fail;
+    }
+    /* clear the array itself; taking the address of the pointer field
+     * here would wipe the pointer and the frame bytes that follow it */
+    memset(sm_p->server_addrs, 0,
+        sizeof(*sm_p->server_addrs) * DB_REP_MAX_SITES);
+
+    server_config = get_server_config_struct();
+    if(server_config)
+    {
+        int i = 0;
+        PINT_llist *alias;
+        struct filesystem_configuration_s *fs_conf = NULL;
+        struct host_alias_s *cur_alias = NULL;
+        struct host_alias_s *rep_group = NULL;
+
+        /*get fs_id, BASED ON THE ASSUMPTION THAT THERE'S ONLY *ONE* FS IN SERVER_CONFIG*/
+        fs_conf = PINT_llist_head(server_config->file_systems);
+        if(fs_conf == NULL)
+        {
+            gossip_err("PVFS_idb_rep_send: no file system configured\n");
+            ret = -PVFS_EINVAL;
+            goto fail;
+        }
+        sm_p->fs_id = fs_conf->coll_id;
+
+        /*get the list of BMI addrs*/
+        /* !!! the implementation need to be changed according to the format of the conf file !!! */
+        alias = server_config->host_aliases;
+        rep_group = PINT_llist_head(server_config->rep_groups);
+        /* without a replication group entry nothing can match, so the
+         * site list simply stays empty */
+        while(alias && rep_group)
+        {
+            cur_alias = PINT_llist_head(alias);
+            if(!cur_alias)
+            {
+                break;
+            }
+            assert(cur_alias->host_alias);
+            assert(cur_alias->bmi_address);
+            /* NOTE(review): the second test compares a bmi_address with a
+             * host_alias; kept as in the original - confirm it is intended */
+            if(!strcmp(rep_group->host_alias, cur_alias->host_alias) ||
+               !strcmp(rep_group->bmi_address, cur_alias->host_alias))
+            {
+                if(i >= DB_REP_MAX_SITES)
+                {
+                    gossip_err("PVFS_idb_rep_send: more than %d replication sites\n",
+                        (int)DB_REP_MAX_SITES);
+                    ret = -PVFS_EINVAL;
+                    goto fail;
+                }
+                /* the lookup must not live inside assert(): it has side
+                 * effects and would disappear under NDEBUG.
+                 * NOTE(review): assumes BMI_addr_lookup returns 0 on
+                 * success and a negative error otherwise - confirm */
+                ret = BMI_addr_lookup(&sm_p->server_addrs[i],
+                    cur_alias->bmi_address);
+                if(ret < 0)
+                {
+                    gossip_err("PVFS_idb_rep_send: BMI_addr_lookup failed for %s\n",
+                        cur_alias->bmi_address);
+                    goto fail;
+                }
+                gossip_debug(GOSSIP_DB_REP_DEBUG, "BMI address: %s, BMI_addr_t: %lx\n",
+                    cur_alias->bmi_address, sm_p->server_addrs[i]);
+                i++;
+            }
+            alias = PINT_llist_next(alias);
+        }
+        sm_p->site_count = i;
+        gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_idb_rep_send site_count = %d\n", sm_p->site_count);
+    }
+
+    sm_p->error_code = 0;
+    memset(&js, 0, sizeof(js));
+    sm_ret = PINT_state_machine_start(smcb, &js);
+    assert(SM_ACTION_ISVALID(sm_ret));
+    if(PINT_smcb_complete(smcb))
+    {
+        assert(sm_ret == SM_ACTION_TERMINATE);
+        /* db_rep_cleanup stored the final status in the frame; read it
+         * before the frame is released */
+        ret = sm_p->error_code;
+        pthread_cond_destroy(&sm_p->cond);
+        pthread_mutex_destroy(&sm_p->mutex);
+        PINT_smcb_free(smcb);
+        gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_idb_rep_send sm terminated\n");
+    }
+    else
+    {
+        pthread_mutex_lock(&sm_p->mutex);
+        /* re-check the completion flag around the wait so that a signal
+         * sent before we got here, or a spurious wakeup, is harmless */
+        while(!PINT_smcb_complete(smcb))
+        {
+            pthread_cond_wait(&sm_p->cond, &sm_p->mutex);
+        }
+        pthread_mutex_unlock(&sm_p->mutex);
+        /* the local js only reflects the state at the time the machine
+         * deferred; the final status is the one recorded in the frame */
+        ret = sm_p->error_code;
+        /* NOTE(review): as in the original, smcb is not released on this
+         * path; the terminate callback may still be touching the frame
+         * when the completion flag becomes visible - needs a safe
+         * hand-off before it can be freed here */
+        gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_idb_rep_send sm terminated after wait\n");
+    }
+    return ret;
+
+fail:
+    /* the machine was never started, so db_rep_cleanup will not run */
+    free(sm_p->server_addrs);
+    pthread_cond_destroy(&sm_p->cond);
+    pthread_mutex_destroy(&sm_p->mutex);
+    PINT_smcb_free(smcb);
+    return ret;
+}
+
+/*
+ * PVFS_db_rep_send
+ *
+ * Thin wrapper around PVFS_idb_rep_send(): forwards all arguments
+ * unchanged, reports a non-zero result through PVFS_perror_gossip() and
+ * returns that result to the caller.
+ */
+int PVFS_db_rep_send(
+    DB_ENV *dbenv,
+    const DBT *control,
+    const DBT *rec,
+    const DB_LSN *lsnp,
+    int eid,
+    u_int32_t flags)
+{
+    int status;
+
+    gossip_debug(GOSSIP_DB_REP_DEBUG, "PVFS_db_rep_send entered\n");
+
+    status = PVFS_idb_rep_send(dbenv, control, rec, lsnp, eid, flags);
+    if(status == 0)
+    {
+        return 0;
+    }
+
+    PVFS_perror_gossip("PVFS_idb_rep_send call", status);
+    return status;
+}
+
+/*
+ * State function for setup_msgpairarray.
+ *
+ * Builds one DBREP request per destination from the control and rec
+ * buffers stored in the frame: every known site when eid is
+ * DB_EID_BROADCAST, a single site otherwise.  On success the msgarray is
+ * pushed as a new frame for the msgpairarray machine and
+ * js_p->error_code is left at 0.  On failure js_p->error_code is set to
+ * the error and nothing is pushed.
+ */
+static PINT_sm_action db_rep_setup_msgpairarray(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_db_rep_send_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    int count = 0;
+    int ret = -PVFS_EINVAL;
+    PINT_sm_msgpair_state *msg_p;
+    PVFS_ds_keyval control, rec;
+    int index;
+
+    js_p->error_code = 0;
+
+    if(sm_p->eid == DB_EID_BROADCAST)
+    {
+        count = sm_p->site_count;
+    }
+    else
+    {
+        /* a single destination: server_addrs[eid] for eid >= 0, else
+         * server_addrs[0].  Refuse the request when that slot was never
+         * filled in, rather than reading past the known sites */
+        int target = (sm_p->eid >= 0) ? sm_p->eid : 0;
+
+        if(sm_p->server_addrs == NULL || target >= sm_p->site_count)
+        {
+            gossip_err("db_rep_setup_msgpairarray: eid %d outside of %d known sites\n",
+                sm_p->eid, sm_p->site_count);
+            js_p->error_code = -PVFS_EINVAL;
+            return SM_ACTION_COMPLETE;
+        }
+        count = 1;
+    }
+    ret = PINT_msgpairarray_init(&sm_p->msgarray_op, count);
+    if(ret != 0)
+    {
+        gossip_err("Failed to initialize %d msgpairs\n", count);
+        js_p->error_code = ret;
+        return SM_ACTION_COMPLETE;
+    }
+
+    control.buffer_sz = sm_p->control->size;
+    control.buffer = sm_p->control->data;
+    rec.buffer_sz = sm_p->rec->size;
+    rec.buffer = sm_p->rec->data;
+    foreach_msgpair(&sm_p->msgarray_op, msg_p, index)
+    {
+        assert(msg_p);
+        PINT_SERVREQ_DBREP_FILL(msg_p->req, control, rec);
+        msg_p->fs_id = sm_p->fs_id;
+        msg_p->handle = 0;
+        msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
+        msg_p->comp_fn = NULL;
+        if(sm_p->eid >= 0)
+        {
+            msg_p->svr_addr = sm_p->server_addrs[sm_p->eid];
+        }
+        else
+        {
+            msg_p->svr_addr = sm_p->server_addrs[index];
+        }
+    }
+
+    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
+    return SM_ACTION_COMPLETE;
+}
+
+/*
+ * State function for cleanup: records the job status in the frame,
+ * releases the server address array, marks the operation complete and
+ * ends the state machine.
+ */
+static PINT_sm_action db_rep_cleanup(
+    struct PINT_smcb *smcb, job_status_s *js_p)
+{
+    struct PINT_db_rep_send_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
+    PVFS_BMI_addr_t *addrs;
+
+    sm_p->error_code = js_p->error_code;
+
+    addrs = sm_p->server_addrs;
+    free(addrs);
+
+    PINT_SET_OP_COMPLETE;
+    return SM_ACTION_TERMINATE;
+}
More information about the Pvfs2-cvs
mailing list