1 #include "aggregate.h"
2 #include "reducer.h"
3
4 #include <query.h>
5 #include <extension.h>
6 #include <result_processor.h>
7 #include <util/arr.h>
8 #include <rmutil/util.h>
9 #include "ext/default.h"
10 #include "extension.h"
11
12 /**
13 * Ensures that the user has not requested one of the 'extended' features. Extended
14 * in this case refers to reducers which re-create the search results.
15 * @param areq the request
16 * @param name the name of the option that requires simple mode. Used for error
17 * formatting
18 * @param status the error object
19 */
ensureSimpleMode(AREQ * areq)20 static void ensureSimpleMode(AREQ *areq) {
21 assert(!(areq->reqflags & QEXEC_F_IS_EXTENDED));
22 areq->reqflags |= QEXEC_F_IS_SEARCH;
23 }
24
25 /**
26 * Like @ref ensureSimpleMode(), but does the opposite -- ensures that one of the
27 * 'simple' options - i.e. ones which rely on the field to be the exact same as
28 * found in the document - was not requested.
29 */
ensureExtendedMode(AREQ * areq,const char * name,QueryError * status)30 static int ensureExtendedMode(AREQ *areq, const char *name, QueryError *status) {
31 if (areq->reqflags & QEXEC_F_IS_SEARCH) {
32 QueryError_SetErrorFmt(status, QUERY_EINVAL,
33 "option `%s` is mutually exclusive with simple (i.e. search) options",
34 name);
35 return 0;
36 }
37 areq->reqflags |= QEXEC_F_IS_EXTENDED;
38 return 1;
39 }
40
41 static int parseSortby(PLN_ArrangeStep *arng, ArgsCursor *ac, QueryError *status, int allowLegacy);
42
/**
 * Releases the heap members referenced by a returned field: the summarize
 * separator and the highlight open/close tags. The ReturnedField structure
 * itself is not freed here.
 */
static void ReturnedField_Free(ReturnedField *field) {
  rm_free(field->summarizeSettings.separator);
  rm_free(field->highlightSettings.closeTag);
  rm_free(field->highlightSettings.openTag);
}
48
/**
 * Releases everything a field list references: the members of every listed
 * field, the members of the default field, and finally the field array.
 * The FieldList structure itself is not freed.
 */
void FieldList_Free(FieldList *fields) {
  ReturnedField *cur = fields->fields;
  size_t remaining = fields->numFields;
  while (remaining > 0) {
    ReturnedField_Free(cur);
    ++cur;
    --remaining;
  }

  ReturnedField_Free(&fields->defaultField);
  rm_free(fields->fields);
}
56
FieldList_GetCreateField(FieldList * fields,const char * name)57 ReturnedField *FieldList_GetCreateField(FieldList *fields, const char *name) {
58 size_t foundIndex = -1;
59 for (size_t ii = 0; ii < fields->numFields; ++ii) {
60 if (!strcasecmp(fields->fields[ii].name, name)) {
61 return fields->fields + ii;
62 }
63 }
64
65 fields->fields = rm_realloc(fields->fields, sizeof(*fields->fields) * ++fields->numFields);
66 ReturnedField *ret = fields->fields + (fields->numFields - 1);
67 memset(ret, 0, sizeof *ret);
68 ret->name = name;
69 return ret;
70 }
71
/**
 * When the list is flagged with `explicitReturn`, drops every field which was
 * not itself marked `explicitReturn`: the members of dropped fields are freed
 * and the surviving fields are compacted to the front of the array, keeping
 * their relative order. Does nothing when the list is not flagged.
 */
static void FieldList_RestrictReturn(FieldList *fields) {
  if (!fields->explicitReturn) {
    return;
  }

  const size_t total = fields->numFields;
  size_t kept = 0;
  for (size_t ii = 0; ii < total; ++ii) {
    ReturnedField *cur = fields->fields + ii;
    if (cur->explicitReturn == 0) {
      ReturnedField_Free(cur);
      continue;
    }
    // Only copy when an earlier field was dropped and left a hole
    if (kept != ii) {
      fields->fields[kept] = *cur;
    }
    ++kept;
  }
  fields->numFields = kept;
}
89
/**
 * Parses the optional MAXIDLE and COUNT settings (both must be >= 1) into
 * req->cursorMaxIdle and req->cursorChunkSize, then flags the request with
 * QEXEC_F_IS_CURSOR.
 *
 * A MAXIDLE which is zero (left unset) or larger than
 * RSGlobalConfig.cursorMaxIdle is replaced by the global value.
 * @return REDISMODULE_OK, or REDISMODULE_ERR with `status` set
 */
static int parseCursorSettings(AREQ *req, ArgsCursor *ac, QueryError *status) {
  ACArgSpec specs[] = {
      {.name = "MAXIDLE",
       .type = AC_ARGTYPE_UINT,
       .target = &req->cursorMaxIdle,
       .intflags = AC_F_GE1},
      {.name = "COUNT",
       .type = AC_ARGTYPE_UINT,
       .target = &req->cursorChunkSize,
       .intflags = AC_F_GE1},
      {NULL}};

  ACArgSpec *errArg = NULL;
  int rv = AC_ParseArgSpec(ac, specs, &errArg);
  // AC_ERR_ENOENT is not treated as a failure here
  if (rv != AC_OK && rv != AC_ERR_ENOENT) {
    QERR_MKBADARGS_AC(status, errArg->name, rv);
    return REDISMODULE_ERR;
  }

  const int idleUnset = (req->cursorMaxIdle == 0);
  if (idleUnset || req->cursorMaxIdle > RSGlobalConfig.cursorMaxIdle) {
    req->cursorMaxIdle = RSGlobalConfig.cursorMaxIdle;
  }

  req->reqflags |= QEXEC_F_IS_CURSOR;
  return REDISMODULE_OK;
}
114
/** Result codes returned by the argument-handling helpers in this file */
#define ARG_HANDLED 1   // the argument was recognized and consumed
#define ARG_ERROR (-1)  // the argument was recognized but could not be parsed
#define ARG_UNKNOWN 0   // the argument is not one the helper deals with
118
/**
 * Handles the common arguments that are not stateful: LIMIT, SORTBY,
 * ON_TIMEOUT, WITHCURSOR, _NUM_SSTRING and WITHRAWIDS.
 *
 * @param req the request being populated
 * @param ac the argument cursor; the keyword is matched with
 *        AC_AdvanceIfMatch() and its parameters are read from the cursor
 * @param status receives the error description on failure
 * @param allowLegacy not consulted by this function. Legacy SORTBY parsing is
 *        selected from the request's QEXEC_F_IS_SEARCH flag instead
 * @return ARG_HANDLED if the argument was consumed, ARG_ERROR if it was
 *         recognized but invalid, ARG_UNKNOWN if no keyword matched
 */
