[PVFS2-CVS] commit by pcarns in pvfs2/src/io/bmi/bmi_tcp: bmi-tcp.c

CVS commit program cvs at parl.clemson.edu
Tue Feb 10 17:52:55 EST 2004


Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp
In directory parlweb:/tmp/cvs-serv2160/bmi_tcp

Modified Files:
	bmi-tcp.c 
Log Message:
important fix for bmi_tcp; detect when we have a big backlog of data on 
the sockets being watched, and throttle back if we can't make any progress
on that backlog.  We now give up more cpu time in hopes of the rest of the 
system catching up.  Should make a big difference on systems where the 
tcp/ip net is much faster than the disk (like on big writes to localhost)


Index: bmi-tcp.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp.c,v
diff -p -u -r1.56 -r1.57
--- bmi-tcp.c	10 Feb 2004 15:19:18 -0000	1.56
+++ bmi-tcp.c	10 Feb 2004 22:52:55 -0000	1.57
@@ -14,6 +14,7 @@
 #include <netinet/tcp.h>
 #include <assert.h>
 #include <sys/uio.h>
+#include <time.h>
 
 #include "bmi-method-support.h"
 #include "bmi-method-callback.h"
@@ -222,11 +223,12 @@ static int tcp_cleanse_addr(method_addr_
 static int tcp_shutdown_addr(method_addr_p map);
 static int tcp_do_work(int max_idle_time);
 static int tcp_do_work_error(method_addr_p map);
-static int tcp_do_work_recv(method_addr_p map);
-static int tcp_do_work_send(method_addr_p map);
-static int work_on_recv_op(method_op_p my_method_op);
+static int tcp_do_work_recv(method_addr_p map, int* stall_flag);
+static int tcp_do_work_send(method_addr_p map, int* stall_flag);
+static int work_on_recv_op(method_op_p my_method_op,
+			   int *stall_flag);
 static int work_on_send_op(method_op_p my_method_op,
-			   int *blocked_flag);
+			   int *blocked_flag, int* stall_flag);
 static int tcp_accept_init(int *socket);
 static method_op_p alloc_tcp_method_op(void);
 static void dealloc_tcp_method_op(method_op_p old_op);
