1 /*
2 * Copyright (c) 2014 Intel Corporation, Inc. All rights reserved.
3 * Copyright (c) 2017 DataDirect Networks, Inc. All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses. You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * BSD license below:
10 *
11 * Redistribution and use in source and binary forms, with or
12 * without modification, are permitted provided that the following
13 * conditions are met:
14 *
15 * - Redistributions of source code must retain the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials
22 * provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 */
33
34 #include "config.h"
35
36 #include <sys/types.h>
37 #include <stdlib.h>
38 #include <stdio.h>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <netdb.h>
43 #include <netinet/in.h>
44 #include <netinet/ip.h>
45 #include <netinet/tcp.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <net/if.h>
50 #include <ifaddrs.h>
51 #include <poll.h>
52 #include <limits.h>
53
54 #include "sock.h"
55 #include "sock_util.h"
56 #include "ofi_file.h"
57
/*
 * Control-path logging helpers: every message emitted from this file is
 * tagged with the FI_LOG_EP_CTRL subsystem.
 */
#define SOCK_LOG_DBG(...) \
	_SOCK_LOG_DBG(FI_LOG_EP_CTRL, __VA_ARGS__)
#define SOCK_LOG_ERROR(...) \
	_SOCK_LOG_ERROR(FI_LOG_EP_CTRL, __VA_ARGS__)
