1 /* Redis Cluster implementation.
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "server.h"
32 #include "cluster.h"
33 #include "endianconv.h"
34
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <sys/stat.h>
41 #include <sys/file.h>
42 #include <math.h>
43
44 /* A global reference to myself is handy to make code more clear.
45 * Myself always points to server.cluster->myself, that is, the clusterNode
46 * that represents this node. */
47 clusterNode *myself = NULL;
48
49 clusterNode *createClusterNode(char *nodename, int flags);
50 int clusterAddNode(clusterNode *node);
51 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
52 void clusterReadHandler(connection *conn);
53 void clusterSendPing(clusterLink *link, int type);
54 void clusterSendFail(char *nodename);
55 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
56 void clusterUpdateState(void);
57 int clusterNodeGetSlotBit(clusterNode *n, int slot);
58 sds clusterGenNodesDescription(int filter);
59 clusterNode *clusterLookupNode(const char *name);
60 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
61 int clusterAddSlot(clusterNode *n, int slot);
62 int clusterDelSlot(int slot);
63 int clusterDelNodeSlots(clusterNode *node);
64 int clusterNodeSetSlotBit(clusterNode *n, int slot);
65 void clusterSetMaster(clusterNode *n);
66 void clusterHandleSlaveFailover(void);
67 void clusterHandleSlaveMigration(int max_slaves);
68 int bitmapTestBit(unsigned char *bitmap, int pos);
69 void clusterDoBeforeSleep(int flags);
70 void clusterSendUpdate(clusterLink *link, clusterNode *node);
71 void resetManualFailover(void);
72 void clusterCloseAllSlots(void);
73 void clusterSetNodeAsMaster(clusterNode *n);
74 void clusterDelNode(clusterNode *delnode);
75 sds representClusterNodeFlags(sds ci, uint16_t flags);
76 uint64_t clusterGetMaxEpoch(void);
77 int clusterBumpConfigEpochWithoutConsensus(void);
78 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
79
80 #define RCVBUF_INIT_LEN 1024
81 #define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */
82
83 /* -----------------------------------------------------------------------------
84 * Initialization
85 * -------------------------------------------------------------------------- */
86
87 /* Load the cluster config from 'filename'.
88 *
89 * If the file does not exist or is zero-length (this may happen because
90 * when we lock the nodes.conf file, we create a zero-length one for the
91 * sake of locking if it does not already exist), C_ERR is returned.
92 * If the configuration was loaded from the file, C_OK is returned. */
clusterLoadConfig(char * filename)93 int clusterLoadConfig(char *filename) {
94 FILE *fp = fopen(filename,"r");
95 struct stat sb;
96 char *line;
97 int maxline, j;
98
99 if (fp == NULL) {
100 if (errno == ENOENT) {
101 return C_ERR;
102 } else {
103 serverLog(LL_WARNING,
104 "Loading the cluster node config from %s: %s",
105 filename, strerror(errno));
106 exit(1);
107 }
108 }
109
110 /* Check if the file is zero-length: if so return C_ERR to signal
111 * we have to write the config. */
112 if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
113 fclose(fp);
114 return C_ERR;
115 }
116
117 /* Parse the file. Note that single lines of the cluster config file can
118 * be really long as they include all the hash slots of the node.
119 * This means in the worst possible case, half of the Redis slots will be
120 * present in a single line, possibly in importing or migrating state, so
121 * together with the node ID of the sender/receiver.
122 *
123 * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
124 maxline = 1024+CLUSTER_SLOTS*128;
125 line = zmalloc(maxline);
126 while(fgets(line,maxline,fp) != NULL) {
127 int argc;
128 sds *argv;
129 clusterNode *n, *master;
130 char *p, *s;
131
132 /* Skip blank lines, they can be created either by users manually
133 * editing nodes.conf or by the config writing process if stopped
134 * before the truncate() call. */
135 if (line[0] == '\n' || line[0] == '\0') continue;
136
137 /* Split the line into arguments for processing. */
138 argv = sdssplitargs(line,&argc);
139 if (argv == NULL) goto fmterr;
140
141 /* Handle the special "vars" line. Don't pretend it is the last
142 * line even if it actually is when generated by Redis. */
143 if (strcasecmp(argv[0],"vars") == 0) {
144 if (!(argc % 2)) goto fmterr;
145 for (j = 1; j < argc; j += 2) {
146 if (strcasecmp(argv[j],"currentEpoch") == 0) {
147 server.cluster->currentEpoch =
148 strtoull(argv[j+1],NULL,10);
149 } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
150 server.cluster->lastVoteEpoch =
151 strtoull(argv[j+1],NULL,10);
152 } else {
153 serverLog(LL_WARNING,
154 "Skipping unknown cluster config variable '%s'",
155 argv[j]);
156 }
157 }
158 sdsfreesplitres(argv,argc);
159 continue;
160 }
161
162 /* Regular config lines have at least eight fields */
163 if (argc < 8) {
164 sdsfreesplitres(argv,argc);
165 goto fmterr;
166 }
167
168 /* Create this node if it does not exist */
169 n = clusterLookupNode(argv[0]);
170 if (!n) {
171 n = createClusterNode(argv[0],0);
172 clusterAddNode(n);
173 }
174 /* Address and port */
175 if ((p = strrchr(argv[1],':')) == NULL) {
176 sdsfreesplitres(argv,argc);
177 goto fmterr;
178 }
179 *p = '\0';
180 memcpy(n->ip,argv[1],strlen(argv[1])+1);
181 char *port = p+1;
182 char *busp = strchr(port,'@');
183 if (busp) {
184 *busp = '\0';
185 busp++;
186 }
187 n->port = atoi(port);
188 /* In older versions of nodes.conf the "@busport" part is missing.
189 * In this case we set it to the default offset of 10000 from the
190 * base port. */
191 n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
192
193 /* Parse flags */
194 p = s = argv[2];
195 while(p) {
196 p = strchr(s,',');
197 if (p) *p = '\0';
198 if (!strcasecmp(s,"myself")) {
199 serverAssert(server.cluster->myself == NULL);
200 myself = server.cluster->myself = n;
201 n->flags |= CLUSTER_NODE_MYSELF;
202 } else if (!strcasecmp(s,"master")) {
203 n->flags |= CLUSTER_NODE_MASTER;
204 } else if (!strcasecmp(s,"slave")) {
205 n->flags |= CLUSTER_NODE_SLAVE;
206 } else if (!strcasecmp(s,"fail?")) {
207 n->flags |= CLUSTER_NODE_PFAIL;
208 } else if (!strcasecmp(s,"fail")) {
209 n->flags |= CLUSTER_NODE_FAIL;
210 n->fail_time = mstime();
211 } else if (!strcasecmp(s,"handshake")) {
212 n->flags |= CLUSTER_NODE_HANDSHAKE;
213 } else if (!strcasecmp(s,"noaddr")) {
214 n->flags |= CLUSTER_NODE_NOADDR;
215 } else if (!strcasecmp(s,"nofailover")) {
216 n->flags |= CLUSTER_NODE_NOFAILOVER;
217 } else if (!strcasecmp(s,"noflags")) {
218 /* nothing to do */
219 } else {
220 serverPanic("Unknown flag in redis cluster config file");
221 }
222 if (p) s = p+1;
223 }
224
225 /* Get master if any. Set the master and populate master's
226 * slave list. */
227 if (argv[3][0] != '-') {
228 master = clusterLookupNode(argv[3]);
229 if (!master) {
230 master = createClusterNode(argv[3],0);
231 clusterAddNode(master);
232 }
233 n->slaveof = master;
234 clusterNodeAddSlave(master,n);
235 }
236
237 /* Set ping sent / pong received timestamps */
238 if (atoi(argv[4])) n->ping_sent = mstime();
239 if (atoi(argv[5])) n->pong_received = mstime();
240
241 /* Set configEpoch for this node. */
242 n->configEpoch = strtoull(argv[6],NULL,10);
243
244 /* Populate hash slots served by this instance. */
245 for (j = 8; j < argc; j++) {
246 int start, stop;
247
248 if (argv[j][0] == '[') {
249 /* Here we handle migrating / importing slots */
250 int slot;
251 char direction;
252 clusterNode *cn;
253
254 p = strchr(argv[j],'-');
255 serverAssert(p != NULL);
256 *p = '\0';
257 direction = p[1]; /* Either '>' or '<' */
258 slot = atoi(argv[j]+1);
259 if (slot < 0 || slot >= CLUSTER_SLOTS) {
260 sdsfreesplitres(argv,argc);
261 goto fmterr;
262 }
263 p += 3;
264 cn = clusterLookupNode(p);
265 if (!cn) {
266 cn = createClusterNode(p,0);
267 clusterAddNode(cn);
268 }
269 if (direction == '>') {
270 server.cluster->migrating_slots_to[slot] = cn;
271 } else {
272 server.cluster->importing_slots_from[slot] = cn;
273 }
274 continue;
275 } else if ((p = strchr(argv[j],'-')) != NULL) {
276 *p = '\0';
277 start = atoi(argv[j]);
278 stop = atoi(p+1);
279 } else {
280 start = stop = atoi(argv[j]);
281 }
282 if (start < 0 || start >= CLUSTER_SLOTS ||
283 stop < 0 || stop >= CLUSTER_SLOTS)
284 {
285 sdsfreesplitres(argv,argc);
286 goto fmterr;
287 }
288 while(start <= stop) clusterAddSlot(n, start++);
289 }
290
291 sdsfreesplitres(argv,argc);
292 }
293 /* Config sanity check */
294 if (server.cluster->myself == NULL) goto fmterr;
295
296 zfree(line);
297 fclose(fp);
298
299 serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
300
301 /* Something that should never happen: currentEpoch smaller than
302 * the max epoch found in the nodes configuration. However we handle this
303 * as some form of protection against manual editing of critical files. */
304 if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
305 server.cluster->currentEpoch = clusterGetMaxEpoch();
306 }
307 return C_OK;
308
309 fmterr:
310 serverLog(LL_WARNING,
311 "Unrecoverable error: corrupted cluster config file.");
312 zfree(line);
313 if (fp) fclose(fp);
314 exit(1);
315 }
316
317 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
318 *
319 * This function writes the node config and returns 0, on error -1
320 * is returned.
321 *
322 * Note: we need to write the file in an atomic way from the point of view
323 * of the POSIX filesystem semantics, so that if the server is stopped
324 * or crashes during the write, we'll end with either the old file or the
325 * new one. Since we have the full payload to write available we can use
326 * a single write to write the whole file. If the pre-existing file was
327 * bigger we pad our payload with newlines that are anyway ignored and truncate
328 * the file afterward. */
clusterSaveConfig(int do_fsync)329 int clusterSaveConfig(int do_fsync) {
330 sds ci;
331 size_t content_size;
332 struct stat sb;
333 int fd;
334
335 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
336
337 /* Get the nodes description and concatenate our "vars" directive to
338 * save currentEpoch and lastVoteEpoch. */
339 ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
340 ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
341 (unsigned long long) server.cluster->currentEpoch,
342 (unsigned long long) server.cluster->lastVoteEpoch);
343 content_size = sdslen(ci);
344
345 if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
346 == -1) goto err;
347
348 /* Pad the new payload if the existing file length is greater. */
349 if (fstat(fd,&sb) != -1) {
350 if (sb.st_size > (off_t)content_size) {
351 ci = sdsgrowzero(ci,sb.st_size);
352 memset(ci+content_size,'\n',sb.st_size-content_size);
353 }
354 }
355 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
356 if (do_fsync) {
357 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
358 fsync(fd);
359 }
360
361 /* Truncate the file if needed to remove the final \n padding that
362 * is just garbage. */
363 if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
364 /* ftruncate() failing is not a critical error. */
365 }
366 close(fd);
367 sdsfree(ci);
368 return 0;
369
370 err:
371 if (fd != -1) close(fd);
372 sdsfree(ci);
373 return -1;
374 }
375
clusterSaveConfigOrDie(int do_fsync)376 void clusterSaveConfigOrDie(int do_fsync) {
377 if (clusterSaveConfig(do_fsync) == -1) {
378 serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
379 exit(1);
380 }
381 }
382
383 /* Lock the cluster config using flock(), and leaks the file descriptor used to
384 * acquire the lock so that the file will be locked forever.
385 *
386 * This works because we always update nodes.conf with a new version
387 * in-place, reopening the file, and writing to it in place (later adjusting
388 * the length with ftruncate()).
389 *
390 * On success C_OK is returned, otherwise an error is logged and
391 * the function returns C_ERR to signal a lock was not acquired. */
clusterLockConfig(char * filename)392 int clusterLockConfig(char *filename) {
393 /* flock() does not exist on Solaris
394 * and a fcntl-based solution won't help, as we constantly re-open that file,
395 * which will release _all_ locks anyway
396 */
397 #if !defined(__sun)
398 /* To lock it, we need to open the file in a way it is created if
399 * it does not exist, otherwise there is a race condition with other
400 * processes. */
401 int fd = open(filename,O_WRONLY|O_CREAT,0644);
402 if (fd == -1) {
403 serverLog(LL_WARNING,
404 "Can't open %s in order to acquire a lock: %s",
405 filename, strerror(errno));
406 return C_ERR;
407 }
408
409 if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
410 if (errno == EWOULDBLOCK) {
411 serverLog(LL_WARNING,
412 "Sorry, the cluster configuration file %s is already used "
413 "by a different Redis Cluster node. Please make sure that "
414 "different nodes use different cluster configuration "
415 "files.", filename);
416 } else {
417 serverLog(LL_WARNING,
418 "Impossible to lock %s: %s", filename, strerror(errno));
419 }
420 close(fd);
421 return C_ERR;
422 }
423 /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
424 * lock to the file as long as the process exists.
425 *
426 * After fork, the child process will get the fd opened by the parent process,
427 * we need save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),
428 * it will be closed in the child process.
429 * If it is not closed, when the main process is killed -9, but the child process
430 * (redis-aof-rewrite) is still alive, the fd(lock) will still be held by the
431 * child process, and the main process will fail to get lock, means fail to start. */
432 server.cluster_config_file_lock_fd = fd;
433 #endif /* __sun */
434
435 return C_OK;
436 }
437
438 /* Some flags (currently just the NOFAILOVER flag) may need to be updated
439 * in the "myself" node based on the current configuration of the node,
440 * that may change at runtime via CONFIG SET. This function changes the
441 * set of flags in myself->flags accordingly. */
clusterUpdateMyselfFlags(void)442 void clusterUpdateMyselfFlags(void) {
443 int oldflags = myself->flags;
444 int nofailover = server.cluster_slave_no_failover ?
445 CLUSTER_NODE_NOFAILOVER : 0;
446 myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
447 myself->flags |= nofailover;
448 if (myself->flags != oldflags) {
449 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
450 CLUSTER_TODO_UPDATE_STATE);
451 }
452 }
453
clusterInit(void)454 void clusterInit(void) {
455 int saveconf = 0;
456
457 server.cluster = zmalloc(sizeof(clusterState));
458 server.cluster->myself = NULL;
459 server.cluster->currentEpoch = 0;
460 server.cluster->state = CLUSTER_FAIL;
461 server.cluster->size = 1;
462 server.cluster->todo_before_sleep = 0;
463 server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
464 server.cluster->nodes_black_list =
465 dictCreate(&clusterNodesBlackListDictType,NULL);
466 server.cluster->failover_auth_time = 0;
467 server.cluster->failover_auth_count = 0;
468 server.cluster->failover_auth_rank = 0;
469 server.cluster->failover_auth_epoch = 0;
470 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
471 server.cluster->lastVoteEpoch = 0;
472 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
473 server.cluster->stats_bus_messages_sent[i] = 0;
474 server.cluster->stats_bus_messages_received[i] = 0;
475 }
476 server.cluster->stats_pfail_nodes = 0;
477 memset(server.cluster->slots,0, sizeof(server.cluster->slots));
478 clusterCloseAllSlots();
479
480 /* Lock the cluster config file to make sure every node uses
481 * its own nodes.conf. */
482 server.cluster_config_file_lock_fd = -1;
483 if (clusterLockConfig(server.cluster_configfile) == C_ERR)
484 exit(1);
485
486 /* Load or create a new nodes configuration. */
487 if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
488 /* No configuration found. We will just use the random name provided
489 * by the createClusterNode() function. */
490 myself = server.cluster->myself =
491 createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
492 serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
493 myself->name);
494 clusterAddNode(myself);
495 saveconf = 1;
496 }
497 if (saveconf) clusterSaveConfigOrDie(1);
498
499 /* We need a listening TCP port for our cluster messaging needs. */
500 server.cfd_count = 0;
501
502 /* Port sanity check II
503 * The other handshake port check is triggered too late to stop
504 * us from trying to use a too-high cluster port number. */
505 int port = server.tls_cluster ? server.tls_port : server.port;
506 if (port > (65535-CLUSTER_PORT_INCR)) {
507 serverLog(LL_WARNING, "Redis port number too high. "
508 "Cluster communication port is 10,000 port "
509 "numbers higher than your Redis port. "
510 "Your Redis port number must be "
511 "lower than 55535.");
512 exit(1);
513 }
514 if (listenToPort(port+CLUSTER_PORT_INCR,
515 server.cfd,&server.cfd_count) == C_ERR)
516 {
517 exit(1);
518 } else {
519 int j;
520
521 for (j = 0; j < server.cfd_count; j++) {
522 if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
523 clusterAcceptHandler, NULL) == AE_ERR)
524 serverPanic("Unrecoverable error creating Redis Cluster "
525 "file event.");
526 }
527 }
528
529 /* The slots -> keys map is a radix tree. Initialize it here. */
530 server.cluster->slots_to_keys = raxNew();
531 memset(server.cluster->slots_keys_count,0,
532 sizeof(server.cluster->slots_keys_count));
533
534 /* Set myself->port / cport to my listening ports, we'll just need to
535 * discover the IP address via MEET messages. */
536 myself->port = port;
537 myself->cport = port+CLUSTER_PORT_INCR;
538 if (server.cluster_announce_port)
539 myself->port = server.cluster_announce_port;
540 if (server.cluster_announce_bus_port)
541 myself->cport = server.cluster_announce_bus_port;
542
543 server.cluster->mf_end = 0;
544 resetManualFailover();
545 clusterUpdateMyselfFlags();
546 }
547
548 /* Reset a node performing a soft or hard reset:
549 *
550 * 1) All other nodes are forgotten.
551 * 2) All the assigned / open slots are released.
552 * 3) If the node is a slave, it turns into a master.
553 * 4) Only for hard reset: a new Node ID is generated.
554 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
555 * 6) The new configuration is saved and the cluster state updated.
556 * 7) If the node was a slave, the whole data set is flushed away. */
clusterReset(int hard)557 void clusterReset(int hard) {
558 dictIterator *di;
559 dictEntry *de;
560 int j;
561
562 /* Turn into master. */
563 if (nodeIsSlave(myself)) {
564 clusterSetNodeAsMaster(myself);
565 replicationUnsetMaster();
566 emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
567 }
568
569 /* Close slots, reset manual failover state. */
570 clusterCloseAllSlots();
571 resetManualFailover();
572
573 /* Unassign all the slots. */
574 for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);
575
576 /* Forget all the nodes, but myself. */
577 di = dictGetSafeIterator(server.cluster->nodes);
578 while((de = dictNext(di)) != NULL) {
579 clusterNode *node = dictGetVal(de);
580
581 if (node == myself) continue;
582 clusterDelNode(node);
583 }
584 dictReleaseIterator(di);
585
586 /* Hard reset only: set epochs to 0, change node ID. */
587 if (hard) {
588 sds oldname;
589
590 server.cluster->currentEpoch = 0;
591 server.cluster->lastVoteEpoch = 0;
592 myself->configEpoch = 0;
593 serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");
594
595 /* To change the Node ID we need to remove the old name from the
596 * nodes table, change the ID, and re-add back with new name. */
597 oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
598 dictDelete(server.cluster->nodes,oldname);
599 sdsfree(oldname);
600 getRandomHexChars(myself->name, CLUSTER_NAMELEN);
601 clusterAddNode(myself);
602 serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
603 }
604
605 /* Make sure to persist the new config and update the state. */
606 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
607 CLUSTER_TODO_UPDATE_STATE|
608 CLUSTER_TODO_FSYNC_CONFIG);
609 }
610
611 /* -----------------------------------------------------------------------------
612 * CLUSTER communication link
613 * -------------------------------------------------------------------------- */
614
createClusterLink(clusterNode * node)615 clusterLink *createClusterLink(clusterNode *node) {
616 clusterLink *link = zmalloc(sizeof(*link));
617 link->ctime = mstime();
618 link->sndbuf = sdsempty();
619 link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
620 link->rcvbuf_len = 0;
621 link->node = node;
622 link->conn = NULL;
623 return link;
624 }
625
626 /* Free a cluster link, but does not free the associated node of course.
627 * This function will just make sure that the original node associated
628 * with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
    /* Close the connection first, if there is one. */
    if (link->conn != NULL) {
        connClose(link->conn);
        link->conn = NULL;
    }

    /* Release both buffers. */
    sdsfree(link->sndbuf);
    zfree(link->rcvbuf);

    /* The node itself is not freed: it just stops referencing a link. */
    clusterNode *owner = link->node;
    if (owner != NULL) owner->link = NULL;

    zfree(link);
}
640
static void clusterConnAcceptHandler(connection *conn) {
    if (connGetState(conn) == CONN_STATE_CONNECTED) {
        /* Create a link object we use to handle the connection.
         * It gets passed to the readable handler when data is available.
         * Initially the link->node pointer is set to NULL as we don't know
         * which node it is, but the right node is referenced once we know
         * the node identity. */
        clusterLink *accepted = createClusterLink(NULL);

        accepted->conn = conn;
        connSetPrivateData(conn, accepted);

        /* Register read handler */
        connSetReadHandler(conn, clusterReadHandler);
        return;
    }

    /* The connection did not reach the connected state: log and drop it. */
    serverLog(LL_VERBOSE,
            "Error accepting cluster node connection: %s", connGetLastError(conn));
    connClose(conn);
}
663
664 #define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
clusterAcceptHandler(aeEventLoop * el,int fd,void * privdata,int mask)665 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
666 int cport, cfd;
667 int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
668 char cip[NET_IP_STR_LEN];
669 UNUSED(el);
670 UNUSED(mask);
671 UNUSED(privdata);
672
673 /* If the server is starting up, don't accept cluster connections:
674 * UPDATE messages may interact with the database content. */
675 if (server.masterhost == NULL && server.loading) return;
676
677 while(max--) {
678 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
679 if (cfd == ANET_ERR) {
680 if (errno != EWOULDBLOCK)
681 serverLog(LL_VERBOSE,
682 "Error accepting cluster node: %s", server.neterr);
683 return;
684 }
685
686 connection *conn = server.tls_cluster ?
687 connCreateAcceptedTLS(cfd, TLS_CLIENT_AUTH_YES) : connCreateAcceptedSocket(cfd);
688
689 /* Make sure connection is not in an error state */
690 if (connGetState(conn) != CONN_STATE_ACCEPTING) {
691 serverLog(LL_VERBOSE,
692 "Error creating an accepting connection for cluster node: %s",
693 connGetLastError(conn));
694 connClose(conn);
695 return;
696 }
697 connNonBlock(conn);
698 connEnableTcpNoDelay(conn);
699 connKeepAlive(conn,server.cluster_node_timeout * 2);
700
701 /* Use non-blocking I/O for cluster messages. */
702 serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);
703
704 /* Accept the connection now. connAccept() may call our handler directly
705 * or schedule it for later depending on connection implementation.
706 */
707 if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) {
708 if (connGetState(conn) == CONN_STATE_ERROR)
709 serverLog(LL_VERBOSE,
710 "Error accepting cluster node connection: %s",
711 connGetLastError(conn));
712 connClose(conn);
713 return;
714 }
715 }
716 }
717
718 /* Return the approximated number of sockets we are using in order to
719 * take the cluster bus connections. */
getClusterConnectionsCount(void)720 unsigned long getClusterConnectionsCount(void) {
721 /* We decrement the number of nodes by one, since there is the
722 * "myself" node too in the list. Each node uses two file descriptors,
723 * one incoming and one outgoing, thus the multiplication by 2. */
724 return server.cluster_enabled ?
725 ((dictSize(server.cluster->nodes)-1)*2) : 0;
726 }
727
728 /* -----------------------------------------------------------------------------
729 * Key space handling
730 * -------------------------------------------------------------------------- */
731
732 /* We have 16384 hash slots. The hash slot of a given key is obtained
733 * as the least significant 14 bits of the crc16 of the key.
734 *
735 * However if the key contains the {...} pattern, only the part between
736 * { and } is hashed. This may be useful in the future to force certain
737 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int lbrace = 0; /* index of the first '{', or keylen if none */
    int rbrace;     /* index of the first '}' after it, or keylen if none */

    while (lbrace < keylen && key[lbrace] != '{') lbrace++;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (lbrace == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Look for the corresponding '}' on its right. */
    rbrace = lbrace+1;
    while (rbrace < keylen && key[rbrace] != '}') rbrace++;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (rbrace == keylen || rbrace == lbrace+1)
        return crc16(key,keylen) & 0x3FFF;

    /* Both braces are present with something in between: hash only the
     * bytes strictly between them. */
    return crc16(key+lbrace+1,rbrace-lbrace-1) & 0x3FFF;
}
758
759 /* -----------------------------------------------------------------------------
760 * CLUSTER node API
761 * -------------------------------------------------------------------------- */
762
763 /* Create a new cluster node, with the specified flags.
764 * If "nodename" is NULL this is considered a first handshake and a random
765 * node name is assigned to this node (it will be fixed later when we'll
766 * receive the first pong).
767 *
768 * The node is created and returned to the user, but it is not automatically
769 * added to the nodes hash table. */
createClusterNode(char * nodename,int flags)770 clusterNode *createClusterNode(char *nodename, int flags) {
771 clusterNode *node = zmalloc(sizeof(*node));
772
773 if (nodename)
774 memcpy(node->name, nodename, CLUSTER_NAMELEN);
775 else
776 getRandomHexChars(node->name, CLUSTER_NAMELEN);
777 node->ctime = mstime();
778 node->configEpoch = 0;
779 node->flags = flags;
780 memset(node->slots,0,sizeof(node->slots));
781 node->numslots = 0;
782 node->numslaves = 0;
783 node->slaves = NULL;
784 node->slaveof = NULL;
785 node->ping_sent = node->pong_received = 0;
786 node->data_received = 0;
787 node->fail_time = 0;
788 node->link = NULL;
789 memset(node->ip,0,sizeof(node->ip));
790 node->port = 0;
791 node->cport = 0;
792 node->fail_reports = listCreate();
793 node->voted_time = 0;
794 node->orphaned_time = 0;
795 node->repl_offset_time = 0;
796 node->repl_offset = 0;
797 listSetFreeMethod(node->fail_reports,zfree);
798 return node;
799 }
800
801 /* This function is called every time we get a failure report from a node.
802 * The side effect is to populate the fail_reports list (or to update
803 * the timestamp of an existing report).
804 *
805 * 'failing' is the node that is in failure state according to the
806 * 'sender' node.
807 *
808 * The function returns 0 if it just updates a timestamp of an existing
809 * failure report from the same sender. 1 is returned if a new failure
810 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *reports = failing->fail_reports;
    listIter iter;
    listNode *cursor;

    /* A report from this sender may already be there: if so only its
     * timestamp is refreshed and 0 is returned. */
    listRewind(reports,&iter);
    for (cursor = listNext(&iter); cursor != NULL; cursor = listNext(&iter)) {
        clusterNodeFailReport *known = cursor->value;

        if (known->node != sender) continue;
        known->time = mstime();
        return 0;
    }

    /* First report from this sender: append it and return 1. */
    clusterNodeFailReport *added = zmalloc(sizeof(*added));
    added->node = sender;
    added->time = mstime();
    listAddNodeTail(reports,added);
    return 1;
}
835
836 /* Remove failure reports that are too old, where too old means reasonably
837 * older than the global node timeout. Note that anyway for a node to be
838 * flagged as FAIL we need to have a local PFAIL state that is at least
839 * older than the global node timeout, so we don't just trust the number
840 * of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *reports = node->fail_reports;
    listIter iter;
    listNode *cursor;
    /* Reports older than this many milliseconds are dropped. */
    mstime_t validity = server.cluster_node_timeout *
                        CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(reports,&iter);
    while ((cursor = listNext(&iter)) != NULL) {
        clusterNodeFailReport *report = cursor->value;
        mstime_t age = now - report->time;

        if (age <= validity) continue;
        listDelNode(reports,cursor);
    }
}
856
857 /* Remove the failing report for 'node' if it was previously considered
858 * failing by 'sender'. This function is called when a node informs us via
859 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
860 *
861 * Note that this function is called relatively often as it gets called even
862 * when there are no nodes failing, and is O(N), however when the cluster is
863 * fine the failure reports list is empty so the function runs in constant
864 * time.
865 *
866 * The function returns 1 if the failure report was found and removed.
867 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *reports = node->fail_reports;
    listIter iter;
    listNode *cursor;

    /* Search for a failure report from this sender. */
    listRewind(reports,&iter);
    while ((cursor = listNext(&iter)) != NULL) {
        clusterNodeFailReport *report = cursor->value;

        if (report->node != sender) continue;

        /* Found: remove it, then also drop reports that are too old. */
        listDelNode(reports,cursor);
        clusterNodeCleanupFailureReports(node);
        return 1;
    }

    /* No failure report from this sender. */
    return 0;
}
887
888 /* Return the number of external nodes that believe 'node' is failing,
889 * not including this node, that may have a PFAIL or FAIL state for this
890 * node as well. */
clusterNodeFailureReportsCount(clusterNode * node)891 int clusterNodeFailureReportsCount(clusterNode *node) {
892 clusterNodeCleanupFailureReports(node);
893 return listLength(node->fail_reports);
894 }
895
/* Remove 'slave' from the slaves array of 'master'.
 *
 * The entries following the removed one are shifted down by one position,
 * and when the master is left without slaves its CLUSTER_NODE_MIGRATE_TO
 * flag is cleared.
 *
 * Returns C_OK if the slave was found and removed, C_ERR if 'slave' is not
 * listed among the slaves of 'master'. */
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int idx = 0;
    int tail;

    /* Locate the slave inside the array. */
    while (idx < master->numslaves && master->slaves[idx] != slave) idx++;
    if (idx >= master->numslaves) return C_ERR;

    /* Close the gap, unless the slave was the last element. */
    tail = (master->numslaves - idx) - 1;
    if (tail > 0) {
        memmove(master->slaves+idx,master->slaves+(idx+1),
                (sizeof(*master->slaves) * tail));
    }

    master->numslaves--;
    if (master->numslaves == 0)
        master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
