1 /*
2 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 *
8 * See the COPYRIGHT file distributed with this work for additional
9 * information regarding copyright ownership.
10 */
11
12 #include <libgen.h>
13 #include <unistd.h>
14 #include <uv.h>
15
16 #include <isc/atomic.h>
17 #include <isc/buffer.h>
18 #include <isc/condition.h>
19 #include <isc/log.h>
20 #include <isc/magic.h>
21 #include <isc/mem.h>
22 #include <isc/netmgr.h>
23 #include <isc/quota.h>
24 #include <isc/random.h>
25 #include <isc/refcount.h>
26 #include <isc/region.h>
27 #include <isc/result.h>
28 #include <isc/sockaddr.h>
29 #include <isc/stdtime.h>
30 #include <isc/thread.h>
31 #include <isc/util.h>
32
33 #include "netmgr-int.h"
34 #include "uv-compat.h"
35
36 static atomic_uint_fast32_t last_tcpquota_log = ATOMIC_VAR_INIT(0);
37
38 static bool
can_log_tcp_quota()39 can_log_tcp_quota() {
40 isc_stdtime_t now, last;
41
42 isc_stdtime_get(&now);
43 last = atomic_exchange_relaxed(&last_tcpquota_log, now);
44 if (now != last) {
45 return (true);
46 }
47
48 return (false);
49 }
50
/* Forward declarations for static helpers and libuv callbacks. */

static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);

static void
tcp_close_direct(isc_nmsocket_t *sock);

static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
static void
tcp_connect_cb(uv_connect_t *uvreq, int status);

static void
tcp_connection_cb(uv_stream_t *server, int status);

static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);

static void
tcp_close_cb(uv_handle_t *uvhandle);

static void
tcp_listenclose_cb(uv_handle_t *handle);
static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);

static void
quota_accept_cb(isc_quota_t *quota, void *sock0);
78
/*
 * Initialize the socket's uv_tcp handle on this worker's loop, bind
 * to req->local if one was supplied, and start an asynchronous
 * connect to req->peer; tcp_connect_cb() fires on completion.
 * Must be called from a network-manager thread.
 * Returns a libuv error code (0 on success).
 */
static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	isc__networker_t *worker = NULL;
	int r;

	REQUIRE(isc__nm_in_netthread());

	worker = &sock->mgr->workers[isc_nm_tid()];

	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		return (r);
	}

	/* Bind to the requested local address, if any. */
	if (req->local.length != 0) {
		r = uv_tcp_bind(&sock->uv_handle.tcp, &req->local.type.sa, 0);
		if (r != 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			tcp_close_direct(sock);
			return (r);
		}
	}
	uv_handle_set_data(&sock->uv_handle.handle, sock);
	r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
			   &req->peer.type.sa, tcp_connect_cb);
	return (r);
}
108
/*
 * Handle 'tcpconnect': perform the connect on this worker.  If the
 * direct attempt fails, invoke tcp_connect_cb() by hand so the
 * caller's callback runs exactly once either way.
 */
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpconnect_t *ievent =
		(isc__netievent_tcpconnect_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *req = ievent->req;
	int r;

	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(worker->id == ievent->req->sock->mgr->workers[isc_nm_tid()].id);

	r = tcp_connect_direct(sock, req);
	if (r != 0) {
		/* We need to issue callbacks ourselves */
		tcp_connect_cb(&req->uv_req.connect, r);
	}
}
126
/*
 * libuv connect callback.  On success: record the peer address,
 * create a handle for the new connection and pass it to the caller's
 * connect callback.  On failure: report the mapped error to the
 * callback with a NULL handle.  The uvreq is released in both cases.
 */
