#include <string.h>
#include <inttypes.h>
#include <math.h>

#include "document.h"
#include "forward_index.h"
#include "numeric_filter.h"
#include "numeric_index.h"
#include "rmutil/strings.h"
#include "rmutil/util.h"
#include "util/mempool.h"
#include "spec.h"
#include "tokenize.h"
#include "util/logging.h"
#include "rmalloc.h"
#include "indexer.h"
#include "tag_index.h"
#include "aggregate/expr/expression.h"
#include "rmutil/rm_assert.h"
19
// Memory pool for RSAddDocumentCtx contexts. Created lazily on the first call to
// NewAddDocumentCtx; contexts are handed back to it with mempool_release.
static mempool_t *actxPool_g = NULL;
// Module-wide context defined elsewhere; Document_AddToIndexes locks it with
// RedisModule_ThreadSafeContextLock before touching spec statistics for blockable adds.
extern RedisModuleCtx *RSDummyContext;
// The two functions below are the allocation/destruction callbacks of the pool above;
// see their definitions for details.
allocDocumentContext(void)24 static void *allocDocumentContext(void) {
25 // See if there's one in the pool?
26 RSAddDocumentCtx *aCtx = rm_calloc(1, sizeof(*aCtx));
27 return aCtx;
28 }
29
freeDocumentContext(void * p)30 static void freeDocumentContext(void *p) {
31 RSAddDocumentCtx *aCtx = p;
32 if (aCtx->fwIdx) {
33 ForwardIndexFree(aCtx->fwIdx);
34 }
35
36 rm_free(aCtx->fspecs);
37 rm_free(aCtx->fdatas);
38 rm_free(aCtx->specName);
39 rm_free(aCtx);
40 }
41
// Error message reported by AddDocumentCtx_UpdateNoIndex when the same
// sortable schema field appears more than once in a document.
#define DUP_FIELD_ERRSTR "Requested to index field twice"

// Non-zero if document field `ix` was matched to a schema field by
// AddDocumentCtx_SetDocument (unmatched fields get a FieldSpec with a NULL name).
#define FIELD_IS_VALID(aCtx, ix) ((aCtx)->fspecs[ix].name != NULL)
45
/**
 * Match the fields of aCtx->doc against the schema of `sp` and prepare the
 * per-field state consumed by the indexing preprocessors.
 *
 * For each document field i, aCtx->fspecs[i] receives a copy of the matching
 * FieldSpec. It receives a spec with a NULL name / path and zero types when the
 * field is not in the schema, or when `sp` is a hash spec and the field has no
 * `text`. The field's `indexAs` mask defaults to the schema types; when the caller
 * preset it, it must be a subset of them.
 *
 * Afterwards the ACTX_F_INDEXABLES / ACTX_F_TEXTINDEXED / ACTX_F_OTHERINDEXED
 * flags are recomputed, ACTX_F_SORTABLES / ACTX_F_EMPTY are set where they apply,
 * and the sorting vector and byte-offset storage are allocated on demand.
 *
 * @return 0 on success. On failure it returns -1 with aCtx->status set:
 *         QUERY_EDUPFIELD when a schema field is supplied twice, or
 *         QUERY_EUNSUPPTYPE when a field is requested as a type the schema lacks.
 *         On failure, fspecs entries after the offending field are left untouched
 *         (i.e. whatever realloc left there).
 */
