1 /* Asynchronous replication implementation.
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31
32 #include "server.h"
33 #include "cluster.h"
34 #include "bio.h"
35
36 #include <sys/time.h>
37 #include <unistd.h>
38 #include <fcntl.h>
39 #include <sys/socket.h>
40 #include <sys/stat.h>
41
42 void replicationDiscardCachedMaster(void);
43 void replicationResurrectCachedMaster(connection *conn);
44 void replicationSendAck(void);
45 void putSlaveOnline(client *slave);
46 int cancelReplicationHandshake(void);
47
48 /* We take a global flag to remember if this instance generated an RDB
49 * because of replication, so that we can remove the RDB file in case
50 * the instance is configured to have no persistence. */
51 int RDBGeneratedByReplication = 0;
52
53 /* --------------------------- Utility functions ---------------------------- */
54
55 /* Return the pointer to a string representing the slave ip:listening_port
56 * pair. Mostly useful for logging, since we want to log a slave using its
57 * IP address and its listening port which is more clear for the user, for
58 * example: "Closing connection with replica 10.1.2.3:6380". */
replicationGetSlaveName(client * c)59 char *replicationGetSlaveName(client *c) {
60 static char buf[NET_PEER_ID_LEN];
61 char ip[NET_IP_STR_LEN];
62
63 ip[0] = '\0';
64 buf[0] = '\0';
65 if (c->slave_ip[0] != '\0' ||
66 connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1)
67 {
68 /* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
69 if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
70
71 if (c->slave_listening_port)
72 anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
73 else
74 snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",ip);
75 } else {
76 snprintf(buf,sizeof(buf),"client id #%llu",
77 (unsigned long long) c->id);
78 }
79 return buf;
80 }
81
82 /* Plain unlink() can block for quite some time in order to actually apply
83 * the file deletion to the filesystem. This call removes the file in a
84 * background thread instead. We actually just do close() in the thread,
85 * by using the fact that if there is another instance of the same file open,
86 * the foreground unlink() will only remove the fs name, and deleting the
87 * file's storage space will only happen once the last reference is lost. */
bg_unlink(const char * filename)88 int bg_unlink(const char *filename) {
89 int fd = open(filename,O_RDONLY|O_NONBLOCK);
90 if (fd == -1) {
91 /* Can't open the file? Fall back to unlinking in the main thread. */
92 return unlink(filename);
93 } else {
94 /* The following unlink() removes the name but doesn't free the
95 * file contents because a process still has it open. */
96 int retval = unlink(filename);
97 if (retval == -1) {
98 /* If we got an unlink error, we just return it, closing the
99 * new reference we have to the file. */
100 int old_errno = errno;
101 close(fd); /* This would overwrite our errno. So we saved it. */
102 errno = old_errno;
103 return -1;
104 }
105 bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)fd,NULL,NULL);
106 return 0; /* Success. */
107 }
108 }
109
110 /* ---------------------------------- MASTER -------------------------------- */
111
createReplicationBacklog(void)112 void createReplicationBacklog(void) {
113 serverAssert(server.repl_backlog == NULL);
114 server.repl_backlog = zmalloc(server.repl_backlog_size);
115 server.repl_backlog_histlen = 0;
116 server.repl_backlog_idx = 0;
117
118 /* We don't have any data inside our buffer, but virtually the first
119 * byte we have is the next byte that will be generated for the
120 * replication stream. */
121 server.repl_backlog_off = server.master_repl_offset+1;
122 }
123
124 /* This function is called when the user modifies the replication backlog
125 * size at runtime. It is up to the function to both update the
126 * server.repl_backlog_size and to resize the buffer and setup it so that
127 * it contains the same data as the previous one (possibly less data, but
128 * the most recent bytes, or the same data and more free space in case the
129 * buffer is enlarged). */
resizeReplicationBacklog(long long newsize)130 void resizeReplicationBacklog(long long newsize) {
131 if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
132 newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
133 if (server.repl_backlog_size == newsize) return;
134
135 server.repl_backlog_size = newsize;
136 if (server.repl_backlog != NULL) {
137 /* What we actually do is to flush the old buffer and realloc a new
138 * empty one. It will refill with new data incrementally.
139 * The reason is that copying a few gigabytes adds latency and even
140 * worse often we need to alloc additional space before freeing the
141 * old buffer. */
142 zfree(server.repl_backlog);
143 server.repl_backlog = zmalloc(server.repl_backlog_size);
144 server.repl_backlog_histlen = 0;
145 server.repl_backlog_idx = 0;
146 /* Next byte we have is... the next since the buffer is empty. */
147 server.repl_backlog_off = server.master_repl_offset+1;
148 }
149 }
150
freeReplicationBacklog(void)151 void freeReplicationBacklog(void) {
152 serverAssert(listLength(server.slaves) == 0);
153 zfree(server.repl_backlog);
154 server.repl_backlog = NULL;
155 }
156
157 /* Add data to the replication backlog.
158 * This function also increments the global replication offset stored at
159 * server.master_repl_offset, because there is no case where we want to feed
160 * the backlog without incrementing the offset. */
feedReplicationBacklog(void * ptr,size_t len)161 void feedReplicationBacklog(void *ptr, size_t len) {
162 unsigned char *p = ptr;
163
164 server.master_repl_offset += len;
165
166 /* This is a circular buffer, so write as much data we can at every
167 * iteration and rewind the "idx" index if we reach the limit. */
168 while(len) {
169 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
170 if (thislen > len) thislen = len;
171 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
172 server.repl_backlog_idx += thislen;
173 if (server.repl_backlog_idx == server.repl_backlog_size)
174 server.repl_backlog_idx = 0;
175 len -= thislen;
176 p += thislen;
177 server.repl_backlog_histlen += thislen;
178 }
179 if (server.repl_backlog_histlen > server.repl_backlog_size)
180 server.repl_backlog_histlen = server.repl_backlog_size;
181 /* Set the offset of the first byte we have in the backlog. */
182 server.repl_backlog_off = server.master_repl_offset -
183 server.repl_backlog_histlen + 1;
184 }
185
186 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
187 * as input. */
feedReplicationBacklogWithObject(robj * o)188 void feedReplicationBacklogWithObject(robj *o) {
189 char llstr[LONG_STR_SIZE];
190 void *p;
191 size_t len;
192
193 if (o->encoding == OBJ_ENCODING_INT) {
194 len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
195 p = llstr;
196 } else {
197 len = sdslen(o->ptr);
198 p = o->ptr;
199 }
200 feedReplicationBacklog(p,len);
201 }
202
203 /* Propagate write commands to slaves, and populate the replication backlog
204 * as well. This function is used if the instance is a master: we use
205 * the commands received by our clients in order to create the replication
206 * stream. Instead if the instance is a slave and has sub-slaves attached,
207 * we use replicationFeedSlavesFromMasterStream() */
replicationFeedSlaves(list * slaves,int dictid,robj ** argv,int argc)208 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
209 listNode *ln;
210 listIter li;
211 int j, len;
212 char llstr[LONG_STR_SIZE];
213
214 /* If the instance is not a top level master, return ASAP: we'll just proxy
215 * the stream of data we receive from our master instead, in order to
216 * propagate *identical* replication stream. In this way this slave can
217 * advertise the same replication ID as the master (since it shares the
218 * master replication history and has the same backlog and offsets). */
219 if (server.masterhost != NULL) return;
220
221 /* If there aren't slaves, and there is no backlog buffer to populate,
222 * we can return ASAP. */
223 if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
224
225 /* We can't have slaves attached and no backlog. */
226 serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
227
228 /* Send SELECT command to every slave if needed. */
229 if (server.slaveseldb != dictid) {
230 robj *selectcmd;
231
232 /* For a few DBs we have pre-computed SELECT command. */
233 if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
234 selectcmd = shared.select[dictid];
235 } else {
236 int dictid_len;
237
238 dictid_len = ll2string(llstr,sizeof(llstr),dictid);
239 selectcmd = createObject(OBJ_STRING,
240 sdscatprintf(sdsempty(),
241 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
242 dictid_len, llstr));
243 }
244
245 /* Add the SELECT command into the backlog. */
246 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
247
248 /* Send it to slaves. */
249 listRewind(slaves,&li);
250 while((ln = listNext(&li))) {
251 client *slave = ln->value;
252 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
253 addReply(slave,selectcmd);
254 }
255
256 if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
257 decrRefCount(selectcmd);
258 }
259 server.slaveseldb = dictid;
260
261 /* Write the command to the replication backlog if any. */
262 if (server.repl_backlog) {
263 char aux[LONG_STR_SIZE+3];
264
265 /* Add the multi bulk reply length. */
266 aux[0] = '*';
267 len = ll2string(aux+1,sizeof(aux)-1,argc);
268 aux[len+1] = '\r';
269 aux[len+2] = '\n';
270 feedReplicationBacklog(aux,len+3);
271
272 for (j = 0; j < argc; j++) {
273 long objlen = stringObjectLen(argv[j]);
274
275 /* We need to feed the buffer with the object as a bulk reply
276 * not just as a plain string, so create the $..CRLF payload len
277 * and add the final CRLF */
278 aux[0] = '$';
279 len = ll2string(aux+1,sizeof(aux)-1,objlen);
280 aux[len+1] = '\r';
281 aux[len+2] = '\n';
282 feedReplicationBacklog(aux,len+3);
283 feedReplicationBacklogWithObject(argv[j]);
284 feedReplicationBacklog(aux+len+1,2);
285 }
286 }
287
288 /* Write the command to every slave. */
289 listRewind(slaves,&li);
290 while((ln = listNext(&li))) {
291 client *slave = ln->value;
292
293 /* Don't feed slaves that are still waiting for BGSAVE to start. */
294 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
295
296 /* Feed slaves that are waiting for the initial SYNC (so these commands
297 * are queued in the output buffer until the initial SYNC completes),
298 * or are already in sync with the master. */
299
300 /* Add the multi bulk length. */
301 addReplyArrayLen(slave,argc);
302
303 /* Finally any additional argument that was not stored inside the
304 * static buffer if any (from j to argc). */
305 for (j = 0; j < argc; j++)
306 addReplyBulk(slave,argv[j]);
307 }
308 }
309
310 /* This is a debugging function that gets called when we detect something
311 * wrong with the replication protocol: the goal is to peek into the
312 * replication backlog and show a few final bytes to make simpler to
313 * guess what kind of bug it could be. */
showLatestBacklog(void)314 void showLatestBacklog(void) {
315 if (server.repl_backlog == NULL) return;
316
317 long long dumplen = 256;
318 if (server.repl_backlog_histlen < dumplen)
319 dumplen = server.repl_backlog_histlen;
320
321 /* Identify the first byte to dump. */
322 long long idx =
323 (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) %
324 server.repl_backlog_size;
325
326 /* Scan the circular buffer to collect 'dumplen' bytes. */
327 sds dump = sdsempty();
328 while(dumplen) {
329 long long thislen =
330 ((server.repl_backlog_size - idx) < dumplen) ?
331 (server.repl_backlog_size - idx) : dumplen;
332
333 dump = sdscatrepr(dump,server.repl_backlog+idx,thislen);
334 dumplen -= thislen;
335 idx = 0;
336 }
337
338 /* Finally log such bytes: this is vital debugging info to
339 * understand what happened. */
340 serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
341 sdsfree(dump);
342 }
343
344 /* This function is used in order to proxy what we receive from our master
345 * to our sub-slaves. */
346 #include <ctype.h>
replicationFeedSlavesFromMasterStream(list * slaves,char * buf,size_t buflen)347 void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
348 listNode *ln;
349 listIter li;
350
351 /* Debugging: this is handy to see the stream sent from master
352 * to slaves. Disabled with if(0). */
353 if (0) {
354 printf("%zu:",buflen);
355 for (size_t j = 0; j < buflen; j++) {
356 printf("%c", isprint(buf[j]) ? buf[j] : '.');
357 }
358 printf("\n");
359 }
360
361 if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
362 listRewind(slaves,&li);
363 while((ln = listNext(&li))) {
364 client *slave = ln->value;
365
366 /* Don't feed slaves that are still waiting for BGSAVE to start. */
367 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
368 addReplyProto(slave,buf,buflen);
369 }
370 }
371
/* Send to every client in the 'monitors' list a textual description of
 * the command argv[0..argc-1] issued by client 'c' on database 'dictid'.
 *
 * The line has the form:
 *
 *   +<seconds>.<microseconds> [<dictid> <origin>] <arg> <arg> ...\r\n
 *
 * where <origin> is "lua" for clients flagged CLIENT_LUA,
 * "unix:<server.unixsocket>" for clients flagged CLIENT_UNIX_SOCKET, and
 * the client peer id otherwise. Integer encoded arguments are printed as
 * quoted decimal numbers, any other argument is escaped by sdscatrepr(). */
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    struct timeval tv;
    sds line = sdsnew("+");

    /* Timestamp followed by the origin of the command. */
    gettimeofday(&tv,NULL);
    line = sdscatprintf(line,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
    if (c->flags & CLIENT_LUA) {
        line = sdscatprintf(line,"[%d lua] ",dictid);
    } else if (c->flags & CLIENT_UNIX_SOCKET) {
        line = sdscatprintf(line,"[%d unix:%s] ",dictid,server.unixsocket);
    } else {
        line = sdscatprintf(line,"[%d %s] ",dictid,getClientPeerId(c));
    }

    /* The arguments, separated by a single space. */
    for (int j = 0; j < argc; j++) {
        robj *arg = argv[j];

        if (j > 0) line = sdscatlen(line," ",1);
        if (arg->encoding == OBJ_ENCODING_INT) {
            line = sdscatprintf(line, "\"%ld\"", (long)arg->ptr);
        } else {
            line = sdscatrepr(line,(char*)arg->ptr,sdslen(arg->ptr));
        }
    }
    line = sdscatlen(line,"\r\n",2);

    /* Wrap the line into an object so that it can be queued with
     * addReply() to every monitor, then drop our reference to it. */
    robj *lineobj = createObject(OBJ_STRING,line);
    listRewind(monitors,&li);
    while((ln = listNext(&li))) {
        client *monitor = ln->value;
        addReply(monitor,lineobj);
    }
    decrRefCount(lineobj);
}
410
411 /* Feed the slave 'c' with the replication backlog starting from the
412 * specified 'offset' up to the end of the backlog. */
addReplyReplicationBacklog(client * c,long long offset)413 long long addReplyReplicationBacklog(client *c, long long offset) {
414 long long j, skip, len;
415
416 serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
417
418 if (server.repl_backlog_histlen == 0) {
419 serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
420 return 0;
421 }
422
423 serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
424 server.repl_backlog_size);
425 serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
426 server.repl_backlog_off);
427 serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
428 server.repl_backlog_histlen);
429 serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
430 server.repl_backlog_idx);
431
432 /* Compute the amount of bytes we need to discard. */
433 skip = offset - server.repl_backlog_off;
434 serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
435
436 /* Point j to the oldest byte, that is actually our
437 * server.repl_backlog_off byte. */
438 j = (server.repl_backlog_idx +
439 (server.repl_backlog_size-server.repl_backlog_histlen)) %
440 server.repl_backlog_size;
441 serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
442
443 /* Discard the amount of data to seek to the specified 'offset'. */
444 j = (j + skip) % server.repl_backlog_size;
445
446 /* Feed slave with data. Since it is a circular buffer we have to
447 * split the reply in two parts if we are cross-boundary. */
448 len = server.repl_backlog_histlen - skip;
449 serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
450 while(len) {
451 long long thislen =
452 ((server.repl_backlog_size - j) < len) ?
453 (server.repl_backlog_size - j) : len;
454
455 serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
456 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
457 len -= thislen;
458 j = 0;
459 }
460 return server.repl_backlog_histlen - skip;
461 }
462
463 /* Return the offset to provide as reply to the PSYNC command received
464 * from the slave. The returned value is only valid immediately after
465 * the BGSAVE process started and before executing any other command
466 * from clients. */
getPsyncInitialOffset(void)467 long long getPsyncInitialOffset(void) {
468 return server.master_repl_offset;
469 }
470
471 /* Send a FULLRESYNC reply in the specific case of a full resynchronization,
472 * as a side effect setup the slave for a full sync in different ways:
473 *
474 * 1) Remember, into the slave client structure, the replication offset
475 * we sent here, so that if new slaves will later attach to the same
476 * background RDB saving process (by duplicating this client output
477 * buffer), we can get the right offset from this slave.
478 * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
479 * we start accumulating differences from this point.
480 * 3) Force the replication stream to re-emit a SELECT statement so
481 * the new slave incremental differences will start selecting the
482 * right database number.
483 *
484 * Normally this function should be called immediately after a successful
485 * BGSAVE for replication was started, or when there is one already in
486 * progress that we attached our slave to. */
replicationSetupSlaveForFullResync(client * slave,long long offset)487 int replicationSetupSlaveForFullResync(client *slave, long long offset) {
488 char buf[128];
489 int buflen;
490
491 slave->psync_initial_offset = offset;
492 slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
493 /* We are going to accumulate the incremental changes for this
494 * slave as well. Set slaveseldb to -1 in order to force to re-emit
495 * a SELECT statement in the replication stream. */
496 server.slaveseldb = -1;
497
498 /* Don't send this reply to slaves that approached us with
499 * the old SYNC command. */
500 if (!(slave->flags & CLIENT_PRE_PSYNC)) {
501 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
502 server.replid,offset);
503 if (connWrite(slave->conn,buf,buflen) != buflen) {
504 freeClientAsync(slave);
505 return C_ERR;
506 }
507 }
508 return C_OK;
509 }
510
511 /* This function handles the PSYNC command from the point of view of a
512 * master receiving a request for partial resynchronization.
513 *
514 * On success return C_OK, otherwise C_ERR is returned and we proceed
515 * with the usual full resync. */
masterTryPartialResynchronization(client * c)516 int masterTryPartialResynchronization(client *c) {
517 long long psync_offset, psync_len;
518 char *master_replid = c->argv[1]->ptr;
519 char buf[128];
520 int buflen;
521
522 /* Parse the replication offset asked by the slave. Go to full sync
523 * on parse error: this should never happen but we try to handle
524 * it in a robust way compared to aborting. */
525 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
526 C_OK) goto need_full_resync;
527
528 /* Is the replication ID of this master the same advertised by the wannabe
529 * slave via PSYNC? If the replication ID changed this master has a
530 * different replication history, and there is no way to continue.
531 *
532 * Note that there are two potentially valid replication IDs: the ID1
533 * and the ID2. The ID2 however is only valid up to a specific offset. */
534 if (strcasecmp(master_replid, server.replid) &&
535 (strcasecmp(master_replid, server.replid2) ||
536 psync_offset > server.second_replid_offset))
537 {
538 /* Replid "?" is used by slaves that want to force a full resync. */
539 if (master_replid[0] != '?') {
540 if (strcasecmp(master_replid, server.replid) &&
541 strcasecmp(master_replid, server.replid2))
542 {
543 serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
544 "Replication ID mismatch (Replica asked for '%s', my "
545 "replication IDs are '%s' and '%s')",
546 master_replid, server.replid, server.replid2);
547 } else {
548 serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
549 "Requested offset for second ID was %lld, but I can reply "
550 "up to %lld", psync_offset, server.second_replid_offset);
551 }
552 } else {
553 serverLog(LL_NOTICE,"Full resync requested by replica %s",
554 replicationGetSlaveName(c));
555 }
556 goto need_full_resync;
557 }
558
559 /* We still have the data our slave is asking for? */
560 if (!server.repl_backlog ||
561 psync_offset < server.repl_backlog_off ||
562 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
563 {
564 serverLog(LL_NOTICE,
565 "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
566 if (psync_offset > server.master_repl_offset) {
567 serverLog(LL_WARNING,
568 "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
569 }
570 goto need_full_resync;
571 }
572
573 /* If we reached this point, we are able to perform a partial resync:
574 * 1) Set client state to make it a slave.
575 * 2) Inform the client we can continue with +CONTINUE
576 * 3) Send the backlog data (from the offset to the end) to the slave. */
577 c->flags |= CLIENT_SLAVE;
578 c->replstate = SLAVE_STATE_ONLINE;
579 c->repl_ack_time = server.unixtime;
580 c->repl_put_online_on_ack = 0;
581 listAddNodeTail(server.slaves,c);
582 /* We can't use the connection buffers since they are used to accumulate
583 * new commands at this stage. But we are sure the socket send buffer is
584 * empty so this write will never fail actually. */
585 if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
586 buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
587 } else {
588 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
589 }
590 if (connWrite(c->conn,buf,buflen) != buflen) {
591 freeClientAsync(c);
592 return C_OK;
593 }
594 psync_len = addReplyReplicationBacklog(c,psync_offset);
595 serverLog(LL_NOTICE,
596 "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
597 replicationGetSlaveName(c),
598 psync_len, psync_offset);
599 /* Note that we don't need to set the selected DB at server.slaveseldb
600 * to -1 to force the master to emit SELECT, since the slave already
601 * has this state from the previous connection with the master. */
602
603 refreshGoodSlavesCount();
604
605 /* Fire the replica change modules event. */
606 moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
607 REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
608 NULL);
609
610 return C_OK; /* The caller can return, no full resync needed. */
611
612 need_full_resync:
613 /* We need a full resync for some reason... Note that we can't
614 * reply to PSYNC right now if a full SYNC is needed. The reply
615 * must include the master offset at the time the RDB file we transfer
616 * is generated, so we need to delay the reply to that moment. */
617 return C_ERR;
618 }
619
620 /* Start a BGSAVE for replication goals, which is, selecting the disk or
621 * socket target depending on the configuration, and making sure that
622 * the script cache is flushed before to start.
623 *
624 * The mincapa argument is the bitwise AND among all the slaves capabilities
625 * of the slaves waiting for this BGSAVE, so represents the slave capabilities
626 * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
627 *
628 * Side effects, other than starting a BGSAVE:
629 *
630 * 1) Handle the slaves in WAIT_START state, by preparing them for a full
631 * sync if the BGSAVE was successfully started, or sending them an error
632 * and dropping them from the list of slaves.
633 *
634 * 2) Flush the Lua scripting script cache if the BGSAVE was actually
635 * started.
636 *
637 * Returns C_OK on success or C_ERR otherwise. */
startBgsaveForReplication(int mincapa)638 int startBgsaveForReplication(int mincapa) {
639 int retval;
640 int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
641 listIter li;
642 listNode *ln;
643
644 serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
645 socket_target ? "replicas sockets" : "disk");
646
647 rdbSaveInfo rsi, *rsiptr;
648 rsiptr = rdbPopulateSaveInfo(&rsi);
649 /* Only do rdbSave* when rsiptr is not NULL,
650 * otherwise slave will miss repl-stream-db. */
651 if (rsiptr) {
652 if (socket_target)
653 retval = rdbSaveToSlavesSockets(rsiptr);
654 else
655 retval = rdbSaveBackground(server.rdb_filename,rsiptr);
656 } else {
657 serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
658 retval = C_ERR;
659 }
660
661 /* If we succeeded to start a BGSAVE with disk target, let's remember
662 * this fact, so that we can later delete the file if needed. Note
663 * that we don't set the flag to 1 if the feature is disabled, otherwise
664 * it would never be cleared: the file is not deleted. This way if
665 * the user enables it later with CONFIG SET, we are fine. */
666 if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
667 RDBGeneratedByReplication = 1;
668
669 /* If we failed to BGSAVE, remove the slaves waiting for a full
670 * resynchronization from the list of slaves, inform them with
671 * an error about what happened, close the connection ASAP. */
672 if (retval == C_ERR) {
673 serverLog(LL_WARNING,"BGSAVE for replication failed");
674 listRewind(server.slaves,&li);
675 while((ln = listNext(&li))) {
676 client *slave = ln->value;
677
678 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
679 slave->replstate = REPL_STATE_NONE;
680 slave->flags &= ~CLIENT_SLAVE;
681 listDelNode(server.slaves,ln);
682 addReplyError(slave,
683 "BGSAVE failed, replication can't continue");
684 slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
685 }
686 }
687 return retval;
688 }
689
690 /* If the target is socket, rdbSaveToSlavesSockets() already setup
691 * the slaves for a full resync. Otherwise for disk target do it now.*/
692 if (!socket_target) {
693 listRewind(server.slaves,&li);
694 while((ln = listNext(&li))) {
695 client *slave = ln->value;
696
697 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
698 replicationSetupSlaveForFullResync(slave,
699 getPsyncInitialOffset());
700 }
701 }
702 }
703
704 /* Flush the script cache, since we need that slave differences are
705 * accumulated without requiring slaves to match our cached scripts. */
706 if (retval == C_OK) replicationScriptCacheFlush();
707 return retval;
708 }
709
710 /* SYNC and PSYNC command implementation. */
syncCommand(client * c)711 void syncCommand(client *c) {
712 /* ignore SYNC if already slave or in monitor mode */
713 if (c->flags & CLIENT_SLAVE) return;
714
715 /* Refuse SYNC requests if we are a slave but the link with our master
716 * is not ok... */
717 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
718 addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
719 return;
720 }
721
722 /* SYNC can't be issued when the server has pending data to send to
723 * the client about already issued commands. We need a fresh reply
724 * buffer registering the differences between the BGSAVE and the current
725 * dataset, so that we can copy to other slaves if needed. */
726 if (clientHasPendingReplies(c)) {
727 addReplyError(c,"SYNC and PSYNC are invalid with pending output");
728 return;
729 }
730
731 serverLog(LL_NOTICE,"Replica %s asks for synchronization",
732 replicationGetSlaveName(c));
733
    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with the usual full resynchronization.
     * Note that in this case masterTryPartialResynchronization() does not
     * reply to the slave: the reply
     *
     * +FULLRESYNC <replid> <offset>
     *
     * is sent later by replicationSetupSlaveForFullResync(), so that the
     * slave knows the new replid and offset to try a PSYNC later if the
     * connection with the master is lost. */
