[Pvfs2-cvs] commit by pw in pvfs2/src/io/bmi/bmi_ib: ib.c ib.h
mem.c openib.c util.c vapi.c
CVS commit program
cvs at parl.clemson.edu
Thu Dec 7 16:47:47 EST 2006
Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib
In directory parlweb1:/tmp/cvs-serv3941/src/io/bmi/bmi_ib
Modified Files:
ib.c ib.h mem.c openib.c util.c vapi.c
Log Message:
* ib: cache explicit BMI_memalloc registrations; huge latency improvement
* bmi: avoid calling gettimeofday() to decide between polling and blocking
in the multi-method scenario
* ib: rely on BMI to make the poll-vs-block decision
* ib: only check for new connections and async events during blocking periods
Index: ib.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/ib.c,v
diff -u -p -p -u -r1.47 -r1.48
--- ib.c 2 Dec 2006 19:08:41 -0000 1.47
+++ ib.c 7 Dec 2006 21:47:47 -0000 1.48
@@ -6,7 +6,7 @@
*
* See COPYING in top-level directory.
*
- * $Id: ib.c,v 1.47 2006/12/02 19:08:41 pw Exp $
+ * $Id: ib.c,v 1.48 2006/12/07 21:47:47 pw Exp $
*/
#include <stdio.h>
#include <stdlib.h>
@@ -119,6 +119,8 @@ static int ib_check_cq(void)
debug(4, "%s: found something", __func__);
++ret;
if (wc.status != 0) {
+ /* opcode is not necessarily valid; only wr_id, status, qp_num,
+ * and vendor_err can be relied upon */
if (wc.opcode == BMI_IB_OP_SEND) {
debug(0, "%s: entry id 0x%llx SEND error %s", __func__,
llu(wc.id), wc_status_string(wc.status));
@@ -1281,13 +1283,8 @@ static int BMI_ib_testsome(int incount,
}
/*
- * Used by the test functions to block if not much is going on
- * since the timeouts at the BMI job layer are too coarse.
- */
-static struct timeval last_action = { 0, 0 };
-
-/*
* Test for multiple completions matching a particular user context.
+ * Return 0 if okay, >0 if want another poll soon, negative for error.
*/
static int
BMI_ib_testcontext(int incount, bmi_op_id_t *outids, int *outcount,
@@ -1295,17 +1292,18 @@ BMI_ib_testcontext(int incount, bmi_op_i
int max_idle_time, bmi_context_id context_id)
{
struct qlist_head *l, *lnext;
- int n, complete;
- void **up = 0;
+ int n = 0, complete, activity = 0;
+ void **up = NULL;
gen_mutex_lock(&interface_mutex);
- ib_check_cq();
+
+restart:
+ activity += ib_check_cq();
/*
* Walk _all_ entries on sq, rq, marking them completed or
* encouraging them as needed due to resource limitations.
*/
- n = 0;
for (l=ib_device->sendq.next; l != &ib_device->sendq; l=lnext) {
struct ib_work *sq = qlist_upcast(l);
lnext = l->next;
@@ -1331,37 +1329,18 @@ BMI_ib_testcontext(int incount, bmi_op_i
/* drop lock before blocking on new connections below */
gen_mutex_unlock(&interface_mutex);
- *outcount = n;
-
- if (n > 0) {
- gettimeofday(&last_action, 0); /* remember this action */
- } else if (max_idle_time > 0) {
+ if (activity == 0 && n == 0 && max_idle_time > 0) {
/*
- * Spin for an interval after some activity, then go to a
- * blocking interface by using poll() on the IB completion channel
- * and TCP listen socket.
+ * Block if told to from above.
*/
- struct timeval now;
-
- gettimeofday(&now, 0);
- timersub(&now, &last_action, &now);
-
- /* if time since last activity is > 10ms, block */
- if (now.tv_sec > 0 || now.tv_usec > 10000) {
- /* block */
- debug(8, "%s: last activity too long ago, blocking", __func__);
- n = ib_block_for_activity(max_idle_time);
- if (n)
- gettimeofday(&last_action, 0); /* had some action */
- } else {
- /* whee, spin */
- /* totally helps on Lee's old 2.4.21 machine, but may cause
- * big delays on modern kernels; do not make default */
- /* sched_yield(); */
- ;
- }
+ debug(2, "%s: last activity too long ago, blocking", __func__);
+ activity = ib_block_for_activity(max_idle_time);
+ if (activity == 1) /* IB action, go do it immediately */
+ goto restart;
}
- return 0;
+
+ *outcount = n;
+ return activity + n;
}
/*
@@ -1369,20 +1348,21 @@ BMI_ib_testcontext(int incount, bmi_op_i
* This is also where we check for new connections on the TCP socket, since
* those would show up as unexpected the first time anything is sent.
* Return 0 for success, or -1 for failure; number of things in *outcount.
+ * Return >0 if want another poll soon.
*/
static int
BMI_ib_testunexpected(int incount __unused, int *outcount,
struct method_unexpected_info *ui, int max_idle_time __unused)
{
- int num_action;
struct qlist_head *l;
+ int activity, n;
gen_mutex_lock(&interface_mutex);
/* Check CQ, then look for the first unexpected message. */
- num_action = ib_check_cq();
+ activity = ib_check_cq();
- *outcount = 0;
+ n = 0;
qlist_for_each(l, &ib_device->recvq) {
struct ib_work *rq = qlist_upcast(l);
if (rq->state.recv == RQ_EAGER_WAITING_USER_TESTUNEXPECTED) {
@@ -1403,7 +1383,7 @@ BMI_ib_testunexpected(int incount __unus
ui->tag = rq->bmi_tag;
/* re-post the buffer in which it was sitting, just unexpecteds */
post_rr(c, rq->bh);
- *outcount = 1;
+ n = 1;
qlist_del(&rq->list);
free(rq);
--c->refcnt;
@@ -1413,18 +1393,11 @@ BMI_ib_testunexpected(int incount __unus
}
}
- /* check for new incoming connections */
- num_action += ib_tcp_server_check_new_connections();
-
- /* look for async events on the IB port */
- num_action += check_async_events();
-
- if (num_action)
- gettimeofday(&last_action, 0);
-
out:
gen_mutex_unlock(&interface_mutex);
- return 0;
+
+ *outcount = n;
+ return activity + n;
}
/*
@@ -1792,7 +1765,8 @@ static void ib_tcp_server_init_listen_so
/*
* Check for new connections. The listening socket is left nonblocking
- * so this test can be quick. Returns >0 if an accept worked.
+ * so this test can be quick; but accept is not really that quick compared
+ * to polling an IB interface, for instance. Returns >0 if an accept worked.
*/
static int ib_tcp_server_check_new_connections(void)
{
@@ -1813,6 +1787,8 @@ static int ib_tcp_server_check_new_conne
int port = ntohs(ssin.sin_port);
sprintf(peername, "%s:%d", hostname, port);
+ gen_mutex_lock(&interface_mutex);
+
c = ib_new_connection(s, peername, 1);
if (!c) {
free(hostname);
@@ -1828,6 +1804,9 @@ static int ib_tcp_server_check_new_conne
debug(2, "%s: accepted new connection %s at server", __func__,
c->peername);
+
+ gen_mutex_unlock(&interface_mutex);
+
if (close(s) < 0)
error_errno("%s: close new sock", __func__);
ret = 1;
@@ -1839,28 +1818,39 @@ static int ib_tcp_server_check_new_conne
* Ask the device to write to its FD if a CQ event happens, and poll on it
* as well as the listen_sock for activity, but do not actually respond to
* anything. A later ib_check_cq will handle CQ events, and a later call to
- * testunexpected will pick up new connections. Returns >0 if something is
- * ready.
+ * testunexpected will pick up new connections. Returns ==1 if IB device is
+ * ready, other >0 for some activity, else 0.
*/
static int ib_block_for_activity(int timeout_ms)
{
- struct pollfd pfd[2];
+ struct pollfd pfd[3]; /* cq fd, async fd, accept socket */
int numfd;
int ret;
- pfd[0].fd = prepare_cq_block();
+ prepare_cq_block(&pfd[0].fd, &pfd[1].fd);
pfd[0].events = POLLIN;
- numfd = 1;
+ pfd[1].events = POLLIN;
+ numfd = 2;
if (ib_device->listen_sock >= 0) {
- pfd[1].fd = ib_device->listen_sock;
- pfd[1].events = POLLIN;
- numfd = 2;
+ pfd[2].fd = ib_device->listen_sock;
+ pfd[2].events = POLLIN;
+ numfd = 3;
}
+
ret = poll(pfd, numfd, timeout_ms);
- debug(4, "%s: ret %d rev0 0x%x", __func__, ret, pfd[0].revents);
+ debug(4, "%s: ret %d rev0 %x rev1 %x", __func__, ret,
+ pfd[0].revents, pfd[1].revents);
if (ret > 0) {
- if (pfd[0].revents == POLLIN)
+ if (pfd[0].revents == POLLIN) {
ack_cq_completion_event();
+ return 1;
+ }
+ /* check others only if CQ was empty */
+ ret = 2;
+ if (pfd[1].revents == POLLIN)
+ check_async_events();
+ if (pfd[2].revents == POLLIN)
+ ib_tcp_server_check_new_connections();
} else if (ret < 0) {
if (errno == EINTR) /* normal, ignore but break */
ret = 0;
Index: ib.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/ib.h,v
diff -u -p -p -u -r1.25 -r1.26
--- ib.h 2 Dec 2006 19:08:41 -0000 1.25
+++ ib.h 7 Dec 2006 21:47:47 -0000 1.26
@@ -5,7 +5,7 @@
*
* See COPYING in top-level directory.
*
- * $Id: ib.h,v 1.25 2006/12/02 19:08:41 pw Exp $
+ * $Id: ib.h,v 1.26 2006/12/07 21:47:47 pw Exp $
*/
#ifndef __ib_h
#define __ib_h
@@ -347,7 +347,7 @@ struct ib_device_func {
void (*post_rr)(const ib_connection_t *c, struct buf_head *bh);
void (*post_sr_rdmaw)(struct ib_work *sq, msg_header_cts_t *mh_cts,
void *mh_cts_buf);
- int (*prepare_cq_block)(void);
+ void (*prepare_cq_block)(int *cq_fd, int *async_fd);
void (*ack_cq_completion_event)(void);
int (*check_cq)(struct bmi_ib_wc *wc);
const char *(*wc_status_string)(int status);
@@ -394,6 +394,8 @@ void error_xerrno(int errnum, const char
__attribute__((noreturn,format(printf,2,3)));
void warning(const char *fmt, ...) __attribute__((format(printf,1,2)));
void warning_errno(const char *fmt, ...) __attribute__((format(printf,1,2)));
+void warning_xerrno(int errnum, const char *fmt, ...)
+ __attribute__((format(printf,2,3)));
void info(const char *fmt, ...) __attribute__((format(printf,1,2)));
void *Malloc(unsigned long n) __attribute__((malloc));
void *qlist_del_head(struct qlist_head *list);
Index: mem.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/mem.c,v
diff -u -p -p -u -r1.10 -r1.11
--- mem.c 2 Dec 2006 19:08:41 -0000 1.10
+++ mem.c 7 Dec 2006 21:47:47 -0000 1.11
@@ -5,7 +5,7 @@
*
* See COPYING in top-level directory.
*
- * $Id: mem.c,v 1.10 2006/12/02 19:08:41 pw Exp $
+ * $Id: mem.c,v 1.11 2006/12/07 21:47:47 pw Exp $
*/
#include <src/common/gen-locks/gen-locks.h>
#include "pvfs2-internal.h"
@@ -29,6 +29,7 @@
typedef struct {
struct qlist_head list;
gen_mutex_t mutex;
+ struct qlist_head free_chunk_list;
void (*mem_register)(memcache_entry_t *c);
void (*mem_deregister)(memcache_entry_t *c);
} memcache_device_t;
@@ -107,6 +108,12 @@ memcache_lookup_exact(memcache_device_t
/*
* BMI malloc and free routines. If the region is big enough, pin
* it now to save time later in the actual send or recv routine.
+ * These are only ever called from PVFS internal functions to allocate
+ * buffers, on the server, or on the client for non-user-supplied
+ * buffers.
+ *
+ * Standard sizes will appear frequently, thus do not free them. Use
+ * a separate list sorted by sizes that can be used to reuse one.
*/
void *
memcache_memalloc(void *md, bmi_size_t len, int eager_limit)
@@ -114,7 +121,30 @@ memcache_memalloc(void *md, bmi_size_t l
memcache_device_t *memcache_device = md;
void *buf;
+ debug(4, "%s: len %zd limit %d", __func__, len, eager_limit);
+
+ /* search in size cache first */
+#if ENABLE_MEMCACHE
+ if (len > eager_limit) {
+ memcache_entry_t *c;
+ gen_mutex_lock(&memcache_device->mutex);
+ qlist_for_each_entry(c, &memcache_device->free_chunk_list, list) {
+ if (c->len == len) {
+ debug(4, "%s: recycle free chunk, buf %p", __func__, c->buf);
+ qlist_del(&c->list);
+ qlist_add_tail(&c->list, &memcache_device->list);
+ ++c->count;
+ buf = c->buf;
+ gen_mutex_unlock(&memcache_device->mutex);
+ goto out;
+ }
+ }
+ gen_mutex_unlock(&memcache_device->mutex);
+ }
+#endif
+
buf = malloc(len);
+
#if ENABLE_MEMCACHE
if (bmi_ib_unlikely(!buf))
goto out;
@@ -124,16 +154,19 @@ memcache_memalloc(void *md, bmi_size_t l
gen_mutex_lock(&memcache_device->mutex);
/* could be recycled buffer */
c = memcache_lookup_cover(memcache_device, buf, len);
- if (c)
+ if (c) {
++c->count;
- else {
+ debug(4, "%s: reuse reg, buf %p, count %d", __func__, c->buf,
+ c->count);
+ } else {
c = memcache_add(memcache_device, buf, len);
if (bmi_ib_unlikely(!c)) {
free(buf);
- buf = 0;
+ buf = NULL;
} else {
- (memcache_device->mem_register)(c);
+ memcache_device->mem_register(c);
++c->count;
+ debug(4, "%s: new reg, buf %p", __func__, c->buf);
}
}
gen_mutex_unlock(&memcache_device->mutex);
@@ -146,20 +179,24 @@ memcache_memalloc(void *md, bmi_size_t l
int
memcache_memfree(void *md, void *buf, bmi_size_t len)
{
- memcache_device_t *memcache_device = md;
#if ENABLE_MEMCACHE
+ memcache_device_t *memcache_device = md;
memcache_entry_t *c;
- /* okay if not found, just not cached */
gen_mutex_lock(&memcache_device->mutex);
+ /* okay if not found, just not cached; perhaps an eager-size buffer */
c = memcache_lookup_exact(memcache_device, buf, len);
if (c) {
- debug(6, "%s: found %p len %lld", __func__, c->buf, lld(c->len));
+ debug(4, "%s: cache free buf %p len %lld", __func__, c->buf,
+ lld(c->len));
assert(c->count == 1, "%s: buf %p len %lld count = %d, expected 1",
- __func__, c->buf, lld(c->len), c->count);
- (memcache_device->mem_deregister)(c);
+ __func__, c->buf, lld(c->len), c->count);
+ /* cache it */
+ --c->count;
qlist_del(&c->list);
- free(c);
+ qlist_add(&c->list, &memcache_device->free_chunk_list);
+ gen_mutex_unlock(&memcache_device->mutex);
+ return 0;
}
gen_mutex_unlock(&memcache_device->mutex);
#endif
@@ -197,7 +234,7 @@ memcache_register(void *md, ib_buflist_t
if (!c)
error("%s: no memory for cache entry", __func__);
c->count = 1;
- (memcache_device->mem_register)(c);
+ memcache_device->mem_register(c);
}
buflist->memcache[i] = c;
#else
@@ -205,7 +242,7 @@ memcache_register(void *md, ib_buflist_t
cp->buf = buflist->buf.recv[i];
cp->len = buflist->len[i];
cp->type = type;
- (memcache_device->mem_register)(cp);
+ memcache_device->mem_register(cp);
buflist->memcache[i] = cp;
#endif
}
@@ -234,7 +271,7 @@ void memcache_preregister(void *md, cons
c = memcache_add(memcache_device, (void *)(uintptr_t) buf, len);
if (!c)
error("%s: no memory for cache entry", __func__);
- (memcache_device->mem_register)(c);
+ memcache_device->mem_register(c);
}
gen_mutex_unlock(&memcache_device->mutex);
#endif
@@ -257,7 +294,7 @@ memcache_deregister(void *md, ib_buflist
c->buf, lld(c->len), c->count);
/* let garbage collection do ib_mem_deregister(c) for refcnt==0 */
#else
- (memcache_device->mem_deregister)(buflist->memcache[i]);
+ memcache_device->mem_deregister(buflist->memcache[i]);
free(buflist->memcache[i]);
#endif
}
@@ -276,6 +313,7 @@ void *memcache_init(void (*mem_register)
memcache_device = Malloc(sizeof(*memcache_device));
INIT_QLIST_HEAD(&memcache_device->list);
gen_mutex_init(&memcache_device->mutex);
+ INIT_QLIST_HEAD(&memcache_device->free_chunk_list);
memcache_device->mem_register = mem_register;
memcache_device->mem_deregister = mem_deregister;
return memcache_device;
@@ -286,13 +324,17 @@ void *memcache_init(void (*mem_register)
*/
void memcache_shutdown(void *md)
{
- struct qlist_head *l, *lp;
memcache_device_t *memcache_device = md;
+ memcache_entry_t *c, *cn;
gen_mutex_lock(&memcache_device->mutex);
- qlist_for_each_safe(l, lp, &memcache_device->list) {
- memcache_entry_t *c = qlist_entry(l, memcache_entry_t, list);
- (memcache_device->mem_deregister)(c);
+ qlist_for_each_entry_safe(c, cn, &memcache_device->list, list) {
+ memcache_device->mem_deregister(c);
+ qlist_del(&c->list);
+ free(c);
+ }
+ qlist_for_each_entry_safe(c, cn, &memcache_device->free_chunk_list, list) {
+ memcache_device->mem_deregister(c);
qlist_del(&c->list);
free(c);
}
Index: openib.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/openib.c,v
diff -u -p -p -u -r1.9 -r1.10
--- openib.c 2 Dec 2006 19:08:41 -0000 1.9
+++ openib.c 7 Dec 2006 21:47:47 -0000 1.10
@@ -6,7 +6,7 @@
*
* See COPYING in top-level directory.
*
- * $Id: openib.c,v 1.9 2006/12/02 19:08:41 pw Exp $
+ * $Id: openib.c,v 1.10 2006/12/07 21:47:47 pw Exp $
*/
#include <string.h>
#include <errno.h>
@@ -616,7 +616,7 @@ static int openib_check_cq(struct bmi_ib
return 1;
}
-static int openib_prepare_cq_block(void)
+static void openib_prepare_cq_block(int *cq_fd, int *async_fd)
{
struct openib_device_priv *od = ib_device->priv;
int ret;
@@ -627,7 +627,8 @@ static int openib_prepare_cq_block(void)
error_xerrno(ret, "%s: ibv_req_notify_cq", __func__);
/* return the fd that can be fed to poll() */
- return od->channel->fd;
+ *cq_fd = od->channel->fd;
+ *async_fd = od->ctx->async_fd;
}
/*
Index: util.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/util.c,v
diff -u -p -p -u -r1.10 -r1.11
--- util.c 12 Oct 2006 20:37:28 -0000 1.10
+++ util.c 7 Dec 2006 21:47:47 -0000 1.11
@@ -5,7 +5,7 @@
*
* See COPYING in top-level directory.
*
- * $Id: util.c,v 1.10 2006/10/12 20:37:28 pw Exp $
+ * $Id: util.c,v 1.11 2006/12/07 21:47:47 pw Exp $
*/
#include <stdio.h>
#include <string.h>
@@ -83,6 +83,18 @@ warning_errno(const char *fmt, ...)
vsprintf(s, fmt, ap);
va_end(ap);
gossip_err("Warning: %s: %s.\n", s, strerror(errno));
+}
+
+void __attribute__((format(printf,2,3))) __hidden
+warning_xerrno(int errnum, const char *fmt, ...)
+{
+ char s[2048];
+ va_list ap;
+
+ va_start(ap, fmt);
+ vsprintf(s, fmt, ap);
+ va_end(ap);
+ gossip_err("Warning: %s: %s.\n", s, strerror(errnum));
}
void * __attribute__((malloc)) __hidden
Index: vapi.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/vapi.c,v
diff -u -p -p -u -r1.9 -r1.10
--- vapi.c 2 Dec 2006 19:08:41 -0000 1.9
+++ vapi.c 7 Dec 2006 21:47:47 -0000 1.10
@@ -5,7 +5,7 @@
*
* See COPYING in top-level directory.
*
- * $Id: vapi.c,v 1.9 2006/12/02 19:08:41 pw Exp $
+ * $Id: vapi.c,v 1.10 2006/12/07 21:47:47 pw Exp $
*/
#include <stdio.h>
#include <string.h>
@@ -623,17 +623,19 @@ static int vapi_check_cq(struct bmi_ib_w
return 1;
}
-static int vapi_prepare_cq_block(void)
+static void vapi_prepare_cq_block(int *cq_fd, int *async_fd)
{
struct vapi_device_priv *vd = ib_device->priv;
int ret;
+
/* ask for the next notfication */
ret = VAPI_req_comp_notif(vd->nic_handle, vd->nic_cq, VAPI_NEXT_COMP);
if (ret < 0)
error_verrno(ret, "%s: VAPI_req_comp_notif", __func__);
/* return the fd that can be fed to poll() */
- return vd->cq_event_pipe[0];
+ *cq_fd = vd->cq_event_pipe[0];
+ *async_fd = vd->async_event_pipe[0];
}
/*
More information about the Pvfs2-cvs
mailing list