1package redis 2 3import ( 4 "context" 5 6 "github.com/go-redis/redis/v8/internal/pool" 7 "github.com/go-redis/redis/v8/internal/proto" 8) 9 10// TxFailedErr transaction redis failed. 11const TxFailedErr = proto.RedisError("redis: transaction failed") 12 13// Tx implements Redis transactions as described in 14// http://redis.io/topics/transactions. It's NOT safe for concurrent use 15// by multiple goroutines, because Exec resets list of watched keys. 16// If you don't need WATCH it is better to use Pipeline. 17type Tx struct { 18 baseClient 19 cmdable 20 statefulCmdable 21 hooks 22 ctx context.Context 23} 24 25func (c *Client) newTx(ctx context.Context) *Tx { 26 tx := Tx{ 27 baseClient: baseClient{ 28 opt: c.opt, 29 connPool: pool.NewStickyConnPool(c.connPool), 30 }, 31 hooks: c.hooks.clone(), 32 ctx: ctx, 33 } 34 tx.init() 35 return &tx 36} 37 38func (c *Tx) init() { 39 c.cmdable = c.Process 40 c.statefulCmdable = c.Process 41} 42 43func (c *Tx) Context() context.Context { 44 return c.ctx 45} 46 47func (c *Tx) WithContext(ctx context.Context) *Tx { 48 if ctx == nil { 49 panic("nil context") 50 } 51 clone := *c 52 clone.init() 53 clone.hooks.lock() 54 clone.ctx = ctx 55 return &clone 56} 57 58func (c *Tx) Process(ctx context.Context, cmd Cmder) error { 59 return c.hooks.process(ctx, cmd, c.baseClient.process) 60} 61 62// Watch prepares a transaction and marks the keys to be watched 63// for conditional execution if there are any keys. 64// 65// The transaction is automatically closed when fn exits. 66func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { 67 tx := c.newTx(ctx) 68 if len(keys) > 0 { 69 if err := tx.Watch(ctx, keys...).Err(); err != nil { 70 _ = tx.Close(ctx) 71 return err 72 } 73 } 74 75 err := fn(tx) 76 _ = tx.Close(ctx) 77 return err 78} 79 80// Close closes the transaction, releasing any open resources. 81func (c *Tx) Close(ctx context.Context) error { 82 _ = c.Unwatch(ctx).Err() 83 return c.baseClient.Close() 84} 85 86// Watch marks the keys to be watched for conditional execution 87// of a transaction. 88func (c *Tx) Watch(ctx context.Context, keys ...string) *StatusCmd { 89 args := make([]interface{}, 1+len(keys)) 90 args[0] = "watch" 91 for i, key := range keys { 92 args[1+i] = key 93 } 94 cmd := NewStatusCmd(ctx, args...) 95 _ = c.Process(ctx, cmd) 96 return cmd 97} 98 99// Unwatch flushes all the previously watched keys for a transaction. 100func (c *Tx) Unwatch(ctx context.Context, keys ...string) *StatusCmd { 101 args := make([]interface{}, 1+len(keys)) 102 args[0] = "unwatch" 103 for i, key := range keys { 104 args[1+i] = key 105 } 106 cmd := NewStatusCmd(ctx, args...) 107 _ = c.Process(ctx, cmd) 108 return cmd 109} 110 111// Pipeline creates a pipeline. Usually it is more convenient to use Pipelined. 112func (c *Tx) Pipeline() Pipeliner { 113 pipe := Pipeline{ 114 ctx: c.ctx, 115 exec: func(ctx context.Context, cmds []Cmder) error { 116 return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline) 117 }, 118 } 119 pipe.init() 120 return &pipe 121} 122 123// Pipelined executes commands queued in the fn outside of the transaction. 124// Use TxPipelined if you need transactional behavior. 125func (c *Tx) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { 126 return c.Pipeline().Pipelined(ctx, fn) 127} 128 129// TxPipelined executes commands queued in the fn in the transaction. 130// 131// When using WATCH, EXEC will execute commands only if the watched keys 132// were not modified, allowing for a check-and-set mechanism. 133// 134// Exec always returns list of commands. If transaction fails 135// TxFailedErr is returned. Otherwise Exec returns an error of the first 136// failed command or nil. 137func (c *Tx) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { 138 return c.TxPipeline().Pipelined(ctx, fn) 139} 140 141// TxPipeline creates a pipeline. Usually it is more convenient to use TxPipelined. 142func (c *Tx) TxPipeline() Pipeliner { 143 pipe := Pipeline{ 144 ctx: c.ctx, 145 exec: func(ctx context.Context, cmds []Cmder) error { 146 return c.hooks.processTxPipeline(ctx, cmds, c.baseClient.processTxPipeline) 147 }, 148 } 149 pipe.init() 150 return &pipe 151} 152