1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "bio.h"
32 #include "rio.h"
33 
34 #include <signal.h>
35 #include <fcntl.h>
36 #include <sys/stat.h>
37 #include <sys/types.h>
38 #include <sys/time.h>
39 #include <sys/resource.h>
40 #include <sys/wait.h>
41 #include <sys/param.h>
42 
43 void aofUpdateCurrentSize(void);
44 void aofClosePipes(void);
45 
46 /* ----------------------------------------------------------------------------
47  * AOF rewrite buffer implementation.
48  *
 * The following code implements a simple buffer used in order to accumulate
50  * changes while the background process is rewriting the AOF file.
51  *
52  * We only need to append, but can't just use realloc with a large block
53  * because 'huge' reallocs are not always handled as one could expect
54  * (via remapping of pages at OS level) but may involve copying data.
55  *
56  * For this reason we use a list of blocks, every block is
57  * AOF_RW_BUF_BLOCK_SIZE bytes.
58  * ------------------------------------------------------------------------- */
59 
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

/* A single node of the AOF rewrite buffer list. Pending bytes live at the
 * start of 'buf'; 'used' counts them and 'free' counts the room left after
 * them. */
typedef struct aofrwblock {
    unsigned long used;                 /* Bytes of 'buf' holding data. */
    unsigned long free;                 /* Bytes of 'buf' still available. */
    char buf[AOF_RW_BUF_BLOCK_SIZE];    /* Block payload. */
} aofrwblock;
66 
67 /* This function free the old AOF rewrite buffer if needed, and initialize
68  * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
69  * so can be used for the first initialization as well. */
aofRewriteBufferReset(void)70 void aofRewriteBufferReset(void) {
71     if (server.aof_rewrite_buf_blocks)
72         listRelease(server.aof_rewrite_buf_blocks);
73 
74     server.aof_rewrite_buf_blocks = listCreate();
75     listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
76 }
77 
78 /* Return the current size of the AOF rewrite buffer. */
aofRewriteBufferSize(void)79 unsigned long aofRewriteBufferSize(void) {
80     listNode *ln;
81     listIter li;
82     unsigned long size = 0;
83 
84     listRewind(server.aof_rewrite_buf_blocks,&li);
85     while((ln = listNext(&li))) {
86         aofrwblock *block = listNodeValue(ln);
87         size += block->used;
88     }
89     return size;
90 }
91 
92 /* Event handler used to send data to the child process doing the AOF
93  * rewrite. We send pieces of our AOF differences buffer so that the final
94  * write when the child finishes the rewrite will be small. */
aofChildWriteDiffData(aeEventLoop * el,int fd,void * privdata,int mask)95 void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
96     listNode *ln;
97     aofrwblock *block;
98     ssize_t nwritten;
99     UNUSED(el);
100     UNUSED(fd);
101     UNUSED(privdata);
102     UNUSED(mask);
103 
104     while(1) {
105         ln = listFirst(server.aof_rewrite_buf_blocks);
106         block = ln ? ln->value : NULL;
107         if (server.aof_stop_sending_diff || !block) {
108             aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
109                               AE_WRITABLE);
110             return;
111         }
112         if (block->used > 0) {
113             nwritten = write(server.aof_pipe_write_data_to_child,
114                              block->buf,block->used);
115             if (nwritten <= 0) return;
116             memmove(block->buf,block->buf+nwritten,block->used-nwritten);
117             block->used -= nwritten;
118             block->free += nwritten;
119         }
120         if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
121     }
122 }
123 
124 /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
aofRewriteBufferAppend(unsigned char * s,unsigned long len)125 void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
126     listNode *ln = listLast(server.aof_rewrite_buf_blocks);
127     aofrwblock *block = ln ? ln->value : NULL;
128 
129     while(len) {
130         /* If we already got at least an allocated block, try appending
131          * at least some piece into it. */
132         if (block) {
133             unsigned long thislen = (block->free < len) ? block->free : len;
134             if (thislen) {  /* The current block is not already full. */
135                 memcpy(block->buf+block->used, s, thislen);
136                 block->used += thislen;
137                 block->free -= thislen;
138                 s += thislen;
139                 len -= thislen;
140             }
141         }
142 
143         if (len) { /* First block to allocate, or need another block. */
144             int numblocks;
145 
146             block = zmalloc(sizeof(*block));
147             block->free = AOF_RW_BUF_BLOCK_SIZE;
148             block->used = 0;
149             listAddNodeTail(server.aof_rewrite_buf_blocks,block);
150 
151             /* Log every time we cross more 10 or 100 blocks, respectively
152              * as a notice or warning. */
153             numblocks = listLength(server.aof_rewrite_buf_blocks);
154             if (((numblocks+1) % 10) == 0) {
155                 int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
156                                                          LL_NOTICE;
157                 serverLog(level,"Background AOF buffer size: %lu MB",
158                     aofRewriteBufferSize()/(1024*1024));
159             }
160         }
161     }
162 
163     /* Install a file event to send data to the rewrite child if there is
164      * not one already. */
165     if (!server.aof_stop_sending_diff &&
166         aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0)
167     {
168         aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
169             AE_WRITABLE, aofChildWriteDiffData, NULL);
170     }
171 }
172 
173 /* Write the buffer (possibly composed of multiple blocks) into the specified
174  * fd. If a short write or any other error happens -1 is returned,
175  * otherwise the number of bytes written is returned. */
aofRewriteBufferWrite(int fd)176 ssize_t aofRewriteBufferWrite(int fd) {
177     listNode *ln;
178     listIter li;
179     ssize_t count = 0;
180 
181     listRewind(server.aof_rewrite_buf_blocks,&li);
182     while((ln = listNext(&li))) {
183         aofrwblock *block = listNodeValue(ln);
184         ssize_t nwritten;
185 
186         if (block->used) {
187             nwritten = write(fd,block->buf,block->used);
188             if (nwritten != (ssize_t)block->used) {
189                 if (nwritten == 0) errno = EIO;
190                 return -1;
191             }
192             count += nwritten;
193         }
194     }
195     return count;
196 }
197 
198 /* ----------------------------------------------------------------------------
199  * AOF file implementation
200  * ------------------------------------------------------------------------- */
201 
202 /* Return true if an AOf fsync is currently already in progress in a
203  * BIO thread. */
aofFsyncInProgress(void)204 int aofFsyncInProgress(void) {
205     return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
206 }
207 
208 /* Starts a background task that performs fsync() against the specified
209  * file descriptor (the one of the AOF file) in another thread. */
aof_background_fsync(int fd)210 void aof_background_fsync(int fd) {
211     bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
212 }
213 
214 /* Kills an AOFRW child process if exists */
killAppendOnlyChild(void)215 void killAppendOnlyChild(void) {
216     int statloc;
217     /* No AOFRW child? return. */
218     if (server.aof_child_pid == -1) return;
219     /* Kill AOFRW child, wait for child exit. */
220     serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
221         (long) server.aof_child_pid);
222     if (kill(server.aof_child_pid,SIGUSR1) != -1) {
223         while(wait3(&statloc,0,NULL) != server.aof_child_pid);
224     }
225     /* Reset the buffer accumulating changes while the child saves. */
226     aofRewriteBufferReset();
227     aofRemoveTempFile(server.aof_child_pid);
228     server.aof_child_pid = -1;
229     server.aof_rewrite_time_start = -1;
230     /* Close pipes used for IPC between the two processes. */
231     aofClosePipes();
232     closeChildInfoPipe();
233     updateDictResizePolicy();
234 }
235 
236 /* Called when the user switches from "appendonly yes" to "appendonly no"
237  * at runtime using the CONFIG command. */
stopAppendOnly(void)238 void stopAppendOnly(void) {
239     serverAssert(server.aof_state != AOF_OFF);
240     flushAppendOnlyFile(1);
241     redis_fsync(server.aof_fd);
242     close(server.aof_fd);
243 
244     server.aof_fd = -1;
245     server.aof_selected_db = -1;
246     server.aof_state = AOF_OFF;
247     server.aof_rewrite_scheduled = 0;
248     killAppendOnlyChild();
249 }
250 
251 /* Called when the user switches from "appendonly no" to "appendonly yes"
252  * at runtime using the CONFIG command. */
startAppendOnly(void)253 int startAppendOnly(void) {
254     char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
255     int newfd;
256 
257     newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
258     serverAssert(server.aof_state == AOF_OFF);
259     if (newfd == -1) {
260         char *cwdp = getcwd(cwd,MAXPATHLEN);
261 
262         serverLog(LL_WARNING,
263             "Redis needs to enable the AOF but can't open the "
264             "append only file %s (in server root dir %s): %s",
265             server.aof_filename,
266             cwdp ? cwdp : "unknown",
267             strerror(errno));
268         return C_ERR;
269     }
270     if (hasActiveChildProcess() && server.aof_child_pid == -1) {
271         server.aof_rewrite_scheduled = 1;
272         serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
273     } else {
274         /* If there is a pending AOF rewrite, we need to switch it off and
275          * start a new one: the old one cannot be reused because it is not
276          * accumulating the AOF buffer. */
277         if (server.aof_child_pid != -1) {
278             serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
279             killAppendOnlyChild();
280         }
281         if (rewriteAppendOnlyFileBackground() == C_ERR) {
282             close(newfd);
283             serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
284             return C_ERR;
285         }
286     }
287     /* We correctly switched on AOF, now wait for the rewrite to be complete
288      * in order to append data on disk. */
289     server.aof_state = AOF_WAIT_REWRITE;
290     server.aof_last_fsync = server.unixtime;
291     server.aof_fd = newfd;
292     return C_OK;
293 }
294 
/* Wrapper around write(2) that keeps going until all 'len' bytes of 'buf'
 * reach 'fd'. It retries after short writes and after calls interrupted by
 * a signal (EINTR). A short write to a block device often means the device
 * is out of space, but modern systems may return short writes for other
 * reasons too. Retrying is the more resilient choice: a real error simply
 * shows up on the next attempt.
 *
 * Returns the number of bytes written. If write(2) fails with an error other
 * than EINTR, the bytes written so far are returned. If nothing was written,
 * -1 is returned, with errno as left by write(2). */
