1 /* Asynchronous replication implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 
32 #include "server.h"
33 #include "cluster.h"
34 #include "bio.h"
35 
36 #include <sys/time.h>
37 #include <unistd.h>
38 #include <fcntl.h>
39 #include <sys/socket.h>
40 #include <sys/stat.h>
41 
42 void replicationDiscardCachedMaster(void);
43 void replicationResurrectCachedMaster(connection *conn);
44 void replicationSendAck(void);
45 void putSlaveOnline(client *slave);
46 int cancelReplicationHandshake(void);
47 
48 /* We take a global flag to remember if this instance generated an RDB
49  * because of replication, so that we can remove the RDB file in case
50  * the instance is configured to have no persistence. */
51 int RDBGeneratedByReplication = 0;
52 
53 /* --------------------------- Utility functions ---------------------------- */
54 
55 /* Return the pointer to a string representing the slave ip:listening_port
56  * pair. Mostly useful for logging, since we want to log a slave using its
57  * IP address and its listening port which is more clear for the user, for
58  * example: "Closing connection with replica 10.1.2.3:6380". */
replicationGetSlaveName(client * c)59 char *replicationGetSlaveName(client *c) {
60     static char buf[NET_PEER_ID_LEN];
61     char ip[NET_IP_STR_LEN];
62 
63     ip[0] = '\0';
64     buf[0] = '\0';
65     if (c->slave_ip[0] != '\0' ||
66         connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1)
67     {
68         /* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
69         if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
70 
71         if (c->slave_listening_port)
72             anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
73         else
74             snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",ip);
75     } else {
76         snprintf(buf,sizeof(buf),"client id #%llu",
77             (unsigned long long) c->id);
78     }
79     return buf;
80 }
81 
82 /* Plain unlink() can block for quite some time in order to actually apply
83  * the file deletion to the filesystem. This call removes the file in a
84  * background thread instead. We actually just do close() in the thread,
85  * by using the fact that if there is another instance of the same file open,
86  * the foreground unlink() will only remove the fs name, and deleting the
87  * file's storage space will only happen once the last reference is lost. */
bg_unlink(const char * filename)88 int bg_unlink(const char *filename) {
89     int fd = open(filename,O_RDONLY|O_NONBLOCK);
90     if (fd == -1) {
91         /* Can't open the file? Fall back to unlinking in the main thread. */
92         return unlink(filename);
93     } else {
94         /* The following unlink() removes the name but doesn't free the
95          * file contents because a process still has it open. */
96         int retval = unlink(filename);
97         if (retval == -1) {
98             /* If we got an unlink error, we just return it, closing the
99              * new reference we have to the file. */
100             int old_errno = errno;
101             close(fd);  /* This would overwrite our errno. So we saved it. */
102             errno = old_errno;
103             return -1;
104         }
105         bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)fd,NULL,NULL);
106         return 0; /* Success. */
107     }
108 }
109 
110 /* ---------------------------------- MASTER -------------------------------- */
111 
createReplicationBacklog(void)112 void createReplicationBacklog(void) {
113     serverAssert(server.repl_backlog == NULL);
114     server.repl_backlog = zmalloc(server.repl_backlog_size);
115     server.repl_backlog_histlen = 0;
116     server.repl_backlog_idx = 0;
117 
118     /* We don't have any data inside our buffer, but virtually the first
119      * byte we have is the next byte that will be generated for the
120      * replication stream. */
121     server.repl_backlog_off = server.master_repl_offset+1;
122 }
123 
124 /* This function is called when the user modifies the replication backlog
125  * size at runtime. It is up to the function to both update the
126  * server.repl_backlog_size and to resize the buffer and setup it so that
127  * it contains the same data as the previous one (possibly less data, but
128  * the most recent bytes, or the same data and more free space in case the
129  * buffer is enlarged). */
resizeReplicationBacklog(long long newsize)130 void resizeReplicationBacklog(long long newsize) {
131     if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
132         newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
133     if (server.repl_backlog_size == newsize) return;
134 
135     server.repl_backlog_size = newsize;
136     if (server.repl_backlog != NULL) {
137         /* What we actually do is to flush the old buffer and realloc a new
138          * empty one. It will refill with new data incrementally.
139          * The reason is that copying a few gigabytes adds latency and even
140          * worse often we need to alloc additional space before freeing the
141          * old buffer. */
142         zfree(server.repl_backlog);
143         server.repl_backlog = zmalloc(server.repl_backlog_size);
144         server.repl_backlog_histlen = 0;
145         server.repl_backlog_idx = 0;
146         /* Next byte we have is... the next since the buffer is empty. */
147         server.repl_backlog_off = server.master_repl_offset+1;
148     }
149 }
150 
freeReplicationBacklog(void)151 void freeReplicationBacklog(void) {
152     serverAssert(listLength(server.slaves) == 0);
153     zfree(server.repl_backlog);
154     server.repl_backlog = NULL;
155 }
156 
157 /* Add data to the replication backlog.
158  * This function also increments the global replication offset stored at
159  * server.master_repl_offset, because there is no case where we want to feed
160  * the backlog without incrementing the offset. */
feedReplicationBacklog(void * ptr,size_t len)161 void feedReplicationBacklog(void *ptr, size_t len) {
162     unsigned char *p = ptr;
163 
164     server.master_repl_offset += len;
165 
166     /* This is a circular buffer, so write as much data we can at every
167      * iteration and rewind the "idx" index if we reach the limit. */
168     while(len) {
169         size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
170         if (thislen > len) thislen = len;
171         memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
172         server.repl_backlog_idx += thislen;
173         if (server.repl_backlog_idx == server.repl_backlog_size)
174             server.repl_backlog_idx = 0;
175         len -= thislen;
176         p += thislen;
177         server.repl_backlog_histlen += thislen;
178     }
179     if (server.repl_backlog_histlen > server.repl_backlog_size)
180         server.repl_backlog_histlen = server.repl_backlog_size;
181     /* Set the offset of the first byte we have in the backlog. */
182     server.repl_backlog_off = server.master_repl_offset -
183                               server.repl_backlog_histlen + 1;
184 }
185 
186 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
187  * as input. */
feedReplicationBacklogWithObject(robj * o)188 void feedReplicationBacklogWithObject(robj *o) {
189     char llstr[LONG_STR_SIZE];
190     void *p;
191     size_t len;
192 
193     if (o->encoding == OBJ_ENCODING_INT) {
194         len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
195         p = llstr;
196     } else {
197         len = sdslen(o->ptr);
198         p = o->ptr;
199     }
200     feedReplicationBacklog(p,len);
201 }
202 
203 /* Propagate write commands to slaves, and populate the replication backlog
204  * as well. This function is used if the instance is a master: we use
205  * the commands received by our clients in order to create the replication
206  * stream. Instead if the instance is a slave and has sub-slaves attached,
207  * we use replicationFeedSlavesFromMasterStream() */
replicationFeedSlaves(list * slaves,int dictid,robj ** argv,int argc)208 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
209     listNode *ln;
210     listIter li;
211     int j, len;
212     char llstr[LONG_STR_SIZE];
213 
214     /* If the instance is not a top level master, return ASAP: we'll just proxy
215      * the stream of data we receive from our master instead, in order to
216      * propagate *identical* replication stream. In this way this slave can
217      * advertise the same replication ID as the master (since it shares the
218      * master replication history and has the same backlog and offsets). */
219     if (server.masterhost != NULL) return;
220 
221     /* If there aren't slaves, and there is no backlog buffer to populate,
222      * we can return ASAP. */
223     if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
224 
225     /* We can't have slaves attached and no backlog. */
226     serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
227 
228     /* Send SELECT command to every slave if needed. */
229     if (server.slaveseldb != dictid) {
230         robj *selectcmd;
231 
232         /* For a few DBs we have pre-computed SELECT command. */
233         if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
234             selectcmd = shared.select[dictid];
235         } else {
236             int dictid_len;
237 
238             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
239             selectcmd = createObject(OBJ_STRING,
240                 sdscatprintf(sdsempty(),
241                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
242                 dictid_len, llstr));
243         }
244 
245         /* Add the SELECT command into the backlog. */
246         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
247 
248         /* Send it to slaves. */
249         listRewind(slaves,&li);
250         while((ln = listNext(&li))) {
251             client *slave = ln->value;
252             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
253             addReply(slave,selectcmd);
254         }
255 
256         if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
257             decrRefCount(selectcmd);
258     }
259     server.slaveseldb = dictid;
260 
261     /* Write the command to the replication backlog if any. */
262     if (server.repl_backlog) {
263         char aux[LONG_STR_SIZE+3];
264 
265         /* Add the multi bulk reply length. */
266         aux[0] = '*';
267         len = ll2string(aux+1,sizeof(aux)-1,argc);
268         aux[len+1] = '\r';
269         aux[len+2] = '\n';
270         feedReplicationBacklog(aux,len+3);
271 
272         for (j = 0; j < argc; j++) {
273             long objlen = stringObjectLen(argv[j]);
274 
275             /* We need to feed the buffer with the object as a bulk reply
276              * not just as a plain string, so create the $..CRLF payload len
277              * and add the final CRLF */
278             aux[0] = '$';
279             len = ll2string(aux+1,sizeof(aux)-1,objlen);
280             aux[len+1] = '\r';
281             aux[len+2] = '\n';
282             feedReplicationBacklog(aux,len+3);
283             feedReplicationBacklogWithObject(argv[j]);
284             feedReplicationBacklog(aux+len+1,2);
285         }
286     }
287 
288     /* Write the command to every slave. */
289     listRewind(slaves,&li);
290     while((ln = listNext(&li))) {
291         client *slave = ln->value;
292 
293         /* Don't feed slaves that are still waiting for BGSAVE to start. */
294         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
295 
296         /* Feed slaves that are waiting for the initial SYNC (so these commands
297          * are queued in the output buffer until the initial SYNC completes),
298          * or are already in sync with the master. */
299 
300         /* Add the multi bulk length. */
301         addReplyArrayLen(slave,argc);
302 
303         /* Finally any additional argument that was not stored inside the
304          * static buffer if any (from j to argc). */
305         for (j = 0; j < argc; j++)
306             addReplyBulk(slave,argv[j]);
307     }
308 }
309 
310 /* This is a debugging function that gets called when we detect something
311  * wrong with the replication protocol: the goal is to peek into the
312  * replication backlog and show a few final bytes to make simpler to
313  * guess what kind of bug it could be. */
showLatestBacklog(void)314 void showLatestBacklog(void) {
315     if (server.repl_backlog == NULL) return;
316 
317     long long dumplen = 256;
318     if (server.repl_backlog_histlen < dumplen)
319         dumplen = server.repl_backlog_histlen;
320 
321     /* Identify the first byte to dump. */
322     long long idx =
323       (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) %
324        server.repl_backlog_size;
325 
326     /* Scan the circular buffer to collect 'dumplen' bytes. */
327     sds dump = sdsempty();
328     while(dumplen) {
329         long long thislen =
330             ((server.repl_backlog_size - idx) < dumplen) ?
331             (server.repl_backlog_size - idx) : dumplen;
332 
333         dump = sdscatrepr(dump,server.repl_backlog+idx,thislen);
334         dumplen -= thislen;
335         idx = 0;
336     }
337 
338     /* Finally log such bytes: this is vital debugging info to
339      * understand what happened. */
340     serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
341     sdsfree(dump);
342 }
343 
344 /* This function is used in order to proxy what we receive from our master
345  * to our sub-slaves. */
346 #include <ctype.h>
replicationFeedSlavesFromMasterStream(list * slaves,char * buf,size_t buflen)347 void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
348     listNode *ln;
349     listIter li;
350 
351     /* Debugging: this is handy to see the stream sent from master
352      * to slaves. Disabled with if(0). */
353     if (0) {
354         printf("%zu:",buflen);
355         for (size_t j = 0; j < buflen; j++) {
356             printf("%c", isprint(buf[j]) ? buf[j] : '.');
357         }
358         printf("\n");
359     }
360 
361     if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
362     listRewind(slaves,&li);
363     while((ln = listNext(&li))) {
364         client *slave = ln->value;
365 
366         /* Don't feed slaves that are still waiting for BGSAVE to start. */
367         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
368         addReplyProto(slave,buf,buflen);
369     }
370 }
371 
/* Send to every client in 'monitors' a textual representation of the
 * command argv[0..argc-1] executed by client 'c' against database 'dictid'.
 *
 * The line has the form:
 *
 *   +<unix time>.<microseconds> [<db> <origin>] "arg0" "arg1" ...\r\n
 *
 * where <origin> is "lua" for the Lua client, "unix:<socket path>" for
 * Unix socket clients, or the client peer ID otherwise. */
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
    struct timeval now;
    listIter it;
    listNode *node;
    robj *payload;
    sds line;
    int i;

    /* Timestamp prefix. */
    gettimeofday(&now,NULL);
    line = sdscatprintf(sdsnew("+"),"%ld.%06ld ",
                        (long)now.tv_sec,(long)now.tv_usec);

    /* Database and origin of the command. */
    if (c->flags & CLIENT_LUA) {
        line = sdscatprintf(line,"[%d lua] ",dictid);
    } else if (c->flags & CLIENT_UNIX_SOCKET) {
        line = sdscatprintf(line,"[%d unix:%s] ",dictid,server.unixsocket);
    } else {
        line = sdscatprintf(line,"[%d %s] ",dictid,getClientPeerId(c));
    }

    /* Arguments, quoted and separated by a single space. */
    for (i = 0; i < argc; i++) {
        robj *arg = argv[i];

        if (i > 0) line = sdscatlen(line," ",1);
        if (arg->encoding == OBJ_ENCODING_INT) {
            line = sdscatprintf(line, "\"%ld\"", (long)arg->ptr);
        } else {
            line = sdscatrepr(line,(char*)arg->ptr,sdslen(arg->ptr));
        }
    }
    line = sdscatlen(line,"\r\n",2);

    /* Wrap the line into an object, send it to all the monitors, and
     * finally drop our reference to it. */
    payload = createObject(OBJ_STRING,line);
    listRewind(monitors,&it);
    while ((node = listNext(&it)) != NULL) {
        client *monitor = node->value;
        addReply(monitor,payload);
    }
    decrRefCount(payload);
}
410 
411 /* Feed the slave 'c' with the replication backlog starting from the
412  * specified 'offset' up to the end of the backlog. */
addReplyReplicationBacklog(client * c,long long offset)413 long long addReplyReplicationBacklog(client *c, long long offset) {
414     long long j, skip, len;
415 
416     serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
417 
418     if (server.repl_backlog_histlen == 0) {
419         serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
420         return 0;
421     }
422 
423     serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
424              server.repl_backlog_size);
425     serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
426              server.repl_backlog_off);
427     serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
428              server.repl_backlog_histlen);
429     serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
430              server.repl_backlog_idx);
431 
432     /* Compute the amount of bytes we need to discard. */
433     skip = offset - server.repl_backlog_off;
434     serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
435 
436     /* Point j to the oldest byte, that is actually our
437      * server.repl_backlog_off byte. */
438     j = (server.repl_backlog_idx +
439         (server.repl_backlog_size-server.repl_backlog_histlen)) %
440         server.repl_backlog_size;
441     serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
442 
443     /* Discard the amount of data to seek to the specified 'offset'. */
444     j = (j + skip) % server.repl_backlog_size;
445 
446     /* Feed slave with data. Since it is a circular buffer we have to
447      * split the reply in two parts if we are cross-boundary. */
448     len = server.repl_backlog_histlen - skip;
449     serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
450     while(len) {
451         long long thislen =
452             ((server.repl_backlog_size - j) < len) ?
453             (server.repl_backlog_size - j) : len;
454 
455         serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
456         addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
457         len -= thislen;
458         j = 0;
459     }
460     return server.repl_backlog_histlen - skip;
461 }
462 
463 /* Return the offset to provide as reply to the PSYNC command received
464  * from the slave. The returned value is only valid immediately after
465  * the BGSAVE process started and before executing any other command
466  * from clients. */
getPsyncInitialOffset(void)467 long long getPsyncInitialOffset(void) {
468     return server.master_repl_offset;
469 }
470 
471 /* Send a FULLRESYNC reply in the specific case of a full resynchronization,
472  * as a side effect setup the slave for a full sync in different ways:
473  *
474  * 1) Remember, into the slave client structure, the replication offset
475  *    we sent here, so that if new slaves will later attach to the same
476  *    background RDB saving process (by duplicating this client output
477  *    buffer), we can get the right offset from this slave.
478  * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
479  *    we start accumulating differences from this point.
480  * 3) Force the replication stream to re-emit a SELECT statement so
481  *    the new slave incremental differences will start selecting the
482  *    right database number.
483  *
484  * Normally this function should be called immediately after a successful
485  * BGSAVE for replication was started, or when there is one already in
486  * progress that we attached our slave to. */
replicationSetupSlaveForFullResync(client * slave,long long offset)487 int replicationSetupSlaveForFullResync(client *slave, long long offset) {
488     char buf[128];
489     int buflen;
490 
491     slave->psync_initial_offset = offset;
492     slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
493     /* We are going to accumulate the incremental changes for this
494      * slave as well. Set slaveseldb to -1 in order to force to re-emit
495      * a SELECT statement in the replication stream. */
496     server.slaveseldb = -1;
497 
498     /* Don't send this reply to slaves that approached us with
499      * the old SYNC command. */
500     if (!(slave->flags & CLIENT_PRE_PSYNC)) {
501         buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
502                           server.replid,offset);
503         if (connWrite(slave->conn,buf,buflen) != buflen) {
504             freeClientAsync(slave);
505             return C_ERR;
506         }
507     }
508     return C_OK;
509 }
510 
511 /* This function handles the PSYNC command from the point of view of a
512  * master receiving a request for partial resynchronization.
513  *
514  * On success return C_OK, otherwise C_ERR is returned and we proceed
515  * with the usual full resync. */
masterTryPartialResynchronization(client * c)516 int masterTryPartialResynchronization(client *c) {
517     long long psync_offset, psync_len;
518     char *master_replid = c->argv[1]->ptr;
519     char buf[128];
520     int buflen;
521 
522     /* Parse the replication offset asked by the slave. Go to full sync
523      * on parse error: this should never happen but we try to handle
524      * it in a robust way compared to aborting. */
525     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
526        C_OK) goto need_full_resync;
527 
528     /* Is the replication ID of this master the same advertised by the wannabe
529      * slave via PSYNC? If the replication ID changed this master has a
530      * different replication history, and there is no way to continue.
531      *
532      * Note that there are two potentially valid replication IDs: the ID1
533      * and the ID2. The ID2 however is only valid up to a specific offset. */
534     if (strcasecmp(master_replid, server.replid) &&
535         (strcasecmp(master_replid, server.replid2) ||
536          psync_offset > server.second_replid_offset))
537     {
538         /* Replid "?" is used by slaves that want to force a full resync. */
539         if (master_replid[0] != '?') {
540             if (strcasecmp(master_replid, server.replid) &&
541                 strcasecmp(master_replid, server.replid2))
542             {
543                 serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
544                     "Replication ID mismatch (Replica asked for '%s', my "
545                     "replication IDs are '%s' and '%s')",
546                     master_replid, server.replid, server.replid2);
547             } else {
548                 serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
549                     "Requested offset for second ID was %lld, but I can reply "
550                     "up to %lld", psync_offset, server.second_replid_offset);
551             }
552         } else {
553             serverLog(LL_NOTICE,"Full resync requested by replica %s",
554                 replicationGetSlaveName(c));
555         }
556         goto need_full_resync;
557     }
558 
559     /* We still have the data our slave is asking for? */
560     if (!server.repl_backlog ||
561         psync_offset < server.repl_backlog_off ||
562         psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
563     {
564         serverLog(LL_NOTICE,
565             "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
566         if (psync_offset > server.master_repl_offset) {
567             serverLog(LL_WARNING,
568                 "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
569         }
570         goto need_full_resync;
571     }
572 
573     /* If we reached this point, we are able to perform a partial resync:
574      * 1) Set client state to make it a slave.
575      * 2) Inform the client we can continue with +CONTINUE
576      * 3) Send the backlog data (from the offset to the end) to the slave. */
577     c->flags |= CLIENT_SLAVE;
578     c->replstate = SLAVE_STATE_ONLINE;
579     c->repl_ack_time = server.unixtime;
580     c->repl_put_online_on_ack = 0;
581     listAddNodeTail(server.slaves,c);
582     /* We can't use the connection buffers since they are used to accumulate
583      * new commands at this stage. But we are sure the socket send buffer is
584      * empty so this write will never fail actually. */
585     if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
586         buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
587     } else {
588         buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
589     }
590     if (connWrite(c->conn,buf,buflen) != buflen) {
591         freeClientAsync(c);
592         return C_OK;
593     }
594     psync_len = addReplyReplicationBacklog(c,psync_offset);
595     serverLog(LL_NOTICE,
596         "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
597             replicationGetSlaveName(c),
598             psync_len, psync_offset);
599     /* Note that we don't need to set the selected DB at server.slaveseldb
600      * to -1 to force the master to emit SELECT, since the slave already
601      * has this state from the previous connection with the master. */
602 
603     refreshGoodSlavesCount();
604 
605     /* Fire the replica change modules event. */
606     moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
607                           REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
608                           NULL);
609 
610     return C_OK; /* The caller can return, no full resync needed. */
611 
612 need_full_resync:
613     /* We need a full resync for some reason... Note that we can't
614      * reply to PSYNC right now if a full SYNC is needed. The reply
615      * must include the master offset at the time the RDB file we transfer
616      * is generated, so we need to delay the reply to that moment. */
617     return C_ERR;
618 }
619 
620 /* Start a BGSAVE for replication goals, which is, selecting the disk or
621  * socket target depending on the configuration, and making sure that
622  * the script cache is flushed before to start.
623  *
624  * The mincapa argument is the bitwise AND among all the slaves capabilities
625  * of the slaves waiting for this BGSAVE, so represents the slave capabilities
626  * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
627  *
628  * Side effects, other than starting a BGSAVE:
629  *
630  * 1) Handle the slaves in WAIT_START state, by preparing them for a full
631  *    sync if the BGSAVE was successfully started, or sending them an error
632  *    and dropping them from the list of slaves.
633  *
634  * 2) Flush the Lua scripting script cache if the BGSAVE was actually
635  *    started.
636  *
637  * Returns C_OK on success or C_ERR otherwise. */
startBgsaveForReplication(int mincapa)638 int startBgsaveForReplication(int mincapa) {
639     int retval;
640     int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
641     listIter li;
642     listNode *ln;
643 
644     serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
645         socket_target ? "replicas sockets" : "disk");
646 
647     rdbSaveInfo rsi, *rsiptr;
648     rsiptr = rdbPopulateSaveInfo(&rsi);
649     /* Only do rdbSave* when rsiptr is not NULL,
650      * otherwise slave will miss repl-stream-db. */
651     if (rsiptr) {
652         if (socket_target)
653             retval = rdbSaveToSlavesSockets(rsiptr);
654         else
655             retval = rdbSaveBackground(server.rdb_filename,rsiptr);
656     } else {
657         serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
658         retval = C_ERR;
659     }
660 
661     /* If we succeeded to start a BGSAVE with disk target, let's remember
662      * this fact, so that we can later delete the file if needed. Note
663      * that we don't set the flag to 1 if the feature is disabled, otherwise
664      * it would never be cleared: the file is not deleted. This way if
665      * the user enables it later with CONFIG SET, we are fine. */
666     if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
667         RDBGeneratedByReplication = 1;
668 
669     /* If we failed to BGSAVE, remove the slaves waiting for a full
670      * resynchronization from the list of slaves, inform them with
671      * an error about what happened, close the connection ASAP. */
672     if (retval == C_ERR) {
673         serverLog(LL_WARNING,"BGSAVE for replication failed");
674         listRewind(server.slaves,&li);
675         while((ln = listNext(&li))) {
676             client *slave = ln->value;
677 
678             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
679                 slave->replstate = REPL_STATE_NONE;
680                 slave->flags &= ~CLIENT_SLAVE;
681                 listDelNode(server.slaves,ln);
682                 addReplyError(slave,
683                     "BGSAVE failed, replication can't continue");
684                 slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
685             }
686         }
687         return retval;
688     }
689 
690     /* If the target is socket, rdbSaveToSlavesSockets() already setup
691      * the slaves for a full resync. Otherwise for disk target do it now.*/
692     if (!socket_target) {
693         listRewind(server.slaves,&li);
694         while((ln = listNext(&li))) {
695             client *slave = ln->value;
696 
697             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
698                     replicationSetupSlaveForFullResync(slave,
699                             getPsyncInitialOffset());
700             }
701         }
702     }
703 
704     /* Flush the script cache, since we need that slave differences are
705      * accumulated without requiring slaves to match our cached scripts. */
706     if (retval == C_OK) replicationScriptCacheFlush();
707     return retval;
708 }
709 
710 /* SYNC and PSYNC command implementation. */
syncCommand(client * c)711 void syncCommand(client *c) {
712     /* ignore SYNC if already slave or in monitor mode */
713     if (c->flags & CLIENT_SLAVE) return;
714 
715     /* Refuse SYNC requests if we are a slave but the link with our master
716      * is not ok... */
717     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
718         addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
719         return;
720     }
721 
722     /* SYNC can't be issued when the server has pending data to send to
723      * the client about already issued commands. We need a fresh reply
724      * buffer registering the differences between the BGSAVE and the current
725      * dataset, so that we can copy to other slaves if needed. */
726     if (clientHasPendingReplies(c)) {
727         addReplyError(c,"SYNC and PSYNC are invalid with pending output");
728         return;
729     }
730 
731     serverLog(LL_NOTICE,"Replica %s asks for synchronization",
732         replicationGetSlaveName(c));
733 
734     /* Try a partial resynchronization if this is a PSYNC command.
735      * If it fails, we continue with usual full resynchronization, however
736      * when this happens masterTryPartialResynchronization() already
737      * replied with:
738      *
739      * +FULLRESYNC <replid> <offset>
740      *
741      * So the slave knows the new replid and offset to try a PSYNC later
742      * if the connection with the master is lost. */
743     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
744         if (masterTryPartialResynchronization(c) == C_OK) {
745             server.stat_sync_partial_ok++;
746             return; /* No full resync needed, return. */
747         } else {
748             char *master_replid = c->argv[1]->ptr;
749 
750             /* Increment stats for failed PSYNCs, but only if the
751              * replid is not "?", as this is used by slaves to force a full
752              * resync on purpose when they are not albe to partially
753              * resync. */
754             if (master_replid[0] != '?') server.stat_sync_partial_err++;
755         }
756     } else {
757         /* If a slave uses SYNC, we are dealing with an old implementation
758          * of the replication protocol (like redis-cli --slave). Flag the client
759          * so that we don't expect to receive REPLCONF ACK feedbacks. */
760         c->flags |= CLIENT_PRE_PSYNC;
761     }
762 
763     /* Full resynchronization. */
764     server.stat_sync_full++;
765 
766     /* Setup the slave as one waiting for BGSAVE to start. The following code
767      * paths will change the state if we handle the slave differently. */
768     c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
769     if (server.repl_disable_tcp_nodelay)
770         connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
771     c->repldbfd = -1;
772     c->flags |= CLIENT_SLAVE;
773     listAddNodeTail(server.slaves,c);
774 
775     /* Create the replication backlog if needed. */
776     if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
777         /* When we create the backlog from scratch, we always use a new
778          * replication ID and clear the ID2, since there is no valid
779          * past history. */
780         changeReplicationId();
781         clearReplicationId2();
782         createReplicationBacklog();
783         serverLog(LL_NOTICE,"Replication backlog created, my new "
784                             "replication IDs are '%s' and '%s'",
785                             server.replid, server.replid2);
786     }
787 
788     /* CASE 1: BGSAVE is in progress, with disk target. */
789     if (server.rdb_child_pid != -1 &&
790         server.rdb_child_type == RDB_CHILD_TYPE_DISK)
791     {
792         /* Ok a background save is in progress. Let's check if it is a good
793          * one for replication, i.e. if there is another slave that is
794          * registering differences since the server forked to save. */
795         client *slave;
796         listNode *ln;
797         listIter li;
798 
799         listRewind(server.slaves,&li);
800         while((ln = listNext(&li))) {
801             slave = ln->value;
802             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
803         }
804         /* To attach this slave, we check that it has at least all the
805          * capabilities of the slave that triggered the current BGSAVE. */
806         if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
807             /* Perfect, the server is already registering differences for
808              * another slave. Set the right state, and copy the buffer. */
809             copyClientOutputBuffer(c,slave);
810             replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
811             serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
812         } else {
813             /* No way, we need to wait for the next BGSAVE in order to
814              * register differences. */
815             serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
816         }
817 
818     /* CASE 2: BGSAVE is in progress, with socket target. */
819     } else if (server.rdb_child_pid != -1 &&
820                server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
821     {
822         /* There is an RDB child process but it is writing directly to
823          * children sockets. We need to wait for the next BGSAVE
824          * in order to synchronize. */
825         serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
826 
827     /* CASE 3: There is no BGSAVE is progress. */
828     } else {
829         if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
830             /* Diskless replication RDB child is created inside
831              * replicationCron() since we want to delay its start a
832              * few seconds to wait for more slaves to arrive. */
833             if (server.repl_diskless_sync_delay)
834                 serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
835         } else {
836             /* Target is disk (or the slave is not capable of supporting
837              * diskless replication) and we don't have a BGSAVE in progress,
838              * let's start one. */
839             if (!hasActiveChildProcess()) {
840                 startBgsaveForReplication(c->slave_capa);
841             } else {
842                 serverLog(LL_NOTICE,
843                     "No BGSAVE in progress, but another BG operation is active. "
844                     "BGSAVE for replication delayed");
845             }
846         }
847     }
848     return;
849 }
850 
851 /* REPLCONF <option> <value> <option> <value> ...
852  * This command is used by a slave in order to configure the replication
853  * process before starting it with the SYNC command.
854  *
855  * Currently the only use of this command is to communicate to the master
856  * what is the listening port of the Slave redis instance, so that the
857  * master can accurately list slaves and their listening ports in
858  * the INFO output.
859  *
860  * In the future the same command can be used in order to configure
861  * the replication to initiate an incremental replication instead of a
862  * full resync. */
replconfCommand(client * c)863 void replconfCommand(client *c) {
864     int j;
865 
866     if ((c->argc % 2) == 0) {
867         /* Number of arguments must be odd to make sure that every
868          * option has a corresponding value. */
869         addReply(c,shared.syntaxerr);
870         return;
871     }
872 
873     /* Process every option-value pair. */
874     for (j = 1; j < c->argc; j+=2) {
875         if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
876             long port;
877 
878             if ((getLongFromObjectOrReply(c,c->argv[j+1],
879                     &port,NULL) != C_OK))
880                 return;
881             c->slave_listening_port = port;
882         } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
883             sds ip = c->argv[j+1]->ptr;
884             if (sdslen(ip) < sizeof(c->slave_ip)) {
885                 memcpy(c->slave_ip,ip,sdslen(ip)+1);
886             } else {
887                 addReplyErrorFormat(c,"REPLCONF ip-address provided by "
888                     "replica instance is too long: %zd bytes", sdslen(ip));
889                 return;
890             }
891         } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
892             /* Ignore capabilities not understood by this master. */
893             if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
894                 c->slave_capa |= SLAVE_CAPA_EOF;
895             else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
896                 c->slave_capa |= SLAVE_CAPA_PSYNC2;
897         } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
898             /* REPLCONF ACK is used by slave to inform the master the amount
899              * of replication stream that it processed so far. It is an
900              * internal only command that normal clients should never use. */
901             long long offset;
902 
903             if (!(c->flags & CLIENT_SLAVE)) return;
904             if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
905                 return;
906             if (offset > c->repl_ack_off)
907                 c->repl_ack_off = offset;
908             c->repl_ack_time = server.unixtime;
909             /* If this was a diskless replication, we need to really put
910              * the slave online when the first ACK is received (which
911              * confirms slave is online and ready to get more data). This
912              * allows for simpler and less CPU intensive EOF detection
913              * when streaming RDB files.
914              * There's a chance the ACK got to us before we detected that the
915              * bgsave is done (since that depends on cron ticks), so run a
916              * quick check first (instead of waiting for the next ACK. */
917             if (server.rdb_child_pid != -1 && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
918                 checkChildrenDone();
919             if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
920                 putSlaveOnline(c);
921             /* Note: this command does not reply anything! */
922             return;
923         } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
924             /* REPLCONF GETACK is used in order to request an ACK ASAP
925              * to the slave. */
926             if (server.masterhost && server.master) replicationSendAck();
927             return;
928         } else {
929             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
930                 (char*)c->argv[j]->ptr);
931             return;
932         }
933     }
934     addReply(c,shared.ok);
935 }
936 
937 /* This function puts a replica in the online state, and should be called just
938  * after a replica received the RDB file for the initial synchronization, and
939  * we are finally ready to send the incremental stream of commands.
940  *
941  * It does a few things:
942  *
943  * 1) Put the slave in ONLINE state. Note that the function may also be called
944  *    for a replicas that are already in ONLINE state, but having the flag
945  *    repl_put_online_on_ack set to true: we still have to install the write
946  *    handler in that case. This function will take care of that.
947  * 2) Make sure the writable event is re-installed, since calling the SYNC
948  *    command disables it, so that we can accumulate output buffer without
949  *    sending it to the replica.
950  * 3) Update the count of "good replicas". */
putSlaveOnline(client * slave)951 void putSlaveOnline(client *slave) {
952     slave->replstate = SLAVE_STATE_ONLINE;
953     slave->repl_put_online_on_ack = 0;
954     slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
955     if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) {
956         serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
957         freeClient(slave);
958         return;
959     }
960     refreshGoodSlavesCount();
961     /* Fire the replica change modules event. */
962     moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
963                           REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
964                           NULL);
965     serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
966         replicationGetSlaveName(slave));
967 }
968 
969 /* We call this function periodically to remove an RDB file that was
970  * generated because of replication, in an instance that is otherwise
971  * without any persistence. We don't want instances without persistence
972  * to take RDB files around, this violates certain policies in certain
973  * environments. */
removeRDBUsedToSyncReplicas(void)974 void removeRDBUsedToSyncReplicas(void) {
975     /* If the feature is disabled, return ASAP but also clear the
976      * RDBGeneratedByReplication flag in case it was set. Otherwise if the
977      * feature was enabled, but gets disabled later with CONFIG SET, the
978      * flag may remain set to one: then next time the feature is re-enabled
979      * via CONFIG SET we have have it set even if no RDB was generated
980      * because of replication recently. */
981     if (!server.rdb_del_sync_files) {
982         RDBGeneratedByReplication = 0;
983         return;
984     }
985 
986     if (allPersistenceDisabled() && RDBGeneratedByReplication) {
987         client *slave;
988         listNode *ln;
989         listIter li;
990 
991         int delrdb = 1;
992         listRewind(server.slaves,&li);
993         while((ln = listNext(&li))) {
994             slave = ln->value;
995             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
996                 slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END ||
997                 slave->replstate == SLAVE_STATE_SEND_BULK)
998             {
999                 delrdb = 0;
1000                 break; /* No need to check the other replicas. */
1001             }
1002         }
1003         if (delrdb) {
1004             struct stat sb;
1005             if (lstat(server.rdb_filename,&sb) != -1) {
1006                 RDBGeneratedByReplication = 0;
1007                 serverLog(LL_NOTICE,
1008                     "Removing the RDB file used to feed replicas "
1009                     "in a persistence-less instance");
1010                 bg_unlink(server.rdb_filename);
1011             }
1012         }
1013     }
1014 }
1015 
/* Write handler used to transfer an RDB file stored on disk to a replica.
 *
 * Each invocation performs at most one write to the connection: first the
 * preamble in slave->replpreamble (if any is left), then one chunk of up to
 * PROTO_IOBUF_LEN bytes read from slave->repldbfd at offset
 * slave->repldboff. Once repldboff reaches repldbsize the file is closed,
 * the handler is removed and the replica is put online.
 *
 * On seek/read errors, premature EOF, or write errors that leave the
 * connection not connected, the replica client is freed. */
