1 /* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
2
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <https://www.gnu.org/licenses/>.
15 */
16
17 #include <unistd.h>
18 #include <fcntl.h>
19 #include <errno.h>
20 #include <string.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <netinet/tcp.h>
24 #include <netinet/in.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <urcu.h>
28 #ifdef HAVE_SYS_UIO_H // struct iovec (OpenBSD)
29 #include <sys/uio.h>
30 #endif // HAVE_SYS_UIO_H
31
32 #include "knot/server/server.h"
33 #include "knot/server/tcp-handler.h"
34 #include "knot/common/log.h"
35 #include "knot/common/fdset.h"
36 #include "knot/nameserver/process_query.h"
37 #include "knot/query/layer.h"
38 #include "contrib/macros.h"
39 #include "contrib/mempattern.h"
40 #include "contrib/net.h"
41 #include "contrib/openbsd/strlcpy.h"
42 #include "contrib/sockaddr.h"
43 #include "contrib/time.h"
44 #include "contrib/ucw/mempool.h"
45
46 /*! \brief TCP context data. */
47 typedef struct tcp_context {
48 knot_layer_t layer; /*!< Query processing layer. */
49 server_t *server; /*!< Name server structure. */
50 struct iovec iov[2]; /*!< TX/RX buffers. */
51 unsigned client_threshold; /*!< Index of first TCP client. */
52 struct timespec last_poll_time; /*!< Time of the last socket poll. */
53 bool is_throttled; /*!< TCP connections throttling switch. */
54 fdset_t set; /*!< Set of server/client sockets. */
55 unsigned thread_id; /*!< Thread identifier. */
56 unsigned max_worker_fds; /*!< Max TCP clients per worker configuration + no. of ifaces. */
57 int idle_timeout; /*!< [s] TCP idle timeout configuration. */
58 int io_timeout; /*!< [ms] TCP send/recv timeout configuration. */
59 } tcp_context_t;
60
61 #define TCP_SWEEP_INTERVAL 2 /*!< [secs] granularity of connection sweeping. */
62
update_sweep_timer(struct timespec * timer)63 static void update_sweep_timer(struct timespec *timer)
64 {
65 *timer = time_now();
66 timer->tv_sec += TCP_SWEEP_INTERVAL;
67 }
68
/*!
 * \brief Refresh cached TCP limits and timeouts from the live configuration.
 *
 * Reads the config under an RCU read lock; the per-worker client budget is
 * the total client limit split across TCP threads (at least 1), on top of
 * the master-socket slots counted by client_threshold.
 */
static void update_tcp_conf(tcp_context_t *tcp)
{
	rcu_read_lock();
	conf_t *pconf = conf();
	unsigned per_thread = MAX(pconf->cache.srv_tcp_max_clients /
	                          pconf->cache.srv_tcp_threads, 1);
	tcp->max_worker_fds = tcp->client_threshold + per_thread;
	tcp->idle_timeout = pconf->cache.srv_tcp_idle_timeout;
	tcp->io_timeout = pconf->cache.srv_tcp_io_timeout;
	rcu_read_unlock();
}
79
/*!
 * \brief Format a peer address into a printable string.
 *
 * UNIX-domain peers are rendered as "UNIX"; if the address cannot be
 * converted, "unknown" is used instead.
 */
