1package redis 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "os" 8 "time" 9 10 "github.com/go-redis/redis/internal" 11 "github.com/go-redis/redis/internal/pool" 12 "github.com/go-redis/redis/internal/proto" 13) 14 15// Nil reply Redis returns when key does not exist. 16const Nil = proto.Nil 17 18func init() { 19 SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) 20} 21 22func SetLogger(logger *log.Logger) { 23 internal.Logger = logger 24} 25 26type baseClient struct { 27 opt *Options 28 connPool pool.Pooler 29 30 process func(Cmder) error 31 processPipeline func([]Cmder) error 32 processTxPipeline func([]Cmder) error 33 34 onClose func() error // hook called when client is closed 35} 36 37func (c *baseClient) init() { 38 c.process = c.defaultProcess 39 c.processPipeline = c.defaultProcessPipeline 40 c.processTxPipeline = c.defaultProcessTxPipeline 41} 42 43func (c *baseClient) String() string { 44 return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) 45} 46 47func (c *baseClient) newConn() (*pool.Conn, error) { 48 cn, err := c.connPool.NewConn() 49 if err != nil { 50 return nil, err 51 } 52 53 if !cn.Inited { 54 if err := c.initConn(cn); err != nil { 55 _ = c.connPool.CloseConn(cn) 56 return nil, err 57 } 58 } 59 60 return cn, nil 61} 62 63func (c *baseClient) getConn() (*pool.Conn, bool, error) { 64 cn, isNew, err := c.connPool.Get() 65 if err != nil { 66 return nil, false, err 67 } 68 69 if !cn.Inited { 70 if err := c.initConn(cn); err != nil { 71 _ = c.connPool.Remove(cn) 72 return nil, false, err 73 } 74 } 75 76 return cn, isNew, nil 77} 78 79func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { 80 if internal.IsBadConn(err, false) { 81 _ = c.connPool.Remove(cn) 82 return false 83 } 84 85 _ = c.connPool.Put(cn) 86 return true 87} 88 89func (c *baseClient) initConn(cn *pool.Conn) error { 90 cn.Inited = true 91 92 if c.opt.Password == "" && 93 c.opt.DB == 0 && 94 !c.opt.readOnly && 95 c.opt.OnConnect == nil { 96 return nil 97 } 98 99 conn := newConn(c.opt, cn) 100 _, err := conn.Pipelined(func(pipe Pipeliner) error { 101 if c.opt.Password != "" { 102 pipe.Auth(c.opt.Password) 103 } 104 105 if c.opt.DB > 0 { 106 pipe.Select(c.opt.DB) 107 } 108 109 if c.opt.readOnly { 110 pipe.ReadOnly() 111 } 112 113 return nil 114 }) 115 if err != nil { 116 return err 117 } 118 119 if c.opt.OnConnect != nil { 120 return c.opt.OnConnect(conn) 121 } 122 return nil 123} 124 125// WrapProcess wraps function that processes Redis commands. 126func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { 127 c.process = fn(c.process) 128} 129 130func (c *baseClient) Process(cmd Cmder) error { 131 return c.process(cmd) 132} 133 134func (c *baseClient) defaultProcess(cmd Cmder) error { 135 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { 136 if attempt > 0 { 137 time.Sleep(c.retryBackoff(attempt)) 138 } 139 140 cn, _, err := c.getConn() 141 if err != nil { 142 cmd.setErr(err) 143 if internal.IsRetryableError(err, true) { 144 continue 145 } 146 return err 147 } 148 149 cn.SetWriteTimeout(c.opt.WriteTimeout) 150 if err := writeCmd(cn, cmd); err != nil { 151 c.releaseConn(cn, err) 152 cmd.setErr(err) 153 if internal.IsRetryableError(err, true) { 154 continue 155 } 156 return err 157 } 158 159 cn.SetReadTimeout(c.cmdTimeout(cmd)) 160 err = cmd.readReply(cn) 161 c.releaseConn(cn, err) 162 if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { 163 continue 164 } 165 166 return err 167 } 168 169 return cmd.Err() 170} 171 172func (c *baseClient) retryBackoff(attempt int) time.Duration { 173 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) 174} 175 176func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { 177 if timeout := cmd.readTimeout(); timeout != nil { 178 return *timeout 179 } 180 181 return c.opt.ReadTimeout 182} 183 184// Close closes the client, releasing any open resources. 185// 186// It is rare to Close a Client, as the Client is meant to be 187// long-lived and shared between many goroutines. 188func (c *baseClient) Close() error { 189 var firstErr error 190 if c.onClose != nil { 191 if err := c.onClose(); err != nil && firstErr == nil { 192 firstErr = err 193 } 194 } 195 if err := c.connPool.Close(); err != nil && firstErr == nil { 196 firstErr = err 197 } 198 return firstErr 199} 200 201func (c *baseClient) getAddr() string { 202 return c.opt.Addr 203} 204 205func (c *baseClient) WrapProcessPipeline( 206 fn func(oldProcess func([]Cmder) error) func([]Cmder) error, 207) { 208 c.processPipeline = fn(c.processPipeline) 209 c.processTxPipeline = fn(c.processTxPipeline) 210} 211 212func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error { 213 return c.generalProcessPipeline(cmds, c.pipelineProcessCmds) 214} 215 216func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error { 217 return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds) 218} 219 220type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) 221 222func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error { 223 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { 224 if attempt > 0 { 225 time.Sleep(c.retryBackoff(attempt)) 226 } 227 228 cn, _, err := c.getConn() 229 if err != nil { 230 setCmdsErr(cmds, err) 231 return err 232 } 233 234 canRetry, err := p(cn, cmds) 235 236 if err == nil || internal.IsRedisError(err) { 237 _ = c.connPool.Put(cn) 238 break 239 } 240 _ = c.connPool.Remove(cn) 241 242 if !canRetry || !internal.IsRetryableError(err, true) { 243 break 244 } 245 } 246 return firstCmdsErr(cmds) 247} 248 249func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { 250 cn.SetWriteTimeout(c.opt.WriteTimeout) 251 if err := writeCmd(cn, cmds...); err != nil { 252 setCmdsErr(cmds, err) 253 return true, err 254 } 255 256 // Set read timeout for all commands. 257 cn.SetReadTimeout(c.opt.ReadTimeout) 258 return true, pipelineReadCmds(cn, cmds) 259} 260 261func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { 262 for _, cmd := range cmds { 263 err := cmd.readReply(cn) 264 if err != nil && !internal.IsRedisError(err) { 265 return err 266 } 267 } 268 return nil 269} 270 271func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { 272 cn.SetWriteTimeout(c.opt.WriteTimeout) 273 if err := txPipelineWriteMulti(cn, cmds); err != nil { 274 setCmdsErr(cmds, err) 275 return true, err 276 } 277 278 // Set read timeout for all commands. 279 cn.SetReadTimeout(c.opt.ReadTimeout) 280 281 if err := c.txPipelineReadQueued(cn, cmds); err != nil { 282 setCmdsErr(cmds, err) 283 return false, err 284 } 285 286 return false, pipelineReadCmds(cn, cmds) 287} 288 289func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { 290 multiExec := make([]Cmder, 0, len(cmds)+2) 291 multiExec = append(multiExec, NewStatusCmd("MULTI")) 292 multiExec = append(multiExec, cmds...) 293 multiExec = append(multiExec, NewSliceCmd("EXEC")) 294 return writeCmd(cn, multiExec...) 295} 296 297func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { 298 // Parse queued replies. 299 var statusCmd StatusCmd 300 if err := statusCmd.readReply(cn); err != nil { 301 return err 302 } 303 304 for _ = range cmds { 305 err := statusCmd.readReply(cn) 306 if err != nil && !internal.IsRedisError(err) { 307 return err 308 } 309 } 310 311 // Parse number of replies. 312 line, err := cn.Rd.ReadLine() 313 if err != nil { 314 if err == Nil { 315 err = TxFailedErr 316 } 317 return err 318 } 319 320 switch line[0] { 321 case proto.ErrorReply: 322 return proto.ParseErrorReply(line) 323 case proto.ArrayReply: 324 // ok 325 default: 326 err := fmt.Errorf("redis: expected '*', but got line %q", line) 327 return err 328 } 329 330 return nil 331} 332 333//------------------------------------------------------------------------------ 334 335// Client is a Redis client representing a pool of zero or more 336// underlying connections. It's safe for concurrent use by multiple 337// goroutines. 338type Client struct { 339 baseClient 340 cmdable 341 342 ctx context.Context 343} 344 345// NewClient returns a client to the Redis Server specified by Options. 346func NewClient(opt *Options) *Client { 347 opt.init() 348 349 c := Client{ 350 baseClient: baseClient{ 351 opt: opt, 352 connPool: newConnPool(opt), 353 }, 354 } 355 c.baseClient.init() 356 c.init() 357 358 return &c 359} 360 361func (c *Client) init() { 362 c.cmdable.setProcessor(c.Process) 363} 364 365func (c *Client) Context() context.Context { 366 if c.ctx != nil { 367 return c.ctx 368 } 369 return context.Background() 370} 371 372func (c *Client) WithContext(ctx context.Context) *Client { 373 if ctx == nil { 374 panic("nil context") 375 } 376 c2 := c.copy() 377 c2.ctx = ctx 378 return c2 379} 380 381func (c *Client) copy() *Client { 382 cp := *c 383 cp.init() 384 return &cp 385} 386 387// Options returns read-only Options that were used to create the client. 388func (c *Client) Options() *Options { 389 return c.opt 390} 391 392type PoolStats pool.Stats 393 394// PoolStats returns connection pool stats. 395func (c *Client) PoolStats() *PoolStats { 396 stats := c.connPool.Stats() 397 return (*PoolStats)(stats) 398} 399 400func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { 401 return c.Pipeline().Pipelined(fn) 402} 403 404func (c *Client) Pipeline() Pipeliner { 405 pipe := Pipeline{ 406 exec: c.processPipeline, 407 } 408 pipe.statefulCmdable.setProcessor(pipe.Process) 409 return &pipe 410} 411 412func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { 413 return c.TxPipeline().Pipelined(fn) 414} 415 416// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. 417func (c *Client) TxPipeline() Pipeliner { 418 pipe := Pipeline{ 419 exec: c.processTxPipeline, 420 } 421 pipe.statefulCmdable.setProcessor(pipe.Process) 422 return &pipe 423} 424 425func (c *Client) pubSub() *PubSub { 426 return &PubSub{ 427 opt: c.opt, 428 429 newConn: func(channels []string) (*pool.Conn, error) { 430 return c.newConn() 431 }, 432 closeConn: c.connPool.CloseConn, 433 } 434} 435 436// Subscribe subscribes the client to the specified channels. 437// Channels can be omitted to create empty subscription. 438func (c *Client) Subscribe(channels ...string) *PubSub { 439 pubsub := c.pubSub() 440 if len(channels) > 0 { 441 _ = pubsub.Subscribe(channels...) 442 } 443 return pubsub 444} 445 446// PSubscribe subscribes the client to the given patterns. 447// Patterns can be omitted to create empty subscription. 448func (c *Client) PSubscribe(channels ...string) *PubSub { 449 pubsub := c.pubSub() 450 if len(channels) > 0 { 451 _ = pubsub.PSubscribe(channels...) 452 } 453 return pubsub 454} 455 456//------------------------------------------------------------------------------ 457 458// Conn is like Client, but its pool contains single connection. 459type Conn struct { 460 baseClient 461 statefulCmdable 462} 463 464func newConn(opt *Options, cn *pool.Conn) *Conn { 465 c := Conn{ 466 baseClient: baseClient{ 467 opt: opt, 468 connPool: pool.NewSingleConnPool(cn), 469 }, 470 } 471 c.baseClient.init() 472 c.statefulCmdable.setProcessor(c.Process) 473 return &c 474} 475 476func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { 477 return c.Pipeline().Pipelined(fn) 478} 479 480func (c *Conn) Pipeline() Pipeliner { 481 pipe := Pipeline{ 482 exec: c.processPipeline, 483 } 484 pipe.statefulCmdable.setProcessor(pipe.Process) 485 return &pipe 486} 487 488func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { 489 return c.TxPipeline().Pipelined(fn) 490} 491 492// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. 493func (c *Conn) TxPipeline() Pipeliner { 494 pipe := Pipeline{ 495 exec: c.processTxPipeline, 496 } 497 pipe.statefulCmdable.setProcessor(pipe.Process) 498 return &pipe 499} 500