/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <libgen.h>
#include <unistd.h>
#include <uv.h>

#include <isc/atomic.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
#include <isc/stdtime.h>
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
#include "uv-compat.h"

static atomic_uint_fast32_t last_tcpquota_log = ATOMIC_VAR_INIT(0);

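/*
 * Rate-limit TCP quota log messages to at most one per second:
 * isc_stdtime_t has one-second resolution, and the atomic exchange
 * guarantees that only one caller logs for any given timestamp.
 */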
static bool
can_log_tcp_quota(void) {
	isc_stdtime_t now, last;

	isc_stdtime_get(&now);
	last = atomic_exchange_relaxed(&last_tcpquota_log, now);
	if (now != last) {
		return (true);
	}

	return (false);
}

static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);

static void
tcp_close_direct(isc_nmsocket_t *sock);

static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);

static void
tcp_connect_cb(uv_connect_t *uvreq, int status);

static void
tcp_connection_cb(uv_stream_t *server, int status);

static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);

static void
tcp_close_cb(uv_handle_t *uvhandle);

static void
tcp_listenclose_cb(uv_handle_t *handle);

static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);

static void
quota_accept_cb(isc_quota_t *quota, void *sock0);

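/*
 * Connect the socket to the peer address in 'req', binding first to
 * the local address if one was supplied.  Must run on a network
 * manager thread.
 */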
static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	isc__networker_t *worker = NULL;
	int r;

	REQUIRE(isc__nm_in_netthread());

	worker = &sock->mgr->workers[isc_nm_tid()];

	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		return (r);
	}

	if (req->local.length != 0) {
		r = uv_tcp_bind(&sock->uv_handle.tcp, &req->local.type.sa, 0);
		if (r != 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			tcp_close_direct(sock);
			return (r);
		}
	}
	uv_handle_set_data(&sock->uv_handle.handle, sock);
	r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
			   &req->peer.type.sa, tcp_connect_cb);
	return (r);
}

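/*
 * Handle 'tcpconnect' async event - attempt the connection; on an
 * immediate failure we have to run the connect callback ourselves.
 */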
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpconnect_t *ievent =
		(isc__netievent_tcpconnect_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *req = ievent->req;
	int r;

	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(worker->id == ievent->req->sock->mgr->workers[isc_nm_tid()].id);

	r = tcp_connect_direct(sock, req);
	if (r != 0) {
		/* We need to issue callbacks ourselves */
		tcp_connect_cb(&req->uv_req.connect, r);
	}
}

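/*
 * uv_tcp_connect() callback: on success, record the peer address and
 * pass a fresh handle to the caller's connect callback; otherwise
 * report the translated error.
 */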
static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
	isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)uvreq->data;
	isc_nmsocket_t *sock = NULL;
	sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);

	REQUIRE(VALID_UVREQ(req));

	if (status == 0) {
		isc_result_t result;
		isc_nmhandle_t *handle = NULL;
		struct sockaddr_storage ss;

		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
		uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
				   &(int){ sizeof(ss) });
		result = isc_sockaddr_fromsockaddr(&sock->peer,
						   (struct sockaddr *)&ss);
		RUNTIME_CHECK(result == ISC_R_SUCCESS);

		handle = isc__nmhandle_get(sock, NULL, NULL);
		req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
	} else {
		/*
		 * TODO:
		 * Handle the connect error properly and free the socket.
		 */
		isc__nm_incstats(sock->mgr,
				 sock->statsindex[STATID_CONNECTFAIL]);
		req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
	}

	isc__nm_uvreq_put(&req, sock);
}

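/*
 * Create a TCP listener socket and dispatch a 'tcplisten' event to a
 * worker thread.  When called from outside a network thread, wait
 * until listening has either started or failed before returning.
 */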
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
		 isc_nm_accept_cb_t accept_cb, void *accept_cbarg,
		 size_t extrahandlesize, int backlog, isc_quota_t *quota,
		 isc_nmsocket_t **sockp) {
	isc_nmsocket_t *nsock = NULL;
	isc__netievent_tcplisten_t *ievent = NULL;

	REQUIRE(VALID_NM(mgr));

	nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
	isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface);
	nsock->accept_cb.accept = accept_cb;
	nsock->accept_cbarg = accept_cbarg;
	nsock->extrahandlesize = extrahandlesize;
	nsock->backlog = backlog;
	nsock->result = ISC_R_SUCCESS;
	if (quota != NULL) {
		/*
		 * We don't attach to quota, just assign - to avoid
		 * increasing quota unnecessarily.
		 */
		nsock->pquota = quota;
	}
	isc_quota_cb_init(&nsock->quotacb, quota_accept_cb, nsock);

	ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
	ievent->sock = nsock;
	if (isc__nm_in_netthread()) {
		nsock->tid = isc_nm_tid();
		isc__nm_async_tcplisten(&mgr->workers[nsock->tid],
					(isc__netievent_t *)ievent);
		isc__nm_put_ievent(mgr, ievent);
	} else {
		nsock->tid = isc_random_uniform(mgr->nworkers);
		isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
				       (isc__netievent_t *)ievent);

		LOCK(&nsock->lock);
		while (!atomic_load(&nsock->listening) &&
		       !atomic_load(&nsock->listen_error)) {
			WAIT(&nsock->cond, &nsock->lock);
		}
		UNLOCK(&nsock->lock);
	}

	if (nsock->result == ISC_R_SUCCESS) {
		*sockp = nsock;
		return (ISC_R_SUCCESS);
	} else {
		isc_result_t result = nsock->result;
		isc_nmsocket_detach(&nsock);
		return (result);
	}
}

/*
 * For multi-threaded TCP listening, we create a single socket,
 * bind to it, and start listening. On an incoming connection we accept
 * it, and then pass the accepted socket using the uv_export/uv_import
 * mechanism to a child thread.
 */
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcplisten_t *ievent = (isc__netievent_tcplisten_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	struct sockaddr_storage sname;
	int r, flags = 0, snamelen = sizeof(sname);

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->type == isc_nm_tcplistener);

	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
	if (r != 0) {
		/* It was never opened */
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		atomic_store(&sock->closed, true);
		sock->result = isc__nm_uverr2result(r);
		atomic_store(&sock->listen_error, true);
		goto done;
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sock->iface->addr.type.sa.sa_family == AF_INET6) {
		flags = UV_TCP_IPV6ONLY;
	}

	r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa,
			flags);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
		sock->result = isc__nm_uverr2result(r);
		atomic_store(&sock->listen_error, true);
		goto done;
	}

	/*
	 * By doing this now, we can find out immediately whether bind()
	 * failed, and quit if so. (uv_bind() uses a delayed error,
	 * initially returning success even if bind() fails, and this
	 * could cause a deadlock later if we didn't check first.)
	 */
	r = uv_tcp_getsockname(&sock->uv_handle.tcp, (struct sockaddr *)&sname,
			       &snamelen);
	if (r != 0) {
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
		sock->result = isc__nm_uverr2result(r);
		atomic_store(&sock->listen_error, true);
		goto done;
	}

	/*
	 * The callback will run in the same thread uv_listen() was called
	 * from, so a race with tcp_connection_cb() isn't possible.
	 */
	r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
		      tcp_connection_cb);
	if (r != 0) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "uv_listen failed: %s",
			      isc_result_totext(isc__nm_uverr2result(r)));
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
		sock->result = isc__nm_uverr2result(r);
		atomic_store(&sock->listen_error, true);
		goto done;
	}

	uv_handle_set_data(&sock->uv_handle.handle, sock);

	atomic_store(&sock->listening, true);

done:
	LOCK(&sock->lock);
	SIGNAL(&sock->cond);
	UNLOCK(&sock->lock);
	return;
}

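/*
 * uv connection callback for listening sockets: accept the pending
 * connection, logging any failure (rate-limited for quota errors).
 */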
static void
tcp_connection_cb(uv_stream_t *server, int status) {
	isc_nmsocket_t *psock = uv_handle_get_data((uv_handle_t *)server);
	isc_result_t result;

	UNUSED(status);

	result = accept_connection(psock, NULL);
	if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
		if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
		    can_log_tcp_quota()) {
			isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
				      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
				      "TCP connection failed: %s",
				      isc_result_totext(result));
		}
	}
}

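/*
 * Handle 'tcpchildaccept' async event - import the socket exported by
 * accept_connection(), record its peer and local addresses, and hand
 * a new handle to the listener's accept callback.
 */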
void
isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpchildaccept_t *ievent =
		(isc__netievent_tcpchildaccept_t *)ev0;
	isc_nmsocket_t *ssock = ievent->sock;
	isc_nmsocket_t *csock = NULL;
	isc_nmhandle_t *handle;
	isc_result_t result;
	struct sockaddr_storage ss;
	isc_sockaddr_t local;
	int r;

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(ssock->type == isc_nm_tcplistener);

	csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
	csock->tid = isc_nm_tid();
	csock->extrahandlesize = ssock->extrahandlesize;

	csock->quota = ievent->quota;
	ievent->quota = NULL;

	worker = &ssock->mgr->workers[isc_nm_tid()];
	uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);

	r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo);
	if (r != 0) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "uv_import failed: %s",
			      isc_result_totext(isc__nm_uverr2result(r)));
		result = isc__nm_uverr2result(r);
		goto error;
	}

	r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto error;
	}

	result = isc_sockaddr_fromsockaddr(&csock->peer,
					   (struct sockaddr *)&ss);
	if (result != ISC_R_SUCCESS) {
		goto error;
	}

	r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto error;
	}

	result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
	if (result != ISC_R_SUCCESS) {
		goto error;
	}

	isc_nmsocket_attach(ssock, &csock->server);

	handle = isc__nmhandle_get(csock, NULL, &local);

	INSIST(ssock->accept_cb.accept != NULL);
	csock->read_timeout = ssock->mgr->init;
	ssock->accept_cb.accept(handle, ISC_R_SUCCESS, ssock->accept_cbarg);
	isc_nmsocket_detach(&csock);
	return;

error:
	/*
	 * Detach the quota early to make room for other connections;
	 * otherwise it'd be detached later asynchronously, and clog
	 * the quota unnecessarily.
	 */
	if (csock->quota != NULL) {
		isc_quota_detach(&csock->quota);
	}
	isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
		      ISC_LOG_ERROR, "Accepting TCP connection failed: %s",
		      isc_result_totext(result));

	/*
	 * Detach the socket properly to make sure uv_close() is called.
	 */
	isc_nmsocket_detach(&csock);
}

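/*
 * Stop listening: enqueue a 'tcpstop' event for the listener's own
 * thread.  Must not be called from a network thread.
 */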
void
isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
	isc__netievent_tcpstop_t *ievent = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(!isc__nm_in_netthread());

	ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
	isc_nmsocket_attach(sock, &ievent->sock);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

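/*
 * Handle 'tcpstop' async event - close the listening socket once the
 * network manager interlock can be acquired.
 */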
void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	UNUSED(worker);

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcplistener);

	/*
	 * If network manager is interlocked, re-enqueue the event for later.
	 */
	if (!isc__nm_acquire_interlocked(sock->mgr)) {
		isc__netievent_tcpstop_t *event = NULL;

		event = isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
		event->sock = sock;
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)event);
	} else {
		uv_close((uv_handle_t *)&sock->uv_handle.tcp,
			 tcp_listenclose_cb);
		isc__nm_drop_interlocked(sock->mgr);
	}
}

/*
 * This callback is used for closing listening sockets.
 */
static void
tcp_listenclose_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);

	LOCK(&sock->lock);
	atomic_store(&sock->closed, true);
	atomic_store(&sock->listening, false);
	sock->pquota = NULL;
	UNLOCK(&sock->lock);

	isc_nmsocket_detach(&sock);
}

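/*
 * Read timeout callback: if the socket is busy processing, restart
 * the timer; otherwise stop reading, release any quota, and signal
 * the timeout to the read callback with a NULL region.
 */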
static void
readtimeout_cb(uv_timer_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	/*
	 * Socket is actively processing something, so restart the timer
	 * and return.
	 */
	if (atomic_load(&sock->processing)) {
		uv_timer_start(handle, readtimeout_cb, sock->read_timeout, 0);
		return;
	}

	/*
	 * Timeout; stop reading and process whatever we have.
	 */
	uv_read_stop(&sock->uv_handle.stream);
	if (sock->quota) {
		isc_quota_detach(&sock->quota);
	}
	sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
}

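/*
 * Install the read callback and start reading, either directly (when
 * already on the socket's thread) or via a 'tcpstartread' event.
 */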
isc_result_t
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	isc_nmsocket_t *sock = NULL;
	isc__netievent_startread_t *ievent = NULL;

	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	sock = handle->sock;
	sock->rcb.recv = cb;
	sock->rcbarg = cbarg;

	ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
	ievent->sock = sock;

	if (sock->tid == isc_nm_tid()) {
		isc__nm_async_tcp_startread(&sock->mgr->workers[sock->tid],
					    (isc__netievent_t *)ievent);
		isc__nm_put_ievent(sock->mgr, ievent);
	} else {
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}

	return (ISC_R_SUCCESS);
}

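/*
 * Handle 'tcpstartread' async event - arm the read timer (if a
 * timeout is configured) and start reading from the stream.
 */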
void
isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_startread_t *ievent = (isc__netievent_startread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	int r;

	REQUIRE(worker->id == isc_nm_tid());
	if (sock->read_timeout != 0) {
		if (!sock->timer_initialized) {
			uv_timer_init(&worker->loop, &sock->timer);
			uv_handle_set_data((uv_handle_t *)&sock->timer, sock);
			sock->timer_initialized = true;
		}
		uv_timer_start(&sock->timer, readtimeout_cb, sock->read_timeout,
			       0);
	}

	r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb, read_cb);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
	}
}

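/*
 * Pause reading on the socket; a no-op if reading is already paused.
 */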
isc_result_t
isc__nm_tcp_pauseread(isc_nmsocket_t *sock) {
	isc__netievent_pauseread_t *ievent = NULL;

	REQUIRE(VALID_NMSOCK(sock));

	if (atomic_load(&sock->readpaused)) {
		return (ISC_R_SUCCESS);
	}

	atomic_store(&sock->readpaused, true);
	ievent = isc__nm_get_ievent(sock->mgr, netievent_tcppauseread);
	ievent->sock = sock;

	if (sock->tid == isc_nm_tid()) {
		isc__nm_async_tcp_pauseread(&sock->mgr->workers[sock->tid],
					    (isc__netievent_t *)ievent);
		isc__nm_put_ievent(sock->mgr, ievent);
	} else {
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}

	return (ISC_R_SUCCESS);
}

void
isc__nm_async_tcp_pauseread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_pauseread_t *ievent = (isc__netievent_pauseread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(worker->id == isc_nm_tid());

	if (sock->timer_initialized) {
		uv_timer_stop(&sock->timer);
	}
	uv_read_stop(&sock->uv_handle.stream);
}

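/*
 * Resume reading previously paused with isc__nm_tcp_pauseread();
 * a no-op if reading is not paused.
 */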
isc_result_t
isc__nm_tcp_resumeread(isc_nmsocket_t *sock) {
	isc__netievent_startread_t *ievent = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->rcb.recv != NULL);

	if (!atomic_load(&sock->readpaused)) {
		return (ISC_R_SUCCESS);
	}

	atomic_store(&sock->readpaused, false);

	ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
	ievent->sock = sock;

	if (sock->tid == isc_nm_tid()) {
		isc__nm_async_tcp_startread(&sock->mgr->workers[sock->tid],
					    (isc__netievent_t *)ievent);
		isc__nm_put_ievent(sock->mgr, ievent);
	} else {
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}

	return (ISC_R_SUCCESS);
}

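/*
 * uv read callback: on success, pass the received region to the read
 * callback and refresh the read timer; on error, report the failure
 * with a NULL region.
 */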
static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(buf != NULL);

	if (nread >= 0) {
		isc_region_t region = { .base = (unsigned char *)buf->base,
					.length = nread };

		if (sock->rcb.recv != NULL) {
			sock->rcb.recv(sock->tcphandle, &region, sock->rcbarg);
		}

		sock->read_timeout = (atomic_load(&sock->keepalive)
					      ? sock->mgr->keepalive
					      : sock->mgr->idle);

		if (sock->timer_initialized && sock->read_timeout != 0) {
			/* The timer will be updated */
			uv_timer_start(&sock->timer, readtimeout_cb,
				       sock->read_timeout, 0);
		}

		isc__nm_free_uvbuf(sock, buf);
		return;
	}

	isc__nm_free_uvbuf(sock, buf);

	/*
	 * This might happen if the inner socket is closing.  It means that
	 * it's detached, so the socket will be closed.
	 */
	if (sock->rcb.recv != NULL) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
		sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
	}

	/*
	 * We don't need to clean up now; the socket will be closed and
	 * resources and quota reclaimed when handle is freed in
	 * isc__nm_tcp_close().
	 */
}

static void
quota_accept_cb(isc_quota_t *quota, void *sock0) {
	isc_nmsocket_t *sock = (isc_nmsocket_t *)sock0;
	isc__netievent_tcpaccept_t *ievent = NULL;

	REQUIRE(VALID_NMSOCK(sock));

	/*
	 * Create a tcpaccept event and pass it using the async channel.
	 */
	ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpaccept);
	ievent->sock = sock;
	ievent->quota = quota;
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

/*
 * This is called after we get a quota_accept_cb() callback.
 */
void
isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc_result_t result;
	isc__netievent_tcpaccept_t *ievent = (isc__netievent_tcpaccept_t *)ev0;

	REQUIRE(worker->id == ievent->sock->tid);

	result = accept_connection(ievent->sock, ievent->quota);
	if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
		if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
		    can_log_tcp_quota()) {
			isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
				      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
				      "TCP connection failed: %s",
				      isc_result_totext(result));
		}
	}

	/*
	 * The socket was attached just before we called isc_quota_attach_cb().
	 */
	isc_nmsocket_detach(&ievent->sock);
}

/*
 * Close callback for uv_tcp_t structures created in accept_connection().
 */
static void
free_uvtcpt(uv_handle_t *uvs) {
	isc_mem_t *mctx = (isc_mem_t *)uv_handle_get_data(uvs);
	isc_mem_putanddetach(&mctx, uvs, sizeof(uv_tcp_t));
}

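/*
 * Accept a pending connection on 'ssock'.  If a quota is configured
 * and no slot is available, wait for one via quota_accept_cb();
 * otherwise accept the socket and export it to a randomly chosen
 * worker with a 'tcpchildaccept' event.
 */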
static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
	isc_result_t result;
	isc__netievent_tcpchildaccept_t *event = NULL;
	isc__networker_t *worker = NULL;
	uv_tcp_t *uvstream = NULL;
	isc_mem_t *mctx = NULL;
	int r, w;

	REQUIRE(VALID_NMSOCK(ssock));
	REQUIRE(ssock->tid == isc_nm_tid());

	if (!atomic_load_relaxed(&ssock->active) ||
	    atomic_load_relaxed(&ssock->mgr->closing))
	{
		/* We're closing, bail */
		if (quota != NULL) {
			isc_quota_detach(&quota);
		}
		return (ISC_R_CANCELED);
	}

	/* We can be called directly or as a callback from quota */
	if (ssock->pquota != NULL && quota == NULL) {
		/*
		 * We need to attach to ssock, because it might be queued
		 * waiting for a TCP quota slot.  If so, then we'll detach it
		 * later when the connection is accepted. (XXX: This may be
		 * suboptimal, it might be better not to attach unless
		 * we need to - but we risk a race then.)
		 */
		isc_nmsocket_t *tsock = NULL;
		isc_nmsocket_attach(ssock, &tsock);
		result = isc_quota_attach_cb(ssock->pquota, &quota,
					     &ssock->quotacb);
		if (result == ISC_R_QUOTA) {
			isc__nm_incstats(ssock->mgr,
					 ssock->statsindex[STATID_ACCEPTFAIL]);
			return (result);
		}

		/*
		 * We're under quota, so there's no need to wait;
		 * detach the socket.
		 */
		isc_nmsocket_detach(&tsock);
	}

	isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]);

	worker = &ssock->mgr->workers[isc_nm_tid()];
	uvstream = isc_mem_get(ssock->mgr->mctx, sizeof(uv_tcp_t));

	isc_mem_attach(ssock->mgr->mctx, &mctx);
	uv_handle_set_data((uv_handle_t *)uvstream, mctx);
	mctx = NULL; /* Detached later in free_uvtcpt() */

	uv_tcp_init(&worker->loop, uvstream);

	r = uv_accept(&ssock->uv_handle.stream, (uv_stream_t *)uvstream);
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		uv_close((uv_handle_t *)uvstream, free_uvtcpt);
		/* The quota is NULL when no accept quota is configured */
		if (quota != NULL) {
			isc_quota_detach(&quota);
		}
		return (result);
	}

	/* We have an accepted TCP socket, pass it to a random worker */
	w = isc_random_uniform(ssock->mgr->nworkers);
	event = isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept);
	event->sock = ssock;
	event->quota = quota;

	r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo);
	RUNTIME_CHECK(r == 0);

	uv_close((uv_handle_t *)uvstream, free_uvtcpt);

	if (w == isc_nm_tid()) {
		isc__nm_async_tcpchildaccept(&ssock->mgr->workers[w],
					     (isc__netievent_t *)event);
		isc__nm_put_ievent(ssock->mgr, event);
	} else {
		isc__nm_enqueue_ievent(&ssock->mgr->workers[w],
				       (isc__netievent_t *)event);
	}

	return (ISC_R_SUCCESS);
}

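/*
 * Send 'region' on the handle's socket: directly when on the socket's
 * thread, otherwise via a 'tcpsend' event.
 */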
isc_result_t
isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
		 void *cbarg) {
	isc_nmsocket_t *sock = handle->sock;
	isc__netievent_tcpsend_t *ievent = NULL;
	isc__nm_uvreq_t *uvreq = NULL;

	REQUIRE(sock->type == isc_nm_tcpsocket);

	uvreq = isc__nm_uvreq_get(sock->mgr, sock);
	uvreq->uvbuf.base = (char *)region->base;
	uvreq->uvbuf.len = region->length;
	uvreq->handle = handle;
	isc_nmhandle_ref(uvreq->handle);
	uvreq->cb.send = cb;
	uvreq->cbarg = cbarg;

	if (sock->tid == isc_nm_tid()) {
		/*
		 * If we're in the same thread as the socket we can send
		 * the data directly.
		 */
		return (tcp_send_direct(sock, uvreq));
	} else {
		/*
		 * We need to create an event and pass it using the async
		 * channel.
		 */
		ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpsend);
		ievent->sock = sock;
		ievent->req = uvreq;
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
		return (ISC_R_SUCCESS);
	}
}

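/*
 * uv_write() callback: report the result to the send callback and
 * release the request.
 */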
static void
tcp_send_cb(uv_write_t *req, int status) {
	isc_result_t result = ISC_R_SUCCESS;
	isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;

	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	if (status < 0) {
		result = isc__nm_uverr2result(status);
		isc__nm_incstats(uvreq->sock->mgr,
				 uvreq->sock->statsindex[STATID_SENDFAIL]);
	}

	uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
	isc_nmhandle_unref(uvreq->handle);
	isc__nm_uvreq_put(&uvreq, uvreq->handle->sock);
}

/*
 * Handle 'tcpsend' async event - send a packet on the socket.
 */
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc_result_t result;
	isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *)ev0;

	REQUIRE(worker->id == ievent->sock->tid);

	if (!atomic_load(&ievent->sock->active)) {
		return;
	}

	result = tcp_send_direct(ievent->sock, ievent->req);
	if (result != ISC_R_SUCCESS) {
		ievent->req->cb.send(ievent->req->handle, result,
				     ievent->req->cbarg);
		isc__nm_uvreq_put(&ievent->req, ievent->req->handle->sock);
	}
}

static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	int r;

	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcpsocket);

	isc_nmhandle_ref(req->handle);
	r = uv_write(&req->uv_req.write, &sock->uv_handle.stream, &req->uvbuf,
		     1, tcp_send_cb);
	if (r < 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		req->cb.send(NULL, isc__nm_uverr2result(r), req->cbarg);
		isc__nm_uvreq_put(&req, sock);
		return (isc__nm_uverr2result(r));
	}

	return (ISC_R_SUCCESS);
}

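/*
 * Close callback for connected TCP sockets: mark the socket closed
 * and schedule its destruction.
 */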
static void
tcp_close_cb(uv_handle_t *uvhandle) {
	isc_nmsocket_t *sock = uv_handle_get_data(uvhandle);

	REQUIRE(VALID_NMSOCK(sock));

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
	atomic_store(&sock->closed, true);
	isc__nmsocket_prep_destroy(sock);
}

static void
timer_close_cb(uv_handle_t *uvhandle) {
	isc_nmsocket_t *sock = uv_handle_get_data(uvhandle);

	REQUIRE(VALID_NMSOCK(sock));

	isc_nmsocket_detach(&sock->server);
	uv_close(&sock->uv_handle.handle, tcp_close_cb);
}

static void
tcp_close_direct(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcpsocket);

	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}
	if (sock->timer_initialized) {
		sock->timer_initialized = false;
		uv_timer_stop(&sock->timer);
		uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
	} else {
		if (sock->server != NULL) {
			isc_nmsocket_detach(&sock->server);
		}
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
	}
}

void
isc__nm_tcp_close(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcpsocket);

	if (sock->tid == isc_nm_tid()) {
		tcp_close_direct(sock);
	} else {
		/*
		 * We need to create an event and pass it using the async
		 * channel.
		 */
		isc__netievent_tcpclose_t *ievent =
			isc__nm_get_ievent(sock->mgr, netievent_tcpclose);

		ievent->sock = sock;
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

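/*
 * Handle 'tcpclose' async event - close the socket on its own thread.
 */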
void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *)ev0;

	REQUIRE(worker->id == ievent->sock->tid);

	tcp_close_direct(ievent->sock);
}

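/*
 * Shut down reading on a connected TCP socket by reporting a NULL
 * region to any installed read callback, so the owner can clean up.
 */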
void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
	    sock->rcb.recv != NULL)
	{
		sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
	}
}