1 /*
2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *
35 */
36
37 /**
38 * \file
39 *
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
42 */
43
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** the msec to wait for reconnect (if not immediate, the first attempt) */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** the msec to wait for reconnect max after backoff */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** number of messages before wakeup of thread */
#define DTIO_MSG_FOR_WAKEUP 32

/** maximum length of received frame */
#define DTIO_RECV_FRAME_MAX_LEN 1000

struct stop_flush_info;
/** DTIO command channel commands.
 * Declared as an enum tag only, so that no object with external
 * linkage is defined by this declaration. */
enum dtio_channel_command {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup */
	DTIO_COMMAND_WAKEUP = 1
};

/** open the output channel */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add output event for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** stop from stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** setup a start control message */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** enable briefly waiting for a read event, for SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** enable briefly waiting for a write event, for SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
102
103 struct dt_msg_queue*
dt_msg_queue_create(struct comm_base * base)104 dt_msg_queue_create(struct comm_base* base)
105 {
106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107 if(!mq) return NULL;
108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 about 1 M should contain 64K messages with some overhead,
110 or a whole bunch smaller ones */
111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 if(!mq->wakeup_timer) {
113 free(mq);
114 return NULL;
115 }
116 lock_basic_init(&mq->lock);
117 lock_protect(&mq->lock, mq, sizeof(*mq));
118 return mq;
119 }
120
121 /** clear the message list, caller must hold the lock */
122 static void
dt_msg_queue_clear(struct dt_msg_queue * mq)123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125 struct dt_msg_entry* e = mq->first, *next=NULL;
126 while(e) {
127 next = e->next;
128 free(e->buf);
129 free(e);
130 e = next;
131 }
132 mq->first = NULL;
133 mq->last = NULL;
134 mq->cursize = 0;
135 mq->msgcount = 0;
136 }
137
138 void
dt_msg_queue_delete(struct dt_msg_queue * mq)139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141 if(!mq) return;
142 lock_basic_destroy(&mq->lock);
143 dt_msg_queue_clear(mq);
144 comm_timer_delete(mq->wakeup_timer);
145 free(mq);
146 }
147
148 /** make the dtio wake up by sending a wakeup command */
dtio_wakeup(struct dt_io_thread * dtio)149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151 uint8_t cmd = DTIO_COMMAND_WAKEUP;
152 if(!dtio) return;
153 if(!dtio->started) return;
154
155 while(1) {
156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157 if(r == -1) {
158 #ifndef USE_WINSOCK
159 if(errno == EINTR || errno == EAGAIN)
160 continue;
161 #else
162 if(WSAGetLastError() == WSAEINPROGRESS)
163 continue;
164 if(WSAGetLastError() == WSAEWOULDBLOCK)
165 continue;
166 #endif
167 log_err("dnstap io wakeup: write: %s",
168 sock_strerror(errno));
169 break;
170 }
171 break;
172 }
173 }
174
175 void
mq_wakeup_cb(void * arg)176 mq_wakeup_cb(void* arg)
177 {
178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179 /* even if the dtio is already active, because perhaps much
180 * traffic suddenly, we leave the timer running to save on
181 * managing it, the once a second timer is less work then
182 * starting and stopping the timer frequently */
183 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184 mq->dtio->wakeup_timer_enabled = 0;
185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186 dtio_wakeup(mq->dtio);
187 }
188
189 /** start timer to wakeup dtio because there is content in the queue */
190 static void
dt_msg_queue_start_timer(struct dt_msg_queue * mq)191 dt_msg_queue_start_timer(struct dt_msg_queue* mq)
192 {
193 struct timeval tv;
194 /* Start a timer to process messages to be logged.
195 * If we woke up the dtio thread for every message, the wakeup
196 * messages take up too much processing power. If the queue
197 * fills up the wakeup happens immediately. The timer wakes it up
198 * if there are infrequent messages to log. */
199
200 /* we cannot start a timer in dtio thread, because it is a different
201 * thread and its event base is in use by the other thread, it would
202 * give race conditions if we tried to modify its event base,
203 * and locks would wait until it woke up, and this is what we do. */
204
205 /* do not start the timer if a timer already exists, perhaps
206 * in another worker. So this variable is protected by a lock in
207 * dtio */
208 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209 if(mq->dtio->wakeup_timer_enabled) {
210 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
211 return;
212 }
213 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215
216 /* start the timer, in mq, in the event base of our worker */
217 tv.tv_sec = 1;
218 tv.tv_usec = 0;
219 comm_timer_set(mq->wakeup_timer, &tv);
220 }
221
222 void
dt_msg_queue_submit(struct dt_msg_queue * mq,void * buf,size_t len)223 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
224 {
225 int wakeupnow = 0, wakeupstarttimer = 0;
226 struct dt_msg_entry* entry;
227
228 /* check conditions */
229 if(!buf) return;
230 if(len == 0) {
231 /* it is not possible to log entries with zero length,
232 * because the framestream protocol does not carry it.
233 * However the protobuf serialization does not create zero
234 * length datagrams for dnstap, so this should not happen. */
235 free(buf);
236 return;
237 }
238 if(!mq) {
239 free(buf);
240 return;
241 }
242
243 /* allocate memory for queue entry */
244 entry = malloc(sizeof(*entry));
245 if(!entry) {
246 log_err("out of memory logging dnstap");
247 free(buf);
248 return;
249 }
250 entry->next = NULL;
251 entry->buf = buf;
252 entry->len = len;
253
254 /* acquire lock */
255 lock_basic_lock(&mq->lock);
256 /* if list was empty, start timer for (eventual) wakeup */
257 if(mq->first == NULL)
258 wakeupstarttimer = 1;
259 /* if list contains more than wakeupnum elements, wakeup now,
260 * or if list is (going to be) almost full */
261 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
262 (mq->cursize < mq->maxsize * 9 / 10 &&
263 mq->cursize+len >= mq->maxsize * 9 / 10))
264 wakeupnow = 1;
265 /* see if it is going to fit */
266 if(mq->cursize + len > mq->maxsize) {
267 /* buffer full, or congested. */
268 /* drop */
269 lock_basic_unlock(&mq->lock);
270 free(buf);
271 free(entry);
272 return;
273 }
274 mq->cursize += len;
275 mq->msgcount ++;
276 /* append to list */
277 if(mq->last) {
278 mq->last->next = entry;
279 } else {
280 mq->first = entry;
281 }
282 mq->last = entry;
283 /* release lock */
284 lock_basic_unlock(&mq->lock);
285
286 if(wakeupnow) {
287 dtio_wakeup(mq->dtio);
288 } else if(wakeupstarttimer) {
289 dt_msg_queue_start_timer(mq);
290 }
291 }
292
dt_io_thread_create(void)293 struct dt_io_thread* dt_io_thread_create(void)
294 {
295 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296 lock_basic_init(&dtio->wakeup_timer_lock);
297 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298 sizeof(dtio->wakeup_timer_enabled));
299 return dtio;
300 }
301
dt_io_thread_delete(struct dt_io_thread * dtio)302 void dt_io_thread_delete(struct dt_io_thread* dtio)
303 {
304 struct dt_io_list_item* item, *nextitem;
305 if(!dtio) return;
306 lock_basic_destroy(&dtio->wakeup_timer_lock);
307 item=dtio->io_list;
308 while(item) {
309 nextitem = item->next;
310 free(item);
311 item = nextitem;
312 }
313 free(dtio->socket_path);
314 free(dtio->ip_str);
315 free(dtio->tls_server_name);
316 free(dtio->client_key_file);
317 free(dtio->client_cert_file);
318 if(dtio->ssl_ctx) {
319 #ifdef HAVE_SSL
320 SSL_CTX_free(dtio->ssl_ctx);
321 #endif
322 }
323 free(dtio);
324 }
325
dt_io_thread_apply_cfg(struct dt_io_thread * dtio,struct config_file * cfg)326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
327 {
328 if(!cfg->dnstap) {
329 log_warn("cannot setup dnstap because dnstap-enable is no");
330 return 0;
331 }
332
333 /* what type of connectivity do we have */
334 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
335 if(cfg->dnstap_tls)
336 dtio->upstream_is_tls = 1;
337 else dtio->upstream_is_tcp = 1;
338 } else {
339 dtio->upstream_is_unix = 1;
340 }
341 dtio->is_bidirectional = cfg->dnstap_bidirectional;
342
343 if(dtio->upstream_is_unix) {
344 char* nm;
345 if(!cfg->dnstap_socket_path ||
346 cfg->dnstap_socket_path[0]==0) {
347 log_err("dnstap setup: no dnstap-socket-path for "
348 "socket connect");
349 return 0;
350 }
351 nm = cfg->dnstap_socket_path;
352 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
353 cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
354 nm += strlen(cfg->chrootdir);
355 free(dtio->socket_path);
356 dtio->socket_path = strdup(nm);
357 if(!dtio->socket_path) {
358 log_err("dnstap setup: malloc failure");
359 return 0;
360 }
361 }
362
363 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
364 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
365 log_err("dnstap setup: no dnstap-ip for TCP connect");
366 return 0;
367 }
368 free(dtio->ip_str);
369 dtio->ip_str = strdup(cfg->dnstap_ip);
370 if(!dtio->ip_str) {
371 log_err("dnstap setup: malloc failure");
372 return 0;
373 }
374 }
375
376 if(dtio->upstream_is_tls) {
377 #ifdef HAVE_SSL
378 if(cfg->dnstap_tls_server_name &&
379 cfg->dnstap_tls_server_name[0]) {
380 free(dtio->tls_server_name);
381 dtio->tls_server_name = strdup(
382 cfg->dnstap_tls_server_name);
383 if(!dtio->tls_server_name) {
384 log_err("dnstap setup: malloc failure");
385 return 0;
386 }
387 if(!check_auth_name_for_ssl(dtio->tls_server_name))
388 return 0;
389 }
390 if(cfg->dnstap_tls_client_key_file &&
391 cfg->dnstap_tls_client_key_file[0]) {
392 dtio->use_client_certs = 1;
393 free(dtio->client_key_file);
394 dtio->client_key_file = strdup(
395 cfg->dnstap_tls_client_key_file);
396 if(!dtio->client_key_file) {
397 log_err("dnstap setup: malloc failure");
398 return 0;
399 }
400 if(!cfg->dnstap_tls_client_cert_file ||
401 cfg->dnstap_tls_client_cert_file[0]==0) {
402 log_err("dnstap setup: client key "
403 "authentication enabled with "
404 "dnstap-tls-client-key-file, but "
405 "no dnstap-tls-client-cert-file "
406 "is given");
407 return 0;
408 }
409 free(dtio->client_cert_file);
410 dtio->client_cert_file = strdup(
411 cfg->dnstap_tls_client_cert_file);
412 if(!dtio->client_cert_file) {
413 log_err("dnstap setup: malloc failure");
414 return 0;
415 }
416 } else {
417 dtio->use_client_certs = 0;
418 dtio->client_key_file = NULL;
419 dtio->client_cert_file = NULL;
420 }
421
422 if(cfg->dnstap_tls_cert_bundle) {
423 dtio->ssl_ctx = connect_sslctx_create(
424 dtio->client_key_file,
425 dtio->client_cert_file,
426 cfg->dnstap_tls_cert_bundle, 0);
427 } else {
428 dtio->ssl_ctx = connect_sslctx_create(
429 dtio->client_key_file,
430 dtio->client_cert_file,
431 cfg->tls_cert_bundle, cfg->tls_win_cert);
432 }
433 if(!dtio->ssl_ctx) {
434 log_err("could not setup SSL CTX");
435 return 0;
436 }
437 dtio->tls_use_sni = cfg->tls_use_sni;
438 #endif /* HAVE_SSL */
439 }
440 return 1;
441 }
442
dt_io_thread_register_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)443 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
444 struct dt_msg_queue* mq)
445 {
446 struct dt_io_list_item* item = malloc(sizeof(*item));
447 if(!item) return 0;
448 lock_basic_lock(&mq->lock);
449 mq->dtio = dtio;
450 lock_basic_unlock(&mq->lock);
451 item->queue = mq;
452 item->next = dtio->io_list;
453 dtio->io_list = item;
454 dtio->io_list_iter = NULL;
455 return 1;
456 }
457
dt_io_thread_unregister_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)458 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
459 struct dt_msg_queue* mq)
460 {
461 struct dt_io_list_item* item, *prev=NULL;
462 if(!dtio) return;
463 item = dtio->io_list;
464 while(item) {
465 if(item->queue == mq) {
466 /* found it */
467 if(prev) prev->next = item->next;
468 else dtio->io_list = item->next;
469 /* the queue itself only registered, not deleted */
470 lock_basic_lock(&item->queue->lock);
471 item->queue->dtio = NULL;
472 lock_basic_unlock(&item->queue->lock);
473 free(item);
474 dtio->io_list_iter = NULL;
475 return;
476 }
477 prev = item;
478 item = item->next;
479 }
480 }
481
482 /** pick a message from the queue, the routine locks and unlocks,
483 * returns true if there is a message */
dt_msg_queue_pop(struct dt_msg_queue * mq,void ** buf,size_t * len)484 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
485 size_t* len)
486 {
487 lock_basic_lock(&mq->lock);
488 if(mq->first) {
489 struct dt_msg_entry* entry = mq->first;
490 mq->first = entry->next;
491 if(!entry->next) mq->last = NULL;
492 mq->cursize -= entry->len;
493 mq->msgcount --;
494 lock_basic_unlock(&mq->lock);
495
496 *buf = entry->buf;
497 *len = entry->len;
498 free(entry);
499 return 1;
500 }
501 lock_basic_unlock(&mq->lock);
502 return 0;
503 }
504
505 /** find message in queue, false if no message, true if message to send */
dtio_find_in_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)506 static int dtio_find_in_queue(struct dt_io_thread* dtio,
507 struct dt_msg_queue* mq)
508 {
509 void* buf=NULL;
510 size_t len=0;
511 if(dt_msg_queue_pop(mq, &buf, &len)) {
512 dtio->cur_msg = buf;
513 dtio->cur_msg_len = len;
514 dtio->cur_msg_done = 0;
515 dtio->cur_msg_len_done = 0;
516 return 1;
517 }
518 return 0;
519 }
520
521 /** find a new message to write, search message queues, false if none */
dtio_find_msg(struct dt_io_thread * dtio)522 static int dtio_find_msg(struct dt_io_thread* dtio)
523 {
524 struct dt_io_list_item *spot, *item;
525
526 spot = dtio->io_list_iter;
527 /* use the next queue for the next message lookup,
528 * if we hit the end(NULL) the NULL restarts the iter at start. */
529 if(spot)
530 dtio->io_list_iter = spot->next;
531 else if(dtio->io_list)
532 dtio->io_list_iter = dtio->io_list->next;
533
534 /* scan from spot to end-of-io_list */
535 item = spot;
536 while(item) {
537 if(dtio_find_in_queue(dtio, item->queue))
538 return 1;
539 item = item->next;
540 }
541 /* scan starting at the start-of-list (to wrap around the end) */
542 item = dtio->io_list;
543 while(item) {
544 if(dtio_find_in_queue(dtio, item->queue))
545 return 1;
546 item = item->next;
547 }
548 return 0;
549 }
550
551 /** callback for the dnstap reconnect, to start reconnecting to output */
dtio_reconnect_timeout_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)552 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
553 short ATTR_UNUSED(bits), void* arg)
554 {
555 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
556 dtio->reconnect_is_added = 0;
557 verbose(VERB_ALGO, "dnstap io: reconnect timer");
558
559 dtio_open_output(dtio);
560 if(dtio->event) {
561 if(!dtio_add_output_event_write(dtio))
562 return;
563 /* nothing wrong so far, wait on the output event */
564 return;
565 }
566 /* exponential backoff and retry on timer */
567 dtio_reconnect_enable(dtio);
568 }
569
570 /** attempt to reconnect to the output, after a timeout */
dtio_reconnect_enable(struct dt_io_thread * dtio)571 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
572 {
573 struct timeval tv;
574 int msec;
575 if(dtio->want_to_exit) return;
576 if(dtio->reconnect_is_added)
577 return; /* already done */
578
579 /* exponential backoff, store the value for next timeout */
580 msec = dtio->reconnect_timeout;
581 if(msec == 0) {
582 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
583 } else {
584 dtio->reconnect_timeout = msec*2;
585 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
586 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
587 }
588 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
589 msec);
590
591 /* setup wait timer */
592 memset(&tv, 0, sizeof(tv));
593 tv.tv_sec = msec/1000;
594 tv.tv_usec = (msec%1000)*1000;
595 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
596 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
597 log_err("dnstap io: could not reconnect ev timer add");
598 return;
599 }
600 dtio->reconnect_is_added = 1;
601 }
602
603 /** remove dtio reconnect timer */
dtio_reconnect_del(struct dt_io_thread * dtio)604 static void dtio_reconnect_del(struct dt_io_thread* dtio)
605 {
606 if(!dtio->reconnect_is_added)
607 return;
608 ub_timer_del(dtio->reconnect_timer);
609 dtio->reconnect_is_added = 0;
610 }
611
612 /** clear the reconnect exponential backoff timer.
613 * We have successfully connected so we can try again with short timeouts. */
dtio_reconnect_clear(struct dt_io_thread * dtio)614 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
615 {
616 dtio->reconnect_timeout = 0;
617 dtio_reconnect_del(dtio);
618 }
619
620 /** reconnect slowly, because we already know we have to wait for a bit */
dtio_reconnect_slow(struct dt_io_thread * dtio,int msec)621 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
622 {
623 dtio_reconnect_del(dtio);
624 dtio->reconnect_timeout = msec;
625 dtio_reconnect_enable(dtio);
626 }
627
628 /** delete the current message in the dtio, and reset counters */
dtio_cur_msg_free(struct dt_io_thread * dtio)629 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
630 {
631 free(dtio->cur_msg);
632 dtio->cur_msg = NULL;
633 dtio->cur_msg_len = 0;
634 dtio->cur_msg_done = 0;
635 dtio->cur_msg_len_done = 0;
636 }
637
638 /** delete the buffer and counters used to read frame */
dtio_read_frame_free(struct dt_frame_read_buf * rb)639 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
640 {
641 if(rb->buf) {
642 free(rb->buf);
643 rb->buf = NULL;
644 }
645 rb->buf_count = 0;
646 rb->buf_cap = 0;
647 rb->frame_len = 0;
648 rb->frame_len_done = 0;
649 rb->control_frame = 0;
650 }
651
652 /** del the output file descriptor event for listening */
dtio_del_output_event(struct dt_io_thread * dtio)653 static void dtio_del_output_event(struct dt_io_thread* dtio)
654 {
655 if(!dtio->event_added)
656 return;
657 ub_event_del(dtio->event);
658 dtio->event_added = 0;
659 dtio->event_added_is_write = 0;
660 }
661
662 /** close dtio socket and set it to -1 */
dtio_close_fd(struct dt_io_thread * dtio)663 static void dtio_close_fd(struct dt_io_thread* dtio)
664 {
665 sock_close(dtio->fd);
666 dtio->fd = -1;
667 }
668
669 /** close and stop the output file descriptor event */
dtio_close_output(struct dt_io_thread * dtio)670 static void dtio_close_output(struct dt_io_thread* dtio)
671 {
672 if(!dtio->event)
673 return;
674 ub_event_free(dtio->event);
675 dtio->event = NULL;
676 if(dtio->ssl) {
677 #ifdef HAVE_SSL
678 SSL_shutdown(dtio->ssl);
679 SSL_free(dtio->ssl);
680 dtio->ssl = NULL;
681 #endif
682 }
683 dtio_close_fd(dtio);
684
685 /* if there is a (partial) message, discard it
686 * we cannot send (the remainder of) it, and a new
687 * connection needs to start with a control frame. */
688 if(dtio->cur_msg) {
689 dtio_cur_msg_free(dtio);
690 }
691
692 dtio->ready_frame_sent = 0;
693 dtio->accept_frame_received = 0;
694 dtio_read_frame_free(&dtio->read_frame);
695
696 dtio_reconnect_enable(dtio);
697 }
698
699 /** check for pending nonblocking connect errors,
700 * returns 1 if it is okay. -1 on error (close it), 0 to try later */
dtio_check_nb_connect(struct dt_io_thread * dtio)701 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
702 {
703 int error = 0;
704 socklen_t len = (socklen_t)sizeof(error);
705 if(!dtio->check_nb_connect)
706 return 1; /* everything okay */
707 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
708 &len) < 0) {
709 #ifndef USE_WINSOCK
710 error = errno; /* on solaris errno is error */
711 #else
712 error = WSAGetLastError();
713 #endif
714 }
715 #ifndef USE_WINSOCK
716 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
717 if(error == EINPROGRESS || error == EWOULDBLOCK)
718 return 0; /* try again later */
719 #endif
720 #else
721 if(error == WSAEINPROGRESS) {
722 return 0; /* try again later */
723 } else if(error == WSAEWOULDBLOCK) {
724 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
725 dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
726 return 0; /* try again later */
727 }
728 #endif
729 if(error != 0) {
730 char* to = dtio->socket_path;
731 if(!to) to = dtio->ip_str;
732 if(!to) to = "";
733 log_err("dnstap io: failed to connect to \"%s\": %s",
734 to, sock_strerror(error));
735 return -1; /* error, close it */
736 }
737
738 if(dtio->ip_str)
739 verbose(VERB_DETAIL, "dnstap io: connected to %s",
740 dtio->ip_str);
741 else if(dtio->socket_path)
742 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
743 dtio->socket_path);
744 dtio_reconnect_clear(dtio);
745 dtio->check_nb_connect = 0;
746 return 1; /* everything okay */
747 }
748
#ifdef HAVE_SSL
/** write to ssl output
 * returns number of bytes written, 0 if nothing happened,
 * try again later, or -1 if the channel is to be closed. */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int r, want;

	ERR_clear_error();
	r = SSL_write(dtio->ssl, buf, len);
	if(r > 0)
		return r;

	want = SSL_get_error(dtio->ssl, r);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		return -1; /* closed */
	case SSL_ERROR_WANT_READ:
		/* we want a brief read event */
		dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		return 0; /* write again later */
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		log_crypto_err("dnstap io, could not SSL_write");
		return -1;
	}
}
#endif /* HAVE_SSL */
792
793 /** write buffer to output.
794 * returns number of bytes written, 0 if nothing happened,
795 * try again later, or -1 if the channel is to be closed. */
dtio_write_buf(struct dt_io_thread * dtio,uint8_t * buf,size_t len)796 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
797 size_t len)
798 {
799 ssize_t ret;
800 if(dtio->fd == -1)
801 return -1;
802 #ifdef HAVE_SSL
803 if(dtio->ssl)
804 return dtio_write_ssl(dtio, buf, len);
805 #endif
806 ret = send(dtio->fd, (void*)buf, len, 0);
807 if(ret == -1) {
808 #ifndef USE_WINSOCK
809 if(errno == EINTR || errno == EAGAIN)
810 return 0;
811 #else
812 if(WSAGetLastError() == WSAEINPROGRESS)
813 return 0;
814 if(WSAGetLastError() == WSAEWOULDBLOCK) {
815 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
816 dtio->stop_flush_event:dtio->event),
817 UB_EV_WRITE);
818 return 0;
819 }
820 #endif
821 log_err("dnstap io: failed send: %s", sock_strerror(errno));
822 return -1;
823 }
824 return ret;
825 }
826
#ifdef HAVE_WRITEV
/** write the remaining length bytes and the message data in one
 * writev, if possible. The data part is written from its start, which
 * assumes cur_msg_done is still 0 while the length is incomplete.
 * On a write error the channel is closed (and 0 returned).
 * return true if message is done, false if incomplete */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t sendlen = htonl(dtio->cur_msg_len);
	struct iovec iov[2];
	ssize_t r;

	/* what is left of the 4 byte network order length */
	iov[0].iov_base = ((uint8_t*)&sendlen) + dtio->cur_msg_len_done;
	iov[0].iov_len = sizeof(sendlen) - dtio->cur_msg_len_done;
	/* followed by the whole message */
	iov[1].iov_base = dtio->cur_msg;
	iov[1].iov_len = dtio->cur_msg_len;
	log_assert(iov[0].iov_len > 0);

	r = writev(dtio->fd, iov, 2);
	if(r == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}

	/* count the r bytes against the length first; the excess over
	 * 4 bytes is message data */
	dtio->cur_msg_len_done += r;
	if(dtio->cur_msg_len_done < 4)
		return 0;
	if(dtio->cur_msg_len_done > 4) {
		dtio->cur_msg_done = dtio->cur_msg_len_done - 4;
		dtio->cur_msg_len_done = 4;
	}
	return dtio->cur_msg_done >= dtio->cur_msg_len;
}
#endif /* HAVE_WRITEV */
874
875 /** write more of the length, preceding the data frame.
876 * return true if message is done, false if incomplete. */
dtio_write_more_of_len(struct dt_io_thread * dtio)877 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
878 {
879 uint32_t sendlen;
880 int r;
881 if(dtio->cur_msg_len_done >= 4)
882 return 1;
883 #ifdef HAVE_WRITEV
884 if(!dtio->ssl) {
885 /* we try writev for everything.*/
886 return dtio_write_with_writev(dtio);
887 }
888 #endif /* HAVE_WRITEV */
889 sendlen = htonl(dtio->cur_msg_len);
890 r = dtio_write_buf(dtio,
891 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
892 sizeof(sendlen)-dtio->cur_msg_len_done);
893 if(r == -1) {
894 /* close the channel */
895 dtio_del_output_event(dtio);
896 dtio_close_output(dtio);
897 return 0;
898 } else if(r == 0) {
899 /* try again later */
900 return 0;
901 }
902 dtio->cur_msg_len_done += r;
903 if(dtio->cur_msg_len_done < 4)
904 return 0;
905 return 1;
906 }
907
908 /** write more of the data frame.
909 * return true if message is done, false if incomplete. */
dtio_write_more_of_data(struct dt_io_thread * dtio)910 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
911 {
912 int r;
913 if(dtio->cur_msg_done >= dtio->cur_msg_len)
914 return 1;
915 r = dtio_write_buf(dtio,
916 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
917 dtio->cur_msg_len - dtio->cur_msg_done);
918 if(r == -1) {
919 /* close the channel */
920 dtio_del_output_event(dtio);
921 dtio_close_output(dtio);
922 return 0;
923 } else if(r == 0) {
924 /* try again later */
925 return 0;
926 }
927 dtio->cur_msg_done += r;
928 if(dtio->cur_msg_done < dtio->cur_msg_len)
929 return 0;
930 return 1;
931 }
932
933 /** write more of the current message. false if incomplete, true if
934 * the message is done */
dtio_write_more(struct dt_io_thread * dtio)935 static int dtio_write_more(struct dt_io_thread* dtio)
936 {
937 if(dtio->cur_msg_len_done < 4) {
938 if(!dtio_write_more_of_len(dtio))
939 return 0;
940 }
941 if(dtio->cur_msg_done < dtio->cur_msg_len) {
942 if(!dtio_write_more_of_data(dtio))
943 return 0;
944 }
945 return 1;
946 }
947
948 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
949 * -1: continue, >0: number of bytes read into buffer */
receive_bytes(struct dt_io_thread * dtio,void * buf,size_t len)950 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
951 ssize_t r;
952 r = recv(dtio->fd, (void*)buf, len, 0);
953 if(r == -1) {
954 char* to = dtio->socket_path;
955 if(!to) to = dtio->ip_str;
956 if(!to) to = "";
957 #ifndef USE_WINSOCK
958 if(errno == EINTR || errno == EAGAIN)
959 return -1; /* try later */
960 #else
961 if(WSAGetLastError() == WSAEINPROGRESS) {
962 return -1; /* try later */
963 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
964 ub_winsock_tcp_wouldblock(
965 (dtio->stop_flush_event?
966 dtio->stop_flush_event:dtio->event),
967 UB_EV_READ);
968 return -1; /* try later */
969 }
970 #endif
971 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
972 verbosity < 4)
973 return 0; /* no log retries on low verbosity */
974 log_err("dnstap io: output closed, recv %s: %s", to,
975 strerror(errno));
976 /* and close below */
977 return 0;
978 }
979 if(r == 0) {
980 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
981 verbosity < 4)
982 return 0; /* no log retries on low verbosity */
983 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
984 /* and close below */
985 return 0;
986 }
987 /* something was received */
988 return r;
989 }
990
#ifdef HAVE_SSL
/** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
 * -1: continue, >0: number of bytes read into buffer */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int r, want, quiet;

	ERR_clear_error();
	r = SSL_read(dtio->ssl, buf, len);
	if(r > 0)
		return r;

	/* do not log retries on low verbosity while backing off */
	quiet = (dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
		verbosity < 4);
	want = SSL_get_error(dtio->ssl, r);
	switch(want) {
	case SSL_ERROR_WANT_READ:
		return -1; /* continue later */
	case SSL_ERROR_WANT_WRITE:
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_ZERO_RETURN:
		if(quiet)
			return 0; /* no log retries on low verbosity */
		break;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(quiet && errno == ECONNRESET)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s",
				strerror(errno));
		break;
	default:
		log_crypto_err("could not SSL_read");
		break;
	}
	verbose(VERB_DETAIL, "dnstap io: output closed by the "
		"other side");
	return 0;
}
#endif /* HAVE_SSL */
1035
1036 /** check if the output fd has been closed,
1037 * it returns false if the stream is closed. */
dtio_check_close(struct dt_io_thread * dtio)1038 static int dtio_check_close(struct dt_io_thread* dtio)
1039 {
1040 /* we don't want to read any packets, but if there are we can
1041 * discard the input (ignore it). Ignore of unknown (control)
1042 * packets is okay for the framestream protocol. And also, the
1043 * read call can return that the stream has been closed by the
1044 * other side. */
1045 uint8_t buf[1024];
1046 int r = -1;
1047
1048
1049 if(dtio->fd == -1) return 0;
1050
1051 while(r != 0) {
1052 /* not interested in buffer content, overwrite */
1053 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1054 if(r == -1)
1055 return 1;
1056 }
1057 /* the other end has been closed */
1058 /* close the channel */
1059 dtio_del_output_event(dtio);
1060 dtio_close_output(dtio);
1061 return 0;
1062 }
1063
1064 /** Read accept frame. Returns -1: continue reading, 0: closed,
1065 * 1: valid accept received. */
dtio_read_accept_frame(struct dt_io_thread * dtio)1066 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1067 {
1068 int r;
1069 size_t read_frame_done;
1070 while(dtio->read_frame.frame_len_done < 4) {
1071 #ifdef HAVE_SSL
1072 if(dtio->ssl) {
1073 r = ssl_read_bytes(dtio,
1074 (uint8_t*)&dtio->read_frame.frame_len+
1075 dtio->read_frame.frame_len_done,
1076 4-dtio->read_frame.frame_len_done);
1077 } else {
1078 #endif
1079 r = receive_bytes(dtio,
1080 (uint8_t*)&dtio->read_frame.frame_len+
1081 dtio->read_frame.frame_len_done,
1082 4-dtio->read_frame.frame_len_done);
1083 #ifdef HAVE_SSL
1084 }
1085 #endif
1086 if(r == -1)
1087 return -1; /* continue reading */
1088 if(r == 0) {
1089 /* connection closed */
1090 goto close_connection;
1091 }
1092 dtio->read_frame.frame_len_done += r;
1093 if(dtio->read_frame.frame_len_done < 4)
1094 return -1; /* continue reading */
1095
1096 if(dtio->read_frame.frame_len == 0) {
1097 dtio->read_frame.frame_len_done = 0;
1098 dtio->read_frame.control_frame = 1;
1099 continue;
1100 }
1101 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1102 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1103 verbose(VERB_OPS, "dnstap: received frame exceeds max "
1104 "length of %d bytes, closing connection",
1105 DTIO_RECV_FRAME_MAX_LEN);
1106 goto close_connection;
1107 }
1108 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1109 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1110 if(!dtio->read_frame.buf) {
1111 log_err("dnstap io: out of memory (creating read "
1112 "buffer)");
1113 goto close_connection;
1114 }
1115 }
1116 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1117 #ifdef HAVE_SSL
1118 if(dtio->ssl) {
1119 r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1120 dtio->read_frame.buf_count,
1121 dtio->read_frame.buf_cap-
1122 dtio->read_frame.buf_count);
1123 } else {
1124 #endif
1125 r = receive_bytes(dtio, dtio->read_frame.buf+
1126 dtio->read_frame.buf_count,
1127 dtio->read_frame.buf_cap-
1128 dtio->read_frame.buf_count);
1129 #ifdef HAVE_SSL
1130 }
1131 #endif
1132 if(r == -1)
1133 return -1; /* continue reading */
1134 if(r == 0) {
1135 /* connection closed */
1136 goto close_connection;
1137 }
1138 dtio->read_frame.buf_count += r;
1139 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1140 return -1; /* continue reading */
1141 }
1142
1143 /* Complete frame received, check if this is a valid ACCEPT control
1144 * frame. */
1145 if(dtio->read_frame.frame_len < 4) {
1146 verbose(VERB_OPS, "dnstap: invalid data received");
1147 goto close_connection;
1148 }
1149 if(sldns_read_uint32(dtio->read_frame.buf) !=
1150 FSTRM_CONTROL_FRAME_ACCEPT) {
1151 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1152 "ignored");
1153 dtio->ready_frame_sent = 0;
1154 dtio->accept_frame_received = 0;
1155 dtio_read_frame_free(&dtio->read_frame);
1156 return -1;
1157 }
1158 read_frame_done = 4; /* control frame type */
1159
1160 /* Iterate over control fields, ignore unknown types.
1161 * Need to be able to read at least 8 bytes (control field type +
1162 * length). */
1163 while(read_frame_done+8 < dtio->read_frame.frame_len) {
1164 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1165 read_frame_done);
1166 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1167 read_frame_done + 4);
1168 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1169 if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1170 read_frame_done+8+len <=
1171 dtio->read_frame.frame_len &&
1172 memcmp(dtio->read_frame.buf + read_frame_done +
1173 + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1174 if(!dtio_control_start_send(dtio)) {
1175 verbose(VERB_OPS, "dnstap io: out of "
1176 "memory while sending START frame");
1177 goto close_connection;
1178 }
1179 dtio->accept_frame_received = 1;
1180 if(!dtio_add_output_event_write(dtio))
1181 goto close_connection;
1182 return 1;
1183 } else {
1184 /* unknown content type */
1185 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1186 "contains unknown content type, "
1187 "closing connection");
1188 goto close_connection;
1189 }
1190 }
1191 /* unknown option, try next */
1192 read_frame_done += 8+len;
1193 }
1194
1195
1196 close_connection:
1197 dtio_del_output_event(dtio);
1198 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1199 dtio_close_output(dtio);
1200 return 0;
1201 }
1202
1203 /** add the output file descriptor event for listening, read only */
dtio_add_output_event_read(struct dt_io_thread * dtio)1204 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1205 {
1206 if(!dtio->event)
1207 return 0;
1208 if(dtio->event_added && !dtio->event_added_is_write)
1209 return 1;
1210 /* we have to (re-)register the event */
1211 if(dtio->event_added)
1212 ub_event_del(dtio->event);
1213 ub_event_del_bits(dtio->event, UB_EV_WRITE);
1214 if(ub_event_add(dtio->event, NULL) != 0) {
1215 log_err("dnstap io: out of memory (adding event)");
1216 dtio->event_added = 0;
1217 dtio->event_added_is_write = 0;
1218 /* close output and start reattempts to open it */
1219 dtio_close_output(dtio);
1220 return 0;
1221 }
1222 dtio->event_added = 1;
1223 dtio->event_added_is_write = 0;
1224 return 1;
1225 }
1226
1227 /** add the output file descriptor event for listening, read and write */
dtio_add_output_event_write(struct dt_io_thread * dtio)1228 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1229 {
1230 if(!dtio->event)
1231 return 0;
1232 if(dtio->event_added && dtio->event_added_is_write)
1233 return 1;
1234 /* we have to (re-)register the event */
1235 if(dtio->event_added)
1236 ub_event_del(dtio->event);
1237 ub_event_add_bits(dtio->event, UB_EV_WRITE);
1238 if(ub_event_add(dtio->event, NULL) != 0) {
1239 log_err("dnstap io: out of memory (adding event)");
1240 dtio->event_added = 0;
1241 dtio->event_added_is_write = 0;
1242 /* close output and start reattempts to open it */
1243 dtio_close_output(dtio);
1244 return 0;
1245 }
1246 dtio->event_added = 1;
1247 dtio->event_added_is_write = 1;
1248 return 1;
1249 }
1250
1251 /** put the dtio thread to sleep */
dtio_sleep(struct dt_io_thread * dtio)1252 static void dtio_sleep(struct dt_io_thread* dtio)
1253 {
1254 /* unregister the event polling for write, because there is
1255 * nothing to be written */
1256 (void)dtio_add_output_event_read(dtio);
1257 }
1258
1259 #ifdef HAVE_SSL
1260 /** enable the brief read condition */
dtio_enable_brief_read(struct dt_io_thread * dtio)1261 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1262 {
1263 dtio->ssl_brief_read = 1;
1264 if(dtio->stop_flush_event) {
1265 ub_event_del(dtio->stop_flush_event);
1266 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1267 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1268 log_err("dnstap io, stop flush, could not ub_event_add");
1269 return 0;
1270 }
1271 return 1;
1272 }
1273 return dtio_add_output_event_read(dtio);
1274 }
1275 #endif /* HAVE_SSL */
1276
1277 #ifdef HAVE_SSL
1278 /** disable the brief read condition */
dtio_disable_brief_read(struct dt_io_thread * dtio)1279 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1280 {
1281 dtio->ssl_brief_read = 0;
1282 if(dtio->stop_flush_event) {
1283 ub_event_del(dtio->stop_flush_event);
1284 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1285 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1286 log_err("dnstap io, stop flush, could not ub_event_add");
1287 return 0;
1288 }
1289 return 1;
1290 }
1291 return dtio_add_output_event_write(dtio);
1292 }
1293 #endif /* HAVE_SSL */
1294
1295 #ifdef HAVE_SSL
1296 /** enable the brief write condition */
dtio_enable_brief_write(struct dt_io_thread * dtio)1297 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1298 {
1299 dtio->ssl_brief_write = 1;
1300 return dtio_add_output_event_write(dtio);
1301 }
1302 #endif /* HAVE_SSL */
1303
1304 #ifdef HAVE_SSL
1305 /** disable the brief write condition */
dtio_disable_brief_write(struct dt_io_thread * dtio)1306 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1307 {
1308 dtio->ssl_brief_write = 0;
1309 return dtio_add_output_event_read(dtio);
1310 }
1311 #endif /* HAVE_SSL */
1312
1313 #ifdef HAVE_SSL
1314 /** check peer verification after ssl handshake connection, false if closed*/
dtio_ssl_check_peer(struct dt_io_thread * dtio)1315 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1316 {
1317 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1318 /* verification */
1319 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1320 X509* x = SSL_get_peer_certificate(dtio->ssl);
1321 if(!x) {
1322 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1323 "connection failed no certificate",
1324 dtio->ip_str);
1325 return 0;
1326 }
1327 log_cert(VERB_ALGO, "dnstap io, peer certificate",
1328 x);
1329 #ifdef HAVE_SSL_GET0_PEERNAME
1330 if(SSL_get0_peername(dtio->ssl)) {
1331 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1332 "connection to %s authenticated",
1333 dtio->ip_str,
1334 SSL_get0_peername(dtio->ssl));
1335 } else {
1336 #endif
1337 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1338 "connection authenticated",
1339 dtio->ip_str);
1340 #ifdef HAVE_SSL_GET0_PEERNAME
1341 }
1342 #endif
1343 X509_free(x);
1344 } else {
1345 X509* x = SSL_get_peer_certificate(dtio->ssl);
1346 if(x) {
1347 log_cert(VERB_ALGO, "dnstap io, peer "
1348 "certificate", x);
1349 X509_free(x);
1350 }
1351 verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1352 "failed: failed to authenticate",
1353 dtio->ip_str);
1354 return 0;
1355 }
1356 } else {
1357 /* unauthenticated, the verify peer flag was not set
1358 * in ssl when the ssl object was created from ssl_ctx */
1359 verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1360 dtio->ip_str);
1361 }
1362 return 1;
1363 }
1364 #endif /* HAVE_SSL */
1365
1366 #ifdef HAVE_SSL
/** perform ssl handshake, returns 1 if okay, 0 to stop.
 * It is called from the output callback (info is NULL) and from the stop
 * flush callback (info is non-NULL).
 * A return of 0 means the caller must stop handling this event. Either
 * the handshake needs more I/O, or the connection was closed. When info
 * is given and the connection is closed or an event update fails, the
 * stop flush event base is also told to exit.
 * @param dtio: the io thread with the SSL connection.
 * @param info: stop flush information, or NULL outside of stop flush.
 * @return 1 when the handshake is done and the peer check passed. */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int r;
	if(dtio->ssl_brief_read) {
		/* assume the brief read condition is satisfied,
		 * if we need more or again, we can set it again */
		if(!dtio_disable_brief_read(dtio)) {
			if(info) dtio_stop_flush_exit(info);
			return 0;
		}
	}
	if(dtio->ssl_handshake_done)
		return 1;

	/* clear stale errors so SSL_get_error reports on this call */
	ERR_clear_error();
	r = SSL_do_handshake(dtio->ssl);
	if(r != 1) {
		int want = SSL_get_error(dtio->ssl, r);
		if(want == SSL_ERROR_WANT_READ) {
			/* we want to read on the connection, poll for
			 * read only until the peer sends more */
			if(!dtio_enable_brief_read(dtio)) {
				if(info) dtio_stop_flush_exit(info);
				return 0;
			}
			return 0;
		} else if(want == SSL_ERROR_WANT_WRITE) {
			/* we want to write on the connection; presumably
			 * the event still has write interest, so the
			 * handshake is retried on the next callback */
			return 0;
		} else if(r == 0) {
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		} else if(want == SSL_ERROR_SYSCALL) {
			/* SYSCALL and errno==0 means closed uncleanly */
			int silent = 0;
#ifdef EPIPE
			if(errno == EPIPE && verbosity < 2)
				silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
			if(errno == ECONNRESET && verbosity < 2)
				silent = 1; /* silence reset by peer */
#endif
			if(errno == 0)
				silent = 1;
			if(!silent)
				log_err("dnstap io, SSL_handshake syscall: %s",
					strerror(errno));
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		} else {
			/* TLS protocol failure; log unless the error is
			 * squelched by squelch_err_ssl_handshake */
			unsigned long err = ERR_get_error();
			if(!squelch_err_ssl_handshake(err)) {
				log_crypto_err_code("dnstap io, ssl handshake failed",
					err);
				verbose(VERB_OPS, "dnstap io, ssl handshake failed "
					"from %s", dtio->ip_str);
			}
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		}

	}
	/* check peer verification */
	dtio->ssl_handshake_done = 1;

	if(!dtio_ssl_check_peer(dtio)) {
		/* closed */
		if(info) dtio_stop_flush_exit(info);
		dtio_del_output_event(dtio);
		dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
		dtio_close_output(dtio);
		return 0;
	}
	return 1;
}
1456 #endif /* HAVE_SSL */
1457
1458 /** callback for the dnstap events, to write to the output */
dtio_output_cb(int ATTR_UNUSED (fd),short bits,void * arg)1459 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1460 {
1461 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1462 int i;
1463
1464 if(dtio->check_nb_connect) {
1465 int connect_err = dtio_check_nb_connect(dtio);
1466 if(connect_err == -1) {
1467 /* close the channel */
1468 dtio_del_output_event(dtio);
1469 dtio_close_output(dtio);
1470 return;
1471 } else if(connect_err == 0) {
1472 /* try again later */
1473 return;
1474 }
1475 /* nonblocking connect check passed, continue */
1476 }
1477
1478 #ifdef HAVE_SSL
1479 if(dtio->ssl &&
1480 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1481 if(!dtio_ssl_handshake(dtio, NULL))
1482 return;
1483 }
1484 #endif
1485
1486 if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1487 if(dtio->ssl_brief_write)
1488 (void)dtio_disable_brief_write(dtio);
1489 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1490 if(dtio_read_accept_frame(dtio) <= 0)
1491 return;
1492 } else if(!dtio_check_close(dtio))
1493 return;
1494 }
1495
1496 /* loop to process a number of messages. This improves throughput,
1497 * because selecting on write-event if not needed for busy messages
1498 * (dnstap log) generation and if they need to all be written back.
1499 * The write event is usually not blocked up. But not forever,
1500 * because the event loop needs to stay responsive for other events.
1501 * If there are no (more) messages, or if the output buffers get
1502 * full, it returns out of the loop. */
1503 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1504 /* see if there are messages that need writing */
1505 if(!dtio->cur_msg) {
1506 if(!dtio_find_msg(dtio)) {
1507 if(i == 0) {
1508 /* no messages on the first iteration,
1509 * the queues are all empty */
1510 dtio_sleep(dtio);
1511 }
1512 return; /* nothing to do */
1513 }
1514 }
1515
1516 /* write it */
1517 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1518 if(!dtio_write_more(dtio))
1519 return;
1520 }
1521
1522 /* done with the current message */
1523 dtio_cur_msg_free(dtio);
1524
1525 /* If this is a bidirectional stream the first message will be
1526 * the READY control frame. We can only continue writing after
1527 * receiving an ACCEPT control frame. */
1528 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1529 dtio->ready_frame_sent = 1;
1530 (void)dtio_add_output_event_read(dtio);
1531 break;
1532 }
1533 }
1534 }
1535
1536 /** callback for the dnstap commandpipe, to stop the dnstap IO */
dtio_cmd_cb(int fd,short ATTR_UNUSED (bits),void * arg)1537 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1538 {
1539 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1540 uint8_t cmd;
1541 ssize_t r;
1542 if(dtio->want_to_exit)
1543 return;
1544 r = read(fd, &cmd, sizeof(cmd));
1545 if(r == -1) {
1546 #ifndef USE_WINSOCK
1547 if(errno == EINTR || errno == EAGAIN)
1548 return; /* ignore this */
1549 #else
1550 if(WSAGetLastError() == WSAEINPROGRESS)
1551 return;
1552 if(WSAGetLastError() == WSAEWOULDBLOCK)
1553 return;
1554 #endif
1555 log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1556 /* and then fall through to quit the thread */
1557 } else if(r == 0) {
1558 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1559 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1560 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1561 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1562 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1563
1564 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1565 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1566 "waiting for ACCEPT control frame");
1567 return;
1568 }
1569
1570 /* reregister event */
1571 if(!dtio_add_output_event_write(dtio))
1572 return;
1573 return;
1574 } else if(r == 1) {
1575 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1576 }
1577 dtio->want_to_exit = 1;
1578 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1579 != 0) {
1580 log_err("dnstap io: could not loopexit");
1581 }
1582 }
1583
1584 #ifndef THREADS_DISABLED
1585 /** setup the event base for the dnstap io thread */
dtio_setup_base(struct dt_io_thread * dtio,time_t * secs,struct timeval * now)1586 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1587 struct timeval* now)
1588 {
1589 memset(now, 0, sizeof(*now));
1590 dtio->event_base = ub_default_event_base(0, secs, now);
1591 if(!dtio->event_base) {
1592 fatal_exit("dnstap io: could not create event_base");
1593 }
1594 }
1595 #endif /* THREADS_DISABLED */
1596
1597 /** setup the cmd event for dnstap io */
dtio_setup_cmd(struct dt_io_thread * dtio)1598 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1599 {
1600 struct ub_event* cmdev;
1601 fd_set_nonblock(dtio->commandpipe[0]);
1602 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1603 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1604 if(!cmdev) {
1605 fatal_exit("dnstap io: out of memory");
1606 }
1607 dtio->command_event = cmdev;
1608 if(ub_event_add(cmdev, NULL) != 0) {
1609 fatal_exit("dnstap io: out of memory (adding event)");
1610 }
1611 }
1612
1613 /** setup the reconnect event for dnstap io */
dtio_setup_reconnect(struct dt_io_thread * dtio)1614 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1615 {
1616 dtio_reconnect_clear(dtio);
1617 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1618 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1619 if(!dtio->reconnect_timer) {
1620 fatal_exit("dnstap io: out of memory");
1621 }
1622 }
1623
1624 /**
1625 * structure to keep track of information during stop flush
1626 */
1627 struct stop_flush_info {
1628 /** the event base during stop flush */
1629 struct ub_event_base* base;
1630 /** did we already want to exit this stop-flush event base */
1631 int want_to_exit_flush;
1632 /** has the timer fired */
1633 int timer_done;
1634 /** the dtio */
1635 struct dt_io_thread* dtio;
1636 /** the stop control frame */
1637 void* stop_frame;
1638 /** length of the stop frame */
1639 size_t stop_frame_len;
1640 /** how much we have done of the stop frame */
1641 size_t stop_frame_done;
1642 };
1643
1644 /** exit the stop flush base */
dtio_stop_flush_exit(struct stop_flush_info * info)1645 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1646 {
1647 if(info->want_to_exit_flush)
1648 return;
1649 info->want_to_exit_flush = 1;
1650 if(ub_event_base_loopexit(info->base) != 0) {
1651 log_err("dnstap io: could not loopexit");
1652 }
1653 }
1654
1655 /** send the stop control,
1656 * return true if completed the frame. */
dtio_control_stop_send(struct stop_flush_info * info)1657 static int dtio_control_stop_send(struct stop_flush_info* info)
1658 {
1659 struct dt_io_thread* dtio = info->dtio;
1660 int r;
1661 if(info->stop_frame_done >= info->stop_frame_len)
1662 return 1;
1663 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1664 info->stop_frame_done, info->stop_frame_len -
1665 info->stop_frame_done);
1666 if(r == -1) {
1667 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1668 dtio_stop_flush_exit(info);
1669 return 0;
1670 }
1671 if(r == 0) {
1672 /* try again later, or timeout */
1673 return 0;
1674 }
1675 info->stop_frame_done += r;
1676 if(info->stop_frame_done < info->stop_frame_len)
1677 return 0; /* not done yet */
1678 return 1;
1679 }
1680
dtio_stop_timer_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)1681 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1682 void* arg)
1683 {
1684 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1685 if(info->want_to_exit_flush)
1686 return;
1687 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1688 info->timer_done = 1;
1689 dtio_stop_flush_exit(info);
1690 }
1691
dtio_stop_ev_cb(int ATTR_UNUSED (fd),short bits,void * arg)1692 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1693 {
1694 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1695 struct dt_io_thread* dtio = info->dtio;
1696 if(info->want_to_exit_flush)
1697 return;
1698 if(dtio->check_nb_connect) {
1699 /* we don't start the stop_flush if connect still
1700 * in progress, but the check code is here, just in case */
1701 int connect_err = dtio_check_nb_connect(dtio);
1702 if(connect_err == -1) {
1703 /* close the channel, exit the stop flush */
1704 dtio_stop_flush_exit(info);
1705 dtio_del_output_event(dtio);
1706 dtio_close_output(dtio);
1707 return;
1708 } else if(connect_err == 0) {
1709 /* try again later */
1710 return;
1711 }
1712 /* nonblocking connect check passed, continue */
1713 }
1714 #ifdef HAVE_SSL
1715 if(dtio->ssl &&
1716 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1717 if(!dtio_ssl_handshake(dtio, info))
1718 return;
1719 }
1720 #endif
1721
1722 if((bits&UB_EV_READ)) {
1723 if(!dtio_check_close(dtio)) {
1724 if(dtio->fd == -1) {
1725 verbose(VERB_ALGO, "dnstap io: "
1726 "stop flush: output closed");
1727 dtio_stop_flush_exit(info);
1728 }
1729 return;
1730 }
1731 }
1732 /* write remainder of last frame */
1733 if(dtio->cur_msg) {
1734 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1735 if(!dtio_write_more(dtio)) {
1736 if(dtio->fd == -1) {
1737 verbose(VERB_ALGO, "dnstap io: "
1738 "stop flush: output closed");
1739 dtio_stop_flush_exit(info);
1740 }
1741 return;
1742 }
1743 }
1744 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1745 "last frame");
1746 dtio_cur_msg_free(dtio);
1747 }
1748 /* write stop frame */
1749 if(info->stop_frame_done < info->stop_frame_len) {
1750 if(!dtio_control_stop_send(info))
1751 return;
1752 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1753 "stop control frame");
1754 }
1755 /* when last frame and stop frame are sent, exit */
1756 dtio_stop_flush_exit(info);
1757 }
1758
/** flush at end, last packet and stop control.
 * Runs a temporary event base for at most 2 seconds (the timer below).
 * In it, dtio_stop_ev_cb writes the remainder of the current message and
 * then the STOP control frame. Nothing is done when there is no
 * connection, when the connect is still pending, or when the TLS
 * handshake has not completed. While this runs, dtio->stop_flush_event
 * points at the temporary connection event, so the brief read/write
 * helpers act on it instead of the normal output event. */
