1 /* Redis Cluster implementation.
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "server.h"
32 #include "cluster.h"
33 #include "endianconv.h"
34
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <sys/stat.h>
41 #include <sys/file.h>
42 #include <math.h>
43
44 /* A global reference to myself is handy to make code more clear.
45 * Myself always points to server.cluster->myself, that is, the clusterNode
46 * that represents this node. */
47 clusterNode *myself = NULL;
48
49 clusterNode *createClusterNode(char *nodename, int flags);
50 int clusterAddNode(clusterNode *node);
51 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
52 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
53 void clusterSendPing(clusterLink *link, int type);
54 void clusterSendFail(char *nodename);
55 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
56 void clusterUpdateState(void);
57 int clusterNodeGetSlotBit(clusterNode *n, int slot);
58 sds clusterGenNodesDescription(int filter);
59 clusterNode *clusterLookupNode(const char *name);
60 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
61 int clusterAddSlot(clusterNode *n, int slot);
62 int clusterDelSlot(int slot);
63 int clusterDelNodeSlots(clusterNode *node);
64 int clusterNodeSetSlotBit(clusterNode *n, int slot);
65 void clusterSetMaster(clusterNode *n);
66 void clusterHandleSlaveFailover(void);
67 void clusterHandleSlaveMigration(int max_slaves);
68 int bitmapTestBit(unsigned char *bitmap, int pos);
69 void clusterDoBeforeSleep(int flags);
70 void clusterSendUpdate(clusterLink *link, clusterNode *node);
71 void resetManualFailover(void);
72 void clusterCloseAllSlots(void);
73 void clusterSetNodeAsMaster(clusterNode *n);
74 void clusterDelNode(clusterNode *delnode);
75 sds representClusterNodeFlags(sds ci, uint16_t flags);
76 uint64_t clusterGetMaxEpoch(void);
77 int clusterBumpConfigEpochWithoutConsensus(void);
78 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
79
80 /* -----------------------------------------------------------------------------
81 * Initialization
82 * -------------------------------------------------------------------------- */
83
84 /* Load the cluster config from 'filename'.
85 *
86 * If the file does not exist or is zero-length (this may happen because
87 * when we lock the nodes.conf file, we create a zero-length one for the
88 * sake of locking if it does not already exist), C_ERR is returned.
89 * If the configuration was loaded from the file, C_OK is returned. */
clusterLoadConfig(char * filename)90 int clusterLoadConfig(char *filename) {
91 FILE *fp = fopen(filename,"r");
92 struct stat sb;
93 char *line;
94 int maxline, j;
95
96 if (fp == NULL) {
97 if (errno == ENOENT) {
98 return C_ERR;
99 } else {
100 serverLog(LL_WARNING,
101 "Loading the cluster node config from %s: %s",
102 filename, strerror(errno));
103 exit(1);
104 }
105 }
106
107 /* Check if the file is zero-length: if so return C_ERR to signal
108 * we have to write the config. */
109 if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
110 fclose(fp);
111 return C_ERR;
112 }
113
114 /* Parse the file. Note that single lines of the cluster config file can
115 * be really long as they include all the hash slots of the node.
116 * This means in the worst possible case, half of the Redis slots will be
117 * present in a single line, possibly in importing or migrating state, so
118 * together with the node ID of the sender/receiver.
119 *
120 * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
121 maxline = 1024+CLUSTER_SLOTS*128;
122 line = zmalloc(maxline);
123 while(fgets(line,maxline,fp) != NULL) {
124 int argc;
125 sds *argv;
126 clusterNode *n, *master;
127 char *p, *s;
128
129 /* Skip blank lines, they can be created either by users manually
130 * editing nodes.conf or by the config writing process if stopped
131 * before the truncate() call. */
132 if (line[0] == '\n' || line[0] == '\0') continue;
133
134 /* Split the line into arguments for processing. */
135 argv = sdssplitargs(line,&argc);
136 if (argv == NULL) goto fmterr;
137
138 /* Handle the special "vars" line. Don't pretend it is the last
139 * line even if it actually is when generated by Redis. */
140 if (strcasecmp(argv[0],"vars") == 0) {
141 if (!(argc % 2)) goto fmterr;
142 for (j = 1; j < argc; j += 2) {
143 if (strcasecmp(argv[j],"currentEpoch") == 0) {
144 server.cluster->currentEpoch =
145 strtoull(argv[j+1],NULL,10);
146 } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
147 server.cluster->lastVoteEpoch =
148 strtoull(argv[j+1],NULL,10);
149 } else {
150 serverLog(LL_WARNING,
151 "Skipping unknown cluster config variable '%s'",
152 argv[j]);
153 }
154 }
155 sdsfreesplitres(argv,argc);
156 continue;
157 }
158
159 /* Regular config lines have at least eight fields */
160 if (argc < 8) {
161 sdsfreesplitres(argv,argc);
162 goto fmterr;
163 }
164
165 /* Create this node if it does not exist */
166 n = clusterLookupNode(argv[0]);
167 if (!n) {
168 n = createClusterNode(argv[0],0);
169 clusterAddNode(n);
170 }
171 /* Address and port */
172 if ((p = strrchr(argv[1],':')) == NULL) {
173 sdsfreesplitres(argv,argc);
174 goto fmterr;
175 }
176 *p = '\0';
177 memcpy(n->ip,argv[1],strlen(argv[1])+1);
178 char *port = p+1;
179 char *busp = strchr(port,'@');
180 if (busp) {
181 *busp = '\0';
182 busp++;
183 }
184 n->port = atoi(port);
185 /* In older versions of nodes.conf the "@busport" part is missing.
186 * In this case we set it to the default offset of 10000 from the
187 * base port. */
188 n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
189
190 /* Parse flags */
191 p = s = argv[2];
192 while(p) {
193 p = strchr(s,',');
194 if (p) *p = '\0';
195 if (!strcasecmp(s,"myself")) {
196 serverAssert(server.cluster->myself == NULL);
197 myself = server.cluster->myself = n;
198 n->flags |= CLUSTER_NODE_MYSELF;
199 } else if (!strcasecmp(s,"master")) {
200 n->flags |= CLUSTER_NODE_MASTER;
201 } else if (!strcasecmp(s,"slave")) {
202 n->flags |= CLUSTER_NODE_SLAVE;
203 } else if (!strcasecmp(s,"fail?")) {
204 n->flags |= CLUSTER_NODE_PFAIL;
205 } else if (!strcasecmp(s,"fail")) {
206 n->flags |= CLUSTER_NODE_FAIL;
207 n->fail_time = mstime();
208 } else if (!strcasecmp(s,"handshake")) {
209 n->flags |= CLUSTER_NODE_HANDSHAKE;
210 } else if (!strcasecmp(s,"noaddr")) {
211 n->flags |= CLUSTER_NODE_NOADDR;
212 } else if (!strcasecmp(s,"nofailover")) {
213 n->flags |= CLUSTER_NODE_NOFAILOVER;
214 } else if (!strcasecmp(s,"noflags")) {
215 /* nothing to do */
216 } else {
217 serverPanic("Unknown flag in redis cluster config file");
218 }
219 if (p) s = p+1;
220 }
221
222 /* Get master if any. Set the master and populate master's
223 * slave list. */
224 if (argv[3][0] != '-') {
225 master = clusterLookupNode(argv[3]);
226 if (!master) {
227 master = createClusterNode(argv[3],0);
228 clusterAddNode(master);
229 }
230 n->slaveof = master;
231 clusterNodeAddSlave(master,n);
232 }
233
234 /* Set ping sent / pong received timestamps */
235 if (atoi(argv[4])) n->ping_sent = mstime();
236 if (atoi(argv[5])) n->pong_received = mstime();
237
238 /* Set configEpoch for this node. */
239 n->configEpoch = strtoull(argv[6],NULL,10);
240
241 /* Populate hash slots served by this instance. */
242 for (j = 8; j < argc; j++) {
243 int start, stop;
244
245 if (argv[j][0] == '[') {
246 /* Here we handle migrating / importing slots */
247 int slot;
248 char direction;
249 clusterNode *cn;
250
251 p = strchr(argv[j],'-');
252 serverAssert(p != NULL);
253 *p = '\0';
254 direction = p[1]; /* Either '>' or '<' */
255 slot = atoi(argv[j]+1);
256 if (slot < 0 || slot >= CLUSTER_SLOTS) {
257 sdsfreesplitres(argv,argc);
258 goto fmterr;
259 }
260 p += 3;
261 cn = clusterLookupNode(p);
262 if (!cn) {
263 cn = createClusterNode(p,0);
264 clusterAddNode(cn);
265 }
266 if (direction == '>') {
267 server.cluster->migrating_slots_to[slot] = cn;
268 } else {
269 server.cluster->importing_slots_from[slot] = cn;
270 }
271 continue;
272 } else if ((p = strchr(argv[j],'-')) != NULL) {
273 *p = '\0';
274 start = atoi(argv[j]);
275 stop = atoi(p+1);
276 } else {
277 start = stop = atoi(argv[j]);
278 }
279 if (start < 0 || start >= CLUSTER_SLOTS ||
280 stop < 0 || stop >= CLUSTER_SLOTS)
281 {
282 sdsfreesplitres(argv,argc);
283 goto fmterr;
284 }
285 while(start <= stop) clusterAddSlot(n, start++);
286 }
287
288 sdsfreesplitres(argv,argc);
289 }
290 /* Config sanity check */
291 if (server.cluster->myself == NULL) goto fmterr;
292
293 zfree(line);
294 fclose(fp);
295
296 serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
297
298 /* Something that should never happen: currentEpoch smaller than
299 * the max epoch found in the nodes configuration. However we handle this
300 * as some form of protection against manual editing of critical files. */
301 if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
302 server.cluster->currentEpoch = clusterGetMaxEpoch();
303 }
304 return C_OK;
305
306 fmterr:
307 serverLog(LL_WARNING,
308 "Unrecoverable error: corrupted cluster config file.");
309 zfree(line);
310 if (fp) fclose(fp);
311 exit(1);
312 }
313
314 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
315 *
316 * This function writes the node config and returns 0, on error -1
317 * is returned.
318 *
319 * Note: we need to write the file in an atomic way from the point of view
320 * of the POSIX filesystem semantics, so that if the server is stopped
321 * or crashes during the write, we'll end with either the old file or the
322 * new one. Since we have the full payload to write available we can use
323 * a single write to write the whole file. If the pre-existing file was
324 * bigger we pad our payload with newlines that are anyway ignored and truncate
325 * the file afterward. */
clusterSaveConfig(int do_fsync)326 int clusterSaveConfig(int do_fsync) {
327 sds ci;
328 size_t content_size;
329 struct stat sb;
330 int fd;
331
332 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
333
334 /* Get the nodes description and concatenate our "vars" directive to
335 * save currentEpoch and lastVoteEpoch. */
336 ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
337 ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
338 (unsigned long long) server.cluster->currentEpoch,
339 (unsigned long long) server.cluster->lastVoteEpoch);
340 content_size = sdslen(ci);
341
342 if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
343 == -1) goto err;
344
345 /* Pad the new payload if the existing file length is greater. */
346 if (fstat(fd,&sb) != -1) {
347 if (sb.st_size > (off_t)content_size) {
348 ci = sdsgrowzero(ci,sb.st_size);
349 memset(ci+content_size,'\n',sb.st_size-content_size);
350 }
351 }
352 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
353 if (do_fsync) {
354 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
355 fsync(fd);
356 }
357
358 /* Truncate the file if needed to remove the final \n padding that
359 * is just garbage. */
360 if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
361 /* ftruncate() failing is not a critical error. */
362 }
363 close(fd);
364 sdsfree(ci);
365 return 0;
366
367 err:
368 if (fd != -1) close(fd);
369 sdsfree(ci);
370 return -1;
371 }
372
clusterSaveConfigOrDie(int do_fsync)373 void clusterSaveConfigOrDie(int do_fsync) {
374 if (clusterSaveConfig(do_fsync) == -1) {
375 serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
376 exit(1);
377 }
378 }
379
380 /* Lock the cluster config using flock(), and leaks the file descritor used to
381 * acquire the lock so that the file will be locked forever.
382 *
383 * This works because we always update nodes.conf with a new version
384 * in-place, reopening the file, and writing to it in place (later adjusting
385 * the length with ftruncate()).
386 *
387 * On success C_OK is returned, otherwise an error is logged and
388 * the function returns C_ERR to signal a lock was not acquired. */
clusterLockConfig(char * filename)389 int clusterLockConfig(char *filename) {
390 /* flock() does not exist on Solaris
391 * and a fcntl-based solution won't help, as we constantly re-open that file,
392 * which will release _all_ locks anyway
393 */
394 #if !defined(__sun)
395 /* To lock it, we need to open the file in a way it is created if
396 * it does not exist, otherwise there is a race condition with other
397 * processes. */
398 int fd = open(filename,O_WRONLY|O_CREAT,0644);
399 if (fd == -1) {
400 serverLog(LL_WARNING,
401 "Can't open %s in order to acquire a lock: %s",
402 filename, strerror(errno));
403 return C_ERR;
404 }
405
406 if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
407 if (errno == EWOULDBLOCK) {
408 serverLog(LL_WARNING,
409 "Sorry, the cluster configuration file %s is already used "
410 "by a different Redis Cluster node. Please make sure that "
411 "different nodes use different cluster configuration "
412 "files.", filename);
413 } else {
414 serverLog(LL_WARNING,
415 "Impossible to lock %s: %s", filename, strerror(errno));
416 }
417 close(fd);
418 return C_ERR;
419 }
420 /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
421 * lock to the file as long as the process exists.
422 *
423 * After fork, the child process will get the fd opened by the parent process,
424 * we need save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),
425 * it will be closed in the child process.
426 * If it is not closed, when the main process is killed -9, but the child process
427 * (redis-aof-rewrite) is still alive, the fd(lock) will still be held by the
428 * child process, and the main process will fail to get lock, means fail to start. */
429 server.cluster_config_file_lock_fd = fd;
430 #endif /* __sun */
431
432 return C_OK;
433 }
434
435 /* Some flags (currently just the NOFAILOVER flag) may need to be updated
436 * in the "myself" node based on the current configuration of the node,
437 * that may change at runtime via CONFIG SET. This function changes the
438 * set of flags in myself->flags accordingly. */
clusterUpdateMyselfFlags(void)439 void clusterUpdateMyselfFlags(void) {
440 int oldflags = myself->flags;
441 int nofailover = server.cluster_slave_no_failover ?
442 CLUSTER_NODE_NOFAILOVER : 0;
443 myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
444 myself->flags |= nofailover;
445 if (myself->flags != oldflags) {
446 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
447 CLUSTER_TODO_UPDATE_STATE);
448 }
449 }
450
clusterInit(void)451 void clusterInit(void) {
452 int saveconf = 0;
453
454 server.cluster = zmalloc(sizeof(clusterState));
455 server.cluster->myself = NULL;
456 server.cluster->currentEpoch = 0;
457 server.cluster->state = CLUSTER_FAIL;
458 server.cluster->size = 1;
459 server.cluster->todo_before_sleep = 0;
460 server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
461 server.cluster->nodes_black_list =
462 dictCreate(&clusterNodesBlackListDictType,NULL);
463 server.cluster->failover_auth_time = 0;
464 server.cluster->failover_auth_count = 0;
465 server.cluster->failover_auth_rank = 0;
466 server.cluster->failover_auth_epoch = 0;
467 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
468 server.cluster->lastVoteEpoch = 0;
469 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
470 server.cluster->stats_bus_messages_sent[i] = 0;
471 server.cluster->stats_bus_messages_received[i] = 0;
472 }
473 server.cluster->stats_pfail_nodes = 0;
474 memset(server.cluster->slots,0, sizeof(server.cluster->slots));
475 clusterCloseAllSlots();
476
477 /* Lock the cluster config file to make sure every node uses
478 * its own nodes.conf. */
479 server.cluster_config_file_lock_fd = -1;
480 if (clusterLockConfig(server.cluster_configfile) == C_ERR)
481 exit(1);
482
483 /* Load or create a new nodes configuration. */
484 if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
485 /* No configuration found. We will just use the random name provided
486 * by the createClusterNode() function. */
487 myself = server.cluster->myself =
488 createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
489 serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
490 myself->name);
491 clusterAddNode(myself);
492 saveconf = 1;
493 }
494 if (saveconf) clusterSaveConfigOrDie(1);
495
496 /* We need a listening TCP port for our cluster messaging needs. */
497 server.cfd_count = 0;
498
499 /* Port sanity check II
500 * The other handshake port check is triggered too late to stop
501 * us from trying to use a too-high cluster port number. */
502 if (server.port > (65535-CLUSTER_PORT_INCR)) {
503 serverLog(LL_WARNING, "Redis port number too high. "
504 "Cluster communication port is 10,000 port "
505 "numbers higher than your Redis port. "
506 "Your Redis port number must be "
507 "lower than 55535.");
508 exit(1);
509 }
510
511 if (listenToPort(server.port+CLUSTER_PORT_INCR,
512 server.cfd,&server.cfd_count) == C_ERR)
513 {
514 exit(1);
515 } else {
516 int j;
517
518 for (j = 0; j < server.cfd_count; j++) {
519 if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
520 clusterAcceptHandler, NULL) == AE_ERR)
521 serverPanic("Unrecoverable error creating Redis Cluster "
522 "file event.");
523 }
524 }
525
526 /* The slots -> keys map is a radix tree. Initialize it here. */
527 server.cluster->slots_to_keys = raxNew();
528 memset(server.cluster->slots_keys_count,0,
529 sizeof(server.cluster->slots_keys_count));
530
531 /* Set myself->port / cport to my listening ports, we'll just need to
532 * discover the IP address via MEET messages. */
533 myself->port = server.port;
534 myself->cport = server.port+CLUSTER_PORT_INCR;
535 if (server.cluster_announce_port)
536 myself->port = server.cluster_announce_port;
537 if (server.cluster_announce_bus_port)
538 myself->cport = server.cluster_announce_bus_port;
539
540 server.cluster->mf_end = 0;
541 resetManualFailover();
542 clusterUpdateMyselfFlags();
543 }
544
545 /* Reset a node performing a soft or hard reset:
546 *
547 * 1) All other nodes are forget.
548 * 2) All the assigned / open slots are released.
549 * 3) If the node is a slave, it turns into a master.
550 * 5) Only for hard reset: a new Node ID is generated.
551 * 6) Only for hard reset: currentEpoch and configEpoch are set to 0.
552 * 7) The new configuration is saved and the cluster state updated.
553 * 8) If the node was a slave, the whole data set is flushed away. */
clusterReset(int hard)554 void clusterReset(int hard) {
555 dictIterator *di;
556 dictEntry *de;
557 int j;
558
559 /* Turn into master. */
560 if (nodeIsSlave(myself)) {
561 clusterSetNodeAsMaster(myself);
562 replicationUnsetMaster();
563 emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
564 }
565
566 /* Close slots, reset manual failover state. */
567 clusterCloseAllSlots();
568 resetManualFailover();
569
570 /* Unassign all the slots. */
571 for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);
572
573 /* Forget all the nodes, but myself. */
574 di = dictGetSafeIterator(server.cluster->nodes);
575 while((de = dictNext(di)) != NULL) {
576 clusterNode *node = dictGetVal(de);
577
578 if (node == myself) continue;
579 clusterDelNode(node);
580 }
581 dictReleaseIterator(di);
582
583 /* Hard reset only: set epochs to 0, change node ID. */
584 if (hard) {
585 sds oldname;
586
587 server.cluster->currentEpoch = 0;
588 server.cluster->lastVoteEpoch = 0;
589 myself->configEpoch = 0;
590 serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");
591
592 /* To change the Node ID we need to remove the old name from the
593 * nodes table, change the ID, and re-add back with new name. */
594 oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
595 dictDelete(server.cluster->nodes,oldname);
596 sdsfree(oldname);
597 getRandomHexChars(myself->name, CLUSTER_NAMELEN);
598 clusterAddNode(myself);
599 serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
600 }
601
602 /* Make sure to persist the new config and update the state. */
603 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
604 CLUSTER_TODO_UPDATE_STATE|
605 CLUSTER_TODO_FSYNC_CONFIG);
606 }
607
608 /* -----------------------------------------------------------------------------
609 * CLUSTER communication link
610 * -------------------------------------------------------------------------- */
611
createClusterLink(clusterNode * node)612 clusterLink *createClusterLink(clusterNode *node) {
613 clusterLink *link = zmalloc(sizeof(*link));
614 link->ctime = mstime();
615 link->sndbuf = sdsempty();
616 link->rcvbuf = sdsempty();
617 link->node = node;
618 link->fd = -1;
619 return link;
620 }
621
622 /* Free a cluster link, but does not free the associated node of course.
623 * This function will just make sure that the original node associated
624 * with this link will have the 'link' field set to NULL. */
freeClusterLink(clusterLink * link)625 void freeClusterLink(clusterLink *link) {
626 if (link->fd != -1) {
627 aeDeleteFileEvent(server.el, link->fd, AE_READABLE|AE_WRITABLE);
628 }
629 sdsfree(link->sndbuf);
630 sdsfree(link->rcvbuf);
631 if (link->node)
632 link->node->link = NULL;
633 close(link->fd);
634 zfree(link);
635 }
636
637 #define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
clusterAcceptHandler(aeEventLoop * el,int fd,void * privdata,int mask)638 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
639 int cport, cfd;
640 int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
641 char cip[NET_IP_STR_LEN];
642 clusterLink *link;
643 UNUSED(el);
644 UNUSED(mask);
645 UNUSED(privdata);
646
647 /* If the server is starting up, don't accept cluster connections:
648 * UPDATE messages may interact with the database content. */
649 if (server.masterhost == NULL && server.loading) return;
650
651 while(max--) {
652 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
653 if (cfd == ANET_ERR) {
654 if (errno != EWOULDBLOCK)
655 serverLog(LL_VERBOSE,
656 "Error accepting cluster node: %s", server.neterr);
657 return;
658 }
659 anetNonBlock(NULL,cfd);
660 anetEnableTcpNoDelay(NULL,cfd);
661
662 /* Use non-blocking I/O for cluster messages. */
663 serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
664 /* Create a link object we use to handle the connection.
665 * It gets passed to the readable handler when data is available.
666 * Initiallly the link->node pointer is set to NULL as we don't know
667 * which node is, but the right node is references once we know the
668 * node identity. */
669 link = createClusterLink(NULL);
670 link->fd = cfd;
671 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
672 }
673 }
674
675 /* -----------------------------------------------------------------------------
676 * Key space handling
677 * -------------------------------------------------------------------------- */
678
/* Map a key to one of the 16384 hash slots: the crc16 of the key, keeping
 * only its 14 least significant bits (mask 0x3FFF).
 *
 * Hash tags: if the key contains a '{' followed later by a '}' with at least
 * one character in between, only the bytes between the first '{' and the
 * first '}' after it are hashed. This can force related keys into the same
 * slot (assuming no resharding is in progress). In every other case the
 * whole key is hashed. */