static int handleCommonArgs(AREQ *req, ArgsCursor *ac, QueryError *status, int allowLegacy) {
  (void)allowLegacy;

  if (AC_AdvanceIfMatch(ac, "LIMIT")) {
    PLN_ArrangeStep *arng = AGPLN_GetOrCreateArrangeStep(&req->ap);
    // LIMIT <offset> <length>
    if (AC_NumRemaining(ac) < 2) {
      QueryError_SetError(status, QUERY_EPARSEARGS, "LIMIT requires two arguments");
      return ARG_ERROR;
    }
    if (AC_GetU64(ac, &arng->offset, 0) != AC_OK || AC_GetU64(ac, &arng->limit, 0) != AC_OK) {
      QueryError_SetError(status, QUERY_EPARSEARGS, "LIMIT needs two numeric arguments");
      return ARG_ERROR;
    }

    if (arng->limit == 0) {
      // LIMIT 0 0
      req->reqflags |= QEXEC_F_NOROWS;
      return ARG_HANDLED;
    }
    // The maximum is only enforced once the request is flagged as a search
    if ((arng->limit > RSGlobalConfig.maxSearchResults) && (req->reqflags & QEXEC_F_IS_SEARCH)) {
      QueryError_SetErrorFmt(status, QUERY_ELIMIT, "LIMIT exceeds maximum of %llu",
                             RSGlobalConfig.maxSearchResults);
      return ARG_ERROR;
    }
    return ARG_HANDLED;
  }

  if (AC_AdvanceIfMatch(ac, "SORTBY")) {
    PLN_ArrangeStep *arng = AGPLN_GetOrCreateArrangeStep(&req->ap);
    if (parseSortby(arng, ac, status, req->reqflags & QEXEC_F_IS_SEARCH) != REDISMODULE_OK) {
      return ARG_ERROR;
    }
    return ARG_HANDLED;
  }

  if (AC_AdvanceIfMatch(ac, "ON_TIMEOUT")) {
    if (AC_NumRemaining(ac) < 1) {
      QueryError_SetError(status, QUERY_EPARSEARGS, "Need argument for ON_TIMEOUT");
      return ARG_ERROR;
    }
    const char *policystr = AC_GetStringNC(ac, NULL);
    req->tmoPolicy = TimeoutPolicy_Parse(policystr, strlen(policystr));
    if (req->tmoPolicy == TimeoutPolicy_Invalid) {
      QueryError_SetErrorFmt(status, QUERY_EPARSEARGS, "'%s' is not a valid timeout policy",
                             policystr);
      return ARG_ERROR;
    }
    return ARG_HANDLED;
  }

  if (AC_AdvanceIfMatch(ac, "WITHCURSOR")) {
    if (parseCursorSettings(req, ac, status) != REDISMODULE_OK) {
      return ARG_ERROR;
    }
    return ARG_HANDLED;
  }

  if (AC_AdvanceIfMatch(ac, "_NUM_SSTRING")) {
    req->reqflags |= QEXEC_F_TYPED;
    return ARG_HANDLED;
  }

  if (AC_AdvanceIfMatch(ac, "WITHRAWIDS")) {
    req->reqflags |= QEXEC_F_SENDRAWIDS;
    return ARG_HANDLED;
  }

  return ARG_UNKNOWN;
}
174
/**
 * Parses the arguments of SORTBY into an arrange step. The cursor is assumed
 * to be positioned right after the 'SORTBY' keyword.
 *
 * Two syntaxes are supported:
 *  - legacy (`isLegacy` != 0): a single field name, optionally followed by
 *    ASC or DESC. The field name is stored exactly as given.
 *  - otherwise: a counted argument list of `@field` tokens, each optionally
 *    followed by ASC or DESC. The leading '@' is not stored.
 * In both cases an optional `MAX <n>` may follow; it sets `arng->limit`.
 *
 * On success `arng->sortKeys` (an array of pointers borrowed from the cursor)
 * and `arng->sortAscMap` are set. Only one SORTBY per arrange step is allowed.
 *
 * @param arng the arrange step to fill in
 * @param ac the argument cursor
 * @param status receives the error description on failure
 * @param isLegacy non-zero to select the legacy syntax
 * @return REDISMODULE_OK or REDISMODULE_ERR
 */
static int parseSortby(PLN_ArrangeStep *arng, ArgsCursor *ac, QueryError *status, int isLegacy) {
  // Prevent multiple SORTBY steps
  if (arng->sortKeys != NULL) {
    QERR_MKBADARGS_FMT(status, "Multiple SORTBY steps are not allowed. Sort multiple fields in a single step");
    return REDISMODULE_ERR;
  }

  ArgsCursor subArgs = {0};
  int rv;
  int legacyDesc = 0;

  // We build a bitmap of sorting parameters. 1 means asc, 0 desc.
  // By default all bits are 1. Whenever we encounter DESC we flip the corresponding bit
  uint64_t ascMap = SORTASCMAP_INIT;
  const char **keys = NULL;

  if (isLegacy) {
    if (AC_NumRemaining(ac) > 0) {
      // Mimic subArgs to contain the single field we already have
      AC_GetSlice(ac, &subArgs, 1);
      if (AC_AdvanceIfMatch(ac, "DESC")) {
        legacyDesc = 1;
      } else if (AC_AdvanceIfMatch(ac, "ASC")) {
        legacyDesc = 0;
      }
    } else {
      goto err;
    }
  } else {
    rv = AC_GetVarArgs(ac, &subArgs);
    if (rv != AC_OK) {
      QERR_MKBADARGS_AC(status, "SORTBY", rv);
      goto err;
    }
  }

  keys = array_new(const char *, 8);

  if (isLegacy) {
    // Legacy demands one field and an optional ASC/DESC parameter. Both
    // of these are handled above, so no need for argument parsing
    const char *s = AC_GetStringNC(&subArgs, NULL);
    keys = array_append(keys, s);

    if (legacyDesc) {
      SORTASCMAP_SETDESC(ascMap, 0);
    }
  } else {
    while (!AC_IsAtEnd(&subArgs)) {
      const char *s = AC_GetStringNC(&subArgs, NULL);
      if (*s == '@') {
        if (array_len(keys) >= SORTASCMAP_MAXFIELDS) {
          QERR_MKBADARGS_FMT(status, "Cannot sort by more than %lu fields", SORTASCMAP_MAXFIELDS);
          goto err;
        }
        s++;
        keys = array_append(keys, s);
        continue;
      }

      const int isAsc = !strcasecmp(s, "ASC");
      if (!isAsc && strcasecmp(s, "DESC") != 0) {
        // Unknown token - neither a property nor ASC/DESC
        QERR_MKBADARGS_FMT(status, "MISSING ASC or DESC after sort field (%s)", s);
        goto err;
      }

      // ASC/DESC modifies the most recently seen field. Without a preceding
      // field, `array_len(keys) - 1` would wrap around and yield an
      // out-of-range bit position for the map.
      if (array_len(keys) == 0) {
        QERR_MKBADARGS_FMT(status, "`%s` must be preceded by a sort field", s);
        goto err;
      }

      if (isAsc) {
        SORTASCMAP_SETASC(ascMap, array_len(keys) - 1);
      } else {
        SORTASCMAP_SETDESC(ascMap, array_len(keys) - 1);
      }
    }
  }

  // Parse optional MAX
  // MAX is not included in the normal SORTBY arglist.. so we need to switch
  // back to `ac`
  if (AC_AdvanceIfMatch(ac, "MAX")) {
    unsigned mx = 0;
    // Parenthesized so that `rv` holds the error code and not the result of
    // the comparison
    if ((rv = AC_GetUnsigned(ac, &mx, 0)) != AC_OK) {
      QERR_MKBADARGS_AC(status, "MAX", rv);
      goto err;
    }
    arng->limit = mx;
  }

  arng->sortAscMap = ascMap;
  arng->sortKeys = keys;
  return REDISMODULE_OK;

err:
  QERR_MKBADARGS_FMT(status, "Bad SORTBY arguments");
  if (keys) {
    array_free(keys);
  }
  return REDISMODULE_ERR;
}
271
/**
 * Handles the legacy query filters: FILTER (numeric filter, appended to
 * `options->legacy.filters`) and GEOFILTER (stored in `options->legacy.gf`).
 *
 * @param ac the argument cursor
 * @param options the search options receiving the parsed filters
 * @param status receives the error description on failure
 * @return ARG_HANDLED, ARG_ERROR or ARG_UNKNOWN (note: not a REDISMODULE_*
 *         code)
 */
