1package redis_test 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "net" 8 "os" 9 "os/exec" 10 "path/filepath" 11 "sync" 12 "testing" 13 "time" 14 15 "github.com/go-redis/redis/v7" 16 17 . "github.com/onsi/ginkgo" 18 . "github.com/onsi/gomega" 19) 20 21const ( 22 redisPort = "6380" 23 redisAddr = ":" + redisPort 24 redisSecondaryPort = "6381" 25) 26 27const ( 28 ringShard1Port = "6390" 29 ringShard2Port = "6391" 30 ringShard3Port = "6392" 31) 32 33const ( 34 sentinelName = "mymaster" 35 sentinelMasterPort = "8123" 36 sentinelSlave1Port = "8124" 37 sentinelSlave2Port = "8125" 38 sentinelPort = "8126" 39) 40 41var ( 42 redisMain *redisProcess 43 ringShard1, ringShard2, ringShard3 *redisProcess 44 sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess 45) 46 47var cluster = &clusterScenario{ 48 ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, 49 nodeIDs: make([]string, 6), 50 processes: make(map[string]*redisProcess, 6), 51 clients: make(map[string]*redis.Client, 6), 52} 53 54var _ = BeforeSuite(func() { 55 var err error 56 57 redisMain, err = startRedis(redisPort) 58 Expect(err).NotTo(HaveOccurred()) 59 60 ringShard1, err = startRedis(ringShard1Port) 61 Expect(err).NotTo(HaveOccurred()) 62 63 ringShard2, err = startRedis(ringShard2Port) 64 Expect(err).NotTo(HaveOccurred()) 65 66 ringShard3, err = startRedis(ringShard3Port) 67 Expect(err).NotTo(HaveOccurred()) 68 69 sentinelMaster, err = startRedis(sentinelMasterPort) 70 Expect(err).NotTo(HaveOccurred()) 71 72 sentinel, err = startSentinel(sentinelPort, sentinelName, sentinelMasterPort) 73 Expect(err).NotTo(HaveOccurred()) 74 75 sentinelSlave1, err = startRedis( 76 sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort) 77 Expect(err).NotTo(HaveOccurred()) 78 79 sentinelSlave2, err = startRedis( 80 sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort) 81 Expect(err).NotTo(HaveOccurred()) 82 83 Expect(startCluster(cluster)).NotTo(HaveOccurred()) 84}) 85 86var _ = AfterSuite(func() { 87 Expect(redisMain.Close()).NotTo(HaveOccurred()) 88 89 Expect(ringShard1.Close()).NotTo(HaveOccurred()) 90 Expect(ringShard2.Close()).NotTo(HaveOccurred()) 91 Expect(ringShard3.Close()).NotTo(HaveOccurred()) 92 93 Expect(sentinel.Close()).NotTo(HaveOccurred()) 94 Expect(sentinelSlave1.Close()).NotTo(HaveOccurred()) 95 Expect(sentinelSlave2.Close()).NotTo(HaveOccurred()) 96 Expect(sentinelMaster.Close()).NotTo(HaveOccurred()) 97 98 Expect(stopCluster(cluster)).NotTo(HaveOccurred()) 99}) 100 101func TestGinkgoSuite(t *testing.T) { 102 RegisterFailHandler(Fail) 103 RunSpecs(t, "go-redis") 104} 105 106//------------------------------------------------------------------------------ 107 108func redisOptions() *redis.Options { 109 return &redis.Options{ 110 Addr: redisAddr, 111 DB: 15, 112 DialTimeout: 10 * time.Second, 113 ReadTimeout: 30 * time.Second, 114 WriteTimeout: 30 * time.Second, 115 PoolSize: 10, 116 PoolTimeout: 30 * time.Second, 117 IdleTimeout: time.Minute, 118 IdleCheckFrequency: 100 * time.Millisecond, 119 } 120} 121 122func redisClusterOptions() *redis.ClusterOptions { 123 return &redis.ClusterOptions{ 124 DialTimeout: 10 * time.Second, 125 ReadTimeout: 30 * time.Second, 126 WriteTimeout: 30 * time.Second, 127 PoolSize: 10, 128 PoolTimeout: 30 * time.Second, 129 IdleTimeout: time.Minute, 130 IdleCheckFrequency: 100 * time.Millisecond, 131 } 132} 133 134func redisRingOptions() *redis.RingOptions { 135 return &redis.RingOptions{ 136 Addrs: map[string]string{ 137 "ringShardOne": ":" + ringShard1Port, 138 "ringShardTwo": ":" + ringShard2Port, 139 }, 140 DialTimeout: 10 * time.Second, 141 ReadTimeout: 30 * time.Second, 142 WriteTimeout: 30 * time.Second, 143 PoolSize: 10, 144 PoolTimeout: 30 * time.Second, 145 IdleTimeout: time.Minute, 146 IdleCheckFrequency: 100 * time.Millisecond, 147 } 148} 149 150func performAsync(n int, cbs ...func(int)) *sync.WaitGroup { 151 var wg sync.WaitGroup 152 for _, cb := range cbs { 153 for i := 0; i < n; i++ { 154 wg.Add(1) 155 go func(cb func(int), i int) { 156 defer GinkgoRecover() 157 defer wg.Done() 158 159 cb(i) 160 }(cb, i) 161 } 162 } 163 return &wg 164} 165 166func perform(n int, cbs ...func(int)) { 167 wg := performAsync(n, cbs...) 168 wg.Wait() 169} 170 171func eventually(fn func() error, timeout time.Duration) error { 172 errCh := make(chan error, 1) 173 done := make(chan struct{}) 174 exit := make(chan struct{}) 175 176 go func() { 177 for { 178 err := fn() 179 if err == nil { 180 close(done) 181 return 182 } 183 184 select { 185 case errCh <- err: 186 default: 187 } 188 189 select { 190 case <-exit: 191 return 192 case <-time.After(timeout / 100): 193 } 194 } 195 }() 196 197 select { 198 case <-done: 199 return nil 200 case <-time.After(timeout): 201 close(exit) 202 select { 203 case err := <-errCh: 204 return err 205 default: 206 return fmt.Errorf("timeout after %s without an error", timeout) 207 } 208 } 209} 210 211func execCmd(name string, args ...string) (*os.Process, error) { 212 cmd := exec.Command(name, args...) 213 if testing.Verbose() { 214 cmd.Stdout = os.Stdout 215 cmd.Stderr = os.Stderr 216 } 217 return cmd.Process, cmd.Start() 218} 219 220func connectTo(port string) (*redis.Client, error) { 221 client := redis.NewClient(&redis.Options{ 222 Addr: ":" + port, 223 }) 224 225 err := eventually(func() error { 226 return client.Ping().Err() 227 }, 30*time.Second) 228 if err != nil { 229 return nil, err 230 } 231 232 return client, nil 233} 234 235type redisProcess struct { 236 *os.Process 237 *redis.Client 238} 239 240func (p *redisProcess) Close() error { 241 if err := p.Kill(); err != nil { 242 return err 243 } 244 245 err := eventually(func() error { 246 if err := p.Client.Ping().Err(); err != nil { 247 return nil 248 } 249 return errors.New("client is not shutdown") 250 }, 10*time.Second) 251 if err != nil { 252 return err 253 } 254 255 p.Client.Close() 256 return nil 257} 258 259var ( 260 redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server")) 261 redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "redis.conf")) 262) 263 264func redisDir(port string) (string, error) { 265 dir, err := filepath.Abs(filepath.Join("testdata", "instances", port)) 266 if err != nil { 267 return "", err 268 } 269 if err := os.RemoveAll(dir); err != nil { 270 return "", err 271 } 272 if err := os.MkdirAll(dir, 0775); err != nil { 273 return "", err 274 } 275 return dir, nil 276} 277 278func startRedis(port string, args ...string) (*redisProcess, error) { 279 dir, err := redisDir(port) 280 if err != nil { 281 return nil, err 282 } 283 if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil { 284 return nil, err 285 } 286 287 baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir} 288 process, err := execCmd(redisServerBin, append(baseArgs, args...)...) 289 if err != nil { 290 return nil, err 291 } 292 293 client, err := connectTo(port) 294 if err != nil { 295 process.Kill() 296 return nil, err 297 } 298 return &redisProcess{process, client}, err 299} 300 301func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { 302 dir, err := redisDir(port) 303 if err != nil { 304 return nil, err 305 } 306 process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir) 307 if err != nil { 308 return nil, err 309 } 310 client, err := connectTo(port) 311 if err != nil { 312 process.Kill() 313 return nil, err 314 } 315 for _, cmd := range []*redis.StatusCmd{ 316 redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"), 317 redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"), 318 redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"), 319 redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"), 320 } { 321 client.Process(cmd) 322 if err := cmd.Err(); err != nil { 323 process.Kill() 324 return nil, err 325 } 326 } 327 return &redisProcess{process, client}, nil 328} 329 330//------------------------------------------------------------------------------ 331 332type badConnError string 333 334func (e badConnError) Error() string { return string(e) } 335func (e badConnError) Timeout() bool { return false } 336func (e badConnError) Temporary() bool { return false } 337 338type badConn struct { 339 net.TCPConn 340 341 readDelay, writeDelay time.Duration 342 readErr, writeErr error 343} 344 345var _ net.Conn = &badConn{} 346 347func (cn *badConn) SetReadDeadline(t time.Time) error { 348 return nil 349} 350 351func (cn *badConn) SetWriteDeadline(t time.Time) error { 352 return nil 353} 354 355func (cn *badConn) Read([]byte) (int, error) { 356 if cn.readDelay != 0 { 357 time.Sleep(cn.readDelay) 358 } 359 if cn.readErr != nil { 360 return 0, cn.readErr 361 } 362 return 0, badConnError("bad connection") 363} 364 365func (cn *badConn) Write([]byte) (int, error) { 366 if cn.writeDelay != 0 { 367 time.Sleep(cn.writeDelay) 368 } 369 if cn.writeErr != nil { 370 return 0, cn.writeErr 371 } 372 return 0, badConnError("bad connection") 373} 374 375//------------------------------------------------------------------------------ 376 377type hook struct { 378 beforeProcess func(ctx context.Context, cmd redis.Cmder) (context.Context, error) 379 afterProcess func(ctx context.Context, cmd redis.Cmder) error 380 381 beforeProcessPipeline func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) 382 afterProcessPipeline func(ctx context.Context, cmds []redis.Cmder) error 383} 384 385func (h *hook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) { 386 if h.beforeProcess != nil { 387 return h.beforeProcess(ctx, cmd) 388 } 389 return ctx, nil 390} 391 392func (h *hook) AfterProcess(ctx context.Context, cmd redis.Cmder) error { 393 if h.afterProcess != nil { 394 return h.afterProcess(ctx, cmd) 395 } 396 return nil 397} 398 399func (h *hook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { 400 if h.beforeProcessPipeline != nil { 401 return h.beforeProcessPipeline(ctx, cmds) 402 } 403 return ctx, nil 404} 405 406func (h *hook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error { 407 if h.afterProcessPipeline != nil { 408 return h.afterProcessPipeline(ctx, cmds) 409 } 410 return nil 411} 412