xref: /freebsd/tools/tools/netmap/nmreplay.c (revision bdd1243d)
1 /*
2  * Copyright (C) 2016 Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  *
25  * $FreeBSD$
26  */
27 
28 
29 /*
30  * This program implements NMREPLAY, a program to replay a pcap file
31  * enforcing the output rate and possibly random losses and delay
32  * distributions.
33  * It is meant to be run from the command line and implemented with a main
34  * control thread for monitoring, plus a thread to push packets out.
35  *
36  * The control thread parses command line arguments, prepares a
37  * schedule for transmission in a memory buffer and then sits
38  * in a loop where it periodically reads traffic statistics from
39  * the other threads and prints them out on the console.
40  *
41  * The transmit buffer contains headers and packets. Each header
42  * includes a timestamp that determines when the packet should be sent out.
43  * A "consumer" thread cons() reads from the queue and transmits packets
44  * on the output netmap port when their time has come.
45  *
46  * The program does CPU pinning and sets the scheduler and priority
47  * for the "cons" threads. Externally one should do the
48  * assignment of other threads (e.g. interrupt handlers) and
49  * make sure that network interfaces are configured properly.
50  *
51  * --- Main functions of the program ---
52  * within each function, q is used as a pointer to the queue holding
53  * packets and parameters.
54  *
55  * pcap_prod()
56  *
57  *	reads from the pcap file and prepares packets to transmit.
58  *	After reading a packet from the pcap file, the following information
59  *	is extracted, which can be used to determine the schedule:
60  *
61  *   	q->cur_pkt	points to the buffer containing the packet
62  *	q->cur_len	packet length, excluding CRC
63  *	q->cur_caplen	available packet length (may be shorter than cur_len)
64  *	q->cur_tt	transmission time for the packet, computed from the trace.
65  *
66  *  The following functions are then called in sequence:
67  *
68  *  q->c_loss (set with the -L command line option) decides
69  *	whether the packet should be dropped before even queuing.
70  *	This is generally useful to emulate random loss.
71  *	The function is supposed to set q->cur_drop = 1 if the
72  *	packet should be dropped, or leave it at 0 otherwise.
73  *
74  *   q->c_bw (set with the -B command line option) is used to
75  *      enforce the transmit bandwidth. The function must store
76  *	in q->cur_tt the transmission time (in nanoseconds) of
77  *	the packet, which is typically proportional to the length
78  *	of the packet, i.e. q->cur_tt = q->cur_len / <bandwidth>
79  *	Variants are possible, eg. to account for constant framing
80  *	bits as on the ethernet, or variable channel acquisition times,
81  *	etc.
82  *	This mechanism can also be used to simulate variable queueing
83  *	delay e.g. due to the presence of cross traffic.
84  *
85  *   q->c_delay (set with the -D option) implements delay emulation.
86  *	The function should set q->cur_delay to the additional
87  *	delay the packet is subject to. The framework will take care of
88  *	computing the actual exit time of a packet so that there is no
89  *	reordering.
90  */
91 
// debugging macros

/* NED: disabled flavour of ED, expands to an empty statement */
#define NED(_fmt, ...)	do {} while (0)

/*
 * ED: print a message on stderr. The message is prefixed by the time
 * of day (seconds modulo 1000, then milliseconds), the name of the
 * calling function and the source line; a newline is appended.
 */
