1 #include "numeric_index.h"
2 #include "redis_index.h"
3 #include "sys/param.h"
4 #include "rmutil/vector.h"
5 #include "rmutil/util.h"
6 #include "index.h"
7 #include "util/arr.h"
8 #include <math.h>
9 #include "redismodule.h"
10 #include "util/misc.h"
11 //#include "tests/time_sample.h"
/* Multiplier applied to a range's splitCard when computing the splitCard of the
 * two children created by NumericRange_Split. */
12 #define NR_EXPONENT 4
/* Upper bound for the splitCard handed to children created by NumericRange_Split. */
13 #define NR_MAXRANGE_CARD 2500
/* A leaf whose inverted index holds more documents than this is split by
 * NumericRangeNode_Add even below its splitCard, provided its cardinality
 * counter is above 1. */
14 #define NR_MAXRANGE_SIZE 10000
15 
16 typedef struct {
17   IndexIterator *it;
18   uint32_t lastRevId;
19 } NumericUnionCtx;
20 
21 /* A callback called after a concurrent context regains execution context. When this happen we need
22  * to make sure the key hasn't been deleted or its structure changed, which will render the
23  * underlying iterators invalid */
NumericRangeIterator_OnReopen(void * privdata)24 void NumericRangeIterator_OnReopen(void *privdata) {
25 }
26 
27 /* Returns 1 if the entire numeric range is contained between min and max */
NumericRange_Contained(NumericRange * n,double min,double max)28 static inline int NumericRange_Contained(NumericRange *n, double min, double max) {
29   if (!n) return 0;
30   int rc = (n->minVal >= min && n->maxVal <= max);
31 
32   // printf("range %f..%f, min %f max %f, WITHIN? %d\n", n->minVal, n->maxVal, min, max, rc);
33   return rc;
34 }
35 
36 /* Returns 1 if min and max are both inside the range. this is the opposite of _Within */
NumericRange_Contains(NumericRange * n,double min,double max)37 static inline int NumericRange_Contains(NumericRange *n, double min, double max) {
38   if (!n) return 0;
39   int rc = (n->minVal <= min && n->maxVal > max);
40   // printf("range %f..%f, min %f max %f, contains? %d\n", n->minVal, n->maxVal, min, max, rc);
41   return rc;
42 }
43 
44 /* Returns 1 if there is any overlap between the range and min/max */
NumericRange_Overlaps(NumericRange * n,double min,double max)45 int NumericRange_Overlaps(NumericRange *n, double min, double max) {
46   if (!n) return 0;
47   int rc = (min >= n->minVal && min <= n->maxVal) || (max >= n->minVal && max <= n->maxVal);
48   // printf("range %f..%f, min %f max %f, overlaps? %d\n", n->minVal, n->maxVal, min, max, rc);
49   return rc;
50 }
51 
/* Appends (docId, value) to the range's inverted index and updates the range's
 * bookkeeping.
 *
 *  n         - range to add to
 *  docId     - document id written to the inverted index
 *  value     - numeric value stored for the document
 *  checkCard - when non-zero, cardinality is tracked: if `value` is already in
 *              n->values its appearance counter is bumped; otherwise n->card is
 *              incremented and, while n->card is still below n->splitCard, the
 *              value is appended to n->values and added to n->unique_sum.
 *
 * n->minVal / n->maxVal are moved to `value` when they still hold their infinite
 * sentinels or when `value` falls outside them.
 *
 * Returns the value returned by InvertedIndex_WriteNumericEntry, which is also
 * accumulated into n->invertedIndexSize. */
