1 /* Redis Cluster implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "server.h"
32 #include "cluster.h"
33 #include "endianconv.h"
34 
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <sys/stat.h>
41 #include <sys/file.h>
42 #include <math.h>
43 
44 /* A global reference to myself is handy to make code more clear.
45  * Myself always points to server.cluster->myself, that is, the clusterNode
46  * that represents this node. */
47 clusterNode *myself = NULL;
48 
49 clusterNode *createClusterNode(char *nodename, int flags);
50 int clusterAddNode(clusterNode *node);
51 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
52 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
53 void clusterSendPing(clusterLink *link, int type);
54 void clusterSendFail(char *nodename);
55 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
56 void clusterUpdateState(void);
57 int clusterNodeGetSlotBit(clusterNode *n, int slot);
58 sds clusterGenNodesDescription(int filter);
59 clusterNode *clusterLookupNode(const char *name);
60 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
61 int clusterAddSlot(clusterNode *n, int slot);
62 int clusterDelSlot(int slot);
63 int clusterDelNodeSlots(clusterNode *node);
64 int clusterNodeSetSlotBit(clusterNode *n, int slot);
65 void clusterSetMaster(clusterNode *n);
66 void clusterHandleSlaveFailover(void);
67 void clusterHandleSlaveMigration(int max_slaves);
68 int bitmapTestBit(unsigned char *bitmap, int pos);
69 void clusterDoBeforeSleep(int flags);
70 void clusterSendUpdate(clusterLink *link, clusterNode *node);
71 void resetManualFailover(void);
72 void clusterCloseAllSlots(void);
73 void clusterSetNodeAsMaster(clusterNode *n);
74 void clusterDelNode(clusterNode *delnode);
75 sds representClusterNodeFlags(sds ci, uint16_t flags);
76 uint64_t clusterGetMaxEpoch(void);
77 int clusterBumpConfigEpochWithoutConsensus(void);
78 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
79 
80 /* -----------------------------------------------------------------------------
81  * Initialization
82  * -------------------------------------------------------------------------- */
83 
84 /* Load the cluster config from 'filename'.
85  *
86  * If the file does not exist or is zero-length (this may happen because
87  * when we lock the nodes.conf file, we create a zero-length one for the
88  * sake of locking if it does not already exist), C_ERR is returned.
89  * If the configuration was loaded from the file, C_OK is returned. */
clusterLoadConfig(char * filename)90 int clusterLoadConfig(char *filename) {
91     FILE *fp = fopen(filename,"r");
92     struct stat sb;
93     char *line;
94     int maxline, j;
95 
96     if (fp == NULL) {
97         if (errno == ENOENT) {
98             return C_ERR;
99         } else {
100             serverLog(LL_WARNING,
101                 "Loading the cluster node config from %s: %s",
102                 filename, strerror(errno));
103             exit(1);
104         }
105     }
106 
107     /* Check if the file is zero-length: if so return C_ERR to signal
108      * we have to write the config. */
109     if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
110         fclose(fp);
111         return C_ERR;
112     }
113 
114     /* Parse the file. Note that single lines of the cluster config file can
115      * be really long as they include all the hash slots of the node.
116      * This means in the worst possible case, half of the Redis slots will be
117      * present in a single line, possibly in importing or migrating state, so
118      * together with the node ID of the sender/receiver.
119      *
120      * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
121     maxline = 1024+CLUSTER_SLOTS*128;
122     line = zmalloc(maxline);
123     while(fgets(line,maxline,fp) != NULL) {
124         int argc;
125         sds *argv;
126         clusterNode *n, *master;
127         char *p, *s;
128 
129         /* Skip blank lines, they can be created either by users manually
130          * editing nodes.conf or by the config writing process if stopped
131          * before the truncate() call. */
132         if (line[0] == '\n' || line[0] == '\0') continue;
133 
134         /* Split the line into arguments for processing. */
135         argv = sdssplitargs(line,&argc);
136         if (argv == NULL) goto fmterr;
137 
138         /* Handle the special "vars" line. Don't pretend it is the last
139          * line even if it actually is when generated by Redis. */
140         if (strcasecmp(argv[0],"vars") == 0) {
141             if (!(argc % 2)) goto fmterr;
142             for (j = 1; j < argc; j += 2) {
143                 if (strcasecmp(argv[j],"currentEpoch") == 0) {
144                     server.cluster->currentEpoch =
145                             strtoull(argv[j+1],NULL,10);
146                 } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
147                     server.cluster->lastVoteEpoch =
148                             strtoull(argv[j+1],NULL,10);
149                 } else {
150                     serverLog(LL_WARNING,
151                         "Skipping unknown cluster config variable '%s'",
152                         argv[j]);
153                 }
154             }
155             sdsfreesplitres(argv,argc);
156             continue;
157         }
158 
159         /* Regular config lines have at least eight fields */
160         if (argc < 8) {
161             sdsfreesplitres(argv,argc);
162             goto fmterr;
163         }
164 
165         /* Create this node if it does not exist */
166         n = clusterLookupNode(argv[0]);
167         if (!n) {
168             n = createClusterNode(argv[0],0);
169             clusterAddNode(n);
170         }
171         /* Address and port */
172         if ((p = strrchr(argv[1],':')) == NULL) {
173             sdsfreesplitres(argv,argc);
174             goto fmterr;
175         }
176         *p = '\0';
177         memcpy(n->ip,argv[1],strlen(argv[1])+1);
178         char *port = p+1;
179         char *busp = strchr(port,'@');
180         if (busp) {
181             *busp = '\0';
182             busp++;
183         }
184         n->port = atoi(port);
185         /* In older versions of nodes.conf the "@busport" part is missing.
186          * In this case we set it to the default offset of 10000 from the
187          * base port. */
188         n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
189 
190         /* Parse flags */
191         p = s = argv[2];
192         while(p) {
193             p = strchr(s,',');
194             if (p) *p = '\0';
195             if (!strcasecmp(s,"myself")) {
196                 serverAssert(server.cluster->myself == NULL);
197                 myself = server.cluster->myself = n;
198                 n->flags |= CLUSTER_NODE_MYSELF;
199             } else if (!strcasecmp(s,"master")) {
200                 n->flags |= CLUSTER_NODE_MASTER;
201             } else if (!strcasecmp(s,"slave")) {
202                 n->flags |= CLUSTER_NODE_SLAVE;
203             } else if (!strcasecmp(s,"fail?")) {
204                 n->flags |= CLUSTER_NODE_PFAIL;
205             } else if (!strcasecmp(s,"fail")) {
206                 n->flags |= CLUSTER_NODE_FAIL;
207                 n->fail_time = mstime();
208             } else if (!strcasecmp(s,"handshake")) {
209                 n->flags |= CLUSTER_NODE_HANDSHAKE;
210             } else if (!strcasecmp(s,"noaddr")) {
211                 n->flags |= CLUSTER_NODE_NOADDR;
212             } else if (!strcasecmp(s,"nofailover")) {
213                 n->flags |= CLUSTER_NODE_NOFAILOVER;
214             } else if (!strcasecmp(s,"noflags")) {
215                 /* nothing to do */
216             } else {
217                 serverPanic("Unknown flag in redis cluster config file");
218             }
219             if (p) s = p+1;
220         }
221 
222         /* Get master if any. Set the master and populate master's
223          * slave list. */
224         if (argv[3][0] != '-') {
225             master = clusterLookupNode(argv[3]);
226             if (!master) {
227                 master = createClusterNode(argv[3],0);
228                 clusterAddNode(master);
229             }
230             n->slaveof = master;
231             clusterNodeAddSlave(master,n);
232         }
233 
234         /* Set ping sent / pong received timestamps */
235         if (atoi(argv[4])) n->ping_sent = mstime();
236         if (atoi(argv[5])) n->pong_received = mstime();
237 
238         /* Set configEpoch for this node. */
239         n->configEpoch = strtoull(argv[6],NULL,10);
240 
241         /* Populate hash slots served by this instance. */
242         for (j = 8; j < argc; j++) {
243             int start, stop;
244 
245             if (argv[j][0] == '[') {
246                 /* Here we handle migrating / importing slots */
247                 int slot;
248                 char direction;
249                 clusterNode *cn;
250 
251                 p = strchr(argv[j],'-');
252                 serverAssert(p != NULL);
253                 *p = '\0';
254                 direction = p[1]; /* Either '>' or '<' */
255                 slot = atoi(argv[j]+1);
256                 if (slot < 0 || slot >= CLUSTER_SLOTS) {
257                     sdsfreesplitres(argv,argc);
258                     goto fmterr;
259                 }
260                 p += 3;
261                 cn = clusterLookupNode(p);
262                 if (!cn) {
263                     cn = createClusterNode(p,0);
264                     clusterAddNode(cn);
265                 }
266                 if (direction == '>') {
267                     server.cluster->migrating_slots_to[slot] = cn;
268                 } else {
269                     server.cluster->importing_slots_from[slot] = cn;
270                 }
271                 continue;
272             } else if ((p = strchr(argv[j],'-')) != NULL) {
273                 *p = '\0';
274                 start = atoi(argv[j]);
275                 stop = atoi(p+1);
276             } else {
277                 start = stop = atoi(argv[j]);
278             }
279             if (start < 0 || start >= CLUSTER_SLOTS ||
280                 stop < 0 || stop >= CLUSTER_SLOTS)
281             {
282                 sdsfreesplitres(argv,argc);
283                 goto fmterr;
284             }
285             while(start <= stop) clusterAddSlot(n, start++);
286         }
287 
288         sdsfreesplitres(argv,argc);
289     }
290     /* Config sanity check */
291     if (server.cluster->myself == NULL) goto fmterr;
292 
293     zfree(line);
294     fclose(fp);
295 
296     serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
297 
298     /* Something that should never happen: currentEpoch smaller than
299      * the max epoch found in the nodes configuration. However we handle this
300      * as some form of protection against manual editing of critical files. */
301     if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
302         server.cluster->currentEpoch = clusterGetMaxEpoch();
303     }
304     return C_OK;
305 
306 fmterr:
307     serverLog(LL_WARNING,
308         "Unrecoverable error: corrupted cluster config file.");
309     zfree(line);
310     if (fp) fclose(fp);
311     exit(1);
312 }
313 
314 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
315  *
316  * This function writes the node config and returns 0, on error -1
317  * is returned.
318  *
319  * Note: we need to write the file in an atomic way from the point of view
320  * of the POSIX filesystem semantics, so that if the server is stopped
321  * or crashes during the write, we'll end with either the old file or the
322  * new one. Since we have the full payload to write available we can use
323  * a single write to write the whole file. If the pre-existing file was
324  * bigger we pad our payload with newlines that are anyway ignored and truncate
325  * the file afterward. */
clusterSaveConfig(int do_fsync)326 int clusterSaveConfig(int do_fsync) {
327     sds ci;
328     size_t content_size;
329     struct stat sb;
330     int fd;
331 
332     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
333 
334     /* Get the nodes description and concatenate our "vars" directive to
335      * save currentEpoch and lastVoteEpoch. */
336     ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
337     ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
338         (unsigned long long) server.cluster->currentEpoch,
339         (unsigned long long) server.cluster->lastVoteEpoch);
340     content_size = sdslen(ci);
341 
342     if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
343         == -1) goto err;
344 
345     /* Pad the new payload if the existing file length is greater. */
346     if (fstat(fd,&sb) != -1) {
347         if (sb.st_size > (off_t)content_size) {
348             ci = sdsgrowzero(ci,sb.st_size);
349             memset(ci+content_size,'\n',sb.st_size-content_size);
350         }
351     }
352     if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
353     if (do_fsync) {
354         server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
355         fsync(fd);
356     }
357 
358     /* Truncate the file if needed to remove the final \n padding that
359      * is just garbage. */
360     if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
361         /* ftruncate() failing is not a critical error. */
362     }
363     close(fd);
364     sdsfree(ci);
365     return 0;
366 
367 err:
368     if (fd != -1) close(fd);
369     sdsfree(ci);
370     return -1;
371 }
372 
clusterSaveConfigOrDie(int do_fsync)373 void clusterSaveConfigOrDie(int do_fsync) {
374     if (clusterSaveConfig(do_fsync) == -1) {
375         serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
376         exit(1);
377     }
378 }
379 
380 /* Lock the cluster config using flock(), and leaks the file descritor used to
381  * acquire the lock so that the file will be locked forever.
382  *
383  * This works because we always update nodes.conf with a new version
384  * in-place, reopening the file, and writing to it in place (later adjusting
385  * the length with ftruncate()).
386  *
387  * On success C_OK is returned, otherwise an error is logged and
388  * the function returns C_ERR to signal a lock was not acquired. */
clusterLockConfig(char * filename)389 int clusterLockConfig(char *filename) {
390 /* flock() does not exist on Solaris
391  * and a fcntl-based solution won't help, as we constantly re-open that file,
392  * which will release _all_ locks anyway
393  */
394 #if !defined(__sun)
395     /* To lock it, we need to open the file in a way it is created if
396      * it does not exist, otherwise there is a race condition with other
397      * processes. */
398     int fd = open(filename,O_WRONLY|O_CREAT,0644);
399     if (fd == -1) {
400         serverLog(LL_WARNING,
401             "Can't open %s in order to acquire a lock: %s",
402             filename, strerror(errno));
403         return C_ERR;
404     }
405 
406     if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
407         if (errno == EWOULDBLOCK) {
408             serverLog(LL_WARNING,
409                  "Sorry, the cluster configuration file %s is already used "
410                  "by a different Redis Cluster node. Please make sure that "
411                  "different nodes use different cluster configuration "
412                  "files.", filename);
413         } else {
414             serverLog(LL_WARNING,
415                 "Impossible to lock %s: %s", filename, strerror(errno));
416         }
417         close(fd);
418         return C_ERR;
419     }
420     /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
421      * lock to the file as long as the process exists.
422      *
423      * After fork, the child process will get the fd opened by the parent process,
424      * we need save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),
425      * it will be closed in the child process.
426      * If it is not closed, when the main process is killed -9, but the child process
427      * (redis-aof-rewrite) is still alive, the fd(lock) will still be held by the
428      * child process, and the main process will fail to get lock, means fail to start. */
429     server.cluster_config_file_lock_fd = fd;
430 #endif /* __sun */
431 
432     return C_OK;
433 }
434 
435 /* Some flags (currently just the NOFAILOVER flag) may need to be updated
436  * in the "myself" node based on the current configuration of the node,
437  * that may change at runtime via CONFIG SET. This function changes the
438  * set of flags in myself->flags accordingly. */
clusterUpdateMyselfFlags(void)439 void clusterUpdateMyselfFlags(void) {
440     int oldflags = myself->flags;
441     int nofailover = server.cluster_slave_no_failover ?
442                      CLUSTER_NODE_NOFAILOVER : 0;
443     myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
444     myself->flags |= nofailover;
445     if (myself->flags != oldflags) {
446         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
447                              CLUSTER_TODO_UPDATE_STATE);
448     }
449 }
450 
clusterInit(void)451 void clusterInit(void) {
452     int saveconf = 0;
453 
454     server.cluster = zmalloc(sizeof(clusterState));
455     server.cluster->myself = NULL;
456     server.cluster->currentEpoch = 0;
457     server.cluster->state = CLUSTER_FAIL;
458     server.cluster->size = 1;
459     server.cluster->todo_before_sleep = 0;
460     server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
461     server.cluster->nodes_black_list =
462         dictCreate(&clusterNodesBlackListDictType,NULL);
463     server.cluster->failover_auth_time = 0;
464     server.cluster->failover_auth_count = 0;
465     server.cluster->failover_auth_rank = 0;
466     server.cluster->failover_auth_epoch = 0;
467     server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
468     server.cluster->lastVoteEpoch = 0;
469     for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
470         server.cluster->stats_bus_messages_sent[i] = 0;
471         server.cluster->stats_bus_messages_received[i] = 0;
472     }
473     server.cluster->stats_pfail_nodes = 0;
474     memset(server.cluster->slots,0, sizeof(server.cluster->slots));
475     clusterCloseAllSlots();
476 
477     /* Lock the cluster config file to make sure every node uses
478      * its own nodes.conf. */
479     server.cluster_config_file_lock_fd = -1;
480     if (clusterLockConfig(server.cluster_configfile) == C_ERR)
481         exit(1);
482 
483     /* Load or create a new nodes configuration. */
484     if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
485         /* No configuration found. We will just use the random name provided
486          * by the createClusterNode() function. */
487         myself = server.cluster->myself =
488             createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
489         serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
490             myself->name);
491         clusterAddNode(myself);
492         saveconf = 1;
493     }
494     if (saveconf) clusterSaveConfigOrDie(1);
495 
496     /* We need a listening TCP port for our cluster messaging needs. */
497     server.cfd_count = 0;
498 
499     /* Port sanity check II
500      * The other handshake port check is triggered too late to stop
501      * us from trying to use a too-high cluster port number. */
502     if (server.port > (65535-CLUSTER_PORT_INCR)) {
503         serverLog(LL_WARNING, "Redis port number too high. "
504                    "Cluster communication port is 10,000 port "
505                    "numbers higher than your Redis port. "
506                    "Your Redis port number must be "
507                    "lower than 55535.");
508         exit(1);
509     }
510 
511     if (listenToPort(server.port+CLUSTER_PORT_INCR,
512         server.cfd,&server.cfd_count) == C_ERR)
513     {
514         exit(1);
515     } else {
516         int j;
517 
518         for (j = 0; j < server.cfd_count; j++) {
519             if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
520                 clusterAcceptHandler, NULL) == AE_ERR)
521                     serverPanic("Unrecoverable error creating Redis Cluster "
522                                 "file event.");
523         }
524     }
525 
526     /* The slots -> keys map is a radix tree. Initialize it here. */
527     server.cluster->slots_to_keys = raxNew();
528     memset(server.cluster->slots_keys_count,0,
529            sizeof(server.cluster->slots_keys_count));
530 
531     /* Set myself->port / cport to my listening ports, we'll just need to
532      * discover the IP address via MEET messages. */
533     myself->port = server.port;
534     myself->cport = server.port+CLUSTER_PORT_INCR;
535     if (server.cluster_announce_port)
536         myself->port = server.cluster_announce_port;
537     if (server.cluster_announce_bus_port)
538         myself->cport = server.cluster_announce_bus_port;
539 
540     server.cluster->mf_end = 0;
541     resetManualFailover();
542     clusterUpdateMyselfFlags();
543 }
544 
545 /* Reset a node performing a soft or hard reset:
546  *
547  * 1) All other nodes are forget.
548  * 2) All the assigned / open slots are released.
549  * 3) If the node is a slave, it turns into a master.
550  * 5) Only for hard reset: a new Node ID is generated.
551  * 6) Only for hard reset: currentEpoch and configEpoch are set to 0.
552  * 7) The new configuration is saved and the cluster state updated.
553  * 8) If the node was a slave, the whole data set is flushed away. */
clusterReset(int hard)554 void clusterReset(int hard) {
555     dictIterator *di;
556     dictEntry *de;
557     int j;
558 
559     /* Turn into master. */
560     if (nodeIsSlave(myself)) {
561         clusterSetNodeAsMaster(myself);
562         replicationUnsetMaster();
563         emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
564     }
565 
566     /* Close slots, reset manual failover state. */
567     clusterCloseAllSlots();
568     resetManualFailover();
569 
570     /* Unassign all the slots. */
571     for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);
572 
573     /* Forget all the nodes, but myself. */
574     di = dictGetSafeIterator(server.cluster->nodes);
575     while((de = dictNext(di)) != NULL) {
576         clusterNode *node = dictGetVal(de);
577 
578         if (node == myself) continue;
579         clusterDelNode(node);
580     }
581     dictReleaseIterator(di);
582 
583     /* Hard reset only: set epochs to 0, change node ID. */
584     if (hard) {
585         sds oldname;
586 
587         server.cluster->currentEpoch = 0;
588         server.cluster->lastVoteEpoch = 0;
589         myself->configEpoch = 0;
590         serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");
591 
592         /* To change the Node ID we need to remove the old name from the
593          * nodes table, change the ID, and re-add back with new name. */
594         oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
595         dictDelete(server.cluster->nodes,oldname);
596         sdsfree(oldname);
597         getRandomHexChars(myself->name, CLUSTER_NAMELEN);
598         clusterAddNode(myself);
599         serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
600     }
601 
602     /* Make sure to persist the new config and update the state. */
603     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
604                          CLUSTER_TODO_UPDATE_STATE|
605                          CLUSTER_TODO_FSYNC_CONFIG);
606 }
607 
608 /* -----------------------------------------------------------------------------
609  * CLUSTER communication link
610  * -------------------------------------------------------------------------- */
611 
createClusterLink(clusterNode * node)612 clusterLink *createClusterLink(clusterNode *node) {
613     clusterLink *link = zmalloc(sizeof(*link));
614     link->ctime = mstime();
615     link->sndbuf = sdsempty();
616     link->rcvbuf = sdsempty();
617     link->node = node;
618     link->fd = -1;
619     return link;
620 }
621 
622 /* Free a cluster link, but does not free the associated node of course.
623  * This function will just make sure that the original node associated
624  * with this link will have the 'link' field set to NULL. */
freeClusterLink(clusterLink * link)625 void freeClusterLink(clusterLink *link) {
626     if (link->fd != -1) {
627         aeDeleteFileEvent(server.el, link->fd, AE_READABLE|AE_WRITABLE);
628     }
629     sdsfree(link->sndbuf);
630     sdsfree(link->rcvbuf);
631     if (link->node)
632         link->node->link = NULL;
633     close(link->fd);
634     zfree(link);
635 }
636 
637 #define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
clusterAcceptHandler(aeEventLoop * el,int fd,void * privdata,int mask)638 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
639     int cport, cfd;
640     int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
641     char cip[NET_IP_STR_LEN];
642     clusterLink *link;
643     UNUSED(el);
644     UNUSED(mask);
645     UNUSED(privdata);
646 
647     /* If the server is starting up, don't accept cluster connections:
648      * UPDATE messages may interact with the database content. */
649     if (server.masterhost == NULL && server.loading) return;
650 
651     while(max--) {
652         cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
653         if (cfd == ANET_ERR) {
654             if (errno != EWOULDBLOCK)
655                 serverLog(LL_VERBOSE,
656                     "Error accepting cluster node: %s", server.neterr);
657             return;
658         }
659         anetNonBlock(NULL,cfd);
660         anetEnableTcpNoDelay(NULL,cfd);
661 
662         /* Use non-blocking I/O for cluster messages. */
663         serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
664         /* Create a link object we use to handle the connection.
665          * It gets passed to the readable handler when data is available.
666          * Initiallly the link->node pointer is set to NULL as we don't know
667          * which node is, but the right node is references once we know the
668          * node identity. */
669         link = createClusterLink(NULL);
670         link->fd = cfd;
671         aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
672     }
673 }
674 
675 /* -----------------------------------------------------------------------------
676  * Key space handling
677  * -------------------------------------------------------------------------- */
678 
/* Return the hash slot of 'key' ('keylen' bytes long).
 *
 * The slot is made of the 14 least significant bits of the crc16 of the
 * hashed bytes (hence the 0x3FFF mask, for 16384 slots).
 *
 * Normally the whole key is hashed. However if the key contains a '{'
 * followed, later on, by a '}' with at least one character in between,
 * only the part between the first '{' and the first '}' after it is
 * hashed. This makes it possible to force certain keys into the same
 * slot. */