static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
	isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)uvreq->data;
	isc_nmsocket_t *sock = NULL;
	sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);

	REQUIRE(VALID_UVREQ(req));

	if (status == 0) {
		isc_result_t result;
		isc_nmhandle_t *handle = NULL;
		struct sockaddr_storage ss;

		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
		/* NOTE(review): uv_tcp_getpeername() result is unchecked. */
		uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
				   &(int){ sizeof(ss) });
		result = isc_sockaddr_fromsockaddr(&sock->peer,
						   (struct sockaddr *)&ss);
		RUNTIME_CHECK(result == ISC_R_SUCCESS);

		handle = isc__nmhandle_get(sock, NULL, NULL);
		req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
	} else {
		/*
		 * TODO:
		 * Handle the connect error properly and free the socket.
		 */
		isc__nm_incstats(sock->mgr,
				 sock->statsindex[STATID_CONNECTFAIL]);
		req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
	}

	isc__nm_uvreq_put(&req, sock);
}
161
/*
 * Create a TCP listening socket on 'iface' and start listening.
 * The bind/listen itself runs as a 'tcplisten' event on the chosen
 * worker thread; if we are not already on a network thread we block
 * on sock->cond until that worker reports success or failure.
 * On success *sockp holds the new listener; on failure the socket
 * is detached and the worker's error result is returned.
 */
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
		 isc_nm_accept_cb_t accept_cb, void *accept_cbarg,
		 size_t extrahandlesize, int backlog, isc_quota_t *quota,
		 isc_nmsocket_t **sockp) {
	isc_nmsocket_t *nsock = NULL;
	isc__netievent_tcplisten_t *ievent = NULL;

	REQUIRE(VALID_NM(mgr));

	nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
	isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface);
	nsock->accept_cb.accept = accept_cb;
	nsock->accept_cbarg = accept_cbarg;
	nsock->extrahandlesize = extrahandlesize;
	nsock->backlog = backlog;
	nsock->result = ISC_R_SUCCESS;
	if (quota != NULL) {
		/*
		 * We don't attach to quota, just assign - to avoid
		 * increasing quota unnecessarily.
		 */
		nsock->pquota = quota;
	}
	isc_quota_cb_init(&nsock->quotacb, quota_accept_cb, nsock);

	ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
	ievent->sock = nsock;
	if (isc__nm_in_netthread()) {
		/* Already on a network thread: run the event inline. */
		nsock->tid = isc_nm_tid();
		isc__nm_async_tcplisten(&mgr->workers[nsock->tid],
					(isc__netievent_t *)ievent);
		isc__nm_put_ievent(mgr, ievent);
	} else {
		/*
		 * Queue the event on a random worker, then wait until
		 * it signals either 'listening' or 'listen_error'.
		 */
		nsock->tid = isc_random_uniform(mgr->nworkers);
		isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
				       (isc__netievent_t *)ievent);

		LOCK(&nsock->lock);
		while (!atomic_load(&nsock->listening) &&
		       !atomic_load(&nsock->listen_error)) {
			WAIT(&nsock->cond, &nsock->lock);
		}
		UNLOCK(&nsock->lock);
	}

	if (nsock->result == ISC_R_SUCCESS) {
		*sockp = nsock;
		return (ISC_R_SUCCESS);
	} else {
		isc_result_t result = nsock->result;
		isc_nmsocket_detach(&nsock);
		return (result);
	}
}
217
218 /*
219 * For multi-threaded TCP listening, we create a single socket,
220 * bind to it, and start listening. On an incoming connection we accept
221 * it, and then pass the accepted socket using the uv_export/uv_import
222 * mechanism to a child thread.
223 */
/*
 * Handle 'tcplisten': initialize, bind and listen on the socket in
 * this worker's loop.  Success or failure is reported back to
 * isc_nm_listentcp() through sock->result plus the 'listening' /
 * 'listen_error' flags, signalled on sock->cond at 'done'.
 */
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcplisten_t *ievent = (isc__netievent_tcplisten_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	struct sockaddr_storage sname;
	int r, flags = 0, snamelen = sizeof(sname);

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->type == isc_nm_tcplistener);

	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
	if (r != 0) {
		/* It was never opened */
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		atomic_store(&sock->closed, true);
		sock->result = isc__nm_uverr2result(r);
		atomic_store(&sock->listen_error, true);
		goto done;
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sock->iface->addr.type.sa.sa_family == AF_INET6) {
		flags = UV_TCP_IPV6ONLY;
	}

	r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa,
			flags);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
		sock->result = isc__nm_uverr2result(r);
		atomic_store(&sock->listen_error, true);
		goto done;
	}

	/*
	 * By doing this now, we can find out immediately whether bind()
	 * failed, and quit if so. (uv_bind() uses a delayed error,
	 * initially returning success even if bind() fails, and this
	 * could cause a deadlock later if we didn't check first.)
	 */
	r = uv_tcp_getsockname(&sock->uv_handle.tcp, (struct sockaddr *)&sname,
			       &snamelen);
	if (r != 0) {
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
		sock->result = isc__nm_uverr2result(r);
		atomic_store(&sock->listen_error, true);
		goto done;
	}

	/*
	 * The callback will run in the same thread uv_listen() was called
	 * from, so a race with tcp_connection_cb() isn't possible.
	 */
	r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
		      tcp_connection_cb);
	if (r != 0) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "uv_listen failed: %s",
			      isc_result_totext(isc__nm_uverr2result(r)));
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
		sock->result = isc__nm_uverr2result(r);
		atomic_store(&sock->listen_error, true);
		goto done;
	}

	uv_handle_set_data(&sock->uv_handle.handle, sock);

	atomic_store(&sock->listening, true);