void sendBulkToSlave(connection *conn) {
    client *slave = connGetPrivateData(conn);
    char buf[PROTO_IOBUF_LEN];
    ssize_t nwritten, buflen;

    /* Before sending the RDB file, we send the preamble as configured by the
     * replication process. Currently the preamble is just the bulk count of
     * the file in the form "$<length>\r\n". */
    if (slave->replpreamble) {
        nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble));
        if (nwritten == -1) {
            serverLog(LL_VERBOSE,
                "Write error sending RDB preamble to replica: %s",
                connGetLastError(conn));
            freeClient(slave);
            return;
        }
        server.stat_net_output_bytes += nwritten;
        /* Drop what was written, keep the remainder for the next call. */
        sdsrange(slave->replpreamble,nwritten,-1);
        if (sdslen(slave->replpreamble) == 0) {
            sdsfree(slave->replpreamble);
            slave->replpreamble = NULL;
            /* fall through sending data. */
        } else {
            return;
        }
    }

    /* If the preamble was already transferred, send the RDB bulk data.
     * The seek must succeed: reading from any other position would send
     * the wrong bytes to the replica. */
    if (lseek(slave->repldbfd,slave->repldboff,SEEK_SET) == -1) {
        serverLog(LL_WARNING,"Seek error sending DB to replica: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
    if (buflen <= 0) {
        /* We only get here while repldboff < repldbsize, so an EOF means
         * the file is shorter than expected. */
        serverLog(LL_WARNING,"Read error sending DB to replica: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
        /* A failed write on a still connected connection is just retried
         * on the next writable event. */
        if (connGetState(conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_WARNING,"Write error sending DB to replica: %s",
                connGetLastError(conn));
            freeClient(slave);
        }
        return;
    }
    /* Advance only by what was actually written: on a short write the
     * next call seeks to the new offset and reads again from there. */
    slave->repldboff += nwritten;
    server.stat_net_output_bytes += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        connSetWriteHandler(slave->conn,NULL);
        putSlaveOnline(slave);
    }
}
1070 
1071 /* Remove one write handler from the list of connections waiting to be writable
1072  * during rdb pipe transfer. */
rdbPipeWriteHandlerConnRemoved(struct connection * conn)1073 void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
1074     if (!connHasWriteHandler(conn))
1075         return;
1076     connSetWriteHandler(conn, NULL);
1077     client *slave = connGetPrivateData(conn);
1078     slave->repl_last_partial_write = 0;
1079     server.rdb_pipe_numconns_writing--;
1080     /* if there are no more writes for now for this conn, or write error: */
1081     if (server.rdb_pipe_numconns_writing == 0) {
1082         if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
1083             serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
1084         }
1085     }
1086 }
1087 
1088 /* Called in diskless master during transfer of data from the rdb pipe, when
1089  * the replica becomes writable again. */
rdbPipeWriteHandler(struct connection * conn)1090 void rdbPipeWriteHandler(struct connection *conn) {
1091     serverAssert(server.rdb_pipe_bufflen>0);
1092     client *slave = connGetPrivateData(conn);
1093     int nwritten;
1094     if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
1095                               server.rdb_pipe_bufflen - slave->repldboff)) == -1)
1096     {
1097         if (connGetState(conn) == CONN_STATE_CONNECTED)
1098             return; /* equivalent to EAGAIN */
1099         serverLog(LL_WARNING,"Write error sending DB to replica: %s",
1100             connGetLastError(conn));
1101         freeClient(slave);
1102         return;
1103     } else {
1104         slave->repldboff += nwritten;
1105         server.stat_net_output_bytes += nwritten;
1106         if (slave->repldboff < server.rdb_pipe_bufflen) {
1107             slave->repl_last_partial_write = server.unixtime;
1108             return; /* more data to write.. */
1109         }
1110     }
1111     rdbPipeWriteHandlerConnRemoved(conn);
1112 }
1113 
1114 /* Called in diskless master, when there's data to read from the child's rdb pipe */
rdbPipeReadHandler(struct aeEventLoop * eventLoop,int fd,void * clientData,int mask)1115 void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
1116     UNUSED(mask);
1117     UNUSED(clientData);
1118     UNUSED(eventLoop);
1119     int i;
1120     if (!server.rdb_pipe_buff)
1121         server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
1122     serverAssert(server.rdb_pipe_numconns_writing==0);
1123 
1124     while (1) {
1125         server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
1126         if (server.rdb_pipe_bufflen < 0) {
1127             if (errno == EAGAIN || errno == EWOULDBLOCK)
1128                 return;
1129             serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno));
1130             for (i=0; i < server.rdb_pipe_numconns; i++) {
1131                 connection *conn = server.rdb_pipe_conns[i];
1132                 if (!conn)
1133                     continue;
1134                 client *slave = connGetPrivateData(conn);
1135                 freeClient(slave);
1136                 server.rdb_pipe_conns[i] = NULL;
1137             }
1138             killRDBChild();
1139             return;
1140         }
1141 
1142         if (server.rdb_pipe_bufflen == 0) {
1143             /* EOF - write end was closed. */
1144             int stillUp = 0;
1145             aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
1146             for (i=0; i < server.rdb_pipe_numconns; i++)
1147             {
1148                 connection *conn = server.rdb_pipe_conns[i];
1149                 if (!conn)
1150                     continue;
1151                 stillUp++;
1152             }
1153             serverLog(LL_WARNING,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp);
1154             /* Now that the replicas have finished reading, notify the child that it's safe to exit.
1155              * When the server detectes the child has exited, it can mark the replica as online, and
1156              * start streaming the replication buffers. */
1157             close(server.rdb_child_exit_pipe);
1158             server.rdb_child_exit_pipe = -1;
1159             return;
1160         }
1161 
1162         int stillAlive = 0;
1163         for (i=0; i < server.rdb_pipe_numconns; i++)
1164         {
1165             int nwritten;
1166             connection *conn = server.rdb_pipe_conns[i];
1167             if (!conn)
1168                 continue;
1169 
1170             client *slave = connGetPrivateData(conn);
1171             if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
1172                 if (connGetState(conn) != CONN_STATE_CONNECTED) {
1173                     serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",
1174                         connGetLastError(conn));
1175                     freeClient(slave);
1176                     server.rdb_pipe_conns[i] = NULL;
1177                     continue;
1178                 }
1179                 /* An error and still in connected state, is equivalent to EAGAIN */
1180                 slave->repldboff = 0;
1181             } else {
1182                 slave->repldboff = nwritten;
1183                 server.stat_net_output_bytes += nwritten;
1184             }
1185             /* If we were unable to write all the data to one of the replicas,
1186              * setup write handler (and disable pipe read handler, below) */
1187             if (nwritten != server.rdb_pipe_bufflen) {
1188                 slave->repl_last_partial_write = server.unixtime;
1189                 server.rdb_pipe_numconns_writing++;
1190                 connSetWriteHandler(conn, rdbPipeWriteHandler);
1191             }
1192             stillAlive++;
1193         }
1194 
1195         if (stillAlive == 0) {
1196             serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child.");
1197             killRDBChild();
1198         }
1199         /*  Remove the pipe read handler if at least one write handler was set. */
1200         if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
1201             aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
1202             break;
1203         }
1204     }
1205 }
1206 
1207 /* This function is called at the end of every background saving,
1208  * or when the replication RDB transfer strategy is modified from
1209  * disk to socket or the other way around.
1210  *
1211  * The goal of this function is to handle slaves waiting for a successful
1212  * background saving in order to perform non-blocking synchronization, and
1213  * to schedule a new BGSAVE if there are slaves that attached while a
1214  * BGSAVE was in progress, but it was not a good one for replication (no
1215  * other slave was accumulating differences).
1216  *
1217  * The argument bgsaveerr is C_OK if the background saving succeeded
1218  * otherwise C_ERR is passed to the function.
1219  * The 'type' argument is the type of the child that terminated
1220  * (if it had a disk or socket target). */
updateSlavesWaitingBgsave(int bgsaveerr,int type)1221 void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
1222     listNode *ln;
1223     int startbgsave = 0;
1224     int mincapa = -1;
1225     listIter li;
1226 
1227     listRewind(server.slaves,&li);
1228     while((ln = listNext(&li))) {
1229         client *slave = ln->value;
1230 
1231         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
1232             startbgsave = 1;
1233             mincapa = (mincapa == -1) ? slave->slave_capa :
1234                                         (mincapa & slave->slave_capa);
1235         } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
1236             struct redis_stat buf;
1237 
1238             if (bgsaveerr != C_OK) {
1239                 freeClient(slave);
1240                 serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
1241                 continue;
1242             }
1243 
1244             /* If this was an RDB on disk save, we have to prepare to send
1245              * the RDB from disk to the slave socket. Otherwise if this was
1246              * already an RDB -> Slaves socket transfer, used in the case of
1247              * diskless replication, our work is trivial, we can just put
1248              * the slave online. */
1249             if (type == RDB_CHILD_TYPE_SOCKET) {
1250                 serverLog(LL_NOTICE,
1251                     "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
1252                         replicationGetSlaveName(slave));
1253                 /* Note: we wait for a REPLCONF ACK message from the replica in
1254                  * order to really put it online (install the write handler
1255                  * so that the accumulated data can be transferred). However
1256                  * we change the replication state ASAP, since our slave
1257                  * is technically online now.
1258                  *
1259                  * So things work like that:
1260                  *
1261                  * 1. We end trasnferring the RDB file via socket.
1262                  * 2. The replica is put ONLINE but the write handler
1263                  *    is not installed.
1264                  * 3. The replica however goes really online, and pings us
1265                  *    back via REPLCONF ACK commands.
1266                  * 4. Now we finally install the write handler, and send
1267                  *    the buffers accumulated so far to the replica.
1268                  *
1269                  * But why we do that? Because the replica, when we stream
1270                  * the RDB directly via the socket, must detect the RDB
1271                  * EOF (end of file), that is a special random string at the
1272                  * end of the RDB (for streamed RDBs we don't know the length
1273                  * in advance). Detecting such final EOF string is much
1274                  * simpler and less CPU intensive if no more data is sent
1275                  * after such final EOF. So we don't want to glue the end of
1276                  * the RDB trasfer with the start of the other replication
1277                  * data. */
1278                 slave->replstate = SLAVE_STATE_ONLINE;
1279                 slave->repl_put_online_on_ack = 1;
1280                 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
1281             } else {
1282                 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
1283                     redis_fstat(slave->repldbfd,&buf) == -1) {
1284                     freeClient(slave);
1285                     serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
1286                     continue;
1287                 }
1288                 slave->repldboff = 0;
1289                 slave->repldbsize = buf.st_size;
1290                 slave->replstate = SLAVE_STATE_SEND_BULK;
1291                 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
1292                     (unsigned long long) slave->repldbsize);
1293 
1294                 connSetWriteHandler(slave->conn,NULL);
1295                 if (connSetWriteHandler(slave->conn,sendBulkToSlave) == C_ERR) {
1296                     freeClient(slave);
1297                     continue;
1298                 }
1299             }
1300         }
1301     }
1302     if (startbgsave) startBgsaveForReplication(mincapa);
1303 }
1304 
1305 /* Change the current instance replication ID with a new, random one.
1306  * This will prevent successful PSYNCs between this master and other
1307  * slaves, so the command should be called when something happens that
1308  * alters the current story of the dataset. */
changeReplicationId(void)1309 void changeReplicationId(void) {
1310     getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
1311     server.replid[CONFIG_RUN_ID_SIZE] = '\0';
1312 }
1313 
1314 /* Clear (invalidate) the secondary replication ID. This happens, for
1315  * example, after a full resynchronization, when we start a new replication
1316  * history. */
clearReplicationId2(void)1317 void clearReplicationId2(void) {
1318     memset(server.replid2,'0',sizeof(server.replid));
1319     server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
1320     server.second_replid_offset = -1;
1321 }
1322 
1323 /* Use the current replication ID / offset as secondary replication
1324  * ID, and change the current one in order to start a new history.
1325  * This should be used when an instance is switched from slave to master
1326  * so that it can serve PSYNC requests performed using the master
1327  * replication ID. */
shiftReplicationId(void)1328 void shiftReplicationId(void) {
1329     memcpy(server.replid2,server.replid,sizeof(server.replid));
1330     /* We set the second replid offset to the master offset + 1, since
1331      * the slave will ask for the first byte it has not yet received, so
1332      * we need to add one to the offset: for example if, as a slave, we are
1333      * sure we have the same history as the master for 50 bytes, after we
1334      * are turned into a master, we can accept a PSYNC request with offset
1335      * 51, since the slave asking has the same history up to the 50th
1336      * byte, and is asking for the new bytes starting at offset 51. */
1337     server.second_replid_offset = server.master_repl_offset+1;
1338     changeReplicationId();
1339     serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
1340 }
1341 
1342 /* ----------------------------------- SLAVE -------------------------------- */
1343 
1344 /* Returns 1 if the given replication state is a handshake state,
1345  * 0 otherwise. */
slaveIsInHandshakeState(void)1346 int slaveIsInHandshakeState(void) {
1347     return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
1348            server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
1349 }
1350 
1351 /* Avoid the master to detect the slave is timing out while loading the
1352  * RDB file in initial synchronization. We send a single newline character
1353  * that is valid protocol but is guaranteed to either be sent entirely or
1354  * not, since the byte is indivisible.
1355  *
1356  * The function is called in two contexts: while we flush the current
1357  * data with emptyDb(), and while we load the new data received as an
1358  * RDB file from the master. */
replicationSendNewlineToMaster(void)1359 void replicationSendNewlineToMaster(void) {
1360     static time_t newline_sent;
1361     if (time(NULL) != newline_sent) {
1362         newline_sent = time(NULL);
1363         /* Pinging back in this stage is best-effort. */
1364         if (server.repl_transfer_s) connWrite(server.repl_transfer_s, "\n", 1);
1365     }
1366 }
1367 
1368 /* Callback used by emptyDb() while flushing away old data to load
1369  * the new dataset received by the master. */
replicationEmptyDbCallback(void * privdata)1370 void replicationEmptyDbCallback(void *privdata) {
1371     UNUSED(privdata);
1372     if (server.repl_state == REPL_STATE_TRANSFER)
1373         replicationSendNewlineToMaster();
1374 }
1375 
1376 /* Once we have a link with the master and the synchronization was
1377  * performed, this function materializes the master client we store
1378  * at server.master, starting from the specified file descriptor. */
replicationCreateMasterClient(connection * conn,int dbid)1379 void replicationCreateMasterClient(connection *conn, int dbid) {
1380     server.master = createClient(conn);
1381     if (conn)
1382         connSetReadHandler(server.master->conn, readQueryFromClient);
1383     server.master->flags |= CLIENT_MASTER;
1384     server.master->authenticated = 1;
1385     server.master->reploff = server.master_initial_offset;
1386     server.master->read_reploff = server.master->reploff;
1387     server.master->user = NULL; /* This client can do everything. */
1388     memcpy(server.master->replid, server.master_replid,
1389         sizeof(server.master_replid));
1390     /* If master offset is set to -1, this master is old and is not
1391      * PSYNC capable, so we flag it accordingly. */
1392     if (server.master->reploff == -1)
1393         server.master->flags |= CLIENT_PRE_PSYNC;
1394     if (dbid != -1) selectDb(server.master,dbid);
1395 }
1396 
1397 /* This function will try to re-enable the AOF file after the
1398  * master-replica synchronization: if it fails after multiple attempts
1399  * the replica cannot be considered reliable and exists with an
1400  * error. */
restartAOFAfterSYNC()1401 void restartAOFAfterSYNC() {
1402     unsigned int tries, max_tries = 10;
1403     for (tries = 0; tries < max_tries; ++tries) {
1404         if (startAppendOnly() == C_OK) break;
1405         serverLog(LL_WARNING,
1406             "Failed enabling the AOF after successful master synchronization! "
1407             "Trying it again in one second.");
1408         sleep(1);
1409     }
1410     if (tries == max_tries) {
1411         serverLog(LL_WARNING,
1412             "FATAL: this replica instance finished the synchronization with "
1413             "its master, but the AOF can't be turned on. Exiting now.");
1414         exit(1);
1415     }
1416 }
1417 
useDisklessLoad()1418 static int useDisklessLoad() {
1419     /* compute boolean decision to use diskless load */
1420     int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
1421            (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
1422     /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
1423     if (enabled && !moduleAllDatatypesHandleErrors()) {
1424         serverLog(LL_WARNING,
1425             "Skipping diskless-load because there are modules that don't handle read errors.");
1426         enabled = 0;
1427     }
1428     return enabled;
1429 }
1430 
1431 /* Helper function for readSyncBulkPayload() to make backups of the current
1432  * databases before socket-loading the new ones. The backups may be restored
1433  * by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */
disklessLoadMakeBackup(void)1434 dbBackup *disklessLoadMakeBackup(void) {
1435     return backupDb();
1436 }
1437 
1438 /* Helper function for readSyncBulkPayload(): when replica-side diskless
1439  * database loading is used, Redis makes a backup of the existing databases
1440  * before loading the new ones from the socket.
1441  *
1442  * If the socket loading went wrong, we want to restore the old backups
1443  * into the server databases. */
disklessLoadRestoreBackup(dbBackup * buckup)1444 void disklessLoadRestoreBackup(dbBackup *buckup) {
1445     restoreDbBackup(buckup);
1446 }
1447 
1448 /* Helper function for readSyncBulkPayload() to discard our old backups
1449  * when the loading succeeded. */
disklessLoadDiscardBackup(dbBackup * buckup,int flag)1450 void disklessLoadDiscardBackup(dbBackup *buckup, int flag) {
1451     discardDbBackup(buckup, flag, replicationEmptyDbCallback);
1452 }
1453 
1454 /* Asynchronously read the SYNC payload we receive from a master */
1455 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
readSyncBulkPayload(connection * conn)1456 void readSyncBulkPayload(connection *conn) {
1457     char buf[PROTO_IOBUF_LEN];
1458     ssize_t nread, readlen, nwritten;
1459     int use_diskless_load = useDisklessLoad();
1460     dbBackup *diskless_load_backup = NULL;
1461     int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
1462                                                         EMPTYDB_NO_FLAGS;
1463     off_t left;
1464 
1465     /* Static vars used to hold the EOF mark, and the last bytes received
1466      * from the server: when they match, we reached the end of the transfer. */
1467     static char eofmark[CONFIG_RUN_ID_SIZE];
1468     static char lastbytes[CONFIG_RUN_ID_SIZE];
1469     static int usemark = 0;
1470 
1471     /* If repl_transfer_size == -1 we still have to read the bulk length
1472      * from the master reply. */
1473     if (server.repl_transfer_size == -1) {
1474         if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
1475             serverLog(LL_WARNING,
1476                 "I/O error reading bulk count from MASTER: %s",
1477                 strerror(errno));
1478             goto error;
1479         }
1480 
1481         if (buf[0] == '-') {
1482             serverLog(LL_WARNING,
1483                 "MASTER aborted replication with an error: %s",
1484                 buf+1);
1485             goto error;
1486         } else if (buf[0] == '\0') {
1487             /* At this stage just a newline works as a PING in order to take
1488              * the connection live. So we refresh our last interaction
1489              * timestamp. */
1490             server.repl_transfer_lastio = server.unixtime;
1491             return;
1492         } else if (buf[0] != '$') {
1493             serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
1494             goto error;
1495         }
1496 
1497         /* There are two possible forms for the bulk payload. One is the
1498          * usual $<count> bulk format. The other is used for diskless transfers
1499          * when the master does not know beforehand the size of the file to
1500          * transfer. In the latter case, the following format is used:
1501          *
1502          * $EOF:<40 bytes delimiter>
1503          *
1504          * At the end of the file the announced delimiter is transmitted. The
1505          * delimiter is long and random enough that the probability of a
1506          * collision with the actual file content can be ignored. */
1507         if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
1508             usemark = 1;
1509             memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
1510             memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
1511             /* Set any repl_transfer_size to avoid entering this code path
1512              * at the next call. */
1513             server.repl_transfer_size = 0;
1514             serverLog(LL_NOTICE,
1515                 "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
1516                 use_diskless_load? "to parser":"to disk");
1517         } else {
1518             usemark = 0;
1519             server.repl_transfer_size = strtol(buf+1,NULL,10);
1520             serverLog(LL_NOTICE,
1521                 "MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
1522                 (long long) server.repl_transfer_size,
1523                 use_diskless_load? "to parser":"to disk");
1524         }
1525         return;
1526     }
1527 
1528     if (!use_diskless_load) {
1529         /* Read the data from the socket, store it to a file and search
1530          * for the EOF. */
1531         if (usemark) {
1532             readlen = sizeof(buf);
1533         } else {
1534             left = server.repl_transfer_size - server.repl_transfer_read;
1535             readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
1536         }
1537 
1538         nread = connRead(conn,buf,readlen);
1539         if (nread <= 0) {
1540             if (connGetState(conn) == CONN_STATE_CONNECTED) {
1541                 /* equivalent to EAGAIN */
1542                 return;
1543             }
1544             serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
1545                 (nread == -1) ? strerror(errno) : "connection lost");
1546             cancelReplicationHandshake();
1547             return;
1548         }
1549         server.stat_net_input_bytes += nread;
1550 
1551         /* When a mark is used, we want to detect EOF asap in order to avoid
1552          * writing the EOF mark into the file... */
1553         int eof_reached = 0;
1554 
1555         if (usemark) {
1556             /* Update the last bytes array, and check if it matches our
1557              * delimiter. */
1558             if (nread >= CONFIG_RUN_ID_SIZE) {
1559                 memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
1560                        CONFIG_RUN_ID_SIZE);
1561             } else {
1562                 int rem = CONFIG_RUN_ID_SIZE-nread;
1563                 memmove(lastbytes,lastbytes+nread,rem);
1564                 memcpy(lastbytes+rem,buf,nread);
1565             }
1566             if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
1567                 eof_reached = 1;
1568         }
1569 
1570         /* Update the last I/O time for the replication transfer (used in
1571          * order to detect timeouts during replication), and write what we
1572          * got from the socket to the dump file on disk. */
1573         server.repl_transfer_lastio = server.unixtime;
1574         if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
1575             serverLog(LL_WARNING,
1576                 "Write error or short write writing to the DB dump file "
1577                 "needed for MASTER <-> REPLICA synchronization: %s",
1578                 (nwritten == -1) ? strerror(errno) : "short write");
1579             goto error;
1580         }
1581         server.repl_transfer_read += nread;
1582 
1583         /* Delete the last 40 bytes from the file if we reached EOF. */
1584         if (usemark && eof_reached) {
1585             if (ftruncate(server.repl_transfer_fd,
1586                 server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
1587             {
1588                 serverLog(LL_WARNING,
1589                     "Error truncating the RDB file received from the master "
1590                     "for SYNC: %s", strerror(errno));
1591                 goto error;
1592             }
1593         }
1594 
1595         /* Sync data on disk from time to time, otherwise at the end of the
1596          * transfer we may suffer a big delay as the memory buffers are copied
1597          * into the actual disk. */
1598         if (server.repl_transfer_read >=
1599             server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
1600         {
1601             off_t sync_size = server.repl_transfer_read -
1602                               server.repl_transfer_last_fsync_off;
1603             rdb_fsync_range(server.repl_transfer_fd,
1604                 server.repl_transfer_last_fsync_off, sync_size);
1605             server.repl_transfer_last_fsync_off += sync_size;
1606         }
1607 
1608         /* Check if the transfer is now complete */
1609         if (!usemark) {
1610             if (server.repl_transfer_read == server.repl_transfer_size)
1611                 eof_reached = 1;
1612         }
1613 
1614         /* If the transfer is yet not complete, we need to read more, so
1615          * return ASAP and wait for the handler to be called again. */
1616         if (!eof_reached) return;
1617     }
1618 
1619     /* We reach this point in one of the following cases:
1620      *
1621      * 1. The replica is using diskless replication, that is, it reads data
1622      *    directly from the socket to the Redis memory, without using
1623      *    a temporary RDB file on disk. In that case we just block and
1624      *    read everything from the socket.
1625      *
1626      * 2. Or when we are done reading from the socket to the RDB file, in
1627      *    such case we want just to read the RDB file in memory. */
1628     serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
1629 
1630     /* We need to stop any AOF rewriting child before flusing and parsing
1631      * the RDB, otherwise we'll create a copy-on-write disaster. */
1632     if (server.aof_state != AOF_OFF) stopAppendOnly();
1633 
1634     /* When diskless RDB loading is used by replicas, it may be configured
1635      * in order to save the current DB instead of throwing it away,
1636      * so that we can restore it in case of failed transfer. */
1637     if (use_diskless_load &&
1638         server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
1639     {
1640         /* Create a backup of server.db[] and initialize to empty
1641          * dictionaries. */
1642         diskless_load_backup = disklessLoadMakeBackup();
1643     }
1644     /* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
1645      * (Where disklessLoadMakeBackup left server.db empty) because we
1646      * want to execute all the auxiliary logic of emptyDb (Namely,
1647      * fire module events) */
1648     emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
1649 
1650     /* Before loading the DB into memory we need to delete the readable
1651      * handler, otherwise it will get called recursively since
1652      * rdbLoad() will call the event loop to process events from time to
1653      * time for non blocking loading. */
1654     connSetReadHandler(conn, NULL);
1655     serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
1656     rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
1657     if (use_diskless_load) {
1658         rio rdb;
1659         rioInitWithConn(&rdb,conn,server.repl_transfer_size);
1660 
1661         /* Put the socket in blocking mode to simplify RDB transfer.
1662          * We'll restore it when the RDB is received. */
1663         connBlock(conn);
1664         connRecvTimeout(conn, server.repl_timeout*1000);
1665         startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
1666 
1667         if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
1668             /* RDB loading failed. */
1669             serverLog(LL_WARNING,
1670                       "Failed trying to load the MASTER synchronization DB "
1671                       "from socket: %s", strerror(errno));
1672             stopLoading(0);
1673             cancelReplicationHandshake();
1674             rioFreeConn(&rdb, NULL);
1675 
1676             /* Remove the half-loaded data in case we started with
1677              * an empty replica. */
1678             emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
1679 
1680             if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
1681                 /* Restore the backed up databases. */
1682                 disklessLoadRestoreBackup(diskless_load_backup);
1683             }
1684 
1685             /* Note that there's no point in restarting the AOF on SYNC
1686              * failure, it'll be restarted when sync succeeds or the replica
1687              * gets promoted. */
1688             return;
1689         }
1690         stopLoading(1);
1691 
1692         /* RDB loading succeeded if we reach this point. */
1693         if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
1694             /* Delete the backup databases we created before starting to load
1695              * the new RDB. Now the RDB was loaded with success so the old
1696              * data is useless. */
1697             disklessLoadDiscardBackup(diskless_load_backup, empty_db_flags);
1698         }
1699 
1700         /* Verify the end mark is correct. */
1701         if (usemark) {
1702             if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
1703                 memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
1704             {
1705                 serverLog(LL_WARNING,"Replication stream EOF marker is broken");
1706                 cancelReplicationHandshake();
1707                 rioFreeConn(&rdb, NULL);
1708                 return;
1709             }
1710         }
1711 
1712         /* Cleanup and restore the socket to the original state to continue
1713          * with the normal replication. */
1714         rioFreeConn(&rdb, NULL);
1715         connNonBlock(conn);
1716         connRecvTimeout(conn,0);
1717     } else {
1718         /* Ensure background save doesn't overwrite synced data */
1719         if (server.rdb_child_pid != -1) {
1720             serverLog(LL_NOTICE,
1721                 "Replica is about to load the RDB file received from the "
1722                 "master, but there is a pending RDB child running. "
1723                 "Killing process %ld and removing its temp file to avoid "
1724                 "any race",
1725                     (long) server.rdb_child_pid);
1726             killRDBChild();
1727         }
1728 
1729         /* Make sure the new file (also used for persistence) is fully synced
1730          * (not covered by earlier calls to rdb_fsync_range). */
1731         if (fsync(server.repl_transfer_fd) == -1) {
1732             serverLog(LL_WARNING,
1733                 "Failed trying to sync the temp DB to disk in "
1734                 "MASTER <-> REPLICA synchronization: %s",
1735                 strerror(errno));
1736             cancelReplicationHandshake();
1737             return;
1738         }
1739 
1740         /* Rename rdb like renaming rewrite aof asynchronously. */
1741         int old_rdb_fd = open(server.rdb_filename,O_RDONLY|O_NONBLOCK);
1742         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
1743             serverLog(LL_WARNING,
1744                 "Failed trying to rename the temp DB into %s in "
1745                 "MASTER <-> REPLICA synchronization: %s",
1746                 server.rdb_filename, strerror(errno));
1747             cancelReplicationHandshake();
1748             if (old_rdb_fd != -1) close(old_rdb_fd);
1749             return;
1750         }
1751         /* Close old rdb asynchronously. */
1752         if (old_rdb_fd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)old_rdb_fd,NULL,NULL);
1753 
1754         if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
1755             serverLog(LL_WARNING,
1756                 "Failed trying to load the MASTER synchronization "
1757                 "DB from disk");
1758             cancelReplicationHandshake();
1759             if (server.rdb_del_sync_files && allPersistenceDisabled()) {
1760                 serverLog(LL_NOTICE,"Removing the RDB file obtained from "
1761                                     "the master. This replica has persistence "
1762                                     "disabled");
1763                 bg_unlink(server.rdb_filename);
1764             }
1765             /* Note that there's no point in restarting the AOF on sync failure,
1766                it'll be restarted when sync succeeds or replica promoted. */
1767             return;
1768         }
1769 
1770         /* Cleanup. */
1771         if (server.rdb_del_sync_files && allPersistenceDisabled()) {
1772             serverLog(LL_NOTICE,"Removing the RDB file obtained from "
1773                                 "the master. This replica has persistence "
1774                                 "disabled");
1775             bg_unlink(server.rdb_filename);
1776         }
1777 
1778         zfree(server.repl_transfer_tmpfile);
1779         close(server.repl_transfer_fd);
1780         server.repl_transfer_fd = -1;
1781         server.repl_transfer_tmpfile = NULL;
1782     }
1783 
1784     /* Final setup of the connected slave <- master link */
1785     replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
1786     server.repl_state = REPL_STATE_CONNECTED;
1787     server.repl_down_since = 0;
1788 
1789     /* Fire the master link modules event. */
1790     moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
1791                           REDISMODULE_SUBEVENT_MASTER_LINK_UP,
1792                           NULL);
1793 
1794     /* After a full resynchronization we use the replication ID and
1795      * offset of the master. The secondary ID / offset are cleared since
1796      * we are starting a new history. */
1797     memcpy(server.replid,server.master->replid,sizeof(server.replid));
1798     server.master_repl_offset = server.master->reploff;
1799     clearReplicationId2();
1800 
1801     /* Let's create the replication backlog if needed. Slaves need to
1802      * accumulate the backlog regardless of the fact they have sub-slaves
1803      * or not, in order to behave correctly if they are promoted to
1804      * masters after a failover. */
1805     if (server.repl_backlog == NULL) createReplicationBacklog();
1806     serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
1807 
1808     if (server.supervised_mode == SUPERVISED_SYSTEMD) {
1809         redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections.\n");
1810         redisCommunicateSystemd("READY=1\n");
1811     }
1812 
1813     /* Restart the AOF subsystem now that we finished the sync. This
1814      * will trigger an AOF rewrite, and when done will start appending
1815      * to the new file. */
1816     if (server.aof_enabled) restartAOFAfterSYNC();
1817     return;
1818 
1819 error:
1820     cancelReplicationHandshake();
1821     return;
1822 }
1823 
1824 /* Send a synchronous command to the master. Used to send AUTH and
1825  * REPLCONF commands before starting the replication with SYNC.
1826  *
1827  * The command returns an sds string representing the result of the
1828  * operation. On error the first byte is a "-".
1829  */
1830 #define SYNC_CMD_READ (1<<0)
1831 #define SYNC_CMD_WRITE (1<<1)
1832 #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
sendSynchronousCommand(int flags,connection * conn,...)1833 char *sendSynchronousCommand(int flags, connection *conn, ...) {
1834 
1835     /* Create the command to send to the master, we use redis binary
1836      * protocol to make sure correct arguments are sent. This function
1837      * is not safe for all binary data. */
1838     if (flags & SYNC_CMD_WRITE) {
1839         char *arg;
1840         va_list ap;
1841         sds cmd = sdsempty();
1842         sds cmdargs = sdsempty();
1843         size_t argslen = 0;
1844         va_start(ap,conn);
1845 
1846         while(1) {
1847             arg = va_arg(ap, char*);
1848             if (arg == NULL) break;
1849 
1850             cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n",strlen(arg),arg);
1851             argslen++;
1852         }
1853 
1854         va_end(ap);
1855 
1856         cmd = sdscatprintf(cmd,"*%zu\r\n",argslen);
1857         cmd = sdscatsds(cmd,cmdargs);
1858         sdsfree(cmdargs);
1859 
1860         /* Transfer command to the server. */
1861         if (connSyncWrite(conn,cmd,sdslen(cmd),server.repl_syncio_timeout*1000)
1862             == -1)
1863         {
1864             sdsfree(cmd);
1865             return sdscatprintf(sdsempty(),"-Writing to master: %s",
1866                     connGetLastError(conn));
1867         }
1868         sdsfree(cmd);
1869     }
1870 
1871     /* Read the reply from the server. */
1872     if (flags & SYNC_CMD_READ) {
1873         char buf[256];
1874 
1875         if (connSyncReadLine(conn,buf,sizeof(buf),server.repl_syncio_timeout*1000)
1876             == -1)
1877         {
1878             return sdscatprintf(sdsempty(),"-Reading from master: %s",
1879                     strerror(errno));
1880         }
1881         server.repl_transfer_lastio = server.unixtime;
1882         return sdsnew(buf);
1883     }
1884     return NULL;
1885 }
1886 
1887 /* Try a partial resynchronization with the master if we are about to reconnect.
1888  * If there is no cached master structure, at least try to issue a
1889  * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
1890  * command in order to obtain the master replid and the master replication
1891  * global offset.
1892  *
1893  * This function is designed to be called from syncWithMaster(), so the
1894  * following assumptions are made:
1895  *
1896  * 1) We pass the function an already connected socket "fd".
1897  * 2) This function does not close the file descriptor "fd". However in case
1898  *    of successful partial resynchronization, the function will reuse
1899  *    'fd' as file descriptor of the server.master client structure.
1900  *
1901  * The function is split in two halves: if read_reply is 0, the function
1902  * writes the PSYNC command on the socket, and a new function call is
1903  * needed, with read_reply set to 1, in order to read the reply of the
1904  * command. This is useful in order to support non blocking operations, so
1905  * that we write, return into the event loop, and read when there are data.
1906  *
 * When read_reply is 0 the function returns PSYNC_WRITE_ERROR if there
 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
1909  * with read_reply set to 1. However even when read_reply is set to 1
1910  * the function may return PSYNC_WAIT_REPLY again to signal there were
1911  * insufficient data to read to complete its work. We should re-enter
1912  * into the event loop and wait in such a case.
1913  *
1914  * The function returns:
1915  *
1916  * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
1917  * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
1918  *                   In this case the master replid and global replication
1919  *                   offset is saved.
1920  * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
1921  *                      the caller should fall back to SYNC.
1922  * PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
1923  * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
1924  * PSYNC_TRY_LATER: Master is currently in a transient error condition.
1925  *
1926  * Notable side effects:
1927  *
1928  * 1) As a side effect of the function call the function removes the readable
1929  *    event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
1930  * 2) server.master_initial_offset is set to the right value according
1931  *    to the master reply. This will be used to populate the 'server.master'
1932  *    structure replication offset.
1933  */
1934 
1935 #define PSYNC_WRITE_ERROR 0
1936 #define PSYNC_WAIT_REPLY 1
1937 #define PSYNC_CONTINUE 2
1938 #define PSYNC_FULLRESYNC 3
1939 #define PSYNC_NOT_SUPPORTED 4
1940 #define PSYNC_TRY_LATER 5
slaveTryPartialResynchronization(connection * conn,int read_reply)1941 int slaveTryPartialResynchronization(connection *conn, int read_reply) {
1942     char *psync_replid;
1943     char psync_offset[32];
1944     sds reply;
1945 
1946     /* Writing half */
1947     if (!read_reply) {
1948         /* Initially set master_initial_offset to -1 to mark the current
1949          * master replid and offset as not valid. Later if we'll be able to do
1950          * a FULL resync using the PSYNC command we'll set the offset at the
1951          * right value, so that this information will be propagated to the
1952          * client structure representing the master into server.master. */
1953         server.master_initial_offset = -1;
1954 
1955         if (server.cached_master) {
1956             psync_replid = server.cached_master->replid;
1957             snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
1958             serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
1959         } else {
1960             serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
1961             psync_replid = "?";
1962             memcpy(psync_offset,"-1",3);
1963         }
1964 
1965         /* Issue the PSYNC command */
1966         reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
1967         if (reply != NULL) {
1968             serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
1969             sdsfree(reply);
1970             connSetReadHandler(conn, NULL);
1971             return PSYNC_WRITE_ERROR;
1972         }
1973         return PSYNC_WAIT_REPLY;
1974     }
1975 
1976     /* Reading half */
1977     reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
1978     if (sdslen(reply) == 0) {
1979         /* The master may send empty newlines after it receives PSYNC
1980          * and before to reply, just to keep the connection alive. */
1981         sdsfree(reply);
1982         return PSYNC_WAIT_REPLY;
1983     }
1984 
1985     connSetReadHandler(conn, NULL);
1986 
1987     if (!strncmp(reply,"+FULLRESYNC",11)) {
1988         char *replid = NULL, *offset = NULL;
1989 
1990         /* FULL RESYNC, parse the reply in order to extract the replid
1991          * and the replication offset. */
1992         replid = strchr(reply,' ');
1993         if (replid) {
1994             replid++;
1995             offset = strchr(replid,' ');
1996             if (offset) offset++;
1997         }
1998         if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
1999             serverLog(LL_WARNING,
2000                 "Master replied with wrong +FULLRESYNC syntax.");
2001             /* This is an unexpected condition, actually the +FULLRESYNC
2002              * reply means that the master supports PSYNC, but the reply
2003              * format seems wrong. To stay safe we blank the master
2004              * replid to make sure next PSYNCs will fail. */
2005             memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
2006         } else {
2007             memcpy(server.master_replid, replid, offset-replid-1);
2008             server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
2009             server.master_initial_offset = strtoll(offset,NULL,10);
2010             serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
2011                 server.master_replid,
2012                 server.master_initial_offset);
2013         }
2014         /* We are going to full resync, discard the cached master structure. */
2015         replicationDiscardCachedMaster();
2016         sdsfree(reply);
2017         return PSYNC_FULLRESYNC;
2018     }
2019 
2020     if (!strncmp(reply,"+CONTINUE",9)) {
2021         /* Partial resync was accepted. */
2022         serverLog(LL_NOTICE,
2023             "Successful partial resynchronization with master.");
2024 
        /* Check the new replication ID advertised by the master. If it
         * changed, we need to set the new ID as primary ID, and set our
         * secondary ID as the old master ID up to the current offset, so
         * that our sub-slaves will be able to PSYNC with us after a
         * disconnection. */
2030         char *start = reply+10;
2031         char *end = reply+9;
2032         while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
2033         if (end-start == CONFIG_RUN_ID_SIZE) {
2034             char new[CONFIG_RUN_ID_SIZE+1];
2035             memcpy(new,start,CONFIG_RUN_ID_SIZE);
2036             new[CONFIG_RUN_ID_SIZE] = '\0';
2037 
2038             if (strcmp(new,server.cached_master->replid)) {
2039                 /* Master ID changed. */
2040                 serverLog(LL_WARNING,"Master replication ID changed to %s",new);
2041 
2042                 /* Set the old ID as our ID2, up to the current offset+1. */
2043                 memcpy(server.replid2,server.cached_master->replid,
2044                     sizeof(server.replid2));
2045                 server.second_replid_offset = server.master_repl_offset+1;
2046 
2047                 /* Update the cached master ID and our own primary ID to the
2048                  * new one. */
2049                 memcpy(server.replid,new,sizeof(server.replid));
2050                 memcpy(server.cached_master->replid,new,sizeof(server.replid));
2051 
2052                 /* Disconnect all the sub-slaves: they need to be notified. */
2053                 disconnectSlaves();
2054             }
2055         }
2056 
2057         /* Setup the replication to continue. */
2058         sdsfree(reply);
2059         replicationResurrectCachedMaster(conn);
2060 
2061         /* If this instance was restarted and we read the metadata to
2062          * PSYNC from the persistence file, our replication backlog could
2063          * be still not initialized. Create it. */
2064         if (server.repl_backlog == NULL) createReplicationBacklog();
2065         return PSYNC_CONTINUE;
2066     }
2067 
2068     /* If we reach this point we received either an error (since the master does
2069      * not understand PSYNC or because it is in a special state and cannot
2070      * serve our request), or an unexpected reply from the master.
2071      *
2072      * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
2073      * return PSYNC_TRY_LATER if we believe this is a transient error. */
2074 
2075     if (!strncmp(reply,"-NOMASTERLINK",13) ||
2076         !strncmp(reply,"-LOADING",8))
2077     {
2078         serverLog(LL_NOTICE,
2079             "Master is currently unable to PSYNC "
2080             "but should be in the future: %s", reply);
2081         sdsfree(reply);
2082         return PSYNC_TRY_LATER;
2083     }
2084 
2085     if (strncmp(reply,"-ERR",4)) {
2086         /* If it's not an error, log the unexpected event. */
2087         serverLog(LL_WARNING,
2088             "Unexpected reply to PSYNC from master: %s", reply);
2089     } else {
2090         serverLog(LL_NOTICE,
2091             "Master does not support PSYNC or is in "
2092             "error state (reply: %s)", reply);
2093     }
2094     sdsfree(reply);
2095     replicationDiscardCachedMaster();
2096     return PSYNC_NOT_SUPPORTED;
2097 }
2098 
2099 /* This handler fires when the non blocking connect was able to
2100  * establish a connection with the master. */
syncWithMaster(connection * conn)2101 void syncWithMaster(connection *conn) {
2102     char tmpfile[256], *err = NULL;
2103     int dfd = -1, maxtries = 5;
2104     int psync_result;
2105 
2106     /* If this event fired after the user turned the instance into a master
2107      * with SLAVEOF NO ONE we must just return ASAP. */
2108     if (server.repl_state == REPL_STATE_NONE) {
2109         connClose(conn);
2110         return;
2111     }
2112 
2113     /* Check for errors in the socket: after a non blocking connect() we
2114      * may find that the socket is in error state. */
2115     if (connGetState(conn) != CONN_STATE_CONNECTED) {
2116         serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
2117                 connGetLastError(conn));
2118         goto error;
2119     }
2120 
2121     /* Send a PING to check the master is able to reply without errors. */
2122     if (server.repl_state == REPL_STATE_CONNECTING) {
2123         serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
2124         /* Delete the writable event so that the readable event remains
2125          * registered and we can wait for the PONG reply. */
2126         connSetReadHandler(conn, syncWithMaster);
2127         connSetWriteHandler(conn, NULL);
2128         server.repl_state = REPL_STATE_RECEIVE_PONG;
2129         /* Send the PING, don't check for errors at all, we have the timeout
2130          * that will take care about this. */
2131         err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);
2132         if (err) goto write_error;
2133         return;
2134     }
2135 
2136     /* Receive the PONG command. */
2137     if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
2138         err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
2139 
2140         /* We accept only two replies as valid, a positive +PONG reply
2141          * (we just check for "+") or an authentication error.
2142          * Note that older versions of Redis replied with "operation not
2143          * permitted" instead of using a proper error code, so we test
2144          * both. */
2145         if (err[0] != '+' &&
2146             strncmp(err,"-NOAUTH",7) != 0 &&
2147             strncmp(err,"-NOPERM",7) != 0 &&
2148             strncmp(err,"-ERR operation not permitted",28) != 0)
2149         {
2150             serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
2151             sdsfree(err);
2152             goto error;
2153         } else {
2154             serverLog(LL_NOTICE,
2155                 "Master replied to PING, replication can continue...");
2156         }
2157         sdsfree(err);
2158         server.repl_state = REPL_STATE_SEND_AUTH;
2159     }
2160 
2161     /* AUTH with the master if required. */
2162     if (server.repl_state == REPL_STATE_SEND_AUTH) {
2163         if (server.masteruser && server.masterauth) {
2164             err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
2165                                          server.masteruser,server.masterauth,NULL);
2166             if (err) goto write_error;
2167             server.repl_state = REPL_STATE_RECEIVE_AUTH;
2168             return;
2169         } else if (server.masterauth) {
2170             err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL);
2171             if (err) goto write_error;
2172             server.repl_state = REPL_STATE_RECEIVE_AUTH;
2173             return;
2174         } else {
2175             server.repl_state = REPL_STATE_SEND_PORT;
2176         }
2177     }
2178 
2179     /* Receive AUTH reply. */
2180     if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
2181         err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
2182         if (err[0] == '-') {
2183             serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
2184             sdsfree(err);
2185             goto error;
2186         }
2187         sdsfree(err);
2188         server.repl_state = REPL_STATE_SEND_PORT;
2189     }
2190 
2191     /* Set the slave port, so that Master's INFO command can list the
2192      * slave listening port correctly. */
2193     if (server.repl_state == REPL_STATE_SEND_PORT) {
2194         int port;
2195         if (server.slave_announce_port) port = server.slave_announce_port;
2196         else if (server.tls_replication && server.tls_port) port = server.tls_port;
2197         else port = server.port;
2198         sds portstr = sdsfromlonglong(port);
2199         err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
2200                 "listening-port",portstr, NULL);
2201         sdsfree(portstr);
2202         if (err) goto write_error;
2203         sdsfree(err);
2204         server.repl_state = REPL_STATE_RECEIVE_PORT;
2205         return;
2206     }
2207 
2208     /* Receive REPLCONF listening-port reply. */
2209     if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
2210         err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
2211         /* Ignore the error if any, not all the Redis versions support
2212          * REPLCONF listening-port. */
2213         if (err[0] == '-') {
2214             serverLog(LL_NOTICE,"(Non critical) Master does not understand "
2215                                 "REPLCONF listening-port: %s", err);
2216         }
2217         sdsfree(err);
2218         server.repl_state = REPL_STATE_SEND_IP;
2219     }
2220 
2221     /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
2222     if (server.repl_state == REPL_STATE_SEND_IP &&
2223         server.slave_announce_ip == NULL)
2224     {
2225             server.repl_state = REPL_STATE_SEND_CAPA;
2226     }
2227 
2228     /* Set the slave ip, so that Master's INFO command can list the
2229      * slave IP address port correctly in case of port forwarding or NAT. */
2230     if (server.repl_state == REPL_STATE_SEND_IP) {
2231         err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
2232                 "ip-address",server.slave_announce_ip, NULL);
2233         if (err) goto write_error;
2234         sdsfree(err);
2235         server.repl_state = REPL_STATE_RECEIVE_IP;
2236         return;
2237     }
2238 
2239     /* Receive REPLCONF ip-address reply. */
2240     if (server.repl_state == REPL_STATE_RECEIVE_IP) {
2241         err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
2242         /* Ignore the error if any, not all the Redis versions support
2243          * REPLCONF listening-port. */
2244         if (err[0] == '-') {
2245             serverLog(LL_NOTICE,"(Non critical) Master does not understand "
2246                                 "REPLCONF ip-address: %s", err);
2247         }
2248         sdsfree(err);
2249         server.repl_state = REPL_STATE_SEND_CAPA;
2250     }
2251 
2252     /* Inform the master of our (slave) capabilities.
2253      *
2254      * EOF: supports EOF-style RDB transfer for diskless replication.
2255      * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
2256      *
2257      * The master will ignore capabilities it does not understand. */
2258     if (server.repl_state == REPL_STATE_SEND_CAPA) {
2259         err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
2260                 "capa","eof","capa","psync2",NULL);
2261         if (err) goto write_error;
2262         sdsfree(err);
2263         server.repl_state = REPL_STATE_RECEIVE_CAPA;
2264         return;
2265     }
2266 
2267     /* Receive CAPA reply. */
2268     if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
2269         err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
2270         /* Ignore the error if any, not all the Redis versions support
2271          * REPLCONF capa. */
2272         if (err[0] == '-') {
2273             serverLog(LL_NOTICE,"(Non critical) Master does not understand "
2274                                   "REPLCONF capa: %s", err);
2275         }
2276         sdsfree(err);
2277         server.repl_state = REPL_STATE_SEND_PSYNC;
2278     }
2279 
2280     /* Try a partial resynchonization. If we don't have a cached master
2281      * slaveTryPartialResynchronization() will at least try to use PSYNC
2282      * to start a full resynchronization so that we get the master replid
2283      * and the global offset, to try a partial resync at the next
2284      * reconnection attempt. */
2285     if (server.repl_state == REPL_STATE_SEND_PSYNC) {
2286         if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
2287             err = sdsnew("Write error sending the PSYNC command.");
2288             goto write_error;
2289         }
2290         server.repl_state = REPL_STATE_RECEIVE_PSYNC;
2291         return;
2292     }
2293 
2294     /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
2295     if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
2296         serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
2297                              "state should be RECEIVE_PSYNC but is %d",
2298                              server.repl_state);
2299         goto error;
2300     }
2301 
2302     psync_result = slaveTryPartialResynchronization(conn,1);
2303     if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
2304 
2305     /* If the master is in an transient error, we should try to PSYNC
2306      * from scratch later, so go to the error path. This happens when
2307      * the server is loading the dataset or is not connected with its
2308      * master and so forth. */
2309     if (psync_result == PSYNC_TRY_LATER) goto error;
2310 
2311     /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
2312      * uninstalling the read handler from the file descriptor. */
2313 
2314     if (psync_result == PSYNC_CONTINUE) {
2315         serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
2316         if (server.supervised_mode == SUPERVISED_SYSTEMD) {
2317             redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n");
2318             redisCommunicateSystemd("READY=1\n");
2319         }
2320         return;
2321     }
2322 
2323     /* PSYNC failed or is not supported: we want our slaves to resync with us
2324      * as well, if we have any sub-slaves. The master may transfer us an
2325      * entirely different data set and we have no way to incrementally feed
2326      * our slaves after that. */
2327     disconnectSlaves(); /* Force our slaves to resync with us as well. */
2328     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
2329 
2330     /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
2331      * and the server.master_replid and master_initial_offset are
2332      * already populated. */
2333     if (psync_result == PSYNC_NOT_SUPPORTED) {
2334         serverLog(LL_NOTICE,"Retrying with SYNC...");
2335         if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
2336             serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
2337                 strerror(errno));
2338             goto error;
2339         }
2340     }
2341 
2342     /* Prepare a suitable temp file for bulk transfer */
2343     if (!useDisklessLoad()) {
2344         while(maxtries--) {
2345             snprintf(tmpfile,256,
2346                 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
2347             dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
2348             if (dfd != -1) break;
2349             sleep(1);
2350         }
2351         if (dfd == -1) {
2352             serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
2353             goto error;
2354         }
2355         server.repl_transfer_tmpfile = zstrdup(tmpfile);
2356         server.repl_transfer_fd = dfd;
2357     }
2358 
2359     /* Setup the non blocking download of the bulk file. */
2360     if (connSetReadHandler(conn, readSyncBulkPayload)
2361             == C_ERR)
2362     {
2363         char conninfo[CONN_INFO_LEN];
2364         serverLog(LL_WARNING,
2365             "Can't create readable event for SYNC: %s (%s)",
2366             strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
2367         goto error;
2368     }
2369 
2370     server.repl_state = REPL_STATE_TRANSFER;
2371     server.repl_transfer_size = -1;
2372     server.repl_transfer_read = 0;
2373     server.repl_transfer_last_fsync_off = 0;
2374     server.repl_transfer_lastio = server.unixtime;
2375     return;
2376 
2377 error:
2378     if (dfd != -1) close(dfd);
2379     connClose(conn);
2380     server.repl_transfer_s = NULL;
2381     if (server.repl_transfer_fd != -1)
2382         close(server.repl_transfer_fd);
2383     if (server.repl_transfer_tmpfile)
2384         zfree(server.repl_transfer_tmpfile);
2385     server.repl_transfer_tmpfile = NULL;
2386     server.repl_transfer_fd = -1;
2387     server.repl_state = REPL_STATE_CONNECT;
2388     return;
2389 
2390 write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
2391     serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
2392     sdsfree(err);
2393     goto error;
2394 }
2395 
connectWithMaster(void)2396 int connectWithMaster(void) {
2397     server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
2398     if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
2399                 NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
2400         serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
2401                 connGetLastError(server.repl_transfer_s));
2402         connClose(server.repl_transfer_s);
2403         server.repl_transfer_s = NULL;
2404         return C_ERR;
2405     }
2406 
2407 
2408     server.repl_transfer_lastio = server.unixtime;
2409     server.repl_state = REPL_STATE_CONNECTING;
2410     return C_OK;
2411 }
2412 
2413 /* This function can be called when a non blocking connection is currently
2414  * in progress to undo it.
2415  * Never call this function directly, use cancelReplicationHandshake() instead.
2416  */
undoConnectWithMaster(void)2417 void undoConnectWithMaster(void) {
2418     connClose(server.repl_transfer_s);
2419     server.repl_transfer_s = NULL;
2420 }
2421 
2422 /* Abort the async download of the bulk dataset while SYNC-ing with master.
2423  * Never call this function directly, use cancelReplicationHandshake() instead.
2424  */
replicationAbortSyncTransfer(void)2425 void replicationAbortSyncTransfer(void) {
2426     serverAssert(server.repl_state == REPL_STATE_TRANSFER);
2427     undoConnectWithMaster();
2428     if (server.repl_transfer_fd!=-1) {
2429         close(server.repl_transfer_fd);
2430         bg_unlink(server.repl_transfer_tmpfile);
2431         zfree(server.repl_transfer_tmpfile);
2432         server.repl_transfer_tmpfile = NULL;
2433         server.repl_transfer_fd = -1;
2434     }
2435 }
2436 
2437 /* This function aborts a non blocking replication attempt if there is one
2438  * in progress, by canceling the non-blocking connect attempt or
2439  * the initial bulk transfer.
2440  *
2441  * If there was a replication handshake in progress 1 is returned and
2442  * the replication state (server.repl_state) set to REPL_STATE_CONNECT.
2443  *
2444  * Otherwise zero is returned and no operation is performed at all. */
cancelReplicationHandshake(void)2445 int cancelReplicationHandshake(void) {
2446     if (server.repl_state == REPL_STATE_TRANSFER) {
2447         replicationAbortSyncTransfer();
2448         server.repl_state = REPL_STATE_CONNECT;
2449     } else if (server.repl_state == REPL_STATE_CONNECTING ||
2450                slaveIsInHandshakeState())
2451     {
2452         undoConnectWithMaster();
2453         server.repl_state = REPL_STATE_CONNECT;
2454     } else {
2455         return 0;
2456     }
2457     return 1;
2458 }
2459 
2460 /* Set replication to the specified master address and port. */
replicationSetMaster(char * ip,int port)2461 void replicationSetMaster(char *ip, int port) {
2462     int was_master = server.masterhost == NULL;
2463 
2464     sdsfree(server.masterhost);
2465     server.masterhost = sdsnew(ip);
2466     server.masterport = port;
2467     if (server.master) {
2468         freeClient(server.master);
2469     }
2470     disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
2471 
2472     /* Update oom_score_adj */
2473     setOOMScoreAdj(-1);
2474 
2475     /* Force our slaves to resync with us as well. They may hopefully be able
2476      * to partially resync with us, but we can notify the replid change. */
2477     disconnectSlaves();
2478     cancelReplicationHandshake();
2479     /* Before destroying our master state, create a cached master using
2480      * our own parameters, to later PSYNC with the new master. */
2481     if (was_master) {
2482         replicationDiscardCachedMaster();
2483         replicationCacheMasterUsingMyself();
2484     }
2485 
2486     /* Fire the role change modules event. */
2487     moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
2488                           REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
2489                           NULL);
2490 
2491     /* Fire the master link modules event. */
2492     if (server.repl_state == REPL_STATE_CONNECTED)
2493         moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2494                               REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
2495                               NULL);
2496 
2497     server.repl_state = REPL_STATE_CONNECT;
2498 }
2499 
2500 /* Cancel replication, setting the instance as a master itself. */
replicationUnsetMaster(void)2501 void replicationUnsetMaster(void) {
2502     if (server.masterhost == NULL) return; /* Nothing to do. */
2503 
2504     /* Fire the master link modules event. */
2505     if (server.repl_state == REPL_STATE_CONNECTED)
2506         moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2507                               REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
2508                               NULL);
2509 
2510     sdsfree(server.masterhost);
2511     server.masterhost = NULL;
2512     if (server.master) freeClient(server.master);
2513     replicationDiscardCachedMaster();
2514     cancelReplicationHandshake();
2515     /* When a slave is turned into a master, the current replication ID
2516      * (that was inherited from the master at synchronization time) is
2517      * used as secondary ID up to the current offset, and a new replication
2518      * ID is created to continue with a new replication history.
2519      *
2520      * NOTE: this function MUST be called after we call
2521      * freeClient(server.master), since there we adjust the replication
2522      * offset trimming the final PINGs. See Github issue #7320. */
2523     shiftReplicationId();
2524     /* Disconnecting all the slaves is required: we need to inform slaves
2525      * of the replication ID change (see shiftReplicationId() call). However
2526      * the slaves will be able to partially resync with us, so it will be
2527      * a very fast reconnection. */
2528     disconnectSlaves();
2529     server.repl_state = REPL_STATE_NONE;
2530 
2531     /* We need to make sure the new master will start the replication stream
2532      * with a SELECT statement. This is forced after a full resync, but
2533      * with PSYNC version 2, there is no need for full resync after a
2534      * master switch. */
2535     server.slaveseldb = -1;
2536 
2537     /* Update oom_score_adj */
2538     setOOMScoreAdj(-1);
2539 
2540     /* Once we turn from slave to master, we consider the starting time without
2541      * slaves (that is used to count the replication backlog time to live) as
2542      * starting from now. Otherwise the backlog will be freed after a
2543      * failover if slaves do not connect immediately. */
2544     server.repl_no_slaves_since = server.unixtime;
2545 
2546     /* Fire the role change modules event. */
2547     moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
2548                           REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
2549                           NULL);
2550 
2551     /* Restart the AOF subsystem in case we shut it down during a sync when
2552      * we were still a slave. */
2553     if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
2554 }
2555 
2556 /* This function is called when the slave lose the connection with the
2557  * master into an unexpected way. */
replicationHandleMasterDisconnection(void)2558 void replicationHandleMasterDisconnection(void) {
2559     /* Fire the master link modules event. */
2560     if (server.repl_state == REPL_STATE_CONNECTED)
2561         moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2562                               REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
2563                               NULL);
2564 
2565     server.master = NULL;
2566     server.repl_state = REPL_STATE_CONNECT;
2567     server.repl_down_since = server.unixtime;
2568     /* We lost connection with our master, don't disconnect slaves yet,
2569      * maybe we'll be able to PSYNC with our master later. We'll disconnect
2570      * the slaves only if we'll have to do a full resync with our master. */
2571 }
2572 
/* REPLICAOF <host> <port> | REPLICAOF NO ONE
 *
 * With the special arguments "NO" "ONE" the instance is turned into a
 * master (if it is not one already). Otherwise the instance is configured
 * as a replica of host:port, unless it is already replicating from that
 * same address, in which case nothing is done.
 *
 * Error replies are sent, and nothing is changed, when:
 *  - cluster mode is enabled;
 *  - the calling client is itself a replica;
 *  - the port is not an integer, or is outside the 0-65535 range. */