#define ED(_fmt, ...)							\
	do {								\
		struct timeval _now;					\
									\
		gettimeofday(&_now, NULL);				\
		fprintf(stderr,						\
		    "%03d.%03d %-10.10s [%5d] \t" _fmt "\n",		\
		    (int)(_now.tv_sec % 1000),				\
		    (int)_now.tv_usec/1000,				\
		    __func__, __LINE__, ##__VA_ARGS__);			\
	} while (0)

/* tagged variants: WWW is for warnings, EEE for errors, DDD for debugging */
#define WWW(_fmt, ...)	ED("--WWW-- " _fmt, ##__VA_ARGS__)
#define EEE(_fmt, ...)	ED("--EEE-- " _fmt, ##__VA_ARGS__)
#define DDD(_fmt, ...)	ED("--DDD-- " _fmt, ##__VA_ARGS__)
107 
108 #define _GNU_SOURCE	// for CPU_SET() etc
109 #include <errno.h>
110 #include <fcntl.h>
111 #include <libnetmap.h>
112 #include <math.h> /* log, exp etc. */
113 #include <pthread.h>
114 #ifdef __FreeBSD__
115 #include <pthread_np.h> /* pthread w/ affinity */
116 #include <sys/cpuset.h> /* cpu_set */
117 #endif /* __FreeBSD__ */
118 #include <signal.h>
119 #include <stdio.h>
120 #include <stdlib.h>
121 #include <string.h> /* memcpy */
122 #include <stdint.h>
123 #include <sys/ioctl.h>
124 #include <sys/mman.h>
125 #include <sys/poll.h>
126 #include <sys/resource.h> // setpriority
127 #include <sys/time.h>
128 #include <unistd.h>
129 
130 /*
131  *
132  * A packet in the queue is q_pkt plus the payload.
133  *
134  * For the packet descriptor we need the following:
135  *
136  *  -	position of next packet in the queue (can go backwards).
137  *	We can reduce to 32 bits if we consider alignments,
138  *	or we just store the length to be added to the current
139  *	value and assume 0 as a special index.
140  *  -	actual packet length (16 bits may be ok)
141  *  -	queue output time, in nanoseconds (64 bits)
142  *  -	delay line output time, in nanoseconds
143  *	One of the two can be packed to a 32bit value
144  *
145  * A convenient coding uses 32 bytes per packet.
146  */
147 
/* descriptor stored in the queue right before the payload of each packet */
struct q_pkt {
	uint64_t next;		/* buffer index for next packet */
	uint64_t pktlen;	/* actual packet len */
	uint64_t pt_qout;	/* time of output from queue */
	uint64_t pt_tx;		/* transmit time */
};
154 
155 
/*
 * The header for a pcap file
 */
struct pcap_file_header {
    /*
     * magic: used to detect the file format itself and the byte ordering.
     * The writing application writes 0xa1b2c3d4 with its native byte
     * ordering format into this field. The reading application will read
     * either 0xa1b2c3d4 (identical) or 0xd4c3b2a1 (swapped). If the reading
     * application reads the swapped 0xd4c3b2a1 value, it knows that all
     * the following fields will have to be swapped too.
     * For nanosecond-resolution files, the writing application writes
     * 0xa1b23c4d, with the two nibbles of the two lower-order bytes
     * swapped, and the reading application will read either 0xa1b23c4d
     * (identical) or 0x4d3cb2a1 (swapped).
     */
    uint32_t magic;

    /* version_major, version_minor: the version number of this file format */
    uint16_t version_major;
    uint16_t version_minor;

    /*
     * thiszone: the correction time in seconds between GMT (UTC) and the
     * local timezone of the following packet header timestamps.
     * Examples: if the timestamps are in GMT (UTC), thiszone is simply 0.
     * If the timestamps are in Central European time (Amsterdam,
     * Berlin, ...) which is GMT + 1:00, thiszone must be -3600.
     */
    int32_t thiszone;

    /* stampacc: the accuracy of time stamps in the capture */
    uint32_t stampacc;

    /*
     * snaplen: the "snapshot length" for the capture (typically 65535
     * or even more, but might be limited by the user)
     */
    uint32_t snaplen;

    /*
     * network: link-layer header type, specifying the type of headers
     * at the beginning of the packet (e.g. 1 for Ethernet); this can be
     * various types such as 802.11, 802.11 with various radio
     * information, PPP, Token Ring, FDDI, etc.
     */
    uint32_t network;
};
188 
189 #if 0 /* from pcap.h */
190 struct pcap_file_header {
191         bpf_u_int32 magic;
192         u_short version_major;
193         u_short version_minor;
194         bpf_int32 thiszone;     /* gmt to local correction */
195         bpf_u_int32 sigfigs;    /* accuracy of timestamps */
196         bpf_u_int32 snaplen;    /* max length saved portion of each pkt */
197         bpf_u_int32 linktype;   /* data link type (LINKTYPE_*) */
198 };
199 
200 struct pcap_pkthdr {
201         struct timeval ts;      /* time stamp */
202         bpf_u_int32 caplen;     /* length of portion present */
203         bpf_u_int32 len;        /* length this packet (off wire) */
204 };
205 #endif /* from pcap.h */
206 
/* per-packet record header, as laid out in the pcap file */
struct pcap_pkthdr {
    uint32_t ts_sec;	/* seconds from epoch */
    uint32_t ts_frac;	/* microseconds or nanoseconds depending on sigfigs */

    /*
     * caplen: the number of bytes of packet data actually captured
     * and saved in the file. This value should never become larger
     * than orig_len or the snaplen value of the global header.
     */
    uint32_t caplen;

    uint32_t len;	/* wire length */
};
216 
217 
#define PKT_PAD         (32)    /* padding on packets */

/* round x up to the next multiple of PKT_PAD (a power of two) */
static inline int pad(int x)
{
        const int mask = PKT_PAD - 1;

        return (x + mask) & ~mask;
}
224 
225 
226 
/*
 * wrapper around the pcap file.
 * We mmap the file so it is easy to do multiple passes through it.
 */
struct nm_pcap_file {
    int fd;			/* descriptor of the open file */
    uint64_t filesize;		/* size of the file, also of the mapping */
    const char *data;		/* mmapped file */

    uint64_t tot_pkt;		/* number of packets found */
    uint64_t tot_bytes;		/* sum of the wire lengths */
    uint64_t tot_bytes_rounded;	/* need hdr + pad(len) */
    uint32_t resolution;	/* 1000 for us, 1 for ns */
    int swap;			/* need to swap fields ? */

    uint64_t first_ts;
    /*
     * total_tx_time is computed as last_ts - first_ts, plus the
     * transmission time for the first packet which in turn is
     * computed according to the average bandwidth
     */
    uint64_t total_tx_time;

    uint64_t file_len;
    const char *cur;		/* running pointer */
    const char *lim;		/* data + file_len */
    int err;			/* set when a read would go past lim */
};
255 
256 static struct nm_pcap_file *readpcap(const char *fn);
257 static void destroy_pcap(struct nm_pcap_file *file);
258 
259 
/*
 * nanoseconds in 1s. Must be a 64-bit constant: it multiplies the 32-bit
 * seconds field of pcap timestamps, and 'unsigned long' is only 32 bits
 * wide on ILP32 platforms.
 */
#define NS_SCALE 1000000000ULL
261 
262 static void destroy_pcap(struct nm_pcap_file *pf)
263 {
264     if (!pf)
265 	return;
266 
267     munmap((void *)(uintptr_t)pf->data, pf->filesize);
268     close(pf->fd);
269     bzero(pf, sizeof(*pf));
270     free(pf);
271     return;
272 }
273 
/*
 * Convert a field of given size if swap is needed.
 *
 * src points to 'size' bytes (2 or 4, not necessarily aligned) holding
 * an integer; if swap is non-zero the bytes are reversed before the
 * value is interpreted in host byte order.
 * Any other size is a programming error and terminates the program.
 */
static uint32_t
cvt(const void *src, int size, char swap)
{
    unsigned char raw[4];
    uint32_t v32;
    uint16_t v16;
    int lo, hi;

    if (size != 2 && size != 4) {
	EEE("Invalid size %d\n", size);
	exit(1);
    }
    memcpy(raw, src, size);
    if (swap) {
	for (lo = 0, hi = size - 1; lo < hi; lo++, hi--) {
	    unsigned char tmp = raw[lo];

	    raw[lo] = raw[hi];
	    raw[hi] = tmp;
	}
    }
    /*
     * Read the bytes back through an integer of the same width:
     * copying a 2-byte field into the start of a 32-bit word gives
     * the right value only on little-endian hosts.
     */
    if (size == 2) {
	memcpy(&v16, raw, sizeof(v16));
	return v16;
    }
    memcpy(&v32, raw, sizeof(v32));
    return v32;
}
295 
296 static uint32_t
297 read_next_info(struct nm_pcap_file *pf, int size)
298 {
299     const char *end = pf->cur + size;
300     uint32_t ret;
301     if (end > pf->lim) {
302 	pf->err = 1;
303 	ret = 0;
304     } else {
305 	ret = cvt(pf->cur, size, pf->swap);
306 	pf->cur = end;
307     }
308     return ret;
309 }
310 
311 /*
312  * mmap the file, make sure timestamps are sorted, and count
313  * packets and sizes
314  * Timestamps represent the receive time of the packets.
315  * We need to compute also the 'first_ts' which refers to a hypotetical
316  * packet right before the first one, see the code for details.
317  */
318 static struct nm_pcap_file *
319 readpcap(const char *fn)
320 {
321     struct nm_pcap_file _f, *pf = &_f;
322     uint64_t prev_ts, first_pkt_time;
323     uint32_t magic, first_len = 0;
324 
325     bzero(pf, sizeof(*pf));
326     pf->fd = open(fn, O_RDONLY);
327     if (pf->fd < 0) {
328 	EEE("cannot open file %s", fn);
329 	return NULL;
330     }
331     /* compute length */
332     pf->filesize = lseek(pf->fd, 0, SEEK_END);
333     lseek(pf->fd, 0, SEEK_SET);
334     ED("filesize is %lu", (u_long)(pf->filesize));
335     if (pf->filesize < sizeof(struct pcap_file_header)) {
336 	EEE("file too short %s", fn);
337 	close(pf->fd);
338 	return NULL;
339     }
340     pf->data = mmap(NULL, pf->filesize, PROT_READ, MAP_SHARED, pf->fd, 0);
341     if (pf->data == MAP_FAILED) {
342 	EEE("cannot mmap file %s", fn);
343 	close(pf->fd);
344 	return NULL;
345     }
346     pf->cur = pf->data;
347     pf->lim = pf->data + pf->filesize;
348     pf->err = 0;
349     pf->swap = 0; /* default, same endianness when read magic */
350 
351     magic = read_next_info(pf, 4);
352     ED("magic is 0x%x", magic);
353     switch (magic) {
354     case 0xa1b2c3d4: /* native, us resolution */
355 	pf->swap = 0;
356 	pf->resolution = 1000;
357 	break;
358     case 0xd4c3b2a1: /* swapped, us resolution */
359 	pf->swap = 1;
360 	pf->resolution = 1000;
361 	break;
362     case 0xa1b23c4d:	/* native, ns resolution */
363 	pf->swap = 0;
364 	pf->resolution = 1; /* nanoseconds */
365 	break;
366     case 0x4d3cb2a1:	/* swapped, ns resolution */
367 	pf->swap = 1;
368 	pf->resolution = 1; /* nanoseconds */
369 	break;
370     default:
371 	EEE("unknown magic 0x%x", magic);
372 	return NULL;
373     }
374 
375     ED("swap %d res %d\n", pf->swap, pf->resolution);
376     pf->cur = pf->data + sizeof(struct pcap_file_header);
377     pf->lim = pf->data + pf->filesize;
378     pf->err = 0;
379     prev_ts = 0;
380     while (pf->cur < pf->lim && pf->err == 0) {
381 	uint32_t base = pf->cur - pf->data;
382 	uint64_t cur_ts = read_next_info(pf, 4) * NS_SCALE +
383 		read_next_info(pf, 4) * pf->resolution;
384 	uint32_t caplen = read_next_info(pf, 4);
385 	uint32_t len = read_next_info(pf, 4);
386 
387 	if (pf->err) {
388 	    WWW("end of pcap file after %d packets\n",
389 		(int)pf->tot_pkt);
390 	    break;
391 	}
392 	if  (cur_ts < prev_ts) {
393 	    WWW("reordered packet %d\n",
394 		(int)pf->tot_pkt);
395 	}
396 	prev_ts = cur_ts;
397 	(void)base;
398 	if (pf->tot_pkt == 0) {
399 	    pf->first_ts = cur_ts;
400 	    first_len = len;
401 	}
402 	pf->tot_pkt++;
403 	pf->tot_bytes += len;
404 	pf->tot_bytes_rounded += pad(len) + sizeof(struct q_pkt);
405 	pf->cur += caplen;
406     }
407     pf->total_tx_time = prev_ts - pf->first_ts; /* excluding first packet */
408     ED("tot_pkt %lu tot_bytes %lu tx_time %.6f s first_len %lu",
409 	(u_long)pf->tot_pkt, (u_long)pf->tot_bytes,
410 	1e-9*pf->total_tx_time, (u_long)first_len);
411     /*
412      * We determine that based on the
413      * average bandwidth of the trace, as follows
414      *   first_pkt_ts = p[0].len / avg_bw
415      * In turn avg_bw = (total_len - p[0].len)/(p[n-1].ts - p[0].ts)
416      * so
417      *   first_ts =  p[0].ts - p[0].len * (p[n-1].ts - p[0].ts) / (total_len - p[0].len)
418      */
419     if (pf->tot_bytes == first_len) {
420 	/* cannot estimate bandwidth, so force 1 Gbit */
421 	first_pkt_time = first_len * 8; /* * 10^9 / bw */
422     } else {
423 	first_pkt_time = pf->total_tx_time * first_len / (pf->tot_bytes - first_len);
424     }
425     ED("first_pkt_time %.6f s", 1e-9*first_pkt_time);
426     pf->total_tx_time += first_pkt_time;
427     pf->first_ts -= first_pkt_time;
428 
429     /* all correct, allocate a record and copy */
430     pf = calloc(1, sizeof(*pf));
431     *pf = _f;
432     /* reset pointer to start */
433     pf->cur = pf->data + sizeof(struct pcap_file_header);
434     pf->err = 0;
435     return pf;
436 }
437 
enum my_pcap_mode { PM_NONE, PM_FAST, PM_FIXED, PM_REAL };

static int verbose = 0;

/*
 * Set to 1 to ask all loops to terminate. It is written by the SIGINT
 * handler and polled by the worker code, so it must be an object that
 * can be written from a signal handler and that the compiler
 * re-reads at every test.
 */
static volatile sig_atomic_t do_abort = 0;
443 
/*
 * Linux calls the CPU set type cpu_set_t. Test __linux__ as well:
 * the unprefixed 'linux' macro is not predefined when the compiler
 * runs in a strict ISO mode.
 */
#if defined(linux) || defined(__linux__)
#define cpuset_t        cpu_set_t
#endif

#ifdef __APPLE__
/* minimal stand-ins for the affinity, scheduler and clock calls */
#define cpuset_t        uint64_t        // XXX
static inline void CPU_ZERO(cpuset_t *p)
{
        *p = 0;
}

static inline void CPU_SET(uint32_t i, cpuset_t *p)
{
        /* shift a 64-bit one: an int shifted by 31 or more is undefined */
        *p |= (cpuset_t)1 << (i & 0x3f);
}

#define pthread_setaffinity_np(a, b, c) ((void)a, 0)
#define sched_setscheduler(a, b, c)	(1) /* error */
#define clock_gettime(a,b)      \
        do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)

