1 #include "doc_table.h"
2 #include <sys/param.h>
3 #include <string.h>
4 #include <stdio.h>
5 #include "redismodule.h"
6 #include "util/fnv.h"
7 #include "dep/triemap/triemap.h"
8 #include "sortable.h"
9 #include "rmalloc.h"
10 #include "spec.h"
11 #include "config.h"
12 #include "rmutil/rm_assert.h"
13 
14 /* Creates a new DocTable with a given capacity */
NewDocTable(size_t cap,size_t max_size)15 DocTable NewDocTable(size_t cap, size_t max_size) {
16   DocTable ret = {
17       .size = 1,
18       .cap = cap,
19       .maxDocId = 0,
20       .memsize = 0,
21       .sortablesSize = 0,
22       .maxSize = max_size,
23       .dim = NewDocIdMap(),
24   };
25   ret.buckets = rm_calloc(cap, sizeof(*ret.buckets));
26   return ret;
27 }
28 
/* Map a document id to the index of its bucket chain: ids below maxSize map
 * to themselves, anything larger wraps around modulo maxSize. */
static inline uint32_t DocTable_GetBucket(const DocTable *t, t_docId docId) {
  if (docId < t->maxSize) {
    return docId;
  }
  return docId % t->maxSize;
}
32 
/* Return non-zero when docId lies in the range of ids handed out so far,
 * i.e. 1..maxDocId inclusive. Id 0 is never valid. */
static inline int DocTable_ValidateDocId(const DocTable *t, t_docId docId) {
  if (docId == 0) {
    return 0;
  }
  return docId <= t->maxDocId;
}
36 
DocTable_Get(const DocTable * t,t_docId docId)37 RSDocumentMetadata *DocTable_Get(const DocTable *t, t_docId docId) {
38   if (!DocTable_ValidateDocId(t, docId)) {
39     return NULL;
40   }
41   uint32_t bucketIndex = DocTable_GetBucket(t, docId);
42   if (bucketIndex >= t->cap) {
43     return NULL;
44   }
45   DMDChain *dmdChain = &t->buckets[bucketIndex];
46   DLLIST2_FOREACH(it, &dmdChain->lroot) {
47     RSDocumentMetadata *dmd = DLLIST2_ITEM(it, RSDocumentMetadata, llnode);
48     if (dmd->id == docId) {
49       return dmd;
50     }
51   }
52   return NULL;
53 }
54 
/* Return 1 if a document with the given id is linked in the table and is not
 * flagged Document_Deleted, 0 otherwise (including ids that are out of range
 * or whose bucket lies beyond the allocated capacity). */
int DocTable_Exists(const DocTable *t, t_docId docId) {
  // Same range check DocTable_Get uses, instead of a hand-rolled copy.
  if (!DocTable_ValidateDocId(t, docId)) {
    return 0;
  }
  const uint32_t ix = DocTable_GetBucket(t, docId);
  if (ix >= t->cap) {
    return 0;
  }
  // The address of an in-range array element is never NULL, so no NULL test
  // on the chain pointer is needed.
  const DMDChain *chain = &t->buckets[ix];
  DLLIST2_FOREACH(it, &chain->lroot) {
    const RSDocumentMetadata *md = DLLIST2_ITEM(it, RSDocumentMetadata, llnode);
    if (md->id == docId && !(md->flags & Document_Deleted)) {
      return 1;
    }
  }
  return 0;
}
75 
DocTable_GetByKeyR(const DocTable * t,RedisModuleString * s)76 RSDocumentMetadata *DocTable_GetByKeyR(const DocTable *t, RedisModuleString *s) {
77   const char *kstr;
78   size_t klen;
79   kstr = RedisModule_StringPtrLen(s, &klen);
80   t_docId id = DocTable_GetId(t, kstr, klen);
81   return DocTable_Get(t, id);
82 }
83 
/* Link dmd into the bucket chain that docId maps to, taking a reference on
 * it. The bucket array is grown first when the target bucket lies beyond the
 * current capacity and the capacity has not yet reached maxSize. */
