1 #include "numeric_index.h"
2 #include "redis_index.h"
3 #include "sys/param.h"
4 #include "rmutil/vector.h"
5 #include "rmutil/util.h"
6 #include "index.h"
7 #include "util/arr.h"
8 #include <math.h>
9 #include "redismodule.h"
10 #include "util/misc.h"
11 //#include "tests/time_sample.h"
/* Factor by which a range's splitCard is multiplied (plus one) when computing
 * the splitCard of the two children created by a split. */
#define NR_EXPONENT 4
/* Upper bound applied to the splitCard given to newly created child ranges. */
#define NR_MAXRANGE_CARD 2500
/* A leaf whose inverted index holds more than this many documents is split
 * even if its cardinality is still below splitCard (provided card > 1). */
#define NR_MAXRANGE_SIZE 10000
15
/* Private data registered with the concurrent search context for a numeric
 * filter iterator (filled in by NewNumericFilterIterator). */
typedef struct {
  IndexIterator *it;   // the iterator built for the numeric filter
  uint32_t lastRevId;  // the tree's revisionId at the time the iterator was created
} NumericUnionCtx;
20
/* Callback invoked after a concurrent context regains execution context. At
 * that point the key may have been deleted or its structure may have changed,
 * which would render the underlying iterators invalid.
 *
 * This implementation performs no validation: it is a no-op and ignores
 * privdata. */
void NumericRangeIterator_OnReopen(void *privdata) {
  (void)privdata;  // deliberately unused
}
26
27 /* Returns 1 if the entire numeric range is contained between min and max */
NumericRange_Contained(NumericRange * n,double min,double max)28 static inline int NumericRange_Contained(NumericRange *n, double min, double max) {
29 if (!n) return 0;
30 int rc = (n->minVal >= min && n->maxVal <= max);
31
32 // printf("range %f..%f, min %f max %f, WITHIN? %d\n", n->minVal, n->maxVal, min, max, rc);
33 return rc;
34 }
35
36 /* Returns 1 if min and max are both inside the range. this is the opposite of _Within */
NumericRange_Contains(NumericRange * n,double min,double max)37 static inline int NumericRange_Contains(NumericRange *n, double min, double max) {
38 if (!n) return 0;
39 int rc = (n->minVal <= min && n->maxVal > max);
40 // printf("range %f..%f, min %f max %f, contains? %d\n", n->minVal, n->maxVal, min, max, rc);
41 return rc;
42 }
43
44 /* Returns 1 if there is any overlap between the range and min/max */
NumericRange_Overlaps(NumericRange * n,double min,double max)45 int NumericRange_Overlaps(NumericRange *n, double min, double max) {
46 if (!n) return 0;
47 int rc = (min >= n->minVal && min <= n->maxVal) || (max >= n->minVal && max <= n->maxVal);
48 // printf("range %f..%f, min %f max %f, overlaps? %d\n", n->minVal, n->maxVal, min, max, rc);
49 return rc;
50 }
51
/* Adds a (docId, value) record to the range.
 *
 * n         - the range to add to.
 * docId     - document id written to the range's inverted index.
 * value     - numeric value of the record.
 * checkCard - when non-zero, the value is looked up in the range's list of
 *             distinct values: a hit bumps that value's appearance counter,
 *             a miss counts as a new distinct value (card is incremented).
 *
 * The range's minVal/maxVal are widened to include the value; the infinity
 * sentinels are replaced by the first value seen.
 *
 * Returns the size reported by InvertedIndex_WriteNumericEntry for the write. */
