/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <unistd.h>
#include <uv.h>

#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
#include "uv-compat.h"

static isc_result_t
udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
		isc_sockaddr_t *peer);

static void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
	    const struct sockaddr *addr, unsigned flags);

static void
udp_send_cb(uv_udp_send_t *req, int status);

static void
udp_close_cb(uv_handle_t *handle);

static void
timer_close_cb(uv_handle_t *handle);

static void
udp_close_direct(isc_nmsocket_t *sock);

static void
stop_udp_parent(isc_nmsocket_t *sock);
static void
stop_udp_child(isc_nmsocket_t *sock);

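/*
 * Create a UDP socket for a listener child: set the best-effort socket
 * options and the address-reuse options required for load balancing.
 * Fails hard if the socket can't be created.
 */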
static uv_os_sock_t
isc__nm_udp_lb_socket(sa_family_t sa_family) {
	isc_result_t result;
	uv_os_sock_t sock;

	result = isc__nm_socket(sa_family, SOCK_DGRAM, 0, &sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	(void)isc__nm_socket_incoming_cpu(sock);
	(void)isc__nm_socket_disable_pmtud(sock, sa_family);

	result = isc__nm_socket_reuse(sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

#if HAVE_SO_REUSEPORT_LB
	result = isc__nm_socket_reuse_lb(sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);
#endif

	return (sock);
}

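/*
 * Set up the child socket for worker thread 'tid' (using a fresh
 * load-balanced socket, or a dup() of the parent's fd when
 * SO_REUSEPORT_LB is unavailable) and enqueue an asynchronous
 * 'udplisten' event for it.
 */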
static void
start_udp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock,
		uv_os_sock_t fd, int tid) {
	isc_nmsocket_t *csock;
	isc__netievent_udplisten_t *ievent = NULL;

	csock = &sock->children[tid];

	isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface);
	csock->parent = sock;
	csock->iface = sock->iface;
	atomic_init(&csock->reading, true);
	csock->recv_cb = sock->recv_cb;
	csock->recv_cbarg = sock->recv_cbarg;
	csock->extrahandlesize = sock->extrahandlesize;
	csock->tid = tid;

#if HAVE_SO_REUSEPORT_LB
	UNUSED(fd);
	csock->fd = isc__nm_udp_lb_socket(iface->type.sa.sa_family);
#else
	csock->fd = dup(fd);
#endif
	REQUIRE(csock->fd >= 0);

	ievent = isc__nm_get_netievent_udplisten(mgr, csock);
	isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
				     (isc__netievent_t *)ievent);
}

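/*
 * Enqueue an asynchronous 'udpstop' event for the socket on its own
 * worker thread.
 */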
static void
enqueue_stoplistening(isc_nmsocket_t *sock) {
	isc__netievent_udpstop_t *ievent =
		isc__nm_get_netievent_udpstop(sock->mgr, sock);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

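/*
 * Start listening on 'iface': create one child socket per worker
 * thread, then wait until every child has reported its listening
 * status before returning the aggregate result.
 */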
isc_result_t
isc_nm_listenudp(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nm_recv_cb_t cb,
		 void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	size_t children_size = 0;
	REQUIRE(VALID_NM(mgr));
	uv_os_sock_t fd = -1;

	/*
	 * We are creating mgr->nworkers duplicated sockets, one
	 * socket for each worker thread.
	 */
	sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface);

	atomic_init(&sock->rchildren, 0);
	sock->nchildren = mgr->nworkers;
	children_size = sock->nchildren * sizeof(sock->children[0]);
	sock->children = isc_mem_get(mgr->mctx, children_size);
	memset(sock->children, 0, children_size);

	sock->recv_cb = cb;
	sock->recv_cbarg = cbarg;
	sock->extrahandlesize = extrahandlesize;
	sock->result = ISC_R_UNSET;

	sock->tid = 0;
	sock->fd = -1;

#if !HAVE_SO_REUSEPORT_LB
	fd = isc__nm_udp_lb_socket(iface->type.sa.sa_family);
#endif

	isc_barrier_init(&sock->startlistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		if ((int)i == isc_nm_tid()) {
			continue;
		}
		start_udp_child(mgr, iface, sock, fd, i);
	}

	if (isc__nm_in_netthread()) {
		start_udp_child(mgr, iface, sock, fd, isc_nm_tid());
	}

#if !HAVE_SO_REUSEPORT_LB
	isc__nm_closesocket(fd);
#endif

	LOCK(&sock->lock);
	while (atomic_load(&sock->rchildren) != sock->nchildren) {
		WAIT(&sock->cond, &sock->lock);
	}
	result = sock->result;
	atomic_store(&sock->active, true);
	UNLOCK(&sock->lock);

	INSIST(result != ISC_R_UNSET);

	if (result == ISC_R_SUCCESS) {
		REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
		*sockp = sock;
	} else {
		atomic_store(&sock->active, false);
		enqueue_stoplistening(sock);
		isc_nmsocket_close(&sock);
	}

	return (result);
}

/*
 * Asynchronous 'udplisten' call handler: start listening on a UDP socket.
 */
void
isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udplisten_t *ievent = (isc__netievent_udplisten_t *)ev0;
	isc_nmsocket_t *sock = NULL;
	int r, uv_bind_flags = 0;
	int uv_init_flags = 0;
	sa_family_t sa_family;
	isc_result_t result = ISC_R_UNSET;

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(ievent->sock->tid == isc_nm_tid());
	REQUIRE(VALID_NMSOCK(ievent->sock->parent));

	sock = ievent->sock;
	sa_family = sock->iface.type.sa.sa_family;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->parent != NULL);
	REQUIRE(sock->tid == isc_nm_tid());

#ifdef UV_UDP_RECVMMSG
	uv_init_flags |= UV_UDP_RECVMMSG;
#endif
	r = uv_udp_init_ex(&worker->loop, &sock->uv_handle.udp, uv_init_flags);
	RUNTIME_CHECK(r == 0);
	uv_handle_set_data(&sock->uv_handle.handle, sock);
	/* This keeps the socket alive after everything else is gone */
	isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });

	r = uv_timer_init(&worker->loop, &sock->timer);
	RUNTIME_CHECK(r == 0);
	uv_handle_set_data((uv_handle_t *)&sock->timer, sock);

	LOCK(&sock->parent->lock);

	r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
	if (r < 0) {
		isc__nm_closesocket(sock->fd);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sa_family == AF_INET6) {
		uv_bind_flags |= UV_UDP_IPV6ONLY;
	}

#if HAVE_SO_REUSEPORT_LB
	r = isc_uv_udp_freebind(&sock->uv_handle.udp,
				&sock->parent->iface.type.sa, uv_bind_flags);
	if (r < 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}
#else
	if (sock->parent->fd == -1) {
		/* This thread is first, bind the socket */
		r = isc_uv_udp_freebind(&sock->uv_handle.udp,
					&sock->parent->iface.type.sa,
					uv_bind_flags);
		if (r < 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			goto done;
		}
		sock->parent->uv_handle.udp.flags = sock->uv_handle.udp.flags;
		sock->parent->fd = sock->fd;
	} else {
		/* The socket is already bound, just copy the flags */
		sock->uv_handle.udp.flags = sock->parent->uv_handle.udp.flags;
	}
#endif

	isc__nm_set_network_buffers(sock->mgr, &sock->uv_handle.handle);

	r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb,
			      udp_recv_cb);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}

	atomic_store(&sock->listening, true);

done:
	result = isc__nm_uverr2result(r);
	atomic_fetch_add(&sock->parent->rchildren, 1);
	if (sock->parent->result == ISC_R_UNSET) {
		sock->parent->result = result;
	}
	SIGNAL(&sock->parent->cond);
	UNLOCK(&sock->parent->lock);

	isc_barrier_wait(&sock->parent->startlistening);
}

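/*
 * Stop listening on a UDP listener socket. This must be called exactly
 * once; the stop is dispatched to the socket's own thread if we aren't
 * already on a network thread.
 */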
void
isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udplistener);

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	if (!isc__nm_in_netthread()) {
		enqueue_stoplistening(sock);
	} else {
		stop_udp_parent(sock);
	}
}

/*
 * Asynchronous 'udpstop' call handler: stop listening on a UDP socket.
 */
void
isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpstop_t *ievent = (isc__netievent_udpstop_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	if (sock->parent != NULL) {
		stop_udp_child(sock);
		return;
	}

	stop_udp_parent(sock);
}

/*
 * udp_recv_cb handles an incoming UDP packet from libuv. The buffer here
 * is reused for a series of packets, so we need to allocate a new one.
 * The new one can then be reused to send the response.
 */
static void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
	    const struct sockaddr *addr, unsigned flags) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
	isc__nm_uvreq_t *req = NULL;
	uint32_t maxudp;
	bool free_buf;
	isc_sockaddr_t sockaddr;
	isc_result_t result;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->reading));

#ifdef UV_UDP_MMSG_FREE
	free_buf = ((flags & UV_UDP_MMSG_FREE) == UV_UDP_MMSG_FREE);
#elif UV_UDP_MMSG_CHUNK
	free_buf = ((flags & UV_UDP_MMSG_CHUNK) == 0);
#else
	free_buf = true;
	UNUSED(flags);
#endif

	/*
	 * Four possible reasons to return now without processing:
	 */

	/*
	 * - If we're simulating a firewall blocking UDP packets
	 *   bigger than 'maxudp' bytes for testing purposes.
	 */
	maxudp = atomic_load(&sock->mgr->maxudp);
	if (maxudp != 0 && (uint32_t)nrecv > maxudp) {
		/*
		 * We need to keep the read callback intact here, so that
		 * the read-timeout callback can trigger without crashing
		 * on a missing read_req.
		 */
		goto free;
	}

	/*
	 * - If there was a networking error.
	 */
	if (nrecv < 0) {
		isc__nm_failed_read_cb(sock, isc__nm_uverr2result(nrecv),
				       false);
		goto free;
	}

	/*
	 * - If addr == NULL, in which case it's the end of stream;
	 *   we can free the buffer and bail.
	 */
	if (addr == NULL) {
		isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
		goto free;
	}

	/*
	 * - If the socket is no longer active.
	 */
	if (!isc__nmsocket_active(sock)) {
		isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
		goto free;
	}

	result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	req = isc__nm_get_read_req(sock, &sockaddr);

	/*
	 * The callback will be called synchronously because the result
	 * is ISC_R_SUCCESS, so it is safe to pass the buffer directly.
	 */
	req->uvbuf.base = buf->base;
	req->uvbuf.len = nrecv;

	sock->recv_read = false;

	REQUIRE(!sock->processing);
	sock->processing = true;
	isc__nm_readcb(sock, req, ISC_R_SUCCESS);
	sock->processing = false;

free:
	if (free_buf) {
		isc__nm_free_uvbuf(sock, buf);
	}
}

/*
 * Send the data in 'region' to a peer via a UDP socket. We try to find
 * a proper sibling/child socket so that we won't have to jump to
 * another thread.
 */
void
isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
		 isc_nm_cb_t cb, void *cbarg) {
	isc_nmsocket_t *sock = handle->sock;
	isc_nmsocket_t *rsock = NULL;
	isc_sockaddr_t *peer = &handle->peer;
	isc__nm_uvreq_t *uvreq = NULL;
	uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
	int ntid;

	INSIST(sock->type == isc_nm_udpsocket);

	/*
	 * We're simulating a firewall blocking UDP packets bigger than
	 * 'maxudp' bytes, for testing purposes.
	 *
	 * The client would ordinarily have unreferenced the handle
	 * in the callback, but that won't happen in this case, so
	 * we need to do so here.
	 */
	if (maxudp != 0 && region->length > maxudp) {
		isc_nmhandle_detach(&handle);
		return;
	}

	if (atomic_load(&sock->client)) {
		/*
		 * When we are sending from the client socket, we directly use
		 * the socket provided.
		 */
		rsock = sock;
		goto send;
	} else {
		/*
		 * When we are sending from the server socket, we either use
		 * the socket associated with the network thread we are in,
		 * or the one belonging to the thread of the socket
		 * associated with the handle.
		 */
		INSIST(sock->parent != NULL);

		if (isc__nm_in_netthread()) {
			ntid = isc_nm_tid();
		} else {
			ntid = sock->tid;
		}
		rsock = &sock->parent->children[ntid];
	}

send:
	uvreq = isc__nm_uvreq_get(rsock->mgr, rsock);
	uvreq->uvbuf.base = (char *)region->base;
	uvreq->uvbuf.len = region->length;

	isc_nmhandle_attach(handle, &uvreq->handle);

	uvreq->cb.send = cb;
	uvreq->cbarg = cbarg;

	if (isc_nm_tid() == rsock->tid) {
		isc__netievent_udpsend_t ievent = { .sock = rsock,
						    .req = uvreq,
						    .peer = *peer };

		isc__nm_async_udpsend(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_udpsend_t *ievent =
			isc__nm_get_netievent_udpsend(sock->mgr, rsock);
		ievent->peer = *peer;
		ievent->req = uvreq;

		isc__nm_enqueue_ievent(&sock->mgr->workers[rsock->tid],
				       (isc__netievent_t *)ievent);
	}
}

/*
 * Asynchronous 'udpsend' event handler: send a packet on a UDP socket.
 */
void
isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc_result_t result;
	isc__netievent_udpsend_t *ievent = (isc__netievent_udpsend_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	if (isc__nmsocket_closing(sock)) {
		isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED);
		return;
	}

	result = udp_send_direct(sock, uvreq, &ievent->peer);
	if (result != ISC_R_SUCCESS) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		isc__nm_failed_send_cb(sock, uvreq, result);
	}
}

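/*
 * libuv callback invoked when a UDP send completes: convert the status
 * into an isc_result_t, count a failure in the statistics, and run the
 * request's send callback.
 */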
static void
udp_send_cb(uv_udp_send_t *req, int status) {
	isc_result_t result = ISC_R_SUCCESS;
	isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req);
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	sock = uvreq->sock;

	REQUIRE(sock->tid == isc_nm_tid());

	if (status < 0) {
		result = isc__nm_uverr2result(status);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
	}

	isc__nm_sendcb(sock, uvreq, result, false);
}

/*
 * udp_send_direct sends buf to a peer on a socket. The socket has to
 * belong to the same thread as the caller.
 */
static isc_result_t
udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
		isc_sockaddr_t *peer) {
	const struct sockaddr *sa = &peer->type.sa;
	int r;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udpsocket);

	if (isc__nmsocket_closing(sock)) {
		return (ISC_R_CANCELED);
	}

#if UV_VERSION_HEX >= UV_VERSION(1, 27, 0)
	/*
	 * If we used uv_udp_connect() (and not the shim version for
	 * older versions of libuv), then the peer address has to be
	 * set to NULL or else uv_udp_send() could fail or assert,
	 * depending on the libuv version.
	 */
	if (atomic_load(&sock->connected)) {
		sa = NULL;
	}
#endif

	r = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
			&req->uvbuf, 1, sa, udp_send_cb);
	if (r < 0) {
		return (isc__nm_uverr2result(r));
	}

	return (ISC_R_SUCCESS);
}

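/*
 * Open, bind, and connect the UDP socket on the current network thread,
 * retrying a transient EADDRINUSE, then signal the result to the thread
 * waiting in isc_nm_udpconnect().
 */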
static isc_result_t
udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	isc__networker_t *worker = NULL;
	int uv_bind_flags = UV_UDP_REUSEADDR;
	isc_result_t result = ISC_R_UNSET;
	int tries = 3;
	int r;

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->tid == isc_nm_tid());

	worker = &sock->mgr->workers[isc_nm_tid()];

	atomic_store(&sock->connecting, true);

	r = uv_udp_init(&worker->loop, &sock->uv_handle.udp);
	RUNTIME_CHECK(r == 0);
	uv_handle_set_data(&sock->uv_handle.handle, sock);

	r = uv_timer_init(&worker->loop, &sock->timer);
	RUNTIME_CHECK(r == 0);
	uv_handle_set_data((uv_handle_t *)&sock->timer, sock);

	if (isc__nm_closing(sock)) {
		result = ISC_R_SHUTTINGDOWN;
		goto error;
	}

	r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sock->iface.type.sa.sa_family == AF_INET6) {
		uv_bind_flags |= UV_UDP_IPV6ONLY;
	}

	r = uv_udp_bind(&sock->uv_handle.udp, &sock->iface.type.sa,
			uv_bind_flags);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}

	isc__nm_set_network_buffers(sock->mgr, &sock->uv_handle.handle);

	/*
	 * On FreeBSD the UDP connect() call sometimes results in a
	 * spurious transient EADDRINUSE. Try a few more times before
	 * giving up.
	 */
	do {
		r = isc_uv_udp_connect(&sock->uv_handle.udp,
				       &req->peer.type.sa);
	} while (r == UV_EADDRINUSE && --tries > 0);
	if (r != 0) {
		isc__nm_incstats(sock->mgr,
				 sock->statsindex[STATID_CONNECTFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);

	atomic_store(&sock->connecting, false);
	atomic_store(&sock->connected, true);

done:
	result = isc__nm_uverr2result(r);
error:

	LOCK(&sock->lock);
	sock->result = result;
	SIGNAL(&sock->cond);
	if (!atomic_load(&sock->active)) {
		WAIT(&sock->scond, &sock->lock);
	}
	INSIST(atomic_load(&sock->active));
	UNLOCK(&sock->lock);

	return (result);
}

/*
 * Asynchronous 'udpconnect' call handler: open a new UDP socket and
 * call the 'open' callback with a handle.
 */
void
isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpconnect_t *ievent =
		(isc__netievent_udpconnect_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *req = ievent->req;
	isc_result_t result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->parent == NULL);
	REQUIRE(sock->tid == isc_nm_tid());

	result = udp_connect_direct(sock, req);
	if (result != ISC_R_SUCCESS) {
		atomic_store(&sock->active, false);
		isc__nm_udp_close(sock);
		isc__nm_connectcb(sock, req, result, true);
	} else {
		/*
		 * The callback has to be called after the socket has
		 * been initialized.
		 */
		isc__nm_connectcb(sock, req, ISC_R_SUCCESS, true);
	}

	/*
	 * The sock is now attached to the handle.
	 */
	isc__nmsocket_detach(&sock);
}

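/*
 * Connect a client UDP socket to 'peer': the actual connect runs on a
 * worker thread via an asynchronous 'udpconnect' event, and we wait
 * here until it has completed.
 */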
void
isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
		  isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
		  size_t extrahandlesize) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	isc__netievent_udpconnect_t *event = NULL;
	isc__nm_uvreq_t *req = NULL;
	sa_family_t sa_family;

	REQUIRE(VALID_NM(mgr));
	REQUIRE(local != NULL);
	REQUIRE(peer != NULL);

	sa_family = peer->type.sa.sa_family;

	sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(sock, mgr, isc_nm_udpsocket, local);

	sock->connect_cb = cb;
	sock->connect_cbarg = cbarg;
	sock->read_timeout = timeout;
	sock->extrahandlesize = extrahandlesize;
	sock->peer = *peer;
	sock->result = ISC_R_UNSET;
	atomic_init(&sock->client, true);

	req = isc__nm_uvreq_get(mgr, sock);
	req->cb.connect = cb;
	req->cbarg = cbarg;
	req->peer = *peer;
	req->local = *local;
	req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface);

	result = isc__nm_socket(sa_family, SOCK_DGRAM, 0, &sock->fd);
	if (result != ISC_R_SUCCESS) {
		if (isc__nm_in_netthread()) {
			sock->tid = isc_nm_tid();
		}
		isc__nmsocket_clearcb(sock);
		isc__nm_connectcb(sock, req, result, true);
		atomic_store(&sock->closed, true);
		isc__nmsocket_detach(&sock);
		return;
	}

	result = isc__nm_socket_reuse(sock->fd);
	RUNTIME_CHECK(result == ISC_R_SUCCESS ||
		      result == ISC_R_NOTIMPLEMENTED);

	result = isc__nm_socket_reuse_lb(sock->fd);
	RUNTIME_CHECK(result == ISC_R_SUCCESS ||
		      result == ISC_R_NOTIMPLEMENTED);

	(void)isc__nm_socket_incoming_cpu(sock->fd);

	(void)isc__nm_socket_disable_pmtud(sock->fd, sa_family);

	event = isc__nm_get_netievent_udpconnect(mgr, sock, req);

	if (isc__nm_in_netthread()) {
		atomic_store(&sock->active, true);
		sock->tid = isc_nm_tid();
		isc__nm_async_udpconnect(&mgr->workers[sock->tid],
					 (isc__netievent_t *)event);
		isc__nm_put_netievent_udpconnect(mgr, event);
	} else {
		atomic_init(&sock->active, false);
		sock->tid = isc_random_uniform(mgr->nworkers);
		isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
				       (isc__netievent_t *)event);
	}
	LOCK(&sock->lock);
	while (sock->result == ISC_R_UNSET) {
		WAIT(&sock->cond, &sock->lock);
	}
	atomic_store(&sock->active, true);
	BROADCAST(&sock->scond);
	UNLOCK(&sock->lock);
}

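/*
 * Read callback for connected UDP sockets: process the datagram, then
 * stop the read timer and stop reading, unless this is a listener
 * child socket.
 */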
void
isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
		    const struct sockaddr *addr, unsigned flags) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
	REQUIRE(VALID_NMSOCK(sock));

	udp_recv_cb(handle, nrecv, buf, addr, flags);
	/*
	 * If a caller calls isc_nm_read() on a listening socket, we can
	 * get here, but we MUST NOT stop reading from the listener
	 * socket. The only difference between a listener and a connected
	 * socket is that the former has sock->parent set and the latter
	 * does not.
	 */
	if (!sock->parent) {
		isc__nmsocket_timer_stop(sock);
		isc__nm_stop_reading(sock);
	}
}

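/*
 * Handle a failed read: for a client socket, stop reading and prepare
 * the socket for destruction; for a server socket, just report the
 * failure via the read callback and keep listening.
 */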
void
isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(result != ISC_R_SUCCESS);

	if (atomic_load(&sock->client)) {
		isc__nmsocket_timer_stop(sock);
		isc__nm_stop_reading(sock);

		if (!sock->recv_read) {
			goto destroy;
		}
		sock->recv_read = false;

		if (sock->recv_cb != NULL) {
			isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
			isc__nmsocket_clearcb(sock);
			isc__nm_readcb(sock, req, result);
		}

	destroy:
		isc__nmsocket_prep_destroy(sock);
		return;
	}

	/*
	 * For a UDP server socket there is no child socket created by
	 * "accept", so we:
	 * - continue to read
	 * - don't clear the callbacks
	 * - don't destroy the socket (only stoplistening can do that)
	 */
	if (!sock->recv_read) {
		return;
	}
	sock->recv_read = false;

	if (sock->recv_cb != NULL) {
		isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
		isc__nm_readcb(sock, req, result);
	}
}

/*
 * Asynchronous 'udpread' call handler: start or resume reading on a
 * socket; pause reading and call the 'recv' callback after each
 * datagram.
 */
void
isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc_result_t result = ISC_R_SUCCESS;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	if (isc__nm_closing(sock)) {
		result = ISC_R_SHUTTINGDOWN;
	} else if (isc__nmsocket_closing(sock)) {
		result = ISC_R_CANCELED;
	}

	if (result != ISC_R_SUCCESS) {
		atomic_store(&sock->reading, true);
		isc__nm_failed_read_cb(sock, result, false);
		return;
	}

	isc__nm_start_reading(sock);
	isc__nmsocket_timer_start(sock);
}

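/*
 * Start reading a single datagram on a connected UDP socket, either
 * directly if we're already on the socket's thread, or via an
 * asynchronous 'udpread' event.
 */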
void
isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc_nmsocket_t *sock = handle->sock;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->statichandle == handle);
	REQUIRE(!sock->recv_read);

	sock->recv_cb = cb;
	sock->recv_cbarg = cbarg;
	sock->recv_read = true;

	if (!atomic_load(&sock->reading) && sock->tid == isc_nm_tid()) {
		isc__netievent_udpread_t ievent = { .sock = sock };
		isc__nm_async_udpread(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_udpread_t *ievent =
			isc__nm_get_netievent_udpread(sock->mgr, sock);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

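/*
 * uv_close() callback for a listener child socket: mark the socket
 * closed and drop the reference that kept it alive while listening.
 */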
static void
udp_stop_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	atomic_store(&sock->listening, false);

	isc__nmsocket_detach(&sock);
}

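/*
 * uv_close() callback for a connected UDP socket: mark the socket
 * closed, detach the server socket if there is one, and schedule
 * destruction.
 */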
static void
udp_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	if (sock->server != NULL) {
		isc__nmsocket_detach(&sock->server);
	}

	atomic_store(&sock->connected, false);
	atomic_store(&sock->listening, false);

	isc__nmsocket_prep_destroy(sock);
}

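/*
 * The read timer has been closed; now close the UDP handle itself,
 * using udp_stop_cb for listener children and udp_close_cb otherwise.
 */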
static void
timer_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	if (sock->parent) {
		uv_close(&sock->uv_handle.handle, udp_stop_cb);
	} else {
		uv_close(&sock->uv_handle.handle, udp_close_cb);
	}
}

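/*
 * Close a listener child socket on its own thread and wait on the
 * parent's stoplistening barrier so that all children finish together.
 */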
static void
stop_udp_child(isc_nmsocket_t *sock) {
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->tid == isc_nm_tid());

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		return;
	}

	udp_close_direct(sock);

	atomic_fetch_sub(&sock->parent->rchildren, 1);

	isc_barrier_wait(&sock->parent->stoplistening);
}

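/*
 * Stop the listener: schedule closing of all child sockets on their
 * respective threads, close our own child last, then mark the parent
 * closed and schedule its destruction.
 */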
static void
stop_udp_parent(isc_nmsocket_t *sock) {
	isc_nmsocket_t *csock = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udplistener);

	isc_barrier_init(&sock->stoplistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		csock = &sock->children[i];
		REQUIRE(VALID_NMSOCK(csock));

		if ((int)i == isc_nm_tid()) {
			/*
			 * We need to schedule closing the other sockets
			 * first.
			 */
			continue;
		}

		atomic_store(&csock->active, false);
		enqueue_stoplistening(csock);
	}

	csock = &sock->children[isc_nm_tid()];
	atomic_store(&csock->active, false);
	stop_udp_child(csock);

	atomic_store(&sock->closed, true);
	isc__nmsocket_prep_destroy(sock);
}

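/*
 * Close the socket, starting with its read timer; the UDP handle
 * itself is closed in timer_close_cb.
 */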
static void
udp_close_direct(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
}

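/*
 * Asynchronous 'udpclose' call handler: close a UDP socket.
 */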
void
isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpclose_t *ievent = (isc__netievent_udpclose_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	udp_close_direct(sock);
}

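/*
 * Close an inactive UDP socket, either directly or via an asynchronous
 * 'udpclose' event if we're on the wrong thread.
 */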
void
isc__nm_udp_close(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(!isc__nmsocket_active(sock));

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		return;
	}

	if (sock->tid == isc_nm_tid()) {
		udp_close_direct(sock);
	} else {
		isc__netievent_udpclose_t *ievent =
			isc__nm_get_netievent_udpclose(sock->mgr, sock);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

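/*
 * Shut down a UDP socket: mark it inactive, fail a pending read if a
 * handle is still attached, and otherwise schedule the socket for
 * destruction.
 */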
void
isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udpsocket);

	/*
	 * If the socket is active, mark it inactive and
	 * continue. If it isn't active, stop now.
	 */
	if (!isc__nmsocket_deactivate(sock)) {
		return;
	}

	/*
	 * If the socket is connecting, the cancel will happen in
	 * async_udpconnect() due to the socket being inactive now.
	 */
	if (atomic_load(&sock->connecting)) {
		return;
	}

	/*
	 * When the client detaches the last handle, sock->statichandle
	 * will be NULL; in that case, nobody is interested in the
	 * callback.
	 */
	if (sock->statichandle != NULL) {
		if (isc__nm_closing(sock)) {
			isc__nm_failed_read_cb(sock, ISC_R_SHUTTINGDOWN, false);
		} else {
			isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
		}
		return;
	}

	/*
	 * Otherwise, we just send the socket to the abyss...
	 */
	if (sock->parent == NULL) {
		isc__nmsocket_prep_destroy(sock);
	}
}

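/*
 * Cancel a pending read on a UDP socket by enqueuing an asynchronous
 * 'udpcancel' event on the socket's thread.
 */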
void
isc__nm_udp_cancelread(isc_nmhandle_t *handle) {
	isc_nmsocket_t *sock = NULL;
	isc__netievent_udpcancel_t *ievent = NULL;

	REQUIRE(VALID_NMHANDLE(handle));

	sock = handle->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);

	ievent = isc__nm_get_netievent_udpcancel(sock->mgr, sock, handle);

	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

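/*
 * Asynchronous 'udpcancel' call handler: fail the pending read on a
 * client socket with ISC_R_EOF.
 */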
void
isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpcancel_t *ievent = (isc__netievent_udpcancel_t *)ev0;
	isc_nmsocket_t *sock = NULL;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(ievent->sock));

	sock = ievent->sock;

	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->client));

	isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
}