1 #include "fork_gc.h"
2 #include "util/arr.h"
3 #include "search_ctx.h"
4 #include "inverted_index.h"
5 #include "redis_index.h"
6 #include "numeric_index.h"
7 #include "tag_index.h"
8 #include "time_sample.h"
9 #include <stdlib.h>
10 #include <stdbool.h>
11 #include <unistd.h>
12 #include <sys/wait.h>
13 #include <sys/resource.h>
14 #include <sys/socket.h>
15 #include "rwlock.h"
16 #include "util/khash.h"
17 #include <float.h>
18 #include "module.h"
19 #include "rmutil/rm_assert.h"
20 
21 #ifdef __linux__
22 #include <sys/prctl.h>
23 #endif
24 
// Indices into the GC's pipefd pair: FGC_sendFixed() writes to
// pipefd[GC_WRITERFD] and FGC_recvFixed() reads from pipefd[GC_READERFD].
#define GC_WRITERFD 1
#define GC_READERFD 0
27 
/**
 * Result of one parent-side "handle" step that consumes data sent by the child.
 * Anything other than FGC_COLLECTED ends the current collection loop.
 */
typedef enum {
  // Terms have been collected
  FGC_COLLECTED,
  // No more terms remain (the child sent the terminator)
  FGC_DONE,
  // Pipe error, child probably crashed
  FGC_CHILD_ERROR,
  // Error on the parent (e.g. the lock could not be taken because the GC is
  // being deleted, or the index no longer matches the one being collected)
  FGC_PARENT_ERROR
} FGCError;
38 
/**
 * Takes the lock that protects this GC's index: the global RW write lock for
 * FGC_TYPE_NOKEYSPACE indexes, the Redis thread-safe context lock otherwise.
 *
 * @return 1 with the lock held; 0 with the lock already released again when the
 *         GC is being deleted.
 */
static int __attribute__((warn_unused_result)) FGC_lock(ForkGC *gc, RedisModuleCtx *ctx) {
  const int useRwLock = (gc->type == FGC_TYPE_NOKEYSPACE);

  if (useRwLock) {
    RWLOCK_ACQUIRE_WRITE();
  } else {
    RedisModule_ThreadSafeContextLock(ctx);
  }

  if (!gc->deleting) {
    return 1;
  }

  // The GC is being torn down: hand the lock back and report failure.
  if (useRwLock) {
    RWLOCK_RELEASE();
  } else {
    RedisModule_ThreadSafeContextUnlock(ctx);
  }
  return 0;
}
55 
/**
 * Releases the lock taken by a successful FGC_lock() for the same GC type.
 */
static void FGC_unlock(ForkGC *gc, RedisModuleCtx *ctx) {
  if (gc->type != FGC_TYPE_NOKEYSPACE) {
    RedisModule_ThreadSafeContextUnlock(ctx);
    return;
  }
  RWLOCK_RELEASE();
}
63 
FGC_getSctx(ForkGC * gc,RedisModuleCtx * ctx)64 static RedisSearchCtx *FGC_getSctx(ForkGC *gc, RedisModuleCtx *ctx) {
65   RedisSearchCtx *sctx = NULL;
66   if (gc->type == FGC_TYPE_NOKEYSPACE) {
67     sctx = rm_malloc(sizeof(*sctx));
68     *sctx = (RedisSearchCtx)SEARCH_CTX_STATIC(ctx, gc->sp);
69   } else if (gc->type == FGC_TYPE_INKEYSPACE) {
70     sctx = NewSearchCtx(ctx, (RedisModuleString *)gc->keyName, false);
71   }
72   return sctx;
73 }
74 
/**
 * Accounts for a completed collection step.
 *
 * The GC's running total grows by bytesCollected. The index's record count
 * shrinks by recordsRemoved and its inverted-index size by bytesCollected.
 */
static void FGC_updateStats(RedisSearchCtx *sctx, ForkGC *gc, size_t recordsRemoved,
                            size_t bytesCollected) {
  gc->stats.totalCollected += bytesCollected;
  sctx->spec->stats.invertedSize -= bytesCollected;
  sctx->spec->stats.numRecords -= recordsRemoved;
}
81 
/**
 * Writes exactly `len` bytes of `buff` to the GC pipe (child side).
 *
 * write() may legitimately return fewer bytes than requested, or fail with
 * EINTR when a signal arrives. Both cases are retried until everything is
 * written. Any other failure terminates the forked process with exit(1).
 *
 * @param fgc  GC whose pipefd[GC_WRITERFD] is written to.
 * @param buff data to send; must not be NULL.
 * @param len  number of bytes; must be > 0.
 */
static void FGC_sendFixed(ForkGC *fgc, const void *buff, size_t len) {
  RS_LOG_ASSERT(len > 0, "buffer length cannot be 0");
  const char *cur = buff;
  while (len > 0) {
    ssize_t size = write(fgc->pipefd[GC_WRITERFD], cur, len);
    if (size < 0 && errno == EINTR) {
      // Interrupted before anything was written: just retry.
      continue;
    }
    if (size <= 0) {
      perror("broken pipe, exiting GC fork: write() failed");
      // just exit, do not abort(), which will trigger a watchdog on RLEC, causing adverse effects
      RedisModule_Log(NULL, "warning", "GC fork: broken pipe, exiting");
      exit(1);
    }
    // Partial write: advance past what was accepted and send the remainder.
    cur += size;
    len -= (size_t)size;
  }
}
92 
// Sends the raw bytes of the lvalue `v` (sizeof(v) bytes) via FGC_sendFixed().
// Arguments are parenthesized so compound expressions expand safely.
#define FGC_SEND_VAR(fgc, v) FGC_sendFixed((fgc), &(v), sizeof(v))
94 
/**
 * Sends a length-prefixed buffer: first `len` as a size_t, then the `len`
 * payload bytes. An empty buffer is sent as the length 0 alone.
 */
static void FGC_sendBuffer(ForkGC *fgc, const void *buff, size_t len) {
  FGC_sendFixed(fgc, &len, sizeof len);
  if (len == 0) {
    return;
  }
  FGC_sendFixed(fgc, buff, len);
}
101 
102 static int FGC_recvFixed(ForkGC *fgc, void *buf, size_t len);
103 
104 /**
105  * Send instead of a string to indicate that no more buffers are to be received
106  */
FGC_sendTerminator(ForkGC * fgc)107 static void FGC_sendTerminator(ForkGC *fgc) {
108   size_t smax = SIZE_MAX;
109   FGC_SEND_VAR(fgc, smax);
110 }
111 
/**
 * Reads exactly `len` bytes from the GC pipe into `buf` (parent side).
 *
 * Short reads are continued and EINTR is retried. read() returning 0 means
 * end-of-file: the writer closed its end (for example the child exited or
 * crashed) before sending everything. This is reported as an error instead of
 * spinning forever.
 *
 * @return REDISMODULE_OK when all bytes were read, REDISMODULE_ERR otherwise.
 */
static int __attribute__((warn_unused_result)) FGC_recvFixed(ForkGC *fgc, void *buf, size_t len) {
  char *cur = buf;  // char* cursor: pointer arithmetic on void* is not standard C
  while (len > 0) {
    ssize_t nrecvd = read(fgc->pipefd[GC_READERFD], cur, len);
    if (nrecvd > 0) {
      cur += nrecvd;
      len -= (size_t)nrecvd;
    } else if (nrecvd == 0) {
      printf("Got unexpected EOF while reading from pipe\n");
      return REDISMODULE_ERR;
    } else if (errno != EINTR) {
      printf("Got error while reading from pipe (%s)\n", strerror(errno));
      return REDISMODULE_ERR;
    }
  }
  return REDISMODULE_OK;
}
125 
// Reads exactly `len` bytes into `obj` with FGC_recvFixed() and makes the
// *enclosing function* return REDISMODULE_ERR if that fails.
// NOTE(review): expands to a bare `if` block (not wrapped in do { } while (0)),
// so use it only as a standalone statement, never as the body of an
// un-braced if/else.
#define TRY_RECV_FIXED(gc, obj, len)                   \
  if (FGC_recvFixed(gc, obj, len) != REDISMODULE_OK) { \
    return REDISMODULE_ERR;                            \
  }
130 
// Sentinel that FGC_recvBuffer() stores into *buf when the sender wrote the
// SIZE_MAX terminator instead of a real buffer. It is meant only to be compared
// against (never dereferenced or freed); `const` keeps it from being reassigned.
static void *const RECV_BUFFER_EMPTY = (void *)0x0deadbeef;
132 
133 static int __attribute__((warn_unused_result))
FGC_recvBuffer(ForkGC * fgc,void ** buf,size_t * len)134 FGC_recvBuffer(ForkGC *fgc, void **buf, size_t *len) {
135   TRY_RECV_FIXED(fgc, len, sizeof *len);
136   if (*len == SIZE_MAX) {
137     *buf = RECV_BUFFER_EMPTY;
138     return REDISMODULE_OK;
139   }
140   if (*len == 0) {
141     *buf = NULL;
142     return REDISMODULE_OK;
143   }
144 
145   *buf = rm_malloc(*len + 1);
146   ((char *)(*buf))[*len] = 0;
147   if (FGC_recvFixed(fgc, *buf, *len) != REDISMODULE_OK) {
148     rm_free(buf);
149     return REDISMODULE_ERR;
150   }
151   return REDISMODULE_OK;
152 }
153 
// Receives a length-prefixed buffer with FGC_recvBuffer() and makes the
// *enclosing function* return REDISMODULE_ERR if that fails.
// NOTE(review): like TRY_RECV_FIXED this expands to a bare `if` block, so it
// must only be used as a standalone statement.
#define TRY_RECV_BUFFER(gc, buf, len)                   \
  if (FGC_recvBuffer(gc, buf, len) != REDISMODULE_OK) { \
    return REDISMODULE_ERR;                             \
  }
