1 #include "aggregate.h"
2 #include "reducer.h"
3 
4 #include <query.h>
5 #include <extension.h>
6 #include <result_processor.h>
7 #include <util/arr.h>
8 #include <rmutil/util.h>
9 #include "ext/default.h"
10 #include "extension.h"
11 
12 /**
13  * Ensures that the user has not requested one of the 'extended' features. Extended
14  * in this case refers to reducers which re-create the search results.
15  * @param areq the request
16  * @param name the name of the option that requires simple mode. Used for error
17  *   formatting
18  * @param status the error object
19  */
ensureSimpleMode(AREQ * areq)20 static void ensureSimpleMode(AREQ *areq) {
21   assert(!(areq->reqflags & QEXEC_F_IS_EXTENDED));
22   areq->reqflags |= QEXEC_F_IS_SEARCH;
23 }
24 
25 /**
26  * Like @ref ensureSimpleMode(), but does the opposite -- ensures that one of the
27  * 'simple' options - i.e. ones which rely on the field to be the exact same as
28  * found in the document - was not requested.
29  */
ensureExtendedMode(AREQ * areq,const char * name,QueryError * status)30 static int ensureExtendedMode(AREQ *areq, const char *name, QueryError *status) {
31   if (areq->reqflags & QEXEC_F_IS_SEARCH) {
32     QueryError_SetErrorFmt(status, QUERY_EINVAL,
33                            "option `%s` is mutually exclusive with simple (i.e. search) options",
34                            name);
35     return 0;
36   }
37   areq->reqflags |= QEXEC_F_IS_EXTENDED;
38   return 1;
39 }
40 
41 static int parseSortby(PLN_ArrangeStep *arng, ArgsCursor *ac, QueryError *status, int allowLegacy);
42 
/**
 * Releases the strings owned by a returned field: the summarizer separator
 * and the highlighter open/close tags. The field structure itself is not
 * freed.
 */
static void ReturnedField_Free(ReturnedField *field) {
  // The three buffers are independent of each other, so the order is irrelevant
  rm_free(field->summarizeSettings.separator);
  rm_free(field->highlightSettings.closeTag);
  rm_free(field->highlightSettings.openTag);
}
48 
/**
 * Releases everything owned by the field list: the per-field settings, the
 * settings of the default field, and the field array itself. The FieldList
 * structure is not freed.
 */
void FieldList_Free(FieldList *fields) {
  size_t idx = 0;
  while (idx < fields->numFields) {
    ReturnedField_Free(&fields->fields[idx]);
    idx++;
  }

  ReturnedField_Free(&fields->defaultField);
  rm_free(fields->fields);
}
56 
FieldList_GetCreateField(FieldList * fields,const char * name)57 ReturnedField *FieldList_GetCreateField(FieldList *fields, const char *name) {
58   size_t foundIndex = -1;
59   for (size_t ii = 0; ii < fields->numFields; ++ii) {
60     if (!strcasecmp(fields->fields[ii].name, name)) {
61       return fields->fields + ii;
62     }
63   }
64 
65   fields->fields = rm_realloc(fields->fields, sizeof(*fields->fields) * ++fields->numFields);
66   ReturnedField *ret = fields->fields + (fields->numFields - 1);
67   memset(ret, 0, sizeof *ret);
68   ret->name = name;
69   return ret;
70 }
71 
/**
 * When the list carries an explicit return set, drops every field which was
 * not explicitly requested. Surviving fields are compacted to the front of
 * the array (keeping their relative order) and `numFields` is updated; the
 * settings of dropped fields are released. Does nothing otherwise.
 */
static void FieldList_RestrictReturn(FieldList *fields) {
  if (!fields->explicitReturn) {
    return;
  }

  size_t kept = 0;
  for (size_t src = 0; src < fields->numFields; ++src) {
    ReturnedField *cur = &fields->fields[src];
    if (!cur->explicitReturn) {
      ReturnedField_Free(cur);
      continue;
    }
    // Only move the entry when it is not already in its final slot
    if (src != kept) {
      fields->fields[kept] = *cur;
    }
    ++kept;
  }
  fields->numFields = kept;
}
89 
/**
 * Parses the optional MAXIDLE and COUNT settings of a cursor request and flags
 * the request as a cursor request.
 *
 * An idle time of 0, or one above the globally configured maximum, is replaced
 * by the global maximum.
 * @return REDISMODULE_OK, or REDISMODULE_ERR (with status set) if one of the
 *   settings could not be parsed
 */
static int parseCursorSettings(AREQ *req, ArgsCursor *ac, QueryError *status) {
  ACArgSpec specs[] = {
      {.name = "MAXIDLE",
       .type = AC_ARGTYPE_UINT,
       .target = &req->cursorMaxIdle,
       .intflags = AC_F_GE1},
      {.name = "COUNT",
       .type = AC_ARGTYPE_UINT,
       .target = &req->cursorChunkSize,
       .intflags = AC_F_GE1},
      {NULL}};

  ACArgSpec *badSpec = NULL;
  int rc = AC_ParseArgSpec(ac, specs, &badSpec);
  // An unrecognized argument simply ends the cursor settings; it is not an error
  if (rc != AC_OK && rc != AC_ERR_ENOENT) {
    QERR_MKBADARGS_AC(status, badSpec->name, rc);
    return REDISMODULE_ERR;
  }

  if (req->cursorMaxIdle == 0 || req->cursorMaxIdle > RSGlobalConfig.cursorMaxIdle) {
    req->cursorMaxIdle = RSGlobalConfig.cursorMaxIdle;
  }

  req->reqflags |= QEXEC_F_IS_CURSOR;
  return REDISMODULE_OK;
}
114 
115 #define ARG_HANDLED 1
116 #define ARG_ERROR -1
117 #define ARG_UNKNOWN 0
118 
handleCommonArgs(AREQ * req,ArgsCursor * ac,QueryError * status,int allowLegacy)119 static int handleCommonArgs(AREQ *req, ArgsCursor *ac, QueryError *status, int allowLegacy) {
120   int rv;
121   // This handles the common arguments that are not stateful
122   if (AC_AdvanceIfMatch(ac, "LIMIT")) {
123     PLN_ArrangeStep *arng = AGPLN_GetOrCreateArrangeStep(&req->ap);
124     // Parse offset, length
125     if (AC_NumRemaining(ac) < 2) {
126       QueryError_SetError(status, QUERY_EPARSEARGS, "LIMIT requires two arguments");
127       return ARG_ERROR;
128     }
129     if ((rv = AC_GetU64(ac, &arng->offset, 0)) != AC_OK ||
130         (rv = AC_GetU64(ac, &arng->limit, 0)) != AC_OK) {
131       QueryError_SetError(status, QUERY_EPARSEARGS, "LIMIT needs two numeric arguments");
132       return ARG_ERROR;
133     }
134 
135     if (arng->limit == 0) {
136       // LIMIT 0 0
137       req->reqflags |= QEXEC_F_NOROWS;
138     } else if ((arng->limit > RSGlobalConfig.maxSearchResults) && (req->reqflags & QEXEC_F_IS_SEARCH)) {
139       QueryError_SetErrorFmt(status, QUERY_ELIMIT, "LIMIT exceeds maximum of %llu",
140                              RSGlobalConfig.maxSearchResults);
141       return ARG_ERROR;
142     }
143   } else if (AC_AdvanceIfMatch(ac, "SORTBY")) {
144     PLN_ArrangeStep *arng = AGPLN_GetOrCreateArrangeStep(&req->ap);
145     if ((parseSortby(arng, ac, status, req->reqflags & QEXEC_F_IS_SEARCH)) != REDISMODULE_OK) {
146       return ARG_ERROR;
147     }
148   } else if (AC_AdvanceIfMatch(ac, "ON_TIMEOUT")) {
149     if (AC_NumRemaining(ac) < 1) {
150       QueryError_SetError(status, QUERY_EPARSEARGS, "Need argument for ON_TIMEOUT");
151       return ARG_ERROR;
152     }
153     const char *policystr = AC_GetStringNC(ac, NULL);
154     req->tmoPolicy = TimeoutPolicy_Parse(policystr, strlen(policystr));
155     if (req->tmoPolicy == TimeoutPolicy_Invalid) {
156       QueryError_SetErrorFmt(status, QUERY_EPARSEARGS, "'%s' is not a valid timeout policy",
157                              policystr);
158       return ARG_ERROR;
159     }
160   } else if (AC_AdvanceIfMatch(ac, "WITHCURSOR")) {
161     if (parseCursorSettings(req, ac, status) != REDISMODULE_OK) {
162       return ARG_ERROR;
163     }
164   } else if (AC_AdvanceIfMatch(ac, "_NUM_SSTRING")) {
165     req->reqflags |= QEXEC_F_TYPED;
166   } else if (AC_AdvanceIfMatch(ac, "WITHRAWIDS")) {
167     req->reqflags |= QEXEC_F_SENDRAWIDS;
168   } else {
169     return ARG_UNKNOWN;
170   }
171 
172   return ARG_HANDLED;
173 }
174 
/**
 * Parses the arguments of a SORTBY clause into the arrange step. The cursor is
 * expected to be positioned right after the `SORTBY` keyword.
 *
 * Two syntaxes are supported:
 *  - legacy (`isLegacy` != 0): a single field name optionally followed by
 *    ASC or DESC;
 *  - extended: a counted argument list made of `@field` entries, each
 *    optionally followed by ASC or DESC.
 * In both cases an optional `MAX <n>` may follow; it sets the step's limit.
 *
 * On success the step receives the key array (`sortKeys`) and the ascending
 * bitmap (`sortAscMap`). The key strings are not copied; the array holds
 * pointers into the argument storage.
 *
 * @param arng the arrange step to fill. It must not already have sort keys
 * @param ac the argument cursor
 * @param status the error object
 * @param isLegacy whether to use the legacy syntax
 * @return REDISMODULE_OK, or REDISMODULE_ERR with status set
 */