unsigned int keyHashSlot(char *key, int keylen) {
    int lbrace = 0; /* Index of the first '{', or keylen if none. */
    int rbrace;     /* Index of the first '}' after it, or keylen if none. */

    while (lbrace < keylen && key[lbrace] != '{') lbrace++;

    if (lbrace < keylen) {
        rbrace = lbrace+1;
        while (rbrace < keylen && key[rbrace] != '}') rbrace++;

        /* A closing brace exists and the tag is not empty: hash just
         * what is in the middle between { and }. */
        if (rbrace != keylen && rbrace != lbrace+1)
            return crc16(key+lbrace+1,rbrace-lbrace-1) & 0x3FFF;
    }

    /* No '{', no matching '}', or nothing between them: hash the whole
     * key. This is the base case. */
    return crc16(key,keylen) & 0x3FFF;
}
705 
706 /* -----------------------------------------------------------------------------
707  * CLUSTER node API
708  * -------------------------------------------------------------------------- */
709 
710 /* Create a new cluster node, with the specified flags.
711  * If "nodename" is NULL this is considered a first handshake and a random
712  * node name is assigned to this node (it will be fixed later when we'll
713  * receive the first pong).
714  *
715  * The node is created and returned to the user, but it is not automatically
716  * added to the nodes hash table. */
createClusterNode(char * nodename,int flags)717 clusterNode *createClusterNode(char *nodename, int flags) {
718     clusterNode *node = zmalloc(sizeof(*node));
719 
720     if (nodename)
721         memcpy(node->name, nodename, CLUSTER_NAMELEN);
722     else
723         getRandomHexChars(node->name, CLUSTER_NAMELEN);
724     node->ctime = mstime();
725     node->configEpoch = 0;
726     node->flags = flags;
727     memset(node->slots,0,sizeof(node->slots));
728     node->numslots = 0;
729     node->numslaves = 0;
730     node->slaves = NULL;
731     node->slaveof = NULL;
732     node->ping_sent = node->pong_received = 0;
733     node->data_received = 0;
734     node->fail_time = 0;
735     node->link = NULL;
736     memset(node->ip,0,sizeof(node->ip));
737     node->port = 0;
738     node->cport = 0;
739     node->fail_reports = listCreate();
740     node->voted_time = 0;
741     node->orphaned_time = 0;
742     node->repl_offset_time = 0;
743     node->repl_offset = 0;
744     listSetFreeMethod(node->fail_reports,zfree);
745     return node;
746 }
747 
748 /* This function is called every time we get a failure report from a node.
749  * The side effect is to populate the fail_reports list (or to update
750  * the timestamp of an existing report).
751  *
752  * 'failing' is the node that is in failure state according to the
753  * 'sender' node.
754  *
755  * The function returns 0 if it just updates a timestamp of an existing
756  * failure report from the same sender. 1 is returned if a new failure
757  * report is created. */
clusterNodeAddFailureReport(clusterNode * failing,clusterNode * sender)758 int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
759     list *l = failing->fail_reports;
760     listNode *ln;
761     listIter li;
762     clusterNodeFailReport *fr;
763 
764     /* If a failure report from the same sender already exists, just update
765      * the timestamp. */
766     listRewind(l,&li);
767     while ((ln = listNext(&li)) != NULL) {
768         fr = ln->value;
769         if (fr->node == sender) {
770             fr->time = mstime();
771             return 0;
772         }
773     }
774 
775     /* Otherwise create a new report. */
776     fr = zmalloc(sizeof(*fr));
777     fr->node = sender;
778     fr->time = mstime();
779     listAddNodeTail(l,fr);
780     return 1;
781 }
782 
783 /* Remove failure reports that are too old, where too old means reasonably
784  * older than the global node timeout. Note that anyway for a node to be
785  * flagged as FAIL we need to have a local PFAIL state that is at least
786  * older than the global node timeout, so we don't just trust the number
787  * of failure reports from other nodes. */
clusterNodeCleanupFailureReports(clusterNode * node)788 void clusterNodeCleanupFailureReports(clusterNode *node) {
789     list *l = node->fail_reports;
790     listNode *ln;
791     listIter li;
792     clusterNodeFailReport *fr;
793     mstime_t maxtime = server.cluster_node_timeout *
794                      CLUSTER_FAIL_REPORT_VALIDITY_MULT;
795     mstime_t now = mstime();
796 
797     listRewind(l,&li);
798     while ((ln = listNext(&li)) != NULL) {
799         fr = ln->value;
800         if (now - fr->time > maxtime) listDelNode(l,ln);
801     }
802 }
803 
804 /* Remove the failing report for 'node' if it was previously considered
805  * failing by 'sender'. This function is called when a node informs us via
806  * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
807  *
808  * Note that this function is called relatively often as it gets called even
809  * when there are no nodes failing, and is O(N), however when the cluster is
810  * fine the failure reports list is empty so the function runs in constant
811  * time.
812  *
813  * The function returns 1 if the failure report was found and removed.
814  * Otherwise 0 is returned. */
clusterNodeDelFailureReport(clusterNode * node,clusterNode * sender)815 int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
816     list *l = node->fail_reports;
817     listNode *ln;
818     listIter li;
819     clusterNodeFailReport *fr;
820 
821     /* Search for a failure report from this sender. */
822     listRewind(l,&li);
823     while ((ln = listNext(&li)) != NULL) {
824         fr = ln->value;
825         if (fr->node == sender) break;
826     }
827     if (!ln) return 0; /* No failure report from this sender. */
828 
829     /* Remove the failure report. */
830     listDelNode(l,ln);
831     clusterNodeCleanupFailureReports(node);
832     return 1;
833 }
834 
835 /* Return the number of external nodes that believe 'node' is failing,
836  * not including this node, that may have a PFAIL or FAIL state for this
837  * node as well. */
clusterNodeFailureReportsCount(clusterNode * node)838 int clusterNodeFailureReportsCount(clusterNode *node) {
839     clusterNodeCleanupFailureReports(node);
840     return listLength(node->fail_reports);
841 }
842 
/* Remove 'slave' from the slaves array of 'master'.
 *
 * The entries following the removed one are shifted down by one position,
 * so the relative order of the remaining slaves is preserved. When the last
 * slave is removed the CLUSTER_NODE_MIGRATE_TO flag of the master is cleared.
 *
 * Returns C_OK if the slave was found and removed, C_ERR if 'slave' is not
 * listed among the slaves of 'master'. */
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int idx, tail;

    /* Locate the slave inside the array. */
    for (idx = 0; idx < master->numslaves; idx++)
        if (master->slaves[idx] == slave) break;
    if (idx == master->numslaves) return C_ERR;

    /* Close the gap left by the removed entry, if it was not the last. */
    tail = master->numslaves - idx - 1;
    if (tail > 0) {
        memmove(&master->slaves[idx],&master->slaves[idx+1],
                sizeof(*master->slaves) * tail);
    }
    master->numslaves--;
    if (master->numslaves == 0)
        master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
861 
/* Append 'slave' to the slaves array of 'master', growing the array by one
 * element, and set the CLUSTER_NODE_MIGRATE_TO flag on the master.
 *
 * Returns C_OK if the slave was added, C_ERR (with no changes made) if it
 * was already listed among the slaves of 'master'. */
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int i = 0;
    int newcount;

    /* Refuse duplicates. */
    while (i < master->numslaves) {
        if (master->slaves[i] == slave) return C_ERR;
        i++;
    }

    newcount = master->numslaves + 1;
    master->slaves = zrealloc(master->slaves,sizeof(clusterNode*)*newcount);
    master->slaves[newcount-1] = slave;
    master->numslaves = newcount;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
875 
/* Return the number of slaves of 'n' for which nodeFailed() is false. */
int clusterCountNonFailingSlaves(clusterNode *n) {
    int alive = 0;
    int i;

    for (i = 0; i < n->numslaves; i++) {
        clusterNode *replica = n->slaves[i];

        alive += nodeFailed(replica) ? 0 : 1;
    }
    return alive;
}
883 
884 /* Low level cleanup of the node structure. Only called by clusterDelNode(). */
freeClusterNode(clusterNode * n)885 void freeClusterNode(clusterNode *n) {
886     sds nodename;
887     int j;
888 
889     /* If the node has associated slaves, we have to set
890      * all the slaves->slaveof fields to NULL (unknown). */
891     for (j = 0; j < n->numslaves; j++)
892         n->slaves[j]->slaveof = NULL;
893 
894     /* Remove this node from the list of slaves of its master. */
895     if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
896 
897     /* Unlink from the set of nodes. */
898     nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
899     serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
900     sdsfree(nodename);
901 
902     /* Release link and associated data structures. */
903     if (n->link) freeClusterLink(n->link);
904     listRelease(n->fail_reports);
905     zfree(n->slaves);
906     zfree(n);
907 }
908 
909 /* Add a node to the nodes hash table */
clusterAddNode(clusterNode * node)910 int clusterAddNode(clusterNode *node) {
911     int retval;
912 
913     retval = dictAdd(server.cluster->nodes,
914             sdsnewlen(node->name,CLUSTER_NAMELEN), node);
915     return (retval == DICT_OK) ? C_OK : C_ERR;
916 }
917 
918 /* Remove a node from the cluster. The functio performs the high level
919  * cleanup, calling freeClusterNode() for the low level cleanup.
920  * Here we do the following:
921  *
922  * 1) Mark all the slots handled by it as unassigned.
923  * 2) Remove all the failure reports sent by this node and referenced by
924  *    other nodes.
925  * 3) Free the node with freeClusterNode() that will in turn remove it
926  *    from the hash table and from the list of slaves of its master, if
927  *    it is a slave node.
928  */
clusterDelNode(clusterNode * delnode)929 void clusterDelNode(clusterNode *delnode) {
930     int j;
931     dictIterator *di;
932     dictEntry *de;
933 
934     /* 1) Mark slots as unassigned. */
935     for (j = 0; j < CLUSTER_SLOTS; j++) {
936         if (server.cluster->importing_slots_from[j] == delnode)
937             server.cluster->importing_slots_from[j] = NULL;
938         if (server.cluster->migrating_slots_to[j] == delnode)
939             server.cluster->migrating_slots_to[j] = NULL;
940         if (server.cluster->slots[j] == delnode)
941             clusterDelSlot(j);
942     }
943 
944     /* 2) Remove failure reports. */
945     di = dictGetSafeIterator(server.cluster->nodes);
946     while((de = dictNext(di)) != NULL) {
947         clusterNode *node = dictGetVal(de);
948 
949         if (node == delnode) continue;
950         clusterNodeDelFailureReport(node,delnode);
951     }
952     dictReleaseIterator(di);
953 
954     /* 3) Free the node, unlinking it from the cluster. */
955     freeClusterNode(delnode);
956 }
957 
958 /* Node lookup by name */
clusterLookupNode(const char * name)959 clusterNode *clusterLookupNode(const char *name) {
960     sds s = sdsnewlen(name, CLUSTER_NAMELEN);
961     dictEntry *de;
962 
963     de = dictFind(server.cluster->nodes,s);
964     sdsfree(s);
965     if (de == NULL) return NULL;
966     return dictGetVal(de);
967 }
968 
969 /* This is only used after the handshake. When we connect a given IP/PORT
970  * as a result of CLUSTER MEET we don't have the node name yet, so we
971  * pick a random one, and will fix it when we receive the PONG request using
972  * this function. */
clusterRenameNode(clusterNode * node,char * newname)973 void clusterRenameNode(clusterNode *node, char *newname) {
974     int retval;
975     sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);
976 
977     serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
978         node->name, newname);
979     retval = dictDelete(server.cluster->nodes, s);
980     sdsfree(s);
981     serverAssert(retval == DICT_OK);
982     memcpy(node->name, newname, CLUSTER_NAMELEN);
983     clusterAddNode(node);
984 }
985 
986 /* -----------------------------------------------------------------------------
987  * CLUSTER config epoch handling
988  * -------------------------------------------------------------------------- */
989 
990 /* Return the greatest configEpoch found in the cluster, or the current
991  * epoch if greater than any node configEpoch. */
clusterGetMaxEpoch(void)992 uint64_t clusterGetMaxEpoch(void) {
993     uint64_t max = 0;
994     dictIterator *di;
995     dictEntry *de;
996 
997     di = dictGetSafeIterator(server.cluster->nodes);
998     while((de = dictNext(di)) != NULL) {
999         clusterNode *node = dictGetVal(de);
1000         if (node->configEpoch > max) max = node->configEpoch;
1001     }
1002     dictReleaseIterator(di);
1003     if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
1004     return max;
1005 }
1006 
1007 /* If this node epoch is zero or is not already the greatest across the
1008  * cluster (from the POV of the local configuration), this function will:
1009  *
1010  * 1) Generate a new config epoch, incrementing the current epoch.
1011  * 2) Assign the new epoch to this node, WITHOUT any consensus.
1012  * 3) Persist the configuration on disk before sending packets with the
1013  *    new configuration.
1014  *
1015  * If the new config epoch is generated and assigend, C_OK is returned,
1016  * otherwise C_ERR is returned (since the node has already the greatest
1017  * configuration around) and no operation is performed.
1018  *
1019  * Important note: this function violates the principle that config epochs
1020  * should be generated with consensus and should be unique across the cluster.
1021  * However Redis Cluster uses this auto-generated new config epochs in two
1022  * cases:
1023  *
1024  * 1) When slots are closed after importing. Otherwise resharding would be
1025  *    too expensive.
1026  * 2) When CLUSTER FAILOVER is called with options that force a slave to
1027  *    failover its master even if there is not master majority able to
1028  *    create a new configuration epoch.
1029  *
1030  * Redis Cluster will not explode using this function, even in the case of
1031  * a collision between this node and another node, generating the same
1032  * configuration epoch unilaterally, because the config epoch conflict
1033  * resolution algorithm will eventually move colliding nodes to different
1034  * config epochs. However using this function may violate the "last failover
1035  * wins" rule, so should only be used with care. */
clusterBumpConfigEpochWithoutConsensus(void)1036 int clusterBumpConfigEpochWithoutConsensus(void) {
1037     uint64_t maxEpoch = clusterGetMaxEpoch();
1038 
1039     if (myself->configEpoch == 0 ||
1040         myself->configEpoch != maxEpoch)
1041     {
1042         server.cluster->currentEpoch++;
1043         myself->configEpoch = server.cluster->currentEpoch;
1044         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1045                              CLUSTER_TODO_FSYNC_CONFIG);
1046         serverLog(LL_WARNING,
1047             "New configEpoch set to %llu",
1048             (unsigned long long) myself->configEpoch);
1049         return C_OK;
1050     } else {
1051         return C_ERR;
1052     }
1053 }
1054 
1055 /* This function is called when this node is a master, and we receive from
1056  * another master a configuration epoch that is equal to our configuration
1057  * epoch.
1058  *
1059  * BACKGROUND
1060  *
1061  * It is not possible that different slaves get the same config
1062  * epoch during a failover election, because the slaves need to get voted
1063  * by a majority. However when we perform a manual resharding of the cluster
1064  * the node will assign a configuration epoch to itself without to ask
1065  * for agreement. Usually resharding happens when the cluster is working well
1066  * and is supervised by the sysadmin, however it is possible for a failover
1067  * to happen exactly while the node we are resharding a slot to assigns itself
1068  * a new configuration epoch, but before it is able to propagate it.
1069  *
1070  * So technically it is possible in this condition that two nodes end with
1071  * the same configuration epoch.
1072  *
1073  * Another possibility is that there are bugs in the implementation causing
1074  * this to happen.
1075  *
1076  * Moreover when a new cluster is created, all the nodes start with the same
1077  * configEpoch. This collision resolution code allows nodes to automatically
1078  * end with a different configEpoch at startup automatically.
1079  *
1080  * In all the cases, we want a mechanism that resolves this issue automatically
1081  * as a safeguard. The same configuration epoch for masters serving different
1082  * set of slots is not harmful, but it is if the nodes end serving the same
1083  * slots for some reason (manual errors or software bugs) without a proper
1084  * failover procedure.
1085  *
1086  * In general we want a system that eventually always ends with different
1087  * masters having different configuration epochs whatever happened, since
1088  * nothign is worse than a split-brain condition in a distributed system.
1089  *
1090  * BEHAVIOR
1091  *
1092  * When this function gets called, what happens is that if this node
1093  * has the lexicographically smaller Node ID compared to the other node
1094  * with the conflicting epoch (the 'sender' node), it will assign itself
1095  * the greatest configuration epoch currently detected among nodes plus 1.
1096  *
1097  * This means that even if there are multiple nodes colliding, the node
1098  * with the greatest Node ID never moves forward, so eventually all the nodes
1099  * end with a different configuration epoch.
1100  */
clusterHandleConfigEpochCollision(clusterNode * sender)1101 void clusterHandleConfigEpochCollision(clusterNode *sender) {
1102     /* Prerequisites: nodes have the same configEpoch and are both masters. */
1103     if (sender->configEpoch != myself->configEpoch ||
1104         !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
1105     /* Don't act if the colliding node has a smaller Node ID. */
1106     if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
1107     /* Get the next ID available at the best of this node knowledge. */
1108     server.cluster->currentEpoch++;
1109     myself->configEpoch = server.cluster->currentEpoch;
1110     clusterSaveConfigOrDie(1);
1111     serverLog(LL_VERBOSE,
1112         "WARNING: configEpoch collision with node %.40s."
1113         " configEpoch set to %llu",
1114         sender->name,
1115         (unsigned long long) myself->configEpoch);
1116 }
1117 
1118 /* -----------------------------------------------------------------------------
1119  * CLUSTER nodes blacklist
1120  *
1121  * The nodes blacklist is just a way to ensure that a given node with a given
1122  * Node ID is not readded before some time elapsed (this time is specified
1123  * in seconds in CLUSTER_BLACKLIST_TTL).
1124  *
1125  * This is useful when we want to remove a node from the cluster completely:
1126  * when CLUSTER FORGET is called, it also puts the node into the blacklist so
1127  * that even if we receive gossip messages from other nodes that still remember
1128  * about the node we want to remove, we don't re-add it before some time.
1129  *
1130  * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
1131  * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
1132  * in the cluster without dealing with the problem of other nodes re-adding
1133  * back the node to nodes we already sent the FORGET command to.
1134  *
1135  * The data structure used is a hash table with an sds string representing
1136  * the node ID as key, and the time when it is ok to re-add the node as
1137  * value.
1138  * -------------------------------------------------------------------------- */
1139 
1140 #define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */
1141 
1142 
1143 /* Before of the addNode() or Exists() operations we always remove expired
1144  * entries from the black list. This is an O(N) operation but it is not a
1145  * problem since add / exists operations are called very infrequently and
1146  * the hash table is supposed to contain very little elements at max.
1147  * However without the cleanup during long uptimes and with some automated
1148  * node add/removal procedures, entries could accumulate. */
clusterBlacklistCleanup(void)1149 void clusterBlacklistCleanup(void) {
1150     dictIterator *di;
1151     dictEntry *de;
1152 
1153     di = dictGetSafeIterator(server.cluster->nodes_black_list);
1154     while((de = dictNext(di)) != NULL) {
1155         int64_t expire = dictGetUnsignedIntegerVal(de);
1156 
1157         if (expire < server.unixtime)
1158             dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1159     }
1160     dictReleaseIterator(di);
1161 }
1162 
1163 /* Cleanup the blacklist and add a new node ID to the black list. */
clusterBlacklistAddNode(clusterNode * node)1164 void clusterBlacklistAddNode(clusterNode *node) {
1165     dictEntry *de;
1166     sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1167 
1168     clusterBlacklistCleanup();
1169     if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
1170         /* If the key was added, duplicate the sds string representation of
1171          * the key for the next lookup. We'll free it at the end. */
1172         id = sdsdup(id);
1173     }
1174     de = dictFind(server.cluster->nodes_black_list,id);
1175     dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
1176     sdsfree(id);
1177 }
1178 
1179 /* Return non-zero if the specified node ID exists in the blacklist.
1180  * You don't need to pass an sds string here, any pointer to 40 bytes
1181  * will work. */
clusterBlacklistExists(char * nodeid)1182 int clusterBlacklistExists(char *nodeid) {
1183     sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
1184     int retval;
1185 
1186     clusterBlacklistCleanup();
1187     retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
1188     sdsfree(id);
1189     return retval;
1190 }
1191 
1192 /* -----------------------------------------------------------------------------
1193  * CLUSTER messages exchange - PING/PONG and gossip
1194  * -------------------------------------------------------------------------- */
1195 
1196 /* This function checks if a given node should be marked as FAIL.
1197  * It happens if the following conditions are met:
1198  *
1199  * 1) We received enough failure reports from other master nodes via gossip.
1200  *    Enough means that the majority of the masters signaled the node is
1201  *    down recently.
1202  * 2) We believe this node is in PFAIL state.
1203  *
1204  * If a failure is detected we also inform the whole cluster about this
1205  * event trying to force every other node to set the FAIL flag for the node.
1206  *
1207  * Note that the form of agreement used here is weak, as we collect the majority
1208  * of masters state during some time, and even if we force agreement by
1209  * propagating the FAIL message, because of partitions we may not reach every
1210  * node. However:
1211  *
1212  * 1) Either we reach the majority and eventually the FAIL state will propagate
1213  *    to all the cluster.
1214  * 2) Or there is no majority so no slave promotion will be authorized and the
1215  *    FAIL flag will be cleared after some time.
1216  */
markNodeAsFailingIfNeeded(clusterNode * node)1217 void markNodeAsFailingIfNeeded(clusterNode *node) {
1218     int failures;
1219     int needed_quorum = (server.cluster->size / 2) + 1;
1220 
1221     if (!nodeTimedOut(node)) return; /* We can reach it. */
1222     if (nodeFailed(node)) return; /* Already FAILing. */
1223 
1224     failures = clusterNodeFailureReportsCount(node);
1225     /* Also count myself as a voter if I'm a master. */
1226     if (nodeIsMaster(myself)) failures++;
1227     if (failures < needed_quorum) return; /* No weak agreement from masters. */
1228 
1229     serverLog(LL_NOTICE,
1230         "Marking node %.40s as failing (quorum reached).", node->name);
1231 
1232     /* Mark the node as failing. */
1233     node->flags &= ~CLUSTER_NODE_PFAIL;
1234     node->flags |= CLUSTER_NODE_FAIL;
1235     node->fail_time = mstime();
1236 
1237     /* Broadcast the failing node name to everybody, forcing all the other
1238      * reachable nodes to flag the node as FAIL. */
1239     if (nodeIsMaster(myself)) clusterSendFail(node->name);
1240     clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1241 }
1242 
1243 /* This function is called only if a node is marked as FAIL, but we are able
1244  * to reach it again. It checks if there are the conditions to undo the FAIL
1245  * state. */
clearNodeFailureIfNeeded(clusterNode * node)1246 void clearNodeFailureIfNeeded(clusterNode *node) {
1247     mstime_t now = mstime();
1248 
1249     serverAssert(nodeFailed(node));
1250 
1251     /* For slaves we always clear the FAIL flag if we can contact the
1252      * node again. */
1253     if (nodeIsSlave(node) || node->numslots == 0) {
1254         serverLog(LL_NOTICE,
1255             "Clear FAIL state for node %.40s: %s is reachable again.",
1256                 node->name,
1257                 nodeIsSlave(node) ? "replica" : "master without slots");
1258         node->flags &= ~CLUSTER_NODE_FAIL;
1259         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1260     }
1261 
1262     /* If it is a master and...
1263      * 1) The FAIL state is old enough.
1264      * 2) It is yet serving slots from our point of view (not failed over).
1265      * Apparently no one is going to fix these slots, clear the FAIL flag. */
1266     if (nodeIsMaster(node) && node->numslots > 0 &&
1267         (now - node->fail_time) >
1268         (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1269     {
1270         serverLog(LL_NOTICE,
1271             "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
1272                 node->name);
1273         node->flags &= ~CLUSTER_NODE_FAIL;
1274         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1275     }
1276 }
1277 
1278 /* Return true if we already have a node in HANDSHAKE state matching the
1279  * specified ip address and port number. This function is used in order to
1280  * avoid adding a new handshake node for the same address multiple times. */
clusterHandshakeInProgress(char * ip,int port,int cport)1281 int clusterHandshakeInProgress(char *ip, int port, int cport) {
1282     dictIterator *di;
1283     dictEntry *de;
1284 
1285     di = dictGetSafeIterator(server.cluster->nodes);
1286     while((de = dictNext(di)) != NULL) {
1287         clusterNode *node = dictGetVal(de);
1288 
1289         if (!nodeInHandshake(node)) continue;
1290         if (!strcasecmp(node->ip,ip) &&
1291             node->port == port &&
1292             node->cport == cport) break;
1293     }
1294     dictReleaseIterator(di);
1295     return de != NULL;
1296 }
1297 
1298 /* Start an handshake with the specified address if there is not one
1299  * already in progress. Returns non-zero if the handshake was actually
1300  * started. On error zero is returned and errno is set to one of the
1301  * following values:
1302  *
1303  * EAGAIN - There is already an handshake in progress for this address.
1304  * EINVAL - IP or port are not valid. */
clusterStartHandshake(char * ip,int port,int cport)1305 int clusterStartHandshake(char *ip, int port, int cport) {
1306     clusterNode *n;
1307     char norm_ip[NET_IP_STR_LEN];
1308     struct sockaddr_storage sa;
1309 
1310     /* IP sanity check */
1311     if (inet_pton(AF_INET,ip,
1312             &(((struct sockaddr_in *)&sa)->sin_addr)))
1313     {
1314         sa.ss_family = AF_INET;
1315     } else if (inet_pton(AF_INET6,ip,
1316             &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
1317     {
1318         sa.ss_family = AF_INET6;
1319     } else {
1320         errno = EINVAL;
1321         return 0;
1322     }
1323 
1324     /* Port sanity check */
1325     if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
1326         errno = EINVAL;
1327         return 0;
1328     }
1329 
1330     /* Set norm_ip as the normalized string representation of the node
1331      * IP address. */
1332     memset(norm_ip,0,NET_IP_STR_LEN);
1333     if (sa.ss_family == AF_INET)
1334         inet_ntop(AF_INET,
1335             (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
1336             norm_ip,NET_IP_STR_LEN);
1337     else
1338         inet_ntop(AF_INET6,
1339             (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
1340             norm_ip,NET_IP_STR_LEN);
1341 
1342     if (clusterHandshakeInProgress(norm_ip,port,cport)) {
1343         errno = EAGAIN;
1344         return 0;
1345     }
1346 
1347     /* Add the node with a random address (NULL as first argument to
1348      * createClusterNode()). Everything will be fixed during the
1349      * handshake. */
1350     n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
1351     memcpy(n->ip,norm_ip,sizeof(n->ip));
1352     n->port = port;
1353     n->cport = cport;
1354     clusterAddNode(n);
1355     return 1;
1356 }
1357 
/* Process the gossip section of PING or PONG packets.
 * Note that this function assumes that the packet is already sanity-checked
 * by the caller, not in the content of the gossip section, but in the
 * length.
 *
 * 'hdr' is the received message: hdr->count gossip entries are read from
 * hdr->data.ping.gossip. 'link' is the link the message was received from.
 *
 * For every gossip entry about a node we already know, the function may
 * update the failure reports of that node, its last pong time and its
 * address. For entries about unknown nodes, a new node may be created and
 * added to the nodes table. */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    /* Multi-byte integer fields of the message are converted with
     * ntohs() / ntohl() before use. */
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    /* The sender is the node associated with the link if any, otherwise it
     * is looked up by the name found in the header. It is NULL when the
     * sender is not a node we know. */
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        /* The textual representation of the flags is only built when it is
         * actually going to be logged. */
        if (server.verbosity == LL_DEBUG) {
            ci = representClusterNodeFlags(sdsempty(), flags);
            serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
                g->nodename,
                g->ip,
                ntohs(g->port),
                ntohs(g->cport),
                ci);
            sdsfree(ci);
        }

        /* Update our state accordingly to the gossip sections */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master.
               Reports about myself are ignored. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    /* The sender sees the node as failing: record (or
                     * refresh) its report and check whether the node can
                     * now be flagged as FAIL. */
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    /* The sender sees the node as OK: drop its previous
                     * failure report, if there was one. */
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If the gossip entry carries no failure flags for the node,
             * we have no pending ping for the node, nor we have failure
             * reports for this node, update the last pong time with the
             * one we see from the other nodes. */
            if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                node->ping_sent == 0 &&
                clusterNodeFailureReportsCount(node) == 0)
            {
                mstime_t pongtime = ntohl(g->pong_received);
                pongtime *= 1000; /* Convert back to milliseconds. */

                /* Replace the pong time with the received one only if
                 * it's greater than our view but is not in the future
                 * (with 500 milliseconds tolerance) from the POV of our
                 * clock. */
                if (pongtime <= (server.mstime+500) &&
                    pongtime > node->pong_received)
                {
                    node->pong_received = pongtime;
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section of a node that
             * can talk with this other node, update the address, disconnect
             * the old link if any, so that we'll attempt to connect with the
             * new address. The gossip entry must carry a usable address
             * (no NOADDR flag) and no failure flags. */
            if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                (strcasecmp(node->ip,g->ip) ||
                 node->port != ntohs(g->port) ||
                 node->cport != ntohs(g->cport)))
            {
                if (node->link) freeClusterLink(node->link);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                node->flags &= ~CLUSTER_NODE_NOADDR;
            }
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * add it to our trusted dict with exact nodeid and flag.
             * Note that we cannot simply start a handshake against
             * this IP/PORT pairs, since IP/PORT can be reused already,
             * otherwise we risk joining another cluster.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster.
             *
             * Blacklisted node IDs are not re-added. */
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                /* NOTE(review): this declaration shadows the outer 'node',
                 * which is NULL in this branch. */
                clusterNode *node;
                node = createClusterNode(g->nodename, flags);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                clusterAddNode(node);
            }
        }

        /* Next node */
        g++;
    }
}
1472 
1473 /* IP -> string conversion. 'buf' is supposed to at least be 46 bytes.
1474  * If 'announced_ip' length is non-zero, it is used instead of extracting
1475  * the IP from the socket peer address. */
nodeIp2String(char * buf,clusterLink * link,char * announced_ip)1476 void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
1477     if (announced_ip[0] != '\0') {
1478         memcpy(buf,announced_ip,NET_IP_STR_LEN);
1479         buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
1480     } else {
1481         anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
1482     }
1483 }
1484 
1485 /* Update the node address to the IP address that can be extracted
1486  * from link->fd, or if hdr->myip is non empty, to the address the node
1487  * is announcing us. The port is taken from the packet header as well.
1488  *
1489  * If the address or port changed, disconnect the node link so that we'll
1490  * connect again to the new address.
1491  *
1492  * If the ip/port pair are already correct no operation is performed at
1493  * all.
1494  *
1495  * The function returns 0 if the node address is still the same,
1496  * otherwise 1 is returned. */
nodeUpdateAddressIfNeeded(clusterNode * node,clusterLink * link,clusterMsg * hdr)1497 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
1498                               clusterMsg *hdr)
1499 {
1500     char ip[NET_IP_STR_LEN] = {0};
1501     int port = ntohs(hdr->port);
1502     int cport = ntohs(hdr->cport);
1503 
1504     /* We don't proceed if the link is the same as the sender link, as this
1505      * function is designed to see if the node link is consistent with the
1506      * symmetric link that is used to receive PINGs from the node.
1507      *
1508      * As a side effect this function never frees the passed 'link', so
1509      * it is safe to call during packet processing. */
1510     if (link == node->link) return 0;
1511 
1512     nodeIp2String(ip,link,hdr->myip);
1513     if (node->port == port && node->cport == cport &&
1514         strcmp(ip,node->ip) == 0) return 0;
1515 
1516     /* IP / port is different, update it. */
1517     memcpy(node->ip,ip,sizeof(ip));
1518     node->port = port;
1519     node->cport = cport;
1520     if (node->link) freeClusterLink(node->link);
1521     node->flags &= ~CLUSTER_NODE_NOADDR;
1522     serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
1523         node->name, node->ip, node->port);
1524 
1525     /* Check if this is our master and we have to change the
1526      * replication target as well. */
1527     if (nodeIsSlave(myself) && myself->slaveof == node)
1528         replicationSetMaster(node->ip, node->port);
1529     return 1;
1530 }
1531 
1532 /* Reconfigure the specified node 'n' as a master. This function is called when
1533  * a node that we believed to be a slave is now acting as master in order to
1534  * update the state of the node. */
clusterSetNodeAsMaster(clusterNode * n)1535 void clusterSetNodeAsMaster(clusterNode *n) {
1536     if (nodeIsMaster(n)) return;
1537 
1538     if (n->slaveof) {
1539         clusterNodeRemoveSlave(n->slaveof,n);
1540         if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1541     }
1542     n->flags &= ~CLUSTER_NODE_SLAVE;
1543     n->flags |= CLUSTER_NODE_MASTER;
1544     n->slaveof = NULL;
1545 
1546     /* Update config and state. */
1547     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1548                          CLUSTER_TODO_UPDATE_STATE);
1549 }
1550 
1551 /* This function is called when we receive a master configuration via a
1552  * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1553  * node, and the set of slots claimed under this configEpoch.
1554  *
1555  * What we do is to rebind the slots with newer configuration compared to our
1556  * local configuration, and if needed, we turn ourself into a replica of the
1557  * node (see the function comments for more info).
1558  *
1559  * The 'sender' is the node for which we received a configuration update.
1560  * Sometimes it is not actually the "Sender" of the information, like in the
1561  * case we receive the info via an UPDATE packet. */
clusterUpdateSlotsConfigWith(clusterNode * sender,uint64_t senderConfigEpoch,unsigned char * slots)1562 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1563     int j;
1564     clusterNode *curmaster, *newmaster = NULL;
1565     /* The dirty slots list is a list of slots for which we lose the ownership
1566      * while having still keys inside. This usually happens after a failover
1567      * or after a manual cluster reconfiguration operated by the admin.
1568      *
1569      * If the update message is not able to demote a master to slave (in this
1570      * case we'll resync with the master updating the whole key space), we
1571      * need to delete all the keys in the slots we lost ownership. */
1572     uint16_t dirty_slots[CLUSTER_SLOTS];
1573     int dirty_slots_count = 0;
1574 
1575     /* Here we set curmaster to this node or the node this node
1576      * replicates to if it's a slave. In the for loop we are
1577      * interested to check if slots are taken away from curmaster. */
1578     curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1579 
1580     if (sender == myself) {
1581         serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1582         return;
1583     }
1584 
1585     for (j = 0; j < CLUSTER_SLOTS; j++) {
1586         if (bitmapTestBit(slots,j)) {
1587             /* The slot is already bound to the sender of this message. */
1588             if (server.cluster->slots[j] == sender) continue;
1589 
1590             /* The slot is in importing state, it should be modified only
1591              * manually via redis-trib (example: a resharding is in progress
1592              * and the migrating side slot was already closed and is advertising
1593              * a new config. We still want the slot to be closed manually). */
1594             if (server.cluster->importing_slots_from[j]) continue;
1595 
1596             /* We rebind the slot to the new node claiming it if:
1597              * 1) The slot was unassigned or the new node claims it with a
1598              *    greater configEpoch.
1599              * 2) We are not currently importing the slot. */
1600             if (server.cluster->slots[j] == NULL ||
1601                 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1602             {
1603                 /* Was this slot mine, and still contains keys? Mark it as
1604                  * a dirty slot. */
1605                 if (server.cluster->slots[j] == myself &&
1606                     countKeysInSlot(j) &&
1607                     sender != myself)
1608                 {
1609                     dirty_slots[dirty_slots_count] = j;
1610                     dirty_slots_count++;
1611                 }
1612 
1613                 if (server.cluster->slots[j] == curmaster)
1614                     newmaster = sender;
1615                 clusterDelSlot(j);
1616                 clusterAddSlot(sender,j);
1617                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1618                                      CLUSTER_TODO_UPDATE_STATE|
1619                                      CLUSTER_TODO_FSYNC_CONFIG);
1620             }
1621         }
1622     }
1623 
1624     /* After updating the slots configuration, don't do any actual change
1625      * in the state of the server if a module disabled Redis Cluster
1626      * keys redirections. */
1627     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
1628         return;
1629 
1630     /* If at least one slot was reassigned from a node to another node
1631      * with a greater configEpoch, it is possible that:
1632      * 1) We are a master left without slots. This means that we were
1633      *    failed over and we should turn into a replica of the new
1634      *    master.
1635      * 2) We are a slave and our master is left without slots. We need
1636      *    to replicate to the new slots owner. */
1637     if (newmaster && curmaster->numslots == 0) {
1638         serverLog(LL_WARNING,
1639             "Configuration change detected. Reconfiguring myself "
1640             "as a replica of %.40s", sender->name);
1641         clusterSetMaster(sender);
1642         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1643                              CLUSTER_TODO_UPDATE_STATE|
1644                              CLUSTER_TODO_FSYNC_CONFIG);
1645     } else if (dirty_slots_count) {
1646         /* If we are here, we received an update message which removed
1647          * ownership for certain slots we still have keys about, but still
1648          * we are serving some slots, so this master node was not demoted to
1649          * a slave.
1650          *
1651          * In order to maintain a consistent state between keys and slots
1652          * we need to remove all the keys from the slots we lost. */
1653         for (j = 0; j < dirty_slots_count; j++)
1654             delKeysInSlot(dirty_slots[j]);
1655     }
1656 }
1657 
/* When this function is called, there is a packet to process starting
 * at link->rcvbuf. Releasing the buffer is up to the caller, so this
 * function should just handle the higher level stuff of processing the
 * packet, modifying the cluster state if needed.
 *
 * The function returns 1 if the link is still valid after the packet
 * was processed, otherwise 0 if the link was freed since the packet
 * processing led to some inconsistency error (for instance a PONG
 * received from the wrong sender ID). */
