1#!/usr/bin/env ruby 2 3# Copyright (c) 2006-2015, Salvatore Sanfilippo 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 7# 8# * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 9# * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following 10# disclaimer in the documentation and/or other materials provided with the distribution. 11# * Neither the name of Redis nor the names of its contributors may be used to endorse or promote products derived 12# from this software without specific prior written permission. 13# 14# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS 15# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 16# AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 17# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 18# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 19# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 20# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 21# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 22 23# TODO (temporary here, we'll move this into the Github issues once 24# redis-trib initial implementation is completed). 25# 26# - Make sure that if the rehashing fails in the middle redis-trib will try 27# to recover. 28# - When redis-trib performs a cluster check, if it detects a slot move in 29# progress it should prompt the user to continue the move from where it 30# stopped. 31# - Gracefully handle Ctrl+C in move_slot to prompt the user if really stop 32# while rehashing, and performing the best cleanup possible if the user 33# forces the quit. 34# - When doing "fix" set a global Fix to true, and prompt the user to 35# fix the problem if automatically fixable every time there is something 36# to fix. For instance: 37# 1) If there is a node that pretend to receive a slot, or to migrate a 38# slot, but has no entries in that slot, fix it. 39# 2) If there is a node having keys in slots that are not owned by it 40# fix this condition moving the entries in the same node. 41# 3) Perform more possibly slow tests about the state of the cluster. 42# 4) When aborted slot migration is detected, fix it. 43 44require 'rubygems' 45require 'redis' 46 47ClusterHashSlots = 16384 48MigrateDefaultTimeout = 60000 49MigrateDefaultPipeline = 10 50RebalanceDefaultThreshold = 2 51 52$verbose = false 53 54def xputs(s) 55 case s[0..2] 56 when ">>>" 57 color="29;1" 58 when "[ER" 59 color="31;1" 60 when "[WA" 61 color="31;1" 62 when "[OK" 63 color="32" 64 when "[FA","***" 65 color="33" 66 else 67 color=nil 68 end 69 70 color = nil if ENV['TERM'] != "xterm" 71 print "\033[#{color}m" if color 72 print s 73 print "\033[0m" if color 74 print "\n" 75end 76 77class ClusterNode 78 def initialize(addr) 79 s = addr.split("@")[0].split(":") 80 if s.length < 2 81 puts "Invalid IP or Port (given as #{addr}) - use IP:Port format" 82 exit 1 83 end 84 port = s.pop # removes port from split array 85 ip = s.join(":") # if s.length > 1 here, it's IPv6, so restore address 86 @r = nil 87 @info = {} 88 @info[:host] = ip 89 @info[:port] = port 90 @info[:slots] = {} 91 @info[:migrating] = {} 92 @info[:importing] = {} 93 @info[:replicate] = false 94 @dirty = false # True if we need to flush slots info into node. 95 @friends = [] 96 end 97 98 def friends 99 @friends 100 end 101 102 def slots 103 @info[:slots] 104 end 105 106 def has_flag?(flag) 107 @info[:flags].index(flag) 108 end 109 110 def to_s 111 "#{@info[:host]}:#{@info[:port]}" 112 end 113 114 def connect(o={}) 115 return if @r 116 print "Connecting to node #{self}: " if $verbose 117 STDOUT.flush 118 begin 119 @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60) 120 @r.ping 121 rescue 122 xputs "[ERR] Sorry, can't connect to node #{self}" 123 exit 1 if o[:abort] 124 @r = nil 125 end 126 xputs "OK" if $verbose 127 end 128 129 def assert_cluster 130 info = @r.info 131 if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0 132 xputs "[ERR] Node #{self} is not configured as a cluster node." 133 exit 1 134 end 135 end 136 137 def assert_empty 138 if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) || 139 (@r.info['db0']) 140 xputs "[ERR] Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0." 141 exit 1 142 end 143 end 144 145 def load_info(o={}) 146 self.connect 147 nodes = @r.cluster("nodes").split("\n") 148 nodes.each{|n| 149 # name addr flags role ping_sent ping_recv link_status slots 150 split = n.split 151 name,addr,flags,master_id,ping_sent,ping_recv,config_epoch,link_status = split[0..6] 152 slots = split[8..-1] 153 info = { 154 :name => name, 155 :addr => addr, 156 :flags => flags.split(","), 157 :replicate => master_id, 158 :ping_sent => ping_sent.to_i, 159 :ping_recv => ping_recv.to_i, 160 :link_status => link_status 161 } 162 info[:replicate] = false if master_id == "-" 163 164 if info[:flags].index("myself") 165 @info = @info.merge(info) 166 @info[:slots] = {} 167 slots.each{|s| 168 if s[0..0] == '[' 169 if s.index("->-") # Migrating 170 slot,dst = s[1..-1].split("->-") 171 @info[:migrating][slot.to_i] = dst 172 elsif s.index("-<-") # Importing 173 slot,src = s[1..-1].split("-<-") 174 @info[:importing][slot.to_i] = src 175 end 176 elsif s.index("-") 177 start,stop = s.split("-") 178 self.add_slots((start.to_i)..(stop.to_i)) 179 else 180 self.add_slots((s.to_i)..(s.to_i)) 181 end 182 } if slots 183 @dirty = false 184 @r.cluster("info").split("\n").each{|e| 185 k,v=e.split(":") 186 k = k.to_sym 187 v.chop! 188 if k != :cluster_state 189 @info[k] = v.to_i 190 else 191 @info[k] = v 192 end 193 } 194 elsif o[:getfriends] 195 @friends << info 196 end 197 } 198 end 199 200 def add_slots(slots) 201 slots.each{|s| 202 @info[:slots][s] = :new 203 } 204 @dirty = true 205 end 206 207 def set_as_replica(node_id) 208 @info[:replicate] = node_id 209 @dirty = true 210 end 211 212 def flush_node_config 213 return if !@dirty 214 if @info[:replicate] 215 begin 216 @r.cluster("replicate",@info[:replicate]) 217 rescue 218 # If the cluster did not already joined it is possible that 219 # the slave does not know the master node yet. So on errors 220 # we return ASAP leaving the dirty flag set, to flush the 221 # config later. 222 return 223 end 224 else 225 new = [] 226 @info[:slots].each{|s,val| 227 if val == :new 228 new << s 229 @info[:slots][s] = true 230 end 231 } 232 @r.cluster("addslots",*new) 233 end 234 @dirty = false 235 end 236 237 def info_string 238 # We want to display the hash slots assigned to this node 239 # as ranges, like in: "1-5,8-9,20-25,30" 240 # 241 # Note: this could be easily written without side effects, 242 # we use 'slots' just to split the computation into steps. 243 244 # First step: we want an increasing array of integers 245 # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30] 246 slots = @info[:slots].keys.sort 247 248 # As we want to aggregate adjacent slots we convert all the 249 # slot integers into ranges (with just one element) 250 # So we have something like [1..1,2..2, ... and so forth. 251 slots.map!{|x| x..x} 252 253 # Finally we group ranges with adjacent elements. 254 slots = slots.reduce([]) {|a,b| 255 if !a.empty? && b.first == (a[-1].last)+1 256 a[0..-2] + [(a[-1].first)..(b.last)] 257 else 258 a + [b] 259 end 260 } 261 262 # Now our task is easy, we just convert ranges with just one 263 # element into a number, and a real range into a start-end format. 264 # Finally we join the array using the comma as separator. 265 slots = slots.map{|x| 266 x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}" 267 }.join(",") 268 269 role = self.has_flag?("master") ? "M" : "S" 270 271 if self.info[:replicate] and @dirty 272 is = "S: #{self.info[:name]} #{self.to_s}" 273 else 274 is = "#{role}: #{self.info[:name]} #{self.to_s}\n"+ 275 " slots:#{slots} (#{self.slots.length} slots) "+ 276 "#{(self.info[:flags]-["myself"]).join(",")}" 277 end 278 if self.info[:replicate] 279 is += "\n replicates #{info[:replicate]}" 280 elsif self.has_flag?("master") && self.info[:replicas] 281 is += "\n #{info[:replicas].length} additional replica(s)" 282 end 283 is 284 end 285 286 # Return a single string representing nodes and associated slots. 287 # TODO: remove slaves from config when slaves will be handled 288 # by Redis Cluster. 289 def get_config_signature 290 config = [] 291 @r.cluster("nodes").each_line{|l| 292 s = l.split 293 slots = s[8..-1].select {|x| x[0..0] != "["} 294 next if slots.length == 0 295 config << s[0]+":"+(slots.sort.join(",")) 296 } 297 config.sort.join("|") 298 end 299 300 def info 301 @info 302 end 303 304 def is_dirty? 305 @dirty 306 end 307 308 def r 309 @r 310 end 311end 312 313class RedisTrib 314 def initialize 315 @nodes = [] 316 @fix = false 317 @errors = [] 318 @timeout = MigrateDefaultTimeout 319 end 320 321 def check_arity(req_args, num_args) 322 if ((req_args > 0 and num_args != req_args) || 323 (req_args < 0 and num_args < req_args.abs)) 324 xputs "[ERR] Wrong number of arguments for specified sub command" 325 exit 1 326 end 327 end 328 329 def add_node(node) 330 @nodes << node 331 end 332 333 def reset_nodes 334 @nodes = [] 335 end 336 337 def cluster_error(msg) 338 @errors << msg 339 xputs msg 340 end 341 342 # Return the node with the specified ID or Nil. 343 def get_node_by_name(name) 344 @nodes.each{|n| 345 return n if n.info[:name] == name.downcase 346 } 347 return nil 348 end 349 350 # Like get_node_by_name but the specified name can be just the first 351 # part of the node ID as long as the prefix in unique across the 352 # cluster. 353 def get_node_by_abbreviated_name(name) 354 l = name.length 355 candidates = [] 356 @nodes.each{|n| 357 if n.info[:name][0...l] == name.downcase 358 candidates << n 359 end 360 } 361 return nil if candidates.length != 1 362 candidates[0] 363 end 364 365 # This function returns the master that has the least number of replicas 366 # in the cluster. If there are multiple masters with the same smaller 367 # number of replicas, one at random is returned. 368 def get_master_with_least_replicas 369 masters = @nodes.select{|n| n.has_flag? "master"} 370 sorted = masters.sort{|a,b| 371 a.info[:replicas].length <=> b.info[:replicas].length 372 } 373 sorted[0] 374 end 375 376 def check_cluster(opt={}) 377 xputs ">>> Performing Cluster Check (using node #{@nodes[0]})" 378 show_nodes if !opt[:quiet] 379 check_config_consistency 380 check_open_slots 381 check_slots_coverage 382 end 383 384 def show_cluster_info 385 masters = 0 386 keys = 0 387 @nodes.each{|n| 388 if n.has_flag?("master") 389 puts "#{n} (#{n.info[:name][0...8]}...) -> #{n.r.dbsize} keys | #{n.slots.length} slots | "+ 390 "#{n.info[:replicas].length} slaves." 391 masters += 1 392 keys += n.r.dbsize 393 end 394 } 395 xputs "[OK] #{keys} keys in #{masters} masters." 396 keys_per_slot = sprintf("%.2f",keys/16384.0) 397 puts "#{keys_per_slot} keys per slot on average." 398 end 399 400 # Merge slots of every known node. If the resulting slots are equal 401 # to ClusterHashSlots, then all slots are served. 402 def covered_slots 403 slots = {} 404 @nodes.each{|n| 405 slots = slots.merge(n.slots) 406 } 407 slots 408 end 409 410 def check_slots_coverage 411 xputs ">>> Check slots coverage..." 412 slots = covered_slots 413 if slots.length == ClusterHashSlots 414 xputs "[OK] All #{ClusterHashSlots} slots covered." 415 else 416 cluster_error \ 417 "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes." 418 fix_slots_coverage if @fix 419 end 420 end 421 422 def check_open_slots 423 xputs ">>> Check for open slots..." 424 open_slots = [] 425 @nodes.each{|n| 426 if n.info[:migrating].size > 0 427 cluster_error \ 428 "[WARNING] Node #{n} has slots in migrating state (#{n.info[:migrating].keys.join(",")})." 429 open_slots += n.info[:migrating].keys 430 elsif n.info[:importing].size > 0 431 cluster_error \ 432 "[WARNING] Node #{n} has slots in importing state (#{n.info[:importing].keys.join(",")})." 433 open_slots += n.info[:importing].keys 434 end 435 } 436 open_slots.uniq! 437 if open_slots.length > 0 438 xputs "[WARNING] The following slots are open: #{open_slots.join(",")}" 439 end 440 if @fix 441 open_slots.each{|slot| fix_open_slot slot} 442 end 443 end 444 445 def nodes_with_keys_in_slot(slot) 446 nodes = [] 447 @nodes.each{|n| 448 next if n.has_flag?("slave") 449 nodes << n if n.r.cluster("getkeysinslot",slot,1).length > 0 450 } 451 nodes 452 end 453 454 def fix_slots_coverage 455 not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys 456 xputs ">>> Fixing slots coverage..." 457 xputs "List of not covered slots: " + not_covered.join(",") 458 459 # For every slot, take action depending on the actual condition: 460 # 1) No node has keys for this slot. 461 # 2) A single node has keys for this slot. 462 # 3) Multiple nodes have keys for this slot. 463 slots = {} 464 not_covered.each{|slot| 465 nodes = nodes_with_keys_in_slot(slot) 466 slots[slot] = nodes 467 xputs "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join(", ")}" 468 } 469 470 none = slots.select {|k,v| v.length == 0} 471 single = slots.select {|k,v| v.length == 1} 472 multi = slots.select {|k,v| v.length > 1} 473 474 # Handle case "1": keys in no node. 475 if none.length > 0 476 xputs "The folowing uncovered slots have no keys across the cluster:" 477 xputs none.keys.join(",") 478 yes_or_die "Fix these slots by covering with a random node?" 479 none.each{|slot,nodes| 480 node = @nodes.sample 481 xputs ">>> Covering slot #{slot} with #{node}" 482 node.r.cluster("addslots",slot) 483 } 484 end 485 486 # Handle case "2": keys only in one node. 487 if single.length > 0 488 xputs "The folowing uncovered slots have keys in just one node:" 489 puts single.keys.join(",") 490 yes_or_die "Fix these slots by covering with those nodes?" 491 single.each{|slot,nodes| 492 xputs ">>> Covering slot #{slot} with #{nodes[0]}" 493 nodes[0].r.cluster("addslots",slot) 494 } 495 end 496 497 # Handle case "3": keys in multiple nodes. 498 if multi.length > 0 499 xputs "The folowing uncovered slots have keys in multiple nodes:" 500 xputs multi.keys.join(",") 501 yes_or_die "Fix these slots by moving keys into a single node?" 502 multi.each{|slot,nodes| 503 target = get_node_with_most_keys_in_slot(nodes,slot) 504 xputs ">>> Covering slot #{slot} moving keys to #{target}" 505 506 target.r.cluster('addslots',slot) 507 target.r.cluster('setslot',slot,'stable') 508 nodes.each{|src| 509 next if src == target 510 # Set the source node in 'importing' state (even if we will 511 # actually migrate keys away) in order to avoid receiving 512 # redirections for MIGRATE. 513 src.r.cluster('setslot',slot,'importing',target.info[:name]) 514 move_slot(src,target,slot,:dots=>true,:fix=>true,:cold=>true) 515 src.r.cluster('setslot',slot,'stable') 516 } 517 } 518 end 519 end 520 521 # Return the owner of the specified slot 522 def get_slot_owners(slot) 523 owners = [] 524 @nodes.each{|n| 525 next if n.has_flag?("slave") 526 n.slots.each{|s,_| 527 owners << n if s == slot 528 } 529 } 530 owners 531 end 532 533 # Return the node, among 'nodes' with the greatest number of keys 534 # in the specified slot. 535 def get_node_with_most_keys_in_slot(nodes,slot) 536 best = nil 537 best_numkeys = 0 538 @nodes.each{|n| 539 next if n.has_flag?("slave") 540 numkeys = n.r.cluster("countkeysinslot",slot) 541 if numkeys > best_numkeys || best == nil 542 best = n 543 best_numkeys = numkeys 544 end 545 } 546 return best 547 end 548 549 # Slot 'slot' was found to be in importing or migrating state in one or 550 # more nodes. This function fixes this condition by migrating keys where 551 # it seems more sensible. 552 def fix_open_slot(slot) 553 puts ">>> Fixing open slot #{slot}" 554 555 # Try to obtain the current slot owner, according to the current 556 # nodes configuration. 557 owners = get_slot_owners(slot) 558 owner = owners[0] if owners.length == 1 559 560 migrating = [] 561 importing = [] 562 @nodes.each{|n| 563 next if n.has_flag? "slave" 564 if n.info[:migrating][slot] 565 migrating << n 566 elsif n.info[:importing][slot] 567 importing << n 568 elsif n.r.cluster("countkeysinslot",slot) > 0 && n != owner 569 xputs "*** Found keys about slot #{slot} in node #{n}!" 570 importing << n 571 end 572 } 573 puts "Set as migrating in: #{migrating.join(",")}" 574 puts "Set as importing in: #{importing.join(",")}" 575 576 # If there is no slot owner, set as owner the slot with the biggest 577 # number of keys, among the set of migrating / importing nodes. 578 if !owner 579 xputs ">>> Nobody claims ownership, selecting an owner..." 580 owner = get_node_with_most_keys_in_slot(@nodes,slot) 581 582 # If we still don't have an owner, we can't fix it. 583 if !owner 584 xputs "[ERR] Can't select a slot owner. Impossible to fix." 585 exit 1 586 end 587 588 # Use ADDSLOTS to assign the slot. 589 puts "*** Configuring #{owner} as the slot owner" 590 n.r.cluster("setslot",slot,"stable") 591 n.r.cluster("addslot",slot) 592 # Make sure this information will propagate. Not strictly needed 593 # since there is no past owner, so all the other nodes will accept 594 # whatever epoch this node will claim the slot with. 595 n.r.cluster("bumpepoch") 596 597 # Remove the owner from the list of migrating/importing 598 # nodes. 599 migrating.delete(n) 600 importing.delete(n) 601 end 602 603 # If there are multiple owners of the slot, we need to fix it 604 # so that a single node is the owner and all the other nodes 605 # are in importing state. Later the fix can be handled by one 606 # of the base cases above. 607 # 608 # Note that this case also covers multiple nodes having the slot 609 # in migrating state, since migrating is a valid state only for 610 # slot owners. 611 if owners.length > 1 612 owner = get_node_with_most_keys_in_slot(owners,slot) 613 owners.each{|n| 614 next if n == owner 615 n.r.cluster('delslots',slot) 616 n.r.cluster('setslot',slot,'importing',owner.info[:name]) 617 importing.delete(n) # Avoid duplciates 618 importing << n 619 } 620 owner.r.cluster('bumpepoch') 621 end 622 623 # Case 1: The slot is in migrating state in one slot, and in 624 # importing state in 1 slot. That's trivial to address. 625 if migrating.length == 1 && importing.length == 1 626 move_slot(migrating[0],importing[0],slot,:dots=>true,:fix=>true) 627 # Case 2: There are multiple nodes that claim the slot as importing, 628 # they probably got keys about the slot after a restart so opened 629 # the slot. In this case we just move all the keys to the owner 630 # according to the configuration. 631 elsif migrating.length == 0 && importing.length > 0 632 xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}" 633 importing.each {|node| 634 next if node == owner 635 move_slot(node,owner,slot,:dots=>true,:fix=>true,:cold=>true) 636 xputs ">>> Setting #{slot} as STABLE in #{node}" 637 node.r.cluster("setslot",slot,"stable") 638 } 639 # Case 3: There are no slots claiming to be in importing state, but 640 # there is a migrating node that actually don't have any key. We 641 # can just close the slot, probably a reshard interrupted in the middle. 642 elsif importing.length == 0 && migrating.length == 1 && 643 migrating[0].r.cluster("getkeysinslot",slot,10).length == 0 644 migrating[0].r.cluster("setslot",slot,"stable") 645 else 646 xputs "[ERR] Sorry, Redis-trib can't fix this slot yet (work in progress). Slot is set as migrating in #{migrating.join(",")}, as importing in #{importing.join(",")}, owner is #{owner}" 647 end 648 end 649 650 # Check if all the nodes agree about the cluster configuration 651 def check_config_consistency 652 if !is_config_consistent? 653 cluster_error "[ERR] Nodes don't agree about configuration!" 654 else 655 xputs "[OK] All nodes agree about slots configuration." 656 end 657 end 658 659 def is_config_consistent? 660 signatures=[] 661 @nodes.each{|n| 662 signatures << n.get_config_signature 663 } 664 return signatures.uniq.length == 1 665 end 666 667 def wait_cluster_join 668 print "Waiting for the cluster to join" 669 while !is_config_consistent? 670 print "." 671 STDOUT.flush 672 sleep 1 673 end 674 print "\n" 675 end 676 677 def alloc_slots 678 nodes_count = @nodes.length 679 masters_count = @nodes.length / (@replicas+1) 680 masters = [] 681 682 # The first step is to split instances by IP. This is useful as 683 # we'll try to allocate master nodes in different physical machines 684 # (as much as possible) and to allocate slaves of a given master in 685 # different physical machines as well. 686 # 687 # This code assumes just that if the IP is different, than it is more 688 # likely that the instance is running in a different physical host 689 # or at least a different virtual machine. 690 ips = {} 691 @nodes.each{|n| 692 ips[n.info[:host]] = [] if !ips[n.info[:host]] 693 ips[n.info[:host]] << n 694 } 695 696 # Select master instances 697 puts "Using #{masters_count} masters:" 698 interleaved = [] 699 stop = false 700 while not stop do 701 # Take one node from each IP until we run out of nodes 702 # across every IP. 703 ips.each do |ip,nodes| 704 if nodes.empty? 705 # if this IP has no remaining nodes, check for termination 706 if interleaved.length == nodes_count 707 # stop when 'interleaved' has accumulated all nodes 708 stop = true 709 next 710 end 711 else 712 # else, move one node from this IP to 'interleaved' 713 interleaved.push nodes.shift 714 end 715 end 716 end 717 718 masters = interleaved.slice!(0, masters_count) 719 nodes_count -= masters.length 720 721 masters.each{|m| puts m} 722 723 # Alloc slots on masters 724 slots_per_node = ClusterHashSlots.to_f / masters_count 725 first = 0 726 cursor = 0.0 727 masters.each_with_index{|n,masternum| 728 last = (cursor+slots_per_node-1).round 729 if last > ClusterHashSlots || masternum == masters.length-1 730 last = ClusterHashSlots-1 731 end 732 last = first if last < first # Min step is 1. 733 n.add_slots first..last 734 first = last+1 735 cursor += slots_per_node 736 } 737 738 # Select N replicas for every master. 739 # We try to split the replicas among all the IPs with spare nodes 740 # trying to avoid the host where the master is running, if possible. 741 # 742 # Note we loop two times. The first loop assigns the requested 743 # number of replicas to each master. The second loop assigns any 744 # remaining instances as extra replicas to masters. Some masters 745 # may end up with more than their requested number of replicas, but 746 # all nodes will be used. 747 assignment_verbose = false 748 749 [:requested,:unused].each do |assign| 750 masters.each do |m| 751 assigned_replicas = 0 752 while assigned_replicas < @replicas 753 break if nodes_count == 0 754 if assignment_verbose 755 if assign == :requested 756 puts "Requesting total of #{@replicas} replicas " \ 757 "(#{assigned_replicas} replicas assigned " \ 758 "so far with #{nodes_count} total remaining)." 759 elsif assign == :unused 760 puts "Assigning extra instance to replication " \ 761 "role too (#{nodes_count} remaining)." 762 end 763 end 764 765 # Return the first node not matching our current master 766 node = interleaved.find{|n| n.info[:host] != m.info[:host]} 767 768 # If we found a node, use it as a best-first match. 769 # Otherwise, we didn't find a node on a different IP, so we 770 # go ahead and use a same-IP replica. 771 if node 772 slave = node 773 interleaved.delete node 774 else 775 slave = interleaved.shift 776 end 777 slave.set_as_replica(m.info[:name]) 778 nodes_count -= 1 779 assigned_replicas += 1 780 puts "Adding replica #{slave} to #{m}" 781 782 # If we are in the "assign extra nodes" loop, 783 # we want to assign one extra replica to each 784 # master before repeating masters. 785 # This break lets us assign extra replicas to masters 786 # in a round-robin way. 787 break if assign == :unused 788 end 789 end 790 end 791 end 792 793 def flush_nodes_config 794 @nodes.each{|n| 795 n.flush_node_config 796 } 797 end 798 799 def show_nodes 800 @nodes.each{|n| 801 xputs n.info_string 802 } 803 end 804 805 # Redis Cluster config epoch collision resolution code is able to eventually 806 # set a different epoch to each node after a new cluster is created, but 807 # it is slow compared to assign a progressive config epoch to each node 808 # before joining the cluster. However we do just a best-effort try here 809 # since if we fail is not a problem. 810 def assign_config_epoch 811 config_epoch = 1 812 @nodes.each{|n| 813 begin 814 n.r.cluster("set-config-epoch",config_epoch) 815 rescue 816 end 817 config_epoch += 1 818 } 819 end 820 821 def join_cluster 822 # We use a brute force approach to make sure the node will meet 823 # each other, that is, sending CLUSTER MEET messages to all the nodes 824 # about the very same node. 825 # Thanks to gossip this information should propagate across all the 826 # cluster in a matter of seconds. 827 first = false 828 @nodes.each{|n| 829 if !first then first = n.info; next; end # Skip the first node 830 n.r.cluster("meet",first[:host],first[:port]) 831 } 832 end 833 834 def yes_or_die(msg) 835 print "#{msg} (type 'yes' to accept): " 836 STDOUT.flush 837 if !(STDIN.gets.chomp.downcase == "yes") 838 xputs "*** Aborting..." 839 exit 1 840 end 841 end 842 843 def load_cluster_info_from_node(nodeaddr) 844 node = ClusterNode.new(nodeaddr) 845 node.connect(:abort => true) 846 node.assert_cluster 847 node.load_info(:getfriends => true) 848 add_node(node) 849 node.friends.each{|f| 850 next if f[:flags].index("noaddr") || 851 f[:flags].index("disconnected") || 852 f[:flags].index("fail") 853 fnode = ClusterNode.new(f[:addr]) 854 fnode.connect() 855 next if !fnode.r 856 begin 857 fnode.load_info() 858 add_node(fnode) 859 rescue => e 860 xputs "[ERR] Unable to load info for node #{fnode}" 861 end 862 } 863 populate_nodes_replicas_info 864 end 865 866 # This function is called by load_cluster_info_from_node in order to 867 # add additional information to every node as a list of replicas. 868 def populate_nodes_replicas_info 869 # Start adding the new field to every node. 870 @nodes.each{|n| 871 n.info[:replicas] = [] 872 } 873 874 # Populate the replicas field using the replicate field of slave 875 # nodes. 876 @nodes.each{|n| 877 if n.info[:replicate] 878 master = get_node_by_name(n.info[:replicate]) 879 if !master 880 xputs "*** WARNING: #{n} claims to be slave of unknown node ID #{n.info[:replicate]}." 881 else 882 master.info[:replicas] << n 883 end 884 end 885 } 886 end 887 888 # Given a list of source nodes return a "resharding plan" 889 # with what slots to move in order to move "numslots" slots to another 890 # instance. 891 def compute_reshard_table(sources,numslots) 892 moved = [] 893 # Sort from bigger to smaller instance, for two reasons: 894 # 1) If we take less slots than instances it is better to start 895 # getting from the biggest instances. 896 # 2) We take one slot more from the first instance in the case of not 897 # perfect divisibility. Like we have 3 nodes and need to get 10 898 # slots, we take 4 from the first, and 3 from the rest. So the 899 # biggest is always the first. 900 sources = sources.sort{|a,b| b.slots.length <=> a.slots.length} 901 source_tot_slots = sources.inject(0) {|sum,source| 902 sum+source.slots.length 903 } 904 sources.each_with_index{|s,i| 905 # Every node will provide a number of slots proportional to the 906 # slots it has assigned. 907 n = (numslots.to_f/source_tot_slots*s.slots.length) 908 if i == 0 909 n = n.ceil 910 else 911 n = n.floor 912 end 913 s.slots.keys.sort[(0...n)].each{|slot| 914 if moved.length < numslots 915 moved << {:source => s, :slot => slot} 916 end 917 } 918 } 919 return moved 920 end 921 922 def show_reshard_table(table) 923 table.each{|e| 924 puts " Moving slot #{e[:slot]} from #{e[:source].info[:name]}" 925 } 926 end 927 928 # Move slots between source and target nodes using MIGRATE. 929 # 930 # Options: 931 # :verbose -- Print a dot for every moved key. 932 # :fix -- We are moving in the context of a fix. Use REPLACE. 933 # :cold -- Move keys without opening slots / reconfiguring the nodes. 934 # :update -- Update nodes.info[:slots] for source/target nodes. 935 # :quiet -- Don't print info messages. 936 def move_slot(source,target,slot,o={}) 937 o = {:pipeline => MigrateDefaultPipeline}.merge(o) 938 939 # We start marking the slot as importing in the destination node, 940 # and the slot as migrating in the target host. Note that the order of 941 # the operations is important, as otherwise a client may be redirected 942 # to the target node that does not yet know it is importing this slot. 943 if !o[:quiet] 944 print "Moving slot #{slot} from #{source} to #{target}: " 945 STDOUT.flush 946 end 947 948 if !o[:cold] 949 target.r.cluster("setslot",slot,"importing",source.info[:name]) 950 source.r.cluster("setslot",slot,"migrating",target.info[:name]) 951 end 952 # Migrate all the keys from source to target using the MIGRATE command 953 while true 954 keys = source.r.cluster("getkeysinslot",slot,o[:pipeline]) 955 break if keys.length == 0 956 begin 957 source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:keys,*keys]) 958 rescue => e 959 if o[:fix] && e.to_s =~ /BUSYKEY/ 960 xputs "*** Target key exists. Replacing it for FIX." 961 source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:replace,:keys,*keys]) 962 else 963 puts "" 964 xputs "[ERR] #{e}" 965 exit 1 966 end 967 end 968 print "."*keys.length if o[:dots] 969 STDOUT.flush 970 end 971 972 puts if !o[:quiet] 973 # Set the new node as the owner of the slot in all the known nodes. 974 if !o[:cold] 975 @nodes.each{|n| 976 next if n.has_flag?("slave") 977 n.r.cluster("setslot",slot,"node",target.info[:name]) 978 } 979 end 980 981 # Update the node logical config 982 if o[:update] then 983 source.info[:slots].delete(slot) 984 target.info[:slots][slot] = true 985 end 986 end 987 988 # redis-trib subcommands implementations. 989 990 def check_cluster_cmd(argv,opt) 991 load_cluster_info_from_node(argv[0]) 992 check_cluster 993 end 994 995 def info_cluster_cmd(argv,opt) 996 load_cluster_info_from_node(argv[0]) 997 show_cluster_info 998 end 999 1000 def rebalance_cluster_cmd(argv,opt) 1001 opt = { 1002 'pipeline' => MigrateDefaultPipeline, 1003 'threshold' => RebalanceDefaultThreshold 1004 }.merge(opt) 1005 1006 # Load nodes info before parsing options, otherwise we can't 1007 # handle --weight. 1008 load_cluster_info_from_node(argv[0]) 1009 1010 # Options parsing 1011 threshold = opt['threshold'].to_i 1012 autoweights = opt['auto-weights'] 1013 weights = {} 1014 opt['weight'].each{|w| 1015 fields = w.split("=") 1016 node = get_node_by_abbreviated_name(fields[0]) 1017 if !node || !node.has_flag?("master") 1018 puts "*** No such master node #{fields[0]}" 1019 exit 1 1020 end 1021 weights[node.info[:name]] = fields[1].to_f 1022 } if opt['weight'] 1023 useempty = opt['use-empty-masters'] 1024 1025 # Assign a weight to each node, and compute the total cluster weight. 1026 total_weight = 0 1027 nodes_involved = 0 1028 @nodes.each{|n| 1029 if n.has_flag?("master") 1030 next if !useempty && n.slots.length == 0 1031 n.info[:w] = weights[n.info[:name]] ? weights[n.info[:name]] : 1 1032 total_weight += n.info[:w] 1033 nodes_involved += 1 1034 end 1035 } 1036 1037 # Check cluster, only proceed if it looks sane. 1038 check_cluster(:quiet => true) 1039 if @errors.length != 0 1040 puts "*** Please fix your cluster problems before rebalancing" 1041 exit 1 1042 end 1043 1044 # Calculate the slots balance for each node. It's the number of 1045 # slots the node should lose (if positive) or gain (if negative) 1046 # in order to be balanced. 1047 threshold = opt['threshold'].to_f 1048 threshold_reached = false 1049 @nodes.each{|n| 1050 if n.has_flag?("master") 1051 next if !n.info[:w] 1052 expected = ((ClusterHashSlots.to_f / total_weight) * 1053 n.info[:w]).to_i 1054 n.info[:balance] = n.slots.length - expected 1055 # Compute the percentage of difference between the 1056 # expected number of slots and the real one, to see 1057 # if it's over the threshold specified by the user. 1058 over_threshold = false 1059 if threshold > 0 1060 if n.slots.length > 0 1061 err_perc = (100-(100.0*expected/n.slots.length)).abs 1062 over_threshold = true if err_perc > threshold 1063 elsif expected > 0 1064 over_threshold = true 1065 end 1066 end 1067 threshold_reached = true if over_threshold 1068 end 1069 } 1070 if !threshold_reached 1071 xputs "*** No rebalancing needed! All nodes are within the #{threshold}% threshold." 1072 return 1073 end 1074 1075 # Only consider nodes we want to change 1076 sn = @nodes.select{|n| 1077 n.has_flag?("master") && n.info[:w] 1078 } 1079 1080 # Because of rounding, it is possible that the balance of all nodes 1081 # summed does not give 0. Make sure that nodes that have to provide 1082 # slots are always matched by nodes receiving slots. 1083 total_balance = sn.map{|x| x.info[:balance]}.reduce{|a,b| a+b} 1084 while total_balance > 0 1085 sn.each{|n| 1086 if n.info[:balance] < 0 && total_balance > 0 1087 n.info[:balance] -= 1 1088 total_balance -= 1 1089 end 1090 } 1091 end 1092 1093 # Sort nodes by their slots balance. 1094 sn = sn.sort{|a,b| 1095 a.info[:balance] <=> b.info[:balance] 1096 } 1097 1098 xputs ">>> Rebalancing across #{nodes_involved} nodes. Total weight = #{total_weight}" 1099 1100 if $verbose 1101 sn.each{|n| 1102 puts "#{n} balance is #{n.info[:balance]} slots" 1103 } 1104 end 1105 1106 # Now we have at the start of the 'sn' array nodes that should get 1107 # slots, at the end nodes that must give slots. 1108 # We take two indexes, one at the start, and one at the end, 1109 # incrementing or decrementing the indexes accordingly til we 1110 # find nodes that need to get/provide slots. 1111 dst_idx = 0 1112 src_idx = sn.length - 1 1113 1114 while dst_idx < src_idx 1115 dst = sn[dst_idx] 1116 src = sn[src_idx] 1117 numslots = [dst.info[:balance],src.info[:balance]].map{|n| 1118 n.abs 1119 }.min 1120 1121 if numslots > 0 1122 puts "Moving #{numslots} slots from #{src} to #{dst}" 1123 1124 # Actaully move the slots. 1125 reshard_table = compute_reshard_table([src],numslots) 1126 if reshard_table.length != numslots 1127 xputs "*** Assertio failed: Reshard table != number of slots" 1128 exit 1 1129 end 1130 if opt['simulate'] 1131 print "#"*reshard_table.length 1132 else 1133 reshard_table.each{|e| 1134 move_slot(e[:source],dst,e[:slot], 1135 :quiet=>true, 1136 :dots=>false, 1137 :update=>true, 1138 :pipeline=>opt['pipeline']) 1139 print "#" 1140 STDOUT.flush 1141 } 1142 end 1143 puts 1144 end 1145 1146 # Update nodes balance. 1147 dst.info[:balance] += numslots 1148 src.info[:balance] -= numslots 1149 dst_idx += 1 if dst.info[:balance] == 0 1150 src_idx -= 1 if src.info[:balance] == 0 1151 end 1152 end 1153 1154 def fix_cluster_cmd(argv,opt) 1155 @fix = true 1156 @timeout = opt['timeout'].to_i if opt['timeout'] 1157 1158 load_cluster_info_from_node(argv[0]) 1159 check_cluster 1160 end 1161 1162 def reshard_cluster_cmd(argv,opt) 1163 opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt) 1164 1165 load_cluster_info_from_node(argv[0]) 1166 check_cluster 1167 if @errors.length != 0 1168 puts "*** Please fix your cluster problems before resharding" 1169 exit 1 1170 end 1171 1172 @timeout = opt['timeout'].to_i if opt['timeout'].to_i 1173 1174 # Get number of slots 1175 if opt['slots'] 1176 numslots = opt['slots'].to_i 1177 else 1178 numslots = 0 1179 while numslots <= 0 or numslots > ClusterHashSlots 1180 print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? " 1181 numslots = STDIN.gets.to_i 1182 end 1183 end 1184 1185 # Get the target instance 1186 if opt['to'] 1187 target = get_node_by_name(opt['to']) 1188 if !target || target.has_flag?("slave") 1189 xputs "*** The specified node is not known or not a master, please retry." 1190 exit 1 1191 end 1192 else 1193 target = nil 1194 while not target 1195 print "What is the receiving node ID? " 1196 target = get_node_by_name(STDIN.gets.chop) 1197 if !target || target.has_flag?("slave") 1198 xputs "*** The specified node is not known or not a master, please retry." 1199 target = nil 1200 end 1201 end 1202 end 1203 1204 # Get the source instances 1205 sources = [] 1206 if opt['from'] 1207 opt['from'].split(',').each{|node_id| 1208 if node_id == "all" 1209 sources = "all" 1210 break 1211 end 1212 src = get_node_by_name(node_id) 1213 if !src || src.has_flag?("slave") 1214 xputs "*** The specified node is not known or is not a master, please retry." 1215 exit 1 1216 end 1217 sources << src 1218 } 1219 else 1220 xputs "Please enter all the source node IDs." 1221 xputs " Type 'all' to use all the nodes as source nodes for the hash slots." 1222 xputs " Type 'done' once you entered all the source nodes IDs." 1223 while true 1224 print "Source node ##{sources.length+1}:" 1225 line = STDIN.gets.chop 1226 src = get_node_by_name(line) 1227 if line == "done" 1228 break 1229 elsif line == "all" 1230 sources = "all" 1231 break 1232 elsif !src || src.has_flag?("slave") 1233 xputs "*** The specified node is not known or is not a master, please retry." 1234 elsif src.info[:name] == target.info[:name] 1235 xputs "*** It is not possible to use the target node as source node." 1236 else 1237 sources << src 1238 end 1239 end 1240 end 1241 1242 if sources.length == 0 1243 puts "*** No source nodes given, operation aborted" 1244 exit 1 1245 end 1246 1247 # Handle soures == all. 1248 if sources == "all" 1249 sources = [] 1250 @nodes.each{|n| 1251 next if n.info[:name] == target.info[:name] 1252 next if n.has_flag?("slave") 1253 sources << n 1254 } 1255 end 1256 1257 # Check if the destination node is the same of any source nodes. 1258 if sources.index(target) 1259 xputs "*** Target node is also listed among the source nodes!" 1260 exit 1 1261 end 1262 1263 puts "\nReady to move #{numslots} slots." 1264 puts " Source nodes:" 1265 sources.each{|s| puts " "+s.info_string} 1266 puts " Destination node:" 1267 puts " #{target.info_string}" 1268 reshard_table = compute_reshard_table(sources,numslots) 1269 puts " Resharding plan:" 1270 show_reshard_table(reshard_table) 1271 if !opt['yes'] 1272 print "Do you want to proceed with the proposed reshard plan (yes/no)? " 1273 yesno = STDIN.gets.chop 1274 exit(1) if (yesno != "yes") 1275 end 1276 reshard_table.each{|e| 1277 move_slot(e[:source],target,e[:slot], 1278 :dots=>true, 1279 :pipeline=>opt['pipeline']) 1280 } 1281 end 1282 1283 # This is an helper function for create_cluster_cmd that verifies if 1284 # the number of nodes and the specified replicas have a valid configuration 1285 # where there are at least three master nodes and enough replicas per node. 1286 def check_create_parameters 1287 masters = @nodes.length/(@replicas+1) 1288 if masters < 3 1289 puts "*** ERROR: Invalid configuration for cluster creation." 1290 puts "*** Redis Cluster requires at least 3 master nodes." 1291 puts "*** This is not possible with #{@nodes.length} nodes and #{@replicas} replicas per node." 1292 puts "*** At least #{3*(@replicas+1)} nodes are required." 1293 exit 1 1294 end 1295 end 1296 1297 def create_cluster_cmd(argv,opt) 1298 opt = {'replicas' => 0}.merge(opt) 1299 @replicas = opt['replicas'].to_i 1300 1301 xputs ">>> Creating cluster" 1302 argv[0..-1].each{|n| 1303 node = ClusterNode.new(n) 1304 node.connect(:abort => true) 1305 node.assert_cluster 1306 node.load_info 1307 node.assert_empty 1308 add_node(node) 1309 } 1310 check_create_parameters 1311 xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..." 1312 alloc_slots 1313 show_nodes 1314 yes_or_die "Can I set the above configuration?" 1315 flush_nodes_config 1316 xputs ">>> Nodes configuration updated" 1317 xputs ">>> Assign a different config epoch to each node" 1318 assign_config_epoch 1319 xputs ">>> Sending CLUSTER MEET messages to join the cluster" 1320 join_cluster 1321 # Give one second for the join to start, in order to avoid that 1322 # wait_cluster_join will find all the nodes agree about the config as 1323 # they are still empty with unassigned slots. 1324 sleep 1 1325 wait_cluster_join 1326 flush_nodes_config # Useful for the replicas 1327 check_cluster 1328 end 1329 1330 def addnode_cluster_cmd(argv,opt) 1331 xputs ">>> Adding node #{argv[0]} to cluster #{argv[1]}" 1332 1333 # Check the existing cluster 1334 load_cluster_info_from_node(argv[1]) 1335 check_cluster 1336 1337 # If --master-id was specified, try to resolve it now so that we 1338 # abort before starting with the node configuration. 1339 if opt['slave'] 1340 if opt['master-id'] 1341 master = get_node_by_name(opt['master-id']) 1342 if !master 1343 xputs "[ERR] No such master ID #{opt['master-id']}" 1344 end 1345 else 1346 master = get_master_with_least_replicas 1347 xputs "Automatically selected master #{master}" 1348 end 1349 end 1350 1351 # Add the new node 1352 new = ClusterNode.new(argv[0]) 1353 new.connect(:abort => true) 1354 new.assert_cluster 1355 new.load_info 1356 new.assert_empty 1357 first = @nodes.first.info 1358 add_node(new) 1359 1360 # Send CLUSTER MEET command to the new node 1361 xputs ">>> Send CLUSTER MEET to node #{new} to make it join the cluster." 1362 new.r.cluster("meet",first[:host],first[:port]) 1363 1364 # Additional configuration is needed if the node is added as 1365 # a slave. 1366 if opt['slave'] 1367 wait_cluster_join 1368 xputs ">>> Configure node as replica of #{master}." 1369 new.r.cluster("replicate",master.info[:name]) 1370 end 1371 xputs "[OK] New node added correctly." 1372 end 1373 1374 def delnode_cluster_cmd(argv,opt) 1375 id = argv[1].downcase 1376 xputs ">>> Removing node #{id} from cluster #{argv[0]}" 1377 1378 # Load cluster information 1379 load_cluster_info_from_node(argv[0]) 1380 1381 # Check if the node exists and is not empty 1382 node = get_node_by_name(id) 1383 1384 if !node 1385 xputs "[ERR] No such node ID #{id}" 1386 exit 1 1387 end 1388 1389 if node.slots.length != 0 1390 xputs "[ERR] Node #{node} is not empty! Reshard data away and try again." 1391 exit 1 1392 end 1393 1394 # Send CLUSTER FORGET to all the nodes but the node to remove 1395 xputs ">>> Sending CLUSTER FORGET messages to the cluster..." 1396 @nodes.each{|n| 1397 next if n == node 1398 if n.info[:replicate] && n.info[:replicate].downcase == id 1399 # Reconfigure the slave to replicate with some other node 1400 master = get_master_with_least_replicas 1401 xputs ">>> #{n} as replica of #{master}" 1402 n.r.cluster("replicate",master.info[:name]) 1403 end 1404 n.r.cluster("forget",argv[1]) 1405 } 1406 1407 # Finally shutdown the node 1408 xputs ">>> SHUTDOWN the node." 1409 node.r.shutdown 1410 end 1411 1412 def set_timeout_cluster_cmd(argv,opt) 1413 timeout = argv[1].to_i 1414 if timeout < 100 1415 puts "Setting a node timeout of less than 100 milliseconds is a bad idea." 1416 exit 1 1417 end 1418 1419 # Load cluster information 1420 load_cluster_info_from_node(argv[0]) 1421 ok_count = 0 1422 err_count = 0 1423 1424 # Send CLUSTER FORGET to all the nodes but the node to remove 1425 xputs ">>> Reconfiguring node timeout in every cluster node..." 1426 @nodes.each{|n| 1427 begin 1428 n.r.config("set","cluster-node-timeout",timeout) 1429 n.r.config("rewrite") 1430 ok_count += 1 1431 xputs "*** New timeout set for #{n}" 1432 rescue => e 1433 puts "ERR setting node-timeot for #{n}: #{e}" 1434 err_count += 1 1435 end 1436 } 1437 xputs ">>> New node timeout set. #{ok_count} OK, #{err_count} ERR." 1438 end 1439 1440 def call_cluster_cmd(argv,opt) 1441 cmd = argv[1..-1] 1442 cmd[0] = cmd[0].upcase 1443 1444 # Load cluster information 1445 load_cluster_info_from_node(argv[0]) 1446 xputs ">>> Calling #{cmd.join(" ")}" 1447 @nodes.each{|n| 1448 begin 1449 res = n.r.send(*cmd) 1450 puts "#{n}: #{res}" 1451 rescue => e 1452 puts "#{n}: #{e}" 1453 end 1454 } 1455 end 1456 1457 def import_cluster_cmd(argv,opt) 1458 source_addr = opt['from'] 1459 xputs ">>> Importing data from #{source_addr} to cluster #{argv[1]}" 1460 use_copy = opt['copy'] 1461 use_replace = opt['replace'] 1462 1463 # Check the existing cluster. 1464 load_cluster_info_from_node(argv[0]) 1465 check_cluster 1466 1467 # Connect to the source node. 1468 xputs ">>> Connecting to the source Redis instance" 1469 src_host,src_port = source_addr.split(":") 1470 source = Redis.new(:host =>src_host, :port =>src_port) 1471 if source.info['cluster_enabled'].to_i == 1 1472 xputs "[ERR] The source node should not be a cluster node." 1473 end 1474 xputs "*** Importing #{source.dbsize} keys from DB 0" 1475 1476 # Build a slot -> node map 1477 slots = {} 1478 @nodes.each{|n| 1479 n.slots.each{|s,_| 1480 slots[s] = n 1481 } 1482 } 1483 1484 # Use SCAN to iterate over the keys, migrating to the 1485 # right node as needed. 1486 cursor = nil 1487 while cursor != 0 1488 cursor,keys = source.scan(cursor, :count => 1000) 1489 cursor = cursor.to_i 1490 keys.each{|k| 1491 # Migrate keys using the MIGRATE command. 1492 slot = key_to_slot(k) 1493 target = slots[slot] 1494 print "Migrating #{k} to #{target}: " 1495 STDOUT.flush 1496 begin 1497 cmd = ["migrate",target.info[:host],target.info[:port],k,0,@timeout] 1498 cmd << :copy if use_copy 1499 cmd << :replace if use_replace 1500 source.client.call(cmd) 1501 rescue => e 1502 puts e 1503 else 1504 puts "OK" 1505 end 1506 } 1507 end 1508 end 1509 1510 def help_cluster_cmd(argv,opt) 1511 show_help 1512 exit 0 1513 end 1514 1515 # Parse the options for the specific command "cmd". 1516 # Returns an hash populate with option => value pairs, and the index of 1517 # the first non-option argument in ARGV. 1518 def parse_options(cmd) 1519 idx = 1 ; # Current index into ARGV 1520 options={} 1521 while idx < ARGV.length && ARGV[idx][0..1] == '--' 1522 if ARGV[idx][0..1] == "--" 1523 option = ARGV[idx][2..-1] 1524 idx += 1 1525 1526 # --verbose is a global option 1527 if option == "verbose" 1528 $verbose = true 1529 next 1530 end 1531 1532 if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil 1533 puts "Unknown option '#{option}' for command '#{cmd}'" 1534 exit 1 1535 end 1536 if ALLOWED_OPTIONS[cmd][option] != false 1537 value = ARGV[idx] 1538 idx += 1 1539 else 1540 value = true 1541 end 1542 1543 # If the option is set to [], it's a multiple arguments 1544 # option. We just queue every new value into an array. 1545 if ALLOWED_OPTIONS[cmd][option] == [] 1546 options[option] = [] if !options[option] 1547 options[option] << value 1548 else 1549 options[option] = value 1550 end 1551 else 1552 # Remaining arguments are not options. 1553 break 1554 end 1555 end 1556 1557 # Enforce mandatory options 1558 if ALLOWED_OPTIONS[cmd] 1559 ALLOWED_OPTIONS[cmd].each {|option,val| 1560 if !options[option] && val == :required 1561 puts "Option '--#{option}' is required "+ \ 1562 "for subcommand '#{cmd}'" 1563 exit 1 1564 end 1565 } 1566 end 1567 return options,idx 1568 end 1569end 1570 1571################################################################################# 1572# Libraries 1573# 1574# We try to don't depend on external libs since this is a critical part 1575# of Redis Cluster. 1576################################################################################# 1577 1578# This is the CRC16 algorithm used by Redis Cluster to hash keys. 1579# Implementation according to CCITT standards. 1580# 1581# This is actually the XMODEM CRC 16 algorithm, using the 1582# following parameters: 1583# 1584# Name : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN" 1585# Width : 16 bit 1586# Poly : 1021 (That is actually x^16 + x^12 + x^5 + 1) 1587# Initialization : 0000 1588# Reflect Input byte : False 1589# Reflect Output CRC : False 1590# Xor constant to output CRC : 0000 1591# Output for "123456789" : 31C3 1592 1593module RedisClusterCRC16 1594 def RedisClusterCRC16.crc16(bytes) 1595 crc = 0 1596 bytes.each_byte{|b| 1597 crc = ((crc<<8) & 0xffff) ^ XMODEMCRC16Lookup[((crc>>8)^b) & 0xff] 1598 } 1599 crc 1600 end 1601 1602private 1603 XMODEMCRC16Lookup = [ 1604 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, 1605 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, 1606 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, 1607 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, 1608 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485, 1609 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, 1610 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4, 1611 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, 1612 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823, 1613 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, 1614 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12, 1615 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, 1616 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41, 1617 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, 1618 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70, 1619 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, 1620 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f, 1621 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, 1622 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e, 1623 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, 1624 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d, 1625 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, 1626 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c, 1627 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, 1628 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab, 1629 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, 1630 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a, 1631 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, 1632 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9, 1633 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1, 1634 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8, 1635 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 1636 ] 1637end 1638 1639# Turn a key name into the corrisponding Redis Cluster slot. 1640def key_to_slot(key) 1641 # Only hash what is inside {...} if there is such a pattern in the key. 1642 # Note that the specification requires the content that is between 1643 # the first { and the first } after the first {. If we found {} without 1644 # nothing in the middle, the whole key is hashed as usually. 1645 s = key.index "{" 1646 if s 1647 e = key.index "}",s+1 1648 if e && e != s+1 1649 key = key[s+1..e-1] 1650 end 1651 end 1652 RedisClusterCRC16.crc16(key) % 16384 1653end 1654 1655################################################################################# 1656# Definition of commands 1657################################################################################# 1658 1659COMMANDS={ 1660 "create" => ["create_cluster_cmd", -2, "host1:port1 ... hostN:portN"], 1661 "check" => ["check_cluster_cmd", 2, "host:port"], 1662 "info" => ["info_cluster_cmd", 2, "host:port"], 1663 "fix" => ["fix_cluster_cmd", 2, "host:port"], 1664 "reshard" => ["reshard_cluster_cmd", 2, "host:port"], 1665 "rebalance" => ["rebalance_cluster_cmd", -2, "host:port"], 1666 "add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"], 1667 "del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"], 1668 "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"], 1669 "call" => ["call_cluster_cmd", -3, "host:port command arg arg .. arg"], 1670 "import" => ["import_cluster_cmd", 2, "host:port"], 1671 "help" => ["help_cluster_cmd", 1, "(show this help)"] 1672} 1673 1674ALLOWED_OPTIONS={ 1675 "create" => {"replicas" => true}, 1676 "add-node" => {"slave" => false, "master-id" => true}, 1677 "import" => {"from" => :required, "copy" => false, "replace" => false}, 1678 "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => true, "pipeline" => true}, 1679 "rebalance" => {"weight" => [], "auto-weights" => false, "use-empty-masters" => false, "timeout" => true, "simulate" => false, "pipeline" => true, "threshold" => true}, 1680 "fix" => {"timeout" => MigrateDefaultTimeout}, 1681} 1682 1683def show_help 1684 puts "Usage: redis-trib <command> <options> <arguments ...>\n\n" 1685 COMMANDS.each{|k,v| 1686 o = "" 1687 puts " #{k.ljust(15)} #{v[2]}" 1688 if ALLOWED_OPTIONS[k] 1689 ALLOWED_OPTIONS[k].each{|optname,has_arg| 1690 puts " --#{optname}" + (has_arg ? " <arg>" : "") 1691 } 1692 end 1693 } 1694 puts "\nFor check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.\n" 1695end 1696 1697# Sanity check 1698if ARGV.length == 0 1699 show_help 1700 exit 1 1701end 1702 1703rt = RedisTrib.new 1704cmd_spec = COMMANDS[ARGV[0].downcase] 1705if !cmd_spec 1706 puts "Unknown redis-trib subcommand '#{ARGV[0]}'" 1707 exit 1 1708end 1709 1710# Parse options 1711cmd_options,first_non_option = rt.parse_options(ARGV[0].downcase) 1712rt.check_arity(cmd_spec[1],ARGV.length-(first_non_option-1)) 1713 1714# Dispatch 1715rt.send(cmd_spec[0],ARGV[first_non_option..-1],cmd_options) 1716