static int parseSortby(PLN_ArrangeStep *arng, ArgsCursor *ac, QueryError *status, int isLegacy) {
  // Prevent multiple SORTBY steps
  if (arng->sortKeys != NULL) {
    QERR_MKBADARGS_FMT(status, "Multiple SORTBY steps are not allowed. Sort multiple fields in a single step");
    return REDISMODULE_ERR;
  }

  ArgsCursor subArgs = {0};
  int rv;
  int legacyDesc = 0;

  // We build a bitmap of maximum 64 sorting parameters. 1 means asc, 0 desc
  // By default all bits are 1. Whenever we encounter DESC we flip the corresponding bit
  uint64_t ascMap = SORTASCMAP_INIT;
  const char **keys = NULL;

  if (isLegacy) {
    if (AC_NumRemaining(ac) < 1) {
      goto err;
    }
    // Mimic subArgs to contain the single field we already have
    AC_GetSlice(ac, &subArgs, 1);
    if (AC_AdvanceIfMatch(ac, "DESC")) {
      legacyDesc = 1;
    } else if (AC_AdvanceIfMatch(ac, "ASC")) {
      legacyDesc = 0;
    }
  } else {
    rv = AC_GetVarArgs(ac, &subArgs);
    if (rv != AC_OK) {
      QERR_MKBADARGS_AC(status, "SORTBY", rv);
      goto err;
    }
  }

  keys = array_new(const char *, 8);

  if (isLegacy) {
    // Legacy demands one field and an optional ASC/DESC parameter. Both
    // of these are handled above, so no need for argument parsing
    const char *s = AC_GetStringNC(&subArgs, NULL);
    keys = array_append(keys, s);

    if (legacyDesc) {
      SORTASCMAP_SETDESC(ascMap, 0);
    }
  } else {
    while (!AC_IsAtEnd(&subArgs)) {
      const char *s = AC_GetStringNC(&subArgs, NULL);
      if (*s == '@') {
        if (array_len(keys) >= SORTASCMAP_MAXFIELDS) {
          QERR_MKBADARGS_FMT(status, "Cannot sort by more than %lu fields", SORTASCMAP_MAXFIELDS);
          goto err;
        }
        s++;
        keys = array_append(keys, s);
        continue;
      }

      const int isAsc = !strcasecmp(s, "ASC");
      const int isDesc = !isAsc && !strcasecmp(s, "DESC");
      if (!isAsc && !isDesc) {
        // Unknown token - neither a property nor ASC/DESC
        QERR_MKBADARGS_FMT(status, "MISSING ASC or DESC after sort field (%s)", s);
        goto err;
      }
      if (array_len(keys) == 0) {
        // A direction with no preceding field would address bit -1 of the map
        QERR_MKBADARGS_FMT(status, "%s must follow a sort field", s);
        goto err;
      }
      if (isAsc) {
        SORTASCMAP_SETASC(ascMap, array_len(keys) - 1);
      } else {
        SORTASCMAP_SETDESC(ascMap, array_len(keys) - 1);
      }
    }
  }

  // Parse optional MAX
  // MAX is not included in the normal SORTBY arglist.. so we need to switch
  // back to `ac`
  if (AC_AdvanceIfMatch(ac, "MAX")) {
    unsigned mx = 0;
    // Parenthesized so that `rv` holds the parser's code, not the comparison result
    if ((rv = AC_GetUnsigned(ac, &mx, 0)) != AC_OK) {
      QERR_MKBADARGS_AC(status, "MAX", rv);
      goto err;
    }
    arng->limit = mx;
  }

  arng->sortAscMap = ascMap;
  arng->sortKeys = keys;
  return REDISMODULE_OK;

err:
  // Generic fallback message; presumably a no-op when a more specific error
  // was already recorded above - TODO confirm against QueryError semantics
  QERR_MKBADARGS_FMT(status, "Bad SORTBY arguments");
  if (keys) {
    array_free(keys);
  }
  return REDISMODULE_ERR;
}
271 
/**
 * Tries to consume one of the legacy search-only filter options: FILTER
 * (numeric) or GEOFILTER.
 *
 * Parsed numeric filters are appended to `options->legacy.filters`; the geo
 * filter is stored in `options->legacy.gf`. On a parse failure nothing is left
 * behind in `options`: no empty slot is added to the filter array and the geo
 * filter pointer is reset.
 *
 * @return ARG_HANDLED, ARG_ERROR (status is filled in by the filter parser) or
 *   ARG_UNKNOWN if the current argument is neither option
 */
static int parseQueryLegacyArgs(ArgsCursor *ac, RSSearchOptions *options, QueryError *status) {
  if (AC_AdvanceIfMatch(ac, "FILTER")) {
    // Numeric filter. Parse first and append only on success, so that a
    // failure does not leave a NULL entry in the array
    NumericFilter *filter = NumericFilter_Parse(ac, status);
    if (!filter) {
      return ARG_ERROR;
    }
    *array_ensure_tail(&options->legacy.filters, NumericFilter *) = filter;
  } else if (AC_AdvanceIfMatch(ac, "GEOFILTER")) {
    options->legacy.gf = rm_calloc(1, sizeof(*options->legacy.gf));
    if (GeoFilter_Parse(options->legacy.gf, ac, status) != REDISMODULE_OK) {
      GeoFilter_Free(options->legacy.gf);
      // Do not leave a dangling pointer behind for later cleanup code
      options->legacy.gf = NULL;
      return ARG_ERROR;
    }
  } else {
    return ARG_UNKNOWN;
  }
  return ARG_HANDLED;
}
291 
/**
 * Switches the request to 'simple' mode on behalf of option `name`.
 *
 * Unlike calling ensureSimpleMode() directly, a request that is already in
 * extended mode is reported through `status` instead of tripping an assertion.
 * @return 1 on success, 0 on conflict (status is set)
 */