static int AddDocumentCtx_SetDocument(RSAddDocumentCtx *aCtx, IndexSpec *sp) {
  Document *doc = aCtx->doc;
  aCtx->stateFlags &= ~ACTX_F_INDEXABLES;
  aCtx->stateFlags &= ~ACTX_F_TEXTINDEXED;
  aCtx->stateFlags &= ~ACTX_F_OTHERINDEXED;

  // These arrays survive context reuse (freed only by freeDocumentContext) and are
  // resized to the current document's field count.
  aCtx->fspecs = rm_realloc(aCtx->fspecs, sizeof(*aCtx->fspecs) * doc->numFields);
  aCtx->fdatas = rm_realloc(aCtx->fdatas, sizeof(*aCtx->fdatas) * doc->numFields);

  for (size_t ii = 0; ii < doc->numFields; ++ii) {
    // zero out field data. We check at the destructor to see if there is any
    // left-over tag data here; if we've realloc'd, then this contains
    // garbage
    aCtx->fdatas[ii].tags = NULL;
  }

  size_t numTextIndexable = 0;

  // size: uint16_t * SPEC_MAX_FIELDS
  FieldSpecDedupeArray dedupe = {0};
  int hasTextFields = 0;
  int hasOtherFields = 0;

  for (size_t i = 0; i < doc->numFields; i++) {
    DocumentField *f = doc->fields + i;
    const FieldSpec *fs = IndexSpec_GetField(sp, f->name, strlen(f->name));
    if (!fs || (isSpecHash(sp) && !f->text)) {
      // Not indexable against this schema: mark the slot invalid (see FIELD_IS_VALID)
      aCtx->fspecs[i].name = NULL;
      aCtx->fspecs[i].path = NULL;
      aCtx->fspecs[i].types = 0;
      continue;
    }

    aCtx->fspecs[i] = *fs;
    // Reject documents that supply the same schema field more than once
    if (dedupe[fs->index]) {
      QueryError_SetErrorFmt(&aCtx->status, QUERY_EDUPFIELD, "Tried to insert `%s` twice",
                             fs->name);
      return -1;
    }

    dedupe[fs->index] = 1;

    if (FieldSpec_IsSortable(fs)) {
      // mark sortable fields to be updated in the state flags
      aCtx->stateFlags |= ACTX_F_SORTABLES;
    }

    // See what we want the given field indexed as:
    if (!f->indexAs) {
      f->indexAs = fs->types;
    } else {
      // Verify the flags: the requested types must all be declared in the schema
      if ((f->indexAs & fs->types) != f->indexAs) {
        QueryError_SetErrorFmt(&aCtx->status, QUERY_EUNSUPPTYPE,
                               "Tried to index field %s as type not specified in schema", fs->name);
        return -1;
      }
    }

    if (FieldSpec_IsIndexable(fs)) {
      if (f->indexAs & INDEXFLD_T_FULLTEXT) {
        numTextIndexable++;
        hasTextFields = 1;
      }

      if (f->indexAs != INDEXFLD_T_FULLTEXT) {
        // has non-text but indexable fields
        hasOtherFields = 1;
      }
    }
  }

  if (hasTextFields || hasOtherFields) {
    aCtx->stateFlags |= ACTX_F_INDEXABLES;
  } else {
    aCtx->stateFlags &= ~ACTX_F_INDEXABLES;
  }

  // TEXTINDEXED / OTHERINDEXED are set when there is nothing of that kind to index,
  // and cleared when there is still work to do.
  if (!hasTextFields) {
    aCtx->stateFlags |= ACTX_F_TEXTINDEXED;
  } else {
    aCtx->stateFlags &= ~ACTX_F_TEXTINDEXED;
  }

  if (!hasOtherFields) {
    aCtx->stateFlags |= ACTX_F_OTHERINDEXED;
  } else {
    aCtx->stateFlags &= ~ACTX_F_OTHERINDEXED;
  }

  if ((aCtx->stateFlags & ACTX_F_SORTABLES) && aCtx->sv == NULL) {
    aCtx->sv = NewSortingVector(sp->sortables->len);
  }

  int empty = (aCtx->sv == NULL) && !hasTextFields && !hasOtherFields;
  if (empty) {
    aCtx->stateFlags |= ACTX_F_EMPTY;
  }

  // Byte offsets are only recorded for saved documents with text, on specs that request them
  if ((aCtx->options & DOCUMENT_ADD_NOSAVE) == 0 && numTextIndexable &&
      (sp->flags & Index_StoreByteOffsets)) {
    if (!aCtx->byteOffsets) {
      aCtx->byteOffsets = NewByteOffsets();
      ByteOffsetWriter_Init(&aCtx->offsetsWriter);
    }
    RSByteOffsets_ReserveFields(aCtx->byteOffsets, numTextIndexable);
  }
  return 0;
}
155
NewAddDocumentCtx(IndexSpec * sp,Document * doc,QueryError * status)156 RSAddDocumentCtx *NewAddDocumentCtx(IndexSpec *sp, Document *doc, QueryError *status) {
157
158 if (!actxPool_g) {
159 mempool_options mopts = {.initialCap = 16,
160 .alloc = allocDocumentContext,
161 .free = freeDocumentContext,
162 .isGlobal = 1};
163 actxPool_g = mempool_new(&mopts);
164 }
165
166 // Get a new context
167 RSAddDocumentCtx *aCtx = mempool_get(actxPool_g);
168 aCtx->stateFlags = 0;
169 QueryError_ClearError(&aCtx->status);
170 aCtx->totalTokens = 0;
171 aCtx->docFlags = 0;
172 aCtx->client.bc = NULL;
173 aCtx->next = NULL;
174 aCtx->specFlags = sp->flags;
175 aCtx->indexer = sp->indexer;
176 aCtx->spec = sp;
177 if (aCtx->specFlags & Index_Async) {
178 size_t len = strlen(sp->name) + 1;
179 if (aCtx->specName == NULL) {
180 aCtx->specName = rm_malloc(len);
181 } else if (len > aCtx->specNameLen) {
182 aCtx->specName = rm_realloc(aCtx->specName, len);
183 aCtx->specNameLen = len;
184 }
185 strncpy(aCtx->specName, sp->name, len);
186 aCtx->specId = sp->uniqueId;
187 }
188 RS_LOG_ASSERT(sp->indexer, "No indexer");
189 Indexer_Incref(aCtx->indexer);
190
191 // Assign the document:
192 aCtx->doc = doc;
193 if (AddDocumentCtx_SetDocument(aCtx, sp) != 0) {
194 *status = aCtx->status;
195 aCtx->status.detail = NULL;
196 mempool_release(actxPool_g, aCtx);
197 return NULL;
198 }
199
200 // try to reuse the forward index on recycled contexts
201 if (aCtx->fwIdx) {
202 ForwardIndex_Reset(aCtx->fwIdx, aCtx->doc, sp->flags);
203 } else {
204 aCtx->fwIdx = NewForwardIndex(aCtx->doc, sp->flags);
205 }
206
207 if (sp->smap) {
208 // we get a read only copy of the synonym map for accessing in the index thread with out worring
209 // about thready safe issues
210 aCtx->fwIdx->smap = SynonymMap_GetReadOnlyCopy(sp->smap);
211 } else {
212 aCtx->fwIdx->smap = NULL;
213 }
214
215 aCtx->tokenizer = GetTokenizer(doc->language, aCtx->fwIdx->stemmer, sp->stopwords);
216 // aCtx->doc->docId = 0;
217 return aCtx;
218 }
219
/**
 * Common completion step of an add-document operation.
 * Invokes the registered completion callback (when there is one), drops the
 * indexer reference held by the context and releases the context.
 */