done:
	/* Wake up a waiting isc_nm_listentcp(), if any. */
	LOCK(&sock->lock);
	SIGNAL(&sock->cond);
	UNLOCK(&sock->lock);
	return;
}
302
303 static void
tcp_connection_cb(uv_stream_t * server,int status)304 tcp_connection_cb(uv_stream_t *server, int status) {
305 isc_nmsocket_t *psock = uv_handle_get_data((uv_handle_t *)server);
306 isc_result_t result;
307
308 UNUSED(status);
309
310 result = accept_connection(psock, NULL);
311 if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
312 if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
313 can_log_tcp_quota()) {
314 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
315 ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
316 "TCP connection failed: %s",
317 isc_result_totext(result));
318 }
319 }
320 }
321
/*
 * Handle 'tcpchildaccept': this worker receives a TCP connection
 * that was accepted and exported by the listener thread, imports it
 * into its own event loop, records the peer and local addresses,
 * and passes a fresh handle to the listener's accept callback.
 */
void
isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpchildaccept_t *ievent =
		(isc__netievent_tcpchildaccept_t *)ev0;
	isc_nmsocket_t *ssock = ievent->sock;
	isc_nmsocket_t *csock = NULL;
	isc_nmhandle_t *handle;
	isc_result_t result;
	struct sockaddr_storage ss;
	isc_sockaddr_t local;
	int r;

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(ssock->type == isc_nm_tcplistener);

	/* Child socket representing the accepted connection. */
	csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
	csock->tid = isc_nm_tid();
	csock->extrahandlesize = ssock->extrahandlesize;

	/* Take ownership of the quota reference carried by the event. */
	csock->quota = ievent->quota;
	ievent->quota = NULL;

	worker = &ssock->mgr->workers[isc_nm_tid()];
	/* NOTE(review): uv_tcp_init() return value is not checked here. */
	uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);

	r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo);
	if (r != 0) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "uv_import failed: %s",
			      isc_result_totext(isc__nm_uverr2result(r)));
		result = isc__nm_uverr2result(r);
		goto error;
	}

	r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto error;
	}

	result = isc_sockaddr_fromsockaddr(&csock->peer,
					   (struct sockaddr *)&ss);
	if (result != ISC_R_SUCCESS) {
		goto error;
	}

	r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto error;
	}

	result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
	if (result != ISC_R_SUCCESS) {
		goto error;
	}

	/* Keep the listener alive as long as the child exists. */
	isc_nmsocket_attach(ssock, &csock->server);

	handle = isc__nmhandle_get(csock, NULL, &local);

	INSIST(ssock->accept_cb.accept != NULL);
	/* New connections start with the initial read timeout. */
	csock->read_timeout = ssock->mgr->init;
	ssock->accept_cb.accept(handle, ISC_R_SUCCESS, ssock->accept_cbarg);
	isc_nmsocket_detach(&csock);
	return;