ssize_t aofWrite(int fd, const char *buf, size_t len) {
    const char *cursor = buf;
    size_t remaining = len;
    ssize_t total = 0;

    while (remaining > 0) {
        ssize_t chunk = write(fd, cursor, remaining);

        if (chunk < 0) {
            if (errno == EINTR) continue;   /* Interrupted: just retry. */
            if (total == 0) return -1;
            return total;
        }

        cursor += chunk;
        remaining -= chunk;
        total += chunk;
    }

    return total;
}
320 
321 /* Write the append only file buffer on disk.
322  *
323  * Since we are required to write the AOF before replying to the client,
324  * and the only way the client socket can get a write is entering when the
325  * the event loop, we accumulate all the AOF writes in a memory
326  * buffer and write it on disk using this function just before entering
327  * the event loop again.
328  *
329  * About the 'force' argument:
330  *
331  * When the fsync policy is set to 'everysec' we may delay the flush if there
332  * is still an fsync() going on in the background thread, since for instance
333  * on Linux write(2) will be blocked by the background fsync anyway.
334  * When this happens we remember that there is some aof buffer to be
335  * flushed ASAP, and will try to do that in the serverCron() function.
336  *
337  * However if force is set to 1 we'll write regardless of the background
338  * fsync. */
339 #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
flushAppendOnlyFile(int force)340 void flushAppendOnlyFile(int force) {
341     ssize_t nwritten;
342     int sync_in_progress = 0;
343     mstime_t latency;
344 
345     if (sdslen(server.aof_buf) == 0) {
346         /* Check if we need to do fsync even the aof buffer is empty,
347          * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
348          * called only when aof buffer is not empty, so if users
349          * stop write commands before fsync called in one second,
350          * the data in page cache cannot be flushed in time. */
351         if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
352             server.aof_fsync_offset != server.aof_current_size &&
353             server.unixtime > server.aof_last_fsync &&
354             !(sync_in_progress = aofFsyncInProgress())) {
355             goto try_fsync;
356         } else {
357             return;
358         }
359     }
360 
361     if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
362         sync_in_progress = aofFsyncInProgress();
363 
364     if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
365         /* With this append fsync policy we do background fsyncing.
366          * If the fsync is still in progress we can try to delay
367          * the write for a couple of seconds. */
368         if (sync_in_progress) {
369             if (server.aof_flush_postponed_start == 0) {
370                 /* No previous write postponing, remember that we are
371                  * postponing the flush and return. */
372                 server.aof_flush_postponed_start = server.unixtime;
373                 return;
374             } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
375                 /* We were already waiting for fsync to finish, but for less
376                  * than two seconds this is still ok. Postpone again. */
377                 return;
378             }
379             /* Otherwise fall trough, and go write since we can't wait
380              * over two seconds. */
381             server.aof_delayed_fsync++;
382             serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
383         }
384     }
385     /* We want to perform a single write. This should be guaranteed atomic
386      * at least if the filesystem we are writing is a real physical one.
387      * While this will save us against the server being killed I don't think
388      * there is much to do about the whole server stopping for power problems
389      * or alike */
390 
391     if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
392         usleep(server.aof_flush_sleep);
393     }
394 
395     latencyStartMonitor(latency);
396     nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
397     latencyEndMonitor(latency);
398     /* We want to capture different events for delayed writes:
399      * when the delay happens with a pending fsync, or with a saving child
400      * active, and when the above two conditions are missing.
401      * We also use an additional event name to save all samples which is
402      * useful for graphing / monitoring purposes. */
403     if (sync_in_progress) {
404         latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
405     } else if (hasActiveChildProcess()) {
406         latencyAddSampleIfNeeded("aof-write-active-child",latency);
407     } else {
408         latencyAddSampleIfNeeded("aof-write-alone",latency);
409     }
410     latencyAddSampleIfNeeded("aof-write",latency);
411 
412     /* We performed the write so reset the postponed flush sentinel to zero. */
413     server.aof_flush_postponed_start = 0;
414 
415     if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
416         static time_t last_write_error_log = 0;
417         int can_log = 0;
418 
419         /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
420         if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
421             can_log = 1;
422             last_write_error_log = server.unixtime;
423         }
424 
425         /* Log the AOF write error and record the error code. */
426         if (nwritten == -1) {
427             if (can_log) {
428                 serverLog(LL_WARNING,"Error writing to the AOF file: %s",
429                     strerror(errno));
430                 server.aof_last_write_errno = errno;
431             }
432         } else {
433             if (can_log) {
434                 serverLog(LL_WARNING,"Short write while writing to "
435                                        "the AOF file: (nwritten=%lld, "
436                                        "expected=%lld)",
437                                        (long long)nwritten,
438                                        (long long)sdslen(server.aof_buf));
439             }
440 
441             if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
442                 if (can_log) {
443                     serverLog(LL_WARNING, "Could not remove short write "
444                              "from the append-only file.  Redis may refuse "
445                              "to load the AOF the next time it starts.  "
446                              "ftruncate: %s", strerror(errno));
447                 }
448             } else {
449                 /* If the ftruncate() succeeded we can set nwritten to
450                  * -1 since there is no longer partial data into the AOF. */
451                 nwritten = -1;
452             }
453             server.aof_last_write_errno = ENOSPC;
454         }
455 
456         /* Handle the AOF write error. */
457         if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
458             /* We can't recover when the fsync policy is ALWAYS since the
459              * reply for the client is already in the output buffers, and we
460              * have the contract with the user that on acknowledged write data
461              * is synced on disk. */
462             serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
463             exit(1);
464         } else {
465             /* Recover from failed write leaving data into the buffer. However
466              * set an error to stop accepting writes as long as the error
467              * condition is not cleared. */
468             server.aof_last_write_status = C_ERR;
469 
470             /* Trim the sds buffer if there was a partial write, and there
471              * was no way to undo it with ftruncate(2). */
472             if (nwritten > 0) {
473                 server.aof_current_size += nwritten;
474                 sdsrange(server.aof_buf,nwritten,-1);
475             }
476             return; /* We'll try again on the next call... */
477         }
478     } else {
479         /* Successful write(2). If AOF was in error state, restore the
480          * OK state and log the event. */
481         if (server.aof_last_write_status == C_ERR) {
482             serverLog(LL_WARNING,
483                 "AOF write error looks solved, Redis can write again.");
484             server.aof_last_write_status = C_OK;
485         }
486     }
487     server.aof_current_size += nwritten;
488 
489     /* Re-use AOF buffer when it is small enough. The maximum comes from the
490      * arena size of 4k minus some overhead (but is otherwise arbitrary). */
491     if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
492         sdsclear(server.aof_buf);
493     } else {
494         sdsfree(server.aof_buf);
495         server.aof_buf = sdsempty();
496     }
497 
498 try_fsync:
499     /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
500      * children doing I/O in the background. */
501     if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
502         return;
503 
504     /* Perform the fsync if needed. */
505     if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
506         /* redis_fsync is defined as fdatasync() for Linux in order to avoid
507          * flushing metadata. */
508         latencyStartMonitor(latency);
509         redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
510         latencyEndMonitor(latency);
511         latencyAddSampleIfNeeded("aof-fsync-always",latency);
512         server.aof_fsync_offset = server.aof_current_size;
513         server.aof_last_fsync = server.unixtime;
514     } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
515                 server.unixtime > server.aof_last_fsync)) {
516         if (!sync_in_progress) {
517             aof_background_fsync(server.aof_fd);
518             server.aof_fsync_offset = server.aof_current_size;
519         }
520         server.aof_last_fsync = server.unixtime;
521     }
522 }
523 
catAppendOnlyGenericCommand(sds dst,int argc,robj ** argv)524 sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
525     char buf[32];
526     int len, j;
527     robj *o;
528 
529     buf[0] = '*';
530     len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
531     buf[len++] = '\r';
532     buf[len++] = '\n';
533     dst = sdscatlen(dst,buf,len);
534 
535     for (j = 0; j < argc; j++) {
536         o = getDecodedObject(argv[j]);
537         buf[0] = '$';
538         len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
539         buf[len++] = '\r';
540         buf[len++] = '\n';
541         dst = sdscatlen(dst,buf,len);
542         dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
543         dst = sdscatlen(dst,"\r\n",2);
544         decrRefCount(o);
545     }
546     return dst;
547 }
548 
549 /* Create the sds representation of a PEXPIREAT command, using
550  * 'seconds' as time to live and 'cmd' to understand what command
551  * we are translating into a PEXPIREAT.
552  *
553  * This command is used in order to translate EXPIRE and PEXPIRE commands
554  * into PEXPIREAT command so that we retain precision in the append only
555  * file, and the time is always absolute and not relative. */
catAppendOnlyExpireAtCommand(sds buf,struct redisCommand * cmd,robj * key,robj * seconds)556 sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
557     long long when;
558     robj *argv[3];
559 
560     /* Make sure we can use strtoll */
561     seconds = getDecodedObject(seconds);
562     when = strtoll(seconds->ptr,NULL,10);
563     /* Convert argument into milliseconds for EXPIRE, SETEX, EXPIREAT */
564     if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
565         cmd->proc == expireatCommand)
566     {
567         when *= 1000;
568     }
569     /* Convert into absolute time for EXPIRE, PEXPIRE, SETEX, PSETEX */
570     if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
571         cmd->proc == setexCommand || cmd->proc == psetexCommand)
572     {
573         when += mstime();
574     }
575     decrRefCount(seconds);
576 
577     argv[0] = createStringObject("PEXPIREAT",9);
578     argv[1] = key;
579     argv[2] = createStringObjectFromLongLong(when);
580     buf = catAppendOnlyGenericCommand(buf, 3, argv);
581     decrRefCount(argv[0]);
582     decrRefCount(argv[2]);
583     return buf;
584 }
585 
feedAppendOnlyFile(struct redisCommand * cmd,int dictid,robj ** argv,int argc)586 void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
587     sds buf = sdsempty();
588     robj *tmpargv[3];
589 
590     /* The DB this command was targeting is not the same as the last command
591      * we appended. To issue a SELECT command is needed. */
592     if (dictid != server.aof_selected_db) {
593         char seldb[64];
594 
595         snprintf(seldb,sizeof(seldb),"%d",dictid);
596         buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
597             (unsigned long)strlen(seldb),seldb);
598         server.aof_selected_db = dictid;
599     }
600 
601     if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
602         cmd->proc == expireatCommand) {
603         /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
604         buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
605     } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
606         /* Translate SETEX/PSETEX to SET and PEXPIREAT */
607         tmpargv[0] = createStringObject("SET",3);
608         tmpargv[1] = argv[1];
609         tmpargv[2] = argv[3];
610         buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
611         decrRefCount(tmpargv[0]);
612         buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
613     } else if (cmd->proc == setCommand && argc > 3) {
614         int i;
615         robj *exarg = NULL, *pxarg = NULL;
616         for (i = 3; i < argc; i ++) {
617             if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
618             if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
619         }
620         serverAssert(!(exarg && pxarg));
621 
622         if (exarg || pxarg) {
623             /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
624             buf = catAppendOnlyGenericCommand(buf,3,argv);
625             if (exarg)
626                 buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
627                                                    exarg);
628             if (pxarg)
629                 buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
630                                                    pxarg);
631         } else {
632             buf = catAppendOnlyGenericCommand(buf,argc,argv);
633         }
634     } else {
635         /* All the other commands don't need translation or need the
636          * same translation already operated in the command vector
637          * for the replication itself. */
638         buf = catAppendOnlyGenericCommand(buf,argc,argv);
639     }
640 
641     /* Append to the AOF buffer. This will be flushed on disk just before
642      * of re-entering the event loop, so before the client will get a
643      * positive reply about the operation performed. */
644     if (server.aof_state == AOF_ON)
645         server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
646 
647     /* If a background append only file rewriting is in progress we want to
648      * accumulate the differences between the child DB and the current one
649      * in a buffer, so that when the child process will do its work we
650      * can append the differences to the new append only file. */
651     if (server.aof_child_pid != -1)
652         aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
653 
654     sdsfree(buf);
655 }
656 
657 /* ----------------------------------------------------------------------------
658  * AOF loading
659  * ------------------------------------------------------------------------- */
660 
661 /* In Redis commands are always executed in the context of a client, so in
662  * order to load the append only file we need to create a fake client. */
createAOFClient(void)663 struct client *createAOFClient(void) {
664     struct client *c = zmalloc(sizeof(*c));
665 
666     selectDb(c,0);
667     c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */
668     c->conn = NULL;
669     c->name = NULL;
670     c->querybuf = sdsempty();
671     c->querybuf_peak = 0;
672     c->argc = 0;
673     c->argv = NULL;
674     c->argv_len_sum = 0;
675     c->bufpos = 0;
676     c->flags = 0;
677     c->btype = BLOCKED_NONE;
678     /* We set the fake client as a slave waiting for the synchronization
679      * so that Redis will not try to send replies to this client. */
680     c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
681     c->reply = listCreate();
682     c->reply_bytes = 0;
683     c->obuf_soft_limit_reached_time = 0;
684     c->watched_keys = listCreate();
685     c->peerid = NULL;
686     c->resp = 2;
687     c->user = NULL;
688     listSetFreeMethod(c->reply,freeClientReplyValue);
689     listSetDupMethod(c->reply,dupClientReplyValue);
690     initClientMultiState(c);
691     return c;
692 }
693 
freeFakeClientArgv(struct client * c)694 void freeFakeClientArgv(struct client *c) {
695     int j;
696 
697     for (j = 0; j < c->argc; j++)
698         decrRefCount(c->argv[j]);
699     zfree(c->argv);
700     c->argv_len_sum = 0;
701 }
702 
freeFakeClient(struct client * c)703 void freeFakeClient(struct client *c) {
704     sdsfree(c->querybuf);
705     listRelease(c->reply);
706     listRelease(c->watched_keys);
707     freeClientMultiState(c);
708     zfree(c);
709 }
710 
/* Replay the append log file. On success C_OK is returned. On non fatal
 * error (the append only file is zero-length) C_ERR is returned. On
 * fatal error an error message is logged and the program exits. */
