1 /*
2  * Copyright (c) 2014-2016 Cisco Systems, Inc.  All rights reserved.
3  * Copyright (c) 2015      Research Organization for Information Science
4  *                         and Technology (RIST). All rights reserved.
5  * $COPYRIGHT$
6  *
7  * Additional copyrights may follow
8  *
9  * $HEADER$
10  */
11 
12 #include "opal_config.h"
13 
14 #include <assert.h>
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <sys/un.h>
18 #include <unistd.h>
19 #ifdef HAVE_ALLOCA_H
20 #include <alloca.h>
21 #endif
22 
23 #include "opal_stdint.h"
24 #include "opal/threads/mutex.h"
25 #include "opal/mca/event/event.h"
26 #include "opal/util/show_help.h"
27 #include "opal/types.h"
28 #include "opal/util/output.h"
29 #include "opal/util/fd.h"
30 
31 #include "btl_usnic.h"
32 #include "btl_usnic_connectivity.h"
33 
34 /**************************************************************************
35  * Agent data and methods
36  **************************************************************************/
37 
38 /*
39  * Local variables
40  */
41 static int ipc_accept_fd = -1;
42 static char *ipc_filename = NULL;
43 static opal_event_t ipc_event;
44 static struct timeval ack_timeout;
45 static opal_list_t udp_port_listeners;
46 static opal_list_t ipc_listeners;
47 static volatile int ipc_accepts = 0;
48 /* JMS The pings_pending and ping_results should probably both be hash
49    tables for more efficient lookups */
50 static opal_list_t pings_pending;
51 static opal_list_t ping_results;
52 static volatile bool agent_initialized = false;
53 
54 
55 /*
56  * Holds all the information about a UDP port that the agent thread is
57  * listening on (for incoming PINGs and ACKs).
58  */
59 typedef struct {
60     opal_list_item_t super;
61 
62     /* Data from the LISTEN command message */
63     uint32_t ipv4_addr;
64     uint32_t netmask;
65     char ipv4_addr_str[IPV4STRADDRLEN];
66     uint32_t max_msg_size;
67     char *nodename;
68     char *usnic_name;
69 
70     /* File descriptor, UDP port, buffer to receive messages, and event */
71     int fd;
72     uint32_t udp_port;
73     uint8_t *buffer;
74     opal_event_t event;
75     bool active;
76     opal_btl_usnic_module_t *module;
77 } agent_udp_port_listener_t;
78 
79 OBJ_CLASS_DECLARATION(agent_udp_port_listener_t);
80 
81 /*
82  * Holds information for a local IPC socket fd (i.e., a connection
83  * from a local process to this agent).
84  */
85 typedef struct {
86     opal_list_item_t super;
87 
88     int client_fd;
89     opal_event_t event;
90     bool active;
91 } agent_ipc_listener_t;
92 
93 OBJ_CLASS_DECLARATION(agent_ipc_listener_t);
94 
/* Value carried in the message_type field of every UDP message */
typedef enum {
    AGENT_MSG_TYPE_PING = 17,
    AGENT_MSG_TYPE_ACK = 18
} agent_udp_message_type_t;

/* Arbitrary 64 bit numbers: a PING must carry MAGIC_ORIGINATOR and an
   ACK must carry MAGIC_TARGET, otherwise the message is discarded */
#define MAGIC_ORIGINATOR 0x9a9e2fbce63a11e5
#define MAGIC_TARGET 0x60735c68f368aace
103 
/*
 * Header of the PING and ACK messages exchanged over UDP.  The struct
 * is sent and received as raw bytes, so the order and types of the
 * fields must not change.
 */
typedef struct {
    /* One of agent_udp_message_type_t */
    uint8_t message_type;

    /* IP address and port of the PING sender (i.e., where the ACK can
       be sent).  Redundant with the sockaddr returned by recvfrom(),
       which makes it usable as a sanity check. */
    uint32_t src_ipv4_addr;
    uint32_t src_udp_port;

    /* Magic number that helps determine that the sender was Open MPI,
       followed by the sender's version */
    uint64_t magic_number;
    uint32_t major_version;
    uint32_t minor_version;

    /* In a PING: the size that the message should have.
       In an ACK: the size of the PING that is being ACKed. */
    uint32_t size;
} agent_udp_message_t;
126 
127 typedef struct {
128     opal_list_item_t super;
129 
130     /* Data from the PING command message */
131     uint32_t src_ipv4_addr; /* in network byte order */
132     uint32_t src_udp_port;
133     agent_udp_port_listener_t *listener;
134     uint32_t dest_ipv4_addr; /* in network byte order */
135     uint32_t dest_netmask;
136     uint32_t dest_udp_port;
137     struct sockaddr_in dest_sockaddr;
138     char *dest_nodename;
139 
140     /* The sizes and corresponding buffers of the PING messages that
141        we'll send, and whether each of those PING messages have been
142        ACKed yet */
143 #define NUM_PING_SIZES 2
144     size_t sizes[NUM_PING_SIZES];
145     uint8_t *buffers[NUM_PING_SIZES];
146     bool acked[NUM_PING_SIZES];
147 
148     /* Number of times we've sent this ping */
149     int num_sends;
150 
151     /* Timer used to re-send the PING, and whether the timer is active
152        or not */
153     opal_event_t timer;
154     bool timer_active;
155 } agent_ping_t;
156 
157 OBJ_CLASS_DECLARATION(agent_ping_t);
158 
159 
160 /**************************************************************************
161  * Utility functions, constructors, destructors
162  **************************************************************************/
163 
/*
 * Put every plain-data member of a UDP port listener into its "empty"
 * state.  Nothing is freed or closed here; callers that own resources
 * must release them first.  The "event" member is not touched.
 */
static void udp_port_listener_zero(agent_udp_port_listener_t *obj)
{
    obj->ipv4_addr =
        obj->netmask =
        obj->max_msg_size = 0;
    obj->nodename =
        obj->usnic_name = NULL;
    memset(obj->ipv4_addr_str, 0, sizeof(obj->ipv4_addr_str));

    obj->fd = -1;
    /* udp_port is unsigned, so this stores the largest uint32_t value
       as the "no port" marker */
    obj->udp_port = (uint32_t) -1;
    obj->buffer = NULL;

    obj->active = false;

    /* The LISTEN handler tests this pointer against NULL, so it must
       never be left uninitialized */
    obj->module = NULL;
}
179 
/*
 * Class constructor for agent_udp_port_listener_t: a new listener
 * starts out in the "empty" state.
 */