158 
/**
 * Per-inverted-index summary that the child sends (raw, via FGC_sendFixed)
 * right after the index header, and the parent reads in FGC_recvInvIdx().
 * The layout is shared by both processes of the same binary.
 */
typedef struct {
  // Number of blocks prior to repair
  uint32_t nblocksOrig;
  // Number of blocks repaired (modified but not emptied)
  uint32_t nblocksRepaired;
  // Number of bytes cleaned in inverted index
  uint64_t nbytesCollected;
  // Number of document records removed
  uint64_t ndocsCollected;

  /** Specific information about the _last_ index block */
  // Docs removed from the last block (0 when the child left it untouched)
  size_t lastblkDocsRemoved;
  // Bytes reclaimed from the last block; checkLastBlock() subtracts this from
  // nbytesCollected when the parent discards the child's version of the block
  size_t lastblkBytesCollected;
  // Doc count of the last block as the child saw it, before repair
  size_t lastblkNumDocs;
} MSG_IndexInfo;
174 
/** Structure sent describing a repaired (modified, still non-empty) index block */
typedef struct {
  IndexBlock blk;  // the repaired block; its buf.data is replaced by the received copy
  int64_t oldix;  // Old position of the block
  int64_t newix;  // New position of the block
  // the actual content of the block follows, as a length-prefixed buffer...
} MSG_RepairedBlock;
182 
/** Structure sent for a block that the child emptied completely */
typedef struct {
  void *ptr;       // Address of the buffer to free (the block's data pointer, captured before repair)
  uint32_t oldix;  // Old index of deleted block
  uint32_t _pad;   // Explicit padding: the struct is sent raw, so no indeterminate
                   // (uninitialized) padding bytes should exist in it
} MSG_DeletedBlock;
188 
189 /**
190  * headerCallback and hdrarg are invoked before the inverted index is sent, only
191  * iff the inverted index was repaired.
192  * RepairCallback and its argument are passed directly to IndexBlock_Repair; see
193  * that function for more details.
194  */
FGC_childRepairInvidx(ForkGC * gc,RedisSearchCtx * sctx,InvertedIndex * idx,void (* headerCallback)(ForkGC *,void *),void * hdrarg,IndexRepairParams * params)195 static bool FGC_childRepairInvidx(ForkGC *gc, RedisSearchCtx *sctx, InvertedIndex *idx,
196                                   void (*headerCallback)(ForkGC *, void *), void *hdrarg,
197                                   IndexRepairParams *params) {
198   MSG_RepairedBlock *fixed = array_new(MSG_RepairedBlock, 10);
199   MSG_DeletedBlock *deleted = array_new(MSG_DeletedBlock, 10);
200   IndexBlock *blocklist = array_new(IndexBlock, idx->size);
201   MSG_IndexInfo ixmsg = {.nblocksOrig = idx->size};
202   IndexRepairParams params_s = {0};
203   bool rv = false;
204   if (!params) {
205     params = &params_s;
206   }
207 
208   for (size_t i = 0; i < idx->size; ++i) {
209     params->bytesCollected = 0;
210     params->bytesBeforFix = 0;
211     params->bytesAfterFix = 0;
212     IndexBlock *blk = idx->blocks + i;
213     if (blk->lastId - blk->firstId > UINT32_MAX) {
214       // Skip over blocks which have a wide variation. In the future we might
215       // want to split a block into two (or more) on high-delta boundaries.
216       // todo: is it ok??
217       blocklist = array_append(blocklist, *blk);
218       continue;
219     }
220 
221     // Capture the pointer address before the block is cleared; otherwise
222     // the pointer might be freed!
223     void *bufptr = blk->buf.data;
224     int nrepaired = IndexBlock_Repair(blk, &sctx->spec->docs, idx->flags, params);
225     // We couldn't repair the block - return 0
226     if (nrepaired == -1) {
227       goto done;
228     } else if (nrepaired == 0) {
229       // unmodified block
230       blocklist = array_append(blocklist, *blk);
231       continue;
232     }
233 
234     if (blk->numDocs == 0) {
235       // this block should be removed
236       MSG_DeletedBlock *delmsg = array_ensure_tail(&deleted, MSG_DeletedBlock);
237       *delmsg = (MSG_DeletedBlock){.ptr = bufptr, .oldix = i};
238     } else {
239       blocklist = array_append(blocklist, *blk);
240       MSG_RepairedBlock *fixmsg = array_ensure_tail(&fixed, MSG_RepairedBlock);
241       fixmsg->newix = array_len(blocklist) - 1;
242       fixmsg->oldix = i;
243       fixmsg->blk = *blk;
244       ixmsg.nblocksRepaired++;
245     }
246 
247     ixmsg.nbytesCollected += (params->bytesBeforFix - params->bytesAfterFix);
248     ixmsg.ndocsCollected += nrepaired;
249     if (i == idx->size - 1) {
250       ixmsg.lastblkBytesCollected = ixmsg.nbytesCollected;
251       ixmsg.lastblkDocsRemoved = nrepaired;
252       ixmsg.lastblkNumDocs = blk->numDocs + nrepaired;
253     }
254   }
255 
256   if (array_len(fixed) == 0 && array_len(deleted) == 0) {
257     // No blocks were removed or repaired
258     goto done;
259   }
260 
261   headerCallback(gc, hdrarg);
262   FGC_sendFixed(gc, &ixmsg, sizeof ixmsg);
263   if (array_len(blocklist) == idx->size) {
264     // no empty block, there is no need to send the blocks array. Don't send
265     // any new blocks
266     FGC_sendBuffer(gc, NULL, 0);
267   } else {
268     FGC_sendBuffer(gc, blocklist, array_len(blocklist) * sizeof(*blocklist));
269   }
270   FGC_sendBuffer(gc, deleted, array_len(deleted) * sizeof(*deleted));
271 
272   for (size_t i = 0; i < array_len(fixed); ++i) {
273     // write fix block
274     const MSG_RepairedBlock *msg = fixed + i;
275     const IndexBlock *blk = blocklist + msg->newix;
276     FGC_sendFixed(gc, msg, sizeof(*msg));
277     FGC_sendBuffer(gc, IndexBlock_DataBuf(blk), IndexBlock_DataLen(blk));
278   }
279   rv = true;
280 
281 done:
282   array_free(fixed);
283   array_free(blocklist);
284   array_free(deleted);
285   return rv;
286 }
287 
/**
 * headerCallback for term indexes: sends the term (base pointer + length,
 * passed in an iovec-shaped struct) as a length-prefixed buffer.
 */
static void sendHeaderString(ForkGC *gc, void *arg) {
  struct iovec { void *iov_base; size_t iov_len; };
  const struct iovec *term = arg;
  const size_t termLen = term->iov_len;
  FGC_sendBuffer(gc, term->iov_base, termLen);
}
293 
/**
 * Child side: repairs the inverted index of every term in the spec's trie.
 *
 * Each repaired term is sent as its name followed by the index data (see
 * FGC_childRepairInvidx). The term section ends with FGC_sendTerminator().
 */
static void FGC_childCollectTerms(ForkGC *gc, RedisSearchCtx *sctx) {
  TrieIterator *iter = Trie_Iterate(sctx->spec->terms, "", 0, 0, 1);
  rune *rstr = NULL;
  t_len slen = 0;
  float score = 0;
  int dist = 0;
  while (TrieIterator_Next(iter, &rstr, &slen, NULL, &score, &dist)) {
    size_t termLen;
    char *term = runesToStr(rstr, slen, &termLen);
    RedisModuleKey *idxKey = NULL;
    // Open with termLen, the same length the parent receives and uses to re-open
    // this index, so both processes always address the same term.
    InvertedIndex *idx = Redis_OpenInvertedIndexEx(sctx, term, termLen, 1, &idxKey);
    if (idx) {
      struct iovec { void *iov_base; size_t iov_len; } iov;
      iov.iov_base = term;
      iov.iov_len = termLen;
      FGC_childRepairInvidx(gc, sctx, idx, sendHeaderString, &iov, NULL);
    }
    if (idxKey) {
      RedisModule_CloseKey(idxKey);
    }
    rm_free(term);
  }
  DFAFilter_Free(iter->ctx);
  rm_free(iter->ctx);
  TrieIterator_Free(iter);

  // we are done with terms
  FGC_sendTerminator(gc);
}
323 
// Hash map type `cardvals`: 64-bit key (a numeric value's union representation,
// see numUnion) -> size_t count of deleted occurrences of that value.
KHASH_MAP_INIT_INT64(cardvals, size_t)
325 
/** Context for countDeleted() while repairing one numeric range's index. */
typedef struct {
  const IndexBlock *lastblk;      // the index's last block; its deletions are tracked separately
  khash_t(cardvals) * delLast;    // deletions in lastblk (created lazily)
  khash_t(cardvals) * delRest;    // deletions in all other blocks (created lazily)
} numCbCtx;
331 
/**
 * Reinterprets a double as a 64-bit hash key and back.
 * NOTE(review): a non-designated initializer `{x}` initializes the *first*
 * member (u64) by numeric conversion, not by copying bits. To store a double's
 * bit pattern write `{.d48 = x}`; to recover it from a key write `{key}` and
 * read `.d48` (as sendKht does).
 */
