[Pvfs2-cvs] commit by nlmills in pvfs2/src/io/bmi/bmi_tcp: bmi-tcp-addressing.h bmi-tcp.c module.mk.in socket-collection-epoll.c socket-collection-epoll.h socket-collection.c socket-collection.h sockio.c

CVS commit program cvs at parl.clemson.edu
Tue Aug 25 13:56:12 EDT 2009


Update of /anoncvs/pvfs2/src/io/bmi/bmi_tcp
In directory parlweb1:/tmp/cvs-serv5511/src/io/bmi/bmi_tcp

Modified Files:
      Tag: cu-security-branch
	bmi-tcp-addressing.h bmi-tcp.c module.mk.in 
	socket-collection-epoll.c socket-collection-epoll.h 
	socket-collection.c socket-collection.h sockio.c 
Log Message:
merged in changes from summer at LANL


Index: bmi-tcp-addressing.h
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp-addressing.h,v
diff -p -u -r1.17 -r1.17.8.1
--- bmi-tcp-addressing.h	6 Nov 2007 23:08:36 -0000	1.17
+++ bmi-tcp-addressing.h	25 Aug 2009 17:56:11 -0000	1.17.8.1
@@ -24,6 +24,11 @@
 */
 #define BMI_TCP_ZERO_READ_LIMIT  10
 
+/* wait no more than 10 seconds for a partial BMI header to arrive on a
+ * socket once we have detected part of it.
+ */
+#define BMI_TCP_HEADER_WAIT_SECONDS 10
+
 /* peer name types */
 #define BMI_TCP_PEER_IP 1
 #define BMI_TCP_PEER_HOSTNAME 2
@@ -47,7 +52,7 @@ struct tcp_allowed_connection_s {
 struct tcp_addr
 {
     bmi_method_addr_p map;		/* points back to generic address */ \
-    PVFS_BMI_addr_t bmi_addr;
+    BMI_addr_t bmi_addr;
     /* stores error code for addresses that are broken for some reason */
     int addr_error;		
     char *hostname;
@@ -65,6 +70,8 @@ struct tcp_addr
     int sc_index;
     /* count of the number of sequential zero read operations */
     int zero_read_limit;
+    /* timer for how long we wait on incomplete headers to arrive */
+    int short_header_timer;
     /* flag used to determine if we can reconnect this address after failure */
     int dont_reconnect;
     char* peer;

Index: bmi-tcp.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp.c,v
diff -p -u -r1.127 -r1.127.4.1
--- bmi-tcp.c	8 Jan 2008 21:18:41 -0000	1.127
+++ bmi-tcp.c	25 Aug 2009 17:56:11 -0000	1.127.4.1
@@ -15,9 +15,11 @@
 #include <assert.h>
 #include <sys/uio.h>
 #include <time.h>
+#include <sys/time.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
+#include "pint-mem.h"
 
 #include "pvfs2-config.h"
 #ifdef HAVE_NETDB_H
@@ -38,21 +40,18 @@
 #include "bmi-byteswap.h"
 #include "id-generator.h"
 #include "pint-event.h"
+#include "pvfs2-debug.h"
 #ifdef USE_TRUSTED
 #include "server-config.h"
 #include "bmi-tcp-addressing.h"
 #endif
 #include "gen-locks.h"
-
-#define BMI_EVENT_START(__op, __id) \
- PINT_event_timestamp(PVFS_EVENT_API_BMI, __op, 0, __id, \
- PVFS_EVENT_FLAG_START)
-
-#define BMI_EVENT_END(__op, __size, __id) \
- PINT_event_timestamp(PVFS_EVENT_API_BMI, __op, __size, __id, \
- PVFS_EVENT_FLAG_END)
+#include "pint-hint.h"
+#include "pint-event.h"
 
 static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
+static gen_cond_t interface_cond = GEN_COND_INITIALIZER;
+static int sc_test_busy = 0;
 
 /* function prototypes */
 int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
@@ -76,7 +75,8 @@ int BMI_tcp_post_send(bmi_op_id_t * id,
 		      enum bmi_buffer_type buffer_type,
 		      bmi_msg_tag_t tag,
 		      void *user_ptr,
-		      bmi_context_id context_id);
+		      bmi_context_id context_id,
+                      PVFS_hint hints);
 int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
 				bmi_method_addr_p dest,
 				const void *buffer,
@@ -84,7 +84,8 @@ int BMI_tcp_post_sendunexpected(bmi_op_i
 				enum bmi_buffer_type buffer_type,
 				bmi_msg_tag_t tag,
 				void *user_ptr,
-				bmi_context_id context_id);
+				bmi_context_id context_id,
+                                PVFS_hint hints);
 int BMI_tcp_post_recv(bmi_op_id_t * id,
 		      bmi_method_addr_p src,
 		      void *buffer,
@@ -93,7 +94,8 @@ int BMI_tcp_post_recv(bmi_op_id_t * id,
 		      enum bmi_buffer_type buffer_type,
 		      bmi_msg_tag_t tag,
 		      void *user_ptr,
-		      bmi_context_id context_id);
+		      bmi_context_id context_id,
+                      PVFS_hint hints);
 int BMI_tcp_test(bmi_op_id_t id,
 		 int *outcount,
 		 bmi_error_code_t * error_code,
@@ -134,7 +136,8 @@ int BMI_tcp_post_send_list(bmi_op_id_t *
 			   enum bmi_buffer_type buffer_type,
 			   bmi_msg_tag_t tag,
 			   void *user_ptr,
-			   bmi_context_id context_id);
+			   bmi_context_id context_id,
+                           PVFS_hint hints);
 int BMI_tcp_post_recv_list(bmi_op_id_t * id,
 			   bmi_method_addr_p src,
 			   void *const *buffer_list,
@@ -145,7 +148,8 @@ int BMI_tcp_post_recv_list(bmi_op_id_t *
 			   enum bmi_buffer_type buffer_type,
 			   bmi_msg_tag_t tag,
 			   void *user_ptr,
-			   bmi_context_id context_id);
+			   bmi_context_id context_id,
+                           PVFS_hint hints);
 int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
 				     bmi_method_addr_p dest,
 				     const void *const *buffer_list,
@@ -155,7 +159,8 @@ int BMI_tcp_post_sendunexpected_list(bmi
 				     enum bmi_buffer_type buffer_type,
 				     bmi_msg_tag_t tag,
 				     void *user_ptr,
-				     bmi_context_id context_id);
+				     bmi_context_id context_id,
+                                     PVFS_hint hints);
 int BMI_tcp_open_context(bmi_context_id context_id);
 void BMI_tcp_close_context(bmi_context_id context_id);
 int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id);
@@ -237,7 +242,8 @@ static int enqueue_operation(op_list_p t
 			     void *user_ptr,
 			     bmi_size_t actual_size,
 			     bmi_size_t expected_size,
-			     bmi_context_id context_id);
+			     bmi_context_id context_id,
+                             int32_t event_id);
 static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code);
 static int tcp_shutdown_addr(bmi_method_addr_p map);
 static int tcp_do_work(int max_idle_time);
