/* Redis Cluster implementation.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "cluster.h"
#include "endianconv.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>

/* A global reference to myself is handy to make code more clear.
 * Myself always points to server.cluster->myself, that is, the clusterNode
 * that represents this node. */
clusterNode *myself = NULL;

clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(connection *conn);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter);
clusterNode *clusterLookupNode(const char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);

#define RCVBUF_INIT_LEN 1024
#define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */

/* -----------------------------------------------------------------------------
 * Initialization
 * -------------------------------------------------------------------------- */

/* Load the cluster config from 'filename'.
 *
 * If the file does not exist or is zero-length (this may happen because
 * when we lock the nodes.conf file, we create a zero-length one for the
 * sake of locking if it does not already exist), C_ERR is returned.
 * If the configuration was loaded from the file, C_OK is returned. */
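/* For reference, each line of nodes.conf uses the same format as a line of
 * CLUSTER NODES output, which is what the parser below expects:
 *
 *   <id> <ip:port@cport> <flags> <master-id|-> <ping-sent> <pong-recv>
 *   <config-epoch> <link-state> <slot> <slot> ...
 *
 * where each slot item is a single number, a range like 0-5460, or a
 * migrating / importing marker like [93->-<id>] or [93-<-<id>].
 * The two lines below are only an illustrative sketch (IDs, addresses and
 * epochs are invented, not taken from a real deployment):
 *
 *   e7d1...9ca 127.0.0.1:30001@40001 myself,master - 0 0 1 connected 0-5460
 *   07c3...b91 127.0.0.1:30004@40004 slave e7d1...9ca 0 1426238317239 4 connected
 */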
int clusterLoadConfig(char *filename) {
    FILE *fp = fopen(filename,"r");
    struct stat sb;
    char *line;
    int maxline, j;

    if (fp == NULL) {
        if (errno == ENOENT) {
            return C_ERR;
        } else {
            serverLog(LL_WARNING,
                "Loading the cluster node config from %s: %s",
                filename, strerror(errno));
            exit(1);
        }
    }

    /* Check if the file is zero-length: if so return C_ERR to signal
     * we have to write the config. */
    if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        fclose(fp);
        return C_ERR;
    }

    /* Parse the file. Note that single lines of the cluster config file can
     * be really long as they include all the hash slots of the node.
     * This means that, in the worst possible case, half of the Redis slots
     * will be present in a single line, possibly in importing or migrating
     * state, together with the node ID of the sender/receiver.
     *
     * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
    maxline = 1024+CLUSTER_SLOTS*128;
    line = zmalloc(maxline);
    while(fgets(line,maxline,fp) != NULL) {
        int argc;
        sds *argv;
        clusterNode *n, *master;
        char *p, *s;

        /* Skip blank lines, they can be created either by users manually
         * editing nodes.conf or by the config writing process if stopped
         * before the truncate() call. */
        if (line[0] == '\n' || line[0] == '\0') continue;

        /* Split the line into arguments for processing. */
        argv = sdssplitargs(line,&argc);
        if (argv == NULL) goto fmterr;

        /* Handle the special "vars" line. Don't assume it is the last
         * line even if it actually is when generated by Redis. */
        if (strcasecmp(argv[0],"vars") == 0) {
            if (!(argc % 2)) goto fmterr;
            for (j = 1; j < argc; j += 2) {
                if (strcasecmp(argv[j],"currentEpoch") == 0) {
                    server.cluster->currentEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
                    server.cluster->lastVoteEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else {
                    serverLog(LL_WARNING,
                        "Skipping unknown cluster config variable '%s'",
                        argv[j]);
                }
            }
            sdsfreesplitres(argv,argc);
            continue;
        }

        /* Regular config lines have at least eight fields */
        if (argc < 8) {
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }

        /* Create this node if it does not exist */
        n = clusterLookupNode(argv[0]);
        if (!n) {
            n = createClusterNode(argv[0],0);
            clusterAddNode(n);
        }
        /* Address and port */
        if ((p = strrchr(argv[1],':')) == NULL) {
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }
        *p = '\0';
        memcpy(n->ip,argv[1],strlen(argv[1])+1);
        char *port = p+1;
        char *busp = strchr(port,'@');
        if (busp) {
            *busp = '\0';
            busp++;
        }
        n->port = atoi(port);
        /* In older versions of nodes.conf the "@busport" part is missing.
         * In this case we set it to the default offset of 10000 from the
         * base port. */
        n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

        /* Parse flags */
        p = s = argv[2];
        while(p) {
            p = strchr(s,',');
            if (p) *p = '\0';
            if (!strcasecmp(s,"myself")) {
                serverAssert(server.cluster->myself == NULL);
                myself = server.cluster->myself = n;
                n->flags |= CLUSTER_NODE_MYSELF;
            } else if (!strcasecmp(s,"master")) {
                n->flags |= CLUSTER_NODE_MASTER;
            } else if (!strcasecmp(s,"slave")) {
                n->flags |= CLUSTER_NODE_SLAVE;
            } else if (!strcasecmp(s,"fail?")) {
                n->flags |= CLUSTER_NODE_PFAIL;
            } else if (!strcasecmp(s,"fail")) {
                n->flags |= CLUSTER_NODE_FAIL;
                n->fail_time = mstime();
            } else if (!strcasecmp(s,"handshake")) {
                n->flags |= CLUSTER_NODE_HANDSHAKE;
            } else if (!strcasecmp(s,"noaddr")) {
                n->flags |= CLUSTER_NODE_NOADDR;
            } else if (!strcasecmp(s,"nofailover")) {
                n->flags |= CLUSTER_NODE_NOFAILOVER;
            } else if (!strcasecmp(s,"noflags")) {
                /* nothing to do */
            } else {
                serverPanic("Unknown flag in redis cluster config file");
            }
            if (p) s = p+1;
        }

        /* Get master if any. Set the master and populate master's
         * slave list. */
        if (argv[3][0] != '-') {
            master = clusterLookupNode(argv[3]);
            if (!master) {
                master = createClusterNode(argv[3],0);
                clusterAddNode(master);
            }
            n->slaveof = master;
            clusterNodeAddSlave(master,n);
        }

        /* Set ping sent / pong received timestamps */
        if (atoi(argv[4])) n->ping_sent = mstime();
        if (atoi(argv[5])) n->pong_received = mstime();

        /* Set configEpoch for this node. */
        n->configEpoch = strtoull(argv[6],NULL,10);

        /* Populate hash slots served by this instance. */
        for (j = 8; j < argc; j++) {
            int start, stop;

            if (argv[j][0] == '[') {
                /* Here we handle migrating / importing slots */
                int slot;
                char direction;
                clusterNode *cn;

                p = strchr(argv[j],'-');
                serverAssert(p != NULL);
                *p = '\0';
                direction = p[1]; /* Either '>' or '<' */
                slot = atoi(argv[j]+1);
                if (slot < 0 || slot >= CLUSTER_SLOTS) {
                    sdsfreesplitres(argv,argc);
                    goto fmterr;
                }
                p += 3;
                cn = clusterLookupNode(p);
                if (!cn) {
                    cn = createClusterNode(p,0);
                    clusterAddNode(cn);
                }
                if (direction == '>') {
                    server.cluster->migrating_slots_to[slot] = cn;
                } else {
                    server.cluster->importing_slots_from[slot] = cn;
                }
                continue;
            } else if ((p = strchr(argv[j],'-')) != NULL) {
                *p = '\0';
                start = atoi(argv[j]);
                stop = atoi(p+1);
            } else {
                start = stop = atoi(argv[j]);
            }
            if (start < 0 || start >= CLUSTER_SLOTS ||
                stop < 0 || stop >= CLUSTER_SLOTS)
            {
                sdsfreesplitres(argv,argc);
                goto fmterr;
            }
            while(start <= stop) clusterAddSlot(n, start++);
        }

        sdsfreesplitres(argv,argc);
    }
    /* Config sanity check */
    if (server.cluster->myself == NULL) goto fmterr;

    zfree(line);
    fclose(fp);

    serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);

    /* Something that should never happen: currentEpoch smaller than
     * the max epoch found in the nodes configuration. However we handle this
     * as some form of protection against manual editing of critical files. */
    if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
        server.cluster->currentEpoch = clusterGetMaxEpoch();
    }
    return C_OK;

fmterr:
    serverLog(LL_WARNING,
        "Unrecoverable error: corrupted cluster config file.");
    zfree(line);
    if (fp) fclose(fp);
    exit(1);
}

/* Cluster node configuration is exactly the same as CLUSTER NODES output.
 *
 * This function writes the node config and returns 0, on error -1
 * is returned.
 *
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end with either the old file or the
 * new one. Since we have the full payload to write available we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger we pad our payload with newlines that are anyway ignored and truncate
 * the file afterward. */
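/* A rough sketch of the padding logic below (the sizes are made-up example
 * figures): if the pre-existing file is 100 bytes and the new payload is
 * 60 bytes, we grow the payload to 100 bytes, fill the last 40 with '\n',
 * write the whole 100 bytes with a single write(2), and finally ftruncate()
 * the file back to 60 bytes. A reader therefore sees either the old content
 * or the new one, never a partial mix of the two. */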
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch. */
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch);
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err;

    /* Pad the new payload if the existing file length is greater. */
    if (fstat(fd,&sb) != -1) {
        if (sb.st_size > (off_t)content_size) {
            ci = sdsgrowzero(ci,sb.st_size);
            memset(ci+content_size,'\n',sb.st_size-content_size);
        }
    }
    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        fsync(fd);
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd);
    sdsfree(ci);
    return -1;
}

void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) == -1) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
}

/* Lock the cluster config using flock(), leaking the file descriptor used
 * to acquire the lock so that the file will remain locked for as long as
 * the process is alive.
 *
 * This works because we always update nodes.conf in place, reopening the
 * file and writing to it (later adjusting the length with ftruncate()).
 *
 * On success C_OK is returned, otherwise an error is logged and
 * the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris
 * and a fcntl-based solution won't help, as we constantly re-open that file,
 * which will release _all_ locks anyway
 */
#if !defined(__sun)
    /* To lock it, we need to open the file in a way it is created if
     * it does not exist, otherwise there is a race condition with other
     * processes. */
    int fd = open(filename,O_WRONLY|O_CREAT,0644);
    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
        if (errno == EWOULDBLOCK) {
            serverLog(LL_WARNING,
                 "Sorry, the cluster configuration file %s is already used "
                 "by a different Redis Cluster node. Please make sure that "
                 "different nodes use different cluster configuration "
                 "files.", filename);
        } else {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        }
        close(fd);
        return C_ERR;
    }
    /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
     * lock to the file as long as the process exists.
     *
     * After a fork, the child process inherits the fd opened by the parent,
     * so we save 'fd' in 'cluster_config_file_lock_fd' so that redisFork()
     * can close it in the child process. If it were not closed and the main
     * process were killed with -9 while a child process (e.g. the
     * redis-aof-rewrite child) was still alive, the child would keep holding
     * the lock fd, and a restarted main process would fail to acquire the
     * lock, that is, fail to start. */
    server.cluster_config_file_lock_fd = fd;
#endif /* __sun */

    return C_OK;
}

/* Some flags (currently just the NOFAILOVER flag) may need to be updated
 * in the "myself" node based on the current configuration of the node,
 * that may change at runtime via CONFIG SET. This function changes the
 * set of flags in myself->flags accordingly. */
void clusterUpdateMyselfFlags(void) {
    int oldflags = myself->flags;
    int nofailover = server.cluster_slave_no_failover ?
                     CLUSTER_NODE_NOFAILOVER : 0;
    myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
    myself->flags |= nofailover;
    if (myself->flags != oldflags) {
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE);
    }
}

void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
    server.cluster->lastVoteEpoch = 0;
    for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
        server.cluster->stats_bus_messages_sent[i] = 0;
        server.cluster->stats_bus_messages_received[i] = 0;
    }
    server.cluster->stats_pfail_nodes = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */
    server.cluster_config_file_lock_fd = -1;
    if (clusterLockConfig(server.cluster_configfile) == C_ERR)
        exit(1);

    /* Load or create a new nodes configuration. */
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    server.cfd_count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    int port = server.tls_cluster ? server.tls_port : server.port;
    if (port > (65535-CLUSTER_PORT_INCR)) {
        serverLog(LL_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "no greater than 55535.");
        exit(1);
    }
    if (listenToPort(port+CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == C_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    serverPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    /* The slots -> keys map is a radix tree. Initialize it here. */
    server.cluster->slots_to_keys = raxNew();
    memset(server.cluster->slots_keys_count,0,
           sizeof(server.cluster->slots_keys_count));

    /* Set myself->port / cport to my listening ports, we'll just need to
     * discover the IP address via MEET messages. */
    myself->port = port;
    myself->cport = port+CLUSTER_PORT_INCR;
    if (server.cluster_announce_port)
        myself->port = server.cluster_announce_port;
    if (server.cluster_announce_bus_port)
        myself->cport = server.cluster_announce_bus_port;

    server.cluster->mf_end = 0;
    resetManualFailover();
    clusterUpdateMyselfFlags();
}

/* Reset a node performing a soft or hard reset:
 *
 * 1) All other nodes are forgotten.
 * 2) All the assigned / open slots are released.
 * 3) If the node is a slave, it turns into a master.
 * 4) Only for hard reset: a new Node ID is generated.
 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
 * 6) The new configuration is saved and the cluster state updated.
 * 7) If the node was a slave, the whole data set is flushed away. */
void clusterReset(int hard) {
    dictIterator *di;
    dictEntry *de;
    int j;

    /* Turn into master. */
    if (nodeIsSlave(myself)) {
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();
        emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
    }

    /* Close slots, reset manual failover state. */
    clusterCloseAllSlots();
    resetManualFailover();

    /* Unassign all the slots. */
    for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);

    /* Forget all the nodes, but myself. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == myself) continue;
        clusterDelNode(node);
    }
    dictReleaseIterator(di);

    /* Hard reset only: set epochs to 0, change node ID. */
    if (hard) {
        sds oldname;

        server.cluster->currentEpoch = 0;
        server.cluster->lastVoteEpoch = 0;
        myself->configEpoch = 0;
        serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

        /* To change the Node ID we need to remove the old name from the
         * nodes table, change the ID, and re-add back with new name. */
        oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
        dictDelete(server.cluster->nodes,oldname);
        sdsfree(oldname);
        getRandomHexChars(myself->name, CLUSTER_NAMELEN);
        clusterAddNode(myself);
        serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
    }

    /* Make sure to persist the new config and update the state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE|
                         CLUSTER_TODO_FSYNC_CONFIG);
}

/* -----------------------------------------------------------------------------
 * CLUSTER communication link
 * -------------------------------------------------------------------------- */

clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *link = zmalloc(sizeof(*link));
    link->ctime = mstime();
    link->sndbuf = sdsempty();
    link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
    link->rcvbuf_len = 0;
    link->node = node;
    link->conn = NULL;
    return link;
}

/* Free a cluster link, but does not free the associated node of course.
 * This function will just make sure that the original node associated
 * with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
    if (link->conn) {
        connClose(link->conn);
        link->conn = NULL;
    }
    sdsfree(link->sndbuf);
    zfree(link->rcvbuf);
    if (link->node)
        link->node->link = NULL;
    zfree(link);
}

static void clusterConnAcceptHandler(connection *conn) {
    clusterLink *link;

    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE,
                "Error accepting cluster node connection: %s", connGetLastError(conn));
        connClose(conn);
        return;
    }

    /* Create a link object we use to handle the connection.
     * It gets passed to the readable handler when data is available.
     * Initially the link->node pointer is set to NULL as we don't know
     * which node it is, but the right node will be referenced once we
     * know the node's identity. */
    link = createClusterLink(NULL);
    link->conn = conn;
    connSetPrivateData(conn, link);

    /* Register read handler */
    connSetReadHandler(conn, clusterReadHandler);
}

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }

        connection *conn = server.tls_cluster ?
            connCreateAcceptedTLS(cfd, TLS_CLIENT_AUTH_YES) : connCreateAcceptedSocket(cfd);

        /* Make sure connection is not in an error state */
        if (connGetState(conn) != CONN_STATE_ACCEPTING) {
            serverLog(LL_VERBOSE,
                "Error creating an accepting connection for cluster node: %s",
                    connGetLastError(conn));
            connClose(conn);
            return;
        }
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        connKeepAlive(conn,server.cluster_node_timeout * 2);

        /* Use non-blocking I/O for cluster messages. */
        serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);

        /* Accept the connection now.  connAccept() may call our handler directly
         * or schedule it for later depending on connection implementation.
         */
        if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) {
            if (connGetState(conn) == CONN_STATE_ERROR)
                serverLog(LL_VERBOSE,
                        "Error accepting cluster node connection: %s",
                        connGetLastError(conn));
            connClose(conn);
            return;
        }
    }
}

/* Return the approximate number of sockets we are using for the
 * cluster bus connections. */
unsigned long getClusterConnectionsCount(void) {
    /* We decrement the number of nodes by one, since there is the
     * "myself" node too in the list. Each node uses two file descriptors,
     * one incoming and one outgoing, thus the multiplication by 2. */
    return server.cluster_enabled ?
           ((dictSize(server.cluster->nodes)-1)*2) : 0;
}
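
/* As a made-up illustration of the estimate above: in a cluster known to
 * have 6 nodes, this node reports (6-1)*2 = 10 sockets devoted to the
 * cluster bus. */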

/* -----------------------------------------------------------------------------
 * Key space handling
 * -------------------------------------------------------------------------- */

/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
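
/* Illustrative examples of the hash tag rule above (the key names are made
 * up): "{user1000}.following" and "{user1000}.followers" hash only
 * "user1000", so they are guaranteed to map to the same slot, while a key
 * like "foo{}{bar}" has nothing between the first '{' and '}' and the whole
 * key is hashed, as in the base case. Note that masking with 0x3FFF is the
 * same as taking the CRC modulo 16384. */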

/* -----------------------------------------------------------------------------
 * CLUSTER node API
 * -------------------------------------------------------------------------- */

/* Create a new cluster node, with the specified flags.
 * If "nodename" is NULL this is considered a first handshake and a random
 * node name is assigned to this node (it will be fixed later when we'll
 * receive the first pong).
 *
 * The node is created and returned to the user, but it is not automatically
 * added to the nodes hash table. */
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));

    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime();
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->numslots = 0;
    node->numslaves = 0;
    node->slaves = NULL;
    node->slaveof = NULL;
    node->ping_sent = node->pong_received = 0;
    node->data_received = 0;
    node->fail_time = 0;
    node->link = NULL;
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->cport = 0;
    node->fail_reports = listCreate();
    node->voted_time = 0;
    node->orphaned_time = 0;
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

/* This function is called every time we get a failure report from a node.
 * The side effect is to populate the fail_reports list (or to update
 * the timestamp of an existing report).
 *
 * 'failing' is the node that is in failure state according to the
 * 'sender' node.
 *
 * The function returns 0 if it just updates a timestamp of an existing
 * failure report from the same sender. 1 is returned if a new failure
 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* If a failure report from the same sender already exists, just update
     * the timestamp. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }

    /* Otherwise create a new report. */
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}

/* Remove failure reports that are too old, where too old means reasonably
 * older than the global node timeout. Note that anyway for a node to be
 * flagged as FAIL we need to have a local PFAIL state that is at least
 * older than the global node timeout, so we don't just trust the number
 * of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    mstime_t maxtime = server.cluster_node_timeout *
                     CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}

/* Remove the failing report for 'node' if it was previously considered
 * failing by 'sender'. This function is called when a node informs us via
 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often as it gets called even
 * when there are no nodes failing, and is O(N), however when the cluster is
 * fine the failure reports list is empty so the function runs in constant
 * time.
 *
 * The function returns 1 if the failure report was found and removed.
 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* Search for a failure report from this sender. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) break;
    }
    if (!ln) return 0; /* No failure report from this sender. */

    /* Remove the failure report. */
    listDelNode(l,ln);
    clusterNodeCleanupFailureReports(node);
    return 1;
}

/* Return the number of external nodes that believe 'node' is failing,
 * not counting this node, which may hold a PFAIL or FAIL state for the
 * node as well. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}

int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int j;

    for (j = 0; j < master->numslaves; j++) {
        if (master->slaves[j] == slave) {
            if ((j+1) < master->numslaves) {
                int remaining_slaves = (master->numslaves - j) - 1;
                memmove(master->slaves+j,master->slaves+(j+1),
                        (sizeof(*master->slaves) * remaining_slaves));
            }
            master->numslaves--;
            if (master->numslaves == 0)
                master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
            return C_OK;
        }
    }
    return C_ERR;
}

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}

int clusterCountNonFailingSlaves(clusterNode *n) {
    int j, okslaves = 0;

    for (j = 0; j < n->numslaves; j++)
        if (!nodeFailed(n->slaves[j])) okslaves++;
    return okslaves;
}

/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) {
    sds nodename;
    int j;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (j = 0; j < n->numslaves; j++)
        n->slaves[j]->slaveof = NULL;

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes. */
    nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);

    /* Release link and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}

/* Add a node to the nodes hash table */
int clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    return (retval == DICT_OK) ? C_OK : C_ERR;
}

/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 *
 * 1) Mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
 */
void clusterDelNode(clusterNode *delnode) {
    int j;
    dictIterator *di;
    dictEntry *de;

    /* 1) Mark slots as unassigned. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (server.cluster->importing_slots_from[j] == delnode)
            server.cluster->importing_slots_from[j] = NULL;
        if (server.cluster->migrating_slots_to[j] == delnode)
            server.cluster->migrating_slots_to[j] = NULL;
        if (server.cluster->slots[j] == delnode)
            clusterDelSlot(j);
    }

    /* 2) Remove failure reports. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == delnode) continue;
        clusterNodeDelFailureReport(node,delnode);
    }
    dictReleaseIterator(di);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}

/* Node lookup by name */
clusterNode *clusterLookupNode(const char *name) {
    sds s = sdsnewlen(name, CLUSTER_NAMELEN);
    dictEntry *de;

    de = dictFind(server.cluster->nodes,s);
    sdsfree(s);
    if (de == NULL) return NULL;
    return dictGetVal(de);
}

/* This is only used after the handshake. When we connect a given IP/PORT
 * as a result of CLUSTER MEET we don't have the node name yet, so we
 * pick a random one, and will fix it when we receive the first PONG,
 * using this function. */
void clusterRenameNode(clusterNode *node, char *newname) {
    int retval;
    sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);
    retval = dictDelete(server.cluster->nodes, s);
    sdsfree(s);
    serverAssert(retval == DICT_OK);
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}

/* -----------------------------------------------------------------------------
 * CLUSTER config epoch handling
 * -------------------------------------------------------------------------- */

/* Return the greatest configEpoch found in the cluster, or the current
 * epoch if greater than any node configEpoch. */
uint64_t clusterGetMaxEpoch(void) {
    uint64_t max = 0;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->configEpoch > max) max = node->configEpoch;
    }
    dictReleaseIterator(di);
    if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
    return max;
}

/* If this node epoch is zero or is not already the greatest across the
 * cluster (from the POV of the local configuration), this function will:
 *
 * 1) Generate a new config epoch, incrementing the current epoch.
 * 2) Assign the new epoch to this node, WITHOUT any consensus.
 * 3) Persist the configuration on disk before sending packets with the
 *    new configuration.
 *
 * If the new config epoch is generated and assigned, C_OK is returned,
 * otherwise C_ERR is returned (since the node already has the greatest
 * configuration epoch around) and no operation is performed.
 *
 * Important note: this function violates the principle that config epochs
 * should be generated with consensus and should be unique across the cluster.
 * However Redis Cluster uses these auto-generated new config epochs in two
 * cases:
 *
 * 1) When slots are closed after importing. Otherwise resharding would be
 *    too expensive.
 * 2) When CLUSTER FAILOVER is called with options that force a slave to
 *    failover its master even if there is no master majority able to
 *    create a new configuration epoch.
 *
 * Redis Cluster will not explode using this function, even in the case of
 * a collision between this node and another node, generating the same
 * configuration epoch unilaterally, because the config epoch conflict
 * resolution algorithm will eventually move colliding nodes to different
 * config epochs. However using this function may violate the "last failover
 * wins" rule, so it should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    if (myself->configEpoch == 0 ||
        myself->configEpoch != maxEpoch)
    {
        server.cluster->currentEpoch++;
        myself->configEpoch = server.cluster->currentEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);
        serverLog(LL_WARNING,
            "New configEpoch set to %llu",
            (unsigned long long) myself->configEpoch);
        return C_OK;
    } else {
        return C_ERR;
    }
}

/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 *
 * BACKGROUND
 *
 * It is not possible that different slaves get the same config
 * epoch during a failover election, because the slaves need to get voted
 * by a majority. However when we perform a manual resharding of the cluster
 * the node will assign a configuration epoch to itself without asking
 * for agreement. Usually resharding happens when the cluster is working well
 * and is supervised by the sysadmin, however it is possible for a failover
 * to happen exactly while the node we are resharding a slot to assigns itself
 * a new configuration epoch, but before it is able to propagate it.
 *
 * So technically it is possible in this condition that two nodes end with
 * the same configuration epoch.
 *
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 *
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end up with a different configEpoch at startup.
 *
 * In all the cases, we want a mechanism that resolves this issue automatically
 * as a safeguard. The same configuration epoch for masters serving different
 * sets of slots is not harmful, but it is harmful if the nodes end up serving
 * the same slots for some reason (manual errors or software bugs) without a
 * proper failover procedure.
 *
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 *
 * BEHAVIOR
 *
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it will assign itself
 * the greatest configuration epoch currently detected among nodes plus 1.
 *
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end with a different configuration epoch.
 */
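/* A made-up example of the behavior described above: masters A (Node ID
 * "aaa...") and B (Node ID "bbb...") both end up with configEpoch 5, and
 * currentEpoch is 5 as well. When A processes a packet from B it detects
 * the collision and, having the smaller ID, bumps currentEpoch to 6 and
 * takes configEpoch 6 for itself; B, having the greater ID, does nothing,
 * so after one exchange the epochs differ again. */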
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}

/* -----------------------------------------------------------------------------
 * CLUSTER nodes blacklist
 *
 * The nodes blacklist is just a way to ensure that a given node with a given
 * Node ID is not re-added before some time has elapsed (this time is specified
 * in seconds in CLUSTER_BLACKLIST_TTL).
 *
 * This is useful when we want to remove a node from the cluster completely:
 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
 * that even if we receive gossip messages from other nodes that still remember
 * about the node we want to remove, we don't re-add it before some time.
 *
 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
 * that redis-trib has 60 seconds to send CLUSTER FORGET messages to the nodes
 * in the cluster without dealing with the problem of other nodes re-adding
 * the node to the nodes we already sent the FORGET command to.
 *
 * The data structure used is a hash table with an sds string representing
 * the node ID as key, and the time when it is ok to re-add the node as
 * value.
 * -------------------------------------------------------------------------- */

#define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */
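
/* Sketch of the intended flow, per the description above: the CLUSTER FORGET
 * handler calls clusterBlacklistAddNode() on the node being removed, and
 * gossip processing calls clusterBlacklistExists() on received node IDs,
 * refusing to re-add a node whose blacklist entry has not expired yet. */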

/* Before the addNode() or Exists() operations we always remove expired
 * entries from the black list. This is an O(N) operation but it is not a
 * problem since add / exists operations are called very infrequently and
 * the hash table is supposed to contain very few elements at most.
 * However without the cleanup, during long uptime and with some automated
 * node add/removal procedures, entries could accumulate. */
void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(de);

        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}

/* Cleanup the blacklist and add a new node ID to the black list. */
void clusterBlacklistAddNode(clusterNode *node) {
    dictEntry *de;
    sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);

    clusterBlacklistCleanup();
    if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
        /* If the key was added, duplicate the sds string representation of
         * the key for the next lookup. We'll free it at the end. */
        id = sdsdup(id);
    }
    de = dictFind(server.cluster->nodes_black_list,id);
    dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
    sdsfree(id);
}

/* Return non-zero if the specified node ID exists in the blacklist.
 * You don't need to pass an sds string here, any pointer to 40 bytes
 * will work. */
int clusterBlacklistExists(char *nodeid) {
    sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int retval;

    clusterBlacklistCleanup();
    retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
    sdsfree(id);
    return retval;
}

/* -----------------------------------------------------------------------------
 * CLUSTER messages exchange - PING/PONG and gossip
 * -------------------------------------------------------------------------- */

/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected we also inform the whole cluster about this
 * event trying to force every other node to set the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the
 * majority of masters' state over some time, and even if we force agreement
 * by propagating the FAIL message, because of partitions we may not reach
 * every node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will propagate
 *    to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and the
 *    FAIL flag will be cleared after some time.
 */
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1;

    if (!nodeTimedOut(node)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL.
     * We do that even if this node is a replica and not a master: anyway
     * the failing state is triggered collecting failure reports from masters,
     * so here the replica is only helping propagating this status. */
    clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

/* This function is called only if a node is marked as FAIL, but we are able
 * to reach it again. It checks if there are the conditions to undo the FAIL
 * state. */
void clearNodeFailureIfNeeded(clusterNode *node) {
    mstime_t now = mstime();

    serverAssert(nodeFailed(node));

    /* For slaves we always clear the FAIL flag if we can contact the
     * node again. */
    if (nodeIsSlave(node) || node->numslots == 0) {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: %s is reachable again.",
                node->name,
                nodeIsSlave(node) ? "replica" : "master without slots");
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }

    /* If it is a master and...
     * 1) The FAIL state is old enough.
     * 2) It is still serving slots from our point of view (not failed over).
     * Apparently no one is going to fix these slots, so clear the FAIL flag. */
    if (nodeIsMaster(node) && node->numslots > 0 &&
        (now - node->fail_time) >
        (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
    {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
                node->name);
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }
}
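
/* Illustrative timing for the undo condition above: assuming
 * CLUSTER_FAIL_UNDO_TIME_MULT is 2 (see cluster.h for the actual value)
 * and cluster-node-timeout is 15000 ms, the FAIL flag of a master still
 * claiming slots is cleared only when it is reachable again and
 *
 *   now - node->fail_time > 15000 * 2   // more than 30 seconds
 *
 * elapsed without anybody taking over its slots. */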

/* Return true if we already have a node in HANDSHAKE state matching the
 * specified ip address and port number. This function is used in order to
 * avoid adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port, int cport) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!nodeInHandshake(node)) continue;
        if (!strcasecmp(node->ip,ip) &&
            node->port == port &&
            node->cport == cport) break;
    }
    dictReleaseIterator(di);
    return de != NULL;
}

/* Start a handshake with the specified address if there is not one
 * already in progress. Returns non-zero if the handshake was actually
 * started. On error zero is returned and errno is set to one of the
 * following values:
 *
 * EAGAIN - There is already a handshake in progress for this address.
 * EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port, int cport) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;

    /* IP sanity check */
    if (inet_pton(AF_INET,ip,
            &(((struct sockaddr_in *)&sa)->sin_addr)))
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
            &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }

    /* Port sanity check */
    if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
        errno = EINVAL;
        return 0;
    }

    /* Set norm_ip as the normalized string representation of the node
     * IP address. */
    memset(norm_ip,0,NET_IP_STR_LEN);
    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,NET_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,NET_IP_STR_LEN);

    if (clusterHandshakeInProgress(norm_ip,port,cport)) {
        errno = EAGAIN;
        return 0;
    }

    /* Add the node with a random address (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handshake. */
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    n->cport = cport;
    clusterAddNode(n);
    return 1;
}
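
/* A minimal usage sketch for clusterStartHandshake() from a hypothetical
 * command implementation (the reply messages are illustrative, not the
 * actual ones used by Redis):
 *
 *   if (clusterStartHandshake(ip,port,cport) == 0 && errno == EINVAL) {
 *       addReplyError(c,"Invalid node address specified");
 *   } else {
 *       addReply(c,shared.ok); // EAGAIN (handshake in progress) is OK too.
 *   }
 */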

/* Process the gossip section of PING or PONG packets.
 * Note that this function assumes that the caller already sanity-checked
 * the packet: not the content of the gossip section, but its length. */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        if (server.verbosity == LL_DEBUG) {
            ci = representClusterNodeFlags(sdsempty(), flags);
            serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
                g->nodename,
                g->ip,
                ntohs(g->port),
                ntohs(g->cport),
                ci);
            sdsfree(ci);
        }

        /* Update our state according to the gossip sections */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, but only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If from our POV the node is up (no failure flags are set),
             * we have no pending ping for the node, and we have no failure
             * reports for this node, update the last pong time with the
             * one we see from the other nodes. */
            if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                node->ping_sent == 0 &&
                clusterNodeFailureReportsCount(node) == 0)
            {
                mstime_t pongtime = ntohl(g->pong_received);
                pongtime *= 1000; /* Convert back to milliseconds. */

                /* Replace the pong time with the received one only if
                 * it's greater than our view but is not in the future
                 * (with 500 milliseconds tolerance) from the POV of our
                 * clock. */
                if (pongtime <= (server.mstime+500) &&
                    pongtime > node->pong_received)
                {
                    node->pong_received = pongtime;
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section of a node that
             * can talk with this other node, update the address, disconnect
             * the old link if any, so that we'll attempt to connect with the
             * new address. */
            if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                (strcasecmp(node->ip,g->ip) ||
                 node->port != ntohs(g->port) ||
                 node->cport != ntohs(g->cport)))
            {
                if (node->link) freeClusterLink(node->link);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                node->flags &= ~CLUSTER_NODE_NOADDR;
            }
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * add it to our trusted dict with the exact node ID and flags.
             * Note that we cannot simply start a handshake against
             * this IP/port pair, since the address may already have been
             * reused, in which case we would risk joining another cluster.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterNode *node;
                node = createClusterNode(g->nodename, flags);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                clusterAddNode(node);
            }
        }

        /* Next node */
        g++;
    }
}
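
/* For reference, the wire layout of each gossip entry walked above, as
 * reconstructed from the field accesses in this file (cluster.h holds the
 * authoritative definition):
 *
 *   typedef struct {
 *       char nodename[CLUSTER_NAMELEN]; // Node ID.
 *       uint32_t ping_sent;             // Seconds, network byte order.
 *       uint32_t pong_received;         // Seconds, network byte order.
 *       char ip[NET_IP_STR_LEN];        // Last known IP address.
 *       uint16_t port;                  // Base (client) port.
 *       uint16_t cport;                 // Cluster bus port.
 *       uint16_t flags;                 // Copy of node->flags.
 *       uint16_t notused1;
 *   } clusterMsgDataGossip;
 */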

/* IP -> string conversion. 'buf' is supposed to be at least 46 bytes
 * (NET_IP_STR_LEN). If 'announced_ip' length is non-zero, it is used instead
 * of extracting the IP from the socket peer address. */
void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
    if (announced_ip[0] != '\0') {
        memcpy(buf,announced_ip,NET_IP_STR_LEN);
        buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
    } else {
        connPeerToString(link->conn, buf, NET_IP_STR_LEN, NULL);
    }
}

/* Update the node address to the IP address that can be extracted
 * from link->fd, or if hdr->myip is non empty, to the address the node
 * is announcing to us. The port is taken from the packet header as well.
 *
 * If the address or port changed, disconnect the node link so that we'll
 * connect again to the new address.
 *
 * If the IP/port pair is already correct, no operation is performed at
 * all.
 *
 * The function returns 0 if the node address is still the same,
 * otherwise 1 is returned. */
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
                              clusterMsg *hdr)
{
    char ip[NET_IP_STR_LEN] = {0};
    int port = ntohs(hdr->port);
    int cport = ntohs(hdr->cport);

    /* We don't proceed if the link is the same as the sender link, as this
     * function is designed to see if the node link is consistent with the
     * symmetric link that is used to receive PINGs from the node.
     *
     * As a side effect this function never frees the passed 'link', so
     * it is safe to call during packet processing. */
    if (link == node->link) return 0;

    nodeIp2String(ip,link,hdr->myip);
    if (node->port == port && node->cport == cport &&
        strcmp(ip,node->ip) == 0) return 0;

    /* IP / port is different, update it. */
    memcpy(node->ip,ip,sizeof(ip));
    node->port = port;
    node->cport = cport;
    if (node->link) freeClusterLink(node->link);
    node->flags &= ~CLUSTER_NODE_NOADDR;
    serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
        node->name, node->ip, node->port);

    /* Check if this is our master and we have to change the
     * replication target as well. */
    if (nodeIsSlave(myself) && myself->slaveof == node)
        replicationSetMaster(node->ip, node->port);
    return 1;
}

/* Reconfigure the specified node 'n' as a master. This function is called
 * to update the state of a node that we believed to be a slave but that is
 * now acting as a master. */
void clusterSetNodeAsMaster(clusterNode *n) {
    if (nodeIsMaster(n)) return;

    if (n->slaveof) {
        clusterNodeRemoveSlave(n->slaveof,n);
        if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
    }
    n->flags &= ~CLUSTER_NODE_SLAVE;
    n->flags |= CLUSTER_NODE_MASTER;
    n->slaveof = NULL;

    /* Update config and state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE);
}

/* This function is called when we receive a master configuration via a
 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of
 * the node, and the set of slots claimed under this configEpoch.
 *
 * What we do is to rebind the slots with a newer configuration compared to
 * our local configuration, and if needed, we turn ourselves into a replica
 * of the node (see the function comments for more info).
 *
 * The 'sender' is the node for which we received a configuration update.
 * Sometimes it is not actually the "sender" of the information, as in the
 * case we receive the info via an UPDATE packet. */
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    int j;
    clusterNode *curmaster, *newmaster = NULL;
    /* The dirty slots list is a list of slots for which we lose ownership
     * while still having keys inside. This usually happens after a failover
     * or after a manual cluster reconfiguration operated by the admin.
     *
     * If the update message is not able to demote a master to slave (in this
     * case we'll resync with the master updating the whole key space), we
     * need to delete all the keys in the slots we lost ownership of. */
    uint16_t dirty_slots[CLUSTER_SLOTS];
    int dirty_slots_count = 0;

    /* Here we set curmaster to this node, or to the node this node
     * replicates if it's a slave. In the for loop we are interested in
     * checking whether slots are taken away from curmaster. */
    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }

    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(slots,j)) {
            /* The slot is already bound to the sender of this message. */
            if (server.cluster->slots[j] == sender) continue;

            /* The slot is in importing state, it should be modified only
             * manually via redis-trib (example: a resharding is in progress
             * and the migrating side slot was already closed and is advertising
             * a new config. We still want the slot to be closed manually). */
            if (server.cluster->importing_slots_from[j]) continue;

            /* We rebind the slot to the new node claiming it if:
             * 1) The slot was unassigned or the new node claims it with a
             *    greater configEpoch.
             * 2) We are not currently importing the slot. */
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch)
            {
                /* Was this slot mine, and still contains keys? Mark it as
                 * a dirty slot. */
                if (server.cluster->slots[j] == myself &&
                    countKeysInSlot(j) &&
                    sender != myself)
                {
                    dirty_slots[dirty_slots_count] = j;
                    dirty_slots_count++;
                }

                if (server.cluster->slots[j] == curmaster)
                    newmaster = sender;
                clusterDelSlot(j);
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }

    /* After updating the slots configuration, don't do any actual change
     * in the state of the server if a module disabled Redis Cluster
     * keys redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return;

    /* If at least one slot was reassigned from a node to another node
     * with a greater configEpoch, it is possible that:
     * 1) We are a master left without slots. This means that we were
     *    failed over and we should turn into a replica of the new
     *    master.
     * 2) We are a slave and our master is left without slots. We need
     *    to replicate to the new slots owner. */
    if (newmaster && curmaster->numslots == 0) {
        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (dirty_slots_count) {
        /* If we are here, we received an update message which removed
         * ownership for certain slots we still have keys for, but we are
         * still serving some slots, so this master node was not demoted to
         * a slave.
         *
         * In order to maintain a consistent state between keys and slots
         * we need to remove all the keys from the slots we lost. */
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);
    }
}
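
/* A concrete instance of the rebinding rule implemented above, with
 * illustrative values: suppose slot 100 is bound to node A with
 * configEpoch 5, and the sender B claims slot 100 with
 * senderConfigEpoch 7. Since 7 > 5 the slot is rebound:
 *
 *   clusterDelSlot(100);
 *   clusterAddSlot(B,100);
 *
 * Had B claimed it with configEpoch 4 instead, the claim would be
 * ignored, and the reverse check in clusterProcessPacket() would send B
 * an UPDATE message about the newer owner. */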

/* When this function is called, there is a packet to process starting
 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
 * function should just handle the higher level stuff of processing the
 * packet, modifying the cluster state if needed.
 *
 * The function returns 1 if the link is still valid after the packet
 * was processed, otherwise 0 if the link was freed since the packet
 * processing led to some inconsistency error (for instance a PONG
 * received from the wrong sender ID). */
int clusterProcessPacket(clusterLink *link) {
    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
    uint32_t totlen = ntohl(hdr->totlen);
    uint16_t type = ntohs(hdr->type);
    mstime_t now = mstime();

    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_received[type]++;
    serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
        type, (unsigned long) totlen);

    /* Perform sanity checks */
    if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
    if (totlen > link->rcvbuf_len) return 1;

    if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
        /* Can't handle messages of different versions. */
        return 1;
    }

    uint16_t flags = ntohs(hdr->flags);
    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
    clusterNode *sender;

    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        uint16_t count = ntohs(hdr->count);
        uint32_t explen; /* expected length of this packet */

        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += (sizeof(clusterMsgDataGossip)*count);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataFail);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataPublish) -
                8 +
                ntohl(hdr->data.publish.msg.channel_len) +
                ntohl(hdr->data.publish.msg.message_len);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
               type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
               type == CLUSTERMSG_TYPE_MFSTART)
    {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataUpdate);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgModule) -
                3 + ntohl(hdr->data.module.msg.len);
        if (totlen != explen) return 1;
    }

    /* Check if the sender is a known node. Note that for incoming connections
     * we don't store link->node information, but resolve the node by the
     * ID in the header each time in the current implementation. */
    sender = clusterLookupNode(hdr->sender);

    /* Update the last time we saw any data from this node. We
     * use this in order to avoid detecting a timeout from a node that
     * is just sending a lot of data in the cluster bus, for instance
     * because of Pub/Sub. */
    if (sender) sender->data_received = now;

    if (sender && !nodeInHandshake(sender)) {
        /* Update our currentEpoch if we see a newer epoch in the cluster. */
        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
        senderConfigEpoch = ntohu64(hdr->configEpoch);
        if (senderCurrentEpoch > server.cluster->currentEpoch)
            server.cluster->currentEpoch = senderCurrentEpoch;
        /* Update the sender configEpoch if it is publishing a newer one. */
        if (senderConfigEpoch > sender->configEpoch) {
            sender->configEpoch = senderConfigEpoch;
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_FSYNC_CONFIG);
        }
        /* Update the replication offset info for this node. */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = now;
        /* If we are a slave performing a manual failover and our master
         * sent its offset while already paused, populate the MF state. */
        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            serverLog(LL_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }

    /* Initial processing of PING and MEET requests replying with a PONG. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);

        /* We use incoming MEET messages in order to set the address
         * for 'myself', since only other cluster nodes will send us
         * MEET messages on handshakes, when the cluster joins, or
         * later if we changed address, and those nodes will use our
         * official address to connect to us. So obtaining this address
         * from the socket is a simple way to discover / update our own
         * address in the cluster without it being hardcoded in the config.
         *
         * However if we don't have an address at all, we update the address
         * even with a normal PING packet. If it's wrong it will be fixed
         * by MEET later. */
        if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
            server.cluster_announce_ip == NULL)
        {
            char ip[NET_IP_STR_LEN];

            if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
                strcmp(ip,myself->ip))
            {
                memcpy(myself->ip,ip,NET_IP_STR_LEN);
                serverLog(LL_WARNING,"IP address for this node updated to %s",
                    myself->ip);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }

        /* Add this node if it is new for us and the msg type is MEET.
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as these details will be
         * resolved when we'll receive PONGs from the node. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;

            node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link,hdr->myip);
            node->port = ntohs(hdr->port);
            node->cport = ntohs(hdr->cport);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }

        /* If this is a MEET packet from an unknown node, we still process
         * the gossip section here since we have to trust the sender because
         * of the message type. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET)
            clusterProcessGossipSection(hdr,link);

        /* In any case, reply with a PONG */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }

    /* PING, PONG, MEET: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        serverLog(LL_DEBUG,"%s packet received: %p",
            type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
            (void*)link->node);
        if (link->node) {
            if (nodeInHandshake(link->node)) {
                /* If we already have this node, try to change the
                 * IP/port of the node with the new one. */
                if (sender) {
                    serverLog(LL_VERBOSE,
                        "Handshake: we already know node %.40s, "
                        "updating the address if needed.", sender->name);
                    if (nodeUpdateAddressIfNeeded(sender,link,hdr))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                    /* Free this node as we already have it. This will
                     * cause the link to be freed as well. */
                    clusterDelNode(link->node);
                    return 0;
                }

                /* First thing to do is replacing the random name with the
                 * right node name if this was a handshake stage. */
                clusterRenameNode(link->node, hdr->sender);
                serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
                    link->node->name);
                link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
                link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            } else if (memcmp(link->node->name,hdr->sender,
                        CLUSTER_NAMELEN) != 0)
            {
                /* If the reply has a non matching node ID we
                 * disconnect this node and set it as not having an associated
                 * address. */
                serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
                    link->node->name,
                    (int)(now-(link->node->ctime)),
                    link->node->flags);
                link->node->flags |= CLUSTER_NODE_NOADDR;
                link->node->ip[0] = '\0';
                link->node->port = 0;
                link->node->cport = 0;
                freeClusterLink(link);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                return 0;
            }
        }

        /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
         * announced. This is a dynamic flag that we receive from the
         * sender, and the latest status must be trusted. We need it to
         * be propagated because the slave ranking, used to compute the
         * delay of each slave in the voting process, needs to know
         * which instances are really competing. */
        if (sender) {
            int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
            sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
            sender->flags |= nofailover;
        }

        /* Update the node address if it changed. */
        if (sender && type == CLUSTERMSG_TYPE_PING &&
            !nodeInHandshake(sender) &&
            nodeUpdateAddressIfNeeded(sender,link,hdr))
        {
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_UPDATE_STATE);
        }

        /* Update our info about the node */
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = now;
            link->node->ping_sent = 0;

            /* The PFAIL condition can be reversed without external
             * help if it is momentary (that is, if it does not
             * turn into a FAIL state).
             *
             * The FAIL condition is also reversible under specific
             * conditions detected by clearNodeFailureIfNeeded(). */
            if (nodeTimedOut(link->node)) {
                link->node->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            } else if (nodeFailed(link->node)) {
                clearNodeFailureIfNeeded(link->node);
            }
        }

        /* Check for role switch: slave -> master or master -> slave. */
        if (sender) {
            if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                /* Node is a master. */
                clusterSetNodeAsMaster(sender);
            } else {
                /* Node is a slave. */
                clusterNode *master = clusterLookupNode(hdr->slaveof);

                if (nodeIsMaster(sender)) {
                    /* Master turned into a slave! Reconfigure the node. */
                    clusterDelNodeSlots(sender);
                    sender->flags &= ~(CLUSTER_NODE_MASTER|
                                       CLUSTER_NODE_MIGRATE_TO);
                    sender->flags |= CLUSTER_NODE_SLAVE;

                    /* Update config and state. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                         CLUSTER_TODO_UPDATE_STATE);
                }

                /* Master node changed for this slave? */
                if (master && sender->slaveof != master) {
                    if (sender->slaveof)
                        clusterNodeRemoveSlave(sender->slaveof,sender);
                    clusterNodeAddSlave(master,sender);
                    sender->slaveof = master;

                    /* Update config. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                }
            }
        }

        /* Update our info about served slots.
         *
         * Note: this MUST happen after we update the master/slave state
         * so that CLUSTER_NODE_MASTER flag will be set. */

        /* Many checks are only needed if the set of served slots this
         * instance claims is different compared to the set of slots we have
         * for it. Check this ASAP to avoid other computationally expensive
         * checks later. */
        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1) If the sender of the message is a master, and we detected that
         *    the set of slots it claims changed, scan the slots to see if we
         *    need to update our configuration. */
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) We also check for the reverse condition, that is, the sender
         *    claims to serve slots we know are served by a master with a
         *    greater configEpoch. If this happens we inform the sender.
         *
         * This is useful because sometimes after a partition heals, a
         * reappearing master may be the last one to claim a given set of
         * hash slots, but with a configuration that other instances know to
         * be deprecated. Example:
         *
         * A and B are master and slave for slots 1,2,3.
         * A is partitioned away, B gets promoted.
         * B is partitioned away, and A returns available.
         *
         * Usually B would PING A publishing its set of served slots and its
         * configEpoch, but because of the partition B can't inform A of the
         * new configuration, so other nodes that have an updated table must
         * do it. In this way A will stop acting as a master (or can try to
         * fail over if the conditions to win the election are met). */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < CLUSTER_SLOTS; j++) {
                if (bitmapTestBit(hdr->myslots,j)) {
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        serverLog(LL_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s",
                                sender->name, server.cluster->slots[j]->name);
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new owner
                         * of sender's slots. */
                        break;
                    }
                }
            }
        }

        /* If our config epoch collides with the sender's try to fix
         * the problem. */
        if (sender &&
            nodeIsMaster(myself) && nodeIsMaster(sender) &&
            senderConfigEpoch == myself->configEpoch)
        {
            clusterHandleConfigEpochCollision(sender);
        }

        /* Get info from the gossip section */
        if (sender) clusterProcessGossipSection(hdr,link);
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        clusterNode *failing;

        if (sender) {
            failing = clusterLookupNode(hdr->data.fail.about.nodename);
            if (failing &&
                !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
            {
                serverLog(LL_NOTICE,
                    "FAIL message received from %.40s about %.40s",
                    hdr->sender, hdr->data.fail.about.nodename);
                failing->flags |= CLUSTER_NODE_FAIL;
                failing->fail_time = now;
                failing->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            }
        } else {
            serverLog(LL_NOTICE,
                "Ignoring FAIL message from unknown node %.40s about %.40s",
                hdr->sender, hdr->data.fail.about.nodename);
        }
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        robj *channel, *message;
        uint32_t channel_len, message_len;

        /* Don't bother creating useless objects if there are no
         * Pub/Sub subscribers. */
        if (dictSize(server.pubsub_channels) ||
           listLength(server.pubsub_patterns))
        {
            channel_len = ntohl(hdr->data.publish.msg.channel_len);
            message_len = ntohl(hdr->data.publish.msg.message_len);
            channel = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
            message = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data+channel_len,
                        message_len);
            pubsubPublishMessage(channel,message);
            decrRefCount(channel);
            decrRefCount(message);
        }
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        if (!sender) return 1;  /* We don't know that node. */
        clusterSendFailoverAuthIfNeeded(sender,hdr);
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1;  /* We don't know that node. */
        /* We consider this vote only if the sender is a master serving
         * a non zero number of slots, and its currentEpoch is greater than
         * or equal to the epoch at which this node started the election. */
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            server.cluster->failover_auth_count++;
            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
    } else if (type == CLUSTERMSG_TYPE_MFSTART) {
        /* This message is acceptable only if I'm a master and the sender
         * is one of my slaves. */
        if (!sender || sender->slaveof != myself) return 1;
        /* Manual failover requested from slaves. Initialize the state
         * accordingly. */
        resetManualFailover();
        server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
        serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
            sender->name);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        clusterNode *n; /* The node the update is about. */
        uint64_t reportedConfigEpoch =
                    ntohu64(hdr->data.update.nodecfg.configEpoch);

        if (!sender) return 1;  /* We don't know the sender. */
        n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
        if (!n) return 1;   /* We don't know the reported node. */
        if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

        /* If in our current config the node is a slave, set it as a master. */
        if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

        /* Update the node's configEpoch. */
        n->configEpoch = reportedConfigEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);

        /* Check the bitmap of served slots and update our
         * config accordingly. */
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
            hdr->data.update.nodecfg.slots);
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        if (!sender) return 1;  /* Protect the module from unknown nodes. */
        /* We need to route this message back to the right module subscribed
         * for the right message type. */
        uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
        uint32_t len = ntohl(hdr->data.module.msg.len);
        uint8_t type = hdr->data.module.msg.type;
        unsigned char *payload = hdr->data.module.msg.bulk_data;
        moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
    } else {
        serverLog(LL_WARNING,"Received unknown packet type: %d", type);
    }
    return 1;
}
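
/* An example of the length validation performed at the top of
 * clusterProcessPacket(), with an illustrative count: a PING carrying
 * three gossip entries is accepted only if its totlen is exactly
 *
 *   sizeof(clusterMsg) - sizeof(union clusterMsgData)
 *       + 3 * sizeof(clusterMsgDataGossip)
 *
 * Any other value means the packet is malformed: it is silently dropped
 * and the function returns 1, keeping the link alive. */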

/* This function is called when we detect that the link with this node is
 * lost. We set the node as no longer connected: clusterCron() will detect
 * the missing link and will try to reconnect.
 *
 * If instead the node is a temporary node used to accept a query, we
 * completely free it on error. */
void handleLinkIOError(clusterLink *link) {
    freeClusterLink(link);
}

/* Send data. This is handled using a trivial send buffer that gets
 * consumed by write(). We don't try to optimize this for speed too much
 * as this is a very low traffic channel. */
void clusterWriteHandler(connection *conn) {
    clusterLink *link = connGetPrivateData(conn);
    ssize_t nwritten;

    nwritten = connWrite(conn, link->sndbuf, sdslen(link->sndbuf));
    if (nwritten <= 0) {
        serverLog(LL_DEBUG,"I/O error writing to node link: %s",
            (nwritten == -1) ? connGetLastError(conn) : "short write");
        handleLinkIOError(link);
        return;
    }
    sdsrange(link->sndbuf,nwritten,-1);
    if (sdslen(link->sndbuf) == 0)
        connSetWriteHandler(link->conn, NULL);
}

/* A connect handler that gets called when a connection to another node
 * gets established.
 */
void clusterLinkConnectHandler(connection *conn) {
    clusterLink *link = connGetPrivateData(conn);
    clusterNode *node = link->node;

    /* Check if connection succeeded */
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
                node->name, node->ip, node->cport,
                connGetLastError(conn));
        freeClusterLink(link);
        return;
    }

    /* Register a read handler from now on */
    connSetReadHandler(conn, clusterReadHandler);

    /* Queue a PING in the new connection ASAP: this is crucial
     * to avoid false positives in failure detection.
     *
     * If the node is flagged as MEET, we send a MEET message instead
     * of a PING one, to force the receiver to add us in its node
     * table. */
    mstime_t old_ping_sent = node->ping_sent;
    clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
            CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
    if (old_ping_sent) {
        /* If there was an active ping before the link was
         * disconnected, we want to restore the ping time; otherwise it
         * would be replaced by the clusterSendPing() call. */
        node->ping_sent = old_ping_sent;
    }
    /* We can clear the flag after the first packet is sent.
     * If we'll never receive a PONG, we'll never send new packets
     * to this node. Instead after the PONG is received and we
     * are no longer in meet/handshake status, we want to send
     * normal PING packets. */
    node->flags &= ~CLUSTER_NODE_MEET;

    serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
            node->name, node->ip, node->cport);
}

/* Read data. Try to read the first field of the header first to check the
 * full length of the packet. When a whole packet is in memory this function
 * will call the function to process the packet. And so forth. */
void clusterReadHandler(connection *conn) {
    clusterMsg buf[1];
    ssize_t nread;
    clusterMsg *hdr;
    clusterLink *link = connGetPrivateData(conn);
    unsigned int readlen, rcvbuflen;

    while(1) { /* Read as long as there is data to read. */
        rcvbuflen = link->rcvbuf_len;
        if (rcvbuflen < 8) {
            /* First, obtain the first 8 bytes to get the full message
             * length. */
            readlen = 8 - rcvbuflen;
        } else {
            /* Finally read the full message. */
            hdr = (clusterMsg*) link->rcvbuf;
            if (rcvbuflen == 8) {
                /* Perform some sanity check on the message signature
                 * and length. */
                if (memcmp(hdr->sig,"RCmb",4) != 0 ||
                    ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
                {
                    serverLog(LL_WARNING,
                        "Bad message length or signature received "
                        "from Cluster bus.");
                    handleLinkIOError(link);
                    return;
                }
            }
            readlen = ntohl(hdr->totlen) - rcvbuflen;
            if (readlen > sizeof(buf)) readlen = sizeof(buf);
        }

        nread = connRead(conn,buf,readlen);
        if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; /* No more data ready. */

        if (nread <= 0) {
            /* I/O error... */
            serverLog(LL_DEBUG,"I/O error reading from node link: %s",
                (nread == 0) ? "connection closed" : connGetLastError(conn));
            handleLinkIOError(link);
            return;
        } else {
            /* Read data and recast the pointer to the new buffer. */
            size_t unused = link->rcvbuf_alloc - link->rcvbuf_len;
            if ((size_t)nread > unused) {
                size_t required = link->rcvbuf_len + nread;
                /* If less than 1mb, grow to twice the needed size, if larger grow by 1mb. */
                link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ?
                    required * 2 : required + RCVBUF_MAX_PREALLOC;
                link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc);
            }
            memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
            link->rcvbuf_len += nread;
            hdr = (clusterMsg*) link->rcvbuf;
            rcvbuflen += nread;
        }

        /* Total length obtained? Process this packet. */
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            if (clusterProcessPacket(link)) {
                if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
                    zfree(link->rcvbuf);
                    link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
                }
                link->rcvbuf_len = 0;
            } else {
                return; /* Link no longer valid. */
            }
        }
    }
}
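
/* The framing above relies on the first 8 bytes of every cluster bus
 * message being the 4 byte "RCmb" signature followed by the 32 bit total
 * length, which is why an 8 byte bootstrap read is enough to learn how
 * much more to read. A sketch of that prefix (field order as implied by
 * the checks above):
 *
 *   struct {
 *       char sig[4];      // "RCmb" (Redis Cluster message bus).
 *       uint32_t totlen;  // Total message length, network byte order.
 *   };
 */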

/* Put stuff into the send buffer.
 *
 * It is guaranteed that this function will never have as a side effect
 * the link to be invalidated, so it is safe to call this function
 * from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
    if (sdslen(link->sndbuf) == 0 && msglen != 0)
        connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

    /* Populate sent messages stats. */
    clusterMsg *hdr = (clusterMsg*) msg;
    uint16_t type = ntohs(hdr->type);
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_sent[type]++;
}

/* Send a message to all the nodes that are part of the cluster having
 * a connected link.
 *
 * It is guaranteed that this function will never have as a side effect
 * some node->link to be invalidated, so it is safe to call this function
 * from event handlers that will do stuff with node links later. */
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!node->link) continue;
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        clusterSendMessage(node->link,buf,len);
    }
    dictReleaseIterator(di);
}
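
/* A sketch of how a fixed-size message is broadcast with the helpers
 * above, in the style of clusterSendFail() (reconstructed for
 * illustration; the real implementation lives elsewhere in this file):
 *
 *   clusterMsg buf[1];
 *   clusterMsg *hdr = (clusterMsg*) buf;
 *
 *   clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
 *   memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
 *   clusterBroadcastMessage(buf,ntohl(hdr->totlen));
 *
 * clusterBuildMessageHdr() below fills totlen for FAIL packets, so the
 * broadcast length can be taken straight from the header. */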

/* Build the message header. hdr must point to a buffer at least
 * sizeof(clusterMsg) in bytes. */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    int totlen = 0;
    uint64_t offset;
    clusterNode *master;

    /* If this node is a master, we send its slots bitmap and configEpoch.
     * If this node is a slave we send the master's information instead (the
     * node is flagged as slave so the receiver knows that it is NOT really
     * in charge of these slots). */
    master = (nodeIsSlave(myself) && myself->slaveof) ?
              myself->slaveof : myself;

    memset(hdr,0,sizeof(*hdr));
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->type = htons(type);
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* If cluster-announce-ip option is enabled, force the receivers of our
     * packets to use the specified address for this node. Otherwise if the
     * first byte is zero, they'll do auto discovery. */
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* Handle cluster-announce-port as well. */
    int port = server.tls_cluster ? server.tls_port : server.port;
    int announced_port = server.cluster_announce_port ?
                         server.cluster_announce_port : port;
    int announced_cport = server.cluster_announce_bus_port ?
                          server.cluster_announce_bus_port :
                          (port + CLUSTER_PORT_INCR);

    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(announced_port);
    hdr->cport = htons(announced_cport);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* Set the currentEpoch and configEpochs. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);

    /* Set the replication offset. */
    if (nodeIsSlave(myself))
        offset = replicationGetSlaveOffset();
    else
        offset = server.master_repl_offset;
    hdr->offset = htonu64(offset);

    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* Compute the message length for certain messages. For other messages
     * this is up to the caller. */
    if (type == CLUSTERMSG_TYPE_FAIL) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
    }
    hdr->totlen = htonl(totlen);
    /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
}
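
/* For PING/PONG/MEET the caller fixes totlen once the number of gossip
 * entries is known. An illustrative sketch of that final step (see
 * clusterSendPing() below for the real code path):
 *
 *   totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
 *   totlen += sizeof(clusterMsgDataGossip) * gossipcount;
 *   hdr->count = htons(gossipcount);
 *   hdr->totlen = htonl(totlen);
 */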

/* Return non zero if the node is already present in the gossip section of
 * the message pointed to by 'hdr', having 'count' gossip entries. Otherwise
 * zero is returned. Helper for clusterSendPing(). */
int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
    int j;
    for (j = 0; j < count; j++) {
        if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
                CLUSTER_NAMELEN) == 0) break;
    }
    return j != count;
}

/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
 * to the info of the specified node 'n'. */
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
    clusterMsgDataGossip *gossip;
    gossip = &(hdr->data.ping.gossip[i]);
    memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
    gossip->ping_sent = htonl(n->ping_sent/1000);
    gossip->pong_received = htonl(n->pong_received/1000);
    memcpy(gossip->ip,n->ip,sizeof(n->ip));
    gossip->port = htons(n->port);
    gossip->cport = htons(n->cport);
    gossip->flags = htons(n->flags);
    gossip->notused1 = 0;
}

/* Send a PING or PONG packet to the specified node, making sure to add enough
 * gossip information. */
void clusterSendPing(clusterLink *link, int type) {
    unsigned char *buf;
    clusterMsg *hdr;
    int gossipcount = 0; /* Number of gossip sections added so far. */
    int wanted; /* Number of gossip sections we want to append if possible. */
    int totlen; /* Total packet length. */
    /* freshnodes is the max number of nodes we can hope to append at all:
     * nodes available minus two (ourself and the node we are sending the
     * message to). However in practice there may be fewer valid nodes, since
     * nodes in handshake state or disconnected are not considered. */
    int freshnodes = dictSize(server.cluster->nodes)-2;

    /* How many gossip sections do we want to add? 1/10 of the number of nodes,
     * and in any case at least 3. Why 1/10?
     *
     * If we have N masters, with N/10 entries, and we consider that in
     * node_timeout we exchange with each other node at least 4 packets
     * (we ping in the worst case in node_timeout/2 time, and we also
     * receive two pings from the host), we have a total of 8 packets
     * in the node_timeout*2 failure reports validity time. So we have
     * that, for a single PFAIL node, we can expect to receive the following
     * number of failure reports (in the specified window of time):
     *
     * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
     *
     * PROB = probability of being featured in a single gossip entry,
     *        which is 1 / NUM_OF_NODES.
     * ENTRIES = NUM_OF_NODES / 10 (the entries we add per packet).
     * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
     *
     * If we assume we have just masters (so the number of nodes and the number
     * of masters is the same), with 1/10 we always get over the majority, and
     * specifically 80% of the number of nodes, to account for many masters
     * failing at the same time.
     *
     * Since we have non-voting slaves that lower the probability of an entry
     * featuring our node, we set the number of entries per packet as
     * 10% of the total nodes we have. */
    wanted = floor(dictSize(server.cluster->nodes)/10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;
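    /* Illustrative numbers for the math above: with N = 10 master-only
     * nodes, wanted = floor(10/10) = 1, raised to the minimum of 3; with
     * N = 100, wanted = 10. Plugging N = 100 into the formula,
     * (1/100) * 10 * (2*4*100) = 80 expected failure reports for a single
     * PFAIL node within the validity window, i.e. 0.8 * N, well above the
     * majority of 51 needed to flag it as FAIL. */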

    /* Include all the nodes in PFAIL state, so that failure reports
     * propagate faster, making nodes go from PFAIL to FAIL state sooner. */
    int pfail_wanted = server.cluster->stats_pfail_nodes;

    /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
     * later according to the number of gossip sections we really were able
     * to put inside the packet. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
    /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
     * sizeof(clusterMsg) or more. */
    if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
    buf = zcalloc(totlen);
    hdr = (clusterMsg*) buf;

    /* Populate the header. */
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    clusterBuildMessageHdr(hdr,type);

    /* Populate the gossip fields */
    int maxiterations = wanted*3;
    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        /* Don't include this node: the whole packet header is about us
         * already, so we just gossip about other nodes. */
        if (this == myself) continue;

        /* PFAIL nodes will be added later. */
        if (this->flags & CLUSTER_NODE_PFAIL) continue;

        /* In the gossip section don't include:
         * 1) Nodes in HANDSHAKE state.
         * 2) Nodes with the NOADDR flag set.
         * 3) Disconnected nodes if they don't have configured slots.
         */
        if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
            freshnodes--; /* Technically not correct, but saves CPU. */
            continue;
        }

        /* Do not add a node we already have. */
        if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;

        /* Add it */
        clusterSetGossipEntry(hdr,gossipcount,this);
        freshnodes--;
        gossipcount++;
    }

    /* If there are PFAIL nodes, add them at the end. */
    if (pfail_wanted) {
        dictIterator *di;
        dictEntry *de;

        di = dictGetSafeIterator(server.cluster->nodes);
        while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
            clusterNode *node = dictGetVal(de);
            if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
            if (node->flags & CLUSTER_NODE_NOADDR) continue;
            if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
            clusterSetGossipEntry(hdr,gossipcount,node);
            freshnodes--;
            gossipcount++;
            /* We bound the loop by the number of entries we allocated,
             * since the PFAIL stats may not match perfectly with the
             * current number of PFAIL nodes. */
            pfail_wanted--;
        }
        dictReleaseIterator(di);
    }

    /* Ready to send... fix the totlen field and queue the message in the
     * output buffer. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
    hdr->count = htons(gossipcount);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}

/* Send a PONG packet to every connected node that's not in handshake state
 * and for which we have a valid link.
 *
 * In Redis Cluster pongs are not used just for failure detection, but also
 * to carry important configuration information. So broadcasting a pong is
 * useful when something changes in the configuration and we want to make
 * the cluster aware ASAP (for instance after a slave promotion).
 *
 * The 'target' argument specifies the receiving instances using the
 * defines below:
 *
 * CLUSTER_BROADCAST_ALL -> All known instances.
 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
 */
#define CLUSTER_BROADCAST_ALL 0
#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
void clusterBroadcastPong(int target) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!node->link) continue;
        if (node == myself || nodeInHandshake(node)) continue;
        if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
            int local_slave =
                nodeIsSlave(node) && node->slaveof &&
                (node->slaveof == myself || node->slaveof == myself->slaveof);
            if (!local_slave) continue;
        }
        clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
    }
    dictReleaseIterator(di);
}

/* Send a PUBLISH message.
 *
 * If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
    unsigned char *payload;
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;
    uint32_t channel_len, message_len;

    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    channel_len = sdslen(channel->ptr);
    message_len = sdslen(message->ptr);

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
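    /* A reading aid: the '- 8' above subtracts the 8-byte bulk_data
     * placeholder declared in clusterMsgDataPublish, which is replaced by
     * the real channel + message payload appended below (assuming the
     * struct layout in cluster.h). */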

    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        payload = (unsigned char*)buf;
    } else {
        payload = zmalloc(totlen);
        memcpy(payload,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) payload;
    }
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
    memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
        message->ptr,sdslen(message->ptr));

    if (link)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);

    decrRefCount(channel);
    decrRefCount(message);
    if (payload != (unsigned char*)buf) zfree(payload);
}
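
/* Illustrative wire layout of the PUBLISH payload built above, assuming
 * channel "news" (4 bytes) and message "hi" (2 bytes):
 *
 *   ... clusterMsg header ...
 *   channel_len = htonl(4)
 *   message_len = htonl(2)
 *   bulk_data   = "newshi"   <- channel bytes immediately followed by the
 *                                message bytes, no separator; the receiver
 *                                splits them using the two length fields. */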

/* Send a FAIL message to all the nodes we are able to contact.
 * The FAIL message is sent when we detect that a node is failing
 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
 * nodes to do the same ASAP. */
void clusterSendFail(char *nodename) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
    memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
    clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}

/* Send an UPDATE message to the specified link carrying the specified 'node'
 * slots configuration. The node name, slots bitmap, and configEpoch info
 * are included. */
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    if (link == NULL) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
    memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
    hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
    memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
    clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
}

/* Send a MODULE message.
 *
 * If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
                       unsigned char *payload, uint32_t len) {
    unsigned char *heapbuf;
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgModule) - 3 + len;
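    /* As with PUBLISH above, the '- 3' subtracts the 3-byte bulk_data
     * placeholder of clusterMsgModule, which the real module payload of
     * 'len' bytes replaces (again assuming the struct layout in
     * cluster.h). */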

    hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
    hdr->data.module.msg.type = type;
    hdr->data.module.msg.len = htonl(len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        heapbuf = (unsigned char*)buf;
    } else {
        heapbuf = zmalloc(totlen);
        memcpy(heapbuf,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) heapbuf;
    }
    memcpy(hdr->data.module.msg.bulk_data,payload,len);

    if (link)
        clusterSendMessage(link,heapbuf,totlen);
    else
        clusterBroadcastMessage(heapbuf,totlen);

    if (heapbuf != (unsigned char*)buf) zfree(heapbuf);
}

/* This function gets a cluster node ID string as target, the same way node
 * addresses are represented on the modules side, resolves the node, and sends
 * the message. If the target is NULL the message is broadcasted.
 *
 * The function returns C_OK if the target is valid, otherwise C_ERR is
 * returned. */
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
    clusterNode *node = NULL;

    if (target != NULL) {
        node = clusterLookupNode(target);
        if (node == NULL || node->link == NULL) return C_ERR;
    }

    clusterSendModule(target ? node->link : NULL,
                      module_id, type, payload, len);
    return C_OK;
}
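
/* Sketch of a caller, as one might find on the module API side (purely
 * illustrative; the variable names and message type are assumptions):
 *
 *   char target_id[CLUSTER_NAMELEN+1];   // 40-char hex node ID
 *   unsigned char msg[] = "hello";
 *   if (clusterSendModuleMessageToTarget(target_id, module_id,
 *                                        msg_type, msg, sizeof(msg)) == C_ERR)
 *       ... the node is unknown or its link is currently down ...
 *
 * Passing NULL as target broadcasts the same payload to every node. */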

/* -----------------------------------------------------------------------------
 * CLUSTER Pub/Sub support
 *
 * For now we do very little, just propagating PUBLISH messages across the whole
 * cluster. In the future we'll try to get smarter and avoid propagating those
 * messages to hosts without receivers for a given channel.
 * -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message) {
    clusterSendPublish(NULL, channel, message);
}

/* -----------------------------------------------------------------------------
 * SLAVE node specific functions
 * -------------------------------------------------------------------------- */

/* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
 * see if there is the quorum for this slave instance to failover its failing
 * master.
 *
 * Note that we send the failover request to everybody, master and slave nodes,
 * but only the masters are supposed to reply to our query. */
void clusterRequestFailoverAuth(void) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
    /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
     * in the header to communicate to the nodes receiving the message that
     * they should authorize the failover even if the master is working. */
    if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterBroadcastMessage(buf,totlen);
}

/* Send a FAILOVER_AUTH_ACK message to the specified node. */
void clusterSendFailoverAuth(clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    if (!node->link) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(node->link,(unsigned char*)buf,totlen);
}

/* Send a MFSTART message to the specified node. */
void clusterSendMFStart(clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    if (!node->link) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(node->link,(unsigned char*)buf,totlen);
}

/* Vote for the node asking for our vote if the conditions are met. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    unsigned char *claimed_slots = request->myslots;
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int j;

    /* If we are not a master serving at least 1 slot, we don't have the
     * right to vote, as the cluster size in Redis Cluster is the number
     * of masters serving at least one slot, and the quorum is
     * (cluster size / 2) + 1. */
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    /* Request epoch must be >= our currentEpoch.
     * Note that it is impossible for it to actually be greater since
     * our currentEpoch was updated as a side effect of receiving this
     * request, if the request epoch was greater. */
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* I already voted for this epoch? Return ASAP. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* Node must be a slave and its master down.
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }

    /* We can vote for a slave of this master only once every two times
     * the node timeout: if we already voted too recently, deny the vote.
     * This is not strictly needed for correctness of the algorithm but
     * makes the base case more linear. */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
    {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) ((server.cluster_node_timeout*2)-
                             (mstime() - node->slaveof->voted_time)));
        return;
    }

    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* If we reached this point we found a slot that in our current slots
         * is served by a master with a greater configEpoch than the one claimed
         * by the slave requesting our vote. Refuse to vote for this slave. */
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    /* We can vote for this slave. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}

/* This function returns the "rank" of this instance, a slave, in the context
 * of its master-slaves ring. The rank of the slave is given by the number of
 * other slaves for the same master that have a better replication offset
 * compared to the local one (better means, greater, so they claim more data).
 *
 * A slave with rank 0 is the one with the greatest (most up to date)
 * replication offset, and so forth. Note that because of how the rank is
 * computed, multiple slaves may have the same rank if they have the same
 * offset.
 *
 * The slave rank is used to add a delay to start an election in order to
 * get voted and replace a failing master. Slaves with better replication
 * offsets are more likely to win. */
int clusterGetSlaveRank(void) {
    long long myoffset;
    int j, rank = 0;
    clusterNode *master;

    serverAssert(nodeIsSlave(myself));
    master = myself->slaveof;
    if (master == NULL) return 0; /* Never called by slaves without master. */

    myoffset = replicationGetSlaveOffset();
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] != myself &&
            !nodeCantFailover(master->slaves[j]) &&
            master->slaves[j]->repl_offset > myoffset) rank++;
    return rank;
}
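
/* Illustrative example of the ranking: three slaves of the same master
 * with replication offsets 1000, 900 and 900 get ranks 0, 1 and 1
 * respectively, so the first one starts its election with no rank delay
 * while the other two both wait an extra second (see the delay
 * computation in clusterHandleSlaveFailover() below). */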

/* This function is called by clusterHandleSlaveFailover() in order to
 * let the slave log why it is not able to failover. Sometimes the
 * conditions are not met, and since the failover function is called
 * again and again, we can't keep logging the same thing continuously.
 *
 * This function works by logging only if a given set of conditions are
 * true:
 *
 * 1) The reason for which the failover can't be initiated changed.
 *    The reasons also include a NONE reason we reset the state to
 *    when the slave finds that its master is fine (no FAIL flag).
 * 2) Also, the log is emitted again if the master is still down and
 *    the reason for not failing over is still the same, but more than
 *    CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
 * 3) Finally, the function only logs if the slave is down for more than
 *    five seconds + NODE_TIMEOUT. This way nothing is logged when a
 *    failover starts in a reasonable time.
 *
 * The function is called with the reason why the slave can't failover,
 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
 *
 * The function is guaranteed to be called only if 'myself' is a slave. */
void clusterLogCantFailover(int reason) {
    char *msg;
    static time_t lastlog_time = 0;
    mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;

    /* Don't log if we have the same reason for some time. */
    if (reason == server.cluster->cant_failover_reason &&
        time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
        return;

    server.cluster->cant_failover_reason = reason;

    /* We also don't emit any log if the master failed not long ago: the
     * goal of this function is to log slaves in a stalled condition for
     * a long time. */
    if (myself->slaveof &&
        nodeFailed(myself->slaveof) &&
        (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;

    switch(reason) {
    case CLUSTER_CANT_FAILOVER_DATA_AGE:
        msg = "Disconnected from master for longer than allowed. "
              "Please check the 'cluster-replica-validity-factor' configuration "
              "option.";
        break;
    case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
        msg = "Waiting the delay before I can start a new failover.";
        break;
    case CLUSTER_CANT_FAILOVER_EXPIRED:
        msg = "Failover attempt expired.";
        break;
    case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
        msg = "Waiting for votes, but majority still not reached.";
        break;
    default:
        msg = "Unknown reason code.";
        break;
    }
    lastlog_time = time(NULL);
    serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
}

/* This function implements the final part of automatic and manual failovers,
 * where the slave grabs its master's hash slots, and propagates the new
 * configuration.
 *
 * Note that it's up to the caller to be sure that the node got a new
 * configuration epoch already. */
void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) Turn this node into a master. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) Update state and save config. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so that they can update the state
     *    accordingly and detect that we switched to master role. */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) If there was a manual failover in progress, clear the state. */
    resetManualFailover();
}

/* This function is called if we are a slave node and our master serving
 * a non-zero amount of hash slots is in FAIL state.
 *
 * The goal of this function is:
 * 1) To check if we are able to perform a failover, is our data updated?
 * 2) Try to get elected by masters.
 * 3) Perform the failover informing all the other nodes.
 */
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    mstime_t auth_timeout, auth_retry_time;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

    /* Compute the failover timeout (the max time we have to send votes
     * and wait for replies), and the failover retry time (the time to wait
     * before trying to get voted again).
     *
     * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
     * Retry is two times the Timeout.
     */
    auth_timeout = server.cluster_node_timeout*2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout*2;
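    /* For instance (assuming the stock cluster-node-timeout of 15000 ms):
     * auth_timeout = 30000 ms and auth_retry_time = 60000 ms; with a very
     * small node timeout such as 500 ms, the 2000 ms floor applies and the
     * pair becomes 2000 / 4000 ms. */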

    /* Preconditions to run the function, that must be met both in case
     * of an automatic or manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) The replica is not configured with cluster-slave-no-failover,
     *    unless this is a manual failover.
     * 4) Our master is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        (server.cluster_slave_no_failover && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        /* There are no reasons to failover, so we set the reason why we
         * are returning without failing over to NONE. */
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return;
    }

    /* Set data_age to the number of milliseconds we are disconnected from
     * the master. */
    if (server.repl_state == REPL_STATE_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age as it is fine that we are
     * disconnected from our master at least for the time it was down to be
     * flagged as FAIL, that's the baseline. */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough according to the slave validity
     * factor configured by the user.
     *
     * Check bypassed for manual failovers. */
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }

    /* If the previous failover attempt timed out and the retry time has
     * elapsed, we can set up a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 500 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that probably have
         * a less updated replication offset are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
        serverLog(LL_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll update their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }
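    /* Putting the delay formula together with an illustrative case: a
     * rank-1 slave that computes its delay at time T schedules the
     * election at T + 500 + (random() % 500) + 1000, i.e. between 1500
     * and 2000 ms later, while a rank-0 slave starts within 500-1000 ms
     * and usually wins the race. */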

    /* It is possible that we received more updated offsets from other
     * slaves for the same master since we computed our election delay.
     * Update the delay if our rank changed.
     *
     * Not performed if this is a manual failover. */
    if (server.cluster->failover_auth_sent == 0 &&
        server.cluster->mf_end == 0)
    {
        int newrank = clusterGetSlaveRank();
        if (newrank > server.cluster->failover_auth_rank) {
            long long added_delay =
                (newrank - server.cluster->failover_auth_rank) * 1000;
            server.cluster->failover_auth_time += added_delay;
            server.cluster->failover_auth_rank = newrank;
            serverLog(LL_WARNING,
                "Replica rank updated to #%d, added %lld milliseconds of delay.",
                newrank, added_delay);
        }
    }

    /* Return ASAP if we still can't start the election. */
    if (mstime() < server.cluster->failover_auth_time) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
        return;
    }

    /* Return ASAP if the election is too old to be valid. */
    if (auth_age > auth_timeout) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
        return;
    }

    /* Ask for votes if needed. */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }

    /* Check if we reached the quorum. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* We have the quorum, we can finally failover the master. */

        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");

        /* Update my configEpoch to the epoch of the election. */
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }

        /* Take responsibility for the cluster slots. */
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}

/* -----------------------------------------------------------------------------
 * CLUSTER slave migration
 *
 * Slave migration is the process that allows a slave of a master that is
 * already covered by at least one other slave, to "migrate" to a master that
 * is orphaned, that is, left with no working slaves.
 * ------------------------------------------------------------------------- */

/* This function is responsible for deciding if this replica should be migrated
 * to a different (orphaned) master. It is called by the clusterCron() function
 * only if:
 *
 * 1) We are a slave node.
 * 2) It was detected that there is at least one orphaned master in
 *    the cluster.
 * 3) We are a slave of one of the masters with the greatest number of
 *    slaves.
 *
 * These checks are performed by the caller, since it needs to iterate
 * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
 * when definitely needed.
 *
 * The function is called with a pre-computed max_slaves, that is the max
 * number of working (not in FAIL state) slaves for a single master.
 *
 * Additional conditions for migration are examined inside the function.
 */
void clusterHandleSlaveMigration(int max_slaves) {
    int j, okslaves = 0;
    clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
    dictIterator *di;
    dictEntry *de;

    /* Step 1: Don't migrate if the cluster state is not ok. */
    if (server.cluster->state != CLUSTER_OK) return;

    /* Step 2: Don't migrate if my master will not be left with at least
     *         'migration-barrier' slaves after my migration. */
    if (mymaster == NULL) return;
    for (j = 0; j < mymaster->numslaves; j++)
        if (!nodeFailed(mymaster->slaves[j]) &&
            !nodeTimedOut(mymaster->slaves[j])) okslaves++;
    if (okslaves <= server.cluster_migration_barrier) return;

    /* Step 3: Identify a candidate for migration, and check if among the
     * masters with the greatest number of ok slaves, I'm the one with the
     * smallest node ID (the "candidate slave").
     *
     * Note: this means that eventually a replica migration will occur
     * since slaves that are reachable again always have their FAIL flag
     * cleared, so eventually there must be a candidate. At the same time
     * this does not mean that there are no race conditions possible (two
     * slaves migrating at the same time), but this is unlikely to
     * happen, and harmless when it happens. */
    candidate = myself;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int okslaves = 0, is_orphaned = 1;

        /* We want to migrate only if this master is working, orphaned, and
         * used to have slaves or failed over a master that had slaves
         * (MIGRATE_TO flag). This way we only migrate to instances that were
         * supposed to have replicas. */
        if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
        if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;

        /* Check number of working slaves. */
        if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
        if (okslaves > 0) is_orphaned = 0;

        if (is_orphaned) {
            if (!target && node->numslots > 0) target = node;

            /* Track the starting time of the orphaned condition for this
             * master. */
            if (!node->orphaned_time) node->orphaned_time = mstime();
        } else {
            node->orphaned_time = 0;
        }

        /* Check if I'm the slave candidate for the migration: attached
         * to a master with the maximum number of slaves and with the smallest
         * node ID. */
        if (okslaves == max_slaves) {
            for (j = 0; j < node->numslaves; j++) {
                if (memcmp(node->slaves[j]->name,
                           candidate->name,
                           CLUSTER_NAMELEN) < 0)
                {
                    candidate = node->slaves[j];
                }
            }
        }
    }
    dictReleaseIterator(di);

    /* Step 4: perform the migration if there is a target, and if I'm the
     * candidate, but only if the master is continuously orphaned for a
     * couple of seconds, so that during failovers, we give some time to
     * the natural slaves of this instance to advertise their switch from
     * the old master to the new one. */
    if (target && candidate == myself &&
        (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
       !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
    {
        serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
            target->name);
        clusterSetMaster(target);
    }
}
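
/* Illustrative scenario for the steps above: masters A, B and C where A
 * has slaves A1 and A2 while B and C have one slave each. If B loses its
 * only slave, B becomes orphaned; among the slaves of the most-replicated
 * master (A), the one with the lexicographically smallest node ID, say
 * A1, elects itself candidate and, after CLUSTER_SLAVE_MIGRATION_DELAY of
 * continuous orphanage, calls clusterSetMaster(B) and starts replicating
 * from B. */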

/* -----------------------------------------------------------------------------
 * CLUSTER manual failover
 *
 * These are the important steps performed by slaves during a manual failover:
 * 1) The user sends the CLUSTER FAILOVER command. The failover state is
 *    initialized setting mf_end to the millisecond unix time at which we'll
 *    abort the attempt.
 * 2) The slave sends a MFSTART message to the master requesting to pause
 *    clients for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
 *    When the master is paused for manual failover, it also starts to flag
 *    packets with CLUSTERMSG_FLAG0_PAUSED.
 * 3) The slave waits for the master to send its replication offset flagged
 *    as PAUSED.
 * 4) If the slave receives the offset from the master, and its own offset
 *    matches, mf_can_start is set to 1, and clusterHandleSlaveFailover()
 *    will perform the failover as usual, with the difference that the vote
 *    request will be modified to force masters to vote for a slave that has
 *    a working master.
 *
 * From the point of view of the master things are simpler: when a
 * PAUSE_CLIENTS packet is received the master sets mf_end as well and
 * the sender in mf_slave. During the time limit for the manual failover
 * the master will just send PINGs more often to this slave, flagged with
 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
 * a packet from the master with this flag set.
 *
 * The goal of the manual failover is to perform a fast failover without
 * data loss due to the asynchronous master-slave replication.
 * -------------------------------------------------------------------------- */
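
/* A rough timeline of a successful run, as a reading aid (times are
 * illustrative): at t0 the slave receives CLUSTER FAILOVER and sets
 * mf_end; shortly after it sends MFSTART; the master pauses clients and
 * starts sending PAUSED-flagged pings carrying its offset; as soon as the
 * slave has consumed the replication stream up to mf_master_offset,
 * mf_can_start becomes 1 and the usual election of
 * clusterHandleSlaveFailover() runs with FORCEACK set, finishing before
 * mf_end or being aborted by manualFailoverCheckTimeout() otherwise. */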

/* Reset the manual failover state. This works for both masters and slaves
 * as all the state about manual failover is cleared.
 *
 * The function can be used both to initialize the manual failover state at
 * startup or to abort a manual failover in progress. */
void resetManualFailover(void) {
    if (server.cluster->mf_end && clientsArePaused()) {
        server.clients_pause_end_time = 0;
        clientsArePaused(); /* Just use the side effect of the function. */
    }
    server.cluster->mf_end = 0; /* No manual failover in progress. */
    server.cluster->mf_can_start = 0;
    server.cluster->mf_slave = NULL;
    server.cluster->mf_master_offset = 0;
}

/* If a manual failover timed out, abort it. */
void manualFailoverCheckTimeout(void) {
    if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
        serverLog(LL_WARNING,"Manual failover timed out.");
        resetManualFailover();
    }
}

/* This function is called from the cluster cron function in order to go
 * forward with a manual failover state machine. */
void clusterHandleManualFailover(void) {
    /* Return ASAP if no manual failover is in progress. */
    if (server.cluster->mf_end == 0) return;

    /* If mf_can_start is non-zero, the failover was already triggered so the
     * next steps are performed by clusterHandleSlaveFailover(). */
    if (server.cluster->mf_can_start) return;

    if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */

    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* Our replication offset matches the master replication offset
         * announced after clients were paused. We can start the failover. */
        server.cluster->mf_can_start = 1;
        serverLog(LL_WARNING,
            "All master replication stream processed, "
            "manual failover can start.");
    }
}

/* -----------------------------------------------------------------------------
 * CLUSTER cron job
 * -------------------------------------------------------------------------- */

/* This is executed 10 times every second */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    static unsigned long long iteration = 0;
    mstime_t handshake_timeout;

    iteration++; /* Number of times this function was called so far. */

    /* We want to keep myself->ip in sync with the cluster-announce-ip option.
     * The option can be set at runtime via CONFIG SET, so we periodically check
     * if the option changed to reflect this into myself->ip. */
    {
        static char *prev_ip = NULL;
        char *curr_ip = server.cluster_announce_ip;
        int changed = 0;

        if (prev_ip == NULL && curr_ip != NULL) changed = 1;
        else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
        else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

        if (changed) {
            if (prev_ip) zfree(prev_ip);
            prev_ip = curr_ip;

            if (curr_ip) {
                /* We always keep our own copy of the announced IP by
                 * duplicating the string. This way later we can check if
                 * the address really changed. */
                prev_ip = zstrdup(prev_ip);
                strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
                myself->ip[NET_IP_STR_LEN-1] = '\0';
            } else {
                myself->ip[0] = '\0'; /* Force autodetection. */
            }
        }
    }

    /* The handshake timeout is the time after which a handshake node that was
     * not turned into a normal node is removed from the nodes table. Usually
     * it is just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small
     * we use the value of 1 second. */
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    /* Update myself flags. */
    clusterUpdateMyselfFlags();

    /* Check if we have disconnected nodes and re-establish the connection.
     * Also update a few stats while we are here, that can be used to make
     * better decisions in other parts of the code. */
    di = dictGetSafeIterator(server.cluster->nodes);
    server.cluster->stats_pfail_nodes = 0;
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Not interested in reconnecting the link with myself or nodes
         * for which we have no address. */
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;

        if (node->flags & CLUSTER_NODE_PFAIL)
            server.cluster->stats_pfail_nodes++;

        /* A node in HANDSHAKE state has a limited lifespan equal to the
         * configured node timeout. */
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }

        if (node->link == NULL) {
            clusterLink *link = createClusterLink(node);
            link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
            connSetPrivateData(link->conn, link);
            if (connConnect(link->conn, node->ip, node->cport, NET_FIRST_BIND_ADDR,
                        clusterLinkConnectHandler) == -1) {
                /* We got a synchronous error from connect before
                 * clusterSendPing() had a chance to be called.
                 * If node->ping_sent is zero, failure detection can't work,
                 * so we claim we actually sent a ping now (that will
                 * be really sent as soon as the link is obtained). */
                if (node->ping_sent == 0) node->ping_sent = mstime();
                serverLog(LL_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->cport, server.neterr);

                freeClusterLink(link);
                continue;
            }
            node->link = link;
        }
    }
    dictReleaseIterator(di);

    /* Ping some random node once every 10 iterations, so that we usually ping
     * one random node every second. */
    if (!(iteration % 10)) {
        int j;

        /* Check a few random nodes and ping the one with the oldest
         * pong_received time. */
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

    /* Iterate nodes to check if we need to flag something as failing.
     * This loop is also responsible for:
     * 1) Checking if there are orphaned masters (masters without non failing
     *    slaves).
     * 2) Counting the max number of non failing slaves for a single master.
     * 3) Counting the number of slaves for our master, if we are a slave. */
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */

        if (node->flags &
            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* A master is orphaned if it is serving a non-zero number of
             * slots, has no working slaves, but used to have at least one
             * slave, or failed over a master that used to have slaves. */
            if (okslaves == 0 && node->numslots > 0 &&
                node->flags & CLUSTER_NODE_MIGRATE_TO)
            {
                orphaned_masters++;
            }
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }

        /* If we are not receiving any data for more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        mstime_t ping_delay = now - node->ping_sent;
        mstime_t data_delay = now - node->data_received;
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            /* and we are waiting for the pong more than timeout/2 */
            ping_delay > server.cluster_node_timeout/2 &&
            /* and in such interval we are not seeing any traffic at all. */
            data_delay > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            freeClusterLink(node->link);
        }

        /* If we have currently no active ping in this instance, and the
         * received PONG is older than half the cluster timeout, send
         * a new ping now, to ensure all the nodes are pinged without
         * a too big delay. */
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* If we are a master and one of the slaves requested a manual
         * failover, ping it continuously. */
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* Check only if we have an active ping for this instance. */
        if (node->ping_sent == 0) continue;

        /* Check if this node looks unreachable.
         * Note that if we already received the PONG, then node->ping_sent
         * is zero, so we can't reach this code at all: we don't risk
         * checking for a PONG delay if we didn't send the PING.
         *
         * We also consider every incoming data as proof of liveness, since
         * our cluster bus link is also used for data: under heavy data
         * load pong delays are possible. */
        mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
                                                          data_delay;

        if (node_delay > server.cluster_node_timeout) {
            /* Timeout reached. Set the node as possibly failing if it is
             * not already in this state. */
            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
                serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                node->flags |= CLUSTER_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);
3689 
3690     /* If we are a slave node but the replication is still turned off,
3691      * enable it if we know the address of our master and it appears to
3692      * be up. */
3693     if (nodeIsSlave(myself) &&
3694         server.masterhost == NULL &&
3695         myself->slaveof &&
3696         nodeHasAddr(myself->slaveof))
3697     {
3698         replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
3699     }
3700 
3701     /* Abort a manual failover if the timeout is reached. */
3702     manualFailoverCheckTimeout();
3703 
3704     if (nodeIsSlave(myself)) {
3705         clusterHandleManualFailover();
3706         if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3707             clusterHandleSlaveFailover();
3708         /* If there are orphaned masters, and we are a slave among the masters
3709          * with the max number of non-failing slaves, consider migrating to
3710          * the orphaned masters. Note that it does not make sense to try
3711          * a migration if there is no master with at least *two* working
3712          * slaves. */
3713         if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
3714             clusterHandleSlaveMigration(max_slaves);
3715     }
3716 
3717     if (update_state || server.cluster->state == CLUSTER_FAIL)
3718         clusterUpdateState();
3719 }
3720 
3721 /* This function is called before the event handler returns to sleep for
3722  * events. It is useful to perform operations that must be done ASAP in
3723  * reaction to events fired but that are not safe to perform inside event
3724  * handlers, or to perform potentially expensive tasks that we need to do
3725  * a single time before replying to clients. */
3726 void clusterBeforeSleep(void) {
3727     /* Handle failover, this is needed when it is likely that there is already
3728      * the quorum from masters in order to react fast. */
3729     if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
3730         clusterHandleSlaveFailover();
3731 
3732     /* Update the cluster state. */
3733     if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
3734         clusterUpdateState();
3735 
3736     /* Save the config, possibly using fsync. */
3737     if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
3738         int fsync = server.cluster->todo_before_sleep &
3739                     CLUSTER_TODO_FSYNC_CONFIG;
3740         clusterSaveConfigOrDie(fsync);
3741     }
3742 
3743     /* Reset our flags (not strictly needed since every single function
3744      * called for flags set should be able to clear its flag). */
3745     server.cluster->todo_before_sleep = 0;
3746 }
3747 
3748 void clusterDoBeforeSleep(int flags) {
3749     server.cluster->todo_before_sleep |= flags;
3750 }
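
/* Usage sketch (illustrative, mirroring how this file itself uses the
 * helpers above): an event handler that needs the config persisted and
 * the cluster state re-evaluated before the next event loop sleep calls
 *
 *     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 *                          CLUSTER_TODO_UPDATE_STATE);
 *
 * and clusterBeforeSleep() then performs each requested action once. */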
3751 
3752 /* -----------------------------------------------------------------------------
3753  * Slots management
3754  * -------------------------------------------------------------------------- */
3755 
3756 /* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
3757  * otherwise 0. */
3758 int bitmapTestBit(unsigned char *bitmap, int pos) {
3759     off_t byte = pos/8;
3760     int bit = pos&7;
3761     return (bitmap[byte] & (1<<bit)) != 0;
3762 }
3763 
3764 /* Set the bit at position 'pos' in a bitmap. */
3765 void bitmapSetBit(unsigned char *bitmap, int pos) {
3766     off_t byte = pos/8;
3767     int bit = pos&7;
3768     bitmap[byte] |= 1<<bit;
3769 }
3770 
3771 /* Clear the bit at position 'pos' in a bitmap. */
3772 void bitmapClearBit(unsigned char *bitmap, int pos) {
3773     off_t byte = pos/8;
3774     int bit = pos&7;
3775     bitmap[byte] &= ~(1<<bit);
3776 }
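
/* A worked example of the byte/bit math above (illustrative, not part of
 * the original code): slot 11 lives in byte 11/8 = 1 at bit 11&7 = 3, so:
 *
 *     unsigned char slots[CLUSTER_SLOTS/8] = {0}; // 16384/8 = 2048 bytes
 *     bitmapSetBit(slots,11);   // slots[1] becomes 0x08 (1<<3)
 *     bitmapTestBit(slots,11);  // returns 1
 *     bitmapClearBit(slots,11); // slots[1] is 0x00 again
 */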
3777 
3778 /* Return non-zero if there is at least one master with slaves in the cluster.
3779  * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3780  * MIGRATE_TO flag when a master gets its first slot. */
3781 int clusterMastersHaveSlaves(void) {
3782     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3783     dictEntry *de;
3784     int slaves = 0;
3785     while((de = dictNext(di)) != NULL) {
3786         clusterNode *node = dictGetVal(de);
3787 
3788         if (nodeIsSlave(node)) continue;
3789         slaves += node->numslaves;
3790     }
3791     dictReleaseIterator(di);
3792     return slaves != 0;
3793 }
3794 
3795 /* Set the slot bit and return the old value. */
3796 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3797     int old = bitmapTestBit(n->slots,slot);
3798     bitmapSetBit(n->slots,slot);
3799     if (!old) {
3800         n->numslots++;
3801         /* When a master gets its first slot, even if it has no slaves,
3802          * it gets flagged with MIGRATE_TO, that is, the master is a valid
3803          * target for replicas migration, if and only if at least one of
3804          * the other masters has slaves right now.
3805          *
3806          * Normally masters are valid targets of replica migration if:
3807          * 1. They used to have slaves (but no longer do).
3808          * 2. They are slaves failing over a master that used to have slaves.
3809          *
3810          * However new masters with slots assigned are considered valid
3811          * migration targets if the rest of the cluster is not slave-less.
3812          *
3813          * See https://github.com/antirez/redis/issues/3043 for more info. */
3814         if (n->numslots == 1 && clusterMastersHaveSlaves())
3815             n->flags |= CLUSTER_NODE_MIGRATE_TO;
3816     }
3817     return old;
3818 }
3819 
3820 /* Clear the slot bit and return the old value. */
3821 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3822     int old = bitmapTestBit(n->slots,slot);
3823     bitmapClearBit(n->slots,slot);
3824     if (old) n->numslots--;
3825     return old;
3826 }
3827 
3828 /* Return the slot bit from the cluster node structure. */
3829 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3830     return bitmapTestBit(n->slots,slot);
3831 }
3832 
3833 /* Add the specified slot to the list of slots that node 'n' will
3834  * serve. Return C_OK if the operation ended with success.
3835  * If the slot is already assigned to another instance this is considered
3836  * an error and C_ERR is returned. */
3837 int clusterAddSlot(clusterNode *n, int slot) {
3838     if (server.cluster->slots[slot]) return C_ERR;
3839     clusterNodeSetSlotBit(n,slot);
3840     server.cluster->slots[slot] = n;
3841     return C_OK;
3842 }
3843 
3844 /* Delete the specified slot marking it as unassigned.
3845  * Returns C_OK if the slot was assigned, otherwise if the slot was
3846  * already unassigned C_ERR is returned. */
3847 int clusterDelSlot(int slot) {
3848     clusterNode *n = server.cluster->slots[slot];
3849 
3850     if (!n) return C_ERR;
3851     serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3852     server.cluster->slots[slot] = NULL;
3853     return C_OK;
3854 }
3855 
3856 /* Delete all the slots associated with the specified node.
3857  * The number of deleted slots is returned. */
3858 int clusterDelNodeSlots(clusterNode *node) {
3859     int deleted = 0, j;
3860 
3861     for (j = 0; j < CLUSTER_SLOTS; j++) {
3862         if (clusterNodeGetSlotBit(node,j)) {
3863             clusterDelSlot(j);
3864             deleted++;
3865         }
3866     }
3867     return deleted;
3868 }
3869 
3870 /* Clear the migrating / importing state for all the slots.
3871  * This is useful at initialization and when turning a master into a slave. */
3872 void clusterCloseAllSlots(void) {
3873     memset(server.cluster->migrating_slots_to,0,
3874         sizeof(server.cluster->migrating_slots_to));
3875     memset(server.cluster->importing_slots_from,0,
3876         sizeof(server.cluster->importing_slots_from));
3877 }
3878 
3879 /* -----------------------------------------------------------------------------
3880  * Cluster state evaluation function
3881  * -------------------------------------------------------------------------- */
3882 
3883 /* The following are defines that are only used in the evaluation function
3884  * and are based on heuristics. Actually the main point about the rejoin and
3885  * writable delay is that they should be a few orders of magnitude larger
3886  * than the network latency. */
3887 #define CLUSTER_MAX_REJOIN_DELAY 5000
3888 #define CLUSTER_MIN_REJOIN_DELAY 500
3889 #define CLUSTER_WRITABLE_DELAY 2000
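
/* For example (illustrative numbers): with cluster-node-timeout set to
 * 15000 ms, the rejoin delay computed below is clamped down to
 * CLUSTER_MAX_REJOIN_DELAY (5000 ms); with a 100 ms timeout it would be
 * raised to CLUSTER_MIN_REJOIN_DELAY (500 ms). */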
3890 
3891 void clusterUpdateState(void) {
3892     int j, new_state;
3893     int reachable_masters = 0;
3894     static mstime_t among_minority_time;
3895     static mstime_t first_call_time = 0;
3896 
3897     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3898 
3899     /* If this is a master node, wait some time before turning the state
3900      * into OK, since it is not a good idea to rejoin the cluster as a writable
3901      * master, after a reboot, without giving the cluster a chance to
3902      * reconfigure this node. Note that the delay is calculated starting from
3903      * the first call to this function and not since the server start, so
3904      * that the DB loading time is not counted. */
3905     if (first_call_time == 0) first_call_time = mstime();
3906     if (nodeIsMaster(myself) &&
3907         server.cluster->state == CLUSTER_FAIL &&
3908         mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
3909 
3910     /* Start assuming the state is OK. We'll turn it into FAIL if the
3911      * right conditions are met. */
3912     new_state = CLUSTER_OK;
3913 
3914     /* Check if all the slots are covered. */
3915     if (server.cluster_require_full_coverage) {
3916         for (j = 0; j < CLUSTER_SLOTS; j++) {
3917             if (server.cluster->slots[j] == NULL ||
3918                 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3919             {
3920                 new_state = CLUSTER_FAIL;
3921                 break;
3922             }
3923         }
3924     }
3925 
3926     /* Compute the cluster size, that is the number of master nodes
3927      * serving at least a single slot.
3928      *
3929      * At the same time count the number of reachable masters having
3930      * at least one slot. */
3931     {
3932         dictIterator *di;
3933         dictEntry *de;
3934 
3935         server.cluster->size = 0;
3936         di = dictGetSafeIterator(server.cluster->nodes);
3937         while((de = dictNext(di)) != NULL) {
3938             clusterNode *node = dictGetVal(de);
3939 
3940             if (nodeIsMaster(node) && node->numslots) {
3941                 server.cluster->size++;
3942                 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3943                     reachable_masters++;
3944             }
3945         }
3946         dictReleaseIterator(di);
3947     }
3948 
3949     /* If we are in a minority partition, change the cluster state
3950      * to FAIL. */
3951     {
3952         int needed_quorum = (server.cluster->size / 2) + 1;
3953 
3954         if (reachable_masters < needed_quorum) {
3955             new_state = CLUSTER_FAIL;
3956             among_minority_time = mstime();
3957         }
3958     }
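
    /* A worked example (illustrative): with server.cluster->size == 5,
     * needed_quorum is 5/2+1 = 3, so a node that reaches only two
     * non-failing masters considers itself in the minority partition. */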
3959 
3960     /* Log a state change */
3961     if (new_state != server.cluster->state) {
3962         mstime_t rejoin_delay = server.cluster_node_timeout;
3963 
3964         /* If the instance is a master and was partitioned away with the
3965          * minority, don't let it accept queries for some time after the
3966          * partition heals, to make sure there is enough time to receive
3967          * a configuration update. */
3968         if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
3969             rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
3970         if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
3971             rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
3972 
3973         if (new_state == CLUSTER_OK &&
3974             nodeIsMaster(myself) &&
3975             mstime() - among_minority_time < rejoin_delay)
3976         {
3977             return;
3978         }
3979 
3980         /* Change the state and log the event. */
3981         serverLog(LL_WARNING,"Cluster state changed: %s",
3982             new_state == CLUSTER_OK ? "ok" : "fail");
3983         server.cluster->state = new_state;
3984     }
3985 }
3986 
3987 /* This function is called after the node startup in order to verify that data
3988  * loaded from disk is in agreement with the cluster configuration:
3989  *
3990  * 1) If we find keys for hash slots we have no responsibility for, the
3991  *    following happens:
3992  *    A) If no other node is in charge according to the current cluster
3993  *       configuration, we add these slots to our node.
3994  *    B) If according to our config other nodes are already in charge of
3995  *       these slots, we set the slots as IMPORTING from our point of view
3996  *       in order to justify why we have those slots, and in order to make
3997  *       redis-trib aware of the issue, so that it can try to fix it.
3998  * 2) If we find data in a DB other than DB0 we return C_ERR to
3999  *    signal the caller it should quit the server with an error message
4000  *    or take other actions.
4001  *
4002  * The function always returns C_OK even when it has to correct
4003  * the errors described in "1". However if data is found in a DB other
4004  * than DB0, C_ERR is returned.
4005  *
4006  * The function also uses the logging facility in order to warn the user
4007  * about desynchronizations between the data we have in memory and the
4008  * cluster configuration. */
4009 int verifyClusterConfigWithData(void) {
4010     int j;
4011     int update_config = 0;
4012 
4013     /* Return ASAP if a module disabled cluster redirections. In that case
4014      * every master can store keys about every possible hash slot. */
4015     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
4016         return C_OK;
4017 
4018     /* If this node is a slave, don't perform the check at all as we
4019      * completely depend on the replication stream. */
4020     if (nodeIsSlave(myself)) return C_OK;
4021 
4022     /* Make sure we only have keys in DB0. */
4023     for (j = 1; j < server.dbnum; j++) {
4024         if (dictSize(server.db[j].dict)) return C_ERR;
4025     }
4026 
4027     /* Check that all the slots we see populated in memory have a corresponding
4028      * entry in the cluster table. Otherwise fix the table. */
4029     for (j = 0; j < CLUSTER_SLOTS; j++) {
4030         if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
4031         /* Check if we are assigned to this slot or if we are importing it.
4032          * In both cases check the next slot as the configuration makes
4033          * sense. */
4034         if (server.cluster->slots[j] == myself ||
4035             server.cluster->importing_slots_from[j] != NULL) continue;
4036 
4037         /* If we are here, data and cluster config don't agree: we have
4038          * slot 'j' populated even though we are not importing it, nor
4039          * are we assigned to this slot. Fix this condition. */
4040 
4041         update_config++;
4042         /* Case A: slot is unassigned. Take responsibility for it. */
4043         if (server.cluster->slots[j] == NULL) {
4044             serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
4045                                     "Taking responsibility for it.",j);
4046             clusterAddSlot(myself,j);
4047         } else {
4048             serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
4049                                     "assigned to another node. "
4050                                     "Setting it to importing state.",j);
4051             server.cluster->importing_slots_from[j] = server.cluster->slots[j];
4052         }
4053     }
4054     if (update_config) clusterSaveConfigOrDie(1);
4055     return C_OK;
4056 }
4057 
4058 /* -----------------------------------------------------------------------------
4059  * SLAVE nodes handling
4060  * -------------------------------------------------------------------------- */
4061 
4062 /* Set the specified node 'n' as master for this node.
4063  * If this node is currently a master, it is turned into a slave. */
4064 void clusterSetMaster(clusterNode *n) {
4065     serverAssert(n != myself);
4066     serverAssert(myself->numslots == 0);
4067 
4068     if (nodeIsMaster(myself)) {
4069         myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
4070         myself->flags |= CLUSTER_NODE_SLAVE;
4071         clusterCloseAllSlots();
4072     } else {
4073         if (myself->slaveof)
4074             clusterNodeRemoveSlave(myself->slaveof,myself);
4075     }
4076     myself->slaveof = n;
4077     clusterNodeAddSlave(n,myself);
4078     replicationSetMaster(n->ip, n->port);
4079     resetManualFailover();
4080 }
4081 
4082 /* -----------------------------------------------------------------------------
4083  * Nodes to string representation functions.
4084  * -------------------------------------------------------------------------- */
4085 
4086 struct redisNodeFlags {
4087     uint16_t flag;
4088     char *name;
4089 };
4090 
4091 static struct redisNodeFlags redisNodeFlagsTable[] = {
4092     {CLUSTER_NODE_MYSELF,       "myself,"},
4093     {CLUSTER_NODE_MASTER,       "master,"},
4094     {CLUSTER_NODE_SLAVE,        "slave,"},
4095     {CLUSTER_NODE_PFAIL,        "fail?,"},
4096     {CLUSTER_NODE_FAIL,         "fail,"},
4097     {CLUSTER_NODE_HANDSHAKE,    "handshake,"},
4098     {CLUSTER_NODE_NOADDR,       "noaddr,"},
4099     {CLUSTER_NODE_NOFAILOVER,   "nofailover,"}
4100 };
4101 
4102 /* Concatenate the comma-separated list of node flags to the given SDS
4103  * string 'ci'. */
4104 sds representClusterNodeFlags(sds ci, uint16_t flags) {
4105     size_t orig_len = sdslen(ci);
4106     int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
4107     for (i = 0; i < size; i++) {
4108         struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
4109         if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
4110     }
4111     /* If no flag was added, add the "noflags" special flag. */
4112     if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
4113     sdsIncrLen(ci,-1); /* Remove trailing comma. */
4114     return ci;
4115 }
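
/* For example (illustrative): flags == CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER
 * appends "myself,master" to 'ci', while flags == 0 appends "noflags". */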
4116 
4117 /* Generate a CSV-like representation of the specified cluster node.
4118  * See clusterGenNodesDescription() top comment for more information.
4119  *
4120  * The function returns the string representation as an SDS string. */
4121 sds clusterGenNodeDescription(clusterNode *node) {
4122     int j, start;
4123     sds ci;
4124 
4125     /* Node coordinates */
4126     ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
4127         node->name,
4128         node->ip,
4129         node->port,
4130         node->cport);
4131 
4132     /* Flags */
4133     ci = representClusterNodeFlags(ci, node->flags);
4134 
4135     /* Slave of... or just "-" */
4136     if (node->slaveof)
4137         ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
4138     else
4139         ci = sdscatlen(ci," - ",3);
4140 
4141     unsigned long long nodeEpoch = node->configEpoch;
4142     if (nodeIsSlave(node) && node->slaveof) {
4143         nodeEpoch = node->slaveof->configEpoch;
4144     }
4145     /* Latency from the POV of this node, config epoch, link status */
4146     ci = sdscatprintf(ci,"%lld %lld %llu %s",
4147         (long long) node->ping_sent,
4148         (long long) node->pong_received,
4149         nodeEpoch,
4150         (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
4151                     "connected" : "disconnected");
4152 
4153     /* Slots served by this instance */
4154     start = -1;
4155     for (j = 0; j < CLUSTER_SLOTS; j++) {
4156         int bit;
4157 
4158         if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4159             if (start == -1) start = j;
4160         }
4161         if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
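            /* When the bitmap ends while the bit is still set, bump 'j'
             * so that the range printed below (which ends at j-1)
             * includes the last slot. */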
4162             if (bit && j == CLUSTER_SLOTS-1) j++;
4163 
4164             if (start == j-1) {
4165                 ci = sdscatprintf(ci," %d",start);
4166             } else {
4167                 ci = sdscatprintf(ci," %d-%d",start,j-1);
4168             }
4169             start = -1;
4170         }
4171     }
4172 
4173     /* For the MYSELF node only, we also dump info about slots that
4174      * we are migrating to other instances or importing from other
4175      * instances. */
4176     if (node->flags & CLUSTER_NODE_MYSELF) {
4177         for (j = 0; j < CLUSTER_SLOTS; j++) {
4178             if (server.cluster->migrating_slots_to[j]) {
4179                 ci = sdscatprintf(ci," [%d->-%.40s]",j,
4180                     server.cluster->migrating_slots_to[j]->name);
4181             } else if (server.cluster->importing_slots_from[j]) {
4182                 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
4183                     server.cluster->importing_slots_from[j]->name);
4184             }
4185         }
4186     }
4187     return ci;
4188 }
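
/* For illustration (hypothetical values), a generated master line looks like:
 *
 *   e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@40001
 *   myself,master - 0 1426238316232 1 connected 0-5460
 *
 * that is: node ID, ip:port@cport, flags, master ID (or "-"), ping sent,
 * pong received, config epoch, link state and the served slot ranges. */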
4189 
4190 /* Generate a CSV-like representation of the nodes we are aware of,
4191  * including the "myself" node, and return an SDS string containing the
4192  * representation (it is up to the caller to free it).
4193  *
4194  * All the nodes matching at least one of the node flags specified in
4195  * "filter" are excluded from the output, so using zero as a filter will
4196  * include all the known nodes in the representation, including nodes in
4197  * the HANDSHAKE state.
4198  *
4199  * The representation obtained using this function is used for the output
4200  * of the CLUSTER NODES command, and as the format for the cluster
4201  * configuration file (nodes.conf) for a given node. */
4202 sds clusterGenNodesDescription(int filter) {
4203     sds ci = sdsempty(), ni;
4204     dictIterator *di;
4205     dictEntry *de;
4206 
4207     di = dictGetSafeIterator(server.cluster->nodes);
4208     while((de = dictNext(di)) != NULL) {
4209         clusterNode *node = dictGetVal(de);
4210 
4211         if (node->flags & filter) continue;
4212         ni = clusterGenNodeDescription(node);
4213         ci = sdscatsds(ci,ni);
4214         sdsfree(ni);
4215         ci = sdscatlen(ci,"\n",1);
4216     }
4217     dictReleaseIterator(di);
4218     return ci;
4219 }
4220 
4221 /* -----------------------------------------------------------------------------
4222  * CLUSTER command
4223  * -------------------------------------------------------------------------- */
4224 
4225 const char *clusterGetMessageTypeString(int type) {
4226     switch(type) {
4227     case CLUSTERMSG_TYPE_PING: return "ping";
4228     case CLUSTERMSG_TYPE_PONG: return "pong";
4229     case CLUSTERMSG_TYPE_MEET: return "meet";
4230     case CLUSTERMSG_TYPE_FAIL: return "fail";
4231     case CLUSTERMSG_TYPE_PUBLISH: return "publish";
4232     case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
4233     case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
4234     case CLUSTERMSG_TYPE_UPDATE: return "update";
4235     case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
4236     case CLUSTERMSG_TYPE_MODULE: return "module";
4237     }
4238     return "unknown";
4239 }
4240 
4241 int getSlotOrReply(client *c, robj *o) {
4242     long long slot;
4243 
4244     if (getLongLongFromObject(o,&slot) != C_OK ||
4245         slot < 0 || slot >= CLUSTER_SLOTS)
4246     {
4247         addReplyError(c,"Invalid or out of range slot");
4248         return -1;
4249     }
4250     return (int) slot;
4251 }
4252 
4253 void clusterReplyMultiBulkSlots(client *c) {
4254     /* Format: 1) 1) start slot
4255      *            2) end slot
4256      *            3) 1) master IP
4257      *               2) master port
4258      *               3) node ID
4259      *            4) 1) replica IP
4260      *               2) replica port
4261      *               3) node ID
4262      *           ... continued until done
4263      */
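
    /* An example reply for a single slot range (illustrative values):
     *
     * 1) 1) (integer) 0
     *    2) (integer) 5460
     *    3) 1) "127.0.0.1"
     *       2) (integer) 30001
     *       3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
     *    4) 1) "127.0.0.1"
     *       2) (integer) 30004
     *       3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"
     */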
4264 
4265     int num_masters = 0;
4266     void *slot_replylen = addReplyDeferredLen(c);
4267 
4268     dictEntry *de;
4269     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
4270     while((de = dictNext(di)) != NULL) {
4271         clusterNode *node = dictGetVal(de);
4272         int j = 0, start = -1;
4273         int i, nested_elements = 0;
4274 
4275         /* Skip slaves (that are iterated when producing the output of their
4276          * master) and masters not serving any slot. */
4277         if (!nodeIsMaster(node) || node->numslots == 0) continue;
4278 
4279         for(i = 0; i < node->numslaves; i++) {
4280             if (nodeFailed(node->slaves[i])) continue;
4281             nested_elements++;
4282         }
4283 
4284         for (j = 0; j < CLUSTER_SLOTS; j++) {
4285             int bit, i;
4286 
4287             if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4288                 if (start == -1) start = j;
4289             }
4290             if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4291                 addReplyArrayLen(c, nested_elements + 3); /* slots (2) + master addr (1). */
4292 
4293                 if (bit && j == CLUSTER_SLOTS-1) j++;
4294 
4295                 /* If the slot exists in the output map, add to its list;
4296                  * else, create a new output map for this slot. */
4297                 if (start == j-1) {
4298                     addReplyLongLong(c, start); /* only one slot; low==high */
4299                     addReplyLongLong(c, start);
4300                 } else {
4301                     addReplyLongLong(c, start); /* low */
4302                     addReplyLongLong(c, j-1);   /* high */
4303                 }
4304                 start = -1;
4305 
4306                 /* First node reply position is always the master */
4307                 addReplyArrayLen(c, 3);
4308                 addReplyBulkCString(c, node->ip);
4309                 addReplyLongLong(c, node->port);
4310                 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
4311 
4312                 /* Remaining nodes in reply are replicas for slot range */
4313                 for (i = 0; i < node->numslaves; i++) {
4314                     /* This loop is copy/pasted from clusterGenNodeDescription()
4315                      * with modifications for per-slot node aggregation */
4316                     if (nodeFailed(node->slaves[i])) continue;
4317                     addReplyArrayLen(c, 3);
4318                     addReplyBulkCString(c, node->slaves[i]->ip);
4319                     addReplyLongLong(c, node->slaves[i]->port);
4320                     addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
4321                 }
4322                 num_masters++;
4323             }
4324         }
4325     }
4326     dictReleaseIterator(di);
4327     setDeferredArrayLen(c, slot_replylen, num_masters);
4328 }
4329 
4330 void clusterCommand(client *c) {
4331     if (server.cluster_enabled == 0) {
4332         addReplyError(c,"This instance has cluster support disabled");
4333         return;
4334     }
4335 
4336     if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
4337         const char *help[] = {
4338 "ADDSLOTS <slot> [slot ...] -- Assign slots to current node.",
4339 "BUMPEPOCH -- Advance the cluster config epoch.",
4340 "COUNT-failure-reports <node-id> -- Return number of failure reports for <node-id>.",
4341 "COUNTKEYSINSLOT <slot> -- Return the number of keys in <slot>.",
4342 "DELSLOTS <slot> [slot ...] -- Delete slots information from current node.",
4343 "FAILOVER [force|takeover] -- Promote current replica node to being a master.",
4344 "FORGET <node-id> -- Remove a node from the cluster.",
4345 "GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
4346 "FLUSHSLOTS -- Delete current node's own slots information.",
4347 "INFO -- Return information about the cluster.",
4348 "KEYSLOT <key> -- Return the hash slot for <key>.",
4349 "MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
4350 "MYID -- Return the node id.",
4351 "NODES -- Return cluster configuration seen by node. Output format:",
4352 "    <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ... <slot>",
4353 "REPLICATE <node-id> -- Configure current node as replica to <node-id>.",
4354 "RESET [hard|soft] -- Reset current node (default: soft).",
4355 "SET-config-epoch <epoch> -- Set config epoch of current node.",
4356 "SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
4357 "REPLICAS <node-id> -- Return <node-id> replicas.",
4358 "SAVECONFIG -- Force saving cluster configuration on disk.",
4359 "SLOTS -- Return information about slots range mappings. Each range is made of:",
4360 "    start, end, master and replicas IP addresses, ports and ids",
4361 NULL
4362         };
4363         addReplyHelp(c, help);
4364     } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
4365         /* CLUSTER MEET <ip> <port> [cport] */
4366         long long port, cport;
4367 
4368         if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
4369             addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
4370                                 (char*)c->argv[3]->ptr);
4371             return;
4372         }
4373 
4374         if (c->argc == 5) {
4375             if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
4376                 addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
4377                                     (char*)c->argv[4]->ptr);
4378                 return;
4379             }
4380         } else {
4381             cport = port + CLUSTER_PORT_INCR;
4382         }
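        /* Illustrative note: with the standard CLUSTER_PORT_INCR value of
         * 10000, a base port of 6379 implies a cluster bus port of 16379. */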
4383 
4384         if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
4385             errno == EINVAL)
4386         {
4387             addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
4388                             (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
4389         } else {
4390             addReply(c,shared.ok);
4391         }
4392     } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
4393         /* CLUSTER NODES */
4394         sds nodes = clusterGenNodesDescription(0);
4395         addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
4396         sdsfree(nodes);
4397     } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
4398         /* CLUSTER MYID */
4399         addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
4400     } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
4401         /* CLUSTER SLOTS */
4402         clusterReplyMultiBulkSlots(c);
4403     } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
4404         /* CLUSTER FLUSHSLOTS */
4405         if (dictSize(server.db[0].dict) != 0) {
4406             addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
4407             return;
4408         }
4409         clusterDelNodeSlots(myself);
4410         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4411         addReply(c,shared.ok);
4412     } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
4413                !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
4414     {
4415         /* CLUSTER ADDSLOTS <slot> [slot] ... */
4416         /* CLUSTER DELSLOTS <slot> [slot] ... */
4417         int j, slot;
4418         unsigned char *slots = zmalloc(CLUSTER_SLOTS);
4419         int del = !strcasecmp(c->argv[1]->ptr,"delslots");
4420 
4421         memset(slots,0,CLUSTER_SLOTS);
4422         /* Check that all the arguments are parseable and that all the
4423          * slots are not already busy. */
4424         for (j = 2; j < c->argc; j++) {
4425             if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
4426                 zfree(slots);
4427                 return;
4428             }
4429             if (del && server.cluster->slots[slot] == NULL) {
4430                 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
4431                 zfree(slots);
4432                 return;
4433             } else if (!del && server.cluster->slots[slot]) {
4434                 addReplyErrorFormat(c,"Slot %d is already busy", slot);
4435                 zfree(slots);
4436                 return;
4437             }
4438             if (slots[slot]++ == 1) {
4439                 addReplyErrorFormat(c,"Slot %d specified multiple times",
4440                     (int)slot);
4441                 zfree(slots);
4442                 return;
4443             }
4444         }
4445         for (j = 0; j < CLUSTER_SLOTS; j++) {
4446             if (slots[j]) {
4447                 int retval;
4448 
4449                 /* If this slot was set as importing we can clear this
4450                  * state as now we are the real owner of the slot. */
4451                 if (server.cluster->importing_slots_from[j])
4452                     server.cluster->importing_slots_from[j] = NULL;
4453 
4454                 retval = del ? clusterDelSlot(j) :
4455                                clusterAddSlot(myself,j);
4456                 serverAssertWithInfo(c,NULL,retval == C_OK);
4457             }
4458         }
4459         zfree(slots);
4460         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4461         addReply(c,shared.ok);
4462     } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
4463         /* SETSLOT 10 MIGRATING <node ID> */
4464         /* SETSLOT 10 IMPORTING <node ID> */
4465         /* SETSLOT 10 STABLE */
4466         /* SETSLOT 10 NODE <node ID> */
4467         int slot;
4468         clusterNode *n;
4469 
4470         if (nodeIsSlave(myself)) {
4471             addReplyError(c,"Please use SETSLOT only with masters.");
4472             return;
4473         }
4474 
4475         if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
4476 
4477         if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
4478             if (server.cluster->slots[slot] != myself) {
4479                 addReplyErrorFormat(c,"I'm not the owner of hash slot %d",slot);
4480                 return;
4481             }
4482             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4483                 addReplyErrorFormat(c,"I don't know about node %s",
4484                     (char*)c->argv[4]->ptr);
4485                 return;
4486             }
4487             server.cluster->migrating_slots_to[slot] = n;
4488         } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
4489             if (server.cluster->slots[slot] == myself) {
4490                 addReplyErrorFormat(c,
4491                     "I'm already the owner of hash slot %d",slot);
4492                 return;
4493             }
4494             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4495                 addReplyErrorFormat(c,"I don't know about node %s",
4496                     (char*)c->argv[4]->ptr);
4497                 return;
4498             }
4499             server.cluster->importing_slots_from[slot] = n;
4500         } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
4501             /* CLUSTER SETSLOT <SLOT> STABLE */
4502             server.cluster->importing_slots_from[slot] = NULL;
4503             server.cluster->migrating_slots_to[slot] = NULL;
4504         } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
4505             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
4506             clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
4507 
4508             if (!n) {
4509                 addReplyErrorFormat(c,"Unknown node %s",
4510                     (char*)c->argv[4]->ptr);
4511                 return;
4512             }
4513             /* If this hash slot was served by 'myself' before the switch,
4514              * make sure there are no longer local keys for this hash slot. */
4515             if (server.cluster->slots[slot] == myself && n != myself) {
4516                 if (countKeysInSlot(slot) != 0) {
4517                     addReplyErrorFormat(c,
4518                         "Can't assign hashslot %d to a different node "
4519                         "while I still hold keys for this hash slot.", slot);
4520                     return;
4521                 }
4522             }
4523             /* If this slot is in migrating status but we have no keys
4524              * for it, assigning the slot to another node will clear
4525              * the migrating status. */
4526             if (countKeysInSlot(slot) == 0 &&
4527                 server.cluster->migrating_slots_to[slot])
4528                 server.cluster->migrating_slots_to[slot] = NULL;
4529 
4530             /* If this node was importing this slot, assigning the slot to
4531              * itself also clears the importing status. */
4532             if (n == myself &&
4533                 server.cluster->importing_slots_from[slot])
4534             {
4535                 /* This slot was manually migrated: set this node's configEpoch
4536                  * to a new epoch so that the new version can be propagated
4537                  * by the cluster.
4538                  *
4539                  * Note that if this ever results in a collision with another
4540                  * node getting the same configEpoch, for example because a
4541                  * failover happens at the same time we close the slot, the
4542                  * configEpoch collision resolution will fix it assigning
4543                  * a different epoch to each node. */
4544                 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
4545                     serverLog(LL_WARNING,
4546                         "configEpoch updated after importing slot %d", slot);
4547                 }
4548                 server.cluster->importing_slots_from[slot] = NULL;
4549             }
4550             clusterDelSlot(slot);
4551             clusterAddSlot(n,slot);
4552         } else {
4553             addReplyError(c,
4554                 "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
4555             return;
4556         }
4557         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
4558         addReply(c,shared.ok);
4559     } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
4560         /* CLUSTER BUMPEPOCH */
4561         int retval = clusterBumpConfigEpochWithoutConsensus();
4562         sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
4563                 (retval == C_OK) ? "BUMPED" : "STILL",
4564                 (unsigned long long) myself->configEpoch);
4565         addReplySds(c,reply);
4566     } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
4567         /* CLUSTER INFO */
4568         char *statestr[] = {"ok","fail","needhelp"};
4569         int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4570         uint64_t myepoch;
4571         int j;
4572 
4573         for (j = 0; j < CLUSTER_SLOTS; j++) {
4574             clusterNode *n = server.cluster->slots[j];
4575 
4576             if (n == NULL) continue;
4577             slots_assigned++;
4578             if (nodeFailed(n)) {
4579                 slots_fail++;
4580             } else if (nodeTimedOut(n)) {
4581                 slots_pfail++;
4582             } else {
4583                 slots_ok++;
4584             }
4585         }
4586 
4587         myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
4588                   myself->slaveof->configEpoch : myself->configEpoch;
4589 
4590         sds info = sdscatprintf(sdsempty(),
4591             "cluster_state:%s\r\n"
4592             "cluster_slots_assigned:%d\r\n"
4593             "cluster_slots_ok:%d\r\n"
4594             "cluster_slots_pfail:%d\r\n"
4595             "cluster_slots_fail:%d\r\n"
4596             "cluster_known_nodes:%lu\r\n"
4597             "cluster_size:%d\r\n"
4598             "cluster_current_epoch:%llu\r\n"
4599             "cluster_my_epoch:%llu\r\n"
4600             , statestr[server.cluster->state],
4601             slots_assigned,
4602             slots_ok,
4603             slots_pfail,
4604             slots_fail,
4605             dictSize(server.cluster->nodes),
4606             server.cluster->size,
4607             (unsigned long long) server.cluster->currentEpoch,
4608             (unsigned long long) myepoch
4609         );
4610 
4611         /* Show stats about messages sent and received. */
4612         long long tot_msg_sent = 0;
4613         long long tot_msg_received = 0;
4614 
4615         for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4616             if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
4617             tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
4618             info = sdscatprintf(info,
4619                 "cluster_stats_messages_%s_sent:%lld\r\n",
4620                 clusterGetMessageTypeString(i),
4621                 server.cluster->stats_bus_messages_sent[i]);
4622         }
4623         info = sdscatprintf(info,
4624             "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
4625 
4626         for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4627             if (server.cluster->stats_bus_messages_received[i] == 0) continue;
4628             tot_msg_received += server.cluster->stats_bus_messages_received[i];
4629             info = sdscatprintf(info,
4630                 "cluster_stats_messages_%s_received:%lld\r\n",
4631                 clusterGetMessageTypeString(i),
4632                 server.cluster->stats_bus_messages_received[i]);
4633         }
4634         info = sdscatprintf(info,
4635             "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
4636 
4637         /* Produce the reply protocol. */
4638         addReplyVerbatim(c,info,sdslen(info),"txt");
4639         sdsfree(info);
4640     } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
4641         int retval = clusterSaveConfig(1);
4642 
4643         if (retval == 0)
4644             addReply(c,shared.ok);
4645         else
4646             addReplyErrorFormat(c,"error saving the cluster node config: %s",
4647                 strerror(errno));
4648     } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
4649         /* CLUSTER KEYSLOT <key> */
4650         sds key = c->argv[2]->ptr;
4651 
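        /* keyHashSlot() implements the CRC16-based key to slot mapping,
         * honoring hash tags: for example (illustrative keys), both
         * "{user1000}.following" and "{user1000}.followers" hash just the
         * "user1000" substring and thus map to the same slot. */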
4652         addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
4653     } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
4654         /* CLUSTER COUNTKEYSINSLOT <slot> */
4655         long long slot;
4656 
4657         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4658             return;
4659         if (slot < 0 || slot >= CLUSTER_SLOTS) {
4660             addReplyError(c,"Invalid slot");
4661             return;
4662         }
4663         addReplyLongLong(c,countKeysInSlot(slot));
4664     } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
4665         /* CLUSTER GETKEYSINSLOT <slot> <count> */
4666         long long maxkeys, slot;
4667         unsigned int numkeys, j;
4668         robj **keys;
4669 
4670         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4671             return;
4672         if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
4673             != C_OK)
4674             return;
4675         if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4676             addReplyError(c,"Invalid slot or number of keys");
4677             return;
4678         }
4679 
4680         /* Avoid allocating more than needed in case of large COUNT argument
4681          * and smaller actual number of keys. */
4682         unsigned int keys_in_slot = countKeysInSlot(slot);
4683         if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
4684 
4685         keys = zmalloc(sizeof(robj*)*maxkeys);
4686         numkeys = getKeysInSlot(slot, keys, maxkeys);
4687         addReplyArrayLen(c,numkeys);
4688         for (j = 0; j < numkeys; j++) {
4689             addReplyBulk(c,keys[j]);
4690             decrRefCount(keys[j]);
4691         }
4692         zfree(keys);
4693     } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
4694         /* CLUSTER FORGET <NODE ID> */
4695         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4696 
4697         if (!n) {
4698             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4699             return;
4700         } else if (n == myself) {
4701             addReplyError(c,"I tried hard but I can't forget myself...");
4702             return;
4703         } else if (nodeIsSlave(myself) && myself->slaveof == n) {
4704             addReplyError(c,"Can't forget my master!");
4705             return;
4706         }
4707         clusterBlacklistAddNode(n);
4708         clusterDelNode(n);
4709         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4710                              CLUSTER_TODO_SAVE_CONFIG);
4711         addReply(c,shared.ok);
4712     } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
4713         /* CLUSTER REPLICATE <NODE ID> */
4714         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4715 
4716         /* Lookup the specified node in our table. */
4717         if (!n) {
4718             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4719             return;
4720         }
4721 
4722         /* I can't replicate myself. */
4723         if (n == myself) {
4724             addReplyError(c,"Can't replicate myself");
4725             return;
4726         }
4727 
4728         /* Can't replicate a slave. */
4729         if (nodeIsSlave(n)) {
4730             addReplyError(c,"I can only replicate a master, not a replica.");
4731             return;
4732         }
4733 
4734         /* If the instance is currently a master, it must have no assigned
4735          * slots nor keys in order to start replicating some other node.
4736          * Slaves can switch to another master without issues. */
4737         if (nodeIsMaster(myself) &&
4738             (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
4739             addReplyError(c,
4740                 "To set a master the node must be empty and "
4741                 "without assigned slots.");
4742             return;
4743         }
4744 
4745         /* Set the master. */
4746         clusterSetMaster(n);
4747         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4748         addReply(c,shared.ok);
4749     } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
4750                 !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
4751         /* CLUSTER SLAVES <NODE ID> */
4752         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4753         int j;
4754 
4755         /* Lookup the specified node in our table. */
4756         if (!n) {
4757             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4758             return;
4759         }
4760 
4761         if (nodeIsSlave(n)) {
4762             addReplyError(c,"The specified node is not a master");
4763             return;
4764         }
4765 
4766         addReplyArrayLen(c,n->numslaves);
4767         for (j = 0; j < n->numslaves; j++) {
4768             sds ni = clusterGenNodeDescription(n->slaves[j]);
4769             addReplyBulkCString(c,ni);
4770             sdsfree(ni);
4771         }
4772     } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
4773                c->argc == 3)
4774     {
4775         /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
4776         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4777 
4778         if (!n) {
4779             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4780             return;
4781         } else {
4782             addReplyLongLong(c,clusterNodeFailureReportsCount(n));
4783         }
4784     } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
4785                (c->argc == 2 || c->argc == 3))
4786     {
4787         /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
4788         int force = 0, takeover = 0;
4789 
4790         if (c->argc == 3) {
4791             if (!strcasecmp(c->argv[2]->ptr,"force")) {
4792                 force = 1;
4793             } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
4794                 takeover = 1;
4795                 force = 1; /* Takeover also implies force. */
4796             } else {
4797                 addReply(c,shared.syntaxerr);
4798                 return;
4799             }
4800         }
4801 
4802         /* Check preconditions. */
4803         if (nodeIsMaster(myself)) {
4804             addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
4805             return;
4806         } else if (myself->slaveof == NULL) {
4807             addReplyError(c,"I'm a replica but my master is unknown to me");
4808             return;
4809         } else if (!force &&
4810                    (nodeFailed(myself->slaveof) ||
4811                     myself->slaveof->link == NULL))
4812         {
4813             addReplyError(c,"Master is down or failed, "
4814                             "please use CLUSTER FAILOVER FORCE");
4815             return;
4816         }
4817         resetManualFailover();
4818         server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
4819 
4820         if (takeover) {
4821             /* A takeover does not perform any initial check. It just
4822              * generates a new configuration epoch for this node without
4823              * consensus, claims the master's slots, and broadcasts the new
4824              * configuration. */
4825             serverLog(LL_WARNING,"Taking over the master (user request).");
4826             clusterBumpConfigEpochWithoutConsensus();
4827             clusterFailoverReplaceYourMaster();
4828         } else if (force) {
4829             /* If this is a forced failover, we don't need to talk with our
4830              * master to agree about the offset. We just fail over, taking
4831              * over the master without coordination. */
4832             serverLog(LL_WARNING,"Forced failover user request accepted.");
4833             server.cluster->mf_can_start = 1;
4834         } else {
4835             serverLog(LL_WARNING,"Manual failover user request accepted.");
4836             clusterSendMFStart(myself->slaveof);
4837         }
4838         addReply(c,shared.ok);
4839     } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
4840     {
4841         /* CLUSTER SET-CONFIG-EPOCH <epoch>
4842          *
4843          * The user is allowed to set the config epoch only when a node is
4844          * totally fresh: no config epoch, no other known node, and so forth.
4845          * This happens at cluster creation time to start with a cluster where
4846          * every node has a different config epoch, without relying on the
4847          * conflict resolution system, which is too slow with a big cluster. */
4848         long long epoch;
4849 
4850         if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
4851             return;
4852 
4853         if (epoch < 0) {
4854             addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
4855         } else if (dictSize(server.cluster->nodes) > 1) {
4856             addReplyError(c,"The user can assign a config epoch only when the "
4857                             "node does not know any other node.");
4858         } else if (myself->configEpoch != 0) {
4859             addReplyError(c,"Node config epoch is already non-zero");
4860         } else {
4861             myself->configEpoch = epoch;
4862             serverLog(LL_WARNING,
4863                 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
4864                 (unsigned long long) myself->configEpoch);
4865 
4866             if (server.cluster->currentEpoch < (uint64_t)epoch)
4867                 server.cluster->currentEpoch = epoch;
4868             /* No need to fsync the config here since in the unlucky event
4869              * of a failure to persist the config, the conflict resolution code
4870              * will assign a unique config to this node. */
4871             clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4872                                  CLUSTER_TODO_SAVE_CONFIG);
4873             addReply(c,shared.ok);
4874         }
4875     } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
4876                (c->argc == 2 || c->argc == 3))
4877     {
4878         /* CLUSTER RESET [SOFT|HARD] */
4879         int hard = 0;
4880 
4881         /* Parse soft/hard argument. Default is soft. */
4882         if (c->argc == 3) {
4883             if (!strcasecmp(c->argv[2]->ptr,"hard")) {
4884                 hard = 1;
4885             } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
4886                 hard = 0;
4887             } else {
4888                 addReply(c,shared.syntaxerr);
4889                 return;
4890             }
4891         }
4892 
4893         /* Slaves can be reset while containing data, but master nodes
4894          * must be empty. */
4895         if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
4896             addReplyError(c,"CLUSTER RESET can't be called with "
4897                             "master nodes containing keys");
4898             return;
4899         }
4900         clusterReset(hard);
4901         addReply(c,shared.ok);
4902     } else {
4903         addReplySubcommandSyntaxError(c);
4904         return;
4905     }
4906 }
4907 
4908 /* -----------------------------------------------------------------------------
4909  * DUMP, RESTORE and MIGRATE commands
4910  * -------------------------------------------------------------------------- */
4911 
4912 /* Generates a DUMP-format representation of the object 'o', adding it to the
4913  * I/O stream pointed to by 'payload'. This function can't fail. */
4914 void createDumpPayload(rio *payload, robj *o, robj *key) {
4915     unsigned char buf[2];
4916     uint64_t crc;
4917 
4918     /* Serialize the object in an RDB-like format. It consists of an object type
4919      * byte followed by the serialized object. This is understood by RESTORE. */
4920     rioInitWithBuffer(payload,sdsempty());
4921     serverAssert(rdbSaveObjectType(payload,o));
4922     serverAssert(rdbSaveObject(payload,o,key));
4923 
4924     /* Write the footer; this is what it looks like:
4925      * ----------------+---------------------+---------------+
4926      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4927      * ----------------+---------------------+---------------+
4928      * RDB version and CRC are both in little endian.
4929      */
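
    /* For example (illustrative, if RDB_VERSION were 9): the two version
     * bytes appended below would be 0x09 0x00, followed by the
     * little-endian CRC64 of everything written so far, version bytes
     * included. */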
4930 
4931     /* RDB version */
4932     buf[0] = RDB_VERSION & 0xff;
4933     buf[1] = (RDB_VERSION >> 8) & 0xff;
4934     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4935 
4936     /* CRC64 */
4937     crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4938                 sdslen(payload->io.buffer.ptr));
4939     memrev64ifbe(&crc);
4940     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4941 }
4942 
4943 /* Verify that the RDB version of the dump payload is not greater than the one
4944  * of this Redis instance, and that the checksum is ok.
4945  * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
4946  * is returned. */
4947 int verifyDumpPayload(unsigned char *p, size_t len) {
4948     unsigned char *footer;
4949     uint16_t rdbver;
4950     uint64_t crc;
4951 
4952     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4953     if (len < 10) return C_ERR;
4954     footer = p+(len-10);
4955 
4956     /* Verify RDB version */
4957     rdbver = (footer[1] << 8) | footer[0];
4958     if (rdbver > RDB_VERSION) return C_ERR;
4959 
4960     /* Verify CRC64 */
4961     crc = crc64(0,p,len-8);
4962     memrev64ifbe(&crc);
4963     return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4964 }
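
/* A minimal round-trip sketch (illustrative only, not used by the
 * commands below; the helper name is hypothetical): serialize an object
 * with createDumpPayload() and validate the result with
 * verifyDumpPayload(). */
static int dumpPayloadRoundTripOk(robj *o, robj *key) {
    rio payload;
    int ok;

    createDumpPayload(&payload,o,key);
    ok = verifyDumpPayload((unsigned char*)payload.io.buffer.ptr,
                           sdslen(payload.io.buffer.ptr)) == C_OK;
    sdsfree(payload.io.buffer.ptr); /* Release the serialized buffer. */
    return ok; /* Non-zero if the RDB version and CRC64 footer check out. */
}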
4965 
4966 /* DUMP keyname
4967  * DUMP is actually not used by Redis Cluster but it is the obvious
4968  * complement of RESTORE and can be useful for different applications. */
4969 void dumpCommand(client *c) {
4970     robj *o;
4971     rio payload;
4972 
4973     /* Check if the key is here. */
4974     if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4975         addReplyNull(c);
4976         return;
4977     }
4978 
4979     /* Create the DUMP encoded representation. */
4980     createDumpPayload(&payload,o,c->argv[1]);
4981 
4982     /* Transfer to the client */
4983     addReplyBulkSds(c,payload.io.buffer.ptr);
4984     return;
4985 }
4986 
4987 /* RESTORE key ttl serialized-value [REPLACE] */
4988 void restoreCommand(client *c) {
4989     long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
4990     rio payload;
4991     int j, type, replace = 0, absttl = 0;
4992     robj *obj;
4993 
4994     /* Parse additional options */
4995     for (j = 4; j < c->argc; j++) {
4996         int additional = c->argc-j-1;
4997         if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4998             replace = 1;
4999         } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
5000             absttl = 1;
5001         } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
5002                    lfu_freq == -1)
5003         {
5004             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
5005                     != C_OK) return;
5006             if (lru_idle < 0) {
5007                 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
5008                 return;
5009             }
5010             lru_clock = LRU_CLOCK();
5011             j++; /* Consume additional arg. */
5012         } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
5013                    lru_idle == -1)
5014         {
5015             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
5016                     != C_OK) return;
5017             if (lfu_freq < 0 || lfu_freq > 255) {
5018                 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
5019                 return;
5020             }
5021             j++; /* Consume additional arg. */
5022         } else {
5023             addReply(c,shared.syntaxerr);
5024             return;
5025         }
5026     }
5027 
5028     /* Make sure this key does not already exist here... */
5029     robj *key = c->argv[1];
5030     if (!replace && lookupKeyWrite(c->db,key) != NULL) {
5031         addReply(c,shared.busykeyerr);
5032         return;
5033     }
5034 
5035     /* Check if the TTL value makes sense */
5036     if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
5037         return;
5038     } else if (ttl < 0) {
5039         addReplyError(c,"Invalid TTL value, must be >= 0");
5040         return;
5041     }
5042 
5043     /* Verify RDB version and data checksum. */
5044     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
5045     {
5046         addReplyError(c,"DUMP payload version or checksum are wrong");
5047         return;
5048     }
5049 
5050     rioInitWithBuffer(&payload,c->argv[3]->ptr);
5051     if (((type = rdbLoadObjectType(&payload)) == -1) ||
5052         ((obj = rdbLoadObject(type,&payload,key->ptr)) == NULL))
5053     {
5054         addReplyError(c,"Bad data format");
5055         return;
5056     }
5057 
5058     /* Remove the old key if needed. */
5059     int deleted = 0;
5060     if (replace)
5061         deleted = dbDelete(c->db,key);
5062 
5063     if (ttl && !absttl) ttl+=mstime();
5064     if (ttl && checkAlreadyExpired(ttl)) {
5065         if (deleted) {
5066             rewriteClientCommandVector(c,2,shared.del,key);
5067             signalModifiedKey(c,c->db,key);
5068             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
5069             server.dirty++;
5070         }
5071         decrRefCount(obj);
5072         addReply(c, shared.ok);
5073         return;
5074     }
5075 
5076     /* Create the key and set the TTL if any */
5077     dbAdd(c->db,key,obj);
5078     if (ttl) {
5079         setExpire(c,c->db,key,ttl);
5080     }
5081     objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
5082     signalModifiedKey(c,c->db,key);
5083     notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
5084     addReply(c,shared.ok);
5085     server.dirty++;
5086 }
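
/* Example of the DUMP / RESTORE pair in action (hypothetical redis-cli
 * session; the exact payload bytes depend on the RDB version):
 *
 *   127.0.0.1:6379> SET mykey "hello"
 *   OK
 *   127.0.0.1:6379> DUMP mykey
 *   "\x00\x05hello\t\x00..."
 *   127.0.0.1:6379> RESTORE copykey 0 "\x00\x05hello\t\x00..."
 *   OK
 *   127.0.0.1:6379> RESTORE copykey 0 "\x00\x05hello\t\x00..."
 *   (error) BUSYKEY Target key name already exists.
 *
 * The second RESTORE fails unless REPLACE is given, as enforced by the
 * busy-key check above. */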
5087 
5088 /* MIGRATE socket cache implementation.
5089  *
5090  * We keep a map between host:port pairs and the TCP sockets used to connect
5091  * to those instances in recent times.
5092  * These sockets are closed when the maximum number we cache is reached, and
5093  * also in serverCron() when they have been idle for more than a few seconds. */
5094 #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
5095 #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
5096 
5097 typedef struct migrateCachedSocket {
5098     connection *conn;
5099     long last_dbid;
5100     time_t last_use_time;
5101 } migrateCachedSocket;
5102 
5103 /* Return a migrateCachedSocket containing a TCP socket connected with the
5104  * target instance, possibly returning a cached one.
5105  *
5106  * This function is responsible for sending errors to the client if a
5107  * connection can't be established. In this case NULL is returned.
5108  * Otherwise on success the socket is returned, and the caller should not
5109  * attempt to free it after usage.
5110  *
5111  * If the caller detects an error while using the socket, migrateCloseSocket()
5112  * should be called so that the connection will be created from scratch
5113  * the next time. */
5114 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
5115     connection *conn;
5116     sds name = sdsempty();
5117     migrateCachedSocket *cs;
5118 
5119     /* Check if we have an already cached socket for this ip:port pair. */
5120     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5121     name = sdscatlen(name,":",1);
5122     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5123     cs = dictFetchValue(server.migrate_cached_sockets,name);
5124     if (cs) {
5125         sdsfree(name);
5126         cs->last_use_time = server.unixtime;
5127         return cs;
5128     }
5129 
5130     /* No cached socket, create one. */
5131     if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
5132         /* Too many items, drop one at random. */
5133         dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
5134         cs = dictGetVal(de);
5135         connClose(cs->conn);
5136         zfree(cs);
5137         dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5138     }
5139 
5140     /* Create the socket */
5141     conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
5142     if (connBlockingConnect(conn, c->argv[1]->ptr, atoi(c->argv[2]->ptr), timeout)
5143             != C_OK) {
5144         addReplySds(c,
5145             sdsnew("-IOERR error or timeout connecting to the client\r\n"));
5146         connClose(conn);
5147         sdsfree(name);
5148         return NULL;
5149     }
5150     connEnableTcpNoDelay(conn);
5151 
5152     /* Add to the cache and return it to the caller. */
5153     cs = zmalloc(sizeof(*cs));
5154     cs->conn = conn;
5155 
5156     cs->last_dbid = -1;
5157     cs->last_use_time = server.unixtime;
5158     dictAdd(server.migrate_cached_sockets,name,cs);
5159     return cs;
5160 }
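
/* A sketch of the calling convention documented above (a hypothetical
 * caller; the helper name and arguments are illustrative): send a
 * pre-built buffer, and drop the cached socket on any I/O error so that
 * the next call reconnects from scratch. */
static int migrateSendBufferExample(client *c, robj *host, robj *port,
                                    long timeout, sds buf)
{
    migrateCachedSocket *cs = migrateGetSocket(c,host,port,timeout);

    if (cs == NULL) return C_ERR; /* Error reply already sent to client. */
    if (connSyncWrite(cs->conn,buf,sdslen(buf),timeout) !=
        (ssize_t)sdslen(buf))
    {
        migrateCloseSocket(host,port); /* Force a reconnect next time. */
        return C_ERR;
    }
    return C_OK;
}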
5161 
5162 /* Free a migrate cached connection. */
5163 void migrateCloseSocket(robj *host, robj *port) {
5164     sds name = sdsempty();
5165     migrateCachedSocket *cs;
5166 
5167     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5168     name = sdscatlen(name,":",1);
5169     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5170     cs = dictFetchValue(server.migrate_cached_sockets,name);
5171     if (!cs) {
5172         sdsfree(name);
5173         return;
5174     }
5175 
5176     connClose(cs->conn);
5177     zfree(cs);
5178     dictDelete(server.migrate_cached_sockets,name);
5179     sdsfree(name);
5180 }
5181 
5182 void migrateCloseTimedoutSockets(void) {
5183     dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
5184     dictEntry *de;
5185 
5186     while((de = dictNext(di)) != NULL) {
5187         migrateCachedSocket *cs = dictGetVal(de);
5188 
5189         if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
5190             connClose(cs->conn);
5191             zfree(cs);
5192             dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5193         }
5194     }
5195     dictReleaseIterator(di);
5196 }
5197 
5198 /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
5199  *         AUTH2 username password]
5200  *
5201  * Or, in the multiple-keys form:
5202  *
5203  * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
5204  *         AUTH2 username password] KEYS key1 key2 ... keyN */
5205 void migrateCommand(client *c) {
5206     migrateCachedSocket *cs;
5207     int copy = 0, replace = 0, j;
5208     char *username = NULL;
5209     char *password = NULL;
5210     long timeout;
5211     long dbid;
5212     robj **ov = NULL; /* Objects to migrate. */
5213     robj **kv = NULL; /* Key names. */
5214     robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
5215     rio cmd, payload;
5216     int may_retry = 1;
5217     int write_error = 0;
5218     int argv_rewritten = 0;
5219 
5220     /* To support the KEYS option we need the following additional state. */
5221     int first_key = 3; /* Argument index of the first key. */
5222     int num_keys = 1;  /* By default only migrate the 'key' argument. */
5223 
5224     /* Parse additional options */
5225     for (j = 6; j < c->argc; j++) {
5226         int moreargs = (c->argc-1) - j;
5227         if (!strcasecmp(c->argv[j]->ptr,"copy")) {
5228             copy = 1;
5229         } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5230             replace = 1;
5231         } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
5232             if (!moreargs) {
5233                 addReply(c,shared.syntaxerr);
5234                 return;
5235             }
5236             j++;
5237             password = c->argv[j]->ptr;
5238         } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
5239             if (moreargs < 2) {
5240                 addReply(c,shared.syntaxerr);
5241                 return;
5242             }
5243             username = c->argv[++j]->ptr;
5244             password = c->argv[++j]->ptr;
5245         } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
5246             if (sdslen(c->argv[3]->ptr) != 0) {
5247                 addReplyError(c,
5248                     "When using MIGRATE KEYS option, the key argument"
5249                     " must be set to the empty string");
5250                 return;
5251             }
5252             first_key = j+1;
5253             num_keys = c->argc - j - 1;
5254             break; /* All the remaining args are keys. */
5255         } else {
5256             addReply(c,shared.syntaxerr);
5257             return;
5258         }
5259     }
5260 
5261     /* Sanity check */
5262     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
5263         getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
5264     {
5265         return;
5266     }
5267     if (timeout <= 0) timeout = 1000;
5268 
5269     /* Check if the keys are here. If at least one key exists, migrate it;
5270      * otherwise, if all the keys are missing, reply with "NOKEY" to signal
5271      * the caller there was nothing to migrate. We don't return an error in
5272      * this case, since often this is due to a normal condition like the key
5273      * expiring in the meantime. */
5274     ov = zrealloc(ov,sizeof(robj*)*num_keys);
5275     kv = zrealloc(kv,sizeof(robj*)*num_keys);
5276     int oi = 0;
5277 
5278     for (j = 0; j < num_keys; j++) {
5279         if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
5280             kv[oi] = c->argv[first_key+j];
5281             oi++;
5282         }
5283     }
5284     num_keys = oi;
5285     if (num_keys == 0) {
5286         zfree(ov); zfree(kv);
5287         addReplySds(c,sdsnew("+NOKEY\r\n"));
5288         return;
5289     }
5290 
5291 try_again:
5292     write_error = 0;
5293 
5294     /* Connect */
5295     cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
5296     if (cs == NULL) {
5297         zfree(ov); zfree(kv);
5298         return; /* error sent to the client by migrateGetSocket() */
5299     }
5300 
5301     rioInitWithBuffer(&cmd,sdsempty());
5302 
5303     /* Authentication */
5304     if (password) {
5305         int arity = username ? 3 : 2;
5306         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
5307         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
5308         if (username) {
5309             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
5310                                  sdslen(username)));
5311         }
5312         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
5313             sdslen(password)));
5314     }
5315 
5316     /* Send the SELECT command if the current DB is not already selected. */
5317     int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
5318     if (select) {
5319         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5320         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
5321         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
5322     }
5323 
5324     int non_expired = 0; /* Number of keys that we'll find non expired.
5325                             Note that serializing large keys may take some
5326                             time, so certain keys that were found non expired
5327                             by the lookupKey() function may be expired later. */
5328 
5329     /* Create RESTORE payload and generate the protocol to call the command. */
5330     for (j = 0; j < num_keys; j++) {
5331         long long ttl = 0;
5332         long long expireat = getExpire(c->db,kv[j]);
5333 
5334         if (expireat != -1) {
5335             ttl = expireat-mstime();
5336             if (ttl < 0) {
5337                 continue;
5338             }
5339             if (ttl < 1) ttl = 1;
5340         }
5341 
5342         /* Relocate valid (non expired) keys into the array in successive
5343          * positions to remove holes created by the keys that were present
5344          * in the first lookup but are now expired after the second lookup. */
5345         kv[non_expired++] = kv[j];
5346 
5347         serverAssertWithInfo(c,NULL,
5348             rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
5349 
5350         if (server.cluster_enabled)
5351             serverAssertWithInfo(c,NULL,
5352                 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
5353         else
5354             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
5355         serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
5356         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
5357                 sdslen(kv[j]->ptr)));
5358         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
5359 
5360         /* Emit the payload argument, that is the serialized object using
5361          * the DUMP format. */
5362         createDumpPayload(&payload,ov[j],kv[j]);
5363         serverAssertWithInfo(c,NULL,
5364             rioWriteBulkString(&cmd,payload.io.buffer.ptr,
5365                                sdslen(payload.io.buffer.ptr)));
5366         sdsfree(payload.io.buffer.ptr);
5367 
5368         /* Add the REPLACE option to the RESTORE command if it was specified
5369          * as a MIGRATE option. */
5370         if (replace)
5371             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
5372     }
5373 
5374     /* Fix the actual number of keys we are migrating. */
5375     num_keys = non_expired;
5376 
5377     /* Transfer the query to the other node in 64K chunks. */
5378     errno = 0;
5379     {
5380         sds buf = cmd.io.buffer.ptr;
5381         size_t pos = 0, towrite;
5382         int nwritten = 0;
5383 
5384         while ((towrite = sdslen(buf)-pos) > 0) {
5385             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5386             nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
5387             if (nwritten != (signed)towrite) {
5388                 write_error = 1;
5389                 goto socket_err;
5390             }
5391             pos += nwritten;
5392         }
5393     }
5394 
5395     char buf0[1024]; /* Auth reply. */
5396     char buf1[1024]; /* Select reply. */
5397     char buf2[1024]; /* Restore reply. */
5398 
5399     /* Read the AUTH reply if needed. */
5400     if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
5401         goto socket_err;
5402 
5403     /* Read the SELECT reply if needed. */
5404     if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
5405         goto socket_err;
5406 
5407     /* Read the RESTORE replies. */
5408     int error_from_target = 0;
5409     int socket_error = 0;
5410     int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5411 
5412     /* Allocate the new argument vector that will replace the current command,
5413      * to propagate the MIGRATE as a DEL command (if no COPY option was given).
5414      * We allocate num_keys+1 because the additional argument is for the
5415      * "DEL" command name itself. */
5416     if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5417 
5418     for (j = 0; j < num_keys; j++) {
5419         if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
5420             socket_error = 1;
5421             break;
5422         }
5423         if ((password && buf0[0] == '-') ||
5424             (select && buf1[0] == '-') ||
5425             buf2[0] == '-')
5426         {
5427             /* On error assume that last_dbid is no longer valid. */
5428             if (!error_from_target) {
5429                 cs->last_dbid = -1;
5430                 char *errbuf;
5431                 if (password && buf0[0] == '-') errbuf = buf0;
5432                 else if (select && buf1[0] == '-') errbuf = buf1;
5433                 else errbuf = buf2;
5434 
5435                 error_from_target = 1;
5436                 addReplyErrorFormat(c,"Target instance replied with error: %s",
5437                     errbuf+1);
5438             }
5439         } else {
5440             if (!copy) {
5441                 /* No COPY option: remove the local key, signal the change. */
5442                 dbDelete(c->db,kv[j]);
5443                 signalModifiedKey(c,c->db,kv[j]);
5444                 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
5445                 server.dirty++;
5446 
5447                 /* Populate the argument vector to replace the old one. */
5448                 newargv[del_idx++] = kv[j];
5449                 incrRefCount(kv[j]);
5450             }
5451         }
5452     }
5453 
5454     /* On socket error, if we want to retry, do it now before rewriting the
5455      * command vector. We only retry if we are sure nothing was processed
5456      * and we failed to read the first reply (j == 0 test). */
5457     if (!error_from_target && socket_error && j == 0 && may_retry &&
5458         errno != ETIMEDOUT)
5459     {
5460         goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5461     }
5462 
5463     /* On socket errors, close the migration socket now, while we still have
5464      * the original host/port in the ARGV. Later the original command may be
5465      * rewritten to DEL and it would be too late. */
5466     if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5467 
5468     if (!copy) {
5469         /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5470          * this only for the keys for which we received an acknowledgement
5471          * from the receiving Redis server, by using the del_idx index. */
5472         if (del_idx > 1) {
5473             newargv[0] = createStringObject("DEL",3);
5474             /* Note that the following call takes ownership of newargv. */
5475             replaceClientCommandVector(c,del_idx,newargv);
5476             argv_rewritten = 1;
5477         } else {
5478             /* No key transfer acknowledged, no need to rewrite as DEL. */
5479             zfree(newargv);
5480         }
5481         newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5482     }
5483 
5484     /* If we are here and a socket error happened, we don't want to retry.
5485      * Just signal the problem to the client, but only do it if we did not
5486      * already queue a different error reported by the destination server. */
5487     if (!error_from_target && socket_error) {
5488         may_retry = 0;
5489         goto socket_err;
5490     }
5491 
5492     if (!error_from_target) {
5493         /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5494          * avoid SELECT the next time if the target DB is the same. Reply +OK.
5495          *
5496          * Note: if we reached this point, the SELECT command succeeded even
5497          * if socket_error is true (otherwise the code would have jumped to
5498          * the socket_err label). */
5499         cs->last_dbid = dbid;
5500         addReply(c,shared.ok);
5501     } else {
5502         /* On error the reply was already sent in the for loop above, and
5503          * last_dbid was set to -1 to force a SELECT the next time. */
5504     }
5505 
5506     sdsfree(cmd.io.buffer.ptr);
5507     zfree(ov); zfree(kv); zfree(newargv);
5508     return;
5509 
5510 /* On socket errors we try to close the cached socket and try again.
5511  * It is very common for the cached socket to get closed; if simply reopening
5512  * it works, it would be a shame to report the error to the caller. */
5513 socket_err:
5514     /* Cleanup we want to perform in both the retry and no retry case.
5515      * Note: Closing the migrate socket will also force SELECT next time. */
5516     sdsfree(cmd.io.buffer.ptr);
5517 
5518     /* If the command was rewritten as DEL and there was a socket error,
5519      * we already closed the socket earlier. While migrateCloseSocket()
5520      * is idempotent, the host/port arguments are now gone, so don't do it
5521      * again. */
5522     if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5523     zfree(newargv);
5524     newargv = NULL; /* This will get reallocated on retry. */
5525 
5526     /* Retry only if it's not a timeout and we never attempted a retry
5527      * (or the code jumping here did not set may_retry to zero). */
5528     if (errno != ETIMEDOUT && may_retry) {
5529         may_retry = 0;
5530         goto try_again;
5531     }
5532 
5533     /* Cleanup we want to do if no retry is attempted. */
5534     zfree(ov); zfree(kv);
5535     addReplySds(c,
5536         sdscatprintf(sdsempty(),
5537             "-IOERR error or timeout %s to target instance\r\n",
5538             write_error ? "writing" : "reading"));
5539     return;
5540 }
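
/* For reference, the protocol generated above for a single key when both
 * AUTH and a DB switch are needed (a hypothetical trace, payload bytes
 * abbreviated):
 *
 *   *2\r\n$4\r\nAUTH\r\n$6\r\nsecret\r\n
 *   *2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n
 *   *5\r\n$14\r\nRESTORE-ASKING\r\n$5\r\nmykey\r\n$1\r\n0\r\n
 *       $<len>\r\n<DUMP payload>\r\n$7\r\nREPLACE\r\n
 *
 * RESTORE-ASKING is emitted in cluster mode, plain RESTORE otherwise,
 * and the target's replies (AUTH, SELECT, then one per RESTORE) are read
 * back in that order. */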
5541 
5542 /* -----------------------------------------------------------------------------
5543  * Cluster functions related to serving / redirecting clients
5544  * -------------------------------------------------------------------------- */
5545 
5546 /* The ASKING command is required after a -ASK redirection.
5547  * The client should issue ASKING before actually sending the command to
5548  * the target instance. See the Redis Cluster specification for more
5549  * information. */
5550 void askingCommand(client *c) {
5551     if (server.cluster_enabled == 0) {
5552         addReplyError(c,"This instance has cluster support disabled");
5553         return;
5554     }
5555     c->flags |= CLIENT_ASKING;
5556     addReply(c,shared.ok);
5557 }
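
/* A typical -ASK dance as seen from the client side (hypothetical
 * session; slot and address are examples):
 *
 *   GET foo                 -> -ASK 3999 127.0.0.1:6381
 *   (client connects to 127.0.0.1:6381)
 *   ASKING                  -> +OK
 *   GET foo                 -> served, since CLIENT_ASKING is set. */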
5558 
5559 /* The READONLY command is used by clients to enter the read-only mode.
5560  * In this mode slaves will not redirect clients as long as they use
5561  * read-only commands to access keys served by the slave's master. */
5562 void readonlyCommand(client *c) {
5563     if (server.cluster_enabled == 0) {
5564         addReplyError(c,"This instance has cluster support disabled");
5565         return;
5566     }
5567     c->flags |= CLIENT_READONLY;
5568     addReply(c,shared.ok);
5569 }
5570 
5571 /* The READWRITE command just clears the READONLY command state. */
5572 void readwriteCommand(client *c) {
5573     c->flags &= ~CLIENT_READONLY;
5574     addReply(c,shared.ok);
5575 }
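
/* Example of the read-only replica flow (hypothetical session against a
 * replica whose master serves slot 12182):
 *
 *   GET foo                 -> -MOVED 12182 127.0.0.1:7000
 *   READONLY                -> +OK
 *   GET foo                 -> served locally by the replica
 *   READWRITE               -> +OK (normal redirections resume) */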
5576 
5577 /* Return the pointer to the cluster node that is able to serve the command.
5578  * For the function to succeed the command should only target either:
5579  *
5580  * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
5581  * 2) Multiple keys in the same hash slot, while the slot is stable (no
5582  *    resharding in progress).
5583  *
5584  * On success the function returns the node that is able to serve the request.
5585  * If the node is not 'myself' a redirection must be performed. The kind of
5586  * redirection is specified by setting the integer passed by reference
5587  * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5588  * CLUSTER_REDIR_MOVED.
5589  *
5590  * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5591  *
5592  * If the command fails NULL is returned, and the reason of the failure is
5593  * provided via 'error_code', which will be set to:
5594  *
5595  * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5596  * don't belong to the same hash slot.
5597  *
5598  * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5599  * belonging to the same slot, but the slot is not stable (in migration or
5600  * importing state, likely because a resharding is in progress).
5601  *
5602  * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5603  * not bound to any node. In this case the cluster global state should
5604  * already be "down" but it is fragile to rely on the update of the global state,
5605  * so we also handle it here.
5606  *
5607  * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
5608  * down but the user attempts to execute a command that addresses one or more keys. */
5609 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5610     clusterNode *n = NULL;
5611     robj *firstkey = NULL;
5612     int multiple_keys = 0;
5613     multiState *ms, _ms;
5614     multiCmd mc;
5615     int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5616 
5617     /* Allow any key to be set if a module disabled cluster redirections. */
5618     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
5619         return myself;
5620 
5621     /* Set error code optimistically for the base case. */
5622     if (error_code) *error_code = CLUSTER_REDIR_NONE;
5623 
5624     /* Modules can turn off Redis Cluster redirection: this is useful
5625      * when writing a module that implements a completely different
5626      * distributed system. */
5627 
5628     /* We handle all the cases as if they were EXEC commands, so we have
5629      * a common code path for everything */
5630     if (cmd->proc == execCommand) {
5631         /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5632          * error. */
5633         if (!(c->flags & CLIENT_MULTI)) return myself;
5634         ms = &c->mstate;
5635     } else {
5636         /* In order to have a single codepath, create a fake Multi State
5637          * structure if the client is not in MULTI/EXEC state: this way
5638          * we have just one code path below. */
5639         ms = &_ms;
5640         _ms.commands = &mc;
5641         _ms.count = 1;
5642         mc.argv = argv;
5643         mc.argc = argc;
5644         mc.cmd = cmd;
5645     }
5646 
5647     /* Check that all the keys are in the same hash slot, and obtain this
5648      * slot and the node associated. */
5649     for (i = 0; i < ms->count; i++) {
5650         struct redisCommand *mcmd;
5651         robj **margv;
5652         int margc, *keyindex, numkeys, j;
5653 
5654         mcmd = ms->commands[i].cmd;
5655         margc = ms->commands[i].argc;
5656         margv = ms->commands[i].argv;
5657 
5658         getKeysResult result = GETKEYS_RESULT_INIT;
5659         numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
5660         keyindex = result.keys;
5661 
5662         for (j = 0; j < numkeys; j++) {
5663             robj *thiskey = margv[keyindex[j]];
5664             int thisslot = keyHashSlot((char*)thiskey->ptr,
5665                                        sdslen(thiskey->ptr));
5666 
5667             if (firstkey == NULL) {
5668                 /* This is the first key we see. Check its slot and the
5669                  * node serving it. */
5670                 firstkey = thiskey;
5671                 slot = thisslot;
5672                 n = server.cluster->slots[slot];
5673 
5674                 /* Error: If a slot is not served, we are in "cluster down"
5675                  * state. However the state is yet to be updated, so this was
5676                  * not trapped earlier in processCommand(). Report the same
5677                  * error to the client. */
5678                 if (n == NULL) {
5679                     getKeysFreeResult(&result);
5680                     if (error_code)
5681                         *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5682                     return NULL;
5683                 }
5684 
5685                 /* If we are migrating or importing this slot, we need to check
5686                  * if we have all the keys in the request (the only way we
5687                  * can safely serve the request, otherwise we return a TRYAGAIN
5688                  * error). To do so we set the importing/migrating state and
5689                  * increment a counter for every missing key. */
5690                 if (n == myself &&
5691                     server.cluster->migrating_slots_to[slot] != NULL)
5692                 {
5693                     migrating_slot = 1;
5694                 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5695                     importing_slot = 1;
5696                 }
5697             } else {
5698                 /* If it is not the first key, make sure it is exactly
5699                  * the same key as the first we saw. */
5700                 if (!equalStringObjects(firstkey,thiskey)) {
5701                     if (slot != thisslot) {
5702                         /* Error: multiple keys from different slots. */
5703                         getKeysFreeResult(&result);
5704                         if (error_code)
5705                             *error_code = CLUSTER_REDIR_CROSS_SLOT;
5706                         return NULL;
5707                     } else {
5708                         /* Flag this request as one with multiple different
5709                          * keys. */
5710                         multiple_keys = 1;
5711                     }
5712                 }
5713             }
5714 
5715             /* Migrating / Importing slot? Count keys we don't have. */
5716             if ((migrating_slot || importing_slot) &&
5717                 lookupKeyRead(&server.db[0],thiskey) == NULL)
5718             {
5719                 missing_keys++;
5720             }
5721         }
5722         getKeysFreeResult(&result);
5723     }
5724 
5725     /* No keys at all in the command? Then we can serve the request
5726      * without redirections or errors in all the cases. */
5727     if (n == NULL) return myself;
5728 
5729     /* Cluster is globally down but we got keys? We only serve the request
5730      * if it is a read command and when allow_reads_when_down is enabled. */
5731     if (server.cluster->state != CLUSTER_OK) {
5732         if (!server.cluster_allow_reads_when_down) {
5733             /* The cluster is configured to block commands when the
5734              * cluster is down. */
5735             if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5736             return NULL;
5737         } else if (!(cmd->flags & CMD_READONLY) && !(cmd->proc == evalCommand)
5738                 && !(cmd->proc == evalShaCommand))
5739         {
5740             /* The cluster is configured to allow read only commands
5741              * but this command is neither readonly, nor EVAL or
5742              * EVALSHA. */
5743             if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
5744             return NULL;
5745         } else {
5746             /* Fall through and allow the command to be executed:
5747              * this happens when server.cluster_allow_reads_when_down is
5748              * true and the command is a readonly command or EVAL / EVALSHA. */
5749         }
5750     }
5751 
5752     /* Return the hashslot by reference. */
5753     if (hashslot) *hashslot = slot;
5754 
5755     /* MIGRATE always works in the context of the local node if the slot
5756      * is open (migrating or importing state). We need to be able to freely
5757      * move keys among instances in this case. */
5758     if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5759         return myself;
5760 
5761     /* If we don't have all the keys and we are migrating the slot, send
5762      * an ASK redirection. */
5763     if (migrating_slot && missing_keys) {
5764         if (error_code) *error_code = CLUSTER_REDIR_ASK;
5765         return server.cluster->migrating_slots_to[slot];
5766     }
5767 
5768     /* If we are receiving the slot, and the client correctly flagged the
5769      * request as "ASKING", we can serve the request. However if the request
5770      * involves multiple keys and we don't have them all, the only option is
5771      * to send a TRYAGAIN error. */
5772     if (importing_slot &&
5773         (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5774     {
5775         if (multiple_keys && missing_keys) {
5776             if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5777             return NULL;
5778         } else {
5779             return myself;
5780         }
5781     }
5782 
5783     /* Handle the read-only client case reading from a slave: if this
5784      * node is a slave and the request is about a hash slot our master
5785      * is serving, we can reply without redirection. */
5786     int is_readonly_command = (c->cmd->flags & CMD_READONLY) ||
5787                               (c->cmd->proc == execCommand && !(c->mstate.cmd_inv_flags & CMD_READONLY));
5788     if (c->flags & CLIENT_READONLY &&
5789         (is_readonly_command || cmd->proc == evalCommand ||
5790          cmd->proc == evalShaCommand) &&
5791         nodeIsSlave(myself) &&
5792         myself->slaveof == n)
5793     {
5794         return myself;
5795     }
5796 
5797     /* Base case: just return the right node. However if this node is not
5798      * myself, set error_code to MOVED since we need to issue a redirection. */
5799     if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5800     return n;
5801 }
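
/* For reference, a simplified sketch of the key -> hash slot mapping the
 * function above relies on (the canonical implementation is keyHashSlot(),
 * used above): CRC16 of the key modulo 16384, hashing only the content of
 * a non-empty {...} hash tag when one is present. */
static unsigned int keyHashSlotSketch(char *key, int keylen) {
    int s, e; /* Start-end indexes of '{' and '}'. */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen) return crc16(key,keylen) & 0x3FFF; /* No '{': whole key. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
    return crc16(key+s+1,e-s-1) & 0x3FFF; /* Hash only the tag content. */
}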
5802 
5803 /* Send the client the right redirection code, according to error_code
5804  * that should be set to one of CLUSTER_REDIR_* macros.
5805  *
5806  * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5807  * are used, then the node 'n' should not be NULL, but should be the
5808  * node we want to mention in the redirection. Moreover hashslot should
5809  * be set to the hash slot that caused the redirection. */
5810 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5811     if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5812         addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5813     } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5814         /* The request spans multiple keys in the same slot,
5815          * but the slot is not "stable" currently as there is
5816          * a migration or import in progress. */
5817         addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5818     } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5819         addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5820     } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
5821         addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down and only accepts read commands\r\n"));
5822     } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5823         addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5824     } else if (error_code == CLUSTER_REDIR_MOVED ||
5825                error_code == CLUSTER_REDIR_ASK)
5826     {
5827         addReplySds(c,sdscatprintf(sdsempty(),
5828             "-%s %d %s:%d\r\n",
5829             (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5830             hashslot,n->ip,n->port));
5831     } else {
5832         serverPanic("getNodeByQuery() unknown error.");
5833     }
5834 }
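
/* Example redirection errors as delivered by the code above (addresses
 * are hypothetical):
 *
 *   -MOVED 3999 127.0.0.1:6381   slot moved for good: the client should
 *                                update its slot map and retry there.
 *   -ASK 3999 127.0.0.1:6381     slot being migrated: retry on the target
 *                                node, preceded by ASKING.
 *   -CROSSSLOT Keys in request don't hash to the same slot
 *   -TRYAGAIN Multiple keys request during rehashing of slot */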
5835 
5836 /* This function is called by the code that processes clients incrementally
5837  * to detect timeouts, in order to handle the following case:
5838  *
5839  * 1) A client blocks with BLPOP or similar blocking operation.
5840  * 2) The master migrates the hash slot elsewhere or turns into a slave.
5841  * 3) The client may remain blocked forever (or up to the max timeout time)
5842  *    waiting for a key change that will never happen.
5843  *
5844  * If the client is found to be blocked into a hash slot this node no
5845  * longer handles, the client is sent a redirection error, and the function
5846  * returns 1. Otherwise 0 is returned and no operation is performed. */
5847 int clusterRedirectBlockedClientIfNeeded(client *c) {
5848     if (c->flags & CLIENT_BLOCKED &&
5849         (c->btype == BLOCKED_LIST ||
5850          c->btype == BLOCKED_ZSET ||
5851          c->btype == BLOCKED_STREAM))
5852     {
5853         dictEntry *de;
5854         dictIterator *di;
5855 
5856         /* If the cluster is down, unblock the client with the right error.
5857          * If the cluster is configured to allow reads on cluster down, we
5858          * still want to emit this error since a write will be required
5859          * to unblock them, and that write may never come. */
5860         if (server.cluster->state == CLUSTER_FAIL) {
5861             clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5862             return 1;
5863         }
5864 
5865         /* All keys must belong to the same slot, so check first key only. */
5866         di = dictGetIterator(c->bpop.keys);
5867         if ((de = dictNext(di)) != NULL) {
5868             robj *key = dictGetKey(de);
5869             int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5870             clusterNode *node = server.cluster->slots[slot];
5871 
5872             /* If the client is read-only and is accessing a key that this
5873              * replica can serve, allow it. */
5874             if ((c->flags & CLIENT_READONLY) &&
5875                 (c->lastcmd->flags & CMD_READONLY) &&
5876                 nodeIsSlave(myself) && myself->slaveof == node)
5877             {
5878                 node = myself;
5879             }
5880 
5881             /* We send an error and unblock the client if:
5882              * 1) The slot is unassigned, emitting a cluster down error.
5883              * 2) The slot is not handled by this node, nor being imported. */
5884             if (node != myself &&
5885                 server.cluster->importing_slots_from[slot] == NULL)
5886             {
5887                 if (node == NULL) {
5888                     clusterRedirectClient(c,NULL,0,
5889                         CLUSTER_REDIR_DOWN_UNBOUND);
5890                 } else {
5891                     clusterRedirectClient(c,node,slot,
5892                         CLUSTER_REDIR_MOVED);
5893                 }
5894                 dictReleaseIterator(di);
5895                 return 1;
5896             }
5897         }
5898         dictReleaseIterator(di);
5899     }
5900     return 0;
5901 }
5902