60
sock_conn_send_src_addr(struct sock_ep_attr * ep_attr,struct sock_tx_ctx * tx_ctx,struct sock_conn * conn)61 ssize_t sock_conn_send_src_addr(struct sock_ep_attr *ep_attr, struct sock_tx_ctx *tx_ctx,
62 struct sock_conn *conn)
63 {
64 int ret;
65 uint64_t total_len;
66 struct sock_op tx_op = { 0 };
67
68 tx_op.op = SOCK_OP_CONN_MSG;
69 SOCK_LOG_DBG("New conn msg on TX: %p using conn: %p\n", tx_ctx, conn);
70
71 total_len = 0;
72 tx_op.src_iov_len = sizeof(union ofi_sock_ip);
73 total_len = tx_op.src_iov_len + sizeof(struct sock_op_send);
74
75 sock_tx_ctx_start(tx_ctx);
76 if (ofi_rbavail(&tx_ctx->rb) < total_len) {
77 ret = -FI_EAGAIN;
78 goto err;
79 }
80
81 sock_tx_ctx_write_op_send(tx_ctx, &tx_op, 0, (uintptr_t) NULL, 0, 0,
82 ep_attr, conn);
83
84 ofi_straddr_dbg(&sock_prov, FI_LOG_EP_CTRL, "sending src addr: ",
85 ep_attr->src_addr);
86 sock_tx_ctx_write(tx_ctx, ep_attr->src_addr, sizeof(union ofi_sock_ip));
87 sock_tx_ctx_commit(tx_ctx);
88 conn->address_published = 1;
89 return 0;
90
91 err:
92 sock_tx_ctx_abort(tx_ctx);
93 return ret;
94 }
95
sock_conn_map_init(struct sock_ep * ep,int init_size)96 int sock_conn_map_init(struct sock_ep *ep, int init_size)
97 {
98 struct sock_conn_map *map = &ep->attr->cmap;
99 int ret;
100 map->table = calloc(init_size, sizeof(*map->table));
101 if (!map->table)
102 return -FI_ENOMEM;
103
104 map->epoll_ctxs = calloc(init_size, sizeof(*map->epoll_ctxs));
105 if (!map->epoll_ctxs)
106 goto err1;
107
108 ret = ofi_epoll_create(&map->epoll_set);
109 if (ret < 0) {
110 SOCK_LOG_ERROR("failed to create epoll set, "
111 "error - %d (%s)\n", ret,
112 strerror(ret));
113 goto err2;
114 }
115
116 fastlock_init(&map->lock);
117 map->used = 0;
118 map->size = init_size;
119 return 0;
120
121 err2:
122 free(map->epoll_ctxs);
123 err1:
124 free(map->table);
125 return -FI_ENOMEM;
126 }
127
sock_conn_map_increase(struct sock_conn_map * map,int new_size)128 static int sock_conn_map_increase(struct sock_conn_map *map, int new_size)
129 {
130 void *_table;
131
132 _table = realloc(map->table, new_size * sizeof(*map->table));
133 if (!_table) {
134 SOCK_LOG_ERROR("*** realloc failed, use FI_SOCKETS_DEF_CONN_MAP_SZ for"
135 "specifying conn-map-size\n");
136 return -FI_ENOMEM;
137 }
138
139 map->size = new_size;
140 map->table = _table;
141 return 0;
142 }
143
sock_conn_map_destroy(struct sock_ep_attr * ep_attr)144 void sock_conn_map_destroy(struct sock_ep_attr *ep_attr)
145 {
146 int i;
147 struct sock_conn_map *cmap = &ep_attr->cmap;
148 for (i = 0; i < cmap->used; i++) {
149 if (cmap->table[i].sock_fd != -1) {
150 sock_pe_poll_del(ep_attr->domain->pe, cmap->table[i].sock_fd);
151 sock_conn_release_entry(cmap, &cmap->table[i]);
152 }
153 }
154 free(cmap->table);
155 cmap->table = NULL;
156 free(cmap->epoll_ctxs);
157 cmap->epoll_ctxs = NULL;
158 cmap->epoll_ctxs_sz = 0;
159 cmap->used = cmap->size = 0;
160 ofi_epoll_close(cmap->epoll_set);
161 fastlock_destroy(&cmap->lock);
162 }
163
sock_conn_release_entry(struct sock_conn_map * map,struct sock_conn * conn)164 void sock_conn_release_entry(struct sock_conn_map *map, struct sock_conn *conn)
165 {
166 ofi_epoll_del(map->epoll_set, conn->sock_fd);
167 ofi_close_socket(conn->sock_fd);
168
169 conn->address_published = 0;
170 conn->connected = 0;
171 conn->sock_fd = -1;
172 }
173
sock_conn_get_next_index(struct sock_conn_map * map)174 static int sock_conn_get_next_index(struct sock_conn_map *map)
175 {
176 int i;
177 for (i = 0; i < map->size; i++) {
178 if (map->table[i].sock_fd == -1)
179 return i;
180 }
181 return -1;
182 }
183
sock_conn_map_insert(struct sock_ep_attr * ep_attr,union ofi_sock_ip * addr,int conn_fd,int addr_published)184 static struct sock_conn *sock_conn_map_insert(struct sock_ep_attr *ep_attr,
185 union ofi_sock_ip *addr, int conn_fd,
186 int addr_published)
187 {
188 int index;
189 struct sock_conn_map *map = &ep_attr->cmap;
190
191 if (map->size == map->used) {
192 index = sock_conn_get_next_index(map);
193 if (index < 0) {
194 if (sock_conn_map_increase(map, map->size * 2))
195 return NULL;
196 index = map->used;
197 map->used++;
198 }
199 } else {
200 index = map->used;
201 map->used++;
202 }
203
204 map->table[index].av_index = FI_ADDR_NOTAVAIL;
205 map->table[index].connected = 1;
206 map->table[index].addr = *addr;
207 map->table[index].sock_fd = conn_fd;
208 map->table[index].ep_attr = ep_attr;
209 sock_set_sockopts(conn_fd, SOCK_OPTS_NONBLOCK |
210 (ep_attr->ep_type == FI_EP_MSG ?
211 SOCK_OPTS_KEEPALIVE : 0));
212
213 if (ofi_epoll_add(map->epoll_set, conn_fd, OFI_EPOLL_IN, &map->table[index]))
214 SOCK_LOG_ERROR("failed to add to epoll set: %d\n", conn_fd);
215
216 map->table[index].address_published = addr_published;
217 sock_pe_poll_add(ep_attr->domain->pe, conn_fd);
218 return &map->table[index];
219 }
220
/*
 * Put @fd into non-blocking mode via fi_fd_nonblock(), logging on failure.
 * Returns fi_fd_nonblock()'s result unchanged; callers treat a nonzero
 * value as failure.
 */