#define	_P64	unsigned long
#endif

#ifndef _P64

/* we use uint64_t widely, but printf gives trouble on different
 * platforms so we use _P64 as a cast
 */
#define	_P64	uint64_t
#endif /* print stuff */
475 
476 
struct _qs;	/* forward */

/*
 * Descriptor of a configuration entry (one per emulation option).
 * 'parse' receives the option split into ac/av[] and 'run' is invoked
 * with the queue and this descriptor; both report 0 for ok and 1 on
 * error. Whatever a handler allocates is kept in this same
 * descriptor, which is passed back to it as argument.
 * arg and arg_len are included for convenience.
 */
struct _cfg {
    /* option parser: 0 ok, 1 on error */
    int (*parse)(struct _qs *, struct _cfg *, int ac, char *av[]);
    /* handler: 0 Ok, 1 on error */
    int (*run)(struct _qs *, struct _cfg *arg);
    // int close(struct _qs *, void *arg);              /* 0 Ok, 1 on error */

    /* command line argument. Initial value is the error message */
    const char *optarg;

    /* placeholders for common values */
    void *arg;		/* allocated memory if any */
    int arg_len;	/* size of *arg in case a realloc is needed */
    uint64_t d[16];	/* static storage for simple cases */
    double f[4];	/* static storage for simple cases */
};
497 
498 
499 /*
500  * communication occurs through this data structure, with fields
501  * cache-aligned according to who are the readers/writers.
502  *
503 
504 The queue is an array of memory  (buf) of size buflen (does not change).
505 
506 The producer uses 'tail' as an index in the queue to indicate
507 the first empty location (ie. after the last byte of data),
508 the consumer uses head to indicate the next byte to consume.
509 
510 For best performance we should align buffers and packets
511 to multiples of cacheline, but this would explode memory too much.
512 Worst case memory explosion is with 65 byte packets.
513 Memory usage as shown below:
514 
515 		qpkt-pad
516 	size	32-16	32-32	32-64	64-64
517 
518 	64	96	96	96	128
519 	65	112	128	160	192
520 
521 
522 An empty queue has head == tail, a full queue will have free space
523 below a threshold.  In our case the queue is large enough and we
524 are non blocking so we can simply drop traffic when the queue
525 approaches a full state.
526 
527 To simulate bandwidth limitations efficiently, the producer has a second
528 pointer, prod_tail_1, used to check for expired packets. This is done lazily.
529 
530  */
/*
 * When sizing the buffer, we must assume some value for the bandwidth.
 * INFINITE_BW is supposed to be faster than what we support
 */
#define INFINITE_BW	(200ULL * 1000 * 1000 * 1000)

#define MY_CACHELINE	(128ULL)	/* alignment used by ALIGN_CACHE */
#define MAX_PKT		(9200)		/* max packet size */

/* place the field it is attached to on a MY_CACHELINE boundary */
#define ALIGN_CACHE	__attribute__ ((aligned (MY_CACHELINE)))
540 
541 struct _qs { /* shared queue */
542 	uint64_t	t0;	/* start of times */
543 
544 	uint64_t 	buflen;	/* queue length */
545 	char *buf;
546 
547 	/* handlers for various options */
548 	struct _cfg	c_delay;
549 	struct _cfg	c_bw;
550 	struct _cfg	c_loss;
551 
552 	/* producer's fields */
553 	uint64_t	tx ALIGN_CACHE;	/* tx counter */
554 	uint64_t	prod_tail_1;	/* head of queue */
555 	uint64_t	prod_head;	/* cached copy */
556 	uint64_t	prod_tail;	/* cached copy */
557 	uint64_t	prod_drop;	/* drop packet count */
558 	uint64_t	prod_max_gap;	/* rx round duration */
559 
560 	struct nm_pcap_file	*pcap;		/* the pcap struct */
561 
562 	/* parameters for reading from the netmap port */
563 	struct nmport_d *src_port;		/* netmap descriptor */
564 	const char *	prod_ifname;	/* interface name or pcap file */
565 	struct netmap_ring *rxring;	/* current ring being handled */
566 	uint32_t	si;		/* ring index */
567 	int		burst;
568 	uint32_t	rx_qmax;	/* stats on max queued */
569 
570 	uint64_t	qt_qout;	/* queue exit time for last packet */
571 		/*
572 		 * when doing shaping, the software computes and stores here
573 		 * the time when the most recently queued packet will exit from
574 		 * the queue.
575 		 */
576 
577 	uint64_t	qt_tx;		/* delay line exit time for last packet */
578 		/*
579 		 * The software computes the time at which the most recently
580 		 * queued packet exits from the queue.
581 		 * To avoid reordering, the next packet should exit at least
582 		 * at qt_tx + cur_tt
583 		 */
584 
585 	/* producer's fields controlling the queueing */
586 	const char *	cur_pkt;	/* current packet being analysed */
587 	uint32_t	cur_len;	/* length of current packet */
588 	uint32_t	cur_caplen;	/* captured length of current packet */
589 
590 	int		cur_drop;	/* 1 if current  packet should be dropped. */
591 		/*
592 		 * cur_drop can be set as a result of the loss emulation,
593 		 * and may need to use the packet size, current time, etc.
594 		 */
595 
596 	uint64_t	cur_tt;		/* transmission time (ns) for current packet */
597 		/*
598 		 * The transmission time is how much link time the packet will consume.
599 		 * should be set by the function that does the bandwidth emulation,
600 		 * but could also be the result of a function that emulates the
601 		 * presence of competing traffic, MAC protocols etc.
602 		 * cur_tt is 0 for links with infinite bandwidth.
603 		 */
604 
605 	uint64_t	cur_delay;	/* delay (ns) for current packet from c_delay.run() */
606 		/*
607 		 * this should be set by the function that computes the extra delay
608 		 * applied to the packet.
609 		 * The code makes sure that there is no reordering and possibly
610 		 * bumps the output time as needed.
611 		 */
612 
613 
614 	/* consumer's fields */
615 	const char *		cons_ifname;
616 	uint64_t rx ALIGN_CACHE;	/* rx counter */
617 	uint64_t	cons_head;	/* cached copy */
618 	uint64_t	cons_tail;	/* cached copy */
619 	uint64_t	cons_now;	/* most recent producer timestamp */
620 	uint64_t	rx_wait;	/* stats */
621 
622 	/* shared fields */
623 	volatile uint64_t _tail ALIGN_CACHE ;	/* producer writes here */
624 	volatile uint64_t _head ALIGN_CACHE ;	/* consumer reads from here */
625 };
626 
627 struct pipe_args {
628 	int		wait_link;
629 
630 	pthread_t	cons_tid;	/* main thread */
631 	pthread_t	prod_tid;	/* producer thread */
632 
633 	/* Affinity: */
634 	int		cons_core;	/* core for cons() */
635 	int		prod_core;	/* core for prod() */
636 
637 	struct nmport_d *pa;		/* netmap descriptor */
638 	struct nmport_d *pb;
639 
640 	struct _qs	q;
641 };
642 
#define NS_IN_S		(1000ULL * 1000 * 1000)	// nanoseconds
#define TIME_UNITS	NS_IN_S			/* time units in one second */
645 /* set the thread affinity. */
646 static int
647 setaffinity(int i)
648 {
649         cpuset_t cpumask;
650 	struct sched_param p;
651 
652         if (i == -1)
653                 return 0;
654 
655         /* Set thread affinity affinity.*/
656         CPU_ZERO(&cpumask);
657         CPU_SET(i, &cpumask);
658 
659         if (pthread_setaffinity_np(pthread_self(), sizeof(cpuset_t), &cpumask) != 0) {
660                 WWW("Unable to set affinity: %s", strerror(errno));
661         }
662 	if (setpriority(PRIO_PROCESS, 0, -10)) {; // XXX not meaningful
663                 WWW("Unable to set priority: %s", strerror(errno));
664 	}
665 	bzero(&p, sizeof(p));
666 	p.sched_priority = 10; // 99 on linux ?
667 	// use SCHED_RR or SCHED_FIFO
668 	if (sched_setscheduler(0, SCHED_RR, &p)) {
669                 WWW("Unable to set scheduler: %s", strerror(errno));
670 	}
671         return 0;
672 }
673 
674 
675 /*
676  * set the timestamp from the clock, subtract t0
677  */
678 static inline void
679 set_tns_now(uint64_t *now, uint64_t t0)
680 {
681     struct timespec t;
682 
683     clock_gettime(CLOCK_REALTIME, &t); // XXX precise on FreeBSD ?
684     *now = (uint64_t)(t.tv_nsec + NS_IN_S * t.tv_sec);
685     *now -= t0;
686 }
687 
688 
689 
/*
 * compare two timestamps: the result is the signed distance from b
 * to a, so it is positive when a comes after b
 */