size_t NumericRange_Add(NumericRange *n, t_docId docId, double value, int checkCard) {
  int isNewValue = 0;
  if (checkCard) {
    isNewValue = 1;
    // The values array is not modified inside the loop, so its length is invariant.
    const size_t numValues = array_len(n->values);
    for (size_t i = 0; i < numValues; i++) {
      if (n->values[i].value == value) {
        isNewValue = 0;
        n->values[i].appearances++;
        break;
      }
    }
  }

  if (n->minVal == NF_NEGATIVE_INFINITY || value < n->minVal) n->minVal = value;
  if (n->maxVal == NF_INFINITY || value > n->maxVal) n->maxVal = value;

  if (isNewValue) {
    // Distinct values are only recorded while the cardinality is below the
    // split threshold; the cardinality counter itself keeps growing.
    if (n->card < n->splitCard) {
      CardinalityValue val = {.value = value, .appearances = 1};
      n->values = array_append(n->values, val);
      n->unique_sum += value;
    }
    ++n->card;
  }

  size_t size = InvertedIndex_WriteNumericEntry(n->entries, docId, value);
  n->invertedIndexSize += size;
  return size;
}
82
/* Splits range n into two freshly allocated leaf nodes, stored in *lp and *rp.
 *
 * The split point is unique_sum divided by the range's cardinality. Every
 * record of n is re-read and added (with cardinality checking) to the left
 * child when its value is below the split point, and to the right child
 * otherwise. The sizes and record counts of those writes are accumulated into
 * rv->sz and rv->numRecords. The range n itself is left untouched.
 *
 * Returns the split point. */
double NumericRange_Split(NumericRange *n, NumericRangeNode **lp, NumericRangeNode **rp,
                          NRN_AddRv *rv) {
  const double splitPoint = (n->unique_sum) / (double)n->card;

  // Both children get the same capacity hint and the same (capped) split cardinality.
  const size_t childCap = n->entries->numDocs / 2 + 1;
  const size_t childSplitCard = MIN(NR_MAXRANGE_CARD, 1 + n->splitCard * NR_EXPONENT);

  *lp = NewLeafNode(childCap, n->minVal, splitPoint, childSplitCard);
  *rp = NewLeafNode(childCap, splitPoint, n->maxVal, childSplitCard);

  RSIndexResult *rec = NULL;
  IndexReader *reader = NewNumericReader(NULL, n->entries, NULL, 0, 0);
  while (IR_Read(reader, &rec) == INDEXREAD_OK) {
    NumericRange *target = (rec->num.value < splitPoint) ? (*lp)->range : (*rp)->range;
    rv->sz += NumericRange_Add(target, rec->docId, rec->num.value, 1);
    ++rv->numRecords;
  }
  IR_Free(reader);

  return splitPoint;
}
109
NewLeafNode(size_t cap,double min,double max,size_t splitCard)110 NumericRangeNode *NewLeafNode(size_t cap, double min, double max, size_t splitCard) {
111
112 NumericRangeNode *n = rm_malloc(sizeof(NumericRangeNode));
113 n->left = NULL;
114 n->right = NULL;
115 n->value = 0;
116
117 n->maxDepth = 0;
118 n->range = rm_malloc(sizeof(NumericRange));
119
120 *n->range = (NumericRange){
121 .minVal = min,
122 .maxVal = max,
123 .unique_sum = 0,
124 .card = 0,
125 .splitCard = splitCard,
126 .values = array_new(CardinalityValue, 1),
127 //.values = rm_calloc(splitCard, sizeof(CardinalityValue)),
128 .entries = NewInvertedIndex(Index_StoreNumeric, 1),
129 .invertedIndexSize = 0,
130 };
131 return n;
132 }
133
/* Detaches and frees the range held by node n, and subtracts its footprint
 * (inverted index size, document count, one range) from the counters in rv.
 * The caller must make sure n->range is not NULL. */
