xref: /openbsd/usr.sbin/unbound/testcode/delayer.c (revision 55cc5ba3)
1 /*
2  * testcode/delayer.c - debug program that delays queries to a server.
3  *
4  * Copyright (c) 2008, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /**
37  * \file
38  *
39  * This program delays queries made. It performs as a proxy to another
40  * server and delays queries to it.
41  */
42 
43 #include "config.h"
44 #ifdef HAVE_GETOPT_H
45 #include <getopt.h>
46 #endif
47 #ifdef HAVE_TIME_H
48 #include <time.h>
49 #endif
50 #include <sys/time.h>
51 #include "util/net_help.h"
52 #include "util/config_file.h"
53 #include "sldns/sbuffer.h"
54 #include <signal.h>
55 
/**
 * Upper bound on the number of receive attempts that are made on one
 * socket during a single pass of the select loop.
 */
enum { TRIES_PER_SELECT = 100 };
58 
/**
 * Ring buffer that stores the delayed queries.
 * The buffer is considered empty when low equals high.
 */
struct ringbuf {
	/** start of the allocated storage */
	uint8_t *buf;
	/** number of bytes allocated at buf */
	size_t size;
	/** offset of the first (oldest) stored item */
	size_t low;
	/** offset just past the last (newest) stored item */
	size_t high;
};
72 
/**
 * One UDP proxy entry. Each entry owns a socket on which replies from
 * the server are received, and remembers the client they go back to.
 * Entries are kept in a singly linked list.
 */
struct proxy {
	/** socket on which replies from the server arrive */
	int s;
	/** time at which this entry was last used */
	struct timeval lastuse;
	/** address of the remote client */
	struct sockaddr_storage addr;
	/** number of valid bytes in addr */
	socklen_t addr_len;
	/** total number of queries queued for this client */
	size_t numwait;
	/** total number of queries forwarded to the server */
	size_t numsent;
	/** total number of answers handed back to the client */
	size_t numreturn;
	/** number of times the entry was handed to a different client */
	size_t numreuse;
	/** next entry in the proxy list */
	struct proxy *next;
};
96 
/**
 * A chunk of TCP data that is waiting to be relayed.
 * Chunks are kept in a singly linked list.
 */
struct tcp_send_list {
	/** the bytes to relay */
	uint8_t *item;
	/** number of bytes at item */
	size_t len;
	/** earliest time at which the chunk may be transmitted */
	struct timeval wait;
	/** number of bytes of item that were already transmitted */
	size_t done;
	/** next chunk in the list */
	struct tcp_send_list *next;
};
112 
/**
 * One relayed TCP connection: a client socket paired with a socket to
 * the server, plus the data queued in both directions.
 * Entries are kept in a singly linked list.
 */
struct tcp_proxy {
	/** socket on which the client's queries are read */
	int client_s;
	/** socket on which the server's answers are read */
	int server_s;

	/** address of the remote client */
	struct sockaddr_storage addr;
	/** number of valid bytes in addr */
	socklen_t addr_len;
	/** time at which this entry expires */
	struct timeval timeout;

	/** head of the list of query chunks to send to the server */
	struct tcp_send_list *querylist;
	/** tail of the query list */
	struct tcp_send_list *querylast;
	/** head of the list of answer chunks to send to the client */
	struct tcp_send_list *answerlist;
	/** tail of the answer list */
	struct tcp_send_list *answerlast;

	/** next entry in the TCP proxy list */
	struct tcp_proxy *next;
};
141 
/**
 * Print the command line help on stdout and terminate with exit code 1.
 * @param argv: argument vector, argv[0] is printed as the program name.
 */
static void usage(char* argv[])
{
	/* one entry per output line, emitted in order */
	static const char* const option_help[] = {
		"	-f addr : use addr, forward to that server, @port.\n",
		"	-b addr : bind to this address to listen.\n",
		"	-p port : bind to this port (use 0 for random).\n",
		"	-m mem	: use this much memory for waiting queries.\n",
		"	-d delay: UDP queries are delayed n milliseconds.\n",
		"		  TCP is delayed twice (on send, on recv).\n",
		"	-h 	: this help message\n"
	};
	size_t k;
	printf("usage: %s [options]\n", argv[0]);
	for(k = 0; k < sizeof(option_help)/sizeof(option_help[0]); k++)
		printf("%s", option_help[k]);
	exit(1);
}
155 
/**
 * Compare two timevals.
 * @param t1: left hand side.
 * @param t2: right hand side.
 * @return 1 if t1 is strictly earlier than t2, otherwise 0.
 */
static int
dl_tv_smaller(struct timeval* t1, const struct timeval* t2)
{
#ifndef S_SPLINT_S
	/* the seconds decide, unless they are equal */
	if(t1->tv_sec != t2->tv_sec)
		return t1->tv_sec < t2->tv_sec;
	return t1->tv_usec < t2->tv_usec;
#else
	return 0;
#endif
}
169 
/**
 * Add a timeval in place: t1 += t2.
 * @param t1: value that is increased.
 * @param t2: amount to add.
 */
