1package redis // import "gopkg.in/redis.v5" 2 3import ( 4 "fmt" 5 "log" 6 "time" 7 8 "gopkg.in/redis.v5/internal" 9 "gopkg.in/redis.v5/internal/pool" 10 "gopkg.in/redis.v5/internal/proto" 11) 12 13// Redis nil reply, .e.g. when key does not exist. 14const Nil = internal.Nil 15 16func SetLogger(logger *log.Logger) { 17 internal.Logger = logger 18} 19 20func (c *baseClient) String() string { 21 return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) 22} 23 24func (c *baseClient) conn() (*pool.Conn, bool, error) { 25 cn, isNew, err := c.connPool.Get() 26 if err != nil { 27 return nil, false, err 28 } 29 if !cn.Inited { 30 if err := c.initConn(cn); err != nil { 31 _ = c.connPool.Remove(cn, err) 32 return nil, false, err 33 } 34 } 35 return cn, isNew, nil 36} 37 38func (c *baseClient) putConn(cn *pool.Conn, err error, allowTimeout bool) bool { 39 if internal.IsBadConn(err, allowTimeout) { 40 _ = c.connPool.Remove(cn, err) 41 return false 42 } 43 44 _ = c.connPool.Put(cn) 45 return true 46} 47 48func (c *baseClient) initConn(cn *pool.Conn) error { 49 cn.Inited = true 50 51 if c.opt.Password == "" && c.opt.DB == 0 && !c.opt.ReadOnly { 52 return nil 53 } 54 55 // Temp client for Auth and Select. 56 client := newClient(c.opt, pool.NewSingleConnPool(cn)) 57 _, err := client.Pipelined(func(pipe *Pipeline) error { 58 if c.opt.Password != "" { 59 pipe.Auth(c.opt.Password) 60 } 61 62 if c.opt.DB > 0 { 63 pipe.Select(c.opt.DB) 64 } 65 66 if c.opt.ReadOnly { 67 pipe.ReadOnly() 68 } 69 70 return nil 71 }) 72 return err 73} 74 75func (c *baseClient) Process(cmd Cmder) error { 76 if c.process != nil { 77 return c.process(cmd) 78 } 79 return c.defaultProcess(cmd) 80} 81 82// WrapProcess replaces the process func. It takes a function createWrapper 83// which is supplied by the user. createWrapper takes the old process func as 84// an input and returns the new wrapper process func. createWrapper should 85// use call the old process func within the new process func. 86func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { 87 c.process = fn(c.defaultProcess) 88} 89 90func (c *baseClient) defaultProcess(cmd Cmder) error { 91 for i := 0; i <= c.opt.MaxRetries; i++ { 92 cn, _, err := c.conn() 93 if err != nil { 94 cmd.setErr(err) 95 return err 96 } 97 98 cn.SetWriteTimeout(c.opt.WriteTimeout) 99 if err := writeCmd(cn, cmd); err != nil { 100 c.putConn(cn, err, false) 101 cmd.setErr(err) 102 if err != nil && internal.IsRetryableError(err) { 103 continue 104 } 105 return err 106 } 107 108 cn.SetReadTimeout(c.cmdTimeout(cmd)) 109 err = cmd.readReply(cn) 110 c.putConn(cn, err, false) 111 if err != nil && internal.IsRetryableError(err) { 112 continue 113 } 114 115 return err 116 } 117 118 return cmd.Err() 119} 120 121func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { 122 if timeout := cmd.readTimeout(); timeout != nil { 123 return *timeout 124 } else { 125 return c.opt.ReadTimeout 126 } 127} 128 129// Close closes the client, releasing any open resources. 130// 131// It is rare to Close a Client, as the Client is meant to be 132// long-lived and shared between many goroutines. 133func (c *baseClient) Close() error { 134 var firstErr error 135 if c.onClose != nil { 136 if err := c.onClose(); err != nil && firstErr == nil { 137 firstErr = err 138 } 139 } 140 if err := c.connPool.Close(); err != nil && firstErr == nil { 141 firstErr = err 142 } 143 return firstErr 144} 145 146func (c *baseClient) getAddr() string { 147 return c.opt.Addr 148} 149 150type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) 151 152func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { 153 return func(cmds []Cmder) error { 154 var firstErr error 155 for i := 0; i <= c.opt.MaxRetries; i++ { 156 cn, _, err := c.conn() 157 if err != nil { 158 setCmdsErr(cmds, err) 159 return err 160 } 161 162 canRetry, err := p(cn, cmds) 163 c.putConn(cn, err, false) 164 if err == nil { 165 return nil 166 } 167 if firstErr == nil { 168 firstErr = err 169 } 170 if !canRetry || !internal.IsRetryableError(err) { 171 break 172 } 173 } 174 return firstErr 175 } 176} 177 178func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) { 179 cn.SetWriteTimeout(c.opt.WriteTimeout) 180 if err := writeCmd(cn, cmds...); err != nil { 181 setCmdsErr(cmds, err) 182 return true, err 183 } 184 185 // Set read timeout for all commands. 186 cn.SetReadTimeout(c.opt.ReadTimeout) 187 return pipelineReadCmds(cn, cmds) 188} 189 190func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) { 191 for i, cmd := range cmds { 192 err := cmd.readReply(cn) 193 if err == nil { 194 continue 195 } 196 if i == 0 { 197 retry = true 198 } 199 if firstErr == nil { 200 firstErr = err 201 } 202 } 203 return false, firstErr 204} 205 206func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { 207 cn.SetWriteTimeout(c.opt.WriteTimeout) 208 if err := txPipelineWriteMulti(cn, cmds); err != nil { 209 setCmdsErr(cmds, err) 210 return true, err 211 } 212 213 // Set read timeout for all commands. 214 cn.SetReadTimeout(c.opt.ReadTimeout) 215 216 if err := c.txPipelineReadQueued(cn, cmds); err != nil { 217 return false, err 218 } 219 220 _, err := pipelineReadCmds(cn, cmds) 221 return false, err 222} 223 224func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { 225 multiExec := make([]Cmder, 0, len(cmds)+2) 226 multiExec = append(multiExec, NewStatusCmd("MULTI")) 227 multiExec = append(multiExec, cmds...) 228 multiExec = append(multiExec, NewSliceCmd("EXEC")) 229 return writeCmd(cn, multiExec...) 230} 231 232func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { 233 var firstErr error 234 235 // Parse queued replies. 236 var statusCmd StatusCmd 237 if err := statusCmd.readReply(cn); err != nil && firstErr == nil { 238 firstErr = err 239 } 240 241 for _, cmd := range cmds { 242 err := statusCmd.readReply(cn) 243 if err != nil { 244 cmd.setErr(err) 245 if firstErr == nil { 246 firstErr = err 247 } 248 } 249 } 250 251 // Parse number of replies. 252 line, err := cn.Rd.ReadLine() 253 if err != nil { 254 if err == Nil { 255 err = TxFailedErr 256 } 257 return err 258 } 259 260 switch line[0] { 261 case proto.ErrorReply: 262 return proto.ParseErrorReply(line) 263 case proto.ArrayReply: 264 // ok 265 default: 266 err := fmt.Errorf("redis: expected '*', but got line %q", line) 267 return err 268 } 269 270 return nil 271} 272 273//------------------------------------------------------------------------------ 274 275// Client is a Redis client representing a pool of zero or more 276// underlying connections. It's safe for concurrent use by multiple 277// goroutines. 278type Client struct { 279 baseClient 280 cmdable 281} 282 283func newClient(opt *Options, pool pool.Pooler) *Client { 284 client := Client{ 285 baseClient: baseClient{ 286 opt: opt, 287 connPool: pool, 288 }, 289 } 290 client.cmdable.process = client.Process 291 return &client 292} 293 294// NewClient returns a client to the Redis Server specified by Options. 295func NewClient(opt *Options) *Client { 296 opt.init() 297 return newClient(opt, newConnPool(opt)) 298} 299 300func (c *Client) copy() *Client { 301 c2 := new(Client) 302 *c2 = *c 303 c2.cmdable.process = c2.Process 304 return c2 305} 306 307// PoolStats returns connection pool stats. 308func (c *Client) PoolStats() *PoolStats { 309 s := c.connPool.Stats() 310 return &PoolStats{ 311 Requests: s.Requests, 312 Hits: s.Hits, 313 Timeouts: s.Timeouts, 314 315 TotalConns: s.TotalConns, 316 FreeConns: s.FreeConns, 317 } 318} 319 320func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { 321 return c.Pipeline().pipelined(fn) 322} 323 324func (c *Client) Pipeline() *Pipeline { 325 pipe := Pipeline{ 326 exec: c.pipelineExecer(c.pipelineProcessCmds), 327 } 328 pipe.cmdable.process = pipe.Process 329 pipe.statefulCmdable.process = pipe.Process 330 return &pipe 331} 332 333func (c *Client) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) { 334 return c.TxPipeline().pipelined(fn) 335} 336 337// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. 338func (c *Client) TxPipeline() *Pipeline { 339 pipe := Pipeline{ 340 exec: c.pipelineExecer(c.txPipelineProcessCmds), 341 } 342 pipe.cmdable.process = pipe.Process 343 pipe.statefulCmdable.process = pipe.Process 344 return &pipe 345} 346 347func (c *Client) pubSub() *PubSub { 348 return &PubSub{ 349 base: baseClient{ 350 opt: c.opt, 351 connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false), 352 }, 353 } 354} 355 356// Subscribe subscribes the client to the specified channels. 357func (c *Client) Subscribe(channels ...string) (*PubSub, error) { 358 pubsub := c.pubSub() 359 if len(channels) > 0 { 360 if err := pubsub.Subscribe(channels...); err != nil { 361 pubsub.Close() 362 return nil, err 363 } 364 } 365 return pubsub, nil 366} 367 368// PSubscribe subscribes the client to the given patterns. 369func (c *Client) PSubscribe(channels ...string) (*PubSub, error) { 370 pubsub := c.pubSub() 371 if len(channels) > 0 { 372 if err := pubsub.PSubscribe(channels...); err != nil { 373 pubsub.Close() 374 return nil, err 375 } 376 } 377 return pubsub, nil 378} 379