typedef union {
  uint64_t u64;
  double d48;
} numUnion;
336 
countDeleted(const RSIndexResult * r,const IndexBlock * blk,void * arg)337 static void countDeleted(const RSIndexResult *r, const IndexBlock *blk, void *arg) {
338   numCbCtx *ctx = arg;
339   khash_t(cardvals) *ht = NULL;
340   if (blk == ctx->lastblk) {
341     if ((ht = ctx->delLast) == NULL) {
342       ht = ctx->delLast = kh_init(cardvals);
343     }
344   } else if ((ht = ctx->delRest) == NULL) {
345     ht = ctx->delRest = kh_init(cardvals);
346   }
347   RS_LOG_ASSERT(ht, "cardvals should not be NULL");
348   int added = 0;
349   numUnion u = {r->num.value};
350   khiter_t it = kh_put(cardvals, ht, u.u64, &added);
351   if (!added) {
352     // i.e. already existed
353     kh_val(ht, it)++;
354   } else {
355     kh_val(ht, it) = 0;
356   }
357 }
358 
/**
 * Argument for sendNumericTagHeader(): describes the numeric or tag field
 * currently being collected in the child.
 */
typedef struct {
  int type;              // RSFLDTYPE_NUMERIC or RSFLDTYPE_TAG
  const char *field;     // field name, sent once per field
  const void *curPtr;    // address of the current node / tag inverted index, sent as an identifier
  char *tagValue;        // tag value (tags only)
  size_t tagLen;         // length of tagValue (tags only)
  uint64_t uniqueId;     // uniqueId of the numeric tree / tag index, sent after the field name
  int sentFieldName;     // set once the field name + uniqueId have been sent
} tagNumHeader;
368 
/**
 * headerCallback for numeric and tag indexes.
 *
 * Before the first repaired entry of a field, sends the field name and the
 * index's uniqueId. Then, for every repaired entry, sends the entry pointer,
 * followed by the tag value for tag fields.
 */
static void sendNumericTagHeader(ForkGC *fgc, void *arg) {
  tagNumHeader *hdr = arg;
  if (hdr->sentFieldName == 0) {
    hdr->sentFieldName = 1;
    FGC_sendBuffer(fgc, hdr->field, strlen(hdr->field));
    FGC_SEND_VAR(fgc, hdr->uniqueId);
  }
  FGC_SEND_VAR(fgc, hdr->curPtr);
  if (hdr->type != RSFLDTYPE_TAG) {
    return;
  }
  FGC_sendBuffer(fgc, hdr->tagValue, hdr->tagLen);
}
381 
382 // If anything other than FGC_COLLECTED is returned, it is an error or done
recvNumericTagHeader(ForkGC * fgc,char ** fieldName,size_t * fieldNameLen,uint64_t * id)383 static FGCError recvNumericTagHeader(ForkGC *fgc, char **fieldName, size_t *fieldNameLen,
384                                      uint64_t *id) {
385   if (FGC_recvBuffer(fgc, (void **)fieldName, fieldNameLen) != REDISMODULE_OK) {
386     return FGC_PARENT_ERROR;
387   }
388   if (*fieldName == RECV_BUFFER_EMPTY) {
389     *fieldName = NULL;
390     return FGC_DONE;
391   }
392 
393   if (FGC_recvFixed(fgc, id, sizeof(*id)) != REDISMODULE_OK) {
394     rm_free(*fieldName);
395     *fieldName = NULL;
396     return FGC_PARENT_ERROR;
397   }
398   return FGC_COLLECTED;
399 }
400 
/**
 * Sends a cardvals table as a size_t count followed by that many
 * CardinalityValue records. A NULL table is sent as the count 0.
 */
static void sendKht(ForkGC *gc, const khash_t(cardvals) * kh) {
  size_t n = kh ? kh_size(kh) : 0;
  FGC_SEND_VAR(gc, n);
  if (kh == NULL) {
    return;
  }

  size_t nsent = 0;
  for (khiter_t it = kh_begin(kh); it != kh_end(kh); ++it) {
    if (kh_exist(kh, it)) {
      // Reinterpret the 64-bit key as a double through the union.
      numUnion u = {kh_key(kh, it)};
      CardinalityValue cv = {.value = u.d48, .appearances = kh_val(kh, it)};
      FGC_SEND_VAR(gc, cv);
      ++nsent;
    }
  }
  RS_LOG_ASSERT(nsent == n, "Not all hashes has been sent");
}
423 
/**
 * Child side: repairs the range tree of every numeric/geo field.
 *
 * For each node with a range, the inverted index is repaired with countDeleted
 * collecting deleted values. For repaired nodes, the rest-of-blocks and
 * last-block deletion tables follow the index data. Once any node of a field
 * was sent, a NULL node pointer closes that field. FGC_sendTerminator() ends
 * the numeric section.
 */
static void FGC_childCollectNumeric(ForkGC *gc, RedisSearchCtx *sctx) {
  FieldSpec **numericFields = getFieldsByType(sctx->spec, INDEXFLD_T_NUMERIC | INDEXFLD_T_GEO);

  for (int i = 0; i < array_len(numericFields); ++i) {
    // Scoped per field, so a key closed in an earlier iteration can never be
    // closed a second time if OpenNumericIndex leaves the out-parameter untouched.
    RedisModuleKey *idxKey = NULL;
    RedisModuleString *keyName =
        IndexSpec_GetFormattedKey(sctx->spec, numericFields[i], INDEXFLD_T_NUMERIC);
    NumericRangeTree *rt = OpenNumericIndex(sctx, keyName, &idxKey);
    if (!rt) {
      // No tree to scan (mirrors the tag path). Nothing is sent for this field,
      // which the parent sees exactly like a field with no repairs.
      if (idxKey) {
        RedisModule_CloseKey(idxKey);
      }
      continue;
    }

    NumericRangeTreeIterator *gcIterator = NumericRangeTreeIterator_New(rt);

    NumericRangeNode *currNode = NULL;
    tagNumHeader header = {.type = RSFLDTYPE_NUMERIC,
                           .field = numericFields[i]->name,
                           .uniqueId = rt->uniqueId};

    while ((currNode = NumericRangeTreeIterator_Next(gcIterator))) {
      if (!currNode->range) {
        continue;
      }
      numCbCtx nctx = {0};
      InvertedIndex *idx = currNode->range->entries;
      nctx.lastblk = idx->blocks + idx->size - 1;
      IndexRepairParams params = {.RepairCallback = countDeleted, .arg = &nctx};
      header.curPtr = currNode;
      bool repaired = FGC_childRepairInvidx(gc, sctx, idx, sendNumericTagHeader, &header, &params);

      if (repaired) {
        sendKht(gc, nctx.delRest);
        sendKht(gc, nctx.delLast);
      }
      if (nctx.delRest) {
        kh_destroy(cardvals, nctx.delRest);
      }
      if (nctx.delLast) {
        kh_destroy(cardvals, nctx.delLast);
      }
    }

    if (header.sentFieldName) {
      // If we've repaired at least one entry, send the terminator;
      // note that "terminator" just means a zero address and not the
      // "no more strings" terminator in FGC_sendTerminator
      void *pdummy = NULL;
      FGC_SEND_VAR(gc, pdummy);
    }

    if (idxKey) {
      RedisModule_CloseKey(idxKey);
    }

    NumericRangeTreeIterator_Free(gcIterator);
  }

  // we are done with numeric fields
  FGC_sendTerminator(gc);
}
481 
/**
 * Child side: repairs the inverted index of every value of every tag field.
 *
 * Repaired values are sent with their tag string (see sendNumericTagHeader).
 * Once any value of a field was sent, a NULL pointer closes that field.
 * FGC_sendTerminator() ends the tag section.
 */
