/*
 * dnstap/dnstap_collector.c -- nsd collector process for dnstap information
 *
 * Copyright (c) 2018, NLnet Labs. All rights reserved.
 *
 * See LICENSE for the license.
 *
 */

#include "config.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#ifndef USE_MINI_EVENT
#  ifdef HAVE_EVENT_H
#    include <event.h>
#  else
#    include <event2/event.h>
#    include "event2/event_struct.h"
#    include "event2/event_compat.h"
#  endif
#else
#  include "mini_event.h"
#endif
#include "dnstap/dnstap_collector.h"
#include "dnstap/dnstap.h"
#include "util.h"
#include "nsd.h"
#include "region-allocator.h"
#include "buffer.h"
#include "namedb.h"
#include "options.h"
#include "udb.h"
#include "rrl.h"

dt_collector_create(struct nsd * nsd)39cd92f1a1Sprlw1 struct dt_collector* dt_collector_create(struct nsd* nsd)
40cd92f1a1Sprlw1 {
41cd92f1a1Sprlw1 	int i, sv[2];
42cd92f1a1Sprlw1 	struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero(
43cd92f1a1Sprlw1 		sizeof(*dt_col));
44*66a1527dSchristos 	dt_col->count = nsd->child_count * 2;
45cd92f1a1Sprlw1 	dt_col->dt_env = NULL;
46cd92f1a1Sprlw1 	dt_col->region = region_create(xalloc, free);
47cd92f1a1Sprlw1 	dt_col->send_buffer = buffer_create(dt_col->region,
48*66a1527dSchristos 		/* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */
49cd92f1a1Sprlw1 		4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
50cd92f1a1Sprlw1 #ifdef INET6
51*66a1527dSchristos 		sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage)
52cd92f1a1Sprlw1 #else
53*66a1527dSchristos 		sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in)
54cd92f1a1Sprlw1 #endif
55cd92f1a1Sprlw1 		);
56cd92f1a1Sprlw1 
57*66a1527dSchristos 	/* open communication channels in struct nsd */
58cd92f1a1Sprlw1 	nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count,
59cd92f1a1Sprlw1 		sizeof(int));
60cd92f1a1Sprlw1 	nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count,
61cd92f1a1Sprlw1 		sizeof(int));
62cd92f1a1Sprlw1 	for(i=0; i<dt_col->count; i++) {
63*66a1527dSchristos 		int sv[2];
64*66a1527dSchristos 		int bufsz = buffer_capacity(dt_col->send_buffer);
65*66a1527dSchristos 		sv[0] = -1; /* For receiving by parent (dnstap-collector) */
66*66a1527dSchristos 		sv[1] = -1; /* For sending   by child  (server childs) */
67*66a1527dSchristos 		if(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sv) < 0) {
68*66a1527dSchristos 			error("dnstap_collector: cannot create communication channel: %s",
69cd92f1a1Sprlw1 				strerror(errno));
70cd92f1a1Sprlw1 		}
71*66a1527dSchristos 		if(setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof(bufsz))) {
72*66a1527dSchristos 			log_msg(LOG_ERR, "setting dnstap_collector "
73*66a1527dSchristos 				"receive buffer size failed: %s", strerror(errno));
74cd92f1a1Sprlw1 		}
75*66a1527dSchristos 		if(setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof(bufsz))) {
76*66a1527dSchristos 			log_msg(LOG_ERR, "setting dnstap_collector "
77*66a1527dSchristos 				"send buffer size failed: %s", strerror(errno));
78cd92f1a1Sprlw1 		}
79*66a1527dSchristos 		nsd->dt_collector_fd_recv[i] = sv[0];
80*66a1527dSchristos 		nsd->dt_collector_fd_send[i] = sv[1];
81cd92f1a1Sprlw1 	}
82*66a1527dSchristos 	nsd->dt_collector_fd_swap = nsd->dt_collector_fd_send + nsd->child_count;
83cd92f1a1Sprlw1 
84cd92f1a1Sprlw1 	/* open socketpair */
85cd92f1a1Sprlw1 	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
86cd92f1a1Sprlw1 		error("dnstap_collector: cannot create socketpair: %s",
87cd92f1a1Sprlw1 			strerror(errno));
88cd92f1a1Sprlw1 	}
89cd92f1a1Sprlw1 	if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
90cd92f1a1Sprlw1 		log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
91cd92f1a1Sprlw1 	}
92cd92f1a1Sprlw1 	if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
93cd92f1a1Sprlw1 		log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
94cd92f1a1Sprlw1 	}
95cd92f1a1Sprlw1 	dt_col->cmd_socket_dt = sv[0];
96cd92f1a1Sprlw1 	dt_col->cmd_socket_nsd = sv[1];
97cd92f1a1Sprlw1 
98cd92f1a1Sprlw1 	return dt_col;
99cd92f1a1Sprlw1 }
100cd92f1a1Sprlw1 
dt_collector_destroy(struct dt_collector * dt_col,struct nsd * nsd)101cd92f1a1Sprlw1 void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd)
102cd92f1a1Sprlw1 {
103cd92f1a1Sprlw1 	if(!dt_col) return;
104cd92f1a1Sprlw1 	free(nsd->dt_collector_fd_recv);
105cd92f1a1Sprlw1 	nsd->dt_collector_fd_recv = NULL;
106*66a1527dSchristos 	if (nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap)
107cd92f1a1Sprlw1 		free(nsd->dt_collector_fd_send);
108*66a1527dSchristos 	else
109*66a1527dSchristos 		free(nsd->dt_collector_fd_swap);
110cd92f1a1Sprlw1 	nsd->dt_collector_fd_send = NULL;
111*66a1527dSchristos 	nsd->dt_collector_fd_swap = NULL;
112cd92f1a1Sprlw1 	region_destroy(dt_col->region);
113cd92f1a1Sprlw1 	free(dt_col);
114cd92f1a1Sprlw1 }
115cd92f1a1Sprlw1 
dt_collector_close(struct dt_collector * dt_col,struct nsd * nsd)116cd92f1a1Sprlw1 void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd)
117cd92f1a1Sprlw1 {
118*66a1527dSchristos 	int i, *fd_send;
119cd92f1a1Sprlw1 	if(!dt_col) return;
120cd92f1a1Sprlw1 	if(dt_col->cmd_socket_dt != -1) {
121cd92f1a1Sprlw1 		close(dt_col->cmd_socket_dt);
122cd92f1a1Sprlw1 		dt_col->cmd_socket_dt = -1;
123cd92f1a1Sprlw1 	}
124cd92f1a1Sprlw1 	if(dt_col->cmd_socket_nsd != -1) {
125cd92f1a1Sprlw1 		close(dt_col->cmd_socket_nsd);
126cd92f1a1Sprlw1 		dt_col->cmd_socket_nsd = -1;
127cd92f1a1Sprlw1 	}
128*66a1527dSchristos 	fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap
129*66a1527dSchristos 	        ? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap;
130cd92f1a1Sprlw1 	for(i=0; i<dt_col->count; i++) {
131cd92f1a1Sprlw1 		if(nsd->dt_collector_fd_recv[i] != -1) {
132cd92f1a1Sprlw1 			close(nsd->dt_collector_fd_recv[i]);
133cd92f1a1Sprlw1 			nsd->dt_collector_fd_recv[i] = -1;
134cd92f1a1Sprlw1 		}
135*66a1527dSchristos 		if(fd_send[i] != -1) {
136*66a1527dSchristos 			close(fd_send[i]);
137*66a1527dSchristos 			fd_send[i] = -1;
138cd92f1a1Sprlw1 		}
139cd92f1a1Sprlw1 	}
140cd92f1a1Sprlw1 }
141cd92f1a1Sprlw1 
142cd92f1a1Sprlw1 /* handle command from nsd to dt collector.
143cd92f1a1Sprlw1  * mostly, check for fd closed, this means we have to exit */
144cd92f1a1Sprlw1 void
dt_handle_cmd_from_nsd(int ATTR_UNUSED (fd),short event,void * arg)145cd92f1a1Sprlw1 dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg)
146cd92f1a1Sprlw1 {
147cd92f1a1Sprlw1 	struct dt_collector* dt_col = (struct dt_collector*)arg;
148cd92f1a1Sprlw1 	if((event&EV_READ) != 0) {
149cd92f1a1Sprlw1 		event_base_loopexit(dt_col->event_base, NULL);
150cd92f1a1Sprlw1 	}
151cd92f1a1Sprlw1 }
152cd92f1a1Sprlw1 
153*66a1527dSchristos /* receive data from fd into buffer, 1 when message received, -1 on error */
recv_into_buffer(int fd,struct buffer * buf)154*66a1527dSchristos static int recv_into_buffer(int fd, struct buffer* buf)
155cd92f1a1Sprlw1 {
156cd92f1a1Sprlw1 	size_t msglen;
157cd92f1a1Sprlw1 	ssize_t r;
158cd92f1a1Sprlw1 
159*66a1527dSchristos 	assert(buffer_position(buf) == 0);
160*66a1527dSchristos 	r = recv(fd, buffer_current(buf), buffer_capacity(buf), MSG_DONTWAIT);
161*66a1527dSchristos 	if(r == -1) {
162*66a1527dSchristos 		if(errno == EAGAIN || errno == EINTR || errno == EMSGSIZE) {
163*66a1527dSchristos 			/* continue to receive a message later */
164*66a1527dSchristos 			return 0;
165*66a1527dSchristos 		}
166*66a1527dSchristos 		log_msg(LOG_ERR, "dnstap collector: receive failed: %s",
167*66a1527dSchristos 			strerror(errno));
168*66a1527dSchristos 		return -1;
169*66a1527dSchristos 	}
170*66a1527dSchristos 	if(r == 0) {
171*66a1527dSchristos 		/* Remote end closed the connection? */
172*66a1527dSchristos 		log_msg(LOG_ERR, "dnstap collector: remote closed connection");
173*66a1527dSchristos 		return -1;
174*66a1527dSchristos 	}
175*66a1527dSchristos 	assert(r > 4);
176cd92f1a1Sprlw1 	msglen = buffer_read_u32_at(buf, 0);
177*66a1527dSchristos 	if(msglen != (size_t)(r - 4)) {
178*66a1527dSchristos 		/* Is this still possible now the communication channel is of
179*66a1527dSchristos 		 * type SOCK_DGRAM? I think not, but better safe than sorry. */
180*66a1527dSchristos 		log_msg(LOG_ERR, "dnstap collector: out of sync (msglen: %u)",
181*66a1527dSchristos 			(unsigned int) msglen);
182cd92f1a1Sprlw1 		return 0;
183cd92f1a1Sprlw1 	}
184cd92f1a1Sprlw1 	buffer_skip(buf, r);
185cd92f1a1Sprlw1 	buffer_flip(buf);
186cd92f1a1Sprlw1 	return 1;
187cd92f1a1Sprlw1 }
188cd92f1a1Sprlw1 
189cd92f1a1Sprlw1 /* submit the content of the buffer received to dnstap */
190cd92f1a1Sprlw1 static void
dt_submit_content(struct dt_env * dt_env,struct buffer * buf)191cd92f1a1Sprlw1 dt_submit_content(struct dt_env* dt_env, struct buffer* buf)
192cd92f1a1Sprlw1 {
193cd92f1a1Sprlw1 	uint8_t is_response, is_tcp;
194cd92f1a1Sprlw1 #ifdef INET6
195*66a1527dSchristos 	struct sockaddr_storage local_addr, addr;
196cd92f1a1Sprlw1 #else
197*66a1527dSchristos 	struct sockaddr_in local_addr, addr;
198cd92f1a1Sprlw1 #endif
199cd92f1a1Sprlw1 	socklen_t addrlen;
200cd92f1a1Sprlw1 	size_t pktlen;
201cd92f1a1Sprlw1 	uint8_t* data;
202cd92f1a1Sprlw1 	size_t zonelen;
203cd92f1a1Sprlw1 	uint8_t* zone;
204cd92f1a1Sprlw1 
205cd92f1a1Sprlw1 	/* parse content from buffer */
206cd92f1a1Sprlw1 	if(!buffer_available(buf, 4+1+4)) return;
207cd92f1a1Sprlw1 	buffer_skip(buf, 4); /* skip msglen */
208cd92f1a1Sprlw1 	is_response = buffer_read_u8(buf);
209cd92f1a1Sprlw1 	addrlen = buffer_read_u32(buf);
210*66a1527dSchristos 	if(addrlen > sizeof(local_addr) || addrlen > sizeof(addr)) return;
211*66a1527dSchristos 	if(!buffer_available(buf, 2*addrlen)) return;
212*66a1527dSchristos 	buffer_read(buf, &local_addr, addrlen);
213cd92f1a1Sprlw1 	buffer_read(buf, &addr, addrlen);
214cd92f1a1Sprlw1 	if(!buffer_available(buf, 1+4)) return;
215cd92f1a1Sprlw1 	is_tcp = buffer_read_u8(buf);
216cd92f1a1Sprlw1 	pktlen = buffer_read_u32(buf);
217cd92f1a1Sprlw1 	if(!buffer_available(buf, pktlen)) return;
218cd92f1a1Sprlw1 	data = buffer_current(buf);
219cd92f1a1Sprlw1 	buffer_skip(buf, pktlen);
220cd92f1a1Sprlw1 	if(!buffer_available(buf, 4)) return;
221cd92f1a1Sprlw1 	zonelen = buffer_read_u32(buf);
222cd92f1a1Sprlw1 	if(zonelen == 0) {
223cd92f1a1Sprlw1 		zone = NULL;
224cd92f1a1Sprlw1 	} else {
225cd92f1a1Sprlw1 		if(zonelen > MAXDOMAINLEN) return;
226cd92f1a1Sprlw1 		if(!buffer_available(buf, zonelen)) return;
227cd92f1a1Sprlw1 		zone = buffer_current(buf);
228cd92f1a1Sprlw1 		buffer_skip(buf, zonelen);
229cd92f1a1Sprlw1 	}
230cd92f1a1Sprlw1 
231cd92f1a1Sprlw1 	/* submit it */
232cd92f1a1Sprlw1 	if(is_response) {
233*66a1527dSchristos 		dt_msg_send_auth_response(dt_env, &local_addr, &addr, is_tcp, zone,
234cd92f1a1Sprlw1 			zonelen, data, pktlen);
235cd92f1a1Sprlw1 	} else {
236*66a1527dSchristos 		dt_msg_send_auth_query(dt_env, &local_addr, &addr, is_tcp, zone,
237cd92f1a1Sprlw1 			zonelen, data, pktlen);
238cd92f1a1Sprlw1 	}
239cd92f1a1Sprlw1 }
240cd92f1a1Sprlw1 
241cd92f1a1Sprlw1 /* handle input from worker for dnstap */
242cd92f1a1Sprlw1 void
dt_handle_input(int fd,short event,void * arg)243cd92f1a1Sprlw1 dt_handle_input(int fd, short event, void* arg)
244cd92f1a1Sprlw1 {
245cd92f1a1Sprlw1 	struct dt_collector_input* dt_input = (struct dt_collector_input*)arg;
246cd92f1a1Sprlw1 	if((event&EV_READ) != 0) {
247*66a1527dSchristos 		/* receive */
248*66a1527dSchristos 		int r = recv_into_buffer(fd, dt_input->buffer);
249*66a1527dSchristos 		if(r == 0)
250cd92f1a1Sprlw1 			return;
251*66a1527dSchristos 		else if(r < 0) {
252*66a1527dSchristos 			event_base_loopexit(dt_input->dt_collector->event_base, NULL);
253*66a1527dSchristos 			return;
254*66a1527dSchristos 		}
255*66a1527dSchristos 		/* once data is complete, send it to dnstap */
256cd92f1a1Sprlw1 		VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d",
257cd92f1a1Sprlw1 			(int)buffer_remaining(dt_input->buffer)));
258cd92f1a1Sprlw1 		if(dt_input->dt_collector->dt_env) {
259cd92f1a1Sprlw1 			dt_submit_content(dt_input->dt_collector->dt_env,
260cd92f1a1Sprlw1 				dt_input->buffer);
261cd92f1a1Sprlw1 		}
262cd92f1a1Sprlw1 
263cd92f1a1Sprlw1 		/* clear buffer for next message */
264cd92f1a1Sprlw1 		buffer_clear(dt_input->buffer);
265cd92f1a1Sprlw1 	}
266cd92f1a1Sprlw1 }
267cd92f1a1Sprlw1 
268cd92f1a1Sprlw1 /* init dnstap */
dt_init_dnstap(struct dt_collector * dt_col,struct nsd * nsd)269cd92f1a1Sprlw1 static void dt_init_dnstap(struct dt_collector* dt_col, struct nsd* nsd)
270cd92f1a1Sprlw1 {
271cd92f1a1Sprlw1 	int num_workers = 1;
272cd92f1a1Sprlw1 #ifdef HAVE_CHROOT
273cd92f1a1Sprlw1 	if(nsd->chrootdir && nsd->chrootdir[0]) {
274cd92f1a1Sprlw1 		int l = strlen(nsd->chrootdir)-1; /* ends in trailing slash */
275cd92f1a1Sprlw1 		if (nsd->options->dnstap_socket_path &&
276cd92f1a1Sprlw1 			nsd->options->dnstap_socket_path[0] == '/' &&
277cd92f1a1Sprlw1 			strncmp(nsd->options->dnstap_socket_path,
278cd92f1a1Sprlw1 				nsd->chrootdir, l) == 0)
279cd92f1a1Sprlw1 			nsd->options->dnstap_socket_path += l;
280cd92f1a1Sprlw1 	}
281cd92f1a1Sprlw1 #endif
282cd92f1a1Sprlw1 	dt_col->dt_env = dt_create(nsd->options->dnstap_socket_path, num_workers);
283cd92f1a1Sprlw1 	if(!dt_col->dt_env) {
284cd92f1a1Sprlw1 		log_msg(LOG_ERR, "could not create dnstap env");
285cd92f1a1Sprlw1 		return;
286cd92f1a1Sprlw1 	}
287cd92f1a1Sprlw1 	dt_apply_cfg(dt_col->dt_env, nsd->options);
288cd92f1a1Sprlw1 	dt_init(dt_col->dt_env);
289cd92f1a1Sprlw1 }
290cd92f1a1Sprlw1 
291cd92f1a1Sprlw1 /* cleanup dt collector process for exit */
dt_collector_cleanup(struct dt_collector * dt_col,struct nsd * nsd)292cd92f1a1Sprlw1 static void dt_collector_cleanup(struct dt_collector* dt_col, struct nsd* nsd)
293cd92f1a1Sprlw1 {
294cd92f1a1Sprlw1 	int i;
295cd92f1a1Sprlw1 	dt_delete(dt_col->dt_env);
296cd92f1a1Sprlw1 	event_del(dt_col->cmd_event);
297cd92f1a1Sprlw1 	for(i=0; i<dt_col->count; i++) {
298cd92f1a1Sprlw1 		event_del(dt_col->inputs[i].event);
299cd92f1a1Sprlw1 	}
300cd92f1a1Sprlw1 	dt_collector_close(dt_col, nsd);
301cd92f1a1Sprlw1 	event_base_free(dt_col->event_base);
302cd92f1a1Sprlw1 #ifdef MEMCLEAN
303cd92f1a1Sprlw1 	free(dt_col->cmd_event);
304cd92f1a1Sprlw1 	if(dt_col->inputs) {
305cd92f1a1Sprlw1 		for(i=0; i<dt_col->count; i++) {
306cd92f1a1Sprlw1 			free(dt_col->inputs[i].event);
307cd92f1a1Sprlw1 		}
308cd92f1a1Sprlw1 		free(dt_col->inputs);
309cd92f1a1Sprlw1 	}
310cd92f1a1Sprlw1 	dt_collector_destroy(dt_col, nsd);
311cd92f1a1Sprlw1 #endif
312cd92f1a1Sprlw1 }
313cd92f1a1Sprlw1 
314cd92f1a1Sprlw1 /* attach events to the event base to listen to the workers and cmd channel */
dt_attach_events(struct dt_collector * dt_col,struct nsd * nsd)315cd92f1a1Sprlw1 static void dt_attach_events(struct dt_collector* dt_col, struct nsd* nsd)
316cd92f1a1Sprlw1 {
317cd92f1a1Sprlw1 	int i;
318cd92f1a1Sprlw1 	/* create event base */
319cd92f1a1Sprlw1 	dt_col->event_base = nsd_child_event_base();
320cd92f1a1Sprlw1 	if(!dt_col->event_base) {
321cd92f1a1Sprlw1 		error("dnstap collector: event_base create failed");
322cd92f1a1Sprlw1 	}
323cd92f1a1Sprlw1 
324cd92f1a1Sprlw1 	/* add command handler */
325cd92f1a1Sprlw1 	dt_col->cmd_event = (struct event*)xalloc_zero(
326cd92f1a1Sprlw1 		sizeof(*dt_col->cmd_event));
327cd92f1a1Sprlw1 	event_set(dt_col->cmd_event, dt_col->cmd_socket_dt,
328cd92f1a1Sprlw1 		EV_PERSIST|EV_READ, dt_handle_cmd_from_nsd, dt_col);
329cd92f1a1Sprlw1 	if(event_base_set(dt_col->event_base, dt_col->cmd_event) != 0)
330cd92f1a1Sprlw1 		log_msg(LOG_ERR, "dnstap collector: event_base_set failed");
331cd92f1a1Sprlw1 	if(event_add(dt_col->cmd_event, NULL) != 0)
332cd92f1a1Sprlw1 		log_msg(LOG_ERR, "dnstap collector: event_add failed");
333cd92f1a1Sprlw1 
334cd92f1a1Sprlw1 	/* add worker input handlers */
335cd92f1a1Sprlw1 	dt_col->inputs = xalloc_array_zero(dt_col->count,
336cd92f1a1Sprlw1 		sizeof(*dt_col->inputs));
337cd92f1a1Sprlw1 	for(i=0; i<dt_col->count; i++) {
338cd92f1a1Sprlw1 		dt_col->inputs[i].dt_collector = dt_col;
339cd92f1a1Sprlw1 		dt_col->inputs[i].event = (struct event*)xalloc_zero(
340cd92f1a1Sprlw1 			sizeof(struct event));
341cd92f1a1Sprlw1 		event_set(dt_col->inputs[i].event,
342cd92f1a1Sprlw1 			nsd->dt_collector_fd_recv[i], EV_PERSIST|EV_READ,
343cd92f1a1Sprlw1 			dt_handle_input, &dt_col->inputs[i]);
344cd92f1a1Sprlw1 		if(event_base_set(dt_col->event_base,
345cd92f1a1Sprlw1 			dt_col->inputs[i].event) != 0)
346cd92f1a1Sprlw1 			log_msg(LOG_ERR, "dnstap collector: event_base_set failed");
347cd92f1a1Sprlw1 		if(event_add(dt_col->inputs[i].event, NULL) != 0)
348cd92f1a1Sprlw1 			log_msg(LOG_ERR, "dnstap collector: event_add failed");
349cd92f1a1Sprlw1 
350cd92f1a1Sprlw1 		dt_col->inputs[i].buffer = buffer_create(dt_col->region,
351*66a1527dSchristos 			/* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */
352cd92f1a1Sprlw1 			4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
353cd92f1a1Sprlw1 #ifdef INET6
354*66a1527dSchristos 			sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage)
355cd92f1a1Sprlw1 #else
356*66a1527dSchristos 			sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in)
357cd92f1a1Sprlw1 #endif
358cd92f1a1Sprlw1 		);
359cd92f1a1Sprlw1 		assert(buffer_capacity(dt_col->inputs[i].buffer) ==
360cd92f1a1Sprlw1 			buffer_capacity(dt_col->send_buffer));
361cd92f1a1Sprlw1 	}
362cd92f1a1Sprlw1 }
363cd92f1a1Sprlw1 
364cd92f1a1Sprlw1 /* the dnstap collector process main routine */
dt_collector_run(struct dt_collector * dt_col,struct nsd * nsd)365cd92f1a1Sprlw1 static void dt_collector_run(struct dt_collector* dt_col, struct nsd* nsd)
366cd92f1a1Sprlw1 {
367cd92f1a1Sprlw1 	/* init dnstap */
368cd92f1a1Sprlw1 	VERBOSITY(1, (LOG_INFO, "dnstap collector started"));
369cd92f1a1Sprlw1 	dt_init_dnstap(dt_col, nsd);
370cd92f1a1Sprlw1 	dt_attach_events(dt_col, nsd);
371cd92f1a1Sprlw1 
372cd92f1a1Sprlw1 	/* run */
373cd92f1a1Sprlw1 	if(event_base_loop(dt_col->event_base, 0) == -1) {
374cd92f1a1Sprlw1 		error("dnstap collector: event_base_loop failed");
375cd92f1a1Sprlw1 	}
376cd92f1a1Sprlw1 
377cd92f1a1Sprlw1 	/* cleanup and done */
378cd92f1a1Sprlw1 	VERBOSITY(1, (LOG_INFO, "dnstap collector stopped"));
379cd92f1a1Sprlw1 	dt_collector_cleanup(dt_col, nsd);
380cd92f1a1Sprlw1 	exit(0);
381cd92f1a1Sprlw1 }
382cd92f1a1Sprlw1 
dt_collector_start(struct dt_collector * dt_col,struct nsd * nsd)383cd92f1a1Sprlw1 void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd)
384cd92f1a1Sprlw1 {
385*66a1527dSchristos 	int i, *fd_send;
386cd92f1a1Sprlw1 	/* fork */
387cd92f1a1Sprlw1 	dt_col->dt_pid = fork();
388cd92f1a1Sprlw1 	if(dt_col->dt_pid == -1) {
389cd92f1a1Sprlw1 		error("dnstap_collector: fork failed: %s", strerror(errno));
390cd92f1a1Sprlw1 	}
391cd92f1a1Sprlw1 	if(dt_col->dt_pid == 0) {
392cd92f1a1Sprlw1 		/* the dt collector process is this */
393cd92f1a1Sprlw1 		/* close the nsd side of the command channel */
394cd92f1a1Sprlw1 		close(dt_col->cmd_socket_nsd);
395cd92f1a1Sprlw1 		dt_col->cmd_socket_nsd = -1;
396*66a1527dSchristos 
397*66a1527dSchristos 		/* close the send side of the communication channels */
398*66a1527dSchristos 		assert(nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap);
399*66a1527dSchristos 		fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap
400*66a1527dSchristos 			? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap;
401*66a1527dSchristos 		for(i=0; i<dt_col->count; i++) {
402*66a1527dSchristos 			if(fd_send[i] != -1) {
403*66a1527dSchristos 				close(fd_send[i]);
404*66a1527dSchristos 				fd_send[i] = -1;
405*66a1527dSchristos 			}
406*66a1527dSchristos 		}
407*66a1527dSchristos #ifdef HAVE_SETPROCTITLE
408*66a1527dSchristos 		setproctitle("dnstap_collector");
409*66a1527dSchristos #endif
410*66a1527dSchristos 		/* Free serve process specific memory pages */
411*66a1527dSchristos #ifdef RATELIMIT
412*66a1527dSchristos 		rrl_mmap_deinit_keep_mmap();
413*66a1527dSchristos #endif
414*66a1527dSchristos 		udb_base_free_keep_mmap(nsd->task[0]);
415*66a1527dSchristos 		udb_base_free_keep_mmap(nsd->task[1]);
416*66a1527dSchristos 		namedb_close_udb(nsd->db); /* keeps mmap */
417*66a1527dSchristos 		namedb_close(nsd->db);
418*66a1527dSchristos 
419cd92f1a1Sprlw1 		dt_collector_run(dt_col, nsd);
420cd92f1a1Sprlw1 		/* NOTREACH */
421cd92f1a1Sprlw1 		exit(0);
422cd92f1a1Sprlw1 	} else {
423cd92f1a1Sprlw1 		/* the parent continues on, with starting NSD */
424cd92f1a1Sprlw1 		/* close the dt side of the command channel */
425cd92f1a1Sprlw1 		close(dt_col->cmd_socket_dt);
426cd92f1a1Sprlw1 		dt_col->cmd_socket_dt = -1;
427*66a1527dSchristos 
428*66a1527dSchristos 		/* close the receive side of the communication channels */
429*66a1527dSchristos 		for(i=0; i<dt_col->count; i++) {
430*66a1527dSchristos 			if(nsd->dt_collector_fd_recv[i] != -1) {
431*66a1527dSchristos 				close(nsd->dt_collector_fd_recv[i]);
432*66a1527dSchristos 				nsd->dt_collector_fd_recv[i] = -1;
433*66a1527dSchristos 			}
434*66a1527dSchristos 		}
435cd92f1a1Sprlw1 	}
436cd92f1a1Sprlw1 }
437cd92f1a1Sprlw1 
438cd92f1a1Sprlw1 /* put data for sending to the collector process into the buffer */
439cd92f1a1Sprlw1 static int
prep_send_data(struct buffer * buf,uint8_t is_response,struct sockaddr_storage * local_addr,struct sockaddr_storage * addr,socklen_t addrlen,int is_tcp,struct buffer * packet,struct zone * zone)440cd92f1a1Sprlw1 prep_send_data(struct buffer* buf, uint8_t is_response,
441cd92f1a1Sprlw1 #ifdef INET6
442*66a1527dSchristos 	struct sockaddr_storage* local_addr,
443cd92f1a1Sprlw1 	struct sockaddr_storage* addr,
444cd92f1a1Sprlw1 #else
445*66a1527dSchristos 	struct sockaddr_in* local_addr,
446cd92f1a1Sprlw1 	struct sockaddr_in* addr,
447cd92f1a1Sprlw1 #endif
448cd92f1a1Sprlw1 	socklen_t addrlen, int is_tcp, struct buffer* packet,
449cd92f1a1Sprlw1 	struct zone* zone)
450cd92f1a1Sprlw1 {
451cd92f1a1Sprlw1 	buffer_clear(buf);
452*66a1527dSchristos #ifdef INET6
453*66a1527dSchristos 	if(local_addr->ss_family != addr->ss_family)
454*66a1527dSchristos 		return 0; /* must be same length to send */
455*66a1527dSchristos #else
456*66a1527dSchristos 	if(local_addr->sin_family != addr->sin_family)
457*66a1527dSchristos 		return 0; /* must be same length to send */
458*66a1527dSchristos #endif
459*66a1527dSchristos 	if(!buffer_available(buf, 4+1+4+2*addrlen+1+4+buffer_remaining(packet)))
460cd92f1a1Sprlw1 		return 0; /* does not fit in send_buffer, log is dropped */
461cd92f1a1Sprlw1 	buffer_skip(buf, 4); /* the length of the message goes here */
462cd92f1a1Sprlw1 	buffer_write_u8(buf, is_response);
463cd92f1a1Sprlw1 	buffer_write_u32(buf, addrlen);
464*66a1527dSchristos 	buffer_write(buf, local_addr, (size_t)addrlen);
465cd92f1a1Sprlw1 	buffer_write(buf, addr, (size_t)addrlen);
466cd92f1a1Sprlw1 	buffer_write_u8(buf, (is_tcp?1:0));
467cd92f1a1Sprlw1 	buffer_write_u32(buf, buffer_remaining(packet));
468cd92f1a1Sprlw1 	buffer_write(buf, buffer_begin(packet), buffer_remaining(packet));
469cd92f1a1Sprlw1 	if(zone && zone->apex && domain_dname(zone->apex)) {
470cd92f1a1Sprlw1 		if(!buffer_available(buf, 4 + domain_dname(zone->apex)->name_size))
471cd92f1a1Sprlw1 			return 0;
472cd92f1a1Sprlw1 		buffer_write_u32(buf, domain_dname(zone->apex)->name_size);
473cd92f1a1Sprlw1 		buffer_write(buf, dname_name(domain_dname(zone->apex)),
474cd92f1a1Sprlw1 			domain_dname(zone->apex)->name_size);
475cd92f1a1Sprlw1 	} else {
476cd92f1a1Sprlw1 		if(!buffer_available(buf, 4))
477cd92f1a1Sprlw1 			return 0;
478cd92f1a1Sprlw1 		buffer_write_u32(buf, 0);
479cd92f1a1Sprlw1 	}
480cd92f1a1Sprlw1 
481cd92f1a1Sprlw1 	buffer_flip(buf);
482cd92f1a1Sprlw1 	/* write length of message */
483cd92f1a1Sprlw1 	buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4);
484cd92f1a1Sprlw1 	return 1;
485cd92f1a1Sprlw1 }
486cd92f1a1Sprlw1 
487*66a1527dSchristos /* attempt to send buffer to socket, if it blocks do not send it.
488*66a1527dSchristos  * return 0 on success, -1 on error */
attempt_to_send(int s,uint8_t * data,size_t len)489*66a1527dSchristos static int attempt_to_send(int s, uint8_t* data, size_t len)
490cd92f1a1Sprlw1 {
491cd92f1a1Sprlw1 	ssize_t r;
492*66a1527dSchristos 	if(len == 0)
493*66a1527dSchristos 		return 0;
494*66a1527dSchristos 	r = send(s, data, len, MSG_DONTWAIT | MSG_NOSIGNAL);
495cd92f1a1Sprlw1 	if(r == -1) {
496*66a1527dSchristos 		if(errno == EAGAIN || errno == EINTR ||
497*66a1527dSchristos 				errno == ENOBUFS || errno == EMSGSIZE) {
498*66a1527dSchristos 			/* check if pipe is full, if the nonblocking fd blocks,
499*66a1527dSchristos 			 * then drop the message */
500*66a1527dSchristos 			return 0;
501cd92f1a1Sprlw1 		}
502*66a1527dSchristos 		/* some sort of error, print it */
503*66a1527dSchristos 		log_msg(LOG_ERR, "dnstap collector: send failed: %s",
504cd92f1a1Sprlw1 			strerror(errno));
505*66a1527dSchristos 		return -1;
506cd92f1a1Sprlw1 	}
507*66a1527dSchristos 	assert(r > 0);
508*66a1527dSchristos 	if(r > 0) {
509*66a1527dSchristos 		assert((size_t)r == len);
510*66a1527dSchristos 		return 0;
511cd92f1a1Sprlw1 	}
512*66a1527dSchristos 	/* Other end closed the channel? */
513*66a1527dSchristos 	log_msg(LOG_ERR, "dnstap collector: server child closed the channel");
514*66a1527dSchristos 	return -1;
515cd92f1a1Sprlw1 }
516cd92f1a1Sprlw1 
dt_collector_submit_auth_query(struct nsd * nsd,struct sockaddr_storage * local_addr,struct sockaddr_storage * addr,socklen_t addrlen,int is_tcp,struct buffer * packet)517cd92f1a1Sprlw1 void dt_collector_submit_auth_query(struct nsd* nsd,
518cd92f1a1Sprlw1 #ifdef INET6
519*66a1527dSchristos 	struct sockaddr_storage* local_addr,
520cd92f1a1Sprlw1 	struct sockaddr_storage* addr,
521cd92f1a1Sprlw1 #else
522*66a1527dSchristos 	struct sockaddr_in* local_addr,
523cd92f1a1Sprlw1 	struct sockaddr_in* addr,
524cd92f1a1Sprlw1 #endif
525cd92f1a1Sprlw1 	socklen_t addrlen, int is_tcp, struct buffer* packet)
526cd92f1a1Sprlw1 {
527cd92f1a1Sprlw1 	if(!nsd->dt_collector) return;
528cd92f1a1Sprlw1 	if(!nsd->options->dnstap_log_auth_query_messages) return;
529*66a1527dSchristos 	if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return;
530cd92f1a1Sprlw1 	VERBOSITY(4, (LOG_INFO, "dnstap submit auth query"));
531cd92f1a1Sprlw1 
532cd92f1a1Sprlw1 	/* marshal data into send buffer */
533*66a1527dSchristos 	if(!prep_send_data(nsd->dt_collector->send_buffer, 0, local_addr, addr, addrlen,
534cd92f1a1Sprlw1 		is_tcp, packet, NULL))
535cd92f1a1Sprlw1 		return; /* probably did not fit in buffer */
536cd92f1a1Sprlw1 
537cd92f1a1Sprlw1 	/* attempt to send data; do not block */
538*66a1527dSchristos 	if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num],
539cd92f1a1Sprlw1 			buffer_begin(nsd->dt_collector->send_buffer),
540*66a1527dSchristos 			buffer_remaining(nsd->dt_collector->send_buffer))) {
541*66a1527dSchristos 		/* Something went wrong sending to the socket. Don't send to
542*66a1527dSchristos 		 * this socket again. */
543*66a1527dSchristos 		close(nsd->dt_collector_fd_send[nsd->this_child->child_num]);
544*66a1527dSchristos 		nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1;
545*66a1527dSchristos 	}
546cd92f1a1Sprlw1 }
547cd92f1a1Sprlw1 
dt_collector_submit_auth_response(struct nsd * nsd,struct sockaddr_storage * local_addr,struct sockaddr_storage * addr,socklen_t addrlen,int is_tcp,struct buffer * packet,struct zone * zone)548cd92f1a1Sprlw1 void dt_collector_submit_auth_response(struct nsd* nsd,
549cd92f1a1Sprlw1 #ifdef INET6
550*66a1527dSchristos 	struct sockaddr_storage* local_addr,
551cd92f1a1Sprlw1 	struct sockaddr_storage* addr,
552cd92f1a1Sprlw1 #else
553*66a1527dSchristos 	struct sockaddr_in* local_addr,
554cd92f1a1Sprlw1 	struct sockaddr_in* addr,
555cd92f1a1Sprlw1 #endif
556cd92f1a1Sprlw1 	socklen_t addrlen, int is_tcp, struct buffer* packet,
557cd92f1a1Sprlw1 	struct zone* zone)
558cd92f1a1Sprlw1 {
559cd92f1a1Sprlw1 	if(!nsd->dt_collector) return;
560cd92f1a1Sprlw1 	if(!nsd->options->dnstap_log_auth_response_messages) return;
561*66a1527dSchristos 	if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return;
562cd92f1a1Sprlw1 	VERBOSITY(4, (LOG_INFO, "dnstap submit auth response"));
563cd92f1a1Sprlw1 
564cd92f1a1Sprlw1 	/* marshal data into send buffer */
565*66a1527dSchristos 	if(!prep_send_data(nsd->dt_collector->send_buffer, 1, local_addr, addr, addrlen,
566cd92f1a1Sprlw1 		is_tcp, packet, zone))
567cd92f1a1Sprlw1 		return; /* probably did not fit in buffer */
568cd92f1a1Sprlw1 
569cd92f1a1Sprlw1 	/* attempt to send data; do not block */
570*66a1527dSchristos 	if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num],
571cd92f1a1Sprlw1 			buffer_begin(nsd->dt_collector->send_buffer),
572*66a1527dSchristos 			buffer_remaining(nsd->dt_collector->send_buffer))) {
573*66a1527dSchristos 		/* Something went wrong sending to the socket. Don't send to
574*66a1527dSchristos 		 * this socket again. */
575*66a1527dSchristos 		close(nsd->dt_collector_fd_send[nsd->this_child->child_num]);
576*66a1527dSchristos 		nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1;
577*66a1527dSchristos 	}
578cd92f1a1Sprlw1 }
579