static void client_addr(const struct sockaddr_storage *ss, char *out, size_t out_len)
{
	if (ss->ss_family == AF_UNIX) {
		strlcpy(out, "UNIX", out_len);
		return;
	}

	if (sockaddr_tostr(out, out_len, ss) < 0) {
		strlcpy(out, "unknown", out_len);
	}
}
88
89 /*! \brief Sweep TCP connection. */
tcp_sweep(fdset_t * set,int fd,_unused_ void * data)90 static fdset_sweep_state_t tcp_sweep(fdset_t *set, int fd, _unused_ void *data)
91 {
92 assert(set && fd >= 0);
93
94 /* Best-effort, name and shame. */
95 struct sockaddr_storage ss = { 0 };
96 socklen_t len = sizeof(struct sockaddr_storage);
97 if (getpeername(fd, (struct sockaddr *)&ss, &len) == 0) {
98 char addr_str[SOCKADDR_STRLEN];
99 client_addr(&ss, addr_str, sizeof(addr_str));
100 log_notice("TCP, terminated inactive client, address %s", addr_str);
101 }
102
103 return FDSET_SWEEP;
104 }
105
tcp_active_state(int state)106 static bool tcp_active_state(int state)
107 {
108 return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
109 }
110
tcp_send_state(int state)111 static bool tcp_send_state(int state)
112 {
113 return (state != KNOT_STATE_FAIL && state != KNOT_STATE_NOOP);
114 }
115
tcp_log_error(struct sockaddr_storage * ss,const char * operation,int ret)116 static void tcp_log_error(struct sockaddr_storage *ss, const char *operation, int ret)
117 {
118 /* Don't log ECONN as it usually means client closed the connection. */
119 if (ret == KNOT_ETIMEOUT) {
120 char addr_str[SOCKADDR_STRLEN];
121 client_addr(ss, addr_str, sizeof(addr_str));
122 log_debug("TCP, failed to %s due to IO timeout, closing connection, address %s",
123 operation, addr_str);
124 }
125 }
126
/*!
 * \brief Register the TCP listening sockets of all interfaces into the fdset.
 *
 * XDP-only interfaces (no TCP sockets) are skipped. With per-socket
 * reuseport enabled, each TCP thread registers only its own listener.
 *
 * \return Number of registered descriptors, or 0 on failure or when there
 *         are no interfaces.
 */
static unsigned tcp_set_ifaces(const iface_t *ifaces, size_t n_ifaces,
                               fdset_t *fds, int thread_id)
{
	if (n_ifaces == 0) {
		return 0;
	}

	for (size_t n = 0; n < n_ifaces; n++) {
		const iface_t *i = &ifaces[n];
		if (i->fd_tcp_count == 0) { // Ignore XDP interface.
			assert(i->fd_xdp_count > 0);
			continue;
		}

		int tcp_id = 0;
#ifdef ENABLE_REUSEPORT
		if (conf()->cache.srv_tcp_reuseport) {
			/* Note: thread_ids start with UDP threads, TCP threads follow. */
			assert((i->fd_udp_count <= thread_id) &&
			       (thread_id < i->fd_tcp_count + i->fd_udp_count));

			tcp_id = thread_id - i->fd_udp_count;
		}
#endif
		if (fdset_add(fds, i->fd_tcp[tcp_id], FDSET_POLLIN, NULL) < 0) {
			return 0;
		}
	}

	return fdset_get_length(fds);
}
158
/*!
 * \brief Receive, process, and answer one DNS query on a TCP connection.
 *
 * \param tcp  Worker context (processing layer, timeouts, thread id).
 * \param fd   Connected client socket.
 * \param rx   Receive buffer (capacity KNOT_WIRE_MAX_PKTSIZE).
 * \param tx   Transmit buffer (capacity KNOT_WIRE_MAX_PKTSIZE).
 *
 * \retval KNOT_EOK              Query handled; connection may be kept open.
 * \retval KNOT_EADDRNOTAVAIL    Peer address could not be obtained.
 * \retval KNOT_EOF              Receive/send failed; caller closes the socket.
 * \return Parse error code from knot_pkt_parse() otherwise.
 */
