1 #include "numeric_index.h"
2 #include "redis_index.h"
3 #include "sys/param.h"
4 #include "rmutil/vector.h"
5 #include "rmutil/util.h"
6 #include "index.h"
7 #include "util/arr.h"
8 #include <math.h>
9 #include "redismodule.h"
10 #include "util/misc.h"
11 //#include "tests/time_sample.h"
12 #define NR_EXPONENT 4
13 #define NR_MAXRANGE_CARD 2500
14 #define NR_MAXRANGE_SIZE 10000
15 #define NR_MAX_DEPTH 2
16
17 typedef struct {
18 IndexIterator *it;
19 uint32_t lastRevId;
20 } NumericUnionCtx;
21
22 /* A callback called after a concurrent context regains execution context. When this happen we need
23 * to make sure the key hasn't been deleted or its structure changed, which will render the
24 * underlying iterators invalid */
NumericRangeIterator_OnReopen(RedisModuleKey * k,void * privdata)25 void NumericRangeIterator_OnReopen(RedisModuleKey *k, void *privdata) {
26 NumericUnionCtx *nu = privdata;
27 NumericRangeTree *t = RedisModule_ModuleTypeGetValue(k);
28
29 /* If the key has been deleted we'll get a NULL heere, so we just mark ourselves as EOF
30 * We simply abort the root iterator which is either a union of many ranges or a single range
31 *
32 * If the numeric range tree has chained (split, nodes deleted, etc) since we last closed it,
33 * We cannot continue iterating it, since the underlying pointers might be screwed.
34 * For now we will just stop processing this query. This causes the query to return bad results,
35 * so in the future we can try an reset the state here
36 */
37 if (k == NULL || t == NULL || t->revisionId != nu->lastRevId) {
38 nu->it->Abort(nu->it->ctx);
39 }
40 }
41
42 /* Returns 1 if the entire numeric range is contained between min and max */
NumericRange_Contained(NumericRange * n,double min,double max)43 static inline int NumericRange_Contained(NumericRange *n, double min, double max) {
44 if (!n) return 0;
45 int rc = (n->minVal >= min && n->maxVal <= max);
46
47 // printf("range %f..%f, min %f max %f, WITHIN? %d\n", n->minVal, n->maxVal, min, max, rc);
48 return rc;
49 }
50
51 /* Returns 1 if min and max are both inside the range. this is the opposite of _Within */
NumericRange_Contains(NumericRange * n,double min,double max)52 static inline int NumericRange_Contains(NumericRange *n, double min, double max) {
53 if (!n) return 0;
54 int rc = (n->minVal <= min && n->maxVal > max);
55 // printf("range %f..%f, min %f max %f, contains? %d\n", n->minVal, n->maxVal, min, max, rc);
56 return rc;
57 }
58
59 /* Returns 1 if there is any overlap between the range and min/max */
NumericRange_Overlaps(NumericRange * n,double min,double max)60 int NumericRange_Overlaps(NumericRange *n, double min, double max) {
61 if (!n) return 0;
62 int rc = (min >= n->minVal && min <= n->maxVal) || (max >= n->minVal && max <= n->maxVal);
63 // printf("range %f..%f, min %f max %f, overlaps? %d\n", n->minVal, n->maxVal, min, max, rc);
64 return rc;
65 }
66
/* Add a (docId, value) record to the range n.
 *
 * n         - the range to add to.
 * docId     - document id of the record.
 * value     - numeric value of the record.
 * checkCard - when non-zero, the range's cardinality bookkeeping is updated: a value already
 *             present in n->values gets its appearance count bumped, a new value increments
 *             n->card (and is remembered in n->values while card is below splitCard).
 *             When zero, only the min/max bounds and the inverted index are touched.
 *
 * Returns the result of InvertedIndex_WriteNumericEntry (presumably the number of bytes
 * written - confirm against the inverted index API).
 */
