1package redis_test
2
3import (
4	"net"
5
6	"github.com/go-redis/redis/v8"
7
8	. "github.com/onsi/ginkgo"
9	. "github.com/onsi/gomega"
10)
11
12var _ = Describe("Sentinel", func() {
13	var client *redis.Client
14	var master *redis.Client
15	var masterPort string
16
17	BeforeEach(func() {
18		client = redis.NewFailoverClient(&redis.FailoverOptions{
19			MasterName:    sentinelName,
20			SentinelAddrs: sentinelAddrs,
21			MaxRetries:    -1,
22		})
23		Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
24
25		sentinel := redis.NewSentinelClient(&redis.Options{
26			Addr:       ":" + sentinelPort1,
27			MaxRetries: -1,
28		})
29
30		addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
31		Expect(err).NotTo(HaveOccurred())
32
33		master = redis.NewClient(&redis.Options{
34			Addr:       net.JoinHostPort(addr[0], addr[1]),
35			MaxRetries: -1,
36		})
37		masterPort = addr[1]
38
39		// Wait until slaves are picked up by sentinel.
40		Eventually(func() string {
41			return sentinel1.Info(ctx).Val()
42		}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
43		Eventually(func() string {
44			return sentinel2.Info(ctx).Val()
45		}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
46		Eventually(func() string {
47			return sentinel3.Info(ctx).Val()
48		}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
49	})
50
51	AfterEach(func() {
52		_ = client.Close()
53		_ = master.Close()
54	})
55
56	It("should facilitate failover", func() {
57		// Set value on master.
58		err := client.Set(ctx, "foo", "master", 0).Err()
59		Expect(err).NotTo(HaveOccurred())
60
61		// Verify.
62		val, err := client.Get(ctx, "foo").Result()
63		Expect(err).NotTo(HaveOccurred())
64		Expect(val).To(Equal("master"))
65
66		// Create subscription.
67		ch := client.Subscribe(ctx, "foo").Channel()
68
69		// Kill master.
70		err = master.Shutdown(ctx).Err()
71		Expect(err).NotTo(HaveOccurred())
72		Eventually(func() error {
73			return master.Ping(ctx).Err()
74		}, "15s", "100ms").Should(HaveOccurred())
75
76		// Check that client picked up new master.
77		Eventually(func() string {
78			return client.Get(ctx, "foo").Val()
79		}, "15s", "100ms").Should(Equal("master"))
80
81		// Check if subscription is renewed.
82		var msg *redis.Message
83		Eventually(func() <-chan *redis.Message {
84			_ = client.Publish(ctx, "foo", "hello").Err()
85			return ch
86		}, "15s", "100ms").Should(Receive(&msg))
87		Expect(msg.Channel).To(Equal("foo"))
88		Expect(msg.Payload).To(Equal("hello"))
89
90		_, err = startRedis(masterPort)
91		Expect(err).NotTo(HaveOccurred())
92	})
93
94	It("supports DB selection", func() {
95		Expect(client.Close()).NotTo(HaveOccurred())
96
97		client = redis.NewFailoverClient(&redis.FailoverOptions{
98			MasterName:    sentinelName,
99			SentinelAddrs: sentinelAddrs,
100			DB:            1,
101		})
102		err := client.Ping(ctx).Err()
103		Expect(err).NotTo(HaveOccurred())
104	})
105})
106
107var _ = Describe("NewFailoverClusterClient", func() {
108	var client *redis.ClusterClient
109	var master *redis.Client
110	var masterPort string
111
112	BeforeEach(func() {
113		client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
114			MasterName:    sentinelName,
115			SentinelAddrs: sentinelAddrs,
116
117			RouteRandomly: true,
118		})
119		Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
120
121		sentinel := redis.NewSentinelClient(&redis.Options{
122			Addr:       ":" + sentinelPort1,
123			MaxRetries: -1,
124		})
125
126		addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
127		Expect(err).NotTo(HaveOccurred())
128
129		master = redis.NewClient(&redis.Options{
130			Addr:       net.JoinHostPort(addr[0], addr[1]),
131			MaxRetries: -1,
132		})
133		masterPort = addr[1]
134
135		// Wait until slaves are picked up by sentinel.
136		Eventually(func() string {
137			return sentinel1.Info(ctx).Val()
138		}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
139		Eventually(func() string {
140			return sentinel2.Info(ctx).Val()
141		}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
142		Eventually(func() string {
143			return sentinel3.Info(ctx).Val()
144		}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
145	})
146
147	AfterEach(func() {
148		_ = client.Close()
149		_ = master.Close()
150	})
151
152	It("should facilitate failover", func() {
153		// Set value.
154		err := client.Set(ctx, "foo", "master", 0).Err()
155		Expect(err).NotTo(HaveOccurred())
156
157		for i := 0; i < 100; i++ {
158			// Verify.
159			Eventually(func() string {
160				return client.Get(ctx, "foo").Val()
161			}, "15s", "1ms").Should(Equal("master"))
162		}
163
164		// Create subscription.
165		ch := client.Subscribe(ctx, "foo").Channel()
166
167		// Kill master.
168		err = master.Shutdown(ctx).Err()
169		Expect(err).NotTo(HaveOccurred())
170		Eventually(func() error {
171			return sentinelMaster.Ping(ctx).Err()
172		}, "15s", "100ms").Should(HaveOccurred())
173
174		// Check that client picked up new master.
175		Eventually(func() string {
176			return client.Get(ctx, "foo").Val()
177		}, "15s", "100ms").Should(Equal("master"))
178
179		// Check if subscription is renewed.
180		var msg *redis.Message
181		Eventually(func() <-chan *redis.Message {
182			_ = client.Publish(ctx, "foo", "hello").Err()
183			return ch
184		}, "15s", "100ms").Should(Receive(&msg))
185		Expect(msg.Channel).To(Equal("foo"))
186		Expect(msg.Payload).To(Equal("hello"))
187
188		_, err = startRedis(masterPort)
189		Expect(err).NotTo(HaveOccurred())
190	})
191})
192