static int requireSimpleMode(AREQ *req, const char *name, QueryError *status) {
  if (req->reqflags & QEXEC_F_IS_EXTENDED) {
    QueryError_SetErrorFmt(
        status, QUERY_EINVAL,
        "option `%s` is mutually exclusive with extended (i.e. aggregate) options", name);
    return 0;
  }
  ensureSimpleMode(req);
  return 1;
}

/**
 * Parses the query-level options which follow the query string, storing the
 * results in the request flags, the search options and the list of output
 * fields.
 *
 * Parsing stops, without error, at the first argument which is neither a
 * query option, SUMMARIZE/HIGHLIGHT, a legacy filter (search requests only)
 * nor one of the common options; the cursor is left on that argument.
 *
 * @param ac the argument cursor, positioned after the query string
 * @param req the request being built
 * @param searchOpts the search options to fill
 * @param plan not consulted by this function
 * @param status the error object
 * @return REDISMODULE_OK or REDISMODULE_ERR (with status set)
 */
static int parseQueryArgs(ArgsCursor *ac, AREQ *req, RSSearchOptions *searchOpts,
                          AggregatePlan *plan, QueryError *status) {
  // Parse query-specific arguments..
  const char *languageStr = NULL;
  ArgsCursor returnFields = {0};
  ArgsCursor inKeys = {0};
  ArgsCursor inFields = {0};
  ACArgSpec querySpecs[] = {
      {.name = "INFIELDS", .type = AC_ARGTYPE_SUBARGS, .target = &inFields},
      {.name = "SLOP",
       .type = AC_ARGTYPE_INT,
       .target = &searchOpts->slop,
       .intflags = AC_F_COALESCE},
      {.name = "LANGUAGE", .type = AC_ARGTYPE_STRING, .target = &languageStr},
      {.name = "EXPANDER", .type = AC_ARGTYPE_STRING, .target = &searchOpts->expanderName},
      {.name = "INKEYS", .type = AC_ARGTYPE_SUBARGS, .target = &inKeys},
      {.name = "SCORER", .type = AC_ARGTYPE_STRING, .target = &searchOpts->scorerName},
      {.name = "RETURN", .type = AC_ARGTYPE_SUBARGS, .target = &returnFields},
      {AC_MKBITFLAG("INORDER", &searchOpts->flags, Search_InOrder)},
      {AC_MKBITFLAG("VERBATIM", &searchOpts->flags, Search_Verbatim)},
      {AC_MKBITFLAG("WITHSCORES", &req->reqflags, QEXEC_F_SEND_SCORES)},
      {AC_MKBITFLAG("WITHSORTKEYS", &req->reqflags, QEXEC_F_SEND_SORTKEYS)},
      {AC_MKBITFLAG("WITHPAYLOADS", &req->reqflags, QEXEC_F_SEND_PAYLOADS)},
      {AC_MKBITFLAG("NOCONTENT", &req->reqflags, QEXEC_F_SEND_NOFIELDS)},
      {AC_MKBITFLAG("NOSTOPWORDS", &searchOpts->flags, Search_NoStopwrods)},
      {AC_MKBITFLAG("EXPLAINSCORE", &req->reqflags, QEXEC_F_SEND_SCOREEXPLAIN)},
      {.name = "PAYLOAD",
       .type = AC_ARGTYPE_STRING,
       .target = &req->ast.udata,
       .len = &req->ast.udatalen},
      {NULL}};

  while (!AC_IsAtEnd(ac)) {
    ACArgSpec *errSpec = NULL;
    int rv = AC_ParseArgSpec(ac, querySpecs, &errSpec);
    if (rv == AC_OK) {
      continue;
    }

    if (rv != AC_ERR_ENOENT) {
      QERR_MKBADARGS_AC(status, errSpec->name, rv);
      return REDISMODULE_ERR;
    }

    // See if this is one of our arguments which requires special handling
    if (AC_AdvanceIfMatch(ac, "SUMMARIZE")) {
      if (!requireSimpleMode(req, "SUMMARIZE", status)) {
        return REDISMODULE_ERR;
      }
      if (ParseSummarize(ac, &req->outFields) == REDISMODULE_ERR) {
        QERR_MKBADARGS_FMT(status, "Bad arguments for SUMMARIZE");
        return REDISMODULE_ERR;
      }
      req->reqflags |= QEXEC_F_SEND_HIGHLIGHT;

    } else if (AC_AdvanceIfMatch(ac, "HIGHLIGHT")) {
      if (!requireSimpleMode(req, "HIGHLIGHT", status)) {
        return REDISMODULE_ERR;
      }
      if (ParseHighlight(ac, &req->outFields) == REDISMODULE_ERR) {
        QERR_MKBADARGS_FMT(status, "Bad arguments for HIGHLIGHT");
        return REDISMODULE_ERR;
      }
      req->reqflags |= QEXEC_F_SEND_HIGHLIGHT;

    } else if ((req->reqflags & QEXEC_F_IS_SEARCH) &&
               ((rv = parseQueryLegacyArgs(ac, searchOpts, status)) != ARG_UNKNOWN)) {
      // Legacy filters are only recognized for search requests
      if (rv == ARG_ERROR) {
        return REDISMODULE_ERR;
      }
    } else {
      rv = handleCommonArgs(req, ac, status, 1);
      if (rv == ARG_ERROR) {
        return REDISMODULE_ERR;
      }
      if (rv != ARG_HANDLED) {
        // Not ours: leave the argument for the caller
        break;
      }
    }
  }

  if ((req->reqflags & QEXEC_F_SEND_SCOREEXPLAIN) && !(req->reqflags & QEXEC_F_SEND_SCORES)) {
    QERR_MKBADARGS_FMT(status, "EXPLAINSCORE must be accompanied with WITHSCORES");
    return REDISMODULE_ERR;
  }

  // The sub-argument lists are not copied; they point into the argument array
  searchOpts->inkeys = (const char **)inKeys.objs;
  searchOpts->ninkeys = inKeys.argc;
  searchOpts->legacy.infields = (const char **)inFields.objs;
  searchOpts->legacy.ninfields = inFields.argc;
  searchOpts->language = RSLanguage_Find(languageStr);

  if (AC_IsInitialized(&returnFields)) {
    if (!requireSimpleMode(req, "RETURN", status)) {
      return REDISMODULE_ERR;
    }

    req->outFields.explicitReturn = 1;
    if (returnFields.argc == 0) {
      // RETURN with an empty list means no fields are sent back
      req->reqflags |= QEXEC_F_SEND_NOFIELDS;
    }

    while (!AC_IsAtEnd(&returnFields)) {
      const char *name = AC_GetStringNC(&returnFields, NULL);
      ReturnedField *f = FieldList_GetCreateField(&req->outFields, name);
      f->explicitReturn = 1;
    }
  }

  FieldList_RestrictReturn(&req->outFields);
  return REDISMODULE_OK;
}
399 
/**
 * Generates a default alias for a reducer which was not given one with `AS`.
 *
 * The alias is the string "__generated_alias", followed by the reducer name
 * and its arguments joined with commas (with any leading '@' characters
 * removed from each argument), all in lower case.
 *
 * @param g not consulted by this function
 * @param func the reducer name
 * @param args the reducer arguments; the cursor is copied, not advanced
 * @return a newly allocated string (via rm_strndup) owned by the caller
 */