size_t NumericRange_Add(NumericRange *n, t_docId docId, double value, int checkCard) {
  int isNewValue = 0;
  if (checkCard) {
    isNewValue = 1;
    size_t tracked = array_len(n->values);
    for (size_t i = 0; i < tracked; i++) {
      if (n->values[i].value == value) {
        /* Already seen: count the appearance, cardinality is unchanged */
        isNewValue = 0;
        n->values[i].appearances++;
        break;
      }
    }
  }

  /* Infinite bounds mark a range that has not received a value on that side yet */
  if (n->minVal == NF_NEGATIVE_INFINITY || value < n->minVal) n->minVal = value;
  if (n->maxVal == NF_INFINITY || value > n->maxVal) n->maxVal = value;

  if (isNewValue) {
    /* Only track individual values while below the split cardinality */
    if (n->card < n->splitCard) {
      CardinalityValue val = {.value = value, .appearances = 1};
      n->values = array_append(n->values, val);
      n->unique_sum += value;
    }
    ++n->card;
  }

  return InvertedIndex_WriteNumericEntry(n->entries, docId, value);
}
95
/* Split the range n into two freshly created leaf nodes.
 *
 * The split point is the mean of the tracked unique values (unique_sum / card). Every entry
 * of n is re-read and added to the left leaf when its value is below the split point, and to
 * the right leaf otherwise. n itself is left untouched.
 *
 * lp, rp - out parameters receiving the new left and right leaves.
 * Returns the split point.
 */