clusterProcessPacket(clusterLink * link)1667 int clusterProcessPacket(clusterLink *link) {
1668     clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1669     uint32_t totlen = ntohl(hdr->totlen);
1670     uint16_t type = ntohs(hdr->type);
1671     mstime_t now = mstime();
1672 
1673     if (type < CLUSTERMSG_TYPE_COUNT)
1674         server.cluster->stats_bus_messages_received[type]++;
1675     serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
1676         type, (unsigned long) totlen);
1677 
1678     /* Perform sanity checks */
1679     if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1680     if (totlen > sdslen(link->rcvbuf)) return 1;
1681 
1682     if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1683         /* Can't handle messages of different versions. */
1684         return 1;
1685     }
1686 
1687     uint16_t flags = ntohs(hdr->flags);
1688     uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1689     clusterNode *sender;
1690 
1691     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1692         type == CLUSTERMSG_TYPE_MEET)
1693     {
1694         uint16_t count = ntohs(hdr->count);
1695         uint32_t explen; /* expected length of this packet */
1696 
1697         explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1698         explen += (sizeof(clusterMsgDataGossip)*count);
1699         if (totlen != explen) return 1;
1700     } else if (type == CLUSTERMSG_TYPE_FAIL) {
1701         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1702 
1703         explen += sizeof(clusterMsgDataFail);
1704         if (totlen != explen) return 1;
1705     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1706         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1707 
1708         explen += sizeof(clusterMsgDataPublish) -
1709                 8 +
1710                 ntohl(hdr->data.publish.msg.channel_len) +
1711                 ntohl(hdr->data.publish.msg.message_len);
1712         if (totlen != explen) return 1;
1713     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1714                type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1715                type == CLUSTERMSG_TYPE_MFSTART)
1716     {
1717         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1718 
1719         if (totlen != explen) return 1;
1720     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1721         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1722 
1723         explen += sizeof(clusterMsgDataUpdate);
1724         if (totlen != explen) return 1;
1725     } else if (type == CLUSTERMSG_TYPE_MODULE) {
1726         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1727 
1728         explen += sizeof(clusterMsgDataPublish) -
1729                 3 + ntohl(hdr->data.module.msg.len);
1730         if (totlen != explen) return 1;
1731     }
1732 
1733     /* Check if the sender is a known node. */
1734     sender = clusterLookupNode(hdr->sender);
1735 
1736     /* Update the last time we saw any data from this node. We
1737      * use this in order to avoid detecting a timeout from a node that
1738      * is just sending a lot of data in the cluster bus, for instance
1739      * because of Pub/Sub. */
1740     if (sender) sender->data_received = now;
1741 
1742     if (sender && !nodeInHandshake(sender)) {
        /* Update our currentEpoch if we see a newer epoch in the cluster. */
1744         senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1745         senderConfigEpoch = ntohu64(hdr->configEpoch);
1746         if (senderCurrentEpoch > server.cluster->currentEpoch)
1747             server.cluster->currentEpoch = senderCurrentEpoch;
1748         /* Update the sender configEpoch if it is publishing a newer one. */
1749         if (senderConfigEpoch > sender->configEpoch) {
1750             sender->configEpoch = senderConfigEpoch;
1751             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1752                                  CLUSTER_TODO_FSYNC_CONFIG);
1753         }
1754         /* Update the replication offset info for this node. */
1755         sender->repl_offset = ntohu64(hdr->offset);
1756         sender->repl_offset_time = now;
1757         /* If we are a slave performing a manual failover and our master
1758          * sent its offset while already paused, populate the MF state. */
1759         if (server.cluster->mf_end &&
1760             nodeIsSlave(myself) &&
1761             myself->slaveof == sender &&
1762             hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1763             server.cluster->mf_master_offset == 0)
1764         {
1765             server.cluster->mf_master_offset = sender->repl_offset;
1766             serverLog(LL_WARNING,
1767                 "Received replication offset for paused "
1768                 "master manual failover: %lld",
1769                 server.cluster->mf_master_offset);
1770         }
1771     }
1772 
1773     /* Initial processing of PING and MEET requests replying with a PONG. */
1774     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1775         serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
1776 
1777         /* We use incoming MEET messages in order to set the address
1778          * for 'myself', since only other cluster nodes will send us
1779          * MEET messages on handshakes, when the cluster joins, or
1780          * later if we changed address, and those nodes will use our
1781          * official address to connect to us. So by obtaining this address
1782          * from the socket is a simple way to discover / update our own
1783          * address in the cluster without it being hardcoded in the config.
1784          *
1785          * However if we don't have an address at all, we update the address
1786          * even with a normal PING packet. If it's wrong it will be fixed
1787          * by MEET later. */
1788         if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
1789             server.cluster_announce_ip == NULL)
1790         {
1791             char ip[NET_IP_STR_LEN];
1792 
1793             if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
1794                 strcmp(ip,myself->ip))
1795             {
1796                 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1797                 serverLog(LL_WARNING,"IP address for this node updated to %s",
1798                     myself->ip);
1799                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1800             }
1801         }
1802 
1803         /* Add this node if it is new for us and the msg type is MEET.
1804          * In this stage we don't try to add the node with the right
1805          * flags, slaveof pointer, and so forth, as this details will be
1806          * resolved when we'll receive PONGs from the node. */
1807         if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1808             clusterNode *node;
1809 
1810             node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1811             nodeIp2String(node->ip,link,hdr->myip);
1812             node->port = ntohs(hdr->port);
1813             node->cport = ntohs(hdr->cport);
1814             clusterAddNode(node);
1815             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1816         }
1817 
1818         /* If this is a MEET packet from an unknown node, we still process
1819          * the gossip section here since we have to trust the sender because
1820          * of the message type. */
1821         if (!sender && type == CLUSTERMSG_TYPE_MEET)
1822             clusterProcessGossipSection(hdr,link);
1823 
1824         /* Anyway reply with a PONG */
1825         clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1826     }
1827 
1828     /* PING, PONG, MEET: process config information. */
1829     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1830         type == CLUSTERMSG_TYPE_MEET)
1831     {
1832         serverLog(LL_DEBUG,"%s packet received: %p",
1833             type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
1834             (void*)link->node);
1835         if (link->node) {
1836             if (nodeInHandshake(link->node)) {
1837                 /* If we already have this node, try to change the
1838                  * IP/port of the node with the new one. */
1839                 if (sender) {
1840                     serverLog(LL_VERBOSE,
1841                         "Handshake: we already know node %.40s, "
1842                         "updating the address if needed.", sender->name);
1843                     if (nodeUpdateAddressIfNeeded(sender,link,hdr))
1844                     {
1845                         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1846                                              CLUSTER_TODO_UPDATE_STATE);
1847                     }
1848                     /* Free this node as we already have it. This will
1849                      * cause the link to be freed as well. */
1850                     clusterDelNode(link->node);
1851                     return 0;
1852                 }
1853 
1854                 /* First thing to do is replacing the random name with the
1855                  * right node name if this was a handshake stage. */
1856                 clusterRenameNode(link->node, hdr->sender);
1857                 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
1858                     link->node->name);
1859                 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1860                 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1861                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1862             } else if (memcmp(link->node->name,hdr->sender,
1863                         CLUSTER_NAMELEN) != 0)
1864             {
1865                 /* If the reply has a non matching node ID we
1866                  * disconnect this node and set it as not having an associated
1867                  * address. */
1868                 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1869                     link->node->name,
1870                     (int)(now-(link->node->ctime)),
1871                     link->node->flags);
1872                 link->node->flags |= CLUSTER_NODE_NOADDR;
1873                 link->node->ip[0] = '\0';
1874                 link->node->port = 0;
1875                 link->node->cport = 0;
1876                 freeClusterLink(link);
1877                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1878                 return 0;
1879             }
1880         }
1881 
1882         /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
1883          * announced. This is a dynamic flag that we receive from the
1884          * sender, and the latest status must be trusted. We need it to
1885          * be propagated because the slave ranking used to understand the
1886          * delay of each slave in the voting process, needs to know
1887          * what are the instances really competing. */
1888         if (sender) {
1889             int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
1890             sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
1891             sender->flags |= nofailover;
1892         }
1893 
1894         /* Update the node address if it changed. */
1895         if (sender && type == CLUSTERMSG_TYPE_PING &&
1896             !nodeInHandshake(sender) &&
1897             nodeUpdateAddressIfNeeded(sender,link,hdr))
1898         {
1899             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1900                                  CLUSTER_TODO_UPDATE_STATE);
1901         }
1902 
1903         /* Update our info about the node */
1904         if (link->node && type == CLUSTERMSG_TYPE_PONG) {
1905             link->node->pong_received = now;
1906             link->node->ping_sent = 0;
1907 
1908             /* The PFAIL condition can be reversed without external
1909              * help if it is momentary (that is, if it does not
1910              * turn into a FAIL state).
1911              *
1912              * The FAIL condition is also reversible under specific
1913              * conditions detected by clearNodeFailureIfNeeded(). */
1914             if (nodeTimedOut(link->node)) {
1915                 link->node->flags &= ~CLUSTER_NODE_PFAIL;
1916                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1917                                      CLUSTER_TODO_UPDATE_STATE);
1918             } else if (nodeFailed(link->node)) {
1919                 clearNodeFailureIfNeeded(link->node);
1920             }
1921         }
1922 
1923         /* Check for role switch: slave -> master or master -> slave. */
1924         if (sender) {
1925             if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1926                 sizeof(hdr->slaveof)))
1927             {
1928                 /* Node is a master. */
1929                 clusterSetNodeAsMaster(sender);
1930             } else {
1931                 /* Node is a slave. */
1932                 clusterNode *master = clusterLookupNode(hdr->slaveof);
1933 
1934                 if (nodeIsMaster(sender)) {
1935                     /* Master turned into a slave! Reconfigure the node. */
1936                     clusterDelNodeSlots(sender);
1937                     sender->flags &= ~(CLUSTER_NODE_MASTER|
1938                                        CLUSTER_NODE_MIGRATE_TO);
1939                     sender->flags |= CLUSTER_NODE_SLAVE;
1940 
1941                     /* Update config and state. */
1942                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1943                                          CLUSTER_TODO_UPDATE_STATE);
1944                 }
1945 
1946                 /* Master node changed for this slave? */
1947                 if (master && sender->slaveof != master) {
1948                     if (sender->slaveof)
1949                         clusterNodeRemoveSlave(sender->slaveof,sender);
1950                     clusterNodeAddSlave(master,sender);
1951                     sender->slaveof = master;
1952 
1953                     /* Update config. */
1954                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1955                 }
1956             }
1957         }
1958 
1959         /* Update our info about served slots.
1960          *
1961          * Note: this MUST happen after we update the master/slave state
1962          * so that CLUSTER_NODE_MASTER flag will be set. */
1963 
        /* Many checks are only needed if the set of served slots this
         * instance claims is different compared to the set of slots we have
         * for it. Check this ASAP to avoid other computationally expensive
         * checks later. */
