1 /* 2 * dnstap/dnstap_collector.c -- nsd collector process for dnstap information 3 * 4 * Copyright (c) 2018, NLnet Labs. All rights reserved. 5 * 6 * See LICENSE for the license. 7 * 8 */ 9 10 #include "config.h" 11 #include <sys/types.h> 12 #include <sys/socket.h> 13 #include <errno.h> 14 #include <fcntl.h> 15 #include <unistd.h> 16 #ifndef USE_MINI_EVENT 17 # ifdef HAVE_EVENT_H 18 # include <event.h> 19 # else 20 # include <event2/event.h> 21 # include "event2/event_struct.h" 22 # include "event2/event_compat.h" 23 # endif 24 #else 25 # include "mini_event.h" 26 #endif 27 #include "dnstap/dnstap_collector.h" 28 #include "dnstap/dnstap.h" 29 #include "util.h" 30 #include "nsd.h" 31 #include "region-allocator.h" 32 #include "buffer.h" 33 #include "namedb.h" 34 #include "options.h" 35 36 struct dt_collector* dt_collector_create(struct nsd* nsd) 37 { 38 int i, sv[2]; 39 struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero( 40 sizeof(*dt_col)); 41 dt_col->count = nsd->child_count; 42 dt_col->dt_env = NULL; 43 dt_col->region = region_create(xalloc, free); 44 dt_col->send_buffer = buffer_create(dt_col->region, 45 /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + addr */ 46 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 + 47 #ifdef INET6 48 sizeof(struct sockaddr_storage) 49 #else 50 sizeof(struct sockaddr_in) 51 #endif 52 ); 53 54 /* open pipes in struct nsd */ 55 nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count, 56 sizeof(int)); 57 nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count, 58 sizeof(int)); 59 for(i=0; i<dt_col->count; i++) { 60 int fd[2]; 61 fd[0] = -1; 62 fd[1] = -1; 63 if(pipe(fd) < 0) { 64 error("dnstap_collector: cannot create pipe: %s", 65 strerror(errno)); 66 } 67 if(fcntl(fd[0], F_SETFL, O_NONBLOCK) == -1) { 68 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); 69 } 70 if(fcntl(fd[1], F_SETFL, O_NONBLOCK) == -1) { 71 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); 72 } 73 nsd->dt_collector_fd_recv[i] = fd[0]; 74 nsd->dt_collector_fd_send[i] = fd[1]; 75 } 76 77 /* open socketpair */ 78 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 79 error("dnstap_collector: cannot create socketpair: %s", 80 strerror(errno)); 81 } 82 if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) { 83 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); 84 } 85 if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) { 86 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); 87 } 88 dt_col->cmd_socket_dt = sv[0]; 89 dt_col->cmd_socket_nsd = sv[1]; 90 91 return dt_col; 92 } 93 94 void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd) 95 { 96 if(!dt_col) return; 97 free(nsd->dt_collector_fd_recv); 98 nsd->dt_collector_fd_recv = NULL; 99 free(nsd->dt_collector_fd_send); 100 nsd->dt_collector_fd_send = NULL; 101 region_destroy(dt_col->region); 102 free(dt_col); 103 } 104 105 void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd) 106 { 107 int i; 108 if(!dt_col) return; 109 if(dt_col->cmd_socket_dt != -1) { 110 close(dt_col->cmd_socket_dt); 111 dt_col->cmd_socket_dt = -1; 112 } 113 if(dt_col->cmd_socket_nsd != -1) { 114 close(dt_col->cmd_socket_nsd); 115 dt_col->cmd_socket_nsd = -1; 116 } 117 for(i=0; i<dt_col->count; i++) { 118 if(nsd->dt_collector_fd_recv[i] != -1) { 119 close(nsd->dt_collector_fd_recv[i]); 120 nsd->dt_collector_fd_recv[i] = -1; 121 } 122 if(nsd->dt_collector_fd_send[i] != -1) { 123 close(nsd->dt_collector_fd_send[i]); 124 nsd->dt_collector_fd_send[i] = -1; 125 } 126 } 127 } 128 129 /* handle command from nsd to dt collector. 130 * mostly, check for fd closed, this means we have to exit */ 131 void 132 dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg) 133 { 134 struct dt_collector* dt_col = (struct dt_collector*)arg; 135 if((event&EV_READ) != 0) { 136 event_base_loopexit(dt_col->event_base, NULL); 137 } 138 } 139 140 /* read data from fd into buffer, true when message is complete */ 141 static int read_into_buffer(int fd, struct buffer* buf) 142 { 143 size_t msglen; 144 ssize_t r; 145 if(buffer_position(buf) < 4) { 146 /* read the length of the message */ 147 r = read(fd, buffer_current(buf), 4 - buffer_position(buf)); 148 if(r == -1) { 149 if(errno == EAGAIN || errno == EINTR) { 150 /* continue to read later */ 151 return 0; 152 } 153 log_msg(LOG_ERR, "dnstap collector: read failed: %s", 154 strerror(errno)); 155 return 0; 156 } 157 buffer_skip(buf, r); 158 if(buffer_position(buf) < 4) 159 return 0; /* continue to read more msglen later */ 160 } 161 162 /* msglen complete */ 163 msglen = buffer_read_u32_at(buf, 0); 164 /* assert we have enough space, if we don't and we wanted to continue, 165 * we would have to skip the message somehow, but that should never 166 * happen because send_buffer and receive_buffer have the same size */ 167 assert(buffer_capacity(buf) >= msglen + 4); 168 r = read(fd, buffer_current(buf), msglen - (buffer_position(buf) - 4)); 169 if(r == -1) { 170 if(errno == EAGAIN || errno == EINTR) { 171 /* continue to read later */ 172 return 0; 173 } 174 log_msg(LOG_ERR, "dnstap collector: read failed: %s", 175 strerror(errno)); 176 return 0; 177 } 178 buffer_skip(buf, r); 179 if(buffer_position(buf) < 4 + msglen) 180 return 0; /* read more msg later */ 181 182 /* msg complete */ 183 buffer_flip(buf); 184 return 1; 185 } 186 187 /* submit the content of the buffer received to dnstap */ 188 static void 189 dt_submit_content(struct dt_env* dt_env, struct buffer* buf) 190 { 191 uint8_t is_response, is_tcp; 192 #ifdef INET6 193 struct sockaddr_storage addr; 194 #else 195 struct sockaddr_in addr; 196 #endif 197 socklen_t addrlen; 198 size_t pktlen; 199 uint8_t* data; 200 size_t zonelen; 201 uint8_t* zone; 202 203 /* parse content from buffer */ 204 if(!buffer_available(buf, 4+1+4)) return; 205 buffer_skip(buf, 4); /* skip msglen */ 206 is_response = buffer_read_u8(buf); 207 addrlen = buffer_read_u32(buf); 208 if(addrlen > sizeof(addr)) return; 209 if(!buffer_available(buf, addrlen)) return; 210 buffer_read(buf, &addr, addrlen); 211 if(!buffer_available(buf, 1+4)) return; 212 is_tcp = buffer_read_u8(buf); 213 pktlen = buffer_read_u32(buf); 214 if(!buffer_available(buf, pktlen)) return; 215 data = buffer_current(buf); 216 buffer_skip(buf, pktlen); 217 if(!buffer_available(buf, 4)) return; 218 zonelen = buffer_read_u32(buf); 219 if(zonelen == 0) { 220 zone = NULL; 221 } else { 222 if(zonelen > MAXDOMAINLEN) return; 223 if(!buffer_available(buf, zonelen)) return; 224 zone = buffer_current(buf); 225 buffer_skip(buf, zonelen); 226 } 227 228 /* submit it */ 229 if(is_response) { 230 dt_msg_send_auth_response(dt_env, &addr, is_tcp, zone, 231 zonelen, data, pktlen); 232 } else { 233 dt_msg_send_auth_query(dt_env, &addr, is_tcp, zone, 234 zonelen, data, pktlen); 235 } 236 } 237 238 /* handle input from worker for dnstap */ 239 void 240 dt_handle_input(int fd, short event, void* arg) 241 { 242 struct dt_collector_input* dt_input = (struct dt_collector_input*)arg; 243 if((event&EV_READ) != 0) { 244 /* read */ 245 if(!read_into_buffer(fd, dt_input->buffer)) 246 return; 247 248 /* once data is complete, write it to dnstap */ 249 VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d", 250 (int)buffer_remaining(dt_input->buffer))); 251 if(dt_input->dt_collector->dt_env) { 252 dt_submit_content(dt_input->dt_collector->dt_env, 253 dt_input->buffer); 254 } 255 256 /* clear buffer for next message */ 257 buffer_clear(dt_input->buffer); 258 } 259 } 260 261 /* init dnstap */ 262 static void dt_init_dnstap(struct dt_collector* dt_col, struct nsd* nsd) 263 { 264 int num_workers = 1; 265 #ifdef HAVE_CHROOT 266 if(nsd->chrootdir && nsd->chrootdir[0]) { 267 int l = strlen(nsd->chrootdir)-1; /* ends in trailing slash */ 268 if (nsd->options->dnstap_socket_path && 269 nsd->options->dnstap_socket_path[0] == '/' && 270 strncmp(nsd->options->dnstap_socket_path, 271 nsd->chrootdir, l) == 0) 272 nsd->options->dnstap_socket_path += l; 273 } 274 #endif 275 dt_col->dt_env = dt_create(nsd->options->dnstap_socket_path, num_workers); 276 if(!dt_col->dt_env) { 277 log_msg(LOG_ERR, "could not create dnstap env"); 278 return; 279 } 280 dt_apply_cfg(dt_col->dt_env, nsd->options); 281 dt_init(dt_col->dt_env); 282 } 283 284 /* cleanup dt collector process for exit */ 285 static void dt_collector_cleanup(struct dt_collector* dt_col, struct nsd* nsd) 286 { 287 int i; 288 dt_delete(dt_col->dt_env); 289 event_del(dt_col->cmd_event); 290 for(i=0; i<dt_col->count; i++) { 291 event_del(dt_col->inputs[i].event); 292 } 293 dt_collector_close(dt_col, nsd); 294 event_base_free(dt_col->event_base); 295 #ifdef MEMCLEAN 296 free(dt_col->cmd_event); 297 if(dt_col->inputs) { 298 for(i=0; i<dt_col->count; i++) { 299 free(dt_col->inputs[i].event); 300 } 301 free(dt_col->inputs); 302 } 303 dt_collector_destroy(dt_col, nsd); 304 #endif 305 } 306 307 /* attach events to the event base to listen to the workers and cmd channel */ 308 static void dt_attach_events(struct dt_collector* dt_col, struct nsd* nsd) 309 { 310 int i; 311 /* create event base */ 312 dt_col->event_base = nsd_child_event_base(); 313 if(!dt_col->event_base) { 314 error("dnstap collector: event_base create failed"); 315 } 316 317 /* add command handler */ 318 dt_col->cmd_event = (struct event*)xalloc_zero( 319 sizeof(*dt_col->cmd_event)); 320 event_set(dt_col->cmd_event, dt_col->cmd_socket_dt, 321 EV_PERSIST|EV_READ, dt_handle_cmd_from_nsd, dt_col); 322 if(event_base_set(dt_col->event_base, dt_col->cmd_event) != 0) 323 log_msg(LOG_ERR, "dnstap collector: event_base_set failed"); 324 if(event_add(dt_col->cmd_event, NULL) != 0) 325 log_msg(LOG_ERR, "dnstap collector: event_add failed"); 326 327 /* add worker input handlers */ 328 dt_col->inputs = xalloc_array_zero(dt_col->count, 329 sizeof(*dt_col->inputs)); 330 for(i=0; i<dt_col->count; i++) { 331 dt_col->inputs[i].dt_collector = dt_col; 332 dt_col->inputs[i].event = (struct event*)xalloc_zero( 333 sizeof(struct event)); 334 event_set(dt_col->inputs[i].event, 335 nsd->dt_collector_fd_recv[i], EV_PERSIST|EV_READ, 336 dt_handle_input, &dt_col->inputs[i]); 337 if(event_base_set(dt_col->event_base, 338 dt_col->inputs[i].event) != 0) 339 log_msg(LOG_ERR, "dnstap collector: event_base_set failed"); 340 if(event_add(dt_col->inputs[i].event, NULL) != 0) 341 log_msg(LOG_ERR, "dnstap collector: event_add failed"); 342 343 dt_col->inputs[i].buffer = buffer_create(dt_col->region, 344 /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + addr */ 345 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 + 346 #ifdef INET6 347 sizeof(struct sockaddr_storage) 348 #else 349 sizeof(struct sockaddr_in) 350 #endif 351 ); 352 assert(buffer_capacity(dt_col->inputs[i].buffer) == 353 buffer_capacity(dt_col->send_buffer)); 354 } 355 } 356 357 /* the dnstap collector process main routine */ 358 static void dt_collector_run(struct dt_collector* dt_col, struct nsd* nsd) 359 { 360 /* init dnstap */ 361 VERBOSITY(1, (LOG_INFO, "dnstap collector started")); 362 dt_init_dnstap(dt_col, nsd); 363 dt_attach_events(dt_col, nsd); 364 365 /* run */ 366 if(event_base_loop(dt_col->event_base, 0) == -1) { 367 error("dnstap collector: event_base_loop failed"); 368 } 369 370 /* cleanup and done */ 371 VERBOSITY(1, (LOG_INFO, "dnstap collector stopped")); 372 dt_collector_cleanup(dt_col, nsd); 373 exit(0); 374 } 375 376 void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd) 377 { 378 /* fork */ 379 dt_col->dt_pid = fork(); 380 if(dt_col->dt_pid == -1) { 381 error("dnstap_collector: fork failed: %s", strerror(errno)); 382 } 383 if(dt_col->dt_pid == 0) { 384 /* the dt collector process is this */ 385 /* close the nsd side of the command channel */ 386 close(dt_col->cmd_socket_nsd); 387 dt_col->cmd_socket_nsd = -1; 388 dt_collector_run(dt_col, nsd); 389 /* NOTREACH */ 390 exit(0); 391 } else { 392 /* the parent continues on, with starting NSD */ 393 /* close the dt side of the command channel */ 394 close(dt_col->cmd_socket_dt); 395 dt_col->cmd_socket_dt = -1; 396 } 397 } 398 399 /* put data for sending to the collector process into the buffer */ 400 static int 401 prep_send_data(struct buffer* buf, uint8_t is_response, 402 #ifdef INET6 403 struct sockaddr_storage* addr, 404 #else 405 struct sockaddr_in* addr, 406 #endif 407 socklen_t addrlen, int is_tcp, struct buffer* packet, 408 struct zone* zone) 409 { 410 buffer_clear(buf); 411 if(!buffer_available(buf, 4+1+4+addrlen+1+4+buffer_remaining(packet))) 412 return 0; /* does not fit in send_buffer, log is dropped */ 413 buffer_skip(buf, 4); /* the length of the message goes here */ 414 buffer_write_u8(buf, is_response); 415 buffer_write_u32(buf, addrlen); 416 buffer_write(buf, addr, (size_t)addrlen); 417 buffer_write_u8(buf, (is_tcp?1:0)); 418 buffer_write_u32(buf, buffer_remaining(packet)); 419 buffer_write(buf, buffer_begin(packet), buffer_remaining(packet)); 420 if(zone && zone->apex && domain_dname(zone->apex)) { 421 if(!buffer_available(buf, 4 + domain_dname(zone->apex)->name_size)) 422 return 0; 423 buffer_write_u32(buf, domain_dname(zone->apex)->name_size); 424 buffer_write(buf, dname_name(domain_dname(zone->apex)), 425 domain_dname(zone->apex)->name_size); 426 } else { 427 if(!buffer_available(buf, 4)) 428 return 0; 429 buffer_write_u32(buf, 0); 430 } 431 432 buffer_flip(buf); 433 /* write length of message */ 434 buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4); 435 return 1; 436 } 437 438 /* attempt to write buffer to socket, if it blocks do not write it. */ 439 static void attempt_to_write(int s, uint8_t* data, size_t len) 440 { 441 size_t total = 0; 442 ssize_t r; 443 while(total < len) { 444 r = write(s, data+total, len-total); 445 if(r == -1) { 446 if(errno == EAGAIN && total == 0) { 447 /* on first write part, check if pipe is full, 448 * if the nonblocking fd blocks, then drop 449 * the message */ 450 return; 451 } 452 if(errno != EAGAIN && errno != EINTR) { 453 /* some sort of error, print it and drop it */ 454 log_msg(LOG_ERR, 455 "dnstap collector: write failed: %s", 456 strerror(errno)); 457 return; 458 } 459 /* continue and write this again */ 460 /* for EINTR, we have to do this, 461 * for EAGAIN, if the first part succeeded, we have 462 * to continue to write the remainder of the message, 463 * because otherwise partial messages confuse the 464 * receiver. */ 465 continue; 466 } 467 total += r; 468 } 469 } 470 471 void dt_collector_submit_auth_query(struct nsd* nsd, 472 #ifdef INET6 473 struct sockaddr_storage* addr, 474 #else 475 struct sockaddr_in* addr, 476 #endif 477 socklen_t addrlen, int is_tcp, struct buffer* packet) 478 { 479 if(!nsd->dt_collector) return; 480 if(!nsd->options->dnstap_log_auth_query_messages) return; 481 VERBOSITY(4, (LOG_INFO, "dnstap submit auth query")); 482 483 /* marshal data into send buffer */ 484 if(!prep_send_data(nsd->dt_collector->send_buffer, 0, addr, addrlen, 485 is_tcp, packet, NULL)) 486 return; /* probably did not fit in buffer */ 487 488 /* attempt to send data; do not block */ 489 attempt_to_write(nsd->dt_collector_fd_send[nsd->this_child->child_num], 490 buffer_begin(nsd->dt_collector->send_buffer), 491 buffer_remaining(nsd->dt_collector->send_buffer)); 492 } 493 494 void dt_collector_submit_auth_response(struct nsd* nsd, 495 #ifdef INET6 496 struct sockaddr_storage* addr, 497 #else 498 struct sockaddr_in* addr, 499 #endif 500 socklen_t addrlen, int is_tcp, struct buffer* packet, 501 struct zone* zone) 502 { 503 if(!nsd->dt_collector) return; 504 if(!nsd->options->dnstap_log_auth_response_messages) return; 505 VERBOSITY(4, (LOG_INFO, "dnstap submit auth response")); 506 507 /* marshal data into send buffer */ 508 if(!prep_send_data(nsd->dt_collector->send_buffer, 1, addr, addrlen, 509 is_tcp, packet, zone)) 510 return; /* probably did not fit in buffer */ 511 512 /* attempt to send data; do not block */ 513 attempt_to_write(nsd->dt_collector_fd_send[nsd->this_child->child_num], 514 buffer_begin(nsd->dt_collector->send_buffer), 515 buffer_remaining(nsd->dt_collector->send_buffer)); 516 } 517