double NumericRange_Split(NumericRange *n, NumericRangeNode **lp, NumericRangeNode **rp) {
  const double split = n->unique_sum / (double)n->card;

  /* Both children get the same initial capacity and the same (larger, but capped) split
   * cardinality */
  size_t childCap = n->entries->numDocs / 2 + 1;
  size_t childSplitCard = MIN(NR_MAXRANGE_CARD, 1 + n->splitCard * NR_EXPONENT);

  *lp = NewLeafNode(childCap, n->minVal, split, childSplitCard);
  *rp = NewLeafNode(childCap, split, n->maxVal, childSplitCard);

  /* Redistribute the existing entries between the two children */
  RSIndexResult *cur = NULL;
  IndexReader *reader = NewNumericReader(NULL, n->entries, NULL);
  while (IR_Read(reader, &cur) == INDEXREAD_OK) {
    NumericRange *target = cur->num.value < split ? (*lp)->range : (*rp)->range;
    NumericRange_Add(target, cur->docId, cur->num.value, 1);
  }
  IR_Free(reader);

  return split;
}
120
NewLeafNode(size_t cap,double min,double max,size_t splitCard)121 NumericRangeNode *NewLeafNode(size_t cap, double min, double max, size_t splitCard) {
122
123 NumericRangeNode *n = rm_malloc(sizeof(NumericRangeNode));
124 n->left = NULL;
125 n->right = NULL;
126 n->value = 0;
127
128 n->maxDepth = 0;
129 n->range = rm_malloc(sizeof(NumericRange));
130
131 *n->range = (NumericRange){.minVal = min,
132 .maxVal = max,
133 .unique_sum = 0,
134 .card = 0,
135 .splitCard = splitCard,
136 .values = array_new(CardinalityValue, 1),
137 //.values = rm_calloc(splitCard, sizeof(CardinalityValue)),
138 .entries = NewInvertedIndex(Index_StoreNumeric, 1)};
139 return n;
140 }
141
NumericRangeNode_Add(NumericRangeNode * n,t_docId docId,double value)142 NRN_AddRv NumericRangeNode_Add(NumericRangeNode *n, t_docId docId, double value) {
143 NRN_AddRv rv = {.sz = 0, .changed = 0};
144 if (!NumericRangeNode_IsLeaf(n)) {
145 // if this node has already split but retains a range, just add to the range without checking
146 // anything
147 if (n->range) {
148 // Not leaf so should not add??
149 /* sz += */ NumericRange_Add(n->range, docId, value, 0);
150 // should continue after??
151 }
152
153 // recursively add to its left or right child.
154 NumericRangeNode **childP = value < n->value ? &n->left : &n->right;
155 NumericRangeNode *child = *childP;
156 // if the child has split we get 1 in return
157 rv = NumericRangeNode_Add(child, docId, value);
158
159 if (rv.changed) {
160 // if there was a split it means our max depth has increased.
161 // we are too deep - we don't retain this node's range anymore.
162 // this keeps memory footprint in check
163 if (++n->maxDepth > NR_MAX_DEPTH && n->range) {
164 InvertedIndex_Free(n->range->entries);
165 array_free(n->range->values);
166 rm_free(n->range);
167 n->range = NULL;
168 }
169
170 // check if we need to rebalance the child.
171 // To ease the rebalance we don't rebalance the root
172 // nor do we rebalance nodes that are with ranges (n->maxDepth > NR_MAX_DEPTH)
173 if ((child->right->maxDepth - child->left->maxDepth) > NR_MAX_DEPTH) { // role to the left
174 NumericRangeNode *right = child->right;
175 child->right = right->left;
176 right->left = child;
177 --child->maxDepth;
178 *childP = right; // replace the child with the new child
179 } else if ((child->left->maxDepth - child->right->maxDepth) >
180 NR_MAX_DEPTH) { // role to the right
181 NumericRangeNode *left = child->left;
182 child->left = left->right;
183 left->right = child;
184 --child->maxDepth;
185 *childP = left; // replace the child with the new child
186 }
187 }
188 // return 1 or 0 to our called, so this is done recursively
189 return rv;
190 }
191
192 // if this node is a leaf - we add AND check the cardinality. We only split leaf nodes
193 rv.sz = NumericRange_Add(n->range, docId, value, 1);
194 int card = n->range->card;
195 // printf("Added %d %f to node %f..%f, card now %zd, size now %zd\n", docId, value,
196 // n->range->minVal,
197 // n->range->maxVal, card, n->range->entries->numDocs);
198 if (card >= n->range->splitCard ||
199 (n->range->entries->numDocs > NR_MAXRANGE_SIZE && card > 1)) {
200
201 // split this node but don't delete its range
202 double split = NumericRange_Split(n->range, &n->left, &n->right);
203
204 n->value = split;
205
206 n->maxDepth = 1;
207 rv.changed = 1;
208 }
209
210 return rv;
211 }
212
213 /* Recursively add a node's children to the range. */
__recursiveAddRange(Vector * v,NumericRangeNode * n,double min,double max)214 void __recursiveAddRange(Vector *v, NumericRangeNode *n, double min, double max) {
215 if (!n) return;
216
217 if (n->range) {
218 // printf("min %f, max %f, range %f..%f, contained? %d, overlaps? %d, leaf? %d\n", min, max,
219 // n->range->minVal, n->range->maxVal, NumericRange_Contained(n->range, min, max),
220 // NumericRange_Overlaps(n->range, min, max), __isLeaf(n));
221 // if the range is completely contained in the search, we can just add it and not inspect any
222 // downwards
223 if (NumericRange_Contained(n->range, min, max)) {
224 Vector_Push(v, n->range);
225 return;
226 }
227 // No overlap at all - no need to do anything
228 if (!NumericRange_Overlaps(n->range, min, max)) {
229 return;
230 }
231 }
232
233 // for non leaf nodes - we try to descend into their children
234 if (!NumericRangeNode_IsLeaf(n)) {
235 __recursiveAddRange(v, n->left, min, max);
236 __recursiveAddRange(v, n->right, min, max);
237 } else if (NumericRange_Overlaps(n->range, min, max)) {
238 Vector_Push(v, n->range);
239 return;
240 }
241 }
242
243 /* Find the numeric ranges that fit the range we are looking for. We try to minimize the number of
244 * nodes we'll later need to union */
NumericRangeNode_FindRange(NumericRangeNode * n,double min,double max)245 Vector *NumericRangeNode_FindRange(NumericRangeNode *n, double min, double max) {
246
247 Vector *leaves = NewVector(NumericRange *, 8);
248 __recursiveAddRange(leaves, n, min, max);
249 // printf("Found %zd ranges for %f...%f\n", leaves->top, min, max);
250 // for (int i = 0; i < leaves->top; i++) {
251 // NumericRange *rng;
252 // Vector_Get(leaves, i, &rng);
253 // printf("%f...%f (%f). %d card, %d splitCard\n", rng->minVal, rng->maxVal,
254 // rng->maxVal - rng->minVal, rng->entries->numDocs, rng->splitCard);
255 // }
256
257 return leaves;
258 }
259
/* Recursively free the node n, its range (entries and tracked values included) and all of its
 * descendants. A NULL node is ignored. */
