1package redis 2 3import ( 4 "context" 5 "sync" 6 7 "github.com/go-redis/redis/v7/internal/pool" 8) 9 10type pipelineExecer func(context.Context, []Cmder) error 11 12// Pipeliner is an mechanism to realise Redis Pipeline technique. 13// 14// Pipelining is a technique to extremely speed up processing by packing 15// operations to batches, send them at once to Redis and read a replies in a 16// singe step. 17// See https://redis.io/topics/pipelining 18// 19// Pay attention, that Pipeline is not a transaction, so you can get unexpected 20// results in case of big pipelines and small read/write timeouts. 21// Redis client has retransmission logic in case of timeouts, pipeline 22// can be retransmitted and commands can be executed more then once. 23// To avoid this: it is good idea to use reasonable bigger read/write timeouts 24// depends of your batch size and/or use TxPipeline. 25type Pipeliner interface { 26 StatefulCmdable 27 Do(args ...interface{}) *Cmd 28 Process(cmd Cmder) error 29 Close() error 30 Discard() error 31 Exec() ([]Cmder, error) 32 ExecContext(ctx context.Context) ([]Cmder, error) 33} 34 35var _ Pipeliner = (*Pipeline)(nil) 36 37// Pipeline implements pipelining as described in 38// http://redis.io/topics/pipelining. It's safe for concurrent use 39// by multiple goroutines. 40type Pipeline struct { 41 cmdable 42 statefulCmdable 43 44 ctx context.Context 45 exec pipelineExecer 46 47 mu sync.Mutex 48 cmds []Cmder 49 closed bool 50} 51 52func (c *Pipeline) init() { 53 c.cmdable = c.Process 54 c.statefulCmdable = c.Process 55} 56 57func (c *Pipeline) Do(args ...interface{}) *Cmd { 58 cmd := NewCmd(args...) 59 _ = c.Process(cmd) 60 return cmd 61} 62 63// Process queues the cmd for later execution. 64func (c *Pipeline) Process(cmd Cmder) error { 65 c.mu.Lock() 66 c.cmds = append(c.cmds, cmd) 67 c.mu.Unlock() 68 return nil 69} 70 71// Close closes the pipeline, releasing any open resources. 72func (c *Pipeline) Close() error { 73 c.mu.Lock() 74 _ = c.discard() 75 c.closed = true 76 c.mu.Unlock() 77 return nil 78} 79 80// Discard resets the pipeline and discards queued commands. 81func (c *Pipeline) Discard() error { 82 c.mu.Lock() 83 err := c.discard() 84 c.mu.Unlock() 85 return err 86} 87 88func (c *Pipeline) discard() error { 89 if c.closed { 90 return pool.ErrClosed 91 } 92 c.cmds = c.cmds[:0] 93 return nil 94} 95 96// Exec executes all previously queued commands using one 97// client-server roundtrip. 98// 99// Exec always returns list of commands and error of the first failed 100// command if any. 101func (c *Pipeline) Exec() ([]Cmder, error) { 102 return c.ExecContext(c.ctx) 103} 104 105func (c *Pipeline) ExecContext(ctx context.Context) ([]Cmder, error) { 106 c.mu.Lock() 107 defer c.mu.Unlock() 108 109 if c.closed { 110 return nil, pool.ErrClosed 111 } 112 113 if len(c.cmds) == 0 { 114 return nil, nil 115 } 116 117 cmds := c.cmds 118 c.cmds = nil 119 120 return cmds, c.exec(ctx, cmds) 121} 122 123func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { 124 if err := fn(c); err != nil { 125 return nil, err 126 } 127 cmds, err := c.Exec() 128 _ = c.Close() 129 return cmds, err 130} 131 132func (c *Pipeline) Pipeline() Pipeliner { 133 return c 134} 135 136func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { 137 return c.Pipelined(fn) 138} 139 140func (c *Pipeline) TxPipeline() Pipeliner { 141 return c 142} 143