unsigned int keyHashSlot(char *key, int keylen) {
    int lbrace = 0, rbrace;

    while (lbrace < keylen && key[lbrace] != '{') lbrace++;

    if (lbrace < keylen) {
        rbrace = lbrace + 1;
        while (rbrace < keylen && key[rbrace] != '}') rbrace++;

        /* Only a closed, non-empty {...} section counts as a hash tag. */
        if (rbrace < keylen && rbrace > lbrace + 1)
            return crc16(key + lbrace + 1, rbrace - lbrace - 1) & 0x3FFF;
    }

    return crc16(key,keylen) & 0x3FFF;
}
705
706 /* -----------------------------------------------------------------------------
707 * CLUSTER node API
708 * -------------------------------------------------------------------------- */
709
710 /* Create a new cluster node, with the specified flags.
711 * If "nodename" is NULL this is considered a first handshake and a random
712 * node name is assigned to this node (it will be fixed later when we'll
713 * receive the first pong).
714 *
715 * The node is created and returned to the user, but it is not automatically
716 * added to the nodes hash table. */
createClusterNode(char * nodename,int flags)717 clusterNode *createClusterNode(char *nodename, int flags) {
718 clusterNode *node = zmalloc(sizeof(*node));
719
720 if (nodename)
721 memcpy(node->name, nodename, CLUSTER_NAMELEN);
722 else
723 getRandomHexChars(node->name, CLUSTER_NAMELEN);
724 node->ctime = mstime();
725 node->configEpoch = 0;
726 node->flags = flags;
727 memset(node->slots,0,sizeof(node->slots));
728 node->numslots = 0;
729 node->numslaves = 0;
730 node->slaves = NULL;
731 node->slaveof = NULL;
732 node->ping_sent = node->pong_received = 0;
733 node->data_received = 0;
734 node->fail_time = 0;
735 node->link = NULL;
736 memset(node->ip,0,sizeof(node->ip));
737 node->port = 0;
738 node->cport = 0;
739 node->fail_reports = listCreate();
740 node->voted_time = 0;
741 node->orphaned_time = 0;
742 node->repl_offset_time = 0;
743 node->repl_offset = 0;
744 listSetFreeMethod(node->fail_reports,zfree);
745 return node;
746 }
747
748 /* This function is called every time we get a failure report from a node.
749 * The side effect is to populate the fail_reports list (or to update
750 * the timestamp of an existing report).
751 *
752 * 'failing' is the node that is in failure state according to the
753 * 'sender' node.
754 *
755 * The function returns 0 if it just updates a timestamp of an existing
756 * failure report from the same sender. 1 is returned if a new failure
757 * report is created. */
clusterNodeAddFailureReport(clusterNode * failing,clusterNode * sender)758 int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
759 list *l = failing->fail_reports;
760 listNode *ln;
761 listIter li;
762 clusterNodeFailReport *fr;
763
764 /* If a failure report from the same sender already exists, just update
765 * the timestamp. */
766 listRewind(l,&li);
767 while ((ln = listNext(&li)) != NULL) {
768 fr = ln->value;
769 if (fr->node == sender) {
770 fr->time = mstime();
771 return 0;
772 }
773 }
774
775 /* Otherwise create a new report. */
776 fr = zmalloc(sizeof(*fr));
777 fr->node = sender;
778 fr->time = mstime();
779 listAddNodeTail(l,fr);
780 return 1;
781 }
782
783 /* Remove failure reports that are too old, where too old means reasonably
784 * older than the global node timeout. Note that anyway for a node to be
785 * flagged as FAIL we need to have a local PFAIL state that is at least
786 * older than the global node timeout, so we don't just trust the number
787 * of failure reports from other nodes. */
clusterNodeCleanupFailureReports(clusterNode * node)788 void clusterNodeCleanupFailureReports(clusterNode *node) {
789 list *l = node->fail_reports;
790 listNode *ln;
791 listIter li;
792 clusterNodeFailReport *fr;
793 mstime_t maxtime = server.cluster_node_timeout *
794 CLUSTER_FAIL_REPORT_VALIDITY_MULT;
795 mstime_t now = mstime();
796
797 listRewind(l,&li);
798 while ((ln = listNext(&li)) != NULL) {
799 fr = ln->value;
800 if (now - fr->time > maxtime) listDelNode(l,ln);
801 }
802 }
803
804 /* Remove the failing report for 'node' if it was previously considered
805 * failing by 'sender'. This function is called when a node informs us via
806 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
807 *
808 * Note that this function is called relatively often as it gets called even
809 * when there are no nodes failing, and is O(N), however when the cluster is
810 * fine the failure reports list is empty so the function runs in constant
811 * time.
812 *
813 * The function returns 1 if the failure report was found and removed.
814 * Otherwise 0 is returned. */
clusterNodeDelFailureReport(clusterNode * node,clusterNode * sender)815 int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
816 list *l = node->fail_reports;
817 listNode *ln;
818 listIter li;
819 clusterNodeFailReport *fr;
820
821 /* Search for a failure report from this sender. */
822 listRewind(l,&li);
823 while ((ln = listNext(&li)) != NULL) {
824 fr = ln->value;
825 if (fr->node == sender) break;
826 }
827 if (!ln) return 0; /* No failure report from this sender. */
828
829 /* Remove the failure report. */
830 listDelNode(l,ln);
831 clusterNodeCleanupFailureReports(node);
832 return 1;
833 }
834
835 /* Return the number of external nodes that believe 'node' is failing,
836 * not including this node, that may have a PFAIL or FAIL state for this
837 * node as well. */
clusterNodeFailureReportsCount(clusterNode * node)838 int clusterNodeFailureReportsCount(clusterNode *node) {
839 clusterNodeCleanupFailureReports(node);
840 return listLength(node->fail_reports);
841 }
842
/* Remove 'slave' from the replica array of 'master', preserving the order
 * of the remaining entries. When the last replica goes away, the
 * CLUSTER_NODE_MIGRATE_TO flag is cleared from 'master'.
 * Returns C_OK if 'slave' was found and removed, C_ERR otherwise. */
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int idx, tail;

    for (idx = 0; idx < master->numslaves; idx++)
        if (master->slaves[idx] == slave) break;
    if (idx == master->numslaves) return C_ERR;

    /* Shift down the entries after the removed one, if any. */
    tail = master->numslaves - idx - 1;
    if (tail > 0)
        memmove(master->slaves+idx, master->slaves+idx+1,
                sizeof(*master->slaves) * tail);
    master->numslaves--;

    if (master->numslaves == 0)
        master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
861
/* Append 'slave' to the replica array of 'master' and set the
 * CLUSTER_NODE_MIGRATE_TO flag on 'master'.
 * Returns C_ERR (and changes nothing) if 'slave' is already listed,
 * C_OK otherwise. */
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int count = master->numslaves;

    for (int k = 0; k < count; k++) {
        if (master->slaves[k] == slave) return C_ERR;
    }

    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(count+1));
    master->slaves[count] = slave;
    master->numslaves = count + 1;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
875
/* Return how many replicas of 'n' are not flagged as failed
 * (according to nodeFailed()). */
