1 #include "redisearch.h"
2 #include "varint.h"
3 #include "rmalloc.h"
4 #include "util/mempool.h"
5 #include <sys/param.h>
6 
/* We have two types of offset vector iterators - for terms and for aggregates. For terms we simply
 * yield the encoded offsets one by one. For aggregates, we merge them on the fly in order.
 * They are both encapsulated in an abstract iterator interface called RSOffsetIterator, with
 * callbacks and context matching the appropriate implementation.
 */
12 
13 /* A raw offset vector iterator */
14 typedef struct {
15   Buffer buf;
16   BufferReader br;
17   uint32_t lastValue;
18   RSQueryTerm *term;
19 } _RSOffsetVectorIterator;
20 
21 typedef struct {
22   const RSAggregateResult *res;
23   size_t size;
24   RSOffsetIterator *iters;
25   uint32_t *offsets;
26   RSQueryTerm **terms;
27   // uint32_t lastOffset; - TODO: Avoid duplicate offsets
28 
29 } _RSAggregateOffsetIterator;
30 
31 /* Get the next entry, or return RS_OFFSETVECTOR_EOF */
32 uint32_t _ovi_Next(void *ctx, RSQueryTerm **t);
33 /* Rewind the iterator */
34 void _ovi_Rewind(void *ctx);
35 
36 /* memory pool for buffer iterators */
37 static mempool_t *__offsetIters = NULL;
38 static mempool_t *__aggregateIters = NULL;
39 
40 /* Free it */
_ovi_free(void * ctx)41 void _ovi_free(void *ctx) {
42   mempool_release(__offsetIters, ctx);
43 }
44 
newOffsetIterator()45 void *newOffsetIterator() {
46   return rm_malloc(sizeof(_RSOffsetVectorIterator));
47 }
48 /* Create an offset iterator interface  from a raw offset vector */
RSOffsetVector_Iterate(const RSOffsetVector * v,RSQueryTerm * t)49 RSOffsetIterator RSOffsetVector_Iterate(const RSOffsetVector *v, RSQueryTerm *t) {
50   if (!__offsetIters) {
51     mempool_options options = {
52         .isGlobal = 1, .initialCap = 8, .alloc = newOffsetIterator, .free = rm_free};
53     __offsetIters = mempool_new(&options);
54   }
55   _RSOffsetVectorIterator *it = mempool_get(__offsetIters);
56   it->buf = (Buffer){.data = v->data, .offset = v->len, .cap = v->len};
57   it->br = NewBufferReader(&it->buf);
58   it->lastValue = 0;
59   it->term = t;
60 
61   return (RSOffsetIterator){.Next = _ovi_Next, .Rewind = _ovi_Rewind, .Free = _ovi_free, .ctx = it};
62 }
63 
64 /* An aggregate offset iterator yielding offsets one by one */
65 uint32_t _aoi_Next(void *ctx, RSQueryTerm **term);
66 void _aoi_Free(void *ctx);
67 void _aoi_Rewind(void *ctx);
68 
aggiterNew()69 static void *aggiterNew() {
70   _RSAggregateOffsetIterator *it = rm_malloc(sizeof(_RSAggregateOffsetIterator));
71   it->size = 0;
72   it->offsets = NULL;
73   it->iters = NULL;
74   it->terms = NULL;
75   return it;
76 }
77 
aggiterFree(void * p)78 static void aggiterFree(void *p) {
79   _RSAggregateOffsetIterator *aggiter = p;
80   rm_free(aggiter->offsets);
81   rm_free(aggiter->iters);
82   rm_free(aggiter->terms);
83   rm_free(aggiter);
84 }
85 
86 /* Create an iterator from the aggregate offset iterators of the aggregate result */
_aggregateResult_iterate(const RSAggregateResult * agg)87 static RSOffsetIterator _aggregateResult_iterate(const RSAggregateResult *agg) {
88   if (!__aggregateIters) {
89     mempool_options opts = {
90         .isGlobal = 1, .initialCap = 8, .alloc = aggiterNew, .free = aggiterFree};
91     __aggregateIters = mempool_new(&opts);
92   }
93   _RSAggregateOffsetIterator *it = mempool_get(__aggregateIters);
94   it->res = agg;
95 
96   if (agg->numChildren > it->size) {
97     it->size = agg->numChildren;
98     rm_free(it->iters);
99     rm_free(it->offsets);
100     rm_free(it->terms);
101     it->iters = rm_calloc(agg->numChildren, sizeof(RSOffsetIterator));
102     it->offsets = rm_calloc(agg->numChildren, sizeof(uint32_t));
103     it->terms = rm_calloc(agg->numChildren, sizeof(RSQueryTerm *));
104   }
105 
106   for (int i = 0; i < agg->numChildren; i++) {
107     it->iters[i] = RSIndexResult_IterateOffsets(agg->children[i]);
108     it->offsets[i] = it->iters[i].Next(it->iters[i].ctx, &it->terms[i]);
109   }
110 
111   return (RSOffsetIterator){.Next = _aoi_Next, .Rewind = _aoi_Rewind, .Free = _aoi_Free, .ctx = it};
112 }
/* Next callback of the empty iterator: there is never an offset to yield, so every call reports
 * RS_OFFSETVECTOR_EOF. Neither argument is used. */
