1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2016 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2007-2008 Sun Microsystems, Inc. All rights reserved.
13 * Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
14 * Copyright (c) 2014 Intel, Inc. All rights reserved.
15 * Copyright (c) 2015 Research Organization for Information Science
16 * and Technology (RIST). All rights reserved.
17 * $COPYRIGHT$
18 *
19 * Additional copyrights may follow
20 *
21 * $HEADER$
22 *
23 */
24
25 #include "opal_config.h"
26
27 #include <stdlib.h>
28 #include <string.h>
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #include "opal/opal_socket_errno.h"
33 #ifdef HAVE_SYS_TYPES_H
34 #include <sys/types.h>
35 #endif
36 #ifdef HAVE_FCNTL_H
37 #include <fcntl.h>
38 #endif
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
41 #endif
42 #ifdef HAVE_NETINET_TCP_H
43 #include <netinet/tcp.h>
44 #endif
45 #ifdef HAVE_ARPA_INET_H
46 #include <arpa/inet.h>
47 #endif
48 #ifdef HAVE_SYS_TIME_H
49 #include <sys/time.h>
50 #endif /* HAVE_SYS_TIME_H */
51 #include <time.h>
52
53 #include "opal/mca/event/event.h"
54 #include "opal/util/net.h"
55 #include "opal/util/show_help.h"
56 #include "opal/util/proc.h"
57 #include "opal/mca/btl/base/btl_base_error.h"
58
59 #include "btl_tcp.h"
60 #include "btl_tcp_endpoint.h"
61 #include "btl_tcp_proc.h"
62 #include "btl_tcp_frag.h"
63 #include "btl_tcp_addr.h"
64
65 /*
66 * Magic ID string send during connect/accept handshake
67 */
68
69 const char mca_btl_tcp_magic_id_string[MCA_BTL_TCP_MAGIC_STRING_LENGTH] = "OPAL-TCP-BTL";
70
71 /*
72 * Initialize state of the endpoint instance.
73 *
74 */
mca_btl_tcp_endpoint_construct(mca_btl_tcp_endpoint_t * endpoint)75 static void mca_btl_tcp_endpoint_construct(mca_btl_tcp_endpoint_t* endpoint)
76 {
77 endpoint->endpoint_btl = NULL;
78 endpoint->endpoint_proc = NULL;
79 endpoint->endpoint_addr = NULL;
80 endpoint->endpoint_sd = -1;
81 endpoint->endpoint_sd_next = -1;
82 endpoint->endpoint_send_frag = 0;
83 endpoint->endpoint_recv_frag = 0;
84 endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
85 endpoint->endpoint_retries = 0;
86 endpoint->endpoint_nbo = false;
87 #if MCA_BTL_TCP_ENDPOINT_CACHE
88 endpoint->endpoint_cache = NULL;
89 endpoint->endpoint_cache_pos = NULL;
90 endpoint->endpoint_cache_length = 0;
91 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
92 OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
93 OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
94 OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
95 }
96
97 /*
98 * Destroy a endpoint
99 *
100 */
mca_btl_tcp_endpoint_destruct(mca_btl_tcp_endpoint_t * endpoint)101 static void mca_btl_tcp_endpoint_destruct(mca_btl_tcp_endpoint_t* endpoint)
102 {
103 mca_btl_tcp_endpoint_close(endpoint);
104 mca_btl_tcp_proc_remove(endpoint->endpoint_proc, endpoint);
105 OBJ_DESTRUCT(&endpoint->endpoint_frags);
106 OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
107 OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
108 }
109
/* Register mca_btl_tcp_endpoint_t as an OPAL class derived from
 * opal_list_item_t, so endpoints are reference counted and can live on
 * opal_list_t containers. */
OBJ_CLASS_INSTANCE(
    mca_btl_tcp_endpoint_t,
    opal_list_item_t,
    mca_btl_tcp_endpoint_construct,
    mca_btl_tcp_endpoint_destruct);
