1package state 2 3import ( 4 "fmt" 5 "strings" 6 "testing" 7 8 "github.com/hashicorp/consul/agent/structs" 9 "github.com/hashicorp/consul/api" 10 "github.com/hashicorp/consul/types" 11 "github.com/pascaldekloe/goe/verify" 12 "github.com/stretchr/testify/require" 13) 14 15func TestStateStore_Txn_Intention(t *testing.T) { 16 require := require.New(t) 17 s := testStateStore(t) 18 19 // Create some intentions. 20 ixn1 := &structs.Intention{ 21 ID: testUUID(), 22 SourceNS: "default", 23 SourceName: "web", 24 DestinationNS: "default", 25 DestinationName: "db", 26 Meta: map[string]string{}, 27 } 28 ixn2 := &structs.Intention{ 29 ID: testUUID(), 30 SourceNS: "default", 31 SourceName: "db", 32 DestinationNS: "default", 33 DestinationName: "*", 34 Action: structs.IntentionActionDeny, 35 Meta: map[string]string{}, 36 } 37 ixn3 := &structs.Intention{ 38 ID: testUUID(), 39 SourceNS: "default", 40 SourceName: "foo", 41 DestinationNS: "default", 42 DestinationName: "*", 43 Meta: map[string]string{}, 44 } 45 46 // Write the first two to the state store, leave the third 47 // to be created by the transaction operation. 48 require.NoError(s.IntentionSet(1, ixn1)) 49 require.NoError(s.IntentionSet(2, ixn2)) 50 51 // Set up a transaction that hits every operation. 52 ops := structs.TxnOps{ 53 &structs.TxnOp{ 54 Intention: &structs.TxnIntentionOp{ 55 Op: structs.IntentionOpUpdate, 56 Intention: ixn1, 57 }, 58 }, 59 &structs.TxnOp{ 60 Intention: &structs.TxnIntentionOp{ 61 Op: structs.IntentionOpDelete, 62 Intention: ixn2, 63 }, 64 }, 65 &structs.TxnOp{ 66 Intention: &structs.TxnIntentionOp{ 67 Op: structs.IntentionOpCreate, 68 Intention: ixn3, 69 }, 70 }, 71 } 72 results, errors := s.TxnRW(3, ops) 73 if len(errors) > 0 { 74 t.Fatalf("err: %v", errors) 75 } 76 77 // Make sure the response looks as expected. 78 expected := structs.TxnResults{} 79 verify.Values(t, "", results, expected) 80 81 // Pull the resulting state store contents. 82 idx, actual, err := s.Intentions(nil) 83 require.NoError(err) 84 if idx != 3 { 85 t.Fatalf("bad index: %d", idx) 86 } 87 88 // Make sure it looks as expected. 89 intentions := structs.Intentions{ 90 &structs.Intention{ 91 ID: ixn1.ID, 92 SourceNS: "default", 93 SourceName: "web", 94 DestinationNS: "default", 95 DestinationName: "db", 96 Meta: map[string]string{}, 97 Precedence: 9, 98 RaftIndex: structs.RaftIndex{ 99 CreateIndex: 1, 100 ModifyIndex: 3, 101 }, 102 }, 103 &structs.Intention{ 104 ID: ixn3.ID, 105 SourceNS: "default", 106 SourceName: "foo", 107 DestinationNS: "default", 108 DestinationName: "*", 109 Meta: map[string]string{}, 110 Precedence: 6, 111 RaftIndex: structs.RaftIndex{ 112 CreateIndex: 3, 113 ModifyIndex: 3, 114 }, 115 }, 116 } 117 verify.Values(t, "", actual, intentions) 118} 119 120func TestStateStore_Txn_Node(t *testing.T) { 121 require := require.New(t) 122 s := testStateStore(t) 123 124 // Create some nodes. 125 var nodes [5]structs.Node 126 for i := 0; i < len(nodes); i++ { 127 nodes[i] = structs.Node{ 128 Node: fmt.Sprintf("node%d", i+1), 129 ID: types.NodeID(testUUID()), 130 } 131 132 // Leave node5 to be created by an operation. 133 if i < 5 { 134 s.EnsureNode(uint64(i+1), &nodes[i]) 135 } 136 } 137 138 // Set up a transaction that hits every operation. 139 ops := structs.TxnOps{ 140 &structs.TxnOp{ 141 Node: &structs.TxnNodeOp{ 142 Verb: api.NodeGet, 143 Node: nodes[0], 144 }, 145 }, 146 &structs.TxnOp{ 147 Node: &structs.TxnNodeOp{ 148 Verb: api.NodeSet, 149 Node: nodes[4], 150 }, 151 }, 152 &structs.TxnOp{ 153 Node: &structs.TxnNodeOp{ 154 Verb: api.NodeCAS, 155 Node: structs.Node{ 156 Node: "node2", 157 ID: nodes[1].ID, 158 Datacenter: "dc2", 159 RaftIndex: structs.RaftIndex{ModifyIndex: 2}, 160 }, 161 }, 162 }, 163 &structs.TxnOp{ 164 Node: &structs.TxnNodeOp{ 165 Verb: api.NodeDelete, 166 Node: structs.Node{Node: "node3"}, 167 }, 168 }, 169 &structs.TxnOp{ 170 Node: &structs.TxnNodeOp{ 171 Verb: api.NodeDeleteCAS, 172 Node: structs.Node{ 173 Node: "node4", 174 RaftIndex: structs.RaftIndex{ModifyIndex: 4}, 175 }, 176 }, 177 }, 178 } 179 results, errors := s.TxnRW(8, ops) 180 if len(errors) > 0 { 181 t.Fatalf("err: %v", errors) 182 } 183 184 // Make sure the response looks as expected. 185 nodes[1].Datacenter = "dc2" 186 nodes[1].ModifyIndex = 8 187 expected := structs.TxnResults{ 188 &structs.TxnResult{ 189 Node: &nodes[0], 190 }, 191 &structs.TxnResult{ 192 Node: &nodes[4], 193 }, 194 &structs.TxnResult{ 195 Node: &nodes[1], 196 }, 197 } 198 verify.Values(t, "", results, expected) 199 200 // Pull the resulting state store contents. 201 idx, actual, err := s.Nodes(nil) 202 require.NoError(err) 203 if idx != 8 { 204 t.Fatalf("bad index: %d", idx) 205 } 206 207 // Make sure it looks as expected. 208 expectedNodes := structs.Nodes{&nodes[0], &nodes[1], &nodes[4]} 209 verify.Values(t, "", actual, expectedNodes) 210} 211 212func TestStateStore_Txn_Service(t *testing.T) { 213 require := require.New(t) 214 s := testStateStore(t) 215 216 testRegisterNode(t, s, 1, "node1") 217 218 // Create some services. 219 for i := 1; i <= 4; i++ { 220 testRegisterService(t, s, uint64(i+1), "node1", fmt.Sprintf("svc%d", i)) 221 } 222 223 // Set up a transaction that hits every operation. 224 ops := structs.TxnOps{ 225 &structs.TxnOp{ 226 Service: &structs.TxnServiceOp{ 227 Verb: api.ServiceGet, 228 Node: "node1", 229 Service: structs.NodeService{ID: "svc1"}, 230 }, 231 }, 232 &structs.TxnOp{ 233 Service: &structs.TxnServiceOp{ 234 Verb: api.ServiceSet, 235 Node: "node1", 236 Service: structs.NodeService{ID: "svc5"}, 237 }, 238 }, 239 &structs.TxnOp{ 240 Service: &structs.TxnServiceOp{ 241 Verb: api.ServiceCAS, 242 Node: "node1", 243 Service: structs.NodeService{ 244 ID: "svc2", 245 Tags: []string{"modified"}, 246 RaftIndex: structs.RaftIndex{ModifyIndex: 3}, 247 }, 248 }, 249 }, 250 &structs.TxnOp{ 251 Service: &structs.TxnServiceOp{ 252 Verb: api.ServiceDelete, 253 Node: "node1", 254 Service: structs.NodeService{ID: "svc3"}, 255 }, 256 }, 257 &structs.TxnOp{ 258 Service: &structs.TxnServiceOp{ 259 Verb: api.ServiceDeleteCAS, 260 Node: "node1", 261 Service: structs.NodeService{ 262 ID: "svc4", 263 RaftIndex: structs.RaftIndex{ModifyIndex: 5}, 264 }, 265 }, 266 }, 267 } 268 results, errors := s.TxnRW(6, ops) 269 if len(errors) > 0 { 270 t.Fatalf("err: %v", errors) 271 } 272 273 // Make sure the response looks as expected. 274 expected := structs.TxnResults{ 275 &structs.TxnResult{ 276 Service: &structs.NodeService{ 277 ID: "svc1", 278 Service: "svc1", 279 Address: "1.1.1.1", 280 Port: 1111, 281 Weights: &structs.Weights{Passing: 1, Warning: 1}, 282 RaftIndex: structs.RaftIndex{ 283 CreateIndex: 2, 284 ModifyIndex: 2, 285 }, 286 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 287 }, 288 }, 289 &structs.TxnResult{ 290 Service: &structs.NodeService{ 291 ID: "svc5", 292 Weights: &structs.Weights{Passing: 1, Warning: 1}, 293 RaftIndex: structs.RaftIndex{ 294 CreateIndex: 6, 295 ModifyIndex: 6, 296 }, 297 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 298 }, 299 }, 300 &structs.TxnResult{ 301 Service: &structs.NodeService{ 302 ID: "svc2", 303 Tags: []string{"modified"}, 304 Weights: &structs.Weights{Passing: 1, Warning: 1}, 305 RaftIndex: structs.RaftIndex{ 306 CreateIndex: 3, 307 ModifyIndex: 6, 308 }, 309 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 310 }, 311 }, 312 } 313 verify.Values(t, "", results, expected) 314 315 // Pull the resulting state store contents. 316 idx, actual, err := s.NodeServices(nil, "node1", nil) 317 require.NoError(err) 318 if idx != 6 { 319 t.Fatalf("bad index: %d", idx) 320 } 321 322 // Make sure it looks as expected. 323 expectedServices := &structs.NodeServices{ 324 Node: &structs.Node{ 325 Node: "node1", 326 RaftIndex: structs.RaftIndex{ 327 CreateIndex: 1, 328 ModifyIndex: 1, 329 }, 330 }, 331 Services: map[string]*structs.NodeService{ 332 "svc1": &structs.NodeService{ 333 ID: "svc1", 334 Service: "svc1", 335 Address: "1.1.1.1", 336 Port: 1111, 337 RaftIndex: structs.RaftIndex{ 338 CreateIndex: 2, 339 ModifyIndex: 2, 340 }, 341 Weights: &structs.Weights{Passing: 1, Warning: 1}, 342 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 343 }, 344 "svc5": &structs.NodeService{ 345 ID: "svc5", 346 RaftIndex: structs.RaftIndex{ 347 CreateIndex: 6, 348 ModifyIndex: 6, 349 }, 350 Weights: &structs.Weights{Passing: 1, Warning: 1}, 351 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 352 }, 353 "svc2": &structs.NodeService{ 354 ID: "svc2", 355 Tags: []string{"modified"}, 356 RaftIndex: structs.RaftIndex{ 357 CreateIndex: 3, 358 ModifyIndex: 6, 359 }, 360 Weights: &structs.Weights{Passing: 1, Warning: 1}, 361 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 362 }, 363 }, 364 } 365 verify.Values(t, "", actual, expectedServices) 366} 367 368func TestStateStore_Txn_Checks(t *testing.T) { 369 require := require.New(t) 370 s := testStateStore(t) 371 372 testRegisterNode(t, s, 1, "node1") 373 374 // Create some checks. 375 for i := 1; i <= 4; i++ { 376 testRegisterCheck(t, s, uint64(i+1), "node1", "", types.CheckID(fmt.Sprintf("check%d", i)), "failing") 377 } 378 379 // Set up a transaction that hits every operation. 380 ops := structs.TxnOps{ 381 &structs.TxnOp{ 382 Check: &structs.TxnCheckOp{ 383 Verb: api.CheckGet, 384 Check: structs.HealthCheck{Node: "node1", CheckID: "check1"}, 385 }, 386 }, 387 &structs.TxnOp{ 388 Check: &structs.TxnCheckOp{ 389 Verb: api.CheckSet, 390 Check: structs.HealthCheck{Node: "node1", CheckID: "check5", Status: "passing"}, 391 }, 392 }, 393 &structs.TxnOp{ 394 Check: &structs.TxnCheckOp{ 395 Verb: api.CheckCAS, 396 Check: structs.HealthCheck{ 397 Node: "node1", 398 CheckID: "check2", 399 Status: "warning", 400 RaftIndex: structs.RaftIndex{ModifyIndex: 3}, 401 }, 402 }, 403 }, 404 &structs.TxnOp{ 405 Check: &structs.TxnCheckOp{ 406 Verb: api.CheckDelete, 407 Check: structs.HealthCheck{Node: "node1", CheckID: "check3"}, 408 }, 409 }, 410 &structs.TxnOp{ 411 Check: &structs.TxnCheckOp{ 412 Verb: api.CheckDeleteCAS, 413 Check: structs.HealthCheck{ 414 Node: "node1", 415 CheckID: "check4", 416 RaftIndex: structs.RaftIndex{ModifyIndex: 5}, 417 }, 418 }, 419 }, 420 } 421 results, errors := s.TxnRW(6, ops) 422 if len(errors) > 0 { 423 t.Fatalf("err: %v", errors) 424 } 425 426 // Make sure the response looks as expected. 427 expected := structs.TxnResults{ 428 &structs.TxnResult{ 429 Check: &structs.HealthCheck{ 430 Node: "node1", 431 CheckID: "check1", 432 Status: "failing", 433 RaftIndex: structs.RaftIndex{ 434 CreateIndex: 2, 435 ModifyIndex: 2, 436 }, 437 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 438 }, 439 }, 440 &structs.TxnResult{ 441 Check: &structs.HealthCheck{ 442 Node: "node1", 443 CheckID: "check5", 444 Status: "passing", 445 RaftIndex: structs.RaftIndex{ 446 CreateIndex: 6, 447 ModifyIndex: 6, 448 }, 449 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 450 }, 451 }, 452 &structs.TxnResult{ 453 Check: &structs.HealthCheck{ 454 Node: "node1", 455 CheckID: "check2", 456 Status: "warning", 457 RaftIndex: structs.RaftIndex{ 458 CreateIndex: 3, 459 ModifyIndex: 6, 460 }, 461 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 462 }, 463 }, 464 } 465 verify.Values(t, "", results, expected) 466 467 // Pull the resulting state store contents. 468 idx, actual, err := s.NodeChecks(nil, "node1", nil) 469 require.NoError(err) 470 if idx != 6 { 471 t.Fatalf("bad index: %d", idx) 472 } 473 474 // Make sure it looks as expected. 475 expectedChecks := structs.HealthChecks{ 476 &structs.HealthCheck{ 477 Node: "node1", 478 CheckID: "check1", 479 Status: "failing", 480 RaftIndex: structs.RaftIndex{ 481 CreateIndex: 2, 482 ModifyIndex: 2, 483 }, 484 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 485 }, 486 &structs.HealthCheck{ 487 Node: "node1", 488 CheckID: "check2", 489 Status: "warning", 490 RaftIndex: structs.RaftIndex{ 491 CreateIndex: 3, 492 ModifyIndex: 6, 493 }, 494 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 495 }, 496 &structs.HealthCheck{ 497 Node: "node1", 498 CheckID: "check5", 499 Status: "passing", 500 RaftIndex: structs.RaftIndex{ 501 CreateIndex: 6, 502 ModifyIndex: 6, 503 }, 504 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 505 }, 506 } 507 verify.Values(t, "", actual, expectedChecks) 508} 509 510func TestStateStore_Txn_KVS(t *testing.T) { 511 s := testStateStore(t) 512 513 // Create KV entries in the state store. 514 testSetKey(t, s, 1, "foo/delete", "bar", nil) 515 testSetKey(t, s, 2, "foo/bar/baz", "baz", nil) 516 testSetKey(t, s, 3, "foo/bar/zip", "zip", nil) 517 testSetKey(t, s, 4, "foo/zorp", "zorp", nil) 518 testSetKey(t, s, 5, "foo/update", "stale", nil) 519 520 // Make a real session. 521 testRegisterNode(t, s, 6, "node1") 522 session := testUUID() 523 if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil { 524 t.Fatalf("err: %s", err) 525 } 526 527 // Set up a transaction that hits every operation. 528 ops := structs.TxnOps{ 529 &structs.TxnOp{ 530 KV: &structs.TxnKVOp{ 531 Verb: api.KVGetTree, 532 DirEnt: structs.DirEntry{ 533 Key: "foo/bar", 534 }, 535 }, 536 }, 537 &structs.TxnOp{ 538 KV: &structs.TxnKVOp{ 539 Verb: api.KVSet, 540 DirEnt: structs.DirEntry{ 541 Key: "foo/new", 542 Value: []byte("one"), 543 }, 544 }, 545 }, 546 &structs.TxnOp{ 547 KV: &structs.TxnKVOp{ 548 Verb: api.KVDelete, 549 DirEnt: structs.DirEntry{ 550 Key: "foo/zorp", 551 }, 552 }, 553 }, 554 &structs.TxnOp{ 555 KV: &structs.TxnKVOp{ 556 Verb: api.KVDeleteCAS, 557 DirEnt: structs.DirEntry{ 558 Key: "foo/delete", 559 RaftIndex: structs.RaftIndex{ 560 ModifyIndex: 1, 561 }, 562 }, 563 }, 564 }, 565 &structs.TxnOp{ 566 KV: &structs.TxnKVOp{ 567 Verb: api.KVDeleteTree, 568 DirEnt: structs.DirEntry{ 569 Key: "foo/bar", 570 }, 571 }, 572 }, 573 &structs.TxnOp{ 574 KV: &structs.TxnKVOp{ 575 Verb: api.KVGet, 576 DirEnt: structs.DirEntry{ 577 Key: "foo/update", 578 }, 579 }, 580 }, 581 &structs.TxnOp{ 582 KV: &structs.TxnKVOp{ 583 Verb: api.KVCheckIndex, 584 DirEnt: structs.DirEntry{ 585 Key: "foo/update", 586 RaftIndex: structs.RaftIndex{ 587 ModifyIndex: 5, 588 }, 589 }, 590 }, 591 }, 592 &structs.TxnOp{ 593 KV: &structs.TxnKVOp{ 594 Verb: api.KVCAS, 595 DirEnt: structs.DirEntry{ 596 Key: "foo/update", 597 Value: []byte("new"), 598 RaftIndex: structs.RaftIndex{ 599 ModifyIndex: 5, 600 }, 601 }, 602 }, 603 }, 604 &structs.TxnOp{ 605 KV: &structs.TxnKVOp{ 606 Verb: api.KVGet, 607 DirEnt: structs.DirEntry{ 608 Key: "foo/update", 609 }, 610 }, 611 }, 612 &structs.TxnOp{ 613 KV: &structs.TxnKVOp{ 614 Verb: api.KVCheckIndex, 615 DirEnt: structs.DirEntry{ 616 Key: "foo/update", 617 RaftIndex: structs.RaftIndex{ 618 ModifyIndex: 8, 619 }, 620 }, 621 }, 622 }, 623 &structs.TxnOp{ 624 KV: &structs.TxnKVOp{ 625 Verb: api.KVLock, 626 DirEnt: structs.DirEntry{ 627 Key: "foo/lock", 628 Session: session, 629 }, 630 }, 631 }, 632 &structs.TxnOp{ 633 KV: &structs.TxnKVOp{ 634 Verb: api.KVCheckSession, 635 DirEnt: structs.DirEntry{ 636 Key: "foo/lock", 637 Session: session, 638 }, 639 }, 640 }, 641 &structs.TxnOp{ 642 KV: &structs.TxnKVOp{ 643 Verb: api.KVUnlock, 644 DirEnt: structs.DirEntry{ 645 Key: "foo/lock", 646 Session: session, 647 }, 648 }, 649 }, 650 &structs.TxnOp{ 651 KV: &structs.TxnKVOp{ 652 Verb: api.KVCheckSession, 653 DirEnt: structs.DirEntry{ 654 Key: "foo/lock", 655 Session: "", 656 }, 657 }, 658 }, 659 } 660 results, errors := s.TxnRW(8, ops) 661 if len(errors) > 0 { 662 t.Fatalf("err: %v", errors) 663 } 664 665 // Make sure the response looks as expected. 666 expected := structs.TxnResults{ 667 &structs.TxnResult{ 668 KV: &structs.DirEntry{ 669 Key: "foo/bar/baz", 670 Value: []byte("baz"), 671 RaftIndex: structs.RaftIndex{ 672 CreateIndex: 2, 673 ModifyIndex: 2, 674 }, 675 }, 676 }, 677 &structs.TxnResult{ 678 KV: &structs.DirEntry{ 679 Key: "foo/bar/zip", 680 Value: []byte("zip"), 681 RaftIndex: structs.RaftIndex{ 682 CreateIndex: 3, 683 ModifyIndex: 3, 684 }, 685 }, 686 }, 687 &structs.TxnResult{ 688 KV: &structs.DirEntry{ 689 Key: "foo/new", 690 RaftIndex: structs.RaftIndex{ 691 CreateIndex: 8, 692 ModifyIndex: 8, 693 }, 694 }, 695 }, 696 &structs.TxnResult{ 697 KV: &structs.DirEntry{ 698 Key: "foo/update", 699 Value: []byte("stale"), 700 RaftIndex: structs.RaftIndex{ 701 CreateIndex: 5, 702 ModifyIndex: 5, 703 }, 704 }, 705 }, 706 &structs.TxnResult{ 707 KV: &structs.DirEntry{ 708 709 Key: "foo/update", 710 RaftIndex: structs.RaftIndex{ 711 CreateIndex: 5, 712 ModifyIndex: 5, 713 }, 714 }, 715 }, 716 &structs.TxnResult{ 717 KV: &structs.DirEntry{ 718 Key: "foo/update", 719 RaftIndex: structs.RaftIndex{ 720 CreateIndex: 5, 721 ModifyIndex: 8, 722 }, 723 }, 724 }, 725 &structs.TxnResult{ 726 KV: &structs.DirEntry{ 727 Key: "foo/update", 728 Value: []byte("new"), 729 RaftIndex: structs.RaftIndex{ 730 CreateIndex: 5, 731 ModifyIndex: 8, 732 }, 733 }, 734 }, 735 &structs.TxnResult{ 736 KV: &structs.DirEntry{ 737 Key: "foo/update", 738 RaftIndex: structs.RaftIndex{ 739 CreateIndex: 5, 740 ModifyIndex: 8, 741 }, 742 }, 743 }, 744 &structs.TxnResult{ 745 KV: &structs.DirEntry{ 746 Key: "foo/lock", 747 Session: session, 748 LockIndex: 1, 749 RaftIndex: structs.RaftIndex{ 750 CreateIndex: 8, 751 ModifyIndex: 8, 752 }, 753 }, 754 }, 755 &structs.TxnResult{ 756 KV: &structs.DirEntry{ 757 Key: "foo/lock", 758 Session: session, 759 LockIndex: 1, 760 RaftIndex: structs.RaftIndex{ 761 CreateIndex: 8, 762 ModifyIndex: 8, 763 }, 764 }, 765 }, 766 &structs.TxnResult{ 767 KV: &structs.DirEntry{ 768 Key: "foo/lock", 769 LockIndex: 1, 770 RaftIndex: structs.RaftIndex{ 771 CreateIndex: 8, 772 ModifyIndex: 8, 773 }, 774 }, 775 }, 776 &structs.TxnResult{ 777 KV: &structs.DirEntry{ 778 Key: "foo/lock", 779 LockIndex: 1, 780 RaftIndex: structs.RaftIndex{ 781 CreateIndex: 8, 782 ModifyIndex: 8, 783 }, 784 }, 785 }, 786 } 787 if len(results) != len(expected) { 788 t.Fatalf("bad: %v", results) 789 } 790 for i, e := range expected { 791 if e.KV.Key != results[i].KV.Key { 792 t.Fatalf("expected key %s, got %s", e.KV.Key, results[i].KV.Key) 793 } 794 if e.KV.LockIndex != results[i].KV.LockIndex { 795 t.Fatalf("expected lock index %d, got %d", e.KV.LockIndex, results[i].KV.LockIndex) 796 } 797 if e.KV.CreateIndex != results[i].KV.CreateIndex { 798 t.Fatalf("expected create index %d, got %d", e.KV.CreateIndex, results[i].KV.CreateIndex) 799 } 800 if e.KV.ModifyIndex != results[i].KV.ModifyIndex { 801 t.Fatalf("expected modify index %d, got %d", e.KV.ModifyIndex, results[i].KV.ModifyIndex) 802 } 803 } 804 805 // Pull the resulting state store contents. 806 idx, actual, err := s.KVSList(nil, "", nil) 807 if err != nil { 808 t.Fatalf("err: %s", err) 809 } 810 if idx != 8 { 811 t.Fatalf("bad index: %d", idx) 812 } 813 814 // Make sure it looks as expected. 815 entries := structs.DirEntries{ 816 &structs.DirEntry{ 817 Key: "foo/lock", 818 LockIndex: 1, 819 RaftIndex: structs.RaftIndex{ 820 CreateIndex: 8, 821 ModifyIndex: 8, 822 }, 823 }, 824 &structs.DirEntry{ 825 Key: "foo/new", 826 Value: []byte("one"), 827 RaftIndex: structs.RaftIndex{ 828 CreateIndex: 8, 829 ModifyIndex: 8, 830 }, 831 }, 832 &structs.DirEntry{ 833 Key: "foo/update", 834 Value: []byte("new"), 835 RaftIndex: structs.RaftIndex{ 836 CreateIndex: 5, 837 ModifyIndex: 8, 838 }, 839 }, 840 } 841 if len(actual) != len(entries) { 842 t.Fatalf("bad len: %d != %d", len(actual), len(entries)) 843 } 844 for i, e := range entries { 845 if e.Key != actual[i].Key { 846 t.Fatalf("expected key %s, got %s", e.Key, actual[i].Key) 847 } 848 if string(e.Value) != string(actual[i].Value) { 849 t.Fatalf("expected value %s, got %s", e.Value, actual[i].Value) 850 } 851 if e.LockIndex != actual[i].LockIndex { 852 t.Fatalf("expected lock index %d, got %d", e.LockIndex, actual[i].LockIndex) 853 } 854 if e.CreateIndex != actual[i].CreateIndex { 855 t.Fatalf("expected create index %d, got %d", e.CreateIndex, actual[i].CreateIndex) 856 } 857 if e.ModifyIndex != actual[i].ModifyIndex { 858 t.Fatalf("expected modify index %d, got %d", e.ModifyIndex, actual[i].ModifyIndex) 859 } 860 } 861} 862 863func TestStateStore_Txn_KVS_Rollback(t *testing.T) { 864 s := testStateStore(t) 865 866 // Create KV entries in the state store. 867 testSetKey(t, s, 1, "foo/delete", "bar", nil) 868 testSetKey(t, s, 2, "foo/update", "stale", nil) 869 870 testRegisterNode(t, s, 3, "node1") 871 session := testUUID() 872 if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil { 873 t.Fatalf("err: %s", err) 874 } 875 ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session}) 876 if !ok || err != nil { 877 t.Fatalf("didn't get the lock: %v %s", ok, err) 878 } 879 880 bogus := testUUID() 881 if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil { 882 t.Fatalf("err: %s", err) 883 } 884 885 // This function verifies that the state store wasn't changed. 886 verifyStateStore := func(desc string) { 887 idx, actual, err := s.KVSList(nil, "", nil) 888 if err != nil { 889 t.Fatalf("err (%s): %s", desc, err) 890 } 891 if idx != 5 { 892 t.Fatalf("bad index (%s): %d", desc, idx) 893 } 894 895 // Make sure it looks as expected. 896 entries := structs.DirEntries{ 897 &structs.DirEntry{ 898 Key: "foo/delete", 899 Value: []byte("bar"), 900 RaftIndex: structs.RaftIndex{ 901 CreateIndex: 1, 902 ModifyIndex: 1, 903 }, 904 }, 905 &structs.DirEntry{ 906 Key: "foo/lock", 907 Value: []byte("foo"), 908 LockIndex: 1, 909 Session: session, 910 RaftIndex: structs.RaftIndex{ 911 CreateIndex: 5, 912 ModifyIndex: 5, 913 }, 914 }, 915 &structs.DirEntry{ 916 Key: "foo/update", 917 Value: []byte("stale"), 918 RaftIndex: structs.RaftIndex{ 919 CreateIndex: 2, 920 ModifyIndex: 2, 921 }, 922 }, 923 } 924 if len(actual) != len(entries) { 925 t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(entries)) 926 } 927 for i, e := range entries { 928 if e.Key != actual[i].Key { 929 t.Fatalf("expected key %s, got %s", e.Key, actual[i].Key) 930 } 931 if string(e.Value) != string(actual[i].Value) { 932 t.Fatalf("expected value %s, got %s", e.Value, actual[i].Value) 933 } 934 if e.LockIndex != actual[i].LockIndex { 935 t.Fatalf("expected lock index %d, got %d", e.LockIndex, actual[i].LockIndex) 936 } 937 if e.CreateIndex != actual[i].CreateIndex { 938 t.Fatalf("expected create index %d, got %d", e.CreateIndex, actual[i].CreateIndex) 939 } 940 if e.ModifyIndex != actual[i].ModifyIndex { 941 t.Fatalf("expected modify index %d, got %d", e.ModifyIndex, actual[i].ModifyIndex) 942 } 943 } 944 } 945 verifyStateStore("initial") 946 947 // Set up a transaction that fails every operation. 948 ops := structs.TxnOps{ 949 &structs.TxnOp{ 950 KV: &structs.TxnKVOp{ 951 Verb: api.KVCAS, 952 DirEnt: structs.DirEntry{ 953 Key: "foo/update", 954 Value: []byte("new"), 955 RaftIndex: structs.RaftIndex{ 956 ModifyIndex: 1, 957 }, 958 }, 959 }, 960 }, 961 &structs.TxnOp{ 962 KV: &structs.TxnKVOp{ 963 Verb: api.KVLock, 964 DirEnt: structs.DirEntry{ 965 Key: "foo/lock", 966 Session: bogus, 967 }, 968 }, 969 }, 970 &structs.TxnOp{ 971 KV: &structs.TxnKVOp{ 972 Verb: api.KVUnlock, 973 DirEnt: structs.DirEntry{ 974 Key: "foo/lock", 975 Session: bogus, 976 }, 977 }, 978 }, 979 &structs.TxnOp{ 980 KV: &structs.TxnKVOp{ 981 Verb: api.KVCheckSession, 982 DirEnt: structs.DirEntry{ 983 Key: "foo/lock", 984 Session: bogus, 985 }, 986 }, 987 }, 988 &structs.TxnOp{ 989 KV: &structs.TxnKVOp{ 990 Verb: api.KVGet, 991 DirEnt: structs.DirEntry{ 992 Key: "nope", 993 }, 994 }, 995 }, 996 &structs.TxnOp{ 997 KV: &structs.TxnKVOp{ 998 Verb: api.KVCheckSession, 999 DirEnt: structs.DirEntry{ 1000 Key: "nope", 1001 Session: bogus, 1002 }, 1003 }, 1004 }, 1005 &structs.TxnOp{ 1006 KV: &structs.TxnKVOp{ 1007 Verb: api.KVCheckIndex, 1008 DirEnt: structs.DirEntry{ 1009 Key: "foo/lock", 1010 RaftIndex: structs.RaftIndex{ 1011 ModifyIndex: 6, 1012 }, 1013 }, 1014 }, 1015 }, 1016 &structs.TxnOp{ 1017 KV: &structs.TxnKVOp{ 1018 Verb: api.KVCheckIndex, 1019 DirEnt: structs.DirEntry{ 1020 Key: "nope", 1021 RaftIndex: structs.RaftIndex{ 1022 ModifyIndex: 6, 1023 }, 1024 }, 1025 }, 1026 }, 1027 &structs.TxnOp{ 1028 KV: &structs.TxnKVOp{ 1029 Verb: "nope", 1030 DirEnt: structs.DirEntry{ 1031 Key: "foo/delete", 1032 }, 1033 }, 1034 }, 1035 } 1036 results, errors := s.TxnRW(7, ops) 1037 if len(errors) != len(ops) { 1038 t.Fatalf("bad len: %d != %d", len(errors), len(ops)) 1039 } 1040 if len(results) != 0 { 1041 t.Fatalf("bad len: %d != 0", len(results)) 1042 } 1043 verifyStateStore("after") 1044 1045 // Make sure the errors look reasonable. 1046 expected := []string{ 1047 "index is stale", 1048 "lock is already held", 1049 "lock isn't held, or is held by another session", 1050 "current session", 1051 `key "nope" doesn't exist`, 1052 `key "nope" doesn't exist`, 1053 "current modify index", 1054 `key "nope" doesn't exist`, 1055 "unknown KV verb", 1056 } 1057 if len(errors) != len(expected) { 1058 t.Fatalf("bad len: %d != %d", len(errors), len(expected)) 1059 } 1060 for i, msg := range expected { 1061 if errors[i].OpIndex != i { 1062 t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex) 1063 } 1064 if !strings.Contains(errors[i].Error(), msg) { 1065 t.Fatalf("bad %d: %v", i, errors[i].Error()) 1066 } 1067 } 1068} 1069 1070func TestStateStore_Txn_KVS_RO(t *testing.T) { 1071 s := testStateStore(t) 1072 1073 // Create KV entries in the state store. 1074 testSetKey(t, s, 1, "foo", "bar", nil) 1075 testSetKey(t, s, 2, "foo/bar/baz", "baz", nil) 1076 testSetKey(t, s, 3, "foo/bar/zip", "zip", nil) 1077 1078 // Set up a transaction that hits all the read-only operations. 1079 ops := structs.TxnOps{ 1080 &structs.TxnOp{ 1081 KV: &structs.TxnKVOp{ 1082 Verb: api.KVGetTree, 1083 DirEnt: structs.DirEntry{ 1084 Key: "foo/bar", 1085 }, 1086 }, 1087 }, 1088 &structs.TxnOp{ 1089 KV: &structs.TxnKVOp{ 1090 Verb: api.KVGet, 1091 DirEnt: structs.DirEntry{ 1092 Key: "foo", 1093 }, 1094 }, 1095 }, 1096 &structs.TxnOp{ 1097 KV: &structs.TxnKVOp{ 1098 Verb: api.KVCheckSession, 1099 DirEnt: structs.DirEntry{ 1100 Key: "foo/bar/baz", 1101 Session: "", 1102 }, 1103 }, 1104 }, 1105 &structs.TxnOp{ 1106 KV: &structs.TxnKVOp{ 1107 Verb: api.KVCheckSession, 1108 DirEnt: structs.DirEntry{ 1109 Key: "foo/bar/zip", 1110 RaftIndex: structs.RaftIndex{ 1111 ModifyIndex: 3, 1112 }, 1113 }, 1114 }, 1115 }, 1116 } 1117 results, errors := s.TxnRO(ops) 1118 if len(errors) > 0 { 1119 t.Fatalf("err: %v", errors) 1120 } 1121 1122 // Make sure the response looks as expected. 1123 expected := structs.TxnResults{ 1124 &structs.TxnResult{ 1125 KV: &structs.DirEntry{ 1126 Key: "foo/bar/baz", 1127 Value: []byte("baz"), 1128 RaftIndex: structs.RaftIndex{ 1129 CreateIndex: 2, 1130 ModifyIndex: 2, 1131 }, 1132 }, 1133 }, 1134 &structs.TxnResult{ 1135 KV: &structs.DirEntry{ 1136 Key: "foo/bar/zip", 1137 Value: []byte("zip"), 1138 RaftIndex: structs.RaftIndex{ 1139 CreateIndex: 3, 1140 ModifyIndex: 3, 1141 }, 1142 }, 1143 }, 1144 &structs.TxnResult{ 1145 KV: &structs.DirEntry{ 1146 Key: "foo", 1147 Value: []byte("bar"), 1148 RaftIndex: structs.RaftIndex{ 1149 CreateIndex: 1, 1150 ModifyIndex: 1, 1151 }, 1152 }, 1153 }, 1154 &structs.TxnResult{ 1155 KV: &structs.DirEntry{ 1156 Key: "foo/bar/baz", 1157 RaftIndex: structs.RaftIndex{ 1158 CreateIndex: 2, 1159 ModifyIndex: 2, 1160 }, 1161 }, 1162 }, 1163 &structs.TxnResult{ 1164 KV: &structs.DirEntry{ 1165 Key: "foo/bar/zip", 1166 RaftIndex: structs.RaftIndex{ 1167 CreateIndex: 3, 1168 ModifyIndex: 3, 1169 }, 1170 }, 1171 }, 1172 } 1173 if len(results) != len(expected) { 1174 t.Fatalf("bad: %v", results) 1175 } 1176 for i, e := range expected { 1177 if e.KV.Key != results[i].KV.Key { 1178 t.Fatalf("expected key %s, got %s", e.KV.Key, results[i].KV.Key) 1179 } 1180 if e.KV.LockIndex != results[i].KV.LockIndex { 1181 t.Fatalf("expected lock index %d, got %d", e.KV.LockIndex, results[i].KV.LockIndex) 1182 } 1183 if e.KV.CreateIndex != results[i].KV.CreateIndex { 1184 t.Fatalf("expected create index %d, got %d", e.KV.CreateIndex, results[i].KV.CreateIndex) 1185 } 1186 if e.KV.ModifyIndex != results[i].KV.ModifyIndex { 1187 t.Fatalf("expected modify index %d, got %d", e.KV.ModifyIndex, results[i].KV.ModifyIndex) 1188 } 1189 } 1190} 1191 1192func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) { 1193 s := testStateStore(t) 1194 1195 // Create KV entries in the state store. 1196 testSetKey(t, s, 1, "foo", "bar", nil) 1197 testSetKey(t, s, 2, "foo/bar/baz", "baz", nil) 1198 testSetKey(t, s, 3, "foo/bar/zip", "zip", nil) 1199 1200 // Set up a transaction that hits all the read-only operations. 1201 ops := structs.TxnOps{ 1202 &structs.TxnOp{ 1203 KV: &structs.TxnKVOp{ 1204 Verb: api.KVSet, 1205 DirEnt: structs.DirEntry{ 1206 Key: "foo", 1207 Value: []byte("nope"), 1208 }, 1209 }, 1210 }, 1211 &structs.TxnOp{ 1212 KV: &structs.TxnKVOp{ 1213 Verb: api.KVDelete, 1214 DirEnt: structs.DirEntry{ 1215 Key: "foo/bar/baz", 1216 }, 1217 }, 1218 }, 1219 &structs.TxnOp{ 1220 KV: &structs.TxnKVOp{ 1221 Verb: api.KVDeleteTree, 1222 DirEnt: structs.DirEntry{ 1223 Key: "foo/bar", 1224 }, 1225 }, 1226 }, 1227 } 1228 results, errors := s.TxnRO(ops) 1229 if len(results) > 0 { 1230 t.Fatalf("bad: %v", results) 1231 } 1232 if len(errors) != len(ops) { 1233 t.Fatalf("bad len: %d != %d", len(errors), len(ops)) 1234 } 1235 1236 // Make sure the errors look reasonable (tombstone inserts cause the 1237 // insert errors during the delete operations). 1238 expected := []string{ 1239 "cannot insert in read-only transaction", 1240 "cannot insert in read-only transaction", 1241 "failed recursive deleting kvs entry", 1242 } 1243 if len(errors) != len(expected) { 1244 t.Fatalf("bad len: %d != %d", len(errors), len(expected)) 1245 } 1246 for i, msg := range expected { 1247 if errors[i].OpIndex != i { 1248 t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex) 1249 } 1250 if !strings.Contains(errors[i].Error(), msg) { 1251 t.Fatalf("bad %d: %v", i, errors[i].Error()) 1252 } 1253 } 1254} 1255