static void doReplyFinish(RSAddDocumentCtx *aCtx, RedisModuleCtx *ctx) {
  int hasCallback = aCtx->donecb != NULL;
  if (hasCallback) {
    aCtx->donecb(aCtx, ctx, aCtx->donecbData);
  }

  Indexer_Decref(aCtx->indexer);
  AddDocumentCtx_Free(aCtx);
}
227
/**
 * Reply callback of the blocked client created in AddDocumentCtx_Submit.
 * The context was attached as the blocked client's private data by
 * AddDocumentCtx_Finish; finish the operation with it. argv/argc are unused.
 */
static int replyCallback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  doReplyFinish(RedisModule_GetBlockedClientPrivateData(ctx), ctx);
  return REDISMODULE_OK;
}
233
threadCallback(void * p)234 static void threadCallback(void *p) {
235 Document_AddToIndexes(p);
236 }
237
/**
 * Complete an add-document operation.
 * Non-blocking contexts are finished immediately on the search context's Redis
 * context; otherwise the blocked client is unblocked with the context as private
 * data, and replyCallback finishes it.
 */
void AddDocumentCtx_Finish(RSAddDocumentCtx *aCtx) {
  if ((aCtx->stateFlags & ACTX_F_NOBLOCK) == 0) {
    RedisModule_UnblockClient(aCtx->client.bc, aCtx);
    return;
  }
  doReplyFinish(aCtx, aCtx->client.sctx->redisCtx);
}
245
// Minimum total length (in bytes) of a document's string values indexed as TEXT or TAG
// for AddDocumentCtx_Submit to index it on the CONCURRENT_POOL_INDEX thread pool
// (blockable contexts only); smaller documents are indexed inline.
#define SELF_EXEC_THRESHOLD 1024
248
249 // LCOV_EXCL_START debug
Document_Dump(const Document * doc)250 void Document_Dump(const Document *doc) {
251 printf("Document Key: %s. ID=%" PRIu64 "\n", RedisModule_StringPtrLen(doc->docKey, NULL),
252 doc->docId);
253 for (size_t ii = 0; ii < doc->numFields; ++ii) {
254 printf(" [%lu]: %s => %s\n", ii, doc->fields[ii].name,
255 RedisModule_StringPtrLen(doc->fields[ii].text, NULL));
256 }
257 }
258 // LCOV_EXCL_STOP
259
// Forward declaration: used by handlePartialUpdate, defined further below.
static void AddDocumentCtx_UpdateNoIndex(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx);
// JSON counterpart of Document_LoadSchemaFieldHash; its definition is not in this
// section of the file (presumably in the JSON support code — verify).
int Document_LoadSchemaFieldJson(Document *doc, RedisSearchCtx *sctx);
262
/**
 * Handle a PARTIAL update that touches indexable fields.
 *
 * Reindexing assigns a new document id, so every schema field has to be indexed
 * again. The document's fields are therefore cleared, reloaded from the stored
 * hash/JSON value, and matched against the schema once more.
 *
 * @return 0 if the context is ready for regular indexing. It returns 1 if the
 *         operation already finished: the error was reported through the
 *         completion callback and the context was released.
 */
static int AddDocumentCtx_ReplaceMerge(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx) {
  int rv = REDISMODULE_ERR;

  Document_Clear(aCtx->doc);

  DocumentType ruleType = sctx->spec->rule->type;
  if (ruleType == DocumentType_Hash) {
    rv = Document_LoadSchemaFieldHash(aCtx->doc, sctx);
  } else if (ruleType == DocumentType_Json) {
    rv = Document_LoadSchemaFieldJson(aCtx->doc, sctx);
  }
  if (rv != REDISMODULE_OK) {
    QueryError_SetError(&aCtx->status, QUERY_ENODOC, "Could not load existing document");
    goto fail;
  }

  // Keep hold of the new fields.
  Document_MakeStringsOwner(aCtx->doc);
  if (AddDocumentCtx_SetDocument(aCtx, sctx->spec) != 0) {
    // aCtx->status was set by SetDocument; never index a half-matched document
    goto fail;
  }
  return 0;

fail:
  // Runs donecb (if any), releases the indexer reference taken in NewAddDocumentCtx
  // and frees the context.
  doReplyFinish(aCtx, sctx->redisCtx);
  return 1;
}
293
/**
 * Route a PARTIAL update.
 *
 * When no indexable field changes, only the score, payload and sortable values are
 * updated in place (AddDocumentCtx_UpdateNoIndex), which always consumes the context.
 * Otherwise the document is reloaded for full reindexing (AddDocumentCtx_ReplaceMerge).
 *
 * @return 1 when the operation was completed here, 0 when the caller must continue
 *         with regular indexing.
 */