int fd_set_nonblock(int fd)
{
	int ret = fi_fd_nonblock(fd);

	if (ret != 0)
		SOCK_LOG_ERROR("fi_fd_nonblock failed, errno: %d\n", ret);

	return ret;
}
232
233 #if !defined __APPLE__ && !defined _WIN32
sock_set_sockopt_keepalive(int sock)234 void sock_set_sockopt_keepalive(int sock)
235 {
236 int optval;
237
238 /* Keepalive is disabled: now leave */
239 if (!sock_keepalive_enable)
240 return;
241
242 optval = 1;
243 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)))
244 SOCK_LOG_ERROR("setsockopt keepalive enable failed: %s\n",
245 strerror(errno));
246
247 if (sock_keepalive_time != INT_MAX) {
248 optval = sock_keepalive_time;
249 if (setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)))
250 SOCK_LOG_ERROR("setsockopt keepalive time failed: %s\n",
251 strerror(errno));
252 }
253
254 if (sock_keepalive_intvl != INT_MAX) {
255 optval = sock_keepalive_intvl;
256 if (setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval)))
257 SOCK_LOG_ERROR("setsockopt keepalive intvl failed: %s\n",
258 strerror(errno));
259 }
260
261 if (sock_keepalive_probes != INT_MAX) {
262 optval = sock_keepalive_probes;
263 if (setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(optval)))
264 SOCK_LOG_ERROR("setsockopt keepalive intvl failed: %s\n",
265 strerror(errno));
266 }
267 }
268 #else
269 #define sock_set_sockopt_keepalive(sock) do {} while (0)
270 #endif
271
/* Set SO_REUSEADDR on @sock. A failure is logged but not reported. */
static void sock_set_sockopt_reuseaddr(int sock)
{
	int enable = 1;

	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &enable,
		       sizeof(enable)) != 0)
		SOCK_LOG_ERROR("setsockopt reuseaddr failed\n");
}
279
sock_set_sockopts(int sock,int sock_opts)280 void sock_set_sockopts(int sock, int sock_opts)
281 {
282 int optval;
283 optval = 1;
284
285 sock_set_sockopt_reuseaddr(sock);
286 if (sock_opts & SOCK_OPTS_KEEPALIVE)
287 sock_set_sockopt_keepalive(sock);
288 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)))
289 SOCK_LOG_ERROR("setsockopt nodelay failed\n");
290
291 if (sock_opts & SOCK_OPTS_NONBLOCK)
292 fd_set_nonblock(sock);
293 }
294
sock_conn_stop_listener_thread(struct sock_conn_listener * conn_listener)295 int sock_conn_stop_listener_thread(struct sock_conn_listener *conn_listener)
296 {
297 conn_listener->do_listen = 0;
298
299 fastlock_acquire(&conn_listener->signal_lock);
300 fd_signal_set(&conn_listener->signal);
301 fastlock_release(&conn_listener->signal_lock);
302
303 if (conn_listener->listener_thread &&
304 pthread_join(conn_listener->listener_thread, NULL)) {
305 SOCK_LOG_DBG("pthread join failed\n");
306 }
307
308 fd_signal_free(&conn_listener->signal);
309 ofi_epoll_close(conn_listener->emap);
310 fastlock_destroy(&conn_listener->signal_lock);
311
312 return 0;
313 }
314
sock_conn_listener_thread(void * arg)315 static void *sock_conn_listener_thread(void *arg)
316 {
317 struct sock_conn_listener *conn_listener = arg;
318 struct sock_conn_handle *conn_handle;
319 void *ep_contexts[SOCK_EPOLL_WAIT_EVENTS];
320 struct sock_ep_attr *ep_attr;
321 int num_fds, i, conn_fd;
322 union ofi_sock_ip remote;
323 socklen_t addr_size;
324
325 while (conn_listener->do_listen) {
326 num_fds = ofi_epoll_wait(conn_listener->emap, ep_contexts,
327 SOCK_EPOLL_WAIT_EVENTS, -1);
328 if (num_fds < 0) {
329 SOCK_LOG_ERROR("poll failed : %s\n", strerror(errno));
330 continue;
331 }
332
333 fastlock_acquire(&conn_listener->signal_lock);
334 for (i = 0; i < num_fds; i++) {
335 conn_handle = ep_contexts[i];
336
337 if (conn_handle == NULL) { /* signal event */
338 fd_signal_reset(&conn_listener->signal);
339 continue;
340 }
341
342 memset(&remote, 0, sizeof remote);
343 addr_size = sizeof(remote);
344 conn_fd = accept(conn_handle->sock, &remote.sa,
345 &addr_size);
346 SOCK_LOG_DBG("CONN: accepted conn-req: %d\n", conn_fd);
347 ofi_straddr_dbg(&sock_prov, FI_LOG_EP_CTRL,
348 "accepted peer addr: ", &remote.sa);
349
350 if (conn_fd < 0) {
351 SOCK_LOG_ERROR("failed to accept: %s\n",
352 strerror(ofi_sockerr()));
353 continue;
354 }
355
356 ep_attr = container_of(conn_handle, struct sock_ep_attr, conn_handle);
357 fastlock_acquire(&ep_attr->cmap.lock);
358 sock_conn_map_insert(ep_attr, &remote, conn_fd, 1);
359 fastlock_release(&ep_attr->cmap.lock);
360 sock_pe_signal(ep_attr->domain->pe);
361 }
362 fastlock_release(&conn_listener->signal_lock);
363 }
364
365 return NULL;
366 }
367
sock_conn_start_listener_thread(struct sock_conn_listener * conn_listener)368 int sock_conn_start_listener_thread(struct sock_conn_listener *conn_listener)
369 {
370 int ret;
371
372 fastlock_init(&conn_listener->signal_lock);
373
374 ret = ofi_epoll_create(&conn_listener->emap);
375 if (ret < 0) {
376 SOCK_LOG_ERROR("failed to create epoll set\n");
377 goto err1;
378 }
379
380 ret = fd_signal_init(&conn_listener->signal);
381 if (ret < 0) {
382 SOCK_LOG_ERROR("failed to init signal\n");
383 goto err2;
384 }
385
386 ret = ofi_epoll_add(conn_listener->emap,
387 conn_listener->signal.fd[FI_READ_FD],
388 OFI_EPOLL_IN, NULL);
389 if (ret != 0){
390 SOCK_LOG_ERROR("failed to add signal fd to epoll\n");
391 goto err3;
392 }
393
394 conn_listener->do_listen = 1;
395 ret = pthread_create(&conn_listener->listener_thread, NULL,
396 sock_conn_listener_thread, conn_listener);
397 if (ret < 0) {
398 SOCK_LOG_ERROR("failed to create conn listener thread\n");
399 goto err3;
400 }
401 return 0;
402
403 err3:
404 conn_listener->do_listen = 0;
405 fd_signal_free(&conn_listener->signal);
406 err2:
407 ofi_epoll_close(conn_listener->emap);
408 err1:
409 fastlock_destroy(&conn_listener->signal_lock);
410 return ret;
411 }
412
sock_conn_listen(struct sock_ep_attr * ep_attr)413 int sock_conn_listen(struct sock_ep_attr *ep_attr)
414 {
415 int listen_fd, ret;
416 socklen_t addr_size;
417 union ofi_sock_ip addr;
418 struct sock_conn_handle *conn_handle = &ep_attr->conn_handle;
419
420 listen_fd = ofi_socket(ep_attr->src_addr->sa.sa_family,
421 SOCK_STREAM, IPPROTO_TCP);
422 if (listen_fd == INVALID_SOCKET)
423 return -ofi_sockerr();
424
425 sock_set_sockopts(listen_fd, SOCK_OPTS_NONBLOCK);
426
427 addr = *ep_attr->src_addr;
428 if (ep_attr->ep_type == FI_EP_MSG)
429 ofi_addr_set_port(&addr.sa, 0);
430
431 ret = bind(listen_fd, &addr.sa, ofi_sizeofaddr(&addr.sa));
432 if (ret) {
433 SOCK_LOG_ERROR("failed to bind listener: %s\n",
434 strerror(ofi_sockerr()));
435 ofi_straddr_log(&sock_prov, FI_LOG_WARN, FI_LOG_EP_CTRL,
436 "bind failed to addr: ", &addr.sa);
437 ret = -ofi_sockerr();
438 goto err;
439 }
440
441 addr_size = sizeof(addr);
442 ret = ofi_getsockname(listen_fd, &addr.sa, &addr_size);
443 if (ret) {
444 ret = -ofi_sockerr();
445 goto err;
446 }
447
448 ep_attr->msg_src_port = ofi_addr_get_port(&addr.sa);
449 if (!ofi_addr_get_port(&ep_attr->src_addr->sa))
450 ofi_addr_set_port(&ep_attr->src_addr->sa, ep_attr->msg_src_port);
451
452 ofi_straddr_dbg(&sock_prov, FI_LOG_EP_CTRL, "listening at addr: ",
453 &addr.sa);
454 ret = listen(listen_fd, sock_cm_def_map_sz);
455 if (ret) {
456 SOCK_LOG_ERROR("failed to listen socket: %s\n",
457 strerror(ofi_sockerr()));
458 ret = -ofi_sockerr();
459 goto err;
460 }
461
462 conn_handle->sock = listen_fd;
463 conn_handle->do_listen = 1;
464
465 fastlock_acquire(&ep_attr->domain->conn_listener.signal_lock);
466 ret = ofi_epoll_add(ep_attr->domain->conn_listener.emap,
467 conn_handle->sock, OFI_EPOLL_IN, conn_handle);
468 fd_signal_set(&ep_attr->domain->conn_listener.signal);
469 fastlock_release(&ep_attr->domain->conn_listener.signal_lock);
470 if (ret) {
471 SOCK_LOG_ERROR("failed to add fd to pollset: %d\n", ret);
472 goto err;
473 }
474
475 return 0;
476 err:
477 if (listen_fd != INVALID_SOCKET) {
478 ofi_close_socket(listen_fd);
479 conn_handle->sock = INVALID_SOCKET;
480 conn_handle->do_listen = 0;
481 }
482
483 return ret;
484 }
485
/*
 * Look up, or establish, the connection from @ep_attr to a peer.
 *
 * The peer is chosen as follows:
 *  - FI_EP_MSG endpoints: ep_attr->dest_addr, with port msg_dest_port.
 *  - Other endpoints: entry @index of the endpoint's AV table.
 *
 * If sock_ep_lookup_conn() yields anything other than
 * SOCK_CM_CONN_IN_PROGRESS, that value is stored in *@conn and FI_SUCCESS
 * is returned. Otherwise a non-blocking TCP connect is started, waiting up
 * to sock_conn_timeout ms for it to complete. A failed or timed-out attempt
 * is retried until sock_conn_retry attempts have been made in total; at
 * least one attempt is always made.
 *
 * On success the socket is inserted into the connection map, *@conn is set,
 * and FI_SUCCESS is returned. On failure *@conn is NULL and the return
 * value is:
 *  - -FI_EOTHER if a socket could not be created or made non-blocking;
 *  - -FI_EAGAIN if the last socket error says the connect may succeed later;
 *  - otherwise the negated socket error, or -FI_EOTHER if there is none.
 */