size_t NumericRange_Add(NumericRange *n, t_docId docId, double value, int checkCard) {
  int add = 0;
  if (checkCard) {
    add = 1;
    // n->values is not modified inside the loop, so its length can be read once
    size_t numValues = array_len(n->values);
    for (size_t i = 0; i < numValues; i++) {
      if (n->values[i].value == value) {
        add = 0;
        n->values[i].appearances++;
        break;
      }
    }
  }

  if (n->minVal == NF_NEGATIVE_INFINITY || value < n->minVal) n->minVal = value;
  if (n->maxVal == NF_INFINITY || value > n->maxVal) n->maxVal = value;

  if (add) {
    // the value list stops growing once splitCard is reached; card keeps counting
    if (n->card < n->splitCard) {
      CardinalityValue val = {.value = value, .appearances = 1};
      n->values = array_append(n->values, val);
      n->unique_sum += value;
    }
    ++n->card;
  }

  size_t size = InvertedIndex_WriteNumericEntry(n->entries, docId, value);
  n->invertedIndexSize += size;
  return size;
}
82 
/* Splits range n into two freshly allocated leaf nodes.
 *
 * The split point is n->unique_sum / n->card. Every entry of n->entries is read
 * back and re-added (with cardinality checking) to the left leaf when its value
 * is below the split point, and to the right leaf otherwise. n itself is left
 * untouched.
 *
 *  lp, rp - receive the new left and right leaves
 *  rv     - rv->sz grows by the size reported for each re-added entry and
 *           rv->numRecords by one per entry
 *
 * Returns the split point. */
double NumericRange_Split(NumericRange *n, NumericRangeNode **lp, NumericRangeNode **rp,
                          NRN_AddRv *rv) {
  const double split = (n->unique_sum) / (double)n->card;

  // both children are created with the same capacity hint and split cardinality
  const size_t childCap = n->entries->numDocs / 2 + 1;
  const size_t childSplitCard = MIN(NR_MAXRANGE_CARD, 1 + n->splitCard * NR_EXPONENT);

  *lp = NewLeafNode(childCap, n->minVal, split, childSplitCard);
  *rp = NewLeafNode(childCap, split, n->maxVal, childSplitCard);

  RSIndexResult *res = NULL;
  IndexReader *ir = NewNumericReader(NULL, n->entries, NULL, 0, 0);
  for (;;) {
    if (IR_Read(ir, &res) != INDEXREAD_OK) {
      break;
    }
    NumericRange *target = (res->num.value < split) ? (*lp)->range : (*rp)->range;
    rv->sz += NumericRange_Add(target, res->docId, res->num.value, 1);
    ++rv->numRecords;
  }
  IR_Free(ir);

  return split;
}
109 
NewLeafNode(size_t cap,double min,double max,size_t splitCard)110 NumericRangeNode *NewLeafNode(size_t cap, double min, double max, size_t splitCard) {
111 
112   NumericRangeNode *n = rm_malloc(sizeof(NumericRangeNode));
113   n->left = NULL;
114   n->right = NULL;
115   n->value = 0;
116 
117   n->maxDepth = 0;
118   n->range = rm_malloc(sizeof(NumericRange));
119 
120   *n->range = (NumericRange){
121       .minVal = min,
122       .maxVal = max,
123       .unique_sum = 0,
124       .card = 0,
125       .splitCard = splitCard,
126       .values = array_new(CardinalityValue, 1),
127       //.values = rm_calloc(splitCard, sizeof(CardinalityValue)),
128       .entries = NewInvertedIndex(Index_StoreNumeric, 1),
129       .invertedIndexSize = 0,
130   };
131   return n;
132 }
133 
/* Detaches the range held by n and releases it.
 *
 * rv is adjusted to reflect the removal: rv->sz loses the range's
 * invertedIndexSize, rv->numRecords loses the number of documents in its
 * inverted index, and rv->numRanges is decremented. n->range must not be NULL. */
