#!/usr/bin/env ruby

# TODO (temporary here, we'll move this into the Github issues once
# redis-trib initial implementation is completed).
#
# - Make sure that if the rehashing fails in the middle redis-trib will try
#   to recover.
# - When redis-trib performs a cluster check, if it detects a slot move in
#   progress it should prompt the user to continue the move from where it
#   stopped.
# - Gracefully handle Ctrl+C in move_slot by prompting the user whether to
#   really stop while rehashing, and performing the best cleanup possible
#   if the user forces the quit.
# - When doing "fix" set a global Fix to true, and prompt the user to
#   fix the problem if automatically fixable every time there is something
#   to fix. For instance:
#   1) If there is a node that pretends to receive a slot, or to migrate a
#      slot, but has no entries in that slot, fix it.
#   2) If there is a node having keys in slots that are not owned by it,
#      fix this condition by moving the entries into the owning node.
#   3) Perform more possibly slow tests about the state of the cluster.
#   4) When an aborted slot migration is detected, fix it.

require 'rubygems'
require 'redis'

ClusterHashSlots = 16384
MigrateDefaultTimeout = 60000
MigrateDefaultPipeline = 10
RebalanceDefaultThreshold = 2

$verbose = false

def xputs(s)
    case s[0..2]
    when ">>>"
        color="29;1"
    when "[ER"
        color="31;1"
    when "[WA"
        color="31;1"
    when "[OK"
        color="32"
    when "[FA","***"
        color="33"
    else
        color=nil
    end

    color = nil if ENV['TERM'] != "xterm"
    print "\033[#{color}m" if color
    print s
    print "\033[0m" if color
    print "\n"
end

class ClusterNode
    def initialize(addr)
        s = addr.split(":")
        if s.length < 2
            puts "Invalid IP or Port (given as #{addr}) - use IP:Port format"
            exit 1
        end
        port = s.pop # removes port from split array
        ip = s.join(":") # if s.length > 1 here, it's IPv6, so restore address
        @r = nil
        @info = {}
        @info[:host] = ip
        @info[:port] = port
        @info[:slots] = {}
        @info[:migrating] = {}
        @info[:importing] = {}
        @info[:replicate] = false
        @dirty = false # True if we need to flush slots info into node.
        @friends = []
    end

    def friends
        @friends
    end

    def slots
        @info[:slots]
    end

    def has_flag?(flag)
        @info[:flags].index(flag)
    end

    def to_s
        "#{@info[:host]}:#{@info[:port]}"
    end

    def connect(o={})
        return if @r
        print "Connecting to node #{self}: " if $verbose
        STDOUT.flush
        begin
            @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60)
            @r.ping
        rescue
            xputs "[ERR] Sorry, can't connect to node #{self}"
            exit 1 if o[:abort]
            @r = nil
        end
        xputs "OK" if $verbose
    end

    def assert_cluster
        info = @r.info
        if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
            xputs "[ERR] Node #{self} is not configured as a cluster node."
            exit 1
        end
    end

    def assert_empty
        if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
            (@r.info['db0'])
            xputs "[ERR] Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0."
            exit 1
        end
    end
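
    # Node information is loaded by parsing the output of CLUSTER NODES,
    # one node per line. Two illustrative lines (addresses and node IDs
    # are made up):
    #
    #   e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:7000 myself,master - 0 0 1 connected 0-5460
    #   07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:7003 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected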
    def load_info(o={})
        self.connect
        nodes = @r.cluster("nodes").split("\n")
        nodes.each{|n|
            # name addr flags master_id ping_sent ping_recv config_epoch link_status slots
            split = n.split
            name,addr,flags,master_id,ping_sent,ping_recv,config_epoch,link_status = split[0..7]
            slots = split[8..-1]
            info = {
                :name => name,
                :addr => addr,
                :flags => flags.split(","),
                :replicate => master_id,
                :ping_sent => ping_sent.to_i,
                :ping_recv => ping_recv.to_i,
                :link_status => link_status
            }
            info[:replicate] = false if master_id == "-"

            if info[:flags].index("myself")
                @info = @info.merge(info)
                @info[:slots] = {}
                slots.each{|s|
                    if s[0..0] == '['
                        if s.index("->-") # Migrating
                            slot,dst = s[1..-1].split("->-")
                            @info[:migrating][slot.to_i] = dst
                        elsif s.index("-<-") # Importing
                            slot,src = s[1..-1].split("-<-")
                            @info[:importing][slot.to_i] = src
                        end
                    elsif s.index("-")
                        start,stop = s.split("-")
                        self.add_slots((start.to_i)..(stop.to_i))
                    else
                        self.add_slots((s.to_i)..(s.to_i))
                    end
                } if slots
                @dirty = false
                @r.cluster("info").split("\n").each{|e|
                    k,v=e.split(":")
                    k = k.to_sym
                    v.chop!
                    if k != :cluster_state
                        @info[k] = v.to_i
                    else
                        @info[k] = v
                    end
                }
            elsif o[:getfriends]
                @friends << info
            end
        }
    end

    def add_slots(slots)
        slots.each{|s|
            @info[:slots][s] = :new
        }
        @dirty = true
    end

    def set_as_replica(node_id)
        @info[:replicate] = node_id
        @dirty = true
    end

    def flush_node_config
        return if !@dirty
        if @info[:replicate]
            begin
                @r.cluster("replicate",@info[:replicate])
            rescue
                # If the cluster has not joined yet it is possible that
                # the slave does not know the master node yet. So on errors
                # we return ASAP leaving the dirty flag set, to flush the
                # config later.
                return
            end
        else
            new = []
            @info[:slots].each{|s,val|
                if val == :new
                    new << s
                    @info[:slots][s] = true
                end
            }
            @r.cluster("addslots",*new)
        end
        @dirty = false
    end

    def info_string
        # We want to display the hash slots assigned to this node
        # as ranges, like in: "1-5,8-9,20-25,30"
        #
        # Note: this could be easily written without side effects,
        # we use 'slots' just to split the computation into steps.

        # First step: we want an increasing array of integers
        # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30]
        slots = @info[:slots].keys.sort

        # As we want to aggregate adjacent slots we convert all the
        # slot integers into ranges (with just one element).
        # So we have something like [1..1,2..2, ... and so forth.
        slots.map!{|x| x..x}

        # Finally we group ranges with adjacent elements.
        slots = slots.reduce([]) {|a,b|
            if !a.empty? && b.first == (a[-1].last)+1
                a[0..-2] + [(a[-1].first)..(b.last)]
            else
                a + [b]
            end
        }

        # Now our task is easy, we just convert ranges with just one
        # element into a number, and a real range into a start-end format.
        # Finally we join the array using the comma as separator.
        slots = slots.map{|x|
            x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}"
        }.join(",")

        role = self.has_flag?("master") ? "M" : "S"

        if self.info[:replicate] and @dirty
            is = "S: #{self.info[:name]} #{self.to_s}"
        else
            is = "#{role}: #{self.info[:name]} #{self.to_s}\n"+
                 "   slots:#{slots} (#{self.slots.length} slots) "+
                 "#{(self.info[:flags]-["myself"]).join(",")}"
        end
        if self.info[:replicate]
            is += "\n   replicates #{info[:replicate]}"
        elsif self.has_flag?("master") && self.info[:replicas]
            is += "\n   #{info[:replicas].length} additional replica(s)"
        end
        is
    end

    # Return a single string representing nodes and associated slots.
    # TODO: remove slaves from config when slaves will be handled
    # by Redis Cluster.
    def get_config_signature
        config = []
        @r.cluster("nodes").each_line{|l|
            s = l.split
            slots = s[8..-1].select {|x| x[0..0] != "["}
            next if slots.length == 0
            config << s[0]+":"+(slots.sort.join(","))
        }
        config.sort.join("|")
    end

    def info
        @info
    end

    def is_dirty?
        @dirty
    end

    def r
        @r
    end
end

class RedisTrib
    def initialize
        @nodes = []
        @fix = false
        @errors = []
        @timeout = MigrateDefaultTimeout
    end

    def check_arity(req_args, num_args)
        if ((req_args > 0 and num_args != req_args) ||
           (req_args < 0 and num_args < req_args.abs))
            xputs "[ERR] Wrong number of arguments for specified sub command"
            exit 1
        end
    end

    def add_node(node)
        @nodes << node
    end

    def reset_nodes
        @nodes = []
    end

    def cluster_error(msg)
        @errors << msg
        xputs msg
    end

    # Return the node with the specified ID or nil.
    def get_node_by_name(name)
        @nodes.each{|n|
            return n if n.info[:name] == name.downcase
        }
        return nil
    end

    # Like get_node_by_name but the specified name can be just the first
    # part of the node ID as long as the prefix is unique across the
    # cluster.
    def get_node_by_abbreviated_name(name)
        l = name.length
        candidates = []
        @nodes.each{|n|
            if n.info[:name][0...l] == name.downcase
                candidates << n
            end
        }
        return nil if candidates.length != 1
        candidates[0]
    end

    # This function returns the master that has the least number of replicas
    # in the cluster. If there are multiple masters tied for the smallest
    # number of replicas, the first one found is returned.
    def get_master_with_least_replicas
        masters = @nodes.select{|n| n.has_flag? "master"}
        sorted = masters.sort{|a,b|
            a.info[:replicas].length <=> b.info[:replicas].length
        }
        sorted[0]
    end

    def check_cluster(opt={})
        xputs ">>> Performing Cluster Check (using node #{@nodes[0]})"
        show_nodes if !opt[:quiet]
        check_config_consistency
        check_open_slots
        check_slots_coverage
    end

    def show_cluster_info
        masters = 0
        keys = 0
        @nodes.each{|n|
            if n.has_flag?("master")
                puts "#{n} (#{n.info[:name][0...8]}...) -> #{n.r.dbsize} keys | #{n.slots.length} slots | "+
                     "#{n.info[:replicas].length} slaves."
                masters += 1
                keys += n.r.dbsize
            end
        }
        xputs "[OK] #{keys} keys in #{masters} masters."
        keys_per_slot = sprintf("%.2f",keys/ClusterHashSlots.to_f)
        puts "#{keys_per_slot} keys per slot on average."
    end

    # Merge slots of every known node. If the resulting slots are equal
    # to ClusterHashSlots, then all slots are served.
    def covered_slots
        slots = {}
        @nodes.each{|n|
            slots = slots.merge(n.slots)
        }
        slots
    end

    def check_slots_coverage
        xputs ">>> Check slots coverage..."
        slots = covered_slots
        if slots.length == ClusterHashSlots
            xputs "[OK] All #{ClusterHashSlots} slots covered."
        else
            cluster_error \
                "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
            fix_slots_coverage if @fix
        end
    end

    def check_open_slots
        xputs ">>> Check for open slots..."
        open_slots = []
        @nodes.each{|n|
            if n.info[:migrating].size > 0
                cluster_error \
                    "[WARNING] Node #{n} has slots in migrating state (#{n.info[:migrating].keys.join(",")})."
                open_slots += n.info[:migrating].keys
            end
            if n.info[:importing].size > 0
                cluster_error \
                    "[WARNING] Node #{n} has slots in importing state (#{n.info[:importing].keys.join(",")})."
                open_slots += n.info[:importing].keys
            end
        }
        open_slots.uniq!
        if open_slots.length > 0
            xputs "[WARNING] The following slots are open: #{open_slots.join(",")}"
        end
        if @fix
            open_slots.each{|slot| fix_open_slot slot}
        end
    end

    def nodes_with_keys_in_slot(slot)
        nodes = []
        @nodes.each{|n|
            next if n.has_flag?("slave")
            nodes << n if n.r.cluster("getkeysinslot",slot,1).length > 0
        }
        nodes
    end

    def fix_slots_coverage
        not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys
        xputs ">>> Fixing slots coverage..."
        xputs "List of not covered slots: " + not_covered.join(",")

        # For every slot, take action depending on the actual condition:
        # 1) No node has keys for this slot.
        # 2) A single node has keys for this slot.
        # 3) Multiple nodes have keys for this slot.
        slots = {}
        not_covered.each{|slot|
            nodes = nodes_with_keys_in_slot(slot)
            slots[slot] = nodes
            xputs "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join(", ")}"
        }

        none = slots.select {|k,v| v.length == 0}
        single = slots.select {|k,v| v.length == 1}
        multi = slots.select {|k,v| v.length > 1}

        # Handle case "1": keys in no node.
        if none.length > 0
            xputs "The following uncovered slots have no keys across the cluster:"
            xputs none.keys.join(",")
            yes_or_die "Fix these slots by covering with a random node?"
            none.each{|slot,nodes|
                node = @nodes.sample
                xputs ">>> Covering slot #{slot} with #{node}"
                node.r.cluster("addslots",slot)
            }
        end

        # Handle case "2": keys only in one node.
        if single.length > 0
            xputs "The following uncovered slots have keys in just one node:"
            xputs single.keys.join(",")
            yes_or_die "Fix these slots by covering with those nodes?"
            single.each{|slot,nodes|
                xputs ">>> Covering slot #{slot} with #{nodes[0]}"
                nodes[0].r.cluster("addslots",slot)
            }
        end

        # Handle case "3": keys in multiple nodes.
        if multi.length > 0
            xputs "The following uncovered slots have keys in multiple nodes:"
            xputs multi.keys.join(",")
            yes_or_die "Fix these slots by moving keys into a single node?"
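            # For every such slot, elect the node holding the most keys in
            # it as the new owner, then move the keys of all the other
            # nodes into it with a "cold" migration, that is, without
            # opening slots / reconfiguring the nodes (see move_slot).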
            multi.each{|slot,nodes|
                target = get_node_with_most_keys_in_slot(nodes,slot)
                xputs ">>> Covering slot #{slot} moving keys to #{target}"

                target.r.cluster('addslots',slot)
                target.r.cluster('setslot',slot,'stable')
                nodes.each{|src|
                    next if src == target
                    # Set the source node in 'importing' state (even if we will
                    # actually migrate keys away) in order to avoid receiving
                    # redirections for MIGRATE.
                    src.r.cluster('setslot',slot,'importing',target.info[:name])
                    move_slot(src,target,slot,:dots=>true,:fix=>true,:cold=>true)
                    src.r.cluster('setslot',slot,'stable')
                }
            }
        end
    end

    # Return the owners of the specified slot.
    def get_slot_owners(slot)
        owners = []
        @nodes.each{|n|
            next if n.has_flag?("slave")
            n.slots.each{|s,_|
                owners << n if s == slot
            }
        }
        owners
    end

    # Return the node, among 'nodes', with the greatest number of keys
    # in the specified slot.
    def get_node_with_most_keys_in_slot(nodes,slot)
        best = nil
        best_numkeys = 0
        nodes.each{|n|
            next if n.has_flag?("slave")
            numkeys = n.r.cluster("countkeysinslot",slot)
            if numkeys > best_numkeys || best == nil
                best = n
                best_numkeys = numkeys
            end
        }
        return best
    end

    # Slot 'slot' was found to be in importing or migrating state in one or
    # more nodes. This function fixes this condition by migrating keys where
    # it seems more sensible.
    def fix_open_slot(slot)
        xputs ">>> Fixing open slot #{slot}"

        # Try to obtain the current slot owner, according to the current
        # nodes configuration.
        owners = get_slot_owners(slot)
        owner = owners[0] if owners.length == 1

        migrating = []
        importing = []
        @nodes.each{|n|
            next if n.has_flag? "slave"
            if n.info[:migrating][slot]
                migrating << n
            elsif n.info[:importing][slot]
                importing << n
            elsif n.r.cluster("countkeysinslot",slot) > 0 && n != owner
                xputs "*** Found keys about slot #{slot} in node #{n}!"
                importing << n
            end
        }
        puts "Set as migrating in: #{migrating.join(",")}"
        puts "Set as importing in: #{importing.join(",")}"

        # If there is no slot owner, set as owner the node with the biggest
        # number of keys in the slot.
        if !owner
            xputs ">>> Nobody claims ownership, selecting an owner..."
            owner = get_node_with_most_keys_in_slot(@nodes,slot)

            # If we still don't have an owner, we can't fix it.
            if !owner
                xputs "[ERR] Can't select a slot owner. Impossible to fix."
                exit 1
            end

            # Use ADDSLOTS to assign the slot.
            xputs "*** Configuring #{owner} as the slot owner"
            owner.r.cluster("setslot",slot,"stable")
            owner.r.cluster("addslots",slot)
            # Make sure this information will propagate. Not strictly needed
            # since there is no past owner, so all the other nodes will accept
            # whatever epoch this node will claim the slot with.
            owner.r.cluster("bumpepoch")

            # Remove the owner from the list of migrating/importing
            # nodes.
            migrating.delete(owner)
            importing.delete(owner)
        end

        # If there are multiple owners of the slot, we need to fix it
        # so that a single node is the owner and all the other nodes
        # are in importing state. Later the fix can be handled by one
        # of the base cases above.
        #
        # Note that this case also covers multiple nodes having the slot
        # in migrating state, since migrating is a valid state only for
        # slot owners.
        if owners.length > 1
            owner = get_node_with_most_keys_in_slot(owners,slot)
            owners.each{|n|
                next if n == owner
                n.r.cluster('delslots',slot)
                n.r.cluster('setslot',slot,'importing',owner.info[:name])
                importing.delete(n) # Avoid duplicates
                importing << n
            }
            owner.r.cluster('bumpepoch')
        end

        # Case 1: The slot is in migrating state in one node, and in
        # importing state in one node. That's trivial to address.
        if migrating.length == 1 && importing.length == 1
            move_slot(migrating[0],importing[0],slot,:dots=>true,:fix=>true)
        # Case 2: There are multiple nodes that claim the slot as importing,
        # they probably got keys about the slot after a restart, so they
        # opened the slot. In this case we just move all the keys to the
        # owner according to the configuration.
        elsif migrating.length == 0 && importing.length > 0
            xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}"
            importing.each {|node|
                next if node == owner
                move_slot(node,owner,slot,:dots=>true,:fix=>true,:cold=>true)
                xputs ">>> Setting #{slot} as STABLE in #{node}"
                node.r.cluster("setslot",slot,"stable")
            }
        # Case 3: There are no nodes claiming to be in importing state, but
        # there is a migrating node that actually doesn't have any key. We
        # can just close the slot, probably a reshard interrupted in the middle.
        elsif importing.length == 0 && migrating.length == 1 &&
              migrating[0].r.cluster("getkeysinslot",slot,10).length == 0
            migrating[0].r.cluster("setslot",slot,"stable")
        else
            xputs "[ERR] Sorry, redis-trib can't fix this slot yet (work in progress). Slot is set as migrating in #{migrating.join(",")}, as importing in #{importing.join(",")}, owner is #{owner}"
        end
    end

    # Check if all the nodes agree about the cluster configuration.
    def check_config_consistency
        if !is_config_consistent?
            cluster_error "[ERR] Nodes don't agree about configuration!"
        else
            xputs "[OK] All nodes agree about slots configuration."
        end
    end

    def is_config_consistent?
        signatures=[]
        @nodes.each{|n|
            signatures << n.get_config_signature
        }
        return signatures.uniq.length == 1
    end

    def wait_cluster_join
        print "Waiting for the cluster to join"
        while !is_config_consistent?
            print "."
            STDOUT.flush
            sleep 1
        end
        print "\n"
    end
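
    # Masters get contiguous ranges of roughly ClusterHashSlots/masters_count
    # slots each. For example, creating a cluster with exactly three masters
    # results in the allocation:
    #
    #   master 1 -> slots 0-5460
    #   master 2 -> slots 5461-10922
    #   master 3 -> slots 10923-16383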
    def alloc_slots
        nodes_count = @nodes.length
        masters_count = @nodes.length / (@replicas+1)
        masters = []

        # The first step is to split instances by IP. This is useful as
        # we'll try to allocate master nodes in different physical machines
        # (as much as possible) and to allocate slaves of a given master in
        # different physical machines as well.
        #
        # This code simply assumes that, if the IP is different, it is more
        # likely that the instance is running in a different physical host
        # or at least a different virtual machine.
        ips = {}
        @nodes.each{|n|
            ips[n.info[:host]] = [] if !ips[n.info[:host]]
            ips[n.info[:host]] << n
        }

        # Select master instances
        puts "Using #{masters_count} masters:"
        interleaved = []
        stop = false
        while not stop do
            # Take one node from each IP until we run out of nodes
            # across every IP.
            ips.each do |ip,nodes|
                if nodes.empty?
                    # if this IP has no remaining nodes, check for termination
                    if interleaved.length == nodes_count
                        # stop when 'interleaved' has accumulated all nodes
                        stop = true
                        next
                    end
                else
                    # else, move one node from this IP to 'interleaved'
                    interleaved.push nodes.shift
                end
            end
        end

        masters = interleaved.slice!(0, masters_count)
        nodes_count -= masters.length

        masters.each{|m| puts m}

        # Alloc slots on masters
        slots_per_node = ClusterHashSlots.to_f / masters_count
        first = 0
        cursor = 0.0
        masters.each_with_index{|n,masternum|
            last = (cursor+slots_per_node-1).round
            if last > ClusterHashSlots || masternum == masters.length-1
                last = ClusterHashSlots-1
            end
            last = first if last < first # Min step is 1.
            n.add_slots first..last
            first = last+1
            cursor += slots_per_node
        }

        # Select N replicas for every master.
        # We try to split the replicas among all the IPs with spare nodes
        # trying to avoid the host where the master is running, if possible.
        #
        # Note we loop two times. The first loop assigns the requested
        # number of replicas to each master. The second loop assigns any
        # remaining instances as extra replicas to masters. Some masters
        # may end up with more than their requested number of replicas, but
        # all nodes will be used.
        assignment_verbose = false

        [:requested,:unused].each do |assign|
            masters.each do |m|
                assigned_replicas = 0
                while assigned_replicas < @replicas
                    break if nodes_count == 0
                    if assignment_verbose
                        if assign == :requested
                            puts "Requesting total of #{@replicas} replicas " \
                                 "(#{assigned_replicas} replicas assigned " \
                                 "so far with #{nodes_count} total remaining)."
                        elsif assign == :unused
                            puts "Assigning extra instance to replication " \
                                 "role too (#{nodes_count} remaining)."
                        end
                    end

                    # Return the first node not matching our current master
                    node = interleaved.find{|n| n.info[:host] != m.info[:host]}

                    # If we found a node, use it as a best-first match.
                    # Otherwise, we didn't find a node on a different IP, so we
                    # go ahead and use a same-IP replica.
                    if node
                        slave = node
                        interleaved.delete node
                    else
                        slave = interleaved.shift
                    end
                    slave.set_as_replica(m.info[:name])
                    nodes_count -= 1
                    assigned_replicas += 1
                    puts "Adding replica #{slave} to #{m}"

                    # If we are in the "assign extra nodes" loop,
                    # we want to assign one extra replica to each
                    # master before repeating masters.
                    # This break lets us assign extra replicas to masters
                    # in a round-robin way.
                    break if assign == :unused
                end
            end
        end
    end

    def flush_nodes_config
        @nodes.each{|n|
            n.flush_node_config
        }
    end

    def show_nodes
        @nodes.each{|n|
            xputs n.info_string
        }
    end

    # Redis Cluster config epoch collision resolution code is able to
    # eventually set a different epoch to each node after a new cluster is
    # created, but it is slow compared to assigning a progressive config
    # epoch to each node before joining the cluster. However we do just a
    # best-effort try here since if we fail it is not a problem.
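    # (Each node simply receives a progressive epoch, 1 for the first node,
    # 2 for the second and so forth, via CLUSTER SET-CONFIG-EPOCH; errors
    # are ignored since the command is refused by nodes that already have
    # a non-zero config epoch.)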
    def assign_config_epoch
        config_epoch = 1
        @nodes.each{|n|
            begin
                n.r.cluster("set-config-epoch",config_epoch)
            rescue
            end
            config_epoch += 1
        }
    end

    def join_cluster
        # We use a brute force approach to make sure the nodes will meet
        # each other, that is, sending CLUSTER MEET messages to all the nodes
        # about the very same node.
        # Thanks to gossip this information should propagate across the
        # cluster in a matter of seconds.
        first = false
        @nodes.each{|n|
            if !first then first = n.info; next; end # Skip the first node
            n.r.cluster("meet",first[:host],first[:port])
        }
    end

    def yes_or_die(msg)
        print "#{msg} (type 'yes' to accept): "
        STDOUT.flush
        if !(STDIN.gets.chomp.downcase == "yes")
            xputs "*** Aborting..."
            exit 1
        end
    end

    def load_cluster_info_from_node(nodeaddr)
        node = ClusterNode.new(nodeaddr)
        node.connect(:abort => true)
        node.assert_cluster
        node.load_info(:getfriends => true)
        add_node(node)
        node.friends.each{|f|
            next if f[:flags].index("noaddr") ||
                    f[:flags].index("disconnected") ||
                    f[:flags].index("fail")
            fnode = ClusterNode.new(f[:addr])
            fnode.connect()
            next if !fnode.r
            begin
                fnode.load_info()
                add_node(fnode)
            rescue => e
                xputs "[ERR] Unable to load info for node #{fnode}"
            end
        }
        populate_nodes_replicas_info
    end

    # This function is called by load_cluster_info_from_node in order to
    # add additional information to every node as a list of replicas.
    def populate_nodes_replicas_info
        # Start adding the new field to every node.
        @nodes.each{|n|
            n.info[:replicas] = []
        }

        # Populate the replicas field using the replicate field of slave
        # nodes.
        @nodes.each{|n|
            if n.info[:replicate]
                master = get_node_by_name(n.info[:replicate])
                if !master
                    xputs "*** WARNING: #{n} claims to be slave of unknown node ID #{n.info[:replicate]}."
                else
                    master.info[:replicas] << n
                end
            end
        }
    end

    # Given a list of source nodes return a "resharding plan"
    # with what slots to move in order to move "numslots" slots to another
    # instance.
    def compute_reshard_table(sources,numslots)
        moved = []
        # Sort from bigger to smaller instance, for two reasons:
        # 1) If we take fewer slots than there are instances it is better to
        #    start getting from the biggest instances.
        # 2) We take one slot more from the first instance in the case of not
        #    perfect divisibility. E.g. if we have 3 nodes and need to get 10
        #    slots, we take 4 from the first, and 3 from the rest. So the
        #    biggest is always the first.
        sources = sources.sort{|a,b| b.slots.length <=> a.slots.length}
        source_tot_slots = sources.inject(0) {|sum,source|
            sum+source.slots.length
        }
        sources.each_with_index{|s,i|
            # Every node will provide a number of slots proportional to the
            # slots it has assigned.
            n = (numslots.to_f/source_tot_slots*s.slots.length)
            if i == 0
                n = n.ceil
            else
                n = n.floor
            end
            s.slots.keys.sort[(0...n)].each{|slot|
                if moved.length < numslots
                    moved << {:source => s, :slot => slot}
                end
            }
        }
        return moved
    end

    def show_reshard_table(table)
        table.each{|e|
            puts "    Moving slot #{e[:slot]} from #{e[:source].info[:name]}"
        }
    end

    # Move slots between source and target nodes using MIGRATE.
    #
    # Options:
    # :dots    -- Print a dot for every moved key.
    # :fix     -- We are moving in the context of a fix. Use REPLACE.
    # :cold    -- Move keys without opening slots / reconfiguring the nodes.
    # :update  -- Update nodes.info[:slots] for source/target nodes.
    # :quiet   -- Don't print info messages.
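    #
    # In short, for every slot the migration below performs this handshake
    # (see the Redis Cluster specification for the details):
    #
    #   1. CLUSTER SETSLOT <slot> IMPORTING <source-id>   on the target.
    #   2. CLUSTER SETSLOT <slot> MIGRATING <target-id>   on the source.
    #   3. MIGRATE <target-host> <target-port> "" 0 <timeout> KEYS key1 ...
    #      on the source, repeated until the slot is empty.
    #   4. CLUSTER SETSLOT <slot> NODE <target-id>        on every master.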
    def move_slot(source,target,slot,o={})
        o = {:pipeline => MigrateDefaultPipeline}.merge(o)

        # We start marking the slot as importing in the destination node,
        # and the slot as migrating in the source node. Note that the order of
        # the operations is important, as otherwise a client may be redirected
        # to the target node that does not yet know it is importing this slot.
        if !o[:quiet]
            print "Moving slot #{slot} from #{source} to #{target}: "
            STDOUT.flush
        end

        if !o[:cold]
            target.r.cluster("setslot",slot,"importing",source.info[:name])
            source.r.cluster("setslot",slot,"migrating",target.info[:name])
        end
        # Migrate all the keys from source to target using the MIGRATE command
        while true
            keys = source.r.cluster("getkeysinslot",slot,o[:pipeline])
            break if keys.length == 0
            begin
                source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:keys,*keys])
            rescue => e
                if o[:fix] && e.to_s =~ /BUSYKEY/
                    xputs "*** Target key exists. Replacing it for FIX."
                    source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:replace,:keys,*keys])
                else
                    puts ""
                    xputs "[ERR] Calling MIGRATE: #{e}"
                    exit 1
                end
            end
            print "."*keys.length if o[:dots]
            STDOUT.flush
        end

        puts if !o[:quiet]
        # Set the new node as the owner of the slot in all the known nodes.
        if !o[:cold]
            @nodes.each{|n|
                next if n.has_flag?("slave")
                n.r.cluster("setslot",slot,"node",target.info[:name])
            }
        end

        # Update the node logical config
        if o[:update] then
            source.info[:slots].delete(slot)
            target.info[:slots][slot] = true
        end
    end

    # redis-trib subcommands implementations.

    def check_cluster_cmd(argv,opt)
        load_cluster_info_from_node(argv[0])
        check_cluster
    end

    def info_cluster_cmd(argv,opt)
        load_cluster_info_from_node(argv[0])
        show_cluster_info
    end

    def rebalance_cluster_cmd(argv,opt)
        opt = {
            'pipeline' => MigrateDefaultPipeline,
            'threshold' => RebalanceDefaultThreshold
        }.merge(opt)

        # Load nodes info before parsing options, otherwise we can't
        # handle --weight.
        load_cluster_info_from_node(argv[0])

        # Options parsing
        threshold = opt['threshold'].to_f
        autoweights = opt['auto-weights']
        weights = {}
        opt['weight'].each{|w|
            fields = w.split("=")
            node = get_node_by_abbreviated_name(fields[0])
            if !node || !node.has_flag?("master")
                puts "*** No such master node #{fields[0]}"
                exit 1
            end
            weights[node.info[:name]] = fields[1].to_f
        } if opt['weight']
        useempty = opt['use-empty-masters']

        # Assign a weight to each node, and compute the total cluster weight.
        total_weight = 0
        nodes_involved = 0
        @nodes.each{|n|
            if n.has_flag?("master")
                next if !useempty && n.slots.length == 0
                n.info[:w] = weights[n.info[:name]] ? weights[n.info[:name]] : 1
                total_weight += n.info[:w]
                nodes_involved += 1
            end
        }

        # Check cluster, only proceed if it looks sane.
        check_cluster(:quiet => true)
        if @errors.length != 0
            puts "*** Please fix your cluster problems before rebalancing"
            exit 1
        end

        # Calculate the slots balance for each node. It's the number of
        # slots the node should lose (if positive) or gain (if negative)
        # in order to be balanced.
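        #
        # For example (hypothetical weights), rebalancing three masters with
        # --weight a=1 --weight b=1 --weight c=2 gives a total weight of 4:
        # the expected share is 16384/4 = 4096 slots for a and b, and 8192
        # slots for c, and 'balance' is the distance from that share.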
        threshold_reached = false
        @nodes.each{|n|
            if n.has_flag?("master")
                next if !n.info[:w]
                expected = ((ClusterHashSlots.to_f / total_weight) *
                           n.info[:w]).to_i
                n.info[:balance] = n.slots.length - expected
                # Compute the percentage of difference between the
                # expected number of slots and the real one, to see
                # if it's over the threshold specified by the user.
                over_threshold = false
                if threshold > 0
                    if n.slots.length > 0
                        err_perc = (100-(100.0*expected/n.slots.length)).abs
                        over_threshold = true if err_perc > threshold
                    elsif expected > 0
                        over_threshold = true
                    end
                end
                threshold_reached = true if over_threshold
            end
        }
        if !threshold_reached
            xputs "*** No rebalancing needed! All nodes are within the #{threshold}% threshold."
            return
        end

        # Only consider nodes we want to change
        sn = @nodes.select{|n|
            n.has_flag?("master") && n.info[:w]
        }

        # Because of rounding, it is possible that the balance of all nodes
        # summed does not give 0. Make sure that nodes that have to provide
        # slots are always matched by nodes receiving slots.
        total_balance = sn.map{|x| x.info[:balance]}.reduce{|a,b| a+b}
        while total_balance > 0
            sn.each{|n|
                if n.info[:balance] < 0 && total_balance > 0
                    n.info[:balance] -= 1
                    total_balance -= 1
                end
            }
        end

        # Sort nodes by their slots balance.
        sn = sn.sort{|a,b|
            a.info[:balance] <=> b.info[:balance]
        }

        xputs ">>> Rebalancing across #{nodes_involved} nodes. Total weight = #{total_weight}"

        if $verbose
            sn.each{|n|
                puts "#{n} balance is #{n.info[:balance]} slots"
            }
        end

        # Now we have at the start of the 'sn' array nodes that should get
        # slots, at the end nodes that must give slots.
        # We take two indexes, one at the start, and one at the end,
        # incrementing or decrementing the indexes accordingly til we
        # find nodes that need to get/provide slots.
        dst_idx = 0
        src_idx = sn.length - 1

        while dst_idx < src_idx
            dst = sn[dst_idx]
            src = sn[src_idx]
            numslots = [dst.info[:balance],src.info[:balance]].map{|n|
                n.abs
            }.min

            if numslots > 0
                puts "Moving #{numslots} slots from #{src} to #{dst}"

                # Actually move the slots.
                reshard_table = compute_reshard_table([src],numslots)
                if reshard_table.length != numslots
                    xputs "*** Assertion failed: Reshard table != number of slots"
                    exit 1
                end
                if opt['simulate']
                    print "#"*reshard_table.length
                else
                    reshard_table.each{|e|
                        move_slot(e[:source],dst,e[:slot],
                            :quiet=>true,
                            :dots=>false,
                            :update=>true,
                            :pipeline=>opt['pipeline'])
                        print "#"
                        STDOUT.flush
                    }
                end
                puts
            end

            # Update nodes balance.
            dst.info[:balance] += numslots
            src.info[:balance] -= numslots
            dst_idx += 1 if dst.info[:balance] == 0
            src_idx -= 1 if src.info[:balance] == 0
        end
    end

    def fix_cluster_cmd(argv,opt)
        @fix = true
        @timeout = opt['timeout'].to_i if opt['timeout']

        load_cluster_info_from_node(argv[0])
        check_cluster
    end

    def reshard_cluster_cmd(argv,opt)
        opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt)

        load_cluster_info_from_node(argv[0])
        check_cluster
        if @errors.length != 0
            puts "*** Please fix your cluster problems before resharding"
            exit 1
        end

        @timeout = opt['timeout'].to_i if opt['timeout']

        # Get number of slots
        if opt['slots']
            numslots = opt['slots'].to_i
        else
            numslots = 0
            while numslots <= 0 or numslots > ClusterHashSlots
                print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? "
                numslots = STDIN.gets.to_i
            end
        end

        # Get the target instance
        if opt['to']
            target = get_node_by_name(opt['to'])
            if !target || target.has_flag?("slave")
                xputs "*** The specified node is not known or not a master, please retry."
                exit 1
            end
        else
            target = nil
            while not target
                print "What is the receiving node ID? "
                target = get_node_by_name(STDIN.gets.chop)
                if !target || target.has_flag?("slave")
                    xputs "*** The specified node is not known or not a master, please retry."
                    target = nil
                end
            end
        end

        # Get the source instances
        sources = []
        if opt['from']
            opt['from'].split(',').each{|node_id|
                if node_id == "all"
                    sources = "all"
                    break
                end
                src = get_node_by_name(node_id)
                if !src || src.has_flag?("slave")
                    xputs "*** The specified node is not known or is not a master, please retry."
                    exit 1
                end
                sources << src
            }
        else
            xputs "Please enter all the source node IDs."
            xputs "  Type 'all' to use all the nodes as source nodes for the hash slots."
            xputs "  Type 'done' once you entered all the source nodes IDs."
            while true
                print "Source node ##{sources.length+1}:"
                line = STDIN.gets.chop
                src = get_node_by_name(line)
                if line == "done"
                    break
                elsif line == "all"
                    sources = "all"
                    break
                elsif !src || src.has_flag?("slave")
                    xputs "*** The specified node is not known or is not a master, please retry."
                elsif src.info[:name] == target.info[:name]
                    xputs "*** It is not possible to use the target node as source node."
                else
                    sources << src
                end
            end
        end

        if sources.length == 0
            puts "*** No source nodes given, operation aborted"
            exit 1
        end

        # Handle sources == "all".
        if sources == "all"
            sources = []
            @nodes.each{|n|
                next if n.info[:name] == target.info[:name]
                next if n.has_flag?("slave")
                sources << n
            }
        end

        # Check if the destination node is the same as any of the source nodes.
        if sources.index(target)
            xputs "*** Target node is also listed among the source nodes!"
            exit 1
        end

        puts "\nReady to move #{numslots} slots."
        puts "  Source nodes:"
        sources.each{|s| puts "    "+s.info_string}
        puts "  Destination node:"
        puts "    #{target.info_string}"
        reshard_table = compute_reshard_table(sources,numslots)
        puts "  Resharding plan:"
        show_reshard_table(reshard_table)
        if !opt['yes']
            print "Do you want to proceed with the proposed reshard plan (yes/no)? "
            yesno = STDIN.gets.chop
            exit(1) if (yesno != "yes")
        end
        reshard_table.each{|e|
            move_slot(e[:source],target,e[:slot],
                :dots=>true,
                :pipeline=>opt['pipeline'])
        }
    end

    # This is a helper function for create_cluster_cmd that verifies that
    # the number of nodes and the specified replicas form a valid
    # configuration: at least three master nodes and enough replicas
    # per node.
    def check_create_parameters
        masters = @nodes.length/(@replicas+1)
        if masters < 3
            puts "*** ERROR: Invalid configuration for cluster creation."
            puts "*** Redis Cluster requires at least 3 master nodes."
            puts "*** This is not possible with #{@nodes.length} nodes and #{@replicas} replicas per node."
            puts "*** At least #{3*(@replicas+1)} nodes are required."
            exit 1
        end
    end

    def create_cluster_cmd(argv,opt)
        opt = {'replicas' => 0}.merge(opt)
        @replicas = opt['replicas'].to_i

        xputs ">>> Creating cluster"
        argv[0..-1].each{|n|
            node = ClusterNode.new(n)
            node.connect(:abort => true)
            node.assert_cluster
            node.load_info
            node.assert_empty
            add_node(node)
        }
        check_create_parameters
        xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..."
        alloc_slots
        show_nodes
        yes_or_die "Can I set the above configuration?"
        flush_nodes_config
        xputs ">>> Nodes configuration updated"
        xputs ">>> Assign a different config epoch to each node"
        assign_config_epoch
        xputs ">>> Sending CLUSTER MEET messages to join the cluster"
        join_cluster
        # Give one second for the join to start, in order to avoid that
        # wait_cluster_join will find all the nodes agree about the config as
        # they are still empty with unassigned slots.
        sleep 1
        wait_cluster_join
        flush_nodes_config # Useful for the replicas
        check_cluster
    end
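
    # Example invocations (hypothetical addresses): the first argument is
    # the node to add, the second any node already part of the cluster.
    #
    #   ./redis-trib.rb add-node 127.0.0.1:7006 127.0.0.1:7000
    #   ./redis-trib.rb add-node --slave 127.0.0.1:7007 127.0.0.1:7000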
    def addnode_cluster_cmd(argv,opt)
        xputs ">>> Adding node #{argv[0]} to cluster #{argv[1]}"

        # Check the existing cluster
        load_cluster_info_from_node(argv[1])
        check_cluster

        # If --master-id was specified, try to resolve it now so that we
        # abort before starting with the node configuration.
        if opt['slave']
            if opt['master-id']
                master = get_node_by_name(opt['master-id'])
                if !master
                    xputs "[ERR] No such master ID #{opt['master-id']}"
                    exit 1
                end
            else
                master = get_master_with_least_replicas
                xputs "Automatically selected master #{master}"
            end
        end

        # Add the new node
        new = ClusterNode.new(argv[0])
        new.connect(:abort => true)
        new.assert_cluster
        new.load_info
        new.assert_empty
        first = @nodes.first.info
        add_node(new)

        # Send CLUSTER MEET command to the new node
        xputs ">>> Send CLUSTER MEET to node #{new} to make it join the cluster."
        new.r.cluster("meet",first[:host],first[:port])

        # Additional configuration is needed if the node is added as
        # a slave.
        if opt['slave']
            wait_cluster_join
            xputs ">>> Configure node as replica of #{master}."
            new.r.cluster("replicate",master.info[:name])
        end
        xputs "[OK] New node added correctly."
    end

    def delnode_cluster_cmd(argv,opt)
        id = argv[1].downcase
        xputs ">>> Removing node #{id} from cluster #{argv[0]}"

        # Load cluster information
        load_cluster_info_from_node(argv[0])

        # Check if the node exists and is not empty
        node = get_node_by_name(id)

        if !node
            xputs "[ERR] No such node ID #{id}"
            exit 1
        end

        if node.slots.length != 0
            xputs "[ERR] Node #{node} is not empty! Reshard data away and try again."
            exit 1
        end

        # Send CLUSTER FORGET to all the nodes but the node to remove
        xputs ">>> Sending CLUSTER FORGET messages to the cluster..."
        @nodes.each{|n|
            next if n == node
            if n.info[:replicate] && n.info[:replicate].downcase == id
                # Reconfigure the slave to replicate with some other node
                master = get_master_with_least_replicas
                xputs ">>> #{n} as replica of #{master}"
                n.r.cluster("replicate",master.info[:name])
            end
            n.r.cluster("forget",argv[1])
        }

        # Finally shutdown the node
        xputs ">>> SHUTDOWN the node."
        node.r.shutdown
    end

    def set_timeout_cluster_cmd(argv,opt)
        timeout = argv[1].to_i
        if timeout < 100
            puts "Setting a node timeout of less than 100 milliseconds is a bad idea."
            exit 1
        end

        # Load cluster information
        load_cluster_info_from_node(argv[0])
        ok_count = 0
        err_count = 0

        # Send CONFIG SET cluster-node-timeout to every cluster node.
        xputs ">>> Reconfiguring node timeout in every cluster node..."
        @nodes.each{|n|
            begin
                n.r.config("set","cluster-node-timeout",timeout)
                n.r.config("rewrite")
                ok_count += 1
                xputs "*** New timeout set for #{n}"
            rescue => e
                puts "ERR setting node-timeout for #{n}: #{e}"
                err_count += 1
            end
        }
        xputs ">>> New node timeout set. #{ok_count} OK, #{err_count} ERR."
    end

    def call_cluster_cmd(argv,opt)
        cmd = argv[1..-1]
        cmd[0] = cmd[0].upcase

        # Load cluster information
        load_cluster_info_from_node(argv[0])
        xputs ">>> Calling #{cmd.join(" ")}"
        @nodes.each{|n|
            begin
                res = n.r.send(*cmd)
                puts "#{n}: #{res}"
            rescue => e
                puts "#{n}: #{e}"
            end
        }
    end

    def import_cluster_cmd(argv,opt)
        source_addr = opt['from']
        xputs ">>> Importing data from #{source_addr} to cluster #{argv[0]}"
        use_copy = opt['copy']
        use_replace = opt['replace']

        # Check the existing cluster.
        load_cluster_info_from_node(argv[0])
        check_cluster

        # Connect to the source node.
        xputs ">>> Connecting to the source Redis instance"
        src_host,src_port = source_addr.split(":")
        source = Redis.new(:host =>src_host, :port =>src_port)
        if source.info['cluster_enabled'].to_i == 1
            xputs "[ERR] The source node should not be a cluster node."
            exit 1
        end
        xputs "*** Importing #{source.dbsize} keys from DB 0"

        # Build a slot -> node map
        slots = {}
        @nodes.each{|n|
            n.slots.each{|s,_|
                slots[s] = n
            }
        }

        # Use SCAN to iterate over the keys, migrating to the
        # right node as needed.
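        # SCAN is a cursor-based iterator: every call returns the next
        # cursor together with a batch of keys, and the iteration is
        # complete once the server returns a cursor of 0.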
        cursor = nil
        while cursor != 0
            cursor,keys = source.scan(cursor, :count => 1000)
            cursor = cursor.to_i
            keys.each{|k|
                # Migrate keys using the MIGRATE command.
                slot = key_to_slot(k)
                target = slots[slot]
                print "Migrating #{k} to #{target}: "
                STDOUT.flush
                begin
                    cmd = ["migrate",target.info[:host],target.info[:port],k,0,@timeout]
                    cmd << :copy if use_copy
                    cmd << :replace if use_replace
                    source.client.call(cmd)
                rescue => e
                    puts e
                else
                    puts "OK"
                end
            }
        end
    end

    def help_cluster_cmd(argv,opt)
        show_help
        exit 0
    end

    # Parse the options for the specific command "cmd".
    # Returns a hash populated with option => value pairs, and the index of
    # the first non-option argument in ARGV.
    def parse_options(cmd)
        idx = 1 ; # Current index into ARGV
        options={}
        # Arguments not starting with "--" are not options, and terminate
        # options parsing.
        while idx < ARGV.length && ARGV[idx][0..1] == "--"
            option = ARGV[idx][2..-1]
            idx += 1

            # --verbose is a global option
            if option == "verbose"
                $verbose = true
                next
            end

            if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil
                puts "Unknown option '#{option}' for command '#{cmd}'"
                exit 1
            end
            if ALLOWED_OPTIONS[cmd][option] != false
                value = ARGV[idx]
                idx += 1
            else
                value = true
            end

            # If the option is set to [], it's a multiple arguments
            # option. We just queue every new value into an array.
            if ALLOWED_OPTIONS[cmd][option] == []
                options[option] = [] if !options[option]
                options[option] << value
            else
                options[option] = value
            end
        end

        # Enforce mandatory options
        if ALLOWED_OPTIONS[cmd]
            ALLOWED_OPTIONS[cmd].each {|option,val|
                if !options[option] && val == :required
                    puts "Option '--#{option}' is required "+ \
                         "for subcommand '#{cmd}'"
                    exit 1
                end
            }
        end
        return options,idx
    end
end

#################################################################################
# Libraries
#
# We try not to depend on external libs since this is a critical part
# of Redis Cluster.
#################################################################################
1561# 1562# This is actually the XMODEM CRC 16 algorithm, using the 1563# following parameters: 1564# 1565# Name : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN" 1566# Width : 16 bit 1567# Poly : 1021 (That is actually x^16 + x^12 + x^5 + 1) 1568# Initialization : 0000 1569# Reflect Input byte : False 1570# Reflect Output CRC : False 1571# Xor constant to output CRC : 0000 1572# Output for "123456789" : 31C3 1573 1574module RedisClusterCRC16 1575 def RedisClusterCRC16.crc16(bytes) 1576 crc = 0 1577 bytes.each_byte{|b| 1578 crc = ((crc<<8) & 0xffff) ^ XMODEMCRC16Lookup[((crc>>8)^b) & 0xff] 1579 } 1580 crc 1581 end 1582 1583private 1584 XMODEMCRC16Lookup = [ 1585 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, 1586 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, 1587 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, 1588 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, 1589 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485, 1590 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, 1591 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4, 1592 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, 1593 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823, 1594 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, 1595 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12, 1596 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, 1597 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41, 1598 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, 1599 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70, 1600 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, 1601 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f, 1602 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, 1603 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e, 1604 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, 1605 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d, 1606 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, 1607 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c, 1608 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, 1609 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab, 1610 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, 1611 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a, 1612 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, 1613 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9, 1614 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1, 1615 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8, 1616 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 1617 ] 1618end 1619 1620# Turn a key name into the corrisponding Redis Cluster slot. 1621def key_to_slot(key) 1622 # Only hash what is inside {...} if there is such a pattern in the key. 1623 # Note that the specification requires the content that is between 1624 # the first { and the first } after the first {. If we found {} without 1625 # nothing in the middle, the whole key is hashed as usually. 1626 s = key.index "{" 1627 if s 1628 e = key.index "}",s+1 1629 if e && e != s+1 1630 key = key[s+1..e-1] 1631 end 1632 end 1633 RedisClusterCRC16.crc16(key) % 16384 1634end 1635 1636################################################################################# 1637# Definition of commands 1638################################################################################# 1639 1640COMMANDS={ 1641 "create" => ["create_cluster_cmd", -2, "host1:port1 ... 

#################################################################################
# Definition of commands
#################################################################################

COMMANDS={
    "create"      => ["create_cluster_cmd", -2, "host1:port1 ... hostN:portN"],
    "check"       => ["check_cluster_cmd", 2, "host:port"],
    "info"        => ["info_cluster_cmd", 2, "host:port"],
    "fix"         => ["fix_cluster_cmd", 2, "host:port"],
    "reshard"     => ["reshard_cluster_cmd", 2, "host:port"],
    "rebalance"   => ["rebalance_cluster_cmd", -2, "host:port"],
    "add-node"    => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
    "del-node"    => ["delnode_cluster_cmd", 3, "host:port node_id"],
    "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"],
    "call"        => ["call_cluster_cmd", -3, "host:port command arg arg .. arg"],
    "import"      => ["import_cluster_cmd", 2, "host:port"],
    "help"        => ["help_cluster_cmd", 1, "(show this help)"]
}

ALLOWED_OPTIONS={
    "create" => {"replicas" => true},
    "add-node" => {"slave" => false, "master-id" => true},
    "import" => {"from" => :required, "copy" => false, "replace" => false},
    "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => true, "pipeline" => true},
    "rebalance" => {"weight" => [], "auto-weights" => false, "use-empty-masters" => false, "timeout" => true, "simulate" => false, "pipeline" => true, "threshold" => true},
    "fix" => {"timeout" => MigrateDefaultTimeout},
}

def show_help
    puts "Usage: redis-trib <command> <options> <arguments ...>\n\n"
    COMMANDS.each{|k,v|
        puts "  #{k.ljust(15)} #{v[2]}"
        if ALLOWED_OPTIONS[k]
            ALLOWED_OPTIONS[k].each{|optname,has_arg|
                puts "                  --#{optname}" + (has_arg ? " <arg>" : "")
            }
        end
    }
    puts "\nFor check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.\n"
end

# Sanity check
if ARGV.length == 0
    show_help
    exit 1
end

rt = RedisTrib.new
cmd_spec = COMMANDS[ARGV[0].downcase]
if !cmd_spec
    puts "Unknown redis-trib subcommand '#{ARGV[0]}'"
    exit 1
end

# Parse options
cmd_options,first_non_option = rt.parse_options(ARGV[0].downcase)
rt.check_arity(cmd_spec[1],ARGV.length-(first_non_option-1))

# Dispatch
rt.send(cmd_spec[0],ARGV[first_non_option..-1],cmd_options)
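
# Typical workflow (hypothetical addresses; for most subcommands any
# working node of the cluster can be used as the entry point):
#
#   ./redis-trib.rb create --replicas 1 127.0.0.1:7000 127.0.0.1:7001 \
#       127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
#   ./redis-trib.rb check 127.0.0.1:7000
#   ./redis-trib.rb reshard --from all --to <target-node-id> --slots 100 \
#       127.0.0.1:7000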