1package redis 2 3import ( 4 "context" 5 "crypto/tls" 6 "fmt" 7 "math" 8 "net" 9 "runtime" 10 "sort" 11 "sync" 12 "sync/atomic" 13 "time" 14 15 "github.com/go-redis/redis/v8/internal" 16 "github.com/go-redis/redis/v8/internal/hashtag" 17 "github.com/go-redis/redis/v8/internal/pool" 18 "github.com/go-redis/redis/v8/internal/proto" 19 "github.com/go-redis/redis/v8/internal/rand" 20) 21 22var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") 23 24// ClusterOptions are used to configure a cluster client and should be 25// passed to NewClusterClient. 26type ClusterOptions struct { 27 // A seed list of host:port addresses of cluster nodes. 28 Addrs []string 29 30 // NewClient creates a cluster node client with provided name and options. 31 NewClient func(opt *Options) *Client 32 33 // The maximum number of retries before giving up. Command is retried 34 // on network errors and MOVED/ASK redirects. 35 // Default is 3 retries. 36 MaxRedirects int 37 38 // Enables read-only commands on slave nodes. 39 ReadOnly bool 40 // Allows routing read-only commands to the closest master or slave node. 41 // It automatically enables ReadOnly. 42 RouteByLatency bool 43 // Allows routing read-only commands to the random master or slave node. 44 // It automatically enables ReadOnly. 45 RouteRandomly bool 46 47 // Optional function that returns cluster slots information. 48 // It is useful to manually create cluster of standalone Redis servers 49 // and load-balance read/write operations between master and slaves. 50 // It can use service like ZooKeeper to maintain configuration information 51 // and Cluster.ReloadState to manually trigger state reloading. 52 ClusterSlots func(context.Context) ([]ClusterSlot, error) 53 54 // Following options are copied from Options struct. 55 56 Dialer func(ctx context.Context, network, addr string) (net.Conn, error) 57 58 OnConnect func(ctx context.Context, cn *Conn) error 59 60 Username string 61 Password string 62 63 MaxRetries int 64 MinRetryBackoff time.Duration 65 MaxRetryBackoff time.Duration 66 67 DialTimeout time.Duration 68 ReadTimeout time.Duration 69 WriteTimeout time.Duration 70 71 // PoolSize applies per cluster node and not for the whole cluster. 72 PoolSize int 73 MinIdleConns int 74 MaxConnAge time.Duration 75 PoolTimeout time.Duration 76 IdleTimeout time.Duration 77 IdleCheckFrequency time.Duration 78 79 TLSConfig *tls.Config 80} 81 82func (opt *ClusterOptions) init() { 83 if opt.MaxRedirects == -1 { 84 opt.MaxRedirects = 0 85 } else if opt.MaxRedirects == 0 { 86 opt.MaxRedirects = 3 87 } 88 89 if opt.RouteByLatency || opt.RouteRandomly { 90 opt.ReadOnly = true 91 } 92 93 if opt.PoolSize == 0 { 94 opt.PoolSize = 5 * runtime.NumCPU() 95 } 96 97 switch opt.ReadTimeout { 98 case -1: 99 opt.ReadTimeout = 0 100 case 0: 101 opt.ReadTimeout = 3 * time.Second 102 } 103 switch opt.WriteTimeout { 104 case -1: 105 opt.WriteTimeout = 0 106 case 0: 107 opt.WriteTimeout = opt.ReadTimeout 108 } 109 110 if opt.MaxRetries == 0 { 111 opt.MaxRetries = -1 112 } 113 switch opt.MinRetryBackoff { 114 case -1: 115 opt.MinRetryBackoff = 0 116 case 0: 117 opt.MinRetryBackoff = 8 * time.Millisecond 118 } 119 switch opt.MaxRetryBackoff { 120 case -1: 121 opt.MaxRetryBackoff = 0 122 case 0: 123 opt.MaxRetryBackoff = 512 * time.Millisecond 124 } 125 126 if opt.NewClient == nil { 127 opt.NewClient = NewClient 128 } 129} 130 131func (opt *ClusterOptions) clientOptions() *Options { 132 const disableIdleCheck = -1 133 134 return &Options{ 135 Dialer: opt.Dialer, 136 OnConnect: opt.OnConnect, 137 138 Username: opt.Username, 139 Password: opt.Password, 140 141 MaxRetries: opt.MaxRetries, 142 MinRetryBackoff: opt.MinRetryBackoff, 143 MaxRetryBackoff: opt.MaxRetryBackoff, 144 145 DialTimeout: opt.DialTimeout, 146 ReadTimeout: opt.ReadTimeout, 147 WriteTimeout: opt.WriteTimeout, 148 149 PoolSize: opt.PoolSize, 150 MinIdleConns: opt.MinIdleConns, 151 MaxConnAge: opt.MaxConnAge, 152 PoolTimeout: opt.PoolTimeout, 153 IdleTimeout: opt.IdleTimeout, 154 IdleCheckFrequency: disableIdleCheck, 155 156 TLSConfig: opt.TLSConfig, 157 // If ClusterSlots is populated, then we probably have an artificial 158 // cluster whose nodes are not in clustering mode (otherwise there isn't 159 // much use for ClusterSlots config). This means we cannot execute the 160 // READONLY command against that node -- setting readOnly to false in such 161 // situations in the options below will prevent that from happening. 162 readOnly: opt.ReadOnly && opt.ClusterSlots == nil, 163 } 164} 165 166//------------------------------------------------------------------------------ 167 168type clusterNode struct { 169 Client *Client 170 171 latency uint32 // atomic 172 generation uint32 // atomic 173 failing uint32 // atomic 174} 175 176func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { 177 opt := clOpt.clientOptions() 178 opt.Addr = addr 179 node := clusterNode{ 180 Client: clOpt.NewClient(opt), 181 } 182 183 node.latency = math.MaxUint32 184 if clOpt.RouteByLatency { 185 go node.updateLatency() 186 } 187 188 return &node 189} 190 191func (n *clusterNode) String() string { 192 return n.Client.String() 193} 194 195func (n *clusterNode) Close() error { 196 return n.Client.Close() 197} 198 199func (n *clusterNode) updateLatency() { 200 const numProbe = 10 201 var dur uint64 202 203 for i := 0; i < numProbe; i++ { 204 time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond) 205 206 start := time.Now() 207 n.Client.Ping(context.TODO()) 208 dur += uint64(time.Since(start) / time.Microsecond) 209 } 210 211 latency := float64(dur) / float64(numProbe) 212 atomic.StoreUint32(&n.latency, uint32(latency+0.5)) 213} 214 215func (n *clusterNode) Latency() time.Duration { 216 latency := atomic.LoadUint32(&n.latency) 217 return time.Duration(latency) * time.Microsecond 218} 219 220func (n *clusterNode) MarkAsFailing() { 221 atomic.StoreUint32(&n.failing, uint32(time.Now().Unix())) 222} 223 224func (n *clusterNode) Failing() bool { 225 const timeout = 15 // 15 seconds 226 227 failing := atomic.LoadUint32(&n.failing) 228 if failing == 0 { 229 return false 230 } 231 if time.Now().Unix()-int64(failing) < timeout { 232 return true 233 } 234 atomic.StoreUint32(&n.failing, 0) 235 return false 236} 237 238func (n *clusterNode) Generation() uint32 { 239 return atomic.LoadUint32(&n.generation) 240} 241 242func (n *clusterNode) SetGeneration(gen uint32) { 243 for { 244 v := atomic.LoadUint32(&n.generation) 245 if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) { 246 break 247 } 248 } 249} 250 251//------------------------------------------------------------------------------ 252 253type clusterNodes struct { 254 opt *ClusterOptions 255 256 mu sync.RWMutex 257 addrs []string 258 nodes map[string]*clusterNode 259 activeAddrs []string 260 closed bool 261 262 _generation uint32 // atomic 263} 264 265func newClusterNodes(opt *ClusterOptions) *clusterNodes { 266 return &clusterNodes{ 267 opt: opt, 268 269 addrs: opt.Addrs, 270 nodes: make(map[string]*clusterNode), 271 } 272} 273 274func (c *clusterNodes) Close() error { 275 c.mu.Lock() 276 defer c.mu.Unlock() 277 278 if c.closed { 279 return nil 280 } 281 c.closed = true 282 283 var firstErr error 284 for _, node := range c.nodes { 285 if err := node.Client.Close(); err != nil && firstErr == nil { 286 firstErr = err 287 } 288 } 289 290 c.nodes = nil 291 c.activeAddrs = nil 292 293 return firstErr 294} 295 296func (c *clusterNodes) Addrs() ([]string, error) { 297 var addrs []string 298 c.mu.RLock() 299 closed := c.closed 300 if !closed { 301 if len(c.activeAddrs) > 0 { 302 addrs = c.activeAddrs 303 } else { 304 addrs = c.addrs 305 } 306 } 307 c.mu.RUnlock() 308 309 if closed { 310 return nil, pool.ErrClosed 311 } 312 if len(addrs) == 0 { 313 return nil, errClusterNoNodes 314 } 315 return addrs, nil 316} 317 318func (c *clusterNodes) NextGeneration() uint32 { 319 return atomic.AddUint32(&c._generation, 1) 320} 321 322// GC removes unused nodes. 323func (c *clusterNodes) GC(generation uint32) { 324 //nolint:prealloc 325 var collected []*clusterNode 326 327 c.mu.Lock() 328 329 c.activeAddrs = c.activeAddrs[:0] 330 for addr, node := range c.nodes { 331 if node.Generation() >= generation { 332 c.activeAddrs = append(c.activeAddrs, addr) 333 if c.opt.RouteByLatency { 334 go node.updateLatency() 335 } 336 continue 337 } 338 339 delete(c.nodes, addr) 340 collected = append(collected, node) 341 } 342 343 c.mu.Unlock() 344 345 for _, node := range collected { 346 _ = node.Client.Close() 347 } 348} 349 350func (c *clusterNodes) Get(addr string) (*clusterNode, error) { 351 node, err := c.get(addr) 352 if err != nil { 353 return nil, err 354 } 355 if node != nil { 356 return node, nil 357 } 358 359 c.mu.Lock() 360 defer c.mu.Unlock() 361 362 if c.closed { 363 return nil, pool.ErrClosed 364 } 365 366 node, ok := c.nodes[addr] 367 if ok { 368 return node, nil 369 } 370 371 node = newClusterNode(c.opt, addr) 372 373 c.addrs = appendIfNotExists(c.addrs, addr) 374 c.nodes[addr] = node 375 376 return node, nil 377} 378 379func (c *clusterNodes) get(addr string) (*clusterNode, error) { 380 var node *clusterNode 381 var err error 382 c.mu.RLock() 383 if c.closed { 384 err = pool.ErrClosed 385 } else { 386 node = c.nodes[addr] 387 } 388 c.mu.RUnlock() 389 return node, err 390} 391 392func (c *clusterNodes) All() ([]*clusterNode, error) { 393 c.mu.RLock() 394 defer c.mu.RUnlock() 395 396 if c.closed { 397 return nil, pool.ErrClosed 398 } 399 400 cp := make([]*clusterNode, 0, len(c.nodes)) 401 for _, node := range c.nodes { 402 cp = append(cp, node) 403 } 404 return cp, nil 405} 406 407func (c *clusterNodes) Random() (*clusterNode, error) { 408 addrs, err := c.Addrs() 409 if err != nil { 410 return nil, err 411 } 412 413 n := rand.Intn(len(addrs)) 414 return c.Get(addrs[n]) 415} 416 417//------------------------------------------------------------------------------ 418 419type clusterSlot struct { 420 start, end int 421 nodes []*clusterNode 422} 423 424type clusterSlotSlice []*clusterSlot 425 426func (p clusterSlotSlice) Len() int { 427 return len(p) 428} 429 430func (p clusterSlotSlice) Less(i, j int) bool { 431 return p[i].start < p[j].start 432} 433 434func (p clusterSlotSlice) Swap(i, j int) { 435 p[i], p[j] = p[j], p[i] 436} 437 438type clusterState struct { 439 nodes *clusterNodes 440 Masters []*clusterNode 441 Slaves []*clusterNode 442 443 slots []*clusterSlot 444 445 generation uint32 446 createdAt time.Time 447} 448 449func newClusterState( 450 nodes *clusterNodes, slots []ClusterSlot, origin string, 451) (*clusterState, error) { 452 c := clusterState{ 453 nodes: nodes, 454 455 slots: make([]*clusterSlot, 0, len(slots)), 456 457 generation: nodes.NextGeneration(), 458 createdAt: time.Now(), 459 } 460 461 originHost, _, _ := net.SplitHostPort(origin) 462 isLoopbackOrigin := isLoopback(originHost) 463 464 for _, slot := range slots { 465 var nodes []*clusterNode 466 for i, slotNode := range slot.Nodes { 467 addr := slotNode.Addr 468 if !isLoopbackOrigin { 469 addr = replaceLoopbackHost(addr, originHost) 470 } 471 472 node, err := c.nodes.Get(addr) 473 if err != nil { 474 return nil, err 475 } 476 477 node.SetGeneration(c.generation) 478 nodes = append(nodes, node) 479 480 if i == 0 { 481 c.Masters = appendUniqueNode(c.Masters, node) 482 } else { 483 c.Slaves = appendUniqueNode(c.Slaves, node) 484 } 485 } 486 487 c.slots = append(c.slots, &clusterSlot{ 488 start: slot.Start, 489 end: slot.End, 490 nodes: nodes, 491 }) 492 } 493 494 sort.Sort(clusterSlotSlice(c.slots)) 495 496 time.AfterFunc(time.Minute, func() { 497 nodes.GC(c.generation) 498 }) 499 500 return &c, nil 501} 502 503func replaceLoopbackHost(nodeAddr, originHost string) string { 504 nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) 505 if err != nil { 506 return nodeAddr 507 } 508 509 nodeIP := net.ParseIP(nodeHost) 510 if nodeIP == nil { 511 return nodeAddr 512 } 513 514 if !nodeIP.IsLoopback() { 515 return nodeAddr 516 } 517 518 // Use origin host which is not loopback and node port. 519 return net.JoinHostPort(originHost, nodePort) 520} 521 522func isLoopback(host string) bool { 523 ip := net.ParseIP(host) 524 if ip == nil { 525 return true 526 } 527 return ip.IsLoopback() 528} 529 530func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) { 531 nodes := c.slotNodes(slot) 532 if len(nodes) > 0 { 533 return nodes[0], nil 534 } 535 return c.nodes.Random() 536} 537 538func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { 539 nodes := c.slotNodes(slot) 540 switch len(nodes) { 541 case 0: 542 return c.nodes.Random() 543 case 1: 544 return nodes[0], nil 545 case 2: 546 if slave := nodes[1]; !slave.Failing() { 547 return slave, nil 548 } 549 return nodes[0], nil 550 default: 551 var slave *clusterNode 552 for i := 0; i < 10; i++ { 553 n := rand.Intn(len(nodes)-1) + 1 554 slave = nodes[n] 555 if !slave.Failing() { 556 return slave, nil 557 } 558 } 559 560 // All slaves are loading - use master. 561 return nodes[0], nil 562 } 563} 564 565func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { 566 nodes := c.slotNodes(slot) 567 if len(nodes) == 0 { 568 return c.nodes.Random() 569 } 570 571 var node *clusterNode 572 for _, n := range nodes { 573 if n.Failing() { 574 continue 575 } 576 if node == nil || n.Latency() < node.Latency() { 577 node = n 578 } 579 } 580 if node != nil { 581 return node, nil 582 } 583 584 // If all nodes are failing - return random node 585 return c.nodes.Random() 586} 587 588func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) { 589 nodes := c.slotNodes(slot) 590 if len(nodes) == 0 { 591 return c.nodes.Random() 592 } 593 n := rand.Intn(len(nodes)) 594 return nodes[n], nil 595} 596 597func (c *clusterState) slotNodes(slot int) []*clusterNode { 598 i := sort.Search(len(c.slots), func(i int) bool { 599 return c.slots[i].end >= slot 600 }) 601 if i >= len(c.slots) { 602 return nil 603 } 604 x := c.slots[i] 605 if slot >= x.start && slot <= x.end { 606 return x.nodes 607 } 608 return nil 609} 610 611//------------------------------------------------------------------------------ 612 613type clusterStateHolder struct { 614 load func(ctx context.Context) (*clusterState, error) 615 616 state atomic.Value 617 reloading uint32 // atomic 618} 619 620func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder { 621 return &clusterStateHolder{ 622 load: fn, 623 } 624} 625 626func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) { 627 state, err := c.load(ctx) 628 if err != nil { 629 return nil, err 630 } 631 c.state.Store(state) 632 return state, nil 633} 634 635func (c *clusterStateHolder) LazyReload() { 636 if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { 637 return 638 } 639 go func() { 640 defer atomic.StoreUint32(&c.reloading, 0) 641 642 _, err := c.Reload(context.Background()) 643 if err != nil { 644 return 645 } 646 time.Sleep(200 * time.Millisecond) 647 }() 648} 649 650func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) { 651 v := c.state.Load() 652 if v != nil { 653 state := v.(*clusterState) 654 if time.Since(state.createdAt) > 10*time.Second { 655 c.LazyReload() 656 } 657 return state, nil 658 } 659 return c.Reload(ctx) 660} 661 662func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) { 663 state, err := c.Reload(ctx) 664 if err == nil { 665 return state, nil 666 } 667 return c.Get(ctx) 668} 669 670//------------------------------------------------------------------------------ 671 672type clusterClient struct { 673 opt *ClusterOptions 674 nodes *clusterNodes 675 state *clusterStateHolder //nolint:structcheck 676 cmdsInfoCache *cmdsInfoCache //nolint:structcheck 677} 678 679// ClusterClient is a Redis Cluster client representing a pool of zero 680// or more underlying connections. It's safe for concurrent use by 681// multiple goroutines. 682type ClusterClient struct { 683 *clusterClient 684 cmdable 685 hooks 686 ctx context.Context 687} 688 689// NewClusterClient returns a Redis Cluster client as described in 690// http://redis.io/topics/cluster-spec. 691func NewClusterClient(opt *ClusterOptions) *ClusterClient { 692 opt.init() 693 694 c := &ClusterClient{ 695 clusterClient: &clusterClient{ 696 opt: opt, 697 nodes: newClusterNodes(opt), 698 }, 699 ctx: context.Background(), 700 } 701 c.state = newClusterStateHolder(c.loadState) 702 c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) 703 c.cmdable = c.Process 704 705 if opt.IdleCheckFrequency > 0 { 706 go c.reaper(opt.IdleCheckFrequency) 707 } 708 709 return c 710} 711 712func (c *ClusterClient) Context() context.Context { 713 return c.ctx 714} 715 716func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient { 717 if ctx == nil { 718 panic("nil context") 719 } 720 clone := *c 721 clone.cmdable = clone.Process 722 clone.hooks.lock() 723 clone.ctx = ctx 724 return &clone 725} 726 727// Options returns read-only Options that were used to create the client. 728func (c *ClusterClient) Options() *ClusterOptions { 729 return c.opt 730} 731 732// ReloadState reloads cluster state. If available it calls ClusterSlots func 733// to get cluster slots information. 734func (c *ClusterClient) ReloadState(ctx context.Context) { 735 c.state.LazyReload() 736} 737 738// Close closes the cluster client, releasing any open resources. 739// 740// It is rare to Close a ClusterClient, as the ClusterClient is meant 741// to be long-lived and shared between many goroutines. 742func (c *ClusterClient) Close() error { 743 return c.nodes.Close() 744} 745 746// Do creates a Cmd from the args and processes the cmd. 747func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd { 748 cmd := NewCmd(ctx, args...) 749 _ = c.Process(ctx, cmd) 750 return cmd 751} 752 753func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error { 754 return c.hooks.process(ctx, cmd, c.process) 755} 756 757func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { 758 cmdInfo := c.cmdInfo(cmd.Name()) 759 slot := c.cmdSlot(cmd) 760 761 var node *clusterNode 762 var ask bool 763 var lastErr error 764 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { 765 if attempt > 0 { 766 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { 767 return err 768 } 769 } 770 771 if node == nil { 772 var err error 773 node, err = c.cmdNode(ctx, cmdInfo, slot) 774 if err != nil { 775 return err 776 } 777 } 778 779 if ask { 780 pipe := node.Client.Pipeline() 781 _ = pipe.Process(ctx, NewCmd(ctx, "asking")) 782 _ = pipe.Process(ctx, cmd) 783 _, lastErr = pipe.Exec(ctx) 784 _ = pipe.Close() 785 ask = false 786 } else { 787 lastErr = node.Client.Process(ctx, cmd) 788 } 789 790 // If there is no error - we are done. 791 if lastErr == nil { 792 return nil 793 } 794 if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed { 795 if isReadOnly { 796 c.state.LazyReload() 797 } 798 node = nil 799 continue 800 } 801 802 // If slave is loading - pick another node. 803 if c.opt.ReadOnly && isLoadingError(lastErr) { 804 node.MarkAsFailing() 805 node = nil 806 continue 807 } 808 809 var moved bool 810 var addr string 811 moved, ask, addr = isMovedError(lastErr) 812 if moved || ask { 813 var err error 814 node, err = c.nodes.Get(addr) 815 if err != nil { 816 return err 817 } 818 continue 819 } 820 821 if shouldRetry(lastErr, cmd.readTimeout() == nil) { 822 // First retry the same node. 823 if attempt == 0 { 824 continue 825 } 826 827 // Second try another node. 828 node.MarkAsFailing() 829 node = nil 830 continue 831 } 832 833 return lastErr 834 } 835 return lastErr 836} 837 838// ForEachMaster concurrently calls the fn on each master node in the cluster. 839// It returns the first error if any. 840func (c *ClusterClient) ForEachMaster( 841 ctx context.Context, 842 fn func(ctx context.Context, client *Client) error, 843) error { 844 state, err := c.state.ReloadOrGet(ctx) 845 if err != nil { 846 return err 847 } 848 849 var wg sync.WaitGroup 850 errCh := make(chan error, 1) 851 852 for _, master := range state.Masters { 853 wg.Add(1) 854 go func(node *clusterNode) { 855 defer wg.Done() 856 err := fn(ctx, node.Client) 857 if err != nil { 858 select { 859 case errCh <- err: 860 default: 861 } 862 } 863 }(master) 864 } 865 866 wg.Wait() 867 868 select { 869 case err := <-errCh: 870 return err 871 default: 872 return nil 873 } 874} 875 876// ForEachSlave concurrently calls the fn on each slave node in the cluster. 877// It returns the first error if any. 878func (c *ClusterClient) ForEachSlave( 879 ctx context.Context, 880 fn func(ctx context.Context, client *Client) error, 881) error { 882 state, err := c.state.ReloadOrGet(ctx) 883 if err != nil { 884 return err 885 } 886 887 var wg sync.WaitGroup 888 errCh := make(chan error, 1) 889 890 for _, slave := range state.Slaves { 891 wg.Add(1) 892 go func(node *clusterNode) { 893 defer wg.Done() 894 err := fn(ctx, node.Client) 895 if err != nil { 896 select { 897 case errCh <- err: 898 default: 899 } 900 } 901 }(slave) 902 } 903 904 wg.Wait() 905 906 select { 907 case err := <-errCh: 908 return err 909 default: 910 return nil 911 } 912} 913 914// ForEachShard concurrently calls the fn on each known node in the cluster. 915// It returns the first error if any. 916func (c *ClusterClient) ForEachShard( 917 ctx context.Context, 918 fn func(ctx context.Context, client *Client) error, 919) error { 920 state, err := c.state.ReloadOrGet(ctx) 921 if err != nil { 922 return err 923 } 924 925 var wg sync.WaitGroup 926 errCh := make(chan error, 1) 927 928 worker := func(node *clusterNode) { 929 defer wg.Done() 930 err := fn(ctx, node.Client) 931 if err != nil { 932 select { 933 case errCh <- err: 934 default: 935 } 936 } 937 } 938 939 for _, node := range state.Masters { 940 wg.Add(1) 941 go worker(node) 942 } 943 for _, node := range state.Slaves { 944 wg.Add(1) 945 go worker(node) 946 } 947 948 wg.Wait() 949 950 select { 951 case err := <-errCh: 952 return err 953 default: 954 return nil 955 } 956} 957 958// PoolStats returns accumulated connection pool stats. 959func (c *ClusterClient) PoolStats() *PoolStats { 960 var acc PoolStats 961 962 state, _ := c.state.Get(context.TODO()) 963 if state == nil { 964 return &acc 965 } 966 967 for _, node := range state.Masters { 968 s := node.Client.connPool.Stats() 969 acc.Hits += s.Hits 970 acc.Misses += s.Misses 971 acc.Timeouts += s.Timeouts 972 973 acc.TotalConns += s.TotalConns 974 acc.IdleConns += s.IdleConns 975 acc.StaleConns += s.StaleConns 976 } 977 978 for _, node := range state.Slaves { 979 s := node.Client.connPool.Stats() 980 acc.Hits += s.Hits 981 acc.Misses += s.Misses 982 acc.Timeouts += s.Timeouts 983 984 acc.TotalConns += s.TotalConns 985 acc.IdleConns += s.IdleConns 986 acc.StaleConns += s.StaleConns 987 } 988 989 return &acc 990} 991 992func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) { 993 if c.opt.ClusterSlots != nil { 994 slots, err := c.opt.ClusterSlots(ctx) 995 if err != nil { 996 return nil, err 997 } 998 return newClusterState(c.nodes, slots, "") 999 } 1000 1001 addrs, err := c.nodes.Addrs() 1002 if err != nil { 1003 return nil, err 1004 } 1005 1006 var firstErr error 1007 1008 for _, idx := range rand.Perm(len(addrs)) { 1009 addr := addrs[idx] 1010 1011 node, err := c.nodes.Get(addr) 1012 if err != nil { 1013 if firstErr == nil { 1014 firstErr = err 1015 } 1016 continue 1017 } 1018 1019 slots, err := node.Client.ClusterSlots(ctx).Result() 1020 if err != nil { 1021 if firstErr == nil { 1022 firstErr = err 1023 } 1024 continue 1025 } 1026 1027 return newClusterState(c.nodes, slots, node.Client.opt.Addr) 1028 } 1029 1030 /* 1031 * No node is connectable. It's possible that all nodes' IP has changed. 1032 * Clear activeAddrs to let client be able to re-connect using the initial 1033 * setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]), 1034 * which might have chance to resolve domain name and get updated IP address. 1035 */ 1036 c.nodes.mu.Lock() 1037 c.nodes.activeAddrs = nil 1038 c.nodes.mu.Unlock() 1039 1040 return nil, firstErr 1041} 1042 1043// reaper closes idle connections to the cluster. 1044func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { 1045 ticker := time.NewTicker(idleCheckFrequency) 1046 defer ticker.Stop() 1047 1048 for range ticker.C { 1049 nodes, err := c.nodes.All() 1050 if err != nil { 1051 break 1052 } 1053 1054 for _, node := range nodes { 1055 _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() 1056 if err != nil { 1057 internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err) 1058 } 1059 } 1060 } 1061} 1062 1063func (c *ClusterClient) Pipeline() Pipeliner { 1064 pipe := Pipeline{ 1065 ctx: c.ctx, 1066 exec: c.processPipeline, 1067 } 1068 pipe.init() 1069 return &pipe 1070} 1071 1072func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { 1073 return c.Pipeline().Pipelined(ctx, fn) 1074} 1075 1076func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error { 1077 return c.hooks.processPipeline(ctx, cmds, c._processPipeline) 1078} 1079 1080func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error { 1081 cmdsMap := newCmdsMap() 1082 err := c.mapCmdsByNode(ctx, cmdsMap, cmds) 1083 if err != nil { 1084 setCmdsErr(cmds, err) 1085 return err 1086 } 1087 1088 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { 1089 if attempt > 0 { 1090 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { 1091 setCmdsErr(cmds, err) 1092 return err 1093 } 1094 } 1095 1096 failedCmds := newCmdsMap() 1097 var wg sync.WaitGroup 1098 1099 for node, cmds := range cmdsMap.m { 1100 wg.Add(1) 1101 go func(node *clusterNode, cmds []Cmder) { 1102 defer wg.Done() 1103 1104 err := c._processPipelineNode(ctx, node, cmds, failedCmds) 1105 if err == nil { 1106 return 1107 } 1108 if attempt < c.opt.MaxRedirects { 1109 if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil { 1110 setCmdsErr(cmds, err) 1111 } 1112 } else { 1113 setCmdsErr(cmds, err) 1114 } 1115 }(node, cmds) 1116 } 1117 1118 wg.Wait() 1119 if len(failedCmds.m) == 0 { 1120 break 1121 } 1122 cmdsMap = failedCmds 1123 } 1124 1125 return cmdsFirstErr(cmds) 1126} 1127 1128func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error { 1129 state, err := c.state.Get(ctx) 1130 if err != nil { 1131 return err 1132 } 1133 1134 if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) { 1135 for _, cmd := range cmds { 1136 slot := c.cmdSlot(cmd) 1137 node, err := c.slotReadOnlyNode(state, slot) 1138 if err != nil { 1139 return err 1140 } 1141 cmdsMap.Add(node, cmd) 1142 } 1143 return nil 1144 } 1145 1146 for _, cmd := range cmds { 1147 slot := c.cmdSlot(cmd) 1148 node, err := state.slotMasterNode(slot) 1149 if err != nil { 1150 return err 1151 } 1152 cmdsMap.Add(node, cmd) 1153 } 1154 return nil 1155} 1156 1157func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { 1158 for _, cmd := range cmds { 1159 cmdInfo := c.cmdInfo(cmd.Name()) 1160 if cmdInfo == nil || !cmdInfo.ReadOnly { 1161 return false 1162 } 1163 } 1164 return true 1165} 1166 1167func (c *ClusterClient) _processPipelineNode( 1168 ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap, 1169) error { 1170 return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { 1171 return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { 1172 err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { 1173 return writeCmds(wr, cmds) 1174 }) 1175 if err != nil { 1176 return err 1177 } 1178 1179 return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { 1180 return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds) 1181 }) 1182 }) 1183 }) 1184} 1185 1186func (c *ClusterClient) pipelineReadCmds( 1187 ctx context.Context, 1188 node *clusterNode, 1189 rd *proto.Reader, 1190 cmds []Cmder, 1191 failedCmds *cmdsMap, 1192) error { 1193 for _, cmd := range cmds { 1194 err := cmd.readReply(rd) 1195 cmd.SetErr(err) 1196 1197 if err == nil { 1198 continue 1199 } 1200 1201 if c.checkMovedErr(ctx, cmd, err, failedCmds) { 1202 continue 1203 } 1204 1205 if c.opt.ReadOnly && isLoadingError(err) { 1206 node.MarkAsFailing() 1207 return err 1208 } 1209 if isRedisError(err) { 1210 continue 1211 } 1212 return err 1213 } 1214 return nil 1215} 1216 1217func (c *ClusterClient) checkMovedErr( 1218 ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap, 1219) bool { 1220 moved, ask, addr := isMovedError(err) 1221 if !moved && !ask { 1222 return false 1223 } 1224 1225 node, err := c.nodes.Get(addr) 1226 if err != nil { 1227 return false 1228 } 1229 1230 if moved { 1231 c.state.LazyReload() 1232 failedCmds.Add(node, cmd) 1233 return true 1234 } 1235 1236 if ask { 1237 failedCmds.Add(node, NewCmd(ctx, "asking"), cmd) 1238 return true 1239 } 1240 1241 panic("not reached") 1242} 1243 1244// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. 1245func (c *ClusterClient) TxPipeline() Pipeliner { 1246 pipe := Pipeline{ 1247 ctx: c.ctx, 1248 exec: c.processTxPipeline, 1249 } 1250 pipe.init() 1251 return &pipe 1252} 1253 1254func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { 1255 return c.TxPipeline().Pipelined(ctx, fn) 1256} 1257 1258func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error { 1259 return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline) 1260} 1261 1262func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error { 1263 // Trim multi .. exec. 1264 cmds = cmds[1 : len(cmds)-1] 1265 1266 state, err := c.state.Get(ctx) 1267 if err != nil { 1268 setCmdsErr(cmds, err) 1269 return err 1270 } 1271 1272 cmdsMap := c.mapCmdsBySlot(cmds) 1273 for slot, cmds := range cmdsMap { 1274 node, err := state.slotMasterNode(slot) 1275 if err != nil { 1276 setCmdsErr(cmds, err) 1277 continue 1278 } 1279 1280 cmdsMap := map[*clusterNode][]Cmder{node: cmds} 1281 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { 1282 if attempt > 0 { 1283 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { 1284 setCmdsErr(cmds, err) 1285 return err 1286 } 1287 } 1288 1289 failedCmds := newCmdsMap() 1290 var wg sync.WaitGroup 1291 1292 for node, cmds := range cmdsMap { 1293 wg.Add(1) 1294 go func(node *clusterNode, cmds []Cmder) { 1295 defer wg.Done() 1296 1297 err := c._processTxPipelineNode(ctx, node, cmds, failedCmds) 1298 if err == nil { 1299 return 1300 } 1301 1302 if attempt < c.opt.MaxRedirects { 1303 if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil { 1304 setCmdsErr(cmds, err) 1305 } 1306 } else { 1307 setCmdsErr(cmds, err) 1308 } 1309 }(node, cmds) 1310 } 1311 1312 wg.Wait() 1313 if len(failedCmds.m) == 0 { 1314 break 1315 } 1316 cmdsMap = failedCmds.m 1317 } 1318 } 1319 1320 return cmdsFirstErr(cmds) 1321} 1322 1323func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { 1324 cmdsMap := make(map[int][]Cmder) 1325 for _, cmd := range cmds { 1326 slot := c.cmdSlot(cmd) 1327 cmdsMap[slot] = append(cmdsMap[slot], cmd) 1328 } 1329 return cmdsMap 1330} 1331 1332func (c *ClusterClient) _processTxPipelineNode( 1333 ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap, 1334) error { 1335 return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { 1336 return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { 1337 err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { 1338 return writeCmds(wr, cmds) 1339 }) 1340 if err != nil { 1341 return err 1342 } 1343 1344 return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { 1345 statusCmd := cmds[0].(*StatusCmd) 1346 // Trim multi and exec. 1347 cmds = cmds[1 : len(cmds)-1] 1348 1349 err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds) 1350 if err != nil { 1351 moved, ask, addr := isMovedError(err) 1352 if moved || ask { 1353 return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds) 1354 } 1355 return err 1356 } 1357 1358 return pipelineReadCmds(rd, cmds) 1359 }) 1360 }) 1361 }) 1362} 1363 1364func (c *ClusterClient) txPipelineReadQueued( 1365 ctx context.Context, 1366 rd *proto.Reader, 1367 statusCmd *StatusCmd, 1368 cmds []Cmder, 1369 failedCmds *cmdsMap, 1370) error { 1371 // Parse queued replies. 1372 if err := statusCmd.readReply(rd); err != nil { 1373 return err 1374 } 1375 1376 for _, cmd := range cmds { 1377 err := statusCmd.readReply(rd) 1378 if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) { 1379 continue 1380 } 1381 return err 1382 } 1383 1384 // Parse number of replies. 1385 line, err := rd.ReadLine() 1386 if err != nil { 1387 if err == Nil { 1388 err = TxFailedErr 1389 } 1390 return err 1391 } 1392 1393 switch line[0] { 1394 case proto.ErrorReply: 1395 return proto.ParseErrorReply(line) 1396 case proto.ArrayReply: 1397 // ok 1398 default: 1399 return fmt.Errorf("redis: expected '*', but got line %q", line) 1400 } 1401 1402 return nil 1403} 1404 1405func (c *ClusterClient) cmdsMoved( 1406 ctx context.Context, cmds []Cmder, 1407 moved, ask bool, 1408 addr string, 1409 failedCmds *cmdsMap, 1410) error { 1411 node, err := c.nodes.Get(addr) 1412 if err != nil { 1413 return err 1414 } 1415 1416 if moved { 1417 c.state.LazyReload() 1418 for _, cmd := range cmds { 1419 failedCmds.Add(node, cmd) 1420 } 1421 return nil 1422 } 1423 1424 if ask { 1425 for _, cmd := range cmds { 1426 failedCmds.Add(node, NewCmd(ctx, "asking"), cmd) 1427 } 1428 return nil 1429 } 1430 1431 return nil 1432} 1433 1434func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { 1435 if len(keys) == 0 { 1436 return fmt.Errorf("redis: Watch requires at least one key") 1437 } 1438 1439 slot := hashtag.Slot(keys[0]) 1440 for _, key := range keys[1:] { 1441 if hashtag.Slot(key) != slot { 1442 err := fmt.Errorf("redis: Watch requires all keys to be in the same slot") 1443 return err 1444 } 1445 } 1446 1447 node, err := c.slotMasterNode(ctx, slot) 1448 if err != nil { 1449 return err 1450 } 1451 1452 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { 1453 if attempt > 0 { 1454 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { 1455 return err 1456 } 1457 } 1458 1459 err = node.Client.Watch(ctx, fn, keys...) 1460 if err == nil { 1461 break 1462 } 1463 1464 moved, ask, addr := isMovedError(err) 1465 if moved || ask { 1466 node, err = c.nodes.Get(addr) 1467 if err != nil { 1468 return err 1469 } 1470 continue 1471 } 1472 1473 if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed { 1474 if isReadOnly { 1475 c.state.LazyReload() 1476 } 1477 node, err = c.slotMasterNode(ctx, slot) 1478 if err != nil { 1479 return err 1480 } 1481 continue 1482 } 1483 1484 if shouldRetry(err, true) { 1485 continue 1486 } 1487 1488 return err 1489 } 1490 1491 return err 1492} 1493 1494func (c *ClusterClient) pubSub() *PubSub { 1495 var node *clusterNode 1496 pubsub := &PubSub{ 1497 opt: c.opt.clientOptions(), 1498 1499 newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) { 1500 if node != nil { 1501 panic("node != nil") 1502 } 1503 1504 var err error 1505 if len(channels) > 0 { 1506 slot := hashtag.Slot(channels[0]) 1507 node, err = c.slotMasterNode(ctx, slot) 1508 } else { 1509 node, err = c.nodes.Random() 1510 } 1511 if err != nil { 1512 return nil, err 1513 } 1514 1515 cn, err := node.Client.newConn(context.TODO()) 1516 if err != nil { 1517 node = nil 1518 1519 return nil, err 1520 } 1521 1522 return cn, nil 1523 }, 1524 closeConn: func(cn *pool.Conn) error { 1525 err := node.Client.connPool.CloseConn(cn) 1526 node = nil 1527 return err 1528 }, 1529 } 1530 pubsub.init() 1531 1532 return pubsub 1533} 1534 1535// Subscribe subscribes the client to the specified channels. 1536// Channels can be omitted to create empty subscription. 1537func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub { 1538 pubsub := c.pubSub() 1539 if len(channels) > 0 { 1540 _ = pubsub.Subscribe(ctx, channels...) 1541 } 1542 return pubsub 1543} 1544 1545// PSubscribe subscribes the client to the given patterns. 1546// Patterns can be omitted to create empty subscription. 1547func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub { 1548 pubsub := c.pubSub() 1549 if len(channels) > 0 { 1550 _ = pubsub.PSubscribe(ctx, channels...) 1551 } 1552 return pubsub 1553} 1554 1555func (c *ClusterClient) retryBackoff(attempt int) time.Duration { 1556 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) 1557} 1558 1559func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) { 1560 // Try 3 random nodes. 1561 const nodeLimit = 3 1562 1563 addrs, err := c.nodes.Addrs() 1564 if err != nil { 1565 return nil, err 1566 } 1567 1568 var firstErr error 1569 1570 perm := rand.Perm(len(addrs)) 1571 if len(perm) > nodeLimit { 1572 perm = perm[:nodeLimit] 1573 } 1574 1575 for _, idx := range perm { 1576 addr := addrs[idx] 1577 1578 node, err := c.nodes.Get(addr) 1579 if err != nil { 1580 if firstErr == nil { 1581 firstErr = err 1582 } 1583 continue 1584 } 1585 1586 info, err := node.Client.Command(ctx).Result() 1587 if err == nil { 1588 return info, nil 1589 } 1590 if firstErr == nil { 1591 firstErr = err 1592 } 1593 } 1594 1595 if firstErr == nil { 1596 panic("not reached") 1597 } 1598 return nil, firstErr 1599} 1600 1601func (c *ClusterClient) cmdInfo(name string) *CommandInfo { 1602 cmdsInfo, err := c.cmdsInfoCache.Get(c.ctx) 1603 if err != nil { 1604 return nil 1605 } 1606 1607 info := cmdsInfo[name] 1608 if info == nil { 1609 internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name) 1610 } 1611 return info 1612} 1613 1614func (c *ClusterClient) cmdSlot(cmd Cmder) int { 1615 args := cmd.Args() 1616 if args[0] == "cluster" && args[1] == "getkeysinslot" { 1617 return args[2].(int) 1618 } 1619 1620 cmdInfo := c.cmdInfo(cmd.Name()) 1621 return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) 1622} 1623 1624func cmdSlot(cmd Cmder, pos int) int { 1625 if pos == 0 { 1626 return hashtag.RandomSlot() 1627 } 1628 firstKey := cmd.stringArg(pos) 1629 return hashtag.Slot(firstKey) 1630} 1631 1632func (c *ClusterClient) cmdNode( 1633 ctx context.Context, 1634 cmdInfo *CommandInfo, 1635 slot int, 1636) (*clusterNode, error) { 1637 state, err := c.state.Get(ctx) 1638 if err != nil { 1639 return nil, err 1640 } 1641 1642 if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { 1643 return c.slotReadOnlyNode(state, slot) 1644 } 1645 return state.slotMasterNode(slot) 1646} 1647 1648func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) { 1649 if c.opt.RouteByLatency { 1650 return state.slotClosestNode(slot) 1651 } 1652 if c.opt.RouteRandomly { 1653 return state.slotRandomNode(slot) 1654 } 1655 return state.slotSlaveNode(slot) 1656} 1657 1658func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) { 1659 state, err := c.state.Get(ctx) 1660 if err != nil { 1661 return nil, err 1662 } 1663 return state.slotMasterNode(slot) 1664} 1665 1666// SlaveForKey gets a client for a replica node to run any command on it. 1667// This is especially useful if we want to run a particular lua script which has 1668// only read only commands on the replica. 1669// This is because other redis commands generally have a flag that points that 1670// they are read only and automatically run on the replica nodes 1671// if ClusterOptions.ReadOnly flag is set to true. 1672func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) { 1673 state, err := c.state.Get(ctx) 1674 if err != nil { 1675 return nil, err 1676 } 1677 slot := hashtag.Slot(key) 1678 node, err := c.slotReadOnlyNode(state, slot) 1679 if err != nil { 1680 return nil, err 1681 } 1682 return node.Client, err 1683} 1684 1685// MasterForKey return a client to the master node for a particular key. 1686func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) { 1687 slot := hashtag.Slot(key) 1688 node, err := c.slotMasterNode(ctx, slot) 1689 if err != nil { 1690 return nil, err 1691 } 1692 return node.Client, err 1693} 1694 1695func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { 1696 for _, n := range nodes { 1697 if n == node { 1698 return nodes 1699 } 1700 } 1701 return append(nodes, node) 1702} 1703 1704func appendIfNotExists(ss []string, es ...string) []string { 1705loop: 1706 for _, e := range es { 1707 for _, s := range ss { 1708 if s == e { 1709 continue loop 1710 } 1711 } 1712 ss = append(ss, e) 1713 } 1714 return ss 1715} 1716 1717//------------------------------------------------------------------------------ 1718 1719type cmdsMap struct { 1720 mu sync.Mutex 1721 m map[*clusterNode][]Cmder 1722} 1723 1724func newCmdsMap() *cmdsMap { 1725 return &cmdsMap{ 1726 m: make(map[*clusterNode][]Cmder), 1727 } 1728} 1729 1730func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) { 1731 m.mu.Lock() 1732 m.m[node] = append(m.m[node], cmds...) 1733 m.mu.Unlock() 1734} 1735