1package redis_test
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"net"
8	"os"
9	"os/exec"
10	"path/filepath"
11	"sync"
12	"testing"
13	"time"
14
15	"github.com/go-redis/redis/v7"
16
17	. "github.com/onsi/ginkgo"
18	. "github.com/onsi/gomega"
19)
20
// Standalone server: redisPort is the port BeforeSuite starts redisMain on and
// redisAddr is the host-less address built from it (used by redisOptions).
// redisSecondaryPort is not referenced in this part of the file.
const (
	redisPort          = "6380"
	redisAddr          = ":" + redisPort
	redisSecondaryPort = "6381"
)

// Ports of the three servers BeforeSuite starts as ring shards. Only the
// first two are listed in redisRingOptions.
const (
	ringShard1Port = "6390"
	ringShard2Port = "6391"
	ringShard3Port = "6392"
)

// Sentinel set-up: sentinelName is the master name registered with the
// sentinel, sentinelMasterPort the server it monitors, the two slave ports the
// servers started with --slaveof pointing at that master, and sentinelPort the
// port of the sentinel itself.
const (
	sentinelName       = "mymaster"
	sentinelMasterPort = "8123"
	sentinelSlave1Port = "8124"
	sentinelSlave2Port = "8125"
	sentinelPort       = "8126"
)
40
// Handles to the servers started in BeforeSuite and closed in AfterSuite.
// They are nil until the corresponding start call has succeeded.
var (
	redisMain                                                *redisProcess
	ringShard1, ringShard2, ringShard3                       *redisProcess
	sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
)