@@ -252,26 +258,28 @@ static int tcp_accept_init(int *socket, 
 static method_op_p alloc_tcp_method_op(void);
 static void dealloc_tcp_method_op(method_op_p old_op);
 static int handle_new_connection(bmi_method_addr_p map);
-static int BMI_tcp_post_send_generic(bmi_op_id_t * id,
-				     bmi_method_addr_p dest,
-				     const void *const *buffer_list,
-				     const bmi_size_t *size_list,
-				     int list_count,
-				     enum bmi_buffer_type buffer_type,
-				     struct tcp_msg_header my_header,
-				     void *user_ptr,
-				     bmi_context_id context_id);
+static int tcp_post_send_generic(bmi_op_id_t * id,
+                                 bmi_method_addr_p dest,
+                                 const void *const *buffer_list,
+                                 const bmi_size_t *size_list,
+                                 int list_count,
+                                 enum bmi_buffer_type buffer_type,
+                                 struct tcp_msg_header my_header,
+                                 void *user_ptr,
+                                 bmi_context_id context_id,
+                                 PVFS_hint hints);
 static int tcp_post_recv_generic(bmi_op_id_t * id,
-				 bmi_method_addr_p src,
-				 void *const *buffer_list,
-				 const bmi_size_t *size_list,
-				 int list_count,
-				 bmi_size_t expected_size,
-				 bmi_size_t * actual_size,
-				 enum bmi_buffer_type buffer_type,
-				 bmi_msg_tag_t tag,
-				 void *user_ptr,
-				 bmi_context_id context_id);
+                                 bmi_method_addr_p src,
+                                 void *const *buffer_list,
+                                 const bmi_size_t *size_list,
+                                 int list_count,
+                                 bmi_size_t expected_size,
+                                 bmi_size_t * actual_size,
+                                 enum bmi_buffer_type buffer_type,
+                                 bmi_msg_tag_t tag,
+                                 void *user_ptr,
+                                 bmi_context_id context_id,
+                                 PVFS_hint hints);
 static int payload_progress(int s, void *const *buffer_list, const bmi_size_t* 
     size_list, int list_count, bmi_size_t total_size, int* list_index, 
     bmi_size_t* current_index_complete, enum bmi_op_type send_recv, 
@@ -326,6 +334,8 @@ static struct
 static struct tcp_allowed_connection_s *gtcp_allowed_connection = NULL;
 #endif
 
+static int check_unexpected = 1;
+
 /* op_list_array indices */
 enum
 {
@@ -388,6 +398,11 @@ static int forceful_cancel_mode = 0;
 static int tcp_buffer_size_receive = 0;
 static int tcp_buffer_size_send = 0;
 
+static PINT_event_type bmi_tcp_send_event_id;
+static PINT_event_type bmi_tcp_recv_event_id;
+
+static PINT_event_group bmi_tcp_event_group;
+static pid_t bmi_tcp_pid;
 
 /*************************************************************************
  * Visible Interface 
@@ -401,8 +416,8 @@ static int tcp_buffer_size_send = 0;
  * returns 0 on success, -errno on failure
  */
 int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
-		       int method_id,
-		       int init_flags)
+                       int method_id,
+                       int init_flags)
 {
 
     int ret = -1;
@@ -415,8 +430,8 @@ int BMI_tcp_initialize(bmi_method_addr_p
     /* check args */
     if ((init_flags & BMI_INIT_SERVER) && !listen_addr)
     {
-	gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
-	return (bmi_tcp_errno_to_pvfs(-EINVAL));
+        gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
+        return (bmi_tcp_errno_to_pvfs(-EINVAL));
     }
 
     gen_mutex_lock(&interface_mutex);
@@ -428,46 +443,77 @@ int BMI_tcp_initialize(bmi_method_addr_p
 
     if (init_flags & BMI_INIT_SERVER)
     {
-	/* hang on to our local listening address if needed */
-	tcp_method_params.listen_addr = listen_addr;
-	/* and initialize server functions */
-	ret = tcp_server_init();
-	if (ret < 0)
-	{
-	    tmp_errno = bmi_tcp_errno_to_pvfs(ret);
-	    gossip_err("Error: tcp_server_init() failure.\n");
-	    goto initialize_failure;
-	}
+        /* hang on to our local listening address if needed */
+        tcp_method_params.listen_addr = listen_addr;
+        /* and initialize server functions */
+        ret = tcp_server_init();
+        if (ret < 0)
+        {
+            tmp_errno = bmi_tcp_errno_to_pvfs(ret);
+            gossip_err("Error: tcp_server_init() failure.\n");
+            goto initialize_failure;
+        }
     }
 
     /* set up the operation lists */
     for (i = 0; i < NUM_INDICES; i++)
     {
-	op_list_array[i] = op_list_new();
-	if (!op_list_array[i])
-	{
-	    tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
-	    goto initialize_failure;
-	}
+        op_list_array[i] = op_list_new();
+        if (!op_list_array[i])
+        {
+            tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
+            goto initialize_failure;
+        }
     }
 
     /* set up the socket collection */
     if (tcp_method_params.method_flags & BMI_INIT_SERVER)
     {
-	tcp_addr_data = tcp_method_params.listen_addr->method_data;
-	tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
+        tcp_addr_data = tcp_method_params.listen_addr->method_data;
+        tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
     }
     else
     {
-	tcp_socket_collection_p = BMI_socket_collection_init(-1);
+        tcp_socket_collection_p = BMI_socket_collection_init(-1);
     }
 
     if (!tcp_socket_collection_p)
     {
-	tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
-	goto initialize_failure;
+        tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
+        goto initialize_failure;
     }
 
+    bmi_tcp_pid = getpid();
+    PINT_event_define_group("bmi_tcp", &bmi_tcp_event_group);
+
+    /* Define the send event:
+     *   START: (client_id, request_id, rank, handle, op_id, send_size)
+     *   STOP: (size_sent)
+     */
+    PINT_event_define_event(
+        &bmi_tcp_event_group,
+#ifdef __PVFS2_SERVER__
+        "bmi_server_send",
+#else
+        "bmi_client_send",
+#endif
+        "%d%d%d%llu%d%d",
+        "%d", &bmi_tcp_send_event_id);
+
+    /* Define the recv event:
+     *   START: (client_id, request_id, rank, handle, op_id, recv_size)
+     *   STOP: (size_received)
+     */
+    PINT_event_define_event(
+        &bmi_tcp_event_group,
+#ifdef __PVFS2_SERVER__
+        "bmi_server_recv",
+#else
+        "bmi_client_recv",
+#endif
+        "%d%d%d%llu%d%d",
+        "%d", &bmi_tcp_recv_event_id);
+
     gen_mutex_unlock(&interface_mutex);
     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
                   "TCP/IP module successfully initialized.\n");
@@ -478,14 +524,14 @@ int BMI_tcp_initialize(bmi_method_addr_p
     /* cleanup data structures and bail out */
     for (i = 0; i < NUM_INDICES; i++)
     {
-	if (op_list_array[i])
-	{
-	    op_list_cleanup(op_list_array[i]);
-	}
+        if (op_list_array[i])
+        {
+            op_list_cleanup(op_list_array[i]);
+        }
     }
     if (tcp_socket_collection_p)
     {
-	BMI_socket_collection_finalize(tcp_socket_collection_p);
+        BMI_socket_collection_finalize(tcp_socket_collection_p);
     }
     gen_mutex_unlock(&interface_mutex);
     return (tmp_errno);
@@ -506,26 +552,26 @@ int BMI_tcp_finalize(void)
 
     /* shut down our listen addr, if we have one */
     if ((tcp_method_params.method_flags & BMI_INIT_SERVER)
-	&& tcp_method_params.listen_addr)
+        && tcp_method_params.listen_addr)
     {
-	dealloc_tcp_method_addr(tcp_method_params.listen_addr);
+        dealloc_tcp_method_addr(tcp_method_params.listen_addr);
     }
 
     /* note that this forcefully shuts down operations */
     for (i = 0; i < NUM_INDICES; i++)
     {
-	if (op_list_array[i])
-	{
-	    op_list_cleanup(op_list_array[i]);
-	    op_list_array[i] = NULL;
-	}
+        if (op_list_array[i])
+        {
+            op_list_cleanup(op_list_array[i]);
+            op_list_array[i] = NULL;
+        }
     }
 
     /* get rid of socket collection */
     if (tcp_socket_collection_p)
     {
-	BMI_socket_collection_finalize(tcp_socket_collection_p);
-	tcp_socket_collection_p = NULL;
+        BMI_socket_collection_finalize(tcp_socket_collection_p);
+        tcp_socket_collection_p = NULL;
     }
 
     /* NOTE: we are trusting the calling BMI layer to deallocate 
@@ -618,7 +664,8 @@ void *BMI_tcp_memalloc(bmi_size_t size,
      * preferences about how the memory should be configured.
      */
 
-    return (calloc(1,(size_t) size));
+/*    return (calloc(1,(size_t) size)); */
+    return PINT_mem_aligned_alloc(size, 4096);
 }
 
 
@@ -632,16 +679,7 @@ int BMI_tcp_memfree(void *buffer,
 		    bmi_size_t size,
 		    enum bmi_op_type send_recv)
 {
-    /* NOTE: I am not going to bother to check to see if it is really our
-     * buffer.  This function trusts the caller.
-     * We also could care less whether it was a send or recv buffer.
-     */
-    if (buffer)
-    {
-	free(buffer);
-        buffer = NULL;
-    }
-
+    PINT_mem_aligned_free(buffer);
     return (0);
 }
 
@@ -876,6 +914,13 @@ int BMI_tcp_set_info(int option,
         break;
     }
 #endif
+    case BMI_TCP_CHECK_UNEXPECTED:
+    {
+        check_unexpected = *(int *)inout_parameter;
+        ret = 0;
+        break;
+    }
+
     default:
 	gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
                       "TCP hint %d not implemented.\n", option);