static inline void DocTable_Set(DocTable *t, t_docId docId, RSDocumentMetadata *dmd) {
  const uint32_t slot = DocTable_GetBucket(t, docId);

  if (slot >= t->cap && t->cap < t->maxSize) {
    /* Grow the bucket array. Growth stops once maxSize is reached; from then
     * on new entries are appended to the already existing chains. */
    const size_t prevCap = t->cap;

    // Grow by half of the current capacity, by at most 1M buckets, plus one.
    t->cap += 1 + (t->cap ? MIN(t->cap / 2, 1024 * 1024) : 1);
    // Never exceed maxSize ...
    t->cap = MIN(t->cap, t->maxSize);
    // ... but buckets[slot] has to be valid, so cap must be greater than slot.
    t->cap = MAX(t->cap, slot + 1);

    t->buckets = rm_realloc(t->buckets, t->cap * sizeof(DMDChain));

    // Zero the newly added tail so every new chain starts out empty.
    memset(t->buckets + prevCap, 0, (t->cap - prevCap) * sizeof(DMDChain));
  }

  DMD_Incref(dmd);

  // Append the entry at the end of its chain.
  dllist2_append(&t->buckets[slot].lroot, &dmd->llnode);
}
109 
110 /** Get the docId of a key if it exists in the table, or 0 if it doesnt */
DocTable_GetId(const DocTable * dt,const char * s,size_t n)111 t_docId DocTable_GetId(const DocTable *dt, const char *s, size_t n) {
112   return DocIdMap_Get(&dt->dim, s, n);
113 }
114 
115 /* Set the payload for a document. Returns 1 if we set the payload, 0 if we couldn't find the
116  * document */
DocTable_SetPayload(DocTable * t,RSDocumentMetadata * dmd,const char * data,size_t len)117 int DocTable_SetPayload(DocTable *t, RSDocumentMetadata *dmd, const char *data, size_t len) {
118   /* Get the metadata */
119   if (!dmd || !data) {
120     return 0;
121   }
122 
123   /* If we already have metadata - clean up the old data */
124   if (dmd->payload) {
125     /* Free the old payload */
126     if (dmd->payload->data) {
127       rm_free((void *)dmd->payload->data);
128     }
129     t->memsize -= dmd->payload->len;
130   } else {
131     dmd->payload = rm_malloc(sizeof(RSPayload));
132   }
133   /* Copy it... */
134   dmd->payload->data = rm_calloc(1, len + 1);
135   dmd->payload->len = len;
136   memcpy(dmd->payload->data, data, len);
137 
138   dmd->flags |= Document_HasPayload;
139   t->memsize += len;
140   return 1;
141 }
142 
143 /* Set the sorting vector for a document. If the vector is NULL we mark the doc as not having a
144  * vector. Returns 1 on success, 0 if the document does not exist. No further validation is done
145  */
DocTable_SetSortingVector(DocTable * t,RSDocumentMetadata * dmd,RSSortingVector * v)146 int DocTable_SetSortingVector(DocTable *t, RSDocumentMetadata *dmd, RSSortingVector *v) {
147   if (!dmd) {
148     return 0;
149   }
150 
151   // LCOV_EXCL_START
152   /* Null vector means remove the current vector if it exists */
153   /*if (!v) {
154     if (dmd->sortVector) {
155       SortingVector_Free(dmd->sortVector);
156     }
157     dmd->sortVector = NULL;
158     dmd->flags &= ~Document_HasSortVector;
159     return 1;
160   }*/
161   // LCOV_EXCL_STOP
162   RS_LOG_ASSERT(v, "Sorting vector does not exist");  // tested in doAssignIds()
163 
164   /* Set th new vector and the flags accordingly */
165   dmd->sortVector = v;
166   dmd->flags |= Document_HasSortVector;
167   t->sortablesSize += RSSortingVector_GetMemorySize(v);
168 
169   return 1;
170 }
171 
/* Store the byte-offsets pointer v on the document and flag the document
 * with Document_HasOffsetVector. v is stored as-is, without validation.
 *
 * Returns 1 on success, 0 if dmd is NULL. */
