1 #include "fork_gc.h"
2 #include "util/arr.h"
3 #include "search_ctx.h"
4 #include "inverted_index.h"
5 #include "redis_index.h"
6 #include "numeric_index.h"
7 #include "tag_index.h"
8 #include "time_sample.h"
9 #include <stdlib.h>
10 #include <stdbool.h>
11 #include <unistd.h>
12 #include <sys/wait.h>
13 #include <sys/resource.h>
14 #include <sys/socket.h>
15 #include "rwlock.h"
16 #include "util/khash.h"
17 #include <float.h>
18 #include "module.h"
19 #include "rmutil/rm_assert.h"
20
21 #ifdef __linux__
22 #include <sys/prctl.h>
23 #endif
24
// Indices into the gc's pipefd[] pair: the descriptor that read() is called on
// and the descriptor that write() is called on.
#define GC_READERFD 0
#define GC_WRITERFD 1
27
/** Result codes used by the parent-side receive/apply handlers. */
typedef enum {
  FGC_COLLECTED,    // Terms have been collected
  FGC_DONE,         // No more terms remain
  FGC_CHILD_ERROR,  // Pipe error, child probably crashed
  FGC_PARENT_ERROR  // Error on the parent
} FGCError;
38
/**
 * Take the lock that matches the GC type: the write side of the RW lock for
 * FGC_TYPE_NOKEYSPACE, the thread-safe context lock of `ctx` otherwise.
 *
 * @return 1 with the lock held; 0 if gc->deleting was set, in which case the
 *         lock has already been released again and the caller must not unlock.
 */
static int __attribute__((warn_unused_result)) FGC_lock(ForkGC *gc, RedisModuleCtx *ctx) {
  const int noKeyspace = (gc->type == FGC_TYPE_NOKEYSPACE);

  if (noKeyspace) {
    RWLOCK_ACQUIRE_WRITE();
  } else {
    RedisModule_ThreadSafeContextLock(ctx);
  }

  if (!gc->deleting) {
    return 1;
  }

  // The GC is being torn down: back out of the lock we just took.
  if (noKeyspace) {
    RWLOCK_RELEASE();
  } else {
    RedisModule_ThreadSafeContextUnlock(ctx);
  }
  return 0;
}
55
/** Release the lock previously obtained through a successful FGC_lock(). */
static void FGC_unlock(ForkGC *gc, RedisModuleCtx *ctx) {
  if (gc->type != FGC_TYPE_NOKEYSPACE) {
    RedisModule_ThreadSafeContextUnlock(ctx);
    return;
  }
  RWLOCK_RELEASE();
}
63
FGC_getSctx(ForkGC * gc,RedisModuleCtx * ctx)64 static RedisSearchCtx *FGC_getSctx(ForkGC *gc, RedisModuleCtx *ctx) {
65 RedisSearchCtx *sctx = NULL;
66 if (gc->type == FGC_TYPE_NOKEYSPACE) {
67 sctx = rm_malloc(sizeof(*sctx));
68 *sctx = (RedisSearchCtx)SEARCH_CTX_STATIC(ctx, gc->sp);
69 } else if (gc->type == FGC_TYPE_INKEYSPACE) {
70 sctx = NewSearchCtx(ctx, (RedisModuleString *)gc->keyName, false);
71 }
72 return sctx;
73 }
74
/**
 * Account for one applied repair: add the reclaimed bytes to the GC's running
 * total and subtract the removed records/bytes from the index spec's stats.
 */
static void FGC_updateStats(RedisSearchCtx *sctx, ForkGC *gc, size_t recordsRemoved,
                            size_t bytesCollected) {
  gc->stats.totalCollected += bytesCollected;
  sctx->spec->stats.invertedSize -= bytesCollected;
  sctx->spec->stats.numRecords -= recordsRemoved;
}
81
/**
 * Write exactly `len` bytes from `buff` to the GC pipe's write end.
 *
 * write() may legitimately transfer fewer bytes than requested, or fail with
 * EINTR when a signal arrives; both cases are retried until the whole buffer
 * has been written. Any other failure terminates the process with exit(1),
 * so this function only returns on success.
 *
 * @param fgc  GC whose pipefd[GC_WRITERFD] is written to
 * @param buff data to send
 * @param len  number of bytes to send; must be > 0 (asserted)
 */
static void FGC_sendFixed(ForkGC *fgc, const void *buff, size_t len) {
  RS_LOG_ASSERT(len > 0, "buffer length cannot be 0");
  const char *cur = buff;
  size_t remaining = len;

  while (remaining > 0) {
    ssize_t nwritten = write(fgc->pipefd[GC_WRITERFD], cur, remaining);
    if (nwritten > 0) {
      // Possibly a short write: advance and send the rest.
      cur += nwritten;
      remaining -= (size_t)nwritten;
      continue;
    }
    if (nwritten < 0 && errno == EINTR) {
      // Interrupted before any byte was transferred; simply retry.
      continue;
    }

    perror("broken pipe, exiting GC fork: write() failed");
    // just exit, do not abort(), which will trigger a watchdog on RLEC, causing adverse effects
    RedisModule_Log(NULL, "warning", "GC fork: broken pipe, exiting");
    exit(1);
  }
}
92
93 #define FGC_SEND_VAR(fgc, v) FGC_sendFixed(fgc, &v, sizeof v)
94
FGC_sendBuffer(ForkGC * fgc,const void * buff,size_t len)95 static void FGC_sendBuffer(ForkGC *fgc, const void *buff, size_t len) {
96 FGC_SEND_VAR(fgc, len);
97 if (len > 0) {
98 FGC_sendFixed(fgc, buff, len);
99 }
100 }
101
102 static int FGC_recvFixed(ForkGC *fgc, void *buf, size_t len);
103
104 /**
105 * Send instead of a string to indicate that no more buffers are to be received
106 */
FGC_sendTerminator(ForkGC * fgc)107 static void FGC_sendTerminator(ForkGC *fgc) {
108 size_t smax = SIZE_MAX;
109 FGC_SEND_VAR(fgc, smax);
110 }
111
/**
 * Read exactly `len` bytes from the GC pipe's read end into `buf`.
 *
 * Short reads are continued and EINTR is retried. End-of-file (read()
 * returning 0) while bytes are still outstanding is reported as an error:
 * no further data can arrive once all writers have closed the pipe, so
 * retrying would spin forever.
 *
 * @return REDISMODULE_OK once `len` bytes were read, REDISMODULE_ERR on a
 *         read error or premature EOF.
 */
static int __attribute__((warn_unused_result)) FGC_recvFixed(ForkGC *fgc, void *buf, size_t len) {
  // Use a char pointer for the cursor; arithmetic on void* is not standard C.
  char *dst = buf;
  while (len > 0) {
    ssize_t nrecvd = read(fgc->pipefd[GC_READERFD], dst, len);
    if (nrecvd > 0) {
      dst += nrecvd;
      len -= (size_t)nrecvd;
    } else if (nrecvd == 0) {
      printf("Got EOF while reading from pipe (%zu bytes still expected)", len);
      return REDISMODULE_ERR;
    } else if (errno != EINTR) {
      printf("Got error while reading from pipe (%s)", strerror(errno));
      return REDISMODULE_ERR;
    }
  }
  return REDISMODULE_OK;
}
125
/**
 * Receive a fixed-size object; on failure make the *enclosing function*
 * return REDISMODULE_ERR.
 */
#define TRY_RECV_FIXED(gc, obj, len)                          \
  if (FGC_recvFixed((gc), (obj), (len)) != REDISMODULE_OK) {  \
    return REDISMODULE_ERR;                                   \
  }
130
131 static void *RECV_BUFFER_EMPTY = (void *)0x0deadbeef;
132
133 static int __attribute__((warn_unused_result))
FGC_recvBuffer(ForkGC * fgc,void ** buf,size_t * len)134 FGC_recvBuffer(ForkGC *fgc, void **buf, size_t *len) {
135 TRY_RECV_FIXED(fgc, len, sizeof *len);
136 if (*len == SIZE_MAX) {
137 *buf = RECV_BUFFER_EMPTY;
138 return REDISMODULE_OK;
139 }
140 if (*len == 0) {
141 *buf = NULL;
142 return REDISMODULE_OK;
143 }
144
145 *buf = rm_malloc(*len + 1);
146 ((char *)(*buf))[*len] = 0;
147 if (FGC_recvFixed(fgc, *buf, *len) != REDISMODULE_OK) {
148 rm_free(buf);
149 return REDISMODULE_ERR;
150 }
151 return REDISMODULE_OK;
152 }
153
/**
 * Receive a length-prefixed buffer; on failure make the *enclosing function*
 * return REDISMODULE_ERR.
 */