static void removeRange(NumericRangeNode *n, NRN_AddRv *rv) {
  // Unlink the range from the node before releasing anything.
  NumericRange *detached = n->range;
  n->range = NULL;

  // Account for what is about to disappear.
  rv->numRanges--;
  rv->sz -= detached->invertedIndexSize;
  rv->numRecords -= detached->entries->numDocs;

  // Release the range's resources.
  array_free(detached->values);
  InvertedIndex_Free(detached->entries);
  rm_free(detached);
}
147
NumericRangeNode_Add(NumericRangeNode * n,t_docId docId,double value)148 NRN_AddRv NumericRangeNode_Add(NumericRangeNode *n, t_docId docId, double value) {
149 NRN_AddRv rv = {.sz = 0, .changed = 0, .numRecords = 0, .numRanges = 0};
150 if (!NumericRangeNode_IsLeaf(n)) {
151 // if this node has already split but retains a range, just add to the range without checking
152 // anything
153 size_t s = 0;
154 size_t nRecords = 0;
155 if (n->range) {
156 s += NumericRange_Add(n->range, docId, value, 0);
157 ++nRecords;
158 }
159
160 // recursively add to its left or right child.
161 NumericRangeNode **childP = value < n->value ? &n->left : &n->right;
162 NumericRangeNode *child = *childP;
163 // if the child has split we get 1 in return
164 rv = NumericRangeNode_Add(child, docId, value);
165 rv.sz += s;
166 rv.numRecords += nRecords;
167
168 if (rv.changed) {
169 // if there was a split it means our max depth has increased.
170 // we are too deep - we don't retain this node's range anymore.
171 // this keeps memory footprint in check
172 if (++n->maxDepth > RSGlobalConfig.numericTreeMaxDepthRange && n->range) {
173 removeRange(n, &rv);
174 }
175
176 // check if we need to rebalance the child.
177 // To ease the rebalance we don't rebalance the root
178 // nor do we rebalance nodes that are with ranges (n->maxDepth > NR_MAX_DEPTH)
179 if ((child->right->maxDepth - child->left->maxDepth) > NR_MAX_DEPTH_BALANCE) { // role to the left
180 NumericRangeNode *right = child->right;
181 child->right = right->left;
182 right->left = child;
183 --child->maxDepth;
184 *childP = right; // replace the child with the new child
185 } else if ((child->left->maxDepth - child->right->maxDepth) >
186 NR_MAX_DEPTH_BALANCE) { // role to the right
187 NumericRangeNode *left = child->left;
188 child->left = left->right;
189 left->right = child;
190 --child->maxDepth;
191 *childP = left; // replace the child with the new child
192 }
193 }
194 // return 1 or 0 to our called, so this is done recursively
195 return rv;
196 }
197
198 // if this node is a leaf - we add AND check the cardinality. We only split leaf nodes
199 rv.sz = (uint32_t)NumericRange_Add(n->range, docId, value, 1);
200 ++rv.numRecords;
201 int card = n->range->card;
202 // printf("Added %d %f to node %f..%f, card now %zd, size now %zd\n", docId, value,
203 // n->range->minVal,
204 // n->range->maxVal, card, n->range->entries->numDocs);
205 if (card >= n->range->splitCard || (n->range->entries->numDocs > NR_MAXRANGE_SIZE && card > 1)) {
206
207 // split this node but don't delete its range
208 double split = NumericRange_Split(n->range, &n->left, &n->right, &rv);
209 rv.numRanges += 2;
210 if (RSGlobalConfig.numericTreeMaxDepthRange == 0) {
211 removeRange(n, &rv);
212 }
213 n->value = split;
214 n->maxDepth = 1;
215 rv.changed = 1;
216 }
217
218 return rv;
219 }
220
221 /* Recursively add a node's children to the range. */
__recursiveAddRange(Vector * v,NumericRangeNode * n,double min,double max)222 void __recursiveAddRange(Vector *v, NumericRangeNode *n, double min, double max) {
223 if (!n) return;
224
225 if (n->range) {
226 // printf("min %f, max %f, range %f..%f, contained? %d, overlaps? %d, leaf? %d\n", min, max,
227 // n->range->minVal, n->range->maxVal, NumericRange_Contained(n->range, min, max),
228 // NumericRange_Overlaps(n->range, min, max), __isLeaf(n));
229 // if the range is completely contained in the search, we can just add it and not inspect any
230 // downwards
231 if (NumericRange_Contained(n->range, min, max)) {
232 Vector_Push(v, n->range);
233 return;
234 }
235 // No overlap at all - no need to do anything
236 if (!NumericRange_Overlaps(n->range, min, max)) {
237 return;
238 }
239 }
240
241 // for non leaf nodes - we try to descend into their children
242 if (!NumericRangeNode_IsLeaf(n)) {
243 if(min <= n->value) {
244 __recursiveAddRange(v, n->left, min, max);
245 }
246 if(max >= n->value) {
247 __recursiveAddRange(v, n->right, min, max);
248 }
249 } else if (NumericRange_Overlaps(n->range, min, max)) {
250 Vector_Push(v, n->range);
251 return;
252 }
253 }
254
/* Placeholder for deleting a value from the tree. Not implemented: the
 * arguments are ignored and 0 is always returned. */
