/* Redis Cluster implementation.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "cluster.h"
#include "endianconv.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>

/* A global reference to myself is handy to make code more clear.
 * Myself always points to server.cluster->myself, that is, the clusterNode
 * that represents this node. */
clusterNode *myself = NULL;

clusterNode *createClusterNode(char *nodename, int flags);
void clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(connection *conn);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter, int use_pport);
clusterNode *clusterLookupNode(const char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
const char *clusterGetMessageTypeString(int type);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);

/* Links to the next and previous entries for keys in the same slot are stored
 * in the dict entry metadata. See Slot to Key API below. */
#define dictEntryNextInSlot(de) \
    (((clusterDictEntryMetadata *)dictMetadata(de))->next)
#define dictEntryPrevInSlot(de) \
    (((clusterDictEntryMetadata *)dictMetadata(de))->prev)

#define RCVBUF_INIT_LEN 1024
#define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */

/* Cluster nodes hash table, mapping node names (IDs) to
 * clusterNode structures. */
dictType clusterNodesDictType = {
        dictSdsHash,                /* hash function */
        NULL,                       /* key dup */
        NULL,                       /* val dup */
        dictSdsKeyCompare,          /* key compare */
        dictSdsDestructor,          /* key destructor */
        NULL,                       /* val destructor */
        NULL                        /* allow to expand */
};

/* Cluster re-addition blacklist. This maps node IDs to the time
 * we can re-add this node. The goal is to avoid re-adding a removed
 * node for some time. */
dictType clusterNodesBlackListDictType = {
        dictSdsCaseHash,            /* hash function */
        NULL,                       /* key dup */
        NULL,                       /* val dup */
        dictSdsKeyCaseCompare,      /* key compare */
        dictSdsDestructor,          /* key destructor */
        NULL,                       /* val destructor */
        NULL                        /* allow to expand */
};

/* -----------------------------------------------------------------------------
 * Initialization
 * -------------------------------------------------------------------------- */

/* Load the cluster config from 'filename'.
 *
 * If the file does not exist or is zero-length (this may happen because
 * when we lock the nodes.conf file, we create a zero-length one for the
 * sake of locking if it does not already exist), C_ERR is returned.
 * If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {
    FILE *fp = fopen(filename,"r");
    struct stat sb;
    char *line;
    int maxline, j;

    if (fp == NULL) {
        if (errno == ENOENT) {
            return C_ERR;
        } else {
            serverLog(LL_WARNING,
                "Loading the cluster node config from %s: %s",
                filename, strerror(errno));
            exit(1);
        }
    }

    if (redis_fstat(fileno(fp),&sb) == -1) {
        serverLog(LL_WARNING,
            "Unable to obtain the cluster node config file stat %s: %s",
            filename, strerror(errno));
        exit(1);
    }
    /* Check if the file is zero-length: if so return C_ERR to signal
     * we have to write the config. */
    if (sb.st_size == 0) {
        fclose(fp);
        return C_ERR;
    }

    /* Parse the file. Note that single lines of the cluster config file can
     * be really long as they include all the hash slots of the node.
     * This means that in the worst possible case, half of the Redis slots
     * will be present in a single line, possibly in importing or migrating
     * state, together with the node ID of the sender/receiver.
     *
     * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
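    /* For illustration only (a schematic, not a verbatim dump), a regular
     * line parsed below has the same layout as a CLUSTER NODES output line:
     *
     *   <id> <ip:port@cport> <flags> <master-id|-> <ping-sent> <pong-recv>
     *       <config-epoch> <link-state> <slot|start-stop> ...
     *
     * where a "[slot->-nodeid]" entry marks a migrating slot and a
     * "[slot-<-nodeid]" entry an importing one, per the parsing below. */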
    maxline = 1024+CLUSTER_SLOTS*128;
    line = zmalloc(maxline);
    while(fgets(line,maxline,fp) != NULL) {
        int argc;
        sds *argv;
        clusterNode *n, *master;
        char *p, *s;

        /* Skip blank lines, they can be created either by users manually
         * editing nodes.conf or by the config writing process if stopped
         * before the truncate() call. */
        if (line[0] == '\n' || line[0] == '\0') continue;

        /* Split the line into arguments for processing. */
        argv = sdssplitargs(line,&argc);
        if (argv == NULL) goto fmterr;

        /* Handle the special "vars" line. Don't pretend it is the last
         * line even if it actually is when generated by Redis. */
        if (strcasecmp(argv[0],"vars") == 0) {
            if (!(argc % 2)) goto fmterr;
            for (j = 1; j < argc; j += 2) {
                if (strcasecmp(argv[j],"currentEpoch") == 0) {
                    server.cluster->currentEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
                    server.cluster->lastVoteEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else {
                    serverLog(LL_WARNING,
                        "Skipping unknown cluster config variable '%s'",
                        argv[j]);
                }
            }
            sdsfreesplitres(argv,argc);
            continue;
        }

        /* Regular config lines have at least eight fields */
        if (argc < 8) {
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }

        /* Create this node if it does not exist */
        n = clusterLookupNode(argv[0]);
        if (!n) {
            n = createClusterNode(argv[0],0);
            clusterAddNode(n);
        }
        /* Address and port */
        if ((p = strrchr(argv[1],':')) == NULL) {
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }
        *p = '\0';
        memcpy(n->ip,argv[1],strlen(argv[1])+1);
        char *port = p+1;
        char *busp = strchr(port,'@');
        if (busp) {
            *busp = '\0';
            busp++;
        }
        n->port = atoi(port);
        /* In older versions of nodes.conf the "@busport" part is missing.
         * In this case we set it to the default offset of 10000 from the
         * base port. */
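        /* For example (illustrative): an old-style "127.0.0.1:6379" address
         * with no "@busport" part yields cport 16379, given the default
         * CLUSTER_PORT_INCR of 10000. */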
        n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

        /* The plaintext port for client in a TLS cluster (n->pport) is not
         * stored in nodes.conf. It is received later over the bus protocol. */

        /* Parse flags */
        p = s = argv[2];
        while(p) {
            p = strchr(s,',');
            if (p) *p = '\0';
            if (!strcasecmp(s,"myself")) {
                serverAssert(server.cluster->myself == NULL);
                myself = server.cluster->myself = n;
                n->flags |= CLUSTER_NODE_MYSELF;
            } else if (!strcasecmp(s,"master")) {
                n->flags |= CLUSTER_NODE_MASTER;
            } else if (!strcasecmp(s,"slave")) {
                n->flags |= CLUSTER_NODE_SLAVE;
            } else if (!strcasecmp(s,"fail?")) {
                n->flags |= CLUSTER_NODE_PFAIL;
            } else if (!strcasecmp(s,"fail")) {
                n->flags |= CLUSTER_NODE_FAIL;
                n->fail_time = mstime();
            } else if (!strcasecmp(s,"handshake")) {
                n->flags |= CLUSTER_NODE_HANDSHAKE;
            } else if (!strcasecmp(s,"noaddr")) {
                n->flags |= CLUSTER_NODE_NOADDR;
            } else if (!strcasecmp(s,"nofailover")) {
                n->flags |= CLUSTER_NODE_NOFAILOVER;
            } else if (!strcasecmp(s,"noflags")) {
                /* nothing to do */
            } else {
                serverPanic("Unknown flag in redis cluster config file");
            }
            if (p) s = p+1;
        }

        /* Get master if any. Set the master and populate master's
         * slave list. */
        if (argv[3][0] != '-') {
            master = clusterLookupNode(argv[3]);
            if (!master) {
                master = createClusterNode(argv[3],0);
                clusterAddNode(master);
            }
            n->slaveof = master;
            clusterNodeAddSlave(master,n);
        }

        /* Set ping sent / pong received timestamps */
        if (atoi(argv[4])) n->ping_sent = mstime();
        if (atoi(argv[5])) n->pong_received = mstime();

        /* Set configEpoch for this node. */
        n->configEpoch = strtoull(argv[6],NULL,10);

        /* Populate hash slots served by this instance. */
        for (j = 8; j < argc; j++) {
            int start, stop;

            if (argv[j][0] == '[') {
                /* Here we handle migrating / importing slots */
                int slot;
                char direction;
                clusterNode *cn;

                p = strchr(argv[j],'-');
                serverAssert(p != NULL);
                *p = '\0';
                direction = p[1]; /* Either '>' or '<' */
                slot = atoi(argv[j]+1);
                if (slot < 0 || slot >= CLUSTER_SLOTS) {
                    sdsfreesplitres(argv,argc);
                    goto fmterr;
                }
                p += 3;
                cn = clusterLookupNode(p);
                if (!cn) {
                    cn = createClusterNode(p,0);
                    clusterAddNode(cn);
                }
                if (direction == '>') {
                    server.cluster->migrating_slots_to[slot] = cn;
                } else {
                    server.cluster->importing_slots_from[slot] = cn;
                }
                continue;
            } else if ((p = strchr(argv[j],'-')) != NULL) {
                *p = '\0';
                start = atoi(argv[j]);
                stop = atoi(p+1);
            } else {
                start = stop = atoi(argv[j]);
            }
            if (start < 0 || start >= CLUSTER_SLOTS ||
                stop < 0 || stop >= CLUSTER_SLOTS)
            {
                sdsfreesplitres(argv,argc);
                goto fmterr;
            }
            while(start <= stop) clusterAddSlot(n, start++);
        }

        sdsfreesplitres(argv,argc);
    }
    /* Config sanity check */
    if (server.cluster->myself == NULL) goto fmterr;

    zfree(line);
    fclose(fp);

    serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);

    /* Something that should never happen: currentEpoch smaller than
     * the max epoch found in the nodes configuration. However we handle this
     * as some form of protection against manual editing of critical files. */
    if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
        server.cluster->currentEpoch = clusterGetMaxEpoch();
    }
    return C_OK;

fmterr:
    serverLog(LL_WARNING,
        "Unrecoverable error: corrupted cluster config file.");
    zfree(line);
    if (fp) fclose(fp);
    exit(1);
}

/* Cluster node configuration is exactly the same as CLUSTER NODES output.
 *
 * This function writes the node config and returns 0; on error -1
 * is returned.
 *
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end with either the old file or the
 * new one. Since we have the full payload to write available we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger we pad our payload with newlines that are anyway ignored and truncate
 * the file afterward. */
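/* Concretely (illustrative numbers): if the pre-existing file is 200 bytes
 * and the new payload is 150 bytes, we pad the payload with 50 newlines,
 * write all 200 bytes with a single write(), and finally ftruncate() the
 * file back to 150 bytes. */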
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch. */
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch);
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err;

    if (redis_fstat(fd,&sb) == -1) goto err;

    /* Pad the new payload if the existing file length is greater. */
    if (sb.st_size > (off_t)content_size) {
        ci = sdsgrowzero(ci,sb.st_size);
        memset(ci+content_size,'\n',sb.st_size-content_size);
    }

    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        if (fsync(fd) == -1) goto err;
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd);
    sdsfree(ci);
    return -1;
}

void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) == -1) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
}

/* Lock the cluster config using flock(), and leak the file descriptor used to
 * acquire the lock so that the file stays locked forever.
 *
 * This works because we always update nodes.conf in-place: we reopen the
 * file and overwrite it with a new version (later adjusting the length
 * with ftruncate()).
 *
 * On success C_OK is returned, otherwise an error is logged and
 * the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris
 * and a fcntl-based solution won't help, as we constantly re-open that file,
 * which will release _all_ locks anyway
 */
#if !defined(__sun)
    /* To lock it, we need to open the file in a way it is created if
     * it does not exist, otherwise there is a race condition with other
     * processes. */
    int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);
    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
        if (errno == EWOULDBLOCK) {
            serverLog(LL_WARNING,
                 "Sorry, the cluster configuration file %s is already used "
                 "by a different Redis Cluster node. Please make sure that "
                 "different nodes use different cluster configuration "
                 "files.", filename);
        } else {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        }
        close(fd);
        return C_ERR;
    }
    /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
     * lock to the file as long as the process exists.
     *
     * After fork, the child process inherits the fd opened by the parent
     * process. We need to save 'fd' in 'cluster_config_file_lock_fd' so that
     * redisFork() can close it in the child process. If it is not closed and
     * the main process is killed with -9 while a child process
     * (redis-aof-rewrite) is still alive, the lock fd will still be held by
     * the child, and a restarted main process will fail to acquire the lock,
     * i.e. fail to start. */
    server.cluster_config_file_lock_fd = fd;
#else
    UNUSED(filename);
#endif /* __sun */

    return C_OK;
}

/* Derives our ports to be announced in the cluster bus. */
void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
                          int *announced_cport) {
    int port = server.tls_cluster ? server.tls_port : server.port;
    /* Default announced ports. */
    *announced_port = port;
    *announced_pport = server.tls_cluster ? server.port : 0;
    *announced_cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;

    /* Config overriding announced ports. */
    if (server.tls_cluster && server.cluster_announce_tls_port) {
        *announced_port = server.cluster_announce_tls_port;
        *announced_pport = server.cluster_announce_port;
    } else if (server.cluster_announce_port) {
        *announced_port = server.cluster_announce_port;
    }
    if (server.cluster_announce_bus_port) {
        *announced_cport = server.cluster_announce_bus_port;
    }
}
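
/* For example (illustrative values): with "port 6379", no TLS, no
 * cluster-port and no announce overrides configured, the function above
 * yields announced_port=6379, announced_pport=0 and announced_cport=16379. */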

/* Some flags (currently just the NOFAILOVER flag) may need to be updated
 * in the "myself" node based on the current configuration of the node,
 * that may change at runtime via CONFIG SET. This function changes the
 * set of flags in myself->flags accordingly. */
void clusterUpdateMyselfFlags(void) {
    if (!myself) return;
    int oldflags = myself->flags;
    int nofailover = server.cluster_slave_no_failover ?
                     CLUSTER_NODE_NOFAILOVER : 0;
    myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
    myself->flags |= nofailover;
    if (myself->flags != oldflags) {
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE);
    }
}


/* We want to keep myself->ip in sync with the cluster-announce-ip option.
 * The option can be set at runtime via CONFIG SET. */
void clusterUpdateMyselfIp(void) {
    if (!myself) return;
    static char *prev_ip = NULL;
    char *curr_ip = server.cluster_announce_ip;
    int changed = 0;

    if (prev_ip == NULL && curr_ip != NULL) changed = 1;
    else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
    else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

    if (changed) {
        if (prev_ip) zfree(prev_ip);
        prev_ip = curr_ip;

        if (curr_ip) {
            /* Take a copy of the announced IP by duplicating the string,
             * so that the next call can check whether the address really
             * changed. */
            prev_ip = zstrdup(prev_ip);
            strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN-1);
            myself->ip[NET_IP_STR_LEN-1] = '\0';
        } else {
            myself->ip[0] = '\0'; /* Force autodetection. */
        }
    }
}

void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
    server.cluster->lastVoteEpoch = 0;
    for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
        server.cluster->stats_bus_messages_sent[i] = 0;
        server.cluster->stats_bus_messages_received[i] = 0;
    }
    server.cluster->stats_pfail_nodes = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */
    server.cluster_config_file_lock_fd = -1;
    if (clusterLockConfig(server.cluster_configfile) == C_ERR)
        exit(1);

    /* Load or create a new nodes configuration. */
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    server.cfd.count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    int port = server.tls_cluster ? server.tls_port : server.port;
    if (!server.cluster_port && port > (65535-CLUSTER_PORT_INCR)) {
        serverLog(LL_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be 55535 or less.");
        exit(1);
    }
    if (!server.bindaddr_count) {
        serverLog(LL_WARNING, "No bind address is configured, but it is required for the Cluster bus.");
        exit(1);
    }
    int cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
    if (listenToPort(cport, &server.cfd) == C_ERR ) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", cport);
        exit(1);
    }

    if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {
        serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
    }

    /* Initialize data for the Slot to key API. */
    slotToKeyInit(server.db);

    /* Set myself->port/cport/pport to my listening ports; we'll just need to
     * discover the IP address via MEET messages. */
    deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);

    server.cluster->mf_end = 0;
    server.cluster->mf_slave = NULL;
    resetManualFailover();
    clusterUpdateMyselfFlags();
    clusterUpdateMyselfIp();
}

/* Reset a node performing a soft or hard reset:
 *
 * 1) All other nodes are forgotten.
 * 2) All the assigned / open slots are released.
 * 3) If the node is a slave, it turns into a master.
 * 4) Only for hard reset: a new Node ID is generated.
 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
 * 6) The new configuration is saved and the cluster state updated.
 * 7) If the node was a slave, the whole data set is flushed away. */
void clusterReset(int hard) {
    dictIterator *di;
    dictEntry *de;
    int j;

    /* Turn into master. */
    if (nodeIsSlave(myself)) {
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();
        emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
    }

    /* Close slots, reset manual failover state. */
    clusterCloseAllSlots();
    resetManualFailover();

    /* Unassign all the slots. */
    for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);

    /* Forget all the nodes, but myself. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == myself) continue;
        clusterDelNode(node);
    }
    dictReleaseIterator(di);

    /* Hard reset only: set epochs to 0, change node ID. */
    if (hard) {
        sds oldname;

        server.cluster->currentEpoch = 0;
        server.cluster->lastVoteEpoch = 0;
        myself->configEpoch = 0;
        serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

        /* To change the Node ID we need to remove the old name from the
         * nodes table, change the ID, and re-add back with new name. */
        oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
        dictDelete(server.cluster->nodes,oldname);
        sdsfree(oldname);
        getRandomHexChars(myself->name, CLUSTER_NAMELEN);
        clusterAddNode(myself);
        serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
    }

    /* Make sure to persist the new config and update the state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE|
                         CLUSTER_TODO_FSYNC_CONFIG);
}

/* -----------------------------------------------------------------------------
 * CLUSTER communication link
 * -------------------------------------------------------------------------- */

clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *link = zmalloc(sizeof(*link));
    link->ctime = mstime();
    link->sndbuf = sdsempty();
    link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
    link->rcvbuf_len = 0;
    link->node = node;
    link->conn = NULL;
    return link;
}

/* Free a cluster link, but do not free the associated node, of course.
 * This function will just make sure that the original node associated
 * with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
    if (link->conn) {
        connClose(link->conn);
        link->conn = NULL;
    }
    sdsfree(link->sndbuf);
    zfree(link->rcvbuf);
    if (link->node)
        link->node->link = NULL;
    zfree(link);
}

static void clusterConnAcceptHandler(connection *conn) {
    clusterLink *link;

    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE,
                "Error accepting cluster node connection: %s", connGetLastError(conn));
        connClose(conn);
        return;
    }

    /* Create a link object we use to handle the connection.
     * It gets passed to the readable handler when data is available.
     * Initially the link->node pointer is set to NULL as we don't know
     * which node it is, but the right node is referenced once we know the
     * node identity. */
    link = createClusterLink(NULL);
    link->conn = conn;
    connSetPrivateData(conn, link);

    /* Register read handler */
    connSetReadHandler(conn, clusterReadHandler);
}

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }

        connection *conn = server.tls_cluster ?
            connCreateAcceptedTLS(cfd, TLS_CLIENT_AUTH_YES) : connCreateAcceptedSocket(cfd);

        /* Make sure connection is not in an error state */
        if (connGetState(conn) != CONN_STATE_ACCEPTING) {
            serverLog(LL_VERBOSE,
                "Error creating an accepting connection for cluster node: %s",
                    connGetLastError(conn));
            connClose(conn);
            return;
        }
        connEnableTcpNoDelay(conn);
        connKeepAlive(conn,server.cluster_node_timeout * 2);

        /* Use non-blocking I/O for cluster messages. */
        serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);

        /* Accept the connection now.  connAccept() may call our handler directly
         * or schedule it for later depending on connection implementation.
         */
        if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) {
            if (connGetState(conn) == CONN_STATE_ERROR)
                serverLog(LL_VERBOSE,
                        "Error accepting cluster node connection: %s",
                        connGetLastError(conn));
            connClose(conn);
            return;
        }
    }
}

/* Return the approximate number of sockets we are using for the
 * cluster bus connections. */
unsigned long getClusterConnectionsCount(void) {
    /* We decrement the number of nodes by one, since there is the
     * "myself" node too in the list. Each node uses two file descriptors,
     * one incoming and one outgoing, thus the multiplication by 2. */
    return server.cluster_enabled ?
           ((dictSize(server.cluster->nodes)-1)*2) : 0;
}
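
/* E.g. (illustrative): in a six-node cluster the nodes dict includes myself,
 * so the estimate above is (6-1)*2 = 10 file descriptors. */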

/* -----------------------------------------------------------------------------
 * Key space handling
 * -------------------------------------------------------------------------- */

/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
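
/* Illustrative examples of the hash tag rules implemented above, using
 * hypothetical keys:
 *
 *   keyHashSlot("foo",3) hashes the whole key (no '{' present).
 *   "{user1000}.following" and "{user1000}.followers" hash only "user1000",
 *   so both keys map to the same slot.
 *   "foo{}{bar}" has nothing between the first {} pair, so the whole key
 *   is hashed. */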

/* -----------------------------------------------------------------------------
 * CLUSTER node API
 * -------------------------------------------------------------------------- */

/* Create a new cluster node, with the specified flags.
 * If "nodename" is NULL this is considered a first handshake and a random
 * node name is assigned to this node (it will be fixed later when we
 * receive the first pong).
 *
 * The node is created and returned to the user, but it is not automatically
 * added to the nodes hash table. */
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));

    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime();
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->slots_info = NULL;
    node->numslots = 0;
    node->numslaves = 0;
    node->slaves = NULL;
    node->slaveof = NULL;
    node->ping_sent = node->pong_received = 0;
    node->data_received = 0;
    node->fail_time = 0;
    node->link = NULL;
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->cport = 0;
    node->pport = 0;
    node->fail_reports = listCreate();
    node->voted_time = 0;
    node->orphaned_time = 0;
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

/* This function is called every time we get a failure report from a node.
 * The side effect is to populate the fail_reports list (or to update
 * the timestamp of an existing report).
 *
 * 'failing' is the node that is in failure state according to the
 * 'sender' node.
 *
 * The function returns 0 if it just updates a timestamp of an existing
 * failure report from the same sender. 1 is returned if a new failure
 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* If a failure report from the same sender already exists, just update
     * the timestamp. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }

    /* Otherwise create a new report. */
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}

/* Remove failure reports that are too old, where too old means reasonably
 * older than the global node timeout. Note that, in any case, for a node to
 * be flagged as FAIL we need to have a local PFAIL state that is at least
 * older than the global node timeout, so we don't just trust the number
 * of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    mstime_t maxtime = server.cluster_node_timeout *
                     CLUSTER_FAIL_REPORT_VALIDITY_MULT;
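    /* Example with hypothetical values: if cluster-node-timeout is 15000 ms
     * and the validity multiplier is 2, maxtime is 30000 ms, so reports
     * older than 30 seconds are dropped below. */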
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}

/* Remove the failing report for 'node' if it was previously considered
 * failing by 'sender'. This function is called when a node informs us via
 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often as it gets called even
 * when there are no nodes failing, and is O(N), however when the cluster is
 * fine the failure reports list is empty so the function runs in constant
 * time.
 *
 * The function returns 1 if the failure report was found and removed.
 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* Search for a failure report from this sender. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) break;
    }
    if (!ln) return 0; /* No failure report from this sender. */

    /* Remove the failure report. */
    listDelNode(l,ln);
    clusterNodeCleanupFailureReports(node);
    return 1;
}

/* Return the number of external nodes that believe 'node' is failing,
 * not including this node itself, which may also have a PFAIL or FAIL
 * state for this node. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}

int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int j;

    for (j = 0; j < master->numslaves; j++) {
        if (master->slaves[j] == slave) {
            if ((j+1) < master->numslaves) {
                int remaining_slaves = (master->numslaves - j) - 1;
                memmove(master->slaves+j,master->slaves+(j+1),
                        (sizeof(*master->slaves) * remaining_slaves));
            }
            master->numslaves--;
            if (master->numslaves == 0)
                master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
            return C_OK;
        }
    }
    return C_ERR;
}

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}

int clusterCountNonFailingSlaves(clusterNode *n) {
    int j, okslaves = 0;

    for (j = 0; j < n->numslaves; j++)
        if (!nodeFailed(n->slaves[j])) okslaves++;
    return okslaves;
}

/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) {
    sds nodename;
    int j;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (j = 0; j < n->numslaves; j++)
        n->slaves[j]->slaveof = NULL;

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes. */
    nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);

    /* Release link and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}

/* Add a node to the nodes hash table */
void clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    serverAssert(retval == DICT_OK);
}

/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 *
 * 1) Mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
 */
void clusterDelNode(clusterNode *delnode) {
    int j;
    dictIterator *di;
    dictEntry *de;

    /* 1) Mark slots as unassigned. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (server.cluster->importing_slots_from[j] == delnode)
            server.cluster->importing_slots_from[j] = NULL;
        if (server.cluster->migrating_slots_to[j] == delnode)
            server.cluster->migrating_slots_to[j] = NULL;
        if (server.cluster->slots[j] == delnode)
            clusterDelSlot(j);
    }

    /* 2) Remove failure reports. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == delnode) continue;
        clusterNodeDelFailureReport(node,delnode);
    }
    dictReleaseIterator(di);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}

/* Node lookup by name */
clusterNode *clusterLookupNode(const char *name) {
    sds s = sdsnewlen(name, CLUSTER_NAMELEN);
    dictEntry *de;

    de = dictFind(server.cluster->nodes,s);
    sdsfree(s);
    if (de == NULL) return NULL;
    return dictGetVal(de);
}

/* This is only used after the handshake. When we connect a given IP/PORT
 * as a result of CLUSTER MEET we don't have the node name yet, so we
 * pick a random one, and will fix it when we receive the PONG request using
 * this function. */
void clusterRenameNode(clusterNode *node, char *newname) {
    int retval;
    sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);
    retval = dictDelete(server.cluster->nodes, s);
    sdsfree(s);
    serverAssert(retval == DICT_OK);
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}

/* -----------------------------------------------------------------------------
 * CLUSTER config epoch handling
 * -------------------------------------------------------------------------- */

/* Return the greatest configEpoch found in the cluster, or the current
 * epoch if greater than any node configEpoch. */
uint64_t clusterGetMaxEpoch(void) {
    uint64_t max = 0;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->configEpoch > max) max = node->configEpoch;
    }
    dictReleaseIterator(di);
    if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
    return max;
}

/* If this node's epoch is zero, or is not already the greatest across the
 * cluster (from the POV of the local configuration), this function will:
 *
 * 1) Generate a new config epoch, incrementing the current epoch.
 * 2) Assign the new epoch to this node, WITHOUT any consensus.
 * 3) Persist the configuration on disk before sending packets with the
 *    new configuration.
 *
 * If the new config epoch is generated and assigned, C_OK is returned,
 * otherwise C_ERR is returned (since the node already has the greatest
 * configuration around) and no operation is performed.
 *
 * Important note: this function violates the principle that config epochs
 * should be generated with consensus and should be unique across the cluster.
 * However Redis Cluster uses these auto-generated config epochs in two
 * cases:
 *
 * 1) When slots are closed after importing. Otherwise resharding would be
 *    too expensive.
 * 2) When CLUSTER FAILOVER is called with options that force a slave to
 *    failover its master even if there is no master majority able to
 *    create a new configuration epoch.
 *
 * Redis Cluster will not explode using this function, even in the case of
 * a collision between this node and another node, generating the same
 * configuration epoch unilaterally, because the config epoch conflict
 * resolution algorithm will eventually move colliding nodes to different
 * config epochs. However using this function may violate the "last failover
 * wins" rule, so should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    if (myself->configEpoch == 0 ||
        myself->configEpoch != maxEpoch)
    {
        server.cluster->currentEpoch++;
        myself->configEpoch = server.cluster->currentEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);
        serverLog(LL_WARNING,
            "New configEpoch set to %llu",
            (unsigned long long) myself->configEpoch);
        return C_OK;
    } else {
        return C_ERR;
    }
}

/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 *
 * BACKGROUND
 *
 * It is not possible for different slaves to get the same config
 * epoch during a failover election, because the slaves need to get voted
 * by a majority. However when we perform a manual resharding of the cluster
 * the node will assign a configuration epoch to itself without asking
 * for agreement. Usually resharding happens when the cluster is working well
 * and is supervised by the sysadmin, however it is possible for a failover
 * to happen exactly while the node we are resharding a slot to assigns itself
 * a new configuration epoch, but before it is able to propagate it.
 *
 * So technically it is possible in this condition that two nodes end with
 * the same configuration epoch.
 *
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 *
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end up with a different configEpoch at startup.
 *
 * In all the cases, we want a mechanism that resolves this issue automatically
 * as a safeguard. The same configuration epoch for masters serving different
 * sets of slots is not harmful, but it is if the nodes end up serving the
 * same slots for some reason (manual errors or software bugs) without a
 * proper failover procedure.
 *
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 *
 * BEHAVIOR
 *
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it will assign itself
 * the greatest configuration epoch currently detected among nodes plus 1.
 *
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end with a different configuration epoch.
 */
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available, to the best of this node's knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
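
/* For illustration (hypothetical IDs): if masters A and B collide on
 * configEpoch 5 and A's Node ID sorts lexicographically smaller, only A
 * gets past the memcmp() check above when processing B's packets, and bumps
 * itself to the next currentEpoch (e.g. 6), while B stays at 5. */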

/* -----------------------------------------------------------------------------
 * CLUSTER nodes blacklist
 *
 * The nodes blacklist is just a way to ensure that a given node with a given
 * Node ID is not re-added before some time elapsed (this time is specified
 * in seconds in CLUSTER_BLACKLIST_TTL).
 *
 * This is useful when we want to remove a node from the cluster completely:
 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
 * that even if we receive gossip messages from other nodes that still remember
 * about the node we want to remove, we don't re-add it before some time.
 *
 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute; this means
 * that redis-cli has 60 seconds to send CLUSTER FORGET messages to nodes
 * in the cluster without dealing with the problem of other nodes re-adding
 * the node to nodes we already sent the FORGET command to.
 *
 * The data structure used is a hash table with an sds string representing
 * the node ID as key, and the time when it is ok to re-add the node as
 * value.
 * -------------------------------------------------------------------------- */

#define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */


/* Before the addNode() or Exists() operations we always remove expired
 * entries from the black list. This is an O(N) operation but it is not a
 * problem since add / exists operations are called very infrequently and
 * the hash table is supposed to contain very few elements at most.
 * However, without the cleanup, during long uptimes and with some automated
 * node add/removal procedures, entries could accumulate. */
void clusterBlacklistCleanup(void) {
1297     dictIterator *di;
1298     dictEntry *de;
1299 
1300     di = dictGetSafeIterator(server.cluster->nodes_black_list);
1301     while((de = dictNext(di)) != NULL) {
1302         int64_t expire = dictGetUnsignedIntegerVal(de);
1303 
1304         if (expire < server.unixtime)
1305             dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1306     }
1307     dictReleaseIterator(di);
1308 }
1309 
1310 /* Cleanup the blacklist and add a new node ID to the black list. */
1311 void clusterBlacklistAddNode(clusterNode *node) {
1312     dictEntry *de;
1313     sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1314 
1315     clusterBlacklistCleanup();
1316     if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
1317         /* If the key was added, the dict now owns our sds string, so
1318          * duplicate it for the lookup below. We'll free the duplicate at the end. */
1319         id = sdsdup(id);
1320     }
1321     de = dictFind(server.cluster->nodes_black_list,id);
1322     dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
1323     sdsfree(id);
1324 }
1325 
1326 /* Return non-zero if the specified node ID exists in the blacklist.
1327  * You don't need to pass an sds string here, any pointer to 40 bytes
1328  * will work. */
1329 int clusterBlacklistExists(char *nodeid) {
1330     sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
1331     int retval;
1332 
1333     clusterBlacklistCleanup();
1334     retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
1335     sdsfree(id);
1336     return retval;
1337 }
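
/* Usage sketch (editorial note, hedged): the expected interplay of the
 * helpers above, roughly as the CLUSTER FORGET path uses them elsewhere in
 * this file; the actual command implementation may differ in detail.
 *
 *     clusterBlacklistAddNode(n);              // ban this node ID for the TTL
 *     clusterDelNode(n);                       // drop it from our tables
 *     ...
 *     // later, in the gossip path, before re-adding an unknown node:
 *     if (clusterBlacklistExists(g->nodename)) continue;
 */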
1338 
1339 /* -----------------------------------------------------------------------------
1340  * CLUSTER messages exchange - PING/PONG and gossip
1341  * -------------------------------------------------------------------------- */
1342 
1343 /* This function checks if a given node should be marked as FAIL.
1344  * It happens if the following conditions are met:
1345  *
1346  * 1) We received enough failure reports from other master nodes via gossip.
1347  *    Enough means that the majority of the masters signaled the node is
1348  *    down recently.
1349  * 2) We believe this node is in PFAIL state.
1350  *
1351  * If a failure is detected we also inform the whole cluster about this
1352  * event trying to force every other node to set the FAIL flag for the node.
1353  *
1354  * Note that the form of agreement used here is weak, as we collect the
1355  * state of the majority of masters over a window of time, and even if we
1356  * force agreement by propagating the FAIL message, because of partitions
1357  * node. However:
1358  *
1359  * 1) Either we reach the majority and eventually the FAIL state will propagate
1360  *    to all the cluster.
1361  * 2) Or there is no majority so no slave promotion will be authorized and the
1362  *    FAIL flag will be cleared after some time.
1363  */
1364 void markNodeAsFailingIfNeeded(clusterNode *node) {
1365     int failures;
1366     int needed_quorum = (server.cluster->size / 2) + 1;
1367 
1368     if (!nodeTimedOut(node)) return; /* We can reach it. */
1369     if (nodeFailed(node)) return; /* Already FAILing. */
1370 
1371     failures = clusterNodeFailureReportsCount(node);
1372     /* Also count myself as a voter if I'm a master. */
1373     if (nodeIsMaster(myself)) failures++;
1374     if (failures < needed_quorum) return; /* No weak agreement from masters. */
1375 
1376     serverLog(LL_NOTICE,
1377         "Marking node %.40s as failing (quorum reached).", node->name);
1378 
1379     /* Mark the node as failing. */
1380     node->flags &= ~CLUSTER_NODE_PFAIL;
1381     node->flags |= CLUSTER_NODE_FAIL;
1382     node->fail_time = mstime();
1383 
1384     /* Broadcast the failing node name to everybody, forcing all the other
1385      * reachable nodes to flag the node as FAIL.
1386      * We do that even if this node is a replica and not a master: anyway
1387      * the failing state is triggered by collecting failure reports from
1388      * masters, so here the replica is only helping to propagate the status. */
1389     clusterSendFail(node->name);
1390     clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1391 }
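
/* Quorum arithmetic, worked example (editorial note): with
 * server.cluster->size == 5 masters, needed_quorum above is 5/2 + 1 == 3.
 * A master therefore flips a node to FAIL after collecting two failure
 * reports from other masters, since its own PFAIL observation is counted
 * as the third vote; a replica, which cannot count itself, needs three
 * reports. */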
1392 
1393 /* This function is called only if a node is marked as FAIL, but we are able
1394  * to reach it again. It checks whether the conditions exist to undo the
1395  * FAIL state. */
1396 void clearNodeFailureIfNeeded(clusterNode *node) {
1397     mstime_t now = mstime();
1398 
1399     serverAssert(nodeFailed(node));
1400 
1401     /* For slaves we always clear the FAIL flag if we can contact the
1402      * node again. */
1403     if (nodeIsSlave(node) || node->numslots == 0) {
1404         serverLog(LL_NOTICE,
1405             "Clear FAIL state for node %.40s: %s is reachable again.",
1406                 node->name,
1407                 nodeIsSlave(node) ? "replica" : "master without slots");
1408         node->flags &= ~CLUSTER_NODE_FAIL;
1409         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1410     }
1411 
1412     /* If it is a master and...
1413      * 1) The FAIL state is old enough.
1414      * 2) It is still serving slots from our point of view (it was not failed over).
1415      * Apparently nobody is going to serve these slots, so clear the FAIL flag. */
1416     if (nodeIsMaster(node) && node->numslots > 0 &&
1417         (now - node->fail_time) >
1418         (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1419     {
1420         serverLog(LL_NOTICE,
1421             "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
1422                 node->name);
1423         node->flags &= ~CLUSTER_NODE_FAIL;
1424         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1425     }
1426 }
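
/* Timing example (editorial note; assumes CLUSTER_FAIL_UNDO_TIME_MULT is 2,
 * as defined in cluster.h at the time of writing): with cluster-node-timeout
 * at its default of 15000 ms, a master still claiming slots gets its FAIL
 * flag cleared once it is reachable again and fail_time is older than
 * 15000 * 2 = 30000 ms. */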
1427 
1428 /* Return true if we already have a node in HANDSHAKE state matching the
1429  * specified ip address and port number. This function is used in order to
1430  * avoid adding a new handshake node for the same address multiple times. */
1431 int clusterHandshakeInProgress(char *ip, int port, int cport) {
1432     dictIterator *di;
1433     dictEntry *de;
1434 
1435     di = dictGetSafeIterator(server.cluster->nodes);
1436     while((de = dictNext(di)) != NULL) {
1437         clusterNode *node = dictGetVal(de);
1438 
1439         if (!nodeInHandshake(node)) continue;
1440         if (!strcasecmp(node->ip,ip) &&
1441             node->port == port &&
1442             node->cport == cport) break;
1443     }
1444     dictReleaseIterator(di);
1445     return de != NULL;
1446 }
1447 
1448 /* Start a handshake with the specified address if there is not one
1449  * already in progress. Returns non-zero if the handshake was actually
1450  * started. On error zero is returned and errno is set to one of the
1451  * following values:
1452  *
1453  * EAGAIN - There is already a handshake in progress for this address.
1454  * EINVAL - IP or port are not valid. */
1455 int clusterStartHandshake(char *ip, int port, int cport) {
1456     clusterNode *n;
1457     char norm_ip[NET_IP_STR_LEN];
1458     struct sockaddr_storage sa;
1459 
1460     /* IP sanity check */
1461     if (inet_pton(AF_INET,ip,
1462             &(((struct sockaddr_in *)&sa)->sin_addr)))
1463     {
1464         sa.ss_family = AF_INET;
1465     } else if (inet_pton(AF_INET6,ip,
1466             &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
1467     {
1468         sa.ss_family = AF_INET6;
1469     } else {
1470         errno = EINVAL;
1471         return 0;
1472     }
1473 
1474     /* Port sanity check */
1475     if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
1476         errno = EINVAL;
1477         return 0;
1478     }
1479 
1480     /* Set norm_ip as the normalized string representation of the node
1481      * IP address. */
1482     memset(norm_ip,0,NET_IP_STR_LEN);
1483     if (sa.ss_family == AF_INET)
1484         inet_ntop(AF_INET,
1485             (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
1486             norm_ip,NET_IP_STR_LEN);
1487     else
1488         inet_ntop(AF_INET6,
1489             (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
1490             norm_ip,NET_IP_STR_LEN);
1491 
1492     if (clusterHandshakeInProgress(norm_ip,port,cport)) {
1493         errno = EAGAIN;
1494         return 0;
1495     }
1496 
1497     /* Add the node with a random name (NULL as first argument to
1498      * createClusterNode()). The name and the rest of the node's details
1499      * will be fixed during the handshake. */
1500     n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
1501     memcpy(n->ip,norm_ip,sizeof(n->ip));
1502     n->port = port;
1503     n->cport = cport;
1504     clusterAddNode(n);
1505     return 1;
1506 }
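
/* Caller sketch (editorial note, hedged): roughly how a command such as
 * CLUSTER MEET is expected to drive this function; 'ip_arg' and 'port_arg'
 * are hypothetical stand-ins for the raw command arguments.
 *
 *     if (clusterStartHandshake(ip, port, cport) == 0 && errno == EINVAL) {
 *         addReplyErrorFormat(c, "Invalid node address specified: %s:%s",
 *                             ip_arg, port_arg);
 *     } else {
 *         addReply(c, shared.ok);   // EAGAIN (already in progress) is fine
 *     }
 */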
1507 
1508 /* Process the gossip section of PING or PONG packets.
1509  * Note that this function assumes that the packet is already sanity-checked
1510  * by the caller for length, though not for the content of the gossip
1511  * section. */
1512 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
1513     uint16_t count = ntohs(hdr->count);
1514     clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
1515     clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
1516 
1517     while(count--) {
1518         uint16_t flags = ntohs(g->flags);
1519         clusterNode *node;
1520         sds ci;
1521 
1522         if (server.verbosity == LL_DEBUG) {
1523             ci = representClusterNodeFlags(sdsempty(), flags);
1524             serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
1525                 g->nodename,
1526                 g->ip,
1527                 ntohs(g->port),
1528                 ntohs(g->cport),
1529                 ci);
1530             sdsfree(ci);
1531         }
1532 
1533         /* Update our state according to the gossip sections */
1534         node = clusterLookupNode(g->nodename);
1535         if (node) {
1536             /* We already know this node. Handle failure reports, but only
1537                when the sender is a master. */
1538             if (sender && nodeIsMaster(sender) && node != myself) {
1539                 if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
1540                     if (clusterNodeAddFailureReport(node,sender)) {
1541                         serverLog(LL_VERBOSE,
1542                             "Node %.40s reported node %.40s as not reachable.",
1543                             sender->name, node->name);
1544                     }
1545                     markNodeAsFailingIfNeeded(node);
1546                 } else {
1547                     if (clusterNodeDelFailureReport(node,sender)) {
1548                         serverLog(LL_VERBOSE,
1549                             "Node %.40s reported node %.40s is back online.",
1550                             sender->name, node->name);
1551                     }
1552                 }
1553             }
1554 
1555             /* If from our POV the node is up (no failure flags are set),
1556              * we have no pending ping for the node, and we have no failure
1557              * reports for this node, update the last pong time with the
1558              * one we see from the other nodes. */
1559             if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1560                 node->ping_sent == 0 &&
1561                 clusterNodeFailureReportsCount(node) == 0)
1562             {
1563                 mstime_t pongtime = ntohl(g->pong_received);
1564                 pongtime *= 1000; /* Convert back to milliseconds. */
1565 
1566                 /* Replace the pong time with the received one only if
1567                  * it's greater than our view but is not in the future
1568                  * (with 500 milliseconds tolerance) from the POV of our
1569                  * clock. */
1570                 if (pongtime <= (server.mstime+500) &&
1571                     pongtime > node->pong_received)
1572                 {
1573                     node->pong_received = pongtime;
1574                 }
1575             }
1576 
1577             /* If we already know this node, but it is not reachable, and
1578              * we see a different address in the gossip section of a node that
1579              * can talk with this other node, update the address and disconnect
1580              * the old link if any, so that we'll attempt to connect to the
1581              * new address. */
1582             if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
1583                 !(flags & CLUSTER_NODE_NOADDR) &&
1584                 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1585                 (strcasecmp(node->ip,g->ip) ||
1586                  node->port != ntohs(g->port) ||
1587                  node->cport != ntohs(g->cport)))
1588             {
1589                 if (node->link) freeClusterLink(node->link);
1590                 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1591                 node->port = ntohs(g->port);
1592                 node->pport = ntohs(g->pport);
1593                 node->cport = ntohs(g->cport);
1594                 node->flags &= ~CLUSTER_NODE_NOADDR;
1595             }
1596         } else {
1597             /* If it's not in NOADDR state and we don't have it, we
1598              * add it to our trusted dict with the exact node ID and flags.
1599              * Note that we cannot simply start a handshake against this
1600              * IP/PORT pair, since the IP/PORT may already have been reused
1601              * by a node of another cluster, and we would risk joining it.
1602              *
1603              * Note that we also require the sender of this gossip message
1604              * to be a well known node in our cluster, otherwise we risk
1605              * joining another cluster. */
1606             if (sender &&
1607                 !(flags & CLUSTER_NODE_NOADDR) &&
1608                 !clusterBlacklistExists(g->nodename))
1609             {
1610                 clusterNode *node;
1611                 node = createClusterNode(g->nodename, flags);
1612                 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1613                 node->port = ntohs(g->port);
1614                 node->pport = ntohs(g->pport);
1615                 node->cport = ntohs(g->cport);
1616                 clusterAddNode(node);
1617             }
1618         }
1619 
1620         /* Next node */
1621         g++;
1622     }
1623 }
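
/* Wire-format reminder (editorial note): each clusterMsgDataGossip entry
 * consumed above carries, per node: the node name (CLUSTER_NAMELEN bytes),
 * the ping_sent and pong_received timestamps (32 bit, in seconds, network
 * byte order -- hence the *1000 conversion above), the ip (NET_IP_STR_LEN
 * bytes), and the port, cport, pport and flags fields read with ntohs(). */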
1624 
1625 /* IP -> string conversion. 'buf' is supposed to be at least 46 bytes.
1626  * If 'announced_ip' length is non-zero, it is used instead of extracting
1627  * the IP from the socket peer address. */
1628 void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
1629     if (announced_ip[0] != '\0') {
1630         memcpy(buf,announced_ip,NET_IP_STR_LEN);
1631         buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
1632     } else {
1633         connPeerToString(link->conn, buf, NET_IP_STR_LEN, NULL);
1634     }
1635 }
1636 
1637 /* Update the node address to the IP address that can be extracted
1638  * from link->fd, or if hdr->myip is non-empty, to the address the node
1639  * is announcing to us. The port is taken from the packet header as well.
1640  *
1641  * If the address or port changed, disconnect the node link so that we'll
1642  * connect again to the new address.
1643  *
1644  * If the ip/port pair are already correct no operation is performed at
1645  * all.
1646  *
1647  * The function returns 0 if the node address is still the same,
1648  * otherwise 1 is returned. */
1649 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
1650                               clusterMsg *hdr)
1651 {
1652     char ip[NET_IP_STR_LEN] = {0};
1653     int port = ntohs(hdr->port);
1654     int pport = ntohs(hdr->pport);
1655     int cport = ntohs(hdr->cport);
1656 
1657     /* We don't proceed if the link is the same as the sender link, as this
1658      * function is designed to see if the node link is consistent with the
1659      * symmetric link that is used to receive PINGs from the node.
1660      *
1661      * As a side effect this function never frees the passed 'link', so
1662      * it is safe to call during packet processing. */
1663     if (link == node->link) return 0;
1664 
1665     nodeIp2String(ip,link,hdr->myip);
1666     if (node->port == port && node->cport == cport && node->pport == pport &&
1667         strcmp(ip,node->ip) == 0) return 0;
1668 
1669     /* IP / port is different, update it. */
1670     memcpy(node->ip,ip,sizeof(ip));
1671     node->port = port;
1672     node->pport = pport;
1673     node->cport = cport;
1674     if (node->link) freeClusterLink(node->link);
1675     node->flags &= ~CLUSTER_NODE_NOADDR;
1676     serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
1677         node->name, node->ip, node->port);
1678 
1679     /* Check if this is our master and we have to change the
1680      * replication target as well. */
1681     if (nodeIsSlave(myself) && myself->slaveof == node)
1682         replicationSetMaster(node->ip, node->port);
1683     return 1;
1684 }
1685 
1686 /* Reconfigure the specified node 'n' as a master. This function is called
1687  * when a node that we believed to be a slave is found to be acting as a
1688  * master, in order to update our state for the node. */
1689 void clusterSetNodeAsMaster(clusterNode *n) {
1690     if (nodeIsMaster(n)) return;
1691 
1692     if (n->slaveof) {
1693         clusterNodeRemoveSlave(n->slaveof,n);
1694         if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1695     }
1696     n->flags &= ~CLUSTER_NODE_SLAVE;
1697     n->flags |= CLUSTER_NODE_MASTER;
1698     n->slaveof = NULL;
1699 
1700     /* Update config and state. */
1701     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1702                          CLUSTER_TODO_UPDATE_STATE);
1703 }
1704 
1705 /* This function is called when we receive a master configuration via a
1706  * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1707  * node, and the set of slots claimed under this configEpoch.
1708  *
1709  * What we do is to rebind the slots with newer configuration compared to our
1710  * local configuration, and if needed, we turn ourselves into a replica of the
1711  * node (see the function comments for more info).
1712  *
1713  * The 'sender' is the node for which we received a configuration update.
1714  * Sometimes it is not actually the "Sender" of the information, like in the
1715  * case we receive the info via an UPDATE packet. */
1716 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1717     int j;
1718     clusterNode *curmaster = NULL, *newmaster = NULL;
1719     /* The dirty slots list is a list of slots for which we lose ownership
1720      * while still having keys inside. This usually happens after a failover
1721      * or after a manual cluster reconfiguration operated by the admin.
1722      *
1723      * If the update message is not able to demote a master to slave (in this
1724      * case we'll resync with the master, updating the whole key space), we
1725      * need to delete all the keys in the slots we lost ownership of. */
1726     uint16_t dirty_slots[CLUSTER_SLOTS];
1727     int dirty_slots_count = 0;
1728 
1729     /* We should detect whether the sender is the new master of our shard.
1730      * We will know that if all our slots were migrated to the sender, and
1731      * the sender has no slots except ours. */
1732     int sender_slots = 0;
1733     int migrated_our_slots = 0;
1734 
1735     /* Here we set curmaster to this node or the node this node
1736      * replicates to if it's a slave. In the for loop we are interested
1737      * in checking whether slots are taken away from curmaster. */
1738     curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1739 
1740     if (sender == myself) {
1741         serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1742         return;
1743     }
1744 
1745     for (j = 0; j < CLUSTER_SLOTS; j++) {
1746         if (bitmapTestBit(slots,j)) {
1747             sender_slots++;
1748 
1749             /* The slot is already bound to the sender of this message. */
1750             if (server.cluster->slots[j] == sender) continue;
1751 
1752             /* The slot is in importing state, it should be modified only
1753              * manually via redis-cli (example: a resharding is in progress
1754              * and the migrating side slot was already closed and is advertising
1755              * a new config. We still want the slot to be closed manually). */
1756             if (server.cluster->importing_slots_from[j]) continue;
1757 
1758             /* We rebind the slot to the new node claiming it if:
1759              * 1) The slot was unassigned or the new node claims it with a
1760              *    greater configEpoch.
1761              * 2) We are not currently importing the slot. */
1762             if (server.cluster->slots[j] == NULL ||
1763                 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1764             {
1765                 /* Was this slot mine, and does it still contain keys? Mark
1766                  * it as a dirty slot. */
1767                 if (server.cluster->slots[j] == myself &&
1768                     countKeysInSlot(j) &&
1769                     sender != myself)
1770                 {
1771                     dirty_slots[dirty_slots_count] = j;
1772                     dirty_slots_count++;
1773                 }
1774 
1775                 if (server.cluster->slots[j] == curmaster) {
1776                     newmaster = sender;
1777                     migrated_our_slots++;
1778                 }
1779                 clusterDelSlot(j);
1780                 clusterAddSlot(sender,j);
1781                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1782                                      CLUSTER_TODO_UPDATE_STATE|
1783                                      CLUSTER_TODO_FSYNC_CONFIG);
1784             }
1785         }
1786     }
1787 
1788     /* After updating the slots configuration, don't do any actual change
1789      * in the state of the server if a module disabled Redis Cluster
1790      * key redirections. */
1791     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
1792         return;
1793 
1794     /* If at least one slot was reassigned from a node to another node
1795      * with a greater configEpoch, it is possible that:
1796      * 1) We are a master left without slots. This means that we were
1797      *    failed over and we should turn into a replica of the new
1798      *    master.
1799      * 2) We are a slave and our master is left without slots. We need
1800      *    to replicate to the new slots owner. */
1801     if (newmaster && curmaster->numslots == 0 &&
1802             (server.cluster_allow_replica_migration ||
1803              sender_slots == migrated_our_slots)) {
1804         serverLog(LL_WARNING,
1805             "Configuration change detected. Reconfiguring myself "
1806             "as a replica of %.40s", sender->name);
1807         clusterSetMaster(sender);
1808         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1809                              CLUSTER_TODO_UPDATE_STATE|
1810                              CLUSTER_TODO_FSYNC_CONFIG);
1811     } else if (dirty_slots_count) {
1812         /* If we are here, we received an update message which removed
1813          * ownership for certain slots we still have keys in. However we
1814          * are still serving some slots, so this master node was not
1815          * demoted to a slave.
1816          *
1817          * In order to maintain a consistent state between keys and slots
1818          * we need to remove all the keys from the slots we lost. */
1819         for (j = 0; j < dirty_slots_count; j++)
1820             delKeysInSlot(dirty_slots[j]);
1821     }
1822 }
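
/* Walk-through (editorial sketch): after a failover, suppose we were the
 * master of slots 100-200 and the message carries a bitmap where the sender
 * claims exactly those slots with a greater configEpoch. The loop above
 * rebinds each slot to the sender; curmaster (us) ends with numslots == 0
 * and every migrated slot went to the sender, so we take the first branch
 * and call clusterSetMaster(sender), becoming its replica. If instead we
 * lost only a subset of slots while keeping others, dirty_slots_count
 * drives the deletion of the keys in the slots we lost. */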
1823 
1824 /* When this function is called, there is a packet to process starting
1825  * at link->rcvbuf. Releasing the buffer is up to the caller, so this
1826  * function should just handle the higher level stuff of processing the
1827  * packet, modifying the cluster state if needed.
1828  *
1829  * The function returns 1 if the link is still valid after the packet
1830  * was processed, otherwise 0 if the link was freed since the packet
1831  * processing led to some inconsistency error (for instance a PONG
1832  * received from the wrong sender ID). */
1833 int clusterProcessPacket(clusterLink *link) {
1834     clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1835     uint32_t totlen = ntohl(hdr->totlen);
1836     uint16_t type = ntohs(hdr->type);
1837     mstime_t now = mstime();
1838 
1839     if (type < CLUSTERMSG_TYPE_COUNT)
1840         server.cluster->stats_bus_messages_received[type]++;
1841     serverLog(LL_DEBUG,"--- Processing packet of type %s, %lu bytes",
1842         clusterGetMessageTypeString(type), (unsigned long) totlen);
1843 
1844     /* Perform sanity checks */
1845     if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1846     if (totlen > link->rcvbuf_len) return 1;
1847 
1848     if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1849         /* Can't handle messages of different versions. */
1850         return 1;
1851     }
1852 
1853     uint16_t flags = ntohs(hdr->flags);
1854     uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1855     clusterNode *sender;
1856 
1857     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1858         type == CLUSTERMSG_TYPE_MEET)
1859     {
1860         uint16_t count = ntohs(hdr->count);
1861         uint32_t explen; /* expected length of this packet */
1862 
1863         explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1864         explen += (sizeof(clusterMsgDataGossip)*count);
1865         if (totlen != explen) return 1;
1866     } else if (type == CLUSTERMSG_TYPE_FAIL) {
1867         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1868 
1869         explen += sizeof(clusterMsgDataFail);
1870         if (totlen != explen) return 1;
1871     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1872         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1873 
1874         explen += sizeof(clusterMsgDataPublish) -
1875                 8 +
1876                 ntohl(hdr->data.publish.msg.channel_len) +
1877                 ntohl(hdr->data.publish.msg.message_len);
1878         if (totlen != explen) return 1;
1879     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1880                type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1881                type == CLUSTERMSG_TYPE_MFSTART)
1882     {
1883         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1884 
1885         if (totlen != explen) return 1;
1886     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1887         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1888 
1889         explen += sizeof(clusterMsgDataUpdate);
1890         if (totlen != explen) return 1;
1891     } else if (type == CLUSTERMSG_TYPE_MODULE) {
1892         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1893 
1894         explen += sizeof(clusterMsgModule) -
1895                 3 + ntohl(hdr->data.module.msg.len);
1896         if (totlen != explen) return 1;
1897     }
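
    /* Editorial note on the size accounting above: the magic constants
     * compensate for placeholder arrays in the message structs, assuming
     * the layout in cluster.h: clusterMsgDataPublish declares bulk_data[8]
     * (hence the 8 subtracted before adding the real channel+message
     * lengths) and clusterMsgModule declares bulk_data[3] (hence the 3
     * subtracted before adding the real payload length). */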
1898 
1899     /* Check if the sender is a known node. Note that for incoming connections
1900      * we don't store link->node information, but resolve the node by the
1901      * ID in the header each time in the current implementation. */
1902     sender = clusterLookupNode(hdr->sender);
1903 
1904     /* Update the last time we saw any data from this node. We
1905      * use this in order to avoid detecting a timeout from a node that
1906      * is just sending a lot of data in the cluster bus, for instance
1907      * because of Pub/Sub. */
1908     if (sender) sender->data_received = now;
1909 
1910     if (sender && !nodeInHandshake(sender)) {
1911         /* Update our currentEpoch if we see a newer epoch in the cluster. */
1912         senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1913         senderConfigEpoch = ntohu64(hdr->configEpoch);
1914         if (senderCurrentEpoch > server.cluster->currentEpoch)
1915             server.cluster->currentEpoch = senderCurrentEpoch;
1916         /* Update the sender configEpoch if it is publishing a newer one. */
1917         if (senderConfigEpoch > sender->configEpoch) {
1918             sender->configEpoch = senderConfigEpoch;
1919             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1920                                  CLUSTER_TODO_FSYNC_CONFIG);
1921         }
1922         /* Update the replication offset info for this node. */
1923         sender->repl_offset = ntohu64(hdr->offset);
1924         sender->repl_offset_time = now;
1925         /* If we are a slave performing a manual failover and our master
1926          * sent its offset while already paused, populate the MF state. */
1927         if (server.cluster->mf_end &&
1928             nodeIsSlave(myself) &&
1929             myself->slaveof == sender &&
1930             hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1931             server.cluster->mf_master_offset == -1)
1932         {
1933             server.cluster->mf_master_offset = sender->repl_offset;
1934             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
1935             serverLog(LL_WARNING,
1936                 "Received replication offset for paused "
1937                 "master manual failover: %lld",
1938                 server.cluster->mf_master_offset);
1939         }
1940     }
1941 
1942     /* Initial processing of PING and MEET requests replying with a PONG. */
1943     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1944         /* We use incoming MEET messages in order to set the address
1945          * for 'myself', since only other cluster nodes will send us
1946          * MEET messages on handshakes, when the cluster joins, or
1947          * later if we changed address, and those nodes will use our
1948          * official address to connect to us. So obtaining this address
1949          * from the socket is a simple way to discover / update our own
1950          * address in the cluster without it being hardcoded in the config.
1951          *
1952          * However if we don't have an address at all, we update the address
1953          * even with a normal PING packet. If it's wrong it will be fixed
1954          * by MEET later. */
1955         if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
1956             server.cluster_announce_ip == NULL)
1957         {
1958             char ip[NET_IP_STR_LEN];
1959 
1960             if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
1961                 strcmp(ip,myself->ip))
1962             {
1963                 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1964                 serverLog(LL_WARNING,"IP address for this node updated to %s",
1965                     myself->ip);
1966                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1967             }
1968         }
1969 
1970         /* Add this node if it is new for us and the msg type is MEET.
1971          * In this stage we don't try to add the node with the right
1972          * flags, slaveof pointer, and so forth, as these details will be
1973          * resolved when we receive PONGs from the node. */
1974         if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1975             clusterNode *node;
1976 
1977             node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1978             nodeIp2String(node->ip,link,hdr->myip);
1979             node->port = ntohs(hdr->port);
1980             node->pport = ntohs(hdr->pport);
1981             node->cport = ntohs(hdr->cport);
1982             clusterAddNode(node);
1983             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1984         }
1985 
1986         /* If this is a MEET packet from an unknown node, we still process
1987          * the gossip section here since we have to trust the sender because
1988          * of the message type. */
1989         if (!sender && type == CLUSTERMSG_TYPE_MEET)
1990             clusterProcessGossipSection(hdr,link);
1991 
1992         /* Anyway reply with a PONG */
1993         clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1994     }
1995 
1996     /* PING, PONG, MEET: process config information. */
1997     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1998         type == CLUSTERMSG_TYPE_MEET)
1999     {
2000         serverLog(LL_DEBUG,"%s packet received: %s",
2001             clusterGetMessageTypeString(type),
2002             link->node ? link->node->name : "NULL");
2003         if (link->node) {
2004             if (nodeInHandshake(link->node)) {
2005                 /* If we already have this node, try to change the
2006                  * IP/port of the node with the new one. */
2007                 if (sender) {
2008                     serverLog(LL_VERBOSE,
2009                         "Handshake: we already know node %.40s, "
2010                         "updating the address if needed.", sender->name);
2011                     if (nodeUpdateAddressIfNeeded(sender,link,hdr))
2012                     {
2013                         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2014                                              CLUSTER_TODO_UPDATE_STATE);
2015                     }
2016                     /* Free this node as we already have it. This will
2017                      * cause the link to be freed as well. */
2018                     clusterDelNode(link->node);
2019                     return 0;
2020                 }
2021 
2022                 /* The first thing to do is to replace the random name with
2023                  * the right node name if this was a handshake stage. */
2024                 clusterRenameNode(link->node, hdr->sender);
2025                 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
2026                     link->node->name);
2027                 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
2028                 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
2029                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2030             } else if (memcmp(link->node->name,hdr->sender,
2031                         CLUSTER_NAMELEN) != 0)
2032             {
2033                 /* If the reply has a non matching node ID we
2034                  * disconnect this node and set it as not having an associated
2035                  * address. */
2036                 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
2037                     link->node->name,
2038                     (int)(now-(link->node->ctime)),
2039                     link->node->flags);
2040                 link->node->flags |= CLUSTER_NODE_NOADDR;
2041                 link->node->ip[0] = '\0';
2042                 link->node->port = 0;
2043                 link->node->pport = 0;
2044                 link->node->cport = 0;
2045                 freeClusterLink(link);
2046                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2047                 return 0;
2048             }
2049         }
2050 
2051         /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
2052          * announced. This is a dynamic flag that we receive from the
2053          * sender, and the latest status must be trusted. We need it to
2054          * be propagated because the slave ranking, used to compute the
2055          * delay of each slave in the voting process, needs to know
2056          * which instances are really competing. */
2057         if (sender) {
2058             int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
2059             sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
2060             sender->flags |= nofailover;
2061         }
2062 
2063         /* Update the node address if it changed. */
2064         if (sender && type == CLUSTERMSG_TYPE_PING &&
2065             !nodeInHandshake(sender) &&
2066             nodeUpdateAddressIfNeeded(sender,link,hdr))
2067         {
2068             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2069                                  CLUSTER_TODO_UPDATE_STATE);
2070         }
2071 
2072         /* Update our info about the node */
2073         if (link->node && type == CLUSTERMSG_TYPE_PONG) {
2074             link->node->pong_received = now;
2075             link->node->ping_sent = 0;
2076 
2077             /* The PFAIL condition can be reversed without external
2078              * help if it is momentary (that is, if it does not
2079              * turn into a FAIL state).
2080              *
2081              * The FAIL condition is also reversible under specific
2082              * conditions detected by clearNodeFailureIfNeeded(). */
2083             if (nodeTimedOut(link->node)) {
2084                 link->node->flags &= ~CLUSTER_NODE_PFAIL;
2085                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2086                                      CLUSTER_TODO_UPDATE_STATE);
2087             } else if (nodeFailed(link->node)) {
2088                 clearNodeFailureIfNeeded(link->node);
2089             }
2090         }
2091 
2092         /* Check for role switch: slave -> master or master -> slave. */
2093         if (sender) {
2094             if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
2095                 sizeof(hdr->slaveof)))
2096             {
2097                 /* Node is a master. */
2098                 clusterSetNodeAsMaster(sender);
2099             } else {
2100                 /* Node is a slave. */
2101                 clusterNode *master = clusterLookupNode(hdr->slaveof);
2102 
2103                 if (nodeIsMaster(sender)) {
2104                     /* Master turned into a slave! Reconfigure the node. */
2105                     clusterDelNodeSlots(sender);
2106                     sender->flags &= ~(CLUSTER_NODE_MASTER|
2107                                        CLUSTER_NODE_MIGRATE_TO);
2108                     sender->flags |= CLUSTER_NODE_SLAVE;
2109 
2110                     /* Update config and state. */
2111                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2112                                          CLUSTER_TODO_UPDATE_STATE);
2113                 }
2114 
2115                 /* Master node changed for this slave? */
2116                 if (master && sender->slaveof != master) {
2117                     if (sender->slaveof)
2118                         clusterNodeRemoveSlave(sender->slaveof,sender);
2119                     clusterNodeAddSlave(master,sender);
2120                     sender->slaveof = master;
2121 
2122                     /* Update config. */
2123                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2124                 }
2125             }
2126         }
2127 
2128         /* Update our info about served slots.
2129          *
2130          * Note: this MUST happen after we update the master/slave state
2131          * so that CLUSTER_NODE_MASTER flag will be set. */
2132 
2133         /* Many checks are only needed if the set of served slots this
2134          * instance claims is different compared to the set of slots we have
2135          * for it. Check this ASAP to avoid other computationally expensive
2136          * checks later. */
2137         clusterNode *sender_master = NULL; /* Sender or its master if slave. */
2138         int dirty_slots = 0; /* Sender claimed slots don't match my view? */
2139 
2140         if (sender) {
2141             sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
2142             if (sender_master) {
2143                 dirty_slots = memcmp(sender_master->slots,
2144                         hdr->myslots,sizeof(hdr->myslots)) != 0;
2145             }
2146         }
2147 
2148         /* 1) If the sender of the message is a master, and we detected that
2149          *    the set of slots it claims changed, scan the slots to see if we
2150          *    need to update our configuration. */
2151         if (sender && nodeIsMaster(sender) && dirty_slots)
2152             clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
2153 
2154         /* 2) We also check for the reverse condition, that is, the sender
2155          *    claims to serve slots we know are served by a master with a
2156          *    greater configEpoch. If this happens we inform the sender.
2157          *
2158          * This is useful because sometimes after a partition heals, a
2159          * reappearing master may be the last one to claim a given set of
2160          * hash slots, but with a configuration that other instances know to
2161          * be deprecated. Example:
2162          *
2163          * A and B are master and slave for slots 1,2,3.
2164          * A is partitioned away, B gets promoted.
2165          * B is partitioned away, and A becomes available again.
2166          *
2167          * Usually B would PING A publishing its set of served slots and its
2168          * configEpoch, but because of the partition B can't inform A of the
2169          * new configuration, so other nodes that have an updated table must
2170          * do it. In this way A will stop acting as a master (or can try to
2171          * fail over if the conditions to win the election exist). */
2172         if (sender && dirty_slots) {
2173             int j;
2174 
2175             for (j = 0; j < CLUSTER_SLOTS; j++) {
2176                 if (bitmapTestBit(hdr->myslots,j)) {
2177                     if (server.cluster->slots[j] == sender ||
2178                         server.cluster->slots[j] == NULL) continue;
2179                     if (server.cluster->slots[j]->configEpoch >
2180                         senderConfigEpoch)
2181                     {
2182                         serverLog(LL_VERBOSE,
2183                             "Node %.40s has old slots configuration, sending "
2184                             "an UPDATE message about %.40s",
2185                                 sender->name, server.cluster->slots[j]->name);
2186                         clusterSendUpdate(sender->link,
2187                             server.cluster->slots[j]);
2188 
2189                         /* TODO: instead of exiting the loop send every other
2190                          * UPDATE packet for other nodes that are the new owner
2191                          * of sender's slots. */
2192                         break;
2193                     }
2194                 }
2195             }
2196         }
2197 
2198         /* If our config epoch collides with the sender's try to fix
2199          * the problem. */
2200         if (sender &&
2201             nodeIsMaster(myself) && nodeIsMaster(sender) &&
2202             senderConfigEpoch == myself->configEpoch)
2203         {
2204             clusterHandleConfigEpochCollision(sender);
2205         }
2206 
2207         /* Get info from the gossip section */
2208         if (sender) clusterProcessGossipSection(hdr,link);
2209     } else if (type == CLUSTERMSG_TYPE_FAIL) {
2210         clusterNode *failing;
2211 
2212         if (sender) {
2213             failing = clusterLookupNode(hdr->data.fail.about.nodename);
2214             if (failing &&
2215                 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
2216             {
2217                 serverLog(LL_NOTICE,
2218                     "FAIL message received from %.40s about %.40s",
2219                     hdr->sender, hdr->data.fail.about.nodename);
2220                 failing->flags |= CLUSTER_NODE_FAIL;
2221                 failing->fail_time = now;
2222                 failing->flags &= ~CLUSTER_NODE_PFAIL;
2223                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2224                                      CLUSTER_TODO_UPDATE_STATE);
2225             }
2226         } else {
2227             serverLog(LL_NOTICE,
2228                 "Ignoring FAIL message from unknown node %.40s about %.40s",
2229                 hdr->sender, hdr->data.fail.about.nodename);
2230         }
2231     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
2232         if (!sender) return 1;  /* We don't know that node. */
2233 
2234         robj *channel, *message;
2235         uint32_t channel_len, message_len;
2236 
2237         /* Don't bother creating useless objects if there are no
2238          * Pub/Sub subscribers. */
2239         if (dictSize(server.pubsub_channels) ||
2240            dictSize(server.pubsub_patterns))
2241         {
2242             channel_len = ntohl(hdr->data.publish.msg.channel_len);
2243             message_len = ntohl(hdr->data.publish.msg.message_len);
2244             channel = createStringObject(
2245                         (char*)hdr->data.publish.msg.bulk_data,channel_len);
2246             message = createStringObject(
2247                         (char*)hdr->data.publish.msg.bulk_data+channel_len,
2248                         message_len);
2249             pubsubPublishMessage(channel,message);
2250             decrRefCount(channel);
2251             decrRefCount(message);
2252         }
2253     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
2254         if (!sender) return 1;  /* We don't know that node. */
2255         clusterSendFailoverAuthIfNeeded(sender,hdr);
2256     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
2257         if (!sender) return 1;  /* We don't know that node. */
2258         /* We consider this vote only if the sender is a master serving
2259          * a non zero number of slots, and its currentEpoch is greater than
2260          * or equal to the epoch at which this node started the election. */
2261         if (nodeIsMaster(sender) && sender->numslots > 0 &&
2262             senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2263         {
2264             server.cluster->failover_auth_count++;
2265             /* Maybe we reached a quorum here, set a flag to make sure
2266              * we check ASAP. */
2267             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
2268         }
2269     } else if (type == CLUSTERMSG_TYPE_MFSTART) {
2270         /* This message is acceptable only if I'm a master and the sender
2271          * is one of my slaves. */
2272         if (!sender || sender->slaveof != myself) return 1;
2273         /* Manual failover requested from slaves. Initialize the state
2274          * accordingly. */
2275         resetManualFailover();
2276         server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
2277         server.cluster->mf_slave = sender;
2278         pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT),CLIENT_PAUSE_WRITE);
2279         serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
2280             sender->name);
2281         /* We need to send a ping message to the replica, as it would carry
2282          * `server.cluster->mf_master_offset`, which means the master paused
2283          * clients at that offset, so that the replica knows it is safe to
2284          * set its `server.cluster->mf_can_start` to 1 and complete the
2285          * failover as quickly as possible. */
2286         clusterSendPing(link, CLUSTERMSG_TYPE_PING);
2287     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2288         clusterNode *n; /* The node the update is about. */
2289         uint64_t reportedConfigEpoch =
2290                     ntohu64(hdr->data.update.nodecfg.configEpoch);
2291 
2292         if (!sender) return 1;  /* We don't know the sender. */
2293         n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
2294         if (!n) return 1;   /* We don't know the reported node. */
2295         if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
2296 
2297         /* If in our current config the node is a slave, set it as a master. */
2298         if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
2299 
2300         /* Update the node's configEpoch. */
2301         n->configEpoch = reportedConfigEpoch;
2302         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2303                              CLUSTER_TODO_FSYNC_CONFIG);
2304 
2305         /* Check the bitmap of served slots and update our
2306          * config accordingly. */
2307         clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
2308             hdr->data.update.nodecfg.slots);
2309     } else if (type == CLUSTERMSG_TYPE_MODULE) {
2310         if (!sender) return 1;  /* Protect the module from unknown nodes. */
2311         /* We need to route this message back to the right module subscribed
2312          * for the right message type. */
2313         uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
2314         uint32_t len = ntohl(hdr->data.module.msg.len);
2315         uint8_t type = hdr->data.module.msg.type;
2316         unsigned char *payload = hdr->data.module.msg.bulk_data;
2317         moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
2318     } else {
2319         serverLog(LL_WARNING,"Received unknown packet type: %d", type);
2320     }
2321     return 1;
2322 }
2323 
2324 /* This function is called when we detect the link with this node is lost.
2325    We set the node as no longer connected. The Cluster Cron will detect
2326    this condition and will try to reconnect.
2327 
2328    Instead, if the node is a temporary node used to accept a query, we
2329    completely free the node on error. */
2330 void handleLinkIOError(clusterLink *link) {
2331     freeClusterLink(link);
2332 }
2333 
2334 /* Send data. This is handled using a trivial send buffer that gets
2335  * consumed by write(). We don't try to optimize this for speed too much
2336  * as this is a very low traffic channel. */
2337 void clusterWriteHandler(connection *conn) {
2338     clusterLink *link = connGetPrivateData(conn);
2339     ssize_t nwritten;
2340 
2341     nwritten = connWrite(conn, link->sndbuf, sdslen(link->sndbuf));
2342     if (nwritten <= 0) {
2343         serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2344             (nwritten == -1) ? connGetLastError(conn) : "short write");
2345         handleLinkIOError(link);
2346         return;
2347     }
2348     sdsrange(link->sndbuf,nwritten,-1);
2349     if (sdslen(link->sndbuf) == 0)
2350         connSetWriteHandler(link->conn, NULL);
2351 }
2352 
2353 /* A connect handler that gets called when a connection to another node
2354  * gets established.
2355  */
2356 void clusterLinkConnectHandler(connection *conn) {
2357     clusterLink *link = connGetPrivateData(conn);
2358     clusterNode *node = link->node;
2359 
2360     /* Check if connection succeeded */
2361     if (connGetState(conn) != CONN_STATE_CONNECTED) {
2362         serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
2363                 node->name, node->ip, node->cport,
2364                 connGetLastError(conn));
2365         freeClusterLink(link);
2366         return;
2367     }
2368 
2369     /* Register a read handler from now on */
2370     connSetReadHandler(conn, clusterReadHandler);
2371 
2372     /* Queue a PING in the new connection ASAP: this is crucial
2373      * to avoid false positives in failure detection.
2374      *
2375      * If the node is flagged as MEET, we send a MEET message instead
2376      * of a PING one, to force the receiver to add us in its node
2377      * table. */
2378     mstime_t old_ping_sent = node->ping_sent;
2379     clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
2380             CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
2381     if (old_ping_sent) {
2382         /* If there was an active ping before the link was
2383          * disconnected, we want to restore the ping time, which would
2384          * otherwise be replaced by the clusterSendPing() call. */
2385         node->ping_sent = old_ping_sent;
2386     }
2387     /* We can clear the flag after the first packet is sent.
2388      * If we never receive a PONG, we'll never send new packets
2389      * to this node. Instead, after the PONG is received and we
2390      * are no longer in meet/handshake status, we want to send
2391      * normal PING packets. */
2392     node->flags &= ~CLUSTER_NODE_MEET;
2393 
2394     serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
2395             node->name, node->ip, node->cport);
2396 }
2397 
2398 /* Read data. Try to read the first field of the header to get the
2399  * full length of the packet. When a whole packet is in memory, this function
2400  * calls clusterProcessPacket() to handle it, then starts over with the next one. */
2401 void clusterReadHandler(connection *conn) {
2402     clusterMsg buf[1];
2403     ssize_t nread;
2404     clusterMsg *hdr;
2405     clusterLink *link = connGetPrivateData(conn);
2406     unsigned int readlen, rcvbuflen;
2407 
2408     while(1) { /* Read as long as there is data to read. */
2409         rcvbuflen = link->rcvbuf_len;
2410         if (rcvbuflen < 8) {
2411             /* First, obtain the first 8 bytes to get the full message
2412              * length. */
2413             readlen = 8 - rcvbuflen;
2414         } else {
2415             /* Finally read the full message. */
2416             hdr = (clusterMsg*) link->rcvbuf;
2417             if (rcvbuflen == 8) {
2418                 /* Perform some sanity check on the message signature
2419                  * and length. */
2420                 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2421                     ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2422                 {
2423                     serverLog(LL_WARNING,
2424                         "Bad message length or signature received "
2425                         "from Cluster bus.");
2426                     handleLinkIOError(link);
2427                     return;
2428                 }
2429             }
2430             readlen = ntohl(hdr->totlen) - rcvbuflen;
2431             if (readlen > sizeof(buf)) readlen = sizeof(buf);
2432         }
2433 
2434         nread = connRead(conn,buf,readlen);
2435         if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; /* No more data ready. */
2436 
2437         if (nread <= 0) {
2438             /* I/O error... */
2439             serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2440                 (nread == 0) ? "connection closed" : connGetLastError(conn));
2441             handleLinkIOError(link);
2442             return;
2443         } else {
2444             /* Read data and recast the pointer to the new buffer. */
2445             size_t unused = link->rcvbuf_alloc - link->rcvbuf_len;
2446             if ((size_t)nread > unused) {
2447                 size_t required = link->rcvbuf_len + nread;
2448                 /* If less than 1mb, grow to twice the needed size; if larger, grow by 1mb. */
2449                 link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ? required * 2: required + RCVBUF_MAX_PREALLOC;
2450                 link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc);
2451             }
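            /* Growth policy example (editorial note; assumes
             * RCVBUF_MAX_PREALLOC is 1<<20 as defined earlier in this
             * file): if 600KB are required, the buffer grows to 1200KB
             * (double the requirement), while if 3MB are required it
             * grows to 3MB+1MB = 4MB, keeping the overshoot bounded for
             * large packets. */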
            memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
            link->rcvbuf_len += nread;
            hdr = (clusterMsg*) link->rcvbuf;
            rcvbuflen += nread;
        }

        /* Total length obtained? Process this packet. */
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            if (clusterProcessPacket(link)) {
                if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
                    zfree(link->rcvbuf);
                    link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
                }
                link->rcvbuf_len = 0;
            } else {
                return; /* Link no longer valid. */
            }
        }
    }
}

/* Put stuff into the send buffer.
 *
 * It is guaranteed that this function will never invalidate the link as a
 * side effect, so it is safe to call it from event handlers that will do
 * stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
    if (sdslen(link->sndbuf) == 0 && msglen != 0)
        connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

    /* Populate sent messages stats. */
    clusterMsg *hdr = (clusterMsg*) msg;
    uint16_t type = ntohs(hdr->type);
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_sent[type]++;
}

/* Send a message to all the nodes that are part of the cluster and have
 * a connected link.
 *
 * It is guaranteed that this function will never invalidate any node->link
 * as a side effect, so it is safe to call this function from event handlers
 * that will do stuff with node links later. */
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!node->link) continue;
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        clusterSendMessage(node->link,buf,len);
    }
    dictReleaseIterator(di);
}

/* Build the message header. hdr must point to a buffer at least
 * sizeof(clusterMsg) in bytes. */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    int totlen = 0;
    uint64_t offset;
    clusterNode *master;

    /* If this node is a master, we send its slots bitmap and configEpoch.
     * If this node is a slave we send the master's information instead (the
     * node is flagged as slave so the receiver knows that it is NOT really
     * in charge of these slots). */
    master = (nodeIsSlave(myself) && myself->slaveof) ?
              myself->slaveof : myself;

    memset(hdr,0,sizeof(*hdr));
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->type = htons(type);
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* If the cluster-announce-ip option is enabled, force the receivers of
     * our packets to use the specified address for this node. Otherwise if
     * the first byte is zero, they'll do auto discovery. */
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN-1);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* Handle cluster-announce-[tls-|bus-]port. */
    int announced_port, announced_pport, announced_cport;
    deriveAnnouncedPorts(&announced_port, &announced_pport, &announced_cport);

    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(announced_port);
    hdr->pport = htons(announced_pport);
    hdr->cport = htons(announced_cport);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* Set the currentEpoch and configEpochs. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);

    /* Set the replication offset. */
    if (nodeIsSlave(myself))
        offset = replicationGetSlaveOffset();
    else
        offset = server.master_repl_offset;
    hdr->offset = htonu64(offset);

    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* Compute the message length for certain messages. For other messages
     * this is up to the caller. */
    if (type == CLUSTERMSG_TYPE_FAIL) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
    }
    hdr->totlen = htonl(totlen);
    /* For PING, PONG, MEET and other variable length messages, fixing the
     * totlen field is up to the caller. */
}

/* Return non-zero if the node is already present in the gossip section of the
 * message pointed to by 'hdr' having 'count' gossip entries. Otherwise
 * zero is returned. Helper for clusterSendPing(). */
int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
    int j;
    for (j = 0; j < count; j++) {
        if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
                CLUSTER_NAMELEN) == 0) break;
    }
    return j != count;
}

/* Set the i-th entry of the gossip section in the message pointed to by 'hdr'
 * to the info of the specified node 'n'. */
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
    clusterMsgDataGossip *gossip;
    gossip = &(hdr->data.ping.gossip[i]);
    memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
    gossip->ping_sent = htonl(n->ping_sent/1000);
    gossip->pong_received = htonl(n->pong_received/1000);
    memcpy(gossip->ip,n->ip,sizeof(n->ip));
    gossip->port = htons(n->port);
    gossip->cport = htons(n->cport);
    gossip->flags = htons(n->flags);
    gossip->pport = htons(n->pport);
    gossip->notused1 = 0;
}

/* Send a PING or PONG packet to the specified node, making sure to add enough
 * gossip information. */
void clusterSendPing(clusterLink *link, int type) {
    unsigned char *buf;
    clusterMsg *hdr;
    int gossipcount = 0; /* Number of gossip sections added so far. */
    int wanted; /* Number of gossip sections we want to append if possible. */
    int totlen; /* Total packet length. */
    /* freshnodes is the max number of nodes we can hope to append at all:
     * the nodes available minus two (ourself and the node we are sending the
     * message to). However in practice there may be fewer valid nodes, since
     * nodes in handshake state or disconnected nodes are not considered. */
    int freshnodes = dictSize(server.cluster->nodes)-2;

    /* How many gossip sections do we want to add? 1/10 of the number of
     * nodes, and anyway at least 3. Why 1/10?
     *
     * If we have N masters, with N/10 entries, and we consider that in
     * node_timeout we exchange with each other node at least 4 packets
     * (we ping in the worst case in node_timeout/2 time, and we also
     * receive two pings from the host), we have a total of 8 packets
     * in the node_timeout*2 failure reports validity time. So we have
     * that, for a single PFAIL node, we can expect to receive the following
     * number of failure reports (in the specified window of time):
     *
     * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
     *
     * PROB = probability of being featured in a single gossip entry,
     *        which is 1 / NUM_OF_NODES.
     * ENTRIES = NUM_OF_NODES / 10.
     * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
     *
     * If we assume we have just masters (so the number of nodes and the
     * number of masters is the same), with 1/10 of entries we always get
     * over the majority, and specifically 80% of the number of nodes, to
     * account for many masters failing at the same time.
     *
     * Since we have non-voting slaves that lower the probability of an entry
     * featuring our node, we set the number of entries per packet as
     * 10% of the total nodes we have. */
    wanted = floor(dictSize(server.cluster->nodes)/10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;
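
    /* As a worked instance of the math above (illustrative numbers): with
     * 50 nodes, all masters, each packet carries 50/10 = 5 gossip entries,
     * and in node_timeout*2 we exchange 8 packets with each of the other
     * nodes, so a given PFAIL node is expected to be reported about
     * (1.0/50) * 5 * (8*50) = 40 times, i.e. to 80% of the cluster. */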

    /* Include all the nodes in PFAIL state, so that failure reports can
     * propagate faster, making nodes go from PFAIL to FAIL state sooner. */
    int pfail_wanted = server.cluster->stats_pfail_nodes;

    /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
     * later according to the number of gossip sections we really were able
     * to put inside the packet. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
    /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
     * sizeof(clusterMsg) or more. */
    if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
    buf = zcalloc(totlen);
    hdr = (clusterMsg*) buf;

    /* Populate the header. */
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    clusterBuildMessageHdr(hdr,type);

    /* Populate the gossip fields */
    int maxiterations = wanted*3;
    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        /* Don't include this node: the whole packet header is about us
         * already, so we just gossip about other nodes. */
        if (this == myself) continue;

        /* PFAIL nodes will be added later. */
        if (this->flags & CLUSTER_NODE_PFAIL) continue;

        /* In the gossip section don't include:
         * 1) Nodes in HANDSHAKE state.
         * 2) Nodes with the NOADDR flag set.
         * 3) Disconnected nodes if they don't have configured slots.
         */
        if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
            freshnodes--; /* Technically not correct, but saves CPU. */
            continue;
        }

        /* Do not add a node we already have. */
        if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;

        /* Add it */
        clusterSetGossipEntry(hdr,gossipcount,this);
        freshnodes--;
        gossipcount++;
    }

    /* If there are PFAIL nodes, add them at the end. */
    if (pfail_wanted) {
        dictIterator *di;
        dictEntry *de;

        di = dictGetSafeIterator(server.cluster->nodes);
        while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
            clusterNode *node = dictGetVal(de);
            if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
            if (node->flags & CLUSTER_NODE_NOADDR) continue;
            if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
            clusterSetGossipEntry(hdr,gossipcount,node);
            freshnodes--;
            gossipcount++;
            /* We count the gossip entries we allocated, since the PFAIL
             * stats may not match perfectly with the current number
             * of PFAIL nodes. */
            pfail_wanted--;
        }
        dictReleaseIterator(di);
    }

    /* Ready to send... fix the totlen field and queue the message in the
     * output buffer. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
    hdr->count = htons(gossipcount);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}

/* Send a PONG packet to every connected node that's not in handshake state
 * and for which we have a valid link.
 *
 * In Redis Cluster pongs are not used just for failure detection, but also
 * to carry important configuration information. So broadcasting a pong is
 * useful when something changes in the configuration and we want to make
 * the cluster aware ASAP (for instance after a slave promotion).
 *
 * The 'target' argument specifies the receiving instances using the
 * defines below:
 *
 * CLUSTER_BROADCAST_ALL -> All known instances.
 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
 */
#define CLUSTER_BROADCAST_ALL 0
#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
void clusterBroadcastPong(int target) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!node->link) continue;
        if (node == myself || nodeInHandshake(node)) continue;
        if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
            int local_slave =
                nodeIsSlave(node) && node->slaveof &&
                (node->slaveof == myself || node->slaveof == myself->slaveof);
            if (!local_slave) continue;
        }
        clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
    }
    dictReleaseIterator(di);
}

/* Send a PUBLISH message.
 *
 * If link is NULL, then the message is broadcast to the whole cluster.
 *
 * Sanitizer suppression: In clusterMsgDataPublish, sizeof(bulk_data) is 8.
 * As the whole struct is used as a buffer, when more than 8 bytes are copied
 * into 'bulk_data', the sanitizer generates an out-of-bounds error which is
 * a false positive in this context. */
REDIS_NO_SANITIZE("bounds")
void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
    unsigned char *payload;
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;
    uint32_t channel_len, message_len;

    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    channel_len = sdslen(channel->ptr);
    message_len = sdslen(message->ptr);

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
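
    /* To illustrate the arithmetic (sizes are hypothetical): publishing
     * message "hi" (2 bytes) on channel "news" (4 bytes) yields a totlen of
     * the fixed header plus sizeof(clusterMsgDataPublish) - 8 + 4 + 2, where
     * the -8 discounts the bulk_data placeholder that the channel and
     * message bytes overwrite. */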

    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        payload = (unsigned char*)buf;
    } else {
        payload = zmalloc(totlen);
        memcpy(payload,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) payload;
    }
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
    memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
        message->ptr,sdslen(message->ptr));

    if (link)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);

    decrRefCount(channel);
    decrRefCount(message);
    if (payload != (unsigned char*)buf) zfree(payload);
}

/* Send a FAIL message to all the nodes we are able to contact.
 * The FAIL message is sent when we detect that a node is failing
 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
 * nodes to do the same ASAP. */
void clusterSendFail(char *nodename) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
    memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
    clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}

/* Send an UPDATE message to the specified link carrying the specified 'node'
 * slots configuration. The node name, slots bitmap, and configEpoch info
 * are included. */
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    if (link == NULL) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
    memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
    hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
    memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
    clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
}

/* Send a MODULE message.
 *
 * If link is NULL, then the message is broadcast to the whole cluster. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
                       unsigned char *payload, uint32_t len) {
    unsigned char *heapbuf;
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgModule) - 3 + len;

    hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
    hdr->data.module.msg.type = type;
    hdr->data.module.msg.len = htonl(len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        heapbuf = (unsigned char*)buf;
    } else {
        heapbuf = zmalloc(totlen);
        memcpy(heapbuf,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) heapbuf;
    }
    memcpy(hdr->data.module.msg.bulk_data,payload,len);

    if (link)
        clusterSendMessage(link,heapbuf,totlen);
    else
        clusterBroadcastMessage(heapbuf,totlen);

    if (heapbuf != (unsigned char*)buf) zfree(heapbuf);
}

/* This function gets a cluster node ID string as target, the same way the
 * node addresses are represented on the modules side, resolves the node, and
 * sends the message. If the target is NULL the message is broadcast.
 *
 * The function returns C_OK if the target is valid, otherwise C_ERR is
 * returned. */
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
    clusterNode *node = NULL;

    if (target != NULL) {
        node = clusterLookupNode(target);
        if (node == NULL || node->link == NULL) return C_ERR;
    }

    clusterSendModule(target ? node->link : NULL,
                      module_id, type, payload, len);
    return C_OK;
}

/* -----------------------------------------------------------------------------
 * CLUSTER Pub/Sub support
 *
 * For now we do very little, just propagating PUBLISH messages across the whole
 * cluster. In the future we'll try to get smarter and avoid propagating those
 * messages to hosts without receivers for a given channel.
 * -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message) {
    clusterSendPublish(NULL, channel, message);
}

/* -----------------------------------------------------------------------------
 * SLAVE node specific functions
 * -------------------------------------------------------------------------- */

/* This function sends a FAILOVER_AUTH_REQUEST message to every node in order
 * to see if there is quorum for this slave instance to fail over its failing
 * master.
 *
 * Note that we send the failover request to everybody, master and slave nodes,
 * but only the masters are supposed to reply to our query. */
void clusterRequestFailoverAuth(void) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
    /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
     * in the header to communicate to the nodes receiving the message that
     * they should authorize the failover even if the master is working. */
    if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterBroadcastMessage(buf,totlen);
}

/* Send a FAILOVER_AUTH_ACK message to the specified node. */
void clusterSendFailoverAuth(clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    if (!node->link) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(node->link,(unsigned char*)buf,totlen);
}

/* Send a MFSTART message to the specified node. */
void clusterSendMFStart(clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    if (!node->link) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(node->link,(unsigned char*)buf,totlen);
}

/* Vote for the node asking for our vote if the conditions are met. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    unsigned char *claimed_slots = request->myslots;
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int j;

    /* If we are not a master serving at least 1 slot, we don't have the
     * right to vote, as the cluster size in Redis Cluster is the number
     * of masters serving at least one slot, and quorum is half the cluster
     * size + 1. */
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    /* The request epoch must be >= our currentEpoch.
     * Note that it is impossible for it to actually be greater since
     * our currentEpoch was updated as a side effect of receiving this
     * request, if the request epoch was greater. */
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* Did I already vote for this epoch? Return ASAP. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* The node must be a slave and its master down.
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }

    /* We must not have voted for a slave of this master within two times
     * the node timeout. This is not strictly needed for correctness
     * of the algorithm but makes the base case more linear. */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
    {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) ((server.cluster_node_timeout*2)-
                             (mstime() - node->slaveof->voted_time)));
        return;
    }

    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* If we reached this point we found a slot that in our current slots
         * is served by a master with a greater configEpoch than the one claimed
         * by the slave requesting our vote. Refuse to vote for this slave. */
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    /* We can vote for this slave. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}

/* This function returns the "rank" of this instance, a slave, in the context
 * of its master-slaves ring. The rank of the slave is given by the number of
 * other slaves for the same master that have a better replication offset
 * compared to the local one (better means greater, so they claim more data).
 *
 * A slave with rank 0 is the one with the greatest (most up to date)
 * replication offset, and so forth. Note that, because of how the rank is
 * computed, multiple slaves may have the same rank, in case they have the
 * same offset.
 *
 * The slave rank is used to add a delay to start an election in order to
 * get voted and replace a failing master. Slaves with better replication
 * offsets are more likely to win. */
int clusterGetSlaveRank(void) {
    long long myoffset;
    int j, rank = 0;
    clusterNode *master;

    serverAssert(nodeIsSlave(myself));
    master = myself->slaveof;
    if (master == NULL) return 0; /* Never called by slaves without master. */

    myoffset = replicationGetSlaveOffset();
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] != myself &&
            !nodeCantFailover(master->slaves[j]) &&
            master->slaves[j]->repl_offset > myoffset) rank++;
    return rank;
}
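
/* To make the ranking concrete (offsets are hypothetical): with three slaves
 * of the same master at replication offsets 200, 150 and 150, the first one
 * has rank 0, while the other two both have rank 1, so both would schedule
 * their elections with the same extra delay. */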

/* This function is called by clusterHandleSlaveFailover() in order to
 * let the slave log why it is not able to failover. Sometimes the conditions
 * are not met, but since the failover function is called again and again,
 * we can't log the same things continuously.
 *
 * This function works by logging only if a given set of conditions are
 * true:
 *
 * 1) The reason for which the failover can't be initiated changed.
 *    The reasons also include a NONE reason, that the state is reset to
 *    when the slave finds that its master is fine (no FAIL flag).
 * 2) Also, the log is emitted again if the master is still down and
 *    the reason for not failing over is still the same, but more than
 *    CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
 * 3) Finally, the function only logs if the slave is down for more than
 *    five seconds + NODE_TIMEOUT. This way nothing is logged when a
 *    failover starts in a reasonable time.
 *
 * The function is called with the reason why the slave can't failover,
 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
 *
 * The function is guaranteed to be called only if 'myself' is a slave. */
void clusterLogCantFailover(int reason) {
    char *msg;
    static time_t lastlog_time = 0;
    mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;

    /* Don't log if we have had the same reason for some time. */
    if (reason == server.cluster->cant_failover_reason &&
        time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
        return;

    server.cluster->cant_failover_reason = reason;

    /* We also don't emit any log if the master failed not long ago: the
     * goal of this function is to log slaves in a stalled condition for
     * a long time. */
    if (myself->slaveof &&
        nodeFailed(myself->slaveof) &&
        (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;

    switch(reason) {
    case CLUSTER_CANT_FAILOVER_DATA_AGE:
        msg = "Disconnected from master for longer than allowed. "
              "Please check the 'cluster-replica-validity-factor' configuration "
              "option.";
        break;
    case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
        msg = "Waiting the delay before I can start a new failover.";
        break;
    case CLUSTER_CANT_FAILOVER_EXPIRED:
        msg = "Failover attempt expired.";
        break;
    case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
        msg = "Waiting for votes, but majority still not reached.";
        break;
    default:
        msg = "Unknown reason code.";
        break;
    }
    lastlog_time = time(NULL);
    serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
}

/* This function implements the final part of automatic and manual failovers,
 * where the slave grabs its master's hash slots, and propagates the new
 * configuration.
 *
 * Note that it's up to the caller to be sure that the node got a new
 * configuration epoch already. */
void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) Turn this node into a master. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) Update state and save config. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so that they can update the state
     *    accordingly and detect that we switched to master role. */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) If there was a manual failover in progress, clear the state. */
    resetManualFailover();
}

/* This function is called if we are a slave node and our master serving
 * a non-zero amount of hash slots is in FAIL state.
 *
 * The goal of this function is:
 * 1) To check if we are able to perform a failover, i.e. is our data updated?
 * 2) To try to get elected by the masters.
 * 3) To perform the failover, informing all the other nodes.
 */
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    mstime_t auth_timeout, auth_retry_time;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

    /* Compute the failover timeout (the max time we have to send votes
     * and wait for replies), and the failover retry time (the time to wait
     * before trying to get voted again).
     *
     * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
     * Retry is two times the Timeout.
     */
    auth_timeout = server.cluster_node_timeout*2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout*2;
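
    /* As a numeric illustration (values are examples): with a node timeout
     * of 15000 ms, auth_timeout is 30000 ms and auth_retry_time is 60000 ms,
     * and in a cluster of 5 masters the needed_quorum computed above is
     * 5/2 + 1 = 3 votes. */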

    /* Preconditions to run the function, that must be met both in case
     * of an automatic or manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) We don't have the no-failover configuration set, or this is
     *    a manual failover.
     * 4) Our master is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        (server.cluster_slave_no_failover && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        /* There are no reasons to failover, so we set the reason why we
         * are returning without failing over to NONE. */
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return;
    }

    /* Set data_age to the number of milliseconds we are disconnected from
     * the master. */
    if (server.repl_state == REPL_STATE_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age, as it is fine that we are
     * disconnected from our master at least for the time it was down in
     * order to be flagged as FAIL: that's the baseline. */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough according to the slave validity
     * factor configured by the user.
     *
     * Check bypassed for manual failovers. */
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }
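
    /* For example (illustrative settings): with a repl_ping_slave_period of
     * 10 seconds, a node timeout of 15000 ms and a validity factor of 10, a
     * slave refuses an automatic failover once data_age exceeds
     * 10*1000 + 15000*10 = 160000 ms of staleness. */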

    /* If the previous failover attempt timed out and the retry time has
     * elapsed, we can set up a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 500 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that probably have
         * a less updated replication offset are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
        serverLog(LL_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll update their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }

    /* It is possible that we received more updated offsets from other
     * slaves for the same master since we computed our election delay.
     * Update the delay if our rank changed.
     *
     * Not performed if this is a manual failover. */
    if (server.cluster->failover_auth_sent == 0 &&
        server.cluster->mf_end == 0)
    {
        int newrank = clusterGetSlaveRank();
        if (newrank > server.cluster->failover_auth_rank) {
            long long added_delay =
                (newrank - server.cluster->failover_auth_rank) * 1000;
            server.cluster->failover_auth_time += added_delay;
            server.cluster->failover_auth_rank = newrank;
            serverLog(LL_WARNING,
                "Replica rank updated to #%d, added %lld milliseconds of delay.",
                newrank, added_delay);
        }
    }

    /* Return ASAP if we still can't start the election. */
    if (mstime() < server.cluster->failover_auth_time) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
        return;
    }

    /* Return ASAP if the election is too old to be valid. */
    if (auth_age > auth_timeout) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
        return;
    }

    /* Ask for votes if needed. */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }

    /* Check if we reached the quorum. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* We have the quorum, we can finally failover the master. */

        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");

        /* Update my configEpoch to the epoch of the election. */
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }

        /* Take responsibility for the cluster slots. */
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}

/* -----------------------------------------------------------------------------
 * CLUSTER slave migration
 *
 * Slave migration is the process that allows a slave of a master that is
 * already covered by at least one other slave, to "migrate" to a master that
 * is orphaned, that is, left with no working slaves.
 * ------------------------------------------------------------------------- */

/* This function is responsible for deciding if this replica should be
 * migrated to a different (orphaned) master. It is called by the
 * clusterCron() function only if:
 *
 * 1) We are a slave node.
 * 2) It was detected that there is at least one orphaned master in
 *    the cluster.
 * 3) We are a slave of one of the masters with the greatest number of
 *    slaves.
 *
 * These checks are performed by the caller, since it needs to iterate over
 * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
 * if definitely needed.
 *
 * The function is called with a pre-computed max_slaves, that is the max
 * number of working (not in FAIL state) slaves for a single master.
 *
 * Additional conditions for migration are examined inside the function.
 */
void clusterHandleSlaveMigration(int max_slaves) {
    int j, okslaves = 0;
    clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
    dictIterator *di;
    dictEntry *de;

    /* Step 1: Don't migrate if the cluster state is not ok. */
    if (server.cluster->state != CLUSTER_OK) return;

    /* Step 2: Don't migrate if my master will not be left with at least
     *         'migration-barrier' slaves after my migration. */
    if (mymaster == NULL) return;
    for (j = 0; j < mymaster->numslaves; j++)
        if (!nodeFailed(mymaster->slaves[j]) &&
            !nodeTimedOut(mymaster->slaves[j])) okslaves++;
    if (okslaves <= server.cluster_migration_barrier) return;
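
    /* For instance (example values): with cluster-migration-barrier set to
     * its default of 1 and a master that has 3 working slaves, okslaves is
     * 3 > 1, so one of those slaves is allowed to leave and adopt an
     * orphaned master. */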

    /* Step 3: Identify a candidate for migration, and check if among the
     * masters with the greatest number of ok slaves, I'm the one with the
     * smallest node ID (the "candidate slave").
     *
     * Note: this means that eventually a replica migration will occur
     * since slaves that are reachable again always have their FAIL flag
     * cleared, so eventually there must be a candidate.
     * There is a possible race condition causing multiple
     * slaves to migrate at the same time, but this is unlikely to
     * happen and relatively harmless when it does. */
    candidate = myself;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int okslaves = 0, is_orphaned = 1;

        /* We want to migrate only if this master is working, orphaned, and
         * used to have slaves, or failed over a master that had slaves
         * (MIGRATE_TO flag). This way we only migrate to instances that were
         * supposed to have replicas. */
        if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
        if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;

        /* Check the number of working slaves. */
        if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
        if (okslaves > 0) is_orphaned = 0;

        if (is_orphaned) {
            if (!target && node->numslots > 0) target = node;

            /* Track the starting time of the orphaned condition for this
             * master. */
            if (!node->orphaned_time) node->orphaned_time = mstime();
        } else {
            node->orphaned_time = 0;
        }

        /* Check if I'm the slave candidate for the migration: attached
         * to a master with the maximum number of slaves and with the smallest
         * node ID. */
        if (okslaves == max_slaves) {
            for (j = 0; j < node->numslaves; j++) {
                if (memcmp(node->slaves[j]->name,
                           candidate->name,
                           CLUSTER_NAMELEN) < 0)
                {
                    candidate = node->slaves[j];
                }
            }
        }
    }
    dictReleaseIterator(di);

    /* Step 4: perform the migration if there is a target, and if I'm the
     * candidate, but only if the master has been continuously orphaned for
     * a couple of seconds, so that during failovers, we give some time to
     * the natural slaves of this instance to advertise their switch from
     * the old master to the new one. */
    if (target && candidate == myself &&
        (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
       !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
    {
        serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
            target->name);
        clusterSetMaster(target);
    }
}

/* -----------------------------------------------------------------------------
 * CLUSTER manual failover
 *
 * These are the important steps performed by slaves during a manual failover:
 * 1) The user sends the CLUSTER FAILOVER command. The failover state is
 *    initialized setting mf_end to the millisecond unix time at which we'll
 *    abort the attempt.
 * 2) The slave sends a MFSTART message to the master requesting to pause
 *    clients for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
 *    When the master is paused for manual failover, it also starts to flag
 *    packets with CLUSTERMSG_FLAG0_PAUSED.
 * 3) The slave waits for the master to send its replication offset flagged
 *    as PAUSED.
 * 4) If the slave received the offset from the master, and its offset
 *    matches, mf_can_start is set to 1, and clusterHandleSlaveFailover()
 *    will perform the failover as usual, with the difference that the vote
 *    request will be modified to force masters to vote for a slave that has
 *    a working master.
 *
 * From the point of view of the master things are simpler: when a
 * PAUSE_CLIENTS packet is received the master sets mf_end as well and
 * records the sender in mf_slave. During the time limit for the manual
 * failover the master will just send PINGs more often to this slave,
 * flagged with the PAUSED flag, so that the slave will set mf_master_offset
 * when receiving a packet from the master with this flag set.
 *
 * The goal of the manual failover is to perform a fast failover without
 * data loss due to the asynchronous master-slave replication.
 * -------------------------------------------------------------------------- */
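
/* As an illustrative timeline (durations are examples, assuming
 * CLUSTER_MF_TIMEOUT is 5 seconds): at t=0 the user runs CLUSTER FAILOVER on
 * the slave and mf_end is set to t+5000 ms; the slave sends MFSTART, the
 * master pauses clients and starts sending PAUSED-flagged pings carrying its
 * offset; as soon as the slave's replication offset catches up,
 * mf_can_start becomes 1 and the usual election logic runs with no extra
 * delay. */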

/* Reset the manual failover state. This works for both masters and slaves
 * as all the state about manual failover is cleared.
 *
 * The function can be used both to initialize the manual failover state at
 * startup or to abort a manual failover in progress. */
void resetManualFailover(void) {
    if (server.cluster->mf_slave) {
        /* We were a master failing over, so we paused clients. Regardless
         * of the outcome we unpause now to allow traffic again. */
        unpauseClients();
    }
    server.cluster->mf_end = 0; /* No manual failover in progress. */
    server.cluster->mf_can_start = 0;
    server.cluster->mf_slave = NULL;
    server.cluster->mf_master_offset = -1;
}

/* If a manual failover timed out, abort it. */
void manualFailoverCheckTimeout(void) {
    if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
        serverLog(LL_WARNING,"Manual failover timed out.");
        resetManualFailover();
    }
}

/* This function is called from the cluster cron function in order to go
 * forward with a manual failover state machine. */
void clusterHandleManualFailover(void) {
    /* Return ASAP if no manual failover is in progress. */
    if (server.cluster->mf_end == 0) return;

    /* If mf_can_start is non-zero, the failover was already triggered so the
     * next steps are performed by clusterHandleSlaveFailover(). */
    if (server.cluster->mf_can_start) return;

    if (server.cluster->mf_master_offset == -1) return; /* Wait for offset... */

    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* Our replication offset matches the master replication offset
         * announced after clients were paused. We can start the failover. */
        server.cluster->mf_can_start = 1;
        serverLog(LL_WARNING,
            "All master replication stream processed, "
            "manual failover can start.");
        clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        return;
    }
    clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
}

/* -----------------------------------------------------------------------------
 * CLUSTER cron job
 * -------------------------------------------------------------------------- */

/* Check if the node is disconnected and re-establish the connection.
 * Also update a few stats while we are here, that can be used to make
 * better decisions in other parts of the code. */
int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
    /* Not interested in reconnecting the link with myself or nodes
     * for which we have no address. */
    if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) return 1;

    if (node->flags & CLUSTER_NODE_PFAIL)
        server.cluster->stats_pfail_nodes++;

    /* A node in HANDSHAKE state has a limited lifespan equal to the
     * configured node timeout. */
    if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
        clusterDelNode(node);
        return 1;
    }

    if (node->link == NULL) {
        clusterLink *link = createClusterLink(node);
        link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
        connSetPrivateData(link->conn, link);
        if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr,
                    clusterLinkConnectHandler) == -1) {
            /* We got a synchronous error from connect before
             * clusterSendPing() had a chance to be called.
             * If node->ping_sent is zero, failure detection can't work,
             * so we claim we actually sent a ping now (that will
             * be really sent as soon as the link is obtained). */
            if (node->ping_sent == 0) node->ping_sent = mstime();
            serverLog(LL_DEBUG, "Unable to connect to "
                "Cluster Node [%s]:%d -> %s", node->ip,
                node->cport, server.neterr);

            freeClusterLink(link);
            return 0;
        }
        node->link = link;
    }
    return 0;
}

/* Resize the send buffer of a node if it is wasting
 * enough space. */
int clusterNodeCronResizeBuffers(clusterNode *node) {
    /* If the unused space is a lot bigger than the used portion of the
     * buffer then free up the unused space. We use a factor of 4 because
     * of the greediness of sdsMakeRoomFor (used by sdscatlen). */
    if (node->link != NULL && sdsavail(node->link->sndbuf) / 4 > sdslen(node->link->sndbuf)) {
        node->link->sndbuf = sdsRemoveFreeSpace(node->link->sndbuf);
    }
    return 0;
}
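
/* To see the shrink rule above with concrete numbers (hypothetical sizes):
 * a sndbuf with 1KB of used data and 8KB of free space gives
 * 8192/4 = 2048 > 1024, so the free space is released; with 1KB used and
 * 2KB free, 2048/4 = 512 <= 1024 and the buffer is left alone. */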
3640 
3641 /* This is executed 10 times every second */
clusterCron(void)3642 void clusterCron(void) {
3643     dictIterator *di;
3644     dictEntry *de;
3645     int update_state = 0;
3646     int orphaned_masters; /* How many masters there are without ok slaves. */
3647     int max_slaves; /* Max number of ok slaves for a single master. */
3648     int this_slaves; /* Number of ok slaves for our master (if we are slave). */
3649     mstime_t min_pong = 0, now = mstime();
3650     clusterNode *min_pong_node = NULL;
3651     static unsigned long long iteration = 0;
3652     mstime_t handshake_timeout;
3653 
3654     iteration++; /* Number of times this function was called so far. */
3655 
3656     /* The handshake timeout is the time after which a handshake node that was
3657      * not turned into a normal node is removed from the nodes. Usually it is
3658      * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
3659      * the value of 1 second. */
3660     handshake_timeout = server.cluster_node_timeout;
3661     if (handshake_timeout < 1000) handshake_timeout = 1000;
3662 
3663     /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
3664     server.cluster->stats_pfail_nodes = 0;
3665     /* Run through some of the operations we want to do on each cluster node. */
3666     di = dictGetSafeIterator(server.cluster->nodes);
3667     while((de = dictNext(di)) != NULL) {
3668         clusterNode *node = dictGetVal(de);
3669         /* The protocol is that these per-node handlers return non-zero if
3670          * the node was terminated. */
3671         if (clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
3672         if (clusterNodeCronResizeBuffers(node)) continue;
3673     }
3674     dictReleaseIterator(di);
3675 
3676     /* Ping some random node once every 10 iterations, so that we usually ping
3677      * one random node every second. */
3678     if (!(iteration % 10)) {
3679         int j;
3680 
3681         /* Check a few random nodes and ping the one with the oldest
3682          * pong_received time. */
3683         for (j = 0; j < 5; j++) {
3684             de = dictGetRandomKey(server.cluster->nodes);
3685             clusterNode *this = dictGetVal(de);
3686 
3687             /* Don't ping nodes disconnected or with a ping currently active. */
3688             if (this->link == NULL || this->ping_sent != 0) continue;
3689             if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3690                 continue;
3691             if (min_pong_node == NULL || min_pong > this->pong_received) {
3692                 min_pong_node = this;
3693                 min_pong = this->pong_received;
3694             }
3695         }
3696         if (min_pong_node) {
3697             serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
3698             clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
3699         }
3700     }
3701 
3702     /* Iterate nodes to check if we need to flag something as failing.
3703      * This loop is also responsible for:
3704      * 1) Check if there are orphaned masters (masters without non failing
3705      *    slaves).
3706      * 2) Count the max number of non failing slaves for a single master.
3707      * 3) Count the number of slaves for our master, if we are a slave. */
3708     orphaned_masters = 0;
3709     max_slaves = 0;
3710     this_slaves = 0;
3711     di = dictGetSafeIterator(server.cluster->nodes);
3712     while((de = dictNext(di)) != NULL) {
3713         clusterNode *node = dictGetVal(de);
3714         now = mstime(); /* Use an updated time at every iteration. */
3715 
3716         if (node->flags &
3717             (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
3718                 continue;
3719 
3720         /* Orphaned master check, useful only if the current instance
3721          * is a slave that may migrate to another master. */
3722         if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
3723             int okslaves = clusterCountNonFailingSlaves(node);
3724 
3725             /* A master is orphaned if it is serving a non-zero number of
3726              * slots, has no working slaves, but used to have at least one
3727              * slave, or failed over a master that used to have slaves. */
3728             if (okslaves == 0 && node->numslots > 0 &&
3729                 node->flags & CLUSTER_NODE_MIGRATE_TO)
3730             {
3731                 orphaned_masters++;
3732             }
3733             if (okslaves > max_slaves) max_slaves = okslaves;
3734             if (nodeIsSlave(myself) && myself->slaveof == node)
3735                 this_slaves = okslaves;
3736         }
3737 
3738         /* If we are not receiving any data for more than half the cluster
3739          * timeout, reconnect the link: maybe there is a connection
3740          * issue even if the node is alive. */
3741         mstime_t ping_delay = now - node->ping_sent;
3742         mstime_t data_delay = now - node->data_received;
3743         if (node->link && /* is connected */
3744             now - node->link->ctime >
3745             server.cluster_node_timeout && /* was not already reconnected */
3746             node->ping_sent && /* we already sent a ping */
3747             /* and we are waiting for the pong more than timeout/2 */
3748             ping_delay > server.cluster_node_timeout/2 &&
3749             /* and in such interval we are not seeing any traffic at all. */
3750             data_delay > server.cluster_node_timeout/2)
3751         {
3752             /* Disconnect the link, it will be reconnected automatically. */
3753             freeClusterLink(node->link);
3754         }
3755 
3756         /* If we have currently no active ping in this instance, and the
3757          * received PONG is older than half the cluster timeout, send
3758          * a new ping now, to ensure all the nodes are pinged without
3759          * a too big delay. */
3760         if (node->link &&
3761             node->ping_sent == 0 &&
3762             (now - node->pong_received) > server.cluster_node_timeout/2)
3763         {
3764             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3765             continue;
3766         }
3767 
3768         /* If we are a master and one of the slaves requested a manual
3769          * failover, ping it continuously. */
3770         if (server.cluster->mf_end &&
3771             nodeIsMaster(myself) &&
3772             server.cluster->mf_slave == node &&
3773             node->link)
3774         {
3775             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3776             continue;
3777         }
3778 
3779         /* Check only if we have an active ping for this instance. */
3780         if (node->ping_sent == 0) continue;
3781 
3782         /* Check if this node looks unreachable.
3783          * Note that if we already received the PONG, then node->ping_sent
3784          * is zero, so we can't reach this code at all and we don't risk
3785          * checking for a PONG delay if we never sent the PING.
3786          *
3787          * We also consider every incoming data as proof of liveness, since
3788          * our cluster bus link is also used for data: under heavy data
3789          * load pong delays are possible. */
3790         mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
3791                                                           data_delay;
3792 
3793         if (node_delay > server.cluster_node_timeout) {
3794             /* Timeout reached. Set the node as possibly failing if it is
3795              * not already in this state. */
3796             if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
3797                 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
3798                     node->name);
3799                 node->flags |= CLUSTER_NODE_PFAIL;
3800                 update_state = 1;
3801             }
3802         }
3803     }
3804     dictReleaseIterator(di);
3805 
3806     /* If we are a slave node but the replication is still turned off,
3807      * enable it if we know the address of our master and it appears to
3808      * be up. */
3809     if (nodeIsSlave(myself) &&
3810         server.masterhost == NULL &&
3811         myself->slaveof &&
3812         nodeHasAddr(myself->slaveof))
3813     {
3814         replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
3815     }
3816 
3817     /* Abort a manual failover if the timeout is reached. */
3818     manualFailoverCheckTimeout();
3819 
3820     if (nodeIsSlave(myself)) {
3821         clusterHandleManualFailover();
3822         if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3823             clusterHandleSlaveFailover();
3824         /* If there are orphaned masters, and we are a slave among the masters
3825          * with the max number of non-failing slaves, consider migrating to
3826          * the orphaned masters. Note that it does not make sense to try
3827          * a migration if there is no master with at least *two* working
3828          * slaves. */
3829         if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
3830             server.cluster_allow_replica_migration)
3831             clusterHandleSlaveMigration(max_slaves);
3832     }
3833 
3834     if (update_state || server.cluster->state == CLUSTER_FAIL)
3835         clusterUpdateState();
3836 }
3837 
3838 /* This function is called before the event handler returns to sleep for
3839  * events. It is useful to perform operations that must be done ASAP in
3840  * reaction to events fired but that are not safe to perform inside event
3841  * handlers, or to perform potentially expensive tasks that we need to do
3842  * a single time before replying to clients. */
3843 void clusterBeforeSleep(void) {
3844     int flags = server.cluster->todo_before_sleep;
3845 
3846     /* Reset our flags (not strictly needed since every single function
3847      * called for flags set should be able to clear its flag). */
3848     server.cluster->todo_before_sleep = 0;
3849 
3850     if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) {
3851         /* Handle the manual failover as soon as possible, so we don't add up
3852          * to 100 ms of latency by waiting for clusterCron() to handle it. */
3853         if (nodeIsSlave(myself)) {
3854             clusterHandleManualFailover();
3855             if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3856                 clusterHandleSlaveFailover();
3857         }
3858     } else if (flags & CLUSTER_TODO_HANDLE_FAILOVER) {
3859         /* Handle failover, this is needed when it is likely that there is already
3860          * the quorum from masters in order to react fast. */
3861         clusterHandleSlaveFailover();
3862     }
3863 
3864     /* Update the cluster state. */
3865     if (flags & CLUSTER_TODO_UPDATE_STATE)
3866         clusterUpdateState();
3867 
3868     /* Save the config, possibly using fsync. */
3869     if (flags & CLUSTER_TODO_SAVE_CONFIG) {
3870         int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
3871         clusterSaveConfigOrDie(fsync);
3872     }
3873 }
3874 
3875 void clusterDoBeforeSleep(int flags) {
3876     server.cluster->todo_before_sleep |= flags;
3877 }
3878 
3879 /* -----------------------------------------------------------------------------
3880  * Slots management
3881  * -------------------------------------------------------------------------- */
3882 
3883 /* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
3884  * otherwise 0. */
3885 int bitmapTestBit(unsigned char *bitmap, int pos) {
3886     off_t byte = pos/8;
3887     int bit = pos&7;
3888     return (bitmap[byte] & (1<<bit)) != 0;
3889 }
3890 
3891 /* Set the bit at position 'pos' in a bitmap. */
3892 void bitmapSetBit(unsigned char *bitmap, int pos) {
3893     off_t byte = pos/8;
3894     int bit = pos&7;
3895     bitmap[byte] |= 1<<bit;
3896 }
3897 
3898 /* Clear the bit at position 'pos' in a bitmap. */
3899 void bitmapClearBit(unsigned char *bitmap, int pos) {
3900     off_t byte = pos/8;
3901     int bit = pos&7;
3902     bitmap[byte] &= ~(1<<bit);
3903 }
3904 
3905 /* Return non-zero if there is at least one master with slaves in the cluster.
3906  * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3907  * MIGRATE_TO flag when a master gets its first slot. */
3908 int clusterMastersHaveSlaves(void) {
3909     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3910     dictEntry *de;
3911     int slaves = 0;
3912     while((de = dictNext(di)) != NULL) {
3913         clusterNode *node = dictGetVal(de);
3914 
3915         if (nodeIsSlave(node)) continue;
3916         slaves += node->numslaves;
3917     }
3918     dictReleaseIterator(di);
3919     return slaves != 0;
3920 }
3921 
3922 /* Set the slot bit and return the old value. */
3923 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3924     int old = bitmapTestBit(n->slots,slot);
3925     bitmapSetBit(n->slots,slot);
3926     if (!old) {
3927         n->numslots++;
3928         /* When a master gets its first slot, even if it has no slaves,
3929          * it gets flagged with MIGRATE_TO, that is, the master is a valid
3930          * target for replicas migration, if and only if at least one of
3931          * the other masters has slaves right now.
3932          *
3933          * Normally masters are valid targets of replica migration if:
3934          * 1. They used to have slaves (but no longer have them).
3935          * 2. They are slaves failing over a master that used to have slaves.
3936          *
3937          * However, new masters with slots assigned are considered valid
3938          * migration targets if the rest of the cluster is not entirely slave-less.
3939          *
3940          * See https://github.com/redis/redis/issues/3043 for more info. */
3941         if (n->numslots == 1 && clusterMastersHaveSlaves())
3942             n->flags |= CLUSTER_NODE_MIGRATE_TO;
3943     }
3944     return old;
3945 }
3946 
3947 /* Clear the slot bit and return the old value. */
3948 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3949     int old = bitmapTestBit(n->slots,slot);
3950     bitmapClearBit(n->slots,slot);
3951     if (old) n->numslots--;
3952     return old;
3953 }
3954 
3955 /* Return the slot bit from the cluster node structure. */
3956 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3957     return bitmapTestBit(n->slots,slot);
3958 }
3959 
3960 /* Add the specified slot to the list of slots that node 'n' will
3961  * serve. Return C_OK if the operation ended with success.
3962  * If the slot is already assigned to another instance this is considered
3963  * an error and C_ERR is returned. */
3964 int clusterAddSlot(clusterNode *n, int slot) {
3965     if (server.cluster->slots[slot]) return C_ERR;
3966     clusterNodeSetSlotBit(n,slot);
3967     server.cluster->slots[slot] = n;
3968     return C_OK;
3969 }
3970 
3971 /* Delete the specified slot marking it as unassigned.
3972  * Returns C_OK if the slot was assigned, otherwise if the slot was
3973  * already unassigned C_ERR is returned. */
3974 int clusterDelSlot(int slot) {
3975     clusterNode *n = server.cluster->slots[slot];
3976 
3977     if (!n) return C_ERR;
3978     serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3979     server.cluster->slots[slot] = NULL;
3980     return C_OK;
3981 }
3982 
3983 /* Delete all the slots associated with the specified node.
3984  * The number of deleted slots is returned. */
3985 int clusterDelNodeSlots(clusterNode *node) {
3986     int deleted = 0, j;
3987 
3988     for (j = 0; j < CLUSTER_SLOTS; j++) {
3989         if (clusterNodeGetSlotBit(node,j)) {
3990             clusterDelSlot(j);
3991             deleted++;
3992         }
3993     }
3994     return deleted;
3995 }
3996 
3997 /* Clear the migrating / importing state for all the slots.
3998  * This is useful at initialization and when turning a master into slave. */
3999 void clusterCloseAllSlots(void) {
4000     memset(server.cluster->migrating_slots_to,0,
4001         sizeof(server.cluster->migrating_slots_to));
4002     memset(server.cluster->importing_slots_from,0,
4003         sizeof(server.cluster->importing_slots_from));
4004 }
4005 
4006 /* -----------------------------------------------------------------------------
4007  * Cluster state evaluation function
4008  * -------------------------------------------------------------------------- */
4009 
4010 /* The following are defines that are only used in the evaluation function
4011  * and are based on heuristics. Actually the main point about the rejoin and
4012  * writable delay is that they should be a few orders of magnitude larger
4013  * than the network latency. */
4014 #define CLUSTER_MAX_REJOIN_DELAY 5000
4015 #define CLUSTER_MIN_REJOIN_DELAY 500
4016 #define CLUSTER_WRITABLE_DELAY 2000
4017 
4018 void clusterUpdateState(void) {
4019     int j, new_state;
4020     int reachable_masters = 0;
4021     static mstime_t among_minority_time;
4022     static mstime_t first_call_time = 0;
4023 
4024     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
4025 
4026     /* If this is a master node, wait some time before turning the state
4027      * into OK, since it is not a good idea to rejoin the cluster as a writable
4028      * master, after a reboot, without giving the cluster a chance to
4029      * reconfigure this node. Note that the delay is calculated starting from
4030      * the first call to this function and not since the server start, in order
4031      * to not count the DB loading time. */
4032     if (first_call_time == 0) first_call_time = mstime();
4033     if (nodeIsMaster(myself) &&
4034         server.cluster->state == CLUSTER_FAIL &&
4035         mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
4036 
4037     /* Start assuming the state is OK. We'll turn it into FAIL if the
4038      * right conditions are met. */
4039     new_state = CLUSTER_OK;
4040 
4041     /* Check if all the slots are covered. */
4042     if (server.cluster_require_full_coverage) {
4043         for (j = 0; j < CLUSTER_SLOTS; j++) {
4044             if (server.cluster->slots[j] == NULL ||
4045                 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
4046             {
4047                 new_state = CLUSTER_FAIL;
4048                 break;
4049             }
4050         }
4051     }
4052 
4053     /* Compute the cluster size, that is, the number of master nodes
4054      * serving at least a single slot.
4055      *
4056      * At the same time count the number of reachable masters having
4057      * at least one slot. */
4058     {
4059         dictIterator *di;
4060         dictEntry *de;
4061 
4062         server.cluster->size = 0;
4063         di = dictGetSafeIterator(server.cluster->nodes);
4064         while((de = dictNext(di)) != NULL) {
4065             clusterNode *node = dictGetVal(de);
4066 
4067             if (nodeIsMaster(node) && node->numslots) {
4068                 server.cluster->size++;
4069                 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
4070                     reachable_masters++;
4071             }
4072         }
4073         dictReleaseIterator(di);
4074     }
4075 
4076     /* If we are in a minority partition, change the cluster state
4077      * to FAIL. */
4078     {
4079         int needed_quorum = (server.cluster->size / 2) + 1;
4080 
4081         if (reachable_masters < needed_quorum) {
4082             new_state = CLUSTER_FAIL;
4083             among_minority_time = mstime();
4084         }
4085     }
4086 
4087     /* Log a state change */
4088     if (new_state != server.cluster->state) {
4089         mstime_t rejoin_delay = server.cluster_node_timeout;
4090 
4091         /* If the instance is a master and was partitioned away with the
4092          * minority, don't let it accept queries for some time after the
4093          * partition heals, to make sure there is enough time to receive
4094          * a configuration update. */
4095         if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
4096             rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
4097         if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
4098             rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
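
        /* E.g. a cluster-node-timeout of 15000 ms is clamped down to the
         * 5000 ms maximum rejoin delay, while a very small timeout of 200 ms
         * is raised to the 500 ms minimum. */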
4099 
4100         if (new_state == CLUSTER_OK &&
4101             nodeIsMaster(myself) &&
4102             mstime() - among_minority_time < rejoin_delay)
4103         {
4104             return;
4105         }
4106 
4107         /* Change the state and log the event. */
4108         serverLog(LL_WARNING,"Cluster state changed: %s",
4109             new_state == CLUSTER_OK ? "ok" : "fail");
4110         server.cluster->state = new_state;
4111     }
4112 }
4113 
4114 /* This function is called after the node startup in order to verify that data
4115  * loaded from disk is in agreement with the cluster configuration:
4116  *
4117  * 1) If we find keys about hash slots we have no responsibility for, the
4118  *    following happens:
4119  *    A) If no other node is in charge according to the current cluster
4120  *       configuration, we add these slots to our node.
4121  *    B) If according to our config other nodes are already in charge for
4122  *       these slots, we set the slots as IMPORTING from our point of view
4123  *       in order to justify why we hold those slots, and in order to make
4124  *       redis-cli aware of the issue, so that it can try to fix it.
4125  * 2) If we find data in a DB different than DB0 we return C_ERR to
4126  *    signal the caller it should quit the server with an error message
4127  *    or take other actions.
4128  *
4129  * The function always returns C_OK in case "1", even when it has to
4130  * correct the error. However, if data is found in a DB different
4131  * from DB0, C_ERR is returned.
4132  *
4133  * The function also uses the logging facility in order to warn the user
4134  * about desynchronizations between the data we have in memory and the
4135  * cluster configuration. */
4136 int verifyClusterConfigWithData(void) {
4137     int j;
4138     int update_config = 0;
4139 
4140     /* Return ASAP if a module disabled cluster redirections. In that case
4141      * every master can store keys about every possible hash slot. */
4142     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
4143         return C_OK;
4144 
4145     /* If this node is a slave, don't perform the check at all as we
4146      * completely depend on the replication stream. */
4147     if (nodeIsSlave(myself)) return C_OK;
4148 
4149     /* Make sure we only have keys in DB0. */
4150     for (j = 1; j < server.dbnum; j++) {
4151         if (dictSize(server.db[j].dict)) return C_ERR;
4152     }
4153 
4154     /* Check that all the slots we see populated in memory have a corresponding
4155      * entry in the cluster table. Otherwise fix the table. */
4156     for (j = 0; j < CLUSTER_SLOTS; j++) {
4157         if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
4158         /* Check if we are assigned to this slot or if we are importing it.
4159          * In both cases the configuration makes sense, so move on to the
4160          * next slot. */
4161         if (server.cluster->slots[j] == myself ||
4162             server.cluster->importing_slots_from[j] != NULL) continue;
4163 
4164         /* If we are here, data and cluster config don't agree: we have
4165          * slot 'j' populated even though we are not importing it, nor are
4166          * we assigned to it. Fix this condition. */
4167 
4168         update_config++;
4169         /* Case A: slot is unassigned. Take responsibility for it. */
4170         if (server.cluster->slots[j] == NULL) {
4171             serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
4172                                     "Taking responsibility for it.",j);
4173             clusterAddSlot(myself,j);
4174         } else {
4175             serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
4176                                     "assigned to another node. "
4177                                     "Setting it to importing state.",j);
4178             server.cluster->importing_slots_from[j] = server.cluster->slots[j];
4179         }
4180     }
4181     if (update_config) clusterSaveConfigOrDie(1);
4182     return C_OK;
4183 }
4184 
4185 /* -----------------------------------------------------------------------------
4186  * SLAVE nodes handling
4187  * -------------------------------------------------------------------------- */
4188 
4189 /* Set the specified node 'n' as master for this node.
4190  * If this node is currently a master, it is turned into a slave. */
4191 void clusterSetMaster(clusterNode *n) {
4192     serverAssert(n != myself);
4193     serverAssert(myself->numslots == 0);
4194 
4195     if (nodeIsMaster(myself)) {
4196         myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
4197         myself->flags |= CLUSTER_NODE_SLAVE;
4198         clusterCloseAllSlots();
4199     } else {
4200         if (myself->slaveof)
4201             clusterNodeRemoveSlave(myself->slaveof,myself);
4202     }
4203     myself->slaveof = n;
4204     clusterNodeAddSlave(n,myself);
4205     replicationSetMaster(n->ip, n->port);
4206     resetManualFailover();
4207 }
4208 
4209 /* -----------------------------------------------------------------------------
4210  * Nodes to string representation functions.
4211  * -------------------------------------------------------------------------- */
4212 
4213 struct redisNodeFlags {
4214     uint16_t flag;
4215     char *name;
4216 };
4217 
4218 static struct redisNodeFlags redisNodeFlagsTable[] = {
4219     {CLUSTER_NODE_MYSELF,       "myself,"},
4220     {CLUSTER_NODE_MASTER,       "master,"},
4221     {CLUSTER_NODE_SLAVE,        "slave,"},
4222     {CLUSTER_NODE_PFAIL,        "fail?,"},
4223     {CLUSTER_NODE_FAIL,         "fail,"},
4224     {CLUSTER_NODE_HANDSHAKE,    "handshake,"},
4225     {CLUSTER_NODE_NOADDR,       "noaddr,"},
4226     {CLUSTER_NODE_NOFAILOVER,   "nofailover,"}
4227 };
4228 
4229 /* Concatenate the comma separated list of node flags to the given SDS
4230  * string 'ci'. */
4231 sds representClusterNodeFlags(sds ci, uint16_t flags) {
4232     size_t orig_len = sdslen(ci);
4233     int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
4234     for (i = 0; i < size; i++) {
4235         struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
4236         if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
4237     }
4238     /* If no flag was added, add the "noflags" special flag. */
4239     if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
4240     sdsIncrLen(ci,-1); /* Remove trailing comma. */
4241     return ci;
4242 }
4243 
4244 /* Generate a csv-alike representation of the specified cluster node.
4245  * See clusterGenNodesDescription() top comment for more information.
4246  *
4247  * The function returns the string representation as an SDS string. */
4248 sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
4249     int j, start;
4250     sds ci;
4251     int port = use_pport && node->pport ? node->pport : node->port;
4252 
4253     /* Node coordinates */
4254     ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
4255     ci = sdscatfmt(ci," %s:%i@%i ",
4256         node->ip,
4257         port,
4258         node->cport);
4259 
4260     /* Flags */
4261     ci = representClusterNodeFlags(ci, node->flags);
4262 
4263     /* Slave of... or just "-" */
4264     ci = sdscatlen(ci," ",1);
4265     if (node->slaveof)
4266         ci = sdscatlen(ci,node->slaveof->name,CLUSTER_NAMELEN);
4267     else
4268         ci = sdscatlen(ci,"-",1);
4269 
4270     unsigned long long nodeEpoch = node->configEpoch;
4271     if (nodeIsSlave(node) && node->slaveof) {
4272         nodeEpoch = node->slaveof->configEpoch;
4273     }
4274     /* Latency from the POV of this node, config epoch, link status */
4275     ci = sdscatfmt(ci," %I %I %U %s",
4276         (long long) node->ping_sent,
4277         (long long) node->pong_received,
4278         nodeEpoch,
4279         (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
4280                     "connected" : "disconnected");
4281 
4282     /* Slots served by this instance. If the slots info is already cached,
4283      * append it directly; otherwise generate it only if the node serves slots. */
4284     if (node->slots_info) {
4285         ci = sdscatsds(ci, node->slots_info);
4286     } else if (node->numslots > 0) {
4287         start = -1;
4288         for (j = 0; j < CLUSTER_SLOTS; j++) {
4289             int bit;
4290 
4291             if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4292                 if (start == -1) start = j;
4293             }
4294             if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4295                 if (bit && j == CLUSTER_SLOTS-1) j++;
4296 
4297                 if (start == j-1) {
4298                     ci = sdscatfmt(ci," %i",start);
4299                 } else {
4300                     ci = sdscatfmt(ci," %i-%i",start,j-1);
4301                 }
4302                 start = -1;
4303             }
4304         }
4305     }
4306 
4307     /* For the MYSELF node only, we also dump info about slots that
4308      * we are migrating to other instances or importing from other
4309      * instances. */
4310     if (node->flags & CLUSTER_NODE_MYSELF) {
4311         for (j = 0; j < CLUSTER_SLOTS; j++) {
4312             if (server.cluster->migrating_slots_to[j]) {
4313                 ci = sdscatprintf(ci," [%d->-%.40s]",j,
4314                     server.cluster->migrating_slots_to[j]->name);
4315             } else if (server.cluster->importing_slots_from[j]) {
4316                 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
4317                     server.cluster->importing_slots_from[j]->name);
4318             }
4319         }
4320     }
4321     return ci;
4322 }
4323 
4324 /* Generate the slot topology for all nodes and store the string representation
4325  * in the slots_info struct on the node. This is used to improve the efficiency
4326  * of clusterGenNodesDescription() by avoiding a separate pass over the
4327  * whole slot space for each single node. */
4328 void clusterGenNodesSlotsInfo(int filter) {
4329     clusterNode *n = NULL;
4330     int start = -1;
4331 
4332     for (int i = 0; i <= CLUSTER_SLOTS; i++) {
4333         /* Find start node and slot id. */
4334         if (n == NULL) {
4335             if (i == CLUSTER_SLOTS) break;
4336             n = server.cluster->slots[i];
4337             start = i;
4338             continue;
4339         }
4340 
4341         /* Generate the slots info when we reach a slot served by a different
4342          * node than 'n', or the end of the slot space. */
4343         if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
4344             if (!(n->flags & filter)) {
4345                 if (n->slots_info == NULL) n->slots_info = sdsempty();
4346                 if (start == i-1) {
4347                     n->slots_info = sdscatfmt(n->slots_info," %i",start);
4348                 } else {
4349                     n->slots_info = sdscatfmt(n->slots_info," %i-%i",start,i-1);
4350                 }
4351             }
4352             if (i == CLUSTER_SLOTS) break;
4353             n = server.cluster->slots[i];
4354             start = i;
4355         }
4356     }
4357 }
4358 
4359 /* Generate a csv-alike representation of the nodes we are aware of,
4360  * including the "myself" node, and return an SDS string containing the
4361  * representation (it is up to the caller to free it).
4362  *
4363  * All the nodes matching at least one of the node flags specified in
4364  * "filter" are excluded from the output, so using zero as a filter will
4365  * include all the known nodes in the representation, including nodes in
4366  * the HANDSHAKE state.
4367  *
4368  * Setting use_pport to 1 in a TLS cluster makes the result contain the
4369  * plaintext client port rather than the TLS client port of each node.
4370  *
4371  * The representation obtained using this function is used for the output
4372  * of the CLUSTER NODES command, and as the format of the cluster
4373  * configuration file (nodes.conf) for a given node. */
4374 sds clusterGenNodesDescription(int filter, int use_pport) {
4375     sds ci = sdsempty(), ni;
4376     dictIterator *di;
4377     dictEntry *de;
4378 
4379     /* Generate the slots info for all the nodes first. */
4380     clusterGenNodesSlotsInfo(filter);
4381 
4382     di = dictGetSafeIterator(server.cluster->nodes);
4383     while((de = dictNext(di)) != NULL) {
4384         clusterNode *node = dictGetVal(de);
4385 
4386         if (node->flags & filter) continue;
4387         ni = clusterGenNodeDescription(node, use_pport);
4388         ci = sdscatsds(ci,ni);
4389         sdsfree(ni);
4390         ci = sdscatlen(ci,"\n",1);
4391 
4392         /* Release slots info. */
4393         if (node->slots_info) {
4394             sdsfree(node->slots_info);
4395             node->slots_info = NULL;
4396         }
4397     }
4398     dictReleaseIterator(di);
4399     return ci;
4400 }
4401 
4402 /* -----------------------------------------------------------------------------
4403  * CLUSTER command
4404  * -------------------------------------------------------------------------- */
4405 
4406 const char *clusterGetMessageTypeString(int type) {
4407     switch(type) {
4408     case CLUSTERMSG_TYPE_PING: return "ping";
4409     case CLUSTERMSG_TYPE_PONG: return "pong";
4410     case CLUSTERMSG_TYPE_MEET: return "meet";
4411     case CLUSTERMSG_TYPE_FAIL: return "fail";
4412     case CLUSTERMSG_TYPE_PUBLISH: return "publish";
4413     case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
4414     case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
4415     case CLUSTERMSG_TYPE_UPDATE: return "update";
4416     case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
4417     case CLUSTERMSG_TYPE_MODULE: return "module";
4418     }
4419     return "unknown";
4420 }
4421 
4422 int getSlotOrReply(client *c, robj *o) {
4423     long long slot;
4424 
4425     if (getLongLongFromObject(o,&slot) != C_OK ||
4426         slot < 0 || slot >= CLUSTER_SLOTS)
4427     {
4428         addReplyError(c,"Invalid or out of range slot");
4429         return -1;
4430     }
4431     return (int) slot;
4432 }
4433 
4434 /* Returns an indication of whether the replica node is fully available
4435  * and should be listed in the CLUSTER SLOTS response.
4436  * Returns 1 for available nodes, 0 for nodes that have not finished
4437  * their initial sync, are in a failed state, or are otherwise
4438  * considered not available to serve read commands. */
4439 static int isReplicaAvailable(clusterNode *node) {
4440     if (nodeFailed(node)) {
4441         return 0;
4442     }
4443     long long repl_offset = node->repl_offset;
4444     if (node->flags & CLUSTER_NODE_MYSELF) {
4445         /* Nodes do not update their own information
4446          * in the cluster node list. */
4447         repl_offset = replicationGetSlaveOffset();
4448     }
4449     return (repl_offset != 0);
4450 }
4451 
4452 int checkSlotAssignmentsOrReply(client *c, unsigned char *slots, int del, int start_slot, int end_slot) {
4453     int slot;
4454     for (slot = start_slot; slot <= end_slot; slot++) {
4455         if (del && server.cluster->slots[slot] == NULL) {
4456             addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
4457             return C_ERR;
4458         } else if (!del && server.cluster->slots[slot]) {
4459             addReplyErrorFormat(c,"Slot %d is already busy", slot);
4460             return C_ERR;
4461         }
4462         if (slots[slot]++ == 1) {
4463             addReplyErrorFormat(c,"Slot %d specified multiple times",(int)slot);
4464             return C_ERR;
4465         }
4466     }
4467     return C_OK;
4468 }
4469 
4470 void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
4471     int j;
4472     for (j = 0; j < CLUSTER_SLOTS; j++) {
4473         if (slots[j]) {
4474             int retval;
4475 
4476             /* If this slot was set as importing we can clear this
4477              * state as now we are the real owner of the slot. */
4478             if (server.cluster->importing_slots_from[j])
4479                 server.cluster->importing_slots_from[j] = NULL;
4480 
4481             retval = del ? clusterDelSlot(j) :
4482                            clusterAddSlot(myself,j);
4483             serverAssertWithInfo(c,NULL,retval == C_OK);
4484         }
4485     }
4486 }
4487 
4488 void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
4489     int i, nested_elements = 3; /* slots (2) + master addr (1) */
4490     void *nested_replylen = addReplyDeferredLen(c);
4491     addReplyLongLong(c, start_slot);
4492     addReplyLongLong(c, end_slot);
4493     addReplyArrayLen(c, 3);
4494     addReplyBulkCString(c, node->ip);
4495     /* Report non-TLS ports to non-TLS client in TLS cluster if available. */
4496     int use_pport = (server.tls_cluster &&
4497                      c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
4498     addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port);
4499     addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
4500 
4501     /* Remaining nodes in reply are replicas for slot range */
4502     for (i = 0; i < node->numslaves; i++) {
4503         /* This loop is copy/pasted from clusterGenNodeDescription()
4504          * with modifications for per-slot node aggregation. */
4505         if (!isReplicaAvailable(node->slaves[i])) continue;
4506         addReplyArrayLen(c, 3);
4507         addReplyBulkCString(c, node->slaves[i]->ip);
4508         /* Report slave's non-TLS port to non-TLS client in TLS cluster */
4509         addReplyLongLong(c, (use_pport && node->slaves[i]->pport ?
4510                              node->slaves[i]->pport :
4511                              node->slaves[i]->port));
4512         addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
4513         nested_elements++;
4514     }
4515     setDeferredArrayLen(c, nested_replylen, nested_elements);
4516 }
4517 
4518 void clusterReplyMultiBulkSlots(client *c) {
4519     /* Format: 1) 1) start slot
4520      *            2) end slot
4521      *            3) 1) master IP
4522      *               2) master port
4523      *               3) node ID
4524      *            4) 1) replica IP
4525      *               2) replica port
4526      *               3) node ID
4527      *           ... continued until done
4528      */
4529     clusterNode *n = NULL;
4530     int num_masters = 0, start = -1;
4531     void *slot_replylen = addReplyDeferredLen(c);
4532 
4533     for (int i = 0; i <= CLUSTER_SLOTS; i++) {
4534         /* Find start node and slot id. */
4535         if (n == NULL) {
4536             if (i == CLUSTER_SLOTS) break;
4537             n = server.cluster->slots[i];
4538             start = i;
4539             continue;
4540         }
4541 
4542         /* Add the cluster slots info when we reach a slot served by a
4543          * different node than 'n', or the end of the slot space. */
4544         if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
4545             addNodeReplyForClusterSlot(c, n, start, i-1);
4546             num_masters++;
4547             if (i == CLUSTER_SLOTS) break;
4548             n = server.cluster->slots[i];
4549             start = i;
4550         }
4551     }
4552     setDeferredArrayLen(c, slot_replylen, num_masters);
4553 }
4554 
4555 void clusterCommand(client *c) {
4556     if (server.cluster_enabled == 0) {
4557         addReplyError(c,"This instance has cluster support disabled");
4558         return;
4559     }
4560 
4561     if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
4562         const char *help[] = {
4563 "ADDSLOTS <slot> [<slot> ...]",
4564 "    Assign slots to current node.",
4565 "ADDSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
4566 "    Assign slots which are between <start-slot> and <end-slot> to current node.",
4567 "BUMPEPOCH",
4568 "    Advance the cluster config epoch.",
4569 "COUNT-FAILURE-REPORTS <node-id>",
4570 "    Return number of failure reports for <node-id>.",
4571 "COUNTKEYSINSLOT <slot>",
4572 "    Return the number of keys in <slot>.",
4573 "DELSLOTS <slot> [<slot> ...]",
4574 "    Delete slots information from current node.",
4575 "DELSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
4576 "    Delete slots information which are between <start-slot> and <end-slot> from current node.",
4577 "FAILOVER [FORCE|TAKEOVER]",
4578 "    Promote current replica node to being a master.",
4579 "FORGET <node-id>",
4580 "    Remove a node from the cluster.",
4581 "GETKEYSINSLOT <slot> <count>",
4582 "    Return key names stored by current node in a slot.",
4583 "FLUSHSLOTS",
4584 "    Delete current node own slots information.",
4585 "INFO",
4586 "    Return information about the cluster.",
4587 "KEYSLOT <key>",
4588 "    Return the hash slot for <key>.",
4589 "MEET <ip> <port> [<bus-port>]",
4590 "    Connect nodes into a working cluster.",
4591 "MYID",
4592 "    Return the node id.",
4593 "NODES",
4594 "    Return cluster configuration seen by node. Output format:",
4595 "    <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
4596 "REPLICATE <node-id>",
4597 "    Configure current node as replica to <node-id>.",
4598 "RESET [HARD|SOFT]",
4599 "    Reset current node (default: soft).",
4600 "SET-CONFIG-EPOCH <epoch>",
4601 "    Set config epoch of current node.",
4602 "SETSLOT <slot> (IMPORTING|MIGRATING|STABLE|NODE <node-id>)",
4603 "    Set slot state.",
4604 "REPLICAS <node-id>",
4605 "    Return <node-id> replicas.",
4606 "SAVECONFIG",
4607 "    Force saving cluster configuration on disk.",
4608 "SLOTS",
4609 "    Return information about slots range mappings. Each range is made of:",
4610 "    start, end, master and replicas IP addresses, ports and ids",
4611 NULL
4612         };
4613         addReplyHelp(c, help);
4614     } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
4615         /* CLUSTER MEET <ip> <port> [cport] */
4616         long long port, cport;
4617 
4618         if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
4619             addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
4620                                 (char*)c->argv[3]->ptr);
4621             return;
4622         }
4623 
4624         if (c->argc == 5) {
4625             if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
4626                 addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
4627                                     (char*)c->argv[4]->ptr);
4628                 return;
4629             }
4630         } else {
4631             cport = port + CLUSTER_PORT_INCR;
4632         }
4633 
4634         if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
4635             errno == EINVAL)
4636         {
4637             addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
4638                             (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
4639         } else {
4640             addReply(c,shared.ok);
4641         }
4642     } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
4643         /* CLUSTER NODES */
4644         /* Report plaintext ports, only if the cluster is TLS but the
4645          * client is known to be non-TLS. */
4646         int use_pport = (server.tls_cluster &&
4647                          c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
4648         sds nodes = clusterGenNodesDescription(0, use_pport);
4649         addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
4650         sdsfree(nodes);
4651     } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
4652         /* CLUSTER MYID */
4653         addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
4654     } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
4655         /* CLUSTER SLOTS */
4656         clusterReplyMultiBulkSlots(c);
4657     } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
4658         /* CLUSTER FLUSHSLOTS */
4659         if (dictSize(server.db[0].dict) != 0) {
4660             addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
4661             return;
4662         }
4663         clusterDelNodeSlots(myself);
4664         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4665         addReply(c,shared.ok);
4666     } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
4667                !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
4668     {
4669         /* CLUSTER ADDSLOTS <slot> [slot] ... */
4670         /* CLUSTER DELSLOTS <slot> [slot] ... */
4671         int j, slot;
4672         unsigned char *slots = zmalloc(CLUSTER_SLOTS);
4673         int del = !strcasecmp(c->argv[1]->ptr,"delslots");
4674 
4675         memset(slots,0,CLUSTER_SLOTS);
4676         /* Check that all the arguments are parseable. */
4677         for (j = 2; j < c->argc; j++) {
4678             if ((slot = getSlotOrReply(c,c->argv[j])) == C_ERR) {
4679                 zfree(slots);
4680                 return;
4681             }
4682         }
4683         /* Check that the slots are not already busy. */
4684         for (j = 2; j < c->argc; j++) {
4685             slot = getSlotOrReply(c,c->argv[j]);
4686             if (checkSlotAssignmentsOrReply(c, slots, del, slot, slot) == C_ERR) {
4687                 zfree(slots);
4688                 return;
4689             }
4690         }
4691         clusterUpdateSlots(c, slots, del);
4692         zfree(slots);
4693         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4694         addReply(c,shared.ok);
4695     } else if ((!strcasecmp(c->argv[1]->ptr,"addslotsrange") ||
4696                !strcasecmp(c->argv[1]->ptr,"delslotsrange")) && c->argc >= 4) {
4697         if (c->argc % 2 == 1) {
4698             addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
4699                             c->cmd->name);
4700             return;
4701         }
4702         /* CLUSTER ADDSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] */
4703         /* CLUSTER DELSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] */
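        /* Usage sketch (illustrative):
         *
         *   > CLUSTER ADDSLOTSRANGE 0 5460 10923 16383
         *   OK
         *
         * assigns the two slot ranges 0-5460 and 10923-16383 to this node
         * in a single call. */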
4704         int j, startslot, endslot;
4705         unsigned char *slots = zmalloc(CLUSTER_SLOTS);
4706         int del = !strcasecmp(c->argv[1]->ptr,"delslotsrange");
4707 
4708         memset(slots,0,CLUSTER_SLOTS);
4709         /* Check that all the arguments are parseable and that all the
4710          * slots are not already busy. */
4711         for (j = 2; j < c->argc; j += 2) {
4712             if ((startslot = getSlotOrReply(c,c->argv[j])) == C_ERR) {
4713                 zfree(slots);
4714                 return;
4715             }
4716             if ((endslot = getSlotOrReply(c,c->argv[j+1])) == C_ERR) {
4717                 zfree(slots);
4718                 return;
4719             }
4720             if (startslot > endslot) {
4721                 addReplyErrorFormat(c,"start slot number %d is greater than end slot number %d", startslot, endslot);
4722                 zfree(slots);
4723                 return;
4724             }
4725 
4726             if (checkSlotAssignmentsOrReply(c, slots, del, startslot, endslot) == C_ERR) {
4727                 zfree(slots);
4728                 return;
4729             }
4730         }
4731         clusterUpdateSlots(c, slots, del);
4732         zfree(slots);
4733         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4734         addReply(c,shared.ok);
4735     } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
4736         /* SETSLOT 10 MIGRATING <node ID> */
4737         /* SETSLOT 10 IMPORTING <node ID> */
4738         /* SETSLOT 10 STABLE */
4739         /* SETSLOT 10 NODE <node ID> */
4740         int slot;
4741         clusterNode *n;
4742 
4743         if (nodeIsSlave(myself)) {
4744             addReplyError(c,"Please use SETSLOT only with masters.");
4745             return;
4746         }
4747 
4748         if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
4749 
4750         if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
4751             if (server.cluster->slots[slot] != myself) {
4752                 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
4753                 return;
4754             }
4755             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4756                 addReplyErrorFormat(c,"I don't know about node %s",
4757                     (char*)c->argv[4]->ptr);
4758                 return;
4759             }
4760             if (nodeIsSlave(n)) {
4761                 addReplyError(c,"Target node is not a master");
4762                 return;
4763             }
4764             server.cluster->migrating_slots_to[slot] = n;
4765         } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
4766             if (server.cluster->slots[slot] == myself) {
4767                 addReplyErrorFormat(c,
4768                     "I'm already the owner of hash slot %u",slot);
4769                 return;
4770             }
4771             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4772                 addReplyErrorFormat(c,"I don't know about node %s",
4773                     (char*)c->argv[4]->ptr);
4774                 return;
4775             }
4776             if (nodeIsSlave(n)) {
4777                 addReplyError(c,"Target node is not a master");
4778                 return;
4779             }
4780             server.cluster->importing_slots_from[slot] = n;
4781         } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
4782             /* CLUSTER SETSLOT <SLOT> STABLE */
4783             server.cluster->importing_slots_from[slot] = NULL;
4784             server.cluster->migrating_slots_to[slot] = NULL;
4785         } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
4786             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
4787             clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
4788 
4789             if (!n) {
4790                 addReplyErrorFormat(c,"Unknown node %s",
4791                     (char*)c->argv[4]->ptr);
4792                 return;
4793             }
4794             /* If this hash slot was served by 'myself' before the switch,
4795              * make sure there are no longer local keys for this hash slot. */
4796             if (server.cluster->slots[slot] == myself && n != myself) {
4797                 if (countKeysInSlot(slot) != 0) {
4798                     addReplyErrorFormat(c,
4799                         "Can't assign hashslot %d to a different node "
4800                         "while I still hold keys for this hash slot.", slot);
4801                     return;
4802                 }
4803             }
4804             /* If this slot is in migrating status but we have no keys
4805              * for it, assigning the slot to another node will clear
4806              * the migrating status. */
4807             if (countKeysInSlot(slot) == 0 &&
4808                 server.cluster->migrating_slots_to[slot])
4809                 server.cluster->migrating_slots_to[slot] = NULL;
4810 
4811             clusterDelSlot(slot);
4812             clusterAddSlot(n,slot);
4813 
4814             /* If this node was importing this slot, assigning the slot to
4815              * itself also clears the importing status. */
4816             if (n == myself &&
4817                 server.cluster->importing_slots_from[slot])
4818             {
4819                 /* This slot was manually migrated: set this node's configEpoch
4820                  * to a new epoch so that the new version can be propagated
4821                  * by the cluster.
4822                  *
4823                  * Note that if this ever results in a collision with another
4824                  * node getting the same configEpoch, for example because a
4825                  * failover happens at the same time we close the slot, the
4826                  * configEpoch collision resolution will fix it assigning
4827                  * a different epoch to each node. */
4828                 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
4829                     serverLog(LL_WARNING,
4830                         "configEpoch updated after importing slot %d", slot);
4831                 }
4832                 server.cluster->importing_slots_from[slot] = NULL;
4833                 /* After importing this slot, let the other nodes know as
4834                  * soon as possible. */
4835                 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
4836             }
4837         } else {
4838             addReplyError(c,
4839                 "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
4840             return;
4841         }
4842         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
4843         addReply(c,shared.ok);
4844     } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
4845         /* CLUSTER BUMPEPOCH */
4846         int retval = clusterBumpConfigEpochWithoutConsensus();
4847         sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
4848                 (retval == C_OK) ? "BUMPED" : "STILL",
4849                 (unsigned long long) myself->configEpoch);
4850         addReplySds(c,reply);
4851     } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
4852         /* CLUSTER INFO */
4853         char *statestr[] = {"ok","fail","needhelp"};
4854         int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4855         uint64_t myepoch;
4856         int j;
4857 
4858         for (j = 0; j < CLUSTER_SLOTS; j++) {
4859             clusterNode *n = server.cluster->slots[j];
4860 
4861             if (n == NULL) continue;
4862             slots_assigned++;
4863             if (nodeFailed(n)) {
4864                 slots_fail++;
4865             } else if (nodeTimedOut(n)) {
4866                 slots_pfail++;
4867             } else {
4868                 slots_ok++;
4869             }
4870         }
4871 
4872         myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
4873                   myself->slaveof->configEpoch : myself->configEpoch;
4874 
4875         sds info = sdscatprintf(sdsempty(),
4876             "cluster_state:%s\r\n"
4877             "cluster_slots_assigned:%d\r\n"
4878             "cluster_slots_ok:%d\r\n"
4879             "cluster_slots_pfail:%d\r\n"
4880             "cluster_slots_fail:%d\r\n"
4881             "cluster_known_nodes:%lu\r\n"
4882             "cluster_size:%d\r\n"
4883             "cluster_current_epoch:%llu\r\n"
4884             "cluster_my_epoch:%llu\r\n"
4885             , statestr[server.cluster->state],
4886             slots_assigned,
4887             slots_ok,
4888             slots_pfail,
4889             slots_fail,
4890             dictSize(server.cluster->nodes),
4891             server.cluster->size,
4892             (unsigned long long) server.cluster->currentEpoch,
4893             (unsigned long long) myepoch
4894         );
4895 
4896         /* Show stats about messages sent and received. */
4897         long long tot_msg_sent = 0;
4898         long long tot_msg_received = 0;
4899 
4900         for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4901             if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
4902             tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
4903             info = sdscatprintf(info,
4904                 "cluster_stats_messages_%s_sent:%lld\r\n",
4905                 clusterGetMessageTypeString(i),
4906                 server.cluster->stats_bus_messages_sent[i]);
4907         }
4908         info = sdscatprintf(info,
4909             "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
4910 
4911         for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4912             if (server.cluster->stats_bus_messages_received[i] == 0) continue;
4913             tot_msg_received += server.cluster->stats_bus_messages_received[i];
4914             info = sdscatprintf(info,
4915                 "cluster_stats_messages_%s_received:%lld\r\n",
4916                 clusterGetMessageTypeString(i),
4917                 server.cluster->stats_bus_messages_received[i]);
4918         }
4919         info = sdscatprintf(info,
4920             "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
4921 
4922         /* Produce the reply protocol. */
4923         addReplyVerbatim(c,info,sdslen(info),"txt");
4924         sdsfree(info);
4925     } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
4926         int retval = clusterSaveConfig(1);
4927 
4928         if (retval == 0)
4929             addReply(c,shared.ok);
4930         else
4931             addReplyErrorFormat(c,"error saving the cluster node config: %s",
4932                 strerror(errno));
4933     } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
4934         /* CLUSTER KEYSLOT <key> */
4935         sds key = c->argv[2]->ptr;
4936 
4937         addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
4938     } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
4939         /* CLUSTER COUNTKEYSINSLOT <slot> */
4940         long long slot;
4941 
4942         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4943             return;
4944         if (slot < 0 || slot >= CLUSTER_SLOTS) {
4945             addReplyError(c,"Invalid slot");
4946             return;
4947         }
4948         addReplyLongLong(c,countKeysInSlot(slot));
4949     } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
4950         /* CLUSTER GETKEYSINSLOT <slot> <count> */
4951         long long maxkeys, slot;
4952 
4953         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4954             return;
4955         if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
4956             != C_OK)
4957             return;
4958         if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4959             addReplyError(c,"Invalid slot or number of keys");
4960             return;
4961         }
4962 
4963         unsigned int keys_in_slot = countKeysInSlot(slot);
4964         unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
4965         addReplyArrayLen(c,numkeys);
4966         dictEntry *de = (*server.db->slots_to_keys).by_slot[slot].head;
4967         for (unsigned int j = 0; j < numkeys; j++) {
4968             serverAssert(de != NULL);
4969             sds sdskey = dictGetKey(de);
4970             addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
4971             de = dictEntryNextInSlot(de);
4972         }
4973     } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
4974         /* CLUSTER FORGET <NODE ID> */
4975         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4976 
4977         if (!n) {
4978             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4979             return;
4980         } else if (n == myself) {
4981             addReplyError(c,"I tried hard but I can't forget myself...");
4982             return;
4983         } else if (nodeIsSlave(myself) && myself->slaveof == n) {
4984             addReplyError(c,"Can't forget my master!");
4985             return;
4986         }
4987         clusterBlacklistAddNode(n);
4988         clusterDelNode(n);
4989         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4990                              CLUSTER_TODO_SAVE_CONFIG);
4991         addReply(c,shared.ok);
4992     } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
4993         /* CLUSTER REPLICATE <NODE ID> */
4994         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4995 
4996         /* Lookup the specified node in our table. */
4997         if (!n) {
4998             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4999             return;
5000         }
5001 
5002         /* I can't replicate myself. */
5003         if (n == myself) {
5004             addReplyError(c,"Can't replicate myself");
5005             return;
5006         }
5007 
5008         /* Can't replicate a slave. */
5009         if (nodeIsSlave(n)) {
5010             addReplyError(c,"I can only replicate a master, not a replica.");
5011             return;
5012         }
5013 
5014         /* If the instance is currently a master, it should have no assigned
5015          * slots nor keys in order to replicate some other node.
5016          * Slaves can switch to another master without issues. */
5017         if (nodeIsMaster(myself) &&
5018             (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
5019             addReplyError(c,
5020                 "To set a master the node must be empty and "
5021                 "without assigned slots.");
5022             return;
5023         }
5024 
5025         /* Set the master. */
5026         clusterSetMaster(n);
5027         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
5028         addReply(c,shared.ok);
5029     } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
5030                 !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
5031         /* CLUSTER SLAVES <NODE ID> */
5032         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
5033         int j;
5034 
5035         /* Lookup the specified node in our table. */
5036         if (!n) {
5037             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
5038             return;
5039         }
5040 
5041         if (nodeIsSlave(n)) {
5042             addReplyError(c,"The specified node is not a master");
5043             return;
5044         }
5045 
5046         /* Use plaintext port if cluster is TLS but client is non-TLS. */
5047         int use_pport = (server.tls_cluster &&
5048                          c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
5049         addReplyArrayLen(c,n->numslaves);
5050         for (j = 0; j < n->numslaves; j++) {
5051             sds ni = clusterGenNodeDescription(n->slaves[j], use_pport);
5052             addReplyBulkCString(c,ni);
5053             sdsfree(ni);
5054         }
5055     } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
5056                c->argc == 3)
5057     {
5058         /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
5059         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
5060 
5061         if (!n) {
5062             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
5063             return;
5064         } else {
5065             addReplyLongLong(c,clusterNodeFailureReportsCount(n));
5066         }
5067     } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
5068                (c->argc == 2 || c->argc == 3))
5069     {
5070         /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
5071         int force = 0, takeover = 0;
5072 
5073         if (c->argc == 3) {
5074             if (!strcasecmp(c->argv[2]->ptr,"force")) {
5075                 force = 1;
5076             } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
5077                 takeover = 1;
5078                 force = 1; /* Takeover also implies force. */
5079             } else {
5080                 addReplyErrorObject(c,shared.syntaxerr);
5081                 return;
5082             }
5083         }
5084 
5085         /* Check preconditions. */
5086         if (nodeIsMaster(myself)) {
5087             addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
5088             return;
5089         } else if (myself->slaveof == NULL) {
5090             addReplyError(c,"I'm a replica but my master is unknown to me");
5091             return;
5092         } else if (!force &&
5093                    (nodeFailed(myself->slaveof) ||
5094                     myself->slaveof->link == NULL))
5095         {
5096             addReplyError(c,"Master is down or failed, "
5097                             "please use CLUSTER FAILOVER FORCE");
5098             return;
5099         }
5100         resetManualFailover();
5101         server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
5102 
5103         if (takeover) {
5104             /* A takeover does not perform any initial check. It just
5105              * generates a new configuration epoch for this node without
5106              * consensus, claims the master's slots, and broadcasts the new
5107              * configuration. */
5108             serverLog(LL_WARNING,"Taking over the master (user request).");
5109             clusterBumpConfigEpochWithoutConsensus();
5110             clusterFailoverReplaceYourMaster();
5111         } else if (force) {
5112             /* If this is a forced failover, we don't need to talk with our
5113              * master to agree about the offset. We just fail over, taking it
5114              * over without coordination. */
5115             serverLog(LL_WARNING,"Forced failover user request accepted.");
5116             server.cluster->mf_can_start = 1;
5117         } else {
5118             serverLog(LL_WARNING,"Manual failover user request accepted.");
5119             clusterSendMFStart(myself->slaveof);
5120         }
5121         addReply(c,shared.ok);
5122     } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
5123     {
5124         /* CLUSTER SET-CONFIG-EPOCH <epoch>
5125          *
5126          * The user is allowed to set the config epoch only when a node is
5127          * totally fresh: no config epoch, no other known node, and so forth.
5128          * This happens at cluster creation time to start with a cluster where
5129          * every node has a different node ID, without relying on the conflict
5130          * resolution system, which is too slow when a big cluster is created. */
5131         long long epoch;
5132 
5133         if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
5134             return;
5135 
5136         if (epoch < 0) {
5137             addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
5138         } else if (dictSize(server.cluster->nodes) > 1) {
5139             addReplyError(c,"The user can assign a config epoch only when the "
5140                             "node does not know any other node.");
5141         } else if (myself->configEpoch != 0) {
5142             addReplyError(c,"Node config epoch is already non-zero");
5143         } else {
5144             myself->configEpoch = epoch;
5145             serverLog(LL_WARNING,
5146                 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
5147                 (unsigned long long) myself->configEpoch);
5148 
5149             if (server.cluster->currentEpoch < (uint64_t)epoch)
5150                 server.cluster->currentEpoch = epoch;
5151             /* No need to fsync the config here since in the unlucky event
5152              * of a failure to persist the config, the conflict resolution code
5153              * will assign a unique config to this node. */
5154             clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
5155                                  CLUSTER_TODO_SAVE_CONFIG);
5156             addReply(c,shared.ok);
5157         }
5158     } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
5159                (c->argc == 2 || c->argc == 3))
5160     {
5161         /* CLUSTER RESET [SOFT|HARD] */
5162         int hard = 0;
5163 
5164         /* Parse soft/hard argument. Default is soft. */
5165         if (c->argc == 3) {
5166             if (!strcasecmp(c->argv[2]->ptr,"hard")) {
5167                 hard = 1;
5168             } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
5169                 hard = 0;
5170             } else {
5171                 addReplyErrorObject(c,shared.syntaxerr);
5172                 return;
5173             }
5174         }
5175 
5176         /* Slaves can be reset while containing data, but master nodes
5177          * must be empty. */
5178         if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
5179             addReplyError(c,"CLUSTER RESET can't be called with "
5180                             "master nodes containing keys");
5181             return;
5182         }
5183         clusterReset(hard);
5184         addReply(c,shared.ok);
5185     } else {
5186         addReplySubcommandSyntaxError(c);
5187         return;
5188     }
5189 }
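
/* For reference, the CLUSTER INFO reply produced above looks like the
 * following (values are illustrative; one cluster_stats_messages_<type>_sent
 * or _received line is emitted for every message type whose counter is
 * non-zero):
 *
 *   cluster_state:ok
 *   cluster_slots_assigned:16384
 *   cluster_slots_ok:16384
 *   cluster_slots_pfail:0
 *   cluster_slots_fail:0
 *   cluster_known_nodes:6
 *   cluster_size:3
 *   cluster_current_epoch:6
 *   cluster_my_epoch:1
 *   cluster_stats_messages_ping_sent:100
 *   cluster_stats_messages_sent:100
 *   cluster_stats_messages_pong_received:99
 *   cluster_stats_messages_received:99
 */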
5190 
5191 /* -----------------------------------------------------------------------------
5192  * DUMP, RESTORE and MIGRATE commands
5193  * -------------------------------------------------------------------------- */
5194 
5195 /* Generates a DUMP-format representation of the object 'o', adding it to the
5196  * io stream pointed to by 'payload'. This function can't fail. */
5197 void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
5198     unsigned char buf[2];
5199     uint64_t crc;
5200 
5201     /* Serialize the object in an RDB-like format. It consists of an object type
5202      * byte followed by the serialized object. This is understood by RESTORE. */
5203     rioInitWithBuffer(payload,sdsempty());
5204     serverAssert(rdbSaveObjectType(payload,o));
5205     serverAssert(rdbSaveObject(payload,o,key,dbid));
5206 
5207     /* Write the footer; this is how it looks:
5208      * ----------------+---------------------+---------------+
5209      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
5210      * ----------------+---------------------+---------------+
5211      * RDB version and CRC are both in little endian.
5212      */
5213 
5214     /* RDB version */
5215     buf[0] = RDB_VERSION & 0xff;
5216     buf[1] = (RDB_VERSION >> 8) & 0xff;
5217     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
5218 
5219     /* CRC64 */
5220     crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
5221                 sdslen(payload->io.buffer.ptr));
5222     memrev64ifbe(&crc);
5223     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
5224 }
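
/* As an illustration of the layout above, the following minimal sketch (a
 * hypothetical helper, not part of Redis) shows how an external tool could
 * read the footer back:
 *
 *   int readDumpFooter(const unsigned char *p, size_t len,
 *                      uint16_t *rdbver, uint64_t *crc)
 *   {
 *       if (len < 10) return -1;               // need 2+8 footer bytes
 *       *rdbver = p[len-10] | (p[len-9] << 8); // RDB version, little endian
 *       memcpy(crc, p+len-8, 8);               // raw little-endian CRC64,
 *       return 0;                              // byte-swap on big endian
 *   }
 */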
5225 
5226 /* Verify that the RDB version of the dump payload matches that of this Redis
5227  * instance and that the checksum is ok.
5228  * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
5229  * is returned. */
5230 int verifyDumpPayload(unsigned char *p, size_t len) {
5231     unsigned char *footer;
5232     uint16_t rdbver;
5233     uint64_t crc;
5234 
5235     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
5236     if (len < 10) return C_ERR;
5237     footer = p+(len-10);
5238 
5239     /* Verify RDB version */
5240     rdbver = (footer[1] << 8) | footer[0];
5241     if (rdbver > RDB_VERSION) return C_ERR;
5242 
5243     if (server.skip_checksum_validation)
5244         return C_OK;
5245 
5246     /* Verify CRC64 */
5247     crc = crc64(0,p,len-8);
5248     memrev64ifbe(&crc);
5249     return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
5250 }
5251 
5252 /* DUMP keyname
5253  * DUMP is actually not used by Redis Cluster but it is the obvious
5254  * complement of RESTORE and can be useful for different applications. */
5255 void dumpCommand(client *c) {
5256     robj *o;
5257     rio payload;
5258 
5259     /* Check if the key is here. */
5260     if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
5261         addReplyNull(c);
5262         return;
5263     }
5264 
5265     /* Create the DUMP encoded representation. */
5266     createDumpPayload(&payload,o,c->argv[1],c->db->id);
5267 
5268     /* Transfer to the client */
5269     addReplyBulkSds(c,payload.io.buffer.ptr);
5270     return;
5271 }
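
/* An illustrative DUMP / RESTORE round trip (the serialized bytes are
 * abridged and depend on the RDB version of the server):
 *
 *   127.0.0.1:6379> SET mykey "Hello"
 *   OK
 *   127.0.0.1:6379> DUMP mykey
 *   "\x00\x05Hello\t\x00..."    <- type byte, payload, RDB version, CRC64
 *   127.0.0.1:6379> RESTORE mykey2 0 "\x00\x05Hello\t\x00..."
 *   OK
 */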
5272 
5273 /* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
5274 void restoreCommand(client *c) {
5275     long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
5276     rio payload;
5277     int j, type, replace = 0, absttl = 0;
5278     robj *obj;
5279 
5280     /* Parse additional options */
5281     for (j = 4; j < c->argc; j++) {
5282         int additional = c->argc-j-1;
5283         if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5284             replace = 1;
5285         } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
5286             absttl = 1;
5287         } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
5288                    lfu_freq == -1)
5289         {
5290             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
5291                     != C_OK) return;
5292             if (lru_idle < 0) {
5293                 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
5294                 return;
5295             }
5296             lru_clock = LRU_CLOCK();
5297             j++; /* Consume additional arg. */
5298         } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
5299                    lru_idle == -1)
5300         {
5301             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
5302                     != C_OK) return;
5303             if (lfu_freq < 0 || lfu_freq > 255) {
5304                 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
5305                 return;
5306             }
5307             j++; /* Consume additional arg. */
5308         } else {
5309             addReplyErrorObject(c,shared.syntaxerr);
5310             return;
5311         }
5312     }
5313 
5314     /* Make sure this key does not already exist here... */
5315     robj *key = c->argv[1];
5316     if (!replace && lookupKeyWrite(c->db,key) != NULL) {
5317         addReplyErrorObject(c,shared.busykeyerr);
5318         return;
5319     }
5320 
5321     /* Check if the TTL value makes sense */
5322     if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
5323         return;
5324     } else if (ttl < 0) {
5325         addReplyError(c,"Invalid TTL value, must be >= 0");
5326         return;
5327     }
5328 
5329     /* Verify RDB version and data checksum. */
5330     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
5331     {
5332         addReplyError(c,"DUMP payload version or checksum are wrong");
5333         return;
5334     }
5335 
5336     rioInitWithBuffer(&payload,c->argv[3]->ptr);
5337     if (((type = rdbLoadObjectType(&payload)) == -1) ||
5338         ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL))
5339     {
5340         addReplyError(c,"Bad data format");
5341         return;
5342     }
5343 
5344     /* Remove the old key if needed. */
5345     int deleted = 0;
5346     if (replace)
5347         deleted = dbDelete(c->db,key);
5348 
5349     if (ttl && !absttl) ttl+=mstime();
5350     if (ttl && checkAlreadyExpired(ttl)) {
5351         if (deleted) {
5352             rewriteClientCommandVector(c,2,shared.del,key);
5353             signalModifiedKey(c,c->db,key);
5354             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
5355             server.dirty++;
5356         }
5357         decrRefCount(obj);
5358         addReply(c, shared.ok);
5359         return;
5360     }
5361 
5362     /* Create the key and set the TTL if any */
5363     dbAdd(c->db,key,obj);
5364     if (ttl) {
5365         setExpire(c,c->db,key,ttl);
5366         if (!absttl) {
5367             /* Propagate TTL as absolute timestamp */
5368             robj *ttl_obj = createStringObjectFromLongLong(ttl);
5369             rewriteClientCommandArgument(c,2,ttl_obj);
5370             decrRefCount(ttl_obj);
5371             rewriteClientCommandArgument(c,c->argc,shared.absttl);
5372         }
5373     }
5374     objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
5375     signalModifiedKey(c,c->db,key);
5376     notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
5377     addReply(c,shared.ok);
5378     server.dirty++;
5379 }
5380 
5381 /* MIGRATE socket cache implementation.
5382  *
5383  * We keep a map between host:port pairs and the TCP sockets we used to
5384  * connect to those instances recently.
5385  * These sockets are closed when the max number we cache is reached, and also
5386  * in serverCron() when they have been around for more than a few seconds. */
5387 #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
5388 #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
5389 
5390 typedef struct migrateCachedSocket {
5391     connection *conn;
5392     long last_dbid;
5393     time_t last_use_time;
5394 } migrateCachedSocket;
5395 
5396 /* Return a migrateCachedSocket containing a TCP socket connected with the
5397  * target instance, possibly returning a cached one.
5398  *
5399  * This function is responsible for sending errors to the client if a
5400  * connection can't be established. In this case NULL is returned.
5401  * Otherwise on success the socket is returned, and the caller should not
5402  * attempt to free it after usage.
5403  *
5404  * If the caller detects an error while using the socket, migrateCloseSocket()
5405  * should be called so that the connection will be created from scratch
5406  * the next time. */
5407 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
5408     connection *conn;
5409     sds name = sdsempty();
5410     migrateCachedSocket *cs;
5411 
5412     /* Check if we have an already cached socket for this ip:port pair. */
5413     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5414     name = sdscatlen(name,":",1);
5415     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5416     cs = dictFetchValue(server.migrate_cached_sockets,name);
5417     if (cs) {
5418         sdsfree(name);
5419         cs->last_use_time = server.unixtime;
5420         return cs;
5421     }
5422 
5423     /* No cached socket, create one. */
5424     if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
5425         /* Too many items, drop one at random. */
5426         dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
5427         cs = dictGetVal(de);
5428         connClose(cs->conn);
5429         zfree(cs);
5430         dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5431     }
5432 
5433     /* Create the socket */
5434     conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
5435     if (connBlockingConnect(conn, c->argv[1]->ptr, atoi(c->argv[2]->ptr), timeout)
5436             != C_OK) {
5437         addReplyError(c,"-IOERR error or timeout connecting to the client");
5438         connClose(conn);
5439         sdsfree(name);
5440         return NULL;
5441     }
5442     connEnableTcpNoDelay(conn);
5443 
5444     /* Add to the cache and return it to the caller. */
5445     cs = zmalloc(sizeof(*cs));
5446     cs->conn = conn;
5447 
5448     cs->last_dbid = -1;
5449     cs->last_use_time = server.unixtime;
5450     dictAdd(server.migrate_cached_sockets,name,cs);
5451     return cs;
5452 }
5453 
5454 /* Free a migrate cached connection. */
5455 void migrateCloseSocket(robj *host, robj *port) {
5456     sds name = sdsempty();
5457     migrateCachedSocket *cs;
5458 
5459     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5460     name = sdscatlen(name,":",1);
5461     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5462     cs = dictFetchValue(server.migrate_cached_sockets,name);
5463     if (!cs) {
5464         sdsfree(name);
5465         return;
5466     }
5467 
5468     connClose(cs->conn);
5469     zfree(cs);
5470     dictDelete(server.migrate_cached_sockets,name);
5471     sdsfree(name);
5472 }
5473 
5474 void migrateCloseTimedoutSockets(void) {
5475     dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
5476     dictEntry *de;
5477 
5478     while((de = dictNext(di)) != NULL) {
5479         migrateCachedSocket *cs = dictGetVal(de);
5480 
5481         if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
5482             connClose(cs->conn);
5483             zfree(cs);
5484             dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5485         }
5486     }
5487     dictReleaseIterator(di);
5488 }
5489 
5490 /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
5491  *         AUTH2 username password]
5492  *
5493  * Or, in the multiple keys form:
5494  *
5495  * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
5496  *         AUTH2 username password] KEYS key1 key2 ... keyN */
5497 void migrateCommand(client *c) {
5498     migrateCachedSocket *cs;
5499     int copy = 0, replace = 0, j;
5500     char *username = NULL;
5501     char *password = NULL;
5502     long timeout;
5503     long dbid;
5504     robj **ov = NULL; /* Objects to migrate. */
5505     robj **kv = NULL; /* Key names. */
5506     robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
5507     rio cmd, payload;
5508     int may_retry = 1;
5509     int write_error = 0;
5510     int argv_rewritten = 0;
5511 
5512     /* To support the KEYS option we need the following additional state. */
5513     int first_key = 3; /* Argument index of the first key. */
5514     int num_keys = 1;  /* By default only migrate the 'key' argument. */
5515 
5516     /* Parse additional options */
5517     for (j = 6; j < c->argc; j++) {
5518         int moreargs = (c->argc-1) - j;
5519         if (!strcasecmp(c->argv[j]->ptr,"copy")) {
5520             copy = 1;
5521         } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5522             replace = 1;
5523         } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
5524             if (!moreargs) {
5525                 addReplyErrorObject(c,shared.syntaxerr);
5526                 return;
5527             }
5528             j++;
5529             password = c->argv[j]->ptr;
5530             redactClientCommandArgument(c,j);
5531         } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
5532             if (moreargs < 2) {
5533                 addReplyErrorObject(c,shared.syntaxerr);
5534                 return;
5535             }
5536             username = c->argv[++j]->ptr;
5537             redactClientCommandArgument(c,j);
5538             password = c->argv[++j]->ptr;
5539             redactClientCommandArgument(c,j);
5540         } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
5541             if (sdslen(c->argv[3]->ptr) != 0) {
5542                 addReplyError(c,
5543                     "When using MIGRATE KEYS option, the key argument"
5544                     " must be set to the empty string");
5545                 return;
5546             }
5547             first_key = j+1;
5548             num_keys = c->argc - j - 1;
5549             break; /* All the remaining args are keys. */
5550         } else {
5551             addReplyErrorObject(c,shared.syntaxerr);
5552             return;
5553         }
5554     }
5555 
5556     /* Sanity check */
5557     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
5558         getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
5559     {
5560         return;
5561     }
5562     if (timeout <= 0) timeout = 1000;
5563 
5564     /* Check if the keys are here. If at least one key is to be migrated, do
5565      * it; otherwise, if all the keys are missing, reply with "NOKEY" to signal
5566      * the caller there was nothing to migrate. We don't return an error in
5567      * this case, since often this is due to a normal condition like the key
5568      * expiring in the meantime. */
5569     ov = zrealloc(ov,sizeof(robj*)*num_keys);
5570     kv = zrealloc(kv,sizeof(robj*)*num_keys);
5571     int oi = 0;
5572 
5573     for (j = 0; j < num_keys; j++) {
5574         if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
5575             kv[oi] = c->argv[first_key+j];
5576             oi++;
5577         }
5578     }
5579     num_keys = oi;
5580     if (num_keys == 0) {
5581         zfree(ov); zfree(kv);
5582         addReplySds(c,sdsnew("+NOKEY\r\n"));
5583         return;
5584     }
5585 
5586 try_again:
5587     write_error = 0;
5588 
5589     /* Connect */
5590     cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
5591     if (cs == NULL) {
5592         zfree(ov); zfree(kv);
5593         return; /* error sent to the client by migrateGetSocket() */
5594     }
5595 
5596     rioInitWithBuffer(&cmd,sdsempty());
5597 
5598     /* Authentication */
5599     if (password) {
5600         int arity = username ? 3 : 2;
5601         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
5602         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
5603         if (username) {
5604             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
5605                                  sdslen(username)));
5606         }
5607         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
5608             sdslen(password)));
5609     }
5610 
5611     /* Send the SELECT command if the current DB is not already selected. */
5612     int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
5613     if (select) {
5614         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5615         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
5616         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
5617     }
5618 
5619     int non_expired = 0; /* Number of keys that we'll find non-expired.
5620                             Note that serializing large keys may take some time,
5621                             so certain keys that were found non-expired by the
5622                             lookupKey() function may expire in the meantime. */
5623 
5624     /* Create RESTORE payload and generate the protocol to call the command. */
5625     for (j = 0; j < num_keys; j++) {
5626         long long ttl = 0;
5627         long long expireat = getExpire(c->db,kv[j]);
5628 
5629         if (expireat != -1) {
5630             ttl = expireat-mstime();
5631             if (ttl < 0) {
5632                 continue;
5633             }
5634             if (ttl < 1) ttl = 1;
5635         }
5636 
5637         /* Relocate valid (non-expired) keys and values into the array in
5638          * successive positions to remove holes created by keys that were
5639          * present in the first lookup but have expired by the second lookup. */
5640         ov[non_expired] = ov[j];
5641         kv[non_expired++] = kv[j];
5642 
5643         serverAssertWithInfo(c,NULL,
5644             rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
5645 
5646         if (server.cluster_enabled)
5647             serverAssertWithInfo(c,NULL,
5648                 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
5649         else
5650             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
5651         serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
5652         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
5653                 sdslen(kv[j]->ptr)));
5654         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
5655 
5656         /* Emit the payload argument, that is the serialized object using
5657          * the DUMP format. */
5658         createDumpPayload(&payload,ov[j],kv[j],dbid);
5659         serverAssertWithInfo(c,NULL,
5660             rioWriteBulkString(&cmd,payload.io.buffer.ptr,
5661                                sdslen(payload.io.buffer.ptr)));
5662         sdsfree(payload.io.buffer.ptr);
5663 
5664         /* Add the REPLACE option to the RESTORE command if it was specified
5665          * as a MIGRATE option. */
5666         if (replace)
5667             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
5668     }
5669 
5670     /* Fix the actual number of keys we are migrating. */
5671     num_keys = non_expired;
5672 
5673     /* Transfer the query to the other node in 64K chunks. */
5674     errno = 0;
5675     {
5676         sds buf = cmd.io.buffer.ptr;
5677         size_t pos = 0, towrite;
5678         int nwritten = 0;
5679 
5680         while ((towrite = sdslen(buf)-pos) > 0) {
5681             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5682             nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
5683             if (nwritten != (signed)towrite) {
5684                 write_error = 1;
5685                 goto socket_err;
5686             }
5687             pos += nwritten;
5688         }
5689     }
5690 
5691     char buf0[1024]; /* Auth reply. */
5692     char buf1[1024]; /* Select reply. */
5693     char buf2[1024]; /* Restore reply. */
5694 
5695     /* Read the AUTH reply if needed. */
5696     if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
5697         goto socket_err;
5698 
5699     /* Read the SELECT reply if needed. */
5700     if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
5701         goto socket_err;
5702 
5703     /* Read the RESTORE replies. */
5704     int error_from_target = 0;
5705     int socket_error = 0;
5706     int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5707 
5708     /* Allocate the new argument vector that will replace the current command,
5709      * to propagate the MIGRATE as a DEL command (if no COPY option was given).
5710      * We allocate num_keys+1 because the additional argument is for "DEL"
5711      * command name itself. */
5712     if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5713 
5714     for (j = 0; j < num_keys; j++) {
5715         if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
5716             socket_error = 1;
5717             break;
5718         }
5719         if ((password && buf0[0] == '-') ||
5720             (select && buf1[0] == '-') ||
5721             buf2[0] == '-')
5722         {
5723             /* On error assume that last_dbid is no longer valid. */
5724             if (!error_from_target) {
5725                 cs->last_dbid = -1;
5726                 char *errbuf;
5727                 if (password && buf0[0] == '-') errbuf = buf0;
5728                 else if (select && buf1[0] == '-') errbuf = buf1;
5729                 else errbuf = buf2;
5730 
5731                 error_from_target = 1;
5732                 addReplyErrorFormat(c,"Target instance replied with error: %s",
5733                     errbuf+1);
5734             }
5735         } else {
5736             if (!copy) {
5737                 /* No COPY option: remove the local key, signal the change. */
5738                 dbDelete(c->db,kv[j]);
5739                 signalModifiedKey(c,c->db,kv[j]);
5740                 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
5741                 server.dirty++;
5742 
5743                 /* Populate the argument vector to replace the old one. */
5744                 newargv[del_idx++] = kv[j];
5745                 incrRefCount(kv[j]);
5746             }
5747         }
5748     }
5749 
5750     /* On socket error, if we want to retry, do it now before rewriting the
5751      * command vector. We only retry if we are sure nothing was processed
5752      * and we failed to read the first reply (j == 0 test). */
5753     if (!error_from_target && socket_error && j == 0 && may_retry &&
5754         errno != ETIMEDOUT)
5755     {
5756         goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5757     }
5758 
5759     /* On socket errors, close the migration socket now, while we still have
5760      * the original host/port in the ARGV. Later the original command may be
5761      * rewritten to DEL and it will be too late. */
5762     if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5763 
5764     if (!copy) {
5765         /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5766          * this only for the keys for which we received an acknowledgement
5767          * from the receiving Redis server, by using the del_idx index. */
5768         if (del_idx > 1) {
5769             newargv[0] = createStringObject("DEL",3);
5770             /* Note that the following call takes ownership of newargv. */
5771             replaceClientCommandVector(c,del_idx,newargv);
5772             argv_rewritten = 1;
5773         } else {
5774             /* No key transfer acknowledged, no need to rewrite as DEL. */
5775             zfree(newargv);
5776         }
5777         newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5778     }
5779 
5780     /* If we are here and a socket error happened, we don't want to retry.
5781      * Just signal the problem to the client, but only do it if we did not
5782      * already queue a different error reported by the destination server. */
5783     if (!error_from_target && socket_error) {
5784         may_retry = 0;
5785         goto socket_err;
5786     }
5787 
5788     if (!error_from_target) {
5789         /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5790          * avoid SELECT the next time if the target DB is the same. Reply +OK.
5791          *
5792          * Note: If we reached this point, even if socket_error is true
5793          * still the SELECT command succeeded (otherwise the code jumps to the
5794          * socket_err label). */
5795         cs->last_dbid = dbid;
5796         addReply(c,shared.ok);
5797     } else {
5798         /* On error we already sent it in the for loop above, and set
5799          * last_dbid to -1 to force a SELECT the next time. */
5800     }
5801 
5802     sdsfree(cmd.io.buffer.ptr);
5803     zfree(ov); zfree(kv); zfree(newargv);
5804     return;
5805 
5806 /* On socket errors we try to close the cached socket and try again.
5807  * It is very common for the cached socket to get closed; if just reopening
5808  * it works, it's a shame to report the error to the caller. */
5809 socket_err:
5810     /* Cleanup we want to perform in both the retry and no retry case.
5811      * Note: Closing the migrate socket will also force SELECT next time. */
5812     sdsfree(cmd.io.buffer.ptr);
5813 
5814     /* If the command was rewritten as DEL and there was a socket error,
5815      * we already closed the socket earlier. While migrateCloseSocket()
5816      * is idempotent, the host/port arguments are now gone, so don't do it
5817      * again. */
5818     if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5819     zfree(newargv);
5820     newargv = NULL; /* This will get reallocated on retry. */
5821 
5822     /* Retry only if it's not a timeout and we never attempted a retry
5823      * (or the code jumping here did not set may_retry to zero). */
5824     if (errno != ETIMEDOUT && may_retry) {
5825         may_retry = 0;
5826         goto try_again;
5827     }
5828 
5829     /* Cleanup we want to do if no retry is attempted. */
5830     zfree(ov); zfree(kv);
5831     addReplyErrorSds(c, sdscatprintf(sdsempty(),
5832                                   "-IOERR error or timeout %s to target instance",
5833                                   write_error ? "writing" : "reading"));
5834     return;
5835 }
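
/* For illustration, migrating two keys over a fresh connection generates a
 * pipeline like the following (payloads abridged; plain RESTORE is used
 * instead of RESTORE-ASKING when cluster mode is disabled):
 *
 *   SELECT <dbid>
 *   RESTORE-ASKING key1 <ttl> <dump-payload> [REPLACE]
 *   RESTORE-ASKING key2 <ttl> <dump-payload> [REPLACE]
 *
 * One reply per queued command is then read back, in the same order. */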
5836 
5837 /* -----------------------------------------------------------------------------
5838  * Cluster functions related to serving / redirecting clients
5839  * -------------------------------------------------------------------------- */
5840 
5841 /* The ASKING command is required after a -ASK redirection.
5842  * The client should issue ASKING before actually sending the command to
5843  * the target instance. See the Redis Cluster specification for more
5844  * information. */
5845 void askingCommand(client *c) {
5846     if (server.cluster_enabled == 0) {
5847         addReplyError(c,"This instance has cluster support disabled");
5848         return;
5849     }
5850     c->flags |= CLIENT_ASKING;
5851     addReply(c,shared.ok);
5852 }
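
/* An illustrative -ASK redirection flow (addresses are made up):
 *
 *   GET foo              -> -ASK 3999 127.0.0.1:6381
 *   (connect to 127.0.0.1:6381)
 *   ASKING               -> +OK
 *   GET foo              -> "bar"
 *
 * ASKING is a one-shot flag: it only applies to the next command. */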
5853 
5854 /* The READONLY command is used by clients to enter the read-only mode.
5855  * In this mode slaves will not redirect clients as long as clients use
5856  * read-only commands to access keys that are served by the slave's master. */
5857 void readonlyCommand(client *c) {
5858     if (server.cluster_enabled == 0) {
5859         addReplyError(c,"This instance has cluster support disabled");
5860         return;
5861     }
5862     c->flags |= CLIENT_READONLY;
5863     addReply(c,shared.ok);
5864 }
5865 
5866 /* The READWRITE command just clears the READONLY command state. */
5867 void readwriteCommand(client *c) {
5868     if (server.cluster_enabled == 0) {
5869         addReplyError(c,"This instance has cluster support disabled");
5870         return;
5871     }
5872     c->flags &= ~CLIENT_READONLY;
5873     addReply(c,shared.ok);
5874 }
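
/* An illustrative replica-read session (assuming this replica's master
 * serves the hash slot of "foo"):
 *
 *   READONLY             -> +OK
 *   GET foo              -> "bar"    (served locally, no -MOVED redirection)
 *   READWRITE            -> +OK     (redirections are back to normal)
 */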
5875 
5876 /* Return the pointer to the cluster node that is able to serve the command.
5877  * For the function to succeed the command should only target either:
5878  *
5879  * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
5880  * 2) Multiple keys in the same hash slot, while the slot is stable (no
5881  *    resharding in progress).
5882  *
5883  * On success the function returns the node that is able to serve the request.
5884  * If the node is not 'myself' a redirection must be performed. The kind of
5885  * redirection is specified setting the integer passed by reference
5886  * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5887  * CLUSTER_REDIR_MOVED.
5888  *
5889  * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5890  *
5891  * If the command fails NULL is returned, and the reason of the failure is
5892  * provided via 'error_code', which will be set to:
5893  *
5894  * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5895  * don't belong to the same hash slot.
5896  *
5897  * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5898  * belonging to the same slot, but the slot is not stable (in migration or
5899  * importing state, likely because a resharding is in progress).
5900  *
5901  * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5902  * not bound to any node. In this case the cluster global state should be
5903  * already "down" but it is fragile to rely on the update of the global state,
5904  * so we also handle it here.
5905  *
5906  * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
5907  * down but the user attempts to execute a command that addresses one or more keys. */
5908 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5909     clusterNode *n = NULL;
5910     robj *firstkey = NULL;
5911     int multiple_keys = 0;
5912     multiState *ms, _ms;
5913     multiCmd mc;
5914     int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5915 
5916     /* Allow any key to be set if a module disabled cluster redirections:
     * this is useful when writing a module that implements a completely
     * different distributed system. */
5917     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
5918         return myself;
5919 
5920     /* Set error code optimistically for the base case. */
5921     if (error_code) *error_code = CLUSTER_REDIR_NONE;
5922 
5927     /* We handle all the cases as if they were EXEC commands, so we have
5928      * a common code path for everything */
5929     if (cmd->proc == execCommand) {
5930         /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5931          * error. */
5932         if (!(c->flags & CLIENT_MULTI)) return myself;
5933         ms = &c->mstate;
5934     } else {
5935         /* In order to have a single codepath, create a fake Multi State
5936          * structure if the client is not in MULTI/EXEC state: this way
5937          * the code below handles both cases. */
5938         ms = &_ms;
5939         _ms.commands = &mc;
5940         _ms.count = 1;
5941         mc.argv = argv;
5942         mc.argc = argc;
5943         mc.cmd = cmd;
5944     }
5945 
5946     /* Check that all the keys are in the same hash slot, and obtain this
5947      * slot and the associated node. */
5948     for (i = 0; i < ms->count; i++) {
5949         struct redisCommand *mcmd;
5950         robj **margv;
5951         int margc, *keyindex, numkeys, j;
5952 
5953         mcmd = ms->commands[i].cmd;
5954         margc = ms->commands[i].argc;
5955         margv = ms->commands[i].argv;
5956 
5957         getKeysResult result = GETKEYS_RESULT_INIT;
5958         numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
5959         keyindex = result.keys;
5960 
5961         for (j = 0; j < numkeys; j++) {
5962             robj *thiskey = margv[keyindex[j]];
5963             int thisslot = keyHashSlot((char*)thiskey->ptr,
5964                                        sdslen(thiskey->ptr));
5965 
5966             if (firstkey == NULL) {
5967                 /* This is the first key we see. Check its slot and
5968                  * the node serving it. */
5969                 firstkey = thiskey;
5970                 slot = thisslot;
5971                 n = server.cluster->slots[slot];
5972 
5973                 /* Error: If a slot is not served, we are in "cluster down"
5974                  * state. However the state is yet to be updated, so this was
5975                  * not trapped earlier in processCommand(). Report the same
5976                  * error to the client. */
5977                 if (n == NULL) {
5978                     getKeysFreeResult(&result);
5979                     if (error_code)
5980                         *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5981                     return NULL;
5982                 }
5983 
5984                 /* If we are migrating or importing this slot, we need to check
5985                  * if we have all the keys in the request (the only way we
5986                  * can safely serve the request, otherwise we return a TRYAGAIN
5987                  * error). To do so we set the importing/migrating state and
5988                  * increment a counter for every missing key. */
5989                 if (n == myself &&
5990                     server.cluster->migrating_slots_to[slot] != NULL)
5991                 {
5992                     migrating_slot = 1;
5993                 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5994                     importing_slot = 1;
5995                 }
5996             } else {
5997                 /* If it is not the first key, make sure it is exactly
5998                  * the same key as the first we saw. */
5999                 if (!equalStringObjects(firstkey,thiskey)) {
6000                     if (slot != thisslot) {
6001                         /* Error: multiple keys from different slots. */
6002                         getKeysFreeResult(&result);
6003                         if (error_code)
6004                             *error_code = CLUSTER_REDIR_CROSS_SLOT;
6005                         return NULL;
6006                     } else {
6007                         /* Flag this request as one with multiple different
6008                          * keys. */
6009                         multiple_keys = 1;
6010                     }
6011                 }
6012             }
6013 
6014             /* Migrating / Importing slot? Count keys we don't have. */
6015             int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY;
6016             if ((migrating_slot || importing_slot) &&
6017                 lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
6018             {
6019                 missing_keys++;
6020             }
6021         }
6022         getKeysFreeResult(&result);
6023     }
6024 
6025     /* No key at all in the command? Then we can serve the request
6026      * without redirections or errors, in all cases. */
6027     if (n == NULL) return myself;
6028 
6029     /* Cluster is globally down but we got keys? We only serve the request
6030      * if it is a read command and allow_reads_when_down is enabled. */
6031     if (server.cluster->state != CLUSTER_OK) {
6032         if (!server.cluster_allow_reads_when_down) {
6033             /* The cluster is configured to block commands when the
6034              * cluster is down. */
6035             if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
6036             return NULL;
6037         } else if (cmd->flags & CMD_WRITE) {
6038             /* The cluster is configured to allow read only commands, but
             * this is a write command. */
6039             if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
6040             return NULL;
6041         } else {
6042             /* Fall through and allow the command to be executed:
6043              * this happens when server.cluster_allow_reads_when_down is
6044              * true and the command is not a write command. */
6045         }
6046     }
6047 
6048     /* Return the hashslot by reference. */
6049     if (hashslot) *hashslot = slot;
6050 
6051     /* MIGRATE always works in the context of the local node if the slot
6052      * is open (migrating or importing state). We need to be able to freely
6053      * move keys among instances in this case. */
6054     if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
6055         return myself;
6056 
6057     /* If we don't have all the keys and we are migrating the slot, send
6058      * an ASK redirection. */
6059     if (migrating_slot && missing_keys) {
6060         if (error_code) *error_code = CLUSTER_REDIR_ASK;
6061         return server.cluster->migrating_slots_to[slot];
6062     }
6063 
6064     /* If we are receiving the slot, and the client correctly flagged the
6065      * request as "ASKING", we can serve the request. However if the request
6066      * involves multiple keys and we don't have them all, the only option is
6067      * to send a TRYAGAIN error. */
6068     if (importing_slot &&
6069         (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
6070     {
6071         if (multiple_keys && missing_keys) {
6072             if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
6073             return NULL;
6074         } else {
6075             return myself;
6076         }
6077     }
6078 
6079     /* Handle the read-only client case reading from a slave: if this
6080      * node is a slave and the request is about a hash slot our master
6081      * is serving, we can reply without redirection. */
6082     int is_write_command = (c->cmd->flags & CMD_WRITE) ||
6083                            (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
6084     if (c->flags & CLIENT_READONLY &&
6085         !is_write_command &&
6086         nodeIsSlave(myself) &&
6087         myself->slaveof == n)
6088     {
6089         return myself;
6090     }
6091 
6092     /* Base case: just return the right node. However if this node is not
6093      * myself, set error_code to MOVED since we need to issue a redirection. */
6094     if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
6095     return n;
6096 }
6097 
6098 /* Send the client the right redirection code, according to error_code
6099  * that should be set to one of CLUSTER_REDIR_* macros.
6100  *
6101  * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
6102  * are used, then the node 'n' should not be NULL, but should be the
6103  * node we want to mention in the redirection. Moreover hashslot should
6104  * be set to the hash slot that caused the redirection. */
6105 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
6106     if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
6107         addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
6108     } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
6109         /* The request spans multiple keys in the same slot,
6110          * but the slot is not "stable" currently as there is
6111          * a migration or import in progress. */
6112         addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
6113     } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
6114         addReplyError(c,"-CLUSTERDOWN The cluster is down");
6115     } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
6116         addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
6117     } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
6118         addReplyError(c,"-CLUSTERDOWN Hash slot not served");
6119     } else if (error_code == CLUSTER_REDIR_MOVED ||
6120                error_code == CLUSTER_REDIR_ASK)
6121     {
6122         /* Redirect to IP:port. Include plaintext port if cluster is TLS but
6123          * client is non-TLS. */
6124         int use_pport = (server.tls_cluster &&
6125                          c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
6126         int port = use_pport && n->pport ? n->pport : n->port;
6127         addReplyErrorSds(c,sdscatprintf(sdsempty(),
6128             "-%s %d %s:%d",
6129             (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
6130             hashslot, n->ip, port));
6131     } else {
6132         serverPanic("getNodeByQuery() unknown error.");
6133     }
6134 }
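
/* Examples of the two redirection replies generated above (addresses are
 * made up):
 *
 *   -MOVED 3999 127.0.0.1:6381   <- slot moved for good: update slot tables
 *   -ASK 3999 127.0.0.1:6381     <- one-shot redirection: send ASKING first
 */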
6135 
6136 /* This function is called by the function processing clients incrementally
6137  * to detect timeouts, in order to handle the following case:
6138  *
6139  * 1) A client blocks with BLPOP or similar blocking operation.
6140  * 2) The master migrates the hash slot elsewhere or turns into a slave.
6141  * 3) The client may remain blocked forever (or up to the max timeout time)
6142  *    waiting for a key change that will never happen.
6143  *
6144  * If the client is found to be blocked into a hash slot this node no
6145  * longer handles, the client is sent a redirection error, and the function
6146  * returns 1. Otherwise 0 is returned and no operation is performed. */
6147 int clusterRedirectBlockedClientIfNeeded(client *c) {
6148     if (c->flags & CLIENT_BLOCKED &&
6149         (c->btype == BLOCKED_LIST ||
6150          c->btype == BLOCKED_ZSET ||
6151          c->btype == BLOCKED_STREAM ||
6152          c->btype == BLOCKED_MODULE))
6153     {
6154         dictEntry *de;
6155         dictIterator *di;
6156 
6157         /* If the cluster is down, unblock the client with the right error.
6158          * If the cluster is configured to allow reads on cluster down, we
6159          * still want to emit this error since a write will be required
6160          * to unblock them, and that write may never arrive. */
6161         if (server.cluster->state == CLUSTER_FAIL) {
6162             clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
6163             return 1;
6164         }
6165 
6166         /* If the client is blocked on a module, but not on a specific key,
6167          * don't unblock it (except for the CLUSTER_FAIL case above). */
6168         if (c->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
6169             return 0;
6170 
6171         /* All keys must belong to the same slot, so check first key only. */
6172         di = dictGetIterator(c->bpop.keys);
6173         if ((de = dictNext(di)) != NULL) {
6174             robj *key = dictGetKey(de);
6175             int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
6176             clusterNode *node = server.cluster->slots[slot];
6177 
6178             /* If the client is read-only and is accessing a key that our
6179              * master serves, we can handle the read locally. */
6180             if ((c->flags & CLIENT_READONLY) &&
6181                 !(c->lastcmd->flags & CMD_WRITE) &&
6182                 nodeIsSlave(myself) && myself->slaveof == node)
6183             {
6184                 node = myself;
6185             }

            /* We send an error and unblock the client if:
             * 1) The slot is unassigned, emitting a cluster down error.
             * 2) The slot is not handled by this node, nor being imported. */
            if (node != myself &&
                server.cluster->importing_slots_from[slot] == NULL)
            {
                if (node == NULL) {
                    clusterRedirectClient(c,NULL,0,
                        CLUSTER_REDIR_DOWN_UNBOUND);
                } else {
                    clusterRedirectClient(c,node,slot,
                        CLUSTER_REDIR_MOVED);
                }
                dictReleaseIterator(di);
                return 1;
            }
        }
        dictReleaseIterator(di);
    }
    return 0;
}
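
/* A minimal caller sketch (hypothetical; the real call sites live in the
 * code that scans blocked clients, not in this file): the function only
 * queues the redirection error, so the caller still has to remove the
 * client from the blocked state, roughly like:
 *
 *   if (server.cluster_enabled &&
 *       clusterRedirectBlockedClientIfNeeded(c))
 *   {
 *       unblockClient(c);
 *   }
 */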

/* Slot to Key API. This is used by Redis Cluster in order to quickly obtain
 * a key that belongs to a specified hash slot. This is useful while
 * rehashing the cluster and in other conditions when we need to understand
 * whether we have keys for a given hash slot. */
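
/* Layout sketch: every dictEntry of the main keyspace carries two extra
 * links (accessed via dictEntryNextInSlot()/dictEntryPrevInSlot()) that
 * thread it into a doubly linked list of all the keys hashing to the same
 * slot, rooted at the per-slot header:
 *
 *   by_slot[slot].head -> entryA <-> entryB <-> entryC -> NULL
 *   by_slot[slot].count = 3
 *
 * This gives O(1) add/remove per key and O(N) enumeration of a slot's keys,
 * as the functions below implement. */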

void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
    sds key = entry->key;
    unsigned int hashslot = keyHashSlot(key, sdslen(key));
    slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
    slot_to_keys->count++;

    /* Insert entry before the first element in the list. */
    dictEntry *first = slot_to_keys->head;
    dictEntryNextInSlot(entry) = first;
    if (first != NULL) {
        serverAssert(dictEntryPrevInSlot(first) == NULL);
        dictEntryPrevInSlot(first) = entry;
    }
    serverAssert(dictEntryPrevInSlot(entry) == NULL);
    slot_to_keys->head = entry;
}

void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
    sds key = entry->key;
    unsigned int hashslot = keyHashSlot(key, sdslen(key));
    slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
    slot_to_keys->count--;

    /* Connect previous and next entries to each other. */
    dictEntry *next = dictEntryNextInSlot(entry);
    dictEntry *prev = dictEntryPrevInSlot(entry);
    if (next != NULL) {
        dictEntryPrevInSlot(next) = prev;
    }
    if (prev != NULL) {
        dictEntryNextInSlot(prev) = next;
    } else {
        /* The removed entry was the first in the list. */
        serverAssert(slot_to_keys->head == entry);
        slot_to_keys->head = next;
    }
}

/* Updates neighbour entries when an entry has been replaced (e.g. reallocated
 * during active defrag). */
void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) {
    dictEntry *next = dictEntryNextInSlot(entry);
    dictEntry *prev = dictEntryPrevInSlot(entry);
    if (next != NULL) {
        dictEntryPrevInSlot(next) = entry;
    }
    if (prev != NULL) {
        dictEntryNextInSlot(prev) = entry;
    } else {
        /* The replaced entry was the first in the list. */
        sds key = entry->key;
        unsigned int hashslot = keyHashSlot(key, sdslen(key));
        slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
        slot_to_keys->head = entry;
    }
}
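
/* A usage sketch (hypothetical caller, e.g. an active defrag pass that has
 * just reallocated a dict entry): since the neighbours still point at the
 * old address, the defragger fixes the list right after the move:
 *
 *   dictEntry *newde = reallocated_entry; // hypothetical name; next/prev
 *                                         // were copied from the old entry
 *   slotToKeyReplaceEntry(newde, db);
 */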

/* Initialize the slots-keys map of the given db. */
void slotToKeyInit(redisDb *db) {
    db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping));
}

/* Empty the slots-keys map of the given db. */
void slotToKeyFlush(redisDb *db) {
    memset(db->slots_to_keys, 0,
        sizeof(clusterSlotToKeyMapping));
}

/* Free the slots-keys map of the given db. */
void slotToKeyDestroy(redisDb *db) {
    zfree(db->slots_to_keys);
    db->slots_to_keys = NULL;
}
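
/* Lifecycle sketch (hedged; the actual call sites live in the db code):
 * the map is allocated once per cluster-enabled db, zeroed when the db is
 * emptied, and released with the db itself:
 *
 *   slotToKeyInit(db);      // on db creation
 *   ...                     // entry hooks above keep it in sync
 *   slotToKeyFlush(db);     // e.g. on FLUSHDB/FLUSHALL
 *   slotToKeyDestroy(db);   // on db teardown
 */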

/* Remove all the keys in the specified hash slot.
 * The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
    unsigned int j = 0;
    dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head;
    while (de != NULL) {
        sds sdskey = dictGetKey(de);
        /* Advance before deleting: dbDelete() frees the current entry. */
        de = dictEntryNextInSlot(de);
        robj *key = createStringObject(sdskey, sdslen(sdskey));
        dbDelete(&server.db[0], key);
        decrRefCount(key);
        j++;
    }
    return j;
}

/* Return the number of keys in the specified hash slot. */
unsigned int countKeysInSlot(unsigned int hashslot) {
    return (*server.db->slots_to_keys).by_slot[hashslot].count;
}
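
/* Usage sketch (hedged; callers live elsewhere in the server): the counter
 * is what CLUSTER COUNTKEYSINSLOT reports, while delKeysInSlot() lets a node
 * drop keys left in a slot it no longer serves:
 *
 *   unsigned int n = countKeysInSlot(slot);    // O(1), reads the counter
 *   if (n != 0) {
 *       unsigned int deleted = delKeysInSlot(slot);
 *       serverLog(LL_NOTICE, "Deleted %u keys in slot %u", deleted, slot);
 *   }
 */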