static int parseQueryLegacyArgs(ArgsCursor *ac, RSSearchOptions *options, QueryError *status) {
  if (AC_AdvanceIfMatch(ac, "FILTER")) {
    // Numeric filter
    // NOTE(review): when parsing fails, a NULL entry remains at the tail of
    // legacy.filters - confirm that the cleanup code tolerates it.
    NumericFilter **curpp = array_ensure_tail(&options->legacy.filters, NumericFilter *);
    *curpp = NumericFilter_Parse(ac, status);
    if (!*curpp) {
      return ARG_ERROR;
    }
  } else if (AC_AdvanceIfMatch(ac, "GEOFILTER")) {
    options->legacy.gf = rm_calloc(1, sizeof(*options->legacy.gf));
    if (GeoFilter_Parse(options->legacy.gf, ac, status) != REDISMODULE_OK) {
      GeoFilter_Free(options->legacy.gf);
      // Do not leave a dangling pointer behind for later users of the options
      options->legacy.gf = NULL;
      return ARG_ERROR;
    }
  } else {
    return ARG_UNKNOWN;
  }
  return ARG_HANDLED;
}
291
/**
 * Parses the query-specific arguments which follow the query string.
 *
 * Arguments are first matched against a declarative spec table. Whatever the
 * table does not know is tried, in this order, as SUMMARIZE, HIGHLIGHT, a
 * legacy filter (only if the request is already flagged QEXEC_F_IS_SEARCH)
 * and finally as one of the common arguments. Parsing stops, without error,
 * at the first argument none of these recognize.
 *
 * @param ac the argument cursor
 * @param req the request being populated
 * @param searchOpts the search options being populated
 * @param plan not used by this function
 * @param status receives the error description on failure
 * @return REDISMODULE_OK or REDISMODULE_ERR
 */
static int parseQueryArgs(ArgsCursor *ac, AREQ *req, RSSearchOptions *searchOpts,
                          AggregatePlan *plan, QueryError *status) {
  const char *languageStr = NULL;
  ArgsCursor returnFields = {0};
  ArgsCursor inKeys = {0};
  ArgsCursor inFields = {0};
  ACArgSpec querySpecs[] = {
      {.name = "INFIELDS", .type = AC_ARGTYPE_SUBARGS, .target = &inFields},
      {.name = "SLOP",
       .type = AC_ARGTYPE_INT,
       .target = &searchOpts->slop,
       .intflags = AC_F_COALESCE},
      {.name = "LANGUAGE", .type = AC_ARGTYPE_STRING, .target = &languageStr},
      {.name = "EXPANDER", .type = AC_ARGTYPE_STRING, .target = &searchOpts->expanderName},
      {.name = "INKEYS", .type = AC_ARGTYPE_SUBARGS, .target = &inKeys},
      {.name = "SCORER", .type = AC_ARGTYPE_STRING, .target = &searchOpts->scorerName},
      {.name = "RETURN", .type = AC_ARGTYPE_SUBARGS, .target = &returnFields},
      {AC_MKBITFLAG("INORDER", &searchOpts->flags, Search_InOrder)},
      {AC_MKBITFLAG("VERBATIM", &searchOpts->flags, Search_Verbatim)},
      {AC_MKBITFLAG("WITHSCORES", &req->reqflags, QEXEC_F_SEND_SCORES)},
      {AC_MKBITFLAG("WITHSORTKEYS", &req->reqflags, QEXEC_F_SEND_SORTKEYS)},
      {AC_MKBITFLAG("WITHPAYLOADS", &req->reqflags, QEXEC_F_SEND_PAYLOADS)},
      {AC_MKBITFLAG("NOCONTENT", &req->reqflags, QEXEC_F_SEND_NOFIELDS)},
      {AC_MKBITFLAG("NOSTOPWORDS", &searchOpts->flags, Search_NoStopwrods)},
      {AC_MKBITFLAG("EXPLAINSCORE", &req->reqflags, QEXEC_F_SEND_SCOREEXPLAIN)},
      {.name = "PAYLOAD",
       .type = AC_ARGTYPE_STRING,
       .target = &req->ast.udata,
       .len = &req->ast.udatalen},
      {NULL}};

  while (!AC_IsAtEnd(ac)) {
    ACArgSpec *errSpec = NULL;
    int rv = AC_ParseArgSpec(ac, querySpecs, &errSpec);
    if (rv == AC_OK) {
      continue;
    }
    if (rv != AC_ERR_ENOENT) {
      QERR_MKBADARGS_AC(status, errSpec->name, rv);
      return REDISMODULE_ERR;
    }

    // Not in the table; see if this is one of the arguments which require
    // special handling
    if (AC_AdvanceIfMatch(ac, "SUMMARIZE")) {
      ensureSimpleMode(req);
      if (ParseSummarize(ac, &req->outFields) == REDISMODULE_ERR) {
        QERR_MKBADARGS_FMT(status, "Bad arguments for SUMMARIZE");
        return REDISMODULE_ERR;
      }
      req->reqflags |= QEXEC_F_SEND_HIGHLIGHT;
      continue;
    }

    if (AC_AdvanceIfMatch(ac, "HIGHLIGHT")) {
      ensureSimpleMode(req);
      if (ParseHighlight(ac, &req->outFields) == REDISMODULE_ERR) {
        QERR_MKBADARGS_FMT(status, "Bad arguments for HIGHLIGHT");
        return REDISMODULE_ERR;
      }
      req->reqflags |= QEXEC_F_SEND_HIGHLIGHT;
      continue;
    }

    // Legacy filters are only tried once the request is known to be a search
    if (req->reqflags & QEXEC_F_IS_SEARCH) {
      rv = parseQueryLegacyArgs(ac, searchOpts, status);
      if (rv == ARG_ERROR) {
        return REDISMODULE_ERR;
      }
      if (rv != ARG_UNKNOWN) {
        continue;
      }
    }

    int commonRv = handleCommonArgs(req, ac, status, 1);
    if (commonRv == ARG_ERROR) {
      return REDISMODULE_ERR;
    }
    if (commonRv != ARG_HANDLED) {
      // Not ours; leave the rest of the arguments to the caller
      break;
    }
  }

  const int wantsExplain = (req->reqflags & QEXEC_F_SEND_SCOREEXPLAIN) != 0;
  const int wantsScores = (req->reqflags & QEXEC_F_SEND_SCORES) != 0;
  if (wantsExplain && !wantsScores) {
    QERR_MKBADARGS_FMT(status, "EXPLAINSCORE must be accompanied with WITHSCORES");
    return REDISMODULE_ERR;
  }

  // Transfer the collected sub-argument lists into the search options
  searchOpts->language = RSLanguage_Find(languageStr);
  searchOpts->legacy.infields = (const char **)inFields.objs;
  searchOpts->legacy.ninfields = inFields.argc;
  searchOpts->inkeys = (const char **)inKeys.objs;
  searchOpts->ninkeys = inKeys.argc;

  if (AC_IsInitialized(&returnFields)) {
    ensureSimpleMode(req);

    req->outFields.explicitReturn = 1;
    if (returnFields.argc == 0) {
      // RETURN 0 behaves like NOCONTENT
      req->reqflags |= QEXEC_F_SEND_NOFIELDS;
    }

    while (!AC_IsAtEnd(&returnFields)) {
      const char *fieldName = AC_GetStringNC(&returnFields, NULL);
      FieldList_GetCreateField(&req->outFields, fieldName)->explicitReturn = 1;
    }
  }

  FieldList_RestrictReturn(&req->outFields);
  return REDISMODULE_OK;
}
399
/**
 * Generates the default alias of a reducer which was given no explicit `AS`:
 * the text "__generated_alias", followed by the function name and the
 * comma-separated arguments (with any leading '@' characters removed), all
 * converted to lowercase.
 *
 * @param g the group step (not used)
 * @param func the reducer function name
 * @param args the reducer arguments. The cursor is copied, so the caller's
 *        position is not changed
 * @return a newly allocated string, to be released with rm_free()
 */
static char *getReducerAlias(PLN_GroupStep *g, const char *func, const ArgsCursor *args) {
  (void)g;

  sds out = sdsnew("__generated_alias");
  out = sdscat(out, func);

  ArgsCursor tmp = *args;
  while (!AC_IsAtEnd(&tmp)) {
    size_t l;
    const char *s = AC_GetStringNC(&tmp, &l);
    while (*s == '@') {
      // Don't allow the leading '@' to be included as an alias!
      ++s;
      --l;
    }
    out = sdscatlen(out, s, l);
    if (!AC_IsAtEnd(&tmp)) {
      out = sdscat(out, ",");
    }
  }

  sdstolower(out);

  // duplicate everything. yeah this is lame but this function is not in a tight loop
  char *dup = rm_strndup(out, sdslen(out));
  sdsfree(out);
  return dup;
}
429
/**
 * Destructor of a group step: frees the alias of every reducer, the reducer
 * array, the step's lookup table and finally the step itself.
 */