// cluster is the scenario handed to startCluster and stopCluster. It lists six
// ports; the node IDs, processes and clients start out empty and are
// presumably filled in by startCluster (defined outside this section).
var cluster = &clusterScenario{
	ports:     []string{"8220", "8221", "8222", "8223", "8224", "8225"},
	nodeIDs:   make([]string, 6),
	processes: make(map[string]*redisProcess, 6),
	clients:   make(map[string]*redis.Client, 6),
}
53
54var _ = BeforeSuite(func() {
55	var err error
56
57	redisMain, err = startRedis(redisPort)
58	Expect(err).NotTo(HaveOccurred())
59
60	ringShard1, err = startRedis(ringShard1Port)
61	Expect(err).NotTo(HaveOccurred())
62
63	ringShard2, err = startRedis(ringShard2Port)
64	Expect(err).NotTo(HaveOccurred())
65
66	ringShard3, err = startRedis(ringShard3Port)
67	Expect(err).NotTo(HaveOccurred())
68
69	sentinelMaster, err = startRedis(sentinelMasterPort)
70	Expect(err).NotTo(HaveOccurred())
71
72	sentinel, err = startSentinel(sentinelPort, sentinelName, sentinelMasterPort)
73	Expect(err).NotTo(HaveOccurred())
74
75	sentinelSlave1, err = startRedis(
76		sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
77	Expect(err).NotTo(HaveOccurred())
78
79	sentinelSlave2, err = startRedis(
80		sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
81	Expect(err).NotTo(HaveOccurred())
82
83	Expect(startCluster(cluster)).NotTo(HaveOccurred())
84})
85
86var _ = AfterSuite(func() {
87	Expect(redisMain.Close()).NotTo(HaveOccurred())
88
89	Expect(ringShard1.Close()).NotTo(HaveOccurred())
90	Expect(ringShard2.Close()).NotTo(HaveOccurred())
91	Expect(ringShard3.Close()).NotTo(HaveOccurred())
92
93	Expect(sentinel.Close()).NotTo(HaveOccurred())
94	Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
95	Expect(sentinelSlave2.Close()).NotTo(HaveOccurred())
96	Expect(sentinelMaster.Close()).NotTo(HaveOccurred())
97
98	Expect(stopCluster(cluster)).NotTo(HaveOccurred())
99})
100
// TestGinkgoSuite is the go test entry point of the package: it registers
// Ginkgo's Fail as the Gomega failure handler and runs the specs under the
// suite name "go-redis".
func TestGinkgoSuite(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "go-redis")
}
105
106//------------------------------------------------------------------------------
107
108func redisOptions() *redis.Options {
109	return &redis.Options{
110		Addr:               redisAddr,
111		DB:                 15,
112		DialTimeout:        10 * time.Second,
113		ReadTimeout:        30 * time.Second,
114		WriteTimeout:       30 * time.Second,
115		PoolSize:           10,
116		PoolTimeout:        30 * time.Second,
117		IdleTimeout:        time.Minute,
118		IdleCheckFrequency: 100 * time.Millisecond,
119	}
120}
121
122func redisClusterOptions() *redis.ClusterOptions {
123	return &redis.ClusterOptions{
124		DialTimeout:        10 * time.Second,
125		ReadTimeout:        30 * time.Second,
126		WriteTimeout:       30 * time.Second,
127		PoolSize:           10,
128		PoolTimeout:        30 * time.Second,
129		IdleTimeout:        time.Minute,
130		IdleCheckFrequency: 100 * time.Millisecond,
131	}
132}
133
134func redisRingOptions() *redis.RingOptions {
135	return &redis.RingOptions{
136		Addrs: map[string]string{
137			"ringShardOne": ":" + ringShard1Port,
138			"ringShardTwo": ":" + ringShard2Port,
139		},
140		DialTimeout:        10 * time.Second,
141		ReadTimeout:        30 * time.Second,
142		WriteTimeout:       30 * time.Second,
143		PoolSize:           10,
144		PoolTimeout:        30 * time.Second,
145		IdleTimeout:        time.Minute,
146		IdleCheckFrequency: 100 * time.Millisecond,
147	}
148}
149
150func performAsync(n int, cbs ...func(int)) *sync.WaitGroup {
151	var wg sync.WaitGroup
152	for _, cb := range cbs {
153		for i := 0; i < n; i++ {
154			wg.Add(1)
155			go func(cb func(int), i int) {
156				defer GinkgoRecover()
157				defer wg.Done()
158
159				cb(i)
160			}(cb, i)
161		}
162	}
163	return &wg
164}
165
166func perform(n int, cbs ...func(int)) {
167	wg := performAsync(n, cbs...)
168	wg.Wait()
169}
170
// eventually calls fn repeatedly in a background goroutine, pausing
// timeout/100 between attempts, until fn returns nil or timeout has elapsed.
//
// It returns nil as soon as an attempt succeeds. On timeout it returns the
// first error fn reported, or a generic timeout error if no attempt had
// finished by then.
func eventually(fn func() error, timeout time.Duration) error {
	var (
		firstErr  = make(chan error, 1)
		succeeded = make(chan struct{})
		stop      = make(chan struct{})
	)
	pause := timeout / 100

	go func() {
		for {
			err := fn()
			if err == nil {
				close(succeeded)
				return
			}

			// The buffer holds a single error: the first failure is kept and
			// later ones are dropped without blocking.
			select {
			case firstErr <- err:
			default:
			}

			select {
			case <-stop:
				return
			case <-time.After(pause):
			}
		}
	}()

	select {
	case <-succeeded:
		return nil
	case <-time.After(timeout):
	}

	// Timed out: tell the poller to quit at its next pause.
	close(stop)

	select {
	case err := <-firstErr:
		return err
	default:
	}
	return fmt.Errorf("timeout after %s without an error", timeout)
}
210
// execCmd starts the program name with the given arguments and returns the
// handle of the running process without waiting for it to finish.
//
// The child's stdout and stderr are connected to the test's own streams only
// when tests run in verbose mode; otherwise they are left unset.
//
// If the program cannot be started, the returned process is nil and the error
// from Start is returned.
func execCmd(name string, args ...string) (*os.Process, error) {
	cmd := exec.Command(name, args...)
	if testing.Verbose() {
		cmd.Stdout = os.Stdout
		cmd.Stderr = os.Stderr
	}

	// Start has to complete before cmd.Process is read, because Start is what
	// sets that field. Writing both in one return statement would leave the
	// order of the field read and the call unspecified by the language.
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return cmd.Process, nil
}
219
220func connectTo(port string) (*redis.Client, error) {
221	client := redis.NewClient(&redis.Options{
222		Addr: ":" + port,
223	})
224
225	err := eventually(func() error {
226		return client.Ping().Err()
227	}, 30*time.Second)
228	if err != nil {
229		return nil, err
230	}
231
232	return client, nil
233}
234
235type redisProcess struct {
236	*os.Process
237	*redis.Client
238}
239
240func (p *redisProcess) Close() error {
241	if err := p.Kill(); err != nil {
242		return err
243	}
244
245	err := eventually(func() error {
246		if err := p.Client.Ping().Err(); err != nil {
247			return nil
248		}
249		return errors.New("client is not shutdown")
250	}, 10*time.Second)
251	if err != nil {
252		return err
253	}
254
255	p.Client.Close()
256	return nil
257}
258
// Absolute locations of the redis-server binary and of the redis.conf that
// startRedis copies into each instance directory, both resolved from
// testdata/redis relative to the working directory. The errors returned by
// filepath.Abs are discarded here.
var (
	redisServerBin, _  = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
	redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "redis.conf"))
)
263
// redisDir prepares the working directory for the instance on port: it
// resolves testdata/instances/<port> to an absolute path, deletes whatever is
// there and re-creates it (requested mode 0775). It returns that path, or an
// empty string and the error of the step that failed.
func redisDir(port string) (string, error) {
	instance := filepath.Join("testdata", "instances", port)

	abs, err := filepath.Abs(instance)
	if err != nil {
		return "", err
	}

	// Wipe leftovers from a previous run, then start from an empty directory.
	err = os.RemoveAll(abs)
	if err == nil {
		err = os.MkdirAll(abs, 0775)
	}
	if err != nil {
		return "", err
	}
	return abs, nil
}
277
278func startRedis(port string, args ...string) (*redisProcess, error) {
279	dir, err := redisDir(port)
280	if err != nil {
281		return nil, err
282	}
283	if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
284		return nil, err
285	}
286
287	baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
288	process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
289	if err != nil {
290		return nil, err
291	}
292
293	client, err := connectTo(port)
294	if err != nil {
295		process.Kill()
296		return nil, err
297	}
298	return &redisProcess{process, client}, err
299}
300
301func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
302	dir, err := redisDir(port)
303	if err != nil {
304		return nil, err
305	}
306	process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
307	if err != nil {
308		return nil, err
309	}
310	client, err := connectTo(port)
311	if err != nil {
312		process.Kill()
313		return nil, err
314	}
315	for _, cmd := range []*redis.StatusCmd{
316		redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"),
317		redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
318		redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"),
319		redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"),
320	} {
321		client.Process(cmd)
322		if err := cmd.Err(); err != nil {
323			process.Kill()
324			return nil, err
325		}
326	}
327	return &redisProcess{process, client}, nil
328}
329
330//------------------------------------------------------------------------------
331
// badConnError is the error badConn reports when no specific read or write
// error has been configured.
type badConnError string