static void dtio_control_stop_flush(struct dt_io_thread* dtio)
{
	/* briefly attempt to flush the previous packet to the output,
	 * this could be a partial packet, or even the start control frame */
	time_t secs = 0;
	struct timeval now;
	struct stop_flush_info info;
	struct timeval tv;
	struct ub_event* timer, *stopev;

	if(dtio->fd == -1 || dtio->check_nb_connect) {
		/* no connection or we have just connected, so nothing is
		 * sent yet, so nothing to stop or flush */
		return;
	}
	if(dtio->ssl && !dtio->ssl_handshake_done) {
		/* no SSL connection has been established yet */
		return;
	}

	memset(&info, 0, sizeof(info));
	memset(&now, 0, sizeof(now));
	info.dtio = dtio;
	info.base = ub_default_event_base(0, &secs, &now);
	if(!info.base) {
		log_err("dnstap io: malloc failure");
		return;
	}
	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
		&dtio_stop_timer_cb, &info);
	if(!timer) {
		log_err("dnstap io: malloc failure");
		ub_event_base_free(info.base);
		return;
	}
	/* upper bound on how long the flush may take */
	memset(&tv, 0, sizeof(tv));
	tv.tv_sec = 2;
	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
		&tv) != 0) {
		log_err("dnstap io: cannot event_timer_add");
		ub_event_free(timer);
		ub_event_base_free(info.base);
		return;
	}
	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
	if(!stopev) {
		log_err("dnstap io: malloc failure");
		ub_timer_del(timer);
		ub_event_free(timer);
		ub_event_base_free(info.base);
		return;
	}
	if(ub_event_add(stopev, NULL) != 0) {
		/* stopev was never added, so it is only freed */
		log_err("dnstap io: cannot event_add");
		ub_event_free(stopev);
		ub_timer_del(timer);
		ub_event_free(timer);
		ub_event_base_free(info.base);
		return;
	}
	info.stop_frame = fstrm_create_control_frame_stop(
		&info.stop_frame_len);
	if(!info.stop_frame) {
		log_err("dnstap io: malloc failure");
		ub_event_del(stopev);
		ub_event_free(stopev);
		ub_timer_del(timer);
		ub_event_free(timer);
		ub_event_base_free(info.base);
		return;
	}
	dtio->stop_flush_event = stopev;

	/* wait briefly, or until finished */
	verbose(VERB_ALGO, "dnstap io: stop flush started");
	if(ub_event_base_dispatch(info.base) < 0) {
		log_err("dnstap io: dispatch flush failed, errno is %s",
			strerror(errno));
	}
	verbose(VERB_ALGO, "dnstap io: stop flush ended");
	free(info.stop_frame);
	dtio->stop_flush_event = NULL;
	ub_event_del(stopev);
	ub_event_free(stopev);
	ub_timer_del(timer);
	ub_event_free(timer);
	ub_event_base_free(info.base);
}
1849
/** perform desetup and free stuff when the dnstap io thread exits.
 * Attempts the final flush (last message and STOP frame) while the
 * output is still open. It then closes the output, removes and frees
 * the command event and closes the read end of the command pipe. Last,
 * it frees the reconnect timer, the current message and, when threads
 * are enabled, the event base. */
