/*  Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <urcu.h>
#ifdef HAVE_SYS_UIO_H	// struct iovec (OpenBSD)
#include <sys/uio.h>
#endif // HAVE_SYS_UIO_H

#include "knot/server/server.h"
#include "knot/server/tcp-handler.h"
#include "knot/common/log.h"
#include "knot/common/fdset.h"
#include "knot/nameserver/process_query.h"
#include "knot/query/layer.h"
#include "contrib/macros.h"
#include "contrib/mempattern.h"
#include "contrib/net.h"
#include "contrib/openbsd/strlcpy.h"
#include "contrib/sockaddr.h"
#include "contrib/time.h"
#include "contrib/ucw/mempool.h"

/*! \brief TCP context data. */
typedef struct tcp_context {
	knot_layer_t layer;              /*!< Query processing layer. */
	server_t *server;                /*!< Name server structure. */
	struct iovec iov[2];             /*!< TX/RX buffers. */
	unsigned client_threshold;       /*!< Index of first TCP client. */
	struct timespec last_poll_time;  /*!< Time of the last socket poll. */
	bool is_throttled;               /*!< TCP connections throttling switch. */
	fdset_t set;                     /*!< Set of server/client sockets. */
	unsigned thread_id;              /*!< Thread identifier. */
	unsigned max_worker_fds;         /*!< Max TCP clients per worker configuration + no. of ifaces. */
	int idle_timeout;                /*!< [s] TCP idle timeout configuration. */
	int io_timeout;                  /*!< [ms] TCP send/recv timeout configuration. */
} tcp_context_t;

#define TCP_SWEEP_INTERVAL 2 /*!< [s] Granularity of connection sweeping. */

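/*! \brief Reset the sweep timer to one sweep interval from now. */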
static void update_sweep_timer(struct timespec *timer)
{
	*timer = time_now();
	timer->tv_sec += TCP_SWEEP_INTERVAL;
}

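/*! \brief Refresh cached TCP limits and timeouts from the shared server configuration. */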
static void update_tcp_conf(tcp_context_t *tcp)
{
	rcu_read_lock();
	conf_t *pconf = conf();
	tcp->max_worker_fds = tcp->client_threshold +
		MAX(pconf->cache.srv_tcp_max_clients / pconf->cache.srv_tcp_threads, 1);
	tcp->idle_timeout = pconf->cache.srv_tcp_idle_timeout;
	tcp->io_timeout = pconf->cache.srv_tcp_io_timeout;
	rcu_read_unlock();
}

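/*! \brief Format a client address for logging (UNIX sockets have no peer address). */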
static void client_addr(const struct sockaddr_storage *ss, char *out, size_t out_len)
{
	if (ss->ss_family == AF_UNIX) {
		strlcpy(out, "UNIX", out_len);
	} else if (sockaddr_tostr(out, out_len, ss) < 0) {
		strlcpy(out, "unknown", out_len);
	}
}

/*! \brief Sweep TCP connection. */
static fdset_sweep_state_t tcp_sweep(fdset_t *set, int fd, _unused_ void *data)
{
	assert(set && fd >= 0);

	/* Best-effort, name and shame. */
	struct sockaddr_storage ss = { 0 };
	socklen_t len = sizeof(struct sockaddr_storage);
	if (getpeername(fd, (struct sockaddr *)&ss, &len) == 0) {
		char addr_str[SOCKADDR_STRLEN];
		client_addr(&ss, addr_str, sizeof(addr_str));
		log_notice("TCP, terminated inactive client, address %s", addr_str);
	}

	return FDSET_SWEEP;
}

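/*! \brief Check if the processing layer is still active (producing an answer or handling a failure). */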
static bool tcp_active_state(int state)
{
	return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
}

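/*! \brief Check if the current layer state allows sending the produced answer. */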
static bool tcp_send_state(int state)
{
	return (state != KNOT_STATE_FAIL && state != KNOT_STATE_NOOP);
}

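/*! \brief Log a failed client I/O operation (only timeouts are reported). */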
static void tcp_log_error(struct sockaddr_storage *ss, const char *operation, int ret)
{
	/* Don't log ECONN as it usually means client closed the connection. */
	if (ret == KNOT_ETIMEOUT) {
		char addr_str[SOCKADDR_STRLEN];
		client_addr(ss, addr_str, sizeof(addr_str));
		log_debug("TCP, failed to %s due to IO timeout, closing connection, address %s",
		          operation, addr_str);
	}
}

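/*! \brief Add the TCP listening sockets of all interfaces to the set, return its size. */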
static unsigned tcp_set_ifaces(const iface_t *ifaces, size_t n_ifaces,
                               fdset_t *fds, int thread_id)
{
	if (n_ifaces == 0) {
		return 0;
	}

	for (const iface_t *i = ifaces; i != ifaces + n_ifaces; i++) {
		if (i->fd_tcp_count == 0) { // Ignore XDP interface.
			assert(i->fd_xdp_count > 0);
			continue;
		}

		int tcp_id = 0;
#ifdef ENABLE_REUSEPORT
		if (conf()->cache.srv_tcp_reuseport) {
			/* Note: thread_ids start with UDP threads, TCP threads follow. */
			assert((i->fd_udp_count <= thread_id) &&
			       (thread_id < i->fd_tcp_count + i->fd_udp_count));

			tcp_id = thread_id - i->fd_udp_count;
		}
#endif
		int ret = fdset_add(fds, i->fd_tcp[tcp_id], FDSET_POLLIN, NULL);
		if (ret < 0) {
			return 0;
		}
	}

	return fdset_get_length(fds);
}

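/*! \brief Receive one query on a client socket, process it, and send the answer(s). */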
static int tcp_handle(tcp_context_t *tcp, int fd, struct iovec *rx, struct iovec *tx)
{
	/* Get peer name. */
	struct sockaddr_storage ss;
	socklen_t addrlen = sizeof(struct sockaddr_storage);
	if (getpeername(fd, (struct sockaddr *)&ss, &addrlen) != 0) {
		return KNOT_EADDRNOTAVAIL;
	}

	/* Create query processing parameter. */
	knotd_qdata_params_t params = {
		.remote = &ss,
		.socket = fd,
		.server = tcp->server,
		.thread_id = tcp->thread_id
	};

	rx->iov_len = KNOT_WIRE_MAX_PKTSIZE;
	tx->iov_len = KNOT_WIRE_MAX_PKTSIZE;

	/* Receive data. */
	int recv = net_dns_tcp_recv(fd, rx->iov_base, rx->iov_len, tcp->io_timeout);
	if (recv > 0) {
		rx->iov_len = recv;
	} else {
		tcp_log_error(&ss, "receive", recv);
		return KNOT_EOF;
	}

	/* Initialize processing layer. */
	knot_layer_begin(&tcp->layer, &params);

	/* Create packets. */
	knot_pkt_t *ans = knot_pkt_new(tx->iov_base, tx->iov_len, tcp->layer.mm);
	knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, tcp->layer.mm);

	/* Input packet. */
	int ret = knot_pkt_parse(query, 0);
	if (ret != KNOT_EOK && query->parsed > 0) { // parsing failed (e.g. 2x OPT)
		query->parsed--; // artificially decreasing "parsed" leads to FORMERR
	}
	knot_layer_consume(&tcp->layer, query);

	/* Resolve until NOOP or finished. */
	while (tcp_active_state(tcp->layer.state)) {
		knot_layer_produce(&tcp->layer, ans);
		/* Send, if response generation passed and wasn't ignored. */
		if (ans->size > 0 && tcp_send_state(tcp->layer.state)) {
			int sent = net_dns_tcp_send(fd, ans->wire, ans->size,
			                            tcp->io_timeout, NULL);
			if (sent != ans->size) {
				tcp_log_error(&ss, "send", sent);
				ret = KNOT_EOF;
				break;
			}
		}
	}

	/* Reset after processing. */
	knot_layer_finish(&tcp->layer);

	/* Flush per-query memory (including query and answer packets). */
	mp_flush(tcp->layer.mm->ctx);

	return ret;
}

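/*! \brief Accept a new client connection on a listening socket and watch it for activity. */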
static void tcp_event_accept(tcp_context_t *tcp, unsigned i)
{
	/* Accept client. */
	int fd = fdset_get_fd(&tcp->set, i);
	int client = net_accept(fd, NULL);
	if (client >= 0) {
		/* Assign to fdset. */
		int idx = fdset_add(&tcp->set, client, FDSET_POLLIN, NULL);
		if (idx < 0) {
			close(client);
			return;
		}

		/* Update watchdog timer. */
		(void)fdset_set_watchdog(&tcp->set, idx, tcp->idle_timeout);
	}
}

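/*! \brief Serve an event on a client socket and refresh its idle watchdog on success. */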
static int tcp_event_serve(tcp_context_t *tcp, unsigned i)
{
	int ret = tcp_handle(tcp, fdset_get_fd(&tcp->set, i),
	                     &tcp->iov[0], &tcp->iov[1]);
	if (ret == KNOT_EOK) {
		/* Update socket activity timer. */
		(void)fdset_set_watchdog(&tcp->set, i, tcp->idle_timeout);
	}

	return ret;
}

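/*! \brief Poll all sockets and dispatch accept, serve, and close events. */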
static void tcp_wait_for_events(tcp_context_t *tcp)
{
	fdset_t *set = &tcp->set;

	/* Check if throttled with many open TCP connections. */
	assert(fdset_get_length(set) <= tcp->max_worker_fds);
	tcp->is_throttled = fdset_get_length(set) == tcp->max_worker_fds;

	/* If throttled, temporarily ignore new TCP connections. */
	unsigned offset = tcp->is_throttled ? tcp->client_threshold : 0;

	/* Wait for events. */
	fdset_it_t it;
	(void)fdset_poll(set, &it, offset, TCP_SWEEP_INTERVAL * 1000);

	/* Mark the time of last poll call. */
	tcp->last_poll_time = time_now();

	/* Process events. */
	for (; !fdset_it_is_done(&it); fdset_it_next(&it)) {
		bool should_close = false;
		unsigned int idx = fdset_it_get_idx(&it);
		if (fdset_it_is_error(&it)) {
			should_close = (idx >= tcp->client_threshold);
		} else if (fdset_it_is_pollin(&it)) {
			/* Master sockets - new connection to accept. */
			if (idx < tcp->client_threshold) {
				/* Don't accept more clients than configured. */
				if (fdset_get_length(set) < tcp->max_worker_fds) {
					tcp_event_accept(tcp, idx);
				}
			/* Client sockets - already accepted connection or
			   closed connection :-( */
			} else if (tcp_event_serve(tcp, idx) != KNOT_EOK) {
				should_close = true;
			}
		}

		/* Evaluate. */
		if (should_close) {
			fdset_it_remove(&it);
		}
	}
	fdset_it_commit(&it);
}

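/*! \brief TCP handler thread loop: serve events until cancelled, sweeping inactive clients. */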
int tcp_master(dthread_t *thread)
{
	if (thread == NULL || thread->data == NULL) {
		return KNOT_EINVAL;
	}

	iohandler_t *handler = (iohandler_t *)thread->data;
	int thread_id = handler->thread_id[dt_get_id(thread)];

#ifdef ENABLE_REUSEPORT
	/* Set thread affinity to CPU core (overlaps with UDP/XDP). */
	if (conf()->cache.srv_tcp_reuseport) {
		unsigned cpu = dt_online_cpus();
		if (cpu > 1) {
			unsigned cpu_mask = (dt_get_id(thread) % cpu);
			dt_setaffinity(thread, &cpu_mask, 1);
		}
	}
#endif

	int ret = KNOT_EOK;

	/* Create big enough memory cushion. */
	knot_mm_t mm;
	mm_ctx_mempool(&mm, 16 * MM_DEFAULT_BLKSIZE);

	/* Create TCP answering context. */
	tcp_context_t tcp = {
		.server = handler->server,
		.is_throttled = false,
		.thread_id = thread_id,
	};
	knot_layer_init(&tcp.layer, &mm, process_query_layer());

	/* Create iovec abstraction. */
	for (unsigned i = 0; i < 2; ++i) {
		tcp.iov[i].iov_len = KNOT_WIRE_MAX_PKTSIZE;
		tcp.iov[i].iov_base = malloc(tcp.iov[i].iov_len);
		if (tcp.iov[i].iov_base == NULL) {
			ret = KNOT_ENOMEM;
			goto finish;
		}
	}

	/* Initialize sweep interval and TCP configuration. */
	struct timespec next_sweep;
	update_sweep_timer(&next_sweep);
	update_tcp_conf(&tcp);

	/* Prepare initial buffer for listening and bound sockets. */
	if (fdset_init(&tcp.set, FDSET_RESIZE_STEP) != KNOT_EOK) {
		goto finish;
	}

	/* Set descriptors for the configured interfaces. */
	tcp.client_threshold = tcp_set_ifaces(handler->server->ifaces,
	                                      handler->server->n_ifaces,
	                                      &tcp.set, thread_id);
	if (tcp.client_threshold == 0) {
		goto finish; /* Terminate on zero interfaces. */
	}

	for (;;) {
		/* Check for cancellation. */
		if (dt_is_cancelled(thread)) {
			break;
		}

		/* Serve client requests. */
		tcp_wait_for_events(&tcp);

		/* Sweep inactive clients and refresh TCP configuration. */
		if (tcp.last_poll_time.tv_sec >= next_sweep.tv_sec) {
			fdset_sweep(&tcp.set, &tcp_sweep, NULL);
			update_sweep_timer(&next_sweep);
			update_tcp_conf(&tcp);
		}
	}

finish:
	free(tcp.iov[0].iov_base);
	free(tcp.iov[1].iov_base);
	mp_delete(mm.ctx);
	fdset_clear(&tcp.set);

	return ret;
}