int clusterCountNonFailingSlaves(clusterNode *n) {
    int healthy = 0;

    for (int idx = 0; idx < n->numslaves; idx++) {
        if (nodeFailed(n->slaves[idx])) continue;
        healthy++;
    }
    return healthy;
}
883
884 /* Low level cleanup of the node structure. Only called by clusterDelNode(). */
freeClusterNode(clusterNode * n)885 void freeClusterNode(clusterNode *n) {
886 sds nodename;
887 int j;
888
889 /* If the node has associated slaves, we have to set
890 * all the slaves->slaveof fields to NULL (unknown). */
891 for (j = 0; j < n->numslaves; j++)
892 n->slaves[j]->slaveof = NULL;
893
894 /* Remove this node from the list of slaves of its master. */
895 if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
896
897 /* Unlink from the set of nodes. */
898 nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
899 serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
900 sdsfree(nodename);
901
902 /* Release link and associated data structures. */
903 if (n->link) freeClusterLink(n->link);
904 listRelease(n->fail_reports);
905 zfree(n->slaves);
906 zfree(n);
907 }
908
909 /* Add a node to the nodes hash table */
clusterAddNode(clusterNode * node)910 int clusterAddNode(clusterNode *node) {
911 int retval;
912
913 retval = dictAdd(server.cluster->nodes,
914 sdsnewlen(node->name,CLUSTER_NAMELEN), node);
915 return (retval == DICT_OK) ? C_OK : C_ERR;
916 }
917
918 /* Remove a node from the cluster. The functio performs the high level
919 * cleanup, calling freeClusterNode() for the low level cleanup.
920 * Here we do the following:
921 *
922 * 1) Mark all the slots handled by it as unassigned.
923 * 2) Remove all the failure reports sent by this node and referenced by
924 * other nodes.
925 * 3) Free the node with freeClusterNode() that will in turn remove it
926 * from the hash table and from the list of slaves of its master, if
927 * it is a slave node.
928 */
clusterDelNode(clusterNode * delnode)929 void clusterDelNode(clusterNode *delnode) {
930 int j;
931 dictIterator *di;
932 dictEntry *de;
933
934 /* 1) Mark slots as unassigned. */
935 for (j = 0; j < CLUSTER_SLOTS; j++) {
936 if (server.cluster->importing_slots_from[j] == delnode)
937 server.cluster->importing_slots_from[j] = NULL;
938 if (server.cluster->migrating_slots_to[j] == delnode)
939 server.cluster->migrating_slots_to[j] = NULL;
940 if (server.cluster->slots[j] == delnode)
941 clusterDelSlot(j);
942 }
943
944 /* 2) Remove failure reports. */
945 di = dictGetSafeIterator(server.cluster->nodes);
946 while((de = dictNext(di)) != NULL) {
947 clusterNode *node = dictGetVal(de);
948
949 if (node == delnode) continue;
950 clusterNodeDelFailureReport(node,delnode);
951 }
952 dictReleaseIterator(di);
953
954 /* 3) Free the node, unlinking it from the cluster. */
955 freeClusterNode(delnode);
956 }
957
958 /* Node lookup by name */
clusterLookupNode(const char * name)959 clusterNode *clusterLookupNode(const char *name) {
960 sds s = sdsnewlen(name, CLUSTER_NAMELEN);
961 dictEntry *de;
962
963 de = dictFind(server.cluster->nodes,s);
964 sdsfree(s);
965 if (de == NULL) return NULL;
966 return dictGetVal(de);
967 }
968
969 /* This is only used after the handshake. When we connect a given IP/PORT
970 * as a result of CLUSTER MEET we don't have the node name yet, so we
971 * pick a random one, and will fix it when we receive the PONG request using
972 * this function. */
clusterRenameNode(clusterNode * node,char * newname)973 void clusterRenameNode(clusterNode *node, char *newname) {
974 int retval;
975 sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);
976
977 serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
978 node->name, newname);
979 retval = dictDelete(server.cluster->nodes, s);
980 sdsfree(s);
981 serverAssert(retval == DICT_OK);
982 memcpy(node->name, newname, CLUSTER_NAMELEN);
983 clusterAddNode(node);
984 }
985
986 /* -----------------------------------------------------------------------------
987 * CLUSTER config epoch handling
988 * -------------------------------------------------------------------------- */
989
990 /* Return the greatest configEpoch found in the cluster, or the current
991 * epoch if greater than any node configEpoch. */
clusterGetMaxEpoch(void)992 uint64_t clusterGetMaxEpoch(void) {
993 uint64_t max = 0;
994 dictIterator *di;
995 dictEntry *de;
996
997 di = dictGetSafeIterator(server.cluster->nodes);
998 while((de = dictNext(di)) != NULL) {
999 clusterNode *node = dictGetVal(de);
1000 if (node->configEpoch > max) max = node->configEpoch;
1001 }
1002 dictReleaseIterator(di);
1003 if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
1004 return max;
1005 }
1006
1007 /* If this node epoch is zero or is not already the greatest across the
1008 * cluster (from the POV of the local configuration), this function will:
1009 *
1010 * 1) Generate a new config epoch, incrementing the current epoch.
1011 * 2) Assign the new epoch to this node, WITHOUT any consensus.
1012 * 3) Persist the configuration on disk before sending packets with the
1013 * new configuration.
1014 *
1015 * If the new config epoch is generated and assigend, C_OK is returned,
1016 * otherwise C_ERR is returned (since the node has already the greatest
1017 * configuration around) and no operation is performed.
1018 *
1019 * Important note: this function violates the principle that config epochs
1020 * should be generated with consensus and should be unique across the cluster.
1021 * However Redis Cluster uses this auto-generated new config epochs in two
1022 * cases:
1023 *
1024 * 1) When slots are closed after importing. Otherwise resharding would be
1025 * too expensive.
1026 * 2) When CLUSTER FAILOVER is called with options that force a slave to
1027 * failover its master even if there is not master majority able to
1028 * create a new configuration epoch.
1029 *
1030 * Redis Cluster will not explode using this function, even in the case of
1031 * a collision between this node and another node, generating the same
1032 * configuration epoch unilaterally, because the config epoch conflict
1033 * resolution algorithm will eventually move colliding nodes to different
1034 * config epochs. However using this function may violate the "last failover
1035 * wins" rule, so should only be used with care. */
clusterBumpConfigEpochWithoutConsensus(void)1036 int clusterBumpConfigEpochWithoutConsensus(void) {
1037 uint64_t maxEpoch = clusterGetMaxEpoch();
1038
1039 if (myself->configEpoch == 0 ||
1040 myself->configEpoch != maxEpoch)
1041 {
1042 server.cluster->currentEpoch++;
1043 myself->configEpoch = server.cluster->currentEpoch;
1044 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1045 CLUSTER_TODO_FSYNC_CONFIG);
1046 serverLog(LL_WARNING,
1047 "New configEpoch set to %llu",
1048 (unsigned long long) myself->configEpoch);
1049 return C_OK;
1050 } else {
1051 return C_ERR;
1052 }
1053 }
1054
1055 /* This function is called when this node is a master, and we receive from
1056 * another master a configuration epoch that is equal to our configuration
1057 * epoch.
1058 *
1059 * BACKGROUND
1060 *
1061 * It is not possible that different slaves get the same config
1062 * epoch during a failover election, because the slaves need to get voted
1063 * by a majority. However when we perform a manual resharding of the cluster
1064 * the node will assign a configuration epoch to itself without to ask
1065 * for agreement. Usually resharding happens when the cluster is working well
1066 * and is supervised by the sysadmin, however it is possible for a failover
1067 * to happen exactly while the node we are resharding a slot to assigns itself
1068 * a new configuration epoch, but before it is able to propagate it.
1069 *
1070 * So technically it is possible in this condition that two nodes end with
1071 * the same configuration epoch.
1072 *
1073 * Another possibility is that there are bugs in the implementation causing
1074 * this to happen.
1075 *
1076 * Moreover when a new cluster is created, all the nodes start with the same
1077 * configEpoch. This collision resolution code allows nodes to automatically
1078 * end with a different configEpoch at startup automatically.
1079 *
1080 * In all the cases, we want a mechanism that resolves this issue automatically
1081 * as a safeguard. The same configuration epoch for masters serving different
1082 * set of slots is not harmful, but it is if the nodes end serving the same
1083 * slots for some reason (manual errors or software bugs) without a proper
1084 * failover procedure.
1085 *
1086 * In general we want a system that eventually always ends with different
1087 * masters having different configuration epochs whatever happened, since
1088 * nothign is worse than a split-brain condition in a distributed system.
1089 *
1090 * BEHAVIOR
1091 *
1092 * When this function gets called, what happens is that if this node
1093 * has the lexicographically smaller Node ID compared to the other node
1094 * with the conflicting epoch (the 'sender' node), it will assign itself
1095 * the greatest configuration epoch currently detected among nodes plus 1.
1096 *
1097 * This means that even if there are multiple nodes colliding, the node
1098 * with the greatest Node ID never moves forward, so eventually all the nodes
1099 * end with a different configuration epoch.
1100 */
clusterHandleConfigEpochCollision(clusterNode * sender)1101 void clusterHandleConfigEpochCollision(clusterNode *sender) {
1102 /* Prerequisites: nodes have the same configEpoch and are both masters. */
1103 if (sender->configEpoch != myself->configEpoch ||
1104 !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
1105 /* Don't act if the colliding node has a smaller Node ID. */
1106 if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
1107 /* Get the next ID available at the best of this node knowledge. */
1108 server.cluster->currentEpoch++;
1109 myself->configEpoch = server.cluster->currentEpoch;
1110 clusterSaveConfigOrDie(1);
1111 serverLog(LL_VERBOSE,
1112 "WARNING: configEpoch collision with node %.40s."
1113 " configEpoch set to %llu",
1114 sender->name,
1115 (unsigned long long) myself->configEpoch);
1116 }
1117
1118 /* -----------------------------------------------------------------------------
1119 * CLUSTER nodes blacklist
1120 *
1121 * The nodes blacklist is just a way to ensure that a given node with a given
1122 * Node ID is not readded before some time elapsed (this time is specified
1123 * in seconds in CLUSTER_BLACKLIST_TTL).
1124 *
1125 * This is useful when we want to remove a node from the cluster completely:
1126 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
1127 * that even if we receive gossip messages from other nodes that still remember
1128 * about the node we want to remove, we don't re-add it before some time.
1129 *
1130 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
1131 * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
1132 * in the cluster without dealing with the problem of other nodes re-adding
1133 * back the node to nodes we already sent the FORGET command to.
1134 *
1135 * The data structure used is a hash table with an sds string representing
1136 * the node ID as key, and the time when it is ok to re-add the node as
1137 * value.
1138 * -------------------------------------------------------------------------- */
1139
#define CLUSTER_BLACKLIST_TTL 60 /* Seconds a forgotten node stays blacklisted (1 minute). */
1141
1142
1143 /* Before of the addNode() or Exists() operations we always remove expired
1144 * entries from the black list. This is an O(N) operation but it is not a
1145 * problem since add / exists operations are called very infrequently and
1146 * the hash table is supposed to contain very little elements at max.
1147 * However without the cleanup during long uptimes and with some automated
1148 * node add/removal procedures, entries could accumulate. */
clusterBlacklistCleanup(void)1149 void clusterBlacklistCleanup(void) {
1150 dictIterator *di;
1151 dictEntry *de;
1152
1153 di = dictGetSafeIterator(server.cluster->nodes_black_list);
1154 while((de = dictNext(di)) != NULL) {
1155 int64_t expire = dictGetUnsignedIntegerVal(de);
1156
1157 if (expire < server.unixtime)
1158 dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1159 }
1160 dictReleaseIterator(di);
1161 }
1162
1163 /* Cleanup the blacklist and add a new node ID to the black list. */
clusterBlacklistAddNode(clusterNode * node)1164 void clusterBlacklistAddNode(clusterNode *node) {
1165 dictEntry *de;
1166 sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1167
1168 clusterBlacklistCleanup();
1169 if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
1170 /* If the key was added, duplicate the sds string representation of
1171 * the key for the next lookup. We'll free it at the end. */
1172 id = sdsdup(id);
1173 }
1174 de = dictFind(server.cluster->nodes_black_list,id);
1175 dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
1176 sdsfree(id);
1177 }
1178
1179 /* Return non-zero if the specified node ID exists in the blacklist.
1180 * You don't need to pass an sds string here, any pointer to 40 bytes
1181 * will work. */
clusterBlacklistExists(char * nodeid)1182 int clusterBlacklistExists(char *nodeid) {
1183 sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
1184 int retval;
1185
1186 clusterBlacklistCleanup();
1187 retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
1188 sdsfree(id);
1189 return retval;
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193 * CLUSTER messages exchange - PING/PONG and gossip
1194 * -------------------------------------------------------------------------- */
1195
1196 /* This function checks if a given node should be marked as FAIL.
1197 * It happens if the following conditions are met:
1198 *
1199 * 1) We received enough failure reports from other master nodes via gossip.
1200 * Enough means that the majority of the masters signaled the node is
1201 * down recently.
1202 * 2) We believe this node is in PFAIL state.
1203 *
1204 * If a failure is detected we also inform the whole cluster about this
1205 * event trying to force every other node to set the FAIL flag for the node.
1206 *
1207 * Note that the form of agreement used here is weak, as we collect the majority
1208 * of masters state during some time, and even if we force agreement by
1209 * propagating the FAIL message, because of partitions we may not reach every
1210 * node. However:
1211 *
1212 * 1) Either we reach the majority and eventually the FAIL state will propagate
1213 * to all the cluster.
1214 * 2) Or there is no majority so no slave promotion will be authorized and the
1215 * FAIL flag will be cleared after some time.
1216 */
markNodeAsFailingIfNeeded(clusterNode * node)1217 void markNodeAsFailingIfNeeded(clusterNode *node) {
1218 int failures;
1219 int needed_quorum = (server.cluster->size / 2) + 1;
1220
1221 if (!nodeTimedOut(node)) return; /* We can reach it. */
1222 if (nodeFailed(node)) return; /* Already FAILing. */
1223
1224 failures = clusterNodeFailureReportsCount(node);
1225 /* Also count myself as a voter if I'm a master. */
1226 if (nodeIsMaster(myself)) failures++;
1227 if (failures < needed_quorum) return; /* No weak agreement from masters. */
1228
1229 serverLog(LL_NOTICE,
1230 "Marking node %.40s as failing (quorum reached).", node->name);
1231
1232 /* Mark the node as failing. */
1233 node->flags &= ~CLUSTER_NODE_PFAIL;
1234 node->flags |= CLUSTER_NODE_FAIL;
1235 node->fail_time = mstime();
1236
1237 /* Broadcast the failing node name to everybody, forcing all the other
1238 * reachable nodes to flag the node as FAIL. */
1239 if (nodeIsMaster(myself)) clusterSendFail(node->name);
1240 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1241 }
1242
1243 /* This function is called only if a node is marked as FAIL, but we are able
1244 * to reach it again. It checks if there are the conditions to undo the FAIL
1245 * state. */
clearNodeFailureIfNeeded(clusterNode * node)1246 void clearNodeFailureIfNeeded(clusterNode *node) {
1247 mstime_t now = mstime();
1248
1249 serverAssert(nodeFailed(node));
1250
1251 /* For slaves we always clear the FAIL flag if we can contact the
1252 * node again. */
1253 if (nodeIsSlave(node) || node->numslots == 0) {
1254 serverLog(LL_NOTICE,
1255 "Clear FAIL state for node %.40s: %s is reachable again.",
1256 node->name,
1257 nodeIsSlave(node) ? "replica" : "master without slots");
1258 node->flags &= ~CLUSTER_NODE_FAIL;
1259 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1260 }
1261
1262 /* If it is a master and...
1263 * 1) The FAIL state is old enough.
1264 * 2) It is yet serving slots from our point of view (not failed over).
1265 * Apparently no one is going to fix these slots, clear the FAIL flag. */
1266 if (nodeIsMaster(node) && node->numslots > 0 &&
1267 (now - node->fail_time) >
1268 (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1269 {
1270 serverLog(LL_NOTICE,
1271 "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
1272 node->name);
1273 node->flags &= ~CLUSTER_NODE_FAIL;
1274 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1275 }
1276 }
1277
1278 /* Return true if we already have a node in HANDSHAKE state matching the
1279 * specified ip address and port number. This function is used in order to
1280 * avoid adding a new handshake node for the same address multiple times. */
clusterHandshakeInProgress(char * ip,int port,int cport)1281 int clusterHandshakeInProgress(char *ip, int port, int cport) {
1282 dictIterator *di;
1283 dictEntry *de;
1284
1285 di = dictGetSafeIterator(server.cluster->nodes);
1286 while((de = dictNext(di)) != NULL) {
1287 clusterNode *node = dictGetVal(de);
1288
1289 if (!nodeInHandshake(node)) continue;
1290 if (!strcasecmp(node->ip,ip) &&
1291 node->port == port &&
1292 node->cport == cport) break;
1293 }
1294 dictReleaseIterator(di);
1295 return de != NULL;
1296 }
1297
1298 /* Start an handshake with the specified address if there is not one
1299 * already in progress. Returns non-zero if the handshake was actually
1300 * started. On error zero is returned and errno is set to one of the
1301 * following values:
1302 *
1303 * EAGAIN - There is already an handshake in progress for this address.
1304 * EINVAL - IP or port are not valid. */
clusterStartHandshake(char * ip,int port,int cport)1305 int clusterStartHandshake(char *ip, int port, int cport) {
1306 clusterNode *n;
1307 char norm_ip[NET_IP_STR_LEN];
1308 struct sockaddr_storage sa;
1309
1310 /* IP sanity check */
1311 if (inet_pton(AF_INET,ip,
1312 &(((struct sockaddr_in *)&sa)->sin_addr)))
1313 {
1314 sa.ss_family = AF_INET;
1315 } else if (inet_pton(AF_INET6,ip,
1316 &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
1317 {
1318 sa.ss_family = AF_INET6;
1319 } else {
1320 errno = EINVAL;
1321 return 0;
1322 }
1323
1324 /* Port sanity check */
1325 if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
1326 errno = EINVAL;
1327 return 0;
1328 }
1329
1330 /* Set norm_ip as the normalized string representation of the node
1331 * IP address. */
1332 memset(norm_ip,0,NET_IP_STR_LEN);
1333 if (sa.ss_family == AF_INET)
1334 inet_ntop(AF_INET,
1335 (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
1336 norm_ip,NET_IP_STR_LEN);
1337 else
1338 inet_ntop(AF_INET6,
1339 (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
1340 norm_ip,NET_IP_STR_LEN);
1341
1342 if (clusterHandshakeInProgress(norm_ip,port,cport)) {
1343 errno = EAGAIN;
1344 return 0;
1345 }
1346
1347 /* Add the node with a random address (NULL as first argument to
1348 * createClusterNode()). Everything will be fixed during the
1349 * handshake. */
1350 n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
1351 memcpy(n->ip,norm_ip,sizeof(n->ip));
1352 n->port = port;
1353 n->cport = cport;
1354 clusterAddNode(n);
1355 return 1;
1356 }
1357
1358 /* Process the gossip section of PING or PONG packets.
1359 * Note that this function assumes that the packet is already sanity-checked
1360 * by the caller, not in the content of the gossip section, but in the
1361 * length. */
clusterProcessGossipSection(clusterMsg * hdr,clusterLink * link)1362 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
1363 uint16_t count = ntohs(hdr->count);
1364 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
1365 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
1366
1367 while(count--) {
1368 uint16_t flags = ntohs(g->flags);
1369 clusterNode *node;
1370 sds ci;
1371
1372 if (server.verbosity == LL_DEBUG) {
1373 ci = representClusterNodeFlags(sdsempty(), flags);
1374 serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
1375 g->nodename,
1376 g->ip,
1377 ntohs(g->port),
1378 ntohs(g->cport),
1379 ci);
1380 sdsfree(ci);
1381 }
1382
1383 /* Update our state accordingly to the gossip sections */
1384 node = clusterLookupNode(g->nodename);
1385 if (node) {
1386 /* We already know this node.
1387 Handle failure reports, only when the sender is a master. */
1388 if (sender && nodeIsMaster(sender) && node != myself) {
1389 if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
1390 if (clusterNodeAddFailureReport(node,sender)) {
1391 serverLog(LL_VERBOSE,
1392 "Node %.40s reported node %.40s as not reachable.",
1393 sender->name, node->name);
1394 }
1395 markNodeAsFailingIfNeeded(node);
1396 } else {
1397 if (clusterNodeDelFailureReport(node,sender)) {
1398 serverLog(LL_VERBOSE,
1399 "Node %.40s reported node %.40s is back online.",
1400 sender->name, node->name);
1401 }
1402 }
1403 }
1404
1405 /* If from our POV the node is up (no failure flags are set),
1406 * we have no pending ping for the node, nor we have failure
1407 * reports for this node, update the last pong time with the
1408 * one we see from the other nodes. */
1409 if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1410 node->ping_sent == 0 &&
1411 clusterNodeFailureReportsCount(node) == 0)
1412 {
1413 mstime_t pongtime = ntohl(g->pong_received);
1414 pongtime *= 1000; /* Convert back to milliseconds. */
1415
1416 /* Replace the pong time with the received one only if
1417 * it's greater than our view but is not in the future
1418 * (with 500 milliseconds tolerance) from the POV of our
1419 * clock. */
1420 if (pongtime <= (server.mstime+500) &&
1421 pongtime > node->pong_received)
1422 {
1423 node->pong_received = pongtime;
1424 }
1425 }
1426
1427 /* If we already know this node, but it is not reachable, and
1428 * we see a different address in the gossip section of a node that
1429 * can talk with this other node, update the address, disconnect
1430 * the old link if any, so that we'll attempt to connect with the
1431 * new address. */
1432 if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
1433 !(flags & CLUSTER_NODE_NOADDR) &&
1434 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1435 (strcasecmp(node->ip,g->ip) ||
1436 node->port != ntohs(g->port) ||
1437 node->cport != ntohs(g->cport)))
1438 {
1439 if (node->link) freeClusterLink(node->link);
1440 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1441 node->port = ntohs(g->port);
1442 node->cport = ntohs(g->cport);
1443 node->flags &= ~CLUSTER_NODE_NOADDR;
1444 }
1445 } else {
1446 /* If it's not in NOADDR state and we don't have it, we
1447 * add it to our trusted dict with exact nodeid and flag.
1448 * Note that we cannot simply start a handshake against
1449 * this IP/PORT pairs, since IP/PORT can be reused already,
1450 * otherwise we risk joining another cluster.
1451 *
1452 * Note that we require that the sender of this gossip message
1453 * is a well known node in our cluster, otherwise we risk
1454 * joining another cluster. */
1455 if (sender &&
1456 !(flags & CLUSTER_NODE_NOADDR) &&
1457 !clusterBlacklistExists(g->nodename))
1458 {
1459 clusterNode *node;
1460 node = createClusterNode(g->nodename, flags);
1461 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1462 node->port = ntohs(g->port);
1463 node->cport = ntohs(g->cport);
1464 clusterAddNode(node);
1465 }
1466 }
1467
1468 /* Next node */
1469 g++;
1470 }
1471 }
1472
1473 /* IP -> string conversion. 'buf' is supposed to at least be 46 bytes.
1474 * If 'announced_ip' length is non-zero, it is used instead of extracting
1475 * the IP from the socket peer address. */
nodeIp2String(char * buf,clusterLink * link,char * announced_ip)1476 void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
1477 if (announced_ip[0] != '\0') {
1478 memcpy(buf,announced_ip,NET_IP_STR_LEN);
1479 buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
1480 } else {
1481 anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
1482 }
1483 }
1484
1485 /* Update the node address to the IP address that can be extracted
1486 * from link->fd, or if hdr->myip is non empty, to the address the node
1487 * is announcing us. The port is taken from the packet header as well.
1488 *
1489 * If the address or port changed, disconnect the node link so that we'll
1490 * connect again to the new address.
1491 *
1492 * If the ip/port pair are already correct no operation is performed at
1493 * all.
1494 *
1495 * The function returns 0 if the node address is still the same,
1496 * otherwise 1 is returned. */
nodeUpdateAddressIfNeeded(clusterNode * node,clusterLink * link,clusterMsg * hdr)1497 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
1498 clusterMsg *hdr)
1499 {
1500 char ip[NET_IP_STR_LEN] = {0};
1501 int port = ntohs(hdr->port);
1502 int cport = ntohs(hdr->cport);
1503
1504 /* We don't proceed if the link is the same as the sender link, as this
1505 * function is designed to see if the node link is consistent with the
1506 * symmetric link that is used to receive PINGs from the node.
1507 *
1508 * As a side effect this function never frees the passed 'link', so
1509 * it is safe to call during packet processing. */
1510 if (link == node->link) return 0;
1511
1512 nodeIp2String(ip,link,hdr->myip);
1513 if (node->port == port && node->cport == cport &&
1514 strcmp(ip,node->ip) == 0) return 0;
1515
1516 /* IP / port is different, update it. */
1517 memcpy(node->ip,ip,sizeof(ip));
1518 node->port = port;
1519 node->cport = cport;
1520 if (node->link) freeClusterLink(node->link);
1521 node->flags &= ~CLUSTER_NODE_NOADDR;
1522 serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
1523 node->name, node->ip, node->port);
1524
1525 /* Check if this is our master and we have to change the
1526 * replication target as well. */
1527 if (nodeIsSlave(myself) && myself->slaveof == node)
1528 replicationSetMaster(node->ip, node->port);
1529 return 1;
1530 }
1531
1532 /* Reconfigure the specified node 'n' as a master. This function is called when
1533 * a node that we believed to be a slave is now acting as master in order to
1534 * update the state of the node. */
clusterSetNodeAsMaster(clusterNode * n)1535 void clusterSetNodeAsMaster(clusterNode *n) {
1536 if (nodeIsMaster(n)) return;
1537
1538 if (n->slaveof) {
1539 clusterNodeRemoveSlave(n->slaveof,n);
1540 if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1541 }
1542 n->flags &= ~CLUSTER_NODE_SLAVE;
1543 n->flags |= CLUSTER_NODE_MASTER;
1544 n->slaveof = NULL;
1545
1546 /* Update config and state. */
1547 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1548 CLUSTER_TODO_UPDATE_STATE);
1549 }
1550
1551 /* This function is called when we receive a master configuration via a
1552 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1553 * node, and the set of slots claimed under this configEpoch.
1554 *
1555 * What we do is to rebind the slots with newer configuration compared to our
1556 * local configuration, and if needed, we turn ourself into a replica of the
1557 * node (see the function comments for more info).
1558 *
1559 * The 'sender' is the node for which we received a configuration update.
1560 * Sometimes it is not actually the "Sender" of the information, like in the
1561 * case we receive the info via an UPDATE packet. */
clusterUpdateSlotsConfigWith(clusterNode * sender,uint64_t senderConfigEpoch,unsigned char * slots)1562 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1563 int j;
1564 clusterNode *curmaster, *newmaster = NULL;
1565 /* The dirty slots list is a list of slots for which we lose the ownership
1566 * while having still keys inside. This usually happens after a failover
1567 * or after a manual cluster reconfiguration operated by the admin.
1568 *
1569 * If the update message is not able to demote a master to slave (in this
1570 * case we'll resync with the master updating the whole key space), we
1571 * need to delete all the keys in the slots we lost ownership. */
1572 uint16_t dirty_slots[CLUSTER_SLOTS];
1573 int dirty_slots_count = 0;
1574
1575 /* Here we set curmaster to this node or the node this node
1576 * replicates to if it's a slave. In the for loop we are
1577 * interested to check if slots are taken away from curmaster. */
1578 curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1579
1580 if (sender == myself) {
1581 serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1582 return;
1583 }
1584
1585 for (j = 0; j < CLUSTER_SLOTS; j++) {
1586 if (bitmapTestBit(slots,j)) {
1587 /* The slot is already bound to the sender of this message. */
1588 if (server.cluster->slots[j] == sender) continue;
1589
1590 /* The slot is in importing state, it should be modified only
1591 * manually via redis-trib (example: a resharding is in progress
1592 * and the migrating side slot was already closed and is advertising
1593 * a new config. We still want the slot to be closed manually). */
1594 if (server.cluster->importing_slots_from[j]) continue;
1595
1596 /* We rebind the slot to the new node claiming it if:
1597 * 1) The slot was unassigned or the new node claims it with a
1598 * greater configEpoch.
1599 * 2) We are not currently importing the slot. */
1600 if (server.cluster->slots[j] == NULL ||
1601 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1602 {
1603 /* Was this slot mine, and still contains keys? Mark it as
1604 * a dirty slot. */
1605 if (server.cluster->slots[j] == myself &&
1606 countKeysInSlot(j) &&
1607 sender != myself)
1608 {
1609 dirty_slots[dirty_slots_count] = j;
1610 dirty_slots_count++;
1611 }
1612
1613 if (server.cluster->slots[j] == curmaster)
1614 newmaster = sender;
1615 clusterDelSlot(j);
1616 clusterAddSlot(sender,j);
1617 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1618 CLUSTER_TODO_UPDATE_STATE|
1619 CLUSTER_TODO_FSYNC_CONFIG);
1620 }
1621 }
1622 }
1623
1624 /* After updating the slots configuration, don't do any actual change
1625 * in the state of the server if a module disabled Redis Cluster
1626 * keys redirections. */
1627 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
1628 return;
1629
1630 /* If at least one slot was reassigned from a node to another node
1631 * with a greater configEpoch, it is possible that:
1632 * 1) We are a master left without slots. This means that we were
1633 * failed over and we should turn into a replica of the new
1634 * master.
1635 * 2) We are a slave and our master is left without slots. We need
1636 * to replicate to the new slots owner. */
1637 if (newmaster && curmaster->numslots == 0) {
1638 serverLog(LL_WARNING,
1639 "Configuration change detected. Reconfiguring myself "
1640 "as a replica of %.40s", sender->name);
1641 clusterSetMaster(sender);
1642 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1643 CLUSTER_TODO_UPDATE_STATE|
1644 CLUSTER_TODO_FSYNC_CONFIG);
1645 } else if (dirty_slots_count) {
1646 /* If we are here, we received an update message which removed
1647 * ownership for certain slots we still have keys about, but still
1648 * we are serving some slots, so this master node was not demoted to
1649 * a slave.
1650 *
1651 * In order to maintain a consistent state between keys and slots
1652 * we need to remove all the keys from the slots we lost. */
1653 for (j = 0; j < dirty_slots_count; j++)
1654 delKeysInSlot(dirty_slots[j]);
1655 }
1656 }
1657
1658 /* When this function is called, there is a packet to process starting
1659 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
1660 * function should just handle the higher level stuff of processing the
1661 * packet, modifying the cluster state if needed.
1662 *
1663 * The function returns 1 if the link is still valid after the packet
1664 * was processed, otherwise 0 if the link was freed since the packet
1665 * processing lead to some inconsistency error (for instance a PONG
1666 * received from the wrong sender ID). */
clusterProcessPacket(clusterLink * link)1667 int clusterProcessPacket(clusterLink *link) {
1668 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1669 uint32_t totlen = ntohl(hdr->totlen);
1670 uint16_t type = ntohs(hdr->type);
1671 mstime_t now = mstime();
1672
1673 if (type < CLUSTERMSG_TYPE_COUNT)
1674 server.cluster->stats_bus_messages_received[type]++;
1675 serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
1676 type, (unsigned long) totlen);
1677
1678 /* Perform sanity checks */
1679 if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1680 if (totlen > sdslen(link->rcvbuf)) return 1;
1681
1682 if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1683 /* Can't handle messages of different versions. */
1684 return 1;
1685 }
1686
1687 uint16_t flags = ntohs(hdr->flags);
1688 uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1689 clusterNode *sender;
1690
1691 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1692 type == CLUSTERMSG_TYPE_MEET)
1693 {
1694 uint16_t count = ntohs(hdr->count);
1695 uint32_t explen; /* expected length of this packet */
1696
1697 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1698 explen += (sizeof(clusterMsgDataGossip)*count);
1699 if (totlen != explen) return 1;
1700 } else if (type == CLUSTERMSG_TYPE_FAIL) {
1701 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1702
1703 explen += sizeof(clusterMsgDataFail);
1704 if (totlen != explen) return 1;
1705 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1706 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1707
1708 explen += sizeof(clusterMsgDataPublish) -
1709 8 +
1710 ntohl(hdr->data.publish.msg.channel_len) +
1711 ntohl(hdr->data.publish.msg.message_len);
1712 if (totlen != explen) return 1;
1713 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1714 type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1715 type == CLUSTERMSG_TYPE_MFSTART)
1716 {
1717 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1718
1719 if (totlen != explen) return 1;
1720 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1721 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1722
1723 explen += sizeof(clusterMsgDataUpdate);
1724 if (totlen != explen) return 1;
1725 } else if (type == CLUSTERMSG_TYPE_MODULE) {
1726 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1727
1728 explen += sizeof(clusterMsgDataPublish) -
1729 3 + ntohl(hdr->data.module.msg.len);
1730 if (totlen != explen) return 1;
1731 }
1732
1733 /* Check if the sender is a known node. */
1734 sender = clusterLookupNode(hdr->sender);
1735
1736 /* Update the last time we saw any data from this node. We
1737 * use this in order to avoid detecting a timeout from a node that
1738 * is just sending a lot of data in the cluster bus, for instance
1739 * because of Pub/Sub. */
1740 if (sender) sender->data_received = now;
1741
1742 if (sender && !nodeInHandshake(sender)) {
1743 /* Update our curretEpoch if we see a newer epoch in the cluster. */
1744 senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1745 senderConfigEpoch = ntohu64(hdr->configEpoch);
1746 if (senderCurrentEpoch > server.cluster->currentEpoch)
1747 server.cluster->currentEpoch = senderCurrentEpoch;
1748 /* Update the sender configEpoch if it is publishing a newer one. */
1749 if (senderConfigEpoch > sender->configEpoch) {
1750 sender->configEpoch = senderConfigEpoch;
1751 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1752 CLUSTER_TODO_FSYNC_CONFIG);
1753 }
1754 /* Update the replication offset info for this node. */
1755 sender->repl_offset = ntohu64(hdr->offset);
1756 sender->repl_offset_time = now;
1757 /* If we are a slave performing a manual failover and our master
1758 * sent its offset while already paused, populate the MF state. */
1759 if (server.cluster->mf_end &&
1760 nodeIsSlave(myself) &&
1761 myself->slaveof == sender &&
1762 hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1763 server.cluster->mf_master_offset == 0)
1764 {
1765 server.cluster->mf_master_offset = sender->repl_offset;
1766 serverLog(LL_WARNING,
1767 "Received replication offset for paused "
1768 "master manual failover: %lld",
1769 server.cluster->mf_master_offset);
1770 }
1771 }
1772
1773 /* Initial processing of PING and MEET requests replying with a PONG. */
1774 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1775 serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
1776
1777 /* We use incoming MEET messages in order to set the address
1778 * for 'myself', since only other cluster nodes will send us
1779 * MEET messages on handshakes, when the cluster joins, or
1780 * later if we changed address, and those nodes will use our
1781 * official address to connect to us. So by obtaining this address
1782 * from the socket is a simple way to discover / update our own
1783 * address in the cluster without it being hardcoded in the config.
1784 *
1785 * However if we don't have an address at all, we update the address
1786 * even with a normal PING packet. If it's wrong it will be fixed
1787 * by MEET later. */
1788 if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
1789 server.cluster_announce_ip == NULL)
1790 {
1791 char ip[NET_IP_STR_LEN];
1792
1793 if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
1794 strcmp(ip,myself->ip))
1795 {
1796 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1797 serverLog(LL_WARNING,"IP address for this node updated to %s",
1798 myself->ip);
1799 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1800 }
1801 }
1802
1803 /* Add this node if it is new for us and the msg type is MEET.
1804 * In this stage we don't try to add the node with the right
1805 * flags, slaveof pointer, and so forth, as this details will be
1806 * resolved when we'll receive PONGs from the node. */
1807 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1808 clusterNode *node;
1809
1810 node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1811 nodeIp2String(node->ip,link,hdr->myip);
1812 node->port = ntohs(hdr->port);
1813 node->cport = ntohs(hdr->cport);
1814 clusterAddNode(node);
1815 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1816 }
1817
1818 /* If this is a MEET packet from an unknown node, we still process
1819 * the gossip section here since we have to trust the sender because
1820 * of the message type. */
1821 if (!sender && type == CLUSTERMSG_TYPE_MEET)
1822 clusterProcessGossipSection(hdr,link);
1823
1824 /* Anyway reply with a PONG */
1825 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1826 }
1827
1828 /* PING, PONG, MEET: process config information. */
1829 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1830 type == CLUSTERMSG_TYPE_MEET)
1831 {
1832 serverLog(LL_DEBUG,"%s packet received: %p",
1833 type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
1834 (void*)link->node);
1835 if (link->node) {
1836 if (nodeInHandshake(link->node)) {
1837 /* If we already have this node, try to change the
1838 * IP/port of the node with the new one. */
1839 if (sender) {
1840 serverLog(LL_VERBOSE,
1841 "Handshake: we already know node %.40s, "
1842 "updating the address if needed.", sender->name);
1843 if (nodeUpdateAddressIfNeeded(sender,link,hdr))
1844 {
1845 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1846 CLUSTER_TODO_UPDATE_STATE);
1847 }
1848 /* Free this node as we already have it. This will
1849 * cause the link to be freed as well. */
1850 clusterDelNode(link->node);
1851 return 0;
1852 }
1853
1854 /* First thing to do is replacing the random name with the
1855 * right node name if this was a handshake stage. */
1856 clusterRenameNode(link->node, hdr->sender);
1857 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
1858 link->node->name);
1859 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1860 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1861 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1862 } else if (memcmp(link->node->name,hdr->sender,
1863 CLUSTER_NAMELEN) != 0)
1864 {
1865 /* If the reply has a non matching node ID we
1866 * disconnect this node and set it as not having an associated
1867 * address. */
1868 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1869 link->node->name,
1870 (int)(now-(link->node->ctime)),
1871 link->node->flags);
1872 link->node->flags |= CLUSTER_NODE_NOADDR;
1873 link->node->ip[0] = '\0';
1874 link->node->port = 0;
1875 link->node->cport = 0;
1876 freeClusterLink(link);
1877 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1878 return 0;
1879 }
1880 }
1881
1882 /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
1883 * announced. This is a dynamic flag that we receive from the
1884 * sender, and the latest status must be trusted. We need it to
1885 * be propagated because the slave ranking used to understand the
1886 * delay of each slave in the voting process, needs to know
1887 * what are the instances really competing. */
1888 if (sender) {
1889 int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
1890 sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
1891 sender->flags |= nofailover;
1892 }
1893
1894 /* Update the node address if it changed. */
1895 if (sender && type == CLUSTERMSG_TYPE_PING &&
1896 !nodeInHandshake(sender) &&
1897 nodeUpdateAddressIfNeeded(sender,link,hdr))
1898 {
1899 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1900 CLUSTER_TODO_UPDATE_STATE);
1901 }
1902
1903 /* Update our info about the node */
1904 if (link->node && type == CLUSTERMSG_TYPE_PONG) {
1905 link->node->pong_received = now;
1906 link->node->ping_sent = 0;
1907
1908 /* The PFAIL condition can be reversed without external
1909 * help if it is momentary (that is, if it does not
1910 * turn into a FAIL state).
1911 *
1912 * The FAIL condition is also reversible under specific
1913 * conditions detected by clearNodeFailureIfNeeded(). */
1914 if (nodeTimedOut(link->node)) {
1915 link->node->flags &= ~CLUSTER_NODE_PFAIL;
1916 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1917 CLUSTER_TODO_UPDATE_STATE);
1918 } else if (nodeFailed(link->node)) {
1919 clearNodeFailureIfNeeded(link->node);
1920 }
1921 }
1922
1923 /* Check for role switch: slave -> master or master -> slave. */
1924 if (sender) {
1925 if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1926 sizeof(hdr->slaveof)))
1927 {
1928 /* Node is a master. */
1929 clusterSetNodeAsMaster(sender);
1930 } else {
1931 /* Node is a slave. */
1932 clusterNode *master = clusterLookupNode(hdr->slaveof);
1933
1934 if (nodeIsMaster(sender)) {
1935 /* Master turned into a slave! Reconfigure the node. */
1936 clusterDelNodeSlots(sender);
1937 sender->flags &= ~(CLUSTER_NODE_MASTER|
1938 CLUSTER_NODE_MIGRATE_TO);
1939 sender->flags |= CLUSTER_NODE_SLAVE;
1940
1941 /* Update config and state. */
1942 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1943 CLUSTER_TODO_UPDATE_STATE);
1944 }
1945
1946 /* Master node changed for this slave? */
1947 if (master && sender->slaveof != master) {
1948 if (sender->slaveof)
1949 clusterNodeRemoveSlave(sender->slaveof,sender);
1950 clusterNodeAddSlave(master,sender);
1951 sender->slaveof = master;
1952
1953 /* Update config. */
1954 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1955 }
1956 }
1957 }
1958
1959 /* Update our info about served slots.
1960 *
1961 * Note: this MUST happen after we update the master/slave state
1962 * so that CLUSTER_NODE_MASTER flag will be set. */
1963
1964 /* Many checks are only needed if the set of served slots this
1965 * instance claims is different compared to the set of slots we have
1966 * for it. Check this ASAP to avoid other computational expansive
1967 * checks later. */
1968 clusterNode *sender_master = NULL; /* Sender or its master if slave. */
1969 int dirty_slots = 0; /* Sender claimed slots don't match my view? */
1970
1971 if (sender) {
1972 sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
1973 if (sender_master) {
1974 dirty_slots = memcmp(sender_master->slots,
1975 hdr->myslots,sizeof(hdr->myslots)) != 0;
1976 }
1977 }
1978
1979 /* 1) If the sender of the message is a master, and we detected that
1980 * the set of slots it claims changed, scan the slots to see if we
1981 * need to update our configuration. */
1982 if (sender && nodeIsMaster(sender) && dirty_slots)
1983 clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
1984
1985 /* 2) We also check for the reverse condition, that is, the sender
1986 * claims to serve slots we know are served by a master with a
1987 * greater configEpoch. If this happens we inform the sender.
1988 *
1989 * This is useful because sometimes after a partition heals, a
1990 * reappearing master may be the last one to claim a given set of
1991 * hash slots, but with a configuration that other instances know to
1992 * be deprecated. Example:
1993 *
1994 * A and B are master and slave for slots 1,2,3.
1995 * A is partitioned away, B gets promoted.
1996 * B is partitioned away, and A returns available.
1997 *
1998 * Usually B would PING A publishing its set of served slots and its
1999 * configEpoch, but because of the partition B can't inform A of the
2000 * new configuration, so other nodes that have an updated table must
2001 * do it. In this way A will stop to act as a master (or can try to
2002 * failover if there are the conditions to win the election). */
2003 if (sender && dirty_slots) {
2004 int j;
2005
2006 for (j = 0; j < CLUSTER_SLOTS; j++) {
2007 if (bitmapTestBit(hdr->myslots,j)) {
2008 if (server.cluster->slots[j] == sender ||
2009 server.cluster->slots[j] == NULL) continue;
2010 if (server.cluster->slots[j]->configEpoch >
2011 senderConfigEpoch)
2012 {
2013 serverLog(LL_VERBOSE,
2014 "Node %.40s has old slots configuration, sending "
2015 "an UPDATE message about %.40s",
2016 sender->name, server.cluster->slots[j]->name);
2017 clusterSendUpdate(sender->link,
2018 server.cluster->slots[j]);
2019
2020 /* TODO: instead of exiting the loop send every other
2021 * UPDATE packet for other nodes that are the new owner
2022 * of sender's slots. */
2023 break;
2024 }
2025 }
2026 }
2027 }
2028
2029 /* If our config epoch collides with the sender's try to fix
2030 * the problem. */
2031 if (sender &&
2032 nodeIsMaster(myself) && nodeIsMaster(sender) &&
2033 senderConfigEpoch == myself->configEpoch)
2034 {
2035 clusterHandleConfigEpochCollision(sender);
2036 }
2037
2038 /* Get info from the gossip section */
2039 if (sender) clusterProcessGossipSection(hdr,link);
2040 } else if (type == CLUSTERMSG_TYPE_FAIL) {
2041 clusterNode *failing;
2042
2043 if (sender) {
2044 failing = clusterLookupNode(hdr->data.fail.about.nodename);
2045 if (failing &&
2046 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
2047 {
2048 serverLog(LL_NOTICE,
2049 "FAIL message received from %.40s about %.40s",
2050 hdr->sender, hdr->data.fail.about.nodename);
2051 failing->flags |= CLUSTER_NODE_FAIL;
2052 failing->fail_time = now;
2053 failing->flags &= ~CLUSTER_NODE_PFAIL;
2054 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2055 CLUSTER_TODO_UPDATE_STATE);
2056 }
2057 } else {
2058 serverLog(LL_NOTICE,
2059 "Ignoring FAIL message from unknown node %.40s about %.40s",
2060 hdr->sender, hdr->data.fail.about.nodename);
2061 }
2062 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
2063 robj *channel, *message;
2064 uint32_t channel_len, message_len;
2065
2066 /* Don't bother creating useless objects if there are no
2067 * Pub/Sub subscribers. */
2068 if (dictSize(server.pubsub_channels) ||
2069 listLength(server.pubsub_patterns))
2070 {
2071 channel_len = ntohl(hdr->data.publish.msg.channel_len);
2072 message_len = ntohl(hdr->data.publish.msg.message_len);
2073 channel = createStringObject(
2074 (char*)hdr->data.publish.msg.bulk_data,channel_len);
2075 message = createStringObject(
2076 (char*)hdr->data.publish.msg.bulk_data+channel_len,
2077 message_len);
2078 pubsubPublishMessage(channel,message);
2079 decrRefCount(channel);
2080 decrRefCount(message);
2081 }
2082 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
2083 if (!sender) return 1; /* We don't know that node. */
2084 clusterSendFailoverAuthIfNeeded(sender,hdr);
2085 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
2086 if (!sender) return 1; /* We don't know that node. */
2087 /* We consider this vote only if the sender is a master serving
2088 * a non zero number of slots, and its currentEpoch is greater or
2089 * equal to epoch where this node started the election. */
2090 if (nodeIsMaster(sender) && sender->numslots > 0 &&
2091 senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2092 {
2093 server.cluster->failover_auth_count++;
2094 /* Maybe we reached a quorum here, set a flag to make sure
2095 * we check ASAP. */
2096 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
2097 }
2098 } else if (type == CLUSTERMSG_TYPE_MFSTART) {
2099 /* This message is acceptable only if I'm a master and the sender
2100 * is one of my slaves. */
2101 if (!sender || sender->slaveof != myself) return 1;
2102 /* Manual failover requested from slaves. Initialize the state
2103 * accordingly. */
2104 resetManualFailover();
2105 server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
2106 server.cluster->mf_slave = sender;
2107 pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
2108 serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
2109 sender->name);
2110 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2111 clusterNode *n; /* The node the update is about. */
2112 uint64_t reportedConfigEpoch =
2113 ntohu64(hdr->data.update.nodecfg.configEpoch);
2114
2115 if (!sender) return 1; /* We don't know the sender. */
2116 n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
2117 if (!n) return 1; /* We don't know the reported node. */
2118 if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
2119
2120 /* If in our current config the node is a slave, set it as a master. */
2121 if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
2122
2123 /* Update the node's configEpoch. */
2124 n->configEpoch = reportedConfigEpoch;
2125 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2126 CLUSTER_TODO_FSYNC_CONFIG);
2127
2128 /* Check the bitmap of served slots and update our
2129 * config accordingly. */
2130 clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
2131 hdr->data.update.nodecfg.slots);
2132 } else if (type == CLUSTERMSG_TYPE_MODULE) {
2133 if (!sender) return 1; /* Protect the module from unknown nodes. */
2134 /* We need to route this message back to the right module subscribed
2135 * for the right message type. */
2136 uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
2137 uint32_t len = ntohl(hdr->data.module.msg.len);
2138 uint8_t type = hdr->data.module.msg.type;
2139 unsigned char *payload = hdr->data.module.msg.bulk_data;
2140 moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
2141 } else {
2142 serverLog(LL_WARNING,"Received unknown packet type: %d", type);
2143 }
2144 return 1;
2145 }
2146
2147 /* This function is called when we detect the link with this node is lost.
2148 We set the node as no longer connected. The Cluster Cron will detect
2149 this connection and will try to get it connected again.
2150
2151 Instead if the node is a temporary node used to accept a query, we
2152 completely free the node on error. */
handleLinkIOError(clusterLink * link)2153 void handleLinkIOError(clusterLink *link) {
2154 freeClusterLink(link);
2155 }
2156
2157 /* Send data. This is handled using a trivial send buffer that gets
2158 * consumed by write(). We don't try to optimize this for speed too much
2159 * as this is a very low traffic channel. */
clusterWriteHandler(aeEventLoop * el,int fd,void * privdata,int mask)2160 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2161 clusterLink *link = (clusterLink*) privdata;
2162 ssize_t nwritten;
2163 UNUSED(el);
2164 UNUSED(mask);
2165
2166 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
2167 if (nwritten <= 0) {
2168 serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2169 (nwritten == -1) ? strerror(errno) : "short write");
2170 handleLinkIOError(link);
2171 return;
2172 }
2173 sdsrange(link->sndbuf,nwritten,-1);
2174 if (sdslen(link->sndbuf) == 0)
2175 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
2176 }
2177
2178 /* Read data. Try to read the first field of the header first to check the
2179 * full length of the packet. When a whole packet is in memory this function
2180 * will call the function to process the packet. And so forth. */
clusterReadHandler(aeEventLoop * el,int fd,void * privdata,int mask)2181 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2182 char buf[sizeof(clusterMsg)];
2183 ssize_t nread;
2184 clusterMsg *hdr;
2185 clusterLink *link = (clusterLink*) privdata;
2186 unsigned int readlen, rcvbuflen;
2187 UNUSED(el);
2188 UNUSED(mask);
2189
2190 while(1) { /* Read as long as there is data to read. */
2191 rcvbuflen = sdslen(link->rcvbuf);
2192 if (rcvbuflen < 8) {
2193 /* First, obtain the first 8 bytes to get the full message
2194 * length. */
2195 readlen = 8 - rcvbuflen;
2196 } else {
2197 /* Finally read the full message. */
2198 hdr = (clusterMsg*) link->rcvbuf;
2199 if (rcvbuflen == 8) {
2200 /* Perform some sanity check on the message signature
2201 * and length. */
2202 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2203 ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2204 {
2205 serverLog(LL_WARNING,
2206 "Bad message length or signature received "
2207 "from Cluster bus.");
2208 handleLinkIOError(link);
2209 return;
2210 }
2211 }
2212 readlen = ntohl(hdr->totlen) - rcvbuflen;
2213 if (readlen > sizeof(buf)) readlen = sizeof(buf);
2214 }
2215
2216 nread = read(fd,buf,readlen);
2217 if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
2218
2219 if (nread <= 0) {
2220 /* I/O error... */
2221 serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2222 (nread == 0) ? "connection closed" : strerror(errno));
2223 handleLinkIOError(link);
2224 return;
2225 } else {
2226 /* Read data and recast the pointer to the new buffer. */
2227 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
2228 hdr = (clusterMsg*) link->rcvbuf;
2229 rcvbuflen += nread;
2230 }
2231
2232 /* Total length obtained? Process this packet. */
2233 if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2234 if (clusterProcessPacket(link)) {
2235 sdsfree(link->rcvbuf);
2236 link->rcvbuf = sdsempty();
2237 } else {
2238 return; /* Link no longer valid. */
2239 }
2240 }
2241 }
2242 }
2243
2244 /* Put stuff into the send buffer.
2245 *
2246 * It is guaranteed that this function will never have as a side effect
2247 * the link to be invalidated, so it is safe to call this function
2248 * from event handlers that will do stuff with the same link later. */
clusterSendMessage(clusterLink * link,unsigned char * msg,size_t msglen)2249 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2250 if (sdslen(link->sndbuf) == 0 && msglen != 0)
2251 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE|AE_BARRIER,
2252 clusterWriteHandler,link);
2253
2254 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2255
2256 /* Populate sent messages stats. */
2257 clusterMsg *hdr = (clusterMsg*) msg;
2258 uint16_t type = ntohs(hdr->type);
2259 if (type < CLUSTERMSG_TYPE_COUNT)
2260 server.cluster->stats_bus_messages_sent[type]++;
2261 }
2262
2263 /* Send a message to all the nodes that are part of the cluster having
2264 * a connected link.
2265 *
2266 * It is guaranteed that this function will never have as a side effect
2267 * some node->link to be invalidated, so it is safe to call this function
2268 * from event handlers that will do stuff with node links later. */
clusterBroadcastMessage(void * buf,size_t len)2269 void clusterBroadcastMessage(void *buf, size_t len) {
2270 dictIterator *di;
2271 dictEntry *de;
2272
2273 di = dictGetSafeIterator(server.cluster->nodes);
2274 while((de = dictNext(di)) != NULL) {
2275 clusterNode *node = dictGetVal(de);
2276
2277 if (!node->link) continue;
2278 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2279 continue;
2280 clusterSendMessage(node->link,buf,len);
2281 }
2282 dictReleaseIterator(di);
2283 }
2284
2285 /* Build the message header. hdr must point to a buffer at least
2286 * sizeof(clusterMsg) in bytes. */
clusterBuildMessageHdr(clusterMsg * hdr,int type)2287 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
2288 int totlen = 0;
2289 uint64_t offset;
2290 clusterNode *master;
2291
2292 /* If this node is a master, we send its slots bitmap and configEpoch.
2293 * If this node is a slave we send the master's information instead (the
2294 * node is flagged as slave so the receiver knows that it is NOT really
2295 * in charge for this slots. */
2296 master = (nodeIsSlave(myself) && myself->slaveof) ?
2297 myself->slaveof : myself;
2298
2299 memset(hdr,0,sizeof(*hdr));
2300 hdr->ver = htons(CLUSTER_PROTO_VER);
2301 hdr->sig[0] = 'R';
2302 hdr->sig[1] = 'C';
2303 hdr->sig[2] = 'm';
2304 hdr->sig[3] = 'b';
2305 hdr->type = htons(type);
2306 memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
2307
2308 /* If cluster-announce-ip option is enabled, force the receivers of our
2309 * packets to use the specified address for this node. Otherwise if the
2310 * first byte is zero, they'll do auto discovery. */
2311 memset(hdr->myip,0,NET_IP_STR_LEN);
2312 if (server.cluster_announce_ip) {
2313 strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
2314 hdr->myip[NET_IP_STR_LEN-1] = '\0';
2315 }
2316
2317 /* Handle cluster-announce-port as well. */
2318 int announced_port = server.cluster_announce_port ?
2319 server.cluster_announce_port : server.port;
2320 int announced_cport = server.cluster_announce_bus_port ?
2321 server.cluster_announce_bus_port :
2322 (server.port + CLUSTER_PORT_INCR);
2323
2324 memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
2325 memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2326 if (myself->slaveof != NULL)
2327 memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
2328 hdr->port = htons(announced_port);
2329 hdr->cport = htons(announced_cport);
2330 hdr->flags = htons(myself->flags);
2331 hdr->state = server.cluster->state;
2332
2333 /* Set the currentEpoch and configEpochs. */
2334 hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2335 hdr->configEpoch = htonu64(master->configEpoch);
2336
2337 /* Set the replication offset. */
2338 if (nodeIsSlave(myself))
2339 offset = replicationGetSlaveOffset();
2340 else
2341 offset = server.master_repl_offset;
2342 hdr->offset = htonu64(offset);
2343
2344 /* Set the message flags. */
2345 if (nodeIsMaster(myself) && server.cluster->mf_end)
2346 hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2347
2348 /* Compute the message length for certain messages. For other messages
2349 * this is up to the caller. */
2350 if (type == CLUSTERMSG_TYPE_FAIL) {
2351 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2352 totlen += sizeof(clusterMsgDataFail);
2353 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2354 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2355 totlen += sizeof(clusterMsgDataUpdate);
2356 }
2357 hdr->totlen = htonl(totlen);
2358 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
2359 }
2360
2361 /* Return non zero if the node is already present in the gossip section of the
2362 * message pointed by 'hdr' and having 'count' gossip entries. Otherwise
2363 * zero is returned. Helper for clusterSendPing(). */
clusterNodeIsInGossipSection(clusterMsg * hdr,int count,clusterNode * n)2364 int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
2365 int j;
2366 for (j = 0; j < count; j++) {
2367 if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
2368 CLUSTER_NAMELEN) == 0) break;
2369 }
2370 return j != count;
2371 }
2372
2373 /* Set the i-th entry of the gossip section in the message pointed by 'hdr'
2374 * to the info of the specified node 'n'. */
clusterSetGossipEntry(clusterMsg * hdr,int i,clusterNode * n)2375 void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
2376 clusterMsgDataGossip *gossip;
2377 gossip = &(hdr->data.ping.gossip[i]);
2378 memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
2379 gossip->ping_sent = htonl(n->ping_sent/1000);
2380 gossip->pong_received = htonl(n->pong_received/1000);
2381 memcpy(gossip->ip,n->ip,sizeof(n->ip));
2382 gossip->port = htons(n->port);
2383 gossip->cport = htons(n->cport);
2384 gossip->flags = htons(n->flags);
2385 gossip->notused1 = 0;
2386 }
2387
2388 /* Send a PING or PONG packet to the specified node, making sure to add enough
2389 * gossip informations. */
clusterSendPing(clusterLink * link,int type)2390 void clusterSendPing(clusterLink *link, int type) {
2391 unsigned char *buf;
2392 clusterMsg *hdr;
2393 int gossipcount = 0; /* Number of gossip sections added so far. */
2394 int wanted; /* Number of gossip sections we want to append if possible. */
2395 int totlen; /* Total packet length. */
2396 /* freshnodes is the max number of nodes we can hope to append at all:
2397 * nodes available minus two (ourself and the node we are sending the
2398 * message to). However practically there may be less valid nodes since
2399 * nodes in handshake state, disconnected, are not considered. */
2400 int freshnodes = dictSize(server.cluster->nodes)-2;
2401
2402 /* How many gossip sections we want to add? 1/10 of the number of nodes
2403 * and anyway at least 3. Why 1/10?
2404 *
2405 * If we have N masters, with N/10 entries, and we consider that in
2406 * node_timeout we exchange with each other node at least 4 packets
2407 * (we ping in the worst case in node_timeout/2 time, and we also
2408 * receive two pings from the host), we have a total of 8 packets
2409 * in the node_timeout*2 falure reports validity time. So we have
2410 * that, for a single PFAIL node, we can expect to receive the following
2411 * number of failure reports (in the specified window of time):
2412 *
2413 * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2414 *
2415 * PROB = probability of being featured in a single gossip entry,
2416 * which is 1 / NUM_OF_NODES.
2417 * ENTRIES = 10.
2418 * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2419 *
2420 * If we assume we have just masters (so num of nodes and num of masters
2421 * is the same), with 1/10 we always get over the majority, and specifically
2422 * 80% of the number of nodes, to account for many masters failing at the
2423 * same time.
2424 *
2425 * Since we have non-voting slaves that lower the probability of an entry
2426 * to feature our node, we set the number of entries per packet as
2427 * 10% of the total nodes we have. */
2428 wanted = floor(dictSize(server.cluster->nodes)/10);
2429 if (wanted < 3) wanted = 3;
2430 if (wanted > freshnodes) wanted = freshnodes;
2431
2432 /* Include all the nodes in PFAIL state, so that failure reports are
2433 * faster to propagate to go from PFAIL to FAIL state. */
2434 int pfail_wanted = server.cluster->stats_pfail_nodes;
2435
2436 /* Compute the maxium totlen to allocate our buffer. We'll fix the totlen
2437 * later according to the number of gossip sections we really were able
2438 * to put inside the packet. */
2439 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2440 totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
2441 /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2442 * sizeof(clusterMsg) or more. */
2443 if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2444 buf = zcalloc(totlen);
2445 hdr = (clusterMsg*) buf;
2446
2447 /* Populate the header. */
2448 if (link->node && type == CLUSTERMSG_TYPE_PING)
2449 link->node->ping_sent = mstime();
2450 clusterBuildMessageHdr(hdr,type);
2451
2452 /* Populate the gossip fields */
2453 int maxiterations = wanted*3;
2454 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2455 dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2456 clusterNode *this = dictGetVal(de);
2457
2458 /* Don't include this node: the whole packet header is about us
2459 * already, so we just gossip about other nodes. */
2460 if (this == myself) continue;
2461
2462 /* PFAIL nodes will be added later. */
2463 if (this->flags & CLUSTER_NODE_PFAIL) continue;
2464
2465 /* In the gossip section don't include:
2466 * 1) Nodes in HANDSHAKE state.
2467 * 3) Nodes with the NOADDR flag set.
2468 * 4) Disconnected nodes if they don't have configured slots.
2469 */
2470 if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2471 (this->link == NULL && this->numslots == 0))
2472 {
2473 freshnodes--; /* Tecnically not correct, but saves CPU. */
2474 continue;
2475 }
2476
2477 /* Do not add a node we already have. */
2478 if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
2479
2480 /* Add it */
2481 clusterSetGossipEntry(hdr,gossipcount,this);
2482 freshnodes--;
2483 gossipcount++;
2484 }
2485
2486 /* If there are PFAIL nodes, add them at the end. */
2487 if (pfail_wanted) {
2488 dictIterator *di;
2489 dictEntry *de;
2490
2491 di = dictGetSafeIterator(server.cluster->nodes);
2492 while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
2493 clusterNode *node = dictGetVal(de);
2494 if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
2495 if (node->flags & CLUSTER_NODE_NOADDR) continue;
2496 if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
2497 clusterSetGossipEntry(hdr,gossipcount,node);
2498 freshnodes--;
2499 gossipcount++;
2500 /* We take the count of the slots we allocated, since the
2501 * PFAIL stats may not match perfectly with the current number
2502 * of PFAIL nodes. */
2503 pfail_wanted--;
2504 }
2505 dictReleaseIterator(di);
2506 }
2507
2508 /* Ready to send... fix the totlen fiend and queue the message in the
2509 * output buffer. */
2510 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2511 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2512 hdr->count = htons(gossipcount);
2513 hdr->totlen = htonl(totlen);
2514 clusterSendMessage(link,buf,totlen);
2515 zfree(buf);
2516 }
2517
2518 /* Send a PONG packet to every connected node that's not in handshake state
2519 * and for which we have a valid link.
2520 *
2521 * In Redis Cluster pongs are not used just for failure detection, but also
2522 * to carry important configuration information. So broadcasting a pong is
2523 * useful when something changes in the configuration and we want to make
2524 * the cluster aware ASAP (for instance after a slave promotion).
2525 *
2526 * The 'target' argument specifies the receiving instances using the
2527 * defines below:
2528 *
2529 * CLUSTER_BROADCAST_ALL -> All known instances.
2530 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2531 */
2532 #define CLUSTER_BROADCAST_ALL 0
2533 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
clusterBroadcastPong(int target)2534 void clusterBroadcastPong(int target) {
2535 dictIterator *di;
2536 dictEntry *de;
2537
2538 di = dictGetSafeIterator(server.cluster->nodes);
2539 while((de = dictNext(di)) != NULL) {
2540 clusterNode *node = dictGetVal(de);
2541
2542 if (!node->link) continue;
2543 if (node == myself || nodeInHandshake(node)) continue;
2544 if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2545 int local_slave =
2546 nodeIsSlave(node) && node->slaveof &&
2547 (node->slaveof == myself || node->slaveof == myself->slaveof);
2548 if (!local_slave) continue;
2549 }
2550 clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2551 }
2552 dictReleaseIterator(di);
2553 }
2554
2555 /* Send a PUBLISH message.
2556 *
2557 * If link is NULL, then the message is broadcasted to the whole cluster. */
clusterSendPublish(clusterLink * link,robj * channel,robj * message)2558 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2559 unsigned char buf[sizeof(clusterMsg)], *payload;
2560 clusterMsg *hdr = (clusterMsg*) buf;
2561 uint32_t totlen;
2562 uint32_t channel_len, message_len;
2563
2564 channel = getDecodedObject(channel);
2565 message = getDecodedObject(message);
2566 channel_len = sdslen(channel->ptr);
2567 message_len = sdslen(message->ptr);
2568
2569 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2570 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2571 totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2572
2573 hdr->data.publish.msg.channel_len = htonl(channel_len);
2574 hdr->data.publish.msg.message_len = htonl(message_len);
2575 hdr->totlen = htonl(totlen);
2576
2577 /* Try to use the local buffer if possible */
2578 if (totlen < sizeof(buf)) {
2579 payload = buf;
2580 } else {
2581 payload = zmalloc(totlen);
2582 memcpy(payload,hdr,sizeof(*hdr));
2583 hdr = (clusterMsg*) payload;
2584 }
2585 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2586 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2587 message->ptr,sdslen(message->ptr));
2588
2589 if (link)
2590 clusterSendMessage(link,payload,totlen);
2591 else
2592 clusterBroadcastMessage(payload,totlen);
2593
2594 decrRefCount(channel);
2595 decrRefCount(message);
2596 if (payload != buf) zfree(payload);
2597 }
2598
2599 /* Send a FAIL message to all the nodes we are able to contact.
2600 * The FAIL message is sent when we detect that a node is failing
2601 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2602 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2603 * nodes to do the same ASAP. */
clusterSendFail(char * nodename)2604 void clusterSendFail(char *nodename) {
2605 unsigned char buf[sizeof(clusterMsg)];
2606 clusterMsg *hdr = (clusterMsg*) buf;
2607
2608 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2609 memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2610 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2611 }
2612
2613 /* Send an UPDATE message to the specified link carrying the specified 'node'
2614 * slots configuration. The node name, slots bitmap, and configEpoch info
2615 * are included. */
clusterSendUpdate(clusterLink * link,clusterNode * node)2616 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2617 unsigned char buf[sizeof(clusterMsg)];
2618 clusterMsg *hdr = (clusterMsg*) buf;
2619
2620 if (link == NULL) return;
2621 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2622 memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2623 hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2624 memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2625 clusterSendMessage(link,buf,ntohl(hdr->totlen));
2626 }
2627
2628 /* Send a MODULE message.
2629 *
2630 * If link is NULL, then the message is broadcasted to the whole cluster. */
clusterSendModule(clusterLink * link,uint64_t module_id,uint8_t type,unsigned char * payload,uint32_t len)2631 void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
2632 unsigned char *payload, uint32_t len) {
2633 unsigned char buf[sizeof(clusterMsg)], *heapbuf;
2634 clusterMsg *hdr = (clusterMsg*) buf;
2635 uint32_t totlen;
2636
2637 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
2638 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2639 totlen += sizeof(clusterMsgModule) - 3 + len;
2640
2641 hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
2642 hdr->data.module.msg.type = type;
2643 hdr->data.module.msg.len = htonl(len);
2644 hdr->totlen = htonl(totlen);
2645
2646 /* Try to use the local buffer if possible */
2647 if (totlen < sizeof(buf)) {
2648 heapbuf = buf;
2649 } else {
2650 heapbuf = zmalloc(totlen);
2651 memcpy(heapbuf,hdr,sizeof(*hdr));
2652 hdr = (clusterMsg*) heapbuf;
2653 }
2654 memcpy(hdr->data.module.msg.bulk_data,payload,len);
2655
2656 if (link)
2657 clusterSendMessage(link,heapbuf,totlen);
2658 else
2659 clusterBroadcastMessage(heapbuf,totlen);
2660
2661 if (heapbuf != buf) zfree(heapbuf);
2662 }
2663
2664 /* This function gets a cluster node ID string as target, the same way the nodes
2665 * addresses are represented in the modules side, resolves the node, and sends
2666 * the message. If the target is NULL the message is broadcasted.
2667 *
2668 * The function returns C_OK if the target is valid, otherwise C_ERR is
2669 * returned. */
clusterSendModuleMessageToTarget(const char * target,uint64_t module_id,uint8_t type,unsigned char * payload,uint32_t len)2670 int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
2671 clusterNode *node = NULL;
2672
2673 if (target != NULL) {
2674 node = clusterLookupNode(target);
2675 if (node == NULL || node->link == NULL) return C_ERR;
2676 }
2677
2678 clusterSendModule(target ? node->link : NULL,
2679 module_id, type, payload, len);
2680 return C_OK;
2681 }
2682
2683 /* -----------------------------------------------------------------------------
2684 * CLUSTER Pub/Sub support
2685 *
2686 * For now we do very little, just propagating PUBLISH messages across the whole
2687 * cluster. In the future we'll try to get smarter and avoiding propagating those
2688 * messages to hosts without receives for a given channel.
2689 * -------------------------------------------------------------------------- */
clusterPropagatePublish(robj * channel,robj * message)2690 void clusterPropagatePublish(robj *channel, robj *message) {
2691 clusterSendPublish(NULL, channel, message);
2692 }
2693
2694 /* -----------------------------------------------------------------------------
2695 * SLAVE node specific functions
2696 * -------------------------------------------------------------------------- */
2697
2698 /* This function sends a FAILOVE_AUTH_REQUEST message to every node in order to
2699 * see if there is the quorum for this slave instance to failover its failing
2700 * master.
2701 *
2702 * Note that we send the failover request to everybody, master and slave nodes,
2703 * but only the masters are supposed to reply to our query. */
clusterRequestFailoverAuth(void)2704 void clusterRequestFailoverAuth(void) {
2705 unsigned char buf[sizeof(clusterMsg)];
2706 clusterMsg *hdr = (clusterMsg*) buf;
2707 uint32_t totlen;
2708
2709 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2710 /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2711 * in the header to communicate the nodes receiving the message that
2712 * they should authorized the failover even if the master is working. */
2713 if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2714 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2715 hdr->totlen = htonl(totlen);
2716 clusterBroadcastMessage(buf,totlen);
2717 }
2718
2719 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
clusterSendFailoverAuth(clusterNode * node)2720 void clusterSendFailoverAuth(clusterNode *node) {
2721 unsigned char buf[sizeof(clusterMsg)];
2722 clusterMsg *hdr = (clusterMsg*) buf;
2723 uint32_t totlen;
2724
2725 if (!node->link) return;
2726 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2727 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2728 hdr->totlen = htonl(totlen);
2729 clusterSendMessage(node->link,buf,totlen);
2730 }
2731
2732 /* Send a MFSTART message to the specified node. */
clusterSendMFStart(clusterNode * node)2733 void clusterSendMFStart(clusterNode *node) {
2734 unsigned char buf[sizeof(clusterMsg)];
2735 clusterMsg *hdr = (clusterMsg*) buf;
2736 uint32_t totlen;
2737
2738 if (!node->link) return;
2739 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2740 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2741 hdr->totlen = htonl(totlen);
2742 clusterSendMessage(node->link,buf,totlen);
2743 }
2744
/* Vote for the node asking for our vote, if the conditions are met.
 *
 * 'node' is the sender of a FAILOVER_AUTH_REQUEST and 'request' is the
 * received packet. Its currentEpoch, configEpoch, claimed slots bitmap
 * (myslots) and the CLUSTERMSG_FLAG0_FORCEACK flag are consulted.
 *
 * The vote is refused, and the function returns without replying, when any
 * of the following holds:
 * - we are a slave, or a master serving no slots (silently);
 * - the request currentEpoch is older than ours (logged);
 * - we already voted in the current epoch (logged);
 * - 'node' is a master, its master is unknown to us, or its master is not
 *   flagged as failed and FORCEACK is not set (logged);
 * - we voted for a slave of the same master less than 2*node_timeout ago
 *   (logged);
 * - a slot claimed in the request is currently served by a master with a
 *   configEpoch greater than the request configEpoch (logged).
 * Otherwise the vote is recorded (lastVoteEpoch, the master's voted_time),
 * a config save + fsync is scheduled via clusterDoBeforeSleep(), the ACK is
 * sent and the grant is logged. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    unsigned char *claimed_slots = request->myslots;
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int j;

    /* If we are not a master serving at least 1 slot, we don't have the
     * right to vote: the cluster size in Redis Cluster is the number of
     * masters serving at least one slot, and the election quorum is derived
     * from that size (size/2 + 1, see clusterHandleSlaveFailover()). */
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    /* Request epoch must be >= our currentEpoch.
     * Note that it is impossible for it to actually be greater since
     * our currentEpoch was updated as a side effect of receiving this
     * request, if the request epoch was greater. */
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* Did I already vote in this epoch? Return ASAP: one vote per epoch. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* Node must be a slave and its master down.
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        /* Log the specific reason; the branches mirror the clauses of the
         * condition above, in the same order. */
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }

    /* Do not vote for a slave of this master if we already voted for a
     * slave of the same master less than two node timeouts ago. This is
     * not strictly needed for correctness of the algorithm but makes the
     * base case more linear. */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
    {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) ((server.cluster_node_timeout*2)-
                             (mstime() - node->slaveof->voted_time)));
        return;
    }

    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* If we reached this point we found a slot that in our current slots
         * is served by a master with a greater configEpoch than the one claimed
         * by the slave requesting our vote. Refuse to vote for this slave. */
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    /* We can vote for this slave. Record the vote (epoch and per-master
     * timestamp), schedule the config to be saved and fsynced, then send
     * the ACK. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}
2848
2849 /* This function returns the "rank" of this instance, a slave, in the context
2850 * of its master-slaves ring. The rank of the slave is given by the number of
2851 * other slaves for the same master that have a better replication offset
2852 * compared to the local one (better means, greater, so they claim more data).
2853 *
2854 * A slave with rank 0 is the one with the greatest (most up to date)
2855 * replication offset, and so forth. Note that because how the rank is computed
2856 * multiple slaves may have the same rank, in case they have the same offset.
2857 *
2858 * The slave rank is used to add a delay to start an election in order to
2859 * get voted and replace a failing master. Slaves with better replication
2860 * offsets are more likely to win. */
clusterGetSlaveRank(void)2861 int clusterGetSlaveRank(void) {
2862 long long myoffset;
2863 int j, rank = 0;
2864 clusterNode *master;
2865
2866 serverAssert(nodeIsSlave(myself));
2867 master = myself->slaveof;
2868 if (master == NULL) return 0; /* Never called by slaves without master. */
2869
2870 myoffset = replicationGetSlaveOffset();
2871 for (j = 0; j < master->numslaves; j++)
2872 if (master->slaves[j] != myself &&
2873 !nodeCantFailover(master->slaves[j]) &&
2874 master->slaves[j]->repl_offset > myoffset) rank++;
2875 return rank;
2876 }
2877
2878 /* This function is called by clusterHandleSlaveFailover() in order to
2879 * let the slave log why it is not able to failover. Sometimes there are
2880 * not the conditions, but since the failover function is called again and
2881 * again, we can't log the same things continuously.
2882 *
2883 * This function works by logging only if a given set of conditions are
2884 * true:
2885 *
2886 * 1) The reason for which the failover can't be initiated changed.
2887 * The reasons also include a NONE reason we reset the state to
2888 * when the slave finds that its master is fine (no FAIL flag).
2889 * 2) Also, the log is emitted again if the master is still down and
2890 * the reason for not failing over is still the same, but more than
2891 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
2892 * 3) Finally, the function only logs if the slave is down for more than
2893 * five seconds + NODE_TIMEOUT. This way nothing is logged when a
2894 * failover starts in a reasonable time.
2895 *
2896 * The function is called with the reason why the slave can't failover
2897 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
2898 *
2899 * The function is guaranteed to be called only if 'myself' is a slave. */
clusterLogCantFailover(int reason)2900 void clusterLogCantFailover(int reason) {
2901 char *msg;
2902 static time_t lastlog_time = 0;
2903 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
2904
2905 /* Don't log if we have the same reason for some time. */
2906 if (reason == server.cluster->cant_failover_reason &&
2907 time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
2908 return;
2909
2910 server.cluster->cant_failover_reason = reason;
2911
2912 /* We also don't emit any log if the master failed no long ago, the
2913 * goal of this function is to log slaves in a stalled condition for
2914 * a long time. */
2915 if (myself->slaveof &&
2916 nodeFailed(myself->slaveof) &&
2917 (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
2918
2919 switch(reason) {
2920 case CLUSTER_CANT_FAILOVER_DATA_AGE:
2921 msg = "Disconnected from master for longer than allowed. "
2922 "Please check the 'cluster-replica-validity-factor' configuration "
2923 "option.";
2924 break;
2925 case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
2926 msg = "Waiting the delay before I can start a new failover.";
2927 break;
2928 case CLUSTER_CANT_FAILOVER_EXPIRED:
2929 msg = "Failover attempt expired.";
2930 break;
2931 case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
2932 msg = "Waiting for votes, but majority still not reached.";
2933 break;
2934 default:
2935 msg = "Unknown reason code.";
2936 break;
2937 }
2938 lastlog_time = time(NULL);
2939 serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
2940 }
2941
2942 /* This function implements the final part of automatic and manual failovers,
2943 * where the slave grabs its master's hash slots, and propagates the new
2944 * configuration.
2945 *
2946 * Note that it's up to the caller to be sure that the node got a new
2947 * configuration epoch already. */
clusterFailoverReplaceYourMaster(void)2948 void clusterFailoverReplaceYourMaster(void) {
2949 int j;
2950 clusterNode *oldmaster = myself->slaveof;
2951
2952 if (nodeIsMaster(myself) || oldmaster == NULL) return;
2953
2954 /* 1) Turn this node into a master. */
2955 clusterSetNodeAsMaster(myself);
2956 replicationUnsetMaster();
2957
2958 /* 2) Claim all the slots assigned to our master. */
2959 for (j = 0; j < CLUSTER_SLOTS; j++) {
2960 if (clusterNodeGetSlotBit(oldmaster,j)) {
2961 clusterDelSlot(j);
2962 clusterAddSlot(myself,j);
2963 }
2964 }
2965
2966 /* 3) Update state and save config. */
2967 clusterUpdateState();
2968 clusterSaveConfigOrDie(1);
2969
2970 /* 4) Pong all the other nodes so that they can update the state
2971 * accordingly and detect that we switched to master role. */
2972 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
2973
2974 /* 5) If there was a manual failover in progress, clear the state. */
2975 resetManualFailover();
2976 }
2977
/* This function is called if we are a slave node and our master serving
 * a non-zero amount of hash slots is in FAIL state, or when a manual
 * failover was requested.
 *
 * The goal of this function is:
 * 1) To check if we are able to perform a failover: is our data updated?
 * 2) Try to get elected by masters.
 * 3) Perform the failover informing all the other nodes.
 *
 * It is invoked again and again; each call performs at most one step of
 * the election (schedule it, wait for the delay, request votes, check the
 * quorum) and returns early while waiting.
 */
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    mstime_t auth_timeout, auth_retry_time;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

    /* Compute the failover timeout (the max time we have to send votes
     * and wait for replies), and the failover retry time (the time to wait
     * before trying to get voted again).
     *
     * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
     * Retry is two times the Timeout.
     */
    auth_timeout = server.cluster_node_timeout*2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout*2;

    /* Pre conditions to run the function, that must be met both in case
     * of an automatic or manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) We don't have the no failover configuration set, and this is
     *    not a manual failover.
     * 4) It is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        (server.cluster_slave_no_failover && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        /* There are no reasons to failover, so we set the reason why we
         * are returning without failing over to NONE. */
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return;
    }

    /* Set data_age to the time (in milliseconds, computed from a
     * seconds-resolution clock) since we last had data from the master. */
    if (server.repl_state == REPL_STATE_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age as it is fine that we are
     * disconnected from our master at least for the time it was down to be
     * flagged as FAIL, that's the baseline. */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough according to the slave validity
     * factor configured by the user (a factor of 0 disables the check).
     *
     * Check bypassed for manual failovers. */
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }

    /* If the previous failover attempt timed out and the retry time has
     * elapsed, we can set up a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 499 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that have a probably
         * less updated replication offset, are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
        serverLog(LL_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll update their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }

    /* It is possible that we received more updated offsets from other
     * slaves for the same master since we computed our election delay.
     * Update the delay if our rank got worse (it is never reduced).
     *
     * Not performed if this is a manual failover, nor once votes were
     * already requested. */
    if (server.cluster->failover_auth_sent == 0 &&
        server.cluster->mf_end == 0)
    {
        int newrank = clusterGetSlaveRank();
        if (newrank > server.cluster->failover_auth_rank) {
            long long added_delay =
                (newrank - server.cluster->failover_auth_rank) * 1000;
            server.cluster->failover_auth_time += added_delay;
            server.cluster->failover_auth_rank = newrank;
            serverLog(LL_WARNING,
                "Replica rank updated to #%d, added %lld milliseconds of delay.",
                newrank, added_delay);
        }
    }

    /* Return ASAP if we still can't start the election. */
    if (mstime() < server.cluster->failover_auth_time) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
        return;
    }

    /* Return ASAP if the election is too old to be valid. */
    if (auth_age > auth_timeout) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
        return;
    }

    /* Ask for votes if needed: bump currentEpoch and use it as the
     * election epoch, then schedule config save/fsync and a state update. */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }

    /* Check if we reached the quorum (a majority of cluster->size). */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* We have the quorum, we can finally failover the master. */

        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");

        /* Update my configEpoch to the epoch of the election. */
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }

        /* Take responsibility for the cluster slots. */
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}
3156
3157 /* -----------------------------------------------------------------------------
3158 * CLUSTER slave migration
3159 *
3160 * Slave migration is the process that allows a slave of a master that is
3161 * already covered by at least another slave, to "migrate" to a master that
 * is orphaned, that is, left with no working slaves.
3163 * ------------------------------------------------------------------------- */
3164
3165 /* This function is responsible to decide if this replica should be migrated
3166 * to a different (orphaned) master. It is called by the clusterCron() function
3167 * only if:
3168 *
3169 * 1) We are a slave node.
3170 * 2) It was detected that there is at least one orphaned master in
3171 * the cluster.
3172 * 3) We are a slave of one of the masters with the greatest number of
3173 * slaves.
3174 *
 * These checks are performed by the caller since it has to iterate
 * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
 * when definitely needed.
3178 *
 * The function is called with a pre-computed max_slaves, that is the max
3180 * number of working (not in FAIL state) slaves for a single master.
3181 *
3182 * Additional conditions for migration are examined inside the function.
3183 */
clusterHandleSlaveMigration(int max_slaves)3184 void clusterHandleSlaveMigration(int max_slaves) {
3185 int j, okslaves = 0;
3186 clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
3187 dictIterator *di;
3188 dictEntry *de;
3189
3190 /* Step 1: Don't migrate if the cluster state is not ok. */
3191 if (server.cluster->state != CLUSTER_OK) return;
3192
3193 /* Step 2: Don't migrate if my master will not be left with at least
3194 * 'migration-barrier' slaves after my migration. */
3195 if (mymaster == NULL) return;
3196 for (j = 0; j < mymaster->numslaves; j++)
3197 if (!nodeFailed(mymaster->slaves[j]) &&
3198 !nodeTimedOut(mymaster->slaves[j])) okslaves++;
3199 if (okslaves <= server.cluster_migration_barrier) return;
3200
3201 /* Step 3: Identify a candidate for migration, and check if among the
3202 * masters with the greatest number of ok slaves, I'm the one with the
3203 * smallest node ID (the "candidate slave").
3204 *
3205 * Note: this means that eventually a replica migration will occur
3206 * since slaves that are reachable again always have their FAIL flag
3207 * cleared, so eventually there must be a candidate. At the same time
3208 * this does not mean that there are no race conditions possible (two
3209 * slaves migrating at the same time), but this is unlikely to
3210 * happen, and harmless when happens. */
3211 candidate = myself;
3212 di = dictGetSafeIterator(server.cluster->nodes);
3213 while((de = dictNext(di)) != NULL) {
3214 clusterNode *node = dictGetVal(de);
3215 int okslaves = 0, is_orphaned = 1;
3216
3217 /* We want to migrate only if this master is working, orphaned, and
3218 * used to have slaves or if failed over a master that had slaves
3219 * (MIGRATE_TO flag). This way we only migrate to instances that were
3220 * supposed to have replicas. */
3221 if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
3222 if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
3223
3224 /* Check number of working slaves. */
3225 if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
3226 if (okslaves > 0) is_orphaned = 0;
3227
3228 if (is_orphaned) {
3229 if (!target && node->numslots > 0) target = node;
3230
3231 /* Track the starting time of the orphaned condition for this
3232 * master. */
3233 if (!node->orphaned_time) node->orphaned_time = mstime();
3234 } else {
3235 node->orphaned_time = 0;
3236 }
3237
3238 /* Check if I'm the slave candidate for the migration: attached
3239 * to a master with the maximum number of slaves and with the smallest
3240 * node ID. */
3241 if (okslaves == max_slaves) {
3242 for (j = 0; j < node->numslaves; j++) {
3243 if (memcmp(node->slaves[j]->name,
3244 candidate->name,
3245 CLUSTER_NAMELEN) < 0)
3246 {
3247 candidate = node->slaves[j];
3248 }
3249 }
3250 }
3251 }
3252 dictReleaseIterator(di);
3253
3254 /* Step 4: perform the migration if there is a target, and if I'm the
3255 * candidate, but only if the master is continuously orphaned for a
3256 * couple of seconds, so that during failovers, we give some time to
3257 * the natural slaves of this instance to advertise their switch from
3258 * the old master to the new one. */
3259 if (target && candidate == myself &&
3260 (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
3261 !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3262 {
3263 serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
3264 target->name);
3265 clusterSetMaster(target);
3266 }
3267 }
3268
3269 /* -----------------------------------------------------------------------------
3270 * CLUSTER manual failover
3271 *
 * These are the important steps performed by slaves during a manual failover:
 * 1) User sends CLUSTER FAILOVER command. The failover state is initialized
3274 * setting mf_end to the millisecond unix time at which we'll abort the
3275 * attempt.
3276 * 2) Slave sends a MFSTART message to the master requesting to pause clients
3277 * for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
3278 * When master is paused for manual failover, it also starts to flag
3279 * packets with CLUSTERMSG_FLAG0_PAUSED.
3280 * 3) Slave waits for master to send its replication offset flagged as PAUSED.
3281 * 4) If slave received the offset from the master, and its offset matches,
3282 * mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
 *    the failover as usual, with the difference that the vote request
3284 * will be modified to force masters to vote for a slave that has a
3285 * working master.
3286 *
3287 * From the point of view of the master things are simpler: when a
3288 * PAUSE_CLIENTS packet is received the master sets mf_end as well and
3289 * the sender in mf_slave. During the time limit for the manual failover
3290 * the master will just send PINGs more often to this slave, flagged with
3291 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
3292 * a packet from the master with this flag set.
3293 *
 * The goal of the manual failover is to perform a fast failover without
3295 * data loss due to the asynchronous master-slave replication.
3296 * -------------------------------------------------------------------------- */
3297
3298 /* Reset the manual failover state. This works for both masters and slavesa
3299 * as all the state about manual failover is cleared.
3300 *
3301 * The function can be used both to initialize the manual failover state at
3302 * startup or to abort a manual failover in progress. */
resetManualFailover(void)3303 void resetManualFailover(void) {
3304 if (server.cluster->mf_end && clientsArePaused()) {
3305 server.clients_pause_end_time = 0;
3306 clientsArePaused(); /* Just use the side effect of the function. */
3307 }
3308 server.cluster->mf_end = 0; /* No manual failover in progress. */
3309 server.cluster->mf_can_start = 0;
3310 server.cluster->mf_slave = NULL;
3311 server.cluster->mf_master_offset = 0;
3312 }
3313
3314 /* If a manual failover timed out, abort it. */
manualFailoverCheckTimeout(void)3315 void manualFailoverCheckTimeout(void) {
3316 if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3317 serverLog(LL_WARNING,"Manual failover timed out.");
3318 resetManualFailover();
3319 }
3320 }
3321
3322 /* This function is called from the cluster cron function in order to go
3323 * forward with a manual failover state machine. */
clusterHandleManualFailover(void)3324 void clusterHandleManualFailover(void) {
3325 /* Return ASAP if no manual failover is in progress. */
3326 if (server.cluster->mf_end == 0) return;
3327
3328 /* If mf_can_start is non-zero, the failover was already triggered so the
3329 * next steps are performed by clusterHandleSlaveFailover(). */
3330 if (server.cluster->mf_can_start) return;
3331
3332 if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
3333
3334 if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3335 /* Our replication offset matches the master replication offset
3336 * announced after clients were paused. We can start the failover. */
3337 server.cluster->mf_can_start = 1;
3338 serverLog(LL_WARNING,
3339 "All master replication stream processed, "
3340 "manual failover can start.");
3341 }
3342 }
3343
3344 /* -----------------------------------------------------------------------------
3345 * CLUSTER cron job
3346 * -------------------------------------------------------------------------- */
3347
/* Cluster cron job. This is executed 10 times every second.
 *
 * Each call:
 * - keeps myself->ip in sync with the cluster-announce-ip option;
 * - drops handshake nodes older than the handshake timeout, and creates a
 *   cluster bus link (sending a PING or MEET) for nodes that have none,
 *   counting PFAIL nodes along the way;
 * - every 10th iteration, pings the node with the oldest pong_received among
 *   5 randomly sampled nodes;
 * - scans all nodes to free links that look stuck, send overdue pings, ping
 *   the manual-failover slave continuously, and flag unresponsive nodes as
 *   PFAIL, while collecting the orphaned-master / max-slaves statistics used
 *   for replica migration;
 * - as a slave, re-enables replication if needed and drives the manual
 *   failover, automatic failover and replica migration logic;
 * - finally updates the cluster state if some node was flagged PFAIL or the
 *   cluster is currently in FAIL state. */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    static unsigned long long iteration = 0;
    mstime_t handshake_timeout;

    iteration++; /* Number of times this function was called so far. */

    /* We want to keep myself->ip in sync with the cluster-announce-ip option.
     * The option can be set at runtime via CONFIG SET, so we periodically check
     * if the option changed to reflect this into myself->ip. */
    {
        /* Our own copy of the last seen option value (NULL if unset). */
        static char *prev_ip = NULL;
        char *curr_ip = server.cluster_announce_ip;
        int changed = 0;

        if (prev_ip == NULL && curr_ip != NULL) changed = 1;
        else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
        else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

        if (changed) {
            if (prev_ip) zfree(prev_ip);
            prev_ip = curr_ip;

            if (curr_ip) {
                /* We always keep our own copy of the new IP address, by
                 * duplicating the string. This way, on later calls, we can
                 * check if the address really changed. */
                prev_ip = zstrdup(prev_ip);
                strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
                myself->ip[NET_IP_STR_LEN-1] = '\0';
            } else {
                myself->ip[0] = '\0'; /* Force autodetection. */
            }
        }
    }

    /* The handshake timeout is the time after which a handshake node that was
     * not turned into a normal node is removed from the nodes. Usually it is
     * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
     * the value of 1 second. */
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    /* Update myself flags. */
    clusterUpdateMyselfFlags();

    /* Check if we have disconnected nodes and re-establish the connection.
     * Also update a few stats while we are here, that can be used to make
     * better decisions in other part of the code. */
    di = dictGetSafeIterator(server.cluster->nodes);
    server.cluster->stats_pfail_nodes = 0;
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Not interested in reconnecting the link with myself or nodes
         * for which we have no address. */
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;

        if (node->flags & CLUSTER_NODE_PFAIL)
            server.cluster->stats_pfail_nodes++;

        /* A Node in HANDSHAKE state has a limited lifespan equal to the
         * configured node timeout (at least 1 second, see above). */
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }

        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->cport, NET_FIRST_BIND_ADDR);
            if (fd == -1) {
                /* We got a synchronous error from connect before
                 * clusterSendPing() had a chance to be called.
                 * If node->ping_sent is zero, failure detection can't work,
                 * so we claim we actually sent a ping now (that will
                 * be really sent as soon as the link is obtained). */
                if (node->ping_sent == 0) node->ping_sent = mstime();
                serverLog(LL_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->cport, server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }
            /* We can clear the flag after the first packet is sent.
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            node->flags &= ~CLUSTER_NODE_MEET;

            serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->cport);
        }
    }
    dictReleaseIterator(di);

    /* Ping some random node 1 time every 10 iterations, so that we usually ping
     * one random node every second. */
    if (!(iteration % 10)) {
        int j;

        /* Check a few random nodes and ping the one with the oldest
         * pong_received time. */
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

    /* Iterate nodes to check if we need to flag something as failing.
     * This loop is also responsible to:
     * 1) Check if there are orphaned masters (masters without non failing
     *    slaves).
     * 2) Count the max number of non failing slaves for a single master.
     * 3) Count the number of slaves for our master, if we are a slave. */
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */

        if (node->flags &
            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* A master is orphaned if it is serving a non-zero number of
             * slots, have no working slaves, but used to have at least one
             * slave, or failed over a master that used to have slaves. */
            if (okslaves == 0 && node->numslots > 0 &&
                node->flags & CLUSTER_NODE_MIGRATE_TO)
            {
                orphaned_masters++;
            }
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }

        /* If we are not receiving any data for more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            node->pong_received < node->ping_sent && /* still waiting pong */
            /* and we are waiting for the pong more than timeout/2 */
            now - node->ping_sent > server.cluster_node_timeout/2 &&
            /* and in such interval we are not seeing any traffic at all. */
            now - node->data_received > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            freeClusterLink(node->link);
        }

        /* If we have currently no active ping in this instance, and the
         * received PONG is older than half the cluster timeout, send
         * a new ping now, to ensure all the nodes are pinged without
         * a too big delay. */
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* If we are a master and one of the slaves requested a manual
         * failover, ping it continuously. */
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* Check only if we have an active ping for this instance. */
        if (node->ping_sent == 0) continue;

        /* Compute the delay of the PONG. Note that if we already received
         * the PONG, then node->ping_sent is zero, so can't reach this
         * code at all. */
        mstime_t delay = now - node->ping_sent;

        /* We consider every incoming data as proof of liveness, since
         * our cluster bus link is also used for data: under heavy data
         * load pong delays are possible. */
        mstime_t data_delay = now - node->data_received;
        if (data_delay < delay) delay = data_delay;

        if (delay > server.cluster_node_timeout) {
            /* Timeout reached. Set the node as possibly failing if it is
             * not already in this state. */
            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
                serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                node->flags |= CLUSTER_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);

    /* If we are a slave node but the replication is still turned off,
     * enable it if we know the address of our master and it appears to
     * be up. */
    if (nodeIsSlave(myself) &&
        server.masterhost == NULL &&
        myself->slaveof &&
        nodeHasAddr(myself->slaveof))
    {
        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
    }

    /* Abort a manual failover if the timeout is reached. */
    manualFailoverCheckTimeout();

    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
            clusterHandleSlaveFailover();
        /* If there are orphaned masters, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }

    /* Re-evaluate the cluster state if a node was just flagged PFAIL, and
     * keep re-evaluating it on every call while we are in FAIL state. */
    if (update_state || server.cluster->state == CLUSTER_FAIL)
        clusterUpdateState();
}
3635
/* This function is called before the event handler returns to sleep for
 * events. It is useful to perform operations that must be done ASAP in
 * reaction to events fired but that are not safe to perform inside event
 * handlers, or to perform potentially expensive tasks that we need to do
 * a single time before replying to clients.
 *
 * Work is requested through clusterDoBeforeSleep() by setting CLUSTER_TODO_*
 * bits in server.cluster->todo_before_sleep. The field is re-read before
 * each step instead of being cached, because the handlers called here can
 * modify it (clusterUpdateState(), for instance, clears
 * CLUSTER_TODO_UPDATE_STATE). */