static char *getReducerAlias(PLN_GroupStep *g, const char *func, const ArgsCursor *args) {
  sds out = sdsnew("__generated_alias");
  out = sdscat(out, func);

  // Iterate over a copy so the caller's cursor keeps its position
  ArgsCursor tmp = *args;
  while (!AC_IsAtEnd(&tmp)) {
    size_t l;
    const char *s = AC_GetStringNC(&tmp, &l);
    while (*s == '@') {
      // Don't allow the leading '@' to be included as an alias!
      ++s;
      --l;
    }
    out = sdscatlen(out, s, l);
    if (!AC_IsAtEnd(&tmp)) {
      out = sdscat(out, ",");
    }
  }

  sdstolower(out);

  // duplicate everything. yeah this is lame but this function is not in a tight loop
  char *dup = rm_strndup(out, sdslen(out));
  sdsfree(out);
  return dup;
}
429 
/**
 * Destructor of a group step: releases the reducer aliases, the reducer
 * array, the step's lookup and finally the step itself.
 */
static void groupStepFree(PLN_BaseStep *base) {
  PLN_GroupStep *gstp = (PLN_GroupStep *)base;

  if (gstp->reducers != NULL) {
    const size_t count = array_len(gstp->reducers);
    for (size_t idx = 0; idx < count; ++idx) {
      rm_free(gstp->reducers[idx].alias);
    }
    array_free(gstp->reducers);
  }

  RLookup_Cleanup(&gstp->lookup);
  rm_free(base);
}
444 
groupStepGetLookup(PLN_BaseStep * bstp)445 static RLookup *groupStepGetLookup(PLN_BaseStep *bstp) {
446   return &((PLN_GroupStep *)bstp)->lookup;
447 }
448 
/**
 * Appends a reducer to the group step. The cursor must be positioned at the
 * reducer's counted argument list, which may be followed by `AS <alias>`.
 *
 * When no alias is given, one is generated from the reducer name and its
 * arguments. On failure the partially filled reducer entry is removed again.
 *
 * @param gstp the group step
 * @param name the reducer name. The pointer is stored, not copied
 * @param ac the argument cursor
 * @param status the error object
 * @return REDISMODULE_OK or REDISMODULE_ERR (with status set)
 */
int PLNGroupStep_AddReducer(PLN_GroupStep *gstp, const char *name, ArgsCursor *ac,
                            QueryError *status) {
  PLN_Reducer *reducer = array_ensure_tail(&gstp->reducers, PLN_Reducer);
  reducer->name = name;

  int rc = AC_GetVarArgs(ac, &reducer->args);
  if (rc != AC_OK) {
    QERR_MKBADARGS_AC(status, name, rc);
    array_pop(gstp->reducers);
    return REDISMODULE_ERR;
  }

  // Optional user-supplied alias
  const char *userAlias = NULL;
  if (AC_AdvanceIfMatch(ac, "AS")) {
    rc = AC_GetString(ac, &userAlias, NULL, 0);
    if (rc != AC_OK) {
      QERR_MKBADARGS_AC(status, "AS", rc);
      array_pop(gstp->reducers);
      return REDISMODULE_ERR;
    }
  }

  if (userAlias != NULL) {
    reducer->alias = rm_strdup(userAlias);
  } else {
    reducer->alias = getReducerAlias(gstp, name, &reducer->args);
  }
  return REDISMODULE_OK;
}
481 
/** Destructor for steps which own nothing besides their own allocation. */
static void genericStepFree(PLN_BaseStep *p) {
  PLN_BaseStep *step = p;
  rm_free(step);
}
485 
PLNGroupStep_New(const char ** properties,size_t nproperties)486 PLN_GroupStep *PLNGroupStep_New(const char **properties, size_t nproperties) {
487   PLN_GroupStep *gstp = rm_calloc(1, sizeof(*gstp));
488   gstp->properties = properties;
489   gstp->nproperties = nproperties;
490   gstp->base.dtor = groupStepFree;
491   gstp->base.getLookup = groupStepGetLookup;
492   gstp->base.type = PLN_T_GROUP;
493   return gstp;
494 }
495 
/**
 * Parses a GROUPBY clause: a counted list of properties followed by any number
 * of `REDUCE <name> <nargs> [args...] [AS <alias>]` clauses. The cursor must be
 * positioned right after the `GROUPBY` keyword.
 *
 * The group step is added to the request's plan before its reducers are
 * parsed, so on a reducer error the step remains part of the plan.
 *
 * @return REDISMODULE_OK or REDISMODULE_ERR (with status set)
 */
static int parseGroupby(AREQ *req, ArgsCursor *ac, QueryError *status) {
  ArgsCursor groupArgs = {0};
  int rv = AC_GetVarArgs(ac, &groupArgs);
  if (rv != AC_OK) {
    QERR_MKBADARGS_AC(status, "GROUPBY", rv);
    return REDISMODULE_ERR;
  }

  // Number of fields.. now let's see the reducers
  PLN_GroupStep *gstp = PLNGroupStep_New((const char **)groupArgs.objs, groupArgs.argc);
  AGPLN_AddStep(&req->ap, &gstp->base);

  while (AC_AdvanceIfMatch(ac, "REDUCE")) {
    const char *name;
    // Keep the parser's code so the error message reflects the actual failure
    rv = AC_GetString(ac, &name, NULL, 0);
    if (rv != AC_OK) {
      QERR_MKBADARGS_AC(status, "REDUCE", rv);
      return REDISMODULE_ERR;
    }
    if (PLNGroupStep_AddReducer(gstp, name, ac, status) != REDISMODULE_OK) {
      return REDISMODULE_ERR;
    }
  }
  return REDISMODULE_OK;
}
525 
/**
 * Destructor of an APPLY/FILTER step: releases the parsed expression (if
 * any), the raw expression when the step owns it, the alias and the step
 * itself.
 */
static void freeFilterStep(PLN_BaseStep *bstp) {
  PLN_MapFilterStep *step = (PLN_MapFilterStep *)bstp;

  if (step->parsedExpr) {
    ExprAST_Free(step->parsedExpr);
  }

  // The raw expression is only ours to free when flagged as such
  if (step->shouldFreeRaw) {
    rm_free((char *)step->rawExpr);
  }

  rm_free((void *)step->base.alias);
  rm_free(bstp);
}
537 
PLNMapFilterStep_New(const char * expr,int mode)538 PLN_MapFilterStep *PLNMapFilterStep_New(const char *expr, int mode) {
539   PLN_MapFilterStep *stp = rm_calloc(1, sizeof(*stp));
540   stp->base.dtor = freeFilterStep;
541   stp->base.type = mode;
542   stp->rawExpr = expr;
543   return stp;
544 }
545 
/**
 * Parses an APPLY or FILTER clause and appends the matching step to the plan.
 * The cursor must be positioned at the expression.
 *
 * An APPLY step is aliased by the name following `AS` or, when `AS` is absent,
 * by the expression text itself. FILTER steps get no alias.
 *
 * @param isApply non-zero for APPLY, zero for FILTER
 * @return REDISMODULE_OK or REDISMODULE_ERR (with status set). On error no
 *   step is left in the plan
 */
static int handleApplyOrFilter(AREQ *req, ArgsCursor *ac, QueryError *status, int isApply) {
  const char *expr = NULL;
  int rc = AC_GetString(ac, &expr, NULL, 0);
  if (rc != AC_OK) {
    QERR_MKBADARGS_AC(status, "APPLY/FILTER", rc);
    return REDISMODULE_ERR;
  }

  int stepType = isApply ? PLN_T_APPLY : PLN_T_FILTER;
  PLN_MapFilterStep *step = PLNMapFilterStep_New(expr, stepType);
  AGPLN_AddStep(&req->ap, &step->base);

  if (!isApply) {
    return REDISMODULE_OK;
  }

  if (!AC_AdvanceIfMatch(ac, "AS")) {
    // No explicit alias: the expression doubles as the output name
    step->base.alias = rm_strdup(expr);
    return REDISMODULE_OK;
  }

  const char *alias;
  if (AC_GetString(ac, &alias, NULL, 0) != AC_OK) {
    QERR_MKBADARGS_FMT(status, "AS needs argument");
    // Undo the insertion and destroy the half-built step
    AGPLN_PopStep(&req->ap, &step->base);
    step->base.dtor(&step->base);
    return REDISMODULE_ERR;
  }
  step->base.alias = rm_strdup(alias);
  return REDISMODULE_OK;
}
579 
/** Destructor of a LOAD step: releases the key array and the step itself. */
static void loadDtor(PLN_BaseStep *bstp) {
  PLN_LoadStep *step = (PLN_LoadStep *)bstp;
  rm_free(step->keys);
  rm_free(step);
}
585 
/**
 * Parses a LOAD clause (a counted list of fields) and appends a load step to
 * the plan. The step keeps the argument list and a zeroed key array with one
 * slot per argument.
 *
 * @return REDISMODULE_OK or REDISMODULE_ERR (with status set)
 */
