1 /* 2 * testcode/delayer.c - debug program that delays queries to a server. 3 * 4 * Copyright (c) 2008, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 /** 37 * \file 38 * 39 * This program delays queries made. It performs as a proxy to another 40 * server and delays queries to it. 41 */ 42 43 #include "config.h" 44 #ifdef HAVE_GETOPT_H 45 #include <getopt.h> 46 #endif 47 #ifdef HAVE_TIME_H 48 #include <time.h> 49 #endif 50 #include <sys/time.h> 51 #include "util/net_help.h" 52 #include "util/config_file.h" 53 #include "sldns/sbuffer.h" 54 #include <signal.h> 55 56 /** number of reads per select for delayer */ 57 #define TRIES_PER_SELECT 100 58 59 /** 60 * The ring buffer 61 */ 62 struct ringbuf { 63 /** base of buffer */ 64 uint8_t* buf; 65 /** size of buffer */ 66 size_t size; 67 /** low mark, items start here */ 68 size_t low; 69 /** high mark, items end here */ 70 size_t high; 71 }; 72 73 /** 74 * List of proxy fds that return replies from the server to our clients. 75 */ 76 struct proxy { 77 /** the fd to listen for replies from server */ 78 int s; 79 /** last time this was used */ 80 struct timeval lastuse; 81 /** remote address */ 82 struct sockaddr_storage addr; 83 /** length of addr */ 84 socklen_t addr_len; 85 /** number of queries waiting (in total) */ 86 size_t numwait; 87 /** number of queries sent to server (in total) */ 88 size_t numsent; 89 /** numberof answers returned to client (in total) */ 90 size_t numreturn; 91 /** how many times repurposed */ 92 size_t numreuse; 93 /** next in proxylist */ 94 struct proxy* next; 95 }; 96 97 /** 98 * An item that has to be TCP relayed 99 */ 100 struct tcp_send_list { 101 /** the data item */ 102 uint8_t* item; 103 /** size of item */ 104 size_t len; 105 /** time when the item can be transmitted on */ 106 struct timeval wait; 107 /** how much of the item has already been transmitted */ 108 size_t done; 109 /** next in list */ 110 struct tcp_send_list* next; 111 }; 112 113 /** 114 * List of TCP proxy fd pairs to TCP connect client to server 115 */ 116 struct tcp_proxy { 117 /** the fd to listen for client query */ 118 int client_s; 119 /** the fd to listen for server answer */ 120 int server_s; 121 122 /** remote client address */ 123 struct sockaddr_storage addr; 124 /** length of address */ 125 socklen_t addr_len; 126 /** timeout on this entry */ 127 struct timeval timeout; 128 129 /** list of query items to send to server */ 130 struct tcp_send_list* querylist; 131 /** last in query list */ 132 struct tcp_send_list* querylast; 133 /** list of answer items to send to client */ 134 struct tcp_send_list* answerlist; 135 /** last in answerlist */ 136 struct tcp_send_list* answerlast; 137 138 /** next in list */ 139 struct tcp_proxy* next; 140 }; 141 142 /** usage information for delayer */ 143 static void usage(char* argv[]) 144 { 145 printf("usage: %s [options]\n", argv[0]); 146 printf(" -f addr : use addr, forward to that server, @port.\n"); 147 printf(" -b addr : bind to this address to listen.\n"); 148 printf(" -p port : bind to this port (use 0 for random).\n"); 149 printf(" -m mem : use this much memory for waiting queries.\n"); 150 printf(" -d delay: UDP queries are delayed n milliseconds.\n"); 151 printf(" TCP is delayed twice (on send, on recv).\n"); 152 printf(" -h : this help message\n"); 153 exit(1); 154 } 155 156 /** timeval compare, t1 < t2 */ 157 static int 158 dl_tv_smaller(struct timeval* t1, const struct timeval* t2) 159 { 160 #ifndef S_SPLINT_S 161 if(t1->tv_sec < t2->tv_sec) 162 return 1; 163 if(t1->tv_sec == t2->tv_sec && 164 t1->tv_usec < t2->tv_usec) 165 return 1; 166 #endif 167 return 0; 168 } 169 170 /** timeval add, t1 += t2 */ 171 static void 172 dl_tv_add(struct timeval* t1, const struct timeval* t2) 173 { 174 #ifndef S_SPLINT_S 175 t1->tv_sec += t2->tv_sec; 176 t1->tv_usec += t2->tv_usec; 177 while(t1->tv_usec >= 1000000) { 178 t1->tv_usec -= 1000000; 179 t1->tv_sec++; 180 } 181 #endif 182 } 183 184 /** timeval subtract, t1 -= t2 */ 185 static void 186 dl_tv_subtract(struct timeval* t1, const struct timeval* t2) 187 { 188 #ifndef S_SPLINT_S 189 t1->tv_sec -= t2->tv_sec; 190 if(t1->tv_usec >= t2->tv_usec) { 191 t1->tv_usec -= t2->tv_usec; 192 } else { 193 t1->tv_sec--; 194 t1->tv_usec = 1000000-(t2->tv_usec-t1->tv_usec); 195 } 196 #endif 197 } 198 199 200 /** create new ring buffer */ 201 static struct ringbuf* 202 ring_create(size_t sz) 203 { 204 struct ringbuf* r = (struct ringbuf*)calloc(1, sizeof(*r)); 205 if(!r) fatal_exit("out of memory"); 206 r->buf = (uint8_t*)malloc(sz); 207 if(!r->buf) fatal_exit("out of memory"); 208 r->size = sz; 209 r->low = 0; 210 r->high = 0; 211 return r; 212 } 213 214 /** delete ring buffer */ 215 static void 216 ring_delete(struct ringbuf* r) 217 { 218 if(!r) return; 219 free(r->buf); 220 free(r); 221 } 222 223 /** add entry to ringbuffer */ 224 static void 225 ring_add(struct ringbuf* r, sldns_buffer* pkt, struct timeval* now, 226 struct timeval* delay, struct proxy* p) 227 { 228 /* time -- proxy* -- 16bitlen -- message */ 229 uint16_t len = (uint16_t)sldns_buffer_limit(pkt); 230 struct timeval when; 231 size_t needed; 232 uint8_t* where = NULL; 233 log_assert(sldns_buffer_limit(pkt) <= 65535); 234 needed = sizeof(when) + sizeof(p) + sizeof(len) + len; 235 /* put item into ringbuffer */ 236 if(r->low < r->high) { 237 /* used part is in the middle */ 238 if(r->size - r->high >= needed) { 239 where = r->buf + r->high; 240 r->high += needed; 241 } else if(r->low > needed) { 242 /* wrap around ringbuffer */ 243 /* make sure r->low == r->high means empty */ 244 /* so r->low == r->high cannot be used to signify 245 * a completely full ringbuf */ 246 if(r->size - r->high > sizeof(when)+sizeof(p)) { 247 /* zero entry at end of buffer */ 248 memset(r->buf+r->high, 0, 249 sizeof(when)+sizeof(p)); 250 } 251 where = r->buf; 252 r->high = needed; 253 } else { 254 /* drop message */ 255 log_warn("warning: mem full, dropped message"); 256 return; 257 } 258 } else { 259 /* empty */ 260 if(r->high == r->low) { 261 where = r->buf; 262 r->low = 0; 263 r->high = needed; 264 /* unused part is in the middle */ 265 /* so ringbuffer has wrapped around */ 266 } else if(r->low - r->high > needed) { 267 where = r->buf + r->high; 268 r->high += needed; 269 } else { 270 log_warn("warning: mem full, dropped message"); 271 return; 272 } 273 } 274 when = *now; 275 dl_tv_add(&when, delay); 276 /* copy it at where part */ 277 log_assert(where != NULL); 278 memmove(where, &when, sizeof(when)); 279 memmove(where+sizeof(when), &p, sizeof(p)); 280 memmove(where+sizeof(when)+sizeof(p), &len, sizeof(len)); 281 memmove(where+sizeof(when)+sizeof(p)+sizeof(len), 282 sldns_buffer_begin(pkt), len); 283 } 284 285 /** see if the ringbuffer is empty */ 286 static int 287 ring_empty(struct ringbuf* r) 288 { 289 return (r->low == r->high); 290 } 291 292 /** peek at timevalue for next item in ring */ 293 static struct timeval* 294 ring_peek_time(struct ringbuf* r) 295 { 296 if(ring_empty(r)) 297 return NULL; 298 return (struct timeval*)&r->buf[r->low]; 299 } 300 301 /** get entry from ringbuffer */ 302 static int 303 ring_pop(struct ringbuf* r, sldns_buffer* pkt, struct timeval* tv, 304 struct proxy** p) 305 { 306 /* time -- proxy* -- 16bitlen -- message */ 307 uint16_t len; 308 uint8_t* where = NULL; 309 size_t done; 310 if(r->low == r->high) 311 return 0; 312 where = r->buf + r->low; 313 memmove(tv, where, sizeof(*tv)); 314 memmove(p, where+sizeof(*tv), sizeof(*p)); 315 memmove(&len, where+sizeof(*tv)+sizeof(*p), sizeof(len)); 316 memmove(sldns_buffer_begin(pkt), 317 where+sizeof(*tv)+sizeof(*p)+sizeof(len), len); 318 sldns_buffer_set_limit(pkt, (size_t)len); 319 done = sizeof(*tv)+sizeof(*p)+sizeof(len)+len; 320 /* move lowmark */ 321 if(r->low < r->high) { 322 /* used part in middle */ 323 log_assert(r->high - r->low >= done); 324 r->low += done; 325 } else { 326 /* unused part in middle */ 327 log_assert(r->size - r->low >= done); 328 r->low += done; 329 if(r->size - r->low > sizeof(*tv)+sizeof(*p)) { 330 /* see if it is zeroed; means end of buffer */ 331 struct proxy* pz; 332 memmove(&pz, r->buf+r->low+sizeof(*tv), sizeof(pz)); 333 if(pz == NULL) 334 r->low = 0; 335 } else r->low = 0; 336 } 337 if(r->low == r->high) { 338 r->low = 0; /* reset if empty */ 339 r->high = 0; 340 } 341 return 1; 342 } 343 344 /** signal handler global info */ 345 static volatile int do_quit = 0; 346 347 /** signal handler for user quit */ 348 static RETSIGTYPE delayer_sigh(int sig) 349 { 350 printf("exit on signal %d\n", sig); 351 do_quit = 1; 352 } 353 354 /** send out waiting packets */ 355 static void 356 service_send(struct ringbuf* ring, struct timeval* now, sldns_buffer* pkt, 357 struct sockaddr_storage* srv_addr, socklen_t srv_len) 358 { 359 struct proxy* p; 360 struct timeval tv; 361 ssize_t sent; 362 while(!ring_empty(ring) && 363 dl_tv_smaller(ring_peek_time(ring), now)) { 364 /* this items needs to be sent out */ 365 if(!ring_pop(ring, pkt, &tv, &p)) 366 fatal_exit("ringbuf error: pop failed"); 367 verbose(1, "send out query %d.%6.6d", 368 (unsigned)tv.tv_sec, (unsigned)tv.tv_usec); 369 log_addr(1, "from client", &p->addr, p->addr_len); 370 /* send it */ 371 sent = sendto(p->s, (void*)sldns_buffer_begin(pkt), 372 sldns_buffer_limit(pkt), 0, 373 (struct sockaddr*)srv_addr, srv_len); 374 if(sent == -1) { 375 log_err("sendto: %s", sock_strerror(errno)); 376 } else if(sent != (ssize_t)sldns_buffer_limit(pkt)) { 377 log_err("sendto: partial send"); 378 } 379 p->lastuse = *now; 380 p->numsent++; 381 } 382 } 383 384 /** do proxy for one readable client */ 385 static void 386 do_proxy(struct proxy* p, int retsock, sldns_buffer* pkt) 387 { 388 int i; 389 ssize_t r; 390 for(i=0; i<TRIES_PER_SELECT; i++) { 391 r = recv(p->s, (void*)sldns_buffer_begin(pkt), 392 sldns_buffer_capacity(pkt), 0); 393 if(r == -1) { 394 #ifndef USE_WINSOCK 395 if(errno == EAGAIN || errno == EINTR) 396 return; 397 #else 398 if(WSAGetLastError() == WSAEINPROGRESS || 399 WSAGetLastError() == WSAEWOULDBLOCK) 400 return; 401 #endif 402 log_err("recv: %s", sock_strerror(errno)); 403 return; 404 } 405 sldns_buffer_set_limit(pkt, (size_t)r); 406 log_addr(1, "return reply to client", &p->addr, p->addr_len); 407 /* send reply back to the real client */ 408 p->numreturn++; 409 r = sendto(retsock, (void*)sldns_buffer_begin(pkt), (size_t)r, 410 0, (struct sockaddr*)&p->addr, p->addr_len); 411 if(r == -1) { 412 log_err("sendto: %s", sock_strerror(errno)); 413 } 414 } 415 } 416 417 /** proxy return replies to clients */ 418 static void 419 service_proxy(fd_set* rset, int retsock, struct proxy* proxies, 420 sldns_buffer* pkt, struct timeval* now) 421 { 422 struct proxy* p; 423 for(p = proxies; p; p = p->next) { 424 if(FD_ISSET(p->s, rset)) { 425 p->lastuse = *now; 426 do_proxy(p, retsock, pkt); 427 } 428 } 429 } 430 431 /** find or else create proxy for this remote client */ 432 static struct proxy* 433 find_create_proxy(struct sockaddr_storage* from, socklen_t from_len, 434 fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6, 435 struct timeval* now, struct timeval* reuse_timeout) 436 { 437 struct proxy* p; 438 struct timeval t; 439 for(p = *proxies; p; p = p->next) { 440 if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0) 441 return p; 442 } 443 /* possibly: reuse lapsed entries */ 444 for(p = *proxies; p; p = p->next) { 445 if(p->numwait > p->numsent || p->numsent > p->numreturn) 446 continue; 447 t = *now; 448 dl_tv_subtract(&t, &p->lastuse); 449 if(dl_tv_smaller(&t, reuse_timeout)) 450 continue; 451 /* yes! */ 452 verbose(1, "reuse existing entry"); 453 memmove(&p->addr, from, from_len); 454 p->addr_len = from_len; 455 p->numreuse++; 456 return p; 457 } 458 /* create new */ 459 p = (struct proxy*)calloc(1, sizeof(*p)); 460 if(!p) fatal_exit("out of memory"); 461 p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0); 462 if(p->s == -1) { 463 fatal_exit("socket: %s", sock_strerror(errno)); 464 } 465 fd_set_nonblock(p->s); 466 memmove(&p->addr, from, from_len); 467 p->addr_len = from_len; 468 p->next = *proxies; 469 *proxies = p; 470 FD_SET(FD_SET_T p->s, rorig); 471 if(p->s+1 > *max) 472 *max = p->s+1; 473 return p; 474 } 475 476 /** recv new waiting packets */ 477 static void 478 service_recv(int s, struct ringbuf* ring, sldns_buffer* pkt, 479 fd_set* rorig, int* max, struct proxy** proxies, 480 struct sockaddr_storage* srv_addr, socklen_t srv_len, 481 struct timeval* now, struct timeval* delay, struct timeval* reuse) 482 { 483 int i; 484 struct sockaddr_storage from; 485 socklen_t from_len; 486 ssize_t len; 487 struct proxy* p; 488 for(i=0; i<TRIES_PER_SELECT; i++) { 489 from_len = (socklen_t)sizeof(from); 490 len = recvfrom(s, (void*)sldns_buffer_begin(pkt), 491 sldns_buffer_capacity(pkt), 0, 492 (struct sockaddr*)&from, &from_len); 493 if(len < 0) { 494 #ifndef USE_WINSOCK 495 if(errno == EAGAIN || errno == EINTR) 496 return; 497 #else 498 if(WSAGetLastError() == WSAEWOULDBLOCK || 499 WSAGetLastError() == WSAEINPROGRESS) 500 return; 501 #endif 502 fatal_exit("recvfrom: %s", sock_strerror(errno)); 503 } 504 sldns_buffer_set_limit(pkt, (size_t)len); 505 /* find its proxy element */ 506 p = find_create_proxy(&from, from_len, rorig, max, proxies, 507 addr_is_ip6(srv_addr, srv_len), now, reuse); 508 if(!p) fatal_exit("error: cannot find or create proxy"); 509 p->lastuse = *now; 510 ring_add(ring, pkt, now, delay, p); 511 p->numwait++; 512 log_addr(1, "recv from client", &p->addr, p->addr_len); 513 } 514 } 515 516 /** delete tcp proxy */ 517 static void 518 tcp_proxy_delete(struct tcp_proxy* p) 519 { 520 struct tcp_send_list* s, *sn; 521 if(!p) 522 return; 523 log_addr(1, "delete tcp proxy", &p->addr, p->addr_len); 524 s = p->querylist; 525 while(s) { 526 sn = s->next; 527 free(s->item); 528 free(s); 529 s = sn; 530 } 531 s = p->answerlist; 532 while(s) { 533 sn = s->next; 534 free(s->item); 535 free(s); 536 s = sn; 537 } 538 sock_close(p->client_s); 539 if(p->server_s != -1) 540 sock_close(p->server_s); 541 free(p); 542 } 543 544 /** accept new TCP connections, and set them up */ 545 static void 546 service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies, 547 struct sockaddr_storage* srv_addr, socklen_t srv_len, 548 struct timeval* now, struct timeval* tcp_timeout) 549 { 550 int newfd; 551 struct sockaddr_storage addr; 552 struct tcp_proxy* p; 553 socklen_t addr_len; 554 newfd = accept(s, (struct sockaddr*)&addr, &addr_len); 555 if(newfd == -1) { 556 #ifndef USE_WINSOCK 557 if(errno == EAGAIN || errno == EINTR) 558 return; 559 #else 560 if(WSAGetLastError() == WSAEWOULDBLOCK || 561 WSAGetLastError() == WSAEINPROGRESS || 562 WSAGetLastError() == WSAECONNRESET) 563 return; 564 #endif 565 fatal_exit("accept: %s", sock_strerror(errno)); 566 } 567 p = (struct tcp_proxy*)calloc(1, sizeof(*p)); 568 if(!p) fatal_exit("out of memory"); 569 memmove(&p->addr, &addr, addr_len); 570 p->addr_len = addr_len; 571 log_addr(1, "new tcp proxy", &p->addr, p->addr_len); 572 p->client_s = newfd; 573 p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET, 574 SOCK_STREAM, 0); 575 if(p->server_s == -1) { 576 fatal_exit("tcp socket: %s", sock_strerror(errno)); 577 } 578 fd_set_nonblock(p->client_s); 579 fd_set_nonblock(p->server_s); 580 if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) { 581 #ifndef USE_WINSOCK 582 if(errno != EINPROGRESS) { 583 log_err("tcp connect: %s", strerror(errno)); 584 #else 585 if(WSAGetLastError() != WSAEWOULDBLOCK && 586 WSAGetLastError() != WSAEINPROGRESS) { 587 log_err("tcp connect: %s", 588 wsa_strerror(WSAGetLastError())); 589 #endif 590 sock_close(p->server_s); 591 sock_close(p->client_s); 592 free(p); 593 return; 594 } 595 } 596 p->timeout = *now; 597 dl_tv_add(&p->timeout, tcp_timeout); 598 599 /* listen to client and server */ 600 FD_SET(FD_SET_T p->client_s, rorig); 601 FD_SET(FD_SET_T p->server_s, rorig); 602 if(p->client_s+1 > *max) 603 *max = p->client_s+1; 604 if(p->server_s+1 > *max) 605 *max = p->server_s+1; 606 607 /* add into proxy list */ 608 p->next = *proxies; 609 *proxies = p; 610 } 611 612 /** relay TCP, read a part */ 613 static int 614 tcp_relay_read(int s, struct tcp_send_list** first, 615 struct tcp_send_list** last, struct timeval* now, 616 struct timeval* delay, sldns_buffer* pkt) 617 { 618 struct tcp_send_list* item; 619 ssize_t r = recv(s, (void*)sldns_buffer_begin(pkt), 620 sldns_buffer_capacity(pkt), 0); 621 if(r == -1) { 622 #ifndef USE_WINSOCK 623 if(errno == EINTR || errno == EAGAIN) 624 return 1; 625 #else 626 if(WSAGetLastError() == WSAEINPROGRESS || 627 WSAGetLastError() == WSAEWOULDBLOCK) 628 return 1; 629 #endif 630 log_err("tcp read: %s", sock_strerror(errno)); 631 return 0; 632 } else if(r == 0) { 633 /* connection closed */ 634 return 0; 635 } 636 item = (struct tcp_send_list*)malloc(sizeof(*item)); 637 if(!item) { 638 log_err("out of memory"); 639 return 0; 640 } 641 verbose(1, "read item len %d", (int)r); 642 item->len = (size_t)r; 643 item->item = memdup(sldns_buffer_begin(pkt), item->len); 644 if(!item->item) { 645 free(item); 646 log_err("out of memory"); 647 return 0; 648 } 649 item->done = 0; 650 item->wait = *now; 651 dl_tv_add(&item->wait, delay); 652 item->next = NULL; 653 654 /* link in */ 655 if(*first) { 656 (*last)->next = item; 657 } else { 658 *first = item; 659 } 660 *last = item; 661 return 1; 662 } 663 664 /** relay TCP, write a part */ 665 static int 666 tcp_relay_write(int s, struct tcp_send_list** first, 667 struct tcp_send_list** last, struct timeval* now) 668 { 669 ssize_t r; 670 struct tcp_send_list* p; 671 while(*first) { 672 p = *first; 673 /* is the item ready? */ 674 if(!dl_tv_smaller(&p->wait, now)) 675 return 1; 676 /* write it */ 677 r = send(s, (void*)(p->item + p->done), p->len - p->done, 0); 678 if(r == -1) { 679 #ifndef USE_WINSOCK 680 if(errno == EAGAIN || errno == EINTR) 681 return 1; 682 #else 683 if(WSAGetLastError() == WSAEWOULDBLOCK || 684 WSAGetLastError() == WSAEINPROGRESS) 685 return 1; 686 #endif 687 log_err("tcp write: %s", sock_strerror(errno)); 688 return 0; 689 } else if(r == 0) { 690 /* closed */ 691 return 0; 692 } 693 /* account it */ 694 p->done += (size_t)r; 695 verbose(1, "write item %d of %d", (int)p->done, (int)p->len); 696 if(p->done >= p->len) { 697 free(p->item); 698 *first = p->next; 699 if(!*first) 700 *last = NULL; 701 free(p); 702 } else { 703 /* partial write */ 704 return 1; 705 } 706 } 707 return 1; 708 } 709 710 /** perform TCP relaying */ 711 static void 712 service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now, 713 struct timeval* delay, struct timeval* tcp_timeout, sldns_buffer* pkt, 714 fd_set* rset, fd_set* rorig, fd_set* worig) 715 { 716 struct tcp_proxy* p, **prev; 717 struct timeval tout; 718 int delete_it; 719 p = *tcp_proxies; 720 prev = tcp_proxies; 721 tout = *now; 722 dl_tv_add(&tout, tcp_timeout); 723 724 while(p) { 725 delete_it = 0; 726 /* can we receive further queries? */ 727 if(!delete_it && FD_ISSET(p->client_s, rset)) { 728 p->timeout = tout; 729 log_addr(1, "read tcp query", &p->addr, p->addr_len); 730 if(!tcp_relay_read(p->client_s, &p->querylist, 731 &p->querylast, now, delay, pkt)) 732 delete_it = 1; 733 } 734 /* can we receive further answers? */ 735 if(!delete_it && p->server_s != -1 && 736 FD_ISSET(p->server_s, rset)) { 737 p->timeout = tout; 738 log_addr(1, "read tcp answer", &p->addr, p->addr_len); 739 if(!tcp_relay_read(p->server_s, &p->answerlist, 740 &p->answerlast, now, delay, pkt)) { 741 sock_close(p->server_s); 742 FD_CLR(FD_SET_T p->server_s, worig); 743 FD_CLR(FD_SET_T p->server_s, rorig); 744 p->server_s = -1; 745 } 746 } 747 /* can we send on further queries */ 748 if(!delete_it && p->querylist && p->server_s != -1) { 749 p->timeout = tout; 750 if(dl_tv_smaller(&p->querylist->wait, now)) 751 log_addr(1, "write tcp query", 752 &p->addr, p->addr_len); 753 if(!tcp_relay_write(p->server_s, &p->querylist, 754 &p->querylast, now)) 755 delete_it = 1; 756 if(p->querylist && 757 dl_tv_smaller(&p->querylist->wait, now)) 758 FD_SET(FD_SET_T p->server_s, worig); 759 else FD_CLR(FD_SET_T p->server_s, worig); 760 } 761 762 /* can we send on further answers */ 763 if(!delete_it && p->answerlist) { 764 p->timeout = tout; 765 if(dl_tv_smaller(&p->answerlist->wait, now)) 766 log_addr(1, "write tcp answer", 767 &p->addr, p->addr_len); 768 if(!tcp_relay_write(p->client_s, &p->answerlist, 769 &p->answerlast, now)) 770 delete_it = 1; 771 if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, 772 now)) 773 FD_SET(FD_SET_T p->client_s, worig); 774 else FD_CLR(FD_SET_T p->client_s, worig); 775 if(!p->answerlist && p->server_s == -1) 776 delete_it = 1; 777 } 778 779 /* does this entry timeout? (unused too long) */ 780 if(dl_tv_smaller(&p->timeout, now)) { 781 delete_it = 1; 782 } 783 if(delete_it) { 784 struct tcp_proxy* np = p->next; 785 *prev = np; 786 FD_CLR(FD_SET_T p->client_s, rorig); 787 FD_CLR(FD_SET_T p->client_s, worig); 788 if(p->server_s != -1) { 789 FD_CLR(FD_SET_T p->server_s, rorig); 790 FD_CLR(FD_SET_T p->server_s, worig); 791 } 792 tcp_proxy_delete(p); 793 p = np; 794 continue; 795 } 796 797 prev = &p->next; 798 p = p->next; 799 } 800 } 801 802 /** find waiting time */ 803 static int 804 service_findwait(struct timeval* now, struct timeval* wait, 805 struct ringbuf* ring, struct tcp_proxy* tcplist) 806 { 807 /* first item is the time to wait */ 808 struct timeval* peek = ring_peek_time(ring); 809 struct timeval tcv; 810 int have_tcpval = 0; 811 struct tcp_proxy* p; 812 813 /* also for TCP list the first in sendlists is the time to wait */ 814 for(p=tcplist; p; p=p->next) { 815 if(!have_tcpval) 816 tcv = p->timeout; 817 have_tcpval = 1; 818 if(dl_tv_smaller(&p->timeout, &tcv)) 819 tcv = p->timeout; 820 if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv)) 821 tcv = p->querylist->wait; 822 if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv)) 823 tcv = p->answerlist->wait; 824 } 825 if(peek) { 826 /* peek can be unaligned */ 827 /* use wait as a temp variable */ 828 memmove(wait, peek, sizeof(*wait)); 829 if(!have_tcpval) 830 tcv = *wait; 831 else if(dl_tv_smaller(wait, &tcv)) 832 tcv = *wait; 833 have_tcpval = 1; 834 } 835 if(have_tcpval) { 836 *wait = tcv; 837 dl_tv_subtract(wait, now); 838 return 1; 839 } 840 /* nothing, block */ 841 return 0; 842 } 843 844 /** clear proxy list */ 845 static void 846 proxy_list_clear(struct proxy* p) 847 { 848 char from[109]; 849 struct proxy* np; 850 int i=0, port; 851 while(p) { 852 np = p->next; 853 port = (int)ntohs(((struct sockaddr_in*)&p->addr)->sin_port); 854 if(addr_is_ip6(&p->addr, p->addr_len)) { 855 if(inet_ntop(AF_INET6, 856 &((struct sockaddr_in6*)&p->addr)->sin6_addr, 857 from, (socklen_t)sizeof(from)) == 0) 858 (void)strlcpy(from, "err", sizeof(from)); 859 } else { 860 if(inet_ntop(AF_INET, 861 &((struct sockaddr_in*)&p->addr)->sin_addr, 862 from, (socklen_t)sizeof(from)) == 0) 863 (void)strlcpy(from, "err", sizeof(from)); 864 } 865 printf("client[%d]: last %s@%d of %d : %u in, %u out, " 866 "%u returned\n", i++, from, port, (int)p->numreuse+1, 867 (unsigned)p->numwait, (unsigned)p->numsent, 868 (unsigned)p->numreturn); 869 sock_close(p->s); 870 free(p); 871 p = np; 872 } 873 } 874 875 /** clear TCP proxy list */ 876 static void 877 tcp_proxy_list_clear(struct tcp_proxy* p) 878 { 879 struct tcp_proxy* np; 880 while(p) { 881 np = p->next; 882 tcp_proxy_delete(p); 883 p = np; 884 } 885 } 886 887 /** delayer service loop */ 888 static void 889 service_loop(int udp_s, int listen_s, struct ringbuf* ring, 890 struct timeval* delay, struct timeval* reuse, 891 struct sockaddr_storage* srv_addr, socklen_t srv_len, 892 sldns_buffer* pkt) 893 { 894 fd_set rset, rorig; 895 fd_set wset, worig; 896 struct timeval now, wait; 897 int max, have_wait = 0; 898 struct proxy* proxies = NULL; 899 struct tcp_proxy* tcp_proxies = NULL; 900 struct timeval tcp_timeout; 901 tcp_timeout.tv_sec = 120; 902 tcp_timeout.tv_usec = 0; 903 #ifndef S_SPLINT_S 904 FD_ZERO(&rorig); 905 FD_ZERO(&worig); 906 FD_SET(FD_SET_T udp_s, &rorig); 907 FD_SET(FD_SET_T listen_s, &rorig); 908 #endif 909 max = udp_s + 1; 910 if(listen_s + 1 > max) max = listen_s + 1; 911 while(!do_quit) { 912 /* wait for events */ 913 rset = rorig; 914 wset = worig; 915 if(have_wait) 916 verbose(1, "wait for %d.%6.6d", 917 (unsigned)wait.tv_sec, (unsigned)wait.tv_usec); 918 else verbose(1, "wait"); 919 if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) { 920 if(errno == EAGAIN || errno == EINTR) 921 continue; 922 fatal_exit("select: %s", strerror(errno)); 923 } 924 /* get current time */ 925 if(gettimeofday(&now, NULL) < 0) { 926 if(errno == EAGAIN || errno == EINTR) 927 continue; 928 fatal_exit("gettimeofday: %s", strerror(errno)); 929 } 930 verbose(1, "process at %u.%6.6u\n", 931 (unsigned)now.tv_sec, (unsigned)now.tv_usec); 932 /* sendout delayed queries to master server (frees up buffer)*/ 933 service_send(ring, &now, pkt, srv_addr, srv_len); 934 /* proxy return replies */ 935 service_proxy(&rset, udp_s, proxies, pkt, &now); 936 /* see what can be received to start waiting */ 937 service_recv(udp_s, ring, pkt, &rorig, &max, &proxies, 938 srv_addr, srv_len, &now, delay, reuse); 939 /* see if there are new tcp connections */ 940 service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies, 941 srv_addr, srv_len, &now, &tcp_timeout); 942 /* service tcp connections */ 943 service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout, 944 pkt, &rset, &rorig, &worig); 945 /* see what next timeout is (if any) */ 946 have_wait = service_findwait(&now, &wait, ring, tcp_proxies); 947 } 948 proxy_list_clear(proxies); 949 tcp_proxy_list_clear(tcp_proxies); 950 } 951 952 /** delayer main service routine */ 953 static void 954 service(const char* bind_str, int bindport, const char* serv_str, 955 size_t memsize, int delay_msec) 956 { 957 struct sockaddr_storage bind_addr, srv_addr; 958 socklen_t bind_len, srv_len; 959 struct ringbuf* ring = ring_create(memsize); 960 struct timeval delay, reuse; 961 sldns_buffer* pkt; 962 int i, s, listen_s; 963 #ifndef S_SPLINT_S 964 delay.tv_sec = delay_msec / 1000; 965 delay.tv_usec = (delay_msec % 1000)*1000; 966 #endif 967 reuse = delay; /* reuse is max(4*delay, 1 second) */ 968 dl_tv_add(&reuse, &delay); 969 dl_tv_add(&reuse, &delay); 970 dl_tv_add(&reuse, &delay); 971 if(reuse.tv_sec == 0) 972 reuse.tv_sec = 1; 973 if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) { 974 printf("cannot parse forward address: %s\n", serv_str); 975 exit(1); 976 } 977 pkt = sldns_buffer_new(65535); 978 if(!pkt) 979 fatal_exit("out of memory"); 980 if( signal(SIGINT, delayer_sigh) == SIG_ERR || 981 #ifdef SIGHUP 982 signal(SIGHUP, delayer_sigh) == SIG_ERR || 983 #endif 984 #ifdef SIGQUIT 985 signal(SIGQUIT, delayer_sigh) == SIG_ERR || 986 #endif 987 #ifdef SIGBREAK 988 signal(SIGBREAK, delayer_sigh) == SIG_ERR || 989 #endif 990 #ifdef SIGALRM 991 signal(SIGALRM, delayer_sigh) == SIG_ERR || 992 #endif 993 signal(SIGTERM, delayer_sigh) == SIG_ERR) 994 fatal_exit("could not bind to signal"); 995 /* bind UDP port */ 996 if((s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET, 997 SOCK_DGRAM, 0)) == -1) { 998 fatal_exit("socket: %s", sock_strerror(errno)); 999 } 1000 i=0; 1001 if(bindport == 0) { 1002 bindport = 1024 + ((int)arc4random())%64000; 1003 i = 100; 1004 } 1005 while(1) { 1006 if(!ipstrtoaddr(bind_str, bindport, &bind_addr, &bind_len)) { 1007 printf("cannot parse listen address: %s\n", bind_str); 1008 exit(1); 1009 } 1010 if(bind(s, (struct sockaddr*)&bind_addr, bind_len) == -1) { 1011 log_err("bind: %s", sock_strerror(errno)); 1012 if(i--==0) 1013 fatal_exit("cannot bind any port"); 1014 bindport = 1024 + ((int)arc4random())%64000; 1015 } else break; 1016 } 1017 fd_set_nonblock(s); 1018 /* and TCP port */ 1019 if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET, 1020 SOCK_STREAM, 0)) == -1) { 1021 fatal_exit("tcp socket: %s", sock_strerror(errno)); 1022 } 1023 #ifdef SO_REUSEADDR 1024 if(1) { 1025 int on = 1; 1026 if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, (void*)&on, 1027 (socklen_t)sizeof(on)) < 0) 1028 fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s", 1029 sock_strerror(errno)); 1030 } 1031 #endif 1032 if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1) { 1033 fatal_exit("tcp bind: %s", sock_strerror(errno)); 1034 } 1035 if(listen(listen_s, 5) == -1) { 1036 fatal_exit("tcp listen: %s", sock_strerror(errno)); 1037 } 1038 fd_set_nonblock(listen_s); 1039 printf("listening on port: %d\n", bindport); 1040 1041 /* process loop */ 1042 do_quit = 0; 1043 service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len, 1044 pkt); 1045 1046 /* cleanup */ 1047 verbose(1, "cleanup"); 1048 sock_close(s); 1049 sock_close(listen_s); 1050 sldns_buffer_free(pkt); 1051 ring_delete(ring); 1052 } 1053 1054 /** getopt global, in case header files fail to declare it. */ 1055 extern int optind; 1056 /** getopt global, in case header files fail to declare it. */ 1057 extern char* optarg; 1058 1059 /** main program for delayer */ 1060 int main(int argc, char** argv) 1061 { 1062 int c; /* defaults */ 1063 const char* server = "127.0.0.1@53"; 1064 const char* bindto = "0.0.0.0"; 1065 int bindport = 0; 1066 size_t memsize = 10*1024*1024; 1067 int delay = 100; 1068 1069 verbosity = 0; 1070 log_init(0, 0, 0); 1071 log_ident_set("delayer"); 1072 if(argc == 1) usage(argv); 1073 while( (c=getopt(argc, argv, "b:d:f:hm:p:")) != -1) { 1074 switch(c) { 1075 case 'b': 1076 bindto = optarg; 1077 break; 1078 case 'd': 1079 if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) { 1080 printf("bad delay: %s\n", optarg); 1081 return 1; 1082 } 1083 delay = atoi(optarg); 1084 break; 1085 case 'f': 1086 server = optarg; 1087 break; 1088 case 'm': 1089 if(!cfg_parse_memsize(optarg, &memsize)) { 1090 printf("bad memsize: %s\n", optarg); 1091 return 1; 1092 } 1093 break; 1094 case 'p': 1095 if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) { 1096 printf("bad port nr: %s\n", optarg); 1097 return 1; 1098 } 1099 bindport = atoi(optarg); 1100 break; 1101 case 'h': 1102 case '?': 1103 default: 1104 usage(argv); 1105 } 1106 } 1107 argc -= optind; 1108 argv += optind; 1109 if(argc != 0) 1110 usage(argv); 1111 1112 printf("bind to %s @ %d and forward to %s after %d msec\n", 1113 bindto, bindport, server, delay); 1114 service(bindto, bindport, server, memsize, delay); 1115 return 0; 1116 } 1117