error:
	/*
	 * Detach the quota early to make room for other connections;
	 * otherwise it'd be detached later asynchronously, and clog
	 * the quota unnecessarily.
	 */
	if (csock->quota != NULL) {
		isc_quota_detach(&csock->quota);
	}
	isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
		      ISC_LOG_ERROR, "Accepting TCP connection failed: %s",
		      isc_result_totext(result));

	/*
	 * Detach the socket properly to make sure uv_close() is called.
	 */
	isc_nmsocket_detach(&csock);
}
411
412 void
isc__nm_tcp_stoplistening(isc_nmsocket_t * sock)413 isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
414 isc__netievent_tcpstop_t *ievent = NULL;
415
416 REQUIRE(VALID_NMSOCK(sock));
417 REQUIRE(!isc__nm_in_netthread());
418
419 ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
420 isc_nmsocket_attach(sock, &ievent->sock);
421 isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
422 (isc__netievent_t *)ievent);
423 }
424
/*
 * Handle 'tcpstop': close the listening socket.  The netmgr
 * interlock is required; if it cannot be acquired the event is
 * re-queued and retried later.
 */
void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	UNUSED(worker);

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcplistener);

	/*
	 * If network manager is interlocked, re-enqueue the event for later.
	 */
	if (!isc__nm_acquire_interlocked(sock->mgr)) {
		isc__netievent_tcpstop_t *event = NULL;

		/*
		 * NOTE(review): the new event reuses the socket
		 * reference attached by isc__nm_tcp_stoplistening()
		 * rather than attaching again - confirm intended.
		 */
		event = isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
		event->sock = sock;
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)event);
	} else {
		/* tcp_listenclose_cb() releases the attached reference. */
		uv_close((uv_handle_t *)&sock->uv_handle.tcp,
			 tcp_listenclose_cb);
		isc__nm_drop_interlocked(sock->mgr);
	}
}
452
453 /*
454 * This callback is used for closing listening sockets.
455 */
456 static void
tcp_listenclose_cb(uv_handle_t * handle)457 tcp_listenclose_cb(uv_handle_t *handle) {
458 isc_nmsocket_t *sock = uv_handle_get_data(handle);
459
460 LOCK(&sock->lock);
461 atomic_store(&sock->closed, true);
462 atomic_store(&sock->listening, false);
463 sock->pquota = NULL;
464 UNLOCK(&sock->lock);
465
466 isc_nmsocket_detach(&sock);
467 }
468
469 static void
readtimeout_cb(uv_timer_t * handle)470 readtimeout_cb(uv_timer_t *handle) {
471 isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
472
473 REQUIRE(VALID_NMSOCK(sock));
474 REQUIRE(sock->tid == isc_nm_tid());
475
476 /*
477 * Socket is actively processing something, so restart the timer
478 * and return.
479 */
480 if (atomic_load(&sock->processing)) {
481 uv_timer_start(handle, readtimeout_cb, sock->read_timeout, 0);
482 return;
483 }
484
485 /*
486 * Timeout; stop reading and process whatever we have.
487 */
488 uv_read_stop(&sock->uv_handle.stream);
489 if (sock->quota) {
490 isc_quota_detach(&sock->quota);
491 }
492 sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
493 }
494
495 isc_result_t
isc__nm_tcp_read(isc_nmhandle_t * handle,isc_nm_recv_cb_t cb,void * cbarg)496 isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
497 isc_nmsocket_t *sock = NULL;
498 isc__netievent_startread_t *ievent = NULL;
499
500 REQUIRE(VALID_NMHANDLE(handle));
501 REQUIRE(VALID_NMSOCK(handle->sock));
502
503 sock = handle->sock;
504 sock->rcb.recv = cb;
505 sock->rcbarg = cbarg;
506
507 ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
508 ievent->sock = sock;
509
510 if (sock->tid == isc_nm_tid()) {
511 isc__nm_async_tcp_startread(&sock->mgr->workers[sock->tid],
512 (isc__netievent_t *)ievent);
513 isc__nm_put_ievent(sock->mgr, ievent);
514 } else {
515 isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
516 (isc__netievent_t *)ievent);
517 }
518
519 return (ISC_R_SUCCESS);
520 }
521
522 void
isc__nm_async_tcp_startread(isc__networker_t * worker,isc__netievent_t * ev0)523 isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) {
524 isc__netievent_startread_t *ievent = (isc__netievent_startread_t *)ev0;
525 isc_nmsocket_t *sock = ievent->sock;
526 int r;
527
528 REQUIRE(worker->id == isc_nm_tid());
529 if (sock->read_timeout != 0) {
530 if (!sock->timer_initialized) {
531 uv_timer_init(&worker->loop, &sock->timer);
532 uv_handle_set_data((uv_handle_t *)&sock->timer, sock);
533 sock->timer_initialized = true;
534 }
535 uv_timer_start(&sock->timer, readtimeout_cb, sock->read_timeout,
536 0);
537 }
538
539 r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb, read_cb);
540 if (r != 0) {
541 isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
542 }
543 }
544
545 isc_result_t
isc__nm_tcp_pauseread(isc_nmsocket_t * sock)546 isc__nm_tcp_pauseread(isc_nmsocket_t *sock) {
547 isc__netievent_pauseread_t *ievent = NULL;
548
549 REQUIRE(VALID_NMSOCK(sock));
550
551 if (atomic_load(&sock->readpaused)) {
552 return (ISC_R_SUCCESS);
553 }
554
555 atomic_store(&sock->readpaused, true);
556 ievent = isc__nm_get_ievent(sock->mgr, netievent_tcppauseread);
557 ievent->sock = sock;
558
559 if (sock->tid == isc_nm_tid()) {
560 isc__nm_async_tcp_pauseread(&sock->mgr->workers[sock->tid],
561 (isc__netievent_t *)ievent);
562 isc__nm_put_ievent(sock->mgr, ievent);
563 } else {
564 isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
565 (isc__netievent_t *)ievent);
566 }
567
568 return (ISC_R_SUCCESS);
569 }
570
571 void
isc__nm_async_tcp_pauseread(isc__networker_t * worker,isc__netievent_t * ev0)572 isc__nm_async_tcp_pauseread(isc__networker_t *worker, isc__netievent_t *ev0) {
573 isc__netievent_pauseread_t *ievent = (isc__netievent_pauseread_t *)ev0;
574 isc_nmsocket_t *sock = ievent->sock;
575
576 REQUIRE(VALID_NMSOCK(sock));
577 REQUIRE(worker->id == isc_nm_tid());
578
579 if (sock->timer_initialized) {
580 uv_timer_stop(&sock->timer);
581 }
582 uv_read_stop(&sock->uv_handle.stream);
583 }
584
585 isc_result_t
isc__nm_tcp_resumeread(isc_nmsocket_t * sock)586 isc__nm_tcp_resumeread(isc_nmsocket_t *sock) {
587 isc__netievent_startread_t *ievent = NULL;
588
589 REQUIRE(VALID_NMSOCK(sock));
590 REQUIRE(sock->rcb.recv != NULL);
591
592 if (!atomic_load(&sock->readpaused)) {
593 return (ISC_R_SUCCESS);
594 }
595
596 atomic_store(&sock->readpaused, false);
597
598 ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
599 ievent->sock = sock;
600
601 if (sock->tid == isc_nm_tid()) {
602 isc__nm_async_tcp_startread(&sock->mgr->workers[sock->tid],
603 (isc__netievent_t *)ievent);
604 isc__nm_put_ievent(sock->mgr, ievent);
605 } else {
606 isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
607 (isc__netievent_t *)ievent);
608 }
609
610 return (ISC_R_SUCCESS);
611 }
612
613 static void
read_cb(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)614 read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
615 isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream);
616
617 REQUIRE(VALID_NMSOCK(sock));
618 REQUIRE(buf != NULL);
619
620 if (nread >= 0) {
621 isc_region_t region = { .base = (unsigned char *)buf->base,
622 .length = nread };
623
624 if (sock->rcb.recv != NULL) {
625 sock->rcb.recv(sock->tcphandle, ®ion, sock->rcbarg);
626 }
627
628 sock->read_timeout = (atomic_load(&sock->keepalive)
629 ? sock->mgr->keepalive
630 : sock->mgr->idle);
631
632 if (sock->timer_initialized && sock->read_timeout != 0) {
633 /* The timer will be updated */
634 uv_timer_start(&sock->timer, readtimeout_cb,
635 sock->read_timeout, 0);
636 }
637
638 isc__nm_free_uvbuf(sock, buf);
639 return;
640 }
641
642 isc__nm_free_uvbuf(sock, buf);
643
644 /*
645 * This might happen if the inner socket is closing. It means that
646 * it's detached, so the socket will be closed.
647 */
648 if (sock->rcb.recv != NULL) {
649 isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
650 sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
651 }
652
653 /*
654 * We don't need to clean up now; the socket will be closed and
655 * resources and quota reclaimed when handle is freed in
656 * isc__nm_tcp_close().
657 */
658 }
659
660 static void
quota_accept_cb(isc_quota_t * quota,void * sock0)661 quota_accept_cb(isc_quota_t *quota, void *sock0) {
662 isc_nmsocket_t *sock = (isc_nmsocket_t *)sock0;
663 isc__netievent_tcpaccept_t *ievent = NULL;
664
665 REQUIRE(VALID_NMSOCK(sock));
666
667 /*
668 * Create a tcpaccept event and pass it using the async channel.
669 */
670 ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpaccept);
671 ievent->sock = sock;
672 ievent->quota = quota;
673 isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
674 (isc__netievent_t *)ievent);
675 }
676
677 /*
678 * This is called after we get a quota_accept_cb() callback.
679 */
680 void
isc__nm_async_tcpaccept(isc__networker_t * worker,isc__netievent_t * ev0)681 isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
682 isc_result_t result;
683 isc__netievent_tcpaccept_t *ievent = (isc__netievent_tcpaccept_t *)ev0;
684
685 REQUIRE(worker->id == ievent->sock->tid);
686
687 result = accept_connection(ievent->sock, ievent->quota);
688 if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
689 if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
690 can_log_tcp_quota()) {
691 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
692 ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
693 "TCP connection failed: %s",
694 isc_result_totext(result));
695 }
696 }
697
698 /*
699 * The socket was attached just before we called isc_quota_attach_cb().
700 */
701 isc_nmsocket_detach(&ievent->sock);
702 }
703
/*
 * Close callback for uv_tcp_t structures created in accept_connection():
 * frees the uv_tcp_t and detaches the memory context stashed in its
 * data pointer.
 */