743 if (!strcasecmp(c->argv[0]->ptr,"psync")) {
744 if (masterTryPartialResynchronization(c) == C_OK) {
745 server.stat_sync_partial_ok++;
746 return; /* No full resync needed, return. */
747 } else {
748 char *master_replid = c->argv[1]->ptr;
749
750 /* Increment stats for failed PSYNCs, but only if the
751 * replid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
753 * resync. */
754 if (master_replid[0] != '?') server.stat_sync_partial_err++;
755 }
756 } else {
757 /* If a slave uses SYNC, we are dealing with an old implementation
758 * of the replication protocol (like redis-cli --slave). Flag the client
759 * so that we don't expect to receive REPLCONF ACK feedbacks. */
760 c->flags |= CLIENT_PRE_PSYNC;
761 }
762
763 /* Full resynchronization. */
764 server.stat_sync_full++;
765
766 /* Setup the slave as one waiting for BGSAVE to start. The following code
767 * paths will change the state if we handle the slave differently. */
768 c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
769 if (server.repl_disable_tcp_nodelay)
770 connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
771 c->repldbfd = -1;
772 c->flags |= CLIENT_SLAVE;
773 listAddNodeTail(server.slaves,c);
774
775 /* Create the replication backlog if needed. */
776 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
777 /* When we create the backlog from scratch, we always use a new
778 * replication ID and clear the ID2, since there is no valid
779 * past history. */
780 changeReplicationId();
781 clearReplicationId2();
782 createReplicationBacklog();
783 serverLog(LL_NOTICE,"Replication backlog created, my new "
784 "replication IDs are '%s' and '%s'",
785 server.replid, server.replid2);
786 }
787
788 /* CASE 1: BGSAVE is in progress, with disk target. */
789 if (server.rdb_child_pid != -1 &&
790 server.rdb_child_type == RDB_CHILD_TYPE_DISK)
791 {
792 /* Ok a background save is in progress. Let's check if it is a good
793 * one for replication, i.e. if there is another slave that is
794 * registering differences since the server forked to save. */
795 client *slave;
796 listNode *ln;
797 listIter li;
798
799 listRewind(server.slaves,&li);
800 while((ln = listNext(&li))) {
801 slave = ln->value;
802 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
803 }
804 /* To attach this slave, we check that it has at least all the
805 * capabilities of the slave that triggered the current BGSAVE. */
806 if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
807 /* Perfect, the server is already registering differences for
808 * another slave. Set the right state, and copy the buffer. */
809 copyClientOutputBuffer(c,slave);
810 replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
811 serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
812 } else {
813 /* No way, we need to wait for the next BGSAVE in order to
814 * register differences. */
815 serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
816 }
817
818 /* CASE 2: BGSAVE is in progress, with socket target. */
819 } else if (server.rdb_child_pid != -1 &&
820 server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
821 {
822 /* There is an RDB child process but it is writing directly to
823 * children sockets. We need to wait for the next BGSAVE
824 * in order to synchronize. */
825 serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
826
827 /* CASE 3: There is no BGSAVE is progress. */
828 } else {
829 if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
830 /* Diskless replication RDB child is created inside
831 * replicationCron() since we want to delay its start a
832 * few seconds to wait for more slaves to arrive. */
833 if (server.repl_diskless_sync_delay)
834 serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
835 } else {
836 /* Target is disk (or the slave is not capable of supporting
837 * diskless replication) and we don't have a BGSAVE in progress,
838 * let's start one. */
839 if (!hasActiveChildProcess()) {
840 startBgsaveForReplication(c->slave_capa);
841 } else {
842 serverLog(LL_NOTICE,
843 "No BGSAVE in progress, but another BG operation is active. "
844 "BGSAVE for replication delayed");
845 }
846 }
847 }
848 return;
849 }
850
851 /* REPLCONF <option> <value> <option> <value> ...
852 * This command is used by a slave in order to configure the replication
853 * process before starting it with the SYNC command.
854 *
855 * Currently the only use of this command is to communicate to the master
856 * what is the listening port of the Slave redis instance, so that the
857 * master can accurately list slaves and their listening ports in
858 * the INFO output.
859 *
860 * In the future the same command can be used in order to configure
861 * the replication to initiate an incremental replication instead of a
862 * full resync. */
replconfCommand(client * c)863 void replconfCommand(client *c) {
864 int j;
865
866 if ((c->argc % 2) == 0) {
867 /* Number of arguments must be odd to make sure that every
868 * option has a corresponding value. */
869 addReply(c,shared.syntaxerr);
870 return;
871 }
872
873 /* Process every option-value pair. */
874 for (j = 1; j < c->argc; j+=2) {
875 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
876 long port;
877
878 if ((getLongFromObjectOrReply(c,c->argv[j+1],
879 &port,NULL) != C_OK))
880 return;
881 c->slave_listening_port = port;
882 } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
883 sds ip = c->argv[j+1]->ptr;
884 if (sdslen(ip) < sizeof(c->slave_ip)) {
885 memcpy(c->slave_ip,ip,sdslen(ip)+1);
886 } else {
887 addReplyErrorFormat(c,"REPLCONF ip-address provided by "
888 "replica instance is too long: %zd bytes", sdslen(ip));
889 return;
890 }
891 } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
892 /* Ignore capabilities not understood by this master. */
893 if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
894 c->slave_capa |= SLAVE_CAPA_EOF;
895 else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
896 c->slave_capa |= SLAVE_CAPA_PSYNC2;
897 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
898 /* REPLCONF ACK is used by slave to inform the master the amount
899 * of replication stream that it processed so far. It is an
900 * internal only command that normal clients should never use. */
901 long long offset;
902
903 if (!(c->flags & CLIENT_SLAVE)) return;
904 if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
905 return;
906 if (offset > c->repl_ack_off)
907 c->repl_ack_off = offset;
908 c->repl_ack_time = server.unixtime;
909 /* If this was a diskless replication, we need to really put
910 * the slave online when the first ACK is received (which
911 * confirms slave is online and ready to get more data). This
912 * allows for simpler and less CPU intensive EOF detection
913 * when streaming RDB files.
914 * There's a chance the ACK got to us before we detected that the
915 * bgsave is done (since that depends on cron ticks), so run a
916 * quick check first (instead of waiting for the next ACK. */
917 if (server.rdb_child_pid != -1 && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
918 checkChildrenDone();
919 if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
920 putSlaveOnline(c);
921 /* Note: this command does not reply anything! */
922 return;
923 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
924 /* REPLCONF GETACK is used in order to request an ACK ASAP
925 * to the slave. */
926 if (server.masterhost && server.master) replicationSendAck();
927 return;
928 } else {
929 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
930 (char*)c->argv[j]->ptr);
931 return;
932 }
933 }
934 addReply(c,shared.ok);
935 }
936
937 /* This function puts a replica in the online state, and should be called just
938 * after a replica received the RDB file for the initial synchronization, and
939 * we are finally ready to send the incremental stream of commands.
940 *
941 * It does a few things:
942 *
943 * 1) Put the slave in ONLINE state. Note that the function may also be called
944 * for a replicas that are already in ONLINE state, but having the flag
945 * repl_put_online_on_ack set to true: we still have to install the write
946 * handler in that case. This function will take care of that.
947 * 2) Make sure the writable event is re-installed, since calling the SYNC
948 * command disables it, so that we can accumulate output buffer without
949 * sending it to the replica.
950 * 3) Update the count of "good replicas". */
putSlaveOnline(client * slave)951 void putSlaveOnline(client *slave) {
952 slave->replstate = SLAVE_STATE_ONLINE;
953 slave->repl_put_online_on_ack = 0;
954 slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
955 if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) {
956 serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
957 freeClient(slave);
958 return;
959 }
960 refreshGoodSlavesCount();
961 /* Fire the replica change modules event. */
962 moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
963 REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
964 NULL);
965 serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
966 replicationGetSlaveName(slave));
967 }
968
969 /* We call this function periodically to remove an RDB file that was
970 * generated because of replication, in an instance that is otherwise
971 * without any persistence. We don't want instances without persistence
972 * to take RDB files around, this violates certain policies in certain
973 * environments. */
removeRDBUsedToSyncReplicas(void)974 void removeRDBUsedToSyncReplicas(void) {
975 /* If the feature is disabled, return ASAP but also clear the
976 * RDBGeneratedByReplication flag in case it was set. Otherwise if the
977 * feature was enabled, but gets disabled later with CONFIG SET, the
978 * flag may remain set to one: then next time the feature is re-enabled
979 * via CONFIG SET we have have it set even if no RDB was generated
980 * because of replication recently. */
981 if (!server.rdb_del_sync_files) {
982 RDBGeneratedByReplication = 0;
983 return;
984 }
985
986 if (allPersistenceDisabled() && RDBGeneratedByReplication) {
987 client *slave;
988 listNode *ln;
989 listIter li;
990
991 int delrdb = 1;
992 listRewind(server.slaves,&li);
993 while((ln = listNext(&li))) {
994 slave = ln->value;
995 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
996 slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END ||
997 slave->replstate == SLAVE_STATE_SEND_BULK)
998 {
999 delrdb = 0;
1000 break; /* No need to check the other replicas. */
1001 }
1002 }
1003 if (delrdb) {
1004 struct stat sb;
1005 if (lstat(server.rdb_filename,&sb) != -1) {
1006 RDBGeneratedByReplication = 0;
1007 serverLog(LL_NOTICE,
1008 "Removing the RDB file used to feed replicas "
1009 "in a persistence-less instance");
1010 bg_unlink(server.rdb_filename);
1011 }
1012 }
1013 }
1014 }
1015
/* Write handler used to transfer an RDB file stored on disk to a replica.
 *
 * The replica client is taken from the connection private data. Each call
 * performs at most one write:
 *
 * 1) While slave->replpreamble is set, the preamble is sent first. The
 *    function returns if the preamble was only partially written.
 * 2) Otherwise up to PROTO_IOBUF_LEN bytes are read from slave->repldbfd at
 *    offset slave->repldboff and written to the connection. The offset is
 *    advanced by the number of bytes actually written, so bytes that were
 *    read but not written are read again at the next call.
 *
 * When the offset reaches slave->repldbsize the file is closed, the write
 * handler is removed and the replica is put online. On seek or read errors,
 * and on write errors that leave the connection in a state other than
 * connected, the replica client is freed. */
