package redis

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/v8/internal"
	"github.com/go-redis/redis/v8/internal/hashtag"
	"github.com/go-redis/redis/v8/internal/pool"
	"github.com/go-redis/redis/v8/internal/proto"
	"github.com/go-redis/redis/v8/internal/rand"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// NewClient creates a cluster node client with the provided options.
	NewClient func(opt *Options) *Client

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 3 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between masters and slaves.
	// It can use a service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	// See the sketch after this struct for an example.
	ClusterSlots func(context.Context) ([]ClusterSlot, error)

	// Following options are copied from Options struct.

	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)

	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}
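
// The ClusterSlots option above is intended for running a "manual" cluster on
// top of standalone Redis servers. A minimal sketch, not part of the library
// API; the addresses and slot ranges below are placeholder assumptions:
//
//	clusterSlots := func(ctx context.Context) ([]redis.ClusterSlot, error) {
//		slots := []redis.ClusterSlot{
//			// First shard: one master and one slave covering slots 0-8191.
//			{
//				Start: 0,
//				End:   8191,
//				Nodes: []redis.ClusterNode{
//					{Addr: ":7000"}, // master
//					{Addr: ":8000"}, // slave
//				},
//			},
//			// Second shard covering slots 8192-16383.
//			{
//				Start: 8192,
//				End:   16383,
//				Nodes: []redis.ClusterNode{
//					{Addr: ":7001"}, // master
//					{Addr: ":8001"}, // slave
//				},
//			},
//		}
//		return slots, nil
//	}
//
//	rdb := redis.NewClusterClient(&redis.ClusterOptions{
//		ClusterSlots:  clusterSlots,
//		RouteRandomly: true,
//	})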

func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 3
	}

	if opt.RouteByLatency || opt.RouteRandomly {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.NumCPU()
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	if opt.MaxRetries == 0 {
		opt.MaxRetries = -1
	}
	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}

	if opt.NewClient == nil {
		opt.NewClient = NewClient
	}
}

func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		Dialer:    opt.Dialer,
		OnConnect: opt.OnConnect,

		Username: opt.Username,
		Password: opt.Password,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
		// If ClusterSlots is populated, then we probably have an artificial
		// cluster whose nodes are not in clustering mode (otherwise there isn't
		// much use for the ClusterSlots config). This means we cannot execute the
		// READONLY command against those nodes, so setting readOnly to false here
		// prevents that from happening.
		readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	failing    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: clOpt.NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

func (n *clusterNode) updateLatency() {
	const numProbe = 10
	var dur uint64

	for i := 0; i < numProbe; i++ {
		time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)

		start := time.Now()
		n.Client.Ping(context.TODO())
		dur += uint64(time.Since(start) / time.Microsecond)
	}

	latency := float64(dur) / float64(numProbe)
	atomic.StoreUint32(&n.latency, uint32(latency+0.5))
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsFailing() {
	atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
}

func (n *clusterNode) Failing() bool {
	const timeout = 15 // 15 seconds

	failing := atomic.LoadUint32(&n.failing)
	if failing == 0 {
		return false
	}
	if time.Now().Unix()-int64(failing) < timeout {
		return true
	}
	atomic.StoreUint32(&n.failing, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu          sync.RWMutex
	addrs       []string
	nodes       map[string]*clusterNode
	activeAddrs []string
	closed      bool

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		addrs: opt.Addrs,
		nodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.nodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.nodes = nil
	c.activeAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string

	c.mu.RLock()
	closed := c.closed //nolint:ifshort
	if !closed {
		if len(c.activeAddrs) > 0 {
			addrs = c.activeAddrs
		} else {
			addrs = c.addrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	//nolint:prealloc
	var collected []*clusterNode

	c.mu.Lock()

	c.activeAddrs = c.activeAddrs[:0]
	for addr, node := range c.nodes {
		if node.Generation() >= generation {
			c.activeAddrs = append(c.activeAddrs, addr)
			if c.opt.RouteByLatency {
				go node.updateLatency()
			}
			continue
		}

		delete(c.nodes, addr)
		collected = append(collected, node)
	}

	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	node, err := c.get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.nodes[addr]
	if ok {
		return node, nil
	}

	node = newClusterNode(c.opt, addr)

	c.addrs = appendIfNotExists(c.addrs, addr)
	c.nodes[addr] = node

	return node, nil
}

func (c *clusterNodes) get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.nodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.nodes))
	for _, node := range c.nodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.Get(addrs[n])
}

//------------------------------------------------------------------------------

type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.Get(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use the non-loopback origin host together with the node's port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Failing() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Failing() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Failing() {
			continue
		}
		if node == nil || n.Latency() < node.Latency() {
			node = n
		}
	}
	if node != nil {
		return node, nil
	}

	// If all nodes are failing, return a random node.
	return c.nodes.Random()
}

func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}
	n := rand.Intn(len(nodes))
	return nodes[n], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}

//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func(ctx context.Context) (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
	state, err := c.load(ctx)
	if err != nil {
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload(context.Background())
		if err != nil {
			return
		}
		time.Sleep(200 * time.Millisecond)
	}()
}

func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
	v := c.state.Load()
	if v == nil {
		return c.Reload(ctx)
	}

	state := v.(*clusterState)
	if time.Since(state.createdAt) > 10*time.Second {
		c.LazyReload()
	}
	return state, nil
}

func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
	state, err := c.Reload(ctx)
	if err == nil {
		return state, nil
	}
	return c.Get(ctx)
}

//------------------------------------------------------------------------------

type clusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder //nolint:structcheck
	cmdsInfoCache *cmdsInfoCache      //nolint:structcheck
}

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	*clusterClient
	cmdable
	hooks
	ctx context.Context
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		clusterClient: &clusterClient{
			opt:   opt,
			nodes: newClusterNodes(opt),
		},
		ctx: context.Background(),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}
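
// A minimal usage sketch for NewClusterClient; the seed addresses below are
// placeholders, and any reachable cluster node works as a seed:
//
//	ctx := context.Background()
//
//	rdb := redis.NewClusterClient(&redis.ClusterOptions{
//		Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
//	})
//
//	if err := rdb.Set(ctx, "key", "value", 0).Err(); err != nil {
//		// handle error
//	}
//	val, err := rdb.Get(ctx, "key").Result()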

func (c *ClusterClient) Context() context.Context {
	return c.ctx
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	clone := *c
	clone.cmdable = clone.Process
	clone.hooks.lock()
	clone.ctx = ctx
	return &clone
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

// ReloadState reloads the cluster state. If the ClusterSlots option is set,
// it is called to get the cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) {
	c.state.LazyReload()
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	_ = c.Process(ctx, cmd)
	return cmd
}
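
// Do is handy for commands that have no typed helper on the client. A small
// sketch, assuming rdb and ctx from the NewClusterClient example above; the
// command and key are arbitrary examples:
//
//	v, err := rdb.Do(ctx, "object", "encoding", "mykey").Result()
//	if err != nil {
//		// handle error
//	}
//	_ = v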

func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
	return c.hooks.process(ctx, cmd, c.process)
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	var node *clusterNode
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmdInfo, slot)
			if err != nil {
				return err
			}
		}

		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
			_ = pipe.Close()
			ask = false
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node = nil
			continue
		}

		// If slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			var err error
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First, retry the same node.
			if attempt == 0 {
				continue
			}

			// Then try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
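
// A usage sketch for ForEachMaster; the same pattern applies to ForEachSlave
// and ForEachShard below. It assumes rdb and ctx from the earlier example, and
// PING is only an example command:
//
//	err := rdb.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
//		return master.Ping(ctx).Err()
//	})
//	if err != nil {
//		// handle error
//	}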

// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachShard concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachShard(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(ctx, node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get(context.TODO())
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots(ctx)
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	for _, idx := range rand.Perm(len(addrs)) {
		addr := addrs[idx]

		node, err := c.nodes.Get(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots(ctx).Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}

	/*
	 * No node is connectable. It is possible that all nodes' IP addresses have
	 * changed. Clear activeAddrs so that the client can reconnect using the
	 * initial seed addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
	 * which gives it a chance to resolve the domain names again and obtain
	 * updated IP addresses.
	 */
	c.nodes.mu.Lock()
	c.nodes.activeAddrs = nil
	c.nodes.mu.Unlock()

	return nil, firstErr
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
			}
		}
	}
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}
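
// A pipelining sketch: queued commands are grouped by node and sent in as few
// round trips as possible. It assumes rdb and ctx from the earlier example;
// the keys are arbitrary:
//
//	cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
//		pipe.Incr(ctx, "counter")
//		pipe.Expire(ctx, "counter", time.Hour)
//		return nil
//	})
//	if err != nil {
//		// handle error
//	}
//	_ = cmds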

func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
}

func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(ctx, cmdsMap, cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				setCmdsErr(cmds, err)
				return err
			}
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()

				err := c._processPipelineNode(ctx, node, cmds, failedCmds)
				if err == nil {
					return
				}
				if attempt < c.opt.MaxRedirects {
					if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
						setCmdsErr(cmds, err)
					}
				} else {
					setCmdsErr(cmds, err)
				}
			}(node, cmds)
		}

		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
	state, err := c.state.Get(ctx)
	if err != nil {
		return err
	}

	if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
		for _, cmd := range cmds {
			slot := c.cmdSlot(cmd)
			node, err := c.slotReadOnlyNode(state, slot)
			if err != nil {
				return err
			}
			cmdsMap.Add(node, cmd)
		}
		return nil
	}

	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		node, err := state.slotMasterNode(slot)
		if err != nil {
			return err
		}
		cmdsMap.Add(node, cmd)
	}
	return nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) _processPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
			})
		})
	})
}

func (c *ClusterClient) pipelineReadCmds(
	ctx context.Context,
	node *clusterNode,
	rd *proto.Reader,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		cmd.SetErr(err)

		if err == nil {
			continue
		}

		if c.checkMovedErr(ctx, cmd, err, failedCmds) {
			continue
		}

		if c.opt.ReadOnly && isLoadingError(err) {
			node.MarkAsFailing()
			return err
		}
		if isRedisError(err) {
			continue
		}
		return err
	}
	return nil
}

func (c *ClusterClient) checkMovedErr(
	ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
	moved, ask, addr := isMovedError(err)
	if !moved && !ask {
		return false
	}

	node, err := c.nodes.Get(addr)
	if err != nil {
		return false
	}

	if moved {
		c.state.LazyReload()
		failedCmds.Add(node, cmd)
		return true
	}

	if ask {
		failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		return true
	}

	panic("not reached")
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}
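
// A TxPipelined sketch: the queued commands are wrapped with MULTI/EXEC and,
// because they are grouped by slot, each group is sent to its master node. It
// assumes rdb and ctx from the earlier example; the keys below share one hash
// tag and therefore one slot:
//
//	_, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
//		pipe.Set(ctx, "{user1}.name", "alice", 0)
//		pipe.Set(ctx, "{user1}.visits", 1, 0)
//		return nil
//	})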

func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
}

func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
	// Trim multi .. exec.
	cmds = cmds[1 : len(cmds)-1]

	state, err := c.state.Get(ctx)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
					setCmdsErr(cmds, err)
					return err
				}
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					err := c._processTxPipelineNode(ctx, node, cmds, failedCmds)
					if err == nil {
						return
					}

					if attempt < c.opt.MaxRedirects {
						if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
							setCmdsErr(cmds, err)
						}
					} else {
						setCmdsErr(cmds, err)
					}
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) _processTxPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				statusCmd := cmds[0].(*StatusCmd)
				// Trim multi and exec.
				cmds = cmds[1 : len(cmds)-1]

				err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds)
				if err != nil {
					moved, ask, addr := isMovedError(err)
					if moved || ask {
						return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds)
					}
					return err
				}

				return pipelineReadCmds(rd, cmds)
			})
		})
	})
}

func (c *ClusterClient) txPipelineReadQueued(
	ctx context.Context,
	rd *proto.Reader,
	statusCmd *StatusCmd,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	// Parse queued replies.
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
			continue
		}
		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}

	return nil
}

func (c *ClusterClient) cmdsMoved(
	ctx context.Context, cmds []Cmder,
	moved, ask bool,
	addr string,
	failedCmds *cmdsMap,
) error {
	node, err := c.nodes.Get(addr)
	if err != nil {
		return err
	}

	if moved {
		c.state.LazyReload()
		for _, cmd := range cmds {
			failedCmds.Add(node, cmd)
		}
		return nil
	}

	if ask {
		for _, cmd := range cmds {
			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		}
		return nil
	}

	return nil
}

func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		err = node.Client.Watch(ctx, fn, keys...)
		if err == nil {
			break
		}

		moved, ask, addr := isMovedError(err)
		if moved || ask {
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node, err = c.slotMasterNode(ctx, slot)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(err, true) {
			continue
		}

		return err
	}

	return err
}
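
// A Watch sketch implementing an optimistic "check and set" on a single key.
// It assumes rdb and ctx from the earlier example; the key name and the retry
// loop are illustrative, not part of the API:
//
//	const key = "{counter}"
//
//	txf := func(tx *redis.Tx) error {
//		n, err := tx.Get(ctx, key).Int()
//		if err != nil && err != redis.Nil {
//			return err
//		}
//		_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
//			pipe.Set(ctx, key, n+1, 0)
//			return nil
//		})
//		return err
//	}
//
//	// Retry a few times if the watched key was modified concurrently.
//	for i := 0; i < 10; i++ {
//		err := rdb.Watch(ctx, txf, key)
//		if err != redis.TxFailedErr {
//			break // success or a non-retryable error
//		}
//	}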

func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
			if node != nil {
				panic("node != nil")
			}

			var err error
			if len(channels) > 0 {
				slot := hashtag.Slot(channels[0])
				node, err = c.slotMasterNode(ctx, slot)
			} else {
				node, err = c.nodes.Random()
			}
			if err != nil {
				return nil, err
			}

			cn, err := node.Client.newConn(context.TODO())
			if err != nil {
				node = nil

				return nil, err
			}

			return cn, nil
		},
		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
			node = nil
			return err
		},
	}
	pubsub.init()

	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(ctx, channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(ctx, channels...)
	}
	return pubsub
}
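
// A Pub/Sub sketch: in a cluster, the subscription connection is established
// on the master node that owns the slot of the first channel. It assumes rdb
// and ctx from the earlier example; the channel name is arbitrary:
//
//	pubsub := rdb.Subscribe(ctx, "mychannel")
//	defer pubsub.Close()
//
//	for msg := range pubsub.Channel() {
//		fmt.Println(msg.Channel, msg.Payload)
//	}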
1556
1557func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
1558	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
1559}
1560
1561func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
1562	// Try 3 random nodes.
1563	const nodeLimit = 3
1564
1565	addrs, err := c.nodes.Addrs()
1566	if err != nil {
1567		return nil, err
1568	}
1569
1570	var firstErr error
1571
1572	perm := rand.Perm(len(addrs))
1573	if len(perm) > nodeLimit {
1574		perm = perm[:nodeLimit]
1575	}
1576
1577	for _, idx := range perm {
1578		addr := addrs[idx]
1579
1580		node, err := c.nodes.Get(addr)
1581		if err != nil {
1582			if firstErr == nil {
1583				firstErr = err
1584			}
1585			continue
1586		}
1587
1588		info, err := node.Client.Command(ctx).Result()
1589		if err == nil {
1590			return info, nil
1591		}
1592		if firstErr == nil {
1593			firstErr = err
1594		}
1595	}
1596
1597	if firstErr == nil {
1598		panic("not reached")
1599	}
1600	return nil, firstErr
1601}
1602
1603func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
1604	cmdsInfo, err := c.cmdsInfoCache.Get(c.ctx)
1605	if err != nil {
1606		return nil
1607	}
1608
1609	info := cmdsInfo[name]
1610	if info == nil {
1611		internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
1612	}
1613	return info
1614}
1615
1616func (c *ClusterClient) cmdSlot(cmd Cmder) int {
1617	args := cmd.Args()
1618	if args[0] == "cluster" && args[1] == "getkeysinslot" {
1619		return args[2].(int)
1620	}
1621
1622	cmdInfo := c.cmdInfo(cmd.Name())
1623	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
1624}
1625
1626func cmdSlot(cmd Cmder, pos int) int {
1627	if pos == 0 {
1628		return hashtag.RandomSlot()
1629	}
1630	firstKey := cmd.stringArg(pos)
1631	return hashtag.Slot(firstKey)
1632}
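
// Keys map to one of 16384 slots via CRC16, and a {hash tag} restricts hashing
// to the tagged substring, so related keys can be forced onto the same slot.
// A small illustrative sketch using the internal hashtag package:
//
//	hashtag.Slot("{user1000}.followers") == hashtag.Slot("{user1000}.following") // same slot
//	hashtag.Slot("foo")                                                          // some slot in [0, 16383]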

func (c *ClusterClient) cmdNode(
	ctx context.Context,
	cmdInfo *CommandInfo,
	slot int,
) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}

	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
		return c.slotReadOnlyNode(state, slot)
	}
	return state.slotMasterNode(slot)
}

func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
	if c.opt.RouteByLatency {
		return state.slotClosestNode(slot)
	}
	if c.opt.RouteRandomly {
		return state.slotRandomNode(slot)
	}
	return state.slotSlaveNode(slot)
}

func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	return state.slotMasterNode(slot)
}

// SlaveForKey gets a client for a replica node to run any command on it.
// This is especially useful if we want to run a particular Lua script that
// contains only read-only commands on a replica. Regular read-only Redis
// commands carry a flag marking them as such and are automatically routed to
// replica nodes when ClusterOptions.ReadOnly is set to true.
func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	slot := hashtag.Slot(key)
	node, err := c.slotReadOnlyNode(state, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}
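
// A sketch of running a read-only Lua script on a replica via SlaveForKey. It
// assumes rdb and ctx from the earlier example; the script and key are
// illustrative assumptions:
//
//	script := redis.NewScript(`return redis.call("GET", KEYS[1])`)
//
//	slave, err := rdb.SlaveForKey(ctx, "key")
//	if err != nil {
//		// handle error
//	}
//	val, err := script.Run(ctx, slave, []string{"key"}).Result()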

// MasterForKey returns a client to the master node for a particular key.
func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
	slot := hashtag.Slot(key)
	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

//------------------------------------------------------------------------------

type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
	return &cmdsMap{
		m: make(map[*clusterNode][]Cmder),
	}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
	m.mu.Lock()
	m.m[node] = append(m.m[node], cmds...)
	m.mu.Unlock()
}