void clusterBeforeSleep(void) {
    /* Handle failover, this is needed when it is likely that there is already
     * the quorum from masters in order to react fast.
     * NOTE(review): unlike clusterCron(), this path does not check
     * CLUSTER_MODULE_FLAG_NO_FAILOVER; presumably the flag is only set once
     * a failover attempt is already under way — confirm. */
    if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
        clusterHandleSlaveFailover();

    /* Update the cluster state. */
    if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
        clusterUpdateState();

    /* Save the config, possibly using fsync (when CLUSTER_TODO_FSYNC_CONFIG
     * was requested as well). */
    if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
        int fsync = server.cluster->todo_before_sleep &
                    CLUSTER_TODO_FSYNC_CONFIG;
        clusterSaveConfigOrDie(fsync);
    }

    /* Reset our flags (not strictly needed since every single function
     * called for flags set should be able to clear its flag). */
    server.cluster->todo_before_sleep = 0;
}
3662
clusterDoBeforeSleep(int flags)3663 void clusterDoBeforeSleep(int flags) {
3664 server.cluster->todo_before_sleep |= flags;
3665 }
3666
3667 /* -----------------------------------------------------------------------------
3668 * Slots management
3669 * -------------------------------------------------------------------------- */
3670
/* Return 1 if bit 'pos' is set in 'bitmap', 0 otherwise. Bit 'pos' lives
 * in byte pos/8, at position pos%8 counting from the least significant
 * bit. */
