1 /* 2 * dnstap/dnstap_collector.c -- nsd collector process for dnstap information 3 * 4 * Copyright (c) 2018, NLnet Labs. All rights reserved. 5 * 6 * See LICENSE for the license. 7 * 8 */ 9 10 #include "config.h" 11 #include <sys/types.h> 12 #include <sys/socket.h> 13 #include <errno.h> 14 #include <fcntl.h> 15 #include <unistd.h> 16 #ifndef USE_MINI_EVENT 17 # ifdef HAVE_EVENT_H 18 # include <event.h> 19 # else 20 # include <event2/event.h> 21 # include "event2/event_struct.h" 22 # include "event2/event_compat.h" 23 # endif 24 #else 25 # include "mini_event.h" 26 #endif 27 #include "dnstap/dnstap_collector.h" 28 #include "dnstap/dnstap.h" 29 #include "util.h" 30 #include "nsd.h" 31 #include "region-allocator.h" 32 #include "buffer.h" 33 #include "namedb.h" 34 #include "options.h" 35 36 #include "udb.h" 37 #include "rrl.h" 38 39 struct dt_collector* dt_collector_create(struct nsd* nsd) 40 { 41 int i, sv[2]; 42 struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero( 43 sizeof(*dt_col)); 44 dt_col->count = nsd->child_count * 2; 45 dt_col->dt_env = NULL; 46 dt_col->region = region_create(xalloc, free); 47 dt_col->send_buffer = buffer_create(dt_col->region, 48 /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */ 49 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 + 50 #ifdef INET6 51 sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage) 52 #else 53 sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in) 54 #endif 55 ); 56 57 /* open communication channels in struct nsd */ 58 nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count, 59 sizeof(int)); 60 nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count, 61 sizeof(int)); 62 for(i=0; i<dt_col->count; i++) { 63 int sv[2]; 64 int bufsz = buffer_capacity(dt_col->send_buffer); 65 sv[0] = -1; /* For receiving by parent (dnstap-collector) */ 66 sv[1] = -1; /* For sending by child (server childs) */ 67 if(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sv) < 0) { 68 error("dnstap_collector: cannot create communication channel: %s", 69 strerror(errno)); 70 } 71 if(setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof(bufsz))) { 72 log_msg(LOG_ERR, "setting dnstap_collector " 73 "receive buffer size failed: %s", strerror(errno)); 74 } 75 if(setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof(bufsz))) { 76 log_msg(LOG_ERR, "setting dnstap_collector " 77 "send buffer size failed: %s", strerror(errno)); 78 } 79 nsd->dt_collector_fd_recv[i] = sv[0]; 80 nsd->dt_collector_fd_send[i] = sv[1]; 81 } 82 nsd->dt_collector_fd_swap = nsd->dt_collector_fd_send + nsd->child_count; 83 84 /* open socketpair */ 85 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 86 error("dnstap_collector: cannot create socketpair: %s", 87 strerror(errno)); 88 } 89 if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) { 90 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); 91 } 92 if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) { 93 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); 94 } 95 dt_col->cmd_socket_dt = sv[0]; 96 dt_col->cmd_socket_nsd = sv[1]; 97 98 return dt_col; 99 } 100 101 void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd) 102 { 103 if(!dt_col) return; 104 free(nsd->dt_collector_fd_recv); 105 nsd->dt_collector_fd_recv = NULL; 106 if (nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap) 107 free(nsd->dt_collector_fd_send); 108 else 109 free(nsd->dt_collector_fd_swap); 110 nsd->dt_collector_fd_send = NULL; 111 nsd->dt_collector_fd_swap = NULL; 112 region_destroy(dt_col->region); 113 free(dt_col); 114 } 115 116 void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd) 117 { 118 int i, *fd_send; 119 if(!dt_col) return; 120 if(dt_col->cmd_socket_dt != -1) { 121 close(dt_col->cmd_socket_dt); 122 dt_col->cmd_socket_dt = -1; 123 } 124 if(dt_col->cmd_socket_nsd != -1) { 125 close(dt_col->cmd_socket_nsd); 126 dt_col->cmd_socket_nsd = -1; 127 } 128 fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap 129 ? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap; 130 for(i=0; i<dt_col->count; i++) { 131 if(nsd->dt_collector_fd_recv[i] != -1) { 132 close(nsd->dt_collector_fd_recv[i]); 133 nsd->dt_collector_fd_recv[i] = -1; 134 } 135 if(fd_send[i] != -1) { 136 close(fd_send[i]); 137 fd_send[i] = -1; 138 } 139 } 140 } 141 142 /* handle command from nsd to dt collector. 143 * mostly, check for fd closed, this means we have to exit */ 144 void 145 dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg) 146 { 147 struct dt_collector* dt_col = (struct dt_collector*)arg; 148 if((event&EV_READ) != 0) { 149 event_base_loopexit(dt_col->event_base, NULL); 150 } 151 } 152 153 /* receive data from fd into buffer, 1 when message received, -1 on error */ 154 static int recv_into_buffer(int fd, struct buffer* buf) 155 { 156 size_t msglen; 157 ssize_t r; 158 159 assert(buffer_position(buf) == 0); 160 r = recv(fd, buffer_current(buf), buffer_capacity(buf), MSG_DONTWAIT); 161 if(r == -1) { 162 if(errno == EAGAIN || errno == EINTR || errno == EMSGSIZE) { 163 /* continue to receive a message later */ 164 return 0; 165 } 166 log_msg(LOG_ERR, "dnstap collector: receive failed: %s", 167 strerror(errno)); 168 return -1; 169 } 170 if(r == 0) { 171 /* Remote end closed the connection? */ 172 log_msg(LOG_ERR, "dnstap collector: remote closed connection"); 173 return -1; 174 } 175 assert(r > 4); 176 msglen = buffer_read_u32_at(buf, 0); 177 if(msglen != (size_t)(r - 4)) { 178 /* Is this still possible now the communication channel is of 179 * type SOCK_DGRAM? I think not, but better safe than sorry. */ 180 log_msg(LOG_ERR, "dnstap collector: out of sync (msglen: %u)", 181 (unsigned int) msglen); 182 return 0; 183 } 184 buffer_skip(buf, r); 185 buffer_flip(buf); 186 return 1; 187 } 188 189 /* submit the content of the buffer received to dnstap */ 190 static void 191 dt_submit_content(struct dt_env* dt_env, struct buffer* buf) 192 { 193 uint8_t is_response, is_tcp; 194 #ifdef INET6 195 struct sockaddr_storage local_addr, addr; 196 #else 197 struct sockaddr_in local_addr, addr; 198 #endif 199 socklen_t addrlen; 200 size_t pktlen; 201 uint8_t* data; 202 size_t zonelen; 203 uint8_t* zone; 204 205 /* parse content from buffer */ 206 if(!buffer_available(buf, 4+1+4)) return; 207 buffer_skip(buf, 4); /* skip msglen */ 208 is_response = buffer_read_u8(buf); 209 addrlen = buffer_read_u32(buf); 210 if(addrlen > sizeof(local_addr) || addrlen > sizeof(addr)) return; 211 if(!buffer_available(buf, 2*addrlen)) return; 212 buffer_read(buf, &local_addr, addrlen); 213 buffer_read(buf, &addr, addrlen); 214 if(!buffer_available(buf, 1+4)) return; 215 is_tcp = buffer_read_u8(buf); 216 pktlen = buffer_read_u32(buf); 217 if(!buffer_available(buf, pktlen)) return; 218 data = buffer_current(buf); 219 buffer_skip(buf, pktlen); 220 if(!buffer_available(buf, 4)) return; 221 zonelen = buffer_read_u32(buf); 222 if(zonelen == 0) { 223 zone = NULL; 224 } else { 225 if(zonelen > MAXDOMAINLEN) return; 226 if(!buffer_available(buf, zonelen)) return; 227 zone = buffer_current(buf); 228 buffer_skip(buf, zonelen); 229 } 230 231 /* submit it */ 232 if(is_response) { 233 dt_msg_send_auth_response(dt_env, &local_addr, &addr, is_tcp, zone, 234 zonelen, data, pktlen); 235 } else { 236 dt_msg_send_auth_query(dt_env, &local_addr, &addr, is_tcp, zone, 237 zonelen, data, pktlen); 238 } 239 } 240 241 /* handle input from worker for dnstap */ 242 void 243 dt_handle_input(int fd, short event, void* arg) 244 { 245 struct dt_collector_input* dt_input = (struct dt_collector_input*)arg; 246 if((event&EV_READ) != 0) { 247 /* receive */ 248 int r = recv_into_buffer(fd, dt_input->buffer); 249 if(r == 0) 250 return; 251 else if(r < 0) { 252 event_base_loopexit(dt_input->dt_collector->event_base, NULL); 253 return; 254 } 255 /* once data is complete, send it to dnstap */ 256 VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d", 257 (int)buffer_remaining(dt_input->buffer))); 258 if(dt_input->dt_collector->dt_env) { 259 dt_submit_content(dt_input->dt_collector->dt_env, 260 dt_input->buffer); 261 } 262 263 /* clear buffer for next message */ 264 buffer_clear(dt_input->buffer); 265 } 266 } 267 268 /* init dnstap */ 269 static void dt_init_dnstap(struct dt_collector* dt_col, struct nsd* nsd) 270 { 271 int num_workers = 1; 272 #ifdef HAVE_CHROOT 273 if(nsd->chrootdir && nsd->chrootdir[0]) { 274 int l = strlen(nsd->chrootdir)-1; /* ends in trailing slash */ 275 if (nsd->options->dnstap_socket_path && 276 nsd->options->dnstap_socket_path[0] == '/' && 277 strncmp(nsd->options->dnstap_socket_path, 278 nsd->chrootdir, l) == 0) 279 nsd->options->dnstap_socket_path += l; 280 } 281 #endif 282 dt_col->dt_env = dt_create(nsd->options->dnstap_socket_path, num_workers); 283 if(!dt_col->dt_env) { 284 log_msg(LOG_ERR, "could not create dnstap env"); 285 return; 286 } 287 dt_apply_cfg(dt_col->dt_env, nsd->options); 288 dt_init(dt_col->dt_env); 289 } 290 291 /* cleanup dt collector process for exit */ 292 static void dt_collector_cleanup(struct dt_collector* dt_col, struct nsd* nsd) 293 { 294 int i; 295 dt_delete(dt_col->dt_env); 296 event_del(dt_col->cmd_event); 297 for(i=0; i<dt_col->count; i++) { 298 event_del(dt_col->inputs[i].event); 299 } 300 dt_collector_close(dt_col, nsd); 301 event_base_free(dt_col->event_base); 302 #ifdef MEMCLEAN 303 free(dt_col->cmd_event); 304 if(dt_col->inputs) { 305 for(i=0; i<dt_col->count; i++) { 306 free(dt_col->inputs[i].event); 307 } 308 free(dt_col->inputs); 309 } 310 dt_collector_destroy(dt_col, nsd); 311 #endif 312 } 313 314 /* attach events to the event base to listen to the workers and cmd channel */ 315 static void dt_attach_events(struct dt_collector* dt_col, struct nsd* nsd) 316 { 317 int i; 318 /* create event base */ 319 dt_col->event_base = nsd_child_event_base(); 320 if(!dt_col->event_base) { 321 error("dnstap collector: event_base create failed"); 322 } 323 324 /* add command handler */ 325 dt_col->cmd_event = (struct event*)xalloc_zero( 326 sizeof(*dt_col->cmd_event)); 327 event_set(dt_col->cmd_event, dt_col->cmd_socket_dt, 328 EV_PERSIST|EV_READ, dt_handle_cmd_from_nsd, dt_col); 329 if(event_base_set(dt_col->event_base, dt_col->cmd_event) != 0) 330 log_msg(LOG_ERR, "dnstap collector: event_base_set failed"); 331 if(event_add(dt_col->cmd_event, NULL) != 0) 332 log_msg(LOG_ERR, "dnstap collector: event_add failed"); 333 334 /* add worker input handlers */ 335 dt_col->inputs = xalloc_array_zero(dt_col->count, 336 sizeof(*dt_col->inputs)); 337 for(i=0; i<dt_col->count; i++) { 338 dt_col->inputs[i].dt_collector = dt_col; 339 dt_col->inputs[i].event = (struct event*)xalloc_zero( 340 sizeof(struct event)); 341 event_set(dt_col->inputs[i].event, 342 nsd->dt_collector_fd_recv[i], EV_PERSIST|EV_READ, 343 dt_handle_input, &dt_col->inputs[i]); 344 if(event_base_set(dt_col->event_base, 345 dt_col->inputs[i].event) != 0) 346 log_msg(LOG_ERR, "dnstap collector: event_base_set failed"); 347 if(event_add(dt_col->inputs[i].event, NULL) != 0) 348 log_msg(LOG_ERR, "dnstap collector: event_add failed"); 349 350 dt_col->inputs[i].buffer = buffer_create(dt_col->region, 351 /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */ 352 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 + 353 #ifdef INET6 354 sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage) 355 #else 356 sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in) 357 #endif 358 ); 359 assert(buffer_capacity(dt_col->inputs[i].buffer) == 360 buffer_capacity(dt_col->send_buffer)); 361 } 362 } 363 364 /* the dnstap collector process main routine */ 365 static void dt_collector_run(struct dt_collector* dt_col, struct nsd* nsd) 366 { 367 /* init dnstap */ 368 VERBOSITY(1, (LOG_INFO, "dnstap collector started")); 369 dt_init_dnstap(dt_col, nsd); 370 dt_attach_events(dt_col, nsd); 371 372 /* run */ 373 if(event_base_loop(dt_col->event_base, 0) == -1) { 374 error("dnstap collector: event_base_loop failed"); 375 } 376 377 /* cleanup and done */ 378 VERBOSITY(1, (LOG_INFO, "dnstap collector stopped")); 379 dt_collector_cleanup(dt_col, nsd); 380 exit(0); 381 } 382 383 void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd) 384 { 385 int i, *fd_send; 386 /* fork */ 387 dt_col->dt_pid = fork(); 388 if(dt_col->dt_pid == -1) { 389 error("dnstap_collector: fork failed: %s", strerror(errno)); 390 } 391 if(dt_col->dt_pid == 0) { 392 /* the dt collector process is this */ 393 /* close the nsd side of the command channel */ 394 close(dt_col->cmd_socket_nsd); 395 dt_col->cmd_socket_nsd = -1; 396 397 /* close the send side of the communication channels */ 398 assert(nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap); 399 fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap 400 ? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap; 401 for(i=0; i<dt_col->count; i++) { 402 if(fd_send[i] != -1) { 403 close(fd_send[i]); 404 fd_send[i] = -1; 405 } 406 } 407 #ifdef HAVE_SETPROCTITLE 408 setproctitle("dnstap_collector"); 409 #endif 410 /* Free serve process specific memory pages */ 411 #ifdef RATELIMIT 412 rrl_mmap_deinit_keep_mmap(); 413 #endif 414 udb_base_free_keep_mmap(nsd->task[0]); 415 udb_base_free_keep_mmap(nsd->task[1]); 416 namedb_close_udb(nsd->db); /* keeps mmap */ 417 namedb_close(nsd->db); 418 419 dt_collector_run(dt_col, nsd); 420 /* NOTREACH */ 421 exit(0); 422 } else { 423 /* the parent continues on, with starting NSD */ 424 /* close the dt side of the command channel */ 425 close(dt_col->cmd_socket_dt); 426 dt_col->cmd_socket_dt = -1; 427 428 /* close the receive side of the communication channels */ 429 for(i=0; i<dt_col->count; i++) { 430 if(nsd->dt_collector_fd_recv[i] != -1) { 431 close(nsd->dt_collector_fd_recv[i]); 432 nsd->dt_collector_fd_recv[i] = -1; 433 } 434 } 435 } 436 } 437 438 /* put data for sending to the collector process into the buffer */ 439 static int 440 prep_send_data(struct buffer* buf, uint8_t is_response, 441 #ifdef INET6 442 struct sockaddr_storage* local_addr, 443 struct sockaddr_storage* addr, 444 #else 445 struct sockaddr_in* local_addr, 446 struct sockaddr_in* addr, 447 #endif 448 socklen_t addrlen, int is_tcp, struct buffer* packet, 449 struct zone* zone) 450 { 451 buffer_clear(buf); 452 #ifdef INET6 453 if(local_addr->ss_family != addr->ss_family) 454 return 0; /* must be same length to send */ 455 #else 456 if(local_addr->sin_family != addr->sin_family) 457 return 0; /* must be same length to send */ 458 #endif 459 if(!buffer_available(buf, 4+1+4+2*addrlen+1+4+buffer_remaining(packet))) 460 return 0; /* does not fit in send_buffer, log is dropped */ 461 buffer_skip(buf, 4); /* the length of the message goes here */ 462 buffer_write_u8(buf, is_response); 463 buffer_write_u32(buf, addrlen); 464 buffer_write(buf, local_addr, (size_t)addrlen); 465 buffer_write(buf, addr, (size_t)addrlen); 466 buffer_write_u8(buf, (is_tcp?1:0)); 467 buffer_write_u32(buf, buffer_remaining(packet)); 468 buffer_write(buf, buffer_begin(packet), buffer_remaining(packet)); 469 if(zone && zone->apex && domain_dname(zone->apex)) { 470 if(!buffer_available(buf, 4 + domain_dname(zone->apex)->name_size)) 471 return 0; 472 buffer_write_u32(buf, domain_dname(zone->apex)->name_size); 473 buffer_write(buf, dname_name(domain_dname(zone->apex)), 474 domain_dname(zone->apex)->name_size); 475 } else { 476 if(!buffer_available(buf, 4)) 477 return 0; 478 buffer_write_u32(buf, 0); 479 } 480 481 buffer_flip(buf); 482 /* write length of message */ 483 buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4); 484 return 1; 485 } 486 487 /* attempt to send buffer to socket, if it blocks do not send it. 488 * return 0 on success, -1 on error */ 489 static int attempt_to_send(int s, uint8_t* data, size_t len) 490 { 491 ssize_t r; 492 if(len == 0) 493 return 0; 494 r = send(s, data, len, MSG_DONTWAIT | MSG_NOSIGNAL); 495 if(r == -1) { 496 if(errno == EAGAIN || errno == EINTR || 497 errno == ENOBUFS || errno == EMSGSIZE) { 498 /* check if pipe is full, if the nonblocking fd blocks, 499 * then drop the message */ 500 return 0; 501 } 502 /* some sort of error, print it */ 503 log_msg(LOG_ERR, "dnstap collector: send failed: %s", 504 strerror(errno)); 505 return -1; 506 } 507 assert(r > 0); 508 if(r > 0) { 509 assert((size_t)r == len); 510 return 0; 511 } 512 /* Other end closed the channel? */ 513 log_msg(LOG_ERR, "dnstap collector: server child closed the channel"); 514 return -1; 515 } 516 517 void dt_collector_submit_auth_query(struct nsd* nsd, 518 #ifdef INET6 519 struct sockaddr_storage* local_addr, 520 struct sockaddr_storage* addr, 521 #else 522 struct sockaddr_in* local_addr, 523 struct sockaddr_in* addr, 524 #endif 525 socklen_t addrlen, int is_tcp, struct buffer* packet) 526 { 527 if(!nsd->dt_collector) return; 528 if(!nsd->options->dnstap_log_auth_query_messages) return; 529 if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return; 530 VERBOSITY(4, (LOG_INFO, "dnstap submit auth query")); 531 532 /* marshal data into send buffer */ 533 if(!prep_send_data(nsd->dt_collector->send_buffer, 0, local_addr, addr, addrlen, 534 is_tcp, packet, NULL)) 535 return; /* probably did not fit in buffer */ 536 537 /* attempt to send data; do not block */ 538 if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num], 539 buffer_begin(nsd->dt_collector->send_buffer), 540 buffer_remaining(nsd->dt_collector->send_buffer))) { 541 /* Something went wrong sending to the socket. Don't send to 542 * this socket again. */ 543 close(nsd->dt_collector_fd_send[nsd->this_child->child_num]); 544 nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1; 545 } 546 } 547 548 void dt_collector_submit_auth_response(struct nsd* nsd, 549 #ifdef INET6 550 struct sockaddr_storage* local_addr, 551 struct sockaddr_storage* addr, 552 #else 553 struct sockaddr_in* local_addr, 554 struct sockaddr_in* addr, 555 #endif 556 socklen_t addrlen, int is_tcp, struct buffer* packet, 557 struct zone* zone) 558 { 559 if(!nsd->dt_collector) return; 560 if(!nsd->options->dnstap_log_auth_response_messages) return; 561 if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return; 562 VERBOSITY(4, (LOG_INFO, "dnstap submit auth response")); 563 564 /* marshal data into send buffer */ 565 if(!prep_send_data(nsd->dt_collector->send_buffer, 1, local_addr, addr, addrlen, 566 is_tcp, packet, zone)) 567 return; /* probably did not fit in buffer */ 568 569 /* attempt to send data; do not block */ 570 if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num], 571 buffer_begin(nsd->dt_collector->send_buffer), 572 buffer_remaining(nsd->dt_collector->send_buffer))) { 573 /* Something went wrong sending to the socket. Don't send to 574 * this socket again. */ 575 close(nsd->dt_collector_fd_send[nsd->this_child->child_num]); 576 nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1; 577 } 578 } 579