static void removeRange(NumericRangeNode *n, NRN_AddRv *rv) {
  NumericRange *detached = n->range;
  n->range = NULL;  // unlink before anything is released

  // accounting first, while the range is still alive
  rv->numRanges--;
  rv->numRecords -= detached->entries->numDocs;
  rv->sz -= detached->invertedIndexSize;

  array_free(detached->values);
  InvertedIndex_Free(detached->entries);
  rm_free(detached);
}
147 
NumericRangeNode_Add(NumericRangeNode * n,t_docId docId,double value)148 NRN_AddRv NumericRangeNode_Add(NumericRangeNode *n, t_docId docId, double value) {
149   NRN_AddRv rv = {.sz = 0, .changed = 0, .numRecords = 0, .numRanges = 0};
150   if (!NumericRangeNode_IsLeaf(n)) {
151     // if this node has already split but retains a range, just add to the range without checking
152     // anything
153     size_t s = 0;
154     size_t nRecords = 0;
155     if (n->range) {
156       s += NumericRange_Add(n->range, docId, value, 0);
157       ++nRecords;
158     }
159 
160     // recursively add to its left or right child.
161     NumericRangeNode **childP = value < n->value ? &n->left : &n->right;
162     NumericRangeNode *child = *childP;
163     // if the child has split we get 1 in return
164     rv = NumericRangeNode_Add(child, docId, value);
165     rv.sz += s;
166     rv.numRecords += nRecords;
167 
168     if (rv.changed) {
169       // if there was a split it means our max depth has increased.
170       // we are too deep - we don't retain this node's range anymore.
171       // this keeps memory footprint in check
172       if (++n->maxDepth > RSGlobalConfig.numericTreeMaxDepthRange && n->range) {
173         removeRange(n, &rv);
174       }
175 
176       // check if we need to rebalance the child.
177       // To ease the rebalance we don't rebalance the root
178       // nor do we rebalance nodes that are with ranges (n->maxDepth > NR_MAX_DEPTH)
179       if ((child->right->maxDepth - child->left->maxDepth) > NR_MAX_DEPTH_BALANCE) {  // role to the left
180         NumericRangeNode *right = child->right;
181         child->right = right->left;
182         right->left = child;
183         --child->maxDepth;
184         *childP = right;  // replace the child with the new child
185       } else if ((child->left->maxDepth - child->right->maxDepth) >
186                  NR_MAX_DEPTH_BALANCE) {  // role to the right
187         NumericRangeNode *left = child->left;
188         child->left = left->right;
189         left->right = child;
190         --child->maxDepth;
191         *childP = left;  // replace the child with the new child
192       }
193     }
194     // return 1 or 0 to our called, so this is done recursively
195     return rv;
196   }
197 
198   // if this node is a leaf - we add AND check the cardinality. We only split leaf nodes
199   rv.sz = (uint32_t)NumericRange_Add(n->range, docId, value, 1);
200   ++rv.numRecords;
201   int card = n->range->card;
202   // printf("Added %d %f to node %f..%f, card now %zd, size now %zd\n", docId, value,
203   // n->range->minVal,
204   //        n->range->maxVal, card, n->range->entries->numDocs);
205   if (card >= n->range->splitCard || (n->range->entries->numDocs > NR_MAXRANGE_SIZE && card > 1)) {
206 
207     // split this node but don't delete its range
208     double split = NumericRange_Split(n->range, &n->left, &n->right, &rv);
209     rv.numRanges += 2;
210     if (RSGlobalConfig.numericTreeMaxDepthRange == 0) {
211       removeRange(n, &rv);
212     }
213     n->value = split;
214     n->maxDepth = 1;
215     rv.changed = 1;
216   }
217 
218   return rv;
219 }
220 
221 /* Recursively add a node's children to the range. */
__recursiveAddRange(Vector * v,NumericRangeNode * n,double min,double max)222 void __recursiveAddRange(Vector *v, NumericRangeNode *n, double min, double max) {
223   if (!n) return;
224 
225   if (n->range) {
226     // printf("min %f, max %f, range %f..%f, contained? %d, overlaps? %d, leaf? %d\n", min, max,
227     //        n->range->minVal, n->range->maxVal, NumericRange_Contained(n->range, min, max),
228     //        NumericRange_Overlaps(n->range, min, max), __isLeaf(n));
229     // if the range is completely contained in the search, we can just add it and not inspect any
230     // downwards
231     if (NumericRange_Contained(n->range, min, max)) {
232       Vector_Push(v, n->range);
233       return;
234     }
235     // No overlap at all - no need to do anything
236     if (!NumericRange_Overlaps(n->range, min, max)) {
237       return;
238     }
239   }
240 
241   // for non leaf nodes - we try to descend into their children
242   if (!NumericRangeNode_IsLeaf(n)) {
243     if(min <= n->value) {
244       __recursiveAddRange(v, n->left, min, max);
245     }
246     if(max >= n->value) {
247       __recursiveAddRange(v, n->right, min, max);
248     }
249   } else if (NumericRange_Overlaps(n->range, min, max)) {
250     Vector_Push(v, n->range);
251     return;
252   }
253 }
254 
/* Placeholder for deleting a value from the tree. Not implemented: the
 * arguments are ignored and 0 is always returned. */