#define TRY_RECV_BUFFER(gc, buf, len)                          \
  if (FGC_recvBuffer((gc), (buf), (len)) != REDISMODULE_OK) {  \
    return REDISMODULE_ERR;                                    \
  }
158
/**
 * Summary of one repaired inverted index. The child writes this struct to the
 * pipe as raw bytes, so the field order and types must not change.
 */
typedef struct {
  uint32_t nblocksOrig;      // Number of blocks prior to repair
  uint32_t nblocksRepaired;  // Number of blocks repaired
  uint64_t nbytesCollected;  // Number of bytes cleaned in inverted index
  uint64_t ndocsCollected;   // Number of document records removed

  /** Specific information about the _last_ index block */
  size_t lastblkDocsRemoved;
  size_t lastblkBytesCollected;
  size_t lastblkNumDocs;
} MSG_IndexInfo;
174
175 /** Structure sent describing an index block */
176 typedef struct {
177 IndexBlock blk;
178 int64_t oldix; // Old position of the block
179 int64_t newix; // New position of the block
180 // the actual content of the block follows...
181 } MSG_RepairedBlock;
182
/** Message describing one block that became empty and is to be removed. */
typedef struct {
  void *ptr;       // Address of the buffer to free
  uint32_t oldix;  // Old index of deleted block
  uint32_t _pad;   // Explicit padding; otherwise these bytes are read uninitialized
} MSG_DeletedBlock;
188
189 /**
190 * headerCallback and hdrarg are invoked before the inverted index is sent, only
191 * iff the inverted index was repaired.
192 * RepairCallback and its argument are passed directly to IndexBlock_Repair; see
193 * that function for more details.
194 */
FGC_childRepairInvidx(ForkGC * gc,RedisSearchCtx * sctx,InvertedIndex * idx,void (* headerCallback)(ForkGC *,void *),void * hdrarg,IndexRepairParams * params)195 static bool FGC_childRepairInvidx(ForkGC *gc, RedisSearchCtx *sctx, InvertedIndex *idx,
196 void (*headerCallback)(ForkGC *, void *), void *hdrarg,
197 IndexRepairParams *params) {
198 MSG_RepairedBlock *fixed = array_new(MSG_RepairedBlock, 10);
199 MSG_DeletedBlock *deleted = array_new(MSG_DeletedBlock, 10);
200 IndexBlock *blocklist = array_new(IndexBlock, idx->size);
201 MSG_IndexInfo ixmsg = {.nblocksOrig = idx->size};
202 IndexRepairParams params_s = {0};
203 bool rv = false;
204 if (!params) {
205 params = ¶ms_s;
206 }
207
208 for (size_t i = 0; i < idx->size; ++i) {
209 params->bytesCollected = 0;
210 params->bytesBeforFix = 0;
211 params->bytesAfterFix = 0;
212 IndexBlock *blk = idx->blocks + i;
213 if (blk->lastId - blk->firstId > UINT32_MAX) {
214 // Skip over blocks which have a wide variation. In the future we might
215 // want to split a block into two (or more) on high-delta boundaries.
216 // todo: is it ok??
217 blocklist = array_append(blocklist, *blk);
218 continue;
219 }
220
221 // Capture the pointer address before the block is cleared; otherwise
222 // the pointer might be freed!
223 void *bufptr = blk->buf.data;
224 int nrepaired = IndexBlock_Repair(blk, &sctx->spec->docs, idx->flags, params);
225 // We couldn't repair the block - return 0
226 if (nrepaired == -1) {
227 goto done;
228 } else if (nrepaired == 0) {
229 // unmodified block
230 blocklist = array_append(blocklist, *blk);
231 continue;
232 }
233
234 if (blk->numDocs == 0) {
235 // this block should be removed
236 MSG_DeletedBlock *delmsg = array_ensure_tail(&deleted, MSG_DeletedBlock);
237 *delmsg = (MSG_DeletedBlock){.ptr = bufptr, .oldix = i};
238 } else {
239 blocklist = array_append(blocklist, *blk);
240 MSG_RepairedBlock *fixmsg = array_ensure_tail(&fixed, MSG_RepairedBlock);
241 fixmsg->newix = array_len(blocklist) - 1;
242 fixmsg->oldix = i;
243 fixmsg->blk = *blk;
244 ixmsg.nblocksRepaired++;
245 }
246
247 ixmsg.nbytesCollected += (params->bytesBeforFix - params->bytesAfterFix);
248 ixmsg.ndocsCollected += nrepaired;
249 if (i == idx->size - 1) {
250 ixmsg.lastblkBytesCollected = ixmsg.nbytesCollected;
251 ixmsg.lastblkDocsRemoved = nrepaired;
252 ixmsg.lastblkNumDocs = blk->numDocs + nrepaired;
253 }
254 }
255
256 if (array_len(fixed) == 0 && array_len(deleted) == 0) {
257 // No blocks were removed or repaired
258 goto done;
259 }
260
261 headerCallback(gc, hdrarg);
262 FGC_sendFixed(gc, &ixmsg, sizeof ixmsg);
263 if (array_len(blocklist) == idx->size) {
264 // no empty block, there is no need to send the blocks array. Don't send
265 // any new blocks
266 FGC_sendBuffer(gc, NULL, 0);
267 } else {
268 FGC_sendBuffer(gc, blocklist, array_len(blocklist) * sizeof(*blocklist));
269 }
270 FGC_sendBuffer(gc, deleted, array_len(deleted) * sizeof(*deleted));
271
272 for (size_t i = 0; i < array_len(fixed); ++i) {
273 // write fix block
274 const MSG_RepairedBlock *msg = fixed + i;
275 const IndexBlock *blk = blocklist + msg->newix;
276 FGC_sendFixed(gc, msg, sizeof(*msg));
277 FGC_sendBuffer(gc, IndexBlock_DataBuf(blk), IndexBlock_DataLen(blk));
278 }
279 rv = true;
280
281 done:
282 array_free(fixed);
283 array_free(blocklist);
284 array_free(deleted);
285 return rv;
286 }
287
/**
 * Header callback for term indexes: `arg` points to a {base, length} pair
 * describing the term, which is sent as a length-prefixed buffer.
 */
static void sendHeaderString(ForkGC *gc, void *arg) {
  struct iovec { void *iov_base; size_t iov_len; };
  const struct iovec *hdr = arg;
  FGC_sendBuffer(gc, hdr->iov_base, hdr->iov_len);
}
293
/**
 * Child side: iterate over every term in the spec's terms trie, repair the
 * term's inverted index (sending the result, prefixed by the term, when it
 * changed), and finally send a terminator.
 */
static void FGC_childCollectTerms(ForkGC *gc, RedisSearchCtx *sctx) {
  rune *runes = NULL;
  t_len numRunes = 0;
  float score = 0;
  int dist = 0;

  TrieIterator *trieIt = Trie_Iterate(sctx->spec->terms, "", 0, 0, 1);
  while (TrieIterator_Next(trieIt, &runes, &numRunes, NULL, &score, &dist)) {
    size_t termLen;
    char *term = runesToStr(runes, numRunes, &termLen);
    RedisModuleKey *idxKey = NULL;
    InvertedIndex *idx = Redis_OpenInvertedIndexEx(sctx, term, strlen(term), 1, &idxKey);
    if (idx) {
      // The header sent ahead of a repaired index is the term itself.
      struct iovec { void *iov_base; size_t iov_len; } hdr = {.iov_base = term, .iov_len = termLen};
      FGC_childRepairInvidx(gc, sctx, idx, sendHeaderString, &hdr, NULL);
    }
    if (idxKey) {
      RedisModule_CloseKey(idxKey);
    }
    rm_free(term);
  }

  // The iterator's filter context is released separately from the iterator.
  DFAFilter_Free(trieIt->ctx);
  rm_free(trieIt->ctx);
  TrieIterator_Free(trieIt);

  // we are done with terms
  FGC_sendTerminator(gc);
}
323
324 KHASH_MAP_INIT_INT64(cardvals, size_t)
325
326 typedef struct {
327 const IndexBlock *lastblk;
328 khash_t(cardvals) * delLast;
329 khash_t(cardvals) * delRest;
330 } numCbCtx;
331
/**
 * 64-bit integer / double overlay used for cardvals keys.
 * Note that a brace initializer `{x}` initializes the first member (u64), so
 * the member order matters to the code that uses this type.
 */
