[Pvfs2-cvs] commit by nlmills in pvfs2/src/io/bmi/bmi_tcp:
bmi-tcp-addressing.h bmi-tcp.c module.mk.in
socket-collection-epoll.c socket-collection-epoll.h
socket-collection.c socket-collection.h sockio.c
CVS commit program
cvs at parl.clemson.edu
Tue Aug 25 13:56:12 EDT 2009
Update of /anoncvs/pvfs2/src/io/bmi/bmi_tcp
In directory parlweb1:/tmp/cvs-serv5511/src/io/bmi/bmi_tcp
Modified Files:
Tag: cu-security-branch
bmi-tcp-addressing.h bmi-tcp.c module.mk.in
socket-collection-epoll.c socket-collection-epoll.h
socket-collection.c socket-collection.h sockio.c
Log Message:
merged in changes from summer at LANL
Index: bmi-tcp-addressing.h
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp-addressing.h,v
diff -p -u -r1.17 -r1.17.8.1
--- bmi-tcp-addressing.h 6 Nov 2007 23:08:36 -0000 1.17
+++ bmi-tcp-addressing.h 25 Aug 2009 17:56:11 -0000 1.17.8.1
@@ -24,6 +24,11 @@
*/
#define BMI_TCP_ZERO_READ_LIMIT 10
+/* wait no more than 10 seconds for a partial BMI header to arrive on a
+ * socket once we have detected part of it.
+ */
+#define BMI_TCP_HEADER_WAIT_SECONDS 10
+
/* peer name types */
#define BMI_TCP_PEER_IP 1
#define BMI_TCP_PEER_HOSTNAME 2
@@ -47,7 +52,7 @@ struct tcp_allowed_connection_s {
struct tcp_addr
{
bmi_method_addr_p map; /* points back to generic address */ \
- PVFS_BMI_addr_t bmi_addr;
+ BMI_addr_t bmi_addr;
/* stores error code for addresses that are broken for some reason */
int addr_error;
char *hostname;
@@ -65,6 +70,8 @@ struct tcp_addr
int sc_index;
/* count of the number of sequential zero read operations */
int zero_read_limit;
+ /* timer for how long we wait on incomplete headers to arrive */
+ int short_header_timer;
/* flag used to determine if we can reconnect this address after failure */
int dont_reconnect;
char* peer;
Index: bmi-tcp.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/bmi-tcp.c,v
diff -p -u -r1.127 -r1.127.4.1
--- bmi-tcp.c 8 Jan 2008 21:18:41 -0000 1.127
+++ bmi-tcp.c 25 Aug 2009 17:56:11 -0000 1.127.4.1
@@ -15,9 +15,11 @@
#include <assert.h>
#include <sys/uio.h>
#include <time.h>
+#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
+#include "pint-mem.h"
#include "pvfs2-config.h"
#ifdef HAVE_NETDB_H
@@ -38,21 +40,18 @@
#include "bmi-byteswap.h"
#include "id-generator.h"
#include "pint-event.h"
+#include "pvfs2-debug.h"
#ifdef USE_TRUSTED
#include "server-config.h"
#include "bmi-tcp-addressing.h"
#endif
#include "gen-locks.h"
-
-#define BMI_EVENT_START(__op, __id) \
- PINT_event_timestamp(PVFS_EVENT_API_BMI, __op, 0, __id, \
- PVFS_EVENT_FLAG_START)
-
-#define BMI_EVENT_END(__op, __size, __id) \
- PINT_event_timestamp(PVFS_EVENT_API_BMI, __op, __size, __id, \
- PVFS_EVENT_FLAG_END)
+#include "pint-hint.h"
+#include "pint-event.h"
static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
+static gen_cond_t interface_cond = GEN_COND_INITIALIZER;
+static int sc_test_busy = 0;
/* function prototypes */
int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
@@ -76,7 +75,8 @@ int BMI_tcp_post_send(bmi_op_id_t * id,
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id);
+ bmi_context_id context_id,
+ PVFS_hint hints);
int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
bmi_method_addr_p dest,
const void *buffer,
@@ -84,7 +84,8 @@ int BMI_tcp_post_sendunexpected(bmi_op_i
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id);
+ bmi_context_id context_id,
+ PVFS_hint hints);
int BMI_tcp_post_recv(bmi_op_id_t * id,
bmi_method_addr_p src,
void *buffer,
@@ -93,7 +94,8 @@ int BMI_tcp_post_recv(bmi_op_id_t * id,
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id);
+ bmi_context_id context_id,
+ PVFS_hint hints);
int BMI_tcp_test(bmi_op_id_t id,
int *outcount,
bmi_error_code_t * error_code,
@@ -134,7 +136,8 @@ int BMI_tcp_post_send_list(bmi_op_id_t *
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id);
+ bmi_context_id context_id,
+ PVFS_hint hints);
int BMI_tcp_post_recv_list(bmi_op_id_t * id,
bmi_method_addr_p src,
void *const *buffer_list,
@@ -145,7 +148,8 @@ int BMI_tcp_post_recv_list(bmi_op_id_t *
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id);
+ bmi_context_id context_id,
+ PVFS_hint hints);
int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
bmi_method_addr_p dest,
const void *const *buffer_list,
@@ -155,7 +159,8 @@ int BMI_tcp_post_sendunexpected_list(bmi
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id);
+ bmi_context_id context_id,
+ PVFS_hint hints);
int BMI_tcp_open_context(bmi_context_id context_id);
void BMI_tcp_close_context(bmi_context_id context_id);
int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id);
@@ -237,7 +242,8 @@ static int enqueue_operation(op_list_p t
void *user_ptr,
bmi_size_t actual_size,
bmi_size_t expected_size,
- bmi_context_id context_id);
+ bmi_context_id context_id,
+ int32_t event_id);
static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code);
static int tcp_shutdown_addr(bmi_method_addr_p map);
static int tcp_do_work(int max_idle_time);
@@ -252,26 +258,28 @@ static int tcp_accept_init(int *socket,
static method_op_p alloc_tcp_method_op(void);
static void dealloc_tcp_method_op(method_op_p old_op);
static int handle_new_connection(bmi_method_addr_p map);
-static int BMI_tcp_post_send_generic(bmi_op_id_t * id,
- bmi_method_addr_p dest,
- const void *const *buffer_list,
- const bmi_size_t *size_list,
- int list_count,
- enum bmi_buffer_type buffer_type,
- struct tcp_msg_header my_header,
- void *user_ptr,
- bmi_context_id context_id);
+static int tcp_post_send_generic(bmi_op_id_t * id,
+ bmi_method_addr_p dest,
+ const void *const *buffer_list,
+ const bmi_size_t *size_list,
+ int list_count,
+ enum bmi_buffer_type buffer_type,
+ struct tcp_msg_header my_header,
+ void *user_ptr,
+ bmi_context_id context_id,
+ PVFS_hint hints);
static int tcp_post_recv_generic(bmi_op_id_t * id,
- bmi_method_addr_p src,
- void *const *buffer_list,
- const bmi_size_t *size_list,
- int list_count,
- bmi_size_t expected_size,
- bmi_size_t * actual_size,
- enum bmi_buffer_type buffer_type,
- bmi_msg_tag_t tag,
- void *user_ptr,
- bmi_context_id context_id);
+ bmi_method_addr_p src,
+ void *const *buffer_list,
+ const bmi_size_t *size_list,
+ int list_count,
+ bmi_size_t expected_size,
+ bmi_size_t * actual_size,
+ enum bmi_buffer_type buffer_type,
+ bmi_msg_tag_t tag,
+ void *user_ptr,
+ bmi_context_id context_id,
+ PVFS_hint hints);
static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
size_list, int list_count, bmi_size_t total_size, int* list_index,
bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
@@ -326,6 +334,8 @@ static struct
static struct tcp_allowed_connection_s *gtcp_allowed_connection = NULL;
#endif
+static int check_unexpected = 1;
+
/* op_list_array indices */
enum
{
@@ -388,6 +398,11 @@ static int forceful_cancel_mode = 0;
static int tcp_buffer_size_receive = 0;
static int tcp_buffer_size_send = 0;
+static PINT_event_type bmi_tcp_send_event_id;
+static PINT_event_type bmi_tcp_recv_event_id;
+
+static PINT_event_group bmi_tcp_event_group;
+static pid_t bmi_tcp_pid;
/*************************************************************************
* Visible Interface
@@ -401,8 +416,8 @@ static int tcp_buffer_size_send = 0;
* returns 0 on success, -errno on failure
*/
int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
- int method_id,
- int init_flags)
+ int method_id,
+ int init_flags)
{
int ret = -1;
@@ -415,8 +430,8 @@ int BMI_tcp_initialize(bmi_method_addr_p
/* check args */
if ((init_flags & BMI_INIT_SERVER) && !listen_addr)
{
- gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
- return (bmi_tcp_errno_to_pvfs(-EINVAL));
+ gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
+ return (bmi_tcp_errno_to_pvfs(-EINVAL));
}
gen_mutex_lock(&interface_mutex);
@@ -428,46 +443,77 @@ int BMI_tcp_initialize(bmi_method_addr_p
if (init_flags & BMI_INIT_SERVER)
{
- /* hang on to our local listening address if needed */
- tcp_method_params.listen_addr = listen_addr;
- /* and initialize server functions */
- ret = tcp_server_init();
- if (ret < 0)
- {
- tmp_errno = bmi_tcp_errno_to_pvfs(ret);
- gossip_err("Error: tcp_server_init() failure.\n");
- goto initialize_failure;
- }
+ /* hang on to our local listening address if needed */
+ tcp_method_params.listen_addr = listen_addr;
+ /* and initialize server functions */
+ ret = tcp_server_init();
+ if (ret < 0)
+ {
+ tmp_errno = bmi_tcp_errno_to_pvfs(ret);
+ gossip_err("Error: tcp_server_init() failure.\n");
+ goto initialize_failure;
+ }
}
/* set up the operation lists */
for (i = 0; i < NUM_INDICES; i++)
{
- op_list_array[i] = op_list_new();
- if (!op_list_array[i])
- {
- tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
- goto initialize_failure;
- }
+ op_list_array[i] = op_list_new();
+ if (!op_list_array[i])
+ {
+ tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
+ goto initialize_failure;
+ }
}
/* set up the socket collection */
if (tcp_method_params.method_flags & BMI_INIT_SERVER)
{
- tcp_addr_data = tcp_method_params.listen_addr->method_data;
- tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
+ tcp_addr_data = tcp_method_params.listen_addr->method_data;
+ tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
}
else
{
- tcp_socket_collection_p = BMI_socket_collection_init(-1);
+ tcp_socket_collection_p = BMI_socket_collection_init(-1);
}
if (!tcp_socket_collection_p)
{
- tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
- goto initialize_failure;
+ tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
+ goto initialize_failure;
}
+ bmi_tcp_pid = getpid();
+ PINT_event_define_group("bmi_tcp", &bmi_tcp_event_group);
+
+ /* Define the send event:
+ * START: (client_id, request_id, rank, handle, op_id, send_size)
+ * STOP: (size_sent)
+ */
+ PINT_event_define_event(
+ &bmi_tcp_event_group,
+#ifdef __PVFS2_SERVER__
+ "bmi_server_send",
+#else
+ "bmi_client_send",
+#endif
+ "%d%d%d%llu%d%d",
+ "%d", &bmi_tcp_send_event_id);
+
+ /* Define the recv event:
+ * START: (client_id, request_id, rank, handle, op_id, recv_size)
+ * STOP: (size_received)
+ */
+ PINT_event_define_event(
+ &bmi_tcp_event_group,
+#ifdef __PVFS2_SERVER__
+ "bmi_server_recv",
+#else
+ "bmi_client_recv",
+#endif
+ "%d%d%d%llu%d%d",
+ "%d", &bmi_tcp_recv_event_id);
+
gen_mutex_unlock(&interface_mutex);
gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
"TCP/IP module successfully initialized.\n");
@@ -478,14 +524,14 @@ int BMI_tcp_initialize(bmi_method_addr_p
/* cleanup data structures and bail out */
for (i = 0; i < NUM_INDICES; i++)
{
- if (op_list_array[i])
- {
- op_list_cleanup(op_list_array[i]);
- }
+ if (op_list_array[i])
+ {
+ op_list_cleanup(op_list_array[i]);
+ }
}
if (tcp_socket_collection_p)
{
- BMI_socket_collection_finalize(tcp_socket_collection_p);
+ BMI_socket_collection_finalize(tcp_socket_collection_p);
}
gen_mutex_unlock(&interface_mutex);
return (tmp_errno);
@@ -506,26 +552,26 @@ int BMI_tcp_finalize(void)
/* shut down our listen addr, if we have one */
if ((tcp_method_params.method_flags & BMI_INIT_SERVER)
- && tcp_method_params.listen_addr)
+ && tcp_method_params.listen_addr)
{
- dealloc_tcp_method_addr(tcp_method_params.listen_addr);
+ dealloc_tcp_method_addr(tcp_method_params.listen_addr);
}
/* note that this forcefully shuts down operations */
for (i = 0; i < NUM_INDICES; i++)
{
- if (op_list_array[i])
- {
- op_list_cleanup(op_list_array[i]);
- op_list_array[i] = NULL;
- }
+ if (op_list_array[i])
+ {
+ op_list_cleanup(op_list_array[i]);
+ op_list_array[i] = NULL;
+ }
}
/* get rid of socket collection */
if (tcp_socket_collection_p)
{
- BMI_socket_collection_finalize(tcp_socket_collection_p);
- tcp_socket_collection_p = NULL;
+ BMI_socket_collection_finalize(tcp_socket_collection_p);
+ tcp_socket_collection_p = NULL;
}
/* NOTE: we are trusting the calling BMI layer to deallocate
@@ -618,7 +664,8 @@ void *BMI_tcp_memalloc(bmi_size_t size,
* preferences about how the memory should be configured.
*/
- return (calloc(1,(size_t) size));
+/* return (calloc(1,(size_t) size)); */
+ return PINT_mem_aligned_alloc(size, 4096);
}
@@ -632,16 +679,7 @@ int BMI_tcp_memfree(void *buffer,
bmi_size_t size,
enum bmi_op_type send_recv)
{
- /* NOTE: I am not going to bother to check to see if it is really our
- * buffer. This function trusts the caller.
- * We also could care less whether it was a send or recv buffer.
- */
- if (buffer)
- {
- free(buffer);
- buffer = NULL;
- }
-
+ PINT_mem_aligned_free(buffer);
return (0);
}
@@ -876,6 +914,13 @@ int BMI_tcp_set_info(int option,
break;
}
#endif
+ case BMI_TCP_CHECK_UNEXPECTED:
+ {
+ check_unexpected = *(int *)inout_parameter;
+ ret = 0;
+ break;
+ }
+
default:
gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
"TCP hint %d not implemented.\n", option);
@@ -956,7 +1001,8 @@ int BMI_tcp_post_send(bmi_op_id_t * id,
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id)
+ bmi_context_id context_id,
+ PVFS_hint hints)
{
struct tcp_msg_header my_header;
int ret = -1;
@@ -984,13 +1030,9 @@ int BMI_tcp_post_send(bmi_op_id_t * id,
gen_mutex_lock(&interface_mutex);
- ret = BMI_tcp_post_send_generic(id, dest, &buffer,
- &size, 1, buffer_type, my_header,
- user_ptr, context_id);
- if(ret >= 0)
- BMI_EVENT_START(PVFS_EVENT_BMI_SEND, *id);
- if(ret == 1)
- BMI_EVENT_END(PVFS_EVENT_BMI_SEND, size, *id);
+ ret = tcp_post_send_generic(id, dest, &buffer,
+ &size, 1, buffer_type, my_header,
+ user_ptr, context_id, hints);
gen_mutex_unlock(&interface_mutex);
return(ret);
@@ -1011,7 +1053,8 @@ int BMI_tcp_post_sendunexpected(bmi_op_i
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id)
+ bmi_context_id context_id,
+ PVFS_hint hints)
{
struct tcp_msg_header my_header;
int ret = -1;
@@ -1031,14 +1074,9 @@ int BMI_tcp_post_sendunexpected(bmi_op_i
gen_mutex_lock(&interface_mutex);
- ret = BMI_tcp_post_send_generic(id, dest, &buffer,
- &size, 1, buffer_type, my_header,
- user_ptr, context_id);
- if(ret >= 0)
- BMI_EVENT_START(PVFS_EVENT_BMI_SEND, *id);
- if(ret == 1)
- BMI_EVENT_END(PVFS_EVENT_BMI_SEND, size, *id);
-
+ ret = tcp_post_send_generic(id, dest, &buffer,
+ &size, 1, buffer_type, my_header,
+ user_ptr, context_id, hints);
gen_mutex_unlock(&interface_mutex);
return(ret);
}
@@ -1060,7 +1098,8 @@ int BMI_tcp_post_recv(bmi_op_id_t * id,
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id)
+ bmi_context_id context_id,
+ PVFS_hint hints)
{
int ret = -1;
@@ -1082,14 +1121,9 @@ int BMI_tcp_post_recv(bmi_op_id_t * id,
gen_mutex_lock(&interface_mutex);
ret = tcp_post_recv_generic(id, src, &buffer, &expected_size,
- 1, expected_size, actual_size,
- buffer_type, tag,
- user_ptr, context_id);
-
- if(ret >= 0)
- BMI_EVENT_START(PVFS_EVENT_BMI_RECV, *id);
- if(ret == 1)
- BMI_EVENT_END(PVFS_EVENT_BMI_RECV, *actual_size, *id);
+ 1, expected_size, actual_size,
+ buffer_type, tag,
+ user_ptr, context_id, hints);
gen_mutex_unlock(&interface_mutex);
return (ret);
@@ -1136,10 +1170,11 @@ int BMI_tcp_test(bmi_op_id_t id,
}
(*error_code) = query_op->error_code;
(*actual_size) = query_op->actual_size;
- if(query_op->send_recv == BMI_SEND)
- BMI_EVENT_END(PVFS_EVENT_BMI_SEND, *actual_size, id);
- else
- BMI_EVENT_END(PVFS_EVENT_BMI_RECV, *actual_size, id);
+ PINT_EVENT_END(
+ (query_op->send_recv == BMI_SEND ?
+ bmi_tcp_send_event_id : bmi_tcp_recv_event_id), bmi_tcp_pid, NULL,
+ query_op->event_id, id, *actual_size);
+
dealloc_tcp_method_op(query_op);
(*outcount)++;
}
@@ -1155,14 +1190,14 @@ int BMI_tcp_test(bmi_op_id_t id,
* returns 0 on success, -errno on failure
*/
int BMI_tcp_testsome(int incount,
- bmi_op_id_t * id_array,
- int *outcount,
- int *index_array,
- bmi_error_code_t * error_code_array,
- bmi_size_t * actual_size_array,
- void **user_ptr_array,
- int max_idle_time,
- bmi_context_id context_id)
+ bmi_op_id_t * id_array,
+ int *outcount,
+ int *index_array,
+ bmi_error_code_t * error_code_array,
+ bmi_size_t * actual_size_array,
+ void **user_ptr_array,
+ int max_idle_time,
+ bmi_context_id context_id)
{
int ret = -1;
method_op_p query_op = NULL;
@@ -1174,39 +1209,40 @@ int BMI_tcp_testsome(int incount,
ret = tcp_do_work(max_idle_time);
if (ret < 0)
{
- gen_mutex_unlock(&interface_mutex);
- return (ret);
+ gen_mutex_unlock(&interface_mutex);
+ return (ret);
}
for(i=0; i<incount; i++)
{
- if(id_array[i])
- {
- /* NOTE: this depends on the user passing in valid id's;
- * otherwise we segfault.
- */
- query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
- if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
- BMI_TCP_COMPLETE)
- {
- assert(query_op->context_id == context_id);
- /* this one's done; pop it out */
- op_list_remove(query_op);
- error_code_array[*outcount] = query_op->error_code;
- actual_size_array[*outcount] = query_op->actual_size;
- index_array[*outcount] = i;
- if (user_ptr_array != NULL)
- {
- user_ptr_array[*outcount] = query_op->user_ptr;
- }
- if(query_op->send_recv == BMI_SEND)
- BMI_EVENT_END(PVFS_EVENT_BMI_SEND, query_op->actual_size, id_array[i]);
- else
- BMI_EVENT_END(PVFS_EVENT_BMI_RECV, query_op->actual_size, id_array[i]);
- dealloc_tcp_method_op(query_op);
- (*outcount)++;
- }
- }
+ if(id_array[i])
+ {
+ /* NOTE: this depends on the user passing in valid id's;
+ * otherwise we segfault.
+ */
+ query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
+ if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
+ BMI_TCP_COMPLETE)
+ {
+ assert(query_op->context_id == context_id);
+ /* this one's done; pop it out */
+ op_list_remove(query_op);
+ error_code_array[*outcount] = query_op->error_code;
+ actual_size_array[*outcount] = query_op->actual_size;
+ index_array[*outcount] = i;
+ if (user_ptr_array != NULL)
+ {
+ user_ptr_array[*outcount] = query_op->user_ptr;
+ }
+ PINT_EVENT_END(
+ (query_op->send_recv == BMI_SEND ?
+ bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
+ bmi_tcp_pid, NULL,
+ query_op->event_id, actual_size_array[*outcount]);
+ dealloc_tcp_method_op(query_op);
+ (*outcount)++;
+ }
+ }
}
gen_mutex_unlock(&interface_mutex);
@@ -1292,7 +1328,8 @@ int BMI_tcp_testcontext(int incount,
* that the next testunexpected call can pick it up without
* delay
*/
- if(!op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
+ if(check_unexpected &&
+ !op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
{
gen_mutex_unlock(&interface_mutex);
return(0);
@@ -1308,29 +1345,31 @@ int BMI_tcp_testcontext(int incount,
}
/* pop as many items off of the completion queue as we can */
- while((*outcount < incount) && (query_op =
- op_list_shownext(completion_array[context_id])))
+ while((*outcount < incount) &&
+ (query_op =
+ op_list_shownext(completion_array[context_id])))
{
assert(query_op);
- assert(query_op->context_id == context_id);
+ assert(query_op->context_id == context_id);
- /* this one's done; pop it out */
- op_list_remove(query_op);
- error_code_array[*outcount] = query_op->error_code;
- actual_size_array[*outcount] = query_op->actual_size;
- out_id_array[*outcount] = query_op->op_id;
- if (user_ptr_array != NULL)
- {
- user_ptr_array[*outcount] = query_op->user_ptr;
- }
- if(query_op->send_recv == BMI_SEND)
- BMI_EVENT_END(PVFS_EVENT_BMI_SEND, query_op->actual_size, query_op->op_id);
- else
- BMI_EVENT_END(PVFS_EVENT_BMI_RECV, query_op->actual_size, query_op->op_id);
+ /* this one's done; pop it out */
+ op_list_remove(query_op);
+ error_code_array[*outcount] = query_op->error_code;
+ actual_size_array[*outcount] = query_op->actual_size;
+ out_id_array[*outcount] = query_op->op_id;
+ if (user_ptr_array != NULL)
+ {
+ user_ptr_array[*outcount] = query_op->user_ptr;
+ }
- dealloc_tcp_method_op(query_op);
+ PINT_EVENT_END((query_op->send_recv == BMI_SEND ?
+ bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
+ bmi_tcp_pid, NULL, query_op->event_id,
+ query_op->actual_size);
+
+ dealloc_tcp_method_op(query_op);
query_op = NULL;
- (*outcount)++;
+ (*outcount)++;
}
gen_mutex_unlock(&interface_mutex);
@@ -1356,7 +1395,8 @@ int BMI_tcp_post_send_list(bmi_op_id_t *
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id)
+ bmi_context_id context_id,
+ PVFS_hint hints)
{
struct tcp_msg_header my_header;
int ret = -1;
@@ -1385,14 +1425,9 @@ int BMI_tcp_post_send_list(bmi_op_id_t *
gen_mutex_lock(&interface_mutex);
- ret = BMI_tcp_post_send_generic(id, dest, buffer_list,
- size_list, list_count, buffer_type,
- my_header, user_ptr, context_id);
- if(ret >= 0)
- BMI_EVENT_START(PVFS_EVENT_BMI_SEND, *id);
- if(ret == 1)
- BMI_EVENT_END(PVFS_EVENT_BMI_SEND, total_size, *id);
-
+ ret = tcp_post_send_generic(id, dest, buffer_list,
+ size_list, list_count, buffer_type,
+ my_header, user_ptr, context_id, hints);
gen_mutex_unlock(&interface_mutex);
return(ret);
}
@@ -1415,7 +1450,8 @@ int BMI_tcp_post_recv_list(bmi_op_id_t *
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id)
+ bmi_context_id context_id,
+ PVFS_hint hints)
{
int ret = -1;
@@ -1427,14 +1463,9 @@ int BMI_tcp_post_recv_list(bmi_op_id_t *
gen_mutex_lock(&interface_mutex);
ret = tcp_post_recv_generic(id, src, buffer_list, size_list,
- list_count, total_expected_size,
- total_actual_size, buffer_type, tag, user_ptr,
- context_id);
-
- if(ret >= 0)
- BMI_EVENT_START(PVFS_EVENT_BMI_RECV, *id);
- if(ret == 1)
- BMI_EVENT_END(PVFS_EVENT_BMI_RECV, *total_actual_size, *id);
+ list_count, total_expected_size,
+ total_actual_size, buffer_type, tag, user_ptr,
+ context_id, hints);
gen_mutex_unlock(&interface_mutex);
return (ret);
@@ -1458,7 +1489,8 @@ int BMI_tcp_post_sendunexpected_list(bmi
enum bmi_buffer_type buffer_type,
bmi_msg_tag_t tag,
void *user_ptr,
- bmi_context_id context_id)
+ bmi_context_id context_id,
+ PVFS_hint hints)
{
struct tcp_msg_header my_header;
int ret = -1;
@@ -1478,13 +1510,9 @@ int BMI_tcp_post_sendunexpected_list(bmi
gen_mutex_lock(&interface_mutex);
- ret = BMI_tcp_post_send_generic(id, dest, buffer_list,
- size_list, list_count, buffer_type,
- my_header, user_ptr, context_id);
- if(ret >= 0)
- BMI_EVENT_START(PVFS_EVENT_BMI_SEND, *id);
- if(ret == 1)
- BMI_EVENT_END(PVFS_EVENT_BMI_SEND, total_size, *id);
+ ret = tcp_post_send_generic(id, dest, buffer_list,
+ size_list, list_count, buffer_type,
+ my_header, user_ptr, context_id, hints);
gen_mutex_unlock(&interface_mutex);
return(ret);
@@ -1742,12 +1770,12 @@ int BMI_tcp_query_addr_range(bmi_method_
/* Invalid network address */
if (inet_aton(tcp_wildcard, &network_addr.sin_addr) == 0)
{
- gossip_lerr("Invalid network specification: %s\n", tcp_wildcard);
+ gossip_err("Invalid network specification: %s\n", tcp_wildcard);
return -EINVAL;
}
/* Matches the subnet mask! */
if ((map_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr)
- == network_addr.sin_addr.s_addr)
+ == (network_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr))
{
return 1;
}
@@ -1847,7 +1875,7 @@ void tcp_forget_addr(bmi_method_addr_p m
int error_code)
{
struct tcp_addr* tcp_addr_data = map->method_data;
- PVFS_BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
+ BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
int tmp_outcount;
bmi_method_addr_p tmp_addr;
int tmp_status;
@@ -1858,8 +1886,11 @@ void tcp_forget_addr(bmi_method_addr_p m
/* perform a test to force the socket collection to act on the remove
* request before continuing
*/
- BMI_socket_collection_testglobal(tcp_socket_collection_p,
- 0, &tmp_outcount, &tmp_addr, &tmp_status, 0, &interface_mutex);
+ if(!sc_test_busy)
+ {
+ BMI_socket_collection_testglobal(tcp_socket_collection_p,
+ 0, &tmp_outcount, &tmp_addr, &tmp_status, 0);
+ }
}
tcp_shutdown_addr(map);
@@ -2223,7 +2254,8 @@ static int enqueue_operation(op_list_p t
void *user_ptr,
bmi_size_t actual_size,
bmi_size_t expected_size,
- bmi_context_id context_id)
+ bmi_context_id context_id,
+ int32_t eid)
{
method_op_p new_method_op = NULL;
struct tcp_op *tcp_op_data = NULL;
@@ -2238,6 +2270,7 @@ static int enqueue_operation(op_list_p t
}
*id = new_method_op->op_id;
+ new_method_op->event_id = eid;
/* set the fields */
new_method_op->send_recv = send_recv;
@@ -2354,16 +2387,17 @@ static int enqueue_operation(op_list_p t
* completion, -errno on failure
*/
static int tcp_post_recv_generic(bmi_op_id_t * id,
- bmi_method_addr_p src,
- void *const *buffer_list,
- const bmi_size_t *size_list,
- int list_count,
- bmi_size_t expected_size,
- bmi_size_t * actual_size,
- enum bmi_buffer_type buffer_type,
- bmi_msg_tag_t tag,
- void *user_ptr,
- bmi_context_id context_id)
+ bmi_method_addr_p src,
+ void *const *buffer_list,
+ const bmi_size_t *size_list,
+ int list_count,
+ bmi_size_t expected_size,
+ bmi_size_t * actual_size,
+ enum bmi_buffer_type buffer_type,
+ bmi_msg_tag_t tag,
+ void *user_ptr,
+ bmi_context_id context_id,
+ PVFS_hint hints)
{
method_op_p query_op = NULL;
int ret = -1;
@@ -2374,6 +2408,16 @@ static int tcp_post_recv_generic(bmi_op_
bmi_size_t copy_size = 0;
bmi_size_t total_copied = 0;
int i;
+ PINT_event_id eid = 0;
+
+ PINT_EVENT_START(
+ bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, &eid,
+ PINT_HINT_GET_CLIENT_ID(hints),
+ PINT_HINT_GET_REQUEST_ID(hints),
+ PINT_HINT_GET_RANK(hints),
+ PINT_HINT_GET_HANDLE(hints),
+ PINT_HINT_GET_OP_ID(hints),
+ expected_size);
tcp_addr_data = src->method_data;
@@ -2382,9 +2426,11 @@ static int tcp_post_recv_generic(bmi_op_
*/
if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
{
- gossip_debug(GOSSIP_BMI_DEBUG_TCP,
- "Warning: BMI communication attempted on an address in failure mode.\n");
- return(tcp_addr_data->addr_error);
+ gossip_debug(
+ GOSSIP_BMI_DEBUG_TCP,
+ "Warning: BMI communication attempted "
+ "on an address in failure mode.\n");
+ return(tcp_addr_data->addr_error);
}
/* lets make sure that the message hasn't already been fully
@@ -2397,163 +2443,170 @@ static int tcp_post_recv_generic(bmi_op_
key.msg_tag_yes = 1;
query_op =
- op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
+ op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
if (query_op)
{
- /* make sure it isn't too big */
- if (query_op->actual_size > expected_size)
- {
- gossip_err("Error: message ordering violation;\n");
- gossip_err("Error: message too large for next buffer.\n");
- return (bmi_tcp_errno_to_pvfs(-EPROTO));
- }
+ /* make sure it isn't too big */
+ if (query_op->actual_size > expected_size)
+ {
+ gossip_err("Error: message ordering violation;\n");
+ gossip_err("Error: message too large for next buffer.\n");
+ return (bmi_tcp_errno_to_pvfs(-EPROTO));
+ }
- /* whoohoo- it is already done! */
- /* copy buffer out to list segments; handle short case */
- for (i = 0; i < list_count; i++)
- {
- copy_size = size_list[i];
- if (copy_size + total_copied > query_op->actual_size)
- {
- copy_size = query_op->actual_size - total_copied;
- }
- memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
- total_copied), copy_size);
- total_copied += copy_size;
- if (total_copied == query_op->actual_size)
- {
- break;
- }
- }
- /* copy out to correct memory regions */
- (*actual_size) = query_op->actual_size;
- free(query_op->buffer);
- *id = 0;
- op_list_remove(query_op);
- dealloc_tcp_method_op(query_op);
- return (1);
+ /* whoohoo- it is already done! */
+ /* copy buffer out to list segments; handle short case */
+ for (i = 0; i < list_count; i++)
+ {
+ copy_size = size_list[i];
+ if (copy_size + total_copied > query_op->actual_size)
+ {
+ copy_size = query_op->actual_size - total_copied;
+ }
+ memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
+ total_copied), copy_size);
+ total_copied += copy_size;
+ if (total_copied == query_op->actual_size)
+ {
+ break;
+ }
+ }
+ /* copy out to correct memory regions */
+ (*actual_size) = query_op->actual_size;
+ free(query_op->buffer);
+ *id = 0;
+ op_list_remove(query_op);
+ dealloc_tcp_method_op(query_op);
+ PINT_EVENT_END(bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid, 0,
+ *actual_size);
+
+ return (1);
}
/* look for a message that is already being received */
query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
if (query_op)
{
- tcp_op_data = query_op->method_data;
+ tcp_op_data = query_op->method_data;
}
/* see if it is being buffered into a temporary memory region */
if (query_op && tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
{
- /* make sure it isn't too big */
- if (query_op->actual_size > expected_size)
- {
- gossip_err("Error: message ordering violation;\n");
- gossip_err("Error: message too large for next buffer.\n");
- return (bmi_tcp_errno_to_pvfs(-EPROTO));
- }
-
- /* copy what we have so far into the correct buffers */
- total_copied = 0;
- for (i = 0; i < list_count; i++)
- {
- copy_size = size_list[i];
- if (copy_size + total_copied > query_op->amt_complete)
- {
- copy_size = query_op->amt_complete - total_copied;
- }
- if (copy_size > 0)
- {
- memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
- total_copied), copy_size);
- }
- total_copied += copy_size;
- if (total_copied == query_op->amt_complete)
- {
- query_op->list_index = i;
- query_op->cur_index_complete = copy_size;
- break;
- }
- }
+ /* make sure it isn't too big */
+ if (query_op->actual_size > expected_size)
+ {
+ gossip_err("Error: message ordering violation;\n");
+ gossip_err("Error: message too large for next buffer.\n");
+ return (bmi_tcp_errno_to_pvfs(-EPROTO));
+ }
- /* see if we ended on a buffer boundary */
- if (query_op->cur_index_complete ==
- query_op->size_list[query_op->list_index])
- {
- query_op->list_index++;
- query_op->cur_index_complete = 0;
- }
+ /* copy what we have so far into the correct buffers */
+ total_copied = 0;
+ for (i = 0; i < list_count; i++)
+ {
+ copy_size = size_list[i];
+ if (copy_size + total_copied > query_op->amt_complete)
+ {
+ copy_size = query_op->amt_complete - total_copied;
+ }
+ if (copy_size > 0)
+ {
+ memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
+ total_copied), copy_size);
+ }
+ total_copied += copy_size;
+ if (total_copied == query_op->amt_complete)
+ {
+ query_op->list_index = i;
+ query_op->cur_index_complete = copy_size;
+ break;
+ }
+ }
- /* release the old buffer */
- if (query_op->buffer)
- {
- free(query_op->buffer);
- }
+ /* see if we ended on a buffer boundary */
+ if (query_op->cur_index_complete ==
+ query_op->size_list[query_op->list_index])
+ {
+ query_op->list_index++;
+ query_op->cur_index_complete = 0;
+ }
- *id = query_op->op_id;
- tcp_op_data = query_op->method_data;
- tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
+ /* release the old buffer */
+ if (query_op->buffer)
+ {
+ free(query_op->buffer);
+ }
- query_op->list_count = list_count;
- query_op->user_ptr = user_ptr;
- query_op->context_id = context_id;
- /* if there is only one item in the list, then keep the list stored
- * in the op structure. This allows us to use the same code for send
- * and recv as we use for send_list and recv_list, without having to
- * malloc lists for those special cases
- */
- if (list_count == 1)
- {
- query_op->buffer_list = &tcp_op_data->buffer_list_stub;
- query_op->size_list = &tcp_op_data->size_list_stub;
- ((void **)query_op->buffer_list)[0] = buffer_list[0];
- ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
- }
- else
- {
- query_op->buffer_list = buffer_list;
- query_op->size_list = size_list;
- }
+ *id = query_op->op_id;
+ tcp_op_data = query_op->method_data;
+ tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
+
+ query_op->list_count = list_count;
+ query_op->user_ptr = user_ptr;
+ query_op->context_id = context_id;
+ /* if there is only one item in the list, then keep the list stored
+ * in the op structure. This allows us to use the same code for send
+ * and recv as we use for send_list and recv_list, without having to
+ * malloc lists for those special cases
+ */
+ if (list_count == 1)
+ {
+ query_op->buffer_list = &tcp_op_data->buffer_list_stub;
+ query_op->size_list = &tcp_op_data->size_list_stub;
+ ((void **)query_op->buffer_list)[0] = buffer_list[0];
+ ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
+ }
+ else
+ {
+ query_op->buffer_list = buffer_list;
+ query_op->size_list = size_list;
+ }
- if (query_op->amt_complete < query_op->actual_size)
- {
- /* try to recv some more data */
- tcp_addr_data = query_op->addr->method_data;
- ret = payload_progress(tcp_addr_data->socket,
- query_op->buffer_list,
- query_op->size_list,
- query_op->list_count,
- query_op->actual_size,
- &(query_op->list_index),
- &(query_op->cur_index_complete),
- BMI_RECV,
- NULL,
- 0);
- if (ret < 0)
- {
+ if (query_op->amt_complete < query_op->actual_size)
+ {
+ /* try to recv some more data */
+ tcp_addr_data = query_op->addr->method_data;
+ ret = payload_progress(tcp_addr_data->socket,
+ query_op->buffer_list,
+ query_op->size_list,
+ query_op->list_count,
+ query_op->actual_size,
+ &(query_op->list_index),
+ &(query_op->cur_index_complete),
+ BMI_RECV,
+ NULL,
+ 0);
+ if (ret < 0)
+ {
PVFS_perror_gossip("Error: payload_progress", ret);
/* payload_progress() returns BMI error codes */
- tcp_forget_addr(query_op->addr, 0, ret);
- return (ret);
- }
+ tcp_forget_addr(query_op->addr, 0, ret);
+ return (ret);
+ }
- query_op->amt_complete += ret;
- }
- assert(query_op->amt_complete <= query_op->actual_size);
- if (query_op->amt_complete == query_op->actual_size)
- {
- /* we are done */
- op_list_remove(query_op);
- *id = 0;
- (*actual_size) = query_op->actual_size;
- dealloc_tcp_method_op(query_op);
- return (1);
- }
- else
- {
- /* there is still more work to do */
- tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
- return (0);
- }
+ query_op->amt_complete += ret;
+ }
+ assert(query_op->amt_complete <= query_op->actual_size);
+ if (query_op->amt_complete == query_op->actual_size)
+ {
+ /* we are done */
+ op_list_remove(query_op);
+ *id = 0;
+ (*actual_size) = query_op->actual_size;
+ dealloc_tcp_method_op(query_op);
+ PINT_EVENT_END(
+ bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid,
+ 0, *actual_size);
+
+ return (1);
+ }
+ else
+ {
+ /* there is still more work to do */
+ tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
+ return (0);
+ }
}
/* NOTE: if the message was in flight, but not buffering, then
@@ -2566,18 +2619,18 @@ static int tcp_post_recv_generic(bmi_op_
/* if we hit this point we must enqueue */
if (expected_size <= TCP_MODE_EAGER_LIMIT)
{
- bogus_header.mode = TCP_MODE_EAGER;
+ bogus_header.mode = TCP_MODE_EAGER;
}
else
{
- bogus_header.mode = TCP_MODE_REND;
+ bogus_header.mode = TCP_MODE_REND;
}
bogus_header.tag = tag;
ret = enqueue_operation(op_list_array[IND_RECV],
- BMI_RECV, src, buffer_list, size_list,
- list_count, 0, 0, id, BMI_TCP_INPROGRESS,
- bogus_header, user_ptr, 0,
- expected_size, context_id);
+ BMI_RECV, src, buffer_list, size_list,
+ list_count, 0, 0, id, BMI_TCP_INPROGRESS,
+ bogus_header, user_ptr, 0,
+ expected_size, context_id, eid);
/* just for safety; this field isn't valid to the caller anymore */
(*actual_size) = 0;
/* TODO: figure out why this causes deadlocks; observable in 2
@@ -2588,11 +2641,11 @@ static int tcp_post_recv_generic(bmi_op_
#if 0
if (ret >= 0)
{
- /* go ahead and try to do some work while we are in this
- * function since we appear to be backlogged. Make sure that
- * we do not wait in the poll, however.
- */
- ret = tcp_do_work(0);
+ /* go ahead and try to do some work while we are in this
+ * function since we appear to be backlogged. Make sure that
+ * we do not wait in the poll, however.
+ */
+ ret = tcp_do_work(0);
}
#endif
return (ret);
@@ -2688,17 +2741,56 @@ static int tcp_do_work(int max_idle_time
int busy_flag = 1;
struct timespec req;
struct tcp_addr* tcp_addr_data = NULL;
+ struct timespec wait_time;
+ struct timeval start;
- /* now we need to poll and see what to work on */
- /* drop mutex while we make this call */
+ if(sc_test_busy)
+ {
+ /* another thread is already polling or working on sockets */
+ if(max_idle_time == 0)
+ {
+ /* we don't want to spend time waiting on it; return
+ * immediately.
+ */
+ return(0);
+ }
+
+ /* Sleep until the working thread signals that it has finished
+ * its work and then return. No need for this thread to poll;
+ * the other thread may have already finished what we wanted.
+ * This condition wait is used strictly as a best effort to
+ * prevent busy spin. We'll sort out the results later.
+ */
+ gettimeofday(&start, NULL);
+ wait_time.tv_sec = start.tv_sec + max_idle_time / 1000;
+ wait_time.tv_nsec = (start.tv_usec + ((max_idle_time % 1000)*1000))*1000;
+ if (wait_time.tv_nsec >= 1000000000) /* tv_nsec must stay < 1e9 */
+ {
+ wait_time.tv_nsec = wait_time.tv_nsec - 1000000000;
+ wait_time.tv_sec++;
+ }
+ gen_cond_timedwait(&interface_cond, &interface_mutex, &wait_time);
+ return(0);
+ }
+
+ /* this thread has gained control of the polling. */
+ sc_test_busy = 1;
gen_mutex_unlock(&interface_mutex);
+
+ /* our turn to look at the socket collection */
ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
TCP_WORK_METRIC, &socket_count,
addr_array, status_array,
- max_idle_time, &interface_mutex);
+ max_idle_time);
+
gen_mutex_lock(&interface_mutex);
+ sc_test_busy = 0;
+
if (ret < 0)
{
+ /* wake up anyone else who might have been waiting */
+ gen_cond_broadcast(&interface_cond);
+ PVFS_perror_gossip("Error: socket collection:", ret);
/* BMI_socket_collection_testglobal() returns BMI error code */
return (ret);
}
@@ -2723,7 +2815,7 @@ static int tcp_do_work(int max_idle_time
ret = tcp_do_work_error(addr_array[i]);
if (ret < 0)
{
- return (ret);
+ PVFS_perror_gossip("Warning: BMI error handling failure, continuing", ret);
}
}
else
@@ -2733,8 +2825,8 @@ static int tcp_do_work(int max_idle_time
ret = tcp_do_work_send(addr_array[i], &stall_flag);
if (ret < 0)
{
- return (ret);
- }
+ PVFS_perror_gossip("Warning: BMI send error, continuing", ret);
+ }
if(!stall_flag)
busy_flag = 0;
}
@@ -2743,7 +2835,7 @@ static int tcp_do_work(int max_idle_time
ret = tcp_do_work_recv(addr_array[i], &stall_flag);
if (ret < 0)
{
- return (ret);
+ PVFS_perror_gossip("Warning: BMI recv error, continuing", ret);
}
if(!stall_flag)
busy_flag = 0;
@@ -2768,6 +2860,8 @@ static int tcp_do_work(int max_idle_time
gen_mutex_lock(&interface_mutex);
}
+ /* wake up anyone else who might have been waiting */
+ gen_cond_broadcast(&interface_cond);
return (0);
}
@@ -2899,6 +2993,7 @@ static int tcp_do_work_recv(bmi_method_a
int tmp_errno;
int tmp;
bmi_size_t old_amt_complete = 0;
+ time_t current_time;
*stall_flag = 1;
@@ -2993,10 +3088,25 @@ static int tcp_do_work_recv(bmi_method_a
if (ret < TCP_ENC_HDR_SIZE)
{
- /* header not ready yet */
+ current_time = time(NULL);
+ if(!tcp_addr_data->short_header_timer)
+ {
+ tcp_addr_data->short_header_timer = current_time;
+ }
+ else if((current_time - tcp_addr_data->short_header_timer) >
+ BMI_TCP_HEADER_WAIT_SECONDS)
+ {
+ gossip_err("Error: incomplete BMI TCP header after %d seconds, closing connection.\n",
+ BMI_TCP_HEADER_WAIT_SECONDS);
+ tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
+ return (0);
+ }
+
+ /* header not ready yet, but we will keep hoping */
return (0);
}
+ tcp_addr_data->short_header_timer = 0;
*stall_flag = 0;
gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Reading header for new op.\n");
ret = BMI_sockio_nbrecv(tcp_addr_data->socket,
@@ -3462,7 +3572,7 @@ static int tcp_allow_trusted(struct sock
{
/* check with all the masks */
if ((peer_sockaddr->sin_addr.s_addr & gtcp_allowed_connection->netmask[i].s_addr)
- != gtcp_allowed_connection->network[i].s_addr)
+ != (gtcp_allowed_connection->network[i].s_addr & gtcp_allowed_connection->netmask[i].s_addr ))
{
continue;
}
@@ -3489,7 +3599,7 @@ port_check:
return 0;
}
/* no good */
- gossip_lerr("Rejecting client %s on port %d: %s\n",
+ gossip_err("Rejecting client %s on port %d: %s\n",
peer_hostname, peer_port, bad_errors[what_failed]);
return -1;
}
@@ -3557,14 +3667,7 @@ static int tcp_accept_init(int *socket,
{
/* Force closure of the connection */
close(*socket);
- errno = EACCES;
- /* FIXME:
- * BIG KLUDGE
- * if we return an error, pvfs2-server's bmi thread simply terminates.
- * hence I am returning 0 here. Need to ask Phil or RobR about this...
- */
- *socket = -1;
- return 0;
+ return (bmi_tcp_errno_to_pvfs(-EACCES));
}
#endif
@@ -3630,31 +3733,52 @@ static void dealloc_tcp_method_op(method
return;
}
-/* BMI_tcp_post_send_generic()
+/* tcp_post_send_generic()
*
* Submits send operations (low level).
*
* returns 0 on success that requires later poll, returns 1 on instant
* completion, -errno on failure
*/
-static int BMI_tcp_post_send_generic(bmi_op_id_t * id,
- bmi_method_addr_p dest,
- const void *const *buffer_list,
- const bmi_size_t *size_list,
- int list_count,
- enum bmi_buffer_type buffer_type,
- struct tcp_msg_header my_header,
- void *user_ptr,
- bmi_context_id context_id)
+static int tcp_post_send_generic(bmi_op_id_t * id,
+ bmi_method_addr_p dest,
+ const void *const *buffer_list,
+ const bmi_size_t *size_list,
+ int list_count,
+ enum bmi_buffer_type buffer_type,
+ struct tcp_msg_header my_header,
+ void *user_ptr,
+ bmi_context_id context_id,
+ PVFS_hint hints)
{
struct tcp_addr *tcp_addr_data = dest->method_data;
method_op_p query_op = NULL;
int ret = -1;
+ bmi_size_t total_size = 0;
bmi_size_t amt_complete = 0;
bmi_size_t env_amt_complete = 0;
struct op_list_search_key key;
int list_index = 0;
bmi_size_t cur_index_complete = 0;
+ PINT_event_id eid = 0;
+
+ if(PINT_EVENT_ENABLED)
+ {
+ int i = 0;
+ for(; i < list_count; ++i)
+ {
+ total_size += size_list[i];
+ }
+ }
+
+ PINT_EVENT_START(
+ bmi_tcp_send_event_id, bmi_tcp_pid, NULL, &eid,
+ PINT_HINT_GET_CLIENT_ID(hints),
+ PINT_HINT_GET_REQUEST_ID(hints),
+ PINT_HINT_GET_RANK(hints),
+ PINT_HINT_GET_HANDLE(hints),
+ PINT_HINT_GET_OP_ID(hints),
+ total_size);
/* Three things can happen here:
* a) another op is already in queue for the address, so we just
@@ -3684,13 +3808,14 @@ static int BMI_tcp_post_send_generic(bmi
query_op = op_list_search(op_list_array[IND_SEND], &key);
if (query_op)
{
- /* queue up operation */
- ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
- dest, (void **) buffer_list,
- size_list, list_count, 0, 0,
- id, BMI_TCP_INPROGRESS, my_header, user_ptr,
- my_header.size, 0,
- context_id);
+ /* queue up operation */
+ ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
+ dest, (void **) buffer_list,
+ size_list, list_count, 0, 0,
+ id, BMI_TCP_INPROGRESS, my_header, user_ptr,
+ my_header.size, 0,
+ context_id,
+ eid);
/* TODO: is this causing deadlocks? See similar call in recv
* path for another example. This particular one seems to be an
@@ -3722,6 +3847,7 @@ static int BMI_tcp_post_send_generic(bmi
gossip_debug(GOSSIP_BMI_DEBUG_TCP, "tcp_sock_init() failure.\n");
/* tcp_sock_init() returns BMI error code */
tcp_forget_addr(dest, 0, ret);
+ PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, eid, 0, ret);
return (ret);
}
@@ -3746,7 +3872,8 @@ static int BMI_tcp_post_send_generic(bmi
list_count, 0, 0,
id, BMI_TCP_INPROGRESS, my_header, user_ptr,
my_header.size, 0,
- context_id);
+ context_id,
+ eid);
if(ret < 0)
{
gossip_err("Error: enqueue_operation() returned: %d\n", ret);
@@ -3765,6 +3892,7 @@ static int BMI_tcp_post_send_generic(bmi
PVFS_perror_gossip("Error: payload_progress", ret);
/* payload_progress() returns BMI error codes */
tcp_forget_addr(dest, 0, ret);
+ PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, eid, 0, ret);
return (ret);
}
@@ -3773,21 +3901,23 @@ static int BMI_tcp_post_send_generic(bmi
assert(amt_complete <= my_header.size);
if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
{
- /* we are already done */
- return (1);
+ /* we are already done */
+ PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid,
+ NULL, eid, 0, amt_complete);
+ return (1);
}
/* queue up the remainder */
ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
- dest, (void **) buffer_list,
- size_list, list_count,
- amt_complete, env_amt_complete, id,
- BMI_TCP_INPROGRESS, my_header, user_ptr,
- my_header.size, 0, context_id);
+ dest, (void **) buffer_list,
+ size_list, list_count,
+ amt_complete, env_amt_complete, id,
+ BMI_TCP_INPROGRESS, my_header, user_ptr,
+ my_header.size, 0, context_id, eid);
if(ret < 0)
{
- gossip_err("Error: enqueue_operation() returned: %d\n", ret);
+ gossip_err("Error: enqueue_operation() returned: %d\n", ret);
}
return (ret);
}
Index: module.mk.in
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/module.mk.in,v
diff -p -u -r1.5 -r1.5.8.1
--- module.mk.in 22 Jul 2007 16:02:16 -0000 1.5
+++ module.mk.in 25 Aug 2009 17:56:11 -0000 1.5.8.1
@@ -11,13 +11,19 @@ SERVERSRC += \
$(DIR)/bmi-tcp.c \
$(DIR)/sockio.c
+LIBBMISRC += \
+ $(DIR)/bmi-tcp.c \
+ $(DIR)/sockio.c
+
ifdef BUILD_EPOLL
LIBSRC += $(DIR)/socket-collection-epoll.c
SERVERSRC += $(DIR)/socket-collection-epoll.c
+LIBBMISRC += $(DIR)/socket-collection-epoll.c
MODCFLAGS_$(DIR)/bmi-tcp.c := -D__PVFS2_USE_EPOLL__
else
LIBSRC += $(DIR)/socket-collection.c
SERVERSRC += $(DIR)/socket-collection.c
+LIBBMISRC += $(DIR)/socket-collection.c
endif
endif # BUILD_BMI_TCP
Index: socket-collection-epoll.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/socket-collection-epoll.c,v
diff -p -u -r1.6 -r1.6.8.1
--- socket-collection-epoll.c 6 Nov 2007 23:08:36 -0000 1.6
+++ socket-collection-epoll.c 25 Aug 2009 17:56:11 -0000 1.6.8.1
@@ -65,11 +65,6 @@ socket_collection_p BMI_socket_collectio
return(NULL);
}
- gen_mutex_init(&tmp_scp->mutex);
- gen_mutex_init(&tmp_scp->queue_mutex);
-
- INIT_QLIST_HEAD(&tmp_scp->remove_queue);
- INIT_QLIST_HEAD(&tmp_scp->add_queue);
tmp_scp->server_socket = new_server_socket;
if(new_server_socket > -1)
@@ -82,10 +77,6 @@ socket_collection_p BMI_socket_collectio
if(ret < 0 && errno != EEXIST)
{
gossip_err("Error: epoll_ctl() failure: %s.\n", strerror(errno));
-#if 0
- gen_mutex_destroy(&tmp_scp->mutex);
- gen_mutex_destroy(&tmp_scp->queue_mutex);
-#endif
free(tmp_scp);
return(NULL);
}
@@ -94,48 +85,6 @@ socket_collection_p BMI_socket_collectio
return (tmp_scp);
}
-/* socket_collection_queue()
- *
- * queues a tcp method_addr for addition or removal from the collection.
- *
- * returns 0 on success, -errno on failure.
- */
-void BMI_socket_collection_queue(socket_collection_p scp,
- bmi_method_addr_p map, struct qlist_head* queue)
-{
- struct qlist_head* iterator = NULL;
- struct qlist_head* scratch = NULL;
- struct tcp_addr* tcp_addr_data = NULL;
-
- /* make sure that this address isn't already slated for addition/removal */
- qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
- {
- tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
- if(tcp_addr_data->map == map)
- {
- qlist_del(&tcp_addr_data->sc_link);
- break;
- }
- }
- qlist_for_each_safe(iterator, scratch, &scp->add_queue)
- {
- tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
- if(tcp_addr_data->map == map)
- {
- qlist_del(&tcp_addr_data->sc_link);
- break;
- }
- }
-
- /* add it on to the appropriate queue */
- tcp_addr_data = map->method_data;
- /* add to head, we are likely to access it again soon */
- qlist_add(&tcp_addr_data->sc_link, queue);
-
- return;
-}
-
-
/* socket_collection_finalize()
*
* destroys a socket collection. IMPORTANT: It DOES NOT destroy the
@@ -146,10 +95,6 @@ void BMI_socket_collection_queue(socket_
*/
void BMI_socket_collection_finalize(socket_collection_p scp)
{
-#if 0
- gen_mutex_destroy(&scp->mutex);
- gen_mutex_destroy(&scp->queue_mutex);
-#endif
free(scp);
return;
}
@@ -170,112 +115,19 @@ int BMI_socket_collection_testglobal(soc
int *outcount,
bmi_method_addr_p * maps,
int * status,
- int poll_timeout,
- gen_mutex_t* external_mutex)
+ int poll_timeout)
{
- struct qlist_head* iterator = NULL;
- struct qlist_head* scratch = NULL;
struct tcp_addr* tcp_addr_data = NULL;
int ret = -1;
int old_errno;
int tmp_count;
int i;
- int skip_flag;
-#ifndef __PVFS2_JOB_THREADED__
- struct epoll_event event;
-#endif
/* init the outgoing arguments for safety */
*outcount = 0;
memset(maps, 0, (sizeof(bmi_method_addr_p) * incount));
memset(status, 0, (sizeof(int) * incount));
- gen_mutex_lock(&scp->mutex);
-
-#ifndef __PVFS2_JOB_THREADED__
- gen_mutex_lock(&scp->queue_mutex);
-
- /* look for addresses slated for removal */
- qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
- {
- tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
- qlist_del(&tcp_addr_data->sc_link);
-
-
- /* take out of the epoll set */
- if(tcp_addr_data->sc_index > -1)
- {
- memset(&event, 0, sizeof(event));
- event.events = 0;
- event.data.ptr = tcp_addr_data->map;
-
- ret = epoll_ctl(scp->epfd, EPOLL_CTL_DEL, tcp_addr_data->socket,
- &event);
-
- if(ret < 0 && errno != ENOENT)
- {
- /* TODO: error handling */
- gossip_lerr("Error: epoll_ctl() failure: %s\n",
- strerror(errno));
- assert(0);
- }
-
- tcp_addr_data->sc_index = -1;
- tcp_addr_data->write_ref_count = 0;
- }
- }
-
- /* look for addresses slated for addition */
- qlist_for_each_safe(iterator, scratch, &scp->add_queue)
- {
- tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
- qlist_del(&tcp_addr_data->sc_link);
-
- if(tcp_addr_data->sc_index > -1)
- {
- memset(&event, 0, sizeof(event));
- /* update existing entry */
- event.data.ptr = tcp_addr_data->map;
- event.events = (EPOLLIN|EPOLLERR|EPOLLHUP);
- if(tcp_addr_data->write_ref_count > 0)
- event.events |= EPOLLOUT;
-
- ret = epoll_ctl(scp->epfd, EPOLL_CTL_MOD, tcp_addr_data->socket,
- &event);
-
- if(ret < 0 && errno != ENOENT)
- {
- /* TODO: error handling */
- gossip_lerr("Error: epoll_ctl() failure: %s\n",
- strerror(errno));
- assert(0);
- }
- }
- else
- {
- /* new entry */
- tcp_addr_data->sc_index = 1;
-
- memset(&event, 0, sizeof(event));
- event.data.ptr = tcp_addr_data->map;
- event.events = (EPOLLIN|EPOLLERR|EPOLLHUP);
- if(tcp_addr_data->write_ref_count > 0)
- event.events |= EPOLLOUT;
-
- ret = epoll_ctl(scp->epfd, EPOLL_CTL_ADD, tcp_addr_data->socket,
- &event);
- if(ret < 0 && errno != EEXIST)
- {
- /* TODO: error handling */
- gossip_lerr("Error: epoll_ctl() failure: %s\n",
- strerror(errno));
- assert(0);
- }
- }
- }
- gen_mutex_unlock(&scp->queue_mutex);
-#endif
-
/* actually do the epoll_wait() here */
do
{
@@ -291,14 +143,12 @@ int BMI_socket_collection_testglobal(soc
if(ret < 0)
{
- gen_mutex_unlock(&scp->mutex);
return(-old_errno);
}
/* nothing ready, just return */
if(ret == 0)
{
- gen_mutex_unlock(&scp->mutex);
return(0);
}
@@ -307,22 +157,6 @@ int BMI_socket_collection_testglobal(soc
for(i=0; i<tmp_count; i++)
{
assert(scp->event_array[i].events);
- skip_flag = 0;
-
- /* make sure this addr hasn't been removed */
- gen_mutex_lock(&scp->queue_mutex);
- qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
- {
- tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
- if(tcp_addr_data->map == scp->event_array[i].data.ptr)
- {
- skip_flag = 1;
- break;
- }
- }
- gen_mutex_unlock(&scp->queue_mutex);
- if(skip_flag)
- continue;
if(scp->event_array[i].events & ERRMASK)
status[*outcount] |= SC_ERROR_BIT;
@@ -350,8 +184,6 @@ int BMI_socket_collection_testglobal(soc
*outcount = (*outcount) + 1;
}
-
- gen_mutex_unlock(&scp->mutex);
return (0);
}
Index: socket-collection-epoll.h
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/socket-collection-epoll.h,v
diff -p -u -r1.5 -r1.5.2.1
--- socket-collection-epoll.h 11 Feb 2008 19:32:00 -0000 1.5
+++ socket-collection-epoll.h 25 Aug 2009 17:56:11 -0000 1.5.2.1
@@ -30,14 +30,8 @@
struct socket_collection
{
- gen_mutex_t mutex;
-
int epfd;
- gen_mutex_t queue_mutex;
- struct qlist_head add_queue;
- struct qlist_head remove_queue;
-
struct epoll_event event_array[BMI_EPOLL_MAX_PER_CYCLE];
int server_socket;
@@ -52,58 +46,10 @@ enum
};
socket_collection_p BMI_socket_collection_init(int new_server_socket);
-void BMI_socket_collection_queue(socket_collection_p scp,
- bmi_method_addr_p map, struct qlist_head* queue);
/* the bmi_tcp code may try to add a socket to the collection before
* it is fully connected, just ignore in this case
*/
-/* TODO: maybe optimize later; with epoll it is safe to add a new descriptor
- * while a poll is in progress, so we could skip lock and queue in some
- * cases.
- */
-#ifndef __PVFS2_JOB_THREADED__
-
-#define BMI_socket_collection_add(s, m) \
-do { \
- struct tcp_addr* tcp_data = (m)->method_data; \
- if(tcp_data->socket > -1){ \
- gen_mutex_lock(&((s)->queue_mutex)); \
- BMI_socket_collection_queue(s, m, &((s)->add_queue)); \
- gen_mutex_unlock(&((s)->queue_mutex)); \
- } \
-} while(0)
-
-#define BMI_socket_collection_remove(s, m) \
-do { \
- gen_mutex_lock(&((s)->queue_mutex)); \
- BMI_socket_collection_queue(s, m, &((s)->remove_queue)); \
- gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-/* we _must_ have a valid socket at this point if we want to write data */
-#define BMI_socket_collection_add_write_bit(s, m) \
-do { \
- struct tcp_addr* tcp_data = (m)->method_data; \
- assert(tcp_data->socket > -1); \
- gen_mutex_lock(&((s)->queue_mutex)); \
- tcp_data->write_ref_count++; \
- BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
- gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-#define BMI_socket_collection_remove_write_bit(s, m) \
-do { \
- struct tcp_addr* tcp_data = (m)->method_data; \
- gen_mutex_lock(&((s)->queue_mutex)); \
- tcp_data->write_ref_count--; \
- assert(tcp_data->write_ref_count > -1); \
- BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
- gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-#else
-
#define BMI_socket_collection_add(s, m) \
do { \
struct tcp_addr* tcp_data = (m)->method_data; \
@@ -154,16 +100,13 @@ do { \
}\
} while(0)
-#endif
-
void BMI_socket_collection_finalize(socket_collection_p scp);
int BMI_socket_collection_testglobal(socket_collection_p scp,
int incount,
int *outcount,
bmi_method_addr_p * maps,
int * status,
- int poll_timeout,
- gen_mutex_t* external_mutex);
+ int poll_timeout);
#endif /* __SOCKET_COLLECTION_EPOLL_H */
Index: socket-collection.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/socket-collection.c,v
diff -p -u -r1.17 -r1.17.8.1
--- socket-collection.c 6 Nov 2007 23:08:36 -0000 1.17
+++ socket-collection.c 25 Aug 2009 17:56:11 -0000 1.17.8.1
@@ -55,7 +55,6 @@ socket_collection_p BMI_socket_collectio
memset(tmp_scp, 0, sizeof(struct socket_collection));
- gen_mutex_init(&tmp_scp->mutex);
gen_mutex_init(&tmp_scp->queue_mutex);
tmp_scp->pollfd_array = (struct
@@ -177,8 +176,7 @@ int BMI_socket_collection_testglobal(soc
int *outcount,
bmi_method_addr_p * maps,
int * status,
- int poll_timeout,
- gen_mutex_t* external_mutex)
+ int poll_timeout)
{
struct qlist_head* iterator = NULL;
struct qlist_head* scratch = NULL;
@@ -202,8 +200,6 @@ do_again:
memset(maps, 0, (sizeof(bmi_method_addr_p) * incount));
memset(status, 0, (sizeof(int) * incount));
- gen_mutex_lock(&scp->mutex);
-
gen_mutex_lock(&scp->queue_mutex);
/* look for addresses slated for removal */
@@ -291,14 +287,12 @@ do_again:
if(ret < 0)
{
- gen_mutex_unlock(&scp->mutex);
return(bmi_tcp_errno_to_pvfs(-old_errno));
}
/* nothing ready, just return */
if(ret == 0)
{
- gen_mutex_unlock(&scp->mutex);
return(0);
}
@@ -369,8 +363,6 @@ do_again:
*outcount = (*outcount) + 1;
}
}
-
- gen_mutex_unlock(&scp->mutex);
/* Under the following conditions (i.e. all of them must be true) we go back to redoing poll
* a) There were no outstanding sockets/fds that had data
Index: socket-collection.h
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/socket-collection.h,v
diff -p -u -r1.13 -r1.13.8.1
--- socket-collection.h 6 Nov 2007 23:08:36 -0000 1.13
+++ socket-collection.h 25 Aug 2009 17:56:11 -0000 1.13.8.1
@@ -26,8 +26,6 @@
struct socket_collection
{
- gen_mutex_t mutex;
-
struct pollfd* pollfd_array;
bmi_method_addr_p* addr_array;
int array_max;
@@ -53,50 +51,9 @@ socket_collection_p BMI_socket_collectio
void BMI_socket_collection_queue(socket_collection_p scp,
bmi_method_addr_p map, struct qlist_head* queue);
-#ifndef __PVFS2_JOB_THREADED__
/* the bmi_tcp code may try to add a socket to the collection before
* it is fully connected, just ignore in this case
*/
-#define BMI_socket_collection_add(s, m) \
-do { \
- struct tcp_addr* tcp_data = (m)->method_data; \
- if(tcp_data->socket > -1){ \
- gen_mutex_lock(&((s)->queue_mutex)); \
- BMI_socket_collection_queue(s, m, &((s)->add_queue)); \
- gen_mutex_unlock(&((s)->queue_mutex)); \
- } \
-} while(0)
-
-#define BMI_socket_collection_remove(s, m) \
-do { \
- gen_mutex_lock(&((s)->queue_mutex)); \
- BMI_socket_collection_queue(s, m, &((s)->remove_queue)); \
- gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-/* we _must_ have a valid socket at this point if we want to write data */
-#define BMI_socket_collection_add_write_bit(s, m) \
-do { \
- struct tcp_addr* tcp_data = (m)->method_data; \
- assert(tcp_data->socket > -1); \
- gen_mutex_lock(&((s)->queue_mutex)); \
- tcp_data->write_ref_count++; \
- BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
- gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-#define BMI_socket_collection_remove_write_bit(s, m) \
-do { \
- struct tcp_addr* tcp_data = (m)->method_data; \
- gen_mutex_lock(&((s)->queue_mutex)); \
- tcp_data->write_ref_count--; \
- assert(tcp_data->write_ref_count > -1); \
- BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \
- gen_mutex_unlock(&((s)->queue_mutex)); \
-} while(0)
-
-#else
-
/* write a byte on the pipe_fd[1] so that poll breaks out in case it is idling */
#define BMI_socket_collection_add(s, m) \
do { \
@@ -144,17 +101,13 @@ do { \
write(s->pipe_fd[1], &c, 1);\
} while(0)
-
-#endif
-
void BMI_socket_collection_finalize(socket_collection_p scp);
int BMI_socket_collection_testglobal(socket_collection_p scp,
int incount,
int *outcount,
bmi_method_addr_p * maps,
int * status,
- int poll_timeout,
- gen_mutex_t* external_mutex);
+ int poll_timeout);
#endif /* __SOCKET_COLLECTION_H */
Index: sockio.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/bmi/bmi_tcp/sockio.c,v
diff -p -u -r1.30 -r1.30.2.1
--- sockio.c 22 Feb 2008 18:20:51 -0000 1.30
+++ sockio.c 25 Aug 2009 17:56:11 -0000 1.30.2.1
@@ -196,10 +196,15 @@ int BMI_sockio_nbrecv(int s,
errno = EPIPE;
return (-1);
}
- if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
+ if (ret == -1 && errno == EINTR)
{
goto nbrecv_restart;
}
+ else if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+ {
+ /* return what we got so far, this is a nonblocking call */
+ return(len-comp);
+ }
else if (ret == -1)
{
return (-1);
@@ -220,34 +225,30 @@ int BMI_sockio_nbrecv(int s,
*/
int BMI_sockio_nbpeek(int s, void* buf, int len)
{
- int ret, comp = len;
-
+ int ret;
assert(fcntl(s, F_GETFL, 0) & O_NONBLOCK);
- while (comp)
+ nbpeek_restart:
+ ret = recv(s, buf, len, (MSG_PEEK|DEFAULT_MSG_FLAGS));
+ if(ret == 0) /* socket closed by peer */
{
- nbpeek_restart:
- ret = recv(s, buf, comp, (MSG_PEEK|DEFAULT_MSG_FLAGS));
- if (!ret) /* socket closed */
- {
- errno = EPIPE;
- return (-1);
- }
- if (ret == -1 && errno == EWOULDBLOCK)
- {
- return (len - comp); /* return amount completed */
- }
- if (ret == -1 && errno == EINTR)
- {
- goto nbpeek_restart;
- }
- else if (ret == -1)
- {
- return (-1);
- }
- comp -= ret;
+ errno = EPIPE;
+ return (-1);
}
- return (len - comp);
+ else if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+ {
+ return(0);
+ }
+ else if (ret == -1 && errno == EINTR)
+ {
+ goto nbpeek_restart;
+ }
+ else if (ret == -1)
+ {
+ return (-1);
+ }
+
+ return(ret);
}
More information about the Pvfs2-cvs
mailing list