int sock_ep_connect(struct sock_ep_attr *ep_attr, fi_addr_t index,
		    struct sock_conn **conn)
{
	int conn_fd = -1, ret, err;
	int do_retry = sock_conn_retry;
	struct sock_conn *new_conn;
	union ofi_sock_ip addr;
	socklen_t lon;
	int valopt = 0;
	struct pollfd poll_fd;

	if (ep_attr->ep_type == FI_EP_MSG) {
		/* Need to check that destination address has been
		   passed to endpoint */
		assert(ep_attr->dest_addr);
		addr = *ep_attr->dest_addr;
		ofi_addr_set_port(&addr.sa, ep_attr->msg_dest_port);
	} else {
		fastlock_acquire(&ep_attr->av->table_lock);
		addr = ep_attr->av->table[index].addr;
		fastlock_release(&ep_attr->av->table_lock);
	}

do_connect:
	fastlock_acquire(&ep_attr->cmap.lock);
	*conn = sock_ep_lookup_conn(ep_attr, index, &addr);
	fastlock_release(&ep_attr->cmap.lock);

	if (*conn != SOCK_CM_CONN_IN_PROGRESS)
		return FI_SUCCESS;

	conn_fd = ofi_socket(addr.sa.sa_family, SOCK_STREAM, 0);
	if (conn_fd == -1) {
		SOCK_LOG_ERROR("failed to create conn_fd, errno: %d\n",
			       ofi_sockerr());
		*conn = NULL;
		return -FI_EOTHER;
	}

