[PVFS2-CVS] commit by pcarns in pvfs2/src/io/bmi/bmi_tcp: bmi-tcp.c sockio.c

CVS commit program cvs at parl.clemson.edu
Tue Feb 10 10:19:18 EST 2004


Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp
In directory parlweb:/tmp/cvs-serv32355/src/io/bmi/bmi_tcp

Modified Files:
	bmi-tcp.c sockio.c 
Log Message:
Merged in two bmi_tcp optimizations to reduce the number of system calls:
(1) send the message envelope along with the data payload in the same
writev() call, and (2) assume the default socket mode is nonblocking rather
than querying/setting the mode on every socket call.


Index: bmi-tcp.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp.c,v
diff -p -u -r1.55 -r1.56
--- bmi-tcp.c	30 Jan 2004 20:12:12 -0000	1.55
+++ bmi-tcp.c	10 Feb 2004 15:19:18 -0000	1.56
@@ -197,7 +197,7 @@ struct tcp_op
  * this because BMI serializes module calls
  */
 #define BMI_TCP_IOV_COUNT 10
-static struct iovec stat_io_vector[BMI_TCP_IOV_COUNT];
+static struct iovec stat_io_vector[BMI_TCP_IOV_COUNT+1];
 
 /* internal utility functions */
 static int tcp_server_init(void);
