[PVFS2-CVS]
commit by pcarns in pvfs2/src/io/bmi/bmi_tcp: bmi-tcp-addressing.h
bmi-tcp.c socket-collection.c socket-collection.h
CVS commit program
cvs at parl.clemson.edu
Wed Mar 3 12:33:26 EST 2004
Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp
In directory parlweb:/tmp/cvs-serv22945/src/io/bmi/bmi_tcp
Modified Files:
bmi-tcp-addressing.h bmi-tcp.c socket-collection.c
socket-collection.h
Log Message:
Threw away old socket collection code, replaced with completely new
implementation. Quite a few improvements:
- cleaner
- better thread safety
- O(1) updates of poll set
- O(1) socket <-> bmi address mapping
- no long queue searches
Probably introduced some bugs along the way, so let me know if you see
anything fishy...
Index: bmi-tcp-addressing.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp-addressing.h,v
diff -p -u -r1.7 -r1.8
--- bmi-tcp-addressing.h 16 Feb 2004 21:22:41 -0000 1.7
+++ bmi-tcp-addressing.h 3 Mar 2004 17:33:26 -0000 1.8
@@ -17,9 +17,6 @@
* Information specific to tcp/ip
*/
-typedef int32_t bmi_sock_t; /* tcp/ip socket */
-typedef int32_t bmi_port_t; /* tcp/ip port */
-
/* this contains TCP/IP addressing information- it is filled in as
* connections are made */
struct tcp_addr
@@ -29,8 +26,8 @@ struct tcp_addr
int addr_error;
char *hostname;
char *ipaddr;
- bmi_port_t port;
- bmi_sock_t socket;
+ int port;
+ int socket;
/* flag that indicates this address represents a
* server port on which connections may be accepted */
int server_port;
@@ -40,6 +37,7 @@ struct tcp_addr
int not_connected;
/* socket collection link */
struct qlist_head sc_link;
+ int sc_index;
};
/*****************************************************************
Index: bmi-tcp.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp.c,v
diff -p -u -r1.67 -r1.68
--- bmi-tcp.c 2 Mar 2004 21:16:47 -0000 1.67
+++ bmi-tcp.c 3 Mar 2004 17:33:26 -0000 1.68
@@ -1405,6 +1405,7 @@ method_addr_p alloc_tcp_method_addr(void
tcp_addr_data->socket = -1;
tcp_addr_data->port = -1;
tcp_addr_data->map = my_method_addr;
+ tcp_addr_data->sc_index = -1;
return (my_method_addr);
}
@@ -2069,12 +2070,16 @@ static int tcp_do_work(int max_idle_time
int stall_flag = 0;
int busy_flag = 1;
struct timespec req;
+ struct tcp_addr* tcp_addr_data = NULL;
/* now we need to poll and see what to work on */
+ /* drop mutex while we make this call */
+ gen_mutex_unlock(&interface_mutex);
ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
TCP_WORK_METRIC, &socket_count,
addr_array, status_array,
max_idle_time, &interface_mutex);
+ gen_mutex_lock(&interface_mutex);
if (ret < 0)
{
return (ret);
@@ -2086,6 +2091,14 @@ static int tcp_do_work(int max_idle_time
/* do different kinds of work depending on results */
for (i = 0; i < socket_count; i++)
{
+ tcp_addr_data = addr_array[i]->method_data;
+ /* skip working on addresses in failure mode */
+ if(tcp_addr_data->addr_error)
+ {
+ tcp_forget_addr(addr_array[i], 0, tcp_addr_data->addr_error);
+ continue;
+ }
+
if (status_array[i] & SC_ERROR_BIT)
{
ret = tcp_do_work_error(addr_array[i]);
Index: socket-collection.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/socket-collection.c,v
diff -p -u -r1.12 -r1.13
--- socket-collection.c 2 Mar 2004 22:57:37 -0000 1.12
+++ socket-collection.c 3 Mar 2004 17:33:26 -0000 1.13
@@ -15,11 +15,6 @@
* will always check to see if there is data to be read on a socket.
*/
-/* NOTE: also note that this code is not re-entrant. It is written
- * assuming that only one thread (method) will be accessing it at any
- * given time.
- */
-
#include <sys/poll.h>
#include <string.h>
#include <unistd.h>
@@ -31,30 +26,13 @@
#include "bmi-tcp-addressing.h"
#include "gen-locks.h"
-/* number of sockets to poll at a time */
-#define SC_POLL_SIZE 128
-
/* errors that can occur on a poll socket */
#define ERRMASK (POLLERR+POLLHUP+POLLNVAL)
-/* used to keep up with the server socket if we have one. */
-static bmi_sock_t server_socket = -1;
-
-/* internal function prototypes */
-static method_addr_p socket_collection_search_addr(socket_collection_p scp,
- method_addr_p map);
-static method_addr_p socket_collection_shownext(socket_collection_p scp);
-
-static struct pollfd big_poll_fds[SC_POLL_SIZE];
-static method_addr_p big_poll_addr[SC_POLL_SIZE];
-static gen_mutex_t big_poll_mutex = GEN_MUTEX_INITIALIZER;
+#define POLLFD_ARRAY_START 32
+#define POLLFD_ARRAY_INC 32
-/*********************************************************************
- * public function implementations
- */
-
-/*
- * socket_collection_init()
+/* socket_collection_init()
*
* creates a new socket collection. It also acquires the server socket
* from the caller if it is available. Passing in a negative value
@@ -63,124 +41,98 @@ static gen_mutex_t big_poll_mutex = GEN_
*
* returns a pointer to the collection on success, NULL on failure.
*/
-socket_collection_p BMI_socket_collection_init(bmi_sock_t new_server_socket)
+socket_collection_p BMI_socket_collection_init(int new_server_socket)
{
socket_collection_p tmp_scp = NULL;
- if (new_server_socket > 0)
+ tmp_scp = (struct socket_collection*) malloc(sizeof(struct
+ socket_collection));
+ if(!tmp_scp)
{
- server_socket = new_server_socket;
+ return(NULL);
}
- tmp_scp = (struct qlist_head *) malloc(sizeof(struct qlist_head));
- if (tmp_scp)
- {
- INIT_QLIST_HEAD(tmp_scp);
- }
-
- return (tmp_scp);
-}
-
-/*
- * socket_collection_add()
- *
- * adds a tcp method_addr to the collection. It checks to see if the
- * addr is already present in the list first.
- *
- * returns 0 on success, -errno on failure.
- */
-void BMI_socket_collection_add(socket_collection_p scp,
- method_addr_p map)
-{
-
- method_addr_p tmp_map = NULL;
- struct tcp_addr *tcp_data = NULL;
+ memset(tmp_scp, 0, sizeof(struct socket_collection));
- /* see if it is already in the collection first */
- tmp_map = socket_collection_search_addr(scp, map);
+ gen_mutex_init(&tmp_scp->mutex);
+ gen_mutex_init(&tmp_scp->queue_mutex);
- if (tmp_map)
+ tmp_scp->pollfd_array = (struct
+ pollfd*)malloc(POLLFD_ARRAY_START*sizeof(struct pollfd));
+ if(!tmp_scp->pollfd_array)
{
- /* we already have it */
- return;
+ free(tmp_scp);
+ return(NULL);
+ }
+ tmp_scp->addr_array =
+ (method_addr_p*)malloc(POLLFD_ARRAY_START*sizeof(method_addr_p));
+ if(!tmp_scp->addr_array)
+ {
+ free(tmp_scp->pollfd_array);
+ free(tmp_scp);
}
- tcp_data = map->method_data;
- tcp_data->write_ref_count = 0;
-
- /* NOTE: adding to head on purpose. Probably we will access
- * this socket soon after adding
- */
- qlist_add(&(tcp_data->sc_link), scp);
- return;
-}
-
-
-/*
- * socket_collection_remove()
- *
- * removes a tcp method_addr from the collection.
- *
- * returns 0 on success, -errno on failure.
- */
-void BMI_socket_collection_remove(socket_collection_p scp,
- method_addr_p map)
-{
-
- struct tcp_addr *tcp_data = map->method_data;
+ tmp_scp->array_max = POLLFD_ARRAY_START;
+ tmp_scp->array_count = 0;
+ INIT_QLIST_HEAD(&tmp_scp->remove_queue);
+ INIT_QLIST_HEAD(&tmp_scp->add_queue);
+ tmp_scp->server_socket = new_server_socket;
- qlist_del(&(tcp_data->sc_link));
+ if(new_server_socket > -1)
+ {
+ tmp_scp->pollfd_array[0].fd = new_server_socket;
+ tmp_scp->pollfd_array[0].events = POLLIN;
+ tmp_scp->addr_array[0] = NULL;
+ tmp_scp->array_count++;
+ }
- return;
+ return (tmp_scp);
}
-
-/*
- * socket_collection_add_write_bit()
- *
- * indicates that a poll operation should check for the write condition
- * on this particular socket. This may be called several times for one
- * socket (method_addr).
+/* socket_collection_queue()
*
- * returns 0 on success, -errno on failure.
- */
-void BMI_socket_collection_add_write_bit(socket_collection_p scp,
- method_addr_p map)
-{
-
- struct tcp_addr *tcp_data = map->method_data;
- tcp_data->write_ref_count++;
-
- return;
-}
-
-/*
- * socket_collection_remove_write_bit()
- *
- * indicates that the given socket no longer needs to be polled for the
- * write condition. This may also be called multiple times for one
- * address (all instances must be removed before it is no longer polled
- * for the write condition)
+ * queues a tcp method_addr for addition or removal from the collection.
*
* returns 0 on success, -errno on failure.
*/
-void BMI_socket_collection_remove_write_bit(socket_collection_p scp,
- method_addr_p map)
+void BMI_socket_collection_queue(socket_collection_p scp,
+ method_addr_p map, struct qlist_head* queue)
{
+ struct qlist_head* iterator = NULL;
+ struct qlist_head* scratch = NULL;
+ struct tcp_addr* tcp_addr_data = NULL;
- struct tcp_addr *tcp_data = map->method_data;
- tcp_data->write_ref_count--;
- if (tcp_data->write_ref_count < 0)
+ /* make sure that this address isn't already slated for addition/removal */
+ qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
+ {
+ tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+ if(tcp_addr_data->map == map)
+ {
+ qlist_del(&tcp_addr_data->sc_link);
+ break;
+ }
+ }
+ qlist_for_each_safe(iterator, scratch, &scp->add_queue)
{
- tcp_data->write_ref_count = 0;
+ tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+ if(tcp_addr_data->map == map)
+ {
+ qlist_del(&tcp_addr_data->sc_link);
+ break;
+ }
}
+
+ /* add it on to the appropriate queue */
+ tcp_addr_data = map->method_data;
+ /* add to head, we are likely to access it again soon */
+ qlist_add(&tcp_addr_data->sc_link, queue);
+
return;
}
-/*
- * socket_collection_finalize()
+/* socket_collection_finalize()
*
* destroys a socket collection. IMPORTANT: It DOES NOT destroy the
* addresses contained within the collection, nor does it terminate
@@ -190,13 +142,13 @@ void BMI_socket_collection_remove_write_
*/
void BMI_socket_collection_finalize(socket_collection_p scp)
{
-
- /* not much to do here */
+ free(scp->pollfd_array);
+ free(scp->addr_array);
free(scp);
+ return;
}
-/*
- * socket_collection_testglobal()
+/* socket_collection_testglobal()
*
* this function is used to poll to see if any of the new sockets are
* available for work. The array of method addresses and array of
@@ -214,246 +166,186 @@ int BMI_socket_collection_testglobal(soc
int poll_timeout,
gen_mutex_t* external_mutex)
{
-
- int num_to_poll = 0;
- int max_to_poll = SC_POLL_SIZE;
- struct tcp_addr *tcp_data = NULL;
- method_addr_p tmp_map = NULL;
+ struct qlist_head* iterator = NULL;
+ struct qlist_head* scratch = NULL;
+ struct tcp_addr* tcp_addr_data = NULL;
+ struct tcp_addr* shifted_tcp_addr_data = NULL;
+ struct pollfd* tmp_pollfd_array = NULL;
+ method_addr_p* tmp_addr_array = NULL;
int ret = -1;
- int num_handled = 0;
- int i = 0;
- char* tmp_host = NULL;
- int old_errno = -1;
-
- if ((incount < 1) || !(outcount) || !(maps) || !(status))
- {
- return (-EINVAL);
- }
-
- /* NOTES:
- * Locking here is weird for now as a duct tape solution.
- * What we want to do is release the monster bmi-tcp lock while calling
- * poll(), but we have to also prevent concurrent access to this
- * function. So we grab a mutex specific to this section of the code
- * while in this function, while taking exceptional care with the lock
- * ordering to prevent deadlock. Modify at your own risk!
- *
- * This should all be rewritten one day...
- */
- gen_mutex_unlock(external_mutex);
- gen_mutex_lock(&big_poll_mutex);
- gen_mutex_lock(external_mutex);
+ int old_errno;
+ int tmp_count;
+ int i;
+ int skip_flag;
/* init the outgoing arguments for safety */
*outcount = 0;
memset(maps, 0, (sizeof(method_addr_p) * incount));
memset(status, 0, (sizeof(int) * incount));
- /* paranoia */
- memset(big_poll_fds, 0, (sizeof(struct pollfd) * SC_POLL_SIZE));
- memset(big_poll_addr, 0, (sizeof(method_addr_p) * SC_POLL_SIZE));
-
- /* leave room for server socket if needed */
- if (server_socket >= 0)
- {
- max_to_poll--;
- }
+ gen_mutex_lock(&scp->mutex);
- /* put a sentinal in the first poll field */
- big_poll_fds[0].fd = -1;
- big_poll_fds[1].fd = -1;
- num_to_poll = 0;
+ gen_mutex_lock(&scp->queue_mutex);
- /* add the server socket if we have one */
- if (server_socket >= 0)
+ /* look for addresses slated for removal */
+ qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
{
- big_poll_fds[num_to_poll].fd = server_socket;
- big_poll_fds[num_to_poll].events = POLLIN;
- num_to_poll++;
+ tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+ qlist_del(&tcp_addr_data->sc_link);
+ /* take out of poll array, shift last entry into its place */
+ if(tcp_addr_data->sc_index > -1)
+ {
+ scp->pollfd_array[tcp_addr_data->sc_index] =
+ scp->pollfd_array[scp->array_count-1];
+ scp->addr_array[tcp_addr_data->sc_index] =
+ scp->addr_array[scp->array_count-1];
+ shifted_tcp_addr_data =
+ scp->addr_array[tcp_addr_data->sc_index]->method_data;
+ shifted_tcp_addr_data->sc_index = tcp_addr_data->sc_index;
+ scp->array_count--;
+ tcp_addr_data->sc_index = -1;
+ tcp_addr_data->write_ref_count = 0;
+ }
}
- while (num_to_poll < max_to_poll &&
- (tmp_map = socket_collection_shownext(scp)) &&
- ((struct tcp_addr *) (tmp_map->method_data))->socket !=
- big_poll_fds[0].fd &&
- ((struct tcp_addr *) (tmp_map->method_data))->socket !=
- big_poll_fds[1].fd)
+ /* look for addresses slated for addition */
+ qlist_for_each_safe(iterator, scratch, &scp->add_queue)
{
- /* remove the job; add it back at the end of the queue */
- BMI_socket_collection_remove(scp, tmp_map);
- tcp_data = tmp_map->method_data;
- if (tcp_data->socket < 0)
+ tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+ qlist_del(&tcp_addr_data->sc_link);
+ if(tcp_addr_data->sc_index > -1)
{
- /* TODO: not sure how we hit this case, but it is definitely
- * happening. For now just ignore and keep going, fix better
- * later
- */
- qlist_add_tail(&(tcp_data->sc_link), scp);
+ /* update existing entry */
+#if 0
+ gossip_err("HELLO: updating addr: %p, index: %d, ref: %d.\n",
+ scp->addr_array[tcp_addr_data->sc_index],
+ tcp_addr_data->sc_index,
+ tcp_addr_data->write_ref_count);
+#endif
+ scp->pollfd_array[tcp_addr_data->sc_index].events = POLLIN;
+ if(tcp_addr_data->write_ref_count > 0)
+ scp->pollfd_array[tcp_addr_data->sc_index].events |= POLLOUT;
}
else
{
- big_poll_fds[num_to_poll].fd = tcp_data->socket;
- if (tcp_data->write_ref_count > 0)
+ /* new entry */
+ if(scp->array_count == scp->array_max)
{
- big_poll_fds[num_to_poll].events += POLLOUT;
+ /* we must enlarge the poll arrays */
+ tmp_pollfd_array = (struct pollfd*)malloc(
+ (scp->array_max+POLLFD_ARRAY_INC)*sizeof(struct pollfd));
+ /* TODO: handle this */
+ assert(tmp_pollfd_array);
+ tmp_addr_array = (method_addr_p*)malloc(
+ (scp->array_max+POLLFD_ARRAY_INC)*sizeof(method_addr_p));
+ /* TODO: handle this */
+ assert(tmp_addr_array);
+ memcpy(tmp_pollfd_array, scp->pollfd_array,
+ scp->array_max*sizeof(struct pollfd));
+ free(scp->pollfd_array);
+ scp->pollfd_array = tmp_pollfd_array;
+ memcpy(tmp_addr_array, scp->addr_array,
+ scp->array_max*sizeof(method_addr_p));
+ free(scp->addr_array);
+ scp->addr_array = tmp_addr_array;
+ scp->array_max = scp->array_max+POLLFD_ARRAY_INC;
}
- big_poll_fds[num_to_poll].events += POLLIN;
- big_poll_addr[num_to_poll] = tmp_map;
- num_to_poll++;
- qlist_add_tail(&(tcp_data->sc_link), scp);
+ /* add into pollfd array */
+ tcp_addr_data->sc_index = scp->array_count;
+ scp->array_count++;
+ scp->addr_array[tcp_addr_data->sc_index] = tcp_addr_data->map;
+ scp->pollfd_array[tcp_addr_data->sc_index].fd =
+ tcp_addr_data->socket;
+ scp->pollfd_array[tcp_addr_data->sc_index].events = POLLIN;
+ if(tcp_addr_data->write_ref_count > 0)
+ scp->pollfd_array[tcp_addr_data->sc_index].events |= POLLOUT;
}
}
+ gen_mutex_unlock(&scp->queue_mutex);
- /* we should be all set now to perform the poll operation */
- gen_mutex_unlock(external_mutex);
+ /* actually do the poll() work */
do
{
- ret = poll(big_poll_fds, num_to_poll, poll_timeout);
- } while (ret < 0 && errno == EINTR);
+ ret = poll(scp->pollfd_array, scp->array_count, poll_timeout);
+ } while(ret < 0 && errno == EINTR);
old_errno = errno;
- gen_mutex_lock(external_mutex);
- /* look for poll error */
- if (ret < 0)
+ if(ret < 0)
{
- gen_mutex_unlock(external_mutex);
- gen_mutex_unlock(&big_poll_mutex);
- gen_mutex_lock(external_mutex);
- return (-old_errno);
+ gen_mutex_unlock(&scp->mutex);
+ return(-old_errno);
}
- /* short out if nothing is ready */
- if (ret == 0)
+ /* nothing ready, just return */
+ if(ret == 0)
{
- gen_mutex_unlock(external_mutex);
- gen_mutex_unlock(&big_poll_mutex);
- gen_mutex_lock(external_mutex);
- return (0);
+ gen_mutex_unlock(&scp->mutex);
+ return(0);
}
- if (ret <= incount)
- {
- *outcount = ret;
- }
- else
- {
- *outcount = incount;
- }
+ tmp_count = ret;
- num_handled = 0;
- for (i = 0; i < num_to_poll; i++)
+ for(i=0; i<scp->array_count; i++)
{
- /* short out if we have handled as many as the caller wanted */
- if (num_handled == *outcount)
+ /* short out if we hit count limit */
+ if(*outcount == incount || *outcount == tmp_count)
{
break;
}
/* anything ready on this socket? */
- if (big_poll_fds[i].revents)
+ if (scp->pollfd_array[i].revents)
{
- /* error case */
- if (big_poll_fds[i].revents & ERRMASK)
- {
- if(big_poll_addr[i] == NULL)
- tmp_host = "NONE";
- else
- tmp_host = ((struct
- tcp_addr*)(big_poll_addr[i]->method_data))->hostname;
- gossip_err("Error: bmi_tcp: socket closed to host:"
- " %s, socket %d.\n", tmp_host, big_poll_fds[i].fd);
- status[num_handled] += SC_ERROR_BIT;
- }
- if (big_poll_fds[i].revents & POLLIN)
- {
- status[num_handled] += SC_READ_BIT;
- }
- if (big_poll_fds[i].revents & POLLOUT)
+ skip_flag = 0;
+
+ /* make sure that this addr hasn't been removed */
+ gen_mutex_lock(&scp->queue_mutex);
+ qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
{
- status[num_handled] += SC_WRITE_BIT;
+ tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
+ if(tcp_addr_data->map == scp->addr_array[i])
+ {
+ skip_flag = 1;
+ break;
+ }
}
+ gen_mutex_unlock(&scp->queue_mutex);
+ if(skip_flag)
+ continue;
+
+ if(scp->pollfd_array[i].revents & ERRMASK)
+ status[*outcount] |= SC_ERROR_BIT;
+ if(scp->pollfd_array[i].revents & POLLIN)
+ status[*outcount] |= SC_READ_BIT;
+ if(scp->pollfd_array[i].revents & POLLOUT)
+ status[*outcount] |= SC_WRITE_BIT;
- if (big_poll_addr[i] == NULL)
+ if(scp->addr_array[i] == NULL)
{
/* server socket */
- maps[num_handled] = alloc_tcp_method_addr();
- if (!(maps[num_handled]))
- {
- /* TODO: handle better? */
- gen_mutex_unlock(external_mutex);
- gen_mutex_unlock(&big_poll_mutex);
- gen_mutex_lock(external_mutex);
- return (-ENOMEM);
- }
- tcp_data = (maps[num_handled])->method_data;
- tcp_data->server_port = 1;
- tcp_data->socket = server_socket;
- tcp_data->port = -1;
+ maps[*outcount] = alloc_tcp_method_addr();
+ /* TODO: handle this */
+ assert(maps[*outcount]);
+ tcp_addr_data = (maps[*outcount])->method_data;
+ tcp_addr_data->server_port = 1;
+ tcp_addr_data->socket = scp->server_socket;
+ tcp_addr_data->port = -1;
}
else
{
- /* "normal" socket */
- maps[num_handled] = big_poll_addr[i];
+ /* normal case */
+ maps[*outcount] = scp->addr_array[i];
}
- num_handled++;
- }
- }
-
- gen_mutex_unlock(external_mutex);
- gen_mutex_unlock(&big_poll_mutex);
- gen_mutex_lock(external_mutex);
- return (0);
-}
-
-
-/*********************************************************************
- * internal utility functions
- */
-
-/*
- * socket_collection_search_addr()
- *
- * searches a socket collection to find an entry that matches the
- * given method addr.
- *
- * returns a pointer to the method_addr on success, NULL on failure
- */
-static method_addr_p socket_collection_search_addr(socket_collection_p scp,
- method_addr_p map)
-{
- struct tcp_addr *tmp_entry = NULL;
- socket_collection_p tmp_link = NULL;
-
- qlist_for_each(tmp_link, scp)
- {
- tmp_entry = qlist_entry(tmp_link, struct tcp_addr,
- sc_link);
- if (tmp_entry->map == map)
- {
- return (map);
+ *outcount = (*outcount) + 1;
}
}
- return (NULL);
-}
-/* socket_collection_shownext()
- *
- * returns a pointer to the next item in the collection
- *
- * returns pointer to method address on success, NULL on failure
- */
-static method_addr_p socket_collection_shownext(socket_collection_p scp)
-{
- struct tcp_addr *tcp_data = NULL;
+ gen_mutex_unlock(&scp->mutex);
- if (scp->next == scp)
- {
- return (NULL);
- }
- tcp_data = qlist_entry(scp->next, struct tcp_addr, sc_link);
- return (tcp_data->map);
+ return (0);
}
+
/*
* Local variables:
Index: socket-collection.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/socket-collection.h,v
diff -p -u -r1.7 -r1.8
--- socket-collection.h 9 Dec 2003 21:11:41 -0000 1.7
+++ socket-collection.h 3 Mar 2004 17:33:26 -0000 1.8
@@ -18,12 +18,28 @@
#ifndef __SOCKET_COLLECTION_H
#define __SOCKET_COLLECTION_H
+#include <assert.h>
#include "bmi-method-support.h"
#include "bmi-tcp-addressing.h"
#include "quicklist.h"
#include "gen-locks.h"
-typedef struct qlist_head *socket_collection_p;
+struct socket_collection
+{
+ gen_mutex_t mutex;
+
+ struct pollfd* pollfd_array;
+ method_addr_p* addr_array;
+ int array_max;
+ int array_count;
+
+ gen_mutex_t queue_mutex;
+ struct qlist_head remove_queue;
+ struct qlist_head add_queue;
+
+ int server_socket;
+};
+typedef struct socket_collection* socket_collection_p;
enum
{
@@ -32,15 +48,42 @@ enum
SC_ERROR_BIT = 4
};
-socket_collection_p BMI_socket_collection_init(bmi_sock_t new_server_socket);
-void BMI_socket_collection_add(socket_collection_p scp,
- method_addr_p map);
-void BMI_socket_collection_remove(socket_collection_p scp,
- method_addr_p map);
-void BMI_socket_collection_add_write_bit(socket_collection_p scp,
- method_addr_p map);
-void BMI_socket_collection_remove_write_bit(socket_collection_p scp,
- method_addr_p map);
+socket_collection_p BMI_socket_collection_init(int new_server_socket);
+void BMI_socket_collection_queue(socket_collection_p scp,
+ method_addr_p map, struct qlist_head* queue);
+#define BMI_socket_collection_add(s, m) \
+do { \
+ gen_mutex_lock(&((s)->queue_mutex)); \
+ BMI_socket_collection_queue(s, m, &((s)->add_queue)); \
+ gen_mutex_unlock(&((s)->queue_mutex)); \
+} while(0)
+
+#define BMI_socket_collection_remove(s, m) \
+do { \
+ gen_mutex_lock(&((s)->queue_mutex)); \
+ BMI_socket_collection_queue(s, m, &((s)->remove_queue)); \
+ gen_mutex_unlock(&((s)->queue_mutex)); \
+} while(0)
+
+#define BMI_socket_collection_add_write_bit(s, m) \
+do { \
+ gen_mutex_lock(&((s)->queue_mutex)); \
+ struct tcp_addr* tcp_data = (m)->method_data; \
+ tcp_data->write_ref_count++; \
+ BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
+ gen_mutex_unlock(&((s)->queue_mutex)); \
+} while(0)
+
+#define BMI_socket_collection_remove_write_bit(s, m) \
+do { \
+ gen_mutex_lock(&((s)->queue_mutex)); \
+ struct tcp_addr* tcp_data = (m)->method_data; \
+ tcp_data->write_ref_count--; \
+ assert(tcp_data->write_ref_count > -1); \
+ BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
+ gen_mutex_unlock(&((s)->queue_mutex)); \
+} while(0)
+
void BMI_socket_collection_finalize(socket_collection_p scp);
int BMI_socket_collection_testglobal(socket_collection_p scp,
int incount,
More information about the PVFS2-CVS
mailing list