void replicaofCommand(client *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"REPLICAOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client_info = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client_info);
            sdsfree(client_info);
        }
    } else {
        long port;

        if (c->flags & CLIENT_SLAVE)
        {
            /* If a client is already a replica they cannot run this command,
             * because it involves flushing all replicas (including this
             * client) */
            addReplyError(c, "Command is not valid when client is a replica.");
            return;
        }

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        /* The port is stored as an int and used as a TCP port: reject
         * values that would be silently truncated or can never be valid. */
        if (port < 0 || port > 65535) {
            addReplyError(c,"Invalid master port");
            return;
        }

        /* Check if we are already attached to the specified master */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
                                "with the master we are already connected "
                                "with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified "
                                 "master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, (int)port);
        sds client_info = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client_info);
        sdsfree(client_info);
    }
    addReply(c,shared.ok);
}
2627 
2628 /* ROLE command: provide information about the role of the instance
2629  * (master or slave) and additional information related to replication
2630  * in an easy to process format. */
roleCommand(client * c)2631 void roleCommand(client *c) {
2632     if (server.masterhost == NULL) {
2633         listIter li;
2634         listNode *ln;
2635         void *mbcount;
2636         int slaves = 0;
2637 
2638         addReplyArrayLen(c,3);
2639         addReplyBulkCBuffer(c,"master",6);
2640         addReplyLongLong(c,server.master_repl_offset);
2641         mbcount = addReplyDeferredLen(c);
2642         listRewind(server.slaves,&li);
2643         while((ln = listNext(&li))) {
2644             client *slave = ln->value;
2645             char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
2646 
2647             if (slaveip[0] == '\0') {
2648                 if (connPeerToString(slave->conn,ip,sizeof(ip),NULL) == -1)
2649                     continue;
2650                 slaveip = ip;
2651             }
2652             if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2653             addReplyArrayLen(c,3);
2654             addReplyBulkCString(c,slaveip);
2655             addReplyBulkLongLong(c,slave->slave_listening_port);
2656             addReplyBulkLongLong(c,slave->repl_ack_off);
2657             slaves++;
2658         }
2659         setDeferredArrayLen(c,mbcount,slaves);
2660     } else {
2661         char *slavestate = NULL;
2662 
2663         addReplyArrayLen(c,5);
2664         addReplyBulkCBuffer(c,"slave",5);
2665         addReplyBulkCString(c,server.masterhost);
2666         addReplyLongLong(c,server.masterport);
2667         if (slaveIsInHandshakeState()) {
2668             slavestate = "handshake";
2669         } else {
2670             switch(server.repl_state) {
2671             case REPL_STATE_NONE: slavestate = "none"; break;
2672             case REPL_STATE_CONNECT: slavestate = "connect"; break;
2673             case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
2674             case REPL_STATE_TRANSFER: slavestate = "sync"; break;
2675             case REPL_STATE_CONNECTED: slavestate = "connected"; break;
2676             default: slavestate = "unknown"; break;
2677             }
2678         }
2679         addReplyBulkCString(c,slavestate);
2680         addReplyLongLong(c,server.master ? server.master->reploff : -1);
2681     }
2682 }
2683 
2684 /* Send a REPLCONF ACK command to the master to inform it about the current
2685  * processed offset. If we are not connected with a master, the command has
2686  * no effects. */
replicationSendAck(void)2687 void replicationSendAck(void) {
2688     client *c = server.master;
2689 
2690     if (c != NULL) {
2691         c->flags |= CLIENT_MASTER_FORCE_REPLY;
2692         addReplyArrayLen(c,3);
2693         addReplyBulkCString(c,"REPLCONF");
2694         addReplyBulkCString(c,"ACK");
2695         addReplyBulkLongLong(c,c->reploff);
2696         c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
2697     }
2698 }
2699 
2700 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
2701 
2702 /* In order to implement partial synchronization we need to be able to cache
2703  * our master's client structure after a transient disconnection.
2704  * It is cached into server.cached_master and flushed away using the following
2705  * functions. */
2706 
2707 /* This function is called by freeClient() in order to cache the master
2708  * client structure instead of destroying it. freeClient() will return
2709  * ASAP after this function returns, so every action needed to avoid problems
2710  * with a client that is really "suspended" has to be done by this function.
2711  *
2712  * The other functions that will deal with the cached master are:
2713  *
2714  * replicationDiscardCachedMaster() that will make sure to kill the client
2715  * as for some reason we don't want to use it in the future.
2716  *
2717  * replicationResurrectCachedMaster() that is used after a successful PSYNC
2718  * handshake in order to reactivate the cached master.
2719  */
replicationCacheMaster(client * c)2720 void replicationCacheMaster(client *c) {
2721     serverAssert(server.master != NULL && server.cached_master == NULL);
2722     serverLog(LL_NOTICE,"Caching the disconnected master state.");
2723 
2724     /* Unlink the client from the server structures. */
2725     unlinkClient(c);
2726 
2727     /* Reset the master client so that's ready to accept new commands:
2728      * we want to discard te non processed query buffers and non processed
2729      * offsets, including pending transactions, already populated arguments,
2730      * pending outputs to the master. */
2731     sdsclear(server.master->querybuf);
2732     sdsclear(server.master->pending_querybuf);
2733     server.master->read_reploff = server.master->reploff;
2734     if (c->flags & CLIENT_MULTI) discardTransaction(c);
2735     listEmpty(c->reply);
2736     c->sentlen = 0;
2737     c->reply_bytes = 0;
2738     c->bufpos = 0;
2739     resetClient(c);
2740 
2741     /* Save the master. Server.master will be set to null later by
2742      * replicationHandleMasterDisconnection(). */
2743     server.cached_master = server.master;
2744 
2745     /* Invalidate the Peer ID cache. */
2746     if (c->peerid) {
2747         sdsfree(c->peerid);
2748         c->peerid = NULL;
2749     }
2750 
2751     /* Caching the master happens instead of the actual freeClient() call,
2752      * so make sure to adjust the replication state. This function will
2753      * also set server.master to NULL. */
2754     replicationHandleMasterDisconnection();
2755 }
2756 
2757 /* This function is called when a master is turend into a slave, in order to
2758  * create from scratch a cached master for the new client, that will allow
2759  * to PSYNC with the slave that was promoted as the new master after a
2760  * failover.
2761  *
2762  * Assuming this instance was previously the master instance of the new master,
2763  * the new master will accept its replication ID, and potentiall also the
2764  * current offset if no data was lost during the failover. So we use our
2765  * current replication ID and offset in order to synthesize a cached master. */
replicationCacheMasterUsingMyself(void)2766 void replicationCacheMasterUsingMyself(void) {
2767     serverLog(LL_NOTICE,
2768         "Before turning into a replica, using my own master parameters "
2769         "to synthesize a cached master: I may be able to synchronize with "
2770         "the new master with just a partial transfer.");
2771 
2772     /* This will be used to populate the field server.master->reploff
2773      * by replicationCreateMasterClient(). We'll later set the created
2774      * master as server.cached_master, so the replica will use such
2775      * offset for PSYNC. */
2776     server.master_initial_offset = server.master_repl_offset;
2777 
2778     /* The master client we create can be set to any DBID, because
2779      * the new master will start its replication stream with SELECT. */
2780     replicationCreateMasterClient(NULL,-1);
2781 
2782     /* Use our own ID / offset. */
2783     memcpy(server.master->replid, server.replid, sizeof(server.replid));
2784 
2785     /* Set as cached master. */
2786     unlinkClient(server.master);
2787     server.cached_master = server.master;
2788     server.master = NULL;
2789 }
2790 
2791 /* Free a cached master, called when there are no longer the conditions for
2792  * a partial resync on reconnection. */
replicationDiscardCachedMaster(void)2793 void replicationDiscardCachedMaster(void) {
2794     if (server.cached_master == NULL) return;
2795 
2796     serverLog(LL_NOTICE,"Discarding previously cached master state.");
2797     server.cached_master->flags &= ~CLIENT_MASTER;
2798     freeClient(server.cached_master);
2799     server.cached_master = NULL;
2800 }
2801 
2802 /* Turn the cached master into the current master, using the file descriptor
2803  * passed as argument as the socket for the new master.
2804  *
2805  * This function is called when successfully setup a partial resynchronization
2806  * so the stream of data that we'll receive will start from were this
2807  * master left. */
replicationResurrectCachedMaster(connection * conn)2808 void replicationResurrectCachedMaster(connection *conn) {
2809     server.master = server.cached_master;
2810     server.cached_master = NULL;
2811     server.master->conn = conn;
2812     connSetPrivateData(server.master->conn, server.master);
2813     server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
2814     server.master->authenticated = 1;
2815     server.master->lastinteraction = server.unixtime;
2816     server.repl_state = REPL_STATE_CONNECTED;
2817     server.repl_down_since = 0;
2818 
2819     /* Fire the master link modules event. */
2820     moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2821                           REDISMODULE_SUBEVENT_MASTER_LINK_UP,
2822                           NULL);
2823 
2824     /* Re-add to the list of clients. */
2825     linkClient(server.master);
2826     if (connSetReadHandler(server.master->conn, readQueryFromClient)) {
2827         serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
2828         freeClientAsync(server.master); /* Close ASAP. */
2829     }
2830 
2831     /* We may also need to install the write handler as well if there is
2832      * pending data in the write buffers. */
2833     if (clientHasPendingReplies(server.master)) {
2834         if (connSetWriteHandler(server.master->conn, sendReplyToClient)) {
2835             serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
2836             freeClientAsync(server.master); /* Close ASAP. */
2837         }
2838     }
2839 }
2840 
2841 /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */
2842 
2843 /* This function counts the number of slaves with lag <= min-slaves-max-lag.
2844  * If the option is active, the server will prevent writes if there are not
2845  * enough connected slaves with the specified lag (or less). */
refreshGoodSlavesCount(void)2846 void refreshGoodSlavesCount(void) {
2847     listIter li;
2848     listNode *ln;
2849     int good = 0;
2850 
2851     if (!server.repl_min_slaves_to_write ||
2852         !server.repl_min_slaves_max_lag) return;
2853 
2854     listRewind(server.slaves,&li);
2855     while((ln = listNext(&li))) {
2856         client *slave = ln->value;
2857         time_t lag = server.unixtime - slave->repl_ack_time;
2858 
2859         if (slave->replstate == SLAVE_STATE_ONLINE &&
2860             lag <= server.repl_min_slaves_max_lag) good++;
2861     }
2862     server.repl_good_slaves_count = good;
2863 }
2864 
2865 /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
2866  * The goal of this code is to keep track of scripts already sent to every
2867  * connected slave, in order to be able to replicate EVALSHA as it is without
2868  * translating it to EVAL every time it is possible.
2869  *
2870  * We use a capped collection implemented by a hash table for fast lookup
2871  * of scripts we can send as EVALSHA, plus a linked list that is used for
2872  * eviction of the oldest entry when the max number of items is reached.
2873  *
2874  * We don't care about taking a different cache for every different slave
2875  * since to fill the cache again is not very costly, the goal of this code
2876  * is to avoid that the same big script is transmitted a big number of times
2877  * per second wasting bandwidth and processor speed, but it is not a problem
2878  * if we need to rebuild the cache from scratch from time to time, every used
2879  * script will need to be transmitted a single time to reappear in the cache.
2880  *
2881  * This is how the system works:
2882  *
2883  * 1) Every time a new slave connects, we flush the whole script cache.
2884  * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
2885  *    trying to convert EVAL into EVALSHA specifically for slaves.
2886  * 3) Every time we transmit a script as EVAL to the slaves, we also add the
2887  *    corresponding SHA1 of the script into the cache as we are sure every
2888  *    slave knows about the script starting from now.
2889  * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
2890  *    and at the same time flush the script cache.
2891  * 5) When the last slave disconnects, flush the cache.
2892  * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
2893  *    in the master sometimes.
2894  */
2895 
2896 /* Initialize the script cache, only called at startup. */
replicationScriptCacheInit(void)2897 void replicationScriptCacheInit(void) {
2898     server.repl_scriptcache_size = 10000;
2899     server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
2900     server.repl_scriptcache_fifo = listCreate();
2901 }
2902 
2903 /* Empty the script cache. Should be called every time we are no longer sure
2904  * that every slave knows about all the scripts in our set, or when the
2905  * current AOF "context" is no longer aware of the script. In general we
2906  * should flush the cache:
2907  *
2908  * 1) Every time a new slave reconnects to this master and performs a
2909  *    full SYNC (PSYNC does not require flushing).
2910  * 2) Every time an AOF rewrite is performed.
2911  * 3) Every time we are left without slaves at all, and AOF is off, in order
2912  *    to reclaim otherwise unused memory.
2913  */
replicationScriptCacheFlush(void)2914 void replicationScriptCacheFlush(void) {
2915     dictEmpty(server.repl_scriptcache_dict,NULL);
2916     listRelease(server.repl_scriptcache_fifo);
2917     server.repl_scriptcache_fifo = listCreate();
2918 }
2919 
2920 /* Add an entry into the script cache, if we reach max number of entries the
2921  * oldest is removed from the list. */
replicationScriptCacheAdd(sds sha1)2922 void replicationScriptCacheAdd(sds sha1) {
2923     int retval;
2924     sds key = sdsdup(sha1);
2925 
2926     /* Evict oldest. */
2927     if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
2928     {
2929         listNode *ln = listLast(server.repl_scriptcache_fifo);
2930         sds oldest = listNodeValue(ln);
2931 
2932         retval = dictDelete(server.repl_scriptcache_dict,oldest);
2933         serverAssert(retval == DICT_OK);
2934         listDelNode(server.repl_scriptcache_fifo,ln);
2935     }
2936 
2937     /* Add current. */
2938     retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
2939     listAddNodeHead(server.repl_scriptcache_fifo,key);
2940     serverAssert(retval == DICT_OK);
2941 }
2942 
2943 /* Returns non-zero if the specified entry exists inside the cache, that is,
2944  * if all the slaves are aware of this script SHA1. */
replicationScriptCacheExists(sds sha1)2945 int replicationScriptCacheExists(sds sha1) {
2946     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
2947 }
2948 
2949 /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
2950  * Redis synchronous replication design can be summarized in points:
2951  *
2952  * - Redis masters have a global replication offset, used by PSYNC.
 * - Masters increment the offset every time new commands are sent to slaves.
