1 /* Redis Cluster implementation.
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "server.h"
32 #include "cluster.h"
33 #include "endianconv.h"
34
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <sys/stat.h>
41 #include <sys/file.h>
42 #include <math.h>
43
44 /* A global reference to myself is handy to make code more clear.
45 * Myself always points to server.cluster->myself, that is, the clusterNode
46 * that represents this node. */
47 clusterNode *myself = NULL;
48
49 clusterNode *createClusterNode(char *nodename, int flags);
50 void clusterAddNode(clusterNode *node);
51 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
52 void clusterReadHandler(connection *conn);
53 void clusterSendPing(clusterLink *link, int type);
54 void clusterSendFail(char *nodename);
55 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
56 void clusterUpdateState(void);
57 int clusterNodeGetSlotBit(clusterNode *n, int slot);
58 sds clusterGenNodesDescription(int filter, int use_pport);
59 clusterNode *clusterLookupNode(const char *name);
60 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
61 int clusterAddSlot(clusterNode *n, int slot);
62 int clusterDelSlot(int slot);
63 int clusterDelNodeSlots(clusterNode *node);
64 int clusterNodeSetSlotBit(clusterNode *n, int slot);
65 void clusterSetMaster(clusterNode *n);
66 void clusterHandleSlaveFailover(void);
67 void clusterHandleSlaveMigration(int max_slaves);
68 int bitmapTestBit(unsigned char *bitmap, int pos);
69 void clusterDoBeforeSleep(int flags);
70 void clusterSendUpdate(clusterLink *link, clusterNode *node);
71 void resetManualFailover(void);
72 void clusterCloseAllSlots(void);
73 void clusterSetNodeAsMaster(clusterNode *n);
74 void clusterDelNode(clusterNode *delnode);
75 sds representClusterNodeFlags(sds ci, uint16_t flags);
76 uint64_t clusterGetMaxEpoch(void);
77 int clusterBumpConfigEpochWithoutConsensus(void);
78 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
79 const char *clusterGetMessageTypeString(int type);
80 unsigned int countKeysInSlot(unsigned int hashslot);
81 unsigned int delKeysInSlot(unsigned int hashslot);
82
83 /* Links to the next and previous entries for keys in the same slot are stored
84 * in the dict entry metadata. See Slot to Key API below. */
85 #define dictEntryNextInSlot(de) \
86 (((clusterDictEntryMetadata *)dictMetadata(de))->next)
87 #define dictEntryPrevInSlot(de) \
88 (((clusterDictEntryMetadata *)dictMetadata(de))->prev)
89
90 #define RCVBUF_INIT_LEN 1024
91 #define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */
92
93 /* Cluster nodes hash table, mapping node names (IDs) to
94 * clusterNode structures. */
95 dictType clusterNodesDictType = {
96 dictSdsHash, /* hash function */
97 NULL, /* key dup */
98 NULL, /* val dup */
99 dictSdsKeyCompare, /* key compare */
100 dictSdsDestructor, /* key destructor */
101 NULL, /* val destructor */
102 NULL /* allow to expand */
103 };
104
105 /* Cluster re-addition blacklist. This maps node IDs to the time
106 * we can re-add this node. The goal is to avoid re-adding a removed
107 * node for some time. */
108 dictType clusterNodesBlackListDictType = {
109 dictSdsCaseHash, /* hash function */
110 NULL, /* key dup */
111 NULL, /* val dup */
112 dictSdsKeyCaseCompare, /* key compare */
113 dictSdsDestructor, /* key destructor */
114 NULL, /* val destructor */
115 NULL /* allow to expand */
116 };
117
118 /* -----------------------------------------------------------------------------
119 * Initialization
120 * -------------------------------------------------------------------------- */
121
122 /* Load the cluster config from 'filename'.
123 *
124 * If the file does not exist or is zero-length (this may happen because
125 * when we lock the nodes.conf file, we create a zero-length one for the
126 * sake of locking if it does not already exist), C_ERR is returned.
127 * If the configuration was loaded from the file, C_OK is returned. */
128 int clusterLoadConfig(char *filename) {
129 FILE *fp = fopen(filename,"r");
130 struct stat sb;
131 char *line;
132 int maxline, j;
133
134 if (fp == NULL) {
135 if (errno == ENOENT) {
136 return C_ERR;
137 } else {
138 serverLog(LL_WARNING,
139 "Loading the cluster node config from %s: %s",
140 filename, strerror(errno));
141 exit(1);
142 }
143 }
144
145 if (redis_fstat(fileno(fp),&sb) == -1) {
146 serverLog(LL_WARNING,
147 "Unable to obtain the cluster node config file stat %s: %s",
148 filename, strerror(errno));
149 exit(1);
150 }
151 /* Check if the file is zero-length: if so return C_ERR to signal
152 * we have to write the config. */
153 if (sb.st_size == 0) {
154 fclose(fp);
155 return C_ERR;
156 }
157
158 /* Parse the file. Note that single lines of the cluster config file can
159 * be really long as they include all the hash slots of the node.
160 * This means in the worst possible case, half of the Redis slots will be
161 * present in a single line, possibly in importing or migrating state,
162 * together with the node ID of the sender/receiver.
163 *
164 * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
165 maxline = 1024+CLUSTER_SLOTS*128;
166 line = zmalloc(maxline);
167 while(fgets(line,maxline,fp) != NULL) {
168 int argc;
169 sds *argv;
170 clusterNode *n, *master;
171 char *p, *s;
172
173 /* Skip blank lines, they can be created either by users manually
174 * editing nodes.conf or by the config writing process if stopped
175 * before the truncate() call. */
176 if (line[0] == '\n' || line[0] == '\0') continue;
177
178 /* Split the line into arguments for processing. */
179 argv = sdssplitargs(line,&argc);
180 if (argv == NULL) goto fmterr;
181
182 /* Handle the special "vars" line. Don't assume it is the last
183 * line even if it actually is when generated by Redis. */
184 if (strcasecmp(argv[0],"vars") == 0) {
185 if (!(argc % 2)) goto fmterr;
186 for (j = 1; j < argc; j += 2) {
187 if (strcasecmp(argv[j],"currentEpoch") == 0) {
188 server.cluster->currentEpoch =
189 strtoull(argv[j+1],NULL,10);
190 } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
191 server.cluster->lastVoteEpoch =
192 strtoull(argv[j+1],NULL,10);
193 } else {
194 serverLog(LL_WARNING,
195 "Skipping unknown cluster config variable '%s'",
196 argv[j]);
197 }
198 }
199 sdsfreesplitres(argv,argc);
200 continue;
201 }
202
203 /* Regular config lines have at least eight fields */
204 if (argc < 8) {
205 sdsfreesplitres(argv,argc);
206 goto fmterr;
207 }
208
209 /* Create this node if it does not exist */
210 n = clusterLookupNode(argv[0]);
211 if (!n) {
212 n = createClusterNode(argv[0],0);
213 clusterAddNode(n);
214 }
215 /* Address and port */
216 if ((p = strrchr(argv[1],':')) == NULL) {
217 sdsfreesplitres(argv,argc);
218 goto fmterr;
219 }
220 *p = '\0';
221 memcpy(n->ip,argv[1],strlen(argv[1])+1);
222 char *port = p+1;
223 char *busp = strchr(port,'@');
224 if (busp) {
225 *busp = '\0';
226 busp++;
227 }
228 n->port = atoi(port);
229 /* In older versions of nodes.conf the "@busport" part is missing.
230 * In this case we set it to the default offset of 10000 from the
231 * base port. */
232 n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
233
234 /* The plaintext port for clients in a TLS cluster (n->pport) is not
235 * stored in nodes.conf. It is received later over the bus protocol. */
236
237 /* Parse flags */
238 p = s = argv[2];
239 while(p) {
240 p = strchr(s,',');
241 if (p) *p = '\0';
242 if (!strcasecmp(s,"myself")) {
243 serverAssert(server.cluster->myself == NULL);
244 myself = server.cluster->myself = n;
245 n->flags |= CLUSTER_NODE_MYSELF;
246 } else if (!strcasecmp(s,"master")) {
247 n->flags |= CLUSTER_NODE_MASTER;
248 } else if (!strcasecmp(s,"slave")) {
249 n->flags |= CLUSTER_NODE_SLAVE;
250 } else if (!strcasecmp(s,"fail?")) {
251 n->flags |= CLUSTER_NODE_PFAIL;
252 } else if (!strcasecmp(s,"fail")) {
253 n->flags |= CLUSTER_NODE_FAIL;
254 n->fail_time = mstime();
255 } else if (!strcasecmp(s,"handshake")) {
256 n->flags |= CLUSTER_NODE_HANDSHAKE;
257 } else if (!strcasecmp(s,"noaddr")) {
258 n->flags |= CLUSTER_NODE_NOADDR;
259 } else if (!strcasecmp(s,"nofailover")) {
260 n->flags |= CLUSTER_NODE_NOFAILOVER;
261 } else if (!strcasecmp(s,"noflags")) {
262 /* nothing to do */
263 } else {
264 serverPanic("Unknown flag in redis cluster config file");
265 }
266 if (p) s = p+1;
267 }
268
269 /* Get master if any. Set the master and populate master's
270 * slave list. */
271 if (argv[3][0] != '-') {
272 master = clusterLookupNode(argv[3]);
273 if (!master) {
274 master = createClusterNode(argv[3],0);
275 clusterAddNode(master);
276 }
277 n->slaveof = master;
278 clusterNodeAddSlave(master,n);
279 }
280
281 /* Set ping sent / pong received timestamps */
282 if (atoi(argv[4])) n->ping_sent = mstime();
283 if (atoi(argv[5])) n->pong_received = mstime();
284
285 /* Set configEpoch for this node. */
286 n->configEpoch = strtoull(argv[6],NULL,10);
287
288 /* Populate hash slots served by this instance. */
289 for (j = 8; j < argc; j++) {
290 int start, stop;
291
292 if (argv[j][0] == '[') {
293 /* Here we handle migrating / importing slots */
294 int slot;
295 char direction;
296 clusterNode *cn;
297
298 p = strchr(argv[j],'-');
299 serverAssert(p != NULL);
300 *p = '\0';
301 direction = p[1]; /* Either '>' or '<' */
302 slot = atoi(argv[j]+1);
303 if (slot < 0 || slot >= CLUSTER_SLOTS) {
304 sdsfreesplitres(argv,argc);
305 goto fmterr;
306 }
307 p += 3;
308 cn = clusterLookupNode(p);
309 if (!cn) {
310 cn = createClusterNode(p,0);
311 clusterAddNode(cn);
312 }
313 if (direction == '>') {
314 server.cluster->migrating_slots_to[slot] = cn;
315 } else {
316 server.cluster->importing_slots_from[slot] = cn;
317 }
318 continue;
319 } else if ((p = strchr(argv[j],'-')) != NULL) {
320 *p = '\0';
321 start = atoi(argv[j]);
322 stop = atoi(p+1);
323 } else {
324 start = stop = atoi(argv[j]);
325 }
326 if (start < 0 || start >= CLUSTER_SLOTS ||
327 stop < 0 || stop >= CLUSTER_SLOTS)
328 {
329 sdsfreesplitres(argv,argc);
330 goto fmterr;
331 }
332 while(start <= stop) clusterAddSlot(n, start++);
333 }
334
335 sdsfreesplitres(argv,argc);
336 }
337 /* Config sanity check */
338 if (server.cluster->myself == NULL) goto fmterr;
339
340 zfree(line);
341 fclose(fp);
342
343 serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
344
345 /* Something that should never happen: currentEpoch smaller than
346 * the max epoch found in the nodes configuration. However we handle this
347 * as some form of protection against manual editing of critical files. */
348 if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
349 server.cluster->currentEpoch = clusterGetMaxEpoch();
350 }
351 return C_OK;
352
353 fmterr:
354 serverLog(LL_WARNING,
355 "Unrecoverable error: corrupted cluster config file.");
356 zfree(line);
357 if (fp) fclose(fp);
358 exit(1);
359 }
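/* Illustrative only: the lines parsed above use the same format as the
 * CLUSTER NODES output (node IDs are abbreviated and values invented here):
 *
 *   aaaa...0001 127.0.0.1:30001@40001 myself,master - 0 0 1 connected 0-5460 [5461->-bbbb...0002]
 *   bbbb...0002 127.0.0.1:30002@40002 master - 0 1426238316232 2 connected 5461-10922
 *   cccc...0003 127.0.0.1:30004@40004 slave aaaa...0001 0 1426238317239 1 connected
 *   vars currentEpoch 6 lastVoteEpoch 4
 *
 * Fields are: node ID, ip:port@busport, flags, master ID (or '-'), ping-sent,
 * pong-received, configEpoch, link state, then the owned slot ranges; the
 * "[slot->-id]" / "[slot-<-id]" entries mark migrating / importing slots and
 * are handled by the '[' branch of the parser above. */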
360
361 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
362 *
363 * This function writes the node config and returns 0; on error -1
364 * is returned.
365 *
366 * Note: we need to write the file in an atomic way from the point of view
367 * of the POSIX filesystem semantics, so that if the server is stopped
368 * or crashes during the write, we'll end with either the old file or the
369 * new one. Since we have the full payload to write available we can use
370 * a single write to write the whole file. If the pre-existing file was
371 * bigger we pad our payload with newlines that are anyway ignored and truncate
372 * the file afterward. */
373 int clusterSaveConfig(int do_fsync) {
374 sds ci;
375 size_t content_size;
376 struct stat sb;
377 int fd;
378
379 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
380
381 /* Get the nodes description and concatenate our "vars" directive to
382 * save currentEpoch and lastVoteEpoch. */
383 ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);
384 ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
385 (unsigned long long) server.cluster->currentEpoch,
386 (unsigned long long) server.cluster->lastVoteEpoch);
387 content_size = sdslen(ci);
388
389 if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
390 == -1) goto err;
391
392 if (redis_fstat(fd,&sb) == -1) goto err;
393
394 /* Pad the new payload if the existing file length is greater. */
395 if (sb.st_size > (off_t)content_size) {
396 ci = sdsgrowzero(ci,sb.st_size);
397 memset(ci+content_size,'\n',sb.st_size-content_size);
398 }
399
400 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
401 if (do_fsync) {
402 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
403 if (fsync(fd) == -1) goto err;
404 }
405
406 /* Truncate the file if needed to remove the final \n padding that
407 * is just garbage. */
408 if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
409 /* ftruncate() failing is not a critical error. */
410 }
411 close(fd);
412 sdsfree(ci);
413 return 0;
414
415 err:
416 if (fd != -1) close(fd);
417 sdsfree(ci);
418 return -1;
419 }
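/* A concrete sketch of the overwrite-in-place strategy above (sizes
 * invented): if the pre-existing nodes.conf is 2000 bytes and the new
 * payload is 1500 bytes, the payload is grown to 2000 bytes with '\n'
 * padding, written with a single write(), and the file is finally
 * ftruncate()d back to 1500 bytes. A crash at any point leaves either the
 * old content or the new content, possibly followed by harmless newline
 * padding. */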
420
421 void clusterSaveConfigOrDie(int do_fsync) {
422 if (clusterSaveConfig(do_fsync) == -1) {
423 serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
424 exit(1);
425 }
426 }
427
428 /* Lock the cluster config using flock(), and leak the file descriptor used to
429 * acquire the lock so that the file will be locked forever.
430 *
431 * This works because we always update nodes.conf with a new version
432 * in-place, reopening the file, and writing to it in place (later adjusting
433 * the length with ftruncate()).
434 *
435 * On success C_OK is returned, otherwise an error is logged and
436 * the function returns C_ERR to signal a lock was not acquired. */
437 int clusterLockConfig(char *filename) {
438 /* flock() does not exist on Solaris
439 * and an fcntl-based solution won't help, as we constantly re-open that file,
440 * which will release _all_ locks anyway
441 */
442 #if !defined(__sun)
443 /* To lock it, we need to open the file in a way it is created if
444 * it does not exist, otherwise there is a race condition with other
445 * processes. */
446 int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);
447 if (fd == -1) {
448 serverLog(LL_WARNING,
449 "Can't open %s in order to acquire a lock: %s",
450 filename, strerror(errno));
451 return C_ERR;
452 }
453
454 if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
455 if (errno == EWOULDBLOCK) {
456 serverLog(LL_WARNING,
457 "Sorry, the cluster configuration file %s is already used "
458 "by a different Redis Cluster node. Please make sure that "
459 "different nodes use different cluster configuration "
460 "files.", filename);
461 } else {
462 serverLog(LL_WARNING,
463 "Impossible to lock %s: %s", filename, strerror(errno));
464 }
465 close(fd);
466 return C_ERR;
467 }
468 /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
469 * lock to the file as long as the process exists.
470 *
471 * After fork, the child process inherits the fd opened by the parent process,
472 * so we need to save the fd in `cluster_config_file_lock_fd`, so that in
473 * redisFork() it can be closed in the child process.
474 * If it is not closed, then when the main process is killed -9 while the child
475 * process (redis-aof-rewrite) is still alive, the fd (lock) will still be held by
476 * the child process, and the main process will fail to acquire it, and thus fail to start. */
477 server.cluster_config_file_lock_fd = fd;
478 #else
479 UNUSED(filename);
480 #endif /* __sun */
481
482 return C_OK;
483 }
484
485 /* Derives our ports to be announced in the cluster bus. */
486 void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
487 int *announced_cport) {
488 int port = server.tls_cluster ? server.tls_port : server.port;
489 /* Default announced ports. */
490 *announced_port = port;
491 *announced_pport = server.tls_cluster ? server.port : 0;
492 *announced_cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
493
494 /* Config overriding announced ports. */
495 if (server.tls_cluster && server.cluster_announce_tls_port) {
496 *announced_port = server.cluster_announce_tls_port;
497 *announced_pport = server.cluster_announce_port;
498 } else if (server.cluster_announce_port) {
499 *announced_port = server.cluster_announce_port;
500 }
501 if (server.cluster_announce_bus_port) {
502 *announced_cport = server.cluster_announce_bus_port;
503 }
504 }
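/* Worked example (values invented): with "port 6379", "tls-port 6390",
 * "tls-cluster yes", and no cluster-port or cluster-announce-* overrides,
 * the function above yields announced_port = 6390 (the TLS port clients
 * connect to), announced_pport = 6379 (the plaintext port), and
 * announced_cport = 6390 + CLUSTER_PORT_INCR = 16390 for the cluster bus. */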
505
506 /* Some flags (currently just the NOFAILOVER flag) may need to be updated
507 * in the "myself" node based on the current configuration of the node,
508 * that may change at runtime via CONFIG SET. This function changes the
509 * set of flags in myself->flags accordingly. */
510 void clusterUpdateMyselfFlags(void) {
511 if (!myself) return;
512 int oldflags = myself->flags;
513 int nofailover = server.cluster_slave_no_failover ?
514 CLUSTER_NODE_NOFAILOVER : 0;
515 myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
516 myself->flags |= nofailover;
517 if (myself->flags != oldflags) {
518 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
519 CLUSTER_TODO_UPDATE_STATE);
520 }
521 }
522
523
524 /* We want to keep myself->ip in sync with the cluster-announce-ip option.
525 * The option can be set at runtime via CONFIG SET. */
526 void clusterUpdateMyselfIp(void) {
527 if (!myself) return;
528 static char *prev_ip = NULL;
529 char *curr_ip = server.cluster_announce_ip;
530 int changed = 0;
531
532 if (prev_ip == NULL && curr_ip != NULL) changed = 1;
533 else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
534 else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;
535
536 if (changed) {
537 if (prev_ip) zfree(prev_ip);
538 prev_ip = curr_ip;
539
540 if (curr_ip) {
541 /* Take a private copy of the announced IP address by duplicating
542 * the string. This way later we can check if the address really
543 * changed. */
544 prev_ip = zstrdup(prev_ip);
545 strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN-1);
546 myself->ip[NET_IP_STR_LEN-1] = '\0';
547 } else {
548 myself->ip[0] = '\0'; /* Force autodetection. */
549 }
550 }
551 }
552
553 void clusterInit(void) {
554 int saveconf = 0;
555
556 server.cluster = zmalloc(sizeof(clusterState));
557 server.cluster->myself = NULL;
558 server.cluster->currentEpoch = 0;
559 server.cluster->state = CLUSTER_FAIL;
560 server.cluster->size = 1;
561 server.cluster->todo_before_sleep = 0;
562 server.cluster->nodes = dictCreate(&clusterNodesDictType);
563 server.cluster->nodes_black_list =
564 dictCreate(&clusterNodesBlackListDictType);
565 server.cluster->failover_auth_time = 0;
566 server.cluster->failover_auth_count = 0;
567 server.cluster->failover_auth_rank = 0;
568 server.cluster->failover_auth_epoch = 0;
569 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
570 server.cluster->lastVoteEpoch = 0;
571 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
572 server.cluster->stats_bus_messages_sent[i] = 0;
573 server.cluster->stats_bus_messages_received[i] = 0;
574 }
575 server.cluster->stats_pfail_nodes = 0;
576 memset(server.cluster->slots,0, sizeof(server.cluster->slots));
577 clusterCloseAllSlots();
578
579 /* Lock the cluster config file to make sure every node uses
580 * its own nodes.conf. */
581 server.cluster_config_file_lock_fd = -1;
582 if (clusterLockConfig(server.cluster_configfile) == C_ERR)
583 exit(1);
584
585 /* Load or create a new nodes configuration. */
586 if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
587 /* No configuration found. We will just use the random name provided
588 * by the createClusterNode() function. */
589 myself = server.cluster->myself =
590 createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
591 serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
592 myself->name);
593 clusterAddNode(myself);
594 saveconf = 1;
595 }
596 if (saveconf) clusterSaveConfigOrDie(1);
597
598 /* We need a listening TCP port for our cluster messaging needs. */
599 server.cfd.count = 0;
600
601 /* Port sanity check II
602 * The other handshake port check is triggered too late to stop
603 * us from trying to use a too-high cluster port number. */
604 int port = server.tls_cluster ? server.tls_port : server.port;
605 if (!server.cluster_port && port > (65535-CLUSTER_PORT_INCR)) {
606 serverLog(LL_WARNING, "Redis port number too high. "
607 "Cluster communication port is 10,000 port "
608 "numbers higher than your Redis port. "
609 "Your Redis port number must be 55535 or less.");
610 exit(1);
611 }
612 if (!server.bindaddr_count) {
613 serverLog(LL_WARNING, "No bind address is configured, but it is required for the Cluster bus.");
614 exit(1);
615 }
616 int cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
617 if (listenToPort(cport, &server.cfd) == C_ERR ) {
618 /* Note: the following log text is matched by the test suite. */
619 serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", cport);
620 exit(1);
621 }
622
623 if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {
624 serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
625 }
626
627 /* Initialize data for the Slot to key API. */
628 slotToKeyInit(server.db);
629
630 /* Set myself->port/cport/pport to my listening ports, we'll just need to
631 * discover the IP address via MEET messages. */
632 deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
633
634 server.cluster->mf_end = 0;
635 server.cluster->mf_slave = NULL;
636 resetManualFailover();
637 clusterUpdateMyselfFlags();
638 clusterUpdateMyselfIp();
639 }
640
641 /* Reset a node performing a soft or hard reset:
642 *
643 * 1) All other nodes are forgotten.
644 * 2) All the assigned / open slots are released.
645 * 3) If the node is a slave, it turns into a master.
646 * 4) Only for hard reset: a new Node ID is generated.
647 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
648 * 6) The new configuration is saved and the cluster state updated.
649 * 7) If the node was a slave, the whole data set is flushed away. */
650 void clusterReset(int hard) {
651 dictIterator *di;
652 dictEntry *de;
653 int j;
654
655 /* Turn into master. */
656 if (nodeIsSlave(myself)) {
657 clusterSetNodeAsMaster(myself);
658 replicationUnsetMaster();
659 emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
660 }
661
662 /* Close slots, reset manual failover state. */
663 clusterCloseAllSlots();
664 resetManualFailover();
665
666 /* Unassign all the slots. */
667 for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);
668
669 /* Forget all the nodes, but myself. */
670 di = dictGetSafeIterator(server.cluster->nodes);
671 while((de = dictNext(di)) != NULL) {
672 clusterNode *node = dictGetVal(de);
673
674 if (node == myself) continue;
675 clusterDelNode(node);
676 }
677 dictReleaseIterator(di);
678
679 /* Hard reset only: set epochs to 0, change node ID. */
680 if (hard) {
681 sds oldname;
682
683 server.cluster->currentEpoch = 0;
684 server.cluster->lastVoteEpoch = 0;
685 myself->configEpoch = 0;
686 serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");
687
688 /* To change the Node ID we need to remove the old name from the
689 * nodes table, change the ID, and re-add back with new name. */
690 oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
691 dictDelete(server.cluster->nodes,oldname);
692 sdsfree(oldname);
693 getRandomHexChars(myself->name, CLUSTER_NAMELEN);
694 clusterAddNode(myself);
695 serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
696 }
697
698 /* Make sure to persist the new config and update the state. */
699 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
700 CLUSTER_TODO_UPDATE_STATE|
701 CLUSTER_TODO_FSYNC_CONFIG);
702 }
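/* Usage sketch (assumed caller: the CLUSTER RESET subcommand handler, which
 * lives outside this section):
 *
 *   clusterReset(1);   // hard reset: also regenerates the node ID and zeroes epochs
 *   clusterReset(0);   // soft reset: forgets nodes and releases slots only
 */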
703
704 /* -----------------------------------------------------------------------------
705 * CLUSTER communication link
706 * -------------------------------------------------------------------------- */
707
708 clusterLink *createClusterLink(clusterNode *node) {
709 clusterLink *link = zmalloc(sizeof(*link));
710 link->ctime = mstime();
711 link->sndbuf = sdsempty();
712 link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
713 link->rcvbuf_len = 0;
714 link->node = node;
715 link->conn = NULL;
716 return link;
717 }
718
719 /* Free a cluster link. The associated node is of course not freed.
720 * This function just makes sure that the node associated with this
721 * link will have its 'link' field set to NULL. */
722 void freeClusterLink(clusterLink *link) {
723 if (link->conn) {
724 connClose(link->conn);
725 link->conn = NULL;
726 }
727 sdsfree(link->sndbuf);
728 zfree(link->rcvbuf);
729 if (link->node)
730 link->node->link = NULL;
731 zfree(link);
732 }
733
734 static void clusterConnAcceptHandler(connection *conn) {
735 clusterLink *link;
736
737 if (connGetState(conn) != CONN_STATE_CONNECTED) {
738 serverLog(LL_VERBOSE,
739 "Error accepting cluster node connection: %s", connGetLastError(conn));
740 connClose(conn);
741 return;
742 }
743
744 /* Create a link object we use to handle the connection.
745 * It gets passed to the readable handler when data is available.
746 * Initially the link->node pointer is set to NULL as we don't know
747 * which node it is, but the right node is referenced once we know the
748 * node identity. */
749 link = createClusterLink(NULL);
750 link->conn = conn;
751 connSetPrivateData(conn, link);
752
753 /* Register read handler */
754 connSetReadHandler(conn, clusterReadHandler);
755 }
756
757 #define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
758 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
759 int cport, cfd;
760 int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
761 char cip[NET_IP_STR_LEN];
762 UNUSED(el);
763 UNUSED(mask);
764 UNUSED(privdata);
765
766 /* If the server is starting up, don't accept cluster connections:
767 * UPDATE messages may interact with the database content. */
768 if (server.masterhost == NULL && server.loading) return;
769
770 while(max--) {
771 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
772 if (cfd == ANET_ERR) {
773 if (errno != EWOULDBLOCK)
774 serverLog(LL_VERBOSE,
775 "Error accepting cluster node: %s", server.neterr);
776 return;
777 }
778
779 connection *conn = server.tls_cluster ?
780 connCreateAcceptedTLS(cfd, TLS_CLIENT_AUTH_YES) : connCreateAcceptedSocket(cfd);
781
782 /* Make sure connection is not in an error state */
783 if (connGetState(conn) != CONN_STATE_ACCEPTING) {
784 serverLog(LL_VERBOSE,
785 "Error creating an accepting connection for cluster node: %s",
786 connGetLastError(conn));
787 connClose(conn);
788 return;
789 }
790 connEnableTcpNoDelay(conn);
791 connKeepAlive(conn,server.cluster_node_timeout * 2);
792
793 /* Use non-blocking I/O for cluster messages. */
794 serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);
795
796 /* Accept the connection now. connAccept() may call our handler directly
797 * or schedule it for later depending on connection implementation.
798 */
799 if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) {
800 if (connGetState(conn) == CONN_STATE_ERROR)
801 serverLog(LL_VERBOSE,
802 "Error accepting cluster node connection: %s",
803 connGetLastError(conn));
804 connClose(conn);
805 return;
806 }
807 }
808 }
809
810 /* Return the approximate number of sockets we are using for the
811 * cluster bus connections. */
812 unsigned long getClusterConnectionsCount(void) {
813 /* We decrement the number of nodes by one, since there is the
814 * "myself" node too in the list. Each node uses two file descriptors,
815 * one incoming and one outgoing, thus the multiplication by 2. */
816 return server.cluster_enabled ?
817 ((dictSize(server.cluster->nodes)-1)*2) : 0;
818 }
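/* For example, in a six node cluster every node estimates (6-1)*2 = 10
 * cluster bus sockets: one inbound and one outbound link toward each of the
 * other five nodes. */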
819
820 /* -----------------------------------------------------------------------------
821 * Key space handling
822 * -------------------------------------------------------------------------- */
823
824 /* We have 16384 hash slots. The hash slot of a given key is obtained
825 * as the least significant 14 bits of the crc16 of the key.
826 *
827 * However if the key contains the {...} pattern, only the part between
828 * { and } is hashed. This may be useful in the future to force certain
829 * keys to be in the same node (assuming no resharding is in progress). */
830 unsigned int keyHashSlot(char *key, int keylen) {
831 int s, e; /* start-end indexes of { and } */
832
833 for (s = 0; s < keylen; s++)
834 if (key[s] == '{') break;
835
836 /* No '{' ? Hash the whole key. This is the base case. */
837 if (s == keylen) return crc16(key,keylen) & 0x3FFF;
838
839 /* '{' found? Check if we have the corresponding '}'. */
840 for (e = s+1; e < keylen; e++)
841 if (key[e] == '}') break;
842
843 /* No '}' or nothing between {} ? Hash the whole key. */
844 if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
845
846 /* If we are here there is both a { and a } on its right. Hash
847 * what is in the middle between { and }. */
848 return crc16(key+s+1,e-s-1) & 0x3FFF;
849 }
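/* Hash tag examples (illustrative): only the text between the first '{' and
 * the following '}' is hashed, so these two keys map to the same slot:
 *
 *   keyHashSlot("{user1000}.following", 20);  // crc16("user1000") & 0x3FFF
 *   keyHashSlot("{user1000}.followers", 20);  // crc16("user1000") & 0x3FFF
 *
 * while a key like "foo{}{bar}" is hashed as a whole, since the first '{'
 * is immediately followed by '}' (the e == s+1 case above). */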
850
851 /* -----------------------------------------------------------------------------
852 * CLUSTER node API
853 * -------------------------------------------------------------------------- */
854
855 /* Create a new cluster node, with the specified flags.
856 * If "nodename" is NULL this is considered a first handshake and a random
857 * node name is assigned to this node (it will be fixed later when we'll
858 * receive the first pong).
859 *
860 * The node is created and returned to the user, but it is not automatically
861 * added to the nodes hash table. */
862 clusterNode *createClusterNode(char *nodename, int flags) {
863 clusterNode *node = zmalloc(sizeof(*node));
864
865 if (nodename)
866 memcpy(node->name, nodename, CLUSTER_NAMELEN);
867 else
868 getRandomHexChars(node->name, CLUSTER_NAMELEN);
869 node->ctime = mstime();
870 node->configEpoch = 0;
871 node->flags = flags;
872 memset(node->slots,0,sizeof(node->slots));
873 node->slots_info = NULL;
874 node->numslots = 0;
875 node->numslaves = 0;
876 node->slaves = NULL;
877 node->slaveof = NULL;
878 node->ping_sent = node->pong_received = 0;
879 node->data_received = 0;
880 node->fail_time = 0;
881 node->link = NULL;
882 memset(node->ip,0,sizeof(node->ip));
883 node->port = 0;
884 node->cport = 0;
885 node->pport = 0;
886 node->fail_reports = listCreate();
887 node->voted_time = 0;
888 node->orphaned_time = 0;
889 node->repl_offset_time = 0;
890 node->repl_offset = 0;
891 listSetFreeMethod(node->fail_reports,zfree);
892 return node;
893 }
894
895 /* This function is called every time we get a failure report from a node.
896 * The side effect is to populate the fail_reports list (or to update
897 * the timestamp of an existing report).
898 *
899 * 'failing' is the node that is in failure state according to the
900 * 'sender' node.
901 *
902 * The function returns 0 if it just updates a timestamp of an existing
903 * failure report from the same sender. 1 is returned if a new failure
904 * report is created. */
905 int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
906 list *l = failing->fail_reports;
907 listNode *ln;
908 listIter li;
909 clusterNodeFailReport *fr;
910
911 /* If a failure report from the same sender already exists, just update
912 * the timestamp. */
913 listRewind(l,&li);
914 while ((ln = listNext(&li)) != NULL) {
915 fr = ln->value;
916 if (fr->node == sender) {
917 fr->time = mstime();
918 return 0;
919 }
920 }
921
922 /* Otherwise create a new report. */
923 fr = zmalloc(sizeof(*fr));
924 fr->node = sender;
925 fr->time = mstime();
926 listAddNodeTail(l,fr);
927 return 1;
928 }
929
930 /* Remove failure reports that are too old, where too old means reasonably
931 * older than the global node timeout. Note that, in any case, for a node to
932 * be flagged as FAIL we need to have a local PFAIL state that is at least as
933 * old as the global node timeout, so we don't just trust the number
934 * of failure reports from other nodes. */
935 void clusterNodeCleanupFailureReports(clusterNode *node) {
936 list *l = node->fail_reports;
937 listNode *ln;
938 listIter li;
939 clusterNodeFailReport *fr;
940 mstime_t maxtime = server.cluster_node_timeout *
941 CLUSTER_FAIL_REPORT_VALIDITY_MULT;
942 mstime_t now = mstime();
943
944 listRewind(l,&li);
945 while ((ln = listNext(&li)) != NULL) {
946 fr = ln->value;
947 if (now - fr->time > maxtime) listDelNode(l,ln);
948 }
949 }
950
951 /* Remove the failing report for 'node' if it was previously considered
952 * failing by 'sender'. This function is called when a node informs us via
953 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
954 *
955 * Note that this function is called relatively often as it gets called even
956 * when there are no nodes failing, and is O(N); however, when the cluster is
957 * fine the failure reports list is empty, so the function runs in constant
958 * time.
959 *
960 * The function returns 1 if the failure report was found and removed.
961 * Otherwise 0 is returned. */
962 int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
963 list *l = node->fail_reports;
964 listNode *ln;
965 listIter li;
966 clusterNodeFailReport *fr;
967
968 /* Search for a failure report from this sender. */
969 listRewind(l,&li);
970 while ((ln = listNext(&li)) != NULL) {
971 fr = ln->value;
972 if (fr->node == sender) break;
973 }
974 if (!ln) return 0; /* No failure report from this sender. */
975
976 /* Remove the failure report. */
977 listDelNode(l,ln);
978 clusterNodeCleanupFailureReports(node);
979 return 1;
980 }
981
982 /* Return the number of external nodes that believe 'node' is failing,
983 * not including this node, which may have a PFAIL or FAIL state for this
984 * node as well. */
985 int clusterNodeFailureReportsCount(clusterNode *node) {
986 clusterNodeCleanupFailureReports(node);
987 return listLength(node->fail_reports);
988 }
989
990 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
991 int j;
992
993 for (j = 0; j < master->numslaves; j++) {
994 if (master->slaves[j] == slave) {
995 if ((j+1) < master->numslaves) {
996 int remaining_slaves = (master->numslaves - j) - 1;
997 memmove(master->slaves+j,master->slaves+(j+1),
998 (sizeof(*master->slaves) * remaining_slaves));
999 }
1000 master->numslaves--;
1001 if (master->numslaves == 0)
1002 master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
1003 return C_OK;
1004 }
1005 }
1006 return C_ERR;
1007 }
1008
1009 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
1010 int j;
1011
1012 /* If it's already a slave, don't add it again. */
1013 for (j = 0; j < master->numslaves; j++)
1014 if (master->slaves[j] == slave) return C_ERR;
1015 master->slaves = zrealloc(master->slaves,
1016 sizeof(clusterNode*)*(master->numslaves+1));
1017 master->slaves[master->numslaves] = slave;
1018 master->numslaves++;
1019 master->flags |= CLUSTER_NODE_MIGRATE_TO;
1020 return C_OK;
1021 }
1022
1023 int clusterCountNonFailingSlaves(clusterNode *n) {
1024 int j, okslaves = 0;
1025
1026 for (j = 0; j < n->numslaves; j++)
1027 if (!nodeFailed(n->slaves[j])) okslaves++;
1028 return okslaves;
1029 }
1030
1031 /* Low level cleanup of the node structure. Only called by clusterDelNode(). */
1032 void freeClusterNode(clusterNode *n) {
1033 sds nodename;
1034 int j;
1035
1036 /* If the node has associated slaves, we have to set
1037 * all the slaves->slaveof fields to NULL (unknown). */
1038 for (j = 0; j < n->numslaves; j++)
1039 n->slaves[j]->slaveof = NULL;
1040
1041 /* Remove this node from the list of slaves of its master. */
1042 if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
1043
1044 /* Unlink from the set of nodes. */
1045 nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
1046 serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
1047 sdsfree(nodename);
1048
1049 /* Release link and associated data structures. */
1050 if (n->link) freeClusterLink(n->link);
1051 listRelease(n->fail_reports);
1052 zfree(n->slaves);
1053 zfree(n);
1054 }
1055
1056 /* Add a node to the nodes hash table */
1057 void clusterAddNode(clusterNode *node) {
1058 int retval;
1059
1060 retval = dictAdd(server.cluster->nodes,
1061 sdsnewlen(node->name,CLUSTER_NAMELEN), node);
1062 serverAssert(retval == DICT_OK);
1063 }
1064
1065 /* Remove a node from the cluster. The function performs the high level
1066 * cleanup, calling freeClusterNode() for the low level cleanup.
1067 * Here we do the following:
1068 *
1069 * 1) Mark all the slots handled by it as unassigned.
1070 * 2) Remove all the failure reports sent by this node and referenced by
1071 * other nodes.
1072 * 3) Free the node with freeClusterNode() that will in turn remove it
1073 * from the hash table and from the list of slaves of its master, if
1074 * it is a slave node.
1075 */
1076 void clusterDelNode(clusterNode *delnode) {
1077 int j;
1078 dictIterator *di;
1079 dictEntry *de;
1080
1081 /* 1) Mark slots as unassigned. */
1082 for (j = 0; j < CLUSTER_SLOTS; j++) {
1083 if (server.cluster->importing_slots_from[j] == delnode)
1084 server.cluster->importing_slots_from[j] = NULL;
1085 if (server.cluster->migrating_slots_to[j] == delnode)
1086 server.cluster->migrating_slots_to[j] = NULL;
1087 if (server.cluster->slots[j] == delnode)
1088 clusterDelSlot(j);
1089 }
1090
1091 /* 2) Remove failure reports. */
1092 di = dictGetSafeIterator(server.cluster->nodes);
1093 while((de = dictNext(di)) != NULL) {
1094 clusterNode *node = dictGetVal(de);
1095
1096 if (node == delnode) continue;
1097 clusterNodeDelFailureReport(node,delnode);
1098 }
1099 dictReleaseIterator(di);
1100
1101 /* 3) Free the node, unlinking it from the cluster. */
1102 freeClusterNode(delnode);
1103 }
1104
1105 /* Node lookup by name */
1106 clusterNode *clusterLookupNode(const char *name) {
1107 sds s = sdsnewlen(name, CLUSTER_NAMELEN);
1108 dictEntry *de;
1109
1110 de = dictFind(server.cluster->nodes,s);
1111 sdsfree(s);
1112 if (de == NULL) return NULL;
1113 return dictGetVal(de);
1114 }
1115
1116 /* This is only used after the handshake. When we connect a given IP/PORT
1117 * as a result of CLUSTER MEET we don't have the node name yet, so we
1118 * pick a random one, and will fix it when we receive the first PONG, using
1119 * this function. */
1120 void clusterRenameNode(clusterNode *node, char *newname) {
1121 int retval;
1122 sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);
1123
1124 serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
1125 node->name, newname);
1126 retval = dictDelete(server.cluster->nodes, s);
1127 sdsfree(s);
1128 serverAssert(retval == DICT_OK);
1129 memcpy(node->name, newname, CLUSTER_NAMELEN);
1130 clusterAddNode(node);
1131 }
1132
1133 /* -----------------------------------------------------------------------------
1134 * CLUSTER config epoch handling
1135 * -------------------------------------------------------------------------- */
1136
1137 /* Return the greatest configEpoch found in the cluster, or the current
1138 * epoch if greater than any node configEpoch. */
1139 uint64_t clusterGetMaxEpoch(void) {
1140 uint64_t max = 0;
1141 dictIterator *di;
1142 dictEntry *de;
1143
1144 di = dictGetSafeIterator(server.cluster->nodes);
1145 while((de = dictNext(di)) != NULL) {
1146 clusterNode *node = dictGetVal(de);
1147 if (node->configEpoch > max) max = node->configEpoch;
1148 }
1149 dictReleaseIterator(di);
1150 if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
1151 return max;
1152 }
1153
1154 /* If this node epoch is zero or is not already the greatest across the
1155 * cluster (from the POV of the local configuration), this function will:
1156 *
1157 * 1) Generate a new config epoch, incrementing the current epoch.
1158 * 2) Assign the new epoch to this node, WITHOUT any consensus.
1159 * 3) Persist the configuration on disk before sending packets with the
1160 * new configuration.
1161 *
1162 * If the new config epoch is generated and assigned, C_OK is returned,
1163 * otherwise C_ERR is returned (since the node already has the greatest
1164 * configuration around) and no operation is performed.
1165 *
1166 * Important note: this function violates the principle that config epochs
1167 * should be generated with consensus and should be unique across the cluster.
1168 * However Redis Cluster uses these auto-generated new config epochs in two
1169 * cases:
1170 *
1171 * 1) When slots are closed after importing. Otherwise resharding would be
1172 * too expensive.
1173 * 2) When CLUSTER FAILOVER is called with options that force a slave to
1174 * failover its master even if there is no master majority able to
1175 * create a new configuration epoch.
1176 *
1177 * Redis Cluster will not explode using this function, even in the case of
1178 * a collision between this node and another node, generating the same
1179 * configuration epoch unilaterally, because the config epoch conflict
1180 * resolution algorithm will eventually move colliding nodes to different
1181 * config epochs. However using this function may violate the "last failover
1182 * wins" rule, so should only be used with care. */
1183 int clusterBumpConfigEpochWithoutConsensus(void) {
1184 uint64_t maxEpoch = clusterGetMaxEpoch();
1185
1186 if (myself->configEpoch == 0 ||
1187 myself->configEpoch != maxEpoch)
1188 {
1189 server.cluster->currentEpoch++;
1190 myself->configEpoch = server.cluster->currentEpoch;
1191 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1192 CLUSTER_TODO_FSYNC_CONFIG);
1193 serverLog(LL_WARNING,
1194 "New configEpoch set to %llu",
1195 (unsigned long long) myself->configEpoch);
1196 return C_OK;
1197 } else {
1198 return C_ERR;
1199 }
1200 }
1201
1202 /* This function is called when this node is a master, and we receive from
1203 * another master a configuration epoch that is equal to our configuration
1204 * epoch.
1205 *
1206 * BACKGROUND
1207 *
1208 * It is not possible that different slaves get the same config
1209 * epoch during a failover election, because the slaves need to get voted
1210 * by a majority. However when we perform a manual resharding of the cluster
1211 * the node will assign a configuration epoch to itself without asking
1212 * for agreement. Usually resharding happens when the cluster is working well
1213 * and is supervised by the sysadmin, however it is possible for a failover
1214 * to happen exactly while the node we are resharding a slot to assigns itself
1215 * a new configuration epoch, but before it is able to propagate it.
1216 *
1217 * So technically it is possible in this condition that two nodes end with
1218 * the same configuration epoch.
1219 *
1220 * Another possibility is that there are bugs in the implementation causing
1221 * this to happen.
1222 *
1223 * Moreover when a new cluster is created, all the nodes start with the same
1224 * configEpoch. This collision resolution code allows nodes to automatically
1225 * end with a different configEpoch at startup.
1226 *
1227 * In all the cases, we want a mechanism that resolves this issue automatically
1228 * as a safeguard. The same configuration epoch for masters serving different
1229 * sets of slots is not harmful, but it is if the nodes end up serving the same
1230 * slots for some reason (manual errors or software bugs) without a proper
1231 * failover procedure.
1232 *
1233 * In general we want a system that eventually always ends with different
1234 * masters having different configuration epochs whatever happened, since
1235 * nothing is worse than a split-brain condition in a distributed system.
1236 *
1237 * BEHAVIOR
1238 *
1239 * When this function gets called, what happens is that if this node
1240 * has the lexicographically smaller Node ID compared to the other node
1241 * with the conflicting epoch (the 'sender' node), it will assign itself
1242 * the greatest configuration epoch currently detected among nodes plus 1.
1243 *
1244 * This means that even if there are multiple nodes colliding, the node
1245 * with the greatest Node ID never moves forward, so eventually all the nodes
1246 * end with a different configuration epoch.
1247 */
1248 void clusterHandleConfigEpochCollision(clusterNode *sender) {
1249 /* Prerequisites: nodes have the same configEpoch and are both masters. */
1250 if (sender->configEpoch != myself->configEpoch ||
1251 !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
1252 /* Don't act if the colliding node has a smaller Node ID. */
1253 if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
1254 /* Get the next ID available at the best of this node knowledge. */
1255 server.cluster->currentEpoch++;
1256 myself->configEpoch = server.cluster->currentEpoch;
1257 clusterSaveConfigOrDie(1);
1258 serverLog(LL_VERBOSE,
1259 "WARNING: configEpoch collision with node %.40s."
1260 " configEpoch set to %llu",
1261 sender->name,
1262 (unsigned long long) myself->configEpoch);
1263 }
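/* Worked example (IDs and epochs invented): masters A (ID "aaa...") and
 * B (ID "bbb...") both end up with configEpoch 5 while currentEpoch is 5.
 * When A processes B's packet the sender ID compares greater, so A bumps
 * currentEpoch to 6 and takes configEpoch 6. When B processes A's packet
 * the sender ID compares smaller, so B returns early. The collision is
 * resolved with A=6, B=5. */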
1264
1265 /* -----------------------------------------------------------------------------
1266 * CLUSTER nodes blacklist
1267 *
1268 * The nodes blacklist is just a way to ensure that a given node with a given
1269 * Node ID is not re-added before some time elapsed (this time is specified
1270 * in seconds in CLUSTER_BLACKLIST_TTL).
1271 *
1272 * This is useful when we want to remove a node from the cluster completely:
1273 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
1274 * that even if we receive gossip messages from other nodes that still remember
1275 * about the node we want to remove, we don't re-add it before some time.
1276 *
1277 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
1278 * that redis-cli has 60 seconds to send CLUSTER FORGET messages to nodes
1279 * in the cluster without dealing with the problem of other nodes re-adding
1280 * back the node to nodes we already sent the FORGET command to.
1281 *
1282 * The data structure used is a hash table with an sds string representing
1283 * the node ID as key, and the time when it is ok to re-add the node as
1284 * value.
1285 * -------------------------------------------------------------------------- */
1286
1287 #define CLUSTER_BLACKLIST_TTL 60 /* 1 minute. */
1288
1289
1290 /* Before the addNode() or Exists() operations we always remove expired
1291 * entries from the black list. This is an O(N) operation but it is not a
1292 * problem since add / exists operations are called very infrequently and
1293 * the hash table is supposed to contain very few elements at most.
1294 * However without the cleanup during long uptime and with some automated
1295 * node add/removal procedures, entries could accumulate. */
1296 void clusterBlacklistCleanup(void) {
1297 dictIterator *di;
1298 dictEntry *de;
1299
1300 di = dictGetSafeIterator(server.cluster->nodes_black_list);
1301 while((de = dictNext(di)) != NULL) {
1302 int64_t expire = dictGetUnsignedIntegerVal(de);
1303
1304 if (expire < server.unixtime)
1305 dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1306 }
1307 dictReleaseIterator(di);
1308 }
1309
1310 /* Cleanup the blacklist and add a new node ID to the black list. */
1311 void clusterBlacklistAddNode(clusterNode *node) {
1312 dictEntry *de;
1313 sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1314
1315 clusterBlacklistCleanup();
1316 if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
1317 /* If the key was added, duplicate the sds string representation of
1318 * the key for the next lookup. We'll free it at the end. */
1319 id = sdsdup(id);
1320 }
1321 de = dictFind(server.cluster->nodes_black_list,id);
1322 dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
1323 sdsfree(id);
1324 }
1325
1326 /* Return non-zero if the specified node ID exists in the blacklist.
1327 * You don't need to pass an sds string here, any pointer to 40 bytes
1328 * will work. */
1329 int clusterBlacklistExists(char *nodeid) {
1330 sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
1331 int retval;
1332
1333 clusterBlacklistCleanup();
1334 retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
1335 sdsfree(id);
1336 return retval;
1337 }
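/* Sketch of how these helpers fit together (hedged: the actual caller is
 * the CLUSTER FORGET handling, outside this section):
 *
 *   clusterBlacklistAddNode(n);   // remember this ID for CLUSTER_BLACKLIST_TTL seconds
 *   clusterDelNode(n);            // drop it from our view of the cluster
 *
 * while clusterBlacklistExists(id) is later consulted before re-adding a
 * node learned about via gossip. */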
1338
1339 /* -----------------------------------------------------------------------------
1340 * CLUSTER messages exchange - PING/PONG and gossip
1341 * -------------------------------------------------------------------------- */
1342
1343 /* This function checks if a given node should be marked as FAIL.
1344 * It happens if the following conditions are met:
1345 *
1346 * 1) We received enough failure reports from other master nodes via gossip.
1347 * Enough means that the majority of the masters signaled the node is
1348 * down recently.
1349 * 2) We believe this node is in PFAIL state.
1350 *
1351 * If a failure is detected we also inform the whole cluster about this
1352 * event trying to force every other node to set the FAIL flag for the node.
1353 *
1354 * Note that the form of agreement used here is weak, as we collect the state
1355 * of a majority of masters over a window of time, and even if we force agreement by
1356 * propagating the FAIL message, because of partitions we may not reach every
1357 * node. However:
1358 *
1359 * 1) Either we reach the majority and eventually the FAIL state will propagate
1360 * to all the cluster.
1361 * 2) Or there is no majority so no slave promotion will be authorized and the
1362 * FAIL flag will be cleared after some time.
1363 */
1364 void markNodeAsFailingIfNeeded(clusterNode *node) {
1365 int failures;
1366 int needed_quorum = (server.cluster->size / 2) + 1;
1367
1368 if (!nodeTimedOut(node)) return; /* We can reach it. */
1369 if (nodeFailed(node)) return; /* Already FAILing. */
1370
1371 failures = clusterNodeFailureReportsCount(node);
1372 /* Also count myself as a voter if I'm a master. */
1373 if (nodeIsMaster(myself)) failures++;
1374 if (failures < needed_quorum) return; /* No weak agreement from masters. */
1375
1376 serverLog(LL_NOTICE,
1377 "Marking node %.40s as failing (quorum reached).", node->name);
1378
1379 /* Mark the node as failing. */
1380 node->flags &= ~CLUSTER_NODE_PFAIL;
1381 node->flags |= CLUSTER_NODE_FAIL;
1382 node->fail_time = mstime();
1383
1384 /* Broadcast the failing node name to everybody, forcing all the other
1385 * reachable nodes to flag the node as FAIL.
1386 * We do that even if this node is a replica and not a master: anyway
1387 * the failing state is triggered collecting failure reports from masters,
1388 * so here the replica is only helping propagating this status. */
1389 clusterSendFail(node->name);
1390 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1391 }
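/* Quorum example (numbers invented): with server.cluster->size == 5 masters,
 * needed_quorum is 5/2+1 = 3. If we are a master and already see the node in
 * PFAIL state, two failure reports from other masters are enough
 * (2 reports + ourselves = 3) to promote the flag to FAIL and broadcast it. */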
1392
1393 /* This function is called only if a node is marked as FAIL, but we are able
1394 * to reach it again. It checks if the conditions exist to undo the FAIL
1395 * state. */
1396 void clearNodeFailureIfNeeded(clusterNode *node) {
1397 mstime_t now = mstime();
1398
1399 serverAssert(nodeFailed(node));
1400
1401 /* For slaves we always clear the FAIL flag if we can contact the
1402 * node again. */
1403 if (nodeIsSlave(node) || node->numslots == 0) {
1404 serverLog(LL_NOTICE,
1405 "Clear FAIL state for node %.40s: %s is reachable again.",
1406 node->name,
1407 nodeIsSlave(node) ? "replica" : "master without slots");
1408 node->flags &= ~CLUSTER_NODE_FAIL;
1409 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1410 }
1411
1412 /* If it is a master and...
1413 * 1) The FAIL state is old enough.
1414 * 2) It is still serving slots from our point of view (not failed over).
1415 * Apparently nobody is going to fix these slots, so clear the FAIL flag. */
1416 if (nodeIsMaster(node) && node->numslots > 0 &&
1417 (now - node->fail_time) >
1418 (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1419 {
1420 serverLog(LL_NOTICE,
1421 "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
1422 node->name);
1423 node->flags &= ~CLUSTER_NODE_FAIL;
1424 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1425 }
1426 }
1427
1428 /* Return true if we already have a node in HANDSHAKE state matching the
1429 * specified ip address and port number. This function is used in order to
1430 * avoid adding a new handshake node for the same address multiple times. */
1431 int clusterHandshakeInProgress(char *ip, int port, int cport) {
1432 dictIterator *di;
1433 dictEntry *de;
1434
1435 di = dictGetSafeIterator(server.cluster->nodes);
1436 while((de = dictNext(di)) != NULL) {
1437 clusterNode *node = dictGetVal(de);
1438
1439 if (!nodeInHandshake(node)) continue;
1440 if (!strcasecmp(node->ip,ip) &&
1441 node->port == port &&
1442 node->cport == cport) break;
1443 }
1444 dictReleaseIterator(di);
1445 return de != NULL;
1446 }
1447
1448 /* Start a handshake with the specified address if there is not one
1449 * already in progress. Returns non-zero if the handshake was actually
1450 * started. On error zero is returned and errno is set to one of the
1451 * following values:
1452 *
1453 * EAGAIN - There is already a handshake in progress for this address.
1454 * EINVAL - IP or port are not valid. */
1455 int clusterStartHandshake(char *ip, int port, int cport) {
1456 clusterNode *n;
1457 char norm_ip[NET_IP_STR_LEN];
1458 struct sockaddr_storage sa;
1459
1460 /* IP sanity check */
1461 if (inet_pton(AF_INET,ip,
1462 &(((struct sockaddr_in *)&sa)->sin_addr)))
1463 {
1464 sa.ss_family = AF_INET;
1465 } else if (inet_pton(AF_INET6,ip,
1466 &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
1467 {
1468 sa.ss_family = AF_INET6;
1469 } else {
1470 errno = EINVAL;
1471 return 0;
1472 }
1473
1474 /* Port sanity check */
1475 if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
1476 errno = EINVAL;
1477 return 0;
1478 }
1479
1480 /* Set norm_ip as the normalized string representation of the node
1481 * IP address. */
1482 memset(norm_ip,0,NET_IP_STR_LEN);
1483 if (sa.ss_family == AF_INET)
1484 inet_ntop(AF_INET,
1485 (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
1486 norm_ip,NET_IP_STR_LEN);
1487 else
1488 inet_ntop(AF_INET6,
1489 (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
1490 norm_ip,NET_IP_STR_LEN);
1491
1492 if (clusterHandshakeInProgress(norm_ip,port,cport)) {
1493 errno = EAGAIN;
1494 return 0;
1495 }
1496
1497     /* Add the node with a random name (NULL as first argument to
1498 * createClusterNode()). Everything will be fixed during the
1499 * handshake. */
1500 n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
1501 memcpy(n->ip,norm_ip,sizeof(n->ip));
1502 n->port = port;
1503 n->cport = cport;
1504 clusterAddNode(n);
1505 return 1;
1506 }
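/* Illustrative usage, an assumption about the command layer rather than a
 * verbatim quote of it: the CLUSTER MEET command is expected to end up in a
 * call shaped like
 *
 *   clusterStartHandshake(ip, port, cport);
 *
 * where 'cport' is the cluster bus port, conventionally the base port plus
 * 10000 when it is not specified explicitly. */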
1507
1508 /* Process the gossip section of PING or PONG packets.
1509  * Note that this function assumes that the packet is already sanity-checked
1510  * by the caller for its length, though not for the content of the gossip
1511  * section. */
1512 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
1513 uint16_t count = ntohs(hdr->count);
1514 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
1515 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
1516
1517 while(count--) {
1518 uint16_t flags = ntohs(g->flags);
1519 clusterNode *node;
1520 sds ci;
1521
1522 if (server.verbosity == LL_DEBUG) {
1523 ci = representClusterNodeFlags(sdsempty(), flags);
1524 serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
1525 g->nodename,
1526 g->ip,
1527 ntohs(g->port),
1528 ntohs(g->cport),
1529 ci);
1530 sdsfree(ci);
1531 }
1532
1533         /* Update our state according to the gossip sections */
1534 node = clusterLookupNode(g->nodename);
1535 if (node) {
1536 /* We already know this node.
1537 Handle failure reports, only when the sender is a master. */
1538 if (sender && nodeIsMaster(sender) && node != myself) {
1539 if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
1540 if (clusterNodeAddFailureReport(node,sender)) {
1541 serverLog(LL_VERBOSE,
1542 "Node %.40s reported node %.40s as not reachable.",
1543 sender->name, node->name);
1544 }
1545 markNodeAsFailingIfNeeded(node);
1546 } else {
1547 if (clusterNodeDelFailureReport(node,sender)) {
1548 serverLog(LL_VERBOSE,
1549 "Node %.40s reported node %.40s is back online.",
1550 sender->name, node->name);
1551 }
1552 }
1553 }
1554
1555 /* If from our POV the node is up (no failure flags are set),
1556              * we have no pending ping for the node, and we have no failure
1557 * reports for this node, update the last pong time with the
1558 * one we see from the other nodes. */
1559 if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1560 node->ping_sent == 0 &&
1561 clusterNodeFailureReportsCount(node) == 0)
1562 {
1563 mstime_t pongtime = ntohl(g->pong_received);
1564 pongtime *= 1000; /* Convert back to milliseconds. */
1565
1566 /* Replace the pong time with the received one only if
1567 * it's greater than our view but is not in the future
1568 * (with 500 milliseconds tolerance) from the POV of our
1569 * clock. */
1570 if (pongtime <= (server.mstime+500) &&
1571 pongtime > node->pong_received)
1572 {
1573 node->pong_received = pongtime;
1574 }
1575 }
1576
1577 /* If we already know this node, but it is not reachable, and
1578 * we see a different address in the gossip section of a node that
1579 * can talk with this other node, update the address, disconnect
1580 * the old link if any, so that we'll attempt to connect with the
1581 * new address. */
1582 if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
1583 !(flags & CLUSTER_NODE_NOADDR) &&
1584 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1585 (strcasecmp(node->ip,g->ip) ||
1586 node->port != ntohs(g->port) ||
1587 node->cport != ntohs(g->cport)))
1588 {
1589 if (node->link) freeClusterLink(node->link);
1590 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1591 node->port = ntohs(g->port);
1592 node->pport = ntohs(g->pport);
1593 node->cport = ntohs(g->cport);
1594 node->flags &= ~CLUSTER_NODE_NOADDR;
1595 }
1596 } else {
1597 /* If it's not in NOADDR state and we don't have it, we
1598              * add it to our trusted dict with the exact node ID and flags.
1599              * Note that we cannot simply start a handshake against
1600              * this IP/PORT pair, since the IP/PORT may already have been
1601              * reused, and we would risk joining another cluster.
1602 *
1603 * Note that we require that the sender of this gossip message
1604 * is a well known node in our cluster, otherwise we risk
1605 * joining another cluster. */
1606 if (sender &&
1607 !(flags & CLUSTER_NODE_NOADDR) &&
1608 !clusterBlacklistExists(g->nodename))
1609 {
1610 clusterNode *node;
1611 node = createClusterNode(g->nodename, flags);
1612 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1613 node->port = ntohs(g->port);
1614 node->pport = ntohs(g->pport);
1615 node->cport = ntohs(g->cport);
1616 clusterAddNode(node);
1617 }
1618 }
1619
1620 /* Next node */
1621 g++;
1622 }
1623 }
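/* Note on the pongtime handling above: gossip entries carry ping_sent and
 * pong_received in seconds (see clusterSetGossipEntry() below, which divides
 * the millisecond timestamps by 1000 before htonl()), which is why the
 * receiver multiplies the value by 1000 before comparing it with
 * server.mstime. */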
1624
1625 /* IP -> string conversion. 'buf' is supposed to be at least 46 bytes.
1626 * If 'announced_ip' length is non-zero, it is used instead of extracting
1627 * the IP from the socket peer address. */
1628 void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
1629 if (announced_ip[0] != '\0') {
1630 memcpy(buf,announced_ip,NET_IP_STR_LEN);
1631 buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
1632 } else {
1633 connPeerToString(link->conn, buf, NET_IP_STR_LEN, NULL);
1634 }
1635 }
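/* The 46 bytes required for 'buf' are enough to hold the longest textual
 * form of an IPv6 address plus the terminating null byte (the same value as
 * the standard INET6_ADDRSTRLEN), so the same buffer works for IPv4 and
 * IPv6 peers alike. */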
1636
1637 /* Update the node address to the IP address that can be extracted
1638 * from link->fd, or if hdr->myip is non empty, to the address the node
1639  * is announcing to us. The port is taken from the packet header as well.
1640 *
1641 * If the address or port changed, disconnect the node link so that we'll
1642 * connect again to the new address.
1643 *
1644  * If the ip/port pair is already correct no operation is performed at
1645 * all.
1646 *
1647 * The function returns 0 if the node address is still the same,
1648 * otherwise 1 is returned. */
1649 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
1650 clusterMsg *hdr)
1651 {
1652 char ip[NET_IP_STR_LEN] = {0};
1653 int port = ntohs(hdr->port);
1654 int pport = ntohs(hdr->pport);
1655 int cport = ntohs(hdr->cport);
1656
1657 /* We don't proceed if the link is the same as the sender link, as this
1658 * function is designed to see if the node link is consistent with the
1659 * symmetric link that is used to receive PINGs from the node.
1660 *
1661 * As a side effect this function never frees the passed 'link', so
1662 * it is safe to call during packet processing. */
1663 if (link == node->link) return 0;
1664
1665 nodeIp2String(ip,link,hdr->myip);
1666 if (node->port == port && node->cport == cport && node->pport == pport &&
1667 strcmp(ip,node->ip) == 0) return 0;
1668
1669 /* IP / port is different, update it. */
1670 memcpy(node->ip,ip,sizeof(ip));
1671 node->port = port;
1672 node->pport = pport;
1673 node->cport = cport;
1674 if (node->link) freeClusterLink(node->link);
1675 node->flags &= ~CLUSTER_NODE_NOADDR;
1676 serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
1677 node->name, node->ip, node->port);
1678
1679 /* Check if this is our master and we have to change the
1680 * replication target as well. */
1681 if (nodeIsSlave(myself) && myself->slaveof == node)
1682 replicationSetMaster(node->ip, node->port);
1683 return 1;
1684 }
1685
1686 /* Reconfigure the specified node 'n' as a master. This function is called when
1687 * a node that we believed to be a slave is now acting as master in order to
1688 * update the state of the node. */
1689 void clusterSetNodeAsMaster(clusterNode *n) {
1690 if (nodeIsMaster(n)) return;
1691
1692 if (n->slaveof) {
1693 clusterNodeRemoveSlave(n->slaveof,n);
1694 if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1695 }
1696 n->flags &= ~CLUSTER_NODE_SLAVE;
1697 n->flags |= CLUSTER_NODE_MASTER;
1698 n->slaveof = NULL;
1699
1700 /* Update config and state. */
1701 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1702 CLUSTER_TODO_UPDATE_STATE);
1703 }
1704
1705 /* This function is called when we receive a master configuration via a
1706 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1707 * node, and the set of slots claimed under this configEpoch.
1708 *
1709 * What we do is to rebind the slots with newer configuration compared to our
1710 * local configuration, and if needed, we turn ourself into a replica of the
1711 * node (see the function comments for more info).
1712 *
1713 * The 'sender' is the node for which we received a configuration update.
1714 * Sometimes it is not actually the "Sender" of the information, like in the
1715 * case we receive the info via an UPDATE packet. */
1716 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1717 int j;
1718 clusterNode *curmaster = NULL, *newmaster = NULL;
1719     /* The dirty slots list is a list of slots for which we lose ownership
1720      * while still having keys inside. This usually happens after a failover
1721 * or after a manual cluster reconfiguration operated by the admin.
1722 *
1723 * If the update message is not able to demote a master to slave (in this
1724 * case we'll resync with the master updating the whole key space), we
1725      * need to delete all the keys in the slots we lost ownership of. */
1726 uint16_t dirty_slots[CLUSTER_SLOTS];
1727 int dirty_slots_count = 0;
1728
1729     /* We should detect whether the sender is the new master of our shard.
1730      * We will know it if all our slots were migrated to the sender, and the
1731      * sender has no slots other than ours */
1732 int sender_slots = 0;
1733 int migrated_our_slots = 0;
1734
1735 /* Here we set curmaster to this node or the node this node
1736 * replicates to if it's a slave. In the for loop we are
1737      * interested in checking if slots are taken away from curmaster. */
1738 curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1739
1740 if (sender == myself) {
1741 serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1742 return;
1743 }
1744
1745 for (j = 0; j < CLUSTER_SLOTS; j++) {
1746 if (bitmapTestBit(slots,j)) {
1747 sender_slots++;
1748
1749 /* The slot is already bound to the sender of this message. */
1750 if (server.cluster->slots[j] == sender) continue;
1751
1752 /* The slot is in importing state, it should be modified only
1753 * manually via redis-cli (example: a resharding is in progress
1754 * and the migrating side slot was already closed and is advertising
1755 * a new config. We still want the slot to be closed manually). */
1756 if (server.cluster->importing_slots_from[j]) continue;
1757
1758 /* We rebind the slot to the new node claiming it if:
1759 * 1) The slot was unassigned or the new node claims it with a
1760 * greater configEpoch.
1761 * 2) We are not currently importing the slot. */
1762 if (server.cluster->slots[j] == NULL ||
1763 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1764 {
1765 /* Was this slot mine, and still contains keys? Mark it as
1766 * a dirty slot. */
1767 if (server.cluster->slots[j] == myself &&
1768 countKeysInSlot(j) &&
1769 sender != myself)
1770 {
1771 dirty_slots[dirty_slots_count] = j;
1772 dirty_slots_count++;
1773 }
1774
1775 if (server.cluster->slots[j] == curmaster) {
1776 newmaster = sender;
1777 migrated_our_slots++;
1778 }
1779 clusterDelSlot(j);
1780 clusterAddSlot(sender,j);
1781 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1782 CLUSTER_TODO_UPDATE_STATE|
1783 CLUSTER_TODO_FSYNC_CONFIG);
1784 }
1785 }
1786 }
1787
1788 /* After updating the slots configuration, don't do any actual change
1789 * in the state of the server if a module disabled Redis Cluster
1790      * key redirections. */
1791 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
1792 return;
1793
1794 /* If at least one slot was reassigned from a node to another node
1795 * with a greater configEpoch, it is possible that:
1796 * 1) We are a master left without slots. This means that we were
1797 * failed over and we should turn into a replica of the new
1798 * master.
1799 * 2) We are a slave and our master is left without slots. We need
1800 * to replicate to the new slots owner. */
1801 if (newmaster && curmaster->numslots == 0 &&
1802 (server.cluster_allow_replica_migration ||
1803 sender_slots == migrated_our_slots)) {
1804 serverLog(LL_WARNING,
1805 "Configuration change detected. Reconfiguring myself "
1806 "as a replica of %.40s", sender->name);
1807 clusterSetMaster(sender);
1808 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1809 CLUSTER_TODO_UPDATE_STATE|
1810 CLUSTER_TODO_FSYNC_CONFIG);
1811 } else if (dirty_slots_count) {
1812 /* If we are here, we received an update message which removed
1813          * ownership of certain slots we still have keys for, but we are
1814          * still serving some slots, so this master node was not demoted to
1815 * a slave.
1816 *
1817 * In order to maintain a consistent state between keys and slots
1818 * we need to remove all the keys from the slots we lost. */
1819 for (j = 0; j < dirty_slots_count; j++)
1820 delKeysInSlot(dirty_slots[j]);
1821 }
1822 }
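/* Illustrative walk-through of the function above (the slot numbers are
 * made up): suppose we are a master serving slots 100-200 and we receive an
 * UPDATE claiming the same slots for node B with a greater configEpoch. For
 * every such slot the loop calls clusterDelSlot()/clusterAddSlot() to rebind
 * it to B, and since the slots were taken from curmaster (ourselves),
 * newmaster is set to B. Once the loop ends we are a master left with zero
 * slots, so we reconfigure ourselves as a replica of B via clusterSetMaster().
 * If instead we kept serving other slots, we are not demoted and the keys in
 * the slots we lost are removed through the dirty_slots list. */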
1823
1824 /* When this function is called, there is a packet to process starting
1825 * at link->rcvbuf. Releasing the buffer is up to the caller, so this
1826 * function should just handle the higher level stuff of processing the
1827 * packet, modifying the cluster state if needed.
1828 *
1829 * The function returns 1 if the link is still valid after the packet
1830 * was processed, otherwise 0 if the link was freed since the packet
1831  * processing led to some inconsistency error (for instance a PONG
1832 * received from the wrong sender ID). */
1833 int clusterProcessPacket(clusterLink *link) {
1834 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1835 uint32_t totlen = ntohl(hdr->totlen);
1836 uint16_t type = ntohs(hdr->type);
1837 mstime_t now = mstime();
1838
1839 if (type < CLUSTERMSG_TYPE_COUNT)
1840 server.cluster->stats_bus_messages_received[type]++;
1841 serverLog(LL_DEBUG,"--- Processing packet of type %s, %lu bytes",
1842 clusterGetMessageTypeString(type), (unsigned long) totlen);
1843
1844 /* Perform sanity checks */
1845 if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1846 if (totlen > link->rcvbuf_len) return 1;
1847
1848 if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1849 /* Can't handle messages of different versions. */
1850 return 1;
1851 }
1852
1853 uint16_t flags = ntohs(hdr->flags);
1854 uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1855 clusterNode *sender;
1856
1857 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1858 type == CLUSTERMSG_TYPE_MEET)
1859 {
1860 uint16_t count = ntohs(hdr->count);
1861 uint32_t explen; /* expected length of this packet */
1862
1863 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1864 explen += (sizeof(clusterMsgDataGossip)*count);
1865 if (totlen != explen) return 1;
1866 } else if (type == CLUSTERMSG_TYPE_FAIL) {
1867 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1868
1869 explen += sizeof(clusterMsgDataFail);
1870 if (totlen != explen) return 1;
1871 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1872 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1873
1874 explen += sizeof(clusterMsgDataPublish) -
1875 8 +
1876 ntohl(hdr->data.publish.msg.channel_len) +
1877 ntohl(hdr->data.publish.msg.message_len);
1878 if (totlen != explen) return 1;
1879 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1880 type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1881 type == CLUSTERMSG_TYPE_MFSTART)
1882 {
1883 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1884
1885 if (totlen != explen) return 1;
1886 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1887 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1888
1889 explen += sizeof(clusterMsgDataUpdate);
1890 if (totlen != explen) return 1;
1891 } else if (type == CLUSTERMSG_TYPE_MODULE) {
1892 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1893
1894 explen += sizeof(clusterMsgModule) -
1895 3 + ntohl(hdr->data.module.msg.len);
1896 if (totlen != explen) return 1;
1897 }
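    /* Note on the length math above: each message type has a fixed part,
     * sizeof(clusterMsg) minus the clusterMsgData union, plus a type
     * specific payload. For example a PING carrying count=3 gossip entries
     * must satisfy totlen == fixed_part + 3*sizeof(clusterMsgDataGossip).
     * The "-8" and "-3" corrections account for the small placeholder
     * 'bulk_data' arrays declared inside clusterMsgDataPublish and
     * clusterMsgModule, which are replaced by the real variable length
     * payload (see the sanitizer note before clusterSendPublish() below). */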
1898
1899 /* Check if the sender is a known node. Note that for incoming connections
1900 * we don't store link->node information, but resolve the node by the
1901 * ID in the header each time in the current implementation. */
1902 sender = clusterLookupNode(hdr->sender);
1903
1904 /* Update the last time we saw any data from this node. We
1905 * use this in order to avoid detecting a timeout from a node that
1906 * is just sending a lot of data in the cluster bus, for instance
1907 * because of Pub/Sub. */
1908 if (sender) sender->data_received = now;
1909
1910 if (sender && !nodeInHandshake(sender)) {
1911 /* Update our currentEpoch if we see a newer epoch in the cluster. */
1912 senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1913 senderConfigEpoch = ntohu64(hdr->configEpoch);
1914 if (senderCurrentEpoch > server.cluster->currentEpoch)
1915 server.cluster->currentEpoch = senderCurrentEpoch;
1916 /* Update the sender configEpoch if it is publishing a newer one. */
1917 if (senderConfigEpoch > sender->configEpoch) {
1918 sender->configEpoch = senderConfigEpoch;
1919 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1920 CLUSTER_TODO_FSYNC_CONFIG);
1921 }
1922 /* Update the replication offset info for this node. */
1923 sender->repl_offset = ntohu64(hdr->offset);
1924 sender->repl_offset_time = now;
1925 /* If we are a slave performing a manual failover and our master
1926 * sent its offset while already paused, populate the MF state. */
1927 if (server.cluster->mf_end &&
1928 nodeIsSlave(myself) &&
1929 myself->slaveof == sender &&
1930 hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1931 server.cluster->mf_master_offset == -1)
1932 {
1933 server.cluster->mf_master_offset = sender->repl_offset;
1934 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
1935 serverLog(LL_WARNING,
1936 "Received replication offset for paused "
1937 "master manual failover: %lld",
1938 server.cluster->mf_master_offset);
1939 }
1940 }
1941
1942 /* Initial processing of PING and MEET requests replying with a PONG. */
1943 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1944 /* We use incoming MEET messages in order to set the address
1945 * for 'myself', since only other cluster nodes will send us
1946 * MEET messages on handshakes, when the cluster joins, or
1947 * later if we changed address, and those nodes will use our
1948          * official address to connect to us. So obtaining this address
1949 * from the socket is a simple way to discover / update our own
1950 * address in the cluster without it being hardcoded in the config.
1951 *
1952 * However if we don't have an address at all, we update the address
1953 * even with a normal PING packet. If it's wrong it will be fixed
1954 * by MEET later. */
1955 if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
1956 server.cluster_announce_ip == NULL)
1957 {
1958 char ip[NET_IP_STR_LEN];
1959
1960 if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
1961 strcmp(ip,myself->ip))
1962 {
1963 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1964 serverLog(LL_WARNING,"IP address for this node updated to %s",
1965 myself->ip);
1966 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1967 }
1968 }
1969
1970 /* Add this node if it is new for us and the msg type is MEET.
1971 * In this stage we don't try to add the node with the right
1972          * flags, slaveof pointer, and so forth, as these details will be
1973 * resolved when we'll receive PONGs from the node. */
1974 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1975 clusterNode *node;
1976
1977 node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1978 nodeIp2String(node->ip,link,hdr->myip);
1979 node->port = ntohs(hdr->port);
1980 node->pport = ntohs(hdr->pport);
1981 node->cport = ntohs(hdr->cport);
1982 clusterAddNode(node);
1983 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1984 }
1985
1986 /* If this is a MEET packet from an unknown node, we still process
1987 * the gossip section here since we have to trust the sender because
1988 * of the message type. */
1989 if (!sender && type == CLUSTERMSG_TYPE_MEET)
1990 clusterProcessGossipSection(hdr,link);
1991
1992 /* Anyway reply with a PONG */
1993 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1994 }
1995
1996 /* PING, PONG, MEET: process config information. */
1997 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1998 type == CLUSTERMSG_TYPE_MEET)
1999 {
2000 serverLog(LL_DEBUG,"%s packet received: %s",
2001 clusterGetMessageTypeString(type),
2002 link->node ? link->node->name : "NULL");
2003 if (link->node) {
2004 if (nodeInHandshake(link->node)) {
2005 /* If we already have this node, try to change the
2006 * IP/port of the node with the new one. */
2007 if (sender) {
2008 serverLog(LL_VERBOSE,
2009 "Handshake: we already know node %.40s, "
2010 "updating the address if needed.", sender->name);
2011 if (nodeUpdateAddressIfNeeded(sender,link,hdr))
2012 {
2013 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2014 CLUSTER_TODO_UPDATE_STATE);
2015 }
2016 /* Free this node as we already have it. This will
2017 * cause the link to be freed as well. */
2018 clusterDelNode(link->node);
2019 return 0;
2020 }
2021
2022                 /* The first thing to do is to replace the random name with the
2023 * right node name if this was a handshake stage. */
2024 clusterRenameNode(link->node, hdr->sender);
2025 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
2026 link->node->name);
2027 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
2028 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
2029 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2030 } else if (memcmp(link->node->name,hdr->sender,
2031 CLUSTER_NAMELEN) != 0)
2032 {
2033 /* If the reply has a non matching node ID we
2034 * disconnect this node and set it as not having an associated
2035 * address. */
2036 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
2037 link->node->name,
2038 (int)(now-(link->node->ctime)),
2039 link->node->flags);
2040 link->node->flags |= CLUSTER_NODE_NOADDR;
2041 link->node->ip[0] = '\0';
2042 link->node->port = 0;
2043 link->node->pport = 0;
2044 link->node->cport = 0;
2045 freeClusterLink(link);
2046 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2047 return 0;
2048 }
2049 }
2050
2051 /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
2052 * announced. This is a dynamic flag that we receive from the
2053 * sender, and the latest status must be trusted. We need it to
2054          * be propagated because the slave ranking, used to understand the
2055          * delay of each slave in the voting process, needs to know
2056          * which instances are really competing. */
2057 if (sender) {
2058 int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
2059 sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
2060 sender->flags |= nofailover;
2061 }
2062
2063 /* Update the node address if it changed. */
2064 if (sender && type == CLUSTERMSG_TYPE_PING &&
2065 !nodeInHandshake(sender) &&
2066 nodeUpdateAddressIfNeeded(sender,link,hdr))
2067 {
2068 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2069 CLUSTER_TODO_UPDATE_STATE);
2070 }
2071
2072 /* Update our info about the node */
2073 if (link->node && type == CLUSTERMSG_TYPE_PONG) {
2074 link->node->pong_received = now;
2075 link->node->ping_sent = 0;
2076
2077 /* The PFAIL condition can be reversed without external
2078 * help if it is momentary (that is, if it does not
2079 * turn into a FAIL state).
2080 *
2081 * The FAIL condition is also reversible under specific
2082 * conditions detected by clearNodeFailureIfNeeded(). */
2083 if (nodeTimedOut(link->node)) {
2084 link->node->flags &= ~CLUSTER_NODE_PFAIL;
2085 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2086 CLUSTER_TODO_UPDATE_STATE);
2087 } else if (nodeFailed(link->node)) {
2088 clearNodeFailureIfNeeded(link->node);
2089 }
2090 }
2091
2092 /* Check for role switch: slave -> master or master -> slave. */
2093 if (sender) {
2094 if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
2095 sizeof(hdr->slaveof)))
2096 {
2097 /* Node is a master. */
2098 clusterSetNodeAsMaster(sender);
2099 } else {
2100 /* Node is a slave. */
2101 clusterNode *master = clusterLookupNode(hdr->slaveof);
2102
2103 if (nodeIsMaster(sender)) {
2104 /* Master turned into a slave! Reconfigure the node. */
2105 clusterDelNodeSlots(sender);
2106 sender->flags &= ~(CLUSTER_NODE_MASTER|
2107 CLUSTER_NODE_MIGRATE_TO);
2108 sender->flags |= CLUSTER_NODE_SLAVE;
2109
2110 /* Update config and state. */
2111 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2112 CLUSTER_TODO_UPDATE_STATE);
2113 }
2114
2115 /* Master node changed for this slave? */
2116 if (master && sender->slaveof != master) {
2117 if (sender->slaveof)
2118 clusterNodeRemoveSlave(sender->slaveof,sender);
2119 clusterNodeAddSlave(master,sender);
2120 sender->slaveof = master;
2121
2122 /* Update config. */
2123 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2124 }
2125 }
2126 }
2127
2128 /* Update our info about served slots.
2129 *
2130 * Note: this MUST happen after we update the master/slave state
2131 * so that CLUSTER_NODE_MASTER flag will be set. */
2132
2133 /* Many checks are only needed if the set of served slots this
2134 * instance claims is different compared to the set of slots we have
2135          * for it. Check this ASAP to avoid other computationally expensive
2136 * checks later. */
2137 clusterNode *sender_master = NULL; /* Sender or its master if slave. */
2138 int dirty_slots = 0; /* Sender claimed slots don't match my view? */
2139
2140 if (sender) {
2141 sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
2142 if (sender_master) {
2143 dirty_slots = memcmp(sender_master->slots,
2144 hdr->myslots,sizeof(hdr->myslots)) != 0;
2145 }
2146 }
2147
2148 /* 1) If the sender of the message is a master, and we detected that
2149 * the set of slots it claims changed, scan the slots to see if we
2150 * need to update our configuration. */
2151 if (sender && nodeIsMaster(sender) && dirty_slots)
2152 clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
2153
2154 /* 2) We also check for the reverse condition, that is, the sender
2155 * claims to serve slots we know are served by a master with a
2156 * greater configEpoch. If this happens we inform the sender.
2157 *
2158 * This is useful because sometimes after a partition heals, a
2159 * reappearing master may be the last one to claim a given set of
2160 * hash slots, but with a configuration that other instances know to
2161 * be deprecated. Example:
2162 *
2163 * A and B are master and slave for slots 1,2,3.
2164 * A is partitioned away, B gets promoted.
2165 * B is partitioned away, and A returns available.
2166 *
2167 * Usually B would PING A publishing its set of served slots and its
2168 * configEpoch, but because of the partition B can't inform A of the
2169 * new configuration, so other nodes that have an updated table must
2170          * do it. In this way A will stop acting as a master (or can try to
2171 * failover if there are the conditions to win the election). */
2172 if (sender && dirty_slots) {
2173 int j;
2174
2175 for (j = 0; j < CLUSTER_SLOTS; j++) {
2176 if (bitmapTestBit(hdr->myslots,j)) {
2177 if (server.cluster->slots[j] == sender ||
2178 server.cluster->slots[j] == NULL) continue;
2179 if (server.cluster->slots[j]->configEpoch >
2180 senderConfigEpoch)
2181 {
2182 serverLog(LL_VERBOSE,
2183 "Node %.40s has old slots configuration, sending "
2184 "an UPDATE message about %.40s",
2185 sender->name, server.cluster->slots[j]->name);
2186 clusterSendUpdate(sender->link,
2187 server.cluster->slots[j]);
2188
2189 /* TODO: instead of exiting the loop send every other
2190 * UPDATE packet for other nodes that are the new owner
2191 * of sender's slots. */
2192 break;
2193 }
2194 }
2195 }
2196 }
2197
2198 /* If our config epoch collides with the sender's try to fix
2199 * the problem. */
2200 if (sender &&
2201 nodeIsMaster(myself) && nodeIsMaster(sender) &&
2202 senderConfigEpoch == myself->configEpoch)
2203 {
2204 clusterHandleConfigEpochCollision(sender);
2205 }
2206
2207 /* Get info from the gossip section */
2208 if (sender) clusterProcessGossipSection(hdr,link);
2209 } else if (type == CLUSTERMSG_TYPE_FAIL) {
2210 clusterNode *failing;
2211
2212 if (sender) {
2213 failing = clusterLookupNode(hdr->data.fail.about.nodename);
2214 if (failing &&
2215 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
2216 {
2217 serverLog(LL_NOTICE,
2218 "FAIL message received from %.40s about %.40s",
2219 hdr->sender, hdr->data.fail.about.nodename);
2220 failing->flags |= CLUSTER_NODE_FAIL;
2221 failing->fail_time = now;
2222 failing->flags &= ~CLUSTER_NODE_PFAIL;
2223 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2224 CLUSTER_TODO_UPDATE_STATE);
2225 }
2226 } else {
2227 serverLog(LL_NOTICE,
2228 "Ignoring FAIL message from unknown node %.40s about %.40s",
2229 hdr->sender, hdr->data.fail.about.nodename);
2230 }
2231 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
2232 if (!sender) return 1; /* We don't know that node. */
2233
2234 robj *channel, *message;
2235 uint32_t channel_len, message_len;
2236
2237 /* Don't bother creating useless objects if there are no
2238 * Pub/Sub subscribers. */
2239 if (dictSize(server.pubsub_channels) ||
2240 dictSize(server.pubsub_patterns))
2241 {
2242 channel_len = ntohl(hdr->data.publish.msg.channel_len);
2243 message_len = ntohl(hdr->data.publish.msg.message_len);
2244 channel = createStringObject(
2245 (char*)hdr->data.publish.msg.bulk_data,channel_len);
2246 message = createStringObject(
2247 (char*)hdr->data.publish.msg.bulk_data+channel_len,
2248 message_len);
2249 pubsubPublishMessage(channel,message);
2250 decrRefCount(channel);
2251 decrRefCount(message);
2252 }
2253 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
2254 if (!sender) return 1; /* We don't know that node. */
2255 clusterSendFailoverAuthIfNeeded(sender,hdr);
2256 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
2257 if (!sender) return 1; /* We don't know that node. */
2258 /* We consider this vote only if the sender is a master serving
2259          * a non zero number of slots, and its currentEpoch is greater than or
2260          * equal to the epoch in which this node started the election. */
2261 if (nodeIsMaster(sender) && sender->numslots > 0 &&
2262 senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2263 {
2264 server.cluster->failover_auth_count++;
2265 /* Maybe we reached a quorum here, set a flag to make sure
2266 * we check ASAP. */
2267 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
2268 }
2269 } else if (type == CLUSTERMSG_TYPE_MFSTART) {
2270 /* This message is acceptable only if I'm a master and the sender
2271 * is one of my slaves. */
2272 if (!sender || sender->slaveof != myself) return 1;
2273 /* Manual failover requested from slaves. Initialize the state
2274 * accordingly. */
2275 resetManualFailover();
2276 server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
2277 server.cluster->mf_slave = sender;
2278 pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT),CLIENT_PAUSE_WRITE);
2279 serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
2280 sender->name);
2281 /* We need to send a ping message to the replica, as it would carry
2282 * `server.cluster->mf_master_offset`, which means the master paused clients
2283 * at offset `server.cluster->mf_master_offset`, so that the replica would
2284 * know that it is safe to set its `server.cluster->mf_can_start` to 1 so as
2285 * to complete failover as quickly as possible. */
2286 clusterSendPing(link, CLUSTERMSG_TYPE_PING);
2287 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2288 clusterNode *n; /* The node the update is about. */
2289 uint64_t reportedConfigEpoch =
2290 ntohu64(hdr->data.update.nodecfg.configEpoch);
2291
2292 if (!sender) return 1; /* We don't know the sender. */
2293 n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
2294 if (!n) return 1; /* We don't know the reported node. */
2295 if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
2296
2297 /* If in our current config the node is a slave, set it as a master. */
2298 if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
2299
2300 /* Update the node's configEpoch. */
2301 n->configEpoch = reportedConfigEpoch;
2302 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2303 CLUSTER_TODO_FSYNC_CONFIG);
2304
2305 /* Check the bitmap of served slots and update our
2306 * config accordingly. */
2307 clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
2308 hdr->data.update.nodecfg.slots);
2309 } else if (type == CLUSTERMSG_TYPE_MODULE) {
2310 if (!sender) return 1; /* Protect the module from unknown nodes. */
2311 /* We need to route this message back to the right module subscribed
2312 * for the right message type. */
2313 uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
2314 uint32_t len = ntohl(hdr->data.module.msg.len);
2315 uint8_t type = hdr->data.module.msg.type;
2316 unsigned char *payload = hdr->data.module.msg.bulk_data;
2317 moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
2318 } else {
2319 serverLog(LL_WARNING,"Received unknown packet type: %d", type);
2320 }
2321 return 1;
2322 }
2323
2324 /* This function is called when we detect the link with this node is lost.
2325    We set the node as no longer connected. The Cluster Cron will detect
2326    the missing link and will try to get it connected again.
2327
2328 Instead if the node is a temporary node used to accept a query, we
2329 completely free the node on error. */
2330 void handleLinkIOError(clusterLink *link) {
2331 freeClusterLink(link);
2332 }
2333
2334 /* Send data. This is handled using a trivial send buffer that gets
2335 * consumed by write(). We don't try to optimize this for speed too much
2336 * as this is a very low traffic channel. */
2337 void clusterWriteHandler(connection *conn) {
2338 clusterLink *link = connGetPrivateData(conn);
2339 ssize_t nwritten;
2340
2341 nwritten = connWrite(conn, link->sndbuf, sdslen(link->sndbuf));
2342 if (nwritten <= 0) {
2343 serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2344 (nwritten == -1) ? connGetLastError(conn) : "short write");
2345 handleLinkIOError(link);
2346 return;
2347 }
2348 sdsrange(link->sndbuf,nwritten,-1);
2349 if (sdslen(link->sndbuf) == 0)
2350 connSetWriteHandler(link->conn, NULL);
2351 }
2352
2353 /* A connect handler that gets called when a connection to another node
2354 * gets established.
2355 */
2356 void clusterLinkConnectHandler(connection *conn) {
2357 clusterLink *link = connGetPrivateData(conn);
2358 clusterNode *node = link->node;
2359
2360 /* Check if connection succeeded */
2361 if (connGetState(conn) != CONN_STATE_CONNECTED) {
2362 serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
2363 node->name, node->ip, node->cport,
2364 connGetLastError(conn));
2365 freeClusterLink(link);
2366 return;
2367 }
2368
2369 /* Register a read handler from now on */
2370 connSetReadHandler(conn, clusterReadHandler);
2371
2372 /* Queue a PING in the new connection ASAP: this is crucial
2373 * to avoid false positives in failure detection.
2374 *
2375 * If the node is flagged as MEET, we send a MEET message instead
2376 * of a PING one, to force the receiver to add us in its node
2377 * table. */
2378 mstime_t old_ping_sent = node->ping_sent;
2379 clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
2380 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
2381 if (old_ping_sent) {
2382 /* If there was an active ping before the link was
2383          * disconnected, we want to restore the ping time, otherwise it
2384          * would be replaced by the clusterSendPing() call. */
2385 node->ping_sent = old_ping_sent;
2386 }
2387 /* We can clear the flag after the first packet is sent.
2388      * If we never receive a PONG, we'll never send new packets
2389 * to this node. Instead after the PONG is received and we
2390 * are no longer in meet/handshake status, we want to send
2391 * normal PING packets. */
2392 node->flags &= ~CLUSTER_NODE_MEET;
2393
2394 serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
2395 node->name, node->ip, node->cport);
2396 }
2397
2398 /* Read data. Try to read the first field of the header first to check the
2399 * full length of the packet. When a whole packet is in memory this function
2400 * will call the function to process the packet. And so forth. */
2401 void clusterReadHandler(connection *conn) {
2402 clusterMsg buf[1];
2403 ssize_t nread;
2404 clusterMsg *hdr;
2405 clusterLink *link = connGetPrivateData(conn);
2406 unsigned int readlen, rcvbuflen;
2407
2408 while(1) { /* Read as long as there is data to read. */
2409 rcvbuflen = link->rcvbuf_len;
2410 if (rcvbuflen < 8) {
2411 /* First, obtain the first 8 bytes to get the full message
2412 * length. */
2413 readlen = 8 - rcvbuflen;
2414 } else {
2415 /* Finally read the full message. */
2416 hdr = (clusterMsg*) link->rcvbuf;
2417 if (rcvbuflen == 8) {
2418 /* Perform some sanity check on the message signature
2419 * and length. */
2420 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2421 ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2422 {
2423 serverLog(LL_WARNING,
2424 "Bad message length or signature received "
2425 "from Cluster bus.");
2426 handleLinkIOError(link);
2427 return;
2428 }
2429 }
2430 readlen = ntohl(hdr->totlen) - rcvbuflen;
2431 if (readlen > sizeof(buf)) readlen = sizeof(buf);
2432 }
2433
2434 nread = connRead(conn,buf,readlen);
2435 if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; /* No more data ready. */
2436
2437 if (nread <= 0) {
2438 /* I/O error... */
2439 serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2440 (nread == 0) ? "connection closed" : connGetLastError(conn));
2441 handleLinkIOError(link);
2442 return;
2443 } else {
2444 /* Read data and recast the pointer to the new buffer. */
2445 size_t unused = link->rcvbuf_alloc - link->rcvbuf_len;
2446 if ((size_t)nread > unused) {
2447 size_t required = link->rcvbuf_len + nread;
2448 /* If less than 1mb, grow to twice the needed size, if larger grow by 1mb. */
2449 link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ? required * 2: required + RCVBUF_MAX_PREALLOC;
2450 link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc);
2451 }
2452 memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
2453 link->rcvbuf_len += nread;
2454 hdr = (clusterMsg*) link->rcvbuf;
2455 rcvbuflen += nread;
2456 }
2457
2458 /* Total length obtained? Process this packet. */
2459 if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2460 if (clusterProcessPacket(link)) {
2461 if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
2462 zfree(link->rcvbuf);
2463 link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
2464 }
2465 link->rcvbuf_len = 0;
2466 } else {
2467 return; /* Link no longer valid. */
2468 }
2469 }
2470 }
2471 }
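/* Worked example of the receive buffer growth policy above, where
 * RCVBUF_MAX_PREALLOC is the 1mb threshold mentioned in the inline comment:
 * if an incoming packet needs 6k bytes and the buffer is smaller, it is
 * reallocated to 12k (required * 2); if a packet needs 5mb, the buffer grows
 * to 6mb (required + 1mb). After a packet is processed, buffers that grew
 * beyond RCVBUF_INIT_LEN are shrunk back to the initial size. */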
2472
2473 /* Put stuff into the send buffer.
2474 *
2475 * It is guaranteed that this function will never have as a side effect
2476 * the link to be invalidated, so it is safe to call this function
2477 * from event handlers that will do stuff with the same link later. */
2478 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2479 if (sdslen(link->sndbuf) == 0 && msglen != 0)
2480 connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
2481
2482 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2483
2484 /* Populate sent messages stats. */
2485 clusterMsg *hdr = (clusterMsg*) msg;
2486 uint16_t type = ntohs(hdr->type);
2487 if (type < CLUSTERMSG_TYPE_COUNT)
2488 server.cluster->stats_bus_messages_sent[type]++;
2489 }
2490
2491 /* Send a message to all the nodes that are part of the cluster having
2492 * a connected link.
2493 *
2494 * It is guaranteed that this function will never have as a side effect
2495 * some node->link to be invalidated, so it is safe to call this function
2496 * from event handlers that will do stuff with node links later. */
2497 void clusterBroadcastMessage(void *buf, size_t len) {
2498 dictIterator *di;
2499 dictEntry *de;
2500
2501 di = dictGetSafeIterator(server.cluster->nodes);
2502 while((de = dictNext(di)) != NULL) {
2503 clusterNode *node = dictGetVal(de);
2504
2505 if (!node->link) continue;
2506 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2507 continue;
2508 clusterSendMessage(node->link,buf,len);
2509 }
2510 dictReleaseIterator(di);
2511 }
2512
2513 /* Build the message header. hdr must point to a buffer at least
2514 * sizeof(clusterMsg) in bytes. */
2515 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
2516 int totlen = 0;
2517 uint64_t offset;
2518 clusterNode *master;
2519
2520 /* If this node is a master, we send its slots bitmap and configEpoch.
2521 * If this node is a slave we send the master's information instead (the
2522 * node is flagged as slave so the receiver knows that it is NOT really
2523      * in charge of these slots). */
2524 master = (nodeIsSlave(myself) && myself->slaveof) ?
2525 myself->slaveof : myself;
2526
2527 memset(hdr,0,sizeof(*hdr));
2528 hdr->ver = htons(CLUSTER_PROTO_VER);
2529 hdr->sig[0] = 'R';
2530 hdr->sig[1] = 'C';
2531 hdr->sig[2] = 'm';
2532 hdr->sig[3] = 'b';
2533 hdr->type = htons(type);
2534 memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
2535
2536 /* If cluster-announce-ip option is enabled, force the receivers of our
2537 * packets to use the specified address for this node. Otherwise if the
2538 * first byte is zero, they'll do auto discovery. */
2539 memset(hdr->myip,0,NET_IP_STR_LEN);
2540 if (server.cluster_announce_ip) {
2541 strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN-1);
2542 hdr->myip[NET_IP_STR_LEN-1] = '\0';
2543 }
2544
2545 /* Handle cluster-announce-[tls-|bus-]port. */
2546 int announced_port, announced_pport, announced_cport;
2547 deriveAnnouncedPorts(&announced_port, &announced_pport, &announced_cport);
2548
2549 memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
2550 memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2551 if (myself->slaveof != NULL)
2552 memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
2553 hdr->port = htons(announced_port);
2554 hdr->pport = htons(announced_pport);
2555 hdr->cport = htons(announced_cport);
2556 hdr->flags = htons(myself->flags);
2557 hdr->state = server.cluster->state;
2558
2559 /* Set the currentEpoch and configEpochs. */
2560 hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2561 hdr->configEpoch = htonu64(master->configEpoch);
2562
2563 /* Set the replication offset. */
2564 if (nodeIsSlave(myself))
2565 offset = replicationGetSlaveOffset();
2566 else
2567 offset = server.master_repl_offset;
2568 hdr->offset = htonu64(offset);
2569
2570 /* Set the message flags. */
2571 if (nodeIsMaster(myself) && server.cluster->mf_end)
2572 hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2573
2574 /* Compute the message length for certain messages. For other messages
2575 * this is up to the caller. */
2576 if (type == CLUSTERMSG_TYPE_FAIL) {
2577 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2578 totlen += sizeof(clusterMsgDataFail);
2579 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2580 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2581 totlen += sizeof(clusterMsgDataUpdate);
2582 }
2583 hdr->totlen = htonl(totlen);
2584 /* For PING, PONG, MEET and other variable length messages fixing the
2585 * totlen field is up to the caller. */
2586 }
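/* Usage note: for the variable length message types the caller is in charge
 * of fixing 'totlen' after filling the payload. clusterSendPing() below is
 * the canonical example: it recomputes totlen from the number of gossip
 * entries it actually managed to add and stores it with htonl() right before
 * queueing the message. */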
2587
2588 /* Return non zero if the node is already present in the gossip section of the
2589  * message pointed to by 'hdr', which has 'count' gossip entries. Otherwise
2590 * zero is returned. Helper for clusterSendPing(). */
2591 int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
2592 int j;
2593 for (j = 0; j < count; j++) {
2594 if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
2595 CLUSTER_NAMELEN) == 0) break;
2596 }
2597 return j != count;
2598 }
2599
2600 /* Set the i-th entry of the gossip section in the message pointed by 'hdr'
2601 * to the info of the specified node 'n'. */
2602 void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
2603 clusterMsgDataGossip *gossip;
2604 gossip = &(hdr->data.ping.gossip[i]);
2605 memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
2606 gossip->ping_sent = htonl(n->ping_sent/1000);
2607 gossip->pong_received = htonl(n->pong_received/1000);
2608 memcpy(gossip->ip,n->ip,sizeof(n->ip));
2609 gossip->port = htons(n->port);
2610 gossip->cport = htons(n->cport);
2611 gossip->flags = htons(n->flags);
2612 gossip->pport = htons(n->pport);
2613 gossip->notused1 = 0;
2614 }
2615
2616 /* Send a PING or PONG packet to the specified node, making sure to add enough
2617 * gossip information. */
2618 void clusterSendPing(clusterLink *link, int type) {
2619 unsigned char *buf;
2620 clusterMsg *hdr;
2621 int gossipcount = 0; /* Number of gossip sections added so far. */
2622 int wanted; /* Number of gossip sections we want to append if possible. */
2623 int totlen; /* Total packet length. */
2624 /* freshnodes is the max number of nodes we can hope to append at all:
2625 * nodes available minus two (ourself and the node we are sending the
2626      * message to). However in practice there may be fewer valid nodes since
2627      * nodes in handshake state, or disconnected, are not considered. */
2628 int freshnodes = dictSize(server.cluster->nodes)-2;
2629
2630 /* How many gossip sections we want to add? 1/10 of the number of nodes
2631 * and anyway at least 3. Why 1/10?
2632 *
2633 * If we have N masters, with N/10 entries, and we consider that in
2634 * node_timeout we exchange with each other node at least 4 packets
2635 * (we ping in the worst case in node_timeout/2 time, and we also
2636 * receive two pings from the host), we have a total of 8 packets
2637 * in the node_timeout*2 failure reports validity time. So we have
2638 * that, for a single PFAIL node, we can expect to receive the following
2639 * number of failure reports (in the specified window of time):
2640 *
2641 * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2642 *
2643 * PROB = probability of being featured in a single gossip entry,
2644 * which is 1 / NUM_OF_NODES.
2645 * ENTRIES = 10.
2646 * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2647 *
2648 * If we assume we have just masters (so num of nodes and num of masters
2649 * is the same), with 1/10 we always get over the majority, and specifically
2650 * 80% of the number of nodes, to account for many masters failing at the
2651 * same time.
2652 *
2653 * Since we have non-voting slaves that lower the probability of an entry
2654 * to feature our node, we set the number of entries per packet as
2655 * 10% of the total nodes we have. */
2656 wanted = floor(dictSize(server.cluster->nodes)/10);
2657 if (wanted < 3) wanted = 3;
2658 if (wanted > freshnodes) wanted = freshnodes;
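    /* Worked example of the math in the comment above: with 100 nodes,
     * wanted is 10. For a single PFAIL node, PROB = 1/100, ENTRIES = 10 and
     * TOTAL_PACKETS = 2*4*100 = 800, so the expected number of failure
     * reports in the validity window is (1/100)*10*800 = 80, i.e. about 80%
     * of the nodes, well above the majority needed to turn PFAIL into FAIL. */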
2659
2660 /* Include all the nodes in PFAIL state, so that failure reports are
2661      * propagated faster, so that nodes go from PFAIL to FAIL state sooner. */
2662 int pfail_wanted = server.cluster->stats_pfail_nodes;
2663
2664 /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
2665 * later according to the number of gossip sections we really were able
2666 * to put inside the packet. */
2667 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2668 totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
2669 /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2670 * sizeof(clusterMsg) or more. */
2671 if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2672 buf = zcalloc(totlen);
2673 hdr = (clusterMsg*) buf;
2674
2675 /* Populate the header. */
2676 if (link->node && type == CLUSTERMSG_TYPE_PING)
2677 link->node->ping_sent = mstime();
2678 clusterBuildMessageHdr(hdr,type);
2679
2680 /* Populate the gossip fields */
2681 int maxiterations = wanted*3;
2682 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2683 dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2684 clusterNode *this = dictGetVal(de);
2685
2686 /* Don't include this node: the whole packet header is about us
2687 * already, so we just gossip about other nodes. */
2688 if (this == myself) continue;
2689
2690 /* PFAIL nodes will be added later. */
2691 if (this->flags & CLUSTER_NODE_PFAIL) continue;
2692
2693 /* In the gossip section don't include:
2694 * 1) Nodes in HANDSHAKE state.
2695          * 2) Nodes with the NOADDR flag set.
2696          * 3) Disconnected nodes if they don't have configured slots.
2697 */
2698 if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2699 (this->link == NULL && this->numslots == 0))
2700 {
2701 freshnodes--; /* Technically not correct, but saves CPU. */
2702 continue;
2703 }
2704
2705 /* Do not add a node we already have. */
2706 if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
2707
2708 /* Add it */
2709 clusterSetGossipEntry(hdr,gossipcount,this);
2710 freshnodes--;
2711 gossipcount++;
2712 }
2713
2714 /* If there are PFAIL nodes, add them at the end. */
2715 if (pfail_wanted) {
2716 dictIterator *di;
2717 dictEntry *de;
2718
2719 di = dictGetSafeIterator(server.cluster->nodes);
2720 while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
2721 clusterNode *node = dictGetVal(de);
2722 if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
2723 if (node->flags & CLUSTER_NODE_NOADDR) continue;
2724 if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
2725 clusterSetGossipEntry(hdr,gossipcount,node);
2726 freshnodes--;
2727 gossipcount++;
2728 /* We take the count of the slots we allocated, since the
2729 * PFAIL stats may not match perfectly with the current number
2730 * of PFAIL nodes. */
2731 pfail_wanted--;
2732 }
2733 dictReleaseIterator(di);
2734 }
2735
2736 /* Ready to send... fix the totlen field and queue the message in the
2737 * output buffer. */
2738 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2739 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2740 hdr->count = htons(gossipcount);
2741 hdr->totlen = htonl(totlen);
2742 clusterSendMessage(link,buf,totlen);
2743 zfree(buf);
2744 }
2745
2746 /* Send a PONG packet to every connected node that's not in handshake state
2747 * and for which we have a valid link.
2748 *
2749 * In Redis Cluster pongs are not used just for failure detection, but also
2750 * to carry important configuration information. So broadcasting a pong is
2751 * useful when something changes in the configuration and we want to make
2752 * the cluster aware ASAP (for instance after a slave promotion).
2753 *
2754 * The 'target' argument specifies the receiving instances using the
2755 * defines below:
2756 *
2757 * CLUSTER_BROADCAST_ALL -> All known instances.
2758 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2759 */
2760 #define CLUSTER_BROADCAST_ALL 0
2761 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
2762 void clusterBroadcastPong(int target) {
2763 dictIterator *di;
2764 dictEntry *de;
2765
2766 di = dictGetSafeIterator(server.cluster->nodes);
2767 while((de = dictNext(di)) != NULL) {
2768 clusterNode *node = dictGetVal(de);
2769
2770 if (!node->link) continue;
2771 if (node == myself || nodeInHandshake(node)) continue;
2772 if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2773 int local_slave =
2774 nodeIsSlave(node) && node->slaveof &&
2775 (node->slaveof == myself || node->slaveof == myself->slaveof);
2776 if (!local_slave) continue;
2777 }
2778 clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2779 }
2780 dictReleaseIterator(di);
2781 }
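/* As hinted in the comment above, a typical use of this broadcast is right
 * after a replica wins a failover and promotes itself: PONG packets carry
 * the slots bitmap and configEpoch in their header, so broadcasting them
 * lets the rest of the cluster learn the new configuration without waiting
 * for the regular ping/pong schedule. */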
2782
2783 /* Send a PUBLISH message.
2784 *
2785 * If link is NULL, then the message is broadcasted to the whole cluster.
2786 *
2787 * Sanitizer suppression: In clusterMsgDataPublish, sizeof(bulk_data) is 8.
2788  * As the whole struct is used as a buffer, when more than 8 bytes are copied into
2789 * the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
2790 * positive in this context. */
2791 REDIS_NO_SANITIZE("bounds")
2792 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2793 unsigned char *payload;
2794 clusterMsg buf[1];
2795 clusterMsg *hdr = (clusterMsg*) buf;
2796 uint32_t totlen;
2797 uint32_t channel_len, message_len;
2798
2799 channel = getDecodedObject(channel);
2800 message = getDecodedObject(message);
2801 channel_len = sdslen(channel->ptr);
2802 message_len = sdslen(message->ptr);
2803
2804 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2805 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2806 totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2807
2808 hdr->data.publish.msg.channel_len = htonl(channel_len);
2809 hdr->data.publish.msg.message_len = htonl(message_len);
2810 hdr->totlen = htonl(totlen);
2811
2812 /* Try to use the local buffer if possible */
2813 if (totlen < sizeof(buf)) {
2814 payload = (unsigned char*)buf;
2815 } else {
2816 payload = zmalloc(totlen);
2817 memcpy(payload,hdr,sizeof(*hdr));
2818 hdr = (clusterMsg*) payload;
2819 }
2820 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2821 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2822 message->ptr,sdslen(message->ptr));
2823
2824 if (link)
2825 clusterSendMessage(link,payload,totlen);
2826 else
2827 clusterBroadcastMessage(payload,totlen);
2828
2829 decrRefCount(channel);
2830 decrRefCount(message);
2831 if (payload != (unsigned char*)buf) zfree(payload);
2832 }
2833
2834 /* Send a FAIL message to all the nodes we are able to contact.
2835 * The FAIL message is sent when we detect that a node is failing
2836 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2837 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2838 * nodes to do the same ASAP. */
2839 void clusterSendFail(char *nodename) {
2840 clusterMsg buf[1];
2841 clusterMsg *hdr = (clusterMsg*) buf;
2842
2843 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2844 memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2845 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2846 }
2847
2848 /* Send an UPDATE message to the specified link carrying the specified 'node'
2849 * slots configuration. The node name, slots bitmap, and configEpoch info
2850 * are included. */
2851 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2852 clusterMsg buf[1];
2853 clusterMsg *hdr = (clusterMsg*) buf;
2854
2855 if (link == NULL) return;
2856 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2857 memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2858 hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2859 memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2860 clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
2861 }
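/* The receiving side of this message is the CLUSTERMSG_TYPE_UPDATE branch of
 * clusterProcessPacket() above: the receiver looks up the reported node,
 * bumps its configEpoch if the one carried here is newer, possibly turns the
 * node into a master, and finally applies the slots bitmap through
 * clusterUpdateSlotsConfigWith(). */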
2862
2863 /* Send a MODULE message.
2864 *
2865 * If link is NULL, then the message is broadcasted to the whole cluster. */
2866 void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
2867 unsigned char *payload, uint32_t len) {
2868 unsigned char *heapbuf;
2869 clusterMsg buf[1];
2870 clusterMsg *hdr = (clusterMsg*) buf;
2871 uint32_t totlen;
2872
2873 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
2874 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2875 totlen += sizeof(clusterMsgModule) - 3 + len;
2876
2877 hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
2878 hdr->data.module.msg.type = type;
2879 hdr->data.module.msg.len = htonl(len);
2880 hdr->totlen = htonl(totlen);
2881
2882 /* Try to use the local buffer if possible */
2883 if (totlen < sizeof(buf)) {
2884 heapbuf = (unsigned char*)buf;
2885 } else {
2886 heapbuf = zmalloc(totlen);
2887 memcpy(heapbuf,hdr,sizeof(*hdr));
2888 hdr = (clusterMsg*) heapbuf;
2889 }
2890 memcpy(hdr->data.module.msg.bulk_data,payload,len);
2891
2892 if (link)
2893 clusterSendMessage(link,heapbuf,totlen);
2894 else
2895 clusterBroadcastMessage(heapbuf,totlen);
2896
2897 if (heapbuf != (unsigned char*)buf) zfree(heapbuf);
2898 }
2899
2900 /* This function gets a cluster node ID string as target, the same way the nodes
2901 * addresses are represented on the modules side, resolves the node, and sends
2902 * the message. If the target is NULL the message is broadcasted.
2903 *
2904 * The function returns C_OK if the target is valid, otherwise C_ERR is
2905 * returned. */
2906 int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
2907 clusterNode *node = NULL;
2908
2909 if (target != NULL) {
2910 node = clusterLookupNode(target);
2911 if (node == NULL || node->link == NULL) return C_ERR;
2912 }
2913
2914 clusterSendModule(target ? node->link : NULL,
2915 module_id, type, payload, len);
2916 return C_OK;
2917 }
2918
2919 /* -----------------------------------------------------------------------------
2920 * CLUSTER Pub/Sub support
2921 *
2922 * For now we do very little, just propagating PUBLISH messages across the whole
2923 * cluster. In the future we'll try to get smarter and avoid propagating those
2924 * messages to hosts without receivers for a given channel.
2925 * -------------------------------------------------------------------------- */
2926 void clusterPropagatePublish(robj *channel, robj *message) {
2927 clusterSendPublish(NULL, channel, message);
2928 }
2929
2930 /* -----------------------------------------------------------------------------
2931 * SLAVE node specific functions
2932 * -------------------------------------------------------------------------- */
2933
2934 /* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
2935 * see if there is the quorum for this slave instance to failover its failing
2936 * master.
2937 *
2938 * Note that we send the failover request to everybody, master and slave nodes,
2939 * but only the masters are supposed to reply to our query. */
2940 void clusterRequestFailoverAuth(void) {
2941 clusterMsg buf[1];
2942 clusterMsg *hdr = (clusterMsg*) buf;
2943 uint32_t totlen;
2944
2945 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2946 /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2947 * in the header to communicate to the nodes receiving the message that
2948 * they should authorize the failover even if the master is working. */
2949 if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2950 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2951 hdr->totlen = htonl(totlen);
2952 clusterBroadcastMessage(buf,totlen);
2953 }
2954
2955 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
2956 void clusterSendFailoverAuth(clusterNode *node) {
2957 clusterMsg buf[1];
2958 clusterMsg *hdr = (clusterMsg*) buf;
2959 uint32_t totlen;
2960
2961 if (!node->link) return;
2962 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2963 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2964 hdr->totlen = htonl(totlen);
2965 clusterSendMessage(node->link,(unsigned char*)buf,totlen);
2966 }
2967
2968 /* Send a MFSTART message to the specified node. */
2969 void clusterSendMFStart(clusterNode *node) {
2970 clusterMsg buf[1];
2971 clusterMsg *hdr = (clusterMsg*) buf;
2972 uint32_t totlen;
2973
2974 if (!node->link) return;
2975 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2976 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2977 hdr->totlen = htonl(totlen);
2978 clusterSendMessage(node->link,(unsigned char*)buf,totlen);
2979 }
2980
2981 /* Vote for the node asking for our vote if there are the conditions. */
2982 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
2983 clusterNode *master = node->slaveof;
2984 uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
2985 uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
2986 unsigned char *claimed_slots = request->myslots;
2987 int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
2988 int j;
2989
2990 /* If we are not a master serving at least 1 slot, we don't have the
2991 * right to vote, as the cluster size in Redis Cluster is the number
2992 * of masters serving at least one slot, and the quorum is
2993 * (cluster size / 2) + 1. */
2994 if (nodeIsSlave(myself) || myself->numslots == 0) return;
2995
2996 /* Request epoch must be >= our currentEpoch.
2997 * Note that it is impossible for it to actually be greater since
2998 * our currentEpoch was updated as a side effect of receiving this
2999 * request, if the request epoch was greater. */
3000 if (requestCurrentEpoch < server.cluster->currentEpoch) {
3001 serverLog(LL_WARNING,
3002 "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
3003 node->name,
3004 (unsigned long long) requestCurrentEpoch,
3005 (unsigned long long) server.cluster->currentEpoch);
3006 return;
3007 }
3008
3009 /* I already voted for this epoch? Return ASAP. */
3010 if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
3011 serverLog(LL_WARNING,
3012 "Failover auth denied to %.40s: already voted for epoch %llu",
3013 node->name,
3014 (unsigned long long) server.cluster->currentEpoch);
3015 return;
3016 }
3017
3018 /* Node must be a slave and its master down.
3019 * The master can be non failing if the request is flagged
3020 * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
3021 if (nodeIsMaster(node) || master == NULL ||
3022 (!nodeFailed(master) && !force_ack))
3023 {
3024 if (nodeIsMaster(node)) {
3025 serverLog(LL_WARNING,
3026 "Failover auth denied to %.40s: it is a master node",
3027 node->name);
3028 } else if (master == NULL) {
3029 serverLog(LL_WARNING,
3030 "Failover auth denied to %.40s: I don't know its master",
3031 node->name);
3032 } else if (!nodeFailed(master)) {
3033 serverLog(LL_WARNING,
3034 "Failover auth denied to %.40s: its master is up",
3035 node->name);
3036 }
3037 return;
3038 }
3039
3040 /* We must not have voted for a slave of this master within two times
3041 * the node timeout. This is not strictly needed for correctness of the
3042 * algorithm, but makes the base case more linear. */
3043 if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
3044 {
3045 serverLog(LL_WARNING,
3046 "Failover auth denied to %.40s: "
3047 "can't vote about this master before %lld milliseconds",
3048 node->name,
3049 (long long) ((server.cluster_node_timeout*2)-
3050 (mstime() - node->slaveof->voted_time)));
3051 return;
3052 }
3053
3054 /* The slave requesting the vote must have a configEpoch for the claimed
3055 * slots that is >= the one of the masters currently serving the same
3056 * slots in the current configuration. */
3057 for (j = 0; j < CLUSTER_SLOTS; j++) {
3058 if (bitmapTestBit(claimed_slots, j) == 0) continue;
3059 if (server.cluster->slots[j] == NULL ||
3060 server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
3061 {
3062 continue;
3063 }
3064 /* If we reached this point we found a slot that in our current slots
3065 * is served by a master with a greater configEpoch than the one claimed
3066 * by the slave requesting our vote. Refuse to vote for this slave. */
3067 serverLog(LL_WARNING,
3068 "Failover auth denied to %.40s: "
3069 "slot %d epoch (%llu) > reqEpoch (%llu)",
3070 node->name, j,
3071 (unsigned long long) server.cluster->slots[j]->configEpoch,
3072 (unsigned long long) requestConfigEpoch);
3073 return;
3074 }
3075
3076 /* We can vote for this slave. */
3077 server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
3078 node->slaveof->voted_time = mstime();
3079 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
3080 clusterSendFailoverAuth(node);
3081 serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
3082 node->name, (unsigned long long) server.cluster->currentEpoch);
3083 }
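/* Worked example (illustrative): suppose our currentEpoch is 10 and
 * lastVoteEpoch is 8, and a slave requests a vote with currentEpoch 10,
 * configEpoch 7, claiming slots 0-100. Following the checks above, the vote
 * is granted only if:
 *
 *   - requestCurrentEpoch (10) >= currentEpoch (10)
 *   - lastVoteEpoch (8) != currentEpoch (10), i.e. we did not vote in this epoch yet
 *   - the requester is a slave and its master is flagged FAIL (or FORCEACK is set)
 *   - at least 2 * cluster-node-timeout elapsed since we last voted for a slave
 *     of that same master
 *   - none of slots 0-100 is currently served by a master with configEpoch > 7
 *
 * If all of the above hold, lastVoteEpoch becomes 10, voted_time is refreshed,
 * and the FAILOVER_AUTH_ACK is sent back. */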
3084
3085 /* This function returns the "rank" of this instance, a slave, in the context
3086 * of its master-slaves ring. The rank of the slave is given by the number of
3087 * other slaves for the same master that have a better replication offset
3088 * compared to the local one (better means, greater, so they claim more data).
3089 *
3090 * A slave with rank 0 is the one with the greatest (most up to date)
3091 * replication offset, and so forth. Note that, because of how the rank is
3092 * computed, multiple slaves may have the same rank if they have the same offset.
3093 *
3094 * The slave rank is used to add a delay to start an election in order to
3095 * get voted and replace a failing master. Slaves with better replication
3096 * offsets are more likely to win. */
3097 int clusterGetSlaveRank(void) {
3098 long long myoffset;
3099 int j, rank = 0;
3100 clusterNode *master;
3101
3102 serverAssert(nodeIsSlave(myself));
3103 master = myself->slaveof;
3104 if (master == NULL) return 0; /* Never called by slaves without master. */
3105
3106 myoffset = replicationGetSlaveOffset();
3107 for (j = 0; j < master->numslaves; j++)
3108 if (master->slaves[j] != myself &&
3109 !nodeCantFailover(master->slaves[j]) &&
3110 master->slaves[j]->repl_offset > myoffset) rank++;
3111 return rank;
3112 }
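/* Example (illustrative): if the master has three slaves with replication
 * offsets 100, 90 and 80, and our own offset is 90, then exactly one other
 * slave (offset 100) is ahead of us, so our rank is 1. A slave with the same
 * offset as ours does not increase the rank, which is why two slaves can end
 * up with the same rank. */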
3113
3114 /* This function is called by clusterHandleSlaveFailover() in order to
3115 * let the slave log why it is not able to failover. Sometimes the conditions
3116 * are not met, and since the failover function is called again and
3117 * again, we can't log the same things continuously.
3118 *
3119 * This function works by logging only if a given set of conditions are
3120 * true:
3121 *
3122 * 1) The reason for which the failover can't be initiated changed.
3123 * The reasons also include a NONE reason we reset the state to
3124 * when the slave finds that its master is fine (no FAIL flag).
3125 * 2) Also, the log is emitted again if the master is still down and
3126 * the reason for not failing over is still the same, but more than
3127 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
3128 * 3) Finally, the function only logs if the master has been failing for
3129 * more than NODE_TIMEOUT plus five seconds. This way nothing is logged when a
3130 * failover starts in a reasonable time.
3131 *
3132 * The function is called with the reason why the slave can't failover
3133 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
3134 *
3135 * The function is guaranteed to be called only if 'myself' is a slave. */
3136 void clusterLogCantFailover(int reason) {
3137 char *msg;
3138 static time_t lastlog_time = 0;
3139 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
3140
3141 /* Don't log if we have the same reason for some time. */
3142 if (reason == server.cluster->cant_failover_reason &&
3143 time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
3144 return;
3145
3146 server.cluster->cant_failover_reason = reason;
3147
3148 /* We also don't emit any log if the master failed not long ago: the
3149 * goal of this function is to log slaves in a stalled condition for
3150 * a long time. */
3151 if (myself->slaveof &&
3152 nodeFailed(myself->slaveof) &&
3153 (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
3154
3155 switch(reason) {
3156 case CLUSTER_CANT_FAILOVER_DATA_AGE:
3157 msg = "Disconnected from master for longer than allowed. "
3158 "Please check the 'cluster-replica-validity-factor' configuration "
3159 "option.";
3160 break;
3161 case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
3162 msg = "Waiting the delay before I can start a new failover.";
3163 break;
3164 case CLUSTER_CANT_FAILOVER_EXPIRED:
3165 msg = "Failover attempt expired.";
3166 break;
3167 case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
3168 msg = "Waiting for votes, but majority still not reached.";
3169 break;
3170 default:
3171 msg = "Unknown reason code.";
3172 break;
3173 }
3174 lastlog_time = time(NULL);
3175 serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
3176 }
3177
3178 /* This function implements the final part of automatic and manual failovers,
3179 * where the slave grabs its master's hash slots, and propagates the new
3180 * configuration.
3181 *
3182 * Note that it's up to the caller to be sure that the node got a new
3183 * configuration epoch already. */
3184 void clusterFailoverReplaceYourMaster(void) {
3185 int j;
3186 clusterNode *oldmaster = myself->slaveof;
3187
3188 if (nodeIsMaster(myself) || oldmaster == NULL) return;
3189
3190 /* 1) Turn this node into a master. */
3191 clusterSetNodeAsMaster(myself);
3192 replicationUnsetMaster();
3193
3194 /* 2) Claim all the slots assigned to our master. */
3195 for (j = 0; j < CLUSTER_SLOTS; j++) {
3196 if (clusterNodeGetSlotBit(oldmaster,j)) {
3197 clusterDelSlot(j);
3198 clusterAddSlot(myself,j);
3199 }
3200 }
3201
3202 /* 3) Update state and save config. */
3203 clusterUpdateState();
3204 clusterSaveConfigOrDie(1);
3205
3206 /* 4) Pong all the other nodes so that they can update the state
3207 * accordingly and detect that we switched to master role. */
3208 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
3209
3210 /* 5) If there was a manual failover in progress, clear the state. */
3211 resetManualFailover();
3212 }
3213
3214 /* This function is called if we are a slave node and our master serving
3215 * a non-zero amount of hash slots is in FAIL state.
3216 *
3217 * The goal of this function is:
3218 * 1) To check if we are able to perform a failover (is our data recent enough?).
3219 * 2) To try to get elected by the masters.
3220 * 3) To perform the failover, informing all the other nodes.
3221 */
3222 void clusterHandleSlaveFailover(void) {
3223 mstime_t data_age;
3224 mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
3225 int needed_quorum = (server.cluster->size / 2) + 1;
3226 int manual_failover = server.cluster->mf_end != 0 &&
3227 server.cluster->mf_can_start;
3228 mstime_t auth_timeout, auth_retry_time;
3229
3230 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
3231
3232 /* Compute the failover timeout (the max time we have to send votes
3233 * and wait for replies), and the failover retry time (the time to wait
3234 * before trying to get voted again).
3235 *
3236 * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
3237 * Retry is two times the Timeout.
3238 */
3239 auth_timeout = server.cluster_node_timeout*2;
3240 if (auth_timeout < 2000) auth_timeout = 2000;
3241 auth_retry_time = auth_timeout*2;
3242
3243 /* Preconditions to run the function, that must be met both in case
3244 * of an automatic or a manual failover:
3245 * 1) We are a slave.
3246 * 2) Our master is flagged as FAIL, or this is a manual failover.
3247 * 3) The replica no-failover configuration is not set, or this is
3248 * a manual failover.
3249 * 4) Our master is serving slots. */
3250 if (nodeIsMaster(myself) ||
3251 myself->slaveof == NULL ||
3252 (!nodeFailed(myself->slaveof) && !manual_failover) ||
3253 (server.cluster_slave_no_failover && !manual_failover) ||
3254 myself->slaveof->numslots == 0)
3255 {
3256 /* There are no reasons to failover, so we set the reason why we
3257 * are returning without failing over to NONE. */
3258 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
3259 return;
3260 }
3261
3262 /* Set data_age to the number of milliseconds we are disconnected from
3263 * the master. */
3264 if (server.repl_state == REPL_STATE_CONNECTED) {
3265 data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
3266 * 1000;
3267 } else {
3268 data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
3269 }
3270
3271 /* Remove the node timeout from the data age as it is fine that we are
3272 * disconnected from our master at least for the time it was down to be
3273 * flagged as FAIL, that's the baseline. */
3274 if (data_age > server.cluster_node_timeout)
3275 data_age -= server.cluster_node_timeout;
3276
3277 /* Check if our data is recent enough according to the slave validity
3278 * factor configured by the user.
3279 *
3280 * Check bypassed for manual failovers. */
3281 if (server.cluster_slave_validity_factor &&
3282 data_age >
3283 (((mstime_t)server.repl_ping_slave_period * 1000) +
3284 (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
3285 {
3286 if (!manual_failover) {
3287 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
3288 return;
3289 }
3290 }
3291
3292 /* If the previous failover attempt timed out and the retry time has
3293 * elapsed, we can set up a new one. */
3294 if (auth_age > auth_retry_time) {
3295 server.cluster->failover_auth_time = mstime() +
3296 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
3297 random() % 500; /* Random delay between 0 and 500 milliseconds. */
3298 server.cluster->failover_auth_count = 0;
3299 server.cluster->failover_auth_sent = 0;
3300 server.cluster->failover_auth_rank = clusterGetSlaveRank();
3301 /* We add another delay that is proportional to the slave rank.
3302 * Specifically 1 second * rank. This way slaves that probably have a
3303 * less updated replication offset are penalized. */
3304 server.cluster->failover_auth_time +=
3305 server.cluster->failover_auth_rank * 1000;
3306 /* However if this is a manual failover, no delay is needed. */
3307 if (server.cluster->mf_end) {
3308 server.cluster->failover_auth_time = mstime();
3309 server.cluster->failover_auth_rank = 0;
3310 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
3311 }
3312 serverLog(LL_WARNING,
3313 "Start of election delayed for %lld milliseconds "
3314 "(rank #%d, offset %lld).",
3315 server.cluster->failover_auth_time - mstime(),
3316 server.cluster->failover_auth_rank,
3317 replicationGetSlaveOffset());
3318 /* Now that we have a scheduled election, broadcast our offset
3319 * to all the other slaves so that they'll update their offsets
3320 * if our offset is better. */
3321 clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
3322 return;
3323 }
3324
3325 /* It is possible that we received more updated offsets from other
3326 * slaves for the same master since we computed our election delay.
3327 * Update the delay if our rank changed.
3328 *
3329 * Not performed if this is a manual failover. */
3330 if (server.cluster->failover_auth_sent == 0 &&
3331 server.cluster->mf_end == 0)
3332 {
3333 int newrank = clusterGetSlaveRank();
3334 if (newrank > server.cluster->failover_auth_rank) {
3335 long long added_delay =
3336 (newrank - server.cluster->failover_auth_rank) * 1000;
3337 server.cluster->failover_auth_time += added_delay;
3338 server.cluster->failover_auth_rank = newrank;
3339 serverLog(LL_WARNING,
3340 "Replica rank updated to #%d, added %lld milliseconds of delay.",
3341 newrank, added_delay);
3342 }
3343 }
3344
3345 /* Return ASAP if we still can't start the election. */
3346 if (mstime() < server.cluster->failover_auth_time) {
3347 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
3348 return;
3349 }
3350
3351 /* Return ASAP if the election is too old to be valid. */
3352 if (auth_age > auth_timeout) {
3353 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
3354 return;
3355 }
3356
3357 /* Ask for votes if needed. */
3358 if (server.cluster->failover_auth_sent == 0) {
3359 server.cluster->currentEpoch++;
3360 server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
3361 serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
3362 (unsigned long long) server.cluster->currentEpoch);
3363 clusterRequestFailoverAuth();
3364 server.cluster->failover_auth_sent = 1;
3365 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3366 CLUSTER_TODO_UPDATE_STATE|
3367 CLUSTER_TODO_FSYNC_CONFIG);
3368 return; /* Wait for replies. */
3369 }
3370
3371 /* Check if we reached the quorum. */
3372 if (server.cluster->failover_auth_count >= needed_quorum) {
3373 /* We have the quorum, we can finally failover the master. */
3374
3375 serverLog(LL_WARNING,
3376 "Failover election won: I'm the new master.");
3377
3378 /* Update my configEpoch to the epoch of the election. */
3379 if (myself->configEpoch < server.cluster->failover_auth_epoch) {
3380 myself->configEpoch = server.cluster->failover_auth_epoch;
3381 serverLog(LL_WARNING,
3382 "configEpoch set to %llu after successful failover",
3383 (unsigned long long) myself->configEpoch);
3384 }
3385
3386 /* Take responsibility for the cluster slots. */
3387 clusterFailoverReplaceYourMaster();
3388 } else {
3389 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
3390 }
3391 }
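/* Worked example (illustrative): with 5 masters serving slots, the needed
 * quorum is 5/2+1 = 3 votes. With cluster-node-timeout set to 5000 ms the
 * election window (auth_timeout) is max(2*5000, 2000) = 10000 ms and the
 * retry time is 20000 ms. A slave with rank 1 schedules its election at
 *
 *   failover_auth_time = mstime() + 500 + random() % 500 + 1 * 1000
 *
 * that is, between 1500 and 2000 ms in the future, so better-ranked slaves
 * (rank 0) get the chance to ask for votes first. */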
3392
3393 /* -----------------------------------------------------------------------------
3394 * CLUSTER slave migration
3395 *
3396 * Slave migration is the process that allows a slave of a master that is
3397 * already covered by at least one other slave to "migrate" to a master that
3398 * is orphaned, that is, left with no working slaves.
3399 * ------------------------------------------------------------------------- */
3400
3401 /* This function is responsible for deciding whether this replica should be migrated
3402 * to a different (orphaned) master. It is called by the clusterCron() function
3403 * only if:
3404 *
3405 * 1) We are a slave node.
3406 * 2) It was detected that there is at least one orphaned master in
3407 * the cluster.
3408 * 3) We are a slave of one of the masters with the greatest number of
3409 * slaves.
3410 *
3411 * These checks are performed by the caller, since it needs to iterate over
3412 * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
3413 * when definitely needed.
3414 *
3415 * The function is called with a pre-computed max_slaves, that is the max
3416 * number of working (not in FAIL state) slaves for a single master.
3417 *
3418 * Additional conditions for migration are examined inside the function.
3419 */
3420 void clusterHandleSlaveMigration(int max_slaves) {
3421 int j, okslaves = 0;
3422 clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
3423 dictIterator *di;
3424 dictEntry *de;
3425
3426 /* Step 1: Don't migrate if the cluster state is not ok. */
3427 if (server.cluster->state != CLUSTER_OK) return;
3428
3429 /* Step 2: Don't migrate if my master will not be left with at least
3430 * 'migration-barrier' slaves after my migration. */
3431 if (mymaster == NULL) return;
3432 for (j = 0; j < mymaster->numslaves; j++)
3433 if (!nodeFailed(mymaster->slaves[j]) &&
3434 !nodeTimedOut(mymaster->slaves[j])) okslaves++;
3435 if (okslaves <= server.cluster_migration_barrier) return;
3436
3437 /* Step 3: Identify a candidate for migration, and check whether, among the
3438 * slaves of the masters with the greatest number of ok slaves, I'm the one
3439 * with the smallest node ID (the "candidate slave").
3440 *
3441 * Note: this means that eventually a replica migration will occur
3442 * since slaves that are reachable again always have their FAIL flag
3443 * cleared, so eventually there must be a candidate.
3444 * There is a possible race condition causing multiple
3445 * slaves to migrate at the same time, but this is unlikely to
3446 * happen and relatively harmless when it does. */
3447 candidate = myself;
3448 di = dictGetSafeIterator(server.cluster->nodes);
3449 while((de = dictNext(di)) != NULL) {
3450 clusterNode *node = dictGetVal(de);
3451 int okslaves = 0, is_orphaned = 1;
3452
3453 /* We want to migrate only if this master is working, orphaned, and
3454 * used to have slaves, or if it failed over a master that had slaves
3455 * (MIGRATE_TO flag). This way we only migrate to instances that were
3456 * supposed to have replicas. */
3457 if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
3458 if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
3459
3460 /* Check number of working slaves. */
3461 if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
3462 if (okslaves > 0) is_orphaned = 0;
3463
3464 if (is_orphaned) {
3465 if (!target && node->numslots > 0) target = node;
3466
3467 /* Track the starting time of the orphaned condition for this
3468 * master. */
3469 if (!node->orphaned_time) node->orphaned_time = mstime();
3470 } else {
3471 node->orphaned_time = 0;
3472 }
3473
3474 /* Check if I'm the slave candidate for the migration: attached
3475 * to a master with the maximum number of slaves and with the smallest
3476 * node ID. */
3477 if (okslaves == max_slaves) {
3478 for (j = 0; j < node->numslaves; j++) {
3479 if (memcmp(node->slaves[j]->name,
3480 candidate->name,
3481 CLUSTER_NAMELEN) < 0)
3482 {
3483 candidate = node->slaves[j];
3484 }
3485 }
3486 }
3487 }
3488 dictReleaseIterator(di);
3489
3490 /* Step 4: perform the migration if there is a target, and if I'm the
3491 * candidate, but only if the master is continuously orphaned for a
3492 * couple of seconds, so that during failovers, we give some time to
3493 * the natural slaves of this instance to advertise their switch from
3494 * the old master to the new one. */
3495 if (target && candidate == myself &&
3496 (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
3497 !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3498 {
3499 serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
3500 target->name);
3501 clusterSetMaster(target);
3502 }
3503 }
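/* Worked example (illustrative): assume master A has two working slaves (so
 * max_slaves is 2) and master B, which serves slots and carries the MIGRATE_TO
 * flag, just lost its only slave. If I am the slave of A with the
 * lexicographically smallest node ID among A's slaves, I become the candidate;
 * once B has been orphaned continuously for more than
 * CLUSTER_SLAVE_MIGRATION_DELAY, and assuming A would still be left with at
 * least cluster-migration-barrier working slaves after I leave,
 * clusterSetMaster(B) is called and I start replicating from B. */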
3504
3505 /* -----------------------------------------------------------------------------
3506 * CLUSTER manual failover
3507 *
3508 * These are the important steps performed by slaves during a manual failover:
3509 * 1) The user sends the CLUSTER FAILOVER command. The failover state is initialized
3510 * setting mf_end to the millisecond unix time at which we'll abort the
3511 * attempt.
3512 * 2) Slave sends a MFSTART message to the master requesting to pause clients
3513 * for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
3514 * When master is paused for manual failover, it also starts to flag
3515 * packets with CLUSTERMSG_FLAG0_PAUSED.
3516 * 3) Slave waits for master to send its replication offset flagged as PAUSED.
3517 * 4) If the slave receives the offset from the master, and its own offset matches,
3518 * mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
3519 * the failover as usual, with the difference that the vote request
3520 * will be modified to force masters to vote for a slave that has a
3521 * working master.
3522 *
3523 * From the point of view of the master things are simpler: when a
3524 * MFSTART packet is received the master sets mf_end as well and records
3525 * the sender in mf_slave. During the time limit for the manual failover
3526 * the master will just send PINGs more often to this slave, flagged with
3527 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
3528 * a packet from the master with this flag set.
3529 *
3530 * The goal of the manual failover is to perform a fast failover without
3531 * data loss due to the asynchronous master-slave replication.
3532 * -------------------------------------------------------------------------- */
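/* Illustrative timeline of the state fields involved (a sketch matching the
 * description above, not additional behavior):
 *
 *   t0  CLUSTER FAILOVER issued on the slave  -> mf_end = now + timeout,
 *                                                mf_can_start = 0
 *   t1  slave sends MFSTART                   -> master sets mf_end and
 *                                                mf_slave, pauses clients
 *   t2  master pings with the PAUSED flag     -> slave stores the announced
 *                                                offset in mf_master_offset
 *   t3  replicationGetSlaveOffset() reaches
 *       mf_master_offset                      -> mf_can_start = 1, and the
 *                                                normal election in
 *                                                clusterHandleSlaveFailover()
 *                                                takes over
 *
 * If mf_end is reached before t3, manualFailoverCheckTimeout() aborts the
 * attempt via resetManualFailover(). */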
3533
3534 /* Reset the manual failover state. This works for both masters and slaves
3535 * as all the state about manual failover is cleared.
3536 *
3537 * The function can be used both to initialize the manual failover state at
3538 * startup or to abort a manual failover in progress. */
3539 void resetManualFailover(void) {
3540 if (server.cluster->mf_slave) {
3541 /* We were a master failing over, so we paused clients. Regardless
3542 * of the outcome we unpause now to allow traffic again. */
3543 unpauseClients();
3544 }
3545 server.cluster->mf_end = 0; /* No manual failover in progress. */
3546 server.cluster->mf_can_start = 0;
3547 server.cluster->mf_slave = NULL;
3548 server.cluster->mf_master_offset = -1;
3549 }
3550
3551 /* If a manual failover timed out, abort it. */
3552 void manualFailoverCheckTimeout(void) {
3553 if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3554 serverLog(LL_WARNING,"Manual failover timed out.");
3555 resetManualFailover();
3556 }
3557 }
3558
3559 /* This function is called from the cluster cron function in order to go
3560 * forward with the manual failover state machine. */
3561 void clusterHandleManualFailover(void) {
3562 /* Return ASAP if no manual failover is in progress. */
3563 if (server.cluster->mf_end == 0) return;
3564
3565 /* If mf_can_start is non-zero, the failover was already triggered so the
3566 * next steps are performed by clusterHandleSlaveFailover(). */
3567 if (server.cluster->mf_can_start) return;
3568
3569 if (server.cluster->mf_master_offset == -1) return; /* Wait for offset... */
3570
3571 if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3572 /* Our replication offset matches the master replication offset
3573 * announced after clients were paused. We can start the failover. */
3574 server.cluster->mf_can_start = 1;
3575 serverLog(LL_WARNING,
3576 "All master replication stream processed, "
3577 "manual failover can start.");
3578 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
3579 return;
3580 }
3581 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
3582 }
3583
3584 /* -----------------------------------------------------------------------------
3585 * CLUSTER cron job
3586 * -------------------------------------------------------------------------- */
3587
3588 /* Check if the node is disconnected and re-establish the connection.
3589 * Also update a few stats while we are here, that can be used to make
3590 * better decisions in other parts of the code. */
3591 int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
3592 /* Not interested in reconnecting the link with myself or nodes
3593 * for which we have no address. */
3594 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) return 1;
3595
3596 if (node->flags & CLUSTER_NODE_PFAIL)
3597 server.cluster->stats_pfail_nodes++;
3598
3599 /* A Node in HANDSHAKE state has a limited lifespan equal to the
3600 * configured node timeout. */
3601 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
3602 clusterDelNode(node);
3603 return 1;
3604 }
3605
3606 if (node->link == NULL) {
3607 clusterLink *link = createClusterLink(node);
3608 link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
3609 connSetPrivateData(link->conn, link);
3610 if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr,
3611 clusterLinkConnectHandler) == -1) {
3612 /* We got a synchronous error from connect before
3613 * clusterSendPing() had a chance to be called.
3614 * If node->ping_sent is zero, failure detection can't work,
3615 * so we claim we actually sent a ping now (that will
3616 * be really sent as soon as the link is obtained). */
3617 if (node->ping_sent == 0) node->ping_sent = mstime();
3618 serverLog(LL_DEBUG, "Unable to connect to "
3619 "Cluster Node [%s]:%d -> %s", node->ip,
3620 node->cport, server.neterr);
3621
3622 freeClusterLink(link);
3623 return 0;
3624 }
3625 node->link = link;
3626 }
3627 return 0;
3628 }
3629
3630 /* Resize the send buffer of a node if it is wasting
3631 * enough space. */
3632 int clusterNodeCronResizeBuffers(clusterNode *node) {
3633 /* If unused space is a lot bigger than the used portion of the buffer then free up unused space.
3634 * We use a factor of 4 because of the greediness of sdsMakeRoomFor (used by sdscatlen). */
3635 if (node->link != NULL && sdsavail(node->link->sndbuf) / 4 > sdslen(node->link->sndbuf)) {
3636 node->link->sndbuf = sdsRemoveFreeSpace(node->link->sndbuf);
3637 }
3638 return 0;
3639 }
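/* Example (illustrative): if the link's send buffer currently holds 100 bytes
 * of pending data (sdslen) but has 500 bytes of unused allocated space
 * (sdsavail), then 500/4 = 125 > 100 and the unused space is released.
 * With only 300 free bytes instead, 300/4 = 75 <= 100 and the buffer is left
 * untouched, avoiding a reallocation for space we are likely to reuse soon. */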
3640
3641 /* This is executed 10 times every second */
3642 void clusterCron(void) {
3643 dictIterator *di;
3644 dictEntry *de;
3645 int update_state = 0;
3646 int orphaned_masters; /* How many masters there are without ok slaves. */
3647 int max_slaves; /* Max number of ok slaves for a single master. */
3648 int this_slaves; /* Number of ok slaves for our master (if we are slave). */
3649 mstime_t min_pong = 0, now = mstime();
3650 clusterNode *min_pong_node = NULL;
3651 static unsigned long long iteration = 0;
3652 mstime_t handshake_timeout;
3653
3654 iteration++; /* Number of times this function was called so far. */
3655
3656 /* The handshake timeout is the time after which a handshake node that was
3657 * not turned into a normal node is removed from the nodes. Usually it is
3658 * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
3659 * the value of 1 second. */
3660 handshake_timeout = server.cluster_node_timeout;
3661 if (handshake_timeout < 1000) handshake_timeout = 1000;
3662
3663 /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
3664 server.cluster->stats_pfail_nodes = 0;
3665 /* Run through some of the operations we want to do on each cluster node. */
3666 di = dictGetSafeIterator(server.cluster->nodes);
3667 while((de = dictNext(di)) != NULL) {
3668 clusterNode *node = dictGetVal(de);
3669 /* The protocol is that they return non-zero if the node was
3670 * terminated. */
3671 if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
3672 if(clusterNodeCronResizeBuffers(node)) continue;
3673 }
3674 dictReleaseIterator(di);
3675
3676 /* Ping some random node 1 time every 10 iterations, so that we usually ping
3677 * one random node every second. */
3678 if (!(iteration % 10)) {
3679 int j;
3680
3681 /* Check a few random nodes and ping the one with the oldest
3682 * pong_received time. */
3683 for (j = 0; j < 5; j++) {
3684 de = dictGetRandomKey(server.cluster->nodes);
3685 clusterNode *this = dictGetVal(de);
3686
3687 /* Don't ping nodes disconnected or with a ping currently active. */
3688 if (this->link == NULL || this->ping_sent != 0) continue;
3689 if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3690 continue;
3691 if (min_pong_node == NULL || min_pong > this->pong_received) {
3692 min_pong_node = this;
3693 min_pong = this->pong_received;
3694 }
3695 }
3696 if (min_pong_node) {
3697 serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
3698 clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
3699 }
3700 }
3701
3702 /* Iterate nodes to check if we need to flag something as failing.
3703 * This loop is also responsible to:
3704 * 1) Check if there are orphaned masters (masters without non failing
3705 * slaves).
3706 * 2) Count the max number of non failing slaves for a single master.
3707 * 3) Count the number of slaves for our master, if we are a slave. */
3708 orphaned_masters = 0;
3709 max_slaves = 0;
3710 this_slaves = 0;
3711 di = dictGetSafeIterator(server.cluster->nodes);
3712 while((de = dictNext(di)) != NULL) {
3713 clusterNode *node = dictGetVal(de);
3714 now = mstime(); /* Use an updated time at every iteration. */
3715
3716 if (node->flags &
3717 (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
3718 continue;
3719
3720 /* Orphaned master check, useful only if the current instance
3721 * is a slave that may migrate to another master. */
3722 if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
3723 int okslaves = clusterCountNonFailingSlaves(node);
3724
3725 /* A master is orphaned if it is serving a non-zero number of
3726 * slots, has no working slaves, but used to have at least one
3727 * slave, or failed over a master that used to have slaves. */
3728 if (okslaves == 0 && node->numslots > 0 &&
3729 node->flags & CLUSTER_NODE_MIGRATE_TO)
3730 {
3731 orphaned_masters++;
3732 }
3733 if (okslaves > max_slaves) max_slaves = okslaves;
3734 if (nodeIsSlave(myself) && myself->slaveof == node)
3735 this_slaves = okslaves;
3736 }
3737
3738 /* If we are not receiving any data for more than half the cluster
3739 * timeout, reconnect the link: maybe there is a connection
3740 * issue even if the node is alive. */
3741 mstime_t ping_delay = now - node->ping_sent;
3742 mstime_t data_delay = now - node->data_received;
3743 if (node->link && /* is connected */
3744 now - node->link->ctime >
3745 server.cluster_node_timeout && /* was not already reconnected */
3746 node->ping_sent && /* we already sent a ping */
3747 /* and we are waiting for the pong more than timeout/2 */
3748 ping_delay > server.cluster_node_timeout/2 &&
3749 /* and in such interval we are not seeing any traffic at all. */
3750 data_delay > server.cluster_node_timeout/2)
3751 {
3752 /* Disconnect the link, it will be reconnected automatically. */
3753 freeClusterLink(node->link);
3754 }
3755
3756 /* If we have currently no active ping in this instance, and the
3757 * received PONG is older than half the cluster timeout, send
3758 * a new ping now, to ensure all the nodes are pinged without
3759 * a too big delay. */
3760 if (node->link &&
3761 node->ping_sent == 0 &&
3762 (now - node->pong_received) > server.cluster_node_timeout/2)
3763 {
3764 clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3765 continue;
3766 }
3767
3768 /* If we are a master and one of the slaves requested a manual
3769 * failover, ping it continuously. */
3770 if (server.cluster->mf_end &&
3771 nodeIsMaster(myself) &&
3772 server.cluster->mf_slave == node &&
3773 node->link)
3774 {
3775 clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3776 continue;
3777 }
3778
3779 /* Check only if we have an active ping for this instance. */
3780 if (node->ping_sent == 0) continue;
3781
3782 /* Check if this node looks unreachable.
3783 * Note that if we already received the PONG, then node->ping_sent
3784 * is zero, so we can't reach this code at all; hence we don't risk
3785 * checking for a PONG delay if we didn't send the PING.
3786 *
3787 * We also consider every incoming data as proof of liveness, since
3788 * our cluster bus link is also used for data: under heavy data
3789 * load pong delays are possible. */
3790 mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
3791 data_delay;
3792
3793 if (node_delay > server.cluster_node_timeout) {
3794 /* Timeout reached. Set the node as possibly failing if it is
3795 * not already in this state. */
3796 if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
3797 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
3798 node->name);
3799 node->flags |= CLUSTER_NODE_PFAIL;
3800 update_state = 1;
3801 }
3802 }
3803 }
3804 dictReleaseIterator(di);
3805
3806 /* If we are a slave node but the replication is still turned off,
3807 * enable it if we know the address of our master and it appears to
3808 * be up. */
3809 if (nodeIsSlave(myself) &&
3810 server.masterhost == NULL &&
3811 myself->slaveof &&
3812 nodeHasAddr(myself->slaveof))
3813 {
3814 replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
3815 }
3816
3817 /* Abort a manual failover if the timeout is reached. */
3818 manualFailoverCheckTimeout();
3819
3820 if (nodeIsSlave(myself)) {
3821 clusterHandleManualFailover();
3822 if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3823 clusterHandleSlaveFailover();
3824 /* If there are orphaned masters, and we are a slave attached to one of the
3825 * masters with the max number of non-failing slaves, consider migrating to
3826 * the orphaned masters. Note that it does not make sense to try
3827 * a migration if there is no master with at least *two* working
3828 * slaves. */
3829 if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
3830 server.cluster_allow_replica_migration)
3831 clusterHandleSlaveMigration(max_slaves);
3832 }
3833
3834 if (update_state || server.cluster->state == CLUSTER_FAIL)
3835 clusterUpdateState();
3836 }
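/* Timing example (illustrative), assuming cluster-node-timeout is 15000 ms:
 * a node with no pending PING is pinged again once its last PONG is older
 * than 7500 ms; a link that is not freshly created and is waiting on a PONG
 * for more than 7500 ms with no incoming traffic at all is dropped and
 * recreated; and the node is flagged PFAIL only when both the ping delay and
 * the data delay exceed the full 15000 ms, since any received data counts as
 * proof of liveness. */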
3837
3838 /* This function is called before the event handler returns to sleep for
3839 * events. It is useful to perform operations that must be done ASAP in
3840 * reaction to events fired but that are not safe to perform inside event
3841 * handlers, or to perform potentially expensive tasks that we need to do
3842 * a single time before replying to clients. */
3843 void clusterBeforeSleep(void) {
3844 int flags = server.cluster->todo_before_sleep;
3845
3846 /* Reset our flags (not strictly needed since every single function
3847 * called for flags set should be able to clear its flag). */
3848 server.cluster->todo_before_sleep = 0;
3849
3850 if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) {
3851 /* Handle the manual failover as soon as possible so we don't add up to a
3852 * 100ms delay, as it would if it were handled only in clusterCron() */
3853 if(nodeIsSlave(myself)) {
3854 clusterHandleManualFailover();
3855 if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3856 clusterHandleSlaveFailover();
3857 }
3858 } else if (flags & CLUSTER_TODO_HANDLE_FAILOVER) {
3859 /* Handle failover, this is needed when it is likely that there is already
3860 * the quorum from masters in order to react fast. */
3861 clusterHandleSlaveFailover();
3862 }
3863
3864 /* Update the cluster state. */
3865 if (flags & CLUSTER_TODO_UPDATE_STATE)
3866 clusterUpdateState();
3867
3868 /* Save the config, possibly using fsync. */
3869 if (flags & CLUSTER_TODO_SAVE_CONFIG) {
3870 int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
3871 clusterSaveConfigOrDie(fsync);
3872 }
3873 }
3874
3875 void clusterDoBeforeSleep(int flags) {
3876 server.cluster->todo_before_sleep |= flags;
3877 }
3878
3879 /* -----------------------------------------------------------------------------
3880 * Slots management
3881 * -------------------------------------------------------------------------- */
3882
3883 /* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
3884 * otherwise 0. */
3885 int bitmapTestBit(unsigned char *bitmap, int pos) {
3886 off_t byte = pos/8;
3887 int bit = pos&7;
3888 return (bitmap[byte] & (1<<bit)) != 0;
3889 }
3890
3891 /* Set the bit at position 'pos' in a bitmap. */
3892 void bitmapSetBit(unsigned char *bitmap, int pos) {
3893 off_t byte = pos/8;
3894 int bit = pos&7;
3895 bitmap[byte] |= 1<<bit;
3896 }
3897
3898 /* Clear the bit at position 'pos' in a bitmap. */
3899 void bitmapClearBit(unsigned char *bitmap, int pos) {
3900 off_t byte = pos/8;
3901 int bit = pos&7;
3902 bitmap[byte] &= ~(1<<bit);
3903 }
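/* Example (illustrative): for slot 11, byte = 11/8 = 1 and bit = 11&7 = 3, so
 *
 *   bitmapSetBit(bitmap, 11);     // bitmap[1] |= 0x08
 *   bitmapTestBit(bitmap, 11);    // (bitmap[1] & 0x08) != 0  -> returns 1
 *   bitmapClearBit(bitmap, 11);   // bitmap[1] &= ~0x08
 *
 * The per-node 'slots' bitmap covers all CLUSTER_SLOTS (16384) slots in
 * 16384/8 = 2048 bytes. */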
3904
3905 /* Return non-zero if there is at least one master with slaves in the cluster.
3906 * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3907 * MIGRATE_TO flag when a master gets its first slot. */
3908 int clusterMastersHaveSlaves(void) {
3909 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3910 dictEntry *de;
3911 int slaves = 0;
3912 while((de = dictNext(di)) != NULL) {
3913 clusterNode *node = dictGetVal(de);
3914
3915 if (nodeIsSlave(node)) continue;
3916 slaves += node->numslaves;
3917 }
3918 dictReleaseIterator(di);
3919 return slaves != 0;
3920 }
3921
3922 /* Set the slot bit and return the old value. */
3923 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3924 int old = bitmapTestBit(n->slots,slot);
3925 bitmapSetBit(n->slots,slot);
3926 if (!old) {
3927 n->numslots++;
3928 /* When a master gets its first slot, even if it has no slaves,
3929 * it gets flagged with MIGRATE_TO, that is, the master is a valid
3930 * target for replica migration, if and only if at least one of
3931 * the other masters has slaves right now.
3932 *
3933 * Normally masters are valid targets of replica migration if:
3934 * 1. They used to have slaves (but no longer have them).
3935 * 2. They are slaves failing over a master that used to have slaves.
3936 *
3937 * However new masters with slots assigned are considered valid
3938 * migration targets if the rest of the cluster is not entirely slave-less.
3939 *
3940 * See https://github.com/redis/redis/issues/3043 for more info. */
3941 if (n->numslots == 1 && clusterMastersHaveSlaves())
3942 n->flags |= CLUSTER_NODE_MIGRATE_TO;
3943 }
3944 return old;
3945 }
3946
3947 /* Clear the slot bit and return the old value. */
3948 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3949 int old = bitmapTestBit(n->slots,slot);
3950 bitmapClearBit(n->slots,slot);
3951 if (old) n->numslots--;
3952 return old;
3953 }
3954
3955 /* Return the slot bit from the cluster node structure. */
3956 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3957 return bitmapTestBit(n->slots,slot);
3958 }
3959
3960 /* Add the specified slot to the list of slots that node 'n' will
3961 * serve. Return C_OK if the operation ended with success.
3962 * If the slot is already assigned to another instance this is considered
3963 * an error and C_ERR is returned. */
3964 int clusterAddSlot(clusterNode *n, int slot) {
3965 if (server.cluster->slots[slot]) return C_ERR;
3966 clusterNodeSetSlotBit(n,slot);
3967 server.cluster->slots[slot] = n;
3968 return C_OK;
3969 }
3970
3971 /* Delete the specified slot marking it as unassigned.
3972 * Returns C_OK if the slot was assigned, otherwise if the slot was
3973 * already unassigned C_ERR is returned. */
3974 int clusterDelSlot(int slot) {
3975 clusterNode *n = server.cluster->slots[slot];
3976
3977 if (!n) return C_ERR;
3978 serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3979 server.cluster->slots[slot] = NULL;
3980 return C_OK;
3981 }
3982
3983 /* Delete all the slots associated with the specified node.
3984 * The number of deleted slots is returned. */
3985 int clusterDelNodeSlots(clusterNode *node) {
3986 int deleted = 0, j;
3987
3988 for (j = 0; j < CLUSTER_SLOTS; j++) {
3989 if (clusterNodeGetSlotBit(node,j)) {
3990 clusterDelSlot(j);
3991 deleted++;
3992 }
3993 }
3994 return deleted;
3995 }
3996
3997 /* Clear the migrating / importing state for all the slots.
3998 * This is useful at initialization and when turning a master into slave. */
3999 void clusterCloseAllSlots(void) {
4000 memset(server.cluster->migrating_slots_to,0,
4001 sizeof(server.cluster->migrating_slots_to));
4002 memset(server.cluster->importing_slots_from,0,
4003 sizeof(server.cluster->importing_slots_from));
4004 }
4005
4006 /* -----------------------------------------------------------------------------
4007 * Cluster state evaluation function
4008 * -------------------------------------------------------------------------- */
4009
4010 /* The following are defines that are only used in the evaluation function
4011 * and are based on heuristics. Actually the main point about the rejoin and
4012 * writable delay is that they should be a few orders of magnitude larger
4013 * than the network latency. */
4014 #define CLUSTER_MAX_REJOIN_DELAY 5000
4015 #define CLUSTER_MIN_REJOIN_DELAY 500
4016 #define CLUSTER_WRITABLE_DELAY 2000
4017
4018 void clusterUpdateState(void) {
4019 int j, new_state;
4020 int reachable_masters = 0;
4021 static mstime_t among_minority_time;
4022 static mstime_t first_call_time = 0;
4023
4024 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
4025
4026 /* If this is a master node, wait some time before turning the state
4027 * into OK, since it is not a good idea to rejoin the cluster as a writable
4028 * master, after a reboot, without giving the cluster a chance to
4029 * reconfigure this node. Note that the delay is calculated starting from
4030 * the first call to this function and not since the server start, in order
4031 * to not count the DB loading time. */
4032 if (first_call_time == 0) first_call_time = mstime();
4033 if (nodeIsMaster(myself) &&
4034 server.cluster->state == CLUSTER_FAIL &&
4035 mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
4036
4037 /* Start assuming the state is OK. We'll turn it into FAIL if there
4038 * are the right conditions. */
4039 new_state = CLUSTER_OK;
4040
4041 /* Check if all the slots are covered. */
4042 if (server.cluster_require_full_coverage) {
4043 for (j = 0; j < CLUSTER_SLOTS; j++) {
4044 if (server.cluster->slots[j] == NULL ||
4045 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
4046 {
4047 new_state = CLUSTER_FAIL;
4048 break;
4049 }
4050 }
4051 }
4052
4053 /* Compute the cluster size, that is the number of master nodes
4054 * serving at least a single slot.
4055 *
4056 * At the same time count the number of reachable masters having
4057 * at least one slot. */
4058 {
4059 dictIterator *di;
4060 dictEntry *de;
4061
4062 server.cluster->size = 0;
4063 di = dictGetSafeIterator(server.cluster->nodes);
4064 while((de = dictNext(di)) != NULL) {
4065 clusterNode *node = dictGetVal(de);
4066
4067 if (nodeIsMaster(node) && node->numslots) {
4068 server.cluster->size++;
4069 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
4070 reachable_masters++;
4071 }
4072 }
4073 dictReleaseIterator(di);
4074 }
4075
4076 /* If we are in a minority partition, change the cluster state
4077 * to FAIL. */
4078 {
4079 int needed_quorum = (server.cluster->size / 2) + 1;
4080
4081 if (reachable_masters < needed_quorum) {
4082 new_state = CLUSTER_FAIL;
4083 among_minority_time = mstime();
4084 }
4085 }
4086
4087 /* Log a state change */
4088 if (new_state != server.cluster->state) {
4089 mstime_t rejoin_delay = server.cluster_node_timeout;
4090
4091 /* If the instance is a master and was partitioned away with the
4092 * minority, don't let it accept queries for some time after the
4093 * partition heals, to make sure there is enough time to receive
4094 * a configuration update. */
4095 if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
4096 rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
4097 if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
4098 rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
4099
4100 if (new_state == CLUSTER_OK &&
4101 nodeIsMaster(myself) &&
4102 mstime() - among_minority_time < rejoin_delay)
4103 {
4104 return;
4105 }
4106
4107 /* Change the state and log the event. */
4108 serverLog(LL_WARNING,"Cluster state changed: %s",
4109 new_state == CLUSTER_OK ? "ok" : "fail");
4110 server.cluster->state = new_state;
4111 }
4112 }
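/* Worked example (illustrative): with 5 masters serving slots, the cluster
 * size is 5 and the needed quorum is 5/2+1 = 3. If, from our point of view,
 * only 2 of those masters are neither FAIL nor PFAIL, the state becomes FAIL.
 * When the partition heals, a master that was on the minority side waits
 * rejoin_delay = min(max(cluster-node-timeout, 500), 5000) ms before turning
 * the state back to OK, so it has time to receive a configuration update
 * before accepting writes again. */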
4113
4114 /* This function is called after the node startup in order to verify that data
4115 * loaded from disk is in agreement with the cluster configuration:
4116 *
4117 * 1) If we find keys about hash slots we have no responsibility for, the
4118 * following happens:
4119 * A) If no other node is in charge according to the current cluster
4120 * configuration, we add these slots to our node.
4121 * B) If according to our config other nodes are already in charge of
4122 * these slots, we set the slots as IMPORTING from our point of view
4123 * in order to justify why we have those slots, and in order to make
4124 * redis-cli aware of the issue, so that it can try to fix it.
4125 * 2) If we find data in a DB different than DB0 we return C_ERR to
4126 * signal the caller it should quit the server with an error message
4127 * or take other actions.
4128 *
4129 * The function returns C_OK even when it has to correct the errors
4130 * described in "1". However, if data is found in a DB different
4131 * from DB0, C_ERR is returned.
4132 *
4133 * The function also uses the logging facility in order to warn the user
4134 * about desynchronizations between the data we have in memory and the
4135 * cluster configuration. */
4136 int verifyClusterConfigWithData(void) {
4137 int j;
4138 int update_config = 0;
4139
4140 /* Return ASAP if a module disabled cluster redirections. In that case
4141 * every master can store keys about every possible hash slot. */
4142 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
4143 return C_OK;
4144
4145 /* If this node is a slave, don't perform the check at all as we
4146 * completely depend on the replication stream. */
4147 if (nodeIsSlave(myself)) return C_OK;
4148
4149 /* Make sure we only have keys in DB0. */
4150 for (j = 1; j < server.dbnum; j++) {
4151 if (dictSize(server.db[j].dict)) return C_ERR;
4152 }
4153
4154 /* Check that all the slots we see populated in memory have a corresponding
4155 * entry in the cluster table. Otherwise fix the table. */
4156 for (j = 0; j < CLUSTER_SLOTS; j++) {
4157 if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
4158 /* Check if we are assigned to this slot or if we are importing it.
4159 * In both cases move on to the next slot, as this configuration
4160 * makes sense. */
4161 if (server.cluster->slots[j] == myself ||
4162 server.cluster->importing_slots_from[j] != NULL) continue;
4163
4164 /* If we are here, data and cluster config don't agree: we have
4165 * slot 'j' populated even though we are not importing it, nor are we
4166 * assigned to this slot. Fix this condition. */
4167
4168 update_config++;
4169 /* Case A: slot is unassigned. Take responsibility for it. */
4170 if (server.cluster->slots[j] == NULL) {
4171 serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
4172 "Taking responsibility for it.",j);
4173 clusterAddSlot(myself,j);
4174 } else {
4175 serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
4176 "assigned to another node. "
4177 "Setting it to importing state.",j);
4178 server.cluster->importing_slots_from[j] = server.cluster->slots[j];
4179 }
4180 }
4181 if (update_config) clusterSaveConfigOrDie(1);
4182 return C_OK;
4183 }
4184
4185 /* -----------------------------------------------------------------------------
4186 * SLAVE nodes handling
4187 * -------------------------------------------------------------------------- */
4188
4189 /* Set the specified node 'n' as master for this node.
4190 * If this node is currently a master, it is turned into a slave. */
4191 void clusterSetMaster(clusterNode *n) {
4192 serverAssert(n != myself);
4193 serverAssert(myself->numslots == 0);
4194
4195 if (nodeIsMaster(myself)) {
4196 myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
4197 myself->flags |= CLUSTER_NODE_SLAVE;
4198 clusterCloseAllSlots();
4199 } else {
4200 if (myself->slaveof)
4201 clusterNodeRemoveSlave(myself->slaveof,myself);
4202 }
4203 myself->slaveof = n;
4204 clusterNodeAddSlave(n,myself);
4205 replicationSetMaster(n->ip, n->port);
4206 resetManualFailover();
4207 }
4208
4209 /* -----------------------------------------------------------------------------
4210 * Nodes to string representation functions.
4211 * -------------------------------------------------------------------------- */
4212
4213 struct redisNodeFlags {
4214 uint16_t flag;
4215 char *name;
4216 };
4217
4218 static struct redisNodeFlags redisNodeFlagsTable[] = {
4219 {CLUSTER_NODE_MYSELF, "myself,"},
4220 {CLUSTER_NODE_MASTER, "master,"},
4221 {CLUSTER_NODE_SLAVE, "slave,"},
4222 {CLUSTER_NODE_PFAIL, "fail?,"},
4223 {CLUSTER_NODE_FAIL, "fail,"},
4224 {CLUSTER_NODE_HANDSHAKE, "handshake,"},
4225 {CLUSTER_NODE_NOADDR, "noaddr,"},
4226 {CLUSTER_NODE_NOFAILOVER, "nofailover,"}
4227 };
4228
4229 /* Concatenate the comma separated list of node flags to the given SDS
4230 * string 'ci'. */
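/* For example (illustrative): a node whose flags include CLUSTER_NODE_MYSELF
 * and CLUSTER_NODE_MASTER gets "myself,master" appended to 'ci', while a node
 * with none of the flags in the table gets the special "noflags" token. */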
sds representClusterNodeFlags(sds ci, uint16_t flags) {
4232 size_t orig_len = sdslen(ci);
4233 int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
4234 for (i = 0; i < size; i++) {
4235 struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
4236 if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
4237 }
4238 /* If no flag was added, add the "noflags" special flag. */
4239 if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
4240 sdsIncrLen(ci,-1); /* Remove trailing comma. */
4241 return ci;
4242 }
4243
4244 /* Generate a csv-alike representation of the specified cluster node.
4245 * See clusterGenNodesDescription() top comment for more information.
4246 *
4247 * The function returns the string representation as an SDS string. */
sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
4249 int j, start;
4250 sds ci;
4251 int port = use_pport && node->pport ? node->pport : node->port;
4252
4253 /* Node coordinates */
4254 ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
4255 ci = sdscatfmt(ci," %s:%i@%i ",
4256 node->ip,
4257 port,
4258 node->cport);
4259
4260 /* Flags */
4261 ci = representClusterNodeFlags(ci, node->flags);
4262
4263 /* Slave of... or just "-" */
4264 ci = sdscatlen(ci," ",1);
4265 if (node->slaveof)
4266 ci = sdscatlen(ci,node->slaveof->name,CLUSTER_NAMELEN);
4267 else
4268 ci = sdscatlen(ci,"-",1);
4269
4270 unsigned long long nodeEpoch = node->configEpoch;
4271 if (nodeIsSlave(node) && node->slaveof) {
4272 nodeEpoch = node->slaveof->configEpoch;
4273 }
4274 /* Latency from the POV of this node, config epoch, link status */
4275 ci = sdscatfmt(ci," %I %I %U %s",
4276 (long long) node->ping_sent,
4277 (long long) node->pong_received,
4278 nodeEpoch,
4279 (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
4280 "connected" : "disconnected");
4281
    /* Slots served by this instance. If we already have the slots info,
     * append it directly; otherwise generate it, but only if this node
     * actually serves some slot. */
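    /* Illustrative example: if the node serves slots 0..5460 plus slot 5462,
     * the loop below appends " 0-5460 5462" to the description. */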
4284 if (node->slots_info) {
4285 ci = sdscatsds(ci, node->slots_info);
4286 } else if (node->numslots > 0) {
4287 start = -1;
4288 for (j = 0; j < CLUSTER_SLOTS; j++) {
4289 int bit;
4290
4291 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4292 if (start == -1) start = j;
4293 }
4294 if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4295 if (bit && j == CLUSTER_SLOTS-1) j++;
4296
4297 if (start == j-1) {
4298 ci = sdscatfmt(ci," %i",start);
4299 } else {
4300 ci = sdscatfmt(ci," %i-%i",start,j-1);
4301 }
4302 start = -1;
4303 }
4304 }
4305 }
4306
    /* For the MYSELF node only, we also dump info about slots that
     * we are migrating to other instances or importing from other
     * instances. */
4310 if (node->flags & CLUSTER_NODE_MYSELF) {
4311 for (j = 0; j < CLUSTER_SLOTS; j++) {
4312 if (server.cluster->migrating_slots_to[j]) {
4313 ci = sdscatprintf(ci," [%d->-%.40s]",j,
4314 server.cluster->migrating_slots_to[j]->name);
4315 } else if (server.cluster->importing_slots_from[j]) {
4316 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
4317 server.cluster->importing_slots_from[j]->name);
4318 }
4319 }
4320 }
4321 return ci;
4322 }
4323
/* Generate the slot topology for all nodes and store the string representation
 * in the slots_info field of each node. This is used to improve the efficiency
 * of clusterGenNodesDescription() because it avoids looping over the whole slot
 * space once per node when generating the per-node slot info. */
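/* Sketch of the idea (illustrative slot assignments): a single pass over the
 * slot table emits one range per maximal run of slots owned by the same node,
 * so with slots 0..99 owned by A and 100..199 owned by B, node A gets " 0-99"
 * and node B gets " 100-199" appended to their slots_info strings. */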
void clusterGenNodesSlotsInfo(int filter) {
4329 clusterNode *n = NULL;
4330 int start = -1;
4331
4332 for (int i = 0; i <= CLUSTER_SLOTS; i++) {
4333 /* Find start node and slot id. */
4334 if (n == NULL) {
4335 if (i == CLUSTER_SLOTS) break;
4336 n = server.cluster->slots[i];
4337 start = i;
4338 continue;
4339 }
4340
        /* Generate the slots info when we reach a slot served by a different
         * node than the one owning 'start', or the end of the slot space. */
4343 if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
4344 if (!(n->flags & filter)) {
4345 if (n->slots_info == NULL) n->slots_info = sdsempty();
4346 if (start == i-1) {
4347 n->slots_info = sdscatfmt(n->slots_info," %i",start);
4348 } else {
4349 n->slots_info = sdscatfmt(n->slots_info," %i-%i",start,i-1);
4350 }
4351 }
4352 if (i == CLUSTER_SLOTS) break;
4353 n = server.cluster->slots[i];
4354 start = i;
4355 }
4356 }
4357 }
4358
4359 /* Generate a csv-alike representation of the nodes we are aware of,
4360 * including the "myself" node, and return an SDS string containing the
4361 * representation (it is up to the caller to free it).
4362 *
4363 * All the nodes matching at least one of the node flags specified in
4364 * "filter" are excluded from the output, so using zero as a filter will
4365 * include all the known nodes in the representation, including nodes in
4366 * the HANDSHAKE state.
4367 *
4368 * Setting use_pport to 1 in a TLS cluster makes the result contain the
 * plaintext client port rather than the TLS client port of each node.
4370 *
4371 * The representation obtained using this function is used for the output
4372 * of the CLUSTER NODES function, and as format for the cluster
4373 * configuration file (nodes.conf) for a given node. */
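/* An illustrative output line for a replica (all field values are made up):
 *
 * 07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@40004 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
 *
 * i.e. <id> <ip:port@cport> <flags> <master|-> <ping-sent> <pong-recv>
 * <config-epoch> <link-state> [<slot> <slot> ...] */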
sds clusterGenNodesDescription(int filter, int use_pport) {
4375 sds ci = sdsempty(), ni;
4376 dictIterator *di;
4377 dictEntry *de;
4378
4379 /* Generate all nodes slots info firstly. */
4380 clusterGenNodesSlotsInfo(filter);
4381
4382 di = dictGetSafeIterator(server.cluster->nodes);
4383 while((de = dictNext(di)) != NULL) {
4384 clusterNode *node = dictGetVal(de);
4385
4386 if (node->flags & filter) continue;
4387 ni = clusterGenNodeDescription(node, use_pport);
4388 ci = sdscatsds(ci,ni);
4389 sdsfree(ni);
4390 ci = sdscatlen(ci,"\n",1);
4391
4392 /* Release slots info. */
4393 if (node->slots_info) {
4394 sdsfree(node->slots_info);
4395 node->slots_info = NULL;
4396 }
4397 }
4398 dictReleaseIterator(di);
4399 return ci;
4400 }
4401
4402 /* -----------------------------------------------------------------------------
4403 * CLUSTER command
4404 * -------------------------------------------------------------------------- */
4405
const char *clusterGetMessageTypeString(int type) {
4407 switch(type) {
4408 case CLUSTERMSG_TYPE_PING: return "ping";
4409 case CLUSTERMSG_TYPE_PONG: return "pong";
4410 case CLUSTERMSG_TYPE_MEET: return "meet";
4411 case CLUSTERMSG_TYPE_FAIL: return "fail";
4412 case CLUSTERMSG_TYPE_PUBLISH: return "publish";
4413 case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
4414 case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
4415 case CLUSTERMSG_TYPE_UPDATE: return "update";
4416 case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
4417 case CLUSTERMSG_TYPE_MODULE: return "module";
4418 }
4419 return "unknown";
4420 }
4421
int getSlotOrReply(client *c, robj *o) {
4423 long long slot;
4424
4425 if (getLongLongFromObject(o,&slot) != C_OK ||
4426 slot < 0 || slot >= CLUSTER_SLOTS)
4427 {
4428 addReplyError(c,"Invalid or out of range slot");
4429 return -1;
4430 }
4431 return (int) slot;
4432 }
4433
/* Returns an indication of whether the replica node is fully available
 * and should be listed in the CLUSTER SLOTS response.
 * Returns 1 for available nodes, 0 for nodes that have
 * not finished their initial sync, are in a failed state, or are
 * otherwise considered not available to serve read commands. */
static int isReplicaAvailable(clusterNode *node) {
4440 if (nodeFailed(node)) {
4441 return 0;
4442 }
4443 long long repl_offset = node->repl_offset;
4444 if (node->flags & CLUSTER_NODE_MYSELF) {
4445 /* Nodes do not update their own information
4446 * in the cluster node list. */
4447 repl_offset = replicationGetSlaveOffset();
4448 }
4449 return (repl_offset != 0);
4450 }
4451
int checkSlotAssignmentsOrReply(client *c, unsigned char *slots, int del, int start_slot, int end_slot) {
4453 int slot;
4454 for (slot = start_slot; slot <= end_slot; slot++) {
4455 if (del && server.cluster->slots[slot] == NULL) {
4456 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
4457 return C_ERR;
4458 } else if (!del && server.cluster->slots[slot]) {
4459 addReplyErrorFormat(c,"Slot %d is already busy", slot);
4460 return C_ERR;
4461 }
4462 if (slots[slot]++ == 1) {
4463 addReplyErrorFormat(c,"Slot %d specified multiple times",(int)slot);
4464 return C_ERR;
4465 }
4466 }
4467 return C_OK;
4468 }
4469
void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
4471 int j;
4472 for (j = 0; j < CLUSTER_SLOTS; j++) {
4473 if (slots[j]) {
4474 int retval;
4475
4476 /* If this slot was set as importing we can clear this
4477 * state as now we are the real owner of the slot. */
4478 if (server.cluster->importing_slots_from[j])
4479 server.cluster->importing_slots_from[j] = NULL;
4480
4481 retval = del ? clusterDelSlot(j) :
4482 clusterAddSlot(myself,j);
4483 serverAssertWithInfo(c,NULL,retval == C_OK);
4484 }
4485 }
4486 }
4487
void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
4489 int i, nested_elements = 3; /* slots (2) + master addr (1) */
4490 void *nested_replylen = addReplyDeferredLen(c);
4491 addReplyLongLong(c, start_slot);
4492 addReplyLongLong(c, end_slot);
4493 addReplyArrayLen(c, 3);
4494 addReplyBulkCString(c, node->ip);
4495 /* Report non-TLS ports to non-TLS client in TLS cluster if available. */
4496 int use_pport = (server.tls_cluster &&
4497 c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
4498 addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port);
4499 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
4500
4501 /* Remaining nodes in reply are replicas for slot range */
4502 for (i = 0; i < node->numslaves; i++) {
4503 /* This loop is copy/pasted from clusterGenNodeDescription()
4504 * with modifications for per-slot node aggregation. */
4505 if (!isReplicaAvailable(node->slaves[i])) continue;
4506 addReplyArrayLen(c, 3);
4507 addReplyBulkCString(c, node->slaves[i]->ip);
4508 /* Report slave's non-TLS port to non-TLS client in TLS cluster */
4509 addReplyLongLong(c, (use_pport && node->slaves[i]->pport ?
4510 node->slaves[i]->pport :
4511 node->slaves[i]->port));
4512 addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
4513 nested_elements++;
4514 }
4515 setDeferredArrayLen(c, nested_replylen, nested_elements);
4516 }
4517
void clusterReplyMultiBulkSlots(client *c) {
4519 /* Format: 1) 1) start slot
4520 * 2) end slot
4521 * 3) 1) master IP
4522 * 2) master port
4523 * 3) node ID
4524 * 4) 1) replica IP
4525 * 2) replica port
4526 * 3) node ID
4527 * ... continued until done
4528 */
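    /* Illustrative redis-cli style output for one slot range (values made up):
     *   1) 1) (integer) 0
     *      2) (integer) 8191
     *      3) 1) "127.0.0.1"
     *         2) (integer) 7000
     *         3) "<master node id>"
     *      4) 1) "127.0.0.1"
     *         2) (integer) 7001
     *         3) "<replica node id>" */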
4529 clusterNode *n = NULL;
4530 int num_masters = 0, start = -1;
4531 void *slot_replylen = addReplyDeferredLen(c);
4532
4533 for (int i = 0; i <= CLUSTER_SLOTS; i++) {
4534 /* Find start node and slot id. */
4535 if (n == NULL) {
4536 if (i == CLUSTER_SLOTS) break;
4537 n = server.cluster->slots[i];
4538 start = i;
4539 continue;
4540 }
4541
        /* Add the cluster slots info when we reach a slot served by a different
         * node than the one owning 'start', or the end of the slot space. */
4544 if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
4545 addNodeReplyForClusterSlot(c, n, start, i-1);
4546 num_masters++;
4547 if (i == CLUSTER_SLOTS) break;
4548 n = server.cluster->slots[i];
4549 start = i;
4550 }
4551 }
4552 setDeferredArrayLen(c, slot_replylen, num_masters);
4553 }
4554
void clusterCommand(client *c) {
4556 if (server.cluster_enabled == 0) {
4557 addReplyError(c,"This instance has cluster support disabled");
4558 return;
4559 }
4560
4561 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
4562 const char *help[] = {
4563 "ADDSLOTS <slot> [<slot> ...]",
4564 " Assign slots to current node.",
4565 "ADDSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
4566 " Assign slots which are between <start-slot> and <end-slot> to current node.",
4567 "BUMPEPOCH",
4568 " Advance the cluster config epoch.",
4569 "COUNT-FAILURE-REPORTS <node-id>",
4570 " Return number of failure reports for <node-id>.",
4571 "COUNTKEYSINSLOT <slot>",
4572 " Return the number of keys in <slot>.",
4573 "DELSLOTS <slot> [<slot> ...]",
4574 " Delete slots information from current node.",
4575 "DELSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
4576 " Delete slots information which are between <start-slot> and <end-slot> from current node.",
4577 "FAILOVER [FORCE|TAKEOVER]",
4578 " Promote current replica node to being a master.",
4579 "FORGET <node-id>",
4580 " Remove a node from the cluster.",
4581 "GETKEYSINSLOT <slot> <count>",
4582 " Return key names stored by current node in a slot.",
4583 "FLUSHSLOTS",
"    Delete the current node's own slots information.",
4585 "INFO",
4586 " Return information about the cluster.",
4587 "KEYSLOT <key>",
4588 " Return the hash slot for <key>.",
4589 "MEET <ip> <port> [<bus-port>]",
4590 " Connect nodes into a working cluster.",
4591 "MYID",
4592 " Return the node id.",
4593 "NODES",
4594 " Return cluster configuration seen by node. Output format:",
4595 " <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
4596 "REPLICATE <node-id>",
4597 " Configure current node as replica to <node-id>.",
4598 "RESET [HARD|SOFT]",
4599 " Reset current node (default: soft).",
4600 "SET-CONFIG-EPOCH <epoch>",
4601 " Set config epoch of current node.",
4602 "SETSLOT <slot> (IMPORTING|MIGRATING|STABLE|NODE <node-id>)",
4603 " Set slot state.",
4604 "REPLICAS <node-id>",
4605 " Return <node-id> replicas.",
4606 "SAVECONFIG",
4607 " Force saving cluster configuration on disk.",
4608 "SLOTS",
4609 " Return information about slots range mappings. Each range is made of:",
4610 " start, end, master and replicas IP addresses, ports and ids",
4611 NULL
4612 };
4613 addReplyHelp(c, help);
4614 } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
4615 /* CLUSTER MEET <ip> <port> [cport] */
4616 long long port, cport;
4617
4618 if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
4619 addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
4620 (char*)c->argv[3]->ptr);
4621 return;
4622 }
4623
4624 if (c->argc == 5) {
4625 if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
4626 addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
4627 (char*)c->argv[4]->ptr);
4628 return;
4629 }
4630 } else {
4631 cport = port + CLUSTER_PORT_INCR;
4632 }
4633
4634 if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
4635 errno == EINVAL)
4636 {
4637 addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
4638 (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
4639 } else {
4640 addReply(c,shared.ok);
4641 }
4642 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
4643 /* CLUSTER NODES */
        /* Report plaintext ports only if the cluster is TLS but the client
         * is known to be non-TLS. */
4646 int use_pport = (server.tls_cluster &&
4647 c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
4648 sds nodes = clusterGenNodesDescription(0, use_pport);
4649 addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
4650 sdsfree(nodes);
4651 } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
4652 /* CLUSTER MYID */
4653 addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
4654 } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
4655 /* CLUSTER SLOTS */
4656 clusterReplyMultiBulkSlots(c);
4657 } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
4658 /* CLUSTER FLUSHSLOTS */
4659 if (dictSize(server.db[0].dict) != 0) {
4660 addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
4661 return;
4662 }
4663 clusterDelNodeSlots(myself);
4664 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4665 addReply(c,shared.ok);
4666 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
4667 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
4668 {
4669 /* CLUSTER ADDSLOTS <slot> [slot] ... */
4670 /* CLUSTER DELSLOTS <slot> [slot] ... */
4671 int j, slot;
4672 unsigned char *slots = zmalloc(CLUSTER_SLOTS);
4673 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
4674
4675 memset(slots,0,CLUSTER_SLOTS);
        /* Check that all the arguments are parseable. */
4677 for (j = 2; j < c->argc; j++) {
4678 if ((slot = getSlotOrReply(c,c->argv[j])) == C_ERR) {
4679 zfree(slots);
4680 return;
4681 }
4682 }
4683 /* Check that the slots are not already busy. */
4684 for (j = 2; j < c->argc; j++) {
4685 slot = getSlotOrReply(c,c->argv[j]);
4686 if (checkSlotAssignmentsOrReply(c, slots, del, slot, slot) == C_ERR) {
4687 zfree(slots);
4688 return;
4689 }
4690 }
4691 clusterUpdateSlots(c, slots, del);
4692 zfree(slots);
4693 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4694 addReply(c,shared.ok);
4695 } else if ((!strcasecmp(c->argv[1]->ptr,"addslotsrange") ||
4696 !strcasecmp(c->argv[1]->ptr,"delslotsrange")) && c->argc >= 4) {
4697 if (c->argc % 2 == 1) {
4698 addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
4699 c->cmd->name);
4700 return;
4701 }
4702 /* CLUSTER ADDSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] */
4703 /* CLUSTER DELSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] */
4704 int j, startslot, endslot;
4705 unsigned char *slots = zmalloc(CLUSTER_SLOTS);
4706 int del = !strcasecmp(c->argv[1]->ptr,"delslotsrange");
4707
4708 memset(slots,0,CLUSTER_SLOTS);
4709 /* Check that all the arguments are parseable and that all the
4710 * slots are not already busy. */
4711 for (j = 2; j < c->argc; j += 2) {
4712 if ((startslot = getSlotOrReply(c,c->argv[j])) == C_ERR) {
4713 zfree(slots);
4714 return;
4715 }
4716 if ((endslot = getSlotOrReply(c,c->argv[j+1])) == C_ERR) {
4717 zfree(slots);
4718 return;
4719 }
4720 if (startslot > endslot) {
4721 addReplyErrorFormat(c,"start slot number %d is greater than end slot number %d", startslot, endslot);
4722 zfree(slots);
4723 return;
4724 }
4725
4726 if (checkSlotAssignmentsOrReply(c, slots, del, startslot, endslot) == C_ERR) {
4727 zfree(slots);
4728 return;
4729 }
4730 }
4731 clusterUpdateSlots(c, slots, del);
4732 zfree(slots);
4733 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4734 addReply(c,shared.ok);
4735 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
4736 /* SETSLOT 10 MIGRATING <node ID> */
4737 /* SETSLOT 10 IMPORTING <node ID> */
4738 /* SETSLOT 10 STABLE */
4739 /* SETSLOT 10 NODE <node ID> */
4740 int slot;
4741 clusterNode *n;
4742
4743 if (nodeIsSlave(myself)) {
4744 addReplyError(c,"Please use SETSLOT only with masters.");
4745 return;
4746 }
4747
4748 if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
4749
4750 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
4751 if (server.cluster->slots[slot] != myself) {
4752 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
4753 return;
4754 }
4755 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4756 addReplyErrorFormat(c,"I don't know about node %s",
4757 (char*)c->argv[4]->ptr);
4758 return;
4759 }
4760 if (nodeIsSlave(n)) {
4761 addReplyError(c,"Target node is not a master");
4762 return;
4763 }
4764 server.cluster->migrating_slots_to[slot] = n;
4765 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
4766 if (server.cluster->slots[slot] == myself) {
4767 addReplyErrorFormat(c,
4768 "I'm already the owner of hash slot %u",slot);
4769 return;
4770 }
4771 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4772 addReplyErrorFormat(c,"I don't know about node %s",
4773 (char*)c->argv[4]->ptr);
4774 return;
4775 }
4776 if (nodeIsSlave(n)) {
4777 addReplyError(c,"Target node is not a master");
4778 return;
4779 }
4780 server.cluster->importing_slots_from[slot] = n;
4781 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
4782 /* CLUSTER SETSLOT <SLOT> STABLE */
4783 server.cluster->importing_slots_from[slot] = NULL;
4784 server.cluster->migrating_slots_to[slot] = NULL;
4785 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
4786 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
4787 clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
4788
4789 if (!n) {
4790 addReplyErrorFormat(c,"Unknown node %s",
4791 (char*)c->argv[4]->ptr);
4792 return;
4793 }
            /* If this hash slot was served by 'myself' before the switch,
             * make sure there are no longer local keys for this hash slot. */
4796 if (server.cluster->slots[slot] == myself && n != myself) {
4797 if (countKeysInSlot(slot) != 0) {
4798 addReplyErrorFormat(c,
4799 "Can't assign hashslot %d to a different node "
4800 "while I still hold keys for this hash slot.", slot);
4801 return;
4802 }
4803 }
            /* If this slot is in migrating status but we have no keys
             * for it, assigning the slot to another node will clear
             * the migrating status. */
4807 if (countKeysInSlot(slot) == 0 &&
4808 server.cluster->migrating_slots_to[slot])
4809 server.cluster->migrating_slots_to[slot] = NULL;
4810
4811 clusterDelSlot(slot);
4812 clusterAddSlot(n,slot);
4813
4814 /* If this node was importing this slot, assigning the slot to
4815 * itself also clears the importing status. */
4816 if (n == myself &&
4817 server.cluster->importing_slots_from[slot])
4818 {
4819 /* This slot was manually migrated, set this node configEpoch
4820 * to a new epoch so that the new version can be propagated
4821 * by the cluster.
4822 *
4823 * Note that if this ever results in a collision with another
4824 * node getting the same configEpoch, for example because a
4825 * failover happens at the same time we close the slot, the
4826 * configEpoch collision resolution will fix it assigning
4827 * a different epoch to each node. */
4828 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
4829 serverLog(LL_WARNING,
4830 "configEpoch updated after importing slot %d", slot);
4831 }
4832 server.cluster->importing_slots_from[slot] = NULL;
4833 /* After importing this slot, let the other nodes know as
4834 * soon as possible. */
4835 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
4836 }
4837 } else {
4838 addReplyError(c,
4839 "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
4840 return;
4841 }
4842 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
4843 addReply(c,shared.ok);
4844 } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
4845 /* CLUSTER BUMPEPOCH */
4846 int retval = clusterBumpConfigEpochWithoutConsensus();
4847 sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
4848 (retval == C_OK) ? "BUMPED" : "STILL",
4849 (unsigned long long) myself->configEpoch);
4850 addReplySds(c,reply);
4851 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
4852 /* CLUSTER INFO */
4853 char *statestr[] = {"ok","fail","needhelp"};
4854 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4855 uint64_t myepoch;
4856 int j;
4857
4858 for (j = 0; j < CLUSTER_SLOTS; j++) {
4859 clusterNode *n = server.cluster->slots[j];
4860
4861 if (n == NULL) continue;
4862 slots_assigned++;
4863 if (nodeFailed(n)) {
4864 slots_fail++;
4865 } else if (nodeTimedOut(n)) {
4866 slots_pfail++;
4867 } else {
4868 slots_ok++;
4869 }
4870 }
4871
4872 myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
4873 myself->slaveof->configEpoch : myself->configEpoch;
4874
4875 sds info = sdscatprintf(sdsempty(),
4876 "cluster_state:%s\r\n"
4877 "cluster_slots_assigned:%d\r\n"
4878 "cluster_slots_ok:%d\r\n"
4879 "cluster_slots_pfail:%d\r\n"
4880 "cluster_slots_fail:%d\r\n"
4881 "cluster_known_nodes:%lu\r\n"
4882 "cluster_size:%d\r\n"
4883 "cluster_current_epoch:%llu\r\n"
4884 "cluster_my_epoch:%llu\r\n"
4885 , statestr[server.cluster->state],
4886 slots_assigned,
4887 slots_ok,
4888 slots_pfail,
4889 slots_fail,
4890 dictSize(server.cluster->nodes),
4891 server.cluster->size,
4892 (unsigned long long) server.cluster->currentEpoch,
4893 (unsigned long long) myepoch
4894 );
4895
4896 /* Show stats about messages sent and received. */
4897 long long tot_msg_sent = 0;
4898 long long tot_msg_received = 0;
4899
4900 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4901 if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
4902 tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
4903 info = sdscatprintf(info,
4904 "cluster_stats_messages_%s_sent:%lld\r\n",
4905 clusterGetMessageTypeString(i),
4906 server.cluster->stats_bus_messages_sent[i]);
4907 }
4908 info = sdscatprintf(info,
4909 "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
4910
4911 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4912 if (server.cluster->stats_bus_messages_received[i] == 0) continue;
4913 tot_msg_received += server.cluster->stats_bus_messages_received[i];
4914 info = sdscatprintf(info,
4915 "cluster_stats_messages_%s_received:%lld\r\n",
4916 clusterGetMessageTypeString(i),
4917 server.cluster->stats_bus_messages_received[i]);
4918 }
4919 info = sdscatprintf(info,
4920 "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
4921
4922 /* Produce the reply protocol. */
4923 addReplyVerbatim(c,info,sdslen(info),"txt");
4924 sdsfree(info);
4925 } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
4926 int retval = clusterSaveConfig(1);
4927
4928 if (retval == 0)
4929 addReply(c,shared.ok);
4930 else
4931 addReplyErrorFormat(c,"error saving the cluster node config: %s",
4932 strerror(errno));
4933 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
4934 /* CLUSTER KEYSLOT <key> */
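        /* The slot is CRC16(key) modulo 16384; if the key contains a
         * "{...}" hash tag, only the content of the tag is hashed
         * (see keyHashSlot() for the details). */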
4935 sds key = c->argv[2]->ptr;
4936
4937 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
4938 } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
4939 /* CLUSTER COUNTKEYSINSLOT <slot> */
4940 long long slot;
4941
4942 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4943 return;
4944 if (slot < 0 || slot >= CLUSTER_SLOTS) {
4945 addReplyError(c,"Invalid slot");
4946 return;
4947 }
4948 addReplyLongLong(c,countKeysInSlot(slot));
4949 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
4950 /* CLUSTER GETKEYSINSLOT <slot> <count> */
4951 long long maxkeys, slot;
4952
4953 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4954 return;
4955 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
4956 != C_OK)
4957 return;
4958 if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4959 addReplyError(c,"Invalid slot or number of keys");
4960 return;
4961 }
4962
4963 unsigned int keys_in_slot = countKeysInSlot(slot);
4964 unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
4965 addReplyArrayLen(c,numkeys);
4966 dictEntry *de = (*server.db->slots_to_keys).by_slot[slot].head;
4967 for (unsigned int j = 0; j < numkeys; j++) {
4968 serverAssert(de != NULL);
4969 sds sdskey = dictGetKey(de);
4970 addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
4971 de = dictEntryNextInSlot(de);
4972 }
4973 } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
4974 /* CLUSTER FORGET <NODE ID> */
4975 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4976
4977 if (!n) {
4978 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4979 return;
4980 } else if (n == myself) {
4981 addReplyError(c,"I tried hard but I can't forget myself...");
4982 return;
4983 } else if (nodeIsSlave(myself) && myself->slaveof == n) {
4984 addReplyError(c,"Can't forget my master!");
4985 return;
4986 }
4987 clusterBlacklistAddNode(n);
4988 clusterDelNode(n);
4989 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4990 CLUSTER_TODO_SAVE_CONFIG);
4991 addReply(c,shared.ok);
4992 } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
4993 /* CLUSTER REPLICATE <NODE ID> */
4994 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4995
4996 /* Lookup the specified node in our table. */
4997 if (!n) {
4998 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4999 return;
5000 }
5001
5002 /* I can't replicate myself. */
5003 if (n == myself) {
5004 addReplyError(c,"Can't replicate myself");
5005 return;
5006 }
5007
5008 /* Can't replicate a slave. */
5009 if (nodeIsSlave(n)) {
5010 addReplyError(c,"I can only replicate a master, not a replica.");
5011 return;
5012 }
5013
        /* If the instance is currently a master, it must have no assigned
         * slots and no keys in order to start replicating another node.
         * Slaves can switch to another master without issues. */
5017 if (nodeIsMaster(myself) &&
5018 (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
5019 addReplyError(c,
5020 "To set a master the node must be empty and "
5021 "without assigned slots.");
5022 return;
5023 }
5024
5025 /* Set the master. */
5026 clusterSetMaster(n);
5027 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
5028 addReply(c,shared.ok);
5029 } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
5030 !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
5031 /* CLUSTER SLAVES <NODE ID> */
5032 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
5033 int j;
5034
5035 /* Lookup the specified node in our table. */
5036 if (!n) {
5037 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
5038 return;
5039 }
5040
5041 if (nodeIsSlave(n)) {
5042 addReplyError(c,"The specified node is not a master");
5043 return;
5044 }
5045
5046 /* Use plaintext port if cluster is TLS but client is non-TLS. */
5047 int use_pport = (server.tls_cluster &&
5048 c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
5049 addReplyArrayLen(c,n->numslaves);
5050 for (j = 0; j < n->numslaves; j++) {
5051 sds ni = clusterGenNodeDescription(n->slaves[j], use_pport);
5052 addReplyBulkCString(c,ni);
5053 sdsfree(ni);
5054 }
5055 } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
5056 c->argc == 3)
5057 {
5058 /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
5059 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
5060
5061 if (!n) {
5062 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
5063 return;
5064 } else {
5065 addReplyLongLong(c,clusterNodeFailureReportsCount(n));
5066 }
5067 } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
5068 (c->argc == 2 || c->argc == 3))
5069 {
5070 /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
5071 int force = 0, takeover = 0;
5072
5073 if (c->argc == 3) {
5074 if (!strcasecmp(c->argv[2]->ptr,"force")) {
5075 force = 1;
5076 } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
5077 takeover = 1;
5078 force = 1; /* Takeover also implies force. */
5079 } else {
5080 addReplyErrorObject(c,shared.syntaxerr);
5081 return;
5082 }
5083 }
5084
5085 /* Check preconditions. */
5086 if (nodeIsMaster(myself)) {
5087 addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
5088 return;
5089 } else if (myself->slaveof == NULL) {
5090 addReplyError(c,"I'm a replica but my master is unknown to me");
5091 return;
5092 } else if (!force &&
5093 (nodeFailed(myself->slaveof) ||
5094 myself->slaveof->link == NULL))
5095 {
5096 addReplyError(c,"Master is down or failed, "
5097 "please use CLUSTER FAILOVER FORCE");
5098 return;
5099 }
5100 resetManualFailover();
5101 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
5102
5103 if (takeover) {
            /* A takeover does not perform any initial check. It just
             * generates a new configuration epoch for this node without
             * consensus, claims the master's slots, and broadcasts the new
             * configuration. */
5108 serverLog(LL_WARNING,"Taking over the master (user request).");
5109 clusterBumpConfigEpochWithoutConsensus();
5110 clusterFailoverReplaceYourMaster();
5111 } else if (force) {
            /* If this is a forced failover, we don't need to talk with our
             * master to agree about the offset. We just fail over and take
             * over its slots without coordination. */
5115 serverLog(LL_WARNING,"Forced failover user request accepted.");
5116 server.cluster->mf_can_start = 1;
5117 } else {
5118 serverLog(LL_WARNING,"Manual failover user request accepted.");
5119 clusterSendMFStart(myself->slaveof);
5120 }
5121 addReply(c,shared.ok);
5122 } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
5123 {
5124 /* CLUSTER SET-CONFIG-EPOCH <epoch>
5125 *
5126 * The user is allowed to set the config epoch only when a node is
5127 * totally fresh: no config epoch, no other known node, and so forth.
5128 * This happens at cluster creation time to start with a cluster where
         * every node has a different node ID, without having to rely on the
         * conflict resolution system, which is too slow when a big cluster is created. */
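        /* Typically issued once per node at cluster creation time, for
         * example (illustrative) CLUSTER SET-CONFIG-EPOCH 1 on the first
         * node, CLUSTER SET-CONFIG-EPOCH 2 on the second one, and so on. */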
5131 long long epoch;
5132
5133 if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
5134 return;
5135
5136 if (epoch < 0) {
5137 addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
5138 } else if (dictSize(server.cluster->nodes) > 1) {
5139 addReplyError(c,"The user can assign a config epoch only when the "
5140 "node does not know any other node.");
5141 } else if (myself->configEpoch != 0) {
5142 addReplyError(c,"Node config epoch is already non-zero");
5143 } else {
5144 myself->configEpoch = epoch;
5145 serverLog(LL_WARNING,
5146 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
5147 (unsigned long long) myself->configEpoch);
5148
5149 if (server.cluster->currentEpoch < (uint64_t)epoch)
5150 server.cluster->currentEpoch = epoch;
5151 /* No need to fsync the config here since in the unlucky event
5152 * of a failure to persist the config, the conflict resolution code
5153 * will assign a unique config to this node. */
5154 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
5155 CLUSTER_TODO_SAVE_CONFIG);
5156 addReply(c,shared.ok);
5157 }
5158 } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
5159 (c->argc == 2 || c->argc == 3))
5160 {
5161 /* CLUSTER RESET [SOFT|HARD] */
5162 int hard = 0;
5163
5164 /* Parse soft/hard argument. Default is soft. */
5165 if (c->argc == 3) {
5166 if (!strcasecmp(c->argv[2]->ptr,"hard")) {
5167 hard = 1;
5168 } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
5169 hard = 0;
5170 } else {
5171 addReplyErrorObject(c,shared.syntaxerr);
5172 return;
5173 }
5174 }
5175
        /* Slaves can be reset while containing data, but master nodes
         * must be empty. */
5178 if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
5179 addReplyError(c,"CLUSTER RESET can't be called with "
5180 "master nodes containing keys");
5181 return;
5182 }
5183 clusterReset(hard);
5184 addReply(c,shared.ok);
5185 } else {
5186 addReplySubcommandSyntaxError(c);
5187 return;
5188 }
5189 }
5190
5191 /* -----------------------------------------------------------------------------
5192 * DUMP, RESTORE and MIGRATE commands
5193 * -------------------------------------------------------------------------- */
5194
/* Generates a DUMP-format representation of the object 'o', adding it to the
 * io stream pointed to by 'payload'. This function can't fail. */
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
5198 unsigned char buf[2];
5199 uint64_t crc;
5200
    /* Serialize the object in an RDB-like format. It consists of an object type
     * byte followed by the serialized object. This is understood by RESTORE. */
5203 rioInitWithBuffer(payload,sdsempty());
5204 serverAssert(rdbSaveObjectType(payload,o));
5205 serverAssert(rdbSaveObject(payload,o,key,dbid));
5206
    /* Write the footer; this is what it looks like:
5208 * ----------------+---------------------+---------------+
5209 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
5210 * ----------------+---------------------+---------------+
5211 * RDB version and CRC are both in little endian.
5212 */
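    /* For example (illustrative, assuming RDB_VERSION is 9): the version is
     * emitted as the two bytes 0x09 0x00, followed by the 8 byte little endian
     * CRC64 computed over everything that precedes it. */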
5213
5214 /* RDB version */
5215 buf[0] = RDB_VERSION & 0xff;
5216 buf[1] = (RDB_VERSION >> 8) & 0xff;
5217 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
5218
5219 /* CRC64 */
5220 crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
5221 sdslen(payload->io.buffer.ptr));
5222 memrev64ifbe(&crc);
5223 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
5224 }
5225
5226 /* Verify that the RDB version of the dump payload matches the one of this Redis
5227 * instance and that the checksum is ok.
5228 * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
5229 * is returned. */
int verifyDumpPayload(unsigned char *p, size_t len) {
5231 unsigned char *footer;
5232 uint16_t rdbver;
5233 uint64_t crc;
5234
5235 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
5236 if (len < 10) return C_ERR;
5237 footer = p+(len-10);
5238
5239 /* Verify RDB version */
5240 rdbver = (footer[1] << 8) | footer[0];
5241 if (rdbver > RDB_VERSION) return C_ERR;
5242
5243 if (server.skip_checksum_validation)
5244 return C_OK;
5245
5246 /* Verify CRC64 */
5247 crc = crc64(0,p,len-8);
5248 memrev64ifbe(&crc);
5249 return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
5250 }
5251
5252 /* DUMP keyname
5253 * DUMP is actually not used by Redis Cluster but it is the obvious
5254 * complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
5256 robj *o;
5257 rio payload;
5258
5259 /* Check if the key is here. */
5260 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
5261 addReplyNull(c);
5262 return;
5263 }
5264
5265 /* Create the DUMP encoded representation. */
5266 createDumpPayload(&payload,o,c->argv[1],c->db->id);
5267
5268 /* Transfer to the client */
5269 addReplyBulkSds(c,payload.io.buffer.ptr);
5270 return;
5271 }
5272
5273 /* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
void restoreCommand(client *c) {
5275 long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
5276 rio payload;
5277 int j, type, replace = 0, absttl = 0;
5278 robj *obj;
5279
5280 /* Parse additional options */
5281 for (j = 4; j < c->argc; j++) {
5282 int additional = c->argc-j-1;
5283 if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5284 replace = 1;
5285 } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
5286 absttl = 1;
5287 } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
5288 lfu_freq == -1)
5289 {
5290 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
5291 != C_OK) return;
5292 if (lru_idle < 0) {
5293 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
5294 return;
5295 }
5296 lru_clock = LRU_CLOCK();
5297 j++; /* Consume additional arg. */
5298 } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
5299 lru_idle == -1)
5300 {
5301 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
5302 != C_OK) return;
5303 if (lfu_freq < 0 || lfu_freq > 255) {
5304 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
5305 return;
5306 }
5307 j++; /* Consume additional arg. */
5308 } else {
5309 addReplyErrorObject(c,shared.syntaxerr);
5310 return;
5311 }
5312 }
5313
5314 /* Make sure this key does not already exist here... */
5315 robj *key = c->argv[1];
5316 if (!replace && lookupKeyWrite(c->db,key) != NULL) {
5317 addReplyErrorObject(c,shared.busykeyerr);
5318 return;
5319 }
5320
5321 /* Check if the TTL value makes sense */
5322 if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
5323 return;
5324 } else if (ttl < 0) {
5325 addReplyError(c,"Invalid TTL value, must be >= 0");
5326 return;
5327 }
5328
5329 /* Verify RDB version and data checksum. */
5330 if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
5331 {
5332 addReplyError(c,"DUMP payload version or checksum are wrong");
5333 return;
5334 }
5335
5336 rioInitWithBuffer(&payload,c->argv[3]->ptr);
5337 if (((type = rdbLoadObjectType(&payload)) == -1) ||
5338 ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL))
5339 {
5340 addReplyError(c,"Bad data format");
5341 return;
5342 }
5343
5344 /* Remove the old key if needed. */
5345 int deleted = 0;
5346 if (replace)
5347 deleted = dbDelete(c->db,key);
5348
5349 if (ttl && !absttl) ttl+=mstime();
5350 if (ttl && checkAlreadyExpired(ttl)) {
5351 if (deleted) {
5352 rewriteClientCommandVector(c,2,shared.del,key);
5353 signalModifiedKey(c,c->db,key);
5354 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
5355 server.dirty++;
5356 }
5357 decrRefCount(obj);
5358 addReply(c, shared.ok);
5359 return;
5360 }
5361
5362 /* Create the key and set the TTL if any */
5363 dbAdd(c->db,key,obj);
5364 if (ttl) {
5365 setExpire(c,c->db,key,ttl);
5366 if (!absttl) {
5367 /* Propagate TTL as absolute timestamp */
5368 robj *ttl_obj = createStringObjectFromLongLong(ttl);
5369 rewriteClientCommandArgument(c,2,ttl_obj);
5370 decrRefCount(ttl_obj);
5371 rewriteClientCommandArgument(c,c->argc,shared.absttl);
5372 }
5373 }
5374 objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
5375 signalModifiedKey(c,c->db,key);
5376 notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
5377 addReply(c,shared.ok);
5378 server.dirty++;
5379 }
5380
/* MIGRATE socket cache implementation.
 *
 * We keep a map between host:port pairs and the TCP sockets we used to
 * connect to those instances recently.
 * These sockets are closed when the max number we cache is reached, and also
 * in serverCron() when they have been around for more than a few seconds. */
5387 #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
5388 #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
5389
5390 typedef struct migrateCachedSocket {
5391 connection *conn;
5392 long last_dbid;
5393 time_t last_use_time;
5394 } migrateCachedSocket;
5395
5396 /* Return a migrateCachedSocket containing a TCP socket connected with the
5397 * target instance, possibly returning a cached one.
5398 *
 * This function is responsible for sending errors to the client if a
 * connection can't be established. In this case NULL is returned.
5401 * Otherwise on success the socket is returned, and the caller should not
5402 * attempt to free it after usage.
5403 *
5404 * If the caller detects an error while using the socket, migrateCloseSocket()
5405 * should be called so that the connection will be created from scratch
5406 * the next time. */
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
5408 connection *conn;
5409 sds name = sdsempty();
5410 migrateCachedSocket *cs;
5411
5412 /* Check if we have an already cached socket for this ip:port pair. */
5413 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5414 name = sdscatlen(name,":",1);
5415 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5416 cs = dictFetchValue(server.migrate_cached_sockets,name);
5417 if (cs) {
5418 sdsfree(name);
5419 cs->last_use_time = server.unixtime;
5420 return cs;
5421 }
5422
5423 /* No cached socket, create one. */
5424 if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
5425 /* Too many items, drop one at random. */
5426 dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
5427 cs = dictGetVal(de);
5428 connClose(cs->conn);
5429 zfree(cs);
5430 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5431 }
5432
5433 /* Create the socket */
5434 conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
5435 if (connBlockingConnect(conn, c->argv[1]->ptr, atoi(c->argv[2]->ptr), timeout)
5436 != C_OK) {
5437 addReplyError(c,"-IOERR error or timeout connecting to the client");
5438 connClose(conn);
5439 sdsfree(name);
5440 return NULL;
5441 }
5442 connEnableTcpNoDelay(conn);
5443
5444 /* Add to the cache and return it to the caller. */
5445 cs = zmalloc(sizeof(*cs));
5446 cs->conn = conn;
5447
5448 cs->last_dbid = -1;
5449 cs->last_use_time = server.unixtime;
5450 dictAdd(server.migrate_cached_sockets,name,cs);
5451 return cs;
5452 }
5453
5454 /* Free a migrate cached connection. */
void migrateCloseSocket(robj *host, robj *port) {
5456 sds name = sdsempty();
5457 migrateCachedSocket *cs;
5458
5459 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5460 name = sdscatlen(name,":",1);
5461 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5462 cs = dictFetchValue(server.migrate_cached_sockets,name);
5463 if (!cs) {
5464 sdsfree(name);
5465 return;
5466 }
5467
5468 connClose(cs->conn);
5469 zfree(cs);
5470 dictDelete(server.migrate_cached_sockets,name);
5471 sdsfree(name);
5472 }
5473
void migrateCloseTimedoutSockets(void) {
5475 dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
5476 dictEntry *de;
5477
5478 while((de = dictNext(di)) != NULL) {
5479 migrateCachedSocket *cs = dictGetVal(de);
5480
5481 if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
5482 connClose(cs->conn);
5483 zfree(cs);
5484 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5485 }
5486 }
5487 dictReleaseIterator(di);
5488 }
5489
5490 /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
5491 * AUTH2 username password]
5492 *
 * Or, in the multiple keys form:
5494 *
5495 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
5496 * AUTH2 username password] KEYS key1 key2 ... keyN */
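/* Illustrative invocations (host, port and key names are made up):
 *
 *   MIGRATE 192.168.1.34 6379 mykey 0 5000 REPLACE
 *   MIGRATE 192.168.1.34 6379 "" 0 5000 KEYS key1 key2 key3
 */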
void migrateCommand(client *c) {
5498 migrateCachedSocket *cs;
5499 int copy = 0, replace = 0, j;
5500 char *username = NULL;
5501 char *password = NULL;
5502 long timeout;
5503 long dbid;
5504 robj **ov = NULL; /* Objects to migrate. */
5505 robj **kv = NULL; /* Key names. */
5506 robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
5507 rio cmd, payload;
5508 int may_retry = 1;
5509 int write_error = 0;
5510 int argv_rewritten = 0;
5511
5512 /* To support the KEYS option we need the following additional state. */
5513 int first_key = 3; /* Argument index of the first key. */
5514 int num_keys = 1; /* By default only migrate the 'key' argument. */
5515
5516 /* Parse additional options */
5517 for (j = 6; j < c->argc; j++) {
5518 int moreargs = (c->argc-1) - j;
5519 if (!strcasecmp(c->argv[j]->ptr,"copy")) {
5520 copy = 1;
5521 } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5522 replace = 1;
5523 } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
5524 if (!moreargs) {
5525 addReplyErrorObject(c,shared.syntaxerr);
5526 return;
5527 }
5528 j++;
5529 password = c->argv[j]->ptr;
5530 redactClientCommandArgument(c,j);
5531 } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
5532 if (moreargs < 2) {
5533 addReplyErrorObject(c,shared.syntaxerr);
5534 return;
5535 }
5536 username = c->argv[++j]->ptr;
5537 redactClientCommandArgument(c,j);
5538 password = c->argv[++j]->ptr;
5539 redactClientCommandArgument(c,j);
5540 } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
5541 if (sdslen(c->argv[3]->ptr) != 0) {
5542 addReplyError(c,
5543 "When using MIGRATE KEYS option, the key argument"
5544 " must be set to the empty string");
5545 return;
5546 }
5547 first_key = j+1;
5548 num_keys = c->argc - j - 1;
5549 break; /* All the remaining args are keys. */
5550 } else {
5551 addReplyErrorObject(c,shared.syntaxerr);
5552 return;
5553 }
5554 }
5555
5556 /* Sanity check */
5557 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
5558 getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
5559 {
5560 return;
5561 }
5562 if (timeout <= 0) timeout = 1000;
5563
    /* Check if the keys are here. If at least one key is to migrate, do it;
     * otherwise, if all the keys are missing, reply with "NOKEY" to signal
     * the caller there was nothing to migrate. We don't return an error in
     * this case, since often this is due to a normal condition like the key
     * expiring in the meantime. */
5569 ov = zrealloc(ov,sizeof(robj*)*num_keys);
5570 kv = zrealloc(kv,sizeof(robj*)*num_keys);
5571 int oi = 0;
5572
5573 for (j = 0; j < num_keys; j++) {
5574 if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
5575 kv[oi] = c->argv[first_key+j];
5576 oi++;
5577 }
5578 }
5579 num_keys = oi;
5580 if (num_keys == 0) {
5581 zfree(ov); zfree(kv);
5582 addReplySds(c,sdsnew("+NOKEY\r\n"));
5583 return;
5584 }
5585
5586 try_again:
5587 write_error = 0;
5588
5589 /* Connect */
5590 cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
5591 if (cs == NULL) {
5592 zfree(ov); zfree(kv);
5593 return; /* error sent to the client by migrateGetSocket() */
5594 }
5595
5596 rioInitWithBuffer(&cmd,sdsempty());
5597
5598 /* Authentication */
5599 if (password) {
5600 int arity = username ? 3 : 2;
5601 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
5602 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
5603 if (username) {
5604 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
5605 sdslen(username)));
5606 }
5607 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
5608 sdslen(password)));
5609 }
5610
5611 /* Send the SELECT command if the current DB is not already selected. */
5612 int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
5613 if (select) {
5614 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5615 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
5616 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
5617 }
5618
    int non_expired = 0; /* Number of keys that we'll find non expired.
                             Note that serializing large keys may take some time,
                             so certain keys that were found non expired by the
                             lookupKey() function may be expired later. */
5623
5624 /* Create RESTORE payload and generate the protocol to call the command. */
5625 for (j = 0; j < num_keys; j++) {
5626 long long ttl = 0;
5627 long long expireat = getExpire(c->db,kv[j]);
5628
5629 if (expireat != -1) {
5630 ttl = expireat-mstime();
5631 if (ttl < 0) {
5632 continue;
5633 }
5634 if (ttl < 1) ttl = 1;
5635 }
5636
5637 /* Relocate valid (non expired) keys and values into the array in successive
5638 * positions to remove holes created by the keys that were present
5639 * in the first lookup but are now expired after the second lookup. */
5640 ov[non_expired] = ov[j];
5641 kv[non_expired++] = kv[j];
5642
5643 serverAssertWithInfo(c,NULL,
5644 rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
5645
5646 if (server.cluster_enabled)
5647 serverAssertWithInfo(c,NULL,
5648 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
5649 else
5650 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
5651 serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
5652 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
5653 sdslen(kv[j]->ptr)));
5654 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
5655
5656 /* Emit the payload argument, that is the serialized object using
5657 * the DUMP format. */
5658 createDumpPayload(&payload,ov[j],kv[j],dbid);
5659 serverAssertWithInfo(c,NULL,
5660 rioWriteBulkString(&cmd,payload.io.buffer.ptr,
5661 sdslen(payload.io.buffer.ptr)));
5662 sdsfree(payload.io.buffer.ptr);
5663
5664 /* Add the REPLACE option to the RESTORE command if it was specified
5665 * as a MIGRATE option. */
5666 if (replace)
5667 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
5668 }
5669
5670 /* Fix the actual number of keys we are migrating. */
5671 num_keys = non_expired;
5672
5673 /* Transfer the query to the other node in 64K chunks. */
5674 errno = 0;
5675 {
5676 sds buf = cmd.io.buffer.ptr;
5677 size_t pos = 0, towrite;
5678 int nwritten = 0;
5679
5680 while ((towrite = sdslen(buf)-pos) > 0) {
5681 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5682 nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
5683 if (nwritten != (signed)towrite) {
5684 write_error = 1;
5685 goto socket_err;
5686 }
5687 pos += nwritten;
5688 }
5689 }
5690
5691 char buf0[1024]; /* Auth reply. */
5692 char buf1[1024]; /* Select reply. */
5693 char buf2[1024]; /* Restore reply. */
5694
5695 /* Read the AUTH reply if needed. */
5696 if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
5697 goto socket_err;
5698
5699 /* Read the SELECT reply if needed. */
5700 if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
5701 goto socket_err;
5702
5703 /* Read the RESTORE replies. */
5704 int error_from_target = 0;
5705 int socket_error = 0;
5706 int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5707
5708 /* Allocate the new argument vector that will replace the current command,
5709 * to propagate the MIGRATE as a DEL command (if no COPY option was given).
5710 * We allocate num_keys+1 because the additional argument is for "DEL"
5711 * command name itself. */
5712 if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5713
5714 for (j = 0; j < num_keys; j++) {
5715 if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
5716 socket_error = 1;
5717 break;
5718 }
5719 if ((password && buf0[0] == '-') ||
5720 (select && buf1[0] == '-') ||
5721 buf2[0] == '-')
5722 {
5723 /* On error assume that last_dbid is no longer valid. */
5724 if (!error_from_target) {
5725 cs->last_dbid = -1;
5726 char *errbuf;
5727 if (password && buf0[0] == '-') errbuf = buf0;
5728 else if (select && buf1[0] == '-') errbuf = buf1;
5729 else errbuf = buf2;
5730
5731 error_from_target = 1;
5732 addReplyErrorFormat(c,"Target instance replied with error: %s",
5733 errbuf+1);
5734 }
5735 } else {
5736 if (!copy) {
5737 /* No COPY option: remove the local key, signal the change. */
5738 dbDelete(c->db,kv[j]);
5739 signalModifiedKey(c,c->db,kv[j]);
5740 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
5741 server.dirty++;
5742
5743 /* Populate the argument vector to replace the old one. */
5744 newargv[del_idx++] = kv[j];
5745 incrRefCount(kv[j]);
5746 }
5747 }
5748 }
5749
5750 /* On socket error, if we want to retry, do it now before rewriting the
5751 * command vector. We only retry if we are sure nothing was processed
5752 * and we failed to read the first reply (j == 0 test). */
5753 if (!error_from_target && socket_error && j == 0 && may_retry &&
5754 errno != ETIMEDOUT)
5755 {
5756 goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5757 }
5758
    /* On socket errors, close the migration socket now, while we still have
     * the original host/port in the ARGV. Later the original command may be
     * rewritten to DEL and it will be too late. */
5762 if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5763
5764 if (!copy) {
5765 /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5766 * this only for the keys for which we received an acknowledgement
5767 * from the receiving Redis server, by using the del_idx index. */
5768 if (del_idx > 1) {
5769 newargv[0] = createStringObject("DEL",3);
5770 /* Note that the following call takes ownership of newargv. */
5771 replaceClientCommandVector(c,del_idx,newargv);
5772 argv_rewritten = 1;
5773 } else {
5774 /* No key transfer acknowledged, no need to rewrite as DEL. */
5775 zfree(newargv);
5776 }
5777 newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5778 }
5779
5780 /* If we are here and a socket error happened, we don't want to retry.
5781 * Just signal the problem to the client, but only do it if we did not
5782 * already queue a different error reported by the destination server. */
5783 if (!error_from_target && socket_error) {
5784 may_retry = 0;
5785 goto socket_err;
5786 }
5787
5788 if (!error_from_target) {
5789 /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5790 * avoid SELECT the next time if the target DB is the same. Reply +OK.
5791 *
5792 * Note: If we reached this point, even if socket_error is true
         * still the SELECT command succeeded (otherwise the code jumps to
         * the socket_err label). */
5795 cs->last_dbid = dbid;
5796 addReply(c,shared.ok);
5797 } else {
        /* On error we already sent it in the for loop above, and set
         * last_dbid to -1 to force a SELECT the next time. */
5800 }
5801
5802 sdsfree(cmd.io.buffer.ptr);
5803 zfree(ov); zfree(kv); zfree(newargv);
5804 return;
5805
5806 /* On socket errors we try to close the cached socket and try again.
5807 * It is very common for the cached socket to get closed; if simply
5808 * reopening it works, there is no reason to report the error to the caller. */
5809 socket_err:
5810 /* Cleanup we want to perform in both the retry and no retry case.
5811 * Note: Closing the migrate socket will also force SELECT next time. */
5812 sdsfree(cmd.io.buffer.ptr);
5813
5814 /* If the command was rewritten as DEL and there was a socket error,
5815 * we already closed the socket earlier. While migrateCloseSocket()
5816 * is idempotent, the host/port arguments are now gone, so don't do it
5817 * again. */
5818 if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5819 zfree(newargv);
5820 newargv = NULL; /* This will get reallocated on retry. */
5821
5822 /* Retry only if it's not a timeout and we never attempted a retry
5823 * (or the code jumping here did not set may_retry to zero). */
5824 if (errno != ETIMEDOUT && may_retry) {
5825 may_retry = 0;
5826 goto try_again;
5827 }
5828
5829 /* Cleanup we want to do if no retry is attempted. */
5830 zfree(ov); zfree(kv);
5831 addReplyErrorSds(c, sdscatprintf(sdsempty(),
5832 "-IOERR error or timeout %s to target instance",
5833 write_error ? "writing" : "reading"));
5834 return;
5835 }
5836
5837 /* -----------------------------------------------------------------------------
5838 * Cluster functions related to serving / redirecting clients
5839 * -------------------------------------------------------------------------- */
5840
5841 /* The ASKING command is required after a -ASK redirection.
5842 * The client should issue ASKING before actually sending the command to
5843 * the target instance. See the Redis Cluster specification for more
5844 * information. */
5845 void askingCommand(client *c) {
5846 if (server.cluster_enabled == 0) {
5847 addReplyError(c,"This instance has cluster support disabled");
5848 return;
5849 }
5850 c->flags |= CLIENT_ASKING;
5851 addReply(c,shared.ok);
5852 }
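/* Illustrative client-side flow after a -ASK redirection (a protocol sketch;
 * "foo" hashes to slot 12182):
 *
 *   GET foo     -> -ASK 12182 10.0.0.2:6379     (reply from the old owner)
 *   (connect to 10.0.0.2:6379)
 *   ASKING      -> +OK
 *   GET foo     -> actual reply
 *
 * The CLIENT_ASKING flag set above is checked by getNodeByQuery() when the
 * slot is in importing state, and is valid for a single command. */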
5853
5854 /* The READONLY command is used by clients to enter the read-only mode.
5855 * In this mode slaves will not redirect clients as long as clients use
5856 * read-only commands to access keys that are served by the slave's master. */
5857 void readonlyCommand(client *c) {
5858 if (server.cluster_enabled == 0) {
5859 addReplyError(c,"This instance has cluster support disabled");
5860 return;
5861 }
5862 c->flags |= CLIENT_READONLY;
5863 addReply(c,shared.ok);
5864 }
5865
5866 /* The READWRITE command just clears the READONLY command state. */
5867 void readwriteCommand(client *c) {
5868 if (server.cluster_enabled == 0) {
5869 addReplyError(c,"This instance has cluster support disabled");
5870 return;
5871 }
5872 c->flags &= ~CLIENT_READONLY;
5873 addReply(c,shared.ok);
5874 }
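/* Illustrative replica-read flow (a sketch):
 *
 *   READONLY    -> +OK
 *   GET foo     -> served locally if this replica's master owns the slot
 *                  of "foo", instead of a -MOVED redirection
 *   READWRITE   -> +OK, later commands are redirected normally again
 *
 * The CLIENT_READONLY flag is honored by getNodeByQuery() and by
 * clusterRedirectBlockedClientIfNeeded() below. */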
5875
5876 /* Return the pointer to the cluster node that is able to serve the command.
5877 * For the function to succeed the command should only target either:
5878 *
5879 * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
5880 * 2) Multiple keys in the same hash slot, while the slot is stable (no
5881 * resharding in progress).
5882 *
5883 * On success the function returns the node that is able to serve the request.
5884 * If the node is not 'myself' a redirection must be performed. The kind of
5885 * redirection is specified by setting the integer passed by reference
5886 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5887 * CLUSTER_REDIR_MOVED.
5888 *
5889 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5890 *
5891 * If the command fails NULL is returned, and the reason of the failure is
5892 * provided via 'error_code', which will be set to:
5893 *
5894 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5895 * don't belong to the same hash slot.
5896 *
5897 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5898 * belonging to the same slot, but the slot is not stable (in migration or
5899 * importing state, likely because a resharding is in progress).
5900 *
5901 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5902 * not bound to any node. In this case the cluster global state should be
5903 * already "down" but it is fragile to rely on the update of the global state,
5904 * so we also handle it here.
5905 *
5906 * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
5907 * down but the user attempts to execute a command that addresses one or more keys. */
5908 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5909 clusterNode *n = NULL;
5910 robj *firstkey = NULL;
5911 int multiple_keys = 0;
5912 multiState *ms, _ms;
5913 multiCmd mc;
5914 int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5915
5916 /* Allow any key to be set if a module disabled cluster redirections. */
5917 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
5918 return myself;
5919
5920 /* Set error code optimistically for the base case. */
5921 if (error_code) *error_code = CLUSTER_REDIR_NONE;
5922
5923 /* Modules can turn off Redis Cluster redirection: this is useful
5924 * when writing a module that implements a completely different
5925 * distributed system. */
5926
5927 /* We handle all the cases as if they were EXEC commands, so we have
5928 * a common code path for everything */
5929 if (cmd->proc == execCommand) {
5930 /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5931 * error. */
5932 if (!(c->flags & CLIENT_MULTI)) return myself;
5933 ms = &c->mstate;
5934 } else {
5935 /* If the client is not in MULTI/EXEC state, create a fake multi
5936 * state structure containing just this command, so that the code
5937 * below has a single code path to handle. */
5938 ms = &_ms;
5939 _ms.commands = &mc;
5940 _ms.count = 1;
5941 mc.argv = argv;
5942 mc.argc = argc;
5943 mc.cmd = cmd;
5944 }
5945
5946 /* Check that all the keys are in the same hash slot, and obtain this
5947 * slot and the node associated. */
5948 for (i = 0; i < ms->count; i++) {
5949 struct redisCommand *mcmd;
5950 robj **margv;
5951 int margc, *keyindex, numkeys, j;
5952
5953 mcmd = ms->commands[i].cmd;
5954 margc = ms->commands[i].argc;
5955 margv = ms->commands[i].argv;
5956
5957 getKeysResult result = GETKEYS_RESULT_INIT;
5958 numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
5959 keyindex = result.keys;
5960
5961 for (j = 0; j < numkeys; j++) {
5962 robj *thiskey = margv[keyindex[j]];
5963 int thisslot = keyHashSlot((char*)thiskey->ptr,
5964 sdslen(thiskey->ptr));
5965
5966 if (firstkey == NULL) {
5967 /* This is the first key we see. Check what is the slot
5968 * and node. */
5969 firstkey = thiskey;
5970 slot = thisslot;
5971 n = server.cluster->slots[slot];
5972
5973 /* Error: If a slot is not served, we are in "cluster down"
5974 * state. However the state is yet to be updated, so this was
5975 * not trapped earlier in processCommand(). Report the same
5976 * error to the client. */
5977 if (n == NULL) {
5978 getKeysFreeResult(&result);
5979 if (error_code)
5980 *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5981 return NULL;
5982 }
5983
5984 /* If we are migrating or importing this slot, we need to check
5985 * if we have all the keys in the request (the only way we
5986 * can safely serve the request, otherwise we return a TRYAGAIN
5987 * error). To do so we set the importing/migrating state and
5988 * increment a counter for every missing key. */
5989 if (n == myself &&
5990 server.cluster->migrating_slots_to[slot] != NULL)
5991 {
5992 migrating_slot = 1;
5993 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5994 importing_slot = 1;
5995 }
5996 } else {
5997 /* If it is not the first key, make sure it is exactly
5998 * the same key as the first we saw. */
5999 if (!equalStringObjects(firstkey,thiskey)) {
6000 if (slot != thisslot) {
6001 /* Error: multiple keys from different slots. */
6002 getKeysFreeResult(&result);
6003 if (error_code)
6004 *error_code = CLUSTER_REDIR_CROSS_SLOT;
6005 return NULL;
6006 } else {
6007 /* Flag this request as one with multiple different
6008 * keys. */
6009 multiple_keys = 1;
6010 }
6011 }
6012 }
6013
6014 /* Migrating / Importing slot? Count keys we don't have. */
6015 int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY;
6016 if ((migrating_slot || importing_slot) &&
6017 lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
6018 {
6019 missing_keys++;
6020 }
6021 }
6022 getKeysFreeResult(&result);
6023 }
6024
6025 /* No key at all in the command? Then we can serve the request
6026 * without redirections or errors in all cases. */
6027 if (n == NULL) return myself;
6028
6029 /* Cluster is globally down but we got keys? We only serve the request
6030 * if it is a read command and when allow_reads_when_down is enabled. */
6031 if (server.cluster->state != CLUSTER_OK) {
6032 if (!server.cluster_allow_reads_when_down) {
6033 /* The cluster is configured to block commands when the
6034 * cluster is down. */
6035 if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
6036 return NULL;
6037 } else if (cmd->flags & CMD_WRITE) {
6038 /* The cluster allows only read commands while down, but this is a write. */
6039 if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
6040 return NULL;
6041 } else {
6042 /* Fall through and allow the command to be executed:
6043 * this happens when server.cluster_allow_reads_when_down is
6044 * true and the command is not a write command */
6045 }
6046 }
6047
6048 /* Return the hashslot by reference. */
6049 if (hashslot) *hashslot = slot;
6050
6051 /* MIGRATE always works in the context of the local node if the slot
6052 * is open (migrating or importing state). We need to be able to freely
6053 * move keys among instances in this case. */
6054 if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
6055 return myself;
6056
6057 /* If we don't have all the keys and we are migrating the slot, send
6058 * an ASK redirection. */
6059 if (migrating_slot && missing_keys) {
6060 if (error_code) *error_code = CLUSTER_REDIR_ASK;
6061 return server.cluster->migrating_slots_to[slot];
6062 }
6063
6064 /* If we are receiving the slot, and the client correctly flagged the
6065 * request as "ASKING", we can serve the request. However if the request
6066 * involves multiple keys and we don't have them all, the only option is
6067 * to send a TRYAGAIN error. */
6068 if (importing_slot &&
6069 (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
6070 {
6071 if (multiple_keys && missing_keys) {
6072 if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
6073 return NULL;
6074 } else {
6075 return myself;
6076 }
6077 }
6078
6079 /* Handle the read-only client case reading from a slave: if this
6080 * node is a slave and the request is about a hash slot our master
6081 * is serving, we can reply without redirection. */
6082 int is_write_command = (c->cmd->flags & CMD_WRITE) ||
6083 (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
6084 if (c->flags & CLIENT_READONLY &&
6085 !is_write_command &&
6086 nodeIsSlave(myself) &&
6087 myself->slaveof == n)
6088 {
6089 return myself;
6090 }
6091
6092 /* Base case: just return the right node. However if this node is not
6093 * myself, set error_code to MOVED since we need to issue a redirection. */
6094 if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
6095 return n;
6096 }
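/* A minimal sketch of how a caller (such as processCommand() in server.c)
 * is expected to combine getNodeByQuery() with clusterRedirectClient():
 *
 *   int hashslot, error_code;
 *   clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc,
 *                                   &hashslot, &error_code);
 *   if (n == NULL || n != myself) {
 *       clusterRedirectClient(c, n, hashslot, error_code);
 *       return;  // don't execute the command locally
 *   }
 *
 * The actual caller lives outside this file; the snippet only documents the
 * contract of the two functions. */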
6097
6098 /* Send the client the right redirection code, according to error_code
6099 * that should be set to one of CLUSTER_REDIR_* macros.
6100 *
6101 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
6102 * are used, then the node 'n' should not be NULL, but should be the
6103 * node we want to mention in the redirection. Moreover hashslot should
6104 * be set to the hash slot that caused the redirection. */
6105 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
6106 if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
6107 addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
6108 } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
6109 /* The request spans multiple keys in the same slot,
6110 * but the slot is not "stable" currently as there is
6111 * a migration or import in progress. */
6112 addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
6113 } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
6114 addReplyError(c,"-CLUSTERDOWN The cluster is down");
6115 } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
6116 addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
6117 } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
6118 addReplyError(c,"-CLUSTERDOWN Hash slot not served");
6119 } else if (error_code == CLUSTER_REDIR_MOVED ||
6120 error_code == CLUSTER_REDIR_ASK)
6121 {
6122 /* Redirect to IP:port. Include plaintext port if cluster is TLS but
6123 * client is non-TLS. */
6124 int use_pport = (server.tls_cluster &&
6125 c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
6126 int port = use_pport && n->pport ? n->pport : n->port;
6127 addReplyErrorSds(c,sdscatprintf(sdsempty(),
6128 "-%s %d %s:%d",
6129 (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
6130 hashslot, n->ip, port));
6131 } else {
6132 serverPanic("getNodeByQuery() unknown error.");
6133 }
6134 }
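/* For reference, the redirections generated above look like this on the wire
 * (illustrative slot and address):
 *
 *   -MOVED 3999 127.0.0.1:6381
 *   -ASK 3999 127.0.0.1:6381
 *
 * i.e. the hash slot followed by ip:port of the node the client should
 * contact, using the plaintext port when use_pport applies. */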
6135
6136 /* This function is called by the function processing clients incrementally
6137 * to detect timeouts, in order to handle the following case:
6138 *
6139 * 1) A client blocks with BLPOP or similar blocking operation.
6140 * 2) The master migrates the hash slot elsewhere or turns into a slave.
6141 * 3) The client may remain blocked forever (or up to the max timeout time)
6142 * waiting for a key change that will never happen.
6143 *
6144 * If the client is found to be blocked on a hash slot this node no
6145 * longer handles, the client is sent a redirection error, and the function
6146 * returns 1. Otherwise 0 is returned and no operation is performed. */
6147 int clusterRedirectBlockedClientIfNeeded(client *c) {
6148 if (c->flags & CLIENT_BLOCKED &&
6149 (c->btype == BLOCKED_LIST ||
6150 c->btype == BLOCKED_ZSET ||
6151 c->btype == BLOCKED_STREAM ||
6152 c->btype == BLOCKED_MODULE))
6153 {
6154 dictEntry *de;
6155 dictIterator *di;
6156
6157 /* If the cluster is down, unblock the client with the right error.
6158 * If the cluster is configured to allow reads while down, we still
6159 * want to emit this error, since unblocking requires a write that
6160 * may never come. */
6161 if (server.cluster->state == CLUSTER_FAIL) {
6162 clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
6163 return 1;
6164 }
6165
6166 /* If the client is blocked on a module, but not on a specific key,
6167 * don't unblock it (except for the CLUSTER_FAIL case above). */
6168 if (c->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
6169 return 0;
6170
6171 /* All keys must belong to the same slot, so check first key only. */
6172 di = dictGetIterator(c->bpop.keys);
6173 if ((de = dictNext(di)) != NULL) {
6174 robj *key = dictGetKey(de);
6175 int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
6176 clusterNode *node = server.cluster->slots[slot];
6177
6178 /* If the client is read-only and attempting to access a key that our
6179 * replica can handle, allow it. */
6180 if ((c->flags & CLIENT_READONLY) &&
6181 !(c->lastcmd->flags & CMD_WRITE) &&
6182 nodeIsSlave(myself) && myself->slaveof == node)
6183 {
6184 node = myself;
6185 }
6186
6187 /* We send an error and unblock the client if:
6188 * 1) The slot is unassigned, emitting a cluster down error.
6189 * 2) The slot is not handled by this node, nor being imported. */
6190 if (node != myself &&
6191 server.cluster->importing_slots_from[slot] == NULL)
6192 {
6193 if (node == NULL) {
6194 clusterRedirectClient(c,NULL,0,
6195 CLUSTER_REDIR_DOWN_UNBOUND);
6196 } else {
6197 clusterRedirectClient(c,node,slot,
6198 CLUSTER_REDIR_MOVED);
6199 }
6200 dictReleaseIterator(di);
6201 return 1;
6202 }
6203 }
6204 dictReleaseIterator(di);
6205 }
6206 return 0;
6207 }
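/* Illustrative scenario for the function above (a sketch): a client blocked
 * on BLPOP mylist 0 while this node serves the slot of "mylist". If that
 * slot is later migrated away, the periodic timeout processing invokes this
 * function, which replies with something like
 *
 *   -MOVED <slot of mylist> 10.0.0.3:6379
 *
 * and unblocks the client so it can retry against the new owner. */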
6208
6209 /* Slot to Key API. This is used by Redis Cluster in order to quickly
6210 * obtain the keys that belong to a specified hash slot. This is useful
6211 * while rehashing the cluster and in other conditions when we need to
6212 * know whether we have keys for a given hash slot. */
6213
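/* A rough sketch of the data structure manipulated below (the actual type
 * definitions live outside this excerpt; the field names follow the
 * accessors used here):
 *
 *   db->slots_to_keys->by_slot[slot]  ->  { count, head }
 *                                                   |
 *                                                   v
 *        dictEntry <-> dictEntry <-> dictEntry   (linked via
 *                                                 dictEntryPrevInSlot /
 *                                                 dictEntryNextInSlot)
 *
 * Every main-dict entry is also linked into the list of its key's hash slot,
 * so listing the keys of a slot is O(keys in that slot) and counting them is
 * O(1), without scanning the whole keyspace. */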
6214 void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
6215 sds key = entry->key;
6216 unsigned int hashslot = keyHashSlot(key, sdslen(key));
6217 slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
6218 slot_to_keys->count++;
6219
6220 /* Insert entry before the first element in the list. */
6221 dictEntry *first = slot_to_keys->head;
6222 dictEntryNextInSlot(entry) = first;
6223 if (first != NULL) {
6224 serverAssert(dictEntryPrevInSlot(first) == NULL);
6225 dictEntryPrevInSlot(first) = entry;
6226 }
6227 serverAssert(dictEntryPrevInSlot(entry) == NULL);
6228 slot_to_keys->head = entry;
6229 }
6230
6231 void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
6232 sds key = entry->key;
6233 unsigned int hashslot = keyHashSlot(key, sdslen(key));
6234 slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
6235 slot_to_keys->count--;
6236
6237 /* Connect previous and next entries to each other. */
6238 dictEntry *next = dictEntryNextInSlot(entry);
6239 dictEntry *prev = dictEntryPrevInSlot(entry);
6240 if (next != NULL) {
6241 dictEntryPrevInSlot(next) = prev;
6242 }
6243 if (prev != NULL) {
6244 dictEntryNextInSlot(prev) = next;
6245 } else {
6246 /* The removed entry was the first in the list. */
6247 serverAssert(slot_to_keys->head == entry);
6248 slot_to_keys->head = next;
6249 }
6250 }
6251
6252 /* Updates neighbour entries when an entry has been replaced (e.g. reallocated
6253 * during active defrag). */
6254 void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) {
6255 dictEntry *next = dictEntryNextInSlot(entry);
6256 dictEntry *prev = dictEntryPrevInSlot(entry);
6257 if (next != NULL) {
6258 dictEntryPrevInSlot(next) = entry;
6259 }
6260 if (prev != NULL) {
6261 dictEntryNextInSlot(prev) = entry;
6262 } else {
6263 /* The replaced entry was the first in the list. */
6264 sds key = entry->key;
6265 unsigned int hashslot = keyHashSlot(key, sdslen(key));
6266 slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
6267 slot_to_keys->head = entry;
6268 }
6269 }
6270
6271 /* Initialize slots-keys map of given db. */
6272 void slotToKeyInit(redisDb *db) {
6273 db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping));
6274 }
6275
6276 /* Empty slots-keys map of given db. */
6277 void slotToKeyFlush(redisDb *db) {
6278 memset(db->slots_to_keys, 0,
6279 sizeof(clusterSlotToKeyMapping));
6280 }
6281
6282 /* Free slots-keys map of given db. */
6283 void slotToKeyDestroy(redisDb *db) {
6284 zfree(db->slots_to_keys);
6285 db->slots_to_keys = NULL;
6286 }
6287
6288 /* Remove all the keys in the specified hash slot.
6289 * The number of removed items is returned. */
6290 unsigned int delKeysInSlot(unsigned int hashslot) {
6291 unsigned int j = 0;
6292 dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head;
6293 while (de != NULL) {
6294 sds sdskey = dictGetKey(de);
6295 de = dictEntryNextInSlot(de);
6296 robj *key = createStringObject(sdskey, sdslen(sdskey));
6297 dbDelete(&server.db[0], key);
6298 decrRefCount(key);
6299 j++;
6300 }
6301 return j;
6302 }
6303
6304 unsigned int countKeysInSlot(unsigned int hashslot) {
6305 return (*server.db->slots_to_keys).by_slot[hashslot].count;
6306 }
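/* For reference (illustrative, assuming "foo" is the only key in slot 12182):
 * countKeysInSlot() backs the CLUSTER COUNTKEYSINSLOT subcommand, e.g.
 *
 *   SET foo bar                      -> +OK
 *   CLUSTER COUNTKEYSINSLOT 12182    -> (integer) 1
 */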
6307