typedef union {
  uint64_t u64;
  double d48;
} numUnion;
336
countDeleted(const RSIndexResult * r,const IndexBlock * blk,void * arg)337 static void countDeleted(const RSIndexResult *r, const IndexBlock *blk, void *arg) {
338 numCbCtx *ctx = arg;
339 khash_t(cardvals) *ht = NULL;
340 if (blk == ctx->lastblk) {
341 if ((ht = ctx->delLast) == NULL) {
342 ht = ctx->delLast = kh_init(cardvals);
343 }
344 } else if ((ht = ctx->delRest) == NULL) {
345 ht = ctx->delRest = kh_init(cardvals);
346 }
347 RS_LOG_ASSERT(ht, "cardvals should not be NULL");
348 int added = 0;
349 numUnion u = {r->num.value};
350 khiter_t it = kh_put(cardvals, ht, u.u64, &added);
351 if (!added) {
352 // i.e. already existed
353 kh_val(ht, it)++;
354 } else {
355 kh_val(ht, it) = 0;
356 }
357 }
358
/** Argument of sendNumericTagHeader(): what to send ahead of a repaired index. */
typedef struct {
  int type;           // RSFLDTYPE_NUMERIC or RSFLDTYPE_TAG
  const char *field;  // Field name; sent once per field
  const void *curPtr; // Address of the numeric node / tag index being repaired
  char *tagValue;     // Tag value (sent for RSFLDTYPE_TAG only)
  size_t tagLen;      // Length of tagValue
  uint64_t uniqueId;  // Unique id of the numeric tree / tag index; sent with the field name
  int sentFieldName;  // Set to 1 once the field name and id have been sent
} tagNumHeader;
368
/**
 * Header callback for numeric and tag indexes. The first call for a field
 * sends the field name and unique id; every call sends the current pointer
 * and, for tag fields, the tag value.
 */
static void sendNumericTagHeader(ForkGC *fgc, void *arg) {
  tagNumHeader *hdr = arg;

  if (!hdr->sentFieldName) {
    hdr->sentFieldName = 1;
    FGC_sendBuffer(fgc, hdr->field, strlen(hdr->field));
    FGC_sendFixed(fgc, &hdr->uniqueId, sizeof hdr->uniqueId);
  }

  FGC_sendFixed(fgc, &hdr->curPtr, sizeof hdr->curPtr);
  if (hdr->type != RSFLDTYPE_TAG) {
    return;
  }
  FGC_sendBuffer(fgc, hdr->tagValue, hdr->tagLen);
}
381
382 // If anything other than FGC_COLLECTED is returned, it is an error or done
recvNumericTagHeader(ForkGC * fgc,char ** fieldName,size_t * fieldNameLen,uint64_t * id)383 static FGCError recvNumericTagHeader(ForkGC *fgc, char **fieldName, size_t *fieldNameLen,
384 uint64_t *id) {
385 if (FGC_recvBuffer(fgc, (void **)fieldName, fieldNameLen) != REDISMODULE_OK) {
386 return FGC_PARENT_ERROR;
387 }
388 if (*fieldName == RECV_BUFFER_EMPTY) {
389 *fieldName = NULL;
390 return FGC_DONE;
391 }
392
393 if (FGC_recvFixed(fgc, id, sizeof(*id)) != REDISMODULE_OK) {
394 rm_free(*fieldName);
395 *fieldName = NULL;
396 return FGC_PARENT_ERROR;
397 }
398 return FGC_COLLECTED;
399 }
400
/**
 * Send a cardvals map as an entry count followed by one CardinalityValue per
 * entry. A NULL map is sent as a count of zero.
 */
static void sendKht(ForkGC *gc, const khash_t(cardvals) * kh) {
  size_t n = kh ? kh_size(kh) : 0;
  FGC_SEND_VAR(gc, n);
  if (!kh) {
    return;
  }

  size_t nsent = 0;
  for (khiter_t it = kh_begin(kh); it != kh_end(kh); ++it) {
    if (kh_exist(kh, it)) {
      numUnion u = {kh_key(kh, it)};
      CardinalityValue cu = {.value = u.d48, .appearances = kh_val(kh, it)};
      FGC_SEND_VAR(gc, cu);
      nsent++;
    }
  }
  RS_LOG_ASSERT(nsent == n, "Not all hashes has been sent");
}
423
/**
 * Child side: for every NUMERIC/GEO field, walk the nodes of its numeric
 * range tree and repair each node's inverted index. A repaired index is
 * followed on the pipe by two cardvals maps (values deleted from the non-last
 * blocks, then from the last block). Each field that sent anything is closed
 * with a NULL pointer; the whole section is closed with a terminator.
 */
static void FGC_childCollectNumeric(ForkGC *gc, RedisSearchCtx *sctx) {
  RedisModuleKey *idxKey = NULL;
  FieldSpec **numericFields = getFieldsByType(sctx->spec, INDEXFLD_T_NUMERIC | INDEXFLD_T_GEO);

  for (int i = 0; i < array_len(numericFields); ++i) {
    FieldSpec *field = numericFields[i];
    RedisModuleString *keyName = IndexSpec_GetFormattedKey(sctx->spec, field, INDEXFLD_T_NUMERIC);
    NumericRangeTree *rt = OpenNumericIndex(sctx, keyName, &idxKey);
    NumericRangeTreeIterator *gcIterator = NumericRangeTreeIterator_New(rt);

    tagNumHeader header = {
        .type = RSFLDTYPE_NUMERIC, .field = field->name, .uniqueId = rt->uniqueId};

    NumericRangeNode *node = NULL;
    while ((node = NumericRangeTreeIterator_Next(gcIterator)) != NULL) {
      if (!node->range) {
        continue;
      }

      InvertedIndex *idx = node->range->entries;
      numCbCtx nctx = {0};
      nctx.lastblk = idx->blocks + idx->size - 1;
      IndexRepairParams params = {.RepairCallback = countDeleted, .arg = &nctx};
      header.curPtr = node;

      if (FGC_childRepairInvidx(gc, sctx, idx, sendNumericTagHeader, &header, &params)) {
        sendKht(gc, nctx.delRest);
        sendKht(gc, nctx.delLast);
      }

      if (nctx.delRest) {
        kh_destroy(cardvals, nctx.delRest);
      }
      if (nctx.delLast) {
        kh_destroy(cardvals, nctx.delLast);
      }
    }

    if (header.sentFieldName) {
      // If we've repaired at least one entry, send the terminator;
      // note that "terminator" just means a zero address and not the
      // "no more strings" terminator in FGC_sendTerminator
      void *noMoreNodes = NULL;
      FGC_SEND_VAR(gc, noMoreNodes);
    }

    if (idxKey) {
      RedisModule_CloseKey(idxKey);
    }

    NumericRangeTreeIterator_Free(gcIterator);
  }

  // we are done with numeric fields
  FGC_sendTerminator(gc);
}
481
/**
 * Child side: for every TAG field, iterate over the tag index's values and
 * repair each value's inverted index. Each field that sent anything is closed
 * with a NULL pointer; the whole section is closed with a terminator.
 */
static void FGC_childCollectTags(ForkGC *gc, RedisSearchCtx *sctx) {
  RedisModuleKey *idxKey = NULL;
  FieldSpec **tagFields = getFieldsByType(sctx->spec, INDEXFLD_T_TAG);

  // An empty field list simply skips the loop.
  for (int i = 0; i < array_len(tagFields); ++i) {
    FieldSpec *field = tagFields[i];
    RedisModuleString *keyName = IndexSpec_GetFormattedKey(sctx->spec, field, INDEXFLD_T_TAG);
    TagIndex *tagIdx = TagIndex_Open(sctx, keyName, false, &idxKey);
    if (!tagIdx) {
      continue;
    }

    tagNumHeader header = {
        .type = RSFLDTYPE_TAG, .field = field->name, .uniqueId = tagIdx->uniqueId};

    // NOTE(review): this iterator is not released anywhere in this function - confirm
    // whether that is intentional.
    TrieMapIterator *valuesIt = TrieMap_Iterate(tagIdx->values, "", 0);
    char *tag;
    tm_len_t tagLen;
    InvertedIndex *invidx;
    while (TrieMapIterator_Next(valuesIt, &tag, &tagLen, (void **)&invidx)) {
      header.curPtr = invidx;
      header.tagValue = tag;
      header.tagLen = tagLen;
      // send repaired data
      FGC_childRepairInvidx(gc, sctx, invidx, sendNumericTagHeader, &header, NULL);
    }

    // we are done with the current field
    if (header.sentFieldName) {
      void *noMoreValues = NULL;
      FGC_SEND_VAR(gc, noMoreValues);
    }

    if (idxKey) {
      RedisModule_CloseKey(idxKey);
    }
  }

  // we are done with tag fields
  FGC_sendTerminator(gc);
}
524
/**
 * Child side entry point: scan the term, numeric and tag indexes (in that
 * order) and stream the repairs to the parent.
 *
 * Nothing is scanned or sent if the search context cannot be created or if
 * the spec's unique id no longer matches the one recorded in the GC.
 */
