1 #include "numeric_index.h"
2 #include "redis_index.h"
3 #include "sys/param.h"
4 #include "rmutil/vector.h"
5 #include "rmutil/util.h"
6 #include "index.h"
7 #include "util/arr.h"
8 #include <math.h>
9 #include "redismodule.h"
10 #include "util/misc.h"
11 //#include "tests/time_sample.h"
12 #define NR_EXPONENT 4
13 #define NR_MAXRANGE_CARD 2500
14 #define NR_MAXRANGE_SIZE 10000
15 #define NR_MAX_DEPTH 2
16 
17 typedef struct {
18   IndexIterator *it;
19   uint32_t lastRevId;
20 } NumericUnionCtx;
21 
22 /* A callback called after a concurrent context regains execution context. When this happen we need
23  * to make sure the key hasn't been deleted or its structure changed, which will render the
24  * underlying iterators invalid */
NumericRangeIterator_OnReopen(RedisModuleKey * k,void * privdata)25 void NumericRangeIterator_OnReopen(RedisModuleKey *k, void *privdata) {
26   NumericUnionCtx *nu = privdata;
27   NumericRangeTree *t = RedisModule_ModuleTypeGetValue(k);
28 
29   /* If the key has been deleted we'll get a NULL heere, so we just mark ourselves as EOF
30    * We simply abort the root iterator which is either a union of many ranges or a single range
31    *
32    * If the numeric range tree has chained (split, nodes deleted, etc) since we last closed it,
33    * We cannot continue iterating it, since the underlying pointers might be screwed.
34    * For now we will just stop processing this query. This causes the query to return bad results,
35    * so in the future we can try an reset the state here
36    */
37   if (k == NULL || t == NULL || t->revisionId != nu->lastRevId) {
38     nu->it->Abort(nu->it->ctx);
39   }
40 }
41 
42 /* Returns 1 if the entire numeric range is contained between min and max */
NumericRange_Contained(NumericRange * n,double min,double max)43 static inline int NumericRange_Contained(NumericRange *n, double min, double max) {
44   if (!n) return 0;
45   int rc = (n->minVal >= min && n->maxVal <= max);
46 
47   // printf("range %f..%f, min %f max %f, WITHIN? %d\n", n->minVal, n->maxVal, min, max, rc);
48   return rc;
49 }
50 
51 /* Returns 1 if min and max are both inside the range. this is the opposite of _Within */
NumericRange_Contains(NumericRange * n,double min,double max)52 static inline int NumericRange_Contains(NumericRange *n, double min, double max) {
53   if (!n) return 0;
54   int rc = (n->minVal <= min && n->maxVal > max);
55   // printf("range %f..%f, min %f max %f, contains? %d\n", n->minVal, n->maxVal, min, max, rc);
56   return rc;
57 }
58 
59 /* Returns 1 if there is any overlap between the range and min/max */
NumericRange_Overlaps(NumericRange * n,double min,double max)60 int NumericRange_Overlaps(NumericRange *n, double min, double max) {
61   if (!n) return 0;
62   int rc = (min >= n->minVal && min <= n->maxVal) || (max >= n->minVal && max <= n->maxVal);
63   // printf("range %f..%f, min %f max %f, overlaps? %d\n", n->minVal, n->maxVal, min, max, rc);
64   return rc;
65 }
66 
NumericRange_Add(NumericRange * n,t_docId docId,double value,int checkCard)67 size_t NumericRange_Add(NumericRange *n, t_docId docId, double value, int checkCard) {
68 
69   int add = 0;
70   if (checkCard) {
71     add = 1;
72     size_t card = n->card;
73     for (int i = 0; i < array_len(n->values); i++) {
74 
75       if (n->values[i].value == value) {
76         add = 0;
77         n->values[i].appearances++;
78         break;
79       }
80     }
81   }
82   if (n->minVal == NF_NEGATIVE_INFINITY || value < n->minVal) n->minVal = value;
83   if (n->maxVal == NF_INFINITY || value > n->maxVal) n->maxVal = value;
84   if (add) {
85     if (n->card < n->splitCard) {
86       CardinalityValue val = {.value = value, .appearances = 1};
87       n->values = array_append(n->values, val);
88       n->unique_sum += value;
89     }
90     ++n->card;
91   }
92 
93   return InvertedIndex_WriteNumericEntry(n->entries, docId, value);
94 }
95 
NumericRange_Split(NumericRange * n,NumericRangeNode ** lp,NumericRangeNode ** rp)96 double NumericRange_Split(NumericRange *n, NumericRangeNode **lp, NumericRangeNode **rp) {
97 
98   double split = (n->unique_sum) / (double)n->card;
99 
100   // printf("split point :%f\n", split);
101   *lp = NewLeafNode(n->entries->numDocs / 2 + 1, n->minVal, split,
102                     MIN(NR_MAXRANGE_CARD, 1 + n->splitCard * NR_EXPONENT));
103   *rp = NewLeafNode(n->entries->numDocs / 2 + 1, split, n->maxVal,
104                     MIN(NR_MAXRANGE_CARD, 1 + n->splitCard * NR_EXPONENT));
105 
106   RSIndexResult *res = NULL;
107   IndexReader *ir = NewNumericReader(NULL, n->entries, NULL);
108   while (INDEXREAD_OK == IR_Read(ir, &res)) {
109     NumericRange_Add(res->num.value < split ? (*lp)->range : (*rp)->range, res->docId,
110                      res->num.value, 1);
111   }
112   IR_Free(ir);
113 
114   // printf("Splitting node %p %f..%f, card %d size %d\n", n, n->minVal, n->maxVal, n->card,
115   //        n->entries->numDocs);
116   // printf("left node: %d, right: %d\n", (*lp)->range->entries->numDocs,
117   //        (*rp)->range->entries->numDocs);
118   return split;
119 }
120 
NewLeafNode(size_t cap,double min,double max,size_t splitCard)121 NumericRangeNode *NewLeafNode(size_t cap, double min, double max, size_t splitCard) {
122 
123   NumericRangeNode *n = rm_malloc(sizeof(NumericRangeNode));
124   n->left = NULL;
125   n->right = NULL;
126   n->value = 0;
127 
128   n->maxDepth = 0;
129   n->range = rm_malloc(sizeof(NumericRange));
130 
131   *n->range = (NumericRange){.minVal = min,
132                              .maxVal = max,
133                              .unique_sum = 0,
134                              .card = 0,
135                              .splitCard = splitCard,
136                              .values = array_new(CardinalityValue, 1),
137                              //.values = rm_calloc(splitCard, sizeof(CardinalityValue)),
138                              .entries = NewInvertedIndex(Index_StoreNumeric, 1)};
139   return n;
140 }
141 
NumericRangeNode_Add(NumericRangeNode * n,t_docId docId,double value)142 NRN_AddRv NumericRangeNode_Add(NumericRangeNode *n, t_docId docId, double value) {
143   NRN_AddRv rv = {.sz = 0, .changed = 0};
144   if (!NumericRangeNode_IsLeaf(n)) {
145     // if this node has already split but retains a range, just add to the range without checking
146     // anything
147     if (n->range) {
148       // Not leaf so should not add??
149       /* sz += */ NumericRange_Add(n->range, docId, value, 0);
150       // should continue after??
151     }
152 
153     // recursively add to its left or right child.
154     NumericRangeNode **childP = value < n->value ? &n->left : &n->right;
155     NumericRangeNode *child = *childP;
156     // if the child has split we get 1 in return
157     rv = NumericRangeNode_Add(child, docId, value);
158 
159     if (rv.changed) {
160       // if there was a split it means our max depth has increased.
161       // we are too deep - we don't retain this node's range anymore.
162       // this keeps memory footprint in check
163       if (++n->maxDepth > NR_MAX_DEPTH && n->range) {
164         InvertedIndex_Free(n->range->entries);
165         array_free(n->range->values);
166         rm_free(n->range);
167         n->range = NULL;
168       }
169 
170       // check if we need to rebalance the child.
171       // To ease the rebalance we don't rebalance the root
172       // nor do we rebalance nodes that are with ranges (n->maxDepth > NR_MAX_DEPTH)
173       if ((child->right->maxDepth - child->left->maxDepth) > NR_MAX_DEPTH) {  // role to the left
174         NumericRangeNode *right = child->right;
175         child->right = right->left;
176         right->left = child;
177         --child->maxDepth;
178         *childP = right;  // replace the child with the new child
179       } else if ((child->left->maxDepth - child->right->maxDepth) >
180                  NR_MAX_DEPTH) {  // role to the right
181         NumericRangeNode *left = child->left;
182         child->left = left->right;
183         left->right = child;
184         --child->maxDepth;
185         *childP = left;  // replace the child with the new child
186       }
187     }
188     // return 1 or 0 to our called, so this is done recursively
189     return rv;
190   }
191 
192   // if this node is a leaf - we add AND check the cardinality. We only split leaf nodes
193   rv.sz = NumericRange_Add(n->range, docId, value, 1);
194   int card = n->range->card;
195   // printf("Added %d %f to node %f..%f, card now %zd, size now %zd\n", docId, value,
196   // n->range->minVal,
197   //        n->range->maxVal, card, n->range->entries->numDocs);
198   if (card >= n->range->splitCard ||
199       (n->range->entries->numDocs > NR_MAXRANGE_SIZE && card > 1)) {
200 
201     // split this node but don't delete its range
202     double split = NumericRange_Split(n->range, &n->left, &n->right);
203 
204     n->value = split;
205 
206     n->maxDepth = 1;
207     rv.changed = 1;
208   }
209 
210   return rv;
211 }
212 
213 /* Recursively add a node's children to the range. */
__recursiveAddRange(Vector * v,NumericRangeNode * n,double min,double max)214 void __recursiveAddRange(Vector *v, NumericRangeNode *n, double min, double max) {
215   if (!n) return;
216 
217   if (n->range) {
218     // printf("min %f, max %f, range %f..%f, contained? %d, overlaps? %d, leaf? %d\n", min, max,
219     //        n->range->minVal, n->range->maxVal, NumericRange_Contained(n->range, min, max),
220     //        NumericRange_Overlaps(n->range, min, max), __isLeaf(n));
221     // if the range is completely contained in the search, we can just add it and not inspect any
222     // downwards
223     if (NumericRange_Contained(n->range, min, max)) {
224       Vector_Push(v, n->range);
225       return;
226     }
227     // No overlap at all - no need to do anything
228     if (!NumericRange_Overlaps(n->range, min, max)) {
229       return;
230     }
231   }
232 
233   // for non leaf nodes - we try to descend into their children
234   if (!NumericRangeNode_IsLeaf(n)) {
235     __recursiveAddRange(v, n->left, min, max);
236     __recursiveAddRange(v, n->right, min, max);
237   } else if (NumericRange_Overlaps(n->range, min, max)) {
238     Vector_Push(v, n->range);
239     return;
240   }
241 }
242 
243 /* Find the numeric ranges that fit the range we are looking for. We try to minimize the number of
244  * nodes we'll later need to union */
NumericRangeNode_FindRange(NumericRangeNode * n,double min,double max)245 Vector *NumericRangeNode_FindRange(NumericRangeNode *n, double min, double max) {
246 
247   Vector *leaves = NewVector(NumericRange *, 8);
248   __recursiveAddRange(leaves, n, min, max);
249   // printf("Found %zd ranges for %f...%f\n", leaves->top, min, max);
250   // for (int i = 0; i < leaves->top; i++) {
251   //   NumericRange *rng;
252   //   Vector_Get(leaves, i, &rng);
253   //   printf("%f...%f (%f). %d card, %d splitCard\n", rng->minVal, rng->maxVal,
254   //          rng->maxVal - rng->minVal, rng->entries->numDocs, rng->splitCard);
255   // }
256 
257   return leaves;
258 }
259 
NumericRangeNode_Free(NumericRangeNode * n)260 void NumericRangeNode_Free(NumericRangeNode *n) {
261   if (!n) return;
262   if (n->range) {
263     InvertedIndex_Free(n->range->entries);
264     array_free(n->range->values);
265     rm_free(n->range);
266     n->range = NULL;
267   }
268 
269   NumericRangeNode_Free(n->left);
270   NumericRangeNode_Free(n->right);
271 
272   rm_free(n);
273 }
274 
275 uint16_t numericTreesUniqueId = 0;
276 
277 /* Create a new numeric range tree */
NewNumericRangeTree()278 NumericRangeTree *NewNumericRangeTree() {
279 #define GC_NODES_INITIAL_SIZE 10
280   NumericRangeTree *ret = rm_malloc(sizeof(NumericRangeTree));
281 
282   ret->root = NewLeafNode(2, NF_NEGATIVE_INFINITY, NF_INFINITY, 2);
283   ret->numEntries = 0;
284   ret->numRanges = 1;
285   ret->revisionId = 0;
286   ret->lastDocId = 0;
287   ret->uniqueId = numericTreesUniqueId++;
288   return ret;
289 }
290 
NumericRangeTree_Add(NumericRangeTree * t,t_docId docId,double value)291 int NumericRangeTree_Add(NumericRangeTree *t, t_docId docId, double value) {
292 
293   // Do not allow duplicate entries. This might happen due to indexer bugs and we need to protect
294   // from it
295   if (docId <= t->lastDocId) {
296     return 0;
297   }
298   t->lastDocId = docId;
299 
300   NRN_AddRv rv = NumericRangeNode_Add(t->root, docId, value);
301   // rc != 0 means the tree nodes have changed, and concurrent iteration is not allowed now
302   // we increment the revision id of the tree, so currently running query iterators on it
303   // will abort the next time they get execution context
304   if (rv.changed) {
305     t->revisionId++;
306   }
307   t->numRanges += rv.changed;
308   t->numEntries++;
309 
310   return rv.sz;
311 }
312 
NumericRangeTree_Find(NumericRangeTree * t,double min,double max)313 Vector *NumericRangeTree_Find(NumericRangeTree *t, double min, double max) {
314   return NumericRangeNode_FindRange(t->root, min, max);
315 }
316 
NumericRangeNode_Traverse(NumericRangeNode * n,void (* callback)(NumericRangeNode * n,void * ctx),void * ctx)317 void NumericRangeNode_Traverse(NumericRangeNode *n,
318                                void (*callback)(NumericRangeNode *n, void *ctx), void *ctx) {
319 
320   callback(n, ctx);
321 
322   if (n->left) {
323     NumericRangeNode_Traverse(n->left, callback, ctx);
324   }
325   if (n->right) {
326     NumericRangeNode_Traverse(n->right, callback, ctx);
327   }
328 }
329 
NumericRangeTree_Free(NumericRangeTree * t)330 void NumericRangeTree_Free(NumericRangeTree *t) {
331   NumericRangeNode_Free(t->root);
332   rm_free(t);
333 }
334 
NewNumericRangeIterator(const IndexSpec * sp,NumericRange * nr,const NumericFilter * f)335 IndexIterator *NewNumericRangeIterator(const IndexSpec *sp, NumericRange *nr,
336                                        const NumericFilter *f) {
337 
338   // if this range is at either end of the filter, we need to check each record
339   if (NumericFilter_Match(f, nr->minVal) && NumericFilter_Match(f, nr->maxVal)) {
340     // make the filter NULL so the reader will ignore it
341     f = NULL;
342   }
343   IndexReader *ir = NewNumericReader(sp, nr->entries, f);
344 
345   return NewReadIterator(ir);
346 }
347 
348 /* Create a union iterator from the numeric filter, over all the sub-ranges in the tree that fit
349  * the filter */
createNumericIterator(const IndexSpec * sp,NumericRangeTree * t,const NumericFilter * f)350 IndexIterator *createNumericIterator(const IndexSpec *sp, NumericRangeTree *t,
351                                      const NumericFilter *f) {
352 
353   Vector *v = NumericRangeTree_Find(t, f->min, f->max);
354   if (!v || Vector_Size(v) == 0) {
355     if (v) {
356       Vector_Free(v);
357     }
358     return NULL;
359   }
360 
361   int n = Vector_Size(v);
362   // if we only selected one range - we can just iterate it without union or anything
363   if (n == 1) {
364     NumericRange *rng;
365     Vector_Get(v, 0, &rng);
366     IndexIterator *it = NewNumericRangeIterator(sp, rng, f);
367     Vector_Free(v);
368     return it;
369   }
370 
371   // We create a  union iterator, advancing a union on all the selected range,
372   // treating them as one consecutive range
373   IndexIterator **its = rm_calloc(n, sizeof(IndexIterator *));
374 
375   for (size_t i = 0; i < n; i++) {
376     NumericRange *rng;
377     Vector_Get(v, i, &rng);
378     if (!rng) {
379       continue;
380     }
381 
382     its[i] = NewNumericRangeIterator(sp, rng, f);
383   }
384   Vector_Free(v);
385 
386   IndexIterator *it = NewUnionIterator(its, n, NULL, 1, 1);
387 
388   return it;
389 }
390 
391 RedisModuleType *NumericIndexType = NULL;
392 #define NUMERICINDEX_KEY_FMT "nm:%s/%s"
393 
fmtRedisNumericIndexKey(RedisSearchCtx * ctx,const char * field)394 RedisModuleString *fmtRedisNumericIndexKey(RedisSearchCtx *ctx, const char *field) {
395   return RedisModule_CreateStringPrintf(ctx->redisCtx, NUMERICINDEX_KEY_FMT, ctx->spec->name,
396                                         field);
397 }
398 
openNumericKeysDict(RedisSearchCtx * ctx,RedisModuleString * keyName,int write)399 static NumericRangeTree *openNumericKeysDict(RedisSearchCtx *ctx, RedisModuleString *keyName,
400                                              int write) {
401   KeysDictValue *kdv = dictFetchValue(ctx->spec->keysDict, keyName);
402   if (kdv) {
403     return kdv->p;
404   }
405   if (!write) {
406     return NULL;
407   }
408   kdv = rm_calloc(1, sizeof(*kdv));
409   kdv->dtor = (void (*)(void *))NumericRangeTree_Free;
410   kdv->p = NewNumericRangeTree();
411   dictAdd(ctx->spec->keysDict, keyName, kdv);
412   return kdv->p;
413 }
414 
NewNumericFilterIterator(RedisSearchCtx * ctx,const NumericFilter * flt,ConcurrentSearchCtx * csx)415 struct indexIterator *NewNumericFilterIterator(RedisSearchCtx *ctx, const NumericFilter *flt,
416                                                ConcurrentSearchCtx *csx) {
417   RedisModuleString *s =
418       IndexSpec_GetFormattedKeyByName(ctx->spec, flt->fieldName, INDEXFLD_T_NUMERIC);
419   if (!s) {
420     return NULL;
421   }
422   RedisModuleKey *key = NULL;
423   NumericRangeTree *t = NULL;
424   if (!ctx->spec->keysDict) {
425     key = RedisModule_OpenKey(ctx->redisCtx, s, REDISMODULE_READ);
426     if (!key || RedisModule_ModuleTypeGetType(key) != NumericIndexType) {
427       return NULL;
428     }
429 
430     t = RedisModule_ModuleTypeGetValue(key);
431   } else {
432     t = openNumericKeysDict(ctx, s, 0);
433   }
434 
435   if (!t) {
436     return NULL;
437   }
438 
439   IndexIterator *it = createNumericIterator(ctx->spec, t, flt);
440   if (!it) {
441     return NULL;
442   }
443 
444   if (csx) {
445     NumericUnionCtx *uc = rm_malloc(sizeof(*uc));
446     uc->lastRevId = t->revisionId;
447     uc->it = it;
448     ConcurrentSearch_AddKey(csx, key, REDISMODULE_READ, s, NumericRangeIterator_OnReopen, uc,
449                             rm_free);
450   }
451   return it;
452 }
453 
OpenNumericIndex(RedisSearchCtx * ctx,RedisModuleString * keyName,RedisModuleKey ** idxKey)454 NumericRangeTree *OpenNumericIndex(RedisSearchCtx *ctx, RedisModuleString *keyName,
455                                    RedisModuleKey **idxKey) {
456 
457   NumericRangeTree *t;
458   if (!ctx->spec->keysDict) {
459     RedisModuleKey *key_s = NULL;
460 
461     if (!idxKey) {
462       idxKey = &key_s;
463     }
464 
465     *idxKey = RedisModule_OpenKey(ctx->redisCtx, keyName, REDISMODULE_READ | REDISMODULE_WRITE);
466 
467     int type = RedisModule_KeyType(*idxKey);
468     if (type != REDISMODULE_KEYTYPE_EMPTY &&
469         RedisModule_ModuleTypeGetType(*idxKey) != NumericIndexType) {
470       return NULL;
471     }
472 
473     /* Create an empty value object if the key is currently empty. */
474     if (type == REDISMODULE_KEYTYPE_EMPTY) {
475       t = NewNumericRangeTree();
476       RedisModule_ModuleTypeSetValue((*idxKey), NumericIndexType, t);
477     } else {
478       t = RedisModule_ModuleTypeGetValue(*idxKey);
479     }
480   } else {
481     t = openNumericKeysDict(ctx, keyName, 1);
482   }
483   return t;
484 }
485 
__numericIndex_memUsageCallback(NumericRangeNode * n,void * ctx)486 void __numericIndex_memUsageCallback(NumericRangeNode *n, void *ctx) {
487   unsigned long *sz = ctx;
488   *sz += sizeof(NumericRangeNode);
489 
490   if (n->range) {
491     *sz += sizeof(NumericRange);
492     *sz += n->range->card * sizeof(double);
493     if (n->range->entries) {
494       *sz += InvertedIndex_MemUsage(n->range->entries);
495     }
496   }
497 }
498 
NumericIndexType_MemUsage(const void * value)499 unsigned long NumericIndexType_MemUsage(const void *value) {
500   const NumericRangeTree *t = value;
501   unsigned long ret = sizeof(NumericRangeTree);
502   NumericRangeNode_Traverse(t->root, __numericIndex_memUsageCallback, &ret);
503   return ret;
504 }
505 
506 #define NUMERIC_INDEX_ENCVER 1
507 
NumericIndexType_Register(RedisModuleCtx * ctx)508 int NumericIndexType_Register(RedisModuleCtx *ctx) {
509 
510   RedisModuleTypeMethods tm = {.version = REDISMODULE_TYPE_METHOD_VERSION,
511                                .rdb_load = NumericIndexType_RdbLoad,
512                                .rdb_save = NumericIndexType_RdbSave,
513                                .aof_rewrite = GenericAofRewrite_DisabledHandler,
514                                .free = NumericIndexType_Free,
515                                .mem_usage = (const void *)NumericIndexType_MemUsage};
516 
517   NumericIndexType = RedisModule_CreateDataType(ctx, "numericdx", NUMERIC_INDEX_ENCVER, &tm);
518   if (NumericIndexType == NULL) {
519     return REDISMODULE_ERR;
520   }
521 
522   return REDISMODULE_OK;
523 }
524 
525 /* A single entry in a numeric index's single range. Since entries are binned together, each needs
526  * to have the exact value */
527 typedef struct {
528   t_docId docId;
529   double value;
530 } NumericRangeEntry;
531 
cmpdocId(const void * p1,const void * p2)532 static int cmpdocId(const void *p1, const void *p2) {
533   NumericRangeEntry *e1 = (NumericRangeEntry *)p1;
534   NumericRangeEntry *e2 = (NumericRangeEntry *)p2;
535 
536   return (int)e1->docId - (int)e2->docId;
537 }
538 
539 /** Version 0 stores the number of entries beforehand, and then loads them */
loadV0(RedisModuleIO * rdb,NumericRangeEntry ** entriespp)540 static size_t loadV0(RedisModuleIO *rdb, NumericRangeEntry **entriespp) {
541   uint64_t num = RedisModule_LoadUnsigned(rdb);
542   if (!num) {
543     return 0;
544   }
545 
546   *entriespp = array_newlen(NumericRangeEntry, num);
547   NumericRangeEntry *entries = *entriespp;
548   for (size_t ii = 0; ii < num; ++ii) {
549     entries[ii].docId = RedisModule_LoadUnsigned(rdb);
550     entries[ii].value = RedisModule_LoadDouble(rdb);
551   }
552   return num;
553 }
554 
555 #define NUMERIC_IDX_INITIAL_LOAD_SIZE 1 << 16
556 /** Version 0 stores (id,value) pairs, with a final 0 as a terminator */
loadV1(RedisModuleIO * rdb,NumericRangeEntry ** entriespp)557 static size_t loadV1(RedisModuleIO *rdb, NumericRangeEntry **entriespp) {
558   NumericRangeEntry *entries = array_new(NumericRangeEntry, NUMERIC_IDX_INITIAL_LOAD_SIZE);
559   while (1) {
560     NumericRangeEntry cur;
561     cur.docId = RedisModule_LoadUnsigned(rdb);
562     if (!cur.docId) {
563       break;
564     }
565     cur.value = RedisModule_LoadDouble(rdb);
566     entries = array_append(entries, cur);
567   }
568   *entriespp = entries;
569   return array_len(entries);
570 }
571 
NumericIndexType_RdbLoad(RedisModuleIO * rdb,int encver)572 void *NumericIndexType_RdbLoad(RedisModuleIO *rdb, int encver) {
573   if (encver > NUMERIC_INDEX_ENCVER) {
574     return NULL;
575   }
576 
577   NumericRangeEntry *entries = NULL;
578   size_t numEntries = 0;
579   if (encver == 0) {
580     numEntries = loadV0(rdb, &entries);
581   } else if (encver == 1) {
582     numEntries = loadV1(rdb, &entries);
583   } else {
584     return NULL;  // Unknown version
585   }
586 
587   // sort the entries by doc id, as they were not saved in this order
588   qsort(entries, numEntries, sizeof(NumericRangeEntry), cmpdocId);
589   NumericRangeTree *t = NewNumericRangeTree();
590 
591   // now push them in order into the tree
592   for (size_t i = 0; i < numEntries; i++) {
593     NumericRangeTree_Add(t, entries[i].docId, entries[i].value);
594   }
595   array_free(entries);
596   return t;
597 }
598 
599 struct niRdbSaveCtx {
600   RedisModuleIO *rdb;
601 };
602 
numericIndex_rdbSaveCallback(NumericRangeNode * n,void * ctx)603 static void numericIndex_rdbSaveCallback(NumericRangeNode *n, void *ctx) {
604   struct niRdbSaveCtx *rctx = ctx;
605 
606   if (NumericRangeNode_IsLeaf(n) && n->range) {
607     NumericRange *rng = n->range;
608     RSIndexResult *res = NULL;
609     IndexReader *ir = NewNumericReader(NULL, rng->entries, NULL);
610 
611     while (INDEXREAD_OK == IR_Read(ir, &res)) {
612       RedisModule_SaveUnsigned(rctx->rdb, res->docId);
613       RedisModule_SaveDouble(rctx->rdb, res->num.value);
614     }
615     IR_Free(ir);
616   }
617 }
NumericIndexType_RdbSave(RedisModuleIO * rdb,void * value)618 void NumericIndexType_RdbSave(RedisModuleIO *rdb, void *value) {
619 
620   NumericRangeTree *t = value;
621   struct niRdbSaveCtx ctx = {rdb};
622 
623   NumericRangeNode_Traverse(t->root, numericIndex_rdbSaveCallback, &ctx);
624   // Save the final record
625   RedisModule_SaveUnsigned(rdb, 0);
626 }
627 
NumericIndexType_Digest(RedisModuleDigest * digest,void * value)628 void NumericIndexType_Digest(RedisModuleDigest *digest, void *value) {
629 }
630 
NumericIndexType_Free(void * value)631 void NumericIndexType_Free(void *value) {
632   NumericRangeTree *t = value;
633   NumericRangeTree_Free(t);
634 }
635 
NumericRangeTreeIterator_New(NumericRangeTree * t)636 NumericRangeTreeIterator *NumericRangeTreeIterator_New(NumericRangeTree *t) {
637 #define NODE_STACK_INITIAL_SIZE 4
638   NumericRangeTreeIterator *iter = rm_malloc(sizeof(NumericRangeTreeIterator));
639   iter->nodesStack = array_new(NumericRangeNode *, NODE_STACK_INITIAL_SIZE);
640   array_append(iter->nodesStack, t->root);
641   return iter;
642 }
643 
NumericRangeTreeIterator_Next(NumericRangeTreeIterator * iter)644 NumericRangeNode *NumericRangeTreeIterator_Next(NumericRangeTreeIterator *iter) {
645   if (array_len(iter->nodesStack) == 0) {
646     return NULL;
647   }
648   NumericRangeNode *ret = array_pop(iter->nodesStack);
649   if (!NumericRangeNode_IsLeaf(ret)) {
650     iter->nodesStack = array_append(iter->nodesStack, ret->left);
651     iter->nodesStack = array_append(iter->nodesStack, ret->right);
652   }
653 
654   return ret;
655 }
656 
NumericRangeTreeIterator_Free(NumericRangeTreeIterator * iter)657 void NumericRangeTreeIterator_Free(NumericRangeTreeIterator *iter) {
658   array_free(iter->nodesStack);
659   rm_free(iter);
660 }
661