115
116
117 static void mca_btl_tcp_endpoint_construct(mca_btl_base_endpoint_t* btl_endpoint);
118 static void mca_btl_tcp_endpoint_destruct(mca_btl_base_endpoint_t* btl_endpoint);
119 static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t*);
120 static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t*);
121 static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user);
122 static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user);
123
124 /*
125 * diagnostics
126 */
127
128 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
129
130 #define DEBUG_LENGTH 1024
131 /**
132 * The lack of protection in the mca_btl_tcp_endpoint_dump function is voluntary
133 * so that it can be called regardless of the state of the mutexes. As a result,
134 * when multiple threads work on the same endpoint not only the information
135 * displayed might be inacurate, but when we manipulate the pending fragments we
136 * might access freed memory. Thus, the caller should lock the endpoint prior
137 * to the call.
138 */
139 void
mca_btl_tcp_endpoint_dump(int level,const char * fname,int lineno,const char * funcname,mca_btl_base_endpoint_t * btl_endpoint,bool full_info,const char * msg)140 mca_btl_tcp_endpoint_dump(int level,
141 const char* fname,
142 int lineno,
143 const char* funcname,
144 mca_btl_base_endpoint_t* btl_endpoint,
145 bool full_info,
146 const char* msg)
147 {
148 char outmsg[DEBUG_LENGTH];
149 int sndbuf, rcvbuf, nodelay, flags, used = 0;
150 #if OPAL_ENABLE_IPV6
151 struct sockaddr_storage inaddr;
152 #else
153 struct sockaddr_in inaddr;
154 #endif
155 opal_socklen_t obtlen;
156 opal_socklen_t addrlen = sizeof(inaddr);
157 mca_btl_tcp_frag_t* item;
158
159 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s: ", msg);
160 if (used >= DEBUG_LENGTH) goto out;
161
162 getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
163 #if OPAL_ENABLE_IPV6
164 {
165 char *address;
166 address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
167 if (NULL != address) {
168 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s -", address);
169 if (used >= DEBUG_LENGTH) goto out;
170 }
171 }
172 #else
173 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s -", inet_ntoa(inaddr.sin_addr));
174 if (used >= DEBUG_LENGTH) goto out;
175 #endif
176 getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
177 #if OPAL_ENABLE_IPV6
178 {
179 char *address;
180 address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
181 if (NULL != address) {
182 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " %s", address);
183 if (used >= DEBUG_LENGTH) goto out;
184 }
185 }
186 #else
187 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " %s", inet_ntoa(inaddr.sin_addr));
188 if (used >= DEBUG_LENGTH) goto out;
189 #endif
190
191 used = snprintf(outmsg, DEBUG_LENGTH, "[%d", btl_endpoint->endpoint_sd);
192 if (used >= DEBUG_LENGTH) goto out;
193 switch(btl_endpoint->endpoint_state) {
194 case MCA_BTL_TCP_CONNECTING:
195 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "connecting");
196 if (used >= DEBUG_LENGTH) goto out;
197 break;
198 case MCA_BTL_TCP_CONNECT_ACK:
199 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "ack");
200 if (used >= DEBUG_LENGTH) goto out;
201 break;
202 case MCA_BTL_TCP_CLOSED:
203 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "close");
204 if (used >= DEBUG_LENGTH) goto out;
205 break;
206 case MCA_BTL_TCP_FAILED:
207 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "failed");
208 if (used >= DEBUG_LENGTH) goto out;
209 break;
210 case MCA_BTL_TCP_CONNECTED:
211 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "connected");
212 if (used >= DEBUG_LENGTH) goto out;
213 break;
214 default:
215 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "unknown");
216 if (used >= DEBUG_LENGTH) goto out;
217 break;
218 }
219
220 if( full_info ) {
221 if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
222 BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
223 strerror(opal_socket_errno), opal_socket_errno));
224 }
225
226 #if defined(SO_SNDBUF)
227 obtlen = sizeof(sndbuf);
228 if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
229 BTL_ERROR(("SO_SNDBUF option: %s (%d)",
230 strerror(opal_socket_errno), opal_socket_errno));
231 }
232 #else
233 sndbuf = -1;
234 #endif
235 #if defined(SO_RCVBUF)
236 obtlen = sizeof(rcvbuf);
237 if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
238 BTL_ERROR(("SO_RCVBUF option: %s (%d)",
239 strerror(opal_socket_errno), opal_socket_errno));
240 }
241 #else
242 rcvbuf = -1;
243 #endif
244 #if defined(TCP_NODELAY)
245 obtlen = sizeof(nodelay);
246 if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
247 BTL_ERROR(("TCP_NODELAY option: %s (%d)",
248 strerror(opal_socket_errno), opal_socket_errno));
249 }
250 #else
251 nodelay = 0;
252 #endif
253 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " nodelay %d sndbuf %d rcvbuf %d flags %08x",
254 nodelay, sndbuf, rcvbuf, flags);
255 if (used >= DEBUG_LENGTH) goto out;
256 #if MCA_BTL_TCP_ENDPOINT_CACHE
257 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n\t[cache %p used %lu/%lu]",
258 (void*)btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
259 btl_endpoint->endpoint_cache_length);
260 if (used >= DEBUG_LENGTH) goto out;
261 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
262 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "{%s - retries %d}",
263 (btl_endpoint->endpoint_nbo ? "NBO" : ""), (int)btl_endpoint->endpoint_retries);
264 if (used >= DEBUG_LENGTH) goto out;
265 }
266 used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n");
267 if (used >= DEBUG_LENGTH) goto out;
268
269 if( NULL != btl_endpoint->endpoint_recv_frag )
270 used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_recv_frag, "active recv",
271 &outmsg[used], DEBUG_LENGTH - used);
272 if (used >= DEBUG_LENGTH) goto out;
273
274 if( NULL != btl_endpoint->endpoint_send_frag )
275 used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_send_frag, "active send (inaccurate iov)",
276 &outmsg[used], DEBUG_LENGTH - used);
277 if (used >= DEBUG_LENGTH) goto out;
278 OPAL_LIST_FOREACH(item, &btl_endpoint->endpoint_frags, mca_btl_tcp_frag_t) {
279 used += mca_btl_tcp_frag_dump(item, "pending send", &outmsg[used], DEBUG_LENGTH - used);
280 if (used >= DEBUG_LENGTH) goto out;
281 }
282 out:
283 outmsg[ used >= DEBUG_LENGTH ? (DEBUG_LENGTH-1) : used ] = '\0';
284 opal_output_verbose(level, opal_btl_base_framework.framework_output,
285 "[%s:%d:%s][%s -> %s] %s",
286 fname, lineno, funcname,
287 OPAL_NAME_PRINT(opal_proc_local_get()->proc_name),
288 (NULL != btl_endpoint->endpoint_proc ? OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name) : "unknown remote"),
289 outmsg);
290 }
291 #endif /* OPAL_ENABLE_DEBUG && WANT_PEER_DUMP */
292
293 /*
294 * Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
295 */
296
/**
 * Prepare the libevent read/write events for an endpoint whose socket
 * (endpoint_sd) has just been created or accepted, and allocate the
 * optional per-connection receive cache.  The events are only registered
 * here; they are armed later via opal_event_add() /
 * MCA_BTL_TCP_ACTIVATE_EVENT by the connect/accept paths.
 */