@@ -956,7 +1001,8 @@ int BMI_tcp_post_send(bmi_op_id_t * id,
 		      enum bmi_buffer_type buffer_type,
 		      bmi_msg_tag_t tag,
 		      void *user_ptr,
-		      bmi_context_id context_id)
+		      bmi_context_id context_id,
+                      PVFS_hint hints)
 {
     struct tcp_msg_header my_header;
     int ret = -1;
@@ -984,13 +1030,9 @@ int BMI_tcp_post_send(bmi_op_id_t * id,
 
     gen_mutex_lock(&interface_mutex);
 
-    ret = BMI_tcp_post_send_generic(id, dest, &buffer,
-				      &size, 1, buffer_type, my_header,
-				      user_ptr, context_id);
-    if(ret >= 0)
-	BMI_EVENT_START(PVFS_EVENT_BMI_SEND, *id);
-    if(ret == 1)
-	BMI_EVENT_END(PVFS_EVENT_BMI_SEND, size, *id);
+    ret = tcp_post_send_generic(id, dest, &buffer,
+                                &size, 1, buffer_type, my_header,
+                                user_ptr, context_id, hints);
 
     gen_mutex_unlock(&interface_mutex);
     return(ret);
@@ -1011,7 +1053,8 @@ int BMI_tcp_post_sendunexpected(bmi_op_i
 				enum bmi_buffer_type buffer_type,
 				bmi_msg_tag_t tag,
 				void *user_ptr,
-				bmi_context_id context_id)
+				bmi_context_id context_id,
+                                PVFS_hint hints)
 {
     struct tcp_msg_header my_header;
     int ret = -1;
@@ -1031,14 +1074,9 @@ int BMI_tcp_post_sendunexpected(bmi_op_i
 
     gen_mutex_lock(&interface_mutex);
 
-    ret = BMI_tcp_post_send_generic(id, dest, &buffer,
-				      &size, 1, buffer_type, my_header,
-				      user_ptr, context_id);
-    if(ret >= 0)
-	BMI_EVENT_START(PVFS_EVENT_BMI_SEND, *id);
-    if(ret == 1)
-	BMI_EVENT_END(PVFS_EVENT_BMI_SEND, size, *id);
-
+    ret = tcp_post_send_generic(id, dest, &buffer,
+                                &size, 1, buffer_type, my_header,
+                                user_ptr, context_id, hints);
     gen_mutex_unlock(&interface_mutex);
     return(ret);
 }
@@ -1060,7 +1098,8 @@ int BMI_tcp_post_recv(bmi_op_id_t * id,
 		      enum bmi_buffer_type buffer_type,
 		      bmi_msg_tag_t tag,
 		      void *user_ptr,
-		      bmi_context_id context_id)
+		      bmi_context_id context_id,
+                      PVFS_hint hints)
 {
     int ret = -1;
 
@@ -1082,14 +1121,9 @@ int BMI_tcp_post_recv(bmi_op_id_t * id,
     gen_mutex_lock(&interface_mutex);
 
     ret = tcp_post_recv_generic(id, src, &buffer, &expected_size,
-				1, expected_size, actual_size,
-				buffer_type, tag,
-				user_ptr, context_id);
-
-    if(ret >= 0)
-	BMI_EVENT_START(PVFS_EVENT_BMI_RECV, *id);
-    if(ret == 1)
-	BMI_EVENT_END(PVFS_EVENT_BMI_RECV, *actual_size, *id);
+                                1, expected_size, actual_size,
+                                buffer_type, tag,
+                                user_ptr, context_id, hints);
 
     gen_mutex_unlock(&interface_mutex);
     return (ret);
@@ -1136,10 +1170,11 @@ int BMI_tcp_test(bmi_op_id_t id,
 	}
 	(*error_code) = query_op->error_code;
 	(*actual_size) = query_op->actual_size;
-	if(query_op->send_recv == BMI_SEND)
-	    BMI_EVENT_END(PVFS_EVENT_BMI_SEND, *actual_size, id);
-	else
-	    BMI_EVENT_END(PVFS_EVENT_BMI_RECV, *actual_size, id);
+        PINT_EVENT_END(
+            (query_op->send_recv == BMI_SEND ?
+             bmi_tcp_send_event_id : bmi_tcp_recv_event_id), bmi_tcp_pid, NULL,
+             query_op->event_id, id, *actual_size);
+
 	dealloc_tcp_method_op(query_op);
 	(*outcount)++;
     }
@@ -1155,14 +1190,14 @@ int BMI_tcp_test(bmi_op_id_t id,
  * returns 0 on success, -errno on failure
  */
 int BMI_tcp_testsome(int incount,
-		     bmi_op_id_t * id_array,
-		     int *outcount,
-		     int *index_array,
-		     bmi_error_code_t * error_code_array,
-		     bmi_size_t * actual_size_array,
-		     void **user_ptr_array,
-		     int max_idle_time,
-		     bmi_context_id context_id)
+                     bmi_op_id_t * id_array,
+                     int *outcount,
+                     int *index_array,
+                     bmi_error_code_t * error_code_array,
+                     bmi_size_t * actual_size_array,
+                     void **user_ptr_array,
+                     int max_idle_time,
+                     bmi_context_id context_id)
 {
     int ret = -1;
     method_op_p query_op = NULL;
@@ -1174,39 +1209,40 @@ int BMI_tcp_testsome(int incount,
     ret = tcp_do_work(max_idle_time);
     if (ret < 0)
     {
-	gen_mutex_unlock(&interface_mutex);
-	return (ret);
+        gen_mutex_unlock(&interface_mutex);
+        return (ret);
     }
 
     for(i=0; i<incount; i++)
     {
-	if(id_array[i])
-	{
-	    /* NOTE: this depends on the user passing in valid id's;
-	     * otherwise we segfault.  
-	     */
-	    query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
-	    if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
-		BMI_TCP_COMPLETE)
-	    {
-		assert(query_op->context_id == context_id);
-		/* this one's done; pop it out */
-		op_list_remove(query_op);
-		error_code_array[*outcount] = query_op->error_code;
-		actual_size_array[*outcount] = query_op->actual_size;
-		index_array[*outcount] = i;
-		if (user_ptr_array != NULL)
-		{
-		    user_ptr_array[*outcount] = query_op->user_ptr;
-		}
-		if(query_op->send_recv == BMI_SEND)
-		    BMI_EVENT_END(PVFS_EVENT_BMI_SEND, query_op->actual_size, id_array[i]);
-		else
-		    BMI_EVENT_END(PVFS_EVENT_BMI_RECV, query_op->actual_size, id_array[i]);
-		dealloc_tcp_method_op(query_op);
-		(*outcount)++;
-	    }
-	}
+        if(id_array[i])
+        {
+            /* NOTE: this depends on the user passing in valid id's;
+             * otherwise we segfault.  
+             */
+            query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
+            if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
+               BMI_TCP_COMPLETE)
+            {
+                assert(query_op->context_id == context_id);
+                /* this one's done; pop it out */
+                op_list_remove(query_op);
+                error_code_array[*outcount] = query_op->error_code;
+                actual_size_array[*outcount] = query_op->actual_size;
+                index_array[*outcount] = i;
+                if (user_ptr_array != NULL)
+                {
+                    user_ptr_array[*outcount] = query_op->user_ptr;
+                }
+                PINT_EVENT_END(
+                    (query_op->send_recv == BMI_SEND ?
+                     bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
+                    bmi_tcp_pid, NULL,
+                    query_op->event_id, actual_size_array[*outcount]);
+                dealloc_tcp_method_op(query_op);
+                (*outcount)++;
+            }
+        }
     }
 
     gen_mutex_unlock(&interface_mutex);
@@ -1292,7 +1328,8 @@ int BMI_tcp_testcontext(int incount,
          * that the next testunexpected call can pick it up without
          * delay
          */
-        if(!op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
+        if(check_unexpected &&
+           !op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
         {
             gen_mutex_unlock(&interface_mutex);
             return(0);
@@ -1308,29 +1345,31 @@ int BMI_tcp_testcontext(int incount,
     }
 
     /* pop as many items off of the completion queue as we can */
-    while((*outcount < incount) && (query_op = 
-	op_list_shownext(completion_array[context_id])))
+    while((*outcount < incount) && 
+          (query_op =
+           op_list_shownext(completion_array[context_id])))
     {
         assert(query_op);
-	assert(query_op->context_id == context_id);
+        assert(query_op->context_id == context_id);
 
-	/* this one's done; pop it out */
-	op_list_remove(query_op);
-	error_code_array[*outcount] = query_op->error_code;
-	actual_size_array[*outcount] = query_op->actual_size;
-	out_id_array[*outcount] = query_op->op_id;
-	if (user_ptr_array != NULL)
-	{
-	    user_ptr_array[*outcount] = query_op->user_ptr;
-	}
-	if(query_op->send_recv == BMI_SEND)
-	    BMI_EVENT_END(PVFS_EVENT_BMI_SEND, query_op->actual_size, query_op->op_id);
-	else
-	    BMI_EVENT_END(PVFS_EVENT_BMI_RECV, query_op->actual_size, query_op->op_id);
+        /* this one's done; pop it out */
+        op_list_remove(query_op);
+        error_code_array[*outcount] = query_op->error_code;
+        actual_size_array[*outcount] = query_op->actual_size;
+        out_id_array[*outcount] = query_op->op_id;
+        if (user_ptr_array != NULL)
+        {
+            user_ptr_array[*outcount] = query_op->user_ptr;
+        }
 
-	dealloc_tcp_method_op(query_op);
+        PINT_EVENT_END((query_op->send_recv == BMI_SEND ?
+                        bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
+                       bmi_tcp_pid, NULL, query_op->event_id,
+                       query_op->actual_size);
+
+        dealloc_tcp_method_op(query_op);
         query_op = NULL;
-	(*outcount)++;
+        (*outcount)++;
     }
 
     gen_mutex_unlock(&interface_mutex);
@@ -1356,7 +1395,8 @@ int BMI_tcp_post_send_list(bmi_op_id_t *
 			   enum bmi_buffer_type buffer_type,
 			   bmi_msg_tag_t tag,
 			   void *user_ptr,
-			   bmi_context_id context_id)
+			   bmi_context_id context_id,
+                           PVFS_hint hints)
 {
     struct tcp_msg_header my_header;
     int ret = -1;
@@ -1385,14 +1425,9 @@ int BMI_tcp_post_send_list(bmi_op_id_t *
 
     gen_mutex_lock(&interface_mutex);
 
-    ret = BMI_tcp_post_send_generic(id, dest, buffer_list,
-				      size_list, list_count, buffer_type,
-				      my_header, user_ptr, context_id);
-    if(ret >= 0)
-	BMI_EVENT_START(PVFS_EVENT_BMI_SEND, *id);
-    if(ret == 1)
-	BMI_EVENT_END(PVFS_EVENT_BMI_SEND, total_size, *id);
-
+    ret = tcp_post_send_generic(id, dest, buffer_list,
+                                size_list, list_count, buffer_type,
+                                my_header, user_ptr, context_id, hints);
     gen_mutex_unlock(&interface_mutex);
     return(ret);
 }
@@ -1415,7 +1450,8 @@ int BMI_tcp_post_recv_list(bmi_op_id_t *
 			   enum bmi_buffer_type buffer_type,
 			   bmi_msg_tag_t tag,
 			   void *user_ptr,
-			   bmi_context_id context_id)
+			   bmi_context_id context_id,
+                           PVFS_hint hints)
 {
     int ret = -1;
 
@@ -1427,14 +1463,9 @@ int BMI_tcp_post_recv_list(bmi_op_id_t *
     gen_mutex_lock(&interface_mutex);
 
     ret = tcp_post_recv_generic(id, src, buffer_list, size_list,
-				list_count, total_expected_size,
-				total_actual_size, buffer_type, tag, user_ptr,
-				context_id);
-
-    if(ret >= 0)
-	BMI_EVENT_START(PVFS_EVENT_BMI_RECV, *id);
-    if(ret == 1)
-	BMI_EVENT_END(PVFS_EVENT_BMI_RECV, *total_actual_size, *id);
+                                list_count, total_expected_size,
+                                total_actual_size, buffer_type, tag, user_ptr,
+                                context_id, hints);
 
     gen_mutex_unlock(&interface_mutex);
     return (ret);
@@ -1458,7 +1489,8 @@ int BMI_tcp_post_sendunexpected_list(bmi
 				     enum bmi_buffer_type buffer_type,
 				     bmi_msg_tag_t tag,
 				     void *user_ptr,
-				     bmi_context_id context_id)
+				     bmi_context_id context_id,
+                                     PVFS_hint hints)
 {
     struct tcp_msg_header my_header;
     int ret = -1;
@@ -1478,13 +1510,9 @@ int BMI_tcp_post_sendunexpected_list(bmi
 
     gen_mutex_lock(&interface_mutex);
 
-    ret = BMI_tcp_post_send_generic(id, dest, buffer_list,
-				      size_list, list_count, buffer_type,
-				      my_header, user_ptr, context_id);
-    if(ret >= 0)
-	BMI_EVENT_START(PVFS_EVENT_BMI_SEND, *id);
-    if(ret == 1)
-	BMI_EVENT_END(PVFS_EVENT_BMI_SEND, total_size, *id);
+    ret = tcp_post_send_generic(id, dest, buffer_list,
+                                size_list, list_count, buffer_type,
+                                my_header, user_ptr, context_id, hints);
 
     gen_mutex_unlock(&interface_mutex);
     return(ret);
@@ -1742,12 +1770,12 @@ int BMI_tcp_query_addr_range(bmi_method_
         /* Invalid network address */
         if (inet_aton(tcp_wildcard, &network_addr.sin_addr) == 0)
         {
-            gossip_lerr("Invalid network specification: %s\n", tcp_wildcard);
+            gossip_err("Invalid network specification: %s\n", tcp_wildcard);
             return -EINVAL;
         }
         /* Matches the subnet mask! */
         if ((map_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr)
-                == network_addr.sin_addr.s_addr)
+                == (network_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr))
         {
             return 1;
         }
@@ -1847,7 +1875,7 @@ void tcp_forget_addr(bmi_method_addr_p m
 		     int error_code)
 {
     struct tcp_addr* tcp_addr_data = map->method_data;
-    PVFS_BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
+    BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
     int tmp_outcount;
     bmi_method_addr_p tmp_addr;
     int tmp_status;
@@ -1858,8 +1886,11 @@ void tcp_forget_addr(bmi_method_addr_p m
 	/* perform a test to force the socket collection to act on the remove
 	 * request before continuing
 	 */
-	BMI_socket_collection_testglobal(tcp_socket_collection_p,
-	    0, &tmp_outcount, &tmp_addr, &tmp_status, 0, &interface_mutex);
+        if(!sc_test_busy)
+        {
+            BMI_socket_collection_testglobal(tcp_socket_collection_p,
+                0, &tmp_outcount, &tmp_addr, &tmp_status, 0);
+        }
     }
 
     tcp_shutdown_addr(map);
@@ -2223,7 +2254,8 @@ static int enqueue_operation(op_list_p t
 			     void *user_ptr,
 			     bmi_size_t actual_size,
 			     bmi_size_t expected_size,
-			     bmi_context_id context_id)
+			     bmi_context_id context_id,
+                             int32_t eid)
 {
     method_op_p new_method_op = NULL;
     struct tcp_op *tcp_op_data = NULL;
@@ -2238,6 +2270,7 @@ static int enqueue_operation(op_list_p t
     }
 
     *id = new_method_op->op_id;
+    new_method_op->event_id = eid;
 
     /* set the fields */
     new_method_op->send_recv = send_recv;
@@ -2354,16 +2387,17 @@ static int enqueue_operation(op_list_p t
  * completion, -errno on failure
  */
 static int tcp_post_recv_generic(bmi_op_id_t * id,
-				 bmi_method_addr_p src,
-				 void *const *buffer_list,
-				 const bmi_size_t *size_list,
-				 int list_count,
-				 bmi_size_t expected_size,
-				 bmi_size_t * actual_size,
-				 enum bmi_buffer_type buffer_type,
-				 bmi_msg_tag_t tag,
-				 void *user_ptr,
-				 bmi_context_id context_id)
+                                 bmi_method_addr_p src,
+                                 void *const *buffer_list,
+                                 const bmi_size_t *size_list,
+                                 int list_count,
+                                 bmi_size_t expected_size,
+                                 bmi_size_t * actual_size,
+                                 enum bmi_buffer_type buffer_type,
+                                 bmi_msg_tag_t tag,
+                                 void *user_ptr,
+                                 bmi_context_id context_id,
+                                 PVFS_hint hints)
 {
     method_op_p query_op = NULL;
     int ret = -1;
@@ -2374,6 +2408,16 @@ static int tcp_post_recv_generic(bmi_op_
     bmi_size_t copy_size = 0;
     bmi_size_t total_copied = 0;
     int i;
+    PINT_event_id eid = 0;
+
+    PINT_EVENT_START(
+        bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, &eid,
+        PINT_HINT_GET_CLIENT_ID(hints),
+        PINT_HINT_GET_REQUEST_ID(hints),
+        PINT_HINT_GET_RANK(hints),
+        PINT_HINT_GET_HANDLE(hints),
+        PINT_HINT_GET_OP_ID(hints),
+        expected_size);
 
     tcp_addr_data = src->method_data;
 
@@ -2382,9 +2426,11 @@ static int tcp_post_recv_generic(bmi_op_
      */
     if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
     {
-	gossip_debug(GOSSIP_BMI_DEBUG_TCP, 
-	"Warning: BMI communication attempted on an address in failure mode.\n");
-	return(tcp_addr_data->addr_error);
+        gossip_debug(
+            GOSSIP_BMI_DEBUG_TCP,
+            "Warning: BMI communication attempted "
+            "on an address in failure mode.\n");
+        return(tcp_addr_data->addr_error);
     }
 
     /* lets make sure that the message hasn't already been fully
@@ -2397,163 +2443,170 @@ static int tcp_post_recv_generic(bmi_op_
     key.msg_tag_yes = 1;
 
     query_op =
-	op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
+        op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
     if (query_op)
     {
-	/* make sure it isn't too big */
-	if (query_op->actual_size > expected_size)
-	{
-	    gossip_err("Error: message ordering violation;\n");
-	    gossip_err("Error: message too large for next buffer.\n");
-	    return (bmi_tcp_errno_to_pvfs(-EPROTO));
-	}
+        /* make sure it isn't too big */
+        if (query_op->actual_size > expected_size)
+        {
+            gossip_err("Error: message ordering violation;\n");
+            gossip_err("Error: message too large for next buffer.\n");
+            return (bmi_tcp_errno_to_pvfs(-EPROTO));
+        }
 
-	/* whoohoo- it is already done! */
-	/* copy buffer out to list segments; handle short case */
-	for (i = 0; i < list_count; i++)
-	{
-	    copy_size = size_list[i];
-	    if (copy_size + total_copied > query_op->actual_size)
-	    {
-		copy_size = query_op->actual_size - total_copied;
-	    }
-	    memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
-					     total_copied), copy_size);
-	    total_copied += copy_size;
-	    if (total_copied == query_op->actual_size)
-	    {
-		break;
-	    }
-	}
-	/* copy out to correct memory regions */
-	(*actual_size) = query_op->actual_size;
-	free(query_op->buffer);
-	*id = 0;
-	op_list_remove(query_op);
-	dealloc_tcp_method_op(query_op);
-	return (1);
+        /* whoohoo- it is already done! */
+        /* copy buffer out to list segments; handle short case */
+        for (i = 0; i < list_count; i++)
+        {
+            copy_size = size_list[i];
+            if (copy_size + total_copied > query_op->actual_size)
+            {
+                copy_size = query_op->actual_size - total_copied;
+            }
+            memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
+                                             total_copied), copy_size);
+            total_copied += copy_size;
+            if (total_copied == query_op->actual_size)
+            {
+                break;
+            }
+        }
+        /* copy out to correct memory regions */
+        (*actual_size) = query_op->actual_size;
+        free(query_op->buffer);
+        *id = 0;
+        op_list_remove(query_op);
+        dealloc_tcp_method_op(query_op);
+        PINT_EVENT_END(bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid, 0,
+                       *actual_size);
+
+        return (1);
     }
 
     /* look for a message that is already being received */
     query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
     if (query_op)
     {
-	tcp_op_data = query_op->method_data;
+        tcp_op_data = query_op->method_data;
     }
 
     /* see if it is being buffered into a temporary memory region */
     if (query_op && tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
     {
-	/* make sure it isn't too big */
-	if (query_op->actual_size > expected_size)
-	{
-	    gossip_err("Error: message ordering violation;\n");
-	    gossip_err("Error: message too large for next buffer.\n");
-	    return (bmi_tcp_errno_to_pvfs(-EPROTO));
-	}
-
-	/* copy what we have so far into the correct buffers */
-	total_copied = 0;
-	for (i = 0; i < list_count; i++)
-	{
-	    copy_size = size_list[i];
-	    if (copy_size + total_copied > query_op->amt_complete)
-	    {
-		copy_size = query_op->amt_complete - total_copied;
-	    }
-	    if (copy_size > 0)
-	    {
-		memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
-						 total_copied), copy_size);
-	    }
-	    total_copied += copy_size;
-	    if (total_copied == query_op->amt_complete)
-	    {
-		query_op->list_index = i;
-		query_op->cur_index_complete = copy_size;
-		break;
-	    }
-	}
+        /* make sure it isn't too big */
+        if (query_op->actual_size > expected_size)
+        {
+            gossip_err("Error: message ordering violation;\n");
+            gossip_err("Error: message too large for next buffer.\n");
+            return (bmi_tcp_errno_to_pvfs(-EPROTO));
+        }
 
-	/* see if we ended on a buffer boundary */
-	if (query_op->cur_index_complete ==
-	    query_op->size_list[query_op->list_index])
-	{
-	    query_op->list_index++;
-	    query_op->cur_index_complete = 0;
-	}
+        /* copy what we have so far into the correct buffers */
+        total_copied = 0;
+        for (i = 0; i < list_count; i++)
+        {
+            copy_size = size_list[i];
+            if (copy_size + total_copied > query_op->amt_complete)
+            {
+                copy_size = query_op->amt_complete - total_copied;
+            }
+            if (copy_size > 0)
+            {
+                memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
+                                                 total_copied), copy_size);
+            }
+            total_copied += copy_size;
+            if (total_copied == query_op->amt_complete)
+            {
+                query_op->list_index = i;
+                query_op->cur_index_complete = copy_size;
+                break;
+            }
+        }
 
-	/* release the old buffer */
-	if (query_op->buffer)
-	{
-	    free(query_op->buffer);
-	}
+        /* see if we ended on a buffer boundary */
+        if (query_op->cur_index_complete ==
+            query_op->size_list[query_op->list_index])
+        {
+            query_op->list_index++;
+            query_op->cur_index_complete = 0;
+        }
 
-	*id = query_op->op_id;
-	tcp_op_data = query_op->method_data;
-	tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
+        /* release the old buffer */
+        if (query_op->buffer)
+        {
+            free(query_op->buffer);
+        }
 
-	query_op->list_count = list_count;
-	query_op->user_ptr = user_ptr;
-	query_op->context_id = context_id;
-	/* if there is only one item in the list, then keep the list stored
-	 * in the op structure.  This allows us to use the same code for send
-	 * and recv as we use for send_list and recv_list, without having to 
-	 * malloc lists for those special cases
-	 */
-	if (list_count == 1)
-	{
-	    query_op->buffer_list = &tcp_op_data->buffer_list_stub;
-	    query_op->size_list = &tcp_op_data->size_list_stub;
-	    ((void **)query_op->buffer_list)[0] = buffer_list[0];
-	    ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
-	}
-	else
-	{
-	    query_op->buffer_list = buffer_list;
-	    query_op->size_list = size_list;
-	}
+        *id = query_op->op_id;
+        tcp_op_data = query_op->method_data;
+        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
+
+        query_op->list_count = list_count;
+        query_op->user_ptr = user_ptr;
+        query_op->context_id = context_id;
+        /* if there is only one item in the list, then keep the list stored
+         * in the op structure.  This allows us to use the same code for send
+         * and recv as we use for send_list and recv_list, without having to 
+         * malloc lists for those special cases
+         */
+        if (list_count == 1)
+        {
+            query_op->buffer_list = &tcp_op_data->buffer_list_stub;
+            query_op->size_list = &tcp_op_data->size_list_stub;
+            ((void **)query_op->buffer_list)[0] = buffer_list[0];
+            ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
+        }
+        else
+        {
+            query_op->buffer_list = buffer_list;
+            query_op->size_list = size_list;
+        }
 
-	if (query_op->amt_complete < query_op->actual_size)
-	{
-	    /* try to recv some more data */
-	    tcp_addr_data = query_op->addr->method_data;
-	    ret = payload_progress(tcp_addr_data->socket,
-		query_op->buffer_list,
-		query_op->size_list,
-		query_op->list_count,
-		query_op->actual_size,
-		&(query_op->list_index),
-		&(query_op->cur_index_complete),
-		BMI_RECV,
-		NULL,
-		0);
-	    if (ret < 0)
-	    {
+        if (query_op->amt_complete < query_op->actual_size)
+        {
+            /* try to recv some more data */
+            tcp_addr_data = query_op->addr->method_data;
+            ret = payload_progress(tcp_addr_data->socket,
+                                   query_op->buffer_list,
+                                   query_op->size_list,
+                                   query_op->list_count,
+                                   query_op->actual_size,
+                                   &(query_op->list_index),
+                                   &(query_op->cur_index_complete),
+                                   BMI_RECV,
+                                   NULL,
+                                   0);
+            if (ret < 0)
+            {
                 PVFS_perror_gossip("Error: payload_progress", ret);
                 /* payload_progress() returns BMI error codes */
-		tcp_forget_addr(query_op->addr, 0, ret);
-		return (ret);
-	    }
+                tcp_forget_addr(query_op->addr, 0, ret);
+                return (ret);
+            }
 
-	    query_op->amt_complete += ret;
-	}
-	assert(query_op->amt_complete <= query_op->actual_size);
-	if (query_op->amt_complete == query_op->actual_size)
-	{
-	    /* we are done */
-	    op_list_remove(query_op);
-	    *id = 0;
-	    (*actual_size) = query_op->actual_size;
-	    dealloc_tcp_method_op(query_op);
-	    return (1);
-	}
-	else
-	{
-	    /* there is still more work to do */
-	    tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
-	    return (0);
-	}
+            query_op->amt_complete += ret;
+        }
+        assert(query_op->amt_complete <= query_op->actual_size);
+        if (query_op->amt_complete == query_op->actual_size)
+        {
+            /* we are done */
+            op_list_remove(query_op);
+            *id = 0;
+            (*actual_size) = query_op->actual_size;
+            dealloc_tcp_method_op(query_op);
+            PINT_EVENT_END(
+                bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid,
+                0, *actual_size);
+
+            return (1);
+        }
+        else
+        {
+            /* there is still more work to do */
+            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
+            return (0);
+        }
     }
 
     /* NOTE: if the message was in flight, but not buffering, then
@@ -2566,18 +2619,18 @@ static int tcp_post_recv_generic(bmi_op_
     /* if we hit this point we must enqueue */
     if (expected_size <= TCP_MODE_EAGER_LIMIT)
     {
-	bogus_header.mode = TCP_MODE_EAGER;
+        bogus_header.mode = TCP_MODE_EAGER;
     }
     else
     {
-	bogus_header.mode = TCP_MODE_REND;
+        bogus_header.mode = TCP_MODE_REND;
     }
     bogus_header.tag = tag;
     ret = enqueue_operation(op_list_array[IND_RECV],
-			    BMI_RECV, src, buffer_list, size_list,
-			    list_count, 0, 0, id, BMI_TCP_INPROGRESS,
-			    bogus_header, user_ptr, 0,
-			    expected_size, context_id);
+                            BMI_RECV, src, buffer_list, size_list,
+                            list_count, 0, 0, id, BMI_TCP_INPROGRESS,
+                            bogus_header, user_ptr, 0,
+                            expected_size, context_id, eid);
     /* just for safety; this field isn't valid to the caller anymore */
     (*actual_size) = 0;
     /* TODO: figure out why this causes deadlocks; observable in 2
@@ -2588,11 +2641,11 @@ static int tcp_post_recv_generic(bmi_op_
 #if 0
     if (ret >= 0)
     {
-	/* go ahead and try to do some work while we are in this
-	 * function since we appear to be backlogged.  Make sure that
-	 * we do not wait in the poll, however.
-	 */
-	ret = tcp_do_work(0);
+        /* go ahead and try to do some work while we are in this
+         * function since we appear to be backlogged.  Make sure that
+         * we do not wait in the poll, however.
+         */
+        ret = tcp_do_work(0);
     }
 #endif
     return (ret);
@@ -2688,17 +2741,56 @@ static int tcp_do_work(int max_idle_time
     int busy_flag = 1;
     struct timespec req;
     struct tcp_addr* tcp_addr_data = NULL;
+    struct timespec wait_time;
+    struct timeval start;
 
-    /* now we need to poll and see what to work on */
-    /* drop mutex while we make this call */
+    if(sc_test_busy)
+    {
+        /* another thread is already polling or working on sockets */
+        if(max_idle_time == 0)
+        {
+            /* we don't want to spend time waiting on it; return
+             * immediately.
+             */
+            return(0);
+        }
+
+        /* Sleep until the working thread signals that it has finished
+         * its work and then return.  No need for this thread to poll;
+         * the other thread may have already finished what we wanted.
+         * This condition wait is used strictly as a best effort to
+         * prevent busy spin.  We'll sort out the results later.
+         */
+        gettimeofday(&start, NULL);
+        wait_time.tv_sec = start.tv_sec + max_idle_time / 1000;
+        wait_time.tv_nsec = (start.tv_usec + ((max_idle_time % 1000)*1000))*1000;
+        if (wait_time.tv_nsec > 1000000000)
+        {
+            wait_time.tv_nsec = wait_time.tv_nsec - 1000000000;
+            wait_time.tv_sec++;
+        }
+        gen_cond_timedwait(&interface_cond, &interface_mutex, &wait_time);
+        return(0);
+    }
+
+    /* this thread has gained control of the polling.  */
+    sc_test_busy = 1;
     gen_mutex_unlock(&interface_mutex);
+
+    /* our turn to look at the socket collection */
     ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
 				       TCP_WORK_METRIC, &socket_count,
 				       addr_array, status_array,
-				       max_idle_time, &interface_mutex);
+				       max_idle_time);
+
     gen_mutex_lock(&interface_mutex);
+    sc_test_busy = 0;
+
     if (ret < 0)
     {
+        /* wake up anyone else who might have been waiting */
+        gen_cond_broadcast(&interface_cond);
+        PVFS_perror_gossip("Error: socket collection:", ret);
         /* BMI_socket_collection_testglobal() returns BMI error code */
 	return (ret);
     }
@@ -2723,7 +2815,7 @@ static int tcp_do_work(int max_idle_time
 	    ret = tcp_do_work_error(addr_array[i]);
 	    if (ret < 0)
 	    {
-		return (ret);
+                PVFS_perror_gossip("Warning: BMI error handling failure, continuing", ret);
 	    }
 	}
 	else
@@ -2733,8 +2825,8 @@ static int tcp_do_work(int max_idle_time
 		ret = tcp_do_work_send(addr_array[i], &stall_flag);
 		if (ret < 0)
 		{
-		    return (ret);
-		}
+                    PVFS_perror_gossip("Warning: BMI send error, continuing", ret);
+                }
 		if(!stall_flag)
 		    busy_flag = 0;
 	    }