void sendBulkToSlave(connection *conn) {
    client *slave = connGetPrivateData(conn);
    char buf[PROTO_IOBUF_LEN];
    ssize_t nwritten, buflen;

    /* Before sending the RDB file, we send the preamble as configured by the
     * replication process. Currently the preamble is just the bulk count of
     * the file in the form "$<length>\r\n". */
    if (slave->replpreamble) {
        nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble));
        if (nwritten == -1) {
            serverLog(LL_VERBOSE,
                "Write error sending RDB preamble to replica: %s",
                connGetLastError(conn));
            freeClient(slave);
            return;
        }
        server.stat_net_output_bytes += nwritten;
        /* Drop the part of the preamble that was already sent. */
        sdsrange(slave->replpreamble,nwritten,-1);
        if (sdslen(slave->replpreamble) == 0) {
            sdsfree(slave->replpreamble);
            slave->replpreamble = NULL;
            /* fall through sending data. */
        } else {
            return;
        }
    }

    /* If the preamble was already transferred, send the RDB bulk data.
     * The seek is mandatory because the previous call may have read more
     * bytes than it managed to write: if it fails, reading would start from
     * the wrong position and the replica would receive a corrupted payload,
     * so the transfer is aborted. */
    if (lseek(slave->repldbfd,slave->repldboff,SEEK_SET) == -1) {
        serverLog(LL_WARNING,"Seek error sending DB to replica: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
    if (buflen <= 0) {
        serverLog(LL_WARNING,"Read error sending DB to replica: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
        /* A failed write with the connection still in connected state is
         * not fatal: just wait for the handler to be called again. */
        if (connGetState(conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_WARNING,"Write error sending DB to replica: %s",
                connGetLastError(conn));
            freeClient(slave);
        }
        return;
    }
    slave->repldboff += nwritten;
    server.stat_net_output_bytes += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        /* Whole file transferred. */
        close(slave->repldbfd);
        slave->repldbfd = -1;
        connSetWriteHandler(slave->conn,NULL);
        putSlaveOnline(slave);
    }
}
1070
1071 /* Remove one write handler from the list of connections waiting to be writable
1072 * during rdb pipe transfer. */
rdbPipeWriteHandlerConnRemoved(struct connection * conn)1073 void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
1074 if (!connHasWriteHandler(conn))
1075 return;
1076 connSetWriteHandler(conn, NULL);
1077 client *slave = connGetPrivateData(conn);
1078 slave->repl_last_partial_write = 0;
1079 server.rdb_pipe_numconns_writing--;
1080 /* if there are no more writes for now for this conn, or write error: */
1081 if (server.rdb_pipe_numconns_writing == 0) {
1082 if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
1083 serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
1084 }
1085 }
1086 }
1087
1088 /* Called in diskless master during transfer of data from the rdb pipe, when
1089 * the replica becomes writable again. */
rdbPipeWriteHandler(struct connection * conn)1090 void rdbPipeWriteHandler(struct connection *conn) {
1091 serverAssert(server.rdb_pipe_bufflen>0);
1092 client *slave = connGetPrivateData(conn);
1093 int nwritten;
1094 if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
1095 server.rdb_pipe_bufflen - slave->repldboff)) == -1)
1096 {
1097 if (connGetState(conn) == CONN_STATE_CONNECTED)
1098 return; /* equivalent to EAGAIN */
1099 serverLog(LL_WARNING,"Write error sending DB to replica: %s",
1100 connGetLastError(conn));
1101 freeClient(slave);
1102 return;
1103 } else {
1104 slave->repldboff += nwritten;
1105 server.stat_net_output_bytes += nwritten;
1106 if (slave->repldboff < server.rdb_pipe_bufflen) {
1107 slave->repl_last_partial_write = server.unixtime;
1108 return; /* more data to write.. */
1109 }
1110 }
1111 rdbPipeWriteHandlerConnRemoved(conn);
1112 }
1113
/* Called in diskless master, when there's data to read from the child's rdb
 * pipe.
 *
 * In a loop, a chunk of up to PROTO_IOBUF_LEN bytes is read from 'fd' into
 * server.rdb_pipe_buff (allocated on first use) and written to every
 * non-NULL connection in server.rdb_pipe_conns. The loop ends when:
 *
 * - The read would block (EAGAIN / EWOULDBLOCK).
 * - The read fails: every replica in the list is freed and the RDB child
 *   is killed.
 * - EOF is reached: the readable event is removed and
 *   server.rdb_child_exit_pipe is closed.
 * - At least one replica could not take the whole chunk, or no replica is
 *   left: the readable event is removed. With pending writers, it is
 *   created again by rdbPipeWriteHandlerConnRemoved() once they are done.
 *
 * The eventLoop, clientData and mask arguments are unused. */
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    UNUSED(mask);
    UNUSED(clientData);
    UNUSED(eventLoop);
    int i;
    if (!server.rdb_pipe_buff)
        server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
    /* A new chunk must not be read while a replica still has to receive
     * part of the previous one. */
    serverAssert(server.rdb_pipe_numconns_writing==0);

    while (1) {
        server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
        if (server.rdb_pipe_bufflen < 0) {
            /* Nothing to read right now: wait for the next readable event. */
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return;
            serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno));
            /* Unrecoverable read error: drop every replica taking part in
             * this transfer, then kill the child. */
            for (i=0; i < server.rdb_pipe_numconns; i++) {
                connection *conn = server.rdb_pipe_conns[i];
                if (!conn)
                    continue;
                client *slave = connGetPrivateData(conn);
                freeClient(slave);
                server.rdb_pipe_conns[i] = NULL;
            }
            killRDBChild();
            return;
        }

        if (server.rdb_pipe_bufflen == 0) {
            /* EOF - write end was closed. */
            int stillUp = 0;
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            /* Count the connections still in the list, for the log only. */
            for (i=0; i < server.rdb_pipe_numconns; i++)
            {
                connection *conn = server.rdb_pipe_conns[i];
                if (!conn)
                    continue;
                stillUp++;
            }
            serverLog(LL_WARNING,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp);
            /* Now that the replicas have finished reading, notify the child that it's safe to exit.
             * When the server detects the child has exited, it can mark the replica as online, and
             * start streaming the replication buffers. */
            close(server.rdb_child_exit_pipe);
            server.rdb_child_exit_pipe = -1;
            return;
        }

        /* Number of replicas that are still part of the transfer after
         * trying to write this chunk. */
        int stillAlive = 0;
        for (i=0; i < server.rdb_pipe_numconns; i++)
        {
            int nwritten;
            connection *conn = server.rdb_pipe_conns[i];
            if (!conn)
                continue;

            client *slave = connGetPrivateData(conn);
            if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
                if (connGetState(conn) != CONN_STATE_CONNECTED) {
                    serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",
                        connGetLastError(conn));
                    freeClient(slave);
                    server.rdb_pipe_conns[i] = NULL;
                    continue;
                }
                /* An error and still in connected state, is equivalent to EAGAIN */
                slave->repldboff = 0;
            } else {
                /* repldboff tracks how much of the current chunk this
                 * replica already received. */
                slave->repldboff = nwritten;
                server.stat_net_output_bytes += nwritten;
            }
            /* If we were unable to write all the data to one of the replicas,
             * setup write handler (and disable pipe read handler, below) */
            if (nwritten != server.rdb_pipe_bufflen) {
                slave->repl_last_partial_write = server.unixtime;
                server.rdb_pipe_numconns_writing++;
                connSetWriteHandler(conn, rdbPipeWriteHandler);
            }
            stillAlive++;
        }

        if (stillAlive == 0) {
            serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child.");
            killRDBChild();
        }
        /* Remove the pipe read handler if at least one write handler was set
         * (or if no replica is left to feed). */
        if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            break;
        }
    }
}
1206
1207 /* This function is called at the end of every background saving,
1208 * or when the replication RDB transfer strategy is modified from
1209 * disk to socket or the other way around.
1210 *
1211 * The goal of this function is to handle slaves waiting for a successful
1212 * background saving in order to perform non-blocking synchronization, and
1213 * to schedule a new BGSAVE if there are slaves that attached while a
1214 * BGSAVE was in progress, but it was not a good one for replication (no
1215 * other slave was accumulating differences).
1216 *
1217 * The argument bgsaveerr is C_OK if the background saving succeeded
1218 * otherwise C_ERR is passed to the function.
1219 * The 'type' argument is the type of the child that terminated
1220 * (if it had a disk or socket target). */
updateSlavesWaitingBgsave(int bgsaveerr,int type)1221 void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
1222 listNode *ln;
1223 int startbgsave = 0;
1224 int mincapa = -1;
1225 listIter li;
1226
1227 listRewind(server.slaves,&li);
1228 while((ln = listNext(&li))) {
1229 client *slave = ln->value;
1230
1231 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
1232 startbgsave = 1;
1233 mincapa = (mincapa == -1) ? slave->slave_capa :
1234 (mincapa & slave->slave_capa);
1235 } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
1236 struct redis_stat buf;
1237
1238 if (bgsaveerr != C_OK) {
1239 freeClient(slave);
1240 serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
1241 continue;
1242 }
1243
1244 /* If this was an RDB on disk save, we have to prepare to send
1245 * the RDB from disk to the slave socket. Otherwise if this was
1246 * already an RDB -> Slaves socket transfer, used in the case of
1247 * diskless replication, our work is trivial, we can just put
1248 * the slave online. */
1249 if (type == RDB_CHILD_TYPE_SOCKET) {
1250 serverLog(LL_NOTICE,
1251 "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
1252 replicationGetSlaveName(slave));
1253 /* Note: we wait for a REPLCONF ACK message from the replica in
1254 * order to really put it online (install the write handler
1255 * so that the accumulated data can be transferred). However
1256 * we change the replication state ASAP, since our slave
1257 * is technically online now.
1258 *
1259 * So things work like that:
1260 *
1261 * 1. We end trasnferring the RDB file via socket.
1262 * 2. The replica is put ONLINE but the write handler
1263 * is not installed.
1264 * 3. The replica however goes really online, and pings us
1265 * back via REPLCONF ACK commands.
1266 * 4. Now we finally install the write handler, and send
1267 * the buffers accumulated so far to the replica.
1268 *
1269 * But why we do that? Because the replica, when we stream
1270 * the RDB directly via the socket, must detect the RDB
1271 * EOF (end of file), that is a special random string at the
1272 * end of the RDB (for streamed RDBs we don't know the length
1273 * in advance). Detecting such final EOF string is much
1274 * simpler and less CPU intensive if no more data is sent
1275 * after such final EOF. So we don't want to glue the end of
1276 * the RDB trasfer with the start of the other replication
1277 * data. */
1278 slave->replstate = SLAVE_STATE_ONLINE;
1279 slave->repl_put_online_on_ack = 1;
1280 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
1281 } else {
1282 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
1283 redis_fstat(slave->repldbfd,&buf) == -1) {
1284 freeClient(slave);
1285 serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
1286 continue;
1287 }
1288 slave->repldboff = 0;
1289 slave->repldbsize = buf.st_size;
1290 slave->replstate = SLAVE_STATE_SEND_BULK;
1291 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
1292 (unsigned long long) slave->repldbsize);
1293
1294 connSetWriteHandler(slave->conn,NULL);
1295 if (connSetWriteHandler(slave->conn,sendBulkToSlave) == C_ERR) {
1296 freeClient(slave);
1297 continue;
1298 }
1299 }
1300 }
1301 }
1302 if (startbgsave) startBgsaveForReplication(mincapa);
1303 }
1304
1305 /* Change the current instance replication ID with a new, random one.
1306 * This will prevent successful PSYNCs between this master and other
1307 * slaves, so the command should be called when something happens that
1308 * alters the current story of the dataset. */
changeReplicationId(void)1309 void changeReplicationId(void) {
1310 getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
1311 server.replid[CONFIG_RUN_ID_SIZE] = '\0';
1312 }
1313
1314 /* Clear (invalidate) the secondary replication ID. This happens, for
1315 * example, after a full resynchronization, when we start a new replication
1316 * history. */
clearReplicationId2(void)1317 void clearReplicationId2(void) {
1318 memset(server.replid2,'0',sizeof(server.replid));
1319 server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
1320 server.second_replid_offset = -1;
1321 }
1322
1323 /* Use the current replication ID / offset as secondary replication
1324 * ID, and change the current one in order to start a new history.
1325 * This should be used when an instance is switched from slave to master
1326 * so that it can serve PSYNC requests performed using the master
1327 * replication ID. */
shiftReplicationId(void)1328 void shiftReplicationId(void) {
1329 memcpy(server.replid2,server.replid,sizeof(server.replid));
1330 /* We set the second replid offset to the master offset + 1, since
1331 * the slave will ask for the first byte it has not yet received, so
1332 * we need to add one to the offset: for example if, as a slave, we are
1333 * sure we have the same history as the master for 50 bytes, after we
1334 * are turned into a master, we can accept a PSYNC request with offset
1335 * 51, since the slave asking has the same history up to the 50th
1336 * byte, and is asking for the new bytes starting at offset 51. */
1337 server.second_replid_offset = server.master_repl_offset+1;
1338 changeReplicationId();
1339 serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
1340 }
1341
1342 /* ----------------------------------- SLAVE -------------------------------- */
1343
1344 /* Returns 1 if the given replication state is a handshake state,
1345 * 0 otherwise. */
slaveIsInHandshakeState(void)1346 int slaveIsInHandshakeState(void) {
1347 return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
1348 server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
1349 }
1350
1351 /* Avoid the master to detect the slave is timing out while loading the
1352 * RDB file in initial synchronization. We send a single newline character
1353 * that is valid protocol but is guaranteed to either be sent entirely or
1354 * not, since the byte is indivisible.
1355 *
1356 * The function is called in two contexts: while we flush the current
1357 * data with emptyDb(), and while we load the new data received as an
1358 * RDB file from the master. */
replicationSendNewlineToMaster(void)1359 void replicationSendNewlineToMaster(void) {
1360 static time_t newline_sent;
1361 if (time(NULL) != newline_sent) {
1362 newline_sent = time(NULL);
1363 /* Pinging back in this stage is best-effort. */
1364 if (server.repl_transfer_s) connWrite(server.repl_transfer_s, "\n", 1);
1365 }
1366 }
1367
1368 /* Callback used by emptyDb() while flushing away old data to load
1369 * the new dataset received by the master. */
replicationEmptyDbCallback(void * privdata)1370 void replicationEmptyDbCallback(void *privdata) {
1371 UNUSED(privdata);
1372 if (server.repl_state == REPL_STATE_TRANSFER)
1373 replicationSendNewlineToMaster();
1374 }
1375
1376 /* Once we have a link with the master and the synchronization was
1377 * performed, this function materializes the master client we store
1378 * at server.master, starting from the specified file descriptor. */
replicationCreateMasterClient(connection * conn,int dbid)1379 void replicationCreateMasterClient(connection *conn, int dbid) {
1380 server.master = createClient(conn);
1381 if (conn)
1382 connSetReadHandler(server.master->conn, readQueryFromClient);
1383 server.master->flags |= CLIENT_MASTER;
1384 server.master->authenticated = 1;
1385 server.master->reploff = server.master_initial_offset;
1386 server.master->read_reploff = server.master->reploff;
1387 server.master->user = NULL; /* This client can do everything. */
1388 memcpy(server.master->replid, server.master_replid,
1389 sizeof(server.master_replid));
1390 /* If master offset is set to -1, this master is old and is not
1391 * PSYNC capable, so we flag it accordingly. */
1392 if (server.master->reploff == -1)
1393 server.master->flags |= CLIENT_PRE_PSYNC;
1394 if (dbid != -1) selectDb(server.master,dbid);
1395 }
1396
1397 /* This function will try to re-enable the AOF file after the
1398 * master-replica synchronization: if it fails after multiple attempts
1399 * the replica cannot be considered reliable and exists with an
1400 * error. */
restartAOFAfterSYNC()1401 void restartAOFAfterSYNC() {
1402 unsigned int tries, max_tries = 10;
1403 for (tries = 0; tries < max_tries; ++tries) {
1404 if (startAppendOnly() == C_OK) break;
1405 serverLog(LL_WARNING,
1406 "Failed enabling the AOF after successful master synchronization! "
1407 "Trying it again in one second.");
1408 sleep(1);
1409 }
1410 if (tries == max_tries) {
1411 serverLog(LL_WARNING,
1412 "FATAL: this replica instance finished the synchronization with "
1413 "its master, but the AOF can't be turned on. Exiting now.");
1414 exit(1);
1415 }
1416 }
1417
useDisklessLoad()1418 static int useDisklessLoad() {
1419 /* compute boolean decision to use diskless load */
1420 int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
1421 (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
1422 /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
1423 if (enabled && !moduleAllDatatypesHandleErrors()) {
1424 serverLog(LL_WARNING,
1425 "Skipping diskless-load because there are modules that don't handle read errors.");
1426 enabled = 0;
1427 }
1428 return enabled;
1429 }
1430
1431 /* Helper function for readSyncBulkPayload() to make backups of the current
1432 * databases before socket-loading the new ones. The backups may be restored
1433 * by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */
disklessLoadMakeBackup(void)1434 dbBackup *disklessLoadMakeBackup(void) {
1435 return backupDb();
1436 }
1437
1438 /* Helper function for readSyncBulkPayload(): when replica-side diskless
1439 * database loading is used, Redis makes a backup of the existing databases
1440 * before loading the new ones from the socket.
1441 *
1442 * If the socket loading went wrong, we want to restore the old backups
1443 * into the server databases. */
disklessLoadRestoreBackup(dbBackup * buckup)1444 void disklessLoadRestoreBackup(dbBackup *buckup) {
1445 restoreDbBackup(buckup);
1446 }
1447
1448 /* Helper function for readSyncBulkPayload() to discard our old backups
1449 * when the loading succeeded. */
disklessLoadDiscardBackup(dbBackup * buckup,int flag)1450 void disklessLoadDiscardBackup(dbBackup *buckup, int flag) {
1451 discardDbBackup(buckup, flag, replicationEmptyDbCallback);
1452 }
1453
1454 /* Asynchronously read the SYNC payload we receive from a master */
/* Amount of data written to the transfer file since the last synced offset
 * after which readSyncBulkPayload() calls rdb_fsync_range() on the newly
 * written range. */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
readSyncBulkPayload(connection * conn)1456 void readSyncBulkPayload(connection *conn) {
1457 char buf[PROTO_IOBUF_LEN];
1458 ssize_t nread, readlen, nwritten;
1459 int use_diskless_load = useDisklessLoad();
1460 dbBackup *diskless_load_backup = NULL;
1461 int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
1462 EMPTYDB_NO_FLAGS;
1463 off_t left;
1464
1465 /* Static vars used to hold the EOF mark, and the last bytes received
1466 * from the server: when they match, we reached the end of the transfer. */
1467 static char eofmark[CONFIG_RUN_ID_SIZE];
1468 static char lastbytes[CONFIG_RUN_ID_SIZE];
1469 static int usemark = 0;
1470
1471 /* If repl_transfer_size == -1 we still have to read the bulk length
1472 * from the master reply. */
1473 if (server.repl_transfer_size == -1) {
1474 if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
1475 serverLog(LL_WARNING,
1476 "I/O error reading bulk count from MASTER: %s",
1477 strerror(errno));
1478 goto error;
1479 }
1480
1481 if (buf[0] == '-') {
1482 serverLog(LL_WARNING,
1483 "MASTER aborted replication with an error: %s",
1484 buf+1);
1485 goto error;
1486 } else if (buf[0] == '\0') {
1487 /* At this stage just a newline works as a PING in order to take
1488 * the connection live. So we refresh our last interaction
1489 * timestamp. */
1490 server.repl_transfer_lastio = server.unixtime;
1491 return;
1492 } else if (buf[0] != '$') {
1493 serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
1494 goto error;
1495 }
1496
1497 /* There are two possible forms for the bulk payload. One is the
1498 * usual $<count> bulk format. The other is used for diskless transfers
1499 * when the master does not know beforehand the size of the file to
1500 * transfer. In the latter case, the following format is used:
1501 *
1502 * $EOF:<40 bytes delimiter>
1503 *
1504 * At the end of the file the announced delimiter is transmitted. The
1505 * delimiter is long and random enough that the probability of a
1506 * collision with the actual file content can be ignored. */
1507 if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
1508 usemark = 1;
1509 memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
1510 memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
1511 /* Set any repl_transfer_size to avoid entering this code path
1512 * at the next call. */
1513 server.repl_transfer_size = 0;
1514 serverLog(LL_NOTICE,
1515 "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
1516 use_diskless_load? "to parser":"to disk");
1517 } else {
1518 usemark = 0;
1519 server.repl_transfer_size = strtol(buf+1,NULL,10);
1520 serverLog(LL_NOTICE,
1521 "MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
1522 (long long) server.repl_transfer_size,
1523 use_diskless_load? "to parser":"to disk");
1524 }
1525 return;
1526 }
1527
1528 if (!use_diskless_load) {
1529 /* Read the data from the socket, store it to a file and search
1530 * for the EOF. */
1531 if (usemark) {
1532 readlen = sizeof(buf);
1533 } else {
1534 left = server.repl_transfer_size - server.repl_transfer_read;
1535 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
1536 }
1537
1538 nread = connRead(conn,buf,readlen);
1539 if (nread <= 0) {
1540 if (connGetState(conn) == CONN_STATE_CONNECTED) {
1541 /* equivalent to EAGAIN */
1542 return;
1543 }
1544 serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
1545 (nread == -1) ? strerror(errno) : "connection lost");
1546 cancelReplicationHandshake();
1547 return;
1548 }
1549 server.stat_net_input_bytes += nread;
1550
1551 /* When a mark is used, we want to detect EOF asap in order to avoid
1552 * writing the EOF mark into the file... */
1553 int eof_reached = 0;
1554
1555 if (usemark) {
1556 /* Update the last bytes array, and check if it matches our
1557 * delimiter. */
1558 if (nread >= CONFIG_RUN_ID_SIZE) {
1559 memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
1560 CONFIG_RUN_ID_SIZE);
1561 } else {
1562 int rem = CONFIG_RUN_ID_SIZE-nread;
1563 memmove(lastbytes,lastbytes+nread,rem);
1564 memcpy(lastbytes+rem,buf,nread);
1565 }
1566 if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
1567 eof_reached = 1;
1568 }
1569
1570 /* Update the last I/O time for the replication transfer (used in
1571 * order to detect timeouts during replication), and write what we
1572 * got from the socket to the dump file on disk. */
1573 server.repl_transfer_lastio = server.unixtime;
1574 if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
1575 serverLog(LL_WARNING,
1576 "Write error or short write writing to the DB dump file "
1577 "needed for MASTER <-> REPLICA synchronization: %s",
1578 (nwritten == -1) ? strerror(errno) : "short write");
1579 goto error;
1580 }
1581 server.repl_transfer_read += nread;
1582
1583 /* Delete the last 40 bytes from the file if we reached EOF. */
1584 if (usemark && eof_reached) {
1585 if (ftruncate(server.repl_transfer_fd,
1586 server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
1587 {
1588 serverLog(LL_WARNING,
1589 "Error truncating the RDB file received from the master "
1590 "for SYNC: %s", strerror(errno));
1591 goto error;
1592 }
1593 }
1594
1595 /* Sync data on disk from time to time, otherwise at the end of the
1596 * transfer we may suffer a big delay as the memory buffers are copied
1597 * into the actual disk. */
1598 if (server.repl_transfer_read >=
1599 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
1600 {
1601 off_t sync_size = server.repl_transfer_read -
1602 server.repl_transfer_last_fsync_off;
1603 rdb_fsync_range(server.repl_transfer_fd,
1604 server.repl_transfer_last_fsync_off, sync_size);
1605 server.repl_transfer_last_fsync_off += sync_size;
1606 }
1607
1608 /* Check if the transfer is now complete */
1609 if (!usemark) {
1610 if (server.repl_transfer_read == server.repl_transfer_size)
1611 eof_reached = 1;
1612 }
1613
1614 /* If the transfer is yet not complete, we need to read more, so
1615 * return ASAP and wait for the handler to be called again. */
1616 if (!eof_reached) return;
1617 }
1618
1619 /* We reach this point in one of the following cases:
1620 *
1621 * 1. The replica is using diskless replication, that is, it reads data
1622 * directly from the socket to the Redis memory, without using
1623 * a temporary RDB file on disk. In that case we just block and
1624 * read everything from the socket.
1625 *
1626 * 2. Or when we are done reading from the socket to the RDB file, in
1627 * such case we want just to read the RDB file in memory. */
1628 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
1629
1630 /* We need to stop any AOF rewriting child before flusing and parsing
1631 * the RDB, otherwise we'll create a copy-on-write disaster. */
1632 if (server.aof_state != AOF_OFF) stopAppendOnly();
1633
1634 /* When diskless RDB loading is used by replicas, it may be configured
1635 * in order to save the current DB instead of throwing it away,
1636 * so that we can restore it in case of failed transfer. */
1637 if (use_diskless_load &&
1638 server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
1639 {
1640 /* Create a backup of server.db[] and initialize to empty
1641 * dictionaries. */
1642 diskless_load_backup = disklessLoadMakeBackup();
1643 }
1644 /* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
1645 * (Where disklessLoadMakeBackup left server.db empty) because we
1646 * want to execute all the auxiliary logic of emptyDb (Namely,
1647 * fire module events) */
1648 emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
1649
1650 /* Before loading the DB into memory we need to delete the readable
1651 * handler, otherwise it will get called recursively since
1652 * rdbLoad() will call the event loop to process events from time to
1653 * time for non blocking loading. */
1654 connSetReadHandler(conn, NULL);
1655 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
1656 rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
1657 if (use_diskless_load) {
1658 rio rdb;
1659 rioInitWithConn(&rdb,conn,server.repl_transfer_size);
1660
1661 /* Put the socket in blocking mode to simplify RDB transfer.
1662 * We'll restore it when the RDB is received. */
1663 connBlock(conn);
1664 connRecvTimeout(conn, server.repl_timeout*1000);
1665 startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
1666
1667 if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
1668 /* RDB loading failed. */
1669 serverLog(LL_WARNING,
1670 "Failed trying to load the MASTER synchronization DB "
1671 "from socket: %s", strerror(errno));
1672 stopLoading(0);
1673 cancelReplicationHandshake();
1674 rioFreeConn(&rdb, NULL);
1675
1676 /* Remove the half-loaded data in case we started with
1677 * an empty replica. */
1678 emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
1679
1680 if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
1681 /* Restore the backed up databases. */
1682 disklessLoadRestoreBackup(diskless_load_backup);
1683 }
1684
1685 /* Note that there's no point in restarting the AOF on SYNC
1686 * failure, it'll be restarted when sync succeeds or the replica
1687 * gets promoted. */
1688 return;
1689 }
1690 stopLoading(1);
1691
1692 /* RDB loading succeeded if we reach this point. */
1693 if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
1694 /* Delete the backup databases we created before starting to load
1695 * the new RDB. Now the RDB was loaded with success so the old
1696 * data is useless. */
1697 disklessLoadDiscardBackup(diskless_load_backup, empty_db_flags);
1698 }
1699
1700 /* Verify the end mark is correct. */
1701 if (usemark) {
1702 if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
1703 memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
1704 {
1705 serverLog(LL_WARNING,"Replication stream EOF marker is broken");
1706 cancelReplicationHandshake();
1707 rioFreeConn(&rdb, NULL);
1708 return;
1709 }
1710 }
1711
1712 /* Cleanup and restore the socket to the original state to continue
1713 * with the normal replication. */
1714 rioFreeConn(&rdb, NULL);
1715 connNonBlock(conn);
1716 connRecvTimeout(conn,0);
1717 } else {
1718 /* Ensure background save doesn't overwrite synced data */
1719 if (server.rdb_child_pid != -1) {
1720 serverLog(LL_NOTICE,
1721 "Replica is about to load the RDB file received from the "
1722 "master, but there is a pending RDB child running. "
1723 "Killing process %ld and removing its temp file to avoid "
1724 "any race",
1725 (long) server.rdb_child_pid);
1726 killRDBChild();
1727 }
1728
1729 /* Make sure the new file (also used for persistence) is fully synced
1730 * (not covered by earlier calls to rdb_fsync_range). */
1731 if (fsync(server.repl_transfer_fd) == -1) {
1732 serverLog(LL_WARNING,
1733 "Failed trying to sync the temp DB to disk in "
1734 "MASTER <-> REPLICA synchronization: %s",
1735 strerror(errno));
1736 cancelReplicationHandshake();
1737 return;
1738 }
1739
1740 /* Rename rdb like renaming rewrite aof asynchronously. */
1741 int old_rdb_fd = open(server.rdb_filename,O_RDONLY|O_NONBLOCK);
1742 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
1743 serverLog(LL_WARNING,
1744 "Failed trying to rename the temp DB into %s in "
1745 "MASTER <-> REPLICA synchronization: %s",
1746 server.rdb_filename, strerror(errno));
1747 cancelReplicationHandshake();
1748 if (old_rdb_fd != -1) close(old_rdb_fd);
1749 return;
1750 }
1751 /* Close old rdb asynchronously. */
1752 if (old_rdb_fd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)old_rdb_fd,NULL,NULL);
1753
1754 if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
1755 serverLog(LL_WARNING,
1756 "Failed trying to load the MASTER synchronization "
1757 "DB from disk");
1758 cancelReplicationHandshake();
1759 if (server.rdb_del_sync_files && allPersistenceDisabled()) {
1760 serverLog(LL_NOTICE,"Removing the RDB file obtained from "
1761 "the master. This replica has persistence "
1762 "disabled");
1763 bg_unlink(server.rdb_filename);
1764 }
1765 /* Note that there's no point in restarting the AOF on sync failure,
1766 it'll be restarted when sync succeeds or replica promoted. */
1767 return;
1768 }
1769
1770 /* Cleanup. */
1771 if (server.rdb_del_sync_files && allPersistenceDisabled()) {
1772 serverLog(LL_NOTICE,"Removing the RDB file obtained from "
1773 "the master. This replica has persistence "
1774 "disabled");
1775 bg_unlink(server.rdb_filename);
1776 }
1777
1778 zfree(server.repl_transfer_tmpfile);
1779 close(server.repl_transfer_fd);
1780 server.repl_transfer_fd = -1;
1781 server.repl_transfer_tmpfile = NULL;
1782 }
1783
1784 /* Final setup of the connected slave <- master link */
1785 replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
1786 server.repl_state = REPL_STATE_CONNECTED;
1787 server.repl_down_since = 0;
1788
1789 /* Fire the master link modules event. */
1790 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
1791 REDISMODULE_SUBEVENT_MASTER_LINK_UP,
1792 NULL);
1793
1794 /* After a full resynchronization we use the replication ID and
1795 * offset of the master. The secondary ID / offset are cleared since
1796 * we are starting a new history. */
1797 memcpy(server.replid,server.master->replid,sizeof(server.replid));
1798 server.master_repl_offset = server.master->reploff;
1799 clearReplicationId2();
1800
1801 /* Let's create the replication backlog if needed. Slaves need to
1802 * accumulate the backlog regardless of the fact they have sub-slaves
1803 * or not, in order to behave correctly if they are promoted to
1804 * masters after a failover. */
1805 if (server.repl_backlog == NULL) createReplicationBacklog();
1806 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
1807
1808 if (server.supervised_mode == SUPERVISED_SYSTEMD) {
1809 redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections.\n");
1810 redisCommunicateSystemd("READY=1\n");
1811 }
1812
1813 /* Restart the AOF subsystem now that we finished the sync. This
1814 * will trigger an AOF rewrite, and when done will start appending
1815 * to the new file. */
1816 if (server.aof_enabled) restartAOFAfterSYNC();
1817 return;
1818
1819 error:
1820 cancelReplicationHandshake();
1821 return;
1822 }
1823
1824 /* Send a synchronous command to the master. Used to send AUTH and
1825 * REPLCONF commands before starting the replication with SYNC.
1826 *
1827 * The command returns an sds string representing the result of the
1828 * operation. On error the first byte is a "-".
1829 */
1830 #define SYNC_CMD_READ (1<<0)
1831 #define SYNC_CMD_WRITE (1<<1)
1832 #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
char *sendSynchronousCommand(int flags, connection *conn, ...) {
    /* Two independent halves selected by 'flags':
     *
     * SYNC_CMD_WRITE: the NULL-terminated list of C strings following 'conn'
     *                 is encoded as a multi bulk request and written to
     *                 'conn' synchronously.
     * SYNC_CMD_READ:  a single reply line is read from 'conn' synchronously.
     *
     * Returns an sds string on write failure ("-Writing to master: ..."),
     * on read failure ("-Reading from master: ...") or with the line that
     * was read. Returns NULL when only the write half ran and it succeeded.
     * The arguments are formatted with "%s" and measured with strlen(), so
     * they must not contain embedded NUL bytes. */
    if (flags & SYNC_CMD_WRITE) {
        va_list args;
        char *piece;
        size_t count = 0;
        sds bulks = sdsempty();

        /* Encode every argument as a bulk string, counting them so the
         * multi bulk header can be emitted afterwards. */
        va_start(args,conn);
        while ((piece = va_arg(args, char*)) != NULL) {
            bulks = sdscatprintf(bulks,"$%zu\r\n%s\r\n",strlen(piece),piece);
            count++;
        }
        va_end(args);

        /* Header first, then the already encoded arguments. */
        sds request = sdscatprintf(sdsempty(),"*%zu\r\n",count);
        request = sdscatsds(request,bulks);
        sdsfree(bulks);

        /* Ship the request; the buffer is released regardless of the
         * outcome, before the error string (if any) is built. */
        int write_failed =
            (connSyncWrite(conn,request,sdslen(request),
                           server.repl_syncio_timeout*1000) == -1);
        sdsfree(request);
        if (write_failed) {
            return sdscatprintf(sdsempty(),"-Writing to master: %s",
                    connGetLastError(conn));
        }
    }

    /* Nothing to read: the caller only asked for the write half. */
    if (!(flags & SYNC_CMD_READ)) return NULL;

    /* Read one reply line from the master. */
    char line[256];
    if (connSyncReadLine(conn,line,sizeof(line),
                         server.repl_syncio_timeout*1000) == -1)
    {
        return sdscatprintf(sdsempty(),"-Reading from master: %s",
                strerror(errno));
    }

    /* A line was received: refresh the last I/O time of the transfer. */
    server.repl_transfer_lastio = server.unixtime;
    return sdsnew(line);
}
1886
1887 /* Try a partial resynchronization with the master if we are about to reconnect.
1888 * If there is no cached master structure, at least try to issue a
1889 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
1890 * command in order to obtain the master replid and the master replication
1891 * global offset.
1892 *
1893 * This function is designed to be called from syncWithMaster(), so the
1894 * following assumptions are made:
1895 *
 * 1) We pass the function an already connected connection "conn".
 * 2) This function does not close the connection "conn". However in case
 *    of successful partial resynchronization, the function will reuse
 *    'conn' as the connection of the server.master client structure.
1900 *
1901 * The function is split in two halves: if read_reply is 0, the function
1902 * writes the PSYNC command on the socket, and a new function call is
1903 * needed, with read_reply set to 1, in order to read the reply of the
1904 * command. This is useful in order to support non blocking operations, so
1905 * that we write, return into the event loop, and read when there are data.
1906 *
 * When read_reply is 0 the function returns PSYNC_WRITE_ERROR if there
1908 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
1909 * with read_reply set to 1. However even when read_reply is set to 1
1910 * the function may return PSYNC_WAIT_REPLY again to signal there were
1911 * insufficient data to read to complete its work. We should re-enter
1912 * into the event loop and wait in such a case.
1913 *
1914 * The function returns:
1915 *
1916 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
1917 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
1918 * In this case the master replid and global replication
1919 * offset is saved.
1920 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
1921 * the caller should fall back to SYNC.
1922 * PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
1923 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
1924 * PSYNC_TRY_LATER: Master is currently in a transient error condition.
1925 *
1926 * Notable side effects:
1927 *
 * 1) As a side effect of the function call the function removes the readable
 *    event handler from "conn", unless the return value is PSYNC_WAIT_REPLY.
1930 * 2) server.master_initial_offset is set to the right value according
1931 * to the master reply. This will be used to populate the 'server.master'
1932 * structure replication offset.
1933 */
1934
1935 #define PSYNC_WRITE_ERROR 0
1936 #define PSYNC_WAIT_REPLY 1
1937 #define PSYNC_CONTINUE 2
1938 #define PSYNC_FULLRESYNC 3
1939 #define PSYNC_NOT_SUPPORTED 4
1940 #define PSYNC_TRY_LATER 5
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    /* conn:       connection to the master, already established.
     * read_reply: 0 to write the PSYNC command, non-zero to read and
     *             interpret the master reply.
     *
     * Returns one of the PSYNC_* codes. The connection is never closed
     * here. The read handler of 'conn' is removed on every return path
     * except when PSYNC_WAIT_REPLY is returned. */
    char *psync_replid;
    char psync_offset[32];
    sds reply;

    /* Writing half */
    if (!read_reply) {
        /* Initially set master_initial_offset to -1 to mark the current
         * master replid and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.master_initial_offset = -1;

        if (server.cached_master) {
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
        } else {
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            memcpy(psync_offset,"-1",3);
        }

        /* Issue the PSYNC command. With SYNC_CMD_WRITE only, a non-NULL
         * return value is always an error description. */
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
        if (reply != NULL) {
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
            sdsfree(reply);
            connSetReadHandler(conn, NULL);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }

    /* Reading half */
    reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
    if (sdslen(reply) == 0) {
        /* The master may send empty newlines after it receives PSYNC
         * and before to reply, just to keep the connection alive. */
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    }

    /* A real reply arrived: from now on every path returns something
     * different from PSYNC_WAIT_REPLY, so the read handler is removed. */
    connSetReadHandler(conn, NULL);

    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *replid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the replid
         * and the replication offset. Expected format:
         * "+FULLRESYNC <replid> <offset>". */
        replid = strchr(reply,' ');
        if (replid) {
            replid++;
            offset = strchr(replid,' ');
            if (offset) offset++;
        }
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
            serverLog(LL_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * replid to make sure next PSYNCs will fail. */
            memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
        } else {
            memcpy(server.master_replid, replid, offset-replid-1);
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
            server.master_initial_offset = strtoll(offset,NULL,10);
            serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
                server.master_replid,
                server.master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (!strncmp(reply,"+CONTINUE",9)) {
        /* +CONTINUE only makes sense as the answer to a PSYNC that offered
         * the replid/offset of a cached master. The code below reads
         * server.cached_master->replid, so if there is no cached master
         * (for instance the master answered +CONTINUE to "PSYNC ? -1")
         * refuse the reply instead of dereferencing a NULL pointer.
         * PSYNC_TRY_LATER is returned so that the handshake is retried. */
        if (server.cached_master == NULL) {
            serverLog(LL_WARNING,
                "Master replied +CONTINUE but there is no cached master "
                "to resume the replication from: %s", reply);
            sdsfree(reply);
            return PSYNC_TRY_LATER;
        }

        /* Partial resync was accepted. */
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");

        /* Check the new replication ID advertised by the master. If it
         * changed, we need to set the new ID as primary ID, and set our
         * secondary ID as the old master ID up to the current offset, so
         * that our sub-slaves will be able to PSYNC with us after a
         * disconnection. */
        char *start = reply+10;
        char *end = reply+9;
        while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
        if (end-start == CONFIG_RUN_ID_SIZE) {
            char new_replid[CONFIG_RUN_ID_SIZE+1];
            memcpy(new_replid,start,CONFIG_RUN_ID_SIZE);
            new_replid[CONFIG_RUN_ID_SIZE] = '\0';

            if (strcmp(new_replid,server.cached_master->replid)) {
                /* Master ID changed. */
                serverLog(LL_WARNING,"Master replication ID changed to %s",new_replid);

                /* Set the old ID as our ID2, up to the current offset+1. */
                memcpy(server.replid2,server.cached_master->replid,
                    sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                /* Update the cached master ID and our own primary ID to the
                 * new one. */
                memcpy(server.replid,new_replid,sizeof(server.replid));
                memcpy(server.cached_master->replid,new_replid,sizeof(server.replid));

                /* Disconnect all the sub-slaves: they need to be notified. */
                disconnectSlaves();
            }
        }

        /* Setup the replication to continue. */
        sdsfree(reply);
        replicationResurrectCachedMaster(conn);

        /* If this instance was restarted and we read the metadata to
         * PSYNC from the persistence file, our replication backlog could
         * be still not initialized. Create it. */
        if (server.repl_backlog == NULL) createReplicationBacklog();
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error (since the master does
     * not understand PSYNC or because it is in a special state and cannot
     * serve our request), or an unexpected reply from the master.
     *
     * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
     * return PSYNC_TRY_LATER if we believe this is a transient error. */

    if (!strncmp(reply,"-NOMASTERLINK",13) ||
        !strncmp(reply,"-LOADING",8))
    {
        serverLog(LL_NOTICE,
            "Master is currently unable to PSYNC "
            "but should be in the future: %s", reply);
        sdsfree(reply);
        return PSYNC_TRY_LATER;
    }

    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        serverLog(LL_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        serverLog(LL_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
2098
2099 /* This handler fires when the non blocking connect was able to
2100 * establish a connection with the master. */
void syncWithMaster(connection *conn) {
    /* Replica side handshake state machine, driven by server.repl_state.
     *
     * conn: the connection to the master this handler is installed on.
     *
     * Every call advances the state as far as possible. After a command
     * is written the function returns and waits to be called again to
     * read the reply. Once the PSYNC reply is processed the function
     * either returns (partial resync accepted) or installs
     * readSyncBulkPayload() as read handler and moves to
     * REPL_STATE_TRANSFER.
     *
     * On failure (label "error") the connection is closed, the transfer
     * descriptor and temp file name are released and the state goes back
     * to REPL_STATE_CONNECT. */
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5;
    int psync_result;

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REPL_STATE_NONE) {
        connClose(conn);
        return;
    }

    /* Check for errors in the socket: after a non blocking connect() we
     * may find that the socket is in error state. */
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
                connGetLastError(conn));
        goto error;
    }

    /* Send a PING to check the master is able to reply without errors. */
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        connSetReadHandler(conn, syncWithMaster);
        connSetWriteHandler(conn, NULL);
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        /* Send the PING. A failed write is handled by the write_error
         * path; a missing reply is left to the replication timeout. */
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);
        if (err) goto write_error;
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-NOPERM",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
            sdsfree(err);
            goto error;
        } else {
            serverLog(LL_NOTICE,
                "Master replied to PING, replication can continue...");
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_AUTH;
    }

    /* AUTH with the master if required. */
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masteruser && server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
                                         server.masteruser,server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else if (server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }

    /* Receive AUTH reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        if (err[0] == '-') {
            serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PORT;
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    if (server.repl_state == REPL_STATE_SEND_PORT) {
        int port;
        if (server.slave_announce_port) port = server.slave_announce_port;
        else if (server.tls_replication && server.tls_port) port = server.tls_port;
        else port = server.port;
        sds portstr = sdsfromlonglong(port);
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "listening-port",portstr, NULL);
        sdsfree(portstr);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_PORT;
        return;
    }

    /* Receive REPLCONF listening-port reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_IP;
    }

    /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
    if (server.repl_state == REPL_STATE_SEND_IP &&
        server.slave_announce_ip == NULL)
    {
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Set the slave ip, so that Master's INFO command can list the
     * slave IP address port correctly in case of port forwarding or NAT. */
    if (server.repl_state == REPL_STATE_SEND_IP) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_IP;
        return;
    }

    /* Receive REPLCONF ip-address reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF ip-address. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF ip-address: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Inform the master of our (slave) capabilities.
     *
     * EOF: supports EOF-style RDB transfer for diskless replication.
     * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
     *
     * The master will ignore capabilities it does not understand. */
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "capa","eof","capa","psync2",NULL);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* Receive CAPA reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF capa. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                  "REPLCONF capa: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master replid
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }

    /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
    if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
        serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                             "state should be RECEIVE_PSYNC but is %d",
                             server.repl_state);
        goto error;
    }

    psync_result = slaveTryPartialResynchronization(conn,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* If the master is in a transient error, we should try to PSYNC
     * from scratch later, so go to the error path. This happens when
     * the server is loading the dataset or is not connected with its
     * master and so forth. */
    if (psync_result == PSYNC_TRY_LATER) goto error;

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */

    if (psync_result == PSYNC_CONTINUE) {
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
        if (server.supervised_mode == SUPERVISED_SYSTEMD) {
            redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n");
            redisCommunicateSystemd("READY=1\n");
        }
        return;
    }

    /* PSYNC failed or is not supported: we want our slaves to resync with us
     * as well, if we have any sub-slaves. The master may transfer us an
     * entirely different data set and we have no way to incrementally feed
     * our slaves after that. */
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.master_replid and master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        serverLog(LL_NOTICE,"Retrying with SYNC...");
        if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    if (!useDisklessLoad()) {
        while(maxtries--) {
            snprintf(tmpfile,sizeof(tmpfile),
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
            goto error;
        }
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        server.repl_transfer_fd = dfd;
        /* The descriptor is now owned by server.repl_transfer_fd. Forget
         * the local copy, otherwise the error path below would close the
         * same descriptor twice (once as 'dfd' and once as
         * server.repl_transfer_fd). */
        dfd = -1;
    }

    /* Setup the non blocking download of the bulk file. */
    if (connSetReadHandler(conn, readSyncBulkPayload)
            == C_ERR)
    {
        char conninfo[CONN_INFO_LEN];
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (%s)",
            strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
        goto error;
    }

    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_lastio = server.unixtime;
    return;

error:
    /* 'dfd' is only different from -1 here if a descriptor was opened but
     * not yet handed over to server.repl_transfer_fd. */
    if (dfd != -1) close(dfd);
    connClose(conn);
    server.repl_transfer_s = NULL;
    if (server.repl_transfer_fd != -1)
        close(server.repl_transfer_fd);
    if (server.repl_transfer_tmpfile)
        zfree(server.repl_transfer_tmpfile);
    server.repl_transfer_tmpfile = NULL;
    server.repl_transfer_fd = -1;
    server.repl_state = REPL_STATE_CONNECT;
    return;

write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
    serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
    sdsfree(err);
    goto error;
}
2395
connectWithMaster(void)2396 int connectWithMaster(void) {
2397 server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
2398 if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
2399 NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
2400 serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
2401 connGetLastError(server.repl_transfer_s));
2402 connClose(server.repl_transfer_s);
2403 server.repl_transfer_s = NULL;
2404 return C_ERR;
2405 }
2406
2407
2408 server.repl_transfer_lastio = server.unixtime;
2409 server.repl_state = REPL_STATE_CONNECTING;
2410 return C_OK;
2411 }
2412
2413 /* This function can be called when a non blocking connection is currently
2414 * in progress to undo it.
2415 * Never call this function directly, use cancelReplicationHandshake() instead.
2416 */
undoConnectWithMaster(void)2417 void undoConnectWithMaster(void) {
2418 connClose(server.repl_transfer_s);
2419 server.repl_transfer_s = NULL;
2420 }
2421
2422 /* Abort the async download of the bulk dataset while SYNC-ing with master.
2423 * Never call this function directly, use cancelReplicationHandshake() instead.
2424 */
replicationAbortSyncTransfer(void)2425 void replicationAbortSyncTransfer(void) {
2426 serverAssert(server.repl_state == REPL_STATE_TRANSFER);
2427 undoConnectWithMaster();
2428 if (server.repl_transfer_fd!=-1) {
2429 close(server.repl_transfer_fd);
2430 bg_unlink(server.repl_transfer_tmpfile);
2431 zfree(server.repl_transfer_tmpfile);
2432 server.repl_transfer_tmpfile = NULL;
2433 server.repl_transfer_fd = -1;
2434 }
2435 }
2436
2437 /* This function aborts a non blocking replication attempt if there is one
2438 * in progress, by canceling the non-blocking connect attempt or
2439 * the initial bulk transfer.
2440 *
2441 * If there was a replication handshake in progress 1 is returned and
2442 * the replication state (server.repl_state) set to REPL_STATE_CONNECT.
2443 *
2444 * Otherwise zero is returned and no operation is performed at all. */
cancelReplicationHandshake(void)2445 int cancelReplicationHandshake(void) {
2446 if (server.repl_state == REPL_STATE_TRANSFER) {
2447 replicationAbortSyncTransfer();
2448 server.repl_state = REPL_STATE_CONNECT;
2449 } else if (server.repl_state == REPL_STATE_CONNECTING ||
2450 slaveIsInHandshakeState())
2451 {
2452 undoConnectWithMaster();
2453 server.repl_state = REPL_STATE_CONNECT;
2454 } else {
2455 return 0;
2456 }
2457 return 1;
2458 }
2459
2460 /* Set replication to the specified master address and port. */
replicationSetMaster(char * ip,int port)2461 void replicationSetMaster(char *ip, int port) {
2462 int was_master = server.masterhost == NULL;
2463
2464 sdsfree(server.masterhost);
2465 server.masterhost = sdsnew(ip);
2466 server.masterport = port;
2467 if (server.master) {
2468 freeClient(server.master);
2469 }
2470 disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
2471
2472 /* Update oom_score_adj */
2473 setOOMScoreAdj(-1);
2474
2475 /* Force our slaves to resync with us as well. They may hopefully be able
2476 * to partially resync with us, but we can notify the replid change. */
2477 disconnectSlaves();
2478 cancelReplicationHandshake();
2479 /* Before destroying our master state, create a cached master using
2480 * our own parameters, to later PSYNC with the new master. */
2481 if (was_master) {
2482 replicationDiscardCachedMaster();
2483 replicationCacheMasterUsingMyself();
2484 }
2485
2486 /* Fire the role change modules event. */
2487 moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
2488 REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
2489 NULL);
2490
2491 /* Fire the master link modules event. */
2492 if (server.repl_state == REPL_STATE_CONNECTED)
2493 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2494 REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
2495 NULL);
2496
2497 server.repl_state = REPL_STATE_CONNECT;
2498 }
2499
2500 /* Cancel replication, setting the instance as a master itself. */
replicationUnsetMaster(void)2501 void replicationUnsetMaster(void) {
2502 if (server.masterhost == NULL) return; /* Nothing to do. */
2503
2504 /* Fire the master link modules event. */
2505 if (server.repl_state == REPL_STATE_CONNECTED)
2506 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2507 REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
2508 NULL);
2509
2510 sdsfree(server.masterhost);
2511 server.masterhost = NULL;
2512 if (server.master) freeClient(server.master);
2513 replicationDiscardCachedMaster();
2514 cancelReplicationHandshake();
2515 /* When a slave is turned into a master, the current replication ID
2516 * (that was inherited from the master at synchronization time) is
2517 * used as secondary ID up to the current offset, and a new replication
2518 * ID is created to continue with a new replication history.
2519 *
2520 * NOTE: this function MUST be called after we call
2521 * freeClient(server.master), since there we adjust the replication
2522 * offset trimming the final PINGs. See Github issue #7320. */
2523 shiftReplicationId();
2524 /* Disconnecting all the slaves is required: we need to inform slaves
2525 * of the replication ID change (see shiftReplicationId() call). However
2526 * the slaves will be able to partially resync with us, so it will be
2527 * a very fast reconnection. */
2528 disconnectSlaves();
2529 server.repl_state = REPL_STATE_NONE;
2530
2531 /* We need to make sure the new master will start the replication stream
2532 * with a SELECT statement. This is forced after a full resync, but
2533 * with PSYNC version 2, there is no need for full resync after a
2534 * master switch. */
2535 server.slaveseldb = -1;
2536
2537 /* Update oom_score_adj */
2538 setOOMScoreAdj(-1);
2539
2540 /* Once we turn from slave to master, we consider the starting time without
2541 * slaves (that is used to count the replication backlog time to live) as
2542 * starting from now. Otherwise the backlog will be freed after a
2543 * failover if slaves do not connect immediately. */
2544 server.repl_no_slaves_since = server.unixtime;
2545
2546 /* Fire the role change modules event. */
2547 moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
2548 REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
2549 NULL);
2550
2551 /* Restart the AOF subsystem in case we shut it down during a sync when
2552 * we were still a slave. */
2553 if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
2554 }
2555
2556 /* This function is called when the slave lose the connection with the
2557 * master into an unexpected way. */
replicationHandleMasterDisconnection(void)2558 void replicationHandleMasterDisconnection(void) {
2559 /* Fire the master link modules event. */
2560 if (server.repl_state == REPL_STATE_CONNECTED)
2561 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2562 REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
2563 NULL);
2564
2565 server.master = NULL;
2566 server.repl_state = REPL_STATE_CONNECT;
2567 server.repl_down_since = server.unixtime;
2568 /* We lost connection with our master, don't disconnect slaves yet,
2569 * maybe we'll be able to PSYNC with our master later. We'll disconnect
2570 * the slaves only if we'll have to do a full resync with our master. */
2571 }
2572
/* REPLICAOF <host> <port> | REPLICAOF NO ONE
 *
 * "NO ONE" turns the instance into a master (a no-op if it already is one).
 * Otherwise the instance is configured as a replica of host:port, unless
 * it is already replicating from that very address.
 *
 * Errors are replied when:
 * - the server runs in cluster mode;
 * - the calling client is itself a replica (host/port form only);
 * - the port is not an integer or is outside the 0-65535 range. */