int NumericRangeTree_DeleteNode(NumericRangeTree *t, double value) {
  (void)t;
  (void)value;
  // TODO: implement deletion
  return 0;
}
259 
260 /* Find the numeric ranges that fit the range we are looking for. We try to minimize the number of
261  * nodes we'll later need to union */
NumericRangeNode_FindRange(NumericRangeNode * n,double min,double max)262 Vector *NumericRangeNode_FindRange(NumericRangeNode *n, double min, double max) {
263 
264   Vector *leaves = NewVector(NumericRange *, 8);
265   __recursiveAddRange(leaves, n, min, max);
266   // printf("Found %zd ranges for %f...%f\n", leaves->top, min, max);
267   // for (int i = 0; i < leaves->top; i++) {
268   //   NumericRange *rng;
269   //   Vector_Get(leaves, i, &rng);
270   //   printf("%f...%f (%f). %d card, %d splitCard\n", rng->minVal, rng->maxVal,
271   //          rng->maxVal - rng->minVal, rng->entries->numDocs, rng->splitCard);
272   // }
273 
274   return leaves;
275 }
276 
/* Releases the subtree rooted at n: the node's range (inverted index, value
 * list and the range struct itself), then both children recursively, then the
 * node. Passing NULL is a no-op. */
void NumericRangeNode_Free(NumericRangeNode *n) {
  if (n == NULL) {
    return;
  }

  NumericRange *range = n->range;
  if (range != NULL) {
    InvertedIndex_Free(range->entries);
    array_free(range->values);
    rm_free(range);
    n->range = NULL;
  }

  NumericRangeNode *leftChild = n->left;
  NumericRangeNode *rightChild = n->right;
  NumericRangeNode_Free(leftChild);
  NumericRangeNode_Free(rightChild);

  rm_free(n);
}
291 
292 uint16_t numericTreesUniqueId = 0;
293 
294 /* Create a new numeric range tree */
NewNumericRangeTree()295 NumericRangeTree *NewNumericRangeTree() {
296   NumericRangeTree *ret = rm_malloc(sizeof(NumericRangeTree));
297 
298   ret->root = NewLeafNode(2, NF_NEGATIVE_INFINITY, NF_INFINITY, 2);
299   ret->numEntries = 0;
300   ret->numRanges = 1;
301   ret->revisionId = 0;
302   ret->lastDocId = 0;
303   ret->uniqueId = numericTreesUniqueId++;
304   return ret;
305 }
306 
NumericRangeTree_Add(NumericRangeTree * t,t_docId docId,double value)307 NRN_AddRv NumericRangeTree_Add(NumericRangeTree *t, t_docId docId, double value) {
308 
309   // Do not allow duplicate entries. This might happen due to indexer bugs and we need to protect
310   // from it
311   if (docId <= t->lastDocId) {
312     return (NRN_AddRv){0, 0, 0};
313   }
314   t->lastDocId = docId;
315 
316   NRN_AddRv rv = NumericRangeNode_Add(t->root, docId, value);
317   // rc != 0 means the tree nodes have changed, and concurrent iteration is not allowed now
318   // we increment the revision id of the tree, so currently running query iterators on it
319   // will abort the next time they get execution context
320   if (rv.changed) {
321     t->revisionId++;
322   }
323   t->numRanges += rv.numRanges;
324   t->numEntries++;
325 
326   return rv;
327 }
328 
NumericRangeTree_Find(NumericRangeTree * t,double min,double max)329 Vector *NumericRangeTree_Find(NumericRangeTree *t, double min, double max) {
330   return NumericRangeNode_FindRange(t->root, min, max);
331 }
332 
/* Visits the subtree rooted at n: callback(node, ctx) is invoked on n first,
 * then on the left subtree, then on the right subtree. n itself is not checked
 * for NULL; NULL children are skipped. */