@@ -2743,7 +2835,7 @@ static int tcp_do_work(int max_idle_time
 		ret = tcp_do_work_recv(addr_array[i], &stall_flag);
 		if (ret < 0)
 		{
-		    return (ret);
+                    PVFS_perror_gossip("Warning: BMI recv error, continuing", ret);
 		}
 		if(!stall_flag)
 		    busy_flag = 0;
@@ -2768,6 +2860,8 @@ static int tcp_do_work(int max_idle_time
         gen_mutex_lock(&interface_mutex);
     }
 
+    /* wake up anyone else who might have been waiting */
+    gen_cond_broadcast(&interface_cond);
     return (0);
 }
 
@@ -2899,6 +2993,7 @@ static int tcp_do_work_recv(bmi_method_a
     int tmp_errno;
     int tmp;
     bmi_size_t old_amt_complete = 0;
+    time_t current_time;
 
     *stall_flag = 1;
 
@@ -2993,10 +3088,25 @@ static int tcp_do_work_recv(bmi_method_a
 
     if (ret < TCP_ENC_HDR_SIZE)
     {
-	/* header not ready yet */
+        current_time = time(NULL);
+        if(!tcp_addr_data->short_header_timer)
+        {
+            tcp_addr_data->short_header_timer = current_time;
+        }
+        else if((current_time - tcp_addr_data->short_header_timer) > 
+            BMI_TCP_HEADER_WAIT_SECONDS)
+        {
+	    gossip_err("Error: incomplete BMI TCP header after %d seconds, closing connection.\n",
+                BMI_TCP_HEADER_WAIT_SECONDS);
+            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
+            return (0);
+        }
+
+	/* header not ready yet, but we will keep hoping */
 	return (0);
     }
 
+    tcp_addr_data->short_header_timer = 0;
     *stall_flag = 0;
     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Reading header for new op.\n");
     ret = BMI_sockio_nbrecv(tcp_addr_data->socket,
@@ -3462,7 +3572,7 @@ static int tcp_allow_trusted(struct sock
         {
             /* check with all the masks */
             if ((peer_sockaddr->sin_addr.s_addr & gtcp_allowed_connection->netmask[i].s_addr) 
-                    != gtcp_allowed_connection->network[i].s_addr)
+                    != (gtcp_allowed_connection->network[i].s_addr & gtcp_allowed_connection->netmask[i].s_addr ))
             {
                 continue;
             }
@@ -3489,7 +3599,7 @@ port_check:
         return 0;
     }
     /* no good */
-    gossip_lerr("Rejecting client %s on port %d: %s\n",
+    gossip_err("Rejecting client %s on port %d: %s\n",
            peer_hostname, peer_port, bad_errors[what_failed]);
     return -1;
 }
@@ -3557,14 +3667,7 @@ static int tcp_accept_init(int *socket, 
     {
         /* Force closure of the connection */
         close(*socket);
-        errno = EACCES;
-        /* FIXME: 
-         * BIG KLUDGE
-         * if we return an error, pvfs2-server's bmi thread simply terminates. 
-         * hence I am returning 0 here. Need to ask Phil or RobR about this...
-         */
-        *socket = -1;
-        return 0;
+        return (bmi_tcp_errno_to_pvfs(-EACCES));
     }
 
 #endif
@@ -3630,31 +3733,52 @@ static void dealloc_tcp_method_op(method
     return;
 }
 
-/* BMI_tcp_post_send_generic()
+/* tcp_post_send_generic()
  * 
  * Submits send operations (low level).
  *
  * returns 0 on success that requires later poll, returns 1 on instant
  * completion, -errno on failure
  */
-static int BMI_tcp_post_send_generic(bmi_op_id_t * id,
-				     bmi_method_addr_p dest,
-				     const void *const *buffer_list,
-				     const bmi_size_t *size_list,
-				     int list_count,
-				     enum bmi_buffer_type buffer_type,
-				     struct tcp_msg_header my_header,
-				     void *user_ptr,
-				     bmi_context_id context_id)
+static int tcp_post_send_generic(bmi_op_id_t * id,
+                                 bmi_method_addr_p dest,
+                                 const void *const *buffer_list,
+                                 const bmi_size_t *size_list,
+                                 int list_count,
+                                 enum bmi_buffer_type buffer_type,
+                                 struct tcp_msg_header my_header,
+                                 void *user_ptr,
+                                 bmi_context_id context_id,
+                                 PVFS_hint hints)
 {
     struct tcp_addr *tcp_addr_data = dest->method_data;
     method_op_p query_op = NULL;
     int ret = -1;
+    bmi_size_t total_size = 0;
     bmi_size_t amt_complete = 0;
     bmi_size_t env_amt_complete = 0;
     struct op_list_search_key key;
     int list_index = 0;
     bmi_size_t cur_index_complete = 0;
+    PINT_event_id eid = 0;
+
+    if(PINT_EVENT_ENABLED)
+    {
+        int i = 0;
+        for(; i < list_count; ++i)
+        {
+            total_size += size_list[i];
+        }
+    }
+
+    PINT_EVENT_START(
+        bmi_tcp_send_event_id, bmi_tcp_pid, NULL, &eid,
+        PINT_HINT_GET_CLIENT_ID(hints),
+        PINT_HINT_GET_REQUEST_ID(hints),
+        PINT_HINT_GET_RANK(hints),
+        PINT_HINT_GET_HANDLE(hints),
+        PINT_HINT_GET_OP_ID(hints),
+        total_size);
 
     /* Three things can happen here:
      * a) another op is already in queue for the address, so we just
@@ -3684,13 +3808,14 @@ static int BMI_tcp_post_send_generic(bmi
     query_op = op_list_search(op_list_array[IND_SEND], &key);
     if (query_op)
     {
-	/* queue up operation */
-	ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
-				dest, (void **) buffer_list,
-				size_list, list_count, 0, 0,
-				id, BMI_TCP_INPROGRESS, my_header, user_ptr,
-				my_header.size, 0,
-				context_id);
+        /* queue up operation */
+        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
+                                dest, (void **) buffer_list,
+                                size_list, list_count, 0, 0,
+                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
+                                my_header.size, 0,
+                                context_id,
+                                eid);
 
         /* TODO: is this causing deadlocks?  See similar call in recv
          * path for another example.  This particular one seems to be an
@@ -3722,6 +3847,7 @@ static int BMI_tcp_post_send_generic(bmi
 	gossip_debug(GOSSIP_BMI_DEBUG_TCP, "tcp_sock_init() failure.\n");
         /* tcp_sock_init() returns BMI error code */
 	tcp_forget_addr(dest, 0, ret);
+        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, 0, ret);
 	return (ret);
     }
 
@@ -3746,7 +3872,8 @@ static int BMI_tcp_post_send_generic(bmi
 				list_count, 0, 0,
 				id, BMI_TCP_INPROGRESS, my_header, user_ptr,
 				my_header.size, 0,
-				context_id);
+				context_id,
+                                eid);
 	if(ret < 0)
 	{
 	    gossip_err("Error: enqueue_operation() returned: %d\n", ret);
@@ -3765,6 +3892,7 @@ static int BMI_tcp_post_send_generic(bmi
         PVFS_perror_gossip("Error: payload_progress", ret);
         /* payload_progress() returns BMI error codes */
 	tcp_forget_addr(dest, 0, ret);
+        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, eid, 0, ret);
 	return (ret);
     }
 