void NumericRangeNode_Free(NumericRangeNode *n) {
  if (n == NULL) {
    return;
  }

  NumericRange *rng = n->range;
  if (rng != NULL) {
    InvertedIndex_Free(rng->entries);
    array_free(rng->values);
    rm_free(rng);
    n->range = NULL;
  }

  NumericRangeNode_Free(n->left);
  NumericRangeNode_Free(n->right);

  rm_free(n);
}
274
275 uint16_t numericTreesUniqueId = 0;
276
277 /* Create a new numeric range tree */
NewNumericRangeTree()278 NumericRangeTree *NewNumericRangeTree() {
279 #define GC_NODES_INITIAL_SIZE 10
280 NumericRangeTree *ret = rm_malloc(sizeof(NumericRangeTree));
281
282 ret->root = NewLeafNode(2, NF_NEGATIVE_INFINITY, NF_INFINITY, 2);
283 ret->numEntries = 0;
284 ret->numRanges = 1;
285 ret->revisionId = 0;
286 ret->lastDocId = 0;
287 ret->uniqueId = numericTreesUniqueId++;
288 return ret;
289 }
290
/* Add a (docId, value) record to the tree.
 *
 * Records must arrive with strictly increasing doc ids: a docId not greater than the last one
 * added is silently rejected. This protects the tree from duplicate entries caused by indexer
 * bugs.
 *
 * Returns 0 for a rejected record, otherwise the `sz` reported by NumericRangeNode_Add.
 */
int NumericRangeTree_Add(NumericRangeTree *t, t_docId docId, double value) {
  if (docId <= t->lastDocId) {
    return 0;
  }
  t->lastDocId = docId;

  NRN_AddRv added = NumericRangeNode_Add(t->root, docId, value);
  t->numEntries++;

  // A change means the tree nodes were restructured and concurrent iteration is no longer
  // safe. Bumping the revision id makes currently running query iterators abort the next
  // time they regain execution context.
  if (added.changed) {
    t->revisionId++;
  }
  t->numRanges += added.changed;

  return added.sz;
}
312
NumericRangeTree_Find(NumericRangeTree * t,double min,double max)313 Vector *NumericRangeTree_Find(NumericRangeTree *t, double min, double max) {
314 return NumericRangeNode_FindRange(t->root, min, max);
315 }
316
/* Visit every node of the subtree rooted at n in pre-order: the callback is invoked on n
 * first, then on its left subtree, then on its right subtree.
 *
 * n        - root of the subtree; must not be NULL.
 * callback - function invoked for each node.
 * ctx      - opaque pointer passed through to every callback invocation.
 */