void replicaofCommand(client *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"REPLICAOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one"))
    {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        long port;

        if (c->flags & CLIENT_SLAVE) {
            /* If a client is already a replica they cannot run this command,
             * because it involves flushing all replicas (including this
             * client) */
            addReplyError(c, "Command is not valid when client is a replica.");
            return;
        }

        if (getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)
            return;

        /* The port is later stored into an int: refuse anything that is
         * not a valid TCP port instead of silently truncating it. */
        if (port < 0 || port > 65535) {
            addReplyError(c,"Invalid master port");
            return;
        }

        /* Check if we are already attached to the specified master */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port)
        {
            serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
                                "with the master we are already connected "
                                "with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified "
                                 "master\r\n"));
            return;
        }

        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, (int)port);
        sds client = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        sdsfree(client);
    }
    addReply(c,shared.ok);
}
2627
2628 /* ROLE command: provide information about the role of the instance
2629 * (master or slave) and additional information related to replication
2630 * in an easy to process format. */
roleCommand(client * c)2631 void roleCommand(client *c) {
2632 if (server.masterhost == NULL) {
2633 listIter li;
2634 listNode *ln;
2635 void *mbcount;
2636 int slaves = 0;
2637
2638 addReplyArrayLen(c,3);
2639 addReplyBulkCBuffer(c,"master",6);
2640 addReplyLongLong(c,server.master_repl_offset);
2641 mbcount = addReplyDeferredLen(c);
2642 listRewind(server.slaves,&li);
2643 while((ln = listNext(&li))) {
2644 client *slave = ln->value;
2645 char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
2646
2647 if (slaveip[0] == '\0') {
2648 if (connPeerToString(slave->conn,ip,sizeof(ip),NULL) == -1)
2649 continue;
2650 slaveip = ip;
2651 }
2652 if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2653 addReplyArrayLen(c,3);
2654 addReplyBulkCString(c,slaveip);
2655 addReplyBulkLongLong(c,slave->slave_listening_port);
2656 addReplyBulkLongLong(c,slave->repl_ack_off);
2657 slaves++;
2658 }
2659 setDeferredArrayLen(c,mbcount,slaves);
2660 } else {
2661 char *slavestate = NULL;
2662
2663 addReplyArrayLen(c,5);
2664 addReplyBulkCBuffer(c,"slave",5);
2665 addReplyBulkCString(c,server.masterhost);
2666 addReplyLongLong(c,server.masterport);
2667 if (slaveIsInHandshakeState()) {
2668 slavestate = "handshake";
2669 } else {
2670 switch(server.repl_state) {
2671 case REPL_STATE_NONE: slavestate = "none"; break;
2672 case REPL_STATE_CONNECT: slavestate = "connect"; break;
2673 case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
2674 case REPL_STATE_TRANSFER: slavestate = "sync"; break;
2675 case REPL_STATE_CONNECTED: slavestate = "connected"; break;
2676 default: slavestate = "unknown"; break;
2677 }
2678 }
2679 addReplyBulkCString(c,slavestate);
2680 addReplyLongLong(c,server.master ? server.master->reploff : -1);
2681 }
2682 }
2683
2684 /* Send a REPLCONF ACK command to the master to inform it about the current
2685 * processed offset. If we are not connected with a master, the command has
2686 * no effects. */
replicationSendAck(void)2687 void replicationSendAck(void) {
2688 client *c = server.master;
2689
2690 if (c != NULL) {
2691 c->flags |= CLIENT_MASTER_FORCE_REPLY;
2692 addReplyArrayLen(c,3);
2693 addReplyBulkCString(c,"REPLCONF");
2694 addReplyBulkCString(c,"ACK");
2695 addReplyBulkLongLong(c,c->reploff);
2696 c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
2697 }
2698 }
2699
2700 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
2701
2702 /* In order to implement partial synchronization we need to be able to cache
2703 * our master's client structure after a transient disconnection.
2704 * It is cached into server.cached_master and flushed away using the following
2705 * functions. */
2706
2707 /* This function is called by freeClient() in order to cache the master
2708 * client structure instead of destroying it. freeClient() will return
2709 * ASAP after this function returns, so every action needed to avoid problems
2710 * with a client that is really "suspended" has to be done by this function.
2711 *
2712 * The other functions that will deal with the cached master are:
2713 *
2714 * replicationDiscardCachedMaster() that will make sure to kill the client
2715 * as for some reason we don't want to use it in the future.
2716 *
2717 * replicationResurrectCachedMaster() that is used after a successful PSYNC
2718 * handshake in order to reactivate the cached master.
2719 */
replicationCacheMaster(client * c)2720 void replicationCacheMaster(client *c) {
2721 serverAssert(server.master != NULL && server.cached_master == NULL);
2722 serverLog(LL_NOTICE,"Caching the disconnected master state.");
2723
2724 /* Unlink the client from the server structures. */
2725 unlinkClient(c);
2726
2727 /* Reset the master client so that's ready to accept new commands:
2728 * we want to discard te non processed query buffers and non processed
2729 * offsets, including pending transactions, already populated arguments,
2730 * pending outputs to the master. */
2731 sdsclear(server.master->querybuf);
2732 sdsclear(server.master->pending_querybuf);
2733 server.master->read_reploff = server.master->reploff;
2734 if (c->flags & CLIENT_MULTI) discardTransaction(c);
2735 listEmpty(c->reply);
2736 c->sentlen = 0;
2737 c->reply_bytes = 0;
2738 c->bufpos = 0;
2739 resetClient(c);
2740
2741 /* Save the master. Server.master will be set to null later by
2742 * replicationHandleMasterDisconnection(). */
2743 server.cached_master = server.master;
2744
2745 /* Invalidate the Peer ID cache. */
2746 if (c->peerid) {
2747 sdsfree(c->peerid);
2748 c->peerid = NULL;
2749 }
2750
2751 /* Caching the master happens instead of the actual freeClient() call,
2752 * so make sure to adjust the replication state. This function will
2753 * also set server.master to NULL. */
2754 replicationHandleMasterDisconnection();
2755 }
2756
2757 /* This function is called when a master is turend into a slave, in order to
2758 * create from scratch a cached master for the new client, that will allow
2759 * to PSYNC with the slave that was promoted as the new master after a
2760 * failover.
2761 *
2762 * Assuming this instance was previously the master instance of the new master,
2763 * the new master will accept its replication ID, and potentiall also the
2764 * current offset if no data was lost during the failover. So we use our
2765 * current replication ID and offset in order to synthesize a cached master. */
replicationCacheMasterUsingMyself(void)2766 void replicationCacheMasterUsingMyself(void) {
2767 serverLog(LL_NOTICE,
2768 "Before turning into a replica, using my own master parameters "
2769 "to synthesize a cached master: I may be able to synchronize with "
2770 "the new master with just a partial transfer.");
2771
2772 /* This will be used to populate the field server.master->reploff
2773 * by replicationCreateMasterClient(). We'll later set the created
2774 * master as server.cached_master, so the replica will use such
2775 * offset for PSYNC. */
2776 server.master_initial_offset = server.master_repl_offset;
2777
2778 /* The master client we create can be set to any DBID, because
2779 * the new master will start its replication stream with SELECT. */
2780 replicationCreateMasterClient(NULL,-1);
2781
2782 /* Use our own ID / offset. */
2783 memcpy(server.master->replid, server.replid, sizeof(server.replid));
2784
2785 /* Set as cached master. */
2786 unlinkClient(server.master);
2787 server.cached_master = server.master;
2788 server.master = NULL;
2789 }
2790
2791 /* Free a cached master, called when there are no longer the conditions for
2792 * a partial resync on reconnection. */
replicationDiscardCachedMaster(void)2793 void replicationDiscardCachedMaster(void) {
2794 if (server.cached_master == NULL) return;
2795
2796 serverLog(LL_NOTICE,"Discarding previously cached master state.");
2797 server.cached_master->flags &= ~CLIENT_MASTER;
2798 freeClient(server.cached_master);
2799 server.cached_master = NULL;
2800 }
2801
2802 /* Turn the cached master into the current master, using the file descriptor
2803 * passed as argument as the socket for the new master.
2804 *
2805 * This function is called when successfully setup a partial resynchronization
2806 * so the stream of data that we'll receive will start from were this
2807 * master left. */
replicationResurrectCachedMaster(connection * conn)2808 void replicationResurrectCachedMaster(connection *conn) {
2809 server.master = server.cached_master;
2810 server.cached_master = NULL;
2811 server.master->conn = conn;
2812 connSetPrivateData(server.master->conn, server.master);
2813 server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
2814 server.master->authenticated = 1;
2815 server.master->lastinteraction = server.unixtime;
2816 server.repl_state = REPL_STATE_CONNECTED;
2817 server.repl_down_since = 0;
2818
2819 /* Fire the master link modules event. */
2820 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2821 REDISMODULE_SUBEVENT_MASTER_LINK_UP,
2822 NULL);
2823
2824 /* Re-add to the list of clients. */
2825 linkClient(server.master);
2826 if (connSetReadHandler(server.master->conn, readQueryFromClient)) {
2827 serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
2828 freeClientAsync(server.master); /* Close ASAP. */
2829 }
2830
2831 /* We may also need to install the write handler as well if there is
2832 * pending data in the write buffers. */
2833 if (clientHasPendingReplies(server.master)) {
2834 if (connSetWriteHandler(server.master->conn, sendReplyToClient)) {
2835 serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
2836 freeClientAsync(server.master); /* Close ASAP. */
2837 }
2838 }
2839 }
2840
2841 /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
2842
2843 /* This function counts the number of slaves with lag <= min-slaves-max-lag.
2844 * If the option is active, the server will prevent writes if there are not
2845 * enough connected slaves with the specified lag (or less). */
refreshGoodSlavesCount(void)2846 void refreshGoodSlavesCount(void) {
2847 listIter li;
2848 listNode *ln;
2849 int good = 0;
2850
2851 if (!server.repl_min_slaves_to_write ||
2852 !server.repl_min_slaves_max_lag) return;
2853
2854 listRewind(server.slaves,&li);
2855 while((ln = listNext(&li))) {
2856 client *slave = ln->value;
2857 time_t lag = server.unixtime - slave->repl_ack_time;
2858
2859 if (slave->replstate == SLAVE_STATE_ONLINE &&
2860 lag <= server.repl_min_slaves_max_lag) good++;
2861 }
2862 server.repl_good_slaves_count = good;
2863 }
2864
2865 /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
2866 * The goal of this code is to keep track of scripts already sent to every
2867 * connected slave, in order to be able to replicate EVALSHA as it is without
2868 * translating it to EVAL every time it is possible.
2869 *
2870 * We use a capped collection implemented by a hash table for fast lookup
2871 * of scripts we can send as EVALSHA, plus a linked list that is used for
2872 * eviction of the oldest entry when the max number of items is reached.
2873 *
2874 * We don't care about taking a different cache for every different slave
2875 * since to fill the cache again is not very costly, the goal of this code
2876 * is to avoid that the same big script is transmitted a big number of times
2877 * per second wasting bandwidth and processor speed, but it is not a problem
2878 * if we need to rebuild the cache from scratch from time to time, every used
2879 * script will need to be transmitted a single time to reappear in the cache.
2880 *
2881 * This is how the system works:
2882 *
2883 * 1) Every time a new slave connects, we flush the whole script cache.
2884 * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
2885 * trying to convert EVAL into EVALSHA specifically for slaves.
2886 * 3) Every time we transmit a script as EVAL to the slaves, we also add the
2887 * corresponding SHA1 of the script into the cache as we are sure every
2888 * slave knows about the script starting from now.
2889 * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
2890 * and at the same time flush the script cache.
2891 * 5) When the last slave disconnects, flush the cache.
2892 * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
2893 * in the master sometimes.
2894 */
2895
2896 /* Initialize the script cache, only called at startup. */
replicationScriptCacheInit(void)2897 void replicationScriptCacheInit(void) {
2898 server.repl_scriptcache_size = 10000;
2899 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
2900 server.repl_scriptcache_fifo = listCreate();
2901 }
2902
2903 /* Empty the script cache. Should be called every time we are no longer sure
2904 * that every slave knows about all the scripts in our set, or when the
2905 * current AOF "context" is no longer aware of the script. In general we
2906 * should flush the cache:
2907 *
2908 * 1) Every time a new slave reconnects to this master and performs a
2909 * full SYNC (PSYNC does not require flushing).
2910 * 2) Every time an AOF rewrite is performed.
2911 * 3) Every time we are left without slaves at all, and AOF is off, in order
2912 * to reclaim otherwise unused memory.
2913 */
replicationScriptCacheFlush(void)2914 void replicationScriptCacheFlush(void) {
2915 dictEmpty(server.repl_scriptcache_dict,NULL);
2916 listRelease(server.repl_scriptcache_fifo);
2917 server.repl_scriptcache_fifo = listCreate();
2918 }
2919
2920 /* Add an entry into the script cache, if we reach max number of entries the
2921 * oldest is removed from the list. */
replicationScriptCacheAdd(sds sha1)2922 void replicationScriptCacheAdd(sds sha1) {
2923 int retval;
2924 sds key = sdsdup(sha1);
2925
2926 /* Evict oldest. */
2927 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
2928 {
2929 listNode *ln = listLast(server.repl_scriptcache_fifo);
2930 sds oldest = listNodeValue(ln);
2931
2932 retval = dictDelete(server.repl_scriptcache_dict,oldest);
2933 serverAssert(retval == DICT_OK);
2934 listDelNode(server.repl_scriptcache_fifo,ln);
2935 }
2936
2937 /* Add current. */
2938 retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
2939 listAddNodeHead(server.repl_scriptcache_fifo,key);
2940 serverAssert(retval == DICT_OK);
2941 }
2942
2943 /* Returns non-zero if the specified entry exists inside the cache, that is,
2944 * if all the slaves are aware of this script SHA1. */
replicationScriptCacheExists(sds sha1)2945 int replicationScriptCacheExists(sds sha1) {
2946 return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
2947 }
2948
2949 /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
2950 * Redis synchronous replication design can be summarized in points:
2951 *
2952 * - Redis masters have a global replication offset, used by PSYNC.
 * - Masters increment the offset every time new commands are sent to slaves.
