1 #include "redisearch.h"
2 #include "varint.h"
3 #include "rmalloc.h"
4 #include "util/mempool.h"
5 #include <sys/param.h>
6
7 /* We have two types of offset vector iterators - for terms and for aggregates. For terms we simply
8 * yield the encoded offsets one by one. For aggregates, we merge them on the fly in order.
9 * They are both encapsulated in an abstract iterator interface called RSOffsetIterator, with
10 * callbacks and context matching the appropriate implementation.
11 */
12
/* A raw offset vector iterator: walks the varint-encoded, delta-compressed
 * offsets of a single term (decoding is done in _ovi_Next). */
typedef struct {
  // View over the offset vector's bytes (data/len copied from the RSOffsetVector
  // in RSOffsetVector_Iterate); the bytes are never freed by this iterator.
  Buffer buf;
  // Read cursor over buf; rewinding resets its position.
  BufferReader br;
  // Last absolute offset yielded; each decoded delta is added to it.
  uint32_t lastValue;
  // Term reported alongside every offset this iterator yields.
  RSQueryTerm *term;
} _RSOffsetVectorIterator;
20
/* Merging iterator over the offsets of an aggregate result's children: on each
 * step it yields the smallest pending offset among all children (see _aoi_Next). */
typedef struct {
  // The aggregate result being iterated; not owned by the iterator.
  const RSAggregateResult *res;
  // Allocated length of iters/offsets/terms. Pooled iterators keep their arrays,
  // and they are only reallocated when a result has more children than this.
  size_t size;
  // One child offset iterator per child of res.
  RSOffsetIterator *iters;
  // Pending (not yet yielded) offset of each child, or RS_OFFSETVECTOR_EOF.
  uint32_t *offsets;
  // Term belonging to each child's pending offset.
  RSQueryTerm **terms;
  // TODO: avoid yielding duplicate offsets, e.g. by tracking the last yielded one:
  // uint32_t lastOffset;

} _RSAggregateOffsetIterator;
30
/* Forward declarations of the raw offset vector iterator callbacks, which
 * RSOffsetVector_Iterate() references before their definitions below. */

/* Get the next entry, or return RS_OFFSETVECTOR_EOF */
uint32_t _ovi_Next(void *ctx, RSQueryTerm **t);
/* Rewind the iterator */
void _ovi_Rewind(void *ctx);
35
/* Memory pools recycling iterator contexts: __offsetIters holds raw offset
 * vector iterators, __aggregateIters holds aggregate iterators. Both are created
 * lazily (with isGlobal = 1) on first use and are never destroyed in this file.
 * NOTE(review): the lazy creation is not synchronized; this assumes the first use
 * happens on a single thread — confirm. */
static mempool_t *__offsetIters = NULL;
static mempool_t *__aggregateIters = NULL;
39
40 /* Free it */
_ovi_free(void * ctx)41 void _ovi_free(void *ctx) {
42 mempool_release(__offsetIters, ctx);
43 }
44
newOffsetIterator()45 void *newOffsetIterator() {
46 return rm_malloc(sizeof(_RSOffsetVectorIterator));
47 }
48 /* Create an offset iterator interface from a raw offset vector */
RSOffsetVector_Iterate(const RSOffsetVector * v,RSQueryTerm * t)49 RSOffsetIterator RSOffsetVector_Iterate(const RSOffsetVector *v, RSQueryTerm *t) {
50 if (!__offsetIters) {
51 mempool_options options = {
52 .isGlobal = 1, .initialCap = 8, .alloc = newOffsetIterator, .free = rm_free};
53 __offsetIters = mempool_new(&options);
54 }
55 _RSOffsetVectorIterator *it = mempool_get(__offsetIters);
56 it->buf = (Buffer){.data = v->data, .offset = v->len, .cap = v->len};
57 it->br = NewBufferReader(&it->buf);
58 it->lastValue = 0;
59 it->term = t;
60
61 return (RSOffsetIterator){.Next = _ovi_Next, .Rewind = _ovi_Rewind, .Free = _ovi_free, .ctx = it};
62 }
63
/* An aggregate offset iterator yielding offsets one by one */
/* Its callbacks are declared here because _aggregateResult_iterate() installs
 * them before their definitions further down. */