static void FGC_childCollectTags(ForkGC *gc, RedisSearchCtx *sctx) {
  FieldSpec **tagFields = getFieldsByType(sctx->spec, INDEXFLD_T_TAG);
  for (int i = 0; i < array_len(tagFields); ++i) {
    // Scoped per field so a key closed in an earlier iteration is never reused.
    RedisModuleKey *idxKey = NULL;
    RedisModuleString *keyName =
        IndexSpec_GetFormattedKey(sctx->spec, tagFields[i], INDEXFLD_T_TAG);
    TagIndex *tagIdx = TagIndex_Open(sctx, keyName, false, &idxKey);
    if (!tagIdx) {
      // The key may have been opened even though no tag index came back.
      if (idxKey) {
        RedisModule_CloseKey(idxKey);
      }
      continue;
    }

    tagNumHeader header = {.type = RSFLDTYPE_TAG,
                           .field = tagFields[i]->name,
                           .uniqueId = tagIdx->uniqueId};

    TrieMapIterator *iter = TrieMap_Iterate(tagIdx->values, "", 0);
    char *ptr;
    tm_len_t len;
    InvertedIndex *value;
    while (TrieMapIterator_Next(iter, &ptr, &len, (void **)&value)) {
      header.curPtr = value;
      header.tagValue = ptr;
      header.tagLen = len;
      // send repaired data
      FGC_childRepairInvidx(gc, sctx, value, sendNumericTagHeader, &header, NULL);
    }
    TrieMapIterator_Free(iter);

    // we are done with the current field
    if (header.sentFieldName) {
      void *pdummy = NULL;
      FGC_SEND_VAR(gc, pdummy);
    }

    if (idxKey) {
      RedisModule_CloseKey(idxKey);
    }
  }
  // we are done with tag fields
  FGC_sendTerminator(gc);
}
524 
/**
 * Child entry point: streams the term, numeric and tag collections to the parent.
 *
 * If no search context can be built, or it refers to a different spec than the
 * one this GC was created for (uniqueId mismatch), nothing is sent.
 */
static void FGC_childScanIndexes(ForkGC *gc) {
  RedisSearchCtx *sctx = FGC_getSctx(gc, gc->ctx);
  if (!sctx) {
    return;
  }
  if (sctx->spec->uniqueId != gc->specUniqueId) {
    // Not the spec this GC belongs to: release the context instead of leaking it.
    SearchCtx_Free(sctx);
    return;
  }

  FGC_childCollectTerms(gc, sctx);
  FGC_childCollectNumeric(gc, sctx);
  FGC_childCollectTags(gc, sctx);

  SearchCtx_Free(sctx);
}
538 
/** Parent-side buffers holding one inverted index's GC result from the child. */
typedef struct {
  MSG_DeletedBlock *delBlocks;  // blocks the child emptied completely
  size_t numDelBlocks;          // element count of delBlocks

  MSG_RepairedBlock *changedBlocks;  // info->nblocksRepaired repaired blocks (with data)

  IndexBlock *newBlocklist;  // new block order, or NULL when no block was removed
  size_t newBlocklistSize;   // element count of newBlocklist
  int lastBlockIgnored;      // set by checkLastBlock() when the child's last block was discarded
} InvIdxBuffers;
549 
550 static int __attribute__((warn_unused_result))
FGC_recvRepairedBlock(ForkGC * gc,MSG_RepairedBlock * binfo)551 FGC_recvRepairedBlock(ForkGC *gc, MSG_RepairedBlock *binfo) {
552   if (FGC_recvFixed(gc, binfo, sizeof(*binfo)) != REDISMODULE_OK) {
553     return REDISMODULE_ERR;
554   }
555   Buffer *b = &binfo->blk.buf;
556   if (FGC_recvBuffer(gc, (void **)&b->data, &b->offset) != REDISMODULE_OK) {
557     return REDISMODULE_ERR;
558   }
559   b->cap = b->offset;
560   return REDISMODULE_OK;
561 }
562 
563 static int __attribute__((warn_unused_result))
FGC_recvInvIdx(ForkGC * gc,InvIdxBuffers * bufs,MSG_IndexInfo * info)564 FGC_recvInvIdx(ForkGC *gc, InvIdxBuffers *bufs, MSG_IndexInfo *info) {
565   size_t nblocksRecvd = 0;
566   if (FGC_recvFixed(gc, info, sizeof(*info)) != REDISMODULE_OK) {
567     return REDISMODULE_ERR;
568   }
569   if (FGC_recvBuffer(gc, (void **)&bufs->newBlocklist, &bufs->newBlocklistSize) != REDISMODULE_OK) {
570     return REDISMODULE_ERR;
571   }
572 
573   if (bufs->newBlocklistSize) {
574     bufs->newBlocklistSize /= sizeof(*bufs->newBlocklist);
575   }
576   if (FGC_recvBuffer(gc, (void **)&bufs->delBlocks, &bufs->numDelBlocks) != REDISMODULE_OK) {
577     goto error;
578   }
579   bufs->numDelBlocks /= sizeof(*bufs->delBlocks);
580   bufs->changedBlocks = rm_malloc(sizeof(*bufs->changedBlocks) * info->nblocksRepaired);
581   for (size_t i = 0; i < info->nblocksRepaired; ++i) {
582     if (FGC_recvRepairedBlock(gc, bufs->changedBlocks + i) != REDISMODULE_OK) {
583       goto error;
584     }
585     nblocksRecvd++;
586   }
587   return REDISMODULE_OK;
588 
589 error:
590   rm_free(bufs->newBlocklist);
591   for (size_t ii = 0; ii < nblocksRecvd; ++ii) {
592     rm_free(bufs->changedBlocks[ii].blk.buf.data);
593   }
594   rm_free(bufs->changedBlocks);
595   memset(bufs, 0, sizeof(*bufs));
596   return REDISMODULE_ERR;
597 }
598 
/**
 * Frees every buffer held by *bufs: the new block list, the deleted-blocks
 * array, each repaired block's data, and the repaired-blocks array.
 */
static void freeInvIdx(InvIdxBuffers *bufs, MSG_IndexInfo *info) {
  rm_free(bufs->newBlocklist);
  rm_free(bufs->delBlocks);

  MSG_RepairedBlock *changed = bufs->changedBlocks;
  // changed may be NULL after a pipe error; then there is no data to free.
  for (size_t ii = 0; changed && ii < info->nblocksRepaired; ++ii) {
    rm_free(changed[ii].blk.buf.data);
  }
  rm_free(changed);
}
611 
/**
 * Reconciles the child's version of the index's last block with the parent's.
 *
 * Acts only when the child removed docs from the last block
 * (lastblkDocsRemoved != 0) AND that block's doc count in the parent no longer
 * equals what the child saw (lastblkNumDocs). In that case the child's result
 * for the block is discarded and the parent's current block (`lastOld`) is kept:
 *  - if the child deleted the block entirely, it is dropped from delBlocks and
 *    appended to newBlocklist;
 *  - otherwise the last repaired block is freed and dropped from changedBlocks.
 *    lastOld then replaces the tail of newBlocklist, or, when there is no new
 *    list, nblocksOrig shrinks so the block is treated as not scanned.
 * The collected totals in `info` are reduced by the last block's figures,
 * lastBlockIgnored is set and gc->stats.gcBlocksDenied is incremented.
 *
 * @param gc      GC whose stats are updated.
 * @param idxData buffers received from the child (modified in place).
 * @param info    child's summary for this index (modified in place).
 * @param idx     the parent's live inverted index.
 */
static void checkLastBlock(ForkGC *gc, InvIdxBuffers *idxData, MSG_IndexInfo *info,
                           InvertedIndex *idx) {
  IndexBlock *lastOld = idx->blocks + info->nblocksOrig - 1;
  if (info->lastblkDocsRemoved == 0) {
    // didn't touch last block in child
    return;
  }
  if (info->lastblkNumDocs == lastOld->numDocs) {
    // didn't touch last block in parent
    return;
  }

  if (info->lastblkDocsRemoved == info->lastblkNumDocs) {
    // Last block was deleted entirely while updates on the main process.
    // We need to remove it from delBlocks list
    // (the child records deleted blocks in increasing block order, so the last
    // block is the final entry)
    idxData->numDelBlocks--;

    // Then We need add it to the newBlocklist.
    idxData->newBlocklistSize++;
    idxData->newBlocklist = rm_realloc(idxData->newBlocklist,
                                       sizeof(*idxData->newBlocklist) * idxData->newBlocklistSize);
    idxData->newBlocklist[idxData->newBlocklistSize - 1] = *lastOld;
  } else {
    // Last block was modified on the child and on the parent.

    // we need to remove it from changedBlocks
    // (repaired blocks are also recorded in block order, so it is the last entry)
    MSG_RepairedBlock *rb = idxData->changedBlocks + info->nblocksRepaired - 1;
    indexBlock_Free(&rb->blk);
    info->nblocksRepaired--;

    // Then add it to newBlocklist if newBlocklist is not NULL.
    // If newBlocklist!=NULL then the last block must be there (it was changed and not deleted)
    // If newBlocklist==NULL then by decreasing the nblocksOrig by one we make sure to keep the last
    // block
    if (idxData->newBlocklist) {
      idxData->newBlocklist[idxData->newBlocklistSize - 1] = *lastOld;
    } else {
      --info->nblocksOrig;
    }
  }

  // The child's last-block work is discarded, so it must not be counted.
  info->ndocsCollected -= info->lastblkDocsRemoved;
  info->nbytesCollected -= info->lastblkBytesCollected;
  idxData->lastBlockIgnored = 1;
  gc->stats.gcBlocksDenied++;
}
658 
/**
 * Parent side: applies the child's GC result for one inverted index to the live
 * index `idx`. Must be called with the GC lock held.
 *
 * Steps:
 *  1. checkLastBlock() reconciles a last block modified by both processes.
 *  2. The parent's buffers for repaired blocks and for fully deleted blocks
 *     are freed, and so is the delBlocks array itself.
 *  3. The block array is rebuilt: the child's newBlocklist (if any), followed
 *     by the blocks at positions >= nblocksOrig, which the child never saw. If
 *     there is no new list but blocks were deleted, every scanned block is
 *     dropped and an empty block is added if nothing remains.
 *  4. Repaired blocks are placed at their new positions.
 * Ownership of newBlocklist and of the repaired blocks' data moves into `idx`.
 * The changedBlocks array itself is not freed here.
 */
