1package redis
2
3import (
4	"context"
5	"sync"
6
7	"github.com/go-redis/redis/v7/internal/pool"
8)
9
10type pipelineExecer func(context.Context, []Cmder) error
11
12// Pipeliner is an mechanism to realise Redis Pipeline technique.
13//
14// Pipelining is a technique to extremely speed up processing by packing
15// operations to batches, send them at once to Redis and read a replies in a
16// singe step.
17// See https://redis.io/topics/pipelining
18//
19// Pay attention, that Pipeline is not a transaction, so you can get unexpected
20// results in case of big pipelines and small read/write timeouts.
21// Redis client has retransmission logic in case of timeouts, pipeline
22// can be retransmitted and commands can be executed more then once.
23// To avoid this: it is good idea to use reasonable bigger read/write timeouts
24// depends of your batch size and/or use TxPipeline.
25type Pipeliner interface {
26	StatefulCmdable
27	Do(args ...interface{}) *Cmd
28	Process(cmd Cmder) error
29	Close() error
30	Discard() error
31	Exec() ([]Cmder, error)
32	ExecContext(ctx context.Context) ([]Cmder, error)
33}
34
35var _ Pipeliner = (*Pipeline)(nil)
36
37// Pipeline implements pipelining as described in
38// http://redis.io/topics/pipelining. It's safe for concurrent use
39// by multiple goroutines.
40type Pipeline struct {
41	cmdable
42	statefulCmdable
43
44	ctx  context.Context
45	exec pipelineExecer
46
47	mu     sync.Mutex
48	cmds   []Cmder
49	closed bool
50}
51
52func (c *Pipeline) init() {
53	c.cmdable = c.Process
54	c.statefulCmdable = c.Process
55}
56
57func (c *Pipeline) Do(args ...interface{}) *Cmd {
58	cmd := NewCmd(args...)
59	_ = c.Process(cmd)
60	return cmd
61}
62
63// Process queues the cmd for later execution.
64func (c *Pipeline) Process(cmd Cmder) error {
65	c.mu.Lock()
66	c.cmds = append(c.cmds, cmd)
67	c.mu.Unlock()
68	return nil
69}
70
71// Close closes the pipeline, releasing any open resources.
72func (c *Pipeline) Close() error {
73	c.mu.Lock()
74	_ = c.discard()
75	c.closed = true
76	c.mu.Unlock()
77	return nil
78}
79
80// Discard resets the pipeline and discards queued commands.
81func (c *Pipeline) Discard() error {
82	c.mu.Lock()
83	err := c.discard()
84	c.mu.Unlock()
85	return err
86}
87
88func (c *Pipeline) discard() error {
89	if c.closed {
90		return pool.ErrClosed
91	}
92	c.cmds = c.cmds[:0]
93	return nil
94}
95
96// Exec executes all previously queued commands using one
97// client-server roundtrip.
98//
99// Exec always returns list of commands and error of the first failed
100// command if any.
101func (c *Pipeline) Exec() ([]Cmder, error) {
102	return c.ExecContext(c.ctx)
103}
104
105func (c *Pipeline) ExecContext(ctx context.Context) ([]Cmder, error) {
106	c.mu.Lock()
107	defer c.mu.Unlock()
108
109	if c.closed {
110		return nil, pool.ErrClosed
111	}
112
113	if len(c.cmds) == 0 {
114		return nil, nil
115	}
116
117	cmds := c.cmds
118	c.cmds = nil
119
120	return cmds, c.exec(ctx, cmds)
121}
122
123func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
124	if err := fn(c); err != nil {
125		return nil, err
126	}
127	cmds, err := c.Exec()
128	_ = c.Close()
129	return cmds, err
130}
131
132func (c *Pipeline) Pipeline() Pipeliner {
133	return c
134}
135
136func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
137	return c.Pipelined(fn)
138}
139
140func (c *Pipeline) TxPipeline() Pipeliner {
141	return c
142}
143