@@ -253,7 +253,8 @@ static int tcp_post_recv_generic(bmi_op_
 				 bmi_context_id context_id);
 static int payload_progress(int s, void *const *buffer_list, const bmi_size_t* 
     size_list, int list_count, bmi_size_t total_size, int* list_index, 
-    bmi_size_t* current_index_complete, enum bmi_op_type send_recv);
+    bmi_size_t* current_index_complete, enum bmi_op_type send_recv, 
+    char* enc_hdr, bmi_size_t* env_amt_complete);
 
 /* exported method interface */
 struct bmi_method_ops bmi_tcp_ops = {
@@ -1895,7 +1896,9 @@ static int tcp_post_recv_generic(bmi_op_
 		query_op->actual_size,
 		&(query_op->list_index),
 		&(query_op->cur_index_complete),
-		BMI_RECV);
+		BMI_RECV,
+		NULL,
+		0);
 	    if (ret < 0)
 	    {
 		gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
@@ -2250,7 +2253,7 @@ static int tcp_do_work_recv(method_addr_
     if (ret < TCP_ENC_HDR_SIZE)
     {
 	tmp_errno = errno;
-	gossip_lerr("Error: BMI_sockio_brecv: %s\n", strerror(tmp_errno));
+	gossip_err("Error: BMI_sockio_brecv: %s\n", strerror(tmp_errno));
 	tcp_forget_addr(map, 0, -tmp_errno);
 	return (0);
     }
@@ -2419,10 +2422,8 @@ static int work_on_send_op(method_op_p m
 			   int *blocked_flag)
 {
     int ret = -1;
-    void *working_buf = NULL;
     struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
     struct tcp_op *tcp_op_data = my_method_op->method_data;
-    int tmp_errno;
 
     *blocked_flag = 1;
 
@@ -2443,58 +2444,29 @@ static int work_on_send_op(method_op_p m
 	}
     }
 
-    /* ok- have we sent the message envelope yet? */
-    if (my_method_op->env_amt_complete < TCP_ENC_HDR_SIZE)
-    {
-	working_buf = &(tcp_op_data->env.enc_hdr[my_method_op->env_amt_complete]);
-	ret = BMI_sockio_nbsend(tcp_addr_data->socket, working_buf,
-		     (TCP_ENC_HDR_SIZE - my_method_op->env_amt_complete));
-	if (ret < 0)
-	{
-	    tmp_errno = errno;
-	    gossip_lerr("Error: BMI_sockio_nbsend: %s\n", strerror(tmp_errno));
-	    tcp_forget_addr(my_method_op->addr, 0, -tmp_errno);
-	    return (0);
-	}
-	my_method_op->env_amt_complete += ret;
-    }
-
-    /* if we didn't finish the envelope, just leave the op in the queue
-     * for later.
-     */
-    if (my_method_op->env_amt_complete < TCP_ENC_HDR_SIZE)
+    ret = payload_progress(tcp_addr_data->socket,
+	my_method_op->buffer_list,
+	my_method_op->size_list,
+	my_method_op->list_count,
+	my_method_op->actual_size,
+	&(my_method_op->list_index),
+	&(my_method_op->cur_index_complete),
+	BMI_SEND,
+	tcp_op_data->env.enc_hdr,
+	&my_method_op->env_amt_complete);
+    if (ret < 0)
     {
-	tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
+	gossip_err("Error: payload_progress: %s\n", strerror(-ret));
+	tcp_forget_addr(my_method_op->addr, 0, ret);
 	return (0);
     }
 
-    if (my_method_op->actual_size != 0)
-    {
-	ret = payload_progress(tcp_addr_data->socket,
-	    my_method_op->buffer_list,
-	    my_method_op->size_list,
-	    my_method_op->list_count,
-	    my_method_op->actual_size,
-	    &(my_method_op->list_index),
-	    &(my_method_op->cur_index_complete),
-	    BMI_SEND);
-	if (ret < 0)
-	{
-	    gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
-	    tcp_forget_addr(my_method_op->addr, 0, ret);
-	    return (0);
-	}
-    }
-    else
-    {
-	ret = 0;
-    }
 
     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
     my_method_op->amt_complete += ret;
     assert(my_method_op->amt_complete <= my_method_op->actual_size);
 
-    if (my_method_op->amt_complete == my_method_op->actual_size)
+    if (my_method_op->amt_complete == my_method_op->actual_size && my_method_op->env_amt_complete == TCP_ENC_HDR_SIZE)
     {
 	/* we are done */
 	my_method_op->error_code = 0;
@@ -2542,7 +2514,9 @@ static int work_on_recv_op(method_op_p m
 	    my_method_op->actual_size,
 	    &(my_method_op->list_index),
 	    &(my_method_op->cur_index_complete),
-	    BMI_RECV);
+	    BMI_RECV,
+	    NULL,
+	    0);
 	if (ret < 0)
 	{
 	    gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
@@ -2736,8 +2710,8 @@ static int BMI_tcp_post_send_generic(bmi
     struct tcp_addr *tcp_addr_data = dest->method_data;
     method_op_p query_op = NULL;
     int ret = -1;
-    int tmp_errno = 0;
     bmi_size_t amt_complete = 0;
+    bmi_size_t env_amt_complete = 0;
     struct op_list_search_key key;
     int list_index = 0;
     bmi_size_t cur_index_complete = 0;
@@ -2829,56 +2803,23 @@ static int BMI_tcp_post_send_generic(bmi
 	return (ret);
     }
 
-    /* send the message header first */
-    tcp_addr_data = dest->method_data;
-    ret = BMI_sockio_nbsend(tcp_addr_data->socket, my_header.enc_hdr, TCP_ENC_HDR_SIZE);
+    /* try to send some data */
+    env_amt_complete = 0;
+    ret = payload_progress(tcp_addr_data->socket,
+	(void **) buffer_list,
+	size_list, list_count, my_header.size, &list_index,
+	&cur_index_complete, BMI_SEND, my_header.enc_hdr, &env_amt_complete);
     if (ret < 0)
     {
-	tmp_errno = errno;
-	gossip_lerr("Error: BMI_sockio_nbsend: %s\n", strerror(tmp_errno));
-	tcp_forget_addr(dest, 0, -tmp_errno);
-	return (-tmp_errno);
-    }
-    if (ret < TCP_ENC_HDR_SIZE)
-    {
-	/* header send not completed */
-	ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
-				dest, (void **) buffer_list, size_list,
-				list_count, 0,
-				ret, id, BMI_TCP_INPROGRESS, my_header,
-				user_ptr, my_header.size, 0, context_id);
-	if(ret < 0)
-	{
-	    gossip_lerr("Error: enqueue_operation() returned: %d\n", ret);
-	}
+	gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
+	tcp_forget_addr(dest, 0, ret);
 	return (ret);
     }
 
-    /* we finished sending the header */
-
-    if (my_header.size != 0)
-    {
-	/* try to send some actual message data */
-	ret = payload_progress(tcp_addr_data->socket,
-	    (void **) buffer_list,
-	    size_list, list_count, my_header.size, &list_index,
-	    &cur_index_complete, BMI_SEND);
-	if (ret < 0)
-	{
-	    gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
-	    tcp_forget_addr(dest, 0, ret);
-	    return (ret);
-	}
-    }
-    else
-    {
-	ret = 0;
-    }
-
     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
     amt_complete = ret;
     assert(amt_complete <= my_header.size);
-    if (amt_complete == my_header.size)
+    if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
     {
 	/* we are already done */
 	return (1);
@@ -2888,7 +2829,7 @@ static int BMI_tcp_post_send_generic(bmi
     ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
 			    dest, (void **) buffer_list,
 			    size_list, list_count,
-			    amt_complete, TCP_ENC_HDR_SIZE, id,
+			    amt_complete, env_amt_complete, id,
 			    BMI_TCP_INPROGRESS, my_header, user_ptr,
 			    my_header.size, 0, context_id);
 
@@ -2908,7 +2849,8 @@ static int BMI_tcp_post_send_generic(bmi
  */
 static int payload_progress(int s, void *const *buffer_list, const bmi_size_t* 
     size_list, int list_count, bmi_size_t total_size, int* list_index, 
-    bmi_size_t* current_index_complete, enum bmi_op_type send_recv)
+    bmi_size_t* current_index_complete, enum bmi_op_type send_recv, 
+    char* enc_hdr, bmi_size_t* env_amt_complete)
 {
     int i;
     int count = 0;
@@ -2918,6 +2860,9 @@ static int payload_progress(int s, void 
     int final_index = list_count-1;
     bmi_size_t final_size = size_list[list_count-1];
     bmi_size_t sum = 0;
+    int vector_index = 0;
+    int header_flag = 0;
+    int tmp_env_done = 0;
 
     if(send_recv == BMI_RECV)
     {
@@ -2942,30 +2887,41 @@ static int payload_progress(int s, void 
 	list_count = (*list_index) + BMI_TCP_IOV_COUNT;
     }
 
+    /* do we need to send any of the header? */
+    if(send_recv == BMI_SEND && *env_amt_complete < TCP_ENC_HDR_SIZE)
+    {
+	stat_io_vector[vector_index].iov_base = &enc_hdr[*env_amt_complete];
+	stat_io_vector[vector_index].iov_len = TCP_ENC_HDR_SIZE - *env_amt_complete;
+	count++;
+	vector_index++;
+	header_flag = 1;
+    }
+
     /* setup vector */
-    stat_io_vector[0].iov_base = 
+    stat_io_vector[vector_index].iov_base = 
 	(char*)buffer_list[*list_index] + *current_index_complete;
-    count = 1;
+    count++;
     if(final_index == 0)
     {
-	stat_io_vector[0].iov_len = final_size - *current_index_complete;
+	stat_io_vector[vector_index].iov_len = final_size - *current_index_complete;
     }
     else
     {
-	stat_io_vector[0].iov_len = 
+	stat_io_vector[vector_index].iov_len = 
 	    size_list[*list_index] - *current_index_complete;
 	for(i = (*list_index + 1); i < list_count; i++)
 	{
-	    stat_io_vector[(i-*list_index)].iov_base = buffer_list[i];
+	    stat_io_vector[vector_index].iov_base = buffer_list[i];
+	    vector_index++;
 	    count++;
 	    if(i == final_index)
 	    {
-		stat_io_vector[(i-*list_index)].iov_len = final_size;
+		stat_io_vector[vector_index].iov_len = final_size;
 		break;
 	    }
 	    else
 	    {
-		stat_io_vector[(i-*list_index)].iov_len = size_list[i];
+		stat_io_vector[vector_index].iov_len = size_list[i];
 	    }
 	}
     }
@@ -2986,11 +2942,22 @@ static int payload_progress(int s, void 
     if(ret <= 0)
 	return(-errno);
 
-    /* update position */
     completed = ret;
-    i=0;
+    if(header_flag && (completed >= 0))
+    {
+	/* take care of completed header status */
+	tmp_env_done = TCP_ENC_HDR_SIZE - *env_amt_complete;
+	if(tmp_env_done > completed)
+	    tmp_env_done = completed;
+	completed -= tmp_env_done;
+	ret -= tmp_env_done;
+	(*env_amt_complete) += tmp_env_done;
+    }
+
+    i=header_flag;
     while(completed > 0)
     {
+	/* take care of completed data payload */
 	if(completed >= stat_io_vector[i].iov_len)
 	{
 	    completed -= stat_io_vector[i].iov_len;

Index: sockio.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/sockio.c,v
diff -p -u -r1.12 -r1.13
--- sockio.c	9 Sep 2003 22:06:37 -0000	1.12
+++ sockio.c	10 Feb 2004 15:19:18 -0000	1.13
@@ -118,6 +118,7 @@ int BMI_sockio_brecv(int s,
 	  int len)
 {
     int oldfl, ret, comp = len;
+    int olderrno;
     oldfl = fcntl(s, F_GETFL, 0);
     if (oldfl & O_NONBLOCK)
 	fcntl(s, F_SETFL, oldfl & (~O_NONBLOCK));
@@ -129,6 +130,9 @@ int BMI_sockio_brecv(int s,
 	{
 	    if (errno == EINTR)
 		goto brecv_restart;
+	    olderrno = errno;
+	    fcntl(s, F_SETFL, oldfl|O_NONBLOCK);
+	    errno = olderrno;
 	    return (-1);
 	}
 	if (!ret)
@@ -137,12 +141,14 @@ int BMI_sockio_brecv(int s,
 	     * like this behavior, so we're going to return -1 w/an EPIPE
 	     * instead.
 	     */
+	    fcntl(s, F_SETFL, oldfl|O_NONBLOCK);
 	    errno = EPIPE;
 	    return (-1);
 	}
 	comp -= ret;
 	buf += ret;
     }
+    fcntl(s, F_SETFL, oldfl|O_NONBLOCK);
     return (len - comp);
 }
 
@@ -151,11 +157,7 @@ int BMI_sockio_nbrecv(int s,
 	   void *buf,
 	   int len)
 {
-    int oldfl, ret, comp = len;
-
-    oldfl = fcntl(s, F_GETFL, 0);
-    if (!(oldfl & O_NONBLOCK))
-	fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+    int ret, comp = len;
 
     while (comp)
     {
@@ -194,11 +196,7 @@ int BMI_sockio_nbrecv(int s,
  */
 int BMI_sockio_nbpeek(int s, void* buf, int len)
 {
-    int oldfl, ret, comp = len;
-
-    oldfl = fcntl(s, F_GETFL, 0);
-    if (!(oldfl & O_NONBLOCK))
-	fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+    int ret, comp = len;
 
     while (comp)
     {
@@ -233,6 +231,7 @@ int BMI_sockio_bsend(int s,
 	  int len)
 {
     int oldfl, ret, comp = len;
+    int olderrno;
     oldfl = fcntl(s, F_GETFL, 0);
     if (oldfl & O_NONBLOCK)
 	fcntl(s, F_SETFL, oldfl & (~O_NONBLOCK));
@@ -244,11 +243,15 @@ int BMI_sockio_bsend(int s,
 	{
 	    if (errno == EINTR)
 		goto bsend_restart;
+	    olderrno = errno;
+	    fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+	    errno = olderrno;
 	    return (-1);
 	}
 	comp -= ret;
 	buf += ret;
     }
+    fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
     return (len - comp);
 }
 
@@ -259,10 +262,7 @@ int BMI_sockio_nbsend(int s,
 	   void *buf,
 	   int len)
 {
-    int oldfl, ret, comp = len;
-    oldfl = fcntl(s, F_GETFL, 0);
-    if (!(oldfl & O_NONBLOCK))
-	fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+    int ret, comp = len;
 
     while (comp)
     {
@@ -288,10 +288,7 @@ int BMI_sockio_nbvector(int s,
 	    int count, 
 	    int recv_flag)
 {
-    int oldfl, ret;
-    oldfl = fcntl(s, F_GETFL, 0);
-    if (!(oldfl & O_NONBLOCK))
-	fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+    int ret;
 
     /* NOTE: this function is different from the others that will
      * keep making the I/O system call until EWOULDBLOCK is encountered; we 
@@ -340,11 +337,7 @@ int BMI_sockio_nbsendfile(int s,
 	       int off,
 	       int len)
 {
-    int oldfl, ret, comp = len, myoff;
-
-    oldfl = fcntl(s, F_GETFL, 0);
-    if (!(oldfl & O_NONBLOCK))
-	fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+    int ret, comp = len, myoff;
 
     while (comp)
     {



More information about the PVFS2-CVS mailing list