static int tcp_handle(tcp_context_t *tcp, int fd, struct iovec *rx, struct iovec *tx)
{
	/* Get peer name. */
	struct sockaddr_storage ss;
	socklen_t addrlen = sizeof(struct sockaddr_storage);
	if (getpeername(fd, (struct sockaddr *)&ss, &addrlen) != 0) {
		return KNOT_EADDRNOTAVAIL;
	}

	/* Create query processing parameter. */
	knotd_qdata_params_t params = {
		.remote = &ss,
		.socket = fd,
		.server = tcp->server,
		.thread_id = tcp->thread_id
	};

	/* Reset buffer lengths to full capacity before reuse. */
	rx->iov_len = KNOT_WIRE_MAX_PKTSIZE;
	tx->iov_len = KNOT_WIRE_MAX_PKTSIZE;

	/* Receive data. */
	int recv = net_dns_tcp_recv(fd, rx->iov_base, rx->iov_len, tcp->io_timeout);
	if (recv > 0) {
		rx->iov_len = recv;
	} else {
		tcp_log_error(&ss, "receive", recv);
		return KNOT_EOF;
	}

	/* Initialize processing layer. */
	/* Fix: reconstructed garbled '¶ms' mojibake to '&params'. */
	knot_layer_begin(&tcp->layer, &params);

	/* Create packets. */
	knot_pkt_t *ans = knot_pkt_new(tx->iov_base, tx->iov_len, tcp->layer.mm);
	knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, tcp->layer.mm);

	/* Input packet. */
	int ret = knot_pkt_parse(query, 0);
	if (ret != KNOT_EOK && query->parsed > 0) { // parsing failed (e.g. 2x OPT)
		query->parsed--; // artificially decreasing "parsed" leads to FORMERR
	}
	knot_layer_consume(&tcp->layer, query);

	/* Resolve until NOOP or finished. */
	while (tcp_active_state(tcp->layer.state)) {
		knot_layer_produce(&tcp->layer, ans);
		/* Send, if response generation passed and wasn't ignored. */
		if (ans->size > 0 && tcp_send_state(tcp->layer.state)) {
			int sent = net_dns_tcp_send(fd, ans->wire, ans->size,
			                            tcp->io_timeout, NULL);
			if (sent != ans->size) {
				tcp_log_error(&ss, "send", sent);
				ret = KNOT_EOF;
				break;
			}
		}
	}

	/* Reset after processing. */
	knot_layer_finish(&tcp->layer);

	/* Flush per-query memory (including query and answer packets). */
	mp_flush(tcp->layer.mm->ctx);

	return ret;
}
225
/*!
 * \brief Accept a pending connection on the master socket at fdset index i.
 *
 * On success the new client socket is added to the fdset with an idle
 * watchdog; on any failure the client socket (if any) is closed silently.
 */
static void tcp_event_accept(tcp_context_t *tcp, unsigned i)
{
	int srv_fd = fdset_get_fd(&tcp->set, i);
	int client = net_accept(srv_fd, NULL);
	if (client < 0) {
		return;
	}

	/* Assign to fdset. */
	int idx = fdset_add(&tcp->set, client, FDSET_POLLIN, NULL);
	if (idx < 0) {
		close(client);
		return;
	}

	/* Update watchdog timer. */
	(void)fdset_set_watchdog(&tcp->set, idx, tcp->idle_timeout);
}
243
/*!
 * \brief Serve one query on the client socket at fdset index i.
 *
 * \return Result of tcp_handle(); on KNOT_EOK the idle watchdog is re-armed.
 */
static int tcp_event_serve(tcp_context_t *tcp, unsigned i)
{
	int fd = fdset_get_fd(&tcp->set, i);
	int ret = tcp_handle(tcp, fd, &tcp->iov[0], &tcp->iov[1]);
	if (ret == KNOT_EOK) {
		/* Activity seen: push the idle-timeout watchdog forward. */
		(void)fdset_set_watchdog(&tcp->set, i, tcp->idle_timeout);
	}

	return ret;
}
255
/*!
 * \brief Poll all registered sockets and dispatch accept/serve events.
 *
 * When the worker holds max_worker_fds descriptors it is "throttled":
 * polling starts past the master sockets so no new connections are
 * accepted until a slot frees up. Sockets marked for closing are removed
 * via the iterator and committed in one batch at the end.
 */
static void tcp_wait_for_events(tcp_context_t *tcp)
{
	fdset_t *set = &tcp->set;

	/* Check if throttled with many open TCP connections. */
	assert(fdset_get_length(set) <= tcp->max_worker_fds);
	tcp->is_throttled = (fdset_get_length(set) == tcp->max_worker_fds);

	/* If throttled, temporarily ignore new TCP connections. */
	unsigned offset = tcp->is_throttled ? tcp->client_threshold : 0;

	/* Wait for events. */
	fdset_it_t it;
	(void)fdset_poll(set, &it, offset, TCP_SWEEP_INTERVAL * 1000);

	/* Mark the time of last poll call (used for sweep scheduling). */
	tcp->last_poll_time = time_now();

	/* Process events. */
	for (; !fdset_it_is_done(&it); fdset_it_next(&it)) {
		bool should_close = false;
		unsigned int idx = fdset_it_get_idx(&it);
		if (fdset_it_is_error(&it)) {
			/* Errors close client sockets only, never masters. */
			should_close = (idx >= tcp->client_threshold);
		} else if (fdset_it_is_pollin(&it)) {
			if (idx < tcp->client_threshold) {
				/* Master socket: new connection to accept,
				   but never beyond the configured limit. */
				if (fdset_get_length(set) < tcp->max_worker_fds) {
					tcp_event_accept(tcp, idx);
				}
			} else if (tcp_event_serve(tcp, idx) != KNOT_EOK) {
				/* Client socket failed or disconnected. */
				should_close = true;
			}
		}

		/* Evaluate. */
		if (should_close) {
			fdset_it_remove(&it);
		}
	}
	fdset_it_commit(&it);
}
301
/*!
 * \brief TCP worker thread entry point.
 *
 * Sets up per-thread resources (memory pool, RX/TX buffers, fdset with the
 * interface listening sockets), then loops serving events until the thread
 * is cancelled, periodically sweeping idle connections and refreshing the
 * TCP configuration.
 *
 * \param thread  Worker thread descriptor; thread->data is the iohandler_t.
 *
 * \retval KNOT_EINVAL  Missing thread or handler data.
 * \retval KNOT_ENOMEM  Buffer or fdset allocation failed.
 * \retval KNOT_EOK     Normal termination (including zero TCP interfaces).
 */