int NumericRangeTree_DeleteNode(NumericRangeTree *t, double value) {
  // TODO: implement deletion
  (void)t;
  (void)value;
  return 0;
}
259
260 /* Find the numeric ranges that fit the range we are looking for. We try to minimize the number of
261 * nodes we'll later need to union */
NumericRangeNode_FindRange(NumericRangeNode * n,double min,double max)262 Vector *NumericRangeNode_FindRange(NumericRangeNode *n, double min, double max) {
263
264 Vector *leaves = NewVector(NumericRange *, 8);
265 __recursiveAddRange(leaves, n, min, max);
266 // printf("Found %zd ranges for %f...%f\n", leaves->top, min, max);
267 // for (int i = 0; i < leaves->top; i++) {
268 // NumericRange *rng;
269 // Vector_Get(leaves, i, &rng);
270 // printf("%f...%f (%f). %d card, %d splitCard\n", rng->minVal, rng->maxVal,
271 // rng->maxVal - rng->minVal, rng->entries->numDocs, rng->splitCard);
272 // }
273
274 return leaves;
275 }
276
/* Recursively frees node n: its range (if any), both subtrees, and the node
 * itself. Passing NULL is allowed and does nothing. */
void NumericRangeNode_Free(NumericRangeNode *n) {
  if (n == NULL) {
    return;
  }

  NumericRange *rng = n->range;
  if (rng != NULL) {
    InvertedIndex_Free(rng->entries);
    array_free(rng->values);
    rm_free(rng);
    n->range = NULL;
  }

  // Children first, then the node that points at them.
  NumericRangeNode_Free(n->left);
  NumericRangeNode_Free(n->right);
  rm_free(n);
}
291
292 uint16_t numericTreesUniqueId = 0;
293
294 /* Create a new numeric range tree */
NewNumericRangeTree()295 NumericRangeTree *NewNumericRangeTree() {
296 NumericRangeTree *ret = rm_malloc(sizeof(NumericRangeTree));
297
298 ret->root = NewLeafNode(2, NF_NEGATIVE_INFINITY, NF_INFINITY, 2);
299 ret->numEntries = 0;
300 ret->numRanges = 1;
301 ret->revisionId = 0;
302 ret->lastDocId = 0;
303 ret->uniqueId = numericTreesUniqueId++;
304 return ret;
305 }
306
NumericRangeTree_Add(NumericRangeTree * t,t_docId docId,double value)307 NRN_AddRv NumericRangeTree_Add(NumericRangeTree *t, t_docId docId, double value) {
308
309 // Do not allow duplicate entries. This might happen due to indexer bugs and we need to protect
310 // from it
311 if (docId <= t->lastDocId) {
312 return (NRN_AddRv){0, 0, 0};
313 }
314 t->lastDocId = docId;
315
316 NRN_AddRv rv = NumericRangeNode_Add(t->root, docId, value);
317 // rc != 0 means the tree nodes have changed, and concurrent iteration is not allowed now
318 // we increment the revision id of the tree, so currently running query iterators on it
319 // will abort the next time they get execution context
320 if (rv.changed) {
321 t->revisionId++;
322 }
323 t->numRanges += rv.numRanges;
324 t->numEntries++;
325
326 return rv;
327 }
328
NumericRangeTree_Find(NumericRangeTree * t,double min,double max)329 Vector *NumericRangeTree_Find(NumericRangeTree *t, double min, double max) {
330 return NumericRangeNode_FindRange(t->root, min, max);
331 }
332
/* Pre-order traversal: invokes callback(node, ctx) on n, then on every node of
 * its left subtree, then on every node of its right subtree. n must not be
 * NULL. */
