1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18	"context"
19	"sync"
20
21	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
22
23	"google.golang.org/grpc"
24)
25
26// Txn is the interface that wraps mini-transactions.
27//
28//	 Txn(context.TODO()).If(
29//	  Compare(Value(k1), ">", v1),
30//	  Compare(Version(k1), "=", 2)
31//	 ).Then(
32//	  OpPut(k2,v2), OpPut(k3,v3)
33//	 ).Else(
34//	  OpPut(k4,v4), OpPut(k5,v5)
35//	 ).Commit()
36//
37type Txn interface {
38	// If takes a list of comparison. If all comparisons passed in succeed,
39	// the operations passed into Then() will be executed. Or the operations
40	// passed into Else() will be executed.
41	If(cs ...Cmp) Txn
42
43	// Then takes a list of operations. The Ops list will be executed, if the
44	// comparisons passed in If() succeed.
45	Then(ops ...Op) Txn
46
47	// Else takes a list of operations. The Ops list will be executed, if the
48	// comparisons passed in If() fail.
49	Else(ops ...Op) Txn
50
51	// Commit tries to commit the transaction.
52	Commit() (*TxnResponse, error)
53}
54
55type txn struct {
56	kv  *kv
57	ctx context.Context
58
59	mu    sync.Mutex
60	cif   bool
61	cthen bool
62	celse bool
63
64	isWrite bool
65
66	cmps []*pb.Compare
67
68	sus []*pb.RequestOp
69	fas []*pb.RequestOp
70
71	callOpts []grpc.CallOption
72}
73
74func (txn *txn) If(cs ...Cmp) Txn {
75	txn.mu.Lock()
76	defer txn.mu.Unlock()
77
78	if txn.cif {
79		panic("cannot call If twice!")
80	}
81
82	if txn.cthen {
83		panic("cannot call If after Then!")
84	}
85
86	if txn.celse {
87		panic("cannot call If after Else!")
88	}
89
90	txn.cif = true
91
92	for i := range cs {
93		txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
94	}
95
96	return txn
97}
98
99func (txn *txn) Then(ops ...Op) Txn {
100	txn.mu.Lock()
101	defer txn.mu.Unlock()
102
103	if txn.cthen {
104		panic("cannot call Then twice!")
105	}
106	if txn.celse {
107		panic("cannot call Then after Else!")
108	}
109
110	txn.cthen = true
111
112	for _, op := range ops {
113		txn.isWrite = txn.isWrite || op.isWrite()
114		txn.sus = append(txn.sus, op.toRequestOp())
115	}
116
117	return txn
118}
119
120func (txn *txn) Else(ops ...Op) Txn {
121	txn.mu.Lock()
122	defer txn.mu.Unlock()
123
124	if txn.celse {
125		panic("cannot call Else twice!")
126	}
127
128	txn.celse = true
129
130	for _, op := range ops {
131		txn.isWrite = txn.isWrite || op.isWrite()
132		txn.fas = append(txn.fas, op.toRequestOp())
133	}
134
135	return txn
136}
137
138func (txn *txn) Commit() (*TxnResponse, error) {
139	txn.mu.Lock()
140	defer txn.mu.Unlock()
141
142	r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
143
144	var resp *pb.TxnResponse
145	var err error
146	resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
147	if err != nil {
148		return nil, toErr(txn.ctx, err)
149	}
150	return (*TxnResponse)(resp), nil
151}
152