static void groupStepFree(PLN_BaseStep *base) {
  PLN_GroupStep *gstp = (PLN_GroupStep *)base;

  if (gstp->reducers != NULL) {
    const size_t count = array_len(gstp->reducers);
    for (size_t ii = 0; ii < count; ++ii) {
      rm_free(gstp->reducers[ii].alias);
    }
    array_free(gstp->reducers);
  }

  RLookup_Cleanup(&gstp->lookup);
  rm_free(base);
}
444
groupStepGetLookup(PLN_BaseStep * bstp)445 static RLookup *groupStepGetLookup(PLN_BaseStep *bstp) {
446 return &((PLN_GroupStep *)bstp)->lookup;
447 }
448
/**
 * Appends a reducer to a group step. The reducer's arguments are read as a
 * counted argument list, optionally followed by `AS <alias>`. Without an
 * explicit alias one is generated from the function name and its arguments.
 *
 * @param gstp the group step receiving the reducer
 * @param name the reducer function name; the pointer is stored, not copied
 * @param ac the argument cursor, positioned right after the function name
 * @param status receives the error description on failure
 * @return REDISMODULE_OK, or REDISMODULE_ERR in which case the partially
 *         filled reducer entry is removed from the step again
 */
int PLNGroupStep_AddReducer(PLN_GroupStep *gstp, const char *name, ArgsCursor *ac,
                            QueryError *status) {
  // Reserve the slot first, and fill it in as we parse
  PLN_Reducer *gr = array_ensure_tail(&gstp->reducers, PLN_Reducer);
  gr->name = name;

  int rv = AC_GetVarArgs(ac, &gr->args);
  if (rv != AC_OK) {
    QERR_MKBADARGS_AC(status, name, rv);
    array_pop(gstp->reducers);
    return REDISMODULE_ERR;
  }

  const char *alias = NULL;
  if (AC_AdvanceIfMatch(ac, "AS")) {
    rv = AC_GetString(ac, &alias, NULL, 0);
    if (rv != AC_OK) {
      QERR_MKBADARGS_AC(status, "AS", rv);
      array_pop(gstp->reducers);
      return REDISMODULE_ERR;
    }
  }

  // The reducer always owns its alias, whether given or generated
  gr->alias = (alias != NULL) ? rm_strdup(alias) : getReducerAlias(gstp, name, &gr->args);
  return REDISMODULE_OK;
}
481
/** Step destructor which releases the step object itself and nothing else */
static void genericStepFree(PLN_BaseStep *p) {
  rm_free(p);
}
485
PLNGroupStep_New(const char ** properties,size_t nproperties)486 PLN_GroupStep *PLNGroupStep_New(const char **properties, size_t nproperties) {
487 PLN_GroupStep *gstp = rm_calloc(1, sizeof(*gstp));
488 gstp->properties = properties;
489 gstp->nproperties = nproperties;
490 gstp->base.dtor = groupStepFree;
491 gstp->base.getLookup = groupStepGetLookup;
492 gstp->base.type = PLN_T_GROUP;
493 return gstp;
494 }
495
/**
 * Parses `GROUPBY <nargs> <property>... [REDUCE <func> <nargs> <arg>... [AS
 * <alias>]]...` and adds the resulting group step to the request's plan. The
 * cursor is expected to be positioned right after the 'GROUPBY' keyword.
 *
 * The step is added to the plan before the reducers are parsed, and it is
 * left in the plan when a reducer fails to parse.
 *
 * @param req the request whose plan receives the step
 * @param ac the argument cursor
 * @param status receives the error description on failure
 * @return REDISMODULE_OK or REDISMODULE_ERR
 */
static int parseGroupby(AREQ *req, ArgsCursor *ac, QueryError *status) {
  ArgsCursor groupArgs = {0};
  int rv = AC_GetVarArgs(ac, &groupArgs);
  if (rv != AC_OK) {
    QERR_MKBADARGS_AC(status, "GROUPBY", rv);
    return REDISMODULE_ERR;
  }

  // Number of fields.. now let's see the reducers
  PLN_GroupStep *gstp = PLNGroupStep_New((const char **)groupArgs.objs, groupArgs.argc);
  AGPLN_AddStep(&req->ap, &gstp->base);

  while (AC_AdvanceIfMatch(ac, "REDUCE")) {
    const char *name;
    // Capture the result, so that the error reports this failure rather than
    // the (successful) status of the GROUPBY argument list above
    rv = AC_GetString(ac, &name, NULL, 0);
    if (rv != AC_OK) {
      QERR_MKBADARGS_AC(status, "REDUCE", rv);
      return REDISMODULE_ERR;
    }
    if (PLNGroupStep_AddReducer(gstp, name, ac, status) != REDISMODULE_OK) {
      return REDISMODULE_ERR;
    }
  }
  return REDISMODULE_OK;
}
525
/**
 * Destructor of an APPLY/FILTER step: frees the parsed expression (if any),
 * the raw expression (only when the step is flagged `shouldFreeRaw`), the
 * alias and the step itself.
 */
static void freeFilterStep(PLN_BaseStep *bstp) {
  PLN_MapFilterStep *step = (PLN_MapFilterStep *)bstp;

  if (step->parsedExpr != NULL) {
    ExprAST_Free(step->parsedExpr);
  }
  // The raw expression is only released when the step says so
  if (step->shouldFreeRaw) {
    rm_free((char *)step->rawExpr);
  }

  rm_free((void *)step->base.alias);
  rm_free(bstp);
}
537
PLNMapFilterStep_New(const char * expr,int mode)538 PLN_MapFilterStep *PLNMapFilterStep_New(const char *expr, int mode) {
539 PLN_MapFilterStep *stp = rm_calloc(1, sizeof(*stp));
540 stp->base.dtor = freeFilterStep;
541 stp->base.type = mode;
542 stp->rawExpr = expr;
543 return stp;
544 }
545
/**
 * Parses the expression of an APPLY or FILTER clause and adds the matching
 * step to the request's plan.
 *
 * For APPLY an optional `AS <alias>` is accepted; without it a copy of the
 * expression text is used as the alias. When `AS` lacks its argument, the
 * step is removed from the plan again and destroyed.
 *
 * @param req the request whose plan receives the step
 * @param ac the argument cursor, positioned at the expression
 * @param status receives the error description on failure
 * @param isApply non-zero for APPLY, zero for FILTER
 * @return REDISMODULE_OK or REDISMODULE_ERR
 */
static int handleApplyOrFilter(AREQ *req, ArgsCursor *ac, QueryError *status, int isApply) {
  const char *expr = NULL;
  int rv = AC_GetString(ac, &expr, NULL, 0);
  if (rv != AC_OK) {
    QERR_MKBADARGS_AC(status, "APPLY/FILTER", rv);
    return REDISMODULE_ERR;
  }

  PLN_MapFilterStep *stp = PLNMapFilterStep_New(expr, isApply ? PLN_T_APPLY : PLN_T_FILTER);
  AGPLN_AddStep(&req->ap, &stp->base);

  if (!isApply) {
    // No alias is set for FILTER
    return REDISMODULE_OK;
  }

  if (!AC_AdvanceIfMatch(ac, "AS")) {
    stp->base.alias = rm_strdup(expr);
    return REDISMODULE_OK;
  }

  const char *alias;
  if (AC_GetString(ac, &alias, NULL, 0) != AC_OK) {
    QERR_MKBADARGS_FMT(status, "AS needs argument");
    // Undo the insertion and destroy the half-built step
    AGPLN_PopStep(&req->ap, &stp->base);
    stp->base.dtor(&stp->base);
    return REDISMODULE_ERR;
  }

  stp->base.alias = rm_strdup(alias);
  return REDISMODULE_OK;
}
579
/** Destructor of a LOAD step: frees the key array, then the step itself */
static void loadDtor(PLN_BaseStep *bstp) {
  PLN_LoadStep *step = (PLN_LoadStep *)bstp;
  void *keys = step->keys;
  rm_free(keys);
  rm_free(step);
}
585
/**
 * Parses the counted argument list of LOAD and adds a load step to the
 * request's plan. The step keeps a copy of the argument cursor, and gets a
 * zero-initialized `keys` array with one slot per argument.
 *
 * @param req the request whose plan receives the step
 * @param ac the argument cursor, positioned right after 'LOAD'
 * @param status receives the error description on failure
 * @return REDISMODULE_OK or REDISMODULE_ERR
 */
