[Pvfs2-cvs] commit by pw in pvfs2/src/io/bmi/bmi_ib: ib.c ib.h mem.c openib.c util.c vapi.c

CVS commit program cvs at parl.clemson.edu
Thu Dec 7 16:47:47 EST 2006


Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib
In directory parlweb1:/tmp/cvs-serv3941/src/io/bmi/bmi_ib

Modified Files:
	ib.c ib.h mem.c openib.c util.c vapi.c 
Log Message:

* ib: cache explicit BMI_memalloc registrations; huge latency improvement
* bmi: avoid gettimeofday() to determine poll vs block for multi-method
    scenario
* ib: rely on bmi to make poll vs block decision
* ib: only check for new connections and async events during blocking periods


Index: ib.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/ib.c,v
diff -u -p -p -u -r1.47 -r1.48
--- ib.c	2 Dec 2006 19:08:41 -0000	1.47
+++ ib.c	7 Dec 2006 21:47:47 -0000	1.48
@@ -6,7 +6,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: ib.c,v 1.47 2006/12/02 19:08:41 pw Exp $
+ * $Id: ib.c,v 1.48 2006/12/07 21:47:47 pw Exp $
  */
 #include <stdio.h>
 #include <stdlib.h>
@@ -119,6 +119,8 @@ static int ib_check_cq(void)
 	debug(4, "%s: found something", __func__);
 	++ret;
 	if (wc.status != 0) {
+	    /* opcode is not necessarily valid; only wr_id, status, qp_num,
+	     * and vendor_err can be relied upon */
 	    if (wc.opcode == BMI_IB_OP_SEND) {
 		debug(0, "%s: entry id 0x%llx SEND error %s", __func__,
 		  llu(wc.id), wc_status_string(wc.status));
@@ -1281,13 +1283,8 @@ static int BMI_ib_testsome(int incount, 
 }
 
 /*
- * Used by the test functions to block if not much is going on
- * since the timeouts at the BMI job layer are too coarse.
- */
-static struct timeval last_action = { 0, 0 };
-
-/*
  * Test for multiple completions matching a particular user context.
+ * Return 0 if okay, >0 if want another poll soon, negative for error.
  */
 static int
 BMI_ib_testcontext(int incount, bmi_op_id_t *outids, int *outcount,
@@ -1295,17 +1292,18 @@ BMI_ib_testcontext(int incount, bmi_op_i
   int max_idle_time, bmi_context_id context_id)
 {
     struct qlist_head *l, *lnext;
-    int n, complete;
-    void **up = 0;
+    int n = 0, complete, activity = 0;
+    void **up = NULL;
 
     gen_mutex_lock(&interface_mutex);
-    ib_check_cq();
+
+restart:
+    activity += ib_check_cq();
 
     /*
      * Walk _all_ entries on sq, rq, marking them completed or
      * encouraging them as needed due to resource limitations.
      */
-    n = 0;
     for (l=ib_device->sendq.next; l != &ib_device->sendq; l=lnext) {
 	struct ib_work *sq = qlist_upcast(l);
 	lnext = l->next;
@@ -1331,37 +1329,18 @@ BMI_ib_testcontext(int incount, bmi_op_i
     /* drop lock before blocking on new connections below */
     gen_mutex_unlock(&interface_mutex);
 
-    *outcount = n;
-
-    if (n > 0) {
-	gettimeofday(&last_action, 0);  /* remember this action */
-    } else if (max_idle_time > 0) {
+    if (activity == 0 && n == 0 && max_idle_time > 0) {
 	/*
-	 * Spin for an interval after some activity, then go to a
-	 * blocking interface by using poll() on the IB completion channel
-	 * and TCP listen socket.
+	 * Block if told to from above.
 	 */
-	struct timeval now;
-
-	gettimeofday(&now, 0);
-	timersub(&now, &last_action, &now);
-
-	/* if time since last activity is > 10ms, block */
-	if (now.tv_sec > 0 || now.tv_usec > 10000) {
-	    /* block */
-	    debug(8, "%s: last activity too long ago, blocking", __func__);
-	    n = ib_block_for_activity(max_idle_time);
-	    if (n)
-		gettimeofday(&last_action, 0);  /* had some action */
-	} else {
-	    /* whee, spin */
-	    /* totally helps on Lee's old 2.4.21 machine, but may cause
-	     * big delays on modern kernels; do not make default */
-	    /* sched_yield(); */
-	    ;
-	}
+	debug(2, "%s: last activity too long ago, blocking", __func__);
+	activity = ib_block_for_activity(max_idle_time);
+	if (activity == 1)   /* IB action, go do it immediately */
+	    goto restart;
     }
-    return 0;
+
+    *outcount = n;
+    return activity + n;
 }
 
 /*
@@ -1369,20 +1348,21 @@ BMI_ib_testcontext(int incount, bmi_op_i
  * This is also where we check for new connections on the TCP socket, since
  * those would show up as unexpected the first time anything is sent.
  * Return 0 for success, or -1 for failure; number of things in *outcount.
+ * Return >0 if want another poll soon.
  */
 static int
 BMI_ib_testunexpected(int incount __unused, int *outcount,
   struct method_unexpected_info *ui, int max_idle_time __unused)
 {
-    int num_action;
     struct qlist_head *l;
+    int activity, n;
 
     gen_mutex_lock(&interface_mutex);
 
     /* Check CQ, then look for the first unexpected message.  */
-    num_action = ib_check_cq();
+    activity = ib_check_cq();
 
-    *outcount = 0;
+    n = 0;
     qlist_for_each(l, &ib_device->recvq) {
 	struct ib_work *rq = qlist_upcast(l);
 	if (rq->state.recv == RQ_EAGER_WAITING_USER_TESTUNEXPECTED) {
@@ -1403,7 +1383,7 @@ BMI_ib_testunexpected(int incount __unus
 	    ui->tag = rq->bmi_tag;
 	    /* re-post the buffer in which it was sitting, just unexpecteds */
 	    post_rr(c, rq->bh);
-	    *outcount = 1;
+	    n = 1;
 	    qlist_del(&rq->list);
 	    free(rq);
 	    --c->refcnt;
@@ -1413,18 +1393,11 @@ BMI_ib_testunexpected(int incount __unus
 	}
     }
 
-    /* check for new incoming connections */
-    num_action += ib_tcp_server_check_new_connections();
-
-    /* look for async events on the IB port */
-    num_action += check_async_events();
-
-    if (num_action)
-	gettimeofday(&last_action, 0);
-
   out:
     gen_mutex_unlock(&interface_mutex);
-    return 0;
+
+    *outcount = n;
+    return activity + n;
 }
 
 /*
@@ -1792,7 +1765,8 @@ static void ib_tcp_server_init_listen_so
 
 /*
  * Check for new connections.  The listening socket is left nonblocking
- * so this test can be quick.  Returns >0 if an accept worked.
+ * so this test can be quick; but accept is not really that quick compared
+ * to polling an IB interface, for instance.  Returns >0 if an accept worked.
  */
 static int ib_tcp_server_check_new_connections(void)
 {
@@ -1813,6 +1787,8 @@ static int ib_tcp_server_check_new_conne
 	int port = ntohs(ssin.sin_port);
 	sprintf(peername, "%s:%d", hostname, port);
 
+	gen_mutex_lock(&interface_mutex);
+
 	c = ib_new_connection(s, peername, 1);
 	if (!c) {
 	    free(hostname);
@@ -1828,6 +1804,9 @@ static int ib_tcp_server_check_new_conne
 
 	debug(2, "%s: accepted new connection %s at server", __func__,
 	  c->peername);
+
+	gen_mutex_unlock(&interface_mutex);
+
 	if (close(s) < 0)
 	    error_errno("%s: close new sock", __func__);
 	ret = 1;
@@ -1839,28 +1818,39 @@ static int ib_tcp_server_check_new_conne
  * Ask the device to write to its FD if a CQ event happens, and poll on it
  * as well as the listen_sock for activity, but do not actually respond to
  * anything.  A later ib_check_cq will handle CQ events, and a later call to
- * testunexpected will pick up new connections.  Returns >0 if something is
- * ready.
+ * testunexpected will pick up new connections.  Returns ==1 if IB device is
+ * ready, other >0 for some activity, else 0.
  */
 static int ib_block_for_activity(int timeout_ms)
 {
-    struct pollfd pfd[2];
+    struct pollfd pfd[3];  /* cq fd, async fd, accept socket */
     int numfd;
     int ret;
 
-    pfd[0].fd = prepare_cq_block();
+    prepare_cq_block(&pfd[0].fd, &pfd[1].fd);
     pfd[0].events = POLLIN;
-    numfd = 1;
+    pfd[1].events = POLLIN;
+    numfd = 2;
     if (ib_device->listen_sock >= 0) {
-	pfd[1].fd = ib_device->listen_sock;
-	pfd[1].events = POLLIN;
-	numfd = 2;
+	pfd[2].fd = ib_device->listen_sock;
+	pfd[2].events = POLLIN;
+	numfd = 3;
     }
+
     ret = poll(pfd, numfd, timeout_ms);
-    debug(4, "%s: ret %d rev0 0x%x", __func__, ret, pfd[0].revents);
+    debug(4, "%s: ret %d rev0 %x rev1 %x", __func__, ret,
+          pfd[0].revents, pfd[1].revents);
     if (ret > 0) {
-	if (pfd[0].revents == POLLIN)
+	if (pfd[0].revents == POLLIN) {
 	    ack_cq_completion_event();
+	    return 1;
+	}
+	/* check others only if CQ was empty */
+	ret = 2;
+	if (pfd[1].revents == POLLIN)
+	    check_async_events();
+	if (pfd[2].revents == POLLIN)
+	    ib_tcp_server_check_new_connections();
     } else if (ret < 0) {
 	if (errno == EINTR)  /* normal, ignore but break */
 	    ret = 0;

Index: ib.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/ib.h,v
diff -u -p -p -u -r1.25 -r1.26
--- ib.h	2 Dec 2006 19:08:41 -0000	1.25
+++ ib.h	7 Dec 2006 21:47:47 -0000	1.26
@@ -5,7 +5,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: ib.h,v 1.25 2006/12/02 19:08:41 pw Exp $
+ * $Id: ib.h,v 1.26 2006/12/07 21:47:47 pw Exp $
  */
 #ifndef __ib_h
 #define __ib_h
@@ -347,7 +347,7 @@ struct ib_device_func {
     void (*post_rr)(const ib_connection_t *c, struct buf_head *bh);
     void (*post_sr_rdmaw)(struct ib_work *sq, msg_header_cts_t *mh_cts,
                           void *mh_cts_buf);
-    int (*prepare_cq_block)(void);
+    void (*prepare_cq_block)(int *cq_fd, int *async_fd);
     void (*ack_cq_completion_event)(void);
     int (*check_cq)(struct bmi_ib_wc *wc);
     const char *(*wc_status_string)(int status);
@@ -394,6 +394,8 @@ void error_xerrno(int errnum, const char
   __attribute__((noreturn,format(printf,2,3)));
 void warning(const char *fmt, ...) __attribute__((format(printf,1,2)));
 void warning_errno(const char *fmt, ...) __attribute__((format(printf,1,2)));
+void warning_xerrno(int errnum, const char *fmt, ...)
+  __attribute__((format(printf,2,3)));
 void info(const char *fmt, ...) __attribute__((format(printf,1,2)));
 void *Malloc(unsigned long n) __attribute__((malloc));
 void *qlist_del_head(struct qlist_head *list);

Index: mem.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/mem.c,v
diff -u -p -p -u -r1.10 -r1.11
--- mem.c	2 Dec 2006 19:08:41 -0000	1.10
+++ mem.c	7 Dec 2006 21:47:47 -0000	1.11
@@ -5,7 +5,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: mem.c,v 1.10 2006/12/02 19:08:41 pw Exp $
+ * $Id: mem.c,v 1.11 2006/12/07 21:47:47 pw Exp $
  */
 #include <src/common/gen-locks/gen-locks.h>
 #include "pvfs2-internal.h"
@@ -29,6 +29,7 @@
 typedef struct {
     struct qlist_head list;
     gen_mutex_t mutex;
+    struct qlist_head free_chunk_list;
     void (*mem_register)(memcache_entry_t *c);
     void (*mem_deregister)(memcache_entry_t *c);
 } memcache_device_t;
@@ -107,6 +108,12 @@ memcache_lookup_exact(memcache_device_t 
 /*
  * BMI malloc and free routines.  If the region is big enough, pin
  * it now to save time later in the actual send or recv routine.
+ * These are only ever called from PVFS internal functions to allocate
+ * buffers, on the server, or on the client for non-user-supplied
+ * buffers.
+ *
+ * Standard sizes will appear frequently, thus do not free them.  Use
+ * a separate list sorted by sizes that can be used to reuse one.
  */
 void *
 memcache_memalloc(void *md, bmi_size_t len, int eager_limit)
@@ -114,7 +121,30 @@ memcache_memalloc(void *md, bmi_size_t l
     memcache_device_t *memcache_device = md;
     void *buf;
 
+    debug(4, "%s: len %zd limit %d", __func__, len, eager_limit);
+
+    /* search in size cache first */
+#if ENABLE_MEMCACHE
+    if (len > eager_limit) {
+	memcache_entry_t *c;
+	gen_mutex_lock(&memcache_device->mutex);
+	qlist_for_each_entry(c, &memcache_device->free_chunk_list, list) {
+	    if (c->len == len) {
+		debug(4, "%s: recycle free chunk, buf %p", __func__, c->buf);
+		qlist_del(&c->list);
+		qlist_add_tail(&c->list, &memcache_device->list);
+		++c->count;
+		buf = c->buf;
+		gen_mutex_unlock(&memcache_device->mutex);
+		goto out;
+	    }
+	}
+	gen_mutex_unlock(&memcache_device->mutex);
+    }
+#endif
+
     buf = malloc(len);
+
 #if ENABLE_MEMCACHE
     if (bmi_ib_unlikely(!buf))
 	goto out;
@@ -124,16 +154,19 @@ memcache_memalloc(void *md, bmi_size_t l
 	gen_mutex_lock(&memcache_device->mutex);
 	/* could be recycled buffer */
 	c = memcache_lookup_cover(memcache_device, buf, len);
-	if (c)
+	if (c) {
 	    ++c->count;
-	else {
+	    debug(4, "%s: reuse reg, buf %p, count %d", __func__, c->buf,
+	          c->count);
+	} else {
 	    c = memcache_add(memcache_device, buf, len);
 	    if (bmi_ib_unlikely(!c)) {
 		free(buf);
-		buf = 0;
+		buf = NULL;
 	    } else {
-		(memcache_device->mem_register)(c);
+		memcache_device->mem_register(c);
 		++c->count;
+		debug(4, "%s: new reg, buf %p", __func__, c->buf);
 	    }
 	}
 	gen_mutex_unlock(&memcache_device->mutex);
@@ -146,20 +179,24 @@ memcache_memalloc(void *md, bmi_size_t l
 int
 memcache_memfree(void *md, void *buf, bmi_size_t len)
 {
-    memcache_device_t *memcache_device = md;
 #if ENABLE_MEMCACHE
+    memcache_device_t *memcache_device = md;
     memcache_entry_t *c;
-    /* okay if not found, just not cached */
 
     gen_mutex_lock(&memcache_device->mutex);
+    /* okay if not found, just not cached; perhaps an eager-size buffer */
     c = memcache_lookup_exact(memcache_device, buf, len);
     if (c) {
-	debug(6, "%s: found %p len %lld", __func__, c->buf, lld(c->len));
+	debug(4, "%s: cache free buf %p len %lld", __func__, c->buf,
+	      lld(c->len));
 	assert(c->count == 1, "%s: buf %p len %lld count = %d, expected 1",
-	  __func__, c->buf, lld(c->len), c->count);
-	(memcache_device->mem_deregister)(c);
+	       __func__, c->buf, lld(c->len), c->count);
+	/* cache it */
+	--c->count;
 	qlist_del(&c->list);
-	free(c);
+	qlist_add(&c->list, &memcache_device->free_chunk_list);
+	gen_mutex_unlock(&memcache_device->mutex);
+	return 0;
     }
     gen_mutex_unlock(&memcache_device->mutex);
 #endif
@@ -197,7 +234,7 @@ memcache_register(void *md, ib_buflist_t
 	    if (!c)
 		error("%s: no memory for cache entry", __func__);
 	    c->count = 1;
-	    (memcache_device->mem_register)(c);
+	    memcache_device->mem_register(c);
 	}
 	buflist->memcache[i] = c;
 #else
@@ -205,7 +242,7 @@ memcache_register(void *md, ib_buflist_t
 	cp->buf = buflist->buf.recv[i];
 	cp->len = buflist->len[i];
 	cp->type = type;
-	(memcache_device->mem_register)(cp);
+	memcache_device->mem_register(cp);
 	buflist->memcache[i] = cp;
 #endif
     }
@@ -234,7 +271,7 @@ void memcache_preregister(void *md, cons
 	c = memcache_add(memcache_device, (void *)(uintptr_t) buf, len);
 	if (!c)
 	    error("%s: no memory for cache entry", __func__);
-	(memcache_device->mem_register)(c);
+	memcache_device->mem_register(c);
     }
     gen_mutex_unlock(&memcache_device->mutex);
 #endif
@@ -257,7 +294,7 @@ memcache_deregister(void *md, ib_buflist
 	   c->buf, lld(c->len), c->count);
 	/* let garbage collection do ib_mem_deregister(c) for refcnt==0 */
 #else
-	(memcache_device->mem_deregister)(buflist->memcache[i]);
+	memcache_device->mem_deregister(buflist->memcache[i]);
 	free(buflist->memcache[i]);
 #endif
     }
@@ -276,6 +313,7 @@ void *memcache_init(void (*mem_register)
     memcache_device = Malloc(sizeof(*memcache_device));
     INIT_QLIST_HEAD(&memcache_device->list);
     gen_mutex_init(&memcache_device->mutex);
+    INIT_QLIST_HEAD(&memcache_device->free_chunk_list);
     memcache_device->mem_register = mem_register;
     memcache_device->mem_deregister = mem_deregister;
     return memcache_device;
@@ -286,13 +324,17 @@ void *memcache_init(void (*mem_register)
  */
 void memcache_shutdown(void *md)
 {
-    struct qlist_head *l, *lp;
     memcache_device_t *memcache_device = md;
+    memcache_entry_t *c, *cn;
 
     gen_mutex_lock(&memcache_device->mutex);
-    qlist_for_each_safe(l, lp, &memcache_device->list) {
-	memcache_entry_t *c = qlist_entry(l, memcache_entry_t, list);
-	(memcache_device->mem_deregister)(c);
+    qlist_for_each_entry_safe(c, cn, &memcache_device->list, list) {
+	memcache_device->mem_deregister(c);
+	qlist_del(&c->list);
+	free(c);
+    }
+    qlist_for_each_entry_safe(c, cn, &memcache_device->free_chunk_list, list) {
+	memcache_device->mem_deregister(c);
 	qlist_del(&c->list);
 	free(c);
     }

Index: openib.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/openib.c,v
diff -u -p -p -u -r1.9 -r1.10
--- openib.c	2 Dec 2006 19:08:41 -0000	1.9
+++ openib.c	7 Dec 2006 21:47:47 -0000	1.10
@@ -6,7 +6,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: openib.c,v 1.9 2006/12/02 19:08:41 pw Exp $
+ * $Id: openib.c,v 1.10 2006/12/07 21:47:47 pw Exp $
  */
 #include <string.h>
 #include <errno.h>
@@ -616,7 +616,7 @@ static int openib_check_cq(struct bmi_ib
     return 1;
 }
 
-static int openib_prepare_cq_block(void)
+static void openib_prepare_cq_block(int *cq_fd, int *async_fd)
 {
     struct openib_device_priv *od = ib_device->priv;
     int ret;
@@ -627,7 +627,8 @@ static int openib_prepare_cq_block(void)
 	error_xerrno(ret, "%s: ibv_req_notify_cq", __func__);
 
     /* return the fd that can be fed to poll() */
-    return od->channel->fd;
+    *cq_fd = od->channel->fd;
+    *async_fd = od->ctx->async_fd;
 }
 
 /*

Index: util.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/util.c,v
diff -u -p -p -u -r1.10 -r1.11
--- util.c	12 Oct 2006 20:37:28 -0000	1.10
+++ util.c	7 Dec 2006 21:47:47 -0000	1.11
@@ -5,7 +5,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: util.c,v 1.10 2006/10/12 20:37:28 pw Exp $
+ * $Id: util.c,v 1.11 2006/12/07 21:47:47 pw Exp $
  */
 #include <stdio.h>
 #include <string.h>
@@ -83,6 +83,18 @@ warning_errno(const char *fmt, ...)
     vsprintf(s, fmt, ap);
     va_end(ap);
     gossip_err("Warning: %s: %s.\n", s, strerror(errno));
+}
+
+void __attribute__((format(printf,2,3))) __hidden
+warning_xerrno(int errnum, const char *fmt, ...)
+{
+    char s[2048];
+    va_list ap;
+
+    va_start(ap, fmt);
+    vsprintf(s, fmt, ap);
+    va_end(ap);
+    gossip_err("Warning: %s: %s.\n", s, strerror(errnum));
 }
 
 void * __attribute__((malloc)) __hidden

Index: vapi.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_ib/vapi.c,v
diff -u -p -p -u -r1.9 -r1.10
--- vapi.c	2 Dec 2006 19:08:41 -0000	1.9
+++ vapi.c	7 Dec 2006 21:47:47 -0000	1.10
@@ -5,7 +5,7 @@
  *
  * See COPYING in top-level directory.
  *
- * $Id: vapi.c,v 1.9 2006/12/02 19:08:41 pw Exp $
+ * $Id: vapi.c,v 1.10 2006/12/07 21:47:47 pw Exp $
  */
 #include <stdio.h>
 #include <string.h>
@@ -623,17 +623,19 @@ static int vapi_check_cq(struct bmi_ib_w
     return 1;
 }
 
-static int vapi_prepare_cq_block(void)
+static void vapi_prepare_cq_block(int *cq_fd, int *async_fd)
 {
     struct vapi_device_priv *vd = ib_device->priv;
     int ret;
+
     /* ask for the next notfication */
     ret = VAPI_req_comp_notif(vd->nic_handle, vd->nic_cq, VAPI_NEXT_COMP);
     if (ret < 0)
 	error_verrno(ret, "%s: VAPI_req_comp_notif", __func__);
 
     /* return the fd that can be fed to poll() */
-    return vd->cq_event_pipe[0];
+    *cq_fd = vd->cq_event_pipe[0];
+    *async_fd = vd->async_event_pipe[0];
 }
 
 /*



More information about the Pvfs2-cvs mailing list