void NumericRangeNode_Traverse(NumericRangeNode *n,
                               void (*callback)(NumericRangeNode *n, void *ctx), void *ctx) {
  callback(n, ctx);

  if (n->left != NULL) {
    NumericRangeNode_Traverse(n->left, callback, ctx);
  }

  if (n->right != NULL) {
    NumericRangeNode_Traverse(n->right, callback, ctx);
  }
}
345
/* Frees all the nodes of tree t and then the tree object itself. */
void NumericRangeTree_Free(NumericRangeTree *t) {
  NumericRangeNode *root = t->root;
  NumericRangeNode_Free(root);
  rm_free(t);
}
350
NewNumericRangeIterator(const IndexSpec * sp,NumericRange * nr,const NumericFilter * f)351 IndexIterator *NewNumericRangeIterator(const IndexSpec *sp, NumericRange *nr,
352 const NumericFilter *f) {
353
354 // if this range is at either end of the filter, we need to check each record
355 if (NumericFilter_Match(f, nr->minVal) && NumericFilter_Match(f, nr->maxVal) &&
356 f->geoFilter == NULL) {
357 // make the filter NULL so the reader will ignore it
358 f = NULL;
359 }
360 IndexReader *ir = NewNumericReader(sp, nr->entries, f, nr->minVal, nr->maxVal);
361
362 return NewReadIterator(ir);
363 }
364
365 /* Create a union iterator from the numeric filter, over all the sub-ranges in the tree that fit
366 * the filter */
createNumericIterator(const IndexSpec * sp,NumericRangeTree * t,const NumericFilter * f)367 IndexIterator *createNumericIterator(const IndexSpec *sp, NumericRangeTree *t,
368 const NumericFilter *f) {
369
370 Vector *v = NumericRangeTree_Find(t, f->min, f->max);
371 if (!v || Vector_Size(v) == 0) {
372 if (v) {
373 Vector_Free(v);
374 }
375 return NULL;
376 }
377
378 int n = Vector_Size(v);
379 // if we only selected one range - we can just iterate it without union or anything
380 if (n == 1) {
381 NumericRange *rng;
382 Vector_Get(v, 0, &rng);
383 IndexIterator *it = NewNumericRangeIterator(sp, rng, f);
384 Vector_Free(v);
385 return it;
386 }
387
388 // We create a union iterator, advancing a union on all the selected range,
389 // treating them as one consecutive range
390 IndexIterator **its = rm_calloc(n, sizeof(IndexIterator *));
391
392 for (size_t i = 0; i < n; i++) {
393 NumericRange *rng;
394 Vector_Get(v, i, &rng);
395 if (!rng) {
396 continue;
397 }
398
399 its[i] = NewNumericRangeIterator(sp, rng, f);
400 }
401 Vector_Free(v);
402
403 QueryNodeType type = (!f || !f->geoFilter) ? QN_NUMERIC : QN_GEO;
404 IndexIterator *it = NewUnionIterator(its, n, NULL, 1, 1, type, NULL);
405
406 return it;
407 }
408
409 RedisModuleType *NumericIndexType = NULL;
410 #define NUMERICINDEX_KEY_FMT "nm:%s/%s"
411
fmtRedisNumericIndexKey(RedisSearchCtx * ctx,const char * field)412 RedisModuleString *fmtRedisNumericIndexKey(RedisSearchCtx *ctx, const char *field) {
413 return RedisModule_CreateStringPrintf(ctx->redisCtx, NUMERICINDEX_KEY_FMT, ctx->spec->name,
414 field);
415 }
416
openNumericKeysDict(RedisSearchCtx * ctx,RedisModuleString * keyName,int write)417 static NumericRangeTree *openNumericKeysDict(RedisSearchCtx *ctx, RedisModuleString *keyName,
418 int write) {
419 KeysDictValue *kdv = dictFetchValue(ctx->spec->keysDict, keyName);
420 if (kdv) {
421 return kdv->p;
422 }
423 if (!write) {
424 return NULL;
425 }
426 kdv = rm_calloc(1, sizeof(*kdv));
427 kdv->dtor = (void (*)(void *))NumericRangeTree_Free;
428 kdv->p = NewNumericRangeTree();
429 dictAdd(ctx->spec->keysDict, keyName, kdv);
430 return kdv->p;
431 }
432
/* Creates an iterator over the numeric index of the filter's field.
 *
 * ctx     - search context; its spec provides the formatted key name and,
 *           when present, the keys dictionary holding the tree.
 * flt     - the numeric filter to iterate.
 * csx     - optional concurrent search context; when given, a reopen callback
 *           is registered with the tree's current revision id.
 * forType - field type used to resolve the formatted key name.
 *
 * Returns NULL when the key name cannot be resolved, the index does not exist
 * or has the wrong type, or no range matches the filter.
 *
 * When the tree is read from a Redis key (no keys dictionary), the key handle
 * is closed on every failure path so it does not leak. On success the handle
 * is left open, as before. */