void NumericRangeNode_Traverse(NumericRangeNode *n,
                               void (*callback)(NumericRangeNode *n, void *ctx), void *ctx) {
  callback(n, ctx);

  NumericRangeNode *child = n->left;
  if (child != NULL) {
    NumericRangeNode_Traverse(child, callback, ctx);
  }

  child = n->right;
  if (child != NULL) {
    NumericRangeNode_Traverse(child, callback, ctx);
  }
}
329
/* Free the tree t: first all of its nodes, then the tree object itself. */
void NumericRangeTree_Free(NumericRangeTree *t) {
  NumericRangeNode *root = t->root;
  NumericRangeNode_Free(root);
  rm_free(t);
}
334
NewNumericRangeIterator(const IndexSpec * sp,NumericRange * nr,const NumericFilter * f)335 IndexIterator *NewNumericRangeIterator(const IndexSpec *sp, NumericRange *nr,
336 const NumericFilter *f) {
337
338 // if this range is at either end of the filter, we need to check each record
339 if (NumericFilter_Match(f, nr->minVal) && NumericFilter_Match(f, nr->maxVal)) {
340 // make the filter NULL so the reader will ignore it
341 f = NULL;
342 }
343 IndexReader *ir = NewNumericReader(sp, nr->entries, f);
344
345 return NewReadIterator(ir);
346 }
347
348 /* Create a union iterator from the numeric filter, over all the sub-ranges in the tree that fit
349 * the filter */
createNumericIterator(const IndexSpec * sp,NumericRangeTree * t,const NumericFilter * f)350 IndexIterator *createNumericIterator(const IndexSpec *sp, NumericRangeTree *t,
351 const NumericFilter *f) {
352
353 Vector *v = NumericRangeTree_Find(t, f->min, f->max);
354 if (!v || Vector_Size(v) == 0) {
355 if (v) {
356 Vector_Free(v);
357 }
358 return NULL;
359 }
360
361 int n = Vector_Size(v);
362 // if we only selected one range - we can just iterate it without union or anything
363 if (n == 1) {
364 NumericRange *rng;
365 Vector_Get(v, 0, &rng);
366 IndexIterator *it = NewNumericRangeIterator(sp, rng, f);
367 Vector_Free(v);
368 return it;
369 }
370
371 // We create a union iterator, advancing a union on all the selected range,
372 // treating them as one consecutive range
373 IndexIterator **its = rm_calloc(n, sizeof(IndexIterator *));
374
375 for (size_t i = 0; i < n; i++) {
376 NumericRange *rng;
377 Vector_Get(v, i, &rng);
378 if (!rng) {
379 continue;
380 }
381
382 its[i] = NewNumericRangeIterator(sp, rng, f);
383 }
384 Vector_Free(v);
385
386 IndexIterator *it = NewUnionIterator(its, n, NULL, 1, 1);
387
388 return it;
389 }
390
391 RedisModuleType *NumericIndexType = NULL;
392 #define NUMERICINDEX_KEY_FMT "nm:%s/%s"
393
fmtRedisNumericIndexKey(RedisSearchCtx * ctx,const char * field)394 RedisModuleString *fmtRedisNumericIndexKey(RedisSearchCtx *ctx, const char *field) {
395 return RedisModule_CreateStringPrintf(ctx->redisCtx, NUMERICINDEX_KEY_FMT, ctx->spec->name,
396 field);
397 }
398
openNumericKeysDict(RedisSearchCtx * ctx,RedisModuleString * keyName,int write)399 static NumericRangeTree *openNumericKeysDict(RedisSearchCtx *ctx, RedisModuleString *keyName,
400 int write) {
401 KeysDictValue *kdv = dictFetchValue(ctx->spec->keysDict, keyName);
402 if (kdv) {
403 return kdv->p;
404 }
405 if (!write) {
406 return NULL;
407 }
408 kdv = rm_calloc(1, sizeof(*kdv));
409 kdv->dtor = (void (*)(void *))NumericRangeTree_Free;
410 kdv->p = NewNumericRangeTree();
411 dictAdd(ctx->spec->keysDict, keyName, kdv);
412 return kdv->p;
413 }
414
/* Create an iterator yielding the records of the numeric field flt->fieldName that match the
 * filter flt.
 *
 * The tree is taken from the spec's keys dictionary when the spec has one, and from the
 * Redis keyspace otherwise. When csx is given, the key is registered with the concurrent
 * search context together with NumericRangeIterator_OnReopen, so the iterator is aborted if
 * the tree changes while the query is paused.
 *
 * Returns NULL when the field key cannot be resolved, the key is missing or of the wrong
 * type, there is no tree, or no range matches the filter.
 */