loadAppendOnlyFile(char * filename)714 int loadAppendOnlyFile(char *filename) {
715     struct client *fakeClient;
716     FILE *fp = fopen(filename,"r");
717     struct redis_stat sb;
718     int old_aof_state = server.aof_state;
719     long loops = 0;
720     off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
721     off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */
722 
723     if (fp == NULL) {
724         serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
725         exit(1);
726     }
727 
728     /* Handle a zero-length AOF file as a special case. An empty AOF file
729      * is a valid AOF because an empty server with AOF enabled will create
730      * a zero length file at startup, that will remain like that if no write
731      * operation is received. */
732     if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
733         server.aof_current_size = 0;
734         server.aof_fsync_offset = server.aof_current_size;
735         fclose(fp);
736         return C_ERR;
737     }
738 
739     /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
740      * to the same file we're about to read. */
741     server.aof_state = AOF_OFF;
742 
743     fakeClient = createAOFClient();
744     startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
745 
746     /* Check if this AOF file has an RDB preamble. In that case we need to
747      * load the RDB file and later continue loading the AOF tail. */
748     char sig[5]; /* "REDIS" */
749     if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
750         /* No RDB preamble, seek back at 0 offset. */
751         if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
752     } else {
753         /* RDB preamble. Pass loading the RDB functions. */
754         rio rdb;
755 
756         serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
757         if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
758         rioInitWithFile(&rdb,fp);
759         if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
760             serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
761             goto readerr;
762         } else {
763             serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
764         }
765     }
766 
767     /* Read the actual AOF file, in REPL format, command by command. */
768     while(1) {
769         int argc, j;
770         unsigned long len;
771         robj **argv;
772         char buf[128];
773         sds argsds;
774         struct redisCommand *cmd;
775 
776         /* Serve the clients from time to time */
777         if (!(loops++ % 1000)) {
778             loadingProgress(ftello(fp));
779             processEventsWhileBlocked();
780             processModuleLoadingProgressEvent(1);
781         }
782 
783         if (fgets(buf,sizeof(buf),fp) == NULL) {
784             if (feof(fp))
785                 break;
786             else
787                 goto readerr;
788         }
789         if (buf[0] != '*') goto fmterr;
790         if (buf[1] == '\0') goto readerr;
791         argc = atoi(buf+1);
792         if (argc < 1) goto fmterr;
793 
794         /* Load the next command in the AOF as our fake client
795          * argv. */
796         argv = zmalloc(sizeof(robj*)*argc);
797         fakeClient->argc = argc;
798         fakeClient->argv = argv;
799 
800         for (j = 0; j < argc; j++) {
801             /* Parse the argument len. */
802             char *readres = fgets(buf,sizeof(buf),fp);
803             if (readres == NULL || buf[0] != '$') {
804                 fakeClient->argc = j; /* Free up to j-1. */
805                 freeFakeClientArgv(fakeClient);
806                 if (readres == NULL)
807                     goto readerr;
808                 else
809                     goto fmterr;
810             }
811             len = strtol(buf+1,NULL,10);
812 
813             /* Read it into a string object. */
814             argsds = sdsnewlen(SDS_NOINIT,len);
815             if (len && fread(argsds,len,1,fp) == 0) {
816                 sdsfree(argsds);
817                 fakeClient->argc = j; /* Free up to j-1. */
818                 freeFakeClientArgv(fakeClient);
819                 goto readerr;
820             }
821             argv[j] = createObject(OBJ_STRING,argsds);
822 
823             /* Discard CRLF. */
824             if (fread(buf,2,1,fp) == 0) {
825                 fakeClient->argc = j+1; /* Free up to j. */
826                 freeFakeClientArgv(fakeClient);
827                 goto readerr;
828             }
829         }
830 
831         /* Command lookup */
832         cmd = lookupCommand(argv[0]->ptr);
833         if (!cmd) {
834             serverLog(LL_WARNING,
835                 "Unknown command '%s' reading the append only file",
836                 (char*)argv[0]->ptr);
837             exit(1);
838         }
839 
840         if (cmd == server.multiCommand) valid_before_multi = valid_up_to;
841 
842         /* Run the command in the context of a fake client */
843         fakeClient->cmd = fakeClient->lastcmd = cmd;
844         if (fakeClient->flags & CLIENT_MULTI &&
845             fakeClient->cmd->proc != execCommand)
846         {
847             queueMultiCommand(fakeClient);
848         } else {
849             cmd->proc(fakeClient);
850         }
851 
852         /* The fake client should not have a reply */
853         serverAssert(fakeClient->bufpos == 0 &&
854                      listLength(fakeClient->reply) == 0);
855 
856         /* The fake client should never get blocked */
857         serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
858 
859         /* Clean up. Command code may have changed argv/argc so we use the
860          * argv/argc of the client instead of the local variables. */
861         freeFakeClientArgv(fakeClient);
862         fakeClient->cmd = NULL;
863         if (server.aof_load_truncated) valid_up_to = ftello(fp);
864         if (server.key_load_delay)
865             usleep(server.key_load_delay);
866     }
867 
868     /* This point can only be reached when EOF is reached without errors.
869      * If the client is in the middle of a MULTI/EXEC, handle it as it was
870      * a short read, even if technically the protocol is correct: we want
871      * to remove the unprocessed tail and continue. */
872     if (fakeClient->flags & CLIENT_MULTI) {
873         serverLog(LL_WARNING,
874             "Revert incomplete MULTI/EXEC transaction in AOF file");
875         valid_up_to = valid_before_multi;
876         goto uxeof;
877     }
878 
879 loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
880     fclose(fp);
881     freeFakeClient(fakeClient);
882     server.aof_state = old_aof_state;
883     stopLoading(1);
884     aofUpdateCurrentSize();
885     server.aof_rewrite_base_size = server.aof_current_size;
886     server.aof_fsync_offset = server.aof_current_size;
887     return C_OK;
888 
889 readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
890     if (!feof(fp)) {
891         if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
892         fclose(fp);
893         serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
894         exit(1);
895     }
896 
897 uxeof: /* Unexpected AOF end of file. */
898     if (server.aof_load_truncated) {
899         serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");
900         serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",
901             (unsigned long long) valid_up_to);
902         if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
903             if (valid_up_to == -1) {
904                 serverLog(LL_WARNING,"Last valid command offset is invalid");
905             } else {
906                 serverLog(LL_WARNING,"Error truncating the AOF file: %s",
907                     strerror(errno));
908             }
909         } else {
910             /* Make sure the AOF file descriptor points to the end of the
911              * file after the truncate call. */
912             if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
913                 serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",
914                     strerror(errno));
915             } else {
916                 serverLog(LL_WARNING,
917                     "AOF loaded anyway because aof-load-truncated is enabled");
918                 goto loaded_ok;
919             }
920         }
921     }
922     if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
923     fclose(fp);
924     serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
925     exit(1);
926 
927 fmterr: /* Format error. */
928     if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
929     fclose(fp);
930     serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
931     exit(1);
932 }
933 
934 /* ----------------------------------------------------------------------------
935  * AOF rewrite
936  * ------------------------------------------------------------------------- */
937 
938 /* Delegate writing an object to writing a bulk string or bulk long long.
939  * This is not placed in rio.c since that adds the server.h dependency. */
rioWriteBulkObject(rio * r,robj * obj)940 int rioWriteBulkObject(rio *r, robj *obj) {
941     /* Avoid using getDecodedObject to help copy-on-write (we are often
942      * in a child process when this function is called). */
943     if (obj->encoding == OBJ_ENCODING_INT) {
944         return rioWriteBulkLongLong(r,(long)obj->ptr);
945     } else if (sdsEncodedObject(obj)) {
946         return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
947     } else {
948         serverPanic("Unknown string encoding");
949     }
950 }
951 
952 /* Emit the commands needed to rebuild a list object.
953  * The function returns 0 on error, 1 on success. */
rewriteListObject(rio * r,robj * key,robj * o)954 int rewriteListObject(rio *r, robj *key, robj *o) {
955     long long count = 0, items = listTypeLength(o);
956 
957     if (o->encoding == OBJ_ENCODING_QUICKLIST) {
958         quicklist *list = o->ptr;
959         quicklistIter *li = quicklistGetIterator(list, AL_START_HEAD);
960         quicklistEntry entry;
961 
962         while (quicklistNext(li,&entry)) {
963             if (count == 0) {
964                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
965                     AOF_REWRITE_ITEMS_PER_CMD : items;
966                 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
967                 if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
968                 if (rioWriteBulkObject(r,key) == 0) return 0;
969             }
970 
971             if (entry.value) {
972                 if (rioWriteBulkString(r,(char*)entry.value,entry.sz) == 0) return 0;
973             } else {
974                 if (rioWriteBulkLongLong(r,entry.longval) == 0) return 0;
975             }
976             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
977             items--;
978         }
979         quicklistReleaseIterator(li);
980     } else {
981         serverPanic("Unknown list encoding");
982     }
983     return 1;
984 }
985 
986 /* Emit the commands needed to rebuild a set object.
987  * The function returns 0 on error, 1 on success. */
rewriteSetObject(rio * r,robj * key,robj * o)988 int rewriteSetObject(rio *r, robj *key, robj *o) {
989     long long count = 0, items = setTypeSize(o);
990 
991     if (o->encoding == OBJ_ENCODING_INTSET) {
992         int ii = 0;
993         int64_t llval;
994 
995         while(intsetGet(o->ptr,ii++,&llval)) {
996             if (count == 0) {
997                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
998                     AOF_REWRITE_ITEMS_PER_CMD : items;
999 
1000                 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
1001                 if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
1002                 if (rioWriteBulkObject(r,key) == 0) return 0;
1003             }
1004             if (rioWriteBulkLongLong(r,llval) == 0) return 0;
1005             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
1006             items--;
1007         }
1008     } else if (o->encoding == OBJ_ENCODING_HT) {
1009         dictIterator *di = dictGetIterator(o->ptr);
1010         dictEntry *de;
1011 
1012         while((de = dictNext(di)) != NULL) {
1013             sds ele = dictGetKey(de);
1014             if (count == 0) {
1015                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
1016                     AOF_REWRITE_ITEMS_PER_CMD : items;
1017 
1018                 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
1019                 if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
1020                 if (rioWriteBulkObject(r,key) == 0) return 0;
1021             }
1022             if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
1023             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
1024             items--;
1025         }
1026         dictReleaseIterator(di);
1027     } else {
1028         serverPanic("Unknown set encoding");
1029     }
1030     return 1;
1031 }
1032 
1033 /* Emit the commands needed to rebuild a sorted set object.
1034  * The function returns 0 on error, 1 on success. */
rewriteSortedSetObject(rio * r,robj * key,robj * o)1035 int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
1036     long long count = 0, items = zsetLength(o);
1037 
1038     if (o->encoding == OBJ_ENCODING_ZIPLIST) {
1039         unsigned char *zl = o->ptr;
1040         unsigned char *eptr, *sptr;
1041         unsigned char *vstr;
1042         unsigned int vlen;
1043         long long vll;
1044         double score;
1045 
1046         eptr = ziplistIndex(zl,0);
1047         serverAssert(eptr != NULL);
1048         sptr = ziplistNext(zl,eptr);
1049         serverAssert(sptr != NULL);
1050 
1051         while (eptr != NULL) {
1052             serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
1053             score = zzlGetScore(sptr);
1054 
1055             if (count == 0) {
1056                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
1057                     AOF_REWRITE_ITEMS_PER_CMD : items;
1058 
1059                 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
1060                 if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
1061                 if (rioWriteBulkObject(r,key) == 0) return 0;
1062             }
1063             if (rioWriteBulkDouble(r,score) == 0) return 0;
1064             if (vstr != NULL) {
1065                 if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
1066             } else {
1067                 if (rioWriteBulkLongLong(r,vll) == 0) return 0;
1068             }
1069             zzlNext(zl,&eptr,&sptr);
1070             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
1071             items--;
1072         }
1073     } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
1074         zset *zs = o->ptr;
1075         dictIterator *di = dictGetIterator(zs->dict);
1076         dictEntry *de;
1077 
1078         while((de = dictNext(di)) != NULL) {
1079             sds ele = dictGetKey(de);
1080             double *score = dictGetVal(de);
1081 
1082             if (count == 0) {
1083                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
1084                     AOF_REWRITE_ITEMS_PER_CMD : items;
1085 
1086                 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
1087                 if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
1088                 if (rioWriteBulkObject(r,key) == 0) return 0;
1089             }
1090             if (rioWriteBulkDouble(r,*score) == 0) return 0;
1091             if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
1092             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
1093             items--;
1094         }
1095         dictReleaseIterator(di);
1096     } else {
1097         serverPanic("Unknown sorted zset encoding");
1098     }
1099     return 1;
1100 }
1101 
1102 /* Write either the key or the value of the currently selected item of a hash.
1103  * The 'hi' argument passes a valid Redis hash iterator.
1104  * The 'what' filed specifies if to write a key or a value and can be
1105  * either OBJ_HASH_KEY or OBJ_HASH_VALUE.
1106  *
1107  * The function returns 0 on error, non-zero on success. */
rioWriteHashIteratorCursor(rio * r,hashTypeIterator * hi,int what)1108 static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
1109     if (hi->encoding == OBJ_ENCODING_ZIPLIST) {
1110         unsigned char *vstr = NULL;
1111         unsigned int vlen = UINT_MAX;
1112         long long vll = LLONG_MAX;
1113 
1114         hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
1115         if (vstr)
1116             return rioWriteBulkString(r, (char*)vstr, vlen);
1117         else
1118             return rioWriteBulkLongLong(r, vll);
1119     } else if (hi->encoding == OBJ_ENCODING_HT) {
1120         sds value = hashTypeCurrentFromHashTable(hi, what);
1121         return rioWriteBulkString(r, value, sdslen(value));
1122     }
1123 
1124     serverPanic("Unknown hash encoding");
1125     return 0;
1126 }
1127 
1128 /* Emit the commands needed to rebuild a hash object.
1129  * The function returns 0 on error, 1 on success. */
rewriteHashObject(rio * r,robj * key,robj * o)1130 int rewriteHashObject(rio *r, robj *key, robj *o) {
1131     hashTypeIterator *hi;
1132     long long count = 0, items = hashTypeLength(o);
1133 
1134     hi = hashTypeInitIterator(o);
1135     while (hashTypeNext(hi) != C_ERR) {
1136         if (count == 0) {
1137             int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
1138                 AOF_REWRITE_ITEMS_PER_CMD : items;
1139 
1140             if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
1141             if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
1142             if (rioWriteBulkObject(r,key) == 0) return 0;
1143         }
1144 
1145         if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0) return 0;
1146         if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE) == 0) return 0;
1147         if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
1148         items--;
1149     }
1150 
1151     hashTypeReleaseIterator(hi);
1152 
1153     return 1;
1154 }
1155 
1156 /* Helper for rewriteStreamObject() that generates a bulk string into the
1157  * AOF representing the ID 'id'. */
rioWriteBulkStreamID(rio * r,streamID * id)1158 int rioWriteBulkStreamID(rio *r,streamID *id) {
1159     int retval;
1160 
1161     sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
1162     retval = rioWriteBulkString(r,replyid,sdslen(replyid));
1163     sdsfree(replyid);
1164     return retval;
1165 }
1166 
1167 /* Helper for rewriteStreamObject(): emit the XCLAIM needed in order to
1168  * add the message described by 'nack' having the id 'rawid', into the pending
1169  * list of the specified consumer. All this in the context of the specified
1170  * key and group. */
rioWriteStreamPendingEntry(rio * r,robj * key,const char * groupname,size_t groupname_len,streamConsumer * consumer,unsigned char * rawid,streamNACK * nack)1171 int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
1172      /* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
1173                RETRYCOUNT <count> JUSTID FORCE. */
1174     streamID id;
1175     streamDecodeID(rawid,&id);
1176     if (rioWriteBulkCount(r,'*',12) == 0) return 0;
1177     if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
1178     if (rioWriteBulkObject(r,key) == 0) return 0;
1179     if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
1180     if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
1181     if (rioWriteBulkString(r,"0",1) == 0) return 0;
1182     if (rioWriteBulkStreamID(r,&id) == 0) return 0;
1183     if (rioWriteBulkString(r,"TIME",4) == 0) return 0;
1184     if (rioWriteBulkLongLong(r,nack->delivery_time) == 0) return 0;
1185     if (rioWriteBulkString(r,"RETRYCOUNT",10) == 0) return 0;
1186     if (rioWriteBulkLongLong(r,nack->delivery_count) == 0) return 0;
1187     if (rioWriteBulkString(r,"JUSTID",6) == 0) return 0;
1188     if (rioWriteBulkString(r,"FORCE",5) == 0) return 0;
1189     return 1;
1190 }
1191 
/* Emit the commands needed to rebuild a stream object.
 *
 * The stream is rebuilt with:
 *   1. One XADD <key> <id> <field> <value> ... per entry, or, when the
 *      stream has no entries, a single XADD MAXLEN 0 that creates the key
 *      as an empty stream.
 *   2. An XSETID restoring the stream's last ID.
 *   3. For every consumer group, an XGROUP CREATE at the group's last ID,
 *      followed by one XCLAIM per pending entry of each of its consumers
 *      (see rioWriteStreamPendingEntry()).
 *
 * Every early return stops the iterators that are open at that point.
 * The function returns 0 on error, 1 on success. */