914
/* Append 'slave' to the slaves array of 'master', growing the array by one
 * element, and set the CLUSTER_NODE_MIGRATE_TO flag on the master.
 *
 * Returns C_OK on success, or C_ERR (without touching the array) if
 * 'slave' is already listed among the slaves of 'master'. */
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int idx;

    /* Refuse duplicates. */
    for (idx = 0; idx < master->numslaves; idx++) {
        if (master->slaves[idx] == slave) return C_ERR;
    }

    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves++] = slave;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
928
/* Return the number of slaves of 'n' for which nodeFailed() is false. */
int clusterCountNonFailingSlaves(clusterNode *n) {
    int healthy = 0;
    int idx;

    for (idx = 0; idx < n->numslaves; idx++) {
        if (nodeFailed(n->slaves[idx])) continue;
        healthy++;
    }
    return healthy;
}
936
937 /* Low level cleanup of the node structure. Only called by clusterDelNode(). */
freeClusterNode(clusterNode * n)938 void freeClusterNode(clusterNode *n) {
939 sds nodename;
940 int j;
941
942 /* If the node has associated slaves, we have to set
943 * all the slaves->slaveof fields to NULL (unknown). */
944 for (j = 0; j < n->numslaves; j++)
945 n->slaves[j]->slaveof = NULL;
946
947 /* Remove this node from the list of slaves of its master. */
948 if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
949
950 /* Unlink from the set of nodes. */
951 nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
952 serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
953 sdsfree(nodename);
954
955 /* Release link and associated data structures. */
956 if (n->link) freeClusterLink(n->link);
957 listRelease(n->fail_reports);
958 zfree(n->slaves);
959 zfree(n);
960 }
961
962 /* Add a node to the nodes hash table */
clusterAddNode(clusterNode * node)963 int clusterAddNode(clusterNode *node) {
964 int retval;
965
966 retval = dictAdd(server.cluster->nodes,
967 sdsnewlen(node->name,CLUSTER_NAMELEN), node);
968 return (retval == DICT_OK) ? C_OK : C_ERR;
969 }
970
971 /* Remove a node from the cluster. The function performs the high level
972 * cleanup, calling freeClusterNode() for the low level cleanup.
973 * Here we do the following:
974 *
975 * 1) Mark all the slots handled by it as unassigned.
976 * 2) Remove all the failure reports sent by this node and referenced by
977 * other nodes.
978 * 3) Free the node with freeClusterNode() that will in turn remove it
979 * from the hash table and from the list of slaves of its master, if
980 * it is a slave node.
981 */
clusterDelNode(clusterNode * delnode)982 void clusterDelNode(clusterNode *delnode) {
983 int j;
984 dictIterator *di;
985 dictEntry *de;
986
987 /* 1) Mark slots as unassigned. */
988 for (j = 0; j < CLUSTER_SLOTS; j++) {
989 if (server.cluster->importing_slots_from[j] == delnode)
990 server.cluster->importing_slots_from[j] = NULL;
991 if (server.cluster->migrating_slots_to[j] == delnode)
992 server.cluster->migrating_slots_to[j] = NULL;
993 if (server.cluster->slots[j] == delnode)
994 clusterDelSlot(j);
995 }
996
997 /* 2) Remove failure reports. */
998 di = dictGetSafeIterator(server.cluster->nodes);
999 while((de = dictNext(di)) != NULL) {
1000 clusterNode *node = dictGetVal(de);
1001
1002 if (node == delnode) continue;
1003 clusterNodeDelFailureReport(node,delnode);
1004 }
1005 dictReleaseIterator(di);
1006
1007 /* 3) Free the node, unlinking it from the cluster. */
1008 freeClusterNode(delnode);
1009 }
1010
1011 /* Node lookup by name */
clusterLookupNode(const char * name)1012 clusterNode *clusterLookupNode(const char *name) {
1013 sds s = sdsnewlen(name, CLUSTER_NAMELEN);
1014 dictEntry *de;
1015
1016 de = dictFind(server.cluster->nodes,s);
1017 sdsfree(s);
1018 if (de == NULL) return NULL;
1019 return dictGetVal(de);
1020 }
1021
1022 /* This is only used after the handshake. When we connect a given IP/PORT
1023 * as a result of CLUSTER MEET we don't have the node name yet, so we
1024 * pick a random one, and will fix it when we receive the PONG request using
1025 * this function. */
clusterRenameNode(clusterNode * node,char * newname)1026 void clusterRenameNode(clusterNode *node, char *newname) {
1027 int retval;
1028 sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);
1029
1030 serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
1031 node->name, newname);
1032 retval = dictDelete(server.cluster->nodes, s);
1033 sdsfree(s);
1034 serverAssert(retval == DICT_OK);
1035 memcpy(node->name, newname, CLUSTER_NAMELEN);
1036 clusterAddNode(node);
1037 }
1038
1039 /* -----------------------------------------------------------------------------
1040 * CLUSTER config epoch handling
1041 * -------------------------------------------------------------------------- */
1042
1043 /* Return the greatest configEpoch found in the cluster, or the current
1044 * epoch if greater than any node configEpoch. */
clusterGetMaxEpoch(void)1045 uint64_t clusterGetMaxEpoch(void) {
1046 uint64_t max = 0;
1047 dictIterator *di;
1048 dictEntry *de;
1049
1050 di = dictGetSafeIterator(server.cluster->nodes);
1051 while((de = dictNext(di)) != NULL) {
1052 clusterNode *node = dictGetVal(de);
1053 if (node->configEpoch > max) max = node->configEpoch;
1054 }
1055 dictReleaseIterator(di);
1056 if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
1057 return max;
1058 }
1059
1060 /* If this node epoch is zero or is not already the greatest across the
1061 * cluster (from the POV of the local configuration), this function will:
1062 *
1063 * 1) Generate a new config epoch, incrementing the current epoch.
1064 * 2) Assign the new epoch to this node, WITHOUT any consensus.
1065 * 3) Persist the configuration on disk before sending packets with the
1066 * new configuration.
1067 *
1068 * If the new config epoch is generated and assigned, C_OK is returned,
1069 * otherwise C_ERR is returned (since the node has already the greatest
1070 * configuration around) and no operation is performed.
1071 *
1072 * Important note: this function violates the principle that config epochs
1073 * should be generated with consensus and should be unique across the cluster.
1074 * However Redis Cluster uses this auto-generated new config epochs in two
1075 * cases:
1076 *
1077 * 1) When slots are closed after importing. Otherwise resharding would be
1078 * too expensive.
1079 * 2) When CLUSTER FAILOVER is called with options that force a slave to
1080 * failover its master even if there is not master majority able to
1081 * create a new configuration epoch.
1082 *
1083 * Redis Cluster will not explode using this function, even in the case of
1084 * a collision between this node and another node, generating the same
1085 * configuration epoch unilaterally, because the config epoch conflict
1086 * resolution algorithm will eventually move colliding nodes to different
1087 * config epochs. However using this function may violate the "last failover
1088 * wins" rule, so should only be used with care. */
clusterBumpConfigEpochWithoutConsensus(void)1089 int clusterBumpConfigEpochWithoutConsensus(void) {
1090 uint64_t maxEpoch = clusterGetMaxEpoch();
1091
1092 if (myself->configEpoch == 0 ||
1093 myself->configEpoch != maxEpoch)
1094 {
1095 server.cluster->currentEpoch++;
1096 myself->configEpoch = server.cluster->currentEpoch;
1097 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1098 CLUSTER_TODO_FSYNC_CONFIG);
1099 serverLog(LL_WARNING,
1100 "New configEpoch set to %llu",
1101 (unsigned long long) myself->configEpoch);
1102 return C_OK;
1103 } else {
1104 return C_ERR;
1105 }
1106 }
1107
1108 /* This function is called when this node is a master, and we receive from
1109 * another master a configuration epoch that is equal to our configuration
1110 * epoch.
1111 *
1112 * BACKGROUND
1113 *
1114 * It is not possible that different slaves get the same config
1115 * epoch during a failover election, because the slaves need to get voted
1116 * by a majority. However when we perform a manual resharding of the cluster
1117 * the node will assign a configuration epoch to itself without to ask
1118 * for agreement. Usually resharding happens when the cluster is working well
1119 * and is supervised by the sysadmin, however it is possible for a failover
1120 * to happen exactly while the node we are resharding a slot to assigns itself
1121 * a new configuration epoch, but before it is able to propagate it.
1122 *
1123 * So technically it is possible in this condition that two nodes end with
1124 * the same configuration epoch.
1125 *
1126 * Another possibility is that there are bugs in the implementation causing
1127 * this to happen.
1128 *
1129 * Moreover when a new cluster is created, all the nodes start with the same
1130 * configEpoch. This collision resolution code allows nodes to automatically
1131 * end with a different configEpoch at startup automatically.
1132 *
1133 * In all the cases, we want a mechanism that resolves this issue automatically
1134 * as a safeguard. The same configuration epoch for masters serving different
1135 * set of slots is not harmful, but it is if the nodes end serving the same
1136 * slots for some reason (manual errors or software bugs) without a proper
1137 * failover procedure.
1138 *
1139 * In general we want a system that eventually always ends with different
1140 * masters having different configuration epochs whatever happened, since
1141 * nothing is worse than a split-brain condition in a distributed system.
1142 *
1143 * BEHAVIOR
1144 *
1145 * When this function gets called, what happens is that if this node
1146 * has the lexicographically smaller Node ID compared to the other node
1147 * with the conflicting epoch (the 'sender' node), it will assign itself
1148 * the greatest configuration epoch currently detected among nodes plus 1.
1149 *
1150 * This means that even if there are multiple nodes colliding, the node
1151 * with the greatest Node ID never moves forward, so eventually all the nodes
1152 * end with a different configuration epoch.
1153 */
clusterHandleConfigEpochCollision(clusterNode * sender)1154 void clusterHandleConfigEpochCollision(clusterNode *sender) {
1155 /* Prerequisites: nodes have the same configEpoch and are both masters. */
1156 if (sender->configEpoch != myself->configEpoch ||
1157 !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
1158 /* Don't act if the colliding node has a smaller Node ID. */
1159 if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
1160 /* Get the next ID available at the best of this node knowledge. */
1161 server.cluster->currentEpoch++;
1162 myself->configEpoch = server.cluster->currentEpoch;
1163 clusterSaveConfigOrDie(1);
1164 serverLog(LL_VERBOSE,
1165 "WARNING: configEpoch collision with node %.40s."
1166 " configEpoch set to %llu",
1167 sender->name,
1168 (unsigned long long) myself->configEpoch);
1169 }
1170
1171 /* -----------------------------------------------------------------------------
1172 * CLUSTER nodes blacklist
1173 *
 * The nodes blacklist is just a way to ensure that a given node with a given
 * Node ID is not re-added before some time has elapsed (this time is
 * specified in seconds in CLUSTER_BLACKLIST_TTL).
1177 *
1178 * This is useful when we want to remove a node from the cluster completely:
1179 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
1180 * that even if we receive gossip messages from other nodes that still remember
1181 * about the node we want to remove, we don't re-add it before some time.
1182 *
1183 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
1184 * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
1185 * in the cluster without dealing with the problem of other nodes re-adding
1186 * back the node to nodes we already sent the FORGET command to.
1187 *
1188 * The data structure used is a hash table with an sds string representing
1189 * the node ID as key, and the time when it is ok to re-add the node as
1190 * value.
1191 * -------------------------------------------------------------------------- */
1192
#define CLUSTER_BLACKLIST_TTL 60 /* Blacklist entry lifetime in seconds (1 minute). */
1194
1195
1196 /* Before of the addNode() or Exists() operations we always remove expired
1197 * entries from the black list. This is an O(N) operation but it is not a
1198 * problem since add / exists operations are called very infrequently and
1199 * the hash table is supposed to contain very little elements at max.
1200 * However without the cleanup during long uptime and with some automated
1201 * node add/removal procedures, entries could accumulate. */
clusterBlacklistCleanup(void)1202 void clusterBlacklistCleanup(void) {
1203 dictIterator *di;
1204 dictEntry *de;
1205
1206 di = dictGetSafeIterator(server.cluster->nodes_black_list);
1207 while((de = dictNext(di)) != NULL) {
1208 int64_t expire = dictGetUnsignedIntegerVal(de);
1209
1210 if (expire < server.unixtime)
1211 dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1212 }
1213 dictReleaseIterator(di);
1214 }
1215
1216 /* Cleanup the blacklist and add a new node ID to the black list. */
clusterBlacklistAddNode(clusterNode * node)1217 void clusterBlacklistAddNode(clusterNode *node) {
1218 dictEntry *de;
1219 sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1220
1221 clusterBlacklistCleanup();
1222 if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
1223 /* If the key was added, duplicate the sds string representation of
1224 * the key for the next lookup. We'll free it at the end. */
1225 id = sdsdup(id);
1226 }
1227 de = dictFind(server.cluster->nodes_black_list,id);
1228 dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
1229 sdsfree(id);
1230 }
1231
1232 /* Return non-zero if the specified node ID exists in the blacklist.
1233 * You don't need to pass an sds string here, any pointer to 40 bytes
1234 * will work. */
clusterBlacklistExists(char * nodeid)1235 int clusterBlacklistExists(char *nodeid) {
1236 sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
1237 int retval;
1238
1239 clusterBlacklistCleanup();
1240 retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
1241 sdsfree(id);
1242 return retval;
1243 }
1244
1245 /* -----------------------------------------------------------------------------
1246 * CLUSTER messages exchange - PING/PONG and gossip
1247 * -------------------------------------------------------------------------- */
1248
1249 /* This function checks if a given node should be marked as FAIL.
1250 * It happens if the following conditions are met:
1251 *
1252 * 1) We received enough failure reports from other master nodes via gossip.
1253 * Enough means that the majority of the masters signaled the node is
1254 * down recently.
1255 * 2) We believe this node is in PFAIL state.
1256 *
1257 * If a failure is detected we also inform the whole cluster about this
1258 * event trying to force every other node to set the FAIL flag for the node.
1259 *
1260 * Note that the form of agreement used here is weak, as we collect the majority
1261 * of masters state during some time, and even if we force agreement by
1262 * propagating the FAIL message, because of partitions we may not reach every
1263 * node. However:
1264 *
1265 * 1) Either we reach the majority and eventually the FAIL state will propagate
1266 * to all the cluster.
1267 * 2) Or there is no majority so no slave promotion will be authorized and the
1268 * FAIL flag will be cleared after some time.
1269 */
markNodeAsFailingIfNeeded(clusterNode * node)1270 void markNodeAsFailingIfNeeded(clusterNode *node) {
1271 int failures;
1272 int needed_quorum = (server.cluster->size / 2) + 1;
1273
1274 if (!nodeTimedOut(node)) return; /* We can reach it. */
1275 if (nodeFailed(node)) return; /* Already FAILing. */
1276
1277 failures = clusterNodeFailureReportsCount(node);
1278 /* Also count myself as a voter if I'm a master. */
1279 if (nodeIsMaster(myself)) failures++;
1280 if (failures < needed_quorum) return; /* No weak agreement from masters. */
1281
1282 serverLog(LL_NOTICE,
1283 "Marking node %.40s as failing (quorum reached).", node->name);
1284
1285 /* Mark the node as failing. */
1286 node->flags &= ~CLUSTER_NODE_PFAIL;
1287 node->flags |= CLUSTER_NODE_FAIL;
1288 node->fail_time = mstime();
1289
1290 /* Broadcast the failing node name to everybody, forcing all the other
1291 * reachable nodes to flag the node as FAIL.
1292 * We do that even if this node is a replica and not a master: anyway
1293 * the failing state is triggered collecting failure reports from masters,
1294 * so here the replica is only helping propagating this status. */
1295 clusterSendFail(node->name);
1296 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1297 }
1298
1299 /* This function is called only if a node is marked as FAIL, but we are able
1300 * to reach it again. It checks if there are the conditions to undo the FAIL
1301 * state. */
clearNodeFailureIfNeeded(clusterNode * node)1302 void clearNodeFailureIfNeeded(clusterNode *node) {
1303 mstime_t now = mstime();
1304
1305 serverAssert(nodeFailed(node));
1306
1307 /* For slaves we always clear the FAIL flag if we can contact the
1308 * node again. */
1309 if (nodeIsSlave(node) || node->numslots == 0) {
1310 serverLog(LL_NOTICE,
1311 "Clear FAIL state for node %.40s: %s is reachable again.",
1312 node->name,
1313 nodeIsSlave(node) ? "replica" : "master without slots");
1314 node->flags &= ~CLUSTER_NODE_FAIL;
1315 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1316 }
1317
1318 /* If it is a master and...
1319 * 1) The FAIL state is old enough.
1320 * 2) It is yet serving slots from our point of view (not failed over).
1321 * Apparently no one is going to fix these slots, clear the FAIL flag. */
1322 if (nodeIsMaster(node) && node->numslots > 0 &&
1323 (now - node->fail_time) >
1324 (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1325 {
1326 serverLog(LL_NOTICE,
1327 "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
1328 node->name);
1329 node->flags &= ~CLUSTER_NODE_FAIL;
1330 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1331 }
1332 }
1333
1334 /* Return true if we already have a node in HANDSHAKE state matching the
1335 * specified ip address and port number. This function is used in order to
1336 * avoid adding a new handshake node for the same address multiple times. */
clusterHandshakeInProgress(char * ip,int port,int cport)1337 int clusterHandshakeInProgress(char *ip, int port, int cport) {
1338 dictIterator *di;
1339 dictEntry *de;
1340
1341 di = dictGetSafeIterator(server.cluster->nodes);
1342 while((de = dictNext(di)) != NULL) {
1343 clusterNode *node = dictGetVal(de);
1344
1345 if (!nodeInHandshake(node)) continue;
1346 if (!strcasecmp(node->ip,ip) &&
1347 node->port == port &&
1348 node->cport == cport) break;
1349 }
1350 dictReleaseIterator(di);
1351 return de != NULL;
1352 }
1353
1354 /* Start a handshake with the specified address if there is not one
1355 * already in progress. Returns non-zero if the handshake was actually
1356 * started. On error zero is returned and errno is set to one of the
1357 * following values:
1358 *
1359 * EAGAIN - There is already a handshake in progress for this address.
1360 * EINVAL - IP or port are not valid. */
clusterStartHandshake(char * ip,int port,int cport)1361 int clusterStartHandshake(char *ip, int port, int cport) {
1362 clusterNode *n;
1363 char norm_ip[NET_IP_STR_LEN];
1364 struct sockaddr_storage sa;
1365
1366 /* IP sanity check */
1367 if (inet_pton(AF_INET,ip,
1368 &(((struct sockaddr_in *)&sa)->sin_addr)))
1369 {
1370 sa.ss_family = AF_INET;
1371 } else if (inet_pton(AF_INET6,ip,
1372 &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
1373 {
1374 sa.ss_family = AF_INET6;
1375 } else {
1376 errno = EINVAL;
1377 return 0;
1378 }
1379
1380 /* Port sanity check */
1381 if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
1382 errno = EINVAL;
1383 return 0;
1384 }
1385
1386 /* Set norm_ip as the normalized string representation of the node
1387 * IP address. */
1388 memset(norm_ip,0,NET_IP_STR_LEN);
1389 if (sa.ss_family == AF_INET)
1390 inet_ntop(AF_INET,
1391 (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
1392 norm_ip,NET_IP_STR_LEN);
1393 else
1394 inet_ntop(AF_INET6,
1395 (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
1396 norm_ip,NET_IP_STR_LEN);
1397
1398 if (clusterHandshakeInProgress(norm_ip,port,cport)) {
1399 errno = EAGAIN;
1400 return 0;
1401 }
1402
1403 /* Add the node with a random address (NULL as first argument to
1404 * createClusterNode()). Everything will be fixed during the
1405 * handshake. */
1406 n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
1407 memcpy(n->ip,norm_ip,sizeof(n->ip));
1408 n->port = port;
1409 n->cport = cport;
1410 clusterAddNode(n);
1411 return 1;
1412 }
1413
1414 /* Process the gossip section of PING or PONG packets.
1415 * Note that this function assumes that the packet is already sanity-checked
1416 * by the caller, not in the content of the gossip section, but in the
1417 * length. */
clusterProcessGossipSection(clusterMsg * hdr,clusterLink * link)1418 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
1419 uint16_t count = ntohs(hdr->count);
1420 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
1421 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
1422
1423 while(count--) {
1424 uint16_t flags = ntohs(g->flags);
1425 clusterNode *node;
1426 sds ci;
1427
1428 if (server.verbosity == LL_DEBUG) {
1429 ci = representClusterNodeFlags(sdsempty(), flags);
1430 serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
1431 g->nodename,
1432 g->ip,
1433 ntohs(g->port),
1434 ntohs(g->cport),
1435 ci);
1436 sdsfree(ci);
1437 }
1438
1439 /* Update our state accordingly to the gossip sections */
1440 node = clusterLookupNode(g->nodename);
1441 if (node) {
1442 /* We already know this node.
1443 Handle failure reports, only when the sender is a master. */
1444 if (sender && nodeIsMaster(sender) && node != myself) {
1445 if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
1446 if (clusterNodeAddFailureReport(node,sender)) {
1447 serverLog(LL_VERBOSE,
1448 "Node %.40s reported node %.40s as not reachable.",
1449 sender->name, node->name);
1450 }
1451 markNodeAsFailingIfNeeded(node);
1452 } else {
1453 if (clusterNodeDelFailureReport(node,sender)) {
1454 serverLog(LL_VERBOSE,
1455 "Node %.40s reported node %.40s is back online.",
1456 sender->name, node->name);
1457 }
1458 }
1459 }
1460
1461 /* If from our POV the node is up (no failure flags are set),
1462 * we have no pending ping for the node, nor we have failure
1463 * reports for this node, update the last pong time with the
1464 * one we see from the other nodes. */
1465 if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1466 node->ping_sent == 0 &&
1467 clusterNodeFailureReportsCount(node) == 0)
1468 {
1469 mstime_t pongtime = ntohl(g->pong_received);
1470 pongtime *= 1000; /* Convert back to milliseconds. */
1471
1472 /* Replace the pong time with the received one only if
1473 * it's greater than our view but is not in the future
1474 * (with 500 milliseconds tolerance) from the POV of our
1475 * clock. */
1476 if (pongtime <= (server.mstime+500) &&
1477 pongtime > node->pong_received)
1478 {
1479 node->pong_received = pongtime;
1480 }
1481 }
1482
1483 /* If we already know this node, but it is not reachable, and
1484 * we see a different address in the gossip section of a node that
1485 * can talk with this other node, update the address, disconnect
1486 * the old link if any, so that we'll attempt to connect with the
1487 * new address. */
1488 if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
1489 !(flags & CLUSTER_NODE_NOADDR) &&
1490 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1491 (strcasecmp(node->ip,g->ip) ||
1492 node->port != ntohs(g->port) ||
1493 node->cport != ntohs(g->cport)))
1494 {
1495 if (node->link) freeClusterLink(node->link);
1496 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1497 node->port = ntohs(g->port);
1498 node->cport = ntohs(g->cport);
1499 node->flags &= ~CLUSTER_NODE_NOADDR;
1500 }
1501 } else {
1502 /* If it's not in NOADDR state and we don't have it, we
1503 * add it to our trusted dict with exact nodeid and flag.
1504 * Note that we cannot simply start a handshake against
1505 * this IP/PORT pairs, since IP/PORT can be reused already,
1506 * otherwise we risk joining another cluster.
1507 *
1508 * Note that we require that the sender of this gossip message
1509 * is a well known node in our cluster, otherwise we risk
1510 * joining another cluster. */
1511 if (sender &&
1512 !(flags & CLUSTER_NODE_NOADDR) &&
1513 !clusterBlacklistExists(g->nodename))
1514 {
1515 clusterNode *node;
1516 node = createClusterNode(g->nodename, flags);
1517 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1518 node->port = ntohs(g->port);
1519 node->cport = ntohs(g->cport);
1520 clusterAddNode(node);
1521 }
1522 }
1523
1524 /* Next node */
1525 g++;
1526 }
1527 }
1528
1529 /* IP -> string conversion. 'buf' is supposed to at least be 46 bytes.
1530 * If 'announced_ip' length is non-zero, it is used instead of extracting
1531 * the IP from the socket peer address. */
nodeIp2String(char * buf,clusterLink * link,char * announced_ip)1532 void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
1533 if (announced_ip[0] != '\0') {
1534 memcpy(buf,announced_ip,NET_IP_STR_LEN);
1535 buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
1536 } else {
1537 connPeerToString(link->conn, buf, NET_IP_STR_LEN, NULL);
1538 }
1539 }
1540
1541 /* Update the node address to the IP address that can be extracted
1542 * from link->fd, or if hdr->myip is non empty, to the address the node
1543 * is announcing us. The port is taken from the packet header as well.
1544 *
1545 * If the address or port changed, disconnect the node link so that we'll
1546 * connect again to the new address.
1547 *
1548 * If the ip/port pair are already correct no operation is performed at
1549 * all.
1550 *
1551 * The function returns 0 if the node address is still the same,
1552 * otherwise 1 is returned. */
nodeUpdateAddressIfNeeded(clusterNode * node,clusterLink * link,clusterMsg * hdr)1553 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
1554 clusterMsg *hdr)
1555 {
1556 char ip[NET_IP_STR_LEN] = {0};
1557 int port = ntohs(hdr->port);
1558 int cport = ntohs(hdr->cport);
1559
1560 /* We don't proceed if the link is the same as the sender link, as this
1561 * function is designed to see if the node link is consistent with the
1562 * symmetric link that is used to receive PINGs from the node.
1563 *
1564 * As a side effect this function never frees the passed 'link', so
1565 * it is safe to call during packet processing. */
1566 if (link == node->link) return 0;
1567
1568 nodeIp2String(ip,link,hdr->myip);
1569 if (node->port == port && node->cport == cport &&
1570 strcmp(ip,node->ip) == 0) return 0;
1571
1572 /* IP / port is different, update it. */
1573 memcpy(node->ip,ip,sizeof(ip));
1574 node->port = port;
1575 node->cport = cport;
1576 if (node->link) freeClusterLink(node->link);
1577 node->flags &= ~CLUSTER_NODE_NOADDR;
1578 serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
1579 node->name, node->ip, node->port);
1580
1581 /* Check if this is our master and we have to change the
1582 * replication target as well. */
1583 if (nodeIsSlave(myself) && myself->slaveof == node)
1584 replicationSetMaster(node->ip, node->port);
1585 return 1;
1586 }
1587
1588 /* Reconfigure the specified node 'n' as a master. This function is called when
1589 * a node that we believed to be a slave is now acting as master in order to
1590 * update the state of the node. */
clusterSetNodeAsMaster(clusterNode * n)1591 void clusterSetNodeAsMaster(clusterNode *n) {
1592 if (nodeIsMaster(n)) return;
1593
1594 if (n->slaveof) {
1595 clusterNodeRemoveSlave(n->slaveof,n);
1596 if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1597 }
1598 n->flags &= ~CLUSTER_NODE_SLAVE;
1599 n->flags |= CLUSTER_NODE_MASTER;
1600 n->slaveof = NULL;
1601
1602 /* Update config and state. */
1603 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1604 CLUSTER_TODO_UPDATE_STATE);
1605 }
1606
1607 /* This function is called when we receive a master configuration via a
1608 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1609 * node, and the set of slots claimed under this configEpoch.
1610 *
1611 * What we do is to rebind the slots with newer configuration compared to our
1612 * local configuration, and if needed, we turn ourself into a replica of the
1613 * node (see the function comments for more info).
1614 *
1615 * The 'sender' is the node for which we received a configuration update.
1616 * Sometimes it is not actually the "Sender" of the information, like in the
1617 * case we receive the info via an UPDATE packet. */
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    /* Slots this update takes away from us while we still store keys for
     * them. Unless the update ends up turning this node into a replica (in
     * which case the whole key space is resynced from the new master), the
     * keys of these slots are deleted at the end of the function. */
    uint16_t lost_slots[CLUSTER_SLOTS];
    int lost_count = 0;
    clusterNode *curmaster, *newmaster = NULL;
    int slot, i;

    /* A configuration update describing this very node is never applied. */
    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }

    /* curmaster is this node when it is a master, otherwise the node it
     * replicates from. The loop below tracks whether slots are taken away
     * from curmaster. */
    if (nodeIsMaster(myself))
        curmaster = myself;
    else
        curmaster = myself->slaveof;

    for (slot = 0; slot < CLUSTER_SLOTS; slot++) {
        clusterNode *owner;

        /* Only slots claimed in the received bitmap are of interest. */
        if (!bitmapTestBit(slots,slot)) continue;

        /* Nothing to do when the slot is already bound to the sender. */
        owner = server.cluster->slots[slot];
        if (owner == sender) continue;

        /* Importing slots are only modified manually (for instance while a
         * resharding is in progress and the migrating side already closed
         * the slot and advertises the new config): never rebind them here. */
        if (server.cluster->importing_slots_from[slot]) continue;

        /* Rebind only unassigned slots, or slots whose current owner has a
         * configEpoch smaller than the one claimed for the sender. */
        if (owner != NULL && owner->configEpoch >= senderConfigEpoch)
            continue;

        /* The slot was ours and still holds keys: remember it. The sender
         * can't be myself here because of the early return above. */
        if (owner == myself && countKeysInSlot(slot))
            lost_slots[lost_count++] = slot;

        if (owner == curmaster) newmaster = sender;

        clusterDelSlot(slot);
        clusterAddSlot(sender,slot);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    }

    /* The slots table is updated at this point. When a module disabled
     * Redis Cluster keys redirections no further change is applied to the
     * state of the server. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return;

    /* When slots moved away from curmaster and it is left with no slots:
     * 1) Either we are a master that was failed over, and we turn into a
     *    replica of the new owner.
     * 2) Or we are a slave whose master lost all its slots, and we start
     *    replicating from the new owner. */
    if (newmaster && curmaster->numslots == 0) {
        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return;
    }

    /* We were not demoted, but we may have lost slots we still hold keys
     * for: drop those keys so that keys and slots stay consistent. When
     * lost_count is zero the loop does nothing. */
    for (i = 0; i < lost_count; i++)
        delKeysInSlot(lost_slots[i]);
}
1713
1714 /* When this function is called, there is a packet to process starting
1715 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
1716 * function should just handle the higher level stuff of processing the
1717 * packet, modifying the cluster state if needed.
1718 *
1719 * The function returns 1 if the link is still valid after the packet
1720 * was processed, otherwise 0 if the link was freed since the packet
1721 * processing lead to some inconsistency error (for instance a PONG
1722 * received from the wrong sender ID). */
clusterProcessPacket(clusterLink * link)1723 int clusterProcessPacket(clusterLink *link) {
1724 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1725 uint32_t totlen = ntohl(hdr->totlen);
1726 uint16_t type = ntohs(hdr->type);
1727 mstime_t now = mstime();
1728
1729 if (type < CLUSTERMSG_TYPE_COUNT)
1730 server.cluster->stats_bus_messages_received[type]++;
1731 serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
1732 type, (unsigned long) totlen);
1733
1734 /* Perform sanity checks */
1735 if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1736 if (totlen > link->rcvbuf_len) return 1;
1737
1738 if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1739 /* Can't handle messages of different versions. */
1740 return 1;
1741 }
1742
1743 uint16_t flags = ntohs(hdr->flags);
1744 uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1745 clusterNode *sender;
1746
1747 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1748 type == CLUSTERMSG_TYPE_MEET)
1749 {
1750 uint16_t count = ntohs(hdr->count);
1751 uint32_t explen; /* expected length of this packet */
1752
1753 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1754 explen += (sizeof(clusterMsgDataGossip)*count);
1755 if (totlen != explen) return 1;
1756 } else if (type == CLUSTERMSG_TYPE_FAIL) {
1757 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1758
1759 explen += sizeof(clusterMsgDataFail);
1760 if (totlen != explen) return 1;
1761 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1762 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1763
1764 explen += sizeof(clusterMsgDataPublish) -
1765 8 +
1766 ntohl(hdr->data.publish.msg.channel_len) +
1767 ntohl(hdr->data.publish.msg.message_len);
1768 if (totlen != explen) return 1;
1769 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1770 type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1771 type == CLUSTERMSG_TYPE_MFSTART)
1772 {
1773 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1774
1775 if (totlen != explen) return 1;
1776 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1777 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1778
1779 explen += sizeof(clusterMsgDataUpdate);
1780 if (totlen != explen) return 1;
1781 } else if (type == CLUSTERMSG_TYPE_MODULE) {
1782 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1783
1784 explen += sizeof(clusterMsgModule) -
1785 3 + ntohl(hdr->data.module.msg.len);
1786 if (totlen != explen) return 1;
1787 }
1788
1789 /* Check if the sender is a known node. Note that for incoming connections
1790 * we don't store link->node information, but resolve the node by the
1791 * ID in the header each time in the current implementation. */
1792 sender = clusterLookupNode(hdr->sender);
1793
1794 /* Update the last time we saw any data from this node. We
1795 * use this in order to avoid detecting a timeout from a node that
1796 * is just sending a lot of data in the cluster bus, for instance
1797 * because of Pub/Sub. */
1798 if (sender) sender->data_received = now;
1799
1800 if (sender && !nodeInHandshake(sender)) {
1801 /* Update our currentEpoch if we see a newer epoch in the cluster. */
1802 senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1803 senderConfigEpoch = ntohu64(hdr->configEpoch);
1804 if (senderCurrentEpoch > server.cluster->currentEpoch)
1805 server.cluster->currentEpoch = senderCurrentEpoch;
1806 /* Update the sender configEpoch if it is publishing a newer one. */
1807 if (senderConfigEpoch > sender->configEpoch) {
1808 sender->configEpoch = senderConfigEpoch;
1809 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1810 CLUSTER_TODO_FSYNC_CONFIG);
1811 }
1812 /* Update the replication offset info for this node. */
1813 sender->repl_offset = ntohu64(hdr->offset);
1814 sender->repl_offset_time = now;
1815 /* If we are a slave performing a manual failover and our master
1816 * sent its offset while already paused, populate the MF state. */
1817 if (server.cluster->mf_end &&
1818 nodeIsSlave(myself) &&
1819 myself->slaveof == sender &&
1820 hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1821 server.cluster->mf_master_offset == 0)
1822 {
1823 server.cluster->mf_master_offset = sender->repl_offset;
1824 serverLog(LL_WARNING,
1825 "Received replication offset for paused "
1826 "master manual failover: %lld",
1827 server.cluster->mf_master_offset);
1828 }
1829 }
1830
1831 /* Initial processing of PING and MEET requests replying with a PONG. */
1832 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1833 serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
1834
1835 /* We use incoming MEET messages in order to set the address
1836 * for 'myself', since only other cluster nodes will send us
1837 * MEET messages on handshakes, when the cluster joins, or
1838 * later if we changed address, and those nodes will use our
1839 * official address to connect to us. So by obtaining this address
1840 * from the socket is a simple way to discover / update our own
1841 * address in the cluster without it being hardcoded in the config.
1842 *
1843 * However if we don't have an address at all, we update the address
1844 * even with a normal PING packet. If it's wrong it will be fixed
1845 * by MEET later. */
1846 if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
1847 server.cluster_announce_ip == NULL)
1848 {
1849 char ip[NET_IP_STR_LEN];
1850
1851 if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
1852 strcmp(ip,myself->ip))
1853 {
1854 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1855 serverLog(LL_WARNING,"IP address for this node updated to %s",
1856 myself->ip);
1857 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1858 }
1859 }
1860
1861 /* Add this node if it is new for us and the msg type is MEET.
1862 * In this stage we don't try to add the node with the right
1863 * flags, slaveof pointer, and so forth, as this details will be
1864 * resolved when we'll receive PONGs from the node. */
1865 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1866 clusterNode *node;
1867
1868 node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1869 nodeIp2String(node->ip,link,hdr->myip);
1870 node->port = ntohs(hdr->port);
1871 node->cport = ntohs(hdr->cport);
1872 clusterAddNode(node);
1873 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1874 }
1875
1876 /* If this is a MEET packet from an unknown node, we still process
1877 * the gossip section here since we have to trust the sender because
1878 * of the message type. */
1879 if (!sender && type == CLUSTERMSG_TYPE_MEET)
1880 clusterProcessGossipSection(hdr,link);
1881
1882 /* Anyway reply with a PONG */
1883 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1884 }
1885
1886 /* PING, PONG, MEET: process config information. */
1887 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1888 type == CLUSTERMSG_TYPE_MEET)
1889 {
1890 serverLog(LL_DEBUG,"%s packet received: %p",
1891 type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
1892 (void*)link->node);
1893 if (link->node) {
1894 if (nodeInHandshake(link->node)) {
1895 /* If we already have this node, try to change the
1896 * IP/port of the node with the new one. */
1897 if (sender) {
1898 serverLog(LL_VERBOSE,
1899 "Handshake: we already know node %.40s, "
1900 "updating the address if needed.", sender->name);
1901 if (nodeUpdateAddressIfNeeded(sender,link,hdr))
1902 {
1903 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1904 CLUSTER_TODO_UPDATE_STATE);
1905 }
1906 /* Free this node as we already have it. This will
1907 * cause the link to be freed as well. */
1908 clusterDelNode(link->node);
1909 return 0;
1910 }
1911
1912 /* First thing to do is replacing the random name with the
1913 * right node name if this was a handshake stage. */
1914 clusterRenameNode(link->node, hdr->sender);
1915 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
1916 link->node->name);
1917 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1918 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1919 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1920 } else if (memcmp(link->node->name,hdr->sender,
1921 CLUSTER_NAMELEN) != 0)
1922 {
1923 /* If the reply has a non matching node ID we
1924 * disconnect this node and set it as not having an associated
1925 * address. */
1926 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1927 link->node->name,
1928 (int)(now-(link->node->ctime)),
1929 link->node->flags);
1930 link->node->flags |= CLUSTER_NODE_NOADDR;
1931 link->node->ip[0] = '\0';
1932 link->node->port = 0;
1933 link->node->cport = 0;
1934 freeClusterLink(link);
1935 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1936 return 0;
1937 }
1938 }
1939
1940 /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
1941 * announced. This is a dynamic flag that we receive from the
1942 * sender, and the latest status must be trusted. We need it to
1943 * be propagated because the slave ranking used to understand the
1944 * delay of each slave in the voting process, needs to know
1945 * what are the instances really competing. */
1946 if (sender) {
1947 int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
1948 sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
1949 sender->flags |= nofailover;
1950 }
1951
1952 /* Update the node address if it changed. */
1953 if (sender && type == CLUSTERMSG_TYPE_PING &&
1954 !nodeInHandshake(sender) &&
1955 nodeUpdateAddressIfNeeded(sender,link,hdr))
1956 {
1957 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1958 CLUSTER_TODO_UPDATE_STATE);
1959 }
1960
1961 /* Update our info about the node */
1962 if (link->node && type == CLUSTERMSG_TYPE_PONG) {
1963 link->node->pong_received = now;
1964 link->node->ping_sent = 0;
1965
1966 /* The PFAIL condition can be reversed without external
1967 * help if it is momentary (that is, if it does not
1968 * turn into a FAIL state).
1969 *
1970 * The FAIL condition is also reversible under specific
1971 * conditions detected by clearNodeFailureIfNeeded(). */
1972 if (nodeTimedOut(link->node)) {
1973 link->node->flags &= ~CLUSTER_NODE_PFAIL;
1974 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1975 CLUSTER_TODO_UPDATE_STATE);
1976 } else if (nodeFailed(link->node)) {
1977 clearNodeFailureIfNeeded(link->node);
1978 }
1979 }
1980
1981 /* Check for role switch: slave -> master or master -> slave. */
1982 if (sender) {
1983 if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1984 sizeof(hdr->slaveof)))
1985 {
1986 /* Node is a master. */
1987 clusterSetNodeAsMaster(sender);
1988 } else {
1989 /* Node is a slave. */
1990 clusterNode *master = clusterLookupNode(hdr->slaveof);
1991
1992 if (nodeIsMaster(sender)) {
1993 /* Master turned into a slave! Reconfigure the node. */
1994 clusterDelNodeSlots(sender);
1995 sender->flags &= ~(CLUSTER_NODE_MASTER|
1996 CLUSTER_NODE_MIGRATE_TO);
1997 sender->flags |= CLUSTER_NODE_SLAVE;
1998
1999 /* Update config and state. */
2000 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2001 CLUSTER_TODO_UPDATE_STATE);
2002 }
2003
2004 /* Master node changed for this slave? */
2005 if (master && sender->slaveof != master) {
2006 if (sender->slaveof)
2007 clusterNodeRemoveSlave(sender->slaveof,sender);
2008 clusterNodeAddSlave(master,sender);
2009 sender->slaveof = master;
2010
2011 /* Update config. */
2012 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2013 }
2014 }
2015 }
2016
2017 /* Update our info about served slots.
2018 *
2019 * Note: this MUST happen after we update the master/slave state
2020 * so that CLUSTER_NODE_MASTER flag will be set. */
2021
2022 /* Many checks are only needed if the set of served slots this
2023 * instance claims is different compared to the set of slots we have
2024 * for it. Check this ASAP to avoid other computational expansive
2025 * checks later. */
2026 clusterNode *sender_master = NULL; /* Sender or its master if slave. */
2027 int dirty_slots = 0; /* Sender claimed slots don't match my view? */
2028
2029 if (sender) {
2030 sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
2031 if (sender_master) {
2032 dirty_slots = memcmp(sender_master->slots,
2033 hdr->myslots,sizeof(hdr->myslots)) != 0;
2034 }
2035 }
2036
2037 /* 1) If the sender of the message is a master, and we detected that
2038 * the set of slots it claims changed, scan the slots to see if we
2039 * need to update our configuration. */
2040 if (sender && nodeIsMaster(sender) && dirty_slots)
2041 clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
2042
2043 /* 2) We also check for the reverse condition, that is, the sender
2044 * claims to serve slots we know are served by a master with a
2045 * greater configEpoch. If this happens we inform the sender.
2046 *
2047 * This is useful because sometimes after a partition heals, a
2048 * reappearing master may be the last one to claim a given set of
2049 * hash slots, but with a configuration that other instances know to
2050 * be deprecated. Example:
2051 *
2052 * A and B are master and slave for slots 1,2,3.
2053 * A is partitioned away, B gets promoted.
2054 * B is partitioned away, and A returns available.
2055 *
2056 * Usually B would PING A publishing its set of served slots and its
2057 * configEpoch, but because of the partition B can't inform A of the
2058 * new configuration, so other nodes that have an updated table must
2059 * do it. In this way A will stop to act as a master (or can try to
2060 * failover if there are the conditions to win the election). */
2061 if (sender && dirty_slots) {
2062 int j;
2063
2064 for (j = 0; j < CLUSTER_SLOTS; j++) {
2065 if (bitmapTestBit(hdr->myslots,j)) {
2066 if (server.cluster->slots[j] == sender ||
2067 server.cluster->slots[j] == NULL) continue;
2068 if (server.cluster->slots[j]->configEpoch >
2069 senderConfigEpoch)
2070 {
2071 serverLog(LL_VERBOSE,
2072 "Node %.40s has old slots configuration, sending "
2073 "an UPDATE message about %.40s",
2074 sender->name, server.cluster->slots[j]->name);
2075 clusterSendUpdate(sender->link,
2076 server.cluster->slots[j]);
2077
2078 /* TODO: instead of exiting the loop send every other
2079 * UPDATE packet for other nodes that are the new owner
2080 * of sender's slots. */
2081 break;
2082 }
2083 }
2084 }
2085 }
2086
2087 /* If our config epoch collides with the sender's try to fix
2088 * the problem. */
2089 if (sender &&
2090 nodeIsMaster(myself) && nodeIsMaster(sender) &&
2091 senderConfigEpoch == myself->configEpoch)
2092 {
2093 clusterHandleConfigEpochCollision(sender);
2094 }
2095
2096 /* Get info from the gossip section */
2097 if (sender) clusterProcessGossipSection(hdr,link);
2098 } else if (type == CLUSTERMSG_TYPE_FAIL) {
2099 clusterNode *failing;
2100
2101 if (sender) {
2102 failing = clusterLookupNode(hdr->data.fail.about.nodename);
2103 if (failing &&
2104 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
2105 {
2106 serverLog(LL_NOTICE,
2107 "FAIL message received from %.40s about %.40s",
2108 hdr->sender, hdr->data.fail.about.nodename);
2109 failing->flags |= CLUSTER_NODE_FAIL;
2110 failing->fail_time = now;
2111 failing->flags &= ~CLUSTER_NODE_PFAIL;
2112 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2113 CLUSTER_TODO_UPDATE_STATE);
2114 }
2115 } else {
2116 serverLog(LL_NOTICE,
2117 "Ignoring FAIL message from unknown node %.40s about %.40s",
2118 hdr->sender, hdr->data.fail.about.nodename);
2119 }
2120 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
2121 robj *channel, *message;
2122 uint32_t channel_len, message_len;
2123
2124 /* Don't bother creating useless objects if there are no
2125 * Pub/Sub subscribers. */
2126 if (dictSize(server.pubsub_channels) ||
2127 listLength(server.pubsub_patterns))
2128 {
2129 channel_len = ntohl(hdr->data.publish.msg.channel_len);
2130 message_len = ntohl(hdr->data.publish.msg.message_len);
2131 channel = createStringObject(
2132 (char*)hdr->data.publish.msg.bulk_data,channel_len);
2133 message = createStringObject(
2134 (char*)hdr->data.publish.msg.bulk_data+channel_len,
2135 message_len);
2136 pubsubPublishMessage(channel,message);
2137 decrRefCount(channel);
2138 decrRefCount(message);
2139 }
2140 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
2141 if (!sender) return 1; /* We don't know that node. */
2142 clusterSendFailoverAuthIfNeeded(sender,hdr);
2143 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
2144 if (!sender) return 1; /* We don't know that node. */
2145 /* We consider this vote only if the sender is a master serving
2146 * a non zero number of slots, and its currentEpoch is greater or
2147 * equal to epoch where this node started the election. */
2148 if (nodeIsMaster(sender) && sender->numslots > 0 &&
2149 senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2150 {
2151 server.cluster->failover_auth_count++;
2152 /* Maybe we reached a quorum here, set a flag to make sure
2153 * we check ASAP. */
2154 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
2155 }
2156 } else if (type == CLUSTERMSG_TYPE_MFSTART) {
2157 /* This message is acceptable only if I'm a master and the sender
2158 * is one of my slaves. */
2159 if (!sender || sender->slaveof != myself) return 1;
2160 /* Manual failover requested from slaves. Initialize the state
2161 * accordingly. */
2162 resetManualFailover();
2163 server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
2164 server.cluster->mf_slave = sender;
2165 pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
2166 serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
2167 sender->name);
2168 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2169 clusterNode *n; /* The node the update is about. */
2170 uint64_t reportedConfigEpoch =
2171 ntohu64(hdr->data.update.nodecfg.configEpoch);
2172
2173 if (!sender) return 1; /* We don't know the sender. */
2174 n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
2175 if (!n) return 1; /* We don't know the reported node. */
2176 if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
2177
2178 /* If in our current config the node is a slave, set it as a master. */
2179 if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
2180
2181 /* Update the node's configEpoch. */
2182 n->configEpoch = reportedConfigEpoch;
2183 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2184 CLUSTER_TODO_FSYNC_CONFIG);
2185
2186 /* Check the bitmap of served slots and update our
2187 * config accordingly. */
2188 clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
2189 hdr->data.update.nodecfg.slots);
2190 } else if (type == CLUSTERMSG_TYPE_MODULE) {
2191 if (!sender) return 1; /* Protect the module from unknown nodes. */
2192 /* We need to route this message back to the right module subscribed
2193 * for the right message type. */
2194 uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
2195 uint32_t len = ntohl(hdr->data.module.msg.len);
2196 uint8_t type = hdr->data.module.msg.type;
2197 unsigned char *payload = hdr->data.module.msg.bulk_data;
2198 moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
2199 } else {
2200 serverLog(LL_WARNING,"Received unknown packet type: %d", type);
2201 }
2202 return 1;
2203 }
2204
/* Called when an I/O error (or an invalid packet header) is detected on a
 * cluster bus link: the link is released by calling freeClusterLink().
 *
 * NOTE(review): the historical comment stated that the node is marked as no
 * longer connected so that the Cluster Cron reconnects it, and that temporary
 * nodes used to accept a query are freed entirely. None of that is visible in
 * this function: presumably it is a consequence of freeClusterLink() and of
 * the cron logic -- confirm before relying on it. */