int bitmapTestBit(unsigned char *bitmap, int pos) {
    unsigned char mask = (unsigned char)(1u << (pos & 7));

    return (bitmap[pos / 8] & mask) ? 1 : 0;
}
3678
/* Set bit 'pos' in 'bitmap' (byte pos/8, bit pos%8 from the least
 * significant end). Setting an already set bit is a no-op. */
void bitmapSetBit(unsigned char *bitmap, int pos) {
    unsigned char *cell = bitmap + pos / 8;

    *cell |= (unsigned char)(1u << (pos & 7));
}
3685
/* Clear bit 'pos' in 'bitmap' (byte pos/8, bit pos%8 from the least
 * significant end). Clearing an already clear bit is a no-op. */
void bitmapClearBit(unsigned char *bitmap, int pos) {
    unsigned char keep = (unsigned char)~(1u << (pos & 7));

    bitmap[pos / 8] &= keep;
}
3692
3693 /* Return non-zero if there is at least one master with slaves in the cluster.
3694 * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3695 * MIGRATE_TO flag the when a master gets the first slot. */
clusterMastersHaveSlaves(void)3696 int clusterMastersHaveSlaves(void) {
3697 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3698 dictEntry *de;
3699 int slaves = 0;
3700 while((de = dictNext(di)) != NULL) {
3701 clusterNode *node = dictGetVal(de);
3702
3703 if (nodeIsSlave(node)) continue;
3704 slaves += node->numslaves;
3705 }
3706 dictReleaseIterator(di);
3707 return slaves != 0;
3708 }
3709
3710 /* Set the slot bit and return the old value. */
clusterNodeSetSlotBit(clusterNode * n,int slot)3711 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3712 int old = bitmapTestBit(n->slots,slot);
3713 bitmapSetBit(n->slots,slot);
3714 if (!old) {
3715 n->numslots++;
3716 /* When a master gets its first slot, even if it has no slaves,
3717 * it gets flagged with MIGRATE_TO, that is, the master is a valid
3718 * target for replicas migration, if and only if at least one of
3719 * the other masters has slaves right now.
3720 *
3721 * Normally masters are valid targerts of replica migration if:
3722 * 1. The used to have slaves (but no longer have).
3723 * 2. They are slaves failing over a master that used to have slaves.
3724 *
3725 * However new masters with slots assigned are considered valid
3726 * migration tagets if the rest of the cluster is not a slave-less.
3727 *
3728 * See https://github.com/antirez/redis/issues/3043 for more info. */
3729 if (n->numslots == 1 && clusterMastersHaveSlaves())
3730 n->flags |= CLUSTER_NODE_MIGRATE_TO;
3731 }
3732 return old;
3733 }
3734
3735 /* Clear the slot bit and return the old value. */
clusterNodeClearSlotBit(clusterNode * n,int slot)3736 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3737 int old = bitmapTestBit(n->slots,slot);
3738 bitmapClearBit(n->slots,slot);
3739 if (old) n->numslots--;
3740 return old;
3741 }
3742
3743 /* Return the slot bit from the cluster node structure. */
clusterNodeGetSlotBit(clusterNode * n,int slot)3744 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3745 return bitmapTestBit(n->slots,slot);
3746 }
3747
3748 /* Add the specified slot to the list of slots that node 'n' will
3749 * serve. Return C_OK if the operation ended with success.
3750 * If the slot is already assigned to another instance this is considered
3751 * an error and C_ERR is returned. */
clusterAddSlot(clusterNode * n,int slot)3752 int clusterAddSlot(clusterNode *n, int slot) {
3753 if (server.cluster->slots[slot]) return C_ERR;
3754 clusterNodeSetSlotBit(n,slot);
3755 server.cluster->slots[slot] = n;
3756 return C_OK;
3757 }
3758
3759 /* Delete the specified slot marking it as unassigned.
3760 * Returns C_OK if the slot was assigned, otherwise if the slot was
3761 * already unassigned C_ERR is returned. */
clusterDelSlot(int slot)3762 int clusterDelSlot(int slot) {
3763 clusterNode *n = server.cluster->slots[slot];
3764
3765 if (!n) return C_ERR;
3766 serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3767 server.cluster->slots[slot] = NULL;
3768 return C_OK;
3769 }
3770
3771 /* Delete all the slots associated with the specified node.
3772 * The number of deleted slots is returned. */
clusterDelNodeSlots(clusterNode * node)3773 int clusterDelNodeSlots(clusterNode *node) {
3774 int deleted = 0, j;
3775
3776 for (j = 0; j < CLUSTER_SLOTS; j++) {
3777 if (clusterNodeGetSlotBit(node,j)) {
3778 clusterDelSlot(j);
3779 deleted++;
3780 }
3781 }
3782 return deleted;
3783 }
3784
3785 /* Clear the migrating / importing state for all the slots.
3786 * This is useful at initialization and when turning a master into slave. */
clusterCloseAllSlots(void)3787 void clusterCloseAllSlots(void) {
3788 memset(server.cluster->migrating_slots_to,0,
3789 sizeof(server.cluster->migrating_slots_to));
3790 memset(server.cluster->importing_slots_from,0,
3791 sizeof(server.cluster->importing_slots_from));
3792 }
3793
3794 /* -----------------------------------------------------------------------------
3795 * Cluster state evaluation function
3796 * -------------------------------------------------------------------------- */
3797
/* The following are defines that are only used in the evaluation function
 * and are based on heuristics. Actually the main point about the rejoin and
 * writable delay is that they should be a few orders of magnitude larger
 * than the network latency. Values are in milliseconds: they are compared
 * with differences of mstime() values and with cluster_node_timeout. */
