1package redis 2 3import ( 4 "context" 5 6 "github.com/go-redis/redis/v8/internal/pool" 7 "github.com/go-redis/redis/v8/internal/proto" 8) 9 10// TxFailedErr transaction redis failed. 11const TxFailedErr = proto.RedisError("redis: transaction failed") 12 13// Tx implements Redis transactions as described in 14// http://redis.io/topics/transactions. It's NOT safe for concurrent use 15// by multiple goroutines, because Exec resets list of watched keys. 16// If you don't need WATCH it is better to use Pipeline. 17type Tx struct { 18 baseClient 19 cmdable 20 statefulCmdable 21 hooks 22 ctx context.Context 23} 24 25func (c *Client) newTx(ctx context.Context) *Tx { 26 tx := Tx{ 27 baseClient: baseClient{ 28 opt: c.opt, 29 connPool: pool.NewStickyConnPool(c.connPool), 30 }, 31 hooks: c.hooks.clone(), 32 ctx: ctx, 33 } 34 tx.init() 35 return &tx 36} 37 38func (c *Tx) init() { 39 c.cmdable = c.Process 40 c.statefulCmdable = c.Process 41} 42 43func (c *Tx) Context() context.Context { 44 return c.ctx 45} 46 47func (c *Tx) WithContext(ctx context.Context) *Tx { 48 if ctx == nil { 49 panic("nil context") 50 } 51 clone := *c 52 clone.init() 53 clone.hooks.lock() 54 clone.ctx = ctx 55 return &clone 56} 57 58func (c *Tx) Process(ctx context.Context, cmd Cmder) error { 59 return c.hooks.process(ctx, cmd, c.baseClient.process) 60} 61 62// Watch prepares a transaction and marks the keys to be watched 63// for conditional execution if there are any keys. 64// 65// The transaction is automatically closed when fn exits. 66func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { 67 tx := c.newTx(ctx) 68 defer tx.Close(ctx) 69 if len(keys) > 0 { 70 if err := tx.Watch(ctx, keys...).Err(); err != nil { 71 return err 72 } 73 } 74 return fn(tx) 75} 76 77// Close closes the transaction, releasing any open resources. 78func (c *Tx) Close(ctx context.Context) error { 79 _ = c.Unwatch(ctx).Err() 80 return c.baseClient.Close() 81} 82 83// Watch marks the keys to be watched for conditional execution 84// of a transaction. 85func (c *Tx) Watch(ctx context.Context, keys ...string) *StatusCmd { 86 args := make([]interface{}, 1+len(keys)) 87 args[0] = "watch" 88 for i, key := range keys { 89 args[1+i] = key 90 } 91 cmd := NewStatusCmd(ctx, args...) 92 _ = c.Process(ctx, cmd) 93 return cmd 94} 95 96// Unwatch flushes all the previously watched keys for a transaction. 97func (c *Tx) Unwatch(ctx context.Context, keys ...string) *StatusCmd { 98 args := make([]interface{}, 1+len(keys)) 99 args[0] = "unwatch" 100 for i, key := range keys { 101 args[1+i] = key 102 } 103 cmd := NewStatusCmd(ctx, args...) 104 _ = c.Process(ctx, cmd) 105 return cmd 106} 107 108// Pipeline creates a pipeline. Usually it is more convenient to use Pipelined. 109func (c *Tx) Pipeline() Pipeliner { 110 pipe := Pipeline{ 111 ctx: c.ctx, 112 exec: func(ctx context.Context, cmds []Cmder) error { 113 return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline) 114 }, 115 } 116 pipe.init() 117 return &pipe 118} 119 120// Pipelined executes commands queued in the fn outside of the transaction. 121// Use TxPipelined if you need transactional behavior. 122func (c *Tx) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { 123 return c.Pipeline().Pipelined(ctx, fn) 124} 125 126// TxPipelined executes commands queued in the fn in the transaction. 127// 128// When using WATCH, EXEC will execute commands only if the watched keys 129// were not modified, allowing for a check-and-set mechanism. 130// 131// Exec always returns list of commands. If transaction fails 132// TxFailedErr is returned. Otherwise Exec returns an error of the first 133// failed command or nil. 134func (c *Tx) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { 135 return c.TxPipeline().Pipelined(ctx, fn) 136} 137 138// TxPipeline creates a pipeline. Usually it is more convenient to use TxPipelined. 139func (c *Tx) TxPipeline() Pipeliner { 140 pipe := Pipeline{ 141 ctx: c.ctx, 142 exec: func(ctx context.Context, cmds []Cmder) error { 143 return c.hooks.processTxPipeline(ctx, cmds, c.baseClient.processTxPipeline) 144 }, 145 } 146 pipe.init() 147 return &pipe 148} 149