static void FGC_applyInvertedIndex(ForkGC *gc, InvIdxBuffers *idxData, MSG_IndexInfo *info,
                                   InvertedIndex *idx) {
  checkLastBlock(gc, idxData, info, idx);
  // Free the parent's copies of blocks the child repaired; the repaired
  // versions are installed at the end.
  for (size_t i = 0; i < info->nblocksRepaired; ++i) {
    MSG_RepairedBlock *blockModified = idxData->changedBlocks + i;
    indexBlock_Free(&idx->blocks[blockModified->oldix]);
  }
  for (size_t i = 0; i < idxData->numDelBlocks; ++i) {
    // Blocks that were deleted entirely:
    MSG_DeletedBlock *delinfo = idxData->delBlocks + i;
    rm_free(delinfo->ptr);
  }
  rm_free(idxData->delBlocks);

  // Ensure the old index is at least as big as the new index' size
  RS_LOG_ASSERT(idx->size >= info->nblocksOrig, "Old index should be larger or equal to new index");

  if (idxData->newBlocklist) {
    /**
     * At this point, we check if the last block has had new data added to it,
     * but was _not_ repaired. We check for a repaired last block in
     * checkLastBlock().
     */

    if (!info->lastblkDocsRemoved) {
      /**
       * Last block was unmodified-- let's prefer the last block's pointer
       * over our own (which may be stale).
       * If the last block was repaired, this is handled above
       */
      idxData->newBlocklist[idxData->newBlocklistSize - 1] = idx->blocks[info->nblocksOrig - 1];
    }

    // Number of blocks added in the parent process since the last scan
    size_t newAddedLen = idx->size - info->nblocksOrig;

    // The final size is the reordered block size, plus the number of blocks
    // which we haven't scanned yet, because they were added in the parent
    size_t totalLen = idxData->newBlocklistSize + newAddedLen;

    idxData->newBlocklist =
        rm_realloc(idxData->newBlocklist, totalLen * sizeof(*idxData->newBlocklist));
    memcpy(idxData->newBlocklist + idxData->newBlocklistSize, (idx->blocks + info->nblocksOrig),
           newAddedLen * sizeof(*idxData->newBlocklist));

    // The old array's entries now live in newBlocklist (or were freed above).
    rm_free(idx->blocks);
    idxData->newBlocklistSize += newAddedLen;
    idx->blocks = idxData->newBlocklist;
    idx->size = idxData->newBlocklistSize;
  } else if (idxData->numDelBlocks) {
    // In this case, all blocks the child has seen need to be deleted. We don't
    // get a new block list, because they are all gone..
    size_t newAddedLen = idx->size - info->nblocksOrig;
    if (newAddedLen) {
      memmove(idx->blocks, idx->blocks + info->nblocksOrig, sizeof(*idx->blocks) * newAddedLen);
    }
    idx->size = newAddedLen;
    if (idx->size == 0) {
      // Keep at least one (empty) block in the index.
      InvertedIndex_AddBlock(idx, 0);
    }
  }

  // Install the child's repaired blocks at their positions in the new layout.
  for (size_t i = 0; i < info->nblocksRepaired; ++i) {
    MSG_RepairedBlock *blockModified = idxData->changedBlocks + i;
    idx->blocks[blockModified->newix] = blockModified->blk;
  }

  idx->numDocs -= info->ndocsCollected;
  idx->gcMarker++;
}
729 
FGC_parentHandleTerms(ForkGC * gc,RedisModuleCtx * rctx)730 static FGCError FGC_parentHandleTerms(ForkGC *gc, RedisModuleCtx *rctx) {
731   FGCError status = FGC_COLLECTED;
732   size_t len;
733   int hasLock = 0;
734   char *term = NULL;
735   if (FGC_recvBuffer(gc, (void **)&term, &len) != REDISMODULE_OK) {
736     return FGC_CHILD_ERROR;
737   }
738   RedisModuleKey *idxKey = NULL;
739   RedisSearchCtx *sctx = NULL;
740 
741   if (term == RECV_BUFFER_EMPTY) {
742     return FGC_DONE;
743   }
744 
745   InvIdxBuffers idxbufs = {0};
746   MSG_IndexInfo info = {0};
747   if (FGC_recvInvIdx(gc, &idxbufs, &info) != REDISMODULE_OK) {
748     rm_free(term);
749     return FGC_CHILD_ERROR;
750   }
751 
752   if (!FGC_lock(gc, rctx)) {
753     status = FGC_PARENT_ERROR;
754     goto cleanup;
755   }
756 
757   hasLock = 1;
758   sctx = FGC_getSctx(gc, rctx);
759   if (!sctx || sctx->spec->uniqueId != gc->specUniqueId) {
760     status = FGC_PARENT_ERROR;
761     goto cleanup;
762   }
763 
764   InvertedIndex *idx = Redis_OpenInvertedIndexEx(sctx, term, len, 1, &idxKey);
765 
766   if (idx == NULL) {
767     status = FGC_PARENT_ERROR;
768     goto cleanup;
769   }
770 
771   FGC_applyInvertedIndex(gc, &idxbufs, &info, idx);
772   FGC_updateStats(sctx, gc, info.ndocsCollected, info.nbytesCollected);
773 
774   if (idx->numDocs == 0) {
775     // inverted index was cleaned entirely lets free it
776     RedisModuleString *termKey = fmtRedisTermKey(sctx, term, len);
777     size_t formatedTremLen;
778     const char *formatedTrem = RedisModule_StringPtrLen(termKey, &formatedTremLen);
779     if (sctx->spec->keysDict) {
780       dictDelete(sctx->spec->keysDict, termKey);
781     }
782     Trie_Delete(sctx->spec->terms, term, len);
783     RedisModule_FreeString(sctx->redisCtx, termKey);
784   }
785 
786 cleanup:
787 
788   if (idxKey) {
789     RedisModule_CloseKey(idxKey);
790   }
791   if (sctx) {
792     SearchCtx_Free(sctx);
793   }
794   if (hasLock) {
795     FGC_unlock(gc, rctx);
796   }
797   rm_free(term);
798   if (status != FGC_COLLECTED) {
799     freeInvIdx(&idxbufs, &info);
800   } else {
801     rm_free(idxbufs.changedBlocks);
802   }
803   return status;
804 }
805 
/** Parent-side GC result received for one numeric range-tree node. */
typedef struct {
  // Node in the tree that was GC'd
  NumericRangeNode *node;
  CardinalityValue *lastBlockDeleted;  // deleted values found in the last block
  CardinalityValue *restBlockDeleted;  // deleted values found in all other blocks
  size_t nlastBlockDel;                // element count of lastBlockDeleted
  size_t nrestBlockDel;                // element count of restBlockDeleted
  InvIdxBuffers idxbufs;               // the node's inverted-index buffers
  MSG_IndexInfo info;                  // the node's inverted-index summary
} NumGcInfo;
816 
/**
 * Receives a table written by sendKht(): a size_t count followed by that many
 * CardinalityValue records.
 *
 * On success *tgt is an rm_malloc'd array of *len records, or NULL when the
 * count is 0. On failure *tgt is NULL and *len is 0; nothing is left
 * allocated.
 *
 * @return REDISMODULE_OK or REDISMODULE_ERR.
 */
static int recvCardvals(ForkGC *fgc, CardinalityValue **tgt, size_t *len) {
  size_t count = 0;
  *tgt = NULL;
  *len = 0;
  if (FGC_recvFixed(fgc, &count, sizeof(count)) != REDISMODULE_OK) {
    return REDISMODULE_ERR;
  }
  if (count == 0) {
    return REDISMODULE_OK;
  }
  if (count > SIZE_MAX / sizeof(**tgt)) {
    // Corrupt count: the byte size would overflow.
    return REDISMODULE_ERR;
  }

  // Allocate exactly count records (the byte size, not sizeof twice).
  size_t nbytes = count * sizeof(**tgt);
  CardinalityValue *vals = rm_malloc(nbytes);
  if (FGC_recvFixed(fgc, vals, nbytes) != REDISMODULE_OK) {
    rm_free(vals);
    return REDISMODULE_ERR;
  }
  *tgt = vals;
  *len = count;
  return REDISMODULE_OK;
}
833 
recvNumIdx(ForkGC * gc,NumGcInfo * ninfo)834 static FGCError recvNumIdx(ForkGC *gc, NumGcInfo *ninfo) {
835   if (FGC_recvFixed(gc, &ninfo->node, sizeof(ninfo->node)) != REDISMODULE_OK) {
836     goto error;
837   }
838   if (ninfo->node == NULL) {
839     return FGC_DONE;
840   }
841 
842   if (FGC_recvInvIdx(gc, &ninfo->idxbufs, &ninfo->info) != REDISMODULE_OK) {
843     goto error;
844   }
845 
846   if (recvCardvals(gc, &ninfo->restBlockDeleted, &ninfo->nrestBlockDel) != REDISMODULE_OK) {
847     goto error;
848   }
849   if (recvCardvals(gc, &ninfo->lastBlockDeleted, &ninfo->nlastBlockDel) != REDISMODULE_OK) {
850     goto error;
851   }
852 
853   return FGC_COLLECTED;
854 
855 error:
856   printf("Error receiving numeric index!\n");
857   freeInvIdx(&ninfo->idxbufs, &ninfo->info);
858   rm_free(ninfo->lastBlockDeleted);
859   rm_free(ninfo->restBlockDeleted);
860   memset(ninfo, 0, sizeof(*ninfo));
861   return FGC_CHILD_ERROR;
862 }
863 
/**
 * Applies the child's per-value deletion counts to a node's cardinality list.
 *
 * The rest-of-blocks counts are merged with the last-block counts; the latter
 * are skipped when the parent discarded the child's last block
 * (lastBlockIgnored). Each value's appearances are then reduced. Values whose
 * appearances drop to zero, or below, are removed. Finally min/max (leaves
 * only), unique_sum and card are recomputed from the remaining values.
 */