static int handleLoad(AREQ *req, ArgsCursor *ac, QueryError *status) {
  ArgsCursor fieldArgs = {0};
  int rc = AC_GetVarArgs(ac, &fieldArgs);
  if (rc != AC_OK) {
    QERR_MKBADARGS_AC(status, "LOAD", rc);
    return REDISMODULE_ERR;
  }

  PLN_LoadStep *step = rm_calloc(1, sizeof(*step));
  step->keys = rm_calloc(fieldArgs.argc, sizeof(*step->keys));
  step->args = fieldArgs;
  step->base.dtor = loadDtor;
  step->base.type = PLN_T_LOAD;

  AGPLN_AddStep(&req->ap, &step->base);
  return REDISMODULE_OK;
}
602 
AREQ_New(void)603 AREQ *AREQ_New(void) {
604   return rm_calloc(1, sizeof(AREQ));
605 }
606 
/**
 * Compiles the raw command arguments into the request: copies the arguments,
 * extracts the query string, parses the query options and then builds the
 * plan steps (GROUPBY, APPLY, LOAD, FILTER) along with the common options.
 *
 * The arguments are copied into `req->args`, so `argv` need not outlive the
 * call. The first argument is taken as the query string.
 *
 * @param req the request, as returned by AREQ_New()
 * @param argv the arguments, starting with the query string
 * @param argc the number of arguments
 * @param status the error object
 * @return REDISMODULE_OK or REDISMODULE_ERR (with status set). On error the
 *   request keeps whatever was built so far; presumably the caller releases
 *   it as a whole.
 */
int AREQ_Compile(AREQ *req, RedisModuleString **argv, int argc, QueryError *status) {
  req->args = rm_malloc(sizeof(*req->args) * argc);
  req->nargs = argc;
  for (int ii = 0; ii < argc; ++ii) {
    size_t n;
    const char *s = RedisModule_StringPtrLen(argv[ii], &n);
    req->args[ii] = sdsnewlen(s, n);
  }

  // Parse the query and basic keywords first..
  ArgsCursor ac = {0};
  ArgsCursor_InitSDS(&ac, req->args, req->nargs);

  if (AC_IsAtEnd(&ac)) {
    QueryError_SetError(status, QUERY_EPARSEARGS, "No query string provided");
    return REDISMODULE_ERR;
  }

  req->query = AC_GetStringNC(&ac, NULL);
  AGPLN_Init(&req->ap);

  RSSearchOptions *searchOpts = &req->searchopts;
  RSSearchOptions_Init(searchOpts);
  if (parseQueryArgs(&ac, req, searchOpts, &req->ap, status) != REDISMODULE_OK) {
    return REDISMODULE_ERR;
  }

  // Now we have a 'compiled' plan. Let's get some more options..
  while (!AC_IsAtEnd(&ac)) {
    int rv = handleCommonArgs(req, &ac, status, req->reqflags & QEXEC_F_IS_SEARCH);
    if (rv == ARG_HANDLED) {
      continue;
    } else if (rv == ARG_ERROR) {
      return REDISMODULE_ERR;
    }

    if (AC_AdvanceIfMatch(&ac, "GROUPBY")) {
      // Grouping re-creates the results, so it cannot be mixed with simple options
      if (!ensureExtendedMode(req, "GROUPBY", status)) {
        return REDISMODULE_ERR;
      }
      if (parseGroupby(req, &ac, status) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
      }
    } else if (AC_AdvanceIfMatch(&ac, "APPLY")) {
      if (handleApplyOrFilter(req, &ac, status, 1) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
      }
    } else if (AC_AdvanceIfMatch(&ac, "LOAD")) {
      if (handleLoad(req, &ac, status) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
      }
    } else if (AC_AdvanceIfMatch(&ac, "FILTER")) {
      if (handleApplyOrFilter(req, &ac, status, 0) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
      }
    } else {
      QueryError_FmtUnknownArg(status, &ac, "<main>");
      return REDISMODULE_ERR;
    }
  }
  return REDISMODULE_OK;
}
675 
/**
 * Applies the request-wide filters to the whole query AST: the legacy numeric
 * filters, the legacy geo filter and the INKEYS restriction.
 *
 * INKEYS entries are translated to document ids through the index's document
 * table; keys for which the table yields no id are skipped. The id array is
 * stored in `opts->inids` / `opts->nids`.
 */
static void applyGlobalFilters(RSSearchOptions *opts, QueryAST *ast, const RedisSearchCtx *sctx) {
  // Legacy numeric filters, one global filter each
  if (opts->legacy.filters) {
    size_t idx = 0;
    while (idx < array_len(opts->legacy.filters)) {
      QAST_GlobalFilterOptions numericOpts = {.numeric = opts->legacy.filters[idx]};
      QAST_SetGlobalFilters(ast, &numericOpts);
      ++idx;
    }
    // so AREQ_Free() doesn't free the filters themselves, which are now owned
    // by the query object
    array_clear(opts->legacy.filters);
  }

  // Legacy geo filter
  if (opts->legacy.gf) {
    QAST_GlobalFilterOptions geoOpts = {.geo = opts->legacy.gf};
    QAST_SetGlobalFilters(ast, &geoOpts);
  }

  if (!opts->inkeys) {
    return;
  }

  // INKEYS: resolve each key to its document id, dropping unresolved keys
  opts->inids = rm_malloc(sizeof(*opts->inids) * opts->ninkeys);
  for (size_t idx = 0; idx < opts->ninkeys; ++idx) {
    const char *key = opts->inkeys[idx];
    t_docId docId = DocTable_GetId(&sctx->spec->docs, key, strlen(key));
    if (docId) {
      opts->inids[opts->nids++] = docId;
    }
  }
  QAST_GlobalFilterOptions idOpts = {.ids = opts->inids, .nids = opts->nids};
  QAST_SetGlobalFilters(ast, &idOpts);
}
703 
/**
 * Binds a compiled request to a search context (i.e. to a concrete index):
 * validates the options against the index, parses the query string, applies
 * the global filters, expands the query unless VERBATIM was requested, and
 * creates the root iterator.
 *
 * @param req the compiled request
 * @param sctx the search context; stored in the request
 * @param status the error object
 * @return REDISMODULE_OK or REDISMODULE_ERR (with status set)
 */