static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
{
#if MCA_BTL_TCP_ENDPOINT_CACHE
    /* One cache per live connection; freed in mca_btl_tcp_endpoint_close().
     * NOTE(review): the malloc result is not checked before use elsewhere —
     * confirm the component guarantees a non-zero, sane cache size. */
    assert(NULL == btl_endpoint->endpoint_cache);
    btl_endpoint->endpoint_cache = (char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
    btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */

    /* Persistent read event: fires whenever the socket becomes readable. */
    opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
                    btl_endpoint->endpoint_sd,
                    OPAL_EV_READ | OPAL_EV_PERSIST,
                    mca_btl_tcp_endpoint_recv_handler,
                    btl_endpoint );
    /**
     * In the multi-threaded case, the send event must be persistent in order
     * to avoid missing the connection notification in send_handler due to
     * a local handling of the peer process (which holds the lock).
     */
    opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
                    btl_endpoint->endpoint_sd,
                    OPAL_EV_WRITE | OPAL_EV_PERSIST,
                    mca_btl_tcp_endpoint_send_handler,
                    btl_endpoint);
}
321
322
323 /*
324 * Attempt to send a fragment using a given endpoint. If the endpoint is not connected,
325 * queue the fragment and start the connection as required.
326 */
327
/**
 * Attempt to send a fragment over this endpoint.
 *
 * If the connection is not yet established the fragment is queued (and a
 * connect is started if the endpoint is closed).  If connected and idle,
 * a priority fragment may complete inline, in which case 1 is returned
 * to tell the caller the descriptor is already done; otherwise the send
 * is driven asynchronously by the send event and OPAL_SUCCESS (or an
 * error such as OPAL_ERR_UNREACH for a failed peer) is returned.
 */
int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp_frag_t* frag)
{
    int rc = OPAL_SUCCESS;

    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECTING:
    case MCA_BTL_TCP_CONNECT_ACK:
    case MCA_BTL_TCP_CLOSED:
        /* Not connected yet: park the fragment until the handshake finishes.
         * The callback flag is forced so the caller is always notified once
         * the deferred send eventually completes. */
        opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
        frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
        if(btl_endpoint->endpoint_state == MCA_BTL_TCP_CLOSED)
            rc = mca_btl_tcp_endpoint_start_connect(btl_endpoint);
        break;
    case MCA_BTL_TCP_FAILED:
        rc = OPAL_ERR_UNREACH;
        break;
    case MCA_BTL_TCP_CONNECTED:
        if (NULL == btl_endpoint->endpoint_send_frag) {
            /* No send in progress: a priority fragment may go out inline. */
            if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
               mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd)) {
                int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);

                /* Unlock before user callback / frag return to avoid
                 * re-entering this endpoint with the lock held. */
                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) {
                    frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
                }
                if( btl_ownership ) {
                    MCA_BTL_TCP_FRAG_RETURN(frag);
                }
                MCA_BTL_TCP_ENDPOINT_DUMP(50, btl_endpoint, true, "complete send fragment [endpoint_send]");
                /* 1 == "send completed inline"; lock already released. */
                return 1;
            } else {
                /* Partial or deferred send: make it the active fragment and
                 * let the (persistent) send event drain it. */
                btl_endpoint->endpoint_send_frag = frag;
                MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_send]");
                frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
                MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
            }
        } else {
            /* A send is already active: queue behind it in FIFO order. */
            MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "send fragment enqueued [endpoint_send]");
            frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
            opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
        }
        break;
    }
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    return rc;
}
376
377
378 /*
379 * A blocking send on a non-blocking socket. Used to send the small
380 * amount of connection information that identifies the endpoints endpoint.
381 */
382 static int
mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t * btl_endpoint,const void * data,size_t size)383 mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint,
384 const void* data, size_t size)
385 {
386 int ret = mca_btl_tcp_send_blocking(btl_endpoint->endpoint_sd, data, size);
387 if (ret < 0) {
388 mca_btl_tcp_endpoint_close(btl_endpoint);
389 }
390 return ret;
391 }
392
393 /*
394 * Send the globally unique identifier for this process to a endpoint on
395 * a newly connected socket.
396 */
397 static int
mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t * btl_endpoint)398 mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
399 {
400 opal_process_name_t guid = opal_proc_local_get()->proc_name;
401 OPAL_PROCESS_NAME_HTON(guid);
402
403 mca_btl_tcp_endpoint_hs_msg_t hs_msg;
404 strcpy(hs_msg.magic_id, mca_btl_tcp_magic_id_string);
405 hs_msg.guid = guid;
406
407 if(sizeof(hs_msg) !=
408 mca_btl_tcp_endpoint_send_blocking(btl_endpoint,
409 &hs_msg, sizeof(hs_msg))) {
410 opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
411 true, opal_process_info.nodename,
412 sizeof(hs_msg),
413 "connect ACK failed to send magic-id and guid");
414 return OPAL_ERR_UNREACH;
415 }
416 return OPAL_SUCCESS;
417 }
418
/**
 * Deferred half of mca_btl_tcp_endpoint_accept(), run from a zero-delay
 * timer event so it executes in event-loop context.  Decides whether the
 * incoming socket (endpoint_sd_next) replaces the current connection or
 * is discarded, using the same process-name comparison as the connect
 * path to break simultaneous-connect ties deterministically.
 */