struct indexIterator *NewNumericFilterIterator(RedisSearchCtx *ctx, const NumericFilter *flt,
                                               ConcurrentSearchCtx *csx) {
  RedisModuleString *keyName =
      IndexSpec_GetFormattedKeyByName(ctx->spec, flt->fieldName, INDEXFLD_T_NUMERIC);
  if (keyName == NULL) {
    return NULL;
  }

  RedisModuleKey *key = NULL;
  NumericRangeTree *tree = NULL;
  if (ctx->spec->keysDict) {
    // Read-only lookup: never create a tree on behalf of a query
    tree = openNumericKeysDict(ctx, keyName, 0);
  } else {
    key = RedisModule_OpenKey(ctx->redisCtx, keyName, REDISMODULE_READ);
    if (!key || RedisModule_ModuleTypeGetType(key) != NumericIndexType) {
      return NULL;
    }
    tree = RedisModule_ModuleTypeGetValue(key);
  }

  if (tree == NULL) {
    return NULL;
  }

  IndexIterator *it = createNumericIterator(ctx->spec, tree, flt);
  if (it != NULL && csx != NULL) {
    // Remember the revision we built the iterator against; freed with rm_free by csx
    NumericUnionCtx *reopenCtx = rm_malloc(sizeof(*reopenCtx));
    reopenCtx->lastRevId = tree->revisionId;
    reopenCtx->it = it;
    ConcurrentSearch_AddKey(csx, key, REDISMODULE_READ, keyName, NumericRangeIterator_OnReopen,
                            reopenCtx, rm_free);
  }
  return it;
}
453
OpenNumericIndex(RedisSearchCtx * ctx,RedisModuleString * keyName,RedisModuleKey ** idxKey)454 NumericRangeTree *OpenNumericIndex(RedisSearchCtx *ctx, RedisModuleString *keyName,
455 RedisModuleKey **idxKey) {
456
457 NumericRangeTree *t;
458 if (!ctx->spec->keysDict) {
459 RedisModuleKey *key_s = NULL;
460
461 if (!idxKey) {
462 idxKey = &key_s;
463 }
464
465 *idxKey = RedisModule_OpenKey(ctx->redisCtx, keyName, REDISMODULE_READ | REDISMODULE_WRITE);
466
467 int type = RedisModule_KeyType(*idxKey);
468 if (type != REDISMODULE_KEYTYPE_EMPTY &&
469 RedisModule_ModuleTypeGetType(*idxKey) != NumericIndexType) {
470 return NULL;
471 }
472
473 /* Create an empty value object if the key is currently empty. */
474 if (type == REDISMODULE_KEYTYPE_EMPTY) {
475 t = NewNumericRangeTree();
476 RedisModule_ModuleTypeSetValue((*idxKey), NumericIndexType, t);
477 } else {
478 t = RedisModule_ModuleTypeGetValue(*idxKey);
479 }
480 } else {
481 t = openNumericKeysDict(ctx, keyName, 1);
482 }
483 return t;
484 }
485
/* Traversal callback accumulating the memory usage of node n into the unsigned long that ctx
 * points to: the node itself, plus - when it has a range - the range struct, one double per
 * unit of cardinality, and the inverted index of its entries. */