int AREQ_ApplyContext(AREQ *req, RedisSearchCtx *sctx, QueryError *status) {
  IndexSpec *spec = sctx->spec;
  RSSearchOptions *searchOpts = &req->searchopts;
  QueryAST *ast = &req->ast;
  req->sctx = sctx;

  // Highlighting/summarizing needs the byte offsets stored in the index
  if ((req->reqflags & QEXEC_F_SEND_HIGHLIGHT) && (spec->flags & Index_StoreByteOffsets) == 0) {
    QueryError_SetError(
        status, QUERY_EINVAL,
        "Cannot use highlight/summarize because NOOFSETS was specified at index level");
    return REDISMODULE_ERR;
  }

  // INFIELDS: rebuild the field mask from the named fields
  if (searchOpts->legacy.ninfields) {
    searchOpts->fieldmask = 0;
    for (size_t idx = 0; idx < searchOpts->legacy.ninfields; ++idx) {
      const char *fieldName = searchOpts->legacy.infields[idx];
      searchOpts->fieldmask |= IndexSpec_GetFieldBit(spec, fieldName, strlen(fieldName));
    }
  }

  if (searchOpts->language == RS_LANG_UNSUPPORTED) {
    QueryError_SetError(status, QUERY_EINVAL, "No such language");
    return REDISMODULE_ERR;
  }

  if (searchOpts->scorerName &&
      Extensions_GetScoringFunction(NULL, searchOpts->scorerName) == NULL) {
    QueryError_SetErrorFmt(status, QUERY_EINVAL, "No such scorer %s", searchOpts->scorerName);
    return REDISMODULE_ERR;
  }

  // Unless NOSTOPWORDS was given, use (and take a reference on) the index's list
  if (!(searchOpts->flags & Search_NoStopwrods)) {
    searchOpts->stopwords = spec->stopwords;
    StopWordList_Ref(spec->stopwords);
  }

  if (QAST_Parse(ast, sctx, &req->searchopts, req->query, strlen(req->query), status) !=
      REDISMODULE_OK) {
    return REDISMODULE_ERR;
  }

  applyGlobalFilters(searchOpts, ast, sctx);

  const int verbatim = (searchOpts->flags & Search_Verbatim) != 0;
  if (!verbatim &&
      QAST_Expand(ast, searchOpts->expanderName, searchOpts, sctx, status) != REDISMODULE_OK) {
    return REDISMODULE_ERR;
  }

  ConcurrentSearchCtx_Init(sctx->redisCtx, &req->conc);
  req->rootiter = QAST_Iterate(ast, searchOpts, sctx, &req->conc);
  assert(req->rootiter);

  return REDISMODULE_OK;
}
762 
buildGroupRP(PLN_GroupStep * gstp,RLookup * srclookup,QueryError * err)763 static ResultProcessor *buildGroupRP(PLN_GroupStep *gstp, RLookup *srclookup, QueryError *err) {
764   const RLookupKey *srckeys[gstp->nproperties], *dstkeys[gstp->nproperties];
765   for (size_t ii = 0; ii < gstp->nproperties; ++ii) {
766     const char *fldname = gstp->properties[ii] + 1;  // account for the @-
767     srckeys[ii] = RLookup_GetKey(srclookup, fldname, RLOOKUP_F_NOINCREF);
768     if (!srckeys[ii]) {
769       QueryError_SetErrorFmt(err, QUERY_ENOPROPKEY, "No such property `%s`", fldname);
770       return NULL;
771     }
772     dstkeys[ii] = RLookup_GetKey(&gstp->lookup, fldname, RLOOKUP_F_OCREAT | RLOOKUP_F_NOINCREF);
773   }
774 
775   Grouper *grp = Grouper_New(srckeys, dstkeys, gstp->nproperties);
776 
777   size_t nreducers = array_len(gstp->reducers);
778   for (size_t ii = 0; ii < nreducers; ++ii) {
779     // Build the actual reducer
780     PLN_Reducer *pr = gstp->reducers + ii;
781     ReducerOptions options = REDUCEROPTS_INIT(pr->name, &pr->args, srclookup, err);
782     ReducerFactory ff = RDCR_GetFactory(pr->name);
783     if (!ff) {
784       // No such reducer!
785       Grouper_Free(grp);
786       QueryError_SetErrorFmt(err, QUERY_ENOREDUCER, "No such reducer: %s", pr->name);
787       return NULL;
788     }
789     Reducer *rr = ff(&options);
790     if (!rr) {
791       Grouper_Free(grp);
792       return NULL;
793     }
794 
795     // Set the destination key for the grouper!
796     RLookupKey *dstkey =
797         RLookup_GetKey(&gstp->lookup, pr->alias, RLOOKUP_F_OCREAT | RLOOKUP_F_NOINCREF);
798     Grouper_AddReducer(grp, rr, dstkey);
799   }
800 
801   return Grouper_GetRP(grp);
802 }
803 
804 /** Pushes a processor up the stack. Returns the newly pushed processor
805  * @param req the request
806  * @param rp the processor to push
807  * @param rpUpstream previous processor (used as source for rp)
808  * @return the processor passed in `rp`.
809  */
pushRP(AREQ * req,ResultProcessor * rp,ResultProcessor * rpUpstream)810 static ResultProcessor *pushRP(AREQ *req, ResultProcessor *rp, ResultProcessor *rpUpstream) {
811   rp->upstream = rpUpstream;
812   rp->parent = &req->qiter;
813   req->qiter.endProc = rp;
814   return rp;
815 }
816 
/**
 * Builds the grouper for a group step and pushes it onto the processor chain.
 *
 * When the step reads straight from the plan's first lookup, a loader is
 * pushed ahead of the grouper for every key of that lookup which is flagged
 * RLOOKUP_F_DOCSRC but not RLOOKUP_F_SVSRC.
 *
 * @return the grouper processor (now the end of the chain), or NULL if the
 *   grouper could not be built
 */