@@ -3773,21 +3901,23 @@ static int BMI_tcp_post_send_generic(bmi
     assert(amt_complete <= my_header.size);
     if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
     {
-	/* we are already done */
-	return (1);
+        /* we are already done */
+        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid,
+                       NULL, eid, 0, amt_complete);
+        return (1);
     }
 
     /* queue up the remainder */
     ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
-			    dest, (void **) buffer_list,
-			    size_list, list_count,
-			    amt_complete, env_amt_complete, id,
-			    BMI_TCP_INPROGRESS, my_header, user_ptr,
-			    my_header.size, 0, context_id);
+                            dest, (void **) buffer_list,
+                            size_list, list_count,
+                            amt_complete, env_amt_complete, id,
+                            BMI_TCP_INPROGRESS, my_header, user_ptr,
+                            my_header.size, 0, context_id, eid);
 
     if(ret < 0)
     {
-	gossip_err("Error: enqueue_operation() returned: %d\n", ret);
+        gossip_err("Error: enqueue_operation() returned: %d\n", ret);
     }
     return (ret);
 }

Index: module.mk.in
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/module.mk.in,v
diff -p -u -r1.5 -r1.5.8.1
--- module.mk.in	22 Jul 2007 16:02:16 -0000	1.5
+++ module.mk.in	25 Aug 2009 17:56:11 -0000	1.5.8.1
@@ -11,13 +11,19 @@ SERVERSRC += \
 	$(DIR)/bmi-tcp.c \
 	$(DIR)/sockio.c
 