static void FGC_childScanIndexes(ForkGC *gc) {
  RedisSearchCtx *sctx = FGC_getSctx(gc, gc->ctx);
  if (!sctx || sctx->spec->uniqueId != gc->specUniqueId) {
    // write log here
    if (sctx) {
      // The context was created but belongs to a different spec: release it
      // instead of leaking it on this early return.
      SearchCtx_Free(sctx);
    }
    return;
  }

  FGC_childCollectTerms(gc, sctx);
  FGC_childCollectNumeric(gc, sctx);
  FGC_childCollectTags(gc, sctx);

  SearchCtx_Free(sctx);
}
538
539 typedef struct {
540 MSG_DeletedBlock *delBlocks;
541 size_t numDelBlocks;
542
543 MSG_RepairedBlock *changedBlocks;
544
545 IndexBlock *newBlocklist;
546 size_t newBlocklistSize;
547 int lastBlockIgnored;
548 } InvIdxBuffers;
549
550 static int __attribute__((warn_unused_result))
FGC_recvRepairedBlock(ForkGC * gc,MSG_RepairedBlock * binfo)551 FGC_recvRepairedBlock(ForkGC *gc, MSG_RepairedBlock *binfo) {
552 if (FGC_recvFixed(gc, binfo, sizeof(*binfo)) != REDISMODULE_OK) {
553 return REDISMODULE_ERR;
554 }
555 Buffer *b = &binfo->blk.buf;
556 if (FGC_recvBuffer(gc, (void **)&b->data, &b->offset) != REDISMODULE_OK) {
557 return REDISMODULE_ERR;
558 }
559 b->cap = b->offset;
560 return REDISMODULE_OK;
561 }
562
563 static int __attribute__((warn_unused_result))
FGC_recvInvIdx(ForkGC * gc,InvIdxBuffers * bufs,MSG_IndexInfo * info)564 FGC_recvInvIdx(ForkGC *gc, InvIdxBuffers *bufs, MSG_IndexInfo *info) {
565 size_t nblocksRecvd = 0;
566 if (FGC_recvFixed(gc, info, sizeof(*info)) != REDISMODULE_OK) {
567 return REDISMODULE_ERR;
568 }
569 if (FGC_recvBuffer(gc, (void **)&bufs->newBlocklist, &bufs->newBlocklistSize) != REDISMODULE_OK) {
570 return REDISMODULE_ERR;
571 }
572
573 if (bufs->newBlocklistSize) {
574 bufs->newBlocklistSize /= sizeof(*bufs->newBlocklist);
575 }
576 if (FGC_recvBuffer(gc, (void **)&bufs->delBlocks, &bufs->numDelBlocks) != REDISMODULE_OK) {
577 goto error;
578 }
579 bufs->numDelBlocks /= sizeof(*bufs->delBlocks);
580 bufs->changedBlocks = rm_malloc(sizeof(*bufs->changedBlocks) * info->nblocksRepaired);
581 for (size_t i = 0; i < info->nblocksRepaired; ++i) {
582 if (FGC_recvRepairedBlock(gc, bufs->changedBlocks + i) != REDISMODULE_OK) {
583 goto error;
584 }
585 nblocksRecvd++;
586 }
587 return REDISMODULE_OK;
588
589 error:
590 rm_free(bufs->newBlocklist);
591 for (size_t ii = 0; ii < nblocksRecvd; ++ii) {
592 rm_free(bufs->changedBlocks[ii].blk.buf.data);
593 }
594 rm_free(bufs->changedBlocks);
595 memset(bufs, 0, sizeof(*bufs));
596 return REDISMODULE_ERR;
597 }
598
/**
 * Release every allocation held by `bufs`: both block lists, the data buffer
 * of each of the info->nblocksRepaired changed blocks, and the changed-block
 * array itself. Intended for buffers that were received but not applied.
 */
static void freeInvIdx(InvIdxBuffers *bufs, MSG_IndexInfo *info) {
  rm_free(bufs->newBlocklist);
  rm_free(bufs->delBlocks);

  // changedBlocks could be null because of pipe error
  MSG_RepairedBlock *changed = bufs->changedBlocks;
  if (changed != NULL) {
    for (size_t n = 0; n < info->nblocksRepaired; ++n) {
      rm_free(changed[n].blk.buf.data);
    }
  }
  rm_free(changed);
}
611
/**
 * Detect the case where the last block the child saw was repaired by the
 * child AND changed by the parent in the meantime. In that case the child's
 * work on that block is discarded: the parent's current block is kept, the
 * last block's contribution is subtracted from the collected totals,
 * idxData->lastBlockIgnored is set and gc->stats.gcBlocksDenied is bumped.
 */
static void checkLastBlock(ForkGC *gc, InvIdxBuffers *idxData, MSG_IndexInfo *info,
                           InvertedIndex *idx) {
  IndexBlock *parentLast = idx->blocks + info->nblocksOrig - 1;

  // Nothing to reconcile if the child didn't touch the last block, or if the
  // parent didn't (its doc count still matches what the child saw).
  if (info->lastblkDocsRemoved == 0 || info->lastblkNumDocs == parentLast->numDocs) {
    return;
  }

  if (info->lastblkDocsRemoved != info->lastblkNumDocs) {
    // Last block was modified on the child and on the parent.

    // The repaired copy is the last entry of changedBlocks; drop it.
    MSG_RepairedBlock *dropped = idxData->changedBlocks + info->nblocksRepaired - 1;
    indexBlock_Free(&dropped->blk);
    info->nblocksRepaired--;

    // Then add it to newBlocklist if newBlocklist is not NULL.
    // If newBlocklist!=NULL then the last block must be there (it was changed and not deleted)
    // If newBlocklist==NULL then by decreasing the nblocksOrig by one we make sure to keep the last
    // block
    if (idxData->newBlocklist) {
      idxData->newBlocklist[idxData->newBlocklistSize - 1] = *parentLast;
    } else {
      --info->nblocksOrig;
    }
  } else {
    // Last block was deleted entirely by the child while it was updated on
    // the main process. Remove it from the delBlocks list...
    idxData->numDelBlocks--;

    // ...and append the parent's version to the newBlocklist.
    idxData->newBlocklistSize++;
    idxData->newBlocklist = rm_realloc(idxData->newBlocklist,
                                       sizeof(*idxData->newBlocklist) * idxData->newBlocklistSize);
    idxData->newBlocklist[idxData->newBlocklistSize - 1] = *parentLast;
  }

  info->ndocsCollected -= info->lastblkDocsRemoved;
  info->nbytesCollected -= info->lastblkBytesCollected;
  idxData->lastBlockIgnored = 1;
  gc->stats.gcBlocksDenied++;
}
658
/**
 * Parent side: apply the child's repair of one inverted index to `idx`.
 *
 * Ownership: idxData->delBlocks is freed here; idxData->newBlocklist (when
 * present) becomes idx->blocks; the repaired blocks' data buffers are moved
 * into idx. The idxData->changedBlocks array itself is left for the caller
 * to free.
 */
