1 /*
2 * reporting.c -- performance metrics reporting as in Internet Draft
3 * draft-ietf-ippm-reporting.
4 *
5 * Written by Stanislav Shalunov, http://www.internet2.edu/~shalunov/
6 * Bernhard Lutzmann, belu@users.sf.net
7 * Federico Montesino Pouzols, fedemp@altern.org
8 *
9 * @(#) $Id: reporting.c,v 1.1.2.12 2006/08/20 18:06:19 fedemp Exp $
10 *
11 * Copyright 2003, 2006, Internet2.
12 * Legal conditions are in file LICENSE
13 * (MD5 = ecfa50d1b0bfbb81b658c810d0476a52).
14 */
15
16 /**
17 * @file reporting.c
18 *
19 * @short metrics computation and reporting.
20 **/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdlib.h>
27 #ifdef HAVE_STDINT_H
28 #include <stdint.h>
29 #endif
30 #include <float.h>
31 #include <math.h>
32 #include <string.h>
33 #include "reporting.h"
34 #ifndef THRULAY_REPORTING_SAMPLE_LOOP
35 #include "assertd.h"
36 #else
37 #include <assert.h>
38 #endif
39 #include "rcs.h"
40
41 RCS_ID("@(#) $Id: reporting.c,v 1.1.2.12 2006/08/20 18:06:19 fedemp Exp $")
42
43 #define min(a, b) ((a) < (b) ? (a) : (b))
44 #define max(a, b) ((a) > (b) ? (a) : (b))
45
46 /*
47 * Reordering.
48 */
49 #define loop(x) ((x) >= 0 ? (x) : (x) + (int)reordering_max)
50
51 /*
52 * Duplication.
53 */
54 static uint64_t *bitfield = NULL; /* Bit field used to check for
55 duplicated packets. */
56
57 /*
58 * Reordering.
59 */
60 static uint64_t reordering_max; /* We have m[j-1] == number of */
61 static uint64_t *reordering_m; /* We have m[j-1] == number of
62 j-reordered packets. */
63 static uint64_t *reordering_ring; /* Last sequence numbers seen */
64 static int r = 0; /* Ring pointer for next write. */
65 static int l = 0; /* Counter of sequence numbers. */
66
67 /*
68 * Quantiles.
69 */
70 static uint16_t quantile_max_seq; /* Maximum number of sequences */
71 static int *quantile_k; /* number of elements in buffer */
72
73 static double **quantile_input; /* This is the buffer where the
74 sequence of incoming packets is saved.
75 If we received enough packets, we will
76 write this buffer to a quantile buffer. */
77 static int *quantile_input_cnt; /* number of elements in input buffer */
78 static int *quantile_empty_buffers; /* number of empty buffers */
79
80 static int *quantile_b; /* number of buffers */
81
82 static double **quantile_buf;
83
84 static int *quantile_alternate; /* this is used to determine the offset in
85 COLLAPSE (if weight is even) */
86
87 static uint64_t *quantile_inf_cnt; /* this counter is for the additional
88 -inf, +inf elements we added to NEW buffer
89 to fill it up. */
90
91 typedef struct quantile {
92 struct quantile *next; /* pointer to next quantile buffer */
93 int weight; /* 0 if buffer is empty, > 0 if buffer is
94 * full */
95 int level;
96 double *buffer;
97 int pos; /* current position in buffer; used in
98 quantile_collapse() */
99 } quantile_t;
100
101 static quantile_t **quantile_buffer_head;
102
103 int
reordering_init(uint64_t max)104 reordering_init(uint64_t max)
105 {
106 reordering_max = max;
107 reordering_m = calloc(reordering_max, sizeof(uint64_t));
108 reordering_ring = calloc(reordering_max, sizeof(uint64_t));
109 if (reordering_m == NULL) {
110 return -1;
111 } else {
112 return 0;
113 }
114 }
115
116 int
reordering_checkin(uint64_t packet_sqn)117 reordering_checkin(uint64_t packet_sqn)
118 {
119 int j;
120
121 for (j = 0; j < min(l, (int)reordering_max) &&
122
123 packet_sqn < reordering_ring[loop(r-j-1)]; j++) {
124 reordering_m[j]++;
125 }
126 reordering_ring[r] = packet_sqn;
127 l++;
128 r = (r + 1) % reordering_max;
129 return 0;
130 }
131
132 double
reordering_output(uint64_t j)133 reordering_output(uint64_t j)
134 {
135 if (j >= reordering_max)
136 return -1;
137 else
138 return (double)reordering_m[j] / (l - j - 1);
139 }
140
141 void
reordering_exit(void)142 reordering_exit(void)
143 {
144 free(reordering_ring);
145 free(reordering_m);
146 }
147
148 int
duplication_init(uint64_t npackets)149 duplication_init(uint64_t npackets)
150 {
151 uint64_t bitfield_len = 0; /* Number of sectors in bitfield */
152
153 /* Allocate memory for bit field */
154 bitfield_len = ((npackets % 64) ? (npackets / 64 + 1) : npackets / 64);
155 bitfield = calloc(bitfield_len, sizeof(uint64_t));
156 if (bitfield == NULL) {
157 return -1;
158 } else {
159 return 0;
160 }
161 }
162
163 int
duplication_check(uint64_t packet_sqn)164 duplication_check(uint64_t packet_sqn)
165 {
166 uint64_t bitfield_sec; /* Which sector in bitfield */
167 uint64_t bitfield_bit; /* Which bit in sector */
168
169 /* Calculate sector */
170 bitfield_sec = packet_sqn >> 6;
171
172 /* Calculate bit in sector */
173 bitfield_bit = (uint64_t)1 << (packet_sqn & 63);
174
175 if (bitfield[bitfield_sec] & bitfield_bit) {
176 /* Duplicated packet */
177 return 1;
178 } else {
179 /* Unique packet */
180 bitfield[bitfield_sec] |= bitfield_bit;
181 return 0;
182 }
183 }
184
185 void
duplication_exit(void)186 duplication_exit(void)
187 {
188 free(bitfield);
189 }
190
191 /* Calculate binomial coefficient C(n, k). */
192 int64_t
binomial(int n,int k)193 binomial (int n, int k)
194 {
195 int64_t bc = 0;
196 int i, m;
197
198 /* C(n, k) = C(n, n-k) */
199 k = min(k, n-k);
200
201 if (k >= 0) {
202 bc = 1;
203 m = max(k, n-k);
204
205 for (i = 1; i <= k; i++) {
206 bc = (bc * (m + i)) / i;
207 }
208 }
209
210 return bc;
211 }
212
213 int
quantile_compare(const void * d1,const void * d2)214 quantile_compare(const void *d1, const void *d2)
215 {
216 if (*(double *)d1 < *(double *)d2) {
217 return -1;
218 } else if (*(double *)d1 == *(double *)d2) {
219 return 0;
220 } else {
221 assert(*(double *)d1 > *(double *)d2);
222 return 1;
223 }
224 }
225
226 void
quantile_sort(double * list,int length)227 quantile_sort (double *list, int length)
228 {
229 qsort(list, length, sizeof(double), quantile_compare);
230 }
231
232 /**
233 * Implementation of NEW operation from section 3.1 of paper [1].
234 *
235 * Takes as input an empty buffer. Simply populates the quantile buffer with
236 * the next k elements from the input sequence, labels the buffer as full, and
237 * assigns it a weight of 1.
238 *
239 * If there are not enough elements to fill up the buffer, we alternately add
240 * -inf, +inf elements until buffer is full (-inf == 0, +inf == DBL_MAX).
241 *
242 * NOTE: We sort the elements in the input buffer before we copy them to
243 * the quantile buffer.
244 *
245 * @param seq Sequence index.
246 *
247 * @return
248 * @retval 0 on success.
249 * @retval -2 need an empty buffer.
250 * @retval -3 bad input sequence length.
251 **/
252 int
quantile_new(uint16_t seq,quantile_t * q,double * input,int k,int level)253 quantile_new(uint16_t seq, quantile_t *q, double *input, int k, int level)
254 {
255 int i;
256
257 /* Check that buffer is really empty. */
258 if (q->weight != 0) {
259 return -2;
260 }
261
262 /* Sanity check for length of input sequence. */
263 if (k > quantile_k[seq]) {
264 return -3;
265 }
266
267 /* If there are not enough elements in the input buffer, fill it up
268 * with -inf, +inf elements. */
269 for (i = k; i < quantile_k[seq]; i++) {
270 if (i % 2) {
271 input[i] = DBL_MAX;
272 } else {
273 input[i] = 0;
274 }
275
276 /* Increment counter that indicates how many additional
277 * elements we added to fill the buffer. */
278 quantile_inf_cnt[seq]++;
279 }
280
281 quantile_sort(input, quantile_k[seq]);
282
283 memcpy(q->buffer, input, sizeof(double) * quantile_k[seq]);
284
285 /* Mark buffer as full and set level. */
286 q->weight = 1;
287 q->level = level;
288
289 /* Update number of empty quantile buffers. */
290 quantile_empty_buffers[seq]--;
291
292 return 0;
293 }
294
295
296 /* Implementation of COLLAPSE operation from section 3.2 of paper [1].
297 *
298 * This is called from quantile_algorithm() if there are no empty buffers.
299 * We COLLAPSE all the full buffers, where level has value `level'.
300 * Output is written to the first buffer in linked list with level set to
301 * `level'. The level of the output buffer is increased by 1.
302 * All other buffers we used in the COLLAPSE are marked empty. */
303 int
quantile_collapse(uint16_t seq,int level)304 quantile_collapse(uint16_t seq, int level)
305 {
306 quantile_t *qp = NULL, *qp_out = NULL;
307 int num_buffers = 0; /* number of buffers with level `level' */
308 int weight = 0; /* weight of the output buffer */
309 int offset;
310 int i, j;
311 double min_dbl;
312 long next_pos;
313 long merge_pos = 0;
314
315 /* Check that there are at least two full buffers with given level.
316 * Also calculate weight of output buffer. */
317 for (qp = quantile_buffer_head[seq]; qp != NULL; qp = qp->next) {
318 if ((qp->weight != 0) && (qp->level == level)) {
319 num_buffers++;
320 weight += qp->weight;
321 qp->pos = 0;
322 } else {
323 /* We mark the buffers that are not used in this
324 * COLLAPSE. */
325 qp->pos = -1;
326 }
327 }
328 if (num_buffers < 2) {
329 return -4;
330 }
331
332 /* NOTE: The elements in full buffers are sorted. So we don't have to
333 * do that again.
334 */
335 /* Search for first full buffer with matching level. This is the buffer
336 * where we save the output. */
337 for (qp_out = quantile_buffer_head[seq]; qp_out != NULL;
338 qp_out = qp_out->next) {
339 if (qp_out->pos != -1) {
340 break;
341 }
342 }
343
344 /* Calculate offset */
345 if (weight % 2) {
346 /* odd */
347 offset = (weight + 1) / 2;
348 } else {
349 /* even - we alternate between two choices in each COLLAPSE */
350 if (quantile_alternate[seq] % 2) {
351 offset = weight / 2;
352 } else {
353 offset = (weight + 2)/ 2;
354 }
355 quantile_alternate[seq] = (quantile_alternate[seq] + 1) % 2;
356 }
357
358 /* Initialize next position of element to save. Because first position
359 * is at 0, we have to decrement offset by 1. */
360 next_pos = offset - 1;
361
362 for (i = 0; i < quantile_k[seq]; ) {
363
364 /* Search for current minimal element in all buffers.
365 * Because buffers are all sorted, we just have to check the
366 * element at current position. */
367 min_dbl = DBL_MAX;
368 for (qp = quantile_buffer_head[seq]; qp != NULL;
369 qp = qp->next) {
370 /* Skip wrong buffers. */
371 if (qp->pos == -1) {
372 continue;
373 }
374
375 /* Check that we are not at the end of buffer. */
376 if (qp->pos >= quantile_k[seq]) {
377 continue;
378 }
379
380 /* Update minimum element. */
381 min_dbl = min(min_dbl, qp->buffer[qp->pos]);
382 }
383
384 /* Now process this minimal element in all buffers. */
385 for (qp = quantile_buffer_head[seq]; qp != NULL;
386 qp = qp->next) {
387 /* Skip wrong buffers. */
388 if (qp->pos == -1) {
389 continue;
390 }
391
392 /* Now process minimal element in this buffer. */
393 for (; (qp->buffer[qp->pos] == min_dbl) &&
394 (qp->pos < quantile_k[seq]);
395 qp->pos++) {
396
397 /* We run this loop `qp->weight' times.
398 * We check there if we are in a position
399 * so we have to save this element in our
400 * output buffer. */
401 for (j = 0; j < qp->weight; j++) {
402
403 if (next_pos == merge_pos) {
404 quantile_buf[seq][i] = min_dbl;
405 i++;
406
407 if (i == quantile_k[seq]) {
408 /* We have written
409 * all elements to
410 * output buffer, so
411 * exit global loop. */
412 goto out;
413 }
414
415 /* Update next position. */
416 next_pos += weight;
417 }
418
419 merge_pos++;
420 } /* for(j = 0; j < qp->weight; j++) */
421 } /* for (; (qp->buffer[qp->pos] == min_dbl) &&
422 (qp->pos < quantile_k[seq]); qp->pos++) */
423 } /* for (qp = quantile_buffer_head[seq]; qp!=NULL;
424 qp = qp->next) */
425 } /* for (i = 0; i < quantile_k[seq]; ) */
426
427 out:
428 memcpy(qp_out->buffer, quantile_buf[seq],
429 sizeof(double) * quantile_k[seq]);
430
431 /* Update weight of output buffer. */
432 qp_out->weight = weight;
433 qp_out->level = level+1;
434
435 /* Update list of empty buffers. */
436 for (qp = quantile_buffer_head[seq]; qp != NULL; qp = qp->next) {
437 if ((qp->pos != -1) && (qp != qp_out)) {
438 qp->weight = 0;
439 qp->level = 0;
440 }
441 }
442 quantile_empty_buffers[seq] += num_buffers - 1;
443 return 0;
444 }
445
446 /**
447 * Implementation of COLLAPSE policies from section 3.4 of paper [1].
448 *
449 * There are three different algorithms noted in the paper. We use the
450 * "New Algorithm".
451 *
452 * @param seq Sequence index.
453 *
454 * @return
455 * @retval 0 on success.
456 * @retval -1 quantiles not initialized.
457 * @retval -2 need an empty buffer for new operation.
458 * @retval -3 bad input sequence length in new operation.
459 * @retval -4 not enough buffers for collapse operation.
460 **/
461 int
quantile_algorithm(uint16_t seq,double * input,int k)462 quantile_algorithm (uint16_t seq, double *input, int k)
463 {
464 int rc;
465 quantile_t *qp = NULL, *qp2 = NULL;
466 int min_level = -1;
467
468 /* This should always be true. */
469 if (quantile_buffer_head[seq] != NULL) {
470 min_level = quantile_buffer_head[seq]->level;
471 } else {
472 return -1;
473 }
474
475 /* Get minimum level of all currently full buffers. */
476 for (qp = quantile_buffer_head[seq]; qp != NULL; qp = qp->next) {
477 if (qp->weight != 0) {
478 /* Full buffer. */
479 min_level = min(min_level, qp->level);
480 }
481 }
482
483 if (quantile_empty_buffers[seq] == 0) {
484 /* There are no empty buffers. Invoke COLLAPSE on the set
485 * of buffers with minimum level. */
486
487 rc = quantile_collapse(seq, min_level);
488 if (rc < 0)
489 return rc;
490 } else if (quantile_empty_buffers[seq] == 1) {
491 /* We have exactly one empty buffer. Invoke NEW and assign
492 * it level `min_level'. */
493
494 /* Search the empty buffer. */
495 for (qp = quantile_buffer_head[seq]; qp != NULL;
496 qp = qp->next) {
497 if (qp->weight == 0) {
498 /* Found empty buffer. */
499 break;
500 }
501 }
502
503 rc = quantile_new(seq, qp, input, k, min_level);
504 if (rc < 0)
505 return rc;
506 } else {
507 /* There are at least two empty buffers. Invoke NEW on each
508 * and assign level `0' to each. */
509
510 /* Search for two empty buffers. */
511 for (qp = quantile_buffer_head[seq]; qp != NULL;
512 qp = qp->next) {
513 if (qp->weight == 0) {
514 /* Found first empty buffer. */
515 break;
516 }
517 }
518 for (qp2 = qp->next; qp2 != NULL; qp2 = qp2->next) {
519 if (qp2->weight == 0) {
520 /* Found second empty buffer. */
521 break;
522 }
523 }
524
525 if (k <= quantile_k[seq]) {
526 /* This could happen if we call this after we
527 * received all packets but don't have enough to
528 * fill up two buffers. */
529
530 rc = quantile_new(seq, qp, input, k, 0);
531 if (rc < 0)
532 return rc;
533 } else {
534 /* We have enough input data for two buffers. */
535 rc = quantile_new(seq, qp, input, quantile_k[seq], 0);
536 if (rc < 0)
537 return rc;
538 rc = quantile_new(seq, qp2, input + quantile_k[seq],
539 k - quantile_k[seq], 0);
540 if (rc < 0)
541 return rc;
542 }
543 }
544 return 0;
545 }
546
547 int
quantile_init_seq(uint16_t seq)548 quantile_init_seq(uint16_t seq)
549 {
550 quantile_t *qp = NULL;
551 int i;
552
553 if ( seq >= quantile_max_seq)
554 return -5;
555
556 /* Allocate memory for quantile buffers. Buffers are linked lists
557 * with a pointer to next buffer.
558 * We need `quantile_b' buffers, where each buffer has space for
559 * `quantile_k' elements. */
560 for (i = 0; i < quantile_b[seq]; i++) {
561 if (i == 0) {
562 /* Initialize first buffer. */
563 qp = malloc(sizeof(quantile_t));
564 if (qp == NULL) {
565 return -1;
566 }
567 quantile_buffer_head[seq] = qp;
568 } else {
569 qp->next = malloc(sizeof(quantile_t));
570 if (qp->next == NULL) {
571 return -1;
572 }
573 qp = qp->next;
574 }
575
576 /* `qp' points to buffer that should be initialized. */
577 qp->next = NULL;
578 qp->weight = 0; /* empty buffers have weight of 0 */
579 qp->level = 0;
580 qp->buffer = malloc(sizeof(double) * quantile_k[seq]);
581 if (qp->buffer == NULL) {
582 return -1;
583 }
584 }
585 /* Update number of empty quantile buffers. */
586 quantile_empty_buffers[seq] = quantile_b[seq];
587
588 return 0;
589 }
590
591 int
quantile_init(uint16_t max_seq,double eps,uint64_t N)592 quantile_init (uint16_t max_seq, double eps, uint64_t N)
593 {
594 int b, b_tmp = 0;
595 int k, k_tmp = 0;
596 int h, h_max = 0;
597 int seq, rc;
598
599 quantile_max_seq = max_seq;
600 /* Allocate array for the requested number of sequences. */
601 quantile_k = calloc(max_seq, sizeof(int));
602 quantile_input = calloc(max_seq, sizeof(double*));
603 quantile_input_cnt = calloc(max_seq, sizeof(int));
604 quantile_empty_buffers = calloc(max_seq, sizeof(int));
605 quantile_b = calloc(max_seq, sizeof(int));
606 quantile_buf = calloc(max_seq, sizeof(double*));
607 quantile_alternate = calloc(max_seq, sizeof(int));
608 quantile_inf_cnt = calloc(max_seq, sizeof(uint64_t));
609 quantile_buffer_head = calloc(max_seq, sizeof(quantile_t*));
610
611 /* "In practice, optimal values for b and k can be computed by trying
612 * out different values of b in the range 1 and 30." */
613 for (b = 2; b <= 30; b++) {
614
615 /* For each b, compute the largest integral h that satisfies:
616 * (h-2) * C(b+h-2, h-1) - C(b+h-3, h-3) +
617 * C(b+h-3, h-2) <= 2 * eps * N
618 */
619 for (h = 0; ; h++) {
620 if (((h-2) * binomial(b+h-2, h-1) -
621 binomial(b+h-3, h-3) +
622 binomial(b+h-3, h-2)) >
623 (2 * eps * N)) {
624 /* This h does not satisfy the inequality from
625 * above. */
626 break;
627 }
628 h_max = h;
629 }
630
631 /* Now compute the smallest integral k that satisfies:
632 * k * C(b+h-2, h-1) => N. */
633 k = ceil(N / (double)binomial(b+h_max-2, h_max-1));
634
635 /* Identify that b that minimizes b*k. */
636 if ((b_tmp == 0) && (k_tmp == 0)) {
637 /* Initialize values */
638 b_tmp = b;
639 k_tmp = k;
640 }
641
642 if ((b * k) < (b_tmp * k_tmp)) {
643 /* Found b and k for which the product is smaller
644 * than for the ones before. Because we want to
645 * minimize b*k (required memory), we save them. */
646 b_tmp = b;
647 k_tmp = k;
648 }
649 }
650
651 /* Set global quantile values. For now, all sequences share
652 the same k and b values.*/
653 for (seq = 0; seq < max_seq; seq++ ) {
654 quantile_b[seq] = b_tmp;
655 quantile_k[seq] = k_tmp;
656 }
657
658 /* Allocate memory for input buffer.
659 * We allocate enough space to save up to `2 * quantile_k' elements.
660 * This space is needed in the COLLAPSE policy if there are more
661 * than two empty buffers. Because then we have to invoke NEW on two
662 * buffers and thus need an input buffer with `2 * quantile_k'
663 * elements. */
664 for (seq = 0; seq < quantile_max_seq; seq++) {
665 quantile_input[seq] = malloc(sizeof(double) * 2 *
666 quantile_k[seq]);
667 if (quantile_input[seq] == NULL) {
668 return -1;
669 }
670 quantile_input_cnt[seq] = 0;
671 }
672
673 /* Allocate memory for output buffer.
674 * This buffer is used in COLLAPSE to store temporary output buffer
675 * before it gets copied to one of the buffers used in COLLAPSE. */
676 for (seq = 0; seq < quantile_max_seq; seq++ ) {
677 quantile_buf[seq] = malloc(sizeof(double) * quantile_k[seq]);
678 if (quantile_buf[seq] == NULL) {
679 return -1;
680 }
681 }
682
683 for (seq = 0; seq < max_seq; seq++) {
684 rc = quantile_init_seq(seq);
685 if (rc < 0)
686 return rc;
687 }
688
689 return 0;
690 }
691
692 int
quantile_value_checkin(uint16_t seq,double value)693 quantile_value_checkin(uint16_t seq, double value)
694 {
695 int rc = 0;
696
697 if ( seq >= quantile_max_seq)
698 return -5;
699
700 quantile_input[seq][quantile_input_cnt[seq]++] = value;
701
702 /* If we have at least two empty buffers,
703 * we need input for two buffers, to twice
704 * the value of `quantile_k'. */
705 if (quantile_empty_buffers[seq] >= 2) {
706 if (quantile_input_cnt[seq] ==
707 (2 * quantile_k[seq])) {
708 rc = quantile_algorithm(seq, quantile_input[seq],
709 quantile_input_cnt[seq]);
710 /* Reset counter. */
711 quantile_input_cnt[seq] = 0;
712 }
713 } else {
714 /* There are 0 or 1 empty buffers */
715 if (quantile_input_cnt[seq] == quantile_k[seq]) {
716 rc = quantile_algorithm(seq, quantile_input[seq],
717 quantile_input_cnt[seq]);
718 /* Reset counter. */
719 quantile_input_cnt[seq] = 0;
720 }
721 }
722 return rc;
723 }
724
725 int
quantile_finish(uint16_t seq)726 quantile_finish(uint16_t seq)
727 {
728 int rc = 0;
729
730 if ( seq >= quantile_max_seq)
731 return -5;
732
733 if (quantile_input_cnt[seq] > 0) {
734 rc = quantile_algorithm(seq, quantile_input[seq],
735 quantile_input_cnt[seq]);
736 }
737 return rc;
738 }
739
740 void
quantile_reset(uint16_t seq)741 quantile_reset(uint16_t seq)
742 {
743 quantile_input_cnt[seq] = 0;
744 quantile_empty_buffers[seq] = quantile_b[seq];
745 memset(quantile_buf[seq],0,sizeof(double) * quantile_k[seq]);
746 memset(quantile_input[seq],0,sizeof(double) * quantile_k[seq]);
747 }
748
749 /**
750 * Deinitialize one quantile sequence.
751 **/
752 void
quantile_exit_seq(uint16_t seq)753 quantile_exit_seq(uint16_t seq)
754 {
755 quantile_t *qp = NULL, *next;
756
757 if (seq >= quantile_max_seq)
758 return;
759
760 qp = quantile_buffer_head[seq];
761 while (qp != NULL) {
762 /* Save pointer to next buffer. */
763 next = qp->next;
764
765 /* Free buffer and list entry. */
766 free(qp->buffer);
767 free(qp);
768
769 /* Set current buffer to next one. */
770 qp = next;
771 }
772
773 quantile_buffer_head[seq] = NULL;
774 quantile_input_cnt[seq] = 0;
775 quantile_empty_buffers[seq] = quantile_b[seq];
776 }
777
778 void
quantile_exit(void)779 quantile_exit(void)
780 {
781 int seq;
782
783 /* Free per sequence structures */
784 for (seq = 0; seq < quantile_max_seq; seq++) {
785 quantile_exit_seq(seq);
786
787 /* Free output buffer. */
788 free(quantile_buf[seq]);
789
790 /* Free input buffer. */
791 free(quantile_input[seq]);
792 }
793
794 free(quantile_buffer_head);
795 free(quantile_inf_cnt);
796 free(quantile_alternate);
797 free(quantile_buf);
798 free(quantile_b);
799 free(quantile_empty_buffers);
800 free(quantile_input_cnt);
801 free(quantile_input);
802 free(quantile_k);
803 }
804
805 int
quantile_output(uint16_t seq,uint64_t npackets,double phi,double * result)806 quantile_output (uint16_t seq, uint64_t npackets, double phi, double *result)
807 {
808 quantile_t *qp = NULL;
809 int num_buffers = 0;
810 int weight = 0;
811 int j;
812 long next_pos = 0;
813 long merge_pos = 0;
814 double min_dbl;
815 double beta;
816 double phi2; /* this is phi' */
817
818 if ( seq >= quantile_max_seq)
819 return -5;
820
821 /* Check that there are at least two full buffers with given level. */
822 for (qp = quantile_buffer_head[seq]; qp != NULL; qp = qp->next) {
823 if (qp->weight != 0) {
824 num_buffers++;
825 weight += qp->weight;
826 qp->pos = 0;
827 } else {
828 qp->pos = -1;
829 }
830 }
831 if (num_buffers < 2) {
832 /* XXX: In section 3.3 "OUTPUT operation" of paper [1] is says
833 * that OUTPUT takes c => 2 full input buffers. But what if we
834 * just have one full input buffer?
835 *
836 * For example this happens if you run a UDP test with a
837 * block size of 100k and a test duration of 3 seconds:
838 * $ ./thrulay -u 100k -t 3 localhost
839 */
840
841 if (num_buffers != 1) {
842 return -1;
843 }
844 }
845
846 /* Calculate beta and phi' */
847 beta = 1 + quantile_inf_cnt[seq] / (double)npackets;
848 assert(beta >= 1.0);
849
850 assert(phi >= 0.0 && phi <= 1.0);
851 phi2 = (2 * phi + beta - 1) / (2 * beta);
852
853 next_pos = ceil(phi2 * quantile_k[seq] * weight);
854
855 /* XXX: If the client just sends a few packets, it is possible that
856 * next_pos is too large. If this is the case, decrease it. */
857 if (next_pos >= (num_buffers * quantile_k[seq])) {
858 next_pos --;
859 }
860
861 while (1) {
862
863 /* Search for current minimal element in all buffers.
864 * Because buffers are all sorted, we just have to check the
865 * element at current position. */
866 min_dbl = DBL_MAX;
867 for (qp = quantile_buffer_head[seq]; qp != NULL;
868 qp = qp->next) {
869 /* Skip wrong buffers. */
870 if (qp->pos == -1) {
871 continue;
872 }
873
874 /* Check that we are not at the end of buffer. */
875 if (qp->pos >= quantile_k[seq]) {
876 continue;
877 }
878
879 /* Update minimum element. */
880 min_dbl = min(min_dbl, qp->buffer[qp->pos]);
881 }
882
883 /* Now process this minimal element in all buffers. */
884 for (qp = quantile_buffer_head[seq]; qp != NULL;
885 qp = qp->next) {
886 /* Skip wrong buffers. */
887 if (qp->pos == -1) {
888 continue;
889 }
890
891 /* Now process minimal element in this buffer. */
892 for (; (qp->buffer[qp->pos] == min_dbl) &&
893 (qp->pos < quantile_k[seq]);
894 qp->pos++) {
895
896 /* Increment merge position `qp->weight'
897 * times. If we pass the position we seek,
898 * return current minimal element. */
899 for (j = 0; j < qp->weight; j++) {
900 if (next_pos == merge_pos) {
901 *result = min_dbl;
902 return 0;
903 }
904 merge_pos++;
905 }
906 }
907 }
908 }
909
910 /* NOTREACHED */
911 }
912
913 #ifdef THRULAY_REPORTING_SAMPLE_LOOP
914
915 #include <stdio.h>
916 #include <strings.h>
917
918 #ifndef NAN
919 #define _ISOC99_SOURCE
920 #include <math.h>
921 #endif
922
923 #define ERR_FATAL 0
924 #define ERR_WARNING 1
925
926 void __attribute__((noreturn))
quantile_alg_error(int rc)927 quantile_alg_error(int rc)
928 {
929 switch (rc) {
930 case -1:
931 fprintf(stderr, "Error: quantiles not initialized.");
932 break;
933 case -2:
934 fprintf(stderr, "Error: NEW needs an empty buffer.");
935 break;
936 case -3:
937 fprintf(stderr, "Error: Bad input sequence length.");
938 break;
939 case -4:
940 fprintf(stderr, "Error: Not enough buffers for COLLAPSE.");
941 break;
942 default:
943 fprintf(stderr, "Error: Unknown quantile_algorithm error.");
944 }
945 exit(1);
946 }
947
948 /**
949 * Will read a sample data file (first and only parameter) whose lines
950 * give two values per line (per received packet): measured packet
951 * delay and packet sequence number (in "%lf %lu" format). As an
952 * exception, the first line specifies the number of packets actually
953 * sent. Example:
954 * ----
955 * 10
956 * 0.101 0
957 * 0.109 1
958 * 0.12 1
959 * 0.10 3
960 * 0.14 4
961 * 0.15 5
962 * 0.13 2
963 * 0.09 6
964 * 0.1 8
965 * 0.091 7
966 * ----
967 *
968 * To compile this sample reporting main(), the following symbols
969 * should be defined: HAVE_CONFIG_H and
970 * HAVE_THRULAY_REPORTING_SAMPLE_LOOP. '..' should be specified as
971 * include directory, and libm should be linked as well. For instance:
972 *
973 * gcc -std=c99 -Wno-long-long -Wall -pedantic -W -Wpointer-arith -Wnested-externs -DHAVE_CONFIG_H -DTHRULAY_REPORTING_SAMPLE_LOOP -I../ -o reporting reporting.c -lm
974 *
975 **/
976 int
main(int argc,char * argv[])977 main(int argc, char *argv[])
978 {
979 FILE *sf;
980 /* 'Measured data' */
981 const int max_packets = 65535;
982 /* 'Received' packets*/
983 int npackets = 0;
984 uint64_t packet_sqn[max_packets]; /* Fill in with sample data */
985 double packet_delay[max_packets]; /* Fill in with sample data */
986 uint64_t packets_sent = 0; /* Fill in with sample data */
987 /* reordering */
988 const uint64_t reordering_max = 100;
989 char buffer_reord[reordering_max * 80];
990 size_t r = 0;
991 uint64_t j = 0;
992 /* Stats */
993 uint64_t unique_packets = 0, packets_dup = 0;
994 double quantile_25, quantile_50, quantile_75;
995 double delay, jitter;
996 double packet_loss;
997 char report_buffer[1000];
998 /* Auxiliary variables */
999 int i, rc, rc2, rc3;
1000
1001 memset(packet_sqn,0,sizeof(uint64_t)*max_packets);
1002 memset(packet_delay,0,sizeof(double)*max_packets);
1003
1004 /* Inititalize duplication */
1005 rc = duplication_init(max_packets);
1006 if (-1 == rc) {
1007 perror("calloc");
1008 exit(1);
1009 }
1010
1011 /* Initialize quantiles */
1012 rc = quantile_init(1, QUANTILE_EPS, max_packets);
1013 if (-1 == rc) {
1014 perror("malloc");
1015 exit(1);
1016 }
1017
1018 /* Initialize reordering */
1019 rc = reordering_init(reordering_max);
1020 if (-1 == rc) {
1021 perror("calloc");
1022 exit(1);
1023 }
1024
1025 /* Open sample file */
1026 if (2 == argc) {
1027 sf = fopen(argv[1],"r");
1028 } else {
1029 exit(1);
1030 }
1031
1032 /* Process sample input file. */
1033
1034 /* The sender somehow tells the receiver how many packets were
1035 actually sent. */
1036 fscanf(sf,"%lu",&packets_sent);
1037
1038 for (i = 0; i < max_packets && !feof(sf); i++) {
1039
1040 fscanf(sf,"%lf %lu",&packet_delay[i],&packet_sqn[i]);
1041 npackets++;
1042
1043 /*
1044 * Duplication
1045 */
1046 if (duplication_check(packet_sqn[i])) {
1047 /* Duplicated packet */
1048 packets_dup++;
1049 continue;
1050 } else {
1051 /* Unique packet */
1052 unique_packets++;
1053 }
1054
1055 /*
1056 * Delay quantiles.
1057 */
1058 rc = quantile_value_checkin(0, packet_delay[i]);
1059 if (rc < 0)
1060 quantile_alg_error(rc);
1061
1062 /*
1063 * Reordering
1064 */
1065 reordering_checkin(packet_sqn[i]);
1066 }
1067
1068 /*
1069 * Perform last algorithm operation with a possibly not full
1070 * input buffer.
1071 */
1072 rc = quantile_finish(0);
1073 if (rc < 0)
1074 quantile_alg_error(rc);
1075
1076 rc = quantile_output(0, unique_packets, 0.25, &quantile_25);
1077 rc2 = quantile_output(0, unique_packets, 0.50, &quantile_50);
1078 rc3 = quantile_output(0, unique_packets, 0.75, &quantile_75);
1079 if (-1 == rc || -1 == rc2 || -1 == rc3) {
1080 fprintf(stderr,"An error occurred while computing delay "
1081 "quantiles. %d %d %d\n",rc, rc2, rc3);
1082 exit(1);
1083 }
1084
1085 /* Delay and jitter computation */
1086 packet_loss = packets_sent > unique_packets?
1087 (100.0*(packets_sent - unique_packets))/packets_sent: 0;
1088 delay = (packet_loss > 50.0)? INFINITY : quantile_50;
1089 if (packet_loss < 25.0 ) {
1090 jitter = quantile_75 - quantile_25;
1091 } else if (packet_loss > 75.0) {
1092 jitter = NAN;
1093 } else {
1094 jitter = INFINITY;
1095 }
1096
1097 /* Format final report */
1098 snprintf(report_buffer, sizeof(report_buffer),
1099 "Delay: %3.3fms\n"
1100 "Loss: %3.3f%%\n"
1101 "Jitter: %3.3fms\n"
1102 "Duplication: %3.3f%%\n"
1103 "Reordering: %3.3f%\n",
1104 1000.0 * delay,
1105 packet_loss,
1106 1000.0 * jitter,
1107 100 * (double)packets_dup/npackets,
1108 100.0 * reordering_output(0));
1109
1110 printf(report_buffer);
1111
1112 /* Deallocate resources for statistics. */
1113 reordering_exit();
1114 quantile_exit();
1115 duplication_exit();
1116
1117 fclose(sf);
1118
1119 exit(0);
1120 }
1121
1122 #endif /* THRULAY_REPORTING_SAMPLE_LOOP */
1123