2954  * - Slaves ping back masters with the offset processed so far.
2955  *
2956  * So synchronous replication adds a new WAIT command in the form:
2957  *
2958  *   WAIT <num_replicas> <milliseconds_timeout>
2959  *
2960  * That returns the number of replicas that processed the query when
2961  * we finally have at least num_replicas, or when the timeout was
2962  * reached.
2963  *
2964  * The command is implemented in this way:
2965  *
2966  * - Every time a client processes a command, we remember the replication
2967  *   offset after sending that command to the slaves.
2968  * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
2969  *   The client is blocked at the same time (see blocked.c).
2970  * - Once we receive enough ACKs for a given offset or when the timeout
2971  *   is reached, the WAIT command is unblocked and the reply sent to the
2972  *   client.
2973  */
2974 
2975 /* This just set a flag so that we broadcast a REPLCONF GETACK command
2976  * to all the slaves in the beforeSleep() function. Note that this way
2977  * we "group" all the clients that want to wait for synchronous replication
2978  * in a given event loop iteration, and send a single GETACK for them all. */
replicationRequestAckFromSlaves(void)2979 void replicationRequestAckFromSlaves(void) {
2980     server.get_ack_from_slaves = 1;
2981 }
2982 
2983 /* Return the number of slaves that already acknowledged the specified
2984  * replication offset. */
replicationCountAcksByOffset(long long offset)2985 int replicationCountAcksByOffset(long long offset) {
2986     listIter li;
2987     listNode *ln;
2988     int count = 0;
2989 
2990     listRewind(server.slaves,&li);
2991     while((ln = listNext(&li))) {
2992         client *slave = ln->value;
2993 
2994         if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2995         if (slave->repl_ack_off >= offset) count++;
2996     }
2997     return count;
2998 }
2999 
3000 /* WAIT for N replicas to acknowledge the processing of our latest
3001  * write command (and all the previous commands). */
waitCommand(client * c)3002 void waitCommand(client *c) {
3003     mstime_t timeout;
3004     long numreplicas, ackreplicas;
3005     long long offset = c->woff;
3006 
3007     if (server.masterhost) {
3008         addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated.");
3009         return;
3010     }
3011 
3012     /* Argument parsing. */
3013     if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
3014         return;
3015     if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
3016         != C_OK) return;
3017 
3018     /* First try without blocking at all. */
3019     ackreplicas = replicationCountAcksByOffset(c->woff);
3020     if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
3021         addReplyLongLong(c,ackreplicas);
3022         return;
3023     }
3024 
3025     /* Otherwise block the client and put it into our list of clients
3026      * waiting for ack from slaves. */
3027     c->bpop.timeout = timeout;
3028     c->bpop.reploffset = offset;
3029     c->bpop.numreplicas = numreplicas;
3030     listAddNodeTail(server.clients_waiting_acks,c);
3031     blockClient(c,BLOCKED_WAIT);
3032 
3033     /* Make sure that the server will send an ACK request to all the slaves
3034      * before returning to the event loop. */
3035     replicationRequestAckFromSlaves();
3036 }
3037 
3038 /* This is called by unblockClient() to perform the blocking op type
3039  * specific cleanup. We just remove the client from the list of clients
3040  * waiting for replica acks. Never call it directly, call unblockClient()
3041  * instead. */
unblockClientWaitingReplicas(client * c)3042 void unblockClientWaitingReplicas(client *c) {
3043     listNode *ln = listSearchKey(server.clients_waiting_acks,c);
3044     serverAssert(ln != NULL);
3045     listDelNode(server.clients_waiting_acks,ln);
3046 }
3047 
3048 /* Check if there are clients blocked in WAIT that can be unblocked since
3049  * we received enough ACKs from slaves. */
processClientsWaitingReplicas(void)3050 void processClientsWaitingReplicas(void) {
3051     long long last_offset = 0;
3052     int last_numreplicas = 0;
3053 
3054     listIter li;
3055     listNode *ln;
3056 
3057     listRewind(server.clients_waiting_acks,&li);
3058     while((ln = listNext(&li))) {
3059         client *c = ln->value;
3060 
3061         /* Every time we find a client that is satisfied for a given
3062          * offset and number of replicas, we remember it so the next client
3063          * may be unblocked without calling replicationCountAcksByOffset()
3064          * if the requested offset / replicas were equal or less. */
3065         if (last_offset && last_offset > c->bpop.reploffset &&
3066                            last_numreplicas > c->bpop.numreplicas)
3067         {
3068             unblockClient(c);
3069             addReplyLongLong(c,last_numreplicas);
3070         } else {
3071             int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
3072 
3073             if (numreplicas >= c->bpop.numreplicas) {
3074                 last_offset = c->bpop.reploffset;
3075                 last_numreplicas = numreplicas;
3076                 unblockClient(c);
3077                 addReplyLongLong(c,numreplicas);
3078             }
3079         }
3080     }
3081 }
3082 
3083 /* Return the slave replication offset for this instance, that is
3084  * the offset for which we already processed the master replication stream. */
replicationGetSlaveOffset(void)3085 long long replicationGetSlaveOffset(void) {
3086     long long offset = 0;
3087 
3088     if (server.masterhost != NULL) {
3089         if (server.master) {
3090             offset = server.master->reploff;
3091         } else if (server.cached_master) {
3092             offset = server.cached_master->reploff;
3093         }
3094     }
3095     /* offset may be -1 when the master does not support it at all, however
3096      * this function is designed to return an offset that can express the
3097      * amount of data processed by the master, so we return a positive
3098      * integer. */
3099     if (offset < 0) offset = 0;
3100     return offset;
3101 }
3102 
3103 /* --------------------------- REPLICATION CRON  ---------------------------- */
3104 
/* Replication cron function, called 1 time per second.
 *
 * In order, the function:
 *
 * - Cancels the replication handshake when connecting / handshaking, or
 *   receiving the bulk transfer, saw no I/O for more than
 *   server.repl_timeout seconds.
 * - Frees the master client when we are connected and the master had no
 *   interaction for more than server.repl_timeout seconds.
 * - Starts a connection with the master when the state is
 *   REPL_STATE_CONNECT.
 * - Sends a REPLCONF ACK to the master (unless it is flagged
 *   CLIENT_PRE_PSYNC).
 * - PINGs the slaves every server.repl_ping_slave_period loops and writes
 *   a newline to slaves still waiting for the RDB file.
 * - Frees the slaves that timed out.
 * - Frees the replication backlog and the script cache when there are no
 *   slaves and the respective conditions are met.
 * - Starts a BGSAVE for slaves in SLAVE_STATE_WAIT_BGSAVE_START state.
 * - Calls removeRDBUsedToSyncReplicas() and refreshGoodSlavesCount(). */
