1package api 2 3import ( 4 "strings" 5 "testing" 6 "time" 7 8 "github.com/hashicorp/consul/sdk/testutil/retry" 9 10 "github.com/hashicorp/go-uuid" 11 12 "github.com/stretchr/testify/require" 13) 14 15func TestAPI_ClientTxn(t *testing.T) { 16 t.Parallel() 17 c, s := makeClient(t) 18 defer s.Stop() 19 20 s.WaitForSerfCheck(t) 21 22 session := c.Session() 23 txn := c.Txn() 24 25 // Set up a test service and health check. 26 nodeID, err := uuid.GenerateUUID() 27 require.NoError(t, err) 28 29 catalog := c.Catalog() 30 reg := &CatalogRegistration{ 31 ID: nodeID, 32 Node: "foo", 33 Address: "2.2.2.2", 34 Service: &AgentService{ 35 ID: "foo1", 36 Service: "foo", 37 }, 38 Checks: HealthChecks{ 39 { 40 CheckID: "bar", 41 Status: "critical", 42 Definition: HealthCheckDefinition{ 43 TCP: "1.1.1.1", 44 IntervalDuration: 5 * time.Second, 45 TimeoutDuration: 10 * time.Second, 46 DeregisterCriticalServiceAfterDuration: 20 * time.Second, 47 }, 48 }, 49 { 50 CheckID: "baz", 51 Status: "passing", 52 Definition: HealthCheckDefinition{ 53 TCP: "2.2.2.2", 54 Interval: ReadableDuration(40 * time.Second), 55 Timeout: ReadableDuration(80 * time.Second), 56 DeregisterCriticalServiceAfter: ReadableDuration(160 * time.Second), 57 }, 58 }, 59 }, 60 } 61 _, err = catalog.Register(reg, nil) 62 require.NoError(t, err) 63 64 node, _, err := catalog.Node("foo", nil) 65 require.NoError(t, err) 66 require.Equal(t, nodeID, node.Node.ID) 67 68 // Make a session. 69 id, _, err := session.CreateNoChecks(nil, nil) 70 if err != nil { 71 t.Fatalf("err: %v", err) 72 } 73 defer session.Destroy(id, nil) 74 75 // Acquire and get the key via a transaction, but don't supply a valid 76 // session. 77 key := testKey() 78 value := []byte("test") 79 ops := TxnOps{ 80 &TxnOp{ 81 KV: &KVTxnOp{ 82 Verb: KVLock, 83 Key: key, 84 Value: value, 85 }, 86 }, 87 &TxnOp{ 88 KV: &KVTxnOp{ 89 Verb: KVGet, 90 Key: key, 91 }, 92 }, 93 &TxnOp{ 94 Node: &NodeTxnOp{ 95 Verb: NodeGet, 96 Node: Node{Node: "foo"}, 97 }, 98 }, 99 &TxnOp{ 100 Service: &ServiceTxnOp{ 101 Verb: ServiceGet, 102 Node: "foo", 103 Service: AgentService{ID: "foo1"}, 104 }, 105 }, 106 &TxnOp{ 107 Check: &CheckTxnOp{ 108 Verb: CheckGet, 109 Check: HealthCheck{Node: "foo", CheckID: "bar"}, 110 }, 111 }, 112 &TxnOp{ 113 Check: &CheckTxnOp{ 114 Verb: CheckGet, 115 Check: HealthCheck{Node: "foo", CheckID: "baz"}, 116 }, 117 }, 118 } 119 ok, ret, _, err := txn.Txn(ops, nil) 120 if err != nil { 121 t.Fatalf("err: %v", err) 122 } else if ok { 123 t.Fatalf("transaction should have failed") 124 } 125 126 if ret == nil || len(ret.Errors) != 2 || len(ret.Results) != 0 { 127 t.Fatalf("bad: %v", ret.Errors[2]) 128 } 129 if ret.Errors[0].OpIndex != 0 || 130 !strings.Contains(ret.Errors[0].What, "missing session") || 131 !strings.Contains(ret.Errors[1].What, "doesn't exist") { 132 t.Fatalf("bad: %v", ret.Errors[0]) 133 } 134 135 // Now poke in a real session and try again. 136 ops[0].KV.Session = id 137 ok, ret, _, err = txn.Txn(ops, nil) 138 if err != nil { 139 t.Fatalf("err: %v", err) 140 } else if !ok { 141 t.Fatalf("transaction failure") 142 } 143 144 if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 6 { 145 t.Fatalf("bad: %v", ret) 146 } 147 expected := TxnResults{ 148 &TxnResult{ 149 KV: &KVPair{ 150 Key: key, 151 Session: id, 152 LockIndex: 1, 153 CreateIndex: ret.Results[0].KV.CreateIndex, 154 ModifyIndex: ret.Results[0].KV.ModifyIndex, 155 Namespace: ret.Results[0].KV.Namespace, 156 }, 157 }, 158 &TxnResult{ 159 KV: &KVPair{ 160 Key: key, 161 Session: id, 162 Value: []byte("test"), 163 LockIndex: 1, 164 CreateIndex: ret.Results[1].KV.CreateIndex, 165 ModifyIndex: ret.Results[1].KV.ModifyIndex, 166 Namespace: ret.Results[0].KV.Namespace, 167 }, 168 }, 169 &TxnResult{ 170 Node: &Node{ 171 ID: nodeID, 172 Node: "foo", 173 Address: "2.2.2.2", 174 Datacenter: "dc1", 175 CreateIndex: ret.Results[2].Node.CreateIndex, 176 ModifyIndex: ret.Results[2].Node.CreateIndex, 177 }, 178 }, 179 &TxnResult{ 180 Service: &CatalogService{ 181 ID: "foo1", 182 CreateIndex: ret.Results[3].Service.CreateIndex, 183 ModifyIndex: ret.Results[3].Service.CreateIndex, 184 Namespace: defaultNamespace, 185 }, 186 }, 187 &TxnResult{ 188 Check: &HealthCheck{ 189 Node: "foo", 190 CheckID: "bar", 191 Status: "critical", 192 Definition: HealthCheckDefinition{ 193 TCP: "1.1.1.1", 194 Interval: ReadableDuration(5 * time.Second), 195 IntervalDuration: 5 * time.Second, 196 Timeout: ReadableDuration(10 * time.Second), 197 TimeoutDuration: 10 * time.Second, 198 DeregisterCriticalServiceAfter: ReadableDuration(20 * time.Second), 199 DeregisterCriticalServiceAfterDuration: 20 * time.Second, 200 }, 201 Type: "tcp", 202 Namespace: defaultNamespace, 203 CreateIndex: ret.Results[4].Check.CreateIndex, 204 ModifyIndex: ret.Results[4].Check.CreateIndex, 205 }, 206 }, 207 &TxnResult{ 208 Check: &HealthCheck{ 209 Node: "foo", 210 CheckID: "baz", 211 Status: "passing", 212 Definition: HealthCheckDefinition{ 213 TCP: "2.2.2.2", 214 Interval: ReadableDuration(40 * time.Second), 215 IntervalDuration: 40 * time.Second, 216 Timeout: ReadableDuration(80 * time.Second), 217 TimeoutDuration: 80 * time.Second, 218 DeregisterCriticalServiceAfter: ReadableDuration(160 * time.Second), 219 DeregisterCriticalServiceAfterDuration: 160 * time.Second, 220 }, 221 Type: "tcp", 222 Namespace: defaultNamespace, 223 CreateIndex: ret.Results[4].Check.CreateIndex, 224 ModifyIndex: ret.Results[4].Check.CreateIndex, 225 }, 226 }, 227 } 228 require.Equal(t, ret.Results, expected) 229 230 retry.Run(t, func(r *retry.R) { 231 // Run a read-only transaction. 232 ops = TxnOps{ 233 &TxnOp{ 234 KV: &KVTxnOp{ 235 Verb: KVGet, 236 Key: key, 237 }, 238 }, 239 &TxnOp{ 240 Node: &NodeTxnOp{ 241 Verb: NodeGet, 242 Node: Node{ID: s.Config.NodeID, Node: s.Config.NodeName}, 243 }, 244 }, 245 } 246 ok, ret, _, err = txn.Txn(ops, nil) 247 if err != nil { 248 r.Fatalf("err: %v", err) 249 } else if !ok { 250 r.Fatalf("transaction failure") 251 } 252 253 expected = TxnResults{ 254 &TxnResult{ 255 KV: &KVPair{ 256 Key: key, 257 Session: id, 258 Value: []byte("test"), 259 LockIndex: 1, 260 CreateIndex: ret.Results[0].KV.CreateIndex, 261 ModifyIndex: ret.Results[0].KV.ModifyIndex, 262 Namespace: ret.Results[0].KV.Namespace, 263 }, 264 }, 265 &TxnResult{ 266 Node: &Node{ 267 ID: s.Config.NodeID, 268 Node: s.Config.NodeName, 269 Address: "127.0.0.1", 270 Datacenter: "dc1", 271 TaggedAddresses: map[string]string{ 272 "lan": s.Config.Bind, 273 "lan_ipv4": s.Config.Bind, 274 "wan": s.Config.Bind, 275 "wan_ipv4": s.Config.Bind, 276 }, 277 Meta: map[string]string{"consul-network-segment": ""}, 278 CreateIndex: ret.Results[1].Node.CreateIndex, 279 ModifyIndex: ret.Results[1].Node.ModifyIndex, 280 }, 281 }, 282 } 283 require.Equal(r, ret.Results, expected) 284 }) 285 286 // Sanity check using the regular GET API. 287 kv := c.KV() 288 pair, meta, err := kv.Get(key, nil) 289 if err != nil { 290 t.Fatalf("err: %v", err) 291 } 292 if pair == nil { 293 t.Fatalf("expected value: %#v", pair) 294 } 295 if pair.LockIndex != 1 { 296 t.Fatalf("Expected lock: %v", pair) 297 } 298 if pair.Session != id { 299 t.Fatalf("Expected lock: %v", pair) 300 } 301 if meta.LastIndex == 0 { 302 t.Fatalf("unexpected value: %#v", meta) 303 } 304} 305