1 #include <string.h>
2 #include <inttypes.h>
3 
4 #include "document.h"
5 #include "forward_index.h"
6 #include "numeric_filter.h"
7 #include "numeric_index.h"
8 #include "rmutil/strings.h"
9 #include "rmutil/util.h"
10 #include "util/mempool.h"
11 #include "spec.h"
12 #include "tokenize.h"
13 #include "util/logging.h"
14 #include "rmalloc.h"
15 #include "indexer.h"
16 #include "tag_index.h"
17 #include "aggregate/expr/expression.h"
18 #include "rmutil/rm_assert.h"
19 
20 // Memory pool for RSAddDocumentContext contexts
21 static mempool_t *actxPool_g = NULL;
22 extern RedisModuleCtx *RSDummyContext;
23 // For documentation, see these functions' definitions
allocDocumentContext(void)24 static void *allocDocumentContext(void) {
25   // See if there's one in the pool?
26   RSAddDocumentCtx *aCtx = rm_calloc(1, sizeof(*aCtx));
27   return aCtx;
28 }
29 
freeDocumentContext(void * p)30 static void freeDocumentContext(void *p) {
31   RSAddDocumentCtx *aCtx = p;
32   if (aCtx->fwIdx) {
33     ForwardIndexFree(aCtx->fwIdx);
34   }
35 
36   rm_free(aCtx->fspecs);
37   rm_free(aCtx->fdatas);
38   rm_free(aCtx->specName);
39   rm_free(aCtx);
40 }
41 
42 #define DUP_FIELD_ERRSTR "Requested to index field twice"
43 
44 #define FIELD_IS_VALID(aCtx, ix) ((aCtx)->fspecs[ix].name != NULL)
45 
AddDocumentCtx_SetDocument(RSAddDocumentCtx * aCtx,IndexSpec * sp)46 static int AddDocumentCtx_SetDocument(RSAddDocumentCtx *aCtx, IndexSpec *sp) {
47   Document *doc = aCtx->doc;
48   aCtx->stateFlags &= ~ACTX_F_INDEXABLES;
49   aCtx->stateFlags &= ~ACTX_F_TEXTINDEXED;
50   aCtx->stateFlags &= ~ACTX_F_OTHERINDEXED;
51 
52   aCtx->fspecs = rm_realloc(aCtx->fspecs, sizeof(*aCtx->fspecs) * doc->numFields);
53   aCtx->fdatas = rm_realloc(aCtx->fdatas, sizeof(*aCtx->fdatas) * doc->numFields);
54 
55   for (size_t ii = 0; ii < doc->numFields; ++ii) {
56     // zero out field data. We check at the destructor to see if there is any
57     // left-over tag data here; if we've realloc'd, then this contains
58     // garbage
59     aCtx->fdatas[ii].tags = NULL;
60   }
61 
62   size_t numTextIndexable = 0;
63 
64   // size: uint16_t * SPEC_MAX_FIELDS
65   FieldSpecDedupeArray dedupe = {0};
66   int hasTextFields = 0;
67   int hasOtherFields = 0;
68 
69   for (size_t i = 0; i < doc->numFields; i++) {
70     DocumentField *f = doc->fields + i;
71     const FieldSpec *fs = IndexSpec_GetField(sp, f->name, strlen(f->name));
72     if (!fs || (isSpecHash(sp) && !f->text)) {
73       aCtx->fspecs[i].name = NULL;
74       aCtx->fspecs[i].path = NULL;
75       aCtx->fspecs[i].types = 0;
76       continue;
77     }
78 
79     aCtx->fspecs[i] = *fs;
80     if (dedupe[fs->index]) {
81       QueryError_SetErrorFmt(&aCtx->status, QUERY_EDUPFIELD, "Tried to insert `%s` twice",
82                              fs->name);
83       return -1;
84     }
85 
86     dedupe[fs->index] = 1;
87 
88     if (FieldSpec_IsSortable(fs)) {
89       // mark sortable fields to be updated in the state flags
90       aCtx->stateFlags |= ACTX_F_SORTABLES;
91     }
92 
93     // See what we want the given field indexed as:
94     if (!f->indexAs) {
95       f->indexAs = fs->types;
96     } else {
97       // Verify the flags:
98       if ((f->indexAs & fs->types) != f->indexAs) {
99         QueryError_SetErrorFmt(&aCtx->status, QUERY_EUNSUPPTYPE,
100                                "Tried to index field %s as type not specified in schema", fs->name);
101         return -1;
102       }
103     }
104 
105     if (FieldSpec_IsIndexable(fs)) {
106       if (f->indexAs & INDEXFLD_T_FULLTEXT) {
107         numTextIndexable++;
108         hasTextFields = 1;
109       }
110 
111       if (f->indexAs != INDEXFLD_T_FULLTEXT) {
112         // has non-text but indexable fields
113         hasOtherFields = 1;
114       }
115     }
116   }
117 
118   if (hasTextFields || hasOtherFields) {
119     aCtx->stateFlags |= ACTX_F_INDEXABLES;
120   } else {
121     aCtx->stateFlags &= ~ACTX_F_INDEXABLES;
122   }
123 
124   if (!hasTextFields) {
125     aCtx->stateFlags |= ACTX_F_TEXTINDEXED;
126   } else {
127     aCtx->stateFlags &= ~ACTX_F_TEXTINDEXED;
128   }
129 
130   if (!hasOtherFields) {
131     aCtx->stateFlags |= ACTX_F_OTHERINDEXED;
132   } else {
133     aCtx->stateFlags &= ~ACTX_F_OTHERINDEXED;
134   }
135 
136   if ((aCtx->stateFlags & ACTX_F_SORTABLES) && aCtx->sv == NULL) {
137     aCtx->sv = NewSortingVector(sp->sortables->len);
138   }
139 
140   int empty = (aCtx->sv == NULL) && !hasTextFields && !hasOtherFields;
141   if (empty) {
142     aCtx->stateFlags |= ACTX_F_EMPTY;
143   }
144 
145   if ((aCtx->options & DOCUMENT_ADD_NOSAVE) == 0 && numTextIndexable &&
146       (sp->flags & Index_StoreByteOffsets)) {
147     if (!aCtx->byteOffsets) {
148       aCtx->byteOffsets = NewByteOffsets();
149       ByteOffsetWriter_Init(&aCtx->offsetsWriter);
150     }
151     RSByteOffsets_ReserveFields(aCtx->byteOffsets, numTextIndexable);
152   }
153   return 0;
154 }
155 
NewAddDocumentCtx(IndexSpec * sp,Document * doc,QueryError * status)156 RSAddDocumentCtx *NewAddDocumentCtx(IndexSpec *sp, Document *doc, QueryError *status) {
157 
158   if (!actxPool_g) {
159     mempool_options mopts = {.initialCap = 16,
160                              .alloc = allocDocumentContext,
161                              .free = freeDocumentContext,
162                              .isGlobal = 1};
163     actxPool_g = mempool_new(&mopts);
164   }
165 
166   // Get a new context
167   RSAddDocumentCtx *aCtx = mempool_get(actxPool_g);
168   aCtx->stateFlags = 0;
169   QueryError_ClearError(&aCtx->status);
170   aCtx->totalTokens = 0;
171   aCtx->docFlags = 0;
172   aCtx->client.bc = NULL;
173   aCtx->next = NULL;
174   aCtx->specFlags = sp->flags;
175   aCtx->indexer = sp->indexer;
176   aCtx->spec = sp;
177   if (aCtx->specFlags & Index_Async) {
178     size_t len = strlen(sp->name) + 1;
179     if (aCtx->specName == NULL) {
180       aCtx->specName = rm_malloc(len);
181     } else if (len > aCtx->specNameLen) {
182       aCtx->specName = rm_realloc(aCtx->specName, len);
183       aCtx->specNameLen = len;
184     }
185     strncpy(aCtx->specName, sp->name, len);
186     aCtx->specId = sp->uniqueId;
187   }
188   RS_LOG_ASSERT(sp->indexer, "No indexer");
189   Indexer_Incref(aCtx->indexer);
190 
191   // Assign the document:
192   aCtx->doc = doc;
193   if (AddDocumentCtx_SetDocument(aCtx, sp) != 0) {
194     *status = aCtx->status;
195     aCtx->status.detail = NULL;
196     mempool_release(actxPool_g, aCtx);
197     return NULL;
198   }
199 
200   // try to reuse the forward index on recycled contexts
201   if (aCtx->fwIdx) {
202     ForwardIndex_Reset(aCtx->fwIdx, aCtx->doc, sp->flags);
203   } else {
204     aCtx->fwIdx = NewForwardIndex(aCtx->doc, sp->flags);
205   }
206 
207   if (sp->smap) {
208     // we get a read only copy of the synonym map for accessing in the index thread with out worring
209     // about thready safe issues
210     aCtx->fwIdx->smap = SynonymMap_GetReadOnlyCopy(sp->smap);
211   } else {
212     aCtx->fwIdx->smap = NULL;
213   }
214 
215   aCtx->tokenizer = GetTokenizer(doc->language, aCtx->fwIdx->stemmer, sp->stopwords);
216 //  aCtx->doc->docId = 0;
217   return aCtx;
218 }
219 
doReplyFinish(RSAddDocumentCtx * aCtx,RedisModuleCtx * ctx)220 static void doReplyFinish(RSAddDocumentCtx *aCtx, RedisModuleCtx *ctx) {
221   if (aCtx->donecb) {
222     aCtx->donecb(aCtx, ctx, aCtx->donecbData);
223   }
224   Indexer_Decref(aCtx->indexer);
225   AddDocumentCtx_Free(aCtx);
226 }
227 
replyCallback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)228 static int replyCallback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
229   RSAddDocumentCtx *aCtx = RedisModule_GetBlockedClientPrivateData(ctx);
230   doReplyFinish(aCtx, ctx);
231   return REDISMODULE_OK;
232 }
233 
threadCallback(void * p)234 static void threadCallback(void *p) {
235   Document_AddToIndexes(p);
236 }
237 
AddDocumentCtx_Finish(RSAddDocumentCtx * aCtx)238 void AddDocumentCtx_Finish(RSAddDocumentCtx *aCtx) {
239   if (aCtx->stateFlags & ACTX_F_NOBLOCK) {
240     doReplyFinish(aCtx, aCtx->client.sctx->redisCtx);
241   } else {
242     RedisModule_UnblockClient(aCtx->client.bc, aCtx);
243   }
244 }
245 
246 // How many bytes in a document to warrant it being tokenized in a separate thread
247 #define SELF_EXEC_THRESHOLD 1024
248 
249 // LCOV_EXCL_START debug
Document_Dump(const Document * doc)250 void Document_Dump(const Document *doc) {
251   printf("Document Key: %s. ID=%" PRIu64 "\n", RedisModule_StringPtrLen(doc->docKey, NULL),
252          doc->docId);
253   for (size_t ii = 0; ii < doc->numFields; ++ii) {
254     printf("  [%lu]: %s => %s\n", ii, doc->fields[ii].name,
255            RedisModule_StringPtrLen(doc->fields[ii].text, NULL));
256   }
257 }
258 // LCOV_EXCL_STOP
259 
260 static void AddDocumentCtx_UpdateNoIndex(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx);
261 int Document_LoadSchemaFieldJson(Document *doc, RedisSearchCtx *sctx);
262 
AddDocumentCtx_ReplaceMerge(RSAddDocumentCtx * aCtx,RedisSearchCtx * sctx)263 static int AddDocumentCtx_ReplaceMerge(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx) {
264   /**
265    * The REPLACE operation contains fields which must be reindexed. This means
266    * that a new document ID needs to be assigned, and as a consequence, all
267    * fields must be reindexed.
268    */
269   int rv = REDISMODULE_ERR;
270 
271   Document_Clear(aCtx->doc);
272 
273   // Path is not covered and is not relevant
274 
275   DocumentType ruleType = sctx->spec->rule->type;
276   if (ruleType == DocumentType_Hash) {
277     rv = Document_LoadSchemaFieldHash(aCtx->doc, sctx);
278   } else if (ruleType == DocumentType_Json) {
279     rv = Document_LoadSchemaFieldJson(aCtx->doc, sctx);
280   }
281   if (rv != REDISMODULE_OK) {
282     QueryError_SetError(&aCtx->status, QUERY_ENODOC, "Could not load existing document");
283     aCtx->donecb(aCtx, sctx->redisCtx, aCtx->donecbData);
284     AddDocumentCtx_Free(aCtx);
285     return 1;
286   }
287 
288   // Keep hold of the new fields.
289   Document_MakeStringsOwner(aCtx->doc);
290   AddDocumentCtx_SetDocument(aCtx, sctx->spec);
291   return 0;
292 }
293 
handlePartialUpdate(RSAddDocumentCtx * aCtx,RedisSearchCtx * sctx)294 static int handlePartialUpdate(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx) {
295   // Handle partial update of fields
296   if (aCtx->stateFlags & ACTX_F_INDEXABLES) {
297     return AddDocumentCtx_ReplaceMerge(aCtx, sctx);
298   } else {
299     // No indexable fields are updated, we can just update the metadata.
300     // Quick update just updates the score, payload and sortable fields of the document.
301     // Thus full-reindexing of the document is not required
302     AddDocumentCtx_UpdateNoIndex(aCtx, sctx);
303     return 1;
304   }
305 }
306 
AddDocumentCtx_Submit(RSAddDocumentCtx * aCtx,RedisSearchCtx * sctx,uint32_t options)307 void AddDocumentCtx_Submit(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx, uint32_t options) {
308   aCtx->options = options;
309   if ((aCtx->options & DOCUMENT_ADD_PARTIAL) && handlePartialUpdate(aCtx, sctx)) {
310     return;
311   }
312 
313   // We actually modify (!) the strings in the document, so we always require
314   // ownership
315   Document_MakeStringsOwner(aCtx->doc);
316 
317   if (AddDocumentCtx_IsBlockable(aCtx)) {
318     aCtx->client.bc = RedisModule_BlockClient(sctx->redisCtx, replyCallback, NULL, NULL, 0);
319   } else {
320     aCtx->client.sctx = sctx;
321   }
322 
323   RS_LOG_ASSERT(aCtx->client.bc, "No blocked client");
324   size_t totalSize = 0;
325   for (size_t ii = 0; ii < aCtx->doc->numFields; ++ii) {
326     const DocumentField *ff = aCtx->doc->fields + ii;
327     if ((ff->indexAs & (INDEXFLD_T_FULLTEXT | INDEXFLD_T_TAG)) &&
328         (ff->unionType == FLD_VAR_T_CSTR || ff->unionType == FLD_VAR_T_RMS)) {
329       size_t n;
330       DocumentField_GetValueCStr(&aCtx->doc->fields[ii], &n);
331       totalSize += n;
332     }
333   }
334 
335   if (totalSize >= SELF_EXEC_THRESHOLD && AddDocumentCtx_IsBlockable(aCtx)) {
336     ConcurrentSearch_ThreadPoolRun(threadCallback, aCtx, CONCURRENT_POOL_INDEX);
337   } else {
338     Document_AddToIndexes(aCtx);
339   }
340 }
341 
AddDocumentCtx_Free(RSAddDocumentCtx * aCtx)342 void AddDocumentCtx_Free(RSAddDocumentCtx *aCtx) {
343   /**
344    * Free preprocessed data; this is the only reliable place
345    * to do it
346    */
347   for (size_t ii = 0; ii < aCtx->doc->numFields; ++ii) {
348     if (FIELD_IS_VALID(aCtx, ii) && FIELD_IS(aCtx->fspecs + ii, INDEXFLD_T_TAG) &&
349         aCtx->fdatas[ii].tags) {
350       TagIndex_FreePreprocessedData(aCtx->fdatas[ii].tags);
351       aCtx->fdatas[ii].tags = NULL;
352     }
353   }
354 
355   // Destroy the common fields:
356   if (!(aCtx->stateFlags & ACTX_F_NOFREEDOC)) {
357     Document_Free(aCtx->doc);
358   }
359 
360   if (aCtx->sv) {
361     SortingVector_Free(aCtx->sv);
362     aCtx->sv = NULL;
363   }
364 
365   if (aCtx->byteOffsets) {
366     RSByteOffsets_Free(aCtx->byteOffsets);
367     aCtx->byteOffsets = NULL;
368   }
369 
370   if (aCtx->tokenizer) {
371     // aCtx->tokenizer->Free(aCtx->tokenizer);
372     Tokenizer_Release(aCtx->tokenizer);
373     aCtx->tokenizer = NULL;
374   }
375 
376   if (aCtx->oldMd) {
377     DMD_Decref(aCtx->oldMd);
378     aCtx->oldMd = NULL;
379   }
380 
381   ByteOffsetWriter_Cleanup(&aCtx->offsetsWriter);
382   QueryError_ClearError(&aCtx->status);
383 
384   mempool_release(actxPool_g, aCtx);
385 }
386 
387 #define FIELD_HANDLER(name)                                                                \
388   static int name(RSAddDocumentCtx *aCtx, const DocumentField *field, const FieldSpec *fs, \
389                   FieldIndexerData *fdata, QueryError *status)
390 
391 #define FIELD_BULK_INDEXER(name)                                                            \
392   static int name(IndexBulkData *bulk, RSAddDocumentCtx *aCtx, RedisSearchCtx *ctx,         \
393                   const DocumentField *field, const FieldSpec *fs, FieldIndexerData *fdata, \
394                   QueryError *status)
395 
396 #define FIELD_BULK_CTOR(name) \
397   static void name(IndexBulkData *bulk, const FieldSpec *fs, RedisSearchCtx *ctx)
398 #define FIELD_BULK_FINALIZER(name) static void name(IndexBulkData *bulk, RedisSearchCtx *ctx)
399 
400 #define FIELD_PREPROCESSOR FIELD_HANDLER
401 
FIELD_PREPROCESSOR(fulltextPreprocessor)402 FIELD_PREPROCESSOR(fulltextPreprocessor) {
403   if (field->unionType != FLD_VAR_T_CSTR && field->unionType != FLD_VAR_T_RMS) {
404     return -1;
405   }
406 
407   size_t fl;
408   const char *c = DocumentField_GetValueCStr(field, &fl);
409 
410   if (FieldSpec_IsSortable(fs)) {
411     RSSortingVector_Put(aCtx->sv, fs->sortIdx, (void *)c, RS_SORTABLE_STR, fs->options & FieldSpec_UNF);
412   }
413 
414   if (FieldSpec_IsIndexable(fs)) {
415     ForwardIndexTokenizerCtx tokCtx;
416     VarintVectorWriter *curOffsetWriter = NULL;
417     RSByteOffsetField *curOffsetField = NULL;
418     if (aCtx->byteOffsets) {
419       curOffsetField = RSByteOffsets_AddField(aCtx->byteOffsets, fs->ftId, aCtx->totalTokens + 1);
420       curOffsetWriter = &aCtx->offsetsWriter;
421     }
422 
423     ForwardIndexTokenizerCtx_Init(&tokCtx, aCtx->fwIdx, c, curOffsetWriter, fs->ftId, fs->ftWeight);
424 
425     uint32_t options = TOKENIZE_DEFAULT_OPTIONS;
426     if (FieldSpec_IsNoStem(fs)) {
427       options |= TOKENIZE_NOSTEM;
428     }
429     if (FieldSpec_IsPhonetics(fs)) {
430       options |= TOKENIZE_PHONETICS;
431     }
432     aCtx->tokenizer->Start(aCtx->tokenizer, (char *)c, fl, options);
433 
434     Token tok = {0};
435     uint32_t newTokPos;
436     while (0 != (newTokPos = aCtx->tokenizer->Next(aCtx->tokenizer, &tok))) {
437       forwardIndexTokenFunc(&tokCtx, &tok);
438     }
439     uint32_t lastTokPos = aCtx->tokenizer->ctx.lastOffset;
440 
441     if (curOffsetField) {
442       curOffsetField->lastTokPos = lastTokPos;
443     }
444     aCtx->totalTokens = lastTokPos;
445     Token_Destroy(&tok);
446   }
447   return 0;
448 }
449 
FIELD_PREPROCESSOR(numericPreprocessor)450 FIELD_PREPROCESSOR(numericPreprocessor) {
451   char *end;
452   switch (field->unionType) {
453     case FLD_VAR_T_RMS:
454       if (RedisModule_StringToDouble(field->text, &fdata->numeric) == REDISMODULE_ERR) {
455         QueryError_SetCode(status, QUERY_ENOTNUMERIC);
456         return -1;
457       }
458       break;
459     case FLD_VAR_T_CSTR:
460       fdata->numeric = strtod(field->strval, &end);
461       if (*end) {
462         QueryError_SetCode(status, QUERY_ENOTNUMERIC);
463         return -1;
464       }
465       break;
466     case FLD_VAR_T_NUM:
467       fdata->numeric = field->numval;
468       break;
469     default:
470       return -1;
471   }
472 
473   // If this is a sortable numeric value - copy the value to the sorting vector
474   if (FieldSpec_IsSortable(fs)) {
475     RSSortingVector_Put(aCtx->sv, fs->sortIdx, &fdata->numeric, RS_SORTABLE_NUM, 0);
476   }
477   return 0;
478 }
479 
FIELD_BULK_INDEXER(numericIndexer)480 FIELD_BULK_INDEXER(numericIndexer) {
481   NumericRangeTree *rt = bulk->indexDatas[IXFLDPOS_NUMERIC];
482   if (!rt) {
483     RedisModuleString *keyName = IndexSpec_GetFormattedKey(ctx->spec, fs, INDEXFLD_T_NUMERIC);
484     rt = bulk->indexDatas[IXFLDPOS_NUMERIC] =
485         OpenNumericIndex(ctx, keyName, &bulk->indexKeys[IXFLDPOS_NUMERIC]);
486     if (!rt) {
487       QueryError_SetError(status, QUERY_EGENERIC, "Could not open numeric index for indexing");
488       return -1;
489     }
490   }
491   NRN_AddRv rv = NumericRangeTree_Add(rt, aCtx->doc->docId, fdata->numeric);
492   ctx->spec->stats.invertedSize += rv.sz;  // TODO: exact amount
493   ctx->spec->stats.numRecords += rv.numRecords;
494   return 0;
495 }
496 
FIELD_PREPROCESSOR(geoPreprocessor)497 FIELD_PREPROCESSOR(geoPreprocessor) {
498   size_t len;
499   const char *str = NULL;
500   double lat = 0, lon = 0;
501 
502   switch (field->unionType) {
503     case FLD_VAR_T_GEO:
504       lon = field->lon;
505       lat = field->lat;
506       break;
507     case FLD_VAR_T_CSTR:
508     case FLD_VAR_T_RMS:
509       str = DocumentField_GetValueCStr(field, &len);
510       if (parseGeo(str, len, &lon, &lat) != REDISMODULE_OK) {
511         return REDISMODULE_ERR;
512       }
513       break;
514     case FLD_VAR_T_ARRAY:
515     case FLD_VAR_T_NUM:
516       RS_LOG_ASSERT(0, "Oops");
517   }
518   double geohash = calcGeoHash(lon, lat);
519   if (geohash == INVALID_GEOHASH) {
520     return REDISMODULE_ERR;
521   }
522   fdata->numeric = geohash;
523 
524   if (FieldSpec_IsSortable(fs)) {
525     if (str) {
526       RSSortingVector_Put(aCtx->sv, fs->sortIdx, str, RS_SORTABLE_STR, fs->options & FieldSpec_UNF);
527     } else {
528       RSSortingVector_Put(aCtx->sv, fs->sortIdx, &fdata->numeric, RS_SORTABLE_NUM, 0);
529     }
530   }
531 
532   return 0;
533 }
534 
FIELD_PREPROCESSOR(tagPreprocessor)535 FIELD_PREPROCESSOR(tagPreprocessor) {
536   fdata->tags = TagIndex_Preprocess(fs->tagSep, fs->tagFlags, field);
537 
538   if (fdata->tags == NULL) {
539     return 0;
540   }
541   if (FieldSpec_IsSortable(fs) && isSpecHash(aCtx->spec)) {
542     size_t fl;
543     const char *str = DocumentField_GetValueCStr(field, &fl);
544     RSSortingVector_Put(aCtx->sv, fs->sortIdx, str, RS_SORTABLE_STR, fs->options & FieldSpec_UNF);
545   }
546   return 0;
547 }
548 
FIELD_BULK_INDEXER(tagIndexer)549 FIELD_BULK_INDEXER(tagIndexer) {
550   TagIndex *tidx = bulk->indexDatas[IXFLDPOS_TAG];
551   if (!tidx) {
552     RedisModuleString *kname = IndexSpec_GetFormattedKey(ctx->spec, fs, INDEXFLD_T_TAG);
553     tidx = bulk->indexDatas[IXFLDPOS_TAG] =
554         TagIndex_Open(ctx, kname, 1, &bulk->indexKeys[IXFLDPOS_TAG]);
555     if (!tidx) {
556       QueryError_SetError(status, QUERY_EGENERIC, "Could not open tag index for indexing");
557       return -1;
558     }
559   }
560 
561   ctx->spec->stats.invertedSize +=
562       TagIndex_Index(tidx, (const char **)fdata->tags, array_len(fdata->tags), aCtx->doc->docId);
563   ctx->spec->stats.numRecords++;
564   return 0;
565 }
566 
567 static PreprocessorFunc preprocessorMap[] = {
568     // nl break
569     [IXFLDPOS_FULLTEXT] = fulltextPreprocessor,
570     [IXFLDPOS_NUMERIC] = numericPreprocessor,
571     [IXFLDPOS_GEO] = geoPreprocessor,
572     [IXFLDPOS_TAG] = tagPreprocessor};
573 
IndexerBulkAdd(IndexBulkData * bulk,RSAddDocumentCtx * cur,RedisSearchCtx * sctx,const DocumentField * field,const FieldSpec * fs,FieldIndexerData * fdata,QueryError * status)574 int IndexerBulkAdd(IndexBulkData *bulk, RSAddDocumentCtx *cur, RedisSearchCtx *sctx,
575                    const DocumentField *field, const FieldSpec *fs, FieldIndexerData *fdata,
576                    QueryError *status) {
577   int rc = 0;
578   for (size_t ii = 0; ii < INDEXFLD_NUM_TYPES && rc == 0; ++ii) {
579     // see which types are supported in the current field...
580     if (field->indexAs & INDEXTYPE_FROM_POS(ii)) {
581       switch (ii) {
582         case IXFLDPOS_TAG:
583           rc = tagIndexer(bulk, cur, sctx, field, fs, fdata, status);
584           break;
585         case IXFLDPOS_NUMERIC:
586         case IXFLDPOS_GEO:
587           rc = numericIndexer(bulk, cur, sctx, field, fs, fdata, status);
588           break;
589         case IXFLDPOS_FULLTEXT:
590           break;
591         default:
592           rc = -1;
593           QueryError_SetError(status, QUERY_EINVAL, "BUG: invalid index type");
594           break;
595       }
596     }
597   }
598   return rc;
599 }
600 
IndexerBulkCleanup(IndexBulkData * cur,RedisSearchCtx * sctx)601 void IndexerBulkCleanup(IndexBulkData *cur, RedisSearchCtx *sctx) {
602   for (size_t ii = 0; ii < INDEXFLD_NUM_TYPES; ++ii) {
603     if (cur->indexKeys[ii]) {
604       RedisModule_CloseKey(cur->indexKeys[ii]);
605     }
606   }
607 }
608 
Document_AddToIndexes(RSAddDocumentCtx * aCtx)609 int Document_AddToIndexes(RSAddDocumentCtx *aCtx) {
610   Document *doc = aCtx->doc;
611   int ourRv = REDISMODULE_OK;
612 
613   for (size_t i = 0; i < doc->numFields; i++) {
614     const FieldSpec *fs = aCtx->fspecs + i;
615     const DocumentField *ff = doc->fields + i;
616     FieldIndexerData *fdata = aCtx->fdatas + i;
617 
618     for (size_t ii = 0; ii < INDEXFLD_NUM_TYPES; ++ii) {
619       if (!FIELD_CHKIDX(ff->indexAs, INDEXTYPE_FROM_POS(ii))) {
620         continue;
621       }
622 
623       PreprocessorFunc pp = preprocessorMap[ii];
624       if (pp(aCtx, &doc->fields[i], fs, fdata, &aCtx->status) != 0) {
625         if (!AddDocumentCtx_IsBlockable(aCtx)) {
626           ++aCtx->spec->stats.indexingFailures;
627         } else {
628           RedisModule_ThreadSafeContextLock(RSDummyContext);
629           IndexSpec *spec = IndexSpec_Load(RSDummyContext, aCtx->specName, 0);
630           if (spec && aCtx->specId == spec->uniqueId) {
631             ++spec->stats.indexingFailures;
632           }
633           RedisModule_ThreadSafeContextUnlock(RSDummyContext);
634         }
635         ourRv = REDISMODULE_ERR;
636         goto cleanup;
637       }
638     }
639   }
640 
641   if (Indexer_Add(aCtx->indexer, aCtx) != 0) {
642     ourRv = REDISMODULE_ERR;
643     goto cleanup;
644   }
645 
646 cleanup:
647   if (ourRv != REDISMODULE_OK) {
648     // if a document did not load properly, it is deleted
649     // to prevent mismatch of index and hash
650     DocTable_DeleteR(&aCtx->spec->docs, doc->docKey);
651 
652     QueryError_SetCode(&aCtx->status, QUERY_EGENERIC);
653     AddDocumentCtx_Finish(aCtx);
654   }
655   return ourRv;
656 }
657 
658 /* Evaluate an IF expression (e.g. IF "@foo == 'bar'") against a document, by getting the properties
659  * from the sorting table or from the hash representation of the document.
660  *
661  * NOTE: This is disconnected from the document indexing flow, and loads the document and discards
662  * of it internally
663  *
664  * Returns  REDISMODULE_ERR on failure, OK otherwise*/
Document_EvalExpression(RedisSearchCtx * sctx,RedisModuleString * key,const char * expr,int * result,QueryError * status)665 int Document_EvalExpression(RedisSearchCtx *sctx, RedisModuleString *key, const char *expr,
666                             int *result, QueryError *status) {
667 
668   int rc = REDISMODULE_ERR;
669   const RSDocumentMetadata *dmd = DocTable_GetByKeyR(&sctx->spec->docs, key);
670   if (!dmd) {
671     // We don't know the document...
672     QueryError_SetError(status, QUERY_ENODOC, "");
673     return REDISMODULE_ERR;
674   }
675 
676   // Try to parser the expression first, fail if we can't
677   RSExpr *e = ExprAST_Parse(expr, strlen(expr), status);
678   if (!e) {
679     return REDISMODULE_ERR;
680   }
681 
682   if (QueryError_HasError(status)) {
683     RSExpr_Free(e);
684     return REDISMODULE_ERR;
685   }
686 
687   RLookup lookup_s;
688   RLookupRow row = {0};
689   IndexSpecCache *spcache = IndexSpec_GetSpecCache(sctx->spec);
690   RLookup_Init(&lookup_s, spcache);
691   if (ExprAST_GetLookupKeys(e, &lookup_s, status) == EXPR_EVAL_ERR) {
692     goto done;
693   }
694 
695   RLookupLoadOptions loadopts = {.sctx = sctx, .dmd = dmd, .status = status};
696   if (RLookup_LoadDocument(&lookup_s, &row, &loadopts) != REDISMODULE_OK) {
697     // printf("Couldn't load document!\n");
698     goto done;
699   }
700 
701   ExprEval evaluator = {.err = status, .lookup = &lookup_s, .res = NULL, .srcrow = &row, .root = e};
702   RSValue rv = RSVALUE_STATIC;
703   if (ExprEval_Eval(&evaluator, &rv) != EXPR_EVAL_OK) {
704     // printf("Eval not OK!!! SAD!!\n");
705     goto done;
706   }
707 
708   *result = RSValue_BoolTest(&rv);
709   RSValue_Clear(&rv);
710   rc = REDISMODULE_OK;
711 
712 // Clean up:
713 done:
714   if (e) {
715     ExprAST_Free(e);
716   }
717   RLookupRow_Cleanup(&row);
718   RLookup_Cleanup(&lookup_s);
719   return rc;
720 }
721 
AddDocumentCtx_UpdateNoIndex(RSAddDocumentCtx * aCtx,RedisSearchCtx * sctx)722 static void AddDocumentCtx_UpdateNoIndex(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx) {
723 #define BAIL(s)                                            \
724   do {                                                     \
725     QueryError_SetError(&aCtx->status, QUERY_EGENERIC, s); \
726     goto done;                                             \
727   } while (0);
728 
729   Document *doc = aCtx->doc;
730   t_docId docId = DocTable_GetIdR(&sctx->spec->docs, doc->docKey);
731   if (docId == 0) {
732     BAIL("Couldn't load old document");
733   }
734   RSDocumentMetadata *md = DocTable_Get(&sctx->spec->docs, docId);
735   if (!md) {
736     BAIL("Couldn't load document metadata");
737   }
738 
739   // Update the score
740   md->score = doc->score;
741   // Set the payload if needed
742   if (doc->payload) {
743     DocTable_SetPayload(&sctx->spec->docs, md, doc->payload, doc->payloadSize);
744   }
745 
746   if (aCtx->stateFlags & ACTX_F_SORTABLES) {
747     FieldSpecDedupeArray dedupes = {0};
748     // Update sortables if needed
749     for (int i = 0; i < doc->numFields; i++) {
750       DocumentField *f = &doc->fields[i];
751       const FieldSpec *fs = IndexSpec_GetField(sctx->spec, f->name, strlen(f->name));
752       if (fs == NULL || !FieldSpec_IsSortable(fs)) {
753         continue;
754       }
755 
756       if (dedupes[fs->index]) {
757         BAIL(DUP_FIELD_ERRSTR);
758       }
759 
760       dedupes[fs->index] = 1;
761 
762       int idx = IndexSpec_GetFieldSortingIndex(sctx->spec, f->name, strlen(f->name));
763       if (idx < 0) continue;
764 
765       if (!md->sortVector) {
766         md->sortVector = NewSortingVector(sctx->spec->sortables->len);
767       }
768 
769       RS_LOG_ASSERT((fs->options & FieldSpec_Dynamic) == 0, "Dynamic field cannot use PARTIAL");
770 
771       switch (fs->types) {
772         case INDEXFLD_T_FULLTEXT:
773         case INDEXFLD_T_TAG:
774         case INDEXFLD_T_GEO:
775           RSSortingVector_Put(md->sortVector, idx, (void *)RedisModule_StringPtrLen(f->text, NULL),
776                               RS_SORTABLE_STR, fs->options & FieldSpec_UNF);
777           break;
778         case INDEXFLD_T_NUMERIC: {
779           double numval;
780           if (RedisModule_StringToDouble(f->text, &numval) == REDISMODULE_ERR) {
781             BAIL("Could not parse numeric index value");
782           }
783           RSSortingVector_Put(md->sortVector, idx, &numval, RS_SORTABLE_NUM, 0);
784           break;
785         }
786         default:
787           BAIL("Unsupported sortable type");
788           break;
789       }
790     }
791   }
792 
793 done:
794   if (aCtx->donecb) {
795     aCtx->donecb(aCtx, sctx->redisCtx, aCtx->donecbData);
796   }
797   AddDocumentCtx_Free(aCtx);
798 }
799 
Document_GetField(Document * d,const char * fieldName)800 DocumentField *Document_GetField(Document *d, const char *fieldName) {
801   if (!d || !fieldName) return NULL;
802 
803   for (int i = 0; i < d->numFields; i++) {
804     if (!strcasecmp(d->fields[i].name, fieldName)) {
805       return &d->fields[i];
806     }
807   }
808   return NULL;
809 }
810 
DocumentField_GetValueCStr(const DocumentField * df,size_t * len)811 const char *DocumentField_GetValueCStr(const DocumentField *df, size_t *len) {
812   *len = 0;
813   switch (df->unionType) {
814     case FLD_VAR_T_RMS:
815       return RedisModule_StringPtrLen(df->text, len);
816     case FLD_VAR_T_CSTR:
817       *len = df->strlen;
818       return df->strval;
819     case FLD_VAR_T_NUM:
820     case FLD_VAR_T_GEO:
821     case FLD_VAR_T_ARRAY:
822       RS_LOG_ASSERT(0, "invalid types");
823   }
824   return NULL;
825 }
826