[Pvfs2-cvs] commit by pcarns in pvfs2-1/src/io/bmi/bmi_mx: module.mk.in mx.c mx.h

CVS commit program cvs at parl.clemson.edu
Mon Apr 7 11:07:25 EDT 2008


Update of /projects/cvsroot/pvfs2-1/src/io/bmi/bmi_mx
In directory parlweb1:/tmp/cvs-serv26608/src/io/bmi/bmi_mx

Modified Files:
      Tag: small-file-branch
	module.mk.in mx.c mx.h 
Log Message:
syncing small-file-branch back up with trunk at small-file-branch-point2 tag
(reverse merge)


Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/bmi/bmi_mx/module.mk.in,v
diff -p -u -r1.1 -r1.1.20.1
--- module.mk.in	13 Feb 2007 18:39:39 -0000	1.1
+++ module.mk.in	7 Apr 2008 15:07:24 -0000	1.1.20.1
@@ -1,10 +1,30 @@
-BUILD_MX = @BUILD_MX@
+#
+# Makefile stub for bmi_mx.
+#
+# Copyright (C) 2008 Pete Wyckoff <pw at osc.edu>
+#
+# See COPYING in top-level directory.
+#
 
-# only build MX module if configure detected MX 
-ifdef BUILD_MX
-	DIR := src/io/bmi/bmi_mx
-	LIBSRC += \
-		$(DIR)/mx.c
-	SERVERSRC += \
-		$(DIR)/mx.c
-endif
+# only do any of this if configure decided to use MX
+ifneq (,$(BUILD_MX))
+
+#
+# Local definitions.
+#
+DIR := src/io/bmi/bmi_mx
+cfiles := mx.c
+
+#
+# Export these to the top Makefile to tell it what to build.
+#
+src := $(patsubst %,$(DIR)/%,$(cfiles))
+LIBSRC    += $(src)
+SERVERSRC += $(src)
+
+#
+# Extra cflags for files in this directory.
+#
+MODCFLAGS_$(DIR) := -I@MX_INCDIR@ 
+
+endif  # BUILD_MX