static inline int64_t
ts_cmp(uint64_t a, uint64_t b)
{
	const uint64_t delta = a - b;

	return (int64_t)delta;
}
696 
697 /* create a packet descriptor */
698 static inline struct q_pkt *
699 pkt_at(struct _qs *q, uint64_t ofs)
700 {
701     return (struct q_pkt *)(q->buf + ofs);
702 }
703 
704 
705 /*
706  * we have already checked for room and prepared p->next
707  */
708 static inline int
709 enq(struct _qs *q)
710 {
711     struct q_pkt *p = pkt_at(q, q->prod_tail);
712 
713     /* hopefully prefetch has been done ahead */
714     nm_pkt_copy(q->cur_pkt, (char *)(p+1), q->cur_caplen);
715     p->pktlen = q->cur_len;
716     p->pt_qout = q->qt_qout;
717     p->pt_tx = q->qt_tx;
718     p->next = q->prod_tail + pad(q->cur_len) + sizeof(struct q_pkt);
719     ND("enqueue len %d at %d new tail %ld qout %.6f tx %.6f",
720         q->cur_len, (int)q->prod_tail, p->next,
721         1e-9*p->pt_qout, 1e-9*p->pt_tx);
722     q->prod_tail = p->next;
723     q->tx++;
724     return 0;
725 }
726 
/*
 * do-nothing handler, for parameters not supplied: ignores both
 * arguments and reports success
 */
static int
null_run_fn(struct _qs *q, struct _cfg *cfg)
{
    (void)cfg;
    (void)q;

    return 0;
}
737 
738 
739 
740 /*
741  * put packet data into the buffer.
742  * We read from the mmapped pcap file, construct header, copy
743  * the captured length of the packet and pad with zeroes.
744  */
745 static void *
746 pcap_prod(void *_pa)
747 {
748     struct pipe_args *pa = _pa;
749     struct _qs *q = &pa->q;
750     struct nm_pcap_file *pf = q->pcap;	/* already opened by readpcap */
751     uint32_t loops, i, tot_pkts;
752 
753     /* data plus the loop record */
754     uint64_t need;
755     uint64_t t_tx, tt, last_ts; /* last timestamp from trace */
756 
757     /*
758      * For speed we make sure the trace is at least some 1000 packets,
759      * so we may need to loop the trace more than once (for short traces)
760      */
761     loops = (1 + 10000 / pf->tot_pkt);
762     tot_pkts = loops * pf->tot_pkt;
763     need = loops * pf->tot_bytes_rounded + sizeof(struct q_pkt);
764     q->buf = calloc(1, need);
765     if (q->buf == NULL) {
766 	D("alloc %lld bytes for queue failed, exiting",(long long)need);
767 	goto fail;
768     }
769     q->prod_head = q->prod_tail = 0;
770     q->buflen = need;
771 
772     pf->cur = pf->data + sizeof(struct pcap_file_header);
773     pf->err = 0;
774 
775     ED("--- start create %lu packets at tail %d",
776 	(u_long)tot_pkts, (int)q->prod_tail);
777     last_ts = pf->first_ts; /* beginning of the trace */
778 
779     q->qt_qout = 0; /* first packet out of the queue */
780 
781     for (loops = 0, i = 0; i < tot_pkts && !do_abort; i++) {
782 	const char *next_pkt; /* in the pcap buffer */
783 	uint64_t cur_ts;
784 
785 	/* read values from the pcap buffer */
786 	cur_ts = read_next_info(pf, 4) * NS_SCALE +
787 		read_next_info(pf, 4) * pf->resolution;
788 	q->cur_caplen = read_next_info(pf, 4);
789 	q->cur_len = read_next_info(pf, 4);
790 	next_pkt = pf->cur + q->cur_caplen;
791 
792 	/* prepare fields in q for the generator */
793 	q->cur_pkt = pf->cur;
794 	/* initial estimate of tx time */
795 	q->cur_tt = cur_ts - last_ts;
796 	    // -pf->first_ts + loops * pf->total_tx_time - last_ts;
797 
798 	if ((i % pf->tot_pkt) == 0)
799 	   ED("insert %5d len %lu cur_tt %.6f",
800 		i, (u_long)q->cur_len, 1e-9*q->cur_tt);
801 
802 	/* prepare for next iteration */
803 	pf->cur = next_pkt;
804 	last_ts = cur_ts;
805 	if (next_pkt == pf->lim) {	//last pkt
806 	    pf->cur = pf->data + sizeof(struct pcap_file_header);
807     	    last_ts = pf->first_ts; /* beginning of the trace */
808 	    loops++;
809 	}
810 
811 	q->c_loss.run(q, &q->c_loss);
812 	if (q->cur_drop)
813 	    continue;
814 	q->c_bw.run(q, &q->c_bw);
815 	tt = q->cur_tt;
816 	q->qt_qout += tt;
817 #if 0
818 	if (drop_after(q))
819 	    continue;
820 #endif
821 	q->c_delay.run(q, &q->c_delay); /* compute delay */
822 	t_tx = q->qt_qout + q->cur_delay;
823 	ND(5, "tt %ld qout %ld tx %ld qt_tx %ld", tt, q->qt_qout, t_tx, q->qt_tx);
824 	/* insure no reordering and spacing by transmission time */
825 	q->qt_tx = (t_tx >= q->qt_tx + tt) ? t_tx : q->qt_tx + tt;
826 	enq(q);
827 
828 	q->tx++;
829 	ND("ins %d q->prod_tail = %lu", (int)insert, (unsigned long)q->prod_tail);
830     }
831     /* loop marker ? */
832     ED("done q->prod_tail:%d",(int)q->prod_tail);
833     q->_tail = q->prod_tail; /* publish */
834 
835     return NULL;
836 fail:
837     if (q->buf != NULL) {
838 	free(q->buf);
839     }
840     nmport_close(pa->pb);
841     return (NULL);
842 }
843 
844 
845 /*
846  * the consumer reads from the queue using head,
847  * advances it every now and then.
848  */
849 static void *
850 cons(void *_pa)
851 {
852     struct pipe_args *pa = _pa;
853     struct _qs *q = &pa->q;
854     int pending = 0;
855     uint64_t last_ts = 0;
856 
857     /* read the start of times in q->t0 */
858     set_tns_now(&q->t0, 0);
859     /* set the time (cons_now) to clock - q->t0 */
860     set_tns_now(&q->cons_now, q->t0);
861     q->cons_head = q->_head;
862     q->cons_tail = q->_tail;
863     while (!do_abort) { /* consumer, infinite */
864 	struct q_pkt *p = pkt_at(q, q->cons_head);
865 
866 	__builtin_prefetch (q->buf + p->next);
867 
868 	if (q->cons_head == q->cons_tail) {	//reset record
869 	    ND("Transmission restarted");
870 	    /*
871 	     * add to q->t0 the time for the last packet
872 	     */
873 	    q->t0 += last_ts;
874 	    set_tns_now(&q->cons_now, q->t0);
875 	    q->cons_head = 0;	//restart from beginning of the queue
876 	    continue;
877 	}
878 	last_ts = p->pt_tx;
879 	if (ts_cmp(p->pt_tx, q->cons_now) > 0) {
880 	    // packet not ready
881 	    q->rx_wait++;
882 	    /* the ioctl should be conditional */
883 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0); // XXX just in case
884 	    pending = 0;
885 	    usleep(20);
886 	    set_tns_now(&q->cons_now, q->t0);
887 	    continue;
888 	}
889 	/* XXX copy is inefficient but simple */
890 	if (nmport_inject(pa->pb, (char *)(p + 1), p->pktlen) == 0) {
891 	    RD(1, "inject failed len %d now %ld tx %ld h %ld t %ld next %ld",
892 		(int)p->pktlen, (u_long)q->cons_now, (u_long)p->pt_tx,
893 		(u_long)q->_head, (u_long)q->_tail, (u_long)p->next);
894 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0);
895 	    pending = 0;
896 	    continue;
897 	}
898 	pending++;
899 	if (pending > q->burst) {
900 	    ioctl(pa->pb->fd, NIOCTXSYNC, 0);
901 	    pending = 0;
902 	}
903 
904 	q->cons_head = p->next;
905 	/* drain packets from the queue */
906 	q->rx++;
907     }
908     D("exiting on abort");
909     return NULL;
910 }
911 
912 /*
913  * In case of pcap file as input, the program acts in 2 different
914  * phases. It first fill the queue and then starts the cons()
915  */
916 static void *
917 nmreplay_main(void *_a)
918 {
919     struct pipe_args *a = _a;
920     struct _qs *q = &a->q;
921     const char *cap_fname = q->prod_ifname;
922 
923     setaffinity(a->cons_core);
924     set_tns_now(&q->t0, 0); /* starting reference */
925     if (cap_fname == NULL) {
926 	goto fail;
927     }
928     q->pcap = readpcap(cap_fname);
929     if (q->pcap == NULL) {
930 	EEE("unable to read file %s", cap_fname);
931 	goto fail;
932     }
933     pcap_prod((void*)a);
934     destroy_pcap(q->pcap);
935     q->pcap = NULL;
936     a->pb = nmport_open(q->cons_ifname);
937     if (a->pb == NULL) {
938 	EEE("cannot open netmap on %s", q->cons_ifname);
939 	do_abort = 1; // XXX any better way ?
940 	return NULL;
941     }
942     /* continue as cons() */
943     WWW("prepare to send packets");
944     usleep(1000);
945     cons((void*)a);
946     EEE("exiting on abort");
947 fail:
948     if (q->pcap != NULL) {
949 	destroy_pcap(q->pcap);
950     }
951     do_abort = 1;
952     return NULL;
953 }
954 
955 
956 static void
957 sigint_h(int sig)
958 {
959 	(void)sig;	/* UNUSED */
960 	do_abort = 1;
961 	signal(SIGINT, SIG_DFL);
962 }
963 
964 
965 
/*
 * Print the command line synopsis on stderr and terminate with
 * exit status 1. The text lists every option accepted by the getopt
 * string in main(), including -C (cpu placement) and -w (wait link).
 */