void handleLinkIOError(clusterLink *link) {
    freeClusterLink(link);
}
2214
2215 /* Send data. This is handled using a trivial send buffer that gets
2216 * consumed by write(). We don't try to optimize this for speed too much
2217 * as this is a very low traffic channel. */
clusterWriteHandler(connection * conn)2218 void clusterWriteHandler(connection *conn) {
2219 clusterLink *link = connGetPrivateData(conn);
2220 ssize_t nwritten;
2221
2222 nwritten = connWrite(conn, link->sndbuf, sdslen(link->sndbuf));
2223 if (nwritten <= 0) {
2224 serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2225 (nwritten == -1) ? connGetLastError(conn) : "short write");
2226 handleLinkIOError(link);
2227 return;
2228 }
2229 sdsrange(link->sndbuf,nwritten,-1);
2230 if (sdslen(link->sndbuf) == 0)
2231 connSetWriteHandler(link->conn, NULL);
2232 }
2233
2234 /* A connect handler that gets called when a connection to another node
2235 * gets established.
2236 */
clusterLinkConnectHandler(connection * conn)2237 void clusterLinkConnectHandler(connection *conn) {
2238 clusterLink *link = connGetPrivateData(conn);
2239 clusterNode *node = link->node;
2240
2241 /* Check if connection succeeded */
2242 if (connGetState(conn) != CONN_STATE_CONNECTED) {
2243 serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
2244 node->name, node->ip, node->cport,
2245 connGetLastError(conn));
2246 freeClusterLink(link);
2247 return;
2248 }
2249
2250 /* Register a read handler from now on */
2251 connSetReadHandler(conn, clusterReadHandler);
2252
2253 /* Queue a PING in the new connection ASAP: this is crucial
2254 * to avoid false positives in failure detection.
2255 *
2256 * If the node is flagged as MEET, we send a MEET message instead
2257 * of a PING one, to force the receiver to add us in its node
2258 * table. */
2259 mstime_t old_ping_sent = node->ping_sent;
2260 clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
2261 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
2262 if (old_ping_sent) {
2263 /* If there was an active ping before the link was
2264 * disconnected, we want to restore the ping time, otherwise
2265 * replaced by the clusterSendPing() call. */
2266 node->ping_sent = old_ping_sent;
2267 }
2268 /* We can clear the flag after the first packet is sent.
2269 * If we'll never receive a PONG, we'll never send new packets
2270 * to this node. Instead after the PONG is received and we
2271 * are no longer in meet/handshake status, we want to send
2272 * normal PING packets. */
2273 node->flags &= ~CLUSTER_NODE_MEET;
2274
2275 serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
2276 node->name, node->ip, node->cport);
2277 }
2278
2279 /* Read data. Try to read the first field of the header first to check the
2280 * full length of the packet. When a whole packet is in memory this function
2281 * will call the function to process the packet. And so forth. */
clusterReadHandler(connection * conn)2282 void clusterReadHandler(connection *conn) {
2283 clusterMsg buf[1];
2284 ssize_t nread;
2285 clusterMsg *hdr;
2286 clusterLink *link = connGetPrivateData(conn);
2287 unsigned int readlen, rcvbuflen;
2288
2289 while(1) { /* Read as long as there is data to read. */
2290 rcvbuflen = link->rcvbuf_len;
2291 if (rcvbuflen < 8) {
2292 /* First, obtain the first 8 bytes to get the full message
2293 * length. */
2294 readlen = 8 - rcvbuflen;
2295 } else {
2296 /* Finally read the full message. */
2297 hdr = (clusterMsg*) link->rcvbuf;
2298 if (rcvbuflen == 8) {
2299 /* Perform some sanity check on the message signature
2300 * and length. */
2301 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2302 ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2303 {
2304 serverLog(LL_WARNING,
2305 "Bad message length or signature received "
2306 "from Cluster bus.");
2307 handleLinkIOError(link);
2308 return;
2309 }
2310 }
2311 readlen = ntohl(hdr->totlen) - rcvbuflen;
2312 if (readlen > sizeof(buf)) readlen = sizeof(buf);
2313 }
2314
2315 nread = connRead(conn,buf,readlen);
2316 if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; /* No more data ready. */
2317
2318 if (nread <= 0) {
2319 /* I/O error... */
2320 serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2321 (nread == 0) ? "connection closed" : connGetLastError(conn));
2322 handleLinkIOError(link);
2323 return;
2324 } else {
2325 /* Read data and recast the pointer to the new buffer. */
2326 size_t unused = link->rcvbuf_alloc - link->rcvbuf_len;
2327 if ((size_t)nread > unused) {
2328 size_t required = link->rcvbuf_len + nread;
2329 /* If less than 1mb, grow to twice the needed size, if larger grow by 1mb. */
2330 link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ? required * 2: required + RCVBUF_MAX_PREALLOC;
2331 link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc);
2332 }
2333 memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
2334 link->rcvbuf_len += nread;
2335 hdr = (clusterMsg*) link->rcvbuf;
2336 rcvbuflen += nread;
2337 }
2338
2339 /* Total length obtained? Process this packet. */
2340 if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2341 if (clusterProcessPacket(link)) {
2342 if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
2343 zfree(link->rcvbuf);
2344 link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
2345 }
2346 link->rcvbuf_len = 0;
2347 } else {
2348 return; /* Link no longer valid. */
2349 }
2350 }
2351 }
2352 }
2353
2354 /* Put stuff into the send buffer.
2355 *
2356 * It is guaranteed that this function will never have as a side effect
2357 * the link to be invalidated, so it is safe to call this function
2358 * from event handlers that will do stuff with the same link later. */
clusterSendMessage(clusterLink * link,unsigned char * msg,size_t msglen)2359 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2360 if (sdslen(link->sndbuf) == 0 && msglen != 0)
2361 connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
2362
2363 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2364
2365 /* Populate sent messages stats. */
2366 clusterMsg *hdr = (clusterMsg*) msg;
2367 uint16_t type = ntohs(hdr->type);
2368 if (type < CLUSTERMSG_TYPE_COUNT)
2369 server.cluster->stats_bus_messages_sent[type]++;
2370 }
2371
2372 /* Send a message to all the nodes that are part of the cluster having
2373 * a connected link.
2374 *
2375 * It is guaranteed that this function will never have as a side effect
2376 * some node->link to be invalidated, so it is safe to call this function
2377 * from event handlers that will do stuff with node links later. */
clusterBroadcastMessage(void * buf,size_t len)2378 void clusterBroadcastMessage(void *buf, size_t len) {
2379 dictIterator *di;
2380 dictEntry *de;
2381
2382 di = dictGetSafeIterator(server.cluster->nodes);
2383 while((de = dictNext(di)) != NULL) {
2384 clusterNode *node = dictGetVal(de);
2385
2386 if (!node->link) continue;
2387 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2388 continue;
2389 clusterSendMessage(node->link,buf,len);
2390 }
2391 dictReleaseIterator(di);
2392 }
2393
2394 /* Build the message header. hdr must point to a buffer at least
2395 * sizeof(clusterMsg) in bytes. */
/* Fill 'hdr' with the common cluster bus header describing this node, for a
 * message of the specified 'type'.
 *
 * The whole clusterMsg structure pointed to by 'hdr' is zeroed first, so the
 * buffer must be at least sizeof(clusterMsg) bytes long. The 'totlen' field
 * is computed here only for FAIL and UPDATE messages: for any other type it
 * is left at zero and fixing it is up to the caller. */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    /* A slave with a known master advertises the slots bitmap and the
     * configEpoch of that master (its slave flag tells the receiver that it
     * is NOT really in charge of those slots). Any other node advertises
     * its own. */
    clusterNode *slots_owner = myself;
    if (nodeIsSlave(myself) && myself->slaveof) slots_owner = myself->slaveof;

    memset(hdr,0,sizeof(*hdr));

    /* Signature, protocol version, message type and sender name. */
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->type = htons(type);
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* When cluster-announce-ip is set, receivers are forced to use that
     * address for this node. Otherwise the field stays zeroed and they
     * do auto discovery. */
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* Same idea for the ports: the announce options, when set, override
     * the values derived from the listening port. */
    int base_port = server.tls_cluster ? server.tls_port : server.port;
    int data_port = base_port;
    int bus_port = base_port + CLUSTER_PORT_INCR;
    if (server.cluster_announce_port)
        data_port = server.cluster_announce_port;
    if (server.cluster_announce_bus_port)
        bus_port = server.cluster_announce_bus_port;
    hdr->port = htons(data_port);
    hdr->cport = htons(bus_port);

    /* Slots, master name (all zeroes if we have no master), flags, state. */
    memcpy(hdr->myslots,slots_owner->slots,sizeof(hdr->myslots));
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name,CLUSTER_NAMELEN);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* Epochs: the cluster currentEpoch and the configEpoch matching the
     * advertised slots. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(slots_owner->configEpoch);

    /* Replication offset: the slave offset if we are a slave, otherwise
     * our own master replication offset. */
    uint64_t repl_offset;
    if (nodeIsSlave(myself)) {
        repl_offset = replicationGetSlaveOffset();
    } else {
        repl_offset = server.master_repl_offset;
    }
    hdr->offset = htonu64(repl_offset);

    /* Message flags: a master with mf_end set is flagged as paused. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* Fixed-size messages get their length computed here. For all the other
     * types (PING, PONG, MEET, ...) totlen remains zero and the caller must
     * set it. */
    int totlen = 0;
    switch (type) {
    case CLUSTERMSG_TYPE_FAIL:
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
        break;
    case CLUSTERMSG_TYPE_UPDATE:
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
        break;
    default:
        break;
    }
    hdr->totlen = htonl(totlen);
}
2470
2471 /* Return non zero if the node is already present in the gossip section of the
2472 * message pointed by 'hdr' and having 'count' gossip entries. Otherwise
2473 * zero is returned. Helper for clusterSendPing(). */
clusterNodeIsInGossipSection(clusterMsg * hdr,int count,clusterNode * n)2474 int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
2475 int j;
2476 for (j = 0; j < count; j++) {
2477 if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
2478 CLUSTER_NAMELEN) == 0) break;
2479 }
2480 return j != count;
2481 }
2482
2483 /* Set the i-th entry of the gossip section in the message pointed by 'hdr'
2484 * to the info of the specified node 'n'. */
clusterSetGossipEntry(clusterMsg * hdr,int i,clusterNode * n)2485 void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
2486 clusterMsgDataGossip *gossip;
2487 gossip = &(hdr->data.ping.gossip[i]);
2488 memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
2489 gossip->ping_sent = htonl(n->ping_sent/1000);
2490 gossip->pong_received = htonl(n->pong_received/1000);
2491 memcpy(gossip->ip,n->ip,sizeof(n->ip));
2492 gossip->port = htons(n->port);
2493 gossip->cport = htons(n->cport);
2494 gossip->flags = htons(n->flags);
2495 gossip->notused1 = 0;
2496 }
2497
2498 /* Send a PING or PONG packet to the specified node, making sure to add enough
2499 * gossip information. */
clusterSendPing(clusterLink * link,int type)2500 void clusterSendPing(clusterLink *link, int type) {
2501 unsigned char *buf;
2502 clusterMsg *hdr;
2503 int gossipcount = 0; /* Number of gossip sections added so far. */
2504 int wanted; /* Number of gossip sections we want to append if possible. */
2505 int totlen; /* Total packet length. */
2506 /* freshnodes is the max number of nodes we can hope to append at all:
2507 * nodes available minus two (ourself and the node we are sending the
2508 * message to). However practically there may be less valid nodes since
2509 * nodes in handshake state, disconnected, are not considered. */
2510 int freshnodes = dictSize(server.cluster->nodes)-2;
2511
2512 /* How many gossip sections we want to add? 1/10 of the number of nodes
2513 * and anyway at least 3. Why 1/10?
2514 *
2515 * If we have N masters, with N/10 entries, and we consider that in
2516 * node_timeout we exchange with each other node at least 4 packets
2517 * (we ping in the worst case in node_timeout/2 time, and we also
2518 * receive two pings from the host), we have a total of 8 packets
2519 * in the node_timeout*2 failure reports validity time. So we have
2520 * that, for a single PFAIL node, we can expect to receive the following
2521 * number of failure reports (in the specified window of time):
2522 *
2523 * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2524 *
2525 * PROB = probability of being featured in a single gossip entry,
2526 * which is 1 / NUM_OF_NODES.
2527 * ENTRIES = 10.
2528 * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2529 *
2530 * If we assume we have just masters (so num of nodes and num of masters
2531 * is the same), with 1/10 we always get over the majority, and specifically
2532 * 80% of the number of nodes, to account for many masters failing at the
2533 * same time.
2534 *
2535 * Since we have non-voting slaves that lower the probability of an entry
2536 * to feature our node, we set the number of entries per packet as
2537 * 10% of the total nodes we have. */
2538 wanted = floor(dictSize(server.cluster->nodes)/10);
2539 if (wanted < 3) wanted = 3;
2540 if (wanted > freshnodes) wanted = freshnodes;
2541
2542 /* Include all the nodes in PFAIL state, so that failure reports are
2543 * faster to propagate to go from PFAIL to FAIL state. */
2544 int pfail_wanted = server.cluster->stats_pfail_nodes;
2545
2546 /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
2547 * later according to the number of gossip sections we really were able
2548 * to put inside the packet. */
2549 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2550 totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
2551 /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2552 * sizeof(clusterMsg) or more. */
2553 if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2554 buf = zcalloc(totlen);
2555 hdr = (clusterMsg*) buf;
2556
2557 /* Populate the header. */
2558 if (link->node && type == CLUSTERMSG_TYPE_PING)
2559 link->node->ping_sent = mstime();
2560 clusterBuildMessageHdr(hdr,type);
2561
2562 /* Populate the gossip fields */
2563 int maxiterations = wanted*3;
2564 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2565 dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2566 clusterNode *this = dictGetVal(de);
2567
2568 /* Don't include this node: the whole packet header is about us
2569 * already, so we just gossip about other nodes. */
2570 if (this == myself) continue;
2571
2572 /* PFAIL nodes will be added later. */
2573 if (this->flags & CLUSTER_NODE_PFAIL) continue;
2574
2575 /* In the gossip section don't include:
2576 * 1) Nodes in HANDSHAKE state.
2577 * 3) Nodes with the NOADDR flag set.
2578 * 4) Disconnected nodes if they don't have configured slots.
2579 */
2580 if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2581 (this->link == NULL && this->numslots == 0))
2582 {
2583 freshnodes--; /* Technically not correct, but saves CPU. */
2584 continue;
2585 }
2586
2587 /* Do not add a node we already have. */
2588 if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
2589
2590 /* Add it */
2591 clusterSetGossipEntry(hdr,gossipcount,this);
2592 freshnodes--;
2593 gossipcount++;
2594 }
2595
2596 /* If there are PFAIL nodes, add them at the end. */
2597 if (pfail_wanted) {
2598 dictIterator *di;
2599 dictEntry *de;
2600
2601 di = dictGetSafeIterator(server.cluster->nodes);
2602 while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
2603 clusterNode *node = dictGetVal(de);
2604 if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
2605 if (node->flags & CLUSTER_NODE_NOADDR) continue;
2606 if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
2607 clusterSetGossipEntry(hdr,gossipcount,node);
2608 freshnodes--;
2609 gossipcount++;
2610 /* We take the count of the slots we allocated, since the
2611 * PFAIL stats may not match perfectly with the current number
2612 * of PFAIL nodes. */
2613 pfail_wanted--;
2614 }
2615 dictReleaseIterator(di);
2616 }
2617
2618 /* Ready to send... fix the totlen fiend and queue the message in the
2619 * output buffer. */
2620 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2621 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2622 hdr->count = htons(gossipcount);
2623 hdr->totlen = htonl(totlen);
2624 clusterSendMessage(link,buf,totlen);
2625 zfree(buf);
2626 }
2627
2628 /* Send a PONG packet to every connected node that's not in handshake state
2629 * and for which we have a valid link.
2630 *
2631 * In Redis Cluster pongs are not used just for failure detection, but also
2632 * to carry important configuration information. So broadcasting a pong is
2633 * useful when something changes in the configuration and we want to make
2634 * the cluster aware ASAP (for instance after a slave promotion).
2635 *
2636 * The 'target' argument specifies the receiving instances using the
2637 * defines below:
2638 *
2639 * CLUSTER_BROADCAST_ALL -> All known instances.
2640 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2641 */
2642 #define CLUSTER_BROADCAST_ALL 0
2643 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
clusterBroadcastPong(int target)2644 void clusterBroadcastPong(int target) {
2645 dictIterator *di;
2646 dictEntry *de;
2647
2648 di = dictGetSafeIterator(server.cluster->nodes);
2649 while((de = dictNext(di)) != NULL) {
2650 clusterNode *node = dictGetVal(de);
2651
2652 if (!node->link) continue;
2653 if (node == myself || nodeInHandshake(node)) continue;
2654 if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2655 int local_slave =
2656 nodeIsSlave(node) && node->slaveof &&
2657 (node->slaveof == myself || node->slaveof == myself->slaveof);
2658 if (!local_slave) continue;
2659 }
2660 clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2661 }
2662 dictReleaseIterator(di);
2663 }
2664
2665 /* Send a PUBLISH message.
2666 *
2667 * If link is NULL, then the message is broadcasted to the whole cluster. */
clusterSendPublish(clusterLink * link,robj * channel,robj * message)2668 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2669 unsigned char *payload;
2670 clusterMsg buf[1];
2671 clusterMsg *hdr = (clusterMsg*) buf;
2672 uint32_t totlen;
2673 uint32_t channel_len, message_len;
2674
2675 channel = getDecodedObject(channel);
2676 message = getDecodedObject(message);
2677 channel_len = sdslen(channel->ptr);
2678 message_len = sdslen(message->ptr);
2679
2680 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2681 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2682 totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2683
2684 hdr->data.publish.msg.channel_len = htonl(channel_len);
2685 hdr->data.publish.msg.message_len = htonl(message_len);
2686 hdr->totlen = htonl(totlen);
2687
2688 /* Try to use the local buffer if possible */
2689 if (totlen < sizeof(buf)) {
2690 payload = (unsigned char*)buf;
2691 } else {
2692 payload = zmalloc(totlen);
2693 memcpy(payload,hdr,sizeof(*hdr));
2694 hdr = (clusterMsg*) payload;
2695 }
2696 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2697 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2698 message->ptr,sdslen(message->ptr));
2699
2700 if (link)
2701 clusterSendMessage(link,payload,totlen);
2702 else
2703 clusterBroadcastMessage(payload,totlen);
2704
2705 decrRefCount(channel);
2706 decrRefCount(message);
2707 if (payload != (unsigned char*)buf) zfree(payload);
2708 }
2709
2710 /* Send a FAIL message to all the nodes we are able to contact.
2711 * The FAIL message is sent when we detect that a node is failing
2712 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2713 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2714 * nodes to do the same ASAP. */
clusterSendFail(char * nodename)2715 void clusterSendFail(char *nodename) {
2716 clusterMsg buf[1];
2717 clusterMsg *hdr = (clusterMsg*) buf;
2718
2719 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2720 memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2721 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2722 }
2723
2724 /* Send an UPDATE message to the specified link carrying the specified 'node'
2725 * slots configuration. The node name, slots bitmap, and configEpoch info
2726 * are included. */
clusterSendUpdate(clusterLink * link,clusterNode * node)2727 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2728 clusterMsg buf[1];
2729 clusterMsg *hdr = (clusterMsg*) buf;
2730
2731 if (link == NULL) return;
2732 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2733 memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2734 hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2735 memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2736 clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
2737 }
2738
2739 /* Send a MODULE message.
2740 *
2741 * If link is NULL, then the message is broadcasted to the whole cluster. */
clusterSendModule(clusterLink * link,uint64_t module_id,uint8_t type,unsigned char * payload,uint32_t len)2742 void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
2743 unsigned char *payload, uint32_t len) {
2744 unsigned char *heapbuf;
2745 clusterMsg buf[1];
2746 clusterMsg *hdr = (clusterMsg*) buf;
2747 uint32_t totlen;
2748
2749 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
2750 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2751 totlen += sizeof(clusterMsgModule) - 3 + len;
2752
2753 hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
2754 hdr->data.module.msg.type = type;
2755 hdr->data.module.msg.len = htonl(len);
2756 hdr->totlen = htonl(totlen);
2757
2758 /* Try to use the local buffer if possible */
2759 if (totlen < sizeof(buf)) {
2760 heapbuf = (unsigned char*)buf;
2761 } else {
2762 heapbuf = zmalloc(totlen);
2763 memcpy(heapbuf,hdr,sizeof(*hdr));
2764 hdr = (clusterMsg*) heapbuf;
2765 }
2766 memcpy(hdr->data.module.msg.bulk_data,payload,len);
2767
2768 if (link)
2769 clusterSendMessage(link,heapbuf,totlen);
2770 else
2771 clusterBroadcastMessage(heapbuf,totlen);
2772
2773 if (heapbuf != (unsigned char*)buf) zfree(heapbuf);
2774 }
2775
2776 /* This function gets a cluster node ID string as target, the same way the nodes
2777 * addresses are represented in the modules side, resolves the node, and sends
2778 * the message. If the target is NULL the message is broadcasted.
2779 *
2780 * The function returns C_OK if the target is valid, otherwise C_ERR is
2781 * returned. */
clusterSendModuleMessageToTarget(const char * target,uint64_t module_id,uint8_t type,unsigned char * payload,uint32_t len)2782 int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
2783 clusterNode *node = NULL;
2784
2785 if (target != NULL) {
2786 node = clusterLookupNode(target);
2787 if (node == NULL || node->link == NULL) return C_ERR;
2788 }
2789
2790 clusterSendModule(target ? node->link : NULL,
2791 module_id, type, payload, len);
2792 return C_OK;
2793 }
2794
2795 /* -----------------------------------------------------------------------------
2796 * CLUSTER Pub/Sub support
2797 *
 * For now we do very little, just propagating PUBLISH messages across the whole
 * cluster. In the future we'll try to get smarter and avoid propagating those
 * messages to hosts without receivers for a given channel.
2801 * -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message) {
    /* A NULL link makes clusterSendPublish() broadcast the message. */
    clusterLink *everyone = NULL;

    clusterSendPublish(everyone,channel,message);
}
2805
2806 /* -----------------------------------------------------------------------------
2807 * SLAVE node specific functions
2808 * -------------------------------------------------------------------------- */
2809
2810 /* This function sends a FAILOVE_AUTH_REQUEST message to every node in order to
2811 * see if there is the quorum for this slave instance to failover its failing
2812 * master.
2813 *
2814 * Note that we send the failover request to everybody, master and slave nodes,
2815 * but only the masters are supposed to reply to our query. */
clusterRequestFailoverAuth(void)2816 void clusterRequestFailoverAuth(void) {
2817 clusterMsg buf[1];
2818 clusterMsg *hdr = (clusterMsg*) buf;
2819 uint32_t totlen;
2820
2821 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2822 /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2823 * in the header to communicate the nodes receiving the message that
2824 * they should authorized the failover even if the master is working. */
2825 if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2826 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2827 hdr->totlen = htonl(totlen);
2828 clusterBroadcastMessage(buf,totlen);
2829 }
2830
2831 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
clusterSendFailoverAuth(clusterNode * node)2832 void clusterSendFailoverAuth(clusterNode *node) {
2833 clusterMsg buf[1];
2834 clusterMsg *hdr = (clusterMsg*) buf;
2835 uint32_t totlen;
2836
2837 if (!node->link) return;
2838 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2839 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2840 hdr->totlen = htonl(totlen);
2841 clusterSendMessage(node->link,(unsigned char*)buf,totlen);
2842 }
2843
2844 /* Send a MFSTART message to the specified node. */
clusterSendMFStart(clusterNode * node)2845 void clusterSendMFStart(clusterNode *node) {
2846 clusterMsg buf[1];
2847 clusterMsg *hdr = (clusterMsg*) buf;
2848 uint32_t totlen;
2849
2850 if (!node->link) return;
2851 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2852 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2853 hdr->totlen = htonl(totlen);
2854 clusterSendMessage(node->link,(unsigned char*)buf,totlen);
2855 }
2856
2857 /* Vote for the node asking for our vote if there are the conditions. */
clusterSendFailoverAuthIfNeeded(clusterNode * node,clusterMsg * request)2858 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
2859 clusterNode *master = node->slaveof;
2860 uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
2861 uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
2862 unsigned char *claimed_slots = request->myslots;
2863 int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
2864 int j;
2865
2866 /* IF we are not a master serving at least 1 slot, we don't have the
2867 * right to vote, as the cluster size in Redis Cluster is the number
2868 * of masters serving at least one slot, and quorum is the cluster
2869 * size + 1 */
2870 if (nodeIsSlave(myself) || myself->numslots == 0) return;
2871
2872 /* Request epoch must be >= our currentEpoch.
2873 * Note that it is impossible for it to actually be greater since
2874 * our currentEpoch was updated as a side effect of receiving this
2875 * request, if the request epoch was greater. */
2876 if (requestCurrentEpoch < server.cluster->currentEpoch) {
2877 serverLog(LL_WARNING,
2878 "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
2879 node->name,
2880 (unsigned long long) requestCurrentEpoch,
2881 (unsigned long long) server.cluster->currentEpoch);
2882 return;
2883 }
2884
2885 /* I already voted for this epoch? Return ASAP. */
2886 if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
2887 serverLog(LL_WARNING,
2888 "Failover auth denied to %.40s: already voted for epoch %llu",
2889 node->name,
2890 (unsigned long long) server.cluster->currentEpoch);
2891 return;
2892 }
2893
2894 /* Node must be a slave and its master down.
2895 * The master can be non failing if the request is flagged
2896 * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
2897 if (nodeIsMaster(node) || master == NULL ||
2898 (!nodeFailed(master) && !force_ack))
2899 {
2900 if (nodeIsMaster(node)) {
2901 serverLog(LL_WARNING,
2902 "Failover auth denied to %.40s: it is a master node",
2903 node->name);
2904 } else if (master == NULL) {
2905 serverLog(LL_WARNING,
2906 "Failover auth denied to %.40s: I don't know its master",
2907 node->name);
2908 } else if (!nodeFailed(master)) {
2909 serverLog(LL_WARNING,
2910 "Failover auth denied to %.40s: its master is up",
2911 node->name);
2912 }
2913 return;
2914 }
2915
2916 /* We did not voted for a slave about this master for two
2917 * times the node timeout. This is not strictly needed for correctness
2918 * of the algorithm but makes the base case more linear. */
2919 if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
2920 {
2921 serverLog(LL_WARNING,
2922 "Failover auth denied to %.40s: "
2923 "can't vote about this master before %lld milliseconds",
2924 node->name,
2925 (long long) ((server.cluster_node_timeout*2)-
2926 (mstime() - node->slaveof->voted_time)));
2927 return;
2928 }
2929
2930 /* The slave requesting the vote must have a configEpoch for the claimed
2931 * slots that is >= the one of the masters currently serving the same
2932 * slots in the current configuration. */
2933 for (j = 0; j < CLUSTER_SLOTS; j++) {
2934 if (bitmapTestBit(claimed_slots, j) == 0) continue;
2935 if (server.cluster->slots[j] == NULL ||
2936 server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
2937 {
2938 continue;
2939 }
2940 /* If we reached this point we found a slot that in our current slots
2941 * is served by a master with a greater configEpoch than the one claimed
2942 * by the slave requesting our vote. Refuse to vote for this slave. */
2943 serverLog(LL_WARNING,
2944 "Failover auth denied to %.40s: "
2945 "slot %d epoch (%llu) > reqEpoch (%llu)",
2946 node->name, j,
2947 (unsigned long long) server.cluster->slots[j]->configEpoch,
2948 (unsigned long long) requestConfigEpoch);
2949 return;
2950 }
2951
2952 /* We can vote for this slave. */
2953 server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
2954 node->slaveof->voted_time = mstime();
2955 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
2956 clusterSendFailoverAuth(node);
2957 serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
2958 node->name, (unsigned long long) server.cluster->currentEpoch);
2959 }
2960
2961 /* This function returns the "rank" of this instance, a slave, in the context
2962 * of its master-slaves ring. The rank of the slave is given by the number of
2963 * other slaves for the same master that have a better replication offset
2964 * compared to the local one (better means, greater, so they claim more data).
2965 *
2966 * A slave with rank 0 is the one with the greatest (most up to date)
2967 * replication offset, and so forth. Note that because how the rank is computed
2968 * multiple slaves may have the same rank, in case they have the same offset.
2969 *
2970 * The slave rank is used to add a delay to start an election in order to
2971 * get voted and replace a failing master. Slaves with better replication
2972 * offsets are more likely to win. */
clusterGetSlaveRank(void)2973 int clusterGetSlaveRank(void) {
2974 long long myoffset;
2975 int j, rank = 0;
2976 clusterNode *master;
2977
2978 serverAssert(nodeIsSlave(myself));
2979 master = myself->slaveof;
2980 if (master == NULL) return 0; /* Never called by slaves without master. */
2981
2982 myoffset = replicationGetSlaveOffset();
2983 for (j = 0; j < master->numslaves; j++)
2984 if (master->slaves[j] != myself &&
2985 !nodeCantFailover(master->slaves[j]) &&
2986 master->slaves[j]->repl_offset > myoffset) rank++;
2987 return rank;
2988 }
2989
2990 /* This function is called by clusterHandleSlaveFailover() in order to
2991 * let the slave log why it is not able to failover. Sometimes there are
2992 * not the conditions, but since the failover function is called again and
2993 * again, we can't log the same things continuously.
2994 *
2995 * This function works by logging only if a given set of conditions are
2996 * true:
2997 *
2998 * 1) The reason for which the failover can't be initiated changed.
2999 * The reasons also include a NONE reason we reset the state to
3000 * when the slave finds that its master is fine (no FAIL flag).
3001 * 2) Also, the log is emitted again if the master is still down and
3002 * the reason for not failing over is still the same, but more than
3003 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
3004 * 3) Finally, the function only logs if the slave is down for more than
3005 * five seconds + NODE_TIMEOUT. This way nothing is logged when a
3006 * failover starts in a reasonable time.
3007 *
3008 * The function is called with the reason why the slave can't failover
3009 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
3010 *
3011 * The function is guaranteed to be called only if 'myself' is a slave. */
clusterLogCantFailover(int reason)3012 void clusterLogCantFailover(int reason) {
3013 char *msg;
3014 static time_t lastlog_time = 0;
3015 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
3016
3017 /* Don't log if we have the same reason for some time. */
3018 if (reason == server.cluster->cant_failover_reason &&
3019 time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
3020 return;
3021
3022 server.cluster->cant_failover_reason = reason;
3023
3024 /* We also don't emit any log if the master failed no long ago, the
3025 * goal of this function is to log slaves in a stalled condition for
3026 * a long time. */
3027 if (myself->slaveof &&
3028 nodeFailed(myself->slaveof) &&
3029 (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
3030
3031 switch(reason) {
3032 case CLUSTER_CANT_FAILOVER_DATA_AGE:
3033 msg = "Disconnected from master for longer than allowed. "
3034 "Please check the 'cluster-replica-validity-factor' configuration "
3035 "option.";
3036 break;
3037 case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
3038 msg = "Waiting the delay before I can start a new failover.";
3039 break;
3040 case CLUSTER_CANT_FAILOVER_EXPIRED:
3041 msg = "Failover attempt expired.";
3042 break;
3043 case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
3044 msg = "Waiting for votes, but majority still not reached.";
3045 break;
3046 default:
3047 msg = "Unknown reason code.";
3048 break;
3049 }
3050 lastlog_time = time(NULL);
3051 serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
3052 }
3053
3054 /* This function implements the final part of automatic and manual failovers,
3055 * where the slave grabs its master's hash slots, and propagates the new
3056 * configuration.
3057 *
3058 * Note that it's up to the caller to be sure that the node got a new
3059 * configuration epoch already. */
clusterFailoverReplaceYourMaster(void)3060 void clusterFailoverReplaceYourMaster(void) {
3061 int j;
3062 clusterNode *oldmaster = myself->slaveof;
3063
3064 if (nodeIsMaster(myself) || oldmaster == NULL) return;
3065
3066 /* 1) Turn this node into a master. */
3067 clusterSetNodeAsMaster(myself);
3068 replicationUnsetMaster();
3069
3070 /* 2) Claim all the slots assigned to our master. */
3071 for (j = 0; j < CLUSTER_SLOTS; j++) {
3072 if (clusterNodeGetSlotBit(oldmaster,j)) {
3073 clusterDelSlot(j);
3074 clusterAddSlot(myself,j);
3075 }
3076 }
3077
3078 /* 3) Update state and save config. */
3079 clusterUpdateState();
3080 clusterSaveConfigOrDie(1);
3081
3082 /* 4) Pong all the other nodes so that they can update the state
3083 * accordingly and detect that we switched to master role. */
3084 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
3085
3086 /* 5) If there was a manual failover in progress, clear the state. */
3087 resetManualFailover();
3088 }
3089
3090 /* This function is called if we are a slave node and our master serving
3091 * a non-zero amount of hash slots is in FAIL state.
3092 *
 * The goal of this function is:
3094 * 1) To check if we are able to perform a failover, is our data updated?
3095 * 2) Try to get elected by masters.
3096 * 3) Perform the failover informing all the other nodes.
3097 */
clusterHandleSlaveFailover(void)3098 void clusterHandleSlaveFailover(void) {
3099 mstime_t data_age;
3100 mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
3101 int needed_quorum = (server.cluster->size / 2) + 1;
3102 int manual_failover = server.cluster->mf_end != 0 &&
3103 server.cluster->mf_can_start;
3104 mstime_t auth_timeout, auth_retry_time;
3105
3106 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
3107
3108 /* Compute the failover timeout (the max time we have to send votes
3109 * and wait for replies), and the failover retry time (the time to wait
3110 * before trying to get voted again).
3111 *
3112 * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
3113 * Retry is two times the Timeout.
3114 */
3115 auth_timeout = server.cluster_node_timeout*2;
3116 if (auth_timeout < 2000) auth_timeout = 2000;
3117 auth_retry_time = auth_timeout*2;
3118
3119 /* Pre conditions to run the function, that must be met both in case
3120 * of an automatic or manual failover:
3121 * 1) We are a slave.
3122 * 2) Our master is flagged as FAIL, or this is a manual failover.
3123 * 3) We don't have the no failover configuration set, and this is
3124 * not a manual failover.
3125 * 4) It is serving slots. */
3126 if (nodeIsMaster(myself) ||
3127 myself->slaveof == NULL ||
3128 (!nodeFailed(myself->slaveof) && !manual_failover) ||
3129 (server.cluster_slave_no_failover && !manual_failover) ||
3130 myself->slaveof->numslots == 0)
3131 {
3132 /* There are no reasons to failover, so we set the reason why we
3133 * are returning without failing over to NONE. */
3134 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
3135 return;
3136 }
3137
3138 /* Set data_age to the number of seconds we are disconnected from
3139 * the master. */
3140 if (server.repl_state == REPL_STATE_CONNECTED) {
3141 data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
3142 * 1000;
3143 } else {
3144 data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
3145 }
3146
3147 /* Remove the node timeout from the data age as it is fine that we are
3148 * disconnected from our master at least for the time it was down to be
3149 * flagged as FAIL, that's the baseline. */
3150 if (data_age > server.cluster_node_timeout)
3151 data_age -= server.cluster_node_timeout;
3152
3153 /* Check if our data is recent enough according to the slave validity
3154 * factor configured by the user.
3155 *
3156 * Check bypassed for manual failovers. */
3157 if (server.cluster_slave_validity_factor &&
3158 data_age >
3159 (((mstime_t)server.repl_ping_slave_period * 1000) +
3160 (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
3161 {
3162 if (!manual_failover) {
3163 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
3164 return;
3165 }
3166 }
3167
3168 /* If the previous failover attempt timeout and the retry time has
3169 * elapsed, we can setup a new one. */
3170 if (auth_age > auth_retry_time) {
3171 server.cluster->failover_auth_time = mstime() +
3172 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
3173 random() % 500; /* Random delay between 0 and 500 milliseconds. */
3174 server.cluster->failover_auth_count = 0;
3175 server.cluster->failover_auth_sent = 0;
3176 server.cluster->failover_auth_rank = clusterGetSlaveRank();
3177 /* We add another delay that is proportional to the slave rank.
3178 * Specifically 1 second * rank. This way slaves that have a probably
3179 * less updated replication offset, are penalized. */
3180 server.cluster->failover_auth_time +=
3181 server.cluster->failover_auth_rank * 1000;
3182 /* However if this is a manual failover, no delay is needed. */
3183 if (server.cluster->mf_end) {
3184 server.cluster->failover_auth_time = mstime();
3185 server.cluster->failover_auth_rank = 0;
3186 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
3187 }
3188 serverLog(LL_WARNING,
3189 "Start of election delayed for %lld milliseconds "
3190 "(rank #%d, offset %lld).",
3191 server.cluster->failover_auth_time - mstime(),
3192 server.cluster->failover_auth_rank,
3193 replicationGetSlaveOffset());
3194 /* Now that we have a scheduled election, broadcast our offset
3195 * to all the other slaves so that they'll updated their offsets
3196 * if our offset is better. */
3197 clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
3198 return;
3199 }
3200
3201 /* It is possible that we received more updated offsets from other
3202 * slaves for the same master since we computed our election delay.
3203 * Update the delay if our rank changed.
3204 *
3205 * Not performed if this is a manual failover. */
3206 if (server.cluster->failover_auth_sent == 0 &&
3207 server.cluster->mf_end == 0)
3208 {
3209 int newrank = clusterGetSlaveRank();
3210 if (newrank > server.cluster->failover_auth_rank) {
3211 long long added_delay =
3212 (newrank - server.cluster->failover_auth_rank) * 1000;
3213 server.cluster->failover_auth_time += added_delay;
3214 server.cluster->failover_auth_rank = newrank;
3215 serverLog(LL_WARNING,
3216 "Replica rank updated to #%d, added %lld milliseconds of delay.",
3217 newrank, added_delay);
3218 }
3219 }
3220
3221 /* Return ASAP if we can't still start the election. */
3222 if (mstime() < server.cluster->failover_auth_time) {
3223 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
3224 return;
3225 }
3226
3227 /* Return ASAP if the election is too old to be valid. */
3228 if (auth_age > auth_timeout) {
3229 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
3230 return;
3231 }
3232
3233 /* Ask for votes if needed. */
3234 if (server.cluster->failover_auth_sent == 0) {
3235 server.cluster->currentEpoch++;
3236 server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
3237 serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
3238 (unsigned long long) server.cluster->currentEpoch);
3239 clusterRequestFailoverAuth();
3240 server.cluster->failover_auth_sent = 1;
3241 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3242 CLUSTER_TODO_UPDATE_STATE|
3243 CLUSTER_TODO_FSYNC_CONFIG);
3244 return; /* Wait for replies. */
3245 }
3246
3247 /* Check if we reached the quorum. */
3248 if (server.cluster->failover_auth_count >= needed_quorum) {
3249 /* We have the quorum, we can finally failover the master. */
3250
3251 serverLog(LL_WARNING,
3252 "Failover election won: I'm the new master.");
3253
3254 /* Update my configEpoch to the epoch of the election. */
3255 if (myself->configEpoch < server.cluster->failover_auth_epoch) {
3256 myself->configEpoch = server.cluster->failover_auth_epoch;
3257 serverLog(LL_WARNING,
3258 "configEpoch set to %llu after successful failover",
3259 (unsigned long long) myself->configEpoch);
3260 }
3261
3262 /* Take responsibility for the cluster slots. */
3263 clusterFailoverReplaceYourMaster();
3264 } else {
3265 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
3266 }
3267 }
3268
3269 /* -----------------------------------------------------------------------------
3270 * CLUSTER slave migration
3271 *
3272 * Slave migration is the process that allows a slave of a master that is
3273 * already covered by at least another slave, to "migrate" to a master that
3274 * is orphaned, that is, left with no working slaves.
3275 * ------------------------------------------------------------------------- */
3276
3277 /* This function is responsible to decide if this replica should be migrated
3278 * to a different (orphaned) master. It is called by the clusterCron() function
3279 * only if:
3280 *
3281 * 1) We are a slave node.
3282 * 2) It was detected that there is at least one orphaned master in
3283 * the cluster.
3284 * 3) We are a slave of one of the masters with the greatest number of
3285 * slaves.
3286 *
3287 * This checks are performed by the caller since it requires to iterate
3288 * the nodes anyway, so we spend time into clusterHandleSlaveMigration()
3289 * if definitely needed.
3290 *
3291 * The function is called with a pre-computed max_slaves, that is the max
3292 * number of working (not in FAIL state) slaves for a single master.
3293 *
3294 * Additional conditions for migration are examined inside the function.
3295 */
clusterHandleSlaveMigration(int max_slaves)3296 void clusterHandleSlaveMigration(int max_slaves) {
3297 int j, okslaves = 0;
3298 clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
3299 dictIterator *di;
3300 dictEntry *de;
3301
3302 /* Step 1: Don't migrate if the cluster state is not ok. */
3303 if (server.cluster->state != CLUSTER_OK) return;
3304
3305 /* Step 2: Don't migrate if my master will not be left with at least
3306 * 'migration-barrier' slaves after my migration. */
3307 if (mymaster == NULL) return;
3308 for (j = 0; j < mymaster->numslaves; j++)
3309 if (!nodeFailed(mymaster->slaves[j]) &&
3310 !nodeTimedOut(mymaster->slaves[j])) okslaves++;
3311 if (okslaves <= server.cluster_migration_barrier) return;
3312
3313 /* Step 3: Identify a candidate for migration, and check if among the
3314 * masters with the greatest number of ok slaves, I'm the one with the
3315 * smallest node ID (the "candidate slave").
3316 *
3317 * Note: this means that eventually a replica migration will occur
3318 * since slaves that are reachable again always have their FAIL flag
3319 * cleared, so eventually there must be a candidate. At the same time
3320 * this does not mean that there are no race conditions possible (two
3321 * slaves migrating at the same time), but this is unlikely to
3322 * happen, and harmless when happens. */
3323 candidate = myself;
3324 di = dictGetSafeIterator(server.cluster->nodes);
3325 while((de = dictNext(di)) != NULL) {
3326 clusterNode *node = dictGetVal(de);
3327 int okslaves = 0, is_orphaned = 1;
3328
3329 /* We want to migrate only if this master is working, orphaned, and
3330 * used to have slaves or if failed over a master that had slaves
3331 * (MIGRATE_TO flag). This way we only migrate to instances that were
3332 * supposed to have replicas. */
3333 if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
3334 if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
3335
3336 /* Check number of working slaves. */
3337 if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
3338 if (okslaves > 0) is_orphaned = 0;
3339
3340 if (is_orphaned) {
3341 if (!target && node->numslots > 0) target = node;
3342
3343 /* Track the starting time of the orphaned condition for this
3344 * master. */
3345 if (!node->orphaned_time) node->orphaned_time = mstime();
3346 } else {
3347 node->orphaned_time = 0;
3348 }
3349
3350 /* Check if I'm the slave candidate for the migration: attached
3351 * to a master with the maximum number of slaves and with the smallest
3352 * node ID. */
3353 if (okslaves == max_slaves) {
3354 for (j = 0; j < node->numslaves; j++) {
3355 if (memcmp(node->slaves[j]->name,
3356 candidate->name,
3357 CLUSTER_NAMELEN) < 0)
3358 {
3359 candidate = node->slaves[j];
3360 }
3361 }
3362 }
3363 }
3364 dictReleaseIterator(di);
3365
3366 /* Step 4: perform the migration if there is a target, and if I'm the
3367 * candidate, but only if the master is continuously orphaned for a
3368 * couple of seconds, so that during failovers, we give some time to
3369 * the natural slaves of this instance to advertise their switch from
3370 * the old master to the new one. */
3371 if (target && candidate == myself &&
3372 (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
3373 !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3374 {
3375 serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
3376 target->name);
3377 clusterSetMaster(target);
3378 }
3379 }
3380
3381 /* -----------------------------------------------------------------------------
3382 * CLUSTER manual failover
3383 *
 * These are the important steps performed by slaves during a manual failover:
 * 1) The user sends the CLUSTER FAILOVER command. The failover state is
 *    initialized setting mf_end to the millisecond unix time at which we'll
 *    abort the attempt.
3388 * 2) Slave sends a MFSTART message to the master requesting to pause clients
3389 * for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
3390 * When master is paused for manual failover, it also starts to flag
3391 * packets with CLUSTERMSG_FLAG0_PAUSED.
3392 * 3) Slave waits for master to send its replication offset flagged as PAUSED.
3393 * 4) If slave received the offset from the master, and its offset matches,
3394 * mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
3395 * the failover as usually, with the difference that the vote request
3396 * will be modified to force masters to vote for a slave that has a
3397 * working master.
3398 *
3399 * From the point of view of the master things are simpler: when a
3400 * PAUSE_CLIENTS packet is received the master sets mf_end as well and
3401 * the sender in mf_slave. During the time limit for the manual failover
3402 * the master will just send PINGs more often to this slave, flagged with
3403 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
3404 * a packet from the master with this flag set.
3405 *
 * The goal of the manual failover is to perform a fast failover without
 * data loss due to the asynchronous master-slave replication.
3408 * -------------------------------------------------------------------------- */
3409
3410 /* Reset the manual failover state. This works for both masters and slaves
3411 * as all the state about manual failover is cleared.
3412 *
3413 * The function can be used both to initialize the manual failover state at
3414 * startup or to abort a manual failover in progress. */
resetManualFailover(void)3415 void resetManualFailover(void) {
3416 if (server.cluster->mf_end && clientsArePaused()) {
3417 server.clients_pause_end_time = 0;
3418 clientsArePaused(); /* Just use the side effect of the function. */
3419 }
3420 server.cluster->mf_end = 0; /* No manual failover in progress. */
3421 server.cluster->mf_can_start = 0;
3422 server.cluster->mf_slave = NULL;
3423 server.cluster->mf_master_offset = 0;
3424 }
3425
3426 /* If a manual failover timed out, abort it. */
manualFailoverCheckTimeout(void)3427 void manualFailoverCheckTimeout(void) {
3428 if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3429 serverLog(LL_WARNING,"Manual failover timed out.");
3430 resetManualFailover();
3431 }
3432 }
3433
3434 /* This function is called from the cluster cron function in order to go
3435 * forward with a manual failover state machine. */
clusterHandleManualFailover(void)3436 void clusterHandleManualFailover(void) {
3437 /* Return ASAP if no manual failover is in progress. */
3438 if (server.cluster->mf_end == 0) return;
3439
3440 /* If mf_can_start is non-zero, the failover was already triggered so the
3441 * next steps are performed by clusterHandleSlaveFailover(). */
3442 if (server.cluster->mf_can_start) return;
3443
3444 if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
3445
3446 if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3447 /* Our replication offset matches the master replication offset
3448 * announced after clients were paused. We can start the failover. */
3449 server.cluster->mf_can_start = 1;
3450 serverLog(LL_WARNING,
3451 "All master replication stream processed, "
3452 "manual failover can start.");
3453 }
3454 }
3455
3456 /* -----------------------------------------------------------------------------
3457 * CLUSTER cron job
3458 * -------------------------------------------------------------------------- */
3459
/* Cluster periodic job. This is executed 10 times every second.
 *
 * In order, the function:
 *
 * 1) Keeps myself->ip in sync with the cluster-announce-ip option.
 * 2) Refreshes our own flags via clusterUpdateMyselfFlags().
 * 3) Walks the nodes table to count PFAIL nodes, drop handshake nodes that
 *    are too old, and create a link for every node lacking one.
 * 4) Once every 10 calls, samples a few random nodes and pings the one
 *    with the oldest pong_received time.
 * 5) Walks the nodes table again to collect the replica migration stats
 *    (orphaned masters, max ok slaves per master, ok slaves of our master),
 *    free links that look stuck, send pings, and flag unreachable nodes
 *    as PFAIL.
 * 6) If we are a slave with replication still turned off, points it at
 *    our master.
 * 7) Handles manual failover timeout and, for slaves, manual failover,
 *    failover and replica migration.
 * 8) Calls clusterUpdateState() if something changed or the cluster state
 *    is CLUSTER_FAIL. */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    static unsigned long long iteration = 0;
    mstime_t handshake_timeout;

    iteration++; /* Number of times this function was called so far. */

    /* We want to take myself->ip in sync with the cluster-announce-ip option.
     * The option can be set at runtime via CONFIG SET, so we periodically check
     * if the option changed to reflect this into myself->ip. */
    {
        /* Private copy of the value seen at the previous change; it is static
         * so it survives across calls. */
        static char *prev_ip = NULL;
        char *curr_ip = server.cluster_announce_ip;
        int changed = 0;

        /* Changed if exactly one of the two is NULL, or if both are set
         * but the strings differ. */
        if (prev_ip == NULL && curr_ip != NULL) changed = 1;
        else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
        else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

        if (changed) {
            if (prev_ip) zfree(prev_ip);
            /* Here prev_ip temporarily aliases the configuration string (or
             * becomes NULL); when non-NULL it is replaced by a private copy
             * just below. */
            prev_ip = curr_ip;

            if (curr_ip) {
                /* We always take a copy of the previous IP address, by
                 * duplicating the string. This way later we can check if
                 * the address really changed. */
                prev_ip = zstrdup(prev_ip);
                strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
                /* strncpy() does not terminate on truncation: do it here. */
                myself->ip[NET_IP_STR_LEN-1] = '\0';
            } else {
                myself->ip[0] = '\0'; /* Force autodetection. */
            }
        }
    }

    /* The handshake timeout is the time after which a handshake node that was
     * not turned into a normal node is removed from the nodes. Usually it is
     * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
     * the value of 1 second. */
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    /* Update myself flags. */
    clusterUpdateMyselfFlags();

    /* Check if we have disconnected nodes and re-establish the connection.
     * Also update a few stats while we are here, that can be used to make
     * better decisions in other part of the code. */
    di = dictGetSafeIterator(server.cluster->nodes);
    server.cluster->stats_pfail_nodes = 0;
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Not interested in reconnecting the link with myself or nodes
         * for which we have no address. */
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;

        if (node->flags & CLUSTER_NODE_PFAIL)
            server.cluster->stats_pfail_nodes++;

        /* A Node in HANDSHAKE state has a limited lifespan equal to the
         * configured node timeout (but never less than one second, see
         * handshake_timeout above). */
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }

        if (node->link == NULL) {
            /* No link for this node: create one over TLS or a plain socket
             * depending on server.tls_cluster, and start connecting. */
            clusterLink *link = createClusterLink(node);
            link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
            connSetPrivateData(link->conn, link);
            if (connConnect(link->conn, node->ip, node->cport, NET_FIRST_BIND_ADDR,
                        clusterLinkConnectHandler) == -1) {
                /* We got a synchronous error from connect before
                 * clusterSendPing() had a chance to be called.
                 * If node->ping_sent is zero, failure detection can't work,
                 * so we claim we actually sent a ping now (that will
                 * be really sent as soon as the link is obtained). */
                if (node->ping_sent == 0) node->ping_sent = mstime();
                serverLog(LL_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->cport, server.neterr);

                freeClusterLink(link);
                continue;
            }
            node->link = link;
        }
    }
    dictReleaseIterator(di);

    /* Ping some random node 1 time every 10 iterations, so that we usually ping
     * one random node every second. */
    if (!(iteration % 10)) {
        int j;

        /* Check a few random nodes and ping the one with the oldest
         * pong_received time. */
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            if (this->link == NULL || this->ping_sent != 0) continue;
            /* Skip ourselves and nodes still in handshake. */
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

    /* Iterate nodes to check if we need to flag something as failing.
     * This loop is also responsible to:
     * 1) Check if there are orphaned masters (masters without non failing
     *    slaves).
     * 2) Count the max number of non failing slaves for a single master.
     * 3) Count the number of slaves for our master, if we are a slave. */
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */

        if (node->flags &
            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* A master is orphaned if it is serving a non-zero number of
             * slots, have no working slaves, but used to have at least one
             * slave, or failed over a master that used to have slaves. */
            if (okslaves == 0 && node->numslots > 0 &&
                node->flags & CLUSTER_NODE_MIGRATE_TO)
            {
                orphaned_masters++;
            }
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }

        /* If we are not receiving any data for more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        mstime_t ping_delay = now - node->ping_sent;
        mstime_t data_delay = now - node->data_received;
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            /* and we are waiting for the pong more than timeout/2 */
            ping_delay > server.cluster_node_timeout/2 &&
            /* and in such interval we are not seeing any traffic at all. */
            data_delay > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            freeClusterLink(node->link);
        }

        /* If we have currently no active ping in this instance, and the
         * received PONG is older than half the cluster timeout, send
         * a new ping now, to ensure all the nodes are pinged without
         * a too big delay. */
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* If we are a master and one of the slaves requested a manual
         * failover, ping it continuously. */
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* Check only if we have an active ping for this instance. */
        if (node->ping_sent == 0) continue;

        /* Check if this node looks unreachable.
         * Note that if we already received the PONG, then node->ping_sent
         * is zero, so can't reach this code at all, so we don't risk of
         * checking for a PONG delay if we didn't sent the PING.
         *
         * We also consider every incoming data as proof of liveness, since
         * our cluster bus link is also used for data: under heavy data
         * load pong delays are possible. */
        mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
                                                          data_delay;

        if (node_delay > server.cluster_node_timeout) {
            /* Timeout reached. Set the node as possibly failing if it is
             * not already in this state. */
            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
                serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                node->flags |= CLUSTER_NODE_PFAIL;
                /* The new PFAIL flag requires the cluster state to be
                 * re-evaluated at the end of this function. */
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);

    /* If we are a slave node but the replication is still turned off,
     * enable it if we know the address of our master and it appears to
     * be up. */
    if (nodeIsSlave(myself) &&
        server.masterhost == NULL &&
        myself->slaveof &&
        nodeHasAddr(myself->slaveof))
    {
        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
    }

    /* Abort a manual failover if the timeout is reached. */
    manualFailoverCheckTimeout();

    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        /* Automatic failover is skipped when a module set the
         * NO_FAILOVER flag. */
        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
            clusterHandleSlaveFailover();
        /* If there are orphaned masters, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }

    /* Re-evaluate the cluster state if a node was just flagged PFAIL, and
     * on every call for as long as the state is CLUSTER_FAIL. */
    if (update_state || server.cluster->state == CLUSTER_FAIL)
        clusterUpdateState();
}
3720
3721 /* This function is called before the event handler returns to sleep for
3722 * events. It is useful to perform operations that must be done ASAP in
3723 * reaction to events fired but that are not safe to perform inside event
3724 * handlers, or to perform potentially expansive tasks that we need to do
3725 * a single time before replying to clients. */
clusterBeforeSleep(void)3726 void clusterBeforeSleep(void) {
3727 /* Handle failover, this is needed when it is likely that there is already
3728 * the quorum from masters in order to react fast. */
3729 if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
3730 clusterHandleSlaveFailover();
3731
3732 /* Update the cluster state. */
3733 if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
3734 clusterUpdateState();
3735
3736 /* Save the config, possibly using fsync. */
3737 if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
3738 int fsync = server.cluster->todo_before_sleep &
3739 CLUSTER_TODO_FSYNC_CONFIG;
3740 clusterSaveConfigOrDie(fsync);
3741 }
3742
3743 /* Reset our flags (not strictly needed since every single function
3744 * called for flags set should be able to clear its flag). */
3745 server.cluster->todo_before_sleep = 0;
3746 }
3747
clusterDoBeforeSleep(int flags)3748 void clusterDoBeforeSleep(int flags) {
3749 server.cluster->todo_before_sleep |= flags;
3750 }
3751
3752 /* -----------------------------------------------------------------------------
3753 * Slots management
3754 * -------------------------------------------------------------------------- */
3755
/* Return 1 if the bit at position 'pos' of the generic bitmap 'bitmap' is
 * set, 0 otherwise. Bit 'pos' lives in byte pos/8, at bit index pos&7
 * counting from the least significant bit. */