int DocTable_SetByteOffsets(DocTable *t, RSDocumentMetadata *dmd, RSByteOffsets *v) {
  if (dmd == NULL) {
    return 0;
  }

  dmd->flags |= Document_HasOffsetVector;
  dmd->byteOffsets = v;
  return 1;
}
181 
182 /* Put a new document into the table, assign it an incremental id and store the metadata in the
183  * table.
184  *
185  * Return 0 if the document is already in the index  */
DocTable_Put(DocTable * t,const char * s,size_t n,double score,RSDocumentFlags flags,const char * payload,size_t payloadSize,DocumentType type)186 RSDocumentMetadata *DocTable_Put(DocTable *t, const char *s, size_t n, double score, RSDocumentFlags flags,
187                                  const char *payload, size_t payloadSize, DocumentType type) {
188 
189   t_docId xid = DocIdMap_Get(&t->dim, s, n);
190   // if the document is already in the index, return 0
191   if (xid) {
192     return DocTable_Get(t, xid);
193   }
194   t_docId docId = ++t->maxDocId;
195 
196   /* Copy the payload since it's probably an input string not retained */
197   RSPayload *dpl = NULL;
198   if (payload && payloadSize) {
199 
200     dpl = rm_malloc(sizeof(RSPayload));
201     dpl->data = rm_calloc(1, payloadSize + 1);
202     memcpy(dpl->data, payload, payloadSize);
203     dpl->len = payloadSize;
204     flags |= Document_HasPayload;
205     t->memsize += payloadSize + sizeof(RSPayload);
206   }
207 
208   sds keyPtr = sdsnewlen(s, n);
209 
210   RSDocumentMetadata *dmd = rm_calloc(1, sizeof(*dmd));
211   dmd->keyPtr = keyPtr;
212   dmd->score = score;
213   dmd->flags = flags;
214   dmd->payload = dpl;
215   dmd->maxFreq = 1;
216   dmd->id = docId;
217   dmd->sortVector = NULL;
218   dmd->type = type;
219 
220   DocTable_Set(t, docId, dmd);
221   ++t->size;
222   t->memsize += sizeof(RSDocumentMetadata) + sdsAllocSize(keyPtr);
223   DocIdMap_Put(&t->dim, s, n, docId);
224   return dmd;
225 }
226 
DocTable_GetPayload(DocTable * t,t_docId docId)227 RSPayload *DocTable_GetPayload(DocTable *t, t_docId docId) {
228   RSDocumentMetadata *dmd = DocTable_Get(t, docId);
229   return dmd ? dmd->payload : NULL;
230 }
231 
232 /* Get the "real" external key for an incremental id. Returns NULL if docId is not in the table.
233  */
DocTable_GetKey(DocTable * t,t_docId docId,size_t * lenp)234 const char *DocTable_GetKey(DocTable *t, t_docId docId, size_t *lenp) {
235   size_t len_s = 0;
236   if (!lenp) {
237     lenp = &len_s;
238   }
239 
240   RSDocumentMetadata *dmd = DocTable_Get(t, docId);
241   if (!dmd) {
242     *lenp = 0;
243     return NULL;
244   }
245   *lenp = sdslen(dmd->keyPtr);
246   return dmd->keyPtr;
247 }
248 
249 /* Get the score for a document from the table. Returns 0 if docId is not in the table. */
DocTable_GetScore(DocTable * t,t_docId docId)250 inline float DocTable_GetScore(DocTable *t, t_docId docId) {
251   RSDocumentMetadata *dmd = DocTable_Get(t, docId);
252   return dmd ? dmd->score : 0;
253 }
254 
/* Release a metadata record together with everything it holds: the payload
 * (buffer and header), the sorting vector, the byte offsets and the key
 * string. The matching flags are cleared as each part is released, and
 * finally the record itself is freed. */
void DMD_Free(RSDocumentMetadata *md) {
  if (md->payload != NULL) {
    rm_free(md->payload->data);
    rm_free(md->payload);
    md->payload = NULL;
    md->flags &= ~Document_HasPayload;
  }

  if (md->sortVector != NULL) {
    SortingVector_Free(md->sortVector);
    md->flags &= ~Document_HasSortVector;
    md->sortVector = NULL;
  }

  if (md->byteOffsets != NULL) {
    RSByteOffsets_Free(md->byteOffsets);
    md->flags &= ~Document_HasOffsetVector;
    md->byteOffsets = NULL;
  }

  sdsfree(md->keyPtr);
  rm_free(md);
}
275 
/* Release everything owned by the table: every metadata record still linked
 * in a bucket chain (via DMD_Free), the bucket array and the key map. The
 * DocTable structure itself is not freed. */