struct indexIterator *NewNumericFilterIterator(RedisSearchCtx *ctx, const NumericFilter *flt,
                                               ConcurrentSearchCtx *csx, FieldType forType) {
  RedisModuleString *s = IndexSpec_GetFormattedKeyByName(ctx->spec, flt->fieldName, forType);
  if (!s) {
    return NULL;
  }

  RedisModuleKey *key = NULL;
  NumericRangeTree *t = NULL;
  IndexIterator *it = NULL;

  if (!ctx->spec->keysDict) {
    key = RedisModule_OpenKey(ctx->redisCtx, s, REDISMODULE_READ);
    if (!key || RedisModule_ModuleTypeGetType(key) != NumericIndexType) {
      goto fail;
    }
    t = RedisModule_ModuleTypeGetValue(key);
  } else {
    t = openNumericKeysDict(ctx, s, 0);
  }

  if (!t) {
    goto fail;
  }

  it = createNumericIterator(ctx->spec, t, flt);
  if (!it) {
    goto fail;
  }

  if (csx) {
    NumericUnionCtx *uc = rm_malloc(sizeof(*uc));
    uc->lastRevId = t->revisionId;
    uc->it = it;
    ConcurrentSearch_AddKey(csx, NumericRangeIterator_OnReopen, uc, rm_free);
  }
  return it;

fail:
  // Nothing references the key on failure, so release the handle if one was opened.
  if (key) {
    RedisModule_CloseKey(key);
  }
  return NULL;
}
469
OpenNumericIndex(RedisSearchCtx * ctx,RedisModuleString * keyName,RedisModuleKey ** idxKey)470 NumericRangeTree *OpenNumericIndex(RedisSearchCtx *ctx, RedisModuleString *keyName,
471 RedisModuleKey **idxKey) {
472
473 NumericRangeTree *t;
474 if (!ctx->spec->keysDict) {
475 RedisModuleKey *key_s = NULL;
476
477 if (!idxKey) {
478 idxKey = &key_s;
479 }
480
481 *idxKey = RedisModule_OpenKey(ctx->redisCtx, keyName, REDISMODULE_READ | REDISMODULE_WRITE);
482
483 int type = RedisModule_KeyType(*idxKey);
484 if (type != REDISMODULE_KEYTYPE_EMPTY &&
485 RedisModule_ModuleTypeGetType(*idxKey) != NumericIndexType) {
486 return NULL;
487 }
488
489 /* Create an empty value object if the key is currently empty. */
490 if (type == REDISMODULE_KEYTYPE_EMPTY) {
491 t = NewNumericRangeTree();
492 RedisModule_ModuleTypeSetValue((*idxKey), NumericIndexType, t);
493 } else {
494 t = RedisModule_ModuleTypeGetValue(*idxKey);
495 }
496 } else {
497 t = openNumericKeysDict(ctx, keyName, 1);
498 }
499 return t;
500 }
501
/* Traversal callback that adds the memory attributed to node n to the
 * unsigned long counter pointed to by ctx: the node itself, plus (when the
 * node has a range) the range struct, one double per unit of cardinality, and
 * the inverted index usage when the range has entries. */