static void resetCardinality(NumGcInfo *info, NumericRangeNode *currNone) {
  khash_t(cardvals) *kh = kh_init(cardvals);
  int added;
  // Keys are the doubles' bit patterns, hence the designated `.d48` initializers.
  for (size_t ii = 0; ii < info->nrestBlockDel; ++ii) {
    numUnion u = {.d48 = info->restBlockDeleted[ii].value};
    khiter_t it = kh_put(cardvals, kh, u.u64, &added);
    kh_val(kh, it) = info->restBlockDeleted[ii].appearances;
  }
  if (!info->idxbufs.lastBlockIgnored) {
    for (size_t ii = 0; ii < info->nlastBlockDel; ++ii) {
      numUnion u = {.d48 = info->lastBlockDeleted[ii].value};
      khiter_t it = kh_put(cardvals, kh, u.u64, &added);
      if (!added) {
        kh_val(kh, it) += info->lastBlockDeleted[ii].appearances;
      } else {
        kh_val(kh, it) = info->lastBlockDeleted[ii].appearances;
      }
    }
  }

  NumericRange *r = currNone->range;
  // -DBL_MAX (not -DBL_MIN, which is ~ -0.0) so all-negative ranges get a correct max.
  double minVal = DBL_MAX, maxVal = -DBL_MAX, uniqueSum = 0;

  size_t ii = 0;
  while (ii < array_len(r->values)) {
    numUnion u = {.d48 = r->values[ii].value};
    khiter_t it = kh_get(cardvals, kh, u.u64);
    if (it != kh_end(kh)) {
      size_t removed = kh_val(kh, it);
      if (removed >= r->values[ii].appearances) {
        // No occurrences left (compare first so the size_t cannot wrap).
        // array_del_fast moves another element into slot ii: re-examine it.
        array_del_fast(r->values, ii);
        continue;
      }
      r->values[ii].appearances -= removed;
    }
    minVal = MIN(minVal, r->values[ii].value);
    maxVal = MAX(maxVal, r->values[ii].value);
    uniqueSum += r->values[ii].value;
    ++ii;
  }
  kh_destroy(cardvals, kh);
  // we can only update the min and the max value if the node is a leaf.
  // otherwise the min and the max also represent its children values and
  // we can not change it.
  if (NumericRangeNode_IsLeaf(currNone)) {
    r->minVal = minVal;
    r->maxVal = maxVal;
  }
  r->unique_sum = uniqueSum;
  r->card = array_len(r->values);
}
916 
/* Apply the child's GC result for one numeric range node.
 *
 * The received blocks are installed into the range's inverted index. The
 * range's recorded inverted-index size and the GC statistics are reduced by
 * what was collected. Finally the range's value cardinalities are reconciled.
 */
static void applyNumIdx(ForkGC *gc, RedisSearchCtx *sctx, NumGcInfo *ninfo) {
  NumericRange *range = ninfo->node->range;
  MSG_IndexInfo *idxInfo = &ninfo->info;

  FGC_applyInvertedIndex(gc, &ninfo->idxbufs, idxInfo, range->entries);
  range->invertedIndexSize -= idxInfo->nbytesCollected;
  FGC_updateStats(sctx, gc, idxInfo->ndocsCollected, idxInfo->nbytesCollected);

  // TODO: fix for NUMERIC similar to TAG fix PR#2269
  // if (currNode->range->entries->numDocs == 0) {
  //   NumericRangeTree_DeleteNode(rt, (currNode->range->minVal + currNode->range->maxVal) / 2);
  // }

  resetCardinality(ninfo, ninfo->node);
}
933 
FGC_parentHandleNumeric(ForkGC * gc,RedisModuleCtx * rctx)934 static FGCError FGC_parentHandleNumeric(ForkGC *gc, RedisModuleCtx *rctx) {
935   int hasLock = 0;
936   size_t fieldNameLen;
937   char *fieldName = NULL;
938   uint64_t rtUniqueId;
939   NumericRangeTree *rt = NULL;
940   FGCError status = recvNumericTagHeader(gc, &fieldName, &fieldNameLen, &rtUniqueId);
941   if (status == FGC_DONE) {
942     return FGC_DONE;
943   }
944 
945   while (status == FGC_COLLECTED) {
946     NumGcInfo ninfo = {0};
947     RedisSearchCtx *sctx = NULL;
948     RedisModuleKey *idxKey = NULL;
949     FGCError status2 = recvNumIdx(gc, &ninfo);
950     if (status2 == FGC_DONE) {
951       break;
952     } else if (status2 != FGC_COLLECTED) {
953       status = status2;
954       break;
955     }
956 
957     if (!FGC_lock(gc, rctx)) {
958       status = FGC_PARENT_ERROR;
959       goto loop_cleanup;
960     }
961 
962     hasLock = 1;
963     sctx = FGC_getSctx(gc, rctx);
964     if (!sctx || sctx->spec->uniqueId != gc->specUniqueId) {
965       status = FGC_PARENT_ERROR;
966       goto loop_cleanup;
967     }
968     RedisModuleString *keyName =
969         IndexSpec_GetFormattedKeyByName(sctx->spec, fieldName, INDEXFLD_T_NUMERIC);
970     rt = OpenNumericIndex(sctx, keyName, &idxKey);
971 
972     if (rt->uniqueId != rtUniqueId) {
973       status = FGC_PARENT_ERROR;
974       goto loop_cleanup;
975     }
976 
977     if (!ninfo.node->range) {
978       gc->stats.gcNumericNodesMissed++;
979       goto loop_cleanup;
980     }
981 
982     applyNumIdx(gc, sctx, &ninfo);
983 
984     if (ninfo.node->range->entries->numDocs == 0) {
985       rt->emptyLeaves++;
986     }
987 
988   loop_cleanup:
989     if (sctx) {
990       SearchCtx_Free(sctx);
991     }
992     if (status != FGC_COLLECTED) {
993       freeInvIdx(&ninfo.idxbufs, &ninfo.info);
994     } else {
995       rm_free(ninfo.idxbufs.changedBlocks);
996     }
997     if (idxKey) {
998       RedisModule_CloseKey(idxKey);
999     }
1000     if (hasLock) {
1001       FGC_unlock(gc, rctx);
1002       hasLock = 0;
1003     }
1004 
1005     rm_free(ninfo.restBlockDeleted);
1006     rm_free(ninfo.lastBlockDeleted);
1007   }
1008 
1009   rm_free(fieldName);
1010 
1011   //printf("empty %ld, number of ranges %ld\n", rt->emptyLeaves, rt->numRanges);
1012   if (rt && rt->emptyLeaves >= rt->numRanges / 2) {
1013     hasLock = 1;
1014     if (!FGC_lock(gc, rctx)) {
1015       return FGC_PARENT_ERROR;
1016     }
1017     if (RSGlobalConfig.forkGCCleanNumericEmptyNodes) {
1018       NRN_AddRv rv = NumericRangeTree_TrimEmptyLeaves(rt);
1019       rt->numRanges += rv.numRanges;
1020       rt->emptyLeaves = 0;
1021     }
1022     if (hasLock) {
1023       FGC_unlock(gc, rctx);
1024       hasLock = 0;
1025     }
1026   }
1027   //printf("removed %d\n", rv.numRanges);
1028 
1029   return status;
1030 }
1031 
FGC_parentHandleTags(ForkGC * gc,RedisModuleCtx * rctx)1032 static FGCError FGC_parentHandleTags(ForkGC *gc, RedisModuleCtx *rctx) {
1033   int hasLock = 0;
1034   size_t fieldNameLen;
1035   char *fieldName;
1036   uint64_t tagUniqueId;
1037   InvertedIndex *value = NULL;
1038   FGCError status = recvNumericTagHeader(gc, &fieldName, &fieldNameLen, &tagUniqueId);
1039 
1040   while (status == FGC_COLLECTED) {
1041     RedisModuleString *keyName = NULL;
1042     RedisModuleKey *idxKey = NULL;
1043     RedisSearchCtx *sctx = NULL;
1044     MSG_IndexInfo info = {0};
1045     InvIdxBuffers idxbufs = {0};
1046     TagIndex *tagIdx = NULL;
1047     char *tagVal = NULL;
1048     size_t tagValLen;
1049 
1050     if (FGC_recvFixed(gc, &value, sizeof value) != REDISMODULE_OK) {
1051       status = FGC_CHILD_ERROR;
1052       break;
1053     }
1054 
1055     // No more tags values in tag field
1056     if (value == NULL) {
1057       RS_LOG_ASSERT(status == FGC_COLLECTED, "GC status is COLLECTED");
1058       break;
1059     }
1060 
1061     if (FGC_recvBuffer(gc, (void **)&tagVal, &tagValLen) != REDISMODULE_OK) {
1062       status = FGC_CHILD_ERROR;
1063       goto loop_cleanup;
1064     }
1065     // printf("receives %s %ld\n", tagVal, tagValLen);
1066 
1067     if (FGC_recvInvIdx(gc, &idxbufs, &info) != REDISMODULE_OK) {
1068       status = FGC_CHILD_ERROR;
1069       goto loop_cleanup;
1070     }
1071 
1072     if (!FGC_lock(gc, rctx)) {
1073       status = FGC_PARENT_ERROR;
1074       goto loop_cleanup;
1075     }
1076 
1077     hasLock = 1;
1078     sctx = FGC_getSctx(gc, rctx);
1079     if (!sctx || sctx->spec->uniqueId != gc->specUniqueId) {
1080       status = FGC_PARENT_ERROR;
1081       goto loop_cleanup;
1082     }
1083     keyName = IndexSpec_GetFormattedKeyByName(sctx->spec, fieldName, INDEXFLD_T_TAG);
1084     tagIdx = TagIndex_Open(sctx, keyName, false, &idxKey);
1085 
1086     if (tagIdx->uniqueId != tagUniqueId) {
1087       status = FGC_CHILD_ERROR;
1088       goto loop_cleanup;
1089     }
1090 
1091     InvertedIndex *idx = TagIndex_OpenIndex(tagIdx, tagVal, tagValLen, 0);
1092     if (idx == TRIEMAP_NOTFOUND || idx != value) {
1093       status = FGC_PARENT_ERROR;
1094       goto loop_cleanup;
1095     }
1096 
1097     // printf("Child %p Parent %p\n", value, idx);
1098 
1099     FGC_applyInvertedIndex(gc, &idxbufs, &info, idx);
1100     FGC_updateStats(sctx, gc, info.ndocsCollected, info.nbytesCollected);
1101 
1102     // if tag value is empty, let's remove it.
1103     if (idx->numDocs == 0) {
1104       // printf("Delete GC %s %p\n", tagVal, TrieMap_Find(tagIdx->values, tagVal, tagValLen));
1105       TrieMap_Delete(tagIdx->values, tagVal, tagValLen, InvertedIndex_Free);
1106     }
1107 
1108   loop_cleanup:
1109     if (sctx) {
1110       SearchCtx_Free(sctx);
1111     }
1112     if (idxKey) {
1113       RedisModule_CloseKey(idxKey);
1114     }
1115     if (hasLock) {
1116       FGC_unlock(gc, rctx);
1117       hasLock = 0;
1118     }
1119     if (status != FGC_COLLECTED) {
1120       freeInvIdx(&idxbufs, &info);
1121     } else {
1122       rm_free(idxbufs.changedBlocks);
1123     }
1124     if (tagVal) {
1125       rm_free(tagVal);
1126     }
1127   }
1128 
1129   rm_free(fieldName);
1130   return status;
1131 }
1132 
/* Read and apply everything the child sent: first term indexes, then numeric
 * fields, then tag fields. Each handler is called repeatedly while it returns
 * FGC_COLLECTED and must finish with FGC_DONE. Any other status aborts with
 * REDISMODULE_ERR.
 */
