/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <libgen.h>
#include <unistd.h>
#include <uv.h>

#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
#include <isc/stdtime.h>
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
#include "uv-compat.h"

static atomic_uint_fast32_t last_tcpquota_log = ATOMIC_VAR_INIT(0);

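/*
 * Rate-limit logging of TCP quota events to at most one message per
 * second: each caller atomically swaps the current timestamp into
 * last_tcpquota_log, and only a caller that actually changes the
 * stored second is allowed to log.
 */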
static bool
can_log_tcp_quota(void) {
	isc_stdtime_t now, last;

	isc_stdtime_get(&now);
	last = atomic_exchange_relaxed(&last_tcpquota_log, now);
	if (now != last) {
		return (true);
	}

	return (false);
}

static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);

static void
tcp_close_direct(isc_nmsocket_t *sock);

static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
static void
tcp_connect_cb(uv_connect_t *uvreq, int status);

static void
tcp_connection_cb(uv_stream_t *server, int status);

static void
tcp_close_cb(uv_handle_t *uvhandle);

static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);

static void
quota_accept_cb(isc_quota_t *quota, void *sock0);

static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);

static void
failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
	       isc_result_t eresult);
static void
stop_tcp_parent(isc_nmsocket_t *sock);
static void
stop_tcp_child(isc_nmsocket_t *sock);

static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
	REQUIRE(sock->accepting);
	REQUIRE(sock->server);

	/*
	 * Detach the quota early to make room for other connections;
	 * otherwise it'd be detached later asynchronously, and clog
	 * the quota unnecessarily.
	 */
	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}

	isc__nmsocket_detach(&sock->server);

	sock->accepting = false;

	switch (eresult) {
	case ISC_R_NOTCONNECTED:
		/* IGNORE: The client disconnected before we could accept */
		break;
	default:
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "Accepting TCP connection failed: %s",
			      isc_result_totext(eresult));
	}
}

static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	isc__networker_t *worker = NULL;
	isc_result_t result = ISC_R_UNSET;
	int r;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->tid == isc_nm_tid());

	worker = &sock->mgr->workers[sock->tid];

	atomic_store(&sock->connecting, true);

	/* 2 minute timeout */
	result = isc__nm_socket_connectiontimeout(sock->fd, 120 * 1000);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
	UV_RUNTIME_CHECK(uv_tcp_init, r);
	uv_handle_set_data(&sock->uv_handle.handle, sock);

	r = uv_timer_init(&worker->loop, &sock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	r = uv_timer_init(&worker->loop, &sock->write_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&sock->write_timer, sock);

	r = uv_tcp_open(&sock->uv_handle.tcp, sock->fd);
	if (r != 0) {
		isc__nm_closesocket(sock->fd);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (req->local.length != 0) {
		r = uv_tcp_bind(&sock->uv_handle.tcp, &req->local.type.sa, 0);
		if (r != 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			goto done;
		}
	}

	uv_handle_set_data(&req->uv_req.handle, req);
	r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
			   &req->peer.type.sa, tcp_connect_cb);
	if (r != 0) {
		isc__nm_incstats(sock->mgr,
				 sock->statsindex[STATID_CONNECTFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);

	uv_handle_set_data((uv_handle_t *)&sock->read_timer,
			   &req->uv_req.connect);
	isc__nmsocket_timer_start(sock);

	atomic_store(&sock->connected, true);

done:
	result = isc__nm_uverr2result(r);
	LOCK(&sock->lock);
	sock->result = result;
	SIGNAL(&sock->cond);
	if (!atomic_load(&sock->active)) {
		WAIT(&sock->scond, &sock->lock);
	}
	INSIST(atomic_load(&sock->active));
	UNLOCK(&sock->lock);

	return (result);
}

void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpconnect_t *ievent =
		(isc__netievent_tcpconnect_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *req = ievent->req;
	isc_result_t result = ISC_R_SUCCESS;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->parent == NULL);
	REQUIRE(sock->tid == isc_nm_tid());

	result = tcp_connect_direct(sock, req);
	if (result != ISC_R_SUCCESS) {
		atomic_store(&sock->active, false);
		if (sock->fd != (uv_os_sock_t)(-1)) {
			isc__nm_tcp_close(sock);
		}
		isc__nm_connectcb(sock, req, result, true);
	}

	/*
	 * The sock is now attached to the handle.
	 */
	isc__nmsocket_detach(&sock);
}

static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
	isc_result_t result;
	isc__nm_uvreq_t *req = NULL;
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
	struct sockaddr_storage ss;
	int r;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	isc__nmsocket_timer_stop(sock);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	req = uv_handle_get_data((uv_handle_t *)uvreq);

	REQUIRE(VALID_UVREQ(req));
	REQUIRE(VALID_NMHANDLE(req->handle));

	if (atomic_load(&sock->timedout)) {
		result = ISC_R_TIMEDOUT;
		goto error;
	}

	if (!atomic_load(&sock->connecting)) {
		/*
		 * The connect was cancelled from timeout; just clean up
		 * the req.
		 */
		isc__nm_uvreq_put(&req, sock);
		return;
	} else if (isc__nmsocket_closing(sock)) {
		/* Socket was closed midflight by isc__nm_tcp_shutdown() */
		result = ISC_R_CANCELED;
		goto error;
	} else if (status == UV_ETIMEDOUT) {
		/* Timeout status code here indicates hard error */
		result = ISC_R_TIMEDOUT;
		goto error;
	} else if (status != 0) {
		result = isc__nm_uverr2result(status);
		goto error;
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
	r = uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto error;
	}

	atomic_store(&sock->connecting, false);

	result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	isc__nm_connectcb(sock, req, ISC_R_SUCCESS, false);

	return;

error:
	isc__nm_failed_connect_cb(sock, req, result, false);
}

void
isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
		  isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
		  size_t extrahandlesize) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	isc__netievent_tcpconnect_t *ievent = NULL;
	isc__nm_uvreq_t *req = NULL;
	sa_family_t sa_family;

	REQUIRE(VALID_NM(mgr));
	REQUIRE(local != NULL);
	REQUIRE(peer != NULL);

	sa_family = peer->type.sa.sa_family;

	sock = isc_mem_get(mgr->mctx, sizeof(*sock));
	isc__nmsocket_init(sock, mgr, isc_nm_tcpsocket, local);

	sock->extrahandlesize = extrahandlesize;
	sock->connect_timeout = timeout;
	sock->result = ISC_R_UNSET;
	sock->fd = (uv_os_sock_t)-1;
	atomic_init(&sock->client, true);

	req = isc__nm_uvreq_get(mgr, sock);
	req->cb.connect = cb;
	req->cbarg = cbarg;
	req->peer = *peer;
	req->local = *local;
	req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface);

	result = isc__nm_socket(sa_family, SOCK_STREAM, 0, &sock->fd);
	if (result != ISC_R_SUCCESS) {
		if (isc__nm_in_netthread()) {
			sock->tid = isc_nm_tid();
			isc__nmsocket_clearcb(sock);
			isc__nm_connectcb(sock, req, result, false);
		} else {
			isc__nmsocket_clearcb(sock);
			sock->tid = isc_random_uniform(mgr->nworkers);
			isc__nm_connectcb(sock, req, result, true);
		}
		atomic_store(&sock->closed, true);
		isc__nmsocket_detach(&sock);
		return;
	}

	ievent = isc__nm_get_netievent_tcpconnect(mgr, sock, req);

	if (isc__nm_in_netthread()) {
		atomic_store(&sock->active, true);
		sock->tid = isc_nm_tid();
		isc__nm_async_tcpconnect(&mgr->workers[sock->tid],
					 (isc__netievent_t *)ievent);
		isc__nm_put_netievent_tcpconnect(mgr, ievent);
	} else {
		atomic_init(&sock->active, false);
		sock->tid = isc_random_uniform(mgr->nworkers);
		isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
	LOCK(&sock->lock);
	while (sock->result == ISC_R_UNSET) {
		WAIT(&sock->cond, &sock->lock);
	}
	atomic_store(&sock->active, true);
	BROADCAST(&sock->scond);
	UNLOCK(&sock->lock);
}
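
/*
 * Usage sketch (illustrative only, not compiled): connecting a TCP
 * client socket, assuming a caller-defined 'connect_cb' and 'recv_cb'
 * and an existing netmgr 'mgr'.  On success the connect callback may
 * start reading directly on the handle it is given:
 *
 *	static void
 *	connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
 *		if (result != ISC_R_SUCCESS) {
 *			return;
 *		}
 *		isc_nm_read(handle, recv_cb, arg);
 *	}
 *
 *	isc_nm_tcpconnect(mgr, &local, &peer, connect_cb, NULL, 30000, 0);
 */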

static uv_os_sock_t
isc__nm_tcp_lb_socket(sa_family_t sa_family) {
	isc_result_t result;
	uv_os_sock_t sock;

	result = isc__nm_socket(sa_family, SOCK_STREAM, 0, &sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	(void)isc__nm_socket_incoming_cpu(sock);

	/* FIXME: set mss */

	result = isc__nm_socket_reuse(sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

#if HAVE_SO_REUSEPORT_LB
	result = isc__nm_socket_reuse_lb(sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);
#endif

	return (sock);
}

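/*
 * Each worker thread gets its own child listener socket.  With
 * SO_REUSEPORT_LB, every child opens a separate socket via
 * isc__nm_tcp_lb_socket() and the kernel load-balances incoming
 * connections across them; without it, the single listening fd created
 * in isc_nm_listentcp() is dup()ed into each child here.
 */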
static void
start_tcp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock,
		uv_os_sock_t fd, int tid) {
	isc__netievent_tcplisten_t *ievent = NULL;
	isc_nmsocket_t *csock = &sock->children[tid];

	isc__nmsocket_init(csock, mgr, isc_nm_tcpsocket, iface);
	csock->parent = sock;
	csock->accept_cb = sock->accept_cb;
	csock->accept_cbarg = sock->accept_cbarg;
	csock->extrahandlesize = sock->extrahandlesize;
	csock->backlog = sock->backlog;
	csock->tid = tid;
	/*
	 * We don't attach to quota, just assign - to avoid
	 * increasing quota unnecessarily.
	 */
	csock->pquota = sock->pquota;
	isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);

#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
	UNUSED(fd);
	csock->fd = isc__nm_tcp_lb_socket(iface->type.sa.sa_family);
#else
	csock->fd = dup(fd);
#endif
	REQUIRE(csock->fd >= 0);

	ievent = isc__nm_get_netievent_tcplisten(mgr, csock);
	isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
				     (isc__netievent_t *)ievent);
}

static void
enqueue_stoplistening(isc_nmsocket_t *sock) {
	isc__netievent_tcpstop_t *ievent =
		isc__nm_get_netievent_tcpstop(sock->mgr, sock);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_sockaddr_t *iface,
		 isc_nm_accept_cb_t accept_cb, void *accept_cbarg,
		 size_t extrahandlesize, int backlog, isc_quota_t *quota,
		 isc_nmsocket_t **sockp) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	size_t children_size = 0;
	uv_os_sock_t fd = -1;

	REQUIRE(VALID_NM(mgr));

	sock = isc_mem_get(mgr->mctx, sizeof(*sock));
	isc__nmsocket_init(sock, mgr, isc_nm_tcplistener, iface);

	atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
	sock->nchildren = 1;
#else
	sock->nchildren = mgr->nworkers;
#endif
	children_size = sock->nchildren * sizeof(sock->children[0]);
	sock->children = isc_mem_get(mgr->mctx, children_size);
	memset(sock->children, 0, children_size);

	sock->result = ISC_R_UNSET;

	sock->accept_cb = accept_cb;
	sock->accept_cbarg = accept_cbarg;
	sock->extrahandlesize = extrahandlesize;
	sock->backlog = backlog;
	sock->pquota = quota;

	sock->tid = 0;
	sock->fd = -1;

#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
	fd = isc__nm_tcp_lb_socket(iface->type.sa.sa_family);
#endif

	isc_barrier_init(&sock->startlistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		if ((int)i == isc_nm_tid()) {
			continue;
		}
		start_tcp_child(mgr, iface, sock, fd, i);
	}

	if (isc__nm_in_netthread()) {
		start_tcp_child(mgr, iface, sock, fd, isc_nm_tid());
	}

#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
	isc__nm_closesocket(fd);
#endif

	LOCK(&sock->lock);
	while (atomic_load(&sock->rchildren) != sock->nchildren) {
		WAIT(&sock->cond, &sock->lock);
	}
	result = sock->result;
	atomic_store(&sock->active, true);
	UNLOCK(&sock->lock);

	INSIST(result != ISC_R_UNSET);

	if (result == ISC_R_SUCCESS) {
		REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
		*sockp = sock;
	} else {
		atomic_store(&sock->active, false);
		enqueue_stoplistening(sock);
		isc_nmsocket_close(&sock);
	}

	return (result);
}
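
/*
 * Usage sketch (illustrative only, not compiled): starting a TCP
 * listener, assuming a caller-defined accept callback.  Returning
 * anything other than ISC_R_SUCCESS from the accept callback rejects
 * the connection (see accept_connection() below):
 *
 *	static isc_result_t
 *	accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
 *		...
 *		return (ISC_R_SUCCESS);
 *	}
 *
 *	isc_nmsocket_t *listener = NULL;
 *	result = isc_nm_listentcp(mgr, &addr, accept_cb, NULL, 0, 10,
 *				  quota, &listener);
 */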

void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcplisten_t *ievent = (isc__netievent_tcplisten_t *)ev0;
	sa_family_t sa_family;
	int r;
	int flags = 0;
	isc_nmsocket_t *sock = NULL;
	isc_result_t result;

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(ievent->sock->tid == isc_nm_tid());
	REQUIRE(VALID_NMSOCK(ievent->sock->parent));

	sock = ievent->sock;
	sa_family = sock->iface.type.sa.sa_family;

	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->parent != NULL);
	REQUIRE(sock->tid == isc_nm_tid());

	/* TODO: set min mss */

	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
	UV_RUNTIME_CHECK(uv_tcp_init, r);

	uv_handle_set_data(&sock->uv_handle.handle, sock);
	/* This keeps the socket alive after everything else is gone */
	isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });

	r = uv_timer_init(&worker->loop, &sock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	r = uv_timer_init(&worker->loop, &sock->write_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&sock->write_timer, sock);

	LOCK(&sock->parent->lock);

	r = uv_tcp_open(&sock->uv_handle.tcp, sock->fd);
	if (r < 0) {
		isc__nm_closesocket(sock->fd);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sa_family == AF_INET6) {
		flags = UV_TCP_IPV6ONLY;
	}

#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
	r = isc_uv_tcp_freebind(&sock->uv_handle.tcp, &sock->iface.type.sa,
				flags);
	if (r < 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}
#else
	if (sock->parent->fd == -1) {
		r = isc_uv_tcp_freebind(&sock->uv_handle.tcp,
					&sock->iface.type.sa, flags);
		if (r < 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			goto done;
		}
		sock->parent->uv_handle.tcp.flags = sock->uv_handle.tcp.flags;
		sock->parent->fd = sock->fd;
	} else {
		/* The socket is already bound, just copy the flags */
		sock->uv_handle.tcp.flags = sock->parent->uv_handle.tcp.flags;
	}
#endif

	/*
	 * The callback will run in the same thread uv_listen() was called
	 * from, so a race with tcp_connection_cb() isn't possible.
	 */
	r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
		      tcp_connection_cb);
	if (r != 0) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "uv_listen failed: %s",
			      isc_result_totext(isc__nm_uverr2result(r)));
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}

	atomic_store(&sock->listening, true);

done:
	result = isc__nm_uverr2result(r);
	if (result != ISC_R_SUCCESS) {
		sock->pquota = NULL;
	}

	atomic_fetch_add(&sock->parent->rchildren, 1);
	if (sock->parent->result == ISC_R_UNSET) {
		sock->parent->result = result;
	}
	SIGNAL(&sock->parent->cond);
	UNLOCK(&sock->parent->lock);

	isc_barrier_wait(&sock->parent->startlistening);
}

static void
tcp_connection_cb(uv_stream_t *server, int status) {
	isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *)server);
	isc_result_t result;
	isc_quota_t *quota = NULL;

	if (status != 0) {
		result = isc__nm_uverr2result(status);
		goto done;
	}

	REQUIRE(VALID_NMSOCK(ssock));
	REQUIRE(ssock->tid == isc_nm_tid());

	if (isc__nmsocket_closing(ssock)) {
		result = ISC_R_CANCELED;
		goto done;
	}

	if (ssock->pquota != NULL) {
		result = isc_quota_attach_cb(ssock->pquota, &quota,
					     &ssock->quotacb);
		if (result == ISC_R_QUOTA) {
			isc__nm_incstats(ssock->mgr,
					 ssock->statsindex[STATID_ACCEPTFAIL]);
			goto done;
		}
	}

	result = accept_connection(ssock, quota);
done:
	isc__nm_accept_connection_log(result, can_log_tcp_quota());
}

void
isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcplistener);

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	if (!isc__nm_in_netthread()) {
		enqueue_stoplistening(sock);
	} else {
		stop_tcp_parent(sock);
	}
}

void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	if (sock->parent != NULL) {
		stop_tcp_child(sock);
		return;
	}

	stop_tcp_parent(sock);
}

void
isc__nm_tcp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(result != ISC_R_SUCCESS);

	isc__nmsocket_timer_stop(sock);
	isc__nm_stop_reading(sock);

	if (!sock->recv_read) {
		goto destroy;
	}
	sock->recv_read = false;

	if (sock->recv_cb != NULL) {
		isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
		isc__nmsocket_clearcb(sock);
		isc__nm_readcb(sock, req, result);
	}

destroy:
	isc__nmsocket_prep_destroy(sock);

	/*
	 * We need to detach from the quota after the read callback
	 * function has had a chance to be executed.
	 */
	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}
}

static void
failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
	       isc_result_t eresult) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));

	if (req->cb.send != NULL) {
		isc__nm_sendcb(sock, req, eresult, true);
	} else {
		isc__nm_uvreq_put(&req, sock);
	}
}

void
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc_nmsocket_t *sock = handle->sock;
	isc__netievent_tcpstartread_t *ievent = NULL;

	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->statichandle == handle);
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(!sock->recv_read);

	sock->recv_cb = cb;
	sock->recv_cbarg = cbarg;
	sock->recv_read = true;
	if (sock->read_timeout == 0) {
		sock->read_timeout =
			(atomic_load(&sock->keepalive)
				 ? atomic_load(&sock->mgr->keepalive)
				 : atomic_load(&sock->mgr->idle));
	}

	ievent = isc__nm_get_netievent_tcpstartread(sock->mgr, sock);

	/*
	 * This MUST be done asynchronously, no matter which thread we're
	 * in. The callback function for isc_nm_read() often calls
	 * isc_nm_read() again; if we tried to do that synchronously
	 * we'd clash in processbuffer() and grow the stack indefinitely.
	 */
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);

	return;
}
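
/*
 * Illustrative sketch (not compiled) of the re-read pattern the comment
 * above refers to, with a caller-defined 'recv_cb': each completed read
 * schedules the next one from inside the callback, which is why the
 * start-read event must be enqueued rather than run synchronously:
 *
 *	static void
 *	recv_cb(isc_nmhandle_t *handle, isc_result_t result,
 *		isc_region_t *region, void *cbarg) {
 *		if (result != ISC_R_SUCCESS) {
 *			return;
 *		}
 *		process(region);			// hypothetical
 *		isc_nm_read(handle, recv_cb, cbarg);
 *	}
 */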

void
isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpstartread_t *ievent =
		(isc__netievent_tcpstartread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	if (isc__nmsocket_closing(sock)) {
		sock->reading = true;
		isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
		return;
	}

	isc__nm_start_reading(sock);
	isc__nmsocket_timer_start(sock);
}

void
isc__nm_tcp_pauseread(isc_nmhandle_t *handle) {
	isc__netievent_tcppauseread_t *ievent = NULL;
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_NMHANDLE(handle));

	sock = handle->sock;

	REQUIRE(VALID_NMSOCK(sock));

	if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ false },
					    true)) {
		return;
	}

	ievent = isc__nm_get_netievent_tcppauseread(sock->mgr, sock);

	isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid],
				     (isc__netievent_t *)ievent);

	return;
}

void
isc__nm_async_tcppauseread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcppauseread_t *ievent =
		(isc__netievent_tcppauseread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	isc__nmsocket_timer_stop(sock);
	isc__nm_stop_reading(sock);
}

void
isc__nm_tcp_resumeread(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc__netievent_tcpstartread_t *ievent = NULL;
	isc_nmsocket_t *sock = handle->sock;

	REQUIRE(sock->tid == isc_nm_tid());

	if (sock->recv_cb == NULL) {
		/* We are no longer reading */
		return;
	}

	if (!isc__nmsocket_active(sock)) {
		sock->reading = true;
		isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
		return;
	}

	if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ true },
					    false)) {
		return;
	}

	ievent = isc__nm_get_netievent_tcpstartread(sock->mgr, sock);

	isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid],
				     (isc__netievent_t *)ievent);
}

void
isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream);
	isc__nm_uvreq_t *req = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->reading);
	REQUIRE(buf != NULL);

	if (isc__nmsocket_closing(sock)) {
		isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
		goto free;
	}

	if (nread < 0) {
		if (nread != UV_EOF) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_RECVFAIL]);
		}

		isc__nm_tcp_failed_read_cb(sock, isc__nm_uverr2result(nread));

		goto free;
	}

	req = isc__nm_get_read_req(sock, NULL);

	/*
	 * The callback will be called synchronously because the
	 * result is ISC_R_SUCCESS, so we don't need to retain
	 * the buffer.
	 */
	req->uvbuf.base = buf->base;
	req->uvbuf.len = nread;

	if (!atomic_load(&sock->client)) {
		sock->read_timeout =
			(atomic_load(&sock->keepalive)
				 ? atomic_load(&sock->mgr->keepalive)
				 : atomic_load(&sock->mgr->idle));
	}

	isc__nm_readcb(sock, req, ISC_R_SUCCESS);

	/* The readcb could have paused the reading */
	if (sock->reading) {
		/* The timer will be updated */
		isc__nmsocket_timer_restart(sock);
	}

free:
	if (nread < 0) {
		/*
		 * The buffer may be a null buffer on error.
		 */
		if (buf->base == NULL && buf->len == 0) {
			return;
		}
	}

	isc__nm_free_uvbuf(sock, buf);
}

static void
quota_accept_cb(isc_quota_t *quota, void *sock0) {
	isc_nmsocket_t *sock = (isc_nmsocket_t *)sock0;
	isc__netievent_tcpaccept_t *ievent = NULL;

	REQUIRE(VALID_NMSOCK(sock));

	/*
	 * Create a tcpaccept event and pass it using the async channel.
	 */
	ievent = isc__nm_get_netievent_tcpaccept(sock->mgr, sock, quota);
	isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid],
				     (isc__netievent_t *)ievent);
}

/*
 * This is called after we get a quota_accept_cb() callback.
 */
void
isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpaccept_t *ievent = (isc__netievent_tcpaccept_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc_result_t result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	result = accept_connection(sock, ievent->quota);
	isc__nm_accept_connection_log(result, can_log_tcp_quota());
}

static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
	isc_nmsocket_t *csock = NULL;
	isc__networker_t *worker = NULL;
	int r;
	isc_result_t result;
	struct sockaddr_storage ss;
	isc_sockaddr_t local;
	isc_nmhandle_t *handle = NULL;

	REQUIRE(VALID_NMSOCK(ssock));
	REQUIRE(ssock->tid == isc_nm_tid());

	if (isc__nmsocket_closing(ssock)) {
		if (quota != NULL) {
			isc_quota_detach(&quota);
		}
		return (ISC_R_CANCELED);
	}

	csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, &ssock->iface);
	csock->tid = ssock->tid;
	csock->extrahandlesize = ssock->extrahandlesize;
	isc__nmsocket_attach(ssock, &csock->server);
	csock->recv_cb = ssock->recv_cb;
	csock->recv_cbarg = ssock->recv_cbarg;
	csock->quota = quota;
	csock->accepting = true;

	worker = &csock->mgr->workers[isc_nm_tid()];

	r = uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);
	UV_RUNTIME_CHECK(uv_tcp_init, r);
	uv_handle_set_data(&csock->uv_handle.handle, csock);

	r = uv_timer_init(&worker->loop, &csock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&csock->read_timer, csock);

	r = uv_timer_init(&worker->loop, &csock->write_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&csock->write_timer, csock);

	r = uv_accept(&ssock->uv_handle.stream, &csock->uv_handle.stream);
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto failure;
	}

	r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto failure;
	}

	result = isc_sockaddr_fromsockaddr(&csock->peer,
					   (struct sockaddr *)&ss);
	if (result != ISC_R_SUCCESS) {
		goto failure;
	}

	r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto failure;
	}

	result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
	if (result != ISC_R_SUCCESS) {
		goto failure;
	}

	handle = isc__nmhandle_get(csock, NULL, &local);

	result = ssock->accept_cb(handle, ISC_R_SUCCESS, ssock->accept_cbarg);
	if (result != ISC_R_SUCCESS) {
		isc_nmhandle_detach(&handle);
		goto failure;
	}

	csock->accepting = false;

	isc__nm_incstats(csock->mgr, csock->statsindex[STATID_ACCEPT]);

	csock->read_timeout = atomic_load(&csock->mgr->init);

	atomic_fetch_add(&ssock->parent->active_child_connections, 1);

	/*
	 * The accept callback needs to attach to the handle if it wants
	 * to keep the connection alive.
	 */
	isc_nmhandle_detach(&handle);

	/*
	 * sock is now attached to the handle.
	 */
	isc__nmsocket_detach(&csock);

	return (ISC_R_SUCCESS);

failure:
	atomic_store(&csock->active, false);

	failed_accept_cb(csock, result);

	isc__nmsocket_prep_destroy(csock);

	isc__nmsocket_detach(&csock);

	return (result);
}
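
/*
 * Illustrative sketch (not compiled) of the handle-attach contract
 * noted above: an accept callback that wants to keep the connection
 * must take its own reference, e.g. into a caller-defined 'conn'
 * structure, because accept_connection() detaches its reference as
 * soon as the callback returns:
 *
 *	static isc_result_t
 *	accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
 *		conn_t *conn = new_conn(arg);		// hypothetical
 *		isc_nmhandle_attach(handle, &conn->handle);
 *		return (ISC_R_SUCCESS);
 *	}
 */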

void
isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
		 isc_nm_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc_nmsocket_t *sock = handle->sock;
	isc__netievent_tcpsend_t *ievent = NULL;
	isc__nm_uvreq_t *uvreq = NULL;

	REQUIRE(sock->type == isc_nm_tcpsocket);

	uvreq = isc__nm_uvreq_get(sock->mgr, sock);
	uvreq->uvbuf.base = (char *)region->base;
	uvreq->uvbuf.len = region->length;

	isc_nmhandle_attach(handle, &uvreq->handle);

	uvreq->cb.send = cb;
	uvreq->cbarg = cbarg;

	if (sock->write_timeout == 0) {
		sock->write_timeout =
			(atomic_load(&sock->keepalive)
				 ? atomic_load(&sock->mgr->keepalive)
				 : atomic_load(&sock->mgr->idle));
	}

	ievent = isc__nm_get_netievent_tcpsend(sock->mgr, sock, uvreq);
	isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid],
				     (isc__netievent_t *)ievent);

	return;
}
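
/*
 * Note for callers: the send is asynchronous and the region is not
 * copied (uvreq->uvbuf above points straight at region->base), so the
 * buffer must stay valid until the send callback runs.  A minimal
 * sketch (not compiled), assuming a long-lived buffer and a
 * caller-defined 'send_cb':
 *
 *	isc_region_t region = { .base = buf, .length = buflen };
 *	isc_nm_send(handle, &region, send_cb, NULL);
 */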

static void
tcp_send_cb(uv_write_t *req, int status) {
	isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;

	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	isc_nmsocket_t *sock = uvreq->sock;

	if (--sock->writes == 0) {
		int r = uv_timer_stop(&sock->write_timer);
		UV_RUNTIME_CHECK(uv_timer_stop, r);
	}

	if (status < 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		failed_send_cb(sock, uvreq, isc__nm_uverr2result(status));
		return;
	}

	isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false);
}

/*
 * Handle 'tcpsend' async event - send a packet on the socket
 */
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc_result_t result;
	isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;

	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	result = tcp_send_direct(sock, uvreq);
	if (result != ISC_R_SUCCESS) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		failed_send_cb(sock, uvreq, result);
	}
}

static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcpsocket);

	int r;

	if (isc__nmsocket_closing(sock)) {
		return (ISC_R_CANCELED);
	}

	r = uv_timer_start(&sock->write_timer, isc__nmsocket_writetimeout_cb,
			   sock->write_timeout, 0);
	UV_RUNTIME_CHECK(uv_timer_start, r);
	RUNTIME_CHECK(sock->writes++ >= 0);

	r = uv_write(&req->uv_req.write, &sock->uv_handle.stream, &req->uvbuf,
		     1, tcp_send_cb);
	if (r < 0) {
		return (isc__nm_uverr2result(r));
	}

	return (ISC_R_SUCCESS);
}

static void
tcp_stop_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	atomic_store(&sock->listening, false);

	isc__nmsocket_detach(&sock);
}

static void
tcp_close_sock(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true)) {
		INSIST(0);
		ISC_UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	if (sock->server != NULL) {
		isc__nmsocket_detach(&sock->server);
	}

	atomic_store(&sock->connected, false);

	isc__nmsocket_prep_destroy(sock);
}

static void
tcp_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	tcp_close_sock(sock);
}

static void
read_timer_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	if (sock->parent) {
		uv_close(&sock->uv_handle.handle, tcp_stop_cb);
	} else if (uv_is_closing(&sock->uv_handle.handle)) {
		tcp_close_sock(sock);
	} else {
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
	}
}

static void
write_timer_close_cb(uv_handle_t *timer) {
	isc_nmsocket_t *sock = uv_handle_get_data(timer);
	uv_handle_set_data(timer, NULL);

	REQUIRE(VALID_NMSOCK(sock));

	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);
	uv_close((uv_handle_t *)&sock->read_timer, read_timer_close_cb);
}

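/*
 * Shutting down a listener is a two-level operation: stop_tcp_parent()
 * runs on the parent's thread, marks every other child inactive and
 * enqueues a stop event for each of them, then stops its own child
 * directly; each child closes in stop_tcp_child() and the
 * stoplistening barrier keeps the shutdown from completing until all
 * children have closed.
 */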
static void
stop_tcp_child(isc_nmsocket_t *sock) {
	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->tid == isc_nm_tid());

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		return;
	}

	tcp_close_direct(sock);

	atomic_fetch_sub(&sock->parent->rchildren, 1);

	isc_barrier_wait(&sock->parent->stoplistening);
}

static void
stop_tcp_parent(isc_nmsocket_t *sock) {
	isc_nmsocket_t *csock = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcplistener);

	isc_barrier_init(&sock->stoplistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		csock = &sock->children[i];
		REQUIRE(VALID_NMSOCK(csock));

		if ((int)i == isc_nm_tid()) {
			/*
			 * We need to schedule closing the other sockets first
			 */
			continue;
		}

		atomic_store(&csock->active, false);
		enqueue_stoplistening(csock);
	}

	csock = &sock->children[isc_nm_tid()];
	atomic_store(&csock->active, false);
	stop_tcp_child(csock);

	atomic_store(&sock->closed, true);
	isc__nmsocket_prep_destroy(sock);
}

static void
tcp_close_direct(isc_nmsocket_t *sock) {
	int r;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (sock->server != NULL) {
		REQUIRE(VALID_NMSOCK(sock->server));
		REQUIRE(VALID_NMSOCK(sock->server->parent));
		if (sock->server->parent != NULL) {
			atomic_fetch_sub(
				&sock->server->parent->active_child_connections,
				1);
		}
	}

	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}

	isc__nmsocket_timer_stop(sock);
	isc__nm_stop_reading(sock);

	r = uv_timer_stop(&sock->write_timer);
	UV_RUNTIME_CHECK(uv_timer_stop, r);
	uv_handle_set_data((uv_handle_t *)&sock->write_timer, sock);
	uv_close((uv_handle_t *)&sock->write_timer, write_timer_close_cb);
}

void
isc__nm_tcp_close(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(!isc__nmsocket_active(sock));

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true)) {
		return;
	}

	if (sock->tid == isc_nm_tid()) {
		tcp_close_direct(sock);
	} else {
		/*
		 * We need to create an event and pass it using async channel
		 */
		isc__netievent_tcpclose_t *ievent =
			isc__nm_get_netievent_tcpclose(sock->mgr, sock);

		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	UNUSED(worker);

	tcp_close_direct(sock);
}

static void
tcp_close_connect_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);

	REQUIRE(VALID_NMSOCK(sock));

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->tid == isc_nm_tid());

	isc__nmsocket_prep_destroy(sock);
	isc__nmsocket_detach(&sock);
}

void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcpsocket);

	/*
	 * If the socket is active, mark it inactive and
	 * continue. If it isn't active, stop now.
	 */
	if (!isc__nmsocket_deactivate(sock)) {
		return;
	}

	if (sock->accepting) {
		return;
	}

	if (atomic_load(&sock->connecting)) {
		isc_nmsocket_t *tsock = NULL;
		isc__nmsocket_attach(sock, &tsock);
		uv_close(&sock->uv_handle.handle, tcp_close_connect_cb);
		return;
	}

	if (sock->statichandle != NULL) {
		isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
		return;
	}

	/*
	 * Otherwise, we just send the socket into the abyss...
	 */
	if (sock->parent == NULL) {
		isc__nmsocket_prep_destroy(sock);
	}
}

void
isc__nm_tcp_cancelread(isc_nmhandle_t *handle) {
	isc_nmsocket_t *sock = NULL;
	isc__netievent_tcpcancel_t *ievent = NULL;

	REQUIRE(VALID_NMHANDLE(handle));

	sock = handle->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcpsocket);

	ievent = isc__nm_get_netievent_tcpcancel(sock->mgr, sock, handle);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

void
isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpcancel_t *ievent = (isc__netievent_tcpcancel_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	uv_timer_stop(&sock->read_timer);

	isc__nm_tcp_failed_read_cb(sock, ISC_R_EOF);
}

int_fast32_t
isc__nm_tcp_listener_nactive(isc_nmsocket_t *listener) {
	int_fast32_t nactive;

	REQUIRE(VALID_NMSOCK(listener));

	nactive = atomic_load(&listener->active_child_connections);
	INSIST(nactive >= 0);
	return (nactive);
}