uint32_t _aoi_Next(void *ctx, RSQueryTerm **term);
void _aoi_Free(void *ctx);
void _aoi_Rewind(void *ctx);
68
aggiterNew()69 static void *aggiterNew() {
70 _RSAggregateOffsetIterator *it = rm_malloc(sizeof(_RSAggregateOffsetIterator));
71 it->size = 0;
72 it->offsets = NULL;
73 it->iters = NULL;
74 it->terms = NULL;
75 return it;
76 }
77
aggiterFree(void * p)78 static void aggiterFree(void *p) {
79 _RSAggregateOffsetIterator *aggiter = p;
80 rm_free(aggiter->offsets);
81 rm_free(aggiter->iters);
82 rm_free(aggiter->terms);
83 rm_free(aggiter);
84 }
85
86 /* Create an iterator from the aggregate offset iterators of the aggregate result */
_aggregateResult_iterate(const RSAggregateResult * agg)87 static RSOffsetIterator _aggregateResult_iterate(const RSAggregateResult *agg) {
88 if (!__aggregateIters) {
89 mempool_options opts = {
90 .isGlobal = 1, .initialCap = 8, .alloc = aggiterNew, .free = aggiterFree};
91 __aggregateIters = mempool_new(&opts);
92 }
93 _RSAggregateOffsetIterator *it = mempool_get(__aggregateIters);
94 it->res = agg;
95
96 if (agg->numChildren > it->size) {
97 it->size = agg->numChildren;
98 rm_free(it->iters);
99 rm_free(it->offsets);
100 rm_free(it->terms);
101 it->iters = rm_calloc(agg->numChildren, sizeof(RSOffsetIterator));
102 it->offsets = rm_calloc(agg->numChildren, sizeof(uint32_t));
103 it->terms = rm_calloc(agg->numChildren, sizeof(RSQueryTerm *));
104 }
105
106 for (int i = 0; i < agg->numChildren; i++) {
107 it->iters[i] = RSIndexResult_IterateOffsets(agg->children[i]);
108 it->offsets[i] = it->iters[i].Next(it->iters[i].ctx, &it->terms[i]);
109 }
110
111 return (RSOffsetIterator){.Next = _aoi_Next, .Rewind = _aoi_Rewind, .Free = _aoi_Free, .ctx = it};
112 }
/* Next callback of the empty iterator: there are never any offsets. */
uint32_t _empty_Next(void *ctx, RSQueryTerm **t) {
  (void)ctx;
  (void)t;
  return RS_OFFSETVECTOR_EOF;
}
/* Free callback of the empty iterator: it owns no state, so nothing to release. */
void _empty_Free(void *ctx) {
  (void)ctx;
}
/* Rewind callback of the empty iterator: there is no position to reset. */
void _empty_Rewind(void *ctx) {
  (void)ctx;
}
120
_emptyIterator()121 RSOffsetIterator _emptyIterator() {
122 return (RSOffsetIterator){
123 .Next = _empty_Next, .Rewind = _empty_Rewind, .Free = _empty_Free, .ctx = NULL};
124 }
125
126 /* Create the appropriate iterator from a result based on its type */
RSIndexResult_IterateOffsets(const RSIndexResult * res)127 RSOffsetIterator RSIndexResult_IterateOffsets(const RSIndexResult *res) {
128
129 switch (res->type) {
130 case RSResultType_Term:
131 return RSOffsetVector_Iterate(&res->term.offsets, res->term.term);
132
133 // virtual and numeric entries have no offsets and cannot participate
134 case RSResultType_Virtual:
135 case RSResultType_Numeric:
136 return _emptyIterator();
137
138 case RSResultType_Intersection:
139 case RSResultType_Union:
140 default:
141 // if we only have one sub result, just iterate that...
142 if (res->agg.numChildren == 1) {
143 return RSIndexResult_IterateOffsets(res->agg.children[0]);
144 }
145 return _aggregateResult_iterate(&res->agg);
146 break;
147 }
148 }
149
150 /* Rewind an offset vector iterator and start reading it from the beginning. */
_ovi_Rewind(void * ctx)151 void _ovi_Rewind(void *ctx) {
152 _RSOffsetVectorIterator *it = ctx;
153 it->lastValue = 0;
154 it->buf.offset = 0;
155 it->br.pos = 0;
156 }
157
/* Free an offset vector iterator context directly with rm_free().
 * NOTE(review): not referenced in the code shown here. RSOffsetVector_Iterate()
 * installs _ovi_free(), which returns the context to the __offsetIters pool;
 * calling this function on a pooled context bypasses that pool. */
void _ovi_Free(void *ctx) {
  void *mem = ctx;
  rm_free(mem);
}
161
/* Yield the next absolute offset of a raw offset vector, or RS_OFFSETVECTOR_EOF
 * once the vector is exhausted. If `t` is non-NULL it receives the iterator's
 * term; on EOF `t` is left untouched. */
uint32_t _ovi_Next(void *ctx, RSQueryTerm **t) {
  _RSOffsetVectorIterator *vi = ctx;

  if (BufferReader_AtEnd(&vi->br)) {
    return RS_OFFSETVECTOR_EOF;
  }

  // Offsets are stored as deltas from the previous one; accumulate them.
  vi->lastValue += ReadVarint(&vi->br);

  if (t != NULL) {
    *t = vi->term;
  }
  return vi->lastValue;
}
173
/* Yield the smallest pending offset across all children, or RS_OFFSETVECTOR_EOF
 * when every child is exhausted.
 *
 * On ties the lowest-indexed child wins. If `t` is non-NULL it receives the term
 * of the yielded offset. The selected child is then advanced so its slot holds
 * its next pending offset. */
uint32_t _aoi_Next(void *ctx, RSQueryTerm **t) {
  _RSAggregateOffsetIterator *it = ctx;
  const int count = it->res->numChildren;

  int best = -1;
  uint32_t bestVal = RS_OFFSETVECTOR_EOF;
  // The strict '<' means a slot holding RS_OFFSETVECTOR_EOF is never selected.
  for (int i = 0; i < count; ++i) {
    if (it->offsets[i] < bestVal) {
      bestVal = it->offsets[i];
      best = i;
    }
  }

  if (best < 0) {
    return RS_OFFSETVECTOR_EOF;
  }

  if (t != NULL) {
    *t = it->terms[best];
  }

  RSOffsetIterator *child = &it->iters[best];
  it->offsets[best] = child->Next(child->ctx, &it->terms[best]);
  return bestVal;
}
200
_aoi_Free(void * ctx)201 void _aoi_Free(void *ctx) {
202 _RSAggregateOffsetIterator *it = ctx;
203 for (int i = 0; i < it->res->numChildren; i++) {
204 it->iters[i].Free(it->iters[i].ctx);
205 }
206
207 mempool_release(__aggregateIters, ctx);
208 }
209
_aoi_Rewind(void * ctx)210 void _aoi_Rewind(void *ctx) {
211 _RSAggregateOffsetIterator *it = ctx;
212
213 for (int i = 0; i < it->res->numChildren; i++) {
214 it->iters[i].Rewind(it->iters[i].ctx);
215 it->offsets[i] = 0;
216 }
217 }