int FGC_parentHandleFromChild(ForkGC *gc) {
  FGCError status;

  do {
    status = FGC_parentHandleTerms(gc, gc->ctx);
  } while (status == FGC_COLLECTED);
  if (status != FGC_DONE) {
    return REDISMODULE_ERR;
  }

  do {
    status = FGC_parentHandleNumeric(gc, gc->ctx);
  } while (status == FGC_COLLECTED);
  if (status != FGC_DONE) {
    return REDISMODULE_ERR;
  }

  do {
    status = FGC_parentHandleTags(gc, gc->ctx);
  } while (status == FGC_COLLECTED);
  if (status != FGC_DONE) {
    return REDISMODULE_ERR;
  }

  return REDISMODULE_OK;
}
1148 
1149 /**
1150  * In future versions of Redis, Redis will have its own fork() call.
1151  * The following two functions wrap this functionality.
1152  */
FGC_haveRedisFork()1153 static int FGC_haveRedisFork() {
1154   return RedisModule_Fork != NULL;
1155 }
1156 
/* Duplicate the process, through RedisModule_Fork when available or plain
 * fork() otherwise. Returns the underlying call's result; periodicCb treats
 * 0 as "in the child" and -1 as failure.
 */
static int FGC_fork(ForkGC *gc, RedisModuleCtx *ctx) {
  (void)gc;   // unused; kept for interface stability
  (void)ctx;  // unused; kept for interface stability
  return FGC_haveRedisFork() ? RedisModule_Fork(NULL, NULL) : fork();
}
1165 
/* Periodic GC entry point.
 *
 * Skips the cycle (returns 1) until enough documents were deleted, or while
 * an RDB load is in progress. Otherwise it creates a pipe and forks:
 *  - the child scans the indexes and writes its findings to the pipe;
 *  - the parent reads and applies them, then reaps the child (killed through
 *    the Redis fork API, or waited for after a plain fork()).
 *
 * Returns 0 when the GC is found to be deleting (gc->deleting, also checked by
 * FGC_lock), 1 otherwise.
 */