1968         clusterNode *sender_master = NULL; /* Sender or its master if slave. */
1969         int dirty_slots = 0; /* Sender claimed slots don't match my view? */
1970 
1971         if (sender) {
1972             sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
1973             if (sender_master) {
1974                 dirty_slots = memcmp(sender_master->slots,
1975                         hdr->myslots,sizeof(hdr->myslots)) != 0;
1976             }
1977         }
1978 
1979         /* 1) If the sender of the message is a master, and we detected that
1980          *    the set of slots it claims changed, scan the slots to see if we
1981          *    need to update our configuration. */
1982         if (sender && nodeIsMaster(sender) && dirty_slots)
1983             clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
1984 
1985         /* 2) We also check for the reverse condition, that is, the sender
1986          *    claims to serve slots we know are served by a master with a
1987          *    greater configEpoch. If this happens we inform the sender.
1988          *
1989          * This is useful because sometimes after a partition heals, a
1990          * reappearing master may be the last one to claim a given set of
1991          * hash slots, but with a configuration that other instances know to
1992          * be deprecated. Example:
1993          *
1994          * A and B are master and slave for slots 1,2,3.
1995          * A is partitioned away, B gets promoted.
1996          * B is partitioned away, and A returns available.
1997          *
1998          * Usually B would PING A publishing its set of served slots and its
1999          * configEpoch, but because of the partition B can't inform A of the
2000          * new configuration, so other nodes that have an updated table must
2001          * do it. In this way A will stop to act as a master (or can try to
2002          * failover if there are the conditions to win the election). */
2003         if (sender && dirty_slots) {
2004             int j;
2005 
2006             for (j = 0; j < CLUSTER_SLOTS; j++) {
2007                 if (bitmapTestBit(hdr->myslots,j)) {
2008                     if (server.cluster->slots[j] == sender ||
2009                         server.cluster->slots[j] == NULL) continue;
2010                     if (server.cluster->slots[j]->configEpoch >
2011                         senderConfigEpoch)
2012                     {
2013                         serverLog(LL_VERBOSE,
2014                             "Node %.40s has old slots configuration, sending "
2015                             "an UPDATE message about %.40s",
2016                                 sender->name, server.cluster->slots[j]->name);
2017                         clusterSendUpdate(sender->link,
2018                             server.cluster->slots[j]);
2019 
2020                         /* TODO: instead of exiting the loop send every other
2021                          * UPDATE packet for other nodes that are the new owner
2022                          * of sender's slots. */
2023                         break;
2024                     }
2025                 }
2026             }
2027         }
2028 
2029         /* If our config epoch collides with the sender's try to fix
2030          * the problem. */
2031         if (sender &&
2032             nodeIsMaster(myself) && nodeIsMaster(sender) &&
2033             senderConfigEpoch == myself->configEpoch)
2034         {
2035             clusterHandleConfigEpochCollision(sender);
2036         }
2037 
2038         /* Get info from the gossip section */
2039         if (sender) clusterProcessGossipSection(hdr,link);
2040     } else if (type == CLUSTERMSG_TYPE_FAIL) {
2041         clusterNode *failing;
2042 
2043         if (sender) {
2044             failing = clusterLookupNode(hdr->data.fail.about.nodename);
2045             if (failing &&
2046                 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
2047             {
2048                 serverLog(LL_NOTICE,
2049                     "FAIL message received from %.40s about %.40s",
2050                     hdr->sender, hdr->data.fail.about.nodename);
2051                 failing->flags |= CLUSTER_NODE_FAIL;
2052                 failing->fail_time = now;
2053                 failing->flags &= ~CLUSTER_NODE_PFAIL;
2054                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2055                                      CLUSTER_TODO_UPDATE_STATE);
2056             }
2057         } else {
2058             serverLog(LL_NOTICE,
2059                 "Ignoring FAIL message from unknown node %.40s about %.40s",
2060                 hdr->sender, hdr->data.fail.about.nodename);
2061         }
2062     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
2063         robj *channel, *message;
2064         uint32_t channel_len, message_len;
2065 
2066         /* Don't bother creating useless objects if there are no
2067          * Pub/Sub subscribers. */
2068         if (dictSize(server.pubsub_channels) ||
2069            listLength(server.pubsub_patterns))
2070         {
2071             channel_len = ntohl(hdr->data.publish.msg.channel_len);
2072             message_len = ntohl(hdr->data.publish.msg.message_len);
2073             channel = createStringObject(
2074                         (char*)hdr->data.publish.msg.bulk_data,channel_len);
2075             message = createStringObject(
2076                         (char*)hdr->data.publish.msg.bulk_data+channel_len,
2077                         message_len);
2078             pubsubPublishMessage(channel,message);
2079             decrRefCount(channel);
2080             decrRefCount(message);
2081         }
2082     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
2083         if (!sender) return 1;  /* We don't know that node. */
2084         clusterSendFailoverAuthIfNeeded(sender,hdr);
2085     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
2086         if (!sender) return 1;  /* We don't know that node. */
2087         /* We consider this vote only if the sender is a master serving
2088          * a non zero number of slots, and its currentEpoch is greater or
2089          * equal to epoch where this node started the election. */
2090         if (nodeIsMaster(sender) && sender->numslots > 0 &&
2091             senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2092         {
2093             server.cluster->failover_auth_count++;
2094             /* Maybe we reached a quorum here, set a flag to make sure
2095              * we check ASAP. */
2096             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
2097         }
2098     } else if (type == CLUSTERMSG_TYPE_MFSTART) {
2099         /* This message is acceptable only if I'm a master and the sender
2100          * is one of my slaves. */
2101         if (!sender || sender->slaveof != myself) return 1;
2102         /* Manual failover requested from slaves. Initialize the state
2103          * accordingly. */
2104         resetManualFailover();
2105         server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
2106         server.cluster->mf_slave = sender;
2107         pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
2108         serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
2109             sender->name);
2110     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2111         clusterNode *n; /* The node the update is about. */
2112         uint64_t reportedConfigEpoch =
2113                     ntohu64(hdr->data.update.nodecfg.configEpoch);
2114 
2115         if (!sender) return 1;  /* We don't know the sender. */
2116         n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
2117         if (!n) return 1;   /* We don't know the reported node. */
2118         if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
2119 
2120         /* If in our current config the node is a slave, set it as a master. */
2121         if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
2122 
2123         /* Update the node's configEpoch. */
2124         n->configEpoch = reportedConfigEpoch;
2125         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2126                              CLUSTER_TODO_FSYNC_CONFIG);
2127 
2128         /* Check the bitmap of served slots and update our
2129          * config accordingly. */
2130         clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
2131             hdr->data.update.nodecfg.slots);
2132     } else if (type == CLUSTERMSG_TYPE_MODULE) {
2133         if (!sender) return 1;  /* Protect the module from unknown nodes. */
2134         /* We need to route this message back to the right module subscribed
2135          * for the right message type. */
2136         uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
2137         uint32_t len = ntohl(hdr->data.module.msg.len);
2138         uint8_t type = hdr->data.module.msg.type;
2139         unsigned char *payload = hdr->data.module.msg.bulk_data;
2140         moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
2141     } else {
2142         serverLog(LL_WARNING,"Received unknown packet type: %d", type);
2143     }
2144     return 1;
2145 }
2146 
2147 /* This function is called when we detect the link with this node is lost.
2148    We set the node as no longer connected. The Cluster Cron will detect
2149    this connection and will try to get it connected again.
2150 
2151    Instead if the node is a temporary node used to accept a query, we
2152    completely free the node on error. */
handleLinkIOError(clusterLink * link)2153 void handleLinkIOError(clusterLink *link) {
2154     freeClusterLink(link);
2155 }
2156 
2157 /* Send data. This is handled using a trivial send buffer that gets
2158  * consumed by write(). We don't try to optimize this for speed too much
2159  * as this is a very low traffic channel. */
clusterWriteHandler(aeEventLoop * el,int fd,void * privdata,int mask)2160 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2161     clusterLink *link = (clusterLink*) privdata;
2162     ssize_t nwritten;
2163     UNUSED(el);
2164     UNUSED(mask);
2165 
2166     nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
2167     if (nwritten <= 0) {
2168         serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2169             (nwritten == -1) ? strerror(errno) : "short write");
2170         handleLinkIOError(link);
2171         return;
2172     }
2173     sdsrange(link->sndbuf,nwritten,-1);
2174     if (sdslen(link->sndbuf) == 0)
2175         aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
2176 }
2177 
2178 /* Read data. Try to read the first field of the header first to check the
2179  * full length of the packet. When a whole packet is in memory this function
2180  * will call the function to process the packet. And so forth. */
clusterReadHandler(aeEventLoop * el,int fd,void * privdata,int mask)2181 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2182     char buf[sizeof(clusterMsg)];
2183     ssize_t nread;
2184     clusterMsg *hdr;
2185     clusterLink *link = (clusterLink*) privdata;
2186     unsigned int readlen, rcvbuflen;
2187     UNUSED(el);
2188     UNUSED(mask);
2189 
2190     while(1) { /* Read as long as there is data to read. */
2191         rcvbuflen = sdslen(link->rcvbuf);
2192         if (rcvbuflen < 8) {
2193             /* First, obtain the first 8 bytes to get the full message
2194              * length. */
2195             readlen = 8 - rcvbuflen;
2196         } else {
2197             /* Finally read the full message. */
2198             hdr = (clusterMsg*) link->rcvbuf;
2199             if (rcvbuflen == 8) {
2200                 /* Perform some sanity check on the message signature
2201                  * and length. */
2202                 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2203                     ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2204                 {
2205                     serverLog(LL_WARNING,
2206                         "Bad message length or signature received "
2207                         "from Cluster bus.");
2208                     handleLinkIOError(link);
2209                     return;
2210                 }
2211             }
2212             readlen = ntohl(hdr->totlen) - rcvbuflen;
2213             if (readlen > sizeof(buf)) readlen = sizeof(buf);
2214         }
2215 
2216         nread = read(fd,buf,readlen);
2217         if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
2218 
2219         if (nread <= 0) {
2220             /* I/O error... */
2221             serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2222                 (nread == 0) ? "connection closed" : strerror(errno));
2223             handleLinkIOError(link);
2224             return;
2225         } else {
2226             /* Read data and recast the pointer to the new buffer. */
2227             link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
2228             hdr = (clusterMsg*) link->rcvbuf;
2229             rcvbuflen += nread;
2230         }
2231 
2232         /* Total length obtained? Process this packet. */
2233         if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2234             if (clusterProcessPacket(link)) {
2235                 sdsfree(link->rcvbuf);
2236                 link->rcvbuf = sdsempty();
2237             } else {
2238                 return; /* Link no longer valid. */
2239             }
2240         }
2241     }
2242 }
2243 
2244 /* Put stuff into the send buffer.
2245  *
2246  * It is guaranteed that this function will never have as a side effect
2247  * the link to be invalidated, so it is safe to call this function
2248  * from event handlers that will do stuff with the same link later. */
clusterSendMessage(clusterLink * link,unsigned char * msg,size_t msglen)2249 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2250     if (sdslen(link->sndbuf) == 0 && msglen != 0)
2251         aeCreateFileEvent(server.el,link->fd,AE_WRITABLE|AE_BARRIER,
2252                     clusterWriteHandler,link);
2253 
2254     link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2255 
2256     /* Populate sent messages stats. */
2257     clusterMsg *hdr = (clusterMsg*) msg;
2258     uint16_t type = ntohs(hdr->type);
2259     if (type < CLUSTERMSG_TYPE_COUNT)
2260         server.cluster->stats_bus_messages_sent[type]++;
2261 }
2262 
2263 /* Send a message to all the nodes that are part of the cluster having
2264  * a connected link.
2265  *
2266  * It is guaranteed that this function will never have as a side effect
2267  * some node->link to be invalidated, so it is safe to call this function
2268  * from event handlers that will do stuff with node links later. */
clusterBroadcastMessage(void * buf,size_t len)2269 void clusterBroadcastMessage(void *buf, size_t len) {
2270     dictIterator *di;
2271     dictEntry *de;
2272 
2273     di = dictGetSafeIterator(server.cluster->nodes);
2274     while((de = dictNext(di)) != NULL) {
2275         clusterNode *node = dictGetVal(de);
2276 
2277         if (!node->link) continue;
2278         if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2279             continue;
2280         clusterSendMessage(node->link,buf,len);
2281     }
2282     dictReleaseIterator(di);
2283 }
2284 
2285 /* Build the message header. hdr must point to a buffer at least
2286  * sizeof(clusterMsg) in bytes. */
clusterBuildMessageHdr(clusterMsg * hdr,int type)2287 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
2288     int totlen = 0;
2289     uint64_t offset;
2290     clusterNode *master;
2291 
2292     /* If this node is a master, we send its slots bitmap and configEpoch.
2293      * If this node is a slave we send the master's information instead (the
2294      * node is flagged as slave so the receiver knows that it is NOT really
2295      * in charge for this slots. */
2296     master = (nodeIsSlave(myself) && myself->slaveof) ?
2297               myself->slaveof : myself;
2298 
2299     memset(hdr,0,sizeof(*hdr));
2300     hdr->ver = htons(CLUSTER_PROTO_VER);
2301     hdr->sig[0] = 'R';
2302     hdr->sig[1] = 'C';
2303     hdr->sig[2] = 'm';
2304     hdr->sig[3] = 'b';
2305     hdr->type = htons(type);
2306     memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
2307 
2308     /* If cluster-announce-ip option is enabled, force the receivers of our
2309      * packets to use the specified address for this node. Otherwise if the
2310      * first byte is zero, they'll do auto discovery. */
2311     memset(hdr->myip,0,NET_IP_STR_LEN);
2312     if (server.cluster_announce_ip) {
2313         strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
2314         hdr->myip[NET_IP_STR_LEN-1] = '\0';
2315     }
2316 
2317     /* Handle cluster-announce-port as well. */
2318     int announced_port = server.cluster_announce_port ?
2319                          server.cluster_announce_port : server.port;
2320     int announced_cport = server.cluster_announce_bus_port ?
2321                           server.cluster_announce_bus_port :
2322                           (server.port + CLUSTER_PORT_INCR);
2323 
2324     memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
2325     memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2326     if (myself->slaveof != NULL)
2327         memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
2328     hdr->port = htons(announced_port);
2329     hdr->cport = htons(announced_cport);
2330     hdr->flags = htons(myself->flags);
2331     hdr->state = server.cluster->state;
2332 
2333     /* Set the currentEpoch and configEpochs. */
2334     hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2335     hdr->configEpoch = htonu64(master->configEpoch);
2336 
2337     /* Set the replication offset. */
2338     if (nodeIsSlave(myself))
2339         offset = replicationGetSlaveOffset();
2340     else
2341         offset = server.master_repl_offset;
2342     hdr->offset = htonu64(offset);
2343 
2344     /* Set the message flags. */
2345     if (nodeIsMaster(myself) && server.cluster->mf_end)
2346         hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2347 
2348     /* Compute the message length for certain messages. For other messages
2349      * this is up to the caller. */
2350     if (type == CLUSTERMSG_TYPE_FAIL) {
2351         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2352         totlen += sizeof(clusterMsgDataFail);
2353     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2354         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2355         totlen += sizeof(clusterMsgDataUpdate);
2356     }
2357     hdr->totlen = htonl(totlen);
2358     /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
2359 }
2360 
2361 /* Return non zero if the node is already present in the gossip section of the
2362  * message pointed by 'hdr' and having 'count' gossip entries. Otherwise
2363  * zero is returned. Helper for clusterSendPing(). */
clusterNodeIsInGossipSection(clusterMsg * hdr,int count,clusterNode * n)2364 int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
2365     int j;
2366     for (j = 0; j < count; j++) {
2367         if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
2368                 CLUSTER_NAMELEN) == 0) break;
2369     }
2370     return j != count;
2371 }
2372 
2373 /* Set the i-th entry of the gossip section in the message pointed by 'hdr'
2374  * to the info of the specified node 'n'. */
clusterSetGossipEntry(clusterMsg * hdr,int i,clusterNode * n)2375 void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
2376     clusterMsgDataGossip *gossip;
2377     gossip = &(hdr->data.ping.gossip[i]);
2378     memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
2379     gossip->ping_sent = htonl(n->ping_sent/1000);
2380     gossip->pong_received = htonl(n->pong_received/1000);
2381     memcpy(gossip->ip,n->ip,sizeof(n->ip));
2382     gossip->port = htons(n->port);
2383     gossip->cport = htons(n->cport);
2384     gossip->flags = htons(n->flags);
2385     gossip->notused1 = 0;
2386 }
2387 
2388 /* Send a PING or PONG packet to the specified node, making sure to add enough
2389  * gossip informations. */
clusterSendPing(clusterLink * link,int type)2390 void clusterSendPing(clusterLink *link, int type) {
2391     unsigned char *buf;
2392     clusterMsg *hdr;
2393     int gossipcount = 0; /* Number of gossip sections added so far. */
2394     int wanted; /* Number of gossip sections we want to append if possible. */
2395     int totlen; /* Total packet length. */
2396     /* freshnodes is the max number of nodes we can hope to append at all:
2397      * nodes available minus two (ourself and the node we are sending the
2398      * message to). However practically there may be less valid nodes since
2399      * nodes in handshake state, disconnected, are not considered. */
2400     int freshnodes = dictSize(server.cluster->nodes)-2;
2401 
2402     /* How many gossip sections we want to add? 1/10 of the number of nodes
2403      * and anyway at least 3. Why 1/10?
2404      *
2405      * If we have N masters, with N/10 entries, and we consider that in
2406      * node_timeout we exchange with each other node at least 4 packets
2407      * (we ping in the worst case in node_timeout/2 time, and we also
2408      * receive two pings from the host), we have a total of 8 packets
2409      * in the node_timeout*2 falure reports validity time. So we have
2410      * that, for a single PFAIL node, we can expect to receive the following
2411      * number of failure reports (in the specified window of time):
2412      *
2413      * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2414      *
2415      * PROB = probability of being featured in a single gossip entry,
2416      *        which is 1 / NUM_OF_NODES.
2417      * ENTRIES = 10.
2418      * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2419      *
2420      * If we assume we have just masters (so num of nodes and num of masters
2421      * is the same), with 1/10 we always get over the majority, and specifically
2422      * 80% of the number of nodes, to account for many masters failing at the
2423      * same time.
2424      *
2425      * Since we have non-voting slaves that lower the probability of an entry
2426      * to feature our node, we set the number of entries per packet as
2427      * 10% of the total nodes we have. */
2428     wanted = floor(dictSize(server.cluster->nodes)/10);
2429     if (wanted < 3) wanted = 3;
2430     if (wanted > freshnodes) wanted = freshnodes;
2431 
2432     /* Include all the nodes in PFAIL state, so that failure reports are
2433      * faster to propagate to go from PFAIL to FAIL state. */
2434     int pfail_wanted = server.cluster->stats_pfail_nodes;
2435 
2436     /* Compute the maxium totlen to allocate our buffer. We'll fix the totlen
2437      * later according to the number of gossip sections we really were able
2438      * to put inside the packet. */
2439     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2440     totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
2441     /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2442      * sizeof(clusterMsg) or more. */
2443     if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2444     buf = zcalloc(totlen);
2445     hdr = (clusterMsg*) buf;
2446 
2447     /* Populate the header. */
2448     if (link->node && type == CLUSTERMSG_TYPE_PING)
2449         link->node->ping_sent = mstime();
2450     clusterBuildMessageHdr(hdr,type);
2451 
2452     /* Populate the gossip fields */
2453     int maxiterations = wanted*3;
2454     while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2455         dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2456         clusterNode *this = dictGetVal(de);
2457 
2458         /* Don't include this node: the whole packet header is about us
2459          * already, so we just gossip about other nodes. */
2460         if (this == myself) continue;
2461 
2462         /* PFAIL nodes will be added later. */
2463         if (this->flags & CLUSTER_NODE_PFAIL) continue;
2464 
2465         /* In the gossip section don't include:
2466          * 1) Nodes in HANDSHAKE state.
2467          * 3) Nodes with the NOADDR flag set.
2468          * 4) Disconnected nodes if they don't have configured slots.
2469          */
2470         if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2471             (this->link == NULL && this->numslots == 0))
2472         {
2473             freshnodes--; /* Tecnically not correct, but saves CPU. */
2474             continue;
2475         }
2476 
2477         /* Do not add a node we already have. */
2478         if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
2479 
2480         /* Add it */
2481         clusterSetGossipEntry(hdr,gossipcount,this);
2482         freshnodes--;
2483         gossipcount++;
2484     }
2485 
2486     /* If there are PFAIL nodes, add them at the end. */
2487     if (pfail_wanted) {
2488         dictIterator *di;
2489         dictEntry *de;
2490 
2491         di = dictGetSafeIterator(server.cluster->nodes);
2492         while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
2493             clusterNode *node = dictGetVal(de);
2494             if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
2495             if (node->flags & CLUSTER_NODE_NOADDR) continue;
2496             if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
2497             clusterSetGossipEntry(hdr,gossipcount,node);
2498             freshnodes--;
2499             gossipcount++;
2500             /* We take the count of the slots we allocated, since the
2501              * PFAIL stats may not match perfectly with the current number
2502              * of PFAIL nodes. */
2503             pfail_wanted--;
2504         }
2505         dictReleaseIterator(di);
2506     }
2507 
2508     /* Ready to send... fix the totlen fiend and queue the message in the
2509      * output buffer. */
2510     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2511     totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2512     hdr->count = htons(gossipcount);
2513     hdr->totlen = htonl(totlen);
2514     clusterSendMessage(link,buf,totlen);
2515     zfree(buf);
2516 }
2517 
2518 /* Send a PONG packet to every connected node that's not in handshake state
2519  * and for which we have a valid link.
2520  *
2521  * In Redis Cluster pongs are not used just for failure detection, but also
2522  * to carry important configuration information. So broadcasting a pong is
2523  * useful when something changes in the configuration and we want to make
2524  * the cluster aware ASAP (for instance after a slave promotion).
2525  *
2526  * The 'target' argument specifies the receiving instances using the
2527  * defines below:
2528  *
2529  * CLUSTER_BROADCAST_ALL -> All known instances.
2530  * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2531  */
2532 #define CLUSTER_BROADCAST_ALL 0
2533 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
clusterBroadcastPong(int target)2534 void clusterBroadcastPong(int target) {
2535     dictIterator *di;
2536     dictEntry *de;
2537 
2538     di = dictGetSafeIterator(server.cluster->nodes);
2539     while((de = dictNext(di)) != NULL) {
2540         clusterNode *node = dictGetVal(de);
2541 
2542         if (!node->link) continue;
2543         if (node == myself || nodeInHandshake(node)) continue;
2544         if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2545             int local_slave =
2546                 nodeIsSlave(node) && node->slaveof &&
2547                 (node->slaveof == myself || node->slaveof == myself->slaveof);
2548             if (!local_slave) continue;
2549         }
2550         clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2551     }
2552     dictReleaseIterator(di);
2553 }
2554 
2555 /* Send a PUBLISH message.
2556  *
2557  * If link is NULL, then the message is broadcasted to the whole cluster. */
clusterSendPublish(clusterLink * link,robj * channel,robj * message)2558 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2559     unsigned char buf[sizeof(clusterMsg)], *payload;
2560     clusterMsg *hdr = (clusterMsg*) buf;
2561     uint32_t totlen;
2562     uint32_t channel_len, message_len;
2563 
2564     channel = getDecodedObject(channel);
2565     message = getDecodedObject(message);
2566     channel_len = sdslen(channel->ptr);
2567     message_len = sdslen(message->ptr);
2568 
2569     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2570     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2571     totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2572 
2573     hdr->data.publish.msg.channel_len = htonl(channel_len);
2574     hdr->data.publish.msg.message_len = htonl(message_len);
2575     hdr->totlen = htonl(totlen);
2576 
2577     /* Try to use the local buffer if possible */
2578     if (totlen < sizeof(buf)) {
2579         payload = buf;
2580     } else {
2581         payload = zmalloc(totlen);
2582         memcpy(payload,hdr,sizeof(*hdr));
2583         hdr = (clusterMsg*) payload;
2584     }
2585     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2586     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2587         message->ptr,sdslen(message->ptr));
2588 
2589     if (link)
2590         clusterSendMessage(link,payload,totlen);
2591     else
2592         clusterBroadcastMessage(payload,totlen);
2593 
2594     decrRefCount(channel);
2595     decrRefCount(message);
2596     if (payload != buf) zfree(payload);
2597 }
2598 
2599 /* Send a FAIL message to all the nodes we are able to contact.
2600  * The FAIL message is sent when we detect that a node is failing
2601  * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2602  * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2603  * nodes to do the same ASAP. */
clusterSendFail(char * nodename)2604 void clusterSendFail(char *nodename) {
2605     unsigned char buf[sizeof(clusterMsg)];
2606     clusterMsg *hdr = (clusterMsg*) buf;
2607 
2608     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2609     memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2610     clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2611 }
2612 
2613 /* Send an UPDATE message to the specified link carrying the specified 'node'
2614  * slots configuration. The node name, slots bitmap, and configEpoch info
2615  * are included. */
clusterSendUpdate(clusterLink * link,clusterNode * node)2616 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2617     unsigned char buf[sizeof(clusterMsg)];
2618     clusterMsg *hdr = (clusterMsg*) buf;
2619 
2620     if (link == NULL) return;
2621     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2622     memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2623     hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2624     memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2625     clusterSendMessage(link,buf,ntohl(hdr->totlen));
2626 }
2627 
2628 /* Send a MODULE message.
2629  *
2630  * If link is NULL, then the message is broadcasted to the whole cluster. */
clusterSendModule(clusterLink * link,uint64_t module_id,uint8_t type,unsigned char * payload,uint32_t len)2631 void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
2632                        unsigned char *payload, uint32_t len) {
2633     unsigned char buf[sizeof(clusterMsg)], *heapbuf;
2634     clusterMsg *hdr = (clusterMsg*) buf;
2635     uint32_t totlen;
2636 
2637     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
2638     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2639     totlen += sizeof(clusterMsgModule) - 3 + len;
2640 
2641     hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
2642     hdr->data.module.msg.type = type;
2643     hdr->data.module.msg.len = htonl(len);
2644     hdr->totlen = htonl(totlen);
2645 
2646     /* Try to use the local buffer if possible */
2647     if (totlen < sizeof(buf)) {
2648         heapbuf = buf;
2649     } else {
2650         heapbuf = zmalloc(totlen);
2651         memcpy(heapbuf,hdr,sizeof(*hdr));
2652         hdr = (clusterMsg*) heapbuf;
2653     }
2654     memcpy(hdr->data.module.msg.bulk_data,payload,len);
2655 
2656     if (link)
2657         clusterSendMessage(link,heapbuf,totlen);
2658     else
2659         clusterBroadcastMessage(heapbuf,totlen);
2660 
2661     if (heapbuf != buf) zfree(heapbuf);
2662 }
2663 
2664 /* This function gets a cluster node ID string as target, the same way the nodes
2665  * addresses are represented in the modules side, resolves the node, and sends
2666  * the message. If the target is NULL the message is broadcasted.
2667  *
2668  * The function returns C_OK if the target is valid, otherwise C_ERR is
2669  * returned. */
clusterSendModuleMessageToTarget(const char * target,uint64_t module_id,uint8_t type,unsigned char * payload,uint32_t len)2670 int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
2671     clusterNode *node = NULL;
2672 
2673     if (target != NULL) {
2674         node = clusterLookupNode(target);
2675         if (node == NULL || node->link == NULL) return C_ERR;
2676     }
2677 
2678     clusterSendModule(target ? node->link : NULL,
2679                       module_id, type, payload, len);
2680     return C_OK;
2681 }
2682 
/* -----------------------------------------------------------------------------
 * CLUSTER Pub/Sub support
 *
 * For now we do very little, just propagating PUBLISH messages across the whole
 * cluster. In the future we'll try to get smarter and avoid propagating those
 * messages to hosts without receivers for a given channel.
 * -------------------------------------------------------------------------- */
/* Propagate a PUBLISH of 'message' on 'channel' to the whole cluster. */
void clusterPropagatePublish(robj *channel, robj *message) {
    /* A NULL link asks clusterSendPublish() to broadcast the message. */
    clusterLink *everyone = NULL;

    clusterSendPublish(everyone, channel, message);
}
2693 
2694 /* -----------------------------------------------------------------------------
2695  * SLAVE node specific functions
2696  * -------------------------------------------------------------------------- */
2697 
2698 /* This function sends a FAILOVE_AUTH_REQUEST message to every node in order to
2699  * see if there is the quorum for this slave instance to failover its failing
2700  * master.
2701  *
2702  * Note that we send the failover request to everybody, master and slave nodes,
2703  * but only the masters are supposed to reply to our query. */
clusterRequestFailoverAuth(void)2704 void clusterRequestFailoverAuth(void) {
2705     unsigned char buf[sizeof(clusterMsg)];
2706     clusterMsg *hdr = (clusterMsg*) buf;
2707     uint32_t totlen;
2708 
2709     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2710     /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2711      * in the header to communicate the nodes receiving the message that
2712      * they should authorized the failover even if the master is working. */
2713     if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2714     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2715     hdr->totlen = htonl(totlen);
2716     clusterBroadcastMessage(buf,totlen);
2717 }
2718 
2719 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
clusterSendFailoverAuth(clusterNode * node)2720 void clusterSendFailoverAuth(clusterNode *node) {
2721     unsigned char buf[sizeof(clusterMsg)];
2722     clusterMsg *hdr = (clusterMsg*) buf;
2723     uint32_t totlen;
2724 
2725     if (!node->link) return;
2726     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2727     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2728     hdr->totlen = htonl(totlen);
2729     clusterSendMessage(node->link,buf,totlen);
2730 }
2731 
2732 /* Send a MFSTART message to the specified node. */
clusterSendMFStart(clusterNode * node)2733 void clusterSendMFStart(clusterNode *node) {
2734     unsigned char buf[sizeof(clusterMsg)];
2735     clusterMsg *hdr = (clusterMsg*) buf;
2736     uint32_t totlen;
2737 
2738     if (!node->link) return;
2739     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2740     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2741     hdr->totlen = htonl(totlen);
2742     clusterSendMessage(node->link,buf,totlen);
2743 }
2744 
2745 /* Vote for the node asking for our vote if there are the conditions. */
/* Vote for the node asking for our vote if there are the conditions.
 *
 * 'node' is the sender of the failover authorization request 'request'.
 * Every check below must pass for the vote to be granted; each denial is
 * logged with its reason and the function returns without replying.
 *
 * When the vote is granted the current epoch is stored in lastVoteEpoch
 * (so at most one vote is given per epoch), a config save + fsync is
 * scheduled before sleeping, and the ack is sent with
 * clusterSendFailoverAuth(). */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    unsigned char *claimed_slots = request->myslots;
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    mstime_t vote_cooldown = server.cluster_node_timeout * 2;
    mstime_t since_last_vote;
    int j;

    /* If we are not a master serving at least 1 slot, we don't have the
     * right to vote, as the cluster size in Redis Cluster is the number
     * of masters serving at least one slot, and the quorum is
     * (cluster size / 2) + 1. */
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    /* Request epoch must be >= our currentEpoch.
     * Note that it is impossible for it to actually be greater since
     * our currentEpoch was updated as a side effect of receiving this
     * request, if the request epoch was greater. */
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* I already voted for this epoch? Return ASAP. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* Node must be a slave and its master down.
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }

    /* We must not have voted for a slave of this master during the last
     * two node timeouts. This is not strictly needed for correctness
     * of the algorithm but makes the base case more linear.
     *
     * The clock is sampled once, so that the remaining time reported in
     * the log is consistent with the comparison that denied the vote. */
    since_last_vote = mstime() - master->voted_time;
    if (since_last_vote < vote_cooldown) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) (vote_cooldown - since_last_vote));
        return;
    }

    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* If we reached this point we found a slot that in our current slots
         * is served by a master with a greater configEpoch than the one claimed
         * by the slave requesting our vote. Refuse to vote for this slave. */
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    /* We can vote for this slave. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    /* A forced ack (manual failover) does not start the per-master vote
     * cooldown: in that case the master may well be up, and arming the
     * timer would make us deny votes to a genuine automatic failover of
     * the same master for the next two node timeouts. */
    if (!force_ack) master->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}