void NumericRangeNode_Traverse(NumericRangeNode *n,
                               void (*callback)(NumericRangeNode *n, void *ctx), void *ctx) {
  callback(n, ctx);

  for (int side = 0; side < 2; ++side) {
    // children are read only when their turn comes, left before right
    NumericRangeNode *child = (side == 0) ? n->left : n->right;
    if (child) {
      NumericRangeNode_Traverse(child, callback, ctx);
    }
  }
}
345 
/* Frees every node of the tree and then the tree struct itself. */
void NumericRangeTree_Free(NumericRangeTree *t) {
  NumericRangeNode *root = t->root;
  NumericRangeNode_Free(root);
  rm_free(t);
}
350 
NewNumericRangeIterator(const IndexSpec * sp,NumericRange * nr,const NumericFilter * f)351 IndexIterator *NewNumericRangeIterator(const IndexSpec *sp, NumericRange *nr,
352                                        const NumericFilter *f) {
353 
354   // if this range is at either end of the filter, we need to check each record
355   if (NumericFilter_Match(f, nr->minVal) && NumericFilter_Match(f, nr->maxVal) &&
356       f->geoFilter == NULL) {
357     // make the filter NULL so the reader will ignore it
358     f = NULL;
359   }
360   IndexReader *ir = NewNumericReader(sp, nr->entries, f, nr->minVal, nr->maxVal);
361 
362   return NewReadIterator(ir);
363 }
364 
365 /* Create a union iterator from the numeric filter, over all the sub-ranges in the tree that fit
366  * the filter */
createNumericIterator(const IndexSpec * sp,NumericRangeTree * t,const NumericFilter * f)367 IndexIterator *createNumericIterator(const IndexSpec *sp, NumericRangeTree *t,
368                                      const NumericFilter *f) {
369 
370   Vector *v = NumericRangeTree_Find(t, f->min, f->max);
371   if (!v || Vector_Size(v) == 0) {
372     if (v) {
373       Vector_Free(v);
374     }
375     return NULL;
376   }
377 
378   int n = Vector_Size(v);
379   // if we only selected one range - we can just iterate it without union or anything
380   if (n == 1) {
381     NumericRange *rng;
382     Vector_Get(v, 0, &rng);
383     IndexIterator *it = NewNumericRangeIterator(sp, rng, f);
384     Vector_Free(v);
385     return it;
386   }
387 
388   // We create a  union iterator, advancing a union on all the selected range,
389   // treating them as one consecutive range
390   IndexIterator **its = rm_calloc(n, sizeof(IndexIterator *));
391 
392   for (size_t i = 0; i < n; i++) {
393     NumericRange *rng;
394     Vector_Get(v, i, &rng);
395     if (!rng) {
396       continue;
397     }
398 
399     its[i] = NewNumericRangeIterator(sp, rng, f);
400   }
401   Vector_Free(v);
402 
403   QueryNodeType type = (!f || !f->geoFilter) ? QN_NUMERIC : QN_GEO;
404   IndexIterator *it = NewUnionIterator(its, n, NULL, 1, 1, type, NULL);
405 
406   return it;
407 }
408 
409 RedisModuleType *NumericIndexType = NULL;
410 #define NUMERICINDEX_KEY_FMT "nm:%s/%s"
411 
fmtRedisNumericIndexKey(RedisSearchCtx * ctx,const char * field)412 RedisModuleString *fmtRedisNumericIndexKey(RedisSearchCtx *ctx, const char *field) {
413   return RedisModule_CreateStringPrintf(ctx->redisCtx, NUMERICINDEX_KEY_FMT, ctx->spec->name,
414                                         field);
415 }
416 
openNumericKeysDict(RedisSearchCtx * ctx,RedisModuleString * keyName,int write)417 static NumericRangeTree *openNumericKeysDict(RedisSearchCtx *ctx, RedisModuleString *keyName,
418                                              int write) {
419   KeysDictValue *kdv = dictFetchValue(ctx->spec->keysDict, keyName);
420   if (kdv) {
421     return kdv->p;
422   }
423   if (!write) {
424     return NULL;
425   }
426   kdv = rm_calloc(1, sizeof(*kdv));
427   kdv->dtor = (void (*)(void *))NumericRangeTree_Free;
428   kdv->p = NewNumericRangeTree();
429   dictAdd(ctx->spec->keysDict, keyName, kdv);
430   return kdv->p;
431 }
432 
/* Creates an iterator over the documents matching numeric filter flt.
 *
 *  ctx     - search context; its spec locates the numeric tree of flt->fieldName
 *  flt     - the filter to apply
 *  csx     - optional concurrent search context; when given, a reopen callback
 *            (NumericRangeIterator_OnReopen) is registered with a
 *            NumericUnionCtx holding the iterator and the tree's revisionId
 *  forType - field type used to resolve the formatted key name
 *
 * Returns NULL when the key name cannot be resolved, the tree does not exist
 * (or the key holds another type), or no range matches the filter. */