static void *mca_btl_tcp_endpoint_complete_accept(int fd, int flags, void *context)
{
    mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t*)context;
    struct timeval now = {0, 0};
    int cmpval;

    /* Both endpoint locks are needed.  If either is busy, do not block in
     * the event loop: re-arm the zero-delay timer and try again later. */
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) ) {
        opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
        return NULL;
    }
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) ) {
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
        return NULL;
    }

    /* No published address for this endpoint: nothing sane can be done
     * with the incoming socket, so drop it and report upward. */
    if(NULL == btl_endpoint->endpoint_addr) {
        CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next);  /* No further use of this socket. Close it */
        btl_endpoint->endpoint_sd_next = -1;
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        if( NULL != btl_endpoint->endpoint_btl->tcp_error_cb ) {
            btl_endpoint->endpoint_btl->tcp_error_cb(
                &btl_endpoint->endpoint_btl->super, MCA_BTL_ERROR_FLAGS_NONFATAL,
                btl_endpoint->endpoint_proc->proc_opal,
                "The endpoint addr is set to NULL (unsettling)");
        }
        return NULL;
    }

    /* Tie-break: accept the incoming connection if we have no usable socket,
     * or if we are not yet connected and the peer "wins" the name compare. */
    cmpval = opal_compare_proc(btl_endpoint->endpoint_proc->proc_opal->proc_name,
                               opal_proc_local_get()->proc_name);
    if((btl_endpoint->endpoint_sd < 0) ||
       (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
        cmpval < 0)) {
        /* Discard any half-open outbound attempt and adopt the new socket. */
        mca_btl_tcp_endpoint_close(btl_endpoint);
        btl_endpoint->endpoint_sd = btl_endpoint->endpoint_sd_next;
        btl_endpoint->endpoint_sd_next = -1;
        if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) != OPAL_SUCCESS) {
            MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, " [endpoint_accept]");
            btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
            mca_btl_tcp_endpoint_close(btl_endpoint);
            goto unlock_and_return;
        }
        mca_btl_tcp_endpoint_event_init(btl_endpoint);
        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [endpoint_accept]");
        opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
        if( mca_btl_tcp_event_base == opal_sync_event_base ) {
            /* If no progress thread then raise the awareness of the default progress engine */
            opal_progress_event_users_increment();
        }
        mca_btl_tcp_endpoint_connected(btl_endpoint);

        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "accepted");
        goto unlock_and_return;
    }
    /* We lost the tie-break (or are already connected): reject the socket. */
    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next);  /* No further use of this socket. Close it */
    btl_endpoint->endpoint_sd_next = -1;
 unlock_and_return:
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
    return NULL;
}
482
483 /*
484 * Check the state of this endpoint. If the incoming connection request matches
485 * our endpoints address, check the state of our connection:
486 * (1) if a connection has not been attempted, accept the connection
487 * (2) if a connection has not been established, and the endpoints process identifier
488 * is less than the local process, accept the connection
489 * otherwise, reject the connection and continue with the current connection
490 */
491
/**
 * Hand an accepted socket to this endpoint.  The socket is parked in
 * endpoint_sd_next and the actual accept/reject decision is deferred to
 * mca_btl_tcp_endpoint_complete_accept() via a zero-delay timer event,
 * so it runs in event-loop context where the endpoint locks can be taken.
 *
 * NOTE(review): 'addr' is unused here — presumably kept for interface
 * symmetry with other accept callbacks; confirm against the caller.
 */
void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
                                 struct sockaddr* addr, int sd)
{
    struct timeval now = {0, 0};

    /* Only one pending accepted socket is supported at a time. */
    assert(btl_endpoint->endpoint_sd_next == -1);
    btl_endpoint->endpoint_sd_next = sd;

    opal_event_evtimer_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_accept_event,
                           mca_btl_tcp_endpoint_complete_accept, btl_endpoint);
    opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
}
504
505
506 /*
507 * Remove any event registrations associated with the socket
508 * and update the endpoint state to reflect the connection has
509 * been closed.
510 */
/**
 * Close the endpoint's socket and remove its event registrations, moving
 * the endpoint back to MCA_BTL_TCP_CLOSED.  If the endpoint was marked
 * FAILED, every pending fragment's completion callback is fired with
 * OPAL_ERR_UNREACH so upper layers learn the peer is unreachable.
 * No-op when no socket is open.
 */