	ret = fd_set_nonblock(conn_fd);
	if (ret) {
		SOCK_LOG_ERROR("failed to set conn_fd nonblocking\n");
		*conn = NULL;
		ofi_close_socket(conn_fd);
		return -FI_EOTHER;
	}

	ofi_straddr_dbg(&sock_prov, FI_LOG_EP_CTRL, "connecting to addr: ",
			&addr.sa);
	ret = connect(conn_fd, &addr.sa, ofi_sizeofaddr(&addr.sa));
	if (!ret)
		goto out;

	if (!OFI_SOCK_TRY_CONN_AGAIN(ofi_sockerr())) {
		SOCK_LOG_DBG("Timeout or error() - %s: %d\n",
			     strerror(ofi_sockerr()), conn_fd);
		goto retry;
	}

	/* Connection in progress: wait for the socket to become writable */
	poll_fd.fd = conn_fd;
	poll_fd.events = POLLOUT;

	ret = poll(&poll_fd, 1, sock_conn_timeout);
	if (ret < 0) {
		SOCK_LOG_DBG("poll failed\n");
		goto retry;
	}
	if (ret == 0) {
		/*
		 * Timed out while still connecting. SO_ERROR reads 0 in that
		 * state, so without this check an unconnected socket would be
		 * inserted into the map as connected.
		 */
		SOCK_LOG_DBG("connect timed out: %d\n", conn_fd);
		goto retry;
	}