uint32_t _empty_Next(void *ctx, RSQueryTerm **t) {
  (void)ctx;
  (void)t;
  return RS_OFFSETVECTOR_EOF;
}
/* Free callback of the empty iterator: it owns nothing, so this does nothing. */
void _empty_Free(void *ctx) {
  (void)ctx;
}
/* Rewind callback of the empty iterator: it has no position, so this does nothing. */
void _empty_Rewind(void *ctx) {
  (void)ctx;
}
120 
_emptyIterator()121 RSOffsetIterator _emptyIterator() {
122   return (RSOffsetIterator){
123       .Next = _empty_Next, .Rewind = _empty_Rewind, .Free = _empty_Free, .ctx = NULL};
124 }
125 
126 /* Create the appropriate iterator from a result based on its type */
RSIndexResult_IterateOffsets(const RSIndexResult * res)127 RSOffsetIterator RSIndexResult_IterateOffsets(const RSIndexResult *res) {
128 
129   switch (res->type) {
130     case RSResultType_Term:
131       return RSOffsetVector_Iterate(&res->term.offsets, res->term.term);
132 
133     // virtual and numeric entries have no offsets and cannot participate
134     case RSResultType_Virtual:
135     case RSResultType_Numeric:
136       return _emptyIterator();
137 
138     case RSResultType_Intersection:
139     case RSResultType_Union:
140     default:
141       // if we only have one sub result, just iterate that...
142       if (res->agg.numChildren == 1) {
143         return RSIndexResult_IterateOffsets(res->agg.children[0]);
144       }
145       return _aggregateResult_iterate(&res->agg);
146       break;
147   }
148 }
149 
150 /* Rewind an offset vector iterator and start reading it from the beginning. */
_ovi_Rewind(void * ctx)151 void _ovi_Rewind(void *ctx) {
152   _RSOffsetVectorIterator *it = ctx;
153   it->lastValue = 0;
154   it->buf.offset = 0;
155   it->br.pos = 0;
156 }
157 
_ovi_Free(void * ctx)158 void _ovi_Free(void *ctx) {
159   rm_free(ctx);
160 }
161 
/* Next callback of the raw offset vector iterator.
 *
 * Reads one varint from the vector and adds it to the previously yielded value; the sum is the
 * offset that is returned (and remembered for the next call). If t is not NULL it receives the
 * iterator's term. Once the reader is at its end, RS_OFFSETVECTOR_EOF is returned and t is left
 * untouched.
 */
uint32_t _ovi_Next(void *ctx, RSQueryTerm **t) {
  _RSOffsetVectorIterator *it = ctx;

  if (BufferReader_AtEnd(&it->br)) {
    return RS_OFFSETVECTOR_EOF;
  }

  it->lastValue += ReadVarint(&it->br);
  if (t != NULL) {
    *t = it->term;
  }
  return it->lastValue;
}
173 
/* Next callback of the aggregate offset iterator.
 *
 * Every child has one pending offset. The smallest pending offset is returned, and the child it
 * came from is advanced so that its following offset becomes pending. On a tie the child with
 * the lowest index wins. Pending values equal to RS_OFFSETVECTOR_EOF are never picked, so when
 * no child has anything left the function returns RS_OFFSETVECTOR_EOF.
 *
 * If t is not NULL and an offset was found, t receives the term that belongs to that offset.
 */
uint32_t _aoi_Next(void *ctx, RSQueryTerm **t) {
  _RSAggregateOffsetIterator *agg = ctx;
  const uint32_t *pending = agg->offsets;
  const int numChildren = agg->res->numChildren;

  // linear scan for the smallest pending offset
  uint32_t best = RS_OFFSETVECTOR_EOF;
  int bestIdx = -1;
  for (int child = 0; child < numChildren; child++) {
    if (pending[child] < best) {
      best = pending[child];
      bestIdx = child;
    }
  }

  if (bestIdx == -1) {
    // nothing found - best still holds the EOF marker
    return best;
  }

  // hand out the term first: advancing the child below overwrites terms[bestIdx]
  if (t != NULL) {
    *t = agg->terms[bestIdx];
  }

  RSOffsetIterator *src = &agg->iters[bestIdx];
  agg->offsets[bestIdx] = src->Next(src->ctx, &agg->terms[bestIdx]);

  return best;
}
200 
_aoi_Free(void * ctx)201 void _aoi_Free(void *ctx) {
202   _RSAggregateOffsetIterator *it = ctx;
203   for (int i = 0; i < it->res->numChildren; i++) {
204     it->iters[i].Free(it->iters[i].ctx);
205   }
206 
207   mempool_release(__aggregateIters, ctx);
208 }
209 
_aoi_Rewind(void * ctx)210 void _aoi_Rewind(void *ctx) {
211   _RSAggregateOffsetIterator *it = ctx;
212 
213   for (int i = 0; i < it->res->numChildren; i++) {
214     it->iters[i].Rewind(it->iters[i].ctx);
215     it->offsets[i] = 0;
216   }
217 }