int bitmapTestBit(unsigned char *bitmap, int pos) {
    off_t idx = pos / 8;
    unsigned char mask = (unsigned char)(1 << (pos & 7));

    return (bitmap[idx] & mask) ? 1 : 0;
}
3763
/* Turn on the bit at position 'pos' of 'bitmap'. All the other bits are
 * left untouched. */
void bitmapSetBit(unsigned char *bitmap, int pos) {
    off_t idx = pos / 8;
    unsigned char mask = (unsigned char)(1 << (pos & 7));

    bitmap[idx] = bitmap[idx] | mask;
}
3770
/* Turn off the bit at position 'pos' of 'bitmap'. All the other bits are
 * left untouched. */
void bitmapClearBit(unsigned char *bitmap, int pos) {
    off_t idx = pos / 8;
    unsigned char mask = (unsigned char)(1 << (pos & 7));

    bitmap[idx] = bitmap[idx] & (unsigned char)~mask;
}
3777
3778 /* Return non-zero if there is at least one master with slaves in the cluster.
3779 * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3780 * MIGRATE_TO flag the when a master gets the first slot. */
clusterMastersHaveSlaves(void)3781 int clusterMastersHaveSlaves(void) {
3782 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3783 dictEntry *de;
3784 int slaves = 0;
3785 while((de = dictNext(di)) != NULL) {
3786 clusterNode *node = dictGetVal(de);
3787
3788 if (nodeIsSlave(node)) continue;
3789 slaves += node->numslaves;
3790 }
3791 dictReleaseIterator(di);
3792 return slaves != 0;
3793 }
3794
3795 /* Set the slot bit and return the old value. */
clusterNodeSetSlotBit(clusterNode * n,int slot)3796 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3797 int old = bitmapTestBit(n->slots,slot);
3798 bitmapSetBit(n->slots,slot);
3799 if (!old) {
3800 n->numslots++;
3801 /* When a master gets its first slot, even if it has no slaves,
3802 * it gets flagged with MIGRATE_TO, that is, the master is a valid
3803 * target for replicas migration, if and only if at least one of
3804 * the other masters has slaves right now.
3805 *
3806 * Normally masters are valid targets of replica migration if:
3807 * 1. The used to have slaves (but no longer have).
3808 * 2. They are slaves failing over a master that used to have slaves.
3809 *
3810 * However new masters with slots assigned are considered valid
3811 * migration targets if the rest of the cluster is not a slave-less.
3812 *
3813 * See https://github.com/antirez/redis/issues/3043 for more info. */
3814 if (n->numslots == 1 && clusterMastersHaveSlaves())
3815 n->flags |= CLUSTER_NODE_MIGRATE_TO;
3816 }
3817 return old;
3818 }
3819
3820 /* Clear the slot bit and return the old value. */
clusterNodeClearSlotBit(clusterNode * n,int slot)3821 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3822 int old = bitmapTestBit(n->slots,slot);
3823 bitmapClearBit(n->slots,slot);
3824 if (old) n->numslots--;
3825 return old;
3826 }
3827
3828 /* Return the slot bit from the cluster node structure. */
clusterNodeGetSlotBit(clusterNode * n,int slot)3829 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3830 return bitmapTestBit(n->slots,slot);
3831 }
3832
3833 /* Add the specified slot to the list of slots that node 'n' will
3834 * serve. Return C_OK if the operation ended with success.
3835 * If the slot is already assigned to another instance this is considered
3836 * an error and C_ERR is returned. */
clusterAddSlot(clusterNode * n,int slot)3837 int clusterAddSlot(clusterNode *n, int slot) {
3838 if (server.cluster->slots[slot]) return C_ERR;
3839 clusterNodeSetSlotBit(n,slot);
3840 server.cluster->slots[slot] = n;
3841 return C_OK;
3842 }
3843
3844 /* Delete the specified slot marking it as unassigned.
3845 * Returns C_OK if the slot was assigned, otherwise if the slot was
3846 * already unassigned C_ERR is returned. */
clusterDelSlot(int slot)3847 int clusterDelSlot(int slot) {
3848 clusterNode *n = server.cluster->slots[slot];
3849
3850 if (!n) return C_ERR;
3851 serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3852 server.cluster->slots[slot] = NULL;
3853 return C_OK;
3854 }
3855
3856 /* Delete all the slots associated with the specified node.
3857 * The number of deleted slots is returned. */
clusterDelNodeSlots(clusterNode * node)3858 int clusterDelNodeSlots(clusterNode *node) {
3859 int deleted = 0, j;
3860
3861 for (j = 0; j < CLUSTER_SLOTS; j++) {
3862 if (clusterNodeGetSlotBit(node,j)) {
3863 clusterDelSlot(j);
3864 deleted++;
3865 }
3866 }
3867 return deleted;
3868 }
3869
3870 /* Clear the migrating / importing state for all the slots.
3871 * This is useful at initialization and when turning a master into slave. */
clusterCloseAllSlots(void)3872 void clusterCloseAllSlots(void) {
3873 memset(server.cluster->migrating_slots_to,0,
3874 sizeof(server.cluster->migrating_slots_to));
3875 memset(server.cluster->importing_slots_from,0,
3876 sizeof(server.cluster->importing_slots_from));
3877 }
3878
3879 /* -----------------------------------------------------------------------------
3880 * Cluster state evaluation function
3881 * -------------------------------------------------------------------------- */
3882
/* The following are defines that are only used in the evaluation function
 * and are based on heuristics. Actually the main point about the rejoin and
 * writable delay is that they should be a few orders of magnitude larger
 * than the network latency.
 *
 * All the values are in milliseconds: they are compared against differences
 * of mstime() values in clusterUpdateState(). */