#define CLUSTER_MAX_REJOIN_DELAY 5000 /* Upper clamp of the rejoin delay. */
#define CLUSTER_MIN_REJOIN_DELAY 500 /* Lower clamp of the rejoin delay. */
#define CLUSTER_WRITABLE_DELAY 2000 /* Master FAIL->OK grace after 1st call. */
3805
clusterUpdateState(void)3806 void clusterUpdateState(void) {
3807 int j, new_state;
3808 int reachable_masters = 0;
3809 static mstime_t among_minority_time;
3810 static mstime_t first_call_time = 0;
3811
3812 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3813
3814 /* If this is a master node, wait some time before turning the state
3815 * into OK, since it is not a good idea to rejoin the cluster as a writable
3816 * master, after a reboot, without giving the cluster a chance to
3817 * reconfigure this node. Note that the delay is calculated starting from
3818 * the first call to this function and not since the server start, in order
3819 * to don't count the DB loading time. */
3820 if (first_call_time == 0) first_call_time = mstime();
3821 if (nodeIsMaster(myself) &&
3822 server.cluster->state == CLUSTER_FAIL &&
3823 mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
3824
3825 /* Start assuming the state is OK. We'll turn it into FAIL if there
3826 * are the right conditions. */
3827 new_state = CLUSTER_OK;
3828
3829 /* Check if all the slots are covered. */
3830 if (server.cluster_require_full_coverage) {
3831 for (j = 0; j < CLUSTER_SLOTS; j++) {
3832 if (server.cluster->slots[j] == NULL ||
3833 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3834 {
3835 new_state = CLUSTER_FAIL;
3836 break;
3837 }
3838 }
3839 }
3840
3841 /* Compute the cluster size, that is the number of master nodes
3842 * serving at least a single slot.
3843 *
3844 * At the same time count the number of reachable masters having
3845 * at least one slot. */
3846 {
3847 dictIterator *di;
3848 dictEntry *de;
3849
3850 server.cluster->size = 0;
3851 di = dictGetSafeIterator(server.cluster->nodes);
3852 while((de = dictNext(di)) != NULL) {
3853 clusterNode *node = dictGetVal(de);
3854
3855 if (nodeIsMaster(node) && node->numslots) {
3856 server.cluster->size++;
3857 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3858 reachable_masters++;
3859 }
3860 }
3861 dictReleaseIterator(di);
3862 }
3863
3864 /* If we are in a minority partition, change the cluster state
3865 * to FAIL. */
3866 {
3867 int needed_quorum = (server.cluster->size / 2) + 1;
3868
3869 if (reachable_masters < needed_quorum) {
3870 new_state = CLUSTER_FAIL;
3871 among_minority_time = mstime();
3872 }
3873 }
3874
3875 /* Log a state change */
3876 if (new_state != server.cluster->state) {
3877 mstime_t rejoin_delay = server.cluster_node_timeout;
3878
3879 /* If the instance is a master and was partitioned away with the
3880 * minority, don't let it accept queries for some time after the
3881 * partition heals, to make sure there is enough time to receive
3882 * a configuration update. */
3883 if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
3884 rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
3885 if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
3886 rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
3887
3888 if (new_state == CLUSTER_OK &&
3889 nodeIsMaster(myself) &&
3890 mstime() - among_minority_time < rejoin_delay)
3891 {
3892 return;
3893 }
3894
3895 /* Change the state and log the event. */
3896 serverLog(LL_WARNING,"Cluster state changed: %s",
3897 new_state == CLUSTER_OK ? "ok" : "fail");
3898 server.cluster->state = new_state;
3899 }
3900 }
3901
3902 /* This function is called after the node startup in order to verify that data
3903 * loaded from disk is in agreement with the cluster configuration:
3904 *
3905 * 1) If we find keys about hash slots we have no responsibility for, the
3906 * following happens:
3907 * A) If no other node is in charge according to the current cluster
3908 * configuration, we add these slots to our node.
3909 * B) If according to our config other nodes are already in charge for
3910 * this lots, we set the slots as IMPORTING from our point of view
3911 * in order to justify we have those slots, and in order to make
3912 * redis-trib aware of the issue, so that it can try to fix it.
3913 * 2) If we find data in a DB different than DB0 we return C_ERR to
3914 * signal the caller it should quit the server with an error message
3915 * or take other actions.
3916 *
3917 * The function always returns C_OK even if it will try to correct
3918 * the error described in "1". However if data is found in DB different
3919 * from DB0, C_ERR is returned.
3920 *
3921 * The function also uses the logging facility in order to warn the user
3922 * about desynchronizations between the data we have in memory and the
3923 * cluster configuration. */
verifyClusterConfigWithData(void)3924 int verifyClusterConfigWithData(void) {
3925 int j;
3926 int update_config = 0;
3927
3928 /* Return ASAP if a module disabled cluster redirections. In that case
3929 * every master can store keys about every possible hash slot. */
3930 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
3931 return C_OK;
3932
3933 /* If this node is a slave, don't perform the check at all as we
3934 * completely depend on the replication stream. */
3935 if (nodeIsSlave(myself)) return C_OK;
3936
3937 /* Make sure we only have keys in DB0. */
3938 for (j = 1; j < server.dbnum; j++) {
3939 if (dictSize(server.db[j].dict)) return C_ERR;
3940 }
3941
3942 /* Check that all the slots we see populated memory have a corresponding
3943 * entry in the cluster table. Otherwise fix the table. */
3944 for (j = 0; j < CLUSTER_SLOTS; j++) {
3945 if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
3946 /* Check if we are assigned to this slot or if we are importing it.
3947 * In both cases check the next slot as the configuration makes
3948 * sense. */
3949 if (server.cluster->slots[j] == myself ||
3950 server.cluster->importing_slots_from[j] != NULL) continue;
3951
3952 /* If we are here data and cluster config don't agree, and we have
3953 * slot 'j' populated even if we are not importing it, nor we are
3954 * assigned to this slot. Fix this condition. */
3955
3956 update_config++;
3957 /* Case A: slot is unassigned. Take responsibility for it. */
3958 if (server.cluster->slots[j] == NULL) {
3959 serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
3960 "Taking responsibility for it.",j);
3961 clusterAddSlot(myself,j);
3962 } else {
3963 serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
3964 "assigned to another node. "
3965 "Setting it to importing state.",j);
3966 server.cluster->importing_slots_from[j] = server.cluster->slots[j];
3967 }
3968 }
3969 if (update_config) clusterSaveConfigOrDie(1);
3970 return C_OK;
3971 }
3972
3973 /* -----------------------------------------------------------------------------
3974 * SLAVE nodes handling
3975 * -------------------------------------------------------------------------- */
3976
3977 /* Set the specified node 'n' as master for this node.
3978 * If this node is currently a master, it is turned into a slave. */
clusterSetMaster(clusterNode * n)3979 void clusterSetMaster(clusterNode *n) {
3980 serverAssert(n != myself);
3981 serverAssert(myself->numslots == 0);
3982
3983 if (nodeIsMaster(myself)) {
3984 myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
3985 myself->flags |= CLUSTER_NODE_SLAVE;
3986 clusterCloseAllSlots();
3987 } else {
3988 if (myself->slaveof)
3989 clusterNodeRemoveSlave(myself->slaveof,myself);
3990 }
3991 myself->slaveof = n;
3992 clusterNodeAddSlave(n,myself);
3993 replicationSetMaster(n->ip, n->port);
3994 resetManualFailover();
3995 }
3996
3997 /* -----------------------------------------------------------------------------
3998 * Nodes to string representation functions.
3999 * -------------------------------------------------------------------------- */
4000
4001 struct redisNodeFlags {
4002 uint16_t flag;
4003 char *name;
4004 };
4005
4006 static struct redisNodeFlags redisNodeFlagsTable[] = {
4007 {CLUSTER_NODE_MYSELF, "myself,"},
4008 {CLUSTER_NODE_MASTER, "master,"},
4009 {CLUSTER_NODE_SLAVE, "slave,"},
4010 {CLUSTER_NODE_PFAIL, "fail?,"},
4011 {CLUSTER_NODE_FAIL, "fail,"},
4012 {CLUSTER_NODE_HANDSHAKE, "handshake,"},
4013 {CLUSTER_NODE_NOADDR, "noaddr,"},
4014 {CLUSTER_NODE_NOFAILOVER, "nofailover,"}
4015 };
4016
4017 /* Concatenate the comma separated list of node flags to the given SDS
4018 * string 'ci'. */
representClusterNodeFlags(sds ci,uint16_t flags)4019 sds representClusterNodeFlags(sds ci, uint16_t flags) {
4020 size_t orig_len = sdslen(ci);
4021 int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
4022 for (i = 0; i < size; i++) {
4023 struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
4024 if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
4025 }
4026 /* If no flag was added, add the "noflags" special flag. */
4027 if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
4028 sdsIncrLen(ci,-1); /* Remove trailing comma. */
4029 return ci;
4030 }
4031
4032 /* Generate a csv-alike representation of the specified cluster node.
4033 * See clusterGenNodesDescription() top comment for more information.
4034 *
4035 * The function returns the string representation as an SDS string. */
clusterGenNodeDescription(clusterNode * node)4036 sds clusterGenNodeDescription(clusterNode *node) {
4037 int j, start;
4038 sds ci;
4039
4040 /* Node coordinates */
4041 ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
4042 node->name,
4043 node->ip,
4044 node->port,
4045 node->cport);
4046
4047 /* Flags */
4048 ci = representClusterNodeFlags(ci, node->flags);
4049
4050 /* Slave of... or just "-" */
4051 if (node->slaveof)
4052 ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
4053 else
4054 ci = sdscatlen(ci," - ",3);
4055
4056 /* Latency from the POV of this node, config epoch, link status */
4057 ci = sdscatprintf(ci,"%lld %lld %llu %s",
4058 (long long) node->ping_sent,
4059 (long long) node->pong_received,
4060 (unsigned long long) node->configEpoch,
4061 (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
4062 "connected" : "disconnected");
4063
4064 /* Slots served by this instance */
4065 start = -1;
4066 for (j = 0; j < CLUSTER_SLOTS; j++) {
4067 int bit;
4068
4069 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4070 if (start == -1) start = j;
4071 }
4072 if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4073 if (bit && j == CLUSTER_SLOTS-1) j++;
4074
4075 if (start == j-1) {
4076 ci = sdscatprintf(ci," %d",start);
4077 } else {
4078 ci = sdscatprintf(ci," %d-%d",start,j-1);
4079 }
4080 start = -1;
4081 }
4082 }
4083
4084 /* Just for MYSELF node we also dump info about slots that
4085 * we are migrating to other instances or importing from other
4086 * instances. */
4087 if (node->flags & CLUSTER_NODE_MYSELF) {
4088 for (j = 0; j < CLUSTER_SLOTS; j++) {
4089 if (server.cluster->migrating_slots_to[j]) {
4090 ci = sdscatprintf(ci," [%d->-%.40s]",j,
4091 server.cluster->migrating_slots_to[j]->name);
4092 } else if (server.cluster->importing_slots_from[j]) {
4093 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
4094 server.cluster->importing_slots_from[j]->name);
4095 }
4096 }
4097 }
4098 return ci;
4099 }
4100
4101 /* Generate a csv-alike representation of the nodes we are aware of,
4102 * including the "myself" node, and return an SDS string containing the
4103 * representation (it is up to the caller to free it).
4104 *
4105 * All the nodes matching at least one of the node flags specified in
4106 * "filter" are excluded from the output, so using zero as a filter will
4107 * include all the known nodes in the representation, including nodes in
4108 * the HANDSHAKE state.
4109 *
4110 * The representation obtained using this function is used for the output
4111 * of the CLUSTER NODES function, and as format for the cluster
4112 * configuration file (nodes.conf) for a given node. */
clusterGenNodesDescription(int filter)4113 sds clusterGenNodesDescription(int filter) {
4114 sds ci = sdsempty(), ni;
4115 dictIterator *di;
4116 dictEntry *de;
4117
4118 di = dictGetSafeIterator(server.cluster->nodes);
4119 while((de = dictNext(di)) != NULL) {
4120 clusterNode *node = dictGetVal(de);
4121
4122 if (node->flags & filter) continue;
4123 ni = clusterGenNodeDescription(node);
4124 ci = sdscatsds(ci,ni);
4125 sdsfree(ni);
4126 ci = sdscatlen(ci,"\n",1);
4127 }
4128 dictReleaseIterator(di);
4129 return ci;
4130 }
4131
4132 /* -----------------------------------------------------------------------------
4133 * CLUSTER command
4134 * -------------------------------------------------------------------------- */
4135
clusterGetMessageTypeString(int type)4136 const char *clusterGetMessageTypeString(int type) {
4137 switch(type) {
4138 case CLUSTERMSG_TYPE_PING: return "ping";
4139 case CLUSTERMSG_TYPE_PONG: return "pong";
4140 case CLUSTERMSG_TYPE_MEET: return "meet";
4141 case CLUSTERMSG_TYPE_FAIL: return "fail";
4142 case CLUSTERMSG_TYPE_PUBLISH: return "publish";
4143 case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
4144 case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
4145 case CLUSTERMSG_TYPE_UPDATE: return "update";
4146 case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
4147 case CLUSTERMSG_TYPE_MODULE: return "module";
4148 }
4149 return "unknown";
4150 }
4151
/* Parse object 'o' as a hash slot number.
 *
 * Returns the slot (0 .. CLUSTER_SLOTS-1) on success. If 'o' is not an
 * integer or is out of range, an error is replied to client 'c' and -1 is
 * returned: callers should just return in that case. */
int getSlotOrReply(client *c, robj *o) {
    long long parsed;
    int valid = getLongLongFromObject(o,&parsed) == C_OK &&
                parsed >= 0 &&
                parsed < CLUSTER_SLOTS;

    if (!valid) {
        addReplyError(c,"Invalid or out of range slot");
        return -1;
    }
    return (int) parsed;
}
4163
clusterReplyMultiBulkSlots(client * c)4164 void clusterReplyMultiBulkSlots(client *c) {
4165 /* Format: 1) 1) start slot
4166 * 2) end slot
4167 * 3) 1) master IP
4168 * 2) master port
4169 * 3) node ID
4170 * 4) 1) replica IP
4171 * 2) replica port
4172 * 3) node ID
4173 * ... continued until done
4174 */
4175
4176 int num_masters = 0;
4177 void *slot_replylen = addDeferredMultiBulkLength(c);
4178
4179 dictEntry *de;
4180 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
4181 while((de = dictNext(di)) != NULL) {
4182 clusterNode *node = dictGetVal(de);
4183 int j = 0, start = -1;
4184 int i, nested_elements = 0;
4185
4186 /* Skip slaves (that are iterated when producing the output of their
4187 * master) and masters not serving any slot. */
4188 if (!nodeIsMaster(node) || node->numslots == 0) continue;
4189
4190 for(i = 0; i < node->numslaves; i++) {
4191 if (nodeFailed(node->slaves[i])) continue;
4192 nested_elements++;
4193 }
4194
4195 for (j = 0; j < CLUSTER_SLOTS; j++) {
4196 int bit, i;
4197
4198 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4199 if (start == -1) start = j;
4200 }
4201 if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4202 addReplyMultiBulkLen(c, nested_elements + 3); /* slots (2) + master addr (1). */
4203
4204 if (bit && j == CLUSTER_SLOTS-1) j++;
4205
4206 /* If slot exists in output map, add to it's list.
4207 * else, create a new output map for this slot */
4208 if (start == j-1) {
4209 addReplyLongLong(c, start); /* only one slot; low==high */
4210 addReplyLongLong(c, start);
4211 } else {
4212 addReplyLongLong(c, start); /* low */
4213 addReplyLongLong(c, j-1); /* high */
4214 }
4215 start = -1;
4216
4217 /* First node reply position is always the master */
4218 addReplyMultiBulkLen(c, 3);
4219 addReplyBulkCString(c, node->ip);
4220 addReplyLongLong(c, node->port);
4221 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
4222
4223 /* Remaining nodes in reply are replicas for slot range */
4224 for (i = 0; i < node->numslaves; i++) {
4225 /* This loop is copy/pasted from clusterGenNodeDescription()
4226 * with modifications for per-slot node aggregation */
4227 if (nodeFailed(node->slaves[i])) continue;
4228 addReplyMultiBulkLen(c, 3);
4229 addReplyBulkCString(c, node->slaves[i]->ip);
4230 addReplyLongLong(c, node->slaves[i]->port);
4231 addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
4232 }
4233 num_masters++;
4234 }
4235 }
4236 }
4237 dictReleaseIterator(di);
4238 setDeferredMultiBulkLength(c, slot_replylen, num_masters);
4239 }
4240
/* CLUSTER <subcommand> [arguments ...]
 *
 * Entry point of every CLUSTER subcommand. Each branch checks its own
 * arity and arguments and replies to client 'c'. When the cluster state or
 * configuration changes, the branch schedules a state update and/or a
 * config save through clusterDoBeforeSleep(). Unknown subcommands, or
 * known ones called with the wrong number of arguments, get a subcommand
 * syntax error. */
