xref: /openbsd/usr.sbin/unbound/dnstap/dtstream.h (revision e2a0f313)
/*
 * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP
 *
 * Copyright (c) 2020, NLnet Labs. All rights reserved.
 *
 * This software is open source.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * Neither the name of the NLNET LABS nor the names of its contributors may
 * be used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

/**
 * \file
 *
 * An implementation of the Frame Streams data transport protocol for
 * the Unbound DNSTAP message logging facility.
 */

44d7b4a113Ssthen #ifndef DTSTREAM_H
45d7b4a113Ssthen #define DTSTREAM_H
46d7b4a113Ssthen 
47d7b4a113Ssthen #include "util/locks.h"
48d7b4a113Ssthen struct dt_msg_entry;
49d7b4a113Ssthen struct dt_io_list_item;
50d7b4a113Ssthen struct dt_io_thread;
51d7b4a113Ssthen struct config_file;
52*e2a0f313Ssthen struct comm_base;
53d7b4a113Ssthen 
54d7b4a113Ssthen /**
55d7b4a113Ssthen  * A message buffer with dnstap messages queued up.  It is per-worker.
56d7b4a113Ssthen  * It has locks to synchronize.  If the buffer is full, a new message
57d7b4a113Ssthen  * cannot be added and is discarded.  A thread reads the messages and sends
58d7b4a113Ssthen  * them.
59d7b4a113Ssthen  */
60d7b4a113Ssthen struct dt_msg_queue {
61d7b4a113Ssthen 	/** lock of the buffer structure.  Hold this lock to add or remove
62d7b4a113Ssthen 	 * entries to the buffer.  Release it so that other threads can also
63d7b4a113Ssthen 	 * put messages to log, or a message can be taken out to send away
64d7b4a113Ssthen 	 * by the writer thread.
65d7b4a113Ssthen 	 */
66d7b4a113Ssthen 	lock_basic_type lock;
67d7b4a113Ssthen 	/** the maximum size of the buffer, in bytes */
68d7b4a113Ssthen 	size_t maxsize;
69d7b4a113Ssthen 	/** current size of the buffer, in bytes.  data bytes of messages.
70d7b4a113Ssthen 	 * If a new message make it more than maxsize, the buffer is full */
71d7b4a113Ssthen 	size_t cursize;
72*e2a0f313Ssthen 	/** number of messages in the queue */
73*e2a0f313Ssthen 	int msgcount;
74d7b4a113Ssthen 	/** list of messages.  The messages are added to the back and taken
75d7b4a113Ssthen 	 * out from the front. */
76d7b4a113Ssthen 	struct dt_msg_entry* first, *last;
77d7b4a113Ssthen 	/** reference to the io thread to wakeup */
78d7b4a113Ssthen 	struct dt_io_thread* dtio;
79*e2a0f313Ssthen 	/** the wakeup timer for dtio, on worker event base */
80*e2a0f313Ssthen 	struct comm_timer* wakeup_timer;
81d7b4a113Ssthen };
82d7b4a113Ssthen 
/**
 * An entry in the dt_msg_queue.  Contains one DNSTAP message.
 * The entry is malloced.
 */
struct dt_msg_entry {
	/** next in the list. */
	struct dt_msg_entry* next;
	/** the buffer with the data to send, an encoded DNSTAP message */
	void* buf;
	/** the length to send, in bytes. */
	size_t len;
};

/**
 * Buffer and counters used while reading a DNSTAP frame.
 */
struct dt_frame_read_buf {
	/** Buffer containing frame, except length counter(s). */
	void* buf;
	/** Number of bytes written to buffer. */
	size_t buf_count;
	/** Capacity of the buffer. */
	size_t buf_cap;

	/** Frame length field. Will contain the 2nd length field for control
	 * frames. */
	uint32_t frame_len;
	/** Number of bytes that have been written to the frame_len field. */
	size_t frame_len_done;

	/** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */
	int control_frame;
};