+LIBBMISRC += \
+	$(DIR)/bmi-tcp.c \
+	$(DIR)/sockio.c
+
 ifdef BUILD_EPOLL
 LIBSRC += $(DIR)/socket-collection-epoll.c
 SERVERSRC += $(DIR)/socket-collection-epoll.c
+LIBBMISRC += $(DIR)/socket-collection-epoll.c
 MODCFLAGS_$(DIR)/bmi-tcp.c := -D__PVFS2_USE_EPOLL__
 else
 LIBSRC += $(DIR)/socket-collection.c
 SERVERSRC += $(DIR)/socket-collection.c
+LIBBMISRC += $(DIR)/socket-collection.c
 endif
 
 endif  # BUILD_BMI_TCP

Index: socket-collection-epoll.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/socket-collection-epoll.c,v
diff -p -u -r1.6 -r1.6.8.1
--- socket-collection-epoll.c	6 Nov 2007 23:08:36 -0000	1.6
+++ socket-collection-epoll.c	25 Aug 2009 17:56:11 -0000	1.6.8.1
@@ -65,11 +65,6 @@ socket_collection_p BMI_socket_collectio
         return(NULL);
     }
 
-    gen_mutex_init(&tmp_scp->mutex);
-    gen_mutex_init(&tmp_scp->queue_mutex);
-
-    INIT_QLIST_HEAD(&tmp_scp->remove_queue);
-    INIT_QLIST_HEAD(&tmp_scp->add_queue);
     tmp_scp->server_socket = new_server_socket;
 
     if(new_server_socket > -1)