static void dtio_desetup(struct dt_io_thread* dtio)
{
	/* must run before the output is closed below */
	dtio_control_stop_flush(dtio);
	dtio_del_output_event(dtio);
	dtio_close_output(dtio);
	ub_event_del(dtio->command_event);
	ub_event_free(dtio->command_event);
#ifndef USE_WINSOCK
	close(dtio->commandpipe[0]);
#else
	_close(dtio->commandpipe[0]);
#endif
	dtio->commandpipe[0] = -1;
	dtio_reconnect_del(dtio);
	ub_event_free(dtio->reconnect_timer);
	dtio_cur_msg_free(dtio);
#ifndef THREADS_DISABLED
	ub_event_base_free(dtio->event_base);
#endif
}
1871
1872 /** setup a start control message */
dtio_control_start_send(struct dt_io_thread * dtio)1873 static int dtio_control_start_send(struct dt_io_thread* dtio)
1874 {
1875 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1876 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1877 &dtio->cur_msg_len);
1878 if(!dtio->cur_msg) {
1879 return 0;
1880 }
1881 /* setup to send the control message */
1882 /* set that the buffer needs to be sent, but the length
1883 * of that buffer is already written, that way the buffer can
1884 * start with 0 length and then the length of the control frame
1885 * in it */
1886 dtio->cur_msg_done = 0;
1887 dtio->cur_msg_len_done = 4;
1888 return 1;
1889 }
1890
1891 /** setup a ready control message */
dtio_control_ready_send(struct dt_io_thread * dtio)1892 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1893 {
1894 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1895 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1896 &dtio->cur_msg_len);
1897 if(!dtio->cur_msg) {
1898 return 0;
1899 }
1900 /* setup to send the control message */
1901 /* set that the buffer needs to be sent, but the length
1902 * of that buffer is already written, that way the buffer can
1903 * start with 0 length and then the length of the control frame
1904 * in it */
1905 dtio->cur_msg_done = 0;
1906 dtio->cur_msg_len_done = 4;
1907 return 1;
1908 }
1909
/** open the output file descriptor for af_local.
 * Creates a nonblocking AF_LOCAL stream socket and connects it to
 * dtio->socket_path.
 * Returns 1 on success (dtio->fd is set), 0 on failure (no fd is left
 * open). Failures are not logged while the reconnect timeout is above
 * the minimum and verbosity is below 4. */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;
	/* A path that does not fit sun_path would be silently truncated
	 * by strlcpy, and the connect would go to a different path. */
	if(strlen(dtio->socket_path) >= sizeof(s.sun_path)) {
		if(!(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4))
			log_err("dnstap io: socket path \"%s\" is too long, "
				"the maximum is %d characters",
				dtio->socket_path, (int)sizeof(s.sun_path)-1);
		return 0;
	}
	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}
	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD; checked to fit above */
	(void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		== -1) {
		char* to = dtio->socket_path;
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log retries on low verbosity */
		}
		log_err("dnstap io: failed to connect to \"%s\": %s",
			to, sock_strerror(errno));
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1949
1950 /** open the output file descriptor for af_inet and af_inet6 */
dtio_open_output_tcp(struct dt_io_thread * dtio)1951 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1952 {
1953 struct sockaddr_storage addr;
1954 socklen_t addrlen;
1955 memset(&addr, 0, sizeof(addr));
1956 addrlen = (socklen_t)sizeof(addr);
1957
1958 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1959 log_err("could not parse IP '%s'", dtio->ip_str);
1960 return 0;
1961 }
1962 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1963 if(dtio->fd == -1) {
1964 log_err("can't create socket: %s", sock_strerror(errno));
1965 return 0;
1966 }
1967 fd_set_nonblock(dtio->fd);
1968 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1969 if(errno == EINPROGRESS)
1970 return 1; /* wait until connect done*/
1971 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1972 verbosity < 4) {
1973 dtio_close_fd(dtio);
1974 return 0; /* no log retries on low verbosity */
1975 }
1976 #ifndef USE_WINSOCK
1977 if(tcp_connect_errno_needs_log(
1978 (struct sockaddr *)&addr, addrlen)) {
1979 log_err("dnstap io: failed to connect to %s: %s",
1980 dtio->ip_str, strerror(errno));
1981 }
1982 #else
1983 if(WSAGetLastError() == WSAEINPROGRESS ||
1984 WSAGetLastError() == WSAEWOULDBLOCK)
1985 return 1; /* wait until connect done*/
1986 if(tcp_connect_errno_needs_log(
1987 (struct sockaddr *)&addr, addrlen)) {
1988 log_err("dnstap io: failed to connect to %s: %s",
1989 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1990 }
1991 #endif
1992 dtio_close_fd(dtio);
1993 return 0;
1994 }
1995 return 1;
1996 }
1997
1998 /** setup the SSL structure for new connection */
dtio_setup_ssl(struct dt_io_thread * dtio)1999 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2000 {
2001 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2002 if(!dtio->ssl) return 0;
2003 dtio->ssl_handshake_done = 0;
2004 dtio->ssl_brief_read = 0;
2005
2006 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2007 dtio->tls_use_sni)) {
2008 return 0;
2009 }
2010 return 1;
2011 }
2012
2013 /** open the output file descriptor */
dtio_open_output(struct dt_io_thread * dtio)2014 static void dtio_open_output(struct dt_io_thread* dtio)
2015 {
2016 struct ub_event* ev;
2017 if(dtio->upstream_is_unix) {
2018 if(!dtio_open_output_local(dtio)) {
2019 dtio_reconnect_enable(dtio);
2020 return;
2021 }
2022 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2023 if(!dtio_open_output_tcp(dtio)) {
2024 dtio_reconnect_enable(dtio);
2025 return;
2026 }
2027 if(dtio->upstream_is_tls) {
2028 if(!dtio_setup_ssl(dtio)) {
2029 dtio_close_fd(dtio);
2030 dtio_reconnect_enable(dtio);
2031 return;
2032 }
2033 }
2034 }
2035 dtio->check_nb_connect = 1;
2036
2037 /* the EV_READ is to read ACCEPT control messages, and catch channel
2038 * close. EV_WRITE is to write packets */
2039 ev = ub_event_new(dtio->event_base, dtio->fd,
2040 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2041 dtio);
2042 if(!ev) {
2043 log_err("dnstap io: out of memory");
2044 if(dtio->ssl) {
2045 #ifdef HAVE_SSL
2046 SSL_free(dtio->ssl);
2047 dtio->ssl = NULL;
2048 #endif
2049 }
2050 dtio_close_fd(dtio);
2051 dtio_reconnect_enable(dtio);
2052 return;
2053 }
2054 dtio->event = ev;
2055
2056 /* setup protocol control message to start */
2057 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2058 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2059 log_err("dnstap io: out of memory");
2060 ub_event_free(dtio->event);
2061 dtio->event = NULL;
2062 if(dtio->ssl) {
2063 #ifdef HAVE_SSL
2064 SSL_free(dtio->ssl);
2065 dtio->ssl = NULL;
2066 #endif
2067 }
2068 dtio_close_fd(dtio);
2069 dtio_reconnect_enable(dtio);
2070 return;
2071 }
2072 }
2073
/**
 * Perform the setup of the writer thread on the established event_base.
 *
 * Runs, in order, dtio_setup_cmd, dtio_setup_reconnect and
 * dtio_open_output, then asks for the output event to be added for
 * writing.
 * @param dtio: the io thread; dtio->event_base must already be set.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* this is the last step, so there is nothing to skip on failure */
	(void)dtio_add_output_event_write(dtio);
}
2083
2084 #ifndef THREADS_DISABLED
2085 /** the IO thread function for the DNSTAP IO */
dnstap_io(void * arg)2086 static void* dnstap_io(void* arg)
2087 {
2088 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2089 time_t secs = 0;
2090 struct timeval now;
2091 log_thread_set(&dtio->threadnum);
2092
2093 /* setup */
2094 verbose(VERB_ALGO, "start dnstap io thread");
2095 dtio_setup_base(dtio, &secs, &now);
2096 dtio_setup_on_base(dtio);
2097
2098 /* run */
2099 if(ub_event_base_dispatch(dtio->event_base) < 0) {
2100 log_err("dnstap io: dispatch failed, errno is %s",
2101 strerror(errno));
2102 }
2103
2104 /* cleanup */
2105 verbose(VERB_ALGO, "stop dnstap io thread");
2106 dtio_desetup(dtio);
2107 return NULL;
2108 }
2109 #endif /* THREADS_DISABLED */
2110
dt_io_thread_start(struct dt_io_thread * dtio,void * event_base_nothr,int numworkers)2111 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2112 int numworkers)
2113 {
2114 /* set up the thread, can fail */
2115 #ifndef USE_WINSOCK
2116 if(pipe(dtio->commandpipe) == -1) {
2117 log_err("failed to create pipe: %s", strerror(errno));
2118 return 0;
2119 }
2120 #else
2121 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2122 log_err("failed to create _pipe: %s",
2123 wsa_strerror(WSAGetLastError()));
2124 return 0;
2125 }
2126 #endif
2127
2128 /* start the thread */
2129 dtio->threadnum = numworkers+1;
2130 dtio->started = 1;
2131 #ifndef THREADS_DISABLED
2132 ub_thread_create(&dtio->tid, dnstap_io, dtio);
2133 (void)event_base_nothr;
2134 #else
2135 dtio->event_base = event_base_nothr;
2136 dtio_setup_on_base(dtio);
2137 #endif
2138 return 1;
2139 }
2140
dt_io_thread_stop(struct dt_io_thread * dtio)2141 void dt_io_thread_stop(struct dt_io_thread* dtio)
2142 {
2143 #ifndef THREADS_DISABLED
2144 uint8_t cmd = DTIO_COMMAND_STOP;
2145 #endif
2146 if(!dtio) return;
2147 if(!dtio->started) return;
2148 verbose(VERB_ALGO, "dnstap io: send stop cmd");
2149
2150 #ifndef THREADS_DISABLED
2151 while(1) {
2152 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2153 if(r == -1) {
2154 #ifndef USE_WINSOCK
2155 if(errno == EINTR || errno == EAGAIN)
2156 continue;
2157 #else
2158 if(WSAGetLastError() == WSAEINPROGRESS)
2159 continue;
2160 if(WSAGetLastError() == WSAEWOULDBLOCK)
2161 continue;
2162 #endif
2163 log_err("dnstap io stop: write: %s",
2164 sock_strerror(errno));
2165 break;
2166 }
2167 break;
2168 }
2169 dtio->started = 0;
2170 #endif /* THREADS_DISABLED */
2171
2172 #ifndef USE_WINSOCK
2173 close(dtio->commandpipe[1]);
2174 #else
2175 _close(dtio->commandpipe[1]);
2176 #endif
2177 dtio->commandpipe[1] = -1;
2178 #ifndef THREADS_DISABLED
2179 ub_thread_join(dtio->tid);
2180 #else
2181 dtio->want_to_exit = 1;
2182 dtio_desetup(dtio);
2183 #endif
2184 }
2185