[PVFS2-CVS]
commit by pw in pvfs2/src/io/bmi/bmi_ib: mem.c README ib.c ib.h
module.mk.in setup.c util.c
CVS commit program
cvs at parl.clemson.edu
Thu Nov 3 16:23:19 EST 2005
Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib
In directory parlweb:/tmp/cvs-serv691/src/io/bmi/bmi_ib
Modified Files:
README ib.c ib.h module.mk.in setup.c util.c
Added Files:
mem.c
Log Message:
- ib: add memory registration caching
- ib: add support for cancel operation
- ib: fix minor protocol problems
- ib: cleanup build
--- /dev/null 2003-01-30 05:24:37.000000000 -0500
+++ mem.c 2005-11-03 16:23:19.000000000 -0500
@@ -0,0 +1,235 @@
+/*
+ * InfiniBand BMI method, memory allocation and caching.
+ *
+ * Copyright (C) 2004-5 Pete Wyckoff <pw at osc.edu>
+ *
+ * See COPYING in top-level directory.
+ *
+ * $Id: mem.c,v 1.1 2005/11/03 21:23:19 pw Exp $
+ */
+#include <src/common/gen-locks/gen-locks.h>
+#include "ib.h"
+
+#ifdef __PVFS2_SERVER__
+# define ENABLE_MEMCACHE 1 /* server build: registration cache enabled */
+#else
+# define ENABLE_MEMCACHE 1 /* non-server build: also enabled; both branches currently agree */
+#endif
+
+list_t memcache __hidden; /* list of memcache_entry_t linked through their list member; head is not initialized in this file - presumably done by setup code, confirm */
+static gen_mutex_t memcache_mutex = GEN_MUTEX_INITIALIZER; /* held around every walk or update of memcache in this file */
+
+#if ENABLE_MEMCACHE
+/*
+ * Create and link a new memcache entry describing buf/len. Does not register the memory. Assumes lock already held.
+ */
+static memcache_entry_t *
+memcache_add(void *buf, bmi_size_t len)
+{
+ memcache_entry_t *c;
+
+ c = malloc(sizeof(*c));
+ if (likely(c)) { /* on allocation failure nothing is linked */
+ c->buf = buf;
+ c->len = len;
+ c->count = 0; /* callers in this file raise the count after adding */
+ qlist_add_tail(&c->list, &memcache);
+ }
+ return c; /* the new entry, or a null pointer if malloc failed */
+}
+
+/*
+ * Find the best entry that totally covers the request; returns 0 if none
+ * does. Assumes lock already held. These criteria apply:
+ * 1. existing bounds must cover potential new one
+ * 2. prefer highest refcnt (hoping for maximal reuse)
+ * 3. prefer tightest bounds among matching refcnt
+ */
+static memcache_entry_t *
+memcache_lookup_cover(const void *const buf, bmi_size_t len)
+{
+ list_t *l;
+ const char *end = (const char *) buf + len; /* one past the last requested byte */
+ memcache_entry_t *cbest = 0; /* best candidate seen so far */
+
+ qlist_for_each(l, &memcache) {
+ memcache_entry_t *c = qlist_entry(l, memcache_entry_t, list);
+ if (!(c->buf <= buf && end <= (const char *)c->buf + c->len))
+ continue; /* c does not contain [buf, end) */
+ if (!cbest)
+ goto take; /* first covering entry is taken unconditionally */
+ if (c->count < cbest->count)
+ continue; /* discard lower refcnt one */
+ if (c->count > cbest->count)
+ goto take; /* prefer higher refcnt */
+ /* equal refcnt, prefer tighter bounds */
+ if (c->len < cbest->len)
+ goto take;
+ continue; /* equal refcnt and not tighter: keep the earlier pick */
+ take:
+ cbest = c;
+ }
+ return cbest; /* no count is changed here; callers do that */
+}
+
+/*
+ * See if the exact entry (same buf and same len) exists; returns it, or 0.
+ * There must never be more than one of the same entry. Used only for BMI_ib_memfree, which holds the lock.
+ */
+static memcache_entry_t *
+memcache_lookup_exact(const void *const buf, bmi_size_t len)
+{
+ list_t *l;
+
+ qlist_for_each(l, &memcache) {
+ memcache_entry_t *c = qlist_entry(l, memcache_entry_t, list);
+ if (c->buf == buf && c->len == len)
+ return c; /* first match wins; a merely covering entry does not count */
+ }
+ return 0;
+}
+#endif /* ENABLE_MEMCACHE */
+
+/*
+ * BMI malloc and free routines. If the region is bigger than EAGER_BUF_PAYLOAD, pin
+ * it now to save time later in the actual send or recv routine. Returns 0 on failure.
+ */
+void *
+BMI_ib_memalloc(bmi_size_t len, enum bmi_op_type send_recv __unused)
+{
+ void *buf;
+
+ buf = malloc(len);
+#if ENABLE_MEMCACHE
+ if (unlikely(!buf))
+ goto out; /* nothing to cache, return the null pointer */
+ if (len > EAGER_BUF_PAYLOAD) {
+ memcache_entry_t *c;
+
+ gen_mutex_lock(&memcache_mutex);
+ c = memcache_lookup_cover(buf, len); /* could be recycled buffer */
+ if (c)
+ ++c->count; /* NOTE(review): BMI_ib_memfree only matches exact buf/len, so this reference may never be dropped - confirm */
+ else {
+ c = memcache_add(buf, len);
+ if (unlikely(!c)) {
+ free(buf); /* no cache entry could be allocated: give up the buffer too */
+ buf = 0;
+ } else {
+ ib_mem_register(c);
+ ++c->count; /* new entry goes from 0 to 1, the value BMI_ib_memfree asserts */
+ }
+ }
+ gen_mutex_unlock(&memcache_mutex);
+ }
+ out:
+#endif /* ENABLE_MEMCACHE */
+ return buf;
+}
+/* Free a buffer from BMI_ib_memalloc, first dropping its exact-match cache entry if one exists; always returns 0. */
+int
+BMI_ib_memfree(void *buf, bmi_size_t len __unused,
+ enum bmi_op_type send_recv __unused)
+{
+#if ENABLE_MEMCACHE
+ memcache_entry_t *c;
+ /* okay if not found, just not cached */
+
+ gen_mutex_lock(&memcache_mutex);
+ c = memcache_lookup_exact(buf, len); /* len is used here despite its __unused tag */
+ if (c) {
+ debug(6, "%s: found %p len %Ld", __func__, c->buf, c->len);
+ assert(c->count == 1, "%s: buf %p len %Ld count = %d, expected 1",
+ __func__, c->buf, c->len, c->count); /* the only reference left should be the one taken in BMI_ib_memalloc */
+ ib_mem_deregister(c);
+ qlist_del(&c->list);
+ free(c);
+ }
+ gen_mutex_unlock(&memcache_mutex);
+#endif
+ free(buf);
+ return 0;
+}
+
+/*
+ * Interface for bmi_ib send and recv routines in ib.c. Takes a buflist and looks up each
+ * entry in the memcache, adding and pinning it if not yet there; results go in buflist->memcache[].
+ */
+void
+memcache_register(ib_buflist_t *buflist)
+{
+ int i;
+
+ buflist->memcache = Malloc(buflist->num * sizeof(*buflist->memcache)); /* freed again in memcache_deregister */
+ gen_mutex_lock(&memcache_mutex);
+ for (i=0; i<buflist->num; i++) {
+#if ENABLE_MEMCACHE
+ memcache_entry_t *c;
+ c = memcache_lookup_cover(buflist->buf.send[i], buflist->len[i]); /* buf.send and buf.recv presumably alias the same array - confirm in ib.h */
+ if (c) {
+ ++c->count; /* hit: share the existing registration */
+ debug(2, "%s: hit [%d] %p len %Ld (via %p len %Ld) refcnt now %d",
+ __func__, i, buflist->buf.send[i], buflist->len[i], c->buf,
+ c->len, c->count);
+ } else {
+ debug(2, "%s: miss [%d] %p len %Ld", __func__, i,
+ buflist->buf.send[i], buflist->len[i]);
+ c = memcache_add(buflist->buf.recv[i], buflist->len[i]);
+ if (!c)
+ error("%s: no memory for cache entry", __func__); /* assumed not to return; otherwise the next line dereferences a null pointer - confirm */
+ c->count = 1; /* memcache_add leaves the count at 0 */
+ ib_mem_register(c);
+ }
+ buflist->memcache[i] = c;
+#else
+ memcache_entry_t *cp = Malloc(sizeof(*cp)); /* private, uncached entry; must be a pointer for the -> uses below */
+ cp->buf = buflist->buf.recv[i];
+ cp->len = buflist->len[i];
+ cp->count = 1; /* replaces a stale "cp->type = type": no "type" is declared in this function */
+ ib_mem_register(cp);
+ buflist->memcache[i] = cp;
+#endif
+ }
+ gen_mutex_unlock(&memcache_mutex);
+}
+/* Drop the reference memcache_register took for each buffer in buflist, then free the buflist->memcache array. */
+void
+memcache_deregister(ib_buflist_t *buflist)
+{
+ int i;
+
+ gen_mutex_lock(&memcache_mutex);
+ for (i=0; i<buflist->num; i++) {
+#if ENABLE_MEMCACHE
+ memcache_entry_t *c = buflist->memcache[i];
+ --c->count; /* entry stays linked and registered here, even at 0 */
+ debug(2, "%s: dec refcount [%d] %p len %Ld count now %d", __func__, i,
+ buflist->buf.send[i], buflist->len[i], c->count);
+ /* let garbage collection do ib_mem_deregister(c) for refcnt==0 */
+#else
+ ib_mem_deregister(buflist->memcache[i]); /* uncached build: each entry is private to this buflist */
+ free(buflist->memcache[i]);
+#endif
+ }
+ free(buflist->memcache); /* array allocated in memcache_register */
+ gen_mutex_unlock(&memcache_mutex);
+}
+
+/*
+ * Remove all mappings in preparation for closing the pd: deregister, unlink and free every entry, whatever its count.
+ */
+void
+memcache_shutdown(void)
+{
+ list_t *l, *lp;
+
+ gen_mutex_lock(&memcache_mutex);
+ qlist_for_each_safe(l, lp, &memcache) { /* safe variant because entries are unlinked and freed inside the loop */
+ memcache_entry_t *c = qlist_entry(l, memcache_entry_t, list);
+ ib_mem_deregister(c);
+ qlist_del(&c->list);
+ free(c);
+ }
+ gen_mutex_unlock(&memcache_mutex);
+}
+
Index: README
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/README,v
diff -u -p -u -r1.2 -r1.3
--- README 29 Sep 2004 13:47:55 -0000 1.2
+++ README 3 Nov 2005 21:23:19 -0000 1.3
@@ -1,8 +1,8 @@
Notes on the BMI InfiniBand implementation
-Copyright (C) 2003 Pete Wyckoff <pw at osc.edu>
+Copyright (C) 2003-5 Pete Wyckoff <pw at osc.edu>
-$Id: README,v 1.2 2004/09/29 13:47:55 pw Exp $
+$Id: README,v 1.3 2005/11/03 21:23:19 pw Exp $
Connection management
---------------------
@@ -121,21 +121,28 @@ RTS send
--------
SQ_WAITING_BUFFER
alloc bh
- post_ack_recv_slot
+ # post_ack_recv_slot
post_sr mh + mh_rts
- SQ_WAITING_RTS_ACK
- (wait recv cq event)
- free bh from rts
+ ### 24 nov 04, removed this state: problem happened when
+ # receiver sent RTS_ACK then CTS very close together. They
+ # go on different QPs but wired to the same CQ. Sender polled
+ # and found the CTS before the RTS_ACK and got unhappy. Not convinced
+ # there was any good reason for the RTS_ACK anyway, as it is a bug
+ # if the receiver doesn't reply to the CTS before other messages; these
+ # things are supposed to be in order.
+ #SQ_WAITING_RTS_ACK
+ # (wait recv cq event)
+ # free bh from rts
SQ_WAITING_CTS
(wait recv cq event)
- re-post_rr
- pin memory
- RDMA big message to given address
+ free bh used for rts
+ post RDMA to address given in CTS
SQ_WAITING_DATA_LOCAL_SEND_COMPLETE
(wait local send cq event for rdma write)
- unpin
+ repost rr used to receive cts
ack cts # could probably do this in previous state since IB
guarantees order, but need this state to unpin anyway
+ unpin
SQ_WAITING_USER_TEST
wait test
release sendq
@@ -148,16 +155,13 @@ RTS recv, pre-post recv
(wait recv cq event)
match existing rq entry
re-post_rr from rts
- ack rts for simplicitly, else must carry this number until cts
+ # ack rts for simplicity, else must carry this number until cts
RQ_RTS_WAITING_CTS_BUFFER
alloc bh local for cts
-> if failure state = RQ_RTS_WAITING_CTS_BUFFER
pin recv buffer
post_ack_recv_slot
send cts
- RQ_RTS_WAITING_CTS_LOCAL_SEND_COMPLETE
- (wait send cq event)
- ignore
RQ_RTS_WAITING_DATA
(wait recv cq event ack)
free bh local from cts
@@ -171,14 +175,14 @@ RTS recv, non-pre post
(rts arrives on network)
build recvq
re-post_rr from rts
- ack rts for simplicitly, else must carry this number until post
+ # ack rts for simplicity, else must carry this number until post
RQ_RTS_WAITING_USER_POST
(wait user post that matches)
alloc bh local for cts
pin recv buffer
post_ack_recv_slot
send cts -> if failure state = RQ_RTS_WAITING_CTS_BUFFER
- RQ_RTS_WAITING_CTS_LOCAL_SEND_COMPLETE ... continue above
+ RQ_RTS_WAITING_DATA ... continue above
Other
Index: ib.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/ib.c,v
diff -u -p -u -r1.16 -r1.17
--- ib.c 2 Aug 2005 17:56:12 -0000 1.16
+++ ib.c 3 Nov 2005 21:23:19 -0000 1.17
@@ -1,20 +1,17 @@
/*
* InfiniBand BMI method.
*
- * Copyright (C) 2003-4 Pete Wyckoff <pw at osc.edu>
+ * Copyright (C) 2003-5 Pete Wyckoff <pw at osc.edu>
*
* See COPYING in top-level directory.
*
- * $Id: ib.c,v 1.16 2005/08/02 17:56:12 robl Exp $
+ * $Id: ib.c,v 1.17 2005/11/03 21:23:19 pw Exp $
*/
#include <stdio.h> /* just for NULL for id-generator.h */
-#include <sys/time.h>
#include <src/common/id-generator/id-generator.h>
-#include <src/common/quicklist/quicklist.h>
-#include <src/io/bmi/bmi-method-support.h>
-#include <src/common/gen-locks/gen-locks.h>
-#include <vapi.h>
-#include <vapi_common.h>
+#include <src/io/bmi/bmi-method-support.h> /* bmi_method_ops ... */
+#include <src/common/gen-locks/gen-locks.h> /* gen_mutex_t ... */
+#include <vapi_common.h> /* VAPI_cqe_opcode_sym ... */
#include "ib.h"
static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
@@ -28,25 +25,41 @@ list_t connection __hidden;
list_t sendq __hidden;
list_t recvq __hidden;
VAPI_sg_lst_entry_t *sg_tmp_array __hidden;
-int sg_max_len __hidden;
-int max_outstanding_wr __hidden;
+unsigned int sg_max_len __hidden;
+unsigned int max_outstanding_wr __hidden;
+
+#define MEMCACHE_BOUNCEBUF 0
+#define MEMCACHE_EARLY_REG 1
+
+#if MEMCACHE_EARLY_REG
+#if MEMCACHE_BOUNCEBUF
+#error Not sensible to use bouncebuf with early reg. First use of bouncebuf \
+ will register it, thus no effect whether early reg or not.
+#endif
+#endif
+
+#if MEMCACHE_BOUNCEBUF
+static ib_buflist_t reg_send_buflist = { .num = 0 };
+static ib_buflist_t reg_recv_buflist = { .num = 0 };
+static void *reg_send_buflist_buf;
+static void *reg_recv_buflist_buf;
+static const bmi_size_t reg_send_buflist_len = 256 * 1024;
+static const bmi_size_t reg_recv_buflist_len = 256 * 1024;
+#endif
static int send_cts(ib_recv_t *rq);
static void post_sr(const buf_head_t *bh, u_int32_t len);
/* post_rr declared externally */
-static void post_sr_ack(const ib_connection_t *c, const buf_head_t *bh);
+static void post_sr_ack(ib_connection_t *c, const buf_head_t *bh);
static void post_rr_ack(const ib_connection_t *c, const buf_head_t *bh);
static void post_sr_rdmaw(ib_send_t *sq, msg_header_cts_t *mh_cts);
static void encourage_send_waiting_buffer(ib_send_t *sq);
-static void encourage_send_incoming_ack(ib_send_t *sq);
-static void encourage_send_send_completed(ib_send_t *sq);
static void encourage_send_incoming_cts(buf_head_t *bh, u_int32_t byte_len);
static void encourage_recv_incoming(ib_connection_t *c, buf_head_t *bh,
u_int32_t byte_len);
static void encourage_recv_incoming_cts_ack(ib_recv_t *rq);
-static void encourage_recv_to_send_cts(ib_recv_t *rq);
static void maybe_free_connection(ib_connection_t *c);
/*
@@ -72,17 +85,21 @@ check_cq(void)
debug(2, "%s: found something", __func__);
++ret;
if (desc.status != VAPI_SUCCESS) {
- warning("%s: entry id 0x%Lx opcode %s error %s", __func__,
- desc.id, VAPI_cqe_opcode_sym(desc.opcode),
- VAPI_wc_status_sym(desc.status));
if (desc.opcode == VAPI_CQE_SQ_SEND_DATA) {
+ debug(0, "%s: entry id 0x%Lx SQ_SEND_DATA error %s", __func__,
+ desc.id, VAPI_wc_status_sym(desc.status));
if (desc.id) {
ib_connection_t *c = ptr_from_int64(desc.id);
- if (c->cancelled)
+ if (c->cancelled) {
debug(0,
"%s: ignoring send error on cancelled conn to %s",
__func__, c->peername);
+ }
}
+ } else {
+ error("%s: entry id 0x%Lx opcode %s error %s", __func__,
+ desc.id, VAPI_cqe_opcode_sym(desc.opcode),
+ VAPI_wc_status_sym(desc.status));
}
}
@@ -105,21 +122,31 @@ check_cq(void)
__func__);
debug(2, "%s: acknowledgment message %s buf %d",
__func__, bh->c->peername, bufnum);
- /* maybe this is okay? --pw, 6 mar 04 */
- assert(bufnum == bh->num, "%s: ack out of sequence, got %d"
- " in descriptor for buffer %d", __func__, bufnum, bh->num);
+
/*
- * Do not get the sq from the bh that posted this, just in case
- * it ever becomes okay to do out-of-order. Instead look up
- * the bufnum in the static send array. This sq will actually
- * be an rq if the ack is of a CTS.
+ * Do not get the sq from the bh that posted this because
+ * these do not necessarily come in order, in particular
+ * there is no explicit ACK for an RTS instead the CTS serves
+ * as the ACK. Instead look up the bufnum in the static
+ * send array. This sq will actually be an rq if the ack
+ * is of a CTS.
*/
sq = bh->c->eager_send_buf_head_contig[bufnum].sq;
- if (unlikely(sq->type == TYPE_RECV))
- /* ack of a CTS send by the receiver */
+ if (unlikely(sq->type == BMI_RECV))
+ /* ack of a CTS sent by the receiver */
encourage_recv_incoming_cts_ack((ib_recv_t *)sq);
- else
- encourage_send_incoming_ack(sq);
+ else {
+ assert(sq->state == SQ_WAITING_EAGER_ACK,
+ "%s: unknown send state %s", __func__,
+ sq_state_name(sq->state));
+
+ sq->state = SQ_WAITING_USER_TEST;
+ qlist_add_tail(&sq->bh->list, &sq->c->eager_send_buf_free);
+
+ debug(2, "%s: sq %p"
+ " SQ_WAITING_EAGER_ACK -> SQ_WAITING_USER_TEST",
+ __func__, sq);
+ }
} else {
/*
@@ -127,16 +154,14 @@ check_cq(void)
*/
msg_header_t *mh = bh->buf;
- debug(2, "%s: found len %d at %s bufnum %d type %s",
+ debug(1, "%s: found len %d at %s bufnum %d type %s",
__func__, byte_len, bh->c->peername, bh->num,
msg_type_name(mh->type));
if (mh->type == MSG_CTS) {
/* incoming CTS messages go to the send engine */
- debug(2, "%s: found cts message", __func__);
encourage_send_incoming_cts(bh, byte_len);
} else {
/* something for the recv side, no known rq yet */
- debug(2, "%s: found message for receive engine", __func__);
encourage_recv_incoming(bh->c, bh, byte_len);
}
}
@@ -146,14 +171,29 @@ check_cq(void)
/* completion event for the rdma write we initiated, used
* to signal memory unpin etc. */
ib_send_t *sq = ptr_from_int64(desc.id);
- encourage_send_send_completed(sq);
+
+ debug(2, "%s: sq %p %s", __func__, sq, sq_state_name(sq->state));
+
+ assert(sq->state == SQ_WAITING_DATA_LOCAL_SEND_COMPLETE,
+ "%s: wrong send state %s", __func__, sq_state_name(sq->state));
+
+ /* re-post and ack cts saved earlier, signals rdma completed */
+ post_rr(sq->c, sq->bh);
+ post_sr_ack(sq->c, sq->bh);
+
+#if !MEMCACHE_BOUNCEBUF
+ memcache_deregister(&sq->buflist);
+#endif
+ sq->state = SQ_WAITING_USER_TEST;
+
+ debug(2, "%s: sq %p now %s", __func__, sq,
+ sq_state_name(sq->state));
} else if (desc.opcode == VAPI_CQE_SQ_SEND_DATA) {
/* periodic send queue flush, qp or qp_ack */
- ib_connection_t *c = ptr_from_int64(desc.id);
- debug(2, "%s: sr (ack) to %s send completed", __func__,
- c->peername);
+ debug(2, "%s: sr (ack?) to %s send completed", __func__,
+ ((ib_connection_t *) ptr_from_int64(desc.id))->peername);
} else {
const char *ops = VAPI_cqe_opcode_sym(desc.opcode);
@@ -207,7 +247,7 @@ encourage_send_waiting_buffer(ib_send_t
post_rr_ack(sq->c, bh);
/* send the message */
- post_sr(bh, sizeof(*mh) + sq->buflist.tot_len);
+ post_sr(bh, (u_int32_t) (sizeof(*mh) + sq->buflist.tot_len));
/* wait for ack saying remote has received and recycled his buf */
sq->state = SQ_WAITING_EAGER_ACK;
@@ -228,57 +268,22 @@ encourage_send_waiting_buffer(ib_send_t
mh_rts->mop_id = sq->mop->op_id;
mh_rts->tot_len = sq->buflist.tot_len;
- /* get ready to receive the ack */
- post_rr_ack(sq->c, bh);
-
+ /* do not expect an ack back from this */
post_sr(bh, sizeof(*mh) + sizeof(*mh_rts));
- sq->state = SQ_WAITING_RTS_ACK;
+#if MEMCACHE_EARLY_REG
+ /* XXX: need to lock against receiver thread? Could poll return
+ * the CTS and start the data send before this completes? */
+ memcache_register(&sq->buflist);
+#endif
+
+ sq->state = SQ_WAITING_CTS;
debug(2, "%s: sq %p sent RTS now %s", __func__, sq,
sq_state_name(sq->state));
}
}
/*
- * Push a send message along its next step. Called in response to an
- * incoming message (including local acks of messages we sent).
- *
- * bh_in and byte_len only needed for CTS reply to check length
- */
-static void
-encourage_send_incoming_ack(ib_send_t *sq)
-{
- debug(2, "%s: sq %p %s", __func__, sq, sq_state_name(sq->state));
-
- if (sq->state == SQ_WAITING_EAGER_ACK)
- sq->state = SQ_WAITING_USER_TEST;
- else if (sq->state == SQ_WAITING_RTS_ACK)
- sq->state = SQ_WAITING_CTS;
- else
- error("%s: unknown send state %s", __func__, sq_state_name(sq->state));
-
- qlist_add_tail(&sq->bh->list, &sq->c->eager_send_buf_free);
- debug(2, "%s: sq %p now %s", __func__, sq, sq_state_name(sq->state));
-}
-
-static void
-encourage_send_send_completed(ib_send_t *sq)
-{
- debug(2, "%s: sq %p %s", __func__, sq, sq_state_name(sq->state));
- assert(sq->state == SQ_WAITING_DATA_LOCAL_SEND_COMPLETE,
- "%s: wrong send state %s", __func__, sq_state_name(sq->state));
-
- /* re-post and ack cts saved above, signals rdma completed */
- post_rr(sq->c, sq->bh);
- post_sr_ack(sq->c, sq->bh);
-
- ib_mem_deregister(&sq->buflist);
- sq->state = SQ_WAITING_USER_TEST;
- debug(2, "%s: sq %p now %s", __func__, sq, sq_state_name(sq->state));
-}
-
-
-/*
* Look at the incoming message which is a response to an earlier RTS
* from us, and start the real data send.
*/
@@ -317,12 +322,15 @@ encourage_send_incoming_cts(buf_head_t *
assert(sq->state == SQ_WAITING_CTS,
"%s: wrong send state %s", __func__, sq_state_name(sq->state));
+ /* the cts serves as an implicit ack of our rts, free that send buf */
+ qlist_add_tail(&sq->bh->list, &sq->c->eager_send_buf_free);
+
/* message; cts content; list of buffers, lengths, and keys */
want = sizeof(*mh) + sizeof(*mh_cts)
+ mh_cts->buflist_num * MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE;
- assert(byte_len == want,
- "%s: wrong message size for CTS, got %u, want %u", __func__,
- byte_len, want);
+ if (unlikely(byte_len != want))
+ error("%s: wrong message size for CTS, got %u, want %u", __func__,
+ byte_len, want);
/* save the bh which received the CTS for later acking */
sq->bh = bh;
@@ -359,7 +367,7 @@ static ib_recv_t *
alloc_new_recv(ib_connection_t *c, buf_head_t *bh)
{
ib_recv_t *rq = Malloc(sizeof(*rq));
- rq->type = TYPE_RECV;
+ rq->type = BMI_RECV;
rq->c = c;
rq->bh = bh;
rq->mop = 0; /* until user posts for it */
@@ -379,17 +387,16 @@ encourage_recv_incoming(ib_connection_t
msg_header_t *mh = bh->buf;
ib_recv_t *rq;
- debug(2, "%s: incoming msg type %s", __func__, msg_type_name(mh->type));
+ debug(4, "%s: incoming msg type %s", __func__, msg_type_name(mh->type));
if (mh->type == MSG_EAGER_SEND) {
rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh->bmi_tag);
if (rq) {
- int len = byte_len - sizeof(*mh);
+ bmi_size_t len = byte_len - sizeof(*mh);
if (len > rq->buflist.tot_len)
- error("%s: EAGER received %d too small for buffer "
- FORMAT_BMI_SIZE_T,
- __func__, len, rq->buflist.tot_len);
+ error("%s: EAGER received %Ld too small for buffer %Ld",
+ __func__, Ld(len), Ld(rq->buflist.tot_len));
memcpy_to_buflist(&rq->buflist,
(char *) bh->buf + sizeof(*mh), len);
@@ -401,6 +408,15 @@ encourage_recv_incoming(ib_connection_t
rq->state = RQ_EAGER_WAITING_USER_TEST;
debug(2, "%s: matched rq %p now %s", __func__, rq,
rq_state_name(rq->state));
+#if MEMCACHE_EARLY_REG
+ /* if a big receive was posted but only a small message came
+ * through, unregister it now */
+ if (rq->buflist.tot_len > EAGER_BUF_PAYLOAD) {
+ debug(2, "%s: early registration not needed, dereg after eager",
+ __func__);
+ memcache_deregister(&rq->buflist);
+ }
+#endif
} else {
rq = alloc_new_recv(c, bh);
@@ -434,9 +450,8 @@ encourage_recv_incoming(ib_connection_t
rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh->bmi_tag);
if (rq) {
if ((int)mh_rts->tot_len > rq->buflist.tot_len) {
- error("%s: RTS received " FORMAT_U_INT64_T
- " too small for buffer " FORMAT_U_INT64_T,
- __func__, mh_rts->tot_len, rq->buflist.tot_len);
+ error("%s: RTS received %Lu too small for buffer %Lu",
+ __func__, Lu(mh_rts->tot_len), Lu(rq->buflist.tot_len));
}
rq->state = RQ_RTS_WAITING_CTS_BUFFER;
debug(2, "%s: matched rq %p MSG_RTS now %s", __func__, rq,
@@ -452,14 +467,23 @@ encourage_recv_incoming(ib_connection_t
rq->actual_len = mh_rts->tot_len;
rq->rts_mop_id = mh_rts->mop_id;
- /* ack his rts for simplicity */
- debug(2, "%s: rq %p ack RTS from %s opid 0x%Lx", __func__,
- rq, c->peername, rq->rts_mop_id);
+#if 0
+ /* do NOT ack his rts, send CTS later implicitly acks */
+ debug(1, "%s: rq %p %s ack RTS from %s opid 0x%Lx", __func__,
+ rq, rq_state_name(rq->state), c->peername, rq->rts_mop_id);
post_rr(c, bh);
post_sr_ack(c, bh);
+#else
+ post_rr(c, bh);
+#endif
- if (rq->state == RQ_RTS_WAITING_CTS_BUFFER)
- encourage_recv_to_send_cts(rq);
+ if (rq->state == RQ_RTS_WAITING_CTS_BUFFER) {
+ int ret;
+ ret = send_cts(rq);
+ if (ret == 0)
+ rq->state = RQ_RTS_WAITING_DATA;
+ /* else keep waiting until we can send that cts */
+ }
} else if (mh->type == MSG_BYE) {
/*
@@ -473,31 +497,6 @@ encourage_recv_incoming(ib_connection_t
}
/*
- * Called internally when we notice that we should send a CTS for a
- * message, either just after receiving an RTS, or when the user gets
- * around to posting a receive that matchies an earlier received RTS.
- *
- * Also at test time if this state is found, since sending a CTS requires
- * a local buffer, and we might run out of that.
- */
-static void
-encourage_recv_to_send_cts(ib_recv_t *rq)
-{
- int ret;
-
- debug(2, "%s: rq %p %s", __func__, rq, rq_state_name(rq->state));
- assert(rq->state == RQ_RTS_WAITING_CTS_BUFFER,
- "%s: wrong state %s", __func__, rq_state_name(rq->state));
-
- ret = send_cts(rq);
- if (ret == 0)
- rq->state = RQ_RTS_WAITING_DATA;
- /* else keep waiting until we can send that cts */
-
- debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state));
-}
-
-/*
* Data has arrived, we know because we got the ack to the CTS
* we sent out. Serves to release remote cts buffer too.
*/
@@ -510,7 +509,11 @@ encourage_recv_incoming_cts_ack(ib_recv_
/* XXX: should be head for cache, but use tail for debugging */
qlist_add_tail(&rq->bh->list, &rq->c->eager_send_buf_free);
- ib_mem_deregister(&rq->buflist);
+#if MEMCACHE_BOUNCEBUF
+ memcpy_to_buflist(&rq->buflist, reg_recv_buflist_buf, rq->buflist.tot_len);
+#else
+ memcache_deregister(&rq->buflist);
+#endif
rq->state = RQ_RTS_WAITING_USER_TEST;
debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state));
@@ -533,7 +536,7 @@ send_cts(ib_recv_t *rq)
u_int32_t post_len;
int i;
- debug(2, "%s: rq %p, offering to recv %s opid 0x%Lx len %Ld",
+ debug(2, "%s: rq %p from %s opid 0x%Lx len %Ld",
__func__, rq, rq->c->peername, rq->rts_mop_id, Ld(rq->buflist.tot_len));
bh = qlist_try_del_head(&rq->c->eager_send_buf_free);
@@ -544,10 +547,26 @@ send_cts(ib_recv_t *rq)
rq->bh = bh;
bh->sq = (ib_send_t *) rq; /* uplink for completion */
- ib_mem_register(&rq->buflist, TYPE_RECV);
-
- /* expect an ack for this cts */
- post_rr_ack(rq->c, bh);
+#if MEMCACHE_BOUNCEBUF
+ if (reg_recv_buflist.num == 0) {
+ reg_recv_buflist.num = 1;
+ reg_recv_buflist.buf.recv = &reg_recv_buflist_buf;
+ reg_recv_buflist.len = &reg_recv_buflist_len;
+ reg_recv_buflist.tot_len = reg_recv_buflist_len;
+ reg_recv_buflist_buf = Malloc(reg_recv_buflist_len);
+ memcache_register(&reg_recv_buflist, BMI_RECV);
+ }
+ if (rq->buflist.tot_len > reg_recv_buflist_len)
+ error("%s: recv prereg buflist too small, need %Ld", __func__,
+ Ld(rq->buflist.tot_len));
+
+ ib_buflist_t save_buflist = rq->buflist;
+ rq->buflist = reg_recv_buflist;
+#else
+# if !MEMCACHE_EARLY_REG
+ memcache_register(&rq->buflist, BMI_RECV);
+# endif
+#endif
mh = bh->buf;
mh->type = MSG_CTS;
@@ -567,10 +586,18 @@ send_cts(ib_recv_t *rq)
for (i=0; i<rq->buflist.num; i++) {
bufp[i] = int64_from_ptr(rq->buflist.buf.recv[i]);
lenp[i] = rq->buflist.len[i];
- keyp[i] = rq->buflist.rkey[i];
+ keyp[i] = rq->buflist.memcache[i]->memkeys.rkey;
}
+ /* expect an ack for this cts; will come after he does the big RDMA write */
+ post_rr_ack(rq->c, bh);
+ /* send the cts */
post_sr(bh, post_len);
+
+#if MEMCACHE_BOUNCEBUF
+ rq->buflist = save_buflist;
+#endif
+
return 0;
}
@@ -587,11 +614,10 @@ post_sr(const buf_head_t *bh, u_int32_t
VAPI_sg_lst_entry_t sg;
VAPI_sr_desc_t sr;
int ret;
- const ib_connection_t *c = bh->c;
- static int num_sr = 0;
+ ib_connection_t *c = bh->c;
debug(2, "%s: %s bh %d len %u wr %d/%d", __func__, c->peername, bh->num,
- len, num_sr, max_outstanding_wr);
+ len, c->num_unsignaled_wr, max_outstanding_wr);
sg.addr = int64_from_ptr(bh->buf);
sg.len = len;
sg.lkey = c->eager_send_lkey;
@@ -599,8 +625,8 @@ post_sr(const buf_head_t *bh, u_int32_t
memset(&sr, 0, sizeof(sr));
sr.opcode = VAPI_SEND;
sr.id = int64_from_ptr(c); /* for error checking if send fails */
- if (++num_sr + 100 == max_outstanding_wr) {
- num_sr = 0;
+ if (++c->num_unsignaled_wr + 100 == max_outstanding_wr) {
+ c->num_unsignaled_wr = 0;
sr.comp_type = VAPI_SIGNALED;
} else
sr.comp_type = VAPI_UNSIGNALED; /* == 1 */
@@ -647,19 +673,18 @@ post_rr(const ib_connection_t *c, buf_he
* at QP build time.
*/
static void
-post_sr_ack(const ib_connection_t *c, const buf_head_t *bh)
+post_sr_ack(ib_connection_t *c, const buf_head_t *bh)
{
VAPI_sr_desc_t sr;
int ret;
- static int num_sr_ack = 0;
debug(2, "%s: %s bh %d wr %d/%d", __func__, c->peername, bh->num,
- num_sr_ack, max_outstanding_wr);
+ c->num_unsignaled_wr_ack, max_outstanding_wr);
memset(&sr, 0, sizeof(sr));
sr.opcode = VAPI_SEND_WITH_IMM;
sr.id = int64_from_ptr(c); /* for error checking if send fails */
- if (++num_sr_ack + 100 == max_outstanding_wr) {
- num_sr_ack = 0;
+ if (++c->num_unsignaled_wr_ack + 100 == max_outstanding_wr) {
+ c->num_unsignaled_wr_ack = 0;
sr.comp_type = VAPI_SIGNALED;
} else
sr.comp_type = VAPI_UNSIGNALED; /* == 1 */
@@ -719,7 +744,28 @@ post_sr_rdmaw(ib_send_t *sq, msg_header_
debug(2, "%s: sq %p totlen %d", __func__, sq, (int) sq->buflist.tot_len);
- ib_mem_register(&sq->buflist, TYPE_SEND);
+#if MEMCACHE_BOUNCEBUF
+ if (reg_send_buflist.num == 0) {
+ reg_send_buflist.num = 1;
+ reg_send_buflist.buf.recv = &reg_send_buflist_buf;
+ reg_send_buflist.len = &reg_send_buflist_len;
+ reg_send_buflist.tot_len = reg_send_buflist_len;
+ reg_send_buflist_buf = Malloc(reg_send_buflist_len);
+ memcache_register(&reg_send_buflist, BMI_SEND);
+ }
+ if (sq->buflist.tot_len > reg_send_buflist_len)
+ error("%s: send prereg buflist too small, need %Ld", __func__,
+ Ld(sq->buflist.tot_len));
+ memcpy_from_buflist(&sq->buflist, reg_send_buflist_buf);
+
+ ib_buflist_t save_buflist = sq->buflist;
+ sq->buflist = reg_send_buflist;
+
+#else
+#if !MEMCACHE_EARLY_REG
+ memcache_register(&sq->buflist, BMI_SEND);
+#endif
+#endif
/* constant things for every send */
memset(&sr, 0, sizeof(sr));
@@ -754,7 +800,8 @@ post_sr_rdmaw(ib_send_t *sq, msg_header_
int64_from_ptr(sq->buflist.buf.send[send_index])
+ send_offset;
sg_tmp_array[sr.sg_lst_len].len = this_bytes;
- sg_tmp_array[sr.sg_lst_len].lkey = sq->buflist.lkey[send_index];
+ sg_tmp_array[sr.sg_lst_len].lkey =
+ sq->buflist.memcache[send_index]->memkeys.lkey;
debug(4, "%s: chunk %d local addr %Lx len %d lkey %x",
__func__, sr.sg_lst_len,
@@ -763,7 +810,7 @@ post_sr_rdmaw(ib_send_t *sq, msg_header_
sg_tmp_array[sr.sg_lst_len].lkey);
++sr.sg_lst_len;
- if ((int)sr.sg_lst_len > sg_max_len)
+ if (sr.sg_lst_len > sg_max_len)
error("%s: send buflist len %d bigger than max %d", __func__,
sr.sg_lst_len, sg_max_len);
@@ -793,6 +840,9 @@ post_sr_rdmaw(ib_send_t *sq, msg_header_
if (ret < 0)
error_verrno(ret, "%s: VAPI_post_sr", __func__);
}
+#if MEMCACHE_BOUNCEBUF
+ sq->buflist = save_buflist;
+#endif
}
/*
@@ -828,7 +878,7 @@ generic_post_send(bmi_op_id_t *id, struc
/* alloc and build new sendq structure */
sq = Malloc(sizeof(*sq));
- sq->type = TYPE_SEND;
+ sq->type = BMI_SEND;
sq->state = SQ_WAITING_BUFFER;
/*
@@ -857,9 +907,9 @@ generic_post_send(bmi_op_id_t *id, struc
* to me, but I'll at least check it for accuracy.
*/
if (sq->buflist.tot_len != total_size)
- error("%s: user-provided tot len " FORMAT_BMI_SIZE_T
- " does not match buffer list tot len " FORMAT_BMI_SIZE_T,
- __func__, total_size, sq->buflist.tot_len);
+ error("%s: user-provided tot len %Ld"
+ " does not match buffer list tot len %Ld",
+ __func__, Ld(total_size), Ld(sq->buflist.tot_len));
/* unexpected messages must fit inside an eager message */
if (is_unexpected && sq->buflist.tot_len > EAGER_BUF_PAYLOAD) {
@@ -974,7 +1024,7 @@ generic_post_recv(bmi_op_id_t *id, struc
} else {
/* alloc and build new recvq structure */
rq = Malloc(sizeof(*rq));
- rq->type = TYPE_RECV;
+ rq->type = BMI_RECV;
rq->state = RQ_WAITING_INCOMING;
rq->bmi_tag = tag;
rq->c = c;
@@ -1002,9 +1052,9 @@ generic_post_recv(bmi_op_id_t *id, struc
* to me, but I'll at least check it for accuracy.
*/
if (rq->buflist.tot_len != tot_expected_len)
- error("%s: user-provided tot len " FORMAT_BMI_SIZE_T
- " does not match buffer list tot len " FORMAT_BMI_SIZE_T,
- __func__, tot_expected_len, rq->buflist.tot_len);
+ error("%s: user-provided tot len %Ld"
+ " does not match buffer list tot len %Ld",
+ __func__, Ld(tot_expected_len), Ld(rq->buflist.tot_len));
/* generate identifier used by caller to test for message later */
mop = Malloc(sizeof(*mop));
@@ -1022,9 +1072,8 @@ generic_post_recv(bmi_op_id_t *id, struc
debug(2, "%s: rq %p state %s finish eager directly", __func__,
rq, rq_state_name(rq->state));
if (rq->actual_len > tot_expected_len) {
- error("%s: received " FORMAT_BMI_SIZE_T
- " matches too-small buffer " FORMAT_BMI_SIZE_T,
- __func__, rq->actual_len, rq->buflist.tot_len);
+ error("%s: received %Ld matches too-small buffer %Ld",
+ __func__, Ld(rq->actual_len), Ld(rq->buflist.tot_len));
}
memcpy_to_buflist(&rq->buflist,
@@ -1038,17 +1087,31 @@ generic_post_recv(bmi_op_id_t *id, struc
/* now just wait for user to test, never do "immediate completion" */
rq->state = RQ_EAGER_WAITING_USER_TEST;
+ goto out;
} else if (rq->state == RQ_RTS_WAITING_USER_POST) {
int ret;
debug(2, "%s: rq %p %s send cts", __func__, rq,
rq_state_name(rq->state));
+ /* try to send, or wait for send buffer space */
+ rq->state = RQ_RTS_WAITING_CTS_BUFFER;
+#if MEMCACHE_EARLY_REG
+ memcache_register(&rq->buflist);
+#endif
ret = send_cts(rq);
if (ret == 0)
rq->state = RQ_RTS_WAITING_DATA;
- else
- rq->state = RQ_RTS_WAITING_CTS_BUFFER;
+ goto out;
}
+
+#if MEMCACHE_EARLY_REG
+ /* but remember that this might not be used if the other side sends
+ * less than we posted for receive; that's legal */
+ if (rq->buflist.tot_len > EAGER_BUF_PAYLOAD)
+ memcache_register(&rq->buflist);
+#endif
+
+ out:
gen_mutex_unlock(&interface_mutex);
}
@@ -1168,9 +1231,14 @@ test_rq(ib_recv_t *rq, bmi_op_id_t *outi
/* this state needs help, push it (ideally would be triggered
* when the resource is freed... XXX */
} else if (rq->state == RQ_RTS_WAITING_CTS_BUFFER) {
+ int ret;
debug(2, "%s: rq %p %s, encouraging", __func__, rq,
rq_state_name(rq->state));
- encourage_recv_to_send_cts(rq);
+ ret = send_cts(rq);
+ if (ret == 0)
+ rq->state = RQ_RTS_WAITING_DATA;
+ /* else keep waiting until we can send that cts */
+ debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state));
} else if (rq->state == RQ_CANCELLED && complete) {
debug(2, "%s: rq %p cancelled", __func__, rq);
*err = -PVFS_ETIMEDOUT;
@@ -1211,7 +1279,7 @@ BMI_ib_test(bmi_op_id_t id, int *outcoun
mop = id_gen_safe_lookup(id);
sq = mop->method_data;
n = 0;
- if (sq->type == TYPE_SEND) {
+ if (sq->type == BMI_SEND) {
if (test_sq(sq, &id, err, size, user_ptr, 1))
n = 1;
} else {
@@ -1277,21 +1345,23 @@ BMI_ib_testcontext(int incount, bmi_op_i
gettimeofday(&last_action, 0);
} else if (max_idle_time > 0) {
/*
- * Block for up to max_idle_time to avoid spinning from BMI. Instead
- * of sleeping, watch the accept socket for something new. No way
- * to blockingly poll in standard VAPI.
+ * Block for up to max_idle_time to avoid spinning from BMI. In the
+ * server, instead of sleeping, watch the accept socket for something
+ * new. No way to blockingly poll in standard VAPI.
*/
- struct timeval now;
- gettimeofday(&now, 0);
- now.tv_sec -= last_action.tv_sec;
- if (now.tv_sec == 1) {
- now.tv_usec -= last_action.tv_usec;
- if (now.tv_usec < 0)
- --now.tv_sec;
+ if (listen_sock >= 0) {
+ struct timeval now;
+ gettimeofday(&now, 0);
+ now.tv_sec -= last_action.tv_sec;
+ if (now.tv_sec == 1) {
+ now.tv_usec -= last_action.tv_usec;
+ if (now.tv_usec < 0)
+ --now.tv_sec;
+ }
+ if (now.tv_sec > 0) /* spin for 1 sec following any activity */
+ if (ib_tcp_server_block_new_connections(max_idle_time))
+ gettimeofday(&last_action, 0);
}
- if (now.tv_sec > 0) /* spin for 1 sec following any activity */
- if (ib_tcp_server_block_new_connections(max_idle_time))
- gettimeofday(&last_action, 0);
}
gen_mutex_unlock(&interface_mutex);
return 0;
@@ -1322,10 +1392,10 @@ BMI_ib_testunexpected(int incount __unus
debug(2, "%s: found waiting testunexpected", __func__);
ui->error_code = 0;
ui->addr = rq->c->remote_map; /* hand back permanent method_addr */
- ui->buffer = Malloc(rq->actual_len);
+ ui->buffer = Malloc((unsigned long) rq->actual_len);
ui->size = rq->actual_len;
memcpy(ui->buffer, (char *) rq->bh->buf + sizeof(msg_header_t),
- ui->size);
+ (size_t) ui->size);
ui->tag = rq->bmi_tag;
/* re-post the buffer in which it was sitting, just unexpecteds */
post_rr(rq->c, rq->bh);
@@ -1348,25 +1418,6 @@ BMI_ib_testunexpected(int incount __unus
}
/*
- * Do not care about memory allocation. Send/recv functions will pin as
- * necessary.
- */
-static void *
-BMI_ib_memalloc(bmi_size_t size,
- enum bmi_op_type send_recv __unused)
-{
- return malloc((size_t) size);
-}
-
-static int
-BMI_ib_memfree(void *buf, bmi_size_t size __unused,
- enum bmi_op_type send_recv __unused)
-{
- free(buf);
- return 0;
-}
-
-/*
* No need to track these internally. Just search the entire queue.
*/
static int
@@ -1396,7 +1447,7 @@ BMI_ib_cancel(bmi_op_id_t id, bmi_contex
check_cq();
mop = id_gen_safe_lookup(id);
tsq = mop->method_data;
- if (tsq->type == TYPE_SEND) {
+ if (tsq->type == BMI_SEND) {
/*
* Cancelling completed operations is fine, they will be
* tested later. Any others trigger full shutdown of the
@@ -1426,16 +1477,33 @@ BMI_ib_cancel(bmi_op_id_t id, bmi_contex
qlist_for_each(l, &sendq) {
ib_send_t *sq = qlist_upcast(l);
if (sq->c != c) continue;
+#if !MEMCACHE_BOUNCEBUF
if (sq->state == SQ_WAITING_DATA_LOCAL_SEND_COMPLETE)
- ib_mem_deregister(&sq->buflist);
+ memcache_deregister(&sq->buflist);
+# if MEMCACHE_EARLY_REG
+ /* pin when sending rts, so also must dereg in this state */
+ if (sq->state == SQ_WAITING_CTS)
+ memcache_deregister(&sq->buflist);
+# endif
+#endif
if (sq->state != SQ_WAITING_USER_TEST)
sq->state = SQ_CANCELLED;
}
qlist_for_each(l, &recvq) {
ib_recv_t *rq = qlist_upcast(l);
if (rq->c != c) continue;
+#if !MEMCACHE_BOUNCEBUF
if (rq->state == RQ_RTS_WAITING_DATA)
- ib_mem_deregister(&rq->buflist);
+ memcache_deregister(&rq->buflist);
+# if MEMCACHE_EARLY_REG
+ /* pin on post, dereg all these */
+ if (rq->state == RQ_RTS_WAITING_CTS_BUFFER)
+ memcache_deregister(&rq->buflist);
+ if (rq->state == RQ_WAITING_INCOMING
+ && rq->buflist.tot_len > EAGER_BUF_PAYLOAD)
+ memcache_deregister(&rq->buflist);
+# endif
+#endif
if (!(rq->state == RQ_EAGER_WAITING_USER_TEST
|| rq->state == RQ_RTS_WAITING_USER_TEST))
rq->state = RQ_CANCELLED;
@@ -1468,12 +1536,41 @@ maybe_free_connection(ib_connection_t *c
ib_close_connection(c);
}
+#if MEMCACHE_BOUNCEBUF
+static int
+del_bouncebuf_then_BMI_ib_finalize(void)
+{
+ if (reg_send_buflist.num > 0) {
+ memcache_deregister(&reg_send_buflist);
+ reg_send_buflist.num = 0;
+ free(reg_send_buflist_buf);
+ }
+ if (reg_recv_buflist.num > 0) {
+ memcache_deregister(&reg_recv_buflist);
+ reg_recv_buflist.num = 0;
+ free(reg_recv_buflist_buf);
+ }
+ return BMI_ib_finalize();
+}
+#endif
+
+static const char *
+BMI_ib_rev_lookup(struct method_addr *meth)
+{
+ ib_method_addr_t *ibmap = meth->method_data;
+ return ibmap->c->peername;
+}
+
/* exported method interface */
struct bmi_method_ops bmi_ib_ops =
{
.method_name = "bmi_ib",
.BMI_meth_initialize = BMI_ib_initialize,
+#if MEMCACHE_BOUNCEBUF
+ .BMI_meth_finalize = del_bouncebuf_then_BMI_ib_finalize,
+#else
.BMI_meth_finalize = BMI_ib_finalize,
+#endif
.BMI_meth_set_info = BMI_ib_set_info,
.BMI_meth_get_info = BMI_ib_get_info,
.BMI_meth_memalloc = BMI_ib_memalloc,
@@ -1492,6 +1589,6 @@ struct bmi_method_ops bmi_ib_ops =
.BMI_meth_open_context = BMI_ib_open_context,
.BMI_meth_close_context = BMI_ib_close_context,
.BMI_meth_cancel = BMI_ib_cancel,
- .BMI_meth_rev_lookup_unexpected = 0, /* unimplemented */
+ .BMI_meth_rev_lookup_unexpected = BMI_ib_rev_lookup,
};
Index: ib.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/ib.h,v
diff -u -p -u -r1.8 -r1.9
--- ib.h 19 Nov 2004 18:06:07 -0000 1.8
+++ ib.h 3 Nov 2005 21:23:19 -0000 1.9
@@ -1,15 +1,19 @@
/*
* Private header shared by BMI InfiniBand implementation files.
*
- * Copyright (C) 2003-4 Pete Wyckoff <pw at osc.edu>
+ * Copyright (C) 2003-5 Pete Wyckoff <pw at osc.edu>
*
* See COPYING in top-level directory.
*
- * $Id: ib.h,v 1.8 2004/11/19 18:06:07 pw Exp $
+ * $Id: ib.h,v 1.9 2005/11/03 21:23:19 pw Exp $
*/
#ifndef __ib_h
#define __ib_h
+#include <src/io/bmi/bmi-types.h>
+#include <src/common/quicklist/quicklist.h>
+#include <vapi.h>
+
#ifdef __GNUC__
/* # define __hidden __attribute__((visibility("hidden"))) */
# define __hidden /* confuses debugger */
@@ -42,6 +46,8 @@ typedef struct {
VAPI_mr_hndl_t eager_recv_mr;
VAPI_lkey_t eager_send_lkey; /* for post_sr */
VAPI_lkey_t eager_recv_lkey; /* for post_rr */
+ unsigned int num_unsignaled_wr; /* keep track of outstanding WRs */
+ unsigned int num_unsignaled_wr_ack;
/* ib remote params */
IB_lid_t remote_lid;
VAPI_qp_num_t remote_qp_num;
@@ -87,7 +93,6 @@ typedef struct {
typedef enum {
SQ_WAITING_BUFFER=1,
SQ_WAITING_EAGER_ACK,
- SQ_WAITING_RTS_ACK,
SQ_WAITING_CTS,
SQ_WAITING_DATA_LOCAL_SEND_COMPLETE,
SQ_WAITING_USER_TEST,
@@ -121,10 +126,10 @@ typedef struct {
static name_t sq_state_names[] = {
entry(SQ_WAITING_BUFFER),
entry(SQ_WAITING_EAGER_ACK),
- entry(SQ_WAITING_RTS_ACK),
entry(SQ_WAITING_CTS),
entry(SQ_WAITING_DATA_LOCAL_SEND_COMPLETE),
entry(SQ_WAITING_USER_TEST),
+ entry(SQ_CANCELLED),
{ 0, 0 }
};
static name_t rq_state_names[] = {
@@ -136,6 +141,7 @@ static name_t rq_state_names[] = {
entry(RQ_RTS_WAITING_DATA),
entry(RQ_RTS_WAITING_USER_TEST),
entry(RQ_WAITING_INCOMING),
+ entry(RQ_CANCELLED),
{ 0, 0 }
};
static name_t msg_type_names[] = {
@@ -150,9 +156,36 @@ static name_t msg_type_names[] = {
#endif /* __ib_c */
/*
- * This could be generically useful. Instead of passing around three
- * values, use this struct. Must use a union to handle the cast differences
- * between send and receive usage, though.
+ * Fields for memory registration. Both lkey and rkey are always valid,
+ * even if only being used for a BMI_SEND. Permissions managed at app level
+ * not at mem reg level.
+ */
+typedef struct {
+ VAPI_mr_hndl_t mrh;
+ VAPI_lkey_t lkey;
+ VAPI_rkey_t rkey;
+} memkeys_t;
+
+/*
+ * Pin and cache explicitly allocated things to avoid registration
+ * overheads. Two sources of entries here: first, when BMI_memalloc
+ * is used to allocate big enough chunks, the malloced regions are
+ * entered into this list. Second, when a bmi/ib routine needs to pin
+ * memory, it is cached here too. Note that the second case really
+ * needs a dreg-style consistency check against userspace freeing, though.
+ */
+typedef struct {
+ list_t list;
+ void *buf;
+ bmi_size_t len;
+ int count; /* refcount, usage of this entry */
+ memkeys_t memkeys;
+} memcache_entry_t;
+
+/*
+ * This struct describes multiple memory ranges, used in the list send and
+ * recv routines. The memkeys array here will be filled from the individual
+ * memcache_entry memkeys above.
*/
typedef struct {
int num;
@@ -161,11 +194,8 @@ typedef struct {
void *const *recv;
} buf;
const bmi_size_t *len; /* this type chosen to match BMI API */
- bmi_size_t tot_len; /* sum_{i=1..num} len[i] */
- /* fields for memory registration */
- VAPI_mr_hndl_t *mr_handle;
- VAPI_lkey_t *lkey;
- VAPI_rkey_t *rkey;
+ bmi_size_t tot_len; /* sum_{i=0..num-1} len[i] */
+ memcache_entry_t **memcache; /* storage managed by memcache_register etc. */
} ib_buflist_t;
/*
@@ -173,11 +203,9 @@ typedef struct {
* ensures reliability, so the message is marked complete immediately
* and removed from the queue.
*/
-#define TYPE_SEND 0
-#define TYPE_RECV 1
typedef struct S_ib_send {
list_t list;
- int type; /* TYPE_SEND */
+ int type; /* BMI_SEND */
/* pointer back to owning method_op (BMI interface) */
struct method_op *mop;
sq_state_t state;
@@ -202,7 +230,7 @@ typedef struct S_ib_send {
*/
typedef struct {
list_t list;
- int type; /* TYPE_RECV */
+ int type; /* BMI_RECV */
/* pointer back to owning method_op (BMI interface) */
struct method_op *mop;
rq_state_t state;
@@ -259,7 +287,7 @@ typedef struct {
#define MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE (8 + 4 + 4)
/*
- * Internal functions in setup.c used by ib.c
+ * Internal functions in setup.c used by ib.c.
*/
extern void ib_close_connection(ib_connection_t *c);
extern void close_connection_drain_qp(VAPI_qp_hndl_t qp);
@@ -267,11 +295,11 @@ extern void ib_tcp_client_connect(ib_met
struct method_addr *remote_map);
extern int ib_tcp_server_check_new_connections(void);
extern int ib_tcp_server_block_new_connections(int timeout_ms);
-extern void ib_mem_register(ib_buflist_t *buflist, int send_or_recv_type);
-extern void ib_mem_deregister(ib_buflist_t *buflist);
+extern void ib_mem_register(memcache_entry_t *c);
+extern void ib_mem_deregister(memcache_entry_t *c);
/*
- * Method functions in setup.c
+ * Method functions in setup.c.
*/
extern int BMI_ib_initialize(struct method_addr *listen_addr, int method_id,
int init_flags);
@@ -283,12 +311,12 @@ extern int BMI_ib_get_info(int option, v
extern int BMI_ib_set_info(int option, void *param);
/*
- * Internal functions in ib.c used by setup.c
+ * Internal functions in ib.c used by setup.c.
*/
extern void post_rr(const ib_connection_t *c, buf_head_t *bh);
/*
- * Internal functions in util.c
+ * Internal functions in util.c.
*/
void error(const char *fmt, ...) __attribute__((noreturn,format(printf,1,2)));
void error_errno(const char *fmt, ...)
@@ -298,16 +326,17 @@ void error_xerrno(int errnum, const char
void error_verrno(int ecode, const char *fmt, ...)
__attribute__((noreturn,format(printf,2,3)));
void warning(const char *fmt, ...) __attribute__((format(printf,1,2)));
+void warning_errno(const char *fmt, ...) __attribute__((format(printf,1,2)));
void info(const char *fmt, ...) __attribute__((format(printf,1,2)));
-extern void *Malloc(unsigned int n) __attribute__((malloc));
+extern void *Malloc(unsigned long n) __attribute__((malloc));
extern u_int64_t swab64(u_int64_t x);
extern void *qlist_del_head(struct qlist_head *list);
extern void *qlist_try_del_head(struct qlist_head *list);
/* convenient to assign this to the owning type */
#define qlist_upcast(l) ((void *)(l))
-extern const char *sq_state_name(int num);
-extern const char *rq_state_name(int num);
-extern const char *msg_type_name(int num);
+extern const char *sq_state_name(sq_state_t num);
+extern const char *rq_state_name(rq_state_t num);
+extern const char *msg_type_name(msg_type_t num);
extern void memcpy_to_buflist(ib_buflist_t *buflist, const void *buf,
bmi_size_t len);
extern void memcpy_from_buflist(ib_buflist_t *buflist, void *buf);
@@ -315,6 +344,16 @@ extern int read_full(int fd, void *buf,
extern int write_full(int fd, const void *buf, size_t count);
/*
+ * Memory allocation and caching internal functions, in mem.c.
+ */
+extern void *BMI_ib_memalloc(bmi_size_t size, enum bmi_op_type send_recv);
+extern int BMI_ib_memfree(void *buf, bmi_size_t size,
+ enum bmi_op_type send_recv);
+extern void memcache_register(ib_buflist_t *buflist);
+extern void memcache_deregister(ib_buflist_t *buflist);
+extern void memcache_shutdown(void);
+
+/*
* Shared variables, space allocated in ib.c.
*/
extern VAPI_hca_hndl_t nic_handle; /* NIC reference */
@@ -323,12 +362,19 @@ extern int listen_sock; /* TCP sock on
extern list_t connection; /* list of current connections */
extern list_t sendq;
extern list_t recvq;
+extern list_t memcache;
/*
* Temp array for filling scatter/gather lists to pass to IB functions,
* allocated once at start to max size defined as reported by the qp.
*/
extern VAPI_sg_lst_entry_t *sg_tmp_array;
-extern int sg_max_len;
+extern unsigned int sg_max_len;
+/*
+ * Maximum number of outstanding work requests in the NIC, same for both
+ * SQ and RQ, though we only care about SQ on the QP and ack QP. Used to
+ * decide when to use a SIGNALED completion on a send to avoid WQE buildup.
+ */
+extern unsigned int max_outstanding_wr;
/*
* Both eager and bounce buffers are the same size, and same number, since
* there is a symmetry of how many can be in use at the same time.
@@ -337,18 +383,9 @@ extern int sg_max_len;
* sits in the buffer where it was received until the user posts or tests
* for it.
*/
-static const bmi_size_t EAGER_BUF_NUM = 20;
-static const bmi_size_t EAGER_BUF_SIZE = 8 << 10; /* 8 kB */
+static const int EAGER_BUF_NUM = 20;
+static const unsigned long EAGER_BUF_SIZE = 8 << 10; /* 8 kB */
extern bmi_size_t EAGER_BUF_PAYLOAD;
-/*
- * Maximum number of outstanding work requests in the NIC, same for both
- * SQ and RQ, though we only care about SQ on the ack QP. Used to decide
- * when to use a SIGNALED completion on a send to avoid WQE buildup.
- */
-extern int max_outstanding_wr;
-
-#define htonq(x) swab64(x)
-#define ntohq(x) swab64(x)
/*
* Handle pointer to 64-bit integer conversions. On 32-bit architectures
@@ -359,39 +396,36 @@ extern int max_outstanding_wr;
#define int64_from_ptr(p) (u_int64_t)(unsigned long)(p)
/*
+ * Tell the compiler we really do not expect this to happen.
+ */
+#if defined(__GNUC_MINOR__) && (__GNUC_MINOR__ < 96)
+# define __builtin_expect(x, v) (x)
+#endif
+#define likely(x) __builtin_expect(!!(x), 1)
+#define unlikely(x) __builtin_expect(!!(x), 0)
+
+/*
* Debugging macros.
*/
-#if 1
-#define DEBUG_LEVEL 2
+#if 0
+#define DEBUG_LEVEL 4
#define debug(lvl,fmt,args...) \
do { \
if (lvl <= DEBUG_LEVEL) \
info(fmt,##args); \
} while (0)
+#else
+# define debug(lvl,fmt,...) do { } while (0)
+#endif
+
+#if 0
#define assert(cond,fmt,args...) \
do { \
- if (__builtin_expect(!(cond),0)) \
+ if (unlikely(!(cond))) \
error(fmt,##args); \
} while (0)
-#else /* no debug version */
-# define debug(lvl,cond,fmt,...)
-# define assert(cond,fmt,...)
-#endif
-
-/*
- * Formats for printing. Should be somewhere more generic, and also
- * architecture-specific.
- */
-#define FORMAT_BMI_SIZE_T "%Ld"
-#define FORMAT_U_INT64_T "%Ld"
-
-/*
- * Tell the compiler we really do not expect this to happen.
- */
-#if defined(__GNUC_MINOR__) && (__GNUC_MINOR__ < 96)
-# define __builtin_expect(x, v) (x)
+#else
+# define assert(cond,fmt,...) do { } while (0)
#endif
-#define likely(x) __builtin_expect(!!(x), 1)
-#define unlikely(x) __builtin_expect(!!(x), 0)
#endif /* __ib_h */
Index: module.mk.in
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/module.mk.in,v
diff -u -p -u -r1.5 -r1.6
--- module.mk.in 7 Mar 2004 02:14:57 -0000 1.5
+++ module.mk.in 3 Nov 2005 21:23:19 -0000 1.6
@@ -1,82 +1,55 @@
#
# Makefile stub for bmi_ib.
#
-# Copyright (C) 2003-4 Pete Wyckoff <pw at osc.edu>
+# Copyright (C) 2003-5 Pete Wyckoff <pw at osc.edu>
#
# See COPYING in top-level directory.
#
-# $Id: module.mk.in,v 1.5 2004/03/07 02:14:57 pw Exp $
+# $Id: module.mk.in,v 1.6 2005/11/03 21:23:19 pw Exp $
#
# only do any of this if configure decided to use IB
ifdef BUILD_IB
-# local definitions
-bmi_ib_subdir := src/io/bmi/bmi_ib
-bmi_ib_src := $(patsubst %,$(bmi_ib_subdir)/%,ib.c setup.c util.c)
-bmi_ib_obj := $(bmi_ib_src:.c=.o)
-bmi_ib_picobj := $(bmi_ib_src:.c=.po)
-bmi_ib_server_obj := $(bmi_ib_src:.c=-server.o)
-bmi_ib_dep := $(bmi_ib_src:.c=.d)
-
-# export these to the top Makefile to tell it what to build
-LIBSRC += $(bmi_ib_src)
-SERVERSRC += $(bmi_ib_src)
-
-# minimal cflags, removing the million include paths auto-generated from MODULES
-bmi_ib_bad_mod_inc := $(patsubst %, -I $(srcdir)/%, $(MODULES))
-bmi_ib_cflags := $(patsubst $(bmi_ib_bad_mod_inc),,$(CFLAGS))
-# Now add back include directories required by included files which include
-# other files without explicit paths.
-bmi_ib_cflags += -I$(srcdir)/src/common/quicklist
-# And finally the IB_HOME defined from configure and some flags.
-bmi_ib_cflags += -I@IB_HOME@/include
-bmi_ib_cflags += -fno-common
-bmi_ib_cflags += -Werror -Wall -W -Wpointer-arith
-bmi_ib_cflags += -Wcast-align -Wcast-qual -Wbad-function-cast
-bmi_ib_cflags += -Wmissing-prototypes -Wmissing-declarations
-bmi_ib_cflags += -Wnested-externs
-bmi_ib_cflags += -Wshadow -Wstrict-prototypes -Wredundant-decls
-# Do not use these, because mellanox writes bad code:
-# -Wundef -Wwrite-strings
-# This one gives too many false positives:
-# -Wconversion
+#
+# Local definitions.
+#
+DIR := src/io/bmi/bmi_ib
+cfiles := ib.c setup.c util.c mem.c
+src := $(patsubst %,$(DIR)/%,$(cfiles))
+cflags :=
+
+#
+# Export these to the top Makefile to tell it what to build.
+#
+LIBSRC += $(src)
+SERVERSRC += $(src)
#
# Add extra include paths and warnings just for these IB files.
# No need to expose the rest of the tree to the mess of IB include
# dirs and warnings used here.
#
-$(bmi_ib_obj): %.o: %.c
-ifdef QUIET_COMPILE
- @echo " IBCC $@"
- @$(CC) $(IBCFLAGS) $(LIBCFLAGS) $(bmi_ib_cflags) $< -c -o $@
-else
- $(CC) $(IBCFLAGS) $(LIBCFLAGS) $(bmi_ib_cflags) $< -c -o $@
-endif
-
-$(bmi_ib_picobj): %.po: %.c
-ifdef QUIET_COMPILE
- @echo " IBCCPIC $@"
- @$(CC) $(IBCFLAGS) $(LIBCFLAGS) $(PICFLAGS) $(bmi_ib_cflags) $< -c -o $@
-else
- $(CC) $(IBCFLAGS) $(LIBCFLAGS) $(PICFLAGS) $(bmi_ib_cflags) $< -c -o $@
-endif
-
-$(bmi_ib_server_obj): %-server.o: %.c
-ifdef QUIET_COMPILE
- @echo " IBCC $@"
- @$(CC) $(IBCFLAGS) $(SERVERCFLAGS) $(bmi_ib_cflags) $< -c -o $@
-else
- $(CC) $(IBCFLAGS) $(SERVERCFLAGS) $(bmi_ib_cflags) $< -c -o $@
-endif
-
-$(bmi_ib_dep): %.d: %.c
-ifdef QUIET_COMPILE
- @echo " IBDEP $@"
- @$(srcdir)/maint/depend.sh $(bmi_ib_subdir) $(bmi_ib_cflags) $< > $@
-else
- $(srcdir)/maint/depend.sh $(bmi_ib_subdir) $(bmi_ib_cflags) $< > $@
-endif
+cflags += -I@IB_HOME@/include
+cflags += -fno-common
+cflags += -Werror -Wall -W -Wpointer-arith
+cflags += -Wcast-align -Wcast-qual -Wbad-function-cast
+cflags += -Wmissing-prototypes -Wmissing-declarations
+cflags += -Wnested-externs
+cflags += -Wshadow -Wstrict-prototypes -Wredundant-decls
+cflags += -Wundef -Wwrite-strings
+# these are not otherwise defined and cause warnings
+cflags += -DMAX_TRACE=0 -DMAX_ERROR=0 -DMAX_DEBUG=0
+
+#
+# One prototype in the mellanox includes specifies passing a u8 but that's
+# not possible so this always throws a warning:
+# -Wconversion
+#
+
+#
+# Tell the toplevel make about the extra cflags.
+#
+MODCFLAGS_$(DIR) := $(cflags)
endif # BUILD_IB
Index: setup.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/setup.c,v
diff -u -p -u -r1.15 -r1.16
--- setup.c 19 Nov 2004 18:06:07 -0000 1.15
+++ setup.c 3 Nov 2005 21:23:19 -0000 1.16
@@ -2,27 +2,22 @@
* InfiniBand BMI method initialization and other out-of-line
* boring stuff.
*
- * Copyright (C) 2003-4 Pete Wyckoff <pw at osc.edu>
+ * Copyright (C) 2003-5 Pete Wyckoff <pw at osc.edu>
*
* See COPYING in top-level directory.
*
- * $Id: setup.c,v 1.15 2004/11/19 18:06:07 pw Exp $
+ * $Id: setup.c,v 1.16 2005/11/03 21:23:19 pw Exp $
*/
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
-#include <sys/socket.h>
+#include <malloc.h>
#include <sys/poll.h>
#include <netinet/in.h> /* ntohs et al */
#include <arpa/inet.h> /* inet_ntoa */
#include <netdb.h> /* gethostbyname */
-#include <src/common/quicklist/quicklist.h>
-#include <src/io/bmi/bmi-method-support.h>
#include <src/io/bmi/bmi-method-callback.h>
-/* ib includes */
-#include <vapi.h>
#include <vapi_common.h> /* VAPI_event_(record|syndrome)_sym */
-#include <evapi.h>
#ifdef HAVE_IB_WRAP_COMMON_H
#include <wrap_common.h> /* reinit_mosal externs */
#endif
@@ -54,9 +49,10 @@ static VAPI_pd_hndl_t nic_pd; /* single
static EVAPI_async_handler_hndl_t nic_async_event_handler;
static int async_event_handler_waiting_drain = 0;
-static void exchange_connection_data(ib_connection_t *c, int s, int is_server);
+static void verify_prop_caps(VAPI_qp_cap_t *cap);
+static int exchange_connection_data(ib_connection_t *c, int s, int is_server);
static void init_connection_modify_qp(VAPI_qp_hndl_t qp,
- VAPI_qp_num_t remote_qp_num, IB_lid_t remote_lid, int s, int is_server);
+ VAPI_qp_num_t remote_qp_num, int remote_lid);
/*
* Build new conneciton.
@@ -119,7 +115,7 @@ ib_new_connection(int s, const char *pee
error_verrno(ret, "%s: register_mr bounce", __func__);
c->eager_send_lkey = mr_out.l_key;
- /* build qp */
+ /* common qp properites */
qp_init_attr.cap.max_oust_wr_sq = 5000; /* outstanding WQEs */
qp_init_attr.cap.max_oust_wr_rq = 5000;
qp_init_attr.cap.max_sg_size_sq = 40; /* scatter/gather entries */
@@ -133,51 +129,43 @@ ib_new_connection(int s, const char *pee
qp_init_attr.sq_sig_type = VAPI_SIGNAL_REQ_WR;
qp_init_attr.rq_sig_type = VAPI_SIGNAL_REQ_WR;
qp_init_attr.ts_type = VAPI_TS_RC;
+
+ /* build main qp */
ret = VAPI_create_qp(nic_handle, &qp_init_attr, &c->qp, &prop);
if (ret < 0)
error_verrno(ret, "%s: create QP", __func__);
c->qp_num = prop.qp_num;
-
- if (sg_max_len == 0) {
- sg_max_len = prop.cap.max_sg_size_sq;
- if ((int)prop.cap.max_sg_size_rq < sg_max_len)
- sg_max_len = prop.cap.max_sg_size_rq;
- sg_tmp_array = Malloc(sg_max_len * sizeof(*sg_tmp_array));
- } else {
- if ((int)prop.cap.max_sg_size_sq < sg_max_len)
- error(
- "%s: new connection has smaller send scatter/gather array size,"
- " %d vs %d", __func__, prop.cap.max_sg_size_sq, sg_max_len);
- if ((int)prop.cap.max_sg_size_rq < sg_max_len)
- error(
- "%s: new connection has smaller recv scatter/gather array size,"
- " %d vs %d", __func__, prop.cap.max_sg_size_rq, sg_max_len);
- }
+ verify_prop_caps(&prop.cap);
/* and qp ack */
ret = VAPI_create_qp(nic_handle, &qp_init_attr, &c->qp_ack, &prop);
if (ret < 0)
error_verrno(ret, "%s: create QP ack", __func__);
c->qp_ack_num = prop.qp_num;
- if ((int)prop.cap.max_sg_size_sq < sg_max_len)
- error(
- "%s: new ack connection has smaller send scatter/gather array size,"
- " %d vs %d", __func__, prop.cap.max_sg_size_sq, sg_max_len);
- if ((int)prop.cap.max_sg_size_rq < sg_max_len)
- error(
- "%s: new ack connection has smaller recv scatter/gather array size,"
- " %d vs %d", __func__, prop.cap.max_sg_size_rq, sg_max_len);
-
- /* remember this for post_sr_ack */
- max_outstanding_wr = prop.cap.max_oust_wr_sq;
-
- exchange_connection_data(c, s, is_server);
-
- init_connection_modify_qp(c->qp, c->remote_qp_num,
- c->remote_lid, s, is_server);
- init_connection_modify_qp(c->qp_ack, c->remote_qp_ack_num,
- c->remote_lid, s, is_server);
+ verify_prop_caps(&prop.cap);
+
+ /* initialize for post_sr and post_sr_ack */
+ c->num_unsignaled_wr = 0;
+ c->num_unsignaled_wr_ack = 0;
+ /* put it on the list */
+ qlist_add(&c->list, &connection);
+
+ /* other vars */
+ c->remote_map = 0;
+ c->cancelled = 0;
+
+ /* talk with the peer to get his lid and QP nums */
+ if (exchange_connection_data(c, s, is_server) != 0) {
+ ret = 1;
+ goto out;
+ }
+
+ /* bring the two QPs up to RTR */
+ init_connection_modify_qp(c->qp, c->remote_qp_num, c->remote_lid);
+ init_connection_modify_qp(c->qp_ack, c->remote_qp_ack_num, c->remote_lid);
+
+ /* post initial RRs */
for (i=0; i<EAGER_BUF_NUM; i++)
post_rr(c, &c->eager_recv_buf_head_contig[i]);
@@ -186,30 +174,77 @@ ib_new_connection(int s, const char *pee
int x;
if (i ^ is_server) {
ret = read_full(s, &x, sizeof(x));
- if (ret < 0)
- error_errno("%s: read rr post synch", __func__);
- if (ret != sizeof(x))
- error("%s: partial read of rr post synch, %d / %d", __func__,
+ if (ret < 0) {
+ ret = 1;
+ warning_errno("%s: read rr post synch", __func__);
+ goto out;
+ }
+ if (ret != sizeof(x)) {
+ ret = 1;
+ warning("%s: partial read of rr post synch, %d / %d", __func__,
ret, sizeof(x));
+ goto out;
+ }
} else {
ret = write_full(s, &x, sizeof(x));
- if (ret < 0)
- error_errno("%s: write rr post synch", __func__);
+ if (ret < 0) {
+ ret = 1;
+ warning_errno("%s: write rr post synch", __func__);
+ goto out;
+ }
}
}
- /* done, put it on the list */
- c->remote_map = 0;
- c->cancelled = 0;
- qlist_add(&c->list, &connection);
+ ret = 0;
+
+ out:
+ if (ret != 0) {
+ /* XXX: any way to unpost the RRs first? */
+ ib_close_connection(c);
+ c = 0;
+ }
+
return c;
}
/*
+ * If not set, set them. Otherwise verify that none of our assumed global
+ * limits are different for this new connection.
+ */
+static void
+verify_prop_caps(VAPI_qp_cap_t *cap)
+{
+ if (sg_max_len == 0) {
+ sg_max_len = cap->max_sg_size_sq;
+ if (cap->max_sg_size_rq < sg_max_len)
+ sg_max_len = cap->max_sg_size_rq;
+ sg_tmp_array = Malloc(sg_max_len * sizeof(*sg_tmp_array));
+ } else {
+ if (cap->max_sg_size_sq < sg_max_len)
+ error(
+ "%s: new connection has smaller send scatter/gather array size,"
+ " %d vs %d", __func__, cap->max_sg_size_sq, sg_max_len);
+ if (cap->max_sg_size_rq < sg_max_len)
+ error(
+ "%s: new connection has smaller recv scatter/gather array size,"
+ " %d vs %d", __func__, cap->max_sg_size_rq, sg_max_len);
+ }
+
+ if (max_outstanding_wr == 0) {
+ max_outstanding_wr = cap->max_oust_wr_sq;
+ } else {
+ if (cap->max_oust_wr_sq < max_outstanding_wr)
+ error(
+ "%s: new connection has smaller max_oust_wr_sq size, %d vs %d",
+ __func__, cap->max_oust_wr_sq, max_outstanding_wr);
+ }
+}
+
+/*
* Over TCP, share information about the connection needed to transition
* the IB link to active.
*/
-static void
+static int
exchange_connection_data(ib_connection_t *c, int s, int is_server)
{
/*
@@ -237,11 +272,17 @@ exchange_connection_data(ib_connection_t
if (i ^ is_server) {
ret = read_full(s, &connection_handshake,
sizeof(connection_handshake));
- if (ret < 0)
- error_errno("%s: read", __func__);
- if (ret != sizeof(connection_handshake))
- error("%s: partial read, %d / %d", __func__, ret,
+ if (ret < 0) {
+ ret = 1;
+ warning_errno("%s: read", __func__);
+ goto out;
+ }
+ if (ret != sizeof(connection_handshake)) {
+ ret = 1;
+ warning("%s: partial read, %d / %d", __func__, ret,
sizeof(connection_handshake));
+ goto out;
+ }
c->remote_lid = ntohs(connection_handshake.lid);
c->remote_qp_num = ntohl(connection_handshake.qp_num);
c->remote_qp_ack_num = ntohl(connection_handshake.qp_ack_num);
@@ -251,10 +292,17 @@ exchange_connection_data(ib_connection_t
connection_handshake.qp_ack_num = htonl(c->qp_ack_num);
ret = write_full(s, &connection_handshake,
sizeof(connection_handshake));
- if (ret < 0)
- error_errno("%s: write", __func__);
+ if (ret < 0) {
+ ret = 1;
+ warning_errno("%s: write", __func__);
+ goto out;
+ }
}
}
+
+ ret = 0;
+ out:
+ return ret;
}
@@ -263,9 +311,9 @@ exchange_connection_data(ib_connection_t
*/
static void
init_connection_modify_qp(VAPI_qp_hndl_t qp, VAPI_qp_num_t remote_qp_num,
- IB_lid_t remote_lid, int s, int is_server)
+ int remote_lid)
{
- int i, ret;
+ int ret;
VAPI_qp_attr_t attr;
VAPI_qp_attr_mask_t mask;
VAPI_qp_cap_t cap;
@@ -308,26 +356,6 @@ init_connection_modify_qp(VAPI_qp_hndl_t
if (ret < 0)
error_verrno(ret, "%s: VAPI_modify_qp INIT -> RTR", __func__);
- /* syncronize both in RTR before going RTS */
- for (i=0; i<2; i++) {
- int x;
- if (i ^ is_server) {
- ret = read(s, &x, sizeof(x));
- if (ret < 0)
- error_errno("%s: read rtr synch", __func__);
- if (ret != sizeof(x))
- error("%s: partial read of rtr synch, %d / %d", __func__,
- ret, sizeof(x));
- } else {
- ret = write(s, &x, sizeof(x));
- if (ret < 0)
- error_errno("%s: write rtr synch", __func__);
- if (ret != sizeof(x))
- error("%s: partial write of rtr synch, %d / %d", __func__,
- ret, sizeof(x));
- }
- }
-
/* transition qp to ready-to-send */
QP_ATTR_MASK_CLR_ALL(mask);
QP_ATTR_MASK_SET(mask,
@@ -394,6 +422,10 @@ ib_drain_connection(ib_connection_t *c)
{
buf_head_t *bh;
+ /* already drained */
+ if (c->cancelled)
+ return;
+
bh = qlist_try_del_head(&c->eager_send_buf_free);
if (bh) {
/* if no messages available, let garbage collection on server deal */
@@ -469,7 +501,7 @@ ib_alloc_method_addr(ib_connection_t *c,
struct method_addr *map;
ib_method_addr_t *ibmap;
- map = alloc_method_addr(bmi_ib_method_id, sizeof(*ibmap));
+ map = alloc_method_addr(bmi_ib_method_id, (bmi_size_t) sizeof(*ibmap));
ibmap = map->method_data;
ibmap->c = c;
ibmap->hostname = hostname;
@@ -502,8 +534,8 @@ BMI_ib_method_addr_lookup(const char *id
error("%s: no ':' found", __func__);
/* copy to permanent storage */
- hostname = Malloc(cp - s + 1);
- strncpy(hostname, s, cp-s);
+ hostname = Malloc((unsigned long) (cp - s + 1));
+ strncpy(hostname, s, (size_t) (cp-s));
hostname[cp-s] = '\0';
/* strip /filesystem */
@@ -560,7 +592,7 @@ ib_tcp_client_connect(ib_method_addr_t *
error_errno("%s: cannot resolve server %s", __func__, ibmap->hostname);
memset(&skin, 0, sizeof(skin));
skin.sin_family = hp->h_addrtype;
- memcpy(&skin.sin_addr, hp->h_addr_list[0], hp->h_length);
+ memcpy(&skin.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
skin.sin_port = htons(ibmap->port);
sprintf(peername, "%s:%d", ibmap->hostname, ibmap->port);
retry:
@@ -571,6 +603,8 @@ ib_tcp_client_connect(ib_method_addr_t *
error_errno("%s: connect to server %s", __func__, peername);
}
ibmap->c = ib_new_connection(s, peername, 0);
+ if (!ibmap->c)
+ error("%s: ib_new_connection failed", __func__);
ibmap->c->remote_map = remote_map;
if (close(s) < 0)
@@ -638,6 +672,12 @@ ib_tcp_server_check_new_connections(void
sprintf(peername, "%s:%d", hostname, port);
c = ib_new_connection(s, peername, 1);
+ if (!c) {
+ free(hostname);
+ close(s);
+ return 0;
+ }
+
c->remote_map = ib_alloc_method_addr(c, hostname, port);
/* register this address with the method control layer */
ret = bmi_method_addr_reg_callback(c->remote_map);
@@ -842,6 +882,7 @@ BMI_ib_initialize(struct method_addr *li
error_verrno(ret, "%s: EVAPI_set_async_event_handler", __func__);
/* get my lid */
+ /* ignore different-width-prototype warning here, cannot pass u8 */
ret = VAPI_query_hca_port_prop(nic_handle, VAPI_PORT, &nic_port_props);
if (ret < 0)
error_verrno(ret, "%s: VAPI_query_hca_port_prop", __func__);
@@ -891,15 +932,27 @@ BMI_ib_initialize(struct method_addr *li
INIT_QLIST_HEAD(&connection);
INIT_QLIST_HEAD(&sendq);
INIT_QLIST_HEAD(&recvq);
+ INIT_QLIST_HEAD(&memcache);
EAGER_BUF_PAYLOAD = EAGER_BUF_SIZE - sizeof(msg_header_t);
/* will be set on first connection */
sg_tmp_array = 0;
sg_max_len = 0;
+ max_outstanding_wr = 0;
bmi_ib_initialized = 1; /* okay to play with state variables now */
+#if 0
+ /*
+ * XXX: temporary while using registration cache. Perhaps switch to
+ * malloc/free hooks, or better yet, use dreg kernel module.
+ * Think about how this fights with mpich's malloc hooks.
+ */
+ mallopt(M_TRIM_THRESHOLD, -1);
+ mallopt(M_MMAP_MAX, 0);
+#endif
+
debug(0, "%s: done", __func__);
return 0;
}
@@ -935,6 +988,7 @@ BMI_ib_finalize(void)
ret = VAPI_destroy_cq(nic_handle, nic_cq);
if (ret < 0)
error_verrno(ret, "%s: VAPI_destroy_cq", __func__);
+ memcache_shutdown();
ret = VAPI_dealloc_pd(nic_handle, nic_pd);
if (ret < 0)
error_verrno(ret, "%s: VAPI_dealloc_pd", __func__);
@@ -961,63 +1015,33 @@ BMI_ib_finalize(void)
* wuj's clever discontig allocation stuff.
*/
void
-ib_mem_register(ib_buflist_t *buflist, int send_or_recv_type)
+ib_mem_register(memcache_entry_t *c)
{
- int i;
- VAPI_mrw_t mrw;
-
- if (send_or_recv_type == TYPE_SEND) {
- buflist->lkey = Malloc(buflist->num * sizeof(*buflist->lkey));
- buflist->rkey = 0;
- mrw.acl = 0; /* just local read for sender */
- } else {
- buflist->lkey = 0;
- buflist->rkey = Malloc(buflist->num * sizeof(*buflist->rkey));
- /* must turn on local write if want remote write */
- mrw.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE;
- }
- buflist->mr_handle = Malloc(buflist->num * sizeof(*buflist->mr_handle));
+ VAPI_mrw_t mrw, mrw_out;
+ int ret;
- /* constant across loop */
+ /* always turn on local write and remote write even if just BMI_SEND */
+ mrw.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE;
mrw.type = VAPI_MR;
mrw.pd_hndl = nic_pd;
-
- for (i=0; i<buflist->num; i++) {
- VAPI_mrw_t mrw_out;
- int ret;
- mrw.start = int64_from_ptr(buflist->buf.send[i]); /* union */
- mrw.size = buflist->len[i];
- ret = VAPI_register_mr(nic_handle, &mrw, &buflist->mr_handle[i],
- &mrw_out);
- if (ret < 0)
- error_verrno(ret, "%s: VAPI_register_mr %d", __func__, i);
- if (send_or_recv_type == TYPE_SEND)
- buflist->lkey[i] = mrw_out.l_key;
- else
- buflist->rkey[i] = mrw_out.r_key;
- debug(4, "%s: %d addr %Lx size %Ld %s %x", __func__, i, mrw.start,
- mrw.size, send_or_recv_type == TYPE_SEND ? "lkey" : "rkey",
- send_or_recv_type == TYPE_SEND ? mrw_out.l_key : mrw_out.r_key);
- }
+ mrw.start = int64_from_ptr(c->buf);
+ mrw.size = c->len;
+ ret = VAPI_register_mr(nic_handle, &mrw, &c->memkeys.mrh, &mrw_out);
+ if (ret < 0)
+ error_verrno(ret, "%s: VAPI_register_mr", __func__);
+ c->memkeys.lkey = mrw_out.l_key;
+ c->memkeys.rkey = mrw_out.r_key;
+ debug(4, "%s: buf %p len %Ld", __func__, c->buf, c->len);
}
void
-ib_mem_deregister(ib_buflist_t *buflist)
+ib_mem_deregister(memcache_entry_t *c)
{
- int i;
-
- for (i=0; i<buflist->num; i++) {
- int ret = VAPI_deregister_mr(nic_handle, buflist->mr_handle[i]);
- if (ret < 0)
- error_verrno(ret, "%s: VAPI_deregister_mr %d", __func__, i);
- debug(4, "%s: %d addr %Lx size %Ld lkey %x rkey %x", __func__, i,
- int64_from_ptr(buflist->buf.send[i]), buflist->len[i],
- buflist->lkey ? buflist->lkey[i] : 0,
- buflist->rkey ? buflist->rkey[i] : 0);
- }
- free(buflist->mr_handle);
- if (buflist->lkey)
- free(buflist->lkey);
- if (buflist->rkey)
- free(buflist->rkey);
+ int ret;
+
+ ret = VAPI_deregister_mr(nic_handle, c->memkeys.mrh);
+ if (ret < 0)
+ error_verrno(ret, "%s: VAPI_deregister_mr", __func__);
+ debug(4, "%s: buf %p len %Ld lkey %x rkey %x", __func__,
+ c->buf, c->len, c->memkeys.lkey, c->memkeys.rkey);
}
Index: util.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/util.c,v
diff -u -p -u -r1.3 -r1.4
--- util.c 30 Jan 2004 20:12:12 -0000 1.3
+++ util.c 3 Nov 2005 21:23:19 -0000 1.4
@@ -1,21 +1,18 @@
/*
* InfiniBand BMI handy utilities that are not really core functions.
*
- * Copyright (C) 2003 Pete Wyckoff <pw at osc.edu>
+ * Copyright (C) 2003-5 Pete Wyckoff <pw at osc.edu>
*
* See COPYING in top-level directory.
*
- * $Id: util.c,v 1.3 2004/01/30 20:12:12 neill Exp $
+ * $Id: util.c,v 1.4 2005/11/03 21:23:19 pw Exp $
*/
#include <stdio.h>
#include <stdarg.h>
#include <errno.h>
#include <unistd.h>
-#include <src/common/quicklist/quicklist.h>
#include <src/common/gossip/gossip.h>
-#include <src/io/bmi/bmi-method-support.h>
-#include <vapi.h>
-#include <vapi_common.h>
+#include <vapi_common.h> /* VAPI_strerror */
#define __util_c
#include "ib.h"
@@ -32,6 +29,9 @@ error(const char *fmt, ...)
vsprintf(s, fmt, ap);
va_end(ap);
gossip_err("Error: %s.\n", s);
+#ifdef GOSSIP_ENABLE_BACKTRACE
+ gossip_backtrace();
+#endif
exit(1);
}
@@ -87,6 +87,18 @@ warning(const char *fmt, ...)
}
void __attribute__((format(printf,1,2))) __hidden
+warning_errno(const char *fmt, ...)
+{
+ char s[2048];
+ va_list ap;
+
+ va_start(ap, fmt);
+ vsprintf(s, fmt, ap);
+ va_end(ap);
+ gossip_err("Warning: %s: %s.\n", s, strerror(errno));
+}
+
+void __attribute__((format(printf,1,2))) __hidden
info(const char *fmt, ...)
{
char s[2048];
@@ -99,7 +111,7 @@ info(const char *fmt, ...)
}
void * __attribute__((malloc)) __hidden
-Malloc(unsigned int n)
+Malloc(unsigned long n)
{
char *x;
@@ -107,28 +119,11 @@ Malloc(unsigned int n)
error("%s: alloc 0 bytes", __func__);
x = malloc(n);
if (!x)
- error("%s: malloc %d bytes failed", __func__, n);
+ error("%s: malloc %ld bytes failed", __func__, n);
return x;
}
/*
- * Next step in ntoh[sl].
- */
-u_int64_t __hidden
-swab64(u_int64_t x)
-{
- return (u_int64_t)
- ((u_int64_t)((x & (u_int64_t)0x00000000000000ffULL) << 56)
- | (u_int64_t)((x & (u_int64_t)0x000000000000ff00ULL) << 40)
- | (u_int64_t)((x & (u_int64_t)0x0000000000ff0000ULL) << 24)
- | (u_int64_t)((x & (u_int64_t)0x00000000ff000000ULL) << 8)
- | (u_int64_t)((x & (u_int64_t)0x000000ff00000000ULL) >> 8)
- | (u_int64_t)((x & (u_int64_t)0x0000ff0000000000ULL) >> 24)
- | (u_int64_t)((x & (u_int64_t)0x00ff000000000000ULL) >> 40)
- | (u_int64_t)((x & (u_int64_t)0xff00000000000000ULL) >> 56));
-}
-
-/*
* Grab the first item and delete it from the list.
*/
void * __hidden
@@ -166,19 +161,19 @@ name_lookup(name_t *a, int num)
}
const char *
-sq_state_name(int num)
+sq_state_name(sq_state_t num)
{
- return name_lookup(sq_state_names, num);
+ return name_lookup(sq_state_names, (int) num);
}
const char *
-rq_state_name(int num)
+rq_state_name(rq_state_t num)
{
- return name_lookup(rq_state_names, num);
+ return name_lookup(rq_state_names, (int) num);
}
const char *
-msg_type_name(int num)
+msg_type_name(msg_type_t num)
{
- return name_lookup(msg_type_names, num);
+ return name_lookup(msg_type_names, (int) num);
}
/*
@@ -192,7 +187,7 @@ memcpy_to_buflist(ib_buflist_t *buflist,
const char *cp = buf;
for (i=0; i<buflist->num && len > 0; i++) {
- int bytes = buflist->len[i];
+ size_t bytes = buflist->len[i];
if (bytes > len)
bytes = len;
memcpy(buflist->buf.recv[i], cp, bytes);
@@ -208,7 +203,7 @@ memcpy_from_buflist(ib_buflist_t *buflis
char *cp = buf;
for (i=0; i<buflist->num; i++) {
- memcpy(cp, buflist->buf.send[i], buflist->len[i]);
+ memcpy(cp, buflist->buf.send[i], (size_t) buflist->len[i]);
cp += buflist->len[i];
}
}
More information about the PVFS2-CVS
mailing list