1d7b4a113Ssthen /* 2d7b4a113Ssthen * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP 3d7b4a113Ssthen * 4d7b4a113Ssthen * Copyright (c) 2020, NLnet Labs. All rights reserved. 5d7b4a113Ssthen * 6d7b4a113Ssthen * This software is open source. 7d7b4a113Ssthen * 8d7b4a113Ssthen * Redistribution and use in source and binary forms, with or without 9d7b4a113Ssthen * modification, are permitted provided that the following conditions 10d7b4a113Ssthen * are met: 11d7b4a113Ssthen * 12d7b4a113Ssthen * Redistributions of source code must retain the above copyright notice, 13d7b4a113Ssthen * this list of conditions and the following disclaimer. 14d7b4a113Ssthen * 15d7b4a113Ssthen * Redistributions in binary form must reproduce the above copyright notice, 16d7b4a113Ssthen * this list of conditions and the following disclaimer in the documentation 17d7b4a113Ssthen * and/or other materials provided with the distribution. 18d7b4a113Ssthen * 19d7b4a113Ssthen * Neither the name of the NLNET LABS nor the names of its contributors may 20d7b4a113Ssthen * be used to endorse or promote products derived from this software without 21d7b4a113Ssthen * specific prior written permission. 22d7b4a113Ssthen * 23d7b4a113Ssthen * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24d7b4a113Ssthen * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25d7b4a113Ssthen * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26d7b4a113Ssthen * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27d7b4a113Ssthen * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28d7b4a113Ssthen * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29d7b4a113Ssthen * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30d7b4a113Ssthen * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31d7b4a113Ssthen * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32d7b4a113Ssthen * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33d7b4a113Ssthen * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34d7b4a113Ssthen * 35d7b4a113Ssthen */ 36d7b4a113Ssthen 37d7b4a113Ssthen /** 38d7b4a113Ssthen * \file 39d7b4a113Ssthen * 40d7b4a113Ssthen * An implementation of the Frame Streams data transport protocol for 41d7b4a113Ssthen * the Unbound DNSTAP message logging facility. 42d7b4a113Ssthen */ 43d7b4a113Ssthen 44d7b4a113Ssthen #ifndef DTSTREAM_H 45d7b4a113Ssthen #define DTSTREAM_H 46d7b4a113Ssthen 47d7b4a113Ssthen #include "util/locks.h" 48d7b4a113Ssthen struct dt_msg_entry; 49d7b4a113Ssthen struct dt_io_list_item; 50d7b4a113Ssthen struct dt_io_thread; 51d7b4a113Ssthen struct config_file; 52*e2a0f313Ssthen struct comm_base; 53d7b4a113Ssthen 54d7b4a113Ssthen /** 55d7b4a113Ssthen * A message buffer with dnstap messages queued up. It is per-worker. 56d7b4a113Ssthen * It has locks to synchronize. If the buffer is full, a new message 57d7b4a113Ssthen * cannot be added and is discarded. A thread reads the messages and sends 58d7b4a113Ssthen * them. 59d7b4a113Ssthen */ 60d7b4a113Ssthen struct dt_msg_queue { 61d7b4a113Ssthen /** lock of the buffer structure. Hold this lock to add or remove 62d7b4a113Ssthen * entries to the buffer. Release it so that other threads can also 63d7b4a113Ssthen * put messages to log, or a message can be taken out to send away 64d7b4a113Ssthen * by the writer thread. 65d7b4a113Ssthen */ 66d7b4a113Ssthen lock_basic_type lock; 67d7b4a113Ssthen /** the maximum size of the buffer, in bytes */ 68d7b4a113Ssthen size_t maxsize; 69d7b4a113Ssthen /** current size of the buffer, in bytes. data bytes of messages. 70d7b4a113Ssthen * If a new message make it more than maxsize, the buffer is full */ 71d7b4a113Ssthen size_t cursize; 72*e2a0f313Ssthen /** number of messages in the queue */ 73*e2a0f313Ssthen int msgcount; 74d7b4a113Ssthen /** list of messages. The messages are added to the back and taken 75d7b4a113Ssthen * out from the front. */ 76d7b4a113Ssthen struct dt_msg_entry* first, *last; 77d7b4a113Ssthen /** reference to the io thread to wakeup */ 78d7b4a113Ssthen struct dt_io_thread* dtio; 79*e2a0f313Ssthen /** the wakeup timer for dtio, on worker event base */ 80*e2a0f313Ssthen struct comm_timer* wakeup_timer; 81d7b4a113Ssthen }; 82d7b4a113Ssthen 83d7b4a113Ssthen /** 84d7b4a113Ssthen * An entry in the dt_msg_queue. contains one DNSTAP message. 85d7b4a113Ssthen * It is malloced. 86d7b4a113Ssthen */ 87d7b4a113Ssthen struct dt_msg_entry { 88d7b4a113Ssthen /** next in the list. */ 89d7b4a113Ssthen struct dt_msg_entry* next; 90d7b4a113Ssthen /** the buffer with the data to send, an encoded DNSTAP message */ 91d7b4a113Ssthen void* buf; 92d7b4a113Ssthen /** the length to send. */ 93d7b4a113Ssthen size_t len; 94d7b4a113Ssthen }; 95d7b4a113Ssthen 96d7b4a113Ssthen /** 97d7b4a113Ssthen * Containing buffer and counter for reading DNSTAP frames. 98d7b4a113Ssthen */ 99d7b4a113Ssthen struct dt_frame_read_buf { 100d7b4a113Ssthen /** Buffer containing frame, except length counter(s). */ 101d7b4a113Ssthen void* buf; 102d7b4a113Ssthen /** Number of bytes written to buffer. */ 103d7b4a113Ssthen size_t buf_count; 104d7b4a113Ssthen /** Capacity of the buffer. */ 105d7b4a113Ssthen size_t buf_cap; 106d7b4a113Ssthen 107d7b4a113Ssthen /** Frame length field. Will contain the 2nd length field for control 108d7b4a113Ssthen * frames. */ 109d7b4a113Ssthen uint32_t frame_len; 110d7b4a113Ssthen /** Number of bytes that have been written to the frame_length field. */ 111d7b4a113Ssthen size_t frame_len_done; 112d7b4a113Ssthen 113d7b4a113Ssthen /** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */ 114d7b4a113Ssthen int control_frame; 115d7b4a113Ssthen }; 116d7b4a113Ssthen 117d7b4a113Ssthen /** 118d7b4a113Ssthen * IO thread that reads from the queues and writes them. 119d7b4a113Ssthen */ 120d7b4a113Ssthen struct dt_io_thread { 121d7b4a113Ssthen /** the thread number for the dtio thread, 122d7b4a113Ssthen * must be first to cast thread arg to int* in checklock code. */ 123d7b4a113Ssthen int threadnum; 124d7b4a113Ssthen /** event base, for event handling */ 125d7b4a113Ssthen void* event_base; 126d7b4a113Ssthen /** list of queues that is registered to get written */ 127d7b4a113Ssthen struct dt_io_list_item* io_list; 128d7b4a113Ssthen /** iterator point in the io_list, to pick from them in a 129d7b4a113Ssthen * round-robin fashion, instead of only from the first when busy. 130d7b4a113Ssthen * if NULL it means start at the start of the list. */ 131d7b4a113Ssthen struct dt_io_list_item* io_list_iter; 132d7b4a113Ssthen /** thread id, of the io thread */ 133d7b4a113Ssthen ub_thread_type tid; 134d7b4a113Ssthen /** if the io processing has started */ 135d7b4a113Ssthen int started; 136d7b4a113Ssthen /** ssl context for the io thread, for tls connections. type SSL_CTX* */ 137d7b4a113Ssthen void* ssl_ctx; 138d7b4a113Ssthen /** if SNI will be used for TLS connections. */ 139d7b4a113Ssthen int tls_use_sni; 140d7b4a113Ssthen 141d7b4a113Ssthen /** file descriptor that the thread writes to */ 142d7b4a113Ssthen int fd; 143d7b4a113Ssthen /** event structure that the thread uses */ 144d7b4a113Ssthen void* event; 145d7b4a113Ssthen /** the event is added */ 146d7b4a113Ssthen int event_added; 147d7b4a113Ssthen /** event added is a write event */ 148d7b4a113Ssthen int event_added_is_write; 149d7b4a113Ssthen /** check for nonblocking connect errors on fd */ 150d7b4a113Ssthen int check_nb_connect; 151d7b4a113Ssthen /** ssl for current connection, type SSL* */ 152d7b4a113Ssthen void* ssl; 153d7b4a113Ssthen /** true if the handshake for SSL is done, 0 if not */ 154d7b4a113Ssthen int ssl_handshake_done; 155d7b4a113Ssthen /** true if briefly the SSL wants a read event, 0 if not. 156d7b4a113Ssthen * This happens during negotiation, we then do not want to write, 157d7b4a113Ssthen * but wait for a read event. */ 158d7b4a113Ssthen int ssl_brief_read; 159d7b4a113Ssthen /** true if SSL_read is waiting for a write event. Set back to 0 after 160d7b4a113Ssthen * single write event is handled. */ 161d7b4a113Ssthen int ssl_brief_write; 162d7b4a113Ssthen 163d7b4a113Ssthen /** the buffer that currently getting written, or NULL if no 164d7b4a113Ssthen * (partial) message written now */ 165d7b4a113Ssthen void* cur_msg; 166d7b4a113Ssthen /** length of the current message */ 167d7b4a113Ssthen size_t cur_msg_len; 168d7b4a113Ssthen /** number of bytes written for the current message */ 169d7b4a113Ssthen size_t cur_msg_done; 170d7b4a113Ssthen /** number of bytes of the length that have been written, 171d7b4a113Ssthen * for the current message length that precedes the frame */ 172d7b4a113Ssthen size_t cur_msg_len_done; 173d7b4a113Ssthen 174*e2a0f313Ssthen /** lock on wakeup_timer_enabled */ 175*e2a0f313Ssthen lock_basic_type wakeup_timer_lock; 176*e2a0f313Ssthen /** if wakeup timer is enabled in some thread */ 177*e2a0f313Ssthen int wakeup_timer_enabled; 178d7b4a113Ssthen /** command pipe that stops the pipe if closed. Used to quit 179d7b4a113Ssthen * the program. [0] is read, [1] is written to. */ 180d7b4a113Ssthen int commandpipe[2]; 181d7b4a113Ssthen /** the event to listen to the commandpipe */ 182d7b4a113Ssthen void* command_event; 183d7b4a113Ssthen /** the io thread wants to exit */ 184d7b4a113Ssthen int want_to_exit; 185d7b4a113Ssthen 186d7b4a113Ssthen /** in stop flush, this is nonNULL and references the stop_ev */ 187d7b4a113Ssthen void* stop_flush_event; 188d7b4a113Ssthen 189d7b4a113Ssthen /** the timer event for connection retries */ 190d7b4a113Ssthen void* reconnect_timer; 191d7b4a113Ssthen /** if the reconnect timer is added to the event base */ 192d7b4a113Ssthen int reconnect_is_added; 193d7b4a113Ssthen /** the current reconnection timeout, it is increased with 194d7b4a113Ssthen * exponential backoff, in msec */ 195d7b4a113Ssthen int reconnect_timeout; 196d7b4a113Ssthen 197d7b4a113Ssthen /** If the log server is connected to over unix domain sockets, 198d7b4a113Ssthen * eg. a file is named that is created to log onto. */ 199d7b4a113Ssthen int upstream_is_unix; 200d7b4a113Ssthen /** if the log server is connected to over TCP. The ip address and 201d7b4a113Ssthen * port are used */ 202d7b4a113Ssthen int upstream_is_tcp; 203d7b4a113Ssthen /** if the log server is connected to over TLS. ip address, port, 204d7b4a113Ssthen * and client certificates can be used for authentication. */ 205d7b4a113Ssthen int upstream_is_tls; 206d7b4a113Ssthen 207d7b4a113Ssthen /** Perform bidirectional Frame Streams handshake before sending 208d7b4a113Ssthen * messages. */ 209d7b4a113Ssthen int is_bidirectional; 210d7b4a113Ssthen /** Set if the READY control frame has been sent. */ 211d7b4a113Ssthen int ready_frame_sent; 212d7b4a113Ssthen /** Set if valid ACCEPT frame is received. */ 213d7b4a113Ssthen int accept_frame_received; 214d7b4a113Ssthen /** (partially) read frame */ 215d7b4a113Ssthen struct dt_frame_read_buf read_frame; 216d7b4a113Ssthen 217d7b4a113Ssthen /** the file path for unix socket (or NULL) */ 218d7b4a113Ssthen char* socket_path; 219d7b4a113Ssthen /** the ip address and port number (or NULL) */ 220d7b4a113Ssthen char* ip_str; 221d7b4a113Ssthen /** is the TLS upstream authenticated by name, if nonNULL, 222d7b4a113Ssthen * we use the same cert bundle as used by other TLS streams. */ 223d7b4a113Ssthen char* tls_server_name; 224d7b4a113Ssthen /** are client certificates in use */ 225d7b4a113Ssthen int use_client_certs; 226d7b4a113Ssthen /** client cert files: the .key file */ 227d7b4a113Ssthen char* client_key_file; 228d7b4a113Ssthen /** client cert files: the .pem file */ 229d7b4a113Ssthen char* client_cert_file; 230d7b4a113Ssthen }; 231d7b4a113Ssthen 232d7b4a113Ssthen /** 233d7b4a113Ssthen * IO thread list of queues list item 234d7b4a113Ssthen * lists a worker queue that should be looked at and sent to the log server. 235d7b4a113Ssthen */ 236d7b4a113Ssthen struct dt_io_list_item { 237d7b4a113Ssthen /** next in the list of buffers to inspect */ 238d7b4a113Ssthen struct dt_io_list_item* next; 239d7b4a113Ssthen /** buffer of this worker */ 240d7b4a113Ssthen struct dt_msg_queue* queue; 241d7b4a113Ssthen }; 242d7b4a113Ssthen 243d7b4a113Ssthen /** 244d7b4a113Ssthen * Create new (empty) worker message queue. Limit set to default on max. 245*e2a0f313Ssthen * @param base: event base for wakeup timer. 246d7b4a113Ssthen * @return NULL on malloc failure or a new queue (not locked). 247d7b4a113Ssthen */ 248*e2a0f313Ssthen struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base); 249d7b4a113Ssthen 250d7b4a113Ssthen /** 251d7b4a113Ssthen * Delete a worker message queue. It has to be unlinked from access, 252d7b4a113Ssthen * so it can be deleted without lock worries. The queue is emptied (deleted). 253d7b4a113Ssthen * @param mq: message queue. 254d7b4a113Ssthen */ 255d7b4a113Ssthen void dt_msg_queue_delete(struct dt_msg_queue* mq); 256d7b4a113Ssthen 257d7b4a113Ssthen /** 258d7b4a113Ssthen * Submit a message to the queue. The queue is locked by the routine, 259d7b4a113Ssthen * the message is inserted, and then the queue is unlocked so the 260d7b4a113Ssthen * message can be picked up by the writer thread. 261d7b4a113Ssthen * @param mq: message queue. 262d7b4a113Ssthen * @param buf: buffer with message (dnstap contents). 263d7b4a113Ssthen * The buffer must have been malloced by caller. It is linked in 264d7b4a113Ssthen * the queue, and is free()d after use. If the routine fails 265d7b4a113Ssthen * the buffer is freed as well (and nothing happens, the item 266d7b4a113Ssthen * could not be logged). 267d7b4a113Ssthen * @param len: length of buffer. 268d7b4a113Ssthen */ 269d7b4a113Ssthen void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); 270d7b4a113Ssthen 271*e2a0f313Ssthen /** timer callback to wakeup dtio thread to process messages */ 272*e2a0f313Ssthen void mq_wakeup_cb(void* arg); 273*e2a0f313Ssthen 274d7b4a113Ssthen /** 275d7b4a113Ssthen * Create IO thread. 276d7b4a113Ssthen * @return new io thread object. not yet started. or NULL malloc failure. 277d7b4a113Ssthen */ 278d7b4a113Ssthen struct dt_io_thread* dt_io_thread_create(void); 279d7b4a113Ssthen 280d7b4a113Ssthen /** 281d7b4a113Ssthen * Delete the IO thread structure. 282d7b4a113Ssthen * @param dtio: the io thread that is deleted. It must not be running. 283d7b4a113Ssthen */ 284d7b4a113Ssthen void dt_io_thread_delete(struct dt_io_thread* dtio); 285d7b4a113Ssthen 286d7b4a113Ssthen /** 287d7b4a113Ssthen * Apply config to the dtio thread 288d7b4a113Ssthen * @param dtio: io thread, not yet started. 289d7b4a113Ssthen * @param cfg: config file struct. 290d7b4a113Ssthen * @return false on malloc failure. 291d7b4a113Ssthen */ 292d7b4a113Ssthen int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, 293d7b4a113Ssthen struct config_file *cfg); 294d7b4a113Ssthen 295d7b4a113Ssthen /** 296d7b4a113Ssthen * Register a msg queue to the io thread. It will be polled to see if 297d7b4a113Ssthen * there are messages and those then get removed and sent, when the thread 298d7b4a113Ssthen * is running. 299d7b4a113Ssthen * @param dtio: the io thread. 300d7b4a113Ssthen * @param mq: message queue to register. 301d7b4a113Ssthen * @return false on failure (malloc failure). 302d7b4a113Ssthen */ 303d7b4a113Ssthen int dt_io_thread_register_queue(struct dt_io_thread* dtio, 304d7b4a113Ssthen struct dt_msg_queue* mq); 305d7b4a113Ssthen 306d7b4a113Ssthen /** 307d7b4a113Ssthen * Unregister queue from io thread. 308d7b4a113Ssthen * @param dtio: the io thread. 309d7b4a113Ssthen * @param mq: message queue. 310d7b4a113Ssthen */ 311d7b4a113Ssthen void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 312d7b4a113Ssthen struct dt_msg_queue* mq); 313d7b4a113Ssthen 314d7b4a113Ssthen /** 315d7b4a113Ssthen * Start the io thread 316d7b4a113Ssthen * @param dtio: the io thread. 317d7b4a113Ssthen * @param event_base_nothr: the event base to attach the events to, in case 318d7b4a113Ssthen * we are running without threads. With threads, this is ignored 319d7b4a113Ssthen * and a thread is started to process the dnstap log messages. 320d7b4a113Ssthen * @param numworkers: number of worker threads. The dnstap io thread is 321d7b4a113Ssthen * that number +1 as the threadnumber (in logs). 322d7b4a113Ssthen * @return false on failure. 323d7b4a113Ssthen */ 324d7b4a113Ssthen int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 325d7b4a113Ssthen int numworkers); 326d7b4a113Ssthen 327d7b4a113Ssthen /** 328d7b4a113Ssthen * Stop the io thread 329d7b4a113Ssthen * @param dtio: the io thread. 330d7b4a113Ssthen */ 331d7b4a113Ssthen void dt_io_thread_stop(struct dt_io_thread* dtio); 332d7b4a113Ssthen 333d7b4a113Ssthen /** callback for the dnstap reconnect, to start reconnecting to output */ 334d7b4a113Ssthen void dtio_reconnect_timeout_cb(int fd, short bits, void* arg); 335d7b4a113Ssthen 336d7b4a113Ssthen /** callback for the dnstap events, to write to the output */ 337d7b4a113Ssthen void dtio_output_cb(int fd, short bits, void* arg); 338d7b4a113Ssthen 339d7b4a113Ssthen /** callback for the dnstap commandpipe, to stop the dnstap IO */ 340d7b4a113Ssthen void dtio_cmd_cb(int fd, short bits, void* arg); 341d7b4a113Ssthen 342d7b4a113Ssthen /** callback for the timer when the thread stops and wants to finish up */ 343d7b4a113Ssthen void dtio_stop_timer_cb(int fd, short bits, void* arg); 344d7b4a113Ssthen 345d7b4a113Ssthen /** callback for the output when the thread stops and wants to finish up */ 346d7b4a113Ssthen void dtio_stop_ev_cb(int fd, short bits, void* arg); 347d7b4a113Ssthen 348d7b4a113Ssthen /** callback for unbound-dnstap-socket */ 349d7b4a113Ssthen void dtio_tap_callback(int fd, short bits, void* arg); 350d7b4a113Ssthen 351d7b4a113Ssthen /** callback for unbound-dnstap-socket */ 352d7b4a113Ssthen void dtio_mainfdcallback(int fd, short bits, void* arg); 353d7b4a113Ssthen 354d7b4a113Ssthen #endif /* DTSTREAM_H */ 355