1package gocbcore 2 3import ( 4 "crypto/x509" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io/ioutil" 9 "log" 10 "net/url" 11 "strconv" 12 "strings" 13 "time" 14 15 "github.com/couchbase/gocbcore/v9/memd" 16) 17 18func (suite *StandardTestSuite) TestCidRetries() { 19 suite.EnsureSupportsFeature(TestFeatureCollections) 20 21 agent, s := suite.GetAgentAndHarness() 22 23 bucketName := suite.BucketName 24 scopeName := suite.ScopeName 25 collectionName := "testCidRetries" 26 27 _, err := testCreateCollection(collectionName, scopeName, bucketName, agent) 28 if err != nil { 29 suite.T().Logf("Failed to create collection: %v", err) 30 } 31 32 // prime the cid map cache 33 s.PushOp(agent.GetCollectionID(scopeName, collectionName, GetCollectionIDOptions{}, 34 func(result *GetCollectionIDResult, err error) { 35 s.Wrap(func() { 36 if err != nil { 37 s.Fatalf("Get CID operation failed: %v", err) 38 } 39 }) 40 }), 41 ) 42 s.Wait(0) 43 44 // delete the collection 45 _, err = testDeleteCollection(collectionName, scopeName, bucketName, agent, true) 46 if err != nil { 47 suite.T().Fatalf("Failed to delete collection: %v", err) 48 } 49 50 // recreate 51 _, err = testCreateCollection(collectionName, scopeName, bucketName, agent) 52 if err != nil { 53 suite.T().Fatalf("Failed to create collection: %v", err) 54 } 55 56 // Set should succeed as we detect cid unknown, fetch the cid and then retry again. This should happen 57 // even if we don't set a retry strategy. 58 s.PushOp(agent.Set(SetOptions{ 59 Key: []byte("test"), 60 Value: []byte("{}"), 61 CollectionName: collectionName, 62 ScopeName: scopeName, 63 }, func(res *StoreResult, err error) { 64 s.Wrap(func() { 65 if err != nil { 66 s.Fatalf("Set operation failed: %v", err) 67 } 68 if res.Cas == Cas(0) { 69 s.Fatalf("Invalid cas received") 70 } 71 }) 72 })) 73 s.Wait(0) 74 75 // Get 76 s.PushOp(agent.Get(GetOptions{ 77 Key: []byte("test"), 78 CollectionName: collectionName, 79 ScopeName: scopeName, 80 }, func(res *GetResult, err error) { 81 s.Wrap(func() { 82 if err != nil { 83 s.Fatalf("Get operation failed: %v", err) 84 } 85 if res.Cas == Cas(0) { 86 s.Fatalf("Invalid cas received") 87 } 88 }) 89 })) 90 s.Wait(0) 91} 92 93func (suite *StandardTestSuite) TestBasicOps() { 94 agent, s := suite.GetAgentAndHarness() 95 96 // Set 97 s.PushOp(agent.Set(SetOptions{ 98 Key: []byte("test"), 99 Value: []byte("{}"), 100 CollectionName: suite.CollectionName, 101 ScopeName: suite.ScopeName, 102 }, func(res *StoreResult, err error) { 103 s.Wrap(func() { 104 if err != nil { 105 s.Fatalf("Set operation failed: %v", err) 106 } 107 if res.Cas == Cas(0) { 108 s.Fatalf("Invalid cas received") 109 } 110 }) 111 })) 112 s.Wait(0) 113 114 // Get 115 s.PushOp(agent.Get(GetOptions{ 116 Key: []byte("test"), 117 CollectionName: suite.CollectionName, 118 ScopeName: suite.ScopeName, 119 }, func(res *GetResult, err error) { 120 s.Wrap(func() { 121 if err != nil { 122 s.Fatalf("Get operation failed: %v", err) 123 } 124 if res.Cas == Cas(0) { 125 s.Fatalf("Invalid cas received") 126 } 127 }) 128 })) 129 s.Wait(0) 130} 131 132func (suite *StandardTestSuite) TestCasMismatch() { 133 agent, s := suite.GetAgentAndHarness() 134 135 // Set 136 var cas Cas 137 s.PushOp(agent.Set(SetOptions{ 138 Key: []byte("testCasMismatch"), 139 Value: []byte("{}"), 140 CollectionName: suite.CollectionName, 141 ScopeName: suite.ScopeName, 142 }, func(res *StoreResult, err error) { 143 s.Wrap(func() { 144 if err != nil { 145 s.Fatalf("Set operation failed: %v", err) 146 } 147 if res.Cas == Cas(0) { 148 s.Fatalf("Invalid cas received") 149 } 150 cas = res.Cas 151 }) 152 })) 153 s.Wait(0) 154 155 // Replace to change cas on the server 156 s.PushOp(agent.Replace(ReplaceOptions{ 157 Key: []byte("testCasMismatch"), 158 Value: []byte("{\"key\":\"value\"}"), 159 CollectionName: suite.CollectionName, 160 ScopeName: suite.ScopeName, 161 }, func(res *StoreResult, err error) { 162 s.Wrap(func() { 163 if err != nil { 164 s.Fatalf("Replace operation failed: %v", err) 165 } 166 if res.Cas == Cas(0) { 167 s.Fatalf("Invalid cas received") 168 } 169 }) 170 })) 171 s.Wait(0) 172 173 // Replace which should fail with a cas mismatch 174 s.PushOp(agent.Replace(ReplaceOptions{ 175 Key: []byte("testCasMismatch"), 176 Value: []byte("{\"key\":\"value2\"}"), 177 CollectionName: suite.CollectionName, 178 ScopeName: suite.ScopeName, 179 Cas: cas, 180 }, func(res *StoreResult, err error) { 181 s.Wrap(func() { 182 if err == nil { 183 s.Fatalf("Set operation succeeded but should have failed") 184 } 185 186 if !errors.Is(err, ErrCasMismatch) { 187 suite.T().Fatalf("Expected CasMismatch error but was %v", err) 188 } 189 }) 190 })) 191 s.Wait(0) 192} 193 194func (suite *StandardTestSuite) TestGetReplica() { 195 suite.EnsureSupportsFeature(TestFeatureReplicas) 196 agent, s := suite.GetAgentAndHarness() 197 198 // Set 199 s.PushOp(agent.Set(SetOptions{ 200 Key: []byte("testReplica"), 201 Value: []byte("{}"), 202 CollectionName: suite.CollectionName, 203 ScopeName: suite.ScopeName, 204 }, func(res *StoreResult, err error) { 205 s.Wrap(func() { 206 if err != nil { 207 s.Fatalf("Set operation failed: %v", err) 208 } 209 if res.Cas == Cas(0) { 210 s.Fatalf("Invalid cas received") 211 } 212 }) 213 })) 214 s.Wait(0) 215 216 retries := 0 217 keyExists := false 218 for { 219 s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{ 220 Key: []byte("testReplica"), 221 ReplicaIdx: 1, 222 CollectionName: suite.CollectionName, 223 ScopeName: suite.ScopeName, 224 }, func(res *GetReplicaResult, err error) { 225 s.Wrap(func() { 226 keyNotFound := errors.Is(err, ErrDocumentNotFound) 227 if err == nil { 228 keyExists = true 229 } else if err != nil && !keyNotFound { 230 s.Fatalf("GetReplica specific returned error that was not document not found: %v", err) 231 } 232 if !keyNotFound && res.Cas == Cas(0) { 233 s.Fatalf("Invalid cas received") 234 } 235 }) 236 })) 237 s.Wait(0) 238 if keyExists { 239 break 240 } 241 retries++ 242 if retries >= 5 { 243 suite.T().Fatalf("GetReplica could not locate key") 244 } 245 time.Sleep(50 * time.Millisecond) 246 } 247} 248 249func (suite *StandardTestSuite) TestDurableWriteGetReplica() { 250 suite.EnsureSupportsFeature(TestFeatureReplicas) 251 suite.EnsureSupportsFeature(TestFeatureEnhancedDurability) 252 agent, s := suite.GetAgentAndHarness() 253 254 // Set 255 s.PushOp(agent.Set(SetOptions{ 256 Key: []byte("testDurableReplica"), 257 Value: []byte("{}"), 258 CollectionName: suite.CollectionName, 259 ScopeName: suite.ScopeName, 260 DurabilityLevel: memd.DurabilityLevelMajority, 261 DurabilityLevelTimeout: 10 * time.Second, 262 }, func(res *StoreResult, err error) { 263 s.Wrap(func() { 264 if err != nil { 265 s.Fatalf("Set operation failed: %v", err) 266 } 267 if res.Cas == Cas(0) { 268 s.Fatalf("Invalid cas received") 269 } 270 }) 271 })) 272 s.Wait(0) 273 274 retries := 0 275 keyExists := false 276 for { 277 s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{ 278 Key: []byte("testDurableReplica"), 279 ReplicaIdx: 1, 280 CollectionName: suite.CollectionName, 281 ScopeName: suite.ScopeName, 282 }, func(res *GetReplicaResult, err error) { 283 s.Wrap(func() { 284 keyNotFound := errors.Is(err, ErrDocumentNotFound) 285 if err == nil { 286 keyExists = true 287 } else if err != nil && !keyNotFound { 288 s.Fatalf("GetReplica specific returned error that was not document not found: %v", err) 289 } 290 if !keyNotFound && res.Cas == Cas(0) { 291 s.Fatalf("Invalid cas received") 292 } 293 }) 294 })) 295 s.Wait(0) 296 if keyExists { 297 break 298 } 299 retries++ 300 if retries >= 5 { 301 suite.T().Fatalf("GetReplica could not locate key") 302 } 303 time.Sleep(50 * time.Millisecond) 304 } 305} 306 307func (suite *StandardTestSuite) TestAddDurableWriteGetReplica() { 308 suite.EnsureSupportsFeature(TestFeatureReplicas) 309 suite.EnsureSupportsFeature(TestFeatureEnhancedDurability) 310 agent, s := suite.GetAgentAndHarness() 311 312 s.PushOp(agent.Add(AddOptions{ 313 Key: []byte("testAddDurableReplica"), 314 Value: []byte("{}"), 315 CollectionName: suite.CollectionName, 316 ScopeName: suite.ScopeName, 317 DurabilityLevel: memd.DurabilityLevelMajority, 318 DurabilityLevelTimeout: 10 * time.Second, 319 }, func(res *StoreResult, err error) { 320 s.Wrap(func() { 321 if err != nil { 322 s.Fatalf("Add operation failed: %v", err) 323 } 324 if res.Cas == Cas(0) { 325 s.Fatalf("Invalid cas received") 326 } 327 }) 328 })) 329 s.Wait(0) 330 331 retries := 0 332 keyExists := false 333 for { 334 s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{ 335 Key: []byte("testAddDurableReplica"), 336 ReplicaIdx: 1, 337 CollectionName: suite.CollectionName, 338 ScopeName: suite.ScopeName, 339 }, func(res *GetReplicaResult, err error) { 340 s.Wrap(func() { 341 keyNotFound := errors.Is(err, ErrDocumentNotFound) 342 if err == nil { 343 keyExists = true 344 } else if err != nil && !keyNotFound { 345 s.Fatalf("GetReplica specific returned error that was not document not found: %v", err) 346 } 347 if !keyNotFound && res.Cas == Cas(0) { 348 s.Fatalf("Invalid cas received") 349 } 350 }) 351 })) 352 s.Wait(0) 353 if keyExists { 354 break 355 } 356 retries++ 357 if retries >= 5 { 358 suite.T().Fatalf("GetReplica could not locate key") 359 } 360 time.Sleep(50 * time.Millisecond) 361 } 362} 363 364func (suite *StandardTestSuite) TestReplaceDurableWriteGetReplica() { 365 suite.EnsureSupportsFeature(TestFeatureReplicas) 366 suite.EnsureSupportsFeature(TestFeatureEnhancedDurability) 367 agent, s := suite.GetAgentAndHarness() 368 369 s.PushOp(agent.Set(SetOptions{ 370 Key: []byte("testReplaceDurableReplica"), 371 Value: []byte("{}"), 372 CollectionName: suite.CollectionName, 373 ScopeName: suite.ScopeName, 374 DurabilityLevel: memd.DurabilityLevelMajority, 375 DurabilityLevelTimeout: 10 * time.Second, 376 }, func(res *StoreResult, err error) { 377 s.Wrap(func() { 378 if err != nil { 379 s.Fatalf("Set operation failed: %v", err) 380 } 381 if res.Cas == Cas(0) { 382 s.Fatalf("Invalid cas received") 383 } 384 }) 385 })) 386 s.Wait(0) 387 388 s.PushOp(agent.Replace(ReplaceOptions{ 389 Key: []byte("testReplaceDurableReplica"), 390 Value: []byte("{}"), 391 CollectionName: suite.CollectionName, 392 ScopeName: suite.ScopeName, 393 DurabilityLevel: memd.DurabilityLevelMajority, 394 DurabilityLevelTimeout: 10 * time.Second, 395 }, func(res *StoreResult, err error) { 396 s.Wrap(func() { 397 if err != nil { 398 s.Fatalf("Replace operation failed: %v", err) 399 } 400 if res.Cas == Cas(0) { 401 s.Fatalf("Invalid cas received") 402 } 403 }) 404 })) 405 s.Wait(0) 406 407 retries := 0 408 keyExists := false 409 for { 410 s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{ 411 Key: []byte("testReplaceDurableReplica"), 412 ReplicaIdx: 1, 413 CollectionName: suite.CollectionName, 414 ScopeName: suite.ScopeName, 415 }, func(res *GetReplicaResult, err error) { 416 s.Wrap(func() { 417 keyNotFound := errors.Is(err, ErrDocumentNotFound) 418 if err == nil { 419 keyExists = true 420 } else if err != nil && !keyNotFound { 421 s.Fatalf("GetReplica specific returned error that was not document not found: %v", err) 422 } 423 if !keyNotFound && res.Cas == Cas(0) { 424 s.Fatalf("Invalid cas received") 425 } 426 }) 427 })) 428 s.Wait(0) 429 if keyExists { 430 break 431 } 432 retries++ 433 if retries >= 5 { 434 suite.T().Fatalf("GetReplica could not locate key") 435 } 436 time.Sleep(50 * time.Millisecond) 437 } 438} 439 440func (suite *StandardTestSuite) TestDeleteDurableWriteGetReplica() { 441 suite.EnsureSupportsFeature(TestFeatureReplicas) 442 suite.EnsureSupportsFeature(TestFeatureEnhancedDurability) 443 agent, s := suite.GetAgentAndHarness() 444 445 s.PushOp(agent.Set(SetOptions{ 446 Key: []byte("testDeleteDurableReplica"), 447 Value: []byte("{}"), 448 CollectionName: suite.CollectionName, 449 ScopeName: suite.ScopeName, 450 DurabilityLevel: memd.DurabilityLevelMajority, 451 DurabilityLevelTimeout: 10 * time.Second, 452 }, func(res *StoreResult, err error) { 453 s.Wrap(func() { 454 if err != nil { 455 s.Fatalf("Set operation failed: %v", err) 456 } 457 if res.Cas == Cas(0) { 458 s.Fatalf("Invalid cas received") 459 } 460 }) 461 })) 462 s.Wait(0) 463 464 s.PushOp(agent.Delete(DeleteOptions{ 465 Key: []byte("testDeleteDurableReplica"), 466 CollectionName: suite.CollectionName, 467 ScopeName: suite.ScopeName, 468 DurabilityLevel: memd.DurabilityLevelMajority, 469 DurabilityLevelTimeout: 10 * time.Second, 470 }, func(res *DeleteResult, err error) { 471 s.Wrap(func() { 472 if err != nil { 473 s.Fatalf("Delete operation failed: %v", err) 474 } 475 if res.Cas == Cas(0) { 476 s.Fatalf("Invalid cas received") 477 } 478 }) 479 })) 480 s.Wait(0) 481 482 retries := 0 483 keyNotFound := false 484 for { 485 s.PushOp(agent.GetOneReplica(GetOneReplicaOptions{ 486 Key: []byte("testDeleteDurableReplica"), 487 ReplicaIdx: 1, 488 CollectionName: suite.CollectionName, 489 ScopeName: suite.ScopeName, 490 }, func(res *GetReplicaResult, err error) { 491 s.Wrap(func() { 492 if errors.Is(err, ErrDocumentNotFound) { 493 keyNotFound = true 494 } else if err != nil { 495 s.Fatalf("GetReplica specific returned error that was not document not found: %v", err) 496 } 497 if !keyNotFound && res.Cas == Cas(0) { 498 s.Fatalf("Invalid cas received") 499 } 500 }) 501 })) 502 s.Wait(0) 503 if keyNotFound { 504 break 505 } 506 retries++ 507 if retries >= 5 { 508 suite.T().Fatalf("GetReplica could always locate key") 509 } 510 time.Sleep(50 * time.Millisecond) 511 } 512} 513 514func (suite *StandardTestSuite) TestBasicReplace() { 515 agent, s := suite.GetAgentAndHarness() 516 517 oldCas := Cas(0) 518 s.PushOp(agent.Set(SetOptions{ 519 Key: []byte("testx"), 520 Value: []byte("{}"), 521 CollectionName: suite.CollectionName, 522 ScopeName: suite.ScopeName, 523 }, func(res *StoreResult, err error) { 524 oldCas = res.Cas 525 s.Continue() 526 })) 527 s.Wait(0) 528 529 s.PushOp(agent.Replace(ReplaceOptions{ 530 Key: []byte("testx"), 531 Value: []byte("[]"), 532 Cas: oldCas, 533 CollectionName: suite.CollectionName, 534 ScopeName: suite.ScopeName, 535 }, func(res *StoreResult, err error) { 536 s.Wrap(func() { 537 if err != nil { 538 s.Fatalf("Replace operation failed: %v", err) 539 } 540 if res.Cas == Cas(0) { 541 s.Fatalf("Invalid cas received") 542 } 543 }) 544 })) 545 s.Wait(0) 546} 547 548func (suite *StandardTestSuite) TestBasicRemove() { 549 agent, s := suite.GetAgentAndHarness() 550 551 s.PushOp(agent.Set(SetOptions{ 552 Key: []byte("testy"), 553 Value: []byte("{}"), 554 CollectionName: suite.CollectionName, 555 ScopeName: suite.ScopeName, 556 }, func(res *StoreResult, err error) { 557 s.Continue() 558 })) 559 s.Wait(0) 560 561 s.PushOp(agent.Delete(DeleteOptions{ 562 Key: []byte("testy"), 563 CollectionName: suite.CollectionName, 564 ScopeName: suite.ScopeName, 565 }, func(res *DeleteResult, err error) { 566 s.Wrap(func() { 567 if err != nil { 568 s.Fatalf("Remove operation failed: %v", err) 569 } 570 }) 571 })) 572 s.Wait(0) 573} 574 575func (suite *StandardTestSuite) TestBasicInsert() { 576 agent, s := suite.GetAgentAndHarness() 577 578 s.PushOp(agent.Delete(DeleteOptions{ 579 Key: []byte("testz"), 580 CollectionName: suite.CollectionName, 581 ScopeName: suite.ScopeName, 582 }, func(res *DeleteResult, err error) { 583 s.Continue() 584 })) 585 s.Wait(0) 586 587 s.PushOp(agent.Add(AddOptions{ 588 Key: []byte("testz"), 589 Value: []byte("[]"), 590 CollectionName: suite.CollectionName, 591 ScopeName: suite.ScopeName, 592 }, func(res *StoreResult, err error) { 593 s.Wrap(func() { 594 if err != nil { 595 s.Fatalf("Add operation failed: %v", err) 596 } 597 if res.Cas == Cas(0) { 598 s.Fatalf("Invalid cas received") 599 } 600 }) 601 })) 602 s.Wait(0) 603} 604 605func (suite *StandardTestSuite) TestBasicCounters() { 606 agent, s := suite.GetAgentAndHarness() 607 608 // Counters 609 s.PushOp(agent.Delete(DeleteOptions{ 610 Key: []byte("testCounters"), 611 CollectionName: suite.CollectionName, 612 ScopeName: suite.ScopeName, 613 }, func(res *DeleteResult, err error) { 614 s.Continue() 615 })) 616 s.Wait(0) 617 618 s.PushOp(agent.Increment(CounterOptions{ 619 Key: []byte("testCounters"), 620 Delta: 5, 621 Initial: 11, 622 CollectionName: suite.CollectionName, 623 ScopeName: suite.ScopeName, 624 }, func(res *CounterResult, err error) { 625 s.Wrap(func() { 626 if err != nil { 627 s.Fatalf("Increment operation failed: %v", err) 628 } 629 if res.Cas == Cas(0) { 630 s.Fatalf("Invalid cas received") 631 } 632 if res.Value != 11 { 633 s.Fatalf("Increment did not operate properly") 634 } 635 }) 636 })) 637 s.Wait(0) 638 639 s.PushOp(agent.Increment(CounterOptions{ 640 Key: []byte("testCounters"), 641 Delta: 5, 642 Initial: 22, 643 CollectionName: suite.CollectionName, 644 ScopeName: suite.ScopeName, 645 }, func(res *CounterResult, err error) { 646 s.Wrap(func() { 647 if err != nil { 648 s.Fatalf("Increment operation failed: %v", err) 649 } 650 if res.Cas == Cas(0) { 651 s.Fatalf("Invalid cas received") 652 } 653 if res.Value != 16 { 654 s.Fatalf("Increment did not operate properly") 655 } 656 }) 657 })) 658 s.Wait(0) 659 660 s.PushOp(agent.Decrement(CounterOptions{ 661 Key: []byte("testCounters"), 662 Delta: 3, 663 Initial: 65, 664 CollectionName: suite.CollectionName, 665 ScopeName: suite.ScopeName, 666 }, func(res *CounterResult, err error) { 667 s.Wrap(func() { 668 if err != nil { 669 s.Fatalf("Increment operation failed: %v", err) 670 } 671 if res.Cas == Cas(0) { 672 s.Fatalf("Invalid cas received") 673 } 674 if res.Value != 13 { 675 s.Fatalf("Increment did not operate properly") 676 } 677 }) 678 })) 679 s.Wait(0) 680} 681 682func (suite *StandardTestSuite) TestBasicAdjoins() { 683 suite.EnsureSupportsFeature(TestFeatureAdjoin) 684 685 agent, s := suite.GetAgentAndHarness() 686 687 s.PushOp(agent.Set(SetOptions{ 688 Key: []byte("testAdjoins"), 689 Value: []byte("there"), 690 CollectionName: suite.CollectionName, 691 ScopeName: suite.ScopeName, 692 }, func(res *StoreResult, err error) { 693 s.Continue() 694 })) 695 s.Wait(0) 696 697 s.PushOp(agent.Append(AdjoinOptions{ 698 Key: []byte("testAdjoins"), 699 Value: []byte(" Frank!"), 700 CollectionName: suite.CollectionName, 701 ScopeName: suite.ScopeName, 702 }, func(res *AdjoinResult, err error) { 703 s.Wrap(func() { 704 if err != nil { 705 s.Fatalf("Append operation failed: %v", err) 706 } 707 if res.Cas == Cas(0) { 708 s.Fatalf("Invalid cas received") 709 } 710 }) 711 })) 712 s.Wait(0) 713 714 s.PushOp(agent.Prepend(AdjoinOptions{ 715 Key: []byte("testAdjoins"), 716 Value: []byte("Hello "), 717 CollectionName: suite.CollectionName, 718 ScopeName: suite.ScopeName, 719 }, func(res *AdjoinResult, err error) { 720 s.Wrap(func() { 721 if err != nil { 722 s.Fatalf("Prepend operation failed: %v", err) 723 } 724 if res.Cas == Cas(0) { 725 s.Fatalf("Invalid cas received") 726 } 727 }) 728 })) 729 s.Wait(0) 730 731 s.PushOp(agent.Get(GetOptions{ 732 Key: []byte("testAdjoins"), 733 CollectionName: suite.CollectionName, 734 ScopeName: suite.ScopeName, 735 }, func(res *GetResult, err error) { 736 s.Wrap(func() { 737 if err != nil { 738 s.Fatalf("Get operation failed: %v", err) 739 } 740 if res.Cas == Cas(0) { 741 s.Fatalf("Invalid cas received") 742 } 743 744 if string(res.Value) != "Hello there Frank!" { 745 s.Fatalf("Adjoin operations did not behave") 746 } 747 }) 748 })) 749 s.Wait(0) 750} 751 752func (suite *StandardTestSuite) TestExpiry() { 753 agent, s := suite.GetAgentAndHarness() 754 755 s.PushOp(agent.Set(SetOptions{ 756 Key: []byte("testExpiry"), 757 Value: []byte("{}"), 758 Expiry: 1, 759 CollectionName: suite.CollectionName, 760 ScopeName: suite.ScopeName, 761 }, func(res *StoreResult, err error) { 762 s.Wrap(func() { 763 if err != nil { 764 s.Fatalf("Set operation failed: %v", err) 765 } 766 }) 767 })) 768 s.Wait(0) 769 770 suite.TimeTravel(2000 * time.Millisecond) 771 772 s.PushOp(agent.Get(GetOptions{ 773 Key: []byte("testExpiry"), 774 CollectionName: suite.CollectionName, 775 ScopeName: suite.ScopeName, 776 RetryStrategy: NewBestEffortRetryStrategy(nil), 777 }, func(res *GetResult, err error) { 778 s.Wrap(func() { 779 if !errors.Is(err, ErrDocumentNotFound) { 780 s.Fatalf("Get should have returned document not found") 781 } 782 }) 783 })) 784 s.Wait(0) 785} 786 787func (suite *StandardTestSuite) TestTouch() { 788 agent, s := suite.GetAgentAndHarness() 789 790 s.PushOp(agent.Set(SetOptions{ 791 Key: []byte("testTouch"), 792 Value: []byte("{}"), 793 Expiry: 1, 794 CollectionName: suite.CollectionName, 795 ScopeName: suite.ScopeName, 796 }, func(res *StoreResult, err error) { 797 s.Wrap(func() { 798 if err != nil { 799 s.Fatalf("Set operation failed: %v", err) 800 } 801 }) 802 })) 803 s.Wait(0) 804 805 s.PushOp(agent.Touch(TouchOptions{ 806 Key: []byte("testTouch"), 807 Expiry: 3, 808 CollectionName: suite.CollectionName, 809 ScopeName: suite.ScopeName, 810 }, func(res *TouchResult, err error) { 811 s.Wrap(func() { 812 if err != nil { 813 s.Fatalf("Touch operation failed: %v", err) 814 } 815 }) 816 })) 817 s.Wait(0) 818 819 suite.TimeTravel(1500 * time.Millisecond) 820 821 s.PushOp(agent.Get(GetOptions{ 822 Key: []byte("testTouch"), 823 CollectionName: suite.CollectionName, 824 ScopeName: suite.ScopeName, 825 }, func(res *GetResult, err error) { 826 s.Wrap(func() { 827 if err != nil { 828 s.Fatalf("Get should have been successful") 829 } 830 }) 831 })) 832 s.Wait(0) 833 834 suite.TimeTravel(2500 * time.Millisecond) 835 836 s.PushOp(agent.Get(GetOptions{ 837 Key: []byte("testTouch"), 838 CollectionName: suite.CollectionName, 839 ScopeName: suite.ScopeName, 840 }, func(res *GetResult, err error) { 841 s.Wrap(func() { 842 if !errors.Is(err, ErrDocumentNotFound) { 843 s.Fatalf("Get should have returned document not found") 844 } 845 }) 846 })) 847 s.Wait(0) 848} 849 850func (suite *StandardTestSuite) TestGetAndTouch() { 851 agent, s := suite.GetAgentAndHarness() 852 853 s.PushOp(agent.Set(SetOptions{ 854 Key: []byte("testGetAndTouch"), 855 Value: []byte("{}"), 856 Expiry: 1, 857 CollectionName: suite.CollectionName, 858 ScopeName: suite.ScopeName, 859 }, func(res *StoreResult, err error) { 860 s.Wrap(func() { 861 if err != nil { 862 s.Fatalf("Set operation failed: %v", err) 863 } 864 }) 865 })) 866 s.Wait(0) 867 868 s.PushOp(agent.GetAndTouch(GetAndTouchOptions{ 869 Key: []byte("testGetAndTouch"), 870 Expiry: 3, 871 CollectionName: suite.CollectionName, 872 ScopeName: suite.ScopeName, 873 }, func(res *GetAndTouchResult, err error) { 874 s.Wrap(func() { 875 if err != nil { 876 s.Fatalf("Touch operation failed: %v", err) 877 } 878 }) 879 })) 880 s.Wait(0) 881 882 suite.TimeTravel(1500 * time.Millisecond) 883 884 s.PushOp(agent.Get(GetOptions{ 885 Key: []byte("testGetAndTouch"), 886 CollectionName: suite.CollectionName, 887 ScopeName: suite.ScopeName, 888 }, func(res *GetResult, err error) { 889 s.Wrap(func() { 890 if err != nil { 891 s.Fatalf("Get should have been successful") 892 } 893 }) 894 })) 895 s.Wait(0) 896 897 suite.TimeTravel(3000 * time.Millisecond) 898 899 s.PushOp(agent.Get(GetOptions{ 900 Key: []byte("testGetAndTouch"), 901 CollectionName: suite.CollectionName, 902 ScopeName: suite.ScopeName, 903 }, func(res *GetResult, err error) { 904 s.Wrap(func() { 905 if !errors.Is(err, ErrDocumentNotFound) { 906 s.Fatalf("Get should have returned document not found: %v", err) 907 } 908 }) 909 })) 910 s.Wait(0) 911} 912 913// This test will lock the document for 1 second, it will then perform set requests for up to 2 seconds, 914// the operation should succeed within the 2 seconds. 915func (suite *StandardTestSuite) TestRetrySet() { 916 agent, s := suite.GetAgentAndHarness() 917 918 s.PushOp(agent.Set(SetOptions{ 919 Key: []byte("testRetrySet"), 920 Value: []byte("{}"), 921 Expiry: 1, 922 CollectionName: suite.CollectionName, 923 ScopeName: suite.ScopeName, 924 }, func(res *StoreResult, err error) { 925 s.Wrap(func() { 926 if err != nil { 927 s.Fatalf("Set operation failed: %v", err) 928 } 929 }) 930 })) 931 s.Wait(0) 932 933 s.PushOp(agent.GetAndLock(GetAndLockOptions{ 934 Key: []byte("testRetrySet"), 935 LockTime: 1, 936 CollectionName: suite.CollectionName, 937 ScopeName: suite.ScopeName, 938 }, func(res *GetAndLockResult, err error) { 939 s.Wrap(func() { 940 if err != nil { 941 s.Fatalf("GetAndLock operation failed: %v", err) 942 } 943 }) 944 })) 945 s.Wait(0) 946 947 s.PushOp(agent.Set(SetOptions{ 948 Key: []byte("testRetrySet"), 949 Value: []byte("{}"), 950 Expiry: 1, 951 CollectionName: suite.CollectionName, 952 ScopeName: suite.ScopeName, 953 RetryStrategy: NewBestEffortRetryStrategy(nil), 954 }, func(res *StoreResult, err error) { 955 s.Wrap(func() { 956 if err != nil { 957 s.Fatalf("Set operation failed: %v", err) 958 } 959 }) 960 })) 961 s.Wait(0) 962} 963 964func (suite *StandardTestSuite) TestObserve() { 965 suite.EnsureSupportsFeature(TestFeatureReplicas) 966 967 agent, s := suite.GetAgentAndHarness() 968 if agent.HasCollectionsSupport() { 969 suite.T().Skip("Skipping test as observe does not support collections") 970 } 971 972 s.PushOp(agent.Set(SetOptions{ 973 Key: []byte("testObserve"), 974 Value: []byte("there"), 975 CollectionName: suite.CollectionName, 976 ScopeName: suite.ScopeName, 977 }, func(res *StoreResult, err error) { 978 s.Continue() 979 })) 980 s.Wait(0) 981 982 s.PushOp(agent.Observe(ObserveOptions{ 983 Key: []byte("testObserve"), 984 ReplicaIdx: 1, 985 CollectionName: suite.CollectionName, 986 ScopeName: suite.ScopeName, 987 }, func(res *ObserveResult, err error) { 988 s.Wrap(func() { 989 if err != nil { 990 s.Fatalf("Observe operation failed: %v", err) 991 } 992 }) 993 })) 994 s.Wait(0) 995} 996 997func (suite *StandardTestSuite) TestObserveSeqNo() { 998 suite.EnsureSupportsFeature(TestFeatureReplicas) 999 1000 agent, s := suite.GetAgentAndHarness() 1001 1002 origMt := MutationToken{} 1003 s.PushOp(agent.Set(SetOptions{ 1004 Key: []byte("testObserve"), 1005 Value: []byte("there"), 1006 CollectionName: suite.CollectionName, 1007 ScopeName: suite.ScopeName, 1008 }, func(res *StoreResult, err error) { 1009 s.Wrap(func() { 1010 if err != nil { 1011 s.Fatalf("Initial set operation failed: %v", err) 1012 } 1013 1014 mt := res.MutationToken 1015 if mt.VbUUID == 0 && mt.SeqNo == 0 { 1016 s.Skipf("ObserveSeqNo not supported by server") 1017 } 1018 1019 origMt = mt 1020 }) 1021 })) 1022 s.Wait(0) 1023 1024 origCurSeqNo := SeqNo(0) 1025 vbID, err := agent.kvMux.KeyToVbucket([]byte("testObserve")) 1026 if err != nil { 1027 s.Fatalf("KeyToVbucket operation failed: %v", err) 1028 } 1029 1030 s.PushOp(agent.ObserveVb(ObserveVbOptions{ 1031 VbID: vbID, 1032 VbUUID: origMt.VbUUID, 1033 ReplicaIdx: 1, 1034 }, func(res *ObserveVbResult, err error) { 1035 s.Wrap(func() { 1036 if err != nil { 1037 s.Fatalf("ObserveSeqNo operation failed: %v", err) 1038 } 1039 1040 origCurSeqNo = res.CurrentSeqNo 1041 }) 1042 })) 1043 s.Wait(0) 1044 1045 newMt := MutationToken{} 1046 s.PushOp(agent.Set(SetOptions{ 1047 Key: []byte("testObserve"), 1048 Value: []byte("there"), 1049 CollectionName: suite.CollectionName, 1050 ScopeName: suite.ScopeName, 1051 }, func(res *StoreResult, err error) { 1052 s.Wrap(func() { 1053 if err != nil { 1054 s.Fatalf("Second set operation failed: %v", err) 1055 } 1056 1057 newMt = res.MutationToken 1058 }) 1059 })) 1060 s.Wait(0) 1061 1062 vbID, err = agent.kvMux.KeyToVbucket([]byte("testObserve")) 1063 if err != nil { 1064 s.Fatalf("KeyToVbucket operation failed: %v", err) 1065 } 1066 s.PushOp(agent.ObserveVb(ObserveVbOptions{ 1067 VbID: vbID, 1068 VbUUID: newMt.VbUUID, 1069 ReplicaIdx: 1, 1070 }, func(res *ObserveVbResult, err error) { 1071 s.Wrap(func() { 1072 if err != nil { 1073 s.Fatalf("ObserveSeqNo operation failed: %v", err) 1074 } 1075 if res.CurrentSeqNo < origCurSeqNo { 1076 s.Fatalf("SeqNo does not appear to be working") 1077 } 1078 }) 1079 })) 1080 s.Wait(0) 1081} 1082 1083func (suite *StandardTestSuite) TestRandomGet() { 1084 agent, s := suite.GetAgentAndHarness() 1085 1086 distkeys, err := MakeDistKeys(agent, time.Now().Add(2*time.Second)) 1087 suite.Require().Nil(err, err) 1088 for _, k := range distkeys { 1089 s.PushOp(agent.Set(SetOptions{ 1090 Key: []byte(k), 1091 Value: []byte("Hello World!"), 1092 CollectionName: suite.CollectionName, 1093 ScopeName: suite.ScopeName, 1094 }, func(res *StoreResult, err error) { 1095 s.Wrap(func() { 1096 if err != nil { 1097 s.Fatalf("Couldn't store some items: %v", err) 1098 } 1099 }) 1100 })) 1101 s.Wait(0) 1102 } 1103 1104 s.PushOp(agent.GetRandom(GetRandomOptions{ 1105 CollectionName: suite.CollectionName, 1106 ScopeName: suite.ScopeName, 1107 }, func(res *GetRandomResult, err error) { 1108 s.Wrap(func() { 1109 if err != nil { 1110 s.Fatalf("Get operation failed: %v", err) 1111 } 1112 if res.Cas == Cas(0) { 1113 s.Fatalf("Invalid cas received") 1114 } 1115 if len(res.Key) == 0 { 1116 s.Fatalf("Invalid key returned") 1117 } 1118 if len(res.Value) == 0 { 1119 s.Fatalf("No value returned") 1120 } 1121 }) 1122 })) 1123 s.Wait(0) 1124} 1125 1126func (suite *StandardTestSuite) TestStats() { 1127 agent, s := suite.GetAgentAndHarness() 1128 1129 snapshot, err := agent.ConfigSnapshot() 1130 if err != nil { 1131 suite.T().Fatalf("Failed to get config snapshot: %s", err) 1132 } 1133 numServers, err := snapshot.NumServers() 1134 if err != nil { 1135 suite.T().Fatalf("Failed to get num servers: %s", err) 1136 } 1137 1138 s.PushOp(agent.Stats(StatsOptions{ 1139 Key: "", 1140 }, func(res *StatsResult, err error) { 1141 s.Wrap(func() { 1142 if len(res.Servers) != numServers { 1143 s.Fatalf("Didn't Get all stats!") 1144 } 1145 for srv, curStats := range res.Servers { 1146 if curStats.Error != nil { 1147 s.Fatalf("Got error %v in stats for %s", curStats.Error, srv) 1148 } 1149 1150 if curStats.Stats == nil || len(curStats.Stats) == 0 { 1151 s.Fatalf("Got no stats in stats for %s", srv) 1152 } 1153 } 1154 }) 1155 })) 1156 s.Wait(0) 1157} 1158 1159func (suite *StandardTestSuite) TestGetHttpEps() { 1160 agent, _ := suite.GetAgentAndHarness() 1161 1162 // Relies on a 3.0.0+ server 1163 n1qlEpList := agent.N1qlEps() 1164 if len(n1qlEpList) == 0 { 1165 suite.T().Fatalf("Failed to retrieve N1QL endpoint list") 1166 } 1167 1168 mgmtEpList := agent.MgmtEps() 1169 if len(mgmtEpList) == 0 { 1170 suite.T().Fatalf("Failed to retrieve N1QL endpoint list") 1171 } 1172 1173 capiEpList := agent.CapiEps() 1174 if len(capiEpList) == 0 { 1175 suite.T().Fatalf("Failed to retrieve N1QL endpoint list") 1176 } 1177} 1178 1179func (suite *StandardTestSuite) TestMemcachedBucket() { 1180 suite.EnsureSupportsFeature(TestFeatureMemd) 1181 1182 s := suite.GetHarness() 1183 agent := suite.MemdAgent() 1184 1185 s.PushOp(agent.Set(SetOptions{ 1186 Key: []byte("key"), 1187 Value: []byte("value"), 1188 }, func(res *StoreResult, err error) { 1189 s.Wrap(func() { 1190 if err != nil { 1191 s.Fatalf("Got error for Set: %v", err) 1192 } 1193 }) 1194 })) 1195 s.Wait(0) 1196 1197 s.PushOp(agent.Get(GetOptions{ 1198 Key: []byte("key"), 1199 }, func(res *GetResult, err error) { 1200 s.Wrap(func() { 1201 if err != nil { 1202 s.Fatalf("Couldn't Get back key: %v", err) 1203 } 1204 if string(res.Value) != "value" { 1205 s.Fatalf("Got back wrong value!") 1206 } 1207 }) 1208 })) 1209 s.Wait(0) 1210 1211 // Try to perform Observe: should fail since this isn't supported on Memcached buckets 1212 _, err := agent.Observe(ObserveOptions{ 1213 Key: []byte("key"), 1214 }, func(res *ObserveResult, err error) { 1215 s.Wrap(func() { 1216 s.Fatalf("Scheduling should fail on memcached buckets!") 1217 }) 1218 }) 1219 1220 if !errors.Is(err, ErrFeatureNotAvailable) { 1221 suite.T().Fatalf("Expected observe error for memcached bucket!") 1222 } 1223} 1224 1225func (suite *StandardTestSuite) TestFlagsRoundTrip() { 1226 // Ensure flags are round-tripped with the server correctly. 1227 agent, s := suite.GetAgentAndHarness() 1228 1229 s.PushOp(agent.Set(SetOptions{ 1230 Key: []byte("flagskey"), 1231 Value: []byte("{}"), 1232 Flags: 0x99889988, 1233 CollectionName: suite.CollectionName, 1234 ScopeName: suite.ScopeName, 1235 }, func(res *StoreResult, err error) { 1236 s.Wrap(func() { 1237 if err != nil { 1238 s.Fatalf("Got error for Set: %v", err) 1239 } 1240 }) 1241 })) 1242 s.Wait(0) 1243 1244 s.PushOp(agent.Get(GetOptions{ 1245 Key: []byte("flagskey"), 1246 CollectionName: suite.CollectionName, 1247 ScopeName: suite.ScopeName, 1248 }, func(res *GetResult, err error) { 1249 s.Wrap(func() { 1250 if err != nil { 1251 s.Fatalf("Couldn't Get back key: %v", err) 1252 } 1253 if res.Flags != 0x99889988 { 1254 s.Fatalf("flags failed to round-trip") 1255 } 1256 }) 1257 })) 1258 s.Wait(0) 1259} 1260 1261func (suite *StandardTestSuite) TestMetaOps() { 1262 suite.EnsureSupportsFeature(TestFeatureGetMeta) 1263 1264 agent, s := suite.GetAgentAndHarness() 1265 1266 var currentCas Cas 1267 1268 // Set 1269 1270 s.PushOp(agent.Set(SetOptions{ 1271 Key: []byte("test"), 1272 Value: []byte("{}"), 1273 }, func(res *StoreResult, err error) { 1274 s.Wrap(func() { 1275 if err != nil { 1276 s.Fatalf("Set operation failed") 1277 } 1278 if res.Cas == Cas(0) { 1279 s.Fatalf("Invalid cas received") 1280 } 1281 1282 currentCas = res.Cas 1283 }) 1284 })) 1285 s.Wait(0) 1286 1287 // GetMeta 1288 s.PushOp(agent.GetMeta(GetMetaOptions{ 1289 Key: []byte("test"), 1290 }, func(res *GetMetaResult, err error) { 1291 s.Wrap(func() { 1292 if err != nil { 1293 s.Fatalf("GetMeta operation failed") 1294 } 1295 if res.Expiry != 0 { 1296 s.Fatalf("Invalid expiry received") 1297 } 1298 if res.Deleted != 0 { 1299 s.Fatalf("Invalid deleted flag received") 1300 } 1301 if res.Cas != currentCas { 1302 s.Fatalf("Invalid cas received") 1303 } 1304 }) 1305 })) 1306 s.Wait(0) 1307} 1308 1309func (suite *StandardTestSuite) TestPing() { 1310 agent, s := suite.GetAgentAndHarness() 1311 1312 s.PushOp(agent.Ping(PingOptions{}, func(res *PingResult, err error) { 1313 s.Wrap(func() { 1314 if len(res.Services) == 0 { 1315 s.Fatalf("Ping report contained no results") 1316 } 1317 }) 1318 })) 1319 s.Wait(5) 1320} 1321 1322func (suite *StandardTestSuite) TestDiagnostics() { 1323 agent, _ := suite.GetAgentAndHarness() 1324 1325 report, err := agent.Diagnostics(DiagnosticsOptions{}) 1326 if err != nil { 1327 suite.T().Fatalf("Failed to fetch diagnostics: %s", err) 1328 } 1329 1330 if len(report.MemdConns) == 0 { 1331 suite.T().Fatalf("Diagnostics report contained no results") 1332 } 1333 1334 for _, conn := range report.MemdConns { 1335 if conn.RemoteAddr == "" { 1336 suite.T().Fatalf("Diagnostic report contained invalid entry") 1337 } 1338 } 1339} 1340 1341type testAlternateAddressesRouteConfigMgr struct { 1342 cfg *routeConfig 1343 cfgCalled bool 1344} 1345 1346func (taa *testAlternateAddressesRouteConfigMgr) OnNewRouteConfig(cfg *routeConfig) { 1347 taa.cfgCalled = true 1348 taa.cfg = cfg 1349} 1350 1351func (suite *StandardTestSuite) TestAlternateAddressesEmptyStringConfig() { 1352 cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json") 1353 1354 mgr := &testAlternateAddressesRouteConfigMgr{} 1355 cfgManager := newConfigManager(configManagerProperties{ 1356 SrcMemdAddrs: []string{"192.168.132.234:32799"}, 1357 }) 1358 1359 cfgManager.AddConfigWatcher(mgr) 1360 cfgManager.OnNewConfig(cfgBk) 1361 1362 networkType := cfgManager.NetworkType() 1363 if networkType != "external" { 1364 suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType) 1365 } 1366 1367 for i, server := range mgr.cfg.kvServerList { 1368 cfgBkNode := cfgBk.NodesExt[i] 1369 port := cfgBkNode.AltAddresses["external"].Ports.Kv 1370 cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.AltAddresses["external"].Hostname, port) 1371 if server != cfgBkServer { 1372 suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server) 1373 } 1374 } 1375} 1376 1377func (suite *StandardTestSuite) TestAlternateAddressesAutoConfig() { 1378 cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json") 1379 1380 mgr := &testAlternateAddressesRouteConfigMgr{} 1381 cfgManager := newConfigManager(configManagerProperties{ 1382 NetworkType: "auto", 1383 SrcMemdAddrs: []string{"192.168.132.234:32799"}, 1384 }) 1385 cfgManager.AddConfigWatcher(mgr) 1386 cfgManager.OnNewConfig(cfgBk) 1387 1388 networkType := cfgManager.NetworkType() 1389 if networkType != "external" { 1390 suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType) 1391 } 1392 1393 for i, server := range mgr.cfg.kvServerList { 1394 cfgBkNode := cfgBk.NodesExt[i] 1395 port := cfgBkNode.AltAddresses["external"].Ports.Kv 1396 cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.AltAddresses["external"].Hostname, port) 1397 if server != cfgBkServer { 1398 suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server) 1399 } 1400 } 1401} 1402 1403func (suite *StandardTestSuite) TestAlternateAddressesAutoInternalConfig() { 1404 cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json") 1405 1406 mgr := &testAlternateAddressesRouteConfigMgr{} 1407 cfgManager := newConfigManager(configManagerProperties{ 1408 NetworkType: "auto", 1409 SrcMemdAddrs: []string{"172.17.0.4:11210"}, 1410 }) 1411 1412 cfgManager.AddConfigWatcher(mgr) 1413 cfgManager.OnNewConfig(cfgBk) 1414 1415 networkType := cfgManager.NetworkType() 1416 if networkType != "default" { 1417 suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType) 1418 } 1419 1420 for i, server := range mgr.cfg.kvServerList { 1421 cfgBkNode := cfgBk.NodesExt[i] 1422 port := cfgBkNode.Services.Kv 1423 cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.Hostname, port) 1424 if server != cfgBkServer { 1425 suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server) 1426 } 1427 } 1428} 1429 1430func (suite *StandardTestSuite) TestAlternateAddressesDefaultConfig() { 1431 cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json") 1432 1433 mgr := &testAlternateAddressesRouteConfigMgr{} 1434 cfgManager := newConfigManager(configManagerProperties{ 1435 NetworkType: "default", 1436 SrcMemdAddrs: []string{"192.168.132.234:32799"}, 1437 }) 1438 cfgManager.AddConfigWatcher(mgr) 1439 cfgManager.OnNewConfig(cfgBk) 1440 1441 networkType := cfgManager.NetworkType() 1442 if networkType != "default" { 1443 suite.T().Fatalf("Expected agent networkType to be default, was %s", networkType) 1444 } 1445 1446 for i, server := range mgr.cfg.kvServerList { 1447 cfgBkNode := cfgBk.NodesExt[i] 1448 port := cfgBkNode.Services.Kv 1449 cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.Hostname, port) 1450 if server != cfgBkServer { 1451 suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server) 1452 } 1453 } 1454} 1455 1456func (suite *StandardTestSuite) TestAlternateAddressesExternalConfig() { 1457 cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json") 1458 1459 mgr := &testAlternateAddressesRouteConfigMgr{} 1460 cfgManager := newConfigManager(configManagerProperties{ 1461 NetworkType: "external", 1462 SrcMemdAddrs: []string{"192.168.132.234:32799"}, 1463 }) 1464 cfgManager.AddConfigWatcher(mgr) 1465 cfgManager.OnNewConfig(cfgBk) 1466 1467 networkType := cfgManager.NetworkType() 1468 if networkType != "external" { 1469 suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType) 1470 } 1471 1472 for i, server := range mgr.cfg.kvServerList { 1473 cfgBkNode := cfgBk.NodesExt[i] 1474 port := cfgBkNode.AltAddresses["external"].Ports.Kv 1475 cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.AltAddresses["external"].Hostname, port) 1476 if server != cfgBkServer { 1477 suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server) 1478 } 1479 } 1480} 1481 1482func (suite *StandardTestSuite) TestAlternateAddressesExternalConfigNoPorts() { 1483 cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses_without_ports.json") 1484 1485 mgr := &testAlternateAddressesRouteConfigMgr{} 1486 cfgManager := newConfigManager(configManagerProperties{ 1487 NetworkType: "external", 1488 SrcMemdAddrs: []string{"192.168.132.234:32799"}, 1489 }) 1490 cfgManager.AddConfigWatcher(mgr) 1491 cfgManager.OnNewConfig(cfgBk) 1492 1493 networkType := cfgManager.NetworkType() 1494 if networkType != "external" { 1495 suite.T().Fatalf("Expected agent networkType to be external, was %s", networkType) 1496 } 1497 1498 for i, server := range mgr.cfg.kvServerList { 1499 cfgBkNode := cfgBk.NodesExt[i] 1500 port := cfgBkNode.Services.Kv 1501 cfgBkServer := fmt.Sprintf("%s:%d", cfgBkNode.AltAddresses["external"].Hostname, port) 1502 if server != cfgBkServer { 1503 suite.T().Fatalf("Expected kv server to be %s but was %s", cfgBkServer, server) 1504 } 1505 } 1506} 1507 1508func (suite *StandardTestSuite) TestAlternateAddressesInvalidConfig() { 1509 cfgBk := suite.LoadConfigFromFile("testdata/bucket_config_with_external_addresses.json") 1510 1511 mgr := &testAlternateAddressesRouteConfigMgr{} 1512 cfgManager := newConfigManager(configManagerProperties{ 1513 NetworkType: "invalid", 1514 SrcMemdAddrs: []string{"192.168.132.234:32799"}, 1515 }) 1516 1517 cfgManager.AddConfigWatcher(mgr) 1518 cfgManager.OnNewConfig(cfgBk) 1519 1520 networkType := cfgManager.NetworkType() 1521 if networkType != "invalid" { 1522 suite.T().Fatalf("Expected agent networkType to be invalid, was %s", networkType) 1523 } 1524 1525 if mgr.cfgCalled { 1526 suite.T().Fatalf("Expected route config to not be propagated, was propagated") 1527 } 1528} 1529 1530func (suite *StandardTestSuite) TestAgentWaitUntilReadyGCCCP() { 1531 suite.EnsureSupportsFeature(TestFeatureGCCCP) 1532 1533 cfg := suite.makeAgentConfig(globalTestConfig) 1534 agent, err := CreateAgent(&cfg) 1535 suite.Require().Nil(err, err) 1536 defer agent.Close() 1537 s := suite.GetHarness() 1538 1539 s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{}, func(result *WaitUntilReadyResult, err error) { 1540 s.Wrap(func() { 1541 if err != nil { 1542 s.Fatalf("WaitUntilReady failed with error: %v", err) 1543 } 1544 }) 1545 })) 1546 s.Wait(6) 1547 1548 s.PushOp(agent.Ping(PingOptions{ 1549 ServiceTypes: []ServiceType{N1qlService}, 1550 N1QLDeadline: time.Now().Add(5 * time.Second), 1551 }, func(result *PingResult, err error) { 1552 s.Wrap(func() { 1553 if err != nil { 1554 s.Fatalf("Ping failed with error: %v", err) 1555 } 1556 }) 1557 })) 1558 s.Wait(0) 1559} 1560 1561func (suite *StandardTestSuite) VerifyConnectedToBucket(agent *Agent, s *TestSubHarness, test string) { 1562 s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{}, func(result *WaitUntilReadyResult, err error) { 1563 s.Wrap(func() { 1564 if err != nil { 1565 s.Fatalf("WaitUntilReady failed with error: %v", err) 1566 } 1567 }) 1568 })) 1569 s.Wait(6) 1570 1571 s.PushOp(agent.Set(SetOptions{ 1572 Key: []byte(test), 1573 Value: []byte("{}"), 1574 CollectionName: suite.CollectionName, 1575 ScopeName: suite.ScopeName, 1576 }, func(res *StoreResult, err error) { 1577 s.Wrap(func() { 1578 if err != nil { 1579 s.Fatalf("Got error for Set: %v", err) 1580 } 1581 }) 1582 })) 1583 s.Wait(0) 1584} 1585 1586func (suite *StandardTestSuite) TestAgentWaitUntilReadyBucket() { 1587 cfg := suite.makeAgentConfig(globalTestConfig) 1588 cfg.BucketName = globalTestConfig.BucketName 1589 agent, err := CreateAgent(&cfg) 1590 suite.Require().Nil(err, err) 1591 defer agent.Close() 1592 s := suite.GetHarness() 1593 1594 suite.VerifyConnectedToBucket(agent, s, "TestAgentWaitUntilReadyBucket") 1595} 1596 1597func (suite *StandardTestSuite) TestAgentGroupWaitUntilReadyGCCCP() { 1598 suite.EnsureSupportsFeature(TestFeatureGCCCP) 1599 1600 cfg := suite.makeAgentGroupConfig(globalTestConfig) 1601 ag, err := CreateAgentGroup(&cfg) 1602 suite.Require().Nil(err, err) 1603 defer ag.Close() 1604 s := suite.GetHarness() 1605 1606 s.PushOp(ag.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{}, func(result *WaitUntilReadyResult, err error) { 1607 s.Wrap(func() { 1608 if err != nil { 1609 s.Fatalf("WaitUntilReady failed with error: %v", err) 1610 } 1611 }) 1612 })) 1613 s.Wait(6) 1614 1615 s.PushOp(ag.Ping(PingOptions{ 1616 ServiceTypes: []ServiceType{N1qlService}, 1617 N1QLDeadline: time.Now().Add(5 * time.Second), 1618 }, func(result *PingResult, err error) { 1619 s.Wrap(func() { 1620 if err != nil { 1621 s.Fatalf("Ping failed with error: %v", err) 1622 } 1623 }) 1624 })) 1625 s.Wait(0) 1626} 1627 1628// This test cannot run against mock as the mock does not respond with 200 status code for all of the endpoints. 1629func (suite *StandardTestSuite) TestAgentGroupWaitUntilReadyBucket() { 1630 suite.EnsureSupportsFeature(TestFeaturePingServices) 1631 1632 cfg := suite.makeAgentGroupConfig(globalTestConfig) 1633 ag, err := CreateAgentGroup(&cfg) 1634 suite.Require().Nil(err, err) 1635 defer ag.Close() 1636 s := suite.GetHarness() 1637 1638 err = ag.OpenBucket(globalTestConfig.BucketName) 1639 suite.Require().Nil(err, err) 1640 1641 agent := ag.GetAgent("default") 1642 suite.Require().NotNil(agent) 1643 1644 suite.VerifyConnectedToBucket(agent, s, "TestAgentGroupWaitUntilReadyBucket") 1645} 1646 1647func (suite *StandardTestSuite) TestConnectHTTPOnlyDefaultPort() { 1648 cfg := suite.makeAgentConfig(globalTestConfig) 1649 if len(cfg.HTTPAddrs) == 0 { 1650 suite.T().Skip("Skipping test due to no HTTP addresses") 1651 } 1652 1653 addr1 := cfg.HTTPAddrs[0] 1654 port := strings.Split(addr1, ":")[1] 1655 if port != "8091" { 1656 suite.T().Skipf("Skipping test due to non default port %s", port) 1657 } 1658 1659 cfg.HTTPAddrs = []string{addr1} 1660 cfg.MemdAddrs = []string{} 1661 cfg.BucketName = globalTestConfig.BucketName 1662 agent, err := CreateAgent(&cfg) 1663 suite.Require().Nil(err, err) 1664 defer agent.Close() 1665 s := suite.GetHarness() 1666 1667 suite.VerifyConnectedToBucket(agent, s, "TestConnectHTTPOnlyDefaultPort") 1668} 1669 1670func (suite *StandardTestSuite) TestConnectHTTPOnlyDefaultPortSSL() { 1671 suite.EnsureSupportsFeature(TestFeatureSsl) 1672 1673 cfg := suite.makeAgentConfig(globalTestConfig) 1674 if len(cfg.HTTPAddrs) == 0 { 1675 suite.T().Skip("Skipping test due to no HTTP addresses") 1676 } 1677 1678 addr1 := cfg.HTTPAddrs[0] 1679 parts := strings.Split(addr1, ":") 1680 if parts[1] != "8091" { 1681 suite.T().Skipf("Skipping test due to non default port %s", parts[1]) 1682 } 1683 1684 cfg.HTTPAddrs = []string{parts[0] + ":" + "18091"} 1685 cfg.MemdAddrs = []string{} 1686 cfg.UseTLS = true 1687 // SkipVerify 1688 cfg.TLSRootCAProvider = func() *x509.CertPool { 1689 return nil 1690 } 1691 cfg.BucketName = globalTestConfig.BucketName 1692 agent, err := CreateAgent(&cfg) 1693 suite.Require().Nil(err, err) 1694 defer agent.Close() 1695 s := suite.GetHarness() 1696 1697 suite.VerifyConnectedToBucket(agent, s, "TestConnectHTTPOnlyDefaultPortSSL") 1698} 1699 1700func (suite *StandardTestSuite) TestConnectHTTPOnlyDefaultPortFastFailInvalidBucket() { 1701 cfg := suite.makeAgentConfig(globalTestConfig) 1702 if len(cfg.HTTPAddrs) == 0 { 1703 suite.T().Skip("Skipping test due to no HTTP addresses") 1704 } 1705 1706 addr1 := cfg.HTTPAddrs[0] 1707 port := strings.Split(addr1, ":")[1] 1708 if port != "8091" { 1709 suite.T().Skipf("Skipping test due to non default port %s", port) 1710 } 1711 1712 cfg.HTTPAddrs = []string{addr1} 1713 cfg.MemdAddrs = []string{} 1714 cfg.BucketName = "idontexist" 1715 agent, err := CreateAgent(&cfg) 1716 suite.Require().Nil(err, err) 1717 defer agent.Close() 1718 s := suite.GetHarness() 1719 1720 start := time.Now() 1721 s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{ 1722 RetryStrategy: newFailFastRetryStrategy(), 1723 }, func(result *WaitUntilReadyResult, err error) { 1724 s.Wrap(func() { 1725 if err == nil { 1726 s.Fatalf("WaitUntilReady failed without error") 1727 } 1728 if !errors.Is(err, ErrAuthenticationFailure) { 1729 s.Fatalf("WaitUntilReady should have failed with auth error but was %v", err) 1730 } 1731 if time.Since(start) > 5*time.Second { 1732 s.Fatalf("WaitUntilReady should have failed before the timeout duration, was %s", time.Since(start)) 1733 } 1734 }) 1735 })) 1736 s.Wait(6) 1737} 1738 1739func (suite *StandardTestSuite) TestConnectHTTPOnlyNonDefaultPort() { 1740 cfg := suite.makeAgentConfig(globalTestConfig) 1741 if len(cfg.HTTPAddrs) == 0 { 1742 suite.T().Skip("Skipping test due to no HTTP addresses") 1743 } 1744 1745 addr1 := cfg.HTTPAddrs[0] 1746 port := strings.Split(addr1, ":")[1] 1747 if port == "8091" { 1748 suite.T().Skipf("Skipping test due to default port %s", port) 1749 } 1750 1751 cfg.HTTPAddrs = []string{addr1} 1752 cfg.MemdAddrs = []string{} 1753 cfg.BucketName = globalTestConfig.BucketName 1754 agent, err := CreateAgent(&cfg) 1755 suite.Require().Nil(err, err) 1756 defer agent.Close() 1757 s := suite.GetHarness() 1758 1759 suite.VerifyConnectedToBucket(agent, s, "TestConnectHTTPOnlyNonDefaultPort") 1760} 1761 1762func (suite *StandardTestSuite) TestConnectHTTPOnlyNonDefaultPortFastFailInvalidBucket() { 1763 cfg := suite.makeAgentConfig(globalTestConfig) 1764 if len(cfg.HTTPAddrs) == 0 { 1765 suite.T().Skip("Skipping test due to no HTTP addresses") 1766 } 1767 1768 addr1 := cfg.HTTPAddrs[0] 1769 port := strings.Split(addr1, ":")[1] 1770 if port == "8091" { 1771 suite.T().Skipf("Skipping test due to default port %s", port) 1772 } 1773 1774 cfg.HTTPAddrs = []string{addr1} 1775 cfg.MemdAddrs = []string{} 1776 cfg.BucketName = "idontexist" 1777 agent, err := CreateAgent(&cfg) 1778 suite.Require().Nil(err, err) 1779 defer agent.Close() 1780 s := suite.GetHarness() 1781 1782 start := time.Now() 1783 s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{ 1784 RetryStrategy: newFailFastRetryStrategy(), 1785 }, func(result *WaitUntilReadyResult, err error) { 1786 s.Wrap(func() { 1787 if err == nil { 1788 s.Fatalf("WaitUntilReady failed without error") 1789 } 1790 if !errors.Is(err, ErrAuthenticationFailure) { 1791 s.Fatalf("WaitUntilReady should have failed with auth error but was %v", err) 1792 } 1793 if time.Since(start) > 5*time.Second { 1794 s.Fatalf("WaitUntilReady should have failed before the timeout duration, was %s", time.Since(start)) 1795 } 1796 }) 1797 })) 1798 s.Wait(6) 1799} 1800 1801func (suite *StandardTestSuite) TestConnectMemdOnlyDefaultPort() { 1802 cfg := suite.makeAgentConfig(globalTestConfig) 1803 if len(cfg.MemdAddrs) == 0 { 1804 suite.T().Skip("Skipping test due to no Memd addresses") 1805 } 1806 1807 addr1 := cfg.MemdAddrs[0] 1808 port := strings.Split(addr1, ":")[1] 1809 if port != "11210" { 1810 suite.T().Skipf("Skipping test due to non default port %s", port) 1811 } 1812 1813 cfg.HTTPAddrs = []string{} 1814 cfg.MemdAddrs = []string{addr1} 1815 cfg.BucketName = globalTestConfig.BucketName 1816 agent, err := CreateAgent(&cfg) 1817 suite.Require().Nil(err, err) 1818 defer agent.Close() 1819 s := suite.GetHarness() 1820 1821 suite.VerifyConnectedToBucket(agent, s, "TestConnectMemdOnlyDefaultPort") 1822} 1823 1824func (suite *StandardTestSuite) TestConnectMemdOnlyDefaultPortSSL() { 1825 suite.EnsureSupportsFeature(TestFeatureSsl) 1826 1827 cfg := suite.makeAgentConfig(globalTestConfig) 1828 if len(cfg.MemdAddrs) == 0 { 1829 suite.T().Skip("Skipping test due to no memd addresses") 1830 } 1831 1832 addr1 := cfg.MemdAddrs[0] 1833 parts := strings.Split(addr1, ":") 1834 if parts[1] != "11210" { 1835 suite.T().Skipf("Skipping test due to non default port %s", parts[1]) 1836 } 1837 1838 cfg.HTTPAddrs = []string{} 1839 cfg.MemdAddrs = []string{parts[0] + ":11207"} 1840 cfg.UseTLS = true 1841 // SkipVerify 1842 cfg.TLSRootCAProvider = func() *x509.CertPool { 1843 return nil 1844 } 1845 cfg.BucketName = globalTestConfig.BucketName 1846 agent, err := CreateAgent(&cfg) 1847 suite.Require().Nil(err, err) 1848 defer agent.Close() 1849 s := suite.GetHarness() 1850 1851 suite.VerifyConnectedToBucket(agent, s, "TestConnectMemdOnlyDefaultPortSSL") 1852} 1853 1854func (suite *StandardTestSuite) TestConnectMemdOnlyNonDefaultPort() { 1855 cfg := suite.makeAgentConfig(globalTestConfig) 1856 if len(cfg.MemdAddrs) == 0 { 1857 suite.T().Skip("Skipping test due to no memd addresses") 1858 } 1859 1860 addr1 := cfg.MemdAddrs[0] 1861 port := strings.Split(addr1, ":")[1] 1862 if port == "8091" { 1863 suite.T().Skipf("Skipping test due to default port %s", port) 1864 } 1865 1866 cfg.HTTPAddrs = []string{} 1867 cfg.MemdAddrs = []string{addr1} 1868 cfg.BucketName = globalTestConfig.BucketName 1869 agent, err := CreateAgent(&cfg) 1870 suite.Require().Nil(err, err) 1871 defer agent.Close() 1872 s := suite.GetHarness() 1873 1874 suite.VerifyConnectedToBucket(agent, s, "TestConnectMemdOnlyNonDefaultPort") 1875} 1876 1877func (suite *StandardTestSuite) TestConnectMemdOnlyDefaultPortFastFailInvalidBucket() { 1878 cfg := suite.makeAgentConfig(globalTestConfig) 1879 if len(cfg.MemdAddrs) == 0 { 1880 suite.T().Skip("Skipping test due to no memd addresses") 1881 } 1882 1883 addr1 := cfg.MemdAddrs[0] 1884 port := strings.Split(addr1, ":")[1] 1885 if port != "11210" { 1886 suite.T().Skipf("Skipping test due to non default port %s", port) 1887 } 1888 1889 cfg.HTTPAddrs = []string{} 1890 cfg.MemdAddrs = []string{addr1} 1891 cfg.BucketName = "idontexist" 1892 agent, err := CreateAgent(&cfg) 1893 suite.Require().Nil(err, err) 1894 defer agent.Close() 1895 s := suite.GetHarness() 1896 1897 start := time.Now() 1898 s.PushOp(agent.WaitUntilReady(time.Now().Add(5*time.Second), WaitUntilReadyOptions{ 1899 RetryStrategy: newFailFastRetryStrategy(), 1900 }, func(result *WaitUntilReadyResult, err error) { 1901 s.Wrap(func() { 1902 if err == nil { 1903 s.Fatalf("WaitUntilReady failed without error") 1904 } 1905 if !errors.Is(err, ErrAuthenticationFailure) { 1906 s.Fatalf("WaitUntilReady should have failed with auth error but was %v", err) 1907 } 1908 if time.Since(start) > 5*time.Second { 1909 s.Fatalf("WaitUntilReady should have failed before the timeout duration, was %s", time.Since(start)) 1910 } 1911 }) 1912 })) 1913 s.Wait(6) 1914} 1915 1916// These functions are likely temporary. 1917 1918type testManifestWithError struct { 1919 Manifest Manifest 1920 Err error 1921} 1922 1923func testCreateScope(name, bucketName string, agent *Agent) (*Manifest, error) { 1924 data := url.Values{} 1925 data.Set("name", name) 1926 1927 req := &HTTPRequest{ 1928 Service: MgmtService, 1929 Path: fmt.Sprintf("/pools/default/buckets/%s/collections", bucketName), 1930 Method: "POST", 1931 Body: []byte(data.Encode()), 1932 Headers: make(map[string]string), 1933 Deadline: time.Now().Add(10 * time.Second), 1934 } 1935 1936 req.Headers["Content-Type"] = "application/x-www-form-urlencoded" 1937 1938 resCh := make(chan *HTTPResponse) 1939 errCh := make(chan error) 1940 _, err := agent.DoHTTPRequest(req, func(response *HTTPResponse, err error) { 1941 if err != nil { 1942 errCh <- err 1943 return 1944 } 1945 resCh <- response 1946 }) 1947 if err != nil { 1948 return nil, err 1949 } 1950 1951 var resp *HTTPResponse 1952 select { 1953 case respErr := <-errCh: 1954 if respErr != nil { 1955 return nil, respErr 1956 } 1957 case res := <-resCh: 1958 resp = res 1959 } 1960 1961 if resp.StatusCode >= 300 { 1962 data, err := ioutil.ReadAll(resp.Body) 1963 if err != nil { 1964 return nil, fmt.Errorf("could not create scope, status code: %d", resp.StatusCode) 1965 } 1966 err = resp.Body.Close() 1967 if err != nil { 1968 logDebugf("Failed to close response body") 1969 } 1970 return nil, fmt.Errorf("could not create scope, %s", string(data)) 1971 } 1972 1973 respBody := struct { 1974 UID string `json:"uid"` 1975 }{} 1976 jsonDec := json.NewDecoder(resp.Body) 1977 err = jsonDec.Decode(&respBody) 1978 if err != nil { 1979 return nil, err 1980 } 1981 err = resp.Body.Close() 1982 if err != nil { 1983 return nil, err 1984 } 1985 1986 uid, err := strconv.ParseInt(respBody.UID, 16, 64) 1987 if err != nil { 1988 return nil, err 1989 } 1990 1991 timer := time.NewTimer(20 * time.Second) 1992 waitCh := make(chan testManifestWithError, 1) 1993 go waitForManifest(agent, uint64(uid), waitCh) 1994 1995 for { 1996 select { 1997 case <-timer.C: 1998 return nil, errors.New("wait time for scope to become available expired") 1999 case manifest := <-waitCh: 2000 if manifest.Err != nil { 2001 return nil, manifest.Err 2002 } 2003 2004 return &manifest.Manifest, nil 2005 } 2006 } 2007} 2008 2009func testDeleteScope(name, bucketName string, agent *Agent, waitForDeletion bool) (*Manifest, error) { 2010 data := url.Values{} 2011 data.Set("name", name) 2012 2013 req := &HTTPRequest{ 2014 Service: MgmtService, 2015 Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s", bucketName, name), 2016 Method: "DELETE", 2017 Headers: make(map[string]string), 2018 Deadline: time.Now().Add(10 * time.Second), 2019 } 2020 2021 resCh := make(chan *HTTPResponse) 2022 errCh := make(chan error) 2023 _, err := agent.DoHTTPRequest(req, func(response *HTTPResponse, err error) { 2024 if err != nil { 2025 errCh <- err 2026 return 2027 } 2028 resCh <- response 2029 }) 2030 if err != nil { 2031 return nil, err 2032 } 2033 2034 var resp *HTTPResponse 2035 select { 2036 case respErr := <-errCh: 2037 if respErr != nil { 2038 return nil, respErr 2039 } 2040 case res := <-resCh: 2041 resp = res 2042 } 2043 2044 if err != nil { 2045 return nil, err 2046 } 2047 if resp.StatusCode >= 300 { 2048 data, err := ioutil.ReadAll(resp.Body) 2049 if err != nil { 2050 return nil, fmt.Errorf("could not delete scope, status code: %d", resp.StatusCode) 2051 } 2052 err = resp.Body.Close() 2053 if err != nil { 2054 logDebugf("Failed to close response body") 2055 } 2056 return nil, fmt.Errorf("could not delete scope, %s", string(data)) 2057 } 2058 2059 respBody := struct { 2060 UID string `json:"uid"` 2061 }{} 2062 jsonDec := json.NewDecoder(resp.Body) 2063 err = jsonDec.Decode(&respBody) 2064 if err != nil { 2065 return nil, err 2066 } 2067 err = resp.Body.Close() 2068 if err != nil { 2069 return nil, err 2070 } 2071 2072 uid, err := strconv.ParseInt(respBody.UID, 16, 64) 2073 if err != nil { 2074 return nil, err 2075 } 2076 2077 timer := time.NewTimer(20 * time.Second) 2078 waitCh := make(chan testManifestWithError, 1) 2079 go waitForManifest(agent, uint64(uid), waitCh) 2080 2081 for { 2082 select { 2083 case <-timer.C: 2084 return nil, errors.New("wait time for scope to become deleted expired") 2085 case manifest := <-waitCh: 2086 if manifest.Err != nil { 2087 return nil, manifest.Err 2088 } 2089 2090 return &manifest.Manifest, nil 2091 } 2092 } 2093 2094} 2095 2096func testCreateCollection(name, scopeName, bucketName string, agent *Agent) (*Manifest, error) { 2097 if scopeName == "" { 2098 scopeName = "_default" 2099 } 2100 if name == "" { 2101 name = "_default" 2102 } 2103 2104 data := url.Values{} 2105 data.Set("name", name) 2106 2107 req := &HTTPRequest{ 2108 Service: MgmtService, 2109 Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s/", bucketName, scopeName), 2110 Method: "POST", 2111 Body: []byte(data.Encode()), 2112 Headers: make(map[string]string), 2113 Deadline: time.Now().Add(10 * time.Second), 2114 } 2115 2116 req.Headers["Content-Type"] = "application/x-www-form-urlencoded" 2117 2118 resCh := make(chan *HTTPResponse) 2119 errCh := make(chan error) 2120 _, err := agent.DoHTTPRequest(req, func(response *HTTPResponse, err error) { 2121 if err != nil { 2122 errCh <- err 2123 return 2124 } 2125 resCh <- response 2126 }) 2127 if err != nil { 2128 return nil, err 2129 } 2130 2131 var resp *HTTPResponse 2132 select { 2133 case respErr := <-errCh: 2134 if respErr != nil { 2135 return nil, respErr 2136 } 2137 case res := <-resCh: 2138 resp = res 2139 } 2140 2141 if resp.StatusCode >= 300 { 2142 data, err := ioutil.ReadAll(resp.Body) 2143 if err != nil { 2144 return nil, fmt.Errorf("could not create collection, status code: %d", resp.StatusCode) 2145 } 2146 err = resp.Body.Close() 2147 if err != nil { 2148 logDebugf("Failed to close response body") 2149 } 2150 return nil, fmt.Errorf("could not create collection, %s", string(data)) 2151 } 2152 2153 respBody := struct { 2154 UID string `json:"uid"` 2155 }{} 2156 jsonDec := json.NewDecoder(resp.Body) 2157 err = jsonDec.Decode(&respBody) 2158 if err != nil { 2159 return nil, err 2160 } 2161 err = resp.Body.Close() 2162 if err != nil { 2163 return nil, err 2164 } 2165 2166 uid, err := strconv.ParseInt(respBody.UID, 16, 64) 2167 if err != nil { 2168 return nil, err 2169 } 2170 2171 timer := time.NewTimer(20 * time.Second) 2172 waitCh := make(chan testManifestWithError, 1) 2173 go waitForManifest(agent, uint64(uid), waitCh) 2174 2175 for { 2176 select { 2177 case <-timer.C: 2178 return nil, errors.New("wait time for collection to become available expired") 2179 case manifest := <-waitCh: 2180 if manifest.Err != nil { 2181 return nil, manifest.Err 2182 } 2183 2184 return &manifest.Manifest, nil 2185 } 2186 } 2187} 2188 2189func testDeleteCollection(name, scopeName, bucketName string, agent *Agent, waitForDeletion bool) (*Manifest, error) { 2190 if scopeName == "" { 2191 scopeName = "_default" 2192 } 2193 if name == "" { 2194 name = "_default" 2195 } 2196 2197 data := url.Values{} 2198 data.Set("name", name) 2199 2200 req := &HTTPRequest{ 2201 Service: MgmtService, 2202 Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s/%s", bucketName, scopeName, name), 2203 Method: "DELETE", 2204 Headers: make(map[string]string), 2205 Deadline: time.Now().Add(10 * time.Second), 2206 } 2207 2208 resCh := make(chan *HTTPResponse) 2209 errCh := make(chan error) 2210 _, err := agent.DoHTTPRequest(req, func(response *HTTPResponse, err error) { 2211 if err != nil { 2212 errCh <- err 2213 return 2214 } 2215 resCh <- response 2216 }) 2217 if err != nil { 2218 return nil, err 2219 } 2220 2221 var resp *HTTPResponse 2222 select { 2223 case respErr := <-errCh: 2224 if respErr != nil { 2225 return nil, respErr 2226 } 2227 case res := <-resCh: 2228 resp = res 2229 } 2230 2231 if err != nil { 2232 return nil, err 2233 } 2234 if resp.StatusCode >= 300 { 2235 data, err := ioutil.ReadAll(resp.Body) 2236 if err != nil { 2237 return nil, fmt.Errorf("could not delete collection, status code: %d", resp.StatusCode) 2238 } 2239 err = resp.Body.Close() 2240 if err != nil { 2241 logDebugf("Failed to close response body") 2242 } 2243 return nil, fmt.Errorf("could not delete collection, %s", string(data)) 2244 } 2245 2246 respBody := struct { 2247 UID string `json:"uid"` 2248 }{} 2249 jsonDec := json.NewDecoder(resp.Body) 2250 err = jsonDec.Decode(&respBody) 2251 if err != nil { 2252 return nil, err 2253 } 2254 err = resp.Body.Close() 2255 if err != nil { 2256 return nil, err 2257 } 2258 2259 uid, err := strconv.ParseInt(respBody.UID, 16, 64) 2260 if err != nil { 2261 return nil, err 2262 } 2263 2264 timer := time.NewTimer(20 * time.Second) 2265 waitCh := make(chan testManifestWithError, 1) 2266 go waitForManifest(agent, uint64(uid), waitCh) 2267 2268 for { 2269 select { 2270 case <-timer.C: 2271 return nil, errors.New("wait time for collection to become deleted expired") 2272 case manifest := <-waitCh: 2273 if manifest.Err != nil { 2274 return nil, manifest.Err 2275 } 2276 2277 return &manifest.Manifest, nil 2278 } 2279 } 2280 2281} 2282 2283func waitForManifest(agent *Agent, manifestID uint64, manifestCh chan testManifestWithError) { 2284 var manifest Manifest 2285 for manifest.UID != manifestID { 2286 setCh := make(chan struct{}) 2287 agent.GetCollectionManifest(GetCollectionManifestOptions{}, func(result *GetCollectionManifestResult, err error) { 2288 if err != nil { 2289 log.Println(err.Error()) 2290 close(setCh) 2291 manifestCh <- testManifestWithError{Err: err} 2292 return 2293 } 2294 2295 err = json.Unmarshal(result.Manifest, &manifest) 2296 if err != nil { 2297 log.Println(err.Error()) 2298 close(setCh) 2299 manifestCh <- testManifestWithError{Err: err} 2300 return 2301 } 2302 2303 if manifest.UID == manifestID { 2304 close(setCh) 2305 manifestCh <- testManifestWithError{Manifest: manifest} 2306 return 2307 } 2308 setCh <- struct{}{} 2309 }) 2310 <-setCh 2311 time.Sleep(500 * time.Millisecond) 2312 } 2313} 2314