static int handlePartialUpdate(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx) {
  if (!(aCtx->stateFlags & ACTX_F_INDEXABLES)) {
    AddDocumentCtx_UpdateNoIndex(aCtx, sctx);
    return 1;
  }
  return AddDocumentCtx_ReplaceMerge(aCtx, sctx);
}
306
/**
 * Submit a prepared context for indexing.
 *
 * PARTIAL updates first go through handlePartialUpdate, which may complete the
 * operation by itself. Otherwise the client is blocked (blockable contexts) or
 * the search context is remembered. The document is then indexed on the
 * CONCURRENT_POOL_INDEX thread pool when it is blockable and its TEXT/TAG string
 * content reaches SELF_EXEC_THRESHOLD bytes, and inline otherwise.
 */
void AddDocumentCtx_Submit(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx, uint32_t options) {
  aCtx->options = options;
  if (aCtx->options & DOCUMENT_ADD_PARTIAL) {
    if (handlePartialUpdate(aCtx, sctx)) {
      return;
    }
  }

  // The preprocessors modify (!) the document strings, so ownership is always required
  Document_MakeStringsOwner(aCtx->doc);

  if (!AddDocumentCtx_IsBlockable(aCtx)) {
    aCtx->client.sctx = sctx;
  } else {
    aCtx->client.bc = RedisModule_BlockClient(sctx->redisCtx, replyCallback, NULL, NULL, 0);
  }

  RS_LOG_ASSERT(aCtx->client.bc, "No blocked client");

  // Total length of the string values that will be tokenized (TEXT / TAG)
  size_t totalSize = 0;
  const DocumentField *fields = aCtx->doc->fields;
  for (size_t idx = 0; idx < aCtx->doc->numFields; ++idx) {
    const DocumentField *field = &fields[idx];
    int tokenized = (field->indexAs & (INDEXFLD_T_FULLTEXT | INDEXFLD_T_TAG)) != 0;
    int isString = field->unionType == FLD_VAR_T_CSTR || field->unionType == FLD_VAR_T_RMS;
    if (!tokenized || !isString) {
      continue;
    }
    size_t valueLen;
    DocumentField_GetValueCStr(field, &valueLen);
    totalSize += valueLen;
  }

  int offload = totalSize >= SELF_EXEC_THRESHOLD && AddDocumentCtx_IsBlockable(aCtx);
  if (!offload) {
    Document_AddToIndexes(aCtx);
  } else {
    ConcurrentSearch_ThreadPoolRun(threadCallback, aCtx, CONCURRENT_POOL_INDEX);
  }
}
341
/**
 * Release everything the context acquired for its current document and return
 * the context to the global pool.
 *
 * The following are released here:
 *  - preprocessed tag data;
 *  - the document (unless ACTX_F_NOFREEDOC is set);
 *  - the sorting vector and byte offsets;
 *  - the tokenizer and old-metadata references;
 *  - the offsets writer and the error status.
 * The forward index and the fspecs/fdatas/specName buffers stay attached for reuse;
 * freeDocumentContext frees them. The indexer reference is not dropped here
 * (doReplyFinish does that).
 */
void AddDocumentCtx_Free(RSAddDocumentCtx *aCtx) {
  Document *doc = aCtx->doc;

  // Preprocessed tag data can only be reliably released here
  for (size_t idx = 0; idx < doc->numFields; ++idx) {
    int ownsTags = FIELD_IS_VALID(aCtx, idx) && FIELD_IS(aCtx->fspecs + idx, INDEXFLD_T_TAG) &&
                   aCtx->fdatas[idx].tags != NULL;
    if (!ownsTags) {
      continue;
    }
    TagIndex_FreePreprocessedData(aCtx->fdatas[idx].tags);
    aCtx->fdatas[idx].tags = NULL;
  }

  if ((aCtx->stateFlags & ACTX_F_NOFREEDOC) == 0) {
    Document_Free(doc);
  }

  if (aCtx->sv != NULL) {
    SortingVector_Free(aCtx->sv);
    aCtx->sv = NULL;
  }

  if (aCtx->byteOffsets != NULL) {
    RSByteOffsets_Free(aCtx->byteOffsets);
    aCtx->byteOffsets = NULL;
  }

  if (aCtx->tokenizer != NULL) {
    Tokenizer_Release(aCtx->tokenizer);
    aCtx->tokenizer = NULL;
  }

  if (aCtx->oldMd != NULL) {
    DMD_Decref(aCtx->oldMd);
    aCtx->oldMd = NULL;
  }

  ByteOffsetWriter_Cleanup(&aCtx->offsetsWriter);
  QueryError_ClearError(&aCtx->status);

  mempool_release(actxPool_g, aCtx);
}
386
// Signature shared by the per-type field preprocessors listed in preprocessorMap.
// Each one prepares a single document field (into `fdata` and/or the sorting vector)
// and returns 0 on success, non-zero on failure.
#define FIELD_HANDLER(name)                                                                \
  static int name(RSAddDocumentCtx *aCtx, const DocumentField *field, const FieldSpec *fs, \
                  FieldIndexerData *fdata, QueryError *status)

// Signature of the per-type bulk indexers called from IndexerBulkAdd. They write a field's
// preprocessed data into the index structures that are opened lazily and cached in `bulk`.
#define FIELD_BULK_INDEXER(name)                                                            \
  static int name(IndexBulkData *bulk, RSAddDocumentCtx *aCtx, RedisSearchCtx *ctx,        \
                  const DocumentField *field, const FieldSpec *fs, FieldIndexerData *fdata, \
                  QueryError *status)

