/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <unistd.h>
#include <uv.h>

#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
#include "uv-compat.h"

static isc_result_t
udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
		isc_sockaddr_t *peer);

static void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
	    const struct sockaddr *addr, unsigned flags);

static void
udp_send_cb(uv_udp_send_t *req, int status);

static void
udp_close_cb(uv_handle_t *handle);

static void
timer_close_cb(uv_handle_t *handle);

static void
udp_close_direct(isc_nmsocket_t *sock);

static void
stop_udp_parent(isc_nmsocket_t *sock);
static void
stop_udp_child(isc_nmsocket_t *sock);

static uv_os_sock_t
isc__nm_udp_lb_socket(sa_family_t sa_family) {
	isc_result_t result;
	uv_os_sock_t sock;

	result = isc__nm_socket(sa_family, SOCK_DGRAM, 0, &sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	(void)isc__nm_socket_incoming_cpu(sock);
	(void)isc__nm_socket_disable_pmtud(sock, sa_family);

	result = isc__nm_socket_reuse(sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

#if HAVE_SO_REUSEPORT_LB
	result = isc__nm_socket_reuse_lb(sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);
#endif

	return (sock);
}

static void
start_udp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock,
		uv_os_sock_t fd, int tid) {
	isc_nmsocket_t *csock;
	isc__netievent_udplisten_t *ievent = NULL;

	csock = &sock->children[tid];

	isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface);
	csock->parent = sock;
	csock->iface = sock->iface;
	atomic_init(&csock->reading, true);
	csock->recv_cb = sock->recv_cb;
	csock->recv_cbarg = sock->recv_cbarg;
	csock->extrahandlesize = sock->extrahandlesize;
	csock->tid = tid;

#if HAVE_SO_REUSEPORT_LB
	UNUSED(fd);
	csock->fd = isc__nm_udp_lb_socket(iface->type.sa.sa_family);
#else
	csock->fd = dup(fd);
#endif
	REQUIRE(csock->fd >= 0);

	ievent = isc__nm_get_netievent_udplisten(mgr, csock);
	isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
				     (isc__netievent_t *)ievent);
}

static void
enqueue_stoplistening(isc_nmsocket_t *sock) {
	isc__netievent_udpstop_t *ievent =
		isc__nm_get_netievent_udpstop(sock->mgr, sock);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}
isc_result_t
isc_nm_listenudp(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nm_recv_cb_t cb,
		 void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	size_t children_size = 0;
	uv_os_sock_t fd = -1;

	REQUIRE(VALID_NM(mgr));

	/*
	 * We are creating mgr->nworkers duplicated sockets, one
	 * socket for each worker thread.
	 */
	sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface);

	atomic_init(&sock->rchildren, 0);
	sock->nchildren = mgr->nworkers;
	children_size = sock->nchildren * sizeof(sock->children[0]);
	sock->children = isc_mem_get(mgr->mctx, children_size);
	memset(sock->children, 0, children_size);

	sock->recv_cb = cb;
	sock->recv_cbarg = cbarg;
	sock->extrahandlesize = extrahandlesize;
	sock->result = ISC_R_UNSET;

	sock->tid = 0;
	sock->fd = -1;

#if !HAVE_SO_REUSEPORT_LB
	fd = isc__nm_udp_lb_socket(iface->type.sa.sa_family);
#endif

	isc_barrier_init(&sock->startlistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		if ((int)i == isc_nm_tid()) {
			continue;
		}
		start_udp_child(mgr, iface, sock, fd, i);
	}

	if (isc__nm_in_netthread()) {
		start_udp_child(mgr, iface, sock, fd, isc_nm_tid());
	}

#if !HAVE_SO_REUSEPORT_LB
	isc__nm_closesocket(fd);
#endif

	LOCK(&sock->lock);
	while (atomic_load(&sock->rchildren) != sock->nchildren) {
		WAIT(&sock->cond, &sock->lock);
	}
	result = sock->result;
	atomic_store(&sock->active, true);
	UNLOCK(&sock->lock);

	INSIST(result != ISC_R_UNSET);

	if (result == ISC_R_SUCCESS) {
		REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
		*sockp = sock;
	} else {
		atomic_store(&sock->active, false);
		enqueue_stoplistening(sock);
		isc_nmsocket_close(&sock);
	}

	return (result);
}
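
/*
 * A minimal usage sketch for isc_nm_listenudp() (illustrative only:
 * 'echo_recv_cb', 'echo_send_cb', 'addr' and 'result' are hypothetical
 * names, and the callback shape assumes the isc_nm_recv_cb_t and
 * isc_nm_cb_t typedefs from <isc/netmgr.h>):
 *
 *	static void
 *	echo_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
 *		     isc_region_t *region, void *cbarg) {
 *		if (eresult == ISC_R_SUCCESS) {
 *			isc_nm_send(handle, region, echo_send_cb, cbarg);
 *		}
 *	}
 *
 *	isc_nmsocket_t *listener = NULL;
 *	result = isc_nm_listenudp(mgr, &addr, echo_recv_cb, NULL, 0,
 *				  &listener);
 *
 * On success every worker thread owns one child socket bound to
 * 'addr', and incoming datagrams are load-balanced across them.
 */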

/*
 * Asynchronous 'udplisten' call handler: start listening on a UDP socket.
 */
void
isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udplisten_t *ievent = (isc__netievent_udplisten_t *)ev0;
	isc_nmsocket_t *sock = NULL;
	int r, uv_bind_flags = 0;
	int uv_init_flags = 0;
	sa_family_t sa_family;
	isc_result_t result = ISC_R_UNSET;

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(ievent->sock->tid == isc_nm_tid());
	REQUIRE(VALID_NMSOCK(ievent->sock->parent));

	sock = ievent->sock;
	sa_family = sock->iface.type.sa.sa_family;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->parent != NULL);
	REQUIRE(sock->tid == isc_nm_tid());

#ifdef UV_UDP_RECVMMSG
	uv_init_flags |= UV_UDP_RECVMMSG;
#endif
	r = uv_udp_init_ex(&worker->loop, &sock->uv_handle.udp, uv_init_flags);
	RUNTIME_CHECK(r == 0);
	uv_handle_set_data(&sock->uv_handle.handle, sock);
	/* This keeps the socket alive after everything else is gone */
	isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });

	r = uv_timer_init(&worker->loop, &sock->timer);
	RUNTIME_CHECK(r == 0);
	uv_handle_set_data((uv_handle_t *)&sock->timer, sock);

	LOCK(&sock->parent->lock);

	r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
	if (r < 0) {
		isc__nm_closesocket(sock->fd);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sa_family == AF_INET6) {
		uv_bind_flags |= UV_UDP_IPV6ONLY;
	}

#if HAVE_SO_REUSEPORT_LB
	r = isc_uv_udp_freebind(&sock->uv_handle.udp,
				&sock->parent->iface.type.sa, uv_bind_flags);
	if (r < 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}
#else
	if (sock->parent->fd == -1) {
		/* This thread is first; bind the socket */
		r = isc_uv_udp_freebind(&sock->uv_handle.udp,
					&sock->parent->iface.type.sa,
					uv_bind_flags);
		if (r < 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			goto done;
		}
		sock->parent->uv_handle.udp.flags = sock->uv_handle.udp.flags;
		sock->parent->fd = sock->fd;
	} else {
		/* The socket is already bound; just copy the flags */
		sock->uv_handle.udp.flags = sock->parent->uv_handle.udp.flags;
	}
#endif

	isc__nm_set_network_buffers(sock->mgr, &sock->uv_handle.handle);

	r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb,
			      udp_recv_cb);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}

	atomic_store(&sock->listening, true);

done:
	result = isc__nm_uverr2result(r);
	atomic_fetch_add(&sock->parent->rchildren, 1);
	if (sock->parent->result == ISC_R_UNSET) {
		sock->parent->result = result;
	}
	SIGNAL(&sock->parent->cond);
	UNLOCK(&sock->parent->lock);

	isc_barrier_wait(&sock->parent->startlistening);
}
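
/*
 * A sketch of the startup rendezvous above (summary, not extra API):
 * each child increments 'rchildren' and signals the parent's condition
 * variable under the parent's lock; isc_nm_listenudp() waits until
 * rchildren == nchildren before reading the aggregated result, while
 * the 'startlistening' barrier keeps every child parked until all of
 * them have reported in.
 */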

void
isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udplistener);

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	if (!isc__nm_in_netthread()) {
		enqueue_stoplistening(sock);
	} else {
		stop_udp_parent(sock);
	}
}

/*
 * Asynchronous 'udpstop' call handler: stop listening on a UDP socket.
 */
void
isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpstop_t *ievent = (isc__netievent_udpstop_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	if (sock->parent != NULL) {
		stop_udp_child(sock);
		return;
	}

	stop_udp_parent(sock);
}

/*
 * udp_recv_cb handles an incoming UDP packet from libuv.  The buffer
 * passed here is reused for a series of packets, so we need to
 * allocate a new one; the new one can then be reused to send the
 * response.
 */
static void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
	    const struct sockaddr *addr, unsigned flags) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
	isc__nm_uvreq_t *req = NULL;
	uint32_t maxudp;
	bool free_buf;
	isc_sockaddr_t sockaddr;
	isc_result_t result;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->reading));

#ifdef UV_UDP_MMSG_FREE
	free_buf = ((flags & UV_UDP_MMSG_FREE) == UV_UDP_MMSG_FREE);
#elif UV_UDP_MMSG_CHUNK
	free_buf = ((flags & UV_UDP_MMSG_CHUNK) == 0);
#else
	free_buf = true;
	UNUSED(flags);
#endif

	/*
	 * Four possible reasons to return now without processing:
	 */

	/*
	 * - We are simulating a firewall that blocks UDP packets
	 *   bigger than 'maxudp' bytes, for testing purposes.
	 */
	maxudp = atomic_load(&sock->mgr->maxudp);
	if (maxudp != 0 && (uint32_t)nrecv > maxudp) {
		/*
		 * We need to keep the read_cb intact so that
		 * readtimeout_cb can still trigger without crashing on
		 * a missing read_req.
		 */
		goto free;
	}

	/*
	 * - There was a networking error.
	 */
	if (nrecv < 0) {
		isc__nm_failed_read_cb(sock, isc__nm_uverr2result(nrecv),
				       false);
		goto free;
	}

	/*
	 * - addr == NULL, which signals the end of the stream; we can
	 *   free the buffer and bail.
	 */
	if (addr == NULL) {
		isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
		goto free;
	}

	/*
	 * - The socket is no longer active.
	 */
	if (!isc__nmsocket_active(sock)) {
		isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
		goto free;
	}

	result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	req = isc__nm_get_read_req(sock, &sockaddr);

	/*
	 * The callback will be called synchronously because the result
	 * is ISC_R_SUCCESS, so it is safe to pass 'buf' directly.
	 */
	req->uvbuf.base = buf->base;
	req->uvbuf.len = nrecv;

	sock->recv_read = false;

	REQUIRE(!sock->processing);
	sock->processing = true;
	isc__nm_readcb(sock, req, ISC_R_SUCCESS);
	sock->processing = false;

free:
	if (free_buf) {
		isc__nm_free_uvbuf(sock, buf);
	}
}

/*
 * Send the data in 'region' to a peer via a UDP socket. We try to find
 * a proper sibling/child socket so that we won't have to jump to
 * another thread.
 */
void
isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
		 isc_nm_cb_t cb, void *cbarg) {
	isc_nmsocket_t *sock = handle->sock;
	isc_nmsocket_t *rsock = NULL;
	isc_sockaddr_t *peer = &handle->peer;
	isc__nm_uvreq_t *uvreq = NULL;
	uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
	int ntid;

	INSIST(sock->type == isc_nm_udpsocket);

	/*
	 * We're simulating a firewall blocking UDP packets bigger than
	 * 'maxudp' bytes, for testing purposes.
	 *
	 * The client would ordinarily have unreferenced the handle
	 * in the callback, but that won't happen in this case, so
	 * we need to do so here.
	 */
	if (maxudp != 0 && region->length > maxudp) {
		isc_nmhandle_detach(&handle);
		return;
	}

	if (atomic_load(&sock->client)) {
		/*
		 * When sending from a client socket, we use the
		 * provided socket directly.
		 */
		rsock = sock;
		goto send;
	} else {
		/*
		 * When sending from a server socket, we use either the
		 * socket associated with the network thread we are in,
		 * or the one associated with the handle's thread.
		 */
		INSIST(sock->parent != NULL);

		if (isc__nm_in_netthread()) {
			ntid = isc_nm_tid();
		} else {
			ntid = sock->tid;
		}
		rsock = &sock->parent->children[ntid];
	}

send:
	uvreq = isc__nm_uvreq_get(rsock->mgr, rsock);
	uvreq->uvbuf.base = (char *)region->base;
	uvreq->uvbuf.len = region->length;

	isc_nmhandle_attach(handle, &uvreq->handle);

	uvreq->cb.send = cb;
	uvreq->cbarg = cbarg;

	if (isc_nm_tid() == rsock->tid) {
		isc__netievent_udpsend_t ievent = { .sock = rsock,
						    .req = uvreq,
						    .peer = *peer };

		isc__nm_async_udpsend(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_udpsend_t *ievent =
			isc__nm_get_netievent_udpsend(sock->mgr, rsock);
		ievent->peer = *peer;
		ievent->req = uvreq;

		isc__nm_enqueue_ievent(&sock->mgr->workers[rsock->tid],
				       (isc__netievent_t *)ievent);
	}
}
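
/*
 * A minimal send sketch (illustrative: 'done_cb', 'data' and 'datalen'
 * are hypothetical names; the callback shape assumes the isc_nm_cb_t
 * typedef from <isc/netmgr.h>).  Callers normally reach this code
 * through the generic isc_nm_send() entry point:
 *
 *	static void
 *	done_cb(isc_nmhandle_t *handle, isc_result_t eresult,
 *		void *cbarg) {
 *		UNUSED(handle);
 *		UNUSED(cbarg);
 *		if (eresult != ISC_R_SUCCESS) {
 *			// log or retry here
 *		}
 *	}
 *
 *	isc_region_t region = { .base = data, .length = datalen };
 *	isc_nm_send(handle, &region, done_cb, NULL);
 */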

/*
 * Asynchronous 'udpsend' event handler: send a packet on a UDP socket.
 */
void
isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc_result_t result;
	isc__netievent_udpsend_t *ievent = (isc__netievent_udpsend_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	if (isc__nmsocket_closing(sock)) {
		isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED);
		return;
	}

	result = udp_send_direct(sock, uvreq, &ievent->peer);
	if (result != ISC_R_SUCCESS) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		isc__nm_failed_send_cb(sock, uvreq, result);
	}
}

static void
udp_send_cb(uv_udp_send_t *req, int status) {
	isc_result_t result = ISC_R_SUCCESS;
	isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req);
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	sock = uvreq->sock;

	REQUIRE(sock->tid == isc_nm_tid());

	if (status < 0) {
		result = isc__nm_uverr2result(status);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
	}

	isc__nm_sendcb(sock, uvreq, result, false);
}

/*
 * udp_send_direct sends buf to a peer on a socket. The socket has to
 * belong to the calling thread.
 */
static isc_result_t
udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
		isc_sockaddr_t *peer) {
	const struct sockaddr *sa = &peer->type.sa;
	int r;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udpsocket);

	if (isc__nmsocket_closing(sock)) {
		return (ISC_R_CANCELED);
	}

#if UV_VERSION_HEX >= UV_VERSION(1, 27, 0)
	/*
	 * If we used uv_udp_connect() (and not the shim version for
	 * older versions of libuv), then the peer address has to be
	 * set to NULL or else uv_udp_send() could fail or assert,
	 * depending on the libuv version.
	 */
	if (atomic_load(&sock->connected)) {
		sa = NULL;
	}
#endif

	r = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
			&req->uvbuf, 1, sa, udp_send_cb);
	if (r < 0) {
		return (isc__nm_uverr2result(r));
	}

	return (ISC_R_SUCCESS);
}

static isc_result_t
udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	isc__networker_t *worker = NULL;
	int uv_bind_flags = UV_UDP_REUSEADDR;
	isc_result_t result = ISC_R_UNSET;
	int tries = 3;
	int r;

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->tid == isc_nm_tid());

	worker = &sock->mgr->workers[isc_nm_tid()];

	atomic_store(&sock->connecting, true);

	r = uv_udp_init(&worker->loop, &sock->uv_handle.udp);
	RUNTIME_CHECK(r == 0);
	uv_handle_set_data(&sock->uv_handle.handle, sock);

	r = uv_timer_init(&worker->loop, &sock->timer);
	RUNTIME_CHECK(r == 0);
	uv_handle_set_data((uv_handle_t *)&sock->timer, sock);

	if (isc__nm_closing(sock)) {
		result = ISC_R_SHUTTINGDOWN;
		goto error;
	}

	r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sock->iface.type.sa.sa_family == AF_INET6) {
		uv_bind_flags |= UV_UDP_IPV6ONLY;
	}

	r = uv_udp_bind(&sock->uv_handle.udp, &sock->iface.type.sa,
			uv_bind_flags);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}

	isc__nm_set_network_buffers(sock->mgr, &sock->uv_handle.handle);

	/*
	 * On FreeBSD the UDP connect() call sometimes results in a
	 * spurious transient EADDRINUSE. Try a few more times before
	 * giving up.
	 */
	do {
		r = isc_uv_udp_connect(&sock->uv_handle.udp,
				       &req->peer.type.sa);
	} while (r == UV_EADDRINUSE && --tries > 0);
	if (r != 0) {
		isc__nm_incstats(sock->mgr,
				 sock->statsindex[STATID_CONNECTFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);

	atomic_store(&sock->connecting, false);
	atomic_store(&sock->connected, true);

done:
	result = isc__nm_uverr2result(r);
error:

	LOCK(&sock->lock);
	sock->result = result;
	SIGNAL(&sock->cond);
	if (!atomic_load(&sock->active)) {
		WAIT(&sock->scond, &sock->lock);
	}
	INSIST(atomic_load(&sock->active));
	UNLOCK(&sock->lock);

	return (result);
}
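
/*
 * A note on the handshake above (summary, not extra API): the worker
 * thread publishes 'result' and signals 'cond', then parks on 'scond'
 * until isc_nm_udpconnect() has marked the socket active, so the
 * connect callback does not run before the originating thread has
 * finished setting the socket up.
 */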

/*
 * Asynchronous 'udpconnect' call handler: open a new UDP socket and
 * call the 'open' callback with a handle.
 */
void
isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpconnect_t *ievent =
		(isc__netievent_udpconnect_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *req = ievent->req;
	isc_result_t result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->parent == NULL);
	REQUIRE(sock->tid == isc_nm_tid());

	result = udp_connect_direct(sock, req);
	if (result != ISC_R_SUCCESS) {
		atomic_store(&sock->active, false);
		isc__nm_udp_close(sock);
		isc__nm_connectcb(sock, req, result, true);
	} else {
		/*
		 * The callback has to be called after the socket has
		 * been initialized.
		 */
		isc__nm_connectcb(sock, req, ISC_R_SUCCESS, true);
	}

	/*
	 * The sock is now attached to the handle.
	 */
	isc__nmsocket_detach(&sock);
}

void
isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
		  isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
		  size_t extrahandlesize) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	isc__netievent_udpconnect_t *event = NULL;
	isc__nm_uvreq_t *req = NULL;
	sa_family_t sa_family;

	REQUIRE(VALID_NM(mgr));
	REQUIRE(local != NULL);
	REQUIRE(peer != NULL);

	sa_family = peer->type.sa.sa_family;

	sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(sock, mgr, isc_nm_udpsocket, local);

	sock->connect_cb = cb;
	sock->connect_cbarg = cbarg;
	sock->read_timeout = timeout;
	sock->extrahandlesize = extrahandlesize;
	sock->peer = *peer;
	sock->result = ISC_R_UNSET;
	atomic_init(&sock->client, true);

	req = isc__nm_uvreq_get(mgr, sock);
	req->cb.connect = cb;
	req->cbarg = cbarg;
	req->peer = *peer;
	req->local = *local;
	req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface);

	result = isc__nm_socket(sa_family, SOCK_DGRAM, 0, &sock->fd);
	if (result != ISC_R_SUCCESS) {
		if (isc__nm_in_netthread()) {
			sock->tid = isc_nm_tid();
		}
		isc__nmsocket_clearcb(sock);
		isc__nm_connectcb(sock, req, result, true);
		atomic_store(&sock->closed, true);
		isc__nmsocket_detach(&sock);
		return;
	}

	result = isc__nm_socket_reuse(sock->fd);
	RUNTIME_CHECK(result == ISC_R_SUCCESS ||
		      result == ISC_R_NOTIMPLEMENTED);

	result = isc__nm_socket_reuse_lb(sock->fd);
	RUNTIME_CHECK(result == ISC_R_SUCCESS ||
		      result == ISC_R_NOTIMPLEMENTED);

	(void)isc__nm_socket_incoming_cpu(sock->fd);

	(void)isc__nm_socket_disable_pmtud(sock->fd, sa_family);

	event = isc__nm_get_netievent_udpconnect(mgr, sock, req);

	if (isc__nm_in_netthread()) {
		atomic_store(&sock->active, true);
		sock->tid = isc_nm_tid();
		isc__nm_async_udpconnect(&mgr->workers[sock->tid],
					 (isc__netievent_t *)event);
		isc__nm_put_netievent_udpconnect(mgr, event);
	} else {
		atomic_init(&sock->active, false);
		sock->tid = isc_random_uniform(mgr->nworkers);
		isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
				       (isc__netievent_t *)event);
	}
	LOCK(&sock->lock);
	while (sock->result == ISC_R_UNSET) {
		WAIT(&sock->cond, &sock->lock);
	}
	atomic_store(&sock->active, true);
	BROADCAST(&sock->scond);
	UNLOCK(&sock->lock);
}
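
/*
 * A minimal connect sketch (illustrative: 'connect_cb' and 'recv_cb'
 * are hypothetical names; the callback shape assumes the isc_nm_cb_t
 * typedef from <isc/netmgr.h>).  On success the callback receives a
 * handle that can be used with isc_nm_read() and isc_nm_send():
 *
 *	static void
 *	connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
 *		   void *cbarg) {
 *		if (eresult == ISC_R_SUCCESS) {
 *			isc_nm_read(handle, recv_cb, cbarg);
 *		}
 *	}
 *
 *	isc_nm_udpconnect(mgr, &local, &peer, connect_cb, NULL,
 *			  5000, 0);
 *
 * Note that this call blocks the caller until the connect attempt has
 * been resolved on the chosen worker thread.
 */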

void
isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
		    const struct sockaddr *addr, unsigned flags) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
	REQUIRE(VALID_NMSOCK(sock));

	udp_recv_cb(handle, nrecv, buf, addr, flags);
	/*
	 * If a caller calls isc_nm_read() on a listening socket, we can
	 * get here, but we MUST NOT stop reading from the listener
	 * socket.  The only difference between listener and connected
	 * sockets is that the former has sock->parent set and the
	 * latter does not.
	 */
	if (!sock->parent) {
		isc__nmsocket_timer_stop(sock);
		isc__nm_stop_reading(sock);
	}
}

void
isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(result != ISC_R_SUCCESS);

	if (atomic_load(&sock->client)) {
		isc__nmsocket_timer_stop(sock);
		isc__nm_stop_reading(sock);

		if (!sock->recv_read) {
			goto destroy;
		}
		sock->recv_read = false;

		if (sock->recv_cb != NULL) {
			isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
			isc__nmsocket_clearcb(sock);
			isc__nm_readcb(sock, req, result);
		}

	destroy:
		isc__nmsocket_prep_destroy(sock);
		return;
	}

	/*
	 * A UDP server socket has no child socket obtained via
	 * "accept", so we:
	 * - continue to read;
	 * - don't clear the callbacks;
	 * - don't destroy the socket (only stoplistening can do that).
	 */
	if (!sock->recv_read) {
		return;
	}
	sock->recv_read = false;

	if (sock->recv_cb != NULL) {
		isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
		isc__nm_readcb(sock, req, result);
	}
}

/*
 * Asynchronous 'udpread' call handler: start or resume reading on a
 * socket; pause reading and call the 'recv' callback after each
 * datagram.
 */
void
isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc_result_t result = ISC_R_SUCCESS;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	if (isc__nm_closing(sock)) {
		result = ISC_R_SHUTTINGDOWN;
	} else if (isc__nmsocket_closing(sock)) {
		result = ISC_R_CANCELED;
	}

	if (result != ISC_R_SUCCESS) {
		atomic_store(&sock->reading, true);
		isc__nm_failed_read_cb(sock, result, false);
		return;
	}

	isc__nm_start_reading(sock);
	isc__nmsocket_timer_start(sock);
}

void
isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc_nmsocket_t *sock = handle->sock;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->statichandle == handle);
	REQUIRE(!sock->recv_read);

	sock->recv_cb = cb;
	sock->recv_cbarg = cbarg;
	sock->recv_read = true;

	if (!atomic_load(&sock->reading) && sock->tid == isc_nm_tid()) {
		isc__netievent_udpread_t ievent = { .sock = sock };
		isc__nm_async_udpread(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_udpread_t *ievent =
			isc__nm_get_netievent_udpread(sock->mgr, sock);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}
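
/*
 * A minimal read sketch (illustrative: 'recv_cb' and 'process' are
 * hypothetical names; the callback shape assumes the isc_nm_recv_cb_t
 * typedef, and we assume the public isc_nm_read() entry point
 * dispatches here for UDP handles).  Because reading on a client
 * socket pauses again after each datagram, a caller that wants more
 * data re-arms the read from the callback:
 *
 *	static void
 *	recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
 *		isc_region_t *region, void *cbarg) {
 *		if (eresult == ISC_R_SUCCESS) {
 *			process(region);
 *			isc_nm_read(handle, recv_cb, cbarg);
 *		}
 *	}
 *
 *	isc_nm_read(handle, recv_cb, NULL);
 */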

static void
udp_stop_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	atomic_store(&sock->listening, false);

	isc__nmsocket_detach(&sock);
}

static void
udp_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	if (sock->server != NULL) {
		isc__nmsocket_detach(&sock->server);
	}

	atomic_store(&sock->connected, false);
	atomic_store(&sock->listening, false);

	isc__nmsocket_prep_destroy(sock);
}

static void
timer_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	if (sock->parent) {
		uv_close(&sock->uv_handle.handle, udp_stop_cb);
	} else {
		uv_close(&sock->uv_handle.handle, udp_close_cb);
	}
}

static void
stop_udp_child(isc_nmsocket_t *sock) {
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->tid == isc_nm_tid());

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		return;
	}

	udp_close_direct(sock);

	atomic_fetch_sub(&sock->parent->rchildren, 1);

	isc_barrier_wait(&sock->parent->stoplistening);
}

static void
stop_udp_parent(isc_nmsocket_t *sock) {
	isc_nmsocket_t *csock = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udplistener);

	isc_barrier_init(&sock->stoplistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		csock = &sock->children[i];
		REQUIRE(VALID_NMSOCK(csock));

		if ((int)i == isc_nm_tid()) {
			/*
			 * We need to schedule closing the other
			 * sockets first.
			 */
			continue;
		}

		atomic_store(&csock->active, false);
		enqueue_stoplistening(csock);
	}

	csock = &sock->children[isc_nm_tid()];
	atomic_store(&csock->active, false);
	stop_udp_child(csock);

	atomic_store(&sock->closed, true);
	isc__nmsocket_prep_destroy(sock);
}

static void
udp_close_direct(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
}
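
/*
 * A sketch of the close sequence above (summary, not extra API):
 * udp_close_direct() closes the timer first; timer_close_cb() then
 * closes the UDP handle, which lands in udp_stop_cb() for listener
 * children (sock->parent set) or udp_close_cb() for connected sockets,
 * where the final detach/destroy happens.
 */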

void
isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpclose_t *ievent = (isc__netievent_udpclose_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	udp_close_direct(sock);
}

void
isc__nm_udp_close(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(!isc__nmsocket_active(sock));

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		return;
	}

	if (sock->tid == isc_nm_tid()) {
		udp_close_direct(sock);
	} else {
		isc__netievent_udpclose_t *ievent =
			isc__nm_get_netievent_udpclose(sock->mgr, sock);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

void
isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udpsocket);

	/*
	 * If the socket is active, mark it inactive and
	 * continue. If it isn't active, stop now.
	 */
	if (!isc__nmsocket_deactivate(sock)) {
		return;
	}

	/*
	 * If the socket is connecting, the cancellation will happen in
	 * async_udpconnect() because the socket is now inactive.
	 */
	if (atomic_load(&sock->connecting)) {
		return;
	}

	/*
	 * When the client detaches the last handle, sock->statichandle
	 * will be NULL; in that case nobody is interested in the
	 * callback.
	 */
	if (sock->statichandle != NULL) {
		if (isc__nm_closing(sock)) {
			isc__nm_failed_read_cb(sock, ISC_R_SHUTTINGDOWN, false);
		} else {
			isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
		}
		return;
	}

	/*
	 * Otherwise, we just send the socket to the abyss...
	 */
	if (sock->parent == NULL) {
		isc__nmsocket_prep_destroy(sock);
	}
}

void
isc__nm_udp_cancelread(isc_nmhandle_t *handle) {
	isc_nmsocket_t *sock = NULL;
	isc__netievent_udpcancel_t *ievent = NULL;

	REQUIRE(VALID_NMHANDLE(handle));

	sock = handle->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);

	ievent = isc__nm_get_netievent_udpcancel(sock->mgr, sock, handle);

	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

void
isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpcancel_t *ievent = (isc__netievent_udpcancel_t *)ev0;
	isc_nmsocket_t *sock = NULL;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(ievent->sock));

	sock = ievent->sock;

	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->client));

	isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
}