[Pvfs2-cvs] commit by kunkel in pvfs2/src/io/bmi/bmi_ib: README mem.c module.mk.in openib.c ib.h ib.c vapi.c util.c

CVS commit program cvs at parl.clemson.edu
Sat Feb 17 06:16:35 EST 2007


Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib
In directory parlweb1:/tmp/cvs-serv2872/src/io/bmi/bmi_ib

Modified Files:
      Tag: kunkel-migration-branch
	README mem.c module.mk.in openib.c ib.h ib.c vapi.c util.c 
Log Message:
Update migration branch to current CVS version


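The README and openib.c changes below rely on the 64-bit id (wr_id) that every IB work request carries back in its completion. SEND and RECEIVE completions now hold a pointer to the buffer head. When one message is scattered over several RDMA writes, all but the last carry an id of 0, and the last points at the send work item. The following is a minimal sketch of that convention only, not PVFS2 code. struct work_item, wr_id_from_ptr(), ptr_from_wr_id() and the other helpers are hypothetical names. No verbs calls are made; completions are simulated by calling the handler directly.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a send work item ("sendq entry"). */
struct work_item {
    int rdma_done;  /* set when the final RDMA write of the message completes */
};

/* Store a pointer in a 64-bit work-request id. */
static uint64_t wr_id_from_ptr(const void *p)
{
    return (uint64_t)(uintptr_t)p;
}

/* Recover the pointer from a 64-bit work-request id. */
static void *ptr_from_wr_id(uint64_t id)
{
    return (void *)(uintptr_t)id;
}

/*
 * Handle one send-side RDMA write completion.  Intermediate writes carry
 * id 0 and need no work.  The last write of a message carries a pointer
 * to its work item.  Returns 1 if a work item was updated, else 0.
 */
static int handle_rdmaw_completion(uint64_t wr_id)
{
    struct work_item *sq;

    if (wr_id == 0)
        return 0;
    sq = ptr_from_wr_id(wr_id);
    assert(!sq->rdma_done);  /* each message completes exactly once */
    sq->rdma_done = 1;
    return 1;
}

/* Simulate the completions of one message scattered over n RDMA writes. */
static int complete_message(struct work_item *sq, int n)
{
    int i, handled = 0;

    for (i = 0; i < n; i++) {
        uint64_t id = (i == n - 1) ? wr_id_from_ptr(sq) : 0;
        handled += handle_rdmaw_completion(id);
    }
    return handled;
}
```

For example, complete_message(&w, 4) reports one handled completion and sets w.rdma_done, because only the fourth write carries a pointer. Keeping intermediate ids at 0 lets the poller skip them without any lookup. The cost, in this sketch, is that an error on an intermediate write cannot be attributed to its message.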
Index: README
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/README,v
diff -p -u -r1.5 -r1.5.10.1
--- README	30 May 2006 20:24:57 -0000	1.5
+++ README	17 Feb 2007 11:16:32 -0000	1.5.10.1
@@ -2,7 +2,7 @@ Notes on the BMI InfiniBand implementati
 
 Copyright (C) 2003-6 Pete Wyckoff <pw at osc.edu>
 
-$Id: README,v 1.5 2006/05/30 20:24:57 pw Exp $
+$Id: README,v 1.5.10.1 2007/02/17 11:16:32 kunkel Exp $
 
 InifiniBand programming interface
 ---------------------------------
@@ -36,15 +36,9 @@ will listen on the given TCP port number
 IB hardware address info, then drop that connection and use only IB for all
 future communication.
 
-Between each pair of hosts are two connected queue pairs (QPs).  One for
-sending data:  SEND/RECEIVE with notification and RDMA write without
-notification.  The second is only used for zero-byte acknowledgement packets.
-Since there is no receive-side matching at the NIC, the second ack-only
-QP is used to allow posting receive descriptors with no memory to avoid having
-to flow-control acks.  Eager receive descriptors are posted immediately after
-having been emptied, before sending an acknowledgement.  Ack descriptors are
-only posted when the sender knows it will soon send a message that must be
-acked.
+Between each pair of hosts is one connected queue pair (QP).  Send and
+receive are used, as well as RDMA write.  No atomics or RDMA read are used.
+The immediate data feature of InfiniBand is not used.
 
 
 Buffer management
@@ -53,13 +47,11 @@ Since BMI permits sends to occur without
 InfiniBand does not allow this, we must manage a queue of preposted
 buffers for each possible sender.  We allocate some number of fixed
 size receive buffers per sender, and also have the same number of send
-buffers dedicated to that sender.  These are matched for flow control
-so that we know how many receive buffers are available at a potential
-receiver by looking at our allocated send buffers dedicated to that receiver.
-The receiver explicitly acknowledges buffers after finishing with
-the contents.  Note that the bh->num values will get out of order although
-the overall number will match, implying we always ack _his_ bh->num, not
-ours.  This is a bit tied up with the protocol, described below.
+buffers dedicated to that sender.  The receiver keeps a count of buffers
+it has processed and reposted, sending this value to the sender on top
+of its own send messages.  If the accumulated credit gets large with
+respect to the number of total buffers, an explicit credit-return message
+is sent.
 
 These eager buffers are shipped back and forth using basic SEND/RECEIVE since
 completion on the receiver is important for the protocol and there is no speed
@@ -72,27 +64,28 @@ gather at the sender, not scatter at the
 might do this with RDMA reads similarly.)
 
 IB completion queue entries have a 64-bit "id" field to store information
-which is retrievable at completion.  For incoming RECEIVE messages, this
-holds a pointer to the buffer head which will lead to the connection and
-some state.  For RDMA write send completions, this holds a pointer to the
-sendq entry.  There is also a 32-bit immediate data which is used only
-in the case of an ACK packet that carries no data but consumes a descriptor
-(managed on the second QP).
+which is retrievable at completion.  For outgoing SEND messages and incoming
+RECEIVE messages, this holds a pointer to the buffer head which will lead to
+the connection and some state.  For RDMA write send completions, there may
+be many outgoing RDMA write operations to satisfy scatter requirements on
+the receiver.  All but the last of these have an id of 0; the last holds a
+pointer to the send work item.
+
 
 State paths
 -----------
-Below are descriptions of how the states progress for the sender
-and receiver for the various possible message types.
+Below are descriptions of how the states progress for the sender and receiver
+for the various possible message types.  Completion events are tracked
+explicitly, even though they do not tell us anything in most cases.
 
 Eager send
 ----------
     SQ_WAITING_BUFFER
-	alloc bh, local tied to remote, so know credit okay
-	post_ack_recv_slot, 0 bytes, just imm data ack
+	credit?
+	alloc bh
 	post_sr
-    SQ_WAITING_EAGER_ACK
-	(wait recv cq event on ack channel)
-	get bh->num from imm_data
+    SQ_WAITING_EAGER_SEND_COMPLETION
+	(wait local send completion)
 	free bh
     SQ_WAITING_USER_TEST
 	wait test
@@ -104,10 +97,9 @@ Eager recv, pre-post recv
 	build recvq
     RQ_WAITING_INCOMING
 	(wait recv cq event)
+	refill credits
 	copy memory to dest
-	mark recv complete
 	re-post_rr
-	post_ack_send imm_data = his bh->num  (no cq event)
     RQ_WAITING_USER_TEST
 	wait test
 	release recvq
@@ -115,17 +107,17 @@ Eager recv, pre-post recv
 Eager recv, non-pre-post recv
 -----------------------------
     (msg arrives)
+	refill credits
 	build recvq
     RQ_EAGER_WAITING_USER_POST
 	(matching user post arrives)
 	copy memory to dest
 	re-post_rr
-	post_ack_send imm_data = his bh->num  (no cq event)
     RQ_WAITING_USER_TEST
 	wait test
 	release recvq
 
-Eager sendunexpected
+Eager send unexpected
 --------------------
 (Same as eager send but different msg header tag tells receiver
 it is unexpected.)
@@ -133,29 +125,54 @@ it is unexpected.)
 Eager recv unexpected
 ---------------------
     (msg arrives)
+	refill credits
 	build recvq
     RQ_EAGER_WAITING_USER_TESTUNEXPECTED
 	(user calls testunexpected)
 	scan recvq looking for this state, no tag matching
-	?? fill in method_unexpected_info, return to user
+	fill in method_unexpected_info
+	copy memory to dest
 	re-post_rr
-	post_ack_send imm_data = his bh->num  (no cq event)
-	?? release recvq entry
+	release recvq entry
 
 RTS send
 --------
     SQ_WAITING_BUFFER
+	credit?
 	alloc bh
-	post_sr mh + mh_rts
+	post_sr mh_rts
+    SQ_WAITING_RTS_SEND_COMPLETION
+
+    (Note: two alternate paths exist here, depending on the order in which
+    the adapter CQ is processed: we could see the peer's CTS before we
+    process the completion of our own RTS send that caused him to send it.)
+	(send cq event)
+	free bh
     SQ_WAITING_CTS
-	(wait recv cq event)
-	free bh used for rts, incoming cts implicitly acked it
+	(recv cq event)
+	refill credits
 	post RDMA to address given in CTS
 	repost rr used to receive cts
-    SQ_WAITING_DATA_LOCAL_SEND_COMPLETE
-	(wait local send cq event for rdma write)
-	ack bufnum from his cts message
+
+    (Path 2)
+	(recv cq event)
+	refill credits
+	post RDMA to address given in CTS
+	repost rr used to receive cts
+    SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS
+	(send cq event)
+	free bh
+
+    SQ_WAITING_DATA_SEND_COMPLETION
+	(local send cq event for rdma write)
+    SQ_WAITING_RTS_DONE_BUFFER
+	credit?
+	alloc bh
+	post_sr mh_rts_done
 	unpin
+    SQ_WAITING_RTS_DONE_SEND_COMPLETION
+	(send cq event)
+	free bh
     SQ_WAITING_USER_TEST
 	wait test
 	release sendq
@@ -165,18 +182,19 @@ RTS recv, pre-post recv
     (user posts)
 	build recvq
     RQ_WAITING_INCOMING
-	(wait recv cq event)
-	match existing rq entry
-	re-post_rr from rts, but remember his bufnum
+	(recv cq event)
+	refill credit
+	re-post_rr from rts
     RQ_RTS_WAITING_CTS_BUFFER
+	credit?
 	alloc bh local for cts
-	    -> if failure state = RQ_RTS_WAITING_CTS_BUFFER
 	pin recv buffer
-	post_ack_recv_slot
-	send cts with his bufnum from his rts to ack it
-    RQ_RTS_WAITING_DATA
-	(wait recv cq event ack)
-	free bh local used for cts
+	send cts
+    RQ_RTS_WAITING_CTS_SEND_COMPLETION
+	(send cq event)
+	free bh
+    RQ_RTS_WAITING_RTS_DONE
+	(wait recv cq event)
 	unpin recv buffer
     RQ_WAITING_USER_TEST
 	(wait user test)
@@ -185,35 +203,23 @@ RTS recv, pre-post recv
 RTS recv, non-pre post
 ----------------------
     (rts arrives on network)
+	refill credit
 	build recvq
 	re-post_rr from rts
-	# ack rts for simplicitly, else must carry this number until post
     RQ_RTS_WAITING_USER_POST
 	(wait user post that matches)
-	alloc bh local for cts
-	pin recv buffer
-	post_ack_recv_slot
-	send cts (with bufnum as above)
-	    -> if failure state = RQ_RTS_WAITING_CTS_BUFFER
-    RQ_RTS_WAITING_DATA ... continue above
+    RQ_RTS_WAITING_CTS_BUFFER  ... continue as in prepost case above
 
 
 Other
 -----
 All QPs are tied to a single CQ for easier polling.
 
-Note that IBA guarantees that WQEs are retired in order for any single QP.  We
-could rely on this to do allocation and deallocation of outgoing buffer
-resources using a producer and consumer pointer rather than a general linked
-list, but that little optimization does not seem worth the risk that this may
-not be true on other networks.
-
-For now all messages are assumed to move atomically, even the big ones,
-since IB performs RDMA write as if it were one operation.
-
 IB guarantees that work requests are _initiated_ in the same order they
 are placed in a given queue (send or receive).  For the receive queue, for
-any mode except RD, work requests _complete_ in the same order too.
+any mode except RD, work requests _complete_ in the same order too.  But
+there is no ordering correspondence between our own send work requests
+and the receives that complete as a result of sends initiated by the peer.
 
 
 BMI interface issues
@@ -232,10 +238,7 @@ For items in *_WAITING_BUFFER, implement
 retire you can use buffers immediately to trigger another send.
 
 Maybe have a separate completion queue distinct from sendq and recvq.
-
-What is the lifetime of a method_addr?  Do I control them all and only
-hand back const pointers?  Must I copy each one when returned from a
-direct call to lookup, or inside an unexpected info structure?
+Some lookups are O(N), like for incoming RTS and receive matching.
 
 On QP allocation failure, probe remote side of existing QPs to see if
 any have become disconnected.  Close those connections, which might
@@ -244,5 +247,11 @@ result from a client crash.
 If client crashes or fails to call BMI_finalize() make sure server does
 the right thing.
 
+Points to test cancellation
+---------------------------
+Kill client at these spots to test server behavior:
+
+    1. pvfs2-ls: just after the send in BMI_ib_post_sendunexpected_list.
+    2. pvfs2-cp unix->pvfs: start of encourage_send_incoming_cts.
 
 % vi: set tw=78 :
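The new "Buffer management" text in the README above replaces per-buffer acks with credits. The receiver counts the eager buffers it has processed and reposted, and piggybacks that count on its own outgoing messages. It sends an explicit credit-return message only when the count grows large relative to the total number of buffers. The sender checks for a credit ("credit?") before allocating a buffer, and each incoming message "refills credits". The following is a minimal sketch of that bookkeeping. It assumes hypothetical names (struct credit_state, credit_take() and so on). It also assumes a threshold of half the buffers for the explicit return, which the README does not specify.

```c
#include <assert.h>

/* Hypothetical per-connection credit state (illustrative names only). */
struct credit_state {
    int total_bufs;     /* eager receive buffers preposted per peer */
    int send_credits;   /* peer buffers we may still send into */
    int return_credits; /* our buffers reposted but not yet reported */
};

static void credit_init(struct credit_state *cs, int total_bufs)
{
    cs->total_bufs = total_bufs;
    cs->send_credits = total_bufs;  /* peer starts with all buffers posted */
    cs->return_credits = 0;
}

/* Sender, "credit?" step: consume a credit, or return 0 if we must wait. */
static int credit_take(struct credit_state *cs)
{
    if (cs->send_credits == 0)
        return 0;
    --cs->send_credits;
    return 1;
}

/* Receiver: an eager buffer was processed and reposted. */
static void credit_reposted(struct credit_state *cs)
{
    ++cs->return_credits;
    assert(cs->return_credits <= cs->total_bufs);
}

/* Receiver: value to place in the header of any outgoing message. */
static int credit_piggyback(struct credit_state *cs)
{
    int n = cs->return_credits;

    cs->return_credits = 0;
    return n;
}

/* Sender, "refill credits" step on every incoming message header. */
static void credit_refill(struct credit_state *cs, int n)
{
    cs->send_credits += n;
    assert(cs->send_credits <= cs->total_bufs);
}

/* Assumed policy: send an explicit credit return once half are owed. */
static int credit_need_explicit_return(const struct credit_state *cs)
{
    return 2 * cs->return_credits >= cs->total_bufs;
}
```

A single structure tracks both directions of a connection. send_credits limits what this side may send into the peer's buffers, and return_credits accumulates what it owes the peer. An explicit credit-return message is itself a SEND that consumes one of the peer's preposted receives, so a real implementation has to account for that buffer too. The sketch does not model it.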

Index: mem.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/mem.c,v
diff -p -u -r1.7 -r1.7.14.1
--- mem.c	30 May 2006 20:24:57 -0000	1.7
+++ mem.c	17 Feb 2007 11:16:32 -0000	1.7.14.1
@@ -5,7 +5,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: mem.c,v 1.7 2006/05/30 20:24:57 pw Exp $
+ * $Id: mem.c,v 1.7.14.1 2007/02/17 11:16:32 kunkel Exp $
  */
 #include <src/common/gen-locks/gen-locks.h>
 #include "pvfs2-internal.h"
@@ -21,10 +21,15 @@
  * This internal state structure is allocated when the init function
  * is called.  The device hangs onto it and gives it back to us as
  * needed.
+ *
+ * TODO: Use an rbtree here instead.  Also deregister refcnt==0 regions
+ * when new ones come along that overlap, much like dreg, as an indication
+ * that application buffers have changed.
  */
 typedef struct {
-    list_t list;
+    struct qlist_head list;
     gen_mutex_t mutex;
+    struct qlist_head free_chunk_list;
     void (*mem_register)(memcache_entry_t *c);
     void (*mem_deregister)(memcache_entry_t *c);
 } memcache_device_t;
@@ -58,7 +63,7 @@ memcache_add(memcache_device_t *memcache
 static memcache_entry_t *
 memcache_lookup_cover(memcache_device_t *memcache_device, const void *const buf, bmi_size_t len)
 {
-    list_t *l;
+    struct qlist_head *l;
     const char *end = (const char *) buf + len;
     memcache_entry_t *cbest = 0;
 
@@ -89,7 +94,7 @@ memcache_lookup_cover(memcache_device_t 
 static memcache_entry_t *
 memcache_lookup_exact(memcache_device_t *memcache_device, const void *const buf, bmi_size_t len)
 {
-    list_t *l;
+    struct qlist_head *l;
 
     qlist_for_each(l, &memcache_device->list) {
 	memcache_entry_t *c = qlist_entry(l, memcache_entry_t, list);
@@ -103,6 +108,12 @@ memcache_lookup_exact(memcache_device_t 
 /*
  * BMI malloc and free routines.  If the region is big enough, pin
  * it now to save time later in the actual send or recv routine.
+ * These are only ever called from PVFS internal functions to allocate
+ * buffers, on the server, or on the client for non-user-supplied
+ * buffers.
+ *
+ * Standard sizes will appear frequently, so do not free them; keep them
+ * on a separate free list from which a same-size request can reuse one.
  */
 void *
 memcache_memalloc(void *md, bmi_size_t len, int eager_limit)
@@ -110,7 +121,30 @@ memcache_memalloc(void *md, bmi_size_t l
     memcache_device_t *memcache_device = md;
     void *buf;
 
+    debug(4, "%s: len %lld limit %d", __func__, lld(len), eager_limit);
+
+    /* search in size cache first */
+#if ENABLE_MEMCACHE
+    if (len > eager_limit) {
+	memcache_entry_t *c;
+	gen_mutex_lock(&memcache_device->mutex);
+	qlist_for_each_entry(c, &memcache_device->free_chunk_list, list) {
+	    if (c->len == len) {
+		debug(4, "%s: recycle free chunk, buf %p", __func__, c->buf);
+		qlist_del(&c->list);
+		qlist_add_tail(&c->list, &memcache_device->list);
+		++c->count;
+		buf = c->buf;
+		gen_mutex_unlock(&memcache_device->mutex);
+		goto out;
+	    }
+	}
+	gen_mutex_unlock(&memcache_device->mutex);
+    }
+#endif
+
     buf = malloc(len);
+
 #if ENABLE_MEMCACHE
     if (bmi_ib_unlikely(!buf))
 	goto out;
@@ -120,16 +154,19 @@ memcache_memalloc(void *md, bmi_size_t l
 	gen_mutex_lock(&memcache_device->mutex);
 	/* could be recycled buffer */
 	c = memcache_lookup_cover(memcache_device, buf, len);
-	if (c)
+	if (c) {
 	    ++c->count;
-	else {
+	    debug(4, "%s: reuse reg, buf %p, count %d", __func__, c->buf,
+	          c->count);
+	} else {
 	    c = memcache_add(memcache_device, buf, len);
 	    if (bmi_ib_unlikely(!c)) {
 		free(buf);
-		buf = 0;
+		buf = NULL;
 	    } else {
-		(memcache_device->mem_register)(c);
+		memcache_device->mem_register(c);
 		++c->count;
+		debug(4, "%s: new reg, buf %p", __func__, c->buf);
 	    }
 	}
 	gen_mutex_unlock(&memcache_device->mutex);
@@ -142,20 +179,24 @@ memcache_memalloc(void *md, bmi_size_t l
 int
 memcache_memfree(void *md, void *buf, bmi_size_t len)
 {
-    memcache_device_t *memcache_device = md;
 #if ENABLE_MEMCACHE
+    memcache_device_t *memcache_device = md;
     memcache_entry_t *c;
-    /* okay if not found, just not cached */
 
     gen_mutex_lock(&memcache_device->mutex);
+    /* okay if not found, just not cached; perhaps an eager-size buffer */
     c = memcache_lookup_exact(memcache_device, buf, len);
     if (c) {
-	debug(6, "%s: found %p len %lld", __func__, c->buf, lld(c->len));
+	debug(4, "%s: cache free buf %p len %lld", __func__, c->buf,
+	      lld(c->len));
 	assert(c->count == 1, "%s: buf %p len %lld count = %d, expected 1",
-	  __func__, c->buf, lld(c->len), c->count);
-	(memcache_device->mem_deregister)(c);
+	       __func__, c->buf, lld(c->len), c->count);
+	/* cache it */
+	--c->count;
 	qlist_del(&c->list);
-	free(c);
+	qlist_add(&c->list, &memcache_device->free_chunk_list);
+	gen_mutex_unlock(&memcache_device->mutex);
+	return 0;
     }
     gen_mutex_unlock(&memcache_device->mutex);
 #endif
@@ -193,7 +234,7 @@ memcache_register(void *md, ib_buflist_t
 	    if (!c)
 		error("%s: no memory for cache entry", __func__);
 	    c->count = 1;
-	    (memcache_device->mem_register)(c);
+	    memcache_device->mem_register(c);
 	}
 	buflist->memcache[i] = c;
 #else
@@ -201,13 +242,41 @@ memcache_register(void *md, ib_buflist_t
 	cp->buf = buflist->buf.recv[i];
 	cp->len = buflist->len[i];
 	cp->type = type;
-	(memcache_device->mem_register)(cp);
+	memcache_device->mem_register(cp);
 	buflist->memcache[i] = cp;
 #endif
     }
     gen_mutex_unlock(&memcache_device->mutex);
 }
 
+/*
+ * Similar to the normal register call, but does not use a buflist,
+ * just adds an entry to the cache for use by later registrations.
+ * Also does not add a refcnt on any entry.
+ */
+void memcache_preregister(void *md, const void *buf, bmi_size_t len,
+                          enum PVFS_io_type rw __unused)
+{
+#if ENABLE_MEMCACHE
+    memcache_device_t *memcache_device = md;
+    memcache_entry_t *c;
+
+    gen_mutex_lock(&memcache_device->mutex);
+    c = memcache_lookup_cover(memcache_device, buf, len);
+    if (c) {
+	debug(2, "%s: hit %p len %lld (via %p len %lld) refcnt now %d",
+	      __func__, buf, lld(len), c->buf, lld(c->len), c->count);
+    } else {
+	debug(2, "%s: miss %p len %lld", __func__, buf, lld(len));
+	c = memcache_add(memcache_device, (void *)(uintptr_t) buf, len);
+	if (!c)
+	    error("%s: no memory for cache entry", __func__);
+	memcache_device->mem_register(c);
+    }
+    gen_mutex_unlock(&memcache_device->mutex);
+#endif
+}
+
 void
 memcache_deregister(void *md, ib_buflist_t *buflist)
 {
@@ -219,11 +288,13 @@ memcache_deregister(void *md, ib_buflist
 #if ENABLE_MEMCACHE
 	memcache_entry_t *c = buflist->memcache[i];
 	--c->count;
-	debug(2, "%s: dec refcount [%d] %p len %lld count now %d", __func__, i,
-	  buflist->buf.send[i], lld(buflist->len[i]), c->count);
+	debug(2,
+	   "%s: dec refcount [%d] %p len %lld (via %p len %lld) refcnt now %d",
+	   __func__, i, buflist->buf.send[i], lld(buflist->len[i]),
+	   c->buf, lld(c->len), c->count);
 	/* let garbage collection do ib_mem_deregister(c) for refcnt==0 */
 #else
-	(memcache_device->mem_deregister)(buflist->memcache[i]);
+	memcache_device->mem_deregister(buflist->memcache[i]);
 	free(buflist->memcache[i]);
 #endif
     }
@@ -242,6 +313,7 @@ void *memcache_init(void (*mem_register)
     memcache_device = Malloc(sizeof(*memcache_device));
     INIT_QLIST_HEAD(&memcache_device->list);
     gen_mutex_init(&memcache_device->mutex);
+    INIT_QLIST_HEAD(&memcache_device->free_chunk_list);
     memcache_device->mem_register = mem_register;
     memcache_device->mem_deregister = mem_deregister;
     return memcache_device;
@@ -252,14 +324,19 @@ void *memcache_init(void (*mem_register)
  */
 void memcache_shutdown(void *md)
 {
-    list_t *l, *lp;
     memcache_device_t *memcache_device = md;
+    memcache_entry_t *c, *cn;
 
     gen_mutex_lock(&memcache_device->mutex);
-    qlist_for_each_safe(l, lp, &memcache_device->list) {
-	memcache_entry_t *c = qlist_entry(l, memcache_entry_t, list);
-	(memcache_device->mem_deregister)(c);
+    qlist_for_each_entry_safe(c, cn, &memcache_device->list, list) {
+	memcache_device->mem_deregister(c);
+	qlist_del(&c->list);
+	free(c);
+    }
+    qlist_for_each_entry_safe(c, cn, &memcache_device->free_chunk_list, list) {
+	memcache_device->mem_deregister(c);
 	qlist_del(&c->list);
+	free(c->buf);
 	free(c);
     }
     gen_mutex_unlock(&memcache_device->mutex);
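The mem.c changes above stop deregistering large buffers in memcache_memfree(). The entry is moved to free_chunk_list instead. memcache_memalloc() first looks there for a chunk of exactly the requested length, so repeated allocations of standard sizes reuse both the memory and its registration. The sketch below models only that recycling, using a plain singly linked list. struct chunk, struct chunk_cache and the cache_* functions are hypothetical, and the registrations counter stands in for pinning. There is no locking, eager_limit check or cover lookup. Unlike the real cache, every entry here was allocated by the sketch itself, so shutdown frees all buffers, not only those on the free list.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Illustrative cache entry, not the PVFS2 memcache_entry_t. */
struct chunk {
    struct chunk *next;
    void *buf;
    size_t len;
    int count;                /* users of this registration */
};

struct chunk_cache {
    struct chunk *active;     /* registered and in use */
    struct chunk *free_list;  /* registered, idle, kept for reuse */
    int registrations;        /* stand-in for expensive NIC pinning */
};

/* Unlink the first entry of exactly len bytes (and buf, unless NULL). */
static struct chunk *unlink_match(struct chunk **head, const void *buf,
                                  size_t len)
{
    struct chunk **pp;

    for (pp = head; *pp; pp = &(*pp)->next) {
        struct chunk *c = *pp;
        if (c->len == len && (buf == NULL || c->buf == buf)) {
            *pp = c->next;
            return c;
        }
    }
    return NULL;
}

static void push(struct chunk **head, struct chunk *c)
{
    c->next = *head;
    *head = c;
}

static void *cache_alloc(struct chunk_cache *mc, size_t len)
{
    /* reuse an idle chunk of the same size if there is one */
    struct chunk *c = unlink_match(&mc->free_list, NULL, len);

    if (!c) {
        c = malloc(sizeof(*c));
        if (!c)
            return NULL;
        c->buf = malloc(len);
        if (!c->buf) {
            free(c);
            return NULL;
        }
        c->len = len;
        c->count = 0;
        ++mc->registrations;  /* real code would pin the memory here */
    }
    ++c->count;
    push(&mc->active, c);
    return c->buf;
}

static void cache_free(struct chunk_cache *mc, void *buf, size_t len)
{
    struct chunk *c = unlink_match(&mc->active, buf, len);

    if (!c)
        return;  /* not from this cache: nothing to recycle */
    assert(c->count == 1);
    --c->count;
    push(&mc->free_list, c);  /* keep memory and registration */
}

static void cache_shutdown(struct chunk_cache *mc)
{
    struct chunk **lists[2] = { &mc->active, &mc->free_list };
    int i;

    for (i = 0; i < 2; i++) {
        while (*lists[i]) {
            struct chunk *c = *lists[i];
            *lists[i] = c->next;
            --mc->registrations;  /* real code would unpin here */
            free(c->buf);
            free(c);
        }
    }
}
```

The following sequence shows the recycling: allocate 64 KiB, free it, and allocate 64 KiB again. The second allocation returns the same pointer, and only one registration has occurred. The exact-length match keeps the lookup simple. However, a request that differs by a single byte misses the free list and triggers a fresh allocation and registration.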

Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/module.mk.in,v
diff -p -u -r1.11 -r1.11.10.1
--- module.mk.in	30 May 2006 20:24:57 -0000	1.11
+++ module.mk.in	17 Feb 2007 11:16:33 -0000	1.11.10.1
@@ -5,7 +5,7 @@
 #
 # See COPYING in top-level directory.
 #
-# $Id: module.mk.in,v 1.11 2006/05/30 20:24:57 pw Exp $
+# $Id: module.mk.in,v 1.11.10.1 2007/02/17 11:16:33 kunkel Exp $
 #
 
 # only do any of this if configure decided to use IB on OpenIB
@@ -39,13 +39,7 @@ SERVERSRC += $(src)
 #
 cflags :=
 ifdef GNUC
-cflags += -fno-common
-cflags += -Wall -W -Wpointer-arith
-cflags += -Wcast-align -Wcast-qual -Wbad-function-cast
-cflags += -Wmissing-prototypes -Wmissing-declarations
-cflags += -Wnested-externs
-cflags += -Wshadow -Wstrict-prototypes -Wredundant-decls
-cflags += -Wundef -Wwrite-strings
+cflags += -W -Wcast-qual -Wshadow -Wwrite-strings
 endif
 
 #
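In the openib.c diff that follows, the structure exchanged over TCP during connection setup drops qp_ack_num. It also widens lid from uint16_t to uint32_t "to avoid struct alignment issues". When a 16-bit lid is followed by a 32-bit qp_num, common ABIs insert two padding bytes whose contents are unspecified. With two 32-bit fields, the 8-byte layout is explicit. The following is a minimal sketch of such a handshake record. It assumes htonl()/ntohl() as stand-ins for the htobmi32()/bmitoh32() macros used in the diff, and makes no claim about which byte order those macros use. struct conn_info and the two functions are hypothetical.

```c
#include <arpa/inet.h>  /* htonl, ntohl */
#include <assert.h>     /* static_assert (C11) */
#include <stddef.h>     /* offsetof */
#include <stdint.h>
#include <string.h>

/* Hypothetical handshake record: both fields are 32 bits, so no padding. */
struct conn_info {
    uint32_t lid;     /* IB local identifier, 16 bits widened to 32 */
    uint32_t qp_num;  /* queue pair number */
};

/* Fail the build if the wire layout is not the obvious one. */
static_assert(offsetof(struct conn_info, qp_num) == 4, "no padding after lid");
static_assert(sizeof(struct conn_info) == 8, "no trailing padding");

/* Fill the record in network byte order before sending it. */
static void conn_info_encode(struct conn_info *out, uint16_t lid,
                             uint32_t qp_num)
{
    memset(out, 0, sizeof(*out));  /* never send uninitialized bytes */
    out->lid = htonl(lid);
    out->qp_num = htonl(qp_num);
}

/* Convert a received record back to host byte order. */
static void conn_info_decode(const struct conn_info *in, uint16_t *lid,
                             uint32_t *qp_num)
{
    *lid = (uint16_t) ntohl(in->lid);
    *qp_num = ntohl(in->qp_num);
}
```

The static assertions fail the build if a compiler ever lays the struct out differently. That is cheaper than discovering a mismatched handshake at run time between two hosts.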

Index: openib.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/openib.c,v
diff -p -u -r1.5 -r1.5.4.1
--- openib.c	1 Sep 2006 15:33:35 -0000	1.5
+++ openib.c	17 Feb 2007 11:16:33 -0000	1.5.4.1
@@ -6,17 +6,23 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: openib.c,v 1.5 2006/09/01 15:33:35 pw Exp $
+ * $Id: openib.c,v 1.5.4.1 2007/02/17 11:16:33 kunkel Exp $
  */
 #include <string.h>
 #include <errno.h>
+#include <unistd.h>
 #include <fcntl.h>
 #define __PINT_REQPROTO_ENCODE_FUNCS_C  /* include definitions */
 #include <src/io/bmi/bmi-byteswap.h>  /* bmitoh64 */
 #include <src/common/misc/pvfs2-internal.h>  /* llu */
-
 #include <infiniband/verbs.h>
 
+#ifdef HAVE_VALGRIND
+#include <memcheck.h>
+#else
+#define VALGRIND_MAKE_MEM_DEFINED(addr,len)
+#endif
+
 #include "ib.h"
 
 /*
@@ -51,8 +57,6 @@ struct openib_device_priv {
      */
     unsigned int num_unsignaled_sends;
     unsigned int max_unsignaled_sends;
-    unsigned int num_ack_unsignaled_sends;
-    unsigned int max_ack_unsignaled_sends;
 };
 
 /*
@@ -61,18 +65,14 @@ struct openib_device_priv {
 struct openib_connection_priv {
     /* ibv local params */
     struct ibv_qp *qp;
-    struct ibv_qp *qp_ack;
     struct ibv_mr *eager_send_mr;
     struct ibv_mr *eager_recv_mr;
     /* in the past, did this per-connection, not per-device
-     *
      * unsigned int num_unsignaled_wr;
-     * unsigned int num_unsignaled_wr_ack;
      */
     /* ib remote params */
     uint16_t remote_lid;
     uint32_t remote_qp_num;
-    uint32_t remote_qp_ack_num;
 };
 
 /* NOTE:  You have to be sure that ib_uverbs.ko is loaded, otherwise it
@@ -89,7 +89,7 @@ static int exchange_data(int sock, int i
                          size_t len);
 static void init_connection_modify_qp(struct ibv_qp *qp,
                                       uint32_t remote_qp_num, int remote_lid);
-static void openib_post_rr(const ib_connection_t *c, buf_head_t *bh);
+static void openib_post_rr(const ib_connection_t *c, struct buf_head *bh);
 int openib_ib_initialize(void);
 static void openib_ib_finalize(void);
 
@@ -101,17 +101,19 @@ static int openib_new_connection(ib_conn
     struct openib_connection_priv *oc;
     struct openib_device_priv *od = ib_device->priv;
     int i, ret;
+    int num_wr;
     size_t len;
     struct ibv_qp_init_attr att;
+
     /*
      * Values passed through TCP to permit IB connection.  These
      * are transformed to appear in network byte order (big endian)
-     * on the network.
+     * on the network.  The lid is pushed up to 32 bits to avoid struct
+     * alignment issues.
      */
     struct {
-	uint16_t lid;
+	uint32_t lid;
 	uint32_t qp_num;
-	uint32_t qp_ack_num;
     } ch_in, ch_out;
 
     /* build new connection/context */
@@ -139,10 +141,11 @@ static int openib_new_connection(ib_conn
     memset(&att, 0, sizeof(att));
     att.send_cq = od->nic_cq;
     att.recv_cq = od->nic_cq;
-    att.cap.max_recv_wr = ib_device->eager_buf_num + 50;  /* plus some rdmaw */
-    if ((int) att.cap.max_recv_wr > od->nic_max_wr)
-	att.cap.max_recv_wr = od->nic_max_wr;
-    att.cap.max_send_wr = att.cap.max_recv_wr;
+    num_wr = ib_device->eager_buf_num + 50;  /* plus some rdmaw */
+    if (num_wr > od->nic_max_wr)
+	num_wr = od->nic_max_wr;
+    att.cap.max_recv_wr = num_wr;
+    att.cap.max_send_wr = num_wr;
     att.cap.max_recv_sge = 16;
     att.cap.max_send_sge = 16;
     if ((int) att.cap.max_recv_sge > od->nic_max_sge) {
@@ -154,6 +157,8 @@ static int openib_new_connection(ib_conn
     oc->qp = ibv_create_qp(od->nic_pd, &att);
     if (!oc->qp)
 	error("%s: create QP", __func__);
+    VALGRIND_MAKE_MEM_DEFINED(&att, sizeof(att));
+    VALGRIND_MAKE_MEM_DEFINED(&oc->qp->qp_num, sizeof(oc->qp->qp_num));
 
     /* compare the caps that came back against what we already have */
     if (od->sg_max_len == 0) {
@@ -177,47 +182,29 @@ static int openib_new_connection(ib_conn
 	    error("%s: new connection has smaller max_send_wr, %d vs %d",
 	          __func__, att.cap.max_send_wr, od->max_unsignaled_sends);
 
-    /* create the ack queue pair */
-    memset(&att, 0, sizeof(att));
-    att.send_cq = od->nic_cq;
-    att.recv_cq = od->nic_cq;
-    att.cap.max_recv_wr = ib_device->eager_buf_num + 10;  /* and some extra */
-    if ((int) att.cap.max_recv_wr > od->nic_max_wr)
-	att.cap.max_recv_wr = od->nic_max_wr;
-    att.cap.max_send_wr = att.cap.max_recv_wr;
-    att.cap.max_recv_sge = 1;
-    att.cap.max_send_sge = 1;
-    att.qp_type = IBV_QPT_RC;
-    oc->qp_ack = ibv_create_qp(od->nic_pd, &att);
-    if (!oc->qp_ack)
-	error("%s: create QP ack", __func__);
-
-    if (od->max_ack_unsignaled_sends == 0)
-	od->max_ack_unsignaled_sends = att.cap.max_send_wr;
-    else
-	if (att.cap.max_send_wr < od->max_ack_unsignaled_sends)
-	    error("%s: new ack connection has smaller max_send_wr, %d vs %d",
-	          __func__, att.cap.max_send_wr, od->max_ack_unsignaled_sends);
+    /* verify we got what we asked for */
+    if ((int) att.cap.max_recv_wr < num_wr)
+	error("%s: asked for %d recv WRs on QP, got %d", __func__, num_wr,
+	      att.cap.max_recv_wr);
+    if ((int) att.cap.max_send_wr < num_wr)
+	error("%s: asked for %d send WRs on QP, got %d", __func__, num_wr,
+	      att.cap.max_send_wr);
 
     /* exchange data, converting info to network order and back */
-    ch_out.lid = htobmi16(od->nic_lid);
+    ch_out.lid = htobmi32(od->nic_lid);
     ch_out.qp_num = htobmi32(oc->qp->qp_num);
-    ch_out.qp_ack_num = htobmi32(oc->qp_ack->qp_num);
 
     ret = exchange_data(sock, is_server, &ch_in, &ch_out, sizeof(ch_in));
     if (ret)
 	goto out;
 
-    oc->remote_lid = bmitoh16(ch_in.lid);
+    oc->remote_lid = bmitoh32(ch_in.lid);
     oc->remote_qp_num = bmitoh32(ch_in.qp_num);
-    oc->remote_qp_ack_num = bmitoh32(ch_in.qp_ack_num);
 
     /* bring the two QPs up to RTR */
     init_connection_modify_qp(oc->qp, oc->remote_qp_num, oc->remote_lid);
-    init_connection_modify_qp(oc->qp_ack, oc->remote_qp_ack_num,
-                              oc->remote_lid);
 
-    /* post initial RRs */
+    /* post initial RRs and RRs for acks */
     for (i=0; i<ib_device->eager_buf_num; i++)
 	openib_post_rr(c, &c->eager_recv_buf_head_contig[i]);
 
@@ -345,8 +332,7 @@ static void init_connection_modify_qp(st
 }
 
 /*
- * Close the QP associated with this connection.  Do not bother draining
- * qp_ack, nothing sending on it anyway.
+ * Close the QP associated with this connection.
  */
 static void openib_drain_qp(ib_connection_t *c)
 {
@@ -366,48 +352,6 @@ static void openib_drain_qp(ib_connectio
 }
 
 /*
- * When a client prepares to exit, it notifies its servers and transitions
- * the QP to drain state, then waits for all messages to finish.
- */
-static void openib_send_bye(ib_connection_t *c)
-{
-    struct openib_connection_priv *oc = c->priv;
-    buf_head_t *bh;
-    int ret;
-    msg_header_common_t mh_common;
-    char *ptr;
-    struct ibv_send_wr *bad_wr;
-    struct ibv_sge sg;
-    struct ibv_send_wr sr;
-
-    bh = qlist_try_del_head(&c->eager_send_buf_free);
-    if (!bh) {
-	/* if no messages available, let garbage collection on server deal */
-	return;
-    }
-
-    debug(2, "%s: sending bye", __func__);
-
-    ptr = bh->buf;
-    mh_common.type = MSG_BYE;
-    encode_msg_header_common_t(&ptr, &mh_common);
-
-    sg.addr = int64_from_ptr(bh->buf),
-    sg.length = sizeof(mh_common),
-    sg.lkey = oc->eager_send_mr->lkey,
-
-    memset(&sr, 0, sizeof(sr));
-    sr.opcode = IBV_WR_SEND;
-    sr.send_flags = IBV_SEND_SIGNALED;
-    sr.sg_list = &sg;
-    sr.num_sge = 1;
-
-    ret = ibv_post_send(oc->qp, &sr, &bad_wr);
-    if (ret < 0)
-	error("%s: ibv_post_send", __func__);
-}
-
-/*
  * At an explicit BYE message, or at finalize time, shut down a connection.
  * If descriptors are posted, defer and clean up the connection structures
  * later.
@@ -417,12 +361,7 @@ static void openib_close_connection(ib_c
     int ret;
     struct openib_connection_priv *oc = c->priv;
 
-    /* destroy the queue pairs */
-    if (oc->qp_ack) {
-	ret = ibv_destroy_qp(oc->qp_ack);
-	if (ret < 0)
-	    error_xerrno(ret, "%s: ibv_destroy_qp ack", __func__);
-    }
+    /* destroy the queue pair */
     if (oc->qp) {
 	ret = ibv_destroy_qp(oc->qp);
 	if (ret < 0)
@@ -446,11 +385,9 @@ static void openib_close_connection(ib_c
 
 /*
  * Simplify IB interface to post sends.  Not RDMA, just SEND.
- * Called for an eager send, rts send, or cts send.  Local send
- * completion is ignored, except rarely to clear the queue (see comments
- * at post_sr_ack).
+ * Called for an eager send, rts send, or cts send.
  */
-static void openib_post_sr(const buf_head_t *bh, u_int32_t len)
+static void openib_post_sr(const struct buf_head *bh, u_int32_t len)
 {
     ib_connection_t *c = bh->c;
     struct openib_connection_priv *oc = c->priv;
@@ -463,7 +400,7 @@ static void openib_post_sr(const buf_hea
     };
     struct ibv_send_wr sr = {
         .next = NULL,
-        .wr_id = int64_from_ptr(c),
+        .wr_id = int64_from_ptr(bh),
         .sg_list = &sg,
         .num_sge = 1,
         .opcode = IBV_WR_SEND,
@@ -481,13 +418,13 @@ static void openib_post_sr(const buf_hea
 
     ret = ibv_post_send(oc->qp, &sr, &bad_wr);
     if (ret < 0)
-        error("%s: ibv_post_send", __func__);
+        error("%s: ibv_post_send (%d)", __func__, ret);
 }
 
 /*
  * Post one of the eager recv bufs for this connection.
  */
-static void openib_post_rr(const ib_connection_t *c, buf_head_t *bh)
+static void openib_post_rr(const ib_connection_t *c, struct buf_head *bh)
 {
     struct openib_connection_priv *oc = c->priv;
     int ret;
@@ -511,90 +448,13 @@ static void openib_post_rr(const ib_conn
 }
 
 /*
- * Explicitly return a credit.  Immediate data says for which of
- * his buffer numbers does this ack apply.  Buffers will get reposted
- * out of order, although the buffers are always matched pairwise, so we
- * always return _his_ number, not ours.  (Consider this scenario
- *
- *     client                        server
- *        buf 0: send unex eager        buf 0: recv unex eager, no ack
- *        buf 1: send rts                      until app recognizes
- *                                      buf 1: recv rts, ack immediate,
- *                                             repost my buf 1
- *                                      ....
- *                                      app deals with eager, repost
- *                                             my buf 0
- *
- * Now the buffers are posted on the server in a different order.)
- *
- * Don't want to get a local completion from this, but if we don't do
- * so every once in a while, the NIC will fill up apparently.  So we
- * generate one every N - 100, where N =~ 5000, the number asked for
- * at QP build time.
- */
-static void openib_post_sr_ack(ib_connection_t *c, int his_bufnum)
-{
-    struct openib_connection_priv *oc = c->priv;
-    struct openib_device_priv *od = ib_device->priv;
-    int ret;
-    struct ibv_send_wr sr = {
-        .wr_id = int64_from_ptr(c),
-        .opcode = IBV_WR_SEND_WITH_IMM,
-        .send_flags = IBV_SEND_SIGNALED,
-        .imm_data = htobmi32(his_bufnum),
-    };
-    struct ibv_send_wr *bad_wr;
-
-    debug(2, "%s: %s bh %d wr %d/%d", __func__, c->peername, his_bufnum,
-          od->num_unsignaled_sends, od->max_ack_unsignaled_sends);
-
-    if (od->num_ack_unsignaled_sends + 10 == od->max_ack_unsignaled_sends)
-        od->num_ack_unsignaled_sends = 0;
-    else
-        ++od->num_ack_unsignaled_sends;
-
-    ret = ibv_post_send(oc->qp_ack, &sr, &bad_wr);
-    if (ret)
-        error("%s: ibv_post_sr_ack", __func__);
-}
-
-/*
- * Put another receive entry on the list for an ack.  These have no
- * data, so require no local buffers.  Just add a descriptor to the
- * NIC list.  We do keep the .id pointing to the bh which is the originator
- * of the eager (or RTS or whatever) send, just as a consistency check
- * that when the ack comes in, it is for the outgoing message we expected.
- *
- * In the future they could be out-of-order, though, so perhaps that will
- * go away.
- *
- * Could prepost a whole load of these and just replenish them without
- * thinking.
- */
-static void openib_post_rr_ack(const ib_connection_t *c, const buf_head_t * bh)
-{
-    int ret;
-    struct openib_connection_priv *oc = c->priv;
-    struct ibv_recv_wr rr = {
-        .wr_id = int64_from_ptr(bh),
-    };
-    struct ibv_recv_wr *bad_rr;
-
-    debug(4, "%s: %s bh %d", __func__, c->peername, bh->num);
-
-    ret = ibv_post_recv(oc->qp_ack, &rr, &bad_rr);
-    if (ret < 0)
-	error("%s: ibv_post_recv_ack", __func__);
-}
-
-/*
  * Called only in response to receipt of a CTS on the sender.  RDMA write
  * the big data to the other side.  A bit messy since an RDMA write may
  * not scatter to the receiver, but can gather from the sender, and we may
  * have a non-trivial buflist on both sides.  The mh_cts variable length
  * fields must be decoded as we go.
  */
-static void openib_post_sr_rdmaw(ib_send_t *sq, msg_header_cts_t *mh_cts,
+static void openib_post_sr_rdmaw(struct ib_work *sq, msg_header_cts_t *mh_cts,
                                  void *mh_cts_buf)
 {
     ib_connection_t *c = sq->c;
@@ -718,7 +578,7 @@ static void openib_post_sr_rdmaw(ib_send
 
         ret = ibv_post_send(oc->qp, &sr, &bad_wr);
         if (ret < 0)
-            error("%s: ibv_post_send", __func__);
+            error("%s: ibv_post_send (%d)", __func__, ret);
     }
 
 #if MEMCACHE_BOUNCEBUF
@@ -734,7 +594,7 @@ static int openib_check_cq(struct bmi_ib
 
     ret = ibv_poll_cq(od->nic_cq, 1, &desc);
     if (ret < 0)
-	error_xerrno(ret, "%s: ibv_poll_cq", __func__);
+	error("%s: ibv_poll_cq (%d)", __func__, ret);
     if (ret == 0) {  /* empty */
 	return 0;
     }
@@ -743,7 +603,6 @@ static int openib_check_cq(struct bmi_ib
     wc->id = desc.wr_id;
     wc->status = desc.status;
     wc->byte_len = desc.byte_len;
-    wc->imm_data = desc.imm_data;  /* data appears in network order */
     if (desc.opcode == IBV_WC_SEND)
 	wc->opcode = BMI_IB_OP_SEND;
     else if (desc.opcode == (IBV_WC_SEND | IBV_WC_RECV))
@@ -762,11 +621,12 @@ static int openib_check_cq(struct bmi_ib
 	      __func__, desc.sl, desc.dlid_path_bits);
 	error("%s: unknown opcode %d", __func__, desc.opcode);
     }
+    VALGRIND_MAKE_MEM_DEFINED(wc, sizeof(*wc));
 
     return 1;
 }
 
-static int openib_prepare_cq_block(void)
+static void openib_prepare_cq_block(int *cq_fd, int *async_fd)
 {
     struct openib_device_priv *od = ib_device->priv;
     int ret;
@@ -777,7 +637,25 @@ static int openib_prepare_cq_block(void)
 	error_xerrno(ret, "%s: ibv_req_notify_cq", __func__);
 
     /* return the fd that can be fed to poll() */
-    return od->channel->fd;
+    *cq_fd = od->channel->fd;
+    *async_fd = od->ctx->async_fd;
+}
+
+/*
+ * As poll says there is something to read, get the event, but
+ * ignore the contents as we only have one CQ.  But ack it
+ * so that the count is correct and the CQ can be shutdown later.
+ */
+static void openib_ack_cq_completion_event(void)
+{
+    struct openib_device_priv *od = ib_device->priv;
+    struct ibv_cq *cq;
+    void *cq_context;
+    int ret;
+
+    ret = ibv_get_cq_event(od->channel, &cq, &cq_context);
+    if (ret == 0)
+	ibv_ack_cq_events(cq, 1);
 }
 
 /*
@@ -980,6 +858,7 @@ int openib_ib_initialize(void)
 	warning("%s: ibv_open_device", __func__);
 	return -ENOSYS;
     }
+    VALGRIND_MAKE_MEM_DEFINED(ctx, sizeof(*ctx));
 
     od = Malloc(sizeof(*od));
     ib_device->priv = od;
@@ -988,16 +867,14 @@ int openib_ib_initialize(void)
     ib_device->func.new_connection = openib_new_connection;
     ib_device->func.close_connection = openib_close_connection;
     ib_device->func.drain_qp = openib_drain_qp;
-    ib_device->func.send_bye = openib_send_bye;
     ib_device->func.ib_initialize = openib_ib_initialize;
     ib_device->func.ib_finalize = openib_ib_finalize;
     ib_device->func.post_sr = openib_post_sr;
     ib_device->func.post_rr = openib_post_rr;
-    ib_device->func.post_sr_ack = openib_post_sr_ack;
-    ib_device->func.post_rr_ack = openib_post_rr_ack;
     ib_device->func.post_sr_rdmaw = openib_post_sr_rdmaw;
     ib_device->func.check_cq = openib_check_cq;
     ib_device->func.prepare_cq_block = openib_prepare_cq_block;
+    ib_device->func.ack_cq_completion_event = openib_ack_cq_completion_event;
     ib_device->func.wc_status_string = openib_wc_status_string;
     ib_device->func.mem_register = openib_mem_register;
     ib_device->func.mem_deregister = openib_mem_deregister;
@@ -1010,6 +887,8 @@ int openib_ib_initialize(void)
     ret = ibv_query_port(od->ctx, od->nic_port, &hca_port);
     if (ret)
 	error_xerrno(ret, "%s: ibv_query_port", __func__);
+    VALGRIND_MAKE_MEM_DEFINED(&hca_port, sizeof(hca_port));
+
     od->nic_lid = hca_port.lid;
 
     if (hca_port.state != IBV_PORT_ACTIVE)
@@ -1020,6 +899,7 @@ int openib_ib_initialize(void)
     ret = ibv_query_device(od->ctx, &hca_cap);
     if (ret)
 	error_xerrno(ret, "%s: ibv_query_device", __func__);
+    VALGRIND_MAKE_MEM_DEFINED(&hca_cap, sizeof(hca_cap));
 
     debug(1, "%s: max %d completion queue entries", __func__, hca_cap.max_cq);
     cqe_num = IBV_NUM_CQ_ENTRIES;
@@ -1047,20 +927,24 @@ int openib_ib_initialize(void)
     if (!od->nic_cq)
 	error("%s: ibv_create_cq failed", __func__);
 
-    /* use non-blocking IO on the async fd */
+    /* use non-blocking IO on the async fd and completion fd */
     flags = fcntl(ctx->async_fd, F_GETFL);
     if (flags < 0)
 	error_errno("%s: get async fd flags", __func__);
     if (fcntl(ctx->async_fd, F_SETFL, flags | O_NONBLOCK) < 0)
 	error_errno("%s: set async fd nonblocking", __func__);
 
+    flags = fcntl(od->channel->fd, F_GETFL);
+    if (flags < 0)
+	error_errno("%s: get completion fd flags", __func__);
+    if (fcntl(od->channel->fd, F_SETFL, flags | O_NONBLOCK) < 0)
+	error_errno("%s: set completion fd nonblocking", __func__);
+
     /* will be set on first connection */
     od->sg_tmp_array = 0;
     od->sg_max_len = 0;
     od->num_unsignaled_sends = 0;
     od->max_unsignaled_sends = 0;
-    od->num_ack_unsignaled_sends = 0;
-    od->max_ack_unsignaled_sends = 0;
 
     return 0;
 }

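Note on the openib.c hunks above: prepare_cq_block() now returns two descriptors, the completion channel fd and the device async fd. The completion fd is also switched to O_NONBLOCK, and the new ack_cq_completion_event() consumes and acknowledges one CQ event (ibv_get_cq_event() followed by ibv_ack_cq_events()). The sketch below shows the caller-side pattern under stated assumptions. It is not the BMI code. Two pipes stand in for od->channel->fd and ctx->async_fd, reading bytes stands in for fetching and acking the CQ event, and the helper names set_nonblocking(), drain_event() and wait_for_events() are hypothetical. The drain loop reads until EAGAIN, which only terminates on a non-blocking fd. That is presumably one reason the commit makes the completion fd non-blocking as well.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper mirroring the F_GETFL/F_SETFL sequence in
 * openib_ib_initialize(): add O_NONBLOCK, keep the other flags. */
static int set_nonblocking(int fd)
{
    int flags;

    assert(fd >= 0);
    flags = fcntl(fd, F_GETFL);
    if (flags < 0)
        return -1;
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
        return -1;
    return 0;
}

/* Hypothetical stand-in for ack_cq_completion_event(): consume what is
 * pending.  On a real completion channel this step would instead be
 * ibv_get_cq_event() followed by ibv_ack_cq_events(cq, 1).
 * Returns bytes consumed, or -1 on a real read error. */
static int drain_event(int fd)
{
    char buf[64];
    int total = 0;
    ssize_t r;

    while ((r = read(fd, buf, sizeof(buf))) > 0)
        total += (int) r;
    /* on a non-blocking fd, EAGAIN just means "nothing left" */
    if (r < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
        return -1;
    return total;
}

/* Wait on both descriptors, as a caller of
 * prepare_cq_block(&cq_fd, &async_fd) might.  Returns a bitmask:
 * 1 = completion fd was readable, 2 = async fd was readable,
 * 0 = timeout, -1 = poll() failed. */
static int wait_for_events(int cq_fd, int async_fd, int timeout_ms)
{
    struct pollfd pfd[2] = {
        { .fd = cq_fd,    .events = POLLIN },
        { .fd = async_fd, .events = POLLIN },
    };
    int mask = 0;

    if (poll(pfd, 2, timeout_ms) < 0)
        return -1;
    if (pfd[0].revents & POLLIN) {
        drain_event(cq_fd);
        mask |= 1;
    }
    if (pfd[1].revents & POLLIN) {
        drain_event(async_fd);
        mask |= 2;
    }
    return mask;
}
```

After a wakeup has been handled and the event consumed, a second poll() with a zero timeout reports nothing pending. This is the property that lets the block/poll loop avoid spinning on a stale event.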
Index: ib.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/ib.h,v
diff -p -u -r1.20 -r1.20.4.1
--- ib.h	18 Aug 2006 21:27:30 -0000	1.20
+++ ib.h	17 Feb 2007 11:16:33 -0000	1.20.4.1
@@ -5,36 +5,37 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: ib.h,v 1.20 2006/08/18 21:27:30 pw Exp $
+ * $Id: ib.h,v 1.20.4.1 2007/02/17 11:16:33 kunkel Exp $
  */
 #ifndef __ib_h
 #define __ib_h
 
 #include <src/io/bmi/bmi-types.h>
 #include <src/common/quicklist/quicklist.h>
+#include <src/common/gossip/gossip.h>
 
 #ifdef __GNUC__
+/* confuses debugger */
 /* #  define __hidden __attribute__((visibility("hidden"))) */
-#  define __hidden  /* confuses debugger */
+#  define __hidden
 #  define __unused __attribute__((unused))
 #else
 #  define __hidden
 #  define __unused
 #endif
 
-typedef struct qlist_head list_t;  /* easier to type */
-
 /* 20 8kB buffers allocated to each connection for unexpected messages */
 #define DEFAULT_EAGER_BUF_NUM  (20)
 #define DEFAULT_EAGER_BUF_SIZE (8 << 10)
 
-struct S_buf_head;
+struct buf_head;
+
 /*
  * Connection record.  Each machine gets its own set of buffers and
  * an entry in this table.
  */
 typedef struct {
-    list_t list;
+    struct qlist_head list;
 
     /* connection management */
     struct method_addr *remote_map;
@@ -45,15 +46,18 @@ typedef struct {
     void *eager_recv_buf_contig;    /* eager bufs, for short recvs */
 
     /* lists of free bufs */
-    list_t eager_send_buf_free;
-    list_t eager_recv_buf_free;
-    struct S_buf_head *eager_send_buf_head_contig;
-    struct S_buf_head *eager_recv_buf_head_contig;
+    struct qlist_head eager_send_buf_free;
+    struct qlist_head eager_recv_buf_free;
+    struct buf_head *eager_send_buf_head_contig;
+    struct buf_head *eager_recv_buf_head_contig;
 
     int cancelled;  /* was any operation cancelled by BMI */
     int refcnt;  /* sq or rq that need the connection to hang around */
     int closed;  /* closed, but hanging around waiting for zero refcnt */
 
+    int send_credit;    /* free slots on receiver */
+    int return_credit;  /* receive buffers he filled but that we've emptied */
+
     void *priv;
 
 } ib_connection_t;
@@ -62,13 +66,13 @@ typedef struct {
  * List structure of buffer areas, represents one at each local
  * and remote sides.
  */
-typedef struct S_buf_head {
-    list_t list;
-    int num;               /* ordinal index in the alloced buf heads */
-    ib_connection_t *c;    /* owning connection */
-    struct S_ib_send *sq;  /* owning sq or rq */
-    void *buf;             /* actual memory */
-} buf_head_t;
+struct buf_head {
+    struct qlist_head list;
+    int num;             /* ordinal index in the alloced buf heads */
+    ib_connection_t *c;  /* owning connection */
+    struct ib_work *sq;  /* owning sq (usually) or rq */
+    void *buf;           /* actual memory */
+};
 
 /* "private data" part of method_addr */
 typedef struct {
@@ -79,32 +83,39 @@ typedef struct {
 
 /*
  * Names of all the sendq and recvq states and message types, with string
- * arrays for debugging.
+ * arrays for debugging.  These must start at 1, not 0, for name printing.
  */
 typedef enum {
-    SQ_WAITING_BUFFER=1,
-    SQ_WAITING_EAGER_ACK,
+    SQ_WAITING_BUFFER = 1,
+    SQ_WAITING_EAGER_SEND_COMPLETION,
+    SQ_WAITING_RTS_SEND_COMPLETION,
+    SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS,
     SQ_WAITING_CTS,
-    SQ_WAITING_DATA_LOCAL_SEND_COMPLETE,
+    SQ_WAITING_DATA_SEND_COMPLETION,
+    SQ_WAITING_RTS_DONE_BUFFER,
+    SQ_WAITING_RTS_DONE_SEND_COMPLETION,
     SQ_WAITING_USER_TEST,
     SQ_CANCELLED,
 } sq_state_t;
-typedef enum {
+typedef enum {  /* bits *_USER_POST will be ORed */
     RQ_EAGER_WAITING_USER_POST = 0x1,
     RQ_EAGER_WAITING_USER_TESTUNEXPECTED = 0x2,
     RQ_EAGER_WAITING_USER_TEST = 0x4,
     RQ_RTS_WAITING_USER_POST = 0x8,
     RQ_RTS_WAITING_CTS_BUFFER = 0x10,
-    RQ_RTS_WAITING_DATA = 0x20,
-    RQ_RTS_WAITING_USER_TEST = 0x40,
-    RQ_WAITING_INCOMING = 0x80,
-    RQ_CANCELLED = 0x100,
+    RQ_RTS_WAITING_CTS_SEND_COMPLETION = 0x20,
+    RQ_RTS_WAITING_RTS_DONE = 0x40,
+    RQ_RTS_WAITING_USER_TEST = 0x80,
+    RQ_WAITING_INCOMING = 0x100,
+    RQ_CANCELLED = 0x200,
 } rq_state_t;
 typedef enum {
-    MSG_EAGER_SEND=1,
+    MSG_EAGER_SEND = 1,
     MSG_EAGER_SENDUNEXPECTED,
     MSG_RTS,
     MSG_CTS,
+    MSG_RTS_DONE,
+    MSG_CREDIT,
     MSG_BYE,
 } msg_type_t;
 
@@ -116,9 +127,13 @@ typedef struct {
 } name_t;
 static name_t sq_state_names[] = {
     entry(SQ_WAITING_BUFFER),
-    entry(SQ_WAITING_EAGER_ACK),
+    entry(SQ_WAITING_EAGER_SEND_COMPLETION),
+    entry(SQ_WAITING_RTS_SEND_COMPLETION),
+    entry(SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS),
     entry(SQ_WAITING_CTS),
-    entry(SQ_WAITING_DATA_LOCAL_SEND_COMPLETE),
+    entry(SQ_WAITING_DATA_SEND_COMPLETION),
+    entry(SQ_WAITING_RTS_DONE_BUFFER),
+    entry(SQ_WAITING_RTS_DONE_SEND_COMPLETION),
     entry(SQ_WAITING_USER_TEST),
     entry(SQ_CANCELLED),
     { 0, 0 }
@@ -129,7 +144,8 @@ static name_t rq_state_names[] = {
     entry(RQ_EAGER_WAITING_USER_TEST),
     entry(RQ_RTS_WAITING_USER_POST),
     entry(RQ_RTS_WAITING_CTS_BUFFER),
-    entry(RQ_RTS_WAITING_DATA),
+    entry(RQ_RTS_WAITING_CTS_SEND_COMPLETION),
+    entry(RQ_RTS_WAITING_RTS_DONE),
     entry(RQ_RTS_WAITING_USER_TEST),
     entry(RQ_WAITING_INCOMING),
     entry(RQ_CANCELLED),
@@ -140,6 +156,8 @@ static name_t msg_type_names[] = {
     entry(MSG_EAGER_SENDUNEXPECTED),
     entry(MSG_RTS),
     entry(MSG_CTS),
+    entry(MSG_RTS_DONE),
+    entry(MSG_CREDIT),
     entry(MSG_BYE),
     { 0, 0 }
 };
@@ -155,7 +173,7 @@ static name_t msg_type_names[] = {
  * needs a dreg-style consistency check against userspace freeing, though.
  */
 typedef struct {
-    list_t list;
+    struct qlist_head list;
     void *buf;
     bmi_size_t len;
     int count;  /* refcount, usage of this entry */
@@ -185,61 +203,43 @@ typedef struct {
 } ib_buflist_t;
 
 /*
- * Send message record.  There is no EAGER_SENT since we use RD which
- * ensures reliability, so the message is marked complete immediately
- * and removed from the queue.
- */
-typedef struct S_ib_send {
-    list_t list;
-    int type;  /* BMI_SEND */
-    /* pointer back to owning method_op (BMI interface) */
-    struct method_op *mop;
-    sq_state_t state;
-    ib_connection_t *c;
-
-    /* gather list of buffers */
-    ib_buflist_t buflist;
-
-    /* places to hang just one buf when not using _list funcs, avoids
-     * small mallocs in that case but permits use of generic code */
-    const void *buflist_one_buf;
-    bmi_size_t  buflist_one_len;
-
-    int is_unexpected;  /* if user posted this that way */
-    /* bh represents our local buffer that is tied up until completed */
-    buf_head_t *bh;
-    /* his_bufnum is used to tell him what his buffer was that sent to
-     * us, so he can free it up for reuse; only needed to ack a CTS */
-    int his_bufnum;
-    bmi_msg_tag_t bmi_tag;
-} ib_send_t;
-
-/*
- * Receive message record.
+ * Common structure for both ib_send and ib_recv outstanding work items.
  */
-typedef struct {
-    list_t list;
-    int type;  /* BMI_RECV */
-    /* pointer back to owning method_op (BMI interface) */
-    struct method_op *mop;
-    rq_state_t state;
+struct ib_work {
+    struct qlist_head list;
+    int type;  /* BMI_SEND or BMI_RECV */
+    struct method_op *mop;   /* pointer back to owning method_op */
+
     ib_connection_t *c;
 
-    /* scatter list of buffers */
+    /* gather (send) or scatter (recv) list of buffers */
     ib_buflist_t buflist;
 
     /* places to hang just one buf when not using _list funcs, avoids
      * small mallocs in that case but permits use of generic code */
-    void *      buflist_one_buf;
+    union {
+	const void *send;
+	void *recv;
+    } buflist_one_buf;
     bmi_size_t  buflist_one_len;
 
-    /* local and remote buf heads for sending associated cts */
-    buf_head_t *bh, *bhr;
-    u_int64_t rts_mop_id;  /* return tag to give to rts sender */
-    /* return value for test and wait, necessary when not pre-posted */
+    /* bh represents our local buffer for sending, maybe CTS messages
+     * as sent for receive items */
+    struct buf_head *bh;
+
+    /* tag as posted by user, or return value on recvs */
     bmi_msg_tag_t bmi_tag;
-    bmi_size_t actual_len;
-} ib_recv_t;
+
+    /* send or receive state */
+    union {
+	sq_state_t send;
+	rq_state_t recv;
+    } state;
+
+    int is_unexpected;      /* send: if user posted an unexpected message */
+    u_int64_t rts_mop_id;    /* recv: return tag to give to rts sender */
+    bmi_size_t actual_len;   /* recv: could be shorter than posted */
+};
 
 /*
  * Header structure used for various sends.  Make sure these stay fully 64-bit
@@ -249,43 +249,49 @@ typedef struct {
  */
 typedef struct {
     msg_type_t type;
+    u_int32_t credit;  /* return credits */
 } msg_header_common_t;
-endecode_fields_1(msg_header_common_t,
-    enum, type);
+endecode_fields_2(msg_header_common_t,
+    enum, type,
+    uint32_t, credit);
 
+/*
+ * Eager message header, with data following this struct.
+ */
 typedef struct {
-    msg_type_t type;
+    msg_header_common_t c;
     bmi_msg_tag_t bmi_tag;
-    u_int32_t bufnum;  /* sender's bufnum for acknowledgement messages */
     u_int32_t __pad;
 } msg_header_eager_t;
-endecode_fields_3(msg_header_eager_t,
-    enum, type,
+endecode_fields_4(msg_header_eager_t,
+    enum, c.type,
+    uint32_t, c.credit,
     int32_t, bmi_tag,
-    uint32_t, bufnum);
+    uint32_t, __pad);
 
 /*
- * Follows msg_header_t only on MSG_RTS messages.  No bufnum here as
- * the sender remembers via sq->bh and will free upon receipt of CTS.
+ * MSG_RTS instead of MSG_EAGER from sender to receiver for big messages.
  */
 typedef struct {
-    msg_type_t type;
+    msg_header_common_t c;
     bmi_msg_tag_t bmi_tag;
+    u_int32_t __pad;
     u_int64_t mop_id;  /* handle to ease lookup when CTS is delivered */
     u_int64_t tot_len;
 } msg_header_rts_t;
-endecode_fields_4(msg_header_rts_t,
-    enum, type,
+endecode_fields_6(msg_header_rts_t,
+    enum, c.type,
+    uint32_t, c.credit,
     int32_t, bmi_tag,
+    int32_t, __pad,
     uint64_t, mop_id,
     uint64_t, tot_len);
 
 /*
- * Ditto for MSG_CTS.
+ * MSG_CTS from receiver to sender with buffer information.
  */
 typedef struct {
-    msg_type_t type;
-    u_int32_t bufnum;  /* sender's bufnum for acknowledgment messages */
+    msg_header_common_t c;
     u_int64_t rts_mop_id;  /* return id from the RTS */
     u_int64_t buflist_tot_len;
     u_int32_t buflist_num;  /* number of buffers, then lengths to follow */
@@ -298,20 +304,33 @@ typedef struct {
 } msg_header_cts_t;
 #define MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE (8 + 4 + 4)
 endecode_fields_5(msg_header_cts_t,
-    enum, type,
-    uint32_t, bufnum,
+    enum, c.type,
+    uint32_t, c.credit,
     uint64_t, rts_mop_id,
     uint64_t, buflist_tot_len,
     uint32_t, buflist_num);
 
 /*
+ * After RDMA data has been sent, this RTS_DONE message tells the
+ * receiver that the sender has finished.
+ */
+typedef struct {
+    msg_header_common_t c;
+    u_int64_t mop_id;
+} msg_header_rts_done_t;
+endecode_fields_3(msg_header_rts_done_t,
+    enum, c.type,
+    uint32_t, c.credit,
+    uint64_t, mop_id);
+
+
+/*
  * Generic work completion from poll_cq() for both vapi and openib.
  */
 struct bmi_ib_wc {
     uint64_t id;
     int status;  /* opaque, but zero means success */
     uint32_t byte_len;
-    uint32_t imm_data;
     enum { BMI_IB_OP_SEND, BMI_IB_OP_RECV, BMI_IB_OP_RDMA_WRITE } opcode;
 };
 
@@ -322,16 +341,14 @@ struct ib_device_func {
     int (*new_connection)(ib_connection_t *c, int sock, int is_server);
     void (*close_connection)(ib_connection_t *c);
     void (*drain_qp)(ib_connection_t *c);
-    void (*send_bye)(ib_connection_t *c);
     int (*ib_initialize)(void);
     void (*ib_finalize)(void);
-    void (*post_sr)(const buf_head_t *bh, u_int32_t len);
-    void (*post_rr)(const ib_connection_t *c, buf_head_t *bh);
-    void (*post_sr_ack)(ib_connection_t *c, int his_bufnum);
-    void (*post_rr_ack)(const ib_connection_t *c, const buf_head_t *bh);
-    void (*post_sr_rdmaw)(ib_send_t *sq, msg_header_cts_t *mh_cts,
+    void (*post_sr)(const struct buf_head *bh, u_int32_t len);
+    void (*post_rr)(const ib_connection_t *c, struct buf_head *bh);
+    void (*post_sr_rdmaw)(struct ib_work *sq, msg_header_cts_t *mh_cts,
                           void *mh_cts_buf);
-    int (*prepare_cq_block)(void);
+    void (*prepare_cq_block)(int *cq_fd, int *async_fd);
+    void (*ack_cq_completion_event)(void);
     int (*check_cq)(struct bmi_ib_wc *wc);
     const char *(*wc_status_string)(int status);
     void (*mem_register)(memcache_entry_t *c);
@@ -343,10 +360,12 @@ struct ib_device_func {
  * State that applies across all users of the device, built at initialization.
  */
 typedef struct {
-    int listen_sock;  /* TCP sock on whih to listen for new connections */
-    list_t connection; /* list of current connections */
-    list_t sendq;  /* outstanding sent items */
-    list_t recvq;  /* outstanding posted receives (or unexpecteds) */
+    int listen_sock;  /* TCP sock on which to listen for new connections */
+    struct method_addr *listen_addr;  /* and BMI listen address */
+
+    struct qlist_head connection; /* list of current connections */
+    struct qlist_head sendq;  /* outstanding sent items */
+    struct qlist_head recvq;  /* outstanding posted receives (or unexpecteds) */
     void *memcache;  /* opaque structure that holds memory cache state */
 
     /*
@@ -377,6 +396,8 @@ void error_xerrno(int errnum, const char
   __attribute__((noreturn,format(printf,2,3)));
 void warning(const char *fmt, ...) __attribute__((format(printf,1,2)));
 void warning_errno(const char *fmt, ...) __attribute__((format(printf,1,2)));
+void warning_xerrno(int errnum, const char *fmt, ...)
+  __attribute__((format(printf,2,3)));
 void info(const char *fmt, ...) __attribute__((format(printf,1,2)));
 void *Malloc(unsigned long n) __attribute__((malloc));
 void *qlist_del_head(struct qlist_head *list);
@@ -395,6 +416,8 @@ int write_full(int fd, const void *buf, 
 void *memcache_memalloc(void *md, bmi_size_t len, int eager_limit);
 int memcache_memfree(void *md, void *buf, bmi_size_t len);
 void memcache_register(void *md, ib_buflist_t *buflist);
+void memcache_preregister(void *md, const void *buf, bmi_size_t len,
+                          enum PVFS_io_type rw);
 void memcache_deregister(void *md, ib_buflist_t *buflist);
 void *memcache_init(void (*mem_register)(memcache_entry_t *),
                     void (*mem_deregister)(memcache_entry_t *));
@@ -435,13 +458,13 @@ void memcache_shutdown(void *md);
 #define debug(lvl,fmt,args...) \
     do { \
 	if (lvl <= DEBUG_LEVEL) \
-	    info(fmt,##args); \
+	    gossip_debug(GOSSIP_BMI_DEBUG_IB, fmt ".\n", ##args); \
     } while (0)
 #else
 #  define debug(lvl,fmt,...) do { } while (0)
 #endif
 
-#if 1
+#if !defined(NDEBUG)
 #define assert(cond,fmt,args...) \
     do { \
 	if (bmi_ib_unlikely(!(cond))) \

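Note on the ib.h hunks above: the second ack-only QP and its zero-byte acknowledgements are gone. ib_connection_t gains send_credit and return_credit, and every message header (msg_header_common_t) now carries a credit field. The ib.c code that follows uses them in get_eager_buf(), msg_header_init() and the new post_rr() wrapper. The self-contained simulation below models that bookkeeping. It is a simplification, not the BMI implementation: struct conn, struct hdr, deliver() and run_burst() are hypothetical, delivery is assumed instantaneous and in order, and no verbs calls are made. The initial send_credit of EAGER_BUF_NUM - 1 is an assumption; the diff does not show the initial value. It is chosen here to match the post_rr() comment about one credit being "saved back" for explicit credit messages.

```c
#include <assert.h>

/* Hypothetical; plays the role of DEFAULT_EAGER_BUF_NUM (20). */
#define EAGER_BUF_NUM 20

/* Simplified stand-in for the two new ib_connection_t fields. */
struct conn {
    int send_credit;    /* free receive slots on the peer */
    int return_credit;  /* peer's buffers we emptied and reposted */
};

/* Simplified stand-in for msg_header_common_t. */
struct hdr {
    int type;
    unsigned int credit;
};

enum { SIM_MSG_EAGER = 1, SIM_MSG_CREDIT };

/* Like msg_header_init(): every outgoing header carries all pending
 * return credit, which is then zeroed. */
static void hdr_init(struct hdr *h, struct conn *c, int type)
{
    h->type = type;
    h->credit = (unsigned int) c->return_credit;
    c->return_credit = 0;
}

/* Like the credit check in get_eager_buf(): a send needs a credit. */
static int take_send_credit(struct conn *c)
{
    if (c->send_credit <= 0)
        return 0;
    --c->send_credit;
    return 1;
}

/* Receive one message on c: absorb the credit it carries, repost the
 * buffer, and (like the post_rr() wrapper) emit an explicit credit
 * message once too much credit is owed.  Returns 1 if *credit_out was
 * filled in and must be delivered to the peer. */
static int deliver(struct conn *c, const struct hdr *h, struct hdr *credit_out)
{
    c->send_credit += (int) h->credit;
    ++c->return_credit;
    if (c->return_credit > EAGER_BUF_NUM - 4) {
        --c->send_credit;  /* the held-back credit; not checked */
        hdr_init(credit_out, c, SIM_MSG_CREDIT);
        return 1;
    }
    return 0;
}

/* Hypothetical driver: a sends eager messages until it is out of
 * credit, then b drains them all; explicit credit messages from b go
 * straight back to a.  Returns the number of eager sends. */
static int run_burst(struct conn *a, struct conn *b)
{
    struct hdr queue[EAGER_BUF_NUM];
    int sent = 0;

    while (take_send_credit(a)) {
        assert(sent < EAGER_BUF_NUM);  /* never overruns b's buffers */
        hdr_init(&queue[sent++], a, SIM_MSG_EAGER);
    }
    for (int i = 0; i < sent; i++) {
        struct hdr credit_msg, unused;
        if (deliver(b, &queue[i], &credit_msg)) {
            int again = deliver(a, &credit_msg, &unused);
            assert(!again);  /* a piggybacks its credit on its next send */
            (void) again;
        }
    }
    return sent;
}
```

Traced by hand with both ends starting at 19 credits: the first burst sends 19 messages. The receiver returns 17 credits explicitly after the 17th message and keeps 2 to piggyback on its next send. The sender then holds 17 credits plus 1 return credit for the explicit message it consumed. Later bursts settle at 17 messages. At every step the sender's credits plus the receiver's unreturned credits add up to EAGER_BUF_NUM - 1, so the sender can never overrun the receive buffers posted on the peer.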
Index: ib.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/ib.c,v
diff -p -u -r1.38 -r1.38.4.1
--- ib.c	28 Aug 2006 17:33:11 -0000	1.38
+++ ib.c	17 Feb 2007 11:16:34 -0000	1.38.4.1
@@ -6,7 +6,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: ib.c,v 1.38 2006/08/28 17:33:11 pw Exp $
+ * $Id: ib.c,v 1.38.4.1 2007/02/17 11:16:34 kunkel Exp $
  */
 #include <stdio.h>
 #include <stdlib.h>
@@ -24,6 +24,13 @@
 #include <src/io/bmi/bmi-method-callback.h>  /* bmi_method_addr_reg_callback */
 #include <src/common/gen-locks/gen-locks.h>  /* gen_mutex_t ... */
 #include <src/common/misc/pvfs2-internal.h>
+
+#ifdef HAVE_VALGRIND_H
+#include <memcheck.h>
+#else
+#define VALGRIND_MAKE_MEM_DEFINED(addr,len)
+#endif
+
 #include "ib.h"
 
 static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
@@ -42,16 +49,13 @@ ib_device_t *ib_device __hidden = NULL;
 #define new_connection ib_device->func.new_connection
 #define close_connection ib_device->func.close_connection
 #define drain_qp ib_device->func.drain_qp
-#define send_bye ib_device->func.send_bye
 #define ib_initialize ib_device->func.ib_initialize
 #define ib_finalize ib_device->func.ib_finalize
 #define post_sr ib_device->func.post_sr
-#define post_rr ib_device->func.post_rr
-#define post_sr_ack ib_device->func.post_sr_ack
-#define post_rr_ack ib_device->func.post_rr_ack
 #define post_sr_rdmaw ib_device->func.post_sr_rdmaw
 #define check_cq ib_device->func.check_cq
 #define prepare_cq_block ib_device->func.prepare_cq_block
+#define ack_cq_completion_event ib_device->func.ack_cq_completion_event
 #define wc_status_string ib_device->func.wc_status_string
 #define mem_register ib_device->func.mem_register
 #define mem_deregister ib_device->func.mem_deregister
@@ -73,12 +77,11 @@ static const bmi_size_t reg_send_buflist
 static const bmi_size_t reg_recv_buflist_len = 256 * 1024;
 #endif
 
-static void encourage_send_incoming_cts(buf_head_t *bh, u_int32_t byte_len);
-static void encourage_recv_incoming(ib_connection_t *c, buf_head_t *bh,
+static void encourage_send_incoming_cts(struct buf_head *bh, u_int32_t byte_len);
+static void encourage_recv_incoming(struct buf_head *bh, msg_type_t type,
                                     u_int32_t byte_len);
-static void encourage_recv_incoming_cts_ack(ib_recv_t *rq);
-static int send_cts(ib_recv_t *rq);
-static void maybe_free_connection(ib_connection_t *c);
+static void encourage_rts_done_waiting_buffer(struct ib_work *sq);
+static int send_cts(struct ib_work *rq);
 static void ib_close_connection(ib_connection_t *c);
 #ifndef __PVFS2_SERVER__
 static int ib_tcp_client_connect(ib_method_addr_t *ibmap,
@@ -123,6 +126,8 @@ static int ib_check_cq(void)
 	debug(4, "%s: found something", __func__);
 	++ret;
 	if (wc.status != 0) {
+	    /* opcode is not necessarily valid; only wr_id, status, qp_num,
+	     * and vendor_err can be relied upon */
 	    if (wc.opcode == BMI_IB_OP_SEND) {
 		debug(0, "%s: entry id 0x%llx SEND error %s", __func__,
 		  llu(wc.id), wc_status_string(wc.status));
@@ -143,96 +148,91 @@ static int ib_check_cq(void)
 
 	if (wc.opcode == BMI_IB_OP_RECV) {
 	    /*
-	     * Remote side did a send to us.  Filled one of the receive
-	     * queue descriptors, either message or ack.
+	     * Remote side did a send to us.
 	     */
-	    buf_head_t *bh = ptr_from_int64(wc.id);
+	    msg_header_common_t mh_common;
+	    struct buf_head *bh = ptr_from_int64(wc.id);
+	    char *ptr = bh->buf;
 	    u_int32_t byte_len = wc.byte_len;
 
-	    if (byte_len == 0) {
-		/*
-		 * Acknowledgment message on qp_ack.
-		 */
-		int bufnum = bmitoh32(wc.imm_data);
-		ib_send_t *sq;
-
-		debug(3, "%s: ack message %s my bufnum %d", __func__,
-		      bh->c->peername, bufnum);
-
-		/*
-		 * Do not get the sq from the bh that posted this because
-		 * these do not necessarily come in order, in particular
-		 * there is no explicit ACK for an RTS instead the CTS serves
-		 * as the ACK.  Instead look up the bufnum in the static
-		 * send array.  This sq will actually be an rq if the ack
-		 * is of a CTS.
-		 */
-		sq = bh->c->eager_send_buf_head_contig[bufnum].sq;
-		if (bmi_ib_unlikely(sq->type == BMI_RECV))
-		    /* ack of a CTS sent by the receiver */
-		    encourage_recv_incoming_cts_ack((ib_recv_t *)sq);
-		else {
-		    assert(sq->state == SQ_WAITING_EAGER_ACK,
-		      "%s: unknown send state %s of eager send bh %d"
-		      " received in eager recv bh %d", __func__,
-		      sq_state_name(sq->state), bufnum, bh->num);
-
-		    sq->state = SQ_WAITING_USER_TEST;
-		    qlist_add_tail(&sq->bh->list, &sq->c->eager_send_buf_free);
-
-		    debug(3, "%s: sq %p"
-		      " SQ_WAITING_EAGER_ACK -> SQ_WAITING_USER_TEST",
-		      __func__, sq);
-		}
-
+	    VALGRIND_MAKE_MEM_DEFINED(ptr, byte_len);
+	    decode_msg_header_common_t(&ptr, &mh_common);
+	    bh->c->send_credit += mh_common.credit;
+
+	    debug(2, "%s: recv from %s len %d type %s credit %d",
+		  __func__, bh->c->peername, byte_len,
+		  msg_type_name(mh_common.type), mh_common.credit);
+	    if (mh_common.type == MSG_CTS) {
+		/* incoming CTS messages go to the send engine */
+		encourage_send_incoming_cts(bh, byte_len);
 	    } else {
-		/*
-		 * Some other message: eager send, RTS, CTS, BYE.
-		 */
-		msg_header_common_t mh_common;
-		char *ptr = bh->buf;
-
-		decode_msg_header_common_t(&ptr, &mh_common);
-
-		debug(3, "%s: found len %d at %s my bufnum %d type %s",
-		  __func__, byte_len, bh->c->peername, bh->num,
-		  msg_type_name(mh_common.type));
-		if (mh_common.type == MSG_CTS) {
-		    /* incoming CTS messages go to the send engine */
-		    encourage_send_incoming_cts(bh, byte_len);
-		} else {
-		    /* something for the recv side, no known rq yet */
-		    encourage_recv_incoming(bh->c, bh, byte_len);
-		}
+		/* something for the recv side, no known rq yet */
+		encourage_recv_incoming(bh, mh_common.type, byte_len);
 	    }
 
 	} else if (wc.opcode == BMI_IB_OP_RDMA_WRITE) {
 
 	    /* completion event for the rdma write we initiated, used
 	     * to signal memory unpin etc. */
-	    ib_send_t *sq = ptr_from_int64(wc.id);
+	    struct ib_work *sq = ptr_from_int64(wc.id);
 
-	    debug(3, "%s: sq %p %s", __func__, sq, sq_state_name(sq->state));
+	    debug(3, "%s: sq %p %s", __func__, sq,
+	          sq_state_name(sq->state.send));
 
-	    assert(sq->state == SQ_WAITING_DATA_LOCAL_SEND_COMPLETE,
-	      "%s: wrong send state %s", __func__, sq_state_name(sq->state));
+	    assert(sq->state.send == SQ_WAITING_DATA_SEND_COMPLETION,
+	           "%s: wrong send state %s", __func__,
+		   sq_state_name(sq->state.send));
 
-	    /* ack his cts, signals rdma completed */
-	    post_sr_ack(sq->c, sq->his_bufnum);
+	    sq->state.send = SQ_WAITING_RTS_DONE_BUFFER;
 
 #if !MEMCACHE_BOUNCEBUF
 	    memcache_deregister(ib_device->memcache, &sq->buflist);
 #endif
-	    sq->state = SQ_WAITING_USER_TEST;
+	    debug(2, "%s: sq %p RDMA write done, now %s", __func__, sq,
+	          sq_state_name(sq->state.send));
 
-	    debug(2, "%s: sq %p now %s", __func__, sq,
-	      sq_state_name(sq->state));
+	    encourage_rts_done_waiting_buffer(sq);
 
 	} else if (wc.opcode == BMI_IB_OP_SEND) {
 
-	    /* periodic send queue flush, qp or qp_ack */
-	    debug(2, "%s: send to %s completed locally", __func__,
-	      ((ib_connection_t *) ptr_from_int64(wc.id))->peername);
+	    struct buf_head *bh = ptr_from_int64(wc.id);
+	    struct ib_work *sq = bh->sq;
+
+	    if (sq == NULL) {
+		/* MSG_BYE or MSG_CREDIT */
+		debug(2, "%s: MSG_BYE or MSG_CREDIT completed locally",
+		      __func__);
+	    } else if (sq->type == BMI_SEND) {
+		sq_state_t state = sq->state.send;
+
+		if (state == SQ_WAITING_EAGER_SEND_COMPLETION)
+		    sq->state.send = SQ_WAITING_USER_TEST;
+		else if (state == SQ_WAITING_RTS_SEND_COMPLETION)
+		    sq->state.send = SQ_WAITING_CTS;
+		else if (state == SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS)
+		    sq->state.send = SQ_WAITING_DATA_SEND_COMPLETION;
+		else if (state == SQ_WAITING_RTS_DONE_SEND_COMPLETION)
+		    sq->state.send = SQ_WAITING_USER_TEST;
+		else
+		    assert(0, "%s: unknown send state %s of sq %p",
+		           __func__, sq_state_name(sq->state.send), sq);
+		debug(2, "%s: send to %s completed locally: -> %s",
+		      __func__, bh->c->peername, sq_state_name(sq->state.send));
+
+	    } else {
+		struct ib_work *rq = sq;  /* rename */
+		rq_state_t state = rq->state.recv;
+		
+		if (state == RQ_RTS_WAITING_CTS_SEND_COMPLETION)
+		    rq->state.recv = RQ_RTS_WAITING_RTS_DONE;
+		else
+		    assert(0, "%s: unknown send state %s of rq %p",
+		           __func__, rq_state_name(rq->state.recv), rq);
+		debug(2, "%s: send to %s completed locally: -> %s",
+		      __func__, bh->c->peername, rq_state_name(rq->state.recv));
+	    }
+
+	    qlist_add_tail(&bh->list, &bh->c->eager_send_buf_free);
 
 	} else {
 	    error("%s: cq entry id 0x%llx opcode %d unexpected", __func__,
@@ -243,25 +243,75 @@ static int ib_check_cq(void)
 }
 
 /*
+ * Initialize common header of all messages.
+ */
+static void msg_header_init(msg_header_common_t *mh_common,
+                            ib_connection_t *c, msg_type_t type)
+{
+    mh_common->type = type;
+    mh_common->credit = c->return_credit;
+    c->return_credit = 0;
+}
+
+/*
+ * Grab an empty buf head, if available.
+ */
+static struct buf_head *get_eager_buf(ib_connection_t *c)
+{
+    struct buf_head *bh = NULL;
+
+    if (c->send_credit > 0) {
+	--c->send_credit;
+	bh = qlist_try_del_head(&c->eager_send_buf_free);
+	assert(bh, "%s: empty eager_send_buf_free list, peer %s", __func__,
+	       c->peername);
+    }
+    return bh;
+}
+
+/*
+ * Re-post a receive buffer, possibly returning credit to the peer.
+ */
+static void post_rr(ib_connection_t *c, struct buf_head *bh)
+{
+    ib_device->func.post_rr(c, bh);
+    ++c->return_credit;
+
+    /* if credits are building up, explicitly send them over */
+    if (c->return_credit > ib_device->eager_buf_num - 4) {
+	msg_header_common_t mh_common;
+	char *ptr;
+
+	/* one credit saved back for just this situation, do not check */
+	--c->send_credit;
+	bh = qlist_try_del_head(&c->eager_send_buf_free);
+	assert(bh, "%s: empty eager_send_buf_free list", __func__);
+	bh->sq = NULL;
+	debug(2, "%s: return %d credits to %s", __func__, c->return_credit,
+	      c->peername);
+	msg_header_init(&mh_common, c, MSG_CREDIT);
+	ptr = bh->buf;
+	encode_msg_header_common_t(&ptr, &mh_common);
+	post_sr(bh, sizeof(mh_common));
+    }
+}
+
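The credit scheme built from get_eager_buf(), post_rr() and msg_header_init() can be modeled on its own. The sketch below is a simplified simulation, not the BMI code. struct conn, EAGER_BUF_NUM (assumed to be 8 here) and simulate() are hypothetical. Traffic runs one way only, so the receiver's credit flows back only through explicit MSG_CREDIT messages, which are sent once return_credit exceeds eager_buf_num - 4.

```c
#include <assert.h>

/* Hypothetical stand-in for ib_connection_t: just the two credit counters. */
struct conn {
    int send_credit;    /* eager buffers we may still fill at the peer */
    int return_credit;  /* reposted local buffers not yet reported to the peer */
};

/* Assumed value; the real code reads ib_device->eager_buf_num. */
#define EAGER_BUF_NUM 8

/* Like msg_header_init(): hand all pending credit to an outgoing header. */
static int take_return_credit(struct conn *c)
{
    int credit = c->return_credit;
    c->return_credit = 0;
    return credit;
}

/* Like get_eager_buf(): consume one credit, or fail if none are left. */
static int try_send(struct conn *c)
{
    if (c->send_credit <= 0)
        return 0;
    --c->send_credit;
    return 1;
}

/* Like post_rr(): repost a buffer, then report whether an explicit
 * MSG_CREDIT is due because credit has built up. */
static int repost_recv(struct conn *c)
{
    ++c->return_credit;
    return c->return_credit > EAGER_BUF_NUM - 4;
}

/* One-way traffic a -> b.  b never sends data, so its credit goes back
 * only through explicit credit messages.  a's reposting of those credit
 * messages is ignored to keep the sketch short. */
static int simulate(int nmsgs, int *credit_msgs)
{
    struct conn a = { EAGER_BUF_NUM, 0 };
    struct conn b = { EAGER_BUF_NUM, 0 };
    int sent = 0;

    *credit_msgs = 0;
    for (int i = 0; i < nmsgs; i++) {
        if (!try_send(&a))
            break;                  /* would sit in SQ_WAITING_BUFFER */
        ++sent;
        if (repost_recv(&b)) {
            --b.send_credit;        /* uses the credit held in reserve */
            a.send_credit += take_return_credit(&b);
            ++*credit_msgs;
        }
    }
    return sent;
}
```

With 8 buffers, the receiver returns credit in batches of five. As a result, 20 eager sends go through without the sender ever blocking, and four credit messages flow back.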
+/*
  * Push a send message along its next step.  Called internally only.
  */
-static void
-encourage_send_waiting_buffer(ib_send_t *sq)
+static void encourage_send_waiting_buffer(struct ib_work *sq)
 {
-    /*
-     * Must get buffers both locally and remote to do an eager send
-     * or to initiate an RTS.  Maybe pair these two allocations if it
-     * happens frequently.
-     */
-    buf_head_t *bh;
+    struct buf_head *bh;
+    ib_connection_t *c = sq->c;
 
     debug(3, "%s: sq %p", __func__, sq);
-    assert(sq->state == SQ_WAITING_BUFFER, "%s: wrong send state %s",
-      __func__, sq_state_name(sq->state));
+    assert(sq->state.send == SQ_WAITING_BUFFER, "%s: wrong send state %s",
+           __func__, sq_state_name(sq->state.send));
 
-    bh = qlist_try_del_head(&sq->c->eager_send_buf_free);
+    bh = get_eager_buf(c);
     if (!bh) {
-	debug(2, "%s: sq %p no free send buffers", __func__, sq);
+	debug(2, "%s: sq %p no free send buffers to %s", __func__,
+	      sq, c->peername);
 	return;
     }
     sq->bh = bh;
@@ -273,27 +323,22 @@ encourage_send_waiting_buffer(ib_send_t 
 	 */
 	msg_header_eager_t mh_eager;
 	char *ptr = bh->buf;
-	
-	mh_eager.type = sq->is_unexpected
-	  ? MSG_EAGER_SENDUNEXPECTED : MSG_EAGER_SEND;
-	mh_eager.bmi_tag = sq->bmi_tag;
-	mh_eager.bufnum = bh->num;
 
+	msg_header_init(&mh_eager.c, c, sq->is_unexpected
+	                ? MSG_EAGER_SENDUNEXPECTED : MSG_EAGER_SEND);
+	mh_eager.bmi_tag = sq->bmi_tag;
 	encode_msg_header_eager_t(&ptr, &mh_eager);
 
 	memcpy_from_buflist(&sq->buflist,
 	                    (msg_header_eager_t *) bh->buf + 1);
 
-	/* get ready to receive the ack */
-	post_rr_ack(sq->c, bh);
-
 	/* send the message */
 	post_sr(bh, (u_int32_t) (sizeof(mh_eager) + sq->buflist.tot_len));
 
 	/* wait for ack saying remote has received and recycled his buf */
-	sq->state = SQ_WAITING_EAGER_ACK;
-	debug(3, "%s: sq %p sent EAGER now %s", __func__, sq,
-	  sq_state_name(sq->state));
+	sq->state.send = SQ_WAITING_EAGER_SEND_COMPLETION;
+	debug(2, "%s: sq %p sent EAGER len %lld", __func__, sq,
+	      lld(sq->buflist.tot_len));
 
     } else {
 	/*
@@ -303,14 +348,13 @@ encourage_send_waiting_buffer(ib_send_t 
 	msg_header_rts_t mh_rts;
 	char *ptr = bh->buf;
 
-	mh_rts.type = MSG_RTS;
+	msg_header_init(&mh_rts.c, c, MSG_RTS);
 	mh_rts.bmi_tag = sq->bmi_tag;
 	mh_rts.mop_id = sq->mop->op_id;
 	mh_rts.tot_len = sq->buflist.tot_len;
 
 	encode_msg_header_rts_t(&ptr, &mh_rts);
 
-	/* do not expect an ack back from this ever (implicit with CTS) */
 	post_sr(bh, sizeof(mh_rts));
 
 #if MEMCACHE_EARLY_REG
@@ -319,9 +363,9 @@ encourage_send_waiting_buffer(ib_send_t 
 	memcache_register(ib_device->memcache, &sq->buflist);
 #endif
 
-	sq->state = SQ_WAITING_CTS;
-	debug(3, "%s: sq %p sent RTS now %s", __func__, sq,
-	  sq_state_name(sq->state));
+	sq->state.send = SQ_WAITING_RTS_SEND_COMPLETION;
+	debug(2, "%s: sq %p sent RTS mopid %llx len %lld", __func__, sq,
+	      llu(sq->mop->op_id), lld(sq->buflist.tot_len));
     }
 }
 
@@ -330,12 +374,11 @@ encourage_send_waiting_buffer(ib_send_t 
  * from us, and start the real data send.
  */
 static void
-encourage_send_incoming_cts(buf_head_t *bh, u_int32_t byte_len)
+encourage_send_incoming_cts(struct buf_head *bh, u_int32_t byte_len)
 {
     msg_header_cts_t mh_cts;
-    ib_send_t *sq;
+    struct ib_work *sq, *sqt;
     u_int32_t want;
-    list_t *l;
     char *ptr = bh->buf;
 
     decode_msg_header_cts_t(&ptr, &mh_cts);
@@ -345,11 +388,11 @@ encourage_send_incoming_cts(buf_head_t *
      * using the mop_id which was sent during the RTS, now returned to us.
      */
     sq = 0;
-    qlist_for_each(l, &ib_device->sendq) {
-	ib_send_t *sqt = (ib_send_t *) l;
+    qlist_for_each_entry(sqt, &ib_device->sendq, list) {
 	debug(8, "%s: looking for op_id 0x%llx, consider 0x%llx", __func__,
 	  llu(mh_cts.rts_mop_id), llu(sqt->mop->op_id));
-	if (sqt->mop->op_id == (bmi_op_id_t) mh_cts.rts_mop_id) {
+	if (sqt->c == bh->c
+	    && sqt->mop->op_id == (bmi_op_id_t) mh_cts.rts_mop_id) {
 	    sq = sqt;
 	    break;
 	}
@@ -358,65 +401,63 @@ encourage_send_incoming_cts(buf_head_t *
 	error("%s: mop_id %llx in CTS message not found", __func__,
 	  llu(mh_cts.rts_mop_id));
 
-    debug(2, "%s: sq %p %s my bufnum %d his bufnum %d len %u", __func__,
-      sq, sq_state_name(sq->state), bh->num, mh_cts.bufnum, byte_len);
-    assert(sq->state == SQ_WAITING_CTS,
-      "%s: wrong send state %s", __func__, sq_state_name(sq->state));
+    debug(2, "%s: sq %p %s mopid %llx len %u", __func__, sq,
+          sq_state_name(sq->state.send), llu(mh_cts.rts_mop_id), byte_len);
+    assert(sq->state.send == SQ_WAITING_CTS
+           || sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION,
+	   "%s: wrong send state %s", __func__, sq_state_name(sq->state.send));
 
     /* message; cts content; list of buffers, lengths, and keys */
     want = sizeof(mh_cts)
-      + mh_cts.buflist_num * MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE;
+         + mh_cts.buflist_num * MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE;
     if (bmi_ib_unlikely(byte_len != want))
 	error("%s: wrong message size for CTS, got %u, want %u", __func__,
-          byte_len, want);
-
-    /* the cts serves as an implicit ack of our rts, free that send buf */
-    qlist_add_tail(&sq->bh->list, &sq->c->eager_send_buf_free);
-
-    /* save the bufnum from his cts for later acking */
-    sq->his_bufnum = mh_cts.bufnum;
+              byte_len, want);
 
     /* start the big transfer */
     post_sr_rdmaw(sq, &mh_cts, (msg_header_cts_t *) bh->buf + 1);
 
-    /* re-post our recv buf now that we have all the information from CTS,
-     * but don't tell him this until the rdma is complete. */
+    /* re-post our recv buf now that we have all the information from CTS */
     post_rr(sq->c, bh);
 
-    sq->state = SQ_WAITING_DATA_LOCAL_SEND_COMPLETE;
-    debug(2, "%s: sq %p now %s", __func__, sq, sq_state_name(sq->state));
+    if (sq->state.send == SQ_WAITING_CTS)
+	sq->state.send = SQ_WAITING_DATA_SEND_COMPLETION;
+    else
+	sq->state.send = SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS;
+    debug(3, "%s: sq %p now %s", __func__, sq, sq_state_name(sq->state.send));
 }
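The branch at the end of encourage_send_incoming_cts() is needed because a CTS can be delivered before the local completion for the RTS send has been polled. Below is a minimal sketch of the resulting sender transitions. The enum is a hypothetical stand-in for the sq_state values in ib.h. on_rts_send_completion() is an assumption about how the completion handler, which is outside this excerpt, treats the two RTS states.

```c
#include <assert.h>

/* Hypothetical stand-ins for the sq_state values named in this diff. */
enum sq_state {
    SQ_WAITING_RTS_SEND_COMPLETION,
    SQ_WAITING_CTS,
    SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS,
    SQ_WAITING_DATA_SEND_COMPLETION,
};

/* CTS arrived: the same branch as the end of encourage_send_incoming_cts(). */
static enum sq_state on_cts(enum sq_state s)
{
    assert(s == SQ_WAITING_CTS || s == SQ_WAITING_RTS_SEND_COMPLETION);
    return s == SQ_WAITING_CTS ? SQ_WAITING_DATA_SEND_COMPLETION
                               : SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS;
}

/* Local completion of the RTS send.  ASSUMED behavior: the real
 * completion handler is not part of this excerpt. */
static enum sq_state on_rts_send_completion(enum sq_state s)
{
    assert(s == SQ_WAITING_RTS_SEND_COMPLETION
           || s == SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS);
    return s == SQ_WAITING_RTS_SEND_COMPLETION ? SQ_WAITING_CTS
                                               : SQ_WAITING_DATA_SEND_COMPLETION;
}
```

Whichever event comes first, the send ends up in SQ_WAITING_DATA_SEND_COMPLETION.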
 
 
 /*
  * See if anything was preposted that matches this.
  */
-static ib_recv_t *
+static struct ib_work *
 find_matching_recv(rq_state_t statemask, const ib_connection_t *c,
   bmi_msg_tag_t bmi_tag)
 {
-    list_t *l;
+    struct ib_work *rq;
 
-    qlist_for_each(l, &ib_device->recvq) {
-	ib_recv_t *rq = qlist_upcast(l);
-	if ((rq->state & statemask) && rq->c == c && rq->bmi_tag == bmi_tag)
+    qlist_for_each_entry(rq, &ib_device->recvq, list) {
+	if ((rq->state.recv & statemask) && rq->c == c
+	    && rq->bmi_tag == bmi_tag)
 	    return rq;
     }
-    return 0;
+    return NULL;
 }
 
 /*
  * Init a new recvq entry from something that arrived on the wire.
  */
-static ib_recv_t *
-alloc_new_recv(ib_connection_t *c, buf_head_t *bh)
+static struct ib_work *
+alloc_new_recv(ib_connection_t *c, struct buf_head *bh)
 {
-    ib_recv_t *rq = Malloc(sizeof(*rq));
+    struct ib_work *rq = Malloc(sizeof(*rq));
     rq->type = BMI_RECV;
     rq->c = c;
     ++rq->c->refcnt;
     rq->bh = bh;
     rq->mop = 0;  /* until user posts for it */
+    rq->rts_mop_id = 0;
     qlist_add_tail(&rq->list, &ib_device->recvq);
     return rq;
 }
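find_matching_recv() now uses qlist_for_each_entry() to walk struct ib_work items directly, instead of casting each list_t. The sketch below shows the underlying intrusive-list technique in standard C. The list type and macros are hypothetical reimplementations, not the qlist.h API. The three-argument qlist_for_each_entry() used in the diff presumably infers the element type, much as typeof does in Linux's list.h. This version takes the type explicitly so it stays in standard C.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical minimal intrusive list, in the style of qlist.h but not its API. */
struct list_head { struct list_head *next, *prev; };

#define LIST_INIT(h) { &(h), &(h) }
/* Recover the containing struct from a pointer to its embedded link. */
#define list_entry(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))
/* Element type is passed in explicitly instead of using typeof. */
#define list_for_each_entry(pos, head, type, member)          \
    for (pos = list_entry((head)->next, type, member);         \
         &pos->member != (head);                               \
         pos = list_entry(pos->member.next, type, member))

static void list_add_tail(struct list_head *n, struct list_head *h)
{
    n->prev = h->prev;
    n->next = h;
    h->prev->next = n;
    h->prev = n;
}

/* Stand-in for struct ib_work; the link need not be the first member. */
struct work {
    int state;              /* bitmask state, as rq->state.recv */
    int conn;               /* stand-in for rq->c */
    int tag;                /* stand-in for rq->bmi_tag */
    struct list_head list;
};

/* Same test as find_matching_recv(): state bit set, same connection, same tag. */
static struct work *find_match(struct list_head *q, int statemask,
                               int conn, int tag)
{
    struct work *w;
    list_for_each_entry(w, q, struct work, list)
        if ((w->state & statemask) && w->conn == conn && w->tag == tag)
            return w;
    return NULL;
}
```

The connection check matters for the same reason it was added to the CTS and RTS_DONE lookups in this commit: tags and op ids are chosen by each peer independently, so two connections can reuse the same values.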
@@ -428,26 +469,22 @@ alloc_new_recv(ib_connection_t *c, buf_h
  * Unexpected receive, either no post or explicit sendunexpected.
  */
 static void
-encourage_recv_incoming(ib_connection_t *c, buf_head_t *bh, u_int32_t byte_len)
+encourage_recv_incoming(struct buf_head *bh, msg_type_t type, u_int32_t byte_len)
 {
-    ib_recv_t *rq;
-    msg_header_common_t mh_common;
+    ib_connection_t *c = bh->c;
+    struct ib_work *rq;
     char *ptr = bh->buf;
 
-    decode_msg_header_common_t(&ptr, &mh_common);
+    debug(4, "%s: incoming msg type %s", __func__, msg_type_name(type));
 
-    debug(4, "%s: incoming msg type %s", __func__,
-          msg_type_name(mh_common.type));
-
-    if (mh_common.type == MSG_EAGER_SEND) {
+    if (type == MSG_EAGER_SEND) {
 
 	msg_header_eager_t mh_eager;
 
 	ptr = bh->buf;
 	decode_msg_header_eager_t(&ptr, &mh_eager);
 
-	debug(2, "%s: recv eager my bufnum %d his bufnum %d len %u", __func__,
-	  bh->num, mh_eager.bufnum, byte_len);
+	debug(2, "%s: recv eager len %u", __func__, byte_len);
 
 	rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh_eager.bmi_tag);
 	if (rq) {
@@ -462,11 +499,10 @@ encourage_recv_incoming(ib_connection_t 
 
 	    /* re-post */
 	    post_rr(c, bh);
-	    /* done with buffer, ack to remote */
-	    post_sr_ack(c, mh_eager.bufnum);
-	    rq->state = RQ_EAGER_WAITING_USER_TEST;
+
+	    rq->state.recv = RQ_EAGER_WAITING_USER_TEST;
 	    debug(2, "%s: matched rq %p now %s", __func__, rq,
-	      rq_state_name(rq->state));
+	      rq_state_name(rq->state.recv));
 #if MEMCACHE_EARLY_REG
 	    /* if a big receive was posted but only a small message came
 	     * through, unregister it now */
@@ -481,33 +517,32 @@ encourage_recv_incoming(ib_connection_t 
 	    rq = alloc_new_recv(c, bh);
 	    /* return value for when user does post_recv for this one */
 	    rq->bmi_tag = mh_eager.bmi_tag;
-	    rq->state = RQ_EAGER_WAITING_USER_POST;
+	    rq->state.recv = RQ_EAGER_WAITING_USER_POST;
 	    /* do not repost or ack, keeping bh until user test */
 	    debug(2, "%s: new rq %p now %s", __func__, rq,
-	      rq_state_name(rq->state));
+	      rq_state_name(rq->state.recv));
 	}
 	rq->actual_len = byte_len - sizeof(mh_eager);
 
-    } else if (mh_common.type == MSG_EAGER_SENDUNEXPECTED) {
+    } else if (type == MSG_EAGER_SENDUNEXPECTED) {
 
 	msg_header_eager_t mh_eager;
 
 	ptr = bh->buf;
 	decode_msg_header_eager_t(&ptr, &mh_eager);
 
-	debug(2, "%s: recv eager unexpected my bufnum %d his bufnum %d len %u",
-	  __func__, bh->num, mh_eager.bufnum, byte_len);
+	debug(2, "%s: recv eager unexpected len %u", __func__, byte_len);
 
 	rq = alloc_new_recv(c, bh);
 	/* return values for when user does testunexpected for this one */
 	rq->bmi_tag = mh_eager.bmi_tag;
-	rq->state = RQ_EAGER_WAITING_USER_TESTUNEXPECTED;
+	rq->state.recv = RQ_EAGER_WAITING_USER_TESTUNEXPECTED;
 	rq->actual_len = byte_len - sizeof(mh_eager);
-	/* do not repost or ack, keeping bh until user test */
+	/* do not repost, keeping bh until user test */
 	debug(2, "%s: new rq %p now %s", __func__, rq,
-	  rq_state_name(rq->state));
+	  rq_state_name(rq->state.recv));
 
-    } else if (mh_common.type == MSG_RTS) {
+    } else if (type == MSG_RTS) {
 	/*
 	 * Sender wants to send a big message, initiates rts/cts protocol.
 	 * Has the user posted a matching receive for it yet?
@@ -517,8 +552,8 @@ encourage_recv_incoming(ib_connection_t 
 	ptr = bh->buf;
 	decode_msg_header_rts_t(&ptr, &mh_rts);
 
-	debug(2, "%s: recv RTS my bufnum %d len %u",
-	  __func__, bh->num, byte_len);
+	debug(2, "%s: recv RTS len %lld mopid %llx", __func__,
+	      lld(mh_rts.tot_len), llu(mh_rts.mop_id));
 
 	rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh_rts.bmi_tag);
 	if (rq) {
@@ -526,68 +561,137 @@ encourage_recv_incoming(ib_connection_t 
 		error("%s: RTS received %llu too small for buffer %llu",
 		  __func__, llu(mh_rts.tot_len), llu(rq->buflist.tot_len));
 	    }
-	    rq->state = RQ_RTS_WAITING_CTS_BUFFER;
+	    rq->state.recv = RQ_RTS_WAITING_CTS_BUFFER;
 	    debug(2, "%s: matched rq %p MSG_RTS now %s", __func__, rq,
-	      rq_state_name(rq->state));
+	      rq_state_name(rq->state.recv));
 	} else {
 	    rq = alloc_new_recv(c, bh);
 	    /* return value for when user does post_recv for this one */
 	    rq->bmi_tag = mh_rts.bmi_tag;
-	    rq->state = RQ_RTS_WAITING_USER_POST;
+	    rq->state.recv = RQ_RTS_WAITING_USER_POST;
 	    debug(2, "%s: new rq %p MSG_RTS now %s", __func__, rq,
-	      rq_state_name(rq->state));
+	      rq_state_name(rq->state.recv));
 	}
 	rq->actual_len = mh_rts.tot_len;
 	rq->rts_mop_id = mh_rts.mop_id;
 
-	/* Do not ack his rts, later cts implicitly acks it.
-	 * Done with our buffer though we won't tell him yet. */
 	post_rr(c, bh);
 
-	if (rq->state == RQ_RTS_WAITING_CTS_BUFFER) {
+	if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER) {
 	    int ret;
 	    ret = send_cts(rq);
 	    if (ret == 0)
-		rq->state = RQ_RTS_WAITING_DATA;
+		rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
 	    /* else keep waiting until we can send that cts */
 	}
 
-    } else if (mh_common.type == MSG_BYE) {
+    } else if (type == MSG_RTS_DONE) {
+
+	msg_header_rts_done_t mh_rts_done;
+	struct ib_work *rqt;
+
+	ptr = bh->buf;
+	decode_msg_header_rts_done_t(&ptr, &mh_rts_done);
+
+	debug(2, "%s: recv RTS_DONE mop_id %llx", __func__,
+	      llu(mh_rts_done.mop_id));
+
+	rq = NULL;
+	qlist_for_each_entry(rqt, &ib_device->recvq, list) {
+	    if (rqt->c == c && rqt->rts_mop_id == mh_rts_done.mop_id) {
+		rq = rqt;
+		break;
+	    }
+	}
+
+	assert(rq, "%s: mop_id %llx in RTS_DONE message not found",
+	       __func__, llu(mh_rts_done.mop_id));
+	assert(rq->state.recv == RQ_RTS_WAITING_RTS_DONE,
+	       "%s: RTS_DONE to rq wrong state %s",
+	       __func__, rq_state_name(rq->state.recv));
+
+#if MEMCACHE_BOUNCEBUF
+	memcpy_to_buflist(&rq->buflist, reg_recv_buflist_buf,
+	                  rq->buflist.tot_len);
+#else
+	memcache_deregister(ib_device->memcache, &rq->buflist);
+#endif
+
+	post_rr(c, bh);
+
+	rq->state.recv = RQ_RTS_WAITING_USER_TEST;
+
+    } else if (type == MSG_BYE) {
 	/*
 	 * Other side requests connection close.  Do it.
 	 */
-	debug(2, "%s: recv BYE my bufnum %d len %u",
-	  __func__, bh->num, byte_len);
-
+	debug(2, "%s: recv BYE", __func__);
+	post_rr(c, bh);
 	ib_close_connection(c);
 
+    } else if (type == MSG_CREDIT) {
+
+	/* already added the credit in check_cq */
+	debug(2, "%s: recv CREDIT", __func__);
+	post_rr(c, bh);
+
     } else {
-	error("%s: unknown message header type %d my bufnum %d len %u",
-	      __func__, mh_common.type, bh->num, byte_len);
+	error("%s: unknown message header type %d len %u", __func__,
+	      type, byte_len);
     }
 }
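For a large message whose receive was posted before the RTS arrived, the receiver now passes through the states below. It finishes only when MSG_RTS_DONE arrives, rather than when an ack of the CTS comes back. This is a sketch with hypothetical enum values and event names. It encodes only the transitions visible in this diff, including the ordering enforced by the assert in the MSG_RTS_DONE branch.

```c
#include <assert.h>

/* Hypothetical stand-ins for the rq_state values used in this diff. */
enum rq_state {
    RQ_WAITING_INCOMING,
    RQ_RTS_WAITING_CTS_BUFFER,
    RQ_RTS_WAITING_CTS_SEND_COMPLETION,
    RQ_RTS_WAITING_RTS_DONE,
    RQ_RTS_WAITING_USER_TEST,
    RQ_BAD_TRANSITION          /* sketch-only: the real code asserts instead */
};

/* Hypothetical event names. */
enum rq_event {
    EV_RTS,            /* MSG_RTS matched a posted receive */
    EV_CTS_POSTED,     /* send_cts() returned 0 */
    EV_CTS_NO_CREDIT,  /* send_cts() returned 1; retried later from test_rq() */
    EV_CTS_SENT,       /* local send completion for the CTS */
    EV_RTS_DONE        /* MSG_RTS_DONE arrived: RDMA data is in place */
};

static enum rq_state rq_step(enum rq_state s, enum rq_event e)
{
    switch (s) {
    case RQ_WAITING_INCOMING:
        return e == EV_RTS ? RQ_RTS_WAITING_CTS_BUFFER : RQ_BAD_TRANSITION;
    case RQ_RTS_WAITING_CTS_BUFFER:
        if (e == EV_CTS_POSTED)
            return RQ_RTS_WAITING_CTS_SEND_COMPLETION;
        if (e == EV_CTS_NO_CREDIT)
            return s;          /* keep waiting for a send credit */
        return RQ_BAD_TRANSITION;
    case RQ_RTS_WAITING_CTS_SEND_COMPLETION:
        return e == EV_CTS_SENT ? RQ_RTS_WAITING_RTS_DONE : RQ_BAD_TRANSITION;
    case RQ_RTS_WAITING_RTS_DONE:
        return e == EV_RTS_DONE ? RQ_RTS_WAITING_USER_TEST : RQ_BAD_TRANSITION;
    default:
        return RQ_BAD_TRANSITION;
    }
}
```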
 
 /*
- * Data has arrived, we know because we got the ack to the CTS
- * we sent out.  Serves to release remote cts buffer too.
+ * We finished the RDMA write.  Send him a done message.
  */
-static void
-encourage_recv_incoming_cts_ack(ib_recv_t *rq)
+static void encourage_rts_done_waiting_buffer(struct ib_work *sq)
 {
-    debug(2, "%s: rq %p %s", __func__, rq, rq_state_name(rq->state));
-    assert(rq->state == RQ_RTS_WAITING_DATA, "%s: CTS ack to rq wrong state %s",
-      __func__, rq_state_name(rq->state));
+    ib_connection_t *c = sq->c;
+    struct buf_head *bh;
+    char *ptr;
+    msg_header_rts_done_t mh_rts_done;
 
-    /* XXX: should be head for cache, but use tail for debugging */
-    qlist_add_tail(&rq->bh->list, &rq->c->eager_send_buf_free);
-#if MEMCACHE_BOUNCEBUF
-    memcpy_to_buflist(&rq->buflist, reg_recv_buflist_buf, rq->buflist.tot_len);
-#else
-    memcache_deregister(ib_device->memcache, &rq->buflist);
-#endif
-    rq->state = RQ_RTS_WAITING_USER_TEST;
+    bh = get_eager_buf(c);
+    if (!bh) {
+	debug(2, "%s: sq %p no free send buffers to %s",
+	      __func__, sq, c->peername);
+	return;
+    }
+    sq->bh = bh;
+    bh->sq = sq;
+    ptr = bh->buf;
+
+    msg_header_init(&mh_rts_done.c, c, MSG_RTS_DONE);
+    mh_rts_done.mop_id = sq->mop->op_id;
+
+    debug(2, "%s: sq %p sent RTS_DONE mopid %llx", __func__,
+          sq, llu(sq->mop->op_id));
 
-    debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state));
+    encode_msg_header_rts_done_t(&ptr, &mh_rts_done);
+
+    post_sr(bh, sizeof(mh_rts_done));
+    sq->state.send = SQ_WAITING_RTS_DONE_SEND_COMPLETION;
+}
+
+static void send_bye(ib_connection_t *c)
+{
+    msg_header_common_t mh_common;
+    struct buf_head *bh;
+    char *ptr;
+
+    debug(2, "%s: sending bye", __func__);
+    bh = get_eager_buf(c);
+    if (!bh) {
+	debug(2, "%s: no free send buffers to %s", __func__, c->peername);
+	/* if no messages available, let garbage collection on server deal */
+	return;
+    }
+    bh->sq = NULL;
+    ptr = bh->buf;
+    msg_header_init(&mh_common, c, MSG_BYE);
+    encode_msg_header_common_t(&ptr, &mh_common);
+
+    post_sr(bh, sizeof(mh_common));
 }
 
 /*
@@ -596,9 +700,10 @@ encourage_recv_incoming_cts_ack(ib_recv_
  * to unpin when done.
  */
 static int
-send_cts(ib_recv_t *rq)
+send_cts(struct ib_work *rq)
 {
-    buf_head_t *bh;
+    ib_connection_t *c = rq->c;
+    struct buf_head *bh;
     msg_header_cts_t mh_cts;
     u_int64_t *bufp;
     u_int32_t *lenp;
@@ -607,17 +712,18 @@ send_cts(ib_recv_t *rq)
     char *ptr;
     int i;
 
-    debug(2, "%s: rq %p from %s opid 0x%llx len %lld",
-      __func__, rq, rq->c->peername, llu(rq->rts_mop_id),
-      lld(rq->buflist.tot_len));
+    debug(2, "%s: rq %p from %s mopid %llx len %lld", __func__,
+          rq, rq->c->peername, llu(rq->rts_mop_id),
+          lld(rq->buflist.tot_len));
 
-    bh = qlist_try_del_head(&rq->c->eager_send_buf_free);
+    bh = get_eager_buf(c);
     if (!bh) {
-	debug(2, "%s: no bh available", __func__);
+	debug(2, "%s: rq %p no free send buffers to %s",
+	      __func__, rq, c->peername);
 	return 1;
     }
     rq->bh = bh;
-    bh->sq = (ib_send_t *) rq;  /* uplink for completion */
+    bh->sq = (struct ib_work *) rq;  /* uplink for completion */
 
 #if MEMCACHE_BOUNCEBUF
     if (reg_recv_buflist.num == 0) {
@@ -640,8 +746,7 @@ send_cts(ib_recv_t *rq)
 #  endif
 #endif
 
-    mh_cts.type = MSG_CTS;
-    mh_cts.bufnum = bh->num;
+    msg_header_init(&mh_cts.c, c, MSG_CTS);
     mh_cts.rts_mop_id = rq->rts_mop_id;
     mh_cts.buflist_tot_len = rq->buflist.tot_len;
     mh_cts.buflist_num = rq->buflist.num;
@@ -655,7 +760,7 @@ send_cts(ib_recv_t *rq)
     keyp = (u_int32_t *)(lenp + rq->buflist.num);
     post_len = (char *)(keyp + rq->buflist.num) - (char *)bh->buf;
     if (post_len > ib_device->eager_buf_size)
-	error("%s: too many (%d) recv buflist entries for buf",  __func__,
+	error("%s: too many (%d) recv buflist entries for buf", __func__,
 	  rq->buflist.num);
     for (i=0; i<rq->buflist.num; i++) {
 	bufp[i] = htobmi64(int64_from_ptr(rq->buflist.buf.recv[i]));
@@ -663,8 +768,6 @@ send_cts(ib_recv_t *rq)
 	keyp[i] = htobmi32(rq->buflist.memcache[i]->memkeys.rkey);
     }
 
-    /* expect an ack for this cts; will come after he does the big RDMA write */
-    post_rr_ack(rq->c, bh);
     /* send the cts */
     post_sr(bh, post_len);
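send_cts() appends three arrays after the CTS header: 64-bit addresses, 32-bit lengths and 32-bit rkeys. The whole message must fit in one eager buffer. That gives 16 bytes per buflist entry, which is presumably the value of MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE checked in encourage_send_incoming_cts(); the macro's definition is not shown here. The sketch below computes the message length and the entry limit, and lays out the arrays. The header size is a parameter because sizeof(msg_header_cts_t) is not visible in this excerpt. The helper names are hypothetical, and byte-order conversion (htobmi64/htobmi32) is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* One 64-bit address, one 32-bit length and one 32-bit rkey per entry. */
#define CTS_ENTRY_SIZE (sizeof(uint64_t) + 2 * sizeof(uint32_t))

/* Bytes needed for a CTS with n entries after a header of hdr bytes. */
static size_t cts_len(size_t hdr, size_t n)
{
    return hdr + n * CTS_ENTRY_SIZE;
}

/* Largest n that still fits in one eager buffer (0 if the header does not). */
static size_t cts_max_entries(size_t hdr, size_t eager_buf_size)
{
    return eager_buf_size < hdr ? 0 : (eager_buf_size - hdr) / CTS_ENTRY_SIZE;
}

/* Lay out all addresses, then all lengths, then all keys after the header,
 * like send_cts() does with bufp/lenp/keyp.  memcpy avoids alignment
 * assumptions.  Returns the length to post. */
static size_t cts_fill(unsigned char *buf, size_t hdr, size_t n,
                       const uint64_t *addr, const uint32_t *len,
                       const uint32_t *key)
{
    unsigned char *p = buf + hdr;

    memcpy(p, addr, n * sizeof *addr);
    p += n * sizeof *addr;
    memcpy(p, len, n * sizeof *len);
    p += n * sizeof *len;
    memcpy(p, key, n * sizeof *key);
    p += n * sizeof *key;
    return (size_t)(p - buf);
}
```

For example, with a hypothetical 40-byte header and an 8192-byte eager buffer, at most 509 entries fit. A receive posted with more buffers than that would hit the "too many recv buflist entries" error.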
 
@@ -695,15 +798,15 @@ ensure_connected(struct method_addr *rem
 }
 
 /*
- * Used by both send and sendunexpected.
+ * Generic interface for both send and sendunexpected, list and non-list send.
  */
 static int
-generic_post_send(bmi_op_id_t *id, struct method_addr *remote_map,
-  int numbufs, const void *const *buffers, const bmi_size_t *sizes,
-  bmi_size_t total_size, bmi_msg_tag_t tag, void *user_ptr,
-  bmi_context_id context_id, int is_unexpected)
+post_send(bmi_op_id_t *id, struct method_addr *remote_map,
+          int numbufs, const void *const *buffers, const bmi_size_t *sizes,
+          bmi_size_t total_size, bmi_msg_tag_t tag, void *user_ptr,
+          bmi_context_id context_id, int is_unexpected)
 {
-    ib_send_t *sq;
+    struct ib_work *sq;
     struct method_op *mop;
     ib_method_addr_t *ibmap;
     int i;
@@ -718,7 +821,10 @@ generic_post_send(bmi_op_id_t *id, struc
     /* alloc and build new sendq structure */
     sq = Malloc(sizeof(*sq));
     sq->type = BMI_SEND;
-    sq->state = SQ_WAITING_BUFFER;
+    sq->state.send = SQ_WAITING_BUFFER;
+
+    debug(2, "%s: sq %p len %lld peer %s", __func__, sq, (long long) total_size,
+          ibmap->c->peername);
 
     /*
      * For a single buffer, store it inside the sq directly, else save
@@ -727,10 +833,10 @@ generic_post_send(bmi_op_id_t *id, struc
      * a zero in numbufs.
      */
     if (numbufs == 0) {
-	sq->buflist_one_buf = *buffers;
+	sq->buflist_one_buf.send = *buffers;
 	sq->buflist_one_len = *sizes;
 	sq->buflist.num = 1;
-	sq->buflist.buf.send = &sq->buflist_one_buf;
+	sq->buflist.buf.send = &sq->buflist_one_buf.send;
 	sq->buflist.len = &sq->buflist_one_len;
     } else {
 	sq->buflist.num = numbufs;
@@ -783,14 +889,12 @@ generic_post_send(bmi_op_id_t *id, struc
 
 static int
 BMI_ib_post_send(bmi_op_id_t *id, struct method_addr *remote_map,
-  const void *buffer, bmi_size_t size,
-  enum bmi_buffer_type buffer_flag __unused,
-  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
+                 const void *buffer, bmi_size_t total_size,
+                 enum bmi_buffer_type buffer_flag __unused,
+                 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
 {
-    debug(3, "%s: len %d tag %d", __func__, (int) size, tag);
-    /* references here will not be saved after this func returns */
-    return generic_post_send(id, remote_map, 0, &buffer, &size, size,
-      tag, user_ptr, context_id, 0);
+    return post_send(id, remote_map, 0, &buffer, &total_size,
+                     total_size, tag, user_ptr, context_id, 0);
 }
 
 static int
@@ -799,49 +903,44 @@ BMI_ib_post_send_list(bmi_op_id_t *id, s
   bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
   bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
 {
-    debug(2, "%s: listlen %d tag %d", __func__, list_count, tag);
-    if (list_count < 1)
-	error("%s: list count must be positive", __func__);
-    return generic_post_send(id, remote_map, list_count, buffers, sizes,
-      total_size, tag, user_ptr, context_id, 0);
+    return post_send(id, remote_map, list_count, buffers, sizes,
+                     total_size, tag, user_ptr, context_id, 0);
 }
 
 static int
 BMI_ib_post_sendunexpected(bmi_op_id_t *id, struct method_addr *remote_map,
-  const void *buffer, bmi_size_t size,
-  enum bmi_buffer_type buffer_flag __unused,
-  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
+                           const void *buffer, bmi_size_t total_size,
+                           enum bmi_buffer_type buffer_flag __unused,
+                           bmi_msg_tag_t tag, void *user_ptr,
+			   bmi_context_id context_id)
 {
-    debug(2, "%s: len %d tag %d", __func__, (int) size, tag);
-    /* references here will not be saved after this func returns */
-    return generic_post_send(id, remote_map, 0, &buffer, &size, size, tag,
-      user_ptr, context_id, 1);
+    return post_send(id, remote_map, 0, &buffer, &total_size,
+                     total_size, tag, user_ptr, context_id, 1);
 }
 
 static int
 BMI_ib_post_sendunexpected_list(bmi_op_id_t *id, struct method_addr *remote_map,
-  const void *const *buffers, const bmi_size_t *sizes, int list_count,
-  bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
-  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
+                                const void *const *buffers,
+				const bmi_size_t *sizes, int list_count,
+                                bmi_size_t total_size,
+				enum bmi_buffer_type buffer_flag __unused,
+                                bmi_msg_tag_t tag, void *user_ptr,
+				bmi_context_id context_id)
 {
-    debug(2, "%s: listlen %d tag %d", __func__, list_count, tag);
-    if (list_count < 1)
-	error("%s: list count must be positive", __func__);
-    /* references here will not be saved after this func returns */
-    return generic_post_send(id, remote_map, list_count, buffers, sizes,
-      total_size, tag, user_ptr, context_id, 1);
+    return post_send(id, remote_map, list_count, buffers, sizes,
+                     total_size, tag, user_ptr, context_id, 1);
 }
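The single-buffer wrappers pass &buffer and &total_size, and both point into the wrapper's own stack frame. For that reason post_send() copies a lone buffer into buflist_one_buf/buflist_one_len instead of keeping the caller's pointers; the comment this commit removes ("references here will not be saved after this func returns") made the same point. Below is a trimmed-down sketch of that normalization. The struct layouts and the setup_buflist()/post_one() helpers are hypothetical. Summing the lengths into tot_len is this sketch's choice; the corresponding lines are outside this excerpt.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, trimmed-down versions of the buflist fields in struct ib_work. */
struct buflist {
    int num;
    const void *const *buf;
    const long long *len;
    long long tot_len;
};

struct work {
    struct buflist buflist;
    const void *buflist_one_buf;   /* private copies for the numbufs == 0 case */
    long long buflist_one_len;
};

/* numbufs == 0 means "one buffer": buffers/sizes then point at the caller's
 * stack (like &buffer in BMI_ib_post_send), so copy them into the work item. */
static void setup_buflist(struct work *w, int numbufs,
                          const void *const *buffers, const long long *sizes)
{
    if (numbufs == 0) {
        w->buflist_one_buf = *buffers;
        w->buflist_one_len = *sizes;
        w->buflist.num = 1;
        w->buflist.buf = &w->buflist_one_buf;
        w->buflist.len = &w->buflist_one_len;
    } else {
        w->buflist.num = numbufs;      /* caller's arrays must outlive the op */
        w->buflist.buf = buffers;
        w->buflist.len = sizes;
    }
    w->buflist.tot_len = 0;
    for (int i = 0; i < w->buflist.num; i++)
        w->buflist.tot_len += w->buflist.len[i];
}

/* Wrapper in the style of BMI_ib_post_send: its arguments die on return. */
static void post_one(struct work *w, const void *buffer, long long size)
{
    setup_buflist(w, 0, &buffer, &size);
}
```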
 
 /*
  * Used by both recv and recv_list.
  */
 static int
-generic_post_recv(bmi_op_id_t *id, struct method_addr *remote_map,
-  int numbufs, void *const *buffers, const bmi_size_t *sizes,
-  bmi_size_t tot_expected_len, bmi_msg_tag_t tag,
-  void *user_ptr, bmi_context_id context_id)
+post_recv(bmi_op_id_t *id, struct method_addr *remote_map,
+          int numbufs, void *const *buffers, const bmi_size_t *sizes,
+          bmi_size_t tot_expected_len, bmi_msg_tag_t tag,
+          void *user_ptr, bmi_context_id context_id)
 {
-    ib_recv_t *rq;
+    struct ib_work *rq;
     struct method_op *mop;
     ib_method_addr_t *ibmap;
     ib_connection_t *c;
@@ -863,20 +962,20 @@ generic_post_recv(bmi_op_id_t *id, struc
       RQ_EAGER_WAITING_USER_POST | RQ_RTS_WAITING_USER_POST, c, tag);
     if (rq) {
 	debug(2, "%s: rq %p matches %s", __func__, rq,
-	  rq_state_name(rq->state));
+	  rq_state_name(rq->state.recv));
     } else {
 	/* alloc and build new recvq structure */
 	rq = alloc_new_recv(c, NULL);
-	rq->state = RQ_WAITING_INCOMING;
+	rq->state.recv = RQ_WAITING_INCOMING;
 	rq->bmi_tag = tag;
 	debug(2, "%s: new rq %p", __func__, rq);
     }
 
     if (numbufs == 0) {
-	rq->buflist_one_buf = *buffers;
+	rq->buflist_one_buf.recv = *buffers;
 	rq->buflist_one_len = *sizes;
 	rq->buflist.num = 1;
-	rq->buflist.buf.recv = &rq->buflist_one_buf;
+	rq->buflist.buf.recv = &rq->buflist_one_buf.recv;
 	rq->buflist.len = &rq->buflist_one_len;
     } else {
 	rq->buflist.num = numbufs;
@@ -907,7 +1006,7 @@ generic_post_recv(bmi_op_id_t *id, struc
     rq->mop = mop;
 
     /* handle the two "waiting for a local user post" states */
-    if (rq->state == RQ_EAGER_WAITING_USER_POST) {
+    if (rq->state.recv == RQ_EAGER_WAITING_USER_POST) {
 
 	msg_header_eager_t mh_eager;
 	char *ptr = rq->bh->buf;
@@ -915,7 +1014,7 @@ generic_post_recv(bmi_op_id_t *id, struc
 	decode_msg_header_eager_t(&ptr, &mh_eager);
 
 	debug(2, "%s: rq %p state %s finish eager directly", __func__,
-	  rq, rq_state_name(rq->state));
+	  rq, rq_state_name(rq->state.recv));
 	if (rq->actual_len > tot_expected_len) {
 	    error("%s: received %lld matches too-small buffer %lld",
 	      __func__, lld(rq->actual_len), lld(rq->buflist.tot_len));
@@ -927,25 +1026,23 @@ generic_post_recv(bmi_op_id_t *id, struc
 
 	/* re-post */
 	post_rr(rq->c, rq->bh);
-	/* done with buffer, ack to remote */
-	post_sr_ack(rq->c, mh_eager.bufnum);
 
 	/* now just wait for user to test, never do "immediate completion" */
-	rq->state = RQ_EAGER_WAITING_USER_TEST;
+	rq->state.recv = RQ_EAGER_WAITING_USER_TEST;
 	goto out;
 
-    } else if (rq->state == RQ_RTS_WAITING_USER_POST) {
+    } else if (rq->state.recv == RQ_RTS_WAITING_USER_POST) {
 	int sret;
 	debug(2, "%s: rq %p %s send cts", __func__, rq,
-	  rq_state_name(rq->state));
+	  rq_state_name(rq->state.recv));
 	/* try to send, or wait for send buffer space */
-	rq->state = RQ_RTS_WAITING_CTS_BUFFER;
+	rq->state.recv = RQ_RTS_WAITING_CTS_BUFFER;
 #if MEMCACHE_EARLY_REG
 	memcache_register(ib_device->memcache, &rq->buflist);
 #endif
 	sret = send_cts(rq);
 	if (sret == 0)
-	    rq->state = RQ_RTS_WAITING_DATA;
+	    rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
 	goto out;
     }
 
@@ -967,9 +1064,8 @@ BMI_ib_post_recv(bmi_op_id_t *id, struct
   enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
   bmi_context_id context_id)
 {
-    debug(2, "%s: expected len %d tag %d", __func__, (int) expected_len, tag);
-    return generic_post_recv(id, remote_map, 0, &buffer, &expected_len,
-      expected_len, tag, user_ptr, context_id);
+    return post_recv(id, remote_map, 0, &buffer, &expected_len,
+                     expected_len, tag, user_ptr, context_id);
 }
 
 static int
@@ -979,12 +1075,8 @@ BMI_ib_post_recv_list(bmi_op_id_t *id, s
   enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
   bmi_context_id context_id)
 {
-    debug(2, "%s: tot expected len %d tag %d", __func__,
-      (int) tot_expected_len, tag);
-    if (list_count < 1)
-	error("%s: list count must be positive", __func__);
-    return generic_post_recv(id, remote_map, list_count, buffers, sizes,
-      tot_expected_len, tag, user_ptr, context_id);
+    return post_recv(id, remote_map, list_count, buffers, sizes,
+                     tot_expected_len, tag, user_ptr, context_id);
 }
 
 /*
@@ -992,7 +1084,7 @@ BMI_ib_post_recv_list(bmi_op_id_t *id, s
  * completed.
  */
 static int
-test_sq(ib_send_t *sq, bmi_op_id_t *outid, bmi_error_code_t *err,
+test_sq(struct ib_work *sq, bmi_op_id_t *outid, bmi_error_code_t *err,
   bmi_size_t *size, void **user_ptr, int complete)
 {
     ib_connection_t *c;
@@ -1000,7 +1092,7 @@ test_sq(ib_send_t *sq, bmi_op_id_t *outi
     debug(9, "%s: sq %p outid %p err %p size %p user_ptr %p complete %d",
       __func__, sq, outid, err, size, user_ptr, complete);
 
-    if (sq->state == SQ_WAITING_USER_TEST) {
+    if (sq->state.send == SQ_WAITING_USER_TEST) {
 	if (complete) {
 	    debug(2, "%s: sq %p completed %lld to %s", __func__,
 	      sq, lld(sq->buflist.tot_len), sq->c->peername);
@@ -1015,17 +1107,21 @@ test_sq(ib_send_t *sq, bmi_op_id_t *outi
 	    free(sq->mop);
 	    free(sq);
 	    --c->refcnt;
-	    if (c->closed)
+	    if (c->closed || c->cancelled)
 		ib_close_connection(c);
 	    return 1;
 	}
     /* this state needs help, push it (ideally would be triggered
      * when the resource is freed... XXX */
-    } else if (sq->state == SQ_WAITING_BUFFER) {
+    } else if (sq->state.send == SQ_WAITING_BUFFER) {
 	debug(2, "%s: sq %p %s, encouraging", __func__, sq,
-	  sq_state_name(sq->state));
+	  sq_state_name(sq->state.send));
 	encourage_send_waiting_buffer(sq);
-    } else if (sq->state == SQ_CANCELLED && complete) {
+    } else if (sq->state.send == SQ_WAITING_RTS_DONE_BUFFER) {
+	debug(2, "%s: sq %p %s, encouraging", __func__, sq,
+	  sq_state_name(sq->state.send));
+	encourage_rts_done_waiting_buffer(sq);
+    } else if (sq->state.send == SQ_CANCELLED && complete) {
 	debug(2, "%s: sq %p cancelled", __func__, sq);
 	*outid = sq->mop->op_id;
 	*err = -PVFS_ETIMEDOUT;
@@ -1037,13 +1133,12 @@ test_sq(ib_send_t *sq, bmi_op_id_t *outi
 	free(sq->mop);
 	free(sq);
 	--c->refcnt;
-	maybe_free_connection(c);
-	if (c->closed)
+	if (c->closed || c->cancelled)
 	    ib_close_connection(c);
 	return 1;
     } else {
 	debug(9, "%s: sq %p found, not done, state %s", __func__,
-	  sq, sq_state_name(sq->state));
+	  sq, sq_state_name(sq->state.send));
     }
     return 0;
 }
@@ -1054,7 +1149,7 @@ test_sq(ib_send_t *sq, bmi_op_id_t *outi
  * messages.
  */
 static int
-test_rq(ib_recv_t *rq, bmi_op_id_t *outid, bmi_error_code_t *err,
+test_rq(struct ib_work *rq, bmi_op_id_t *outid, bmi_error_code_t *err,
   bmi_size_t *size, void **user_ptr, int complete)
 {
     ib_connection_t *c;
@@ -1062,8 +1157,8 @@ test_rq(ib_recv_t *rq, bmi_op_id_t *outi
     debug(9, "%s: rq %p outid %p err %p size %p user_ptr %p complete %d",
       __func__, rq, outid, err, size, user_ptr, complete);
 
-    if (rq->state == RQ_EAGER_WAITING_USER_TEST 
-      || rq->state == RQ_RTS_WAITING_USER_TEST) {
+    if (rq->state.recv == RQ_EAGER_WAITING_USER_TEST 
+      || rq->state.recv == RQ_RTS_WAITING_USER_TEST) {
 	if (complete) {
 	    debug(2, "%s: rq %p completed %lld from %s", __func__,
 	      rq, lld(rq->actual_len), rq->c->peername);
@@ -1080,22 +1175,22 @@ test_rq(ib_recv_t *rq, bmi_op_id_t *outi
 	    c = rq->c;
 	    free(rq);
 	    --c->refcnt;
-	    if (c->closed)
+	    if (c->closed || c->cancelled)
 		ib_close_connection(c);
 	    return 1;
 	}
     /* this state needs help, push it (ideally would be triggered
-     * when the resource is freed... XXX */
-    } else if (rq->state == RQ_RTS_WAITING_CTS_BUFFER) {
+     * when the resource is freed...) XXX */
+    } else if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER) {
 	int ret;
 	debug(2, "%s: rq %p %s, encouraging", __func__, rq,
-	  rq_state_name(rq->state));
+	  rq_state_name(rq->state.recv));
 	ret = send_cts(rq);
 	if (ret == 0)
-	    rq->state = RQ_RTS_WAITING_DATA;
+	    rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
 	/* else keep waiting until we can send that cts */
-	debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state));
-    } else if (rq->state == RQ_CANCELLED && complete) {
+	debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state.recv));
+    } else if (rq->state.recv == RQ_CANCELLED && complete) {
 	debug(2, "%s: rq %p cancelled", __func__, rq);
 	*err = -PVFS_ETIMEDOUT;
 	if (rq->mop) {
@@ -1108,14 +1203,13 @@ test_rq(ib_recv_t *rq, bmi_op_id_t *outi
 	qlist_del(&rq->list);
 	c = rq->c;
 	free(rq);
-	maybe_free_connection(c);
 	--c->refcnt;
-	if (c->closed)
+	if (c->closed || c->cancelled)
 	    ib_close_connection(c);
 	return 1;
     } else {
 	debug(9, "%s: rq %p found, not done, state %s", __func__,
-	  rq, rq_state_name(rq->state));
+	  rq, rq_state_name(rq->state.recv));
     }
     return 0;
 }
@@ -1130,7 +1224,7 @@ BMI_ib_test(bmi_op_id_t id, int *outcoun
   bmi_context_id context_id __unused)
 {
     struct method_op *mop;
-    ib_send_t *sq;
+    struct ib_work *sq;
     int n;
 
     gen_mutex_lock(&interface_mutex);
@@ -1144,7 +1238,7 @@ BMI_ib_test(bmi_op_id_t id, int *outcoun
 	    n = 1;
     } else {
 	/* actually a recv */
-	ib_recv_t *rq = mop->method_data;
+	struct ib_work *rq = mop->method_data;
 	if (test_rq(rq, &id, err, size, user_ptr, 1))
 	    n = 1;
     }
@@ -1162,7 +1256,7 @@ static int BMI_ib_testsome(int incount, 
   int max_idle_time __unused, bmi_context_id context_id __unused)
 {
     struct method_op *mop;
-    ib_send_t *sq;
+    struct ib_work *sq;
     bmi_op_id_t tid;
     int i, n;
 
@@ -1183,7 +1277,7 @@ static int BMI_ib_testsome(int incount, 
 	    }
 	} else {
 	    /* actually a recv */
-	    ib_recv_t *rq = mop->method_data;
+	    struct ib_work *rq = mop->method_data;
 	    if (test_rq(rq, &tid, &errs[n], &sizes[n], &user_ptrs[n], 1)) {
 		index_array[n] = i;
 		++n;
@@ -1198,33 +1292,29 @@ static int BMI_ib_testsome(int incount, 
 }
 
 /*
- * Used by the test functions to block if not much is going on
- * since the timeouts at the BMI job layer are too coarse.
- */
-static struct timeval last_action = { 0, 0 };
-
-/*
  * Test for multiple completions matching a particular user context.
+ * Return 0 if okay, >0 if want another poll soon, negative for error.
  */
 static int
 BMI_ib_testcontext(int incount, bmi_op_id_t *outids, int *outcount,
   bmi_error_code_t *errs, bmi_size_t *sizes, void **user_ptrs,
   int max_idle_time, bmi_context_id context_id)
 {
-    list_t *l, *lnext;
-    int n, complete;
-    void **up = 0;
+    struct qlist_head *l, *lnext;
+    int n = 0, complete, activity = 0;
+    void **up = NULL;
 
     gen_mutex_lock(&interface_mutex);
-    ib_check_cq();
+
+restart:
+    activity += ib_check_cq();
 
     /*
      * Walk _all_ entries on sq, rq, marking them completed or
      * encouraging them as needed due to resource limitations.
      */
-    n = 0;
     for (l=ib_device->sendq.next; l != &ib_device->sendq; l=lnext) {
-	ib_send_t *sq = qlist_upcast(l);
+	struct ib_work *sq = qlist_upcast(l);
 	lnext = l->next;
 	/* test them all, even if can't reap them, just to encourage */
 	complete = (sq->mop->context_id == context_id) && (n < incount);
@@ -1234,7 +1324,7 @@ BMI_ib_testcontext(int incount, bmi_op_i
     }
 
     for (l=ib_device->recvq.next; l != &ib_device->recvq; l=lnext) {
-	ib_recv_t *rq = qlist_upcast(l);
+	struct ib_work *rq = qlist_upcast(l);
 	lnext = l->next;
 
 	/* some receives have no mops:  unexpected */
@@ -1248,36 +1338,20 @@ BMI_ib_testcontext(int incount, bmi_op_i
     /* drop lock before blocking on new connections below */
     gen_mutex_unlock(&interface_mutex);
 
-    *outcount = n;
-
-    if (n > 0) {
-	gettimeofday(&last_action, 0);  /* remember this action */
-    } else if (max_idle_time > 0) {
+    if (activity == 0 && n == 0 && max_idle_time > 0) {
 	/*
-	 * Spin for an interval after some activity, then go to a
-	 * blocking interface by using poll() on the IB completion channel
-	 * and TCP listen socket.
+	 * Block if told to from above.
 	 */
-	struct timeval now;
-
-	gettimeofday(&now, 0);
-	timersub(&now, &last_action, &now);
-
-	/* if time since last activity is > 10ms, block */
-	if (now.tv_sec > 0 || now.tv_usec > 10000) {
-	    /* block */
-	    n = ib_block_for_activity(max_idle_time);
-	    if (n)
-		gettimeofday(&last_action, 0);  /* had some action */
-	} else {
-	    /* whee, spin */
-	    /* totally helps on Lee's old 2.4.21 machine, but may cause
-	     * big delays on modern kernels; do not make default */
-	    /* sched_yield(); */
-	    ;
+	debug(8, "%s: last activity too long ago, blocking", __func__);
+	activity = ib_block_for_activity(max_idle_time);
+	if (activity == 1) {   /* IB action, go do it immediately */
+	    gen_mutex_lock(&interface_mutex);
+	    goto restart;
 	}
     }
-    return 0;
+
+    *outcount = n;
+    return activity + n;
 }
 
 /*
@@ -1285,32 +1359,34 @@ BMI_ib_testcontext(int incount, bmi_op_i
  * This is also where we check for new connections on the TCP socket, since
  * those would show up as unexpected the first time anything is sent.
  * Return 0 for success, or -1 for failure; number of things in *outcount.
+ * Return >0 if want another poll soon.
  */
 static int
 BMI_ib_testunexpected(int incount __unused, int *outcount,
-  struct method_unexpected_info *ui, int max_idle_time __unused)
+  struct method_unexpected_info *ui, int max_idle_time)
 {
-    int num_action;
-    list_t *l;
+    struct qlist_head *l;
+    int activity = 0, n;
 
     gen_mutex_lock(&interface_mutex);
 
     /* Check CQ, then look for the first unexpected message.  */
-    num_action = ib_check_cq();
+restart:
+    activity += ib_check_cq();
 
-    *outcount = 0;
+    n = 0;
     qlist_for_each(l, &ib_device->recvq) {
-	ib_recv_t *rq = qlist_upcast(l);
-	if (rq->state == RQ_EAGER_WAITING_USER_TESTUNEXPECTED) {
+	struct ib_work *rq = qlist_upcast(l);
+	if (rq->state.recv == RQ_EAGER_WAITING_USER_TESTUNEXPECTED) {
 	    msg_header_eager_t mh_eager;
 	    char *ptr = rq->bh->buf;
-	    ib_connection_t *c;
+	    ib_connection_t *c = rq->c;
 
 	    decode_msg_header_eager_t(&ptr, &mh_eager);
 
 	    debug(2, "%s: found waiting testunexpected", __func__);
 	    ui->error_code = 0;
-	    ui->addr = rq->c->remote_map;  /* hand back permanent method_addr */
+	    ui->addr = c->remote_map;  /* hand back permanent method_addr */
 	    ui->buffer = Malloc((unsigned long) rq->actual_len);
 	    ui->size = rq->actual_len;
 	    memcpy(ui->buffer,
@@ -1318,32 +1394,34 @@ BMI_ib_testunexpected(int incount __unus
 	           (size_t) ui->size);
 	    ui->tag = rq->bmi_tag;
 	    /* re-post the buffer in which it was sitting, just unexpecteds */
-	    post_rr(rq->c, rq->bh);
-	    /* freed our eager buffer, ack it */
-	    post_sr_ack(rq->c, mh_eager.bufnum);
-	    *outcount = 1;
+	    post_rr(c, rq->bh);
+	    n = 1;
 	    qlist_del(&rq->list);
-	    c = rq->c;
 	    free(rq);
 	    --c->refcnt;
-	    if (c->closed)
+	    if (c->closed || c->cancelled)
 		ib_close_connection(c);
 	    goto out;
 	}
     }
 
-    /* check for new incoming connections */
-    num_action += ib_tcp_server_check_new_connections();
-
-    /* look for async events on the IB port */
-    num_action += check_async_events();
-
-    if (num_action)
-	gettimeofday(&last_action, 0);
-
   out:
     gen_mutex_unlock(&interface_mutex);
-    return 0;
+
+    if (activity == 0 && n == 0 && max_idle_time > 0) {
+	/*
+	 * Block if told to from above, also polls TCP listening socket.
+	 */
+	debug(8, "%s: last activity too long ago, blocking", __func__);
+	activity = ib_block_for_activity(max_idle_time);
+	if (activity == 1) {   /* IB action, go do it immediately */
+	    gen_mutex_lock(&interface_mutex);
+	    goto restart;
+	}
+    }
+
+    *outcount = n;
+    return activity + n;
 }
 
 /*
@@ -1369,7 +1447,7 @@ static int
 BMI_ib_cancel(bmi_op_id_t id, bmi_context_id context_id __unused)
 {
     struct method_op *mop;
-    ib_send_t *tsq;
+    struct ib_work *tsq;
     ib_connection_t *c = 0;
 
     gen_mutex_lock(&interface_mutex);
@@ -1382,13 +1460,13 @@ BMI_ib_cancel(bmi_op_id_t id, bmi_contex
 	 * tested later.  Any others trigger full shutdown of the
 	 * connection.
 	 */
-	if (tsq->state != SQ_WAITING_USER_TEST)
+	if (tsq->state.send != SQ_WAITING_USER_TEST)
 	    c = tsq->c;
     } else {
 	/* actually a recv */
-	ib_recv_t *rq = mop->method_data;
-	if (!(rq->state == RQ_EAGER_WAITING_USER_TEST 
-	   || rq->state == RQ_RTS_WAITING_USER_TEST))
+	struct ib_work *rq = mop->method_data;
+	if (!(rq->state.recv == RQ_EAGER_WAITING_USER_TEST 
+	   || rq->state.recv == RQ_RTS_WAITING_USER_TEST))
 	    c = rq->c;
     }
 
@@ -1399,43 +1477,43 @@ BMI_ib_cancel(bmi_op_id_t id, bmi_contex
 	 * anyway.  Do not close the connection until all the sq/rq on it have
 	 * gone away.
 	 */
-	list_t *l;
+	struct qlist_head *l;
 
 	c->cancelled = 1;
 	drain_qp(c);
 	qlist_for_each(l, &ib_device->sendq) {
-	    ib_send_t *sq = qlist_upcast(l);
+	    struct ib_work *sq = qlist_upcast(l);
 	    if (sq->c != c) continue;
 #if !MEMCACHE_BOUNCEBUF
-	    if (sq->state == SQ_WAITING_DATA_LOCAL_SEND_COMPLETE)
+	    if (sq->state.send == SQ_WAITING_DATA_SEND_COMPLETION)
 		memcache_deregister(ib_device->memcache, &sq->buflist);
 #  if MEMCACHE_EARLY_REG
 	    /* pin when sending rts, so also must dereg in this state */
-	    if (sq->state == SQ_WAITING_CTS)
+	    if (sq->state.send == SQ_WAITING_CTS)
 		memcache_deregister(ib_device->memcache, &sq->buflist);
 #  endif
 #endif
-	    if (sq->state != SQ_WAITING_USER_TEST)
-		sq->state = SQ_CANCELLED;
+	    if (sq->state.send != SQ_WAITING_USER_TEST)
+		sq->state.send = SQ_CANCELLED;
 	}
 	qlist_for_each(l, &ib_device->recvq) {
-	    ib_recv_t *rq = qlist_upcast(l);
+	    struct ib_work *rq = qlist_upcast(l);
 	    if (rq->c != c) continue;
 #if !MEMCACHE_BOUNCEBUF
-	    if (rq->state == RQ_RTS_WAITING_DATA)
+	    if (rq->state.recv == RQ_RTS_WAITING_RTS_DONE)
 		memcache_deregister(ib_device->memcache, &rq->buflist);
 #  if MEMCACHE_EARLY_REG
 	    /* pin on post, dereg all these */
-	    if (rq->state == RQ_RTS_WAITING_CTS_BUFFER)
+	    if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER)
 		memcache_deregister(ib_device->memcache, &rq->buflist);
-	    if (rq->state == RQ_WAITING_INCOMING
+	    if (rq->state.recv == RQ_WAITING_INCOMING
 	      && rq->buflist.tot_len > ib_device->eager_buf_payload)
 		memcache_deregister(ib_device->memcache, &rq->buflist);
 #  endif
 #endif
-	    if (!(rq->state == RQ_EAGER_WAITING_USER_TEST 
-	       || rq->state == RQ_RTS_WAITING_USER_TEST))
-		rq->state = RQ_CANCELLED;
+	    if (!(rq->state.recv == RQ_EAGER_WAITING_USER_TEST 
+	       || rq->state.recv == RQ_RTS_WAITING_USER_TEST))
+		rq->state.recv = RQ_CANCELLED;
 	}
     }
 
@@ -1443,28 +1521,6 @@ BMI_ib_cancel(bmi_op_id_t id, bmi_contex
     return 0;
 }
 
-/*
- * For connections that are being cancelled, maybe delete them if no
- * more send or recvq entries remain.
- */
-static void
-maybe_free_connection(ib_connection_t *c)
-{
-    list_t *l;
-
-    if (!c->cancelled)
-	return;
-    qlist_for_each(l, &ib_device->sendq) {
-	ib_send_t *sq = qlist_upcast(l);
-	if (sq->c == c) return;
-    }
-    qlist_for_each(l, &ib_device->recvq) {
-	ib_recv_t *rq = qlist_upcast(l);
-	if (rq->c == c) return;
-    }
-    ib_close_connection(c);
-}
-
 static const char *
 BMI_ib_rev_lookup(struct method_addr *meth)
 {
@@ -1535,7 +1591,7 @@ static struct method_addr *BMI_ib_method
     /* lookup in known connections, if there are any */
     gen_mutex_lock(&interface_mutex);
     if (ib_device) {
-	list_t *l;
+	struct qlist_head *l;
 	qlist_for_each(l, &ib_device->connection) {
 	    ib_connection_t *c = qlist_upcast(l);
 	    ib_method_addr_t *ibmap = c->remote_map->method_data;
@@ -1551,7 +1607,7 @@ static struct method_addr *BMI_ib_method
 	free(hostname);  /* found it */
     else
 	map = ib_alloc_method_addr(0, hostname, port);  /* alloc new one */
-	/* but don't call method_addr_reg_callback! */
+	/* but don't call bmi_method_addr_reg_callback! */
 
     return map;
 }
@@ -1577,8 +1633,8 @@ static ib_connection_t *ib_new_connectio
     c->eager_recv_buf_head_contig = Malloc(ib_device->eager_buf_num
       * sizeof(*c->eager_recv_buf_head_contig));
     for (i=0; i<ib_device->eager_buf_num; i++) {
-	buf_head_t *ebs = &c->eager_send_buf_head_contig[i];
-	buf_head_t *ebr = &c->eager_recv_buf_head_contig[i];
+	struct buf_head *ebs = &c->eager_send_buf_head_contig[i];
+	struct buf_head *ebr = &c->eager_recv_buf_head_contig[i];
 	INIT_QLIST_HEAD(&ebs->list);
 	INIT_QLIST_HEAD(&ebr->list);
 	ebs->c = c;
@@ -1602,6 +1658,10 @@ static ib_connection_t *ib_new_connectio
     c->refcnt = 0;
     c->closed = 0;
 
+    /* save one credit back for emergency credit refill */
+    c->send_credit = ib_device->eager_buf_num - 1;
+    c->return_credit = 0;
+
     ret = new_connection(c, sock, is_server);
     if (ret) {
 	ib_close_connection(c);
@@ -1611,10 +1671,12 @@ static ib_connection_t *ib_new_connectio
     return c;
 }
 
+/*
+ * Try to close and free a connection, but only do it if refcnt has
+ * gone to zero.
+ */
 static void ib_close_connection(ib_connection_t *c)
 {
-    ib_method_addr_t *ibmap;
-
     debug(2, "%s: closing connection to %s", __func__, c->peername);
     c->closed = 1;
     if (c->refcnt != 0) {
@@ -1630,8 +1692,10 @@ static void ib_close_connection(ib_conne
     free(c->eager_recv_buf_head_contig);
     /* never free the remote map, for the life of the executable, just
      * mark it unconnected since BMI will always have this structure. */
-    ibmap = c->remote_map->method_data;
-    ibmap->c = NULL;
+    if (c->remote_map) {
+	ib_method_addr_t *ibmap = c->remote_map->method_data;
+	ibmap->c = NULL;
+    }
     free(c->peername);
     qlist_del(&c->list);
     free(c);
@@ -1725,7 +1789,8 @@ static void ib_tcp_server_init_listen_so
 
 /*
  * Check for new connections.  The listening socket is left nonblocking
- * so this test can be quick.  Returns >0 if an accept worked.
+ * so this test can be quick; but accept is not really that quick compared
+ * to polling an IB interface, for instance.  Returns >0 if an accept worked.
  */
 static int ib_tcp_server_check_new_connections(void)
 {
@@ -1746,11 +1811,12 @@ static int ib_tcp_server_check_new_conne
 	int port = ntohs(ssin.sin_port);
 	sprintf(peername, "%s:%d", hostname, port);
 
+	gen_mutex_lock(&interface_mutex);
+
 	c = ib_new_connection(s, peername, 1);
 	if (!c) {
 	    free(hostname);
-	    close(s);
-	    return 0;
+	    goto out_unlock;
 	}
 
 	c->remote_map = ib_alloc_method_addr(c, hostname, port);
@@ -1761,9 +1827,12 @@ static int ib_tcp_server_check_new_conne
 
 	debug(2, "%s: accepted new connection %s at server", __func__,
 	  c->peername);
+	ret = 1;
+
+out_unlock:
+	gen_mutex_unlock(&interface_mutex);
 	if (close(s) < 0)
 	    error_errno("%s: close new sock", __func__);
-	ret = 1;
     }
     return ret;
 }
@@ -1772,26 +1841,40 @@ static int ib_tcp_server_check_new_conne
  * Ask the device to write to its FD if a CQ event happens, and poll on it
  * as well as the listen_sock for activity, but do not actually respond to
  * anything.  A later ib_check_cq will handle CQ events, and a later call to
- * testunexpected will pick up new connections.  Returns >0 if something is
- * ready.
+ * testunexpected will pick up new connections.  Returns ==1 if IB device is
+ * ready, other >0 for some activity, else 0.
  */
 static int ib_block_for_activity(int timeout_ms)
 {
-    struct pollfd pfd[2];
+    struct pollfd pfd[3];  /* cq fd, async fd, accept socket */
     int numfd;
     int ret;
 
-    pfd[0].fd = prepare_cq_block();
+    prepare_cq_block(&pfd[0].fd, &pfd[1].fd);
     pfd[0].events = POLLIN;
-    numfd = 1;
+    pfd[1].events = POLLIN;
+    numfd = 2;
     if (ib_device->listen_sock >= 0) {
-	pfd[1].fd = ib_device->listen_sock;
-	pfd[1].events = POLLIN;
-	numfd = 2;
+	pfd[2].fd = ib_device->listen_sock;
+	pfd[2].events = POLLIN;
+	numfd = 3;
     }
+
     ret = poll(pfd, numfd, timeout_ms);
-    debug(4, "%s: ret %d rev0 0x%x", __func__, ret, pfd[0].revents);
-    if (ret < 0) {
+    debug(4, "%s: ret %d rev0 %x rev1 %x", __func__, ret,
+          pfd[0].revents, pfd[1].revents);
+    if (ret > 0) {
+	if (pfd[0].revents == POLLIN) {
+	    ack_cq_completion_event();
+	    return 1;
+	}
+	/* check others only if CQ was empty */
+	ret = 2;
+	if (pfd[1].revents == POLLIN)
+	    check_async_events();
+	if (pfd[2].revents == POLLIN)
+	    ib_tcp_server_check_new_connections();
+    } else if (ret < 0) {
 	if (errno == EINTR)  /* normal, ignore but break */
 	    ret = 0;
 	else
@@ -1854,6 +1937,12 @@ static int BMI_ib_set_info(int option, v
 	free(map);
 	break;
     }
+    case BMI_OPTIMISTIC_BUFFER_REG: {
+	const struct bmi_optimistic_buffer_info *binfo = param;
+	memcache_preregister(ib_device->memcache, binfo->buffer,
+	                     binfo->len, binfo->rw);
+	break;
+    }
     default:
 	/* Should return -ENOSYS, but return 0 for caller ease. */
 	break;
@@ -1918,10 +2007,13 @@ static int BMI_ib_initialize(struct meth
      * The hostname is currently ignored; the port number is used to bind
      * the listening TCP socket which accepts new connections.
      */
-    if (init_flags & BMI_INIT_SERVER)
+    if (init_flags & BMI_INIT_SERVER) {
 	ib_tcp_server_init_listen_socket(listen_addr);
-    else
+	ib_device->listen_addr = listen_addr;
+    } else {
 	ib_device->listen_sock = -1;
+	ib_device->listen_addr = NULL;
+    }
 
     /*
      * Initialize data structures.
@@ -1949,7 +2041,7 @@ static int BMI_ib_finalize(void)
 
     /* if client, send BYE to each connection and bring down the QP */
     if (ib_device->listen_sock < 0) {
-	list_t *l;
+	struct qlist_head *l;
 	qlist_for_each(l, &ib_device->connection) {
 	    ib_connection_t *c = qlist_upcast(l);
 	    if (c->cancelled)
@@ -1960,8 +2052,12 @@ static int BMI_ib_finalize(void)
 	}
     }
     /* if server, stop listening */
-    if (ib_device->listen_sock >= 0)
+    if (ib_device->listen_sock >= 0) {
+	ib_method_addr_t *ibmap = ib_device->listen_addr->method_data;
 	close(ib_device->listen_sock);
+	free(ibmap->hostname);
+	free(ib_device->listen_addr);
+    }
 
     /* destroy QPs and other connection structures */
     while (ib_device->connection.next != &ib_device->connection) {
@@ -2018,5 +2114,6 @@ const struct bmi_method_ops bmi_ib_ops =
     .BMI_meth_close_context = BMI_ib_close_context,
     .BMI_meth_cancel = BMI_ib_cancel,
     .BMI_meth_rev_lookup_unexpected = BMI_ib_rev_lookup,
+    .BMI_meth_query_addr_range = NULL,
 };
 

Index: vapi.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/vapi.c,v
diff -p -u -r1.4 -r1.4.4.1
--- vapi.c	28 Aug 2006 17:33:03 -0000	1.4
+++ vapi.c	17 Feb 2007 11:16:34 -0000	1.4.4.1
@@ -5,7 +5,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: vapi.c,v 1.4 2006/08/28 17:33:03 pw Exp $
+ * $Id: vapi.c,v 1.4.4.1 2007/02/17 11:16:34 kunkel Exp $
  */
 #include <stdio.h>
 #include <string.h>
@@ -17,7 +17,6 @@
 #include <src/io/bmi/bmi-method-support.h>   /* struct method_addr */
 #include <src/common/misc/pvfs2-internal.h>
 #include <src/io/bmi/bmi-byteswap.h>  /* bmitoh64 */
-#include <src/common/gossip/gossip.h>
 
 #include "pvfs2-config.h" /* HAVE_IB_WRAP_COMMON_H configure symbol */
 
@@ -54,8 +53,8 @@ struct vapi_device_priv {
 
     /*
      * Maximum number of outstanding work requests in the NIC, same for both
-     * SQ and RQ, though we only care about SQ on the QP and ack QP.  Used to
-     * decide when to use a SIGNALED completion on a send to avoid WQE buildup.
+     * SQ and RQ.  Used to decide when to use a SIGNALED completion on a send
+     * to avoid WQE buildup.
      */
     unsigned int max_outstanding_wr;
 
@@ -74,19 +73,15 @@ struct vapi_device_priv {
 struct vapi_connection_priv {
     /* ib local params */
     VAPI_qp_hndl_t qp;
-    VAPI_qp_hndl_t qp_ack;
     VAPI_qp_num_t qp_num;
-    VAPI_qp_num_t qp_ack_num;
     VAPI_mr_hndl_t eager_send_mr;
     VAPI_mr_hndl_t eager_recv_mr;
     VAPI_lkey_t eager_send_lkey;  /* for post_sr */
     VAPI_lkey_t eager_recv_lkey;  /* for post_rr */
     unsigned int num_unsignaled_wr;  /* keep track of outstanding WRs */
-    unsigned int num_unsignaled_wr_ack;
     /* ib remote params */
     IB_lid_t remote_lid;
     VAPI_qp_num_t remote_qp_num;
-    VAPI_qp_num_t remote_qp_ack_num;
 };
 
 /* constants used to initialize infiniband device */
@@ -99,7 +94,7 @@ static int exchange_data(int sock, int i
 static void verify_prop_caps(VAPI_qp_cap_t *cap);
 static void init_connection_modify_qp(VAPI_qp_hndl_t qp,
   VAPI_qp_num_t remote_qp_num, int remote_lid);
-static void vapi_post_rr(const ib_connection_t *c, buf_head_t *bh);
+static void vapi_post_rr(const ib_connection_t *c, struct buf_head *bh);
 static void __attribute__((noreturn,format(printf,2,3)))
   error_verrno(int ecode, const char *fmt, ...);
 int vapi_ib_initialize(void);
@@ -120,7 +115,6 @@ static int vapi_new_connection(ib_connec
     struct {
 	IB_lid_t lid;
 	VAPI_qp_num_t qp_num;
-	VAPI_qp_num_t qp_ack_num;
     } ch_in, ch_out;
 
     vc = Malloc(sizeof(*vc));
@@ -170,16 +164,8 @@ static int vapi_new_connection(ib_connec
     vc->qp_num = prop.qp_num;
     verify_prop_caps(&prop.cap);
 
-    /* and qp ack */
-    ret = VAPI_create_qp(vd->nic_handle, &qp_init_attr, &vc->qp_ack, &prop);
-    if (ret < 0)
-	error_verrno(ret, "%s: create QP ack", __func__);
-    vc->qp_ack_num = prop.qp_num;
-    verify_prop_caps(&prop.cap);
-
-    /* initialize for post_sr and post_sr_ack */
+    /* initialize for post_sr */
     vc->num_unsignaled_wr = 0;
-    vc->num_unsignaled_wr_ack = 0;
 
     /* share connection information across TCP */
     /* sanity check sizes of things (actually only 24 bits in qp_num) */
@@ -193,7 +179,6 @@ static int vapi_new_connection(ib_connec
     /* convert all to network order and back */
     ch_out.lid = htobmi16(vd->nic_lid);
     ch_out.qp_num = htobmi32(vc->qp_num);
-    ch_out.qp_ack_num = htobmi32(vc->qp_ack_num);
 
     ret = exchange_data(sock, is_server, &ch_in, &ch_out, sizeof(ch_in));
     if (ret)
@@ -201,12 +186,9 @@ static int vapi_new_connection(ib_connec
 
     vc->remote_lid = bmitoh16(ch_in.lid);
     vc->remote_qp_num = bmitoh32(ch_in.qp_num);
-    vc->remote_qp_ack_num = bmitoh32(ch_in.qp_ack_num);
 
     /* bring the two QPs up to RTR */
     init_connection_modify_qp(vc->qp, vc->remote_qp_num, vc->remote_lid);
-    init_connection_modify_qp(vc->qp_ack, vc->remote_qp_ack_num,
-                              vc->remote_lid);
 
     /* post initial RRs */
     for (i=0; i<ib_device->eager_buf_num; i++)
@@ -363,10 +345,9 @@ static void init_connection_modify_qp(VA
 }
 
 /*
- * Close the QP associated with this connection.  Do not bother draining
- * qp_ack, nothing sending on it anyway.  Used to wait for drain to finish,
- * but many seconds pass before the adapter tells us about it via an asynch
- * event.  Perhaps there is a way to do it via polling.
+ * Close the QP associated with this connection.  Used to wait for drain to
+ * finish, but many seconds pass before the adapter tells us about it via an
+ * asynch event.  Perhaps there is a way to do it via polling.
  */
 static void vapi_drain_qp(ib_connection_t *c)
 {
@@ -391,42 +372,6 @@ static void vapi_drain_qp(ib_connection_
 	error_verrno(ret, "%s: VAPI_modify_qp RTS -> SQD", __func__);
 }
 
-static void vapi_send_bye(ib_connection_t *c)
-{
-    struct vapi_connection_priv *vc = c->priv;
-    struct vapi_device_priv *vd = ib_device->priv;
-    buf_head_t *bh;
-    VAPI_sg_lst_entry_t sg;
-    VAPI_sr_desc_t sr;
-    msg_header_common_t mh_common;
-    char *ptr;
-    int ret;
-
-    bh = qlist_try_del_head(&c->eager_send_buf_free);
-    if (!bh) {
-	/* if no messages available, let garbage collection on server deal */
-	return;
-    }
-
-    ptr = bh->buf;
-    mh_common.type = MSG_BYE;
-    encode_msg_header_common_t(&ptr, &mh_common);
-
-    debug(2, "%s: sending bye", __func__);
-    sg.addr = int64_from_ptr(bh->buf);
-    sg.len = sizeof(mh_common);
-    sg.lkey = vc->eager_send_lkey;
-
-    memset(&sr, 0, sizeof(sr));
-    sr.opcode = VAPI_SEND;
-    sr.comp_type = VAPI_UNSIGNALED;  /* == 1 */
-    sr.sg_lst_p = &sg;
-    sr.sg_lst_len = 1;
-    ret = VAPI_post_sr(vd->nic_handle, vc->qp, &sr);
-    if (ret < 0)
-	error_verrno(ret, "%s: VAPI_post_sr", __func__);
-}
-
 /*
  * At an explicit BYE message, or at finalize time, shut down a connection.
  * If descriptors are posted, defer and clean up the connection structures
@@ -438,9 +383,6 @@ static void vapi_close_connection(ib_con
     struct vapi_connection_priv *vc = c->priv;
     struct vapi_device_priv *vd = ib_device->priv;
 
-    ret = VAPI_destroy_qp(vd->nic_handle, vc->qp_ack);
-    if (ret < 0)
-	error_verrno(ret, "%s: VAPI_destroy_qp ack", __func__);
     ret = VAPI_destroy_qp(vd->nic_handle, vc->qp);
     if (ret < 0)
 	error_verrno(ret, "%s: VAPI_destroy_qp", __func__);
@@ -456,11 +398,9 @@ static void vapi_close_connection(ib_con
 
 /*
  * VAPI interface to post sends.  Not RDMA, just SEND.
- * Called for an eager send, rts send, or cts send.  Local send
- * completion is ignored, except rarely to clear the queue (see comments
- * at post_sr_ack).
+ * Called for an eager send, rts send, or cts send.
  */
-static void vapi_post_sr(const buf_head_t *bh, u_int32_t len)
+static void vapi_post_sr(const struct buf_head *bh, u_int32_t len)
 {
     VAPI_sg_lst_entry_t sg;
     VAPI_sr_desc_t sr;
@@ -493,7 +433,7 @@ static void vapi_post_sr(const buf_head_
 /*
  * Post one of the eager recv bufs for this connection.
  */
-static void vapi_post_rr(const ib_connection_t *c, buf_head_t *bh)
+static void vapi_post_rr(const ib_connection_t *c, struct buf_head *bh)
 {
     VAPI_sg_lst_entry_t sg;
     VAPI_rr_desc_t rr;
@@ -517,91 +457,13 @@ static void vapi_post_rr(const ib_connec
 }
 
 /*
- * Explicitly return a credit.  Immediate data says for which of
- * his buffer numbers does this ack apply.  Buffers will get reposted
- * out of order, although the buffers are always matched pairwise, so we
- * always return _his_ number, not ours.  (Consider this scenario
- *
- *     client                        server
- *        buf 0: send unex eager        buf 0: recv unex eager, no ack
- *        buf 1: send rts                      until app recognizes
- *                                      buf 1: recv rts, ack immediate,
- *                                             repost my buf 1
- *                                      ....
- *                                      app deals with eager, repost
- *                                             my buf 0
- *
- * Now the buffers are posted on the server in a different order.)
- *
- * Don't want to get a local completion from this, but if we don't do
- * so every once in a while, the NIC will fill up apparently.  So we
- * generate one every