void DocTable_Free(DocTable *t) {
  // size_t index: t->cap is unsigned, so an int counter would be compared
  // after an implicit sign conversion and could not cover large capacities.
  for (size_t i = 0; i < t->cap; ++i) {
    DMDChain *chain = &t->buckets[i];
    if (DLLIST2_IS_EMPTY(&chain->lroot)) {
      continue;
    }
    DLLIST2_node *nn = chain->lroot.head;
    while (nn) {
      RSDocumentMetadata *md = DLLIST2_ITEM(nn, RSDocumentMetadata, llnode);
      // Step to the next node first: DMD_Free releases the memory that
      // embeds the current node.
      nn = nn->next;
      DMD_Free(md);
    }
  }
  rm_free(t->buckets);
  DocIdMap_Free(&t->dim);
}
292 
/* Unlink md from the bucket chain that its id maps to. The record itself is
 * not freed here. */
static void DocTable_DmdUnchain(DocTable *t, RSDocumentMetadata *md) {
  const uint32_t slot = DocTable_GetBucket(t, md->id);
  dllist2_delete(&t->buckets[slot].lroot, &md->llnode);
}
298 
/* Remove the document stored under key (s, n) from the table and drop the
 * reference returned by DocTable_Pop. Returns 1 if a document was removed,
 * 0 if DocTable_Pop found nothing to remove. */
int DocTable_Delete(DocTable *t, const char *s, size_t n) {
  RSDocumentMetadata *popped = DocTable_Pop(t, s, n);
  if (popped == NULL) {
    return 0;
  }
  DMD_Decref(popped);
  return 1;
}
307 
DocTable_Pop(DocTable * t,const char * s,size_t n)308 RSDocumentMetadata *DocTable_Pop(DocTable *t, const char *s, size_t n) {
309   t_docId docId = DocIdMap_Get(&t->dim, s, n);
310 
311   if (docId && docId <= t->maxDocId) {
312 
313     RSDocumentMetadata *md = DocTable_Get(t, docId);
314     if (!md) {
315       return NULL;
316     }
317 
318     md->flags |= Document_Deleted;
319 
320     t->memsize -= sizeof(RSDocumentMetadata) + sdsAllocSize(md->keyPtr);
321     if (md->payload) {
322       t->memsize -= md->payload->len + sizeof(RSPayload);
323     }
324     if (md->sortVector) {
325       t->sortablesSize -= RSSortingVector_GetMemorySize(md->sortVector);
326     }
327 
328     DocTable_DmdUnchain(t, md);
329     DocIdMap_Delete(&t->dim, s, n);
330     --t->size;
331 
332     return md;
333   }
334   return NULL;
335 }
336 
/* Rename a document: move it from key (from_str, from_len) to key
 * (to_str, to_len) while keeping its id and metadata record.
 *
 * Returns REDISMODULE_OK on success. Returns REDISMODULE_ERR, leaving the
 * table untouched, if the source key is unknown or no metadata record is
 * found for its id. */
