[Pvfs2-cvs] commit by pcarns in pvfs2-1/src/io/bmi/bmi_mx:
module.mk.in mx.c mx.h
CVS commit program
cvs at parl.clemson.edu
Mon Apr 7 11:07:25 EDT 2008
Update of /projects/cvsroot/pvfs2-1/src/io/bmi/bmi_mx
In directory parlweb1:/tmp/cvs-serv26608/src/io/bmi/bmi_mx
Modified Files:
Tag: small-file-branch
module.mk.in mx.c mx.h
Log Message:
syncing small-file-branch back up with trunk at small-file-branch-point2 tag
(reverse merge)
Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/bmi/bmi_mx/module.mk.in,v
diff -p -u -r1.1 -r1.1.20.1
--- module.mk.in 13 Feb 2007 18:39:39 -0000 1.1
+++ module.mk.in 7 Apr 2008 15:07:24 -0000 1.1.20.1
@@ -1,10 +1,30 @@
-BUILD_MX = @BUILD_MX@
+#
+# Makefile stub for bmi_mx.
+#
+# Copyright (C) 2008 Pete Wyckoff <pw at osc.edu>
+#
+# See COPYING in top-level directory.
+#
-# only build MX module if configure detected MX
-ifdef BUILD_MX
- DIR := src/io/bmi/bmi_mx
- LIBSRC += \
- $(DIR)/mx.c
- SERVERSRC += \
- $(DIR)/mx.c
-endif
+# only do any of this if configure decided to use MX
+ifneq (,$(BUILD_MX))
+
+#
+# Local definitions.
+#
+DIR := src/io/bmi/bmi_mx
+cfiles := mx.c
+
+#
+# Export these to the top Makefile to tell it what to build.
+#
+src := $(patsubst %,$(DIR)/%,$(cfiles))
+LIBSRC += $(src)
+SERVERSRC += $(src)
+
+#
+# Extra cflags for files in this directory.
+#
+MODCFLAGS_$(DIR) := -I@MX_INCDIR@
+
+endif # BUILD_MX
Index: mx.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/bmi/bmi_mx/mx.c,v
diff -p -u -r1.11 -r1.11.2.1
--- mx.c 30 Nov 2007 19:33:16 -0000 1.11
+++ mx.c 7 Apr 2008 15:07:24 -0000 1.11.2.1
@@ -34,10 +34,12 @@ bmx_unexpected_recv(void *context, mx_en
static int
bmx_peer_connect(struct bmx_peer *peer);
+static void
+bmx_create_peername(void);
/**** TX/RX handling functions **************************************/
-void
+static void
bmx_ctx_free(struct bmx_ctx *ctx)
{
if (ctx == NULL) return;
@@ -60,7 +62,7 @@ bmx_ctx_free(struct bmx_ctx *ctx)
return;
}
-int
+static int
bmx_ctx_alloc(struct bmx_ctx **ctxp, enum bmx_req_type type)
{
struct bmx_ctx *ctx = NULL;
@@ -142,7 +144,7 @@ bmx_ctx_alloc(struct bmx_ctx **ctxp, enu
return 0;
}
-void
+static void
bmx_ctx_init(struct bmx_ctx *ctx)
{
struct bmx_peer *peer = NULL;
@@ -191,7 +193,7 @@ bmx_ctx_init(struct bmx_ctx *ctx)
}
/* add to peer's queued txs/rxs list */
-void
+static void
bmx_q_ctx(struct bmx_ctx *ctx)
{
struct bmx_peer *peer = ctx->mxc_peer;
@@ -206,7 +208,7 @@ bmx_q_ctx(struct bmx_ctx *ctx)
}
/* remove from peer's queued txs/rxs list */
-void
+static void
bmx_deq_ctx(struct bmx_ctx *ctx)
{
struct bmx_peer *peer = ctx->mxc_peer;
@@ -220,7 +222,7 @@ bmx_deq_ctx(struct bmx_ctx *ctx)
}
/* add to peer's pending rxs list */
-void
+static void
bmx_q_pending_ctx(struct bmx_ctx *ctx)
{
struct bmx_peer *peer = ctx->mxc_peer;
@@ -237,7 +239,7 @@ bmx_q_pending_ctx(struct bmx_ctx *ctx)
}
/* remove from peer's pending rxs list */
-void
+static void
bmx_deq_pending_ctx(struct bmx_ctx *ctx)
{
struct bmx_peer *peer = ctx->mxc_peer;
@@ -256,7 +258,7 @@ bmx_deq_pending_ctx(struct bmx_ctx *ctx)
}
/* add to the global canceled list */
-void
+static void
bmx_q_canceled_ctx(struct bmx_ctx *ctx, bmi_error_code_t error)
{
ctx->mxc_state = BMX_CTX_CANCELED;
@@ -270,7 +272,7 @@ bmx_q_canceled_ctx(struct bmx_ctx *ctx,
return;
}
-struct bmx_ctx *
+static struct bmx_ctx *
bmx_get_idle_rx(void)
{
struct bmx_ctx *rx = NULL;
@@ -301,7 +303,7 @@ bmx_get_idle_rx(void)
return rx;
}
-void
+static void
bmx_put_idle_rx(struct bmx_ctx *rx)
{
if (rx == NULL) {
@@ -326,7 +328,7 @@ bmx_put_idle_rx(struct bmx_ctx *rx)
return;
}
-void
+static void
bmx_reduce_idle_rxs(int count)
{
int i = 0;
@@ -342,7 +344,7 @@ bmx_reduce_idle_rxs(int count)
return;
}
-struct bmx_ctx *
+static struct bmx_ctx *
bmx_get_idle_tx(void)
{
struct bmx_ctx *tx = NULL;
@@ -373,7 +375,7 @@ bmx_get_idle_tx(void)
return tx;
}
-void
+static void
bmx_put_idle_tx(struct bmx_ctx *tx)
{
if (tx == NULL) {
@@ -400,7 +402,7 @@ bmx_put_idle_tx(struct bmx_ctx *tx)
/**** peername parsing functions **************************************/
-int
+static int
bmx_verify_hostname(char *host)
{
int ret = 0;
@@ -423,7 +425,7 @@ bmx_verify_hostname(char *host)
return 0;
}
-int
+static int
bmx_verify_num_str(char *num_str)
{
int ret = 0;
@@ -447,11 +449,10 @@ bmx_verify_num_str(char *num_str)
* returns 0 and we do not know that it failed.
* This handles legal hostnames (1-63 chars) include a-zA-Z0-9 as well as . and -
* It will accept IPv4 addresses but not IPv6 (too many semicolons) */
-int
+static int
bmx_parse_peername(const char *peername, char **hostname, uint32_t *board, uint32_t *ep_id)
{
int ret = 0;
- int len = 0;
int colon1_found = 0;
int colon2_found = 0;
char *s = NULL;
@@ -510,7 +511,6 @@ bmx_parse_peername(const char *peername,
NULL != strchr(colon2, ':')) {
debug(BMX_DB_INFO, "parse_peername() too many ':' (%s %s)",
colon1, colon2);
- len = sizeof(*s);
free(s);
return -1;
}
@@ -522,7 +522,6 @@ bmx_parse_peername(const char *peername,
free(s);
return -1;
}
- strcpy(host, s);
if (colon1_found) {
bd = (uint32_t) strtol(colon1, NULL, 0);
@@ -564,7 +563,7 @@ bmx_parse_peername(const char *peername,
/**** peer handling functions **************************************/
-void
+static void
bmx_peer_free(struct bmx_peer *peer)
{
struct bmx_method_addr *mxmap = peer->mxp_mxmap;
@@ -586,7 +585,7 @@ bmx_peer_free(struct bmx_peer *peer)
return;
}
-void
+static void
bmx_peer_addref(struct bmx_peer *peer)
{
gen_mutex_lock(&peer->mxp_lock);
@@ -596,9 +595,10 @@ bmx_peer_addref(struct bmx_peer *peer)
return;
}
-void
+static void
bmx_peer_decref(struct bmx_peer *peer)
{
+ debug(BMX_DB_FUNC, "entering %s", __func__);
gen_mutex_lock(&peer->mxp_lock);
if (peer->mxp_refcount == 0) {
debug(BMX_DB_WARN, "peer_decref() called for %s when refcount == 0",
@@ -607,6 +607,8 @@ bmx_peer_decref(struct bmx_peer *peer)
peer->mxp_refcount--;
if (peer->mxp_refcount == 1 && peer->mxp_state == BMX_PEER_DISCONNECT) {
/* all txs and rxs are completed or canceled, reset state */
+ debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_INIT",
+ peer->mxp_mxmap->mxm_peername);
peer->mxp_state = BMX_PEER_INIT;
}
gen_mutex_unlock(&peer->mxp_lock);
@@ -622,10 +624,11 @@ bmx_peer_decref(struct bmx_peer *peer)
gen_mutex_unlock(&bmi_mx->bmx_lock);
bmx_peer_free(peer);
}
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
return;
}
-int
+static int
bmx_peer_alloc(struct bmx_peer **peerp, struct bmx_method_addr *mxmap)
{
int i = 0;
@@ -697,6 +700,9 @@ bmx_peer_alloc(struct bmx_peer **peerp,
bmx_put_idle_rx(rx);
}
+ /* on servers with server-to-server comms, we are racing
+ * between method_addr_lookup() and handle_conn_req() */
+
bmx_peer_addref(peer); /* for the peers list */
gen_mutex_lock(&bmi_mx->bmx_peers_lock);
qlist_add_tail(&peer->mxp_list, &bmi_mx->bmx_peers);
@@ -708,11 +714,13 @@ bmx_peer_alloc(struct bmx_peer **peerp,
return 0;
}
-int
+static int
bmx_peer_init_state(struct bmx_peer *peer)
{
int ret = 0;
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+
gen_mutex_lock(&peer->mxp_lock);
/* we have a ref for each pending tx and rx, don't init
@@ -722,18 +730,22 @@ bmx_peer_init_state(struct bmx_peer *pee
ret = -1;
} else {
/* ok to init */
+ debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_INIT",
+ peer->mxp_mxmap->mxm_peername);
peer->mxp_state = BMX_PEER_INIT;
}
gen_mutex_unlock(&peer->mxp_lock);
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
return 0;
}
/**** startup/shutdown functions **************************************/
/* init bmi_mx */
-int
+static int
bmx_globals_init(int method_id)
{
#if BMX_MEM_ACCT
@@ -752,6 +764,7 @@ bmx_globals_init(int method_id)
/* bmi_mx->bmx_board */
/* bmi_mx->bmx_ep_id */
/* bmi_mx->bmx_ep */
+ /* bmi_mx->bmx_sid */
/* bmi_mx->bmx_is_server */
INIT_QLIST_HEAD(&bmi_mx->bmx_peers);
@@ -768,6 +781,11 @@ bmx_globals_init(int method_id)
INIT_QLIST_HEAD(&bmi_mx->bmx_canceled);
gen_mutex_init(&bmi_mx->bmx_canceled_lock);
+ INIT_QLIST_HEAD(&bmi_mx->bmx_unex_txs);
+ gen_mutex_init(&bmi_mx->bmx_unex_txs_lock);
+ INIT_QLIST_HEAD(&bmi_mx->bmx_unex_rxs);
+ gen_mutex_init(&bmi_mx->bmx_unex_rxs_lock);
+
bmi_mx->bmx_next_id = 1;
gen_mutex_init(&bmi_mx->bmx_lock); /* global lock, use for global txs,
global rxs, next_id, etc. */
@@ -787,7 +805,7 @@ bmx_globals_init(int method_id)
}
-int
+static int
bmx_open_endpoint(mx_endpoint_t *ep, uint32_t board, uint32_t ep_id)
{
mx_return_t mxret = MX_SUCCESS;
@@ -801,7 +819,7 @@ bmx_open_endpoint(mx_endpoint_t *ep, uin
* matching recvs. */
param.key = MX_PARAM_CONTEXT_ID;
param.val.context_id.bits = 4;
- param.val.context_id.shift = 60;
+ param.val.context_id.shift = BMX_MSG_SHIFT;
mxret = mx_open_endpoint(board, ep_id, BMX_MAGIC,
&param, 1, ep);
@@ -840,6 +858,8 @@ BMI_mx_initialize(bmi_method_addr_p list
int ret = 0;
mx_return_t mxret = MX_SUCCESS;
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+
#if BMX_LOGGING
MPE_Init_log();
#define BMX_LOG_STATE 1
@@ -907,11 +927,16 @@ BMI_mx_initialize(bmi_method_addr_p list
if (init_flags & BMI_INIT_SERVER) {
struct bmx_ctx *rx = NULL;
struct bmx_method_addr *mxmap = listen_addr->method_data;
+ mx_endpoint_addr_t epa;
+ uint32_t ep_id = 0;
+ uint32_t sid = 0;
+ uint64_t nic_id = 0ULL;
bmi_mx->bmx_hostname = (char *) mxmap->mxm_hostname;
bmi_mx->bmx_board = mxmap->mxm_board;
bmi_mx->bmx_ep_id = mxmap->mxm_ep_id;
bmi_mx->bmx_is_server = 1;
+ bmx_create_peername();
ret = bmx_open_endpoint(&bmi_mx->bmx_ep, mxmap->mxm_board, mxmap->mxm_ep_id);
if (ret != 0) {
@@ -919,6 +944,12 @@ BMI_mx_initialize(bmi_method_addr_p list
BMX_FREE(bmi_mx, sizeof(*bmi_mx));
exit(1);
}
+
+ /* get our MX session id */
+ mx_get_endpoint_addr(bmi_mx->bmx_ep, &epa);
+ mx_decompose_endpoint_addr2(epa, &nic_id, &ep_id, &sid);
+ bmi_mx->bmx_sid = sid;
+
/* We allocate BMX_PEER_RX_NUM when we peer_alloc()
* Allocate some here to catch the peer CONN_REQ */
for (i = 0; i < BMX_SERVER_RXS; i++) {
@@ -948,6 +979,8 @@ BMI_mx_initialize(bmi_method_addr_p list
#if BMX_MEM_ACCT
debug(BMX_DB_MEM, "memory used at end of initialization %lld", llu(mem_used));
#endif
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
return 0;
}
@@ -1017,6 +1050,9 @@ BMI_mx_finalize(void)
}
#endif
+ if (bmi_mx->bmx_hostname) free(bmi_mx->bmx_hostname);
+ if (bmi_mx->bmx_peername) free(bmi_mx->bmx_peername);
+
bmi_mx = NULL;
gen_mutex_unlock(&tmp->bmx_lock);
@@ -1060,6 +1096,8 @@ bmx_peer_disconnect(struct bmx_peer *pee
gen_mutex_unlock(&peer->mxp_lock);
return;
}
+ debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_DISCONNECT",
+ peer->mxp_mxmap->mxm_peername);
peer->mxp_state = BMX_PEER_DISCONNECT;
/* cancel queued txs */
@@ -1115,10 +1153,16 @@ BMI_mx_set_info(int option, void *inout_
peer = mxmap->mxm_peer;
bmx_peer_disconnect(peer, 1, BMI_ENETRESET);
}
- if (!mxmap->mxm_peername) free((void *) mxmap->mxm_peername);
- mxmap->mxm_peername = NULL;
- if (!mxmap->mxm_hostname) free((void *) mxmap->mxm_hostname);
- mxmap->mxm_hostname = NULL;
+ if (mxmap->mxm_peername) {
+ debug(BMX_DB_MEM, "freeing mxm_peername");
+ free((void *) mxmap->mxm_peername);
+ mxmap->mxm_peername = NULL;
+ }
+ if (mxmap->mxm_hostname) {
+ debug(BMX_DB_MEM, "freeing mxm_hostname");
+ free((void *) mxmap->mxm_hostname);
+ mxmap->mxm_hostname = NULL;
+ }
debug(BMX_DB_PEER, "freeing map 0x%p", map);
free(map);
}
@@ -1159,7 +1203,7 @@ BMI_mx_get_info(int option, void *inout_
#define BMX_IO_BUF 1
#define BMX_UNEX_BUF 2
-void *
+static void *
bmx_memalloc(bmi_size_t size, int type)
{
void *buf = NULL;
@@ -1290,16 +1334,16 @@ BMI_mx_unexpected_free(void *buf)
return 0;
}
-void
+static void
bmx_parse_match(uint64_t match, uint8_t *type, uint32_t *id, uint32_t *tag)
{
- *type = (uint8_t) (match >> 60);
- *id = (uint32_t) ((match >> 32) & 0xFFFFF); /* 20 bits */
- *tag = (uint32_t) (match & 0xFFFFFFFF);
+ *type = (uint8_t) (match >> BMX_MSG_SHIFT);
+ *id = (uint32_t) ((match >> BMX_ID_SHIFT) & BMX_MAX_PEER_ID); /* 20 bits */
+ *tag = (uint32_t) (match & BMX_MAX_TAG); /* 32 bits */
return;
}
-void
+static void
bmx_create_match(struct bmx_ctx *ctx)
{
int connect = 0;
@@ -1330,12 +1374,12 @@ bmx_create_match(struct bmx_ctx *ctx)
exit(1);
}
- ctx->mxc_match = (type << 60) | (id << 32) | tag;
+ ctx->mxc_match = (type << BMX_MSG_SHIFT) | (id << BMX_ID_SHIFT) | tag;
return;
}
-bmi_error_code_t
+static bmi_error_code_t
bmx_mx_to_bmi_errno(enum mx_status_code code)
{
int err = 0;
@@ -1415,6 +1459,8 @@ bmx_ensure_connected(struct bmx_method_a
int ret = 0;
struct bmx_peer *peer = mxmap->mxm_peer;
+ /* NOTE: can this happen? we call peer_alloc() when using
+ * method_addr_lookup() */
if (peer == NULL) {
ret = bmx_peer_alloc(&peer, mxmap);
if (ret != 0) {
@@ -1557,10 +1603,15 @@ BMI_mx_post_send(bmi_op_id_t *id, struct
enum bmi_buffer_type buffer_flag __unused,
bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
{
+ int ret = 0;
debug(BMX_DB_FUNC, "entering %s", __func__);
- return bmx_post_send_common(id, remote_map, 1, &buffer, &size, size,
+ ret = bmx_post_send_common(id, remote_map, 1, &buffer, &size, size,
tag, user_ptr, context_id, 0);
+
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+ return ret;
}
static int
@@ -1569,10 +1620,16 @@ BMI_mx_post_send_list(bmi_op_id_t *id, s
bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
{
+ int ret = 0;
+
debug(BMX_DB_FUNC, "entering %s", __func__);
- return bmx_post_send_common(id, remote_map, list_count, buffers, sizes,
+ ret = bmx_post_send_common(id, remote_map, list_count, buffers, sizes,
total_size, tag, user_ptr, context_id, 0);
+
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+ return ret;
}
static int
@@ -1581,10 +1638,16 @@ BMI_mx_post_sendunexpected(bmi_op_id_t *
enum bmi_buffer_type buffer_flag __unused,
bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
{
+ int ret = 0;
+
debug(BMX_DB_FUNC, "entering %s", __func__);
- return bmx_post_send_common(id, remote_map, 1, &buffer, &size, size,
+ ret = bmx_post_send_common(id, remote_map, 1, &buffer, &size, size,
tag, user_ptr, context_id, 1);
+
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+ return ret;
}
static int
@@ -1593,10 +1656,16 @@ BMI_mx_post_sendunexpected_list(bmi_op_i
bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
{
+ int ret = 0;
+
debug(BMX_DB_FUNC, "entering %s", __func__);
- return bmx_post_send_common(id, remote_map, list_count, buffers, sizes,
+ ret = bmx_post_send_common(id, remote_map, list_count, buffers, sizes,
total_size, tag, user_ptr, context_id, 1);
+
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+ return ret;
}
/* if (peer->mxp_state == BMX_PEER_READY)
@@ -1625,7 +1694,7 @@ bmx_post_rx(struct bmx_ctx *rx)
segs = rx->mxc_seg_list;
}
mxret = mx_irecv(bmi_mx->bmx_ep, segs, rx->mxc_nseg,
- rx->mxc_match, -1ULL, (void *) rx, &rx->mxc_mxreq);
+ rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
if (mxret != MX_SUCCESS) {
ret = -BMI_ENOMEM;
bmx_deq_pending_ctx(rx); /* uses peer lock */
@@ -1742,10 +1811,16 @@ BMI_mx_post_recv(bmi_op_id_t *id, struct
enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
bmi_context_id context_id)
{
+ int ret = 0;
+
debug(BMX_DB_FUNC, "entering %s", __func__);
- return bmx_post_recv_common(id, remote_map, 1, &buffer, &expected_len,
+ ret = bmx_post_recv_common(id, remote_map, 1, &buffer, &expected_len,
expected_len, tag, user_ptr, context_id);
+
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+ return ret;
}
static int
@@ -1755,10 +1830,16 @@ BMI_mx_post_recv_list(bmi_op_id_t *id, s
enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
bmi_context_id context_id)
{
+ int ret = 0;
+
debug(BMX_DB_FUNC, "entering %s", __func__);
- return bmx_post_recv_common(id, remote_map, list_count, buffers, sizes,
+ ret = bmx_post_recv_common(id, remote_map, list_count, buffers, sizes,
tot_expected_len, tag, user_ptr, context_id);
+
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+ return ret;
}
static void
@@ -1767,6 +1848,8 @@ bmx_peer_post_queued_rxs(struct bmx_peer
struct bmx_ctx *rx = NULL;
list_t *queued_rxs = &peer->mxp_queued_rxs;
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+
gen_mutex_lock(&peer->mxp_lock);
while (!qlist_empty(queued_rxs)) {
if (peer->mxp_state != BMX_PEER_READY) {
@@ -1781,6 +1864,8 @@ bmx_peer_post_queued_rxs(struct bmx_peer
}
gen_mutex_unlock(&peer->mxp_lock);
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
return;
}
@@ -1790,6 +1875,8 @@ bmx_peer_post_queued_txs(struct bmx_peer
struct bmx_ctx *tx = NULL;
list_t *queued_txs = &peer->mxp_queued_txs;
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+
gen_mutex_lock(&peer->mxp_lock);
while (!qlist_empty(queued_txs)) {
if (peer->mxp_state != BMX_PEER_READY) {
@@ -1806,6 +1893,8 @@ bmx_peer_post_queued_txs(struct bmx_peer
}
gen_mutex_unlock(&peer->mxp_lock);
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
return;
}
@@ -1817,8 +1906,11 @@ bmx_post_unexpected_recv(mx_endpoint_add
int ret = 0;
struct bmx_ctx *rx = NULL;
struct bmx_peer *peer = NULL;
+ void *peerp = (void *) &peer;
mx_return_t mxret = MX_SUCCESS;
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+
if (id == 0 && tag == 0 && type == 0) {
bmx_parse_match(match, &type, &id, &tag);
}
@@ -1829,7 +1921,8 @@ bmx_post_unexpected_recv(mx_endpoint_add
rx = bmx_get_idle_rx();
if (rx != NULL) {
- mx_get_endpoint_addr_context(source, (void **) &peer);
+ mx_get_endpoint_addr_context(source, &peerp);
+ peer = (struct bmx_peer *) peerp;
if (peer == NULL) {
debug(BMX_DB_PEER, "unknown peer sent message 0x%llx "
"length %u", llu(match), length);
@@ -1853,7 +1946,7 @@ bmx_post_unexpected_recv(mx_endpoint_add
debug(BMX_DB_CTX, "%s rx match= 0x%llx length= %lld", __func__,
llu(rx->mxc_match), lld(rx->mxc_nob));
mxret = mx_irecv(bmi_mx->bmx_ep, &rx->mxc_seg, rx->mxc_nseg,
- rx->mxc_match, -1ULL, (void *) rx, &rx->mxc_mxreq);
+ rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
if (mxret != MX_SUCCESS) {
debug((BMX_DB_MX|BMX_DB_CTX), "mx_irecv() failed with %s for an "
"unexpected recv with tag %d length %d",
@@ -1865,6 +1958,8 @@ bmx_post_unexpected_recv(mx_endpoint_add
ret = -1;
}
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
return ret;
}
@@ -1885,6 +1980,7 @@ bmx_unexpected_recv(void *context, mx_en
uint32_t id = 0;
uint32_t tag = 0;
struct bmx_peer *peer = NULL;
+ void *peerp = &peer;
mx_return_t mxret = MX_SUCCESS;
bmx_parse_match(match_value, &type, &id, &tag);
@@ -1909,7 +2005,7 @@ bmx_unexpected_recv(void *context, mx_en
debug(BMX_DB_CONN, "%s rx match= 0x%llx length= %lld",
__func__, llu(rx->mxc_match), lld(rx->mxc_nob));
mxret = mx_irecv(bmi_mx->bmx_ep, &rx->mxc_seg, rx->mxc_nseg,
- rx->mxc_match, -1ULL, (void *) rx, &rx->mxc_mxreq);
+ rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
if (mxret != MX_SUCCESS) {
debug(BMX_DB_CONN, "mx_irecv() failed for an "
"unexpected recv with %s",
@@ -1923,11 +2019,8 @@ bmx_unexpected_recv(void *context, mx_en
break;
case BMX_MSG_CONN_ACK:
/* the server is replying to our CONN_REQ */
- if (bmi_mx->bmx_is_server) {
- debug(BMX_DB_ERR, "server receiving CONN_ACK");
- exit(1);
- }
- mx_get_endpoint_addr_context(source, (void **) &peer);
+ mx_get_endpoint_addr_context(source, &peerp);
+ peer = (struct bmx_peer *) peerp;
if (peer == NULL) {
debug((BMX_DB_CONN|BMX_DB_PEER), "receiving CONN_ACK but "
"the endpoint context does not have a peer");
@@ -1952,7 +2045,9 @@ bmx_unexpected_recv(void *context, mx_en
break;
case BMX_MSG_UNEXPECTED:
if (!bmi_mx->bmx_is_server) {
- mx_get_endpoint_addr_context(source, (void **) &peer);
+ void *peerp = &peer;
+ mx_get_endpoint_addr_context(source, &peerp);
+ peer = (struct bmx_peer *) peerp;
debug(BMX_DB_ERR, "client receiving unexpected message "
"from %s with mask 0x%llx length %u",
peer == NULL ? "unknown" : peer->mxp_mxmap->mxm_peername,
@@ -1987,6 +2082,8 @@ bmx_alloc_method_addr(const char *peerna
struct bmi_method_addr *map = NULL;
struct bmx_method_addr *mxmap = NULL;
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+
if (bmi_mx == NULL) {
map = bmi_alloc_method_addr(
tmp_id, (bmi_size_t) sizeof(*mxmap));
@@ -2003,6 +2100,8 @@ bmx_alloc_method_addr(const char *peerna
mxmap->mxm_ep_id = ep_id;
/* mxmap->mxm_peer */
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
return map;
}
@@ -2019,10 +2118,9 @@ bmx_handle_icon_req(void)
{
uint32_t result = 0;
- if (bmi_mx->bmx_is_server) return;
do {
- uint64_t match = (uint64_t) BMX_MSG_ICON_REQ << 60;
- uint64_t mask = (uint64_t) 0xF << 60;
+ uint64_t match = (uint64_t) BMX_MSG_ICON_REQ << BMX_MSG_SHIFT;
+ uint64_t mask = BMX_MASK_MSG;
mx_status_t status;
mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
@@ -2092,7 +2190,7 @@ bmx_handle_icon_req(void)
return;
}
-/* test for CONN_REQ messages (on the server)
+/* test for received CONN_REQ messages (on the server)
* if found
* create peer
* create mxmap
@@ -2102,9 +2200,9 @@ static void
bmx_handle_conn_req(void)
{
uint32_t result = 0;
- uint64_t match = (uint64_t) BMX_MSG_CONN_REQ << 60;
- uint64_t mask = (uint64_t) 0xF << 60;
- uint64_t ack = (uint64_t) BMX_MSG_ICON_ACK << 60;
+ uint64_t match = (uint64_t) BMX_MSG_CONN_REQ << BMX_MSG_SHIFT;
+ uint64_t mask = BMX_MASK_MSG;
+ uint64_t ack = (uint64_t) BMX_MSG_ICON_ACK << BMX_MSG_SHIFT;
mx_status_t status;
do {
@@ -2112,7 +2210,10 @@ bmx_handle_conn_req(void)
if (result) {
uint8_t type = 0;
uint32_t id = 0;
+ uint32_t sid = 0;
uint32_t version = 0;
+ uint64_t nic_id = 0ULL;
+ uint32_t ep_id = 0;
mx_request_t request;
struct bmx_ctx *rx = NULL;
struct bmx_peer *peer = NULL;
@@ -2137,7 +2238,6 @@ bmx_handle_conn_req(void)
continue;
}
bmx_parse_match(rx->mxc_match, &type, &id, &version);
-
if (version != BMX_VERSION) {
/* TODO send error conn_ack */
debug(BMX_DB_WARN, "version mismatch with peer "
@@ -2154,7 +2254,13 @@ bmx_handle_conn_req(void)
bmx_put_idle_rx(rx);
continue;
}
- mx_get_endpoint_addr_context(status.source, (void **) &peer);
+ mx_decompose_endpoint_addr2(status.source, &nic_id,
+ &ep_id, &sid);
+ {
+ void *peerp = &peer;
+ mx_get_endpoint_addr_context(status.source, &peerp);
+ peer = (struct bmx_peer *) peerp;
+ }
if (peer == NULL) { /* new peer */
int ret = 0;
char *host = NULL;
@@ -2194,16 +2300,21 @@ bmx_handle_conn_req(void)
bmx_put_idle_rx(rx);
continue;
}
- } else { /* reconnecting peer */
+ } else if (sid != peer->mxp_sid) { /* reconnecting peer */
/* cancel queued txs and rxs, pending rxs */
- debug((BMX_DB_CONN|BMX_DB_PEER), "%s peer %s reconnecting",
- __func__, peer->mxp_mxmap->mxm_peername);
- bmx_peer_disconnect(peer, 0, BMI_ENETRESET);
+ debug((BMX_DB_CONN|BMX_DB_PEER), "%s peer "
+ "%s reconnecting", __func__,
+ peer->mxp_mxmap->mxm_peername);
+ if (peer->mxp_state == BMX_PEER_READY)
+ bmx_peer_disconnect(peer, 0, BMI_ENETRESET);
mxmap = peer->mxp_mxmap;
}
gen_mutex_lock(&peer->mxp_lock);
+ debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_WAIT",
+ peer->mxp_mxmap->mxm_peername);
peer->mxp_state = BMX_PEER_WAIT;
peer->mxp_tx_id = id;
+ peer->mxp_sid = sid;
gen_mutex_unlock(&peer->mxp_lock);
bmx_peer_addref(peer); /* add ref until completion of CONN_ACK */
mx_iconnect(bmi_mx->bmx_ep, peer->mxp_nic_id, mxmap->mxm_ep_id,
@@ -2232,8 +2343,8 @@ bmx_handle_icon_ack(void)
if (!bmi_mx->bmx_is_server) return;
do {
- uint64_t match = (uint64_t) BMX_MSG_ICON_ACK << 60;
- uint64_t mask = (uint64_t) 0xF << 60;
+ uint64_t match = (uint64_t) BMX_MSG_ICON_ACK << BMX_MSG_SHIFT;
+ uint64_t mask = BMX_MASK_MSG;
mx_status_t status;
mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
@@ -2255,6 +2366,8 @@ bmx_handle_icon_ack(void)
}
gen_mutex_lock(&peer->mxp_lock);
peer->mxp_epa = status.source;
+ debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_READY",
+ peer->mxp_mxmap->mxm_peername);
peer->mxp_state = BMX_PEER_READY;
/* NOTE no need to call bmx_peer_post_queued_[rxs|txs]()
* since the server should not have any queued msgs */
@@ -2288,6 +2401,8 @@ bmx_handle_icon_ack(void)
mx_isend(bmi_mx->bmx_ep, &tx->mxc_seg, tx->mxc_nseg, peer->mxp_epa,
tx->mxc_match, (void *) tx, &tx->mxc_mxreq);
if (!peer->mxp_exist) {
+ debug(BMX_DB_PEER, "calling bmi_method_addr_reg_callback"
+ "on %s", peer->mxp_mxmap->mxm_peername);
bmi_method_addr_reg_callback(peer->mxp_map);
peer->mxp_exist = 1;
}
@@ -2307,10 +2422,10 @@ bmx_handle_conn_ack(void)
uint32_t result = 0;
struct bmx_ctx *tx = NULL;
- if (!bmi_mx->bmx_is_server) return;
+ if (!bmi_mx->bmx_is_server) goto out;
do {
- uint64_t match = (uint64_t) BMX_MSG_CONN_ACK << 60;
- uint64_t mask = (uint64_t) 0xF << 60;
+ uint64_t match = (uint64_t) BMX_MSG_CONN_ACK << BMX_MSG_SHIFT;
+ uint64_t mask = BMX_MASK_MSG;
mx_status_t status;
mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
@@ -2323,17 +2438,26 @@ bmx_handle_conn_ack(void)
}
} while (result);
+out:
return;
}
static void
bmx_connection_handlers(void)
{
+ static int count = 0;
+ int print = (count++ % 1000 == 0);
+
+ if (print)
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+
/* push connection messages along */
bmx_handle_icon_req();
bmx_handle_conn_req();
bmx_handle_icon_ack();
bmx_handle_conn_ack();
+ if (print)
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
return;
}
@@ -2425,12 +2549,20 @@ BMI_mx_testcontext(int incount, bmi_op_i
{
int i = 0;
int completed = 0;
+ int old = 0;
uint64_t match = 0ULL;
- uint64_t mask = (uint64_t) 0xF << 60;
+ uint64_t mask = BMX_MASK_MSG;
struct bmx_ctx *ctx = NULL;
struct bmx_peer *peer = NULL;
list_t *canceled = &bmi_mx->bmx_canceled;
int wait = 0;
+ static int count = 0;
+ int print = 0;
+
+ if (count++ % 1000 == 0) {
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+ print = 1;
+ }
bmx_connection_handlers();
@@ -2465,11 +2597,13 @@ BMI_mx_testcontext(int incount, bmi_op_i
/* return completed messages
* we will always try (incount - completed) times even
* if some iterations have no result */
- match = (uint64_t) BMX_MSG_EXPECTED << 60;
+
+ match = (uint64_t) BMX_MSG_EXPECTED << BMX_MSG_SHIFT;
for (i = completed; i < incount; i++) {
uint32_t result = 0;
mx_status_t status;
- int old = completed;
+
+ old = completed;
if (wait == 0 || wait == 2) {
mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
@@ -2527,77 +2661,118 @@ BMI_mx_testcontext(int incount, bmi_op_i
}
/* check for completed unexpected sends */
- match = (uint64_t) BMX_MSG_UNEXPECTED << 60;
- if (!bmi_mx->bmx_is_server) { /* client */
- int old = completed;
- for (i = completed; i < incount; i++) {
- uint32_t result = 0;
- mx_status_t status;
+ match = (uint64_t) BMX_MSG_UNEXPECTED << BMX_MSG_SHIFT;
+
+ old = completed;
+
+ for (i = completed; i < incount; i++) {
+ uint32_t result = 0;
+ mx_status_t status;
+ list_t *l = &bmi_mx->bmx_unex_txs;
+ int again = 1;
+
+ ctx = NULL;
+
+ gen_mutex_lock(&bmi_mx->bmx_unex_txs_lock);
+ if (!qlist_empty(l)) {
+ ctx = qlist_entry(l->next, struct bmx_ctx, mxc_list);
+ peer = ctx->mxc_peer;
+ qlist_del_init(&ctx->mxc_list);
+ result = 1;
+ }
+ gen_mutex_unlock(&bmi_mx->bmx_unex_txs_lock);
+
+ while (!ctx && again) {
+ again = 0;
mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
if (result) {
ctx = (struct bmx_ctx *) status.context;
peer = ctx->mxc_peer;
- debug(BMX_DB_CTX, "%s completing unexpected %s with "
- "match 0x%llx for %s with op_id %llu",
- __func__,
- ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX",
- llu(ctx->mxc_match),
- peer->mxp_mxmap->mxm_peername,
- llu(ctx->mxc_mop->op_id));
-
- if (!qlist_empty(&ctx->mxc_list)) {
- gen_mutex_lock(&peer->mxp_lock);
- qlist_del_init(&ctx->mxc_list);
- gen_mutex_unlock(&peer->mxp_lock);
- }
- outids[completed] = ctx->mxc_mop->op_id;
- if (status.code == MX_SUCCESS) {
- errs[completed] = 0;
- sizes[completed] = status.xfer_length;
- } else {
- errs[completed] = bmx_mx_to_bmi_errno(status.code);
+ gen_mutex_lock(&peer->mxp_lock);
+ qlist_del_init(&ctx->mxc_list);
+ gen_mutex_unlock(&peer->mxp_lock);
+ if (ctx->mxc_type == BMX_REQ_RX) {
+ /* queue until testunexpected is called */
+ gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock);
+ qlist_add_tail(&bmi_mx->bmx_unex_rxs, &ctx->mxc_list);
+ gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock);
+ result = 0;
+ again = 1;
}
- if (user_ptrs)
- user_ptrs[completed] = ctx->mxc_mop->user_ptr;
- id_gen_fast_unregister(ctx->mxc_mop->op_id);
- BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop));
- completed++;
+ }
+ }
+ if (result) {
+ debug(BMX_DB_CTX, "%s completing unexpected %s with "
+ "match 0x%llx for %s with op_id %llu",
+ __func__,
+ ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX",
+ llu(ctx->mxc_match),
+ peer->mxp_mxmap->mxm_peername,
+ llu(ctx->mxc_mop->op_id));
+
+ if (!qlist_empty(&ctx->mxc_list)) {
+ gen_mutex_lock(&peer->mxp_lock);
+ qlist_del_init(&ctx->mxc_list);
+ gen_mutex_unlock(&peer->mxp_lock);
+ }
+ outids[completed] = ctx->mxc_mop->op_id;
+ if (status.code == MX_SUCCESS) {
+ errs[completed] = 0;
+ sizes[completed] = status.xfer_length;
+ } else {
+ errs[completed] = bmx_mx_to_bmi_errno(status.code);
+ }
+ if (user_ptrs)
+ user_ptrs[completed] = ctx->mxc_mop->user_ptr;
+ id_gen_fast_unregister(ctx->mxc_mop->op_id);
+ BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop));
+ completed++;
#if BMX_LOGGING
- MPE_Log_event(sendunex_finish, (int) ctx->mxc_tag, NULL);
+ MPE_Log_event(sendunex_finish, (int) ctx->mxc_tag, NULL);
#endif
- if (ctx->mxc_type == BMX_REQ_TX) {
- bmx_put_idle_tx(ctx);
- } else {
- bmx_put_idle_rx(ctx);
- }
- bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */
- }
- }
- if (completed - old > 0) {
- debug(BMX_DB_CTX, "%s found %d unexpected tx messages",
- __func__, completed - old);
+ bmx_put_idle_tx(ctx);
+ bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */
}
}
+ if (completed - old > 0) {
+ debug(BMX_DB_CTX, "%s found %d unexpected tx messages",
+ __func__, completed - old);
+ }
+
+ if (print)
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
*outcount = completed;
return completed;
}
+/* test for unexpected receives only, not unex sends */
static int
BMI_mx_testunexpected(int incount __unused, int *outcount,
struct bmi_method_unexpected_info *ui, int max_idle_time __unused)
{
uint32_t result = 0;
- uint64_t match = (uint64_t) BMX_MSG_UNEXPECTED << 60;
- uint64_t mask = (uint64_t) 0xF << 60;
+ uint64_t match = ((uint64_t) BMX_MSG_UNEXPECTED << BMX_MSG_SHIFT);
+ uint64_t mask = BMX_MASK_MSG;
mx_status_t status;
+ static int count = 0;
+ int print = 0;
+ struct bmx_ctx *rx = NULL;
+ struct bmx_peer *peer = NULL;
+ list_t *l = &bmi_mx->bmx_unex_rxs;
+ int again = 1;
+
+ if (count++ % 1000 == 0) {
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+ print = 1;
+ }
bmx_connection_handlers();
/* if the unexpected handler cannot get a rx, it does not post a receive.
- * probe for unexpected and post a rx */
+ * probe for unexpected and post a rx. */
mx_iprobe(bmi_mx->bmx_ep, match, mask, &status, &result);
if (result) {
int ret = 0;
@@ -2609,13 +2784,39 @@ BMI_mx_testunexpected(int incount __unus
}
}
- /* check for unexpected messages */
+ /* check for unexpected receives */
*outcount = 0;
- mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
+
+ gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock);
+ if (!qlist_empty(l)) {
+ rx = qlist_entry(l->next, struct bmx_ctx, mxc_list);
+ peer = rx->mxc_peer;
+ qlist_del_init(&rx->mxc_list);
+ result = 1;
+ }
+ gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock);
+
+ while (!rx && again) {
+ again = 0;
+ mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
+ if (result) {
+ rx = (struct bmx_ctx *) status.context;
+ peer = rx->mxc_peer;
+ if (rx->mxc_type == BMX_REQ_TX) {
+ gen_mutex_lock(&peer->mxp_lock);
+ qlist_del_init(&rx->mxc_list);
+ gen_mutex_unlock(&peer->mxp_lock);
+ gen_mutex_lock(&bmi_mx->bmx_unex_txs_lock);
+ qlist_add_tail(&bmi_mx->bmx_unex_txs, &rx->mxc_list);
+ gen_mutex_unlock(&bmi_mx->bmx_unex_txs_lock);
+ result = 0;
+ again = 1;
+ }
+ }
+ }
+
if (result) {
- struct bmx_ctx *rx = (struct bmx_ctx *) status.context;
- struct bmx_peer *peer = rx->mxc_peer;
- debug(BMX_DB_CTX, "*** %s completing RX with match 0x%llx for %s",
+ debug(BMX_DB_CTX, "%s completing RX with match 0x%llx for %s",
__func__, llu(rx->mxc_match), peer->mxp_mxmap->mxm_peername);
ui->error_code = 0;
@@ -2628,11 +2829,6 @@ BMI_mx_testunexpected(int incount __unus
rx->mxc_seg.segment_ptr = rx->mxc_buffer;
ui->tag = rx->mxc_tag;
- if (!qlist_empty(&rx->mxc_list)) {
- gen_mutex_lock(&peer->mxp_lock);
- qlist_del_init(&rx->mxc_list);
- gen_mutex_unlock(&peer->mxp_lock);
- }
#if BMX_LOGGING
MPE_Log_event(recvunex_finish, (int) rx->mxc_tag, NULL);
#endif
@@ -2640,6 +2836,9 @@ BMI_mx_testunexpected(int incount __unus
bmx_peer_decref(peer); /* drop the ref taken in unexpected_recv() */
*outcount = 1;
}
+ if (print)
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
return 0;
}
@@ -2660,14 +2859,15 @@ bmx_peer_connect(struct bmx_peer *peer)
int ret = 0;
uint64_t nic_id = 0ULL;
mx_request_t request;
- uint64_t match = (uint64_t) BMX_MSG_ICON_REQ << 60;
+ uint64_t match = (uint64_t) BMX_MSG_ICON_REQ << BMX_MSG_SHIFT;
struct bmx_method_addr *mxmap = peer->mxp_mxmap;
- if (bmi_mx->bmx_is_server) {
- return 1;
- }
+ debug(BMX_DB_FUNC, "entering %s", __func__);
+
gen_mutex_lock(&peer->mxp_lock);
if (peer->mxp_state == BMX_PEER_INIT) {
+ debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_WAIT",
+ peer->mxp_mxmap->mxm_peername);
peer->mxp_state = BMX_PEER_WAIT;
} else {
gen_mutex_unlock(&peer->mxp_lock);
@@ -2693,7 +2893,8 @@ bmx_peer_connect(struct bmx_peer *peer)
}
mx_get_endpoint_addr(bmi_mx->bmx_ep, &epa);
/* get our nic_id and ep_id */
- mx_decompose_endpoint_addr(epa, &nic_id, &bmi_mx->bmx_ep_id);
+ mx_decompose_endpoint_addr2(epa, &nic_id, &bmi_mx->bmx_ep_id,
+ &bmi_mx->bmx_sid);
/* get our hostname */
mx_nic_id_to_hostname(nic_id, host);
bmi_mx->bmx_hostname = strdup(host);
@@ -2714,6 +2915,9 @@ bmx_peer_connect(struct bmx_peer *peer)
* by calling mx_iconnect() w/BMX_MSG_ICON_REQ */
mx_iconnect(bmi_mx->bmx_ep, peer->mxp_nic_id, mxmap->mxm_ep_id,
BMX_MAGIC, match, (void *) peer, &request);
+
+ debug(BMX_DB_FUNC, "leaving %s", __func__);
+
return ret;
}
@@ -2729,10 +2933,11 @@ static struct bmi_method_addr *
BMI_mx_method_addr_lookup(const char *id)
{
int ret = 0;
+ int len = 0;
char *host = NULL;
uint32_t board = 0;
uint32_t ep_id = 0;
- struct bmi_method_addr *map = NULL;
+ struct bmi_method_addr *map = NULL;
struct bmx_method_addr *mxmap = NULL;
debug(BMX_DB_FUNC, "entering %s", __func__);
@@ -2754,7 +2959,8 @@ BMI_mx_method_addr_lookup(const char *id
mxmap->mxm_board == board &&
mxmap->mxm_ep_id == ep_id) {
map = peer->mxp_map;
- BMX_FREE(host, sizeof(*host));
+ len = strlen(host);
+ BMX_FREE(host, len);
break;
}
}
@@ -2763,7 +2969,7 @@ BMI_mx_method_addr_lookup(const char *id
if (map == NULL) {
map = bmx_alloc_method_addr(id, host, board, ep_id);
- if (bmi_mx != NULL && ! bmi_mx->bmx_is_server) { /* we are a client */
+ if (bmi_mx != NULL) {
struct bmx_peer *peer = NULL;
mxmap = map->method_data;
@@ -2824,7 +3030,19 @@ BMI_mx_cancel(bmi_op_id_t id, bmi_contex
break;
case BMX_CTX_PENDING:
if (ctx->mxc_type == BMX_REQ_TX) {
- bmx_peer_disconnect(peer, 1, BMI_ENETRESET);
+ /* see if it completed first */
+ mx_test(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &ctx->mxc_mxstat, &result);
+ if (result == 1) {
+ debug(BMX_DB_CTX, "%s completed TX op_id %llu "
+ "mxc_state %d peer state %d status.code %s",
+ __func__, llu(ctx->mxc_mop->op_id), ctx->mxc_state,
+ peer->mxp_state, mx_strstatus(ctx->mxc_mxstat.code));
+ bmx_deq_pending_ctx(ctx);
+ bmx_q_canceled_ctx(ctx, BMI_ECANCEL);
+ } else {
+ /* and if not, then disconnect() */
+ bmx_peer_disconnect(peer, 1, BMI_ENETRESET);
+ }
} else { /* BMX_REQ_RX */
mx_cancel(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &result);
if (result == 1) {
Index: mx.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/bmi/bmi_mx/mx.h,v
diff -p -u -r1.5 -r1.5.6.1
--- mx.h 4 Oct 2007 13:56:13 -0000 1.5
+++ mx.h 7 Apr 2008 15:07:25 -0000 1.5.6.1
@@ -107,8 +107,7 @@ typedef struct qlist_head list_t;
/* BMX [UN]EXPECTED msgs use the 64-bits of the match info as follows:
* Bits Description
* 60-63 Msg type
- * 56-59 Reserved for credits if implemented
- * 52-55 Reserved
+ * 52-59 Reserved
* 32-51 Peer id (of the sender, assigned by receiver)
* 0-31 bmi_msg_tag_t
*/
@@ -116,14 +115,20 @@ typedef struct qlist_head list_t;
/* BMX CONN_[REQ|ACK] msgs use the 64-bits of the match info as follows:
* Bits Description
* 60-63 Msg type
- * 56-59 Reserved for credits if implemented
- * 52-55 Reserved
+ * 52-59 Reserved
* 32-51 Peer id (to use when contacting the sender)
* 0-31 Version
*/
-#define BMX_MAX_PEER_ID ((1<<20) - 1) /* 20 bits - actually 1,048,574 peers
+#define BMX_MSG_SHIFT 60
+#define BMX_ID_SHIFT 32
+#define BMX_MASK_ALL (~0ULL)
+#define BMX_MASK_MSG (0xFULL << BMX_MSG_SHIFT)
+
+#define BMX_MAX_PEER_ID ((1<<20) - 1) /* 20 bits - actually 1,048,574 peers
1 to 1,048,575 */
+#define BMX_MAX_TAG (~0U) /* 32 bits */
+
#define BMX_TIMEOUT (20 * 1000) /* msg timeout in milliseconds */
@@ -146,6 +151,7 @@ struct bmx_data
uint32_t bmx_board; /* my MX board index */
uint32_t bmx_ep_id; /* my MX endpoint ID */
mx_endpoint_t bmx_ep; /* my MX endpoint */
+ uint32_t bmx_sid; /* my MX session id */
int bmx_is_server; /* am I a server? */
list_t bmx_peers; /* list of all peers */
@@ -162,6 +168,11 @@ struct bmx_data
list_t bmx_canceled; /* canceled reqs waiting for test */
gen_mutex_t bmx_canceled_lock; /* canceled list lock */
+ list_t bmx_unex_txs; /* completed unexpected sends */
+ gen_mutex_t bmx_unex_txs_lock; /* completed unexpected sends lock */
+ list_t bmx_unex_rxs; /* completed unexpected recvs */
+ gen_mutex_t bmx_unex_rxs_lock; /* completed unexpected recvs lock */
+
uint32_t bmx_next_id; /* for the next peer_id */
gen_mutex_t bmx_lock; /* global lock - use for global rxs,
global txs, next_id, etc. */
@@ -190,20 +201,21 @@ enum bmx_peer_state {
struct bmx_method_addr
{
- struct method_addr *mxm_map; /* peer's method_addr */
- const char *mxm_peername; /* mx://hostname/board/ep_id */
- const char *mxm_hostname; /* peer's hostname */
- uint32_t mxm_board; /* peer's MX board index */
- uint32_t mxm_ep_id; /* peer's MX endpoint ID */
- struct bmx_peer *mxm_peer; /* peer pointer */
+ struct bmi_method_addr *mxm_map; /* peer's bmi_method_addr */
+ const char *mxm_peername; /* mx://hostname/board/ep_id */
+ const char *mxm_hostname; /* peer's hostname */
+ uint32_t mxm_board; /* peer's MX board index */
+ uint32_t mxm_ep_id; /* peer's MX endpoint ID */
+ struct bmx_peer *mxm_peer; /* peer pointer */
};
struct bmx_peer
{
- struct method_addr *mxp_map; /* his method_addr * */
+ struct bmi_method_addr *mxp_map; /* his bmi_method_addr * */
struct bmx_method_addr *mxp_mxmap; /* his bmx_method_addr */
uint64_t mxp_nic_id; /* his NIC id */
mx_endpoint_addr_t mxp_epa; /* his MX endpoint address */
+ uint32_t mxp_sid; /* his MX session id */
int mxp_exist; /* have we connected before? */
enum bmx_peer_state mxp_state; /* INIT, WAIT, READY, DISCONNECT */
@@ -292,7 +304,7 @@ struct bmx_connreq
#if BMX_DEBUG
/* set the mask to the BMX_DB_* errors that you want gossip to report */
-#define BMX_DB_MASK (BMX_DB_ERR|BMX_DB_WARN)
+#define BMX_DB_MASK (BMX_DB_ERR|BMX_DB_WARN|BMX_DB_ALL)
#define debug(lvl,fmt,args...) \
do { \
if (lvl & BMX_DB_MASK) { \
More information about the Pvfs2-cvs
mailing list