static void FGC_applyInvertedIndex(ForkGC *gc, InvIdxBuffers *idxData, MSG_IndexInfo *info,
                                   InvertedIndex *idx) {
  // May drop the child's work on the last block and adjust idxData/info.
  checkLastBlock(gc, idxData, info, idx);

  // Free the parent's old copy of every block that is about to be replaced...
  for (size_t n = 0; n < info->nblocksRepaired; ++n) {
    indexBlock_Free(&idx->blocks[idxData->changedBlocks[n].oldix]);
  }
  // ...and the buffers of blocks that were deleted entirely.
  for (size_t n = 0; n < idxData->numDelBlocks; ++n) {
    rm_free(idxData->delBlocks[n].ptr);
  }
  rm_free(idxData->delBlocks);

  // Ensure the old index is at least as big as the new index' size
  RS_LOG_ASSERT(idx->size >= info->nblocksOrig, "Old index should be larger or equal to new index");

  // Number of blocks added in the parent process since the last scan
  const size_t numAdded = idx->size - info->nblocksOrig;

  if (idxData->newBlocklist) {
    /**
     * At this point, we check if the last block has had new data added to it,
     * but was _not_ repaired. We check for a repaired last block in
     * checkLastBlock().
     */
    if (!info->lastblkDocsRemoved) {
      /**
       * Last block was unmodified-- let's prefer the last block's pointer
       * over our own (which may be stale).
       * If the last block was repaired, this is handled above
       */
      idxData->newBlocklist[idxData->newBlocklistSize - 1] = idx->blocks[info->nblocksOrig - 1];
    }

    // The final size is the reordered block size, plus the number of blocks
    // which we haven't scanned yet, because they were added in the parent
    const size_t totalLen = idxData->newBlocklistSize + numAdded;

    idxData->newBlocklist =
        rm_realloc(idxData->newBlocklist, totalLen * sizeof(*idxData->newBlocklist));
    memcpy(idxData->newBlocklist + idxData->newBlocklistSize, idx->blocks + info->nblocksOrig,
           numAdded * sizeof(*idxData->newBlocklist));

    rm_free(idx->blocks);
    idxData->newBlocklistSize = totalLen;
    idx->blocks = idxData->newBlocklist;
    idx->size = totalLen;
  } else if (idxData->numDelBlocks) {
    // In this case, all blocks the child has seen need to be deleted. We don't
    // get a new block list, because they are all gone..
    if (numAdded) {
      memmove(idx->blocks, idx->blocks + info->nblocksOrig, sizeof(*idx->blocks) * numAdded);
    }
    idx->size = numAdded;
    if (idx->size == 0) {
      InvertedIndex_AddBlock(idx, 0);
    }
  }

  // Install the repaired blocks at their new positions.
  for (size_t n = 0; n < info->nblocksRepaired; ++n) {
    const MSG_RepairedBlock *repaired = idxData->changedBlocks + n;
    idx->blocks[repaired->newix] = repaired->blk;
  }

  idx->numDocs -= info->ndocsCollected;
  idx->gcMarker++;
}
729
FGC_parentHandleTerms(ForkGC * gc,RedisModuleCtx * rctx)730 static FGCError FGC_parentHandleTerms(ForkGC *gc, RedisModuleCtx *rctx) {
731 FGCError status = FGC_COLLECTED;
732 size_t len;
733 int hasLock = 0;
734 char *term = NULL;
735 if (FGC_recvBuffer(gc, (void **)&term, &len) != REDISMODULE_OK) {
736 return FGC_CHILD_ERROR;
737 }
738 RedisModuleKey *idxKey = NULL;
739 RedisSearchCtx *sctx = NULL;
740
741 if (term == RECV_BUFFER_EMPTY) {
742 return FGC_DONE;
743 }
744
745 InvIdxBuffers idxbufs = {0};
746 MSG_IndexInfo info = {0};
747 if (FGC_recvInvIdx(gc, &idxbufs, &info) != REDISMODULE_OK) {
748 rm_free(term);
749 return FGC_CHILD_ERROR;
750 }
751
752 if (!FGC_lock(gc, rctx)) {
753 status = FGC_PARENT_ERROR;
754 goto cleanup;
755 }
756
757 hasLock = 1;
758 sctx = FGC_getSctx(gc, rctx);
759 if (!sctx || sctx->spec->uniqueId != gc->specUniqueId) {
760 status = FGC_PARENT_ERROR;
761 goto cleanup;
762 }
763
764 InvertedIndex *idx = Redis_OpenInvertedIndexEx(sctx, term, len, 1, &idxKey);
765
766 if (idx == NULL) {
767 status = FGC_PARENT_ERROR;
768 goto cleanup;
769 }
770
771 FGC_applyInvertedIndex(gc, &idxbufs, &info, idx);
772 FGC_updateStats(sctx, gc, info.ndocsCollected, info.nbytesCollected);
773
774 if (idx->numDocs == 0) {
775 // inverted index was cleaned entirely lets free it
776 RedisModuleString *termKey = fmtRedisTermKey(sctx, term, len);
777 size_t formatedTremLen;
778 const char *formatedTrem = RedisModule_StringPtrLen(termKey, &formatedTremLen);
779 if (sctx->spec->keysDict) {
780 dictDelete(sctx->spec->keysDict, termKey);
781 }
782 Trie_Delete(sctx->spec->terms, term, len);
783 RedisModule_FreeString(sctx->redisCtx, termKey);
784 }
785
786 cleanup:
787
788 if (idxKey) {
789 RedisModule_CloseKey(idxKey);
790 }
791 if (sctx) {
792 SearchCtx_Free(sctx);
793 }
794 if (hasLock) {
795 FGC_unlock(gc, rctx);
796 }
797 rm_free(term);
798 if (status != FGC_COLLECTED) {
799 freeInvIdx(&idxbufs, &info);
800 } else {
801 rm_free(idxbufs.changedBlocks);
802 }
803 return status;
804 }
805
806 typedef struct {
807 // Node in the tree that was GC'd
808 NumericRangeNode *node;
809 CardinalityValue *lastBlockDeleted;
810 CardinalityValue *restBlockDeleted;
811 size_t nlastBlockDel;
812 size_t nrestBlockDel;
813 InvIdxBuffers idxbufs;
814 MSG_IndexInfo info;
815 } NumGcInfo;
816
/**
 * Receive an array of CardinalityValue preceded by its element count.
 *
 * On success *tgt is NULL for an empty array, otherwise a newly allocated
 * array the caller frees with rm_free(), and *len is the element count.
 * On failure *tgt may still hold an allocation that the caller must free.
 *
 * @return REDISMODULE_OK or REDISMODULE_ERR
 */
static int recvCardvals(ForkGC *fgc, CardinalityValue **tgt, size_t *len) {
  if (FGC_recvFixed(fgc, len, sizeof(*len)) != REDISMODULE_OK) {
    return REDISMODULE_ERR;
  }
  // From here on *len temporarily holds the payload size in bytes.
  *len *= sizeof(**tgt);
  if (!*len) {
    *tgt = NULL;
    return REDISMODULE_OK;
  }
  // *len is already a byte count: allocate exactly that much rather than
  // multiplying by the element size a second time.
  *tgt = rm_malloc(*len);
  int rc = FGC_recvFixed(fgc, *tgt, *len);
  if (rc == REDISMODULE_OK) {
    *len /= sizeof(**tgt);
  }
  return rc;
}
833
recvNumIdx(ForkGC * gc,NumGcInfo * ninfo)834 static FGCError recvNumIdx(ForkGC *gc, NumGcInfo *ninfo) {
835 if (FGC_recvFixed(gc, &ninfo->node, sizeof(ninfo->node)) != REDISMODULE_OK) {
836 goto error;
837 }
838 if (ninfo->node == NULL) {
839 return FGC_DONE;
840 }
841
842 if (FGC_recvInvIdx(gc, &ninfo->idxbufs, &ninfo->info) != REDISMODULE_OK) {
843 goto error;
844 }
845
846 if (recvCardvals(gc, &ninfo->restBlockDeleted, &ninfo->nrestBlockDel) != REDISMODULE_OK) {
847 goto error;
848 }
849 if (recvCardvals(gc, &ninfo->lastBlockDeleted, &ninfo->nlastBlockDel) != REDISMODULE_OK) {
850 goto error;
851 }
852
853 return FGC_COLLECTED;
854
855 error:
856 printf("Error receiving numeric index!\n");
857 freeInvIdx(&ninfo->idxbufs, &ninfo->info);
858 rm_free(ninfo->lastBlockDeleted);
859 rm_free(ninfo->restBlockDeleted);
860 memset(ninfo, 0, sizeof(*ninfo));
861 return FGC_CHILD_ERROR;
862 }
863
/**
 * Update the cardinality bookkeeping of a numeric range after GC.
 *
 * The per-value deletion counts received from the child (rest blocks, plus
 * the last block unless its repair was ignored) are subtracted from the
 * matching entries of the range's `values` array. Entries whose appearance
 * count reaches zero are removed. `unique_sum` and `card` are then recomputed
 * from the surviving entries, and for leaf nodes `minVal`/`maxVal` as well.
 */