2954 * - Slaves ping back masters with the offset processed so far.
2955 *
2956 * So synchronous replication adds a new WAIT command in the form:
2957 *
2958 * WAIT <num_replicas> <milliseconds_timeout>
2959 *
2960 * That returns the number of replicas that processed the query when
2961 * we finally have at least num_replicas, or when the timeout was
2962 * reached.
2963 *
2964 * The command is implemented in this way:
2965 *
2966 * - Every time a client processes a command, we remember the replication
2967 * offset after sending that command to the slaves.
2968 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
2969 * The client is blocked at the same time (see blocked.c).
2970 * - Once we receive enough ACKs for a given offset or when the timeout
2971 * is reached, the WAIT command is unblocked and the reply sent to the
2972 * client.
2973 */
2974
2975 /* This just set a flag so that we broadcast a REPLCONF GETACK command
2976 * to all the slaves in the beforeSleep() function. Note that this way
2977 * we "group" all the clients that want to wait for synchronous replication
2978 * in a given event loop iteration, and send a single GETACK for them all. */
replicationRequestAckFromSlaves(void)2979 void replicationRequestAckFromSlaves(void) {
2980 server.get_ack_from_slaves = 1;
2981 }
2982
2983 /* Return the number of slaves that already acknowledged the specified
2984 * replication offset. */
replicationCountAcksByOffset(long long offset)2985 int replicationCountAcksByOffset(long long offset) {
2986 listIter li;
2987 listNode *ln;
2988 int count = 0;
2989
2990 listRewind(server.slaves,&li);
2991 while((ln = listNext(&li))) {
2992 client *slave = ln->value;
2993
2994 if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2995 if (slave->repl_ack_off >= offset) count++;
2996 }
2997 return count;
2998 }
2999
3000 /* WAIT for N replicas to acknowledge the processing of our latest
3001 * write command (and all the previous commands). */
waitCommand(client * c)3002 void waitCommand(client *c) {
3003 mstime_t timeout;
3004 long numreplicas, ackreplicas;
3005 long long offset = c->woff;
3006
3007 if (server.masterhost) {
3008 addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated.");
3009 return;
3010 }
3011
3012 /* Argument parsing. */
3013 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
3014 return;
3015 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
3016 != C_OK) return;
3017
3018 /* First try without blocking at all. */
3019 ackreplicas = replicationCountAcksByOffset(c->woff);
3020 if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
3021 addReplyLongLong(c,ackreplicas);
3022 return;
3023 }
3024
3025 /* Otherwise block the client and put it into our list of clients
3026 * waiting for ack from slaves. */
3027 c->bpop.timeout = timeout;
3028 c->bpop.reploffset = offset;
3029 c->bpop.numreplicas = numreplicas;
3030 listAddNodeTail(server.clients_waiting_acks,c);
3031 blockClient(c,BLOCKED_WAIT);
3032
3033 /* Make sure that the server will send an ACK request to all the slaves
3034 * before returning to the event loop. */
3035 replicationRequestAckFromSlaves();
3036 }
3037
3038 /* This is called by unblockClient() to perform the blocking op type
3039 * specific cleanup. We just remove the client from the list of clients
3040 * waiting for replica acks. Never call it directly, call unblockClient()
3041 * instead. */
unblockClientWaitingReplicas(client * c)3042 void unblockClientWaitingReplicas(client *c) {
3043 listNode *ln = listSearchKey(server.clients_waiting_acks,c);
3044 serverAssert(ln != NULL);
3045 listDelNode(server.clients_waiting_acks,ln);
3046 }
3047
3048 /* Check if there are clients blocked in WAIT that can be unblocked since
3049 * we received enough ACKs from slaves. */
processClientsWaitingReplicas(void)3050 void processClientsWaitingReplicas(void) {
3051 long long last_offset = 0;
3052 int last_numreplicas = 0;
3053
3054 listIter li;
3055 listNode *ln;
3056
3057 listRewind(server.clients_waiting_acks,&li);
3058 while((ln = listNext(&li))) {
3059 client *c = ln->value;
3060
3061 /* Every time we find a client that is satisfied for a given
3062 * offset and number of replicas, we remember it so the next client
3063 * may be unblocked without calling replicationCountAcksByOffset()
3064 * if the requested offset / replicas were equal or less. */
3065 if (last_offset && last_offset > c->bpop.reploffset &&
3066 last_numreplicas > c->bpop.numreplicas)
3067 {
3068 unblockClient(c);
3069 addReplyLongLong(c,last_numreplicas);
3070 } else {
3071 int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
3072
3073 if (numreplicas >= c->bpop.numreplicas) {
3074 last_offset = c->bpop.reploffset;
3075 last_numreplicas = numreplicas;
3076 unblockClient(c);
3077 addReplyLongLong(c,numreplicas);
3078 }
3079 }
3080 }
3081 }
3082
3083 /* Return the slave replication offset for this instance, that is
3084 * the offset for which we already processed the master replication stream. */
replicationGetSlaveOffset(void)3085 long long replicationGetSlaveOffset(void) {
3086 long long offset = 0;
3087
3088 if (server.masterhost != NULL) {
3089 if (server.master) {
3090 offset = server.master->reploff;
3091 } else if (server.cached_master) {
3092 offset = server.cached_master->reploff;
3093 }
3094 }
3095 /* offset may be -1 when the master does not support it at all, however
3096 * this function is designed to return an offset that can express the
3097 * amount of data processed by the master, so we return a positive
3098 * integer. */
3099 if (offset < 0) offset = 0;
3100 return offset;
3101 }
3102
3103 /* --------------------------- REPLICATION CRON ---------------------------- */
3104
3105 /* Replication cron function, called 1 time per second. */
replicationCron(void)3106 void replicationCron(void) {
3107 static long long replication_cron_loops = 0;
3108
3109 /* Non blocking connection timeout? */
3110 if (server.masterhost &&
3111 (server.repl_state == REPL_STATE_CONNECTING ||
3112 slaveIsInHandshakeState()) &&
3113 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
3114 {
3115 serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
3116 cancelReplicationHandshake();
3117 }
3118
3119 /* Bulk transfer I/O timeout? */
3120 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
3121 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
3122 {
3123 serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
3124 cancelReplicationHandshake();
3125 }
3126
3127 /* Timed out master when we are an already connected slave? */
3128 if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
3129 (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
3130 {
3131 serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
3132 freeClient(server.master);
3133 }
3134
3135 /* Check if we should connect to a MASTER */
3136 if (server.repl_state == REPL_STATE_CONNECT) {
3137 serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
3138 server.masterhost, server.masterport);
3139 if (connectWithMaster() == C_OK) {
3140 serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
3141 }
3142 }
3143
3144 /* Send ACK to master from time to time.
3145 * Note that we do not send periodic acks to masters that don't
3146 * support PSYNC and replication offsets. */
3147 if (server.masterhost && server.master &&
3148 !(server.master->flags & CLIENT_PRE_PSYNC))
3149 replicationSendAck();
3150
3151 /* If we have attached slaves, PING them from time to time.
3152 * So slaves can implement an explicit timeout to masters, and will
3153 * be able to detect a link disconnection even if the TCP connection
3154 * will not actually go down. */
3155 listIter li;
3156 listNode *ln;
3157 robj *ping_argv[1];
3158
3159 /* First, send PING according to ping_slave_period. */
3160 if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
3161 listLength(server.slaves))
3162 {
3163 /* Note that we don't send the PING if the clients are paused during
3164 * a Redis Cluster manual failover: the PING we send will otherwise
3165 * alter the replication offsets of master and slave, and will no longer
3166 * match the one stored into 'mf_master_offset' state. */
3167 int manual_failover_in_progress =
3168 server.cluster_enabled &&
3169 server.cluster->mf_end &&
3170 clientsArePaused();
3171
3172 if (!manual_failover_in_progress) {
3173 ping_argv[0] = createStringObject("PING",4);
3174 replicationFeedSlaves(server.slaves, server.slaveseldb,
3175 ping_argv, 1);
3176 decrRefCount(ping_argv[0]);
3177 }
3178 }
3179
3180 /* Second, send a newline to all the slaves in pre-synchronization
3181 * stage, that is, slaves waiting for the master to create the RDB file.
3182 *
3183 * Also send a newline to all the chained slaves we have, if we lost
3184 * connection from our master, to keep the slaves aware that their
3185 * master is online. This is needed since sub-slaves only receive proxied
3186 * data from top-level masters, so there is no explicit pinging in order
3187 * to avoid altering the replication offsets. These special out-of-band
3188 * pings (newlines) can be sent; they will have no effect on the offset.
3189 *
3190 * The newline will be ignored by the slave but will refresh the
3191 * last interaction timer preventing a timeout. In this case we ignore the
3192 * ping period and refresh the connection once per second since certain
3193 * timeouts are set at a few seconds (example: PSYNC response). */
3194 listRewind(server.slaves,&li);
3195 while((ln = listNext(&li))) {
3196 client *slave = ln->value;
3197
3198 int is_presync =
3199 (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
3200 (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
3201 server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
3202
3203 if (is_presync) {
3204 connWrite(slave->conn, "\n", 1);
3205 }
3206 }
3207
3208 /* Disconnect timedout slaves. */
3209 if (listLength(server.slaves)) {
3210 listIter li;
3211 listNode *ln;
3212
3213 listRewind(server.slaves,&li);
3214 while((ln = listNext(&li))) {
3215 client *slave = ln->value;
3216
3217 if (slave->replstate == SLAVE_STATE_ONLINE) {
3218 if (slave->flags & CLIENT_PRE_PSYNC)
3219 continue;
3220 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
3221 serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
3222 replicationGetSlaveName(slave));
3223 freeClient(slave);
3224 continue;
3225 }
3226 }
3227 /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
3228 * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
3229 * from terminating. */
3230 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
3231 if (slave->repl_last_partial_write != 0 &&
3232 (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
3233 {
3234 serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
3235 replicationGetSlaveName(slave));
3236 freeClient(slave);
3237 continue;
3238 }
3239 }
3240 }
3241 }
3242
3243 /* If this is a master without attached slaves and there is a replication
3244 * backlog active, in order to reclaim memory we can free it after some
3245 * (configured) time. Note that this cannot be done for slaves: slaves
3246 * without sub-slaves attached should still accumulate data into the
3247 * backlog, in order to reply to PSYNC queries if they are turned into
3248 * masters after a failover. */
3249 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
3250 server.repl_backlog && server.masterhost == NULL)
3251 {
3252 time_t idle = server.unixtime - server.repl_no_slaves_since;
3253
3254 if (idle > server.repl_backlog_time_limit) {
3255 /* When we free the backlog, we always use a new
3256 * replication ID and clear the ID2. This is needed
3257 * because when there is no backlog, the master_repl_offset
3258 * is not updated, but we would still retain our replication
3259 * ID, leading to the following problem:
3260 *
3261 * 1. We are a master instance.
3262 * 2. Our slave is promoted to master. Its repl-id-2 will
3263 * be the same as our repl-id.
3264 * 3. We, yet as master, receive some updates, that will not
3265 * increment the master_repl_offset.
3266 * 4. Later we are turned into a slave, connect to the new
3267 * master that will accept our PSYNC request by second
3268 * replication ID, but there will be data inconsistency
3269 * because we received writes. */
3270 changeReplicationId();
3271 clearReplicationId2();
3272 freeReplicationBacklog();
3273 serverLog(LL_NOTICE,
3274 "Replication backlog freed after %d seconds "
3275 "without connected replicas.",
3276 (int) server.repl_backlog_time_limit);
3277 }
3278 }
3279
3280 /* If AOF is disabled and we no longer have attached slaves, we can
3281 * free our Replication Script Cache as there is no need to propagate
3282 * EVALSHA at all. */
3283 if (listLength(server.slaves) == 0 &&
3284 server.aof_state == AOF_OFF &&
3285 listLength(server.repl_scriptcache_fifo) != 0)
3286 {
3287 replicationScriptCacheFlush();
3288 }
3289
3290 /* Start a BGSAVE good for replication if we have slaves in
3291 * WAIT_BGSAVE_START state.
3292 *
3293 * In case of diskless replication, we make sure to wait the specified
3294 * number of seconds (according to configuration) so that other slaves
3295 * have the time to arrive before we start streaming. */
3296 if (!hasActiveChildProcess()) {
3297 time_t idle, max_idle = 0;
3298 int slaves_waiting = 0;
3299 int mincapa = -1;
3300 listNode *ln;
3301 listIter li;
3302
3303 listRewind(server.slaves,&li);
3304 while((ln = listNext(&li))) {
3305 client *slave = ln->value;
3306 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
3307 idle = server.unixtime - slave->lastinteraction;
3308 if (idle > max_idle) max_idle = idle;
3309 slaves_waiting++;
3310 mincapa = (mincapa == -1) ? slave->slave_capa :
3311 (mincapa & slave->slave_capa);
3312 }
3313 }
3314
3315 if (slaves_waiting &&
3316 (!server.repl_diskless_sync ||
3317 max_idle > server.repl_diskless_sync_delay))
3318 {
3319 /* Start the BGSAVE. The called function may start a
3320 * BGSAVE with socket target or disk target depending on the
3321 * configuration and slaves capabilities. */
3322 startBgsaveForReplication(mincapa);
3323 }
3324 }
3325
3326 /* Remove the RDB file used for replication if Redis is not running
3327 * with any persistence. */
3328 removeRDBUsedToSyncReplicas();
3329
3330 /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
3331 refreshGoodSlavesCount();
3332 replication_cron_loops++; /* Incremented with frequency 1 HZ. */
3333 }
3334