@@ -2044,6 +2046,9 @@ static int tcp_do_work(int max_idle_time
     int status_array[TCP_WORK_METRIC];
     int socket_count = 0;
     int i = 0;
+    int stall_flag = 0;
+    int busy_flag = 1;
+    struct timespec req;
 
     /* now we need to poll and see what to work on */
     ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
@@ -2055,6 +2060,9 @@ static int tcp_do_work(int max_idle_time
 	return (ret);
     }
 
+    if(socket_count == 0)
+	busy_flag = 0;
+
     /* do different kinds of work depending on results */
     for (i = 0; i < socket_count; i++)
     {
@@ -2070,22 +2078,42 @@ static int tcp_do_work(int max_idle_time
 	{
 	    if (status_array[i] & SC_WRITE_BIT)
 	    {
-		ret = tcp_do_work_send(addr_array[i]);
+		ret = tcp_do_work_send(addr_array[i], &stall_flag);
 		if (ret < 0)
 		{
 		    return (ret);
 		}
+		if(!stall_flag)
+		    busy_flag = 0;
 	    }
 	    if (status_array[i] & SC_READ_BIT)
 	    {
-		ret = tcp_do_work_recv(addr_array[i]);
+		ret = tcp_do_work_recv(addr_array[i], &stall_flag);
 		if (ret < 0)
 		{
 		    return (ret);
 		}
+		if(!stall_flag)
+		    busy_flag = 0;
 	    }
 	}
     }
+
+    /* IMPORTANT NOTE: if we have set the following flag, then it indicates that
+     * poll() is finding data on our sockets, yet we are not able to move
+     * any of it right now.  This means that the sockets are backlogged, and
+     * BMI is in danger of busy spinning during test functions.  Let's sleep
+     * for a millisecond here in hopes of letting the rest of the system
+     * catch up somehow (either by clearing a backlog in another I/O
+     * component, or by posting more matching BMI receive operations)
+     */
+    if(busy_flag)
+    {
+	req.tv_sec = 0;
+	req.tv_nsec = 1000;
+	nanosleep(&req, NULL);
+    }
+
     return (0);
 }
 
@@ -2096,12 +2124,15 @@ static int tcp_do_work(int max_idle_time
  *
  * returns 0 on success, -errno on failure
  */
-static int tcp_do_work_send(method_addr_p map)
+static int tcp_do_work_send(method_addr_p map, int* stall_flag)
 {
     method_op_p active_method_op = NULL;
     struct op_list_search_key key;
     int blocked_flag = 0;
     int ret = 0;
+    int tmp_stall_flag;
+
+    *stall_flag = 1;
 
     while (blocked_flag == 0 && ret == 0)
     {
@@ -2118,7 +2149,9 @@ static int tcp_do_work_send(method_addr_
 	    return (0);
 	}
 
-	ret = work_on_send_op(active_method_op, &blocked_flag);
+	ret = work_on_send_op(active_method_op, &blocked_flag, &tmp_stall_flag);
+	if(!tmp_stall_flag)
+	    *stall_flag = 0;
     }
 
     return (ret);
@@ -2191,7 +2224,7 @@ static int handle_new_connection(method_
  *
  * returns 0 on success, -errno on failure
  */
-static int tcp_do_work_recv(method_addr_p map)
+static int tcp_do_work_recv(method_addr_p map, int* stall_flag)
 {
 
     method_op_p active_method_op = NULL;
@@ -2202,11 +2235,15 @@ static int tcp_do_work_recv(method_addr_
     struct tcp_addr *tcp_addr_data = map->method_data;
     struct tcp_op *tcp_op_data = NULL;
     int tmp_errno;
+    int tmp;
+
+    *stall_flag = 1;
 
     /* figure out if this is a new connection */
     if (tcp_addr_data->server_port)
     {
 	/* just try to accept connection- no work yet */
+	*stall_flag = 0;
 	return (handle_new_connection(map));
     }
 
@@ -2224,7 +2261,7 @@ static int tcp_do_work_recv(method_addr_
 	}
 	else
 	{
-	    return (work_on_recv_op(active_method_op));
+	    return (work_on_recv_op(active_method_op, stall_flag));
 	}
     }
 
@@ -2245,6 +2282,7 @@ static int tcp_do_work_recv(method_addr_
 	return (0);
     }
 
+    *stall_flag = 0;
     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Reading header for new op.\n");
     /* NOTE: we only allow a blocking call here because we peeked to see
      * if this amount of data was ready above.  
@@ -2320,7 +2358,7 @@ static int tcp_do_work_recv(method_addr_
 
 	op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
 	/* grab some data if we can */
-	return (work_on_recv_op(active_method_op));
+	return (work_on_recv_op(active_method_op, &tmp));
     }
 
     memset(&key, 0, sizeof(struct op_list_search_key));
@@ -2351,7 +2389,7 @@ static int tcp_do_work_recv(method_addr_
 	active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
 	active_method_op->actual_size = new_header.size;
 	op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
-	return (work_on_recv_op(active_method_op));
+	return (work_on_recv_op(active_method_op, &tmp));
     }
 
     /* no match anywhere.  Start a new operation */
@@ -2401,7 +2439,7 @@ static int tcp_do_work_recv(method_addr_
     /* grab some data if we can */
     if (new_header.mode == TCP_MODE_EAGER)
     {
-	return (work_on_recv_op(active_method_op));
+	return (work_on_recv_op(active_method_op, &tmp));
     }
 
     return (0);
@@ -2419,13 +2457,14 @@ static int tcp_do_work_recv(method_addr_
  * returns 0 on success, -errno on failure.
  */
 static int work_on_send_op(method_op_p my_method_op,
-			   int *blocked_flag)
+			   int *blocked_flag, int* stall_flag)
 {
     int ret = -1;
     struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
     struct tcp_op *tcp_op_data = my_method_op->method_data;
 
     *blocked_flag = 1;
+    *stall_flag = 0;
 
     /* make sure that the connection is done before we continue */
     if (tcp_addr_data->not_connected)
@@ -2461,6 +2500,8 @@ static int work_on_send_op(method_op_p m
 	return (0);
     }
 
+    if(ret == 0)
+	*stall_flag = 1;
 
     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
     my_method_op->amt_complete += ret;
@@ -2497,13 +2538,15 @@ static int work_on_send_op(method_op_p m
  *
  * returns 0 on success, -errno on failure.
  */
-static int work_on_recv_op(method_op_p my_method_op)
+static int work_on_recv_op(method_op_p my_method_op, int* stall_flag)
 {
 
     int ret = -1;
     struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
     struct tcp_op *tcp_op_data = my_method_op->method_data;
 
+    *stall_flag = 1;
+
     if (my_method_op->actual_size != 0)
     {
 	/* now let's try to recv some actual data */
@@ -2528,6 +2571,9 @@ static int work_on_recv_op(method_op_p m
     {
 	ret = 0;
     }
+
+    if(ret > 0)
+	*stall_flag = 0;
 
     my_method_op->amt_complete += ret;
     assert(my_method_op->amt_complete <= my_method_op->actual_size);



More information about the PVFS2-CVS mailing list