void __numericIndex_memUsageCallback(NumericRangeNode *n, void *ctx) {
  unsigned long *total = ctx;
  *total += sizeof(NumericRangeNode);

  NumericRange *rng = n->range;
  if (rng == NULL) {
    return;
  }

  *total += sizeof(NumericRange);
  *total += rng->card * sizeof(double);
  if (rng->entries) {
    *total += InvertedIndex_MemUsage(rng->entries);
  }
}
498
NumericIndexType_MemUsage(const void * value)499 unsigned long NumericIndexType_MemUsage(const void *value) {
500 const NumericRangeTree *t = value;
501 unsigned long ret = sizeof(NumericRangeTree);
502 NumericRangeNode_Traverse(t->root, __numericIndex_memUsageCallback, &ret);
503 return ret;
504 }
505
506 #define NUMERIC_INDEX_ENCVER 1
507
NumericIndexType_Register(RedisModuleCtx * ctx)508 int NumericIndexType_Register(RedisModuleCtx *ctx) {
509
510 RedisModuleTypeMethods tm = {.version = REDISMODULE_TYPE_METHOD_VERSION,
511 .rdb_load = NumericIndexType_RdbLoad,
512 .rdb_save = NumericIndexType_RdbSave,
513 .aof_rewrite = GenericAofRewrite_DisabledHandler,
514 .free = NumericIndexType_Free,
515 .mem_usage = (const void *)NumericIndexType_MemUsage};
516
517 NumericIndexType = RedisModule_CreateDataType(ctx, "numericdx", NUMERIC_INDEX_ENCVER, &tm);
518 if (NumericIndexType == NULL) {
519 return REDISMODULE_ERR;
520 }
521
522 return REDISMODULE_OK;
523 }
524
525 /* A single entry in a numeric index's single range. Since entries are binned together, each needs
526 * to have the exact value */
527 typedef struct {
528 t_docId docId;
529 double value;
530 } NumericRangeEntry;
531
cmpdocId(const void * p1,const void * p2)532 static int cmpdocId(const void *p1, const void *p2) {
533 NumericRangeEntry *e1 = (NumericRangeEntry *)p1;
534 NumericRangeEntry *e2 = (NumericRangeEntry *)p2;
535
536 return (int)e1->docId - (int)e2->docId;
537 }
538
539 /** Version 0 stores the number of entries beforehand, and then loads them */
loadV0(RedisModuleIO * rdb,NumericRangeEntry ** entriespp)540 static size_t loadV0(RedisModuleIO *rdb, NumericRangeEntry **entriespp) {
541 uint64_t num = RedisModule_LoadUnsigned(rdb);
542 if (!num) {
543 return 0;
544 }
545
546 *entriespp = array_newlen(NumericRangeEntry, num);
547 NumericRangeEntry *entries = *entriespp;
548 for (size_t ii = 0; ii < num; ++ii) {
549 entries[ii].docId = RedisModule_LoadUnsigned(rdb);
550 entries[ii].value = RedisModule_LoadDouble(rdb);
551 }
552 return num;
553 }
554
555 #define NUMERIC_IDX_INITIAL_LOAD_SIZE 1 << 16
556 /** Version 0 stores (id,value) pairs, with a final 0 as a terminator */
loadV1(RedisModuleIO * rdb,NumericRangeEntry ** entriespp)557 static size_t loadV1(RedisModuleIO *rdb, NumericRangeEntry **entriespp) {
558 NumericRangeEntry *entries = array_new(NumericRangeEntry, NUMERIC_IDX_INITIAL_LOAD_SIZE);
559 while (1) {
560 NumericRangeEntry cur;
561 cur.docId = RedisModule_LoadUnsigned(rdb);
562 if (!cur.docId) {
563 break;
564 }
565 cur.value = RedisModule_LoadDouble(rdb);
566 entries = array_append(entries, cur);
567 }
568 *entriespp = entries;
569 return array_len(entries);
570 }
571
/* RDB load handler of the numeric index type.
 *
 * Reads all (docId, value) entries using the loader matching encver, sorts them by doc id and
 * inserts them one by one into a new tree.
 *
 * Returns the new tree, or NULL when encver is not a known encoding version.
 */