// Constructor / finalizer signatures for bulk indexing state (not used in this section).
#define FIELD_BULK_CTOR(name) \
  static void name(IndexBulkData *bulk, const FieldSpec *fs, RedisSearchCtx *ctx)
#define FIELD_BULK_FINALIZER(name) static void name(IndexBulkData *bulk, RedisSearchCtx *ctx)

// Preprocessors use the generic field-handler signature.
#define FIELD_PREPROCESSOR FIELD_HANDLER
401
/**
 * Preprocessor for TEXT fields.
 *
 * Only string values (C string or RedisModuleString) are accepted; any other value
 * type returns -1. A sortable field's raw value is stored in the sorting vector.
 *
 * An indexable field's value is tokenized, honouring the field's NOSTEM / PHONETIC
 * options, and every token is fed to the forward index. When byte offsets are being
 * recorded, a byte-offset field entry is opened for the field.
 *
 * After tokenizing, aCtx->totalTokens is set to the tokenizer's last token position.
 */
FIELD_PREPROCESSOR(fulltextPreprocessor) {
  if (field->unionType != FLD_VAR_T_CSTR && field->unionType != FLD_VAR_T_RMS) {
    return -1;
  }

  size_t fl;
  const char *c = DocumentField_GetValueCStr(field, &fl);

  if (FieldSpec_IsSortable(fs)) {
    RSSortingVector_Put(aCtx->sv, fs->sortIdx, (void *)c, RS_SORTABLE_STR, fs->options & FieldSpec_UNF);
  }

  if (FieldSpec_IsIndexable(fs)) {
    ForwardIndexTokenizerCtx tokCtx;
    VarintVectorWriter *curOffsetWriter = NULL;
    RSByteOffsetField *curOffsetField = NULL;
    if (aCtx->byteOffsets) {
      // The new byte-offset field starts right after the tokens seen so far
      curOffsetField = RSByteOffsets_AddField(aCtx->byteOffsets, fs->ftId, aCtx->totalTokens + 1);
      curOffsetWriter = &aCtx->offsetsWriter;
    }

    ForwardIndexTokenizerCtx_Init(&tokCtx, aCtx->fwIdx, c, curOffsetWriter, fs->ftId, fs->ftWeight);

    uint32_t options = TOKENIZE_DEFAULT_OPTIONS;
    if (FieldSpec_IsNoStem(fs)) {
      options |= TOKENIZE_NOSTEM;
    }
    if (FieldSpec_IsPhonetics(fs)) {
      options |= TOKENIZE_PHONETICS;
    }
    aCtx->tokenizer->Start(aCtx->tokenizer, (char *)c, fl, options);

    // Next() returns 0 once the input is exhausted
    Token tok = {0};
    uint32_t newTokPos;
    while (0 != (newTokPos = aCtx->tokenizer->Next(aCtx->tokenizer, &tok))) {
      forwardIndexTokenFunc(&tokCtx, &tok);
    }
    uint32_t lastTokPos = aCtx->tokenizer->ctx.lastOffset;

    if (curOffsetField) {
      curOffsetField->lastTokPos = lastTokPos;
    }
    aCtx->totalTokens = lastTokPos;
    Token_Destroy(&tok);
  }
  return 0;
}
449
FIELD_PREPROCESSOR(numericPreprocessor)450 FIELD_PREPROCESSOR(numericPreprocessor) {
451 char *end;
452 switch (field->unionType) {
453 case FLD_VAR_T_RMS:
454 if (RedisModule_StringToDouble(field->text, &fdata->numeric) == REDISMODULE_ERR) {
455 QueryError_SetCode(status, QUERY_ENOTNUMERIC);
456 return -1;
457 }
458 break;
459 case FLD_VAR_T_CSTR:
460 fdata->numeric = strtod(field->strval, &end);
461 if (*end) {
462 QueryError_SetCode(status, QUERY_ENOTNUMERIC);
463 return -1;
464 }
465 break;
466 case FLD_VAR_T_NUM:
467 fdata->numeric = field->numval;
468 break;
469 default:
470 return -1;
471 }
472
473 // If this is a sortable numeric value - copy the value to the sorting vector
474 if (FieldSpec_IsSortable(fs)) {
475 RSSortingVector_Put(aCtx->sv, fs->sortIdx, &fdata->numeric, RS_SORTABLE_NUM, 0);
476 }
477 return 0;
478 }
479
FIELD_BULK_INDEXER(numericIndexer)480 FIELD_BULK_INDEXER(numericIndexer) {
481 NumericRangeTree *rt = bulk->indexDatas[IXFLDPOS_NUMERIC];
482 if (!rt) {
483 RedisModuleString *keyName = IndexSpec_GetFormattedKey(ctx->spec, fs, INDEXFLD_T_NUMERIC);
484 rt = bulk->indexDatas[IXFLDPOS_NUMERIC] =
485 OpenNumericIndex(ctx, keyName, &bulk->indexKeys[IXFLDPOS_NUMERIC]);
486 if (!rt) {
487 QueryError_SetError(status, QUERY_EGENERIC, "Could not open numeric index for indexing");
488 return -1;
489 }
490 }
491 NRN_AddRv rv = NumericRangeTree_Add(rt, aCtx->doc->docId, fdata->numeric);
492 ctx->spec->stats.invertedSize += rv.sz; // TODO: exact amount
493 ctx->spec->stats.numRecords += rv.numRecords;
494 return 0;
495 }
496
FIELD_PREPROCESSOR(geoPreprocessor)497 FIELD_PREPROCESSOR(geoPreprocessor) {
498 size_t len;
499 const char *str = NULL;
500 double lat = 0, lon = 0;
501
502 switch (field->unionType) {
503 case FLD_VAR_T_GEO:
504 lon = field->lon;
505 lat = field->lat;
506 break;
507 case FLD_VAR_T_CSTR:
508 case FLD_VAR_T_RMS:
509 str = DocumentField_GetValueCStr(field, &len);
510 if (parseGeo(str, len, &lon, &lat) != REDISMODULE_OK) {
511 return REDISMODULE_ERR;
512 }
513 break;
514 case FLD_VAR_T_ARRAY:
515 case FLD_VAR_T_NUM:
516 RS_LOG_ASSERT(0, "Oops");
517 }
518 double geohash = calcGeoHash(lon, lat);
519 if (geohash == INVALID_GEOHASH) {
520 return REDISMODULE_ERR;
521 }
522 fdata->numeric = geohash;
523
524 if (FieldSpec_IsSortable(fs)) {
525 if (str) {
526 RSSortingVector_Put(aCtx->sv, fs->sortIdx, str, RS_SORTABLE_STR, fs->options & FieldSpec_UNF);
527 } else {
528 RSSortingVector_Put(aCtx->sv, fs->sortIdx, &fdata->numeric, RS_SORTABLE_NUM, 0);
529 }
530 }
531
532 return 0;
533 }
534
FIELD_PREPROCESSOR(tagPreprocessor)535 FIELD_PREPROCESSOR(tagPreprocessor) {
536 fdata->tags = TagIndex_Preprocess(fs->tagSep, fs->tagFlags, field);
537
538 if (fdata->tags == NULL) {
539 return 0;
540 }
541 if (FieldSpec_IsSortable(fs) && isSpecHash(aCtx->spec)) {
542 size_t fl;
543 const char *str = DocumentField_GetValueCStr(field, &fl);
544 RSSortingVector_Put(aCtx->sv, fs->sortIdx, str, RS_SORTABLE_STR, fs->options & FieldSpec_UNF);
545 }
546 return 0;
547 }
548
FIELD_BULK_INDEXER(tagIndexer)549 FIELD_BULK_INDEXER(tagIndexer) {
550 TagIndex *tidx = bulk->indexDatas[IXFLDPOS_TAG];
551 if (!tidx) {
552 RedisModuleString *kname = IndexSpec_GetFormattedKey(ctx->spec, fs, INDEXFLD_T_TAG);
553 tidx = bulk->indexDatas[IXFLDPOS_TAG] =
554 TagIndex_Open(ctx, kname, 1, &bulk->indexKeys[IXFLDPOS_TAG]);
555 if (!tidx) {
556 QueryError_SetError(status, QUERY_EGENERIC, "Could not open tag index for indexing");
557 return -1;
558 }
559 }
560
561 ctx->spec->stats.invertedSize +=
562 TagIndex_Index(tidx, (const char **)fdata->tags, array_len(fdata->tags), aCtx->doc->docId);
563 ctx->spec->stats.numRecords++;
564 return 0;
565 }
566
567 static PreprocessorFunc preprocessorMap[] = {
568 // nl break
569 [IXFLDPOS_FULLTEXT] = fulltextPreprocessor,
570 [IXFLDPOS_NUMERIC] = numericPreprocessor,
571 [IXFLDPOS_GEO] = geoPreprocessor,
572 [IXFLDPOS_TAG] = tagPreprocessor};
573
/**
 * Write one preprocessed field into every non-text index type it is indexed as.
 * TAG goes to tagIndexer; NUMERIC and GEO go to numericIndexer. FULLTEXT needs
 * nothing here, because its tokens were already fed to the forward index by
 * fulltextPreprocessor. Stops at the first failure.
 *
 * @return 0 on success, non-zero on failure (with `status` set).
 */
