1 /*
2 * Copyright (c) 2014-2016 Cisco Systems, Inc. All rights reserved.
3 * Copyright (c) 2015 Research Organization for Information Science
4 * and Technology (RIST). All rights reserved.
5 * $COPYRIGHT$
6 *
7 * Additional copyrights may follow
8 *
9 * $HEADER$
10 */
11
12 #include "opal_config.h"
13
14 #include <assert.h>
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <sys/un.h>
18 #include <unistd.h>
19 #ifdef HAVE_ALLOCA_H
20 #include <alloca.h>
21 #endif
22
23 #include "opal_stdint.h"
24 #include "opal/threads/mutex.h"
25 #include "opal/mca/event/event.h"
26 #include "opal/util/show_help.h"
27 #include "opal/types.h"
28 #include "opal/util/output.h"
29 #include "opal/util/fd.h"
30
31 #include "btl_usnic.h"
32 #include "btl_usnic_connectivity.h"
33
34 /**************************************************************************
35 * Agent data and methods
36 **************************************************************************/
37
/*
 * Local variables
 */

/* IPC listening socket fd; initialized to -1 (presumably "not open" --
   it is not opened in the code visible here) */
static int ipc_accept_fd = -1;
/* NOTE(review): presumably the path of the IPC named socket; not used in
   the code visible here -- confirm */
static char *ipc_filename = NULL;
/* NOTE(review): presumably the event watching ipc_accept_fd; not used in
   the code visible here -- confirm */
static opal_event_t ipc_event;
/* Delay before agent_thread_send_ping() re-checks for ACKs and re-sends
   a ping (passed to opal_event_add() for each ping's timer) */
static struct timeval ack_timeout;
/* All agent_udp_port_listener_t objects (one per listened-on address) */
static opal_list_t udp_port_listeners;
/* All agent_ipc_listener_t objects (one per accepted IPC client) */
static opal_list_t ipc_listeners;
/* Count of IPC clients that passed the magic-token handshake in
   agent_thread_accept().  NOTE(review): volatile does not provide
   inter-thread synchronization; confirm how other threads read this. */
static volatile int ipc_accepts = 0;
/* JMS The pings_pending and ping_results should probably both be hash
   tables for more efficient lookups */
/* agent_ping_t objects that have not yet been ACKed for every size */
static opal_list_t pings_pending;
/* agent_ping_t objects that were ACKed for every size (moved here from
   pings_pending by agent_thread_send_ping()) */
static opal_list_t ping_results;
/* NOTE(review): not used in the code visible here; presumably set once
   the agent is set up -- confirm.  Same volatile caveat as above. */
static volatile bool agent_initialized = false;
53
54
/*
 * Holds all the information about a UDP port that the agent thread is
 * listening on (for incoming PINGs and ACKs).
 */
typedef struct {
    opal_list_item_t super;

    /* Data from the LISTEN command message */
    uint32_t ipv4_addr;          /* same form as sin_addr.s_addr (used to bind) */
    uint32_t netmask;
    /* Printable form of ipv4_addr/netmask, filled in by
       opal_btl_usnic_snprintf_ipv4_addr() */
    char ipv4_addr_str[IPV4STRADDRLEN];
    uint32_t max_msg_size;       /* size of "buffer"; max bytes per recvfrom() */
    char *nodename;              /* freed by the destructor if non-NULL */
    char *usnic_name;            /* strdup()ed; freed by the destructor */

    /* File descriptor, UDP port, buffer to receive messages, and event */
    int fd;
    uint32_t udp_port;           /* host byte order (from ntohs()) */
    uint8_t *buffer;
    opal_event_t event;
    /* True once "event" has been added and this item appended to
       udp_port_listeners; the destructor uses it to undo both */
    bool active;
    /* Module pointer sent with the LISTEN command: non-NULL only when the
       requesting client is the agent's own process (other MPI procs send
       NULL) */
    opal_btl_usnic_module_t *module;
} agent_udp_port_listener_t;

OBJ_CLASS_DECLARATION(agent_udp_port_listener_t);
80
/*
 * Holds information for a local IPC socket fd (i.e., a connection
 * from a local process to this agent).
 */
typedef struct {
    opal_list_item_t super;

    /* Accepted client connection fd (-1 when unset); closed by the
       destructor */
    int client_fd;
    /* Persistent read event on client_fd that dispatches incoming
       commands to agent_thread_ipc_receive() */
    opal_event_t event;
    /* True when "event" is active and this item is on ipc_listeners;
       the destructor uses it to undo both */
    bool active;
} agent_ipc_listener_t;

OBJ_CLASS_DECLARATION(agent_ipc_listener_t);
94
/* Values stored in agent_udp_message_t.message_type (a uint8_t).
   NOTE(review): starting at 17 rather than 0 presumably makes a zeroed
   or stray datagram less likely to be taken as valid -- confirm intent. */
typedef enum {
    AGENT_MSG_TYPE_PING = 17,
    AGENT_MSG_TYPE_ACK
} agent_udp_message_type_t;

// Arbitrary 64 bit numbers
/* MAGIC_ORIGINATOR is written into PINGs by the pinging agent and
   MAGIC_TARGET into ACKs by the responding agent; each side discards
   messages that do not carry the value it expects. */
#define MAGIC_ORIGINATOR 0x9a9e2fbce63a11e5
#define MAGIC_TARGET 0x60735c68f368aace
103
/*
 * Ping and ACK messages
 *
 * This struct is sent over UDP as raw bytes (the message buffer is
 * handed straight to sendto()) with no byte-order conversion, so both
 * ends must agree on its in-memory layout.  NOTE(review): this
 * presumably assumes peers share endianness and struct padding; the
 * major/minor version check on receipt is the only guard visible here.
 */
typedef struct {
    /* One of agent_udp_message_type_t */
    uint8_t message_type;

    /* The sender's IP address and port (i.e., where the ACK can be
       sent). This is actually redundant with the sockaddr that we
       get from recvfrom(), but that's ok -- it provides a sanity
       check. */
    uint32_t src_ipv4_addr;
    uint32_t src_udp_port;

    /* A magic number that helps determine that the sender was Open
       MPI */
    uint64_t magic_number;
    /* Sender's OPAL_MAJOR_VERSION / OPAL_MINOR_VERSION; PINGs from a
       different version are discarded */
    uint32_t major_version, minor_version;

    /* If this is a PING, the message should be this size.
       If this is an ACK, we are ACKing a ping of this size. */
    uint32_t size;
} agent_udp_message_t;
126
/*
 * State for pinging one remote (IP address, UDP port): kept on
 * pings_pending until every size has been ACKed, then moved to
 * ping_results.
 */