int rewriteStreamObject(rio *r, robj *key, robj *o) {
    stream *s = o->ptr;
    streamIterator si;
    streamIteratorStart(&si,s,NULL,NULL,0);
    streamID id;
    int64_t numfields;

    if (s->length) {
        /* Reconstruct the stream data using XADD commands. */
        while(streamIteratorGetID(&si,&id,&numfields)) {
            /* Emit the XADD <key> <id> ...fields... command: the array
             * length is 3 (XADD, key, ID) plus one field and one value
             * for each of the entry's fields. */

            /* Header of the command; the field-value pairs follow. */
            if (!rioWriteBulkCount(r,'*',3+numfields*2) ||
                !rioWriteBulkString(r,"XADD",4) ||
                !rioWriteBulkObject(r,key) ||
                !rioWriteBulkStreamID(r,&id))
            {
                streamIteratorStop(&si);
                return 0;
            }
            while(numfields--) {
                unsigned char *field, *value;
                int64_t field_len, value_len;
                streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
                if (!rioWriteBulkString(r,(char*)field,field_len) ||
                    !rioWriteBulkString(r,(char*)value,value_len))
                {
                    streamIteratorStop(&si);
                    return 0;
                }
            }
        }
    } else {
        /* Use the XADD MAXLEN 0 trick to generate an empty stream if
         * the stream we are serializing has no entries, which is possible
         * for the Stream type: the placeholder entry 0-1 with field "x"
         * and value "y" is added with MAXLEN 0 so that no entry remains. */
        id.ms = 0; id.seq = 1;
        if (!rioWriteBulkCount(r,'*',7) ||
            !rioWriteBulkString(r,"XADD",4) ||
            !rioWriteBulkObject(r,key) ||
            !rioWriteBulkString(r,"MAXLEN",6) ||
            !rioWriteBulkString(r,"0",1) ||
            !rioWriteBulkStreamID(r,&id) ||
            !rioWriteBulkString(r,"x",1) ||
            !rioWriteBulkString(r,"y",1))
        {
            streamIteratorStop(&si);
            return 0;
        }
    }

    /* Append XSETID after the XADDs so that the stream's last ID is
     * restored exactly: it can differ from the ID of the last emitted
     * entry, e.g. when the top entry was removed with XDEL, or when the
     * empty-stream placeholder 0-1 was used above. */
    if (!rioWriteBulkCount(r,'*',3) ||
        !rioWriteBulkString(r,"XSETID",6) ||
        !rioWriteBulkObject(r,key) ||
        !rioWriteBulkStreamID(r,&s->last_id))
    {
        streamIteratorStop(&si);
        return 0;
    }


    /* Create all the stream consumer groups. */
    if (s->cgroups) {
        raxIterator ri;
        raxStart(&ri,s->cgroups);
        raxSeek(&ri,"^",NULL,0);
        while(raxNext(&ri)) {
            streamCG *group = ri.data;
            /* Emit XGROUP CREATE <key> <group-name> <group-last-id> in
             * order to create the group. The group name is the rax key. */
            if (!rioWriteBulkCount(r,'*',5) ||
                !rioWriteBulkString(r,"XGROUP",6) ||
                !rioWriteBulkString(r,"CREATE",6) ||
                !rioWriteBulkObject(r,key) ||
                !rioWriteBulkString(r,(char*)ri.key,ri.key_len) ||
                !rioWriteBulkStreamID(r,&group->last_id))
            {
                raxStop(&ri);
                streamIteratorStop(&si);
                return 0;
            }

            /* Generate XCLAIMs for each consumer that happens to
             * have pending entries. Empty consumers have no semantic
             * value so they are discarded (no command creates them). */
            raxIterator ri_cons;
            raxStart(&ri_cons,group->consumers);
            raxSeek(&ri_cons,"^",NULL,0);
            while(raxNext(&ri_cons)) {
                streamConsumer *consumer = ri_cons.data;
                /* For the current consumer, iterate all the PEL entries
                 * to emit the XCLAIM protocol. The PEL rax key is the
                 * encoded entry ID. */
                raxIterator ri_pel;
                raxStart(&ri_pel,consumer->pel);
                raxSeek(&ri_pel,"^",NULL,0);
                while(raxNext(&ri_pel)) {
                    streamNACK *nack = ri_pel.data;
                    if (rioWriteStreamPendingEntry(r,key,(char*)ri.key,
                                                   ri.key_len,consumer,
                                                   ri_pel.key,nack) == 0)
                    {
                        /* Stop every open iterator, innermost first. */
                        raxStop(&ri_pel);
                        raxStop(&ri_cons);
                        raxStop(&ri);
                        streamIteratorStop(&si);
                        return 0;
                    }
                }
                raxStop(&ri_pel);
            }
            raxStop(&ri_cons);
        }
        raxStop(&ri);
    }

    streamIteratorStop(&si);
    return 1;
}
1315 
1316 /* Call the module type callback in order to rewrite a data type
1317  * that is exported by a module and is not handled by Redis itself.
1318  * The function returns 0 on error, 1 on success. */
rewriteModuleObject(rio * r,robj * key,robj * o)1319 int rewriteModuleObject(rio *r, robj *key, robj *o) {
1320     RedisModuleIO io;
1321     moduleValue *mv = o->ptr;
1322     moduleType *mt = mv->type;
1323     moduleInitIOContext(io,mt,r,key);
1324     mt->aof_rewrite(&io,key,mv->value);
1325     if (io.ctx) {
1326         moduleFreeContext(io.ctx);
1327         zfree(io.ctx);
1328     }
1329     return io.error ? 0 : 1;
1330 }
1331 
1332 /* This function is called by the child rewriting the AOF file to read
1333  * the difference accumulated from the parent into a buffer, that is
1334  * concatenated at the end of the rewrite. */
aofReadDiffFromParent(void)1335 ssize_t aofReadDiffFromParent(void) {
1336     char buf[65536]; /* Default pipe buffer size on most Linux systems. */
1337     ssize_t nread, total = 0;
1338 
1339     while ((nread =
1340             read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
1341         server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
1342         total += nread;
1343     }
1344     return total;
1345 }
1346 
rewriteAppendOnlyFileRio(rio * aof)1347 int rewriteAppendOnlyFileRio(rio *aof) {
1348     dictIterator *di = NULL;
1349     dictEntry *de;
1350     size_t processed = 0;
1351     int j;
1352 
1353     for (j = 0; j < server.dbnum; j++) {
1354         char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
1355         redisDb *db = server.db+j;
1356         dict *d = db->dict;
1357         if (dictSize(d) == 0) continue;
1358         di = dictGetSafeIterator(d);
1359 
1360         /* SELECT the new DB */
1361         if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
1362         if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
1363 
1364         /* Iterate this DB writing every entry */
1365         while((de = dictNext(di)) != NULL) {
1366             sds keystr;
1367             robj key, *o;
1368             long long expiretime;
1369 
1370             keystr = dictGetKey(de);
1371             o = dictGetVal(de);
1372             initStaticStringObject(key,keystr);
1373 
1374             expiretime = getExpire(db,&key);
1375 
1376             /* Save the key and associated value */
1377             if (o->type == OBJ_STRING) {
1378                 /* Emit a SET command */
1379                 char cmd[]="*3\r\n$3\r\nSET\r\n";
1380                 if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
1381                 /* Key and value */
1382                 if (rioWriteBulkObject(aof,&key) == 0) goto werr;
1383                 if (rioWriteBulkObject(aof,o) == 0) goto werr;
1384             } else if (o->type == OBJ_LIST) {
1385                 if (rewriteListObject(aof,&key,o) == 0) goto werr;
1386             } else if (o->type == OBJ_SET) {
1387                 if (rewriteSetObject(aof,&key,o) == 0) goto werr;
1388             } else if (o->type == OBJ_ZSET) {
1389                 if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
1390             } else if (o->type == OBJ_HASH) {
1391                 if (rewriteHashObject(aof,&key,o) == 0) goto werr;
1392             } else if (o->type == OBJ_STREAM) {
1393                 if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
1394             } else if (o->type == OBJ_MODULE) {
1395                 if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
1396             } else {
1397                 serverPanic("Unknown object type");
1398             }
1399             /* Save the expire time */
1400             if (expiretime != -1) {
1401                 char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
1402                 if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
1403                 if (rioWriteBulkObject(aof,&key) == 0) goto werr;
1404                 if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
1405             }
1406             /* Read some diff from the parent process from time to time. */
1407             if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
1408                 processed = aof->processed_bytes;
1409                 aofReadDiffFromParent();
1410             }
1411         }
1412         dictReleaseIterator(di);
1413         di = NULL;
1414     }
1415     return C_OK;
1416 
1417 werr:
1418     if (di) dictReleaseIterator(di);
1419     return C_ERR;
1420 }
1421 
1422 /* Write a sequence of commands able to fully rebuild the dataset into
1423  * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
1424  *
1425  * In order to minimize the number of commands needed in the rewritten
1426  * log Redis uses variadic commands when possible, such as RPUSH, SADD
1427  * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
1428  * are inserted using a single command. */
rewriteAppendOnlyFile(char * filename)1429 int rewriteAppendOnlyFile(char *filename) {
1430     rio aof;
1431     FILE *fp = NULL;
1432     char tmpfile[256];
1433     char byte;
1434 
1435     /* Note that we have to use a different temp name here compared to the
1436      * one used by rewriteAppendOnlyFileBackground() function. */
1437     snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
1438     fp = fopen(tmpfile,"w");
1439     if (!fp) {
1440         serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
1441         return C_ERR;
1442     }
1443 
1444     server.aof_child_diff = sdsempty();
1445     rioInitWithFile(&aof,fp);
1446 
1447     if (server.aof_rewrite_incremental_fsync)
1448         rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
1449 
1450     startSaving(RDBFLAGS_AOF_PREAMBLE);
1451 
1452     if (server.aof_use_rdb_preamble) {
1453         int error;
1454         if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
1455             errno = error;
1456             goto werr;
1457         }
1458     } else {
1459         if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
1460     }
1461 
1462     /* Do an initial slow fsync here while the parent is still sending
1463      * data, in order to make the next final fsync faster. */
1464     if (fflush(fp) == EOF) goto werr;
1465     if (fsync(fileno(fp)) == -1) goto werr;
1466 
1467     /* Read again a few times to get more data from the parent.
1468      * We can't read forever (the server may receive data from clients
1469      * faster than it is able to send data to the child), so we try to read
1470      * some more data in a loop as soon as there is a good chance more data
1471      * will come. If it looks like we are wasting time, we abort (this
1472      * happens after 20 ms without new data). */
1473     int nodata = 0;
1474     mstime_t start = mstime();
1475     while(mstime()-start < 1000 && nodata < 20) {
1476         if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
1477         {
1478             nodata++;
1479             continue;
1480         }
1481         nodata = 0; /* Start counting from zero, we stop on N *contiguous*
1482                        timeouts. */
1483         aofReadDiffFromParent();
1484     }
1485 
1486     /* Ask the master to stop sending diffs. */
1487     if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
1488     if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
1489         goto werr;
1490     /* We read the ACK from the server using a 10 seconds timeout. Normally
1491      * it should reply ASAP, but just in case we lose its reply, we are sure
1492      * the child will eventually get terminated. */
1493     if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
1494         byte != '!') goto werr;
1495     serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
1496 
1497     /* Read the final diff if any. */
1498     aofReadDiffFromParent();
1499 
1500     /* Write the received diff to the file. */
1501     serverLog(LL_NOTICE,
1502         "Concatenating %.2f MB of AOF diff received from parent.",
1503         (double) sdslen(server.aof_child_diff) / (1024*1024));
1504     if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
1505         goto werr;
1506 
1507     /* Make sure data will not remain on the OS's output buffers */
1508     if (fflush(fp)) goto werr;
1509     if (fsync(fileno(fp))) goto werr;
1510     if (fclose(fp)) { fp = NULL; goto werr; }
1511     fp = NULL;
1512 
1513     /* Use RENAME to make sure the DB file is changed atomically only
1514      * if the generate DB file is ok. */
1515     if (rename(tmpfile,filename) == -1) {
1516         serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
1517         unlink(tmpfile);
1518         stopSaving(0);
1519         return C_ERR;
1520     }
1521     serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
1522     stopSaving(1);
1523     return C_OK;
1524 
1525 werr:
1526     serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
1527     if (fp) fclose(fp);
1528     unlink(tmpfile);
1529     stopSaving(0);
1530     return C_ERR;
1531 }
1532 
1533 /* ----------------------------------------------------------------------------
1534  * AOF rewrite pipes for IPC
1535  * -------------------------------------------------------------------------- */
1536 
1537 /* This event handler is called when the AOF rewriting child sends us a
1538  * single '!' char to signal we should stop sending buffer diffs. The
1539  * parent sends a '!' as well to acknowledge. */
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char msg;
    int stop_requested;

    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* The child asks us to stop streaming diffs by writing a single '!'.
     * The byte is only inspected when exactly one byte was read. */
    stop_requested = (read(fd,&msg,1) == 1) && (msg == '!');

    if (stop_requested) {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;

        /* Reply with our own '!'. A failed write is only logged and not
         * retried: the child reads the ACK with a timeout, or it may
         * already be gone. */
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1)
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
    }

    /* One-shot handler: unregister it whatever was (or was not) read. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}
1562 
/* Create the pipes used for parent - child process IPC during rewrite.
 * We have a data pipe used to send AOF incremental diffs to the child,
 * and two other pipes: one used by the child to signal it finished with
 * the rewrite so no more data should be written, and another for the
 * parent to acknowledge it understood this new condition. */