static void resetCardinality(NumGcInfo *info, NumericRangeNode *currNone) {
  khash_t(cardvals) *kh = kh_init(cardvals);
  int added;
  for (size_t ii = 0; ii < info->nrestBlockDel; ++ii) {
    numUnion u = {info->restBlockDeleted[ii].value};
    khiter_t it = kh_put(cardvals, kh, u.u64, &added);
    kh_val(kh, it) = info->restBlockDeleted[ii].appearances;
  }
  if (!info->idxbufs.lastBlockIgnored) {
    for (size_t ii = 0; ii < info->nlastBlockDel; ++ii) {
      numUnion u = {info->lastBlockDeleted[ii].value};
      khiter_t it = kh_put(cardvals, kh, u.u64, &added);
      if (!added) {
        kh_val(kh, it) += info->lastBlockDeleted[ii].appearances;
      } else {
        kh_val(kh, it) = info->lastBlockDeleted[ii].appearances;
      }
    }
  }

  NumericRange *r = currNone->range;
  // Seed the maximum with the most negative finite double. DBL_MIN is the
  // smallest *positive* normal value, so -DBL_MIN would exceed every
  // ordinary negative number and yield a wrong maximum for all-negative ranges.
  double minVal = DBL_MAX, maxVal = -DBL_MAX, uniqueSum = 0;

  size_t ii = 0;
  while (ii < array_len(r->values)) {
    numUnion u = {r->values[ii].value};
    khiter_t it = kh_get(cardvals, kh, u.u64);
    if (it != kh_end(kh) && (r->values[ii].appearances -= kh_val(kh, it)) == 0) {
      // No appearances left: delete this entry. The fast delete may move
      // another entry into slot ii, so the same slot is examined again.
      array_del_fast(r->values, ii);
    } else {
      minVal = MIN(minVal, r->values[ii].value);
      maxVal = MAX(maxVal, r->values[ii].value);
      uniqueSum += r->values[ii].value;
      ++ii;
    }
  }
  kh_destroy(cardvals, kh);
  // we can only update the min and the max value if the node is a leaf.
  // otherwise the min and the max also represent its children values and
  // we can not change it.
  if (NumericRangeNode_IsLeaf(currNone)) {
    r->minVal = minVal;
    r->maxVal = maxVal;
  }
  r->unique_sum = uniqueSum;
  r->card = array_len(r->values);
}
916
/**
 * Parent side: apply a received repair to a numeric range node - patch its
 * inverted index, adjust size accounting and stats, then refresh the range's
 * cardinality data.
 */
static void applyNumIdx(ForkGC *gc, RedisSearchCtx *sctx, NumGcInfo *ninfo) {
  NumericRange *range = ninfo->node->range;
  MSG_IndexInfo *info = &ninfo->info;

  FGC_applyInvertedIndex(gc, &ninfo->idxbufs, info, range->entries);

  range->invertedIndexSize -= info->nbytesCollected;
  FGC_updateStats(sctx, gc, info->ndocsCollected, info->nbytesCollected);

  // TODO: fix for NUMERIC similar to TAG fix PR#2269
  // if (currNode->range->entries->numDocs == 0) {
  //   NumericRangeTree_DeleteNode(rt, (currNode->range->minVal + currNode->range->maxVal) / 2);
  // }

  resetCardinality(ninfo, ninfo->node);
}
933
FGC_parentHandleNumeric(ForkGC * gc,RedisModuleCtx * rctx)934 static FGCError FGC_parentHandleNumeric(ForkGC *gc, RedisModuleCtx *rctx) {
935 int hasLock = 0;
936 size_t fieldNameLen;
937 char *fieldName = NULL;
938 uint64_t rtUniqueId;
939 NumericRangeTree *rt = NULL;
940 FGCError status = recvNumericTagHeader(gc, &fieldName, &fieldNameLen, &rtUniqueId);
941 if (status == FGC_DONE) {
942 return FGC_DONE;
943 }
944
945 while (status == FGC_COLLECTED) {
946 NumGcInfo ninfo = {0};
947 RedisSearchCtx *sctx = NULL;
948 RedisModuleKey *idxKey = NULL;
949 FGCError status2 = recvNumIdx(gc, &ninfo);
950 if (status2 == FGC_DONE) {
951 break;
952 } else if (status2 != FGC_COLLECTED) {
953 status = status2;
954 break;
955 }
956
957 if (!FGC_lock(gc, rctx)) {
958 status = FGC_PARENT_ERROR;
959 goto loop_cleanup;
960 }
961
962 hasLock = 1;
963 sctx = FGC_getSctx(gc, rctx);
964 if (!sctx || sctx->spec->uniqueId != gc->specUniqueId) {
965 status = FGC_PARENT_ERROR;
966 goto loop_cleanup;
967 }
968 RedisModuleString *keyName =
969 IndexSpec_GetFormattedKeyByName(sctx->spec, fieldName, INDEXFLD_T_NUMERIC);
970 rt = OpenNumericIndex(sctx, keyName, &idxKey);
971
972 if (rt->uniqueId != rtUniqueId) {
973 status = FGC_PARENT_ERROR;
974 goto loop_cleanup;
975 }
976
977 if (!ninfo.node->range) {
978 gc->stats.gcNumericNodesMissed++;
979 goto loop_cleanup;
980 }
981
982 applyNumIdx(gc, sctx, &ninfo);
983
984 if (ninfo.node->range->entries->numDocs == 0) {
985 rt->emptyLeaves++;
986 }
987
988 loop_cleanup:
989 if (sctx) {
990 SearchCtx_Free(sctx);
991 }
992 if (status != FGC_COLLECTED) {
993 freeInvIdx(&ninfo.idxbufs, &ninfo.info);
994 } else {
995 rm_free(ninfo.idxbufs.changedBlocks);
996 }
997 if (idxKey) {
998 RedisModule_CloseKey(idxKey);
999 }
1000 if (hasLock) {
1001 FGC_unlock(gc, rctx);
1002 hasLock = 0;
1003 }
1004
1005 rm_free(ninfo.restBlockDeleted);
1006 rm_free(ninfo.lastBlockDeleted);
1007 }
1008
1009 rm_free(fieldName);
1010
1011 //printf("empty %ld, number of ranges %ld\n", rt->emptyLeaves, rt->numRanges);
1012 if (rt && rt->emptyLeaves >= rt->numRanges / 2) {
1013 hasLock = 1;
1014 if (!FGC_lock(gc, rctx)) {
1015 return FGC_PARENT_ERROR;
1016 }
1017 if (RSGlobalConfig.forkGCCleanNumericEmptyNodes) {
1018 NRN_AddRv rv = NumericRangeTree_TrimEmptyLeaves(rt);
1019 rt->numRanges += rv.numRanges;
1020 rt->emptyLeaves = 0;
1021 }
1022 if (hasLock) {
1023 FGC_unlock(gc, rctx);
1024 hasLock = 0;
1025 }
1026 }
1027 //printf("removed %d\n", rv.numRanges);
1028
1029 return status;
1030 }
1031
FGC_parentHandleTags(ForkGC * gc,RedisModuleCtx * rctx)1032 static FGCError FGC_parentHandleTags(ForkGC *gc, RedisModuleCtx *rctx) {
1033 int hasLock = 0;
1034 size_t fieldNameLen;
1035 char *fieldName;
1036 uint64_t tagUniqueId;
1037 InvertedIndex *value = NULL;
1038 FGCError status = recvNumericTagHeader(gc, &fieldName, &fieldNameLen, &tagUniqueId);
1039
1040 while (status == FGC_COLLECTED) {
1041 RedisModuleString *keyName = NULL;
1042 RedisModuleKey *idxKey = NULL;
1043 RedisSearchCtx *sctx = NULL;
1044 MSG_IndexInfo info = {0};
1045 InvIdxBuffers idxbufs = {0};
1046 TagIndex *tagIdx = NULL;
1047 char *tagVal = NULL;
1048 size_t tagValLen;
1049
1050 if (FGC_recvFixed(gc, &value, sizeof value) != REDISMODULE_OK) {
1051 status = FGC_CHILD_ERROR;
1052 break;
1053 }
1054
1055 // No more tags values in tag field
1056 if (value == NULL) {
1057 RS_LOG_ASSERT(status == FGC_COLLECTED, "GC status is COLLECTED");
1058 break;
1059 }
1060
1061 if (FGC_recvBuffer(gc, (void **)&tagVal, &tagValLen) != REDISMODULE_OK) {
1062 status = FGC_CHILD_ERROR;
1063 goto loop_cleanup;
1064 }
1065 // printf("receives %s %ld\n", tagVal, tagValLen);
1066
1067 if (FGC_recvInvIdx(gc, &idxbufs, &info) != REDISMODULE_OK) {
1068 status = FGC_CHILD_ERROR;
1069 goto loop_cleanup;
1070 }
1071
1072 if (!FGC_lock(gc, rctx)) {
1073 status = FGC_PARENT_ERROR;
1074 goto loop_cleanup;
1075 }
1076
1077 hasLock = 1;
1078 sctx = FGC_getSctx(gc, rctx);
1079 if (!sctx || sctx->spec->uniqueId != gc->specUniqueId) {
1080 status = FGC_PARENT_ERROR;
1081 goto loop_cleanup;
1082 }
1083 keyName = IndexSpec_GetFormattedKeyByName(sctx->spec, fieldName, INDEXFLD_T_TAG);
1084 tagIdx = TagIndex_Open(sctx, keyName, false, &idxKey);
1085
1086 if (tagIdx->uniqueId != tagUniqueId) {
1087 status = FGC_CHILD_ERROR;
1088 goto loop_cleanup;
1089 }
1090
1091 InvertedIndex *idx = TagIndex_OpenIndex(tagIdx, tagVal, tagValLen, 0);
1092 if (idx == TRIEMAP_NOTFOUND || idx != value) {
1093 status = FGC_PARENT_ERROR;
1094 goto loop_cleanup;
1095 }
1096
1097 // printf("Child %p Parent %p\n", value, idx);
1098
1099 FGC_applyInvertedIndex(gc, &idxbufs, &info, idx);
1100 FGC_updateStats(sctx, gc, info.ndocsCollected, info.nbytesCollected);
1101
1102 // if tag value is empty, let's remove it.
1103 if (idx->numDocs == 0) {
1104 // printf("Delete GC %s %p\n", tagVal, TrieMap_Find(tagIdx->values, tagVal, tagValLen));
1105 TrieMap_Delete(tagIdx->values, tagVal, tagValLen, InvertedIndex_Free);
1106 }
1107
1108 loop_cleanup:
1109 if (sctx) {
1110 SearchCtx_Free(sctx);
1111 }
1112 if (idxKey) {
1113 RedisModule_CloseKey(idxKey);
1114 }
1115 if (hasLock) {
1116 FGC_unlock(gc, rctx);
1117 hasLock = 0;
1118 }
1119 if (status != FGC_COLLECTED) {
1120 freeInvIdx(&idxbufs, &info);
1121 } else {
1122 rm_free(idxbufs.changedBlocks);
1123 }
1124 if (tagVal) {
1125 rm_free(tagVal);
1126 }
1127 }
1128
1129 rm_free(fieldName);
1130 return status;
1131 }
1132
/**
 * Consume everything the child sends, in the order terms, numeric fields and
 * tag fields. Each handler is called repeatedly for as long as it reports
 * FGC_COLLECTED; a final status other than FGC_DONE aborts the whole pass.
 *
 * @param gc the fork GC instance
 * @return REDISMODULE_OK when all three phases ended with FGC_DONE,
 *         REDISMODULE_ERR otherwise
 */