2848 
2849 /* This function returns the "rank" of this instance, a slave, in the context
2850  * of its master-slaves ring. The rank of the slave is given by the number of
2851  * other slaves for the same master that have a better replication offset
2852  * compared to the local one (better means, greater, so they claim more data).
2853  *
2854  * A slave with rank 0 is the one with the greatest (most up to date)
2855  * replication offset, and so forth. Note that because how the rank is computed
2856  * multiple slaves may have the same rank, in case they have the same offset.
2857  *
2858  * The slave rank is used to add a delay to start an election in order to
2859  * get voted and replace a failing master. Slaves with better replication
2860  * offsets are more likely to win. */
clusterGetSlaveRank(void)2861 int clusterGetSlaveRank(void) {
2862     long long myoffset;
2863     int j, rank = 0;
2864     clusterNode *master;
2865 
2866     serverAssert(nodeIsSlave(myself));
2867     master = myself->slaveof;
2868     if (master == NULL) return 0; /* Never called by slaves without master. */
2869 
2870     myoffset = replicationGetSlaveOffset();
2871     for (j = 0; j < master->numslaves; j++)
2872         if (master->slaves[j] != myself &&
2873             !nodeCantFailover(master->slaves[j]) &&
2874             master->slaves[j]->repl_offset > myoffset) rank++;
2875     return rank;
2876 }
2877 
2878 /* This function is called by clusterHandleSlaveFailover() in order to
2879  * let the slave log why it is not able to failover. Sometimes there are
2880  * not the conditions, but since the failover function is called again and
2881  * again, we can't log the same things continuously.
2882  *
2883  * This function works by logging only if a given set of conditions are
2884  * true:
2885  *
2886  * 1) The reason for which the failover can't be initiated changed.
2887  *    The reasons also include a NONE reason we reset the state to
2888  *    when the slave finds that its master is fine (no FAIL flag).
2889  * 2) Also, the log is emitted again if the master is still down and
2890  *    the reason for not failing over is still the same, but more than
2891  *    CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
2892  * 3) Finally, the function only logs if the slave is down for more than
2893  *    five seconds + NODE_TIMEOUT. This way nothing is logged when a
2894  *    failover starts in a reasonable time.
2895  *
2896  * The function is called with the reason why the slave can't failover
2897  * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
2898  *
2899  * The function is guaranteed to be called only if 'myself' is a slave. */
clusterLogCantFailover(int reason)2900 void clusterLogCantFailover(int reason) {
2901     char *msg;
2902     static time_t lastlog_time = 0;
2903     mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
2904 
2905     /* Don't log if we have the same reason for some time. */
2906     if (reason == server.cluster->cant_failover_reason &&
2907         time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
2908         return;
2909 
2910     server.cluster->cant_failover_reason = reason;
2911 
2912     /* We also don't emit any log if the master failed no long ago, the
2913      * goal of this function is to log slaves in a stalled condition for
2914      * a long time. */
2915     if (myself->slaveof &&
2916         nodeFailed(myself->slaveof) &&
2917         (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
2918 
2919     switch(reason) {
2920     case CLUSTER_CANT_FAILOVER_DATA_AGE:
2921         msg = "Disconnected from master for longer than allowed. "
2922               "Please check the 'cluster-replica-validity-factor' configuration "
2923               "option.";
2924         break;
2925     case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
2926         msg = "Waiting the delay before I can start a new failover.";
2927         break;
2928     case CLUSTER_CANT_FAILOVER_EXPIRED:
2929         msg = "Failover attempt expired.";
2930         break;
2931     case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
2932         msg = "Waiting for votes, but majority still not reached.";
2933         break;
2934     default:
2935         msg = "Unknown reason code.";
2936         break;
2937     }
2938     lastlog_time = time(NULL);
2939     serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
2940 }
2941 
2942 /* This function implements the final part of automatic and manual failovers,
2943  * where the slave grabs its master's hash slots, and propagates the new
2944  * configuration.
2945  *
2946  * Note that it's up to the caller to be sure that the node got a new
2947  * configuration epoch already. */
clusterFailoverReplaceYourMaster(void)2948 void clusterFailoverReplaceYourMaster(void) {
2949     int j;
2950     clusterNode *oldmaster = myself->slaveof;
2951 
2952     if (nodeIsMaster(myself) || oldmaster == NULL) return;
2953 
2954     /* 1) Turn this node into a master. */
2955     clusterSetNodeAsMaster(myself);
2956     replicationUnsetMaster();
2957 
2958     /* 2) Claim all the slots assigned to our master. */
2959     for (j = 0; j < CLUSTER_SLOTS; j++) {
2960         if (clusterNodeGetSlotBit(oldmaster,j)) {
2961             clusterDelSlot(j);
2962             clusterAddSlot(myself,j);
2963         }
2964     }
2965 
2966     /* 3) Update state and save config. */
2967     clusterUpdateState();
2968     clusterSaveConfigOrDie(1);
2969 
2970     /* 4) Pong all the other nodes so that they can update the state
2971      *    accordingly and detect that we switched to master role. */
2972     clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
2973 
2974     /* 5) If there was a manual failover in progress, clear the state. */
2975     resetManualFailover();
2976 }
2977 
2978 /* This function is called if we are a slave node and our master serving
2979  * a non-zero amount of hash slots is in FAIL state.
2980  *
2981  * The gaol of this function is:
2982  * 1) To check if we are able to perform a failover, is our data updated?
2983  * 2) Try to get elected by masters.
2984  * 3) Perform the failover informing all the other nodes.
2985  */
clusterHandleSlaveFailover(void)2986 void clusterHandleSlaveFailover(void) {
2987     mstime_t data_age;
2988     mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
2989     int needed_quorum = (server.cluster->size / 2) + 1;
2990     int manual_failover = server.cluster->mf_end != 0 &&
2991                           server.cluster->mf_can_start;
2992     mstime_t auth_timeout, auth_retry_time;
2993 
2994     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
2995 
2996     /* Compute the failover timeout (the max time we have to send votes
2997      * and wait for replies), and the failover retry time (the time to wait
2998      * before trying to get voted again).
2999      *
3000      * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
3001      * Retry is two times the Timeout.
3002      */
3003     auth_timeout = server.cluster_node_timeout*2;
3004     if (auth_timeout < 2000) auth_timeout = 2000;
3005     auth_retry_time = auth_timeout*2;
3006 
3007     /* Pre conditions to run the function, that must be met both in case
3008      * of an automatic or manual failover:
3009      * 1) We are a slave.
3010      * 2) Our master is flagged as FAIL, or this is a manual failover.
3011      * 3) We don't have the no failover configuration set, and this is
3012      *    not a manual failover.
3013      * 4) It is serving slots. */
3014     if (nodeIsMaster(myself) ||
3015         myself->slaveof == NULL ||
3016         (!nodeFailed(myself->slaveof) && !manual_failover) ||
3017         (server.cluster_slave_no_failover && !manual_failover) ||
3018         myself->slaveof->numslots == 0)
3019     {
3020         /* There are no reasons to failover, so we set the reason why we
3021          * are returning without failing over to NONE. */
3022         server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
3023         return;
3024     }
3025 
3026     /* Set data_age to the number of seconds we are disconnected from
3027      * the master. */
3028     if (server.repl_state == REPL_STATE_CONNECTED) {
3029         data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
3030                    * 1000;
3031     } else {
3032         data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
3033     }
3034 
3035     /* Remove the node timeout from the data age as it is fine that we are
3036      * disconnected from our master at least for the time it was down to be
3037      * flagged as FAIL, that's the baseline. */
3038     if (data_age > server.cluster_node_timeout)
3039         data_age -= server.cluster_node_timeout;
3040 
3041     /* Check if our data is recent enough according to the slave validity
3042      * factor configured by the user.
3043      *
3044      * Check bypassed for manual failovers. */
3045     if (server.cluster_slave_validity_factor &&
3046         data_age >
3047         (((mstime_t)server.repl_ping_slave_period * 1000) +
3048          (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
3049     {
3050         if (!manual_failover) {
3051             clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
3052             return;
3053         }
3054     }
3055 
3056     /* If the previous failover attempt timedout and the retry time has
3057      * elapsed, we can setup a new one. */
3058     if (auth_age > auth_retry_time) {
3059         server.cluster->failover_auth_time = mstime() +
3060             500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
3061             random() % 500; /* Random delay between 0 and 500 milliseconds. */
3062         server.cluster->failover_auth_count = 0;
3063         server.cluster->failover_auth_sent = 0;
3064         server.cluster->failover_auth_rank = clusterGetSlaveRank();
3065         /* We add another delay that is proportional to the slave rank.
3066          * Specifically 1 second * rank. This way slaves that have a probably
3067          * less updated replication offset, are penalized. */
3068         server.cluster->failover_auth_time +=
3069             server.cluster->failover_auth_rank * 1000;
3070         /* However if this is a manual failover, no delay is needed. */
3071         if (server.cluster->mf_end) {
3072             server.cluster->failover_auth_time = mstime();
3073             server.cluster->failover_auth_rank = 0;
3074 	    clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
3075         }
3076         serverLog(LL_WARNING,
3077             "Start of election delayed for %lld milliseconds "
3078             "(rank #%d, offset %lld).",
3079             server.cluster->failover_auth_time - mstime(),
3080             server.cluster->failover_auth_rank,
3081             replicationGetSlaveOffset());
3082         /* Now that we have a scheduled election, broadcast our offset
3083          * to all the other slaves so that they'll updated their offsets
3084          * if our offset is better. */
3085         clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
3086         return;
3087     }
3088 
3089     /* It is possible that we received more updated offsets from other
3090      * slaves for the same master since we computed our election delay.
3091      * Update the delay if our rank changed.
3092      *
3093      * Not performed if this is a manual failover. */
3094     if (server.cluster->failover_auth_sent == 0 &&
3095         server.cluster->mf_end == 0)
3096     {
3097         int newrank = clusterGetSlaveRank();
3098         if (newrank > server.cluster->failover_auth_rank) {
3099             long long added_delay =
3100                 (newrank - server.cluster->failover_auth_rank) * 1000;
3101             server.cluster->failover_auth_time += added_delay;
3102             server.cluster->failover_auth_rank = newrank;
3103             serverLog(LL_WARNING,
3104                 "Replica rank updated to #%d, added %lld milliseconds of delay.",
3105                 newrank, added_delay);
3106         }
3107     }
3108 
3109     /* Return ASAP if we can't still start the election. */
3110     if (mstime() < server.cluster->failover_auth_time) {
3111         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
3112         return;
3113     }
3114 
3115     /* Return ASAP if the election is too old to be valid. */
3116     if (auth_age > auth_timeout) {
3117         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
3118         return;
3119     }
3120 
3121     /* Ask for votes if needed. */
3122     if (server.cluster->failover_auth_sent == 0) {
3123         server.cluster->currentEpoch++;
3124         server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
3125         serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
3126             (unsigned long long) server.cluster->currentEpoch);
3127         clusterRequestFailoverAuth();
3128         server.cluster->failover_auth_sent = 1;
3129         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3130                              CLUSTER_TODO_UPDATE_STATE|
3131                              CLUSTER_TODO_FSYNC_CONFIG);
3132         return; /* Wait for replies. */
3133     }
3134 
3135     /* Check if we reached the quorum. */
3136     if (server.cluster->failover_auth_count >= needed_quorum) {
3137         /* We have the quorum, we can finally failover the master. */
3138 
3139         serverLog(LL_WARNING,
3140             "Failover election won: I'm the new master.");
3141 
3142         /* Update my configEpoch to the epoch of the election. */
3143         if (myself->configEpoch < server.cluster->failover_auth_epoch) {
3144             myself->configEpoch = server.cluster->failover_auth_epoch;
3145             serverLog(LL_WARNING,
3146                 "configEpoch set to %llu after successful failover",
3147                 (unsigned long long) myself->configEpoch);
3148         }
3149 
3150         /* Take responsibility for the cluster slots. */
3151         clusterFailoverReplaceYourMaster();
3152     } else {
3153         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
3154     }
3155 }
3156 
/* -----------------------------------------------------------------------------
 * CLUSTER slave migration
 *
 * Slave migration is the process that allows a slave of a master that is
 * already covered by at least another slave, to "migrate" to a master that
 * is orphaned, that is, left with no working slaves.
 * ------------------------------------------------------------------------- */
3164 
3165 /* This function is responsible to decide if this replica should be migrated
3166  * to a different (orphaned) master. It is called by the clusterCron() function
3167  * only if:
3168  *
3169  * 1) We are a slave node.
3170  * 2) It was detected that there is at least one orphaned master in
3171  *    the cluster.
3172  * 3) We are a slave of one of the masters with the greatest number of
3173  *    slaves.
3174  *
3175  * This checks are performed by the caller since it requires to iterate
3176  * the nodes anyway, so we spend time into clusterHandleSlaveMigration()
3177  * if definitely needed.
3178  *
3179  * The fuction is called with a pre-computed max_slaves, that is the max
3180  * number of working (not in FAIL state) slaves for a single master.
3181  *
3182  * Additional conditions for migration are examined inside the function.
3183  */
clusterHandleSlaveMigration(int max_slaves)3184 void clusterHandleSlaveMigration(int max_slaves) {
3185     int j, okslaves = 0;
3186     clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
3187     dictIterator *di;
3188     dictEntry *de;
3189 
3190     /* Step 1: Don't migrate if the cluster state is not ok. */
3191     if (server.cluster->state != CLUSTER_OK) return;
3192 
3193     /* Step 2: Don't migrate if my master will not be left with at least
3194      *         'migration-barrier' slaves after my migration. */
3195     if (mymaster == NULL) return;
3196     for (j = 0; j < mymaster->numslaves; j++)
3197         if (!nodeFailed(mymaster->slaves[j]) &&
3198             !nodeTimedOut(mymaster->slaves[j])) okslaves++;
3199     if (okslaves <= server.cluster_migration_barrier) return;
3200 
3201     /* Step 3: Identify a candidate for migration, and check if among the
3202      * masters with the greatest number of ok slaves, I'm the one with the
3203      * smallest node ID (the "candidate slave").
3204      *
3205      * Note: this means that eventually a replica migration will occur
3206      * since slaves that are reachable again always have their FAIL flag
3207      * cleared, so eventually there must be a candidate. At the same time
3208      * this does not mean that there are no race conditions possible (two
3209      * slaves migrating at the same time), but this is unlikely to
3210      * happen, and harmless when happens. */
3211     candidate = myself;
3212     di = dictGetSafeIterator(server.cluster->nodes);
3213     while((de = dictNext(di)) != NULL) {
3214         clusterNode *node = dictGetVal(de);
3215         int okslaves = 0, is_orphaned = 1;
3216 
3217         /* We want to migrate only if this master is working, orphaned, and
3218          * used to have slaves or if failed over a master that had slaves
3219          * (MIGRATE_TO flag). This way we only migrate to instances that were
3220          * supposed to have replicas. */
3221         if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
3222         if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
3223 
3224         /* Check number of working slaves. */
3225         if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
3226         if (okslaves > 0) is_orphaned = 0;
3227 
3228         if (is_orphaned) {
3229             if (!target && node->numslots > 0) target = node;
3230 
3231             /* Track the starting time of the orphaned condition for this
3232              * master. */
3233             if (!node->orphaned_time) node->orphaned_time = mstime();
3234         } else {
3235             node->orphaned_time = 0;
3236         }
3237 
3238         /* Check if I'm the slave candidate for the migration: attached
3239          * to a master with the maximum number of slaves and with the smallest
3240          * node ID. */
3241         if (okslaves == max_slaves) {
3242             for (j = 0; j < node->numslaves; j++) {
3243                 if (memcmp(node->slaves[j]->name,
3244                            candidate->name,
3245                            CLUSTER_NAMELEN) < 0)
3246                 {
3247                     candidate = node->slaves[j];
3248                 }
3249             }
3250         }
3251     }
3252     dictReleaseIterator(di);
3253 
3254     /* Step 4: perform the migration if there is a target, and if I'm the
3255      * candidate, but only if the master is continuously orphaned for a
3256      * couple of seconds, so that during failovers, we give some time to
3257      * the natural slaves of this instance to advertise their switch from
3258      * the old master to the new one. */
3259     if (target && candidate == myself &&
3260         (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
3261        !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3262     {
3263         serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
3264             target->name);
3265         clusterSetMaster(target);
3266     }
3267 }
3268 
/* -----------------------------------------------------------------------------
 * CLUSTER manual failover
 *
 * These are the important steps performed by slaves during a manual failover:
 * 1) The user sends a CLUSTER FAILOVER command. The failover state is
 *    initialized setting mf_end to the millisecond unix time at which we'll
 *    abort the attempt.
 * 2) Slave sends a MFSTART message to the master requesting to pause clients
 *    for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
 *    When master is paused for manual failover, it also starts to flag
 *    packets with CLUSTERMSG_FLAG0_PAUSED.
 * 3) Slave waits for master to send its replication offset flagged as PAUSED.
 * 4) If slave received the offset from the master, and its offset matches,
 *    mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
 *    the failover as usually, with the difference that the vote request
 *    will be modified to force masters to vote for a slave that has a
 *    working master.
 *
 * From the point of view of the master things are simpler: when a
 * PAUSE_CLIENTS packet is received the master sets mf_end as well and
 * the sender in mf_slave. During the time limit for the manual failover
 * the master will just send PINGs more often to this slave, flagged with
 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
 * a packet from the master with this flag set.
 *
 * The goal of the manual failover is to perform a fast failover without
 * data loss due to the asynchronous master-slave replication.
 * -------------------------------------------------------------------------- */
3297 
3298 /* Reset the manual failover state. This works for both masters and slavesa
3299  * as all the state about manual failover is cleared.
3300  *
3301  * The function can be used both to initialize the manual failover state at
3302  * startup or to abort a manual failover in progress. */
resetManualFailover(void)3303 void resetManualFailover(void) {
3304     if (server.cluster->mf_end && clientsArePaused()) {
3305         server.clients_pause_end_time = 0;
3306         clientsArePaused(); /* Just use the side effect of the function. */
3307     }
3308     server.cluster->mf_end = 0; /* No manual failover in progress. */
3309     server.cluster->mf_can_start = 0;
3310     server.cluster->mf_slave = NULL;
3311     server.cluster->mf_master_offset = 0;
3312 }
3313 
3314 /* If a manual failover timed out, abort it. */
manualFailoverCheckTimeout(void)3315 void manualFailoverCheckTimeout(void) {
3316     if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3317         serverLog(LL_WARNING,"Manual failover timed out.");
3318         resetManualFailover();
3319     }
3320 }
3321 
3322 /* This function is called from the cluster cron function in order to go
3323  * forward with a manual failover state machine. */
clusterHandleManualFailover(void)3324 void clusterHandleManualFailover(void) {
3325     /* Return ASAP if no manual failover is in progress. */
3326     if (server.cluster->mf_end == 0) return;
3327 
3328     /* If mf_can_start is non-zero, the failover was already triggered so the
3329      * next steps are performed by clusterHandleSlaveFailover(). */
3330     if (server.cluster->mf_can_start) return;
3331 
3332     if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
3333 
3334     if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3335         /* Our replication offset matches the master replication offset
3336          * announced after clients were paused. We can start the failover. */
3337         server.cluster->mf_can_start = 1;
3338         serverLog(LL_WARNING,
3339             "All master replication stream processed, "
3340             "manual failover can start.");
3341     }
3342 }
3343 
3344 /* -----------------------------------------------------------------------------
3345  * CLUSTER cron job
3346  * -------------------------------------------------------------------------- */
3347 
3348 /* This is executed 10 times every second */
clusterCron(void)3349 void clusterCron(void) {
3350     dictIterator *di;
3351     dictEntry *de;
3352     int update_state = 0;
3353     int orphaned_masters; /* How many masters there are without ok slaves. */
3354     int max_slaves; /* Max number of ok slaves for a single master. */
3355     int this_slaves; /* Number of ok slaves for our master (if we are slave). */
3356     mstime_t min_pong = 0, now = mstime();
3357     clusterNode *min_pong_node = NULL;
3358     static unsigned long long iteration = 0;
3359     mstime_t handshake_timeout;
3360 
3361     iteration++; /* Number of times this function was called so far. */
3362 
3363     /* We want to take myself->ip in sync with the cluster-announce-ip option.
3364      * The option can be set at runtime via CONFIG SET, so we periodically check
3365      * if the option changed to reflect this into myself->ip. */
3366     {
3367         static char *prev_ip = NULL;
3368         char *curr_ip = server.cluster_announce_ip;
3369         int changed = 0;
3370 
3371         if (prev_ip == NULL && curr_ip != NULL) changed = 1;
3372         else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
3373         else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;
3374 
3375         if (changed) {
3376             if (prev_ip) zfree(prev_ip);
3377             prev_ip = curr_ip;
3378 
3379             if (curr_ip) {
3380                 /* We always take a copy of the previous IP address, by
3381                  * duplicating the string. This way later we can check if
3382                  * the address really changed. */
3383                 prev_ip = zstrdup(prev_ip);
3384                 strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
3385                 myself->ip[NET_IP_STR_LEN-1] = '\0';
3386             } else {
3387                 myself->ip[0] = '\0'; /* Force autodetection. */
3388             }
3389         }
3390     }
3391 
3392     /* The handshake timeout is the time after which a handshake node that was
3393      * not turned into a normal node is removed from the nodes. Usually it is
3394      * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
3395      * the value of 1 second. */
3396     handshake_timeout = server.cluster_node_timeout;
3397     if (handshake_timeout < 1000) handshake_timeout = 1000;
3398 
3399     /* Update myself flags. */
3400     clusterUpdateMyselfFlags();
3401 
3402     /* Check if we have disconnected nodes and re-establish the connection.
3403      * Also update a few stats while we are here, that can be used to make
3404      * better decisions in other part of the code. */
3405     di = dictGetSafeIterator(server.cluster->nodes);
3406     server.cluster->stats_pfail_nodes = 0;
3407     while((de = dictNext(di)) != NULL) {
3408         clusterNode *node = dictGetVal(de);
3409 
3410         /* Not interested in reconnecting the link with myself or nodes
3411          * for which we have no address. */
3412         if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
3413 
3414         if (node->flags & CLUSTER_NODE_PFAIL)
3415             server.cluster->stats_pfail_nodes++;
3416 
3417         /* A Node in HANDSHAKE state has a limited lifespan equal to the
3418          * configured node timeout. */
3419         if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
3420             clusterDelNode(node);
3421             continue;
3422         }
3423 
3424         if (node->link == NULL) {
3425             int fd;
3426             mstime_t old_ping_sent;
3427             clusterLink *link;
3428 
3429             fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
3430                 node->cport, NET_FIRST_BIND_ADDR);
3431             if (fd == -1) {
3432                 /* We got a synchronous error from connect before
3433                  * clusterSendPing() had a chance to be called.
3434                  * If node->ping_sent is zero, failure detection can't work,
3435                  * so we claim we actually sent a ping now (that will
3436                  * be really sent as soon as the link is obtained). */
3437                 if (node->ping_sent == 0) node->ping_sent = mstime();
3438                 serverLog(LL_DEBUG, "Unable to connect to "
3439                     "Cluster Node [%s]:%d -> %s", node->ip,
3440                     node->cport, server.neterr);
3441                 continue;
3442             }
3443             link = createClusterLink(node);
3444             link->fd = fd;
3445             node->link = link;
3446             aeCreateFileEvent(server.el,link->fd,AE_READABLE,
3447                     clusterReadHandler,link);
3448             /* Queue a PING in the new connection ASAP: this is crucial
3449              * to avoid false positives in failure detection.
3450              *
3451              * If the node is flagged as MEET, we send a MEET message instead
3452              * of a PING one, to force the receiver to add us in its node
3453              * table. */
3454             old_ping_sent = node->ping_sent;
3455             clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
3456                     CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
3457             if (old_ping_sent) {
3458                 /* If there was an active ping before the link was
3459                  * disconnected, we want to restore the ping time, otherwise
3460                  * replaced by the clusterSendPing() call. */
3461                 node->ping_sent = old_ping_sent;
3462             }
3463             /* We can clear the flag after the first packet is sent.
3464              * If we'll never receive a PONG, we'll never send new packets
3465              * to this node. Instead after the PONG is received and we
3466              * are no longer in meet/handshake status, we want to send
3467              * normal PING packets. */
3468             node->flags &= ~CLUSTER_NODE_MEET;
3469 
3470             serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
3471                     node->name, node->ip, node->cport);
3472         }
3473     }
3474     dictReleaseIterator(di);
3475 
3476     /* Ping some random node 1 time every 10 iterations, so that we usually ping
3477      * one random node every second. */
3478     if (!(iteration % 10)) {
3479         int j;
3480 
3481         /* Check a few random nodes and ping the one with the oldest
3482          * pong_received time. */
3483         for (j = 0; j < 5; j++) {
3484             de = dictGetRandomKey(server.cluster->nodes);
3485             clusterNode *this = dictGetVal(de);
3486 
3487             /* Don't ping nodes disconnected or with a ping currently active. */
3488             if (this->link == NULL || this->ping_sent != 0) continue;
3489             if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3490                 continue;
3491             if (min_pong_node == NULL || min_pong > this->pong_received) {
3492                 min_pong_node = this;
3493                 min_pong = this->pong_received;
3494             }
3495         }
3496         if (min_pong_node) {
3497             serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
3498             clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
3499         }
3500     }
3501 
3502     /* Iterate nodes to check if we need to flag something as failing.
3503      * This loop is also responsible to:
3504      * 1) Check if there are orphaned masters (masters without non failing
3505      *    slaves).
3506      * 2) Count the max number of non failing slaves for a single master.
3507      * 3) Count the number of slaves for our master, if we are a slave. */
3508     orphaned_masters = 0;
3509     max_slaves = 0;
3510     this_slaves = 0;
3511     di = dictGetSafeIterator(server.cluster->nodes);
3512     while((de = dictNext(di)) != NULL) {
3513         clusterNode *node = dictGetVal(de);
3514         now = mstime(); /* Use an updated time at every iteration. */
3515 
3516         if (node->flags &
3517             (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
3518                 continue;
3519 
3520         /* Orphaned master check, useful only if the current instance
3521          * is a slave that may migrate to another master. */
3522         if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
3523             int okslaves = clusterCountNonFailingSlaves(node);
3524 
3525             /* A master is orphaned if it is serving a non-zero number of
3526              * slots, have no working slaves, but used to have at least one
3527              * slave, or failed over a master that used to have slaves. */
3528             if (okslaves == 0 && node->numslots > 0 &&
3529                 node->flags & CLUSTER_NODE_MIGRATE_TO)
3530             {
3531                 orphaned_masters++;
3532             }
3533             if (okslaves > max_slaves) max_slaves = okslaves;
3534             if (nodeIsSlave(myself) && myself->slaveof == node)
3535                 this_slaves = okslaves;
3536         }
3537 
3538         /* If we are not receiving any data for more than half the cluster
3539          * timeout, reconnect the link: maybe there is a connection
3540          * issue even if the node is alive. */
3541         if (node->link && /* is connected */
3542             now - node->link->ctime >
3543             server.cluster_node_timeout && /* was not already reconnected */
3544             node->ping_sent && /* we already sent a ping */
3545             node->pong_received < node->ping_sent && /* still waiting pong */
3546             /* and we are waiting for the pong more than timeout/2 */
3547             now - node->ping_sent > server.cluster_node_timeout/2 &&
3548             /* and in such interval we are not seeing any traffic at all. */
3549             now - node->data_received > server.cluster_node_timeout/2)
3550         {
3551             /* Disconnect the link, it will be reconnected automatically. */
3552             freeClusterLink(node->link);
3553         }
3554 
3555         /* If we have currently no active ping in this instance, and the
3556          * received PONG is older than half the cluster timeout, send
3557          * a new ping now, to ensure all the nodes are pinged without
3558          * a too big delay. */
3559         if (node->link &&
3560             node->ping_sent == 0 &&
3561             (now - node->pong_received) > server.cluster_node_timeout/2)
3562         {
3563             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3564             continue;
3565         }
3566 
3567         /* If we are a master and one of the slaves requested a manual
3568          * failover, ping it continuously. */
3569         if (server.cluster->mf_end &&
3570             nodeIsMaster(myself) &&
3571             server.cluster->mf_slave == node &&
3572             node->link)
3573         {
3574             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3575             continue;
3576         }
3577 
3578         /* Check only if we have an active ping for this instance. */
3579         if (node->ping_sent == 0) continue;
3580 
3581         /* Compute the delay of the PONG. Note that if we already received
3582          * the PONG, then node->ping_sent is zero, so can't reach this
3583          * code at all. */
3584         mstime_t delay = now - node->ping_sent;
3585 
3586         /* We consider every incoming data as proof of liveness, since
3587          * our cluster bus link is also used for data: under heavy data
3588          * load pong delays are possible. */
3589         mstime_t data_delay = now - node->data_received;
3590         if (data_delay < delay) delay = data_delay;
3591 
3592         if (delay > server.cluster_node_timeout) {
3593             /* Timeout reached. Set the node as possibly failing if it is
3594              * not already in this state. */
3595             if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
3596                 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
3597                     node->name);
3598                 node->flags |= CLUSTER_NODE_PFAIL;
3599                 update_state = 1;
3600             }
3601         }
3602     }
3603     dictReleaseIterator(di);
3604 
3605     /* If we are a slave node but the replication is still turned off,
3606      * enable it if we know the address of our master and it appears to
3607      * be up. */
3608     if (nodeIsSlave(myself) &&
3609         server.masterhost == NULL &&
3610         myself->slaveof &&
3611         nodeHasAddr(myself->slaveof))
3612     {
3613         replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
3614     }
3615 
3616     /* Abourt a manual failover if the timeout is reached. */
3617     manualFailoverCheckTimeout();
3618 
3619     if (nodeIsSlave(myself)) {
3620         clusterHandleManualFailover();
3621         if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3622             clusterHandleSlaveFailover();
3623         /* If there are orphaned slaves, and we are a slave among the masters
3624          * with the max number of non-failing slaves, consider migrating to
3625          * the orphaned masters. Note that it does not make sense to try
3626          * a migration if there is no master with at least *two* working
3627          * slaves. */
3628         if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
3629             clusterHandleSlaveMigration(max_slaves);
3630     }
3631 
3632     if (update_state || server.cluster->state == CLUSTER_FAIL)
3633         clusterUpdateState();
3634 }
3635 
3636 /* This function is called before the event handler returns to sleep for
3637  * events. It is useful to perform operations that must be done ASAP in
3638  * reaction to events fired but that are not safe to perform inside event
3639  * handlers, or to perform potentially expansive tasks that we need to do
3640  * a single time before replying to clients. */
clusterBeforeSleep(void)3641 void clusterBeforeSleep(void) {
3642     /* Handle failover, this is needed when it is likely that there is already
3643      * the quorum from masters in order to react fast. */
3644     if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
3645         clusterHandleSlaveFailover();
3646 
3647     /* Update the cluster state. */
3648     if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
3649         clusterUpdateState();
3650 
3651     /* Save the config, possibly using fsync. */
3652     if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
3653         int fsync = server.cluster->todo_before_sleep &
3654                     CLUSTER_TODO_FSYNC_CONFIG;
3655         clusterSaveConfigOrDie(fsync);
3656     }
3657 
3658     /* Reset our flags (not strictly needed since every single function
3659      * called for flags set should be able to clear its flag). */
3660     server.cluster->todo_before_sleep = 0;
3661 }
3662 
clusterDoBeforeSleep(int flags)3663 void clusterDoBeforeSleep(int flags) {
3664     server.cluster->todo_before_sleep |= flags;
3665 }
3666 
3667 /* -----------------------------------------------------------------------------
3668  * Slots management
3669  * -------------------------------------------------------------------------- */
3670 
/* Report whether bit number 'pos' is set in the byte array 'bitmap'.
 * Bit 0 is the least significant bit of bitmap[0], bit 8 the least
 * significant bit of bitmap[1], and so forth.
 *
 * Returns 1 if the bit is set, 0 otherwise. */