static void
usage(void)
{
	fprintf(stderr,
	    "usage: nmreplay [-v] [-D delay] [-B {[constant,]bps|ether,bps|real,speedup}] [-L loss]\n"
	    "\t[-b burst] [-C cores] [-w wait-link] -f pcap-file -i <netmap:ifname|valeSSS:PPP>\n");
	exit(1);
}
974 
975 
976 /*---- configuration handling ---- */
/*
 * support routine: split src at separators (blank, tab, CR, LF, comma)
 * and return the fields as a vector; *_ac receives the field count.
 *
 * The vector and a private copy of the string share one allocation,
 * so the caller releases everything with a single free() of the
 * returned pointer.
 * av contains two extra entries: av[ac] is NULL and av[ac+1] points to
 * the start of the private copy (in which every field has been
 * NUL-terminated).
 *
 * Returns NULL, with *_ac set to 0, when src is NULL or when memory
 * cannot be allocated.
 */
static char **
split_arg(const char *src, int *_ac)
{
    char *my = NULL, **av = NULL;
    const char *seps = " \t\r\n,";
    size_t l, i;
    int ac; /* number of entries */
    int k;

    if (_ac)
	*_ac = 0;
    if (!src)
	return NULL;
    l = strlen(src);
    /* in the first pass we count fields, then we allocate the av[]
     * array together with a copy of the string, and in the second
     * pass we fill av[] and terminate each field in the copy.
     */
    for (;;) {
	i = 0;
	ac = 0;
	ND("start pass %d: <%s>", av ? 1 : 0, my);
	while (i < l) {
	    /* trim leading separator */
	    while (i < l && strchr(seps, src[i]))
		i++;
	    if (i >= l)
		break;
	    ND("   pass %d arg %d: <%s>", av ? 1 : 0, ac, src+i);
	    if (av) /* in the second pass, set the result */
		av[ac] = my + i;
	    ac++;
	    /* skip string */
	    while (i < l && !strchr(seps, src[i]))
		i++;
	    if (av)
		my[i] = '\0'; /* write marker */
	}
	if (av) /* end of second pass */
	    break;
	/* end of first pass */
	ND("ac is %d", ac);
	av = calloc(1, (l + 1) + (ac + 2) * sizeof(char *));
	if (av == NULL)
	    return NULL; /* no memory, *_ac already reports 0 fields */
	my = (char *)&(av[ac + 2]);
	strcpy(my, src);
    }
    for (k = 0; k < ac; k++) {
	NED("%d: <%s>", k, av[k]);
    }
    av[ac] = NULL;
    av[ac + 1] = my;
    if (_ac)
	*_ac = ac;
    return av;
}
1031 
1032 
1033 /*
1034  * apply a command against a set of functions,
1035  * install a handler in *dst
1036  */
1037 static int
1038 cmd_apply(const struct _cfg *a, const char *arg, struct _qs *q, struct _cfg *dst)
1039 {
1040 	int ac = 0;
1041 	char **av;
1042 	int i;
1043 
1044 	if (arg == NULL || *arg == '\0')
1045 		return 1; /* no argument may be ok */
1046 	if (a == NULL || dst == NULL) {
1047 		ED("program error - invalid arguments");
1048 		exit(1);
1049 	}
1050 	av = split_arg(arg, &ac);
1051 	if (av == NULL)
1052 		return 1; /* error */
1053 	for (i = 0; a[i].parse; i++) {
1054 		struct _cfg x = a[i];
1055 		const char *errmsg = x.optarg;
1056 		int ret;
1057 
1058 		x.arg = NULL;
1059 		x.arg_len = 0;
1060 		bzero(&x.d, sizeof(x.d));
1061 		ND("apply %s to %s", av[0], errmsg);
1062 		ret = x.parse(q, &x, ac, av);
1063 		if (ret == 2) /* not recognised */
1064 			continue;
1065 		if (ret == 1) {
1066 			ED("invalid arguments: need '%s' have '%s'",
1067 				errmsg, arg);
1068 			break;
1069 		}
1070 		x.optarg = arg;
1071 		*dst = x;
1072 		return 0;
1073 	}
1074 	ED("arguments %s not recognised", arg);
1075 	free(av);
1076 	return 1;
1077 }
1078 
1079 static struct _cfg delay_cfg[];
1080 static struct _cfg bw_cfg[];
1081 static struct _cfg loss_cfg[];
1082 
1083 static uint64_t parse_bw(const char *arg);
1084 
1085 /*
1086  * prodcons [options]
1087  * accept separate sets of arguments for the two directions
1088  *
1089  */
1090 
/*
 * Store arg in the first free (NULL) slot of the l-entry array v.
 * When every slot is already taken, report msg followed by arg and
 * exit with status 1.
 */
