/*  Copyright (C) 2014-2020 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
 *  SPDX-License-Identifier: GPL-3.0-or-later
 */

#include "daemon/io.h"

#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <libknot/errcode.h>
#include <string.h>
#include <sys/resource.h>

#if ENABLE_XDP
	#include <libknot/xdp/eth.h>
	#include <libknot/xdp/xdp.h>
	#include <net/if.h>
#endif

#include "daemon/network.h"
#include "daemon/worker.h"
#include "daemon/tls.h"
#include "daemon/http.h"
#include "daemon/session.h"
#include "contrib/cleanup.h"
#include "lib/utils.h"

#define negotiate_bufsize(func, handle, bufsize_want) do { \
	int bufsize = 0; (func)((handle), &bufsize); \
	if (bufsize < (bufsize_want)) { \
		bufsize = (bufsize_want); \
		(func)((handle), &bufsize); \
	} \
} while (0)

static void check_bufsize(uv_handle_t* handle)
{
	return; /* TODO: resurrect after https://github.com/libuv/libuv/issues/419 */
	/* We want to buffer at least N waves in advance.
	 * This is magic presuming we can pull in a whole recvmmsg width in one wave.
	 * Linux will double the bufsize that we request.
	 */
	const int bufsize_want = 2 * sizeof(((struct worker_ctx *)NULL)->wire_buf);
	negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
	negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
}

#undef negotiate_bufsize

static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
	/* UDP sessions use the worker buffer for wire data,
	 * TCP sessions use the session buffer for wire data
	 * (see session_set_handle()).
	 * TLS sessions use a buffer from the TLS context.
	 * The content of the worker buffer is
	 * guaranteed to be unchanged only for the duration of
	 * udp_read() and tcp_read().
	 */
	struct session *s = handle->data;
	if (!session_flags(s)->has_tls) {
		buf->base = (char *) session_wirebuf_get_free_start(s);
		buf->len = session_wirebuf_get_free_size(s);
	} else {
		struct tls_common_ctx *ctx = session_tls_get_common_ctx(s);
		buf->base = (char *) ctx->recv_buf;
		buf->len = sizeof(ctx->recv_buf);
	}
}

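/* Callback for an incoming UDP datagram.  Ignores empty reads and, for
 * outgoing sessions, datagrams arriving from an address other than the
 * expected peer; otherwise it feeds the data into the session wire buffer
 * and lets the session process it. */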
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
	const struct sockaddr *addr, unsigned flags)
{
	struct session *s = handle->data;
	if (session_flags(s)->closing || nread <= 0 || addr->sa_family == AF_UNSPEC)
		return;

	if (session_flags(s)->outgoing) {
		const struct sockaddr *peer = session_get_peer(s);
		if (kr_fails_assert(peer->sa_family != AF_UNSPEC))
			return;
		if (kr_sockaddr_cmp(peer, addr) != 0) {
			kr_log_debug(IO, "<= ignoring UDP from unexpected address '%s'\n",
					kr_straddr(addr));
			return;
		}
	}
	ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base,
						   nread);
	kr_assert(consumed == nread);
	session_wirebuf_process(s, addr);
	session_wirebuf_discard(s);
	mp_flush(the_worker->pkt_pool.ctx);
}

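/* Map an address family to the (level, name) pair of the socket option
 * that allows binding to a non-local address; returns kr_error(ENOTSUP)
 * when the platform offers no such option for the family. */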
static int family_to_freebind_option(sa_family_t sa_family, int *level, int *name)
{
	switch (sa_family) {
	case AF_INET:
		*level = IPPROTO_IP;
#if defined(IP_FREEBIND)
		*name = IP_FREEBIND;
#elif defined(IP_BINDANY)
		*name = IP_BINDANY;
#else
		return kr_error(ENOTSUP);
#endif
		break;
	case AF_INET6:
#if defined(IP_FREEBIND)
		*level = IPPROTO_IP;
		*name = IP_FREEBIND;
#elif defined(IPV6_BINDANY)
		*level = IPPROTO_IPV6;
		*name = IPV6_BINDANY;
#else
		return kr_error(ENOTSUP);
#endif
		break;
	default:
		return kr_error(ENOTSUP);
	}
	return kr_ok();
}

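/* Create a socket of the given type and bind it to addr: sets SO_REUSEADDR
 * (plus SO_REUSEPORT(_LB) where available), optionally the freebind option,
 * and for IPv4 UDP on Linux disables Path MTU discovery.  Returns the new
 * file descriptor on success, kr_error() otherwise. */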
int io_bind(const struct sockaddr *addr, int type, const endpoint_flags_t *flags)
{
	const int fd = socket(addr->sa_family, type, 0);
	if (fd < 0) return kr_error(errno);

	int yes = 1;
	if (addr->sa_family == AF_INET || addr->sa_family == AF_INET6) {
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
			return kr_error(errno);

#ifdef SO_REUSEPORT_LB
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, &yes, sizeof(yes)))
			return kr_error(errno);
#elif defined(SO_REUSEPORT) && defined(__linux__) /* different meaning on (Free)BSD */
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
			return kr_error(errno);
#endif

#ifdef IPV6_V6ONLY
		if (addr->sa_family == AF_INET6
		    && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes)))
			return kr_error(errno);
#endif
		if (flags != NULL && flags->freebind) {
			int optlevel;
			int optname;
			int ret = family_to_freebind_option(addr->sa_family, &optlevel, &optname);
			if (ret) return kr_error(ret);
			if (setsockopt(fd, optlevel, optname, &yes, sizeof(yes)))
				return kr_error(errno);
		}

		/* Linux 3.15 has IP_PMTUDISC_OMIT which makes sockets
		 * ignore PMTU information and send packets with DF=0.
		 * This mitigates DNS fragmentation attacks by preventing
		 * forged PMTU information.  FreeBSD already has the same semantics
		 * without setting the option.
		 * https://gitlab.nic.cz/knot/knot-dns/-/issues/640
		 */
#if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
		int omit = IP_PMTUDISC_OMIT;
		if (type == SOCK_DGRAM && addr->sa_family == AF_INET
		    && setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER, &omit, sizeof(omit))) {
			kr_log_error(IO,
				"failed to disable Path MTU discovery for %s UDP: %s\n",
				kr_straddr(addr), strerror(errno));
		}
#endif
	}

	if (bind(fd, addr, kr_sockaddr_len(addr)))
		return kr_error(errno);

	return fd;
}

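/* Wrap an already bound UDP file descriptor in a libuv handle,
 * create its session, and start reading. */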
int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd)
{
	if (!handle) {
		return kr_error(EINVAL);
	}
	int ret = uv_udp_init(loop, handle);
	if (ret) return ret;

	ret = uv_udp_open(handle, fd);
	if (ret) return ret;

	uv_handle_t *h = (uv_handle_t *)handle;
	check_bufsize(h);
	/* Handle is already created, just create context. */
	struct session *s = session_new(h, false, false);
	kr_require(s);
	session_flags(s)->outgoing = false;

	int socklen = sizeof(union inaddr);
	ret = uv_udp_getsockname(handle, session_get_sockname(s), &socklen);
	if (ret) {
		kr_log_error(IO, "ERROR: getsockname failed: %s\n", uv_strerror(ret));
		abort(); /* It might be nontrivial not to leak something here. */
	}

	return io_start_read(h);
}

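/* Timer callback for TCP sessions: finalizes tasks that ran out of time,
 * fails whatever still sits in the waiting list, and closes the connection
 * once it has been idle for longer than the configured timeout. */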
void tcp_timeout_trigger(uv_timer_t *timer)
{
	struct session *s = timer->data;

	if (kr_fails_assert(!session_flags(s)->closing))
		return;

	if (!session_tasklist_is_empty(s)) {
		int finalized = session_tasklist_finalize_expired(s);
		the_worker->stats.timeout += finalized;
		/* session_tasklist_finalize_expired() may call worker_task_finalize().
		 * If the session is a source session and there were IO errors,
		 * worker_task_finalize() can finalize all tasks and close the session. */
		if (session_flags(s)->closing) {
			return;
		}
	}
	if (!session_tasklist_is_empty(s)) {
		uv_timer_stop(timer);
		session_timer_start(s, tcp_timeout_trigger,
				    KR_RESOLVE_TIME_LIMIT / 2,
				    KR_RESOLVE_TIME_LIMIT / 2);
	} else {
		/* Normally this should not happen,
		 * but better to check whether there is anything in this list. */
		while (!session_waitinglist_is_empty(s)) {
			struct qr_task *t = session_waitinglist_pop(s, false);
			worker_task_finalize(t, KR_STATE_FAIL);
			worker_task_unref(t);
			the_worker->stats.timeout += 1;
			if (session_flags(s)->closing) {
				return;
			}
		}
		const struct network *net = &the_worker->engine->net;
		uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
		uint64_t last_activity = session_last_activity(s);
		uint64_t idle_time = kr_now() - last_activity;
		if (idle_time < idle_in_timeout) {
			idle_in_timeout -= idle_time;
			uv_timer_stop(timer);
			session_timer_start(s, tcp_timeout_trigger,
					    idle_in_timeout, idle_in_timeout);
		} else {
			struct sockaddr *peer = session_get_peer(s);
			char *peer_str = kr_straddr(peer);
			kr_log_debug(IO, "=> closing connection to '%s'\n",
				       peer_str ? peer_str : "");
			if (session_flags(s)->outgoing) {
				worker_del_tcp_waiting(the_worker, peer);
				worker_del_tcp_connected(the_worker, peer);
			}
			session_close(s);
		}
	}
}

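/* Callback for data arriving on a TCP (possibly TLS and/or HTTP) stream.
 * Peels off the TLS and HTTP layers as configured for the session, then
 * hands the resulting DNS wire data to the session for processing. */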
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
	struct session *s = handle->data;
	if (kr_fails_assert(s && session_get_handle(s) == (uv_handle_t *)handle && handle->type == UV_TCP))
		return;

	if (session_flags(s)->closing) {
		return;
	}

	/* nread might be 0, which does not indicate an error or EOF.
	 * This is equivalent to EAGAIN or EWOULDBLOCK under read(2). */
	if (nread == 0) {
		return;
	}

	if (nread < 0 || !buf->base) {
		if (kr_log_is_debug(IO, NULL)) {
			struct sockaddr *peer = session_get_peer(s);
			char *peer_str = kr_straddr(peer);
			kr_log_debug(IO, "=> connection to '%s' closed by peer (%s)\n",
				       peer_str ? peer_str : "",
				       uv_strerror(nread));
		}
		worker_end_tcp(s);
		return;
	}

	ssize_t consumed = 0;
	const uint8_t *data = (const uint8_t *)buf->base;
	ssize_t data_len = nread;
	if (session_flags(s)->has_tls) {
		/* buf->base points to the start of the TLS receive buffer.
		   Decode the data into the free space of the session wire buffer. */
		consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread);
		if (consumed < 0) {
			if (kr_log_is_debug(IO, NULL)) {
				struct sockaddr *peer = session_get_peer(s);
				char *peer_str = kr_straddr(peer);
				kr_log_debug(IO, "=> connection to '%s': "
					       "error processing TLS data, close\n",
					       peer_str ? peer_str : "");
			}
			worker_end_tcp(s);
			return;
		} else if (consumed == 0) {
			return;
		}
		data = session_wirebuf_get_free_start(s);
		data_len = consumed;
	}
#if ENABLE_DOH2
	if (session_flags(s)->has_http) {
		consumed = http_process_input_data(s, data, data_len);
		if (consumed < 0) {
			if (kr_log_is_debug(IO, NULL)) {
				struct sockaddr *peer = session_get_peer(s);
				char *peer_str = kr_straddr(peer);
				kr_log_debug(IO, "=> connection to '%s': "
				       "error processing HTTP data, close\n",
				       peer_str ? peer_str : "");
			}
			worker_end_tcp(s);
			return;
		} else if (consumed == 0) {
			return;
		}
		data = session_wirebuf_get_free_start(s);
		data_len = consumed;
	}
#endif

	/* data now points to the start of the free space in the session
	   wire buffer; simply increase the internal counter. */
	consumed = session_wirebuf_consume(s, data, data_len);
	kr_assert(consumed == data_len);

	int ret = session_wirebuf_process(s, session_get_peer(s));
	if (ret < 0) {
		/* An error has occurred, close the session. */
		worker_end_tcp(s);
	}
	session_wirebuf_compress(s);
	mp_flush(the_worker->pkt_pool.ctx);
}

#if ENABLE_DOH2
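/* Send callback used by the HTTP/2 layer: writes the buffer through the
 * session's server-side TLS context. */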
static ssize_t tls_send(const uint8_t *buf, const size_t len, struct session *session)
{
	struct tls_ctx *ctx = session_tls_get_server_ctx(session);
	ssize_t sent = 0;
	kr_require(ctx);

	sent = gnutls_record_send(ctx->c.tls_session, buf, len);
	if (sent < 0) {
		kr_log_debug(DOH, "gnutls_record_send failed: %s (%zd)\n",
			       gnutls_strerror_name(sent), sent);
		return kr_error(EIO);
	}
	return sent;
}
#endif

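/* Common part of the TCP/TLS/HTTPS accept callbacks: creates the client
 * handle and session, accepts the connection, records both endpoint
 * addresses, sets up the TLS context (with ALPN) and the HTTP context as
 * requested, then starts the idle timer and reading. */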
static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http)
{
	if (status != 0) {
		return;
	}

	struct worker_ctx *worker = the_worker;
	uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
	if (!client) {
		return;
	}
	int res = io_create(master->loop, (uv_handle_t *)client,
			    SOCK_STREAM, AF_UNSPEC, tls, http);
	if (res) {
		if (res == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
		}
		/* Since res isn't OK, the struct session wasn't allocated/borrowed.
		 * We must release the client handle only.
		 */
		free(client);
		return;
	}

	/* struct session was allocated/borrowed from the memory pool. */
	struct session *s = client->data;
	kr_require(session_flags(s)->outgoing == false);
	kr_require(session_flags(s)->has_tls == tls);

	if (uv_accept(master, (uv_stream_t *)client) != 0) {
		/* close session, close underlying uv handles and
		 * deallocate (or return to memory pool) memory. */
		session_close(s);
		return;
	}

	/* Get the peer's and our address.  We apparently get a specific sockname
	 * here even if we listened on a wildcard address. */
	struct sockaddr *sa = session_get_peer(s);
	int sa_len = sizeof(struct sockaddr_in6);
	int ret = uv_tcp_getpeername(client, sa, &sa_len);
	if (ret || sa->sa_family == AF_UNSPEC) {
		session_close(s);
		return;
	}
	sa = session_get_sockname(s);
	sa_len = sizeof(struct sockaddr_in6);
	ret = uv_tcp_getsockname(client, sa, &sa_len);
	if (ret || sa->sa_family == AF_UNSPEC) {
		session_close(s);
		return;
	}

	/* Set deadlines for the TCP connection and start reading.
	 * It will re-check every half of a request time limit if the connection
	 * is idle and should be terminated; this is an educated guess. */

	const struct network *net = &worker->engine->net;
	uint64_t idle_in_timeout = net->tcp.in_idle_timeout;

	uint64_t timeout = KR_CONN_RTT_MAX / 2;
	if (tls) {
		timeout += TLS_MAX_HANDSHAKE_TIME;
		struct tls_ctx *ctx = session_tls_get_server_ctx(s);
		if (!ctx) {
			ctx = tls_new(worker);
			if (!ctx) {
				session_close(s);
				return;
			}
			ctx->c.session = s;
			ctx->c.handshake_state = TLS_HS_IN_PROGRESS;

			/* Configure ALPN. */
			gnutls_datum_t proto;
			if (!http) {
				proto.data = (unsigned char *)"dot";
				proto.size = 3;
			} else {
				proto.data = (unsigned char *)"h2";
				proto.size = 2;
			}
			unsigned int flags = 0;
#if GNUTLS_VERSION_NUMBER >= 0x030500
			/* Mandatory ALPN means the protocol must match if and
			 * only if the ALPN extension is used by the client. */
			flags |= GNUTLS_ALPN_MANDATORY;
#endif
			ret = gnutls_alpn_set_protocols(ctx->c.tls_session, &proto, 1, flags);
			if (ret != GNUTLS_E_SUCCESS) {
				session_close(s);
				return;
			}

			session_tls_set_server_ctx(s, ctx);
		}
	}
#if ENABLE_DOH2
	if (http) {
		struct http_ctx *ctx = session_http_get_server_ctx(s);
		if (!ctx) {
			if (!tls) {  /* Plain HTTP is not supported. */
				session_close(s);
				return;
			}
			ctx = http_new(s, tls_send);
			if (!ctx) {
				session_close(s);
				return;
			}
			session_http_set_server_ctx(s, ctx);
		}
	}
#endif
	session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout);
	io_start_read((uv_handle_t *)client);
}

static void tcp_accept(uv_stream_t *master, int status)
{
	_tcp_accept(master, status, false, false);
}

static void tls_accept(uv_stream_t *master, int status)
{
	_tcp_accept(master, status, true, false);
}

#if ENABLE_DOH2
static void https_accept(uv_stream_t *master, int status)
{
	_tcp_accept(master, status, true, true);
}
#endif

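/* Wrap an already bound TCP file descriptor in a libuv handle, pick the
 * accept callback according to the TLS/HTTP flags, and tune the socket
 * (TCP_DEFER_ACCEPT, TCP_FASTOPEN) where the OS supports it. */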
int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bool has_tls, bool has_http)
{
	uv_connection_cb connection;

	if (!handle) {
		return kr_error(EINVAL);
	}
	int ret = uv_tcp_init(loop, handle);
	if (ret) return ret;

	if (has_tls && has_http) {
#if ENABLE_DOH2
		connection = https_accept;
#else
		kr_log_error(IO, "kresd was compiled without libnghttp2 support\n");
		return kr_error(ENOPROTOOPT);
#endif
	} else if (has_tls) {
		connection = tls_accept;
	} else if (has_http) {
		return kr_error(EPROTONOSUPPORT);
	} else {
		connection = tcp_accept;
	}

	ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
	if (ret) return ret;

	int val; (void)val;
	/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT
	val = KR_CONN_RTT_MAX/1000;
	if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val))) {
		kr_log_error(IO, "listen TCP (defer_accept): %s\n", strerror(errno));
	}
#endif

	ret = uv_listen((uv_stream_t *)handle, tcp_backlog, connection);
	if (ret != 0) {
		return ret;
	}

	/* TCP_FASTOPEN allows data to arrive with the initial SYN,
	 * saving one RTT when a client resumes a connection. */
#ifdef TCP_FASTOPEN
	#ifdef __linux__
	val = 16; /* Accept queue length hint */
	#else
	val = 1; /* On/off switch */
	#endif
	if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &val, sizeof(val))) {
		kr_log_error(IO, "listen TCP (fastopen): %s%s\n", strerror(errno),
			(errno != EPERM ? "" :
			 ".  This may be caused by TCP Fast Open being disabled in the OS."));
	}
#endif

	handle->data = NULL;
	return 0;
}


enum io_stream_mode {
	io_mode_text = 0,
	io_mode_binary = 1,
};

struct io_stream_data {
	enum io_stream_mode mode;
	size_t blen; ///< length of `buf`
	char *buf;  ///< growing buffer residing on `pool` (mp_append_*)
	knot_mm_t *pool;
};

/**
 * TTY control: process input and free() the buffer.
 *
 * For parameters see http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb
 *
 * - This is just basic read-eval-print; libedit is supported through kresc.
 */
void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
	auto_free char *commands = buf ? buf->base : NULL;

	/* Set output streams */
	FILE *out = stdout;
	uv_os_fd_t stream_fd = -1;
	struct args *args = the_args;
	struct io_stream_data *data = (struct io_stream_data*) stream->data;
	if (nread < 0 || uv_fileno((uv_handle_t *)stream, &stream_fd)) {
		mp_delete(data->pool->ctx);
		uv_close((uv_handle_t *)stream, (uv_close_cb) free);
		return;
	}
	if (nread <= 0) {
		return;
	}
	if (stream_fd != STDIN_FILENO) {
		uv_os_fd_t dup_fd = dup(stream_fd);
		if (dup_fd >= 0) {
			out = fdopen(dup_fd, "w");
		}
	}

	/** The current single command and the remaining command(s). */
	char *cmd, *cmd_next = NULL;
	bool incomplete_cmd = false;

	if (!(stream && commands && nread > 0)) {
		goto finish;
	}
	/* Execute */

	if (commands[nread - 1] != '\n') {
		incomplete_cmd = true;
	}
	/* Ensure commands is 0-terminated */
	if (nread >= buf->len) { /* only equality should be possible */
		char *newbuf = realloc(commands, nread + 1);
		if (!newbuf)
			goto finish;
		commands = newbuf;
	}
	commands[nread] = '\0';

	char *boundary = "\n\0";
	cmd = strtok(commands, "\n");
	/* strtok() skips '\n', but we need to process a lone '\n' as well. */
	if (commands[0] == '\n') {
		cmd_next = cmd;
		cmd = boundary;
	} else {
		cmd_next = strtok(NULL, "\n");
	}

	/** Moving pointer to the end of the buffer with an incomplete command. */
	char *pbuf = data->buf + data->blen;
	lua_State *L = the_worker->engine->L;
	while (cmd != NULL) {
		/* The last command is incomplete - save it and execute it later. */
		if (incomplete_cmd && cmd_next == NULL) {
			pbuf = mp_append_string(data->pool->ctx, pbuf, cmd);
			mp_append_char(data->pool->ctx, pbuf, '\0');
			data->buf = mp_ptr(data->pool->ctx);
			data->blen = data->blen + strlen(cmd);

			/* There is a new incomplete command. */
			if (commands[nread - 1] == '\n')
				incomplete_cmd = false;
			goto next_iter;
		}

		/* Process an incomplete command saved from a previous call. */
		if (data->blen > 0) {
			if (commands[0] != '\n' && commands[0] != '\0') {
				pbuf = mp_append_string(data->pool->ctx, pbuf, cmd);
				mp_append_char(data->pool->ctx, pbuf, '\0');
				data->buf = mp_ptr(data->pool->ctx);
				cmd = data->buf;
			} else {
				cmd = data->buf;
			}
			data->blen = 0;
			pbuf = data->buf;
		}

		/* Pseudo-command for switching to "binary output". */
		if (strcmp(cmd, "__binary") == 0) {
			data->mode = io_mode_binary;
			goto next_iter;
		}

		const bool cmd_failed = engine_cmd(L, cmd, false);
		const char *message = NULL;
		size_t len_s;
		if (lua_gettop(L) > 0) {
			message = lua_tolstring(L, -1, &len_s);
		}

		/* Send back the output, either in "binary" or normal mode. */
		if (data->mode == io_mode_binary) {
			/* The reader expects a length field in all cases. */
			if (!message || len_s > UINT32_MAX) {
				kr_log_error(IO, "unrepresentable response on control socket, "
						"sending back empty block (command '%s')\n", cmd);
				len_s = 0;
			}
			uint32_t len_n = htonl(len_s);
			fwrite(&len_n, sizeof(len_n), 1, out);
			if (len_s > 0)
				fwrite(message, len_s, 1, out);
		} else {
			if (message)
				fprintf(out, "%s", message);
			if (message || !args->quiet)
				fprintf(out, "\n");
			if (!args->quiet)
				fprintf(out, "> ");
		}

		/* Duplicate the command and its output to the logs. */
		if (cmd_failed) {
			kr_log_warning(CONTROL, "> %s\n", cmd);
			if (message)
				kr_log_warning(CONTROL, "%s\n", message);
		} else {
			kr_log_debug(CONTROL, "> %s\n", cmd);
			if (message)
				kr_log_debug(CONTROL, "%s\n", message);
		}
	next_iter:
		lua_settop(L, 0); /* not required in some cases but harmless */
		cmd = cmd_next;
		cmd_next = strtok(NULL, "\n");
	}

finish:
	/* Close if redirected */
	if (stream_fd != STDIN_FILENO) {
		fclose(out);
	}
}

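/* Allocation callback for TTY input: a plain malloc'd buffer,
 * free()d by io_tty_process_input(). */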
void io_tty_alloc(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
	buf->len = suggested;
	buf->base = malloc(suggested);
}

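/* Allocate the per-stream state on a fresh memory pool;
 * returns NULL if the pool cannot be created. */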
struct io_stream_data *io_tty_alloc_data() {
	knot_mm_t *pool = mm_ctx_mempool2(MM_DEFAULT_BLKSIZE);
	if (!pool) {
		return NULL;
	}
	struct io_stream_data *data = mm_alloc(pool, sizeof(struct io_stream_data));

	data->buf = mp_start(pool->ctx, 512);
	data->mode = io_mode_text;
	data->blen = 0;
	data->pool = pool;

	return data;
}

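/* Accept a new control-socket client, attach the per-stream state,
 * start reading commands, and print a prompt unless in quiet mode. */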
void io_tty_accept(uv_stream_t *master, int status)
{
	struct io_stream_data *data = io_tty_alloc_data();
	/* We can't use any allocations after mp_start() and it's easier anyway. */
	uv_pipe_t *client = malloc(sizeof(*client));

	struct args *args = the_args;
	if (client && data) {
		client->data = data;
		uv_pipe_init(master->loop, client, 0);
		if (uv_accept(master, (uv_stream_t *)client) != 0) {
			mp_delete(data->pool->ctx);
			return;
		}
		uv_read_start((uv_stream_t *)client, io_tty_alloc, io_tty_process_input);
		/* Write the prompt */
		if (!args->quiet) {
			uv_buf_t buf = { "> ", 2 };
			uv_try_write((uv_stream_t *)client, &buf, 1);
		}
	}
}

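/* Wrap an already open control-socket file descriptor in a libuv pipe
 * handle and start accepting TTY clients on it. */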
int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd)
{
	if (!handle) {
		return kr_error(EINVAL);
	}
	int ret = uv_pipe_init(loop, handle, 0);
	if (ret) return ret;

	ret = uv_pipe_open(handle, fd);
	if (ret) return ret;

	ret = uv_listen((uv_stream_t *)handle, 16, io_tty_accept);
	if (ret) return ret;

	handle->data = NULL;

	return 0;
}

#if ENABLE_XDP
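/* Poll callback for the XDP socket: receives a batch of packets, wraps
 * each payload in a knot_pkt_t, and submits it to the worker. */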
static void xdp_rx(uv_poll_t* handle, int status, int events)
{
	const int XDP_RX_BATCH_SIZE = 64;
	if (status < 0) {
		kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status));
		return;
	}
	if (events != UV_READABLE) {
		kr_log_error(XDP, "poll unexpected events: %d\n", events);
		return;
	}

	xdp_handle_data_t *xhd = handle->data;
	kr_require(xhd && xhd->session && xhd->socket);
	uint32_t rcvd;
	knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
	int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd
			#if KNOT_VERSION_HEX >= 0x030100
			, NULL
			#endif
			);

	if (kr_fails_assert(ret == KNOT_EOK)) {
		/* ATM other error codes can only be returned when called incorrectly */
		kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
		return;
	}
	kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd);
	kr_require(rcvd <= XDP_RX_BATCH_SIZE);
	for (int i = 0; i < rcvd; ++i) {
		const knot_xdp_msg_t *msg = &msgs[i];
		kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
		knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len,
						&the_worker->pkt_pool);
		if (kpkt == NULL) {
			ret = kr_error(ENOMEM);
		} else {
			ret = worker_submit(xhd->session,
					(const struct sockaddr *)&msg->ip_from,
					(const struct sockaddr *)&msg->ip_to,
					msg->eth_from, msg->eth_to, kpkt);
		}
		if (ret)
			kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
		mp_flush(the_worker->pkt_pool.ctx);
	}
	knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
}

/// Warn if the XDP program is running in emulated mode (XDP_SKB)
static void xdp_warn_mode(const char *ifname)
{
	if (kr_fails_assert(ifname))
		return;

	const unsigned if_index = if_nametoindex(ifname);
	if (!if_index) {
		kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n",
				ifname, strerror(errno));
		return;
	}

	const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index);
	switch (mode) {
	case KNOT_XDP_MODE_FULL:
		return;
	case KNOT_XDP_MODE_EMUL:
		kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n",
				ifname);
		return;
	case KNOT_XDP_MODE_NONE: // enum warnings from compiler
		break;
	}
	kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n",
			ifname, (int)mode);
}
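
/* Set up XDP listening on the given interface and queue: raises
 * RLIMIT_MEMLOCK (operating on BPF often needs it), initializes the knot
 * XDP socket and the TX-waker idle handle, and starts polling the socket
 * for readability. */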
int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
{
	if (!ep || !ep->handle) {
		return kr_error(EINVAL);
	}

	// RLIMIT_MEMLOCK often needs raising when operating on BPF
	static int ret_limit = 1;
	if (ret_limit == 1) {
		struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
		ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
			? kr_error(errno) : 0;
	}
	if (ret_limit) return ret_limit;

	xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
	if (!xhd) return kr_error(ENOMEM);

	const int port = ep->port ? ep->port : // all ports otherwise
			#if KNOT_VERSION_HEX >= 0x030100
				(KNOT_XDP_LISTEN_PORT_PASS | 0);
			#else
				KNOT_XDP_LISTEN_PORT_ALL;
			#endif
	xhd->socket = NULL; // needed for some reason
	int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue, port,
				KNOT_XDP_LOAD_BPF_MAYBE);
	if (!ret) xdp_warn_mode(ifname);

	if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
	if (ret || kr_fails_assert(xhd->socket)) {
		free(xhd);
		return ret == 0 ? kr_error(EINVAL) : kr_error(ret);
	}
	xhd->tx_waker.data = xhd->socket;

	ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
	ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
	if (ret) {
		knot_xdp_deinit(xhd->socket);
		free(xhd);
		return kr_error(ret);
	}

	// beware: this sets poll_handle->data
	xhd->session = session_new(ep->handle, false, false);
	kr_require(!session_flags(xhd->session)->outgoing);
	session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there

	ep->handle->data = xhd;
	ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
	return ret;
}
#endif

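/* Initialize a UDP or TCP libuv handle according to `type` and create its
 * session; TCP handles also get TCP_NODELAY set. */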
int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls, bool has_http)
{
	int ret = -1;
	if (type == SOCK_DGRAM) {
		ret = uv_udp_init(loop, (uv_udp_t *)handle);
	} else if (type == SOCK_STREAM) {
		ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family);
		uv_tcp_nodelay((uv_tcp_t *)handle, 1);
	}
	if (ret != 0) {
		return ret;
	}
	struct session *s = session_new(handle, has_tls, has_http);
	if (s == NULL) {
		ret = -1;
	}
	return ret;
}

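/* Release everything attached to a handle: the session for UDP/TCP handles,
 * or the whole XDP state for poll handles. */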
static void io_deinit(uv_handle_t *handle)
{
	if (!handle || !handle->data) {
		return;
	}
	if (handle->type != UV_POLL) {
		session_free(handle->data);
	} else {
	#if ENABLE_XDP
		xdp_handle_data_t *xhd = handle->data;
		uv_idle_stop(&xhd->tx_waker);
		uv_close((uv_handle_t *)&xhd->tx_waker, NULL);
		session_free(xhd->session);
		knot_xdp_deinit(xhd->socket);
		free(xhd);
	#else
		kr_assert(false);
	#endif
	}
}

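/* Deinitialize a handle created by io_create() and free it. */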
void io_free(uv_handle_t *handle)
{
	io_deinit(handle);
	free(handle);
}

int io_start_read(uv_handle_t *handle)
{
	switch (handle->type) {
	case UV_UDP:
		return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
	case UV_TCP:
		return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
	default:
		kr_assert(false);
		return kr_error(EINVAL);
	}
}

int io_stop_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_stop((uv_udp_t *)handle);
	} else {
		return uv_read_stop((uv_stream_t *)handle);
	}
}