void clusterCommand(client *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        const char *help[] = {
"ADDSLOTS <slot> [slot ...] -- Assign slots to current node.",
"BUMPEPOCH -- Advance the cluster config epoch.",
"COUNT-FAILURE-REPORTS <node-id> -- Return number of failure reports for <node-id>.",
"COUNTKEYSINSLOT <slot> -- Return the number of keys in <slot>.",
"DELSLOTS <slot> [slot ...] -- Delete slots information from current node.",
"FAILOVER [force|takeover] -- Promote current replica node to being a master.",
"FORGET <node-id> -- Remove a node from the cluster.",
"GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
"FLUSHSLOTS -- Delete current node own slots information.",
"INFO -- Return information about the cluster.",
"KEYSLOT <key> -- Return the hash slot for <key>.",
"MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
"MYID -- Return the node id.",
"NODES -- Return cluster configuration seen by node. Output format:",
"    <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ... <slot>",
"REPLICATE <node-id> -- Configure current node as replica to <node-id>.",
"RESET [hard|soft] -- Reset current node (default: soft).",
"SAVECONFIG -- Force saving cluster configuration on disk.",
"SET-CONFIG-EPOCH <epoch> -- Set config epoch of current node.",
"SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
"REPLICAS <node-id> -- Return <node-id> replicas.",
"SLOTS -- Return information about slots range mappings. Each range is made of:",
"    start, end, master and replicas IP addresses, ports and ids",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
        /* CLUSTER MEET <ip> <port> [cport] */
        long long port, cport;

        if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
            addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }

        if (c->argc == 5) {
            if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
                addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
                                    (char*)c->argv[4]->ptr);
                return;
            }
        } else {
            cport = port + CLUSTER_PORT_INCR;
        }

        if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
            errno == EINVAL)
        {
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
        /* CLUSTER NODES */
        robj *o;
        sds ci = clusterGenNodesDescription(0);

        o = createObject(OBJ_STRING,ci);
        addReplyBulk(c,o);
        decrRefCount(o);
    } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
        /* CLUSTER MYID */
        addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
    } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
        /* CLUSTER SLOTS */
        clusterReplyMultiBulkSlots(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
        /* CLUSTER FLUSHSLOTS */
        if (dictSize(server.db[0].dict) != 0) {
            addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
            return;
        }
        clusterDelNodeSlots(myself);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
    {
        /* CLUSTER ADDSLOTS <slot> [slot] ... */
        /* CLUSTER DELSLOTS <slot> [slot] ... */
        int j, slot;
        unsigned char *slots = zmalloc(CLUSTER_SLOTS);
        int del = !strcasecmp(c->argv[1]->ptr,"delslots");

        memset(slots,0,CLUSTER_SLOTS);
        /* Check that all the arguments are parseable and that all the
         * slots are not already busy. Nothing is changed until every
         * argument has been validated. */
        for (j = 2; j < c->argc; j++) {
            if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
                zfree(slots);
                return;
            }
            if (del && server.cluster->slots[slot] == NULL) {
                addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
                zfree(slots);
                return;
            } else if (!del && server.cluster->slots[slot]) {
                addReplyErrorFormat(c,"Slot %d is already busy", slot);
                zfree(slots);
                return;
            }
            if (slots[slot]++ == 1) {
                addReplyErrorFormat(c,"Slot %d specified multiple times",
                    (int)slot);
                zfree(slots);
                return;
            }
        }
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            if (slots[j]) {
                int retval;

                /* If this slot was set as importing we can clear this
                 * state as now we are the real owner of the slot. */
                if (server.cluster->importing_slots_from[j])
                    server.cluster->importing_slots_from[j] = NULL;

                retval = del ? clusterDelSlot(j) :
                               clusterAddSlot(myself,j);
                serverAssertWithInfo(c,NULL,retval == C_OK);
            }
        }
        zfree(slots);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
        /* SETSLOT 10 MIGRATING <node ID> */
        /* SETSLOT 10 IMPORTING <node ID> */
        /* SETSLOT 10 STABLE */
        /* SETSLOT 10 NODE <node ID> */
        int slot;
        clusterNode *n;

        if (nodeIsSlave(myself)) {
            addReplyError(c,"Please use SETSLOT only with masters.");
            return;
        }

        if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;

        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
            if (server.cluster->slots[slot] != myself) {
                addReplyErrorFormat(c,"I'm not the owner of hash slot %d",slot);
                return;
            }
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            server.cluster->migrating_slots_to[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
            if (server.cluster->slots[slot] == myself) {
                addReplyErrorFormat(c,
                    "I'm already the owner of hash slot %d",slot);
                return;
            }
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            server.cluster->importing_slots_from[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
            /* CLUSTER SETSLOT <SLOT> STABLE */
            server.cluster->importing_slots_from[slot] = NULL;
            server.cluster->migrating_slots_to[slot] = NULL;
        } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
            n = clusterLookupNode(c->argv[4]->ptr);

            if (!n) {
                addReplyErrorFormat(c,"Unknown node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            /* If this hash slot was served by 'myself' before to switch
             * make sure there are no longer local keys for this hash slot. */
            if (server.cluster->slots[slot] == myself && n != myself) {
                if (countKeysInSlot(slot) != 0) {
                    addReplyErrorFormat(c,
                        "Can't assign hashslot %d to a different node "
                        "while I still hold keys for this hash slot.", slot);
                    return;
                }
            }
            /* If this slot is in migrating status but we have no keys
             * for it assigning the slot to another node will clear
             * the migrating status. */
            if (countKeysInSlot(slot) == 0 &&
                server.cluster->migrating_slots_to[slot])
                server.cluster->migrating_slots_to[slot] = NULL;

            /* If this node was importing this slot, assigning the slot to
             * itself also clears the importing status. */
            if (n == myself &&
                server.cluster->importing_slots_from[slot])
            {
                /* This slot was manually migrated, set this node configEpoch
                 * to a new epoch so that the new version can be propagated
                 * by the cluster.
                 *
                 * Note that if this ever results in a collision with another
                 * node getting the same configEpoch, for example because a
                 * failover happens at the same time we close the slot, the
                 * configEpoch collision resolution will fix it assigning
                 * a different epoch to each node. */
                if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
                    serverLog(LL_WARNING,
                        "configEpoch updated after importing slot %d", slot);
                }
                server.cluster->importing_slots_from[slot] = NULL;
            }
            clusterDelSlot(slot);
            clusterAddSlot(n,slot);
        } else {
            addReplyError(c,
                "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
            return;
        }
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
        /* CLUSTER BUMPEPOCH */
        int retval = clusterBumpConfigEpochWithoutConsensus();
        sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
                (retval == C_OK) ? "BUMPED" : "STILL",
                (unsigned long long) myself->configEpoch);
        addReplySds(c,reply);
    } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
        /* CLUSTER INFO */
        char *statestr[] = {"ok","fail","needhelp"};
        int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
        uint64_t myepoch;
        int j;

        for (j = 0; j < CLUSTER_SLOTS; j++) {
            clusterNode *n = server.cluster->slots[j];

            if (n == NULL) continue;
            slots_assigned++;
            if (nodeFailed(n)) {
                slots_fail++;
            } else if (nodeTimedOut(n)) {
                slots_pfail++;
            } else {
                slots_ok++;
            }
        }

        /* A slave reports the config epoch of its master. */
        myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
                  myself->slaveof->configEpoch : myself->configEpoch;

        sds info = sdscatprintf(sdsempty(),
            "cluster_state:%s\r\n"
            "cluster_slots_assigned:%d\r\n"
            "cluster_slots_ok:%d\r\n"
            "cluster_slots_pfail:%d\r\n"
            "cluster_slots_fail:%d\r\n"
            "cluster_known_nodes:%lu\r\n"
            "cluster_size:%d\r\n"
            "cluster_current_epoch:%llu\r\n"
            "cluster_my_epoch:%llu\r\n"
            , statestr[server.cluster->state],
            slots_assigned,
            slots_ok,
            slots_pfail,
            slots_fail,
            dictSize(server.cluster->nodes),
            server.cluster->size,
            (unsigned long long) server.cluster->currentEpoch,
            (unsigned long long) myepoch
        );

        /* Show stats about messages sent and received. */
        long long tot_msg_sent = 0;
        long long tot_msg_received = 0;

        for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
            if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
            tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
            info = sdscatprintf(info,
                "cluster_stats_messages_%s_sent:%lld\r\n",
                clusterGetMessageTypeString(i),
                server.cluster->stats_bus_messages_sent[i]);
        }
        info = sdscatprintf(info,
            "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);

        for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
            if (server.cluster->stats_bus_messages_received[i] == 0) continue;
            tot_msg_received += server.cluster->stats_bus_messages_received[i];
            info = sdscatprintf(info,
                "cluster_stats_messages_%s_received:%lld\r\n",
                clusterGetMessageTypeString(i),
                server.cluster->stats_bus_messages_received[i]);
        }
        info = sdscatprintf(info,
            "cluster_stats_messages_received:%lld\r\n", tot_msg_received);

        /* Produce the reply protocol. */
        addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
            (unsigned long)sdslen(info)));
        addReplySds(c,info);
        addReply(c,shared.crlf);
    } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
        /* CLUSTER SAVECONFIG */
        int retval = clusterSaveConfig(1);

        if (retval == 0)
            addReply(c,shared.ok);
        else
            addReplyErrorFormat(c,"error saving the cluster node config: %s",
                strerror(errno));
    } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
        /* CLUSTER KEYSLOT <key> */
        sds key = c->argv[2]->ptr;

        addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
    } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
        /* CLUSTER COUNTKEYSINSLOT <slot> */
        long long slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS) {
            addReplyError(c,"Invalid slot");
            return;
        }
        addReplyLongLong(c,countKeysInSlot(slot));
    } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
        /* CLUSTER GETKEYSINSLOT <slot> <count> */
        long long maxkeys, slot;
        unsigned int numkeys, j;
        robj **keys;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
            != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
            addReplyError(c,"Invalid slot or number of keys");
            return;
        }

        /* Avoid allocating more than needed in case of large COUNT argument
         * and smaller actual number of keys. */
        unsigned int keys_in_slot = countKeysInSlot(slot);
        if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;

        keys = zmalloc(sizeof(robj*)*maxkeys);
        numkeys = getKeysInSlot(slot, keys, maxkeys);
        addReplyMultiBulkLen(c,numkeys);
        for (j = 0; j < numkeys; j++) {
            addReplyBulk(c,keys[j]);
            decrRefCount(keys[j]);
        }
        zfree(keys);
    } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
        /* CLUSTER FORGET <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        } else if (n == myself) {
            addReplyError(c,"I tried hard but I can't forget myself...");
            return;
        } else if (nodeIsSlave(myself) && myself->slaveof == n) {
            addReplyError(c,"Can't forget my master!");
            return;
        }
        clusterBlacklistAddNode(n);
        clusterDelNode(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
        /* CLUSTER REPLICATE <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        /* I can't replicate myself. */
        if (n == myself) {
            addReplyError(c,"Can't replicate myself");
            return;
        }

        /* Can't replicate a slave. */
        if (nodeIsSlave(n)) {
            addReplyError(c,"I can only replicate a master, not a replica.");
            return;
        }

        /* If the instance is currently a master, it should have no assigned
         * slots nor keys to accept to replicate some other node.
         * Slaves can switch to another master without issues. */
        if (nodeIsMaster(myself) &&
            (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
            addReplyError(c,
                "To set a master the node must be empty and "
                "without assigned slots.");
            return;
        }

        /* Set the master. */
        clusterSetMaster(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
                !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
        /* CLUSTER SLAVES <NODE ID> (REPLICAS is an alias) */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
        int j;

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        if (nodeIsSlave(n)) {
            addReplyError(c,"The specified node is not a master");
            return;
        }

        addReplyMultiBulkLen(c,n->numslaves);
        for (j = 0; j < n->numslaves; j++) {
            sds ni = clusterGenNodeDescription(n->slaves[j]);
            addReplyBulkCString(c,ni);
            sdsfree(ni);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
               c->argc == 3)
    {
        /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        } else {
            addReplyLongLong(c,clusterNodeFailureReportsCount(n));
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
        int force = 0, takeover = 0;

        if (c->argc == 3) {
            if (!strcasecmp(c->argv[2]->ptr,"force")) {
                force = 1;
            } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
                takeover = 1;
                force = 1; /* Takeover also implies force. */
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Check preconditions. */
        if (nodeIsMaster(myself)) {
            addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
            return;
        } else if (myself->slaveof == NULL) {
            addReplyError(c,"I'm a replica but my master is unknown to me");
            return;
        } else if (!force &&
                   (nodeFailed(myself->slaveof) ||
                    myself->slaveof->link == NULL))
        {
            addReplyError(c,"Master is down or failed, "
                            "please use CLUSTER FAILOVER FORCE");
            return;
        }
        resetManualFailover();
        server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;

        if (takeover) {
            /* A takeover does not perform any initial check. It just
             * generates a new configuration epoch for this node without
             * consensus, claims the master's slots, and broadcast the new
             * configuration. */
            serverLog(LL_WARNING,"Taking over the master (user request).");
            clusterBumpConfigEpochWithoutConsensus();
            clusterFailoverReplaceYourMaster();
        } else if (force) {
            /* If this is a forced failover, we don't need to talk with our
             * master to agree about the offset. We just failover taking over
             * it without coordination. */
            serverLog(LL_WARNING,"Forced failover user request accepted.");
            server.cluster->mf_can_start = 1;
        } else {
            serverLog(LL_WARNING,"Manual failover user request accepted.");
            clusterSendMFStart(myself->slaveof);
        }
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
    {
        /* CLUSTER SET-CONFIG-EPOCH <epoch>
         *
         * The user is allowed to set the config epoch only when a node is
         * totally fresh: no config epoch, no other known node, and so forth.
         * This happens at cluster creation time to start with a cluster where
         * every node has a different config epoch, without relying on the
         * conflict resolution system, which is too slow when a big cluster
         * is created. */
        long long epoch;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
            return;

        if (epoch < 0) {
            addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
        } else if (dictSize(server.cluster->nodes) > 1) {
            addReplyError(c,"The user can assign a config epoch only when the "
                            "node does not know any other node.");
        } else if (myself->configEpoch != 0) {
            addReplyError(c,"Node config epoch is already non-zero");
        } else {
            myself->configEpoch = epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
                (unsigned long long) myself->configEpoch);

            if (server.cluster->currentEpoch < (uint64_t)epoch)
                server.cluster->currentEpoch = epoch;
            /* No need to fsync the config here since in the unlucky event
             * of a failure to persist the config, the conflict resolution code
             * will assign an unique config to this node. */
            clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                                 CLUSTER_TODO_SAVE_CONFIG);
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* CLUSTER RESET [SOFT|HARD] */
        int hard = 0;

        /* Parse soft/hard argument. Default is soft. */
        if (c->argc == 3) {
            if (!strcasecmp(c->argv[2]->ptr,"hard")) {
                hard = 1;
            } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
                hard = 0;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Slaves can be reset while containing data, but not master nodes
         * that must be empty. */
        if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
            addReplyError(c,"CLUSTER RESET can't be called with "
                            "master nodes containing keys");
            return;
        }
        clusterReset(hard);
        addReply(c,shared.ok);
    } else {
        addReplySubcommandSyntaxError(c);
        return;
    }
}
4822
4823 /* -----------------------------------------------------------------------------
4824 * DUMP, RESTORE and MIGRATE commands
4825 * -------------------------------------------------------------------------- */
4826
4827 /* Generates a DUMP-format representation of the object 'o', adding it to the
4828 * io stream pointed by 'rio'. This function can't fail. */
createDumpPayload(rio * payload,robj * o,robj * key)4829 void createDumpPayload(rio *payload, robj *o, robj *key) {
4830 unsigned char buf[2];
4831 uint64_t crc;
4832
4833 /* Serialize the object in a RDB-like format. It consist of an object type
4834 * byte followed by the serialized object. This is understood by RESTORE. */
4835 rioInitWithBuffer(payload,sdsempty());
4836 serverAssert(rdbSaveObjectType(payload,o));
4837 serverAssert(rdbSaveObject(payload,o,key));
4838
4839 /* Write the footer, this is how it looks like:
4840 * ----------------+---------------------+---------------+
4841 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4842 * ----------------+---------------------+---------------+
4843 * RDB version and CRC are both in little endian.
4844 */
4845
4846 /* RDB version */
4847 buf[0] = RDB_VERSION & 0xff;
4848 buf[1] = (RDB_VERSION >> 8) & 0xff;
4849 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4850
4851 /* CRC64 */
4852 crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4853 sdslen(payload->io.buffer.ptr));
4854 memrev64ifbe(&crc);
4855 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4856 }
4857
4858 /* Verify that the RDB version of the dump payload matches the one of this Redis
4859 * instance and that the checksum is ok.
4860 * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
4861 * is returned. */
verifyDumpPayload(unsigned char * p,size_t len)4862 int verifyDumpPayload(unsigned char *p, size_t len) {
4863 unsigned char *footer;
4864 uint16_t rdbver;
4865 uint64_t crc;
4866
4867 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4868 if (len < 10) return C_ERR;
4869 footer = p+(len-10);
4870
4871 /* Verify RDB version */
4872 rdbver = (footer[1] << 8) | footer[0];
4873 if (rdbver > RDB_VERSION) return C_ERR;
4874
4875 /* Verify CRC64 */
4876 crc = crc64(0,p,len-8);
4877 memrev64ifbe(&crc);
4878 return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4879 }
4880
4881 /* DUMP keyname
4882 * DUMP is actually not used by Redis Cluster but it is the obvious
4883 * complement of RESTORE and can be useful for different applications. */
dumpCommand(client * c)4884 void dumpCommand(client *c) {
4885 robj *o, *dumpobj;
4886 rio payload;
4887
4888 /* Check if the key is here. */
4889 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4890 addReply(c,shared.nullbulk);
4891 return;
4892 }
4893
4894 /* Create the DUMP encoded representation. */
4895 createDumpPayload(&payload,o,c->argv[1]);
4896
4897 /* Transfer to the client */
4898 dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
4899 addReplyBulk(c,dumpobj);
4900 decrRefCount(dumpobj);
4901 return;
4902 }
4903
4904 /* RESTORE key ttl serialized-value [REPLACE] */
restoreCommand(client * c)4905 void restoreCommand(client *c) {
4906 long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
4907 rio payload;
4908 int j, type, replace = 0, absttl = 0;
4909 robj *obj;
4910
4911 /* Parse additional options */
4912 for (j = 4; j < c->argc; j++) {
4913 int additional = c->argc-j-1;
4914 if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4915 replace = 1;
4916 } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
4917 absttl = 1;
4918 } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
4919 lfu_freq == -1)
4920 {
4921 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
4922 != C_OK) return;
4923 if (lru_idle < 0) {
4924 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
4925 return;
4926 }
4927 lru_clock = LRU_CLOCK();
4928 j++; /* Consume additional arg. */
4929 } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
4930 lru_idle == -1)
4931 {
4932 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
4933 != C_OK) return;
4934 if (lfu_freq < 0 || lfu_freq > 255) {
4935 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
4936 return;
4937 }
4938 j++; /* Consume additional arg. */
4939 } else {
4940 addReply(c,shared.syntaxerr);
4941 return;
4942 }
4943 }
4944
4945 /* Make sure this key does not already exist here... */
4946 robj *key = c->argv[1];
4947 if (!replace && lookupKeyWrite(c->db,key) != NULL) {
4948 addReply(c,shared.busykeyerr);
4949 return;
4950 }
4951
4952 /* Check if the TTL value makes sense */
4953 if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
4954 return;
4955 } else if (ttl < 0) {
4956 addReplyError(c,"Invalid TTL value, must be >= 0");
4957 return;
4958 }
4959
4960 /* Verify RDB version and data checksum. */
4961 if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
4962 {
4963 addReplyError(c,"DUMP payload version or checksum are wrong");
4964 return;
4965 }
4966
4967 rioInitWithBuffer(&payload,c->argv[3]->ptr);
4968 if (((type = rdbLoadObjectType(&payload)) == -1) ||
4969 ((obj = rdbLoadObject(type,&payload,key)) == NULL))
4970 {
4971 addReplyError(c,"Bad data format");
4972 return;
4973 }
4974
4975 /* Remove the old key if needed. */
4976 int deleted = 0;
4977 if (replace)
4978 deleted = dbDelete(c->db,key);
4979
4980 if (ttl && !absttl) ttl+=mstime();
4981 if (ttl && checkAlreadyExpired(ttl)) {
4982 if (deleted) {
4983 rewriteClientCommandVector(c,2,shared.del,key);
4984 signalModifiedKey(c->db,key);
4985 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
4986 server.dirty++;
4987 }
4988 decrRefCount(obj);
4989 addReply(c, shared.ok);
4990 return;
4991 }
4992
4993 /* Create the key and set the TTL if any */
4994 dbAdd(c->db,key,obj);
4995 if (ttl) {
4996 setExpire(c,c->db,key,ttl);
4997 }
4998 objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
4999 signalModifiedKey(c->db,key);
5000 addReply(c,shared.ok);
5001 server.dirty++;
5002 }
5003
/* MIGRATE socket cache implementation.
 *
 * We keep a map from "host:port" strings to TCP sockets that we recently
 * used to connect to that instance, so that consecutive MIGRATE calls to the
 * same target can reuse the connection.
 * These sockets are closed when the max number of cached items is reached
 * (a random one is evicted), and also in serverCron() when they have been
 * unused for more than MIGRATE_SOCKET_CACHE_TTL seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */

typedef struct migrateCachedSocket {
    int fd;               /* Connected TCP socket to the target instance. */
    long last_dbid;       /* DB selected on the target, -1 if unknown. */
    time_t last_use_time; /* Unix time of the last use, for TTL eviction. */
} migrateCachedSocket;
5018
5019 /* Return a migrateCachedSocket containing a TCP socket connected with the
5020 * target instance, possibly returning a cached one.
5021 *
5022 * This function is responsible of sending errors to the client if a
5023 * connection can't be established. In this case -1 is returned.
5024 * Otherwise on success the socket is returned, and the caller should not
5025 * attempt to free it after usage.
5026 *
5027 * If the caller detects an error while using the socket, migrateCloseSocket()
5028 * should be called so that the connection will be created from scratch
5029 * the next time. */
migrateGetSocket(client * c,robj * host,robj * port,long timeout)5030 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
5031 int fd;
5032 sds name = sdsempty();
5033 migrateCachedSocket *cs;
5034
5035 /* Check if we have an already cached socket for this ip:port pair. */
5036 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5037 name = sdscatlen(name,":",1);
5038 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5039 cs = dictFetchValue(server.migrate_cached_sockets,name);
5040 if (cs) {
5041 sdsfree(name);
5042 cs->last_use_time = server.unixtime;
5043 return cs;
5044 }
5045
5046 /* No cached socket, create one. */
5047 if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
5048 /* Too many items, drop one at random. */
5049 dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
5050 cs = dictGetVal(de);
5051 close(cs->fd);
5052 zfree(cs);
5053 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5054 }
5055
5056 /* Create the socket */
5057 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
5058 atoi(c->argv[2]->ptr));
5059 if (fd == -1) {
5060 sdsfree(name);
5061 addReplyErrorFormat(c,"Can't connect to target node: %s",
5062 server.neterr);
5063 return NULL;
5064 }
5065 anetEnableTcpNoDelay(server.neterr,fd);
5066
5067 /* Check if it connects within the specified timeout. */
5068 if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
5069 sdsfree(name);
5070 addReplySds(c,
5071 sdsnew("-IOERR error or timeout connecting to the client\r\n"));
5072 close(fd);
5073 return NULL;
5074 }
5075
5076 /* Add to the cache and return it to the caller. */
5077 cs = zmalloc(sizeof(*cs));
5078 cs->fd = fd;
5079 cs->last_dbid = -1;
5080 cs->last_use_time = server.unixtime;
5081 dictAdd(server.migrate_cached_sockets,name,cs);
5082 return cs;
5083 }
5084
5085 /* Free a migrate cached connection. */
migrateCloseSocket(robj * host,robj * port)5086 void migrateCloseSocket(robj *host, robj *port) {
5087 sds name = sdsempty();
5088 migrateCachedSocket *cs;
5089
5090 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5091 name = sdscatlen(name,":",1);
5092 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5093 cs = dictFetchValue(server.migrate_cached_sockets,name);
5094 if (!cs) {
5095 sdsfree(name);
5096 return;
5097 }
5098
5099 close(cs->fd);
5100 zfree(cs);
5101 dictDelete(server.migrate_cached_sockets,name);
5102 sdsfree(name);
5103 }
5104
migrateCloseTimedoutSockets(void)5105 void migrateCloseTimedoutSockets(void) {
5106 dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
5107 dictEntry *de;
5108
5109 while((de = dictNext(di)) != NULL) {
5110 migrateCachedSocket *cs = dictGetVal(de);
5111
5112 if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
5113 close(cs->fd);
5114 zfree(cs);
5115 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5116 }
5117 }
5118 dictReleaseIterator(di);
5119 }
5120
5121 /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password]
5122 *
5123 * On in the multiple keys form:
5124 *
5125 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password] KEYS key1
5126 * key2 ... keyN */
migrateCommand(client * c)5127 void migrateCommand(client *c) {
5128 migrateCachedSocket *cs;
5129 int copy = 0, replace = 0, j;
5130 char *password = NULL;
5131 long timeout;
5132 long dbid;
5133 robj **ov = NULL; /* Objects to migrate. */
5134 robj **kv = NULL; /* Key names. */
5135 robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
5136 rio cmd, payload;
5137 int may_retry = 1;
5138 int write_error = 0;
5139 int argv_rewritten = 0;
5140
5141 /* To support the KEYS option we need the following additional state. */
5142 int first_key = 3; /* Argument index of the first key. */
5143 int num_keys = 1; /* By default only migrate the 'key' argument. */
5144
5145 /* Parse additional options */
5146 for (j = 6; j < c->argc; j++) {
5147 int moreargs = j < c->argc-1;
5148 if (!strcasecmp(c->argv[j]->ptr,"copy")) {
5149 copy = 1;
5150 } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5151 replace = 1;
5152 } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
5153 if (!moreargs) {
5154 addReply(c,shared.syntaxerr);
5155 return;
5156 }
5157 j++;
5158 password = c->argv[j]->ptr;
5159 } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
5160 if (sdslen(c->argv[3]->ptr) != 0) {
5161 addReplyError(c,
5162 "When using MIGRATE KEYS option, the key argument"
5163 " must be set to the empty string");
5164 return;
5165 }
5166 first_key = j+1;
5167 num_keys = c->argc - j - 1;
5168 break; /* All the remaining args are keys. */
5169 } else {
5170 addReply(c,shared.syntaxerr);
5171 return;
5172 }
5173 }
5174
5175 /* Sanity check */
5176 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
5177 getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
5178 {
5179 return;
5180 }
5181 if (timeout <= 0) timeout = 1000;
5182
5183 /* Check if the keys are here. If at least one key is to migrate, do it
5184 * otherwise if all the keys are missing reply with "NOKEY" to signal
5185 * the caller there was nothing to migrate. We don't return an error in
5186 * this case, since often this is due to a normal condition like the key
5187 * expiring in the meantime. */
5188 ov = zrealloc(ov,sizeof(robj*)*num_keys);
5189 kv = zrealloc(kv,sizeof(robj*)*num_keys);
5190 int oi = 0;
5191
5192 for (j = 0; j < num_keys; j++) {
5193 if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
5194 kv[oi] = c->argv[first_key+j];
5195 oi++;
5196 }
5197 }
5198 num_keys = oi;
5199 if (num_keys == 0) {
5200 zfree(ov); zfree(kv);
5201 addReplySds(c,sdsnew("+NOKEY\r\n"));
5202 return;
5203 }
5204
5205 try_again:
5206 write_error = 0;
5207
5208 /* Connect */
5209 cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
5210 if (cs == NULL) {
5211 zfree(ov); zfree(kv);
5212 return; /* error sent to the client by migrateGetSocket() */
5213 }
5214
5215 rioInitWithBuffer(&cmd,sdsempty());
5216
5217 /* Authentication */
5218 if (password) {
5219 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5220 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
5221 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
5222 sdslen(password)));
5223 }
5224
5225 /* Send the SELECT command if the current DB is not already selected. */
5226 int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
5227 if (select) {
5228 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5229 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
5230 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
5231 }
5232
5233 int non_expired = 0; /* Number of keys that we'll find non expired.
5234 Note that serializing large keys may take some time
5235 so certain keys that were found non expired by the
5236 lookupKey() function, may be expired later. */
5237
5238 /* Create RESTORE payload and generate the protocol to call the command. */
5239 for (j = 0; j < num_keys; j++) {
5240 long long ttl = 0;
5241 long long expireat = getExpire(c->db,kv[j]);
5242
5243 if (expireat != -1) {
5244 ttl = expireat-mstime();
5245 if (ttl < 0) {
5246 continue;
5247 }
5248 if (ttl < 1) ttl = 1;
5249 }
5250
5251 /* Relocate valid (non expired) keys into the array in successive
5252 * positions to remove holes created by the keys that were present
5253 * in the first lookup but are now expired after the second lookup. */
5254 kv[non_expired++] = kv[j];
5255
5256 serverAssertWithInfo(c,NULL,
5257 rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
5258
5259 if (server.cluster_enabled)
5260 serverAssertWithInfo(c,NULL,
5261 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
5262 else
5263 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
5264 serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
5265 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
5266 sdslen(kv[j]->ptr)));
5267 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
5268
5269 /* Emit the payload argument, that is the serialized object using
5270 * the DUMP format. */
5271 createDumpPayload(&payload,ov[j],kv[j]);
5272 serverAssertWithInfo(c,NULL,
5273 rioWriteBulkString(&cmd,payload.io.buffer.ptr,
5274 sdslen(payload.io.buffer.ptr)));
5275 sdsfree(payload.io.buffer.ptr);
5276
5277 /* Add the REPLACE option to the RESTORE command if it was specified
5278 * as a MIGRATE option. */
5279 if (replace)
5280 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
5281 }
5282
5283 /* Fix the actual number of keys we are migrating. */
5284 num_keys = non_expired;
5285
5286 /* Transfer the query to the other node in 64K chunks. */
5287 errno = 0;
5288 {
5289 sds buf = cmd.io.buffer.ptr;
5290 size_t pos = 0, towrite;
5291 int nwritten = 0;
5292
5293 while ((towrite = sdslen(buf)-pos) > 0) {
5294 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5295 nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
5296 if (nwritten != (signed)towrite) {
5297 write_error = 1;
5298 goto socket_err;
5299 }
5300 pos += nwritten;
5301 }
5302 }
5303
5304 char buf0[1024]; /* Auth reply. */
5305 char buf1[1024]; /* Select reply. */
5306 char buf2[1024]; /* Restore reply. */
5307
5308 /* Read the AUTH reply if needed. */
5309 if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
5310 goto socket_err;
5311
5312 /* Read the SELECT reply if needed. */
5313 if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
5314 goto socket_err;
5315
5316 /* Read the RESTORE replies. */
5317 int error_from_target = 0;
5318 int socket_error = 0;
5319 int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5320
5321 /* Allocate the new argument vector that will replace the current command,
5322 * to propagate the MIGRATE as a DEL command (if no COPY option was given).
5323 * We allocate num_keys+1 because the additional argument is for "DEL"
5324 * command name itself. */
5325 if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5326
5327 for (j = 0; j < num_keys; j++) {
5328 if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
5329 socket_error = 1;
5330 break;
5331 }
5332 if ((password && buf0[0] == '-') ||
5333 (select && buf1[0] == '-') ||
5334 buf2[0] == '-')
5335 {
5336 /* On error assume that last_dbid is no longer valid. */
5337 if (!error_from_target) {
5338 cs->last_dbid = -1;
5339 char *errbuf;
5340 if (password && buf0[0] == '-') errbuf = buf0;
5341 else if (select && buf1[0] == '-') errbuf = buf1;
5342 else errbuf = buf2;
5343
5344 error_from_target = 1;
5345 addReplyErrorFormat(c,"Target instance replied with error: %s",
5346 errbuf+1);
5347 }
5348 } else {
5349 if (!copy) {
5350 /* No COPY option: remove the local key, signal the change. */
5351 dbDelete(c->db,kv[j]);
5352 signalModifiedKey(c->db,kv[j]);
5353 server.dirty++;
5354
5355 /* Populate the argument vector to replace the old one. */
5356 newargv[del_idx++] = kv[j];
5357 incrRefCount(kv[j]);
5358 }
5359 }
5360 }
5361
5362 /* On socket error, if we want to retry, do it now before rewriting the
5363 * command vector. We only retry if we are sure nothing was processed
5364 * and we failed to read the first reply (j == 0 test). */
5365 if (!error_from_target && socket_error && j == 0 && may_retry &&
5366 errno != ETIMEDOUT)
5367 {
5368 goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5369 }
5370
5371 /* On socket errors, close the migration socket now that we still have
5372 * the original host/port in the ARGV. Later the original command may be
5373 * rewritten to DEL and will be too later. */
5374 if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5375
5376 if (!copy) {
5377 /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5378 * this only for the keys for which we received an acknowledgement
5379 * from the receiving Redis server, by using the del_idx index. */
5380 if (del_idx > 1) {
5381 newargv[0] = createStringObject("DEL",3);
5382 /* Note that the following call takes ownership of newargv. */
5383 replaceClientCommandVector(c,del_idx,newargv);
5384 argv_rewritten = 1;
5385 } else {
5386 /* No key transfer acknowledged, no need to rewrite as DEL. */
5387 zfree(newargv);
5388 }
5389 newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5390 }
5391
5392 /* If we are here and a socket error happened, we don't want to retry.
5393 * Just signal the problem to the client, but only do it if we did not
5394 * already queue a different error reported by the destination server. */
5395 if (!error_from_target && socket_error) {
5396 may_retry = 0;
5397 goto socket_err;
5398 }
5399
5400 if (!error_from_target) {
5401 /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5402 * avoid SELECT the next time if the target DB is the same. Reply +OK.
5403 *
5404 * Note: If we reached this point, even if socket_error is true
5405 * still the SELECT command succeeded (otherwise the code jumps to
5406 * socket_err label. */
5407 cs->last_dbid = dbid;
5408 addReply(c,shared.ok);
5409 } else {
5410 /* On error we already sent it in the for loop above, and set
5411 * the currently selected socket to -1 to force SELECT the next time. */
5412 }
5413
5414 sdsfree(cmd.io.buffer.ptr);
5415 zfree(ov); zfree(kv); zfree(newargv);
5416 return;
5417
5418 /* On socket errors we try to close the cached socket and try again.
5419 * It is very common for the cached socket to get closed, if just reopening
5420 * it works it's a shame to notify the error to the caller. */
5421 socket_err:
5422 /* Cleanup we want to perform in both the retry and no retry case.
5423 * Note: Closing the migrate socket will also force SELECT next time. */
5424 sdsfree(cmd.io.buffer.ptr);
5425
5426 /* If the command was rewritten as DEL and there was a socket error,
5427 * we already closed the socket earlier. While migrateCloseSocket()
5428 * is idempotent, the host/port arguments are now gone, so don't do it
5429 * again. */
5430 if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5431 zfree(newargv);
5432 newargv = NULL; /* This will get reallocated on retry. */
5433
5434 /* Retry only if it's not a timeout and we never attempted a retry
5435 * (or the code jumping here did not set may_retry to zero). */
5436 if (errno != ETIMEDOUT && may_retry) {
5437 may_retry = 0;
5438 goto try_again;
5439 }
5440
5441 /* Cleanup we want to do if no retry is attempted. */
5442 zfree(ov); zfree(kv);
5443 addReplySds(c,
5444 sdscatprintf(sdsempty(),
5445 "-IOERR error or timeout %s to target instance\r\n",
5446 write_error ? "writing" : "reading"));
5447 return;
5448 }
5449
5450 /* -----------------------------------------------------------------------------
5451 * Cluster functions related to serving / redirecting clients
5452 * -------------------------------------------------------------------------- */
5453
5454 /* The ASKING command is required after a -ASK redirection.
5455 * The client should issue ASKING before to actually send the command to
5456 * the target instance. See the Redis Cluster specification for more
5457 * information. */
askingCommand(client * c)5458 void askingCommand(client *c) {
5459 if (server.cluster_enabled == 0) {
5460 addReplyError(c,"This instance has cluster support disabled");
5461 return;
5462 }
5463 c->flags |= CLIENT_ASKING;
5464 addReply(c,shared.ok);
5465 }
5466
5467 /* The READONLY command is used by clients to enter the read-only mode.
5468 * In this mode slaves will not redirect clients as long as clients access
5469 * with read-only commands to keys that are served by the slave's master. */
readonlyCommand(client * c)5470 void readonlyCommand(client *c) {
5471 if (server.cluster_enabled == 0) {
5472 addReplyError(c,"This instance has cluster support disabled");
5473 return;
5474 }
5475 c->flags |= CLIENT_READONLY;
5476 addReply(c,shared.ok);
5477 }
5478
5479 /* The READWRITE command just clears the READONLY command state. */
readwriteCommand(client * c)5480 void readwriteCommand(client *c) {
5481 c->flags &= ~CLIENT_READONLY;
5482 addReply(c,shared.ok);
5483 }
5484
5485 /* Return the pointer to the cluster node that is able to serve the command.
5486 * For the function to succeed the command should only target either:
5487 *
5488 * 1) A single key (even multiple times like LPOPRPUSH mylist mylist).
5489 * 2) Multiple keys in the same hash slot, while the slot is stable (no
5490 * resharding in progress).
5491 *
5492 * On success the function returns the node that is able to serve the request.
5493 * If the node is not 'myself' a redirection must be perfomed. The kind of
5494 * redirection is specified setting the integer passed by reference
5495 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5496 * CLUSTER_REDIR_MOVED.
5497 *
5498 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5499 *
5500 * If the command fails NULL is returned, and the reason of the failure is
5501 * provided via 'error_code', which will be set to:
5502 *
5503 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5504 * don't belong to the same hash slot.
5505 *
5506 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5507 * belonging to the same slot, but the slot is not stable (in migration or
5508 * importing state, likely because a resharding is in progress).
5509 *
5510 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5511 * not bound to any node. In this case the cluster global state should be
5512 * already "down" but it is fragile to rely on the update of the global state,
5513 * so we also handle it here.
5514 *
5515 * CLUSTER_REDIR_DOWN_STATE if the cluster is down but the user attempts to
5516 * execute a command that addresses one or more keys. */
getNodeByQuery(client * c,struct redisCommand * cmd,robj ** argv,int argc,int * hashslot,int * error_code)5517 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5518 clusterNode *n = NULL;
5519 robj *firstkey = NULL;
5520 int multiple_keys = 0;
5521 multiState *ms, _ms;
5522 multiCmd mc;
5523 int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5524
5525 /* Allow any key to be set if a module disabled cluster redirections. */
5526 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
5527 return myself;
5528
5529 /* Set error code optimistically for the base case. */
5530 if (error_code) *error_code = CLUSTER_REDIR_NONE;
5531
5532 /* Modules can turn off Redis Cluster redirection: this is useful
5533 * when writing a module that implements a completely different
5534 * distributed system. */
5535
5536 /* We handle all the cases as if they were EXEC commands, so we have
5537 * a common code path for everything */
5538 if (cmd->proc == execCommand) {
5539 /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5540 * error. */
5541 if (!(c->flags & CLIENT_MULTI)) return myself;
5542 ms = &c->mstate;
5543 } else {
5544 /* In order to have a single codepath create a fake Multi State
5545 * structure if the client is not in MULTI/EXEC state, this way
5546 * we have a single codepath below. */
5547 ms = &_ms;
5548 _ms.commands = &mc;
5549 _ms.count = 1;
5550 mc.argv = argv;
5551 mc.argc = argc;
5552 mc.cmd = cmd;
5553 }
5554
5555 /* Check that all the keys are in the same hash slot, and obtain this
5556 * slot and the node associated. */
5557 for (i = 0; i < ms->count; i++) {
5558 struct redisCommand *mcmd;
5559 robj **margv;
5560 int margc, *keyindex, numkeys, j;
5561
5562 mcmd = ms->commands[i].cmd;
5563 margc = ms->commands[i].argc;
5564 margv = ms->commands[i].argv;
5565
5566 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
5567 for (j = 0; j < numkeys; j++) {
5568 robj *thiskey = margv[keyindex[j]];
5569 int thisslot = keyHashSlot((char*)thiskey->ptr,
5570 sdslen(thiskey->ptr));
5571
5572 if (firstkey == NULL) {
5573 /* This is the first key we see. Check what is the slot
5574 * and node. */
5575 firstkey = thiskey;
5576 slot = thisslot;
5577 n = server.cluster->slots[slot];
5578
5579 /* Error: If a slot is not served, we are in "cluster down"
5580 * state. However the state is yet to be updated, so this was
5581 * not trapped earlier in processCommand(). Report the same
5582 * error to the client. */
5583 if (n == NULL) {
5584 getKeysFreeResult(keyindex);
5585 if (error_code)
5586 *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5587 return NULL;
5588 }
5589
5590 /* If we are migrating or importing this slot, we need to check
5591 * if we have all the keys in the request (the only way we
5592 * can safely serve the request, otherwise we return a TRYAGAIN
5593 * error). To do so we set the importing/migrating state and
5594 * increment a counter for every missing key. */
5595 if (n == myself &&
5596 server.cluster->migrating_slots_to[slot] != NULL)
5597 {
5598 migrating_slot = 1;
5599 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5600 importing_slot = 1;
5601 }
5602 } else {
5603 /* If it is not the first key, make sure it is exactly
5604 * the same key as the first we saw. */
5605 if (!equalStringObjects(firstkey,thiskey)) {
5606 if (slot != thisslot) {
5607 /* Error: multiple keys from different slots. */
5608 getKeysFreeResult(keyindex);
5609 if (error_code)
5610 *error_code = CLUSTER_REDIR_CROSS_SLOT;
5611 return NULL;
5612 } else {
5613 /* Flag this request as one with multiple different
5614 * keys. */
5615 multiple_keys = 1;
5616 }
5617 }
5618 }
5619
5620 /* Migarting / Improrting slot? Count keys we don't have. */
5621 if ((migrating_slot || importing_slot) &&
5622 lookupKeyRead(&server.db[0],thiskey) == NULL)
5623 {
5624 missing_keys++;
5625 }
5626 }
5627 getKeysFreeResult(keyindex);
5628 }
5629
5630 /* No key at all in command? then we can serve the request
5631 * without redirections or errors in all the cases. */
5632 if (n == NULL) return myself;
5633
5634 /* Cluster is globally down but we got keys? We can't serve the request. */
5635 if (server.cluster->state != CLUSTER_OK) {
5636 if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5637 return NULL;
5638 }
5639
5640 /* Return the hashslot by reference. */
5641 if (hashslot) *hashslot = slot;
5642
5643 /* MIGRATE always works in the context of the local node if the slot
5644 * is open (migrating or importing state). We need to be able to freely
5645 * move keys among instances in this case. */
5646 if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5647 return myself;
5648
5649 /* If we don't have all the keys and we are migrating the slot, send
5650 * an ASK redirection. */
5651 if (migrating_slot && missing_keys) {
5652 if (error_code) *error_code = CLUSTER_REDIR_ASK;
5653 return server.cluster->migrating_slots_to[slot];
5654 }
5655
5656 /* If we are receiving the slot, and the client correctly flagged the
5657 * request as "ASKING", we can serve the request. However if the request
5658 * involves multiple keys and we don't have them all, the only option is
5659 * to send a TRYAGAIN error. */
5660 if (importing_slot &&
5661 (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5662 {
5663 if (multiple_keys && missing_keys) {
5664 if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5665 return NULL;
5666 } else {
5667 return myself;
5668 }
5669 }
5670
5671 /* Handle the read-only client case reading from a slave: if this
5672 * node is a slave and the request is about an hash slot our master
5673 * is serving, we can reply without redirection. */
5674 if (c->flags & CLIENT_READONLY &&
5675 (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
5676 cmd->proc == evalShaCommand) &&
5677 nodeIsSlave(myself) &&
5678 myself->slaveof == n)
5679 {
5680 return myself;
5681 }
5682
5683 /* Base case: just return the right node. However if this node is not
5684 * myself, set error_code to MOVED since we need to issue a rediretion. */
5685 if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5686 return n;
5687 }
5688
5689 /* Send the client the right redirection code, according to error_code
5690 * that should be set to one of CLUSTER_REDIR_* macros.
5691 *
5692 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5693 * are used, then the node 'n' should not be NULL, but should be the
5694 * node we want to mention in the redirection. Moreover hashslot should
5695 * be set to the hash slot that caused the redirection. */
clusterRedirectClient(client * c,clusterNode * n,int hashslot,int error_code)5696 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5697 if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5698 addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5699 } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5700 /* The request spawns multiple keys in the same slot,
5701 * but the slot is not "stable" currently as there is
5702 * a migration or import in progress. */
5703 addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5704 } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5705 addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5706 } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5707 addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5708 } else if (error_code == CLUSTER_REDIR_MOVED ||
5709 error_code == CLUSTER_REDIR_ASK)
5710 {
5711 addReplySds(c,sdscatprintf(sdsempty(),
5712 "-%s %d %s:%d\r\n",
5713 (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5714 hashslot,n->ip,n->port));
5715 } else {
5716 serverPanic("getNodeByQuery() unknown error.");
5717 }
5718 }
5719
5720 /* This function is called by the function processing clients incrementally
5721 * to detect timeouts, in order to handle the following case:
5722 *
5723 * 1) A client blocks with BLPOP or similar blocking operation.
5724 * 2) The master migrates the hash slot elsewhere or turns into a slave.
5725 * 3) The client may remain blocked forever (or up to the max timeout time)
5726 * waiting for a key change that will never happen.
5727 *
5728 * If the client is found to be blocked into an hash slot this node no
5729 * longer handles, the client is sent a redirection error, and the function
5730 * returns 1. Otherwise 0 is returned and no operation is performed. */
clusterRedirectBlockedClientIfNeeded(client * c)5731 int clusterRedirectBlockedClientIfNeeded(client *c) {
5732 if (c->flags & CLIENT_BLOCKED &&
5733 (c->btype == BLOCKED_LIST ||
5734 c->btype == BLOCKED_ZSET ||
5735 c->btype == BLOCKED_STREAM))
5736 {
5737 dictEntry *de;
5738 dictIterator *di;
5739
5740 /* If the cluster is down, unblock the client with the right error. */
5741 if (server.cluster->state == CLUSTER_FAIL) {
5742 clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5743 return 1;
5744 }
5745
5746 /* All keys must belong to the same slot, so check first key only. */
5747 di = dictGetIterator(c->bpop.keys);
5748 if ((de = dictNext(di)) != NULL) {
5749 robj *key = dictGetKey(de);
5750 int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5751 clusterNode *node = server.cluster->slots[slot];
5752
5753 /* if the client is read-only and attempting to access key that our
5754 * replica can handle, allow it. */
5755 if ((c->flags & CLIENT_READONLY) &&
5756 (c->lastcmd->flags & CMD_READONLY) &&
5757 nodeIsSlave(myself) && myself->slaveof == node)
5758 {
5759 node = myself;
5760 }
5761
5762 /* We send an error and unblock the client if:
5763 * 1) The slot is unassigned, emitting a cluster down error.
5764 * 2) The slot is not handled by this node, nor being imported. */
5765 if (node != myself &&
5766 server.cluster->importing_slots_from[slot] == NULL)
5767 {
5768 if (node == NULL) {
5769 clusterRedirectClient(c,NULL,0,
5770 CLUSTER_REDIR_DOWN_UNBOUND);
5771 } else {
5772 clusterRedirectClient(c,node,slot,
5773 CLUSTER_REDIR_MOVED);
5774 }
5775 dictReleaseIterator(di);
5776 return 1;
5777 }
5778 }
5779 dictReleaseIterator(di);
5780 }
5781 return 0;
5782 }
5783