static void
add_to(const char ** v, int l, const char *arg, const char *msg)
{
	int slot = 0;

	/* look for the first unused entry */
	while (slot < l && v[slot] != NULL)
		slot++;
	if (slot == l) {
		ED("%s %s", msg, arg);
		exit(1);
	}
	v[slot] = arg;
}
1101 
1102 int
1103 main(int argc, char **argv)
1104 {
1105 	int ch, i, err=0;
1106 
1107 #define	N_OPTS	1
1108 	struct pipe_args bp[N_OPTS];
1109 	const char *d[N_OPTS], *b[N_OPTS], *l[N_OPTS], *q[N_OPTS], *ifname[N_OPTS], *m[N_OPTS];
1110 	const char *pcap_file[N_OPTS];
1111 	int cores[4] = { 2, 8, 4, 10 }; /* default values */
1112 
1113 	bzero(&bp, sizeof(bp));	/* all data initially go here */
1114 	bzero(d, sizeof(d));
1115 	bzero(b, sizeof(b));
1116 	bzero(l, sizeof(l));
1117 	bzero(q, sizeof(q));
1118 	bzero(m, sizeof(m));
1119 	bzero(ifname, sizeof(ifname));
1120 	bzero(pcap_file, sizeof(pcap_file));
1121 
1122 
1123 	/* set default values */
1124 	for (i = 0; i < N_OPTS; i++) {
1125 	    struct _qs *qs = &bp[i].q;
1126 
1127 	    qs->burst = 128;
1128 	    qs->c_delay.optarg = "0";
1129 	    qs->c_delay.run = null_run_fn;
1130 	    qs->c_loss.optarg = "0";
1131 	    qs->c_loss.run = null_run_fn;
1132 	    qs->c_bw.optarg = "0";
1133 	    qs->c_bw.run = null_run_fn;
1134 	}
1135 
1136 	// Options:
1137 	// B	bandwidth in bps
1138 	// D	delay in seconds
1139 	// L	loss probability
1140 	// f	pcap file
1141 	// i	interface name
1142 	// w	wait link
1143 	// b	batch size
1144 	// v	verbose
1145 	// C	cpu placement
1146 
1147 	while ( (ch = getopt(argc, argv, "B:C:D:L:b:f:i:vw:")) != -1) {
1148 		switch (ch) {
1149 		default:
1150 			D("bad option %c %s", ch, optarg);
1151 			usage();
1152 			break;
1153 
1154 		case 'C': /* CPU placement, up to 4 arguments */
1155 			{
1156 				int ac = 0;
1157 				char **av = split_arg(optarg, &ac);
1158 				if (ac == 1) { /* sequential after the first */
1159 					cores[0] = atoi(av[0]);
1160 					cores[1] = cores[0] + 1;
1161 					cores[2] = cores[1] + 1;
1162 					cores[3] = cores[2] + 1;
1163 				} else if (ac == 2) { /* two sequential pairs */
1164 					cores[0] = atoi(av[0]);
1165 					cores[1] = cores[0] + 1;
1166 					cores[2] = atoi(av[1]);
1167 					cores[3] = cores[2] + 1;
1168 				} else if (ac == 4) { /* four values */
1169 					cores[0] = atoi(av[0]);
1170 					cores[1] = atoi(av[1]);
1171 					cores[2] = atoi(av[2]);
1172 					cores[3] = atoi(av[3]);
1173 				} else {
1174 					ED(" -C accepts 1, 2 or 4 comma separated arguments");
1175 					usage();
1176 				}
1177 				if (av)
1178 					free(av);
1179 			}
1180 			break;
1181 
1182 		case 'B': /* bandwidth in bps */
1183 			add_to(b, N_OPTS, optarg, "-B too many times");
1184 			break;
1185 
1186 		case 'D': /* delay in seconds (float) */
1187 			add_to(d, N_OPTS, optarg, "-D too many times");
1188 			break;
1189 
1190 		case 'L': /* loss probability */
1191 			add_to(l, N_OPTS, optarg, "-L too many times");
1192 			break;
1193 
1194 		case 'b':	/* burst */
1195 			bp[0].q.burst = atoi(optarg);
1196 			break;
1197 
1198 		case 'f':	/* pcap_file */
1199 			add_to(pcap_file, N_OPTS, optarg, "-f too many times");
1200 			break;
1201 		case 'i':	/* interface */
1202 			add_to(ifname, N_OPTS, optarg, "-i too many times");
1203 			break;
1204 		case 'v':
1205 			verbose++;
1206 			break;
1207 		case 'w':
1208 			bp[0].wait_link = atoi(optarg);
1209 			break;
1210 		}
1211 
1212 	}
1213 
1214 	argc -= optind;
1215 	argv += optind;
1216 
1217 	/*
1218 	 * consistency checks for common arguments
1219 	 * if pcap file has been provided we need just one interface, two otherwise
1220 	 */
1221 	if (!pcap_file[0]) {
1222 		ED("missing pcap file");
1223 		usage();
1224 	}
1225 	if (!ifname[0]) {
1226 		ED("missing interface");
1227 		usage();
1228 	}
1229 	if (bp[0].q.burst < 1 || bp[0].q.burst > 8192) {
1230 		WWW("invalid burst %d, set to 1024", bp[0].q.burst);
1231 		bp[0].q.burst = 1024; // XXX 128 is probably better
1232 	}
1233 	if (bp[0].wait_link > 100) {
1234 		ED("invalid wait_link %d, set to 4", bp[0].wait_link);
1235 		bp[0].wait_link = 4;
1236 	}
1237 
1238 	bp[0].q.prod_ifname = pcap_file[0];
1239 	bp[0].q.cons_ifname = ifname[0];
1240 
1241 	/* assign cores. prod and cons work better if on the same HT */
1242 	bp[0].cons_core = cores[0];
1243 	bp[0].prod_core = cores[1];
1244 	ED("running on cores %d %d %d %d", cores[0], cores[1], cores[2], cores[3]);
1245 
1246 	/* apply commands */
1247 	for (i = 0; i < N_OPTS; i++) { /* once per queue */
1248 		struct _qs *qs = &bp[i].q;
1249 		err += cmd_apply(delay_cfg, d[i], qs, &qs->c_delay);
1250 		err += cmd_apply(bw_cfg, b[i], qs, &qs->c_bw);
1251 		err += cmd_apply(loss_cfg, l[i], qs, &qs->c_loss);
1252 		if (err != 0)
1253 			exit(1);
1254 	}
1255 
1256 	pthread_create(&bp[0].cons_tid, NULL, nmreplay_main, (void*)&bp[0]);
1257 	signal(SIGINT, sigint_h);
1258 	sleep(1);
1259 	while (!do_abort) {
1260 	    struct _qs olda = bp[0].q;
1261 	    struct _qs *q0 = &bp[0].q;
1262 
1263 	    sleep(1);
1264 	    ED("%lld -> %lld maxq %d round %lld",
1265 		(long long)(q0->rx - olda.rx), (long long)(q0->tx - olda.tx),
1266 		q0->rx_qmax, (long long)q0->prod_max_gap
1267 		);
1268 	    ED("plr nominal %le actual %le",
1269 		(double)(q0->c_loss.d[0])/(1<<24),
1270 		q0->c_loss.d[1] == 0 ? 0 :
1271 		(double)(q0->c_loss.d[2])/q0->c_loss.d[1]);
1272 	    bp[0].q.rx_qmax = (bp[0].q.rx_qmax * 7)/8; // ewma
1273 	    bp[0].q.prod_max_gap = (bp[0].q.prod_max_gap * 7)/8; // ewma
1274 	}
1275 	D("exiting on abort");
1276 	sleep(1);
1277 
1278 	return (0);
1279 }
1280 
/*
 * Conversion factor for numbers, used by parse_gen().
 * Each entry has a set of suffix characters and the multiplier that
 * applies when the character following the number is one of them.
 * The first entry should have an empty string and the default factor
 * (it is the one selected when nothing follows the number),
 * the final entry has s = NULL.
 */
struct _sm {	/* string and multiplier */
	const char *s;	/* accepted suffix characters, NULL ends the table */
	double m;	/* multiplier applied to the parsed number */
};
1290 
1291 /*
1292  * parse a generic value
1293  */
1294 static double
1295 parse_gen(const char *arg, const struct _sm *conv, int *err)
1296 {
1297 	double d;
1298 	char *ep;
1299 	int dummy;
1300 
1301 	if (err == NULL)
1302 		err = &dummy;
1303 	*err = 0;
1304 	if (arg == NULL)
1305 		goto error;
1306 	d = strtod(arg, &ep);
1307 	if (ep == arg) { /* no value */
1308 		ED("bad argument %s", arg);
1309 		goto error;
1310 	}
1311 	/* special case, no conversion */
1312 	if (conv == NULL && *ep == '\0')
1313 		goto done;
1314 	ND("checking %s [%s]", arg, ep);
1315 	for (;conv->s; conv++) {
1316 		if (strchr(conv->s, *ep))
1317 			goto done;
1318 	}
1319 error:
1320 	*err = 1;	/* unrecognised */
1321 	return 0;
1322 
1323 done:
1324 	if (conv) {
1325 		ND("scale is %s %lf", conv->s, conv->m);
1326 		d *= conv->m; /* apply default conversion */
1327 	}
1328 	ND("returning %lf", d);
1329 	return d;
1330 }
1331 
1332 #define U_PARSE_ERR ~(0ULL)
1333 
1334 /* returns a value in nanoseconds */
1335 static uint64_t
1336 parse_time(const char *arg)
1337 {
1338     struct _sm a[] = {
1339 	{"", 1000000000 /* seconds */},
1340 	{"n", 1 /* nanoseconds */}, {"u", 1000 /* microseconds */},
1341 	{"m", 1000000 /* milliseconds */}, {"s", 1000000000 /* seconds */},
1342 	{NULL, 0 /* seconds */}
1343     };
1344     int err;
1345     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1346     return err ? U_PARSE_ERR : ret;
1347 }
1348 
1349 
1350 /*
1351  * parse a bandwidth, returns value in bps or U_PARSE_ERR if error.
1352  */
1353 static uint64_t
1354 parse_bw(const char *arg)
1355 {
1356     struct _sm a[] = {
1357 	{"", 1}, {"kK", 1000}, {"mM", 1000000}, {"gG", 1000000000}, {NULL, 0}
1358     };
1359     int err;
1360     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1361     return err ? U_PARSE_ERR : ret;
1362 }
1363 
1364 
/*
 * Source of random bits for the emulation functions.
 * Wraps random() and keeps only the low 24 bits of its result, so
 * the returned value is in the range 0 .. 2^24 - 1.
 */