static void
dl_tv_add(struct timeval* t1, const struct timeval* t2)
{
#ifndef S_SPLINT_S
	t1->tv_sec += t2->tv_sec;
	t1->tv_usec += t2->tv_usec;
	/* carry whole seconds out of the microsecond field */
	for(; t1->tv_usec >= 1000000; t1->tv_sec++)
		t1->tv_usec -= 1000000;
#endif
}
183 
/**
 * Subtract a timeval in place: t1 -= t2.
 * @param t1: value that is decreased.
 * @param t2: amount to subtract.
 */
static void
dl_tv_subtract(struct timeval* t1, const struct timeval* t2)
{
#ifndef S_SPLINT_S
	t1->tv_sec -= t2->tv_sec;
	if(t1->tv_usec < t2->tv_usec) {
		/* borrow one second for the microsecond field */
		t1->tv_sec--;
		t1->tv_usec += 1000000;
	}
	t1->tv_usec -= t2->tv_usec;
#endif
}
198 
199 
200 /** create new ring buffer */
201 static struct ringbuf*
202 ring_create(size_t sz)
203 {
204 	struct ringbuf* r = (struct ringbuf*)calloc(1, sizeof(*r));
205 	if(!r) fatal_exit("out of memory");
206 	r->buf = (uint8_t*)malloc(sz);
207 	if(!r->buf) fatal_exit("out of memory");
208 	r->size = sz;
209 	r->low = 0;
210 	r->high = 0;
211 	return r;
212 }
213 
214 /** delete ring buffer */
215 static void
216 ring_delete(struct ringbuf* r)
217 {
218 	if(!r) return;
219 	free(r->buf);
220 	free(r);
221 }
222 
223 /** add entry to ringbuffer */
224 static void
225 ring_add(struct ringbuf* r, sldns_buffer* pkt, struct timeval* now,
226 	struct timeval* delay, struct proxy* p)
227 {
228 	/* time -- proxy* -- 16bitlen -- message */
229 	uint16_t len = (uint16_t)sldns_buffer_limit(pkt);
230 	struct timeval when;
231 	size_t needed;
232 	uint8_t* where = NULL;
233 	log_assert(sldns_buffer_limit(pkt) <= 65535);
234 	needed = sizeof(when) + sizeof(p) + sizeof(len) + len;
235 	/* put item into ringbuffer */
236 	if(r->low < r->high) {
237 		/* used part is in the middle */
238 		if(r->size - r->high >= needed) {
239 			where = r->buf + r->high;
240 			r->high += needed;
241 		} else if(r->low > needed) {
242 			/* wrap around ringbuffer */
243 			/* make sure r->low == r->high means empty */
244 			/* so r->low == r->high cannot be used to signify
245 			 * a completely full ringbuf */
246 			if(r->size - r->high > sizeof(when)+sizeof(p)) {
247 				/* zero entry at end of buffer */
248 				memset(r->buf+r->high, 0,
249 					sizeof(when)+sizeof(p));
250 			}
251 			where = r->buf;
252 			r->high = needed;
253 		} else {
254 			/* drop message */
255 			log_warn("warning: mem full, dropped message");
256 			return;
257 		}
258 	} else {
259 		/* empty */
260 		if(r->high == r->low) {
261 			where = r->buf;
262 			r->low = 0;
263 			r->high = needed;
264 		/* unused part is in the middle */
265 		/* so ringbuffer has wrapped around */
266 		} else if(r->low - r->high > needed) {
267 			where = r->buf + r->high;
268 			r->high += needed;
269 		} else {
270 			log_warn("warning: mem full, dropped message");
271 			return;
272 		}
273 	}
274 	when = *now;
275 	dl_tv_add(&when, delay);
276 	/* copy it at where part */
277 	log_assert(where != NULL);
278 	memmove(where, &when, sizeof(when));
279 	memmove(where+sizeof(when), &p, sizeof(p));
280 	memmove(where+sizeof(when)+sizeof(p), &len, sizeof(len));
281 	memmove(where+sizeof(when)+sizeof(p)+sizeof(len),
282 		sldns_buffer_begin(pkt), len);
283 }
284 
285 /** see if the ringbuffer is empty */
286 static int
287 ring_empty(struct ringbuf* r)
288 {
289 	return (r->low == r->high);
290 }
291 
292 /** peek at timevalue for next item in ring */
293 static struct timeval*
294 ring_peek_time(struct ringbuf* r)
295 {
296 	if(ring_empty(r))
297 		return NULL;
298 	return (struct timeval*)&r->buf[r->low];
299 }
300 
301 /** get entry from ringbuffer */
302 static int
303 ring_pop(struct ringbuf* r, sldns_buffer* pkt, struct timeval* tv,
304 	struct proxy** p)
305 {
306 	/* time -- proxy* -- 16bitlen -- message */
307 	uint16_t len;
308 	uint8_t* where = NULL;
309 	size_t done;
310 	if(r->low == r->high)
311 		return 0;
312 	where = r->buf + r->low;
313 	memmove(tv, where, sizeof(*tv));
314 	memmove(p, where+sizeof(*tv), sizeof(*p));
315 	memmove(&len, where+sizeof(*tv)+sizeof(*p), sizeof(len));
316 	memmove(sldns_buffer_begin(pkt),
317 		where+sizeof(*tv)+sizeof(*p)+sizeof(len), len);
318 	sldns_buffer_set_limit(pkt, (size_t)len);
319 	done = sizeof(*tv)+sizeof(*p)+sizeof(len)+len;
320 	/* move lowmark */
321 	if(r->low < r->high) {
322 		/* used part in middle */
323 		log_assert(r->high - r->low >= done);
324 		r->low += done;
325 	} else {
326 		/* unused part in middle */
327 		log_assert(r->size - r->low >= done);
328 		r->low += done;
329 		if(r->size - r->low > sizeof(*tv)+sizeof(*p)) {
330 			/* see if it is zeroed; means end of buffer */
331 			struct proxy* pz;
332 			memmove(&pz, r->buf+r->low+sizeof(*tv), sizeof(pz));
333 			if(pz == NULL)
334 				r->low = 0;
335 		} else r->low = 0;
336 	}
337 	if(r->low == r->high) {
338 		r->low = 0; /* reset if empty */
339 		r->high = 0;
340 	}
341 	return 1;
342 }
343 
344 /** signal handler global info */
345 static volatile int do_quit = 0;
346 
347 /** signal handler for user quit */
348 static RETSIGTYPE delayer_sigh(int sig)
349 {
350 	printf("exit on signal %d\n", sig);
351 	do_quit = 1;
352 }
353 
354 /** send out waiting packets */
355 static void
356 service_send(struct ringbuf* ring, struct timeval* now, sldns_buffer* pkt,
357 	struct sockaddr_storage* srv_addr, socklen_t srv_len)
358 {
359 	struct proxy* p;
360 	struct timeval tv;
361 	ssize_t sent;
362 	while(!ring_empty(ring) &&
363 		dl_tv_smaller(ring_peek_time(ring), now)) {
364 		/* this items needs to be sent out */
365 		if(!ring_pop(ring, pkt, &tv, &p))
366 			fatal_exit("ringbuf error: pop failed");
367 		verbose(1, "send out query %d.%6.6d",
368 			(unsigned)tv.tv_sec, (unsigned)tv.tv_usec);
369 		log_addr(1, "from client", &p->addr, p->addr_len);
370 		/* send it */
371 		sent = sendto(p->s, (void*)sldns_buffer_begin(pkt),
372 			sldns_buffer_limit(pkt), 0,
373 			(struct sockaddr*)srv_addr, srv_len);
374 		if(sent == -1) {
375 			log_err("sendto: %s", sock_strerror(errno));
376 		} else if(sent != (ssize_t)sldns_buffer_limit(pkt)) {
377 			log_err("sendto: partial send");
378 		}
379 		p->lastuse = *now;
380 		p->numsent++;
381 	}
382 }
383 
384 /** do proxy for one readable client */
385 static void
386 do_proxy(struct proxy* p, int retsock, sldns_buffer* pkt)
387 {
388 	int i;
389 	ssize_t r;
390 	for(i=0; i<TRIES_PER_SELECT; i++) {
391 		r = recv(p->s, (void*)sldns_buffer_begin(pkt),
392 			sldns_buffer_capacity(pkt), 0);
393 		if(r == -1) {
394 #ifndef USE_WINSOCK
395 			if(errno == EAGAIN || errno == EINTR)
396 				return;
397 #else
398 			if(WSAGetLastError() == WSAEINPROGRESS ||
399 				WSAGetLastError() == WSAEWOULDBLOCK)
400 				return;
401 #endif
402 			log_err("recv: %s", sock_strerror(errno));
403 			return;
404 		}
405 		sldns_buffer_set_limit(pkt, (size_t)r);
406 		log_addr(1, "return reply to client", &p->addr, p->addr_len);
407 		/* send reply back to the real client */
408 		p->numreturn++;
409 		r = sendto(retsock, (void*)sldns_buffer_begin(pkt), (size_t)r,
410 			0, (struct sockaddr*)&p->addr, p->addr_len);
411 		if(r == -1) {
412 			log_err("sendto: %s", sock_strerror(errno));
413 		}
414 	}
415 }
416 
417 /** proxy return replies to clients */
418 static void
419 service_proxy(fd_set* rset, int retsock, struct proxy* proxies,
420 	sldns_buffer* pkt, struct timeval* now)
421 {
422 	struct proxy* p;
423 	for(p = proxies; p; p = p->next) {
424 		if(FD_ISSET(p->s, rset)) {
425 			p->lastuse = *now;
426 			do_proxy(p, retsock, pkt);
427 		}
428 	}
429 }
430 
431 /** find or else create proxy for this remote client */
432 static struct proxy*
433 find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
434 	fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6,
435 	struct timeval* now, struct timeval* reuse_timeout)
436 {
437 	struct proxy* p;
438 	struct timeval t;
439 	for(p = *proxies; p; p = p->next) {
440 		if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0)
441 			return p;
442 	}
443 	/* possibly: reuse lapsed entries */
444 	for(p = *proxies; p; p = p->next) {
445 		if(p->numwait > p->numsent || p->numsent > p->numreturn)
446 			continue;
447 		t = *now;
448 		dl_tv_subtract(&t, &p->lastuse);
449 		if(dl_tv_smaller(&t, reuse_timeout))
450 			continue;
451 		/* yes! */
452 		verbose(1, "reuse existing entry");
453 		memmove(&p->addr, from, from_len);
454 		p->addr_len = from_len;
455 		p->numreuse++;
456 		return p;
457 	}
458 	/* create new */
459 	p = (struct proxy*)calloc(1, sizeof(*p));
460 	if(!p) fatal_exit("out of memory");
461 	p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0);
462 	if(p->s == -1) {
463 		fatal_exit("socket: %s", sock_strerror(errno));
464 	}
465 	fd_set_nonblock(p->s);
466 	memmove(&p->addr, from, from_len);
467 	p->addr_len = from_len;
468 	p->next = *proxies;
469 	*proxies = p;
470 	FD_SET(FD_SET_T p->s, rorig);
471 	if(p->s+1 > *max)
472 		*max = p->s+1;
473 	return p;
474 }
475 
476 /** recv new waiting packets */
477 static void
478 service_recv(int s, struct ringbuf* ring, sldns_buffer* pkt,
479 	fd_set* rorig, int* max, struct proxy** proxies,
480 	struct sockaddr_storage* srv_addr, socklen_t srv_len,
481 	struct timeval* now, struct timeval* delay, struct timeval* reuse)
482 {
483 	int i;
484 	struct sockaddr_storage from;
485 	socklen_t from_len;
486 	ssize_t len;
487 	struct proxy* p;
488 	for(i=0; i<TRIES_PER_SELECT; i++) {
489 		from_len = (socklen_t)sizeof(from);
490 		len = recvfrom(s, (void*)sldns_buffer_begin(pkt),
491 			sldns_buffer_capacity(pkt), 0,
492 			(struct sockaddr*)&from, &from_len);
493 		if(len < 0) {
494 #ifndef USE_WINSOCK
495 			if(errno == EAGAIN || errno == EINTR)
496 				return;
497 #else
498 			if(WSAGetLastError() == WSAEWOULDBLOCK ||
499 				WSAGetLastError() == WSAEINPROGRESS)
500 				return;
501 #endif
502 			fatal_exit("recvfrom: %s", sock_strerror(errno));
503 		}
504 		sldns_buffer_set_limit(pkt, (size_t)len);
505 		/* find its proxy element */
506 		p = find_create_proxy(&from, from_len, rorig, max, proxies,
507 			addr_is_ip6(srv_addr, srv_len), now, reuse);
508 		if(!p) fatal_exit("error: cannot find or create proxy");
509 		p->lastuse = *now;
510 		ring_add(ring, pkt, now, delay, p);
511 		p->numwait++;
512 		log_addr(1, "recv from client", &p->addr, p->addr_len);
513 	}
514 }
515 
516 /** delete tcp proxy */
517 static void
518 tcp_proxy_delete(struct tcp_proxy* p)
519 {
520 	struct tcp_send_list* s, *sn;
521 	if(!p)
522 		return;
523 	log_addr(1, "delete tcp proxy", &p->addr, p->addr_len);
524 	s = p->querylist;
525 	while(s) {
526 		sn = s->next;
527 		free(s->item);
528 		free(s);
529 		s = sn;
530 	}
531 	s = p->answerlist;
532 	while(s) {
533 		sn = s->next;
534 		free(s->item);
535 		free(s);
536 		s = sn;
537 	}
538 	sock_close(p->client_s);
539 	if(p->server_s != -1)
540 		sock_close(p->server_s);
541 	free(p);
542 }
543 
544 /** accept new TCP connections, and set them up */
545 static void
546 service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies,
547 	struct sockaddr_storage* srv_addr, socklen_t srv_len,
548 	struct timeval* now, struct timeval* tcp_timeout)
549 {
550 	int newfd;
551 	struct sockaddr_storage addr;
552 	struct tcp_proxy* p;
553 	socklen_t addr_len;
554 	newfd = accept(s, (struct sockaddr*)&addr, &addr_len);
555 	if(newfd == -1) {
556 #ifndef USE_WINSOCK
557 		if(errno == EAGAIN || errno == EINTR)
558 			return;
559 #else
560 		if(WSAGetLastError() == WSAEWOULDBLOCK ||
561 			WSAGetLastError() == WSAEINPROGRESS ||
562 			WSAGetLastError() == WSAECONNRESET)
563 			return;
564 #endif
565 		fatal_exit("accept: %s", sock_strerror(errno));
566 	}
567 	p = (struct tcp_proxy*)calloc(1, sizeof(*p));
568 	if(!p) fatal_exit("out of memory");
569 	memmove(&p->addr, &addr, addr_len);
570 	p->addr_len = addr_len;
571 	log_addr(1, "new tcp proxy", &p->addr, p->addr_len);
572 	p->client_s = newfd;
573 	p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET,
574 		SOCK_STREAM, 0);
575 	if(p->server_s == -1) {
576 		fatal_exit("tcp socket: %s", sock_strerror(errno));
577 	}
578 	fd_set_nonblock(p->client_s);
579 	fd_set_nonblock(p->server_s);
580 	if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) {
581 #ifndef USE_WINSOCK
582 		if(errno != EINPROGRESS) {
583 			log_err("tcp connect: %s", strerror(errno));
584 #else
585 		if(WSAGetLastError() != WSAEWOULDBLOCK &&
586 			WSAGetLastError() != WSAEINPROGRESS) {
587 			log_err("tcp connect: %s",
588 				wsa_strerror(WSAGetLastError()));
589 #endif
590 			sock_close(p->server_s);
591 			sock_close(p->client_s);
592 			free(p);
593 			return;
594 		}
595 	}
596 	p->timeout = *now;
597 	dl_tv_add(&p->timeout, tcp_timeout);
598 
599 	/* listen to client and server */
600 	FD_SET(FD_SET_T p->client_s, rorig);
601 	FD_SET(FD_SET_T p->server_s, rorig);
602 	if(p->client_s+1 > *max)
603 		*max = p->client_s+1;
604 	if(p->server_s+1 > *max)
605 		*max = p->server_s+1;
606 
607 	/* add into proxy list */
608 	p->next = *proxies;
609 	*proxies = p;
610 }
611 
612 /** relay TCP, read a part */
613 static int
614 tcp_relay_read(int s, struct tcp_send_list** first,
615 	struct tcp_send_list** last, struct timeval* now,
616 	struct timeval* delay, sldns_buffer* pkt)
617 {
618 	struct tcp_send_list* item;
619 	ssize_t r = recv(s, (void*)sldns_buffer_begin(pkt),
620 		sldns_buffer_capacity(pkt), 0);
621 	if(r == -1) {
622 #ifndef USE_WINSOCK
623 		if(errno == EINTR || errno == EAGAIN)
624 			return 1;
625 #else
626 		if(WSAGetLastError() == WSAEINPROGRESS ||
627 			WSAGetLastError() == WSAEWOULDBLOCK)
628 			return 1;
629 #endif
630 		log_err("tcp read: %s", sock_strerror(errno));
631 		return 0;
632 	} else if(r == 0) {
633 		/* connection closed */
634 		return 0;
635 	}
636 	item = (struct tcp_send_list*)malloc(sizeof(*item));
637 	if(!item) {
638 		log_err("out of memory");
639 		return 0;
640 	}
641 	verbose(1, "read item len %d", (int)r);
642 	item->len = (size_t)r;
643 	item->item = memdup(sldns_buffer_begin(pkt), item->len);
644 	if(!item->item) {
645 		free(item);
646 		log_err("out of memory");
647 		return 0;
648 	}
649 	item->done = 0;
650 	item->wait = *now;
651 	dl_tv_add(&item->wait, delay);
652 	item->next = NULL;
653 
654 	/* link in */
655 	if(*first) {
656 		(*last)->next = item;
657 	} else {
658 		*first = item;
659 	}
660 	*last = item;
661 	return 1;
662 }
663 
664 /** relay TCP, write a part */
665 static int
666 tcp_relay_write(int s, struct tcp_send_list** first,
667 	struct tcp_send_list** last, struct timeval* now)
668 {
669 	ssize_t r;
670 	struct tcp_send_list* p;
671 	while(*first) {
672 		p = *first;
673 		/* is the item ready? */
674 		if(!dl_tv_smaller(&p->wait, now))
675 			return 1;
676 		/* write it */
677 		r = send(s, (void*)(p->item + p->done), p->len - p->done, 0);
678 		if(r == -1) {
679 #ifndef USE_WINSOCK
680 			if(errno == EAGAIN || errno == EINTR)
681 				return 1;
682 #else
683 			if(WSAGetLastError() == WSAEWOULDBLOCK ||
684 				WSAGetLastError() == WSAEINPROGRESS)
685 				return 1;
686 #endif
687 			log_err("tcp write: %s", sock_strerror(errno));
688 			return 0;
689 		} else if(r == 0) {
690 			/* closed */
691 			return 0;
692 		}
693 		/* account it */
694 		p->done += (size_t)r;
695 		verbose(1, "write item %d of %d", (int)p->done, (int)p->len);
696 		if(p->done >= p->len) {
697 			free(p->item);
698 			*first = p->next;
699 			if(!*first)
700 				*last = NULL;
701 			free(p);
702 		} else {
703 			/* partial write */
704 			return 1;
705 		}
706 	}
707 	return 1;
708 }
709 
710 /** perform TCP relaying */
711 static void
712 service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now,
713 	struct timeval* delay, struct timeval* tcp_timeout, sldns_buffer* pkt,
714 	fd_set* rset, fd_set* rorig, fd_set* worig)
715 {
716 	struct tcp_proxy* p, **prev;
717 	struct timeval tout;
718 	int delete_it;
719 	p = *tcp_proxies;
720 	prev = tcp_proxies;
721 	tout = *now;
722 	dl_tv_add(&tout, tcp_timeout);
723 
724 	while(p) {
725 		delete_it = 0;
726 		/* can we receive further queries? */
727 		if(!delete_it && FD_ISSET(p->client_s, rset)) {
728 			p->timeout = tout;
729 			log_addr(1, "read tcp query", &p->addr, p->addr_len);
730 			if(!tcp_relay_read(p->client_s, &p->querylist,
731 				&p->querylast, now, delay, pkt))
732 				delete_it = 1;
733 		}
734 		/* can we receive further answers? */
735 		if(!delete_it && p->server_s != -1 &&
736 			FD_ISSET(p->server_s, rset)) {
737 			p->timeout = tout;
738 			log_addr(1, "read tcp answer", &p->addr, p->addr_len);
739 			if(!tcp_relay_read(p->server_s, &p->answerlist,
740 				&p->answerlast, now, delay, pkt)) {
741 				sock_close(p->server_s);
742 				FD_CLR(FD_SET_T p->server_s, worig);
743 				FD_CLR(FD_SET_T p->server_s, rorig);
744 				p->server_s = -1;
745 			}
746 		}
747 		/* can we send on further queries */
748 		if(!delete_it && p->querylist && p->server_s != -1) {
749 			p->timeout = tout;
750 			if(dl_tv_smaller(&p->querylist->wait, now))
751 				log_addr(1, "write tcp query",
752 					&p->addr, p->addr_len);
753 			if(!tcp_relay_write(p->server_s, &p->querylist,
754 				&p->querylast, now))
755 				delete_it = 1;
756 			if(p->querylist &&
757 				dl_tv_smaller(&p->querylist->wait, now))
758 				FD_SET(FD_SET_T p->server_s, worig);
759 			else 	FD_CLR(FD_SET_T p->server_s, worig);
760 		}
761 
762 		/* can we send on further answers */
763 		if(!delete_it && p->answerlist) {
764 			p->timeout = tout;
765 			if(dl_tv_smaller(&p->answerlist->wait, now))
766 				log_addr(1, "write tcp answer",
767 					&p->addr, p->addr_len);
768 			if(!tcp_relay_write(p->client_s, &p->answerlist,
769 				&p->answerlast, now))
770 				delete_it = 1;
771 			if(p->answerlist && dl_tv_smaller(&p->answerlist->wait,
772 				now))
773 				FD_SET(FD_SET_T p->client_s, worig);
774 			else 	FD_CLR(FD_SET_T p->client_s, worig);
775 			if(!p->answerlist && p->server_s == -1)
776 				delete_it = 1;
777 		}
778 
779 		/* does this entry timeout? (unused too long) */
780 		if(dl_tv_smaller(&p->timeout, now)) {
781 			delete_it = 1;
782 		}
783 		if(delete_it) {
784 			struct tcp_proxy* np = p->next;
785 			*prev = np;
786 			FD_CLR(FD_SET_T p->client_s, rorig);
787 			FD_CLR(FD_SET_T p->client_s, worig);
788 			if(p->server_s != -1) {
789 				FD_CLR(FD_SET_T p->server_s, rorig);
790 				FD_CLR(FD_SET_T p->server_s, worig);
791 			}
792 			tcp_proxy_delete(p);
793 			p = np;
794 			continue;
795 		}
796 
797 		prev = &p->next;
798 		p = p->next;
799 	}
800 }
801 
802 /** find waiting time */
803 static int
804 service_findwait(struct timeval* now, struct timeval* wait,
805 	struct ringbuf* ring, struct tcp_proxy* tcplist)
806 {
807 	/* first item is the time to wait */
808 	struct timeval* peek = ring_peek_time(ring);
809 	struct timeval tcv;
810 	int have_tcpval = 0;
811 	struct tcp_proxy* p;
812 
813 	/* also for TCP list the first in sendlists is the time to wait */
814 	for(p=tcplist; p; p=p->next) {
815 		if(!have_tcpval)
816 			tcv = p->timeout;
817 		have_tcpval = 1;
818 		if(dl_tv_smaller(&p->timeout, &tcv))
819 			tcv = p->timeout;
820 		if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv))
821 			tcv = p->querylist->wait;
822 		if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv))
823 			tcv = p->answerlist->wait;
824 	}
825 	if(peek) {
826 		/* peek can be unaligned */
827 		/* use wait as a temp variable */
828 		memmove(wait, peek, sizeof(*wait));
829 		if(!have_tcpval)
830 			tcv = *wait;
831 		else if(dl_tv_smaller(wait, &tcv))
832 			tcv = *wait;
833 		have_tcpval = 1;
834 	}
835 	if(have_tcpval) {
836 		*wait = tcv;
837 		dl_tv_subtract(wait, now);
838 		return 1;
839 	}
840 	/* nothing, block */
841 	return 0;
842 }
843 
844 /** clear proxy list */
845 static void
846 proxy_list_clear(struct proxy* p)
847 {
848 	char from[109];
849 	struct proxy* np;
850 	int i=0, port;
851 	while(p) {
852 		np = p->next;
853 		port = (int)ntohs(((struct sockaddr_in*)&p->addr)->sin_port);
854 		if(addr_is_ip6(&p->addr, p->addr_len)) {
855 			if(inet_ntop(AF_INET6,
856 				&((struct sockaddr_in6*)&p->addr)->sin6_addr,
857 				from, (socklen_t)sizeof(from)) == 0)
858 				(void)strlcpy(from, "err", sizeof(from));
859 		} else {
860 			if(inet_ntop(AF_INET,
861 				&((struct sockaddr_in*)&p->addr)->sin_addr,
862 				from, (socklen_t)sizeof(from)) == 0)
863 				(void)strlcpy(from, "err", sizeof(from));
864 		}
865 		printf("client[%d]: last %s@%d of %d : %u in, %u out, "
866 			"%u returned\n", i++, from, port, (int)p->numreuse+1,
867 			(unsigned)p->numwait, (unsigned)p->numsent,
868 			(unsigned)p->numreturn);
869 		sock_close(p->s);
870 		free(p);
871 		p = np;
872 	}
873 }
874 
875 /** clear TCP proxy list */
876 static void
877 tcp_proxy_list_clear(struct tcp_proxy* p)
878 {
879 	struct tcp_proxy* np;
880 	while(p) {
881 		np = p->next;
882 		tcp_proxy_delete(p);
883 		p = np;
884 	}
885 }
886 
887 /** delayer service loop */
888 static void
889 service_loop(int udp_s, int listen_s, struct ringbuf* ring,
890 	struct timeval* delay, struct timeval* reuse,
891 	struct sockaddr_storage* srv_addr, socklen_t srv_len,
892 	sldns_buffer* pkt)
893 {
894 	fd_set rset, rorig;
895 	fd_set wset, worig;
896 	struct timeval now, wait;
897 	int max, have_wait = 0;
898 	struct proxy* proxies = NULL;
899 	struct tcp_proxy* tcp_proxies = NULL;
900 	struct timeval tcp_timeout;
901 	tcp_timeout.tv_sec = 120;
902 	tcp_timeout.tv_usec = 0;
903 #ifndef S_SPLINT_S
904 	FD_ZERO(&rorig);
905 	FD_ZERO(&worig);
906 	FD_SET(FD_SET_T udp_s, &rorig);
907 	FD_SET(FD_SET_T listen_s, &rorig);
908 #endif
909 	max = udp_s + 1;
910 	if(listen_s + 1 > max) max = listen_s + 1;
911 	while(!do_quit) {
912 		/* wait for events */
913 		rset = rorig;
914 		wset = worig;
915 		if(have_wait)
916 			verbose(1, "wait for %d.%6.6d",
917 			(unsigned)wait.tv_sec, (unsigned)wait.tv_usec);
918 		else	verbose(1, "wait");
919 		if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) {
920 			if(errno == EAGAIN || errno == EINTR)
921 				continue;
922 			fatal_exit("select: %s", strerror(errno));
923 		}
924 		/* get current time */
925 		if(gettimeofday(&now, NULL) < 0) {
926 			if(errno == EAGAIN || errno == EINTR)
927 				continue;
928 			fatal_exit("gettimeofday: %s", strerror(errno));
929 		}
930 		verbose(1, "process at %u.%6.6u\n",
931 			(unsigned)now.tv_sec, (unsigned)now.tv_usec);
932 		/* sendout delayed queries to master server (frees up buffer)*/
933 		service_send(ring, &now, pkt, srv_addr, srv_len);
934 		/* proxy return replies */
935 		service_proxy(&rset, udp_s, proxies, pkt, &now);
936 		/* see what can be received to start waiting */
937 		service_recv(udp_s, ring, pkt, &rorig, &max, &proxies,
938 			srv_addr, srv_len, &now, delay, reuse);
939 		/* see if there are new tcp connections */
940 		service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies,
941 			srv_addr, srv_len, &now, &tcp_timeout);
942 		/* service tcp connections */
943 		service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout,
944 			pkt, &rset, &rorig, &worig);
945 		/* see what next timeout is (if any) */
946 		have_wait = service_findwait(&now, &wait, ring, tcp_proxies);
947 	}
948 	proxy_list_clear(proxies);
949 	tcp_proxy_list_clear(tcp_proxies);
950 }
951 
952 /** delayer main service routine */
953 static void
954 service(const char* bind_str, int bindport, const char* serv_str,
955 	size_t memsize, int delay_msec)
956 {
957 	struct sockaddr_storage bind_addr, srv_addr;
958 	socklen_t bind_len, srv_len;
959 	struct ringbuf* ring = ring_create(memsize);
960 	struct timeval delay, reuse;
961 	sldns_buffer* pkt;
962 	int i, s, listen_s;
963 #ifndef S_SPLINT_S
964 	delay.tv_sec = delay_msec / 1000;
965 	delay.tv_usec = (delay_msec % 1000)*1000;
966 #endif
967 	reuse = delay; /* reuse is max(4*delay, 1 second) */
968 	dl_tv_add(&reuse, &delay);
969 	dl_tv_add(&reuse, &delay);
970 	dl_tv_add(&reuse, &delay);
971 	if(reuse.tv_sec == 0)
972 		reuse.tv_sec = 1;
973 	if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) {
974 		printf("cannot parse forward address: %s\n", serv_str);
975 		exit(1);
976 	}
977 	pkt = sldns_buffer_new(65535);
978 	if(!pkt)
979 		fatal_exit("out of memory");
980 	if( signal(SIGINT, delayer_sigh) == SIG_ERR ||
981 #ifdef SIGHUP
982 		signal(SIGHUP, delayer_sigh) == SIG_ERR ||
983 #endif
984 #ifdef SIGQUIT
985 		signal(SIGQUIT, delayer_sigh) == SIG_ERR ||
986 #endif
987 #ifdef SIGBREAK
988 		signal(SIGBREAK, delayer_sigh) == SIG_ERR ||
989 #endif
990 #ifdef SIGALRM
991 		signal(SIGALRM, delayer_sigh) == SIG_ERR ||
992 #endif
993 		signal(SIGTERM, delayer_sigh) == SIG_ERR)
994 		fatal_exit("could not bind to signal");
995 	/* bind UDP port */
996 	if((s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET,
997 		SOCK_DGRAM, 0)) == -1) {
998 		fatal_exit("socket: %s", sock_strerror(errno));
999 	}
1000 	i=0;
1001 	if(bindport == 0) {
1002 		bindport = 1024 + ((int)arc4random())%64000;
1003 		i = 100;
1004 	}
1005 	while(1) {
1006 		if(!ipstrtoaddr(bind_str, bindport, &bind_addr, &bind_len)) {
1007 			printf("cannot parse listen address: %s\n", bind_str);
1008 			exit(1);
1009 		}
1010 		if(bind(s, (struct sockaddr*)&bind_addr, bind_len) == -1) {
1011 			log_err("bind: %s", sock_strerror(errno));
1012 			if(i--==0)
1013 				fatal_exit("cannot bind any port");
1014 			bindport = 1024 + ((int)arc4random())%64000;
1015 		} else break;
1016 	}
1017 	fd_set_nonblock(s);
1018 	/* and TCP port */
1019 	if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET,
1020 		SOCK_STREAM, 0)) == -1) {
1021 		fatal_exit("tcp socket: %s", sock_strerror(errno));
1022 	}
1023 #ifdef SO_REUSEADDR
1024 	if(1) {
1025 		int on = 1;
1026 		if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, (void*)&on,
1027 			(socklen_t)sizeof(on)) < 0)
1028 			fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s",
1029 				sock_strerror(errno));
1030 	}
1031 #endif
1032 	if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1) {
1033 		fatal_exit("tcp bind: %s", sock_strerror(errno));
1034 	}
1035 	if(listen(listen_s, 5) == -1) {
1036 		fatal_exit("tcp listen: %s", sock_strerror(errno));
1037 	}
1038 	fd_set_nonblock(listen_s);
1039 	printf("listening on port: %d\n", bindport);
1040 
1041 	/* process loop */
1042 	do_quit = 0;
1043 	service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len,
1044 		pkt);
1045 
1046 	/* cleanup */
1047 	verbose(1, "cleanup");
1048 	sock_close(s);
1049 	sock_close(listen_s);
1050 	sldns_buffer_free(pkt);
1051 	ring_delete(ring);
1052 }
1053 
/** argument of the current option, a global that is set by getopt;
 * declared here in case the header files fail to declare it. */