static int handleLoad(AREQ *req, ArgsCursor *ac, QueryError *status) {
  ArgsCursor loadfields = {0};
  int rc = AC_GetVarArgs(ac, &loadfields);
  if (rc != AC_OK) {
    QERR_MKBADARGS_AC(status, "LOAD", rc);
    return REDISMODULE_ERR;
  }

  PLN_LoadStep *step = rm_calloc(1, sizeof(*step));
  step->args = loadfields;
  step->keys = rm_calloc(loadfields.argc, sizeof(*step->keys));
  step->base.dtor = loadDtor;
  step->base.type = PLN_T_LOAD;

  AGPLN_AddStep(&req->ap, &step->base);
  return REDISMODULE_OK;
}
602
AREQ_New(void)603 AREQ *AREQ_New(void) {
604 return rm_calloc(1, sizeof(AREQ));
605 }
606
/**
 * Compiles the raw command arguments into the request: the query string, the
 * search options and the aggregation plan.
 *
 * The arguments are first copied into `req->args` (as sds strings); the
 * cursor used for parsing is initialized over those copies. The first
 * argument is the query string; it is followed by the query-specific
 * arguments, and then by any mix of the common arguments and the
 * GROUPBY / APPLY / LOAD / FILTER clauses.
 *
 * @param req the request to populate
 * @param argv the arguments, starting at the query string
 * @param argc number of entries in `argv`
 * @param status receives the error description on failure
 * @return REDISMODULE_OK or REDISMODULE_ERR. On failure the request keeps
 *         whatever was set up so far; nothing is released here.
 */
int AREQ_Compile(AREQ *req, RedisModuleString **argv, int argc, QueryError *status) {
  req->args = rm_malloc(sizeof(*req->args) * argc);
  req->nargs = argc;
  for (size_t ii = 0; ii < (size_t)argc; ++ii) {
    size_t n;
    const char *s = RedisModule_StringPtrLen(argv[ii], &n);
    req->args[ii] = sdsnewlen(s, n);
  }

  // Parse the query and basic keywords first..
  ArgsCursor ac = {0};
  ArgsCursor_InitSDS(&ac, req->args, req->nargs);

  if (AC_IsAtEnd(&ac)) {
    QueryError_SetError(status, QUERY_EPARSEARGS, "No query string provided");
    return REDISMODULE_ERR;
  }

  req->query = AC_GetStringNC(&ac, NULL);
  AGPLN_Init(&req->ap);

  RSSearchOptions *searchOpts = &req->searchopts;
  RSSearchOptions_Init(searchOpts);
  if (parseQueryArgs(&ac, req, searchOpts, &req->ap, status) != REDISMODULE_OK) {
    return REDISMODULE_ERR;
  }

  // Now we have a 'compiled' plan. Let's get some more options..
  while (!AC_IsAtEnd(&ac)) {
    int rv = handleCommonArgs(req, &ac, status, req->reqflags & QEXEC_F_IS_SEARCH);
    if (rv == ARG_HANDLED) {
      continue;
    } else if (rv == ARG_ERROR) {
      return REDISMODULE_ERR;
    }

    if (AC_AdvanceIfMatch(&ac, "GROUPBY")) {
      // GROUPBY is the only clause here which forces extended mode
      if (!ensureExtendedMode(req, "GROUPBY", status)) {
        return REDISMODULE_ERR;
      }
      if (parseGroupby(req, &ac, status) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
      }
    } else if (AC_AdvanceIfMatch(&ac, "APPLY")) {
      if (handleApplyOrFilter(req, &ac, status, 1) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
      }
    } else if (AC_AdvanceIfMatch(&ac, "LOAD")) {
      if (handleLoad(req, &ac, status) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
      }
    } else if (AC_AdvanceIfMatch(&ac, "FILTER")) {
      if (handleApplyOrFilter(req, &ac, status, 0) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
      }
    } else {
      QueryError_FmtUnknownArg(status, &ac, "<main>");
      return REDISMODULE_ERR;
    }
  }
  return REDISMODULE_OK;
}
675
/**
 * Applies the filters which affect the entire query to the AST: the legacy
 * numeric filters, the legacy geo filter, and the INKEYS restriction.
 *
 * For INKEYS, every key is resolved to a document id through the index's
 * document table; keys which resolve to id 0 are skipped. The resulting ids
 * are stored in `opts->inids` / `opts->nids`.
 */
static void applyGlobalFilters(RSSearchOptions *opts, QueryAST *ast, const RedisSearchCtx *sctx) {
  if (opts->legacy.filters) {
    const size_t nfilters = array_len(opts->legacy.filters);
    for (size_t ii = 0; ii < nfilters; ++ii) {
      QAST_GlobalFilterOptions numericOpts = {.numeric = opts->legacy.filters[ii]};
      QAST_SetGlobalFilters(ast, &numericOpts);
    }
    // so AREQ_Free() doesn't free the filters themselves, which are now owned
    // by the query object
    array_clear(opts->legacy.filters);
  }

  if (opts->legacy.gf) {
    QAST_GlobalFilterOptions geoOpts = {.geo = opts->legacy.gf};
    QAST_SetGlobalFilters(ast, &geoOpts);
  }

  if (opts->inkeys) {
    opts->inids = rm_malloc(sizeof(*opts->inids) * opts->ninkeys);
    for (size_t ii = 0; ii < opts->ninkeys; ++ii) {
      const char *key = opts->inkeys[ii];
      t_docId did = DocTable_GetId(&sctx->spec->docs, key, strlen(key));
      if (!did) {
        continue;
      }
      opts->inids[opts->nids++] = did;
    }
    QAST_GlobalFilterOptions idOpts = {.ids = opts->inids, .nids = opts->nids};
    QAST_SetGlobalFilters(ast, &idOpts);
  }
}
703
/**
 * Binds a compiled request to a search context (i.e. to an index): validates
 * the options which depend on the index, parses the query string into the
 * AST, applies the global filters, expands the query (unless VERBATIM was
 * requested) and creates the root iterator.
 *
 * @param req the compiled request
 * @param sctx the search context; stored in `req->sctx`
 * @param status receives the error description on failure
 * @return REDISMODULE_OK or REDISMODULE_ERR
 */
