1 /**
2  * This module implements the Cormode-Muthukrishnan algorithm
3  * for computation of biased quantiles over data streams from
4  * "Effective Computation of Biased Quantiles over Data Streams"
5  *
6  */
7 #include <stdint.h>
8 #include <iso646.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <math.h>
12 #include <limits.h>
13 #include <stdio.h>
14 #include "heap.h"
15 #include "cm_quantile.h"
16 
17 /* Static declarations */
18 static void cm_add_to_buffer(cm_quantile *cm, double value);
19 static double cm_insert_point_value(cm_quantile *cm);
20 static void cm_reset_insert_cursor(cm_quantile *cm);
21 static int cm_cursor_increment(cm_quantile *cm);
22 static void cm_insert_sample(cm_quantile *cm, cm_sample *position, cm_sample *new);
23 static void cm_append_sample(cm_quantile *cm, cm_sample *new);
24 static void cm_insert(cm_quantile *cm);
25 static void cm_compress(cm_quantile *cm);
26 static uint64_t cm_threshold(cm_quantile *cm, uint64_t rank);
27 
28 // This is a comparison function that treats keys as doubles
compare_double_keys(register void * key1,register void * key2)29 static int compare_double_keys(register void* key1, register void* key2) {
30     // Cast them as double* and read them in
31     register double val1 = *((double*)key1);
32     register double val2 = *((double*)key2);
33 
34     // Perform the comparison
35     if (val1 < val2)
36         return -1;
37     else if (val1 == val2)
38         return 0;
39     else
40         return 1;
41 }
42 
43 /**
44  * Initializes the CM quantile struct
45  * @arg eps The maximum error for the quantiles
46  * @arg quantiles A sorted array of double quantile values, must be on (0, 1)
47  * @arg num_quants The number of entries in the quantiles array
48  * @arg cm_quantile The cm_quantile struct to initialize
49  * @return 0 on success.
50  */
init_cm_quantile(double eps,double * quantiles,uint32_t num_quants,cm_quantile * cm)51 int init_cm_quantile(double eps, double *quantiles, uint32_t num_quants, cm_quantile *cm) {
52     // Verify the sanity of epsilon
53     if (eps <= 0 or eps >= 0.5) return -1;
54 
55     // Verify the quantiles
56     if (!num_quants) return -1;
57     for (int i=0; i < num_quants; i++) {
58         double val = quantiles[i];
59         if (val <= 0 or val >= 1) return -1;
60     }
61 
62     // Check that we have a non-null cm
63     if (!cm) return -1;
64 
65     // Initialize
66     cm->eps = eps;
67     cm->num_samples = 0;
68     cm->num_values = 0;
69     cm->samples = NULL;
70     cm->end = NULL;
71 
72     // Copy the quantiles
73     cm->quantiles = malloc(num_quants * sizeof(double));
74     memcpy(cm->quantiles, quantiles, num_quants * sizeof(double));
75     cm->num_quantiles = num_quants;
76 
77     // Initialize the buffers
78     heap *heaps = malloc(2*sizeof(heap));
79     cm->bufLess = heaps;
80     cm->bufMore = heaps+1;
81     heap_create(cm->bufLess, 0, compare_double_keys);
82     heap_create(cm->bufMore, 0, compare_double_keys);
83 
84     // Setup the cursors
85     cm->insert.curs = NULL;
86     cm->compress.curs = NULL;
87 
88     return 0;
89 }
90 
91 /*
92  * Callback function to delete the sample inside the
93  * heap buffer.
94  */
free_buffer_sample(void * key,void * val)95 static void free_buffer_sample(void *key, void *val) {
96     free(val);
97 }
98 
99 /**
100  * Destroy the CM quantile struct.
101  * @arg cm_quantile The cm_quantile to destroy
102  * @return 0 on success.
103  */
destroy_cm_quantile(cm_quantile * cm)104 int destroy_cm_quantile(cm_quantile *cm) {
105     // Free the quantiles
106     free(cm->quantiles);
107 
108     // Destroy everything in the buffer
109     heap_foreach(cm->bufLess, free_buffer_sample);
110     heap_destroy(cm->bufLess);
111     heap_foreach(cm->bufMore, free_buffer_sample);
112     heap_destroy(cm->bufMore);
113 
114     // Free the lower address, since they are allocated to be adjacent
115     free((cm->bufLess < cm->bufMore) ? cm->bufLess : cm->bufMore);
116 
117     // Iterate through the linked list, free all
118     cm_sample *next;
119     cm_sample *current = cm->samples;
120     while (current) {
121         next = current->next;
122         free(current);
123         current = next;
124     }
125 
126     return 0;
127 }
128 
129 /**
130  * Adds a new sample to the struct
131  * @arg cm_quantile The cm_quantile to add to
132  * @arg sample The new sample value
133  * @return 0 on success.
134  */
cm_add_sample(cm_quantile * cm,double sample)135 int cm_add_sample(cm_quantile *cm, double sample) {
136     cm_add_to_buffer(cm, sample);
137     cm_insert(cm);
138     cm_compress(cm);
139     return 0;
140 }
141 
142 /**
143  * Forces the internal buffers to be flushed,
144  * this allows query to have maximum accuracy.
145  * @arg cm_quantile The cm_quantile to add to
146  * @return 0 on success.
147  */
cm_flush(cm_quantile * cm)148 int cm_flush(cm_quantile *cm) {
149     int rounds = 0;
150     while (heap_size(cm->bufLess) or heap_size(cm->bufMore)) {
151         if (heap_size(cm->bufMore) == 0) cm_reset_insert_cursor(cm);
152         cm_insert(cm);
153         cm_compress(cm);
154         rounds++;
155     }
156     return 0;
157 }
158 
159 /**
160  * Queries for a quantile value
161  * @arg cm_quantile The cm_quantile to query
162  * @arg quantile The quantile to query
163  * @return The value on success or 0.
164  */
cm_query(cm_quantile * cm,double quantile)165 double cm_query(cm_quantile *cm, double quantile) {
166     uint64_t rank = ceil(quantile * cm->num_values);
167 	uint64_t min_rank=0;
168     uint64_t max_rank;
169 	uint64_t threshold = ceil(cm_threshold(cm, rank) / 2.);
170 
171     cm_sample *prev = cm->samples;
172     cm_sample *current = cm->samples;
173     while (current) {
174         max_rank = min_rank + current->width + current->delta;
175         if (max_rank > rank + threshold || min_rank > rank) {
176             break;
177         }
178         min_rank += current->width;
179         prev = current;
180         current = current->next;
181     }
182     return (prev) ? prev->value : 0;
183 }
184 
185 /**
186  * Adds a new sample to the buffer
187  */
cm_add_to_buffer(cm_quantile * cm,double value)188 static void cm_add_to_buffer(cm_quantile *cm, double value) {
189     // Allocate a new sample
190     cm_sample *s = calloc(1, sizeof(cm_sample));
191     s->value = value;
192 
193     /*
194      * Check the cursor value.
195      * Only use bufLess if we have at least a single value.
196      */
197     if (cm->num_values && value < cm_insert_point_value(cm)) {
198         heap_insert(cm->bufLess, &s->value, s);
199     } else {
200         heap_insert(cm->bufMore, &s->value, s);
201     }
202 }
203 
204 // Returns the value under the insertion cursor or 0
cm_insert_point_value(cm_quantile * cm)205 static double cm_insert_point_value(cm_quantile *cm) {
206     return (cm->insert.curs) ? cm->insert.curs->value : 0;
207 }
208 
209 // Resets the insert cursor
cm_reset_insert_cursor(cm_quantile * cm)210 static void cm_reset_insert_cursor(cm_quantile *cm) {
211     // Swap the buffers, reset the cursor
212     heap *tmp = cm->bufLess;
213     cm->bufLess = cm->bufMore;
214     cm->bufMore = tmp;
215     cm->insert.curs = NULL;
216 }
217 
218 // Computes the number of items to process in one iteration
cm_cursor_increment(cm_quantile * cm)219 static int cm_cursor_increment(cm_quantile *cm) {
220     return ceil(cm->num_samples * cm->eps);
221 }
222 
223 /* Inserts a new sample before the position sample */
cm_insert_sample(cm_quantile * cm,cm_sample * position,cm_sample * new)224 static void cm_insert_sample(cm_quantile *cm, cm_sample *position, cm_sample *new) {
225     // Inserting at the head
226     if (!position->prev) {
227         position->prev = new;
228         cm->samples = new;
229         new->next = position;
230     } else {
231         cm_sample *prev = position->prev;
232         prev->next = new;
233         position->prev = new;
234         new->prev = prev;
235         new->next = position;
236     }
237 }
238 
239 /* Inserts a new sample at the end */
cm_append_sample(cm_quantile * cm,cm_sample * new)240 static void cm_append_sample(cm_quantile *cm, cm_sample *new) {
241     new->prev = cm->end;
242     cm->end->next = new;
243     cm->end = new;
244 }
245 
246 /**
247  * Incrementally processes inserts by moving
248  * data from the buffer to the samples using a cursor
249  */
cm_insert(cm_quantile * cm)250 static void cm_insert(cm_quantile *cm) {
251     // Check if this is the first element
252     cm_sample *samp;
253     if (!cm->samples) {
254         if (!heap_delmin(cm->bufMore, NULL, (void**)&samp)) return;
255         samp->width = 1;
256         samp->delta = 0;
257         cm->samples = samp;
258         cm->end = samp;
259         cm->num_values++;
260         cm->num_samples++;
261         cm->insert.curs = samp;
262 		return;
263 	}
264 
265     // Check if we need to initialize the cursor
266     if (!cm->insert.curs) {
267         cm->insert.curs = cm->samples;
268     }
269 
270     // Handle adding values in the middle
271     int incr_size = cm_cursor_increment(cm);
272     double *val;
273     for (int i=0; i < incr_size and cm->insert.curs; i++) {
274         while (heap_min(cm->bufMore, (void**)&val, NULL) && *val <= cm_insert_point_value(cm)) {
275             heap_delmin(cm->bufMore, NULL, (void**)&samp);
276             samp->width = 1;
277             samp->delta = cm->insert.curs->width + cm->insert.curs->delta - 1;
278             cm_insert_sample(cm, cm->insert.curs, samp);
279             cm->num_values++;
280             cm->num_samples++;
281 
282             // Check if we need to update the compress cursor
283             if (cm->compress.curs && cm->compress.curs->value >= samp->value) {
284                 cm->compress.min_rank++;
285             }
286         }
287         // Increment the cursor
288         cm->insert.curs = cm->insert.curs->next;
289 	}
290 
291     // Handle adding values at the end
292     if (cm->insert.curs == NULL) {
293         while (heap_min(cm->bufMore, (void**)&val, NULL) && *val > cm->end->value) {
294             heap_delmin(cm->bufMore, NULL, (void**)&samp);
295             samp->width = 1;
296             samp->delta = 0;
297             cm_append_sample(cm, samp);
298             cm->num_values++;
299             cm->num_samples++;
300         }
301 
302         // Reset the cursor
303         cm_reset_insert_cursor(cm);
304 	}
305 }
306 
307 /* Incrementally processes compression by using a cursor */
cm_compress(cm_quantile * cm)308 static void cm_compress(cm_quantile *cm) {
309     // Bail early if there is nothing to really compress..
310     if (cm->num_samples < 3) return;
311 
312     // Check if we need to initialize the cursor
313     if (!cm->compress.curs) {
314         cm->compress.curs = cm->end->prev;
315         cm->compress.min_rank = cm->num_values - 1 - cm->compress.curs->width;
316         cm->compress.curs = cm->compress.curs->prev;
317     }
318 
319     int incr_size = cm_cursor_increment(cm);
320     cm_sample *next, *prev;
321     uint64_t threshold;
322     uint64_t max_rank, test_val;
323     for (int i=0; i < incr_size and cm->compress.curs != cm->samples; i++) {
324         next = cm->compress.curs->next;
325         max_rank = cm->compress.min_rank + cm->compress.curs->width + cm->compress.curs->delta;
326         cm->compress.min_rank -= cm->compress.curs->width;
327 
328         threshold = cm_threshold(cm, max_rank);
329         test_val = cm->compress.curs->width + next->width + next->delta;
330         if (test_val <= threshold) {
331             // Make sure we don't stomp the insertion cursor
332             if (cm->insert.curs == cm->compress.curs) {
333                 cm->insert.curs = next;
334             }
335 
336             // Combine the widths
337             next->width += cm->compress.curs->width;
338 
339             // Remove the tuple
340             prev = cm->compress.curs->prev;
341             prev->next = next;
342             next->prev = prev;
343             free(cm->compress.curs);
344             cm->compress.curs = prev;
345 
346             // Reduce the sample count
347             cm->num_samples--;
348         } else {
349             cm->compress.curs = cm->compress.curs->prev;
350         }
351     }
352 
353     // Reset the cursor if we hit the start
354     if (cm->compress.curs == cm->samples) cm->compress.curs = NULL;
355 }
356 
357 /* Computes the minimum threshold value */
cm_threshold(cm_quantile * cm,uint64_t rank)358 static uint64_t cm_threshold(cm_quantile *cm, uint64_t rank) {
359     uint64_t min_val = LLONG_MAX;
360 
361     uint64_t quant_min;
362     double   quant;
363     for (int i=0; i < cm->num_quantiles; i++) {
364         quant = cm->quantiles[i];
365         if (rank >= quant * cm->num_values) {
366             quant_min = 2 * cm->eps * rank / quant;
367         } else {
368             quant_min = 2 * cm->eps * (cm->num_values - rank) / (1 - quant);
369         }
370         if (quant_min < min_val) min_val = quant_min;
371     }
372 
373     return min_val;
374 }
375 
376