/* Upper and lower bound applied to the rejoin delay, which starts from the
 * configured node timeout. */
#define CLUSTER_MAX_REJOIN_DELAY 5000
#define CLUSTER_MIN_REJOIN_DELAY 500
/* Time, counted from the first clusterUpdateState() call, during which a
 * master in FAIL state does not re-evaluate the cluster state. */
#define CLUSTER_WRITABLE_DELAY 2000
3890
clusterUpdateState(void)3891 void clusterUpdateState(void) {
3892 int j, new_state;
3893 int reachable_masters = 0;
3894 static mstime_t among_minority_time;
3895 static mstime_t first_call_time = 0;
3896
3897 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3898
3899 /* If this is a master node, wait some time before turning the state
3900 * into OK, since it is not a good idea to rejoin the cluster as a writable
3901 * master, after a reboot, without giving the cluster a chance to
3902 * reconfigure this node. Note that the delay is calculated starting from
3903 * the first call to this function and not since the server start, in order
3904 * to don't count the DB loading time. */
3905 if (first_call_time == 0) first_call_time = mstime();
3906 if (nodeIsMaster(myself) &&
3907 server.cluster->state == CLUSTER_FAIL &&
3908 mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
3909
3910 /* Start assuming the state is OK. We'll turn it into FAIL if there
3911 * are the right conditions. */
3912 new_state = CLUSTER_OK;
3913
3914 /* Check if all the slots are covered. */
3915 if (server.cluster_require_full_coverage) {
3916 for (j = 0; j < CLUSTER_SLOTS; j++) {
3917 if (server.cluster->slots[j] == NULL ||
3918 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3919 {
3920 new_state = CLUSTER_FAIL;
3921 break;
3922 }
3923 }
3924 }
3925
3926 /* Compute the cluster size, that is the number of master nodes
3927 * serving at least a single slot.
3928 *
3929 * At the same time count the number of reachable masters having
3930 * at least one slot. */
3931 {
3932 dictIterator *di;
3933 dictEntry *de;
3934
3935 server.cluster->size = 0;
3936 di = dictGetSafeIterator(server.cluster->nodes);
3937 while((de = dictNext(di)) != NULL) {
3938 clusterNode *node = dictGetVal(de);
3939
3940 if (nodeIsMaster(node) && node->numslots) {
3941 server.cluster->size++;
3942 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3943 reachable_masters++;
3944 }
3945 }
3946 dictReleaseIterator(di);
3947 }
3948
3949 /* If we are in a minority partition, change the cluster state
3950 * to FAIL. */
3951 {
3952 int needed_quorum = (server.cluster->size / 2) + 1;
3953
3954 if (reachable_masters < needed_quorum) {
3955 new_state = CLUSTER_FAIL;
3956 among_minority_time = mstime();
3957 }
3958 }
3959
3960 /* Log a state change */
3961 if (new_state != server.cluster->state) {
3962 mstime_t rejoin_delay = server.cluster_node_timeout;
3963
3964 /* If the instance is a master and was partitioned away with the
3965 * minority, don't let it accept queries for some time after the
3966 * partition heals, to make sure there is enough time to receive
3967 * a configuration update. */
3968 if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
3969 rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
3970 if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
3971 rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
3972
3973 if (new_state == CLUSTER_OK &&
3974 nodeIsMaster(myself) &&
3975 mstime() - among_minority_time < rejoin_delay)
3976 {
3977 return;
3978 }
3979
3980 /* Change the state and log the event. */
3981 serverLog(LL_WARNING,"Cluster state changed: %s",
3982 new_state == CLUSTER_OK ? "ok" : "fail");
3983 server.cluster->state = new_state;
3984 }
3985 }
3986
3987 /* This function is called after the node startup in order to verify that data
3988 * loaded from disk is in agreement with the cluster configuration:
3989 *
3990 * 1) If we find keys about hash slots we have no responsibility for, the
3991 * following happens:
3992 * A) If no other node is in charge according to the current cluster
3993 * configuration, we add these slots to our node.
3994 * B) If according to our config other nodes are already in charge for
3995 * this slots, we set the slots as IMPORTING from our point of view
3996 * in order to justify we have those slots, and in order to make
3997 * redis-trib aware of the issue, so that it can try to fix it.
3998 * 2) If we find data in a DB different than DB0 we return C_ERR to
3999 * signal the caller it should quit the server with an error message
4000 * or take other actions.
4001 *
4002 * The function always returns C_OK even if it will try to correct
4003 * the error described in "1". However if data is found in DB different
4004 * from DB0, C_ERR is returned.
4005 *
4006 * The function also uses the logging facility in order to warn the user
4007 * about desynchronizations between the data we have in memory and the
4008 * cluster configuration. */
verifyClusterConfigWithData(void)4009 int verifyClusterConfigWithData(void) {
4010 int j;
4011 int update_config = 0;
4012
4013 /* Return ASAP if a module disabled cluster redirections. In that case
4014 * every master can store keys about every possible hash slot. */
4015 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
4016 return C_OK;
4017
4018 /* If this node is a slave, don't perform the check at all as we
4019 * completely depend on the replication stream. */
4020 if (nodeIsSlave(myself)) return C_OK;
4021
4022 /* Make sure we only have keys in DB0. */
4023 for (j = 1; j < server.dbnum; j++) {
4024 if (dictSize(server.db[j].dict)) return C_ERR;
4025 }
4026
4027 /* Check that all the slots we see populated memory have a corresponding
4028 * entry in the cluster table. Otherwise fix the table. */
4029 for (j = 0; j < CLUSTER_SLOTS; j++) {
4030 if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
4031 /* Check if we are assigned to this slot or if we are importing it.
4032 * In both cases check the next slot as the configuration makes
4033 * sense. */
4034 if (server.cluster->slots[j] == myself ||
4035 server.cluster->importing_slots_from[j] != NULL) continue;
4036
4037 /* If we are here data and cluster config don't agree, and we have
4038 * slot 'j' populated even if we are not importing it, nor we are
4039 * assigned to this slot. Fix this condition. */
4040
4041 update_config++;
4042 /* Case A: slot is unassigned. Take responsibility for it. */
4043 if (server.cluster->slots[j] == NULL) {
4044 serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
4045 "Taking responsibility for it.",j);
4046 clusterAddSlot(myself,j);
4047 } else {
4048 serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
4049 "assigned to another node. "
4050 "Setting it to importing state.",j);
4051 server.cluster->importing_slots_from[j] = server.cluster->slots[j];
4052 }
4053 }
4054 if (update_config) clusterSaveConfigOrDie(1);
4055 return C_OK;
4056 }
4057
4058 /* -----------------------------------------------------------------------------
4059 * SLAVE nodes handling
4060 * -------------------------------------------------------------------------- */
4061
4062 /* Set the specified node 'n' as master for this node.
4063 * If this node is currently a master, it is turned into a slave. */
clusterSetMaster(clusterNode * n)4064 void clusterSetMaster(clusterNode *n) {
4065 serverAssert(n != myself);
4066 serverAssert(myself->numslots == 0);
4067
4068 if (nodeIsMaster(myself)) {
4069 myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
4070 myself->flags |= CLUSTER_NODE_SLAVE;
4071 clusterCloseAllSlots();
4072 } else {
4073 if (myself->slaveof)
4074 clusterNodeRemoveSlave(myself->slaveof,myself);
4075 }
4076 myself->slaveof = n;
4077 clusterNodeAddSlave(n,myself);
4078 replicationSetMaster(n->ip, n->port);
4079 resetManualFailover();
4080 }
4081
4082 /* -----------------------------------------------------------------------------
4083 * Nodes to string representation functions.
4084 * -------------------------------------------------------------------------- */
4085
4086 struct redisNodeFlags {
4087 uint16_t flag;
4088 char *name;
4089 };
4090
4091 static struct redisNodeFlags redisNodeFlagsTable[] = {
4092 {CLUSTER_NODE_MYSELF, "myself,"},
4093 {CLUSTER_NODE_MASTER, "master,"},
4094 {CLUSTER_NODE_SLAVE, "slave,"},
4095 {CLUSTER_NODE_PFAIL, "fail?,"},
4096 {CLUSTER_NODE_FAIL, "fail,"},
4097 {CLUSTER_NODE_HANDSHAKE, "handshake,"},
4098 {CLUSTER_NODE_NOADDR, "noaddr,"},
4099 {CLUSTER_NODE_NOFAILOVER, "nofailover,"}
4100 };
4101
4102 /* Concatenate the comma separated list of node flags to the given SDS
4103 * string 'ci'. */
representClusterNodeFlags(sds ci,uint16_t flags)4104 sds representClusterNodeFlags(sds ci, uint16_t flags) {
4105 size_t orig_len = sdslen(ci);
4106 int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
4107 for (i = 0; i < size; i++) {
4108 struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
4109 if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
4110 }
4111 /* If no flag was added, add the "noflags" special flag. */
4112 if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
4113 sdsIncrLen(ci,-1); /* Remove trailing comma. */
4114 return ci;
4115 }
4116
4117 /* Generate a csv-alike representation of the specified cluster node.
4118 * See clusterGenNodesDescription() top comment for more information.
4119 *
4120 * The function returns the string representation as an SDS string. */
clusterGenNodeDescription(clusterNode * node)4121 sds clusterGenNodeDescription(clusterNode *node) {
4122 int j, start;
4123 sds ci;
4124
4125 /* Node coordinates */
4126 ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
4127 node->name,
4128 node->ip,
4129 node->port,
4130 node->cport);
4131
4132 /* Flags */
4133 ci = representClusterNodeFlags(ci, node->flags);
4134
4135 /* Slave of... or just "-" */
4136 if (node->slaveof)
4137 ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
4138 else
4139 ci = sdscatlen(ci," - ",3);
4140
4141 unsigned long long nodeEpoch = node->configEpoch;
4142 if (nodeIsSlave(node) && node->slaveof) {
4143 nodeEpoch = node->slaveof->configEpoch;
4144 }
4145 /* Latency from the POV of this node, config epoch, link status */
4146 ci = sdscatprintf(ci,"%lld %lld %llu %s",
4147 (long long) node->ping_sent,
4148 (long long) node->pong_received,
4149 nodeEpoch,
4150 (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
4151 "connected" : "disconnected");
4152
4153 /* Slots served by this instance */
4154 start = -1;
4155 for (j = 0; j < CLUSTER_SLOTS; j++) {
4156 int bit;
4157
4158 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4159 if (start == -1) start = j;
4160 }
4161 if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4162 if (bit && j == CLUSTER_SLOTS-1) j++;
4163
4164 if (start == j-1) {
4165 ci = sdscatprintf(ci," %d",start);
4166 } else {
4167 ci = sdscatprintf(ci," %d-%d",start,j-1);
4168 }
4169 start = -1;
4170 }
4171 }
4172
4173 /* Just for MYSELF node we also dump info about slots that
4174 * we are migrating to other instances or importing from other
4175 * instances. */
4176 if (node->flags & CLUSTER_NODE_MYSELF) {
4177 for (j = 0; j < CLUSTER_SLOTS; j++) {
4178 if (server.cluster->migrating_slots_to[j]) {
4179 ci = sdscatprintf(ci," [%d->-%.40s]",j,
4180 server.cluster->migrating_slots_to[j]->name);
4181 } else if (server.cluster->importing_slots_from[j]) {
4182 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
4183 server.cluster->importing_slots_from[j]->name);
4184 }
4185 }
4186 }
4187 return ci;
4188 }
4189
4190 /* Generate a csv-alike representation of the nodes we are aware of,
4191 * including the "myself" node, and return an SDS string containing the
4192 * representation (it is up to the caller to free it).
4193 *
4194 * All the nodes matching at least one of the node flags specified in
4195 * "filter" are excluded from the output, so using zero as a filter will
4196 * include all the known nodes in the representation, including nodes in
4197 * the HANDSHAKE state.
4198 *
4199 * The representation obtained using this function is used for the output
4200 * of the CLUSTER NODES function, and as format for the cluster
4201 * configuration file (nodes.conf) for a given node. */
clusterGenNodesDescription(int filter)4202 sds clusterGenNodesDescription(int filter) {
4203 sds ci = sdsempty(), ni;
4204 dictIterator *di;
4205 dictEntry *de;
4206
4207 di = dictGetSafeIterator(server.cluster->nodes);
4208 while((de = dictNext(di)) != NULL) {
4209 clusterNode *node = dictGetVal(de);
4210
4211 if (node->flags & filter) continue;
4212 ni = clusterGenNodeDescription(node);
4213 ci = sdscatsds(ci,ni);
4214 sdsfree(ni);
4215 ci = sdscatlen(ci,"\n",1);
4216 }
4217 dictReleaseIterator(di);
4218 return ci;
4219 }
4220
4221 /* -----------------------------------------------------------------------------
4222 * CLUSTER command
4223 * -------------------------------------------------------------------------- */
4224
clusterGetMessageTypeString(int type)4225 const char *clusterGetMessageTypeString(int type) {
4226 switch(type) {
4227 case CLUSTERMSG_TYPE_PING: return "ping";
4228 case CLUSTERMSG_TYPE_PONG: return "pong";
4229 case CLUSTERMSG_TYPE_MEET: return "meet";
4230 case CLUSTERMSG_TYPE_FAIL: return "fail";
4231 case CLUSTERMSG_TYPE_PUBLISH: return "publish";
4232 case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
4233 case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
4234 case CLUSTERMSG_TYPE_UPDATE: return "update";
4235 case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
4236 case CLUSTERMSG_TYPE_MODULE: return "module";
4237 }
4238 return "unknown";
4239 }
4240
/* Parse the object 'o' as a hash slot number.
 *
 * On success the slot, in the range 0..CLUSTER_SLOTS-1, is returned.
 * If the object is not an integer or is out of range, an error is sent to
 * the client 'c' and -1 is returned. */