static inline uint64_t
my_random24(void)	/* 24 useful bits */
{
	const long low24 = (1L << 24) - 1;

	return random() & low24;
}
1376 
1377 
1378 /*-------------- user-configuration -----------------*/
1379 
1380 #if 0 /* start of comment block */
1381 
1382 Here we place the functions to implement the various features
1383 of the system. For each feature one should define a struct _cfg
1384 (see at the beginning for definition) that refers a *_parse() function
1385 to extract values from the command line, and a *_run() function
1386 that is invoked on each packet to implement the desired function.
1387 
1388 Examples of the two functions are below. In general:
1389 
1390 - the *_parse() function takes argc/argv[], matches the function
1391   name in argv[0], extracts the operating parameters, allocates memory
1392   if needed, and stores them in the struct _cfg.
  Return value is 2 if argv[0] is not recognised, 1 if there is an
1394   error in the arguments, 0 if all ok.
1395 
1396   On the command line, argv[] is a single, comma separated argument
1397   that follow the specific option eg -D constant,20ms
1398 
1399   struct _cfg has some preallocated space (e.g an array of uint64_t) so simple
1400   function can use that without having to allocate memory.
1401 
1402 - the *_run() function takes struct _q *q and struct _cfg *cfg as arguments.
  *q contains all the information that may be possibly needed, including
1404   those on the packet currently under processing.
1405   The basic values are the following:
1406 
1407 	char *	 cur_pkt 	points to the current packet (linear buffer)
1408 	uint32_t cur_len;	length of the current packet
1409 		the functions are not supposed to modify these values
1410 
1411 	int	 cur_drop;	true if current packet must be dropped.
1412 		Must be set to non-zero by the loss emulation function
1413 
1414 	uint64_t cur_delay;	delay in nanoseconds for the current packet
1415 		Must be set by the delay emulation function
1416 
1417    More sophisticated functions may need to access other fields in *q,
1418    see the structure description for that.
1419 
1420 When implementing a new function for a feature (e.g. for delay,
1421 bandwidth, loss...) the struct _cfg should be added to the array
1422 that contains all possible options.
1423 
1424 		--- Specific notes ---
1425 
1426 DELAY emulation		-D option_arguments
1427 
1428     If the option is not supplied, the system applies 0 extra delay
1429 
1430     The resolution for times is 1ns, the precision is load dependent and
1431     generally in the order of 20-50us.
    Times are in seconds by default, and can be followed by a character
    specifying a different unit e.g.
1434 
1435 	n	nanoseconds
1436 	u	microseconds
1437 	m	milliseconds
1438 	s	seconds
1439 
1440     Currently implemented options:
1441 
1442     constant,t		constant delay equal to t
1443 
1444     uniform,tmin,tmax	uniform delay between tmin and tmax
1445 
1446     exp,tavg,tmin	exponential distribution with average tavg
1447 			and minimum tmin (corresponds to an exponential
1448 			distribution with argument 1/(tavg-tmin) )
1449 
1450 
1451 LOSS emulation		-L option_arguments
1452 
1453     Loss is expressed as packet or bit error rate, which is an absolute
1454     number between 0 and 1 (typically small).
1455 
1456     Currently implemented options
1457 
1458     plr,p		uniform packet loss rate p, independent
1459 			of packet size
1460 
    burst,p,lmin,lmax 	burst loss with burst probability p and
			burst length uniformly distributed between
			lmin and lmax
			(NOTE: loss_cfg[] in this file has no handler
			for this option, only plr and ber are accepted)
1464 
1465     ber,p		uniformly distributed bit error rate p,
1466 			so actual loss prob. depends on size.
1467 
1468 BANDWIDTH emulation	-B option_arguments
1469 
1470     Bandwidths are expressed in bits per second, can be followed by a
1471     character specifying a different unit e.g.
1472 
1473 	b/B	bits per second
1474 	k/K	kbits/s (10^3 bits/s)
1475 	m/M	mbits/s (10^6 bits/s)
1476 	g/G	gbits/s (10^9 bits/s)
1477 
1478     Currently implemented options
1479 
1480     const,b		constant bw, excluding mac framing
1481     ether,b		constant bw, including ethernet framing
1482 			(20 bytes framing + 4 bytes crc)
1483     real,[scale]	use real time, optionally with a scaling factor
1484 
1485 #endif /* end of comment block */
1486 
1487 /*
1488  * Configuration options for delay
1489  */
1490 
1491 /* constant delay, also accepts just a number */
1492 static int
1493 const_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1494 {
1495 	uint64_t delay;
1496 
1497 	(void)q;
1498 	if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1499 		return 2; /* unrecognised */
1500 	if (ac > 2)
1501 		return 1; /* error */
1502 	delay = parse_time(av[ac - 1]);
1503 	if (delay == U_PARSE_ERR)
1504 		return 1; /* error */
1505 	dst->d[0] = delay;
1506 	return 0;	/* success */
1507 }
1508 
1509 /* runtime function, store the delay into q->cur_delay */
1510 static int
1511 const_delay_run(struct _qs *q, struct _cfg *arg)
1512 {
1513 	q->cur_delay = arg->d[0]; /* the delay */
1514 	return 0;
1515 }
1516 
1517 static int
1518 uniform_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1519 {
1520 	uint64_t dmin, dmax;
1521 
1522 	(void)q;
1523 	if (strcmp(av[0], "uniform") != 0)
1524 		return 2; /* not recognised */
1525 	if (ac != 3)
1526 		return 1; /* error */
1527 	dmin = parse_time(av[1]);
1528 	dmax = parse_time(av[2]);
1529 	if (dmin == U_PARSE_ERR || dmax == U_PARSE_ERR || dmin > dmax)
1530 		return 1;
1531 	D("dmin %lld dmax %lld", (long long)dmin, (long long)dmax);
1532 	dst->d[0] = dmin;
1533 	dst->d[1] = dmax;
1534 	dst->d[2] = dmax - dmin;
1535 	return 0;
1536 }
1537 
1538 static int
1539 uniform_delay_run(struct _qs *q, struct _cfg *arg)
1540 {
1541 	uint64_t x = my_random24();
1542 	q->cur_delay = arg->d[0] + ((arg->d[2] * x) >> 24);
1543 #if 0 /* COMPUTE_STATS */
1544 #endif /* COMPUTE_STATS */
1545 	return 0;
1546 }
1547 
1548 /*
1549  * exponential delay: Prob(delay = x) = exp(-x/d_av)
1550  * gives a delay between 0 and infinity with average d_av
1551  * The cumulative function is 1 - d_av exp(-x/d_av)
1552  *
1553  * The inverse function generates a uniform random number p in 0..1
1554  * and generates delay = (d_av-d_min) * -ln(1-p) + d_min
1555  *
1556  * To speed up behaviour at runtime we tabulate the values
1557  */
1558 
1559 static int
1560 exp_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1561 {
1562 #define	PTS_D_EXP	512
1563 	uint64_t i, d_av, d_min, *t; /*table of values */
1564 
1565         (void)q;
1566         if (strcmp(av[0], "exp") != 0)
1567 		return 2; /* not recognised */
1568         if (ac != 3)
1569                 return 1; /* error */
1570         d_av = parse_time(av[1]);
1571         d_min = parse_time(av[2]);
1572         if (d_av == U_PARSE_ERR || d_min == U_PARSE_ERR || d_av < d_min)
1573                 return 1; /* error */
1574 	d_av -= d_min;
1575 	dst->arg_len = PTS_D_EXP * sizeof(uint64_t);
1576 	dst->arg = calloc(1, dst->arg_len);
1577 	if (dst->arg == NULL)
1578 		return 1; /* no memory */
1579 	t = (uint64_t *)dst->arg;
1580 	/* tabulate -ln(1-n)*delay  for n in 0..1 */
1581 	for (i = 0; i < PTS_D_EXP; i++) {
1582 		double d = -log2 ((double)(PTS_D_EXP - i) / PTS_D_EXP) * d_av + d_min;
1583 		t[i] = (uint64_t)d;
1584 		ND(5, "%ld: %le", i, d);
1585 	}
1586         return 0;
1587 }
1588 
1589 static int
1590 exp_delay_run(struct _qs *q, struct _cfg *arg)
1591 {
1592 	uint64_t *t = (uint64_t *)arg->arg;
1593         q->cur_delay = t[my_random24() & (PTS_D_EXP - 1)];
1594 	RD(5, "delay %llu", (unsigned long long)q->cur_delay);
1595         return 0;
1596 }
1597 
1598 
1599 /* unused arguments in configuration */
1600 #define TLEM_CFG_END	NULL, 0, {0}, {0}
1601 
1602 static struct _cfg delay_cfg[] = {
1603 	{ const_delay_parse, const_delay_run,
1604 		"constant,delay", TLEM_CFG_END },
1605 	{ uniform_delay_parse, uniform_delay_run,
1606 		"uniform,dmin,dmax # dmin <= dmax", TLEM_CFG_END },
1607 	{ exp_delay_parse, exp_delay_run,
1608 		"exp,dmin,davg # dmin <= davg", TLEM_CFG_END },
1609 	{ NULL, NULL, NULL, TLEM_CFG_END }
1610 };
1611 
1612 /* standard bandwidth, also accepts just a number */
1613 static int
1614 const_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1615 {
1616 	uint64_t bw;
1617 
1618 	(void)q;
1619 	if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1620 		return 2; /* unrecognised */
1621 	if (ac > 2)
1622 		return 1; /* error */
1623 	bw = parse_bw(av[ac - 1]);
1624 	if (bw == U_PARSE_ERR) {
1625 		return (ac == 2) ? 1 /* error */ : 2 /* unrecognised */;
1626 	}
1627 	dst->d[0] = bw;
1628 	return 0;	/* success */
1629 }
1630 
1631 
1632 /* runtime function, store the delay into q->cur_delay */
1633 static int
1634 const_bw_run(struct _qs *q, struct _cfg *arg)
1635 {
1636 	uint64_t bps = arg->d[0];
1637 	q->cur_tt = bps ? 8ULL* TIME_UNITS * q->cur_len / bps : 0 ;
1638 	return 0;
1639 }
1640 
1641 /* ethernet bandwidth, add 672 bits per packet */
1642 static int
1643 ether_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1644 {
1645 	uint64_t bw;
1646 
1647 	(void)q;
1648 	if (strcmp(av[0], "ether") != 0)
1649 		return 2; /* unrecognised */
1650 	if (ac != 2)
1651 		return 1; /* error */
1652 	bw = parse_bw(av[ac - 1]);
1653 	if (bw == U_PARSE_ERR)
1654 		return 1; /* error */
1655 	dst->d[0] = bw;
1656 	return 0;	/* success */
1657 }
1658 
1659 
1660 /* runtime function, add 20 bytes (framing) + 4 bytes (crc) */
1661 static int
1662 ether_bw_run(struct _qs *q, struct _cfg *arg)
1663 {
1664 	uint64_t bps = arg->d[0];
1665 	q->cur_tt = bps ? 8ULL * TIME_UNITS * (q->cur_len + 24) / bps : 0 ;
1666 	return 0;
1667 }
1668 
1669 /* real bandwidth, plus scaling factor */
1670 static int
1671 real_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1672 {
1673 	double scale;
1674 
1675 	(void)q;
1676 	if (strcmp(av[0], "real") != 0)
1677 		return 2; /* unrecognised */
1678 	if (ac > 2) { /* second argument is optional */
1679 		return 1; /* error */
1680 	} else if (ac == 1) {
1681 		scale = 1;
1682 	} else {
1683 		int err = 0;
1684 		scale = parse_gen(av[ac-1], NULL, &err);
1685 		if (err || scale <= 0 || scale > 1000)
1686 			return 1;
1687 	}
1688 	ED("real -> scale is %.6f", scale);
1689 	dst->f[0] = scale;
1690 	return 0;	/* success */
1691 }
1692 
1693 static int
1694 real_bw_run(struct _qs *q, struct _cfg *arg)
1695 {
1696 	q->cur_tt /= arg->f[0];
1697 	return 0;
1698 }
1699 
/*
 * Bandwidth handlers, passed to cmd_apply() for the -B option and
 * tried in the order listed. The string in each entry is the syntax
 * reminder reported when the arguments are invalid; the table ends at
 * the entry whose parse function is NULL.
 */