struct indexIterator *NewNumericFilterIterator(RedisSearchCtx *ctx, const NumericFilter *flt,
                                               ConcurrentSearchCtx *csx, FieldType forType) {
  RedisModuleString *s = IndexSpec_GetFormattedKeyByName(ctx->spec, flt->fieldName, forType);
  if (s == NULL) {
    return NULL;
  }

  NumericRangeTree *t = NULL;
  if (ctx->spec->keysDict) {
    // read-only lookup: never creates the tree
    t = openNumericKeysDict(ctx, s, 0);
  } else {
    // NOTE(review): the key opened here is not closed anywhere in this function —
    // confirm who owns it.
    RedisModuleKey *key = RedisModule_OpenKey(ctx->redisCtx, s, REDISMODULE_READ);
    if (key == NULL || RedisModule_ModuleTypeGetType(key) != NumericIndexType) {
      return NULL;
    }
    t = RedisModule_ModuleTypeGetValue(key);
  }
  if (t == NULL) {
    return NULL;
  }

  IndexIterator *it = createNumericIterator(ctx->spec, t, flt);
  if (it == NULL) {
    return NULL;
  }

  if (csx) {
    NumericUnionCtx *uc = rm_malloc(sizeof(*uc));
    uc->it = it;
    uc->lastRevId = t->revisionId;
    ConcurrentSearch_AddKey(csx, NumericRangeIterator_OnReopen, uc, rm_free);
  }
  return it;
}
469 
OpenNumericIndex(RedisSearchCtx * ctx,RedisModuleString * keyName,RedisModuleKey ** idxKey)470 NumericRangeTree *OpenNumericIndex(RedisSearchCtx *ctx, RedisModuleString *keyName,
471                                    RedisModuleKey **idxKey) {
472 
473   NumericRangeTree *t;
474   if (!ctx->spec->keysDict) {
475     RedisModuleKey *key_s = NULL;
476 
477     if (!idxKey) {
478       idxKey = &key_s;
479     }
480 
481     *idxKey = RedisModule_OpenKey(ctx->redisCtx, keyName, REDISMODULE_READ | REDISMODULE_WRITE);
482 
483     int type = RedisModule_KeyType(*idxKey);
484     if (type != REDISMODULE_KEYTYPE_EMPTY &&
485         RedisModule_ModuleTypeGetType(*idxKey) != NumericIndexType) {
486       return NULL;
487     }
488 
489     /* Create an empty value object if the key is currently empty. */
490     if (type == REDISMODULE_KEYTYPE_EMPTY) {
491       t = NewNumericRangeTree();
492       RedisModule_ModuleTypeSetValue((*idxKey), NumericIndexType, t);
493     } else {
494       t = RedisModule_ModuleTypeGetValue(*idxKey);
495     }
496   } else {
497     t = openNumericKeysDict(ctx, keyName, 1);
498   }
499   return t;
500 }
501 
/* Traversal callback that accumulates an estimate of the memory used by node n
 * into the unsigned long pointed to by ctx.
 *
 * Counted: the node struct, and for nodes that hold a range the range struct,
 * the elements of its cardinality value list and the memory reported by
 * InvertedIndex_MemUsage for its entries.
 *
 * The value list is an array of CardinalityValue whose length is
 * array_len(values); it used to be sized as card * sizeof(double), which
 * matches neither the element type nor the number of stored elements (card
 * keeps growing after the list stops being appended to). */