static void
free_uvtcpt(uv_handle_t *uvs) {
	isc_mem_t *mctx = (isc_mem_t *)uv_handle_get_data(uvs);
	isc_mem_putanddetach(&mctx, uvs, sizeof(uv_tcp_t));
}
712
713 static isc_result_t
accept_connection(isc_nmsocket_t * ssock,isc_quota_t * quota)714 accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
715 isc_result_t result;
716 isc__netievent_tcpchildaccept_t *event = NULL;
717 isc__networker_t *worker = NULL;
718 uv_tcp_t *uvstream = NULL;
719 isc_mem_t *mctx = NULL;
720 int r, w;
721
722 REQUIRE(VALID_NMSOCK(ssock));
723 REQUIRE(ssock->tid == isc_nm_tid());
724
725 if (!atomic_load_relaxed(&ssock->active) ||
726 atomic_load_relaxed(&ssock->mgr->closing))
727 {
728 /* We're closing, bail */
729 if (quota != NULL) {
730 isc_quota_detach("a);
731 }
732 return (ISC_R_CANCELED);
733 }
734
735 /* We can be called directly or as a callback from quota */
736 if (ssock->pquota != NULL && quota == NULL) {
737 /*
738 * We need to attach to ssock, because it might be queued
739 * waiting for a TCP quota slot. If so, then we'll detach it
740 * later when the connection is accepted. (XXX: This may be
741 * suboptimal, it might be better not to attach unless
742 * we need to - but we risk a race then.)
743 */
744 isc_nmsocket_t *tsock = NULL;
745 isc_nmsocket_attach(ssock, &tsock);
746 result = isc_quota_attach_cb(ssock->pquota, "a,
747 &ssock->quotacb);
748 if (result == ISC_R_QUOTA) {
749 isc__nm_incstats(ssock->mgr,
750 ssock->statsindex[STATID_ACCEPTFAIL]);
751 return (result);
752 }
753
754 /*
755 * We're under quota, so there's no need to wait;
756 * Detach the socket.
757 */
758 isc_nmsocket_detach(&tsock);
759 }
760
761 isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]);
762
763 worker = &ssock->mgr->workers[isc_nm_tid()];
764 uvstream = isc_mem_get(ssock->mgr->mctx, sizeof(uv_tcp_t));
765
766 isc_mem_attach(ssock->mgr->mctx, &mctx);
767 uv_handle_set_data((uv_handle_t *)uvstream, mctx);
768 mctx = NULL; /* Detached later in free_uvtcpt() */
769
770 uv_tcp_init(&worker->loop, uvstream);
771
772 r = uv_accept(&ssock->uv_handle.stream, (uv_stream_t *)uvstream);
773 if (r != 0) {
774 result = isc__nm_uverr2result(r);
775 uv_close((uv_handle_t *)uvstream, free_uvtcpt);
776 isc_quota_detach("a);
777 return (result);
778 }
779
780 /* We have an accepted TCP socket, pass it to a random worker */
781 w = isc_random_uniform(ssock->mgr->nworkers);
782 event = isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept);
783 event->sock = ssock;
784 event->quota = quota;
785
786 r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo);
787 RUNTIME_CHECK(r == 0);
788
789 uv_close((uv_handle_t *)uvstream, free_uvtcpt);
790
791 if (w == isc_nm_tid()) {
792 isc__nm_async_tcpchildaccept(&ssock->mgr->workers[w],
793 (isc__netievent_t *)event);
794 isc__nm_put_ievent(ssock->mgr, event);
795 } else {
796 isc__nm_enqueue_ievent(&ssock->mgr->workers[w],
797 (isc__netievent_t *)event);
798 }
799
800 return (ISC_R_SUCCESS);
801 }
802
803 isc_result_t
isc__nm_tcp_send(isc_nmhandle_t * handle,isc_region_t * region,isc_nm_cb_t cb,void * cbarg)804 isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
805 void *cbarg) {
806 isc_nmsocket_t *sock = handle->sock;
807 isc__netievent_tcpsend_t *ievent = NULL;
808 isc__nm_uvreq_t *uvreq = NULL;
809
810 REQUIRE(sock->type == isc_nm_tcpsocket);
811
812 uvreq = isc__nm_uvreq_get(sock->mgr, sock);
813 uvreq->uvbuf.base = (char *)region->base;
814 uvreq->uvbuf.len = region->length;
815 uvreq->handle = handle;
816 isc_nmhandle_ref(uvreq->handle);
817 uvreq->cb.send = cb;
818 uvreq->cbarg = cbarg;
819
820 if (sock->tid == isc_nm_tid()) {
821 /*
822 * If we're in the same thread as the socket we can send the
823 * data directly
824 */
825 return (tcp_send_direct(sock, uvreq));
826 } else {
827 /*
828 * We need to create an event and pass it using async channel
829 */
830 ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpsend);
831 ievent->sock = sock;
832 ievent->req = uvreq;
833 isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
834 (isc__netievent_t *)ievent);
835 return (ISC_R_SUCCESS);
836 }
837
838 return (ISC_R_UNEXPECTED);
839 }
840
841 static void
tcp_send_cb(uv_write_t * req,int status)842 tcp_send_cb(uv_write_t *req, int status) {
843 isc_result_t result = ISC_R_SUCCESS;
844 isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
845
846 REQUIRE(VALID_UVREQ(uvreq));
847 REQUIRE(VALID_NMHANDLE(uvreq->handle));
848
849 if (status < 0) {
850 result = isc__nm_uverr2result(status);
851 isc__nm_incstats(uvreq->sock->mgr,
852 uvreq->sock->statsindex[STATID_SENDFAIL]);
853 }
854
855 uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
856 isc_nmhandle_unref(uvreq->handle);
857 isc__nm_uvreq_put(&uvreq, uvreq->handle->sock);
858 }
859
860 /*
861 * Handle 'tcpsend' async event - send a packet on the socket
862 */
863 void
isc__nm_async_tcpsend(isc__networker_t * worker,isc__netievent_t * ev0)864 isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
865 isc_result_t result;
866 isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *)ev0;
867
868 REQUIRE(worker->id == ievent->sock->tid);
869
870 if (!atomic_load(&ievent->sock->active)) {
871 return;
872 }
873
874 result = tcp_send_direct(ievent->sock, ievent->req);
875 if (result != ISC_R_SUCCESS) {
876 ievent->req->cb.send(ievent->req->handle, result,
877 ievent->req->cbarg);
878 isc__nm_uvreq_put(&ievent->req, ievent->req->handle->sock);
879 }
880 }
881
/*
 * Write req->uvbuf on 'sock'; tcp_send_cb() runs when the write
 * completes.  Must be called on the socket's own thread.  On
 * immediate failure the send callback is invoked (with a NULL
 * handle) and the request is released before returning the error.
 */