int getSlotOrReply(client *c, robj *o) {
    long long slot;
    int valid = getLongLongFromObject(o,&slot) == C_OK &&
                slot >= 0 &&
                slot < CLUSTER_SLOTS;

    if (!valid) {
        addReplyError(c,"Invalid or out of range slot");
        return -1;
    }
    return (int) slot;
}
4252
/* Emit to client 'c' the slots to nodes mapping, as a nested array with one
 * element per contiguous range of slots served by the same master:
 *
 *   1) 1) start slot
 *      2) end slot
 *      3) 1) master IP
 *         2) master port
 *         3) master node ID
 *      4) 1) replica IP
 *         2) replica port
 *         3) replica node ID
 *      ... one more entry for each additional replica ...
 *
 * Only masters serving at least one slot are considered. Replicas for which
 * nodeFailed() is true are not listed. A master serving several separate
 * ranges produces one element per range, each one repeating the master and
 * replicas entries. */
void clusterReplyMultiBulkSlots(client *c) {
    int range_count = 0;
    void *outer_len = addReplyDeferredLen(c);
    dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *de;

    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int healthy_replicas = 0;
        int r;

        /* Slaves are listed under their master, and masters without
         * slots have nothing to report. */
        if (!nodeIsMaster(node) || node->numslots == 0) continue;

        /* The length of each range element depends on how many replicas
         * are going to be listed. */
        for (r = 0; r < node->numslaves; r++) {
            if (!nodeFailed(node->slaves[r])) healthy_replicas++;
        }

        int slot = 0;
        while (slot < CLUSTER_SLOTS) {
            if (!clusterNodeGetSlotBit(node,slot)) {
                slot++;
                continue;
            }

            /* 'slot' starts a run: advance up to the first slot not
             * served by this node. */
            int first = slot++;
            while (slot < CLUSTER_SLOTS && clusterNodeGetSlotBit(node,slot)) {
                slot++;
            }
            int last = slot-1;

            /* Range boundaries (2) + master entry (1) + replicas. */
            addReplyArrayLen(c, healthy_replicas + 3);
            addReplyLongLong(c, first);
            addReplyLongLong(c, last);

            /* The first node entry is always the master. */
            addReplyArrayLen(c, 3);
            addReplyBulkCString(c, node->ip);
            addReplyLongLong(c, node->port);
            addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);

            /* Then the replicas not in failed state. */
            for (r = 0; r < node->numslaves; r++) {
                clusterNode *replica = node->slaves[r];

                if (nodeFailed(replica)) continue;
                addReplyArrayLen(c, 3);
                addReplyBulkCString(c, replica->ip);
                addReplyLongLong(c, replica->port);
                addReplyBulkCBuffer(c, replica->name, CLUSTER_NAMELEN);
            }
            range_count++;
        }
    }
    dictReleaseIterator(di);
    setDeferredArrayLen(c, outer_len, range_count);
}
4329
/* CLUSTER <subcommand> [<arg> ...]
 *
 * Entry point of the CLUSTER command. The subcommand name in c->argv[1] is
 * matched case-insensitively, together with the number of arguments, and
 * the matching branch below is executed. The list of subcommands is the one
 * returned by CLUSTER HELP.
 *
 * If the instance is not running with cluster support enabled an error is
 * returned before any subcommand is examined. If no branch matches the
 * subcommand and argument count, a subcommand syntax error is returned.
 *
 * Branches that modify the slots table, the nodes table or the config epoch
 * schedule a state update and a configuration save via
 * clusterDoBeforeSleep(). */