static ResultProcessor *getGroupRP(AREQ *req, PLN_GroupStep *gstp, ResultProcessor *rpUpstream,
                                   QueryError *status) {
  AGGPlan *plan = &req->ap;
  RLookup *srcLookup = AGPLN_GetLookup(plan, &gstp->base, AGPLN_GETLOOKUP_PREV);

  ResultProcessor *grouper = buildGroupRP(gstp, srcLookup, status);
  if (grouper == NULL) {
    return NULL;
  }

  RLookup *rootLookup = AGPLN_GetLookup(plan, &gstp->base, AGPLN_GETLOOKUP_FIRST);
  if (rootLookup != srcLookup) {
    // Some earlier step feeds this one; nothing to load here
    return pushRP(req, grouper, rpUpstream);
  }

  // Collect the keys which need a loader
  const RLookupKey **toLoad = NULL;
  for (RLookupKey *key = rootLookup->head; key != NULL; key = key->next) {
    const int fromDoc = (key->flags & RLOOKUP_F_DOCSRC) != 0;
    const int fromSv = (key->flags & RLOOKUP_F_SVSRC) != 0;
    if (fromDoc && !fromSv) {
      *array_ensure_tail(&toLoad, const RLookupKey *) = key;
    }
  }

  if (toLoad != NULL) {
    ResultProcessor *loader = RPLoader_New(rootLookup, toLoad, array_len(toLoad));
    array_free(toLoad);
    assert(loader);
    rpUpstream = pushRP(req, loader, rpUpstream);
  }

  return pushRP(req, grouper, rpUpstream);
}
848 
849 #define DEFAULT_LIMIT 10
850 
getArrangeRP(AREQ * req,AGGPlan * pln,const PLN_BaseStep * stp,QueryError * status,ResultProcessor * up)851 static ResultProcessor *getArrangeRP(AREQ *req, AGGPlan *pln, const PLN_BaseStep *stp,
852                                      QueryError *status, ResultProcessor *up) {
853   ResultProcessor *rp = NULL;
854   PLN_ArrangeStep astp_s = {.base = {.type = PLN_T_ARRANGE}};
855   PLN_ArrangeStep *astp = (PLN_ArrangeStep *)stp;
856 
857   if (!astp) {
858     astp = &astp_s;
859   }
860 
861   size_t limit = astp->offset + astp->limit;
862   if (!limit) {
863     limit = DEFAULT_LIMIT;
864   }
865 
866   if (astp->sortKeys) {
867     size_t nkeys = array_len(astp->sortKeys);
868     astp->sortkeysLK = rm_malloc(sizeof(*astp->sortKeys) * nkeys);
869 
870     const RLookupKey **sortkeys = astp->sortkeysLK;
871 
872     RLookup *lk = AGPLN_GetLookup(pln, stp, AGPLN_GETLOOKUP_PREV);
873 
874     for (size_t ii = 0; ii < nkeys; ++ii) {
875       sortkeys[ii] = RLookup_GetKey(lk, astp->sortKeys[ii], RLOOKUP_F_NOINCREF);
876       if (!sortkeys[ii]) {
877         QueryError_SetErrorFmt(status, QUERY_ENOPROPKEY, "Property `%s` not loaded nor in schema",
878                                astp->sortKeys[ii]);
879         return NULL;
880       }
881     }
882 
883     rp = RPSorter_NewByFields(limit, sortkeys, nkeys, astp->sortAscMap);
884     up = pushRP(req, rp, up);
885   }
886 
887   // No sort? then it must be sort by score, which is the default.
888   if (rp == NULL && (req->reqflags & QEXEC_F_IS_SEARCH)) {
889     rp = RPSorter_NewByScore(limit);
890     up = pushRP(req, rp, up);
891   }
892 
893   if (astp->offset || (astp->limit && !rp)) {
894     rp = RPPager_New(astp->offset, astp->limit);
895     up = pushRP(req, rp, up);
896   }
897 
898   return rp;
899 }
900 
getScorerRP(AREQ * req)901 static ResultProcessor *getScorerRP(AREQ *req) {
902   const char *scorer = req->searchopts.scorerName;
903   if (!scorer) {
904     scorer = DEFAULT_SCORER_NAME;
905   }
906   ScoringFunctionArgs scargs = {0};
907   if (req->reqflags & QEXEC_F_SEND_SCOREEXPLAIN) {
908     scargs.scrExp = rm_calloc(1, sizeof(RSScoreExplain));
909   }
910   ExtScoringFunctionCtx *fns = Extensions_GetScoringFunction(&scargs, scorer);
911   assert(fns);
912   IndexSpec_GetStats(req->sctx->spec, &scargs.indexStats);
913   scargs.qdata = req->ast.udata;
914   scargs.qdatalen = req->ast.udatalen;
915   ResultProcessor *rp = RPScorer_New(fns, &scargs);
916   return rp;
917 }
918 
hasQuerySortby(const AGGPlan * pln)919 static int hasQuerySortby(const AGGPlan *pln) {
920   const PLN_BaseStep *bstp = AGPLN_FindStep(pln, NULL, NULL, PLN_T_GROUP);
921   if (bstp != NULL) {
922     const PLN_ArrangeStep *arng = (PLN_ArrangeStep *)AGPLN_FindStep(pln, NULL, bstp, PLN_T_ARRANGE);
923     if (arng && arng->sortKeys) {
924       return 1;
925     }
926   } else {
927     // no group... just see if we have an arrange step
928     const PLN_ArrangeStep *arng = (PLN_ArrangeStep *)AGPLN_FindStep(pln, NULL, NULL, PLN_T_ARRANGE);
929     return arng && arng->sortKeys;
930   }
931   return 0;
932 }
933 
/**
 * Pushes `rp` onto the chain of `req`, makes it the new `rpUpstream`, and
 * resets `rp` to NULL. Expects `req`, `rp` and `rpUpstream` to be variables in
 * the enclosing scope.
 *
 * Wrapped in do { } while (0) so that the expansion is a single statement and
 * remains correct under an unbraced `if`/`else`. Invoke as `PUSH_RP();`.
 */
#define PUSH_RP()                             \
  do {                                        \
    rpUpstream = pushRP(req, rp, rpUpstream); \
    rp = NULL;                                \
  } while (0)
937 
938 /**
939  * Builds the implicit pipeline for querying and scoring, and ensures that our
940  * subsequent execution stages actually have data to operate on.
941  */
buildImplicitPipeline(AREQ * req,QueryError * Status)942 static void buildImplicitPipeline(AREQ *req, QueryError *Status) {
943   RedisSearchCtx *sctx = req->sctx;
944   req->qiter.conc = &req->conc;
945   req->qiter.sctx = sctx;
946   req->qiter.err = Status;
947 
948   IndexSpecCache *cache = IndexSpec_GetSpecCache(req->sctx->spec);
949   assert(cache);
950   RLookup *first = AGPLN_GetLookup(&req->ap, NULL, AGPLN_GETLOOKUP_FIRST);
951 
952   RLookup_Init(first, cache);
953 
954   ResultProcessor *rp = RPIndexIterator_New(req->rootiter);
955   ResultProcessor *rpUpstream = NULL;
956   req->qiter.rootProc = req->qiter.endProc = rp;
957   PUSH_RP();
958 
959   /** Create a scorer if there is no subsequent sorter within this grouping */
960   if (!hasQuerySortby(&req->ap) && (req->reqflags & QEXEC_F_IS_SEARCH)) {
961     rp = getScorerRP(req);
962     PUSH_RP();
963   }
964 }
965 
966 /**
967  * This handles the RETURN and SUMMARIZE keywords, which operate on the result
968  * which is about to be returned. It is only used in FT.SEARCH mode
969  */
buildOutputPipeline(AREQ * req,QueryError * status)970 int buildOutputPipeline(AREQ *req, QueryError *status) {
971   AGGPlan *pln = &req->ap;
972   ResultProcessor *rp = NULL, *rpUpstream = req->qiter.endProc;
973 
974   RLookup *lookup = AGPLN_GetLookup(pln, NULL, AGPLN_GETLOOKUP_LAST);
975   // Add a LOAD step...
976   const RLookupKey **loadkeys = NULL;
977   if (req->outFields.explicitReturn) {
978     // Go through all the fields and ensure that each one exists in the lookup stage
979     for (size_t ii = 0; ii < req->outFields.numFields; ++ii) {
980       const ReturnedField *rf = req->outFields.fields + ii;
981       RLookupKey *lk = RLookup_GetKey(lookup, rf->name, RLOOKUP_F_NOINCREF | RLOOKUP_F_OCREAT);
982       if (!lk) {
983         // TODO: this is a dead code
984         QueryError_SetErrorFmt(status, QUERY_ENOPROPKEY, "Property '%s' not loaded or in schema",
985                                rf->name);
986         goto error;
987       }
988       *array_ensure_tail(&loadkeys, const RLookupKey *) = lk;
989       // assign explicit output flag
990       lk->flags |= RLOOKUP_F_EXPLICITRETURN;
991     }
992   }
993   rp = RPLoader_New(lookup, loadkeys, loadkeys ? array_len(loadkeys) : 0);
994   if (loadkeys) {
995     array_free(loadkeys);
996   }
997   PUSH_RP();
998 
999   if (req->reqflags & QEXEC_F_SEND_HIGHLIGHT) {
1000     RLookup *lookup = AGPLN_GetLookup(pln, NULL, AGPLN_GETLOOKUP_LAST);
1001     for (size_t ii = 0; ii < req->outFields.numFields; ++ii) {
1002       ReturnedField *ff = req->outFields.fields + ii;
1003       RLookupKey *kk = RLookup_GetKey(lookup, ff->name, 0);
1004       if (!kk) {
1005         QueryError_SetErrorFmt(status, QUERY_ENOPROPKEY, "No such property `%s`", ff->name);
1006         goto error;
1007       } else if (!(kk->flags & (RLOOKUP_F_DOCSRC | RLOOKUP_F_SVSRC))) {
1008         // TODO: this is a dead code
1009         QueryError_SetErrorFmt(status, QUERY_EINVAL, "Property `%s` is not in document", ff->name);
1010         goto error;
1011       }
1012       ff->lookupKey = kk;
1013     }
1014     rp = RPHighlighter_New(&req->searchopts, &req->outFields, lookup);
1015     PUSH_RP();
1016   }
1017 
1018   return REDISMODULE_OK;
1019 error:
1020   return REDISMODULE_ERR;
1021 }
1022 
/**
 * Builds the chain of result processors for the request by walking the steps
 * of its plan in order.
 *
 * Unless AREQ_BUILDPIPELINE_NO_ROOT is set in `options`, the implicit root of
 * the pipeline is built first. Each plan step then contributes zero or more
 * processors. For requests flagged QEXEC_F_IS_SEARCH, a default arrange stage
 * is added if the plan had none, and the output stage is built unless
 * QEXEC_F_SEND_NOFIELDS is set.
 *
 * @param req the request
 * @param options AREQ_BUILDPIPELINE_* flags
 * @param status receives the error on failure
 * @return REDISMODULE_OK on success, REDISMODULE_ERR on failure
 */