static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	int r;

	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcpsocket);

	isc_nmhandle_ref(req->handle);
	r = uv_write(&req->uv_req.write, &sock->uv_handle.stream, &req->uvbuf,
		     1, tcp_send_cb);
	if (r < 0) {
		/*
		 * NOTE(review): the handle reference taken above is not
		 * released on this path - confirm the intended
		 * ref-count semantics.
		 */
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		req->cb.send(NULL, isc__nm_uverr2result(r), req->cbarg);
		isc__nm_uvreq_put(&req, sock);
		return (isc__nm_uverr2result(r));
	}

	return (ISC_R_SUCCESS);
}
901
902 static void
tcp_close_cb(uv_handle_t * uvhandle)903 tcp_close_cb(uv_handle_t *uvhandle) {
904 isc_nmsocket_t *sock = uv_handle_get_data(uvhandle);
905
906 REQUIRE(VALID_NMSOCK(sock));
907
908 isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
909 atomic_store(&sock->closed, true);
910 isc__nmsocket_prep_destroy(sock);
911 }
912
913 static void
timer_close_cb(uv_handle_t * uvhandle)914 timer_close_cb(uv_handle_t *uvhandle) {
915 isc_nmsocket_t *sock = uv_handle_get_data(uvhandle);
916
917 REQUIRE(VALID_NMSOCK(sock));
918
919 isc_nmsocket_detach(&sock->server);
920 uv_close(&sock->uv_handle.handle, tcp_close_cb);
921 }
922
923 static void
tcp_close_direct(isc_nmsocket_t * sock)924 tcp_close_direct(isc_nmsocket_t *sock) {
925 REQUIRE(VALID_NMSOCK(sock));
926 REQUIRE(sock->tid == isc_nm_tid());
927 REQUIRE(sock->type == isc_nm_tcpsocket);
928 if (sock->quota != NULL) {
929 isc_quota_detach(&sock->quota);
930 }
931 if (sock->timer_initialized) {
932 sock->timer_initialized = false;
933 uv_timer_stop(&sock->timer);
934 uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
935 } else {
936 if (sock->server != NULL) {
937 isc_nmsocket_detach(&sock->server);
938 }
939 uv_close(&sock->uv_handle.handle, tcp_close_cb);
940 }
941 }
942
943 void
isc__nm_tcp_close(isc_nmsocket_t * sock)944 isc__nm_tcp_close(isc_nmsocket_t *sock) {
945 REQUIRE(VALID_NMSOCK(sock));
946 REQUIRE(sock->type == isc_nm_tcpsocket);
947
948 if (sock->tid == isc_nm_tid()) {
949 tcp_close_direct(sock);
950 } else {
951 /*
952 * We need to create an event and pass it using async channel
953 */
954 isc__netievent_tcpclose_t *ievent =
955 isc__nm_get_ievent(sock->mgr, netievent_tcpclose);
956
957 ievent->sock = sock;
958 isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
959 (isc__netievent_t *)ievent);
960 }
961 }
962
963 void
isc__nm_async_tcpclose(isc__networker_t * worker,isc__netievent_t * ev0)964 isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
965 isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *)ev0;
966
967 REQUIRE(worker->id == ievent->sock->tid);
968
969 tcp_close_direct(ievent->sock);
970 }
971
972 void
isc__nm_tcp_shutdown(isc_nmsocket_t * sock)973 isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
974 REQUIRE(VALID_NMSOCK(sock));
975
976 if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
977 sock->rcb.recv != NULL)
978 {
979 sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
980 }
981 }
982