[PVFS2-CVS]
commit by pcarns in pvfs2/src/io/bmi/bmi_tcp: bmi-tcp.c sockio.c
CVS commit program
cvs at parl.clemson.edu
Tue Feb 10 10:19:18 EST 2004
Update of /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp
In directory parlweb:/tmp/cvs-serv32355/src/io/bmi/bmi_tcp
Modified Files:
bmi-tcp.c sockio.c
Log Message:
Merged in two bmi_tcp optimizations to reduce the number of system calls:
(1) send the message envelope along with the data payload in the same
writev() call, and (2) assume the default socket mode is nonblocking rather
than querying/setting the mode on every socket call.
Index: bmi-tcp.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp.c,v
diff -p -u -r1.55 -r1.56
--- bmi-tcp.c 30 Jan 2004 20:12:12 -0000 1.55
+++ bmi-tcp.c 10 Feb 2004 15:19:18 -0000 1.56
@@ -197,7 +197,7 @@ struct tcp_op
* this because BMI serializes module calls
*/
#define BMI_TCP_IOV_COUNT 10
-static struct iovec stat_io_vector[BMI_TCP_IOV_COUNT];
+static struct iovec stat_io_vector[BMI_TCP_IOV_COUNT+1];
/* internal utility functions */
static int tcp_server_init(void);
@@ -253,7 +253,8 @@ static int tcp_post_recv_generic(bmi_op_
bmi_context_id context_id);
static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
size_list, int list_count, bmi_size_t total_size, int* list_index,
- bmi_size_t* current_index_complete, enum bmi_op_type send_recv);
+ bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
+ char* enc_hdr, bmi_size_t* env_amt_complete);
/* exported method interface */
struct bmi_method_ops bmi_tcp_ops = {
@@ -1895,7 +1896,9 @@ static int tcp_post_recv_generic(bmi_op_
query_op->actual_size,
&(query_op->list_index),
&(query_op->cur_index_complete),
- BMI_RECV);
+ BMI_RECV,
+ NULL,
+ 0);
if (ret < 0)
{
gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
@@ -2250,7 +2253,7 @@ static int tcp_do_work_recv(method_addr_
if (ret < TCP_ENC_HDR_SIZE)
{
tmp_errno = errno;
- gossip_lerr("Error: BMI_sockio_brecv: %s\n", strerror(tmp_errno));
+ gossip_err("Error: BMI_sockio_brecv: %s\n", strerror(tmp_errno));
tcp_forget_addr(map, 0, -tmp_errno);
return (0);
}
@@ -2419,10 +2422,8 @@ static int work_on_send_op(method_op_p m
int *blocked_flag)
{
int ret = -1;
- void *working_buf = NULL;
struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
struct tcp_op *tcp_op_data = my_method_op->method_data;
- int tmp_errno;
*blocked_flag = 1;
@@ -2443,58 +2444,29 @@ static int work_on_send_op(method_op_p m
}
}
- /* ok- have we sent the message envelope yet? */
- if (my_method_op->env_amt_complete < TCP_ENC_HDR_SIZE)
- {
- working_buf = &(tcp_op_data->env.enc_hdr[my_method_op->env_amt_complete]);
- ret = BMI_sockio_nbsend(tcp_addr_data->socket, working_buf,
- (TCP_ENC_HDR_SIZE - my_method_op->env_amt_complete));
- if (ret < 0)
- {
- tmp_errno = errno;
- gossip_lerr("Error: BMI_sockio_nbsend: %s\n", strerror(tmp_errno));
- tcp_forget_addr(my_method_op->addr, 0, -tmp_errno);
- return (0);
- }
- my_method_op->env_amt_complete += ret;
- }
-
- /* if we didn't finish the envelope, just leave the op in the queue
- * for later.
- */
- if (my_method_op->env_amt_complete < TCP_ENC_HDR_SIZE)
+ ret = payload_progress(tcp_addr_data->socket,
+ my_method_op->buffer_list,
+ my_method_op->size_list,
+ my_method_op->list_count,
+ my_method_op->actual_size,
+ &(my_method_op->list_index),
+ &(my_method_op->cur_index_complete),
+ BMI_SEND,
+ tcp_op_data->env.enc_hdr,
+ &my_method_op->env_amt_complete);
+ if (ret < 0)
{
- tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
+ gossip_err("Error: payload_progress: %s\n", strerror(-ret));
+ tcp_forget_addr(my_method_op->addr, 0, ret);
return (0);
}
- if (my_method_op->actual_size != 0)
- {
- ret = payload_progress(tcp_addr_data->socket,
- my_method_op->buffer_list,
- my_method_op->size_list,
- my_method_op->list_count,
- my_method_op->actual_size,
- &(my_method_op->list_index),
- &(my_method_op->cur_index_complete),
- BMI_SEND);
- if (ret < 0)
- {
- gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
- tcp_forget_addr(my_method_op->addr, 0, ret);
- return (0);
- }
- }
- else
- {
- ret = 0;
- }
gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
my_method_op->amt_complete += ret;
assert(my_method_op->amt_complete <= my_method_op->actual_size);
- if (my_method_op->amt_complete == my_method_op->actual_size)
+ if (my_method_op->amt_complete == my_method_op->actual_size && my_method_op->env_amt_complete == TCP_ENC_HDR_SIZE)
{
/* we are done */
my_method_op->error_code = 0;
@@ -2542,7 +2514,9 @@ static int work_on_recv_op(method_op_p m
my_method_op->actual_size,
&(my_method_op->list_index),
&(my_method_op->cur_index_complete),
- BMI_RECV);
+ BMI_RECV,
+ NULL,
+ 0);
if (ret < 0)
{
gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
@@ -2736,8 +2710,8 @@ static int BMI_tcp_post_send_generic(bmi
struct tcp_addr *tcp_addr_data = dest->method_data;
method_op_p query_op = NULL;
int ret = -1;
- int tmp_errno = 0;
bmi_size_t amt_complete = 0;
+ bmi_size_t env_amt_complete = 0;
struct op_list_search_key key;
int list_index = 0;
bmi_size_t cur_index_complete = 0;
@@ -2829,56 +2803,23 @@ static int BMI_tcp_post_send_generic(bmi
return (ret);
}
- /* send the message header first */
- tcp_addr_data = dest->method_data;
- ret = BMI_sockio_nbsend(tcp_addr_data->socket, my_header.enc_hdr, TCP_ENC_HDR_SIZE);
+ /* try to send some data */
+ env_amt_complete = 0;
+ ret = payload_progress(tcp_addr_data->socket,
+ (void **) buffer_list,
+ size_list, list_count, my_header.size, &list_index,
+ &cur_index_complete, BMI_SEND, my_header.enc_hdr, &env_amt_complete);
if (ret < 0)
{
- tmp_errno = errno;
- gossip_lerr("Error: BMI_sockio_nbsend: %s\n", strerror(tmp_errno));
- tcp_forget_addr(dest, 0, -tmp_errno);
- return (-tmp_errno);
- }
- if (ret < TCP_ENC_HDR_SIZE)
- {
- /* header send not completed */
- ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
- dest, (void **) buffer_list, size_list,
- list_count, 0,
- ret, id, BMI_TCP_INPROGRESS, my_header,
- user_ptr, my_header.size, 0, context_id);
- if(ret < 0)
- {
- gossip_lerr("Error: enqueue_operation() returned: %d\n", ret);
- }
+ gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
+ tcp_forget_addr(dest, 0, ret);
return (ret);
}
- /* we finished sending the header */
-
- if (my_header.size != 0)
- {
- /* try to send some actual message data */
- ret = payload_progress(tcp_addr_data->socket,
- (void **) buffer_list,
- size_list, list_count, my_header.size, &list_index,
- &cur_index_complete, BMI_SEND);
- if (ret < 0)
- {
- gossip_lerr("Error: payload_progress: %s\n", strerror(-ret));
- tcp_forget_addr(dest, 0, ret);
- return (ret);
- }
- }
- else
- {
- ret = 0;
- }
-
gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
amt_complete = ret;
assert(amt_complete <= my_header.size);
- if (amt_complete == my_header.size)
+ if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
{
/* we are already done */
return (1);
@@ -2888,7 +2829,7 @@ static int BMI_tcp_post_send_generic(bmi
ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
dest, (void **) buffer_list,
size_list, list_count,
- amt_complete, TCP_ENC_HDR_SIZE, id,
+ amt_complete, env_amt_complete, id,
BMI_TCP_INPROGRESS, my_header, user_ptr,
my_header.size, 0, context_id);
@@ -2908,7 +2849,8 @@ static int BMI_tcp_post_send_generic(bmi
*/
static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
size_list, int list_count, bmi_size_t total_size, int* list_index,
- bmi_size_t* current_index_complete, enum bmi_op_type send_recv)
+ bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
+ char* enc_hdr, bmi_size_t* env_amt_complete)
{
int i;
int count = 0;
@@ -2918,6 +2860,9 @@ static int payload_progress(int s, void
int final_index = list_count-1;
bmi_size_t final_size = size_list[list_count-1];
bmi_size_t sum = 0;
+ int vector_index = 0;
+ int header_flag = 0;
+ int tmp_env_done = 0;
if(send_recv == BMI_RECV)
{
@@ -2942,30 +2887,41 @@ static int payload_progress(int s, void
list_count = (*list_index) + BMI_TCP_IOV_COUNT;
}
+ /* do we need to send any of the header? */
+ if(send_recv == BMI_SEND && *env_amt_complete < TCP_ENC_HDR_SIZE)
+ {
+ stat_io_vector[vector_index].iov_base = &enc_hdr[*env_amt_complete];
+ stat_io_vector[vector_index].iov_len = TCP_ENC_HDR_SIZE - *env_amt_complete;
+ count++;
+ vector_index++;
+ header_flag = 1;
+ }
+
/* setup vector */
- stat_io_vector[0].iov_base =
+ stat_io_vector[vector_index].iov_base =
(char*)buffer_list[*list_index] + *current_index_complete;
- count = 1;
+ count++;
if(final_index == 0)
{
- stat_io_vector[0].iov_len = final_size - *current_index_complete;
+ stat_io_vector[vector_index].iov_len = final_size - *current_index_complete;
}
else
{
- stat_io_vector[0].iov_len =
+ stat_io_vector[vector_index].iov_len =
size_list[*list_index] - *current_index_complete;
for(i = (*list_index + 1); i < list_count; i++)
{
- stat_io_vector[(i-*list_index)].iov_base = buffer_list[i];
+ stat_io_vector[vector_index].iov_base = buffer_list[i];
+ vector_index++;
count++;
if(i == final_index)
{
- stat_io_vector[(i-*list_index)].iov_len = final_size;
+ stat_io_vector[vector_index].iov_len = final_size;
break;
}
else
{
- stat_io_vector[(i-*list_index)].iov_len = size_list[i];
+ stat_io_vector[vector_index].iov_len = size_list[i];
}
}
}
@@ -2986,11 +2942,22 @@ static int payload_progress(int s, void
if(ret <= 0)
return(-errno);
- /* update position */
completed = ret;
- i=0;
+ if(header_flag && (completed >= 0))
+ {
+ /* take care of completed header status */
+ tmp_env_done = TCP_ENC_HDR_SIZE - *env_amt_complete;
+ if(tmp_env_done > completed)
+ tmp_env_done = completed;
+ completed -= tmp_env_done;
+ ret -= tmp_env_done;
+ (*env_amt_complete) += tmp_env_done;
+ }
+
+ i=header_flag;
while(completed > 0)
{
+ /* take care of completed data payload */
if(completed >= stat_io_vector[i].iov_len)
{
completed -= stat_io_vector[i].iov_len;
Index: sockio.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/bmi/bmi_tcp/sockio.c,v
diff -p -u -r1.12 -r1.13
--- sockio.c 9 Sep 2003 22:06:37 -0000 1.12
+++ sockio.c 10 Feb 2004 15:19:18 -0000 1.13
@@ -118,6 +118,7 @@ int BMI_sockio_brecv(int s,
int len)
{
int oldfl, ret, comp = len;
+ int olderrno;
oldfl = fcntl(s, F_GETFL, 0);
if (oldfl & O_NONBLOCK)
fcntl(s, F_SETFL, oldfl & (~O_NONBLOCK));
@@ -129,6 +130,9 @@ int BMI_sockio_brecv(int s,
{
if (errno == EINTR)
goto brecv_restart;
+ olderrno = errno;
+ fcntl(s, F_SETFL, oldfl|O_NONBLOCK);
+ errno = olderrno;
return (-1);
}
if (!ret)
@@ -137,12 +141,14 @@ int BMI_sockio_brecv(int s,
* like this behavior, so we're going to return -1 w/an EPIPE
* instead.
*/
+ fcntl(s, F_SETFL, oldfl|O_NONBLOCK);
errno = EPIPE;
return (-1);
}
comp -= ret;
buf += ret;
}
+ fcntl(s, F_SETFL, oldfl|O_NONBLOCK);
return (len - comp);
}
@@ -151,11 +157,7 @@ int BMI_sockio_nbrecv(int s,
void *buf,
int len)
{
- int oldfl, ret, comp = len;
-
- oldfl = fcntl(s, F_GETFL, 0);
- if (!(oldfl & O_NONBLOCK))
- fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+ int ret, comp = len;
while (comp)
{
@@ -194,11 +196,7 @@ int BMI_sockio_nbrecv(int s,
*/
int BMI_sockio_nbpeek(int s, void* buf, int len)
{
- int oldfl, ret, comp = len;
-
- oldfl = fcntl(s, F_GETFL, 0);
- if (!(oldfl & O_NONBLOCK))
- fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+ int ret, comp = len;
while (comp)
{
@@ -233,6 +231,7 @@ int BMI_sockio_bsend(int s,
int len)
{
int oldfl, ret, comp = len;
+ int olderrno;
oldfl = fcntl(s, F_GETFL, 0);
if (oldfl & O_NONBLOCK)
fcntl(s, F_SETFL, oldfl & (~O_NONBLOCK));
@@ -244,11 +243,15 @@ int BMI_sockio_bsend(int s,
{
if (errno == EINTR)
goto bsend_restart;
+ olderrno = errno;
+ fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+ errno = olderrno;
return (-1);
}
comp -= ret;
buf += ret;
}
+ fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
return (len - comp);
}
@@ -259,10 +262,7 @@ int BMI_sockio_nbsend(int s,
void *buf,
int len)
{
- int oldfl, ret, comp = len;
- oldfl = fcntl(s, F_GETFL, 0);
- if (!(oldfl & O_NONBLOCK))
- fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+ int ret, comp = len;
while (comp)
{
@@ -288,10 +288,7 @@ int BMI_sockio_nbvector(int s,
int count,
int recv_flag)
{
- int oldfl, ret;
- oldfl = fcntl(s, F_GETFL, 0);
- if (!(oldfl & O_NONBLOCK))
- fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+ int ret;
/* NOTE: this function is different from the others that will
* keep making the I/O system call until EWOULDBLOCK is encountered; we
@@ -340,11 +337,7 @@ int BMI_sockio_nbsendfile(int s,
int off,
int len)
{
- int oldfl, ret, comp = len, myoff;
-
- oldfl = fcntl(s, F_GETFL, 0);
- if (!(oldfl & O_NONBLOCK))
- fcntl(s, F_SETFL, oldfl | O_NONBLOCK);
+ int ret, comp = len, myoff;
while (comp)
{
More information about the PVFS2-CVS mailing list