1 /*
2  * reporting.c -- performance metrics reporting as in Internet Draft
3  *                draft-ietf-ippm-reporting.
4  *
5  * Written by Stanislav Shalunov, http://www.internet2.edu/~shalunov/
6  *            Bernhard Lutzmann, belu@users.sf.net
7  *            Federico Montesino Pouzols, fedemp@altern.org
8  *
9  * @(#) $Id: reporting.c,v 1.1.2.12 2006/08/20 18:06:19 fedemp Exp $
10  *
11  * Copyright 2003, 2006, Internet2.
12  * Legal conditions are in file LICENSE
13  * (MD5 = ecfa50d1b0bfbb81b658c810d0476a52).
14  */
15 
16 /**
17  * @file reporting.c
18  *
19  * @short metrics computation and reporting.
20  **/
21 
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 
26 #include <stdlib.h>
27 #ifdef HAVE_STDINT_H
28 #include <stdint.h>
29 #endif
30 #include <float.h>
31 #include <math.h>
32 #include <string.h>
33 #include "reporting.h"
34 #ifndef THRULAY_REPORTING_SAMPLE_LOOP
35 #include "assertd.h"
36 #else
37 #include <assert.h>
38 #endif
39 #include "rcs.h"
40 
41 RCS_ID("@(#) $Id: reporting.c,v 1.1.2.12 2006/08/20 18:06:19 fedemp Exp $")
42 
43 #define min(a, b)	((a) < (b) ? (a) : (b))
44 #define max(a, b)	((a) > (b) ? (a) : (b))
45 
46 /*
47  * Reordering.
48  */
49 #define loop(x)		((x) >= 0 ? (x) : (x) + (int)reordering_max)
50 
51 /*
52  * Duplication.
53  */
54 static uint64_t *bitfield = NULL;	/* Bit field used to check for
55 					   duplicated packets. */
56 
57 /*
58  * Reordering.
59  */
60 static uint64_t reordering_max; 	/* We have m[j-1] == number of */
61 static uint64_t *reordering_m;		/* We have m[j-1] == number of
62 					   j-reordered packets. */
63 static uint64_t *reordering_ring;	/* Last sequence numbers seen */
64 static int r = 0;			/* Ring pointer for next write. */
65 static int l = 0;			/* Counter of sequence numbers. */
66 
67 /*
68  * Quantiles.
69  */
70 static uint16_t quantile_max_seq;       /* Maximum number of sequences */
71 static int *quantile_k;		/* number of elements in buffer */
72 
73 static double **quantile_input;	/* This is the buffer where the
74 				   sequence of incoming packets is saved.
75 				   If we received enough packets, we will
76 				   write this buffer to a quantile buffer. */
77 static int *quantile_input_cnt;   /* number of elements in input buffer */
78 static int *quantile_empty_buffers;	/* number of empty buffers */
79 
80 static int *quantile_b;		/* number of buffers */
81 
82 static double **quantile_buf;
83 
84 static int *quantile_alternate;     /* this is used to determine the offset in
85 				   COLLAPSE (if weight is even) */
86 
87 static uint64_t *quantile_inf_cnt;	/* this counter is for the additional
88 				   -inf, +inf elements we added to NEW buffer
89 				   to fill it up. */
90 
91 typedef struct quantile {
92 	struct quantile *next;	/* pointer to next quantile buffer */
93 	int weight;		/* 0 if buffer is empty, > 0 if buffer is
94 				 * full */
95 	int level;
96 	double *buffer;
97 	int pos;		/* current position in buffer; used in
98 				   quantile_collapse() */
99 } quantile_t;
100 
101 static quantile_t **quantile_buffer_head;
102 
103 int
reordering_init(uint64_t max)104 reordering_init(uint64_t max)
105 {
106 	reordering_max = max;
107 	reordering_m = calloc(reordering_max, sizeof(uint64_t));
108 	reordering_ring = calloc(reordering_max, sizeof(uint64_t));
109 	if (reordering_m == NULL) {
110 		return -1;
111 	} else {
112 		return 0;
113 	}
114 }
115 
116 int
reordering_checkin(uint64_t packet_sqn)117 reordering_checkin(uint64_t packet_sqn)
118 {
119 	int j;
120 
121 	for (j = 0; j < min(l, (int)reordering_max) &&
122 
123 		     packet_sqn < reordering_ring[loop(r-j-1)]; j++) {
124 		reordering_m[j]++;
125 	}
126 	reordering_ring[r] = packet_sqn;
127 	l++;
128 	r = (r + 1) % reordering_max;
129 	return 0;
130 }
131 
132 double
reordering_output(uint64_t j)133 reordering_output(uint64_t j)
134 {
135 	if (j >= reordering_max)
136 		return -1;
137 	else
138 		return (double)reordering_m[j] / (l - j - 1);
139 }
140 
141 void
reordering_exit(void)142 reordering_exit(void)
143 {
144 	free(reordering_ring);
145 	free(reordering_m);
146 }
147 
148 int
duplication_init(uint64_t npackets)149 duplication_init(uint64_t npackets)
150 {
151 	uint64_t bitfield_len = 0;	/* Number of sectors in bitfield */
152 
153 	/* Allocate memory for bit field */
154 	bitfield_len = ((npackets % 64) ? (npackets / 64 + 1) : npackets / 64);
155 	bitfield = calloc(bitfield_len, sizeof(uint64_t));
156 	if (bitfield == NULL) {
157 		return -1;
158 	} else {
159 		return 0;
160 	}
161 }
162 
163 int
duplication_check(uint64_t packet_sqn)164 duplication_check(uint64_t packet_sqn)
165 {
166 	uint64_t bitfield_sec;		/* Which sector in bitfield */
167 	uint64_t bitfield_bit;		/* Which bit in sector */
168 
169 	/* Calculate sector */
170 	bitfield_sec = packet_sqn >> 6;
171 
172 	/* Calculate bit in sector */
173 	bitfield_bit = (uint64_t)1 << (packet_sqn & 63);
174 
175 	if (bitfield[bitfield_sec] & bitfield_bit) {
176 		/* Duplicated packet */
177 		return 1;
178 	} else {
179 		/* Unique packet */
180 		bitfield[bitfield_sec] |= bitfield_bit;
181 		return 0;
182 	}
183 }
184 
185 void
duplication_exit(void)186 duplication_exit(void)
187 {
188 	free(bitfield);
189 }
190 
191 /* Calculate binomial coefficient C(n, k). */
192 int64_t
binomial(int n,int k)193 binomial (int n, int k)
194 {
195 	int64_t bc = 0;
196 	int i, m;
197 
198 	/* C(n, k) = C(n, n-k) */
199 	k = min(k, n-k);
200 
201 	if (k >= 0) {
202 		bc = 1;
203 		m = max(k, n-k);
204 
205 		for (i = 1; i <= k; i++) {
206 			bc = (bc * (m + i)) / i;
207 		}
208 	}
209 
210 	return bc;
211 }
212 
213 int
quantile_compare(const void * d1,const void * d2)214 quantile_compare(const void *d1, const void *d2)
215 {
216 	if (*(double *)d1 < *(double *)d2) {
217 		return -1;
218 	} else if (*(double *)d1 == *(double *)d2) {
219 		return 0;
220 	} else {
221 		assert(*(double *)d1 > *(double *)d2);
222 		return 1;
223 	}
224 }
225 
226 void
quantile_sort(double * list,int length)227 quantile_sort (double *list, int length)
228 {
229 	qsort(list, length, sizeof(double), quantile_compare);
230 }
231 
232 /**
233  * Implementation of NEW operation from section 3.1 of paper [1].
234  *
235  * Takes as input an empty buffer. Simply populates the quantile buffer with
236  * the next k elements from the input sequence, labels the buffer as full, and
237  * assigns it a weight of 1.
238  *
239  * If there are not enough elements to fill up the buffer, we alternately add
240  * -inf, +inf elements until buffer is full (-inf == 0, +inf == DBL_MAX).
241  *
242  * NOTE: We sort the elements in the input buffer before we copy them to
243  * the quantile buffer.
244  *
245  * @param seq Sequence index.
246  *
247  * @return
248  * @retval 0 on success.
249  * @retval -2 need an empty buffer.
250  * @retval -3 bad input sequence length.
251  **/
252 int
quantile_new(uint16_t seq,quantile_t * q,double * input,int k,int level)253 quantile_new(uint16_t seq, quantile_t *q, double *input, int k, int level)
254 {
255 	int i;
256 
257 	/* Check that buffer is really empty. */
258 	if (q->weight != 0) {
259 		return -2;
260 	}
261 
262 	/* Sanity check for length of input sequence. */
263 	if (k > quantile_k[seq]) {
264 		return -3;
265 	}
266 
267 	/* If there are not enough elements in the input buffer, fill it up
268 	 * with -inf, +inf elements. */
269 	for (i = k; i < quantile_k[seq]; i++) {
270 		if (i % 2) {
271 			input[i] = DBL_MAX;
272 		} else {
273 			input[i] = 0;
274 		}
275 
276 		/* Increment counter that indicates how many additional
277 		 * elements we added to fill the buffer. */
278 		quantile_inf_cnt[seq]++;
279 	}
280 
281 	quantile_sort(input, quantile_k[seq]);
282 
283 	memcpy(q->buffer, input, sizeof(double) * quantile_k[seq]);
284 
285 	/* Mark buffer as full and set level. */
286 	q->weight = 1;
287 	q->level = level;
288 
289 	/* Update number of empty quantile buffers. */
290 	quantile_empty_buffers[seq]--;
291 
292 	return 0;
293 }
294 
295 
296 /* Implementation of COLLAPSE operation from section 3.2 of paper [1].
297  *
298  * This is called from quantile_algorithm() if there are no empty buffers.
299  * We COLLAPSE all the full buffers, where level has value `level'.
300  * Output is written to the first buffer in linked list with level set to
301  * `level'. The level of the output buffer is increased by 1.
302  * All other buffers we used in the COLLAPSE are marked empty. */
303 int
quantile_collapse(uint16_t seq,int level)304 quantile_collapse(uint16_t seq, int level)
305 {
306 	quantile_t *qp = NULL, *qp_out = NULL;
307 	int num_buffers = 0;	/* number of buffers with level `level' */
308 	int weight = 0;		/* weight of the output buffer */
309 	int offset;
310 	int i, j;
311 	double min_dbl;
312 	long next_pos;
313 	long merge_pos = 0;
314 
315 	/* Check that there are at least two full buffers with given level.
316 	 * Also calculate weight of output buffer. */
317 	for (qp = quantile_buffer_head[seq]; qp != NULL; qp = qp->next) {
318 		if ((qp->weight != 0) && (qp->level == level)) {
319 			num_buffers++;
320 			weight += qp->weight;
321 			qp->pos = 0;
322 		} else {
323 			/* We mark the buffers that are not used in this
324 			 * COLLAPSE. */
325 			qp->pos = -1;
326 		}
327 	}
328 	if (num_buffers < 2) {
329 		return -4;
330 	}
331 
332 	/* NOTE: The elements in full buffers are sorted. So we don't have to
333 	 * do that again.
334 	 */
335 	/* Search for first full buffer with matching level. This is the buffer
336 	 * where we save the output. */
337 	for (qp_out = quantile_buffer_head[seq]; qp_out != NULL;
338 			qp_out = qp_out->next) {
339 		if (qp_out->pos != -1) {
340 			break;
341 		}
342 	}
343 
344 	/* Calculate offset */
345 	if (weight % 2) {
346 		/* odd */
347 		offset = (weight + 1) / 2;
348 	} else {
349 		/* even - we alternate between two choices in each COLLAPSE */
350 		if (quantile_alternate[seq] % 2) {
351 			offset = weight / 2;
352 		} else {
353 			offset = (weight + 2)/ 2;
354 		}
355 		quantile_alternate[seq] = (quantile_alternate[seq] + 1) % 2;
356 	}
357 
358 	/* Initialize next position of element to save. Because first position
359 	 * is at 0, we have to decrement offset by 1. */
360 	next_pos = offset - 1;
361 
362 	for (i = 0; i < quantile_k[seq]; ) {
363 
364 		/* Search for current minimal element in all buffers.
365 		 * Because buffers are all sorted, we just have to check the
366 		 * element at current position. */
367 		min_dbl = DBL_MAX;
368 		for (qp = quantile_buffer_head[seq]; qp != NULL;
369 		     qp = qp->next) {
370 			/* Skip wrong buffers. */
371 			if (qp->pos == -1) {
372 				continue;
373 			}
374 
375 			/* Check that we are not at the end of buffer. */
376 			if (qp->pos >= quantile_k[seq]) {
377 				continue;
378 			}
379 
380 			/* Update minimum element. */
381 			min_dbl = min(min_dbl, qp->buffer[qp->pos]);
382 		}
383 
384 		/* Now process this minimal element in all buffers. */
385 		for (qp = quantile_buffer_head[seq]; qp != NULL;
386 		     qp = qp->next) {
387 			/* Skip wrong buffers. */
388 			if (qp->pos == -1) {
389 				continue;
390 			}
391 
392 			/* Now process minimal element in this buffer. */
393 			for (; (qp->buffer[qp->pos] == min_dbl) &&
394 					(qp->pos < quantile_k[seq]);
395 			     qp->pos++) {
396 
397 				/* We run this loop `qp->weight' times.
398 				 * We check there if we are in a position
399 				 * so we have to save this element in our
400 				 * output buffer. */
401 				for (j = 0; j < qp->weight; j++) {
402 
403 					if (next_pos == merge_pos) {
404 						quantile_buf[seq][i] = min_dbl;
405 						i++;
406 
407 						if (i == quantile_k[seq]) {
408 							/* We have written
409 							 * all elements to
410 							 * output buffer, so
411 							 * exit global loop. */
412 							goto out;
413 						}
414 
415 						/* Update next position. */
416 						next_pos += weight;
417 					}
418 
419 					merge_pos++;
420 				} /* for(j = 0; j < qp->weight; j++) */
421 			} /* for (; (qp->buffer[qp->pos] == min_dbl) &&
422 			     (qp->pos < quantile_k[seq]); qp->pos++) */
423 		} /* for (qp = quantile_buffer_head[seq]; qp!=NULL;
424 		     qp = qp->next) */
425 	} /* for (i = 0; i < quantile_k[seq]; ) */
426 
427 out:
428 	memcpy(qp_out->buffer, quantile_buf[seq],
429 	       sizeof(double) * quantile_k[seq]);
430 
431 	/* Update weight of output buffer. */
432 	qp_out->weight = weight;
433 	qp_out->level = level+1;
434 
435 	/* Update list of empty buffers. */
436 	for (qp = quantile_buffer_head[seq]; qp != NULL; qp = qp->next) {
437 		if ((qp->pos != -1) && (qp != qp_out)) {
438 			qp->weight = 0;
439 			qp->level = 0;
440 		}
441 	}
442 	quantile_empty_buffers[seq] += num_buffers - 1;
443 	return 0;
444 }
445 
446 /**
447  * Implementation of COLLAPSE policies from section 3.4 of paper [1].
448  *
449  * There are three different algorithms noted in the paper. We use the
450  * "New Algorithm".
451  *
452  * @param seq Sequence index.
453  *
454  * @return
455  * @retval 0 on success.
456  * @retval -1 quantiles not initialized.
457  * @retval -2 need an empty buffer for new operation.
458  * @retval -3 bad input sequence length in new operation.
459  * @retval -4 not enough buffers for collapse operation.
460  **/
461 int
quantile_algorithm(uint16_t seq,double * input,int k)462 quantile_algorithm (uint16_t seq, double *input, int k)
463 {
464 	int rc;
465 	quantile_t *qp = NULL, *qp2 = NULL;
466 	int min_level = -1;
467 
468 	/* This should always be true. */
469 	if (quantile_buffer_head[seq] != NULL) {
470 		min_level = quantile_buffer_head[seq]->level;
471 	} else {
472 		return -1;
473 	}
474 
475 	/* Get minimum level of all currently full buffers. */
476 	for (qp = quantile_buffer_head[seq]; qp != NULL; qp = qp->next) {
477 		if (qp->weight != 0) {
478 			/* Full buffer. */
479 			min_level = min(min_level, qp->level);
480 		}
481 	}
482 
483 	if (quantile_empty_buffers[seq] == 0) {
484 		/* There are no empty buffers. Invoke COLLAPSE on the set
485 		 * of buffers with minimum level. */
486 
487 		rc = quantile_collapse(seq, min_level);
488 		if (rc < 0)
489 			return rc;
490 	} else if (quantile_empty_buffers[seq] == 1) {
491 		/* We have exactly one empty buffer. Invoke NEW and assign
492 		 * it level `min_level'. */
493 
494 		/* Search the empty buffer. */
495 		for (qp = quantile_buffer_head[seq]; qp != NULL;
496 		     qp = qp->next) {
497 			if (qp->weight == 0) {
498 				/* Found empty buffer. */
499 				break;
500 			}
501 		}
502 
503 		rc = quantile_new(seq, qp, input, k, min_level);
504 		if (rc < 0)
505 			return rc;
506 	} else {
507 		/* There are at least two empty buffers. Invoke NEW on each
508 		 * and assign level `0' to each. */
509 
510 		/* Search for two empty buffers. */
511 		for (qp = quantile_buffer_head[seq]; qp != NULL;
512 		     qp = qp->next) {
513 			if (qp->weight == 0) {
514 				/* Found first empty buffer. */
515 				break;
516 			}
517 		}
518 		for (qp2 = qp->next; qp2 != NULL; qp2 = qp2->next) {
519 			if (qp2->weight == 0) {
520 				/* Found second empty buffer. */
521 				break;
522 			}
523 		}
524 
525 		if (k <= quantile_k[seq]) {
526 			/* This could happen if we call this after we
527 			 * received all packets but don't have enough to
528 			 * fill up two buffers. */
529 
530 			rc = quantile_new(seq, qp, input, k, 0);
531 			if (rc < 0)
532 				return rc;
533 		} else {
534 			/* We have enough input data for two buffers. */
535 			rc = quantile_new(seq, qp, input, quantile_k[seq], 0);
536 			if (rc < 0)
537 				return rc;
538 			rc = quantile_new(seq, qp2, input + quantile_k[seq],
539 					  k - quantile_k[seq], 0);
540 			if (rc < 0)
541 				return rc;
542 		}
543 	}
544 	return 0;
545 }
546 
547 int
quantile_init_seq(uint16_t seq)548 quantile_init_seq(uint16_t seq)
549 {
550 	quantile_t *qp = NULL;
551 	int i;
552 
553 	if ( seq >= quantile_max_seq)
554 		return -5;
555 
556 	/* Allocate memory for quantile buffers. Buffers are linked lists
557 	 * with a pointer to next buffer.
558 	 * We need `quantile_b' buffers, where each buffer has space for
559 	 * `quantile_k' elements. */
560 	for (i = 0; i < quantile_b[seq]; i++) {
561 		if (i == 0) {
562 			/* Initialize first buffer. */
563 			qp = malloc(sizeof(quantile_t));
564 			if (qp == NULL) {
565 				return -1;
566 			}
567 			quantile_buffer_head[seq] = qp;
568 		} else {
569 			qp->next = malloc(sizeof(quantile_t));
570 			if (qp->next == NULL) {
571 				return -1;
572 			}
573 			qp = qp->next;
574 		}
575 
576 		/* `qp' points to buffer that should be initialized. */
577 		qp->next = NULL;
578 		qp->weight = 0;	/* empty buffers have weight of 0 */
579 		qp->level = 0;
580 		qp->buffer = malloc(sizeof(double) * quantile_k[seq]);
581 		if (qp->buffer == NULL) {
582 			return -1;
583 		}
584 	}
585 	/* Update number of empty quantile buffers. */
586 	quantile_empty_buffers[seq] = quantile_b[seq];
587 
588 	return 0;
589 }
590 
591 int
quantile_init(uint16_t max_seq,double eps,uint64_t N)592 quantile_init (uint16_t max_seq, double eps, uint64_t N)
593 {
594 	int b, b_tmp = 0;
595 	int k, k_tmp = 0;
596 	int h, h_max = 0;
597 	int seq, rc;
598 
599 	quantile_max_seq = max_seq;
600 	/* Allocate array for the requested number of sequences. */
601 	quantile_k = calloc(max_seq, sizeof(int));
602 	quantile_input = calloc(max_seq, sizeof(double*));
603 	quantile_input_cnt = calloc(max_seq, sizeof(int));
604 	quantile_empty_buffers = calloc(max_seq, sizeof(int));
605 	quantile_b = calloc(max_seq, sizeof(int));
606 	quantile_buf = calloc(max_seq, sizeof(double*));
607 	quantile_alternate = calloc(max_seq, sizeof(int));
608 	quantile_inf_cnt = calloc(max_seq, sizeof(uint64_t));
609 	quantile_buffer_head = calloc(max_seq, sizeof(quantile_t*));
610 
611 	/* "In practice, optimal values for b and k can be computed by trying
612 	 * out different values of b in the range 1 and 30." */
613 	for (b = 2; b <= 30; b++) {
614 
615 		/* For each b, compute the largest integral h that satisfies:
616 		 *   (h-2) * C(b+h-2, h-1) - C(b+h-3, h-3) +
617 		 *                                 C(b+h-3, h-2) <= 2 * eps * N
618 		 */
619 		for (h = 0; ; h++) {
620 			if (((h-2) * binomial(b+h-2, h-1) -
621 						binomial(b+h-3, h-3) +
622 						binomial(b+h-3, h-2)) >
623 					(2 * eps * N)) {
624 				/* This h does not satisfy the inequality from
625 				 * above. */
626 				break;
627 			}
628 			h_max = h;
629 		}
630 
631 		/* Now compute the smallest integral k that satisfies:
632 		 *   k * C(b+h-2, h-1) => N. */
633 		k = ceil(N / (double)binomial(b+h_max-2, h_max-1));
634 
635 		/* Identify that b that minimizes b*k. */
636 		if ((b_tmp == 0) && (k_tmp == 0)) {
637 			/* Initialize values */
638 			b_tmp = b;
639 			k_tmp = k;
640 		}
641 
642 		if ((b * k) < (b_tmp * k_tmp)) {
643 			/* Found b and k for which the product is smaller
644 			 * than for the ones before. Because we want to
645 			 * minimize b*k (required memory), we save them. */
646 			b_tmp = b;
647 			k_tmp = k;
648 		}
649 	}
650 
651 	/* Set global quantile values. For now, all sequences share
652 	   the same k and b values.*/
653 	for (seq = 0; seq < max_seq; seq++ ) {
654 		quantile_b[seq] = b_tmp;
655 		quantile_k[seq] = k_tmp;
656 	}
657 
658 	/* Allocate memory for input buffer.
659 	 * We allocate enough space to save up to `2 * quantile_k' elements.
660 	 * This space is needed in the COLLAPSE policy if there are more
661 	 * than two empty buffers. Because then we have to invoke NEW on two
662 	 * buffers and thus need an input buffer with `2 * quantile_k'
663 	 * elements. */
664 	for (seq = 0; seq < quantile_max_seq; seq++) {
665 		quantile_input[seq] = malloc(sizeof(double) * 2 *
666 					     quantile_k[seq]);
667 		if (quantile_input[seq] == NULL) {
668 			return -1;
669 		}
670 		quantile_input_cnt[seq] = 0;
671 	}
672 
673 	/* Allocate memory for output buffer.
674 	 * This buffer is used in COLLAPSE to store temporary output buffer
675 	 * before it gets copied to one of the buffers used in COLLAPSE. */
676 	for (seq = 0; seq < quantile_max_seq; seq++ ) {
677 		quantile_buf[seq] = malloc(sizeof(double) * quantile_k[seq]);
678 		if (quantile_buf[seq] == NULL) {
679 			return -1;
680 		}
681 	}
682 
683 	for (seq = 0; seq < max_seq; seq++) {
684 		rc = quantile_init_seq(seq);
685 		if (rc < 0)
686 			return rc;
687 	}
688 
689 	return 0;
690 }
691 
692 int
quantile_value_checkin(uint16_t seq,double value)693 quantile_value_checkin(uint16_t seq, double value)
694 {
695 	int rc = 0;
696 
697 	if ( seq >= quantile_max_seq)
698 		return -5;
699 
700 	quantile_input[seq][quantile_input_cnt[seq]++] = value;
701 
702 	/* If we have at least two empty buffers,
703 	 * we need input for two buffers, to twice
704 	 * the value of `quantile_k'. */
705 	if (quantile_empty_buffers[seq] >= 2) {
706 		if (quantile_input_cnt[seq] ==
707 		    (2 * quantile_k[seq])) {
708 			rc = quantile_algorithm(seq, quantile_input[seq],
709 						quantile_input_cnt[seq]);
710 			/* Reset counter. */
711 			quantile_input_cnt[seq] = 0;
712 		}
713 	} else {
714 		/* There are 0 or 1 empty buffers */
715 		if (quantile_input_cnt[seq] == quantile_k[seq]) {
716 			rc = quantile_algorithm(seq, quantile_input[seq],
717 						quantile_input_cnt[seq]);
718 			/* Reset counter. */
719 			quantile_input_cnt[seq] = 0;
720 		}
721 	}
722 	return rc;
723 }
724 
725 int
quantile_finish(uint16_t seq)726 quantile_finish(uint16_t seq)
727 {
728 	int rc = 0;
729 
730 	if ( seq >= quantile_max_seq)
731 		return -5;
732 
733 	if (quantile_input_cnt[seq] > 0) {
734 		rc = quantile_algorithm(seq, quantile_input[seq],
735 					quantile_input_cnt[seq]);
736 	}
737 	return rc;
738 }
739 
740 void
quantile_reset(uint16_t seq)741 quantile_reset(uint16_t seq)
742 {
743 	quantile_input_cnt[seq] = 0;
744 	quantile_empty_buffers[seq] = quantile_b[seq];
745 	memset(quantile_buf[seq],0,sizeof(double) * quantile_k[seq]);
746 	memset(quantile_input[seq],0,sizeof(double) * quantile_k[seq]);
747 }
748 
749 /**
750  * Deinitialize one quantile sequence.
751  **/
752 void
quantile_exit_seq(uint16_t seq)753 quantile_exit_seq(uint16_t seq)
754 {
755 	quantile_t *qp = NULL, *next;
756 
757 	if (seq >= quantile_max_seq)
758 		return;
759 
760 	qp = quantile_buffer_head[seq];
761 	while (qp != NULL) {
762 		/* Save pointer to next buffer. */
763 		next = qp->next;
764 
765 		/* Free buffer and list entry. */
766 		free(qp->buffer);
767 		free(qp);
768 
769 		/* Set current buffer to next one. */
770 		qp = next;
771 	}
772 
773 	quantile_buffer_head[seq] = NULL;
774 	quantile_input_cnt[seq] = 0;
775 	quantile_empty_buffers[seq] = quantile_b[seq];
776 }
777 
778 void
quantile_exit(void)779 quantile_exit(void)
780 {
781 	int seq;
782 
783 	/* Free per sequence structures */
784 	for (seq = 0; seq < quantile_max_seq; seq++) {
785 		quantile_exit_seq(seq);
786 
787 		/* Free output buffer. */
788 		free(quantile_buf[seq]);
789 
790 		/* Free input buffer. */
791 		free(quantile_input[seq]);
792 	}
793 
794 	free(quantile_buffer_head);
795 	free(quantile_inf_cnt);
796 	free(quantile_alternate);
797 	free(quantile_buf);
798 	free(quantile_b);
799 	free(quantile_empty_buffers);
800 	free(quantile_input_cnt);
801 	free(quantile_input);
802 	free(quantile_k);
803 }
804 
805 int
quantile_output(uint16_t seq,uint64_t npackets,double phi,double * result)806 quantile_output (uint16_t seq, uint64_t npackets, double phi, double *result)
807 {
808 	quantile_t *qp = NULL;
809 	int num_buffers = 0;
810 	int weight = 0;
811 	int j;
812 	long next_pos = 0;
813 	long merge_pos = 0;
814 	double min_dbl;
815 	double beta;
816 	double phi2;		/* this is phi' */
817 
818 	if ( seq >= quantile_max_seq)
819 		return -5;
820 
821 	/* Check that there are at least two full buffers with given level. */
822 	for (qp = quantile_buffer_head[seq]; qp != NULL; qp = qp->next) {
823 		if (qp->weight != 0) {
824 			num_buffers++;
825 			weight += qp->weight;
826 			qp->pos = 0;
827 		} else {
828 			qp->pos = -1;
829 		}
830 	}
831 	if (num_buffers < 2) {
832 		/* XXX: In section 3.3 "OUTPUT operation" of paper [1] is says
833 		 * that OUTPUT takes c => 2 full input buffers. But what if we
834 		 * just have one full input buffer?
835 		 *
836 		 * For example this happens if you run a UDP test with a
837 		 * block size of 100k and a test duration of 3 seconds:
838 		 * $ ./thrulay -u 100k -t 3 localhost
839 		 */
840 
841 		if (num_buffers != 1) {
842 			return -1;
843 		}
844 	}
845 
846 	/* Calculate beta and phi' */
847 	beta = 1 + quantile_inf_cnt[seq] / (double)npackets;
848 	assert(beta >= 1.0);
849 
850 	assert(phi >= 0.0 && phi <= 1.0);
851 	phi2 = (2 * phi + beta - 1) / (2 * beta);
852 
853 	next_pos = ceil(phi2 * quantile_k[seq] * weight);
854 
855 	/* XXX: If the client just sends a few packets, it is possible that
856 	 * next_pos is too large. If this is the case, decrease it. */
857 	if (next_pos >= (num_buffers * quantile_k[seq])) {
858 		next_pos --;
859 	}
860 
861 	while (1) {
862 
863 		/* Search for current minimal element in all buffers.
864 		 * Because buffers are all sorted, we just have to check the
865 		 * element at current position. */
866 		min_dbl = DBL_MAX;
867 		for (qp = quantile_buffer_head[seq]; qp != NULL;
868 		     qp = qp->next) {
869 			/* Skip wrong buffers. */
870 			if (qp->pos == -1) {
871 				continue;
872 			}
873 
874 			/* Check that we are not at the end of buffer. */
875 			if (qp->pos >= quantile_k[seq]) {
876 				continue;
877 			}
878 
879 			/* Update minimum element. */
880 			min_dbl = min(min_dbl, qp->buffer[qp->pos]);
881 		}
882 
883 		/* Now process this minimal element in all buffers. */
884 		for (qp = quantile_buffer_head[seq]; qp != NULL;
885 		     qp = qp->next) {
886 			/* Skip wrong buffers. */
887 			if (qp->pos == -1) {
888 				continue;
889 			}
890 
891 			/* Now process minimal element in this buffer. */
892 			for (; (qp->buffer[qp->pos] == min_dbl) &&
893 					(qp->pos < quantile_k[seq]);
894 			     qp->pos++) {
895 
896 				/* Increment merge position `qp->weight'
897 				 * times. If we pass the position we seek,
898 				 * return current minimal element. */
899 				for (j = 0; j < qp->weight; j++) {
900 					if (next_pos == merge_pos) {
901 						*result = min_dbl;
902 						return 0;
903 					}
904 					merge_pos++;
905 				}
906 			}
907 		}
908 	}
909 
910 	/* NOTREACHED */
911 }
912 
913 #ifdef THRULAY_REPORTING_SAMPLE_LOOP
914 
915 #include <stdio.h>
916 #include <strings.h>
917 
918 #ifndef NAN
919 #define _ISOC99_SOURCE
920 #include <math.h>
921 #endif
922 
923 #define ERR_FATAL       0
924 #define ERR_WARNING     1
925 
926 void __attribute__((noreturn))
quantile_alg_error(int rc)927 quantile_alg_error(int rc)
928 {
929 	switch (rc) {
930 	case -1:
931 		fprintf(stderr, "Error: quantiles not initialized.");
932 		break;
933 	case -2:
934 		fprintf(stderr, "Error: NEW needs an empty buffer.");
935 		break;
936 	case -3:
937 		fprintf(stderr, "Error: Bad input sequence length.");
938 		break;
939 	case -4:
940 		fprintf(stderr, "Error: Not enough buffers for COLLAPSE.");
941 		break;
942 	default:
943 		fprintf(stderr, "Error: Unknown quantile_algorithm error.");
944 	}
945 	exit(1);
946 }
947 
948 /**
949  * Will read a sample data file (first and only parameter) whose lines
950  * give two values per line (per received packet): measured packet
951  * delay and packet sequence number (in "%lf %lu" format). As an
952  * exception, the first line specifies the number of packets actually
953  * sent. Example:
954  * ----
955  * 10
956  * 0.101 0
957  * 0.109 1
958  * 0.12 1
959  * 0.10 3
960  * 0.14 4
961  * 0.15 5
962  * 0.13 2
963  * 0.09 6
964  * 0.1 8
965  * 0.091 7
966  * ----
967  *
968  * To compile this sample reporting main(), the following symbols
969  * should be defined: HAVE_CONFIG_H and
970  * HAVE_THRULAY_REPORTING_SAMPLE_LOOP. '..' should be specified as
971  * include directory, and libm should be linked as well. For instance:
972  *
973  * gcc -std=c99 -Wno-long-long -Wall -pedantic -W -Wpointer-arith -Wnested-externs -DHAVE_CONFIG_H -DTHRULAY_REPORTING_SAMPLE_LOOP -I../ -o reporting reporting.c -lm
974  *
975  **/
976 int
main(int argc,char * argv[])977 main(int argc, char *argv[])
978 {
979 	FILE *sf;
980 	/* 'Measured data' */
981 	const int max_packets = 65535;
982 	/* 'Received' packets*/
983 	int npackets = 0;
984 	uint64_t packet_sqn[max_packets];    /* Fill in with sample data */
985 	double packet_delay[max_packets];    /* Fill in with sample data */
986 	uint64_t packets_sent = 0;           /* Fill in with sample data */
987 	/* reordering */
988 	const uint64_t reordering_max = 100;
989 	char buffer_reord[reordering_max * 80];
990 	size_t r = 0;
991 	uint64_t j = 0;
992 	/* Stats */
993 	uint64_t unique_packets = 0, packets_dup = 0;
994 	double quantile_25, quantile_50, quantile_75;
995 	double delay, jitter;
996 	double packet_loss;
997 	char report_buffer[1000];
998 	/* Auxiliary variables */
999 	int i, rc, rc2, rc3;
1000 
1001 	memset(packet_sqn,0,sizeof(uint64_t)*max_packets);
1002 	memset(packet_delay,0,sizeof(double)*max_packets);
1003 
1004 	/* Inititalize duplication */
1005 	rc = duplication_init(max_packets);
1006 	if (-1 == rc) {
1007 		perror("calloc");
1008 		exit(1);
1009 	}
1010 
1011 	/* Initialize quantiles */
1012 	rc = quantile_init(1, QUANTILE_EPS, max_packets);
1013 	if (-1 == rc) {
1014 		perror("malloc");
1015 		exit(1);
1016 	}
1017 
1018 	/* Initialize reordering */
1019 	rc = reordering_init(reordering_max);
1020 	if (-1 == rc) {
1021 		perror("calloc");
1022 		exit(1);
1023 	}
1024 
1025 	/* Open sample file */
1026 	if (2 == argc) {
1027 		sf = fopen(argv[1],"r");
1028 	} else {
1029 		exit(1);
1030 	}
1031 
1032 	/* Process sample input file. */
1033 
1034 	/* The sender somehow tells the receiver how many packets were
1035 	   actually sent. */
1036 	fscanf(sf,"%lu",&packets_sent);
1037 
1038 	for (i = 0; i < max_packets && !feof(sf); i++) {
1039 
1040 		fscanf(sf,"%lf %lu",&packet_delay[i],&packet_sqn[i]);
1041 		npackets++;
1042 
1043 		/*
1044 		 * Duplication
1045 		 */
1046 		if (duplication_check(packet_sqn[i])) {
1047 			/* Duplicated packet */
1048 			packets_dup++;
1049 			continue;
1050 		} else {
1051 			/* Unique packet */
1052 			unique_packets++;
1053 		}
1054 
1055 		/*
1056 		 * Delay quantiles.
1057 		 */
1058 		rc = quantile_value_checkin(0, packet_delay[i]);
1059 		if (rc < 0)
1060 			quantile_alg_error(rc);
1061 
1062 		/*
1063 		 * Reordering
1064 		 */
1065 		reordering_checkin(packet_sqn[i]);
1066 	}
1067 
1068 	/*
1069 	 * Perform last algorithm operation with a possibly not full
1070 	 * input buffer.
1071 	 */
1072 	rc = quantile_finish(0);
1073 	if (rc < 0)
1074 		quantile_alg_error(rc);
1075 
1076 	rc = quantile_output(0, unique_packets, 0.25, &quantile_25);
1077 	rc2 = quantile_output(0, unique_packets, 0.50, &quantile_50);
1078 	rc3 = quantile_output(0, unique_packets, 0.75, &quantile_75);
1079 	if (-1 == rc || -1 == rc2 || -1 == rc3) {
1080 		fprintf(stderr,"An error occurred while computing delay "
1081 			"quantiles. %d %d %d\n",rc, rc2, rc3);
1082 		exit(1);
1083 	}
1084 
1085 	/* Delay and jitter computation */
1086 	packet_loss = packets_sent > unique_packets?
1087 		(100.0*(packets_sent - unique_packets))/packets_sent: 0;
1088 	delay = (packet_loss > 50.0)? INFINITY : quantile_50;
1089 	if (packet_loss < 25.0 ) {
1090 		jitter = quantile_75 - quantile_25;
1091 	} else if (packet_loss > 75.0) {
1092 		jitter = NAN;
1093 	} else {
1094 		jitter = INFINITY;
1095 	}
1096 
1097 	/* Format final report */
1098 	snprintf(report_buffer, sizeof(report_buffer),
1099 		 "Delay: %3.3fms\n"
1100 		 "Loss: %3.3f%%\n"
1101 		 "Jitter: %3.3fms\n"
1102 		 "Duplication: %3.3f%%\n"
1103 		 "Reordering: %3.3f%\n",
1104 		 1000.0 * delay,
1105 		 packet_loss,
1106 		 1000.0 * jitter,
1107 		 100 * (double)packets_dup/npackets,
1108 		 100.0 * reordering_output(0));
1109 
1110 	printf(report_buffer);
1111 
1112 	/* Deallocate resources for statistics. */
1113 	reordering_exit();
1114 	quantile_exit();
1115 	duplication_exit();
1116 
1117 	fclose(sf);
1118 
1119 	exit(0);
1120 }
1121 
1122 #endif /* THRULAY_REPORTING_SAMPLE_LOOP */
1123