117d7b4a113Ssthen /**
118d7b4a113Ssthen  * IO thread that reads from the queues and writes them.
119d7b4a113Ssthen  */
120d7b4a113Ssthen struct dt_io_thread {
121d7b4a113Ssthen 	/** the thread number for the dtio thread,
122d7b4a113Ssthen 	 * must be first to cast thread arg to int* in checklock code. */
123d7b4a113Ssthen 	int threadnum;
124d7b4a113Ssthen 	/** event base, for event handling */
125d7b4a113Ssthen 	void* event_base;
126d7b4a113Ssthen 	/** list of queues that is registered to get written */
127d7b4a113Ssthen 	struct dt_io_list_item* io_list;
128d7b4a113Ssthen 	/** iterator point in the io_list, to pick from them in a
129d7b4a113Ssthen 	 * round-robin fashion, instead of only from the first when busy.
130d7b4a113Ssthen 	 * if NULL it means start at the start of the list. */
131d7b4a113Ssthen 	struct dt_io_list_item* io_list_iter;
132d7b4a113Ssthen 	/** thread id, of the io thread */
133d7b4a113Ssthen 	ub_thread_type tid;
134d7b4a113Ssthen 	/** if the io processing has started */
135d7b4a113Ssthen 	int started;
136d7b4a113Ssthen 	/** ssl context for the io thread, for tls connections. type SSL_CTX* */
137d7b4a113Ssthen 	void* ssl_ctx;
138d7b4a113Ssthen 	/** if SNI will be used for TLS connections. */
139d7b4a113Ssthen 	int tls_use_sni;
140d7b4a113Ssthen 
141d7b4a113Ssthen 	/** file descriptor that the thread writes to */
142d7b4a113Ssthen 	int fd;
143d7b4a113Ssthen 	/** event structure that the thread uses */
144d7b4a113Ssthen 	void* event;
145d7b4a113Ssthen 	/** the event is added */
146d7b4a113Ssthen 	int event_added;
147d7b4a113Ssthen 	/** event added is a write event */
148d7b4a113Ssthen 	int event_added_is_write;
149d7b4a113Ssthen 	/** check for nonblocking connect errors on fd */
150d7b4a113Ssthen 	int check_nb_connect;
151d7b4a113Ssthen 	/** ssl for current connection, type SSL* */
152d7b4a113Ssthen 	void* ssl;
153d7b4a113Ssthen 	/** true if the handshake for SSL is done, 0 if not */
154d7b4a113Ssthen 	int ssl_handshake_done;
155d7b4a113Ssthen 	/** true if briefly the SSL wants a read event, 0 if not.
156d7b4a113Ssthen 	 * This happens during negotiation, we then do not want to write,
157d7b4a113Ssthen 	 * but wait for a read event. */
158d7b4a113Ssthen 	int ssl_brief_read;
159d7b4a113Ssthen 	/** true if SSL_read is waiting for a write event. Set back to 0 after
160d7b4a113Ssthen 	 * single write event is handled. */
161d7b4a113Ssthen 	int ssl_brief_write;
162d7b4a113Ssthen 
163d7b4a113Ssthen 	/** the buffer that currently getting written, or NULL if no
164d7b4a113Ssthen 	 * (partial) message written now */
165d7b4a113Ssthen 	void* cur_msg;
166d7b4a113Ssthen 	/** length of the current message */
167d7b4a113Ssthen 	size_t cur_msg_len;
168d7b4a113Ssthen 	/** number of bytes written for the current message */
169d7b4a113Ssthen 	size_t cur_msg_done;
170d7b4a113Ssthen 	/** number of bytes of the length that have been written,
171d7b4a113Ssthen 	 * for the current message length that precedes the frame */
172d7b4a113Ssthen 	size_t cur_msg_len_done;
173d7b4a113Ssthen 
174*e2a0f313Ssthen 	/** lock on wakeup_timer_enabled */
175*e2a0f313Ssthen 	lock_basic_type wakeup_timer_lock;
176*e2a0f313Ssthen 	/** if wakeup timer is enabled in some thread */
177*e2a0f313Ssthen 	int wakeup_timer_enabled;
178d7b4a113Ssthen 	/** command pipe that stops the pipe if closed.  Used to quit
179d7b4a113Ssthen 	 * the program. [0] is read, [1] is written to. */
180d7b4a113Ssthen 	int commandpipe[2];
181d7b4a113Ssthen 	/** the event to listen to the commandpipe */
182d7b4a113Ssthen 	void* command_event;
183d7b4a113Ssthen 	/** the io thread wants to exit */
184d7b4a113Ssthen 	int want_to_exit;
185d7b4a113Ssthen 
186d7b4a113Ssthen 	/** in stop flush, this is nonNULL and references the stop_ev */
187d7b4a113Ssthen 	void* stop_flush_event;
188d7b4a113Ssthen 
189d7b4a113Ssthen 	/** the timer event for connection retries */
190d7b4a113Ssthen 	void* reconnect_timer;
191d7b4a113Ssthen 	/** if the reconnect timer is added to the event base */
192d7b4a113Ssthen 	int reconnect_is_added;
193d7b4a113Ssthen 	/** the current reconnection timeout, it is increased with
194d7b4a113Ssthen 	 * exponential backoff, in msec */
195d7b4a113Ssthen 	int reconnect_timeout;
196d7b4a113Ssthen 
197d7b4a113Ssthen 	/** If the log server is connected to over unix domain sockets,
198d7b4a113Ssthen 	 * eg. a file is named that is created to log onto. */
199d7b4a113Ssthen 	int upstream_is_unix;
200d7b4a113Ssthen 	/** if the log server is connected to over TCP.  The ip address and
201d7b4a113Ssthen 	 * port are used */
202d7b4a113Ssthen 	int upstream_is_tcp;
203d7b4a113Ssthen 	/** if the log server is connected to over TLS.  ip address, port,
204d7b4a113Ssthen 	 * and client certificates can be used for authentication. */
205d7b4a113Ssthen 	int upstream_is_tls;
206d7b4a113Ssthen 
207d7b4a113Ssthen 	/** Perform bidirectional Frame Streams handshake before sending
208d7b4a113Ssthen 	 * messages. */
209d7b4a113Ssthen 	int is_bidirectional;
210d7b4a113Ssthen 	/** Set if the READY control frame has been sent. */
211d7b4a113Ssthen 	int ready_frame_sent;
212d7b4a113Ssthen 	/** Set if valid ACCEPT frame is received. */
213d7b4a113Ssthen 	int accept_frame_received;
214d7b4a113Ssthen 	/** (partially) read frame */
215d7b4a113Ssthen 	struct dt_frame_read_buf read_frame;
216d7b4a113Ssthen 
217d7b4a113Ssthen 	/** the file path for unix socket (or NULL) */
218d7b4a113Ssthen 	char* socket_path;
219d7b4a113Ssthen 	/** the ip address and port number (or NULL) */
220d7b4a113Ssthen 	char* ip_str;
221d7b4a113Ssthen 	/** is the TLS upstream authenticated by name, if nonNULL,
222d7b4a113Ssthen 	 * we use the same cert bundle as used by other TLS streams. */
223d7b4a113Ssthen 	char* tls_server_name;
224d7b4a113Ssthen 	/** are client certificates in use */
225d7b4a113Ssthen 	int use_client_certs;
226d7b4a113Ssthen 	/** client cert files: the .key file */
227d7b4a113Ssthen 	char* client_key_file;
228d7b4a113Ssthen 	/** client cert files: the .pem file */
229d7b4a113Ssthen 	char* client_cert_file;
230d7b4a113Ssthen };
231d7b4a113Ssthen 
/**
 * Item in the IO thread's list of queues.
 * It lists a worker queue that should be looked at and sent to the log
 * server.
 */