Index: mx.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/bmi/bmi_mx/mx.c,v
diff -p -u -r1.11 -r1.11.2.1
--- mx.c	30 Nov 2007 19:33:16 -0000	1.11
+++ mx.c	7 Apr 2008 15:07:24 -0000	1.11.2.1
@@ -34,10 +34,12 @@ bmx_unexpected_recv(void *context, mx_en
 
 static int
 bmx_peer_connect(struct bmx_peer *peer);
+static void
+bmx_create_peername(void);
 
 /**** TX/RX handling functions **************************************/
 
-void
+static void
 bmx_ctx_free(struct bmx_ctx *ctx)
 {
         if (ctx == NULL) return;
@@ -60,7 +62,7 @@ bmx_ctx_free(struct bmx_ctx *ctx)
         return;
 }
 
-int
+static int
 bmx_ctx_alloc(struct bmx_ctx **ctxp, enum bmx_req_type type)
 {
         struct bmx_ctx *ctx     = NULL;
@@ -142,7 +144,7 @@ bmx_ctx_alloc(struct bmx_ctx **ctxp, enu
         return 0;
 }
 
-void
+static void
 bmx_ctx_init(struct bmx_ctx *ctx)
 {
         struct bmx_peer *peer   = NULL;
@@ -191,7 +193,7 @@ bmx_ctx_init(struct bmx_ctx *ctx)
 }
 
 /* add to peer's queued txs/rxs list */
-void
+static void
 bmx_q_ctx(struct bmx_ctx *ctx)
 {
         struct bmx_peer *peer = ctx->mxc_peer;
@@ -206,7 +208,7 @@ bmx_q_ctx(struct bmx_ctx *ctx)
 }
 
 /* remove from peer's queued txs/rxs list */
-void
+static void
 bmx_deq_ctx(struct bmx_ctx *ctx)
 {
         struct bmx_peer *peer = ctx->mxc_peer;
@@ -220,7 +222,7 @@ bmx_deq_ctx(struct bmx_ctx *ctx)
 }
 
 /* add to peer's pending rxs list */
-void
+static void
 bmx_q_pending_ctx(struct bmx_ctx *ctx)
 {
         struct bmx_peer *peer = ctx->mxc_peer;
@@ -237,7 +239,7 @@ bmx_q_pending_ctx(struct bmx_ctx *ctx)
 }
 
 /* remove from peer's pending rxs list */
-void
+static void
 bmx_deq_pending_ctx(struct bmx_ctx *ctx)
 {
         struct bmx_peer *peer = ctx->mxc_peer;
@@ -256,7 +258,7 @@ bmx_deq_pending_ctx(struct bmx_ctx *ctx)
 }
 
 /* add to the global canceled list */
-void
+static void
 bmx_q_canceled_ctx(struct bmx_ctx *ctx, bmi_error_code_t error)
 {
         ctx->mxc_state = BMX_CTX_CANCELED;
@@ -270,7 +272,7 @@ bmx_q_canceled_ctx(struct bmx_ctx *ctx, 
         return;
 }
 
-struct bmx_ctx *
+static struct bmx_ctx *
 bmx_get_idle_rx(void)
 {
         struct bmx_ctx  *rx     = NULL;
@@ -301,7 +303,7 @@ bmx_get_idle_rx(void)
         return rx;
 }
 
-void
+static void
 bmx_put_idle_rx(struct bmx_ctx *rx)
 {
         if (rx == NULL) {
@@ -326,7 +328,7 @@ bmx_put_idle_rx(struct bmx_ctx *rx)
         return;
 }
 
-void
+static void
 bmx_reduce_idle_rxs(int count)
 {
         int              i      = 0;
@@ -342,7 +344,7 @@ bmx_reduce_idle_rxs(int count)
         return;
 }
 
-struct bmx_ctx *
+static struct bmx_ctx *
 bmx_get_idle_tx(void)
 {
         struct bmx_ctx  *tx     = NULL;
@@ -373,7 +375,7 @@ bmx_get_idle_tx(void)
         return tx;
 }
 
-void
+static void
 bmx_put_idle_tx(struct bmx_ctx *tx)
 {
         if (tx == NULL) {
@@ -400,7 +402,7 @@ bmx_put_idle_tx(struct bmx_ctx *tx)
 
 /**** peername parsing functions **************************************/
 
-int
+static int
 bmx_verify_hostname(char *host)
 {
         int             ret     = 0;
@@ -423,7 +425,7 @@ bmx_verify_hostname(char *host)
         return 0;
 }
 
-int
+static int
 bmx_verify_num_str(char *num_str)
 {
         int             ret     = 0;
@@ -447,11 +449,10 @@ bmx_verify_num_str(char *num_str)
  * returns 0 and we do not know that it failed. 
  * This handles legal hostnames (1-63 chars) include a-zA-Z0-9 as well as . and -
  * It will accept IPv4 addresses but not IPv6 (too many semicolons) */
-int
+static int
 bmx_parse_peername(const char *peername, char **hostname, uint32_t *board, uint32_t *ep_id)
 {
         int             ret             = 0;
-        int             len             = 0;
         int             colon1_found    = 0;
         int             colon2_found    = 0;
         char           *s               = NULL;
@@ -510,7 +511,6 @@ bmx_parse_peername(const char *peername,
                     NULL != strchr(colon2, ':')) {
                         debug(BMX_DB_INFO, "parse_peername() too many ':' (%s %s)", 
                                            colon1, colon2);
-                        len = sizeof(*s);
                         free(s);
                         return -1;
                 }
@@ -522,7 +522,6 @@ bmx_parse_peername(const char *peername,
                 free(s);
                 return -1;
         }
-        strcpy(host, s);
 
         if (colon1_found) {
                 bd = (uint32_t) strtol(colon1, NULL, 0);
@@ -564,7 +563,7 @@ bmx_parse_peername(const char *peername,
 
 /**** peer handling functions **************************************/
 
-void
+static void
 bmx_peer_free(struct bmx_peer *peer)
 {
         struct bmx_method_addr *mxmap = peer->mxp_mxmap;
@@ -586,7 +585,7 @@ bmx_peer_free(struct bmx_peer *peer)
         return;
 }
 
-void
+static void
 bmx_peer_addref(struct bmx_peer *peer)
 {
         gen_mutex_lock(&peer->mxp_lock);
@@ -596,9 +595,10 @@ bmx_peer_addref(struct bmx_peer *peer)
         return;
 }
 
-void
+static void
 bmx_peer_decref(struct bmx_peer *peer)
 {
+        debug(BMX_DB_FUNC, "entering %s", __func__);
         gen_mutex_lock(&peer->mxp_lock);
         if (peer->mxp_refcount == 0) {
                 debug(BMX_DB_WARN, "peer_decref() called for %s when refcount == 0",
@@ -607,6 +607,8 @@ bmx_peer_decref(struct bmx_peer *peer)
         peer->mxp_refcount--;
         if (peer->mxp_refcount == 1 && peer->mxp_state == BMX_PEER_DISCONNECT) {
                 /* all txs and rxs are completed or canceled, reset state */
+                debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_INIT",
+                                   peer->mxp_mxmap->mxm_peername);
                 peer->mxp_state = BMX_PEER_INIT;
         }
         gen_mutex_unlock(&peer->mxp_lock);
@@ -622,10 +624,11 @@ bmx_peer_decref(struct bmx_peer *peer)
                 gen_mutex_unlock(&bmi_mx->bmx_lock);
                 bmx_peer_free(peer);
         }
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
         return;
 }
 
-int
+static int
 bmx_peer_alloc(struct bmx_peer **peerp, struct bmx_method_addr *mxmap)
 {
         int              i              = 0;
@@ -697,6 +700,9 @@ bmx_peer_alloc(struct bmx_peer **peerp, 
                 bmx_put_idle_rx(rx);
         }
 
+        /* on servers with server-to-server comms, we are racing 
+         * between method_addr_lookup() and handle_conn_req() */
+
         bmx_peer_addref(peer); /* for the peers list */
         gen_mutex_lock(&bmi_mx->bmx_peers_lock);
         qlist_add_tail(&peer->mxp_list, &bmi_mx->bmx_peers);
@@ -708,11 +714,13 @@ bmx_peer_alloc(struct bmx_peer **peerp, 
         return 0;
 }
 
-int
+static int
 bmx_peer_init_state(struct bmx_peer *peer)
 {
         int             ret     = 0;
 
+        debug(BMX_DB_FUNC, "entering %s", __func__);
+
         gen_mutex_lock(&peer->mxp_lock);
 
         /* we have a ref for each pending tx and rx, don't init
@@ -722,18 +730,22 @@ bmx_peer_init_state(struct bmx_peer *pee
                 ret = -1;
         } else {
                 /* ok to init */
+                debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_INIT",
+                                   peer->mxp_mxmap->mxm_peername);
                 peer->mxp_state = BMX_PEER_INIT;
         }
 
         gen_mutex_unlock(&peer->mxp_lock);
 
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
         return 0;
 }
 
 /**** startup/shutdown functions **************************************/
 
 /* init bmi_mx */
-int
+static int
 bmx_globals_init(int method_id)
 {
 #if BMX_MEM_ACCT
@@ -752,6 +764,7 @@ bmx_globals_init(int method_id)
         /* bmi_mx->bmx_board */
         /* bmi_mx->bmx_ep_id */
         /* bmi_mx->bmx_ep */
+        /* bmi_mx->bmx_sid */
         /* bmi_mx->bmx_is_server */
 
         INIT_QLIST_HEAD(&bmi_mx->bmx_peers);
@@ -768,6 +781,11 @@ bmx_globals_init(int method_id)
         INIT_QLIST_HEAD(&bmi_mx->bmx_canceled);
         gen_mutex_init(&bmi_mx->bmx_canceled_lock);
 
+        INIT_QLIST_HEAD(&bmi_mx->bmx_unex_txs);
+        gen_mutex_init(&bmi_mx->bmx_unex_txs_lock);
+        INIT_QLIST_HEAD(&bmi_mx->bmx_unex_rxs);
+        gen_mutex_init(&bmi_mx->bmx_unex_rxs_lock);
+
         bmi_mx->bmx_next_id = 1;
         gen_mutex_init(&bmi_mx->bmx_lock);      /* global lock, use for global txs,
                                                    global rxs, next_id, etc. */
@@ -787,7 +805,7 @@ bmx_globals_init(int method_id)
 }
 
 
-int
+static int
 bmx_open_endpoint(mx_endpoint_t *ep, uint32_t board, uint32_t ep_id)
 {
         mx_return_t     mxret   = MX_SUCCESS;
@@ -801,7 +819,7 @@ bmx_open_endpoint(mx_endpoint_t *ep, uin
          * matching recvs. */
         param.key = MX_PARAM_CONTEXT_ID;
         param.val.context_id.bits = 4;
-        param.val.context_id.shift = 60;
+        param.val.context_id.shift = BMX_MSG_SHIFT;
 
         mxret = mx_open_endpoint(board, ep_id, BMX_MAGIC,
                                  &param, 1, ep);
@@ -840,6 +858,8 @@ BMI_mx_initialize(bmi_method_addr_p list
         int             ret     = 0;
         mx_return_t     mxret   = MX_SUCCESS;
 
+        debug(BMX_DB_FUNC, "entering %s", __func__);
+
 #if BMX_LOGGING
         MPE_Init_log();
 #define BMX_LOG_STATE 1
@@ -907,11 +927,16 @@ BMI_mx_initialize(bmi_method_addr_p list
         if (init_flags & BMI_INIT_SERVER) {
                 struct bmx_ctx         *rx     = NULL;
                 struct bmx_method_addr *mxmap  = listen_addr->method_data;
+                mx_endpoint_addr_t      epa;
+                uint32_t                ep_id   = 0;
+                uint32_t                sid     = 0;
+                uint64_t                nic_id  = 0ULL;
 
                 bmi_mx->bmx_hostname = (char *) mxmap->mxm_hostname;
                 bmi_mx->bmx_board = mxmap->mxm_board;
                 bmi_mx->bmx_ep_id = mxmap->mxm_ep_id;
                 bmi_mx->bmx_is_server = 1;
+                bmx_create_peername();
 
                 ret = bmx_open_endpoint(&bmi_mx->bmx_ep, mxmap->mxm_board, mxmap->mxm_ep_id);
                 if (ret != 0) {
@@ -919,6 +944,12 @@ BMI_mx_initialize(bmi_method_addr_p list
                         BMX_FREE(bmi_mx, sizeof(*bmi_mx));
                         exit(1);
                 }
+
+                /* get our MX session id */
+                mx_get_endpoint_addr(bmi_mx->bmx_ep, &epa);
+                mx_decompose_endpoint_addr2(epa, &nic_id, &ep_id, &sid);
+                bmi_mx->bmx_sid = sid;
+
                 /* We allocate BMX_PEER_RX_NUM when we peer_alloc()
                  * Allocate some here to catch the peer CONN_REQ */
                 for (i = 0; i < BMX_SERVER_RXS; i++) {
@@ -948,6 +979,8 @@ BMI_mx_initialize(bmi_method_addr_p list
 #if BMX_MEM_ACCT
         debug(BMX_DB_MEM, "memory used at end of initialization %lld", llu(mem_used));
 #endif
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
         return 0;
 }
 
@@ -1017,6 +1050,9 @@ BMI_mx_finalize(void)
         }
 #endif
 
+        if (bmi_mx->bmx_hostname) free(bmi_mx->bmx_hostname);
+        if (bmi_mx->bmx_peername) free(bmi_mx->bmx_peername);
+
         bmi_mx = NULL;
 
         gen_mutex_unlock(&tmp->bmx_lock);
@@ -1060,6 +1096,8 @@ bmx_peer_disconnect(struct bmx_peer *pee
                 gen_mutex_unlock(&peer->mxp_lock);
                 return;
         }
+        debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_DISCONNECT",
+                           peer->mxp_mxmap->mxm_peername);
         peer->mxp_state = BMX_PEER_DISCONNECT;
 
         /* cancel queued txs */
@@ -1115,10 +1153,16 @@ BMI_mx_set_info(int option, void *inout_
                                         peer = mxmap->mxm_peer;
                                         bmx_peer_disconnect(peer, 1, BMI_ENETRESET);
                                 }
-                                if (!mxmap->mxm_peername) free((void *) mxmap->mxm_peername);
-                                mxmap->mxm_peername = NULL;
-                                if (!mxmap->mxm_hostname) free((void *) mxmap->mxm_hostname);
-                                mxmap->mxm_hostname = NULL;
+                                if (mxmap->mxm_peername) {
+                                        debug(BMX_DB_MEM, "freeing mxm_peername");
+                                        free((void *) mxmap->mxm_peername);
+                                        mxmap->mxm_peername = NULL;
+                                }
+                                if (mxmap->mxm_hostname) {
+                                        debug(BMX_DB_MEM, "freeing mxm_hostname");
+                                        free((void *) mxmap->mxm_hostname);
+                                        mxmap->mxm_hostname = NULL;
+                                }
                                 debug(BMX_DB_PEER, "freeing map 0x%p", map);
                                 free(map);
                         }
@@ -1159,7 +1203,7 @@ BMI_mx_get_info(int option, void *inout_
 #define BMX_IO_BUF      1
 #define BMX_UNEX_BUF    2
 
-void *
+static void *
 bmx_memalloc(bmi_size_t size, int type)
 {
         void                    *buf    = NULL;
@@ -1290,16 +1334,16 @@ BMI_mx_unexpected_free(void *buf)
         return 0;
 }
 
-void
+static void
 bmx_parse_match(uint64_t match, uint8_t *type, uint32_t *id, uint32_t *tag)
 {
-        *type   = (uint8_t)  (match >> 60);
-        *id     = (uint32_t) ((match >> 32) & 0xFFFFF); /* 20 bits */
-        *tag    = (uint32_t) (match & 0xFFFFFFFF);
+        *type   = (uint8_t)  (match >> BMX_MSG_SHIFT);
+        *id     = (uint32_t) ((match >> BMX_ID_SHIFT) & BMX_MAX_PEER_ID); /* 20 bits */
+        *tag    = (uint32_t) (match & BMX_MAX_TAG); /* 32 bits */
         return;
 }
 
-void
+static void
 bmx_create_match(struct bmx_ctx *ctx)
 {
         int             connect = 0;
@@ -1330,12 +1374,12 @@ bmx_create_match(struct bmx_ctx *ctx)
                 exit(1);
         }
 
-        ctx->mxc_match = (type << 60) | (id << 32) | tag;
+        ctx->mxc_match = (type << BMX_MSG_SHIFT) | (id << BMX_ID_SHIFT) | tag;
 
         return;
 }
 
-bmi_error_code_t
+static bmi_error_code_t
 bmx_mx_to_bmi_errno(enum mx_status_code code)
 {
         int     err     = 0;
@@ -1415,6 +1459,8 @@ bmx_ensure_connected(struct bmx_method_a
         int             ret     = 0;
         struct bmx_peer *peer   = mxmap->mxm_peer;
 
+        /* NOTE: can this happen? we call peer_alloc() when using 
+         * method_addr_lookup() */
         if (peer == NULL) {
                 ret = bmx_peer_alloc(&peer, mxmap);
                 if (ret != 0) {
@@ -1557,10 +1603,15 @@ BMI_mx_post_send(bmi_op_id_t *id, struct
                  enum bmi_buffer_type buffer_flag __unused,
                  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
 {
+        int ret = 0;
         debug(BMX_DB_FUNC, "entering %s", __func__);
 
-        return bmx_post_send_common(id, remote_map, 1, &buffer, &size, size,
+        ret = bmx_post_send_common(id, remote_map, 1, &buffer, &size, size,
                                     tag, user_ptr, context_id, 0);
+
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+        return ret;
 }
 
 static int
@@ -1569,10 +1620,16 @@ BMI_mx_post_send_list(bmi_op_id_t *id, s
                       bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
                       bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
 {
+        int ret = 0;
+
         debug(BMX_DB_FUNC, "entering %s", __func__);
 
-        return bmx_post_send_common(id, remote_map, list_count, buffers, sizes, 
+        ret = bmx_post_send_common(id, remote_map, list_count, buffers, sizes, 
                                     total_size, tag, user_ptr, context_id, 0);
+
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+        return ret;
 }
 
 static int
@@ -1581,10 +1638,16 @@ BMI_mx_post_sendunexpected(bmi_op_id_t *
                  enum bmi_buffer_type buffer_flag __unused,
                  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
 {
+        int ret = 0;
+
         debug(BMX_DB_FUNC, "entering %s", __func__);
 
-        return bmx_post_send_common(id, remote_map, 1, &buffer, &size, size,
+        ret = bmx_post_send_common(id, remote_map, 1, &buffer, &size, size,
                                     tag, user_ptr, context_id, 1);
+
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+        return ret;
 }
 
 static int
@@ -1593,10 +1656,16 @@ BMI_mx_post_sendunexpected_list(bmi_op_i
                   bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
                   bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
 {
+        int ret = 0;
+
         debug(BMX_DB_FUNC, "entering %s", __func__);
 
-        return bmx_post_send_common(id, remote_map, list_count, buffers, sizes, 
+        ret = bmx_post_send_common(id, remote_map, list_count, buffers, sizes, 
                                     total_size, tag, user_ptr, context_id, 1);
+
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+        return ret;
 }
 
 /* if (peer->mxp_state == BMX_PEER_READY)
@@ -1625,7 +1694,7 @@ bmx_post_rx(struct bmx_ctx *rx)
                         segs = rx->mxc_seg_list;
                 }
                 mxret = mx_irecv(bmi_mx->bmx_ep, segs, rx->mxc_nseg,
-                                 rx->mxc_match, -1ULL, (void *) rx, &rx->mxc_mxreq);
+                                 rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
                 if (mxret != MX_SUCCESS) {
                         ret = -BMI_ENOMEM;
                         bmx_deq_pending_ctx(rx);        /* uses peer lock */
@@ -1742,10 +1811,16 @@ BMI_mx_post_recv(bmi_op_id_t *id, struct
                  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
                  bmi_context_id context_id)
 {
+        int ret = 0;
+
         debug(BMX_DB_FUNC, "entering %s", __func__);
 
-        return bmx_post_recv_common(id, remote_map, 1, &buffer, &expected_len,
+        ret = bmx_post_recv_common(id, remote_map, 1, &buffer, &expected_len,
                                     expected_len, tag, user_ptr, context_id);
+
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+        return ret;
 }
 
 static int
@@ -1755,10 +1830,16 @@ BMI_mx_post_recv_list(bmi_op_id_t *id, s
                enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
                bmi_context_id context_id)
 {
+        int ret = 0;
+
         debug(BMX_DB_FUNC, "entering %s", __func__);
 
-        return bmx_post_recv_common(id, remote_map, list_count, buffers, sizes,
+        ret = bmx_post_recv_common(id, remote_map, list_count, buffers, sizes,
                                     tot_expected_len, tag, user_ptr, context_id);
+
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
+        return ret;
 }
 
 static void
@@ -1767,6 +1848,8 @@ bmx_peer_post_queued_rxs(struct bmx_peer
         struct bmx_ctx  *rx             = NULL;
         list_t          *queued_rxs     = &peer->mxp_queued_rxs;
 
+        debug(BMX_DB_FUNC, "entering %s", __func__);
+
         gen_mutex_lock(&peer->mxp_lock);
         while (!qlist_empty(queued_rxs)) {
                 if (peer->mxp_state != BMX_PEER_READY) {
@@ -1781,6 +1864,8 @@ bmx_peer_post_queued_rxs(struct bmx_peer
         }
         gen_mutex_unlock(&peer->mxp_lock);
 
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
         return;
 }
 
@@ -1790,6 +1875,8 @@ bmx_peer_post_queued_txs(struct bmx_peer
         struct bmx_ctx  *tx             = NULL;
         list_t          *queued_txs     = &peer->mxp_queued_txs;
 
+        debug(BMX_DB_FUNC, "entering %s", __func__);
+
         gen_mutex_lock(&peer->mxp_lock);
         while (!qlist_empty(queued_txs)) {
                 if (peer->mxp_state != BMX_PEER_READY) {
@@ -1806,6 +1893,8 @@ bmx_peer_post_queued_txs(struct bmx_peer
         }
         gen_mutex_unlock(&peer->mxp_lock);
 
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
         return;
 }
 
@@ -1817,8 +1906,11 @@ bmx_post_unexpected_recv(mx_endpoint_add
         int             ret     = 0;
         struct bmx_ctx  *rx     = NULL;
         struct bmx_peer *peer   = NULL;
+        void            *peerp  = (void *) &peer;
         mx_return_t     mxret   = MX_SUCCESS;
 
+        debug(BMX_DB_FUNC, "entering %s", __func__);
+
         if (id == 0 && tag == 0 && type == 0) {
                 bmx_parse_match(match, &type, &id, &tag);
         }
@@ -1829,7 +1921,8 @@ bmx_post_unexpected_recv(mx_endpoint_add
 
         rx = bmx_get_idle_rx();
         if (rx != NULL) {
-                mx_get_endpoint_addr_context(source, (void **) &peer);
+                mx_get_endpoint_addr_context(source, &peerp);
+                peer = (struct bmx_peer *) peerp;
                 if (peer == NULL) {
                         debug(BMX_DB_PEER, "unknown peer sent message 0x%llx "
                                         "length %u", llu(match), length);
@@ -1853,7 +1946,7 @@ bmx_post_unexpected_recv(mx_endpoint_add
                 debug(BMX_DB_CTX, "%s rx match= 0x%llx length= %lld", __func__,
                                 llu(rx->mxc_match), lld(rx->mxc_nob));
                 mxret = mx_irecv(bmi_mx->bmx_ep, &rx->mxc_seg, rx->mxc_nseg,
-                                 rx->mxc_match, -1ULL, (void *) rx, &rx->mxc_mxreq);
+                                 rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
                 if (mxret != MX_SUCCESS) {
                         debug((BMX_DB_MX|BMX_DB_CTX), "mx_irecv() failed with %s for an "
                                         "unexpected recv with tag %d length %d",
@@ -1865,6 +1958,8 @@ bmx_post_unexpected_recv(mx_endpoint_add
                 ret = -1;
         }
 
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
         return ret;
 }
 
@@ -1885,6 +1980,7 @@ bmx_unexpected_recv(void *context, mx_en
         uint32_t                id      = 0;
         uint32_t                tag     = 0;
         struct bmx_peer         *peer   = NULL;
+        void                    *peerp  = &peer;
         mx_return_t             mxret   = MX_SUCCESS;
 
         bmx_parse_match(match_value, &type, &id, &tag);
@@ -1909,7 +2005,7 @@ bmx_unexpected_recv(void *context, mx_en
                         debug(BMX_DB_CONN, "%s rx match= 0x%llx length= %lld", 
                                         __func__, llu(rx->mxc_match), lld(rx->mxc_nob));
                         mxret = mx_irecv(bmi_mx->bmx_ep, &rx->mxc_seg, rx->mxc_nseg,
-                                         rx->mxc_match, -1ULL, (void *) rx, &rx->mxc_mxreq);
+                                         rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
                         if (mxret != MX_SUCCESS) {
                                 debug(BMX_DB_CONN, "mx_irecv() failed for an "
                                                 "unexpected recv with %s", 
@@ -1923,11 +2019,8 @@ bmx_unexpected_recv(void *context, mx_en
                 break;
         case BMX_MSG_CONN_ACK:
                 /* the server is replying to our CONN_REQ */
-                if (bmi_mx->bmx_is_server) {
-                        debug(BMX_DB_ERR, "server receiving CONN_ACK");
-                        exit(1);
-                }
-                mx_get_endpoint_addr_context(source, (void **) &peer);
+                mx_get_endpoint_addr_context(source, &peerp);
+                peer = (struct bmx_peer *) peerp;
                 if (peer == NULL) {
                         debug((BMX_DB_CONN|BMX_DB_PEER), "receiving CONN_ACK but "
                                         "the endpoint context does not have a peer");
@@ -1952,7 +2045,9 @@ bmx_unexpected_recv(void *context, mx_en
                 break;
         case BMX_MSG_UNEXPECTED:
                 if (!bmi_mx->bmx_is_server) {
-                        mx_get_endpoint_addr_context(source, (void **) &peer);
+                        void *peerp = &peer;
+                        mx_get_endpoint_addr_context(source, &peerp);
+                        peer = (struct bmx_peer *) peerp;
                         debug(BMX_DB_ERR, "client receiving unexpected message "
                                 "from %s with mask 0x%llx length %u",
                                 peer == NULL ? "unknown" : peer->mxp_mxmap->mxm_peername,
@@ -1987,6 +2082,8 @@ bmx_alloc_method_addr(const char *peerna
         struct bmi_method_addr      *map            = NULL;
         struct bmx_method_addr  *mxmap          = NULL;
 
+        debug(BMX_DB_FUNC, "entering %s", __func__);
+
         if (bmi_mx == NULL) {
                 map = bmi_alloc_method_addr(
                     tmp_id, (bmi_size_t) sizeof(*mxmap));
@@ -2003,6 +2100,8 @@ bmx_alloc_method_addr(const char *peerna
         mxmap->mxm_ep_id = ep_id;
         /* mxmap->mxm_peer */
 
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
         return map;
 }
 
@@ -2019,10 +2118,9 @@ bmx_handle_icon_req(void)
 {
         uint32_t        result  = 0;
 
-        if (bmi_mx->bmx_is_server) return;
         do {
-                uint64_t        match   = (uint64_t) BMX_MSG_ICON_REQ << 60;
-                uint64_t        mask    = (uint64_t) 0xF << 60;
+                uint64_t        match   = (uint64_t) BMX_MSG_ICON_REQ << BMX_MSG_SHIFT;
+                uint64_t        mask    = BMX_MASK_MSG;
                 mx_status_t     status;
 
                 mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
@@ -2092,7 +2190,7 @@ bmx_handle_icon_req(void)
         return;
 }
 
-/* test for CONN_REQ messages (on the server)
+/* test for received CONN_REQ messages (on the server)
  * if found
  *    create peer
  *    create mxmap
@@ -2102,9 +2200,9 @@ static void
 bmx_handle_conn_req(void)
 {
         uint32_t        result  = 0;
-        uint64_t        match   = (uint64_t) BMX_MSG_CONN_REQ << 60;
-        uint64_t        mask    = (uint64_t) 0xF << 60;
-        uint64_t        ack     = (uint64_t) BMX_MSG_ICON_ACK << 60;
+        uint64_t        match   = (uint64_t) BMX_MSG_CONN_REQ << BMX_MSG_SHIFT;
+        uint64_t        mask    = BMX_MASK_MSG;
+        uint64_t        ack     = (uint64_t) BMX_MSG_ICON_ACK << BMX_MSG_SHIFT;
         mx_status_t     status;
 
         do {
@@ -2112,7 +2210,10 @@ bmx_handle_conn_req(void)
                 if (result) {
                         uint8_t                 type    = 0;
                         uint32_t                id      = 0;
+                        uint32_t                sid     = 0;
                         uint32_t                version = 0;
+                        uint64_t                nic_id  = 0ULL;
+                        uint32_t                ep_id   = 0;
                         mx_request_t            request;
                         struct bmx_ctx          *rx     = NULL;
                         struct bmx_peer         *peer   = NULL;
@@ -2137,7 +2238,6 @@ bmx_handle_conn_req(void)
                                 continue;
                         }
                         bmx_parse_match(rx->mxc_match, &type, &id, &version);
-
                         if (version != BMX_VERSION) {
                                 /* TODO send error conn_ack */
                                 debug(BMX_DB_WARN, "version mismatch with peer "
@@ -2154,7 +2254,13 @@ bmx_handle_conn_req(void)
                                 bmx_put_idle_rx(rx);
                                 continue;
                         }
-                        mx_get_endpoint_addr_context(status.source, (void **) &peer);
+                        mx_decompose_endpoint_addr2(status.source, &nic_id,
+                                                    &ep_id, &sid);
+                        {
+                                void *peerp = &peer;
+                                mx_get_endpoint_addr_context(status.source, &peerp);
+                                peer = (struct bmx_peer *) peerp;
+                        }
                         if (peer == NULL) { /* new peer */
                                 int             ret             = 0;
                                 char           *host            = NULL;
@@ -2194,16 +2300,21 @@ bmx_handle_conn_req(void)
                                         bmx_put_idle_rx(rx);
                                         continue;
                                 }
-                        } else { /* reconnecting peer */
+                        } else if (sid != peer->mxp_sid) { /* reconnecting peer */
                                 /* cancel queued txs and rxs, pending rxs */
-                                debug((BMX_DB_CONN|BMX_DB_PEER), "%s peer %s reconnecting",
-                                                __func__, peer->mxp_mxmap->mxm_peername);
-                                bmx_peer_disconnect(peer, 0, BMI_ENETRESET);
+                                debug((BMX_DB_CONN|BMX_DB_PEER), "%s peer "
+                                      "%s reconnecting", __func__, 
+                                      peer->mxp_mxmap->mxm_peername);
+                                if (peer->mxp_state == BMX_PEER_READY)
+                                        bmx_peer_disconnect(peer, 0, BMI_ENETRESET);
                                 mxmap = peer->mxp_mxmap;
                         }
                         gen_mutex_lock(&peer->mxp_lock);
+                        debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_WAIT",
+                                           peer->mxp_mxmap->mxm_peername);
                         peer->mxp_state = BMX_PEER_WAIT;
                         peer->mxp_tx_id = id;
+                        peer->mxp_sid = sid;
                         gen_mutex_unlock(&peer->mxp_lock);
                         bmx_peer_addref(peer); /* add ref until completion of CONN_ACK */
                         mx_iconnect(bmi_mx->bmx_ep, peer->mxp_nic_id, mxmap->mxm_ep_id,
@@ -2232,8 +2343,8 @@ bmx_handle_icon_ack(void)
 
         if (!bmi_mx->bmx_is_server) return;
         do {
-                uint64_t        match   = (uint64_t) BMX_MSG_ICON_ACK << 60;
-                uint64_t        mask    = (uint64_t) 0xF << 60;
+                uint64_t        match   = (uint64_t) BMX_MSG_ICON_ACK << BMX_MSG_SHIFT;
+                uint64_t        mask    = BMX_MASK_MSG;
                 mx_status_t     status;
 
                 mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
@@ -2255,6 +2366,8 @@ bmx_handle_icon_ack(void)
                         }
                         gen_mutex_lock(&peer->mxp_lock);
                         peer->mxp_epa = status.source;
+                        debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_READY",
+                                           peer->mxp_mxmap->mxm_peername);
                         peer->mxp_state = BMX_PEER_READY;
                         /* NOTE no need to call bmx_peer_post_queued_[rxs|txs]()
                          * since the server should not have any queued msgs */
@@ -2288,6 +2401,8 @@ bmx_handle_icon_ack(void)
                         mx_isend(bmi_mx->bmx_ep, &tx->mxc_seg, tx->mxc_nseg, peer->mxp_epa,
                                  tx->mxc_match, (void *) tx, &tx->mxc_mxreq);
                         if (!peer->mxp_exist) {
+                                debug(BMX_DB_PEER, "calling bmi_method_addr_reg_callback"
+                                      "on %s", peer->mxp_mxmap->mxm_peername);
                                 bmi_method_addr_reg_callback(peer->mxp_map);
                                 peer->mxp_exist = 1;
                         }
@@ -2307,10 +2422,10 @@ bmx_handle_conn_ack(void)
         uint32_t        result  = 0;
         struct bmx_ctx  *tx     = NULL;
 
-        if (!bmi_mx->bmx_is_server) return;
+        if (!bmi_mx->bmx_is_server) goto out;
         do {
-                uint64_t        match   = (uint64_t) BMX_MSG_CONN_ACK << 60;
-                uint64_t        mask    = (uint64_t) 0xF << 60;
+                uint64_t        match   = (uint64_t) BMX_MSG_CONN_ACK << BMX_MSG_SHIFT;
+                uint64_t        mask    = BMX_MASK_MSG;
                 mx_status_t     status;
 
                 mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
@@ -2323,17 +2438,26 @@ bmx_handle_conn_ack(void)
                 }
         } while (result);
 
+out:
         return;
 }
 
 static void
 bmx_connection_handlers(void)
 {
+        static int      count   = 0;
+        int             print   = (count++ % 1000 == 0);
+
+        if (print)
+                debug(BMX_DB_FUNC, "entering %s", __func__);
+
         /* push connection messages along */
         bmx_handle_icon_req();
         bmx_handle_conn_req();
         bmx_handle_icon_ack();
         bmx_handle_conn_ack();
+        if (print)
+                debug(BMX_DB_FUNC, "leaving %s", __func__);
         return;
 }
 
@@ -2425,12 +2549,20 @@ BMI_mx_testcontext(int incount, bmi_op_i
 {
         int             i               = 0;
         int             completed       = 0;
+        int             old             = 0;
         uint64_t        match           = 0ULL;
-        uint64_t        mask            = (uint64_t) 0xF << 60;
+        uint64_t        mask            = BMX_MASK_MSG;
         struct bmx_ctx  *ctx            = NULL;
         struct bmx_peer *peer           = NULL;
         list_t          *canceled       = &bmi_mx->bmx_canceled;
         int             wait            = 0;
+        static int      count           = 0;
+        int             print           = 0;
+
+        if (count++ % 1000 == 0) {
+                debug(BMX_DB_FUNC, "entering %s", __func__);
+                print = 1;
+        }
 
         bmx_connection_handlers();
 
@@ -2465,11 +2597,13 @@ BMI_mx_testcontext(int incount, bmi_op_i
         /* return completed messages
          * we will always try (incount - completed) times even
          *     if some iterations have no result */
-        match = (uint64_t) BMX_MSG_EXPECTED << 60;
+
+        match = (uint64_t) BMX_MSG_EXPECTED << BMX_MSG_SHIFT;
         for (i = completed; i < incount; i++) {
                 uint32_t        result  = 0;
                 mx_status_t     status;
-                int             old     = completed;
+
+                old = completed;
 
                 if (wait == 0 || wait == 2) {
                         mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
@@ -2527,77 +2661,118 @@ BMI_mx_testcontext(int incount, bmi_op_i
         }
 
         /* check for completed unexpected sends */
-        match = (uint64_t) BMX_MSG_UNEXPECTED << 60;
-        if (!bmi_mx->bmx_is_server) { /* client */
-                int     old     = completed;
-                for (i = completed; i < incount; i++) {
-                        uint32_t        result  = 0;
-                        mx_status_t     status;
 
+        match = (uint64_t) BMX_MSG_UNEXPECTED << BMX_MSG_SHIFT;
+
+        old = completed;
+
+        for (i = completed; i < incount; i++) {
+                uint32_t        result  = 0;
+                mx_status_t     status;
+                list_t          *l      = &bmi_mx->bmx_unex_txs;
+                int             again   = 1;
+
+                ctx = NULL;
+
+                gen_mutex_lock(&bmi_mx->bmx_unex_txs_lock);
+                if (!qlist_empty(l)) {
+                        ctx = qlist_entry(l->next, struct bmx_ctx, mxc_list);
+                        peer = ctx->mxc_peer;
+                        qlist_del_init(&ctx->mxc_list);
+                        result = 1;
+                }
+                gen_mutex_unlock(&bmi_mx->bmx_unex_txs_lock);
+
+                while (!ctx && again) {
+                        again = 0;
                         mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
                         if (result) {
                                 ctx = (struct bmx_ctx *) status.context;
                                 peer = ctx->mxc_peer;
-                                debug(BMX_DB_CTX, "%s completing unexpected %s with "
-                                                "match 0x%llx for %s with op_id %llu", 
-                                                __func__, 
-                                                ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX",
-                                                llu(ctx->mxc_match), 
-                                                peer->mxp_mxmap->mxm_peername,
-                                                llu(ctx->mxc_mop->op_id));
-        
-                                if (!qlist_empty(&ctx->mxc_list)) {
-                                        gen_mutex_lock(&peer->mxp_lock);
-                                        qlist_del_init(&ctx->mxc_list);
-                                        gen_mutex_unlock(&peer->mxp_lock);
-                                }
-                                outids[completed] = ctx->mxc_mop->op_id;
-                                if (status.code == MX_SUCCESS) {
-                                        errs[completed] = 0;
-                                        sizes[completed] = status.xfer_length;
-                                } else {
-                                        errs[completed] = bmx_mx_to_bmi_errno(status.code);
+                                gen_mutex_lock(&peer->mxp_lock);
+                                qlist_del_init(&ctx->mxc_list);
+                                gen_mutex_unlock(&peer->mxp_lock);
+                                if (ctx->mxc_type == BMX_REQ_RX) {
+                                        /* queue until testunexpected is called */
+                                        gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock);
+                                        qlist_add_tail(&bmi_mx->bmx_unex_rxs, &ctx->mxc_list);
+                                        gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock);
+                                        result = 0;
+                                        again = 1;
                                 }
-                                if (user_ptrs)
-                                        user_ptrs[completed] = ctx->mxc_mop->user_ptr;
-                                id_gen_fast_unregister(ctx->mxc_mop->op_id);
-                                BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop));
-                                completed++;
+                        }
+                }
+                if (result) {
+                        debug(BMX_DB_CTX, "%s completing unexpected %s with "
+                                        "match 0x%llx for %s with op_id %llu", 
+                                        __func__, 
+                                        ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX",
+                                        llu(ctx->mxc_match), 
+                                        peer->mxp_mxmap->mxm_peername,
+                                        llu(ctx->mxc_mop->op_id));
+
+                        if (!qlist_empty(&ctx->mxc_list)) {
+                                gen_mutex_lock(&peer->mxp_lock);
+                                qlist_del_init(&ctx->mxc_list);
+                                gen_mutex_unlock(&peer->mxp_lock);
+                        }
+                        outids[completed] = ctx->mxc_mop->op_id;
+                        if (status.code == MX_SUCCESS) {
+                                errs[completed] = 0;
+                                sizes[completed] = status.xfer_length;
+                        } else {
+                                errs[completed] = bmx_mx_to_bmi_errno(status.code);
+                        }
+                        if (user_ptrs)
+                                user_ptrs[completed] = ctx->mxc_mop->user_ptr;
+                        id_gen_fast_unregister(ctx->mxc_mop->op_id);
+                        BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop));
+                        completed++;
 #if BMX_LOGGING
-                                MPE_Log_event(sendunex_finish, (int) ctx->mxc_tag, NULL);
+                        MPE_Log_event(sendunex_finish, (int) ctx->mxc_tag, NULL);
 #endif
 
-                                if (ctx->mxc_type == BMX_REQ_TX) {
-                                        bmx_put_idle_tx(ctx);
-                                } else {
-                                        bmx_put_idle_rx(ctx);
-                                }
-                                bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */
-                        }
-                }
-                if (completed - old > 0) {
-                        debug(BMX_DB_CTX, "%s found %d unexpected tx messages", 
-                              __func__, completed - old);
+                        bmx_put_idle_tx(ctx);
+                        bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */
                 }
         }
+        if (completed - old > 0) {
+                debug(BMX_DB_CTX, "%s found %d unexpected tx messages", 
+              __func__, completed - old);
+        }
+
+        if (print)
+                debug(BMX_DB_FUNC, "leaving %s", __func__);
 
         *outcount = completed;
         return completed;
 }
 
+/* test for unexpected receives only, not unex sends */
 static int
 BMI_mx_testunexpected(int incount __unused, int *outcount,
             struct bmi_method_unexpected_info *ui, int max_idle_time __unused)
 {
         uint32_t        result  = 0;
-        uint64_t        match   = (uint64_t) BMX_MSG_UNEXPECTED << 60;
-        uint64_t        mask    = (uint64_t) 0xF << 60;
+        uint64_t        match   = ((uint64_t) BMX_MSG_UNEXPECTED << BMX_MSG_SHIFT);
+        uint64_t        mask    = BMX_MASK_MSG;
         mx_status_t     status;
+        static int      count   = 0;
+        int             print   = 0;
+        struct bmx_ctx  *rx     = NULL;
+        struct bmx_peer *peer   = NULL;
+        list_t          *l      = &bmi_mx->bmx_unex_rxs;
+        int             again   = 1;
+
+        if (count++ % 1000 == 0) {
+                debug(BMX_DB_FUNC, "entering %s", __func__);
+                print = 1;
+        }
 
         bmx_connection_handlers();
 
         /* if the unexpected handler cannot get a rx, it does not post a receive.
-         * probe for unexpected and post a rx */
+         * probe for unexpected and post a rx. */
         mx_iprobe(bmi_mx->bmx_ep, match, mask, &status, &result);
         if (result) {
                 int     ret     = 0;
@@ -2609,13 +2784,39 @@ BMI_mx_testunexpected(int incount __unus
                 }
         }
 
-        /* check for unexpected messages */
+        /* check for unexpected receives */
         *outcount = 0;
-        mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
+
+        gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock);
+        if (!qlist_empty(l)) {
+                rx = qlist_entry(l->next, struct bmx_ctx, mxc_list);
+                peer = rx->mxc_peer;
+                qlist_del_init(&rx->mxc_list);
+                result = 1;
+        }
+        gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock);
+
+        while (!rx && again) {
+                again = 0;
+                mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
+                if (result) {
+                        rx   = (struct bmx_ctx *) status.context;
+                        peer = rx->mxc_peer;
+                        if (rx->mxc_type == BMX_REQ_TX) {
+                                gen_mutex_lock(&peer->mxp_lock);
+                                qlist_del_init(&rx->mxc_list);
+                                gen_mutex_unlock(&peer->mxp_lock);
+                                gen_mutex_lock(&bmi_mx->bmx_unex_txs_lock);
+                                qlist_add_tail(&bmi_mx->bmx_unex_txs, &rx->mxc_list);
+                                gen_mutex_unlock(&bmi_mx->bmx_unex_txs_lock);
+                                result = 0;
+                                again = 1;
+                        }
+                }
+        }
+
         if (result) {
-                struct bmx_ctx  *rx   = (struct bmx_ctx *) status.context;
-                struct bmx_peer *peer = rx->mxc_peer;
-                debug(BMX_DB_CTX, "*** %s completing RX with match 0x%llx for %s",
+                debug(BMX_DB_CTX, "%s completing RX with match 0x%llx for %s",
                                 __func__, llu(rx->mxc_match), peer->mxp_mxmap->mxm_peername);
 
                 ui->error_code = 0;
@@ -2628,11 +2829,6 @@ BMI_mx_testunexpected(int incount __unus
                 rx->mxc_seg.segment_ptr = rx->mxc_buffer;
                 ui->tag = rx->mxc_tag;
 
-                if (!qlist_empty(&rx->mxc_list)) {
-                        gen_mutex_lock(&peer->mxp_lock);
-                        qlist_del_init(&rx->mxc_list);
-                        gen_mutex_unlock(&peer->mxp_lock);
-                }
 #if BMX_LOGGING
                 MPE_Log_event(recvunex_finish, (int) rx->mxc_tag, NULL);
 #endif
@@ -2640,6 +2836,9 @@ BMI_mx_testunexpected(int incount __unus
                 bmx_peer_decref(peer); /* drop the ref taken in unexpected_recv() */
                 *outcount = 1;
         }
+        if (print)
+                debug(BMX_DB_FUNC, "leaving %s", __func__);
+
         return 0;
 }
 
@@ -2660,14 +2859,15 @@ bmx_peer_connect(struct bmx_peer *peer)
         int                     ret    = 0;
         uint64_t                nic_id = 0ULL;
         mx_request_t            request;
-        uint64_t                match  = (uint64_t) BMX_MSG_ICON_REQ << 60;
+        uint64_t                match  = (uint64_t) BMX_MSG_ICON_REQ << BMX_MSG_SHIFT;
         struct bmx_method_addr *mxmap  = peer->mxp_mxmap;
 
-        if (bmi_mx->bmx_is_server) {
-                return 1;
-        }
+        debug(BMX_DB_FUNC, "entering %s", __func__);
+
         gen_mutex_lock(&peer->mxp_lock);
         if (peer->mxp_state == BMX_PEER_INIT) {
+                debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_WAIT",
+                                   peer->mxp_mxmap->mxm_peername);
                 peer->mxp_state = BMX_PEER_WAIT;
         } else {
                 gen_mutex_unlock(&peer->mxp_lock);
@@ -2693,7 +2893,8 @@ bmx_peer_connect(struct bmx_peer *peer)
                 }
                 mx_get_endpoint_addr(bmi_mx->bmx_ep, &epa);
                 /* get our nic_id and ep_id */
-                mx_decompose_endpoint_addr(epa, &nic_id, &bmi_mx->bmx_ep_id);
+                mx_decompose_endpoint_addr2(epa, &nic_id, &bmi_mx->bmx_ep_id,
+                                            &bmi_mx->bmx_sid);
                 /* get our hostname */
                 mx_nic_id_to_hostname(nic_id, host);
                 bmi_mx->bmx_hostname = strdup(host);
@@ -2714,6 +2915,9 @@ bmx_peer_connect(struct bmx_peer *peer)
          * by calling mx_iconnect() w/BMX_MSG_ICON_REQ */
         mx_iconnect(bmi_mx->bmx_ep, peer->mxp_nic_id, mxmap->mxm_ep_id,
                     BMX_MAGIC, match, (void *) peer, &request);
+
+        debug(BMX_DB_FUNC, "leaving %s", __func__);
+
         return ret;
 }
 
@@ -2729,10 +2933,11 @@ static struct bmi_method_addr *
 BMI_mx_method_addr_lookup(const char *id)
 {
         int                     ret     = 0;
+        int                     len     = 0;
         char                   *host    = NULL;
         uint32_t                board   = 0;
         uint32_t                ep_id   = 0;
-        struct bmi_method_addr     *map     = NULL;
+        struct bmi_method_addr *map     = NULL;
         struct bmx_method_addr *mxmap   = NULL;
 
         debug(BMX_DB_FUNC, "entering %s", __func__);
@@ -2754,7 +2959,8 @@ BMI_mx_method_addr_lookup(const char *id
                             mxmap->mxm_board == board &&
                             mxmap->mxm_ep_id == ep_id) {
                                 map = peer->mxp_map;
-                                BMX_FREE(host, sizeof(*host));
+                                len = strlen(host);
+                                BMX_FREE(host, len);
                                 break;
                         }
                 }
@@ -2763,7 +2969,7 @@ BMI_mx_method_addr_lookup(const char *id
         
         if (map == NULL) {
                 map = bmx_alloc_method_addr(id, host, board, ep_id);
-                if (bmi_mx != NULL && ! bmi_mx->bmx_is_server) { /* we are a client */
+                if (bmi_mx != NULL) {
                         struct bmx_peer *peer   = NULL;
 
                         mxmap = map->method_data;
@@ -2824,7 +3030,19 @@ BMI_mx_cancel(bmi_op_id_t id, bmi_contex
                 break;
         case BMX_CTX_PENDING:
                 if (ctx->mxc_type == BMX_REQ_TX) {
-                        bmx_peer_disconnect(peer, 1, BMI_ENETRESET);
+                        /* see if it completed first */
+                        mx_test(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &ctx->mxc_mxstat, &result);
+                        if (result == 1) {
+                                debug(BMX_DB_CTX, "%s completed TX op_id %llu "
+                                      "mxc_state %d peer state %d status.code %s",
+                                      __func__, llu(ctx->mxc_mop->op_id), ctx->mxc_state, 
+                                      peer->mxp_state, mx_strstatus(ctx->mxc_mxstat.code));
+                                bmx_deq_pending_ctx(ctx);
+                                bmx_q_canceled_ctx(ctx,  BMI_ECANCEL);
+                        } else {
+                                /* and if not, then disconnect() */
+                                bmx_peer_disconnect(peer, 1, BMI_ENETRESET);
+                        }
                 } else { /* BMX_REQ_RX */
                         mx_cancel(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &result);
                         if (result == 1) {

Index: mx.h
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/bmi/bmi_mx/mx.h,v
diff -p -u -r1.5 -r1.5.6.1
--- mx.h	4 Oct 2007 13:56:13 -0000	1.5
+++ mx.h	7 Apr 2008 15:07:25 -0000	1.5.6.1
@@ -107,8 +107,7 @@ typedef struct qlist_head list_t;       
 /* BMX [UN]EXPECTED msgs use the 64-bits of the match info as follows:
  *    Bits      Description
  *    60-63     Msg type
- *    56-59     Reserved for credits if implemented
- *    52-55     Reserved
+ *    52-59     Reserved
  *    32-51     Peer id (of the sender, assigned by receiver)
  *     0-31     bmi_msg_tag_t
  */
@@ -116,14 +115,20 @@ typedef struct qlist_head list_t;       
 /* BMX CONN_[REQ|ACK] msgs use the 64-bits of the match info as follows:
  *    Bits      Description
  *    60-63     Msg type
- *    56-59     Reserved for credits if implemented
- *    52-55     Reserved
+ *    52-59     Reserved
  *    32-51     Peer id (to use when contacting the sender)
  *     0-31     Version
  */
 
-#define BMX_MAX_PEER_ID ((1<<20) - 1)     /* 20 bits - actually 1,048,574 peers
+#define BMX_MSG_SHIFT   60
+#define BMX_ID_SHIFT    32
+#define BMX_MASK_ALL    (~0ULL)
+#define BMX_MASK_MSG    (0xFULL << BMX_MSG_SHIFT)
+
+#define BMX_MAX_PEER_ID ((1<<20) - 1)   /* 20 bits - actually 1,048,574 peers
                                            1 to 1,048,575 */
+#define BMX_MAX_TAG     (~0U)            /* 32 bits */
+
 
 #define BMX_TIMEOUT     (20 * 1000)       /* msg timeout in milliseconds */
 
@@ -146,6 +151,7 @@ struct bmx_data
         uint32_t            bmx_board;          /* my MX board index */
         uint32_t            bmx_ep_id;          /* my MX endpoint ID */
         mx_endpoint_t       bmx_ep;             /* my MX endpoint */
+        uint32_t            bmx_sid;            /* my MX session id */
         int                 bmx_is_server;      /* am I a server? */
 
         list_t              bmx_peers;          /* list of all peers */
@@ -162,6 +168,11 @@ struct bmx_data
         list_t              bmx_canceled;       /* canceled reqs waiting for test */
         gen_mutex_t         bmx_canceled_lock;  /* canceled list lock */
 
+        list_t              bmx_unex_txs;       /* completed unexpected sends */
+        gen_mutex_t         bmx_unex_txs_lock;  /* completed unexpected sends lock */
+        list_t              bmx_unex_rxs;       /* completed unexpected recvs */
+        gen_mutex_t         bmx_unex_rxs_lock;  /* completed unexpected recvs lock */
+
         uint32_t            bmx_next_id;        /* for the next peer_id */
         gen_mutex_t         bmx_lock;           /* global lock - use for global rxs,
                                                    global txs, next_id, etc. */
@@ -190,20 +201,21 @@ enum bmx_peer_state {
 
 struct bmx_method_addr
 {
-        struct method_addr *mxm_map;        /* peer's method_addr */
-        const char         *mxm_peername;   /* mx://hostname/board/ep_id  */
-        const char         *mxm_hostname;   /* peer's hostname */
-        uint32_t            mxm_board;      /* peer's MX board index */
-        uint32_t            mxm_ep_id;      /* peer's MX endpoint ID */
-        struct bmx_peer    *mxm_peer;       /* peer pointer */
+        struct bmi_method_addr  *mxm_map;        /* peer's bmi_method_addr */
+        const char              *mxm_peername;   /* mx://hostname/board/ep_id  */
+        const char              *mxm_hostname;   /* peer's hostname */
+        uint32_t                 mxm_board;      /* peer's MX board index */
+        uint32_t                 mxm_ep_id;      /* peer's MX endpoint ID */
+        struct bmx_peer         *mxm_peer;       /* peer pointer */
 };
 
 struct bmx_peer
 {
-        struct method_addr     *mxp_map;        /* his method_addr * */
+        struct bmi_method_addr *mxp_map;        /* his bmi_method_addr * */
         struct bmx_method_addr *mxp_mxmap;      /* his bmx_method_addr */
         uint64_t                mxp_nic_id;     /* his NIC id */
         mx_endpoint_addr_t      mxp_epa;        /* his MX endpoint address */
+        uint32_t                mxp_sid;        /* his MX session id */
         int                     mxp_exist;      /* have we connected before? */
 
         enum bmx_peer_state     mxp_state;      /* INIT, WAIT, READY, DISCONNECT */
@@ -292,7 +304,7 @@ struct bmx_connreq
 
 #if BMX_DEBUG
 /* set the mask to the BMX_DB_* errors that you want gossip to report */
-#define BMX_DB_MASK (BMX_DB_ERR|BMX_DB_WARN)
+#define BMX_DB_MASK (BMX_DB_ERR|BMX_DB_WARN|BMX_DB_ALL)
 #define debug(lvl,fmt,args...)                                                 \
   do {                                                                         \
       if (lvl & BMX_DB_MASK) {                                                 \



More information about the Pvfs2-cvs mailing list