typedef struct {
    opal_list_item_t super;

    /* Data from the PING command message */
    uint32_t src_ipv4_addr; /* in network byte order */
    uint32_t src_udp_port;
    /* Listener whose fd the PINGs are sent from (not owned) */
    agent_udp_port_listener_t *listener;
    uint32_t dest_ipv4_addr; /* in network byte order */
    uint32_t dest_netmask;
    uint32_t dest_udp_port;  /* host byte order (htons()ed into dest_sockaddr) */
    /* Destination address passed to sendto() */
    struct sockaddr_in dest_sockaddr;
    /* strdup()ed name of the destination node, used in messages */
    char *dest_nodename;

    /* The sizes and corresponding buffers of the PING messages that
       we'll send, and whether each of those PING messages have been
       ACKed yet.  Index 0 is the small (header-only) PING; index 1 is
       the large one derived from the MTU. */
#define NUM_PING_SIZES 2
    size_t sizes[NUM_PING_SIZES];
    uint8_t *buffers[NUM_PING_SIZES];
    bool acked[NUM_PING_SIZES];

    /* Number of times we've sent this ping */
    int num_sends;

    /* Timer used to re-send the PING, and whether the timer is active
       or not */
    opal_event_t timer;
    bool timer_active;
} agent_ping_t;

OBJ_CLASS_DECLARATION(agent_ping_t);
158
159
160 /**************************************************************************
161 * Utility functions, constructors, destructors
162 **************************************************************************/
163
/*
 * Reset every field of a UDP port listener to its "empty" value.
 * Frees nothing: callers must release owned resources first.
 */
static void udp_port_listener_zero(agent_udp_port_listener_t *obj)
{
    obj->ipv4_addr = 0;
    obj->netmask = 0;
    obj->max_msg_size = 0;
    obj->nodename = NULL;
    obj->usnic_name = NULL;
    memset(obj->ipv4_addr_str, 0, sizeof(obj->ipv4_addr_str));

    obj->fd = -1;
    /* "No port yet" sentinel (UINT32_MAX) */
    obj->udp_port = (uint32_t) -1;
    obj->buffer = NULL;

    obj->active = false;
    /* Was previously left uninitialized by the constructor */
    obj->module = NULL;
}

/* OPAL object constructor: start from the zeroed state */
static void udp_port_listener_constructor(agent_udp_port_listener_t *obj)
{
    udp_port_listener_zero(obj);
}

/*
 * OPAL object destructor: drop the pending pings that send through this
 * listener, unregister its event and take it off udp_port_listeners (if
 * active), then close its socket and free its strings and buffer.
 */
static void udp_port_listener_destructor(agent_udp_port_listener_t *obj)
{
    /* Pending pings keep a raw pointer to this listener and send through
       its fd, so they cannot outlive it.  Match on that pointer. */
    agent_ping_t *ap, *apnext;
    OPAL_LIST_FOREACH_SAFE(ap, apnext, &pings_pending, agent_ping_t) {
        if (ap->listener == obj) {
            opal_list_remove_item(&pings_pending, &ap->super);
            OBJ_RELEASE(ap);
        }
    }

    /* If the "active" flag is set, then the event is active and the
       item is on the udp_port_listeners list.  Delete the event *before*
       closing its fd so the event library never holds a registration
       for a closed (and possibly already reused) descriptor. */
    if (obj->active) {
        opal_event_del(&obj->event);
        opal_list_remove_item(&udp_port_listeners, &obj->super);
    }

    if (-1 != obj->fd) {
        close(obj->fd);
    }
    /* free(NULL) is a no-op */
    free(obj->nodename);
    free(obj->usnic_name);
    free(obj->buffer);

    udp_port_listener_zero(obj);
}

OBJ_CLASS_INSTANCE(agent_udp_port_listener_t,
                   opal_list_item_t,
                   udp_port_listener_constructor,
                   udp_port_listener_destructor);
224
/*
 * Reset an IPC listener to its "empty" state: no client fd, not active.
 */
static void ipc_listener_zero(agent_ipc_listener_t *obj)
{
    obj->client_fd = -1;
    obj->active = false;
}

/* OPAL object constructor: start from the zeroed state */
static void ipc_listener_constructor(agent_ipc_listener_t *obj)
{
    ipc_listener_zero(obj);
}

/*
 * OPAL object destructor: unregister the client's read event and take
 * the listener off ipc_listeners (if active), then close the client fd.
 */
static void ipc_listener_destructor(agent_ipc_listener_t *obj)
{
    /* If the "active" flag is set, then the event is active and the
       item is on the ipc_listeners list.  Delete the event *before*
       closing its fd so the event library never holds a registration
       for a closed (and possibly already reused) descriptor. */
    if (obj->active) {
        opal_event_del(&obj->event);
        opal_list_remove_item(&ipc_listeners, &obj->super);
    }

    if (-1 != obj->client_fd) {
        close(obj->client_fd);
    }

    ipc_listener_zero(obj);
}

OBJ_CLASS_INSTANCE(agent_ipc_listener_t,
                   opal_list_item_t,
                   ipc_listener_constructor,
                   ipc_listener_destructor);
256
/*
 * Reset every field of a ping to its "empty" value.  Frees nothing:
 * callers must release owned resources first.
 */
static void agent_ping_result_zero(agent_ping_t *obj)
{
    obj->src_ipv4_addr = 0;
    obj->src_udp_port = 0;
    obj->listener = NULL;
    obj->dest_ipv4_addr = 0;
    obj->dest_netmask = 0;
    obj->dest_udp_port = 0;
    memset(&obj->dest_sockaddr, 0, sizeof(obj->dest_sockaddr));
    /* Must start NULL so the destructor can always free() it */
    obj->dest_nodename = NULL;
    obj->num_sends = 0;
    obj->timer_active = false;

    for (int i = 0; i < NUM_PING_SIZES; ++i) {
        obj->sizes[i] = 0;
        obj->buffers[i] = NULL;
        obj->acked[i] = false;
    }
}

/* OPAL object constructor: start from the zeroed state */
static void agent_ping_result_constructor(agent_ping_t *obj)
{
    agent_ping_result_zero(obj);
}

/*
 * OPAL object destructor: cancel the re-send timer if it is armed, then
 * free the PING buffers and the destination nodename.
 */