static int periodicCb(RedisModuleCtx *ctx, void *privdata) {
  ForkGC *gc = privdata;
  if (gc->deleting) {
    return 0;
  }
  if (gc->deletedDocsFromLastRun < RSGlobalConfig.forkGcCleanThreshold) {
    return 1;
  }

  int gcrv = 1;

  RedisModule_AutoMemory(ctx);

  // Check if RDB is loading - not needed after the first time we find out that rdb is not
  // reloading
  if (gc->rdbPossiblyLoading && !gc->sp) {
    RedisModule_ThreadSafeContextLock(ctx);
    if (isRdbLoading(ctx)) {
      RedisModule_Log(ctx, "notice", "RDB Loading in progress, not performing GC");
      RedisModule_ThreadSafeContextUnlock(ctx);
      return 1;
    } else {
      // the RDB will not load again, so it's safe to ignore the info check in the next cycles
      gc->rdbPossiblyLoading = 0;
    }
    RedisModule_ThreadSafeContextUnlock(ctx);
  }

  pid_t cpid;
  TimeSample ts;

  // FGC_WaitAtFork() sets this state to hold the GC here before it forks.
  while (gc->pauseState == FGC_PAUSED_CHILD) {
    gc->execState = FGC_STATE_WAIT_FORK;
    // spin or sleep
    usleep(500);
  }

  pid_t ppid_before_fork = getpid();

  TimeSampler_Start(&ts);
  // Create the parent<->child pipe. On failure gc->pipefd still holds zeroed
  // or stale descriptor numbers (possibly reused by now), so they must not be
  // used or closed: skip this cycle and retry later.
  if (pipe(gc->pipefd) == -1) {
    gc->retryInterval.tv_sec = RSGlobalConfig.forkGcRetryInterval;
    return 1;
  }

  if (gc->type == FGC_TYPE_NOKEYSPACE) {
    // If we are not in key space we still need to acquire the GIL to use the fork api
    RedisModule_ThreadSafeContextLock(ctx);
  }

  if (!FGC_lock(gc, ctx)) {

    if (gc->type == FGC_TYPE_NOKEYSPACE) {
      RedisModule_ThreadSafeContextUnlock(ctx);
    }

    close(gc->pipefd[GC_READERFD]);
    close(gc->pipefd[GC_WRITERFD]);

    return 0;
  }

  gc->execState = FGC_STATE_SCANNING;

  cpid = FGC_fork(gc, ctx);  // duplicate the current process

  if (cpid == -1) {
    gc->retryInterval.tv_sec = RSGlobalConfig.forkGcRetryInterval;

    if (gc->type == FGC_TYPE_NOKEYSPACE) {
      RedisModule_ThreadSafeContextUnlock(ctx);
    }

    FGC_unlock(gc, ctx);

    close(gc->pipefd[GC_READERFD]);
    close(gc->pipefd[GC_WRITERFD]);

    return 1;
  }

  gc->deletedDocsFromLastRun = 0;

  if (gc->type == FGC_TYPE_NOKEYSPACE) {
    RedisModule_ThreadSafeContextUnlock(ctx);
  }

  FGC_unlock(gc, ctx);

  gc->retryInterval.tv_sec = RSGlobalConfig.forkGcRunIntervalSec;

  if (cpid == 0) {
    setpriority(PRIO_PROCESS, getpid(), 19);
    // fork process
    close(gc->pipefd[GC_READERFD]);
#ifdef __linux__
    if (!FGC_haveRedisFork()) {
      // Have the kernel send SIGKILL to this child if the parent dies.
      int r = prctl(PR_SET_PDEATHSIG, SIGKILL);
      if (r == -1) {
        // _exit, not exit: the child must not run the parent's atexit
        // handlers or flush stdio buffers inherited from the parent.
        _exit(1);
      }
      // test in case the original parent exited just
      // before the prctl() call
      if (getppid() != ppid_before_fork) _exit(1);
    }
#endif
    FGC_childScanIndexes(gc);
    close(gc->pipefd[GC_WRITERFD]);
    sleep(RSGlobalConfig.forkGcSleepBeforeExit);
    _exit(EXIT_SUCCESS);
  } else {
    // main process
    close(gc->pipefd[GC_WRITERFD]);
    while (gc->pauseState == FGC_PAUSED_PARENT) {
      gc->execState = FGC_STATE_WAIT_APPLY;
      // spin
      usleep(500);
    }

    gc->execState = FGC_STATE_APPLYING;
    if (FGC_parentHandleFromChild(gc) == REDISMODULE_ERR) {
      gcrv = 1;
    }
    close(gc->pipefd[GC_READERFD]);
    if (FGC_haveRedisFork()) {

      if (gc->type == FGC_TYPE_NOKEYSPACE) {
        // If we are not in key space we still need to acquire the GIL to use the fork api
        RedisModule_ThreadSafeContextLock(ctx);
      }

      if (!FGC_lock(gc, ctx)) {
        if (gc->type == FGC_TYPE_NOKEYSPACE) {
          RedisModule_ThreadSafeContextUnlock(ctx);
        }

        return 0;
      }

      // KillForkChild must be called when holding the GIL
      // otherwise it might cause a pipe leak and eventually run
      // out of file descriptor
      RedisModule_KillForkChild(cpid);

      if (gc->type == FGC_TYPE_NOKEYSPACE) {
        RedisModule_ThreadSafeContextUnlock(ctx);
      }

      FGC_unlock(gc, ctx);

    } else {
      pid_t id = wait4(cpid, NULL, 0, NULL);
      if (id == -1) {
        printf("an error acquire when waiting for fork to terminate, pid:%d", cpid);
      }
    }
  }
  gc->execState = FGC_STATE_IDLE;
  TimeSampler_End(&ts);

  long long msRun = TimeSampler_DurationMS(&ts);

  gc->stats.numCycles++;
  gc->stats.totalMSRun += msRun;
  gc->stats.lastRunTimeMs = msRun;

  return gcrv;
}
1332 
// NO_TSAN_CHECK: expands to __attribute__((no_sanitize("thread"))) when the
// compiler provides __has_feature and reports thread_sanitizer (i.e. a TSan
// build); otherwise it expands to nothing. The nested #if is needed because
// __has_feature(...) cannot be evaluated on compilers that do not define it.
// It is applied to the FGC_Wait* helpers below, which busy-wait on
// gc->execState, written by periodicCb presumably from another thread
// without synchronization.
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define NO_TSAN_CHECK __attribute__((no_sanitize("thread")))
#endif
#endif
#ifndef NO_TSAN_CHECK
#define NO_TSAN_CHECK
#endif
1341 
FGC_WaitAtFork(ForkGC * gc)1342 void FGC_WaitAtFork(ForkGC *gc) NO_TSAN_CHECK {
1343   RS_LOG_ASSERT(gc->pauseState == 0, "FGC pause state should be 0");
1344   gc->pauseState = FGC_PAUSED_CHILD;
1345 
1346   while (gc->execState != FGC_STATE_WAIT_FORK) {
1347     usleep(500);
1348   }
1349 }
1350 
FGC_WaitAtApply(ForkGC * gc)1351 void FGC_WaitAtApply(ForkGC *gc) NO_TSAN_CHECK {
1352   // Ensure that we're waiting for the child to begin
1353   RS_LOG_ASSERT(gc->pauseState == FGC_PAUSED_CHILD, "FGC pause state should be CHILD");
1354   RS_LOG_ASSERT(gc->execState == FGC_STATE_WAIT_FORK, "FGC exec state should be WAIT_FORK");
1355 
1356   gc->pauseState = FGC_PAUSED_PARENT;
1357   while (gc->execState != FGC_STATE_WAIT_APPLY) {
1358     usleep(500);
1359   }
1360 }
1361 
FGC_WaitClear(ForkGC * gc)1362 void FGC_WaitClear(ForkGC *gc) NO_TSAN_CHECK {
1363   gc->pauseState = FGC_PAUSED_UNPAUSED;
1364   while (gc->execState != FGC_STATE_IDLE) {
1365     usleep(500);
1366   }
1367 }
1368 
onTerminateCb(void * privdata)1369 static void onTerminateCb(void *privdata) {
1370   ForkGC *gc = privdata;
1371   if (gc->keyName && gc->type == FGC_TYPE_INKEYSPACE) {
1372     RedisModule_FreeString(gc->ctx, (RedisModuleString *)gc->keyName);
1373   }
1374 
1375   RedisModule_FreeThreadSafeContext(gc->ctx);
1376   rm_free(gc);
1377 }
1378 
/* Render the GC statistics as a flat array of alternating name / numeric
 * value pairs. When no GC object is attached, an empty array is emitted.
 */
static void statsCb(RedisModuleCtx *ctx, void *gcCtx) {
#define REPLY_KVNUM(n, k, v)                       \
  do {                                             \
    RedisModule_ReplyWithSimpleString(ctx, (k));   \
    RedisModule_ReplyWithDouble(ctx, (double)(v)); \
    (n) += 2;                                      \
  } while (0)
  ForkGC *gc = gcCtx;

  int n = 0;
  RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
  if (gc) {
    // Before the first completed cycle numCycles is 0; report 0 rather than
    // the NaN that 0/0 would produce.
    double avgCycleMs =
        gc->stats.numCycles ? (double)gc->stats.totalMSRun / gc->stats.numCycles : 0;
    REPLY_KVNUM(n, "bytes_collected", gc->stats.totalCollected);
    REPLY_KVNUM(n, "total_ms_run", gc->stats.totalMSRun);
    REPLY_KVNUM(n, "total_cycles", gc->stats.numCycles);
    REPLY_KVNUM(n, "average_cycle_time_ms", avgCycleMs);
    REPLY_KVNUM(n, "last_run_time_ms", gc->stats.lastRunTimeMs);
    REPLY_KVNUM(n, "gc_numeric_trees_missed", gc->stats.gcNumericNodesMissed);
    REPLY_KVNUM(n, "gc_blocks_denied", gc->stats.gcBlocksDenied);
  }
  RedisModule_ReplySetArrayLength(ctx, n);
#undef REPLY_KVNUM
}
1399 
killCb(void * ctx)1400 static void killCb(void *ctx) {
1401   ForkGC *gc = ctx;
1402   gc->deleting = 1;
1403 }
1404 
deleteCb(void * ctx)1405 static void deleteCb(void *ctx) {
1406   ForkGC *gc = ctx;
1407   ++gc->deletedDocsFromLastRun;
1408 }
1409 
getIntervalCb(void * ctx)1410 static struct timespec getIntervalCb(void *ctx) {
1411   ForkGC *gc = ctx;
1412   return gc->retryInterval;
1413 }
1414 
FGC_New(const RedisModuleString * k,uint64_t specUniqueId,GCCallbacks * callbacks)1415 ForkGC *FGC_New(const RedisModuleString *k, uint64_t specUniqueId, GCCallbacks *callbacks) {
1416   ForkGC *forkGc = rm_calloc(1, sizeof(*forkGc));
1417   *forkGc = (ForkGC){
1418       .rdbPossiblyLoading = 1,
1419       .specUniqueId = specUniqueId,
1420       .type = FGC_TYPE_INKEYSPACE,
1421       .deletedDocsFromLastRun = 0,
1422   };
1423   forkGc->retryInterval.tv_sec = RSGlobalConfig.forkGcRunIntervalSec;
1424   forkGc->retryInterval.tv_nsec = 0;
1425   forkGc->ctx = RedisModule_GetThreadSafeContext(NULL);
1426   if (k) {
1427     forkGc->keyName = RedisModule_CreateStringFromString(forkGc->ctx, k);
1428     RedisModule_FreeString(forkGc->ctx, (RedisModuleString *)k);
1429   }
1430 
1431   callbacks->onTerm = onTerminateCb;
1432   callbacks->periodicCallback = periodicCb;
1433   callbacks->renderStats = statsCb;
1434   callbacks->getInterval = getIntervalCb;
1435   callbacks->kill = killCb;
1436   callbacks->onDelete = deleteCb;
1437 
1438   return forkGc;
1439 }
1440 
FGC_NewFromSpec(IndexSpec * sp,uint64_t specUniqueId,GCCallbacks * callbacks)1441 ForkGC *FGC_NewFromSpec(IndexSpec *sp, uint64_t specUniqueId, GCCallbacks *callbacks) {
1442   ForkGC *ctx = FGC_New(NULL, specUniqueId, callbacks);
1443   ctx->sp = sp;
1444   ctx->type = FGC_TYPE_NOKEYSPACE;
1445   return ctx;
1446 }
1447