void *NumericIndexType_RdbLoad(RedisModuleIO *rdb, int encver) {
  if (encver > NUMERIC_INDEX_ENCVER) {
    return NULL;
  }

  NumericRangeEntry *entries = NULL;
  size_t numEntries = 0;
  if (encver == 0) {
    numEntries = loadV0(rdb, &entries);
  } else if (encver == 1) {
    numEntries = loadV1(rdb, &entries);
  } else {
    return NULL;  // Unknown version
  }

  // loadV0 leaves `entries` NULL when the stored count is 0. A null pointer is not a valid
  // argument for qsort even with zero elements, so only sort when there is an array.
  if (entries != NULL) {
    // sort the entries by doc id, as they were not saved in this order
    qsort(entries, numEntries, sizeof(NumericRangeEntry), cmpdocId);
  }

  NumericRangeTree *t = NewNumericRangeTree();

  // now push them in order into the tree
  for (size_t i = 0; i < numEntries; i++) {
    NumericRangeTree_Add(t, entries[i].docId, entries[i].value);
  }

  // Same reasoning as above: do not hand a NULL array to array_free
  if (entries != NULL) {
    array_free(entries);
  }
  return t;
}
598
599 struct niRdbSaveCtx {
600 RedisModuleIO *rdb;
601 };
602
/* Traversal callback writing the entries of node n to the RDB stream held by ctx (a
 * struct niRdbSaveCtx). Only leaves that have a range are saved; each entry is written as
 * its doc id followed by its value. */
static void numericIndex_rdbSaveCallback(NumericRangeNode *n, void *ctx) {
  struct niRdbSaveCtx *saveCtx = ctx;

  // Inner nodes and range-less nodes contribute nothing to the dump
  if (!NumericRangeNode_IsLeaf(n) || !n->range) {
    return;
  }

  RSIndexResult *cur = NULL;
  IndexReader *reader = NewNumericReader(NULL, n->range->entries, NULL);
  while (IR_Read(reader, &cur) == INDEXREAD_OK) {
    RedisModule_SaveUnsigned(saveCtx->rdb, cur->docId);
    RedisModule_SaveDouble(saveCtx->rdb, cur->num.value);
  }
  IR_Free(reader);
}
/* RDB save handler of the numeric index type: writes the entries of every leaf range of the
 * tree, followed by a 0 doc id that terminates the stream. */
void NumericIndexType_RdbSave(RedisModuleIO *rdb, void *value) {
  NumericRangeTree *tree = value;
  struct niRdbSaveCtx saveCtx = {rdb};

  NumericRangeNode_Traverse(tree->root, numericIndex_rdbSaveCallback, &saveCtx);

  // Final record: the zero doc id the loader treats as end-of-data
  RedisModule_SaveUnsigned(rdb, 0);
}
627
/* Digest handler of the numeric index type. Intentionally a no-op: nothing is added to the
 * digest. */
void NumericIndexType_Digest(RedisModuleDigest *digest, void *value) {
  (void)digest;
  (void)value;
}
630
NumericIndexType_Free(void * value)631 void NumericIndexType_Free(void *value) {
632 NumericRangeTree *t = value;
633 NumericRangeTree_Free(t);
634 }
635
NumericRangeTreeIterator_New(NumericRangeTree * t)636 NumericRangeTreeIterator *NumericRangeTreeIterator_New(NumericRangeTree *t) {
637 #define NODE_STACK_INITIAL_SIZE 4
638 NumericRangeTreeIterator *iter = rm_malloc(sizeof(NumericRangeTreeIterator));
639 iter->nodesStack = array_new(NumericRangeNode *, NODE_STACK_INITIAL_SIZE);
640 array_append(iter->nodesStack, t->root);
641 return iter;
642 }
643
NumericRangeTreeIterator_Next(NumericRangeTreeIterator * iter)644 NumericRangeNode *NumericRangeTreeIterator_Next(NumericRangeTreeIterator *iter) {
645 if (array_len(iter->nodesStack) == 0) {
646 return NULL;
647 }
648 NumericRangeNode *ret = array_pop(iter->nodesStack);
649 if (!NumericRangeNode_IsLeaf(ret)) {
650 iter->nodesStack = array_append(iter->nodesStack, ret->left);
651 iter->nodesStack = array_append(iter->nodesStack, ret->right);
652 }
653
654 return ret;
655 }
656
/* Release the iterator: its node stack first, then the iterator itself. The tree nodes the
 * stack pointed to are not freed. */
void NumericRangeTreeIterator_Free(NumericRangeTreeIterator *iter) {
  NumericRangeNode **stack = iter->nodesStack;
  array_free(stack);
  rm_free(iter);
}
661