xref: /openbsd/usr.sbin/unbound/dnstap/dtstream.c (revision 9c7f0a49)
1d7b4a113Ssthen /*
2d7b4a113Ssthen  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3d7b4a113Ssthen  *
4d7b4a113Ssthen  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5d7b4a113Ssthen  *
6d7b4a113Ssthen  * This software is open source.
7d7b4a113Ssthen  *
8d7b4a113Ssthen  * Redistribution and use in source and binary forms, with or without
9d7b4a113Ssthen  * modification, are permitted provided that the following conditions
10d7b4a113Ssthen  * are met:
11d7b4a113Ssthen  *
12d7b4a113Ssthen  * Redistributions of source code must retain the above copyright notice,
13d7b4a113Ssthen  * this list of conditions and the following disclaimer.
14d7b4a113Ssthen  *
15d7b4a113Ssthen  * Redistributions in binary form must reproduce the above copyright notice,
16d7b4a113Ssthen  * this list of conditions and the following disclaimer in the documentation
17d7b4a113Ssthen  * and/or other materials provided with the distribution.
18d7b4a113Ssthen  *
19d7b4a113Ssthen  * Neither the name of the NLNET LABS nor the names of its contributors may
20d7b4a113Ssthen  * be used to endorse or promote products derived from this software without
21d7b4a113Ssthen  * specific prior written permission.
22d7b4a113Ssthen  *
23d7b4a113Ssthen  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24d7b4a113Ssthen  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25d7b4a113Ssthen  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26d7b4a113Ssthen  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27d7b4a113Ssthen  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28d7b4a113Ssthen  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29d7b4a113Ssthen  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30d7b4a113Ssthen  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31d7b4a113Ssthen  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32d7b4a113Ssthen  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33d7b4a113Ssthen  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34d7b4a113Ssthen  *
35d7b4a113Ssthen  */
36d7b4a113Ssthen 
37d7b4a113Ssthen /**
38d7b4a113Ssthen  * \file
39d7b4a113Ssthen  *
40d7b4a113Ssthen  * An implementation of the Frame Streams data transport protocol for
41d7b4a113Ssthen  * the Unbound DNSTAP message logging facility.
42d7b4a113Ssthen  */
43d7b4a113Ssthen 
44d7b4a113Ssthen #include "config.h"
45d7b4a113Ssthen #include "dnstap/dtstream.h"
46d7b4a113Ssthen #include "dnstap/dnstap_fstrm.h"
47d7b4a113Ssthen #include "util/config_file.h"
48d7b4a113Ssthen #include "util/ub_event.h"
49d7b4a113Ssthen #include "util/net_help.h"
50d7b4a113Ssthen #include "services/outside_network.h"
51d7b4a113Ssthen #include "sldns/sbuffer.h"
52d7b4a113Ssthen #ifdef HAVE_SYS_UN_H
53d7b4a113Ssthen #include <sys/un.h>
54d7b4a113Ssthen #endif
55d7b4a113Ssthen #include <fcntl.h>
56d7b4a113Ssthen #ifdef HAVE_OPENSSL_SSL_H
57d7b4a113Ssthen #include <openssl/ssl.h>
58d7b4a113Ssthen #endif
59d7b4a113Ssthen #ifdef HAVE_OPENSSL_ERR_H
60d7b4a113Ssthen #include <openssl/err.h>
61d7b4a113Ssthen #endif
62d7b4a113Ssthen 
63d7b4a113Ssthen /** number of messages to process in one output callback */
64d7b4a113Ssthen #define DTIO_MESSAGES_PER_CALLBACK 100
65d7b4a113Ssthen /** the msec to wait for reconnect (if not immediate, the first attempt) */
66d7b4a113Ssthen #define DTIO_RECONNECT_TIMEOUT_MIN 10
67d7b4a113Ssthen /** the msec to wait for reconnect max after backoff */
68d7b4a113Ssthen #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69d7b4a113Ssthen /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70d7b4a113Ssthen #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71e2a0f313Ssthen /** number of messages before wakeup of thread */
72e2a0f313Ssthen #define DTIO_MSG_FOR_WAKEUP 32
73d7b4a113Ssthen 
74d7b4a113Ssthen /** maximum length of received frame */
75d7b4a113Ssthen #define DTIO_RECV_FRAME_MAX_LEN 1000
76d7b4a113Ssthen 
77d7b4a113Ssthen struct stop_flush_info;
78d7b4a113Ssthen /** DTIO command channel commands */
79d7b4a113Ssthen enum {
80d7b4a113Ssthen 	/** DTIO command channel stop */
81d7b4a113Ssthen 	DTIO_COMMAND_STOP = 0,
82d7b4a113Ssthen 	/** DTIO command channel wakeup */
83d7b4a113Ssthen 	DTIO_COMMAND_WAKEUP = 1
84d7b4a113Ssthen } dtio_channel_command;
85d7b4a113Ssthen 
86d7b4a113Ssthen /** open the output channel */
87d7b4a113Ssthen static void dtio_open_output(struct dt_io_thread* dtio);
88d7b4a113Ssthen /** add output event for read and write */
89d7b4a113Ssthen static int dtio_add_output_event_write(struct dt_io_thread* dtio);
90d7b4a113Ssthen /** start reconnection attempts */
91d7b4a113Ssthen static void dtio_reconnect_enable(struct dt_io_thread* dtio);
92d7b4a113Ssthen /** stop from stop_flush event loop */
93d7b4a113Ssthen static void dtio_stop_flush_exit(struct stop_flush_info* info);
94d7b4a113Ssthen /** setup a start control message */
95d7b4a113Ssthen static int dtio_control_start_send(struct dt_io_thread* dtio);
96d7b4a113Ssthen #ifdef HAVE_SSL
97d7b4a113Ssthen /** enable briefly waiting for a read event, for SSL negotiation */
98d7b4a113Ssthen static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99d7b4a113Ssthen /** enable briefly waiting for a write event, for SSL negotiation */
100d7b4a113Ssthen static int dtio_enable_brief_write(struct dt_io_thread* dtio);
101d7b4a113Ssthen #endif
102d7b4a113Ssthen 
103d7b4a113Ssthen struct dt_msg_queue*
dt_msg_queue_create(struct comm_base * base)104e2a0f313Ssthen dt_msg_queue_create(struct comm_base* base)
105d7b4a113Ssthen {
106d7b4a113Ssthen 	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107d7b4a113Ssthen 	if(!mq) return NULL;
108d7b4a113Ssthen 	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109d7b4a113Ssthen 		about 1 M should contain 64K messages with some overhead,
110d7b4a113Ssthen 		or a whole bunch smaller ones */
111e2a0f313Ssthen 	mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112e2a0f313Ssthen 	if(!mq->wakeup_timer) {
113e2a0f313Ssthen 		free(mq);
114e2a0f313Ssthen 		return NULL;
115e2a0f313Ssthen 	}
116d7b4a113Ssthen 	lock_basic_init(&mq->lock);
117d7b4a113Ssthen 	lock_protect(&mq->lock, mq, sizeof(*mq));
118d7b4a113Ssthen 	return mq;
119d7b4a113Ssthen }
120d7b4a113Ssthen 
121d7b4a113Ssthen /** clear the message list, caller must hold the lock */
122d7b4a113Ssthen static void
dt_msg_queue_clear(struct dt_msg_queue * mq)123d7b4a113Ssthen dt_msg_queue_clear(struct dt_msg_queue* mq)
124d7b4a113Ssthen {
125d7b4a113Ssthen 	struct dt_msg_entry* e = mq->first, *next=NULL;
126d7b4a113Ssthen 	while(e) {
127d7b4a113Ssthen 		next = e->next;
128d7b4a113Ssthen 		free(e->buf);
129d7b4a113Ssthen 		free(e);
130d7b4a113Ssthen 		e = next;
131d7b4a113Ssthen 	}
132d7b4a113Ssthen 	mq->first = NULL;
133d7b4a113Ssthen 	mq->last = NULL;
134d7b4a113Ssthen 	mq->cursize = 0;
135e2a0f313Ssthen 	mq->msgcount = 0;
136d7b4a113Ssthen }
137d7b4a113Ssthen 
138d7b4a113Ssthen void
dt_msg_queue_delete(struct dt_msg_queue * mq)139d7b4a113Ssthen dt_msg_queue_delete(struct dt_msg_queue* mq)
140d7b4a113Ssthen {
141d7b4a113Ssthen 	if(!mq) return;
142d7b4a113Ssthen 	lock_basic_destroy(&mq->lock);
143d7b4a113Ssthen 	dt_msg_queue_clear(mq);
144e2a0f313Ssthen 	comm_timer_delete(mq->wakeup_timer);
145d7b4a113Ssthen 	free(mq);
146d7b4a113Ssthen }
147d7b4a113Ssthen 
148d7b4a113Ssthen /** make the dtio wake up by sending a wakeup command */
dtio_wakeup(struct dt_io_thread * dtio)149d7b4a113Ssthen static void dtio_wakeup(struct dt_io_thread* dtio)
150d7b4a113Ssthen {
151d7b4a113Ssthen 	uint8_t cmd = DTIO_COMMAND_WAKEUP;
152d7b4a113Ssthen 	if(!dtio) return;
153d7b4a113Ssthen 	if(!dtio->started) return;
154d7b4a113Ssthen 
155d7b4a113Ssthen 	while(1) {
156d7b4a113Ssthen 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157d7b4a113Ssthen 		if(r == -1) {
158d7b4a113Ssthen #ifndef USE_WINSOCK
159d7b4a113Ssthen 			if(errno == EINTR || errno == EAGAIN)
160d7b4a113Ssthen 				continue;
161d7b4a113Ssthen #else
162d7b4a113Ssthen 			if(WSAGetLastError() == WSAEINPROGRESS)
163d7b4a113Ssthen 				continue;
164d7b4a113Ssthen 			if(WSAGetLastError() == WSAEWOULDBLOCK)
165d7b4a113Ssthen 				continue;
166d7b4a113Ssthen #endif
167e2a0f313Ssthen 			log_err("dnstap io wakeup: write: %s",
168e2a0f313Ssthen 				sock_strerror(errno));
169d7b4a113Ssthen 			break;
170d7b4a113Ssthen 		}
171d7b4a113Ssthen 		break;
172d7b4a113Ssthen 	}
173d7b4a113Ssthen }
174d7b4a113Ssthen 
175d7b4a113Ssthen void
mq_wakeup_cb(void * arg)176e2a0f313Ssthen mq_wakeup_cb(void* arg)
177e2a0f313Ssthen {
178e2a0f313Ssthen 	struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179e2a0f313Ssthen 	/* even if the dtio is already active, because perhaps much
180e2a0f313Ssthen 	 * traffic suddenly, we leave the timer running to save on
181e2a0f313Ssthen 	 * managing it, the once a second timer is less work then
182e2a0f313Ssthen 	 * starting and stopping the timer frequently */
183e2a0f313Ssthen 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184e2a0f313Ssthen 	mq->dtio->wakeup_timer_enabled = 0;
185e2a0f313Ssthen 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186e2a0f313Ssthen 	dtio_wakeup(mq->dtio);
187e2a0f313Ssthen }
188e2a0f313Ssthen 
189e2a0f313Ssthen /** start timer to wakeup dtio because there is content in the queue */
190e2a0f313Ssthen static void
dt_msg_queue_start_timer(struct dt_msg_queue * mq,int wakeupnow)19183152a15Ssthen dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
192e2a0f313Ssthen {
19383152a15Ssthen 	struct timeval tv = {0};
194e2a0f313Ssthen 	/* Start a timer to process messages to be logged.
195e2a0f313Ssthen 	 * If we woke up the dtio thread for every message, the wakeup
196e2a0f313Ssthen 	 * messages take up too much processing power.  If the queue
197e2a0f313Ssthen 	 * fills up the wakeup happens immediately.  The timer wakes it up
198e2a0f313Ssthen 	 * if there are infrequent messages to log. */
199e2a0f313Ssthen 
200e2a0f313Ssthen 	/* we cannot start a timer in dtio thread, because it is a different
201e2a0f313Ssthen 	 * thread and its event base is in use by the other thread, it would
202e2a0f313Ssthen 	 * give race conditions if we tried to modify its event base,
203e2a0f313Ssthen 	 * and locks would wait until it woke up, and this is what we do. */
204e2a0f313Ssthen 
205e2a0f313Ssthen 	/* do not start the timer if a timer already exists, perhaps
206e2a0f313Ssthen 	 * in another worker.  So this variable is protected by a lock in
20783152a15Ssthen 	 * dtio. */
20883152a15Ssthen 
20983152a15Ssthen 	/* If we need to wakeupnow, 0 the timer to force the callback. */
210e2a0f313Ssthen 	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
211e2a0f313Ssthen 	if(mq->dtio->wakeup_timer_enabled) {
21283152a15Ssthen 		if(wakeupnow) {
21383152a15Ssthen 			comm_timer_set(mq->wakeup_timer, &tv);
21483152a15Ssthen 		}
215e2a0f313Ssthen 		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
216e2a0f313Ssthen 		return;
217e2a0f313Ssthen 	}
218e2a0f313Ssthen 	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
219e2a0f313Ssthen 
220e2a0f313Ssthen 	/* start the timer, in mq, in the event base of our worker */
22183152a15Ssthen 	if(!wakeupnow) {
222e2a0f313Ssthen 		tv.tv_sec = 1;
223e2a0f313Ssthen 		tv.tv_usec = 0;
22483152a15Ssthen 	}
225e2a0f313Ssthen 	comm_timer_set(mq->wakeup_timer, &tv);
22683152a15Ssthen 	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
227e2a0f313Ssthen }
228e2a0f313Ssthen 
229e2a0f313Ssthen void
dt_msg_queue_submit(struct dt_msg_queue * mq,void * buf,size_t len)230d7b4a113Ssthen dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
231d7b4a113Ssthen {
232e2a0f313Ssthen 	int wakeupnow = 0, wakeupstarttimer = 0;
233d7b4a113Ssthen 	struct dt_msg_entry* entry;
234d7b4a113Ssthen 
235d7b4a113Ssthen 	/* check conditions */
236d7b4a113Ssthen 	if(!buf) return;
237d7b4a113Ssthen 	if(len == 0) {
238d7b4a113Ssthen 		/* it is not possible to log entries with zero length,
239d7b4a113Ssthen 		 * because the framestream protocol does not carry it.
240d7b4a113Ssthen 		 * However the protobuf serialization does not create zero
241d7b4a113Ssthen 		 * length datagrams for dnstap, so this should not happen. */
242d7b4a113Ssthen 		free(buf);
243d7b4a113Ssthen 		return;
244d7b4a113Ssthen 	}
245d7b4a113Ssthen 	if(!mq) {
246d7b4a113Ssthen 		free(buf);
247d7b4a113Ssthen 		return;
248d7b4a113Ssthen 	}
249d7b4a113Ssthen 
250d7b4a113Ssthen 	/* allocate memory for queue entry */
251d7b4a113Ssthen 	entry = malloc(sizeof(*entry));
252d7b4a113Ssthen 	if(!entry) {
253d7b4a113Ssthen 		log_err("out of memory logging dnstap");
254d7b4a113Ssthen 		free(buf);
255d7b4a113Ssthen 		return;
256d7b4a113Ssthen 	}
257d7b4a113Ssthen 	entry->next = NULL;
258d7b4a113Ssthen 	entry->buf = buf;
259d7b4a113Ssthen 	entry->len = len;
260d7b4a113Ssthen 
26183152a15Ssthen 	/* acquire lock */
262d7b4a113Ssthen 	lock_basic_lock(&mq->lock);
263e2a0f313Ssthen 	/* if list was empty, start timer for (eventual) wakeup */
264d7b4a113Ssthen 	if(mq->first == NULL)
265e2a0f313Ssthen 		wakeupstarttimer = 1;
266e2a0f313Ssthen 	/* if list contains more than wakeupnum elements, wakeup now,
267e2a0f313Ssthen 	 * or if list is (going to be) almost full */
268e2a0f313Ssthen 	if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
269e2a0f313Ssthen 		(mq->cursize < mq->maxsize * 9 / 10 &&
270e2a0f313Ssthen 		mq->cursize+len >= mq->maxsize * 9 / 10))
271e2a0f313Ssthen 		wakeupnow = 1;
272d7b4a113Ssthen 	/* see if it is going to fit */
273d7b4a113Ssthen 	if(mq->cursize + len > mq->maxsize) {
274d7b4a113Ssthen 		/* buffer full, or congested. */
275d7b4a113Ssthen 		/* drop */
276d7b4a113Ssthen 		lock_basic_unlock(&mq->lock);
277d7b4a113Ssthen 		free(buf);
278d7b4a113Ssthen 		free(entry);
279d7b4a113Ssthen 		return;
280d7b4a113Ssthen 	}
281d7b4a113Ssthen 	mq->cursize += len;
282e2a0f313Ssthen 	mq->msgcount ++;
283d7b4a113Ssthen 	/* append to list */
284d7b4a113Ssthen 	if(mq->last) {
285d7b4a113Ssthen 		mq->last->next = entry;
286d7b4a113Ssthen 	} else {
287d7b4a113Ssthen 		mq->first = entry;
288d7b4a113Ssthen 	}
289d7b4a113Ssthen 	mq->last = entry;
290d7b4a113Ssthen 	/* release lock */
291d7b4a113Ssthen 	lock_basic_unlock(&mq->lock);
292d7b4a113Ssthen 
29383152a15Ssthen 	if(wakeupnow || wakeupstarttimer) {
29483152a15Ssthen 		dt_msg_queue_start_timer(mq, wakeupnow);
295e2a0f313Ssthen 	}
296d7b4a113Ssthen }
297d7b4a113Ssthen 
dt_io_thread_create(void)298d7b4a113Ssthen struct dt_io_thread* dt_io_thread_create(void)
299d7b4a113Ssthen {
300d7b4a113Ssthen 	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
301e2a0f313Ssthen 	lock_basic_init(&dtio->wakeup_timer_lock);
302e2a0f313Ssthen 	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
303e2a0f313Ssthen 		sizeof(dtio->wakeup_timer_enabled));
304d7b4a113Ssthen 	return dtio;
305d7b4a113Ssthen }
306d7b4a113Ssthen 
dt_io_thread_delete(struct dt_io_thread * dtio)307d7b4a113Ssthen void dt_io_thread_delete(struct dt_io_thread* dtio)
308d7b4a113Ssthen {
309d7b4a113Ssthen 	struct dt_io_list_item* item, *nextitem;
310d7b4a113Ssthen 	if(!dtio) return;
311e2a0f313Ssthen 	lock_basic_destroy(&dtio->wakeup_timer_lock);
312d7b4a113Ssthen 	item=dtio->io_list;
313d7b4a113Ssthen 	while(item) {
314d7b4a113Ssthen 		nextitem = item->next;
315d7b4a113Ssthen 		free(item);
316d7b4a113Ssthen 		item = nextitem;
317d7b4a113Ssthen 	}
318d7b4a113Ssthen 	free(dtio->socket_path);
319d7b4a113Ssthen 	free(dtio->ip_str);
320d7b4a113Ssthen 	free(dtio->tls_server_name);
321d7b4a113Ssthen 	free(dtio->client_key_file);
322d7b4a113Ssthen 	free(dtio->client_cert_file);
323d7b4a113Ssthen 	if(dtio->ssl_ctx) {
324d7b4a113Ssthen #ifdef HAVE_SSL
325d7b4a113Ssthen 		SSL_CTX_free(dtio->ssl_ctx);
326d7b4a113Ssthen #endif
327d7b4a113Ssthen 	}
328d7b4a113Ssthen 	free(dtio);
329d7b4a113Ssthen }
330d7b4a113Ssthen 
dt_io_thread_apply_cfg(struct dt_io_thread * dtio,struct config_file * cfg)331d7b4a113Ssthen int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
332d7b4a113Ssthen {
333d7b4a113Ssthen 	if(!cfg->dnstap) {
334d7b4a113Ssthen 		log_warn("cannot setup dnstap because dnstap-enable is no");
335d7b4a113Ssthen 		return 0;
336d7b4a113Ssthen 	}
337d7b4a113Ssthen 
338d7b4a113Ssthen 	/* what type of connectivity do we have */
339d7b4a113Ssthen 	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
340d7b4a113Ssthen 		if(cfg->dnstap_tls)
341d7b4a113Ssthen 			dtio->upstream_is_tls = 1;
342d7b4a113Ssthen 		else	dtio->upstream_is_tcp = 1;
343d7b4a113Ssthen 	} else {
344d7b4a113Ssthen 		dtio->upstream_is_unix = 1;
345d7b4a113Ssthen 	}
346d7b4a113Ssthen 	dtio->is_bidirectional = cfg->dnstap_bidirectional;
347d7b4a113Ssthen 
348d7b4a113Ssthen 	if(dtio->upstream_is_unix) {
34972f58708Ssthen 		char* nm;
350d7b4a113Ssthen 		if(!cfg->dnstap_socket_path ||
351d7b4a113Ssthen 			cfg->dnstap_socket_path[0]==0) {
352d7b4a113Ssthen 			log_err("dnstap setup: no dnstap-socket-path for "
353d7b4a113Ssthen 				"socket connect");
354d7b4a113Ssthen 			return 0;
355d7b4a113Ssthen 		}
35672f58708Ssthen 		nm = cfg->dnstap_socket_path;
35772f58708Ssthen 		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
35872f58708Ssthen 			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
35972f58708Ssthen 			nm += strlen(cfg->chrootdir);
360d7b4a113Ssthen 		free(dtio->socket_path);
36172f58708Ssthen 		dtio->socket_path = strdup(nm);
362d7b4a113Ssthen 		if(!dtio->socket_path) {
363d7b4a113Ssthen 			log_err("dnstap setup: malloc failure");
364d7b4a113Ssthen 			return 0;
365d7b4a113Ssthen 		}
366d7b4a113Ssthen 	}
367d7b4a113Ssthen 
368d7b4a113Ssthen 	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
369d7b4a113Ssthen 		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
370d7b4a113Ssthen 			log_err("dnstap setup: no dnstap-ip for TCP connect");
371d7b4a113Ssthen 			return 0;
372d7b4a113Ssthen 		}
373d7b4a113Ssthen 		free(dtio->ip_str);
374d7b4a113Ssthen 		dtio->ip_str = strdup(cfg->dnstap_ip);
375d7b4a113Ssthen 		if(!dtio->ip_str) {
376d7b4a113Ssthen 			log_err("dnstap setup: malloc failure");
377d7b4a113Ssthen 			return 0;
378d7b4a113Ssthen 		}
379d7b4a113Ssthen 	}
380d7b4a113Ssthen 
381d7b4a113Ssthen 	if(dtio->upstream_is_tls) {
382d7b4a113Ssthen #ifdef HAVE_SSL
383d7b4a113Ssthen 		if(cfg->dnstap_tls_server_name &&
384d7b4a113Ssthen 			cfg->dnstap_tls_server_name[0]) {
385d7b4a113Ssthen 			free(dtio->tls_server_name);
386d7b4a113Ssthen 			dtio->tls_server_name = strdup(
387d7b4a113Ssthen 				cfg->dnstap_tls_server_name);
388d7b4a113Ssthen 			if(!dtio->tls_server_name) {
389d7b4a113Ssthen 				log_err("dnstap setup: malloc failure");
390d7b4a113Ssthen 				return 0;
391d7b4a113Ssthen 			}
392d7b4a113Ssthen 			if(!check_auth_name_for_ssl(dtio->tls_server_name))
393d7b4a113Ssthen 				return 0;
394d7b4a113Ssthen 		}
395d7b4a113Ssthen 		if(cfg->dnstap_tls_client_key_file &&
396d7b4a113Ssthen 			cfg->dnstap_tls_client_key_file[0]) {
397d7b4a113Ssthen 			dtio->use_client_certs = 1;
398d7b4a113Ssthen 			free(dtio->client_key_file);
399d7b4a113Ssthen 			dtio->client_key_file = strdup(
400d7b4a113Ssthen 				cfg->dnstap_tls_client_key_file);
401d7b4a113Ssthen 			if(!dtio->client_key_file) {
402d7b4a113Ssthen 				log_err("dnstap setup: malloc failure");
403d7b4a113Ssthen 				return 0;
404d7b4a113Ssthen 			}
405d7b4a113Ssthen 			if(!cfg->dnstap_tls_client_cert_file ||
406d7b4a113Ssthen 				cfg->dnstap_tls_client_cert_file[0]==0) {
407d7b4a113Ssthen 				log_err("dnstap setup: client key "
408d7b4a113Ssthen 					"authentication enabled with "
409d7b4a113Ssthen 					"dnstap-tls-client-key-file, but "
410d7b4a113Ssthen 					"no dnstap-tls-client-cert-file "
411d7b4a113Ssthen 					"is given");
412d7b4a113Ssthen 				return 0;
413d7b4a113Ssthen 			}
414d7b4a113Ssthen 			free(dtio->client_cert_file);
415d7b4a113Ssthen 			dtio->client_cert_file = strdup(
416d7b4a113Ssthen 				cfg->dnstap_tls_client_cert_file);
417d7b4a113Ssthen 			if(!dtio->client_cert_file) {
418d7b4a113Ssthen 				log_err("dnstap setup: malloc failure");
419d7b4a113Ssthen 				return 0;
420d7b4a113Ssthen 			}
421d7b4a113Ssthen 		} else {
422d7b4a113Ssthen 			dtio->use_client_certs = 0;
423d7b4a113Ssthen 			dtio->client_key_file = NULL;
424d7b4a113Ssthen 			dtio->client_cert_file = NULL;
425d7b4a113Ssthen 		}
426d7b4a113Ssthen 
427d7b4a113Ssthen 		if(cfg->dnstap_tls_cert_bundle) {
428d7b4a113Ssthen 			dtio->ssl_ctx = connect_sslctx_create(
429d7b4a113Ssthen 				dtio->client_key_file,
430d7b4a113Ssthen 				dtio->client_cert_file,
431d7b4a113Ssthen 				cfg->dnstap_tls_cert_bundle, 0);
432d7b4a113Ssthen 		} else {
433d7b4a113Ssthen 			dtio->ssl_ctx = connect_sslctx_create(
434d7b4a113Ssthen 				dtio->client_key_file,
435d7b4a113Ssthen 				dtio->client_cert_file,
436d7b4a113Ssthen 				cfg->tls_cert_bundle, cfg->tls_win_cert);
437d7b4a113Ssthen 		}
438d7b4a113Ssthen 		if(!dtio->ssl_ctx) {
439d7b4a113Ssthen 			log_err("could not setup SSL CTX");
440d7b4a113Ssthen 			return 0;
441d7b4a113Ssthen 		}
442d7b4a113Ssthen 		dtio->tls_use_sni = cfg->tls_use_sni;
443d7b4a113Ssthen #endif /* HAVE_SSL */
444d7b4a113Ssthen 	}
445d7b4a113Ssthen 	return 1;
446d7b4a113Ssthen }
447d7b4a113Ssthen 
dt_io_thread_register_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)448d7b4a113Ssthen int dt_io_thread_register_queue(struct dt_io_thread* dtio,
449d7b4a113Ssthen         struct dt_msg_queue* mq)
450d7b4a113Ssthen {
451d7b4a113Ssthen 	struct dt_io_list_item* item = malloc(sizeof(*item));
452d7b4a113Ssthen 	if(!item) return 0;
453d7b4a113Ssthen 	lock_basic_lock(&mq->lock);
454d7b4a113Ssthen 	mq->dtio = dtio;
455d7b4a113Ssthen 	lock_basic_unlock(&mq->lock);
456d7b4a113Ssthen 	item->queue = mq;
457d7b4a113Ssthen 	item->next = dtio->io_list;
458d7b4a113Ssthen 	dtio->io_list = item;
459d7b4a113Ssthen 	dtio->io_list_iter = NULL;
460d7b4a113Ssthen 	return 1;
461d7b4a113Ssthen }
462d7b4a113Ssthen 
dt_io_thread_unregister_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)463d7b4a113Ssthen void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
464d7b4a113Ssthen         struct dt_msg_queue* mq)
465d7b4a113Ssthen {
466d7b4a113Ssthen 	struct dt_io_list_item* item, *prev=NULL;
467d7b4a113Ssthen 	if(!dtio) return;
468d7b4a113Ssthen 	item = dtio->io_list;
469d7b4a113Ssthen 	while(item) {
470d7b4a113Ssthen 		if(item->queue == mq) {
471d7b4a113Ssthen 			/* found it */
472d7b4a113Ssthen 			if(prev) prev->next = item->next;
473d7b4a113Ssthen 			else dtio->io_list = item->next;
474d7b4a113Ssthen 			/* the queue itself only registered, not deleted */
475d7b4a113Ssthen 			lock_basic_lock(&item->queue->lock);
476d7b4a113Ssthen 			item->queue->dtio = NULL;
477d7b4a113Ssthen 			lock_basic_unlock(&item->queue->lock);
478d7b4a113Ssthen 			free(item);
479d7b4a113Ssthen 			dtio->io_list_iter = NULL;
480d7b4a113Ssthen 			return;
481d7b4a113Ssthen 		}
482d7b4a113Ssthen 		prev = item;
483d7b4a113Ssthen 		item = item->next;
484d7b4a113Ssthen 	}
485d7b4a113Ssthen }
486d7b4a113Ssthen 
487d7b4a113Ssthen /** pick a message from the queue, the routine locks and unlocks,
488d7b4a113Ssthen  * returns true if there is a message */
dt_msg_queue_pop(struct dt_msg_queue * mq,void ** buf,size_t * len)489d7b4a113Ssthen static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
490d7b4a113Ssthen 	size_t* len)
491d7b4a113Ssthen {
492d7b4a113Ssthen 	lock_basic_lock(&mq->lock);
493d7b4a113Ssthen 	if(mq->first) {
494d7b4a113Ssthen 		struct dt_msg_entry* entry = mq->first;
495d7b4a113Ssthen 		mq->first = entry->next;
496d7b4a113Ssthen 		if(!entry->next) mq->last = NULL;
497d7b4a113Ssthen 		mq->cursize -= entry->len;
498e2a0f313Ssthen 		mq->msgcount --;
499d7b4a113Ssthen 		lock_basic_unlock(&mq->lock);
500d7b4a113Ssthen 
501d7b4a113Ssthen 		*buf = entry->buf;
502d7b4a113Ssthen 		*len = entry->len;
503d7b4a113Ssthen 		free(entry);
504d7b4a113Ssthen 		return 1;
505d7b4a113Ssthen 	}
506d7b4a113Ssthen 	lock_basic_unlock(&mq->lock);
507d7b4a113Ssthen 	return 0;
508d7b4a113Ssthen }
509d7b4a113Ssthen 
510d7b4a113Ssthen /** find message in queue, false if no message, true if message to send */
dtio_find_in_queue(struct dt_io_thread * dtio,struct dt_msg_queue * mq)511d7b4a113Ssthen static int dtio_find_in_queue(struct dt_io_thread* dtio,
512d7b4a113Ssthen 	struct dt_msg_queue* mq)
513d7b4a113Ssthen {
514d7b4a113Ssthen 	void* buf=NULL;
515d7b4a113Ssthen 	size_t len=0;
516d7b4a113Ssthen 	if(dt_msg_queue_pop(mq, &buf, &len)) {
517d7b4a113Ssthen 		dtio->cur_msg = buf;
518d7b4a113Ssthen 		dtio->cur_msg_len = len;
519d7b4a113Ssthen 		dtio->cur_msg_done = 0;
520d7b4a113Ssthen 		dtio->cur_msg_len_done = 0;
521d7b4a113Ssthen 		return 1;
522d7b4a113Ssthen 	}
523d7b4a113Ssthen 	return 0;
524d7b4a113Ssthen }
525d7b4a113Ssthen 
526d7b4a113Ssthen /** find a new message to write, search message queues, false if none */
dtio_find_msg(struct dt_io_thread * dtio)527d7b4a113Ssthen static int dtio_find_msg(struct dt_io_thread* dtio)
528d7b4a113Ssthen {
529d7b4a113Ssthen 	struct dt_io_list_item *spot, *item;
530d7b4a113Ssthen 
531d7b4a113Ssthen 	spot = dtio->io_list_iter;
532d7b4a113Ssthen 	/* use the next queue for the next message lookup,
533d7b4a113Ssthen 	 * if we hit the end(NULL) the NULL restarts the iter at start. */
534d7b4a113Ssthen 	if(spot)
535d7b4a113Ssthen 		dtio->io_list_iter = spot->next;
536d7b4a113Ssthen 	else if(dtio->io_list)
537d7b4a113Ssthen 		dtio->io_list_iter = dtio->io_list->next;
538d7b4a113Ssthen 
539d7b4a113Ssthen 	/* scan from spot to end-of-io_list */
540d7b4a113Ssthen 	item = spot;
541d7b4a113Ssthen 	while(item) {
542d7b4a113Ssthen 		if(dtio_find_in_queue(dtio, item->queue))
543d7b4a113Ssthen 			return 1;
544d7b4a113Ssthen 		item = item->next;
545d7b4a113Ssthen 	}
546d7b4a113Ssthen 	/* scan starting at the start-of-list (to wrap around the end) */
547d7b4a113Ssthen 	item = dtio->io_list;
548d7b4a113Ssthen 	while(item) {
549d7b4a113Ssthen 		if(dtio_find_in_queue(dtio, item->queue))
550d7b4a113Ssthen 			return 1;
551d7b4a113Ssthen 		item = item->next;
552d7b4a113Ssthen 	}
553d7b4a113Ssthen 	return 0;
554d7b4a113Ssthen }
555d7b4a113Ssthen 
556d7b4a113Ssthen /** callback for the dnstap reconnect, to start reconnecting to output */
dtio_reconnect_timeout_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)557d7b4a113Ssthen void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
558d7b4a113Ssthen 	short ATTR_UNUSED(bits), void* arg)
559d7b4a113Ssthen {
560d7b4a113Ssthen 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
561d7b4a113Ssthen 	dtio->reconnect_is_added = 0;
562d7b4a113Ssthen 	verbose(VERB_ALGO, "dnstap io: reconnect timer");
563d7b4a113Ssthen 
564d7b4a113Ssthen 	dtio_open_output(dtio);
565d7b4a113Ssthen 	if(dtio->event) {
566d7b4a113Ssthen 		if(!dtio_add_output_event_write(dtio))
567d7b4a113Ssthen 			return;
568d7b4a113Ssthen 		/* nothing wrong so far, wait on the output event */
569d7b4a113Ssthen 		return;
570d7b4a113Ssthen 	}
571d7b4a113Ssthen 	/* exponential backoff and retry on timer */
572d7b4a113Ssthen 	dtio_reconnect_enable(dtio);
573d7b4a113Ssthen }
574d7b4a113Ssthen 
575d7b4a113Ssthen /** attempt to reconnect to the output, after a timeout */
dtio_reconnect_enable(struct dt_io_thread * dtio)576d7b4a113Ssthen static void dtio_reconnect_enable(struct dt_io_thread* dtio)
577d7b4a113Ssthen {
578d7b4a113Ssthen 	struct timeval tv;
579d7b4a113Ssthen 	int msec;
580d7b4a113Ssthen 	if(dtio->want_to_exit) return;
581d7b4a113Ssthen 	if(dtio->reconnect_is_added)
582d7b4a113Ssthen 		return; /* already done */
583d7b4a113Ssthen 
584d7b4a113Ssthen 	/* exponential backoff, store the value for next timeout */
585d7b4a113Ssthen 	msec = dtio->reconnect_timeout;
586d7b4a113Ssthen 	if(msec == 0) {
587d7b4a113Ssthen 		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
588d7b4a113Ssthen 	} else {
589d7b4a113Ssthen 		dtio->reconnect_timeout = msec*2;
590d7b4a113Ssthen 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
591d7b4a113Ssthen 			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
592d7b4a113Ssthen 	}
593d7b4a113Ssthen 	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
594d7b4a113Ssthen 		msec);
595d7b4a113Ssthen 
596d7b4a113Ssthen 	/* setup wait timer */
597d7b4a113Ssthen 	memset(&tv, 0, sizeof(tv));
598d7b4a113Ssthen 	tv.tv_sec = msec/1000;
599d7b4a113Ssthen 	tv.tv_usec = (msec%1000)*1000;
600d7b4a113Ssthen 	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
601d7b4a113Ssthen 		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
602d7b4a113Ssthen 		log_err("dnstap io: could not reconnect ev timer add");
603d7b4a113Ssthen 		return;
604d7b4a113Ssthen 	}
605d7b4a113Ssthen 	dtio->reconnect_is_added = 1;
606d7b4a113Ssthen }
607d7b4a113Ssthen 
608d7b4a113Ssthen /** remove dtio reconnect timer */
dtio_reconnect_del(struct dt_io_thread * dtio)609d7b4a113Ssthen static void dtio_reconnect_del(struct dt_io_thread* dtio)
610d7b4a113Ssthen {
611d7b4a113Ssthen 	if(!dtio->reconnect_is_added)
612d7b4a113Ssthen 		return;
613d7b4a113Ssthen 	ub_timer_del(dtio->reconnect_timer);
614d7b4a113Ssthen 	dtio->reconnect_is_added = 0;
615d7b4a113Ssthen }
616d7b4a113Ssthen 
617d7b4a113Ssthen /** clear the reconnect exponential backoff timer.
618d7b4a113Ssthen  * We have successfully connected so we can try again with short timeouts. */
dtio_reconnect_clear(struct dt_io_thread * dtio)619d7b4a113Ssthen static void dtio_reconnect_clear(struct dt_io_thread* dtio)
620d7b4a113Ssthen {
621d7b4a113Ssthen 	dtio->reconnect_timeout = 0;
622d7b4a113Ssthen 	dtio_reconnect_del(dtio);
623d7b4a113Ssthen }
624d7b4a113Ssthen 
625d7b4a113Ssthen /** reconnect slowly, because we already know we have to wait for a bit */
dtio_reconnect_slow(struct dt_io_thread * dtio,int msec)626d7b4a113Ssthen static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
627d7b4a113Ssthen {
628d7b4a113Ssthen 	dtio_reconnect_del(dtio);
629d7b4a113Ssthen 	dtio->reconnect_timeout = msec;
630d7b4a113Ssthen 	dtio_reconnect_enable(dtio);
631d7b4a113Ssthen }
632d7b4a113Ssthen 
633d7b4a113Ssthen /** delete the current message in the dtio, and reset counters */
dtio_cur_msg_free(struct dt_io_thread * dtio)634d7b4a113Ssthen static void dtio_cur_msg_free(struct dt_io_thread* dtio)
635d7b4a113Ssthen {
636d7b4a113Ssthen 	free(dtio->cur_msg);
637d7b4a113Ssthen 	dtio->cur_msg = NULL;
638d7b4a113Ssthen 	dtio->cur_msg_len = 0;
639d7b4a113Ssthen 	dtio->cur_msg_done = 0;
640d7b4a113Ssthen 	dtio->cur_msg_len_done = 0;
641d7b4a113Ssthen }
642d7b4a113Ssthen 
643d7b4a113Ssthen /** delete the buffer and counters used to read frame */
dtio_read_frame_free(struct dt_frame_read_buf * rb)644d7b4a113Ssthen static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
645d7b4a113Ssthen {
646d7b4a113Ssthen 	if(rb->buf) {
647d7b4a113Ssthen 		free(rb->buf);
648d7b4a113Ssthen 		rb->buf = NULL;
649d7b4a113Ssthen 	}
650d7b4a113Ssthen 	rb->buf_count = 0;
651d7b4a113Ssthen 	rb->buf_cap = 0;
652d7b4a113Ssthen 	rb->frame_len = 0;
653d7b4a113Ssthen 	rb->frame_len_done = 0;
654d7b4a113Ssthen 	rb->control_frame = 0;
655d7b4a113Ssthen }
656d7b4a113Ssthen 
657d7b4a113Ssthen /** del the output file descriptor event for listening */
dtio_del_output_event(struct dt_io_thread * dtio)658d7b4a113Ssthen static void dtio_del_output_event(struct dt_io_thread* dtio)
659d7b4a113Ssthen {
660d7b4a113Ssthen 	if(!dtio->event_added)
661d7b4a113Ssthen 		return;
662d7b4a113Ssthen 	ub_event_del(dtio->event);
663d7b4a113Ssthen 	dtio->event_added = 0;
664d7b4a113Ssthen 	dtio->event_added_is_write = 0;
665d7b4a113Ssthen }
666d7b4a113Ssthen 
667d7b4a113Ssthen /** close dtio socket and set it to -1 */
dtio_close_fd(struct dt_io_thread * dtio)668d7b4a113Ssthen static void dtio_close_fd(struct dt_io_thread* dtio)
669d7b4a113Ssthen {
670e2a0f313Ssthen 	sock_close(dtio->fd);
671d7b4a113Ssthen 	dtio->fd = -1;
672d7b4a113Ssthen }
673d7b4a113Ssthen 
674d7b4a113Ssthen /** close and stop the output file descriptor event */
dtio_close_output(struct dt_io_thread * dtio)675d7b4a113Ssthen static void dtio_close_output(struct dt_io_thread* dtio)
676d7b4a113Ssthen {
677d7b4a113Ssthen 	if(!dtio->event)
678d7b4a113Ssthen 		return;
679d7b4a113Ssthen 	ub_event_free(dtio->event);
680d7b4a113Ssthen 	dtio->event = NULL;
681d7b4a113Ssthen 	if(dtio->ssl) {
682d7b4a113Ssthen #ifdef HAVE_SSL
683d7b4a113Ssthen 		SSL_shutdown(dtio->ssl);
684d7b4a113Ssthen 		SSL_free(dtio->ssl);
685d7b4a113Ssthen 		dtio->ssl = NULL;
686d7b4a113Ssthen #endif
687d7b4a113Ssthen 	}
688d7b4a113Ssthen 	dtio_close_fd(dtio);
689d7b4a113Ssthen 
690d7b4a113Ssthen 	/* if there is a (partial) message, discard it
691d7b4a113Ssthen 	 * we cannot send (the remainder of) it, and a new
692d7b4a113Ssthen 	 * connection needs to start with a control frame. */
693d7b4a113Ssthen 	if(dtio->cur_msg) {
694d7b4a113Ssthen 		dtio_cur_msg_free(dtio);
695d7b4a113Ssthen 	}
696d7b4a113Ssthen 
697d7b4a113Ssthen 	dtio->ready_frame_sent = 0;
698d7b4a113Ssthen 	dtio->accept_frame_received = 0;
699d7b4a113Ssthen 	dtio_read_frame_free(&dtio->read_frame);
700d7b4a113Ssthen 
701d7b4a113Ssthen 	dtio_reconnect_enable(dtio);
702d7b4a113Ssthen }
703d7b4a113Ssthen 
704d7b4a113Ssthen /** check for pending nonblocking connect errors,
705d7b4a113Ssthen  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
dtio_check_nb_connect(struct dt_io_thread * dtio)706d7b4a113Ssthen static int dtio_check_nb_connect(struct dt_io_thread* dtio)
707d7b4a113Ssthen {
708d7b4a113Ssthen 	int error = 0;
709d7b4a113Ssthen 	socklen_t len = (socklen_t)sizeof(error);
710d7b4a113Ssthen 	if(!dtio->check_nb_connect)
711d7b4a113Ssthen 		return 1; /* everything okay */
712d7b4a113Ssthen 	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
713d7b4a113Ssthen 		&len) < 0) {
714d7b4a113Ssthen #ifndef USE_WINSOCK
715d7b4a113Ssthen 		error = errno; /* on solaris errno is error */
716d7b4a113Ssthen #else
717d7b4a113Ssthen 		error = WSAGetLastError();
718d7b4a113Ssthen #endif
719d7b4a113Ssthen 	}
720d7b4a113Ssthen #ifndef USE_WINSOCK
721d7b4a113Ssthen #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
722d7b4a113Ssthen 	if(error == EINPROGRESS || error == EWOULDBLOCK)
723d7b4a113Ssthen 		return 0; /* try again later */
724d7b4a113Ssthen #endif
725d7b4a113Ssthen #else
726d7b4a113Ssthen 	if(error == WSAEINPROGRESS) {
727d7b4a113Ssthen 		return 0; /* try again later */
728d7b4a113Ssthen 	} else if(error == WSAEWOULDBLOCK) {
729d7b4a113Ssthen 		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
730d7b4a113Ssthen 			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
731d7b4a113Ssthen 		return 0; /* try again later */
732d7b4a113Ssthen 	}
733d7b4a113Ssthen #endif
734d7b4a113Ssthen 	if(error != 0) {
735d7b4a113Ssthen 		char* to = dtio->socket_path;
736d7b4a113Ssthen 		if(!to) to = dtio->ip_str;
737d7b4a113Ssthen 		if(!to) to = "";
738d7b4a113Ssthen 		log_err("dnstap io: failed to connect to \"%s\": %s",
739e2a0f313Ssthen 			to, sock_strerror(error));
740d7b4a113Ssthen 		return -1; /* error, close it */
741d7b4a113Ssthen 	}
742d7b4a113Ssthen 
743d7b4a113Ssthen 	if(dtio->ip_str)
744d7b4a113Ssthen 		verbose(VERB_DETAIL, "dnstap io: connected to %s",
745d7b4a113Ssthen 			dtio->ip_str);
746d7b4a113Ssthen 	else if(dtio->socket_path)
747d7b4a113Ssthen 		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
748d7b4a113Ssthen 			dtio->socket_path);
749d7b4a113Ssthen 	dtio_reconnect_clear(dtio);
750d7b4a113Ssthen 	dtio->check_nb_connect = 0;
751d7b4a113Ssthen 	return 1; /* everything okay */
752d7b4a113Ssthen }
753d7b4a113Ssthen 
754d7b4a113Ssthen #ifdef HAVE_SSL
755d7b4a113Ssthen /** write to ssl output
756d7b4a113Ssthen  * returns number of bytes written, 0 if nothing happened,
757d7b4a113Ssthen  * try again later, or -1 if the channel is to be closed. */
dtio_write_ssl(struct dt_io_thread * dtio,uint8_t * buf,size_t len)758d7b4a113Ssthen static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
759d7b4a113Ssthen 	size_t len)
760d7b4a113Ssthen {
761d7b4a113Ssthen 	int r;
762d7b4a113Ssthen 	ERR_clear_error();
763d7b4a113Ssthen 	r = SSL_write(dtio->ssl, buf, len);
764d7b4a113Ssthen 	if(r <= 0) {
765d7b4a113Ssthen 		int want = SSL_get_error(dtio->ssl, r);
766d7b4a113Ssthen 		if(want == SSL_ERROR_ZERO_RETURN) {
767d7b4a113Ssthen 			/* closed */
768d7b4a113Ssthen 			return -1;
769d7b4a113Ssthen 		} else if(want == SSL_ERROR_WANT_READ) {
770d7b4a113Ssthen 			/* we want a brief read event */
771d7b4a113Ssthen 			dtio_enable_brief_read(dtio);
772d7b4a113Ssthen 			return 0;
773d7b4a113Ssthen 		} else if(want == SSL_ERROR_WANT_WRITE) {
774d7b4a113Ssthen 			/* write again later */
775d7b4a113Ssthen 			return 0;
776d7b4a113Ssthen 		} else if(want == SSL_ERROR_SYSCALL) {
777d7b4a113Ssthen #ifdef EPIPE
778d7b4a113Ssthen 			if(errno == EPIPE && verbosity < 2)
779d7b4a113Ssthen 				return -1; /* silence 'broken pipe' */
780d7b4a113Ssthen #endif
781d7b4a113Ssthen #ifdef ECONNRESET
782d7b4a113Ssthen 			if(errno == ECONNRESET && verbosity < 2)
783d7b4a113Ssthen 				return -1; /* silence reset by peer */
784d7b4a113Ssthen #endif
785d7b4a113Ssthen 			if(errno != 0) {
786d7b4a113Ssthen 				log_err("dnstap io, SSL_write syscall: %s",
787d7b4a113Ssthen 					strerror(errno));
788d7b4a113Ssthen 			}
789d7b4a113Ssthen 			return -1;
790d7b4a113Ssthen 		}
791*9c7f0a49Ssthen 		log_crypto_err_io("dnstap io, could not SSL_write", want);
792d7b4a113Ssthen 		return -1;
793d7b4a113Ssthen 	}
794d7b4a113Ssthen 	return r;
795d7b4a113Ssthen }
796d7b4a113Ssthen #endif /* HAVE_SSL */
797d7b4a113Ssthen 
798d7b4a113Ssthen /** write buffer to output.
799d7b4a113Ssthen  * returns number of bytes written, 0 if nothing happened,
800d7b4a113Ssthen  * try again later, or -1 if the channel is to be closed. */
dtio_write_buf(struct dt_io_thread * dtio,uint8_t * buf,size_t len)801d7b4a113Ssthen static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
802d7b4a113Ssthen 	size_t len)
803d7b4a113Ssthen {
804d7b4a113Ssthen 	ssize_t ret;
805d7b4a113Ssthen 	if(dtio->fd == -1)
806d7b4a113Ssthen 		return -1;
807d7b4a113Ssthen #ifdef HAVE_SSL
808d7b4a113Ssthen 	if(dtio->ssl)
809d7b4a113Ssthen 		return dtio_write_ssl(dtio, buf, len);
810d7b4a113Ssthen #endif
811d7b4a113Ssthen 	ret = send(dtio->fd, (void*)buf, len, 0);
812d7b4a113Ssthen 	if(ret == -1) {
813d7b4a113Ssthen #ifndef USE_WINSOCK
814d7b4a113Ssthen 		if(errno == EINTR || errno == EAGAIN)
815d7b4a113Ssthen 			return 0;
816d7b4a113Ssthen #else
817d7b4a113Ssthen 		if(WSAGetLastError() == WSAEINPROGRESS)
818d7b4a113Ssthen 			return 0;
819d7b4a113Ssthen 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
820d7b4a113Ssthen 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
821d7b4a113Ssthen 				dtio->stop_flush_event:dtio->event),
822d7b4a113Ssthen 				UB_EV_WRITE);
823d7b4a113Ssthen 			return 0;
824d7b4a113Ssthen 		}
825d7b4a113Ssthen #endif
826e2a0f313Ssthen 		log_err("dnstap io: failed send: %s", sock_strerror(errno));
827d7b4a113Ssthen 		return -1;
828d7b4a113Ssthen 	}
829d7b4a113Ssthen 	return ret;
830d7b4a113Ssthen }
831d7b4a113Ssthen 
832d7b4a113Ssthen #ifdef HAVE_WRITEV
833d7b4a113Ssthen /** write with writev, len and message, in one write, if possible.
834d7b4a113Ssthen  * return true if message is done, false if incomplete */
dtio_write_with_writev(struct dt_io_thread * dtio)835d7b4a113Ssthen static int dtio_write_with_writev(struct dt_io_thread* dtio)
836d7b4a113Ssthen {
837d7b4a113Ssthen 	uint32_t sendlen = htonl(dtio->cur_msg_len);
838d7b4a113Ssthen 	struct iovec iov[2];
839d7b4a113Ssthen 	ssize_t r;
840d7b4a113Ssthen 	iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
841d7b4a113Ssthen 	iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
842d7b4a113Ssthen 	iov[1].iov_base = dtio->cur_msg;
843d7b4a113Ssthen 	iov[1].iov_len = dtio->cur_msg_len;
844d7b4a113Ssthen 	log_assert(iov[0].iov_len > 0);
845d7b4a113Ssthen 	r = writev(dtio->fd, iov, 2);
846d7b4a113Ssthen 	if(r == -1) {
847d7b4a113Ssthen #ifndef USE_WINSOCK
848d7b4a113Ssthen 		if(errno == EINTR || errno == EAGAIN)
849d7b4a113Ssthen 			return 0;
850d7b4a113Ssthen #else
851d7b4a113Ssthen 		if(WSAGetLastError() == WSAEINPROGRESS)
852d7b4a113Ssthen 			return 0;
853d7b4a113Ssthen 		if(WSAGetLastError() == WSAEWOULDBLOCK) {
854d7b4a113Ssthen 			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
855d7b4a113Ssthen 				dtio->stop_flush_event:dtio->event),
856d7b4a113Ssthen 				UB_EV_WRITE);
857d7b4a113Ssthen 			return 0;
858d7b4a113Ssthen 		}
859d7b4a113Ssthen #endif
860e2a0f313Ssthen 		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
861d7b4a113Ssthen 		/* close the channel */
862d7b4a113Ssthen 		dtio_del_output_event(dtio);
863d7b4a113Ssthen 		dtio_close_output(dtio);
864d7b4a113Ssthen 		return 0;
865d7b4a113Ssthen 	}
866d7b4a113Ssthen 	/* written r bytes */
867d7b4a113Ssthen 	dtio->cur_msg_len_done += r;
868d7b4a113Ssthen 	if(dtio->cur_msg_len_done < 4)
869d7b4a113Ssthen 		return 0;
870d7b4a113Ssthen 	if(dtio->cur_msg_len_done > 4) {
871d7b4a113Ssthen 		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
872d7b4a113Ssthen 		dtio->cur_msg_len_done = 4;
873d7b4a113Ssthen 	}
874d7b4a113Ssthen 	if(dtio->cur_msg_done < dtio->cur_msg_len)
875d7b4a113Ssthen 		return 0;
876d7b4a113Ssthen 	return 1;
877d7b4a113Ssthen }
878d7b4a113Ssthen #endif /* HAVE_WRITEV */
879d7b4a113Ssthen 
880d7b4a113Ssthen /** write more of the length, preceding the data frame.
881d7b4a113Ssthen  * return true if message is done, false if incomplete. */
dtio_write_more_of_len(struct dt_io_thread * dtio)882d7b4a113Ssthen static int dtio_write_more_of_len(struct dt_io_thread* dtio)
883d7b4a113Ssthen {
884d7b4a113Ssthen 	uint32_t sendlen;
885d7b4a113Ssthen 	int r;
886d7b4a113Ssthen 	if(dtio->cur_msg_len_done >= 4)
887d7b4a113Ssthen 		return 1;
888d7b4a113Ssthen #ifdef HAVE_WRITEV
889d7b4a113Ssthen 	if(!dtio->ssl) {
890d7b4a113Ssthen 		/* we try writev for everything.*/
891d7b4a113Ssthen 		return dtio_write_with_writev(dtio);
892d7b4a113Ssthen 	}
893d7b4a113Ssthen #endif /* HAVE_WRITEV */
894d7b4a113Ssthen 	sendlen = htonl(dtio->cur_msg_len);
895d7b4a113Ssthen 	r = dtio_write_buf(dtio,
896d7b4a113Ssthen 		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
897d7b4a113Ssthen 		sizeof(sendlen)-dtio->cur_msg_len_done);
898d7b4a113Ssthen 	if(r == -1) {
899d7b4a113Ssthen 		/* close the channel */
900d7b4a113Ssthen 		dtio_del_output_event(dtio);
901d7b4a113Ssthen 		dtio_close_output(dtio);
902d7b4a113Ssthen 		return 0;
903d7b4a113Ssthen 	} else if(r == 0) {
904d7b4a113Ssthen 		/* try again later */
905d7b4a113Ssthen 		return 0;
906d7b4a113Ssthen 	}
907d7b4a113Ssthen 	dtio->cur_msg_len_done += r;
908d7b4a113Ssthen 	if(dtio->cur_msg_len_done < 4)
909d7b4a113Ssthen 		return 0;
910d7b4a113Ssthen 	return 1;
911d7b4a113Ssthen }
912d7b4a113Ssthen 
913d7b4a113Ssthen /** write more of the data frame.
914d7b4a113Ssthen  * return true if message is done, false if incomplete. */
dtio_write_more_of_data(struct dt_io_thread * dtio)915d7b4a113Ssthen static int dtio_write_more_of_data(struct dt_io_thread* dtio)
916d7b4a113Ssthen {
917d7b4a113Ssthen 	int r;
918d7b4a113Ssthen 	if(dtio->cur_msg_done >= dtio->cur_msg_len)
919d7b4a113Ssthen 		return 1;
920d7b4a113Ssthen 	r = dtio_write_buf(dtio,
921d7b4a113Ssthen 		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
922d7b4a113Ssthen 		dtio->cur_msg_len - dtio->cur_msg_done);
923d7b4a113Ssthen 	if(r == -1) {
924d7b4a113Ssthen 		/* close the channel */
925d7b4a113Ssthen 		dtio_del_output_event(dtio);
926d7b4a113Ssthen 		dtio_close_output(dtio);
927d7b4a113Ssthen 		return 0;
928d7b4a113Ssthen 	} else if(r == 0) {
929d7b4a113Ssthen 		/* try again later */
930d7b4a113Ssthen 		return 0;
931d7b4a113Ssthen 	}
932d7b4a113Ssthen 	dtio->cur_msg_done += r;
933d7b4a113Ssthen 	if(dtio->cur_msg_done < dtio->cur_msg_len)
934d7b4a113Ssthen 		return 0;
935d7b4a113Ssthen 	return 1;
936d7b4a113Ssthen }
937d7b4a113Ssthen 
93883152a15Ssthen /** write more of the current message. false if incomplete, true if
939d7b4a113Ssthen  * the message is done */
dtio_write_more(struct dt_io_thread * dtio)940d7b4a113Ssthen static int dtio_write_more(struct dt_io_thread* dtio)
941d7b4a113Ssthen {
942d7b4a113Ssthen 	if(dtio->cur_msg_len_done < 4) {
943d7b4a113Ssthen 		if(!dtio_write_more_of_len(dtio))
944d7b4a113Ssthen 			return 0;
945d7b4a113Ssthen 	}
946d7b4a113Ssthen 	if(dtio->cur_msg_done < dtio->cur_msg_len) {
947d7b4a113Ssthen 		if(!dtio_write_more_of_data(dtio))
948d7b4a113Ssthen 			return 0;
949d7b4a113Ssthen 	}
950d7b4a113Ssthen 	return 1;
951d7b4a113Ssthen }
952d7b4a113Ssthen 
953d7b4a113Ssthen /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
954d7b4a113Ssthen  * -1: continue, >0: number of bytes read into buffer */
receive_bytes(struct dt_io_thread * dtio,void * buf,size_t len)955d7b4a113Ssthen static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
956d7b4a113Ssthen 	ssize_t r;
9570e9b6f9fSsthen 	r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
958d7b4a113Ssthen 	if(r == -1) {
959d7b4a113Ssthen 		char* to = dtio->socket_path;
960d7b4a113Ssthen 		if(!to) to = dtio->ip_str;
961d7b4a113Ssthen 		if(!to) to = "";
962d7b4a113Ssthen #ifndef USE_WINSOCK
963d7b4a113Ssthen 		if(errno == EINTR || errno == EAGAIN)
964d7b4a113Ssthen 			return -1; /* try later */
965d7b4a113Ssthen #else
966d7b4a113Ssthen 		if(WSAGetLastError() == WSAEINPROGRESS) {
967d7b4a113Ssthen 			return -1; /* try later */
968d7b4a113Ssthen 		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
969d7b4a113Ssthen 			ub_winsock_tcp_wouldblock(
970d7b4a113Ssthen 				(dtio->stop_flush_event?
971d7b4a113Ssthen 				dtio->stop_flush_event:dtio->event),
972d7b4a113Ssthen 				UB_EV_READ);
973d7b4a113Ssthen 			return -1; /* try later */
974d7b4a113Ssthen 		}
975d7b4a113Ssthen #endif
976d7b4a113Ssthen 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
977d7b4a113Ssthen 			verbosity < 4)
978d7b4a113Ssthen 			return 0; /* no log retries on low verbosity */
979d7b4a113Ssthen 		log_err("dnstap io: output closed, recv %s: %s", to,
980d7b4a113Ssthen 			strerror(errno));
981d7b4a113Ssthen 		/* and close below */
982d7b4a113Ssthen 		return 0;
983d7b4a113Ssthen 	}
984d7b4a113Ssthen 	if(r == 0) {
985d7b4a113Ssthen 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
986d7b4a113Ssthen 			verbosity < 4)
987d7b4a113Ssthen 			return 0; /* no log retries on low verbosity */
988d7b4a113Ssthen 		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
989d7b4a113Ssthen 		/* and close below */
990d7b4a113Ssthen 		return 0;
991d7b4a113Ssthen 	}
992d7b4a113Ssthen 	/* something was received */
993d7b4a113Ssthen 	return r;
994d7b4a113Ssthen }
995d7b4a113Ssthen 
996d7b4a113Ssthen #ifdef HAVE_SSL
997d7b4a113Ssthen /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
998d7b4a113Ssthen  * -1: continue, >0: number of bytes read into buffer */
ssl_read_bytes(struct dt_io_thread * dtio,void * buf,size_t len)999d7b4a113Ssthen static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
1000d7b4a113Ssthen {
1001d7b4a113Ssthen 	int r;
1002d7b4a113Ssthen 	ERR_clear_error();
1003d7b4a113Ssthen 	r = SSL_read(dtio->ssl, buf, len);
1004d7b4a113Ssthen 	if(r <= 0) {
1005d7b4a113Ssthen 		int want = SSL_get_error(dtio->ssl, r);
1006d7b4a113Ssthen 		if(want == SSL_ERROR_ZERO_RETURN) {
1007d7b4a113Ssthen 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1008d7b4a113Ssthen 				verbosity < 4)
1009d7b4a113Ssthen 				return 0; /* no log retries on low verbosity */
1010d7b4a113Ssthen 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
1011d7b4a113Ssthen 				"other side");
1012d7b4a113Ssthen 			return 0;
1013d7b4a113Ssthen 		} else if(want == SSL_ERROR_WANT_READ) {
1014d7b4a113Ssthen 			/* continue later */
1015d7b4a113Ssthen 			return -1;
1016d7b4a113Ssthen 		} else if(want == SSL_ERROR_WANT_WRITE) {
1017d7b4a113Ssthen 			(void)dtio_enable_brief_write(dtio);
1018d7b4a113Ssthen 			return -1;
1019d7b4a113Ssthen 		} else if(want == SSL_ERROR_SYSCALL) {
1020d7b4a113Ssthen #ifdef ECONNRESET
1021d7b4a113Ssthen 			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1022d7b4a113Ssthen 				errno == ECONNRESET && verbosity < 4)
1023d7b4a113Ssthen 				return 0; /* silence reset by peer */
1024d7b4a113Ssthen #endif
1025d7b4a113Ssthen 			if(errno != 0)
1026d7b4a113Ssthen 				log_err("SSL_read syscall: %s",
1027d7b4a113Ssthen 					strerror(errno));
1028d7b4a113Ssthen 			verbose(VERB_DETAIL, "dnstap io: output closed by the "
1029d7b4a113Ssthen 				"other side");
1030d7b4a113Ssthen 			return 0;
1031d7b4a113Ssthen 		}
1032*9c7f0a49Ssthen 		log_crypto_err_io("could not SSL_read", want);
1033d7b4a113Ssthen 		verbose(VERB_DETAIL, "dnstap io: output closed by the "
1034d7b4a113Ssthen 				"other side");
1035d7b4a113Ssthen 		return 0;
1036d7b4a113Ssthen 	}
1037d7b4a113Ssthen 	return r;
1038d7b4a113Ssthen }
1039d7b4a113Ssthen #endif /* HAVE_SSL */
1040d7b4a113Ssthen 
1041d7b4a113Ssthen /** check if the output fd has been closed,
1042d7b4a113Ssthen  * it returns false if the stream is closed. */
dtio_check_close(struct dt_io_thread * dtio)1043d7b4a113Ssthen static int dtio_check_close(struct dt_io_thread* dtio)
1044d7b4a113Ssthen {
1045d7b4a113Ssthen 	/* we don't want to read any packets, but if there are we can
1046d7b4a113Ssthen 	 * discard the input (ignore it).  Ignore of unknown (control)
1047d7b4a113Ssthen 	 * packets is okay for the framestream protocol.  And also, the
1048d7b4a113Ssthen 	 * read call can return that the stream has been closed by the
1049d7b4a113Ssthen 	 * other side. */
1050d7b4a113Ssthen 	uint8_t buf[1024];
1051d7b4a113Ssthen 	int r = -1;
1052d7b4a113Ssthen 
1053d7b4a113Ssthen 
1054d7b4a113Ssthen 	if(dtio->fd == -1) return 0;
1055d7b4a113Ssthen 
1056d7b4a113Ssthen 	while(r != 0) {
1057d7b4a113Ssthen 		/* not interested in buffer content, overwrite */
1058d7b4a113Ssthen 		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1059d7b4a113Ssthen 		if(r == -1)
1060d7b4a113Ssthen 			return 1;
1061d7b4a113Ssthen 	}
1062d7b4a113Ssthen 	/* the other end has been closed */
1063d7b4a113Ssthen 	/* close the channel */
1064d7b4a113Ssthen 	dtio_del_output_event(dtio);
1065d7b4a113Ssthen 	dtio_close_output(dtio);
1066d7b4a113Ssthen 	return 0;
1067d7b4a113Ssthen }
1068d7b4a113Ssthen 
1069d7b4a113Ssthen /** Read accept frame. Returns -1: continue reading, 0: closed,
1070d7b4a113Ssthen  * 1: valid accept received. */
dtio_read_accept_frame(struct dt_io_thread * dtio)1071d7b4a113Ssthen static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1072d7b4a113Ssthen {
1073d7b4a113Ssthen 	int r;
1074d7b4a113Ssthen 	size_t read_frame_done;
1075d7b4a113Ssthen 	while(dtio->read_frame.frame_len_done < 4) {
1076d7b4a113Ssthen #ifdef HAVE_SSL
1077d7b4a113Ssthen 		if(dtio->ssl) {
1078d7b4a113Ssthen 			r = ssl_read_bytes(dtio,
1079d7b4a113Ssthen 				(uint8_t*)&dtio->read_frame.frame_len+
1080d7b4a113Ssthen 				dtio->read_frame.frame_len_done,
1081d7b4a113Ssthen 				4-dtio->read_frame.frame_len_done);
1082d7b4a113Ssthen 		} else {
1083d7b4a113Ssthen #endif
1084d7b4a113Ssthen 			r = receive_bytes(dtio,
1085d7b4a113Ssthen 				(uint8_t*)&dtio->read_frame.frame_len+
1086d7b4a113Ssthen 				dtio->read_frame.frame_len_done,
1087d7b4a113Ssthen 				4-dtio->read_frame.frame_len_done);
1088d7b4a113Ssthen #ifdef HAVE_SSL
1089d7b4a113Ssthen 		}
1090d7b4a113Ssthen #endif
1091d7b4a113Ssthen 		if(r == -1)
1092d7b4a113Ssthen 			return -1; /* continue reading */
1093d7b4a113Ssthen 		if(r == 0) {
1094d7b4a113Ssthen 			 /* connection closed */
1095d7b4a113Ssthen 			goto close_connection;
1096d7b4a113Ssthen 		}
1097d7b4a113Ssthen 		dtio->read_frame.frame_len_done += r;
1098d7b4a113Ssthen 		if(dtio->read_frame.frame_len_done < 4)
1099d7b4a113Ssthen 			return -1; /* continue reading */
1100d7b4a113Ssthen 
1101d7b4a113Ssthen 		if(dtio->read_frame.frame_len == 0) {
1102d7b4a113Ssthen 			dtio->read_frame.frame_len_done = 0;
1103d7b4a113Ssthen 			dtio->read_frame.control_frame = 1;
1104d7b4a113Ssthen 			continue;
1105d7b4a113Ssthen 		}
1106d7b4a113Ssthen 		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1107d7b4a113Ssthen 		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1108d7b4a113Ssthen 			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1109d7b4a113Ssthen 				"length of %d bytes, closing connection",
1110d7b4a113Ssthen 				DTIO_RECV_FRAME_MAX_LEN);
1111d7b4a113Ssthen 			goto close_connection;
1112d7b4a113Ssthen 		}
1113d7b4a113Ssthen 		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1114d7b4a113Ssthen 		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1115d7b4a113Ssthen 		if(!dtio->read_frame.buf) {
1116d7b4a113Ssthen 			log_err("dnstap io: out of memory (creating read "
1117d7b4a113Ssthen 				"buffer)");
1118d7b4a113Ssthen 			goto close_connection;
1119d7b4a113Ssthen 		}
1120d7b4a113Ssthen 	}
1121d7b4a113Ssthen 	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1122d7b4a113Ssthen #ifdef HAVE_SSL
1123d7b4a113Ssthen 		if(dtio->ssl) {
1124d7b4a113Ssthen 			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1125d7b4a113Ssthen 				dtio->read_frame.buf_count,
1126d7b4a113Ssthen 				dtio->read_frame.buf_cap-
1127d7b4a113Ssthen 				dtio->read_frame.buf_count);
1128d7b4a113Ssthen 		} else {
1129d7b4a113Ssthen #endif
1130d7b4a113Ssthen 			r = receive_bytes(dtio, dtio->read_frame.buf+
1131d7b4a113Ssthen 				dtio->read_frame.buf_count,
1132d7b4a113Ssthen 				dtio->read_frame.buf_cap-
1133d7b4a113Ssthen 				dtio->read_frame.buf_count);
1134d7b4a113Ssthen #ifdef HAVE_SSL
1135d7b4a113Ssthen 		}
1136d7b4a113Ssthen #endif
1137d7b4a113Ssthen 		if(r == -1)
1138d7b4a113Ssthen 			return -1; /* continue reading */
1139d7b4a113Ssthen 		if(r == 0) {
1140d7b4a113Ssthen 			 /* connection closed */
1141d7b4a113Ssthen 			goto close_connection;
1142d7b4a113Ssthen 		}
1143d7b4a113Ssthen 		dtio->read_frame.buf_count += r;
1144d7b4a113Ssthen 		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1145d7b4a113Ssthen 			return -1; /* continue reading */
1146d7b4a113Ssthen 	}
1147d7b4a113Ssthen 
1148d7b4a113Ssthen 	/* Complete frame received, check if this is a valid ACCEPT control
1149d7b4a113Ssthen 	 * frame. */
1150d7b4a113Ssthen 	if(dtio->read_frame.frame_len < 4) {
1151d7b4a113Ssthen 		verbose(VERB_OPS, "dnstap: invalid data received");
1152d7b4a113Ssthen 		goto close_connection;
1153d7b4a113Ssthen 	}
1154d7b4a113Ssthen 	if(sldns_read_uint32(dtio->read_frame.buf) !=
1155d7b4a113Ssthen 		FSTRM_CONTROL_FRAME_ACCEPT) {
1156d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1157d7b4a113Ssthen 			"ignored");
1158d7b4a113Ssthen 		dtio->ready_frame_sent = 0;
1159d7b4a113Ssthen 		dtio->accept_frame_received = 0;
1160d7b4a113Ssthen 		dtio_read_frame_free(&dtio->read_frame);
1161d7b4a113Ssthen 		return -1;
1162d7b4a113Ssthen 	}
1163d7b4a113Ssthen 	read_frame_done = 4; /* control frame type */
1164d7b4a113Ssthen 
1165d7b4a113Ssthen 	/* Iterate over control fields, ignore unknown types.
1166d7b4a113Ssthen 	 * Need to be able to read at least 8 bytes (control field type +
1167d7b4a113Ssthen 	 * length). */
1168d7b4a113Ssthen 	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1169d7b4a113Ssthen 		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1170d7b4a113Ssthen 			read_frame_done);
1171d7b4a113Ssthen 		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1172d7b4a113Ssthen 			read_frame_done + 4);
1173d7b4a113Ssthen 		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1174d7b4a113Ssthen 			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1175d7b4a113Ssthen 				read_frame_done+8+len <=
1176d7b4a113Ssthen 				dtio->read_frame.frame_len &&
1177d7b4a113Ssthen 				memcmp(dtio->read_frame.buf + read_frame_done +
1178d7b4a113Ssthen 					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1179d7b4a113Ssthen 				if(!dtio_control_start_send(dtio)) {
1180d7b4a113Ssthen 					verbose(VERB_OPS, "dnstap io: out of "
1181d7b4a113Ssthen 					 "memory while sending START frame");
1182d7b4a113Ssthen 					goto close_connection;
1183d7b4a113Ssthen 				}
1184d7b4a113Ssthen 				dtio->accept_frame_received = 1;
1185e2a0f313Ssthen 				if(!dtio_add_output_event_write(dtio))
1186e2a0f313Ssthen 					goto close_connection;
1187d7b4a113Ssthen 				return 1;
1188d7b4a113Ssthen 			} else {
118983152a15Ssthen 				/* unknown content type */
1190d7b4a113Ssthen 				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1191d7b4a113Ssthen 					"contains unknown content type, "
1192d7b4a113Ssthen 					"closing connection");
1193d7b4a113Ssthen 				goto close_connection;
1194d7b4a113Ssthen 			}
1195d7b4a113Ssthen 		}
1196d7b4a113Ssthen 		/* unknown option, try next */
1197d7b4a113Ssthen 		read_frame_done += 8+len;
1198d7b4a113Ssthen 	}
1199d7b4a113Ssthen 
1200d7b4a113Ssthen 
1201d7b4a113Ssthen close_connection:
1202d7b4a113Ssthen 	dtio_del_output_event(dtio);
1203d7b4a113Ssthen 	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1204d7b4a113Ssthen 	dtio_close_output(dtio);
1205d7b4a113Ssthen 	return 0;
1206d7b4a113Ssthen }
1207d7b4a113Ssthen 
1208d7b4a113Ssthen /** add the output file descriptor event for listening, read only */
dtio_add_output_event_read(struct dt_io_thread * dtio)1209d7b4a113Ssthen static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1210d7b4a113Ssthen {
1211d7b4a113Ssthen 	if(!dtio->event)
1212d7b4a113Ssthen 		return 0;
1213d7b4a113Ssthen 	if(dtio->event_added && !dtio->event_added_is_write)
1214d7b4a113Ssthen 		return 1;
1215d7b4a113Ssthen 	/* we have to (re-)register the event */
1216d7b4a113Ssthen 	if(dtio->event_added)
1217d7b4a113Ssthen 		ub_event_del(dtio->event);
1218d7b4a113Ssthen 	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1219d7b4a113Ssthen 	if(ub_event_add(dtio->event, NULL) != 0) {
1220d7b4a113Ssthen 		log_err("dnstap io: out of memory (adding event)");
1221d7b4a113Ssthen 		dtio->event_added = 0;
1222d7b4a113Ssthen 		dtio->event_added_is_write = 0;
1223d7b4a113Ssthen 		/* close output and start reattempts to open it */
1224d7b4a113Ssthen 		dtio_close_output(dtio);
1225d7b4a113Ssthen 		return 0;
1226d7b4a113Ssthen 	}
1227d7b4a113Ssthen 	dtio->event_added = 1;
1228d7b4a113Ssthen 	dtio->event_added_is_write = 0;
1229d7b4a113Ssthen 	return 1;
1230d7b4a113Ssthen }
1231d7b4a113Ssthen 
1232d7b4a113Ssthen /** add the output file descriptor event for listening, read and write */
dtio_add_output_event_write(struct dt_io_thread * dtio)1233d7b4a113Ssthen static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1234d7b4a113Ssthen {
1235d7b4a113Ssthen 	if(!dtio->event)
1236d7b4a113Ssthen 		return 0;
1237d7b4a113Ssthen 	if(dtio->event_added && dtio->event_added_is_write)
1238d7b4a113Ssthen 		return 1;
1239d7b4a113Ssthen 	/* we have to (re-)register the event */
1240d7b4a113Ssthen 	if(dtio->event_added)
1241d7b4a113Ssthen 		ub_event_del(dtio->event);
1242d7b4a113Ssthen 	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1243d7b4a113Ssthen 	if(ub_event_add(dtio->event, NULL) != 0) {
1244d7b4a113Ssthen 		log_err("dnstap io: out of memory (adding event)");
1245d7b4a113Ssthen 		dtio->event_added = 0;
1246d7b4a113Ssthen 		dtio->event_added_is_write = 0;
1247d7b4a113Ssthen 		/* close output and start reattempts to open it */
1248d7b4a113Ssthen 		dtio_close_output(dtio);
1249d7b4a113Ssthen 		return 0;
1250d7b4a113Ssthen 	}
1251d7b4a113Ssthen 	dtio->event_added = 1;
1252d7b4a113Ssthen 	dtio->event_added_is_write = 1;
1253d7b4a113Ssthen 	return 1;
1254d7b4a113Ssthen }
1255d7b4a113Ssthen 
1256d7b4a113Ssthen /** put the dtio thread to sleep */
dtio_sleep(struct dt_io_thread * dtio)1257d7b4a113Ssthen static void dtio_sleep(struct dt_io_thread* dtio)
1258d7b4a113Ssthen {
1259d7b4a113Ssthen 	/* unregister the event polling for write, because there is
1260d7b4a113Ssthen 	 * nothing to be written */
1261d7b4a113Ssthen 	(void)dtio_add_output_event_read(dtio);
1262d7b4a113Ssthen }
1263d7b4a113Ssthen 
1264d7b4a113Ssthen #ifdef HAVE_SSL
1265d7b4a113Ssthen /** enable the brief read condition */
dtio_enable_brief_read(struct dt_io_thread * dtio)1266d7b4a113Ssthen static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1267d7b4a113Ssthen {
1268d7b4a113Ssthen 	dtio->ssl_brief_read = 1;
1269d7b4a113Ssthen 	if(dtio->stop_flush_event) {
1270d7b4a113Ssthen 		ub_event_del(dtio->stop_flush_event);
1271d7b4a113Ssthen 		ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1272d7b4a113Ssthen 		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1273d7b4a113Ssthen 			log_err("dnstap io, stop flush, could not ub_event_add");
1274d7b4a113Ssthen 			return 0;
1275d7b4a113Ssthen 		}
1276d7b4a113Ssthen 		return 1;
1277d7b4a113Ssthen 	}
1278d7b4a113Ssthen 	return dtio_add_output_event_read(dtio);
1279d7b4a113Ssthen }
1280d7b4a113Ssthen #endif /* HAVE_SSL */
1281d7b4a113Ssthen 
1282d7b4a113Ssthen #ifdef HAVE_SSL
1283d7b4a113Ssthen /** disable the brief read condition */
dtio_disable_brief_read(struct dt_io_thread * dtio)1284d7b4a113Ssthen static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1285d7b4a113Ssthen {
1286d7b4a113Ssthen 	dtio->ssl_brief_read = 0;
1287d7b4a113Ssthen 	if(dtio->stop_flush_event) {
1288d7b4a113Ssthen 		ub_event_del(dtio->stop_flush_event);
1289d7b4a113Ssthen 		ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1290d7b4a113Ssthen 		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1291d7b4a113Ssthen 			log_err("dnstap io, stop flush, could not ub_event_add");
1292d7b4a113Ssthen 			return 0;
1293d7b4a113Ssthen 		}
1294d7b4a113Ssthen 		return 1;
1295d7b4a113Ssthen 	}
1296d7b4a113Ssthen 	return dtio_add_output_event_write(dtio);
1297d7b4a113Ssthen }
1298d7b4a113Ssthen #endif /* HAVE_SSL */
1299d7b4a113Ssthen 
1300d7b4a113Ssthen #ifdef HAVE_SSL
1301d7b4a113Ssthen /** enable the brief write condition */
dtio_enable_brief_write(struct dt_io_thread * dtio)1302d7b4a113Ssthen static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1303d7b4a113Ssthen {
1304d7b4a113Ssthen 	dtio->ssl_brief_write = 1;
1305d7b4a113Ssthen 	return dtio_add_output_event_write(dtio);
1306d7b4a113Ssthen }
1307d7b4a113Ssthen #endif /* HAVE_SSL */
1308d7b4a113Ssthen 
1309d7b4a113Ssthen #ifdef HAVE_SSL
1310d7b4a113Ssthen /** disable the brief write condition */
dtio_disable_brief_write(struct dt_io_thread * dtio)1311d7b4a113Ssthen static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1312d7b4a113Ssthen {
1313d7b4a113Ssthen 	dtio->ssl_brief_write = 0;
1314d7b4a113Ssthen 	return dtio_add_output_event_read(dtio);
1315d7b4a113Ssthen }
1316d7b4a113Ssthen #endif /* HAVE_SSL */
1317d7b4a113Ssthen 
1318d7b4a113Ssthen #ifdef HAVE_SSL
1319d7b4a113Ssthen /** check peer verification after ssl handshake connection, false if closed*/
dtio_ssl_check_peer(struct dt_io_thread * dtio)1320d7b4a113Ssthen static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1321d7b4a113Ssthen {
1322d7b4a113Ssthen 	if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1323d7b4a113Ssthen 		/* verification */
1324d7b4a113Ssthen 		if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1325d7b4a113Ssthen 			X509* x = SSL_get_peer_certificate(dtio->ssl);
1326d7b4a113Ssthen 			if(!x) {
1327d7b4a113Ssthen 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1328d7b4a113Ssthen 					"connection failed no certificate",
1329d7b4a113Ssthen 					dtio->ip_str);
1330d7b4a113Ssthen 				return 0;
1331d7b4a113Ssthen 			}
1332d7b4a113Ssthen 			log_cert(VERB_ALGO, "dnstap io, peer certificate",
1333d7b4a113Ssthen 				x);
1334d7b4a113Ssthen #ifdef HAVE_SSL_GET0_PEERNAME
1335d7b4a113Ssthen 			if(SSL_get0_peername(dtio->ssl)) {
1336d7b4a113Ssthen 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1337d7b4a113Ssthen 					"connection to %s authenticated",
1338d7b4a113Ssthen 					dtio->ip_str,
1339d7b4a113Ssthen 					SSL_get0_peername(dtio->ssl));
1340d7b4a113Ssthen 			} else {
1341d7b4a113Ssthen #endif
1342d7b4a113Ssthen 				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1343d7b4a113Ssthen 					"connection authenticated",
1344d7b4a113Ssthen 					dtio->ip_str);
1345d7b4a113Ssthen #ifdef HAVE_SSL_GET0_PEERNAME
1346d7b4a113Ssthen 			}
1347d7b4a113Ssthen #endif
1348d7b4a113Ssthen 			X509_free(x);
1349d7b4a113Ssthen 		} else {
1350d7b4a113Ssthen 			X509* x = SSL_get_peer_certificate(dtio->ssl);
1351d7b4a113Ssthen 			if(x) {
1352d7b4a113Ssthen 				log_cert(VERB_ALGO, "dnstap io, peer "
1353d7b4a113Ssthen 					"certificate", x);
1354d7b4a113Ssthen 				X509_free(x);
1355d7b4a113Ssthen 			}
1356d7b4a113Ssthen 			verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1357d7b4a113Ssthen 				"failed: failed to authenticate",
1358d7b4a113Ssthen 				dtio->ip_str);
1359d7b4a113Ssthen 			return 0;
1360d7b4a113Ssthen 		}
1361d7b4a113Ssthen 	} else {
1362d7b4a113Ssthen 		/* unauthenticated, the verify peer flag was not set
1363d7b4a113Ssthen 		 * in ssl when the ssl object was created from ssl_ctx */
1364d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1365d7b4a113Ssthen 			dtio->ip_str);
1366d7b4a113Ssthen 	}
1367d7b4a113Ssthen 	return 1;
1368d7b4a113Ssthen }
1369d7b4a113Ssthen #endif /* HAVE_SSL */
1370d7b4a113Ssthen 
1371d7b4a113Ssthen #ifdef HAVE_SSL
1372d7b4a113Ssthen /** perform ssl handshake, returns 1 if okay, 0 to stop */
dtio_ssl_handshake(struct dt_io_thread * dtio,struct stop_flush_info * info)1373d7b4a113Ssthen static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1374d7b4a113Ssthen 	struct stop_flush_info* info)
1375d7b4a113Ssthen {
1376d7b4a113Ssthen 	int r;
1377d7b4a113Ssthen 	if(dtio->ssl_brief_read) {
1378d7b4a113Ssthen 		/* assume the brief read condition is satisfied,
1379d7b4a113Ssthen 		 * if we need more or again, we can set it again */
1380d7b4a113Ssthen 		if(!dtio_disable_brief_read(dtio)) {
1381d7b4a113Ssthen 			if(info) dtio_stop_flush_exit(info);
1382d7b4a113Ssthen 			return 0;
1383d7b4a113Ssthen 		}
1384d7b4a113Ssthen 	}
1385d7b4a113Ssthen 	if(dtio->ssl_handshake_done)
1386d7b4a113Ssthen 		return 1;
1387d7b4a113Ssthen 
1388d7b4a113Ssthen 	ERR_clear_error();
1389d7b4a113Ssthen 	r = SSL_do_handshake(dtio->ssl);
1390d7b4a113Ssthen 	if(r != 1) {
1391d7b4a113Ssthen 		int want = SSL_get_error(dtio->ssl, r);
1392d7b4a113Ssthen 		if(want == SSL_ERROR_WANT_READ) {
1393d7b4a113Ssthen 			/* we want to read on the connection */
1394d7b4a113Ssthen 			if(!dtio_enable_brief_read(dtio)) {
1395d7b4a113Ssthen 				if(info) dtio_stop_flush_exit(info);
1396d7b4a113Ssthen 				return 0;
1397d7b4a113Ssthen 			}
1398d7b4a113Ssthen 			return 0;
1399d7b4a113Ssthen 		} else if(want == SSL_ERROR_WANT_WRITE) {
1400d7b4a113Ssthen 			/* we want to write on the connection */
1401d7b4a113Ssthen 			return 0;
1402d7b4a113Ssthen 		} else if(r == 0) {
1403d7b4a113Ssthen 			/* closed */
1404d7b4a113Ssthen 			if(info) dtio_stop_flush_exit(info);
1405d7b4a113Ssthen 			dtio_del_output_event(dtio);
1406d7b4a113Ssthen 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1407d7b4a113Ssthen 			dtio_close_output(dtio);
1408d7b4a113Ssthen 			return 0;
1409d7b4a113Ssthen 		} else if(want == SSL_ERROR_SYSCALL) {
1410d7b4a113Ssthen 			/* SYSCALL and errno==0 means closed uncleanly */
1411d7b4a113Ssthen 			int silent = 0;
1412d7b4a113Ssthen #ifdef EPIPE
1413d7b4a113Ssthen 			if(errno == EPIPE && verbosity < 2)
1414d7b4a113Ssthen 				silent = 1; /* silence 'broken pipe' */
1415d7b4a113Ssthen #endif
1416d7b4a113Ssthen #ifdef ECONNRESET
1417d7b4a113Ssthen 			if(errno == ECONNRESET && verbosity < 2)
1418d7b4a113Ssthen 				silent = 1; /* silence reset by peer */
1419d7b4a113Ssthen #endif
1420d7b4a113Ssthen 			if(errno == 0)
1421d7b4a113Ssthen 				silent = 1;
1422d7b4a113Ssthen 			if(!silent)
1423d7b4a113Ssthen 				log_err("dnstap io, SSL_handshake syscall: %s",
1424d7b4a113Ssthen 					strerror(errno));
1425d7b4a113Ssthen 			/* closed */
1426d7b4a113Ssthen 			if(info) dtio_stop_flush_exit(info);
1427d7b4a113Ssthen 			dtio_del_output_event(dtio);
1428d7b4a113Ssthen 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1429d7b4a113Ssthen 			dtio_close_output(dtio);
1430d7b4a113Ssthen 			return 0;
1431d7b4a113Ssthen 		} else {
1432d7b4a113Ssthen 			unsigned long err = ERR_get_error();
1433d7b4a113Ssthen 			if(!squelch_err_ssl_handshake(err)) {
1434*9c7f0a49Ssthen 				log_crypto_err_io_code("dnstap io, ssl handshake failed",
1435*9c7f0a49Ssthen 					want, err);
1436d7b4a113Ssthen 				verbose(VERB_OPS, "dnstap io, ssl handshake failed "
1437d7b4a113Ssthen 					"from %s", dtio->ip_str);
1438d7b4a113Ssthen 			}
1439d7b4a113Ssthen 			/* closed */
1440d7b4a113Ssthen 			if(info) dtio_stop_flush_exit(info);
1441d7b4a113Ssthen 			dtio_del_output_event(dtio);
1442d7b4a113Ssthen 			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1443d7b4a113Ssthen 			dtio_close_output(dtio);
1444d7b4a113Ssthen 			return 0;
1445d7b4a113Ssthen 		}
1446d7b4a113Ssthen 
1447d7b4a113Ssthen 	}
1448d7b4a113Ssthen 	/* check peer verification */
1449d7b4a113Ssthen 	dtio->ssl_handshake_done = 1;
1450d7b4a113Ssthen 
1451d7b4a113Ssthen 	if(!dtio_ssl_check_peer(dtio)) {
1452d7b4a113Ssthen 		/* closed */
1453d7b4a113Ssthen 		if(info) dtio_stop_flush_exit(info);
1454d7b4a113Ssthen 		dtio_del_output_event(dtio);
1455d7b4a113Ssthen 		dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1456d7b4a113Ssthen 		dtio_close_output(dtio);
1457d7b4a113Ssthen 		return 0;
1458d7b4a113Ssthen 	}
1459d7b4a113Ssthen 	return 1;
1460d7b4a113Ssthen }
1461d7b4a113Ssthen #endif /* HAVE_SSL */
1462d7b4a113Ssthen 
1463d7b4a113Ssthen /** callback for the dnstap events, to write to the output */
dtio_output_cb(int ATTR_UNUSED (fd),short bits,void * arg)1464d7b4a113Ssthen void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1465d7b4a113Ssthen {
1466d7b4a113Ssthen 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1467d7b4a113Ssthen 	int i;
1468d7b4a113Ssthen 
1469d7b4a113Ssthen 	if(dtio->check_nb_connect) {
1470d7b4a113Ssthen 		int connect_err = dtio_check_nb_connect(dtio);
1471d7b4a113Ssthen 		if(connect_err == -1) {
1472d7b4a113Ssthen 			/* close the channel */
1473d7b4a113Ssthen 			dtio_del_output_event(dtio);
1474d7b4a113Ssthen 			dtio_close_output(dtio);
1475d7b4a113Ssthen 			return;
1476d7b4a113Ssthen 		} else if(connect_err == 0) {
1477d7b4a113Ssthen 			/* try again later */
1478d7b4a113Ssthen 			return;
1479d7b4a113Ssthen 		}
1480d7b4a113Ssthen 		/* nonblocking connect check passed, continue */
1481d7b4a113Ssthen 	}
1482d7b4a113Ssthen 
1483d7b4a113Ssthen #ifdef HAVE_SSL
1484d7b4a113Ssthen 	if(dtio->ssl &&
1485d7b4a113Ssthen 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1486d7b4a113Ssthen 		if(!dtio_ssl_handshake(dtio, NULL))
1487d7b4a113Ssthen 			return;
1488d7b4a113Ssthen 	}
1489d7b4a113Ssthen #endif
1490d7b4a113Ssthen 
1491d7b4a113Ssthen 	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1492d7b4a113Ssthen 		if(dtio->ssl_brief_write)
1493d7b4a113Ssthen 			(void)dtio_disable_brief_write(dtio);
1494d7b4a113Ssthen 		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1495d7b4a113Ssthen 			if(dtio_read_accept_frame(dtio) <= 0)
1496d7b4a113Ssthen 				return;
1497d7b4a113Ssthen 		} else if(!dtio_check_close(dtio))
1498d7b4a113Ssthen 			return;
1499d7b4a113Ssthen 	}
1500d7b4a113Ssthen 
1501d7b4a113Ssthen 	/* loop to process a number of messages.  This improves throughput,
1502d7b4a113Ssthen 	 * because selecting on write-event if not needed for busy messages
1503d7b4a113Ssthen 	 * (dnstap log) generation and if they need to all be written back.
1504d7b4a113Ssthen 	 * The write event is usually not blocked up.  But not forever,
1505d7b4a113Ssthen 	 * because the event loop needs to stay responsive for other events.
1506d7b4a113Ssthen 	 * If there are no (more) messages, or if the output buffers get
1507d7b4a113Ssthen 	 * full, it returns out of the loop. */
1508d7b4a113Ssthen 	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1509d7b4a113Ssthen 		/* see if there are messages that need writing */
1510d7b4a113Ssthen 		if(!dtio->cur_msg) {
1511d7b4a113Ssthen 			if(!dtio_find_msg(dtio)) {
1512d7b4a113Ssthen 				if(i == 0) {
1513d7b4a113Ssthen 					/* no messages on the first iteration,
1514d7b4a113Ssthen 					 * the queues are all empty */
1515d7b4a113Ssthen 					dtio_sleep(dtio);
1516d7b4a113Ssthen 				}
1517d7b4a113Ssthen 				return; /* nothing to do */
1518d7b4a113Ssthen 			}
1519d7b4a113Ssthen 		}
1520d7b4a113Ssthen 
1521d7b4a113Ssthen 		/* write it */
1522d7b4a113Ssthen 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1523d7b4a113Ssthen 			if(!dtio_write_more(dtio))
1524d7b4a113Ssthen 				return;
1525d7b4a113Ssthen 		}
1526d7b4a113Ssthen 
1527d7b4a113Ssthen 		/* done with the current message */
1528d7b4a113Ssthen 		dtio_cur_msg_free(dtio);
1529d7b4a113Ssthen 
1530d7b4a113Ssthen 		/* If this is a bidirectional stream the first message will be
1531d7b4a113Ssthen 		 * the READY control frame. We can only continue writing after
1532d7b4a113Ssthen 		 * receiving an ACCEPT control frame. */
1533d7b4a113Ssthen 		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1534d7b4a113Ssthen 			dtio->ready_frame_sent = 1;
1535d7b4a113Ssthen 			(void)dtio_add_output_event_read(dtio);
1536d7b4a113Ssthen 			break;
1537d7b4a113Ssthen 		}
1538d7b4a113Ssthen 	}
1539d7b4a113Ssthen }
1540d7b4a113Ssthen 
1541d7b4a113Ssthen /** callback for the dnstap commandpipe, to stop the dnstap IO */
dtio_cmd_cb(int fd,short ATTR_UNUSED (bits),void * arg)1542d7b4a113Ssthen void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1543d7b4a113Ssthen {
1544d7b4a113Ssthen 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1545d7b4a113Ssthen 	uint8_t cmd;
1546d7b4a113Ssthen 	ssize_t r;
1547d7b4a113Ssthen 	if(dtio->want_to_exit)
1548d7b4a113Ssthen 		return;
1549d7b4a113Ssthen 	r = read(fd, &cmd, sizeof(cmd));
1550d7b4a113Ssthen 	if(r == -1) {
1551d7b4a113Ssthen #ifndef USE_WINSOCK
1552d7b4a113Ssthen 		if(errno == EINTR || errno == EAGAIN)
1553d7b4a113Ssthen 			return; /* ignore this */
1554d7b4a113Ssthen #else
1555d7b4a113Ssthen 		if(WSAGetLastError() == WSAEINPROGRESS)
1556d7b4a113Ssthen 			return;
1557d7b4a113Ssthen 		if(WSAGetLastError() == WSAEWOULDBLOCK)
1558d7b4a113Ssthen 			return;
1559d7b4a113Ssthen #endif
1560e2a0f313Ssthen 		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1561d7b4a113Ssthen 		/* and then fall through to quit the thread */
1562d7b4a113Ssthen 	} else if(r == 0) {
1563d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1564d7b4a113Ssthen 	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1565d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1566d7b4a113Ssthen 	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1567d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1568d7b4a113Ssthen 
1569d7b4a113Ssthen 		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1570d7b4a113Ssthen 			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1571d7b4a113Ssthen 				"waiting for ACCEPT control frame");
1572d7b4a113Ssthen 			return;
1573d7b4a113Ssthen 		}
1574d7b4a113Ssthen 
1575d7b4a113Ssthen 		/* reregister event */
1576d7b4a113Ssthen 		if(!dtio_add_output_event_write(dtio))
1577d7b4a113Ssthen 			return;
1578d7b4a113Ssthen 		return;
1579d7b4a113Ssthen 	} else if(r == 1) {
1580d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1581d7b4a113Ssthen 	}
1582d7b4a113Ssthen 	dtio->want_to_exit = 1;
1583d7b4a113Ssthen 	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1584d7b4a113Ssthen 		!= 0) {
1585d7b4a113Ssthen 		log_err("dnstap io: could not loopexit");
1586d7b4a113Ssthen 	}
1587d7b4a113Ssthen }
1588d7b4a113Ssthen 
1589d7b4a113Ssthen #ifndef THREADS_DISABLED
1590d7b4a113Ssthen /** setup the event base for the dnstap io thread */
dtio_setup_base(struct dt_io_thread * dtio,time_t * secs,struct timeval * now)1591d7b4a113Ssthen static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1592d7b4a113Ssthen 	struct timeval* now)
1593d7b4a113Ssthen {
1594d7b4a113Ssthen 	memset(now, 0, sizeof(*now));
1595d7b4a113Ssthen 	dtio->event_base = ub_default_event_base(0, secs, now);
1596d7b4a113Ssthen 	if(!dtio->event_base) {
1597d7b4a113Ssthen 		fatal_exit("dnstap io: could not create event_base");
1598d7b4a113Ssthen 	}
1599d7b4a113Ssthen }
1600d7b4a113Ssthen #endif /* THREADS_DISABLED */
1601d7b4a113Ssthen 
1602d7b4a113Ssthen /** setup the cmd event for dnstap io */
dtio_setup_cmd(struct dt_io_thread * dtio)1603d7b4a113Ssthen static void dtio_setup_cmd(struct dt_io_thread* dtio)
1604d7b4a113Ssthen {
1605d7b4a113Ssthen 	struct ub_event* cmdev;
1606d7b4a113Ssthen 	fd_set_nonblock(dtio->commandpipe[0]);
1607d7b4a113Ssthen 	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1608d7b4a113Ssthen 		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1609d7b4a113Ssthen 	if(!cmdev) {
1610d7b4a113Ssthen 		fatal_exit("dnstap io: out of memory");
1611d7b4a113Ssthen 	}
1612d7b4a113Ssthen 	dtio->command_event = cmdev;
1613d7b4a113Ssthen 	if(ub_event_add(cmdev, NULL) != 0) {
1614d7b4a113Ssthen 		fatal_exit("dnstap io: out of memory (adding event)");
1615d7b4a113Ssthen 	}
1616d7b4a113Ssthen }
1617d7b4a113Ssthen 
1618d7b4a113Ssthen /** setup the reconnect event for dnstap io */
dtio_setup_reconnect(struct dt_io_thread * dtio)1619d7b4a113Ssthen static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1620d7b4a113Ssthen {
1621d7b4a113Ssthen 	dtio_reconnect_clear(dtio);
1622d7b4a113Ssthen 	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1623d7b4a113Ssthen 		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1624d7b4a113Ssthen 	if(!dtio->reconnect_timer) {
1625d7b4a113Ssthen 		fatal_exit("dnstap io: out of memory");
1626d7b4a113Ssthen 	}
1627d7b4a113Ssthen }
1628d7b4a113Ssthen 
1629d7b4a113Ssthen /**
1630d7b4a113Ssthen  * structure to keep track of information during stop flush
1631d7b4a113Ssthen  */
1632d7b4a113Ssthen struct stop_flush_info {
1633d7b4a113Ssthen 	/** the event base during stop flush */
1634d7b4a113Ssthen 	struct ub_event_base* base;
1635d7b4a113Ssthen 	/** did we already want to exit this stop-flush event base */
1636d7b4a113Ssthen 	int want_to_exit_flush;
1637d7b4a113Ssthen 	/** has the timer fired */
1638d7b4a113Ssthen 	int timer_done;
1639d7b4a113Ssthen 	/** the dtio */
1640d7b4a113Ssthen 	struct dt_io_thread* dtio;
1641d7b4a113Ssthen 	/** the stop control frame */
1642d7b4a113Ssthen 	void* stop_frame;
1643d7b4a113Ssthen 	/** length of the stop frame */
1644d7b4a113Ssthen 	size_t stop_frame_len;
1645d7b4a113Ssthen 	/** how much we have done of the stop frame */
1646d7b4a113Ssthen 	size_t stop_frame_done;
1647d7b4a113Ssthen };
1648d7b4a113Ssthen 
1649d7b4a113Ssthen /** exit the stop flush base */
dtio_stop_flush_exit(struct stop_flush_info * info)1650d7b4a113Ssthen static void dtio_stop_flush_exit(struct stop_flush_info* info)
1651d7b4a113Ssthen {
1652d7b4a113Ssthen 	if(info->want_to_exit_flush)
1653d7b4a113Ssthen 		return;
1654d7b4a113Ssthen 	info->want_to_exit_flush = 1;
1655d7b4a113Ssthen 	if(ub_event_base_loopexit(info->base) != 0) {
1656d7b4a113Ssthen 		log_err("dnstap io: could not loopexit");
1657d7b4a113Ssthen 	}
1658d7b4a113Ssthen }
1659d7b4a113Ssthen 
1660d7b4a113Ssthen /** send the stop control,
1661d7b4a113Ssthen  * return true if completed the frame. */
dtio_control_stop_send(struct stop_flush_info * info)1662d7b4a113Ssthen static int dtio_control_stop_send(struct stop_flush_info* info)
1663d7b4a113Ssthen {
1664d7b4a113Ssthen 	struct dt_io_thread* dtio = info->dtio;
1665d7b4a113Ssthen 	int r;
1666d7b4a113Ssthen 	if(info->stop_frame_done >= info->stop_frame_len)
1667d7b4a113Ssthen 		return 1;
1668d7b4a113Ssthen 	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1669d7b4a113Ssthen 		info->stop_frame_done, info->stop_frame_len -
1670d7b4a113Ssthen 		info->stop_frame_done);
1671d7b4a113Ssthen 	if(r == -1) {
1672d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1673d7b4a113Ssthen 		dtio_stop_flush_exit(info);
1674d7b4a113Ssthen 		return 0;
1675d7b4a113Ssthen 	}
1676d7b4a113Ssthen 	if(r == 0) {
1677d7b4a113Ssthen 		/* try again later, or timeout */
1678d7b4a113Ssthen 		return 0;
1679d7b4a113Ssthen 	}
1680d7b4a113Ssthen 	info->stop_frame_done += r;
1681d7b4a113Ssthen 	if(info->stop_frame_done < info->stop_frame_len)
1682d7b4a113Ssthen 		return 0; /* not done yet */
1683d7b4a113Ssthen 	return 1;
1684d7b4a113Ssthen }
1685d7b4a113Ssthen 
dtio_stop_timer_cb(int ATTR_UNUSED (fd),short ATTR_UNUSED (bits),void * arg)1686d7b4a113Ssthen void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1687d7b4a113Ssthen 	void* arg)
1688d7b4a113Ssthen {
1689d7b4a113Ssthen 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1690d7b4a113Ssthen 	if(info->want_to_exit_flush)
1691d7b4a113Ssthen 		return;
1692d7b4a113Ssthen 	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1693d7b4a113Ssthen 	info->timer_done = 1;
1694d7b4a113Ssthen 	dtio_stop_flush_exit(info);
1695d7b4a113Ssthen }
1696d7b4a113Ssthen 
dtio_stop_ev_cb(int ATTR_UNUSED (fd),short bits,void * arg)1697d7b4a113Ssthen void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1698d7b4a113Ssthen {
1699d7b4a113Ssthen 	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1700d7b4a113Ssthen 	struct dt_io_thread* dtio = info->dtio;
1701d7b4a113Ssthen 	if(info->want_to_exit_flush)
1702d7b4a113Ssthen 		return;
1703d7b4a113Ssthen 	if(dtio->check_nb_connect) {
1704d7b4a113Ssthen 		/* we don't start the stop_flush if connect still
1705d7b4a113Ssthen 		 * in progress, but the check code is here, just in case */
1706d7b4a113Ssthen 		int connect_err = dtio_check_nb_connect(dtio);
1707d7b4a113Ssthen 		if(connect_err == -1) {
1708d7b4a113Ssthen 			/* close the channel, exit the stop flush */
1709d7b4a113Ssthen 			dtio_stop_flush_exit(info);
1710d7b4a113Ssthen 			dtio_del_output_event(dtio);
1711d7b4a113Ssthen 			dtio_close_output(dtio);
1712d7b4a113Ssthen 			return;
1713d7b4a113Ssthen 		} else if(connect_err == 0) {
1714d7b4a113Ssthen 			/* try again later */
1715d7b4a113Ssthen 			return;
1716d7b4a113Ssthen 		}
1717d7b4a113Ssthen 		/* nonblocking connect check passed, continue */
1718d7b4a113Ssthen 	}
1719d7b4a113Ssthen #ifdef HAVE_SSL
1720d7b4a113Ssthen 	if(dtio->ssl &&
1721d7b4a113Ssthen 		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1722d7b4a113Ssthen 		if(!dtio_ssl_handshake(dtio, info))
1723d7b4a113Ssthen 			return;
1724d7b4a113Ssthen 	}
1725d7b4a113Ssthen #endif
1726d7b4a113Ssthen 
1727d7b4a113Ssthen 	if((bits&UB_EV_READ)) {
1728d7b4a113Ssthen 		if(!dtio_check_close(dtio)) {
1729d7b4a113Ssthen 			if(dtio->fd == -1) {
1730d7b4a113Ssthen 				verbose(VERB_ALGO, "dnstap io: "
1731d7b4a113Ssthen 					"stop flush: output closed");
1732d7b4a113Ssthen 				dtio_stop_flush_exit(info);
1733d7b4a113Ssthen 			}
1734d7b4a113Ssthen 			return;
1735d7b4a113Ssthen 		}
1736d7b4a113Ssthen 	}
1737d7b4a113Ssthen 	/* write remainder of last frame */
1738d7b4a113Ssthen 	if(dtio->cur_msg) {
1739d7b4a113Ssthen 		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1740d7b4a113Ssthen 			if(!dtio_write_more(dtio)) {
1741d7b4a113Ssthen 				if(dtio->fd == -1) {
1742d7b4a113Ssthen 					verbose(VERB_ALGO, "dnstap io: "
1743d7b4a113Ssthen 						"stop flush: output closed");
1744d7b4a113Ssthen 					dtio_stop_flush_exit(info);
1745d7b4a113Ssthen 				}
1746d7b4a113Ssthen 				return;
1747d7b4a113Ssthen 			}
1748d7b4a113Ssthen 		}
1749d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1750d7b4a113Ssthen 			"last frame");
1751d7b4a113Ssthen 		dtio_cur_msg_free(dtio);
1752d7b4a113Ssthen 	}
1753d7b4a113Ssthen 	/* write stop frame */
1754d7b4a113Ssthen 	if(info->stop_frame_done < info->stop_frame_len) {
1755d7b4a113Ssthen 		if(!dtio_control_stop_send(info))
1756d7b4a113Ssthen 			return;
1757d7b4a113Ssthen 		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1758d7b4a113Ssthen 			"stop control frame");
1759d7b4a113Ssthen 	}
1760d7b4a113Ssthen 	/* when last frame and stop frame are sent, exit */
1761d7b4a113Ssthen 	dtio_stop_flush_exit(info);
1762d7b4a113Ssthen }
1763d7b4a113Ssthen 
1764d7b4a113Ssthen /** flush at end, last packet and stop control */
dtio_control_stop_flush(struct dt_io_thread * dtio)1765d7b4a113Ssthen static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1766d7b4a113Ssthen {
1767d7b4a113Ssthen 	/* briefly attempt to flush the previous packet to the output,
1768d7b4a113Ssthen 	 * this could be a partial packet, or even the start control frame */
1769d7b4a113Ssthen 	time_t secs = 0;
1770d7b4a113Ssthen 	struct timeval now;
1771d7b4a113Ssthen 	struct stop_flush_info info;
1772d7b4a113Ssthen 	struct timeval tv;
1773d7b4a113Ssthen 	struct ub_event* timer, *stopev;
1774d7b4a113Ssthen 
1775d7b4a113Ssthen 	if(dtio->fd == -1 || dtio->check_nb_connect) {
1776d7b4a113Ssthen 		/* no connection or we have just connected, so nothing is
1777d7b4a113Ssthen 		 * sent yet, so nothing to stop or flush */
1778d7b4a113Ssthen 		return;
1779d7b4a113Ssthen 	}
1780d7b4a113Ssthen 	if(dtio->ssl && !dtio->ssl_handshake_done) {
1781d7b4a113Ssthen 		/* no SSL connection has been established yet */
1782d7b4a113Ssthen 		return;
1783d7b4a113Ssthen 	}
1784d7b4a113Ssthen 
1785d7b4a113Ssthen 	memset(&info, 0, sizeof(info));
1786d7b4a113Ssthen 	memset(&now, 0, sizeof(now));
1787d7b4a113Ssthen 	info.dtio = dtio;
1788d7b4a113Ssthen 	info.base = ub_default_event_base(0, &secs, &now);
1789d7b4a113Ssthen 	if(!info.base) {
1790d7b4a113Ssthen 		log_err("dnstap io: malloc failure");
1791d7b4a113Ssthen 		return;
1792d7b4a113Ssthen 	}
1793d7b4a113Ssthen 	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1794d7b4a113Ssthen 		&dtio_stop_timer_cb, &info);
1795d7b4a113Ssthen 	if(!timer) {
1796d7b4a113Ssthen 		log_err("dnstap io: malloc failure");
1797d7b4a113Ssthen 		ub_event_base_free(info.base);
1798d7b4a113Ssthen 		return;
1799d7b4a113Ssthen 	}
1800d7b4a113Ssthen 	memset(&tv, 0, sizeof(tv));
1801d7b4a113Ssthen 	tv.tv_sec = 2;
1802d7b4a113Ssthen 	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1803d7b4a113Ssthen 		&tv) != 0) {
1804d7b4a113Ssthen 		log_err("dnstap io: cannot event_timer_add");
1805d7b4a113Ssthen 		ub_event_free(timer);
1806d7b4a113Ssthen 		ub_event_base_free(info.base);
1807d7b4a113Ssthen 		return;
1808d7b4a113Ssthen 	}
1809d7b4a113Ssthen 	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1810d7b4a113Ssthen 		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1811d7b4a113Ssthen 	if(!stopev) {
1812d7b4a113Ssthen 		log_err("dnstap io: malloc failure");
1813d7b4a113Ssthen 		ub_timer_del(timer);
1814d7b4a113Ssthen 		ub_event_free(timer);
1815d7b4a113Ssthen 		ub_event_base_free(info.base);
1816d7b4a113Ssthen 		return;
1817d7b4a113Ssthen 	}
1818d7b4a113Ssthen 	if(ub_event_add(stopev, NULL) != 0) {
1819d7b4a113Ssthen 		log_err("dnstap io: cannot event_add");
1820d7b4a113Ssthen 		ub_event_free(stopev);
1821d7b4a113Ssthen 		ub_timer_del(timer);
1822d7b4a113Ssthen 		ub_event_free(timer);
1823d7b4a113Ssthen 		ub_event_base_free(info.base);
1824d7b4a113Ssthen 		return;
1825d7b4a113Ssthen 	}
1826d7b4a113Ssthen 	info.stop_frame = fstrm_create_control_frame_stop(
1827d7b4a113Ssthen 		&info.stop_frame_len);
1828d7b4a113Ssthen 	if(!info.stop_frame) {
1829d7b4a113Ssthen 		log_err("dnstap io: malloc failure");
1830d7b4a113Ssthen 		ub_event_del(stopev);
1831d7b4a113Ssthen 		ub_event_free(stopev);
1832d7b4a113Ssthen 		ub_timer_del(timer);
1833d7b4a113Ssthen 		ub_event_free(timer);
1834d7b4a113Ssthen 		ub_event_base_free(info.base);
1835d7b4a113Ssthen 		return;
1836d7b4a113Ssthen 	}
1837d7b4a113Ssthen 	dtio->stop_flush_event = stopev;
1838d7b4a113Ssthen 
1839d7b4a113Ssthen 	/* wait briefly, or until finished */
1840d7b4a113Ssthen 	verbose(VERB_ALGO, "dnstap io: stop flush started");
1841d7b4a113Ssthen 	if(ub_event_base_dispatch(info.base) < 0) {
1842d7b4a113Ssthen 		log_err("dnstap io: dispatch flush failed, errno is %s",
1843d7b4a113Ssthen 			strerror(errno));
1844d7b4a113Ssthen 	}
1845d7b4a113Ssthen 	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1846d7b4a113Ssthen 	free(info.stop_frame);
1847d7b4a113Ssthen 	dtio->stop_flush_event = NULL;
1848d7b4a113Ssthen 	ub_event_del(stopev);
1849d7b4a113Ssthen 	ub_event_free(stopev);
1850d7b4a113Ssthen 	ub_timer_del(timer);
1851d7b4a113Ssthen 	ub_event_free(timer);
1852d7b4a113Ssthen 	ub_event_base_free(info.base);
1853d7b4a113Ssthen }
1854d7b4a113Ssthen 
1855d7b4a113Ssthen /** perform desetup and free stuff when the dnstap io thread exits */
dtio_desetup(struct dt_io_thread * dtio)1856d7b4a113Ssthen static void dtio_desetup(struct dt_io_thread* dtio)
1857d7b4a113Ssthen {
1858d7b4a113Ssthen 	dtio_control_stop_flush(dtio);
1859d7b4a113Ssthen 	dtio_del_output_event(dtio);
1860d7b4a113Ssthen 	dtio_close_output(dtio);
1861d7b4a113Ssthen 	ub_event_del(dtio->command_event);
1862d7b4a113Ssthen 	ub_event_free(dtio->command_event);
1863d7b4a113Ssthen #ifndef USE_WINSOCK
1864d7b4a113Ssthen 	close(dtio->commandpipe[0]);
1865d7b4a113Ssthen #else
1866d7b4a113Ssthen 	_close(dtio->commandpipe[0]);
1867d7b4a113Ssthen #endif
1868d7b4a113Ssthen 	dtio->commandpipe[0] = -1;
1869d7b4a113Ssthen 	dtio_reconnect_del(dtio);
1870d7b4a113Ssthen 	ub_event_free(dtio->reconnect_timer);
1871d7b4a113Ssthen 	dtio_cur_msg_free(dtio);
1872d7b4a113Ssthen #ifndef THREADS_DISABLED
1873d7b4a113Ssthen 	ub_event_base_free(dtio->event_base);
1874d7b4a113Ssthen #endif
1875d7b4a113Ssthen }
1876d7b4a113Ssthen 
1877d7b4a113Ssthen /** setup a start control message */
dtio_control_start_send(struct dt_io_thread * dtio)1878d7b4a113Ssthen static int dtio_control_start_send(struct dt_io_thread* dtio)
1879d7b4a113Ssthen {
1880d7b4a113Ssthen 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1881d7b4a113Ssthen 	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1882d7b4a113Ssthen 		&dtio->cur_msg_len);
1883d7b4a113Ssthen 	if(!dtio->cur_msg) {
1884d7b4a113Ssthen 		return 0;
1885d7b4a113Ssthen 	}
1886d7b4a113Ssthen 	/* setup to send the control message */
1887d7b4a113Ssthen 	/* set that the buffer needs to be sent, but the length
1888d7b4a113Ssthen 	 * of that buffer is already written, that way the buffer can
1889d7b4a113Ssthen 	 * start with 0 length and then the length of the control frame
1890d7b4a113Ssthen 	 * in it */
1891d7b4a113Ssthen 	dtio->cur_msg_done = 0;
1892d7b4a113Ssthen 	dtio->cur_msg_len_done = 4;
1893d7b4a113Ssthen 	return 1;
1894d7b4a113Ssthen }
1895d7b4a113Ssthen 
1896d7b4a113Ssthen /** setup a ready control message */
dtio_control_ready_send(struct dt_io_thread * dtio)1897d7b4a113Ssthen static int dtio_control_ready_send(struct dt_io_thread* dtio)
1898d7b4a113Ssthen {
1899d7b4a113Ssthen 	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1900d7b4a113Ssthen 	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1901d7b4a113Ssthen 		&dtio->cur_msg_len);
1902d7b4a113Ssthen 	if(!dtio->cur_msg) {
1903d7b4a113Ssthen 		return 0;
1904d7b4a113Ssthen 	}
1905d7b4a113Ssthen 	/* setup to send the control message */
1906d7b4a113Ssthen 	/* set that the buffer needs to be sent, but the length
1907d7b4a113Ssthen 	 * of that buffer is already written, that way the buffer can
1908d7b4a113Ssthen 	 * start with 0 length and then the length of the control frame
1909d7b4a113Ssthen 	 * in it */
1910d7b4a113Ssthen 	dtio->cur_msg_done = 0;
1911d7b4a113Ssthen 	dtio->cur_msg_len_done = 4;
1912d7b4a113Ssthen 	return 1;
1913d7b4a113Ssthen }
1914d7b4a113Ssthen 
1915d7b4a113Ssthen /** open the output file descriptor for af_local */
dtio_open_output_local(struct dt_io_thread * dtio)1916d7b4a113Ssthen static int dtio_open_output_local(struct dt_io_thread* dtio)
1917d7b4a113Ssthen {
1918d7b4a113Ssthen #ifdef HAVE_SYS_UN_H
1919d7b4a113Ssthen 	struct sockaddr_un s;
1920d7b4a113Ssthen 	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1921d7b4a113Ssthen 	if(dtio->fd == -1) {
1922d7b4a113Ssthen 		log_err("dnstap io: failed to create socket: %s",
1923e2a0f313Ssthen 			sock_strerror(errno));
1924d7b4a113Ssthen 		return 0;
1925d7b4a113Ssthen 	}
1926d7b4a113Ssthen 	memset(&s, 0, sizeof(s));
1927d7b4a113Ssthen #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
1928d7b4a113Ssthen         /* this member exists on BSDs, not Linux */
1929d7b4a113Ssthen         s.sun_len = (unsigned)sizeof(s);
1930d7b4a113Ssthen #endif
1931d7b4a113Ssthen 	s.sun_family = AF_LOCAL;
1932d7b4a113Ssthen 	/* length is 92-108, 104 on FreeBSD */
1933d7b4a113Ssthen         (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1934d7b4a113Ssthen 	fd_set_nonblock(dtio->fd);
1935d7b4a113Ssthen 	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1936d7b4a113Ssthen 		== -1) {
1937d7b4a113Ssthen 		char* to = dtio->socket_path;
1938e2a0f313Ssthen 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1939e2a0f313Ssthen 			verbosity < 4) {
1940e2a0f313Ssthen 			dtio_close_fd(dtio);
1941e2a0f313Ssthen 			return 0; /* no log retries on low verbosity */
1942e2a0f313Ssthen 		}
1943d7b4a113Ssthen 		log_err("dnstap io: failed to connect to \"%s\": %s",
1944e2a0f313Ssthen 			to, sock_strerror(errno));
1945d7b4a113Ssthen 		dtio_close_fd(dtio);
1946d7b4a113Ssthen 		return 0;
1947d7b4a113Ssthen 	}
1948d7b4a113Ssthen 	return 1;
1949d7b4a113Ssthen #else
1950d7b4a113Ssthen 	log_err("cannot create af_local socket");
1951d7b4a113Ssthen 	return 0;
1952d7b4a113Ssthen #endif /* HAVE_SYS_UN_H */
1953d7b4a113Ssthen }
1954d7b4a113Ssthen 
1955d7b4a113Ssthen /** open the output file descriptor for af_inet and af_inet6 */
dtio_open_output_tcp(struct dt_io_thread * dtio)1956d7b4a113Ssthen static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1957d7b4a113Ssthen {
1958d7b4a113Ssthen 	struct sockaddr_storage addr;
1959d7b4a113Ssthen 	socklen_t addrlen;
1960d7b4a113Ssthen 	memset(&addr, 0, sizeof(addr));
1961d7b4a113Ssthen 	addrlen = (socklen_t)sizeof(addr);
1962d7b4a113Ssthen 
19630e9b6f9fSsthen 	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
1964d7b4a113Ssthen 		log_err("could not parse IP '%s'", dtio->ip_str);
1965d7b4a113Ssthen 		return 0;
1966d7b4a113Ssthen 	}
1967d7b4a113Ssthen 	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1968d7b4a113Ssthen 	if(dtio->fd == -1) {
1969e2a0f313Ssthen 		log_err("can't create socket: %s", sock_strerror(errno));
1970d7b4a113Ssthen 		return 0;
1971d7b4a113Ssthen 	}
1972d7b4a113Ssthen 	fd_set_nonblock(dtio->fd);
1973d7b4a113Ssthen 	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1974d7b4a113Ssthen 		if(errno == EINPROGRESS)
1975d7b4a113Ssthen 			return 1; /* wait until connect done*/
1976e2a0f313Ssthen 		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1977e2a0f313Ssthen 			verbosity < 4) {
1978e2a0f313Ssthen 			dtio_close_fd(dtio);
1979e2a0f313Ssthen 			return 0; /* no log retries on low verbosity */
1980e2a0f313Ssthen 		}
1981d7b4a113Ssthen #ifndef USE_WINSOCK
1982d7b4a113Ssthen 		if(tcp_connect_errno_needs_log(
1983d7b4a113Ssthen 			(struct sockaddr *)&addr, addrlen)) {
1984d7b4a113Ssthen 			log_err("dnstap io: failed to connect to %s: %s",
1985d7b4a113Ssthen 				dtio->ip_str, strerror(errno));
1986d7b4a113Ssthen 		}
1987d7b4a113Ssthen #else
1988d7b4a113Ssthen 		if(WSAGetLastError() == WSAEINPROGRESS ||
1989d7b4a113Ssthen 			WSAGetLastError() == WSAEWOULDBLOCK)
1990d7b4a113Ssthen 			return 1; /* wait until connect done*/
1991d7b4a113Ssthen 		if(tcp_connect_errno_needs_log(
1992d7b4a113Ssthen 			(struct sockaddr *)&addr, addrlen)) {
1993d7b4a113Ssthen 			log_err("dnstap io: failed to connect to %s: %s",
1994d7b4a113Ssthen 				dtio->ip_str, wsa_strerror(WSAGetLastError()));
1995d7b4a113Ssthen 		}
1996d7b4a113Ssthen #endif
1997d7b4a113Ssthen 		dtio_close_fd(dtio);
1998d7b4a113Ssthen 		return 0;
1999d7b4a113Ssthen 	}
2000d7b4a113Ssthen 	return 1;
2001d7b4a113Ssthen }
2002d7b4a113Ssthen 
2003d7b4a113Ssthen /** setup the SSL structure for new connection */
dtio_setup_ssl(struct dt_io_thread * dtio)2004d7b4a113Ssthen static int dtio_setup_ssl(struct dt_io_thread* dtio)
2005d7b4a113Ssthen {
2006d7b4a113Ssthen 	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2007d7b4a113Ssthen 	if(!dtio->ssl) return 0;
2008d7b4a113Ssthen 	dtio->ssl_handshake_done = 0;
2009d7b4a113Ssthen 	dtio->ssl_brief_read = 0;
2010d7b4a113Ssthen 
2011d7b4a113Ssthen 	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2012d7b4a113Ssthen 		dtio->tls_use_sni)) {
2013d7b4a113Ssthen 		return 0;
2014d7b4a113Ssthen 	}
2015d7b4a113Ssthen 	return 1;
2016d7b4a113Ssthen }
2017d7b4a113Ssthen 
2018d7b4a113Ssthen /** open the output file descriptor */
dtio_open_output(struct dt_io_thread * dtio)2019d7b4a113Ssthen static void dtio_open_output(struct dt_io_thread* dtio)
2020d7b4a113Ssthen {
2021d7b4a113Ssthen 	struct ub_event* ev;
2022d7b4a113Ssthen 	if(dtio->upstream_is_unix) {
2023d7b4a113Ssthen 		if(!dtio_open_output_local(dtio)) {
2024d7b4a113Ssthen 			dtio_reconnect_enable(dtio);
2025d7b4a113Ssthen 			return;
2026d7b4a113Ssthen 		}
2027d7b4a113Ssthen 	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2028d7b4a113Ssthen 		if(!dtio_open_output_tcp(dtio)) {
2029d7b4a113Ssthen 			dtio_reconnect_enable(dtio);
2030d7b4a113Ssthen 			return;
2031d7b4a113Ssthen 		}
2032d7b4a113Ssthen 		if(dtio->upstream_is_tls) {
2033d7b4a113Ssthen 			if(!dtio_setup_ssl(dtio)) {
2034d7b4a113Ssthen 				dtio_close_fd(dtio);
2035d7b4a113Ssthen 				dtio_reconnect_enable(dtio);
2036d7b4a113Ssthen 				return;
2037d7b4a113Ssthen 			}
2038d7b4a113Ssthen 		}
2039d7b4a113Ssthen 	}
2040d7b4a113Ssthen 	dtio->check_nb_connect = 1;
2041d7b4a113Ssthen 
2042d7b4a113Ssthen 	/* the EV_READ is to read ACCEPT control messages, and catch channel
2043d7b4a113Ssthen 	 * close. EV_WRITE is to write packets */
2044d7b4a113Ssthen 	ev = ub_event_new(dtio->event_base, dtio->fd,
2045d7b4a113Ssthen 		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2046d7b4a113Ssthen 		dtio);
2047d7b4a113Ssthen 	if(!ev) {
2048d7b4a113Ssthen 		log_err("dnstap io: out of memory");
2049d7b4a113Ssthen 		if(dtio->ssl) {
2050d7b4a113Ssthen #ifdef HAVE_SSL
2051d7b4a113Ssthen 			SSL_free(dtio->ssl);
2052d7b4a113Ssthen 			dtio->ssl = NULL;
2053d7b4a113Ssthen #endif
2054d7b4a113Ssthen 		}
2055d7b4a113Ssthen 		dtio_close_fd(dtio);
2056d7b4a113Ssthen 		dtio_reconnect_enable(dtio);
2057d7b4a113Ssthen 		return;
2058d7b4a113Ssthen 	}
2059d7b4a113Ssthen 	dtio->event = ev;
2060d7b4a113Ssthen 
2061d7b4a113Ssthen 	/* setup protocol control message to start */
2062d7b4a113Ssthen 	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2063d7b4a113Ssthen 		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2064d7b4a113Ssthen 		log_err("dnstap io: out of memory");
2065d7b4a113Ssthen 		ub_event_free(dtio->event);
2066d7b4a113Ssthen 		dtio->event = NULL;
2067d7b4a113Ssthen 		if(dtio->ssl) {
2068d7b4a113Ssthen #ifdef HAVE_SSL
2069d7b4a113Ssthen 			SSL_free(dtio->ssl);
2070d7b4a113Ssthen 			dtio->ssl = NULL;
2071d7b4a113Ssthen #endif
2072d7b4a113Ssthen 		}
2073d7b4a113Ssthen 		dtio_close_fd(dtio);
2074d7b4a113Ssthen 		dtio_reconnect_enable(dtio);
2075d7b4a113Ssthen 		return;
2076d7b4a113Ssthen 	}
2077d7b4a113Ssthen }
2078d7b4a113Ssthen 
2079d7b4a113Ssthen /** perform the setup of the writer thread on the established event_base */
dtio_setup_on_base(struct dt_io_thread * dtio)2080d7b4a113Ssthen static void dtio_setup_on_base(struct dt_io_thread* dtio)
2081d7b4a113Ssthen {
2082d7b4a113Ssthen 	dtio_setup_cmd(dtio);
2083d7b4a113Ssthen 	dtio_setup_reconnect(dtio);
2084d7b4a113Ssthen 	dtio_open_output(dtio);
2085d7b4a113Ssthen 	if(!dtio_add_output_event_write(dtio))
2086d7b4a113Ssthen 		return;
2087d7b4a113Ssthen }
2088d7b4a113Ssthen 
2089d7b4a113Ssthen #ifndef THREADS_DISABLED
2090d7b4a113Ssthen /** the IO thread function for the DNSTAP IO */
dnstap_io(void * arg)2091d7b4a113Ssthen static void* dnstap_io(void* arg)
2092d7b4a113Ssthen {
2093d7b4a113Ssthen 	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2094d7b4a113Ssthen 	time_t secs = 0;
2095d7b4a113Ssthen 	struct timeval now;
2096d7b4a113Ssthen 	log_thread_set(&dtio->threadnum);
2097d7b4a113Ssthen 
2098d7b4a113Ssthen 	/* setup */
2099d7b4a113Ssthen 	verbose(VERB_ALGO, "start dnstap io thread");
2100d7b4a113Ssthen 	dtio_setup_base(dtio, &secs, &now);
2101d7b4a113Ssthen 	dtio_setup_on_base(dtio);
2102d7b4a113Ssthen 
2103d7b4a113Ssthen 	/* run */
2104d7b4a113Ssthen 	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2105d7b4a113Ssthen 		log_err("dnstap io: dispatch failed, errno is %s",
2106d7b4a113Ssthen 			strerror(errno));
2107d7b4a113Ssthen 	}
2108d7b4a113Ssthen 
2109d7b4a113Ssthen 	/* cleanup */
2110d7b4a113Ssthen 	verbose(VERB_ALGO, "stop dnstap io thread");
2111d7b4a113Ssthen 	dtio_desetup(dtio);
2112d7b4a113Ssthen 	return NULL;
2113d7b4a113Ssthen }
2114d7b4a113Ssthen #endif /* THREADS_DISABLED */
2115d7b4a113Ssthen 
dt_io_thread_start(struct dt_io_thread * dtio,void * event_base_nothr,int numworkers)2116d7b4a113Ssthen int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2117d7b4a113Ssthen 	int numworkers)
2118d7b4a113Ssthen {
2119d7b4a113Ssthen 	/* set up the thread, can fail */
2120d7b4a113Ssthen #ifndef USE_WINSOCK
2121d7b4a113Ssthen 	if(pipe(dtio->commandpipe) == -1) {
2122d7b4a113Ssthen 		log_err("failed to create pipe: %s", strerror(errno));
2123d7b4a113Ssthen 		return 0;
2124d7b4a113Ssthen 	}
2125d7b4a113Ssthen #else
2126d7b4a113Ssthen 	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2127d7b4a113Ssthen 		log_err("failed to create _pipe: %s",
2128d7b4a113Ssthen 			wsa_strerror(WSAGetLastError()));
2129d7b4a113Ssthen 		return 0;
2130d7b4a113Ssthen 	}
2131d7b4a113Ssthen #endif
2132d7b4a113Ssthen 
2133d7b4a113Ssthen 	/* start the thread */
2134d7b4a113Ssthen 	dtio->threadnum = numworkers+1;
2135d7b4a113Ssthen 	dtio->started = 1;
2136d7b4a113Ssthen #ifndef THREADS_DISABLED
2137d7b4a113Ssthen 	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2138d7b4a113Ssthen 	(void)event_base_nothr;
2139d7b4a113Ssthen #else
2140d7b4a113Ssthen 	dtio->event_base = event_base_nothr;
2141d7b4a113Ssthen 	dtio_setup_on_base(dtio);
2142d7b4a113Ssthen #endif
2143d7b4a113Ssthen 	return 1;
2144d7b4a113Ssthen }
2145d7b4a113Ssthen 
dt_io_thread_stop(struct dt_io_thread * dtio)2146d7b4a113Ssthen void dt_io_thread_stop(struct dt_io_thread* dtio)
2147d7b4a113Ssthen {
2148d7b4a113Ssthen #ifndef THREADS_DISABLED
2149d7b4a113Ssthen 	uint8_t cmd = DTIO_COMMAND_STOP;
2150d7b4a113Ssthen #endif
2151d7b4a113Ssthen 	if(!dtio) return;
2152d7b4a113Ssthen 	if(!dtio->started) return;
2153d7b4a113Ssthen 	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2154d7b4a113Ssthen 
2155d7b4a113Ssthen #ifndef THREADS_DISABLED
2156d7b4a113Ssthen 	while(1) {
2157d7b4a113Ssthen 		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2158d7b4a113Ssthen 		if(r == -1) {
2159d7b4a113Ssthen #ifndef USE_WINSOCK
2160d7b4a113Ssthen 			if(errno == EINTR || errno == EAGAIN)
2161d7b4a113Ssthen 				continue;
2162d7b4a113Ssthen #else
2163d7b4a113Ssthen 			if(WSAGetLastError() == WSAEINPROGRESS)
2164d7b4a113Ssthen 				continue;
2165d7b4a113Ssthen 			if(WSAGetLastError() == WSAEWOULDBLOCK)
2166d7b4a113Ssthen 				continue;
2167d7b4a113Ssthen #endif
2168e2a0f313Ssthen 			log_err("dnstap io stop: write: %s",
2169e2a0f313Ssthen 				sock_strerror(errno));
2170d7b4a113Ssthen 			break;
2171d7b4a113Ssthen 		}
2172d7b4a113Ssthen 		break;
2173d7b4a113Ssthen 	}
2174d7b4a113Ssthen 	dtio->started = 0;
2175d7b4a113Ssthen #endif /* THREADS_DISABLED */
2176d7b4a113Ssthen 
2177d7b4a113Ssthen #ifndef USE_WINSOCK
2178d7b4a113Ssthen 	close(dtio->commandpipe[1]);
2179d7b4a113Ssthen #else
2180d7b4a113Ssthen 	_close(dtio->commandpipe[1]);
2181d7b4a113Ssthen #endif
2182d7b4a113Ssthen 	dtio->commandpipe[1] = -1;
2183d7b4a113Ssthen #ifndef THREADS_DISABLED
2184d7b4a113Ssthen 	ub_thread_join(dtio->tid);
2185d7b4a113Ssthen #else
2186d7b4a113Ssthen 	dtio->want_to_exit = 1;
2187d7b4a113Ssthen 	dtio_desetup(dtio);
2188d7b4a113Ssthen #endif
2189d7b4a113Ssthen }
2190