void clusterCommand(client *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        const char *help[] = {
"ADDSLOTS <slot> [slot ...] -- Assign slots to current node.",
"BUMPEPOCH -- Advance the cluster config epoch.",
"COUNT-failure-reports <node-id> -- Return number of failure reports for <node-id>.",
"COUNTKEYSINSLOT <slot> - Return the number of keys in <slot>.",
"DELSLOTS <slot> [slot ...] -- Delete slots information from current node.",
"FAILOVER [force|takeover] -- Promote current replica node to being a master.",
"FORGET <node-id> -- Remove a node from the cluster.",
"GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
"FLUSHSLOTS -- Delete current node own slots information.",
"INFO - Return information about the cluster.",
"KEYSLOT <key> -- Return the hash slot for <key>.",
"MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
"MYID -- Return the node id.",
"NODES -- Return cluster configuration seen by node. Output format:",
" <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ... <slot>",
"REPLICATE <node-id> -- Configure current node as replica to <node-id>.",
"RESET [hard|soft] -- Reset current node (default: soft).",
"SET-config-epoch <epoch> - Set config epoch of current node.",
"SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
"REPLICAS <node-id> -- Return <node-id> replicas.",
"SAVECONFIG - Force saving cluster configuration on disk.",
"SLOTS -- Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
        /* CLUSTER MEET <ip> <port> [cport] */
        long long port, cport;

        if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
            addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }

        if (c->argc == 5) {
            if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
                addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
                                    (char*)c->argv[4]->ptr);
                return;
            }
        } else {
            /* No bus port given: derive it from the base port. */
            cport = port + CLUSTER_PORT_INCR;
        }

        if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
            errno == EINVAL)
        {
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
        /* CLUSTER NODES */
        sds nodes = clusterGenNodesDescription(0);
        addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
        sdsfree(nodes);
    } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
        /* CLUSTER MYID */
        addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
    } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
        /* CLUSTER SLOTS */
        clusterReplyMultiBulkSlots(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
        /* CLUSTER FLUSHSLOTS */
        if (dictSize(server.db[0].dict) != 0) {
            addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
            return;
        }
        clusterDelNodeSlots(myself);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
    {
        /* CLUSTER ADDSLOTS <slot> [slot] ... */
        /* CLUSTER DELSLOTS <slot> [slot] ... */
        int j, slot;
        unsigned char *slots = zmalloc(CLUSTER_SLOTS);
        int del = !strcasecmp(c->argv[1]->ptr,"delslots");

        memset(slots,0,CLUSTER_SLOTS);
        /* Check that all the arguments are parseable and that all the
         * slots are not already busy. Nothing is modified until every
         * argument has been validated. */
        for (j = 2; j < c->argc; j++) {
            if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
                zfree(slots);
                return;
            }
            if (del && server.cluster->slots[slot] == NULL) {
                addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
                zfree(slots);
                return;
            } else if (!del && server.cluster->slots[slot]) {
                addReplyErrorFormat(c,"Slot %d is already busy", slot);
                zfree(slots);
                return;
            }
            /* slots[] marks the slots seen so far in order to detect
             * duplicated arguments. */
            if (slots[slot]++ == 1) {
                addReplyErrorFormat(c,"Slot %d specified multiple times",
                    (int)slot);
                zfree(slots);
                return;
            }
        }
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            if (slots[j]) {
                int retval;

                /* If this slot was set as importing we can clear this
                 * state as now we are the real owner of the slot. */
                if (server.cluster->importing_slots_from[j])
                    server.cluster->importing_slots_from[j] = NULL;

                retval = del ? clusterDelSlot(j) :
                               clusterAddSlot(myself,j);
                serverAssertWithInfo(c,NULL,retval == C_OK);
            }
        }
        zfree(slots);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
        /* SETSLOT 10 MIGRATING <node ID> */
        /* SETSLOT 10 IMPORTING <node ID> */
        /* SETSLOT 10 STABLE */
        /* SETSLOT 10 NODE <node ID> */
        int slot;
        clusterNode *n;

        if (nodeIsSlave(myself)) {
            addReplyError(c,"Please use SETSLOT only with masters.");
            return;
        }

        if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;

        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
            if (server.cluster->slots[slot] != myself) {
                addReplyErrorFormat(c,"I'm not the owner of hash slot %d",slot);
                return;
            }
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            server.cluster->migrating_slots_to[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
            if (server.cluster->slots[slot] == myself) {
                addReplyErrorFormat(c,
                    "I'm already the owner of hash slot %d",slot);
                return;
            }
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            server.cluster->importing_slots_from[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
            /* CLUSTER SETSLOT <SLOT> STABLE */
            server.cluster->importing_slots_from[slot] = NULL;
            server.cluster->migrating_slots_to[slot] = NULL;
        } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
            n = clusterLookupNode(c->argv[4]->ptr);

            if (!n) {
                addReplyErrorFormat(c,"Unknown node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            /* If this hash slot was served by 'myself' before to switch
             * make sure there are no longer local keys for this hash slot. */
            if (server.cluster->slots[slot] == myself && n != myself) {
                if (countKeysInSlot(slot) != 0) {
                    addReplyErrorFormat(c,
                        "Can't assign hashslot %d to a different node "
                        "while I still hold keys for this hash slot.", slot);
                    return;
                }
            }
            /* If this slot is in migrating status but we have no keys
             * for it assigning the slot to another node will clear
             * the migrating status. */
            if (countKeysInSlot(slot) == 0 &&
                server.cluster->migrating_slots_to[slot])
                server.cluster->migrating_slots_to[slot] = NULL;

            /* If this node was importing this slot, assigning the slot to
             * itself also clears the importing status. */
            if (n == myself &&
                server.cluster->importing_slots_from[slot])
            {
                /* This slot was manually migrated, set this node configEpoch
                 * to a new epoch so that the new version can be propagated
                 * by the cluster.
                 *
                 * Note that if this ever results in a collision with another
                 * node getting the same configEpoch, for example because a
                 * failover happens at the same time we close the slot, the
                 * configEpoch collision resolution will fix it assigning
                 * a different epoch to each node. */
                if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
                    serverLog(LL_WARNING,
                        "configEpoch updated after importing slot %d", slot);
                }
                server.cluster->importing_slots_from[slot] = NULL;
            }
            clusterDelSlot(slot);
            clusterAddSlot(n,slot);
        } else {
            addReplyError(c,
                "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
            return;
        }
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
        /* CLUSTER BUMPEPOCH */
        int retval = clusterBumpConfigEpochWithoutConsensus();
        sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
                (retval == C_OK) ? "BUMPED" : "STILL",
                (unsigned long long) myself->configEpoch);
        addReplySds(c,reply);
    } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
        /* CLUSTER INFO */
        char *statestr[] = {"ok","fail","needhelp"};
        int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
        uint64_t myepoch;
        int j;

        /* Classify every assigned slot according to the state of the
         * node it is assigned to. */
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            clusterNode *n = server.cluster->slots[j];

            if (n == NULL) continue;
            slots_assigned++;
            if (nodeFailed(n)) {
                slots_fail++;
            } else if (nodeTimedOut(n)) {
                slots_pfail++;
            } else {
                slots_ok++;
            }
        }

        /* A slave with a known master reports the epoch of its master. */
        myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
                  myself->slaveof->configEpoch : myself->configEpoch;

        sds info = sdscatprintf(sdsempty(),
            "cluster_state:%s\r\n"
            "cluster_slots_assigned:%d\r\n"
            "cluster_slots_ok:%d\r\n"
            "cluster_slots_pfail:%d\r\n"
            "cluster_slots_fail:%d\r\n"
            "cluster_known_nodes:%lu\r\n"
            "cluster_size:%d\r\n"
            "cluster_current_epoch:%llu\r\n"
            "cluster_my_epoch:%llu\r\n"
            , statestr[server.cluster->state],
            slots_assigned,
            slots_ok,
            slots_pfail,
            slots_fail,
            dictSize(server.cluster->nodes),
            server.cluster->size,
            (unsigned long long) server.cluster->currentEpoch,
            (unsigned long long) myepoch
        );

        /* Show stats about messages sent and received. Message types with
         * a zero counter are omitted, the totals are always reported. */
        long long tot_msg_sent = 0;
        long long tot_msg_received = 0;

        for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
            if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
            tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
            info = sdscatprintf(info,
                "cluster_stats_messages_%s_sent:%lld\r\n",
                clusterGetMessageTypeString(i),
                server.cluster->stats_bus_messages_sent[i]);
        }
        info = sdscatprintf(info,
            "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);

        for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
            if (server.cluster->stats_bus_messages_received[i] == 0) continue;
            tot_msg_received += server.cluster->stats_bus_messages_received[i];
            info = sdscatprintf(info,
                "cluster_stats_messages_%s_received:%lld\r\n",
                clusterGetMessageTypeString(i),
                server.cluster->stats_bus_messages_received[i]);
        }
        info = sdscatprintf(info,
            "cluster_stats_messages_received:%lld\r\n", tot_msg_received);

        /* Produce the reply protocol. */
        addReplyVerbatim(c,info,sdslen(info),"txt");
        sdsfree(info);
    } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
        /* CLUSTER SAVECONFIG */
        int retval = clusterSaveConfig(1);

        if (retval == 0)
            addReply(c,shared.ok);
        else
            addReplyErrorFormat(c,"error saving the cluster node config: %s",
                strerror(errno));
    } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
        /* CLUSTER KEYSLOT <key> */
        sds key = c->argv[2]->ptr;

        addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
    } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
        /* CLUSTER COUNTKEYSINSLOT <slot> */
        long long slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS) {
            addReplyError(c,"Invalid slot");
            return;
        }
        addReplyLongLong(c,countKeysInSlot(slot));
    } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
        /* CLUSTER GETKEYSINSLOT <slot> <count> */
        long long maxkeys, slot;
        unsigned int numkeys, j;
        robj **keys;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
            != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
            addReplyError(c,"Invalid slot or number of keys");
            return;
        }

        /* Avoid allocating more than needed in case of large COUNT argument
         * and smaller actual number of keys. */
        unsigned int keys_in_slot = countKeysInSlot(slot);
        if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;

        keys = zmalloc(sizeof(robj*)*maxkeys);
        numkeys = getKeysInSlot(slot, keys, maxkeys);
        addReplyArrayLen(c,numkeys);
        for (j = 0; j < numkeys; j++) {
            addReplyBulk(c,keys[j]);
            decrRefCount(keys[j]);
        }
        zfree(keys);
    } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
        /* CLUSTER FORGET <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        } else if (n == myself) {
            addReplyError(c,"I tried hard but I can't forget myself...");
            return;
        } else if (nodeIsSlave(myself) && myself->slaveof == n) {
            addReplyError(c,"Can't forget my master!");
            return;
        }
        clusterBlacklistAddNode(n);
        clusterDelNode(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
        /* CLUSTER REPLICATE <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        /* I can't replicate myself. */
        if (n == myself) {
            addReplyError(c,"Can't replicate myself");
            return;
        }

        /* Can't replicate a slave. */
        if (nodeIsSlave(n)) {
            addReplyError(c,"I can only replicate a master, not a replica.");
            return;
        }

        /* If the instance is currently a master, it should have no assigned
         * slots nor keys to accept to replicate some other node.
         * Slaves can switch to another master without issues. */
        if (nodeIsMaster(myself) &&
            (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
            addReplyError(c,
                "To set a master the node must be empty and "
                "without assigned slots.");
            return;
        }

        /* Set the master. */
        clusterSetMaster(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
                !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
        /* CLUSTER SLAVES <NODE ID> */
        /* CLUSTER REPLICAS <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
        int j;

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        if (nodeIsSlave(n)) {
            addReplyError(c,"The specified node is not a master");
            return;
        }

        /* One bulk string per replica, in CLUSTER NODES line format. */
        addReplyArrayLen(c,n->numslaves);
        for (j = 0; j < n->numslaves; j++) {
            sds ni = clusterGenNodeDescription(n->slaves[j]);
            addReplyBulkCString(c,ni);
            sdsfree(ni);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
               c->argc == 3)
    {
        /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        } else {
            addReplyLongLong(c,clusterNodeFailureReportsCount(n));
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
        int force = 0, takeover = 0;

        if (c->argc == 3) {
            if (!strcasecmp(c->argv[2]->ptr,"force")) {
                force = 1;
            } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
                takeover = 1;
                force = 1; /* Takeover also implies force. */
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Check preconditions. */
        if (nodeIsMaster(myself)) {
            addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
            return;
        } else if (myself->slaveof == NULL) {
            addReplyError(c,"I'm a replica but my master is unknown to me");
            return;
        } else if (!force &&
                   (nodeFailed(myself->slaveof) ||
                    myself->slaveof->link == NULL))
        {
            addReplyError(c,"Master is down or failed, "
                            "please use CLUSTER FAILOVER FORCE");
            return;
        }
        resetManualFailover();
        server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;

        if (takeover) {
            /* A takeover does not perform any initial check. It just
             * generates a new configuration epoch for this node without
             * consensus, claims the master's slots, and broadcast the new
             * configuration. */
            serverLog(LL_WARNING,"Taking over the master (user request).");
            clusterBumpConfigEpochWithoutConsensus();
            clusterFailoverReplaceYourMaster();
        } else if (force) {
            /* If this is a forced failover, we don't need to talk with our
             * master to agree about the offset. We just failover taking over
             * it without coordination. */
            serverLog(LL_WARNING,"Forced failover user request accepted.");
            server.cluster->mf_can_start = 1;
        } else {
            serverLog(LL_WARNING,"Manual failover user request accepted.");
            clusterSendMFStart(myself->slaveof);
        }
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
    {
        /* CLUSTER SET-CONFIG-EPOCH <epoch>
         *
         * The user is allowed to set the config epoch only when a node is
         * totally fresh: no config epoch, no other known node, and so forth.
         * This happens at cluster creation time to start with a cluster where
         * every node has a different node ID, without to rely on the conflicts
         * resolution system which is too slow when a big cluster is created. */
        long long epoch;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
            return;

        if (epoch < 0) {
            addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
        } else if (dictSize(server.cluster->nodes) > 1) {
            addReplyError(c,"The user can assign a config epoch only when the "
                            "node does not know any other node.");
        } else if (myself->configEpoch != 0) {
            addReplyError(c,"Node config epoch is already non-zero");
        } else {
            myself->configEpoch = epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
                (unsigned long long) myself->configEpoch);

            /* The current epoch is never left behind the config epoch. */
            if (server.cluster->currentEpoch < (uint64_t)epoch)
                server.cluster->currentEpoch = epoch;
            /* No need to fsync the config here since in the unlucky event
             * of a failure to persist the config, the conflict resolution code
             * will assign a unique config to this node. */
            clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                                 CLUSTER_TODO_SAVE_CONFIG);
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* CLUSTER RESET [SOFT|HARD] */
        int hard = 0;

        /* Parse soft/hard argument. Default is soft. */
        if (c->argc == 3) {
            if (!strcasecmp(c->argv[2]->ptr,"hard")) {
                hard = 1;
            } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
                hard = 0;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Slaves can be reset while containing data, but not master nodes
         * that must be empty. */
        if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
            addReplyError(c,"CLUSTER RESET can't be called with "
                            "master nodes containing keys");
            return;
        }
        clusterReset(hard);
        addReply(c,shared.ok);
    } else {
        addReplySubcommandSyntaxError(c);
        return;
    }
}
4907
4908 /* -----------------------------------------------------------------------------
4909 * DUMP, RESTORE and MIGRATE commands
4910 * -------------------------------------------------------------------------- */
4911
4912 /* Generates a DUMP-format representation of the object 'o', adding it to the
4913 * io stream pointed by 'rio'. This function can't fail. */
createDumpPayload(rio * payload,robj * o,robj * key)4914 void createDumpPayload(rio *payload, robj *o, robj *key) {
4915 unsigned char buf[2];
4916 uint64_t crc;
4917
4918 /* Serialize the object in an RDB-like format. It consist of an object type
4919 * byte followed by the serialized object. This is understood by RESTORE. */
4920 rioInitWithBuffer(payload,sdsempty());
4921 serverAssert(rdbSaveObjectType(payload,o));
4922 serverAssert(rdbSaveObject(payload,o,key));
4923
4924 /* Write the footer, this is how it looks like:
4925 * ----------------+---------------------+---------------+
4926 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4927 * ----------------+---------------------+---------------+
4928 * RDB version and CRC are both in little endian.
4929 */
4930
4931 /* RDB version */
4932 buf[0] = RDB_VERSION & 0xff;
4933 buf[1] = (RDB_VERSION >> 8) & 0xff;
4934 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4935
4936 /* CRC64 */
4937 crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4938 sdslen(payload->io.buffer.ptr));
4939 memrev64ifbe(&crc);
4940 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4941 }
4942
4943 /* Verify that the RDB version of the dump payload matches the one of this Redis
4944 * instance and that the checksum is ok.
4945 * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
4946 * is returned. */
verifyDumpPayload(unsigned char * p,size_t len)4947 int verifyDumpPayload(unsigned char *p, size_t len) {
4948 unsigned char *footer;
4949 uint16_t rdbver;
4950 uint64_t crc;
4951
4952 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4953 if (len < 10) return C_ERR;
4954 footer = p+(len-10);
4955
4956 /* Verify RDB version */
4957 rdbver = (footer[1] << 8) | footer[0];
4958 if (rdbver > RDB_VERSION) return C_ERR;
4959
4960 /* Verify CRC64 */
4961 crc = crc64(0,p,len-8);
4962 memrev64ifbe(&crc);
4963 return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4964 }
4965
4966 /* DUMP keyname
4967 * DUMP is actually not used by Redis Cluster but it is the obvious
4968 * complement of RESTORE and can be useful for different applications. */
dumpCommand(client * c)4969 void dumpCommand(client *c) {
4970 robj *o;
4971 rio payload;
4972
4973 /* Check if the key is here. */
4974 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4975 addReplyNull(c);
4976 return;
4977 }
4978
4979 /* Create the DUMP encoded representation. */
4980 createDumpPayload(&payload,o,c->argv[1]);
4981
4982 /* Transfer to the client */
4983 addReplyBulkSds(c,payload.io.buffer.ptr);
4984 return;
4985 }
4986
4987 /* RESTORE key ttl serialized-value [REPLACE] */
restoreCommand(client * c)4988 void restoreCommand(client *c) {
4989 long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
4990 rio payload;
4991 int j, type, replace = 0, absttl = 0;
4992 robj *obj;
4993
4994 /* Parse additional options */
4995 for (j = 4; j < c->argc; j++) {
4996 int additional = c->argc-j-1;
4997 if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4998 replace = 1;
4999 } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
5000 absttl = 1;
5001 } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
5002 lfu_freq == -1)
5003 {
5004 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
5005 != C_OK) return;
5006 if (lru_idle < 0) {
5007 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
5008 return;
5009 }
5010 lru_clock = LRU_CLOCK();
5011 j++; /* Consume additional arg. */
5012 } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
5013 lru_idle == -1)
5014 {
5015 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
5016 != C_OK) return;
5017 if (lfu_freq < 0 || lfu_freq > 255) {
5018 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
5019 return;
5020 }
5021 j++; /* Consume additional arg. */
5022 } else {
5023 addReply(c,shared.syntaxerr);
5024 return;
5025 }
5026 }
5027
5028 /* Make sure this key does not already exist here... */
5029 robj *key = c->argv[1];
5030 if (!replace && lookupKeyWrite(c->db,key) != NULL) {
5031 addReply(c,shared.busykeyerr);
5032 return;
5033 }
5034
5035 /* Check if the TTL value makes sense */
5036 if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
5037 return;
5038 } else if (ttl < 0) {
5039 addReplyError(c,"Invalid TTL value, must be >= 0");
5040 return;
5041 }
5042
5043 /* Verify RDB version and data checksum. */
5044 if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
5045 {
5046 addReplyError(c,"DUMP payload version or checksum are wrong");
5047 return;
5048 }
5049
5050 rioInitWithBuffer(&payload,c->argv[3]->ptr);
5051 if (((type = rdbLoadObjectType(&payload)) == -1) ||
5052 ((obj = rdbLoadObject(type,&payload,key->ptr)) == NULL))
5053 {
5054 addReplyError(c,"Bad data format");
5055 return;
5056 }
5057
5058 /* Remove the old key if needed. */
5059 int deleted = 0;
5060 if (replace)
5061 deleted = dbDelete(c->db,key);
5062
5063 if (ttl && !absttl) ttl+=mstime();
5064 if (ttl && checkAlreadyExpired(ttl)) {
5065 if (deleted) {
5066 rewriteClientCommandVector(c,2,shared.del,key);
5067 signalModifiedKey(c,c->db,key);
5068 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
5069 server.dirty++;
5070 }
5071 decrRefCount(obj);
5072 addReply(c, shared.ok);
5073 return;
5074 }
5075
5076 /* Create the key and set the TTL if any */
5077 dbAdd(c->db,key,obj);
5078 if (ttl) {
5079 setExpire(c,c->db,key,ttl);
5080 }
5081 objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
5082 signalModifiedKey(c,c->db,key);
5083 notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
5084 addReply(c,shared.ok);
5085 server.dirty++;
5086 }
5087
/* MIGRATE socket cache implementation.
 *
 * We take a map between host:port and a TCP socket that we used to connect
 * to this instance in recent time.
 * These sockets are closed when the max number we cache is reached, and also
 * in serverCron() when they are around for more than a few seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */

/* A single entry of the MIGRATE socket cache. Entries are stored in
 * server.migrate_cached_sockets, keyed by the "host:port" string. */