int tcp_master(dthread_t *thread)
{
	if (thread == NULL || thread->data == NULL) {
		return KNOT_EINVAL;
	}

	iohandler_t *handler = (iohandler_t *)thread->data;
	int thread_id = handler->thread_id[dt_get_id(thread)];

#ifdef ENABLE_REUSEPORT
	/* Set thread affinity to CPU core (overlaps with UDP/XDP). */
	if (conf()->cache.srv_tcp_reuseport) {
		unsigned cpu = dt_online_cpus();
		if (cpu > 1) {
			unsigned cpu_mask = (dt_get_id(thread) % cpu);
			dt_setaffinity(thread, &cpu_mask, 1);
		}
	}
#endif

	int ret = KNOT_EOK;

	/* Create big enough memory cushion. */
	knot_mm_t mm;
	mm_ctx_mempool(&mm, 16 * MM_DEFAULT_BLKSIZE);

	/* Create TCP answering context (remaining fields zero-initialized,
	   so fdset_clear() at 'finish' is safe even before fdset_init()). */
	tcp_context_t tcp = {
		.server = handler->server,
		.is_throttled = false,
		.thread_id = thread_id,
	};
	knot_layer_init(&tcp.layer, &mm, process_query_layer());

	/* Create iovec abstraction. */
	for (unsigned i = 0; i < 2; ++i) {
		tcp.iov[i].iov_len = KNOT_WIRE_MAX_PKTSIZE;
		tcp.iov[i].iov_base = malloc(tcp.iov[i].iov_len);
		if (tcp.iov[i].iov_base == NULL) {
			ret = KNOT_ENOMEM;
			goto finish;
		}
	}

	/* Initialize sweep interval and TCP configuration. */
	struct timespec next_sweep;
	update_sweep_timer(&next_sweep);
	update_tcp_conf(&tcp);

	/* Prepare initial buffer for listening and bound sockets. */
	if (fdset_init(&tcp.set, FDSET_RESIZE_STEP) != KNOT_EOK) {
		/* Fix: propagate the failure; previously KNOT_EOK was returned. */
		ret = KNOT_ENOMEM;
		goto finish;
	}

	/* Set descriptors for the configured interfaces. */
	tcp.client_threshold = tcp_set_ifaces(handler->server->ifaces,
	                                      handler->server->n_ifaces,
	                                      &tcp.set, thread_id);
	if (tcp.client_threshold == 0) {
		goto finish; /* Terminate on zero interfaces. */
	}

	for (;;) {
		/* Check for cancellation. */
		if (dt_is_cancelled(thread)) {
			break;
		}

		/* Serve client requests. */
		tcp_wait_for_events(&tcp);

		/* Sweep inactive clients and refresh TCP configuration. */
		if (tcp.last_poll_time.tv_sec >= next_sweep.tv_sec) {
			fdset_sweep(&tcp.set, &tcp_sweep, NULL);
			update_sweep_timer(&next_sweep);
			update_tcp_conf(&tcp);
		}
	}

finish:
	free(tcp.iov[0].iov_base);
	free(tcp.iov[1].iov_base);
	mp_delete(mm.ctx);
	fdset_clear(&tcp.set);

	return ret;
}
389