	lon = sizeof(int);
	ret = getsockopt(conn_fd, SOL_SOCKET, SO_ERROR,
			 (void *) (&valopt), &lon);
	if (ret < 0) {
		SOCK_LOG_DBG("getsockopt failed: %d, %d\n",
			     ret, conn_fd);
		goto retry;
	}

	if (valopt) {
		SOCK_LOG_DBG("Error in connection() "
			     "%d - %s - %d\n",
			     valopt, strerror(valopt), conn_fd);
		goto retry;
	}
	goto out;

retry:
	/* "<= 0" so a sock_conn_retry of 0 or less cannot loop indefinitely */
	if (--do_retry <= 0)
		goto err;

	/* Log before close() so the fd and errno shown are the failing ones */
	SOCK_LOG_ERROR("Connect error, retrying - %s - %d\n",
		       strerror(ofi_sockerr()), conn_fd);
	ofi_close_socket(conn_fd);
	conn_fd = -1;
	goto do_connect;

out:
	fastlock_acquire(&ep_attr->cmap.lock);
	new_conn = sock_conn_map_insert(ep_attr, &addr, conn_fd, 0);
	if (!new_conn) {
		fastlock_release(&ep_attr->cmap.lock);
		goto err;
	}
	new_conn->av_index = (ep_attr->ep_type == FI_EP_MSG) ?
			     FI_ADDR_NOTAVAIL : index;
	*conn = ofi_idm_lookup(&ep_attr->av_idm, index);
	if (*conn == SOCK_CM_CONN_IN_PROGRESS) {
		if (ofi_idm_set(&ep_attr->av_idm, index, new_conn) < 0)
			SOCK_LOG_ERROR("ofi_idm_set failed\n");
		*conn = new_conn;
	}
	fastlock_release(&ep_attr->cmap.lock);
	return FI_SUCCESS;

err:
	/* Read the socket error before close() can overwrite it */
	err = ofi_sockerr();
	ofi_close_socket(conn_fd);
	*conn = NULL;
	if (OFI_SOCK_TRY_CONN_AGAIN(err))
		return -FI_EAGAIN;
	/* Never report success together with a NULL connection */
	return err ? -err : -FI_EOTHER;
}
610