aofCreatePipes(void)1568 int aofCreatePipes(void) {
1569     int fds[6] = {-1, -1, -1, -1, -1, -1};
1570     int j;
1571 
1572     if (pipe(fds) == -1) goto error; /* parent -> children data. */
1573     if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
1574     if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
1575     /* Parent -> children data is non blocking. */
1576     if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
1577     if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
1578     if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
1579 
1580     server.aof_pipe_write_data_to_child = fds[1];
1581     server.aof_pipe_read_data_from_parent = fds[0];
1582     server.aof_pipe_write_ack_to_parent = fds[3];
1583     server.aof_pipe_read_ack_from_child = fds[2];
1584     server.aof_pipe_write_ack_to_child = fds[5];
1585     server.aof_pipe_read_ack_from_parent = fds[4];
1586     server.aof_stop_sending_diff = 0;
1587     return C_OK;
1588 
1589 error:
1590     serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
1591         strerror(errno));
1592     for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
1593     return C_ERR;
1594 }
1595 
aofClosePipes(void)1596 void aofClosePipes(void) {
1597     aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
1598     aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
1599     close(server.aof_pipe_write_data_to_child);
1600     close(server.aof_pipe_read_data_from_parent);
1601     close(server.aof_pipe_write_ack_to_parent);
1602     close(server.aof_pipe_read_ack_from_child);
1603     close(server.aof_pipe_write_ack_to_child);
1604     close(server.aof_pipe_read_ack_from_parent);
1605 }
1606 
1607 /* ----------------------------------------------------------------------------
1608  * AOF background rewrite
1609  * ------------------------------------------------------------------------- */
1610 
/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrites the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child has finished '2a', it exits.
 * 4) The parent will trap the exit code; if it's OK, it will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file to the actual file name.
 *    Then, if AOF is enabled, the new file becomes the new append only
 *    file. Profit!
 */
