1 /**
2 * This module implements the Cormode-Muthukrishnan algorithm
3 * for computation of biased quantiles over data streams from
4 * "Effective Computation of Biased Quantiles over Data Streams"
5 *
6 */
7 #include <stdint.h>
8 #include <iso646.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <math.h>
12 #include <limits.h>
13 #include <stdio.h>
14 #include "heap.h"
15 #include "cm_quantile.h"
16
17 /* Static declarations */
18 static void cm_add_to_buffer(cm_quantile *cm, double value);
19 static double cm_insert_point_value(cm_quantile *cm);
20 static void cm_reset_insert_cursor(cm_quantile *cm);
21 static int cm_cursor_increment(cm_quantile *cm);
22 static void cm_insert_sample(cm_quantile *cm, cm_sample *position, cm_sample *new);
23 static void cm_append_sample(cm_quantile *cm, cm_sample *new);
24 static void cm_insert(cm_quantile *cm);
25 static void cm_compress(cm_quantile *cm);
26 static uint64_t cm_threshold(cm_quantile *cm, uint64_t rank);
27
28 // This is a comparison function that treats keys as doubles
compare_double_keys(register void * key1,register void * key2)29 static int compare_double_keys(register void* key1, register void* key2) {
30 // Cast them as double* and read them in
31 register double val1 = *((double*)key1);
32 register double val2 = *((double*)key2);
33
34 // Perform the comparison
35 if (val1 < val2)
36 return -1;
37 else if (val1 == val2)
38 return 0;
39 else
40 return 1;
41 }
42
43 /**
44 * Initializes the CM quantile struct
45 * @arg eps The maximum error for the quantiles
46 * @arg quantiles A sorted array of double quantile values, must be on (0, 1)
47 * @arg num_quants The number of entries in the quantiles array
48 * @arg cm_quantile The cm_quantile struct to initialize
49 * @return 0 on success.
50 */
init_cm_quantile(double eps,double * quantiles,uint32_t num_quants,cm_quantile * cm)51 int init_cm_quantile(double eps, double *quantiles, uint32_t num_quants, cm_quantile *cm) {
52 // Verify the sanity of epsilon
53 if (eps <= 0 or eps >= 0.5) return -1;
54
55 // Verify the quantiles
56 if (!num_quants) return -1;
57 for (int i=0; i < num_quants; i++) {
58 double val = quantiles[i];
59 if (val <= 0 or val >= 1) return -1;
60 }
61
62 // Check that we have a non-null cm
63 if (!cm) return -1;
64
65 // Initialize
66 cm->eps = eps;
67 cm->num_samples = 0;
68 cm->num_values = 0;
69 cm->samples = NULL;
70 cm->end = NULL;
71
72 // Copy the quantiles
73 cm->quantiles = malloc(num_quants * sizeof(double));
74 memcpy(cm->quantiles, quantiles, num_quants * sizeof(double));
75 cm->num_quantiles = num_quants;
76
77 // Initialize the buffers
78 heap *heaps = malloc(2*sizeof(heap));
79 cm->bufLess = heaps;
80 cm->bufMore = heaps+1;
81 heap_create(cm->bufLess, 0, compare_double_keys);
82 heap_create(cm->bufMore, 0, compare_double_keys);
83
84 // Setup the cursors
85 cm->insert.curs = NULL;
86 cm->compress.curs = NULL;
87
88 return 0;
89 }
90
91 /*
92 * Callback function to delete the sample inside the
93 * heap buffer.
94 */
free_buffer_sample(void * key,void * val)95 static void free_buffer_sample(void *key, void *val) {
96 free(val);
97 }
98
99 /**
100 * Destroy the CM quantile struct.
101 * @arg cm_quantile The cm_quantile to destroy
102 * @return 0 on success.
103 */
destroy_cm_quantile(cm_quantile * cm)104 int destroy_cm_quantile(cm_quantile *cm) {
105 // Free the quantiles
106 free(cm->quantiles);
107
108 // Destroy everything in the buffer
109 heap_foreach(cm->bufLess, free_buffer_sample);
110 heap_destroy(cm->bufLess);
111 heap_foreach(cm->bufMore, free_buffer_sample);
112 heap_destroy(cm->bufMore);
113
114 // Free the lower address, since they are allocated to be adjacent
115 free((cm->bufLess < cm->bufMore) ? cm->bufLess : cm->bufMore);
116
117 // Iterate through the linked list, free all
118 cm_sample *next;
119 cm_sample *current = cm->samples;
120 while (current) {
121 next = current->next;
122 free(current);
123 current = next;
124 }
125
126 return 0;
127 }
128
129 /**
130 * Adds a new sample to the struct
131 * @arg cm_quantile The cm_quantile to add to
132 * @arg sample The new sample value
133 * @return 0 on success.
134 */
cm_add_sample(cm_quantile * cm,double sample)135 int cm_add_sample(cm_quantile *cm, double sample) {
136 cm_add_to_buffer(cm, sample);
137 cm_insert(cm);
138 cm_compress(cm);
139 return 0;
140 }
141
142 /**
143 * Forces the internal buffers to be flushed,
144 * this allows query to have maximum accuracy.
145 * @arg cm_quantile The cm_quantile to add to
146 * @return 0 on success.
147 */
cm_flush(cm_quantile * cm)148 int cm_flush(cm_quantile *cm) {
149 int rounds = 0;
150 while (heap_size(cm->bufLess) or heap_size(cm->bufMore)) {
151 if (heap_size(cm->bufMore) == 0) cm_reset_insert_cursor(cm);
152 cm_insert(cm);
153 cm_compress(cm);
154 rounds++;
155 }
156 return 0;
157 }
158
159 /**
160 * Queries for a quantile value
161 * @arg cm_quantile The cm_quantile to query
162 * @arg quantile The quantile to query
163 * @return The value on success or 0.
164 */
cm_query(cm_quantile * cm,double quantile)165 double cm_query(cm_quantile *cm, double quantile) {
166 uint64_t rank = ceil(quantile * cm->num_values);
167 uint64_t min_rank=0;
168 uint64_t max_rank;
169 uint64_t threshold = ceil(cm_threshold(cm, rank) / 2.);
170
171 cm_sample *prev = cm->samples;
172 cm_sample *current = cm->samples;
173 while (current) {
174 max_rank = min_rank + current->width + current->delta;
175 if (max_rank > rank + threshold || min_rank > rank) {
176 break;
177 }
178 min_rank += current->width;
179 prev = current;
180 current = current->next;
181 }
182 return (prev) ? prev->value : 0;
183 }
184
185 /**
186 * Adds a new sample to the buffer
187 */
cm_add_to_buffer(cm_quantile * cm,double value)188 static void cm_add_to_buffer(cm_quantile *cm, double value) {
189 // Allocate a new sample
190 cm_sample *s = calloc(1, sizeof(cm_sample));
191 s->value = value;
192
193 /*
194 * Check the cursor value.
195 * Only use bufLess if we have at least a single value.
196 */
197 if (cm->num_values && value < cm_insert_point_value(cm)) {
198 heap_insert(cm->bufLess, &s->value, s);
199 } else {
200 heap_insert(cm->bufMore, &s->value, s);
201 }
202 }
203
204 // Returns the value under the insertion cursor or 0
cm_insert_point_value(cm_quantile * cm)205 static double cm_insert_point_value(cm_quantile *cm) {
206 return (cm->insert.curs) ? cm->insert.curs->value : 0;
207 }
208
209 // Resets the insert cursor
cm_reset_insert_cursor(cm_quantile * cm)210 static void cm_reset_insert_cursor(cm_quantile *cm) {
211 // Swap the buffers, reset the cursor
212 heap *tmp = cm->bufLess;
213 cm->bufLess = cm->bufMore;
214 cm->bufMore = tmp;
215 cm->insert.curs = NULL;
216 }
217
218 // Computes the number of items to process in one iteration
cm_cursor_increment(cm_quantile * cm)219 static int cm_cursor_increment(cm_quantile *cm) {
220 return ceil(cm->num_samples * cm->eps);
221 }
222
223 /* Inserts a new sample before the position sample */
cm_insert_sample(cm_quantile * cm,cm_sample * position,cm_sample * new)224 static void cm_insert_sample(cm_quantile *cm, cm_sample *position, cm_sample *new) {
225 // Inserting at the head
226 if (!position->prev) {
227 position->prev = new;
228 cm->samples = new;
229 new->next = position;
230 } else {
231 cm_sample *prev = position->prev;
232 prev->next = new;
233 position->prev = new;
234 new->prev = prev;
235 new->next = position;
236 }
237 }
238
239 /* Inserts a new sample at the end */
cm_append_sample(cm_quantile * cm,cm_sample * new)240 static void cm_append_sample(cm_quantile *cm, cm_sample *new) {
241 new->prev = cm->end;
242 cm->end->next = new;
243 cm->end = new;
244 }
245
246 /**
247 * Incrementally processes inserts by moving
248 * data from the buffer to the samples using a cursor
249 */
cm_insert(cm_quantile * cm)250 static void cm_insert(cm_quantile *cm) {
251 // Check if this is the first element
252 cm_sample *samp;
253 if (!cm->samples) {
254 if (!heap_delmin(cm->bufMore, NULL, (void**)&samp)) return;
255 samp->width = 1;
256 samp->delta = 0;
257 cm->samples = samp;
258 cm->end = samp;
259 cm->num_values++;
260 cm->num_samples++;
261 cm->insert.curs = samp;
262 return;
263 }
264
265 // Check if we need to initialize the cursor
266 if (!cm->insert.curs) {
267 cm->insert.curs = cm->samples;
268 }
269
270 // Handle adding values in the middle
271 int incr_size = cm_cursor_increment(cm);
272 double *val;
273 for (int i=0; i < incr_size and cm->insert.curs; i++) {
274 while (heap_min(cm->bufMore, (void**)&val, NULL) && *val <= cm_insert_point_value(cm)) {
275 heap_delmin(cm->bufMore, NULL, (void**)&samp);
276 samp->width = 1;
277 samp->delta = cm->insert.curs->width + cm->insert.curs->delta - 1;
278 cm_insert_sample(cm, cm->insert.curs, samp);
279 cm->num_values++;
280 cm->num_samples++;
281
282 // Check if we need to update the compress cursor
283 if (cm->compress.curs && cm->compress.curs->value >= samp->value) {
284 cm->compress.min_rank++;
285 }
286 }
287 // Increment the cursor
288 cm->insert.curs = cm->insert.curs->next;
289 }
290
291 // Handle adding values at the end
292 if (cm->insert.curs == NULL) {
293 while (heap_min(cm->bufMore, (void**)&val, NULL) && *val > cm->end->value) {
294 heap_delmin(cm->bufMore, NULL, (void**)&samp);
295 samp->width = 1;
296 samp->delta = 0;
297 cm_append_sample(cm, samp);
298 cm->num_values++;
299 cm->num_samples++;
300 }
301
302 // Reset the cursor
303 cm_reset_insert_cursor(cm);
304 }
305 }
306
307 /* Incrementally processes compression by using a cursor */
cm_compress(cm_quantile * cm)308 static void cm_compress(cm_quantile *cm) {
309 // Bail early if there is nothing to really compress..
310 if (cm->num_samples < 3) return;
311
312 // Check if we need to initialize the cursor
313 if (!cm->compress.curs) {
314 cm->compress.curs = cm->end->prev;
315 cm->compress.min_rank = cm->num_values - 1 - cm->compress.curs->width;
316 cm->compress.curs = cm->compress.curs->prev;
317 }
318
319 int incr_size = cm_cursor_increment(cm);
320 cm_sample *next, *prev;
321 uint64_t threshold;
322 uint64_t max_rank, test_val;
323 for (int i=0; i < incr_size and cm->compress.curs != cm->samples; i++) {
324 next = cm->compress.curs->next;
325 max_rank = cm->compress.min_rank + cm->compress.curs->width + cm->compress.curs->delta;
326 cm->compress.min_rank -= cm->compress.curs->width;
327
328 threshold = cm_threshold(cm, max_rank);
329 test_val = cm->compress.curs->width + next->width + next->delta;
330 if (test_val <= threshold) {
331 // Make sure we don't stomp the insertion cursor
332 if (cm->insert.curs == cm->compress.curs) {
333 cm->insert.curs = next;
334 }
335
336 // Combine the widths
337 next->width += cm->compress.curs->width;
338
339 // Remove the tuple
340 prev = cm->compress.curs->prev;
341 prev->next = next;
342 next->prev = prev;
343 free(cm->compress.curs);
344 cm->compress.curs = prev;
345
346 // Reduce the sample count
347 cm->num_samples--;
348 } else {
349 cm->compress.curs = cm->compress.curs->prev;
350 }
351 }
352
353 // Reset the cursor if we hit the start
354 if (cm->compress.curs == cm->samples) cm->compress.curs = NULL;
355 }
356
357 /* Computes the minimum threshold value */
cm_threshold(cm_quantile * cm,uint64_t rank)358 static uint64_t cm_threshold(cm_quantile *cm, uint64_t rank) {
359 uint64_t min_val = LLONG_MAX;
360
361 uint64_t quant_min;
362 double quant;
363 for (int i=0; i < cm->num_quantiles; i++) {
364 quant = cm->quantiles[i];
365 if (rank >= quant * cm->num_values) {
366 quant_min = 2 * cm->eps * rank / quant;
367 } else {
368 quant_min = 2 * cm->eps * (cm->num_values - rank) / (1 - quant);
369 }
370 if (quant_min < min_val) min_val = quant_min;
371 }
372
373 return min_val;
374 }
375
376