int FGC_parentHandleFromChild(ForkGC *gc) {
  FGCError status = FGC_COLLECTED;

  do {
    status = FGC_parentHandleTerms(gc, gc->ctx);
  } while (status == FGC_COLLECTED);
  if (status != FGC_DONE) {
    return REDISMODULE_ERR;
  }

  do {
    status = FGC_parentHandleNumeric(gc, gc->ctx);
  } while (status == FGC_COLLECTED);
  if (status != FGC_DONE) {
    return REDISMODULE_ERR;
  }

  do {
    status = FGC_parentHandleTags(gc, gc->ctx);
  } while (status == FGC_COLLECTED);
  if (status != FGC_DONE) {
    return REDISMODULE_ERR;
  }

  return REDISMODULE_OK;
}
1148
1149 /**
1150 * In future versions of Redis, Redis will have its own fork() call.
1151 * The following two functions wrap this functionality.
1152 */
FGC_haveRedisFork()1153 static int FGC_haveRedisFork() {
1154 return RedisModule_Fork != NULL;
1155 }
1156
/**
 * Fork the current process, through RedisModule_Fork when that API is
 * available and through plain fork() otherwise.
 *
 * @param gc  unused
 * @param ctx unused
 * @return the value returned by the fork call that was used
 */
static int FGC_fork(ForkGC *gc, RedisModuleCtx *ctx) {
  if (!FGC_haveRedisFork()) {
    return fork();
  }
  return RedisModule_Fork(NULL, NULL);
}
1165
/**
 * Periodic GC callback: runs one fork GC cycle.
 *
 * A cycle creates a pipe, forks under the GC lock, lets the child scan the
 * indexes and write its findings to the pipe, and applies those findings in
 * the parent. Afterwards the child is reaped (or killed through the Redis
 * fork API) and the run statistics are updated.
 *
 * @param ctx      thread safe context used for locking and logging
 * @param privdata the ForkGC instance
 * @return 0 when the GC is being deleted and must not be scheduled again,
 *         1 otherwise (including when the cycle was skipped or failed)
 */
static int periodicCb(RedisModuleCtx *ctx, void *privdata) {
  ForkGC *gc = privdata;
  if (gc->deleting) {
    return 0;
  }
  if (gc->deletedDocsFromLastRun < RSGlobalConfig.forkGcCleanThreshold) {
    return 1;
  }

  int gcrv = 1;

  RedisModule_AutoMemory(ctx);

  // Check if RDB is loading - not needed after the first time we find out that rdb is not
  // reloading
  if (gc->rdbPossiblyLoading && !gc->sp) {
    RedisModule_ThreadSafeContextLock(ctx);
    if (isRdbLoading(ctx)) {
      RedisModule_Log(ctx, "notice", "RDB Loading in progress, not performing GC");
      RedisModule_ThreadSafeContextUnlock(ctx);
      return 1;
    } else {
      // the RDB will not load again, so it's safe to ignore the info check in the next cycles
      gc->rdbPossiblyLoading = 0;
    }
    RedisModule_ThreadSafeContextUnlock(ctx);
  }

  pid_t cpid;
  TimeSample ts;

  while (gc->pauseState == FGC_PAUSED_CHILD) {
    gc->execState = FGC_STATE_WAIT_FORK;
    // spin or sleep
    usleep(500);
  }

  pid_t ppid_before_fork = getpid();

  TimeSampler_Start(&ts);

  // create the pipe; without it there is no channel to the child, so give up
  // on this cycle and retry later, exactly as when the fork itself fails
  if (pipe(gc->pipefd) == -1) {
    gc->retryInterval.tv_sec = RSGlobalConfig.forkGcRetryInterval;
    return 1;
  }

  if (gc->type == FGC_TYPE_NOKEYSPACE) {
    // If we are not in key space we still need to acquire the GIL to use the fork api
    RedisModule_ThreadSafeContextLock(ctx);
  }

  if (!FGC_lock(gc, ctx)) {

    if (gc->type == FGC_TYPE_NOKEYSPACE) {
      RedisModule_ThreadSafeContextUnlock(ctx);
    }

    close(gc->pipefd[GC_READERFD]);
    close(gc->pipefd[GC_WRITERFD]);

    return 0;
  }

  gc->execState = FGC_STATE_SCANNING;

  cpid = FGC_fork(gc, ctx);  // duplicate the current process

  if (cpid == -1) {
    gc->retryInterval.tv_sec = RSGlobalConfig.forkGcRetryInterval;

    if (gc->type == FGC_TYPE_NOKEYSPACE) {
      RedisModule_ThreadSafeContextUnlock(ctx);
    }

    FGC_unlock(gc, ctx);

    close(gc->pipefd[GC_READERFD]);
    close(gc->pipefd[GC_WRITERFD]);

    return 1;
  }

  gc->deletedDocsFromLastRun = 0;

  if (gc->type == FGC_TYPE_NOKEYSPACE) {
    RedisModule_ThreadSafeContextUnlock(ctx);
  }

  FGC_unlock(gc, ctx);

  gc->retryInterval.tv_sec = RSGlobalConfig.forkGcRunIntervalSec;

  if (cpid == 0) {
    setpriority(PRIO_PROCESS, getpid(), 19);
    // fork process
    close(gc->pipefd[GC_READERFD]);
#ifdef __linux__
    if (!FGC_haveRedisFork()) {
      // set the parent death signal to SIGKILL
      int r = prctl(PR_SET_PDEATHSIG, SIGKILL);
      if (r == -1) {
        // _exit: a forked child must not run the parent's exit handlers
        _exit(1);
      }
      // test in case the original parent exited just
      // before the prctl() call
      if (getppid() != ppid_before_fork) _exit(1);
    }
#endif
    FGC_childScanIndexes(gc);
    close(gc->pipefd[GC_WRITERFD]);
    sleep(RSGlobalConfig.forkGcSleepBeforeExit);
    _exit(EXIT_SUCCESS);
  } else {
    // main process
    close(gc->pipefd[GC_WRITERFD]);
    while (gc->pauseState == FGC_PAUSED_PARENT) {
      gc->execState = FGC_STATE_WAIT_APPLY;
      // spin
      usleep(500);
    }

    gc->execState = FGC_STATE_APPLYING;
    if (FGC_parentHandleFromChild(gc) == REDISMODULE_ERR) {
      // a failed apply does not stop the GC; it is scheduled again
      gcrv = 1;
    }
    close(gc->pipefd[GC_READERFD]);
    if (FGC_haveRedisFork()) {

      if (gc->type == FGC_TYPE_NOKEYSPACE) {
        // If we are not in key space we still need to acquire the GIL to use the fork api
        RedisModule_ThreadSafeContextLock(ctx);
      }

      if (!FGC_lock(gc, ctx)) {
        if (gc->type == FGC_TYPE_NOKEYSPACE) {
          RedisModule_ThreadSafeContextUnlock(ctx);
        }

        return 0;
      }

      // KillForkChild must be called when holding the GIL
      // otherwise it might cause a pipe leak and eventually run
      // out of file descriptor
      RedisModule_KillForkChild(cpid);

      if (gc->type == FGC_TYPE_NOKEYSPACE) {
        RedisModule_ThreadSafeContextUnlock(ctx);
      }

      FGC_unlock(gc, ctx);

    } else {
      pid_t id = wait4(cpid, NULL, 0, NULL);
      if (id == -1) {
        printf("an error acquire when waiting for fork to terminate, pid:%d", cpid);
      }
    }
  }
  gc->execState = FGC_STATE_IDLE;
  TimeSampler_End(&ts);

  long long msRun = TimeSampler_DurationMS(&ts);

  gc->stats.numCycles++;
  gc->stats.totalMSRun += msRun;
  gc->stats.lastRunTimeMs = msRun;

  return gcrv;
}
1332
/*
 * NO_TSAN_CHECK expands to an attribute that disables thread-sanitizer
 * instrumentation when the compiler provides __has_feature and reports the
 * thread_sanitizer feature; in every other case it expands to nothing.
 */