void __numericIndex_memUsageCallback(NumericRangeNode *n, void *ctx) {
  unsigned long *sz = ctx;
  *sz += sizeof(NumericRangeNode);

  if (n->range) {
    *sz += sizeof(NumericRange);
    *sz += array_len(n->range->values) * sizeof(CardinalityValue);
    if (n->range->entries) {
      *sz += InvertedIndex_MemUsage(n->range->entries);
    }
  }
}
514 
NumericIndexType_MemUsage(const void * value)515 unsigned long NumericIndexType_MemUsage(const void *value) {
516   const NumericRangeTree *t = value;
517   unsigned long ret = sizeof(NumericRangeTree);
518   NumericRangeNode_Traverse(t->root, __numericIndex_memUsageCallback, &ret);
519   return ret;
520 }
521 
522 #define NUMERIC_INDEX_ENCVER 1
523 
NumericIndexType_Register(RedisModuleCtx * ctx)524 int NumericIndexType_Register(RedisModuleCtx *ctx) {
525 
526   RedisModuleTypeMethods tm = {.version = REDISMODULE_TYPE_METHOD_VERSION,
527                                .rdb_load = NumericIndexType_RdbLoad,
528                                .rdb_save = NumericIndexType_RdbSave,
529                                .aof_rewrite = GenericAofRewrite_DisabledHandler,
530                                .free = NumericIndexType_Free,
531                                .mem_usage = (const void *)NumericIndexType_MemUsage};
532 
533   NumericIndexType = RedisModule_CreateDataType(ctx, "numericdx", NUMERIC_INDEX_ENCVER, &tm);
534   if (NumericIndexType == NULL) {
535     return REDISMODULE_ERR;
536   }
537 
538   return REDISMODULE_OK;
539 }
540 
541 /* A single entry in a numeric index's single range. Since entries are binned together, each needs
542  * to have the exact value */
543 typedef struct {
544   t_docId docId;
545   double value;
546 } NumericRangeEntry;
547 
cmpdocId(const void * p1,const void * p2)548 static int cmpdocId(const void *p1, const void *p2) {
549   NumericRangeEntry *e1 = (NumericRangeEntry *)p1;
550   NumericRangeEntry *e2 = (NumericRangeEntry *)p2;
551 
552   return (int)e1->docId - (int)e2->docId;
553 }
554 
555 /** Version 0 stores the number of entries beforehand, and then loads them */
loadV0(RedisModuleIO * rdb,NumericRangeEntry ** entriespp)556 static size_t loadV0(RedisModuleIO *rdb, NumericRangeEntry **entriespp) {
557   uint64_t num = RedisModule_LoadUnsigned(rdb);
558   if (!num) {
559     return 0;
560   }
561 
562   *entriespp = array_newlen(NumericRangeEntry, num);
563   NumericRangeEntry *entries = *entriespp;
564   for (size_t ii = 0; ii < num; ++ii) {
565     entries[ii].docId = RedisModule_LoadUnsigned(rdb);
566     entries[ii].value = RedisModule_LoadDouble(rdb);
567   }
568   return num;
569 }
570 
571 #define NUMERIC_IDX_INITIAL_LOAD_SIZE 1 << 16
572 /** Version 0 stores (id,value) pairs, with a final 0 as a terminator */
loadV1(RedisModuleIO * rdb,NumericRangeEntry ** entriespp)573 static size_t loadV1(RedisModuleIO *rdb, NumericRangeEntry **entriespp) {
574   NumericRangeEntry *entries = array_new(NumericRangeEntry, NUMERIC_IDX_INITIAL_LOAD_SIZE);
575   while (1) {
576     NumericRangeEntry cur;
577     cur.docId = RedisModule_LoadUnsigned(rdb);
578     if (!cur.docId) {
579       break;
580     }
581     cur.value = RedisModule_LoadDouble(rdb);
582     entries = array_append(entries, cur);
583   }
584   *entriespp = entries;
585   return array_len(entries);
586 }
587 
/* rdb_load handler of the numeric index type.
 *
 * Reads all (docId, value) entries using the loader matching encver, sorts them
 * by doc id and inserts them into a new tree.
 *
 * Returns the new NumericRangeTree, or NULL when encver is not a supported
 * encoding version. */