static struct _cfg bw_cfg[] = {
	{ const_bw_parse, const_bw_run,
		"constant,bps", TLEM_CFG_END },
	{ ether_bw_parse, ether_bw_run,
		"ether,bps", TLEM_CFG_END },
	{ real_bw_parse, real_bw_run,
		"real,scale", TLEM_CFG_END },
	{ NULL, NULL, NULL, TLEM_CFG_END }
};
1709 
1710 /*
1711  * loss patterns
1712  */
1713 static int
1714 const_plr_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1715 {
1716 	double plr;
1717 	int err;
1718 
1719 	(void)q;
1720 	if (strcmp(av[0], "plr") != 0 && ac > 1)
1721 		return 2; /* unrecognised */
1722 	if (ac > 2)
1723 		return 1; /* error */
1724 	// XXX to be completed
1725 	plr = parse_gen(av[ac-1], NULL, &err);
1726 	if (err || plr < 0 || plr > 1)
1727 		return 1;
1728 	dst->d[0] = plr * (1<<24); /* scale is 16m */
1729 	if (plr != 0 && dst->d[0] == 0)
1730 		ED("WWW warning,  rounding %le down to 0", plr);
1731 	return 0;	/* success */
1732 }
1733 
1734 static int
1735 const_plr_run(struct _qs *q, struct _cfg *arg)
1736 {
1737 	(void)arg;
1738 	uint64_t r = my_random24();
1739 	q->cur_drop = r < arg->d[0];
1740 #if 1	/* keep stats */
1741 	arg->d[1]++;
1742 	arg->d[2] += q->cur_drop;
1743 #endif
1744 	return 0;
1745 }
1746 
1747 
1748 /*
1749  * For BER the loss is 1- (1-ber)**bit_len
1750  * The linear approximation is only good for small values, so we
1751  * tabulate (1-ber)**len for various sizes in bytes
1752  */
1753 static int
1754 const_ber_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1755 {
1756 	double ber, ber8, cur;
1757 	int i, err;
1758 	uint32_t *plr;
1759 	const uint32_t mask = (1<<24) - 1;
1760 
1761 	(void)q;
1762 	if (strcmp(av[0], "ber") != 0)
1763 		return 2; /* unrecognised */
1764 	if (ac != 2)
1765 		return 1; /* error */
1766 	ber = parse_gen(av[ac-1], NULL, &err);
1767 	if (err || ber < 0 || ber > 1)
1768 		return 1;
1769 	dst->arg_len = MAX_PKT * sizeof(uint32_t);
1770 	plr = calloc(1, dst->arg_len);
1771 	if (plr == NULL)
1772 		return 1; /* no memory */
1773 	dst->arg = plr;
1774 	ber8 = 1 - ber;
1775 	ber8 *= ber8; /* **2 */
1776 	ber8 *= ber8; /* **4 */
1777 	ber8 *= ber8; /* **8 */
1778 	cur = 1;
1779 	for (i=0; i < MAX_PKT; i++, cur *= ber8) {
1780 		plr[i] = (mask + 1)*(1 - cur);
1781 		if (plr[i] > mask)
1782 			plr[i] = mask;
1783 #if 0
1784 		if (i>= 60) //  && plr[i] < mask/2)
1785 			RD(50,"%4d: %le %ld", i, 1.0 - cur, (_P64)plr[i]);
1786 #endif
1787 	}
1788 	dst->d[0] = ber * (mask + 1);
1789 	return 0;	/* success */
1790 }
1791 
1792 static int
1793 const_ber_run(struct _qs *q, struct _cfg *arg)
1794 {
1795 	int l = q->cur_len;
1796 	uint64_t r = my_random24();
1797 	uint32_t *plr = arg->arg;
1798 
1799 	if (l >= MAX_PKT) {
1800 		RD(5, "pkt len %d too large, trim to %d", l, MAX_PKT-1);
1801 		l = MAX_PKT-1;
1802 	}
1803 	q->cur_drop = r < plr[l];
1804 #if 1	/* keep stats */
1805 	arg->d[1] += l * 8;
1806 	arg->d[2] += q->cur_drop;
1807 #endif
1808 	return 0;
1809 }
1810 
/*
 * Loss handlers, passed to cmd_apply() for the -L option and tried in
 * the order listed. The string in each entry is the syntax reminder
 * reported when the arguments are invalid; the table ends at the
 * entry whose parse function is NULL.
 */
static struct _cfg loss_cfg[] = {
	{ const_plr_parse, const_plr_run,
		"plr,prob # 0 <= prob <= 1", TLEM_CFG_END },
	{ const_ber_parse, const_ber_run,
		"ber,prob # 0 <= prob <= 1", TLEM_CFG_END },
	{ NULL, NULL, NULL, TLEM_CFG_END }
};
1818