int DocTable_Replace(DocTable *t, const char *from_str, size_t from_len, const char *to_str,
                     size_t to_len) {
  t_docId id = DocIdMap_Get(&t->dim, from_str, from_len);
  if (id == 0) {
    return REDISMODULE_ERR;
  }

  // Resolve the record before touching the key map: DocTable_Get can return
  // NULL, and bailing out later would leave the map already modified.
  RSDocumentMetadata *dmd = DocTable_Get(t, id);
  if (!dmd) {
    return REDISMODULE_ERR;
  }

  // Build the new key while the old one is still alive, in case to_str points
  // into the record's current key.
  sds newKey = sdsnewlen(to_str, to_len);

  DocIdMap_Delete(&t->dim, from_str, from_len);
  DocIdMap_Put(&t->dim, to_str, to_len, id);

  // Keep memsize in step with the key swap: DocTable_Put added the old key's
  // allocation size and DocTable_Pop will subtract the size of whatever key
  // is stored at that point.
  t->memsize -= sdsAllocSize(dmd->keyPtr);
  sdsfree(dmd->keyPtr);
  dmd->keyPtr = newKey;
  t->memsize += sdsAllocSize(newKey);

  return REDISMODULE_OK;
}
350 
/* Serialize the table to RDB.
 *
 * Writes t->size first, then, for every record linked in any bucket chain
 * (in bucket order): key, flags, maxFreq, len and score; the payload when the
 * record is flagged Document_HasPayload; and the serialized byte offsets when
 * it is flagged Document_HasOffsetVector. Sorting vectors are not written.
 *
 * Asserts at the end that exactly t->size - 1 records were written. */
void DocTable_RdbSave(DocTable *t, RedisModuleIO *rdb) {
  RedisModule_SaveUnsigned(rdb, t->size);

  uint32_t elements_written = 0;
  for (uint32_t bucket = 0; bucket < t->cap; ++bucket) {
    DMDChain *chain = &t->buckets[bucket];
    if (!DLLIST2_IS_EMPTY(&chain->lroot)) {
      DLLIST2_FOREACH(node, &chain->lroot) {
        const RSDocumentMetadata *dmd = DLLIST2_ITEM(node, RSDocumentMetadata, llnode);

        RedisModule_SaveStringBuffer(rdb, dmd->keyPtr, sdslen(dmd->keyPtr));
        RedisModule_SaveUnsigned(rdb, dmd->flags);
        RedisModule_SaveUnsigned(rdb, dmd->maxFreq);
        RedisModule_SaveUnsigned(rdb, dmd->len);
        RedisModule_SaveFloat(rdb, dmd->score);

        if (dmd->flags & Document_HasPayload) {
          if (dmd->payload == NULL) {
            // Flag set but nothing stored: write a lone NUL byte.
            RedisModule_SaveStringBuffer(rdb, "", 1);
          } else {
            // One extra byte is written so the terminating NUL is included.
            RedisModule_SaveStringBuffer(rdb, dmd->payload->data, dmd->payload->len + 1);
          }
        }

        if (dmd->flags & Document_HasOffsetVector) {
          // Serialize into a scratch buffer, then store that buffer as a blob.
          Buffer scratch;
          Buffer_Init(&scratch, 16);
          RSByteOffsets_Serialize(dmd->byteOffsets, &scratch);
          RedisModule_SaveStringBuffer(rdb, scratch.data, scratch.offset);
          Buffer_Free(&scratch);
        }

        ++elements_written;
      }
    }
  }
  RS_LOG_ASSERT((elements_written + 1 == t->size), "Wrong number of written elements");
}
392 
/* Load a table written in the legacy RDB layout into t.
 *
 * Header: size, maxDocId and, from INDEX_MIN_COMPACTED_DOCTABLE_VERSION on,
 * maxSize (older encodings derive it from the global configuration and
 * maxDocId). Then size - 1 records follow, each made of: key, id (compacted
 * encodings only; otherwise the record's position is its id), flags, maxFreq
 * (encver > 1), len (from INDEX_MIN_DOCLEN_VERSION on), score, and, depending
 * on the flags, a payload, a sorting vector and a byte-offsets blob.
 *
 * Records flagged Document_Deleted are read and discarded; all others are
 * registered in the key map and linked into the table. t->size is reduced by
 * the number of discarded records at the end. */