int AREQ_ApplyContext(AREQ *req, RedisSearchCtx *sctx, QueryError *status) {
  IndexSpec *index = sctx->spec;
  RSSearchOptions *opts = &req->searchopts;
  req->sctx = sctx;

  // Highlighting and summarizing are refused for an index which does not
  // have Index_StoreByteOffsets set
  const int wantsHighlight = (req->reqflags & QEXEC_F_SEND_HIGHLIGHT) != 0;
  const int hasOffsets = (index->flags & Index_StoreByteOffsets) != 0;
  if (!hasOffsets && wantsHighlight) {
    QueryError_SetError(
        status, QUERY_EINVAL,
        "Cannot use highlight/summarize because NOOFSETS was specified at index level");
    return REDISMODULE_ERR;
  }

  // INFIELDS: translate the field names into a field mask
  if (opts->legacy.ninfields) {
    t_fieldMask mask = 0;
    for (size_t ii = 0; ii < opts->legacy.ninfields; ++ii) {
      const char *fieldName = opts->legacy.infields[ii];
      mask |= IndexSpec_GetFieldBit(index, fieldName, strlen(fieldName));
    }
    opts->fieldmask = mask;
  }

  if (opts->language == RS_LANG_UNSUPPORTED) {
    QueryError_SetError(status, QUERY_EINVAL, "No such language");
    return REDISMODULE_ERR;
  }

  if (opts->scorerName && Extensions_GetScoringFunction(NULL, opts->scorerName) == NULL) {
    QueryError_SetErrorFmt(status, QUERY_EINVAL, "No such scorer %s", opts->scorerName);
    return REDISMODULE_ERR;
  }

  // Unless NOSTOPWORDS was given, take a reference on the index's stopword
  // list
  if ((opts->flags & Search_NoStopwrods) == 0) {
    opts->stopwords = index->stopwords;
    StopWordList_Ref(index->stopwords);
  }

  QueryAST *ast = &req->ast;
  if (QAST_Parse(ast, sctx, opts, req->query, strlen(req->query), status) != REDISMODULE_OK) {
    return REDISMODULE_ERR;
  }

  applyGlobalFilters(opts, ast, sctx);

  const int verbatim = (opts->flags & Search_Verbatim) != 0;
  if (!verbatim && QAST_Expand(ast, opts->expanderName, opts, sctx, status) != REDISMODULE_OK) {
    return REDISMODULE_ERR;
  }

  ConcurrentSearchCtx_Init(sctx->redisCtx, &req->conc);
  req->rootiter = QAST_Iterate(ast, opts, sctx, &req->conc);
  assert(req->rootiter);

  return REDISMODULE_OK;
}
762
buildGroupRP(PLN_GroupStep * gstp,RLookup * srclookup,QueryError * err)763 static ResultProcessor *buildGroupRP(PLN_GroupStep *gstp, RLookup *srclookup, QueryError *err) {
764 const RLookupKey *srckeys[gstp->nproperties], *dstkeys[gstp->nproperties];
765 for (size_t ii = 0; ii < gstp->nproperties; ++ii) {
766 const char *fldname = gstp->properties[ii] + 1; // account for the @-
767 srckeys[ii] = RLookup_GetKey(srclookup, fldname, RLOOKUP_F_NOINCREF);
768 if (!srckeys[ii]) {
769 QueryError_SetErrorFmt(err, QUERY_ENOPROPKEY, "No such property `%s`", fldname);
770 return NULL;
771 }
772 dstkeys[ii] = RLookup_GetKey(&gstp->lookup, fldname, RLOOKUP_F_OCREAT | RLOOKUP_F_NOINCREF);
773 }
774
775 Grouper *grp = Grouper_New(srckeys, dstkeys, gstp->nproperties);
776
777 size_t nreducers = array_len(gstp->reducers);
778 for (size_t ii = 0; ii < nreducers; ++ii) {
779 // Build the actual reducer
780 PLN_Reducer *pr = gstp->reducers + ii;
781 ReducerOptions options = REDUCEROPTS_INIT(pr->name, &pr->args, srclookup, err);
782 ReducerFactory ff = RDCR_GetFactory(pr->name);
783 if (!ff) {
784 // No such reducer!
785 Grouper_Free(grp);
786 QueryError_SetErrorFmt(err, QUERY_ENOREDUCER, "No such reducer: %s", pr->name);
787 return NULL;
788 }
789 Reducer *rr = ff(&options);
790 if (!rr) {
791 Grouper_Free(grp);
792 return NULL;
793 }
794
795 // Set the destination key for the grouper!
796 RLookupKey *dstkey =
797 RLookup_GetKey(&gstp->lookup, pr->alias, RLOOKUP_F_OCREAT | RLOOKUP_F_NOINCREF);
798 Grouper_AddReducer(grp, rr, dstkey);
799 }
800
801 return Grouper_GetRP(grp);
802 }
803
804 /** Pushes a processor up the stack. Returns the newly pushed processor
805 * @param req the request
806 * @param rp the processor to push
807 * @param rpUpstream previous processor (used as source for rp)
808 * @return the processor passed in `rp`.
809 */
pushRP(AREQ * req,ResultProcessor * rp,ResultProcessor * rpUpstream)810 static ResultProcessor *pushRP(AREQ *req, ResultProcessor *rp, ResultProcessor *rpUpstream) {
811 rp->upstream = rpUpstream;
812 rp->parent = &req->qiter;
813 req->qiter.endProc = rp;
814 return rp;
815 }
816
/**
 * Builds the result processor of a group step and pushes it on the chain.
 *
 * When the lookup preceding the group step is the first lookup of the plan,
 * a loader is pushed in front of the grouper for all the keys of that lookup
 * which are flagged RLOOKUP_F_DOCSRC but not RLOOKUP_F_SVSRC (if there are
 * any).
 *
 * @param req the request
 * @param gstp the group step
 * @param rpUpstream the processor the new processor(s) read from
 * @param status receives the error description on failure
 * @return the pushed group processor, or NULL on error
 */