rewriteAppendOnlyFileBackground(void)1623 int rewriteAppendOnlyFileBackground(void) {
1624     pid_t childpid;
1625 
1626     if (hasActiveChildProcess()) return C_ERR;
1627     if (aofCreatePipes() != C_OK) return C_ERR;
1628     openChildInfoPipe();
1629     if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
1630         char tmpfile[256];
1631 
1632         /* Child */
1633         redisSetProcTitle("redis-aof-rewrite");
1634         redisSetCpuAffinity(server.aof_rewrite_cpulist);
1635         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
1636         if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
1637             sendChildCOWInfo(CHILD_TYPE_AOF, "AOF rewrite");
1638             exitFromChild(0);
1639         } else {
1640             exitFromChild(1);
1641         }
1642     } else {
1643         /* Parent */
1644         if (childpid == -1) {
1645             closeChildInfoPipe();
1646             serverLog(LL_WARNING,
1647                 "Can't rewrite append only file in background: fork: %s",
1648                 strerror(errno));
1649             aofClosePipes();
1650             return C_ERR;
1651         }
1652         serverLog(LL_NOTICE,
1653             "Background append only file rewriting started by pid %d",childpid);
1654         server.aof_rewrite_scheduled = 0;
1655         server.aof_rewrite_time_start = time(NULL);
1656         server.aof_child_pid = childpid;
1657         updateDictResizePolicy();
1658         /* We set appendseldb to -1 in order to force the next call to the
1659          * feedAppendOnlyFile() to issue a SELECT command, so the differences
1660          * accumulated by the parent into server.aof_rewrite_buf will start
1661          * with a SELECT statement and it will be safe to merge. */
1662         server.aof_selected_db = -1;
1663         replicationScriptCacheFlush();
1664         return C_OK;
1665     }
1666     return C_OK; /* unreached */
1667 }
1668 
void bgrewriteaofCommand(client *c) {
    /* Refuse to start a second AOF rewrite while one is running. */
    if (server.aof_child_pid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }

    /* Some other (non-AOF) child is active: defer the rewrite by flagging
     * it as scheduled instead of starting it now. */
    if (hasActiveChildProcess()) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }

    if (rewriteAppendOnlyFileBackground() != C_OK) {
        addReplyError(c,"Can't execute an AOF background rewriting. "
                        "Please check the server logs for more information.");
        return;
    }
    addReplyStatus(c,"Background append only file rewriting started");
}
1682 
void aofRemoveTempFile(pid_t childpid) {
    char path[256];
    int pid = (int) childpid;

    /* Temp file name used by the BGREWRITEAOF child for its output. */
    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",pid);
    bg_unlink(path);

    /* Temp file name used internally by rewriteAppendOnlyFile(). */
    snprintf(path,sizeof(path),"temp-rewriteaof-%d.aof",pid);
    bg_unlink(path);
}
1692 
/* Update the server.aof_current_size field explicitly using fstat(2)
 * on the AOF descriptor to check the size of the file. This is useful
 * after a rewrite or after a restart; normally the size is updated by just
 * adding the write length to the current length, which is much faster. */