int bitmapTestBit(unsigned char *bitmap, int pos) {
    off_t idx = pos / 8;          /* Byte holding the bit. */
    int mask = 1 << (pos & 7);    /* Bit selector inside that byte. */

    return (bitmap[idx] & mask) ? 1 : 0;
}
3678 
/* Turn on bit number 'pos' in the byte array 'bitmap'. All the other bits
 * are left untouched; setting an already set bit is a no-op. */
void bitmapSetBit(unsigned char *bitmap, int pos) {
    off_t idx = pos / 8;          /* Byte holding the bit. */
    int mask = 1 << (pos & 7);    /* Bit selector inside that byte. */

    bitmap[idx] = bitmap[idx] | mask;
}
3685 
/* Turn off bit number 'pos' in the byte array 'bitmap'. All the other bits
 * are left untouched; clearing an already clear bit is a no-op. */
void bitmapClearBit(unsigned char *bitmap, int pos) {
    off_t idx = pos / 8;              /* Byte holding the bit. */
    int keep = ~(1 << (pos & 7));     /* Every bit but the target one. */

    bitmap[idx] = bitmap[idx] & keep;
}
3692 
3693 /* Return non-zero if there is at least one master with slaves in the cluster.
3694  * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3695  * MIGRATE_TO flag the when a master gets the first slot. */
clusterMastersHaveSlaves(void)3696 int clusterMastersHaveSlaves(void) {
3697     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3698     dictEntry *de;
3699     int slaves = 0;
3700     while((de = dictNext(di)) != NULL) {
3701         clusterNode *node = dictGetVal(de);
3702 
3703         if (nodeIsSlave(node)) continue;
3704         slaves += node->numslaves;
3705     }
3706     dictReleaseIterator(di);
3707     return slaves != 0;
3708 }
3709 
3710 /* Set the slot bit and return the old value. */
clusterNodeSetSlotBit(clusterNode * n,int slot)3711 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3712     int old = bitmapTestBit(n->slots,slot);
3713     bitmapSetBit(n->slots,slot);
3714     if (!old) {
3715         n->numslots++;
3716         /* When a master gets its first slot, even if it has no slaves,
3717          * it gets flagged with MIGRATE_TO, that is, the master is a valid
3718          * target for replicas migration, if and only if at least one of
3719          * the other masters has slaves right now.
3720          *
3721          * Normally masters are valid targerts of replica migration if:
3722          * 1. The used to have slaves (but no longer have).
3723          * 2. They are slaves failing over a master that used to have slaves.
3724          *
3725          * However new masters with slots assigned are considered valid
3726          * migration tagets if the rest of the cluster is not a slave-less.
3727          *
3728          * See https://github.com/antirez/redis/issues/3043 for more info. */
3729         if (n->numslots == 1 && clusterMastersHaveSlaves())
3730             n->flags |= CLUSTER_NODE_MIGRATE_TO;
3731     }
3732     return old;
3733 }
3734 
3735 /* Clear the slot bit and return the old value. */
clusterNodeClearSlotBit(clusterNode * n,int slot)3736 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3737     int old = bitmapTestBit(n->slots,slot);
3738     bitmapClearBit(n->slots,slot);
3739     if (old) n->numslots--;
3740     return old;
3741 }
3742 
3743 /* Return the slot bit from the cluster node structure. */
clusterNodeGetSlotBit(clusterNode * n,int slot)3744 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3745     return bitmapTestBit(n->slots,slot);
3746 }
3747 
3748 /* Add the specified slot to the list of slots that node 'n' will
3749  * serve. Return C_OK if the operation ended with success.
3750  * If the slot is already assigned to another instance this is considered
3751  * an error and C_ERR is returned. */
clusterAddSlot(clusterNode * n,int slot)3752 int clusterAddSlot(clusterNode *n, int slot) {
3753     if (server.cluster->slots[slot]) return C_ERR;
3754     clusterNodeSetSlotBit(n,slot);
3755     server.cluster->slots[slot] = n;
3756     return C_OK;
3757 }
3758 
3759 /* Delete the specified slot marking it as unassigned.
3760  * Returns C_OK if the slot was assigned, otherwise if the slot was
3761  * already unassigned C_ERR is returned. */
clusterDelSlot(int slot)3762 int clusterDelSlot(int slot) {
3763     clusterNode *n = server.cluster->slots[slot];
3764 
3765     if (!n) return C_ERR;
3766     serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3767     server.cluster->slots[slot] = NULL;
3768     return C_OK;
3769 }
3770 
3771 /* Delete all the slots associated with the specified node.
3772  * The number of deleted slots is returned. */
clusterDelNodeSlots(clusterNode * node)3773 int clusterDelNodeSlots(clusterNode *node) {
3774     int deleted = 0, j;
3775 
3776     for (j = 0; j < CLUSTER_SLOTS; j++) {
3777         if (clusterNodeGetSlotBit(node,j)) {
3778             clusterDelSlot(j);
3779             deleted++;
3780         }
3781     }
3782     return deleted;
3783 }
3784 
3785 /* Clear the migrating / importing state for all the slots.
3786  * This is useful at initialization and when turning a master into slave. */
clusterCloseAllSlots(void)3787 void clusterCloseAllSlots(void) {
3788     memset(server.cluster->migrating_slots_to,0,
3789         sizeof(server.cluster->migrating_slots_to));
3790     memset(server.cluster->importing_slots_from,0,
3791         sizeof(server.cluster->importing_slots_from));
3792 }
3793 
3794 /* -----------------------------------------------------------------------------
3795  * Cluster state evaluation function
3796  * -------------------------------------------------------------------------- */
3797 
3798 /* The following are defines that are only used in the evaluation function
3799  * and are based on heuristics. Actually the main point about the rejoin and
3800  * writable delay is that they should be a few orders of magnitude larger
3801  * than the network latency. */
3802 #define CLUSTER_MAX_REJOIN_DELAY 5000
3803 #define CLUSTER_MIN_REJOIN_DELAY 500
3804 #define CLUSTER_WRITABLE_DELAY 2000
3805 
clusterUpdateState(void)3806 void clusterUpdateState(void) {
3807     int j, new_state;
3808     int reachable_masters = 0;
3809     static mstime_t among_minority_time;
3810     static mstime_t first_call_time = 0;
3811 
3812     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3813 
3814     /* If this is a master node, wait some time before turning the state
3815      * into OK, since it is not a good idea to rejoin the cluster as a writable
3816      * master, after a reboot, without giving the cluster a chance to
3817      * reconfigure this node. Note that the delay is calculated starting from
3818      * the first call to this function and not since the server start, in order
3819      * to don't count the DB loading time. */
3820     if (first_call_time == 0) first_call_time = mstime();
3821     if (nodeIsMaster(myself) &&
3822         server.cluster->state == CLUSTER_FAIL &&
3823         mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
3824 
3825     /* Start assuming the state is OK. We'll turn it into FAIL if there
3826      * are the right conditions. */
3827     new_state = CLUSTER_OK;
3828 
3829     /* Check if all the slots are covered. */
3830     if (server.cluster_require_full_coverage) {
3831         for (j = 0; j < CLUSTER_SLOTS; j++) {
3832             if (server.cluster->slots[j] == NULL ||
3833                 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3834             {
3835                 new_state = CLUSTER_FAIL;
3836                 break;
3837             }
3838         }
3839     }
3840 
3841     /* Compute the cluster size, that is the number of master nodes
3842      * serving at least a single slot.
3843      *
3844      * At the same time count the number of reachable masters having
3845      * at least one slot. */
3846     {
3847         dictIterator *di;
3848         dictEntry *de;
3849 
3850         server.cluster->size = 0;
3851         di = dictGetSafeIterator(server.cluster->nodes);
3852         while((de = dictNext(di)) != NULL) {
3853             clusterNode *node = dictGetVal(de);
3854 
3855             if (nodeIsMaster(node) && node->numslots) {
3856                 server.cluster->size++;
3857                 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3858                     reachable_masters++;
3859             }
3860         }
3861         dictReleaseIterator(di);
3862     }
3863 
3864     /* If we are in a minority partition, change the cluster state
3865      * to FAIL. */
3866     {
3867         int needed_quorum = (server.cluster->size / 2) + 1;
3868 
3869         if (reachable_masters < needed_quorum) {
3870             new_state = CLUSTER_FAIL;
3871             among_minority_time = mstime();
3872         }
3873     }
3874 
3875     /* Log a state change */
3876     if (new_state != server.cluster->state) {
3877         mstime_t rejoin_delay = server.cluster_node_timeout;
3878 
3879         /* If the instance is a master and was partitioned away with the
3880          * minority, don't let it accept queries for some time after the
3881          * partition heals, to make sure there is enough time to receive
3882          * a configuration update. */
3883         if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
3884             rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
3885         if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
3886             rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
3887 
3888         if (new_state == CLUSTER_OK &&
3889             nodeIsMaster(myself) &&
3890             mstime() - among_minority_time < rejoin_delay)
3891         {
3892             return;
3893         }
3894 
3895         /* Change the state and log the event. */
3896         serverLog(LL_WARNING,"Cluster state changed: %s",
3897             new_state == CLUSTER_OK ? "ok" : "fail");
3898         server.cluster->state = new_state;
3899     }
3900 }
3901 
3902 /* This function is called after the node startup in order to verify that data
3903  * loaded from disk is in agreement with the cluster configuration:
3904  *
3905  * 1) If we find keys about hash slots we have no responsibility for, the
3906  *    following happens:
3907  *    A) If no other node is in charge according to the current cluster
3908  *       configuration, we add these slots to our node.
3909  *    B) If according to our config other nodes are already in charge for
3910  *       this lots, we set the slots as IMPORTING from our point of view
3911  *       in order to justify we have those slots, and in order to make
3912  *       redis-trib aware of the issue, so that it can try to fix it.
3913  * 2) If we find data in a DB different than DB0 we return C_ERR to
3914  *    signal the caller it should quit the server with an error message
3915  *    or take other actions.
3916  *
3917  * The function always returns C_OK even if it will try to correct
3918  * the error described in "1". However if data is found in DB different
3919  * from DB0, C_ERR is returned.
3920  *
3921  * The function also uses the logging facility in order to warn the user
3922  * about desynchronizations between the data we have in memory and the
3923  * cluster configuration. */
verifyClusterConfigWithData(void)3924 int verifyClusterConfigWithData(void) {
3925     int j;
3926     int update_config = 0;
3927 
3928     /* Return ASAP if a module disabled cluster redirections. In that case
3929      * every master can store keys about every possible hash slot. */
3930     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
3931         return C_OK;
3932 
3933     /* If this node is a slave, don't perform the check at all as we
3934      * completely depend on the replication stream. */
3935     if (nodeIsSlave(myself)) return C_OK;
3936 
3937     /* Make sure we only have keys in DB0. */
3938     for (j = 1; j < server.dbnum; j++) {
3939         if (dictSize(server.db[j].dict)) return C_ERR;
3940     }
3941 
3942     /* Check that all the slots we see populated memory have a corresponding
3943      * entry in the cluster table. Otherwise fix the table. */
3944     for (j = 0; j < CLUSTER_SLOTS; j++) {
3945         if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
3946         /* Check if we are assigned to this slot or if we are importing it.
3947          * In both cases check the next slot as the configuration makes
3948          * sense. */
3949         if (server.cluster->slots[j] == myself ||
3950             server.cluster->importing_slots_from[j] != NULL) continue;
3951 
3952         /* If we are here data and cluster config don't agree, and we have
3953          * slot 'j' populated even if we are not importing it, nor we are
3954          * assigned to this slot. Fix this condition. */
3955 
3956         update_config++;
3957         /* Case A: slot is unassigned. Take responsibility for it. */
3958         if (server.cluster->slots[j] == NULL) {
3959             serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
3960                                     "Taking responsibility for it.",j);
3961             clusterAddSlot(myself,j);
3962         } else {
3963             serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
3964                                     "assigned to another node. "
3965                                     "Setting it to importing state.",j);
3966             server.cluster->importing_slots_from[j] = server.cluster->slots[j];
3967         }
3968     }
3969     if (update_config) clusterSaveConfigOrDie(1);
3970     return C_OK;
3971 }
3972 
3973 /* -----------------------------------------------------------------------------
3974  * SLAVE nodes handling
3975  * -------------------------------------------------------------------------- */
3976 
3977 /* Set the specified node 'n' as master for this node.
3978  * If this node is currently a master, it is turned into a slave. */
clusterSetMaster(clusterNode * n)3979 void clusterSetMaster(clusterNode *n) {
3980     serverAssert(n != myself);
3981     serverAssert(myself->numslots == 0);
3982 
3983     if (nodeIsMaster(myself)) {
3984         myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
3985         myself->flags |= CLUSTER_NODE_SLAVE;
3986         clusterCloseAllSlots();
3987     } else {
3988         if (myself->slaveof)
3989             clusterNodeRemoveSlave(myself->slaveof,myself);
3990     }
3991     myself->slaveof = n;
3992     clusterNodeAddSlave(n,myself);
3993     replicationSetMaster(n->ip, n->port);
3994     resetManualFailover();
3995 }
3996 
3997 /* -----------------------------------------------------------------------------
3998  * Nodes to string representation functions.
3999  * -------------------------------------------------------------------------- */
4000 
4001 struct redisNodeFlags {
4002     uint16_t flag;
4003     char *name;
4004 };
4005 
4006 static struct redisNodeFlags redisNodeFlagsTable[] = {
4007     {CLUSTER_NODE_MYSELF,       "myself,"},
4008     {CLUSTER_NODE_MASTER,       "master,"},
4009     {CLUSTER_NODE_SLAVE,        "slave,"},
4010     {CLUSTER_NODE_PFAIL,        "fail?,"},
4011     {CLUSTER_NODE_FAIL,         "fail,"},
4012     {CLUSTER_NODE_HANDSHAKE,    "handshake,"},
4013     {CLUSTER_NODE_NOADDR,       "noaddr,"},
4014     {CLUSTER_NODE_NOFAILOVER,   "nofailover,"}
4015 };
4016 
4017 /* Concatenate the comma separated list of node flags to the given SDS
4018  * string 'ci'. */
representClusterNodeFlags(sds ci,uint16_t flags)4019 sds representClusterNodeFlags(sds ci, uint16_t flags) {
4020     size_t orig_len = sdslen(ci);
4021     int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
4022     for (i = 0; i < size; i++) {
4023         struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
4024         if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
4025     }
4026     /* If no flag was added, add the "noflags" special flag. */
4027     if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
4028     sdsIncrLen(ci,-1); /* Remove trailing comma. */
4029     return ci;
4030 }
4031 
4032 /* Generate a csv-alike representation of the specified cluster node.
4033  * See clusterGenNodesDescription() top comment for more information.
4034  *
4035  * The function returns the string representation as an SDS string. */
clusterGenNodeDescription(clusterNode * node)4036 sds clusterGenNodeDescription(clusterNode *node) {
4037     int j, start;
4038     sds ci;
4039 
4040     /* Node coordinates */
4041     ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
4042         node->name,
4043         node->ip,
4044         node->port,
4045         node->cport);
4046 
4047     /* Flags */
4048     ci = representClusterNodeFlags(ci, node->flags);
4049 
4050     /* Slave of... or just "-" */
4051     if (node->slaveof)
4052         ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
4053     else
4054         ci = sdscatlen(ci," - ",3);
4055 
4056     /* Latency from the POV of this node, config epoch, link status */
4057     ci = sdscatprintf(ci,"%lld %lld %llu %s",
4058         (long long) node->ping_sent,
4059         (long long) node->pong_received,
4060         (unsigned long long) node->configEpoch,
4061         (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
4062                     "connected" : "disconnected");
4063 
4064     /* Slots served by this instance */
4065     start = -1;
4066     for (j = 0; j < CLUSTER_SLOTS; j++) {
4067         int bit;
4068 
4069         if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4070             if (start == -1) start = j;
4071         }
4072         if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4073             if (bit && j == CLUSTER_SLOTS-1) j++;
4074 
4075             if (start == j-1) {
4076                 ci = sdscatprintf(ci," %d",start);
4077             } else {
4078                 ci = sdscatprintf(ci," %d-%d",start,j-1);
4079             }
4080             start = -1;
4081         }
4082     }
4083 
4084     /* Just for MYSELF node we also dump info about slots that
4085      * we are migrating to other instances or importing from other
4086      * instances. */
4087     if (node->flags & CLUSTER_NODE_MYSELF) {
4088         for (j = 0; j < CLUSTER_SLOTS; j++) {
4089             if (server.cluster->migrating_slots_to[j]) {
4090                 ci = sdscatprintf(ci," [%d->-%.40s]",j,
4091                     server.cluster->migrating_slots_to[j]->name);
4092             } else if (server.cluster->importing_slots_from[j]) {
4093                 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
4094                     server.cluster->importing_slots_from[j]->name);
4095             }
4096         }
4097     }
4098     return ci;
4099 }
4100 
4101 /* Generate a csv-alike representation of the nodes we are aware of,
4102  * including the "myself" node, and return an SDS string containing the
4103  * representation (it is up to the caller to free it).
4104  *
4105  * All the nodes matching at least one of the node flags specified in
4106  * "filter" are excluded from the output, so using zero as a filter will
4107  * include all the known nodes in the representation, including nodes in
4108  * the HANDSHAKE state.
4109  *
4110  * The representation obtained using this function is used for the output
4111  * of the CLUSTER NODES function, and as format for the cluster
4112  * configuration file (nodes.conf) for a given node. */
clusterGenNodesDescription(int filter)4113 sds clusterGenNodesDescription(int filter) {
4114     sds ci = sdsempty(), ni;
4115     dictIterator *di;
4116     dictEntry *de;
4117 
4118     di = dictGetSafeIterator(server.cluster->nodes);
4119     while((de = dictNext(di)) != NULL) {
4120         clusterNode *node = dictGetVal(de);
4121 
4122         if (node->flags & filter) continue;
4123         ni = clusterGenNodeDescription(node);
4124         ci = sdscatsds(ci,ni);
4125         sdsfree(ni);
4126         ci = sdscatlen(ci,"\n",1);
4127     }
4128     dictReleaseIterator(di);
4129     return ci;
4130 }
4131 
4132 /* -----------------------------------------------------------------------------
4133  * CLUSTER command
4134  * -------------------------------------------------------------------------- */
4135 
clusterGetMessageTypeString(int type)4136 const char *clusterGetMessageTypeString(int type) {
4137     switch(type) {
4138     case CLUSTERMSG_TYPE_PING: return "ping";
4139     case CLUSTERMSG_TYPE_PONG: return "pong";
4140     case CLUSTERMSG_TYPE_MEET: return "meet";
4141     case CLUSTERMSG_TYPE_FAIL: return "fail";
4142     case CLUSTERMSG_TYPE_PUBLISH: return "publish";
4143     case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
4144     case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
4145     case CLUSTERMSG_TYPE_UPDATE: return "update";
4146     case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
4147     case CLUSTERMSG_TYPE_MODULE: return "module";
4148     }
4149     return "unknown";
4150 }
4151 
/* Parse object 'o' as a hash slot number.
 *
 * On success the slot, in the range 0 .. CLUSTER_SLOTS-1, is returned.
 * If the object is not a valid integer or the value is out of range, an
 * error is sent to client 'c' and -1 is returned. */