static ResultProcessor *getGroupRP(AREQ *req, PLN_GroupStep *gstp, ResultProcessor *rpUpstream,
                                   QueryError *status) {
  AGGPlan *pln = &req->ap;
  RLookup *prevLk = AGPLN_GetLookup(pln, &gstp->base, AGPLN_GETLOOKUP_PREV);

  ResultProcessor *groupRP = buildGroupRP(gstp, prevLk, status);
  if (groupRP == NULL) {
    return NULL;
  }

  RLookup *firstLk = AGPLN_GetLookup(pln, &gstp->base, AGPLN_GETLOOKUP_FIRST);
  if (firstLk != prevLk) {
    // The group step does not read from the first lookup; no loader
    return pushRP(req, groupRP, rpUpstream);
  }

  // See if we need a loader step
  const RLookupKey **kklist = NULL;
  for (RLookupKey *kk = firstLk->head; kk != NULL; kk = kk->next) {
    const int hasDocSrc = (kk->flags & RLOOKUP_F_DOCSRC) != 0;
    const int hasSvSrc = (kk->flags & RLOOKUP_F_SVSRC) != 0;
    if (hasDocSrc && !hasSvSrc) {
      *array_ensure_tail(&kklist, const RLookupKey *) = kk;
    }
  }

  if (kklist != NULL) {
    ResultProcessor *rpLoader = RPLoader_New(firstLk, kklist, array_len(kklist));
    array_free(kklist);
    assert(rpLoader);
    rpUpstream = pushRP(req, rpLoader, rpUpstream);
  }

  return pushRP(req, groupRP, rpUpstream);
}
848
849 #define DEFAULT_LIMIT 10
850
getArrangeRP(AREQ * req,AGGPlan * pln,const PLN_BaseStep * stp,QueryError * status,ResultProcessor * up)851 static ResultProcessor *getArrangeRP(AREQ *req, AGGPlan *pln, const PLN_BaseStep *stp,
852 QueryError *status, ResultProcessor *up) {
853 ResultProcessor *rp = NULL;
854 PLN_ArrangeStep astp_s = {.base = {.type = PLN_T_ARRANGE}};
855 PLN_ArrangeStep *astp = (PLN_ArrangeStep *)stp;
856
857 if (!astp) {
858 astp = &astp_s;
859 }
860
861 size_t limit = astp->offset + astp->limit;
862 if (!limit) {
863 limit = DEFAULT_LIMIT;
864 }
865
866 if (astp->sortKeys) {
867 size_t nkeys = array_len(astp->sortKeys);
868 astp->sortkeysLK = rm_malloc(sizeof(*astp->sortKeys) * nkeys);
869
870 const RLookupKey **sortkeys = astp->sortkeysLK;
871
872 RLookup *lk = AGPLN_GetLookup(pln, stp, AGPLN_GETLOOKUP_PREV);
873
874 for (size_t ii = 0; ii < nkeys; ++ii) {
875 sortkeys[ii] = RLookup_GetKey(lk, astp->sortKeys[ii], RLOOKUP_F_NOINCREF);
876 if (!sortkeys[ii]) {
877 QueryError_SetErrorFmt(status, QUERY_ENOPROPKEY, "Property `%s` not loaded nor in schema",
878 astp->sortKeys[ii]);
879 return NULL;
880 }
881 }
882
883 rp = RPSorter_NewByFields(limit, sortkeys, nkeys, astp->sortAscMap);
884 up = pushRP(req, rp, up);
885 }
886
887 // No sort? then it must be sort by score, which is the default.
888 if (rp == NULL && (req->reqflags & QEXEC_F_IS_SEARCH)) {
889 rp = RPSorter_NewByScore(limit);
890 up = pushRP(req, rp, up);
891 }
892
893 if (astp->offset || (astp->limit && !rp)) {
894 rp = RPPager_New(astp->offset, astp->limit);
895 up = pushRP(req, rp, up);
896 }
897
898 return rp;
899 }
900
getScorerRP(AREQ * req)901 static ResultProcessor *getScorerRP(AREQ *req) {
902 const char *scorer = req->searchopts.scorerName;
903 if (!scorer) {
904 scorer = DEFAULT_SCORER_NAME;
905 }
906 ScoringFunctionArgs scargs = {0};
907 if (req->reqflags & QEXEC_F_SEND_SCOREEXPLAIN) {
908 scargs.scrExp = rm_calloc(1, sizeof(RSScoreExplain));
909 }
910 ExtScoringFunctionCtx *fns = Extensions_GetScoringFunction(&scargs, scorer);
911 assert(fns);
912 IndexSpec_GetStats(req->sctx->spec, &scargs.indexStats);
913 scargs.qdata = req->ast.udata;
914 scargs.qdatalen = req->ast.udatalen;
915 ResultProcessor *rp = RPScorer_New(fns, &scargs);
916 return rp;
917 }
918
hasQuerySortby(const AGGPlan * pln)919 static int hasQuerySortby(const AGGPlan *pln) {
920 const PLN_BaseStep *bstp = AGPLN_FindStep(pln, NULL, NULL, PLN_T_GROUP);
921 if (bstp != NULL) {
922 const PLN_ArrangeStep *arng = (PLN_ArrangeStep *)AGPLN_FindStep(pln, NULL, bstp, PLN_T_ARRANGE);
923 if (arng && arng->sortKeys) {
924 return 1;
925 }
926 } else {
927 // no group... just see if we have an arrange step
928 const PLN_ArrangeStep *arng = (PLN_ArrangeStep *)AGPLN_FindStep(pln, NULL, NULL, PLN_T_ARRANGE);
929 return arng && arng->sortKeys;
930 }
931 return 0;
932 }
933
/**
 * Passes `rp` and the current `rpUpstream` to pushRP(), stores the result as
 * the new `rpUpstream`, and resets `rp` to NULL.
 *
 * The names `req`, `rp` and `rpUpstream` must be in scope where the macro is
 * used. The body is wrapped in do/while(0) so that `PUSH_RP();` expands to
 * exactly one statement and stays correct inside unbraced if/else bodies.
 */
#define PUSH_RP()                             \
  do {                                        \
    rpUpstream = pushRP(req, rp, rpUpstream); \
    rp = NULL;                                \
  } while (0)
937
938 /**
939 * Builds the implicit pipeline for querying and scoring, and ensures that our
940 * subsequent execution stages actually have data to operate on.
941 */
buildImplicitPipeline(AREQ * req,QueryError * Status)942 static void buildImplicitPipeline(AREQ *req, QueryError *Status) {
943 RedisSearchCtx *sctx = req->sctx;
944 req->qiter.conc = &req->conc;
945 req->qiter.sctx = sctx;
946 req->qiter.err = Status;
947
948 IndexSpecCache *cache = IndexSpec_GetSpecCache(req->sctx->spec);
949 assert(cache);
950 RLookup *first = AGPLN_GetLookup(&req->ap, NULL, AGPLN_GETLOOKUP_FIRST);
951
952 RLookup_Init(first, cache);
953
954 ResultProcessor *rp = RPIndexIterator_New(req->rootiter);
955 ResultProcessor *rpUpstream = NULL;
956 req->qiter.rootProc = req->qiter.endProc = rp;
957 PUSH_RP();
958
959 /** Create a scorer if there is no subsequent sorter within this grouping */
960 if (!hasQuerySortby(&req->ap) && (req->reqflags & QEXEC_F_IS_SEARCH)) {
961 rp = getScorerRP(req);
962 PUSH_RP();
963 }
964 }
965
966 /**
967 * This handles the RETURN and SUMMARIZE keywords, which operate on the result
968 * which is about to be returned. It is only used in FT.SEARCH mode
969 */
buildOutputPipeline(AREQ * req,QueryError * status)970 int buildOutputPipeline(AREQ *req, QueryError *status) {
971 AGGPlan *pln = &req->ap;
972 ResultProcessor *rp = NULL, *rpUpstream = req->qiter.endProc;
973
974 RLookup *lookup = AGPLN_GetLookup(pln, NULL, AGPLN_GETLOOKUP_LAST);
975 // Add a LOAD step...
976 const RLookupKey **loadkeys = NULL;
977 if (req->outFields.explicitReturn) {
978 // Go through all the fields and ensure that each one exists in the lookup stage
979 for (size_t ii = 0; ii < req->outFields.numFields; ++ii) {
980 const ReturnedField *rf = req->outFields.fields + ii;
981 RLookupKey *lk = RLookup_GetKey(lookup, rf->name, RLOOKUP_F_NOINCREF | RLOOKUP_F_OCREAT);
982 if (!lk) {
983 // TODO: this is a dead code
984 QueryError_SetErrorFmt(status, QUERY_ENOPROPKEY, "Property '%s' not loaded or in schema",
985 rf->name);
986 goto error;
987 }
988 *array_ensure_tail(&loadkeys, const RLookupKey *) = lk;
989 // assign explicit output flag
990 lk->flags |= RLOOKUP_F_EXPLICITRETURN;
991 }
992 }
993 rp = RPLoader_New(lookup, loadkeys, loadkeys ? array_len(loadkeys) : 0);
994 if (loadkeys) {
995 array_free(loadkeys);
996 }
997 PUSH_RP();
998
999 if (req->reqflags & QEXEC_F_SEND_HIGHLIGHT) {
1000 RLookup *lookup = AGPLN_GetLookup(pln, NULL, AGPLN_GETLOOKUP_LAST);
1001 for (size_t ii = 0; ii < req->outFields.numFields; ++ii) {
1002 ReturnedField *ff = req->outFields.fields + ii;
1003 RLookupKey *kk = RLookup_GetKey(lookup, ff->name, 0);
1004 if (!kk) {
1005 QueryError_SetErrorFmt(status, QUERY_ENOPROPKEY, "No such property `%s`", ff->name);
1006 goto error;
1007 } else if (!(kk->flags & (RLOOKUP_F_DOCSRC | RLOOKUP_F_SVSRC))) {
1008 // TODO: this is a dead code
1009 QueryError_SetErrorFmt(status, QUERY_EINVAL, "Property `%s` is not in document", ff->name);
1010 goto error;
1011 }
1012 ff->lookupKey = kk;
1013 }
1014 rp = RPHighlighter_New(&req->searchopts, &req->outFields, lookup);
1015 PUSH_RP();
1016 }
1017
1018 return REDISMODULE_OK;
1019 error:
1020 return REDISMODULE_ERR;
1021 }
1022
/**
 * Translates the request's plan into a chain of result processors.
 *
 * Unless AREQ_BUILDPIPELINE_NO_ROOT is set in `options`, the implicit root
 * pipeline is built first. The plan steps are then visited in order and each
 * one contributes its processor(s) to the end of the chain. For requests
 * flagged QEXEC_F_IS_SEARCH, a default arrange stage is added if the plan had
 * none, and the output stages are added unless QEXEC_F_SEND_NOFIELDS is set.
 *
 * @param req the request
 * @param options bitmask of AREQ_BUILDPIPELINE_* flags
 * @param status receives the error description on failure
 * @return REDISMODULE_OK on success, REDISMODULE_ERR otherwise
 */