int IndexerBulkAdd(IndexBulkData *bulk, RSAddDocumentCtx *cur, RedisSearchCtx *sctx,
                   const DocumentField *field, const FieldSpec *fs, FieldIndexerData *fdata,
                   QueryError *status) {
  int rc = 0;
  for (size_t pos = 0; rc == 0 && pos < INDEXFLD_NUM_TYPES; ++pos) {
    if (!(field->indexAs & INDEXTYPE_FROM_POS(pos))) {
      continue;
    }

    if (pos == IXFLDPOS_TAG) {
      rc = tagIndexer(bulk, cur, sctx, field, fs, fdata, status);
    } else if (pos == IXFLDPOS_NUMERIC || pos == IXFLDPOS_GEO) {
      rc = numericIndexer(bulk, cur, sctx, field, fs, fdata, status);
    } else if (pos == IXFLDPOS_FULLTEXT) {
      // nothing to do for full-text here
    } else {
      rc = -1;
      QueryError_SetError(status, QUERY_EINVAL, "BUG: invalid index type");
    }
  }
  return rc;
}
600
/**
 * Close every index key that was opened while bulk-indexing (one slot per index
 * type in cur->indexKeys). `sctx` is unused.
 */
void IndexerBulkCleanup(IndexBulkData *cur, RedisSearchCtx *sctx) {
  size_t pos = 0;
  while (pos < INDEXFLD_NUM_TYPES) {
    if (cur->indexKeys[pos] != NULL) {
      RedisModule_CloseKey(cur->indexKeys[pos]);
    }
    ++pos;
  }
}
608
/**
 * Run the per-type preprocessors over every field of the context's document, then
 * hand the context to the indexer.
 *
 * For each field, the preprocessor from preprocessorMap runs for every index type
 * set in the field's `indexAs` mask. On the first failure:
 *  - the spec's indexingFailures counter is incremented;
 *  - the document is deleted from the doc table;
 *  - the operation is completed with an error via AddDocumentCtx_Finish.
 * The same cleanup runs if Indexer_Add fails.
 *
 * @return REDISMODULE_OK if Indexer_Add accepted the context, REDISMODULE_ERR
 *         otherwise (the context has then already been finished).
 */