@@ -82,10 +77,6 @@ socket_collection_p BMI_socket_collectio
         if(ret < 0 && errno != EEXIST)
         {
             gossip_err("Error: epoll_ctl() failure: %s.\n", strerror(errno));
-#if 0
-            gen_mutex_destroy(&tmp_scp->mutex);
-            gen_mutex_destroy(&tmp_scp->queue_mutex);
-#endif
             free(tmp_scp);
             return(NULL);
         }
@@ -94,48 +85,6 @@ socket_collection_p BMI_socket_collectio
     return (tmp_scp);
 }
 
-/* socket_collection_queue()
- * 
- * queues a tcp method_addr for addition or removal from the collection.
- *
- * returns 0 on success, -errno on failure.
- */
-void BMI_socket_collection_queue(socket_collection_p scp,
-			   bmi_method_addr_p map, struct qlist_head* queue)
-{
-    struct qlist_head* iterator = NULL;
-    struct qlist_head* scratch = NULL;
-    struct tcp_addr* tcp_addr_data = NULL;
-
-    /* make sure that this address isn't already slated for addition/removal */
-    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
-    {
-	tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
-	if(tcp_addr_data->map == map)
-	{
-	    qlist_del(&tcp_addr_data->sc_link);
-	    break;
-	}
-    }
-    qlist_for_each_safe(iterator, scratch, &scp->add_queue)
-    {
-	tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
-	if(tcp_addr_data->map == map)
-	{
-	    qlist_del(&tcp_addr_data->sc_link);
-	    break;
-	}
-    }
-
-    /* add it on to the appropriate queue */
-    tcp_addr_data = map->method_data;
-    /* add to head, we are likely to access it again soon */
-    qlist_add(&tcp_addr_data->sc_link, queue);
-
-    return;
-}
-
-
 /* socket_collection_finalize()
  *
  * destroys a socket collection.  IMPORTANT:  It DOES NOT destroy the
@@ -146,10 +95,6 @@ void BMI_socket_collection_queue(socket_
  */
 void BMI_socket_collection_finalize(socket_collection_p scp)
 {
-#if 0
-    gen_mutex_destroy(&scp->mutex);
-    gen_mutex_destroy(&scp->queue_mutex);
-#endif
     free(scp);
     return;
 }
