xref: /freebsd/tools/tools/netmap/nmreplay.c (revision 6bfca4dc)
1 /*
2  * Copyright (C) 2016 Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  */
25 
26 
27 /*
28  * This program implements NMREPLAY, a program to replay a pcap file
29  * enforcing the output rate and possibly random losses and delay
30  * distributions.
31  * It is meant to be run from the command line and implemented with a main
32  * control thread for monitoring, plus a thread to push packets out.
33  *
34  * The control thread parses command line arguments, prepares a
35  * schedule for transmission in a memory buffer and then sits
36  * in a loop where it periodically reads traffic statistics from
37  * the other threads and prints them out on the console.
38  *
39  * The transmit buffer contains headers and packets. Each header
40  * includes a timestamp that determines when the packet should be sent out.
41  * A "consumer" thread cons() reads from the queue and transmits packets
42  * on the output netmap port when their time has come.
43  *
44  * The program does CPU pinning and sets the scheduler and priority
45  * for the "cons" threads. Externally one should do the
46  * assignment of other threads (e.g. interrupt handlers) and
47  * make sure that network interfaces are configured properly.
48  *
49  * --- Main functions of the program ---
50  * within each function, q is used as a pointer to the queue holding
51  * packets and parameters.
52  *
53  * pcap_prod()
54  *
55  *	reads from the pcap file and prepares packets to transmit.
56  *	After reading a packet from the pcap file, the following information
57  *	are extracted which can be used to determine the schedule:
58  *
59  *   	q->cur_pkt	points to the buffer containing the packet
60  *	q->cur_len	packet length, excluding CRC
61  *	q->cur_caplen	available packet length (may be shorter than cur_len)
62  *	q->cur_tt	transmission time for the packet, computed from the trace.
63  *
64  *  The following functions are then called in sequence:
65  *
66  *  q->c_loss (set with the -L command line option) decides
67  *	whether the packet should be dropped before even queuing.
68  *	This is generally useful to emulate random loss.
69  *	The function is supposed to set q->c_drop = 1 if the
70  *	packet should be dropped, or leave it to 0 otherwise.
71  *
72  *   q->c_bw (set with the -B command line option) is used to
73  *      enforce the transmit bandwidth. The function must store
74  *	in q->cur_tt the transmission time (in nanoseconds) of
75  *	the packet, which is typically proportional to the length
76  *	of the packet, i.e. q->cur_tt = q->cur_len / <bandwidth>
77  *	Variants are possible, eg. to account for constant framing
78  *	bits as on the ethernet, or variable channel acquisition times,
79  *	etc.
80  *	This mechanism can also be used to simulate variable queueing
81  *	delay e.g. due to the presence of cross traffic.
82  *
83  *   q->c_delay (set with the -D option) implements delay emulation.
84  *	The function should set q->cur_delay to the additional
85  *	delay the packet is subject to. The framework will take care of
86  *	computing the actual exit time of a packet so that there is no
87  *	reordering.
88  */
89 
90 // debugging macros
91 #define NED(_fmt, ...)	do {} while (0)
92 #define ED(_fmt, ...)						\
93 	do {							\
94 		struct timeval _t0;				\
95 		gettimeofday(&_t0, NULL);			\
96 		fprintf(stderr, "%03d.%03d %-10.10s [%5d] \t" _fmt "\n", \
97 		(int)(_t0.tv_sec % 1000), (int)_t0.tv_usec/1000, \
98 		__FUNCTION__, __LINE__, ##__VA_ARGS__);     \
99 	} while (0)
100 
101 /* WWW is for warnings, EEE is for errors */
102 #define WWW(_fmt, ...)	ED("--WWW-- " _fmt, ##__VA_ARGS__)
103 #define EEE(_fmt, ...)	ED("--EEE-- " _fmt, ##__VA_ARGS__)
104 #define DDD(_fmt, ...)	ED("--DDD-- " _fmt, ##__VA_ARGS__)
105 
106 #define _GNU_SOURCE	// for CPU_SET() etc
107 #include <errno.h>
108 #include <fcntl.h>
109 #include <libnetmap.h>
110 #include <math.h> /* log, exp etc. */
111 #include <pthread.h>
112 #ifdef __FreeBSD__
113 #include <pthread_np.h> /* pthread w/ affinity */
114 #include <sys/cpuset.h> /* cpu_set */
115 #endif /* __FreeBSD__ */
116 #include <signal.h>
117 #include <stdio.h>
118 #include <stdlib.h>
119 #include <string.h> /* memcpy */
120 #include <stdint.h>
121 #include <sys/ioctl.h>
122 #include <sys/mman.h>
123 #include <sys/poll.h>
124 #include <sys/resource.h> // setpriority
125 #include <sys/time.h>
126 #include <unistd.h>
127 
128 /*
129  *
130  * A packet in the queue is q_pkt plus the payload.
131  *
132  * For the packet descriptor we need the following:
133  *
134  *  -	position of next packet in the queue (can go backwards).
135  *	We can reduce to 32 bits if we consider alignments,
136  *	or we just store the length to be added to the current
137  *	value and assume 0 as a special index.
138  *  -	actual packet length (16 bits may be ok)
139  *  -	queue output time, in nanoseconds (64 bits)
140  *  -	delay line output time, in nanoseconds
141  *	One of the two can be packed to a 32bit value
142  *
143  * A convenient coding uses 32 bytes per packet.
144  */
145 
146 struct q_pkt {
147 	uint64_t	next;		/* buffer index for next packet */
148 	uint64_t	pktlen;		/* actual packet len */
149 	uint64_t	pt_qout;	/* time of output from queue */
150 	uint64_t	pt_tx;		/* transmit time */
151 };
152 
153 
154 /*
155  * The header for a pcap file
156  */
157 struct pcap_file_header {
158     uint32_t magic;
159 	/*used to detect the file format itself and the byte
160     ordering. The writing application writes 0xa1b2c3d4 with it's native byte
161     ordering format into this field. The reading application will read either
162     0xa1b2c3d4 (identical) or 0xd4c3b2a1 (swapped). If the reading application
163     reads the swapped 0xd4c3b2a1 value, it knows that all the following fields
164     will have to be swapped too. For nanosecond-resolution files, the writing
165     application writes 0xa1b23c4d, with the two nibbles of the two lower-order
166     bytes swapped, and the reading application will read either 0xa1b23c4d
167     (identical) or 0x4d3cb2a1 (swapped)*/
168     uint16_t version_major;
169     uint16_t version_minor; /*the version number of this file format */
170     int32_t thiszone;
171 	/*the correction time in seconds between GMT (UTC) and the
172     local timezone of the following packet header timestamps. Examples: If the
173     timestamps are in GMT (UTC), thiszone is simply 0. If the timestamps are in
174     Central European time (Amsterdam, Berlin, ...) which is GMT + 1:00, thiszone
175     must be -3600*/
176     uint32_t stampacc; /*the accuracy of time stamps in the capture*/
177     uint32_t snaplen;
178 	/*the "snapshot length" for the capture (typically 65535
179     or even more, but might be limited by the user)*/
180     uint32_t network;
181 	/*link-layer header type, specifying the type of headers
182     at the beginning of the packet (e.g. 1 for Ethernet); this can be various
183     types such as 802.11, 802.11 with various radio information, PPP, Token
184     Ring, FDDI, etc.*/
185 };
186 
187 #if 0 /* from pcap.h */
188 struct pcap_file_header {
189         bpf_u_int32 magic;
190         u_short version_major;
191         u_short version_minor;
192         bpf_int32 thiszone;     /* gmt to local correction */
193         bpf_u_int32 sigfigs;    /* accuracy of timestamps */
194         bpf_u_int32 snaplen;    /* max length saved portion of each pkt */
195         bpf_u_int32 linktype;   /* data link type (LINKTYPE_*) */
196 };
197 
198 struct pcap_pkthdr {
199         struct timeval ts;      /* time stamp */
200         bpf_u_int32 caplen;     /* length of portion present */
201         bpf_u_int32 len;        /* length this packet (off wire) */
202 };
203 #endif /* from pcap.h */
204 
205 struct pcap_pkthdr {
206     uint32_t ts_sec; /* seconds from epoch */
207     uint32_t ts_frac; /* microseconds or nanoseconds depending on sigfigs */
208     uint32_t caplen;
209 	/*the number of bytes of packet data actually captured
210     and saved in the file. This value should never become larger than orig_len
211     or the snaplen value of the global header*/
212     uint32_t len;	/* wire length */
213 };
214 
215 
216 #define PKT_PAD         (32)    /* padding on packets */
217 
pad(int x)218 static inline int pad(int x)
219 {
220         return ((x) + PKT_PAD - 1) & ~(PKT_PAD - 1) ;
221 }
222 
223 
224 
225 /*
226  * wrapper around the pcap file.
227  * We mmap the file so it is easy to do multiple passes through it.
228  */
229 struct nm_pcap_file {
230     int fd;
231     uint64_t filesize;
232     const char *data; /* mmapped file */
233 
234     uint64_t tot_pkt;
235     uint64_t tot_bytes;
236     uint64_t tot_bytes_rounded;	/* need hdr + pad(len) */
237     uint32_t resolution; /* 1000 for us, 1 for ns */
238     int swap; /* need to swap fields ? */
239 
240     uint64_t first_ts;
241     uint64_t total_tx_time;
242 	/*
243 	 * total_tx_time is computed as last_ts - first_ts, plus the
244 	 * transmission time for the first packet which in turn is
245 	 * computed according to the average bandwidth
246 	 */
247 
248     uint64_t file_len;
249     const char *cur;	/* running pointer */
250     const char *lim;	/* data + file_len */
251     int err;
252 };
253 
254 static struct nm_pcap_file *readpcap(const char *fn);
255 static void destroy_pcap(struct nm_pcap_file *file);
256 
257 
258 #define NS_SCALE 1000000000UL	/* nanoseconds in 1s */
259 
destroy_pcap(struct nm_pcap_file * pf)260 static void destroy_pcap(struct nm_pcap_file *pf)
261 {
262     if (!pf)
263 	return;
264 
265     munmap((void *)(uintptr_t)pf->data, pf->filesize);
266     close(pf->fd);
267     bzero(pf, sizeof(*pf));
268     free(pf);
269     return;
270 }
271 
272 // convert a field of given size if swap is needed.
273 static uint32_t
cvt(const void * src,int size,char swap)274 cvt(const void *src, int size, char swap)
275 {
276     uint32_t ret = 0;
277     if (size != 2 && size != 4) {
278 	EEE("Invalid size %d\n", size);
279 	exit(1);
280     }
281     memcpy(&ret, src, size);
282     if (swap) {
283 	unsigned char tmp, *data = (unsigned char *)&ret;
284 	int i;
285         for (i = 0; i < size / 2; i++) {
286             tmp = data[i];
287             data[i] = data[size - (1 + i)];
288             data[size - (1 + i)] = tmp;
289         }
290     }
291     return ret;
292 }
293 
294 static uint32_t
read_next_info(struct nm_pcap_file * pf,int size)295 read_next_info(struct nm_pcap_file *pf, int size)
296 {
297     const char *end = pf->cur + size;
298     uint32_t ret;
299     if (end > pf->lim) {
300 	pf->err = 1;
301 	ret = 0;
302     } else {
303 	ret = cvt(pf->cur, size, pf->swap);
304 	pf->cur = end;
305     }
306     return ret;
307 }
308 
309 /*
310  * mmap the file, make sure timestamps are sorted, and count
311  * packets and sizes
312  * Timestamps represent the receive time of the packets.
313  * We need to compute also the 'first_ts' which refers to a hypotetical
314  * packet right before the first one, see the code for details.
315  */
316 static struct nm_pcap_file *
readpcap(const char * fn)317 readpcap(const char *fn)
318 {
319     struct nm_pcap_file _f, *pf = &_f;
320     uint64_t prev_ts, first_pkt_time;
321     uint32_t magic, first_len = 0;
322 
323     bzero(pf, sizeof(*pf));
324     pf->fd = open(fn, O_RDONLY);
325     if (pf->fd < 0) {
326 	EEE("cannot open file %s", fn);
327 	return NULL;
328     }
329     /* compute length */
330     pf->filesize = lseek(pf->fd, 0, SEEK_END);
331     lseek(pf->fd, 0, SEEK_SET);
332     ED("filesize is %lu", (u_long)(pf->filesize));
333     if (pf->filesize < sizeof(struct pcap_file_header)) {
334 	EEE("file too short %s", fn);
335 	close(pf->fd);
336 	return NULL;
337     }
338     pf->data = mmap(NULL, pf->filesize, PROT_READ, MAP_SHARED, pf->fd, 0);
339     if (pf->data == MAP_FAILED) {
340 	EEE("cannot mmap file %s", fn);
341 	close(pf->fd);
342 	return NULL;
343     }
344     pf->cur = pf->data;
345     pf->lim = pf->data + pf->filesize;
346     pf->err = 0;
347     pf->swap = 0; /* default, same endianness when read magic */
348 
349     magic = read_next_info(pf, 4);
350     ED("magic is 0x%x", magic);
351     switch (magic) {
352     case 0xa1b2c3d4: /* native, us resolution */
353 	pf->swap = 0;
354 	pf->resolution = 1000;
355 	break;
356     case 0xd4c3b2a1: /* swapped, us resolution */
357 	pf->swap = 1;
358 	pf->resolution = 1000;
359 	break;
360     case 0xa1b23c4d:	/* native, ns resolution */
361 	pf->swap = 0;
362 	pf->resolution = 1; /* nanoseconds */
363 	break;
364     case 0x4d3cb2a1:	/* swapped, ns resolution */
365 	pf->swap = 1;
366 	pf->resolution = 1; /* nanoseconds */
367 	break;
368     default:
369 	EEE("unknown magic 0x%x", magic);
370 	return NULL;
371     }
372 
373     ED("swap %d res %d\n", pf->swap, pf->resolution);
374     pf->cur = pf->data + sizeof(struct pcap_file_header);
375     pf->lim = pf->data + pf->filesize;
376     pf->err = 0;
377     prev_ts = 0;
378     while (pf->cur < pf->lim && pf->err == 0) {
379 	uint32_t base = pf->cur - pf->data;
380 	uint64_t cur_ts = read_next_info(pf, 4) * NS_SCALE +
381 		read_next_info(pf, 4) * pf->resolution;
382 	uint32_t caplen = read_next_info(pf, 4);
383 	uint32_t len = read_next_info(pf, 4);
384 
385 	if (pf->err) {
386 	    WWW("end of pcap file after %d packets\n",
387 		(int)pf->tot_pkt);
388 	    break;
389 	}
390 	if  (cur_ts < prev_ts) {
391 	    WWW("reordered packet %d\n",
392 		(int)pf->tot_pkt);
393 	}
394 	prev_ts = cur_ts;
395 	(void)base;
396 	if (pf->tot_pkt == 0) {
397 	    pf->first_ts = cur_ts;
398 	    first_len = len;
399 	}
400 	pf->tot_pkt++;
401 	pf->tot_bytes += len;
402 	pf->tot_bytes_rounded += pad(len) + sizeof(struct q_pkt);
403 	pf->cur += caplen;
404     }
405     pf->total_tx_time = prev_ts - pf->first_ts; /* excluding first packet */
406     ED("tot_pkt %lu tot_bytes %lu tx_time %.6f s first_len %lu",
407 	(u_long)pf->tot_pkt, (u_long)pf->tot_bytes,
408 	1e-9*pf->total_tx_time, (u_long)first_len);
409     /*
410      * We determine that based on the
411      * average bandwidth of the trace, as follows
412      *   first_pkt_ts = p[0].len / avg_bw
413      * In turn avg_bw = (total_len - p[0].len)/(p[n-1].ts - p[0].ts)
414      * so
415      *   first_ts =  p[0].ts - p[0].len * (p[n-1].ts - p[0].ts) / (total_len - p[0].len)
416      */
417     if (pf->tot_bytes == first_len) {
418 	/* cannot estimate bandwidth, so force 1 Gbit */
419 	first_pkt_time = first_len * 8; /* * 10^9 / bw */
420     } else {
421 	first_pkt_time = pf->total_tx_time * first_len / (pf->tot_bytes - first_len);
422     }
423     ED("first_pkt_time %.6f s", 1e-9*first_pkt_time);
424     pf->total_tx_time += first_pkt_time;
425     pf->first_ts -= first_pkt_time;
426 
427     /* all correct, allocate a record and copy */
428     pf = calloc(1, sizeof(*pf));
429     *pf = _f;
430     /* reset pointer to start */
431     pf->cur = pf->data + sizeof(struct pcap_file_header);
432     pf->err = 0;
433     return pf;
434 }
435 
436 enum my_pcap_mode { PM_NONE, PM_FAST, PM_FIXED, PM_REAL };
437 
438 static int verbose = 0;
439 
440 static int do_abort = 0;
441 
442 #ifdef linux
443 #define cpuset_t        cpu_set_t
444 #endif
445 
446 #ifdef __APPLE__
447 #define cpuset_t        uint64_t        // XXX
CPU_ZERO(cpuset_t * p)448 static inline void CPU_ZERO(cpuset_t *p)
449 {
450         *p = 0;
451 }
452 
CPU_SET(uint32_t i,cpuset_t * p)453 static inline void CPU_SET(uint32_t i, cpuset_t *p)
454 {
455         *p |= 1<< (i & 0x3f);
456 }
457 
458 #define pthread_setaffinity_np(a, b, c) ((void)a, 0)
459 #define sched_setscheduler(a, b, c)	(1) /* error */
460 #define clock_gettime(a,b)      \
461         do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)
462 
463 #define	_P64	unsigned long
464 #endif
465 
466 #ifndef _P64
467 
468 /* we use uint64_t widely, but printf gives trouble on different
469  * platforms so we use _P64 as a cast
470  */
471 #define	_P64	uint64_t
472 #endif /* print stuff */
473 
474 
475 struct _qs;	/* forward */
476 /*
477  * descriptor of a configuration entry.
478  * Each handler has a parse function which takes ac/av[] and returns
479  * true if successful. Any allocated space is stored into struct _cfg *
480  * that is passed as argument.
481  * arg and arg_len are included for convenience.
482  */
483 struct _cfg {
484     int (*parse)(struct _qs *, struct _cfg *, int ac, char *av[]);  /* 0 ok, 1 on error */
485     int (*run)(struct _qs *, struct _cfg *arg);         /* 0 Ok, 1 on error */
486     // int close(struct _qs *, void *arg);              /* 0 Ok, 1 on error */
487 
488     const char *optarg;	/* command line argument. Initial value is the error message */
489     /* placeholders for common values */
490     void *arg;		/* allocated memory if any */
491     int arg_len;	/* size of *arg in case a realloc is needed */
492     uint64_t d[16];	/* static storage for simple cases */
493     double f[4];	/* static storage for simple cases */
494 };
495 
496 
497 /*
498  * communication occurs through this data structure, with fields
499  * cache-aligned according to who are the readers/writers.
500  *
501 
502 The queue is an array of memory  (buf) of size buflen (does not change).
503 
504 The producer uses 'tail' as an index in the queue to indicate
505 the first empty location (ie. after the last byte of data),
506 the consumer uses head to indicate the next byte to consume.
507 
508 For best performance we should align buffers and packets
509 to multiples of cacheline, but this would explode memory too much.
510 Worst case memory explosion is with 65 byte packets.
511 Memory usage as shown below:
512 
513 		qpkt-pad
514 	size	32-16	32-32	32-64	64-64
515 
516 	64	96	96	96	128
517 	65	112	128	160	192
518 
519 
520 An empty queue has head == tail, a full queue will have free space
521 below a threshold.  In our case the queue is large enough and we
522 are non blocking so we can simply drop traffic when the queue
523 approaches a full state.
524 
525 To simulate bandwidth limitations efficiently, the producer has a second
526 pointer, prod_tail_1, used to check for expired packets. This is done lazily.
527 
528  */
529 /*
530  * When sizing the buffer, we must assume some value for the bandwidth.
531  * INFINITE_BW is supposed to be faster than what we support
532  */
533 #define INFINITE_BW	(200ULL*1000000*1000)
534 #define	MY_CACHELINE	(128ULL)
535 #define MAX_PKT		(9200)	/* max packet size */
536 
537 #define ALIGN_CACHE	__attribute__ ((aligned (MY_CACHELINE)))
538 
539 struct _qs { /* shared queue */
540 	uint64_t	t0;	/* start of times */
541 
542 	uint64_t 	buflen;	/* queue length */
543 	char *buf;
544 
545 	/* handlers for various options */
546 	struct _cfg	c_delay;
547 	struct _cfg	c_bw;
548 	struct _cfg	c_loss;
549 
550 	/* producer's fields */
551 	uint64_t	tx ALIGN_CACHE;	/* tx counter */
552 	uint64_t	prod_tail_1;	/* head of queue */
553 	uint64_t	prod_head;	/* cached copy */
554 	uint64_t	prod_tail;	/* cached copy */
555 	uint64_t	prod_drop;	/* drop packet count */
556 	uint64_t	prod_max_gap;	/* rx round duration */
557 
558 	struct nm_pcap_file	*pcap;		/* the pcap struct */
559 
560 	/* parameters for reading from the netmap port */
561 	struct nmport_d *src_port;		/* netmap descriptor */
562 	const char *	prod_ifname;	/* interface name or pcap file */
563 	struct netmap_ring *rxring;	/* current ring being handled */
564 	uint32_t	si;		/* ring index */
565 	int		burst;
566 	uint32_t	rx_qmax;	/* stats on max queued */
567 
568 	uint64_t	qt_qout;	/* queue exit time for last packet */
569 		/*
570 		 * when doing shaping, the software computes and stores here
571 		 * the time when the most recently queued packet will exit from
572 		 * the queue.
573 		 */
574 
575 	uint64_t	qt_tx;		/* delay line exit time for last packet */
576 		/*
577 		 * The software computes the time at which the most recently
578 		 * queued packet exits from the queue.
579 		 * To avoid reordering, the next packet should exit at least
580 		 * at qt_tx + cur_tt
581 		 */
582 
583 	/* producer's fields controlling the queueing */
584 	const char *	cur_pkt;	/* current packet being analysed */
585 	uint32_t	cur_len;	/* length of current packet */
586 	uint32_t	cur_caplen;	/* captured length of current packet */
587 
588 	int		cur_drop;	/* 1 if current  packet should be dropped. */
589 		/*
590 		 * cur_drop can be set as a result of the loss emulation,
591 		 * and may need to use the packet size, current time, etc.
592 		 */
593 
594 	uint64_t	cur_tt;		/* transmission time (ns) for current packet */
595 		/*
596 		 * The transmission time is how much link time the packet will consume.
597 		 * should be set by the function that does the bandwidth emulation,
598 		 * but could also be the result of a function that emulates the
599 		 * presence of competing traffic, MAC protocols etc.
600 		 * cur_tt is 0 for links with infinite bandwidth.
601 		 */
602 
603 	uint64_t	cur_delay;	/* delay (ns) for current packet from c_delay.run() */
604 		/*
605 		 * this should be set by the function that computes the extra delay
606 		 * applied to the packet.
607 		 * The code makes sure that there is no reordering and possibly
608 		 * bumps the output time as needed.
609 		 */
610 
611 
612 	/* consumer's fields */
613 	const char *		cons_ifname;
614 	uint64_t rx ALIGN_CACHE;	/* rx counter */
615 	uint64_t	cons_head;	/* cached copy */
616 	uint64_t	cons_tail;	/* cached copy */
617 	uint64_t	cons_now;	/* most recent producer timestamp */
618 	uint64_t	rx_wait;	/* stats */
619 
620 	/* shared fields */
621 	volatile uint64_t _tail ALIGN_CACHE ;	/* producer writes here */
622 	volatile uint64_t _head ALIGN_CACHE ;	/* consumer reads from here */
623 };
624 
625 struct pipe_args {
626 	int		wait_link;
627 
628 	pthread_t	cons_tid;	/* main thread */
629 	pthread_t	prod_tid;	/* producer thread */
630 
631 	/* Affinity: */
632 	int		cons_core;	/* core for cons() */
633 	int		prod_core;	/* core for prod() */
634 
635 	struct nmport_d *pa;		/* netmap descriptor */
636 	struct nmport_d *pb;
637 
638 	struct _qs	q;
639 };
640 
641 #define NS_IN_S	(1000000000ULL)	// nanoseconds
642 #define TIME_UNITS	NS_IN_S
643 /* set the thread affinity. */
644 static int
setaffinity(int i)645 setaffinity(int i)
646 {
647         cpuset_t cpumask;
648 	struct sched_param p;
649 
650         if (i == -1)
651                 return 0;
652 
653         /* Set thread affinity affinity.*/
654         CPU_ZERO(&cpumask);
655         CPU_SET(i, &cpumask);
656 
657         if (pthread_setaffinity_np(pthread_self(), sizeof(cpuset_t), &cpumask) != 0) {
658                 WWW("Unable to set affinity: %s", strerror(errno));
659         }
660 	if (setpriority(PRIO_PROCESS, 0, -10)) {; // XXX not meaningful
661                 WWW("Unable to set priority: %s", strerror(errno));
662 	}
663 	bzero(&p, sizeof(p));
664 	p.sched_priority = 10; // 99 on linux ?
665 	// use SCHED_RR or SCHED_FIFO
666 	if (sched_setscheduler(0, SCHED_RR, &p)) {
667                 WWW("Unable to set scheduler: %s", strerror(errno));
668 	}
669         return 0;
670 }
671 
672 
673 /*
674  * set the timestamp from the clock, subtract t0
675  */
676 static inline void
set_tns_now(uint64_t * now,uint64_t t0)677 set_tns_now(uint64_t *now, uint64_t t0)
678 {
679     struct timespec t;
680 
681     clock_gettime(CLOCK_REALTIME, &t); // XXX precise on FreeBSD ?
682     *now = (uint64_t)(t.tv_nsec + NS_IN_S * t.tv_sec);
683     *now -= t0;
684 }
685 
686 
687 
688 /* compare two timestamps */
689 static inline int64_t
ts_cmp(uint64_t a,uint64_t b)690 ts_cmp(uint64_t a, uint64_t b)
691 {
692 	return (int64_t)(a - b);
693 }
694 
695 /* create a packet descriptor */
696 static inline struct q_pkt *
pkt_at(struct _qs * q,uint64_t ofs)697 pkt_at(struct _qs *q, uint64_t ofs)
698 {
699     return (struct q_pkt *)(q->buf + ofs);
700 }
701 
702 
703 /*
704  * we have already checked for room and prepared p->next
705  */
706 static inline int
enq(struct _qs * q)707 enq(struct _qs *q)
708 {
709     struct q_pkt *p = pkt_at(q, q->prod_tail);
710 
711     /* hopefully prefetch has been done ahead */
712     nm_pkt_copy(q->cur_pkt, (char *)(p+1), q->cur_caplen);
713     p->pktlen = q->cur_len;
714     p->pt_qout = q->qt_qout;
715     p->pt_tx = q->qt_tx;
716     p->next = q->prod_tail + pad(q->cur_len) + sizeof(struct q_pkt);
717     ND("enqueue len %d at %d new tail %ld qout %.6f tx %.6f",
718         q->cur_len, (int)q->prod_tail, p->next,
719         1e-9*p->pt_qout, 1e-9*p->pt_tx);
720     q->prod_tail = p->next;
721     q->tx++;
722     return 0;
723 }
724 
725 /*
726  * simple handler for parameters not supplied
727  */
728 static int
null_run_fn(struct _qs * q,struct _cfg * cfg)729 null_run_fn(struct _qs *q, struct _cfg *cfg)
730 {
731     (void)q;
732     (void)cfg;
733     return 0;
734 }
735 
736 
737 
738 /*
739  * put packet data into the buffer.
740  * We read from the mmapped pcap file, construct header, copy
741  * the captured length of the packet and pad with zeroes.
742  */
743 static void *
pcap_prod(void * _pa)744 pcap_prod(void *_pa)
745 {
746     struct pipe_args *pa = _pa;
747     struct _qs *q = &pa->q;
748     struct nm_pcap_file *pf = q->pcap;	/* already opened by readpcap */
749     uint32_t loops, i, tot_pkts;
750 
751     /* data plus the loop record */
752     uint64_t need;
753     uint64_t t_tx, tt, last_ts; /* last timestamp from trace */
754 
755     /*
756      * For speed we make sure the trace is at least some 1000 packets,
757      * so we may need to loop the trace more than once (for short traces)
758      */
759     loops = (1 + 10000 / pf->tot_pkt);
760     tot_pkts = loops * pf->tot_pkt;
761     need = loops * pf->tot_bytes_rounded + sizeof(struct q_pkt);
762     q->buf = calloc(1, need);
763     if (q->buf == NULL) {
764 	D("alloc %lld bytes for queue failed, exiting",(long long)need);
765 	goto fail;
766     }
767     q->prod_head = q->prod_tail = 0;
768     q->buflen = need;
769 
770     pf->cur = pf->data + sizeof(struct pcap_file_header);
771     pf->err = 0;
772 
773     ED("--- start create %lu packets at tail %d",
774 	(u_long)tot_pkts, (int)q->prod_tail);
775     last_ts = pf->first_ts; /* beginning of the trace */
776 
777     q->qt_qout = 0; /* first packet out of the queue */
778 
779     for (loops = 0, i = 0; i < tot_pkts && !do_abort; i++) {
780 	const char *next_pkt; /* in the pcap buffer */
781 	uint64_t cur_ts;
782 
783 	/* read values from the pcap buffer */
784 	cur_ts = read_next_info(pf, 4) * NS_SCALE +
785 		read_next_info(pf, 4) * pf->resolution;
786 	q->cur_caplen = read_next_info(pf, 4);
787 	q->cur_len = read_next_info(pf, 4);
788 	next_pkt = pf->cur + q->cur_caplen;
789 
790 	/* prepare fields in q for the generator */
791 	q->cur_pkt = pf->cur;
792 	/* initial estimate of tx time */
793 	q->cur_tt = cur_ts - last_ts;
794 	    // -pf->first_ts + loops * pf->total_tx_time - last_ts;
795 
796 	if ((i % pf->tot_pkt) == 0)
797 	   ED("insert %5d len %lu cur_tt %.6f",
798 		i, (u_long)q->cur_len, 1e-9*q->cur_tt);
799 
800 	/* prepare for next iteration */
801 	pf->cur = next_pkt;
802 	last_ts = cur_ts;
803 	if (next_pkt == pf->lim) {	//last pkt
804 	    pf->cur = pf->data + sizeof(struct pcap_file_header);
805     	    last_ts = pf->first_ts; /* beginning of the trace */
806 	    loops++;
807 	}
808 
809 	q->c_loss.run(q, &q->c_loss);
810 	if (q->cur_drop)
811 	    continue;
812 	q->c_bw.run(q, &q->c_bw);
813 	tt = q->cur_tt;
814 	q->qt_qout += tt;
815 #if 0
816 	if (drop_after(q))
817 	    continue;
818 #endif
819 	q->c_delay.run(q, &q->c_delay); /* compute delay */
820 	t_tx = q->qt_qout + q->cur_delay;
821 	ND(5, "tt %ld qout %ld tx %ld qt_tx %ld", tt, q->qt_qout, t_tx, q->qt_tx);
822 	/* insure no reordering and spacing by transmission time */
823 	q->qt_tx = (t_tx >= q->qt_tx + tt) ? t_tx : q->qt_tx + tt;
824 	enq(q);
825 
826 	q->tx++;
827 	ND("ins %d q->prod_tail = %lu", (int)insert, (unsigned long)q->prod_tail);
828     }
829     /* loop marker ? */
830     ED("done q->prod_tail:%d",(int)q->prod_tail);
831     q->_tail = q->prod_tail; /* publish */
832 
833     return NULL;
834 fail:
835     if (q->buf != NULL) {
836 	free(q->buf);
837     }
838     nmport_close(pa->pb);
839     return (NULL);
840 }
841 
842 
843 /*
844  * the consumer reads from the queue using head,
845  * advances it every now and then.
846  */
847 static void *
cons(void * _pa)848 cons(void *_pa)
849 {
850     struct pipe_args *pa = _pa;
851     struct _qs *q = &pa->q;
852     int pending = 0;
853     uint64_t last_ts = 0;
854 
855     /* read the start of times in q->t0 */
856     set_tns_now(&q->t0, 0);
857     /* set the time (cons_now) to clock - q->t0 */
858     set_tns_now(&q->cons_now, q->t0);
859     q->cons_head = q->_head;
860     q->cons_tail = q->_tail;
861     while (!do_abort) { /* consumer, infinite */
862 	struct q_pkt *p = pkt_at(q, q->cons_head);
863 
864 	__builtin_prefetch (q->buf + p->next);
865 
866 	if (q->cons_head == q->cons_tail) {	//reset record
867 	    ND("Transmission restarted");
868 	    /*
869 	     * add to q->t0 the time for the last packet
870 	     */
871 	    q->t0 += last_ts;
872 	    set_tns_now(&q->cons_now, q->t0);
873 	    q->cons_head = 0;	//restart from beginning of the queue
874 	    continue;
875 	}
876 	last_ts = p->pt_tx;
877 	if (ts_cmp(p->pt_tx, q->cons_now) > 0) {
878 	    // packet not ready
879 	    q->rx_wait++;
880 	    /* the ioctl should be conditional */
881 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0); // XXX just in case
882 	    pending = 0;
883 	    usleep(20);
884 	    set_tns_now(&q->cons_now, q->t0);
885 	    continue;
886 	}
887 	/* XXX copy is inefficient but simple */
888 	if (nmport_inject(pa->pb, (char *)(p + 1), p->pktlen) == 0) {
889 	    RD(1, "inject failed len %d now %ld tx %ld h %ld t %ld next %ld",
890 		(int)p->pktlen, (u_long)q->cons_now, (u_long)p->pt_tx,
891 		(u_long)q->_head, (u_long)q->_tail, (u_long)p->next);
892 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0);
893 	    pending = 0;
894 	    continue;
895 	}
896 	pending++;
897 	if (pending > q->burst) {
898 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0);
899 	    pending = 0;
900 	}
901 
902 	q->cons_head = p->next;
903 	/* drain packets from the queue */
904 	q->rx++;
905     }
906     D("exiting on abort");
907     return NULL;
908 }
909 
910 /*
911  * In case of pcap file as input, the program acts in 2 different
912  * phases. It first fill the queue and then starts the cons()
913  */
914 static void *
nmreplay_main(void * _a)915 nmreplay_main(void *_a)
916 {
917     struct pipe_args *a = _a;
918     struct _qs *q = &a->q;
919     const char *cap_fname = q->prod_ifname;
920 
921     setaffinity(a->cons_core);
922     set_tns_now(&q->t0, 0); /* starting reference */
923     if (cap_fname == NULL) {
924 	goto fail;
925     }
926     q->pcap = readpcap(cap_fname);
927     if (q->pcap == NULL) {
928 	EEE("unable to read file %s", cap_fname);
929 	goto fail;
930     }
931     pcap_prod((void*)a);
932     destroy_pcap(q->pcap);
933     q->pcap = NULL;
934     a->pb = nmport_open(q->cons_ifname);
935     if (a->pb == NULL) {
936 	EEE("cannot open netmap on %s", q->cons_ifname);
937 	do_abort = 1; // XXX any better way ?
938 	return NULL;
939     }
940     /* continue as cons() */
941     WWW("prepare to send packets");
942     usleep(1000);
943     cons((void*)a);
944     EEE("exiting on abort");
945 fail:
946     if (q->pcap != NULL) {
947 	destroy_pcap(q->pcap);
948     }
949     do_abort = 1;
950     return NULL;
951 }
952 
953 
954 static void
sigint_h(int sig)955 sigint_h(int sig)
956 {
957 	(void)sig;	/* UNUSED */
958 	do_abort = 1;
959 	signal(SIGINT, SIG_DFL);
960 }
961 
962 
963 
964 static void
usage(void)965 usage(void)
966 {
967 	fprintf(stderr,
968 	    "usage: nmreplay [-v] [-D delay] [-B {[constant,]bps|ether,bps|real,speedup}] [-L loss]\n"
969 	    "\t[-b burst] -f pcap-file -i <netmap:ifname|valeSSS:PPP>\n");
970 	exit(1);
971 }
972 
973 
974 /*---- configuration handling ---- */
975 /*
976  * support routine: split argument, returns ac and *av.
977  * av contains two extra entries, a NULL and a pointer
978  * to the entire string.
979  */
980 static char **
split_arg(const char * src,int * _ac)981 split_arg(const char *src, int *_ac)
982 {
983     char *my = NULL, **av = NULL;
984     const char *seps = " \t\r\n,";
985     int l, i, ac; /* number of entries */
986 
987     if (!src)
988 	return NULL;
989     l = strlen(src);
990     /* in the first pass we count fields, in the second pass
991      * we allocate the av[] array and a copy of the string
992      * and fill av[]. av[ac] = NULL, av[ac+1]
993      */
994     for (;;) {
995 	i = ac = 0;
996 	ND("start pass %d: <%s>", av ? 1 : 0, my);
997 	while (i < l) {
998 	    /* trim leading separator */
999 	    while (i <l && strchr(seps, src[i]))
1000 		i++;
1001 	    if (i >= l)
1002 		break;
1003 	    ND("   pass %d arg %d: <%s>", av ? 1 : 0, ac, src+i);
1004 	    if (av) /* in the second pass, set the result */
1005 		av[ac] = my+i;
1006 	    ac++;
1007 	    /* skip string */
1008 	    while (i <l && !strchr(seps, src[i])) i++;
1009 	    if (av)
1010 		my[i] = '\0'; /* write marker */
1011 	}
1012 	if (!av) { /* end of first pass */
1013 	    ND("ac is %d", ac);
1014 	    av = calloc(1, (l+1) + (ac + 2)*sizeof(char *));
1015 	    my = (char *)&(av[ac+2]);
1016 	    strcpy(my, src);
1017 	} else {
1018 	    break;
1019 	}
1020     }
1021     for (i = 0; i < ac; i++) {
1022 	NED("%d: <%s>", i, av[i]);
1023     }
1024     av[i++] = NULL;
1025     av[i++] = my;
1026     *_ac = ac;
1027     return av;
1028 }
1029 
1030 
1031 /*
1032  * apply a command against a set of functions,
1033  * install a handler in *dst
1034  */
1035 static int
cmd_apply(const struct _cfg * a,const char * arg,struct _qs * q,struct _cfg * dst)1036 cmd_apply(const struct _cfg *a, const char *arg, struct _qs *q, struct _cfg *dst)
1037 {
1038 	int ac = 0;
1039 	char **av;
1040 	int i;
1041 
1042 	if (arg == NULL || *arg == '\0')
1043 		return 1; /* no argument may be ok */
1044 	if (a == NULL || dst == NULL) {
1045 		ED("program error - invalid arguments");
1046 		exit(1);
1047 	}
1048 	av = split_arg(arg, &ac);
1049 	if (av == NULL)
1050 		return 1; /* error */
1051 	for (i = 0; a[i].parse; i++) {
1052 		struct _cfg x = a[i];
1053 		const char *errmsg = x.optarg;
1054 		int ret;
1055 
1056 		x.arg = NULL;
1057 		x.arg_len = 0;
1058 		bzero(&x.d, sizeof(x.d));
1059 		ND("apply %s to %s", av[0], errmsg);
1060 		ret = x.parse(q, &x, ac, av);
1061 		if (ret == 2) /* not recognised */
1062 			continue;
1063 		if (ret == 1) {
1064 			ED("invalid arguments: need '%s' have '%s'",
1065 				errmsg, arg);
1066 			break;
1067 		}
1068 		x.optarg = arg;
1069 		*dst = x;
1070 		return 0;
1071 	}
1072 	ED("arguments %s not recognised", arg);
1073 	free(av);
1074 	return 1;
1075 }
1076 
1077 static struct _cfg delay_cfg[];
1078 static struct _cfg bw_cfg[];
1079 static struct _cfg loss_cfg[];
1080 
1081 static uint64_t parse_bw(const char *arg);
1082 
1083 /*
1084  * prodcons [options]
1085  * accept separate sets of arguments for the two directions
1086  *
1087  */
1088 
1089 static void
add_to(const char ** v,int l,const char * arg,const char * msg)1090 add_to(const char ** v, int l, const char *arg, const char *msg)
1091 {
1092 	for (; l > 0 && *v != NULL ; l--, v++);
1093 	if (l == 0) {
1094 		ED("%s %s", msg, arg);
1095 		exit(1);
1096 	}
1097 	*v = arg;
1098 }
1099 
1100 int
main(int argc,char ** argv)1101 main(int argc, char **argv)
1102 {
1103 	int ch, i, err=0;
1104 
1105 #define	N_OPTS	1
1106 	struct pipe_args bp[N_OPTS];
1107 	const char *d[N_OPTS], *b[N_OPTS], *l[N_OPTS], *q[N_OPTS], *ifname[N_OPTS], *m[N_OPTS];
1108 	const char *pcap_file[N_OPTS];
1109 	int cores[4] = { 2, 8, 4, 10 }; /* default values */
1110 
1111 	bzero(&bp, sizeof(bp));	/* all data initially go here */
1112 	bzero(d, sizeof(d));
1113 	bzero(b, sizeof(b));
1114 	bzero(l, sizeof(l));
1115 	bzero(q, sizeof(q));
1116 	bzero(m, sizeof(m));
1117 	bzero(ifname, sizeof(ifname));
1118 	bzero(pcap_file, sizeof(pcap_file));
1119 
1120 
1121 	/* set default values */
1122 	for (i = 0; i < N_OPTS; i++) {
1123 	    struct _qs *qs = &bp[i].q;
1124 
1125 	    qs->burst = 128;
1126 	    qs->c_delay.optarg = "0";
1127 	    qs->c_delay.run = null_run_fn;
1128 	    qs->c_loss.optarg = "0";
1129 	    qs->c_loss.run = null_run_fn;
1130 	    qs->c_bw.optarg = "0";
1131 	    qs->c_bw.run = null_run_fn;
1132 	}
1133 
1134 	// Options:
1135 	// B	bandwidth in bps
1136 	// D	delay in seconds
1137 	// L	loss probability
1138 	// f	pcap file
1139 	// i	interface name
1140 	// w	wait link
1141 	// b	batch size
1142 	// v	verbose
1143 	// C	cpu placement
1144 
1145 	while ( (ch = getopt(argc, argv, "B:C:D:L:b:f:i:vw:")) != -1) {
1146 		switch (ch) {
1147 		default:
1148 			D("bad option %c %s", ch, optarg);
1149 			usage();
1150 			break;
1151 
1152 		case 'C': /* CPU placement, up to 4 arguments */
1153 			{
1154 				int ac = 0;
1155 				char **av = split_arg(optarg, &ac);
1156 				if (ac == 1) { /* sequential after the first */
1157 					cores[0] = atoi(av[0]);
1158 					cores[1] = cores[0] + 1;
1159 					cores[2] = cores[1] + 1;
1160 					cores[3] = cores[2] + 1;
1161 				} else if (ac == 2) { /* two sequential pairs */
1162 					cores[0] = atoi(av[0]);
1163 					cores[1] = cores[0] + 1;
1164 					cores[2] = atoi(av[1]);
1165 					cores[3] = cores[2] + 1;
1166 				} else if (ac == 4) { /* four values */
1167 					cores[0] = atoi(av[0]);
1168 					cores[1] = atoi(av[1]);
1169 					cores[2] = atoi(av[2]);
1170 					cores[3] = atoi(av[3]);
1171 				} else {
1172 					ED(" -C accepts 1, 2 or 4 comma separated arguments");
1173 					usage();
1174 				}
1175 				if (av)
1176 					free(av);
1177 			}
1178 			break;
1179 
1180 		case 'B': /* bandwidth in bps */
1181 			add_to(b, N_OPTS, optarg, "-B too many times");
1182 			break;
1183 
1184 		case 'D': /* delay in seconds (float) */
1185 			add_to(d, N_OPTS, optarg, "-D too many times");
1186 			break;
1187 
1188 		case 'L': /* loss probability */
1189 			add_to(l, N_OPTS, optarg, "-L too many times");
1190 			break;
1191 
1192 		case 'b':	/* burst */
1193 			bp[0].q.burst = atoi(optarg);
1194 			break;
1195 
1196 		case 'f':	/* pcap_file */
1197 			add_to(pcap_file, N_OPTS, optarg, "-f too many times");
1198 			break;
1199 		case 'i':	/* interface */
1200 			add_to(ifname, N_OPTS, optarg, "-i too many times");
1201 			break;
1202 		case 'v':
1203 			verbose++;
1204 			break;
1205 		case 'w':
1206 			bp[0].wait_link = atoi(optarg);
1207 			break;
1208 		}
1209 
1210 	}
1211 
1212 	argc -= optind;
1213 	argv += optind;
1214 
1215 	/*
1216 	 * consistency checks for common arguments
1217 	 * if pcap file has been provided we need just one interface, two otherwise
1218 	 */
1219 	if (!pcap_file[0]) {
1220 		ED("missing pcap file");
1221 		usage();
1222 	}
1223 	if (!ifname[0]) {
1224 		ED("missing interface");
1225 		usage();
1226 	}
1227 	if (bp[0].q.burst < 1 || bp[0].q.burst > 8192) {
1228 		WWW("invalid burst %d, set to 1024", bp[0].q.burst);
1229 		bp[0].q.burst = 1024; // XXX 128 is probably better
1230 	}
1231 	if (bp[0].wait_link > 100) {
1232 		ED("invalid wait_link %d, set to 4", bp[0].wait_link);
1233 		bp[0].wait_link = 4;
1234 	}
1235 
1236 	bp[0].q.prod_ifname = pcap_file[0];
1237 	bp[0].q.cons_ifname = ifname[0];
1238 
1239 	/* assign cores. prod and cons work better if on the same HT */
1240 	bp[0].cons_core = cores[0];
1241 	bp[0].prod_core = cores[1];
1242 	ED("running on cores %d %d %d %d", cores[0], cores[1], cores[2], cores[3]);
1243 
1244 	/* apply commands */
1245 	for (i = 0; i < N_OPTS; i++) { /* once per queue */
1246 		struct _qs *qs = &bp[i].q;
1247 		err += cmd_apply(delay_cfg, d[i], qs, &qs->c_delay);
1248 		err += cmd_apply(bw_cfg, b[i], qs, &qs->c_bw);
1249 		err += cmd_apply(loss_cfg, l[i], qs, &qs->c_loss);
1250 		if (err != 0)
1251 			exit(1);
1252 	}
1253 
1254 	pthread_create(&bp[0].cons_tid, NULL, nmreplay_main, (void*)&bp[0]);
1255 	signal(SIGINT, sigint_h);
1256 	sleep(1);
1257 	while (!do_abort) {
1258 	    struct _qs olda = bp[0].q;
1259 	    struct _qs *q0 = &bp[0].q;
1260 
1261 	    sleep(1);
1262 	    ED("%lld -> %lld maxq %d round %lld",
1263 		(long long)(q0->rx - olda.rx), (long long)(q0->tx - olda.tx),
1264 		q0->rx_qmax, (long long)q0->prod_max_gap
1265 		);
1266 	    ED("plr nominal %le actual %le",
1267 		(double)(q0->c_loss.d[0])/(1<<24),
1268 		q0->c_loss.d[1] == 0 ? 0 :
1269 		(double)(q0->c_loss.d[2])/q0->c_loss.d[1]);
1270 	    bp[0].q.rx_qmax = (bp[0].q.rx_qmax * 7)/8; // ewma
1271 	    bp[0].q.prod_max_gap = (bp[0].q.prod_max_gap * 7)/8; // ewma
1272 	}
1273 	D("exiting on abort");
1274 	sleep(1);
1275 
1276 	return (0);
1277 }
1278 
1279 /* conversion factor for numbers.
1280  * Each entry has a set of characters and conversion factor,
1281  * the first entry should have an empty string and default factor,
1282  * the final entry has s = NULL.
1283  */
1284 struct _sm {	/* string and multiplier */
1285 	const char *s;
1286 	double m;
1287 };
1288 
1289 /*
1290  * parse a generic value
1291  */
1292 static double
parse_gen(const char * arg,const struct _sm * conv,int * err)1293 parse_gen(const char *arg, const struct _sm *conv, int *err)
1294 {
1295 	double d;
1296 	char *ep;
1297 	int dummy;
1298 
1299 	if (err == NULL)
1300 		err = &dummy;
1301 	*err = 0;
1302 	if (arg == NULL)
1303 		goto error;
1304 	d = strtod(arg, &ep);
1305 	if (ep == arg) { /* no value */
1306 		ED("bad argument %s", arg);
1307 		goto error;
1308 	}
1309 	/* special case, no conversion */
1310 	if (conv == NULL && *ep == '\0')
1311 		goto done;
1312 	ND("checking %s [%s]", arg, ep);
1313 	for (;conv->s; conv++) {
1314 		if (strchr(conv->s, *ep))
1315 			goto done;
1316 	}
1317 error:
1318 	*err = 1;	/* unrecognised */
1319 	return 0;
1320 
1321 done:
1322 	if (conv) {
1323 		ND("scale is %s %lf", conv->s, conv->m);
1324 		d *= conv->m; /* apply default conversion */
1325 	}
1326 	ND("returning %lf", d);
1327 	return d;
1328 }
1329 
1330 #define U_PARSE_ERR ~(0ULL)
1331 
1332 /* returns a value in nanoseconds */
1333 static uint64_t
parse_time(const char * arg)1334 parse_time(const char *arg)
1335 {
1336     struct _sm a[] = {
1337 	{"", 1000000000 /* seconds */},
1338 	{"n", 1 /* nanoseconds */}, {"u", 1000 /* microseconds */},
1339 	{"m", 1000000 /* milliseconds */}, {"s", 1000000000 /* seconds */},
1340 	{NULL, 0 /* seconds */}
1341     };
1342     int err;
1343     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1344     return err ? U_PARSE_ERR : ret;
1345 }
1346 
1347 
1348 /*
1349  * parse a bandwidth, returns value in bps or U_PARSE_ERR if error.
1350  */
1351 static uint64_t
parse_bw(const char * arg)1352 parse_bw(const char *arg)
1353 {
1354     struct _sm a[] = {
1355 	{"", 1}, {"kK", 1000}, {"mM", 1000000}, {"gG", 1000000000}, {NULL, 0}
1356     };
1357     int err;
1358     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1359     return err ? U_PARSE_ERR : ret;
1360 }
1361 
1362 
1363 /*
1364  * For some function we need random bits.
1365  * This is a wrapper to whatever function you want that returns
1366  * 24 useful random bits.
1367  */
1368 
1369 static inline uint64_t
my_random24(void)1370 my_random24(void)	/* 24 useful bits */
1371 {
1372 	return random() & ((1<<24) - 1);
1373 }
1374 
1375 
1376 /*-------------- user-configuration -----------------*/
1377 
1378 #if 0 /* start of comment block */
1379 
1380 Here we place the functions to implement the various features
1381 of the system. For each feature one should define a struct _cfg
1382 (see at the beginning for definition) that refers a *_parse() function
1383 to extract values from the command line, and a *_run() function
1384 that is invoked on each packet to implement the desired function.
1385 
1386 Examples of the two functions are below. In general:
1387 
1388 - the *_parse() function takes argc/argv[], matches the function
1389   name in argv[0], extracts the operating parameters, allocates memory
1390   if needed, and stores them in the struct _cfg.
1391   Return value is 2 if argv[0] is not recosnised, 1 if there is an
1392   error in the arguments, 0 if all ok.
1393 
1394   On the command line, argv[] is a single, comma separated argument
1395   that follow the specific option eg -D constant,20ms
1396 
1397   struct _cfg has some preallocated space (e.g an array of uint64_t) so simple
1398   function can use that without having to allocate memory.
1399 
1400 - the *_run() function takes struct _q *q and struct _cfg *cfg as arguments.
1401   *q contains all the informatio that may be possibly needed, including
1402   those on the packet currently under processing.
1403   The basic values are the following:
1404 
1405 	char *	 cur_pkt 	points to the current packet (linear buffer)
1406 	uint32_t cur_len;	length of the current packet
1407 		the functions are not supposed to modify these values
1408 
1409 	int	 cur_drop;	true if current packet must be dropped.
1410 		Must be set to non-zero by the loss emulation function
1411 
1412 	uint64_t cur_delay;	delay in nanoseconds for the current packet
1413 		Must be set by the delay emulation function
1414 
1415    More sophisticated functions may need to access other fields in *q,
1416    see the structure description for that.
1417 
1418 When implementing a new function for a feature (e.g. for delay,
1419 bandwidth, loss...) the struct _cfg should be added to the array
1420 that contains all possible options.
1421 
1422 		--- Specific notes ---
1423 
1424 DELAY emulation		-D option_arguments
1425 
1426     If the option is not supplied, the system applies 0 extra delay
1427 
1428     The resolution for times is 1ns, the precision is load dependent and
1429     generally in the order of 20-50us.
1430     Times are in nanoseconds, can be followed by a character specifying
1431     a different unit e.g.
1432 
1433 	n	nanoseconds
1434 	u	microseconds
1435 	m	milliseconds
1436 	s	seconds
1437 
1438     Currently implemented options:
1439 
1440     constant,t		constant delay equal to t
1441 
1442     uniform,tmin,tmax	uniform delay between tmin and tmax
1443 
1444     exp,tavg,tmin	exponential distribution with average tavg
1445 			and minimum tmin (corresponds to an exponential
1446 			distribution with argument 1/(tavg-tmin) )
1447 
1448 
1449 LOSS emulation		-L option_arguments
1450 
1451     Loss is expressed as packet or bit error rate, which is an absolute
1452     number between 0 and 1 (typically small).
1453 
1454     Currently implemented options
1455 
1456     plr,p		uniform packet loss rate p, independent
1457 			of packet size
1458 
1459     burst,p,lmin,lmax 	burst loss with burst probability p and
1460 			burst length uniformly distributed between
1461 			lmin and lmax
1462 
1463     ber,p		uniformly distributed bit error rate p,
1464 			so actual loss prob. depends on size.
1465 
1466 BANDWIDTH emulation	-B option_arguments
1467 
1468     Bandwidths are expressed in bits per second, can be followed by a
1469     character specifying a different unit e.g.
1470 
1471 	b/B	bits per second
1472 	k/K	kbits/s (10^3 bits/s)
1473 	m/M	mbits/s (10^6 bits/s)
1474 	g/G	gbits/s (10^9 bits/s)
1475 
1476     Currently implemented options
1477 
1478     const,b		constant bw, excluding mac framing
1479     ether,b		constant bw, including ethernet framing
1480 			(20 bytes framing + 4 bytes crc)
1481     real,[scale]	use real time, optionally with a scaling factor
1482 
1483 #endif /* end of comment block */
1484 
1485 /*
1486  * Configuration options for delay
1487  */
1488 
1489 /* constant delay, also accepts just a number */
1490 static int
const_delay_parse(struct _qs * q,struct _cfg * dst,int ac,char * av[])1491 const_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1492 {
1493 	uint64_t delay;
1494 
1495 	(void)q;
1496 	if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1497 		return 2; /* unrecognised */
1498 	if (ac > 2)
1499 		return 1; /* error */
1500 	delay = parse_time(av[ac - 1]);
1501 	if (delay == U_PARSE_ERR)
1502 		return 1; /* error */
1503 	dst->d[0] = delay;
1504 	return 0;	/* success */
1505 }
1506 
1507 /* runtime function, store the delay into q->cur_delay */
1508 static int
const_delay_run(struct _qs * q,struct _cfg * arg)1509 const_delay_run(struct _qs *q, struct _cfg *arg)
1510 {
1511 	q->cur_delay = arg->d[0]; /* the delay */
1512 	return 0;
1513 }
1514 
1515 static int
uniform_delay_parse(struct _qs * q,struct _cfg * dst,int ac,char * av[])1516 uniform_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1517 {
1518 	uint64_t dmin, dmax;
1519 
1520 	(void)q;
1521 	if (strcmp(av[0], "uniform") != 0)
1522 		return 2; /* not recognised */
1523 	if (ac != 3)
1524 		return 1; /* error */
1525 	dmin = parse_time(av[1]);
1526 	dmax = parse_time(av[2]);
1527 	if (dmin == U_PARSE_ERR || dmax == U_PARSE_ERR || dmin > dmax)
1528 		return 1;
1529 	D("dmin %lld dmax %lld", (long long)dmin, (long long)dmax);
1530 	dst->d[0] = dmin;
1531 	dst->d[1] = dmax;
1532 	dst->d[2] = dmax - dmin;
1533 	return 0;
1534 }
1535 
1536 static int
uniform_delay_run(struct _qs * q,struct _cfg * arg)1537 uniform_delay_run(struct _qs *q, struct _cfg *arg)
1538 {
1539 	uint64_t x = my_random24();
1540 	q->cur_delay = arg->d[0] + ((arg->d[2] * x) >> 24);
1541 
1542 	return 0;
1543 }
1544 
1545 /*
1546  * exponential delay: Prob(delay = x) = exp(-x/d_av)
1547  * gives a delay between 0 and infinity with average d_av
1548  * The cumulative function is 1 - d_av exp(-x/d_av)
1549  *
1550  * The inverse function generates a uniform random number p in 0..1
1551  * and generates delay = (d_av-d_min) * -ln(1-p) + d_min
1552  *
1553  * To speed up behaviour at runtime we tabulate the values
1554  */
1555 
1556 static int
exp_delay_parse(struct _qs * q,struct _cfg * dst,int ac,char * av[])1557 exp_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1558 {
1559 #define	PTS_D_EXP	512
1560 	uint64_t i, d_av, d_min, *t; /*table of values */
1561 
1562         (void)q;
1563         if (strcmp(av[0], "exp") != 0)
1564 		return 2; /* not recognised */
1565         if (ac != 3)
1566                 return 1; /* error */
1567         d_av = parse_time(av[1]);
1568         d_min = parse_time(av[2]);
1569         if (d_av == U_PARSE_ERR || d_min == U_PARSE_ERR || d_av < d_min)
1570                 return 1; /* error */
1571 	d_av -= d_min;
1572 	dst->arg_len = PTS_D_EXP * sizeof(uint64_t);
1573 	dst->arg = calloc(1, dst->arg_len);
1574 	if (dst->arg == NULL)
1575 		return 1; /* no memory */
1576 	t = (uint64_t *)dst->arg;
1577 	/* tabulate -ln(1-n)*delay  for n in 0..1 */
1578 	for (i = 0; i < PTS_D_EXP; i++) {
1579 		double d = -log2 ((double)(PTS_D_EXP - i) / PTS_D_EXP) * d_av + d_min;
1580 		t[i] = (uint64_t)d;
1581 		ND(5, "%ld: %le", i, d);
1582 	}
1583         return 0;
1584 }
1585 
1586 static int
exp_delay_run(struct _qs * q,struct _cfg * arg)1587 exp_delay_run(struct _qs *q, struct _cfg *arg)
1588 {
1589 	uint64_t *t = (uint64_t *)arg->arg;
1590         q->cur_delay = t[my_random24() & (PTS_D_EXP - 1)];
1591 	RD(5, "delay %llu", (unsigned long long)q->cur_delay);
1592         return 0;
1593 }
1594 
1595 
1596 /* unused arguments in configuration */
1597 #define TLEM_CFG_END	NULL, 0, {0}, {0}
1598 
1599 static struct _cfg delay_cfg[] = {
1600 	{ const_delay_parse, const_delay_run,
1601 		"constant,delay", TLEM_CFG_END },
1602 	{ uniform_delay_parse, uniform_delay_run,
1603 		"uniform,dmin,dmax # dmin <= dmax", TLEM_CFG_END },
1604 	{ exp_delay_parse, exp_delay_run,
1605 		"exp,dmin,davg # dmin <= davg", TLEM_CFG_END },
1606 	{ NULL, NULL, NULL, TLEM_CFG_END }
1607 };
1608 
1609 /* standard bandwidth, also accepts just a number */
1610 static int
const_bw_parse(struct _qs * q,struct _cfg * dst,int ac,char * av[])1611 const_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1612 {
1613 	uint64_t bw;
1614 
1615 	(void)q;
1616 	if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1617 		return 2; /* unrecognised */
1618 	if (ac > 2)
1619 		return 1; /* error */
1620 	bw = parse_bw(av[ac - 1]);
1621 	if (bw == U_PARSE_ERR) {
1622 		return (ac == 2) ? 1 /* error */ : 2 /* unrecognised */;
1623 	}
1624 	dst->d[0] = bw;
1625 	return 0;	/* success */
1626 }
1627 
1628 
1629 /* runtime function, store the delay into q->cur_delay */
1630 static int
const_bw_run(struct _qs * q,struct _cfg * arg)1631 const_bw_run(struct _qs *q, struct _cfg *arg)
1632 {
1633 	uint64_t bps = arg->d[0];
1634 	q->cur_tt = bps ? 8ULL* TIME_UNITS * q->cur_len / bps : 0 ;
1635 	return 0;
1636 }
1637 
1638 /* ethernet bandwidth, add 672 bits per packet */
1639 static int
ether_bw_parse(struct _qs * q,struct _cfg * dst,int ac,char * av[])1640 ether_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1641 {
1642 	uint64_t bw;
1643 
1644 	(void)q;
1645 	if (strcmp(av[0], "ether") != 0)
1646 		return 2; /* unrecognised */
1647 	if (ac != 2)
1648 		return 1; /* error */
1649 	bw = parse_bw(av[ac - 1]);
1650 	if (bw == U_PARSE_ERR)
1651 		return 1; /* error */
1652 	dst->d[0] = bw;
1653 	return 0;	/* success */
1654 }
1655 
1656 
1657 /* runtime function, add 20 bytes (framing) + 4 bytes (crc) */
1658 static int
ether_bw_run(struct _qs * q,struct _cfg * arg)1659 ether_bw_run(struct _qs *q, struct _cfg *arg)
1660 {
1661 	uint64_t bps = arg->d[0];
1662 	q->cur_tt = bps ? 8ULL * TIME_UNITS * (q->cur_len + 24) / bps : 0 ;
1663 	return 0;
1664 }
1665 
1666 /* real bandwidth, plus scaling factor */
1667 static int
real_bw_parse(struct _qs * q,struct _cfg * dst,int ac,char * av[])1668 real_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1669 {
1670 	double scale;
1671 
1672 	(void)q;
1673 	if (strcmp(av[0], "real") != 0)
1674 		return 2; /* unrecognised */
1675 	if (ac > 2) { /* second argument is optional */
1676 		return 1; /* error */
1677 	} else if (ac == 1) {
1678 		scale = 1;
1679 	} else {
1680 		int err = 0;
1681 		scale = parse_gen(av[ac-1], NULL, &err);
1682 		if (err || scale <= 0 || scale > 1000)
1683 			return 1;
1684 	}
1685 	ED("real -> scale is %.6f", scale);
1686 	dst->f[0] = scale;
1687 	return 0;	/* success */
1688 }
1689 
1690 static int
real_bw_run(struct _qs * q,struct _cfg * arg)1691 real_bw_run(struct _qs *q, struct _cfg *arg)
1692 {
1693 	q->cur_tt /= arg->f[0];
1694 	return 0;
1695 }
1696 
1697 static struct _cfg bw_cfg[] = {
1698 	{ const_bw_parse, const_bw_run,
1699 		"constant,bps", TLEM_CFG_END },
1700 	{ ether_bw_parse, ether_bw_run,
1701 		"ether,bps", TLEM_CFG_END },
1702 	{ real_bw_parse, real_bw_run,
1703 		"real,scale", TLEM_CFG_END },
1704 	{ NULL, NULL, NULL, TLEM_CFG_END }
1705 };
1706 
1707 /*
1708  * loss patterns
1709  */
1710 static int
const_plr_parse(struct _qs * q,struct _cfg * dst,int ac,char * av[])1711 const_plr_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1712 {
1713 	double plr;
1714 	int err;
1715 
1716 	(void)q;
1717 	if (strcmp(av[0], "plr") != 0 && ac > 1)
1718 		return 2; /* unrecognised */
1719 	if (ac > 2)
1720 		return 1; /* error */
1721 	// XXX to be completed
1722 	plr = parse_gen(av[ac-1], NULL, &err);
1723 	if (err || plr < 0 || plr > 1)
1724 		return 1;
1725 	dst->d[0] = plr * (1<<24); /* scale is 16m */
1726 	if (plr != 0 && dst->d[0] == 0)
1727 		ED("WWW warning,  rounding %le down to 0", plr);
1728 	return 0;	/* success */
1729 }
1730 
1731 static int
const_plr_run(struct _qs * q,struct _cfg * arg)1732 const_plr_run(struct _qs *q, struct _cfg *arg)
1733 {
1734 	(void)arg;
1735 	uint64_t r = my_random24();
1736 	q->cur_drop = r < arg->d[0];
1737 #if 1	/* keep stats */
1738 	arg->d[1]++;
1739 	arg->d[2] += q->cur_drop;
1740 #endif
1741 	return 0;
1742 }
1743 
1744 
1745 /*
1746  * For BER the loss is 1- (1-ber)**bit_len
1747  * The linear approximation is only good for small values, so we
1748  * tabulate (1-ber)**len for various sizes in bytes
1749  */
1750 static int
const_ber_parse(struct _qs * q,struct _cfg * dst,int ac,char * av[])1751 const_ber_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1752 {
1753 	double ber, ber8, cur;
1754 	int i, err;
1755 	uint32_t *plr;
1756 	const uint32_t mask = (1<<24) - 1;
1757 
1758 	(void)q;
1759 	if (strcmp(av[0], "ber") != 0)
1760 		return 2; /* unrecognised */
1761 	if (ac != 2)
1762 		return 1; /* error */
1763 	ber = parse_gen(av[ac-1], NULL, &err);
1764 	if (err || ber < 0 || ber > 1)
1765 		return 1;
1766 	dst->arg_len = MAX_PKT * sizeof(uint32_t);
1767 	plr = calloc(1, dst->arg_len);
1768 	if (plr == NULL)
1769 		return 1; /* no memory */
1770 	dst->arg = plr;
1771 	ber8 = 1 - ber;
1772 	ber8 *= ber8; /* **2 */
1773 	ber8 *= ber8; /* **4 */
1774 	ber8 *= ber8; /* **8 */
1775 	cur = 1;
1776 	for (i=0; i < MAX_PKT; i++, cur *= ber8) {
1777 		plr[i] = (mask + 1)*(1 - cur);
1778 		if (plr[i] > mask)
1779 			plr[i] = mask;
1780 #if 0
1781 		if (i>= 60) //  && plr[i] < mask/2)
1782 			RD(50,"%4d: %le %ld", i, 1.0 - cur, (_P64)plr[i]);
1783 #endif
1784 	}
1785 	dst->d[0] = ber * (mask + 1);
1786 	return 0;	/* success */
1787 }
1788 
1789 static int
const_ber_run(struct _qs * q,struct _cfg * arg)1790 const_ber_run(struct _qs *q, struct _cfg *arg)
1791 {
1792 	int l = q->cur_len;
1793 	uint64_t r = my_random24();
1794 	uint32_t *plr = arg->arg;
1795 
1796 	if (l >= MAX_PKT) {
1797 		RD(5, "pkt len %d too large, trim to %d", l, MAX_PKT-1);
1798 		l = MAX_PKT-1;
1799 	}
1800 	q->cur_drop = r < plr[l];
1801 #if 1	/* keep stats */
1802 	arg->d[1] += l * 8;
1803 	arg->d[2] += q->cur_drop;
1804 #endif
1805 	return 0;
1806 }
1807 
1808 static struct _cfg loss_cfg[] = {
1809 	{ const_plr_parse, const_plr_run,
1810 		"plr,prob # 0 <= prob <= 1", TLEM_CFG_END },
1811 	{ const_ber_parse, const_ber_run,
1812 		"ber,prob # 0 <= prob <= 1", TLEM_CFG_END },
1813 	{ NULL, NULL, NULL, TLEM_CFG_END }
1814 };
1815