static void udp_port_listener_constructor(agent_udp_port_listener_t *obj)
{
    udp_port_listener_zero(obj);
    return;
}
184 
/*
 * Class destructor for agent_udp_port_listener_t.
 *
 * Releases every pending ping that was sent from this listener's
 * address, unregisters the listener's event and removes it from
 * udp_port_listeners (if it was active), closes the socket, frees the
 * owned buffers and finally resets the members.
 */
static void udp_port_listener_destructor(agent_udp_port_listener_t *obj)
{
    /* Find any pings that are pending on this listener and delete
       them */
    agent_ping_t *ap, *apnext;
    OPAL_LIST_FOREACH_SAFE(ap, apnext, &pings_pending, agent_ping_t) {
        if (ap->src_ipv4_addr == obj->ipv4_addr) {
            opal_list_remove_item(&pings_pending, &ap->super);
            OBJ_RELEASE(ap);
        }
    }

    /* If the "active" flag is set, then the event is active and the
       item is on the udp_port_listeners list.  Unregister the event
       while the file descriptor is still open: deleting an event
       whose fd has already been closed hands the event backend a
       stale descriptor. */
    if (obj->active) {
        opal_event_del(&obj->event);
        opal_list_remove_item(&udp_port_listeners, &obj->super);
    }

    if (-1 != obj->fd) {
        close(obj->fd);
    }

    /* free(NULL) is a no-op, so no guards are needed */
    free(obj->nodename);
    free(obj->usnic_name);
    free(obj->buffer);

    udp_port_listener_zero(obj);
}

OBJ_CLASS_INSTANCE(agent_udp_port_listener_t,
                   opal_list_item_t,
                   udp_port_listener_constructor,
                   udp_port_listener_destructor);
224 
/*
 * Reset an IPC listener to its "empty" state: not active, no client
 * socket.  Nothing is closed here.
 */
static void ipc_listener_zero(agent_ipc_listener_t *obj)
{
    obj->active = false;
    obj->client_fd = -1;
}
230 
/*
 * Class constructor for agent_ipc_listener_t: a new listener starts
 * out in the "empty" state.
 */
static void ipc_listener_constructor(agent_ipc_listener_t *obj)
{
    ipc_listener_zero(obj);
    return;
}
235 
/*
 * Class destructor for agent_ipc_listener_t: unregisters the event and
 * removes the item from ipc_listeners (if it was active), closes the
 * client socket and resets the members.
 */
static void ipc_listener_destructor(agent_ipc_listener_t *obj)
{
    /* If the "active" flag is set, then the event is active and the
       item is on the ipc_listeners list.  Unregister the event while
       the file descriptor is still open: deleting an event whose fd
       has already been closed hands the event backend a stale
       descriptor. */
    if (obj->active) {
        opal_event_del(&obj->event);
        opal_list_remove_item(&ipc_listeners, &obj->super);
    }

    if (-1 != obj->client_fd) {
        close(obj->client_fd);
    }

    ipc_listener_zero(obj);
}

OBJ_CLASS_INSTANCE(agent_ipc_listener_t,
                   opal_list_item_t,
                   ipc_listener_constructor,
                   ipc_listener_destructor);
256 
/*
 * Put every plain-data member of a ping into its "empty" state.
 * Nothing is freed here; callers that own resources must release them
 * first.  The "timer" event is not touched.
 */
static void agent_ping_result_zero(agent_ping_t *obj)
{
    obj->src_ipv4_addr = 0;
    obj->src_udp_port = 0;
    obj->listener = NULL;
    obj->dest_ipv4_addr = 0;
    obj->dest_netmask = 0;
    obj->dest_udp_port = 0;
    memset(&obj->dest_sockaddr, 0, sizeof(obj->dest_sockaddr));
    /* Must start out NULL so that the destructor can free it
       unconditionally */
    obj->dest_nodename = NULL;
    obj->num_sends = 0;
    obj->timer_active = false;

    for (int i = 0; i < NUM_PING_SIZES; ++i) {
        obj->sizes[i] = 0;
        obj->buffers[i] = NULL;
        obj->acked[i] = false;
    }
}

/*
 * Class constructor for agent_ping_t: a new ping starts out in the
 * "empty" state.
 */
static void agent_ping_result_constructor(agent_ping_t *obj)
{
    agent_ping_result_zero(obj);
}

/*
 * Class destructor for agent_ping_t: frees the message buffers and the
 * destination node name, cancels the re-send timer if it is pending,
 * and resets the members.  The object is NOT removed from any list
 * here; that is the caller's job.
 */
static void agent_ping_result_destructor(agent_ping_t *obj)
{
    /* free(NULL) is a no-op, so no guards are needed */
    for (int i = 0; i < NUM_PING_SIZES; ++i) {
        free(obj->buffers[i]);
    }

    /* dest_nodename is a strdup'ed copy owned by this object */
    free(obj->dest_nodename);

    if (obj->timer_active) {
        opal_event_del(&obj->timer);
    }

    agent_ping_result_zero(obj);
}

OBJ_CLASS_INSTANCE(agent_ping_t,
                   opal_list_item_t,
                   agent_ping_result_constructor,
                   agent_ping_result_destructor);
297 
/*
 * Wrapper around sendto() loop.
 *
 * Sends "numbytes" bytes from "buffer" to "addr" on socket "fd" and
 * returns once sendto() reports that exactly "numbytes" bytes were
 * sent.  EAGAIN and EINTR are retried immediately, EPERM is retried
 * after a short sleep, and any other error aborts the process via
 * ABORT().
 */
static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
                         struct sockaddr *addr)
{
    while (1) {
        ssize_t rc = sendto(fd, buffer, numbytes, 0, addr, sizeof(*addr));

        /* Note that since this is UDP, we don't need to check for
           0 < rc < numbytes */
        if (rc == numbytes) {
            return;
        }

        if (rc < 0) {
            /* Save errno right away: later library calls may
               overwrite it */
            const int err = errno;

            if (EAGAIN == err || EINTR == err) {
                continue;
            }
            if (EPERM == err) {
                // We're sending too fast
                usleep(5);
                continue;
            }

            /* Format into a stack buffer: unlike asprintf(), this
               cannot fail and leave the message pointer undefined */
            char msg[256];
            snprintf(msg, sizeof(msg),
                     "Unexpected sendto() error: errno=%d (%s)",
                     err, strerror(err));
            ABORT(msg);
            /* Will not return */
        }

        /* We should never get here, but just in case we do, sleep a
           little, just so we don't hammer the CPU */
        usleep(1);
    }