@@ -170,112 +115,19 @@ int BMI_socket_collection_testglobal(soc
 				 int *outcount,
 				 bmi_method_addr_p * maps,
 				 int * status,
-				 int poll_timeout,
-				 gen_mutex_t* external_mutex)
+				 int poll_timeout)
 {
-    struct qlist_head* iterator = NULL;
-    struct qlist_head* scratch = NULL;
     struct tcp_addr* tcp_addr_data = NULL;
     int ret = -1;
     int old_errno;
     int tmp_count;
     int i;
-    int skip_flag;
-#ifndef __PVFS2_JOB_THREADED__
-    struct epoll_event event;
-#endif
 
     /* init the outgoing arguments for safety */
     *outcount = 0;
     memset(maps, 0, (sizeof(bmi_method_addr_p) * incount));
     memset(status, 0, (sizeof(int) * incount));
 
-    gen_mutex_lock(&scp->mutex);
-
-#ifndef __PVFS2_JOB_THREADED__
-    gen_mutex_lock(&scp->queue_mutex);
-
-    /* look for addresses slated for removal */
-    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
-    {
-	tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
-	qlist_del(&tcp_addr_data->sc_link);
-        
-
-        /* take out of the epoll set */
-        if(tcp_addr_data->sc_index > -1)
-        {
-            memset(&event, 0, sizeof(event));
-            event.events = 0;
-            event.data.ptr = tcp_addr_data->map;
-
-            ret = epoll_ctl(scp->epfd, EPOLL_CTL_DEL, tcp_addr_data->socket,
-                &event);
-
-            if(ret < 0 && errno != ENOENT)
-            {
-                /* TODO: error handling */
-                gossip_lerr("Error: epoll_ctl() failure: %s\n",
-                    strerror(errno));
-                assert(0);
-            }
-
-	    tcp_addr_data->sc_index = -1;
-	    tcp_addr_data->write_ref_count = 0;
-        }
-    }
-
-    /* look for addresses slated for addition */
-    qlist_for_each_safe(iterator, scratch, &scp->add_queue)
-    {
-	tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
-	qlist_del(&tcp_addr_data->sc_link);
-
-	if(tcp_addr_data->sc_index > -1)
-	{
-            memset(&event, 0, sizeof(event));
-	    /* update existing entry */
-            event.data.ptr = tcp_addr_data->map;
-            event.events = (EPOLLIN|EPOLLERR|EPOLLHUP);
-	    if(tcp_addr_data->write_ref_count > 0)
-                event.events |= EPOLLOUT;
-
-            ret = epoll_ctl(scp->epfd, EPOLL_CTL_MOD, tcp_addr_data->socket,
-                            &event);
-
-            if(ret < 0 && errno != ENOENT)
-            {
-                /* TODO: error handling */
-                gossip_lerr("Error: epoll_ctl() failure: %s\n",
-                    strerror(errno));
-                assert(0);
-            }
-	}
-	else
-	{
-	    /* new entry */
-            tcp_addr_data->sc_index = 1;
-
-            memset(&event, 0, sizeof(event));
-            event.data.ptr = tcp_addr_data->map;
-            event.events = (EPOLLIN|EPOLLERR|EPOLLHUP);
-	    if(tcp_addr_data->write_ref_count > 0)
-                event.events |= EPOLLOUT;
-
-            ret = epoll_ctl(scp->epfd, EPOLL_CTL_ADD, tcp_addr_data->socket,
-                &event);
-            if(ret < 0 && errno != EEXIST)
-            {
-                /* TODO: error handling */
-                gossip_lerr("Error: epoll_ctl() failure: %s\n",
-                    strerror(errno));
-                assert(0);
-            }
-	}
-    }
-    gen_mutex_unlock(&scp->queue_mutex);
-#endif
-
     /* actually do the epoll_wait() here */
     do
     {
@@ -291,14 +143,12 @@ int BMI_socket_collection_testglobal(soc
 
     if(ret < 0)
     {
-	gen_mutex_unlock(&scp->mutex);
 	return(-old_errno);
     }
 
     /* nothing ready, just return */
     if(ret == 0)
     {
-	gen_mutex_unlock(&scp->mutex);
 	return(0);
     }
 
@@ -307,22 +157,6 @@ int BMI_socket_collection_testglobal(soc
     for(i=0; i<tmp_count; i++)
     {
         assert(scp->event_array[i].events);
-        skip_flag = 0;
-
-        /* make sure this addr hasn't been removed */
-        gen_mutex_lock(&scp->queue_mutex);
-        qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
-        {
-            tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
-            if(tcp_addr_data->map == scp->event_array[i].data.ptr)
-            {
-                skip_flag = 1;
-                break;
-            }
-        }
-        gen_mutex_unlock(&scp->queue_mutex);
-        if(skip_flag)
-            continue;
 
         if(scp->event_array[i].events & ERRMASK)
             status[*outcount] |= SC_ERROR_BIT;
@@ -350,8 +184,6 @@ int BMI_socket_collection_testglobal(soc
 
         *outcount = (*outcount) + 1;
     }
-
-    gen_mutex_unlock(&scp->mutex);
 
     return (0);
 }

Index: socket-collection-epoll.h
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/socket-collection-epoll.h,v
diff -p -u -r1.5 -r1.5.2.1
--- socket-collection-epoll.h	11 Feb 2008 19:32:00 -0000	1.5
+++ socket-collection-epoll.h	25 Aug 2009 17:56:11 -0000	1.5.2.1
@@ -30,14 +30,8 @@
 
 struct socket_collection
 {
-    gen_mutex_t mutex;
-
     int epfd;
     
-    gen_mutex_t queue_mutex;
-    struct qlist_head add_queue;
-    struct qlist_head remove_queue;
-
     struct epoll_event event_array[BMI_EPOLL_MAX_PER_CYCLE];
 
     int server_socket;
@@ -52,58 +46,10 @@ enum
 };
 
 socket_collection_p BMI_socket_collection_init(int new_server_socket);
-void BMI_socket_collection_queue(socket_collection_p scp,
-			   bmi_method_addr_p map, struct qlist_head* queue);
 
 /* the bmi_tcp code may try to add a socket to the collection before
  * it is fully connected, just ignore in this case
  */
-/* TODO: maybe optimize later; with epoll it is safe to add a new descriptor
- * while a poll is in progress, so we could skip lock and queue in some
- * cases.
- */
-#ifndef __PVFS2_JOB_THREADED__
-
-#define BMI_socket_collection_add(s, m) \
-do { \
-    struct tcp_addr* tcp_data = (m)->method_data; \
-    if(tcp_data->socket > -1){ \
-	gen_mutex_lock(&((s)->queue_mutex)); \
-	BMI_socket_collection_queue(s, m, &((s)->add_queue)); \
-	gen_mutex_unlock(&((s)->queue_mutex)); \
-    } \
-} while(0)
-
-#define BMI_socket_collection_remove(s, m) \
-do { \
-    gen_mutex_lock(&((s)->queue_mutex)); \
-    BMI_socket_collection_queue(s, m, &((s)->remove_queue)); \
-    gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-/* we _must_ have a valid socket at this point if we want to write data */
-#define BMI_socket_collection_add_write_bit(s, m) \
-do { \
-    struct tcp_addr* tcp_data = (m)->method_data; \
-    assert(tcp_data->socket > -1); \
-    gen_mutex_lock(&((s)->queue_mutex)); \
-    tcp_data->write_ref_count++; \
-    BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
-    gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-#define BMI_socket_collection_remove_write_bit(s, m) \
-do { \
-    struct tcp_addr* tcp_data = (m)->method_data; \
-    gen_mutex_lock(&((s)->queue_mutex)); \
-    tcp_data->write_ref_count--; \
-    assert(tcp_data->write_ref_count > -1); \
-    BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
-    gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-#else
-
 #define BMI_socket_collection_add(s, m) \
 do { \
     struct tcp_addr* tcp_data = (m)->method_data; \
@@ -154,16 +100,13 @@ do { \
     }\
 } while(0)
 
-#endif
-
 void BMI_socket_collection_finalize(socket_collection_p scp);
 int BMI_socket_collection_testglobal(socket_collection_p scp,
 				 int incount,
 				 int *outcount,
 				 bmi_method_addr_p * maps,
 				 int * status,
-				 int poll_timeout,
-				 gen_mutex_t* external_mutex);
+				 int poll_timeout);
 
 #endif /* __SOCKET_COLLECTION_EPOLL_H */
 

Index: socket-collection.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/socket-collection.c,v
diff -p -u -r1.17 -r1.17.8.1
--- socket-collection.c	6 Nov 2007 23:08:36 -0000	1.17
+++ socket-collection.c	25 Aug 2009 17:56:11 -0000	1.17.8.1
@@ -55,7 +55,6 @@ socket_collection_p BMI_socket_collectio
 
     memset(tmp_scp, 0, sizeof(struct socket_collection));
 
-    gen_mutex_init(&tmp_scp->mutex);
     gen_mutex_init(&tmp_scp->queue_mutex);
 
     tmp_scp->pollfd_array = (struct
@@ -177,8 +176,7 @@ int BMI_socket_collection_testglobal(soc
 				 int *outcount,
 				 bmi_method_addr_p * maps,
 				 int * status,
-				 int poll_timeout,
-				 gen_mutex_t* external_mutex)
+				 int poll_timeout)
 {
     struct qlist_head* iterator = NULL;
     struct qlist_head* scratch = NULL;
@@ -202,8 +200,6 @@ do_again:
     memset(maps, 0, (sizeof(bmi_method_addr_p) * incount));
     memset(status, 0, (sizeof(int) * incount));
 
-    gen_mutex_lock(&scp->mutex);
-
     gen_mutex_lock(&scp->queue_mutex);
 
     /* look for addresses slated for removal */
@@ -291,14 +287,12 @@ do_again:
 
     if(ret < 0)
     {
-	gen_mutex_unlock(&scp->mutex);
 	return(bmi_tcp_errno_to_pvfs(-old_errno));
     }
 
     /* nothing ready, just return */
     if(ret == 0)
     {
-	gen_mutex_unlock(&scp->mutex);
 	return(0);
     }
 
@@ -369,8 +363,6 @@ do_again:
 	    *outcount = (*outcount) + 1;
 	}
     }
-
-    gen_mutex_unlock(&scp->mutex);
 
     /* Under the following conditions (i.e. all of them must be true) we go back to redoing poll
      * a) There were no outstanding sockets/fds that had data

Index: socket-collection.h
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/socket-collection.h,v
diff -p -u -r1.13 -r1.13.8.1
--- socket-collection.h	6 Nov 2007 23:08:36 -0000	1.13
+++ socket-collection.h	25 Aug 2009 17:56:11 -0000	1.13.8.1
@@ -26,8 +26,6 @@
 
 struct socket_collection
 {
-    gen_mutex_t mutex;
-
     struct pollfd* pollfd_array;
     bmi_method_addr_p* addr_array;
     int array_max;
@@ -53,50 +51,9 @@ socket_collection_p BMI_socket_collectio
 void BMI_socket_collection_queue(socket_collection_p scp,
 			   bmi_method_addr_p map, struct qlist_head* queue);
 
-#ifndef __PVFS2_JOB_THREADED__
 /* the bmi_tcp code may try to add a socket to the collection before
  * it is fully connected, just ignore in this case
  */
-#define BMI_socket_collection_add(s, m) \
-do { \
-    struct tcp_addr* tcp_data = (m)->method_data; \
-    if(tcp_data->socket > -1){ \
-	gen_mutex_lock(&((s)->queue_mutex)); \
-	BMI_socket_collection_queue(s, m, &((s)->add_queue)); \
-	gen_mutex_unlock(&((s)->queue_mutex)); \
-    } \
-} while(0)
-
-#define BMI_socket_collection_remove(s, m) \
-do { \
-    gen_mutex_lock(&((s)->queue_mutex)); \
-    BMI_socket_collection_queue(s, m, &((s)->remove_queue)); \
-    gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-/* we _must_ have a valid socket at this point if we want to write data */
-#define BMI_socket_collection_add_write_bit(s, m) \
-do { \
-    struct tcp_addr* tcp_data = (m)->method_data; \
-    assert(tcp_data->socket > -1); \
-    gen_mutex_lock(&((s)->queue_mutex)); \
-    tcp_data->write_ref_count++; \
-    BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
-    gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-#define BMI_socket_collection_remove_write_bit(s, m) \
-do { \
-    struct tcp_addr* tcp_data = (m)->method_data; \
-    gen_mutex_lock(&((s)->queue_mutex)); \
-    tcp_data->write_ref_count--; \
-    assert(tcp_data->write_ref_count > -1); \
-    BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
-    gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-#else
-
 /* write a byte on the pipe_fd[1] so that poll breaks out in case it is idling */
 #define BMI_socket_collection_add(s, m) \
 do { \
@@ -144,17 +101,13 @@ do { \
     write(s->pipe_fd[1], &c, 1);\
 } while(0)
 
-
-#endif
-
 void BMI_socket_collection_finalize(socket_collection_p scp);
 int BMI_socket_collection_testglobal(socket_collection_p scp,
 				 int incount,
 				 int *outcount,
 				 bmi_method_addr_p * maps,
 				 int * status,
-				 int poll_timeout,
-				 gen_mutex_t* external_mutex);
+				 int poll_timeout);
 
 #endif /* __SOCKET_COLLECTION_H */
 

Index: sockio.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/sockio.c,v
diff -p -u -r1.30 -r1.30.2.1
--- sockio.c	22 Feb 2008 18:20:51 -0000	1.30
+++ sockio.c	25 Aug 2009 17:56:11 -0000	1.30.2.1
@@ -196,10 +196,15 @@ int BMI_sockio_nbrecv(int s,
             errno = EPIPE;
             return (-1);
         }
-	if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
+	if (ret == -1 && errno == EINTR) 
 	{
 	    goto nbrecv_restart;
 	}
+        else if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+        {
+            /* return what we got so far, this is a nonblocking call */
+            return(len-comp);
+        }
 	else if (ret == -1)
 	{
 	    return (-1);
@@ -220,34 +225,30 @@ int BMI_sockio_nbrecv(int s,
  */
 int BMI_sockio_nbpeek(int s, void* buf, int len)
 {
-    int ret, comp = len;
-
+    int ret;
     assert(fcntl(s, F_GETFL, 0) & O_NONBLOCK);
 
-    while (comp)
+  nbpeek_restart:
+    ret = recv(s, buf, len, (MSG_PEEK|DEFAULT_MSG_FLAGS));
+    if(ret == 0)
     {
-      nbpeek_restart:
-	ret = recv(s, buf, comp, (MSG_PEEK|DEFAULT_MSG_FLAGS));
-	if (!ret)	/* socket closed */
-	{
-	    errno = EPIPE;
-	    return (-1);
-	}
-	if (ret == -1 && errno == EWOULDBLOCK)
-	{
-	    return (len - comp);	/* return amount completed */
-	}
-	if (ret == -1 && errno == EINTR)
-	{
-	    goto nbpeek_restart;
-	}
-	else if (ret == -1)
-	{
-	    return (-1);
-	}
-	comp -= ret;
+        errno = EPIPE;
+        return (-1);
     }
-    return (len - comp);
+    else if (ret == -1 && errno == EWOULDBLOCK)
+    {
+        return(0);
+    }
+    else if (ret == -1 && errno == EINTR)
+    {
+        goto nbpeek_restart;
+    }
+    else if (ret == -1)
+    {
+        return (-1);
+    }
+    
+    return(ret);
 }
 
 



More information about the Pvfs2-cvs mailing list