static void agent_ping_result_destructor(agent_ping_t *obj)
{
    /* Cancel the timer first: its callback uses the buffers */
    if (obj->timer_active) {
        opal_event_del(&obj->timer);
    }

    /* free(NULL) is a no-op */
    for (int i = 0; i < NUM_PING_SIZES; ++i) {
        free(obj->buffers[i]);
    }
    /* strdup()ed when the ping is created; previously leaked */
    free(obj->dest_nodename);

    agent_ping_result_zero(obj);
}

OBJ_CLASS_INSTANCE(agent_ping_t,
                   opal_list_item_t,
                   agent_ping_result_constructor,
                   agent_ping_result_destructor);
297
/*
 * Send one UDP datagram of "numbytes" bytes from "buffer" to "addr",
 * retrying until the kernel accepts it.
 *
 * EAGAIN and EINTR are retried immediately.  EPERM is taken to mean
 * "sending too fast" and retried after a short sleep.  Any other
 * sendto() error aborts via ABORT().
 */
static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
                         struct sockaddr *addr)
{
    for (;;) {
        ssize_t sent = sendto(fd, buffer, numbytes, 0, addr, sizeof(*addr));

        /* UDP datagrams are all-or-nothing, so a short send
           (0 < sent < numbytes) is not expected */
        if (numbytes == sent) {
            return;
        }

        if (sent >= 0) {
            /* Should never happen; back off a little so we don't
               hammer the CPU, then try again */
            usleep(1);
            continue;
        }

        switch (errno) {
        case EAGAIN:
        case EINTR:
            continue;
        case EPERM:
            // We're sending too fast
            usleep(5);
            continue;
        default:
            break;
        }

        char *msg;
        asprintf(&msg, "Unexpected sendto() error: errno=%d (%s)",
                 errno, strerror(errno));
        ABORT(msg);
        /* Will not return */
    }
}
334
335 /**************************************************************************
336 * All of the following functions run in agent thread
337 **************************************************************************/
338
339 /*
340 * Handle an incoming PING message (send an ACK)
341 */
agent_thread_handle_ping(agent_udp_port_listener_t * listener,ssize_t numbytes,struct sockaddr * from)342 static void agent_thread_handle_ping(agent_udp_port_listener_t *listener,
343 ssize_t numbytes, struct sockaddr *from)
344 {
345 /* If the size we received isn't equal to what the sender says it
346 sent, do the simple thing: just don't send an ACK */
347 agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
348 struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
349 if (msg->size != numbytes) {
350 char str[INET_ADDRSTRLEN];
351 inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
352
353 opal_output_verbose(20, USNIC_OUT,
354 "usNIC connectivity got bad ping: %d bytes from %s, expected %d (discarded)",
355 (int) numbytes, str, (int) msg->size);
356 return;
357 }
358
359 /* Ensure that the sender sent the ping from the IP address that
360 they think they sent it from. If they didn't, then drop it
361 (i.e., it's a bad ping because the sender sent it from an
362 unexpected interface). This should probably never happen, but
363 it's a good failsafe for unexpected scenarios. */
364 char msg_ipv4_addr_str[IPV4STRADDRLEN];
365 char real_ipv4_addr_str[IPV4STRADDRLEN];
366
367 opal_btl_usnic_snprintf_ipv4_addr(msg_ipv4_addr_str,
368 sizeof(msg_ipv4_addr_str),
369 msg->src_ipv4_addr, 0);
370 opal_btl_usnic_snprintf_ipv4_addr(real_ipv4_addr_str,
371 sizeof(real_ipv4_addr_str),
372 src_addr_in->sin_addr.s_addr, 0);
373
374 if (msg->src_ipv4_addr != src_addr_in->sin_addr.s_addr) {
375 opal_output_verbose(20, USNIC_OUT,
376 "usNIC connectivity got bad ping (from unexpected address: %s != %s, discarded)",
377 msg_ipv4_addr_str, real_ipv4_addr_str);
378 return;
379 }
380
381 if (msg->magic_number != MAGIC_ORIGINATOR) {
382 opal_output_verbose(20, USNIC_OUT,
383 "usNIC connectivity got bad ping (magic number: %" PRIu64 ", discarded)",
384 msg->magic_number);
385 return;
386 }
387 if (msg->major_version != OPAL_MAJOR_VERSION ||
388 msg->minor_version != OPAL_MINOR_VERSION) {
389 opal_output_verbose(20, USNIC_OUT,
390 "usNIC connectivity got bad ping (originator version: %d.%d, expected %d.%d, discarded)",
391 msg->major_version, msg->minor_version,
392 OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION);
393 return;
394 }
395
396 /* Ok, this is a good ping. Send the ACK back. The PING sender
397 will verify that the ACK came back from the IP address that it
398 expected. */
399
400 opal_output_verbose(20, USNIC_OUT,
401 "usNIC connectivity got PING (size=%ld) from %s; sending ACK",
402 numbytes, msg_ipv4_addr_str);
403
404 /* Send back an ACK. No need to allocate a new buffer; just
405 re-use the same buffer we just got. Note that msg->size is
406 already set. We simply echo back the sender's IP address/port
407 in the msg (the sender will use the msg fields and the
408 recvfrom() src_addr to check for a match). */
409 msg->message_type = AGENT_MSG_TYPE_ACK;
410 msg->magic_number = MAGIC_TARGET;
411
412 agent_sendto(listener->fd, (char*) listener->buffer, sizeof(*msg), from);
413 }
414
415 /*
416 * Handle an incoming ACK message
417 */
agent_thread_handle_ack(agent_udp_port_listener_t * listener,ssize_t numbytes,struct sockaddr * from)418 static void agent_thread_handle_ack(agent_udp_port_listener_t *listener,
419 ssize_t numbytes, struct sockaddr *from)
420 {
421 char str[INET_ADDRSTRLEN];
422 struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
423 inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
424
425 /* If we got a wonky ACK message that is the wrong length, just
426 return */
427 agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
428 if (numbytes != sizeof(*msg)) {
429 opal_output_verbose(20, USNIC_OUT,
430 "usNIC connectivity got bad ACK: %d bytes from %s, expected %d (discarded)",
431 (int) numbytes, str, (int) sizeof(*msg));
432 return;
433 }
434 if (msg->magic_number != MAGIC_TARGET) {
435 opal_output_verbose(20, USNIC_OUT,
436 "usNIC connectivity got bad ACK (magic number: %" PRIu64 ", discarded)",
437 msg->magic_number);
438 return;
439 }
440
441 /* Find the pending ping request (on this interface) for this ACK.
442 If we don't find a match, we'll drop it. */
443 agent_ping_t *ap;
444 uint32_t src_in_port = ntohs(src_addr_in->sin_port);
445 OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
446 if (ap->dest_ipv4_addr == src_addr_in->sin_addr.s_addr &&
447 ap->dest_udp_port == src_in_port &&
448 ap->src_ipv4_addr == msg->src_ipv4_addr &&
449 ap->src_udp_port == msg->src_udp_port) {
450 /* Found it -- indicate that it has been acked */
451 for (int i = 0; i < NUM_PING_SIZES; ++i) {
452 if (ap->sizes[i] == msg->size) {
453 ap->acked[i] = true;
454 return;
455 }
456 }
457 }
458 }
459
460 /* If we didn't find the matching ping for this ACK, then just
461 discard it */
462 opal_output_verbose(20, USNIC_OUT,
463 "usNIC connectivity got unexpected ACK: %d bytes from %s (discarded)",
464 (int) numbytes, str);
465 }
466
467 /*
468 * Receive a message from the listening UDP socket
469 */
agent_thread_receive_ping(int fd,short flags,void * context)470 static void agent_thread_receive_ping(int fd, short flags, void *context)
471 {
472 agent_udp_port_listener_t *listener =
473 (agent_udp_port_listener_t *) context;
474 assert(NULL != listener);
475
476 /* Receive the message */
477 ssize_t numbytes;
478 struct sockaddr src_addr;
479 struct sockaddr_in *src_addr_in = (struct sockaddr_in*) &src_addr;
480 socklen_t addrlen = sizeof(src_addr);
481
482 while (1) {
483 numbytes = recvfrom(listener->fd, listener->buffer, listener->max_msg_size, 0,
484 &src_addr, &addrlen);
485 if (numbytes > 0) {
486 break;
487 } else if (numbytes < 0) {
488 if (errno == EAGAIN || errno == EINTR) {
489 continue;
490 }
491
492 ABORT("Unexpected error from recvfrom");
493 /* Will not return */
494 }
495 }
496
497 char str[INET_ADDRSTRLEN];
498 agent_udp_message_t *msg;
499 msg = (agent_udp_message_t *) listener->buffer;
500 switch (msg->message_type) {
501 case AGENT_MSG_TYPE_PING:
502 agent_thread_handle_ping(listener, numbytes, &src_addr);
503 break;
504 case AGENT_MSG_TYPE_ACK:
505 agent_thread_handle_ack(listener, numbytes, &src_addr);
506 break;
507 default:
508 /* Ignore unknown pings */
509 inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
510 opal_output_verbose(20, USNIC_OUT,
511 "usNIC connectivity agent received unknown message: %d bytes from %s",
512 (int) numbytes, str);
513 break;
514 }
515 }
516
517 static agent_udp_port_listener_t *
agent_thread_find_listener(uint32_t ipv4_addr,uint32_t * udp_port)518 agent_thread_find_listener(uint32_t ipv4_addr, uint32_t *udp_port)
519 {
520 agent_udp_port_listener_t *listener;
521 OPAL_LIST_FOREACH(listener, &udp_port_listeners, agent_udp_port_listener_t) {
522 if (listener->ipv4_addr == ipv4_addr) {
523 *udp_port = listener->udp_port;
524 return listener;
525 }
526 }
527
528 return NULL;
529 }
530
531 /*
532 * Send reply back from the LISTEN command: send back the IP address
533 * and UDP port that we're listening on.
534 */
agent_thread_cmd_listen_reply(int fd,uint32_t addr,int32_t udp_port)535 static int agent_thread_cmd_listen_reply(int fd,
536 uint32_t addr, int32_t udp_port)
537 {
538 int ret;
539
540 opal_btl_usnic_connectivity_cmd_listen_reply_t cmd = {
541 .cmd = CONNECTIVITY_AGENT_CMD_LISTEN,
542 .ipv4_addr = addr,
543 .udp_port = udp_port
544 };
545
546 ret = opal_fd_write(fd, sizeof(cmd), &cmd);
547 if (OPAL_SUCCESS != ret) {
548 OPAL_ERROR_LOG(ret);
549 ABORT("usnic connectivity agent IPC write failed");
550 /* Will not return */
551 }
552
553 return OPAL_SUCCESS;
554 }
555
556 /*
557 * Receive and process the rest of a LISTEN command from a local IPC
558 * client.
559 */
agent_thread_cmd_listen(agent_ipc_listener_t * ipc_listener)560 static void agent_thread_cmd_listen(agent_ipc_listener_t *ipc_listener)
561 {
562 /* Read the rest of the LISTEN command from the IPC socket */
563 int ret;
564 opal_btl_usnic_connectivity_cmd_listen_t cmd;
565 ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
566 if (OPAL_SUCCESS != ret) {
567 OPAL_ERROR_LOG(ret);
568 ABORT("usnic connectivity agent IPC LISTEN read failed");
569 /* Will not return */
570 }
571
572 /* If we're already listening on this address, send the UDP port
573 back to the client. */
574 uint32_t udp_port;
575 agent_udp_port_listener_t *udp_listener;
576 udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
577 if (NULL != udp_listener) {
578 /* If we get a non-NULL "module" pointer value from the
579 client, it means that this client is the same process as
580 this agent, and we should save this pointer value (all
581 non-agent MPI procs will send NULL as their "module"
582 pointer value -- i.e., some non-agent MPI proc was the
583 first one to send the LISTEN command). */
584 if (NULL == udp_listener->module) {
585 udp_listener->module = cmd.module;
586 }
587 agent_thread_cmd_listen_reply(ipc_listener->client_fd,
588 cmd.ipv4_addr, udp_port);
589 return;
590 }
591
592 /* We're not listening on this interface already, so create a
593 UDP port listener entry */
594 udp_listener = OBJ_NEW(agent_udp_port_listener_t);
595 if (NULL == udp_listener) {
596 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
597 ABORT("Out of memory");
598 /* Will not return */
599 }
600
601 udp_listener->module = cmd.module;
602 udp_listener->max_msg_size = cmd.max_msg_size;
603 udp_listener->ipv4_addr = cmd.ipv4_addr;
604 udp_listener->netmask = cmd.netmask;
605 udp_listener->usnic_name = strdup(cmd.usnic_name);
606
607 /* Fill in the ipv4_addr_str. Since we don't have the IPv4
608 address in sockaddr_in form, it's not worth using
609 inet_ntop() */
610 opal_btl_usnic_snprintf_ipv4_addr(udp_listener->ipv4_addr_str,
611 sizeof(udp_listener->ipv4_addr_str),
612 cmd.ipv4_addr, cmd.netmask);
613
614 udp_listener->buffer = malloc(udp_listener->max_msg_size);
615 if (NULL == udp_listener->buffer) {
616 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
617 ABORT("Out of memory");
618 /* Will not return */
619 }
620
621 /* Create the listening socket */
622 udp_listener->fd = socket(AF_INET, SOCK_DGRAM, 0);
623 if (udp_listener->fd < 0) {
624 OPAL_ERROR_LOG(udp_listener->fd);
625 ABORT("Could not open listening socket");
626 /* Will not return */
627 }
628
629 /* Bind it to the designated interface */
630 struct sockaddr_in inaddr;
631 memset(&inaddr, 0, sizeof(inaddr));
632 inaddr.sin_family = AF_INET;
633 inaddr.sin_addr.s_addr = cmd.ipv4_addr;
634 inaddr.sin_port = htons(0);
635
636 ret = bind(udp_listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr));
637 if (ret < 0) {
638 OPAL_ERROR_LOG(ret);
639 ABORT("Could not bind listening socket");
640 /* Will not return */
641 }
642
643 /* Find out the port we got */
644 opal_socklen_t addrlen = sizeof(struct sockaddr_in);
645 ret = getsockname(udp_listener->fd, (struct sockaddr*) &inaddr, &addrlen);
646 if (ret < 0) {
647 OPAL_ERROR_LOG(ret);
648 ABORT("Could not get UDP port number from listening socket");
649 /* Will not return */
650 }
651 udp_listener->udp_port = ntohs(inaddr.sin_port);
652
653 opal_output_verbose(20, USNIC_OUT,
654 "usNIC connectivity agent listening on %s:%d, (%s)",
655 udp_listener->ipv4_addr_str,
656 udp_listener->udp_port,
657 udp_listener->usnic_name);
658
659 /* Set the "don't fragment" bit on outgoing frames because we
660 want MTU-sized messages to get through successfully to the
661 peer, or fail if they have to fragment because of an MTU
662 mismatch somewhere enroute */
663 int val = IP_PMTUDISC_DO;
664 ret = setsockopt(udp_listener->fd, IPPROTO_IP, IP_MTU_DISCOVER,
665 &val, sizeof(val));
666 if (0 != ret) {
667 OPAL_ERROR_LOG(ret);
668 ABORT("Unable to set \"do not fragment\" on UDP socket");
669 /* Will not return */
670 }
671
672 /* Set the send and receive buffer sizes to our MTU size */
673 int temp;
674 temp = (int) udp_listener->max_msg_size;
675 if ((ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_RCVBUF,
676 &temp, sizeof(temp))) < 0 ||
677 (ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_SNDBUF,
678 &temp, sizeof(temp))) < 0) {
679 OPAL_ERROR_LOG(ret);
680 ABORT("Could not set socket buffer sizes");
681 /* Will not return */
682 }
683
684 /* Create a listening event */
685 opal_event_set(mca_btl_usnic_component.opal_evbase,
686 &udp_listener->event, udp_listener->fd,
687 OPAL_EV_READ | OPAL_EV_PERSIST,
688 agent_thread_receive_ping, udp_listener);
689 opal_event_add(&udp_listener->event, 0);
690
691 /* Save this listener on the list of udp_port_listeners */
692 opal_list_append(&udp_port_listeners, &udp_listener->super);
693
694 udp_listener->active = true;
695
696 /* Return the port number to the sender */
697 ret = agent_thread_cmd_listen_reply(ipc_listener->client_fd,
698 cmd.ipv4_addr, udp_listener->udp_port);
699
700 /* All done! */
701 return;
702 }
703
704 /*
705 * Send a ping
706 */
agent_thread_send_ping(int fd,short flags,void * context)707 static void agent_thread_send_ping(int fd, short flags, void *context)
708 {
709 agent_ping_t *ap = (agent_ping_t*) context;
710 ap->timer_active = false;
711
712 char dest_ipv4_addr_str[IPV4STRADDRLEN];
713 opal_btl_usnic_snprintf_ipv4_addr(dest_ipv4_addr_str,
714 sizeof(dest_ipv4_addr_str),
715 ap->dest_ipv4_addr, ap->dest_netmask);
716
717 /* If we got all the ACKs for this ping, then move this ping from
718 the "pending" list to the "results" list. We can also free the
719 buffers associated with this ping result, just to save some
720 space in the long run. */
721 if (ap->acked[0] && ap->acked[1]) {
722 opal_list_remove_item(&pings_pending, &ap->super);
723 opal_list_append(&ping_results, &ap->super);
724
725 opal_output_verbose(20, USNIC_OUT,
726 "usNIC connectivity GOOD between %s <--> %s",
727 ap->listener->ipv4_addr_str,
728 dest_ipv4_addr_str);
729
730 for (int i = 0; i < 2; ++i) {
731 if (NULL != ap->buffers[i]) {
732 free(ap->buffers[i]);
733 ap->buffers[i] = NULL;
734 }
735 }
736
737 return;
738 }
739
740 /* If we've resent too many times, then just abort */
741 if (ap->num_sends > mca_btl_usnic_component.connectivity_num_retries) {
742 char *topic;
743 if (ap->acked[0] && !ap->acked[1]) {
744 // For the show_help topic checker script
745 // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small ok, large bad"
746 topic = "connectivity error: small ok, large bad";
747 } else if (!ap->acked[0] && ap->acked[1]) {
748 // For the show_help topic checker script
749 // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small bad, large ok"
750 topic = "connectivity error: small bad, large ok";
751 } else {
752 // For the show_help topic checker script
753 // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small bad, large bad"
754 topic = "connectivity error: small bad, large bad";
755 }
756
757 char ipv4_addr_str[IPV4STRADDRLEN];
758 opal_btl_usnic_snprintf_ipv4_addr(ipv4_addr_str, sizeof(ipv4_addr_str),
759 ap->dest_ipv4_addr,
760 ap->dest_netmask);
761 opal_show_help("help-mpi-btl-usnic.txt", topic, true,
762 opal_process_info.nodename,
763 ap->listener->ipv4_addr_str,
764 ap->listener->usnic_name,
765 ap->dest_nodename,
766 ipv4_addr_str,
767 ap->sizes[0],
768 ap->sizes[1]);
769 opal_btl_usnic_exit(NULL);
770 /* Will not return */
771 }
772
773 time_t t = time(NULL);
774 opal_output_verbose(20, USNIC_OUT,
775 "usNIC connectivity pinging %s:%d (%s) from %s (%s) at %s",
776 dest_ipv4_addr_str,
777 ntohs(ap->dest_sockaddr.sin_port),
778 ap->dest_nodename,
779 ap->listener->ipv4_addr_str,
780 ap->listener->usnic_name,
781 ctime(&t));
782
783 /* Send the ping messages to the peer */
784 for (int i = 0; i < NUM_PING_SIZES; ++i) {
785 agent_sendto(ap->listener->fd, (char*) ap->buffers[i], ap->sizes[i],
786 (struct sockaddr*) &ap->dest_sockaddr);
787 }
788
789 /* Set a timer to check if these pings are ACKed */
790 opal_event_set(mca_btl_usnic_component.opal_evbase, &ap->timer,
791 -1, 0, agent_thread_send_ping, ap);
792 opal_event_add(&ap->timer, &ack_timeout);
793 ap->timer_active = true;
794
795 /* Count how many times we've done this */
796 ++ap->num_sends;
797 }
798
799 /*
800 * Receive and process the rest of a PING command from a local IPC
801 * client.
802 */
agent_thread_cmd_ping(agent_ipc_listener_t * ipc_listener)803 static void agent_thread_cmd_ping(agent_ipc_listener_t *ipc_listener)
804 {
805 /* Read the rest of the PING command from the IPC socket */
806 int ret;
807 opal_btl_usnic_connectivity_cmd_ping_t cmd;
808 ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
809 if (OPAL_SUCCESS != ret) {
810 OPAL_ERROR_LOG(ret);
811 ABORT("usnic connectivity agent IPC PING read failed");
812 /* Will not return */
813 }
814
815 /* Have we already pinged this IP address / port? */
816 agent_ping_t *ap;
817 OPAL_LIST_FOREACH(ap, &ping_results, agent_ping_t) {
818 if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
819 ap->dest_udp_port == cmd.dest_udp_port) {
820 /* We already have results from pinging this IP address /
821 port, so there's no need for further action */
822 return;
823 }
824 }
825
826 /* Are we in the middle of pinging this IP address / port? */
827 OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
828 if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
829 ap->dest_udp_port == cmd.dest_udp_port) {
830 /* We're already in the middle of pinging this IP address
831 / port, so there's no need for further action */
832 return;
833 }
834 }
835
836 /* This is a new ping request. Find the listener with this source
837 ipv4 address */
838 bool found = false;
839 agent_udp_port_listener_t *udp_listener;
840 OPAL_LIST_FOREACH(udp_listener, &udp_port_listeners,
841 agent_udp_port_listener_t) {
842 if (udp_listener->ipv4_addr == cmd.src_ipv4_addr) {
843 found = true;
844 break;
845 }
846 }
847 if (!found) {
848 ABORT("Could not ping listener for ping request");
849 /* Will not return */
850 }
851
852 /* This is a new ping request; track it */
853 ap = OBJ_NEW(agent_ping_t);
854 if (NULL == ap) {
855 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
856 ABORT("Out of memory");
857 /* Will not return */
858 }
859 ap->src_ipv4_addr = cmd.src_ipv4_addr;
860 ap->src_udp_port = cmd.src_udp_port;
861 ap->listener = udp_listener;
862 ap->dest_ipv4_addr = cmd.dest_ipv4_addr;
863 ap->dest_netmask = cmd.dest_netmask;
864 ap->dest_udp_port = cmd.dest_udp_port;
865 ap->dest_sockaddr.sin_family = AF_INET;
866 ap->dest_sockaddr.sin_addr.s_addr = cmd.dest_ipv4_addr;
867 ap->dest_sockaddr.sin_port = htons(cmd.dest_udp_port);
868 ap->dest_nodename = strdup(cmd.dest_nodename);
869
870 /* The first message we send will be "short" (a simple control
871 message); the second will be "long" (i.e., caller-specified
872 length) */
873 ap->sizes[0] = sizeof(agent_udp_message_t);
874
875 /* Note that the MTU is the max Ethernet frame payload. So from
876 that MTU, we have to subtract off the max IP header (e.g., if
877 all IP options are enabled, which is 60 bytes), and then also
878 subtract off the UDP header (which is 8 bytes). So we need to
879 subtract off 68 bytes from the MTU, and that's the largest ping
880 payload we can send. */
881 ap->sizes[1] = cmd.max_msg_size - 68;
882
883 /* Allocate a buffer for each size. Make sure the smallest size
884 is at least sizeof(agent_udp_message_t). */
885 agent_udp_message_t *msg;
886 for (size_t i = 0; i < NUM_PING_SIZES; ++i) {
887 ap->buffers[i] = calloc(1, ap->sizes[i]);
888 if (NULL == ap->buffers[i]) {
889 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
890 ABORT("Out of memory");
891 /* Will not return */
892 }
893
894 /* Fill in the message with return addressing information */
895 msg = (agent_udp_message_t*) ap->buffers[i];
896 msg->message_type = AGENT_MSG_TYPE_PING;
897 msg->src_ipv4_addr = ap->src_ipv4_addr;
898 msg->src_udp_port = ap->src_udp_port;
899 msg->magic_number = MAGIC_ORIGINATOR;
900 msg->major_version = OPAL_MAJOR_VERSION;
901 msg->minor_version = OPAL_MINOR_VERSION;
902 msg->size = ap->sizes[i];
903 }
904
905 /* Save this ping request on the "pending" list */
906 opal_list_append(&pings_pending, &ap->super);
907
908 /* Send the ping */
909 agent_thread_send_ping(0, 0, ap);
910 }
911
912 /*
913 * Receive and process the rest of an UNLISTEN command from a local IPC
914 * client.
915 */
agent_thread_cmd_unlisten(agent_ipc_listener_t * ipc_listener)916 static void agent_thread_cmd_unlisten(agent_ipc_listener_t *ipc_listener)
917 {
918 /* Read the rest of the UNLISTEN command from the IPC socket */
919 int ret;
920 opal_btl_usnic_connectivity_cmd_unlisten_t cmd;
921 ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
922 if (OPAL_SUCCESS != ret) {
923 OPAL_ERROR_LOG(ret);
924 ABORT("usnic connectivity agent IPC UNLISTEN read failed");
925 /* Will not return */
926 }
927
928 /* If we are listening on this address (and we should be), then
929 stop listening on it. */
930 uint32_t udp_port;
931 agent_udp_port_listener_t *udp_listener;
932 udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
933 if (NULL != udp_listener) {
934 OBJ_RELEASE(udp_listener);
935 }
936
937 /* All done! */
938 return;
939 }
940
941 /*
942 * Called when we get an incoming IPC message
943 */
agent_thread_ipc_receive(int fd,short flags,void * context)944 static void agent_thread_ipc_receive(int fd, short flags, void *context)
945 {
946 int32_t command;
947 agent_ipc_listener_t *ipc_listener = (agent_ipc_listener_t*) context;
948
949 /* Read the command */
950 command = -1;
951 int ret = opal_fd_read(fd, sizeof(command), &command);
952 if (OPAL_ERR_TIMEOUT == ret) {
953 /* We get OPAL_ERR_TIMEOUT if the remote side hung up */
954 OBJ_RELEASE(ipc_listener);
955 return;
956 } else if (OPAL_SUCCESS != ret) {
957 OPAL_ERROR_LOG(ret);
958 ABORT("usnic connectivity agent IPC command read failed");
959 /* Will not return */
960 }
961
962 assert(CONNECTIVITY_AGENT_CMD_LISTEN == command ||
963 CONNECTIVITY_AGENT_CMD_PING == command ||
964 CONNECTIVITY_AGENT_CMD_UNLISTEN == command);
965
966 switch (command) {
967 case CONNECTIVITY_AGENT_CMD_LISTEN:
968 agent_thread_cmd_listen(ipc_listener);
969 break;
970 case CONNECTIVITY_AGENT_CMD_PING:
971 agent_thread_cmd_ping(ipc_listener);
972 break;
973 case CONNECTIVITY_AGENT_CMD_UNLISTEN:
974 agent_thread_cmd_unlisten(ipc_listener);
975 break;
976 default:
977 ABORT("Unexpected connectivity agent command");
978 break;
979 }
980 }
981
982 /*
983 * We got a new connection on the IPC named socket. Add it to the
984 * event base.
985 */
agent_thread_accept(int fd,short flags,void * context)986 static void agent_thread_accept(int fd, short flags, void *context)
987 {
988 struct sockaddr addr;
989 socklen_t len;
990 agent_ipc_listener_t *listener = NULL;
991
992 len = sizeof(addr);
993 int client_fd = accept(fd, &addr, &len);
994 if (client_fd < 0) {
995 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
996 ABORT("accept() failed");
997 /* Will not return */
998 }
999
1000 /* If we got a good client, verify that it sent the magic token */
1001 int tlen = strlen(CONNECTIVITY_MAGIC_TOKEN);
1002 char *msg = alloca(tlen + 1);
1003 if (NULL == msg) {
1004 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1005 ABORT("Out of memory");
1006 /* Will not return */
1007 }
1008 if (OPAL_SUCCESS != opal_fd_read(client_fd, tlen, msg)) {
1009 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1010 ABORT("usnic connectivity agent IPC read failed");
1011 /* Will not return */
1012 }
1013 if (0 != memcmp(msg, CONNECTIVITY_MAGIC_TOKEN, tlen)) {
1014 opal_output_verbose(20, USNIC_OUT,
1015 "usNIC connectivity got bad IPC client (wrong magic token); disconnected");
1016 close(client_fd);
1017 return;
1018 }
1019
1020 /* Remember how many accepts we have successfully completed */
1021 ++ipc_accepts;
1022
1023 /* Make a listener object for this peer */
1024 listener = OBJ_NEW(agent_ipc_listener_t);
1025 listener->client_fd = client_fd;
1026
1027 /* Write back the magic token to ACK that we got the peer's
1028 magic token and all is kosher */
1029 if (OPAL_SUCCESS != opal_fd_write(client_fd, tlen,
1030 CONNECTIVITY_MAGIC_TOKEN)) {
1031 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1032 ABORT("usnic connectivity agent IPC read failed");
1033 /* Will not return */
1034 }
1035
1036 /* Add this IPC listener to the event base */
1037 opal_event_set(mca_btl_usnic_component.opal_evbase,
1038 &listener->event, client_fd,
1039 OPAL_EV_READ | OPAL_EV_PERSIST,
1040 agent_thread_ipc_receive, listener);
1041 opal_event_add(&listener->event, 0);
1042
1043 /* Save this listener on the list of ipc_listeners */
1044 opal_list_append(&ipc_listeners, &listener->super);
1045
1046 listener->active = true;
1047
1048 return;
1049 }
1050
1051 /*
1052 * Tear down all active events.
1053 *
1054 * This is done as an event callback in the agent threaf so that there
1055 * is no race condition in the teardown. Specifically: the progress
1056 * thread will only fire one event at a time. Therefore, this one
1057 * event can "atomically" delete all the events and data structures
1058 * and not have to worry about concurrent access from some event
1059 * firing in the middle of the teardown process.
1060 */
agent_thread_finalize(int fd,short flags,void * context)1061 static void agent_thread_finalize(int fd, short flags, void *context)
1062 {
1063 /* Free the event that triggered this call */
1064 free(context);
1065
1066 /* Ensure that all the local IPC clients have connected to me (so
1067 that we don't shut down before someone tries to connect to me),
1068 or 10 seconds have passed (i.e., if 10 seconds pass and they
1069 don't all connect to me, then something else is wrong, and we
1070 should just give up). */
1071 static bool first = true;
1072 static time_t timestamp = 0;
1073 if (first) {
1074 timestamp = time(NULL);
1075 first = false;
1076 }
1077
1078 if (ipc_accepts < opal_process_info.num_local_peers &&
1079 time(NULL) < timestamp + 10) {
1080 opal_output_verbose(20, USNIC_OUT,
1081 "usNIC connectivity agent delaying shutdown until all clients connect...");
1082
1083 opal_event_t *ev = calloc(sizeof(*ev), 1);
1084 struct timeval finalize_retry = {
1085 .tv_sec = 0,
1086 .tv_usec = 10000
1087 };
1088
1089 opal_event_set(mca_btl_usnic_component.opal_evbase,
1090 ev, -1, 0, agent_thread_finalize, ev);
1091 opal_event_add(ev, &finalize_retry);
1092 return;
1093 }
1094 if (ipc_accepts < opal_process_info.num_local_peers) {
1095 opal_output_verbose(20, USNIC_OUT,
1096 "usNIC connectivity agent: only %d of %d clients connected, but timeout has expired -- exiting anyway", ipc_accepts, opal_process_info.num_local_peers);
1097 }
1098
1099 /* Remove the agent listening event from the opal async event
1100 base */
1101 opal_event_del(&ipc_event);
1102
1103 /* Shut down all active udp_port_listeners */
1104 agent_udp_port_listener_t *udp_listener, *ulnext;
1105 OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
1106 agent_udp_port_listener_t) {
1107 OBJ_RELEASE(udp_listener);
1108 }
1109
1110 /* Destroy the pending pings and ping results */
1111 agent_ping_t *request, *pnext;
1112 OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
1113 opal_list_remove_item(&pings_pending, &request->super);
1114 OBJ_RELEASE(request);
1115 }
1116
1117 OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
1118 opal_list_remove_item(&ping_results, &request->super);
1119 OBJ_RELEASE(request);
1120 }
1121
1122 /* Shut down all active ipc_listeners */
1123 agent_ipc_listener_t *ipc_listener, *inext;
1124 OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
1125 agent_ipc_listener_t) {
1126 OBJ_RELEASE(ipc_listener);
1127 }
1128
1129 agent_initialized = false;
1130 }
1131
1132 /**************************************************************************
1133 * All of the following functions run in the main application thread
1134 **************************************************************************/
1135
1136 /*
1137 * Setup the agent and start its event loop running in a dedicated
1138 * thread
1139 */
opal_btl_usnic_connectivity_agent_init(void)1140 int opal_btl_usnic_connectivity_agent_init(void)
1141 {
1142 /* Only do this initialization if I am the agent (the agent is
1143 local rank 0) */
1144 if (opal_process_info.my_local_rank != 0) {
1145 return OPAL_SUCCESS;
1146 }
1147 if (agent_initialized) {
1148 return OPAL_SUCCESS;
1149 }
1150
1151 /* Make a struct timeval for use with timer events. Note that the
1152 MCA param is expressed in terms of *milli*seconds, but the
1153 timeval timeout is expressed in terms of *micro*seconds. */
1154 ack_timeout.tv_sec =
1155 mca_btl_usnic_component.connectivity_ack_timeout / 1000;
1156 ack_timeout.tv_usec =
1157 1000 * (mca_btl_usnic_component.connectivity_ack_timeout % 1000);
1158
1159 /* Create lists */
1160 OBJ_CONSTRUCT(&udp_port_listeners, opal_list_t);
1161 OBJ_CONSTRUCT(&ipc_listeners, opal_list_t);
1162 OBJ_CONSTRUCT(&pings_pending, opal_list_t);
1163 OBJ_CONSTRUCT(&ping_results, opal_list_t);
1164
1165 /********************************************************************
1166 * Once all of the above is setup, create the unix domain socket
1167 * and start the event loop.
1168 ********************************************************************/
1169
1170 /* Create the unix domain socket in the job session directory */
1171 ipc_accept_fd = socket(PF_UNIX, SOCK_STREAM, 0);
1172 if (ipc_accept_fd < 0) {
1173 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1174 ABORT("socket() failed");
1175 /* Will not return */
1176 }
1177
1178 asprintf(&ipc_filename, "%s/%s",
1179 opal_process_info.job_session_dir, CONNECTIVITY_SOCK_NAME);
1180 if (NULL == ipc_filename) {
1181 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1182 ABORT("Out of memory");
1183 /* Will not return */
1184 }
1185 unlink(ipc_filename);
1186
1187 struct sockaddr_un address;
1188 assert(strlen(ipc_filename) < sizeof(address.sun_path));
1189
1190 memset(&address, 0, sizeof(struct sockaddr_un));
1191 address.sun_family = AF_UNIX;
1192 strncpy(address.sun_path, ipc_filename, sizeof(address.sun_path) - 1);
1193
1194 if (bind(ipc_accept_fd, (struct sockaddr *) &address,
1195 sizeof(struct sockaddr_un)) != 0) {
1196 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1197 ABORT("bind() failed");
1198 /* Will not return */
1199 }
1200
1201 /* Give an arbitrarily large backlog number so that connecting
1202 clients will never be backlogged (note for Future Jeff: please
1203 don't laugh at Past Jeff if 256 has become a trivially small
1204 number of on-server procs in a single job). */
1205 if (listen(ipc_accept_fd, 256) != 0) {
1206 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1207 ABORT("listen() failed");
1208 /* Will not return */
1209 }
1210
1211 /* Add the socket to the event base */
1212 opal_event_set(mca_btl_usnic_component.opal_evbase,
1213 &ipc_event, ipc_accept_fd,
1214 OPAL_EV_READ | OPAL_EV_PERSIST,
1215 agent_thread_accept, NULL);
1216 opal_event_add(&ipc_event, 0);
1217
1218 opal_output_verbose(20, USNIC_OUT,
1219 "usNIC connectivity agent initialized");
1220 agent_initialized = true;
1221 return OPAL_SUCCESS;
1222 }
1223
1224 /*
1225 * Shut down the agent
1226 */
opal_btl_usnic_connectivity_agent_finalize(void)1227 int opal_btl_usnic_connectivity_agent_finalize(void)
1228 {
1229 /* Only do this if I have the agent running */
1230 if (!agent_initialized) {
1231 return OPAL_SUCCESS;
1232 }
1233
1234 /* Submit an event to the async thread and tell it to delete all
1235 the usNIC events. See the rationale for doing this in the
1236 comment in the agent_thread_finalize() function. */
1237 opal_event_t *ev = calloc(sizeof(*ev), 1);
1238 opal_event_set(mca_btl_usnic_component.opal_evbase,
1239 ev, -1, OPAL_EV_WRITE, agent_thread_finalize, ev);
1240 opal_event_active(ev, OPAL_EV_WRITE, 1);
1241
1242 /* Wait for the event to fire and complete */
1243 while (agent_initialized) {
1244 struct timespec tp = {
1245 .tv_sec = 0,
1246 .tv_nsec = 1000
1247 };
1248 nanosleep(&tp, NULL);
1249 }
1250
1251 /* Close the local IPC socket and remove the file */
1252 if (ipc_accept_fd != -1) {
1253 close(ipc_accept_fd);
1254 ipc_accept_fd = -1;
1255 }
1256 if (NULL != ipc_filename) {
1257 unlink(ipc_filename);
1258 free(ipc_filename);
1259 ipc_filename = NULL;
1260 }
1261
1262 opal_output_verbose(20, USNIC_OUT,
1263 "usNIC connectivity client finalized");
1264 return OPAL_SUCCESS;
1265 }
1266