void *NumericIndexType_RdbLoad(RedisModuleIO *rdb, int encver) {
  if (encver > NUMERIC_INDEX_ENCVER) {
    return NULL;
  }

  NumericRangeEntry *entries = NULL;
  size_t numEntries = 0;
  if (encver == 0) {
    numEntries = loadV0(rdb, &entries);
  } else if (encver == 1) {
    numEntries = loadV1(rdb, &entries);
  } else {
    return NULL;  // Unknown version
  }

  NumericRangeTree *t = NewNumericRangeTree();

  // `entries` is still NULL when a version 0 payload holds no entries, so the
  // array is only sorted and walked when something was actually loaded
  if (numEntries > 0) {
    // sort the entries by doc id, as they were not saved in this order
    qsort(entries, numEntries, sizeof(NumericRangeEntry), cmpdocId);

    // now push them in order into the tree
    for (size_t i = 0; i < numEntries; i++) {
      NumericRangeTree_Add(t, entries[i].docId, entries[i].value);
    }
  }

  if (entries) {
    array_free(entries);
  }
  return t;
}
614 
615 struct niRdbSaveCtx {
616   RedisModuleIO *rdb;
617 };
618 
numericIndex_rdbSaveCallback(NumericRangeNode * n,void * ctx)619 static void numericIndex_rdbSaveCallback(NumericRangeNode *n, void *ctx) {
620   struct niRdbSaveCtx *rctx = ctx;
621 
622   if (NumericRangeNode_IsLeaf(n) && n->range) {
623     NumericRange *rng = n->range;
624     RSIndexResult *res = NULL;
625     IndexReader *ir = NewNumericReader(NULL, rng->entries, NULL, 0, 0);
626 
627     while (INDEXREAD_OK == IR_Read(ir, &res)) {
628       RedisModule_SaveUnsigned(rctx->rdb, res->docId);
629       RedisModule_SaveDouble(rctx->rdb, res->num.value);
630     }
631     IR_Free(ir);
632   }
633 }
/* rdb_save handler of the numeric index type: writes the entries of every leaf
 * range via numericIndex_rdbSaveCallback, then a single unsigned 0 that marks
 * the end of the records. */
void NumericIndexType_RdbSave(RedisModuleIO *rdb, void *value) {
  NumericRangeTree *tree = value;
  struct niRdbSaveCtx saveCtx = {.rdb = rdb};

  NumericRangeNode_Traverse(tree->root, numericIndex_rdbSaveCallback, &saveCtx);

  // terminator record
  RedisModule_SaveUnsigned(rdb, 0);
}
643 
/* Digest handler of the numeric index type. Not implemented: both arguments
 * are ignored and nothing is added to the digest. */
void NumericIndexType_Digest(RedisModuleDigest *digest, void *value) {
  (void)digest;
  (void)value;
}
646 
NumericIndexType_Free(void * value)647 void NumericIndexType_Free(void *value) {
648   NumericRangeTree *t = value;
649   NumericRangeTree_Free(t);
650 }
651 
NumericRangeTreeIterator_New(NumericRangeTree * t)652 NumericRangeTreeIterator *NumericRangeTreeIterator_New(NumericRangeTree *t) {
653 #define NODE_STACK_INITIAL_SIZE 4
654   NumericRangeTreeIterator *iter = rm_malloc(sizeof(NumericRangeTreeIterator));
655   iter->nodesStack = array_new(NumericRangeNode *, NODE_STACK_INITIAL_SIZE);
656   array_append(iter->nodesStack, t->root);
657   return iter;
658 }
659 
NumericRangeTreeIterator_Next(NumericRangeTreeIterator * iter)660 NumericRangeNode *NumericRangeTreeIterator_Next(NumericRangeTreeIterator *iter) {
661   if (array_len(iter->nodesStack) == 0) {
662     return NULL;
663   }
664   NumericRangeNode *ret = array_pop(iter->nodesStack);
665   if (!NumericRangeNode_IsLeaf(ret)) {
666     iter->nodesStack = array_append(iter->nodesStack, ret->left);
667     iter->nodesStack = array_append(iter->nodesStack, ret->right);
668   }
669 
670   return ret;
671 }
672 
/* Releases a tree iterator: first its node stack, then the iterator struct.
 * The tree nodes themselves are not freed. */
void NumericRangeTreeIterator_Free(NumericRangeTreeIterator *iter) {
  NumericRangeNode **stack = iter->nodesStack;
  array_free(stack);
  rm_free(iter);
}
677