int AREQ_BuildPipeline(AREQ *req, int options, QueryError *status) {
  if ((options & AREQ_BUILDPIPELINE_NO_ROOT) == 0) {
    buildImplicitPipeline(req, status);
  }

  AGGPlan *plan = &req->ap;
  // Current end of the chain; each new processor is pushed on top of it.
  ResultProcessor *tail = req->qiter.endProc;

  // Whether we've applied a SORTBY yet..
  int sawArrange = 0;

  const DLLIST_node *node = plan->steps.next;
  while (node != &plan->steps) {
    const PLN_BaseStep *step = DLLIST_ITEM(node, PLN_BaseStep, llnodePln);

    switch (step->type) {
      case PLN_T_GROUP:
        tail = getGroupRP(req, (PLN_GroupStep *)step, tail, status);
        if (tail == NULL) {
          return REDISMODULE_ERR;
        }
        break;

      case PLN_T_ARRANGE: {
        ResultProcessor *arrangeRP = getArrangeRP(req, plan, step, status, tail);
        if (arrangeRP == NULL) {
          return REDISMODULE_ERR;
        }
        sawArrange = 1;
        tail = arrangeRP;
        break;
      }

      case PLN_T_APPLY:
      case PLN_T_FILTER: {
        PLN_MapFilterStep *mapStep = (PLN_MapFilterStep *)step;
        // Ensure the lookups can actually find what they need
        RLookup *prevLookup = AGPLN_GetLookup(plan, step, AGPLN_GETLOOKUP_PREV);

        mapStep->parsedExpr = ExprAST_Parse(mapStep->rawExpr, strlen(mapStep->rawExpr), status);
        if (mapStep->parsedExpr == NULL) {
          return REDISMODULE_ERR;
        }
        if (!ExprAST_GetLookupKeys(mapStep->parsedExpr, prevLookup, status)) {
          return REDISMODULE_ERR;
        }

        ResultProcessor *evalRP;
        if (step->type == PLN_T_APPLY) {
          // A projector writes its result under the step's alias.
          RLookupKey *aliasKey =
              RLookup_GetKey(prevLookup, step->alias, RLOOKUP_F_OCREAT | RLOOKUP_F_NOINCREF);
          evalRP = RPEvaluator_NewProjector(mapStep->parsedExpr, prevLookup, aliasKey);
        } else {
          evalRP = RPEvaluator_NewFilter(mapStep->parsedExpr, prevLookup);
        }
        tail = pushRP(req, evalRP, tail);
        break;
      }

      case PLN_T_LOAD: {
        PLN_LoadStep *loadStep = (PLN_LoadStep *)step;
        RLookup *prevLookup = AGPLN_GetLookup(plan, step, AGPLN_GETLOOKUP_PREV);
        RLookup *firstLookup = AGPLN_GetLookup(plan, NULL, AGPLN_GETLOOKUP_FIRST);
        if (prevLookup != firstLookup) {
          QueryError_SetError(status, QUERY_EINVAL,
                              "LOAD cannot be applied after projectors or reducers");
          return REDISMODULE_ERR;
        }

        // Get all the keys for this lookup...
        while (!AC_IsAtEnd(&loadStep->args)) {
          const char *fieldName = AC_GetStringNC(&loadStep->args, NULL);
          if (fieldName[0] == '@') {
            ++fieldName;
          }
          const RLookupKey *key =
              RLookup_GetKey(prevLookup, fieldName, RLOOKUP_F_OEXCL | RLOOKUP_F_OCREAT);
          // We only get a NULL return if the key already exists, which means
          // that we don't need to retrieve it again.
          if (key != NULL) {
            loadStep->keys[loadStep->nkeys++] = key;
          }
        }

        if (loadStep->nkeys) {
          ResultProcessor *loaderRP = RPLoader_New(prevLookup, loadStep->keys, loadStep->nkeys);
          tail = pushRP(req, loaderRP, tail);
        }
        break;
      }

      case PLN_T_ROOT:
        // Placeholder step for initial lookup
        break;

      case PLN_T_DISTRIBUTE:
        // This is the root already
        break;

      case PLN_T_INVALID:
      case PLN_T__MAX:
        // not handled yet
        abort();
    }

    node = node->next;
  }

  // If no LIMIT or SORT has been applied, do it somewhere here so we don't
  // return the entire matching result set!
  if (!sawArrange && (req->reqflags & QEXEC_F_IS_SEARCH)) {
    ResultProcessor *defaultArrangeRP = getArrangeRP(req, plan, NULL, status, tail);
    if (defaultArrangeRP == NULL) {
      return REDISMODULE_ERR;
    }
    tail = defaultArrangeRP;
  }

  // If this is an FT.SEARCH command which requires returning of some of the
  // document fields, handle those options in this function
  if ((req->reqflags & QEXEC_F_IS_SEARCH) && !(req->reqflags & QEXEC_F_SEND_NOFIELDS)) {
    if (buildOutputPipeline(req, status) != REDISMODULE_OK) {
      return REDISMODULE_ERR;
    }
  }

  return REDISMODULE_OK;
}
1146
/**
 * Destroys a request together with the resources it holds: the result
 * processor chain, the root iterator, the plan steps, the query AST, the
 * stopword list reference, the concurrent search context, the search context,
 * the argument strings, the legacy numeric filters, the id filter, the output
 * field list and finally the request itself.
 *
 * @param req the request to free; it must not be used afterwards
 */
void AREQ_Free(AREQ *req) {
  // First, free the result processors, walking from the last one upstream.
  // The upstream pointer is saved before the processor is released.
  for (ResultProcessor *cur = req->qiter.endProc, *up = NULL; cur != NULL; cur = up) {
    up = cur->upstream;
    cur->Free(cur);
  }

  if (req->rootiter != NULL) {
    req->rootiter->Free(req->rootiter);
    req->rootiter = NULL;
  }

  // Go through each of the steps and free it..
  AGPLN_FreeSteps(&req->ap);

  QAST_Destroy(&req->ast);

  if (req->searchopts.stopwords != NULL) {
    StopWordList_Unref((StopWordList *)req->searchopts.stopwords);
  }

  ConcurrentSearchCtx_Free(&req->conc);

  // Finally, free the context. If we are a cursor, some more
  // cleanup is required since we also now own the
  // detached ("Thread Safe") context. It is detached from the search context
  // here and released further below.
  RedisModuleCtx *detachedCtx = NULL;
  if (req->sctx != NULL) {
    if (req->reqflags & QEXEC_F_IS_CURSOR) {
      detachedCtx = req->sctx->redisCtx;
      req->sctx->redisCtx = NULL;
    }
    SearchCtx_Decref(req->sctx);
  }

  for (size_t argIdx = 0; argIdx < req->nargs; ++argIdx) {
    sdsfree(req->args[argIdx]);
  }

  if (req->searchopts.legacy.filters != NULL) {
    for (size_t fltIdx = 0; fltIdx < array_len(req->searchopts.legacy.filters); ++fltIdx) {
      // Slots may be empty; only populated ones are released.
      if (req->searchopts.legacy.filters[fltIdx] != NULL) {
        NumericFilter_Free(req->searchopts.legacy.filters[fltIdx]);
      }
    }
    array_free(req->searchopts.legacy.filters);
  }

  rm_free(req->searchopts.inids);
  FieldList_Free(&req->outFields);

  if (detachedCtx != NULL) {
    RedisModule_FreeThreadSafeContext(detachedCtx);
  }

  rm_free(req->args);
  rm_free(req);
}
1202