1 /*
2  * dnstap/dnstap_collector.c -- nsd collector process for dnstap information
3  *
4  * Copyright (c) 2018, NLnet Labs. All rights reserved.
5  *
6  * See LICENSE for the license.
7  *
8  */
9 
#include "config.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>
#ifndef USE_MINI_EVENT
#  ifdef HAVE_EVENT_H
#    include <event.h>
#  else
#    include <event2/event.h>
#    include "event2/event_struct.h"
#    include "event2/event_compat.h"
#  endif
#else
#  include "mini_event.h"
#endif
#include "dnstap/dnstap_collector.h"
#include "dnstap/dnstap.h"
#include "util.h"
#include "nsd.h"
#include "region-allocator.h"
#include "buffer.h"
#include "namedb.h"
#include "options.h"
35 
36 struct dt_collector* dt_collector_create(struct nsd* nsd)
37 {
38 	int i, sv[2];
39 	struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero(
40 		sizeof(*dt_col));
41 	dt_col->count = nsd->child_count;
42 	dt_col->dt_env = NULL;
43 	dt_col->region = region_create(xalloc, free);
44 	dt_col->send_buffer = buffer_create(dt_col->region,
45 		/* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + addr */
46 		4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
47 #ifdef INET6
48 		sizeof(struct sockaddr_storage)
49 #else
50 		sizeof(struct sockaddr_in)
51 #endif
52 		);
53 
54 	/* open pipes in struct nsd */
55 	nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count,
56 		sizeof(int));
57 	nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count,
58 		sizeof(int));
59 	for(i=0; i<dt_col->count; i++) {
60 		int fd[2];
61 		fd[0] = -1;
62 		fd[1] = -1;
63 		if(pipe(fd) < 0) {
64 			error("dnstap_collector: cannot create pipe: %s",
65 				strerror(errno));
66 		}
67 		if(fcntl(fd[0], F_SETFL, O_NONBLOCK) == -1) {
68 			log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
69 		}
70 		if(fcntl(fd[1], F_SETFL, O_NONBLOCK) == -1) {
71 			log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
72 		}
73 		nsd->dt_collector_fd_recv[i] = fd[0];
74 		nsd->dt_collector_fd_send[i] = fd[1];
75 	}
76 
77 	/* open socketpair */
78 	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
79 		error("dnstap_collector: cannot create socketpair: %s",
80 			strerror(errno));
81 	}
82 	if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
83 		log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
84 	}
85 	if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
86 		log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
87 	}
88 	dt_col->cmd_socket_dt = sv[0];
89 	dt_col->cmd_socket_nsd = sv[1];
90 
91 	return dt_col;
92 }
93 
94 void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd)
95 {
96 	if(!dt_col) return;
97 	free(nsd->dt_collector_fd_recv);
98 	nsd->dt_collector_fd_recv = NULL;
99 	free(nsd->dt_collector_fd_send);
100 	nsd->dt_collector_fd_send = NULL;
101 	region_destroy(dt_col->region);
102 	free(dt_col);
103 }
104 
105 void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd)
106 {
107 	int i;
108 	if(!dt_col) return;
109 	if(dt_col->cmd_socket_dt != -1) {
110 		close(dt_col->cmd_socket_dt);
111 		dt_col->cmd_socket_dt = -1;
112 	}
113 	if(dt_col->cmd_socket_nsd != -1) {
114 		close(dt_col->cmd_socket_nsd);
115 		dt_col->cmd_socket_nsd = -1;
116 	}
117 	for(i=0; i<dt_col->count; i++) {
118 		if(nsd->dt_collector_fd_recv[i] != -1) {
119 			close(nsd->dt_collector_fd_recv[i]);
120 			nsd->dt_collector_fd_recv[i] = -1;
121 		}
122 		if(nsd->dt_collector_fd_send[i] != -1) {
123 			close(nsd->dt_collector_fd_send[i]);
124 			nsd->dt_collector_fd_send[i] = -1;
125 		}
126 	}
127 }
128 
129 /* handle command from nsd to dt collector.
130  * mostly, check for fd closed, this means we have to exit */
131 void
132 dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg)
133 {
134 	struct dt_collector* dt_col = (struct dt_collector*)arg;
135 	if((event&EV_READ) != 0) {
136 		event_base_loopexit(dt_col->event_base, NULL);
137 	}
138 }
139 
140 /* read data from fd into buffer, true when message is complete */
141 static int read_into_buffer(int fd, struct buffer* buf)
142 {
143 	size_t msglen;
144 	ssize_t r;
145 	if(buffer_position(buf) < 4) {
146 		/* read the length of the message */
147 		r = read(fd, buffer_current(buf), 4 - buffer_position(buf));
148 		if(r == -1) {
149 			if(errno == EAGAIN || errno == EINTR) {
150 				/* continue to read later */
151 				return 0;
152 			}
153 			log_msg(LOG_ERR, "dnstap collector: read failed: %s",
154 				strerror(errno));
155 			return 0;
156 		}
157 		buffer_skip(buf, r);
158 		if(buffer_position(buf) < 4)
159 			return 0; /* continue to read more msglen later */
160 	}
161 
162 	/* msglen complete */
163 	msglen = buffer_read_u32_at(buf, 0);
164 	/* assert we have enough space, if we don't and we wanted to continue,
165 	 * we would have to skip the message somehow, but that should never
166 	 * happen because send_buffer and receive_buffer have the same size */
167 	assert(buffer_capacity(buf) >= msglen + 4);
168 	r = read(fd, buffer_current(buf), msglen - (buffer_position(buf) - 4));
169 	if(r == -1) {
170 		if(errno == EAGAIN || errno == EINTR) {
171 			/* continue to read later */
172 			return 0;
173 		}
174 		log_msg(LOG_ERR, "dnstap collector: read failed: %s",
175 			strerror(errno));
176 		return 0;
177 	}
178 	buffer_skip(buf, r);
179 	if(buffer_position(buf) < 4 + msglen)
180 		return 0; /* read more msg later */
181 
182 	/* msg complete */
183 	buffer_flip(buf);
184 	return 1;
185 }
186 
187 /* submit the content of the buffer received to dnstap */
188 static void
189 dt_submit_content(struct dt_env* dt_env, struct buffer* buf)
190 {
191 	uint8_t is_response, is_tcp;
192 #ifdef INET6
193 	struct sockaddr_storage addr;
194 #else
195 	struct sockaddr_in addr;
196 #endif
197 	socklen_t addrlen;
198 	size_t pktlen;
199 	uint8_t* data;
200 	size_t zonelen;
201 	uint8_t* zone;
202 
203 	/* parse content from buffer */
204 	if(!buffer_available(buf, 4+1+4)) return;
205 	buffer_skip(buf, 4); /* skip msglen */
206 	is_response = buffer_read_u8(buf);
207 	addrlen = buffer_read_u32(buf);
208 	if(addrlen > sizeof(addr)) return;
209 	if(!buffer_available(buf, addrlen)) return;
210 	buffer_read(buf, &addr, addrlen);
211 	if(!buffer_available(buf, 1+4)) return;
212 	is_tcp = buffer_read_u8(buf);
213 	pktlen = buffer_read_u32(buf);
214 	if(!buffer_available(buf, pktlen)) return;
215 	data = buffer_current(buf);
216 	buffer_skip(buf, pktlen);
217 	if(!buffer_available(buf, 4)) return;
218 	zonelen = buffer_read_u32(buf);
219 	if(zonelen == 0) {
220 		zone = NULL;
221 	} else {
222 		if(zonelen > MAXDOMAINLEN) return;
223 		if(!buffer_available(buf, zonelen)) return;
224 		zone = buffer_current(buf);
225 		buffer_skip(buf, zonelen);
226 	}
227 
228 	/* submit it */
229 	if(is_response) {
230 		dt_msg_send_auth_response(dt_env, &addr, is_tcp, zone,
231 			zonelen, data, pktlen);
232 	} else {
233 		dt_msg_send_auth_query(dt_env, &addr, is_tcp, zone,
234 			zonelen, data, pktlen);
235 	}
236 }
237 
238 /* handle input from worker for dnstap */
239 void
240 dt_handle_input(int fd, short event, void* arg)
241 {
242 	struct dt_collector_input* dt_input = (struct dt_collector_input*)arg;
243 	if((event&EV_READ) != 0) {
244 		/* read */
245 		if(!read_into_buffer(fd, dt_input->buffer))
246 			return;
247 
248 		/* once data is complete, write it to dnstap */
249 		VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d",
250 			(int)buffer_remaining(dt_input->buffer)));
251 		if(dt_input->dt_collector->dt_env) {
252 			dt_submit_content(dt_input->dt_collector->dt_env,
253 				dt_input->buffer);
254 		}
255 
256 		/* clear buffer for next message */
257 		buffer_clear(dt_input->buffer);
258 	}
259 }
260 
261 /* init dnstap */
262 static void dt_init_dnstap(struct dt_collector* dt_col, struct nsd* nsd)
263 {
264 	int num_workers = 1;
265 #ifdef HAVE_CHROOT
266 	if(nsd->chrootdir && nsd->chrootdir[0]) {
267 		int l = strlen(nsd->chrootdir)-1; /* ends in trailing slash */
268 		if (nsd->options->dnstap_socket_path &&
269 			nsd->options->dnstap_socket_path[0] == '/' &&
270 			strncmp(nsd->options->dnstap_socket_path,
271 				nsd->chrootdir, l) == 0)
272 			nsd->options->dnstap_socket_path += l;
273 	}
274 #endif
275 	dt_col->dt_env = dt_create(nsd->options->dnstap_socket_path, num_workers);
276 	if(!dt_col->dt_env) {
277 		log_msg(LOG_ERR, "could not create dnstap env");
278 		return;
279 	}
280 	dt_apply_cfg(dt_col->dt_env, nsd->options);
281 	dt_init(dt_col->dt_env);
282 }
283 
284 /* cleanup dt collector process for exit */
285 static void dt_collector_cleanup(struct dt_collector* dt_col, struct nsd* nsd)
286 {
287 	int i;
288 	dt_delete(dt_col->dt_env);
289 	event_del(dt_col->cmd_event);
290 	for(i=0; i<dt_col->count; i++) {
291 		event_del(dt_col->inputs[i].event);
292 	}
293 	dt_collector_close(dt_col, nsd);
294 	event_base_free(dt_col->event_base);
295 #ifdef MEMCLEAN
296 	free(dt_col->cmd_event);
297 	if(dt_col->inputs) {
298 		for(i=0; i<dt_col->count; i++) {
299 			free(dt_col->inputs[i].event);
300 		}
301 		free(dt_col->inputs);
302 	}
303 	dt_collector_destroy(dt_col, nsd);
304 #endif
305 }
306 
307 /* attach events to the event base to listen to the workers and cmd channel */
308 static void dt_attach_events(struct dt_collector* dt_col, struct nsd* nsd)
309 {
310 	int i;
311 	/* create event base */
312 	dt_col->event_base = nsd_child_event_base();
313 	if(!dt_col->event_base) {
314 		error("dnstap collector: event_base create failed");
315 	}
316 
317 	/* add command handler */
318 	dt_col->cmd_event = (struct event*)xalloc_zero(
319 		sizeof(*dt_col->cmd_event));
320 	event_set(dt_col->cmd_event, dt_col->cmd_socket_dt,
321 		EV_PERSIST|EV_READ, dt_handle_cmd_from_nsd, dt_col);
322 	if(event_base_set(dt_col->event_base, dt_col->cmd_event) != 0)
323 		log_msg(LOG_ERR, "dnstap collector: event_base_set failed");
324 	if(event_add(dt_col->cmd_event, NULL) != 0)
325 		log_msg(LOG_ERR, "dnstap collector: event_add failed");
326 
327 	/* add worker input handlers */
328 	dt_col->inputs = xalloc_array_zero(dt_col->count,
329 		sizeof(*dt_col->inputs));
330 	for(i=0; i<dt_col->count; i++) {
331 		dt_col->inputs[i].dt_collector = dt_col;
332 		dt_col->inputs[i].event = (struct event*)xalloc_zero(
333 			sizeof(struct event));
334 		event_set(dt_col->inputs[i].event,
335 			nsd->dt_collector_fd_recv[i], EV_PERSIST|EV_READ,
336 			dt_handle_input, &dt_col->inputs[i]);
337 		if(event_base_set(dt_col->event_base,
338 			dt_col->inputs[i].event) != 0)
339 			log_msg(LOG_ERR, "dnstap collector: event_base_set failed");
340 		if(event_add(dt_col->inputs[i].event, NULL) != 0)
341 			log_msg(LOG_ERR, "dnstap collector: event_add failed");
342 
343 		dt_col->inputs[i].buffer = buffer_create(dt_col->region,
344 			/* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + addr */
345 			4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
346 #ifdef INET6
347 			sizeof(struct sockaddr_storage)
348 #else
349 			sizeof(struct sockaddr_in)
350 #endif
351 		);
352 		assert(buffer_capacity(dt_col->inputs[i].buffer) ==
353 			buffer_capacity(dt_col->send_buffer));
354 	}
355 }
356 
357 /* the dnstap collector process main routine */
358 static void dt_collector_run(struct dt_collector* dt_col, struct nsd* nsd)
359 {
360 	/* init dnstap */
361 	VERBOSITY(1, (LOG_INFO, "dnstap collector started"));
362 	dt_init_dnstap(dt_col, nsd);
363 	dt_attach_events(dt_col, nsd);
364 
365 	/* run */
366 	if(event_base_loop(dt_col->event_base, 0) == -1) {
367 		error("dnstap collector: event_base_loop failed");
368 	}
369 
370 	/* cleanup and done */
371 	VERBOSITY(1, (LOG_INFO, "dnstap collector stopped"));
372 	dt_collector_cleanup(dt_col, nsd);
373 	exit(0);
374 }
375 
376 void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd)
377 {
378 	/* fork */
379 	dt_col->dt_pid = fork();
380 	if(dt_col->dt_pid == -1) {
381 		error("dnstap_collector: fork failed: %s", strerror(errno));
382 	}
383 	if(dt_col->dt_pid == 0) {
384 		/* the dt collector process is this */
385 		/* close the nsd side of the command channel */
386 		close(dt_col->cmd_socket_nsd);
387 		dt_col->cmd_socket_nsd = -1;
388 		dt_collector_run(dt_col, nsd);
389 		/* NOTREACH */
390 		exit(0);
391 	} else {
392 		/* the parent continues on, with starting NSD */
393 		/* close the dt side of the command channel */
394 		close(dt_col->cmd_socket_dt);
395 		dt_col->cmd_socket_dt = -1;
396 	}
397 }
398 
399 /* put data for sending to the collector process into the buffer */
400 static int
401 prep_send_data(struct buffer* buf, uint8_t is_response,
402 #ifdef INET6
403 	struct sockaddr_storage* addr,
404 #else
405 	struct sockaddr_in* addr,
406 #endif
407 	socklen_t addrlen, int is_tcp, struct buffer* packet,
408 	struct zone* zone)
409 {
410 	buffer_clear(buf);
411 	if(!buffer_available(buf, 4+1+4+addrlen+1+4+buffer_remaining(packet)))
412 		return 0; /* does not fit in send_buffer, log is dropped */
413 	buffer_skip(buf, 4); /* the length of the message goes here */
414 	buffer_write_u8(buf, is_response);
415 	buffer_write_u32(buf, addrlen);
416 	buffer_write(buf, addr, (size_t)addrlen);
417 	buffer_write_u8(buf, (is_tcp?1:0));
418 	buffer_write_u32(buf, buffer_remaining(packet));
419 	buffer_write(buf, buffer_begin(packet), buffer_remaining(packet));
420 	if(zone && zone->apex && domain_dname(zone->apex)) {
421 		if(!buffer_available(buf, 4 + domain_dname(zone->apex)->name_size))
422 			return 0;
423 		buffer_write_u32(buf, domain_dname(zone->apex)->name_size);
424 		buffer_write(buf, dname_name(domain_dname(zone->apex)),
425 			domain_dname(zone->apex)->name_size);
426 	} else {
427 		if(!buffer_available(buf, 4))
428 			return 0;
429 		buffer_write_u32(buf, 0);
430 	}
431 
432 	buffer_flip(buf);
433 	/* write length of message */
434 	buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4);
435 	return 1;
436 }
437 
438 /* attempt to write buffer to socket, if it blocks do not write it. */
439 static void attempt_to_write(int s, uint8_t* data, size_t len)
440 {
441 	size_t total = 0;
442 	ssize_t r;
443 	while(total < len) {
444 		r = write(s, data+total, len-total);
445 		if(r == -1) {
446 			if(errno == EAGAIN && total == 0) {
447 				/* on first write part, check if pipe is full,
448 				 * if the nonblocking fd blocks, then drop
449 				 * the message */
450 				return;
451 			}
452 			if(errno != EAGAIN && errno != EINTR) {
453 				/* some sort of error, print it and drop it */
454 				log_msg(LOG_ERR,
455 					"dnstap collector: write failed: %s",
456 					strerror(errno));
457 				return;
458 			}
459 			/* continue and write this again */
460 			/* for EINTR, we have to do this,
461 			 * for EAGAIN, if the first part succeeded, we have
462 			 * to continue to write the remainder of the message,
463 			 * because otherwise partial messages confuse the
464 			 * receiver. */
465 			continue;
466 		}
467 		total += r;
468 	}
469 }
470 
471 void dt_collector_submit_auth_query(struct nsd* nsd,
472 #ifdef INET6
473 	struct sockaddr_storage* addr,
474 #else
475 	struct sockaddr_in* addr,
476 #endif
477 	socklen_t addrlen, int is_tcp, struct buffer* packet)
478 {
479 	if(!nsd->dt_collector) return;
480 	if(!nsd->options->dnstap_log_auth_query_messages) return;
481 	VERBOSITY(4, (LOG_INFO, "dnstap submit auth query"));
482 
483 	/* marshal data into send buffer */
484 	if(!prep_send_data(nsd->dt_collector->send_buffer, 0, addr, addrlen,
485 		is_tcp, packet, NULL))
486 		return; /* probably did not fit in buffer */
487 
488 	/* attempt to send data; do not block */
489 	attempt_to_write(nsd->dt_collector_fd_send[nsd->this_child->child_num],
490 		buffer_begin(nsd->dt_collector->send_buffer),
491 		buffer_remaining(nsd->dt_collector->send_buffer));
492 }
493 
494 void dt_collector_submit_auth_response(struct nsd* nsd,
495 #ifdef INET6
496 	struct sockaddr_storage* addr,
497 #else
498 	struct sockaddr_in* addr,
499 #endif
500 	socklen_t addrlen, int is_tcp, struct buffer* packet,
501 	struct zone* zone)
502 {
503 	if(!nsd->dt_collector) return;
504 	if(!nsd->options->dnstap_log_auth_response_messages) return;
505 	VERBOSITY(4, (LOG_INFO, "dnstap submit auth response"));
506 
507 	/* marshal data into send buffer */
508 	if(!prep_send_data(nsd->dt_collector->send_buffer, 1, addr, addrlen,
509 		is_tcp, packet, zone))
510 		return; /* probably did not fit in buffer */
511 
512 	/* attempt to send data; do not block */
513 	attempt_to_write(nsd->dt_collector_fd_send[nsd->this_child->child_num],
514 		buffer_begin(nsd->dt_collector->send_buffer),
515 		buffer_remaining(nsd->dt_collector->send_buffer));
516 }
517