// Error returns the text the error was created from.
func (e badConnError) Error() string {
	return string(e)
}

// Timeout always reports false.
func (e badConnError) Timeout() bool {
	return false
}

// Temporary always reports false.
func (e badConnError) Temporary() bool {
	return false
}

// badConn is a connection whose Read and Write never transfer data: each call
// optionally sleeps for the configured delay and then fails, either with the
// configured error or with badConnError("bad connection"). All other net.Conn
// methods come from the embedded net.TCPConn.
type badConn struct {
	net.TCPConn

	readDelay  time.Duration
	writeDelay time.Duration
	readErr    error
	writeErr   error
}

// Compile-time check that *badConn satisfies net.Conn.
var _ net.Conn = (*badConn)(nil)

// SetReadDeadline ignores the deadline and reports success.
func (cn *badConn) SetReadDeadline(time.Time) error { return nil }

// SetWriteDeadline ignores the deadline and reports success.
func (cn *badConn) SetWriteDeadline(time.Time) error { return nil }

// Read sleeps for readDelay when it is non-zero, then returns zero bytes and
// readErr, falling back to badConnError("bad connection") if readErr is nil.
func (cn *badConn) Read([]byte) (int, error) {
	if d := cn.readDelay; d != 0 {
		time.Sleep(d)
	}

	err := cn.readErr
	if err == nil {
		err = badConnError("bad connection")
	}
	return 0, err
}

// Write sleeps for writeDelay when it is non-zero, then returns zero bytes and
// writeErr, falling back to badConnError("bad connection") if writeErr is nil.
func (cn *badConn) Write([]byte) (int, error) {
	if d := cn.writeDelay; d != 0 {
		time.Sleep(d)
	}

	err := cn.writeErr
	if err == nil {
		err = badConnError("bad connection")
	}
	return 0, err
}
374
375//------------------------------------------------------------------------------
376
377type hook struct {
378	beforeProcess func(ctx context.Context, cmd redis.Cmder) (context.Context, error)
379	afterProcess  func(ctx context.Context, cmd redis.Cmder) error
380
381	beforeProcessPipeline func(ctx context.Context, cmds []redis.Cmder) (context.Context, error)
382	afterProcessPipeline  func(ctx context.Context, cmds []redis.Cmder) error
383}
384
385func (h *hook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
386	if h.beforeProcess != nil {
387		return h.beforeProcess(ctx, cmd)
388	}
389	return ctx, nil
390}
391
392func (h *hook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
393	if h.afterProcess != nil {
394		return h.afterProcess(ctx, cmd)
395	}
396	return nil
397}
398
399func (h *hook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
400	if h.beforeProcessPipeline != nil {
401		return h.beforeProcessPipeline(ctx, cmds)
402	}
403	return ctx, nil
404}
405
406func (h *hook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
407	if h.afterProcessPipeline != nil {
408		return h.afterProcessPipeline(ctx, cmds)
409	}
410	return nil
411}
412