int Document_AddToIndexes(RSAddDocumentCtx *aCtx) {
  Document *doc = aCtx->doc;
  int ourRv = REDISMODULE_OK;

  for (size_t i = 0; i < doc->numFields; i++) {
    const FieldSpec *fs = aCtx->fspecs + i;
    const DocumentField *ff = doc->fields + i;
    FieldIndexerData *fdata = aCtx->fdatas + i;

    for (size_t ii = 0; ii < INDEXFLD_NUM_TYPES; ++ii) {
      // Only preprocess for the index types this field is indexed as
      if (!FIELD_CHKIDX(ff->indexAs, INDEXTYPE_FROM_POS(ii))) {
        continue;
      }

      PreprocessorFunc pp = preprocessorMap[ii];
      if (pp(aCtx, &doc->fields[i], fs, fdata, &aCtx->status) != 0) {
        if (!AddDocumentCtx_IsBlockable(aCtx)) {
          ++aCtx->spec->stats.indexingFailures;
        } else {
          // Blockable adds may run on the index thread pool (see AddDocumentCtx_Submit),
          // so take the Redis lock and re-resolve the spec by name; the uniqueId check
          // presumably guards against the spec having been dropped/recreated meanwhile.
          // NOTE(review): specName is only copied when the spec has Index_Async
          // (NewAddDocumentCtx) — confirm blockable implies async here.
          RedisModule_ThreadSafeContextLock(RSDummyContext);
          IndexSpec *spec = IndexSpec_Load(RSDummyContext, aCtx->specName, 0);
          if (spec && aCtx->specId == spec->uniqueId) {
            ++spec->stats.indexingFailures;
          }
          RedisModule_ThreadSafeContextUnlock(RSDummyContext);
        }
        ourRv = REDISMODULE_ERR;
        goto cleanup;
      }
    }
  }

  if (Indexer_Add(aCtx->indexer, aCtx) != 0) {
    ourRv = REDISMODULE_ERR;
    goto cleanup;
  }

cleanup:
  if (ourRv != REDISMODULE_OK) {
    // if a document did not load properly, it is deleted
    // to prevent mismatch of index and hash
    DocTable_DeleteR(&aCtx->spec->docs, doc->docKey);

    QueryError_SetCode(&aCtx->status, QUERY_EGENERIC);
    AddDocumentCtx_Finish(aCtx);
  }
  return ourRv;
}
657
/* Evaluate an IF expression (e.g. IF "@foo == 'bar'") against a document, by getting the properties
 * from the sorting table or from the hash representation of the document.
 *
 * NOTE: This is disconnected from the document indexing flow; it loads the document and discards
 * it internally.
 *
 * On success, the truth value of the expression is stored in *result.
 *
 * Returns REDISMODULE_ERR on failure (unknown document, parse/lookup/load/evaluation error),
 * REDISMODULE_OK otherwise. */
int Document_EvalExpression(RedisSearchCtx *sctx, RedisModuleString *key, const char *expr,
                            int *result, QueryError *status) {

  int rc = REDISMODULE_ERR;
  const RSDocumentMetadata *dmd = DocTable_GetByKeyR(&sctx->spec->docs, key);
  if (!dmd) {
    // We don't know the document...
    QueryError_SetError(status, QUERY_ENODOC, "");
    return REDISMODULE_ERR;
  }

  // Try to parse the expression first, fail if we can't
  RSExpr *e = ExprAST_Parse(expr, strlen(expr), status);
  if (!e) {
    return REDISMODULE_ERR;
  }

  if (QueryError_HasError(status)) {
    // NOTE(review): this path frees with RSExpr_Free while `done:` uses ExprAST_Free;
    // confirm the two are interchangeable for parser output.
    RSExpr_Free(e);
    return REDISMODULE_ERR;
  }

  // Collect the properties referenced by the expression into a lookup built from the spec cache
  RLookup lookup_s;
  RLookupRow row = {0};
  IndexSpecCache *spcache = IndexSpec_GetSpecCache(sctx->spec);
  RLookup_Init(&lookup_s, spcache);
  if (ExprAST_GetLookupKeys(e, &lookup_s, status) == EXPR_EVAL_ERR) {
    goto done;
  }

  // Load the referenced properties of the document into `row`
  RLookupLoadOptions loadopts = {.sctx = sctx, .dmd = dmd, .status = status};
  if (RLookup_LoadDocument(&lookup_s, &row, &loadopts) != REDISMODULE_OK) {
    // Couldn't load the document
    goto done;
  }

  ExprEval evaluator = {.err = status, .lookup = &lookup_s, .res = NULL, .srcrow = &row, .root = e};
  RSValue rv = RSVALUE_STATIC;
  if (ExprEval_Eval(&evaluator, &rv) != EXPR_EVAL_OK) {
    // Evaluation failed
    goto done;
  }

  *result = RSValue_BoolTest(&rv);
  RSValue_Clear(&rv);
  rc = REDISMODULE_OK;

  // Clean up:
done:
  if (e) {
    ExprAST_Free(e);
  }
  RLookupRow_Cleanup(&row);
  RLookup_Cleanup(&lookup_s);
  return rc;
}
721
/**
 * Apply a PARTIAL update that touches no indexable field. The existing document's
 * score, payload (when given) and sortable values are updated in place, without
 * reindexing.
 *
 * Always completes the operation: on success or error (recorded in aCtx->status),
 * the completion callback runs, the indexer reference taken in NewAddDocumentCtx is
 * dropped, and the context is released.
 */
