xref: /freebsd/tools/tools/netmap/nmreplay.c (revision c697fb7f)
1 /*
2  * Copyright (C) 2016 Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  *
25  * $FreeBSD$
26  */
27 
28 
29 /*
30  * This program implements NMREPLAY, a program to replay a pcap file
31  * enforcing the output rate and possibly random losses and delay
32  * distributions.
33  * It is meant to be run from the command line and implemented with a main
34  * control thread for monitoring, plus a thread to push packets out.
35  *
36  * The control thread parses command line arguments, prepares a
37  * schedule for transmission in a memory buffer and then sits
38  * in a loop where it periodically reads traffic statistics from
39  * the other threads and prints them out on the console.
40  *
41  * The transmit buffer contains headers and packets. Each header
42  * includes a timestamp that determines when the packet should be sent out.
43  * A "consumer" thread cons() reads from the queue and transmits packets
44  * on the output netmap port when their time has come.
45  *
46  * The program does CPU pinning and sets the scheduler and priority
47  * for the "cons" threads. Externally one should do the
48  * assignment of other threads (e.g. interrupt handlers) and
49  * make sure that network interfaces are configured properly.
50  *
51  * --- Main functions of the program ---
52  * within each function, q is used as a pointer to the queue holding
53  * packets and parameters.
54  *
55  * pcap_prod()
56  *
57  *	reads from the pcap file and prepares packets to transmit.
58  *	After reading a packet from the pcap file, the following information
59  *	are extracted which can be used to determine the schedule:
60  *
61  *   	q->cur_pkt	points to the buffer containing the packet
62  *	q->cur_len	packet length, excluding CRC
63  *	q->cur_caplen	available packet length (may be shorter than cur_len)
64  *	q->cur_tt	transmission time for the packet, computed from the trace.
65  *
66  *  The following functions are then called in sequence:
67  *
68  *  q->c_loss (set with the -L command line option) decides
69  *	whether the packet should be dropped before even queuing.
70  *	This is generally useful to emulate random loss.
71  *	The function is supposed to set q->c_drop = 1 if the
72  *	packet should be dropped, or leave it to 0 otherwise.
73  *
74  *   q->c_bw (set with the -B command line option) is used to
75  *      enforce the transmit bandwidth. The function must store
76  *	in q->cur_tt the transmission time (in nanoseconds) of
77  *	the packet, which is typically proportional to the length
78  *	of the packet, i.e. q->cur_tt = q->cur_len / <bandwidth>
79  *	Variants are possible, eg. to account for constant framing
80  *	bits as on the ethernet, or variable channel acquisition times,
81  *	etc.
82  *	This mechanism can also be used to simulate variable queueing
83  *	delay e.g. due to the presence of cross traffic.
84  *
85  *   q->c_delay (set with the -D option) implements delay emulation.
86  *	The function should set q->cur_delay to the additional
87  *	delay the packet is subject to. The framework will take care of
88  *	computing the actual exit time of a packet so that there is no
89  *	reordering.
90  */
91 
92 // debugging macros
93 #define NED(_fmt, ...)	do {} while (0)
94 #define ED(_fmt, ...)						\
95 	do {							\
96 		struct timeval _t0;				\
97 		gettimeofday(&_t0, NULL);			\
98 		fprintf(stderr, "%03d.%03d %-10.10s [%5d] \t" _fmt "\n", \
99 		(int)(_t0.tv_sec % 1000), (int)_t0.tv_usec/1000, \
100 		__FUNCTION__, __LINE__, ##__VA_ARGS__);     \
101 	} while (0)
102 
103 /* WWW is for warnings, EEE is for errors */
104 #define WWW(_fmt, ...)	ED("--WWW-- " _fmt, ##__VA_ARGS__)
105 #define EEE(_fmt, ...)	ED("--EEE-- " _fmt, ##__VA_ARGS__)
106 #define DDD(_fmt, ...)	ED("--DDD-- " _fmt, ##__VA_ARGS__)
107 
108 #define _GNU_SOURCE	// for CPU_SET() etc
109 #include <stdio.h>
110 #define NETMAP_WITH_LIBS
111 #include <net/netmap_user.h>
112 #include <sys/poll.h>
113 
114 
115 /*
116  *
117  * A packet in the queue is q_pkt plus the payload.
118  *
119  * For the packet descriptor we need the following:
120  *
121  *  -	position of next packet in the queue (can go backwards).
122  *	We can reduce to 32 bits if we consider alignments,
123  *	or we just store the length to be added to the current
124  *	value and assume 0 as a special index.
125  *  -	actual packet length (16 bits may be ok)
126  *  -	queue output time, in nanoseconds (64 bits)
127  *  -	delay line output time, in nanoseconds
128  *	One of the two can be packed to a 32bit value
129  *
130  * A convenient coding uses 32 bytes per packet.
131  */
132 
133 struct q_pkt {
134 	uint64_t	next;		/* buffer index for next packet */
135 	uint64_t	pktlen;		/* actual packet len */
136 	uint64_t	pt_qout;	/* time of output from queue */
137 	uint64_t	pt_tx;		/* transmit time */
138 };
139 
140 
141 /*
142  * The header for a pcap file
143  */
144 struct pcap_file_header {
145     uint32_t magic;
146 	/*used to detect the file format itself and the byte
147     ordering. The writing application writes 0xa1b2c3d4 with it's native byte
148     ordering format into this field. The reading application will read either
149     0xa1b2c3d4 (identical) or 0xd4c3b2a1 (swapped). If the reading application
150     reads the swapped 0xd4c3b2a1 value, it knows that all the following fields
151     will have to be swapped too. For nanosecond-resolution files, the writing
152     application writes 0xa1b23c4d, with the two nibbles of the two lower-order
153     bytes swapped, and the reading application will read either 0xa1b23c4d
154     (identical) or 0x4d3cb2a1 (swapped)*/
155     uint16_t version_major;
156     uint16_t version_minor; /*the version number of this file format */
157     int32_t thiszone;
158 	/*the correction time in seconds between GMT (UTC) and the
159     local timezone of the following packet header timestamps. Examples: If the
160     timestamps are in GMT (UTC), thiszone is simply 0. If the timestamps are in
161     Central European time (Amsterdam, Berlin, ...) which is GMT + 1:00, thiszone
162     must be -3600*/
163     uint32_t stampacc; /*the accuracy of time stamps in the capture*/
164     uint32_t snaplen;
165 	/*the "snapshot length" for the capture (typically 65535
166     or even more, but might be limited by the user)*/
167     uint32_t network;
168 	/*link-layer header type, specifying the type of headers
169     at the beginning of the packet (e.g. 1 for Ethernet); this can be various
170     types such as 802.11, 802.11 with various radio information, PPP, Token
171     Ring, FDDI, etc.*/
172 };
173 
174 #if 0 /* from pcap.h */
175 struct pcap_file_header {
176         bpf_u_int32 magic;
177         u_short version_major;
178         u_short version_minor;
179         bpf_int32 thiszone;     /* gmt to local correction */
180         bpf_u_int32 sigfigs;    /* accuracy of timestamps */
181         bpf_u_int32 snaplen;    /* max length saved portion of each pkt */
182         bpf_u_int32 linktype;   /* data link type (LINKTYPE_*) */
183 };
184 
185 struct pcap_pkthdr {
186         struct timeval ts;      /* time stamp */
187         bpf_u_int32 caplen;     /* length of portion present */
188         bpf_u_int32 len;        /* length this packet (off wire) */
189 };
190 #endif /* from pcap.h */
191 
192 struct pcap_pkthdr {
193     uint32_t ts_sec; /* seconds from epoch */
194     uint32_t ts_frac; /* microseconds or nanoseconds depending on sigfigs */
195     uint32_t caplen;
196 	/*the number of bytes of packet data actually captured
197     and saved in the file. This value should never become larger than orig_len
198     or the snaplen value of the global header*/
199     uint32_t len;	/* wire length */
200 };
201 
202 
203 #define PKT_PAD         (32)    /* padding on packets */
204 
205 static inline int pad(int x)
206 {
207         return ((x) + PKT_PAD - 1) & ~(PKT_PAD - 1) ;
208 }
209 
210 
211 
212 /*
213  * wrapper around the pcap file.
214  * We mmap the file so it is easy to do multiple passes through it.
215  */
216 struct nm_pcap_file {
217     int fd;
218     uint64_t filesize;
219     const char *data; /* mmapped file */
220 
221     uint64_t tot_pkt;
222     uint64_t tot_bytes;
223     uint64_t tot_bytes_rounded;	/* need hdr + pad(len) */
224     uint32_t resolution; /* 1000 for us, 1 for ns */
225     int swap; /* need to swap fields ? */
226 
227     uint64_t first_ts;
228     uint64_t total_tx_time;
229 	/*
230 	 * total_tx_time is computed as last_ts - first_ts, plus the
231 	 * transmission time for the first packet which in turn is
232 	 * computed according to the average bandwidth
233 	 */
234 
235     uint64_t file_len;
236     const char *cur;	/* running pointer */
237     const char *lim;	/* data + file_len */
238     int err;
239 };
240 
241 static struct nm_pcap_file *readpcap(const char *fn);
242 static void destroy_pcap(struct nm_pcap_file *file);
243 
244 
245 #include <stdio.h>
246 #include <stdlib.h>
247 #include <stdint.h>
248 #include <unistd.h>
249 #include <fcntl.h>
250 #include <string.h> /* memcpy */
251 
252 #include <sys/mman.h>
253 
254 #define NS_SCALE 1000000000UL	/* nanoseconds in 1s */
255 
256 static void destroy_pcap(struct nm_pcap_file *pf)
257 {
258     if (!pf)
259 	return;
260 
261     munmap((void *)(uintptr_t)pf->data, pf->filesize);
262     close(pf->fd);
263     bzero(pf, sizeof(*pf));
264     free(pf);
265     return;
266 }
267 
268 // convert a field of given size if swap is needed.
269 static uint32_t
270 cvt(const void *src, int size, char swap)
271 {
272     uint32_t ret = 0;
273     if (size != 2 && size != 4) {
274 	EEE("Invalid size %d\n", size);
275 	exit(1);
276     }
277     memcpy(&ret, src, size);
278     if (swap) {
279 	unsigned char tmp, *data = (unsigned char *)&ret;
280 	int i;
281         for (i = 0; i < size / 2; i++) {
282             tmp = data[i];
283             data[i] = data[size - (1 + i)];
284             data[size - (1 + i)] = tmp;
285         }
286     }
287     return ret;
288 }
289 
290 static uint32_t
291 read_next_info(struct nm_pcap_file *pf, int size)
292 {
293     const char *end = pf->cur + size;
294     uint32_t ret;
295     if (end > pf->lim) {
296 	pf->err = 1;
297 	ret = 0;
298     } else {
299 	ret = cvt(pf->cur, size, pf->swap);
300 	pf->cur = end;
301     }
302     return ret;
303 }
304 
305 /*
306  * mmap the file, make sure timestamps are sorted, and count
307  * packets and sizes
308  * Timestamps represent the receive time of the packets.
309  * We need to compute also the 'first_ts' which refers to a hypotetical
310  * packet right before the first one, see the code for details.
311  */
312 static struct nm_pcap_file *
313 readpcap(const char *fn)
314 {
315     struct nm_pcap_file _f, *pf = &_f;
316     uint64_t prev_ts, first_pkt_time;
317     uint32_t magic, first_len = 0;
318 
319     bzero(pf, sizeof(*pf));
320     pf->fd = open(fn, O_RDONLY);
321     if (pf->fd < 0) {
322 	EEE("cannot open file %s", fn);
323 	return NULL;
324     }
325     /* compute length */
326     pf->filesize = lseek(pf->fd, 0, SEEK_END);
327     lseek(pf->fd, 0, SEEK_SET);
328     ED("filesize is %lu", (u_long)(pf->filesize));
329     if (pf->filesize < sizeof(struct pcap_file_header)) {
330 	EEE("file too short %s", fn);
331 	close(pf->fd);
332 	return NULL;
333     }
334     pf->data = mmap(NULL, pf->filesize, PROT_READ, MAP_SHARED, pf->fd, 0);
335     if (pf->data == MAP_FAILED) {
336 	EEE("cannot mmap file %s", fn);
337 	close(pf->fd);
338 	return NULL;
339     }
340     pf->cur = pf->data;
341     pf->lim = pf->data + pf->filesize;
342     pf->err = 0;
343     pf->swap = 0; /* default, same endianness when read magic */
344 
345     magic = read_next_info(pf, 4);
346     ED("magic is 0x%x", magic);
347     switch (magic) {
348     case 0xa1b2c3d4: /* native, us resolution */
349 	pf->swap = 0;
350 	pf->resolution = 1000;
351 	break;
352     case 0xd4c3b2a1: /* swapped, us resolution */
353 	pf->swap = 1;
354 	pf->resolution = 1000;
355 	break;
356     case 0xa1b23c4d:	/* native, ns resolution */
357 	pf->swap = 0;
358 	pf->resolution = 1; /* nanoseconds */
359 	break;
360     case 0x4d3cb2a1:	/* swapped, ns resolution */
361 	pf->swap = 1;
362 	pf->resolution = 1; /* nanoseconds */
363 	break;
364     default:
365 	EEE("unknown magic 0x%x", magic);
366 	return NULL;
367     }
368 
369     ED("swap %d res %d\n", pf->swap, pf->resolution);
370     pf->cur = pf->data + sizeof(struct pcap_file_header);
371     pf->lim = pf->data + pf->filesize;
372     pf->err = 0;
373     prev_ts = 0;
374     while (pf->cur < pf->lim && pf->err == 0) {
375 	uint32_t base = pf->cur - pf->data;
376 	uint64_t cur_ts = read_next_info(pf, 4) * NS_SCALE +
377 		read_next_info(pf, 4) * pf->resolution;
378 	uint32_t caplen = read_next_info(pf, 4);
379 	uint32_t len = read_next_info(pf, 4);
380 
381 	if (pf->err) {
382 	    WWW("end of pcap file after %d packets\n",
383 		(int)pf->tot_pkt);
384 	    break;
385 	}
386 	if  (cur_ts < prev_ts) {
387 	    WWW("reordered packet %d\n",
388 		(int)pf->tot_pkt);
389 	}
390 	prev_ts = cur_ts;
391 	(void)base;
392 	if (pf->tot_pkt == 0) {
393 	    pf->first_ts = cur_ts;
394 	    first_len = len;
395 	}
396 	pf->tot_pkt++;
397 	pf->tot_bytes += len;
398 	pf->tot_bytes_rounded += pad(len) + sizeof(struct q_pkt);
399 	pf->cur += caplen;
400     }
401     pf->total_tx_time = prev_ts - pf->first_ts; /* excluding first packet */
402     ED("tot_pkt %lu tot_bytes %lu tx_time %.6f s first_len %lu",
403 	(u_long)pf->tot_pkt, (u_long)pf->tot_bytes,
404 	1e-9*pf->total_tx_time, (u_long)first_len);
405     /*
406      * We determine that based on the
407      * average bandwidth of the trace, as follows
408      *   first_pkt_ts = p[0].len / avg_bw
409      * In turn avg_bw = (total_len - p[0].len)/(p[n-1].ts - p[0].ts)
410      * so
411      *   first_ts =  p[0].ts - p[0].len * (p[n-1].ts - p[0].ts) / (total_len - p[0].len)
412      */
413     if (pf->tot_bytes == first_len) {
414 	/* cannot estimate bandwidth, so force 1 Gbit */
415 	first_pkt_time = first_len * 8; /* * 10^9 / bw */
416     } else {
417 	first_pkt_time = pf->total_tx_time * first_len / (pf->tot_bytes - first_len);
418     }
419     ED("first_pkt_time %.6f s", 1e-9*first_pkt_time);
420     pf->total_tx_time += first_pkt_time;
421     pf->first_ts -= first_pkt_time;
422 
423     /* all correct, allocate a record and copy */
424     pf = calloc(1, sizeof(*pf));
425     *pf = _f;
426     /* reset pointer to start */
427     pf->cur = pf->data + sizeof(struct pcap_file_header);
428     pf->err = 0;
429     return pf;
430 }
431 
432 enum my_pcap_mode { PM_NONE, PM_FAST, PM_FIXED, PM_REAL };
433 
434 int verbose = 0;
435 
436 static int do_abort = 0;
437 
438 #include <stdlib.h>
439 #include <stdio.h>
440 #include <pthread.h>
441 #include <sys/time.h>
442 
443 #include <sys/resource.h> // setpriority
444 
445 #ifdef __FreeBSD__
446 #include <pthread_np.h> /* pthread w/ affinity */
447 #include <sys/cpuset.h> /* cpu_set */
448 #endif /* __FreeBSD__ */
449 
450 #ifdef linux
451 #define cpuset_t        cpu_set_t
452 #endif
453 
454 #ifdef __APPLE__
455 #define cpuset_t        uint64_t        // XXX
456 static inline void CPU_ZERO(cpuset_t *p)
457 {
458         *p = 0;
459 }
460 
461 static inline void CPU_SET(uint32_t i, cpuset_t *p)
462 {
463         *p |= 1<< (i & 0x3f);
464 }
465 
466 #define pthread_setaffinity_np(a, b, c) ((void)a, 0)
467 #define sched_setscheduler(a, b, c)	(1) /* error */
468 #define clock_gettime(a,b)      \
469         do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)
470 
471 #define	_P64	unsigned long
472 #endif
473 
474 #ifndef _P64
475 
476 /* we use uint64_t widely, but printf gives trouble on different
477  * platforms so we use _P64 as a cast
478  */
479 #define	_P64	uint64_t
480 #endif /* print stuff */
481 
482 
483 struct _qs;	/* forward */
484 /*
485  * descriptor of a configuration entry.
486  * Each handler has a parse function which takes ac/av[] and returns
487  * true if successful. Any allocated space is stored into struct _cfg *
488  * that is passed as argument.
489  * arg and arg_len are included for convenience.
490  */
491 struct _cfg {
492     int (*parse)(struct _qs *, struct _cfg *, int ac, char *av[]);  /* 0 ok, 1 on error */
493     int (*run)(struct _qs *, struct _cfg *arg);         /* 0 Ok, 1 on error */
494     // int close(struct _qs *, void *arg);              /* 0 Ok, 1 on error */
495 
496     const char *optarg;	/* command line argument. Initial value is the error message */
497     /* placeholders for common values */
498     void *arg;		/* allocated memory if any */
499     int arg_len;	/* size of *arg in case a realloc is needed */
500     uint64_t d[16];	/* static storage for simple cases */
501     double f[4];	/* static storage for simple cases */
502 };
503 
504 
505 /*
506  * communication occurs through this data structure, with fields
507  * cache-aligned according to who are the readers/writers.
508  *
509 
510 The queue is an array of memory  (buf) of size buflen (does not change).
511 
512 The producer uses 'tail' as an index in the queue to indicate
513 the first empty location (ie. after the last byte of data),
514 the consumer uses head to indicate the next byte to consume.
515 
516 For best performance we should align buffers and packets
517 to multiples of cacheline, but this would explode memory too much.
518 Worst case memory explosion is with 65 byte packets.
519 Memory usage as shown below:
520 
521 		qpkt-pad
522 	size	32-16	32-32	32-64	64-64
523 
524 	64	96	96	96	128
525 	65	112	128	160	192
526 
527 
528 An empty queue has head == tail, a full queue will have free space
529 below a threshold.  In our case the queue is large enough and we
530 are non blocking so we can simply drop traffic when the queue
531 approaches a full state.
532 
533 To simulate bandwidth limitations efficiently, the producer has a second
534 pointer, prod_tail_1, used to check for expired packets. This is done lazily.
535 
536  */
537 /*
538  * When sizing the buffer, we must assume some value for the bandwidth.
539  * INFINITE_BW is supposed to be faster than what we support
540  */
541 #define INFINITE_BW	(200ULL*1000000*1000)
542 #define	MY_CACHELINE	(128ULL)
543 #define MAX_PKT		(9200)	/* max packet size */
544 
545 #define ALIGN_CACHE	__attribute__ ((aligned (MY_CACHELINE)))
546 
547 struct _qs { /* shared queue */
548 	uint64_t	t0;	/* start of times */
549 
550 	uint64_t 	buflen;	/* queue length */
551 	char *buf;
552 
553 	/* handlers for various options */
554 	struct _cfg	c_delay;
555 	struct _cfg	c_bw;
556 	struct _cfg	c_loss;
557 
558 	/* producer's fields */
559 	uint64_t	tx ALIGN_CACHE;	/* tx counter */
560 	uint64_t	prod_tail_1;	/* head of queue */
561 	uint64_t	prod_head;	/* cached copy */
562 	uint64_t	prod_tail;	/* cached copy */
563 	uint64_t	prod_drop;	/* drop packet count */
564 	uint64_t	prod_max_gap;	/* rx round duration */
565 
566 	struct nm_pcap_file	*pcap;		/* the pcap struct */
567 
568 	/* parameters for reading from the netmap port */
569 	struct nm_desc *src_port;		/* netmap descriptor */
570 	const char *	prod_ifname;	/* interface name or pcap file */
571 	struct netmap_ring *rxring;	/* current ring being handled */
572 	uint32_t	si;		/* ring index */
573 	int		burst;
574 	uint32_t	rx_qmax;	/* stats on max queued */
575 
576 	uint64_t	qt_qout;	/* queue exit time for last packet */
577 		/*
578 		 * when doing shaping, the software computes and stores here
579 		 * the time when the most recently queued packet will exit from
580 		 * the queue.
581 		 */
582 
583 	uint64_t	qt_tx;		/* delay line exit time for last packet */
584 		/*
585 		 * The software computes the time at which the most recently
586 		 * queued packet exits from the queue.
587 		 * To avoid reordering, the next packet should exit at least
588 		 * at qt_tx + cur_tt
589 		 */
590 
591 	/* producer's fields controlling the queueing */
592 	const char *	cur_pkt;	/* current packet being analysed */
593 	uint32_t	cur_len;	/* length of current packet */
594 	uint32_t	cur_caplen;	/* captured length of current packet */
595 
596 	int		cur_drop;	/* 1 if current  packet should be dropped. */
597 		/*
598 		 * cur_drop can be set as a result of the loss emulation,
599 		 * and may need to use the packet size, current time, etc.
600 		 */
601 
602 	uint64_t	cur_tt;		/* transmission time (ns) for current packet */
603 		/*
604 		 * The transmission time is how much link time the packet will consume.
605 		 * should be set by the function that does the bandwidth emulation,
606 		 * but could also be the result of a function that emulates the
607 		 * presence of competing traffic, MAC protocols etc.
608 		 * cur_tt is 0 for links with infinite bandwidth.
609 		 */
610 
611 	uint64_t	cur_delay;	/* delay (ns) for current packet from c_delay.run() */
612 		/*
613 		 * this should be set by the function that computes the extra delay
614 		 * applied to the packet.
615 		 * The code makes sure that there is no reordering and possibly
616 		 * bumps the output time as needed.
617 		 */
618 
619 
620 	/* consumer's fields */
621 	const char *		cons_ifname;
622 	uint64_t rx ALIGN_CACHE;	/* rx counter */
623 	uint64_t	cons_head;	/* cached copy */
624 	uint64_t	cons_tail;	/* cached copy */
625 	uint64_t	cons_now;	/* most recent producer timestamp */
626 	uint64_t	rx_wait;	/* stats */
627 
628 	/* shared fields */
629 	volatile uint64_t _tail ALIGN_CACHE ;	/* producer writes here */
630 	volatile uint64_t _head ALIGN_CACHE ;	/* consumer reads from here */
631 };
632 
633 struct pipe_args {
634 	int		wait_link;
635 
636 	pthread_t	cons_tid;	/* main thread */
637 	pthread_t	prod_tid;	/* producer thread */
638 
639 	/* Affinity: */
640 	int		cons_core;	/* core for cons() */
641 	int		prod_core;	/* core for prod() */
642 
643 	struct nm_desc *pa;		/* netmap descriptor */
644 	struct nm_desc *pb;
645 
646 	struct _qs	q;
647 };
648 
649 #define NS_IN_S	(1000000000ULL)	// nanoseconds
650 #define TIME_UNITS	NS_IN_S
651 /* set the thread affinity. */
652 static int
653 setaffinity(int i)
654 {
655         cpuset_t cpumask;
656 	struct sched_param p;
657 
658         if (i == -1)
659                 return 0;
660 
661         /* Set thread affinity affinity.*/
662         CPU_ZERO(&cpumask);
663         CPU_SET(i, &cpumask);
664 
665         if (pthread_setaffinity_np(pthread_self(), sizeof(cpuset_t), &cpumask) != 0) {
666                 WWW("Unable to set affinity: %s", strerror(errno));
667         }
668 	if (setpriority(PRIO_PROCESS, 0, -10)) {; // XXX not meaningful
669                 WWW("Unable to set priority: %s", strerror(errno));
670 	}
671 	bzero(&p, sizeof(p));
672 	p.sched_priority = 10; // 99 on linux ?
673 	// use SCHED_RR or SCHED_FIFO
674 	if (sched_setscheduler(0, SCHED_RR, &p)) {
675                 WWW("Unable to set scheduler: %s", strerror(errno));
676 	}
677         return 0;
678 }
679 
680 
681 /*
682  * set the timestamp from the clock, subtract t0
683  */
684 static inline void
685 set_tns_now(uint64_t *now, uint64_t t0)
686 {
687     struct timespec t;
688 
689     clock_gettime(CLOCK_REALTIME, &t); // XXX precise on FreeBSD ?
690     *now = (uint64_t)(t.tv_nsec + NS_IN_S * t.tv_sec);
691     *now -= t0;
692 }
693 
694 
695 
696 /* compare two timestamps */
697 static inline int64_t
698 ts_cmp(uint64_t a, uint64_t b)
699 {
700 	return (int64_t)(a - b);
701 }
702 
703 /* create a packet descriptor */
704 static inline struct q_pkt *
705 pkt_at(struct _qs *q, uint64_t ofs)
706 {
707     return (struct q_pkt *)(q->buf + ofs);
708 }
709 
710 
711 /*
712  * we have already checked for room and prepared p->next
713  */
714 static inline int
715 enq(struct _qs *q)
716 {
717     struct q_pkt *p = pkt_at(q, q->prod_tail);
718 
719     /* hopefully prefetch has been done ahead */
720     nm_pkt_copy(q->cur_pkt, (char *)(p+1), q->cur_caplen);
721     p->pktlen = q->cur_len;
722     p->pt_qout = q->qt_qout;
723     p->pt_tx = q->qt_tx;
724     p->next = q->prod_tail + pad(q->cur_len) + sizeof(struct q_pkt);
725     ND("enqueue len %d at %d new tail %ld qout %.6f tx %.6f",
726         q->cur_len, (int)q->prod_tail, p->next,
727         1e-9*p->pt_qout, 1e-9*p->pt_tx);
728     q->prod_tail = p->next;
729     q->tx++;
730     return 0;
731 }
732 
733 /*
734  * simple handler for parameters not supplied
735  */
736 static int
737 null_run_fn(struct _qs *q, struct _cfg *cfg)
738 {
739     (void)q;
740     (void)cfg;
741     return 0;
742 }
743 
744 
745 
746 /*
747  * put packet data into the buffer.
748  * We read from the mmapped pcap file, construct header, copy
749  * the captured length of the packet and pad with zeroes.
750  */
751 static void *
752 pcap_prod(void *_pa)
753 {
754     struct pipe_args *pa = _pa;
755     struct _qs *q = &pa->q;
756     struct nm_pcap_file *pf = q->pcap;	/* already opened by readpcap */
757     uint32_t loops, i, tot_pkts;
758 
759     /* data plus the loop record */
760     uint64_t need;
761     uint64_t t_tx, tt, last_ts; /* last timestamp from trace */
762 
763     /*
764      * For speed we make sure the trace is at least some 1000 packets,
765      * so we may need to loop the trace more than once (for short traces)
766      */
767     loops = (1 + 10000 / pf->tot_pkt);
768     tot_pkts = loops * pf->tot_pkt;
769     need = loops * pf->tot_bytes_rounded + sizeof(struct q_pkt);
770     q->buf = calloc(1, need);
771     if (q->buf == NULL) {
772 	D("alloc %lld bytes for queue failed, exiting",(long long)need);
773 	goto fail;
774     }
775     q->prod_head = q->prod_tail = 0;
776     q->buflen = need;
777 
778     pf->cur = pf->data + sizeof(struct pcap_file_header);
779     pf->err = 0;
780 
781     ED("--- start create %lu packets at tail %d",
782 	(u_long)tot_pkts, (int)q->prod_tail);
783     last_ts = pf->first_ts; /* beginning of the trace */
784 
785     q->qt_qout = 0; /* first packet out of the queue */
786 
787     for (loops = 0, i = 0; i < tot_pkts && !do_abort; i++) {
788 	const char *next_pkt; /* in the pcap buffer */
789 	uint64_t cur_ts;
790 
791 	/* read values from the pcap buffer */
792 	cur_ts = read_next_info(pf, 4) * NS_SCALE +
793 		read_next_info(pf, 4) * pf->resolution;
794 	q->cur_caplen = read_next_info(pf, 4);
795 	q->cur_len = read_next_info(pf, 4);
796 	next_pkt = pf->cur + q->cur_caplen;
797 
798 	/* prepare fields in q for the generator */
799 	q->cur_pkt = pf->cur;
800 	/* initial estimate of tx time */
801 	q->cur_tt = cur_ts - last_ts;
802 	    // -pf->first_ts + loops * pf->total_tx_time - last_ts;
803 
804 	if ((i % pf->tot_pkt) == 0)
805 	   ED("insert %5d len %lu cur_tt %.6f",
806 		i, (u_long)q->cur_len, 1e-9*q->cur_tt);
807 
808 	/* prepare for next iteration */
809 	pf->cur = next_pkt;
810 	last_ts = cur_ts;
811 	if (next_pkt == pf->lim) {	//last pkt
812 	    pf->cur = pf->data + sizeof(struct pcap_file_header);
813     	    last_ts = pf->first_ts; /* beginning of the trace */
814 	    loops++;
815 	}
816 
817 	q->c_loss.run(q, &q->c_loss);
818 	if (q->cur_drop)
819 	    continue;
820 	q->c_bw.run(q, &q->c_bw);
821 	tt = q->cur_tt;
822 	q->qt_qout += tt;
823 #if 0
824 	if (drop_after(q))
825 	    continue;
826 #endif
827 	q->c_delay.run(q, &q->c_delay); /* compute delay */
828 	t_tx = q->qt_qout + q->cur_delay;
829 	ND(5, "tt %ld qout %ld tx %ld qt_tx %ld", tt, q->qt_qout, t_tx, q->qt_tx);
830 	/* insure no reordering and spacing by transmission time */
831 	q->qt_tx = (t_tx >= q->qt_tx + tt) ? t_tx : q->qt_tx + tt;
832 	enq(q);
833 
834 	q->tx++;
835 	ND("ins %d q->prod_tail = %lu", (int)insert, (unsigned long)q->prod_tail);
836     }
837     /* loop marker ? */
838     ED("done q->prod_tail:%d",(int)q->prod_tail);
839     q->_tail = q->prod_tail; /* publish */
840 
841     return NULL;
842 fail:
843     if (q->buf != NULL) {
844 	free(q->buf);
845     }
846     nm_close(pa->pb);
847     return (NULL);
848 }
849 
850 
851 /*
852  * the consumer reads from the queue using head,
853  * advances it every now and then.
854  */
855 static void *
856 cons(void *_pa)
857 {
858     struct pipe_args *pa = _pa;
859     struct _qs *q = &pa->q;
860     int pending = 0;
861     uint64_t last_ts = 0;
862 
863     /* read the start of times in q->t0 */
864     set_tns_now(&q->t0, 0);
865     /* set the time (cons_now) to clock - q->t0 */
866     set_tns_now(&q->cons_now, q->t0);
867     q->cons_head = q->_head;
868     q->cons_tail = q->_tail;
869     while (!do_abort) { /* consumer, infinite */
870 	struct q_pkt *p = pkt_at(q, q->cons_head);
871 
872 	__builtin_prefetch (q->buf + p->next);
873 
874 	if (q->cons_head == q->cons_tail) {	//reset record
875 	    ND("Transmission restarted");
876 	    /*
877 	     * add to q->t0 the time for the last packet
878 	     */
879 	    q->t0 += last_ts;
880 	    set_tns_now(&q->cons_now, q->t0);
881 	    q->cons_head = 0;	//restart from beginning of the queue
882 	    continue;
883 	}
884 	last_ts = p->pt_tx;
885 	if (ts_cmp(p->pt_tx, q->cons_now) > 0) {
886 	    // packet not ready
887 	    q->rx_wait++;
888 	    /* the ioctl should be conditional */
889 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0); // XXX just in case
890 	    pending = 0;
891 	    usleep(20);
892 	    set_tns_now(&q->cons_now, q->t0);
893 	    continue;
894 	}
895 	/* XXX copy is inefficient but simple */
896 	if (nm_inject(pa->pb, (char *)(p + 1), p->pktlen) == 0) {
897 	    RD(1, "inject failed len %d now %ld tx %ld h %ld t %ld next %ld",
898 		(int)p->pktlen, (u_long)q->cons_now, (u_long)p->pt_tx,
899 		(u_long)q->_head, (u_long)q->_tail, (u_long)p->next);
900 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0);
901 	    pending = 0;
902 	    continue;
903 	}
904 	pending++;
905 	if (pending > q->burst) {
906 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0);
907 	    pending = 0;
908 	}
909 
910 	q->cons_head = p->next;
911 	/* drain packets from the queue */
912 	q->rx++;
913     }
914     D("exiting on abort");
915     return NULL;
916 }
917 
918 /*
919  * In case of pcap file as input, the program acts in 2 different
920  * phases. It first fill the queue and then starts the cons()
921  */
922 static void *
923 nmreplay_main(void *_a)
924 {
925     struct pipe_args *a = _a;
926     struct _qs *q = &a->q;
927     const char *cap_fname = q->prod_ifname;
928 
929     setaffinity(a->cons_core);
930     set_tns_now(&q->t0, 0); /* starting reference */
931     if (cap_fname == NULL) {
932 	goto fail;
933     }
934     q->pcap = readpcap(cap_fname);
935     if (q->pcap == NULL) {
936 	EEE("unable to read file %s", cap_fname);
937 	goto fail;
938     }
939     pcap_prod((void*)a);
940     destroy_pcap(q->pcap);
941     q->pcap = NULL;
942     a->pb = nm_open(q->cons_ifname, NULL, 0, NULL);
943     if (a->pb == NULL) {
944 	EEE("cannot open netmap on %s", q->cons_ifname);
945 	do_abort = 1; // XXX any better way ?
946 	return NULL;
947     }
948     /* continue as cons() */
949     WWW("prepare to send packets");
950     usleep(1000);
951     cons((void*)a);
952     EEE("exiting on abort");
953 fail:
954     if (q->pcap != NULL) {
955 	destroy_pcap(q->pcap);
956     }
957     do_abort = 1;
958     return NULL;
959 }
960 
961 
962 static void
963 sigint_h(int sig)
964 {
965 	(void)sig;	/* UNUSED */
966 	do_abort = 1;
967 	signal(SIGINT, SIG_DFL);
968 }
969 
970 
971 
972 static void
973 usage(void)
974 {
975 	fprintf(stderr,
976 	    "usage: nmreplay [-v] [-D delay] [-B {[constant,]bps|ether,bps|real,speedup}] [-L loss]\n"
977 	    "\t[-b burst] -f pcap-file -i <netmap:ifname|valeSSS:PPP>\n");
978 	exit(1);
979 }
980 
981 
982 /*---- configuration handling ---- */
983 /*
984  * support routine: split argument, returns ac and *av.
985  * av contains two extra entries, a NULL and a pointer
986  * to the entire string.
987  */
988 static char **
989 split_arg(const char *src, int *_ac)
990 {
991     char *my = NULL, **av = NULL, *seps = " \t\r\n,";
992     int l, i, ac; /* number of entries */
993 
994     if (!src)
995 	return NULL;
996     l = strlen(src);
997     /* in the first pass we count fields, in the second pass
998      * we allocate the av[] array and a copy of the string
999      * and fill av[]. av[ac] = NULL, av[ac+1]
1000      */
1001     for (;;) {
1002 	i = ac = 0;
1003 	ND("start pass %d: <%s>", av ? 1 : 0, my);
1004 	while (i < l) {
1005 	    /* trim leading separator */
1006 	    while (i <l && strchr(seps, src[i]))
1007 		i++;
1008 	    if (i >= l)
1009 		break;
1010 	    ND("   pass %d arg %d: <%s>", av ? 1 : 0, ac, src+i);
1011 	    if (av) /* in the second pass, set the result */
1012 		av[ac] = my+i;
1013 	    ac++;
1014 	    /* skip string */
1015 	    while (i <l && !strchr(seps, src[i])) i++;
1016 	    if (av)
1017 		my[i] = '\0'; /* write marker */
1018 	}
1019 	if (!av) { /* end of first pass */
1020 	    ND("ac is %d", ac);
1021 	    av = calloc(1, (l+1) + (ac + 2)*sizeof(char *));
1022 	    my = (char *)&(av[ac+2]);
1023 	    strcpy(my, src);
1024 	} else {
1025 	    break;
1026 	}
1027     }
1028     for (i = 0; i < ac; i++) {
1029 	NED("%d: <%s>", i, av[i]);
1030     }
1031     av[i++] = NULL;
1032     av[i++] = my;
1033     *_ac = ac;
1034     return av;
1035 }
1036 
1037 
1038 /*
1039  * apply a command against a set of functions,
1040  * install a handler in *dst
1041  */
1042 static int
1043 cmd_apply(const struct _cfg *a, const char *arg, struct _qs *q, struct _cfg *dst)
1044 {
1045 	int ac = 0;
1046 	char **av;
1047 	int i;
1048 
1049 	if (arg == NULL || *arg == '\0')
1050 		return 1; /* no argument may be ok */
1051 	if (a == NULL || dst == NULL) {
1052 		ED("program error - invalid arguments");
1053 		exit(1);
1054 	}
1055 	av = split_arg(arg, &ac);
1056 	if (av == NULL)
1057 		return 1; /* error */
1058 	for (i = 0; a[i].parse; i++) {
1059 		struct _cfg x = a[i];
1060 		const char *errmsg = x.optarg;
1061 		int ret;
1062 
1063 		x.arg = NULL;
1064 		x.arg_len = 0;
1065 		bzero(&x.d, sizeof(x.d));
1066 		ND("apply %s to %s", av[0], errmsg);
1067 		ret = x.parse(q, &x, ac, av);
1068 		if (ret == 2) /* not recognised */
1069 			continue;
1070 		if (ret == 1) {
1071 			ED("invalid arguments: need '%s' have '%s'",
1072 				errmsg, arg);
1073 			break;
1074 		}
1075 		x.optarg = arg;
1076 		*dst = x;
1077 		return 0;
1078 	}
1079 	ED("arguments %s not recognised", arg);
1080 	free(av);
1081 	return 1;
1082 }
1083 
1084 static struct _cfg delay_cfg[];
1085 static struct _cfg bw_cfg[];
1086 static struct _cfg loss_cfg[];
1087 
1088 static uint64_t parse_bw(const char *arg);
1089 
1090 /*
1091  * prodcons [options]
1092  * accept separate sets of arguments for the two directions
1093  *
1094  */
1095 
1096 static void
1097 add_to(const char ** v, int l, const char *arg, const char *msg)
1098 {
1099 	for (; l > 0 && *v != NULL ; l--, v++);
1100 	if (l == 0) {
1101 		ED("%s %s", msg, arg);
1102 		exit(1);
1103 	}
1104 	*v = arg;
1105 }
1106 
1107 int
1108 main(int argc, char **argv)
1109 {
1110 	int ch, i, err=0;
1111 
1112 #define	N_OPTS	1
1113 	struct pipe_args bp[N_OPTS];
1114 	const char *d[N_OPTS], *b[N_OPTS], *l[N_OPTS], *q[N_OPTS], *ifname[N_OPTS], *m[N_OPTS];
1115 	const char *pcap_file[N_OPTS];
1116 	int cores[4] = { 2, 8, 4, 10 }; /* default values */
1117 
1118 	bzero(&bp, sizeof(bp));	/* all data initially go here */
1119 	bzero(d, sizeof(d));
1120 	bzero(b, sizeof(b));
1121 	bzero(l, sizeof(l));
1122 	bzero(q, sizeof(q));
1123 	bzero(m, sizeof(m));
1124 	bzero(ifname, sizeof(ifname));
1125 	bzero(pcap_file, sizeof(pcap_file));
1126 
1127 
1128 	/* set default values */
1129 	for (i = 0; i < N_OPTS; i++) {
1130 	    struct _qs *q = &bp[i].q;
1131 
1132 	    q->burst = 128;
1133 	    q->c_delay.optarg = "0";
1134 	    q->c_delay.run = null_run_fn;
1135 	    q->c_loss.optarg = "0";
1136 	    q->c_loss.run = null_run_fn;
1137 	    q->c_bw.optarg = "0";
1138 	    q->c_bw.run = null_run_fn;
1139 	}
1140 
1141 	// Options:
1142 	// B	bandwidth in bps
1143 	// D	delay in seconds
1144 	// L	loss probability
1145 	// f	pcap file
1146 	// i	interface name
1147 	// w	wait link
1148 	// b	batch size
1149 	// v	verbose
1150 	// C	cpu placement
1151 
1152 	while ( (ch = getopt(argc, argv, "B:C:D:L:b:f:i:vw:")) != -1) {
1153 		switch (ch) {
1154 		default:
1155 			D("bad option %c %s", ch, optarg);
1156 			usage();
1157 			break;
1158 
1159 		case 'C': /* CPU placement, up to 4 arguments */
1160 			{
1161 				int ac = 0;
1162 				char **av = split_arg(optarg, &ac);
1163 				if (ac == 1) { /* sequential after the first */
1164 					cores[0] = atoi(av[0]);
1165 					cores[1] = cores[0] + 1;
1166 					cores[2] = cores[1] + 1;
1167 					cores[3] = cores[2] + 1;
1168 				} else if (ac == 2) { /* two sequential pairs */
1169 					cores[0] = atoi(av[0]);
1170 					cores[1] = cores[0] + 1;
1171 					cores[2] = atoi(av[1]);
1172 					cores[3] = cores[2] + 1;
1173 				} else if (ac == 4) { /* four values */
1174 					cores[0] = atoi(av[0]);
1175 					cores[1] = atoi(av[1]);
1176 					cores[2] = atoi(av[2]);
1177 					cores[3] = atoi(av[3]);
1178 				} else {
1179 					ED(" -C accepts 1, 2 or 4 comma separated arguments");
1180 					usage();
1181 				}
1182 				if (av)
1183 					free(av);
1184 			}
1185 			break;
1186 
1187 		case 'B': /* bandwidth in bps */
1188 			add_to(b, N_OPTS, optarg, "-B too many times");
1189 			break;
1190 
1191 		case 'D': /* delay in seconds (float) */
1192 			add_to(d, N_OPTS, optarg, "-D too many times");
1193 			break;
1194 
1195 		case 'L': /* loss probability */
1196 			add_to(l, N_OPTS, optarg, "-L too many times");
1197 			break;
1198 
1199 		case 'b':	/* burst */
1200 			bp[0].q.burst = atoi(optarg);
1201 			break;
1202 
1203 		case 'f':	/* pcap_file */
1204 			add_to(pcap_file, N_OPTS, optarg, "-f too many times");
1205 			break;
1206 		case 'i':	/* interface */
1207 			add_to(ifname, N_OPTS, optarg, "-i too many times");
1208 			break;
1209 		case 'v':
1210 			verbose++;
1211 			break;
1212 		case 'w':
1213 			bp[0].wait_link = atoi(optarg);
1214 			break;
1215 		}
1216 
1217 	}
1218 
1219 	argc -= optind;
1220 	argv += optind;
1221 
1222 	/*
1223 	 * consistency checks for common arguments
1224 	 * if pcap file has been provided we need just one interface, two otherwise
1225 	 */
1226 	if (!pcap_file[0]) {
1227 		ED("missing pcap file");
1228 		usage();
1229 	}
1230 	if (!ifname[0]) {
1231 		ED("missing interface");
1232 		usage();
1233 	}
1234 	if (bp[0].q.burst < 1 || bp[0].q.burst > 8192) {
1235 		WWW("invalid burst %d, set to 1024", bp[0].q.burst);
1236 		bp[0].q.burst = 1024; // XXX 128 is probably better
1237 	}
1238 	if (bp[0].wait_link > 100) {
1239 		ED("invalid wait_link %d, set to 4", bp[0].wait_link);
1240 		bp[0].wait_link = 4;
1241 	}
1242 
1243 	bp[0].q.prod_ifname = pcap_file[0];
1244 	bp[0].q.cons_ifname = ifname[0];
1245 
1246 	/* assign cores. prod and cons work better if on the same HT */
1247 	bp[0].cons_core = cores[0];
1248 	bp[0].prod_core = cores[1];
1249 	ED("running on cores %d %d %d %d", cores[0], cores[1], cores[2], cores[3]);
1250 
1251 	/* apply commands */
1252 	for (i = 0; i < N_OPTS; i++) { /* once per queue */
1253 		struct _qs *q = &bp[i].q;
1254 		err += cmd_apply(delay_cfg, d[i], q, &q->c_delay);
1255 		err += cmd_apply(bw_cfg, b[i], q, &q->c_bw);
1256 		err += cmd_apply(loss_cfg, l[i], q, &q->c_loss);
1257 	}
1258 
1259 	pthread_create(&bp[0].cons_tid, NULL, nmreplay_main, (void*)&bp[0]);
1260 	signal(SIGINT, sigint_h);
1261 	sleep(1);
1262 	while (!do_abort) {
1263 	    struct _qs olda = bp[0].q;
1264 	    struct _qs *q0 = &bp[0].q;
1265 
1266 	    sleep(1);
1267 	    ED("%lld -> %lld maxq %d round %lld",
1268 		(long long)(q0->rx - olda.rx), (long long)(q0->tx - olda.tx),
1269 		q0->rx_qmax, (long long)q0->prod_max_gap
1270 		);
1271 	    ED("plr nominal %le actual %le",
1272 		(double)(q0->c_loss.d[0])/(1<<24),
1273 		q0->c_loss.d[1] == 0 ? 0 :
1274 		(double)(q0->c_loss.d[2])/q0->c_loss.d[1]);
1275 	    bp[0].q.rx_qmax = (bp[0].q.rx_qmax * 7)/8; // ewma
1276 	    bp[0].q.prod_max_gap = (bp[0].q.prod_max_gap * 7)/8; // ewma
1277 	}
1278 	D("exiting on abort");
1279 	sleep(1);
1280 
1281 	return (0);
1282 }
1283 
1284 /* conversion factor for numbers.
1285  * Each entry has a set of characters and conversion factor,
1286  * the first entry should have an empty string and default factor,
1287  * the final entry has s = NULL.
1288  */
1289 struct _sm {	/* string and multiplier */
1290 	char *s;
1291 	double m;
1292 };
1293 
1294 /*
1295  * parse a generic value
1296  */
1297 static double
1298 parse_gen(const char *arg, const struct _sm *conv, int *err)
1299 {
1300 	double d;
1301 	char *ep;
1302 	int dummy;
1303 
1304 	if (err == NULL)
1305 		err = &dummy;
1306 	*err = 0;
1307 	if (arg == NULL)
1308 		goto error;
1309 	d = strtod(arg, &ep);
1310 	if (ep == arg) { /* no value */
1311 		ED("bad argument %s", arg);
1312 		goto error;
1313 	}
1314 	/* special case, no conversion */
1315 	if (conv == NULL && *ep == '\0')
1316 		goto done;
1317 	ND("checking %s [%s]", arg, ep);
1318 	for (;conv->s; conv++) {
1319 		if (strchr(conv->s, *ep))
1320 			goto done;
1321 	}
1322 error:
1323 	*err = 1;	/* unrecognised */
1324 	return 0;
1325 
1326 done:
1327 	if (conv) {
1328 		ND("scale is %s %lf", conv->s, conv->m);
1329 		d *= conv->m; /* apply default conversion */
1330 	}
1331 	ND("returning %lf", d);
1332 	return d;
1333 }
1334 
1335 #define U_PARSE_ERR ~(0ULL)
1336 
1337 /* returns a value in nanoseconds */
1338 static uint64_t
1339 parse_time(const char *arg)
1340 {
1341     struct _sm a[] = {
1342 	{"", 1000000000 /* seconds */},
1343 	{"n", 1 /* nanoseconds */}, {"u", 1000 /* microseconds */},
1344 	{"m", 1000000 /* milliseconds */}, {"s", 1000000000 /* seconds */},
1345 	{NULL, 0 /* seconds */}
1346     };
1347     int err;
1348     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1349     return err ? U_PARSE_ERR : ret;
1350 }
1351 
1352 
1353 /*
1354  * parse a bandwidth, returns value in bps or U_PARSE_ERR if error.
1355  */
1356 static uint64_t
1357 parse_bw(const char *arg)
1358 {
1359     struct _sm a[] = {
1360 	{"", 1}, {"kK", 1000}, {"mM", 1000000}, {"gG", 1000000000}, {NULL, 0}
1361     };
1362     int err;
1363     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1364     return err ? U_PARSE_ERR : ret;
1365 }
1366 
1367 
1368 /*
1369  * For some function we need random bits.
1370  * This is a wrapper to whatever function you want that returns
1371  * 24 useful random bits.
1372  */
1373 
1374 #include <math.h> /* log, exp etc. */
1375 static inline uint64_t
1376 my_random24(void)	/* 24 useful bits */
1377 {
1378 	return random() & ((1<<24) - 1);
1379 }
1380 
1381 
1382 /*-------------- user-configuration -----------------*/
1383 
1384 #if 0 /* start of comment block */
1385 
1386 Here we place the functions to implement the various features
1387 of the system. For each feature one should define a struct _cfg
1388 (see at the beginning for definition) that refers a *_parse() function
1389 to extract values from the command line, and a *_run() function
1390 that is invoked on each packet to implement the desired function.
1391 
1392 Examples of the two functions are below. In general:
1393 
1394 - the *_parse() function takes argc/argv[], matches the function
1395   name in argv[0], extracts the operating parameters, allocates memory
1396   if needed, and stores them in the struct _cfg.
1397   Return value is 2 if argv[0] is not recosnised, 1 if there is an
1398   error in the arguments, 0 if all ok.
1399 
1400   On the command line, argv[] is a single, comma separated argument
1401   that follow the specific option eg -D constant,20ms
1402 
1403   struct _cfg has some preallocated space (e.g an array of uint64_t) so simple
1404   function can use that without having to allocate memory.
1405 
1406 - the *_run() function takes struct _q *q and struct _cfg *cfg as arguments.
1407   *q contains all the informatio that may be possibly needed, including
1408   those on the packet currently under processing.
1409   The basic values are the following:
1410 
1411 	char *	 cur_pkt 	points to the current packet (linear buffer)
1412 	uint32_t cur_len;	length of the current packet
1413 		the functions are not supposed to modify these values
1414 
1415 	int	 cur_drop;	true if current packet must be dropped.
1416 		Must be set to non-zero by the loss emulation function
1417 
1418 	uint64_t cur_delay;	delay in nanoseconds for the current packet
1419 		Must be set by the delay emulation function
1420 
1421    More sophisticated functions may need to access other fields in *q,
1422    see the structure description for that.
1423 
1424 When implementing a new function for a feature (e.g. for delay,
1425 bandwidth, loss...) the struct _cfg should be added to the array
1426 that contains all possible options.
1427 
1428 		--- Specific notes ---
1429 
1430 DELAY emulation		-D option_arguments
1431 
1432     If the option is not supplied, the system applies 0 extra delay
1433 
1434     The resolution for times is 1ns, the precision is load dependent and
1435     generally in the order of 20-50us.
1436     Times are in nanoseconds, can be followed by a character specifying
1437     a different unit e.g.
1438 
1439 	n	nanoseconds
1440 	u	microseconds
1441 	m	milliseconds
1442 	s	seconds
1443 
1444     Currently implemented options:
1445 
1446     constant,t		constant delay equal to t
1447 
1448     uniform,tmin,tmax	uniform delay between tmin and tmax
1449 
1450     exp,tavg,tmin	exponential distribution with average tavg
1451 			and minimum tmin (corresponds to an exponential
1452 			distribution with argument 1/(tavg-tmin) )
1453 
1454 
1455 LOSS emulation		-L option_arguments
1456 
1457     Loss is expressed as packet or bit error rate, which is an absolute
1458     number between 0 and 1 (typically small).
1459 
1460     Currently implemented options
1461 
1462     plr,p		uniform packet loss rate p, independent
1463 			of packet size
1464 
1465     burst,p,lmin,lmax 	burst loss with burst probability p and
1466 			burst length uniformly distributed between
1467 			lmin and lmax
1468 
1469     ber,p		uniformly distributed bit error rate p,
1470 			so actual loss prob. depends on size.
1471 
1472 BANDWIDTH emulation	-B option_arguments
1473 
1474     Bandwidths are expressed in bits per second, can be followed by a
1475     character specifying a different unit e.g.
1476 
1477 	b/B	bits per second
1478 	k/K	kbits/s (10^3 bits/s)
1479 	m/M	mbits/s (10^6 bits/s)
1480 	g/G	gbits/s (10^9 bits/s)
1481 
1482     Currently implemented options
1483 
1484     const,b		constant bw, excluding mac framing
1485     ether,b		constant bw, including ethernet framing
1486 			(20 bytes framing + 4 bytes crc)
1487     real,[scale]	use real time, optionally with a scaling factor
1488 
1489 #endif /* end of comment block */
1490 
1491 /*
1492  * Configuration options for delay
1493  */
1494 
1495 /* constant delay, also accepts just a number */
1496 static int
1497 const_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1498 {
1499 	uint64_t delay;
1500 
1501 	(void)q;
1502 	if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1503 		return 2; /* unrecognised */
1504 	if (ac > 2)
1505 		return 1; /* error */
1506 	delay = parse_time(av[ac - 1]);
1507 	if (delay == U_PARSE_ERR)
1508 		return 1; /* error */
1509 	dst->d[0] = delay;
1510 	return 0;	/* success */
1511 }
1512 
1513 /* runtime function, store the delay into q->cur_delay */
1514 static int
1515 const_delay_run(struct _qs *q, struct _cfg *arg)
1516 {
1517 	q->cur_delay = arg->d[0]; /* the delay */
1518 	return 0;
1519 }
1520 
1521 static int
1522 uniform_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1523 {
1524 	uint64_t dmin, dmax;
1525 
1526 	(void)q;
1527 	if (strcmp(av[0], "uniform") != 0)
1528 		return 2; /* not recognised */
1529 	if (ac != 3)
1530 		return 1; /* error */
1531 	dmin = parse_time(av[1]);
1532 	dmax = parse_time(av[2]);
1533 	if (dmin == U_PARSE_ERR || dmax == U_PARSE_ERR || dmin > dmax)
1534 		return 1;
1535 	D("dmin %lld dmax %lld", (long long)dmin, (long long)dmax);
1536 	dst->d[0] = dmin;
1537 	dst->d[1] = dmax;
1538 	dst->d[2] = dmax - dmin;
1539 	return 0;
1540 }
1541 
1542 static int
1543 uniform_delay_run(struct _qs *q, struct _cfg *arg)
1544 {
1545 	uint64_t x = my_random24();
1546 	q->cur_delay = arg->d[0] + ((arg->d[2] * x) >> 24);
1547 #if 0 /* COMPUTE_STATS */
1548 #endif /* COMPUTE_STATS */
1549 	return 0;
1550 }
1551 
1552 /*
1553  * exponential delay: Prob(delay = x) = exp(-x/d_av)
1554  * gives a delay between 0 and infinity with average d_av
1555  * The cumulative function is 1 - d_av exp(-x/d_av)
1556  *
1557  * The inverse function generates a uniform random number p in 0..1
1558  * and generates delay = (d_av-d_min) * -ln(1-p) + d_min
1559  *
1560  * To speed up behaviour at runtime we tabulate the values
1561  */
1562 
1563 static int
1564 exp_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1565 {
1566 #define	PTS_D_EXP	512
1567 	uint64_t i, d_av, d_min, *t; /*table of values */
1568 
1569         (void)q;
1570         if (strcmp(av[0], "exp") != 0)
1571 		return 2; /* not recognised */
1572         if (ac != 3)
1573                 return 1; /* error */
1574         d_av = parse_time(av[1]);
1575         d_min = parse_time(av[2]);
1576         if (d_av == U_PARSE_ERR || d_min == U_PARSE_ERR || d_av < d_min)
1577                 return 1; /* error */
1578 	d_av -= d_min;
1579 	dst->arg_len = PTS_D_EXP * sizeof(uint64_t);
1580 	dst->arg = calloc(1, dst->arg_len);
1581 	if (dst->arg == NULL)
1582 		return 1; /* no memory */
1583 	t = (uint64_t *)dst->arg;
1584 	/* tabulate -ln(1-n)*delay  for n in 0..1 */
1585 	for (i = 0; i < PTS_D_EXP; i++) {
1586 		double d = -log2 ((double)(PTS_D_EXP - i) / PTS_D_EXP) * d_av + d_min;
1587 		t[i] = (uint64_t)d;
1588 		ND(5, "%ld: %le", i, d);
1589 	}
1590         return 0;
1591 }
1592 
1593 static int
1594 exp_delay_run(struct _qs *q, struct _cfg *arg)
1595 {
1596 	uint64_t *t = (uint64_t *)arg->arg;
1597         q->cur_delay = t[my_random24() & (PTS_D_EXP - 1)];
1598 	RD(5, "delay %llu", (unsigned long long)q->cur_delay);
1599         return 0;
1600 }
1601 
1602 
1603 /* unused arguments in configuration */
1604 #define TLEM_CFG_END	NULL, 0, {0}, {0}
1605 
1606 static struct _cfg delay_cfg[] = {
1607 	{ const_delay_parse, const_delay_run,
1608 		"constant,delay", TLEM_CFG_END },
1609 	{ uniform_delay_parse, uniform_delay_run,
1610 		"uniform,dmin,dmax # dmin <= dmax", TLEM_CFG_END },
1611 	{ exp_delay_parse, exp_delay_run,
1612 		"exp,dmin,davg # dmin <= davg", TLEM_CFG_END },
1613 	{ NULL, NULL, NULL, TLEM_CFG_END }
1614 };
1615 
1616 /* standard bandwidth, also accepts just a number */
1617 static int
1618 const_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1619 {
1620 	uint64_t bw;
1621 
1622 	(void)q;
1623 	if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1624 		return 2; /* unrecognised */
1625 	if (ac > 2)
1626 		return 1; /* error */
1627 	bw = parse_bw(av[ac - 1]);
1628 	if (bw == U_PARSE_ERR) {
1629 		return (ac == 2) ? 1 /* error */ : 2 /* unrecognised */;
1630 	}
1631 	dst->d[0] = bw;
1632 	return 0;	/* success */
1633 }
1634 
1635 
1636 /* runtime function, store the delay into q->cur_delay */
1637 static int
1638 const_bw_run(struct _qs *q, struct _cfg *arg)
1639 {
1640 	uint64_t bps = arg->d[0];
1641 	q->cur_tt = bps ? 8ULL* TIME_UNITS * q->cur_len / bps : 0 ;
1642 	return 0;
1643 }
1644 
1645 /* ethernet bandwidth, add 672 bits per packet */
1646 static int
1647 ether_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1648 {
1649 	uint64_t bw;
1650 
1651 	(void)q;
1652 	if (strcmp(av[0], "ether") != 0)
1653 		return 2; /* unrecognised */
1654 	if (ac != 2)
1655 		return 1; /* error */
1656 	bw = parse_bw(av[ac - 1]);
1657 	if (bw == U_PARSE_ERR)
1658 		return 1; /* error */
1659 	dst->d[0] = bw;
1660 	return 0;	/* success */
1661 }
1662 
1663 
1664 /* runtime function, add 20 bytes (framing) + 4 bytes (crc) */
1665 static int
1666 ether_bw_run(struct _qs *q, struct _cfg *arg)
1667 {
1668 	uint64_t bps = arg->d[0];
1669 	q->cur_tt = bps ? 8ULL * TIME_UNITS * (q->cur_len + 24) / bps : 0 ;
1670 	return 0;
1671 }
1672 
1673 /* real bandwidth, plus scaling factor */
1674 static int
1675 real_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1676 {
1677 	double scale;
1678 
1679 	(void)q;
1680 	if (strcmp(av[0], "real") != 0)
1681 		return 2; /* unrecognised */
1682 	if (ac > 2) { /* second argument is optional */
1683 		return 1; /* error */
1684 	} else if (ac == 1) {
1685 		scale = 1;
1686 	} else {
1687 		int err = 0;
1688 		scale = parse_gen(av[ac-1], NULL, &err);
1689 		if (err || scale <= 0 || scale > 1000)
1690 			return 1;
1691 	}
1692 	ED("real -> scale is %.6f", scale);
1693 	dst->f[0] = scale;
1694 	return 0;	/* success */
1695 }
1696 
1697 static int
1698 real_bw_run(struct _qs *q, struct _cfg *arg)
1699 {
1700 	q->cur_tt /= arg->f[0];
1701 	return 0;
1702 }
1703 
1704 static struct _cfg bw_cfg[] = {
1705 	{ const_bw_parse, const_bw_run,
1706 		"constant,bps", TLEM_CFG_END },
1707 	{ ether_bw_parse, ether_bw_run,
1708 		"ether,bps", TLEM_CFG_END },
1709 	{ real_bw_parse, real_bw_run,
1710 		"real,scale", TLEM_CFG_END },
1711 	{ NULL, NULL, NULL, TLEM_CFG_END }
1712 };
1713 
1714 /*
1715  * loss patterns
1716  */
1717 static int
1718 const_plr_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1719 {
1720 	double plr;
1721 	int err;
1722 
1723 	(void)q;
1724 	if (strcmp(av[0], "plr") != 0 && ac > 1)
1725 		return 2; /* unrecognised */
1726 	if (ac > 2)
1727 		return 1; /* error */
1728 	// XXX to be completed
1729 	plr = parse_gen(av[ac-1], NULL, &err);
1730 	if (err || plr < 0 || plr > 1)
1731 		return 1;
1732 	dst->d[0] = plr * (1<<24); /* scale is 16m */
1733 	if (plr != 0 && dst->d[0] == 0)
1734 		ED("WWW warning,  rounding %le down to 0", plr);
1735 	return 0;	/* success */
1736 }
1737 
1738 static int
1739 const_plr_run(struct _qs *q, struct _cfg *arg)
1740 {
1741 	(void)arg;
1742 	uint64_t r = my_random24();
1743 	q->cur_drop = r < arg->d[0];
1744 #if 1	/* keep stats */
1745 	arg->d[1]++;
1746 	arg->d[2] += q->cur_drop;
1747 #endif
1748 	return 0;
1749 }
1750 
1751 
1752 /*
1753  * For BER the loss is 1- (1-ber)**bit_len
1754  * The linear approximation is only good for small values, so we
1755  * tabulate (1-ber)**len for various sizes in bytes
1756  */
1757 static int
1758 const_ber_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1759 {
1760 	double ber, ber8, cur;
1761 	int i, err;
1762 	uint32_t *plr;
1763 	const uint32_t mask = (1<<24) - 1;
1764 
1765 	(void)q;
1766 	if (strcmp(av[0], "ber") != 0)
1767 		return 2; /* unrecognised */
1768 	if (ac != 2)
1769 		return 1; /* error */
1770 	ber = parse_gen(av[ac-1], NULL, &err);
1771 	if (err || ber < 0 || ber > 1)
1772 		return 1;
1773 	dst->arg_len = MAX_PKT * sizeof(uint32_t);
1774 	plr = calloc(1, dst->arg_len);
1775 	if (plr == NULL)
1776 		return 1; /* no memory */
1777 	dst->arg = plr;
1778 	ber8 = 1 - ber;
1779 	ber8 *= ber8; /* **2 */
1780 	ber8 *= ber8; /* **4 */
1781 	ber8 *= ber8; /* **8 */
1782 	cur = 1;
1783 	for (i=0; i < MAX_PKT; i++, cur *= ber8) {
1784 		plr[i] = (mask + 1)*(1 - cur);
1785 		if (plr[i] > mask)
1786 			plr[i] = mask;
1787 #if 0
1788 		if (i>= 60) //  && plr[i] < mask/2)
1789 			RD(50,"%4d: %le %ld", i, 1.0 - cur, (_P64)plr[i]);
1790 #endif
1791 	}
1792 	dst->d[0] = ber * (mask + 1);
1793 	return 0;	/* success */
1794 }
1795 
1796 static int
1797 const_ber_run(struct _qs *q, struct _cfg *arg)
1798 {
1799 	int l = q->cur_len;
1800 	uint64_t r = my_random24();
1801 	uint32_t *plr = arg->arg;
1802 
1803 	if (l >= MAX_PKT) {
1804 		RD(5, "pkt len %d too large, trim to %d", l, MAX_PKT-1);
1805 		l = MAX_PKT-1;
1806 	}
1807 	q->cur_drop = r < plr[l];
1808 #if 1	/* keep stats */
1809 	arg->d[1] += l * 8;
1810 	arg->d[2] += q->cur_drop;
1811 #endif
1812 	return 0;
1813 }
1814 
1815 static struct _cfg loss_cfg[] = {
1816 	{ const_plr_parse, const_plr_run,
1817 		"plr,prob # 0 <= prob <= 1", TLEM_CFG_END },
1818 	{ const_ber_parse, const_ber_run,
1819 		"ber,prob # 0 <= prob <= 1", TLEM_CFG_END },
1820 	{ NULL, NULL, NULL, TLEM_CFG_END }
1821 };
1822