struct dt_io_list_item {
	/** next in the list of buffers to inspect */
	struct dt_io_list_item* next;
	/** buffer of this worker */
	struct dt_msg_queue* queue;
};

/**
 * Create new (empty) worker message queue.  The maximum size limit is
 * set to the default.
 * @param base: event base for wakeup timer.
 * @return NULL on malloc failure or a new queue (not locked).
 */
struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base);

/**
 * Delete a worker message queue.  It has to be unlinked from access,
 * so it can be deleted without lock worries.  The queue is emptied (its
 * entries are deleted).
 * @param mq: message queue.
 */
void dt_msg_queue_delete(struct dt_msg_queue* mq);

/**
 * Submit a message to the queue.  The queue is locked by the routine,
 * the message is inserted, and then the queue is unlocked so the
 * message can be picked up by the writer thread.
 * @param mq: message queue.
 * @param buf: buffer with message (dnstap contents).
 *	The buffer must have been malloced by caller.  It is linked in
 *	the queue, and is free()d after use.  If the routine fails
 *	the buffer is freed as well (and nothing happens, the item
 *	could not be logged).
 * @param len: length of buffer.
 */
void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);

/** timer callback to wakeup dtio thread to process messages */
void mq_wakeup_cb(void* arg);

/**
 * Create IO thread.
 * @return new io thread object, not yet started, or NULL on malloc failure.
 */