void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
{
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "[close]");
    if(btl_endpoint->endpoint_sd < 0)
        return;
    /* Count every close as a (failed) attempt for retry accounting. */
    btl_endpoint->endpoint_retries++;
    /* Events must be deleted before the descriptor is closed. */
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(recv) [close]");
    opal_event_del(&btl_endpoint->endpoint_recv_event);
    if( mca_btl_tcp_event_base == opal_sync_event_base ) {
        /* If no progress thread then lower the awareness of the default progress engine */
        opal_progress_event_users_decrement();
    }
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
    opal_event_del(&btl_endpoint->endpoint_send_event);

#if MCA_BTL_TCP_ENDPOINT_CACHE
    /* Release the per-connection receive cache allocated in event_init(). */
    free( btl_endpoint->endpoint_cache );
    btl_endpoint->endpoint_cache = NULL;
    btl_endpoint->endpoint_cache_pos = NULL;
    btl_endpoint->endpoint_cache_length = 0;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */

    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
    btl_endpoint->endpoint_sd = -1;
    /**
     * If we keep failing to connect to the peer let the caller know about
     * this situation by triggering all the pending fragments callback and
     * reporting the error.
     */
    if( MCA_BTL_TCP_FAILED == btl_endpoint->endpoint_state ) {
        /* Drain the active send fragment first, then the queued ones. */
        mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
        if( NULL == frag )
            frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
        while(NULL != frag) {
            frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, OPAL_ERR_UNREACH);

            frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
        }
    }
    btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
}
552
553 /*
554 * Setup endpoint state to reflect that connection has been established,
555 * and start any pending sends. This function should be called with the
556 * send lock locked.
557 */
558
mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t * btl_endpoint)559 static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
560 {
561 /* setup socket options */
562 assert( MCA_BTL_TCP_CONNECTED != btl_endpoint->endpoint_state );
563 btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
564 btl_endpoint->endpoint_retries = 0;
565 MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "READY [endpoint_connected]");
566
567 if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
568 if(NULL == btl_endpoint->endpoint_send_frag)
569 btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
570 opal_list_remove_first(&btl_endpoint->endpoint_frags);
571 MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_connected]");
572 opal_event_add(&btl_endpoint->endpoint_send_event, 0);
573 }
574 }
575
576
577 /*
578 * Receive the endpoints globally unique process identification from a newly
579 * connected socket and verify the expected response. If so, move the
580 * socket to a connected state.
581 *
582 * NOTE: The return codes from this function are checked in
583 * mca_btl_tcp_endpoint_recv_handler(). Don't change them here
584 * without also changing the handling in _recv_handler()!
585 */
mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t * btl_endpoint)586 static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
587 {
588 size_t retval, len = strlen(mca_btl_tcp_magic_id_string);;
589 mca_btl_tcp_proc_t* btl_proc = btl_endpoint->endpoint_proc;
590 opal_process_name_t guid;
591
592 mca_btl_tcp_endpoint_hs_msg_t hs_msg;
593 retval = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, &hs_msg, sizeof(hs_msg));
594
595 if (sizeof(hs_msg) != retval) {
596 mca_btl_tcp_endpoint_close(btl_endpoint);
597 if (0 == retval) {
598 /* If we get zero bytes, the peer closed the socket. This
599 can happen when the two peers started the connection
600 protocol simultaneously. Just report the problem
601 upstream. */
602 return OPAL_ERROR;
603 }
604 opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
605 true, opal_process_info.nodename,
606 getpid(), "did not receive entire connect ACK from peer");
607
608 return OPAL_ERR_BAD_PARAM;
609 }
610 if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
611 opal_show_help("help-mpi-btl-tcp.txt", "server did not receive magic string",
612 true, opal_process_info.nodename,
613 getpid(), "client", hs_msg.magic_id,
614 "string value");
615 return OPAL_ERR_BAD_PARAM;
616 }
617
618 guid = hs_msg.guid;
619 OPAL_PROCESS_NAME_NTOH(guid);
620 /* compare this to the expected values */
621 /* TODO: this deserve a little bit more thinking as we are not supposed
622 * to be able to exchange the opal_process_name_t over the network.
623 */
624 if (0 != opal_compare_proc(btl_proc->proc_opal->proc_name, guid)) {
625 BTL_ERROR(("received unexpected process identifier %s",
626 OPAL_NAME_PRINT(guid)));
627 mca_btl_tcp_endpoint_close(btl_endpoint);
628 return OPAL_ERR_UNREACH;
629 }
630
631 return OPAL_SUCCESS;
632 }
633
634
/**
 * Apply the component-configured TCP options to a socket: TCP_NODELAY
 * (unless disabled via tcp_not_use_nodelay) and, when the corresponding
 * MCA parameters are positive, the SO_SNDBUF / SO_RCVBUF sizes.
 * Failures are logged but not fatal — the socket remains usable with
 * system defaults.
 */