void replicationCron(void) {
    /* Number of times this function completed; used to pace the PINGs. */
    static long long replication_cron_loops = 0;

    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }

    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
        cancelReplicationHandshake();
    }

    /* Timed out master when we are an already connected slave?
     * NOTE(review): server.master is dereferenced without a NULL check, so
     * this assumes it is always set while the state is
     * REPL_STATE_CONNECTED -- confirm. */
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    listIter li;
    listNode *ln;
    robj *ping_argv[1];

    /* First, send PING according to ping_slave_period.
     * NOTE(review): the modulo assumes server.repl_ping_slave_period is
     * never zero -- confirm it is validated where the option is set. */
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        /* Note that we don't send the PING if the clients are paused during
         * a Redis Cluster manual failover: the PING we send will otherwise
         * alter the replication offsets of master and slave, and will no longer
         * match the one stored into 'mf_master_offset' state. */
        int manual_failover_in_progress =
            server.cluster_enabled &&
            server.cluster->mf_end &&
            clientsArePaused();

        if (!manual_failover_in_progress) {
            ping_argv[0] = createStringObject("PING",4);
            replicationFeedSlaves(server.slaves, server.slaveseldb,
                ping_argv, 1);
            decrRefCount(ping_argv[0]);
        }
    }

    /* Second, send a newline to all the slaves in pre-synchronization
     * stage, that is, slaves waiting for the master to create the RDB file
     * (slaves waiting for the end of a socket-target BGSAVE are skipped).
     *
     * The newline will be ignored by the slave but will refresh the
     * last interaction timer preventing a timeout. In this case we ignore the
     * ping period and refresh the connection once per second since certain
     * timeouts are set at a few seconds (example: PSYNC response).
     *
     * NOTE(review): this comment used to state that a newline is also sent
     * to chained slaves when the link with our master is lost. The loop
     * below only writes to slaves in pre-synchronization stage, so that
     * case is not handled by this code. */
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        int is_presync =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
             server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

        if (is_presync) {
            /* The result of the write is not checked. */
            connWrite(slave->conn, "\n", 1);
        }
    }

    /* Disconnect timedout slaves. */
    if (listLength(server.slaves)) {
        /* These declarations shadow the function-level 'li' and 'ln'. */
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_ONLINE) {
                /* Slaves flagged CLIENT_PRE_PSYNC are never timed out here. */
                if (slave->flags & CLIENT_PRE_PSYNC)
                    continue;
                if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
                    serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
                          replicationGetSlaveName(slave));
                    freeClient(slave);
                    continue;
                }
            }
            /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
             * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
             * from terminating. */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
                /* A repl_last_partial_write of zero never triggers the
                 * timeout. */
                if (slave->repl_last_partial_write != 0 &&
                    (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
                {
                    serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
                          replicationGetSlaveName(slave));
                    freeClient(slave);
                    continue;
                }
            }
        }
    }

    /* If this is a master without attached slaves and there is a replication
     * backlog active, in order to reclaim memory we can free it after some
     * (configured) time. Note that this cannot be done for slaves: slaves
     * without sub-slaves attached should still accumulate data into the
     * backlog, in order to reply to PSYNC queries if they are turned into
     * masters after a failover. */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog && server.masterhost == NULL)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;

        if (idle > server.repl_backlog_time_limit) {
            /* When we free the backlog, we always use a new
             * replication ID and clear the ID2. This is needed
             * because when there is no backlog, the master_repl_offset
             * is not updated, but we would still retain our replication
             * ID, leading to the following problem:
             *
             * 1. We are a master instance.
             * 2. Our slave is promoted to master. Its repl-id-2 will
             *    be the same as our repl-id.
             * 3. We, yet as master, receive some updates, that will not
             *    increment the master_repl_offset.
             * 4. Later we are turned into a slave, connect to the new
             *    master that will accept our PSYNC request by second
             *    replication ID, but there will be data inconsistency
             *    because we received writes. */
            changeReplicationId();
            clearReplicationId2();
            freeReplicationBacklog();
            serverLog(LL_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected replicas.",
                (int) server.repl_backlog_time_limit);
        }
    }

    /* If AOF is disabled and we no longer have attached slaves, we can
     * free our Replication Script Cache as there is no need to propagate
     * EVALSHA at all. */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    /* Start a BGSAVE good for replication if we have slaves in
     * WAIT_BGSAVE_START state.
     *
     * In case of diskless replication, we make sure to wait the specified
     * number of seconds (according to configuration) so that other slaves
     * have the time to arrive before we start streaming. */
    if (!hasActiveChildProcess()) {
        time_t idle, max_idle = 0;
        int slaves_waiting = 0;
        /* Bitwise AND of the capabilities of all the waiting slaves;
         * -1 until the first waiting slave is found. */
        int mincapa = -1;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                idle = server.unixtime - slave->lastinteraction;
                if (idle > max_idle) max_idle = idle;
                slaves_waiting++;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            }
        }

        if (slaves_waiting &&
            (!server.repl_diskless_sync ||
             max_idle > server.repl_diskless_sync_delay))
        {
            /* Start the BGSAVE. The called function may start a
             * BGSAVE with socket target or disk target depending on the
             * configuration and slaves capabilities. */
            startBgsaveForReplication(mincapa);
        }
    }

    /* Remove the RDB file used for replication if Redis is not running
     * with any persistence. */
    removeRDBUsedToSyncReplicas();

    /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
    refreshGoodSlavesCount();
    replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
3334