aofUpdateCurrentSize(void)1697 void aofUpdateCurrentSize(void) {
1698     struct redis_stat sb;
1699     mstime_t latency;
1700 
1701     latencyStartMonitor(latency);
1702     if (redis_fstat(server.aof_fd,&sb) == -1) {
1703         serverLog(LL_WARNING,"Unable to obtain the AOF file length. stat: %s",
1704             strerror(errno));
1705     } else {
1706         server.aof_current_size = sb.st_size;
1707     }
1708     latencyEndMonitor(latency);
1709     latencyAddSampleIfNeeded("aof-fstat",latency);
1710 }
1711 
1712 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1713  * Handle this. */
backgroundRewriteDoneHandler(int exitcode,int bysignal)1714 void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
1715     if (!bysignal && exitcode == 0) {
1716         int newfd, oldfd;
1717         char tmpfile[256];
1718         long long now = ustime();
1719         mstime_t latency;
1720 
1721         serverLog(LL_NOTICE,
1722             "Background AOF rewrite terminated with success");
1723 
1724         /* Flush the differences accumulated by the parent to the
1725          * rewritten AOF. */
1726         latencyStartMonitor(latency);
1727         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
1728             (int)server.aof_child_pid);
1729         newfd = open(tmpfile,O_WRONLY|O_APPEND);
1730         if (newfd == -1) {
1731             serverLog(LL_WARNING,
1732                 "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
1733             goto cleanup;
1734         }
1735 
1736         if (aofRewriteBufferWrite(newfd) == -1) {
1737             serverLog(LL_WARNING,
1738                 "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
1739             close(newfd);
1740             goto cleanup;
1741         }
1742         latencyEndMonitor(latency);
1743         latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
1744 
1745         serverLog(LL_NOTICE,
1746             "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
1747 
1748         /* The only remaining thing to do is to rename the temporary file to
1749          * the configured file and switch the file descriptor used to do AOF
1750          * writes. We don't want close(2) or rename(2) calls to block the
1751          * server on old file deletion.
1752          *
1753          * There are two possible scenarios:
1754          *
1755          * 1) AOF is DISABLED and this was a one time rewrite. The temporary
1756          * file will be renamed to the configured file. When this file already
1757          * exists, it will be unlinked, which may block the server.
1758          *
1759          * 2) AOF is ENABLED and the rewritten AOF will immediately start
1760          * receiving writes. After the temporary file is renamed to the
1761          * configured file, the original AOF file descriptor will be closed.
1762          * Since this will be the last reference to that file, closing it
1763          * causes the underlying file to be unlinked, which may block the
1764          * server.
1765          *
1766          * To mitigate the blocking effect of the unlink operation (either
1767          * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
1768          * use a background thread to take care of this. First, we
1769          * make scenario 1 identical to scenario 2 by opening the target file
1770          * when it exists. The unlink operation after the rename(2) will then
1771          * be executed upon calling close(2) for its descriptor. Everything to
1772          * guarantee atomicity for this switch has already happened by then, so
1773          * we don't care what the outcome or duration of that close operation
1774          * is, as long as the file descriptor is released again. */
1775         if (server.aof_fd == -1) {
1776             /* AOF disabled */
1777 
1778             /* Don't care if this fails: oldfd will be -1 and we handle that.
1779              * One notable case of -1 return is if the old file does
1780              * not exist. */
1781             oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
1782         } else {
1783             /* AOF enabled */
1784             oldfd = -1; /* We'll set this to the current AOF filedes later. */
1785         }
1786 
1787         /* Rename the temporary file. This will not unlink the target file if
1788          * it exists, because we reference it with "oldfd". */
1789         latencyStartMonitor(latency);
1790         if (rename(tmpfile,server.aof_filename) == -1) {
1791             serverLog(LL_WARNING,
1792                 "Error trying to rename the temporary AOF file %s into %s: %s",
1793                 tmpfile,
1794                 server.aof_filename,
1795                 strerror(errno));
1796             close(newfd);
1797             if (oldfd != -1) close(oldfd);
1798             goto cleanup;
1799         }
1800         latencyEndMonitor(latency);
1801         latencyAddSampleIfNeeded("aof-rename",latency);
1802 
1803         if (server.aof_fd == -1) {
1804             /* AOF disabled, we don't need to set the AOF file descriptor
1805              * to this new file, so we can close it. */
1806             close(newfd);
1807         } else {
1808             /* AOF enabled, replace the old fd with the new one. */
1809             oldfd = server.aof_fd;
1810             server.aof_fd = newfd;
1811             if (server.aof_fsync == AOF_FSYNC_ALWAYS)
1812                 redis_fsync(newfd);
1813             else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
1814                 aof_background_fsync(newfd);
1815             server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
1816             aofUpdateCurrentSize();
1817             server.aof_rewrite_base_size = server.aof_current_size;
1818             server.aof_fsync_offset = server.aof_current_size;
1819 
1820             /* Clear regular AOF buffer since its contents was just written to
1821              * the new AOF from the background rewrite buffer. */
1822             sdsfree(server.aof_buf);
1823             server.aof_buf = sdsempty();
1824         }
1825 
1826         server.aof_lastbgrewrite_status = C_OK;
1827 
1828         serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
1829         /* Change state from WAIT_REWRITE to ON if needed */
1830         if (server.aof_state == AOF_WAIT_REWRITE)
1831             server.aof_state = AOF_ON;
1832 
1833         /* Asynchronously close the overwritten AOF. */
1834         if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
1835 
1836         serverLog(LL_VERBOSE,
1837             "Background AOF rewrite signal handler took %lldus", ustime()-now);
1838     } else if (!bysignal && exitcode != 0) {
1839         server.aof_lastbgrewrite_status = C_ERR;
1840 
1841         serverLog(LL_WARNING,
1842             "Background AOF rewrite terminated with error");
1843     } else {
1844         /* SIGUSR1 is whitelisted, so we have a way to kill a child without
1845          * triggering an error condition. */
1846         if (bysignal != SIGUSR1)
1847             server.aof_lastbgrewrite_status = C_ERR;
1848 
1849         serverLog(LL_WARNING,
1850             "Background AOF rewrite terminated by signal %d", bysignal);
1851     }
1852 
1853 cleanup:
1854     aofClosePipes();
1855     aofRewriteBufferReset();
1856     aofRemoveTempFile(server.aof_child_pid);
1857     server.aof_child_pid = -1;
1858     server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
1859     server.aof_rewrite_time_start = -1;
1860     /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
1861     if (server.aof_state == AOF_WAIT_REWRITE)
1862         server.aof_rewrite_scheduled = 1;
1863 }
1864