package redis

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/v8/internal"
	"github.com/go-redis/redis/v8/internal/hashtag"
	"github.com/go-redis/redis/v8/internal/pool"
	"github.com/go-redis/redis/v8/internal/proto"
	"github.com/go-redis/redis/v8/internal/rand"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// NewClient creates a client for a cluster node with the provided options.
	NewClient func(opt *Options) *Client

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 3 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between the master and slaves.
	// It can use a service like ZooKeeper to maintain configuration information
	// and ClusterClient.ReloadState to manually trigger state reloading.
	// See the sketch that follows this struct for an example.
	ClusterSlots func(context.Context) ([]ClusterSlot, error)

	// Following options are copied from Options struct.

	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)

	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}
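// The following is a minimal sketch of using ClusterSlots to describe a
// manually managed set of standalone Redis servers, assuming the package is
// imported as redis. The addresses and the slot split are illustrative
// placeholders, not a recommended topology:
//
//	rdb := redis.NewClusterClient(&redis.ClusterOptions{
//		RouteRandomly: true,
//		ClusterSlots: func(ctx context.Context) ([]redis.ClusterSlot, error) {
//			slots := []redis.ClusterSlot{
//				// First shard: one master and one slave.
//				{
//					Start: 0,
//					End:   8191,
//					Nodes: []redis.ClusterNode{
//						{Addr: ":7000"}, // master
//						{Addr: ":8000"}, // slave
//					},
//				},
//				// Second shard: one master and one slave.
//				{
//					Start: 8192,
//					End:   16383,
//					Nodes: []redis.ClusterNode{
//						{Addr: ":7001"}, // master
//						{Addr: ":8001"}, // slave
//					},
//				},
//			}
//			return slots, nil
//		},
//	})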

func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 3
	}

	if opt.RouteByLatency || opt.RouteRandomly {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.NumCPU()
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	if opt.MaxRetries == 0 {
		opt.MaxRetries = -1
	}
	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}

	if opt.NewClient == nil {
		opt.NewClient = NewClient
	}
}

func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		Dialer:    opt.Dialer,
		OnConnect: opt.OnConnect,

		Username: opt.Username,
		Password: opt.Password,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
		// If ClusterSlots is populated, then we probably have an artificial
		// cluster whose nodes are not in clustering mode (otherwise there isn't
		// much use for ClusterSlots config).  This means we cannot execute the
		// READONLY command against that node -- setting readOnly to false in such
		// situations in the options below will prevent that from happening.
		readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	failing    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: clOpt.NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

func (n *clusterNode) updateLatency() {
	const numProbe = 10
	var dur uint64

	for i := 0; i < numProbe; i++ {
		time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)

		start := time.Now()
		n.Client.Ping(context.TODO())
		dur += uint64(time.Since(start) / time.Microsecond)
	}

	latency := float64(dur) / float64(numProbe)
	atomic.StoreUint32(&n.latency, uint32(latency+0.5))
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsFailing() {
	atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
}

func (n *clusterNode) Failing() bool {
	const timeout = 15 // 15 seconds

	failing := atomic.LoadUint32(&n.failing)
	if failing == 0 {
		return false
	}
	if time.Now().Unix()-int64(failing) < timeout {
		return true
	}
	atomic.StoreUint32(&n.failing, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu          sync.RWMutex
	addrs       []string
	nodes       map[string]*clusterNode
	activeAddrs []string
	closed      bool

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		addrs: opt.Addrs,
		nodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.nodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.nodes = nil
	c.activeAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string
	c.mu.RLock()
	closed := c.closed
	if !closed {
		if len(c.activeAddrs) > 0 {
			addrs = c.activeAddrs
		} else {
			addrs = c.addrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	//nolint:prealloc
	var collected []*clusterNode

	c.mu.Lock()

	c.activeAddrs = c.activeAddrs[:0]
	for addr, node := range c.nodes {
		if node.Generation() >= generation {
			c.activeAddrs = append(c.activeAddrs, addr)
			if c.opt.RouteByLatency {
				go node.updateLatency()
			}
			continue
		}

		delete(c.nodes, addr)
		collected = append(collected, node)
	}

	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	node, err := c.get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.nodes[addr]
	if ok {
		return node, nil
	}

	node = newClusterNode(c.opt, addr)

	c.addrs = appendIfNotExists(c.addrs, addr)
	c.nodes[addr] = node

	return node, nil
}

func (c *clusterNodes) get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.nodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.nodes))
	for _, node := range c.nodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.Get(addrs[n])
}

//------------------------------------------------------------------------------

type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.Get(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use the origin host, which is not a loopback address, together with the node's port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Failing() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Failing() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Failing() {
			continue
		}
		if node == nil || n.Latency() < node.Latency() {
			node = n
		}
	}
	if node != nil {
		return node, nil
	}

	// If all nodes are failing - return random node
	return c.nodes.Random()
}

func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}
	n := rand.Intn(len(nodes))
	return nodes[n], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}

//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func(ctx context.Context) (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
	state, err := c.load(ctx)
	if err != nil {
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload(context.Background())
		if err != nil {
			return
		}
		time.Sleep(200 * time.Millisecond)
	}()
}

func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
	v := c.state.Load()
	if v != nil {
		state := v.(*clusterState)
		if time.Since(state.createdAt) > 10*time.Second {
			c.LazyReload()
		}
		return state, nil
	}
	return c.Reload(ctx)
}

func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
	state, err := c.Reload(ctx)
	if err == nil {
		return state, nil
	}
	return c.Get(ctx)
}

//------------------------------------------------------------------------------

type clusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder //nolint:structcheck
	cmdsInfoCache *cmdsInfoCache      //nolint:structcheck
}

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	*clusterClient
	cmdable
	hooks
	ctx context.Context
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
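//
// A minimal usage sketch, assuming the package is imported as redis and the
// addresses below are placeholders for real cluster nodes:
//
//	rdb := redis.NewClusterClient(&redis.ClusterOptions{
//		Addrs: []string{":7000", ":7001", ":7002"},
//	})
//	defer rdb.Close()
//
//	ctx := context.Background()
//	if err := rdb.Set(ctx, "key", "value", 0).Err(); err != nil {
//		// handle the error
//	}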
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		clusterClient: &clusterClient{
			opt:   opt,
			nodes: newClusterNodes(opt),
		},
		ctx: context.Background(),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}

func (c *ClusterClient) Context() context.Context {
	return c.ctx
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	clone := *c
	clone.cmdable = clone.Process
	clone.hooks.lock()
	clone.ctx = ctx
	return &clone
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

// ReloadState reloads the cluster state. If the ClusterSlots option is set,
// it is called to get the cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) {
	c.state.LazyReload()
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
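//
// A hypothetical call, given a client rdb and a context ctx (the key name is
// illustrative); note that a missing key surfaces as the redis.Nil error:
//
//	val, err := rdb.Do(ctx, "get", "some-key").Result()
//	if err == redis.Nil {
//		// key does not exist
//	}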
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	_ = c.Process(ctx, cmd)
	return cmd
}

func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
	return c.hooks.process(ctx, cmd, c.process)
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	var node *clusterNode
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmdInfo, slot)
			if err != nil {
				return err
			}
		}

		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
			_ = pipe.Close()
			ask = false
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node = nil
			continue
		}

		// If slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			var err error
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First retry the same node.
			if attempt == 0 {
				continue
			}

			// Second try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
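//
// A minimal sketch (FlushDB is only an illustrative per-master operation):
//
//	err := rdb.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
//		return master.FlushDB(ctx).Err()
//	})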
func (c *ClusterClient) ForEachMaster(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachShard concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachShard(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(ctx, node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get(context.TODO())
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots(ctx)
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	for _, idx := range rand.Perm(len(addrs)) {
		addr := addrs[idx]

		node, err := c.nodes.Get(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots(ctx).Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}

	/*
	 * No node is reachable. It is possible that all node IP addresses have changed.
	 * Clear activeAddrs so the client can reconnect using the initial address list
	 * (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]), which may resolve the
	 * domain names again and pick up the updated IP addresses.
	 */
	c.nodes.mu.Lock()
	c.nodes.activeAddrs = nil
	c.nodes.mu.Unlock()

	return nil, firstErr
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
			}
		}
	}
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}

func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
}

func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(ctx, cmdsMap, cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				setCmdsErr(cmds, err)
				return err
			}
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()

				err := c._processPipelineNode(ctx, node, cmds, failedCmds)
				if err == nil {
					return
				}
				if attempt < c.opt.MaxRedirects {
					if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
						setCmdsErr(cmds, err)
					}
				} else {
					setCmdsErr(cmds, err)
				}
			}(node, cmds)
		}

		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
	state, err := c.state.Get(ctx)
	if err != nil {
		return err
	}

	if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
		for _, cmd := range cmds {
			slot := c.cmdSlot(cmd)
			node, err := c.slotReadOnlyNode(state, slot)
			if err != nil {
				return err
			}
			cmdsMap.Add(node, cmd)
		}
		return nil
	}

	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		node, err := state.slotMasterNode(slot)
		if err != nil {
			return err
		}
		cmdsMap.Add(node, cmd)
	}
	return nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) _processPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
			})
		})
	})
}

func (c *ClusterClient) pipelineReadCmds(
	ctx context.Context,
	node *clusterNode,
	rd *proto.Reader,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		cmd.SetErr(err)

		if err == nil {
			continue
		}

		if c.checkMovedErr(ctx, cmd, err, failedCmds) {
			continue
		}

		if c.opt.ReadOnly && isLoadingError(err) {
			node.MarkAsFailing()
			return err
		}
		if isRedisError(err) {
			continue
		}
		return err
	}
	return nil
}

func (c *ClusterClient) checkMovedErr(
	ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
	moved, ask, addr := isMovedError(err)
	if !moved && !ask {
		return false
	}

	node, err := c.nodes.Get(addr)
	if err != nil {
		return false
	}

	if moved {
		c.state.LazyReload()
		failedCmds.Add(node, cmd)
		return true
	}

	if ask {
		failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		return true
	}

	panic("not reached")
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
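//
// A minimal sketch using the TxPipelined convenience wrapper defined below
// (key name is illustrative). Commands are grouped by hash slot, and each
// group is sent as its own MULTI/EXEC to the master owning that slot:
//
//	cmds, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
//		pipe.Incr(ctx, "counter")
//		pipe.Expire(ctx, "counter", time.Hour)
//		return nil
//	})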
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}

func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
}

func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
	// Trim multi .. exec.
	cmds = cmds[1 : len(cmds)-1]

	state, err := c.state.Get(ctx)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
					setCmdsErr(cmds, err)
					return err
				}
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					err := c._processTxPipelineNode(ctx, node, cmds, failedCmds)
					if err == nil {
						return
					}

					if attempt < c.opt.MaxRedirects {
						if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
							setCmdsErr(cmds, err)
						}
					} else {
						setCmdsErr(cmds, err)
					}
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) _processTxPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				statusCmd := cmds[0].(*StatusCmd)
				// Trim multi and exec.
				cmds = cmds[1 : len(cmds)-1]

				err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds)
				if err != nil {
					moved, ask, addr := isMovedError(err)
					if moved || ask {
						return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds)
					}
					return err
				}

				return pipelineReadCmds(rd, cmds)
			})
		})
	})
}

func (c *ClusterClient) txPipelineReadQueued(
	ctx context.Context,
	rd *proto.Reader,
	statusCmd *StatusCmd,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	// Parse queued replies.
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
			continue
		}
		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}

	return nil
}

func (c *ClusterClient) cmdsMoved(
	ctx context.Context, cmds []Cmder,
	moved, ask bool,
	addr string,
	failedCmds *cmdsMap,
) error {
	node, err := c.nodes.Get(addr)
	if err != nil {
		return err
	}

	if moved {
		c.state.LazyReload()
		for _, cmd := range cmds {
			failedCmds.Add(node, cmd)
		}
		return nil
	}

	if ask {
		for _, cmd := range cmds {
			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		}
		return nil
	}

	return nil
}

func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		err = node.Client.Watch(ctx, fn, keys...)
		if err == nil {
			break
		}

		moved, ask, addr := isMovedError(err)
		if moved || ask {
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node, err = c.slotMasterNode(ctx, slot)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(err, true) {
			continue
		}

		return err
	}

	return err
}

func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
			if node != nil {
				panic("node != nil")
			}

			var err error
			if len(channels) > 0 {
				slot := hashtag.Slot(channels[0])
				node, err = c.slotMasterNode(ctx, slot)
			} else {
				node, err = c.nodes.Random()
			}
			if err != nil {
				return nil, err
			}

			cn, err := node.Client.newConn(context.TODO())
			if err != nil {
				node = nil

				return nil, err
			}

			return cn, nil
		},
		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
			node = nil
			return err
		},
	}
	pubsub.init()

	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
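//
// A minimal sketch, with a placeholder channel name:
//
//	sub := rdb.Subscribe(ctx, "mychannel")
//	defer sub.Close()
//
//	msg, err := sub.ReceiveMessage(ctx)
//	if err == nil {
//		fmt.Println(msg.Channel, msg.Payload)
//	}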
func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(ctx, channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(ctx, channels...)
	}
	return pubsub
}

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
	// Try 3 random nodes.
	const nodeLimit = 3

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	perm := rand.Perm(len(addrs))
	if len(perm) > nodeLimit {
		perm = perm[:nodeLimit]
	}

	for _, idx := range perm {
		addr := addrs[idx]

		node, err := c.nodes.Get(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		info, err := node.Client.Command(ctx).Result()
		if err == nil {
			return info, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}

	if firstErr == nil {
		panic("not reached")
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get(c.ctx)
	if err != nil {
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
	}
	return info
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	args := cmd.Args()
	if args[0] == "cluster" && args[1] == "getkeysinslot" {
		return args[2].(int)
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}

func (c *ClusterClient) cmdNode(
	ctx context.Context,
	cmdInfo *CommandInfo,
	slot int,
) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}

	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
		return c.slotReadOnlyNode(state, slot)
	}
	return state.slotMasterNode(slot)
}

func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
	if c.opt.RouteByLatency {
		return state.slotClosestNode(slot)
	}
	if c.opt.RouteRandomly {
		return state.slotRandomNode(slot)
	}
	return state.slotSlaveNode(slot)
}

func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	return state.slotMasterNode(slot)
}

// SlaveForKey returns a client for a replica node that serves the given key,
// so that any command can be run on it. This is especially useful for running
// a Lua script that contains only read-only commands on a replica. Regular
// read-only Redis commands carry a flag marking them as such and are routed
// to replica nodes automatically when ClusterOptions.ReadOnly is set to true,
// but that automatic routing does not apply to arbitrary scripts.
func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	slot := hashtag.Slot(key)
	node, err := c.slotReadOnlyNode(state, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}

// MasterForKey returns a client to the master node for a particular key.
func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
	slot := hashtag.Slot(key)
	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

//------------------------------------------------------------------------------

type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
	return &cmdsMap{
		m: make(map[*clusterNode][]Cmder),
	}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
	m.mu.Lock()
	m.m[node] = append(m.m[node], cmds...)
	m.mu.Unlock()
}