void DocTable_LegacyRdbLoad(DocTable *t, RedisModuleIO *rdb, int encver) {
  long long deletedElements = 0;
  t->size = RedisModule_LoadUnsigned(rdb);
  t->maxDocId = RedisModule_LoadUnsigned(rdb);
  if (encver >= INDEX_MIN_COMPACTED_DOCTABLE_VERSION) {
    t->maxSize = RedisModule_LoadUnsigned(rdb);
  } else {
    t->maxSize = MIN(RSGlobalConfig.maxDocTableSize, t->maxDocId);
  }

  if (t->maxDocId > t->maxSize) {
    /**
     * If the maximum doc id is greater than the maximum cap size, any bucket
     * index below maxSize may be accessed. Some of those buckets might hold
     * no documents (and so would not be created by the loop below), yet they
     * can still be reached by simple queries (e.g. get, exists). Allocate the
     * full bucket array up front instead of relying on Set/Put to grow it.
     */
    t->cap = t->maxSize;
    rm_free(t->buckets);
    t->buckets = rm_calloc(t->cap, sizeof(*t->buckets));
  }

  for (size_t i = 1; i < t->size; i++) {
    size_t len = 0;

    RSDocumentMetadata *dmd = rm_calloc(1, sizeof(RSDocumentMetadata));
    char *tmpPtr = RedisModule_LoadStringBuffer(rdb, &len);
    if (encver < INDEX_MIN_BINKEYS_VERSION && len > 0) {
      // Previous versions would encode the NUL byte. The len > 0 guard keeps
      // an empty buffer from wrapping the length around.
      len--;
    }
    dmd->id = encver < INDEX_MIN_COMPACTED_DOCTABLE_VERSION ? i : RedisModule_LoadUnsigned(rdb);
    dmd->keyPtr = sdsnewlen(tmpPtr, len);
    RedisModule_Free(tmpPtr);

    dmd->flags = RedisModule_LoadUnsigned(rdb);
    dmd->maxFreq = 1;
    dmd->len = 1;
    if (encver > 1) {
      dmd->maxFreq = RedisModule_LoadUnsigned(rdb);
    }
    if (encver >= INDEX_MIN_DOCLEN_VERSION) {
      dmd->len = RedisModule_LoadUnsigned(rdb);
    } else {
      // In older versions, default the len to max freq to avoid division by zero.
      dmd->len = dmd->maxFreq;
    }

    dmd->score = RedisModule_LoadFloat(rdb);
    dmd->payload = NULL;
    // Read the payload if the record has one.
    if (dmd->flags & Document_HasPayload) {
      if (!(dmd->flags & Document_Deleted)) {
        // The stored blob carries one trailing NUL byte after the payload.
        size_t rawLen = 0;
        char *raw = RedisModule_LoadStringBuffer(rdb, &rawLen);
        // An empty blob is treated as an empty payload rather than letting
        // the length wrap around.
        const size_t payloadLen = rawLen ? rawLen - 1 : 0;

        dmd->payload = rm_malloc(sizeof(RSPayload));
        // Zero-filled, one byte longer than the payload: always terminated.
        dmd->payload->data = rm_calloc(1, payloadLen + 1);
        if (raw) {
          memcpy(dmd->payload->data, raw, payloadLen);
        }
        dmd->payload->len = payloadLen;
        RedisModule_Free(raw);
        t->memsize += payloadLen + sizeof(RSPayload);
      } else if (encver == INDEX_MIN_EXPIRE_VERSION) {
        // Deleted record: in this encoding the payload is still present in
        // the stream, so read it and throw it away.
        RedisModule_Free(RedisModule_LoadStringBuffer(rdb, NULL));
      }
    }
    dmd->sortVector = NULL;
    if (dmd->flags & Document_HasSortVector) {
      dmd->sortVector = SortingVector_RdbLoad(rdb, encver);
      t->sortablesSize += RSSortingVector_GetMemorySize(dmd->sortVector);
    }

    if (dmd->flags & Document_HasOffsetVector) {
      size_t nTmp = 0;
      char *tmp = RedisModule_LoadStringBuffer(rdb, &nTmp);
      Buffer *bufTmp = Buffer_Wrap(tmp, nTmp);
      dmd->byteOffsets = LoadByteOffsets(bufTmp);
      rm_free(bufTmp);
      RedisModule_Free(tmp);
    }

    if (dmd->flags & Document_Deleted) {
      ++deletedElements;
      DMD_Free(dmd);
    } else {
      DocIdMap_Put(&t->dim, dmd->keyPtr, sdslen(dmd->keyPtr), dmd->id);
      DocTable_Set(t, dmd->id, dmd);
      // Account for the key the same way DocTable_Put does, so that
      // DocTable_Pop subtracts exactly what was added here.
      t->memsize += sizeof(RSDocumentMetadata) + sdsAllocSize(dmd->keyPtr);
    }
  }
  t->size -= deletedElements;
}
486 
/* Consume a table written by DocTable_RdbSave from the RDB stream.
 *
 * The stream holds a size header followed by size - 1 records, each made of:
 * key, flags, maxFreq (encver > 1), len (from INDEX_MIN_DOCLEN_VERSION on),
 * score, an optional payload and an optional byte-offsets blob.
 *
 * This loader does not insert anything into t: the records are only read so
 * that the stream position ends up right after the table. Because nothing is
 * kept, nothing is allocated per record beyond the buffers returned by the
 * module API, which are released immediately, and t's counters are left
 * untouched. */
