1package redis
2
3import (
4	"context"
5	"fmt"
6	"log"
7	"os"
8	"time"
9
10	"github.com/go-redis/redis/internal"
11	"github.com/go-redis/redis/internal/pool"
12	"github.com/go-redis/redis/internal/proto"
13)
14
15// Nil reply Redis returns when key does not exist.
16const Nil = proto.Nil
17
18func init() {
19	SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
20}
21
22func SetLogger(logger *log.Logger) {
23	internal.Logger = logger
24}
25
26type baseClient struct {
27	opt      *Options
28	connPool pool.Pooler
29
30	process           func(Cmder) error
31	processPipeline   func([]Cmder) error
32	processTxPipeline func([]Cmder) error
33
34	onClose func() error // hook called when client is closed
35}
36
37func (c *baseClient) init() {
38	c.process = c.defaultProcess
39	c.processPipeline = c.defaultProcessPipeline
40	c.processTxPipeline = c.defaultProcessTxPipeline
41}
42
43func (c *baseClient) String() string {
44	return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
45}
46
47func (c *baseClient) newConn() (*pool.Conn, error) {
48	cn, err := c.connPool.NewConn()
49	if err != nil {
50		return nil, err
51	}
52
53	if !cn.Inited {
54		if err := c.initConn(cn); err != nil {
55			_ = c.connPool.CloseConn(cn)
56			return nil, err
57		}
58	}
59
60	return cn, nil
61}
62
63func (c *baseClient) getConn() (*pool.Conn, bool, error) {
64	cn, isNew, err := c.connPool.Get()
65	if err != nil {
66		return nil, false, err
67	}
68
69	if !cn.Inited {
70		if err := c.initConn(cn); err != nil {
71			_ = c.connPool.Remove(cn)
72			return nil, false, err
73		}
74	}
75
76	return cn, isNew, nil
77}
78
79func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
80	if internal.IsBadConn(err, false) {
81		_ = c.connPool.Remove(cn)
82		return false
83	}
84
85	_ = c.connPool.Put(cn)
86	return true
87}
88
89func (c *baseClient) initConn(cn *pool.Conn) error {
90	cn.Inited = true
91
92	if c.opt.Password == "" &&
93		c.opt.DB == 0 &&
94		!c.opt.readOnly &&
95		c.opt.OnConnect == nil {
96		return nil
97	}
98
99	conn := newConn(c.opt, cn)
100	_, err := conn.Pipelined(func(pipe Pipeliner) error {
101		if c.opt.Password != "" {
102			pipe.Auth(c.opt.Password)
103		}
104
105		if c.opt.DB > 0 {
106			pipe.Select(c.opt.DB)
107		}
108
109		if c.opt.readOnly {
110			pipe.ReadOnly()
111		}
112
113		return nil
114	})
115	if err != nil {
116		return err
117	}
118
119	if c.opt.OnConnect != nil {
120		return c.opt.OnConnect(conn)
121	}
122	return nil
123}
124
125// WrapProcess wraps function that processes Redis commands.
126func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
127	c.process = fn(c.process)
128}
129
130func (c *baseClient) Process(cmd Cmder) error {
131	return c.process(cmd)
132}
133
134func (c *baseClient) defaultProcess(cmd Cmder) error {
135	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
136		if attempt > 0 {
137			time.Sleep(c.retryBackoff(attempt))
138		}
139
140		cn, _, err := c.getConn()
141		if err != nil {
142			cmd.setErr(err)
143			if internal.IsRetryableError(err, true) {
144				continue
145			}
146			return err
147		}
148
149		cn.SetWriteTimeout(c.opt.WriteTimeout)
150		if err := writeCmd(cn, cmd); err != nil {
151			c.releaseConn(cn, err)
152			cmd.setErr(err)
153			if internal.IsRetryableError(err, true) {
154				continue
155			}
156			return err
157		}
158
159		cn.SetReadTimeout(c.cmdTimeout(cmd))
160		err = cmd.readReply(cn)
161		c.releaseConn(cn, err)
162		if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
163			continue
164		}
165
166		return err
167	}
168
169	return cmd.Err()
170}
171
172func (c *baseClient) retryBackoff(attempt int) time.Duration {
173	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
174}
175
176func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
177	if timeout := cmd.readTimeout(); timeout != nil {
178		return *timeout
179	}
180
181	return c.opt.ReadTimeout
182}
183
184// Close closes the client, releasing any open resources.
185//
186// It is rare to Close a Client, as the Client is meant to be
187// long-lived and shared between many goroutines.
188func (c *baseClient) Close() error {
189	var firstErr error
190	if c.onClose != nil {
191		if err := c.onClose(); err != nil && firstErr == nil {
192			firstErr = err
193		}
194	}
195	if err := c.connPool.Close(); err != nil && firstErr == nil {
196		firstErr = err
197	}
198	return firstErr
199}
200
201func (c *baseClient) getAddr() string {
202	return c.opt.Addr
203}
204
205func (c *baseClient) WrapProcessPipeline(
206	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
207) {
208	c.processPipeline = fn(c.processPipeline)
209	c.processTxPipeline = fn(c.processTxPipeline)
210}
211
212func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
213	return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
214}
215
216func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
217	return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
218}
219
220type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
221
222func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
223	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
224		if attempt > 0 {
225			time.Sleep(c.retryBackoff(attempt))
226		}
227
228		cn, _, err := c.getConn()
229		if err != nil {
230			setCmdsErr(cmds, err)
231			return err
232		}
233
234		canRetry, err := p(cn, cmds)
235
236		if err == nil || internal.IsRedisError(err) {
237			_ = c.connPool.Put(cn)
238			break
239		}
240		_ = c.connPool.Remove(cn)
241
242		if !canRetry || !internal.IsRetryableError(err, true) {
243			break
244		}
245	}
246	return firstCmdsErr(cmds)
247}
248
249func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
250	cn.SetWriteTimeout(c.opt.WriteTimeout)
251	if err := writeCmd(cn, cmds...); err != nil {
252		setCmdsErr(cmds, err)
253		return true, err
254	}
255
256	// Set read timeout for all commands.
257	cn.SetReadTimeout(c.opt.ReadTimeout)
258	return true, pipelineReadCmds(cn, cmds)
259}
260
261func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
262	for _, cmd := range cmds {
263		err := cmd.readReply(cn)
264		if err != nil && !internal.IsRedisError(err) {
265			return err
266		}
267	}
268	return nil
269}
270
271func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
272	cn.SetWriteTimeout(c.opt.WriteTimeout)
273	if err := txPipelineWriteMulti(cn, cmds); err != nil {
274		setCmdsErr(cmds, err)
275		return true, err
276	}
277
278	// Set read timeout for all commands.
279	cn.SetReadTimeout(c.opt.ReadTimeout)
280
281	if err := c.txPipelineReadQueued(cn, cmds); err != nil {
282		setCmdsErr(cmds, err)
283		return false, err
284	}
285
286	return false, pipelineReadCmds(cn, cmds)
287}
288
289func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
290	multiExec := make([]Cmder, 0, len(cmds)+2)
291	multiExec = append(multiExec, NewStatusCmd("MULTI"))
292	multiExec = append(multiExec, cmds...)
293	multiExec = append(multiExec, NewSliceCmd("EXEC"))
294	return writeCmd(cn, multiExec...)
295}
296
297func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
298	// Parse queued replies.
299	var statusCmd StatusCmd
300	if err := statusCmd.readReply(cn); err != nil {
301		return err
302	}
303
304	for _ = range cmds {
305		err := statusCmd.readReply(cn)
306		if err != nil && !internal.IsRedisError(err) {
307			return err
308		}
309	}
310
311	// Parse number of replies.
312	line, err := cn.Rd.ReadLine()
313	if err != nil {
314		if err == Nil {
315			err = TxFailedErr
316		}
317		return err
318	}
319
320	switch line[0] {
321	case proto.ErrorReply:
322		return proto.ParseErrorReply(line)
323	case proto.ArrayReply:
324		// ok
325	default:
326		err := fmt.Errorf("redis: expected '*', but got line %q", line)
327		return err
328	}
329
330	return nil
331}
332
333//------------------------------------------------------------------------------
334
335// Client is a Redis client representing a pool of zero or more
336// underlying connections. It's safe for concurrent use by multiple
337// goroutines.
338type Client struct {
339	baseClient
340	cmdable
341
342	ctx context.Context
343}
344
345// NewClient returns a client to the Redis Server specified by Options.
346func NewClient(opt *Options) *Client {
347	opt.init()
348
349	c := Client{
350		baseClient: baseClient{
351			opt:      opt,
352			connPool: newConnPool(opt),
353		},
354	}
355	c.baseClient.init()
356	c.init()
357
358	return &c
359}
360
361func (c *Client) init() {
362	c.cmdable.setProcessor(c.Process)
363}
364
365func (c *Client) Context() context.Context {
366	if c.ctx != nil {
367		return c.ctx
368	}
369	return context.Background()
370}
371
372func (c *Client) WithContext(ctx context.Context) *Client {
373	if ctx == nil {
374		panic("nil context")
375	}
376	c2 := c.copy()
377	c2.ctx = ctx
378	return c2
379}
380
381func (c *Client) copy() *Client {
382	cp := *c
383	cp.init()
384	return &cp
385}
386
387// Options returns read-only Options that were used to create the client.
388func (c *Client) Options() *Options {
389	return c.opt
390}
391
392type PoolStats pool.Stats
393
394// PoolStats returns connection pool stats.
395func (c *Client) PoolStats() *PoolStats {
396	stats := c.connPool.Stats()
397	return (*PoolStats)(stats)
398}
399
400func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
401	return c.Pipeline().Pipelined(fn)
402}
403
404func (c *Client) Pipeline() Pipeliner {
405	pipe := Pipeline{
406		exec: c.processPipeline,
407	}
408	pipe.statefulCmdable.setProcessor(pipe.Process)
409	return &pipe
410}
411
412func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
413	return c.TxPipeline().Pipelined(fn)
414}
415
416// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
417func (c *Client) TxPipeline() Pipeliner {
418	pipe := Pipeline{
419		exec: c.processTxPipeline,
420	}
421	pipe.statefulCmdable.setProcessor(pipe.Process)
422	return &pipe
423}
424
425func (c *Client) pubSub() *PubSub {
426	return &PubSub{
427		opt: c.opt,
428
429		newConn: func(channels []string) (*pool.Conn, error) {
430			return c.newConn()
431		},
432		closeConn: c.connPool.CloseConn,
433	}
434}
435
436// Subscribe subscribes the client to the specified channels.
437// Channels can be omitted to create empty subscription.
438func (c *Client) Subscribe(channels ...string) *PubSub {
439	pubsub := c.pubSub()
440	if len(channels) > 0 {
441		_ = pubsub.Subscribe(channels...)
442	}
443	return pubsub
444}
445
446// PSubscribe subscribes the client to the given patterns.
447// Patterns can be omitted to create empty subscription.
448func (c *Client) PSubscribe(channels ...string) *PubSub {
449	pubsub := c.pubSub()
450	if len(channels) > 0 {
451		_ = pubsub.PSubscribe(channels...)
452	}
453	return pubsub
454}
455
456//------------------------------------------------------------------------------
457
458// Conn is like Client, but its pool contains single connection.
459type Conn struct {
460	baseClient
461	statefulCmdable
462}
463
464func newConn(opt *Options, cn *pool.Conn) *Conn {
465	c := Conn{
466		baseClient: baseClient{
467			opt:      opt,
468			connPool: pool.NewSingleConnPool(cn),
469		},
470	}
471	c.baseClient.init()
472	c.statefulCmdable.setProcessor(c.Process)
473	return &c
474}
475
476func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
477	return c.Pipeline().Pipelined(fn)
478}
479
480func (c *Conn) Pipeline() Pipeliner {
481	pipe := Pipeline{
482		exec: c.processPipeline,
483	}
484	pipe.statefulCmdable.setProcessor(pipe.Process)
485	return &pipe
486}
487
488func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
489	return c.TxPipeline().Pipelined(fn)
490}
491
492// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
493func (c *Conn) TxPipeline() Pipeliner {
494	pipe := Pipeline{
495		exec: c.processTxPipeline,
496	}
497	pipe.statefulCmdable.setProcessor(pipe.Process)
498	return &pipe
499}
500