void __numericIndex_memUsageCallback(NumericRangeNode *n, void *ctx) {
  unsigned long *total = ctx;
  unsigned long nodeBytes = sizeof(NumericRangeNode);

  NumericRange *rng = n->range;
  if (rng) {
    nodeBytes += sizeof(NumericRange);
    nodeBytes += rng->card * sizeof(double);
    if (rng->entries) {
      nodeBytes += InvertedIndex_MemUsage(rng->entries);
    }
  }

  *total += nodeBytes;
}
514
NumericIndexType_MemUsage(const void * value)515 unsigned long NumericIndexType_MemUsage(const void *value) {
516 const NumericRangeTree *t = value;
517 unsigned long ret = sizeof(NumericRangeTree);
518 NumericRangeNode_Traverse(t->root, __numericIndex_memUsageCallback, &ret);
519 return ret;
520 }
521
522 #define NUMERIC_INDEX_ENCVER 1
523
NumericIndexType_Register(RedisModuleCtx * ctx)524 int NumericIndexType_Register(RedisModuleCtx *ctx) {
525
526 RedisModuleTypeMethods tm = {.version = REDISMODULE_TYPE_METHOD_VERSION,
527 .rdb_load = NumericIndexType_RdbLoad,
528 .rdb_save = NumericIndexType_RdbSave,
529 .aof_rewrite = GenericAofRewrite_DisabledHandler,
530 .free = NumericIndexType_Free,
531 .mem_usage = (const void *)NumericIndexType_MemUsage};
532
533 NumericIndexType = RedisModule_CreateDataType(ctx, "numericdx", NUMERIC_INDEX_ENCVER, &tm);
534 if (NumericIndexType == NULL) {
535 return REDISMODULE_ERR;
536 }
537
538 return REDISMODULE_OK;
539 }
540
541 /* A single entry in a numeric index's single range. Since entries are binned together, each needs
542 * to have the exact value */
543 typedef struct {
544 t_docId docId;
545 double value;
546 } NumericRangeEntry;
547
cmpdocId(const void * p1,const void * p2)548 static int cmpdocId(const void *p1, const void *p2) {
549 NumericRangeEntry *e1 = (NumericRangeEntry *)p1;
550 NumericRangeEntry *e2 = (NumericRangeEntry *)p2;
551
552 return (int)e1->docId - (int)e2->docId;
553 }
554
555 /** Version 0 stores the number of entries beforehand, and then loads them */
loadV0(RedisModuleIO * rdb,NumericRangeEntry ** entriespp)556 static size_t loadV0(RedisModuleIO *rdb, NumericRangeEntry **entriespp) {
557 uint64_t num = RedisModule_LoadUnsigned(rdb);
558 if (!num) {
559 return 0;
560 }
561
562 *entriespp = array_newlen(NumericRangeEntry, num);
563 NumericRangeEntry *entries = *entriespp;
564 for (size_t ii = 0; ii < num; ++ii) {
565 entries[ii].docId = RedisModule_LoadUnsigned(rdb);
566 entries[ii].value = RedisModule_LoadDouble(rdb);
567 }
568 return num;
569 }
570
571 #define NUMERIC_IDX_INITIAL_LOAD_SIZE 1 << 16
572 /** Version 0 stores (id,value) pairs, with a final 0 as a terminator */
loadV1(RedisModuleIO * rdb,NumericRangeEntry ** entriespp)573 static size_t loadV1(RedisModuleIO *rdb, NumericRangeEntry **entriespp) {
574 NumericRangeEntry *entries = array_new(NumericRangeEntry, NUMERIC_IDX_INITIAL_LOAD_SIZE);
575 while (1) {
576 NumericRangeEntry cur;
577 cur.docId = RedisModule_LoadUnsigned(rdb);
578 if (!cur.docId) {
579 break;
580 }
581 cur.value = RedisModule_LoadDouble(rdb);
582 entries = array_append(entries, cur);
583 }
584 *entriespp = entries;
585 return array_len(entries);
586 }
587
/* RDB load callback of the numeric index type.
 *
 * Reads all (docId, value) entries using the loader matching `encver`, sorts
 * them by doc id (they are not saved in that order) and inserts them one by
 * one into a new tree.
 *
 * Returns the new tree, or NULL when the encoding version is unknown.
 *
 * An empty version-0 payload leaves the entries pointer NULL; in that case
 * sorting and freeing are skipped instead of handing a NULL pointer to qsort
 * and array_free. */
