1package api 2 3import ( 4 "bytes" 5 "fmt" 6 "io" 7 "net/http" 8) 9 10// Txn is used to manipulate the Txn API 11type Txn struct { 12 c *Client 13} 14 15// Txn is used to return a handle to the K/V apis 16func (c *Client) Txn() *Txn { 17 return &Txn{c} 18} 19 20// TxnOp is the internal format we send to Consul. Currently only K/V and 21// check operations are supported. 22type TxnOp struct { 23 KV *KVTxnOp 24 Node *NodeTxnOp 25 Service *ServiceTxnOp 26 Check *CheckTxnOp 27} 28 29// TxnOps is a list of transaction operations. 30type TxnOps []*TxnOp 31 32// TxnResult is the internal format we receive from Consul. 33type TxnResult struct { 34 KV *KVPair 35 Node *Node 36 Service *CatalogService 37 Check *HealthCheck 38} 39 40// TxnResults is a list of TxnResult objects. 41type TxnResults []*TxnResult 42 43// TxnError is used to return information about an operation in a transaction. 44type TxnError struct { 45 OpIndex int 46 What string 47} 48 49// TxnErrors is a list of TxnError objects. 50type TxnErrors []*TxnError 51 52// TxnResponse is the internal format we receive from Consul. 53type TxnResponse struct { 54 Results TxnResults 55 Errors TxnErrors 56} 57 58// KVOp constants give possible operations available in a transaction. 59type KVOp string 60 61const ( 62 KVSet KVOp = "set" 63 KVDelete KVOp = "delete" 64 KVDeleteCAS KVOp = "delete-cas" 65 KVDeleteTree KVOp = "delete-tree" 66 KVCAS KVOp = "cas" 67 KVLock KVOp = "lock" 68 KVUnlock KVOp = "unlock" 69 KVGet KVOp = "get" 70 KVGetTree KVOp = "get-tree" 71 KVCheckSession KVOp = "check-session" 72 KVCheckIndex KVOp = "check-index" 73 KVCheckNotExists KVOp = "check-not-exists" 74) 75 76// KVTxnOp defines a single operation inside a transaction. 77type KVTxnOp struct { 78 Verb KVOp 79 Key string 80 Value []byte 81 Flags uint64 82 Index uint64 83 Session string 84 Namespace string `json:",omitempty"` 85 Partition string `json:",omitempty"` 86} 87 88// KVTxnOps defines a set of operations to be performed inside a single 89// transaction. 90type KVTxnOps []*KVTxnOp 91 92// KVTxnResponse has the outcome of a transaction. 93type KVTxnResponse struct { 94 Results []*KVPair 95 Errors TxnErrors 96} 97 98// SessionOp constants give possible operations available in a transaction. 99type SessionOp string 100 101const ( 102 SessionDelete SessionOp = "delete" 103) 104 105// SessionTxnOp defines a single operation inside a transaction. 106type SessionTxnOp struct { 107 Verb SessionOp 108 Session Session 109} 110 111// NodeOp constants give possible operations available in a transaction. 112type NodeOp string 113 114const ( 115 NodeGet NodeOp = "get" 116 NodeSet NodeOp = "set" 117 NodeCAS NodeOp = "cas" 118 NodeDelete NodeOp = "delete" 119 NodeDeleteCAS NodeOp = "delete-cas" 120) 121 122// NodeTxnOp defines a single operation inside a transaction. 123type NodeTxnOp struct { 124 Verb NodeOp 125 Node Node 126} 127 128// ServiceOp constants give possible operations available in a transaction. 129type ServiceOp string 130 131const ( 132 ServiceGet ServiceOp = "get" 133 ServiceSet ServiceOp = "set" 134 ServiceCAS ServiceOp = "cas" 135 ServiceDelete ServiceOp = "delete" 136 ServiceDeleteCAS ServiceOp = "delete-cas" 137) 138 139// ServiceTxnOp defines a single operation inside a transaction. 140type ServiceTxnOp struct { 141 Verb ServiceOp 142 Node string 143 Service AgentService 144} 145 146// CheckOp constants give possible operations available in a transaction. 147type CheckOp string 148 149const ( 150 CheckGet CheckOp = "get" 151 CheckSet CheckOp = "set" 152 CheckCAS CheckOp = "cas" 153 CheckDelete CheckOp = "delete" 154 CheckDeleteCAS CheckOp = "delete-cas" 155) 156 157// CheckTxnOp defines a single operation inside a transaction. 158type CheckTxnOp struct { 159 Verb CheckOp 160 Check HealthCheck 161} 162 163// Txn is used to apply multiple Consul operations in a single, atomic transaction. 164// 165// Note that Go will perform the required base64 encoding on the values 166// automatically because the type is a byte slice. Transactions are defined as a 167// list of operations to perform, using the different fields in the TxnOp structure 168// to define operations. If any operation fails, none of the changes are applied 169// to the state store. 170// 171// Even though this is generally a write operation, we take a QueryOptions input 172// and return a QueryMeta output. If the transaction contains only read ops, then 173// Consul will fast-path it to a different endpoint internally which supports 174// consistency controls, but not blocking. If there are write operations then 175// the request will always be routed through raft and any consistency settings 176// will be ignored. 177// 178// Here's an example: 179// 180// ops := KVTxnOps{ 181// &KVTxnOp{ 182// Verb: KVLock, 183// Key: "test/lock", 184// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e", 185// Value: []byte("hello"), 186// }, 187// &KVTxnOp{ 188// Verb: KVGet, 189// Key: "another/key", 190// }, 191// &CheckTxnOp{ 192// Verb: CheckSet, 193// HealthCheck: HealthCheck{ 194// Node: "foo", 195// CheckID: "redis:a", 196// Name: "Redis Health Check", 197// Status: "passing", 198// }, 199// } 200// } 201// ok, response, _, err := kv.Txn(&ops, nil) 202// 203// If there is a problem making the transaction request then an error will be 204// returned. Otherwise, the ok value will be true if the transaction succeeded 205// or false if it was rolled back. The response is a structured return value which 206// will have the outcome of the transaction. Its Results member will have entries 207// for each operation. For KV operations, Deleted keys will have a nil entry in the 208// results, and to save space, the Value of each key in the Results will be nil 209// unless the operation is a KVGet. If the transaction was rolled back, the Errors 210// member will have entries referencing the index of the operation that failed 211// along with an error message. 212func (t *Txn) Txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) { 213 return t.c.txn(txn, q) 214} 215 216func (c *Client) txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) { 217 r := c.newRequest("PUT", "/v1/txn") 218 r.setQueryOptions(q) 219 220 r.obj = txn 221 rtt, resp, err := c.doRequest(r) 222 if err != nil { 223 return false, nil, nil, err 224 } 225 defer closeResponseBody(resp) 226 227 qm := &QueryMeta{} 228 parseQueryMeta(resp, qm) 229 qm.RequestTime = rtt 230 231 if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { 232 var txnResp TxnResponse 233 if err := decodeBody(resp, &txnResp); err != nil { 234 return false, nil, nil, err 235 } 236 237 return resp.StatusCode == http.StatusOK, &txnResp, qm, nil 238 } 239 240 var buf bytes.Buffer 241 if _, err := io.Copy(&buf, resp.Body); err != nil { 242 return false, nil, nil, fmt.Errorf("Failed to read response: %v", err) 243 } 244 return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String()) 245} 246