static void AddDocumentCtx_UpdateNoIndex(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx) {
// Record an error on the context and jump to the common completion path.
// No trailing semicolon, so `BAIL(x);` is a single statement.
#define BAIL(s)                                            \
  do {                                                     \
    QueryError_SetError(&aCtx->status, QUERY_EGENERIC, s); \
    goto done;                                             \
  } while (0)

  Document *doc = aCtx->doc;
  t_docId docId = DocTable_GetIdR(&sctx->spec->docs, doc->docKey);
  if (docId == 0) {
    BAIL("Couldn't load old document");
  }
  RSDocumentMetadata *md = DocTable_Get(&sctx->spec->docs, docId);
  if (!md) {
    BAIL("Couldn't load document metadata");
  }

  // Update the score
  md->score = doc->score;
  // Set the payload if needed
  if (doc->payload) {
    DocTable_SetPayload(&sctx->spec->docs, md, doc->payload, doc->payloadSize);
  }

  if (aCtx->stateFlags & ACTX_F_SORTABLES) {
    FieldSpecDedupeArray dedupes = {0};
    // Update sortables if needed
    for (size_t i = 0; i < doc->numFields; i++) {
      DocumentField *f = &doc->fields[i];
      const FieldSpec *fs = IndexSpec_GetField(sctx->spec, f->name, strlen(f->name));
      if (fs == NULL || !FieldSpec_IsSortable(fs)) {
        continue;
      }

      if (dedupes[fs->index]) {
        BAIL(DUP_FIELD_ERRSTR);
      }

      dedupes[fs->index] = 1;

      int idx = IndexSpec_GetFieldSortingIndex(sctx->spec, f->name, strlen(f->name));
      if (idx < 0) continue;

      if (!md->sortVector) {
        md->sortVector = NewSortingVector(sctx->spec->sortables->len);
      }

      RS_LOG_ASSERT((fs->options & FieldSpec_Dynamic) == 0, "Dynamic field cannot use PARTIAL");

      switch (fs->types) {
        case INDEXFLD_T_FULLTEXT:
        case INDEXFLD_T_TAG:
        case INDEXFLD_T_GEO:
          RSSortingVector_Put(md->sortVector, idx, (void *)RedisModule_StringPtrLen(f->text, NULL),
                              RS_SORTABLE_STR, fs->options & FieldSpec_UNF);
          break;
        case INDEXFLD_T_NUMERIC: {
          double numval;
          if (RedisModule_StringToDouble(f->text, &numval) == REDISMODULE_ERR) {
            BAIL("Could not parse numeric index value");
          }
          RSSortingVector_Put(md->sortVector, idx, &numval, RS_SORTABLE_NUM, 0);
          break;
        }
        default:
          BAIL("Unsupported sortable type");
          break;
      }
    }
  }

done:
  // Runs donecb (if any), drops the context's indexer reference and frees the context
  doReplyFinish(aCtx, sctx->redisCtx);
#undef BAIL
}
799
Document_GetField(Document * d,const char * fieldName)800 DocumentField *Document_GetField(Document *d, const char *fieldName) {
801 if (!d || !fieldName) return NULL;
802
803 for (int i = 0; i < d->numFields; i++) {
804 if (!strcasecmp(d->fields[i].name, fieldName)) {
805 return &d->fields[i];
806 }
807 }
808 return NULL;
809 }
810
DocumentField_GetValueCStr(const DocumentField * df,size_t * len)811 const char *DocumentField_GetValueCStr(const DocumentField *df, size_t *len) {
812 *len = 0;
813 switch (df->unionType) {
814 case FLD_VAR_T_RMS:
815 return RedisModule_StringPtrLen(df->text, len);
816 case FLD_VAR_T_CSTR:
817 *len = df->strlen;
818 return df->strval;
819 case FLD_VAR_T_NUM:
820 case FLD_VAR_T_GEO:
821 case FLD_VAR_T_ARRAY:
822 RS_LOG_ASSERT(0, "invalid types");
823 }
824 return NULL;
825 }
826