void mca_btl_tcp_set_socket_options(int sd)
{
#if defined(TCP_NODELAY)
    /* Disable Nagle unless the user explicitly asked to keep it. */
    int optval;
    optval = !mca_btl_tcp_component.tcp_not_use_nodelay;
    if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
        BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
                   strerror(opal_socket_errno), opal_socket_errno));
    }
#endif
#if defined(SO_SNDBUF)
    if(mca_btl_tcp_component.tcp_sndbuf > 0 &&
       setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp_component.tcp_sndbuf, sizeof(int)) < 0) {
        BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
                   strerror(opal_socket_errno), opal_socket_errno));
    }
#endif
#if defined(SO_RCVBUF)
    if(mca_btl_tcp_component.tcp_rcvbuf > 0 &&
       setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) {
        BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
                   strerror(opal_socket_errno), opal_socket_errno));
    }
#endif
}
660
661
662
663 /*
664 * Start a connection to the endpoint. This will likely not complete,
665 * as the socket is set to non-blocking, so register for event
666 * notification of connect completion. On connection we send our
667 * globally unique process identifier to the endpoint and wait for
668 * the endpoint response.
669 */
mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t * btl_endpoint)670 static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
671 {
672 int rc,flags;
673 struct sockaddr_storage endpoint_addr;
674 /* By default consider a IPv4 connection */
675 uint16_t af_family = AF_INET;
676 opal_socklen_t addrlen = sizeof(struct sockaddr_in);
677
678 #if OPAL_ENABLE_IPV6
679 if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
680 af_family = AF_INET6;
681 addrlen = sizeof (struct sockaddr_in6);
682 }
683 #endif
684 assert( btl_endpoint->endpoint_sd < 0 );
685 btl_endpoint->endpoint_sd = socket(af_family, SOCK_STREAM, 0);
686 if (btl_endpoint->endpoint_sd < 0) {
687 btl_endpoint->endpoint_retries++;
688 return OPAL_ERR_UNREACH;
689 }
690
691 /* setup socket buffer sizes */
692 mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd);
693
694 /* setup event callbacks */
695 mca_btl_tcp_endpoint_event_init(btl_endpoint);
696
697 /* setup the socket as non-blocking */
698 if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
699 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
700 true, opal_process_info.nodename,
701 getpid(), "fcntl(sd, F_GETFL, 0)",
702 strerror(opal_socket_errno), opal_socket_errno);
703 /* Upper layer will handler the error */
704 return OPAL_ERR_UNREACH;
705 } else {
706 flags |= O_NONBLOCK;
707 if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0) {
708 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
709 true, opal_process_info.nodename,
710 getpid(),
711 "fcntl(sd, F_SETFL, flags & O_NONBLOCK)",
712 strerror(opal_socket_errno), opal_socket_errno);
713 /* Upper layer will handler the error */
714 return OPAL_ERR_UNREACH;
715 }
716 }
717
718 /* start the connect - will likely fail with EINPROGRESS */
719 mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
720
721 /* Bind the socket to one of the addresses associated with
722 * this btl module. This sets the source IP to one of the
723 * addresses shared in modex, so that the destination rank
724 * can properly pair btl modules, even in cases where Linux
725 * might do something unexpected with routing */
726 if (endpoint_addr.ss_family == AF_INET) {
727 assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr);
728 if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr,
729 sizeof(struct sockaddr_in)) < 0) {
730 BTL_ERROR(("bind on local address (%s:%d) failed: %s (%d)",
731 opal_net_get_hostname((struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr),
732 htons(((struct sockaddr_in*)&btl_endpoint->endpoint_btl->tcp_ifaddr)->sin_port),
733 strerror(opal_socket_errno), opal_socket_errno));
734
735 CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
736 return OPAL_ERROR;
737 }
738 }
739 #if OPAL_ENABLE_IPV6
740 if (endpoint_addr.ss_family == AF_INET6) {
741 assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr_6);
742 if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr_6,
743 sizeof(struct sockaddr_in6)) < 0) {
744 BTL_ERROR(("bind on local address (%s:%d) failed: %s (%d)",
745 opal_net_get_hostname((struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr),
746 htons(((struct sockaddr_in*)&btl_endpoint->endpoint_btl->tcp_ifaddr)->sin_port),
747 strerror(opal_socket_errno), opal_socket_errno));
748
749 CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
750 return OPAL_ERROR;
751 }
752 }
753 #endif
754 opal_output_verbose(10, opal_btl_base_framework.framework_output,
755 "btl: tcp: attempting to connect() to %s address %s on port %d",
756 OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name),
757 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
758 ntohs(btl_endpoint->endpoint_addr->addr_port));
759
760 if(0 == connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr, addrlen)) {
761 opal_output_verbose(10, opal_btl_base_framework.framework_output,
762 "btl:tcp: connect() to %s:%d completed",
763 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
764 ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
765 /* send our globally unique process identifier to the endpoint */
766 if((rc = mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint)) == OPAL_SUCCESS) {
767 btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
768 MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [start_connect]");
769 opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
770 if( mca_btl_tcp_event_base == opal_sync_event_base ) {
771 /* If no progress thread then raise the awarness of the default progress engine */
772 opal_progress_event_users_increment();
773 }
774 return OPAL_SUCCESS;
775 }
776 /* We connected to the peer, but he close the socket before we got a chance to send our guid */
777 MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "dropped connection [start_connect]");
778 } else {
779 /* non-blocking so wait for completion */
780 if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
781 btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
782 MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [start_connect]");
783 MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
784 opal_output_verbose(30, opal_btl_base_framework.framework_output,
785 "btl:tcp: would block, so allowing background progress");
786 return OPAL_SUCCESS;
787 }
788 }
789
790 {
791 char *address;
792 address = opal_net_get_hostname((struct sockaddr*) &endpoint_addr);
793 BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_opal,
794 ( "Unable to connect to the peer %s on port %d: %s\n",
795 address,
796 ntohs(btl_endpoint->endpoint_addr->addr_port), strerror(opal_socket_errno) ) );
797 }
798 btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
799 mca_btl_tcp_endpoint_close(btl_endpoint);
800 return OPAL_ERR_UNREACH;
801 }
802
803
804 /*
805 * Check the status of the connection. If the connection failed, will retry
806 * later. Otherwise, send this processes identifier to the endpoint on the
807 * newly connected socket.
808 */
mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t * btl_endpoint)809 static int mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
810 {
811 int so_error = 0;
812 opal_socklen_t so_length = sizeof(so_error);
813 struct sockaddr_storage endpoint_addr;
814
815 /* Delete the send event notification, as the next step is waiting for the ack
816 * from the peer. Once this ack is received we will deal with the send notification
817 * accordingly.
818 */
819 opal_event_del(&btl_endpoint->endpoint_send_event);
820
821 mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
822
823 /* check connect completion status */
824 if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
825 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
826 true, opal_process_info.nodename,
827 getpid(), "fcntl(sd, F_GETFL, 0)",
828 strerror(opal_socket_errno), opal_socket_errno);
829 BTL_ERROR(("getsockopt() to %s:%d failed: %s (%d)",
830 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
831 ((struct sockaddr_in*) &endpoint_addr)->sin_port,
832 strerror(opal_socket_errno), opal_socket_errno));
833 mca_btl_tcp_endpoint_close(btl_endpoint);
834 return OPAL_ERROR;
835 }
836 if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
837 return OPAL_SUCCESS;
838 }
839 if(so_error != 0) {
840 char *msg;
841 asprintf(&msg, "connect() to %s:%d failed",
842 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
843 ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
844 opal_show_help("help-mpi-btl-tcp.txt", "client connect fail",
845 true, opal_process_info.nodename,
846 getpid(), msg,
847 strerror(opal_socket_errno), opal_socket_errno);
848 free(msg);
849 mca_btl_tcp_endpoint_close(btl_endpoint);
850 return OPAL_ERROR;
851 }
852
853 opal_output_verbose(10, opal_btl_base_framework.framework_output,
854 "btl:tcp: connect() to %s:%d completed (complete_connect), sending connect ACK",
855 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
856 ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
857
858 if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) == OPAL_SUCCESS) {
859 btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
860 opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
861 if( mca_btl_tcp_event_base == opal_sync_event_base ) {
862 /* If no progress thread then raise the awarness of the default progress engine */
863 opal_progress_event_users_increment();
864 }
865 MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_add(recv) [complete_connect]");
866 return OPAL_SUCCESS;
867 }
868 MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, " [complete_connect]");
869 btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
870 mca_btl_tcp_endpoint_close(btl_endpoint);
871 return OPAL_ERROR;
872 }
873
874
875 /*
876 * A file descriptor is available/ready for recv. Check the state
877 * of the socket and take the appropriate action.
878 */
879
mca_btl_tcp_endpoint_recv_handler(int sd,short flags,void * user)880 static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
881 {
882 mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
883
884 /* Make sure we don't have a race between a thread that remove the
885 * recv event, and one event already scheduled.
886 */
887 if( sd != btl_endpoint->endpoint_sd )
888 return;
889
890 /**
891 * There is an extremely rare race condition here, that can only be
892 * triggered during the initialization. If the two processes start their
893 * connection in same time, one of the processes will have to close it's
894 * previous endpoint (the one opened from the local send). As a result it
895 * might go in btl_endpoint_close and try to delete the recv_event. This
896 * call will go back in the libevent, and in a multithreaded case will try
897 * to lock the event. If another thread noticed the active event (and this
898 * is possible as during the initialization there will be 2 sockets), one
899 * thread might get stuck trying to lock the endpoint_recv_lock (while
900 * holding the event_base lock) while the other thread will try to lock the
901 * event_base lock (while holding the endpoint_recv lock).
902 *
903 * If we can't lock this mutex, it is OK to cancel the receive operation, it
904 * will be eventually triggered again shorthly.
905 */
906 if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) )
907 return;
908
909 switch(btl_endpoint->endpoint_state) {
910 case MCA_BTL_TCP_CONNECT_ACK:
911 {
912 int rc = mca_btl_tcp_endpoint_recv_connect_ack(btl_endpoint);
913 if( OPAL_SUCCESS == rc ) {
914 /* we are now connected. Start sending the data */
915 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
916 mca_btl_tcp_endpoint_connected(btl_endpoint);
917 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
918 MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "connected");
919 }
920 else if (OPAL_ERR_BAD_PARAM == rc) {
921 /* If we get a BAD_PARAM, it means that it probably wasn't
922 an OMPI process on the other end of the socket (e.g.,
923 the magic string ID failed). So we can probably just
924 close the socket and ignore this connection. */
925 CLOSE_THE_SOCKET(sd);
926 }
927 else {
928 /* Otherwise, it probably *was* an OMPI peer process on
929 the other end, and something bad has probably
930 happened. */
931 mca_btl_tcp_module_t *m = btl_endpoint->endpoint_btl;
932
933 /* Fail up to the PML */
934 if (NULL != m->tcp_error_cb) {
935 m->tcp_error_cb((mca_btl_base_module_t*) m, MCA_BTL_ERROR_FLAGS_FATAL,
936 btl_endpoint->endpoint_proc->proc_opal,
937 "TCP ACK is neither SUCCESS nor ERR (something bad has probably happened)");
938 }
939 }
940 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
941 return;
942 }
943 case MCA_BTL_TCP_CONNECTED:
944 {
945 mca_btl_tcp_frag_t* frag;
946
947 frag = btl_endpoint->endpoint_recv_frag;
948 if(NULL == frag) {
949 if(mca_btl_tcp_module.super.btl_max_send_size >
950 mca_btl_tcp_module.super.btl_eager_limit) {
951 MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
952 } else {
953 MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
954 }
955
956 if(NULL == frag) {
957 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
958 return;
959 }
960 MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
961 }
962
963 #if MCA_BTL_TCP_ENDPOINT_CACHE
964 assert( 0 == btl_endpoint->endpoint_cache_length );
965 data_still_pending_on_endpoint:
966 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
967 /* check for completion of non-blocking recv on the current fragment */
968 if(mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false) {
969 btl_endpoint->endpoint_recv_frag = frag;
970 } else {
971 btl_endpoint->endpoint_recv_frag = NULL;
972 if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) {
973 mca_btl_active_message_callback_t* reg;
974 reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag;
975 reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
976 }
977 #if MCA_BTL_TCP_ENDPOINT_CACHE
978 if( 0 != btl_endpoint->endpoint_cache_length ) {
979 /* If the cache still contain some data we can reuse the same fragment
980 * until we flush it completly.
981 */
982 MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
983 goto data_still_pending_on_endpoint;
984 }
985 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
986 MCA_BTL_TCP_FRAG_RETURN(frag);
987 }
988 #if MCA_BTL_TCP_ENDPOINT_CACHE
989 assert( 0 == btl_endpoint->endpoint_cache_length );
990 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
991 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
992 break;
993 }
994 case MCA_BTL_TCP_CLOSED:
995 /* This is a thread-safety issue. As multiple threads are allowed
996 * to generate events (in the lib event) we endup with several
997 * threads executing the receive callback, when we reach the end
998 * of the MPI_Finalize. The first one will close the connections,
999 * and all others will complain.
1000 */
1001 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
1002 break;
1003 default:
1004 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
1005 BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
1006 btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
1007 mca_btl_tcp_endpoint_close(btl_endpoint);
1008 break;
1009 }
1010 }
1011
1012
1013 /*
1014 * A file descriptor is available/ready for send. Check the state
1015 * of the socket and take the appropriate action.
1016 */
1017
mca_btl_tcp_endpoint_send_handler(int sd,short flags,void * user)1018 static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
1019 {
1020 mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;
1021
1022 /* if another thread is already here, give up */
1023 if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) )
1024 return;
1025
1026 switch(btl_endpoint->endpoint_state) {
1027 case MCA_BTL_TCP_CONNECTING:
1028 mca_btl_tcp_endpoint_complete_connect(btl_endpoint);
1029 break;
1030 case MCA_BTL_TCP_CONNECTED:
1031 /* complete the current send */
1032 while (NULL != btl_endpoint->endpoint_send_frag) {
1033 mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
1034 int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
1035
1036 if(mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
1037 break;
1038 }
1039 /* progress any pending sends */
1040 btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
1041 opal_list_remove_first(&btl_endpoint->endpoint_frags);
1042
1043 /* if required - update request status and release fragment */
1044 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
1045 assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
1046 frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
1047 if( btl_ownership ) {
1048 MCA_BTL_TCP_FRAG_RETURN(frag);
1049 }
1050 /* if we fail to take the lock simply return. In the worst case the
1051 * send_handler will be triggered once more, and as there will be
1052 * nothing to send the handler will be deleted.
1053 */
1054 if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) )
1055 return;
1056 }
1057
1058 /* if nothing else to do unregister for send event notifications */
1059 if(NULL == btl_endpoint->endpoint_send_frag) {
1060 MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_del(send) [endpoint_send_handler]");
1061 opal_event_del(&btl_endpoint->endpoint_send_event);
1062 }
1063 break;
1064 default:
1065 BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
1066 MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "event_del(send) [endpoint_send_handler:error]");
1067 opal_event_del(&btl_endpoint->endpoint_send_event);
1068 break;
1069 }
1070 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
1071 }
1072