void *NumericIndexType_RdbLoad(RedisModuleIO *rdb, int encver) {
  if (encver > NUMERIC_INDEX_ENCVER) {
    return NULL;
  }

  NumericRangeEntry *entries = NULL;
  size_t numEntries = 0;
  if (encver == 0) {
    numEntries = loadV0(rdb, &entries);
  } else if (encver == 1) {
    numEntries = loadV1(rdb, &entries);
  } else {
    return NULL;  // Unknown version
  }

  // sort the entries by doc id, as they were not saved in this order
  if (entries != NULL && numEntries > 1) {
    qsort(entries, numEntries, sizeof(NumericRangeEntry), cmpdocId);
  }

  NumericRangeTree *t = NewNumericRangeTree();

  // now push them in order into the tree
  for (size_t i = 0; i < numEntries; i++) {
    NumericRangeTree_Add(t, entries[i].docId, entries[i].value);
  }

  if (entries != NULL) {
    array_free(entries);
  }
  return t;
}
614
615 struct niRdbSaveCtx {
616 RedisModuleIO *rdb;
617 };
618
numericIndex_rdbSaveCallback(NumericRangeNode * n,void * ctx)619 static void numericIndex_rdbSaveCallback(NumericRangeNode *n, void *ctx) {
620 struct niRdbSaveCtx *rctx = ctx;
621
622 if (NumericRangeNode_IsLeaf(n) && n->range) {
623 NumericRange *rng = n->range;
624 RSIndexResult *res = NULL;
625 IndexReader *ir = NewNumericReader(NULL, rng->entries, NULL, 0, 0);
626
627 while (INDEXREAD_OK == IR_Read(ir, &res)) {
628 RedisModule_SaveUnsigned(rctx->rdb, res->docId);
629 RedisModule_SaveDouble(rctx->rdb, res->num.value);
630 }
631 IR_Free(ir);
632 }
633 }
/* RDB save callback of the numeric index type: writes the records of every
 * leaf of the tree, followed by a 0 id that terminates the stream. */
void NumericIndexType_RdbSave(RedisModuleIO *rdb, void *value) {
  NumericRangeTree *tree = value;
  struct niRdbSaveCtx saveCtx = {rdb};

  NumericRangeNode_Traverse(tree->root, numericIndex_rdbSaveCallback, &saveCtx);

  // Terminating record
  RedisModule_SaveUnsigned(rdb, 0);
}
643
/* Digest callback of the numeric index type. Not implemented: it does nothing
 * and ignores both arguments. */
void NumericIndexType_Digest(RedisModuleDigest *digest, void *value) {
  (void)digest;
  (void)value;
}
646
NumericIndexType_Free(void * value)647 void NumericIndexType_Free(void *value) {
648 NumericRangeTree *t = value;
649 NumericRangeTree_Free(t);
650 }
651
NumericRangeTreeIterator_New(NumericRangeTree * t)652 NumericRangeTreeIterator *NumericRangeTreeIterator_New(NumericRangeTree *t) {
653 #define NODE_STACK_INITIAL_SIZE 4
654 NumericRangeTreeIterator *iter = rm_malloc(sizeof(NumericRangeTreeIterator));
655 iter->nodesStack = array_new(NumericRangeNode *, NODE_STACK_INITIAL_SIZE);
656 array_append(iter->nodesStack, t->root);
657 return iter;
658 }
659
NumericRangeTreeIterator_Next(NumericRangeTreeIterator * iter)660 NumericRangeNode *NumericRangeTreeIterator_Next(NumericRangeTreeIterator *iter) {
661 if (array_len(iter->nodesStack) == 0) {
662 return NULL;
663 }
664 NumericRangeNode *ret = array_pop(iter->nodesStack);
665 if (!NumericRangeNode_IsLeaf(ret)) {
666 iter->nodesStack = array_append(iter->nodesStack, ret->left);
667 iter->nodesStack = array_append(iter->nodesStack, ret->right);
668 }
669
670 return ret;
671 }
672
/* Releases the iterator: first its stack of pending nodes, then the iterator
 * object. The tree nodes themselves are not freed. */
void NumericRangeTreeIterator_Free(NumericRangeTreeIterator *iter) {
  array_free(iter->nodesStack);  // must go before the iterator that owns it
  rm_free(iter);
}
677