1 /*  Copyright (C) 2014-2020 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
2  *  SPDX-License-Identifier: GPL-3.0-or-later
3  */
4 
5 #include "kresconfig.h"
6 #include "daemon/worker.h"
7 
8 #include <uv.h>
9 #include <lua.h>
10 #include <lauxlib.h>
11 #include <libknot/packet/pkt.h>
12 #include <libknot/descriptor.h>
13 #include <contrib/cleanup.h>
14 #include <contrib/ucw/lib.h>
15 #include <contrib/ucw/mempool.h>
16 #if defined(__GLIBC__) && defined(_GNU_SOURCE)
17 #include <malloc.h>
18 #endif
19 #include <sys/types.h>
20 #include <unistd.h>
21 #include <gnutls/gnutls.h>
22 
23 #if ENABLE_XDP
24 	#include <libknot/xdp/xdp.h>
25 #endif
26 
27 #include "daemon/bindings/api.h"
28 #include "daemon/engine.h"
29 #include "daemon/io.h"
30 #include "daemon/session.h"
31 #include "daemon/tls.h"
32 #include "daemon/http.h"
33 #include "daemon/udp_queue.h"
34 #include "daemon/zimport.h"
35 #include "lib/layer.h"
36 #include "lib/utils.h"
37 
38 
39 /* Magic defaults for the worker. */
40 #ifndef MP_FREELIST_SIZE
41 # ifdef __clang_analyzer__
42 #  define MP_FREELIST_SIZE 0
43 # else
44 #  define MP_FREELIST_SIZE 64 /**< Maximum length of the worker mempool freelist */
45 # endif
46 #endif
47 #ifndef QUERY_RATE_THRESHOLD
48 #define QUERY_RATE_THRESHOLD (2 * MP_FREELIST_SIZE) /**< Nr of parallel queries considered as high rate */
49 #endif
50 #ifndef MAX_PIPELINED
51 #define MAX_PIPELINED 100
52 #endif
53 
54 #define VERBOSE_MSG(qry, ...) QRVERBOSE(qry, WORKER, __VA_ARGS__)
55 
56 /** Client request state. */
57 struct request_ctx
58 {
59 	struct kr_request req;
60 
61 	struct worker_ctx *worker;
62 	struct qr_task *task;
63 	struct {
64 		/** NULL if the request didn't come over network. */
65 		struct session *session;
66 		/** Requestor's address; separate because of UDP session "sharing". */
67 		union inaddr addr;
68 		/** Local address.  For AF_XDP we couldn't use session's,
69 		 * as the address might be different every time. */
70 		union inaddr dst_addr;
71 		/** MAC addresses - ours [0] and router's [1], in case of AF_XDP socket. */
72 		uint8_t eth_addrs[2][6];
73 	} source;
74 };
75 
76 /** Query resolution task. */
77 struct qr_task
78 {
79 	struct request_ctx *ctx;
80 	knot_pkt_t *pktbuf;
81 	qr_tasklist_t waiting;
82 	struct session *pending[MAX_PENDING];
83 	uint16_t pending_count;
84 	uint16_t timeouts;
85 	uint16_t iter_count;
86 	uint32_t refs;
87 	bool finished : 1;
88 	bool leading  : 1;
89 	uint64_t creation_time;
90 	uint64_t send_time;
91 	uint64_t recv_time;
92 	struct kr_transport *transport;
93 };
94 
95 
96 /* Convenience macros */
97 #define qr_task_ref(task) \
98 	do { ++(task)->refs; } while(0)
99 #define qr_task_unref(task) \
100 	do { \
101 		if (task) \
102 			kr_require((task)->refs > 0); \
103 		if ((task) && --(task)->refs == 0) \
104 			qr_task_free((task)); \
105 	} while (0)
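/* Illustrative use of the refcount (cf. qr_task_send() and on_send()):
 *
 *	qr_task_ref(task);       // take a reference before an async operation
 *	send_req->data = task;   // e.g. when attaching the task to a libuv request
 *	// ... later, in the completion callback:
 *	qr_task_unref(task);     // the last unref calls qr_task_free()
 */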
106 
107 /** @internal get key for tcp session
108  *  @note kr_straddr() returns a pointer to a static string
109  */
110 #define tcpsess_key(addr) kr_straddr(addr)
111 
112 /* Forward decls */
113 static void qr_task_free(struct qr_task *task);
114 static int qr_task_step(struct qr_task *task,
115 			const struct sockaddr *packet_source,
116 			knot_pkt_t *packet);
117 static int qr_task_send(struct qr_task *task, struct session *session,
118 			const struct sockaddr *addr, knot_pkt_t *pkt);
119 static int qr_task_finalize(struct qr_task *task, int state);
120 static void qr_task_complete(struct qr_task *task);
121 struct session* worker_find_tcp_connected(struct worker_ctx *worker,
122 						 const struct sockaddr *addr);
123 static int worker_add_tcp_waiting(struct worker_ctx *worker,
124 				  const struct sockaddr *addr,
125 				  struct session *session);
126 struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
127 					       const struct sockaddr *addr);
128 static void on_tcp_connect_timeout(uv_timer_t *timer);
129 static void on_udp_timeout(uv_timer_t *timer);
130 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt);
131 
132 
133 struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
134 struct worker_ctx *the_worker = NULL;
135 
136 /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
137  *  socktype is SOCK_* */
138 static uv_handle_t *ioreq_spawn(struct worker_ctx *worker,
139 				int socktype, sa_family_t family, bool has_tls,
140 				bool has_http)
141 {
142 	bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
143 			&& (family == AF_INET  || family == AF_INET6);
144 	if (kr_fails_assert(precond)) {
145 		kr_log_debug(WORKER, "ioreq_spawn: pre-condition failed\n");
146 		return NULL;
147 	}
148 
149 	/* Create connection for iterative query */
150 	uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
151 					? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
152 	if (!handle) {
153 		return NULL;
154 	}
155 	int ret = io_create(worker->loop, handle, socktype, family, has_tls, has_http);
156 	if (ret) {
157 		if (ret == UV_EMFILE) {
158 			worker->too_many_open = true;
159 			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
160 		}
161 		free(handle);
162 		return NULL;
163 	}
164 
165 	/* Bind to outgoing address, according to IP v4/v6. */
166 	union inaddr *addr;
167 	if (family == AF_INET) {
168 		addr = (union inaddr *)&worker->out_addr4;
169 	} else {
170 		addr = (union inaddr *)&worker->out_addr6;
171 	}
172 	if (addr->ip.sa_family != AF_UNSPEC) {
173 		if (kr_fails_assert(addr->ip.sa_family == family)) {
174 			io_free(handle);
175 			return NULL;
176 		}
177 		if (socktype == SOCK_DGRAM) {
178 			uv_udp_t *udp = (uv_udp_t *)handle;
179 			ret = uv_udp_bind(udp, &addr->ip, 0);
180 		} else if (socktype == SOCK_STREAM){
181 			uv_tcp_t *tcp = (uv_tcp_t *)handle;
182 			ret = uv_tcp_bind(tcp, &addr->ip, 0);
183 		}
184 	}
185 
186 	if (ret != 0) {
187 		io_free(handle);
188 		return NULL;
189 	}
190 
191 	/* Set current handle as a subrequest type. */
192 	struct session *session = handle->data;
193 	session_flags(session)->outgoing = true;
194 	/* Connect or issue query datagram */
195 	return handle;
196 }
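/* Note: the caller (transmit() for UDP, tcp_task_make_connection() for TCP) fills in
 * the peer address on the attached session and starts the actual I/O. */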
197 
198 static void ioreq_kill_pending(struct qr_task *task)
199 {
200 	for (uint16_t i = 0; i < task->pending_count; ++i) {
201 		session_kill_ioreq(task->pending[i], task);
202 	}
203 	task->pending_count = 0;
204 }
205 
206 /** @cond This memory layout is internal to mempool.c, use only for debugging. */
207 #if defined(__SANITIZE_ADDRESS__)
208 struct mempool_chunk {
209   struct mempool_chunk *next;
210   size_t size;
211 };
212 static void mp_poison(struct mempool *mp, bool poison)
213 {
214 	if (!poison) { /* @note mempool is part of the first chunk, unpoison it first */
215 		kr_asan_unpoison(mp, sizeof(*mp));
216 	}
217 	struct mempool_chunk *chunk = mp->state.last[0];
218 	void *chunk_off = (uint8_t *)chunk - chunk->size;
219 	if (poison) {
220 		kr_asan_poison(chunk_off, chunk->size);
221 	} else {
222 		kr_asan_unpoison(chunk_off, chunk->size);
223 	}
224 }
225 #else
226 #define mp_poison(mp, enable)
227 #endif
228 /** @endcond */
229 
230 /** Get a mempool.  (Recycle if possible.)  */
231 static inline struct mempool *pool_borrow(struct worker_ctx *worker)
232 {
233 	struct mempool *mp = NULL;
234 	if (worker->pool_mp.len > 0) {
235 		mp = array_tail(worker->pool_mp);
236 		array_pop(worker->pool_mp);
237 		mp_poison(mp, 0);
238 	} else { /* No mempool on the freelist, create new one */
239 		mp = mp_new (4 * CPU_PAGE_SIZE);
240 	}
241 	return mp;
242 }
243 
244 /** Return a mempool.  (Cache them up to some count.) */
245 static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
246 {
247 	if (worker->pool_mp.len < MP_FREELIST_SIZE) {
248 		mp_flush(mp);
249 		array_push(worker->pool_mp, mp);
250 		mp_poison(mp, 1);
251 	} else {
252 		mp_delete(mp);
253 	}
254 }
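/* Pools are borrowed in request_create() and released in request_free(), so under
 * steady load requests reuse mempools from the freelist (up to MP_FREELIST_SIZE)
 * instead of allocating fresh ones. */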
255 
256 /** Create a key for an outgoing subrequest: qname, qclass, qtype.
257  * @param dst Destination buffer for the key; its size MUST be SUBREQ_KEY_LEN or larger.
258  * @return key length if successful or an error
259  */
260 static const size_t SUBREQ_KEY_LEN = KR_RRKEY_LEN;
261 static int subreq_key(char *dst, knot_pkt_t *pkt)
262 {
263 	kr_require(pkt);
264 	return kr_rrkey(dst, knot_pkt_qclass(pkt), knot_pkt_qname(pkt),
265 			knot_pkt_qtype(pkt), knot_pkt_qtype(pkt));
266 }
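/* Two concurrent outgoing subrequests with the same (qclass, qname, qtype) map to
 * the same key.  subreq_lead()/subreq_enqueue() below use this to deduplicate
 * upstream queries, and subreq_finalize() later steps the waiting followers. */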
267 
268 #if ENABLE_XDP
269 static uint8_t *alloc_wire_cb(struct kr_request *req, uint16_t *maxlen)
270 {
271 	if (kr_fails_assert(maxlen))
272 		return NULL;
273 	struct request_ctx *ctx = (struct request_ctx *)req;
274 	/* We know it's an AF_XDP socket; otherwise this CB isn't assigned. */
275 	uv_handle_t *handle = session_get_handle(ctx->source.session);
276 	if (kr_fails_assert(handle->type == UV_POLL))
277 		return NULL;
278 	xdp_handle_data_t *xhd = handle->data;
279 	knot_xdp_msg_t out;
280 	bool ipv6 = ctx->source.addr.ip.sa_family == AF_INET6;
281 	int ret = knot_xdp_send_alloc(xhd->socket,
282 			#if KNOT_VERSION_HEX >= 0x030100
283 					ipv6 ? KNOT_XDP_MSG_IPV6 : 0, &out);
284 			#else
285 					ipv6, &out, NULL);
286 			#endif
287 	if (ret != KNOT_EOK) {
288 		kr_assert(ret == KNOT_ENOMEM);
289 		*maxlen = 0;
290 		return NULL;
291 	}
292 	*maxlen = MIN(*maxlen, out.payload.iov_len);
293 	/* It's most convenient to fill the MAC addresses at this point. */
294 	memcpy(out.eth_from, &ctx->source.eth_addrs[0], 6);
295 	memcpy(out.eth_to,   &ctx->source.eth_addrs[1], 6);
296 	return out.payload.iov_base;
297 }
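/* Note: alloc_wire_cb() above lets the answer be written directly into an XDP TX
 * frame; free_wire() below releases that frame if the answer is never sent. */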
298 static void free_wire(const struct request_ctx *ctx)
299 {
300 	if (kr_fails_assert(ctx->req.alloc_wire_cb == alloc_wire_cb))
301 		return;
302 	knot_pkt_t *ans = ctx->req.answer;
303 	if (unlikely(ans == NULL)) /* dropped */
304 		return;
305 	if (likely(ans->wire == NULL)) /* sent most likely */
306 		return;
307 	/* We know it's an AF_XDP socket; otherwise alloc_wire_cb isn't assigned. */
308 	uv_handle_t *handle = session_get_handle(ctx->source.session);
309 	if (kr_fails_assert(handle->type == UV_POLL))
310 		return;
311 	xdp_handle_data_t *xhd = handle->data;
312 	/* Freeing is done by sending an empty packet (the API won't really send it). */
313 	knot_xdp_msg_t out;
314 	out.payload.iov_base = ans->wire;
315 	out.payload.iov_len = 0;
316 	uint32_t sent;
317 	int ret = knot_xdp_send(xhd->socket, &out, 1, &sent);
318 	kr_assert(ret == KNOT_EOK && sent == 0);
319 	kr_log_debug(XDP, "freed unsent buffer, ret = %d\n", ret);
320 }
321 #endif
322 /* Helper functions for transport selection */
323 static inline bool is_tls_capable(struct sockaddr *address) {
324 	tls_client_param_t *tls_entry = tls_client_param_get(the_worker->engine->net.tls_client_params, address);
325 	return tls_entry;
326 }
327 
328 static inline bool is_tcp_connected(struct sockaddr *address) {
329 	return worker_find_tcp_connected(the_worker, address);
330 }
331 
332 static inline bool is_tcp_waiting(struct sockaddr *address) {
333 	return worker_find_tcp_waiting(the_worker, address);
334 }
335 
336 /** Create and initialize a request_ctx (on a fresh mempool).
337  *
338  * session and addr point to the source of the request, and they are NULL
339  * in case the request didn't come from network.
340  */
341 static struct request_ctx *request_create(struct worker_ctx *worker,
342 					  struct session *session,
343 					  const struct sockaddr *addr,
344 					  const struct sockaddr *dst_addr,
345 					  const uint8_t *eth_from,
346 					  const uint8_t *eth_to,
347 					  uint32_t uid)
348 {
349 	knot_mm_t pool = {
350 		.ctx = pool_borrow(worker),
351 		.alloc = (knot_mm_alloc_t) mp_alloc
352 	};
353 
354 	/* Create request context */
355 	struct request_ctx *ctx = mm_calloc(&pool, 1, sizeof(*ctx));
356 	if (!ctx) {
357 		pool_release(worker, pool.ctx);
358 		return NULL;
359 	}
360 
361 	/* TODO Relocate pool to struct request */
362 	ctx->worker = worker;
363 	if (session && kr_fails_assert(session_flags(session)->outgoing == false)) {
364 		pool_release(worker, pool.ctx);
365 		return NULL;
366 	}
367 	ctx->source.session = session;
368 	if (kr_fails_assert(!!eth_to == !!eth_from)) {
369 		pool_release(worker, pool.ctx);
370 		return NULL;
371 	}
372 	const bool is_xdp = eth_to != NULL;
373 	if (is_xdp) {
374 	#if ENABLE_XDP
375 		if (kr_fails_assert(session)) {
376 			pool_release(worker, pool.ctx);
377 			return NULL;
378 		}
379 		memcpy(&ctx->source.eth_addrs[0], eth_to,   sizeof(ctx->source.eth_addrs[0]));
380 		memcpy(&ctx->source.eth_addrs[1], eth_from, sizeof(ctx->source.eth_addrs[1]));
381 		ctx->req.alloc_wire_cb = alloc_wire_cb;
382 	#else
383 		kr_assert(!EINVAL);
384 		pool_release(worker, pool.ctx);
385 		return NULL;
386 	#endif
387 	}
388 
389 	struct kr_request *req = &ctx->req;
390 	req->pool = pool;
391 	req->vars_ref = LUA_NOREF;
392 	req->uid = uid;
393 	req->qsource.flags.xdp = is_xdp;
394 	array_init(req->qsource.headers);
395 	if (session) {
396 		req->qsource.flags.tcp = session_get_handle(session)->type == UV_TCP;
397 		req->qsource.flags.tls = session_flags(session)->has_tls;
398 		req->qsource.flags.http = session_flags(session)->has_http;
399 		req->qsource.stream_id = -1;
400 #if ENABLE_DOH2
401 		if (req->qsource.flags.http) {
402 			struct http_ctx *http_ctx = session_http_get_server_ctx(session);
403 			struct http_stream stream = queue_head(http_ctx->streams);
404 			req->qsource.stream_id = stream.id;
405 			if (stream.headers) {
406 				req->qsource.headers = *stream.headers;
407 				free(stream.headers);
408 				stream.headers = NULL;
409 			}
410 		}
411 #endif
412 		/* We need to store a copy of peer address. */
413 		memcpy(&ctx->source.addr.ip, addr, kr_sockaddr_len(addr));
414 		req->qsource.addr = &ctx->source.addr.ip;
415 		if (!dst_addr) /* We wouldn't have to copy in this case, but for consistency. */
416 			dst_addr = session_get_sockname(session);
417 		memcpy(&ctx->source.dst_addr.ip, dst_addr, kr_sockaddr_len(dst_addr));
418 		req->qsource.dst_addr = &ctx->source.dst_addr.ip;
419 	}
420 
421 	req->selection_context.is_tls_capable = is_tls_capable;
422 	req->selection_context.is_tcp_connected = is_tcp_connected;
423 	req->selection_context.is_tcp_waiting = is_tcp_waiting;
424 	array_init(req->selection_context.forwarding_targets);
425 	array_reserve_mm(req->selection_context.forwarding_targets, 1, kr_memreserve, &req->pool);
426 
427 	worker->stats.rconcurrent += 1;
428 
429 	return ctx;
430 }
431 
432 /** More initialization, related to the particular incoming query/packet. */
433 static int request_start(struct request_ctx *ctx, knot_pkt_t *query)
434 {
435 	if (kr_fails_assert(query && ctx))
436 		return kr_error(EINVAL);
437 
438 	struct kr_request *req = &ctx->req;
439 	req->qsource.size = query->size;
440 	if (knot_pkt_has_tsig(query)) {
441 		req->qsource.size += query->tsig_wire.len;
442 	}
443 
444 	knot_pkt_t *pkt = knot_pkt_new(NULL, req->qsource.size, &req->pool);
445 	if (!pkt) {
446 		return kr_error(ENOMEM);
447 	}
448 
449 	int ret = knot_pkt_copy(pkt, query);
450 	if (ret != KNOT_EOK && ret != KNOT_ETRAIL) {
451 		return kr_error(ENOMEM);
452 	}
453 	req->qsource.packet = pkt;
454 
455 	/* Start resolution */
456 	struct worker_ctx *worker = ctx->worker;
457 	struct engine *engine = worker->engine;
458 	kr_resolve_begin(req, &engine->resolver);
459 	worker->stats.queries += 1;
460 	return kr_ok();
461 }
462 
463 static void request_free(struct request_ctx *ctx)
464 {
465 	struct worker_ctx *worker = ctx->worker;
466 	/* Dereference any Lua vars table if exists */
467 	if (ctx->req.vars_ref != LUA_NOREF) {
468 		lua_State *L = worker->engine->L;
469 		/* Get worker variables table */
470 		lua_rawgeti(L, LUA_REGISTRYINDEX, worker->vars_table_ref);
471 		/* Get next free element (position 0) and store it under current reference (forming a list) */
472 		lua_rawgeti(L, -1, 0);
473 		lua_rawseti(L, -2, ctx->req.vars_ref);
474 		/* Set current reference as the next free element */
475 		lua_pushinteger(L, ctx->req.vars_ref);
476 		lua_rawseti(L, -2, 0);
477 		lua_pop(L, 1);
478 		ctx->req.vars_ref = LUA_NOREF;
479 	}
480 	/* Free HTTP/2 headers for DoH requests. */
481 	for(int i = 0; i < ctx->req.qsource.headers.len; i++) {
482 		free(ctx->req.qsource.headers.at[i].name);
483 		free(ctx->req.qsource.headers.at[i].value);
484 	}
485 	array_clear(ctx->req.qsource.headers);
486 
487 	/* Make sure to free XDP buffer in case it wasn't sent. */
488 	if (ctx->req.alloc_wire_cb) {
489 	#if ENABLE_XDP
490 		free_wire(ctx);
491 	#else
492 		kr_assert(!EINVAL);
493 	#endif
494 	}
495 	/* Return mempool to ring or free it if it's full */
496 	pool_release(worker, ctx->req.pool.ctx);
497 	/* @note The 'task' is invalidated from now on. */
498 	worker->stats.rconcurrent -= 1;
499 }
500 
501 static struct qr_task *qr_task_create(struct request_ctx *ctx)
502 {
503 	/* Choose (initial) pktbuf size.  As it is now, pktbuf can be used
504 	 * for UDP answers from upstream *and* from cache
505 	 * and for sending queries upstream */
506 	uint16_t pktbuf_max = KR_EDNS_PAYLOAD;
507 	const knot_rrset_t *opt_our = ctx->worker->engine->resolver.upstream_opt_rr;
508 	if (opt_our) {
509 		pktbuf_max = MAX(pktbuf_max, knot_edns_get_payload(opt_our));
510 	}
511 
512 	/* Create resolution task */
513 	struct qr_task *task = mm_calloc(&ctx->req.pool, 1, sizeof(*task));
514 	if (!task) {
515 		return NULL;
516 	}
517 
518 	/* Create packet buffers for answer and subrequests */
519 	knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
520 	if (!pktbuf) {
521 		mm_free(&ctx->req.pool, task);
522 		return NULL;
523 	}
524 	pktbuf->size = 0;
525 
526 	task->ctx = ctx;
527 	task->pktbuf = pktbuf;
528 	array_init(task->waiting);
529 	task->refs = 0;
530 	kr_assert(ctx->task == NULL);
531 	ctx->task = task;
532 	/* Make the primary reference to task. */
533 	qr_task_ref(task);
534 	task->creation_time = kr_now();
535 	ctx->worker->stats.concurrent += 1;
536 	return task;
537 }
538 
539 /* This is called when the task refcount drops to zero; free its memory. */
540 static void qr_task_free(struct qr_task *task)
541 {
542 	struct request_ctx *ctx = task->ctx;
543 
544 	if (kr_fails_assert(ctx))
545 		return;
546 
547 	struct worker_ctx *worker = ctx->worker;
548 
549 	if (ctx->task == NULL) {
550 		request_free(ctx);
551 	}
552 
553 	/* Update stats */
554 	worker->stats.concurrent -= 1;
555 }
556 
557 /** Register a new qr_task within a session. */
558 static int qr_task_register(struct qr_task *task, struct session *session)
559 {
560 	if (kr_fails_assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP))
561 		return kr_error(EINVAL);
562 
563 	session_tasklist_add(session, task);
564 
565 	struct request_ctx *ctx = task->ctx;
566 	if (kr_fails_assert(ctx && (ctx->source.session == NULL || ctx->source.session == session)))
567 		return kr_error(EINVAL);
568 	ctx->source.session = session;
569 	/* Soft-limit on parallel queries: there is no "slow down" RCODE
570 	 * that we could use to signal the client, but we can stop reading
571 	 * and in effect shrink the TCP window size. To get more precise throttling,
572 	 * we would need to copy the remainder of the unread buffer and reassemble
573 	 * it when resuming reading. This is NYI.  */
574 	if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max &&
575 	    !session_flags(session)->throttled && !session_flags(session)->closing) {
576 		session_stop_read(session);
577 		session_flags(session)->throttled = true;
578 	}
579 
580 	return 0;
581 }
582 
583 static void qr_task_complete(struct qr_task *task)
584 {
585 	struct request_ctx *ctx = task->ctx;
586 
587 	/* Kill pending I/O requests */
588 	ioreq_kill_pending(task);
589 	kr_require(task->waiting.len == 0);
590 	kr_require(task->leading == false);
591 
592 	struct session *s = ctx->source.session;
593 	if (s) {
594 		kr_require(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
595 		ctx->source.session = NULL;
596 		session_tasklist_del(s, task);
597 	}
598 
599 	/* Release primary reference to task. */
600 	if (ctx->task == task) {
601 		ctx->task = NULL;
602 		qr_task_unref(task);
603 	}
604 }
605 
606 /* This is called when we send a subrequest or an answer. */
607 int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
608 {
609 	if (task->finished) {
610 		kr_require(task->leading == false);
611 		qr_task_complete(task);
612 	}
613 
614 	if (!handle || kr_fails_assert(handle->data))
615 		return status;
616 	struct session* s = handle->data;
617 
618 	if (handle->type == UV_UDP && session_flags(s)->outgoing) {
619 		// This should ensure that we are only dealing with our question to upstream
620 		if (kr_fails_assert(!knot_wire_get_qr(task->pktbuf->wire)))
621 			return status;
622 		// start the timer
623 		struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
624 		if (kr_fails_assert(qry && task->transport))
625 			return status;
626 		size_t timeout = task->transport->timeout;
627 		int ret = session_timer_start(s, on_udp_timeout, timeout, 0);
628 		/* Start next step with timeout, fatal if can't start a timer. */
629 		if (ret != 0) {
630 			subreq_finalize(task, &task->transport->address.ip, task->pktbuf);
631 			qr_task_finalize(task, KR_STATE_FAIL);
632 		}
633 	}
634 
635 	if (handle->type == UV_TCP) {
636 		if (status != 0) { // session probably not usable anymore; typically: ECONNRESET
637 			const struct kr_request *req = &task->ctx->req;
638 			if (kr_log_is_debug(WORKER, req)) {
639 				const char *peer_str = NULL;
640 				if (!session_flags(s)->outgoing) {
641 					peer_str = "hidden"; // avoid logging downstream IPs
642 				} else if (task->transport) {
643 					peer_str = kr_straddr(&task->transport->address.ip);
644 				}
645 				if (!peer_str)
646 					peer_str = "unknown"; // probably shouldn't happen
647 				kr_log_req(req, 0, 0, WORKER,
648 						"=> disconnected from '%s': %s\n",
649 						peer_str, uv_strerror(status));
650 			}
651 			worker_end_tcp(s);
652 			return status;
653 		}
654 
655 		if (session_flags(s)->outgoing || session_flags(s)->closing)
656 			return status;
657 
658 		struct worker_ctx *worker = task->ctx->worker;
659 		if (session_flags(s)->throttled &&
660 		    session_tasklist_get_len(s) < worker->tcp_pipeline_max/2) {
661 			/* Start reading again if the session is throttled and
662 			 * the number of outgoing requests is below watermark. */
663 			session_start_read(s);
664 			session_flags(s)->throttled = false;
665 		}
666 	}
667 
668 	return status;
669 }
670 
671 static void on_send(uv_udp_send_t *req, int status)
672 {
673 	struct qr_task *task = req->data;
674 	uv_handle_t *h = (uv_handle_t *)req->handle;
675 	qr_task_on_send(task, h, status);
676 	qr_task_unref(task);
677 	free(req);
678 }
679 
680 static void on_write(uv_write_t *req, int status)
681 {
682 	struct qr_task *task = req->data;
683 	uv_handle_t *h = (uv_handle_t *)req->handle;
684 	qr_task_on_send(task, h, status);
685 	qr_task_unref(task);
686 	free(req);
687 }
688 
689 static int qr_task_send(struct qr_task *task, struct session *session,
690 			const struct sockaddr *addr, knot_pkt_t *pkt)
691 {
692 	if (!session)
693 		return qr_task_on_send(task, NULL, kr_error(EIO));
694 
695 	int ret = 0;
696 	struct request_ctx *ctx = task->ctx;
697 
698 	uv_handle_t *handle = session_get_handle(session);
699 	if (kr_fails_assert(handle && handle->data == session))
700 		return qr_task_on_send(task, NULL, kr_error(EINVAL));
701 	const bool is_stream = handle->type == UV_TCP;
702 	if (!is_stream && handle->type != UV_UDP) abort();
703 
704 	if (addr == NULL)
705 		addr = session_get_peer(session);
706 
707 	if (pkt == NULL)
708 		pkt = worker_task_get_pktbuf(task);
709 
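	/* For pipelined outgoing TCP we must use a message ID that is not already
	 * in flight on this session, so that the upstream answer can be matched
	 * back to the right task; the loop below probes for a free ID. */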
710 	if (session_flags(session)->outgoing && handle->type == UV_TCP) {
711 		size_t try_limit = session_tasklist_get_len(session) + 1;
712 		uint16_t msg_id = knot_wire_get_id(pkt->wire);
713 		size_t try_count = 0;
714 		while (session_tasklist_find_msgid(session, msg_id) &&
715 		       try_count <= try_limit) {
716 			++msg_id;
717 			++try_count;
718 		}
719 		if (try_count > try_limit)
720 			return kr_error(ENOENT);
721 		worker_task_pkt_set_msgid(task, msg_id);
722 	}
723 
724 	uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
725 	if (!ioreq)
726 		return qr_task_on_send(task, handle, kr_error(ENOMEM));
727 
728 	/* Pending ioreq on current task */
729 	qr_task_ref(task);
730 
731 	struct worker_ctx *worker = ctx->worker;
732 	/* Note time for upstream RTT */
733 	task->send_time = kr_now();
734 	task->recv_time = 0; // task structure is being reused so we have to zero this out here
735 	/* Send using given protocol */
736 	if (kr_fails_assert(!session_flags(session)->closing))
737 		return qr_task_on_send(task, NULL, kr_error(EIO));
738 	if (session_flags(session)->has_http) {
739 #if ENABLE_DOH2
740 		uv_write_t *write_req = (uv_write_t *)ioreq;
741 		write_req->data = task;
742 		ret = http_write(write_req, handle, pkt, ctx->req.qsource.stream_id, &on_write);
743 #else
744 		ret = kr_error(ENOPROTOOPT);
745 #endif
746 	} else if (session_flags(session)->has_tls) {
747 		uv_write_t *write_req = (uv_write_t *)ioreq;
748 		write_req->data = task;
749 		ret = tls_write(write_req, handle, pkt, &on_write);
750 	} else if (handle->type == UV_UDP) {
751 		uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
752 		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
753 		send_req->data = task;
754 		ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
755 	} else if (handle->type == UV_TCP) {
756 		uv_write_t *write_req = (uv_write_t *)ioreq;
757 		/* We need to write the message length in network byte order,
758 		 * but we don't have a convenient place to store those bytes.
759 		 * The problem is that all memory referenced from buf[] MUST retain
760 		 * its contents at least until on_write() is called, and I currently
761 		 * can't see any convenient place outside the `pkt` structure.
762 		 * So we use directly the *individual* bytes in pkt->size.
763 		 * The call to htonl() and the condition will probably be inlinable. */
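		/* Worked example: for pkt->size == 300 (0x012C) the first two buffers below
		 * emit 0x01 then 0x2C, i.e. the 16-bit big-endian length prefix required
		 * for DNS over TCP (RFC 1035 §4.2.2). */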
764 		int lsbi, slsbi; /* (second) least significant byte index */
765 		if (htonl(1) == 1) { /* big endian */
766 			lsbi  = sizeof(pkt->size) - 1;
767 			slsbi = sizeof(pkt->size) - 2;
768 		} else {
769 			lsbi  = 0;
770 			slsbi = 1;
771 		}
772 		uv_buf_t buf[3] = {
773 			{ (char *)&pkt->size + slsbi, 1 },
774 			{ (char *)&pkt->size + lsbi,  1 },
775 			{ (char *)pkt->wire, pkt->size },
776 		};
777 		write_req->data = task;
778 		ret = uv_write(write_req, (uv_stream_t *)handle, buf, 3, &on_write);
779 	} else {
780 		kr_assert(false);
781 	}
782 
783 	if (ret == 0) {
784 		session_touch(session);
785 		if (session_flags(session)->outgoing) {
786 			session_tasklist_add(session, task);
787 		}
788 		if (worker->too_many_open &&
789 		    worker->stats.rconcurrent <
790 			worker->rconcurrent_highwatermark - 10) {
791 			worker->too_many_open = false;
792 		}
793 	} else {
794 		free(ioreq);
795 		qr_task_unref(task);
796 		if (ret == UV_EMFILE) {
797 			worker->too_many_open = true;
798 			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
799 			ret = kr_error(UV_EMFILE);
800 		}
801 
802 		if (session_flags(session)->has_http)
803 			worker->stats.err_http += 1;
804 		else if (session_flags(session)->has_tls)
805 			worker->stats.err_tls += 1;
806 		else if (handle->type == UV_UDP)
807 			worker->stats.err_udp += 1;
808 		else
809 			worker->stats.err_tcp += 1;
810 	}
811 
812 	/* Update outgoing query statistics */
813 	if (session_flags(session)->outgoing && addr) {
814 		if (session_flags(session)->has_tls)
815 			worker->stats.tls += 1;
816 		else if (handle->type == UV_UDP)
817 			worker->stats.udp += 1;
818 		else
819 			worker->stats.tcp += 1;
820 
821 		if (addr->sa_family == AF_INET6)
822 			worker->stats.ipv6 += 1;
823 		else if (addr->sa_family == AF_INET)
824 			worker->stats.ipv4 += 1;
825 	}
826 	return ret;
827 }
828 
829 static struct kr_query *task_get_last_pending_query(struct qr_task *task)
830 {
831 	if (!task || task->ctx->req.rplan.pending.len == 0) {
832 		return NULL;
833 	}
834 
835 	return array_tail(task->ctx->req.rplan.pending);
836 }
837 
838 static int session_tls_hs_cb(struct session *session, int status)
839 {
840 	if (kr_fails_assert(session_flags(session)->outgoing))
841 		return kr_error(EINVAL);
842 	struct sockaddr *peer = session_get_peer(session);
843 	int deletion_res = worker_del_tcp_waiting(the_worker, peer);
844 	int ret = kr_ok();
845 
846 	if (status) {
847 		struct qr_task *task = session_waitinglist_get(session);
848 		if (task) {
849 			// TLS handshake failed, report it to server selection
850 			struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
851 			qry->server_selection.error(qry, task->transport, KR_SELECTION_TLS_HANDSHAKE_FAILED);
852 		}
853 #ifndef NDEBUG
854 		else {
855 			/* The task isn't in the list of tasks
856 			 * waiting for a connection to the upstream,
857 			 * so this MUST be an unsuccessful rehandshake.
858 			 * Check it. */
859 			kr_require(deletion_res != 0);
860 			const char *key = tcpsess_key(peer);
861 			kr_require(key);
862 			kr_require(map_contains(&the_worker->tcp_connected, key) != 0);
863 		}
864 #endif
865 		return ret;
866 	}
867 
868 	/* handshake was completed successfully */
869 	struct tls_client_ctx *tls_client_ctx = session_tls_get_client_ctx(session);
870 	tls_client_param_t *tls_params = tls_client_ctx->params;
871 	gnutls_session_t tls_session = tls_client_ctx->c.tls_session;
872 	if (gnutls_session_is_resumed(tls_session) != 0) {
873 		kr_log_debug(TLSCLIENT, "TLS session has resumed\n");
874 	} else {
875 		kr_log_debug(TLSCLIENT, "TLS session has not resumed\n");
876 		/* session wasn't resumed, delete old session data ... */
877 		if (tls_params->session_data.data != NULL) {
878 			gnutls_free(tls_params->session_data.data);
879 			tls_params->session_data.data = NULL;
880 			tls_params->session_data.size = 0;
881 		}
882 		/* ... and get the new session data */
883 		gnutls_datum_t tls_session_data = { NULL, 0 };
884 		ret = gnutls_session_get_data2(tls_session, &tls_session_data);
885 		if (ret == 0) {
886 			tls_params->session_data = tls_session_data;
887 		}
888 	}
889 
890 	struct session *s = worker_find_tcp_connected(the_worker, peer);
891 	ret = kr_ok();
892 	if (deletion_res == kr_ok()) {
893 		/* peer was in the waiting list, add to the connected list. */
894 		if (s) {
895 			/* Something went wrong,
896 			 * peer already is in the connected list. */
897 			ret = kr_error(EINVAL);
898 		} else {
899 			ret = worker_add_tcp_connected(the_worker, peer, session);
900 		}
901 	} else {
902 		/* The peer wasn't in the waiting list.  It can be either
903 		 * 1) a successful rehandshake; in this case the peer
904 		 *    must already be in the connected list, or
905 		 * 2) a successful handshake on a session that was timed out
906 		 *    by on_tcp_connect_timeout() after the TCP connection
907 		 *    succeeded; in this case the peer isn't in
908 		 *    the connected list.
909 		 **/
910 		if (!s || s != session) {
911 			ret = kr_error(EINVAL);
912 		}
913 	}
914 	if (ret == kr_ok()) {
915 		while (!session_waitinglist_is_empty(session)) {
916 			struct qr_task *t = session_waitinglist_get(session);
917 			ret = qr_task_send(t, session, NULL, NULL);
918 			if (ret != 0) {
919 				break;
920 			}
921 			session_waitinglist_pop(session, true);
922 		}
923 	} else {
924 		ret = kr_error(EINVAL);
925 	}
926 
927 	if (ret != kr_ok()) {
928 		/* Something went wrong.
929 		 * Either addition to the list of connected sessions
930 		 * or write to upstream failed. */
931 		worker_del_tcp_connected(the_worker, peer);
932 		session_waitinglist_finalize(session, KR_STATE_FAIL);
933 		session_tasklist_finalize(session, KR_STATE_FAIL);
934 		session_close(session);
935 	} else {
936 		session_timer_stop(session);
937 		session_timer_start(session, tcp_timeout_trigger,
938 				    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
939 	}
940 	return kr_ok();
941 }
942 
943 static int send_waiting(struct session *session)
944 {
945 	int ret = 0;
946 	while (!session_waitinglist_is_empty(session)) {
947 		struct qr_task *t = session_waitinglist_get(session);
948 		ret = qr_task_send(t, session, NULL, NULL);
949 		if (ret != 0) {
950 			struct worker_ctx *worker = t->ctx->worker;
951 			struct sockaddr *peer = session_get_peer(session);
952 			session_waitinglist_finalize(session, KR_STATE_FAIL);
953 			session_tasklist_finalize(session, KR_STATE_FAIL);
954 			worker_del_tcp_connected(worker, peer);
955 			session_close(session);
956 			break;
957 		}
958 		session_waitinglist_pop(session, true);
959 	}
960 	return ret;
961 }
962 
963 static void on_connect(uv_connect_t *req, int status)
964 {
965 	struct worker_ctx *worker = the_worker;
966 	kr_require(worker);
967 	uv_stream_t *handle = req->handle;
968 	struct session *session = handle->data;
969 	struct sockaddr *peer = session_get_peer(session);
970 	free(req);
971 
972 	if (kr_fails_assert(session_flags(session)->outgoing))
973 		return;
974 
975 	if (session_flags(session)->closing) {
976 		worker_del_tcp_waiting(worker, peer);
977 		kr_assert(session_is_empty(session));
978 		return;
979 	}
980 
981 	const bool log_debug = kr_log_is_debug(WORKER, NULL);
982 
983 	/* Check if the connection is in the waiting list.
984 	 * If not, this is most likely a timed-out connection
985 	 * which was removed from the waiting list by the
986 	 * on_tcp_connect_timeout() callback. */
987 	struct session *s = worker_find_tcp_waiting(worker, peer);
988 	if (!s || s != session) {
989 		/* The session isn't on the waiting list;
990 		 * it has timed out. */
991 		if (log_debug) {
992 			const char *peer_str = kr_straddr(peer);
993 			kr_log_debug(WORKER, "=> connected to '%s', but session "
994 					"is already timed out, close\n",
995 					peer_str ? peer_str : "");
996 		}
997 		kr_assert(session_tasklist_is_empty(session));
998 		session_waitinglist_retry(session, false);
999 		session_close(session);
1000 		return;
1001 	}
1002 
1003 	s = worker_find_tcp_connected(worker, peer);
1004 	if (s) {
1005 		/* The session is already in the connected list.
1006 		 * Something went wrong; this can happen due to races when kresd has tried
1007 		 * to reconnect to the upstream after an unsuccessful attempt. */
1008 		if (log_debug) {
1009 			const char *peer_str = kr_straddr(peer);
1010 			kr_log_debug(WORKER, "=> connected to '%s', but peer "
1011 					"is already connected, close\n",
1012 					peer_str ? peer_str : "");
1013 		}
1014 		kr_assert(session_tasklist_is_empty(session));
1015 		session_waitinglist_retry(session, false);
1016 		session_close(session);
1017 		return;
1018 	}
1019 
1020 	if (status != 0) {
1021 		if (log_debug) {
1022 			const char *peer_str = kr_straddr(peer);
1023 			kr_log_debug(WORKER, "=> connection to '%s' failed (%s), flagged as 'bad'\n",
1024 					peer_str ? peer_str : "", uv_strerror(status));
1025 		}
1026 		worker_del_tcp_waiting(worker, peer);
1027 		struct qr_task *task = session_waitinglist_get(session);
1028 		if (task && status != UV_ETIMEDOUT) {
1029 			/* Penalize upstream.
1030 			* In case of UV_ETIMEDOUT upstream has been
1031 			* already penalized in on_tcp_connect_timeout() */
1032 			struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
1033 			qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_FAILED);
1034 		}
1035 		kr_assert(session_tasklist_is_empty(session));
1036 		session_waitinglist_retry(session, false);
1037 		session_close(session);
1038 		return;
1039 	}
1040 
1041 	if (!session_flags(session)->has_tls) {
1042 		/* With TLS the session is still waiting for the handshake;
1043 		 * otherwise remove it from the waiting list. */
1044 		if (worker_del_tcp_waiting(worker, peer) != 0) {
1045 			/* The session isn't in the list of waiting queries;
1046 			 * something went wrong. */
1047 			session_waitinglist_finalize(session, KR_STATE_FAIL);
1048 			kr_assert(session_tasklist_is_empty(session));
1049 			session_close(session);
1050 			return;
1051 		}
1052 	}
1053 
1054 	if (log_debug) {
1055 		const char *peer_str = kr_straddr(peer);
1056 		kr_log_debug(WORKER, "=> connected to '%s'\n", peer_str ? peer_str : "");
1057 	}
1058 
1059 	session_flags(session)->connected = true;
1060 	session_start_read(session);
1061 
1062 	int ret = kr_ok();
1063 	if (session_flags(session)->has_tls) {
1064 		struct tls_client_ctx *tls_ctx = session_tls_get_client_ctx(session);
1065 		ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
1066 		if (ret == kr_error(EAGAIN)) {
1067 			session_timer_stop(session);
1068 			session_timer_start(session, tcp_timeout_trigger,
1069 					    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
1070 			return;
1071 		}
1072 	} else {
1073 		worker_add_tcp_connected(worker, peer, session);
1074 	}
1075 
1076 	ret = send_waiting(session);
1077 	if (ret != 0) {
1078 		return;
1079 	}
1080 
1081 	session_timer_stop(session);
1082 	session_timer_start(session, tcp_timeout_trigger,
1083 			    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
1084 }
1085 
1086 static void on_tcp_connect_timeout(uv_timer_t *timer)
1087 {
1088 	struct session *session = timer->data;
1089 
1090 	uv_timer_stop(timer);
1091 	struct worker_ctx *worker = the_worker;
1092 	kr_require(worker);
1093 
1094 	kr_assert(session_tasklist_is_empty(session));
1095 
1096 	struct sockaddr *peer = session_get_peer(session);
1097 	worker_del_tcp_waiting(worker, peer);
1098 
1099 	struct qr_task *task = session_waitinglist_get(session);
1100 	if (!task) {
1101 		/* Normally shouldn't happen. */
1102 		const char *peer_str = kr_straddr(peer);
1103 		VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
1104 			    peer_str ? peer_str : "");
1105 		return;
1106 	}
1107 
1108 	struct kr_query *qry = task_get_last_pending_query(task);
1109 	if (kr_log_is_debug_qry(WORKER, qry)) {
1110 		const char *peer_str = kr_straddr(peer);
1111 		VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n",
1112 			    peer_str ? peer_str : "");
1113 	}
1114 
1115 	qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
1116 
1117 	worker->stats.timeout += session_waitinglist_get_len(session);
1118 	session_waitinglist_retry(session, true);
1119 	kr_assert(session_tasklist_is_empty(session));
1120 	/* uv_cancel() doesn't support uv_connect_t requests,
1121 	 * so we can't cancel it.
1122 	 * There is still a possibility of a successful connection
1123 	 * for this request.
1124 	 * So the connection callback (on_connect()) must check
1125 	 * whether the connection is in the list of waiting connections.
1126 	 * If not, it has most likely timed out even if
1127 	 * it eventually succeeded. */
1128 }
1129 
1130 /* This is called when an I/O request times out. */
1131 static void on_udp_timeout(uv_timer_t *timer)
1132 {
1133 	struct session *session = timer->data;
1134 	kr_assert(session_get_handle(session)->data == session);
1135 	kr_assert(session_tasklist_get_len(session) == 1);
1136 	kr_assert(session_waitinglist_is_empty(session));
1137 
1138 	uv_timer_stop(timer);
1139 
1140 	struct qr_task *task = session_tasklist_get_first(session);
1141 	if (!task)
1142 		return;
1143 	struct worker_ctx *worker = task->ctx->worker;
1144 
1145 	if (task->leading && task->pending_count > 0) {
1146 		struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
1147 		qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT);
1148 	}
1149 
1150 	task->timeouts += 1;
1151 	worker->stats.timeout += 1;
1152 	qr_task_step(task, NULL, NULL);
1153 }
1154 
1155 static uv_handle_t *transmit(struct qr_task *task)
1156 {
1157 	uv_handle_t *ret = NULL;
1158 
1159 	if (task) {
1160 		struct kr_transport* transport = task->transport;
1161 
1162 		struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
1163 
1164 		if (!choice) {
1165 			return ret;
1166 		}
1167 		if (task->pending_count >= MAX_PENDING) {
1168 			return ret;
1169 		}
1170 		/* Checkout answer before sending it */
1171 		struct request_ctx *ctx = task->ctx;
1172 		if (kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf) != 0) {
1173 			return ret;
1174 		}
1175 		ret = ioreq_spawn(ctx->worker, SOCK_DGRAM, choice->sin6_family, false, false);
1176 		if (!ret) {
1177 			return ret;
1178 		}
1179 		struct sockaddr *addr = (struct sockaddr *)choice;
1180 		struct session *session = ret->data;
1181 		struct sockaddr *peer = session_get_peer(session);
1182 		kr_assert(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
1183 		memcpy(peer, addr, kr_sockaddr_len(addr));
1184 		if (qr_task_send(task, session, (struct sockaddr *)choice,
1185 				 task->pktbuf) != 0) {
1186 			session_close(session);
1187 			ret = NULL;
1188 		} else {
1189 			task->pending[task->pending_count] = session;
1190 			task->pending_count += 1;
1191 			session_start_read(session); /* Start reading answer */
1192 		}
1193 	}
1194 	return ret;
1195 }
1196 
1197 
1198 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
1199 {
1200 	if (!task || task->finished) {
1201 		return;
1202 	}
1203 	/* Close pending timer */
1204 	ioreq_kill_pending(task);
1205 	/* Clear from outgoing table. */
1206 	if (!task->leading)
1207 		return;
1208 	char key[SUBREQ_KEY_LEN];
1209 	const int klen = subreq_key(key, task->pktbuf);
1210 	if (klen > 0) {
1211 		void *val_deleted;
1212 		int ret = trie_del(task->ctx->worker->subreq_out, key, klen, &val_deleted);
1213 		kr_assert(ret == KNOT_EOK && val_deleted == task);
1214 	}
1215 	/* Notify waiting tasks. */
1216 	struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
1217 	for (size_t i = task->waiting.len; i > 0; i--) {
1218 		struct qr_task *follower = task->waiting.at[i - 1];
1219 		/* Reuse MSGID and 0x20 secret */
1220 		if (follower->ctx->req.rplan.pending.len > 0) {
1221 			struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
1222 			qry->id = leader_qry->id;
1223 			qry->secret = leader_qry->secret;
1224 
1225 			// Note that this transport may not be present in `leader_qry`'s server selection
1226 			follower->transport = task->transport;
1227 			if(follower->transport) {
1228 				follower->transport->deduplicated = true;
1229 			}
1230 			leader_qry->secret = 0; /* Next will be already decoded */
1231 		}
1232 		qr_task_step(follower, packet_source, pkt);
1233 		qr_task_unref(follower);
1234 	}
1235 	task->waiting.len = 0;
1236 	task->leading = false;
1237 }
1238 
1239 static void subreq_lead(struct qr_task *task)
1240 {
1241 	if (kr_fails_assert(task))
1242 		return;
1243 	char key[SUBREQ_KEY_LEN];
1244 	const int klen = subreq_key(key, task->pktbuf);
1245 	if (klen < 0)
1246 		return;
1247 	struct qr_task **tvp = (struct qr_task **)
1248 		trie_get_ins(task->ctx->worker->subreq_out, key, klen);
1249 	if (unlikely(!tvp))
1250 		return; /*ENOMEM*/
1251 	if (kr_fails_assert(*tvp == NULL))
1252 		return;
1253 	*tvp = task;
1254 	task->leading = true;
1255 }
1256 
1257 static bool subreq_enqueue(struct qr_task *task)
1258 {
1259 	if (kr_fails_assert(task))
1260 		return false;
1261 	char key[SUBREQ_KEY_LEN];
1262 	const int klen = subreq_key(key, task->pktbuf);
1263 	if (klen < 0)
1264 		return false;
1265 	struct qr_task **leader = (struct qr_task **)
1266 		trie_get_try(task->ctx->worker->subreq_out, key, klen);
1267 	if (!leader /*ENOMEM*/ || !*leader)
1268 		return false;
1269 	/* Enqueue itself to leader for this subrequest. */
1270 	int ret = array_push_mm((*leader)->waiting, task,
1271 				kr_memreserve, &(*leader)->ctx->req.pool);
1272 	if (unlikely(ret < 0)) /*ENOMEM*/
1273 		return false;
1274 	qr_task_ref(task);
1275 	return true;
1276 }
1277 
1278 #if ENABLE_XDP
1279 static void xdp_tx_waker(uv_idle_t *handle)
1280 {
1281 	int ret = knot_xdp_send_finish(handle->data);
1282 	if (ret != KNOT_EAGAIN && ret != KNOT_EOK)
1283 		kr_log_error(XDP, "check: ret = %d, %s\n", ret, knot_strerror(ret));
1284 	/* Apparently some drivers need many explicit wake-up calls
1285 	 * even if we push no additional packets (in case they accumulated a lot) */
1286 	if (ret != KNOT_EAGAIN)
1287 		uv_idle_stop(handle);
1288 	knot_xdp_send_prepare(handle->data);
1289 	/* LATER(opt.): it _might_ be better for performance to do these two steps
1290 	 * at different points in time */
1291 }
1292 #endif
1293 /** Send an answer packet over XDP. */
1294 static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
1295 {
1296 #if ENABLE_XDP
1297 	struct request_ctx *ctx = task->ctx;
1298 	xdp_handle_data_t *xhd = src_handle->data;
1299 	if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session))
1300 		return qr_task_on_send(task, src_handle, kr_error(EINVAL));
1301 
1302 	knot_xdp_msg_t msg;
1303 	const struct sockaddr *ip_from = &ctx->source.dst_addr.ip;
1304 	const struct sockaddr *ip_to   = &ctx->source.addr.ip;
1305 	memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from));
1306 	memcpy(&msg.ip_to,   ip_to,   kr_sockaddr_len(ip_to));
1307 	msg.payload.iov_base = ctx->req.answer->wire;
1308 	msg.payload.iov_len  = ctx->req.answer->size;
1309 
1310 	uint32_t sent;
1311 	int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent);
1312 	ctx->req.answer->wire = NULL; /* it's been freed */
1313 
1314 	uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
1315 	kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
1316 
1317 	return qr_task_on_send(task, src_handle, ret);
1318 #else
1319 	kr_assert(!EINVAL);
1320 	return kr_error(EINVAL);
1321 #endif
1322 }
1323 
1324 static int qr_task_finalize(struct qr_task *task, int state)
1325 {
1326 	kr_require(task && task->leading == false);
1327 	if (task->finished) {
1328 		return kr_ok();
1329 	}
1330 	struct request_ctx *ctx = task->ctx;
1331 	struct session *source_session = ctx->source.session;
1332 	kr_resolve_finish(&ctx->req, state);
1333 
1334 	task->finished = true;
1335 	if (source_session == NULL) {
1336 		(void) qr_task_on_send(task, NULL, kr_error(EIO));
1337 		return state == KR_STATE_DONE ? kr_ok() : kr_error(EIO);
1338 	}
1339 
1340 	if (unlikely(ctx->req.answer == NULL)) { /* meant to be dropped */
1341 		(void) qr_task_on_send(task, NULL, kr_ok());
1342 		return kr_ok();
1343 	}
1344 
1345 	if (session_flags(source_session)->closing ||
1346 	    ctx->source.addr.ip.sa_family == AF_UNSPEC)
1347 		return kr_error(EINVAL);
1348 
1349 	/* Reference task as the callback handler can close it */
1350 	qr_task_ref(task);
1351 
1352 	/* Send back answer */
1353 	int ret;
1354 	const uv_handle_t *src_handle = session_get_handle(source_session);
1355 	if (kr_fails_assert(src_handle->type == UV_UDP || src_handle->type == UV_TCP
1356 		       || src_handle->type == UV_POLL)) {
1357 		ret = kr_error(EINVAL);
1358 	} else if (src_handle->type == UV_POLL) {
1359 		ret = xdp_push(task, src_handle);
1360 	} else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
1361 		int fd;
1362 		ret = uv_fileno(src_handle, &fd);
1363 		if (ret == 0)
1364 			udp_queue_push(fd, &ctx->req, task);
1365 		else
1366 			kr_assert(false);
1367 	} else {
1368 		ret = qr_task_send(task, source_session, &ctx->source.addr.ip, ctx->req.answer);
1369 	}
1370 
1371 	if (ret != kr_ok()) {
1372 		(void) qr_task_on_send(task, NULL, kr_error(EIO));
1373 		/* Since the source session is erroneous, detach all tasks. */
1374 		while (!session_tasklist_is_empty(source_session)) {
1375 			struct qr_task *t = session_tasklist_del_first(source_session, false);
1376 			struct request_ctx *c = t->ctx;
1377 			kr_assert(c->source.session == source_session);
1378 			c->source.session = NULL;
1379 			/* Don't finalize them as there can be other tasks
1380 			 * waiting for answer to this particular task.
1381 			 * (ie. task->leading is true) */
1382 			worker_task_unref(t);
1383 		}
1384 		session_close(source_session);
1385 	}
1386 
1387 	qr_task_unref(task);
1388 
1389 	if (ret != kr_ok() || state != KR_STATE_DONE)
1390 		return kr_error(EIO);
1391 	return kr_ok();
1392 }
1393 
1394 static int udp_task_step(struct qr_task *task,
1395 			 const struct sockaddr *packet_source, knot_pkt_t *packet)
1396 {
1397 	/* If there is already outgoing query, enqueue to it. */
1398 	if (subreq_enqueue(task)) {
1399 		return kr_ok(); /* Will be notified when outgoing query finishes. */
1400 	}
1401 	/* Start transmitting */
1402 	uv_handle_t *handle = transmit(task);
1403 	if (handle == NULL) {
1404 		subreq_finalize(task, packet_source, packet);
1405 		return qr_task_finalize(task, KR_STATE_FAIL);
1406 	}
1407 
1408 	/* Announce and start subrequest.
1409 	 * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
1410 	 */
1411 	subreq_lead(task);
1412 
1413 	return kr_ok();
1414 }
1415 
1416 static int tcp_task_waiting_connection(struct session *session, struct qr_task *task)
1417 {
1418 	if (kr_fails_assert(session_flags(session)->outgoing && !session_flags(session)->closing))
1419 		return kr_error(EINVAL);
1420 	/* Add task to the end of list of waiting tasks.
1421 	 * It will be notified in on_connect() or qr_task_on_send(). */
1422 	int ret = session_waitinglist_push(session, task);
1423 	if (ret < 0) {
1424 		return kr_error(EINVAL);
1425 	}
1426 	return kr_ok();
1427 }
1428 
1429 static int tcp_task_existing_connection(struct session *session, struct qr_task *task)
1430 {
1431 	if (kr_fails_assert(session_flags(session)->outgoing && !session_flags(session)->closing))
1432 		return kr_error(EINVAL);
1433 	struct request_ctx *ctx = task->ctx;
1434 	struct worker_ctx *worker = ctx->worker;
1435 
1436 	/* If there are any unsent queries, send them first. */
1437 	int ret = send_waiting(session);
1438 	if (ret != 0) {
1439 		return kr_error(EINVAL);
1440 	}
1441 
1442 	/* No unsent queries at that point. */
1443 	if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
1444 		/* Too many outstanding queries; answer with SERVFAIL. */
1445 		return kr_error(EINVAL);
1446 	}
1447 
1448 	/* Send query to upstream. */
1449 	ret = qr_task_send(task, session, NULL, NULL);
1450 	if (ret != 0) {
1451 		/* Error, finalize task with SERVFAIL and
1452 		 * close connection to upstream. */
1453 		session_tasklist_finalize(session, KR_STATE_FAIL);
1454 		worker_del_tcp_connected(worker, session_get_peer(session));
1455 		session_close(session);
1456 		return kr_error(EINVAL);
1457 	}
1458 
1459 	return kr_ok();
1460 }
1461 
1462 static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr *addr)
1463 {
1464 	struct request_ctx *ctx = task->ctx;
1465 	struct worker_ctx *worker = ctx->worker;
1466 
1467 	/* Check if there must be TLS */
1468 	struct tls_client_ctx *tls_ctx = NULL;
1469 	struct network *net = &worker->engine->net;
1470 	tls_client_param_t *entry = tls_client_param_get(net->tls_client_params, addr);
1471 	if (entry) {
1472 		/* Address is configured to be used with TLS.
1473 		 * We need to allocate auxiliary data structure. */
1474 		tls_ctx = tls_client_ctx_new(entry, worker);
1475 		if (!tls_ctx) {
1476 			return kr_error(EINVAL);
1477 		}
1478 	}
1479 
1480 	uv_connect_t *conn = malloc(sizeof(uv_connect_t));
1481 	if (!conn) {
1482 		tls_client_ctx_free(tls_ctx);
1483 		return kr_error(EINVAL);
1484 	}
1485 	bool has_http = false;
1486 	bool has_tls = (tls_ctx != NULL);
1487 	uv_handle_t *client = ioreq_spawn(worker, SOCK_STREAM, addr->sa_family, has_tls, has_http);
1488 	if (!client) {
1489 		tls_client_ctx_free(tls_ctx);
1490 		free(conn);
1491 		return kr_error(EINVAL);
1492 	}
1493 	struct session *session = client->data;
1494 	if (kr_fails_assert(session_flags(session)->has_tls == has_tls)) {
1495 		tls_client_ctx_free(tls_ctx);
1496 		free(conn);
1497 		return kr_error(EINVAL);
1498 	}
1499 	if (has_tls) {
1500 		tls_client_ctx_set_session(tls_ctx, session);
1501 		session_tls_set_client_ctx(session, tls_ctx);
1502 	}
1503 
1504 	/* Add address to the waiting list.
1505 	 * Now it "is waiting to be connected to." */
1506 	int ret = worker_add_tcp_waiting(worker, addr, session);
1507 	if (ret < 0) {
1508 		free(conn);
1509 		session_close(session);
1510 		return kr_error(EINVAL);
1511 	}
1512 
1513 	conn->data = session;
1514 	/*  Store peer address for the session. */
1515 	struct sockaddr *peer = session_get_peer(session);
1516 	memcpy(peer, addr, kr_sockaddr_len(addr));
1517 
1518 	/*  Start a watchdog to catch a possible connection timeout. */
1519 	ret = session_timer_start(session, on_tcp_connect_timeout,
1520 				  KR_CONN_RTT_MAX, 0);
1521 	if (ret != 0) {
1522 		worker_del_tcp_waiting(worker, addr);
1523 		free(conn);
1524 		session_close(session);
1525 		return kr_error(EINVAL);
1526 	}
1527 
1528 	struct kr_query *qry = task_get_last_pending_query(task);
1529 	if (kr_log_is_debug_qry(WORKER, qry)) {
1530 		const char *peer_str = kr_straddr(peer);
1531 		VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str ? peer_str : "");
1532 	}
1533 
1534 	/*  Start connection process to upstream. */
1535 	ret = uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect);
1536 	if (ret != 0) {
1537 		session_timer_stop(session);
1538 		worker_del_tcp_waiting(worker, addr);
1539 		free(conn);
1540 		session_close(session);
1541 		qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_FAILED);
1542 		return kr_error(EAGAIN);
1543 	}
1544 
1545 	/* Add task to the end of list of waiting tasks.
1546 	 * Will be notified either in on_connect() or in qr_task_on_send(). */
1547 	ret = session_waitinglist_push(session, task);
1548 	if (ret < 0) {
1549 		session_timer_stop(session);
1550 		worker_del_tcp_waiting(worker, addr);
1551 		free(conn);
1552 		session_close(session);
1553 		return kr_error(EINVAL);
1554 	}
1555 
1556 	return kr_ok();
1557 }
1558 
1559 static int tcp_task_step(struct qr_task *task,
1560 			 const struct sockaddr *packet_source, knot_pkt_t *packet)
1561 {
1562 	if (kr_fails_assert(task->pending_count == 0)) {
1563 		subreq_finalize(task, packet_source, packet);
1564 		return qr_task_finalize(task, KR_STATE_FAIL);
1565 	}
1566 
1567 	/* target */
1568 	const struct sockaddr *addr = &task->transport->address.ip;
1569 	if (addr->sa_family == AF_UNSPEC) {
1570 		/* Target isn't defined. Finalize task with SERVFAIL.
1571 		 * Although task->pending_count is zero, there can still be followers,
1572 		 * so we need to call subreq_finalize() to handle them properly. */
		subreq_finalize(task, packet_source, packet);
		return qr_task_finalize(task, KR_STATE_FAIL);
	}
	/* Checkout task before connecting */
	struct request_ctx *ctx = task->ctx;
	if (kr_resolve_checkout(&ctx->req, NULL, task->transport, task->pktbuf) != 0) {
		subreq_finalize(task, packet_source, packet);
		return qr_task_finalize(task, KR_STATE_FAIL);
	}
	int ret;
	struct session* session = NULL;
	if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
		/* Connection is in the list of waiting connections.
		 * That means the connection is being established right now. */
		ret = tcp_task_waiting_connection(session, task);
	} else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
		/* The connection has already been established. */
		ret = tcp_task_existing_connection(session, task);
	} else {
		/* Make connection. */
		ret = tcp_task_make_connection(task, addr);
	}

	if (ret != kr_ok()) {
		subreq_finalize(task, addr, packet);
		if (ret == kr_error(EAGAIN)) {
			ret = qr_task_step(task, addr, NULL);
		} else {
			ret = qr_task_finalize(task, KR_STATE_FAIL);
		}
	}

	return ret;
}

static int qr_task_step(struct qr_task *task,
			const struct sockaddr *packet_source, knot_pkt_t *packet)
{
	/* No more steps after we're finished. */
	if (!task || task->finished) {
		return kr_error(ESTALE);
	}

	/* Close pending I/O requests */
	subreq_finalize(task, packet_source, packet);
	if ((kr_now() - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
		return qr_task_finalize(task, KR_STATE_FAIL);
	}

	/* Consume input and produce next query */
	struct request_ctx *ctx = task->ctx;
	if (kr_fails_assert(ctx))
		return qr_task_finalize(task, KR_STATE_FAIL);
	struct kr_request *req = &ctx->req;
	struct worker_ctx *worker = ctx->worker;

	if (worker->too_many_open) {
		/* Too many open file descriptors: fail new work until
		 * concurrency drops well below the high watermark. */
		struct kr_rplan *rplan = &req->rplan;
		if (worker->stats.rconcurrent <
			worker->rconcurrent_highwatermark - 10) {
			worker->too_many_open = false;
		} else {
			if (packet && kr_rplan_empty(rplan)) {
				/* new query; TODO - make this detection more obvious */
				kr_resolve_consume(req, &task->transport, packet);
			}
			return qr_task_finalize(task, KR_STATE_FAIL);
		}
	}

	// Report network RTT back to server selection
	if (packet && task->send_time && task->recv_time) {
		struct kr_query *qry = array_tail(req->rplan.pending);
		qry->server_selection.update_rtt(qry, task->transport, task->recv_time - task->send_time);
	}

	int state = kr_resolve_consume(req, &task->transport, packet);

	task->transport = NULL;
	while (state == KR_STATE_PRODUCE) {
		state = kr_resolve_produce(req, &task->transport, task->pktbuf);
		if (unlikely(++task->iter_count > KR_ITER_LIMIT ||
			     task->timeouts >= KR_TIMEOUT_LIMIT)) {

			struct kr_rplan *rplan = &req->rplan;
			struct kr_query *last = kr_rplan_last(rplan);
			if (task->iter_count > KR_ITER_LIMIT) {
				VERBOSE_MSG(last, "canceling query due to exceeded iteration count limit of %d\n", KR_ITER_LIMIT);
			}
			if (task->timeouts >= KR_TIMEOUT_LIMIT) {
				VERBOSE_MSG(last, "canceling query due to exceeded timeout retries limit of %d\n", KR_TIMEOUT_LIMIT);
			}

			return qr_task_finalize(task, KR_STATE_FAIL);
		}
	}

	/* We're done, no more iterations needed */
	if (state & (KR_STATE_DONE|KR_STATE_FAIL)) {
		return qr_task_finalize(task, state);
	} else if (!task->transport || !task->transport->protocol) {
		return qr_task_step(task, NULL, NULL);
	}

	switch (task->transport->protocol)
	{
	case KR_TRANSPORT_UDP:
		return udp_task_step(task, packet_source, packet);
	case KR_TRANSPORT_TCP: // fall through
	case KR_TRANSPORT_TLS:
		return tcp_task_step(task, packet_source, packet);
	default:
		kr_assert(!EINVAL);
		return kr_error(EINVAL);
	}
}
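
/* Illustrative sketch (not compiled): the consume/produce loop in
 * qr_task_step() drives the same state machine a standalone libkres caller
 * would drive.  A minimal synchronous driver, assuming the kr_resolve_*()
 * calls used above and a hypothetical blocking send_and_recv() helper,
 * would look roughly like this: */
#if 0
static int example_resolve_blocking(struct kr_request *req, knot_pkt_t *query,
				    knot_pkt_t *pktbuf)
{
	/* Feed the client query, then loop until the plan is resolved. */
	struct kr_transport *transport = NULL;
	int state = kr_resolve_consume(req, &transport, query);
	while (state == KR_STATE_PRODUCE) {
		transport = NULL;
		state = kr_resolve_produce(req, &transport, pktbuf);
		if (state != KR_STATE_CONSUME)
			break;	/* nothing to send; DONE or FAIL */
		/* Finalize the outgoing query for the chosen transport. */
		if (kr_resolve_checkout(req, NULL, transport, pktbuf) != 0)
			return KR_STATE_FAIL;
		/* send_and_recv() stands in for the asynchronous I/O done above. */
		knot_pkt_t *answer = send_and_recv(transport, pktbuf);
		state = kr_resolve_consume(req, &transport, answer);
	}
	return state;	/* KR_STATE_DONE or KR_STATE_FAIL */
}
#endif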

static int parse_packet(knot_pkt_t *query)
{
	if (!query) {
		return kr_error(EINVAL);
	}

	/* Parse query packet. */
	int ret = knot_pkt_parse(query, 0);
	if (ret == KNOT_ETRAIL) {
		/* Extra data after message end. */
		ret = kr_error(EMSGSIZE);
	} else if (ret != KNOT_EOK) {
		/* Malformed query. */
		ret = kr_error(EPROTO);
	} else {
		ret = kr_ok();
	}

	return ret;
}

int worker_submit(struct session *session,
		  const struct sockaddr *peer, const struct sockaddr *dst_addr,
		  const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt)
{
	if (!session || !pkt)
		return kr_error(EINVAL);

	uv_handle_t *handle = session_get_handle(session);
	if (!handle || !handle->loop->data)
		return kr_error(EINVAL);

	int ret = parse_packet(pkt);

	const bool is_query = (knot_wire_get_qr(pkt->wire) == 0);
	const bool is_outgoing = session_flags(session)->outgoing;

	struct http_ctx *http_ctx = NULL;
#if ENABLE_DOH2
	http_ctx = session_http_get_server_ctx(session);
#endif

	if (!is_outgoing && http_ctx && queue_len(http_ctx->streams) <= 0)
		return kr_error(ENOENT);

	/* Ignore badly formed queries. */
	if ((ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
	    (is_query == is_outgoing)) {
		if (!is_outgoing) {
			the_worker->stats.dropped += 1;
		#if ENABLE_DOH2
			if (http_ctx) {
				struct http_stream stream = queue_head(http_ctx->streams);
				http_free_headers(stream.headers);
				queue_pop(http_ctx->streams);
			}
		#endif
		}
		return kr_error(EILSEQ);
	}

	/* Start a new task for a query from a listening socket,
	 * or resume an existing one if this is a subrequest. */
	struct qr_task *task = NULL;
	const struct sockaddr *addr = NULL;
	if (!is_outgoing) { /* request from a client */
		struct request_ctx *ctx =
			request_create(the_worker, session, peer, dst_addr,
					eth_from, eth_to, knot_wire_get_id(pkt->wire));
		if (http_ctx)
			queue_pop(http_ctx->streams);
		if (!ctx)
			return kr_error(ENOMEM);

		ret = request_start(ctx, pkt);
		if (ret != 0) {
			request_free(ctx);
			return kr_error(ENOMEM);
		}

		task = qr_task_create(ctx);
		if (!task) {
			request_free(ctx);
			return kr_error(ENOMEM);
		}

		if (handle->type == UV_TCP && qr_task_register(task, session)) {
			return kr_error(ENOMEM);
		}
	} else { /* response from upstream */
		const uint16_t id = knot_wire_get_id(pkt->wire);
		task = session_tasklist_del_msgid(session, id);
		if (task == NULL) {
			VERBOSE_MSG(NULL, "=> ignoring packet with mismatching ID %d\n",
					(int)id);
			return kr_error(ENOENT);
		}
		if (kr_fails_assert(!session_flags(session)->closing))
			return kr_error(EINVAL);
		addr = peer;
		/* Note receive time for RTT calculation */
		task->recv_time = kr_now();
	}
	if (kr_fails_assert(!uv_is_closing(session_get_handle(session))))
		return kr_error(EINVAL);

	/* The packet was successfully parsed
	 * and a task was created (or an existing one was found). */
	session_touch(session);

	/* Consume input and produce next message */
	return qr_task_step(task, addr, pkt);
}
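
/* Illustrative sketch (not compiled): how an I/O callback is expected to hand
 * a freshly received packet to worker_submit().  The callback name is
 * hypothetical, and passing NULL for the destination address and both MAC
 * addresses is an assumption that only applies to non-XDP transports. */
#if 0
static void example_on_datagram(struct session *session,
				const struct sockaddr *peer, knot_pkt_t *pkt)
{
	int ret = worker_submit(session, peer, NULL, NULL, NULL, pkt);
	if (ret != kr_ok()) {
		/* The packet was malformed or unexpected and has been dropped;
		 * the caller only needs to release its own buffers. */
	}
}
#endif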

static int map_add_tcp_session(map_t *map, const struct sockaddr* addr,
			       struct session *session)
{
	if (kr_fails_assert(map && addr))
		return kr_error(EINVAL);
	const char *key = tcpsess_key(addr);
	if (kr_fails_assert(key && map_contains(map, key) == 0))
		return kr_error(EINVAL);
	int ret = map_set(map, key, session);
	return ret ? kr_error(EINVAL) : kr_ok();
}

static int map_del_tcp_session(map_t *map, const struct sockaddr* addr)
{
	if (kr_fails_assert(map && addr))
		return kr_error(EINVAL);
	const char *key = tcpsess_key(addr);
	if (kr_fails_assert(key))
		return kr_error(EINVAL);
	int ret = map_del(map, key);
	return ret ? kr_error(ENOENT) : kr_ok();
}

static struct session* map_find_tcp_session(map_t *map,
					    const struct sockaddr *addr)
{
	if (kr_fails_assert(map && addr))
		return NULL;
	const char *key = tcpsess_key(addr);
	if (kr_fails_assert(key))
		return NULL;
	struct session* ret = map_get(map, key);
	return ret;
}

int worker_add_tcp_connected(struct worker_ctx *worker,
				    const struct sockaddr* addr,
				    struct session *session)
{
	return map_add_tcp_session(&worker->tcp_connected, addr, session);
}

int worker_del_tcp_connected(struct worker_ctx *worker,
				    const struct sockaddr* addr)
{
	return map_del_tcp_session(&worker->tcp_connected, addr);
}

struct session* worker_find_tcp_connected(struct worker_ctx *worker,
						 const struct sockaddr* addr)
{
	return map_find_tcp_session(&worker->tcp_connected, addr);
}

static int worker_add_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr* addr,
				  struct session *session)
{
	return map_add_tcp_session(&worker->tcp_waiting, addr, session);
}

int worker_del_tcp_waiting(struct worker_ctx *worker,
			   const struct sockaddr* addr)
{
	return map_del_tcp_session(&worker->tcp_waiting, addr);
}

struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
					       const struct sockaddr* addr)
{
	return map_find_tcp_session(&worker->tcp_waiting, addr);
}
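
/* Illustrative sketch (not compiled): both maps above are keyed by the
 * stringified peer address (tcpsess_key()), so each upstream address owns at
 * most one outgoing session per map.  The typical lifecycle for one upstream,
 * with hypothetical `worker`, `addr` and `session` arguments, is: */
#if 0
static void example_tcp_session_lifecycle(struct worker_ctx *worker,
					  const struct sockaddr *addr,
					  struct session *session)
{
	/* While the connection attempt is still in flight: */
	worker_add_tcp_waiting(worker, addr, session);
	/* Once the connect callback succeeds, move it to the other map: */
	worker_del_tcp_waiting(worker, addr);
	worker_add_tcp_connected(worker, addr, session);
	/* Later queries to the same upstream can reuse the session: */
	struct session *s = worker_find_tcp_connected(worker, addr);
	(void)s;
	/* Teardown (see worker_end_tcp() below) removes both entries: */
	worker_del_tcp_connected(worker, addr);
}
#endif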

int worker_end_tcp(struct session *session)
{
	if (!session)
		return kr_error(EINVAL);

	session_timer_stop(session);

	struct sockaddr *peer = session_get_peer(session);

	worker_del_tcp_waiting(the_worker, peer);
	worker_del_tcp_connected(the_worker, peer);
	session_flags(session)->connected = false;

	struct tls_client_ctx *tls_client_ctx = session_tls_get_client_ctx(session);
	if (tls_client_ctx) {
		/* Avoid gnutls_bye() call */
		tls_set_hs_state(&tls_client_ctx->c, TLS_HS_NOT_STARTED);
	}

	struct tls_ctx *tls_ctx = session_tls_get_server_ctx(session);
	if (tls_ctx) {
		/* Avoid gnutls_bye() call */
		tls_set_hs_state(&tls_ctx->c, TLS_HS_NOT_STARTED);
	}

	while (!session_waitinglist_is_empty(session)) {
		struct qr_task *task = session_waitinglist_pop(session, false);
		kr_assert(task->refs > 1);
		session_tasklist_del(session, task);
		if (session_flags(session)->outgoing) {
			if (task->ctx->req.options.FORWARD) {
				/* We are in TCP_FORWARD mode.
				 * To prevent failing at kr_resolve_consume(),
				 * qry.flags.TCP must be cleared.
				 * TODO - refactoring is needed. */
				struct kr_request *req = &task->ctx->req;
				struct kr_rplan *rplan = &req->rplan;
				struct kr_query *qry = array_tail(rplan->pending);
				qry->flags.TCP = false;
			}
			qr_task_step(task, NULL, NULL);
		} else {
			kr_assert(task->ctx->source.session == session);
			task->ctx->source.session = NULL;
		}
		worker_task_unref(task);
	}
	while (!session_tasklist_is_empty(session)) {
		struct qr_task *task = session_tasklist_del_first(session, false);
		if (session_flags(session)->outgoing) {
			if (task->ctx->req.options.FORWARD) {
				struct kr_request *req = &task->ctx->req;
				struct kr_rplan *rplan = &req->rplan;
				struct kr_query *qry = array_tail(rplan->pending);
				qry->flags.TCP = false;
			}
			qr_task_step(task, NULL, NULL);
		} else {
			kr_assert(task->ctx->source.session == session);
			task->ctx->source.session = NULL;
		}
		worker_task_unref(task);
	}
	session_close(session);
	return kr_ok();
}

knot_pkt_t *worker_resolve_mk_pkt_dname(knot_dname_t *qname, uint16_t qtype, uint16_t qclass,
				   const struct kr_qflags *options)
{
	knot_pkt_t *pkt = knot_pkt_new(NULL, KNOT_EDNS_MAX_UDP_PAYLOAD, NULL);
	if (!pkt)
		return NULL;
	knot_pkt_put_question(pkt, qname, qclass, qtype);
	knot_wire_set_rd(pkt->wire);
	knot_wire_set_ad(pkt->wire);

	/* Add OPT RR, including wire format so modules can see both representations.
	 * knot_pkt_put() copies the outside; we need to duplicate the inside manually. */
	knot_rrset_t *opt = knot_rrset_copy(the_worker->engine->resolver.downstream_opt_rr, NULL);
	if (!opt) {
		knot_pkt_free(pkt);
		return NULL;
	}
	if (options->DNSSEC_WANT) {
		knot_edns_set_do(opt);
	}
	knot_pkt_begin(pkt, KNOT_ADDITIONAL);
	int ret = knot_pkt_put(pkt, KNOT_COMPR_HINT_NONE, opt, KNOT_PF_FREE);
	if (ret == KNOT_EOK) {
		free(opt); /* inside is owned by pkt now */
	} else {
		knot_rrset_free(opt, NULL);
		knot_pkt_free(pkt);
		return NULL;
	}

	if (options->DNSSEC_CD) {
		knot_wire_set_cd(pkt->wire);
	}

	return pkt;
}

knot_pkt_t *worker_resolve_mk_pkt(const char *qname_str, uint16_t qtype, uint16_t qclass,
				   const struct kr_qflags *options)
{
	uint8_t qname[KNOT_DNAME_MAXLEN];
	if (!knot_dname_from_str(qname, qname_str, sizeof(qname)))
		return NULL;
	return worker_resolve_mk_pkt_dname(qname, qtype, qclass, options);
}

struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options)
{
	struct worker_ctx *worker = the_worker;
	if (kr_fails_assert(worker && query))
		return NULL;

	struct request_ctx *ctx = request_create(worker, NULL, NULL, NULL, NULL, NULL,
						 worker->next_request_uid);
	if (!ctx)
		return NULL;

	/* Create task */
	struct qr_task *task = qr_task_create(ctx);
	if (!task) {
		request_free(ctx);
		return NULL;
	}

	/* Start task */
	int ret = request_start(ctx, query);
	if (ret != 0) {
		/* task is attached to request context,
		 * so dereference (and deallocate) it first */
		ctx->task = NULL;
		qr_task_unref(task);
		request_free(ctx);
		return NULL;
	}

	worker->next_request_uid += 1;
	if (worker->next_request_uid == 0)
		worker->next_request_uid = UINT16_MAX + 1;

	/* Set options late, as qr_task_start() -> kr_resolve_begin() rewrites them. */
	kr_qflags_set(&task->ctx->req.options, options);
	return task;
}

int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query)
{
	if (!task)
		return kr_error(EINVAL);
	return qr_task_step(task, NULL, query);
}
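
/* Illustrative sketch (not compiled): resolving a name internally (e.g. from
 * a module or from Lua bindings) with the three helpers above.  The query
 * name, type and flags are examples only; the finished answer is delivered
 * through the request's usual finish callbacks, not returned here. */
#if 0
static void example_internal_resolve(void)
{
	struct kr_qflags options;
	memset(&options, 0, sizeof(options));
	options.DNSSEC_WANT = true;

	knot_pkt_t *pkt = worker_resolve_mk_pkt("example.com.", KNOT_RRTYPE_A,
						KNOT_CLASS_IN, &options);
	if (!pkt)
		return;
	struct qr_task *task = worker_resolve_start(pkt, options);
	if (task)
		worker_resolve_exec(task, pkt);
	knot_pkt_free(pkt);
}
#endif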

int worker_task_numrefs(const struct qr_task *task)
{
	return task->refs;
}

struct kr_request *worker_task_request(struct qr_task *task)
{
	if (!task || !task->ctx)
		return NULL;

	return &task->ctx->req;
}

int worker_task_finalize(struct qr_task *task, int state)
{
	return qr_task_finalize(task, state);
}

int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
		     knot_pkt_t *packet)
{
	return qr_task_step(task, packet_source, packet);
}

void worker_task_complete(struct qr_task *task)
{
	qr_task_complete(task);
}

void worker_task_ref(struct qr_task *task)
{
	qr_task_ref(task);
}

void worker_task_unref(struct qr_task *task)
{
	qr_task_unref(task);
}

void worker_task_timeout_inc(struct qr_task *task)
{
	task->timeouts += 1;
}

knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task)
{
	return task->pktbuf;
}

struct request_ctx *worker_task_get_request(struct qr_task *task)
{
	return task->ctx;
}

struct session *worker_request_get_source_session(const struct kr_request *req)
{
	static_assert(offsetof(struct request_ctx, req) == 0,
			"Bad struct request_ctx definition.");
	return ((struct request_ctx *)req)->source.session;
}
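
/* Illustrative sketch (not compiled): the accessor above relies on struct
 * kr_request being the very first member of struct request_ctx, so the two
 * pointers below are simply different views of the same object. */
#if 0
static void example_request_ctx_cast(struct qr_task *task)
{
	struct request_ctx *ctx = worker_task_get_request(task);
	struct kr_request *req = &ctx->req;
	kr_require((struct request_ctx *)req == ctx);
}
#endif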

uint16_t worker_task_pkt_get_msgid(struct qr_task *task)
{
	knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
	uint16_t msg_id = knot_wire_get_id(pktbuf->wire);
	return msg_id;
}

void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
{
	knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
	knot_wire_set_id(pktbuf->wire, msgid);
	struct kr_query *q = task_get_last_pending_query(task);
	q->id = msgid;
}

uint64_t worker_task_creation_time(struct qr_task *task)
{
	return task->creation_time;
}

void worker_task_subreq_finalize(struct qr_task *task)
{
	subreq_finalize(task, NULL, NULL);
}

bool worker_task_finished(struct qr_task *task)
{
	return task->finished;
}

/** Reserve worker buffers.  We assume worker's been zeroed. */
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
	worker->tcp_connected = map_make(NULL);
	worker->tcp_waiting = map_make(NULL);
	worker->subreq_out = trie_create(NULL);

	array_init(worker->pool_mp);
	if (array_reserve(worker->pool_mp, ring_maxlen)) {
		return kr_error(ENOMEM);
	}

	mm_ctx_mempool(&worker->pkt_pool, 4 * sizeof(knot_pkt_t));

	return kr_ok();
}

static inline void reclaim_mp_freelist(mp_freelist_t *list)
{
	for (unsigned i = 0; i < list->len; ++i) {
		struct mempool *e = list->at[i];
		kr_asan_unpoison(e, sizeof(*e));
		mp_delete(e);
	}
	array_clear(*list);
}

void worker_deinit(void)
{
	struct worker_ctx *worker = the_worker;
	if (kr_fails_assert(worker))
		return;
	if (worker->z_import != NULL) {
		zi_free(worker->z_import);
		worker->z_import = NULL;
	}
	map_clear(&worker->tcp_connected);
	map_clear(&worker->tcp_waiting);
	trie_free(worker->subreq_out);
	worker->subreq_out = NULL;

	for (int i = 0; i < worker->doh_qry_headers.len; i++)
		free((void *)worker->doh_qry_headers.at[i]);
	array_clear(worker->doh_qry_headers);

	reclaim_mp_freelist(&worker->pool_mp);
	mp_delete(worker->pkt_pool.ctx);
	worker->pkt_pool.ctx = NULL;

	the_worker = NULL;
}

int worker_init(struct engine *engine, int worker_count)
{
	if (kr_fails_assert(engine && engine->L && the_worker == NULL))
		return kr_error(EINVAL);
	kr_bindings_register(engine->L);

	/* Create main worker. */
	struct worker_ctx *worker = &the_worker_value;
	memset(worker, 0, sizeof(*worker));
	worker->engine = engine;

	uv_loop_t *loop = uv_default_loop();
	worker->loop = loop;

	worker->count = worker_count;

	/* Register table for worker per-request variables */
	lua_newtable(engine->L);
	lua_setfield(engine->L, -2, "vars");
	lua_getfield(engine->L, -1, "vars");
	worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
	lua_pop(engine->L, 1);

	worker->tcp_pipeline_max = MAX_PIPELINED;
	worker->out_addr4.sin_family = AF_UNSPEC;
	worker->out_addr6.sin6_family = AF_UNSPEC;

	array_init(worker->doh_qry_headers);

	int ret = worker_reserve(worker, MP_FREELIST_SIZE);
	if (ret) return ret;
	worker->next_request_uid = UINT16_MAX + 1;

	/* Set some worker.* fields in Lua */
	lua_getglobal(engine->L, "worker");
	pid_t pid = getpid();

	auto_free char *pid_str = NULL;
	const char *inst_name = getenv("SYSTEMD_INSTANCE");
	if (inst_name) {
		lua_pushstring(engine->L, inst_name);
	} else {
		ret = asprintf(&pid_str, "%ld", (long)pid);
		kr_assert(ret > 0);
		lua_pushstring(engine->L, pid_str);
	}
	lua_setfield(engine->L, -2, "id");

	lua_pushnumber(engine->L, pid);
	lua_setfield(engine->L, -2, "pid");
	lua_pushnumber(engine->L, worker_count);
	lua_setfield(engine->L, -2, "count");

	char cwd[PATH_MAX];
	get_workdir(cwd, sizeof(cwd));
	lua_pushstring(engine->L, cwd);
	lua_setfield(engine->L, -2, "cwd");

	the_worker = worker;
	loop->data = the_worker;
	/* ^^^^ Now this shouldn't be used anymore, but it's hard to be 100% sure. */
	return kr_ok();
}

#undef VERBOSE_MSG