struct dt_io_thread* dt_io_thread_create(void);

/**
 * Delete the IO thread structure.
 * @param dtio: the io thread that is deleted.  It must not be running.
 */
void dt_io_thread_delete(struct dt_io_thread* dtio);

/**
 * Apply config to the dtio thread.
 * @param dtio: io thread, not yet started.
 * @param cfg: config file struct.
 * @return false on malloc failure.
 */
int dt_io_thread_apply_cfg(struct dt_io_thread* dtio,
	struct config_file* cfg);

/**
 * Register a msg queue to the io thread.  When the thread is running,
 * the queue is polled to see if there are messages, and those are then
 * removed and sent.
 * @param dtio: the io thread.
 * @param mq: message queue to register.
 * @return false on failure (malloc failure).
 */
int dt_io_thread_register_queue(struct dt_io_thread* dtio,
	struct dt_msg_queue* mq);

/**
 * Unregister queue from io thread.
 * @param dtio: the io thread.
 * @param mq: message queue.
 */
void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
	struct dt_msg_queue* mq);

/**
 * Start the io thread.
 * @param dtio: the io thread.
 * @param event_base_nothr: the event base to attach the events to, in case
 *	we are running without threads.  With threads, this is ignored
 *	and a thread is started to process the dnstap log messages.
 * @param numworkers: number of worker threads.  The dnstap io thread is
 *	that number +1 as the threadnumber (in logs).
 * @return false on failure.
 */
int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
	int numworkers);

/**
 * Stop the io thread.
 * @param dtio: the io thread.
 */
void dt_io_thread_stop(struct dt_io_thread* dtio);

/** callback for the dnstap reconnect, to start reconnecting to output */
void dtio_reconnect_timeout_cb(int fd, short bits, void* arg);

/** callback for the dnstap events, to write to the output */
void dtio_output_cb(int fd, short bits, void* arg);

/** callback for the dnstap commandpipe, to stop the dnstap IO */
void dtio_cmd_cb(int fd, short bits, void* arg);

/** callback for the timer when the thread stops and wants to finish up */
void dtio_stop_timer_cb(int fd, short bits, void* arg);

/** callback for the output when the thread stops and wants to finish up */
void dtio_stop_ev_cb(int fd, short bits, void* arg);

/** callback for unbound-dnstap-socket */
void dtio_tap_callback(int fd, short bits, void* arg);

/** callback for unbound-dnstap-socket */
void dtio_mainfdcallback(int fd, short bits, void* arg);

354d7b4a113Ssthen #endif /* DTSTREAM_H */
355