    /* Will not get here */
}
334 
335 /**************************************************************************
336  * All of the following functions run in agent thread
337  **************************************************************************/
338 
339 /*
340  * Handle an incoming PING message (send an ACK)
341  */
agent_thread_handle_ping(agent_udp_port_listener_t * listener,ssize_t numbytes,struct sockaddr * from)342 static void agent_thread_handle_ping(agent_udp_port_listener_t *listener,
343                                      ssize_t numbytes, struct sockaddr *from)
344 {
345     /* If the size we received isn't equal to what the sender says it
346        sent, do the simple thing: just don't send an ACK */
347     agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
348     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
349     if (msg->size != numbytes) {
350         char str[INET_ADDRSTRLEN];
351         inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
352 
353         opal_output_verbose(20, USNIC_OUT,
354                             "usNIC connectivity got bad ping: %d bytes from %s, expected %d (discarded)",
355                             (int) numbytes, str, (int) msg->size);
356         return;
357     }
358 
359     /* Ensure that the sender sent the ping from the IP address that
360        they think they sent it from.  If they didn't, then drop it
361        (i.e., it's a bad ping because the sender sent it from an
362        unexpected interface).  This should probably never happen, but
363        it's a good failsafe for unexpected scenarios. */
364     char msg_ipv4_addr_str[IPV4STRADDRLEN];
365     char real_ipv4_addr_str[IPV4STRADDRLEN];
366 
367     opal_btl_usnic_snprintf_ipv4_addr(msg_ipv4_addr_str,
368                                       sizeof(msg_ipv4_addr_str),
369                                       msg->src_ipv4_addr, 0);
370     opal_btl_usnic_snprintf_ipv4_addr(real_ipv4_addr_str,
371                                       sizeof(real_ipv4_addr_str),
372                                       src_addr_in->sin_addr.s_addr, 0);
373 
374     if (msg->src_ipv4_addr != src_addr_in->sin_addr.s_addr) {
375         opal_output_verbose(20, USNIC_OUT,
376                             "usNIC connectivity got bad ping (from unexpected address: %s != %s, discarded)",
377                             msg_ipv4_addr_str, real_ipv4_addr_str);
378         return;
379     }
380 
381     if (msg->magic_number != MAGIC_ORIGINATOR) {
382         opal_output_verbose(20, USNIC_OUT,
383                             "usNIC connectivity got bad ping (magic number: %" PRIu64 ", discarded)",
384                             msg->magic_number);
385         return;
386     }
387     if (msg->major_version != OPAL_MAJOR_VERSION ||
388         msg->minor_version != OPAL_MINOR_VERSION) {
389         opal_output_verbose(20, USNIC_OUT,
390                             "usNIC connectivity got bad ping (originator version: %d.%d, expected %d.%d, discarded)",
391                             msg->major_version, msg->minor_version,
392                             OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION);
393         return;
394     }
395 
396     /* Ok, this is a good ping.  Send the ACK back.  The PING sender
397        will verify that the ACK came back from the IP address that it
398        expected. */
399 
400     opal_output_verbose(20, USNIC_OUT,
401                         "usNIC connectivity got PING (size=%ld) from %s; sending ACK",
402                         numbytes, msg_ipv4_addr_str);
403 
404     /* Send back an ACK.  No need to allocate a new buffer; just
405        re-use the same buffer we just got.  Note that msg->size is
406        already set.  We simply echo back the sender's IP address/port
407        in the msg (the sender will use the msg fields and the
408        recvfrom() src_addr to check for a match). */
409     msg->message_type = AGENT_MSG_TYPE_ACK;
410     msg->magic_number = MAGIC_TARGET;
411 
412     agent_sendto(listener->fd, (char*) listener->buffer, sizeof(*msg), from);
413 }
414 
415 /*
416  * Handle an incoming ACK message
417  */
agent_thread_handle_ack(agent_udp_port_listener_t * listener,ssize_t numbytes,struct sockaddr * from)418 static void agent_thread_handle_ack(agent_udp_port_listener_t *listener,
419                                     ssize_t numbytes, struct sockaddr *from)
420 {
421     char str[INET_ADDRSTRLEN];
422     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
423     inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
424 
425     /* If we got a wonky ACK message that is the wrong length, just
426        return */
427     agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
428     if (numbytes != sizeof(*msg)) {
429         opal_output_verbose(20, USNIC_OUT,
430                             "usNIC connectivity got bad ACK: %d bytes from %s, expected %d (discarded)",
431                             (int) numbytes, str, (int) sizeof(*msg));
432         return;
433     }
434     if (msg->magic_number != MAGIC_TARGET) {
435         opal_output_verbose(20, USNIC_OUT,
436                             "usNIC connectivity got bad ACK (magic number: %" PRIu64 ", discarded)",
437                             msg->magic_number);
438         return;
439     }
440 
441     /* Find the pending ping request (on this interface) for this ACK.
442        If we don't find a match, we'll drop it. */
443     agent_ping_t *ap;
444     uint32_t src_in_port = ntohs(src_addr_in->sin_port);
445     OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
446         if (ap->dest_ipv4_addr == src_addr_in->sin_addr.s_addr &&
447             ap->dest_udp_port == src_in_port &&
448             ap->src_ipv4_addr == msg->src_ipv4_addr &&
449             ap->src_udp_port == msg->src_udp_port) {
450             /* Found it -- indicate that it has been acked */
451             for (int i = 0; i < NUM_PING_SIZES; ++i) {
452                 if (ap->sizes[i] == msg->size) {
453                     ap->acked[i] = true;
454                     return;
455                 }
456             }
457         }
458     }
459 
460     /* If we didn't find the matching ping for this ACK, then just
461        discard it */
462     opal_output_verbose(20, USNIC_OUT,
463                         "usNIC connectivity got unexpected ACK: %d bytes from %s (discarded)",
464                         (int) numbytes, str);
465 }
466 
467 /*
468  * Receive a message from the listening UDP socket
469  */
agent_thread_receive_ping(int fd,short flags,void * context)470 static void agent_thread_receive_ping(int fd, short flags, void *context)
471 {
472     agent_udp_port_listener_t *listener =
473         (agent_udp_port_listener_t *) context;
474     assert(NULL != listener);
475 
476     /* Receive the message */
477     ssize_t numbytes;
478     struct sockaddr src_addr;
479     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) &src_addr;
480     socklen_t addrlen = sizeof(src_addr);
481 
482     while (1) {
483         numbytes = recvfrom(listener->fd, listener->buffer, listener->max_msg_size, 0,
484                             &src_addr, &addrlen);
485         if (numbytes > 0) {
486             break;
487         } else if (numbytes < 0) {
488             if (errno == EAGAIN || errno == EINTR) {
489                 continue;
490             }
491 
492             ABORT("Unexpected error from recvfrom");
493             /* Will not return */
494         }
495     }
496 
497     char str[INET_ADDRSTRLEN];
498     agent_udp_message_t *msg;
499     msg = (agent_udp_message_t *) listener->buffer;
500     switch (msg->message_type) {
501     case AGENT_MSG_TYPE_PING:
502         agent_thread_handle_ping(listener, numbytes, &src_addr);
503         break;
504     case AGENT_MSG_TYPE_ACK:
505         agent_thread_handle_ack(listener, numbytes, &src_addr);
506         break;
507     default:
508         /* Ignore unknown pings */
509         inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
510         opal_output_verbose(20, USNIC_OUT,
511                             "usNIC connectivity agent received unknown message: %d bytes from %s",
512                             (int) numbytes, str);
513         break;
514     }
515 }
516 
517 static agent_udp_port_listener_t *
agent_thread_find_listener(uint32_t ipv4_addr,uint32_t * udp_port)518 agent_thread_find_listener(uint32_t ipv4_addr, uint32_t *udp_port)
519 {
520     agent_udp_port_listener_t *listener;
521     OPAL_LIST_FOREACH(listener, &udp_port_listeners, agent_udp_port_listener_t) {
522         if (listener->ipv4_addr == ipv4_addr) {
523             *udp_port = listener->udp_port;
524             return listener;
525         }
526     }
527 
528     return NULL;
529 }
530 
531 /*
532  * Send reply back from the LISTEN command: send back the IP address
533  * and UDP port that we're listening on.
534  */
agent_thread_cmd_listen_reply(int fd,uint32_t addr,int32_t udp_port)535 static int agent_thread_cmd_listen_reply(int fd,
536                                          uint32_t addr, int32_t udp_port)
537 {
538     int ret;
539 
540     opal_btl_usnic_connectivity_cmd_listen_reply_t cmd = {
541         .cmd = CONNECTIVITY_AGENT_CMD_LISTEN,
542         .ipv4_addr = addr,
543         .udp_port = udp_port
544     };
545 
546     ret = opal_fd_write(fd, sizeof(cmd), &cmd);
547     if (OPAL_SUCCESS != ret) {
548         OPAL_ERROR_LOG(ret);
549         ABORT("usnic connectivity agent IPC write failed");
550         /* Will not return */
551     }
552 
553     return OPAL_SUCCESS;
554 }
555 
556 /*
557  * Receive and process the rest of a LISTEN command from a local IPC
558  * client.
559  */
agent_thread_cmd_listen(agent_ipc_listener_t * ipc_listener)560 static void agent_thread_cmd_listen(agent_ipc_listener_t *ipc_listener)
561 {
562     /* Read the rest of the LISTEN command from the IPC socket */
563     int ret;
564     opal_btl_usnic_connectivity_cmd_listen_t cmd;
565     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
566     if (OPAL_SUCCESS != ret) {
567         OPAL_ERROR_LOG(ret);
568         ABORT("usnic connectivity agent IPC LISTEN read failed");
569         /* Will not return */
570     }
571 
572     /* If we're already listening on this address, send the UDP port
573        back to the client. */
574     uint32_t udp_port;
575     agent_udp_port_listener_t *udp_listener;
576     udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
577     if (NULL != udp_listener) {
578         /* If we get a non-NULL "module" pointer value from the
579            client, it means that this client is the same process as
580            this agent, and we should save this pointer value (all
581            non-agent MPI procs will send NULL as their "module"
582            pointer value -- i.e., some non-agent MPI proc was the
583            first one to send the LISTEN command). */
584         if (NULL == udp_listener->module) {
585             udp_listener->module = cmd.module;
586         }
587         agent_thread_cmd_listen_reply(ipc_listener->client_fd,
588                                       cmd.ipv4_addr, udp_port);
589         return;
590     }
591 
592     /* We're not listening on this interface already, so create a
593        UDP port listener entry */
594     udp_listener = OBJ_NEW(agent_udp_port_listener_t);
595     if (NULL == udp_listener) {
596         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
597         ABORT("Out of memory");
598         /* Will not return */
599     }
600 
601     udp_listener->module = cmd.module;
602     udp_listener->max_msg_size = cmd.max_msg_size;
603     udp_listener->ipv4_addr = cmd.ipv4_addr;
604     udp_listener->netmask = cmd.netmask;
605     udp_listener->usnic_name = strdup(cmd.usnic_name);
606 
607     /* Fill in the ipv4_addr_str.  Since we don't have the IPv4
608        address in sockaddr_in form, it's not worth using
609        inet_ntop() */
610     opal_btl_usnic_snprintf_ipv4_addr(udp_listener->ipv4_addr_str,
611                                       sizeof(udp_listener->ipv4_addr_str),
612                                       cmd.ipv4_addr, cmd.netmask);
613 
614     udp_listener->buffer = malloc(udp_listener->max_msg_size);
615     if (NULL == udp_listener->buffer) {
616         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
617         ABORT("Out of memory");
618         /* Will not return */
619     }
620 
621     /* Create the listening socket */
622     udp_listener->fd = socket(AF_INET, SOCK_DGRAM, 0);
623     if (udp_listener->fd < 0) {
624         OPAL_ERROR_LOG(udp_listener->fd);
625         ABORT("Could not open listening socket");
626         /* Will not return */
627     }
628 
629     /* Bind it to the designated interface */
630     struct sockaddr_in inaddr;
631     memset(&inaddr, 0, sizeof(inaddr));
632     inaddr.sin_family = AF_INET;
633     inaddr.sin_addr.s_addr = cmd.ipv4_addr;
634     inaddr.sin_port = htons(0);
635 
636     ret = bind(udp_listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr));
637     if (ret < 0) {
638         OPAL_ERROR_LOG(ret);
639         ABORT("Could not bind listening socket");
640         /* Will not return */
641     }
642 
643     /* Find out the port we got */
644     opal_socklen_t addrlen = sizeof(struct sockaddr_in);
645     ret = getsockname(udp_listener->fd, (struct sockaddr*) &inaddr, &addrlen);
646     if (ret < 0) {
647         OPAL_ERROR_LOG(ret);
648         ABORT("Could not get UDP port number from listening socket");
649         /* Will not return */
650     }
651     udp_listener->udp_port = ntohs(inaddr.sin_port);
652 
653     opal_output_verbose(20, USNIC_OUT,
654                         "usNIC connectivity agent listening on %s:%d, (%s)",
655                         udp_listener->ipv4_addr_str,
656                         udp_listener->udp_port,
657                         udp_listener->usnic_name);
658 
659     /* Set the "don't fragment" bit on outgoing frames because we
660        want MTU-sized messages to get through successfully to the
661        peer, or fail if they have to fragment because of an MTU
662        mismatch somewhere enroute */
663     int val = IP_PMTUDISC_DO;
664     ret = setsockopt(udp_listener->fd, IPPROTO_IP, IP_MTU_DISCOVER,
665                      &val, sizeof(val));
666     if (0 != ret) {
667         OPAL_ERROR_LOG(ret);
668         ABORT("Unable to set \"do not fragment\" on UDP socket");
669         /* Will not return */
670     }
671 
672     /* Set the send and receive buffer sizes to our MTU size */
673     int temp;
674     temp = (int) udp_listener->max_msg_size;
675     if ((ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_RCVBUF,
676                           &temp, sizeof(temp))) < 0 ||
677         (ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_SNDBUF,
678                           &temp, sizeof(temp))) < 0) {
679         OPAL_ERROR_LOG(ret);
680         ABORT("Could not set socket buffer sizes");
681         /* Will not return */
682     }
683 
684     /* Create a listening event */
685     opal_event_set(mca_btl_usnic_component.opal_evbase,
686                    &udp_listener->event, udp_listener->fd,
687                    OPAL_EV_READ | OPAL_EV_PERSIST,
688                    agent_thread_receive_ping, udp_listener);
689     opal_event_add(&udp_listener->event, 0);
690 
691     /* Save this listener on the list of udp_port_listeners */
692     opal_list_append(&udp_port_listeners, &udp_listener->super);
693 
694     udp_listener->active = true;
695 
696     /* Return the port number to the sender */
697     ret = agent_thread_cmd_listen_reply(ipc_listener->client_fd,
698                                         cmd.ipv4_addr, udp_listener->udp_port);
699 
700     /* All done! */
701     return;
702 }
703 
704 /*
705  * Send a ping
706  */
agent_thread_send_ping(int fd,short flags,void * context)707 static void agent_thread_send_ping(int fd, short flags, void *context)
708 {
709     agent_ping_t *ap = (agent_ping_t*) context;
710     ap->timer_active = false;
711 
712     char dest_ipv4_addr_str[IPV4STRADDRLEN];
713     opal_btl_usnic_snprintf_ipv4_addr(dest_ipv4_addr_str,
714                                       sizeof(dest_ipv4_addr_str),
715                                       ap->dest_ipv4_addr, ap->dest_netmask);
716 
717     /* If we got all the ACKs for this ping, then move this ping from
718        the "pending" list to the "results" list.  We can also free the
719        buffers associated with this ping result, just to save some
720        space in the long run.  */
721     if (ap->acked[0] && ap->acked[1]) {
722         opal_list_remove_item(&pings_pending, &ap->super);
723         opal_list_append(&ping_results, &ap->super);
724 
725         opal_output_verbose(20, USNIC_OUT,
726                             "usNIC connectivity GOOD between %s <--> %s",
727                             ap->listener->ipv4_addr_str,
728                             dest_ipv4_addr_str);
729 
730         for (int i = 0; i < 2; ++i) {
731             if (NULL != ap->buffers[i]) {
732                 free(ap->buffers[i]);
733                 ap->buffers[i] = NULL;
734             }
735         }
736 
737         return;
738     }
739 
740     /* If we've resent too many times, then just abort */
741     if (ap->num_sends > mca_btl_usnic_component.connectivity_num_retries) {
742         char *topic;
743         if (ap->acked[0] && !ap->acked[1]) {
744             // For the show_help topic checker script
745             // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small ok, large bad"
746             topic = "connectivity error: small ok, large bad";
747         } else if (!ap->acked[0] && ap->acked[1]) {
748             // For the show_help topic checker script
749             // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small bad, large ok"
750             topic = "connectivity error: small bad, large ok";
751         } else {
752             // For the show_help topic checker script
753             // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small bad, large bad"
754             topic = "connectivity error: small bad, large bad";
755         }
756 
757         char ipv4_addr_str[IPV4STRADDRLEN];
758         opal_btl_usnic_snprintf_ipv4_addr(ipv4_addr_str, sizeof(ipv4_addr_str),
759                                           ap->dest_ipv4_addr,
760                                           ap->dest_netmask);
761         opal_show_help("help-mpi-btl-usnic.txt", topic, true,
762                        opal_process_info.nodename,
763                        ap->listener->ipv4_addr_str,
764                        ap->listener->usnic_name,
765                        ap->dest_nodename,
766                        ipv4_addr_str,
767                        ap->sizes[0],
768                        ap->sizes[1]);
769         opal_btl_usnic_exit(NULL);
770         /* Will not return */
771     }
772 
773     time_t t = time(NULL);
774     opal_output_verbose(20, USNIC_OUT,
775                         "usNIC connectivity pinging %s:%d (%s) from %s (%s) at %s",
776                         dest_ipv4_addr_str,
777                         ntohs(ap->dest_sockaddr.sin_port),
778                         ap->dest_nodename,
779                         ap->listener->ipv4_addr_str,
780                         ap->listener->usnic_name,
781                         ctime(&t));
782 
783     /* Send the ping messages to the peer */
784     for (int i = 0; i < NUM_PING_SIZES; ++i) {
785         agent_sendto(ap->listener->fd, (char*) ap->buffers[i], ap->sizes[i],
786                      (struct sockaddr*) &ap->dest_sockaddr);
787     }
788 
789     /* Set a timer to check if these pings are ACKed */
790     opal_event_set(mca_btl_usnic_component.opal_evbase, &ap->timer,
791                    -1, 0, agent_thread_send_ping, ap);
792     opal_event_add(&ap->timer, &ack_timeout);
793     ap->timer_active = true;
794 
795     /* Count how many times we've done this */
796     ++ap->num_sends;
797 }
798 
799 /*
800  * Receive and process the rest of a PING command from a local IPC
801  * client.
802  */
agent_thread_cmd_ping(agent_ipc_listener_t * ipc_listener)803 static void agent_thread_cmd_ping(agent_ipc_listener_t *ipc_listener)
804 {
805     /* Read the rest of the PING command from the IPC socket */
806     int ret;
807     opal_btl_usnic_connectivity_cmd_ping_t cmd;
808     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
809     if (OPAL_SUCCESS != ret) {
810         OPAL_ERROR_LOG(ret);
811         ABORT("usnic connectivity agent IPC PING read failed");
812         /* Will not return */
813     }
814 
815     /* Have we already pinged this IP address / port? */
816     agent_ping_t *ap;
817     OPAL_LIST_FOREACH(ap, &ping_results, agent_ping_t) {
818         if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
819             ap->dest_udp_port == cmd.dest_udp_port) {
820             /* We already have results from pinging this IP address /
821                port, so there's no need for further action */
822             return;
823         }
824     }
825 
826     /* Are we in the middle of pinging this IP address / port? */
827     OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
828         if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
829             ap->dest_udp_port == cmd.dest_udp_port) {
830             /* We're already in the middle of pinging this IP address
831                / port, so there's no need for further action */
832             return;
833         }
834     }
835 
836     /* This is a new ping request.  Find the listener with this source
837        ipv4 address */
838     bool found = false;
839     agent_udp_port_listener_t *udp_listener;
840     OPAL_LIST_FOREACH(udp_listener, &udp_port_listeners,
841                       agent_udp_port_listener_t) {
842         if (udp_listener->ipv4_addr == cmd.src_ipv4_addr) {
843             found = true;
844             break;
845         }
846     }
847     if (!found) {
848         ABORT("Could not ping listener for ping request");
849         /* Will not return */
850     }
851 
852     /* This is a new ping request; track it */
853     ap = OBJ_NEW(agent_ping_t);
854     if (NULL == ap) {
855         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
856         ABORT("Out of memory");
857         /* Will not return */
858     }
859     ap->src_ipv4_addr = cmd.src_ipv4_addr;
860     ap->src_udp_port = cmd.src_udp_port;
861     ap->listener = udp_listener;
862     ap->dest_ipv4_addr = cmd.dest_ipv4_addr;
863     ap->dest_netmask = cmd.dest_netmask;
864     ap->dest_udp_port = cmd.dest_udp_port;
865     ap->dest_sockaddr.sin_family = AF_INET;
866     ap->dest_sockaddr.sin_addr.s_addr = cmd.dest_ipv4_addr;
867     ap->dest_sockaddr.sin_port = htons(cmd.dest_udp_port);
868     ap->dest_nodename = strdup(cmd.dest_nodename);
869 
870     /* The first message we send will be "short" (a simple control
871        message); the second will be "long" (i.e., caller-specified
872        length) */
873     ap->sizes[0] = sizeof(agent_udp_message_t);
874 
875     /* Note that the MTU is the max Ethernet frame payload.  So from
876        that MTU, we have to subtract off the max IP header (e.g., if
877        all IP options are enabled, which is 60 bytes), and then also
878        subtract off the UDP header (which is 8 bytes).  So we need to
879        subtract off 68 bytes from the MTU, and that's the largest ping
880        payload we can send. */
881     ap->sizes[1] = cmd.max_msg_size - 68;
882 
883     /* Allocate a buffer for each size.  Make sure the smallest size
884        is at least sizeof(agent_udp_message_t). */
885     agent_udp_message_t *msg;
886     for (size_t i = 0; i < NUM_PING_SIZES; ++i) {
887         ap->buffers[i] = calloc(1, ap->sizes[i]);
888         if (NULL == ap->buffers[i]) {
889             OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
890             ABORT("Out of memory");
891             /* Will not return */
892         }
893 
894         /* Fill in the message with return addressing information */
895         msg = (agent_udp_message_t*) ap->buffers[i];
896         msg->message_type = AGENT_MSG_TYPE_PING;
897         msg->src_ipv4_addr = ap->src_ipv4_addr;
898         msg->src_udp_port = ap->src_udp_port;
899         msg->magic_number = MAGIC_ORIGINATOR;
900         msg->major_version = OPAL_MAJOR_VERSION;
901         msg->minor_version = OPAL_MINOR_VERSION;
902         msg->size = ap->sizes[i];
903     }
904 
905     /* Save this ping request on the "pending" list */
906     opal_list_append(&pings_pending, &ap->super);
907 
908     /* Send the ping */
909     agent_thread_send_ping(0, 0, ap);
910 }
911 
912 /*
913  * Receive and process the rest of an UNLISTEN command from a local IPC
914  * client.
915  */
agent_thread_cmd_unlisten(agent_ipc_listener_t * ipc_listener)916 static void agent_thread_cmd_unlisten(agent_ipc_listener_t *ipc_listener)
917 {
918     /* Read the rest of the UNLISTEN command from the IPC socket */
919     int ret;
920     opal_btl_usnic_connectivity_cmd_unlisten_t cmd;
921     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
922     if (OPAL_SUCCESS != ret) {
923         OPAL_ERROR_LOG(ret);
924         ABORT("usnic connectivity agent IPC UNLISTEN read failed");
925         /* Will not return */
926     }
927 
928     /* If we are listening on this address (and we should be), then
929        stop listening on it. */
930     uint32_t udp_port;
931     agent_udp_port_listener_t *udp_listener;
932     udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
933     if (NULL != udp_listener) {
934         OBJ_RELEASE(udp_listener);
935     }
936 
937     /* All done! */
938     return;
939 }
940 
941 /*
942  * Called when we get an incoming IPC message
943  */
agent_thread_ipc_receive(int fd,short flags,void * context)944 static void agent_thread_ipc_receive(int fd, short flags, void *context)
945 {
946     int32_t command;
947     agent_ipc_listener_t *ipc_listener = (agent_ipc_listener_t*) context;
948 
949     /* Read the command */
950     command = -1;
951     int ret = opal_fd_read(fd, sizeof(command), &command);
952     if (OPAL_ERR_TIMEOUT == ret) {
953         /* We get OPAL_ERR_TIMEOUT if the remote side hung up */
954         OBJ_RELEASE(ipc_listener);
955         return;
956     } else if (OPAL_SUCCESS != ret) {
957         OPAL_ERROR_LOG(ret);
958         ABORT("usnic connectivity agent IPC command read failed");
959         /* Will not return */
960     }
961 
962     assert(CONNECTIVITY_AGENT_CMD_LISTEN == command ||
963            CONNECTIVITY_AGENT_CMD_PING == command ||
964            CONNECTIVITY_AGENT_CMD_UNLISTEN == command);
965 
966     switch (command) {
967     case CONNECTIVITY_AGENT_CMD_LISTEN:
968         agent_thread_cmd_listen(ipc_listener);
969         break;
970     case CONNECTIVITY_AGENT_CMD_PING:
971         agent_thread_cmd_ping(ipc_listener);
972         break;
973     case CONNECTIVITY_AGENT_CMD_UNLISTEN:
974         agent_thread_cmd_unlisten(ipc_listener);
975         break;
976     default:
977         ABORT("Unexpected connectivity agent command");
978         break;
979     }
980 }
981 
982 /*
983  * We got a new connection on the IPC named socket.  Add it to the
984  * event base.
985  */
agent_thread_accept(int fd,short flags,void * context)986 static void agent_thread_accept(int fd, short flags, void *context)
987 {
988     struct sockaddr addr;
989     socklen_t len;
990     agent_ipc_listener_t *listener = NULL;
991 
992     len = sizeof(addr);
993     int client_fd = accept(fd, &addr, &len);
994     if (client_fd < 0) {
995         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
996         ABORT("accept() failed");
997         /* Will not return */
998     }
999 
1000     /* If we got a good client, verify that it sent the magic token */
1001     int tlen = strlen(CONNECTIVITY_MAGIC_TOKEN);
1002     char *msg = alloca(tlen + 1);
1003     if (NULL == msg) {
1004         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1005         ABORT("Out of memory");
1006         /* Will not return */
1007     }
1008     if (OPAL_SUCCESS != opal_fd_read(client_fd, tlen, msg)) {
1009         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1010         ABORT("usnic connectivity agent IPC read failed");
1011         /* Will not return */
1012     }
1013     if (0 != memcmp(msg, CONNECTIVITY_MAGIC_TOKEN, tlen)) {
1014         opal_output_verbose(20, USNIC_OUT,
1015                             "usNIC connectivity got bad IPC client (wrong magic token); disconnected");
1016         close(client_fd);
1017         return;
1018     }
1019 
1020     /* Remember how many accepts we have successfully completed */
1021     ++ipc_accepts;
1022 
1023     /* Make a listener object for this peer */
1024     listener = OBJ_NEW(agent_ipc_listener_t);
1025     listener->client_fd = client_fd;
1026 
1027     /* Write back the magic token to ACK that we got the peer's
1028        magic token and all is kosher */
1029     if (OPAL_SUCCESS != opal_fd_write(client_fd, tlen,
1030                                       CONNECTIVITY_MAGIC_TOKEN)) {
1031         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1032         ABORT("usnic connectivity agent IPC read failed");
1033         /* Will not return */
1034     }
1035 
1036     /* Add this IPC listener to the event base */
1037     opal_event_set(mca_btl_usnic_component.opal_evbase,
1038                    &listener->event, client_fd,
1039                    OPAL_EV_READ | OPAL_EV_PERSIST,
1040                    agent_thread_ipc_receive, listener);
1041     opal_event_add(&listener->event, 0);
1042 
1043     /* Save this listener on the list of ipc_listeners */
1044     opal_list_append(&ipc_listeners, &listener->super);
1045 
1046     listener->active = true;
1047 
1048     return;
1049 }
1050 
1051 /*
1052  * Tear down all active events.
1053  *
1054  * This is done as an event callback in the agent threaf so that there
1055  * is no race condition in the teardown.  Specifically: the progress
1056  * thread will only fire one event at a time.  Therefore, this one
1057  * event can "atomically" delete all the events and data structures
1058  * and not have to worry about concurrent access from some event
1059  * firing in the middle of the teardown process.
1060  */
agent_thread_finalize(int fd,short flags,void * context)1061 static void agent_thread_finalize(int fd, short flags, void *context)
1062 {
1063     /* Free the event that triggered this call */
1064     free(context);
1065 
1066     /* Ensure that all the local IPC clients have connected to me (so
1067        that we don't shut down before someone tries to connect to me),
1068        or 10 seconds have passed (i.e., if 10 seconds pass and they
1069        don't all connect to me, then something else is wrong, and we
1070        should just give up). */
1071     static bool first = true;
1072     static time_t timestamp = 0;
1073     if (first) {
1074         timestamp = time(NULL);
1075         first = false;
1076     }
1077 
1078     if (ipc_accepts < opal_process_info.num_local_peers &&
1079         time(NULL) < timestamp + 10) {
1080         opal_output_verbose(20, USNIC_OUT,
1081                             "usNIC connectivity agent delaying shutdown until all clients connect...");
1082 
1083         opal_event_t *ev = calloc(sizeof(*ev), 1);
1084         struct timeval finalize_retry = {
1085             .tv_sec = 0,
1086             .tv_usec = 10000
1087         };
1088 
1089         opal_event_set(mca_btl_usnic_component.opal_evbase,
1090                        ev, -1, 0, agent_thread_finalize, ev);
1091         opal_event_add(ev, &finalize_retry);
1092         return;
1093     }
1094     if (ipc_accepts < opal_process_info.num_local_peers) {
1095         opal_output_verbose(20, USNIC_OUT,
1096                             "usNIC connectivity agent: only %d of %d clients connected, but timeout has expired -- exiting anyway", ipc_accepts, opal_process_info.num_local_peers);
1097     }
1098 
1099     /* Remove the agent listening event from the opal async event
1100        base */
1101     opal_event_del(&ipc_event);
1102 
1103     /* Shut down all active udp_port_listeners */
1104     agent_udp_port_listener_t *udp_listener, *ulnext;
1105     OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
1106                            agent_udp_port_listener_t) {
1107         OBJ_RELEASE(udp_listener);
1108     }
1109 
1110     /* Destroy the pending pings and ping results */
1111     agent_ping_t *request, *pnext;
1112     OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
1113         opal_list_remove_item(&pings_pending, &request->super);
1114         OBJ_RELEASE(request);
1115     }
1116 
1117     OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
1118         opal_list_remove_item(&ping_results, &request->super);
1119         OBJ_RELEASE(request);
1120     }
1121 
1122     /* Shut down all active ipc_listeners */
1123     agent_ipc_listener_t *ipc_listener, *inext;
1124     OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
1125                            agent_ipc_listener_t) {
1126         OBJ_RELEASE(ipc_listener);
1127     }
1128 
1129     agent_initialized = false;
1130 }
1131 
1132 /**************************************************************************
1133  * All of the following functions run in the main application thread
1134  **************************************************************************/
1135 
1136 /*
1137  * Setup the agent and start its event loop running in a dedicated
1138  * thread
1139  */
opal_btl_usnic_connectivity_agent_init(void)1140 int opal_btl_usnic_connectivity_agent_init(void)
1141 {
1142     /* Only do this initialization if I am the agent (the agent is
1143        local rank 0) */
1144     if (opal_process_info.my_local_rank != 0) {
1145         return OPAL_SUCCESS;
1146     }
1147     if (agent_initialized) {
1148         return OPAL_SUCCESS;
1149     }
1150 
1151     /* Make a struct timeval for use with timer events.  Note that the
1152        MCA param is expressed in terms of *milli*seconds, but the
1153        timeval timeout is expressed in terms of *micro*seconds. */
1154     ack_timeout.tv_sec =
1155         mca_btl_usnic_component.connectivity_ack_timeout / 1000;
1156     ack_timeout.tv_usec =
1157         1000 * (mca_btl_usnic_component.connectivity_ack_timeout % 1000);
1158 
1159     /* Create lists */
1160     OBJ_CONSTRUCT(&udp_port_listeners, opal_list_t);
1161     OBJ_CONSTRUCT(&ipc_listeners, opal_list_t);
1162     OBJ_CONSTRUCT(&pings_pending, opal_list_t);
1163     OBJ_CONSTRUCT(&ping_results, opal_list_t);
1164 
1165     /********************************************************************
1166      * Once all of the above is setup, create the unix domain socket
1167      * and start the event loop.
1168      ********************************************************************/
1169 
1170     /* Create the unix domain socket in the job session directory */
1171     ipc_accept_fd = socket(PF_UNIX, SOCK_STREAM, 0);
1172     if (ipc_accept_fd < 0) {
1173         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1174         ABORT("socket() failed");
1175         /* Will not return */
1176     }
1177 
1178     asprintf(&ipc_filename, "%s/%s",
1179              opal_process_info.job_session_dir, CONNECTIVITY_SOCK_NAME);
1180     if (NULL == ipc_filename) {
1181         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1182         ABORT("Out of memory");
1183         /* Will not return */
1184     }
1185     unlink(ipc_filename);
1186 
1187     struct sockaddr_un address;
1188     assert(strlen(ipc_filename) < sizeof(address.sun_path));
1189 
1190     memset(&address, 0, sizeof(struct sockaddr_un));
1191     address.sun_family = AF_UNIX;
1192     strncpy(address.sun_path, ipc_filename, sizeof(address.sun_path) - 1);
1193 
1194     if (bind(ipc_accept_fd, (struct sockaddr *) &address,
1195              sizeof(struct sockaddr_un)) != 0) {
1196         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1197         ABORT("bind() failed");
1198         /* Will not return */
1199     }
1200 
1201     /* Give an arbitrarily large backlog number so that connecting
1202        clients will never be backlogged (note for Future Jeff: please
1203        don't laugh at Past Jeff if 256 has become a trivially small
1204        number of on-server procs in a single job). */
1205     if (listen(ipc_accept_fd, 256) != 0) {
1206         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1207         ABORT("listen() failed");
1208         /* Will not return */
1209     }
1210 
1211     /* Add the socket to the event base */
1212     opal_event_set(mca_btl_usnic_component.opal_evbase,
1213                    &ipc_event, ipc_accept_fd,
1214                    OPAL_EV_READ | OPAL_EV_PERSIST,
1215                    agent_thread_accept, NULL);
1216     opal_event_add(&ipc_event, 0);
1217 
1218     opal_output_verbose(20, USNIC_OUT,
1219                         "usNIC connectivity agent initialized");
1220     agent_initialized = true;
1221     return OPAL_SUCCESS;
1222 }
1223 
1224 /*
1225  * Shut down the agent
1226  */
opal_btl_usnic_connectivity_agent_finalize(void)1227 int opal_btl_usnic_connectivity_agent_finalize(void)
1228 {
1229     /* Only do this if I have the agent running */
1230     if (!agent_initialized) {
1231         return OPAL_SUCCESS;
1232     }
1233 
1234     /* Submit an event to the async thread and tell it to delete all
1235        the usNIC events.  See the rationale for doing this in the
1236        comment in the agent_thread_finalize() function. */
1237     opal_event_t *ev = calloc(sizeof(*ev), 1);
1238     opal_event_set(mca_btl_usnic_component.opal_evbase,
1239                    ev, -1, OPAL_EV_WRITE, agent_thread_finalize, ev);
1240     opal_event_active(ev, OPAL_EV_WRITE, 1);
1241 
1242     /* Wait for the event to fire and complete */
1243     while (agent_initialized) {
1244         struct timespec tp = {
1245             .tv_sec  = 0,
1246             .tv_nsec = 1000
1247         };
1248         nanosleep(&tp, NULL);
1249     }
1250 
1251     /* Close the local IPC socket and remove the file */
1252     if (ipc_accept_fd != -1) {
1253         close(ipc_accept_fd);
1254         ipc_accept_fd = -1;
1255     }
1256     if (NULL != ipc_filename) {
1257         unlink(ipc_filename);
1258         free(ipc_filename);
1259         ipc_filename = NULL;
1260     }
1261 
1262     opal_output_verbose(20, USNIC_OUT,
1263                         "usNIC connectivity client finalized");
1264     return OPAL_SUCCESS;
1265 }
1266