/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2016 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007-2008 Sun Microsystems, Inc.  All rights reserved.
 * Copyright (c) 2013      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2014      Intel, Inc.  All rights reserved.
 * Copyright (c) 2015      Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 */

#include "opal_config.h"

#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif  /* HAVE_SYS_TIME_H */
#include <time.h>

#include "opal/mca/event/event.h"
#include "opal/util/net.h"
#include "opal/util/show_help.h"
#include "opal/util/proc.h"
#include "opal/mca/btl/base/btl_base_error.h"

#include "btl_tcp.h"
#include "btl_tcp_endpoint.h"
#include "btl_tcp_proc.h"
#include "btl_tcp_frag.h"
#include "btl_tcp_addr.h"

/*
 * Magic ID string sent during the connect/accept handshake
 */

const char mca_btl_tcp_magic_id_string[MCA_BTL_TCP_MAGIC_STRING_LENGTH] = "OPAL-TCP-BTL";
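
/*
 * A sketch of the handshake built around this string (the message layout
 * itself is declared in btl_tcp_endpoint.h): each side sends a fixed-size
 * mca_btl_tcp_endpoint_hs_msg_t pairing the magic string with its process
 * guid, converted to network byte order first, e.g.
 *
 *   mca_btl_tcp_endpoint_hs_msg_t hs_msg;
 *   strcpy(hs_msg.magic_id, mca_btl_tcp_magic_id_string);
 *   hs_msg.guid = my_guid;   /+ OPAL_PROCESS_NAME_HTON'ed beforehand +/
 *
 * The receiver validates the magic string before trusting the guid (see
 * mca_btl_tcp_endpoint_recv_connect_ack below).
 */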

/*
 * Initialize the state of the endpoint instance.
 *
 */
static void mca_btl_tcp_endpoint_construct(mca_btl_tcp_endpoint_t* endpoint)
{
    endpoint->endpoint_btl = NULL;
    endpoint->endpoint_proc = NULL;
    endpoint->endpoint_addr = NULL;
    endpoint->endpoint_sd = -1;
    endpoint->endpoint_sd_next = -1;
    endpoint->endpoint_send_frag = 0;
    endpoint->endpoint_recv_frag = 0;
    endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
    endpoint->endpoint_retries = 0;
    endpoint->endpoint_nbo = false;
#if MCA_BTL_TCP_ENDPOINT_CACHE
    endpoint->endpoint_cache        = NULL;
    endpoint->endpoint_cache_pos    = NULL;
    endpoint->endpoint_cache_length = 0;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
    OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
    OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
}

/*
 * Destroy an endpoint
 *
 */
static void mca_btl_tcp_endpoint_destruct(mca_btl_tcp_endpoint_t* endpoint)
{
    mca_btl_tcp_endpoint_close(endpoint);
    mca_btl_tcp_proc_remove(endpoint->endpoint_proc, endpoint);
    OBJ_DESTRUCT(&endpoint->endpoint_frags);
    OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
    OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
}

OBJ_CLASS_INSTANCE(
    mca_btl_tcp_endpoint_t,
    opal_list_item_t,
    mca_btl_tcp_endpoint_construct,
    mca_btl_tcp_endpoint_destruct);
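
/*
 * Endpoints are OPAL objects, so their lifetime is managed through the class
 * system rather than bare malloc/free. A hypothetical caller (sketch only;
 * the real call sites live elsewhere in the BTL) would do:
 *
 *   mca_btl_base_endpoint_t* ep = OBJ_NEW(mca_btl_tcp_endpoint_t);
 *   ep->endpoint_btl = tcp_btl;   /+ wire up module/proc before use +/
 *   ...
 *   OBJ_RELEASE(ep);              /+ runs mca_btl_tcp_endpoint_destruct +/
 */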


static int  mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t*);
static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t*);
static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user);
static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user);

/*
 * diagnostics
 */

#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP

#define DEBUG_LENGTH  1024
/**
 * The lack of locking in the mca_btl_tcp_endpoint_dump function is deliberate,
 * so that it can be called regardless of the state of the mutexes. As a result,
 * when multiple threads work on the same endpoint, not only might the displayed
 * information be inaccurate, but while walking the pending fragments we might
 * access freed memory. Thus, the caller should lock the endpoint prior to the
 * call.
 */
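/*
 * Callers normally reach this function through the
 * MCA_BTL_TCP_ENDPOINT_DUMP(level, endpoint, full_info, msg) macro used
 * throughout this file, which (presumably, in btl_tcp_endpoint.h) supplies
 * the file/line/function triplet on our behalf, e.g.
 *
 *   MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "connected");
 */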
void
mca_btl_tcp_endpoint_dump(int level,
                          const char* fname,
                          int lineno,
                          const char* funcname,
                          mca_btl_base_endpoint_t* btl_endpoint,
                          bool full_info,
                          const char* msg)
{
    char outmsg[DEBUG_LENGTH];
    int sndbuf, rcvbuf, nodelay, flags, used = 0;
#if OPAL_ENABLE_IPV6
    struct sockaddr_storage inaddr;
#else
    struct sockaddr_in inaddr;
#endif
    opal_socklen_t obtlen;
    opal_socklen_t addrlen = sizeof(inaddr);
    mca_btl_tcp_frag_t* item;

    used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s: ", msg);
    if (used >= DEBUG_LENGTH) goto out;

    getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_ENABLE_IPV6
    {
        char *address;
        address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
        if (NULL != address) {
            used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s -", address);
            if (used >= DEBUG_LENGTH) goto out;
        }
    }
#else
    used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s -", inet_ntoa(inaddr.sin_addr));
    if (used >= DEBUG_LENGTH) goto out;
#endif
    getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_ENABLE_IPV6
    {
        char *address;
        address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
        if (NULL != address) {
            used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " %s", address);
            if (used >= DEBUG_LENGTH) goto out;
        }
    }
#else
    used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " %s", inet_ntoa(inaddr.sin_addr));
    if (used >= DEBUG_LENGTH) goto out;
#endif

    /* append (do not overwrite) the socket state information */
    used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " [%d", btl_endpoint->endpoint_sd);
    if (used >= DEBUG_LENGTH) goto out;
    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECTING:
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "connecting");
        if (used >= DEBUG_LENGTH) goto out;
        break;
    case MCA_BTL_TCP_CONNECT_ACK:
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "ack");
        if (used >= DEBUG_LENGTH) goto out;
        break;
    case MCA_BTL_TCP_CLOSED:
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "close");
        if (used >= DEBUG_LENGTH) goto out;
        break;
    case MCA_BTL_TCP_FAILED:
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "failed");
        if (used >= DEBUG_LENGTH) goto out;
        break;
    case MCA_BTL_TCP_CONNECTED:
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "connected");
        if (used >= DEBUG_LENGTH) goto out;
        break;
    default:
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "unknown");
        if (used >= DEBUG_LENGTH) goto out;
        break;
    }

    if( full_info ) {
        if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
            BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
                       strerror(opal_socket_errno), opal_socket_errno));
        }

#if defined(SO_SNDBUF)
        obtlen = sizeof(sndbuf);
        if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
            BTL_ERROR(("SO_SNDBUF option: %s (%d)",
                       strerror(opal_socket_errno), opal_socket_errno));
        }
#else
        sndbuf = -1;
#endif
#if defined(SO_RCVBUF)
        obtlen = sizeof(rcvbuf);
        if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
            BTL_ERROR(("SO_RCVBUF option: %s (%d)",
                       strerror(opal_socket_errno), opal_socket_errno));
        }
#else
        rcvbuf = -1;
#endif
#if defined(TCP_NODELAY)
        obtlen = sizeof(nodelay);
        if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
            BTL_ERROR(("TCP_NODELAY option: %s (%d)",
                       strerror(opal_socket_errno), opal_socket_errno));
        }
#else
        nodelay = 0;
#endif
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " nodelay %d sndbuf %d rcvbuf %d flags %08x",
                         nodelay, sndbuf, rcvbuf, flags);
        if (used >= DEBUG_LENGTH) goto out;
#if MCA_BTL_TCP_ENDPOINT_CACHE
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n\t[cache %p used %lu/%lu]",
                         (void*)btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
                         btl_endpoint->endpoint_cache_length);
        if (used >= DEBUG_LENGTH) goto out;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
        used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "{%s - retries %d}",
                         (btl_endpoint->endpoint_nbo ? "NBO" : ""), (int)btl_endpoint->endpoint_retries);
        if (used >= DEBUG_LENGTH) goto out;
    }
    used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n");
    if (used >= DEBUG_LENGTH) goto out;

    if( NULL != btl_endpoint->endpoint_recv_frag )
        used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_recv_frag, "active recv",
                                      &outmsg[used], DEBUG_LENGTH - used);
    if (used >= DEBUG_LENGTH) goto out;

    if( NULL != btl_endpoint->endpoint_send_frag )
        used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_send_frag, "active send (inaccurate iov)",
                                      &outmsg[used], DEBUG_LENGTH - used);
    if (used >= DEBUG_LENGTH) goto out;
    OPAL_LIST_FOREACH(item, &btl_endpoint->endpoint_frags, mca_btl_tcp_frag_t) {
        used += mca_btl_tcp_frag_dump(item, "pending send", &outmsg[used], DEBUG_LENGTH - used);
        if (used >= DEBUG_LENGTH) goto out;
    }
out:
    outmsg[ used >= DEBUG_LENGTH ? (DEBUG_LENGTH-1) : used ] = '\0';
    opal_output_verbose(level, opal_btl_base_framework.framework_output,
                        "[%s:%d:%s][%s -> %s] %s",
                        fname, lineno, funcname,
                        OPAL_NAME_PRINT(opal_proc_local_get()->proc_name),
                        (NULL != btl_endpoint->endpoint_proc ? OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name) : "unknown remote"),
                        outmsg);
}
#endif /* OPAL_ENABLE_DEBUG && WANT_PEER_DUMP */

/*
 * Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
 */

static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
{
#if MCA_BTL_TCP_ENDPOINT_CACHE
    assert(NULL == btl_endpoint->endpoint_cache);
    btl_endpoint->endpoint_cache     = (char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
    btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */

    opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
                    btl_endpoint->endpoint_sd,
                    OPAL_EV_READ | OPAL_EV_PERSIST,
                    mca_btl_tcp_endpoint_recv_handler,
                    btl_endpoint );
    /**
     * In the multi-threaded case, the send event must be persistent in order
     * to avoid missing the connection notification in send_handler due to
     * a local handling of the peer process (which holds the lock).
     */
    opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
                    btl_endpoint->endpoint_sd,
                    OPAL_EV_WRITE | OPAL_EV_PERSIST,
                    mca_btl_tcp_endpoint_send_handler,
                    btl_endpoint);
}
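
/*
 * Note that neither event is armed here: the recv event is added once a
 * connection attempt is under way (see start_connect/complete_accept below),
 * and the send event only when there is data pending, e.g.
 *
 *   MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
 *
 * as done in mca_btl_tcp_endpoint_send.
 */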


/*
 * Attempt to send a fragment using a given endpoint. If the endpoint is not
 * connected, queue the fragment and start the connection as required.
 */

int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp_frag_t* frag)
{
    int rc = OPAL_SUCCESS;

    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECTING:
    case MCA_BTL_TCP_CONNECT_ACK:
    case MCA_BTL_TCP_CLOSED:
        opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
        frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
        if(btl_endpoint->endpoint_state == MCA_BTL_TCP_CLOSED)
            rc = mca_btl_tcp_endpoint_start_connect(btl_endpoint);
        break;
    case MCA_BTL_TCP_FAILED:
        rc = OPAL_ERR_UNREACH;
        break;
    case MCA_BTL_TCP_CONNECTED:
        if (NULL == btl_endpoint->endpoint_send_frag) {
            if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
               mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd)) {
                int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);

                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) {
                    frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
                }
                if( btl_ownership ) {
                    MCA_BTL_TCP_FRAG_RETURN(frag);
                }
                MCA_BTL_TCP_ENDPOINT_DUMP(50, btl_endpoint, true, "complete send fragment [endpoint_send]");
                return 1;
            } else {
                btl_endpoint->endpoint_send_frag = frag;
                MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_send]");
                frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
                MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
            }
        } else {
            MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "send fragment enqueued [endpoint_send]");
            frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
            opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
        }
        break;
    }
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    return rc;
}
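
/*
 * Illustrative caller (a sketch; the real call sites live in the BTL send
 * path, e.g. btl_tcp.c). The return protocol above is: 1 when the fragment
 * completed inline, OPAL_SUCCESS when it was queued for later delivery, and
 * an error code such as OPAL_ERR_UNREACH when the peer cannot be reached.
 *
 *   int rc = mca_btl_tcp_endpoint_send(endpoint, frag);
 *   if (1 == rc) {
 *       /+ frag already sent and, if requested, its callback invoked +/
 *   } else if (OPAL_SUCCESS == rc) {
 *       /+ queued; des_cbfunc will fire once the send completes +/
 *   } else {
 *       /+ e.g. OPAL_ERR_UNREACH: endpoint marked as failed +/
 *   }
 */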


/*
 * A blocking send on a non-blocking socket. Used to send the small amount of
 * connection information that identifies the endpoints.
 */
static int
mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint,
                                   const void* data, size_t size)
{
    int ret = mca_btl_tcp_send_blocking(btl_endpoint->endpoint_sd, data, size);
    if (ret < 0) {
        mca_btl_tcp_endpoint_close(btl_endpoint);
    }
    return ret;
}

/*
 * Send the globally unique identifier for this process to an endpoint on
 * a newly connected socket.
 */
static int
mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
{
    opal_process_name_t guid = opal_proc_local_get()->proc_name;
    OPAL_PROCESS_NAME_HTON(guid);

    mca_btl_tcp_endpoint_hs_msg_t hs_msg;
    strcpy(hs_msg.magic_id, mca_btl_tcp_magic_id_string);
    hs_msg.guid = guid;

    if(sizeof(hs_msg) !=
       mca_btl_tcp_endpoint_send_blocking(btl_endpoint,
                                          &hs_msg, sizeof(hs_msg))) {
        opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
                       true, opal_process_info.nodename,
                       sizeof(hs_msg),
                       "connect ACK failed to send magic-id and guid");
        return OPAL_ERR_UNREACH;
    }
    return OPAL_SUCCESS;
}

static void *mca_btl_tcp_endpoint_complete_accept(int fd, int flags, void *context)
{
    mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t*)context;
    struct timeval now = {0, 0};
    int cmpval;

    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) ) {
        opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
        return NULL;
    }
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) ) {
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
        return NULL;
    }

    if(NULL == btl_endpoint->endpoint_addr) {
        CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next); /* No further use of this socket. Close it */
        btl_endpoint->endpoint_sd_next = -1;
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        if( NULL != btl_endpoint->endpoint_btl->tcp_error_cb ) {
            btl_endpoint->endpoint_btl->tcp_error_cb(
                &btl_endpoint->endpoint_btl->super, MCA_BTL_ERROR_FLAGS_NONFATAL,
                btl_endpoint->endpoint_proc->proc_opal,
                "The endpoint addr is set to NULL (unsettling)");
        }
        return NULL;
    }

    cmpval = opal_compare_proc(btl_endpoint->endpoint_proc->proc_opal->proc_name,
                               opal_proc_local_get()->proc_name);
    if((btl_endpoint->endpoint_sd < 0) ||
       (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
        cmpval < 0)) {
        mca_btl_tcp_endpoint_close(btl_endpoint);
        btl_endpoint->endpoint_sd = btl_endpoint->endpoint_sd_next;
        btl_endpoint->endpoint_sd_next = -1;
        if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) != OPAL_SUCCESS) {
            MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, " [endpoint_accept]");
            btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
            mca_btl_tcp_endpoint_close(btl_endpoint);
            goto unlock_and_return;
        }
        mca_btl_tcp_endpoint_event_init(btl_endpoint);
        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [endpoint_accept]");
        opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
        if( mca_btl_tcp_event_base == opal_sync_event_base ) {
            /* If no progress thread then raise the awareness of the default progress engine */
            opal_progress_event_users_increment();
        }
        mca_btl_tcp_endpoint_connected(btl_endpoint);

        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "accepted");
        goto unlock_and_return;
    }
    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next); /* No further use of this socket. Close it */
    btl_endpoint->endpoint_sd_next = -1;
  unlock_and_return:
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
    return NULL;
}

/*
 * Check the state of this endpoint. If the incoming connection request matches
 * our endpoint's address, check the state of our connection:
 * (1) if a connection has not been attempted, accept the connection
 * (2) if a connection has not been established, and the endpoint's process
 *     identifier is less than the local process, accept the connection
 * otherwise, reject the connection and continue with the current connection
 */
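
/*
 * Worked example of the tie-break: if processes A and B call connect()
 * toward each other at the same time, each side ends up with two sockets for
 * the same peer. opal_compare_proc() imposes a total order on the two process
 * names, so both sides deterministically keep the same one of the two
 * connections and close the other, converging on a single socket.
 */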

void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
                                 struct sockaddr* addr, int sd)
{
    struct timeval now = {0, 0};

    assert(btl_endpoint->endpoint_sd_next == -1);
    btl_endpoint->endpoint_sd_next = sd;

    opal_event_evtimer_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_accept_event,
                           mca_btl_tcp_endpoint_complete_accept, btl_endpoint);
    opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
}


/*
 * Remove any event registrations associated with the socket
 * and update the endpoint state to reflect the connection has
 * been closed.
 */
void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
{
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "[close]");
    if(btl_endpoint->endpoint_sd < 0)
        return;
    btl_endpoint->endpoint_retries++;
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(recv) [close]");
    opal_event_del(&btl_endpoint->endpoint_recv_event);
    if( mca_btl_tcp_event_base == opal_sync_event_base ) {
        /* If no progress thread then lower the awareness of the default progress engine */
        opal_progress_event_users_decrement();
    }
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
    opal_event_del(&btl_endpoint->endpoint_send_event);

#if MCA_BTL_TCP_ENDPOINT_CACHE
    free( btl_endpoint->endpoint_cache );
    btl_endpoint->endpoint_cache        = NULL;
    btl_endpoint->endpoint_cache_pos    = NULL;
    btl_endpoint->endpoint_cache_length = 0;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */

    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
    btl_endpoint->endpoint_sd = -1;
    /**
     * If we keep failing to connect to the peer, let the caller know about
     * this situation by triggering all the pending fragments' callbacks and
     * reporting the error.
     */
    if( MCA_BTL_TCP_FAILED == btl_endpoint->endpoint_state ) {
        mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
        if( NULL == frag )
            frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
        while(NULL != frag) {
            frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, OPAL_ERR_UNREACH);

            frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
        }
    }
    btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
}

/*
 *  Setup endpoint state to reflect that connection has been established,
 *  and start any pending sends. This function should be called with the
 *  send lock locked.
 */
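
/*
 * Callers must hold the send lock around this call, e.g. (as done in the
 * recv handler below):
 *
 *   OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
 *   mca_btl_tcp_endpoint_connected(btl_endpoint);
 *   OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 */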

static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
{
    /* setup socket options */
    assert( MCA_BTL_TCP_CONNECTED != btl_endpoint->endpoint_state );
    btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
    btl_endpoint->endpoint_retries = 0;
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "READY [endpoint_connected]");

    if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
        if(NULL == btl_endpoint->endpoint_send_frag)
            btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
                opal_list_remove_first(&btl_endpoint->endpoint_frags);
        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_connected]");
        opal_event_add(&btl_endpoint->endpoint_send_event, 0);
    }
}


/*
 *  Receive the endpoint's globally unique process identification from a newly
 *  connected socket and verify the expected response. If so, move the
 *  socket to a connected state.
 *
 *  NOTE: The return codes from this function are checked in
 *  mca_btl_tcp_endpoint_recv_handler().  Don't change them here
 *  without also changing the handling in _recv_handler()!
 */
static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
{
    size_t retval, len = strlen(mca_btl_tcp_magic_id_string);
    mca_btl_tcp_proc_t* btl_proc = btl_endpoint->endpoint_proc;
    opal_process_name_t guid;

    mca_btl_tcp_endpoint_hs_msg_t hs_msg;
    retval = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, &hs_msg, sizeof(hs_msg));

    if (sizeof(hs_msg) != retval) {
        mca_btl_tcp_endpoint_close(btl_endpoint);
        if (0 == retval) {
            /* If we get zero bytes, the peer closed the socket. This
               can happen when the two peers started the connection
               protocol simultaneously. Just report the problem
               upstream. */
            return OPAL_ERROR;
        }
        opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
                       true, opal_process_info.nodename,
                       getpid(), "did not receive entire connect ACK from peer");

        return OPAL_ERR_BAD_PARAM;
    }
    if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
        opal_show_help("help-mpi-btl-tcp.txt", "server did not receive magic string",
                       true, opal_process_info.nodename,
                       getpid(), "client", hs_msg.magic_id,
                       "string value");
        return OPAL_ERR_BAD_PARAM;
    }

    guid = hs_msg.guid;
    OPAL_PROCESS_NAME_NTOH(guid);
    /* compare this to the expected values */
    /* TODO: this deserves a little bit more thinking as we are not supposed
     * to be able to exchange the opal_process_name_t over the network.
     */
    if (0 != opal_compare_proc(btl_proc->proc_opal->proc_name, guid)) {
        BTL_ERROR(("received unexpected process identifier %s",
                   OPAL_NAME_PRINT(guid)));
        mca_btl_tcp_endpoint_close(btl_endpoint);
        return OPAL_ERR_UNREACH;
    }

    return OPAL_SUCCESS;
}


void mca_btl_tcp_set_socket_options(int sd)
{
#if defined(TCP_NODELAY)
    int optval;
    optval = !mca_btl_tcp_component.tcp_not_use_nodelay;
    if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
        BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
                   strerror(opal_socket_errno), opal_socket_errno));
    }
#endif
#if defined(SO_SNDBUF)
    if(mca_btl_tcp_component.tcp_sndbuf > 0 &&
       setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp_component.tcp_sndbuf, sizeof(int)) < 0) {
        BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
                   strerror(opal_socket_errno), opal_socket_errno));
    }
#endif
#if defined(SO_RCVBUF)
    if(mca_btl_tcp_component.tcp_rcvbuf > 0 &&
       setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) {
        BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
                   strerror(opal_socket_errno), opal_socket_errno));
    }
#endif
}
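
/*
 * The knobs above come from component-level MCA parameters; assuming the
 * usual btl_tcp_sndbuf/btl_tcp_rcvbuf parameter names, a user could tune the
 * socket buffers with something like:
 *
 *   mpirun --mca btl_tcp_sndbuf 4194304 --mca btl_tcp_rcvbuf 4194304 ...
 *
 * A configured size that is not positive leaves the system defaults in place
 * (the corresponding setsockopt calls above are skipped).
 */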


/*
 *  Start a connection to the endpoint. This will likely not complete,
 *  as the socket is set to non-blocking, so register for event
 *  notification of connect completion. On connection we send our
 *  globally unique process identifier to the endpoint and wait for
 *  the endpoint response.
 */
static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
{
    int rc, flags;
    struct sockaddr_storage endpoint_addr;
    /* By default consider an IPv4 connection */
    uint16_t af_family = AF_INET;
    opal_socklen_t addrlen = sizeof(struct sockaddr_in);

#if OPAL_ENABLE_IPV6
    if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
        af_family = AF_INET6;
        addrlen = sizeof (struct sockaddr_in6);
    }
#endif
    assert( btl_endpoint->endpoint_sd < 0 );
    btl_endpoint->endpoint_sd = socket(af_family, SOCK_STREAM, 0);
    if (btl_endpoint->endpoint_sd < 0) {
        btl_endpoint->endpoint_retries++;
        return OPAL_ERR_UNREACH;
    }

    /* setup socket buffer sizes */
    mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd);

    /* setup event callbacks */
    mca_btl_tcp_endpoint_event_init(btl_endpoint);

    /* setup the socket as non-blocking */
    if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
        opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
                       true, opal_process_info.nodename,
                       getpid(), "fcntl(sd, F_GETFL, 0)",
                       strerror(opal_socket_errno), opal_socket_errno);
        /* The upper layer will handle the error */
        return OPAL_ERR_UNREACH;
    } else {
        flags |= O_NONBLOCK;
        if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0) {
            opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
                           true, opal_process_info.nodename,
                           getpid(),
                           "fcntl(sd, F_SETFL, flags & O_NONBLOCK)",
                           strerror(opal_socket_errno), opal_socket_errno);
            /* The upper layer will handle the error */
            return OPAL_ERR_UNREACH;
        }
    }

    /* start the connect - will likely fail with EINPROGRESS */
    mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);

    /* Bind the socket to one of the addresses associated with
     * this btl module.  This sets the source IP to one of the
     * addresses shared in modex, so that the destination rank
     * can properly pair btl modules, even in cases where Linux
     * might do something unexpected with routing */
    if (endpoint_addr.ss_family == AF_INET) {
        assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr);
        if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr,
                 sizeof(struct sockaddr_in)) < 0) {
            BTL_ERROR(("bind on local address (%s:%d) failed: %s (%d)",
                       opal_net_get_hostname((struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr),
                       ntohs(((struct sockaddr_in*)&btl_endpoint->endpoint_btl->tcp_ifaddr)->sin_port),
                       strerror(opal_socket_errno), opal_socket_errno));

            CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
            return OPAL_ERROR;
        }
    }
#if OPAL_ENABLE_IPV6
    if (endpoint_addr.ss_family == AF_INET6) {
        assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr_6);
        if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr_6,
                 sizeof(struct sockaddr_in6)) < 0) {
            BTL_ERROR(("bind on local address (%s:%d) failed: %s (%d)",
                       opal_net_get_hostname((struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr_6),
                       ntohs(((struct sockaddr_in6*)&btl_endpoint->endpoint_btl->tcp_ifaddr_6)->sin6_port),
                       strerror(opal_socket_errno), opal_socket_errno));

            CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
            return OPAL_ERROR;
        }
    }
#endif
    opal_output_verbose(10, opal_btl_base_framework.framework_output,
                        "btl: tcp: attempting to connect() to %s address %s on port %d",
                        OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name),
                        opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
                        ntohs(btl_endpoint->endpoint_addr->addr_port));

    if(0 == connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr, addrlen)) {
        opal_output_verbose(10, opal_btl_base_framework.framework_output,
                            "btl:tcp: connect() to %s:%d completed",
                            opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
                            ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
        /* send our globally unique process identifier to the endpoint */
        if((rc = mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint)) == OPAL_SUCCESS) {
            btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
            MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [start_connect]");
            opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
            if( mca_btl_tcp_event_base == opal_sync_event_base ) {
                /* If no progress thread then raise the awareness of the default progress engine */
                opal_progress_event_users_increment();
            }
            return OPAL_SUCCESS;
        }
        /* We connected to the peer, but the peer closed the socket before we got a chance to send our guid */
        MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "dropped connection [start_connect]");
    } else {
        /* non-blocking so wait for completion */
        if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
            btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
            MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [start_connect]");
            MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
            opal_output_verbose(30, opal_btl_base_framework.framework_output,
                                "btl:tcp: would block, so allowing background progress");
            return OPAL_SUCCESS;
        }
    }

    {
        char *address;
        address = opal_net_get_hostname((struct sockaddr*) &endpoint_addr);
        BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_opal,
                      ( "Unable to connect to the peer %s on port %d: %s\n",
                        address,
                        ntohs(btl_endpoint->endpoint_addr->addr_port), strerror(opal_socket_errno) ) );
    }
    btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
    mca_btl_tcp_endpoint_close(btl_endpoint);
    return OPAL_ERR_UNREACH;
}


/*
 * Check the status of the connection. If the connection failed, we will
 * retry later. Otherwise, send this process's identifier to the endpoint on
 * the newly connected socket.
 */
static int mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
{
    int so_error = 0;
    opal_socklen_t so_length = sizeof(so_error);
    struct sockaddr_storage endpoint_addr;

    /* Delete the send event notification, as the next step is waiting for the ack
     * from the peer. Once this ack is received we will deal with the send notification
     * accordingly.
     */
    opal_event_del(&btl_endpoint->endpoint_send_event);

    mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);

    /* check connect completion status */
    if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
        opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
                       true, opal_process_info.nodename,
                       getpid(), "getsockopt(sd, SOL_SOCKET, SO_ERROR)",
                       strerror(opal_socket_errno), opal_socket_errno);
        BTL_ERROR(("getsockopt() to %s:%d failed: %s (%d)",
                   opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
                   ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port),
                   strerror(opal_socket_errno), opal_socket_errno));
        mca_btl_tcp_endpoint_close(btl_endpoint);
        return OPAL_ERROR;
    }
    if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
        return OPAL_SUCCESS;
    }
    if(so_error != 0) {
        char *msg;
        asprintf(&msg, "connect() to %s:%d failed",
                 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
                 ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
        opal_show_help("help-mpi-btl-tcp.txt", "client connect fail",
                       true, opal_process_info.nodename,
                       getpid(), msg,
                       strerror(opal_socket_errno), opal_socket_errno);
        free(msg);
        mca_btl_tcp_endpoint_close(btl_endpoint);
        return OPAL_ERROR;
    }

    opal_output_verbose(10, opal_btl_base_framework.framework_output,
                        "btl:tcp: connect() to %s:%d completed (complete_connect), sending connect ACK",
                        opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
                        ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));

    if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) == OPAL_SUCCESS) {
        btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
        opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
        if( mca_btl_tcp_event_base == opal_sync_event_base ) {
            /* If no progress thread then raise the awareness of the default progress engine */
            opal_progress_event_users_increment();
        }
        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_add(recv) [complete_connect]");
        return OPAL_SUCCESS;
    }
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, " [complete_connect]");
    btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
    mca_btl_tcp_endpoint_close(btl_endpoint);
    return OPAL_ERROR;
}


/*
 * A file descriptor is available/ready for recv. Check the state
 * of the socket and take the appropriate action.
 */

static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
{
    mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;

    /* Make sure we don't have a race between a thread removing the
     * recv event and an event that was already scheduled.
     */
    if( sd != btl_endpoint->endpoint_sd )
        return;

    /**
     * There is an extremely rare race condition here, which can only be
     * triggered during initialization. If the two processes start their
     * connection at the same time, one of the processes will have to close its
     * previous endpoint (the one opened from the local send). As a result it
     * might go into btl_endpoint_close and try to delete the recv_event. This
     * call will go back into libevent, and in a multithreaded case will try
     * to lock the event. If another thread noticed the active event (and this
     * is possible as during the initialization there will be 2 sockets), one
     * thread might get stuck trying to lock the endpoint_recv_lock (while
     * holding the event_base lock) while the other thread will try to lock the
     * event_base lock (while holding the endpoint_recv lock).
     *
     * If we can't lock this mutex, it is OK to cancel the receive operation; it
     * will eventually be triggered again shortly.
     */
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) )
        return;

    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECT_ACK:
        {
            int rc = mca_btl_tcp_endpoint_recv_connect_ack(btl_endpoint);
            if( OPAL_SUCCESS == rc ) {
                /* we are now connected. Start sending the data */
                OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
                mca_btl_tcp_endpoint_connected(btl_endpoint);
                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "connected");
            }
            else if (OPAL_ERR_BAD_PARAM == rc) {
                /* If we get a BAD_PARAM, it means that it probably wasn't
                   an OMPI process on the other end of the socket (e.g.,
                   the magic string ID failed).  So we can probably just
                   close the socket and ignore this connection. */
                CLOSE_THE_SOCKET(sd);
            }
            else {
                /* Otherwise, it probably *was* an OMPI peer process on
                   the other end, and something bad has probably
                   happened.  */
                mca_btl_tcp_module_t *m = btl_endpoint->endpoint_btl;

                /* Fail up to the PML */
                if (NULL != m->tcp_error_cb) {
                    m->tcp_error_cb((mca_btl_base_module_t*) m, MCA_BTL_ERROR_FLAGS_FATAL,
                        btl_endpoint->endpoint_proc->proc_opal,
                        "TCP ACK is neither SUCCESS nor ERR (something bad has probably happened)");
                }
            }
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
            return;
        }
    case MCA_BTL_TCP_CONNECTED:
        {
            mca_btl_tcp_frag_t* frag;

            frag = btl_endpoint->endpoint_recv_frag;
            if(NULL == frag) {
                if(mca_btl_tcp_module.super.btl_max_send_size >
                   mca_btl_tcp_module.super.btl_eager_limit) {
                    MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
                } else {
                    MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
                }

                if(NULL == frag) {
                    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
                    return;
                }
                MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
            }

#if MCA_BTL_TCP_ENDPOINT_CACHE
            assert( 0 == btl_endpoint->endpoint_cache_length );
        data_still_pending_on_endpoint:
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
            /* check for completion of non-blocking recv on the current fragment */
            if(mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false) {
                btl_endpoint->endpoint_recv_frag = frag;
            } else {
                btl_endpoint->endpoint_recv_frag = NULL;
                if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) {
                    mca_btl_active_message_callback_t* reg;
                    reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag;
                    reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
                }
#if MCA_BTL_TCP_ENDPOINT_CACHE
                if( 0 != btl_endpoint->endpoint_cache_length ) {
                    /* If the cache still contains some data we can reuse the same fragment
                     * until we flush it completely.
                     */
                    MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
                    goto data_still_pending_on_endpoint;
                }
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
                MCA_BTL_TCP_FRAG_RETURN(frag);
            }
#if MCA_BTL_TCP_ENDPOINT_CACHE
            assert( 0 == btl_endpoint->endpoint_cache_length );
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
            break;
        }
    case MCA_BTL_TCP_CLOSED:
        /* This is a thread-safety issue. As multiple threads are allowed
         * to generate events (in libevent) we end up with several
         * threads executing the receive callback when we reach the end
         * of MPI_Finalize. The first one will close the connections,
         * and all others will complain.
         */
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        break;
    default:
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
        btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
        mca_btl_tcp_endpoint_close(btl_endpoint);
        break;
    }
}


/*
 * A file descriptor is available/ready for send. Check the state
 * of the socket and take the appropriate action.
 */

static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
{
    mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;

    /* if another thread is already here, give up */
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) )
        return;

    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECTING:
        mca_btl_tcp_endpoint_complete_connect(btl_endpoint);
        break;
    case MCA_BTL_TCP_CONNECTED:
        /* complete the current send */
        while (NULL != btl_endpoint->endpoint_send_frag) {
            mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
            int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);

            if(mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
                break;
            }
            /* progress any pending sends */
            btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
                opal_list_remove_first(&btl_endpoint->endpoint_frags);

            /* if required - update request status and release fragment */
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
            assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
            frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
            if( btl_ownership ) {
                MCA_BTL_TCP_FRAG_RETURN(frag);
            }
            /* if we fail to take the lock simply return. In the worst case the
             * send_handler will be triggered once more, and as there will be
             * nothing to send the handler will be deleted.
             */
            if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) )
                return;
        }

        /* if nothing else to do unregister for send event notifications */
        if(NULL == btl_endpoint->endpoint_send_frag) {
            MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_del(send) [endpoint_send_handler]");
            opal_event_del(&btl_endpoint->endpoint_send_event);
        }
        break;
    default:
        BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
        MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "event_del(send) [endpoint_send_handler:error]");
        opal_event_del(&btl_endpoint->endpoint_send_event);
        break;
    }
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
}