extern char* optarg;
/** index of the next argument to process, a global that is set by
 * getopt; declared here in case the header files fail to declare it. */
extern int optind;
1058 
1059 /** main program for delayer */
1060 int main(int argc, char** argv)
1061 {
1062 	int c;		/* defaults */
1063 	const char* server = "127.0.0.1@53";
1064 	const char* bindto = "0.0.0.0";
1065 	int bindport = 0;
1066 	size_t memsize = 10*1024*1024;
1067 	int delay = 100;
1068 
1069 	verbosity = 0;
1070 	log_init(0, 0, 0);
1071 	log_ident_set("delayer");
1072 	if(argc == 1) usage(argv);
1073 	while( (c=getopt(argc, argv, "b:d:f:hm:p:")) != -1) {
1074 		switch(c) {
1075 			case 'b':
1076 				bindto = optarg;
1077 				break;
1078 			case 'd':
1079 				if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) {
1080 					printf("bad delay: %s\n", optarg);
1081 					return 1;
1082 				}
1083 				delay = atoi(optarg);
1084 				break;
1085 			case 'f':
1086 				server = optarg;
1087 				break;
1088 			case 'm':
1089 				if(!cfg_parse_memsize(optarg, &memsize)) {
1090 					printf("bad memsize: %s\n", optarg);
1091 					return 1;
1092 				}
1093 				break;
1094 			case 'p':
1095 				if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) {
1096 					printf("bad port nr: %s\n", optarg);
1097 					return 1;
1098 				}
1099 				bindport = atoi(optarg);
1100 				break;
1101 			case 'h':
1102 			case '?':
1103 			default:
1104 				usage(argv);
1105 		}
1106 	}
1107 	argc -= optind;
1108 	argv += optind;
1109 	if(argc != 0)
1110 		usage(argv);
1111 
1112 	printf("bind to %s @ %d and forward to %s after %d msec\n",
1113 		bindto, bindport, server, delay);
1114 	service(bindto, bindport, server, memsize, delay);
1115 	return 0;
1116 }
1117