[PVFS2-CVS] commit by pcarns in pvfs2/src/io/bmi/bmi_tcp: bmi-tcp-addressing.h bmi-tcp.c socket-collection.c socket-collection.h

CVS commit program cvs at parl.clemson.edu
Wed Mar 3 12:33:26 EST 2004


Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp
In directory parlweb:/tmp/cvs-serv22945/src/io/bmi/bmi_tcp

Modified Files:
	bmi-tcp-addressing.h bmi-tcp.c socket-collection.c 
	socket-collection.h 
Log Message:
Threw away the old socket collection code and replaced it with a completely
new implementation.  Quite a few improvements:
- cleaner
- better thread safety
- O(1) updates of the poll set (see the sketch below)
- O(1) socket <-> bmi address mapping
- no long queue searches
Probably introduced some bugs along the way, so let me know if you see
anything fishy...
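
The O(1) behavior comes from keeping the pollfd array and the address array
parallel and caching each address's slot in the new sc_index field.  A minimal
standalone sketch of the swap-with-last removal idea, using simplified
stand-in types rather than the real bmi_tcp structures:

    #include <sys/poll.h>

    /* Simplified stand-ins for the real bmi_tcp structures. */
    struct sketch_addr
    {
        int socket;
        int sc_index;    /* slot in the parallel arrays, -1 if absent */
    };

    struct sketch_collection
    {
        struct pollfd *pollfd_array;   /* pollfd_array[i] polls addr_array[i]->socket */
        struct sketch_addr **addr_array;
        int array_count;
    };

    /* Remove an address in O(1): move the last entry into the vacated slot
     * and update that entry's cached index. */
    static void sketch_remove(struct sketch_collection *sc,
                              struct sketch_addr *addr)
    {
        int idx = addr->sc_index;

        if (idx < 0)
            return;    /* not currently in the poll set */

        sc->pollfd_array[idx] = sc->pollfd_array[sc->array_count - 1];
        sc->addr_array[idx] = sc->addr_array[sc->array_count - 1];
        sc->addr_array[idx]->sc_index = idx;
        sc->array_count--;
        addr->sc_index = -1;
    }

The real code does the same shuffle in BMI_socket_collection_testglobal()
while draining the remove_queue, so nothing ever walks a long list.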


Index: bmi-tcp-addressing.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp-addressing.h,v
diff -p -u -r1.7 -r1.8
--- bmi-tcp-addressing.h	16 Feb 2004 21:22:41 -0000	1.7
+++ bmi-tcp-addressing.h	3 Mar 2004 17:33:26 -0000	1.8
@@ -17,9 +17,6 @@
  * Information specific to tcp/ip
  */
 
-typedef int32_t bmi_sock_t;	/* tcp/ip socket */
-typedef int32_t bmi_port_t;	/* tcp/ip port */
-
 /* this contains TCP/IP addressing information- it is filled in as
  * connections are made */
 struct tcp_addr
@@ -29,8 +26,8 @@ struct tcp_addr
     int addr_error;		
     char *hostname;
     char *ipaddr;
-    bmi_port_t port;
-    bmi_sock_t socket;
+    int port;
+    int socket;
     /* flag that indicates this address represents a
      * server port on which connections may be accepted */
     int server_port;
@@ -40,6 +37,7 @@ struct tcp_addr
     int not_connected;
     /* socket collection link */
     struct qlist_head sc_link;
+    int sc_index;
 };
 
 /*****************************************************************

Index: bmi-tcp.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp.c,v
diff -p -u -r1.67 -r1.68
--- bmi-tcp.c	2 Mar 2004 21:16:47 -0000	1.67
+++ bmi-tcp.c	3 Mar 2004 17:33:26 -0000	1.68
@@ -1405,6 +1405,7 @@ method_addr_p alloc_tcp_method_addr(void
     tcp_addr_data->socket = -1;
     tcp_addr_data->port = -1;
     tcp_addr_data->map = my_method_addr;
+    tcp_addr_data->sc_index = -1;
 
     return (my_method_addr);
 }
@@ -2069,12 +2070,16 @@ static int tcp_do_work(int max_idle_time
     int stall_flag = 0;
     int busy_flag = 1;
     struct timespec req;
+    struct tcp_addr* tcp_addr_data = NULL;
 
     /* now we need to poll and see what to work on */
+    /* drop mutex while we make this call */
+    gen_mutex_unlock(&interface_mutex);
     ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
 				       TCP_WORK_METRIC, &socket_count,
 				       addr_array, status_array,
 				       max_idle_time, &interface_mutex);
+    gen_mutex_lock(&interface_mutex);
     if (ret < 0)
     {
 	return (ret);
@@ -2086,6 +2091,14 @@ static int tcp_do_work(int max_idle_time
     /* do different kinds of work depending on results */
     for (i = 0; i < socket_count; i++)
     {
+	tcp_addr_data = addr_array[i]->method_data;
+	/* skip working on addresses in failure mode */
+	if(tcp_addr_data->addr_error)
+	{
+	    tcp_forget_addr(addr_array[i], 0, tcp_addr_data->addr_error);
+	    continue;
+	}
+
 	if (status_array[i] & SC_ERROR_BIT)
 	{
 	    ret = tcp_do_work_error(addr_array[i]);

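The tcp_do_work() change above drops the big interface lock while the socket
collection blocks in poll() and re-takes it before acting on the results.  A
minimal sketch of that lock-around-a-blocking-call pattern using plain
pthreads (hypothetical names, not the gen-locks wrappers used in the patch):

    #include <pthread.h>
    #include <sys/poll.h>

    static pthread_mutex_t interface_lock = PTHREAD_MUTEX_INITIALIZER;

    /* Never hold a coarse-grained lock across a call that may block for a
     * long time: release it around the blocking call and re-acquire it
     * before touching shared state again. */
    static int poll_without_holding_lock(struct pollfd *fds, int nfds,
                                         int timeout_ms)
    {
        int ret;

        pthread_mutex_lock(&interface_lock);
        /* ... examine or update shared state here ... */

        pthread_mutex_unlock(&interface_lock);
        ret = poll(fds, nfds, timeout_ms);
        pthread_mutex_lock(&interface_lock);

        /* ... process the poll results under the lock ... */
        pthread_mutex_unlock(&interface_lock);
        return ret;
    }
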
Index: socket-collection.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/socket-collection.c,v
diff -p -u -r1.12 -r1.13
--- socket-collection.c	2 Mar 2004 22:57:37 -0000	1.12
+++ socket-collection.c	3 Mar 2004 17:33:26 -0000	1.13
@@ -15,11 +15,6 @@
  * will always check to see if there is data to be read on a socket.
  */
 
-/* NOTE: also note that this code is not re-entrant.  It is written
- * assuming that only one thread (method) will be accessing it at any
- * given time.
- */
-
 #include <sys/poll.h>
 #include <string.h>
 #include <unistd.h>
@@ -31,30 +26,13 @@
 #include "bmi-tcp-addressing.h"
 #include "gen-locks.h"
 
-/* number of sockets to poll at a time */
-#define SC_POLL_SIZE 128
-
 /* errors that can occur on a poll socket */
 #define ERRMASK (POLLERR+POLLHUP+POLLNVAL)
 
-/* used to keep up with the server socket if we have one. */
-static bmi_sock_t server_socket = -1;
-
-/* internal function prototypes */
-static method_addr_p socket_collection_search_addr(socket_collection_p scp,
-						   method_addr_p map);
-static method_addr_p socket_collection_shownext(socket_collection_p scp);
-
-static struct pollfd big_poll_fds[SC_POLL_SIZE];
-static method_addr_p big_poll_addr[SC_POLL_SIZE];
-static gen_mutex_t big_poll_mutex = GEN_MUTEX_INITIALIZER;
+#define POLLFD_ARRAY_START 32
+#define POLLFD_ARRAY_INC 32
 
-/*********************************************************************
- * public function implementations
- */
-
-/*
- * socket_collection_init()
+/* socket_collection_init()
  * 
  * creates a new socket collection.  It also acquires the server socket
  * from the caller if it is available.  Passing in a negative value
@@ -63,124 +41,98 @@ static gen_mutex_t big_poll_mutex = GEN_
  *
  * returns a pointer to the collection on success, NULL on failure.
  */
-socket_collection_p BMI_socket_collection_init(bmi_sock_t new_server_socket)
+socket_collection_p BMI_socket_collection_init(int new_server_socket)
 {
 
     socket_collection_p tmp_scp = NULL;
 
-    if (new_server_socket > 0)
+    tmp_scp = (struct socket_collection*) malloc(sizeof(struct
+	socket_collection));
+    if(!tmp_scp)
     {
-	server_socket = new_server_socket;
+	return(NULL);
     }
 
-    tmp_scp = (struct qlist_head *) malloc(sizeof(struct qlist_head));
-    if (tmp_scp)
-    {
-	INIT_QLIST_HEAD(tmp_scp);
-    }
-
-    return (tmp_scp);
-}
-
-/*
- * socket_collection_add()
- * 
- * adds a tcp method_addr to the collection.  It checks to see if the
- * addr is already present in the list first.
- *
- * returns 0 on success, -errno on failure.
- */
-void BMI_socket_collection_add(socket_collection_p scp,
-			   method_addr_p map)
-{
-
-    method_addr_p tmp_map = NULL;
-    struct tcp_addr *tcp_data = NULL;
+    memset(tmp_scp, 0, sizeof(struct socket_collection));
 
-    /* see if it is already in the collection first */
-    tmp_map = socket_collection_search_addr(scp, map);
+    gen_mutex_init(&tmp_scp->mutex);
+    gen_mutex_init(&tmp_scp->queue_mutex);
 
-    if (tmp_map)
+    tmp_scp->pollfd_array = (struct
+	pollfd*)malloc(POLLFD_ARRAY_START*sizeof(struct pollfd));
+    if(!tmp_scp->pollfd_array)
     {
-	/* we already have it */
-	return;
+	free(tmp_scp);
+	return(NULL);
+    }
+    tmp_scp->addr_array =
+	(method_addr_p*)malloc(POLLFD_ARRAY_START*sizeof(method_addr_p));
+    if(!tmp_scp->addr_array)
+    {
+	free(tmp_scp->pollfd_array);
+	free(tmp_scp);
     }
 
-    tcp_data = map->method_data;
-    tcp_data->write_ref_count = 0;
-
-    /* NOTE: adding to head on purpose.  Probably we will access
-     * this socket soon after adding
-     */
-    qlist_add(&(tcp_data->sc_link), scp);
-    return;
-}
-
-
-/*
- * socket_collection_remove()
- *
- * removes a tcp method_addr from the collection.
- *
- * returns 0 on success, -errno on failure.
- */
-void BMI_socket_collection_remove(socket_collection_p scp,
-			      method_addr_p map)
-{
-
-    struct tcp_addr *tcp_data = map->method_data;
+    tmp_scp->array_max = POLLFD_ARRAY_START;
+    tmp_scp->array_count = 0;
+    INIT_QLIST_HEAD(&tmp_scp->remove_queue);
+    INIT_QLIST_HEAD(&tmp_scp->add_queue);
+    tmp_scp->server_socket = new_server_socket;
 
-    qlist_del(&(tcp_data->sc_link));
+    if(new_server_socket > -1)
+    {
+	tmp_scp->pollfd_array[0].fd = new_server_socket;
+	tmp_scp->pollfd_array[0].events = POLLIN;
+	tmp_scp->addr_array[0] = NULL;
+	tmp_scp->array_count++;
+    }
 
-    return;
+    return (tmp_scp);
 }
 
-
-/*
- * socket_collection_add_write_bit()
- *
- * indicates that a poll operation should check for the write condition
- * on this particular socket.  This may be called several times for one
- * socket (method_addr).
+/* socket_collection_queue()
  * 
- * returns 0 on success, -errno on failure.
- */
-void BMI_socket_collection_add_write_bit(socket_collection_p scp,
-				     method_addr_p map)
-{
-
-    struct tcp_addr *tcp_data = map->method_data;
-    tcp_data->write_ref_count++;
-
-    return;
-}
-
-/*
- * socket_collection_remove_write_bit()
- *
- * indicates that the given socket no longer needs to be polled for the
- * write condition.  This may also be called multiple times for one
- * address (all instances must be removed before it is no longer polled
- * for the write condition)
+ * queues a tcp method_addr for addition or removal from the collection.
  *
  * returns 0 on success, -errno on failure.
  */
-void BMI_socket_collection_remove_write_bit(socket_collection_p scp,
-					method_addr_p map)
+void BMI_socket_collection_queue(socket_collection_p scp,
+			   method_addr_p map, struct qlist_head* queue)
 {
+    struct qlist_head* iterator = NULL;
+    struct qlist_head* scratch = NULL;
+    struct tcp_addr* tcp_addr_data = NULL;
 
-    struct tcp_addr *tcp_data = map->method_data;
-    tcp_data->write_ref_count--;
-    if (tcp_data->write_ref_count < 0)
+    /* make sure that this address isn't already slated for addition/removal */
+    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
+    {
+	tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+	if(tcp_addr_data->map == map)
+	{
+	    qlist_del(&tcp_addr_data->sc_link);
+	    break;
+	}
+    }
+    qlist_for_each_safe(iterator, scratch, &scp->add_queue)
     {
-	tcp_data->write_ref_count = 0;
+	tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+	if(tcp_addr_data->map == map)
+	{
+	    qlist_del(&tcp_addr_data->sc_link);
+	    break;
+	}
     }
+
+    /* add it on to the appropriate queue */
+    tcp_addr_data = map->method_data;
+    /* add to head, we are likely to access it again soon */
+    qlist_add(&tcp_addr_data->sc_link, queue);
+
     return;
 }
 
 
-/*
- * socket_collection_finalize()
+/* socket_collection_finalize()
  *
  * destroys a socket collection.  IMPORTANT:  It DOES NOT destroy the
  * addresses contained within the collection, nor does it terminate
@@ -190,13 +142,13 @@ void BMI_socket_collection_remove_write_
  */
 void BMI_socket_collection_finalize(socket_collection_p scp)
 {
-
-    /* not much to do here */
+    free(scp->pollfd_array);
+    free(scp->addr_array);
     free(scp);
+    return;
 }
 
-/*
- * socket_collection_testglobal()
+/* socket_collection_testglobal()
  *
  * this function is used to poll to see if any of the new sockets are
  * available for work.  The array of method addresses and array of
@@ -214,246 +166,186 @@ int BMI_socket_collection_testglobal(soc
 				 int poll_timeout,
 				 gen_mutex_t* external_mutex)
 {
-
-    int num_to_poll = 0;
-    int max_to_poll = SC_POLL_SIZE;
-    struct tcp_addr *tcp_data = NULL;
-    method_addr_p tmp_map = NULL;
+    struct qlist_head* iterator = NULL;
+    struct qlist_head* scratch = NULL;
+    struct tcp_addr* tcp_addr_data = NULL;
+    struct tcp_addr* shifted_tcp_addr_data = NULL;
+    struct pollfd* tmp_pollfd_array = NULL;
+    method_addr_p* tmp_addr_array = NULL;
     int ret = -1;
-    int num_handled = 0;
-    int i = 0;
-    char* tmp_host = NULL;
-    int old_errno = -1;
-
-    if ((incount < 1) || !(outcount) || !(maps) || !(status))
-    {
-	return (-EINVAL);
-    }
-
-    /* NOTES:
-     * Locking here is weird for now as a duct tape solution.  
-     * What we want to do is release the monster bmi-tcp lock while calling
-     * poll(), but we have to also prevent concurrent access to this
-     * function.  So we grab a mutex specific to this section of the code
-     * while in this function, while taking exceptional care with the lock
-     * ordering to prevent deadlock.  Modify at your own risk!
-     *
-     * This should all be rewritten one day...
-     */
-    gen_mutex_unlock(external_mutex);
-    gen_mutex_lock(&big_poll_mutex);
-    gen_mutex_lock(external_mutex);
+    int old_errno;
+    int tmp_count;
+    int i;
+    int skip_flag;
 
     /* init the outgoing arguments for safety */
     *outcount = 0;
     memset(maps, 0, (sizeof(method_addr_p) * incount));
     memset(status, 0, (sizeof(int) * incount));
 
-    /* paranoia */
-    memset(big_poll_fds, 0, (sizeof(struct pollfd) * SC_POLL_SIZE));
-    memset(big_poll_addr, 0, (sizeof(method_addr_p) * SC_POLL_SIZE));
-
-    /* leave room for server socket if needed */
-    if (server_socket >= 0)
-    {
-	max_to_poll--;
-    }
+    gen_mutex_lock(&scp->mutex);
 
-    /* put a sentinal in the first poll field */
-    big_poll_fds[0].fd = -1;
-    big_poll_fds[1].fd = -1;
-    num_to_poll = 0;
+    gen_mutex_lock(&scp->queue_mutex);
 
-    /* add the server socket if we have one */
-    if (server_socket >= 0)
+    /* look for addresses slated for removal */
+    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
     {
-	big_poll_fds[num_to_poll].fd = server_socket;
-	big_poll_fds[num_to_poll].events = POLLIN;
-	num_to_poll++;
+	tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+	qlist_del(&tcp_addr_data->sc_link);
+	/* take out of poll array, shift last entry into its place */
+	if(tcp_addr_data->sc_index > -1)
+	{
+	    scp->pollfd_array[tcp_addr_data->sc_index] = 
+		scp->pollfd_array[scp->array_count-1];
+	    scp->addr_array[tcp_addr_data->sc_index] = 
+		scp->addr_array[scp->array_count-1];
+	    shifted_tcp_addr_data =
+		scp->addr_array[tcp_addr_data->sc_index]->method_data;
+	    shifted_tcp_addr_data->sc_index = tcp_addr_data->sc_index;
+	    scp->array_count--;
+	    tcp_addr_data->sc_index = -1;
+	    tcp_addr_data->write_ref_count = 0;
+	}
     }
 
-    while (num_to_poll < max_to_poll &&
-	   (tmp_map = socket_collection_shownext(scp)) &&
-	   ((struct tcp_addr *) (tmp_map->method_data))->socket !=
-	   big_poll_fds[0].fd &&
-	   ((struct tcp_addr *) (tmp_map->method_data))->socket !=
-	   big_poll_fds[1].fd)
+    /* look for addresses slated for addition */
+    qlist_for_each_safe(iterator, scratch, &scp->add_queue)
     {
-	/* remove the job; add it back at the end of the queue */
-	BMI_socket_collection_remove(scp, tmp_map);
-	tcp_data = tmp_map->method_data;
-	if (tcp_data->socket < 0)
+	tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+	qlist_del(&tcp_addr_data->sc_link);
+	if(tcp_addr_data->sc_index > -1)
 	{
-	    /* TODO: not sure how we hit this case, but it is definitely
-	     * happening.  For now just ignore and keep going, fix better
-	     * later
-	     */
-	    qlist_add_tail(&(tcp_data->sc_link), scp);
+	    /* update existing entry */
+#if 0
+	    gossip_err("HELLO: updating addr: %p, index: %d, ref: %d.\n",
+		scp->addr_array[tcp_addr_data->sc_index],
+		tcp_addr_data->sc_index,
+		tcp_addr_data->write_ref_count);
+#endif
+	    scp->pollfd_array[tcp_addr_data->sc_index].events = POLLIN;
+	    if(tcp_addr_data->write_ref_count > 0)
+		scp->pollfd_array[tcp_addr_data->sc_index].events |= POLLOUT;
 	}
 	else
 	{
-	    big_poll_fds[num_to_poll].fd = tcp_data->socket;
-	    if (tcp_data->write_ref_count > 0)
+	    /* new entry */
+	    if(scp->array_count == scp->array_max)
 	    {
-		big_poll_fds[num_to_poll].events += POLLOUT;
+		/* we must enlarge the poll arrays */
+		tmp_pollfd_array = (struct pollfd*)malloc(
+		    (scp->array_max+POLLFD_ARRAY_INC)*sizeof(struct pollfd)); 
+		/* TODO: handle this */
+		assert(tmp_pollfd_array);
+		tmp_addr_array = (method_addr_p*)malloc(
+		    (scp->array_max+POLLFD_ARRAY_INC)*sizeof(method_addr_p)); 
+		/* TODO: handle this */
+		assert(tmp_addr_array);
+		memcpy(tmp_pollfd_array, scp->pollfd_array,
+		    scp->array_max*sizeof(struct pollfd));
+		free(scp->pollfd_array);
+		scp->pollfd_array = tmp_pollfd_array;
+		memcpy(tmp_addr_array, scp->addr_array,
+		    scp->array_max*sizeof(method_addr_p));
+		free(scp->addr_array);
+		scp->addr_array = tmp_addr_array;
+		scp->array_max = scp->array_max+POLLFD_ARRAY_INC;
 	    }
-	    big_poll_fds[num_to_poll].events += POLLIN;
-	    big_poll_addr[num_to_poll] = tmp_map;
-	    num_to_poll++;
-	    qlist_add_tail(&(tcp_data->sc_link), scp);
+	    /* add into pollfd array */
+	    tcp_addr_data->sc_index = scp->array_count;
+	    scp->array_count++;
+	    scp->addr_array[tcp_addr_data->sc_index] = tcp_addr_data->map;
+	    scp->pollfd_array[tcp_addr_data->sc_index].fd =
+		tcp_addr_data->socket;
+	    scp->pollfd_array[tcp_addr_data->sc_index].events = POLLIN;
+	    if(tcp_addr_data->write_ref_count > 0)
+		scp->pollfd_array[tcp_addr_data->sc_index].events |= POLLOUT;
 	}
     }
+    gen_mutex_unlock(&scp->queue_mutex);
 
-    /* we should be all set now to perform the poll operation */
-    gen_mutex_unlock(external_mutex);
+    /* actually do the poll() work */
     do
     {
-	ret = poll(big_poll_fds, num_to_poll, poll_timeout);
-    } while (ret < 0 && errno == EINTR);
+	ret = poll(scp->pollfd_array, scp->array_count, poll_timeout);
+    } while(ret < 0 && errno == EINTR);
     old_errno = errno;
-    gen_mutex_lock(external_mutex);
 
-    /* look for poll error */
-    if (ret < 0)
+    if(ret < 0)
     {
-	gen_mutex_unlock(external_mutex);
-	gen_mutex_unlock(&big_poll_mutex);
-	gen_mutex_lock(external_mutex);
-	return (-old_errno);
+	gen_mutex_unlock(&scp->mutex);
+	return(-old_errno);
     }
 
-    /* short out if nothing is ready */
-    if (ret == 0)
+    /* nothing ready, just return */
+    if(ret == 0)
     {
-	gen_mutex_unlock(external_mutex);
-	gen_mutex_unlock(&big_poll_mutex);
-	gen_mutex_lock(external_mutex);
-	return (0);
+	gen_mutex_unlock(&scp->mutex);
+	return(0);
     }
 
-    if (ret <= incount)
-    {
-	*outcount = ret;
-    }
-    else
-    {
-	*outcount = incount;
-    }
+    tmp_count = ret;
 
-    num_handled = 0;
-    for (i = 0; i < num_to_poll; i++)
+    for(i=0; i<scp->array_count; i++)
     {
-	/* short out if we have handled as many as the caller wanted */
-	if (num_handled == *outcount)
+	/* short out if we hit count limit */
+	if(*outcount == incount || *outcount == tmp_count)
 	{
 	    break;
 	}
 
 	/* anything ready on this socket? */
-	if (big_poll_fds[i].revents)
+	if (scp->pollfd_array[i].revents)
 	{
-	    /* error case */
-	    if (big_poll_fds[i].revents & ERRMASK)
-	    {
-		if(big_poll_addr[i] == NULL)
-		    tmp_host = "NONE";
-		else
-		    tmp_host = ((struct
-			tcp_addr*)(big_poll_addr[i]->method_data))->hostname;
-	    	gossip_err("Error: bmi_tcp: socket closed to host:"
-		    " %s, socket %d.\n", tmp_host, big_poll_fds[i].fd);
-		status[num_handled] += SC_ERROR_BIT;
-	    }
-	    if (big_poll_fds[i].revents & POLLIN)
-	    {
-		status[num_handled] += SC_READ_BIT;
-	    }
-	    if (big_poll_fds[i].revents & POLLOUT)
+	    skip_flag = 0;
+
+	    /* make sure that this addr hasn't been removed */
+	    gen_mutex_lock(&scp->queue_mutex);
+	    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
 	    {
-		status[num_handled] += SC_WRITE_BIT;
+		tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+		if(tcp_addr_data->map == scp->addr_array[i])
+		{
+		    skip_flag = 1;
+		    break;
+		}
 	    }
+	    gen_mutex_unlock(&scp->queue_mutex);
+	    if(skip_flag)
+		continue;
+
+	    if(scp->pollfd_array[i].revents & ERRMASK)
+		status[*outcount] |= SC_ERROR_BIT;
+	    if(scp->pollfd_array[i].revents & POLLIN)
+		status[*outcount] |= SC_READ_BIT;
+	    if(scp->pollfd_array[i].revents & POLLOUT)
+		status[*outcount] |= SC_WRITE_BIT;
 
-	    if (big_poll_addr[i] == NULL)
+	    if(scp->addr_array[i] == NULL)
 	    {
 		/* server socket */
-		maps[num_handled] = alloc_tcp_method_addr();
-		if (!(maps[num_handled]))
-		{
-		    /* TODO: handle better? */
-		    gen_mutex_unlock(external_mutex);
-		    gen_mutex_unlock(&big_poll_mutex);
-		    gen_mutex_lock(external_mutex);
-		    return (-ENOMEM);
-		}
-		tcp_data = (maps[num_handled])->method_data;
-		tcp_data->server_port = 1;
-		tcp_data->socket = server_socket;
-		tcp_data->port = -1;
+		maps[*outcount] = alloc_tcp_method_addr();
+		/* TODO: handle this */
+		assert(maps[*outcount]);
+		tcp_addr_data = (maps[*outcount])->method_data;
+		tcp_addr_data->server_port = 1;
+		tcp_addr_data->socket = scp->server_socket;
+		tcp_addr_data->port = -1;
 	    }
 	    else
 	    {
-		/* "normal" socket */
-		maps[num_handled] = big_poll_addr[i];
+		/* normal case */
+		maps[*outcount] = scp->addr_array[i];
 	    }
-	    num_handled++;
-	}
-    }
-
-    gen_mutex_unlock(external_mutex);
-    gen_mutex_unlock(&big_poll_mutex);
-    gen_mutex_lock(external_mutex);
-    return (0);
-}
-
-
-/*********************************************************************
- * internal utility functions
- */
-
-/*
- * socket_collection_search_addr()
- * 
- * searches a socket collection to find an entry that matches the
- * given method addr.
- *
- * returns a pointer to the method_addr on success, NULL on failure
- */
-static method_addr_p socket_collection_search_addr(socket_collection_p scp,
-						   method_addr_p map)
-{
 
-    struct tcp_addr *tmp_entry = NULL;
-    socket_collection_p tmp_link = NULL;
-
-    qlist_for_each(tmp_link, scp)
-    {
-	tmp_entry = qlist_entry(tmp_link, struct tcp_addr,
-				sc_link);
-	if (tmp_entry->map == map)
-	{
-	    return (map);
+	    *outcount = (*outcount) + 1;
 	}
     }
-    return (NULL);
-}
 
-/* socket_collection_shownext()
- *
- * returns a pointer to the next item in the collection
- *
- * returns pointer to method address on success, NULL on failure
- */
-static method_addr_p socket_collection_shownext(socket_collection_p scp)
-{
-    struct tcp_addr *tcp_data = NULL;
+    gen_mutex_unlock(&scp->mutex);
 
-    if (scp->next == scp)
-    {
-	return (NULL);
-    }
-    tcp_data = qlist_entry(scp->next, struct tcp_addr, sc_link);
-    return (tcp_data->map);
+    return (0);
 }
+
 
 /*
  * Local variables:

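The new BMI_socket_collection_testglobal() grows the two parallel arrays in
fixed POLLFD_ARRAY_INC steps with malloc() and memcpy().  A compact
realloc()-based variant of the same growth policy, with illustrative names
and without the patch's TODO/assert error handling:

    #include <sys/poll.h>
    #include <stdlib.h>

    #define SKETCH_ARRAY_INC 32

    /* Grow the parallel pollfd/address arrays by a fixed increment when
     * they are full.  Returns 0 on success, -1 if an allocation fails (the
     * old arrays stay usable in that case). */
    static int sketch_ensure_room(struct pollfd **fds, void ***addrs,
                                  int *array_max, int array_count)
    {
        struct pollfd *new_fds;
        void **new_addrs;
        int new_max;

        if (array_count < *array_max)
            return 0;

        new_max = *array_max + SKETCH_ARRAY_INC;

        new_fds = (struct pollfd *) realloc(*fds, new_max * sizeof(*new_fds));
        if (!new_fds)
            return -1;
        *fds = new_fds;

        new_addrs = (void **) realloc(*addrs, new_max * sizeof(*new_addrs));
        if (!new_addrs)
            return -1;    /* *fds is larger than *array_max claims; harmless */
        *addrs = new_addrs;

        *array_max = new_max;
        return 0;
    }
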
Index: socket-collection.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/socket-collection.h,v
diff -p -u -r1.7 -r1.8
--- socket-collection.h	9 Dec 2003 21:11:41 -0000	1.7
+++ socket-collection.h	3 Mar 2004 17:33:26 -0000	1.8
@@ -18,12 +18,28 @@
 #ifndef __SOCKET_COLLECTION_H
 #define __SOCKET_COLLECTION_H
 
+#include <assert.h>
 #include "bmi-method-support.h"
 #include "bmi-tcp-addressing.h"
 #include "quicklist.h"
 #include "gen-locks.h"
 
-typedef struct qlist_head *socket_collection_p;
+struct socket_collection
+{
+    gen_mutex_t mutex;
+
+    struct pollfd* pollfd_array;
+    method_addr_p* addr_array;
+    int array_max;
+    int array_count;
+
+    gen_mutex_t queue_mutex;
+    struct qlist_head remove_queue;
+    struct qlist_head add_queue;
+
+    int server_socket;
+};
+typedef struct socket_collection* socket_collection_p;
 
 enum
 {
@@ -32,15 +48,42 @@ enum
     SC_ERROR_BIT = 4
 };
 
-socket_collection_p BMI_socket_collection_init(bmi_sock_t new_server_socket);
-void BMI_socket_collection_add(socket_collection_p scp,
-			   method_addr_p map);
-void BMI_socket_collection_remove(socket_collection_p scp,
-			      method_addr_p map);
-void BMI_socket_collection_add_write_bit(socket_collection_p scp,
-				     method_addr_p map);
-void BMI_socket_collection_remove_write_bit(socket_collection_p scp,
-					method_addr_p map);
+socket_collection_p BMI_socket_collection_init(int new_server_socket);
+void BMI_socket_collection_queue(socket_collection_p scp,
+			   method_addr_p map, struct qlist_head* queue);
+#define BMI_socket_collection_add(s, m) \
+do { \
+    gen_mutex_lock(&((s)->queue_mutex)); \
+    BMI_socket_collection_queue(s, m, &((s)->add_queue)); \
+    gen_mutex_unlock(&((s)->queue_mutex)); \
+} while(0)
+
+#define BMI_socket_collection_remove(s, m) \
+do { \
+    gen_mutex_lock(&((s)->queue_mutex)); \
+    BMI_socket_collection_queue(s, m, &((s)->remove_queue)); \
+    gen_mutex_unlock(&((s)->queue_mutex)); \
+} while(0)
+
+#define BMI_socket_collection_add_write_bit(s, m) \
+do { \
+    gen_mutex_lock(&((s)->queue_mutex)); \
+    struct tcp_addr* tcp_data = (m)->method_data; \
+    tcp_data->write_ref_count++; \
+    BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
+    gen_mutex_unlock(&((s)->queue_mutex)); \
+} while(0)
+
+#define BMI_socket_collection_remove_write_bit(s, m) \
+do { \
+    gen_mutex_lock(&((s)->queue_mutex)); \
+    struct tcp_addr* tcp_data = (m)->method_data; \
+    tcp_data->write_ref_count--; \
+    assert(tcp_data->write_ref_count > -1); \
+    BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
+    gen_mutex_unlock(&((s)->queue_mutex)); \
+} while(0)
+
 void BMI_socket_collection_finalize(socket_collection_p scp);
 int BMI_socket_collection_testglobal(socket_collection_p scp,
 				 int incount,


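The add/remove helpers above are wrapped in do { ... } while(0) so that a
multi-statement, mutex-protected body still expands as a single statement.  A
tiny generic illustration of why the wrapper matters (hypothetical counter
example, nothing to do with BMI):

    #include <pthread.h>

    /* Without the do { ... } while(0) wrapper this macro could not be used
     * safely as the body of an un-braced if/else. */
    #define COUNTER_INC(mutex, counter) \
    do { \
        pthread_mutex_lock(mutex); \
        (counter)++; \
        pthread_mutex_unlock(mutex); \
    } while (0)

    static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
    static int count = 0;

    static void maybe_count(int enabled)
    {
        if (enabled)
            COUNTER_INC(&count_mutex, count);    /* expands to one statement */
        else
            count = 0;
    }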