int AREQ_BuildPipeline(AREQ *req, int options, QueryError *status) {
  if ((options & AREQ_BUILDPIPELINE_NO_ROOT) == 0) {
    buildImplicitPipeline(req, status);
  }

  AGGPlan *plan = &req->ap;
  // Current end of the chain; every new processor reads from it
  ResultProcessor *tail = req->qiter.endProc;

  // Whether we've applied a SORTBY yet..
  int sawArrange = 0;

  for (const DLLIST_node *node = plan->steps.next; node != &plan->steps; node = node->next) {
    const PLN_BaseStep *step = DLLIST_ITEM(node, PLN_BaseStep, llnodePln);

    switch (step->type) {
      case PLN_T_GROUP:
        tail = getGroupRP(req, (PLN_GroupStep *)step, tail, status);
        if (tail == NULL) {
          return REDISMODULE_ERR;
        }
        break;

      case PLN_T_ARRANGE:
        tail = getArrangeRP(req, plan, step, status, tail);
        if (tail == NULL) {
          return REDISMODULE_ERR;
        }
        sawArrange = 1;
        break;

      case PLN_T_APPLY:
      case PLN_T_FILTER: {
        PLN_MapFilterStep *mapStep = (PLN_MapFilterStep *)step;
        // Ensure the lookups can actually find what they need
        RLookup *srcLookup = AGPLN_GetLookup(plan, step, AGPLN_GETLOOKUP_PREV);

        mapStep->parsedExpr = ExprAST_Parse(mapStep->rawExpr, strlen(mapStep->rawExpr), status);
        if (mapStep->parsedExpr == NULL ||
            !ExprAST_GetLookupKeys(mapStep->parsedExpr, srcLookup, status)) {
          return REDISMODULE_ERR;
        }

        ResultProcessor *evaluator;
        if (step->type == PLN_T_FILTER) {
          evaluator = RPEvaluator_NewFilter(mapStep->parsedExpr, srcLookup);
        } else {
          // APPLY: the result of the expression is stored under the step's alias
          RLookupKey *dstkey =
              RLookup_GetKey(srcLookup, step->alias, RLOOKUP_F_OCREAT | RLOOKUP_F_NOINCREF);
          evaluator = RPEvaluator_NewProjector(mapStep->parsedExpr, srcLookup, dstkey);
        }
        tail = pushRP(req, evaluator, tail);
        break;
      }

      case PLN_T_LOAD: {
        PLN_LoadStep *loadStep = (PLN_LoadStep *)step;
        RLookup *curLookup = AGPLN_GetLookup(plan, step, AGPLN_GETLOOKUP_PREV);
        RLookup *rootLookup = AGPLN_GetLookup(plan, NULL, AGPLN_GETLOOKUP_FIRST);
        if (curLookup != rootLookup) {
          QueryError_SetError(status, QUERY_EINVAL,
                              "LOAD cannot be applied after projectors or reducers");
          return REDISMODULE_ERR;
        }

        // Get all the keys for this lookup...
        while (!AC_IsAtEnd(&loadStep->args)) {
          const char *name = AC_GetStringNC(&loadStep->args, NULL);
          if (*name == '@') {
            name++;
          }
          const RLookupKey *key =
              RLookup_GetKey(curLookup, name, RLOOKUP_F_OEXCL | RLOOKUP_F_OCREAT);
          // We only get a NULL return if the key already exists, which means
          // that we don't need to retrieve it again.
          if (key != NULL) {
            loadStep->keys[loadStep->nkeys++] = key;
          }
        }

        if (loadStep->nkeys) {
          ResultProcessor *loader = RPLoader_New(curLookup, loadStep->keys, loadStep->nkeys);
          tail = pushRP(req, loader, tail);
        }
        break;
      }

      case PLN_T_ROOT:
        // Placeholder step for initial lookup
        break;

      case PLN_T_DISTRIBUTE:
        // This is the root already
        break;

      case PLN_T_INVALID:
      case PLN_T__MAX:
        // not handled yet
        abort();
    }
  }

  // If no LIMIT or SORT has been applied, do it somewhere here so we don't
  // return the entire matching result set!
  if (!sawArrange && (req->reqflags & QEXEC_F_IS_SEARCH)) {
    tail = getArrangeRP(req, plan, NULL, status, tail);
    if (tail == NULL) {
      return REDISMODULE_ERR;
    }
  }

  // If this is an FT.SEARCH command which requires returning of some of the
  // document fields, handle those options in this function
  if ((req->reqflags & QEXEC_F_IS_SEARCH) && !(req->reqflags & QEXEC_F_SEND_NOFIELDS)) {
    if (buildOutputPipeline(req, status) != REDISMODULE_OK) {
      return REDISMODULE_ERR;
    }
  }

  return REDISMODULE_OK;
}
1146 
/**
 * Releases a request and everything it holds: the result processor chain, the
 * root iterator, the plan steps, the query AST, the stopword list reference,
 * the concurrent search context, the search context, the argument strings,
 * the legacy numeric filters, the id filter, the output field list, and
 * finally the request object itself.
 *
 * @param req the request to free. Must not be used after this call.
 */
void AREQ_Free(AREQ *req) {
  // First, free the result processors, walking from the end of the chain
  // towards its root.
  for (ResultProcessor *rp = req->qiter.endProc, *up = NULL; rp != NULL; rp = up) {
    up = rp->upstream;
    rp->Free(rp);
  }

  if (req->rootiter) {
    req->rootiter->Free(req->rootiter);
    req->rootiter = NULL;
  }

  // Go through each of the steps and free it..
  AGPLN_FreeSteps(&req->ap);

  QAST_Destroy(&req->ast);

  if (req->searchopts.stopwords) {
    StopWordList_Unref((StopWordList *)req->searchopts.stopwords);
  }

  ConcurrentSearchCtx_Free(&req->conc);

  // Finally, free the context. If we are a cursor, some more
  // cleanup is required since we also now own the
  // detached ("Thread Safe") context.
  RedisModuleCtx *detachedCtx = NULL;
  if (req->sctx) {
    if (req->reqflags & QEXEC_F_IS_CURSOR) {
      // Detach the redis context from the search context; it is released
      // separately further down.
      detachedCtx = req->sctx->redisCtx;
      req->sctx->redisCtx = NULL;
    }
    SearchCtx_Decref(req->sctx);
  }

  size_t argIdx = 0;
  while (argIdx < req->nargs) {
    sdsfree(req->args[argIdx]);
    ++argIdx;
  }

  if (req->searchopts.legacy.filters) {
    for (size_t ii = 0; ii < array_len(req->searchopts.legacy.filters); ++ii) {
      NumericFilter *nf = req->searchopts.legacy.filters[ii];
      if (nf != NULL) {
        NumericFilter_Free(nf);
      }
    }
    array_free(req->searchopts.legacy.filters);
  }

  rm_free(req->searchopts.inids);
  FieldList_Free(&req->outFields);

  if (detachedCtx) {
    RedisModule_FreeThreadSafeContext(detachedCtx);
  }

  rm_free(req->args);
  rm_free(req);
}
1202