void DocTable_RdbLoad(DocTable *t, RedisModuleIO *rdb, int encver) {
  (void)t;  // nothing is stored in the table

  const size_t size = RedisModule_LoadUnsigned(rdb);

  for (size_t i = 1; i < size; i++) {
    size_t len = 0;

    // Key.
    RedisModule_Free(RedisModule_LoadStringBuffer(rdb, &len));

    // The flags decide which optional fields follow.
    const uint64_t flags = RedisModule_LoadUnsigned(rdb);

    if (encver > 1) {
      (void)RedisModule_LoadUnsigned(rdb);  // maxFreq
    }
    if (encver >= INDEX_MIN_DOCLEN_VERSION) {
      (void)RedisModule_LoadUnsigned(rdb);  // len
    }
    (void)RedisModule_LoadFloat(rdb);  // score

    if (flags & Document_HasPayload) {
      // A payload is present for live records, and for deleted records only
      // in the INDEX_MIN_EXPIRE_VERSION encoding.
      if (!(flags & Document_Deleted) || encver == INDEX_MIN_EXPIRE_VERSION) {
        RedisModule_Free(RedisModule_LoadStringBuffer(rdb, &len));
      }
    }

    if (flags & Document_HasOffsetVector) {
      // Serialized byte offsets.
      RedisModule_Free(RedisModule_LoadStringBuffer(rdb, &len));
    }
  }
}
582 
NewDocIdMap()583 DocIdMap NewDocIdMap() {
584 
585   TrieMap *m = NewTrieMap();
586   return (DocIdMap){m};
587 }
588 
DocIdMap_Get(const DocIdMap * m,const char * s,size_t n)589 t_docId DocIdMap_Get(const DocIdMap *m, const char *s, size_t n) {
590 
591   void *val = TrieMap_Find(m->tm, (char *)s, n);
592   if (val && val != TRIEMAP_NOTFOUND) {
593     return *((t_docId *)val);
594   }
595   return 0;
596 }
597 
/* Replace callback handed to TrieMap_Add by DocIdMap_Put: frees the value
 * previously stored for the key (if there is one) and returns the new value. */
void *_docIdMap_replace(void *oldval, void *newval) {
  if (oldval != NULL) {
    rm_free(oldval);
  }
  return newval;
}
604 
/* Map key (s, n) to docId. The id is stored in a small heap cell that is
 * passed to the trie as the value; _docIdMap_replace is supplied as the
 * replace callback for keys that already have a value. */
void DocIdMap_Put(DocIdMap *m, const char *s, size_t n, t_docId docId) {
  t_docId *cell = rm_malloc(sizeof(t_docId));
  *cell = docId;
  TrieMap_Add(m->tm, (char *)s, n, cell, _docIdMap_replace);
}
611 
/* Destroy the map's trie, passing rm_free as the callback used to release
 * the stored values. The DocIdMap structure itself is not freed. */
void DocIdMap_Free(DocIdMap *m) {
  TrieMap *trie = m->tm;
  TrieMap_Free(trie, rm_free);
}
615 
/* Remove key (s, n) from the map, passing rm_free as the callback used to
 * release the stored value. Returns TrieMap_Delete's result unchanged. */
int DocIdMap_Delete(DocIdMap *m, const char *s, size_t n) {
  const int rc = TrieMap_Delete(m->tm, (char *)s, n, rm_free);
  return rc;
}
619