int getSlotOrReply(client *c, robj *o) {
    long long slot;
    int parsed = (getLongLongFromObject(o,&slot) == C_OK);

    /* 'slot' is only inspected when the conversion succeeded. */
    if (parsed && slot >= 0 && slot < CLUSTER_SLOTS) return (int) slot;

    addReplyError(c,"Invalid or out of range slot");
    return -1;
}
4163 
clusterReplyMultiBulkSlots(client * c)4164 void clusterReplyMultiBulkSlots(client *c) {
4165     /* Format: 1) 1) start slot
4166      *            2) end slot
4167      *            3) 1) master IP
4168      *               2) master port
4169      *               3) node ID
4170      *            4) 1) replica IP
4171      *               2) replica port
4172      *               3) node ID
4173      *           ... continued until done
4174      */
4175 
4176     int num_masters = 0;
4177     void *slot_replylen = addDeferredMultiBulkLength(c);
4178 
4179     dictEntry *de;
4180     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
4181     while((de = dictNext(di)) != NULL) {
4182         clusterNode *node = dictGetVal(de);
4183         int j = 0, start = -1;
4184         int i, nested_elements = 0;
4185 
4186         /* Skip slaves (that are iterated when producing the output of their
4187          * master) and  masters not serving any slot. */
4188         if (!nodeIsMaster(node) || node->numslots == 0) continue;
4189 
4190         for(i = 0; i < node->numslaves; i++) {
4191             if (nodeFailed(node->slaves[i])) continue;
4192             nested_elements++;
4193         }
4194 
4195         for (j = 0; j < CLUSTER_SLOTS; j++) {
4196             int bit, i;
4197 
4198             if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4199                 if (start == -1) start = j;
4200             }
4201             if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4202                 addReplyMultiBulkLen(c, nested_elements + 3); /* slots (2) + master addr (1). */
4203 
4204                 if (bit && j == CLUSTER_SLOTS-1) j++;
4205 
4206                 /* If slot exists in output map, add to it's list.
4207                  * else, create a new output map for this slot */
4208                 if (start == j-1) {
4209                     addReplyLongLong(c, start); /* only one slot; low==high */
4210                     addReplyLongLong(c, start);
4211                 } else {
4212                     addReplyLongLong(c, start); /* low */
4213                     addReplyLongLong(c, j-1);   /* high */
4214                 }
4215                 start = -1;
4216 
4217                 /* First node reply position is always the master */
4218                 addReplyMultiBulkLen(c, 3);
4219                 addReplyBulkCString(c, node->ip);
4220                 addReplyLongLong(c, node->port);
4221                 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
4222 
4223                 /* Remaining nodes in reply are replicas for slot range */
4224                 for (i = 0; i < node->numslaves; i++) {
4225                     /* This loop is copy/pasted from clusterGenNodeDescription()
4226                      * with modifications for per-slot node aggregation */
4227                     if (nodeFailed(node->slaves[i])) continue;
4228                     addReplyMultiBulkLen(c, 3);
4229                     addReplyBulkCString(c, node->slaves[i]->ip);
4230                     addReplyLongLong(c, node->slaves[i]->port);
4231                     addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
4232                 }
4233                 num_masters++;
4234             }
4235         }
4236     }
4237     dictReleaseIterator(di);
4238     setDeferredMultiBulkLength(c, slot_replylen, num_masters);
4239 }
4240 
/* CLUSTER <subcommand> [<arg> ...]
 *
 * Entry point for every CLUSTER subcommand. The subcommand name in argv[1]
 * is matched case insensitively together with the expected number of
 * arguments; the first matching branch handles the request and queues the
 * reply on client 'c'. When cluster support is disabled an error is
 * returned immediately. When no branch matches, the generic subcommand
 * syntax error is returned. */
void clusterCommand(client *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        const char *help[] = {
"ADDSLOTS <slot> [slot ...] -- Assign slots to current node.",
"BUMPEPOCH -- Advance the cluster config epoch.",
"COUNT-failure-reports <node-id> -- Return number of failure reports for <node-id>.",
"COUNTKEYSINSLOT <slot> - Return the number of keys in <slot>.",
"DELSLOTS <slot> [slot ...] -- Delete slots information from current node.",
"FAILOVER [force|takeover] -- Promote current replica node to being a master.",
"FORGET <node-id> -- Remove a node from the cluster.",
"GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
"FLUSHSLOTS -- Delete current node own slots information.",
"INFO - Return information about the cluster.",
"KEYSLOT <key> -- Return the hash slot for <key>.",
"MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
"MYID -- Return the node id.",
"NODES -- Return cluster configuration seen by node. Output format:",
"    <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ... <slot>",
"REPLICATE <node-id> -- Configure current node as replica to <node-id>.",
"RESET [hard|soft] -- Reset current node (default: soft).",
"SET-config-epoch <epoch> - Set config epoch of current node.",
"SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
"REPLICAS <node-id> -- Return <node-id> replicas.",
"SLOTS -- Return information about slots range mappings. Each range is made of:",
"    start, end, master and replicas IP addresses, ports and ids",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
        /* CLUSTER MEET <ip> <port> [cport] */
        long long port, cport;

        if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
            addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }

        /* When the bus port is not given it is derived from the base port. */
        if (c->argc == 5) {
            if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
                addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
                                    (char*)c->argv[4]->ptr);
                return;
            }
        } else {
            cport = port + CLUSTER_PORT_INCR;
        }

        /* Only a handshake failure reported with EINVAL is surfaced to the
         * user as an error; any other outcome is replied with OK. */
        if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
            errno == EINVAL)
        {
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
        /* CLUSTER NODES */
        robj *o;
        sds ci = clusterGenNodesDescription(0);

        o = createObject(OBJ_STRING,ci);
        addReplyBulk(c,o);
        decrRefCount(o);
    } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
        /* CLUSTER MYID */
        addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
    } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
        /* CLUSTER SLOTS */
        clusterReplyMultiBulkSlots(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
        /* CLUSTER FLUSHSLOTS */
        if (dictSize(server.db[0].dict) != 0) {
            addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
            return;
        }
        clusterDelNodeSlots(myself);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
    {
        /* CLUSTER ADDSLOTS <slot> [slot] ... */
        /* CLUSTER DELSLOTS <slot> [slot] ... */
        int j, slot;
        unsigned char *slots = zmalloc(CLUSTER_SLOTS);
        int del = !strcasecmp(c->argv[1]->ptr,"delslots");

        memset(slots,0,CLUSTER_SLOTS);
        /* Check that all the arguments are parseable and that all the
         * slots are not already busy. Nothing is modified until every
         * argument has been validated. */
        for (j = 2; j < c->argc; j++) {
            if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
                zfree(slots);
                return;
            }
            if (del && server.cluster->slots[slot] == NULL) {
                addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
                zfree(slots);
                return;
            } else if (!del && server.cluster->slots[slot]) {
                addReplyErrorFormat(c,"Slot %d is already busy", slot);
                zfree(slots);
                return;
            }
            /* The post-increment yields the previous count: a value of 1
             * means the slot was already seen in this command. */
            if (slots[slot]++ == 1) {
                addReplyErrorFormat(c,"Slot %d specified multiple times",
                    (int)slot);
                zfree(slots);
                return;
            }
        }
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            if (slots[j]) {
                int retval;

                /* If this slot was set as importing we can clear this
                 * state as now we are the real owner of the slot. */
                if (server.cluster->importing_slots_from[j])
                    server.cluster->importing_slots_from[j] = NULL;

                retval = del ? clusterDelSlot(j) :
                               clusterAddSlot(myself,j);
                serverAssertWithInfo(c,NULL,retval == C_OK);
            }
        }
        zfree(slots);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
        /* SETSLOT 10 MIGRATING <node ID> */
        /* SETSLOT 10 IMPORTING <node ID> */
        /* SETSLOT 10 STABLE */
        /* SETSLOT 10 NODE <node ID> */
        int slot;
        clusterNode *n;

        if (nodeIsSlave(myself)) {
            addReplyError(c,"Please use SETSLOT only with masters.");
            return;
        }

        if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;

        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> MIGRATING <NODE ID> */
            if (server.cluster->slots[slot] != myself) {
                addReplyErrorFormat(c,"I'm not the owner of hash slot %d",slot);
                return;
            }
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            server.cluster->migrating_slots_to[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> IMPORTING <NODE ID> */
            if (server.cluster->slots[slot] == myself) {
                addReplyErrorFormat(c,
                    "I'm already the owner of hash slot %d",slot);
                return;
            }
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            server.cluster->importing_slots_from[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
            /* CLUSTER SETSLOT <SLOT> STABLE */
            server.cluster->importing_slots_from[slot] = NULL;
            server.cluster->migrating_slots_to[slot] = NULL;
        } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
            n = clusterLookupNode(c->argv[4]->ptr);

            if (!n) {
                addReplyErrorFormat(c,"Unknown node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            /* If this hash slot was served by 'myself' before to switch
             * make sure there are no longer local keys for this hash slot. */
            if (server.cluster->slots[slot] == myself && n != myself) {
                if (countKeysInSlot(slot) != 0) {
                    addReplyErrorFormat(c,
                        "Can't assign hashslot %d to a different node "
                        "while I still hold keys for this hash slot.", slot);
                    return;
                }
            }
            /* If this slot is in migrating status but we have no keys
             * for it assigning the slot to another node will clear
             * the migrating status. */
            if (countKeysInSlot(slot) == 0 &&
                server.cluster->migrating_slots_to[slot])
                server.cluster->migrating_slots_to[slot] = NULL;

            /* If this node was importing this slot, assigning the slot to
             * itself also clears the importing status. */
            if (n == myself &&
                server.cluster->importing_slots_from[slot])
            {
                /* This slot was manually migrated, set this node configEpoch
                 * to a new epoch so that the new version can be propagated
                 * by the cluster.
                 *
                 * Note that if this ever results in a collision with another
                 * node getting the same configEpoch, for example because a
                 * failover happens at the same time we close the slot, the
                 * configEpoch collision resolution will fix it assigning
                 * a different epoch to each node. */
                if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
                    serverLog(LL_WARNING,
                        "configEpoch updated after importing slot %d", slot);
                }
                server.cluster->importing_slots_from[slot] = NULL;
            }
            clusterDelSlot(slot);
            clusterAddSlot(n,slot);
        } else {
            addReplyError(c,
                "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
            return;
        }
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
        /* CLUSTER BUMPEPOCH */
        int retval = clusterBumpConfigEpochWithoutConsensus();
        sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
                (retval == C_OK) ? "BUMPED" : "STILL",
                (unsigned long long) myself->configEpoch);
        addReplySds(c,reply);
    } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
        /* CLUSTER INFO */
        char *statestr[] = {"ok","fail","needhelp"};
        int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
        uint64_t myepoch;
        int j;

        /* Classify every assigned slot according to the state of the node
         * it is assigned to. */
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            clusterNode *n = server.cluster->slots[j];

            if (n == NULL) continue;
            slots_assigned++;
            if (nodeFailed(n)) {
                slots_fail++;
            } else if (nodeTimedOut(n)) {
                slots_pfail++;
            } else {
                slots_ok++;
            }
        }

        /* A slave with a known master reports the config epoch of its
         * master instead of its own. */
        myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
                  myself->slaveof->configEpoch : myself->configEpoch;

        sds info = sdscatprintf(sdsempty(),
            "cluster_state:%s\r\n"
            "cluster_slots_assigned:%d\r\n"
            "cluster_slots_ok:%d\r\n"
            "cluster_slots_pfail:%d\r\n"
            "cluster_slots_fail:%d\r\n"
            "cluster_known_nodes:%lu\r\n"
            "cluster_size:%d\r\n"
            "cluster_current_epoch:%llu\r\n"
            "cluster_my_epoch:%llu\r\n"
            , statestr[server.cluster->state],
            slots_assigned,
            slots_ok,
            slots_pfail,
            slots_fail,
            dictSize(server.cluster->nodes),
            server.cluster->size,
            (unsigned long long) server.cluster->currentEpoch,
            (unsigned long long) myepoch
        );

        /* Show stats about messages sent and received. Message types with
         * a zero counter are omitted, the totals are always reported. */
        long long tot_msg_sent = 0;
        long long tot_msg_received = 0;

        for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
            if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
            tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
            info = sdscatprintf(info,
                "cluster_stats_messages_%s_sent:%lld\r\n",
                clusterGetMessageTypeString(i),
                server.cluster->stats_bus_messages_sent[i]);
        }
        info = sdscatprintf(info,
            "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);

        for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
            if (server.cluster->stats_bus_messages_received[i] == 0) continue;
            tot_msg_received += server.cluster->stats_bus_messages_received[i];
            info = sdscatprintf(info,
                "cluster_stats_messages_%s_received:%lld\r\n",
                clusterGetMessageTypeString(i),
                server.cluster->stats_bus_messages_received[i]);
        }
        info = sdscatprintf(info,
            "cluster_stats_messages_received:%lld\r\n", tot_msg_received);

        /* Produce the reply protocol: bulk length header, payload, CRLF. */
        addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
            (unsigned long)sdslen(info)));
        addReplySds(c,info);
        addReply(c,shared.crlf);
    } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
        /* CLUSTER SAVECONFIG */
        int retval = clusterSaveConfig(1);

        if (retval == 0)
            addReply(c,shared.ok);
        else
            addReplyErrorFormat(c,"error saving the cluster node config: %s",
                strerror(errno));
    } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
        /* CLUSTER KEYSLOT <key> */
        sds key = c->argv[2]->ptr;

        addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
    } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
        /* CLUSTER COUNTKEYSINSLOT <slot> */
        long long slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS) {
            addReplyError(c,"Invalid slot");
            return;
        }
        addReplyLongLong(c,countKeysInSlot(slot));
    } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
        /* CLUSTER GETKEYSINSLOT <slot> <count> */
        long long maxkeys, slot;
        unsigned int numkeys, j;
        robj **keys;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
            != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
            addReplyError(c,"Invalid slot or number of keys");
            return;
        }

        /* Avoid allocating more than needed in case of large COUNT argument
         * and smaller actual number of keys. */
        unsigned int keys_in_slot = countKeysInSlot(slot);
        if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;

        keys = zmalloc(sizeof(robj*)*maxkeys);
        numkeys = getKeysInSlot(slot, keys, maxkeys);
        addReplyMultiBulkLen(c,numkeys);
        for (j = 0; j < numkeys; j++) {
            addReplyBulk(c,keys[j]);
            decrRefCount(keys[j]);
        }
        zfree(keys);
    } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
        /* CLUSTER FORGET <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        } else if (n == myself) {
            addReplyError(c,"I tried hard but I can't forget myself...");
            return;
        } else if (nodeIsSlave(myself) && myself->slaveof == n) {
            addReplyError(c,"Can't forget my master!");
            return;
        }
        clusterBlacklistAddNode(n);
        clusterDelNode(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
        /* CLUSTER REPLICATE <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        /* I can't replicate myself. */
        if (n == myself) {
            addReplyError(c,"Can't replicate myself");
            return;
        }

        /* Can't replicate a slave. */
        if (nodeIsSlave(n)) {
            addReplyError(c,"I can only replicate a master, not a replica.");
            return;
        }

        /* If the instance is currently a master, it should have no assigned
         * slots nor keys to accept to replicate some other node.
         * Slaves can switch to another master without issues. */
        if (nodeIsMaster(myself) &&
            (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
            addReplyError(c,
                "To set a master the node must be empty and "
                "without assigned slots.");
            return;
        }

        /* Set the master. */
        clusterSetMaster(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
                !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
        /* CLUSTER SLAVES <NODE ID> */
        /* CLUSTER REPLICAS <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
        int j;

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        if (nodeIsSlave(n)) {
            addReplyError(c,"The specified node is not a master");
            return;
        }

        /* One bulk string per replica, each holding the node description. */
        addReplyMultiBulkLen(c,n->numslaves);
        for (j = 0; j < n->numslaves; j++) {
            sds ni = clusterGenNodeDescription(n->slaves[j]);
            addReplyBulkCString(c,ni);
            sdsfree(ni);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
               c->argc == 3)
    {
        /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        } else {
            addReplyLongLong(c,clusterNodeFailureReportsCount(n));
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
        int force = 0, takeover = 0;

        if (c->argc == 3) {
            if (!strcasecmp(c->argv[2]->ptr,"force")) {
                force = 1;
            } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
                takeover = 1;
                force = 1; /* Takeover also implies force. */
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Check preconditions. */
        if (nodeIsMaster(myself)) {
            addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
            return;
        } else if (myself->slaveof == NULL) {
            addReplyError(c,"I'm a replica but my master is unknown to me");
            return;
        } else if (!force &&
                   (nodeFailed(myself->slaveof) ||
                    myself->slaveof->link == NULL))
        {
            addReplyError(c,"Master is down or failed, "
                            "please use CLUSTER FAILOVER FORCE");
            return;
        }
        resetManualFailover();
        server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;

        if (takeover) {
            /* A takeover does not perform any initial check. It just
             * generates a new configuration epoch for this node without
             * consensus, claims the master's slots, and broadcast the new
             * configuration. */
            serverLog(LL_WARNING,"Taking over the master (user request).");
            clusterBumpConfigEpochWithoutConsensus();
            clusterFailoverReplaceYourMaster();
        } else if (force) {
            /* If this is a forced failover, we don't need to talk with our
             * master to agree about the offset. We just failover taking over
             * it without coordination. */
            serverLog(LL_WARNING,"Forced failover user request accepted.");
            server.cluster->mf_can_start = 1;
        } else {
            serverLog(LL_WARNING,"Manual failover user request accepted.");
            clusterSendMFStart(myself->slaveof);
        }
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
    {
        /* CLUSTER SET-CONFIG-EPOCH <epoch>
         *
         * The user is allowed to set the config epoch only when a node is
         * totally fresh: no config epoch, no other known node, and so forth.
         * This happens at cluster creation time to start with a cluster where
         * every node has a different node ID, without to rely on the conflicts
         * resolution system which is too slow when a big cluster is created. */
        long long epoch;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
            return;

        if (epoch < 0) {
            addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
        } else if (dictSize(server.cluster->nodes) > 1) {
            addReplyError(c,"The user can assign a config epoch only when the "
                            "node does not know any other node.");
        } else if (myself->configEpoch != 0) {
            addReplyError(c,"Node config epoch is already non-zero");
        } else {
            myself->configEpoch = epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
                (unsigned long long) myself->configEpoch);

            if (server.cluster->currentEpoch < (uint64_t)epoch)
                server.cluster->currentEpoch = epoch;
            /* No need to fsync the config here since in the unlucky event
             * of a failure to persist the config, the conflict resolution code
             * will assign an unique config to this node. */
            clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                                 CLUSTER_TODO_SAVE_CONFIG);
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* CLUSTER RESET [SOFT|HARD] */
        int hard = 0;

        /* Parse soft/hard argument. Default is soft. */
        if (c->argc == 3) {
            if (!strcasecmp(c->argv[2]->ptr,"hard")) {
                hard = 1;
            } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
                hard = 0;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Slaves can be reset while containing data, but not master nodes
         * that must be empty. */
        if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
            addReplyError(c,"CLUSTER RESET can't be called with "
                            "master nodes containing keys");
            return;
        }
        clusterReset(hard);
        addReply(c,shared.ok);
    } else {
        addReplySubcommandSyntaxError(c);
        return;
    }
}
4822 
4823 /* -----------------------------------------------------------------------------
4824  * DUMP, RESTORE and MIGRATE commands
4825  * -------------------------------------------------------------------------- */
4826 
4827 /* Generates a DUMP-format representation of the object 'o', adding it to the
4828  * io stream pointed by 'rio'. This function can't fail. */
createDumpPayload(rio * payload,robj * o,robj * key)4829 void createDumpPayload(rio *payload, robj *o, robj *key) {
4830     unsigned char buf[2];
4831     uint64_t crc;
4832 
4833     /* Serialize the object in a RDB-like format. It consist of an object type
4834      * byte followed by the serialized object. This is understood by RESTORE. */
4835     rioInitWithBuffer(payload,sdsempty());
4836     serverAssert(rdbSaveObjectType(payload,o));
4837     serverAssert(rdbSaveObject(payload,o,key));
4838 
4839     /* Write the footer, this is how it looks like:
4840      * ----------------+---------------------+---------------+
4841      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4842      * ----------------+---------------------+---------------+
4843      * RDB version and CRC are both in little endian.
4844      */
4845 
4846     /* RDB version */
4847     buf[0] = RDB_VERSION & 0xff;
4848     buf[1] = (RDB_VERSION >> 8) & 0xff;
4849     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4850 
4851     /* CRC64 */
4852     crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4853                 sdslen(payload->io.buffer.ptr));
4854     memrev64ifbe(&crc);
4855     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4856 }
4857 
4858 /* Verify that the RDB version of the dump payload matches the one of this Redis
4859  * instance and that the checksum is ok.
4860  * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
4861  * is returned. */
verifyDumpPayload(unsigned char * p,size_t len)4862 int verifyDumpPayload(unsigned char *p, size_t len) {
4863     unsigned char *footer;
4864     uint16_t rdbver;
4865     uint64_t crc;
4866 
4867     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4868     if (len < 10) return C_ERR;
4869     footer = p+(len-10);
4870 
4871     /* Verify RDB version */
4872     rdbver = (footer[1] << 8) | footer[0];
4873     if (rdbver > RDB_VERSION) return C_ERR;
4874 
4875     /* Verify CRC64 */
4876     crc = crc64(0,p,len-8);
4877     memrev64ifbe(&crc);
4878     return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4879 }
4880 
4881 /* DUMP keyname
4882  * DUMP is actually not used by Redis Cluster but it is the obvious
4883  * complement of RESTORE and can be useful for different applications. */
dumpCommand(client * c)4884 void dumpCommand(client *c) {
4885     robj *o, *dumpobj;
4886     rio payload;
4887 
4888     /* Check if the key is here. */
4889     if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4890         addReply(c,shared.nullbulk);
4891         return;
4892     }
4893 
4894     /* Create the DUMP encoded representation. */
4895     createDumpPayload(&payload,o,c->argv[1]);
4896 
4897     /* Transfer to the client */
4898     dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
4899     addReplyBulk(c,dumpobj);
4900     decrRefCount(dumpobj);
4901     return;
4902 }
4903 
4904 /* RESTORE key ttl serialized-value [REPLACE] */
restoreCommand(client * c)4905 void restoreCommand(client *c) {
4906     long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
4907     rio payload;
4908     int j, type, replace = 0, absttl = 0;
4909     robj *obj;
4910 
4911     /* Parse additional options */
4912     for (j = 4; j < c->argc; j++) {
4913         int additional = c->argc-j-1;
4914         if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4915             replace = 1;
4916         } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
4917             absttl = 1;
4918         } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
4919                    lfu_freq == -1)
4920         {
4921             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
4922                     != C_OK) return;
4923             if (lru_idle < 0) {
4924                 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
4925                 return;
4926             }
4927             lru_clock = LRU_CLOCK();
4928             j++; /* Consume additional arg. */
4929         } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
4930                    lru_idle == -1)
4931         {
4932             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
4933                     != C_OK) return;
4934             if (lfu_freq < 0 || lfu_freq > 255) {
4935                 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
4936                 return;
4937             }
4938             j++; /* Consume additional arg. */
4939         } else {
4940             addReply(c,shared.syntaxerr);
4941             return;
4942         }
4943     }
4944 
4945     /* Make sure this key does not already exist here... */
4946     robj *key = c->argv[1];
4947     if (!replace && lookupKeyWrite(c->db,key) != NULL) {
4948         addReply(c,shared.busykeyerr);
4949         return;
4950     }
4951 
4952     /* Check if the TTL value makes sense */
4953     if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
4954         return;
4955     } else if (ttl < 0) {
4956         addReplyError(c,"Invalid TTL value, must be >= 0");
4957         return;
4958     }
4959 
4960     /* Verify RDB version and data checksum. */
4961     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
4962     {
4963         addReplyError(c,"DUMP payload version or checksum are wrong");
4964         return;
4965     }
4966 
4967     rioInitWithBuffer(&payload,c->argv[3]->ptr);
4968     if (((type = rdbLoadObjectType(&payload)) == -1) ||
4969         ((obj = rdbLoadObject(type,&payload,key)) == NULL))
4970     {
4971         addReplyError(c,"Bad data format");
4972         return;
4973     }
4974 
4975     /* Remove the old key if needed. */
4976     int deleted = 0;
4977     if (replace)
4978         deleted = dbDelete(c->db,key);
4979 
4980     if (ttl && !absttl) ttl+=mstime();
4981     if (ttl && checkAlreadyExpired(ttl)) {
4982         if (deleted) {
4983             rewriteClientCommandVector(c,2,shared.del,key);
4984             signalModifiedKey(c->db,key);
4985             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
4986             server.dirty++;
4987         }
4988         decrRefCount(obj);
4989         addReply(c, shared.ok);
4990         return;
4991     }
4992 
4993     /* Create the key and set the TTL if any */
4994     dbAdd(c->db,key,obj);
4995     if (ttl) {
4996         setExpire(c,c->db,key,ttl);
4997     }
4998     objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
4999     signalModifiedKey(c->db,key);
5000     addReply(c,shared.ok);
5001     server.dirty++;
5002 }
5003 
/* MIGRATE socket cache implementation.
 *
 * We keep a map between "host:port" and a TCP socket that we used to connect
 * to that instance recently.
 * These sockets are closed when the max number we cache is reached, and also
 * in serverCron() when they are around for more than a few seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */

/* A cached connection towards a MIGRATE target instance. */
typedef struct migrateCachedSocket {
    int fd;               /* Socket connected to the target instance. */
    long last_dbid;       /* DB last selected on this connection; -1 forces
                           * MIGRATE to emit SELECT the next time. */
    time_t last_use_time; /* server.unixtime when the socket was last handed
                           * out, compared against MIGRATE_SOCKET_CACHE_TTL. */
} migrateCachedSocket;
5018 
5019 /* Return a migrateCachedSocket containing a TCP socket connected with the
5020  * target instance, possibly returning a cached one.
5021  *
5022  * This function is responsible of sending errors to the client if a
5023  * connection can't be established. In this case -1 is returned.
5024  * Otherwise on success the socket is returned, and the caller should not
5025  * attempt to free it after usage.
5026  *
5027  * If the caller detects an error while using the socket, migrateCloseSocket()
5028  * should be called so that the connection will be created from scratch
5029  * the next time. */
migrateGetSocket(client * c,robj * host,robj * port,long timeout)5030 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
5031     int fd;
5032     sds name = sdsempty();
5033     migrateCachedSocket *cs;
5034 
5035     /* Check if we have an already cached socket for this ip:port pair. */
5036     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5037     name = sdscatlen(name,":",1);
5038     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5039     cs = dictFetchValue(server.migrate_cached_sockets,name);
5040     if (cs) {
5041         sdsfree(name);
5042         cs->last_use_time = server.unixtime;
5043         return cs;
5044     }
5045 
5046     /* No cached socket, create one. */
5047     if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
5048         /* Too many items, drop one at random. */
5049         dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
5050         cs = dictGetVal(de);
5051         close(cs->fd);
5052         zfree(cs);
5053         dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5054     }
5055 
5056     /* Create the socket */
5057     fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
5058                                 atoi(c->argv[2]->ptr));
5059     if (fd == -1) {
5060         sdsfree(name);
5061         addReplyErrorFormat(c,"Can't connect to target node: %s",
5062             server.neterr);
5063         return NULL;
5064     }
5065     anetEnableTcpNoDelay(server.neterr,fd);
5066 
5067     /* Check if it connects within the specified timeout. */
5068     if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
5069         sdsfree(name);
5070         addReplySds(c,
5071             sdsnew("-IOERR error or timeout connecting to the client\r\n"));
5072         close(fd);
5073         return NULL;
5074     }
5075 
5076     /* Add to the cache and return it to the caller. */
5077     cs = zmalloc(sizeof(*cs));
5078     cs->fd = fd;
5079     cs->last_dbid = -1;
5080     cs->last_use_time = server.unixtime;
5081     dictAdd(server.migrate_cached_sockets,name,cs);
5082     return cs;
5083 }
5084 
5085 /* Free a migrate cached connection. */
migrateCloseSocket(robj * host,robj * port)5086 void migrateCloseSocket(robj *host, robj *port) {
5087     sds name = sdsempty();
5088     migrateCachedSocket *cs;
5089 
5090     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5091     name = sdscatlen(name,":",1);
5092     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5093     cs = dictFetchValue(server.migrate_cached_sockets,name);
5094     if (!cs) {
5095         sdsfree(name);
5096         return;
5097     }
5098 
5099     close(cs->fd);
5100     zfree(cs);
5101     dictDelete(server.migrate_cached_sockets,name);
5102     sdsfree(name);
5103 }
5104 
migrateCloseTimedoutSockets(void)5105 void migrateCloseTimedoutSockets(void) {
5106     dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
5107     dictEntry *de;
5108 
5109     while((de = dictNext(di)) != NULL) {
5110         migrateCachedSocket *cs = dictGetVal(de);
5111 
5112         if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
5113             close(cs->fd);
5114             zfree(cs);
5115             dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5116         }
5117     }
5118     dictReleaseIterator(di);
5119 }
5120 
5121 /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password]
5122  *
5123  * On in the multiple keys form:
5124  *
5125  * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password] KEYS key1
5126  * key2 ... keyN */
migrateCommand(client * c)5127 void migrateCommand(client *c) {
5128     migrateCachedSocket *cs;
5129     int copy = 0, replace = 0, j;
5130     char *password = NULL;
5131     long timeout;
5132     long dbid;
5133     robj **ov = NULL; /* Objects to migrate. */
5134     robj **kv = NULL; /* Key names. */
5135     robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
5136     rio cmd, payload;
5137     int may_retry = 1;
5138     int write_error = 0;
5139     int argv_rewritten = 0;
5140 
5141     /* To support the KEYS option we need the following additional state. */
5142     int first_key = 3; /* Argument index of the first key. */
5143     int num_keys = 1;  /* By default only migrate the 'key' argument. */
5144 
5145     /* Parse additional options */
5146     for (j = 6; j < c->argc; j++) {
5147         int moreargs = j < c->argc-1;
5148         if (!strcasecmp(c->argv[j]->ptr,"copy")) {
5149             copy = 1;
5150         } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5151             replace = 1;
5152         } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
5153             if (!moreargs) {
5154                 addReply(c,shared.syntaxerr);
5155                 return;
5156             }
5157             j++;
5158             password = c->argv[j]->ptr;
5159         } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
5160             if (sdslen(c->argv[3]->ptr) != 0) {
5161                 addReplyError(c,
5162                     "When using MIGRATE KEYS option, the key argument"
5163                     " must be set to the empty string");
5164                 return;
5165             }
5166             first_key = j+1;
5167             num_keys = c->argc - j - 1;
5168             break; /* All the remaining args are keys. */
5169         } else {
5170             addReply(c,shared.syntaxerr);
5171             return;
5172         }
5173     }
5174 
5175     /* Sanity check */
5176     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
5177         getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
5178     {
5179         return;
5180     }
5181     if (timeout <= 0) timeout = 1000;
5182 
5183     /* Check if the keys are here. If at least one key is to migrate, do it
5184      * otherwise if all the keys are missing reply with "NOKEY" to signal
5185      * the caller there was nothing to migrate. We don't return an error in
5186      * this case, since often this is due to a normal condition like the key
5187      * expiring in the meantime. */
5188     ov = zrealloc(ov,sizeof(robj*)*num_keys);
5189     kv = zrealloc(kv,sizeof(robj*)*num_keys);
5190     int oi = 0;
5191 
5192     for (j = 0; j < num_keys; j++) {
5193         if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
5194             kv[oi] = c->argv[first_key+j];
5195             oi++;
5196         }
5197     }
5198     num_keys = oi;
5199     if (num_keys == 0) {
5200         zfree(ov); zfree(kv);
5201         addReplySds(c,sdsnew("+NOKEY\r\n"));
5202         return;
5203     }
5204 
5205 try_again:
5206     write_error = 0;
5207 
5208     /* Connect */
5209     cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
5210     if (cs == NULL) {
5211         zfree(ov); zfree(kv);
5212         return; /* error sent to the client by migrateGetSocket() */
5213     }
5214 
5215     rioInitWithBuffer(&cmd,sdsempty());
5216 
5217     /* Authentication */
5218     if (password) {
5219         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5220         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
5221         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
5222             sdslen(password)));
5223     }
5224 
5225     /* Send the SELECT command if the current DB is not already selected. */
5226     int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
5227     if (select) {
5228         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5229         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
5230         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
5231     }
5232 
5233     int non_expired = 0; /* Number of keys that we'll find non expired.
5234                             Note that serializing large keys may take some time
5235                             so certain keys that were found non expired by the
5236                             lookupKey() function, may be expired later. */
5237 
5238     /* Create RESTORE payload and generate the protocol to call the command. */
5239     for (j = 0; j < num_keys; j++) {
5240         long long ttl = 0;
5241         long long expireat = getExpire(c->db,kv[j]);
5242 
5243         if (expireat != -1) {
5244             ttl = expireat-mstime();
5245             if (ttl < 0) {
5246                 continue;
5247             }
5248             if (ttl < 1) ttl = 1;
5249         }
5250 
5251         /* Relocate valid (non expired) keys into the array in successive
5252          * positions to remove holes created by the keys that were present
5253          * in the first lookup but are now expired after the second lookup. */
5254         kv[non_expired++] = kv[j];
5255 
5256         serverAssertWithInfo(c,NULL,
5257             rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
5258 
5259         if (server.cluster_enabled)
5260             serverAssertWithInfo(c,NULL,
5261                 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
5262         else
5263             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
5264         serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
5265         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
5266                 sdslen(kv[j]->ptr)));
5267         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
5268 
5269         /* Emit the payload argument, that is the serialized object using
5270          * the DUMP format. */
5271         createDumpPayload(&payload,ov[j],kv[j]);
5272         serverAssertWithInfo(c,NULL,
5273             rioWriteBulkString(&cmd,payload.io.buffer.ptr,
5274                                sdslen(payload.io.buffer.ptr)));
5275         sdsfree(payload.io.buffer.ptr);
5276 
5277         /* Add the REPLACE option to the RESTORE command if it was specified
5278          * as a MIGRATE option. */
5279         if (replace)
5280             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
5281     }
5282 
5283     /* Fix the actual number of keys we are migrating. */
5284     num_keys = non_expired;
5285 
5286     /* Transfer the query to the other node in 64K chunks. */
5287     errno = 0;
5288     {
5289         sds buf = cmd.io.buffer.ptr;
5290         size_t pos = 0, towrite;
5291         int nwritten = 0;
5292 
5293         while ((towrite = sdslen(buf)-pos) > 0) {
5294             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5295             nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
5296             if (nwritten != (signed)towrite) {
5297                 write_error = 1;
5298                 goto socket_err;
5299             }
5300             pos += nwritten;
5301         }
5302     }
5303 
5304     char buf0[1024]; /* Auth reply. */
5305     char buf1[1024]; /* Select reply. */
5306     char buf2[1024]; /* Restore reply. */
5307 
5308     /* Read the AUTH reply if needed. */
5309     if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
5310         goto socket_err;
5311 
5312     /* Read the SELECT reply if needed. */
5313     if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
5314         goto socket_err;
5315 
5316     /* Read the RESTORE replies. */
5317     int error_from_target = 0;
5318     int socket_error = 0;
5319     int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5320 
5321     /* Allocate the new argument vector that will replace the current command,
5322      * to propagate the MIGRATE as a DEL command (if no COPY option was given).
5323      * We allocate num_keys+1 because the additional argument is for "DEL"
5324      * command name itself. */
5325     if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5326 
5327     for (j = 0; j < num_keys; j++) {
5328         if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
5329             socket_error = 1;
5330             break;
5331         }
5332         if ((password && buf0[0] == '-') ||
5333             (select && buf1[0] == '-') ||
5334             buf2[0] == '-')
5335         {
5336             /* On error assume that last_dbid is no longer valid. */
5337             if (!error_from_target) {
5338                 cs->last_dbid = -1;
5339                 char *errbuf;
5340                 if (password && buf0[0] == '-') errbuf = buf0;
5341                 else if (select && buf1[0] == '-') errbuf = buf1;
5342                 else errbuf = buf2;
5343 
5344                 error_from_target = 1;
5345                 addReplyErrorFormat(c,"Target instance replied with error: %s",
5346                     errbuf+1);
5347             }
5348         } else {
5349             if (!copy) {
5350                 /* No COPY option: remove the local key, signal the change. */
5351                 dbDelete(c->db,kv[j]);
5352                 signalModifiedKey(c->db,kv[j]);
5353                 server.dirty++;
5354 
5355                 /* Populate the argument vector to replace the old one. */
5356                 newargv[del_idx++] = kv[j];
5357                 incrRefCount(kv[j]);
5358             }
5359         }
5360     }
5361 
5362     /* On socket error, if we want to retry, do it now before rewriting the
5363      * command vector. We only retry if we are sure nothing was processed
5364      * and we failed to read the first reply (j == 0 test). */
5365     if (!error_from_target && socket_error && j == 0 && may_retry &&
5366         errno != ETIMEDOUT)
5367     {
5368         goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5369     }
5370 
5371     /* On socket errors, close the migration socket now that we still have
5372      * the original host/port in the ARGV. Later the original command may be
5373      * rewritten to DEL and will be too later. */
5374     if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5375 
5376     if (!copy) {
5377         /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5378          * this only for the keys for which we received an acknowledgement
5379          * from the receiving Redis server, by using the del_idx index. */
5380         if (del_idx > 1) {
5381             newargv[0] = createStringObject("DEL",3);
5382             /* Note that the following call takes ownership of newargv. */
5383             replaceClientCommandVector(c,del_idx,newargv);
5384             argv_rewritten = 1;
5385         } else {
5386             /* No key transfer acknowledged, no need to rewrite as DEL. */
5387             zfree(newargv);
5388         }
5389         newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5390     }
5391 
5392     /* If we are here and a socket error happened, we don't want to retry.
5393      * Just signal the problem to the client, but only do it if we did not
5394      * already queue a different error reported by the destination server. */
5395     if (!error_from_target && socket_error) {
5396         may_retry = 0;
5397         goto socket_err;
5398     }
5399 
5400     if (!error_from_target) {
5401         /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5402          * avoid SELECT the next time if the target DB is the same. Reply +OK.
5403          *
5404          * Note: If we reached this point, even if socket_error is true
5405          * still the SELECT command succeeded (otherwise the code jumps to
5406          * socket_err label. */
5407         cs->last_dbid = dbid;
5408         addReply(c,shared.ok);
5409     } else {
5410         /* On error we already sent it in the for loop above, and set
5411          * the currently selected socket to -1 to force SELECT the next time. */
5412     }
5413 
5414     sdsfree(cmd.io.buffer.ptr);
5415     zfree(ov); zfree(kv); zfree(newargv);
5416     return;
5417 
5418 /* On socket errors we try to close the cached socket and try again.
5419  * It is very common for the cached socket to get closed, if just reopening
5420  * it works it's a shame to notify the error to the caller. */
5421 socket_err:
5422     /* Cleanup we want to perform in both the retry and no retry case.
5423      * Note: Closing the migrate socket will also force SELECT next time. */
5424     sdsfree(cmd.io.buffer.ptr);
5425 
5426     /* If the command was rewritten as DEL and there was a socket error,
5427      * we already closed the socket earlier. While migrateCloseSocket()
5428      * is idempotent, the host/port arguments are now gone, so don't do it
5429      * again. */
5430     if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5431     zfree(newargv);
5432     newargv = NULL; /* This will get reallocated on retry. */
5433 
5434     /* Retry only if it's not a timeout and we never attempted a retry
5435      * (or the code jumping here did not set may_retry to zero). */
5436     if (errno != ETIMEDOUT && may_retry) {
5437         may_retry = 0;
5438         goto try_again;
5439     }
5440 
5441     /* Cleanup we want to do if no retry is attempted. */
5442     zfree(ov); zfree(kv);
5443     addReplySds(c,
5444         sdscatprintf(sdsempty(),
5445             "-IOERR error or timeout %s to target instance\r\n",
5446             write_error ? "writing" : "reading"));
5447     return;
5448 }
5449 
5450 /* -----------------------------------------------------------------------------
5451  * Cluster functions related to serving / redirecting clients
5452  * -------------------------------------------------------------------------- */
5453 
5454 /* The ASKING command is required after a -ASK redirection.
5455  * The client should issue ASKING before to actually send the command to
5456  * the target instance. See the Redis Cluster specification for more
5457  * information. */
askingCommand(client * c)5458 void askingCommand(client *c) {
5459     if (server.cluster_enabled == 0) {
5460         addReplyError(c,"This instance has cluster support disabled");
5461         return;
5462     }
5463     c->flags |= CLIENT_ASKING;
5464     addReply(c,shared.ok);
5465 }
5466 
5467 /* The READONLY command is used by clients to enter the read-only mode.
5468  * In this mode slaves will not redirect clients as long as clients access
5469  * with read-only commands to keys that are served by the slave's master. */
readonlyCommand(client * c)5470 void readonlyCommand(client *c) {
5471     if (server.cluster_enabled == 0) {
5472         addReplyError(c,"This instance has cluster support disabled");
5473         return;
5474     }
5475     c->flags |= CLIENT_READONLY;
5476     addReply(c,shared.ok);
5477 }
5478 
5479 /* The READWRITE command just clears the READONLY command state. */
readwriteCommand(client * c)5480 void readwriteCommand(client *c) {
5481     c->flags &= ~CLIENT_READONLY;
5482     addReply(c,shared.ok);
5483 }
5484 
5485 /* Return the pointer to the cluster node that is able to serve the command.
5486  * For the function to succeed the command should only target either:
5487  *
5488  * 1) A single key (even multiple times like LPOPRPUSH mylist mylist).
5489  * 2) Multiple keys in the same hash slot, while the slot is stable (no
5490  *    resharding in progress).
5491  *
5492  * On success the function returns the node that is able to serve the request.
5493  * If the node is not 'myself' a redirection must be perfomed. The kind of
5494  * redirection is specified setting the integer passed by reference
5495  * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5496  * CLUSTER_REDIR_MOVED.
5497  *
5498  * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5499  *
5500  * If the command fails NULL is returned, and the reason of the failure is
5501  * provided via 'error_code', which will be set to:
5502  *
5503  * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5504  * don't belong to the same hash slot.
5505  *
5506  * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5507  * belonging to the same slot, but the slot is not stable (in migration or
5508  * importing state, likely because a resharding is in progress).
5509  *
5510  * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5511  * not bound to any node. In this case the cluster global state should be
5512  * already "down" but it is fragile to rely on the update of the global state,
5513  * so we also handle it here.
5514  *
5515  * CLUSTER_REDIR_DOWN_STATE if the cluster is down but the user attempts to
5516  * execute a command that addresses one or more keys. */
getNodeByQuery(client * c,struct redisCommand * cmd,robj ** argv,int argc,int * hashslot,int * error_code)5517 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5518     clusterNode *n = NULL;
5519     robj *firstkey = NULL;
5520     int multiple_keys = 0;
5521     multiState *ms, _ms;
5522     multiCmd mc;
5523     int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5524 
5525     /* Allow any key to be set if a module disabled cluster redirections. */
5526     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
5527         return myself;
5528 
5529     /* Set error code optimistically for the base case. */
5530     if (error_code) *error_code = CLUSTER_REDIR_NONE;
5531 
5532     /* Modules can turn off Redis Cluster redirection: this is useful
5533      * when writing a module that implements a completely different
5534      * distributed system. */
5535 
5536     /* We handle all the cases as if they were EXEC commands, so we have
5537      * a common code path for everything */
5538     if (cmd->proc == execCommand) {
5539         /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5540          * error. */
5541         if (!(c->flags & CLIENT_MULTI)) return myself;
5542         ms = &c->mstate;
5543     } else {
5544         /* In order to have a single codepath create a fake Multi State
5545          * structure if the client is not in MULTI/EXEC state, this way
5546          * we have a single codepath below. */
5547         ms = &_ms;
5548         _ms.commands = &mc;
5549         _ms.count = 1;
5550         mc.argv = argv;
5551         mc.argc = argc;
5552         mc.cmd = cmd;
5553     }
5554 
5555     /* Check that all the keys are in the same hash slot, and obtain this
5556      * slot and the node associated. */
5557     for (i = 0; i < ms->count; i++) {
5558         struct redisCommand *mcmd;
5559         robj **margv;
5560         int margc, *keyindex, numkeys, j;
5561 
5562         mcmd = ms->commands[i].cmd;
5563         margc = ms->commands[i].argc;
5564         margv = ms->commands[i].argv;
5565 
5566         keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
5567         for (j = 0; j < numkeys; j++) {
5568             robj *thiskey = margv[keyindex[j]];
5569             int thisslot = keyHashSlot((char*)thiskey->ptr,
5570                                        sdslen(thiskey->ptr));
5571 
5572             if (firstkey == NULL) {
5573                 /* This is the first key we see. Check what is the slot
5574                  * and node. */
5575                 firstkey = thiskey;
5576                 slot = thisslot;
5577                 n = server.cluster->slots[slot];
5578 
5579                 /* Error: If a slot is not served, we are in "cluster down"
5580                  * state. However the state is yet to be updated, so this was
5581                  * not trapped earlier in processCommand(). Report the same
5582                  * error to the client. */
5583                 if (n == NULL) {
5584                     getKeysFreeResult(keyindex);
5585                     if (error_code)
5586                         *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5587                     return NULL;
5588                 }
5589 
5590                 /* If we are migrating or importing this slot, we need to check
5591                  * if we have all the keys in the request (the only way we
5592                  * can safely serve the request, otherwise we return a TRYAGAIN
5593                  * error). To do so we set the importing/migrating state and
5594                  * increment a counter for every missing key. */
5595                 if (n == myself &&
5596                     server.cluster->migrating_slots_to[slot] != NULL)
5597                 {
5598                     migrating_slot = 1;
5599                 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5600                     importing_slot = 1;
5601                 }
5602             } else {
5603                 /* If it is not the first key, make sure it is exactly
5604                  * the same key as the first we saw. */
5605                 if (!equalStringObjects(firstkey,thiskey)) {
5606                     if (slot != thisslot) {
5607                         /* Error: multiple keys from different slots. */
5608                         getKeysFreeResult(keyindex);
5609                         if (error_code)
5610                             *error_code = CLUSTER_REDIR_CROSS_SLOT;
5611                         return NULL;
5612                     } else {
5613                         /* Flag this request as one with multiple different
5614                          * keys. */
5615                         multiple_keys = 1;
5616                     }
5617                 }
5618             }
5619 
5620             /* Migrating / Importing slot? Count keys we don't have. */
5621             if ((migrating_slot || importing_slot) &&
5622                 lookupKeyRead(&server.db[0],thiskey) == NULL)
5623             {
5624                 missing_keys++;
5625             }
5626         }
5627         getKeysFreeResult(keyindex);
5628     }
5629 
5630     /* No key at all in command? then we can serve the request
5631      * without redirections or errors in all the cases. */
5632     if (n == NULL) return myself;
5633 
5634     /* Cluster is globally down but we got keys? We can't serve the request. */
5635     if (server.cluster->state != CLUSTER_OK) {
5636         if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5637         return NULL;
5638     }
5639 
5640     /* Return the hashslot by reference. */
5641     if (hashslot) *hashslot = slot;
5642 
5643     /* MIGRATE always works in the context of the local node if the slot
5644      * is open (migrating or importing state). We need to be able to freely
5645      * move keys among instances in this case. */
5646     if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5647         return myself;
5648 
5649     /* If we don't have all the keys and we are migrating the slot, send
5650      * an ASK redirection. */
5651     if (migrating_slot && missing_keys) {
5652         if (error_code) *error_code = CLUSTER_REDIR_ASK;
5653         return server.cluster->migrating_slots_to[slot];
5654     }
5655 
5656     /* If we are receiving the slot, and the client correctly flagged the
5657      * request as "ASKING", we can serve the request. However if the request
5658      * involves multiple keys and we don't have them all, the only option is
5659      * to send a TRYAGAIN error. */
5660     if (importing_slot &&
5661         (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5662     {
5663         if (multiple_keys && missing_keys) {
5664             if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5665             return NULL;
5666         } else {
5667             return myself;
5668         }
5669     }
5670 
5671     /* Handle the read-only client case reading from a slave: if this
5672      * node is a slave and the request is about an hash slot our master
5673      * is serving, we can reply without redirection. */
5674     if (c->flags & CLIENT_READONLY &&
5675         (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
5676          cmd->proc == evalShaCommand) &&
5677         nodeIsSlave(myself) &&
5678         myself->slaveof == n)
5679     {
5680         return myself;
5681     }
5682 
5683     /* Base case: just return the right node. However if this node is not
5684      * myself, set error_code to MOVED since we need to issue a redirection. */
5685     if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5686     return n;
5687 }
5688 
5689 /* Send the client the right redirection code, according to error_code
5690  * that should be set to one of CLUSTER_REDIR_* macros.
5691  *
5692  * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5693  * are used, then the node 'n' should not be NULL, but should be the
5694  * node we want to mention in the redirection. Moreover hashslot should
5695  * be set to the hash slot that caused the redirection. */
clusterRedirectClient(client * c,clusterNode * n,int hashslot,int error_code)5696 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5697     if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5698         addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5699     } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5700         /* The request spawns multiple keys in the same slot,
5701          * but the slot is not "stable" currently as there is
5702          * a migration or import in progress. */
5703         addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5704     } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5705         addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5706     } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5707         addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5708     } else if (error_code == CLUSTER_REDIR_MOVED ||
5709                error_code == CLUSTER_REDIR_ASK)
5710     {
5711         addReplySds(c,sdscatprintf(sdsempty(),
5712             "-%s %d %s:%d\r\n",
5713             (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5714             hashslot,n->ip,n->port));
5715     } else {
5716         serverPanic("getNodeByQuery() unknown error.");
5717     }
5718 }
5719 
5720 /* This function is called by the function processing clients incrementally
5721  * to detect timeouts, in order to handle the following case:
5722  *
5723  * 1) A client blocks with BLPOP or similar blocking operation.
5724  * 2) The master migrates the hash slot elsewhere or turns into a slave.
5725  * 3) The client may remain blocked forever (or up to the max timeout time)
5726  *    waiting for a key change that will never happen.
5727  *
5728  * If the client is found to be blocked into an hash slot this node no
5729  * longer handles, the client is sent a redirection error, and the function
5730  * returns 1. Otherwise 0 is returned and no operation is performed. */
clusterRedirectBlockedClientIfNeeded(client * c)5731 int clusterRedirectBlockedClientIfNeeded(client *c) {
5732     if (c->flags & CLIENT_BLOCKED &&
5733         (c->btype == BLOCKED_LIST ||
5734          c->btype == BLOCKED_ZSET ||
5735          c->btype == BLOCKED_STREAM))
5736     {
5737         dictEntry *de;
5738         dictIterator *di;
5739 
5740         /* If the cluster is down, unblock the client with the right error. */
5741         if (server.cluster->state == CLUSTER_FAIL) {
5742             clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5743             return 1;
5744         }
5745 
5746         /* All keys must belong to the same slot, so check first key only. */
5747         di = dictGetIterator(c->bpop.keys);
5748         if ((de = dictNext(di)) != NULL) {
5749             robj *key = dictGetKey(de);
5750             int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5751             clusterNode *node = server.cluster->slots[slot];
5752 
5753             /* if the client is read-only and attempting to access key that our
5754              * replica can handle, allow it. */
5755             if ((c->flags & CLIENT_READONLY) &&
5756                 (c->lastcmd->flags & CMD_READONLY) &&
5757                 nodeIsSlave(myself) && myself->slaveof == node)
5758             {
5759                 node = myself;
5760             }
5761 
5762             /* We send an error and unblock the client if:
5763              * 1) The slot is unassigned, emitting a cluster down error.
5764              * 2) The slot is not handled by this node, nor being imported. */
5765             if (node != myself &&
5766                 server.cluster->importing_slots_from[slot] == NULL)
5767             {
5768                 if (node == NULL) {
5769                     clusterRedirectClient(c,NULL,0,
5770                         CLUSTER_REDIR_DOWN_UNBOUND);
5771                 } else {
5772                     clusterRedirectClient(c,node,slot,
5773                         CLUSTER_REDIR_MOVED);
5774                 }
5775                 dictReleaseIterator(di);
5776                 return 1;
5777             }
5778         }
5779         dictReleaseIterator(di);
5780     }
5781     return 0;
5782 }
5783