typedef struct migrateCachedSocket {
    /* Connection to the target instance. */
    connection *conn;
    /* DB id last selected on this connection by MIGRATE, or -1 when it is
     * not known (new connection, or after an error reply from the target),
     * which forces a SELECT on the next use. */
    long last_dbid;
    /* Value of server.unixtime when the entry was created or last handed
     * out; used to expire idle entries. */
    time_t last_use_time;
} migrateCachedSocket;
5102
5103 /* Return a migrateCachedSocket containing a TCP socket connected with the
5104 * target instance, possibly returning a cached one.
5105 *
5106 * This function is responsible of sending errors to the client if a
5107 * connection can't be established. In this case -1 is returned.
5108 * Otherwise on success the socket is returned, and the caller should not
5109 * attempt to free it after usage.
5110 *
5111 * If the caller detects an error while using the socket, migrateCloseSocket()
5112 * should be called so that the connection will be created from scratch
5113 * the next time. */
migrateGetSocket(client * c,robj * host,robj * port,long timeout)5114 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
5115 connection *conn;
5116 sds name = sdsempty();
5117 migrateCachedSocket *cs;
5118
5119 /* Check if we have an already cached socket for this ip:port pair. */
5120 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5121 name = sdscatlen(name,":",1);
5122 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5123 cs = dictFetchValue(server.migrate_cached_sockets,name);
5124 if (cs) {
5125 sdsfree(name);
5126 cs->last_use_time = server.unixtime;
5127 return cs;
5128 }
5129
5130 /* No cached socket, create one. */
5131 if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
5132 /* Too many items, drop one at random. */
5133 dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
5134 cs = dictGetVal(de);
5135 connClose(cs->conn);
5136 zfree(cs);
5137 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5138 }
5139
5140 /* Create the socket */
5141 conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
5142 if (connBlockingConnect(conn, c->argv[1]->ptr, atoi(c->argv[2]->ptr), timeout)
5143 != C_OK) {
5144 addReplySds(c,
5145 sdsnew("-IOERR error or timeout connecting to the client\r\n"));
5146 connClose(conn);
5147 sdsfree(name);
5148 return NULL;
5149 }
5150 connEnableTcpNoDelay(conn);
5151
5152 /* Add to the cache and return it to the caller. */
5153 cs = zmalloc(sizeof(*cs));
5154 cs->conn = conn;
5155
5156 cs->last_dbid = -1;
5157 cs->last_use_time = server.unixtime;
5158 dictAdd(server.migrate_cached_sockets,name,cs);
5159 return cs;
5160 }
5161
5162 /* Free a migrate cached connection. */
migrateCloseSocket(robj * host,robj * port)5163 void migrateCloseSocket(robj *host, robj *port) {
5164 sds name = sdsempty();
5165 migrateCachedSocket *cs;
5166
5167 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5168 name = sdscatlen(name,":",1);
5169 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5170 cs = dictFetchValue(server.migrate_cached_sockets,name);
5171 if (!cs) {
5172 sdsfree(name);
5173 return;
5174 }
5175
5176 connClose(cs->conn);
5177 zfree(cs);
5178 dictDelete(server.migrate_cached_sockets,name);
5179 sdsfree(name);
5180 }
5181
migrateCloseTimedoutSockets(void)5182 void migrateCloseTimedoutSockets(void) {
5183 dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
5184 dictEntry *de;
5185
5186 while((de = dictNext(di)) != NULL) {
5187 migrateCachedSocket *cs = dictGetVal(de);
5188
5189 if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
5190 connClose(cs->conn);
5191 zfree(cs);
5192 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5193 }
5194 }
5195 dictReleaseIterator(di);
5196 }
5197
5198 /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
5199 * AUTH2 username password]
5200 *
5201 * On in the multiple keys form:
5202 *
5203 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
5204 * AUTH2 username password] KEYS key1 key2 ... keyN */
migrateCommand(client * c)5205 void migrateCommand(client *c) {
5206 migrateCachedSocket *cs;
5207 int copy = 0, replace = 0, j;
5208 char *username = NULL;
5209 char *password = NULL;
5210 long timeout;
5211 long dbid;
5212 robj **ov = NULL; /* Objects to migrate. */
5213 robj **kv = NULL; /* Key names. */
5214 robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
5215 rio cmd, payload;
5216 int may_retry = 1;
5217 int write_error = 0;
5218 int argv_rewritten = 0;
5219
5220 /* To support the KEYS option we need the following additional state. */
5221 int first_key = 3; /* Argument index of the first key. */
5222 int num_keys = 1; /* By default only migrate the 'key' argument. */
5223
5224 /* Parse additional options */
5225 for (j = 6; j < c->argc; j++) {
5226 int moreargs = (c->argc-1) - j;
5227 if (!strcasecmp(c->argv[j]->ptr,"copy")) {
5228 copy = 1;
5229 } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5230 replace = 1;
5231 } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
5232 if (!moreargs) {
5233 addReply(c,shared.syntaxerr);
5234 return;
5235 }
5236 j++;
5237 password = c->argv[j]->ptr;
5238 } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
5239 if (moreargs < 2) {
5240 addReply(c,shared.syntaxerr);
5241 return;
5242 }
5243 username = c->argv[++j]->ptr;
5244 password = c->argv[++j]->ptr;
5245 } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
5246 if (sdslen(c->argv[3]->ptr) != 0) {
5247 addReplyError(c,
5248 "When using MIGRATE KEYS option, the key argument"
5249 " must be set to the empty string");
5250 return;
5251 }
5252 first_key = j+1;
5253 num_keys = c->argc - j - 1;
5254 break; /* All the remaining args are keys. */
5255 } else {
5256 addReply(c,shared.syntaxerr);
5257 return;
5258 }
5259 }
5260
5261 /* Sanity check */
5262 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
5263 getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
5264 {
5265 return;
5266 }
5267 if (timeout <= 0) timeout = 1000;
5268
5269 /* Check if the keys are here. If at least one key is to migrate, do it
5270 * otherwise if all the keys are missing reply with "NOKEY" to signal
5271 * the caller there was nothing to migrate. We don't return an error in
5272 * this case, since often this is due to a normal condition like the key
5273 * expiring in the meantime. */
5274 ov = zrealloc(ov,sizeof(robj*)*num_keys);
5275 kv = zrealloc(kv,sizeof(robj*)*num_keys);
5276 int oi = 0;
5277
5278 for (j = 0; j < num_keys; j++) {
5279 if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
5280 kv[oi] = c->argv[first_key+j];
5281 oi++;
5282 }
5283 }
5284 num_keys = oi;
5285 if (num_keys == 0) {
5286 zfree(ov); zfree(kv);
5287 addReplySds(c,sdsnew("+NOKEY\r\n"));
5288 return;
5289 }
5290
5291 try_again:
5292 write_error = 0;
5293
5294 /* Connect */
5295 cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
5296 if (cs == NULL) {
5297 zfree(ov); zfree(kv);
5298 return; /* error sent to the client by migrateGetSocket() */
5299 }
5300
5301 rioInitWithBuffer(&cmd,sdsempty());
5302
5303 /* Authentication */
5304 if (password) {
5305 int arity = username ? 3 : 2;
5306 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
5307 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
5308 if (username) {
5309 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
5310 sdslen(username)));
5311 }
5312 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
5313 sdslen(password)));
5314 }
5315
5316 /* Send the SELECT command if the current DB is not already selected. */
5317 int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
5318 if (select) {
5319 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5320 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
5321 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
5322 }
5323
5324 int non_expired = 0; /* Number of keys that we'll find non expired.
5325 Note that serializing large keys may take some time
5326 so certain keys that were found non expired by the
5327 lookupKey() function, may be expired later. */
5328
5329 /* Create RESTORE payload and generate the protocol to call the command. */
5330 for (j = 0; j < num_keys; j++) {
5331 long long ttl = 0;
5332 long long expireat = getExpire(c->db,kv[j]);
5333
5334 if (expireat != -1) {
5335 ttl = expireat-mstime();
5336 if (ttl < 0) {
5337 continue;
5338 }
5339 if (ttl < 1) ttl = 1;
5340 }
5341
5342 /* Relocate valid (non expired) keys into the array in successive
5343 * positions to remove holes created by the keys that were present
5344 * in the first lookup but are now expired after the second lookup. */
5345 kv[non_expired++] = kv[j];
5346
5347 serverAssertWithInfo(c,NULL,
5348 rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
5349
5350 if (server.cluster_enabled)
5351 serverAssertWithInfo(c,NULL,
5352 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
5353 else
5354 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
5355 serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
5356 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
5357 sdslen(kv[j]->ptr)));
5358 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
5359
5360 /* Emit the payload argument, that is the serialized object using
5361 * the DUMP format. */
5362 createDumpPayload(&payload,ov[j],kv[j]);
5363 serverAssertWithInfo(c,NULL,
5364 rioWriteBulkString(&cmd,payload.io.buffer.ptr,
5365 sdslen(payload.io.buffer.ptr)));
5366 sdsfree(payload.io.buffer.ptr);
5367
5368 /* Add the REPLACE option to the RESTORE command if it was specified
5369 * as a MIGRATE option. */
5370 if (replace)
5371 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
5372 }
5373
5374 /* Fix the actual number of keys we are migrating. */
5375 num_keys = non_expired;
5376
5377 /* Transfer the query to the other node in 64K chunks. */
5378 errno = 0;
5379 {
5380 sds buf = cmd.io.buffer.ptr;
5381 size_t pos = 0, towrite;
5382 int nwritten = 0;
5383
5384 while ((towrite = sdslen(buf)-pos) > 0) {
5385 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5386 nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
5387 if (nwritten != (signed)towrite) {
5388 write_error = 1;
5389 goto socket_err;
5390 }
5391 pos += nwritten;
5392 }
5393 }
5394
5395 char buf0[1024]; /* Auth reply. */
5396 char buf1[1024]; /* Select reply. */
5397 char buf2[1024]; /* Restore reply. */
5398
5399 /* Read the AUTH reply if needed. */
5400 if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
5401 goto socket_err;
5402
5403 /* Read the SELECT reply if needed. */
5404 if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
5405 goto socket_err;
5406
5407 /* Read the RESTORE replies. */
5408 int error_from_target = 0;
5409 int socket_error = 0;
5410 int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5411
5412 /* Allocate the new argument vector that will replace the current command,
5413 * to propagate the MIGRATE as a DEL command (if no COPY option was given).
5414 * We allocate num_keys+1 because the additional argument is for "DEL"
5415 * command name itself. */
5416 if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5417
5418 for (j = 0; j < num_keys; j++) {
5419 if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
5420 socket_error = 1;
5421 break;
5422 }
5423 if ((password && buf0[0] == '-') ||
5424 (select && buf1[0] == '-') ||
5425 buf2[0] == '-')
5426 {
5427 /* On error assume that last_dbid is no longer valid. */
5428 if (!error_from_target) {
5429 cs->last_dbid = -1;
5430 char *errbuf;
5431 if (password && buf0[0] == '-') errbuf = buf0;
5432 else if (select && buf1[0] == '-') errbuf = buf1;
5433 else errbuf = buf2;
5434
5435 error_from_target = 1;
5436 addReplyErrorFormat(c,"Target instance replied with error: %s",
5437 errbuf+1);
5438 }
5439 } else {
5440 if (!copy) {
5441 /* No COPY option: remove the local key, signal the change. */
5442 dbDelete(c->db,kv[j]);
5443 signalModifiedKey(c,c->db,kv[j]);
5444 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
5445 server.dirty++;
5446
5447 /* Populate the argument vector to replace the old one. */
5448 newargv[del_idx++] = kv[j];
5449 incrRefCount(kv[j]);
5450 }
5451 }
5452 }
5453
5454 /* On socket error, if we want to retry, do it now before rewriting the
5455 * command vector. We only retry if we are sure nothing was processed
5456 * and we failed to read the first reply (j == 0 test). */
5457 if (!error_from_target && socket_error && j == 0 && may_retry &&
5458 errno != ETIMEDOUT)
5459 {
5460 goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5461 }
5462
5463 /* On socket errors, close the migration socket now that we still have
5464 * the original host/port in the ARGV. Later the original command may be
5465 * rewritten to DEL and will be too later. */
5466 if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5467
5468 if (!copy) {
5469 /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5470 * this only for the keys for which we received an acknowledgement
5471 * from the receiving Redis server, by using the del_idx index. */
5472 if (del_idx > 1) {
5473 newargv[0] = createStringObject("DEL",3);
5474 /* Note that the following call takes ownership of newargv. */
5475 replaceClientCommandVector(c,del_idx,newargv);
5476 argv_rewritten = 1;
5477 } else {
5478 /* No key transfer acknowledged, no need to rewrite as DEL. */
5479 zfree(newargv);
5480 }
5481 newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5482 }
5483
5484 /* If we are here and a socket error happened, we don't want to retry.
5485 * Just signal the problem to the client, but only do it if we did not
5486 * already queue a different error reported by the destination server. */
5487 if (!error_from_target && socket_error) {
5488 may_retry = 0;
5489 goto socket_err;
5490 }
5491
5492 if (!error_from_target) {
5493 /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5494 * avoid SELECT the next time if the target DB is the same. Reply +OK.
5495 *
5496 * Note: If we reached this point, even if socket_error is true
5497 * still the SELECT command succeeded (otherwise the code jumps to
5498 * socket_err label. */
5499 cs->last_dbid = dbid;
5500 addReply(c,shared.ok);
5501 } else {
5502 /* On error we already sent it in the for loop above, and set
5503 * the currently selected socket to -1 to force SELECT the next time. */
5504 }
5505
5506 sdsfree(cmd.io.buffer.ptr);
5507 zfree(ov); zfree(kv); zfree(newargv);
5508 return;
5509
5510 /* On socket errors we try to close the cached socket and try again.
5511 * It is very common for the cached socket to get closed, if just reopening
5512 * it works it's a shame to notify the error to the caller. */
5513 socket_err:
5514 /* Cleanup we want to perform in both the retry and no retry case.
5515 * Note: Closing the migrate socket will also force SELECT next time. */
5516 sdsfree(cmd.io.buffer.ptr);
5517
5518 /* If the command was rewritten as DEL and there was a socket error,
5519 * we already closed the socket earlier. While migrateCloseSocket()
5520 * is idempotent, the host/port arguments are now gone, so don't do it
5521 * again. */
5522 if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5523 zfree(newargv);
5524 newargv = NULL; /* This will get reallocated on retry. */
5525
5526 /* Retry only if it's not a timeout and we never attempted a retry
5527 * (or the code jumping here did not set may_retry to zero). */
5528 if (errno != ETIMEDOUT && may_retry) {
5529 may_retry = 0;
5530 goto try_again;
5531 }
5532
5533 /* Cleanup we want to do if no retry is attempted. */
5534 zfree(ov); zfree(kv);
5535 addReplySds(c,
5536 sdscatprintf(sdsempty(),
5537 "-IOERR error or timeout %s to target instance\r\n",
5538 write_error ? "writing" : "reading"));
5539 return;
5540 }
5541
5542 /* -----------------------------------------------------------------------------
5543 * Cluster functions related to serving / redirecting clients
5544 * -------------------------------------------------------------------------- */
5545
5546 /* The ASKING command is required after a -ASK redirection.
5547 * The client should issue ASKING before to actually send the command to
5548 * the target instance. See the Redis Cluster specification for more
5549 * information. */
askingCommand(client * c)5550 void askingCommand(client *c) {
5551 if (server.cluster_enabled == 0) {
5552 addReplyError(c,"This instance has cluster support disabled");
5553 return;
5554 }
5555 c->flags |= CLIENT_ASKING;
5556 addReply(c,shared.ok);
5557 }
5558
5559 /* The READONLY command is used by clients to enter the read-only mode.
5560 * In this mode slaves will not redirect clients as long as clients access
5561 * with read-only commands to keys that are served by the slave's master. */
readonlyCommand(client * c)5562 void readonlyCommand(client *c) {
5563 if (server.cluster_enabled == 0) {
5564 addReplyError(c,"This instance has cluster support disabled");
5565 return;
5566 }
5567 c->flags |= CLIENT_READONLY;
5568 addReply(c,shared.ok);
5569 }
5570
5571 /* The READWRITE command just clears the READONLY command state. */
readwriteCommand(client * c)5572 void readwriteCommand(client *c) {
5573 c->flags &= ~CLIENT_READONLY;
5574 addReply(c,shared.ok);
5575 }
5576
5577 /* Return the pointer to the cluster node that is able to serve the command.
5578 * For the function to succeed the command should only target either:
5579 *
 * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
5581 * 2) Multiple keys in the same hash slot, while the slot is stable (no
5582 * resharding in progress).
5583 *
5584 * On success the function returns the node that is able to serve the request.
5585 * If the node is not 'myself' a redirection must be performed. The kind of
5586 * redirection is specified setting the integer passed by reference
5587 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5588 * CLUSTER_REDIR_MOVED.
5589 *
5590 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5591 *
5592 * If the command fails NULL is returned, and the reason of the failure is
5593 * provided via 'error_code', which will be set to:
5594 *
5595 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5596 * don't belong to the same hash slot.
5597 *
5598 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5599 * belonging to the same slot, but the slot is not stable (in migration or
5600 * importing state, likely because a resharding is in progress).
5601 *
5602 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5603 * not bound to any node. In this case the cluster global state should be
5604 * already "down" but it is fragile to rely on the update of the global state,
5605 * so we also handle it here.
5606 *
5607 * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
5608 * down but the user attempts to execute a command that addresses one or more keys. */
getNodeByQuery(client * c,struct redisCommand * cmd,robj ** argv,int argc,int * hashslot,int * error_code)5609 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5610 clusterNode *n = NULL;
5611 robj *firstkey = NULL;
5612 int multiple_keys = 0;
5613 multiState *ms, _ms;
5614 multiCmd mc;
5615 int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5616
5617 /* Allow any key to be set if a module disabled cluster redirections. */
5618 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
5619 return myself;
5620
5621 /* Set error code optimistically for the base case. */
5622 if (error_code) *error_code = CLUSTER_REDIR_NONE;
5623
5624 /* Modules can turn off Redis Cluster redirection: this is useful
5625 * when writing a module that implements a completely different
5626 * distributed system. */
5627
5628 /* We handle all the cases as if they were EXEC commands, so we have
5629 * a common code path for everything */
5630 if (cmd->proc == execCommand) {
5631 /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5632 * error. */
5633 if (!(c->flags & CLIENT_MULTI)) return myself;
5634 ms = &c->mstate;
5635 } else {
5636 /* In order to have a single codepath create a fake Multi State
5637 * structure if the client is not in MULTI/EXEC state, this way
5638 * we have a single codepath below. */
5639 ms = &_ms;
5640 _ms.commands = &mc;
5641 _ms.count = 1;
5642 mc.argv = argv;
5643 mc.argc = argc;
5644 mc.cmd = cmd;
5645 }
5646
5647 /* Check that all the keys are in the same hash slot, and obtain this
5648 * slot and the node associated. */
5649 for (i = 0; i < ms->count; i++) {
5650 struct redisCommand *mcmd;
5651 robj **margv;
5652 int margc, *keyindex, numkeys, j;
5653
5654 mcmd = ms->commands[i].cmd;
5655 margc = ms->commands[i].argc;
5656 margv = ms->commands[i].argv;
5657
5658 getKeysResult result = GETKEYS_RESULT_INIT;
5659 numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
5660 keyindex = result.keys;
5661
5662 for (j = 0; j < numkeys; j++) {
5663 robj *thiskey = margv[keyindex[j]];
5664 int thisslot = keyHashSlot((char*)thiskey->ptr,
5665 sdslen(thiskey->ptr));
5666
5667 if (firstkey == NULL) {
5668 /* This is the first key we see. Check what is the slot
5669 * and node. */
5670 firstkey = thiskey;
5671 slot = thisslot;
5672 n = server.cluster->slots[slot];
5673
5674 /* Error: If a slot is not served, we are in "cluster down"
5675 * state. However the state is yet to be updated, so this was
5676 * not trapped earlier in processCommand(). Report the same
5677 * error to the client. */
5678 if (n == NULL) {
5679 getKeysFreeResult(&result);
5680 if (error_code)
5681 *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5682 return NULL;
5683 }
5684
5685 /* If we are migrating or importing this slot, we need to check
5686 * if we have all the keys in the request (the only way we
5687 * can safely serve the request, otherwise we return a TRYAGAIN
5688 * error). To do so we set the importing/migrating state and
5689 * increment a counter for every missing key. */
5690 if (n == myself &&
5691 server.cluster->migrating_slots_to[slot] != NULL)
5692 {
5693 migrating_slot = 1;
5694 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5695 importing_slot = 1;
5696 }
5697 } else {
5698 /* If it is not the first key, make sure it is exactly
5699 * the same key as the first we saw. */
5700 if (!equalStringObjects(firstkey,thiskey)) {
5701 if (slot != thisslot) {
5702 /* Error: multiple keys from different slots. */
5703 getKeysFreeResult(&result);
5704 if (error_code)
5705 *error_code = CLUSTER_REDIR_CROSS_SLOT;
5706 return NULL;
5707 } else {
5708 /* Flag this request as one with multiple different
5709 * keys. */
5710 multiple_keys = 1;
5711 }
5712 }
5713 }
5714
5715 /* Migrating / Importing slot? Count keys we don't have. */
5716 if ((migrating_slot || importing_slot) &&
5717 lookupKeyRead(&server.db[0],thiskey) == NULL)
5718 {
5719 missing_keys++;
5720 }
5721 }
5722 getKeysFreeResult(&result);
5723 }
5724
5725 /* No key at all in command? then we can serve the request
5726 * without redirections or errors in all the cases. */
5727 if (n == NULL) return myself;
5728
5729 /* Cluster is globally down but we got keys? We only serve the request
5730 * if it is a read command and when allow_reads_when_down is enabled. */
5731 if (server.cluster->state != CLUSTER_OK) {
5732 if (!server.cluster_allow_reads_when_down) {
5733 /* The cluster is configured to block commands when the
5734 * cluster is down. */
5735 if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5736 return NULL;
5737 } else if (!(cmd->flags & CMD_READONLY) && !(cmd->proc == evalCommand)
5738 && !(cmd->proc == evalShaCommand))
5739 {
5740 /* The cluster is configured to allow read only commands
5741 * but this command is neither readonly, nor EVAL or
5742 * EVALSHA. */
5743 if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
5744 return NULL;
5745 } else {
5746 /* Fall through and allow the command to be executed:
5747 * this happens when server.cluster_allow_reads_when_down is
5748 * true and the command is a readonly command or EVAL / EVALSHA. */
5749 }
5750 }
5751
5752 /* Return the hashslot by reference. */
5753 if (hashslot) *hashslot = slot;
5754
5755 /* MIGRATE always works in the context of the local node if the slot
5756 * is open (migrating or importing state). We need to be able to freely
5757 * move keys among instances in this case. */
5758 if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5759 return myself;
5760
5761 /* If we don't have all the keys and we are migrating the slot, send
5762 * an ASK redirection. */
5763 if (migrating_slot && missing_keys) {
5764 if (error_code) *error_code = CLUSTER_REDIR_ASK;
5765 return server.cluster->migrating_slots_to[slot];
5766 }
5767
5768 /* If we are receiving the slot, and the client correctly flagged the
5769 * request as "ASKING", we can serve the request. However if the request
5770 * involves multiple keys and we don't have them all, the only option is
5771 * to send a TRYAGAIN error. */
5772 if (importing_slot &&
5773 (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5774 {
5775 if (multiple_keys && missing_keys) {
5776 if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5777 return NULL;
5778 } else {
5779 return myself;
5780 }
5781 }
5782
5783 /* Handle the read-only client case reading from a slave: if this
5784 * node is a slave and the request is about a hash slot our master
5785 * is serving, we can reply without redirection. */
5786 int is_readonly_command = (c->cmd->flags & CMD_READONLY) ||
5787 (c->cmd->proc == execCommand && !(c->mstate.cmd_inv_flags & CMD_READONLY));
5788 if (c->flags & CLIENT_READONLY &&
5789 (is_readonly_command || cmd->proc == evalCommand ||
5790 cmd->proc == evalShaCommand) &&
5791 nodeIsSlave(myself) &&
5792 myself->slaveof == n)
5793 {
5794 return myself;
5795 }
5796
5797 /* Base case: just return the right node. However if this node is not
5798 * myself, set error_code to MOVED since we need to issue a redirection. */
5799 if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5800 return n;
5801 }
5802
5803 /* Send the client the right redirection code, according to error_code
5804 * that should be set to one of CLUSTER_REDIR_* macros.
5805 *
5806 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5807 * are used, then the node 'n' should not be NULL, but should be the
5808 * node we want to mention in the redirection. Moreover hashslot should
5809 * be set to the hash slot that caused the redirection. */
clusterRedirectClient(client * c,clusterNode * n,int hashslot,int error_code)5810 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5811 if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5812 addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5813 } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5814 /* The request spawns multiple keys in the same slot,
5815 * but the slot is not "stable" currently as there is
5816 * a migration or import in progress. */
5817 addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5818 } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5819 addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5820 } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
5821 addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down and only accepts read commands\r\n"));
5822 } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5823 addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5824 } else if (error_code == CLUSTER_REDIR_MOVED ||
5825 error_code == CLUSTER_REDIR_ASK)
5826 {
5827 addReplySds(c,sdscatprintf(sdsempty(),
5828 "-%s %d %s:%d\r\n",
5829 (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5830 hashslot,n->ip,n->port));
5831 } else {
5832 serverPanic("getNodeByQuery() unknown error.");
5833 }
5834 }
5835
5836 /* This function is called by the function processing clients incrementally
5837 * to detect timeouts, in order to handle the following case:
5838 *
5839 * 1) A client blocks with BLPOP or similar blocking operation.
5840 * 2) The master migrates the hash slot elsewhere or turns into a slave.
5841 * 3) The client may remain blocked forever (or up to the max timeout time)
5842 * waiting for a key change that will never happen.
5843 *
5844 * If the client is found to be blocked into a hash slot this node no
5845 * longer handles, the client is sent a redirection error, and the function
5846 * returns 1. Otherwise 0 is returned and no operation is performed. */
clusterRedirectBlockedClientIfNeeded(client * c)5847 int clusterRedirectBlockedClientIfNeeded(client *c) {
5848 if (c->flags & CLIENT_BLOCKED &&
5849 (c->btype == BLOCKED_LIST ||
5850 c->btype == BLOCKED_ZSET ||
5851 c->btype == BLOCKED_STREAM))
5852 {
5853 dictEntry *de;
5854 dictIterator *di;
5855
5856 /* If the cluster is down, unblock the client with the right error.
5857 * If the cluster is configured to allow reads on cluster down, we
5858 * still want to emit this error since a write will be required
5859 * to unblock them which may never come. */
5860 if (server.cluster->state == CLUSTER_FAIL) {
5861 clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5862 return 1;
5863 }
5864
5865 /* All keys must belong to the same slot, so check first key only. */
5866 di = dictGetIterator(c->bpop.keys);
5867 if ((de = dictNext(di)) != NULL) {
5868 robj *key = dictGetKey(de);
5869 int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5870 clusterNode *node = server.cluster->slots[slot];
5871
5872 /* if the client is read-only and attempting to access key that our
5873 * replica can handle, allow it. */
5874 if ((c->flags & CLIENT_READONLY) &&
5875 (c->lastcmd->flags & CMD_READONLY) &&
5876 nodeIsSlave(myself) && myself->slaveof == node)
5877 {
5878 node = myself;
5879 }
5880
5881 /* We send an error and unblock the client if:
5882 * 1) The slot is unassigned, emitting a cluster down error.
5883 * 2) The slot is not handled by this node, nor being imported. */
5884 if (node != myself &&
5885 server.cluster->importing_slots_from[slot] == NULL)
5886 {
5887 if (node == NULL) {
5888 clusterRedirectClient(c,NULL,0,
5889 CLUSTER_REDIR_DOWN_UNBOUND);
5890 } else {
5891 clusterRedirectClient(c,node,slot,
5892 CLUSTER_REDIR_MOVED);
5893 }
5894 dictReleaseIterator(di);
5895 return 1;
5896 }
5897 }
5898 dictReleaseIterator(di);
5899 }
5900 return 0;
5901 }
5902