1package redis 2 3import ( 4 "context" 5 "crypto/tls" 6 "fmt" 7 "math" 8 "net" 9 "runtime" 10 "sort" 11 "sync" 12 "sync/atomic" 13 "time" 14 15 "github.com/go-redis/redis/v8/internal" 16 "github.com/go-redis/redis/v8/internal/hashtag" 17 "github.com/go-redis/redis/v8/internal/pool" 18 "github.com/go-redis/redis/v8/internal/proto" 19 "github.com/go-redis/redis/v8/internal/rand" 20) 21 22var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") 23 24// ClusterOptions are used to configure a cluster client and should be 25// passed to NewClusterClient. 26type ClusterOptions struct { 27 // A seed list of host:port addresses of cluster nodes. 28 Addrs []string 29 30 // NewClient creates a cluster node client with provided name and options. 31 NewClient func(opt *Options) *Client 32 33 // The maximum number of retries before giving up. Command is retried 34 // on network errors and MOVED/ASK redirects. 35 // Default is 3 retries. 36 MaxRedirects int 37 38 // Enables read-only commands on slave nodes. 39 ReadOnly bool 40 // Allows routing read-only commands to the closest master or slave node. 41 // It automatically enables ReadOnly. 42 RouteByLatency bool 43 // Allows routing read-only commands to the random master or slave node. 44 // It automatically enables ReadOnly. 45 RouteRandomly bool 46 47 // Optional function that returns cluster slots information. 48 // It is useful to manually create cluster of standalone Redis servers 49 // and load-balance read/write operations between master and slaves. 50 // It can use service like ZooKeeper to maintain configuration information 51 // and Cluster.ReloadState to manually trigger state reloading. 52 ClusterSlots func(context.Context) ([]ClusterSlot, error) 53 54 // Following options are copied from Options struct. 55 56 Dialer func(ctx context.Context, network, addr string) (net.Conn, error) 57 58 OnConnect func(ctx context.Context, cn *Conn) error 59 60 Username string 61 Password string 62 63 MaxRetries int 64 MinRetryBackoff time.Duration 65 MaxRetryBackoff time.Duration 66 67 DialTimeout time.Duration 68 ReadTimeout time.Duration 69 WriteTimeout time.Duration 70 71 // PoolSize applies per cluster node and not for the whole cluster. 72 PoolSize int 73 MinIdleConns int 74 MaxConnAge time.Duration 75 PoolTimeout time.Duration 76 IdleTimeout time.Duration 77 IdleCheckFrequency time.Duration 78 79 TLSConfig *tls.Config 80} 81 82func (opt *ClusterOptions) init() { 83 if opt.MaxRedirects == -1 { 84 opt.MaxRedirects = 0 85 } else if opt.MaxRedirects == 0 { 86 opt.MaxRedirects = 3 87 } 88 89 if opt.RouteByLatency || opt.RouteRandomly { 90 opt.ReadOnly = true 91 } 92 93 if opt.PoolSize == 0 { 94 opt.PoolSize = 5 * runtime.NumCPU() 95 } 96 97 switch opt.ReadTimeout { 98 case -1: 99 opt.ReadTimeout = 0 100 case 0: 101 opt.ReadTimeout = 3 * time.Second 102 } 103 switch opt.WriteTimeout { 104 case -1: 105 opt.WriteTimeout = 0 106 case 0: 107 opt.WriteTimeout = opt.ReadTimeout 108 } 109 110 if opt.MaxRetries == 0 { 111 opt.MaxRetries = -1 112 } 113 switch opt.MinRetryBackoff { 114 case -1: 115 opt.MinRetryBackoff = 0 116 case 0: 117 opt.MinRetryBackoff = 8 * time.Millisecond 118 } 119 switch opt.MaxRetryBackoff { 120 case -1: 121 opt.MaxRetryBackoff = 0 122 case 0: 123 opt.MaxRetryBackoff = 512 * time.Millisecond 124 } 125 126 if opt.NewClient == nil { 127 opt.NewClient = NewClient 128 } 129} 130 131func (opt *ClusterOptions) clientOptions() *Options { 132 const disableIdleCheck = -1 133 134 return &Options{ 135 Dialer: opt.Dialer, 136 OnConnect: opt.OnConnect, 137 138 Username: opt.Username, 139 Password: opt.Password, 140 141 MaxRetries: opt.MaxRetries, 142 MinRetryBackoff: opt.MinRetryBackoff, 143 MaxRetryBackoff: opt.MaxRetryBackoff, 144 145 DialTimeout: opt.DialTimeout, 146 ReadTimeout: opt.ReadTimeout, 147 WriteTimeout: opt.WriteTimeout, 148 149 PoolSize: opt.PoolSize, 150 MinIdleConns: opt.MinIdleConns, 151 MaxConnAge: opt.MaxConnAge, 152 PoolTimeout: opt.PoolTimeout, 153 IdleTimeout: opt.IdleTimeout, 154 IdleCheckFrequency: disableIdleCheck, 155 156 TLSConfig: opt.TLSConfig, 157 // If ClusterSlots is populated, then we probably have an artificial 158 // cluster whose nodes are not in clustering mode (otherwise there isn't 159 // much use for ClusterSlots config). This means we cannot execute the 160 // READONLY command against that node -- setting readOnly to false in such 161 // situations in the options below will prevent that from happening. 162 readOnly: opt.ReadOnly && opt.ClusterSlots == nil, 163 } 164} 165 166//------------------------------------------------------------------------------ 167 168type clusterNode struct { 169 Client *Client 170 171 latency uint32 // atomic 172 generation uint32 // atomic 173 failing uint32 // atomic 174} 175 176func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { 177 opt := clOpt.clientOptions() 178 opt.Addr = addr 179 node := clusterNode{ 180 Client: clOpt.NewClient(opt), 181 } 182 183 node.latency = math.MaxUint32 184 if clOpt.RouteByLatency { 185 go node.updateLatency() 186 } 187 188 return &node 189} 190 191func (n *clusterNode) String() string { 192 return n.Client.String() 193} 194 195func (n *clusterNode) Close() error { 196 return n.Client.Close() 197} 198 199func (n *clusterNode) updateLatency() { 200 const numProbe = 10 201 var dur uint64 202 203 for i := 0; i < numProbe; i++ { 204 time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond) 205 206 start := time.Now() 207 n.Client.Ping(context.TODO()) 208 dur += uint64(time.Since(start) / time.Microsecond) 209 } 210 211 latency := float64(dur) / float64(numProbe) 212 atomic.StoreUint32(&n.latency, uint32(latency+0.5)) 213} 214 215func (n *clusterNode) Latency() time.Duration { 216 latency := atomic.LoadUint32(&n.latency) 217 return time.Duration(latency) * time.Microsecond 218} 219 220func (n *clusterNode) MarkAsFailing() { 221 atomic.StoreUint32(&n.failing, uint32(time.Now().Unix())) 222} 223 224func (n *clusterNode) Failing() bool { 225 const timeout = 15 // 15 seconds 226 227 failing := atomic.LoadUint32(&n.failing) 228 if failing == 0 { 229 return false 230 } 231 if time.Now().Unix()-int64(failing) < timeout { 232 return true 233 } 234 atomic.StoreUint32(&n.failing, 0) 235 return false 236} 237 238func (n *clusterNode) Generation() uint32 { 239 return atomic.LoadUint32(&n.generation) 240} 241 242func (n *clusterNode) SetGeneration(gen uint32) { 243 for { 244 v := atomic.LoadUint32(&n.generation) 245 if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) { 246 break 247 } 248 } 249} 250 251//------------------------------------------------------------------------------ 252 253type clusterNodes struct { 254 opt *ClusterOptions 255 256 mu sync.RWMutex 257 addrs []string 258 nodes map[string]*clusterNode 259 activeAddrs []string 260 closed bool 261 262 _generation uint32 // atomic 263} 264 265func newClusterNodes(opt *ClusterOptions) *clusterNodes { 266 return &clusterNodes{ 267 opt: opt, 268 269 addrs: opt.Addrs, 270 nodes: make(map[string]*clusterNode), 271 } 272} 273 274func (c *clusterNodes) Close() error { 275 c.mu.Lock() 276 defer c.mu.Unlock() 277 278 if c.closed { 279 return nil 280 } 281 c.closed = true 282 283 var firstErr error 284 for _, node := range c.nodes { 285 if err := node.Client.Close(); err != nil && firstErr == nil { 286 firstErr = err 287 } 288 } 289 290 c.nodes = nil 291 c.activeAddrs = nil 292 293 return firstErr 294} 295 296func (c *clusterNodes) Addrs() ([]string, error) { 297 var addrs []string 298 299 c.mu.RLock() 300 closed := c.closed //nolint:ifshort 301 if !closed { 302 if len(c.activeAddrs) > 0 { 303 addrs = c.activeAddrs 304 } else { 305 addrs = c.addrs 306 } 307 } 308 c.mu.RUnlock() 309 310 if closed { 311 return nil, pool.ErrClosed 312 } 313 if len(addrs) == 0 { 314 return nil, errClusterNoNodes 315 } 316 return addrs, nil 317} 318 319func (c *clusterNodes) NextGeneration() uint32 { 320 return atomic.AddUint32(&c._generation, 1) 321} 322 323// GC removes unused nodes. 324func (c *clusterNodes) GC(generation uint32) { 325 //nolint:prealloc 326 var collected []*clusterNode 327 328 c.mu.Lock() 329 330 c.activeAddrs = c.activeAddrs[:0] 331 for addr, node := range c.nodes { 332 if node.Generation() >= generation { 333 c.activeAddrs = append(c.activeAddrs, addr) 334 if c.opt.RouteByLatency { 335 go node.updateLatency() 336 } 337 continue 338 } 339 340 delete(c.nodes, addr) 341 collected = append(collected, node) 342 } 343 344 c.mu.Unlock() 345 346 for _, node := range collected { 347 _ = node.Client.Close() 348 } 349} 350 351func (c *clusterNodes) Get(addr string) (*clusterNode, error) { 352 node, err := c.get(addr) 353 if err != nil { 354 return nil, err 355 } 356 if node != nil { 357 return node, nil 358 } 359 360 c.mu.Lock() 361 defer c.mu.Unlock() 362 363 if c.closed { 364 return nil, pool.ErrClosed 365 } 366 367 node, ok := c.nodes[addr] 368 if ok { 369 return node, nil 370 } 371 372 node = newClusterNode(c.opt, addr) 373 374 c.addrs = appendIfNotExists(c.addrs, addr) 375 c.nodes[addr] = node 376 377 return node, nil 378} 379 380func (c *clusterNodes) get(addr string) (*clusterNode, error) { 381 var node *clusterNode 382 var err error 383 c.mu.RLock() 384 if c.closed { 385 err = pool.ErrClosed 386 } else { 387 node = c.nodes[addr] 388 } 389 c.mu.RUnlock() 390 return node, err 391} 392 393func (c *clusterNodes) All() ([]*clusterNode, error) { 394 c.mu.RLock() 395 defer c.mu.RUnlock() 396 397 if c.closed { 398 return nil, pool.ErrClosed 399 } 400 401 cp := make([]*clusterNode, 0, len(c.nodes)) 402 for _, node := range c.nodes { 403 cp = append(cp, node) 404 } 405 return cp, nil 406} 407 408func (c *clusterNodes) Random() (*clusterNode, error) { 409 addrs, err := c.Addrs() 410 if err != nil { 411 return nil, err 412 } 413 414 n := rand.Intn(len(addrs)) 415 return c.Get(addrs[n]) 416} 417 418//------------------------------------------------------------------------------ 419 420type clusterSlot struct { 421 start, end int 422 nodes []*clusterNode 423} 424 425type clusterSlotSlice []*clusterSlot 426 427func (p clusterSlotSlice) Len() int { 428 return len(p) 429} 430 431func (p clusterSlotSlice) Less(i, j int) bool { 432 return p[i].start < p[j].start 433} 434 435func (p clusterSlotSlice) Swap(i, j int) { 436 p[i], p[j] = p[j], p[i] 437} 438 439type clusterState struct { 440 nodes *clusterNodes 441 Masters []*clusterNode 442 Slaves []*clusterNode 443 444 slots []*clusterSlot 445 446 generation uint32 447 createdAt time.Time 448} 449 450func newClusterState( 451 nodes *clusterNodes, slots []ClusterSlot, origin string, 452) (*clusterState, error) { 453 c := clusterState{ 454 nodes: nodes, 455 456 slots: make([]*clusterSlot, 0, len(slots)), 457 458 generation: nodes.NextGeneration(), 459 createdAt: time.Now(), 460 } 461 462 originHost, _, _ := net.SplitHostPort(origin) 463 isLoopbackOrigin := isLoopback(originHost) 464 465 for _, slot := range slots { 466 var nodes []*clusterNode 467 for i, slotNode := range slot.Nodes { 468 addr := slotNode.Addr 469 if !isLoopbackOrigin { 470 addr = replaceLoopbackHost(addr, originHost) 471 } 472 473 node, err := c.nodes.Get(addr) 474 if err != nil { 475 return nil, err 476 } 477 478 node.SetGeneration(c.generation) 479 nodes = append(nodes, node) 480 481 if i == 0 { 482 c.Masters = appendUniqueNode(c.Masters, node) 483 } else { 484 c.Slaves = appendUniqueNode(c.Slaves, node) 485 } 486 } 487 488 c.slots = append(c.slots, &clusterSlot{ 489 start: slot.Start, 490 end: slot.End, 491 nodes: nodes, 492 }) 493 } 494 495 sort.Sort(clusterSlotSlice(c.slots)) 496 497 time.AfterFunc(time.Minute, func() { 498 nodes.GC(c.generation) 499 }) 500 501 return &c, nil 502} 503 504func replaceLoopbackHost(nodeAddr, originHost string) string { 505 nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) 506 if err != nil { 507 return nodeAddr 508 } 509 510 nodeIP := net.ParseIP(nodeHost) 511 if nodeIP == nil { 512 return nodeAddr 513 } 514 515 if !nodeIP.IsLoopback() { 516 return nodeAddr 517 } 518 519 // Use origin host which is not loopback and node port. 520 return net.JoinHostPort(originHost, nodePort) 521} 522 523func isLoopback(host string) bool { 524 ip := net.ParseIP(host) 525 if ip == nil { 526 return true 527 } 528 return ip.IsLoopback() 529} 530 531func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) { 532 nodes := c.slotNodes(slot) 533 if len(nodes) > 0 { 534 return nodes[0], nil 535 } 536 return c.nodes.Random() 537} 538 539func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { 540 nodes := c.slotNodes(slot) 541 switch len(nodes) { 542 case 0: 543 return c.nodes.Random() 544 case 1: 545 return nodes[0], nil 546 case 2: 547 if slave := nodes[1]; !slave.Failing() { 548 return slave, nil 549 } 550 return nodes[0], nil 551 default: 552 var slave *clusterNode 553 for i := 0; i < 10; i++ { 554 n := rand.Intn(len(nodes)-1) + 1 555 slave = nodes[n] 556 if !slave.Failing() { 557 return slave, nil 558 } 559 } 560 561 // All slaves are loading - use master. 562 return nodes[0], nil 563 } 564} 565 566func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { 567 nodes := c.slotNodes(slot) 568 if len(nodes) == 0 { 569 return c.nodes.Random() 570 } 571 572 var node *clusterNode 573 for _, n := range nodes { 574 if n.Failing() { 575 continue 576 } 577 if node == nil || n.Latency() < node.Latency() { 578 node = n 579 } 580 } 581 if node != nil { 582 return node, nil 583 } 584 585 // If all nodes are failing - return random node 586 return c.nodes.Random() 587} 588 589func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) { 590 nodes := c.slotNodes(slot) 591 if len(nodes) == 0 { 592 return c.nodes.Random() 593 } 594 n := rand.Intn(len(nodes)) 595 return nodes[n], nil 596} 597 598func (c *clusterState) slotNodes(slot int) []*clusterNode { 599 i := sort.Search(len(c.slots), func(i int) bool { 600 return c.slots[i].end >= slot 601 }) 602 if i >= len(c.slots) { 603 return nil 604 } 605 x := c.slots[i] 606 if slot >= x.start && slot <= x.end { 607 return x.nodes 608 } 609 return nil 610} 611 612//------------------------------------------------------------------------------ 613 614type clusterStateHolder struct { 615 load func(ctx context.Context) (*clusterState, error) 616 617 state atomic.Value 618 reloading uint32 // atomic 619} 620 621func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder { 622 return &clusterStateHolder{ 623 load: fn, 624 } 625} 626 627func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) { 628 state, err := c.load(ctx) 629 if err != nil { 630 return nil, err 631 } 632 c.state.Store(state) 633 return state, nil 634} 635 636func (c *clusterStateHolder) LazyReload() { 637 if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { 638 return 639 } 640 go func() { 641 defer atomic.StoreUint32(&c.reloading, 0) 642 643 _, err := c.Reload(context.Background()) 644 if err != nil { 645 return 646 } 647 time.Sleep(200 * time.Millisecond) 648 }() 649} 650 651func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) { 652 v := c.state.Load() 653 if v == nil { 654 return c.Reload(ctx) 655 } 656 657 state := v.(*clusterState) 658 if time.Since(state.createdAt) > 10*time.Second { 659 c.LazyReload() 660 } 661 return state, nil 662} 663 664func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) { 665 state, err := c.Reload(ctx) 666 if err == nil { 667 return state, nil 668 } 669 return c.Get(ctx) 670} 671 672//------------------------------------------------------------------------------ 673 674type clusterClient struct { 675 opt *ClusterOptions 676 nodes *clusterNodes 677 state *clusterStateHolder //nolint:structcheck 678 cmdsInfoCache *cmdsInfoCache //nolint:structcheck 679} 680 681// ClusterClient is a Redis Cluster client representing a pool of zero 682// or more underlying connections. It's safe for concurrent use by 683// multiple goroutines. 684type ClusterClient struct { 685 *clusterClient 686 cmdable 687 hooks 688 ctx context.Context 689} 690 691// NewClusterClient returns a Redis Cluster client as described in 692// http://redis.io/topics/cluster-spec. 693func NewClusterClient(opt *ClusterOptions) *ClusterClient { 694 opt.init() 695 696 c := &ClusterClient{ 697 clusterClient: &clusterClient{ 698 opt: opt, 699 nodes: newClusterNodes(opt), 700 }, 701 ctx: context.Background(), 702 } 703 c.state = newClusterStateHolder(c.loadState) 704 c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) 705 c.cmdable = c.Process 706 707 if opt.IdleCheckFrequency > 0 { 708 go c.reaper(opt.IdleCheckFrequency) 709 } 710 711 return c 712} 713 714func (c *ClusterClient) Context() context.Context { 715 return c.ctx 716} 717 718func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient { 719 if ctx == nil { 720 panic("nil context") 721 } 722 clone := *c 723 clone.cmdable = clone.Process 724 clone.hooks.lock() 725 clone.ctx = ctx 726 return &clone 727} 728 729// Options returns read-only Options that were used to create the client. 730func (c *ClusterClient) Options() *ClusterOptions { 731 return c.opt 732} 733 734// ReloadState reloads cluster state. If available it calls ClusterSlots func 735// to get cluster slots information. 736func (c *ClusterClient) ReloadState(ctx context.Context) { 737 c.state.LazyReload() 738} 739 740// Close closes the cluster client, releasing any open resources. 741// 742// It is rare to Close a ClusterClient, as the ClusterClient is meant 743// to be long-lived and shared between many goroutines. 744func (c *ClusterClient) Close() error { 745 return c.nodes.Close() 746} 747 748// Do creates a Cmd from the args and processes the cmd. 749func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd { 750 cmd := NewCmd(ctx, args...) 751 _ = c.Process(ctx, cmd) 752 return cmd 753} 754 755func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error { 756 return c.hooks.process(ctx, cmd, c.process) 757} 758 759func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { 760 cmdInfo := c.cmdInfo(cmd.Name()) 761 slot := c.cmdSlot(cmd) 762 763 var node *clusterNode 764 var ask bool 765 var lastErr error 766 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { 767 if attempt > 0 { 768 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { 769 return err 770 } 771 } 772 773 if node == nil { 774 var err error 775 node, err = c.cmdNode(ctx, cmdInfo, slot) 776 if err != nil { 777 return err 778 } 779 } 780 781 if ask { 782 pipe := node.Client.Pipeline() 783 _ = pipe.Process(ctx, NewCmd(ctx, "asking")) 784 _ = pipe.Process(ctx, cmd) 785 _, lastErr = pipe.Exec(ctx) 786 _ = pipe.Close() 787 ask = false 788 } else { 789 lastErr = node.Client.Process(ctx, cmd) 790 } 791 792 // If there is no error - we are done. 793 if lastErr == nil { 794 return nil 795 } 796 if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed { 797 if isReadOnly { 798 c.state.LazyReload() 799 } 800 node = nil 801 continue 802 } 803 804 // If slave is loading - pick another node. 805 if c.opt.ReadOnly && isLoadingError(lastErr) { 806 node.MarkAsFailing() 807 node = nil 808 continue 809 } 810 811 var moved bool 812 var addr string 813 moved, ask, addr = isMovedError(lastErr) 814 if moved || ask { 815 var err error 816 node, err = c.nodes.Get(addr) 817 if err != nil { 818 return err 819 } 820 continue 821 } 822 823 if shouldRetry(lastErr, cmd.readTimeout() == nil) { 824 // First retry the same node. 825 if attempt == 0 { 826 continue 827 } 828 829 // Second try another node. 830 node.MarkAsFailing() 831 node = nil 832 continue 833 } 834 835 return lastErr 836 } 837 return lastErr 838} 839 840// ForEachMaster concurrently calls the fn on each master node in the cluster. 841// It returns the first error if any. 842func (c *ClusterClient) ForEachMaster( 843 ctx context.Context, 844 fn func(ctx context.Context, client *Client) error, 845) error { 846 state, err := c.state.ReloadOrGet(ctx) 847 if err != nil { 848 return err 849 } 850 851 var wg sync.WaitGroup 852 errCh := make(chan error, 1) 853 854 for _, master := range state.Masters { 855 wg.Add(1) 856 go func(node *clusterNode) { 857 defer wg.Done() 858 err := fn(ctx, node.Client) 859 if err != nil { 860 select { 861 case errCh <- err: 862 default: 863 } 864 } 865 }(master) 866 } 867 868 wg.Wait() 869 870 select { 871 case err := <-errCh: 872 return err 873 default: 874 return nil 875 } 876} 877 878// ForEachSlave concurrently calls the fn on each slave node in the cluster. 879// It returns the first error if any. 880func (c *ClusterClient) ForEachSlave( 881 ctx context.Context, 882 fn func(ctx context.Context, client *Client) error, 883) error { 884 state, err := c.state.ReloadOrGet(ctx) 885 if err != nil { 886 return err 887 } 888 889 var wg sync.WaitGroup 890 errCh := make(chan error, 1) 891 892 for _, slave := range state.Slaves { 893 wg.Add(1) 894 go func(node *clusterNode) { 895 defer wg.Done() 896 err := fn(ctx, node.Client) 897 if err != nil { 898 select { 899 case errCh <- err: 900 default: 901 } 902 } 903 }(slave) 904 } 905 906 wg.Wait() 907 908 select { 909 case err := <-errCh: 910 return err 911 default: 912 return nil 913 } 914} 915 916// ForEachShard concurrently calls the fn on each known node in the cluster. 917// It returns the first error if any. 918func (c *ClusterClient) ForEachShard( 919 ctx context.Context, 920 fn func(ctx context.Context, client *Client) error, 921) error { 922 state, err := c.state.ReloadOrGet(ctx) 923 if err != nil { 924 return err 925 } 926 927 var wg sync.WaitGroup 928 errCh := make(chan error, 1) 929 930 worker := func(node *clusterNode) { 931 defer wg.Done() 932 err := fn(ctx, node.Client) 933 if err != nil { 934 select { 935 case errCh <- err: 936 default: 937 } 938 } 939 } 940 941 for _, node := range state.Masters { 942 wg.Add(1) 943 go worker(node) 944 } 945 for _, node := range state.Slaves { 946 wg.Add(1) 947 go worker(node) 948 } 949 950 wg.Wait() 951 952 select { 953 case err := <-errCh: 954 return err 955 default: 956 return nil 957 } 958} 959 960// PoolStats returns accumulated connection pool stats. 961func (c *ClusterClient) PoolStats() *PoolStats { 962 var acc PoolStats 963 964 state, _ := c.state.Get(context.TODO()) 965 if state == nil { 966 return &acc 967 } 968 969 for _, node := range state.Masters { 970 s := node.Client.connPool.Stats() 971 acc.Hits += s.Hits 972 acc.Misses += s.Misses 973 acc.Timeouts += s.Timeouts 974 975 acc.TotalConns += s.TotalConns 976 acc.IdleConns += s.IdleConns 977 acc.StaleConns += s.StaleConns 978 } 979 980 for _, node := range state.Slaves { 981 s := node.Client.connPool.Stats() 982 acc.Hits += s.Hits 983 acc.Misses += s.Misses 984 acc.Timeouts += s.Timeouts 985 986 acc.TotalConns += s.TotalConns 987 acc.IdleConns += s.IdleConns 988 acc.StaleConns += s.StaleConns 989 } 990 991 return &acc 992} 993 994func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) { 995 if c.opt.ClusterSlots != nil { 996 slots, err := c.opt.ClusterSlots(ctx) 997 if err != nil { 998 return nil, err 999 } 1000 return newClusterState(c.nodes, slots, "") 1001 } 1002 1003 addrs, err := c.nodes.Addrs() 1004 if err != nil { 1005 return nil, err 1006 } 1007 1008 var firstErr error 1009 1010 for _, idx := range rand.Perm(len(addrs)) { 1011 addr := addrs[idx] 1012 1013 node, err := c.nodes.Get(addr) 1014 if err != nil { 1015 if firstErr == nil { 1016 firstErr = err 1017 } 1018 continue 1019 } 1020 1021 slots, err := node.Client.ClusterSlots(ctx).Result() 1022 if err != nil { 1023 if firstErr == nil { 1024 firstErr = err 1025 } 1026 continue 1027 } 1028 1029 return newClusterState(c.nodes, slots, node.Client.opt.Addr) 1030 } 1031 1032 /* 1033 * No node is connectable. It's possible that all nodes' IP has changed. 1034 * Clear activeAddrs to let client be able to re-connect using the initial 1035 * setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]), 1036 * which might have chance to resolve domain name and get updated IP address. 1037 */ 1038 c.nodes.mu.Lock() 1039 c.nodes.activeAddrs = nil 1040 c.nodes.mu.Unlock() 1041 1042 return nil, firstErr 1043} 1044 1045// reaper closes idle connections to the cluster. 1046func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { 1047 ticker := time.NewTicker(idleCheckFrequency) 1048 defer ticker.Stop() 1049 1050 for range ticker.C { 1051 nodes, err := c.nodes.All() 1052 if err != nil { 1053 break 1054 } 1055 1056 for _, node := range nodes { 1057 _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() 1058 if err != nil { 1059 internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err) 1060 } 1061 } 1062 } 1063} 1064 1065func (c *ClusterClient) Pipeline() Pipeliner { 1066 pipe := Pipeline{ 1067 ctx: c.ctx, 1068 exec: c.processPipeline, 1069 } 1070 pipe.init() 1071 return &pipe 1072} 1073 1074func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { 1075 return c.Pipeline().Pipelined(ctx, fn) 1076} 1077 1078func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error { 1079 return c.hooks.processPipeline(ctx, cmds, c._processPipeline) 1080} 1081 1082func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error { 1083 cmdsMap := newCmdsMap() 1084 err := c.mapCmdsByNode(ctx, cmdsMap, cmds) 1085 if err != nil { 1086 setCmdsErr(cmds, err) 1087 return err 1088 } 1089 1090 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { 1091 if attempt > 0 { 1092 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { 1093 setCmdsErr(cmds, err) 1094 return err 1095 } 1096 } 1097 1098 failedCmds := newCmdsMap() 1099 var wg sync.WaitGroup 1100 1101 for node, cmds := range cmdsMap.m { 1102 wg.Add(1) 1103 go func(node *clusterNode, cmds []Cmder) { 1104 defer wg.Done() 1105 1106 err := c._processPipelineNode(ctx, node, cmds, failedCmds) 1107 if err == nil { 1108 return 1109 } 1110 if attempt < c.opt.MaxRedirects { 1111 if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil { 1112 setCmdsErr(cmds, err) 1113 } 1114 } else { 1115 setCmdsErr(cmds, err) 1116 } 1117 }(node, cmds) 1118 } 1119 1120 wg.Wait() 1121 if len(failedCmds.m) == 0 { 1122 break 1123 } 1124 cmdsMap = failedCmds 1125 } 1126 1127 return cmdsFirstErr(cmds) 1128} 1129 1130func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error { 1131 state, err := c.state.Get(ctx) 1132 if err != nil { 1133 return err 1134 } 1135 1136 if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) { 1137 for _, cmd := range cmds { 1138 slot := c.cmdSlot(cmd) 1139 node, err := c.slotReadOnlyNode(state, slot) 1140 if err != nil { 1141 return err 1142 } 1143 cmdsMap.Add(node, cmd) 1144 } 1145 return nil 1146 } 1147 1148 for _, cmd := range cmds { 1149 slot := c.cmdSlot(cmd) 1150 node, err := state.slotMasterNode(slot) 1151 if err != nil { 1152 return err 1153 } 1154 cmdsMap.Add(node, cmd) 1155 } 1156 return nil 1157} 1158 1159func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { 1160 for _, cmd := range cmds { 1161 cmdInfo := c.cmdInfo(cmd.Name()) 1162 if cmdInfo == nil || !cmdInfo.ReadOnly { 1163 return false 1164 } 1165 } 1166 return true 1167} 1168 1169func (c *ClusterClient) _processPipelineNode( 1170 ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap, 1171) error { 1172 return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { 1173 return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { 1174 err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { 1175 return writeCmds(wr, cmds) 1176 }) 1177 if err != nil { 1178 return err 1179 } 1180 1181 return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { 1182 return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds) 1183 }) 1184 }) 1185 }) 1186} 1187 1188func (c *ClusterClient) pipelineReadCmds( 1189 ctx context.Context, 1190 node *clusterNode, 1191 rd *proto.Reader, 1192 cmds []Cmder, 1193 failedCmds *cmdsMap, 1194) error { 1195 for _, cmd := range cmds { 1196 err := cmd.readReply(rd) 1197 cmd.SetErr(err) 1198 1199 if err == nil { 1200 continue 1201 } 1202 1203 if c.checkMovedErr(ctx, cmd, err, failedCmds) { 1204 continue 1205 } 1206 1207 if c.opt.ReadOnly && isLoadingError(err) { 1208 node.MarkAsFailing() 1209 return err 1210 } 1211 if isRedisError(err) { 1212 continue 1213 } 1214 return err 1215 } 1216 return nil 1217} 1218 1219func (c *ClusterClient) checkMovedErr( 1220 ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap, 1221) bool { 1222 moved, ask, addr := isMovedError(err) 1223 if !moved && !ask { 1224 return false 1225 } 1226 1227 node, err := c.nodes.Get(addr) 1228 if err != nil { 1229 return false 1230 } 1231 1232 if moved { 1233 c.state.LazyReload() 1234 failedCmds.Add(node, cmd) 1235 return true 1236 } 1237 1238 if ask { 1239 failedCmds.Add(node, NewCmd(ctx, "asking"), cmd) 1240 return true 1241 } 1242 1243 panic("not reached") 1244} 1245 1246// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. 1247func (c *ClusterClient) TxPipeline() Pipeliner { 1248 pipe := Pipeline{ 1249 ctx: c.ctx, 1250 exec: c.processTxPipeline, 1251 } 1252 pipe.init() 1253 return &pipe 1254} 1255 1256func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { 1257 return c.TxPipeline().Pipelined(ctx, fn) 1258} 1259 1260func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error { 1261 return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline) 1262} 1263 1264func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error { 1265 // Trim multi .. exec. 1266 cmds = cmds[1 : len(cmds)-1] 1267 1268 state, err := c.state.Get(ctx) 1269 if err != nil { 1270 setCmdsErr(cmds, err) 1271 return err 1272 } 1273 1274 cmdsMap := c.mapCmdsBySlot(cmds) 1275 for slot, cmds := range cmdsMap { 1276 node, err := state.slotMasterNode(slot) 1277 if err != nil { 1278 setCmdsErr(cmds, err) 1279 continue 1280 } 1281 1282 cmdsMap := map[*clusterNode][]Cmder{node: cmds} 1283 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { 1284 if attempt > 0 { 1285 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { 1286 setCmdsErr(cmds, err) 1287 return err 1288 } 1289 } 1290 1291 failedCmds := newCmdsMap() 1292 var wg sync.WaitGroup 1293 1294 for node, cmds := range cmdsMap { 1295 wg.Add(1) 1296 go func(node *clusterNode, cmds []Cmder) { 1297 defer wg.Done() 1298 1299 err := c._processTxPipelineNode(ctx, node, cmds, failedCmds) 1300 if err == nil { 1301 return 1302 } 1303 1304 if attempt < c.opt.MaxRedirects { 1305 if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil { 1306 setCmdsErr(cmds, err) 1307 } 1308 } else { 1309 setCmdsErr(cmds, err) 1310 } 1311 }(node, cmds) 1312 } 1313 1314 wg.Wait() 1315 if len(failedCmds.m) == 0 { 1316 break 1317 } 1318 cmdsMap = failedCmds.m 1319 } 1320 } 1321 1322 return cmdsFirstErr(cmds) 1323} 1324 1325func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { 1326 cmdsMap := make(map[int][]Cmder) 1327 for _, cmd := range cmds { 1328 slot := c.cmdSlot(cmd) 1329 cmdsMap[slot] = append(cmdsMap[slot], cmd) 1330 } 1331 return cmdsMap 1332} 1333 1334func (c *ClusterClient) _processTxPipelineNode( 1335 ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap, 1336) error { 1337 return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { 1338 return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { 1339 err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { 1340 return writeCmds(wr, cmds) 1341 }) 1342 if err != nil { 1343 return err 1344 } 1345 1346 return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { 1347 statusCmd := cmds[0].(*StatusCmd) 1348 // Trim multi and exec. 1349 cmds = cmds[1 : len(cmds)-1] 1350 1351 err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds) 1352 if err != nil { 1353 moved, ask, addr := isMovedError(err) 1354 if moved || ask { 1355 return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds) 1356 } 1357 return err 1358 } 1359 1360 return pipelineReadCmds(rd, cmds) 1361 }) 1362 }) 1363 }) 1364} 1365 1366func (c *ClusterClient) txPipelineReadQueued( 1367 ctx context.Context, 1368 rd *proto.Reader, 1369 statusCmd *StatusCmd, 1370 cmds []Cmder, 1371 failedCmds *cmdsMap, 1372) error { 1373 // Parse queued replies. 1374 if err := statusCmd.readReply(rd); err != nil { 1375 return err 1376 } 1377 1378 for _, cmd := range cmds { 1379 err := statusCmd.readReply(rd) 1380 if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) { 1381 continue 1382 } 1383 return err 1384 } 1385 1386 // Parse number of replies. 1387 line, err := rd.ReadLine() 1388 if err != nil { 1389 if err == Nil { 1390 err = TxFailedErr 1391 } 1392 return err 1393 } 1394 1395 switch line[0] { 1396 case proto.ErrorReply: 1397 return proto.ParseErrorReply(line) 1398 case proto.ArrayReply: 1399 // ok 1400 default: 1401 return fmt.Errorf("redis: expected '*', but got line %q", line) 1402 } 1403 1404 return nil 1405} 1406 1407func (c *ClusterClient) cmdsMoved( 1408 ctx context.Context, cmds []Cmder, 1409 moved, ask bool, 1410 addr string, 1411 failedCmds *cmdsMap, 1412) error { 1413 node, err := c.nodes.Get(addr) 1414 if err != nil { 1415 return err 1416 } 1417 1418 if moved { 1419 c.state.LazyReload() 1420 for _, cmd := range cmds { 1421 failedCmds.Add(node, cmd) 1422 } 1423 return nil 1424 } 1425 1426 if ask { 1427 for _, cmd := range cmds { 1428 failedCmds.Add(node, NewCmd(ctx, "asking"), cmd) 1429 } 1430 return nil 1431 } 1432 1433 return nil 1434} 1435 1436func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { 1437 if len(keys) == 0 { 1438 return fmt.Errorf("redis: Watch requires at least one key") 1439 } 1440 1441 slot := hashtag.Slot(keys[0]) 1442 for _, key := range keys[1:] { 1443 if hashtag.Slot(key) != slot { 1444 err := fmt.Errorf("redis: Watch requires all keys to be in the same slot") 1445 return err 1446 } 1447 } 1448 1449 node, err := c.slotMasterNode(ctx, slot) 1450 if err != nil { 1451 return err 1452 } 1453 1454 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { 1455 if attempt > 0 { 1456 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { 1457 return err 1458 } 1459 } 1460 1461 err = node.Client.Watch(ctx, fn, keys...) 1462 if err == nil { 1463 break 1464 } 1465 1466 moved, ask, addr := isMovedError(err) 1467 if moved || ask { 1468 node, err = c.nodes.Get(addr) 1469 if err != nil { 1470 return err 1471 } 1472 continue 1473 } 1474 1475 if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed { 1476 if isReadOnly { 1477 c.state.LazyReload() 1478 } 1479 node, err = c.slotMasterNode(ctx, slot) 1480 if err != nil { 1481 return err 1482 } 1483 continue 1484 } 1485 1486 if shouldRetry(err, true) { 1487 continue 1488 } 1489 1490 return err 1491 } 1492 1493 return err 1494} 1495 1496func (c *ClusterClient) pubSub() *PubSub { 1497 var node *clusterNode 1498 pubsub := &PubSub{ 1499 opt: c.opt.clientOptions(), 1500 1501 newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) { 1502 if node != nil { 1503 panic("node != nil") 1504 } 1505 1506 var err error 1507 if len(channels) > 0 { 1508 slot := hashtag.Slot(channels[0]) 1509 node, err = c.slotMasterNode(ctx, slot) 1510 } else { 1511 node, err = c.nodes.Random() 1512 } 1513 if err != nil { 1514 return nil, err 1515 } 1516 1517 cn, err := node.Client.newConn(context.TODO()) 1518 if err != nil { 1519 node = nil 1520 1521 return nil, err 1522 } 1523 1524 return cn, nil 1525 }, 1526 closeConn: func(cn *pool.Conn) error { 1527 err := node.Client.connPool.CloseConn(cn) 1528 node = nil 1529 return err 1530 }, 1531 } 1532 pubsub.init() 1533 1534 return pubsub 1535} 1536 1537// Subscribe subscribes the client to the specified channels. 1538// Channels can be omitted to create empty subscription. 1539func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub { 1540 pubsub := c.pubSub() 1541 if len(channels) > 0 { 1542 _ = pubsub.Subscribe(ctx, channels...) 1543 } 1544 return pubsub 1545} 1546 1547// PSubscribe subscribes the client to the given patterns. 1548// Patterns can be omitted to create empty subscription. 1549func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub { 1550 pubsub := c.pubSub() 1551 if len(channels) > 0 { 1552 _ = pubsub.PSubscribe(ctx, channels...) 1553 } 1554 return pubsub 1555} 1556 1557func (c *ClusterClient) retryBackoff(attempt int) time.Duration { 1558 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) 1559} 1560 1561func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) { 1562 // Try 3 random nodes. 1563 const nodeLimit = 3 1564 1565 addrs, err := c.nodes.Addrs() 1566 if err != nil { 1567 return nil, err 1568 } 1569 1570 var firstErr error 1571 1572 perm := rand.Perm(len(addrs)) 1573 if len(perm) > nodeLimit { 1574 perm = perm[:nodeLimit] 1575 } 1576 1577 for _, idx := range perm { 1578 addr := addrs[idx] 1579 1580 node, err := c.nodes.Get(addr) 1581 if err != nil { 1582 if firstErr == nil { 1583 firstErr = err 1584 } 1585 continue 1586 } 1587 1588 info, err := node.Client.Command(ctx).Result() 1589 if err == nil { 1590 return info, nil 1591 } 1592 if firstErr == nil { 1593 firstErr = err 1594 } 1595 } 1596 1597 if firstErr == nil { 1598 panic("not reached") 1599 } 1600 return nil, firstErr 1601} 1602 1603func (c *ClusterClient) cmdInfo(name string) *CommandInfo { 1604 cmdsInfo, err := c.cmdsInfoCache.Get(c.ctx) 1605 if err != nil { 1606 return nil 1607 } 1608 1609 info := cmdsInfo[name] 1610 if info == nil { 1611 internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name) 1612 } 1613 return info 1614} 1615 1616func (c *ClusterClient) cmdSlot(cmd Cmder) int { 1617 args := cmd.Args() 1618 if args[0] == "cluster" && args[1] == "getkeysinslot" { 1619 return args[2].(int) 1620 } 1621 1622 cmdInfo := c.cmdInfo(cmd.Name()) 1623 return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) 1624} 1625 1626func cmdSlot(cmd Cmder, pos int) int { 1627 if pos == 0 { 1628 return hashtag.RandomSlot() 1629 } 1630 firstKey := cmd.stringArg(pos) 1631 return hashtag.Slot(firstKey) 1632} 1633 1634func (c *ClusterClient) cmdNode( 1635 ctx context.Context, 1636 cmdInfo *CommandInfo, 1637 slot int, 1638) (*clusterNode, error) { 1639 state, err := c.state.Get(ctx) 1640 if err != nil { 1641 return nil, err 1642 } 1643 1644 if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { 1645 return c.slotReadOnlyNode(state, slot) 1646 } 1647 return state.slotMasterNode(slot) 1648} 1649 1650func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) { 1651 if c.opt.RouteByLatency { 1652 return state.slotClosestNode(slot) 1653 } 1654 if c.opt.RouteRandomly { 1655 return state.slotRandomNode(slot) 1656 } 1657 return state.slotSlaveNode(slot) 1658} 1659 1660func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) { 1661 state, err := c.state.Get(ctx) 1662 if err != nil { 1663 return nil, err 1664 } 1665 return state.slotMasterNode(slot) 1666} 1667 1668// SlaveForKey gets a client for a replica node to run any command on it. 1669// This is especially useful if we want to run a particular lua script which has 1670// only read only commands on the replica. 1671// This is because other redis commands generally have a flag that points that 1672// they are read only and automatically run on the replica nodes 1673// if ClusterOptions.ReadOnly flag is set to true. 1674func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) { 1675 state, err := c.state.Get(ctx) 1676 if err != nil { 1677 return nil, err 1678 } 1679 slot := hashtag.Slot(key) 1680 node, err := c.slotReadOnlyNode(state, slot) 1681 if err != nil { 1682 return nil, err 1683 } 1684 return node.Client, err 1685} 1686 1687// MasterForKey return a client to the master node for a particular key. 1688func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) { 1689 slot := hashtag.Slot(key) 1690 node, err := c.slotMasterNode(ctx, slot) 1691 if err != nil { 1692 return nil, err 1693 } 1694 return node.Client, err 1695} 1696 1697func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { 1698 for _, n := range nodes { 1699 if n == node { 1700 return nodes 1701 } 1702 } 1703 return append(nodes, node) 1704} 1705 1706func appendIfNotExists(ss []string, es ...string) []string { 1707loop: 1708 for _, e := range es { 1709 for _, s := range ss { 1710 if s == e { 1711 continue loop 1712 } 1713 } 1714 ss = append(ss, e) 1715 } 1716 return ss 1717} 1718 1719//------------------------------------------------------------------------------ 1720 1721type cmdsMap struct { 1722 mu sync.Mutex 1723 m map[*clusterNode][]Cmder 1724} 1725 1726func newCmdsMap() *cmdsMap { 1727 return &cmdsMap{ 1728 m: make(map[*clusterNode][]Cmder), 1729 } 1730} 1731 1732func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) { 1733 m.mu.Lock() 1734 m.m[node] = append(m.m[node], cmds...) 1735 m.mu.Unlock() 1736} 1737