#ifdef __has_feature
#if __has_feature(thread_sanitizer)
#define NO_TSAN_CHECK __attribute__((no_sanitize("thread")))
#endif /* __has_feature(thread_sanitizer) */
#endif /* __has_feature */

#ifndef NO_TSAN_CHECK
#define NO_TSAN_CHECK
#endif
1341
/**
 * Ask the GC to pause before forking and block until it reports that it is
 * waiting there. The pause state must be 0 on entry.
 *
 * @param gc the fork GC instance
 */
void FGC_WaitAtFork(ForkGC *gc) NO_TSAN_CHECK {
  RS_LOG_ASSERT(gc->pauseState == 0, "FGC pause state should be 0");
  gc->pauseState = FGC_PAUSED_CHILD;

  // Poll until the GC has reached the pre-fork wait.
  for (;;) {
    if (gc->execState == FGC_STATE_WAIT_FORK) {
      break;
    }
    usleep(500);
  }
}
1350
/**
 * Move the pause point from "before fork" to "before apply" and block until
 * the GC reports that it is waiting to apply. The GC must currently be paused
 * at the fork wait.
 *
 * @param gc the fork GC instance
 */
void FGC_WaitAtApply(ForkGC *gc) NO_TSAN_CHECK {
  // Ensure that we're waiting for the child to begin
  RS_LOG_ASSERT(gc->pauseState == FGC_PAUSED_CHILD, "FGC pause state should be CHILD");
  RS_LOG_ASSERT(gc->execState == FGC_STATE_WAIT_FORK, "FGC exec state should be WAIT_FORK");

  gc->pauseState = FGC_PAUSED_PARENT;

  // Poll until the GC has reached the pre-apply wait.
  for (;;) {
    if (gc->execState == FGC_STATE_WAIT_APPLY) {
      break;
    }
    usleep(500);
  }
}
1361
/**
 * Clear any pause request and block until the GC reports the idle state.
 *
 * @param gc the fork GC instance
 */
void FGC_WaitClear(ForkGC *gc) NO_TSAN_CHECK {
  gc->pauseState = FGC_PAUSED_UNPAUSED;

  // Poll until the GC is idle.
  for (;;) {
    if (gc->execState == FGC_STATE_IDLE) {
      break;
    }
    usleep(500);
  }
}
1368
onTerminateCb(void * privdata)1369 static void onTerminateCb(void *privdata) {
1370 ForkGC *gc = privdata;
1371 if (gc->keyName && gc->type == FGC_TYPE_INKEYSPACE) {
1372 RedisModule_FreeString(gc->ctx, (RedisModuleString *)gc->keyName);
1373 }
1374
1375 RedisModule_FreeThreadSafeContext(gc->ctx);
1376 rm_free(gc);
1377 }
1378
/**
 * Reply with the GC statistics as a flat array of name/value pairs, each
 * value sent as a double. With a NULL gcCtx the reply is an empty array.
 *
 * @param ctx   context used for the reply
 * @param gcCtx the ForkGC instance, may be NULL
 */
static void statsCb(RedisModuleCtx *ctx, void *gcCtx) {
  ForkGC *gc = gcCtx;
  int n = 0;

  RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
  if (gc) {
    // NOTE(review): the average divides by numCycles, which is zero until the
    // first cycle has been counted - the value is not a finite number then.
    const struct {
      const char *name;
      double value;
    } fields[] = {
        {"bytes_collected", (double)gc->stats.totalCollected},
        {"total_ms_run", (double)gc->stats.totalMSRun},
        {"total_cycles", (double)gc->stats.numCycles},
        {"average_cycle_time_ms", (double)gc->stats.totalMSRun / gc->stats.numCycles},
        {"last_run_time_ms", (double)gc->stats.lastRunTimeMs},
        {"gc_numeric_trees_missed", (double)gc->stats.gcNumericNodesMissed},
        {"gc_blocks_denied", (double)gc->stats.gcBlocksDenied},
    };

    for (size_t i = 0; i < sizeof(fields) / sizeof(fields[0]); ++i) {
      RedisModule_ReplyWithSimpleString(ctx, fields[i].name);
      RedisModule_ReplyWithDouble(ctx, fields[i].value);
      n += 2;
    }
  }
  RedisModule_ReplySetArrayLength(ctx, n);
}
1399
killCb(void * ctx)1400 static void killCb(void *ctx) {
1401 ForkGC *gc = ctx;
1402 gc->deleting = 1;
1403 }
1404
deleteCb(void * ctx)1405 static void deleteCb(void *ctx) {
1406 ForkGC *gc = ctx;
1407 ++gc->deletedDocsFromLastRun;
1408 }
1409
getIntervalCb(void * ctx)1410 static struct timespec getIntervalCb(void *ctx) {
1411 ForkGC *gc = ctx;
1412 return gc->retryInterval;
1413 }
1414
FGC_New(const RedisModuleString * k,uint64_t specUniqueId,GCCallbacks * callbacks)1415 ForkGC *FGC_New(const RedisModuleString *k, uint64_t specUniqueId, GCCallbacks *callbacks) {
1416 ForkGC *forkGc = rm_calloc(1, sizeof(*forkGc));
1417 *forkGc = (ForkGC){
1418 .rdbPossiblyLoading = 1,
1419 .specUniqueId = specUniqueId,
1420 .type = FGC_TYPE_INKEYSPACE,
1421 .deletedDocsFromLastRun = 0,
1422 };
1423 forkGc->retryInterval.tv_sec = RSGlobalConfig.forkGcRunIntervalSec;
1424 forkGc->retryInterval.tv_nsec = 0;
1425 forkGc->ctx = RedisModule_GetThreadSafeContext(NULL);
1426 if (k) {
1427 forkGc->keyName = RedisModule_CreateStringFromString(forkGc->ctx, k);
1428 RedisModule_FreeString(forkGc->ctx, (RedisModuleString *)k);
1429 }
1430
1431 callbacks->onTerm = onTerminateCb;
1432 callbacks->periodicCallback = periodicCb;
1433 callbacks->renderStats = statsCb;
1434 callbacks->getInterval = getIntervalCb;
1435 callbacks->kill = killCb;
1436 callbacks->onDelete = deleteCb;
1437
1438 return forkGc;
1439 }
1440
FGC_NewFromSpec(IndexSpec * sp,uint64_t specUniqueId,GCCallbacks * callbacks)1441 ForkGC *FGC_NewFromSpec(IndexSpec *sp, uint64_t specUniqueId, GCCallbacks *callbacks) {
1442 ForkGC *ctx = FGC_New(NULL, specUniqueId, callbacks);
1443 ctx->sp = sp;
1444 ctx->type = FGC_TYPE_NOKEYSPACE;
1445 return ctx;
1446 }
1447