1package agent 2 3import ( 4 "bytes" 5 "context" 6 "crypto/tls" 7 "crypto/x509" 8 "encoding/base64" 9 "encoding/json" 10 "fmt" 11 "io/ioutil" 12 "math/rand" 13 "net" 14 "net/http" 15 "net/http/httptest" 16 "net/url" 17 "os" 18 "path/filepath" 19 "strconv" 20 "strings" 21 "testing" 22 "time" 23 24 "github.com/golang/protobuf/jsonpb" 25 "github.com/google/tcpproxy" 26 "github.com/hashicorp/go-hclog" 27 "github.com/hashicorp/serf/coordinate" 28 "github.com/hashicorp/serf/serf" 29 "github.com/stretchr/testify/assert" 30 "github.com/stretchr/testify/require" 31 "golang.org/x/sync/errgroup" 32 "google.golang.org/grpc" 33 "gopkg.in/square/go-jose.v2/jwt" 34 35 "github.com/hashicorp/consul/agent/cache" 36 cachetype "github.com/hashicorp/consul/agent/cache-types" 37 "github.com/hashicorp/consul/agent/checks" 38 "github.com/hashicorp/consul/agent/config" 39 "github.com/hashicorp/consul/agent/connect" 40 "github.com/hashicorp/consul/agent/consul" 41 "github.com/hashicorp/consul/agent/structs" 42 "github.com/hashicorp/consul/agent/token" 43 "github.com/hashicorp/consul/api" 44 "github.com/hashicorp/consul/internal/go-sso/oidcauth/oidcauthtest" 45 "github.com/hashicorp/consul/ipaddr" 46 "github.com/hashicorp/consul/lib" 47 "github.com/hashicorp/consul/proto/pbautoconf" 48 "github.com/hashicorp/consul/sdk/freeport" 49 "github.com/hashicorp/consul/sdk/testutil" 50 "github.com/hashicorp/consul/sdk/testutil/retry" 51 "github.com/hashicorp/consul/testrpc" 52 "github.com/hashicorp/consul/tlsutil" 53 "github.com/hashicorp/consul/types" 54) 55 56func getService(a *TestAgent, id string) *structs.NodeService { 57 return a.State.Service(structs.NewServiceID(id, nil)) 58} 59 60func getCheck(a *TestAgent, id types.CheckID) *structs.HealthCheck { 61 return a.State.Check(structs.NewCheckID(id, nil)) 62} 63 64func requireServiceExists(t *testing.T, a *TestAgent, id string) *structs.NodeService { 65 t.Helper() 66 svc := getService(a, id) 67 require.NotNil(t, svc, "missing service %q", id) 68 return svc 69} 70 71func requireServiceMissing(t *testing.T, a *TestAgent, id string) { 72 t.Helper() 73 require.Nil(t, getService(a, id), "have service %q (expected missing)", id) 74} 75 76func requireCheckExists(t *testing.T, a *TestAgent, id types.CheckID) *structs.HealthCheck { 77 t.Helper() 78 chk := getCheck(a, id) 79 require.NotNil(t, chk, "missing check %q", id) 80 return chk 81} 82 83func requireCheckMissing(t *testing.T, a *TestAgent, id types.CheckID) { 84 t.Helper() 85 require.Nil(t, getCheck(a, id), "have check %q (expected missing)", id) 86} 87 88func requireCheckExistsMap(t *testing.T, m interface{}, id types.CheckID) { 89 t.Helper() 90 require.Contains(t, m, structs.NewCheckID(id, nil), "missing check %q", id) 91} 92 93func requireCheckMissingMap(t *testing.T, m interface{}, id types.CheckID) { 94 t.Helper() 95 require.NotContains(t, m, structs.NewCheckID(id, nil), "have check %q (expected missing)", id) 96} 97 98func TestAgent_MultiStartStop(t *testing.T) { 99 if testing.Short() { 100 t.Skip("too slow for testing.Short") 101 } 102 103 for i := 0; i < 10; i++ { 104 t.Run("", func(t *testing.T) { 105 t.Parallel() 106 a := NewTestAgent(t, "") 107 time.Sleep(250 * time.Millisecond) 108 a.Shutdown() 109 }) 110 } 111} 112 113func TestAgent_ConnectClusterIDConfig(t *testing.T) { 114 if testing.Short() { 115 t.Skip("too slow for testing.Short") 116 } 117 118 tests := []struct { 119 name string 120 hcl string 121 wantClusterID string 122 wantErr bool 123 }{ 124 { 125 name: "default TestAgent has fixed cluster id", 126 hcl: "", 127 wantClusterID: connect.TestClusterID, 128 }, 129 { 130 name: "no cluster ID specified sets to test ID", 131 hcl: "connect { enabled = true }", 132 wantClusterID: connect.TestClusterID, 133 }, 134 { 135 name: "non-UUID cluster_id is fatal", 136 hcl: `connect { 137 enabled = true 138 ca_config { 139 cluster_id = "fake-id" 140 } 141 }`, 142 wantClusterID: "", 143 wantErr: true, 144 }, 145 } 146 147 for _, tt := range tests { 148 t.Run(tt.name, func(t *testing.T) { 149 a := TestAgent{HCL: tt.hcl} 150 err := a.Start(t) 151 if tt.wantErr { 152 if err == nil { 153 t.Fatal("expected error, got nil") 154 } 155 return // don't run the rest of the test 156 } 157 if !tt.wantErr && err != nil { 158 t.Fatal(err) 159 } 160 defer a.Shutdown() 161 162 cfg := a.consulConfig() 163 assert.Equal(t, tt.wantClusterID, cfg.CAConfig.ClusterID) 164 }) 165 } 166} 167 168func TestAgent_StartStop(t *testing.T) { 169 if testing.Short() { 170 t.Skip("too slow for testing.Short") 171 } 172 173 t.Parallel() 174 a := NewTestAgent(t, "") 175 defer a.Shutdown() 176 177 if err := a.Leave(); err != nil { 178 t.Fatalf("err: %v", err) 179 } 180 if err := a.Shutdown(); err != nil { 181 t.Fatalf("err: %v", err) 182 } 183 184 select { 185 case <-a.ShutdownCh(): 186 default: 187 t.Fatalf("should be closed") 188 } 189} 190 191func TestAgent_RPCPing(t *testing.T) { 192 if testing.Short() { 193 t.Skip("too slow for testing.Short") 194 } 195 196 t.Parallel() 197 a := NewTestAgent(t, "") 198 defer a.Shutdown() 199 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 200 201 var out struct{} 202 if err := a.RPC("Status.Ping", struct{}{}, &out); err != nil { 203 t.Fatalf("err: %v", err) 204 } 205} 206 207func TestAgent_TokenStore(t *testing.T) { 208 if testing.Short() { 209 t.Skip("too slow for testing.Short") 210 } 211 212 t.Parallel() 213 214 a := NewTestAgent(t, ` 215 acl_token = "user" 216 acl_agent_token = "agent" 217 acl_agent_master_token = "master"`, 218 ) 219 defer a.Shutdown() 220 221 if got, want := a.tokens.UserToken(), "user"; got != want { 222 t.Fatalf("got %q want %q", got, want) 223 } 224 if got, want := a.tokens.AgentToken(), "agent"; got != want { 225 t.Fatalf("got %q want %q", got, want) 226 } 227 if got, want := a.tokens.IsAgentMasterToken("master"), true; got != want { 228 t.Fatalf("got %v want %v", got, want) 229 } 230} 231 232func TestAgent_ReconnectConfigSettings(t *testing.T) { 233 if testing.Short() { 234 t.Skip("too slow for testing.Short") 235 } 236 237 t.Parallel() 238 func() { 239 a := NewTestAgent(t, "") 240 defer a.Shutdown() 241 242 lan := a.consulConfig().SerfLANConfig.ReconnectTimeout 243 if lan != 3*24*time.Hour { 244 t.Fatalf("bad: %s", lan.String()) 245 } 246 247 wan := a.consulConfig().SerfWANConfig.ReconnectTimeout 248 if wan != 3*24*time.Hour { 249 t.Fatalf("bad: %s", wan.String()) 250 } 251 }() 252 253 func() { 254 a := NewTestAgent(t, ` 255 reconnect_timeout = "24h" 256 reconnect_timeout_wan = "36h" 257 `) 258 defer a.Shutdown() 259 260 lan := a.consulConfig().SerfLANConfig.ReconnectTimeout 261 if lan != 24*time.Hour { 262 t.Fatalf("bad: %s", lan.String()) 263 } 264 265 wan := a.consulConfig().SerfWANConfig.ReconnectTimeout 266 if wan != 36*time.Hour { 267 t.Fatalf("bad: %s", wan.String()) 268 } 269 }() 270} 271 272func TestAgent_HTTPMaxHeaderBytes(t *testing.T) { 273 tests := []struct { 274 name string 275 maxHeaderBytes int 276 expectedHTTPResponse int 277 }{ 278 { 279 "max header bytes 1 returns 431 http response when too large headers are sent", 280 1, 281 431, 282 }, 283 { 284 "max header bytes 0 returns 200 http response, as the http.DefaultMaxHeaderBytes size of 1MB is used", 285 0, 286 200, 287 }, 288 { 289 "negative maxHeaderBytes returns 200 http response, as the http.DefaultMaxHeaderBytes size of 1MB is used", 290 -10, 291 200, 292 }, 293 } 294 for _, tt := range tests { 295 t.Run(tt.name, func(t *testing.T) { 296 ports, err := freeport.Take(1) 297 require.NoError(t, err) 298 t.Cleanup(func() { freeport.Return(ports) }) 299 300 caConfig := tlsutil.Config{} 301 tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil)) 302 require.NoError(t, err) 303 304 bd := BaseDeps{ 305 Deps: consul.Deps{ 306 Logger: hclog.NewInterceptLogger(nil), 307 Tokens: new(token.Store), 308 TLSConfigurator: tlsConf, 309 GRPCConnPool: &fakeGRPCConnPool{}, 310 }, 311 RuntimeConfig: &config.RuntimeConfig{ 312 HTTPAddrs: []net.Addr{ 313 &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: ports[0]}, 314 }, 315 HTTPMaxHeaderBytes: tt.maxHeaderBytes, 316 }, 317 Cache: cache.New(cache.Options{}), 318 } 319 bd, err = initEnterpriseBaseDeps(bd, nil) 320 require.NoError(t, err) 321 322 a, err := New(bd) 323 require.NoError(t, err) 324 325 a.startLicenseManager(testutil.TestContext(t)) 326 327 srvs, err := a.listenHTTP() 328 require.NoError(t, err) 329 330 require.Equal(t, tt.maxHeaderBytes, a.config.HTTPMaxHeaderBytes) 331 332 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 333 t.Cleanup(cancel) 334 335 g := new(errgroup.Group) 336 for _, s := range srvs { 337 g.Go(s.Run) 338 } 339 340 require.Len(t, srvs, 1) 341 342 client := &http.Client{} 343 for _, s := range srvs { 344 u := url.URL{Scheme: s.Protocol, Host: s.Addr.String()} 345 req, err := http.NewRequest(http.MethodGet, u.String(), nil) 346 require.NoError(t, err) 347 348 // This is directly pulled from the testing of request limits in the net/http source 349 // https://github.com/golang/go/blob/go1.15.3/src/net/http/serve_test.go#L2897-L2900 350 var bytesPerHeader = len("header12345: val12345\r\n") 351 for i := 0; i < ((tt.maxHeaderBytes+4096)/bytesPerHeader)+1; i++ { 352 req.Header.Set(fmt.Sprintf("header%05d", i), fmt.Sprintf("val%05d", i)) 353 } 354 355 resp, err := client.Do(req.WithContext(ctx)) 356 require.NoError(t, err) 357 require.Equal(t, tt.expectedHTTPResponse, resp.StatusCode, "expected a '%d' http response, got '%d'", tt.expectedHTTPResponse, resp.StatusCode) 358 } 359 }) 360 } 361} 362 363type fakeGRPCConnPool struct{} 364 365func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) { 366 return nil, nil 367} 368 369func (f fakeGRPCConnPool) SetGatewayResolver(_ func(string) string) { 370} 371 372func TestAgent_ReconnectConfigWanDisabled(t *testing.T) { 373 if testing.Short() { 374 t.Skip("too slow for testing.Short") 375 } 376 377 t.Parallel() 378 379 a := NewTestAgent(t, ` 380 ports { serf_wan = -1 } 381 reconnect_timeout_wan = "36h" 382 `) 383 defer a.Shutdown() 384 385 // This is also testing that we dont panic like before #4515 386 require.Nil(t, a.consulConfig().SerfWANConfig) 387} 388 389func TestAgent_AddService(t *testing.T) { 390 if testing.Short() { 391 t.Skip("too slow for testing.Short") 392 } 393 394 t.Run("normal", func(t *testing.T) { 395 t.Parallel() 396 testAgent_AddService(t, "enable_central_service_config = false") 397 }) 398 t.Run("service manager", func(t *testing.T) { 399 t.Parallel() 400 testAgent_AddService(t, "enable_central_service_config = true") 401 }) 402} 403 404func testAgent_AddService(t *testing.T, extraHCL string) { 405 t.Helper() 406 407 a := NewTestAgent(t, ` 408 node_name = "node1" 409 `+extraHCL) 410 defer a.Shutdown() 411 412 tests := []struct { 413 desc string 414 srv *structs.NodeService 415 wantSrv func(ns *structs.NodeService) 416 chkTypes []*structs.CheckType 417 healthChks map[string]*structs.HealthCheck 418 }{ 419 { 420 "one check", 421 &structs.NodeService{ 422 ID: "svcid1", 423 Service: "svcname1", 424 Tags: []string{"tag1"}, 425 Weights: nil, // nil weights... 426 Port: 8100, 427 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 428 }, 429 // ... should be populated to avoid "IsSame" returning true during AE. 430 func(ns *structs.NodeService) { 431 ns.Weights = &structs.Weights{ 432 Passing: 1, 433 Warning: 1, 434 } 435 }, 436 []*structs.CheckType{ 437 { 438 CheckID: "check1", 439 Name: "name1", 440 TTL: time.Minute, 441 Notes: "note1", 442 }, 443 }, 444 map[string]*structs.HealthCheck{ 445 "check1": { 446 Node: "node1", 447 CheckID: "check1", 448 Name: "name1", 449 Status: "critical", 450 Notes: "note1", 451 ServiceID: "svcid1", 452 ServiceName: "svcname1", 453 ServiceTags: []string{"tag1"}, 454 Type: "ttl", 455 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 456 }, 457 }, 458 }, 459 { 460 "multiple checks", 461 &structs.NodeService{ 462 ID: "svcid2", 463 Service: "svcname2", 464 Weights: &structs.Weights{ 465 Passing: 2, 466 Warning: 1, 467 }, 468 Tags: []string{"tag2"}, 469 Port: 8200, 470 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 471 }, 472 nil, // No change expected 473 []*structs.CheckType{ 474 { 475 CheckID: "check1", 476 Name: "name1", 477 TTL: time.Minute, 478 Notes: "note1", 479 }, 480 { 481 CheckID: "check-noname", 482 TTL: time.Minute, 483 }, 484 { 485 Name: "check-noid", 486 TTL: time.Minute, 487 }, 488 { 489 TTL: time.Minute, 490 }, 491 }, 492 map[string]*structs.HealthCheck{ 493 "check1": { 494 Node: "node1", 495 CheckID: "check1", 496 Name: "name1", 497 Status: "critical", 498 Notes: "note1", 499 ServiceID: "svcid2", 500 ServiceName: "svcname2", 501 ServiceTags: []string{"tag2"}, 502 Type: "ttl", 503 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 504 }, 505 "check-noname": { 506 Node: "node1", 507 CheckID: "check-noname", 508 Name: "Service 'svcname2' check", 509 Status: "critical", 510 ServiceID: "svcid2", 511 ServiceName: "svcname2", 512 ServiceTags: []string{"tag2"}, 513 Type: "ttl", 514 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 515 }, 516 "service:svcid2:3": { 517 Node: "node1", 518 CheckID: "service:svcid2:3", 519 Name: "check-noid", 520 Status: "critical", 521 ServiceID: "svcid2", 522 ServiceName: "svcname2", 523 ServiceTags: []string{"tag2"}, 524 Type: "ttl", 525 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 526 }, 527 "service:svcid2:4": { 528 Node: "node1", 529 CheckID: "service:svcid2:4", 530 Name: "Service 'svcname2' check", 531 Status: "critical", 532 ServiceID: "svcid2", 533 ServiceName: "svcname2", 534 ServiceTags: []string{"tag2"}, 535 Type: "ttl", 536 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 537 }, 538 }, 539 }, 540 } 541 542 for _, tt := range tests { 543 t.Run(tt.desc, func(t *testing.T) { 544 // check the service registration 545 t.Run(tt.srv.ID, func(t *testing.T) { 546 err := a.addServiceFromSource(tt.srv, tt.chkTypes, false, "", ConfigSourceLocal) 547 if err != nil { 548 t.Fatalf("err: %v", err) 549 } 550 551 got := getService(a, tt.srv.ID) 552 // Make a copy since the tt.srv points to the one in memory in the local 553 // state still so changing it is a tautology! 554 want := *tt.srv 555 if tt.wantSrv != nil { 556 tt.wantSrv(&want) 557 } 558 require.Equal(t, &want, got) 559 require.True(t, got.IsSame(&want)) 560 }) 561 562 // check the health checks 563 for k, v := range tt.healthChks { 564 t.Run(k, func(t *testing.T) { 565 got := getCheck(a, types.CheckID(k)) 566 require.Equal(t, v, got) 567 }) 568 } 569 570 // check the ttl checks 571 for k := range tt.healthChks { 572 t.Run(k+" ttl", func(t *testing.T) { 573 chk := a.checkTTLs[structs.NewCheckID(types.CheckID(k), nil)] 574 if chk == nil { 575 t.Fatal("got nil want TTL check") 576 } 577 if got, want := string(chk.CheckID.ID), k; got != want { 578 t.Fatalf("got CheckID %v want %v", got, want) 579 } 580 if got, want := chk.TTL, time.Minute; got != want { 581 t.Fatalf("got TTL %v want %v", got, want) 582 } 583 }) 584 } 585 }) 586 } 587} 588 589// addServiceFromSource is a test helper that exists to maintain an old function 590// signature that was used in many tests. 591// Deprecated: use AddService 592func (a *Agent) addServiceFromSource(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { 593 return a.AddService(AddServiceRequest{ 594 Service: service, 595 chkTypes: chkTypes, 596 persist: persist, 597 token: token, 598 replaceExistingChecks: false, 599 Source: source, 600 }) 601} 602 603func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) { 604 if testing.Short() { 605 t.Skip("too slow for testing.Short") 606 } 607 608 t.Run("normal", func(t *testing.T) { 609 t.Parallel() 610 testAgent_AddServices_AliasUpdateCheckNotReverted(t, "enable_central_service_config = false") 611 }) 612 t.Run("service manager", func(t *testing.T) { 613 t.Parallel() 614 testAgent_AddServices_AliasUpdateCheckNotReverted(t, "enable_central_service_config = true") 615 }) 616} 617 618func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL string) { 619 t.Helper() 620 621 a := NewTestAgent(t, ` 622 node_name = "node1" 623 `+extraHCL) 624 defer a.Shutdown() 625 626 // It's tricky to get an UpdateCheck call to be timed properly so it lands 627 // right in the middle of an addServiceInternal call so we cheat a bit and 628 // rely upon alias checks to do that work for us. We add enough services 629 // that probabilistically one of them is going to end up properly in the 630 // critical section. 631 // 632 // The first number I picked here (10) surprisingly failed every time prior 633 // to PR #6144 solving the underlying problem. 634 const numServices = 10 635 636 services := make([]*structs.ServiceDefinition, numServices) 637 checkIDs := make([]types.CheckID, numServices) 638 services[0] = &structs.ServiceDefinition{ 639 ID: "fake", 640 Name: "fake", 641 Port: 8080, 642 Checks: []*structs.CheckType{}, 643 } 644 for i := 1; i < numServices; i++ { 645 name := fmt.Sprintf("web-%d", i) 646 647 services[i] = &structs.ServiceDefinition{ 648 ID: name, 649 Name: name, 650 Port: 8080 + i, 651 Checks: []*structs.CheckType{ 652 { 653 Name: "alias-for-fake-service", 654 AliasService: "fake", 655 }, 656 }, 657 } 658 659 checkIDs[i] = types.CheckID("service:" + name) 660 } 661 662 // Add all of the services quickly as you might do from config file snippets. 663 for _, service := range services { 664 ns := service.NodeService() 665 666 chkTypes, err := service.CheckTypes() 667 require.NoError(t, err) 668 669 require.NoError(t, a.addServiceFromSource(ns, chkTypes, false, service.Token, ConfigSourceLocal)) 670 } 671 672 retry.Run(t, func(r *retry.R) { 673 gotChecks := a.State.Checks(nil) 674 for id, check := range gotChecks { 675 require.Equal(r, "passing", check.Status, "check %q is wrong", id) 676 require.Equal(r, "No checks found.", check.Output, "check %q is wrong", id) 677 } 678 }) 679} 680 681func test_createAlias(t *testing.T, agent *TestAgent, chk *structs.CheckType, expectedResult string) func(r *retry.R) { 682 t.Helper() 683 serviceNum := rand.Int() 684 srv := &structs.NodeService{ 685 Service: fmt.Sprintf("serviceAlias-%d", serviceNum), 686 Tags: []string{"tag1"}, 687 Port: 8900 + serviceNum, 688 } 689 if srv.ID == "" { 690 srv.ID = fmt.Sprintf("serviceAlias-%d", serviceNum) 691 } 692 chk.Status = api.HealthWarning 693 if chk.CheckID == "" { 694 chk.CheckID = types.CheckID(fmt.Sprintf("check-%d", serviceNum)) 695 } 696 err := agent.addServiceFromSource(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal) 697 assert.NoError(t, err) 698 return func(r *retry.R) { 699 t.Helper() 700 found := false 701 for _, c := range agent.State.CheckStates(structs.WildcardEnterpriseMeta()) { 702 if c.Check.CheckID == chk.CheckID { 703 found = true 704 assert.Equal(t, expectedResult, c.Check.Status, "Check state should be %s, was %s in %#v", expectedResult, c.Check.Status, c.Check) 705 srvID := structs.NewServiceID(srv.ID, structs.WildcardEnterpriseMeta()) 706 if err := agent.Agent.State.RemoveService(srvID); err != nil { 707 fmt.Println("[DEBUG] Fail to remove service", srvID, ", err:=", err) 708 } 709 fmt.Println("[DEBUG] Service Removed", srvID, ", err:=", err) 710 break 711 } 712 } 713 assert.True(t, found) 714 } 715} 716 717// TestAgent_CheckAliasRPC test the Alias Check to be properly sync remotely 718// and locally. 719// It contains a few hacks such as unlockIndexOnNode because watch performed 720// in CheckAlias.runQuery() waits for 1 min, so Shutdoww the agent might take time 721// So, we ensure the agent will update regularilly the index 722func TestAgent_CheckAliasRPC(t *testing.T) { 723 if testing.Short() { 724 t.Skip("too slow for testing.Short") 725 } 726 727 t.Helper() 728 729 a := NewTestAgent(t, ` 730 node_name = "node1" 731 `) 732 733 srv := &structs.NodeService{ 734 ID: "svcid1", 735 Service: "svcname1", 736 Tags: []string{"tag1"}, 737 Port: 8100, 738 } 739 unlockIndexOnNode := func() { 740 // We ensure to not block and update Agent's index 741 srv.Tags = []string{fmt.Sprintf("tag-%s", time.Now())} 742 assert.NoError(t, a.waitForUp()) 743 err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal) 744 assert.NoError(t, err) 745 } 746 shutdownAgent := func() { 747 // This is to be sure Alias Checks on remote won't be blocked during 1 min 748 unlockIndexOnNode() 749 fmt.Println("[DEBUG] STARTING shutdown for TestAgent_CheckAliasRPC", time.Now()) 750 go a.Shutdown() 751 unlockIndexOnNode() 752 fmt.Println("[DEBUG] DONE shutdown for TestAgent_CheckAliasRPC", time.Now()) 753 } 754 defer shutdownAgent() 755 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 756 757 assert.NoError(t, a.waitForUp()) 758 err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal) 759 assert.NoError(t, err) 760 761 retry.Run(t, func(r *retry.R) { 762 t.Helper() 763 var args structs.NodeSpecificRequest 764 args.Datacenter = "dc1" 765 args.Node = "node1" 766 args.AllowStale = true 767 var out structs.IndexedNodeServices 768 err := a.RPC("Catalog.NodeServices", &args, &out) 769 assert.NoError(r, err) 770 foundService := false 771 lookup := structs.NewServiceID("svcid1", structs.WildcardEnterpriseMeta()) 772 for _, srv := range out.NodeServices.Services { 773 if lookup.Matches(srv.CompoundServiceID()) { 774 foundService = true 775 } 776 } 777 assert.True(r, foundService, "could not find svcid1 in %#v", out.NodeServices.Services) 778 }) 779 780 checks := make([](func(*retry.R)), 0) 781 782 checks = append(checks, test_createAlias(t, a, &structs.CheckType{ 783 Name: "Check_Local_Ok", 784 AliasService: "svcid1", 785 }, api.HealthPassing)) 786 787 checks = append(checks, test_createAlias(t, a, &structs.CheckType{ 788 Name: "Check_Local_Fail", 789 AliasService: "svcidNoExistingID", 790 }, api.HealthCritical)) 791 792 checks = append(checks, test_createAlias(t, a, &structs.CheckType{ 793 Name: "Check_Remote_Host_Ok", 794 AliasNode: "node1", 795 AliasService: "svcid1", 796 }, api.HealthPassing)) 797 798 checks = append(checks, test_createAlias(t, a, &structs.CheckType{ 799 Name: "Check_Remote_Host_Non_Existing_Service", 800 AliasNode: "node1", 801 AliasService: "svcidNoExistingID", 802 }, api.HealthCritical)) 803 804 // We wait for max 5s for all checks to be in sync 805 { 806 for i := 0; i < 50; i++ { 807 unlockIndexOnNode() 808 allNonWarning := true 809 for _, chk := range a.State.Checks(structs.WildcardEnterpriseMeta()) { 810 if chk.Status == api.HealthWarning { 811 allNonWarning = false 812 } 813 } 814 if allNonWarning { 815 break 816 } else { 817 time.Sleep(100 * time.Millisecond) 818 } 819 } 820 } 821 822 for _, toRun := range checks { 823 unlockIndexOnNode() 824 retry.Run(t, toRun) 825 } 826} 827 828func TestAgent_AddServiceWithH2PINGCheck(t *testing.T) { 829 t.Parallel() 830 a := NewTestAgent(t, "") 831 defer a.Shutdown() 832 check := []*structs.CheckType{ 833 { 834 CheckID: "test-h2ping-check", 835 Name: "test-h2ping-check", 836 H2PING: "localhost:12345", 837 TLSSkipVerify: true, 838 Interval: 10 * time.Second, 839 }, 840 } 841 842 nodeService := &structs.NodeService{ 843 ID: "test-h2ping-check-service", 844 Service: "test-h2ping-check-service", 845 } 846 err := a.addServiceFromSource(nodeService, check, false, "", ConfigSourceLocal) 847 if err != nil { 848 t.Fatalf("Error registering service: %v", err) 849 } 850 requireCheckExists(t, a, "test-h2ping-check") 851} 852 853func TestAgent_AddServiceNoExec(t *testing.T) { 854 if testing.Short() { 855 t.Skip("too slow for testing.Short") 856 } 857 858 t.Run("normal", func(t *testing.T) { 859 t.Parallel() 860 testAgent_AddServiceNoExec(t, "enable_central_service_config = false") 861 }) 862 t.Run("service manager", func(t *testing.T) { 863 t.Parallel() 864 testAgent_AddServiceNoExec(t, "enable_central_service_config = true") 865 }) 866} 867 868func testAgent_AddServiceNoExec(t *testing.T, extraHCL string) { 869 t.Helper() 870 871 a := NewTestAgent(t, ` 872 node_name = "node1" 873 `+extraHCL) 874 defer a.Shutdown() 875 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 876 877 srv := &structs.NodeService{ 878 ID: "svcid1", 879 Service: "svcname1", 880 Tags: []string{"tag1"}, 881 Port: 8100, 882 } 883 chk := &structs.CheckType{ 884 ScriptArgs: []string{"exit", "0"}, 885 Interval: 15 * time.Second, 886 } 887 888 err := a.addServiceFromSource(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal) 889 if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") { 890 t.Fatalf("err: %v", err) 891 } 892 893 err = a.addServiceFromSource(srv, []*structs.CheckType{chk}, false, "", ConfigSourceRemote) 894 if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") { 895 t.Fatalf("err: %v", err) 896 } 897} 898 899func TestAgent_AddServiceNoRemoteExec(t *testing.T) { 900 if testing.Short() { 901 t.Skip("too slow for testing.Short") 902 } 903 904 t.Run("normal", func(t *testing.T) { 905 t.Parallel() 906 testAgent_AddServiceNoRemoteExec(t, "enable_central_service_config = false") 907 }) 908 t.Run("service manager", func(t *testing.T) { 909 t.Parallel() 910 testAgent_AddServiceNoRemoteExec(t, "enable_central_service_config = true") 911 }) 912} 913 914func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) { 915 t.Helper() 916 917 a := NewTestAgent(t, ` 918 node_name = "node1" 919 enable_local_script_checks = true 920 `+extraHCL) 921 defer a.Shutdown() 922 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 923 924 srv := &structs.NodeService{ 925 ID: "svcid1", 926 Service: "svcname1", 927 Tags: []string{"tag1"}, 928 Port: 8100, 929 } 930 chk := &structs.CheckType{ 931 ScriptArgs: []string{"exit", "0"}, 932 Interval: 15 * time.Second, 933 } 934 935 err := a.addServiceFromSource(srv, []*structs.CheckType{chk}, false, "", ConfigSourceRemote) 936 if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") { 937 t.Fatalf("err: %v", err) 938 } 939} 940 941func TestAddServiceIPv4TaggedDefault(t *testing.T) { 942 if testing.Short() { 943 t.Skip("too slow for testing.Short") 944 } 945 946 t.Helper() 947 948 a := NewTestAgent(t, "") 949 defer a.Shutdown() 950 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 951 952 srv := &structs.NodeService{ 953 Service: "my_service", 954 ID: "my_service_id", 955 Port: 8100, 956 Address: "10.0.1.2", 957 } 958 959 err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote) 960 require.Nil(t, err) 961 962 ns := a.State.Service(structs.NewServiceID("my_service_id", nil)) 963 require.NotNil(t, ns) 964 965 svcAddr := structs.ServiceAddress{Address: srv.Address, Port: srv.Port} 966 require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressLANIPv4]) 967 require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressWANIPv4]) 968 _, ok := ns.TaggedAddresses[structs.TaggedAddressLANIPv6] 969 require.False(t, ok) 970 _, ok = ns.TaggedAddresses[structs.TaggedAddressWANIPv6] 971 require.False(t, ok) 972} 973 974func TestAddServiceIPv6TaggedDefault(t *testing.T) { 975 if testing.Short() { 976 t.Skip("too slow for testing.Short") 977 } 978 979 t.Helper() 980 981 a := NewTestAgent(t, "") 982 defer a.Shutdown() 983 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 984 985 srv := &structs.NodeService{ 986 Service: "my_service", 987 ID: "my_service_id", 988 Port: 8100, 989 Address: "::5", 990 } 991 992 err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote) 993 require.Nil(t, err) 994 995 ns := a.State.Service(structs.NewServiceID("my_service_id", nil)) 996 require.NotNil(t, ns) 997 998 svcAddr := structs.ServiceAddress{Address: srv.Address, Port: srv.Port} 999 require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressLANIPv6]) 1000 require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressWANIPv6]) 1001 _, ok := ns.TaggedAddresses[structs.TaggedAddressLANIPv4] 1002 require.False(t, ok) 1003 _, ok = ns.TaggedAddresses[structs.TaggedAddressWANIPv4] 1004 require.False(t, ok) 1005} 1006 1007func TestAddServiceIPv4TaggedSet(t *testing.T) { 1008 if testing.Short() { 1009 t.Skip("too slow for testing.Short") 1010 } 1011 1012 t.Helper() 1013 1014 a := NewTestAgent(t, "") 1015 defer a.Shutdown() 1016 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 1017 1018 srv := &structs.NodeService{ 1019 Service: "my_service", 1020 ID: "my_service_id", 1021 Port: 8100, 1022 Address: "10.0.1.2", 1023 TaggedAddresses: map[string]structs.ServiceAddress{ 1024 structs.TaggedAddressWANIPv4: { 1025 Address: "10.100.200.5", 1026 Port: 8100, 1027 }, 1028 }, 1029 } 1030 1031 err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote) 1032 require.Nil(t, err) 1033 1034 ns := a.State.Service(structs.NewServiceID("my_service_id", nil)) 1035 require.NotNil(t, ns) 1036 1037 svcAddr := structs.ServiceAddress{Address: srv.Address, Port: srv.Port} 1038 require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressLANIPv4]) 1039 require.Equal(t, structs.ServiceAddress{Address: "10.100.200.5", Port: 8100}, ns.TaggedAddresses[structs.TaggedAddressWANIPv4]) 1040 _, ok := ns.TaggedAddresses[structs.TaggedAddressLANIPv6] 1041 require.False(t, ok) 1042 _, ok = ns.TaggedAddresses[structs.TaggedAddressWANIPv6] 1043 require.False(t, ok) 1044} 1045 1046func TestAddServiceIPv6TaggedSet(t *testing.T) { 1047 if testing.Short() { 1048 t.Skip("too slow for testing.Short") 1049 } 1050 1051 t.Helper() 1052 1053 a := NewTestAgent(t, "") 1054 defer a.Shutdown() 1055 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 1056 1057 srv := &structs.NodeService{ 1058 Service: "my_service", 1059 ID: "my_service_id", 1060 Port: 8100, 1061 Address: "::5", 1062 TaggedAddresses: map[string]structs.ServiceAddress{ 1063 structs.TaggedAddressWANIPv6: { 1064 Address: "::6", 1065 Port: 8100, 1066 }, 1067 }, 1068 } 1069 1070 err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote) 1071 require.Nil(t, err) 1072 1073 ns := a.State.Service(structs.NewServiceID("my_service_id", nil)) 1074 require.NotNil(t, ns) 1075 1076 svcAddr := structs.ServiceAddress{Address: srv.Address, Port: srv.Port} 1077 require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressLANIPv6]) 1078 require.Equal(t, structs.ServiceAddress{Address: "::6", Port: 8100}, ns.TaggedAddresses[structs.TaggedAddressWANIPv6]) 1079 _, ok := ns.TaggedAddresses[structs.TaggedAddressLANIPv4] 1080 require.False(t, ok) 1081 _, ok = ns.TaggedAddresses[structs.TaggedAddressWANIPv4] 1082 require.False(t, ok) 1083} 1084 1085func TestAgent_RemoveService(t *testing.T) { 1086 if testing.Short() { 1087 t.Skip("too slow for testing.Short") 1088 } 1089 1090 t.Run("normal", func(t *testing.T) { 1091 t.Parallel() 1092 testAgent_RemoveService(t, "enable_central_service_config = false") 1093 }) 1094 t.Run("service manager", func(t *testing.T) { 1095 t.Parallel() 1096 testAgent_RemoveService(t, "enable_central_service_config = true") 1097 }) 1098} 1099 1100func testAgent_RemoveService(t *testing.T, extraHCL string) { 1101 t.Helper() 1102 1103 a := NewTestAgent(t, extraHCL) 1104 defer a.Shutdown() 1105 1106 // Remove a service that doesn't exist 1107 if err := a.RemoveService(structs.NewServiceID("redis", nil)); err != nil { 1108 t.Fatalf("err: %v", err) 1109 } 1110 1111 // Remove without an ID 1112 if err := a.RemoveService(structs.NewServiceID("", nil)); err == nil { 1113 t.Fatalf("should have errored") 1114 } 1115 1116 // Removing a service with a single check works 1117 { 1118 srv := &structs.NodeService{ 1119 ID: "memcache", 1120 Service: "memcache", 1121 Port: 8000, 1122 } 1123 chkTypes := []*structs.CheckType{{TTL: time.Minute}} 1124 1125 if err := a.addServiceFromSource(srv, chkTypes, false, "", ConfigSourceLocal); err != nil { 1126 t.Fatalf("err: %v", err) 1127 } 1128 1129 // Add a check after the fact with a specific check ID 1130 check := &structs.CheckDefinition{ 1131 ID: "check2", 1132 Name: "check2", 1133 ServiceID: "memcache", 1134 TTL: time.Minute, 1135 } 1136 hc := check.HealthCheck("node1") 1137 if err := a.AddCheck(hc, check.CheckType(), false, "", ConfigSourceLocal); err != nil { 1138 t.Fatalf("err: %s", err) 1139 } 1140 1141 if err := a.RemoveService(structs.NewServiceID("memcache", nil)); err != nil { 1142 t.Fatalf("err: %s", err) 1143 } 1144 require.Nil(t, a.State.Check(structs.NewCheckID("service:memcache", nil)), "have memcache check") 1145 require.Nil(t, a.State.Check(structs.NewCheckID("check2", nil)), "have check2 check") 1146 } 1147 1148 // Removing a service with multiple checks works 1149 { 1150 // add a service to remove 1151 srv := &structs.NodeService{ 1152 ID: "redis", 1153 Service: "redis", 1154 Port: 8000, 1155 } 1156 chkTypes := []*structs.CheckType{ 1157 {TTL: time.Minute}, 1158 {TTL: 30 * time.Second}, 1159 } 1160 if err := a.addServiceFromSource(srv, chkTypes, false, "", ConfigSourceLocal); err != nil { 1161 t.Fatalf("err: %v", err) 1162 } 1163 1164 // add another service that wont be affected 1165 srv = &structs.NodeService{ 1166 ID: "mysql", 1167 Service: "mysql", 1168 Port: 3306, 1169 } 1170 chkTypes = []*structs.CheckType{ 1171 {TTL: time.Minute}, 1172 {TTL: 30 * time.Second}, 1173 } 1174 if err := a.addServiceFromSource(srv, chkTypes, false, "", ConfigSourceLocal); err != nil { 1175 t.Fatalf("err: %v", err) 1176 } 1177 1178 // Remove the service 1179 if err := a.RemoveService(structs.NewServiceID("redis", nil)); err != nil { 1180 t.Fatalf("err: %v", err) 1181 } 1182 1183 // Ensure we have a state mapping 1184 requireServiceMissing(t, a, "redis") 1185 1186 // Ensure checks were removed 1187 requireCheckMissing(t, a, "service:redis:1") 1188 requireCheckMissing(t, a, "service:redis:2") 1189 requireCheckMissingMap(t, a.checkTTLs, "service:redis:1") 1190 requireCheckMissingMap(t, a.checkTTLs, "service:redis:2") 1191 1192 // check the mysql service is unnafected 1193 requireCheckExistsMap(t, a.checkTTLs, "service:mysql:1") 1194 requireCheckExists(t, a, "service:mysql:1") 1195 requireCheckExistsMap(t, a.checkTTLs, "service:mysql:2") 1196 requireCheckExists(t, a, "service:mysql:2") 1197 } 1198} 1199 1200func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { 1201 if testing.Short() { 1202 t.Skip("too slow for testing.Short") 1203 } 1204 1205 t.Run("normal", func(t *testing.T) { 1206 t.Parallel() 1207 testAgent_RemoveServiceRemovesAllChecks(t, "enable_central_service_config = false") 1208 }) 1209 t.Run("service manager", func(t *testing.T) { 1210 t.Parallel() 1211 testAgent_RemoveServiceRemovesAllChecks(t, "enable_central_service_config = true") 1212 }) 1213} 1214 1215func testAgent_RemoveServiceRemovesAllChecks(t *testing.T, extraHCL string) { 1216 t.Helper() 1217 1218 a := NewTestAgent(t, ` 1219 node_name = "node1" 1220 `+extraHCL) 1221 defer a.Shutdown() 1222 svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000, EnterpriseMeta: *structs.DefaultEnterpriseMeta()} 1223 chk1 := &structs.CheckType{CheckID: "chk1", Name: "chk1", TTL: time.Minute} 1224 chk2 := &structs.CheckType{CheckID: "chk2", Name: "chk2", TTL: 2 * time.Minute} 1225 hchk1 := &structs.HealthCheck{ 1226 Node: "node1", 1227 CheckID: "chk1", 1228 Name: "chk1", 1229 Status: "critical", 1230 ServiceID: "redis", 1231 ServiceName: "redis", 1232 Type: "ttl", 1233 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 1234 } 1235 hchk2 := &structs.HealthCheck{Node: "node1", 1236 CheckID: "chk2", 1237 Name: "chk2", 1238 Status: "critical", 1239 ServiceID: "redis", 1240 ServiceName: "redis", 1241 Type: "ttl", 1242 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 1243 } 1244 1245 // register service with chk1 1246 if err := a.addServiceFromSource(svc, []*structs.CheckType{chk1}, false, "", ConfigSourceLocal); err != nil { 1247 t.Fatal("Failed to register service", err) 1248 } 1249 1250 // verify chk1 exists 1251 requireCheckExists(t, a, "chk1") 1252 1253 // update the service with chk2 1254 if err := a.addServiceFromSource(svc, []*structs.CheckType{chk2}, false, "", ConfigSourceLocal); err != nil { 1255 t.Fatal("Failed to update service", err) 1256 } 1257 1258 // check that both checks are there 1259 require.Equal(t, hchk1, getCheck(a, "chk1")) 1260 require.Equal(t, hchk2, getCheck(a, "chk2")) 1261 1262 // Remove service 1263 if err := a.RemoveService(structs.NewServiceID("redis", nil)); err != nil { 1264 t.Fatal("Failed to remove service", err) 1265 } 1266 1267 // Check that both checks are gone 1268 requireCheckMissing(t, a, "chk1") 1269 requireCheckMissing(t, a, "chk2") 1270} 1271 1272// TestAgent_IndexChurn is designed to detect a class of issues where 1273// we would have unnecessary catalog churn from anti-entropy. See issues 1274// #3259, #3642, #3845, and #3866. 1275func TestAgent_IndexChurn(t *testing.T) { 1276 if testing.Short() { 1277 t.Skip("too slow for testing.Short") 1278 } 1279 1280 t.Parallel() 1281 1282 t.Run("no tags", func(t *testing.T) { 1283 verifyIndexChurn(t, nil) 1284 }) 1285 1286 t.Run("with tags", func(t *testing.T) { 1287 verifyIndexChurn(t, []string{"foo", "bar"}) 1288 }) 1289} 1290 1291// verifyIndexChurn registers some things and runs anti-entropy a bunch of times 1292// in a row to make sure there are no index bumps. 1293func verifyIndexChurn(t *testing.T, tags []string) { 1294 t.Helper() 1295 a := NewTestAgent(t, "") 1296 defer a.Shutdown() 1297 1298 weights := &structs.Weights{ 1299 Passing: 1, 1300 Warning: 1, 1301 } 1302 // Ensure we have a leader before we start adding the services 1303 testrpc.WaitForLeader(t, a.RPC, "dc1") 1304 1305 svc := &structs.NodeService{ 1306 ID: "redis", 1307 Service: "redis", 1308 Port: 8000, 1309 Tags: tags, 1310 Weights: weights, 1311 } 1312 if err := a.addServiceFromSource(svc, nil, true, "", ConfigSourceLocal); err != nil { 1313 t.Fatalf("err: %v", err) 1314 } 1315 1316 chk := &structs.HealthCheck{ 1317 CheckID: "redis-check", 1318 Name: "Service-level check", 1319 ServiceID: "redis", 1320 Status: api.HealthCritical, 1321 } 1322 chkt := &structs.CheckType{ 1323 TTL: time.Hour, 1324 } 1325 if err := a.AddCheck(chk, chkt, true, "", ConfigSourceLocal); err != nil { 1326 t.Fatalf("err: %v", err) 1327 } 1328 1329 chk = &structs.HealthCheck{ 1330 CheckID: "node-check", 1331 Name: "Node-level check", 1332 Status: api.HealthCritical, 1333 } 1334 chkt = &structs.CheckType{ 1335 TTL: time.Hour, 1336 } 1337 if err := a.AddCheck(chk, chkt, true, "", ConfigSourceLocal); err != nil { 1338 t.Fatalf("err: %v", err) 1339 } 1340 1341 if err := a.sync.State.SyncFull(); err != nil { 1342 t.Fatalf("err: %v", err) 1343 } 1344 1345 args := &structs.ServiceSpecificRequest{ 1346 Datacenter: "dc1", 1347 ServiceName: "redis", 1348 } 1349 var before structs.IndexedCheckServiceNodes 1350 1351 // This sleep is so that the serfHealth check is added to the agent 1352 // A value of 375ms is sufficient enough time to ensure the serfHealth 1353 // check is added to an agent. 500ms so that we don't see flakiness ever. 1354 time.Sleep(500 * time.Millisecond) 1355 1356 if err := a.RPC("Health.ServiceNodes", args, &before); err != nil { 1357 t.Fatalf("err: %v", err) 1358 } 1359 for _, name := range before.Nodes[0].Checks { 1360 a.logger.Debug("Registered node", "node", name.Name) 1361 } 1362 if got, want := len(before.Nodes), 1; got != want { 1363 t.Fatalf("got %d want %d", got, want) 1364 } 1365 if got, want := len(before.Nodes[0].Checks), 3; /* incl. serfHealth */ got != want { 1366 t.Fatalf("got %d want %d", got, want) 1367 } 1368 1369 for i := 0; i < 10; i++ { 1370 a.logger.Info("Sync in progress", "iteration", i+1) 1371 if err := a.sync.State.SyncFull(); err != nil { 1372 t.Fatalf("err: %v", err) 1373 } 1374 } 1375 // If this test fails here this means that the Consul-X-Index 1376 // has changed for the RPC, which means that idempotent ops 1377 // are not working as intended. 1378 var after structs.IndexedCheckServiceNodes 1379 if err := a.RPC("Health.ServiceNodes", args, &after); err != nil { 1380 t.Fatalf("err: %v", err) 1381 } 1382 require.Equal(t, before, after) 1383} 1384 1385func TestAgent_AddCheck(t *testing.T) { 1386 if testing.Short() { 1387 t.Skip("too slow for testing.Short") 1388 } 1389 1390 t.Parallel() 1391 a := NewTestAgent(t, ` 1392 enable_script_checks = true 1393 `) 1394 defer a.Shutdown() 1395 1396 health := &structs.HealthCheck{ 1397 Node: "foo", 1398 CheckID: "mem", 1399 Name: "memory util", 1400 Status: api.HealthCritical, 1401 } 1402 chk := &structs.CheckType{ 1403 ScriptArgs: []string{"exit", "0"}, 1404 Interval: 15 * time.Second, 1405 } 1406 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1407 if err != nil { 1408 t.Fatalf("err: %v", err) 1409 } 1410 1411 // Ensure we have a check mapping 1412 sChk := requireCheckExists(t, a, "mem") 1413 1414 // Ensure our check is in the right state 1415 if sChk.Status != api.HealthCritical { 1416 t.Fatalf("check not critical") 1417 } 1418 1419 // Ensure a TTL is setup 1420 requireCheckExistsMap(t, a.checkMonitors, "mem") 1421} 1422 1423func TestAgent_AddCheck_StartPassing(t *testing.T) { 1424 if testing.Short() { 1425 t.Skip("too slow for testing.Short") 1426 } 1427 1428 t.Parallel() 1429 a := NewTestAgent(t, ` 1430 enable_script_checks = true 1431 `) 1432 defer a.Shutdown() 1433 1434 health := &structs.HealthCheck{ 1435 Node: "foo", 1436 CheckID: "mem", 1437 Name: "memory util", 1438 Status: api.HealthPassing, 1439 } 1440 chk := &structs.CheckType{ 1441 ScriptArgs: []string{"exit", "0"}, 1442 Interval: 15 * time.Second, 1443 } 1444 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1445 if err != nil { 1446 t.Fatalf("err: %v", err) 1447 } 1448 1449 // Ensure we have a check mapping 1450 sChk := requireCheckExists(t, a, "mem") 1451 1452 // Ensure our check is in the right state 1453 if sChk.Status != api.HealthPassing { 1454 t.Fatalf("check not passing") 1455 } 1456 1457 // Ensure a TTL is setup 1458 requireCheckExistsMap(t, a.checkMonitors, "mem") 1459} 1460 1461func TestAgent_AddCheck_MinInterval(t *testing.T) { 1462 if testing.Short() { 1463 t.Skip("too slow for testing.Short") 1464 } 1465 1466 t.Parallel() 1467 a := NewTestAgent(t, ` 1468 enable_script_checks = true 1469 `) 1470 defer a.Shutdown() 1471 1472 health := &structs.HealthCheck{ 1473 Node: "foo", 1474 CheckID: "mem", 1475 Name: "memory util", 1476 Status: api.HealthCritical, 1477 } 1478 chk := &structs.CheckType{ 1479 ScriptArgs: []string{"exit", "0"}, 1480 Interval: time.Microsecond, 1481 } 1482 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1483 if err != nil { 1484 t.Fatalf("err: %v", err) 1485 } 1486 1487 // Ensure we have a check mapping 1488 requireCheckExists(t, a, "mem") 1489 1490 // Ensure a TTL is setup 1491 if mon, ok := a.checkMonitors[structs.NewCheckID("mem", nil)]; !ok { 1492 t.Fatalf("missing mem monitor") 1493 } else if mon.Interval != checks.MinInterval { 1494 t.Fatalf("bad mem monitor interval") 1495 } 1496} 1497 1498func TestAgent_AddCheck_MissingService(t *testing.T) { 1499 if testing.Short() { 1500 t.Skip("too slow for testing.Short") 1501 } 1502 1503 t.Parallel() 1504 a := NewTestAgent(t, ` 1505 enable_script_checks = true 1506 `) 1507 defer a.Shutdown() 1508 1509 health := &structs.HealthCheck{ 1510 Node: "foo", 1511 CheckID: "baz", 1512 Name: "baz check 1", 1513 ServiceID: "baz", 1514 } 1515 chk := &structs.CheckType{ 1516 ScriptArgs: []string{"exit", "0"}, 1517 Interval: time.Microsecond, 1518 } 1519 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1520 if err == nil || err.Error() != fmt.Sprintf("ServiceID %q does not exist", structs.ServiceIDString("baz", nil)) { 1521 t.Fatalf("expected service id error, got: %v", err) 1522 } 1523} 1524 1525func TestAgent_AddCheck_RestoreState(t *testing.T) { 1526 if testing.Short() { 1527 t.Skip("too slow for testing.Short") 1528 } 1529 1530 t.Parallel() 1531 a := NewTestAgent(t, "") 1532 defer a.Shutdown() 1533 1534 // Create some state and persist it 1535 ttl := &checks.CheckTTL{ 1536 CheckID: structs.NewCheckID("baz", nil), 1537 TTL: time.Minute, 1538 } 1539 err := a.persistCheckState(ttl, api.HealthPassing, "yup") 1540 if err != nil { 1541 t.Fatalf("err: %s", err) 1542 } 1543 1544 // Build and register the check definition and initial state 1545 health := &structs.HealthCheck{ 1546 Node: "foo", 1547 CheckID: "baz", 1548 Name: "baz check 1", 1549 } 1550 chk := &structs.CheckType{ 1551 TTL: time.Minute, 1552 } 1553 err = a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1554 if err != nil { 1555 t.Fatalf("err: %s", err) 1556 } 1557 1558 // Ensure the check status was restored during registration 1559 check := requireCheckExists(t, a, "baz") 1560 if check.Status != api.HealthPassing { 1561 t.Fatalf("bad: %#v", check) 1562 } 1563 if check.Output != "yup" { 1564 t.Fatalf("bad: %#v", check) 1565 } 1566} 1567 1568func TestAgent_AddCheck_ExecDisable(t *testing.T) { 1569 if testing.Short() { 1570 t.Skip("too slow for testing.Short") 1571 } 1572 1573 t.Parallel() 1574 1575 a := NewTestAgent(t, "") 1576 defer a.Shutdown() 1577 1578 health := &structs.HealthCheck{ 1579 Node: "foo", 1580 CheckID: "mem", 1581 Name: "memory util", 1582 Status: api.HealthCritical, 1583 } 1584 chk := &structs.CheckType{ 1585 ScriptArgs: []string{"exit", "0"}, 1586 Interval: 15 * time.Second, 1587 } 1588 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1589 if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") { 1590 t.Fatalf("err: %v", err) 1591 } 1592 1593 // Ensure we don't have a check mapping 1594 requireCheckMissing(t, a, "mem") 1595 1596 err = a.AddCheck(health, chk, false, "", ConfigSourceRemote) 1597 if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") { 1598 t.Fatalf("err: %v", err) 1599 } 1600 1601 // Ensure we don't have a check mapping 1602 requireCheckMissing(t, a, "mem") 1603} 1604 1605func TestAgent_AddCheck_ExecRemoteDisable(t *testing.T) { 1606 if testing.Short() { 1607 t.Skip("too slow for testing.Short") 1608 } 1609 1610 t.Parallel() 1611 1612 a := NewTestAgent(t, ` 1613 enable_local_script_checks = true 1614 `) 1615 defer a.Shutdown() 1616 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 1617 1618 health := &structs.HealthCheck{ 1619 Node: "foo", 1620 CheckID: "mem", 1621 Name: "memory util", 1622 Status: api.HealthCritical, 1623 } 1624 chk := &structs.CheckType{ 1625 ScriptArgs: []string{"exit", "0"}, 1626 Interval: 15 * time.Second, 1627 } 1628 err := a.AddCheck(health, chk, false, "", ConfigSourceRemote) 1629 if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent from remote calls") { 1630 t.Fatalf("err: %v", err) 1631 } 1632 1633 // Ensure we don't have a check mapping 1634 requireCheckMissing(t, a, "mem") 1635} 1636 1637func TestAgent_AddCheck_GRPC(t *testing.T) { 1638 if testing.Short() { 1639 t.Skip("too slow for testing.Short") 1640 } 1641 1642 t.Parallel() 1643 a := NewTestAgent(t, "") 1644 defer a.Shutdown() 1645 1646 health := &structs.HealthCheck{ 1647 Node: "foo", 1648 CheckID: "grpchealth", 1649 Name: "grpc health checking protocol", 1650 Status: api.HealthCritical, 1651 } 1652 chk := &structs.CheckType{ 1653 GRPC: "localhost:12345/package.Service", 1654 Interval: 15 * time.Second, 1655 } 1656 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1657 if err != nil { 1658 t.Fatalf("err: %v", err) 1659 } 1660 1661 // Ensure we have a check mapping 1662 sChk := requireCheckExists(t, a, "grpchealth") 1663 1664 // Ensure our check is in the right state 1665 if sChk.Status != api.HealthCritical { 1666 t.Fatalf("check not critical") 1667 } 1668 1669 // Ensure a check is setup 1670 requireCheckExistsMap(t, a.checkGRPCs, "grpchealth") 1671} 1672 1673func TestAgent_RestoreServiceWithAliasCheck(t *testing.T) { 1674 // t.Parallel() don't even think about making this parallel 1675 1676 // This test is very contrived and tests for the absence of race conditions 1677 // related to the implementation of alias checks. As such it is slow, 1678 // serial, full of sleeps and retries, and not generally a great test to 1679 // run all of the time. 1680 // 1681 // That said it made it incredibly easy to root out various race conditions 1682 // quite successfully. 1683 // 1684 // The original set of races was between: 1685 // 1686 // - agent startup reloading Services and Checks from disk 1687 // - API requests to also re-register those same Services and Checks 1688 // - the goroutines for the as-yet-to-be-stopped CheckAlias goroutines 1689 1690 if os.Getenv("SLOWTEST") != "1" { 1691 t.Skip("skipping slow test; set SLOWTEST=1 to run") 1692 return 1693 } 1694 1695 // We do this so that the agent logs and the informational messages from 1696 // the test itself are interwoven properly. 1697 logf := func(t *testing.T, a *TestAgent, format string, args ...interface{}) { 1698 a.logger.Info("testharness: " + fmt.Sprintf(format, args...)) 1699 } 1700 1701 cfg := ` 1702 server = false 1703 bootstrap = false 1704 enable_central_service_config = false 1705 ` 1706 a := StartTestAgent(t, TestAgent{HCL: cfg}) 1707 defer a.Shutdown() 1708 1709 testCtx, testCancel := context.WithCancel(context.Background()) 1710 defer testCancel() 1711 1712 testHTTPServer, returnPort := launchHTTPCheckServer(t, testCtx) 1713 defer func() { 1714 testHTTPServer.Close() 1715 returnPort() 1716 }() 1717 1718 registerServicesAndChecks := func(t *testing.T, a *TestAgent) { 1719 // add one persistent service with a simple check 1720 require.NoError(t, a.addServiceFromSource( 1721 &structs.NodeService{ 1722 ID: "ping", 1723 Service: "ping", 1724 Port: 8000, 1725 }, 1726 []*structs.CheckType{ 1727 { 1728 HTTP: testHTTPServer.URL, 1729 Method: "GET", 1730 Interval: 5 * time.Second, 1731 Timeout: 1 * time.Second, 1732 }, 1733 }, 1734 true, "", ConfigSourceLocal, 1735 )) 1736 1737 // add one persistent sidecar service with an alias check in the manner 1738 // of how sidecar_service would add it 1739 require.NoError(t, a.addServiceFromSource( 1740 &structs.NodeService{ 1741 ID: "ping-sidecar-proxy", 1742 Service: "ping-sidecar-proxy", 1743 Port: 9000, 1744 }, 1745 []*structs.CheckType{ 1746 { 1747 Name: "Connect Sidecar Aliasing ping", 1748 AliasService: "ping", 1749 }, 1750 }, 1751 true, "", ConfigSourceLocal, 1752 )) 1753 } 1754 1755 retryUntilCheckState := func(t *testing.T, a *TestAgent, checkID string, expectedStatus string) { 1756 t.Helper() 1757 retry.Run(t, func(r *retry.R) { 1758 chk := requireCheckExists(t, a, types.CheckID(checkID)) 1759 if chk.Status != expectedStatus { 1760 logf(t, a, "check=%q expected status %q but got %q", checkID, expectedStatus, chk.Status) 1761 r.Fatalf("check=%q expected status %q but got %q", checkID, expectedStatus, chk.Status) 1762 } 1763 logf(t, a, "check %q has reached desired status %q", checkID, expectedStatus) 1764 }) 1765 } 1766 1767 registerServicesAndChecks(t, a) 1768 1769 time.Sleep(1 * time.Second) 1770 1771 retryUntilCheckState(t, a, "service:ping", api.HealthPassing) 1772 retryUntilCheckState(t, a, "service:ping-sidecar-proxy", api.HealthPassing) 1773 1774 logf(t, a, "==== POWERING DOWN ORIGINAL ====") 1775 1776 require.NoError(t, a.Shutdown()) 1777 1778 time.Sleep(1 * time.Second) 1779 1780 futureHCL := cfg + ` 1781node_id = "` + string(a.Config.NodeID) + `" 1782node_name = "` + a.Config.NodeName + `" 1783 ` 1784 1785 restartOnce := func(idx int, t *testing.T) { 1786 t.Helper() 1787 1788 // Reload and retain former NodeID and data directory. 1789 a2 := StartTestAgent(t, TestAgent{HCL: futureHCL, DataDir: a.DataDir}) 1790 defer a2.Shutdown() 1791 a = nil 1792 1793 // reregister during standup; we use an adjustable timing to try and force a race 1794 sleepDur := time.Duration(idx+1) * 500 * time.Millisecond 1795 time.Sleep(sleepDur) 1796 logf(t, a2, "re-registering checks and services after a delay of %v", sleepDur) 1797 for i := 0; i < 20; i++ { // RACE RACE RACE! 1798 registerServicesAndChecks(t, a2) 1799 time.Sleep(50 * time.Millisecond) 1800 } 1801 1802 time.Sleep(1 * time.Second) 1803 1804 retryUntilCheckState(t, a2, "service:ping", api.HealthPassing) 1805 1806 logf(t, a2, "giving the alias check a chance to notice...") 1807 time.Sleep(5 * time.Second) 1808 1809 retryUntilCheckState(t, a2, "service:ping-sidecar-proxy", api.HealthPassing) 1810 } 1811 1812 for i := 0; i < 20; i++ { 1813 name := "restart-" + strconv.Itoa(i) 1814 ok := t.Run(name, func(t *testing.T) { 1815 restartOnce(i, t) 1816 }) 1817 require.True(t, ok, name+" failed") 1818 } 1819} 1820 1821func launchHTTPCheckServer(t *testing.T, ctx context.Context) (srv *httptest.Server, returnPortsFn func()) { 1822 ports := freeport.MustTake(1) 1823 port := ports[0] 1824 1825 addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(port)) 1826 1827 var lc net.ListenConfig 1828 listener, err := lc.Listen(ctx, "tcp", addr) 1829 require.NoError(t, err) 1830 1831 handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 1832 w.WriteHeader(http.StatusOK) 1833 _, _ = w.Write([]byte("OK\n")) 1834 }) 1835 1836 srv = &httptest.Server{ 1837 Listener: listener, 1838 Config: &http.Server{Handler: handler}, 1839 } 1840 srv.Start() 1841 return srv, func() { freeport.Return(ports) } 1842} 1843 1844func TestAgent_AddCheck_Alias(t *testing.T) { 1845 if testing.Short() { 1846 t.Skip("too slow for testing.Short") 1847 } 1848 1849 t.Parallel() 1850 1851 require := require.New(t) 1852 a := NewTestAgent(t, "") 1853 defer a.Shutdown() 1854 1855 health := &structs.HealthCheck{ 1856 Node: "foo", 1857 CheckID: "aliashealth", 1858 Name: "Alias health check", 1859 Status: api.HealthCritical, 1860 } 1861 chk := &structs.CheckType{ 1862 AliasService: "foo", 1863 } 1864 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1865 require.NoError(err) 1866 1867 // Ensure we have a check mapping 1868 sChk := requireCheckExists(t, a, "aliashealth") 1869 require.Equal(api.HealthCritical, sChk.Status) 1870 1871 chkImpl, ok := a.checkAliases[structs.NewCheckID("aliashealth", nil)] 1872 require.True(ok, "missing aliashealth check") 1873 require.Equal("", chkImpl.RPCReq.Token) 1874 1875 cs := a.State.CheckState(structs.NewCheckID("aliashealth", nil)) 1876 require.NotNil(cs) 1877 require.Equal("", cs.Token) 1878} 1879 1880func TestAgent_AddCheck_Alias_setToken(t *testing.T) { 1881 if testing.Short() { 1882 t.Skip("too slow for testing.Short") 1883 } 1884 1885 t.Parallel() 1886 1887 require := require.New(t) 1888 a := NewTestAgent(t, "") 1889 defer a.Shutdown() 1890 1891 health := &structs.HealthCheck{ 1892 Node: "foo", 1893 CheckID: "aliashealth", 1894 Name: "Alias health check", 1895 Status: api.HealthCritical, 1896 } 1897 chk := &structs.CheckType{ 1898 AliasService: "foo", 1899 } 1900 err := a.AddCheck(health, chk, false, "foo", ConfigSourceLocal) 1901 require.NoError(err) 1902 1903 cs := a.State.CheckState(structs.NewCheckID("aliashealth", nil)) 1904 require.NotNil(cs) 1905 require.Equal("foo", cs.Token) 1906 1907 chkImpl, ok := a.checkAliases[structs.NewCheckID("aliashealth", nil)] 1908 require.True(ok, "missing aliashealth check") 1909 require.Equal("foo", chkImpl.RPCReq.Token) 1910} 1911 1912func TestAgent_AddCheck_Alias_userToken(t *testing.T) { 1913 if testing.Short() { 1914 t.Skip("too slow for testing.Short") 1915 } 1916 1917 t.Parallel() 1918 1919 require := require.New(t) 1920 a := NewTestAgent(t, ` 1921acl_token = "hello" 1922 `) 1923 defer a.Shutdown() 1924 1925 health := &structs.HealthCheck{ 1926 Node: "foo", 1927 CheckID: "aliashealth", 1928 Name: "Alias health check", 1929 Status: api.HealthCritical, 1930 } 1931 chk := &structs.CheckType{ 1932 AliasService: "foo", 1933 } 1934 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 1935 require.NoError(err) 1936 1937 cs := a.State.CheckState(structs.NewCheckID("aliashealth", nil)) 1938 require.NotNil(cs) 1939 require.Equal("", cs.Token) // State token should still be empty 1940 1941 chkImpl, ok := a.checkAliases[structs.NewCheckID("aliashealth", nil)] 1942 require.True(ok, "missing aliashealth check") 1943 require.Equal("hello", chkImpl.RPCReq.Token) // Check should use the token 1944} 1945 1946func TestAgent_AddCheck_Alias_userAndSetToken(t *testing.T) { 1947 if testing.Short() { 1948 t.Skip("too slow for testing.Short") 1949 } 1950 1951 t.Parallel() 1952 1953 require := require.New(t) 1954 a := NewTestAgent(t, ` 1955acl_token = "hello" 1956 `) 1957 defer a.Shutdown() 1958 1959 health := &structs.HealthCheck{ 1960 Node: "foo", 1961 CheckID: "aliashealth", 1962 Name: "Alias health check", 1963 Status: api.HealthCritical, 1964 } 1965 chk := &structs.CheckType{ 1966 AliasService: "foo", 1967 } 1968 err := a.AddCheck(health, chk, false, "goodbye", ConfigSourceLocal) 1969 require.NoError(err) 1970 1971 cs := a.State.CheckState(structs.NewCheckID("aliashealth", nil)) 1972 require.NotNil(cs) 1973 require.Equal("goodbye", cs.Token) 1974 1975 chkImpl, ok := a.checkAliases[structs.NewCheckID("aliashealth", nil)] 1976 require.True(ok, "missing aliashealth check") 1977 require.Equal("goodbye", chkImpl.RPCReq.Token) 1978} 1979 1980func TestAgent_RemoveCheck(t *testing.T) { 1981 if testing.Short() { 1982 t.Skip("too slow for testing.Short") 1983 } 1984 1985 t.Parallel() 1986 a := NewTestAgent(t, ` 1987 enable_script_checks = true 1988 `) 1989 defer a.Shutdown() 1990 1991 // Remove check that doesn't exist 1992 if err := a.RemoveCheck(structs.NewCheckID("mem", nil), false); err != nil { 1993 t.Fatalf("err: %v", err) 1994 } 1995 1996 // Remove without an ID 1997 if err := a.RemoveCheck(structs.NewCheckID("", nil), false); err == nil { 1998 t.Fatalf("should have errored") 1999 } 2000 2001 health := &structs.HealthCheck{ 2002 Node: "foo", 2003 CheckID: "mem", 2004 Name: "memory util", 2005 Status: api.HealthCritical, 2006 } 2007 chk := &structs.CheckType{ 2008 ScriptArgs: []string{"exit", "0"}, 2009 Interval: 15 * time.Second, 2010 } 2011 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 2012 if err != nil { 2013 t.Fatalf("err: %v", err) 2014 } 2015 2016 // Remove check 2017 if err := a.RemoveCheck(structs.NewCheckID("mem", nil), false); err != nil { 2018 t.Fatalf("err: %v", err) 2019 } 2020 2021 // Ensure we have a check mapping 2022 requireCheckMissing(t, a, "mem") 2023 2024 // Ensure a TTL is setup 2025 requireCheckMissingMap(t, a.checkMonitors, "mem") 2026} 2027 2028func TestAgent_HTTPCheck_TLSSkipVerify(t *testing.T) { 2029 if testing.Short() { 2030 t.Skip("too slow for testing.Short") 2031 } 2032 2033 t.Parallel() 2034 2035 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 2036 fmt.Fprintln(w, "GOOD") 2037 }) 2038 server := httptest.NewTLSServer(handler) 2039 defer server.Close() 2040 2041 a := NewTestAgent(t, "") 2042 defer a.Shutdown() 2043 2044 health := &structs.HealthCheck{ 2045 Node: "foo", 2046 CheckID: "tls", 2047 Name: "tls check", 2048 Status: api.HealthCritical, 2049 } 2050 chk := &structs.CheckType{ 2051 HTTP: server.URL, 2052 Interval: 20 * time.Millisecond, 2053 TLSSkipVerify: true, 2054 } 2055 2056 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 2057 if err != nil { 2058 t.Fatalf("err: %v", err) 2059 } 2060 2061 retry.Run(t, func(r *retry.R) { 2062 status := getCheck(a, "tls") 2063 if status.Status != api.HealthPassing { 2064 r.Fatalf("bad: %v", status.Status) 2065 } 2066 if !strings.Contains(status.Output, "GOOD") { 2067 r.Fatalf("bad: %v", status.Output) 2068 } 2069 }) 2070 2071} 2072 2073func TestAgent_HTTPCheck_EnableAgentTLSForChecks(t *testing.T) { 2074 if testing.Short() { 2075 t.Skip("too slow for testing.Short") 2076 } 2077 2078 t.Parallel() 2079 2080 run := func(t *testing.T, ca string) { 2081 a := StartTestAgent(t, TestAgent{ 2082 UseTLS: true, 2083 HCL: ` 2084 enable_agent_tls_for_checks = true 2085 2086 verify_incoming = true 2087 server_name = "consul.test" 2088 key_file = "../test/client_certs/server.key" 2089 cert_file = "../test/client_certs/server.crt" 2090 ` + ca, 2091 }) 2092 defer a.Shutdown() 2093 2094 health := &structs.HealthCheck{ 2095 Node: "foo", 2096 CheckID: "tls", 2097 Name: "tls check", 2098 Status: api.HealthCritical, 2099 } 2100 2101 addr, err := firstAddr(a.Agent.apiServers, "https") 2102 require.NoError(t, err) 2103 url := fmt.Sprintf("https://%s/v1/agent/self", addr.String()) 2104 chk := &structs.CheckType{ 2105 HTTP: url, 2106 Interval: 20 * time.Millisecond, 2107 } 2108 2109 err = a.AddCheck(health, chk, false, "", ConfigSourceLocal) 2110 if err != nil { 2111 t.Fatalf("err: %v", err) 2112 } 2113 2114 retry.Run(t, func(r *retry.R) { 2115 status := getCheck(a, "tls") 2116 if status.Status != api.HealthPassing { 2117 r.Fatalf("bad: %v", status.Status) 2118 } 2119 if !strings.Contains(status.Output, "200 OK") { 2120 r.Fatalf("bad: %v", status.Output) 2121 } 2122 }) 2123 } 2124 2125 // We need to test both methods of passing the CA info to ensure that 2126 // we propagate all the fields correctly. All the other fields are 2127 // covered by the HCL in the test run function. 2128 tests := []struct { 2129 desc string 2130 config string 2131 }{ 2132 {"ca_file", `ca_file = "../test/client_certs/rootca.crt"`}, 2133 {"ca_path", `ca_path = "../test/client_certs/path"`}, 2134 } 2135 for _, tt := range tests { 2136 t.Run(tt.desc, func(t *testing.T) { 2137 run(t, tt.config) 2138 }) 2139 } 2140} 2141 2142func TestAgent_updateTTLCheck(t *testing.T) { 2143 if testing.Short() { 2144 t.Skip("too slow for testing.Short") 2145 } 2146 2147 t.Parallel() 2148 a := NewTestAgent(t, "") 2149 defer a.Shutdown() 2150 checkBufSize := 100 2151 health := &structs.HealthCheck{ 2152 Node: "foo", 2153 CheckID: "mem", 2154 Name: "memory util", 2155 Status: api.HealthCritical, 2156 } 2157 chk := &structs.CheckType{ 2158 TTL: 15 * time.Second, 2159 OutputMaxSize: checkBufSize, 2160 } 2161 2162 // Add check and update it. 2163 err := a.AddCheck(health, chk, false, "", ConfigSourceLocal) 2164 if err != nil { 2165 t.Fatalf("err: %v", err) 2166 } 2167 if err := a.updateTTLCheck(structs.NewCheckID("mem", nil), api.HealthPassing, "foo"); err != nil { 2168 t.Fatalf("err: %v", err) 2169 } 2170 2171 // Ensure we have a check mapping. 2172 status := getCheck(a, "mem") 2173 if status.Status != api.HealthPassing { 2174 t.Fatalf("bad: %v", status) 2175 } 2176 if status.Output != "foo" { 2177 t.Fatalf("bad: %v", status) 2178 } 2179 2180 if err := a.updateTTLCheck(structs.NewCheckID("mem", nil), api.HealthCritical, strings.Repeat("--bad-- ", 5*checkBufSize)); err != nil { 2181 t.Fatalf("err: %v", err) 2182 } 2183 2184 // Ensure we have a check mapping. 2185 status = getCheck(a, "mem") 2186 if status.Status != api.HealthCritical { 2187 t.Fatalf("bad: %v", status) 2188 } 2189 if len(status.Output) > checkBufSize*2 { 2190 t.Fatalf("bad: %v", len(status.Output)) 2191 } 2192} 2193 2194func TestAgent_PersistService(t *testing.T) { 2195 if testing.Short() { 2196 t.Skip("too slow for testing.Short") 2197 } 2198 2199 t.Run("normal", func(t *testing.T) { 2200 t.Parallel() 2201 testAgent_PersistService(t, "enable_central_service_config = false") 2202 }) 2203 t.Run("service manager", func(t *testing.T) { 2204 t.Parallel() 2205 testAgent_PersistService(t, "enable_central_service_config = true") 2206 }) 2207} 2208 2209func testAgent_PersistService(t *testing.T, extraHCL string) { 2210 t.Helper() 2211 2212 cfg := ` 2213 server = false 2214 bootstrap = false 2215 ` + extraHCL 2216 a := StartTestAgent(t, TestAgent{HCL: cfg}) 2217 defer a.Shutdown() 2218 2219 svc := &structs.NodeService{ 2220 ID: "redis", 2221 Service: "redis", 2222 Tags: []string{"foo"}, 2223 Port: 8000, 2224 } 2225 2226 file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID)) 2227 2228 // Check is not persisted unless requested 2229 if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil { 2230 t.Fatalf("err: %v", err) 2231 } 2232 if _, err := os.Stat(file); err == nil { 2233 t.Fatalf("should not persist") 2234 } 2235 2236 // Persists to file if requested 2237 if err := a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceLocal); err != nil { 2238 t.Fatalf("err: %v", err) 2239 } 2240 if _, err := os.Stat(file); err != nil { 2241 t.Fatalf("err: %s", err) 2242 } 2243 expected, err := json.Marshal(persistedService{ 2244 Token: "mytoken", 2245 Service: svc, 2246 Source: "local", 2247 }) 2248 if err != nil { 2249 t.Fatalf("err: %s", err) 2250 } 2251 content, err := ioutil.ReadFile(file) 2252 if err != nil { 2253 t.Fatalf("err: %s", err) 2254 } 2255 if !bytes.Equal(expected, content) { 2256 t.Fatalf("bad: %s", string(content)) 2257 } 2258 2259 // Updates service definition on disk 2260 svc.Port = 8001 2261 if err := a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceLocal); err != nil { 2262 t.Fatalf("err: %v", err) 2263 } 2264 expected, err = json.Marshal(persistedService{ 2265 Token: "mytoken", 2266 Service: svc, 2267 Source: "local", 2268 }) 2269 if err != nil { 2270 t.Fatalf("err: %s", err) 2271 } 2272 content, err = ioutil.ReadFile(file) 2273 if err != nil { 2274 t.Fatalf("err: %s", err) 2275 } 2276 if !bytes.Equal(expected, content) { 2277 t.Fatalf("bad: %s", string(content)) 2278 } 2279 a.Shutdown() 2280 2281 // Should load it back during later start 2282 a2 := StartTestAgent(t, TestAgent{HCL: cfg, DataDir: a.DataDir}) 2283 defer a2.Shutdown() 2284 2285 restored := a2.State.ServiceState(structs.NewServiceID(svc.ID, nil)) 2286 if restored == nil { 2287 t.Fatalf("service %q missing", svc.ID) 2288 } 2289 if got, want := restored.Token, "mytoken"; got != want { 2290 t.Fatalf("got token %q want %q", got, want) 2291 } 2292 if got, want := restored.Service.Port, 8001; got != want { 2293 t.Fatalf("got port %d want %d", got, want) 2294 } 2295} 2296 2297func TestAgent_persistedService_compat(t *testing.T) { 2298 if testing.Short() { 2299 t.Skip("too slow for testing.Short") 2300 } 2301 2302 t.Run("normal", func(t *testing.T) { 2303 t.Parallel() 2304 testAgent_persistedService_compat(t, "enable_central_service_config = false") 2305 }) 2306 t.Run("service manager", func(t *testing.T) { 2307 t.Parallel() 2308 testAgent_persistedService_compat(t, "enable_central_service_config = true") 2309 }) 2310} 2311 2312func testAgent_persistedService_compat(t *testing.T, extraHCL string) { 2313 t.Helper() 2314 2315 // Tests backwards compatibility of persisted services from pre-0.5.1 2316 a := NewTestAgent(t, extraHCL) 2317 defer a.Shutdown() 2318 2319 svc := &structs.NodeService{ 2320 ID: "redis", 2321 Service: "redis", 2322 Tags: []string{"foo"}, 2323 Port: 8000, 2324 TaggedAddresses: map[string]structs.ServiceAddress{}, 2325 Weights: &structs.Weights{Passing: 1, Warning: 1}, 2326 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 2327 } 2328 2329 // Encode the NodeService directly. This is what previous versions 2330 // would serialize to the file (without the wrapper) 2331 encoded, err := json.Marshal(svc) 2332 if err != nil { 2333 t.Fatalf("err: %s", err) 2334 } 2335 2336 // Write the content to the file 2337 file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID)) 2338 if err := os.MkdirAll(filepath.Dir(file), 0700); err != nil { 2339 t.Fatalf("err: %s", err) 2340 } 2341 if err := ioutil.WriteFile(file, encoded, 0600); err != nil { 2342 t.Fatalf("err: %s", err) 2343 } 2344 2345 // Load the services 2346 if err := a.loadServices(a.Config, nil); err != nil { 2347 t.Fatalf("err: %s", err) 2348 } 2349 2350 // Ensure the service was restored 2351 result := requireServiceExists(t, a, "redis") 2352 require.Equal(t, svc, result) 2353} 2354 2355func TestAgent_PurgeService(t *testing.T) { 2356 if testing.Short() { 2357 t.Skip("too slow for testing.Short") 2358 } 2359 2360 t.Run("normal", func(t *testing.T) { 2361 t.Parallel() 2362 testAgent_PurgeService(t, "enable_central_service_config = false") 2363 }) 2364 t.Run("service manager", func(t *testing.T) { 2365 t.Parallel() 2366 testAgent_PurgeService(t, "enable_central_service_config = true") 2367 }) 2368} 2369 2370func testAgent_PurgeService(t *testing.T, extraHCL string) { 2371 t.Helper() 2372 2373 a := NewTestAgent(t, extraHCL) 2374 defer a.Shutdown() 2375 2376 svc := &structs.NodeService{ 2377 ID: "redis", 2378 Service: "redis", 2379 Tags: []string{"foo"}, 2380 Port: 8000, 2381 } 2382 2383 file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID)) 2384 if err := a.addServiceFromSource(svc, nil, true, "", ConfigSourceLocal); err != nil { 2385 t.Fatalf("err: %v", err) 2386 } 2387 // Exists 2388 if _, err := os.Stat(file); err != nil { 2389 t.Fatalf("err: %s", err) 2390 } 2391 2392 // Not removed 2393 if err := a.removeService(structs.NewServiceID(svc.ID, nil), false); err != nil { 2394 t.Fatalf("err: %s", err) 2395 } 2396 if _, err := os.Stat(file); err != nil { 2397 t.Fatalf("err: %s", err) 2398 } 2399 2400 // Re-add the service 2401 if err := a.addServiceFromSource(svc, nil, true, "", ConfigSourceLocal); err != nil { 2402 t.Fatalf("err: %v", err) 2403 } 2404 2405 // Removed 2406 if err := a.removeService(structs.NewServiceID(svc.ID, nil), true); err != nil { 2407 t.Fatalf("err: %s", err) 2408 } 2409 if _, err := os.Stat(file); !os.IsNotExist(err) { 2410 t.Fatalf("bad: %#v", err) 2411 } 2412} 2413 2414func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { 2415 if testing.Short() { 2416 t.Skip("too slow for testing.Short") 2417 } 2418 2419 t.Run("normal", func(t *testing.T) { 2420 t.Parallel() 2421 testAgent_PurgeServiceOnDuplicate(t, "enable_central_service_config = false") 2422 }) 2423 t.Run("service manager", func(t *testing.T) { 2424 t.Parallel() 2425 testAgent_PurgeServiceOnDuplicate(t, "enable_central_service_config = true") 2426 }) 2427} 2428 2429func testAgent_PurgeServiceOnDuplicate(t *testing.T, extraHCL string) { 2430 t.Helper() 2431 2432 cfg := ` 2433 server = false 2434 bootstrap = false 2435 ` + extraHCL 2436 a := StartTestAgent(t, TestAgent{HCL: cfg}) 2437 defer a.Shutdown() 2438 2439 svc1 := &structs.NodeService{ 2440 ID: "redis", 2441 Service: "redis", 2442 Tags: []string{"foo"}, 2443 Port: 8000, 2444 } 2445 2446 // First persist the service 2447 require.NoError(t, a.addServiceFromSource(svc1, nil, true, "", ConfigSourceLocal)) 2448 a.Shutdown() 2449 2450 // Try bringing the agent back up with the service already 2451 // existing in the config 2452 a2 := StartTestAgent(t, TestAgent{Name: "Agent2", HCL: cfg + ` 2453 service = { 2454 id = "redis" 2455 name = "redis" 2456 tags = ["bar"] 2457 port = 9000 2458 } 2459 `, DataDir: a.DataDir}) 2460 defer a2.Shutdown() 2461 2462 sid := svc1.CompoundServiceID() 2463 file := filepath.Join(a.Config.DataDir, servicesDir, sid.StringHash()) 2464 _, err := os.Stat(file) 2465 require.Error(t, err, "should have removed persisted service") 2466 result := requireServiceExists(t, a, "redis") 2467 require.NotEqual(t, []string{"bar"}, result.Tags) 2468 require.NotEqual(t, 9000, result.Port) 2469} 2470 2471func TestAgent_PersistCheck(t *testing.T) { 2472 if testing.Short() { 2473 t.Skip("too slow for testing.Short") 2474 } 2475 2476 t.Parallel() 2477 cfg := ` 2478 server = false 2479 bootstrap = false 2480 enable_script_checks = true 2481 ` 2482 a := StartTestAgent(t, TestAgent{HCL: cfg}) 2483 defer a.Shutdown() 2484 2485 check := &structs.HealthCheck{ 2486 Node: a.config.NodeName, 2487 CheckID: "mem", 2488 Name: "memory check", 2489 Status: api.HealthPassing, 2490 } 2491 chkType := &structs.CheckType{ 2492 ScriptArgs: []string{"/bin/true"}, 2493 Interval: 10 * time.Second, 2494 } 2495 2496 cid := check.CompoundCheckID() 2497 file := filepath.Join(a.Config.DataDir, checksDir, cid.StringHash()) 2498 2499 // Not persisted if not requested 2500 require.NoError(t, a.AddCheck(check, chkType, false, "", ConfigSourceLocal)) 2501 _, err := os.Stat(file) 2502 require.Error(t, err, "should not persist") 2503 2504 // Should persist if requested 2505 require.NoError(t, a.AddCheck(check, chkType, true, "mytoken", ConfigSourceLocal)) 2506 _, err = os.Stat(file) 2507 require.NoError(t, err) 2508 2509 expected, err := json.Marshal(persistedCheck{ 2510 Check: check, 2511 ChkType: chkType, 2512 Token: "mytoken", 2513 Source: "local", 2514 }) 2515 require.NoError(t, err) 2516 2517 content, err := ioutil.ReadFile(file) 2518 require.NoError(t, err) 2519 2520 require.Equal(t, expected, content) 2521 2522 // Updates the check definition on disk 2523 check.Name = "mem1" 2524 require.NoError(t, a.AddCheck(check, chkType, true, "mytoken", ConfigSourceLocal)) 2525 expected, err = json.Marshal(persistedCheck{ 2526 Check: check, 2527 ChkType: chkType, 2528 Token: "mytoken", 2529 Source: "local", 2530 }) 2531 require.NoError(t, err) 2532 content, err = ioutil.ReadFile(file) 2533 require.NoError(t, err) 2534 require.Equal(t, expected, content) 2535 a.Shutdown() 2536 2537 // Should load it back during later start 2538 a2 := StartTestAgent(t, TestAgent{Name: "Agent2", HCL: cfg, DataDir: a.DataDir}) 2539 defer a2.Shutdown() 2540 2541 result := requireCheckExists(t, a2, check.CheckID) 2542 require.Equal(t, api.HealthCritical, result.Status) 2543 require.Equal(t, "mem1", result.Name) 2544 2545 // Should have restored the monitor 2546 requireCheckExistsMap(t, a2.checkMonitors, check.CheckID) 2547 chkState := a2.State.CheckState(structs.NewCheckID(check.CheckID, nil)) 2548 require.NotNil(t, chkState) 2549 require.Equal(t, "mytoken", chkState.Token) 2550} 2551 2552func TestAgent_PurgeCheck(t *testing.T) { 2553 if testing.Short() { 2554 t.Skip("too slow for testing.Short") 2555 } 2556 2557 t.Parallel() 2558 a := NewTestAgent(t, "") 2559 defer a.Shutdown() 2560 2561 check := &structs.HealthCheck{ 2562 Node: a.Config.NodeName, 2563 CheckID: "mem", 2564 Name: "memory check", 2565 Status: api.HealthPassing, 2566 } 2567 2568 file := filepath.Join(a.Config.DataDir, checksDir, checkIDHash(check.CheckID)) 2569 if err := a.AddCheck(check, nil, true, "", ConfigSourceLocal); err != nil { 2570 t.Fatalf("err: %v", err) 2571 } 2572 2573 // Not removed 2574 if err := a.RemoveCheck(structs.NewCheckID(check.CheckID, nil), false); err != nil { 2575 t.Fatalf("err: %s", err) 2576 } 2577 if _, err := os.Stat(file); err != nil { 2578 t.Fatalf("err: %s", err) 2579 } 2580 2581 // Removed 2582 if err := a.RemoveCheck(structs.NewCheckID(check.CheckID, nil), true); err != nil { 2583 t.Fatalf("err: %s", err) 2584 } 2585 if _, err := os.Stat(file); !os.IsNotExist(err) { 2586 t.Fatalf("bad: %#v", err) 2587 } 2588} 2589 2590func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { 2591 if testing.Short() { 2592 t.Skip("too slow for testing.Short") 2593 } 2594 2595 t.Parallel() 2596 nodeID := NodeID() 2597 a := StartTestAgent(t, TestAgent{ 2598 HCL: ` 2599 node_id = "` + nodeID + `" 2600 node_name = "Node ` + nodeID + `" 2601 server = false 2602 bootstrap = false 2603 enable_script_checks = true 2604 `}) 2605 defer a.Shutdown() 2606 2607 check1 := &structs.HealthCheck{ 2608 Node: a.Config.NodeName, 2609 CheckID: "mem", 2610 Name: "memory check", 2611 Status: api.HealthPassing, 2612 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 2613 } 2614 2615 // First persist the check 2616 if err := a.AddCheck(check1, nil, true, "", ConfigSourceLocal); err != nil { 2617 t.Fatalf("err: %v", err) 2618 } 2619 a.Shutdown() 2620 2621 // Start again with the check registered in config 2622 a2 := StartTestAgent(t, TestAgent{ 2623 Name: "Agent2", 2624 DataDir: a.DataDir, 2625 HCL: ` 2626 node_id = "` + nodeID + `" 2627 node_name = "Node ` + nodeID + `" 2628 server = false 2629 bootstrap = false 2630 enable_script_checks = true 2631 check = { 2632 id = "mem" 2633 name = "memory check" 2634 notes = "my cool notes" 2635 args = ["/bin/check-redis.py"] 2636 interval = "30s" 2637 timeout = "5s" 2638 } 2639 `}) 2640 defer a2.Shutdown() 2641 2642 cid := check1.CompoundCheckID() 2643 file := filepath.Join(a.DataDir, checksDir, cid.StringHash()) 2644 if _, err := os.Stat(file); err == nil { 2645 t.Fatalf("should have removed persisted check") 2646 } 2647 result := requireCheckExists(t, a2, "mem") 2648 expected := &structs.HealthCheck{ 2649 Node: a2.Config.NodeName, 2650 CheckID: "mem", 2651 Name: "memory check", 2652 Status: api.HealthCritical, 2653 Notes: "my cool notes", 2654 Interval: "30s", 2655 Timeout: "5s", 2656 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 2657 } 2658 require.Equal(t, expected, result) 2659} 2660 2661func TestAgent_DeregisterPersistedSidecarAfterRestart(t *testing.T) { 2662 if testing.Short() { 2663 t.Skip("too slow for testing.Short") 2664 } 2665 2666 t.Parallel() 2667 nodeID := NodeID() 2668 a := StartTestAgent(t, TestAgent{ 2669 HCL: ` 2670 node_id = "` + nodeID + `" 2671 node_name = "Node ` + nodeID + `" 2672 server = false 2673 bootstrap = false 2674 enable_central_service_config = false 2675 `}) 2676 defer a.Shutdown() 2677 2678 srv := &structs.NodeService{ 2679 ID: "svc", 2680 Service: "svc", 2681 Weights: &structs.Weights{ 2682 Passing: 2, 2683 Warning: 1, 2684 }, 2685 Tags: []string{"tag2"}, 2686 Port: 8200, 2687 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 2688 2689 Connect: structs.ServiceConnect{ 2690 SidecarService: &structs.ServiceDefinition{}, 2691 }, 2692 } 2693 2694 connectSrv, _, _, err := a.sidecarServiceFromNodeService(srv, "") 2695 require.NoError(t, err) 2696 2697 // First persist the check 2698 err = a.addServiceFromSource(srv, nil, true, "", ConfigSourceLocal) 2699 require.NoError(t, err) 2700 err = a.addServiceFromSource(connectSrv, nil, true, "", ConfigSourceLocal) 2701 require.NoError(t, err) 2702 2703 // check both services were registered 2704 require.NotNil(t, a.State.Service(srv.CompoundServiceID())) 2705 require.NotNil(t, a.State.Service(connectSrv.CompoundServiceID())) 2706 2707 a.Shutdown() 2708 2709 // Start again with the check registered in config 2710 a2 := StartTestAgent(t, TestAgent{ 2711 Name: "Agent2", 2712 DataDir: a.DataDir, 2713 HCL: ` 2714 node_id = "` + nodeID + `" 2715 node_name = "Node ` + nodeID + `" 2716 server = false 2717 bootstrap = false 2718 enable_central_service_config = false 2719 `}) 2720 defer a2.Shutdown() 2721 2722 // check both services were restored 2723 require.NotNil(t, a2.State.Service(srv.CompoundServiceID())) 2724 require.NotNil(t, a2.State.Service(connectSrv.CompoundServiceID())) 2725 2726 err = a2.RemoveService(srv.CompoundServiceID()) 2727 require.NoError(t, err) 2728 2729 // check both services were deregistered 2730 require.Nil(t, a2.State.Service(srv.CompoundServiceID())) 2731 require.Nil(t, a2.State.Service(connectSrv.CompoundServiceID())) 2732} 2733 2734func TestAgent_loadChecks_token(t *testing.T) { 2735 if testing.Short() { 2736 t.Skip("too slow for testing.Short") 2737 } 2738 2739 t.Parallel() 2740 a := NewTestAgent(t, ` 2741 check = { 2742 id = "rabbitmq" 2743 name = "rabbitmq" 2744 token = "abc123" 2745 ttl = "10s" 2746 } 2747 `) 2748 defer a.Shutdown() 2749 2750 requireCheckExists(t, a, "rabbitmq") 2751 require.Equal(t, "abc123", a.State.CheckToken(structs.NewCheckID("rabbitmq", nil))) 2752} 2753 2754func TestAgent_unloadChecks(t *testing.T) { 2755 if testing.Short() { 2756 t.Skip("too slow for testing.Short") 2757 } 2758 2759 t.Parallel() 2760 a := NewTestAgent(t, "") 2761 defer a.Shutdown() 2762 2763 // First register a service 2764 svc := &structs.NodeService{ 2765 ID: "redis", 2766 Service: "redis", 2767 Tags: []string{"foo"}, 2768 Port: 8000, 2769 } 2770 if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil { 2771 t.Fatalf("err: %v", err) 2772 } 2773 2774 // Register a check 2775 check1 := &structs.HealthCheck{ 2776 Node: a.Config.NodeName, 2777 CheckID: "service:redis", 2778 Name: "redischeck", 2779 Status: api.HealthPassing, 2780 ServiceID: "redis", 2781 ServiceName: "redis", 2782 } 2783 if err := a.AddCheck(check1, nil, false, "", ConfigSourceLocal); err != nil { 2784 t.Fatalf("err: %s", err) 2785 } 2786 2787 requireCheckExists(t, a, check1.CheckID) 2788 2789 // Unload all of the checks 2790 if err := a.unloadChecks(); err != nil { 2791 t.Fatalf("err: %s", err) 2792 } 2793 2794 // Make sure it was unloaded 2795 requireCheckMissing(t, a, check1.CheckID) 2796} 2797 2798func TestAgent_loadServices_token(t *testing.T) { 2799 if testing.Short() { 2800 t.Skip("too slow for testing.Short") 2801 } 2802 2803 t.Run("normal", func(t *testing.T) { 2804 t.Parallel() 2805 testAgent_loadServices_token(t, "enable_central_service_config = false") 2806 }) 2807 t.Run("service manager", func(t *testing.T) { 2808 t.Parallel() 2809 testAgent_loadServices_token(t, "enable_central_service_config = true") 2810 }) 2811} 2812 2813func testAgent_loadServices_token(t *testing.T, extraHCL string) { 2814 t.Helper() 2815 2816 a := NewTestAgent(t, ` 2817 service = { 2818 id = "rabbitmq" 2819 name = "rabbitmq" 2820 port = 5672 2821 token = "abc123" 2822 } 2823 `+extraHCL) 2824 defer a.Shutdown() 2825 2826 requireServiceExists(t, a, "rabbitmq") 2827 if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq", nil)); token != "abc123" { 2828 t.Fatalf("bad: %s", token) 2829 } 2830} 2831 2832func TestAgent_loadServices_sidecar(t *testing.T) { 2833 if testing.Short() { 2834 t.Skip("too slow for testing.Short") 2835 } 2836 2837 t.Run("normal", func(t *testing.T) { 2838 t.Parallel() 2839 testAgent_loadServices_sidecar(t, "enable_central_service_config = false") 2840 }) 2841 t.Run("service manager", func(t *testing.T) { 2842 t.Parallel() 2843 testAgent_loadServices_sidecar(t, "enable_central_service_config = true") 2844 }) 2845} 2846 2847func testAgent_loadServices_sidecar(t *testing.T, extraHCL string) { 2848 t.Helper() 2849 2850 a := NewTestAgent(t, ` 2851 service = { 2852 id = "rabbitmq" 2853 name = "rabbitmq" 2854 port = 5672 2855 token = "abc123" 2856 connect = { 2857 sidecar_service {} 2858 } 2859 } 2860 `+extraHCL) 2861 defer a.Shutdown() 2862 2863 svc := requireServiceExists(t, a, "rabbitmq") 2864 if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq", nil)); token != "abc123" { 2865 t.Fatalf("bad: %s", token) 2866 } 2867 requireServiceExists(t, a, "rabbitmq-sidecar-proxy") 2868 if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq-sidecar-proxy", nil)); token != "abc123" { 2869 t.Fatalf("bad: %s", token) 2870 } 2871 2872 // Sanity check rabbitmq service should NOT have sidecar info in state since 2873 // it's done it's job and should be a registration syntax sugar only. 2874 assert.Nil(t, svc.Connect.SidecarService) 2875} 2876 2877func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) { 2878 if testing.Short() { 2879 t.Skip("too slow for testing.Short") 2880 } 2881 2882 t.Run("normal", func(t *testing.T) { 2883 t.Parallel() 2884 testAgent_loadServices_sidecarSeparateToken(t, "enable_central_service_config = false") 2885 }) 2886 t.Run("service manager", func(t *testing.T) { 2887 t.Parallel() 2888 testAgent_loadServices_sidecarSeparateToken(t, "enable_central_service_config = true") 2889 }) 2890} 2891 2892func testAgent_loadServices_sidecarSeparateToken(t *testing.T, extraHCL string) { 2893 t.Helper() 2894 2895 a := NewTestAgent(t, ` 2896 service = { 2897 id = "rabbitmq" 2898 name = "rabbitmq" 2899 port = 5672 2900 token = "abc123" 2901 connect = { 2902 sidecar_service { 2903 token = "789xyz" 2904 } 2905 } 2906 } 2907 `+extraHCL) 2908 defer a.Shutdown() 2909 2910 requireServiceExists(t, a, "rabbitmq") 2911 if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq", nil)); token != "abc123" { 2912 t.Fatalf("bad: %s", token) 2913 } 2914 requireServiceExists(t, a, "rabbitmq-sidecar-proxy") 2915 if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq-sidecar-proxy", nil)); token != "789xyz" { 2916 t.Fatalf("bad: %s", token) 2917 } 2918} 2919 2920func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) { 2921 if testing.Short() { 2922 t.Skip("too slow for testing.Short") 2923 } 2924 2925 t.Run("normal", func(t *testing.T) { 2926 t.Parallel() 2927 testAgent_loadServices_sidecarInheritMeta(t, "enable_central_service_config = false") 2928 }) 2929 t.Run("service manager", func(t *testing.T) { 2930 t.Parallel() 2931 testAgent_loadServices_sidecarInheritMeta(t, "enable_central_service_config = true") 2932 }) 2933} 2934 2935func testAgent_loadServices_sidecarInheritMeta(t *testing.T, extraHCL string) { 2936 t.Helper() 2937 2938 a := NewTestAgent(t, ` 2939 service = { 2940 id = "rabbitmq" 2941 name = "rabbitmq" 2942 port = 5672 2943 tags = ["a", "b"], 2944 meta = { 2945 environment = "prod" 2946 } 2947 connect = { 2948 sidecar_service { 2949 2950 } 2951 } 2952 } 2953 `+extraHCL) 2954 defer a.Shutdown() 2955 2956 svc := requireServiceExists(t, a, "rabbitmq") 2957 require.Len(t, svc.Tags, 2) 2958 require.Len(t, svc.Meta, 1) 2959 2960 sidecar := requireServiceExists(t, a, "rabbitmq-sidecar-proxy") 2961 require.ElementsMatch(t, svc.Tags, sidecar.Tags) 2962 require.Len(t, sidecar.Meta, 1) 2963 meta, ok := sidecar.Meta["environment"] 2964 require.True(t, ok, "missing sidecar service meta") 2965 require.Equal(t, "prod", meta) 2966} 2967 2968func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) { 2969 if testing.Short() { 2970 t.Skip("too slow for testing.Short") 2971 } 2972 2973 t.Run("normal", func(t *testing.T) { 2974 t.Parallel() 2975 testAgent_loadServices_sidecarOverrideMeta(t, "enable_central_service_config = false") 2976 }) 2977 t.Run("service manager", func(t *testing.T) { 2978 t.Parallel() 2979 testAgent_loadServices_sidecarOverrideMeta(t, "enable_central_service_config = true") 2980 }) 2981} 2982 2983func testAgent_loadServices_sidecarOverrideMeta(t *testing.T, extraHCL string) { 2984 t.Helper() 2985 2986 a := NewTestAgent(t, ` 2987 service = { 2988 id = "rabbitmq" 2989 name = "rabbitmq" 2990 port = 5672 2991 tags = ["a", "b"], 2992 meta = { 2993 environment = "prod" 2994 } 2995 connect = { 2996 sidecar_service { 2997 tags = ["foo"], 2998 meta = { 2999 environment = "qa" 3000 } 3001 } 3002 } 3003 } 3004 `+extraHCL) 3005 defer a.Shutdown() 3006 3007 svc := requireServiceExists(t, a, "rabbitmq") 3008 require.Len(t, svc.Tags, 2) 3009 require.Len(t, svc.Meta, 1) 3010 3011 sidecar := requireServiceExists(t, a, "rabbitmq-sidecar-proxy") 3012 require.Len(t, sidecar.Tags, 1) 3013 require.Equal(t, "foo", sidecar.Tags[0]) 3014 require.Len(t, sidecar.Meta, 1) 3015 meta, ok := sidecar.Meta["environment"] 3016 require.True(t, ok, "missing sidecar service meta") 3017 require.Equal(t, "qa", meta) 3018} 3019 3020func TestAgent_unloadServices(t *testing.T) { 3021 if testing.Short() { 3022 t.Skip("too slow for testing.Short") 3023 } 3024 3025 t.Run("normal", func(t *testing.T) { 3026 t.Parallel() 3027 testAgent_unloadServices(t, "enable_central_service_config = false") 3028 }) 3029 t.Run("service manager", func(t *testing.T) { 3030 t.Parallel() 3031 testAgent_unloadServices(t, "enable_central_service_config = true") 3032 }) 3033} 3034 3035func testAgent_unloadServices(t *testing.T, extraHCL string) { 3036 t.Helper() 3037 3038 a := NewTestAgent(t, extraHCL) 3039 defer a.Shutdown() 3040 3041 svc := &structs.NodeService{ 3042 ID: "redis", 3043 Service: "redis", 3044 Tags: []string{"foo"}, 3045 Port: 8000, 3046 } 3047 3048 // Register the service 3049 if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil { 3050 t.Fatalf("err: %v", err) 3051 } 3052 3053 requireServiceExists(t, a, svc.ID) 3054 3055 // Unload all services 3056 if err := a.unloadServices(); err != nil { 3057 t.Fatalf("err: %s", err) 3058 } 3059 if len(a.State.Services(structs.WildcardEnterpriseMeta())) != 0 { 3060 t.Fatalf("should have unloaded services") 3061 } 3062} 3063 3064func TestAgent_Service_MaintenanceMode(t *testing.T) { 3065 if testing.Short() { 3066 t.Skip("too slow for testing.Short") 3067 } 3068 3069 t.Parallel() 3070 a := NewTestAgent(t, "") 3071 defer a.Shutdown() 3072 3073 svc := &structs.NodeService{ 3074 ID: "redis", 3075 Service: "redis", 3076 Tags: []string{"foo"}, 3077 Port: 8000, 3078 } 3079 3080 // Register the service 3081 if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil { 3082 t.Fatalf("err: %v", err) 3083 } 3084 3085 sid := structs.NewServiceID("redis", nil) 3086 // Enter maintenance mode for the service 3087 if err := a.EnableServiceMaintenance(sid, "broken", "mytoken"); err != nil { 3088 t.Fatalf("err: %s", err) 3089 } 3090 3091 // Make sure the critical health check was added 3092 checkID := serviceMaintCheckID(sid) 3093 check := a.State.Check(checkID) 3094 if check == nil { 3095 t.Fatalf("should have registered critical maintenance check") 3096 } 3097 3098 // Check that the token was used to register the check 3099 if token := a.State.CheckToken(checkID); token != "mytoken" { 3100 t.Fatalf("expected 'mytoken', got: '%s'", token) 3101 } 3102 3103 // Ensure the reason was set in notes 3104 if check.Notes != "broken" { 3105 t.Fatalf("bad: %#v", check) 3106 } 3107 3108 // Leave maintenance mode 3109 if err := a.DisableServiceMaintenance(sid); err != nil { 3110 t.Fatalf("err: %s", err) 3111 } 3112 3113 // Ensure the check was deregistered 3114 3115 if found := a.State.Check(checkID); found != nil { 3116 t.Fatalf("should have deregistered maintenance check") 3117 } 3118 3119 // Enter service maintenance mode without providing a reason 3120 if err := a.EnableServiceMaintenance(sid, "", ""); err != nil { 3121 t.Fatalf("err: %s", err) 3122 } 3123 3124 // Ensure the check was registered with the default notes 3125 check = a.State.Check(checkID) 3126 if check == nil { 3127 t.Fatalf("should have registered critical check") 3128 } 3129 if check.Notes != defaultServiceMaintReason { 3130 t.Fatalf("bad: %#v", check) 3131 } 3132} 3133 3134func TestAgent_Service_Reap(t *testing.T) { 3135 if testing.Short() { 3136 t.Skip("too slow for testing.Short") 3137 } 3138 3139 // t.Parallel() // timing test. no parallel 3140 a := StartTestAgent(t, TestAgent{Overrides: ` 3141 check_reap_interval = "50ms" 3142 check_deregister_interval_min = "0s" 3143 `}) 3144 defer a.Shutdown() 3145 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 3146 3147 svc := &structs.NodeService{ 3148 ID: "redis", 3149 Service: "redis", 3150 Tags: []string{"foo"}, 3151 Port: 8000, 3152 } 3153 chkTypes := []*structs.CheckType{ 3154 { 3155 Status: api.HealthPassing, 3156 TTL: 25 * time.Millisecond, 3157 DeregisterCriticalServiceAfter: 200 * time.Millisecond, 3158 }, 3159 } 3160 3161 // Register the service. 3162 if err := a.addServiceFromSource(svc, chkTypes, false, "", ConfigSourceLocal); err != nil { 3163 t.Fatalf("err: %v", err) 3164 } 3165 3166 // Make sure it's there and there's no critical check yet. 3167 requireServiceExists(t, a, "redis") 3168 require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 0, "should not have critical checks") 3169 3170 // Wait for the check TTL to fail but before the check is reaped. 3171 time.Sleep(100 * time.Millisecond) 3172 requireServiceExists(t, a, "redis") 3173 require.Len(t, a.State.CriticalCheckStates(nil), 1, "should have 1 critical check") 3174 3175 // Pass the TTL. 3176 if err := a.updateTTLCheck(structs.NewCheckID("service:redis", nil), api.HealthPassing, "foo"); err != nil { 3177 t.Fatalf("err: %v", err) 3178 } 3179 requireServiceExists(t, a, "redis") 3180 require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 0, "should not have critical checks") 3181 3182 // Wait for the check TTL to fail again. 3183 time.Sleep(100 * time.Millisecond) 3184 requireServiceExists(t, a, "redis") 3185 require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 1, "should have 1 critical check") 3186 3187 // Wait for the reap. 3188 time.Sleep(400 * time.Millisecond) 3189 requireServiceMissing(t, a, "redis") 3190 require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 0, "should not have critical checks") 3191} 3192 3193func TestAgent_Service_NoReap(t *testing.T) { 3194 if testing.Short() { 3195 t.Skip("too slow for testing.Short") 3196 } 3197 3198 // t.Parallel() // timing test. no parallel 3199 a := StartTestAgent(t, TestAgent{Overrides: ` 3200 check_reap_interval = "50ms" 3201 check_deregister_interval_min = "0s" 3202 `}) 3203 defer a.Shutdown() 3204 3205 svc := &structs.NodeService{ 3206 ID: "redis", 3207 Service: "redis", 3208 Tags: []string{"foo"}, 3209 Port: 8000, 3210 } 3211 chkTypes := []*structs.CheckType{ 3212 { 3213 Status: api.HealthPassing, 3214 TTL: 25 * time.Millisecond, 3215 }, 3216 } 3217 3218 // Register the service. 3219 if err := a.addServiceFromSource(svc, chkTypes, false, "", ConfigSourceLocal); err != nil { 3220 t.Fatalf("err: %v", err) 3221 } 3222 3223 // Make sure it's there and there's no critical check yet. 3224 requireServiceExists(t, a, "redis") 3225 require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 0) 3226 3227 // Wait for the check TTL to fail. 3228 time.Sleep(200 * time.Millisecond) 3229 requireServiceExists(t, a, "redis") 3230 require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 1) 3231 3232 // Wait a while and make sure it doesn't reap. 3233 time.Sleep(200 * time.Millisecond) 3234 requireServiceExists(t, a, "redis") 3235 require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 1) 3236} 3237 3238func TestAgent_AddService_restoresSnapshot(t *testing.T) { 3239 if testing.Short() { 3240 t.Skip("too slow for testing.Short") 3241 } 3242 3243 t.Run("normal", func(t *testing.T) { 3244 t.Parallel() 3245 testAgent_AddService_restoresSnapshot(t, "enable_central_service_config = false") 3246 }) 3247 t.Run("service manager", func(t *testing.T) { 3248 t.Parallel() 3249 testAgent_AddService_restoresSnapshot(t, "enable_central_service_config = true") 3250 }) 3251} 3252 3253func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) { 3254 a := NewTestAgent(t, extraHCL) 3255 defer a.Shutdown() 3256 3257 // First register a service 3258 svc := &structs.NodeService{ 3259 ID: "redis", 3260 Service: "redis", 3261 Tags: []string{"foo"}, 3262 Port: 8000, 3263 } 3264 require.NoError(t, a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal)) 3265 3266 // Register a check 3267 check1 := &structs.HealthCheck{ 3268 Node: a.Config.NodeName, 3269 CheckID: "service:redis", 3270 Name: "redischeck", 3271 Status: api.HealthPassing, 3272 ServiceID: "redis", 3273 ServiceName: "redis", 3274 } 3275 require.NoError(t, a.AddCheck(check1, nil, false, "", ConfigSourceLocal)) 3276 3277 // Re-registering the service preserves the state of the check 3278 chkTypes := []*structs.CheckType{{TTL: 30 * time.Second}} 3279 require.NoError(t, a.addServiceFromSource(svc, chkTypes, false, "", ConfigSourceLocal)) 3280 check := requireCheckExists(t, a, "service:redis") 3281 require.Equal(t, api.HealthPassing, check.Status) 3282} 3283 3284func TestAgent_AddCheck_restoresSnapshot(t *testing.T) { 3285 if testing.Short() { 3286 t.Skip("too slow for testing.Short") 3287 } 3288 3289 t.Parallel() 3290 a := NewTestAgent(t, "") 3291 defer a.Shutdown() 3292 3293 // First register a service 3294 svc := &structs.NodeService{ 3295 ID: "redis", 3296 Service: "redis", 3297 Tags: []string{"foo"}, 3298 Port: 8000, 3299 } 3300 if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil { 3301 t.Fatalf("err: %v", err) 3302 } 3303 3304 // Register a check 3305 check1 := &structs.HealthCheck{ 3306 Node: a.Config.NodeName, 3307 CheckID: "service:redis", 3308 Name: "redischeck", 3309 Status: api.HealthPassing, 3310 ServiceID: "redis", 3311 ServiceName: "redis", 3312 } 3313 if err := a.AddCheck(check1, nil, false, "", ConfigSourceLocal); err != nil { 3314 t.Fatalf("err: %s", err) 3315 } 3316 3317 // Re-registering the check preserves its state 3318 check1.Status = "" 3319 if err := a.AddCheck(check1, &structs.CheckType{TTL: 30 * time.Second}, false, "", ConfigSourceLocal); err != nil { 3320 t.Fatalf("err: %s", err) 3321 } 3322 check := requireCheckExists(t, a, "service:redis") 3323 if check.Status != api.HealthPassing { 3324 t.Fatalf("bad: %s", check.Status) 3325 } 3326} 3327 3328func TestAgent_NodeMaintenanceMode(t *testing.T) { 3329 if testing.Short() { 3330 t.Skip("too slow for testing.Short") 3331 } 3332 3333 t.Parallel() 3334 a := NewTestAgent(t, "") 3335 defer a.Shutdown() 3336 3337 // Enter maintenance mode for the node 3338 a.EnableNodeMaintenance("broken", "mytoken") 3339 3340 // Make sure the critical health check was added 3341 check := requireCheckExists(t, a, structs.NodeMaint) 3342 3343 // Check that the token was used to register the check 3344 if token := a.State.CheckToken(structs.NodeMaintCheckID); token != "mytoken" { 3345 t.Fatalf("expected 'mytoken', got: '%s'", token) 3346 } 3347 3348 // Ensure the reason was set in notes 3349 if check.Notes != "broken" { 3350 t.Fatalf("bad: %#v", check) 3351 } 3352 3353 // Leave maintenance mode 3354 a.DisableNodeMaintenance() 3355 3356 // Ensure the check was deregistered 3357 requireCheckMissing(t, a, structs.NodeMaint) 3358 3359 // Enter maintenance mode without passing a reason 3360 a.EnableNodeMaintenance("", "") 3361 3362 // Make sure the check was registered with the default note 3363 check = requireCheckExists(t, a, structs.NodeMaint) 3364 if check.Notes != defaultNodeMaintReason { 3365 t.Fatalf("bad: %#v", check) 3366 } 3367} 3368 3369func TestAgent_checkStateSnapshot(t *testing.T) { 3370 if testing.Short() { 3371 t.Skip("too slow for testing.Short") 3372 } 3373 3374 t.Parallel() 3375 a := NewTestAgent(t, "") 3376 defer a.Shutdown() 3377 3378 // First register a service 3379 svc := &structs.NodeService{ 3380 ID: "redis", 3381 Service: "redis", 3382 Tags: []string{"foo"}, 3383 Port: 8000, 3384 } 3385 if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil { 3386 t.Fatalf("err: %v", err) 3387 } 3388 3389 // Register a check 3390 check1 := &structs.HealthCheck{ 3391 Node: a.Config.NodeName, 3392 CheckID: "service:redis", 3393 Name: "redischeck", 3394 Status: api.HealthPassing, 3395 ServiceID: "redis", 3396 ServiceName: "redis", 3397 } 3398 if err := a.AddCheck(check1, nil, true, "", ConfigSourceLocal); err != nil { 3399 t.Fatalf("err: %s", err) 3400 } 3401 3402 // Snapshot the state 3403 snap := a.snapshotCheckState() 3404 3405 // Unload all of the checks 3406 if err := a.unloadChecks(); err != nil { 3407 t.Fatalf("err: %s", err) 3408 } 3409 3410 // Reload the checks and restore the snapshot. 3411 if err := a.loadChecks(a.Config, snap); err != nil { 3412 t.Fatalf("err: %s", err) 3413 } 3414 3415 // Search for the check 3416 out := requireCheckExists(t, a, check1.CheckID) 3417 3418 // Make sure state was restored 3419 if out.Status != api.HealthPassing { 3420 t.Fatalf("should have restored check state") 3421 } 3422} 3423 3424func TestAgent_loadChecks_checkFails(t *testing.T) { 3425 if testing.Short() { 3426 t.Skip("too slow for testing.Short") 3427 } 3428 3429 t.Parallel() 3430 a := NewTestAgent(t, "") 3431 defer a.Shutdown() 3432 3433 // Persist a health check with an invalid service ID 3434 check := &structs.HealthCheck{ 3435 Node: a.Config.NodeName, 3436 CheckID: "service:redis", 3437 Name: "redischeck", 3438 Status: api.HealthPassing, 3439 ServiceID: "nope", 3440 } 3441 if err := a.persistCheck(check, nil, ConfigSourceLocal); err != nil { 3442 t.Fatalf("err: %s", err) 3443 } 3444 3445 // Check to make sure the check was persisted 3446 checkHash := checkIDHash(check.CheckID) 3447 checkPath := filepath.Join(a.Config.DataDir, checksDir, checkHash) 3448 if _, err := os.Stat(checkPath); err != nil { 3449 t.Fatalf("err: %s", err) 3450 } 3451 3452 // Try loading the checks from the persisted files 3453 if err := a.loadChecks(a.Config, nil); err != nil { 3454 t.Fatalf("err: %s", err) 3455 } 3456 3457 // Ensure the erroneous check was purged 3458 if _, err := os.Stat(checkPath); err == nil { 3459 t.Fatalf("should have purged check") 3460 } 3461} 3462 3463func TestAgent_persistCheckState(t *testing.T) { 3464 if testing.Short() { 3465 t.Skip("too slow for testing.Short") 3466 } 3467 3468 t.Parallel() 3469 a := NewTestAgent(t, "") 3470 defer a.Shutdown() 3471 3472 cid := structs.NewCheckID("check1", nil) 3473 // Create the TTL check to persist 3474 check := &checks.CheckTTL{ 3475 CheckID: cid, 3476 TTL: 10 * time.Minute, 3477 } 3478 3479 // Persist some check state for the check 3480 err := a.persistCheckState(check, api.HealthCritical, "nope") 3481 if err != nil { 3482 t.Fatalf("err: %s", err) 3483 } 3484 3485 // Check the persisted file exists and has the content 3486 file := filepath.Join(a.Config.DataDir, checkStateDir, cid.StringHash()) 3487 buf, err := ioutil.ReadFile(file) 3488 if err != nil { 3489 t.Fatalf("err: %s", err) 3490 } 3491 3492 // Decode the state 3493 var p persistedCheckState 3494 if err := json.Unmarshal(buf, &p); err != nil { 3495 t.Fatalf("err: %s", err) 3496 } 3497 3498 // Check the fields 3499 if p.CheckID != cid.ID { 3500 t.Fatalf("bad: %#v", p) 3501 } 3502 if p.Output != "nope" { 3503 t.Fatalf("bad: %#v", p) 3504 } 3505 if p.Status != api.HealthCritical { 3506 t.Fatalf("bad: %#v", p) 3507 } 3508 3509 // Check the expiration time was set 3510 if p.Expires < time.Now().Unix() { 3511 t.Fatalf("bad: %#v", p) 3512 } 3513} 3514 3515func TestAgent_loadCheckState(t *testing.T) { 3516 if testing.Short() { 3517 t.Skip("too slow for testing.Short") 3518 } 3519 3520 t.Parallel() 3521 a := NewTestAgent(t, "") 3522 defer a.Shutdown() 3523 3524 // Create a check whose state will expire immediately 3525 check := &checks.CheckTTL{ 3526 CheckID: structs.NewCheckID("check1", nil), 3527 TTL: 0, 3528 } 3529 3530 // Persist the check state 3531 err := a.persistCheckState(check, api.HealthPassing, "yup") 3532 if err != nil { 3533 t.Fatalf("err: %s", err) 3534 } 3535 3536 // Try to load the state 3537 health := &structs.HealthCheck{ 3538 CheckID: "check1", 3539 Status: api.HealthCritical, 3540 } 3541 if err := a.loadCheckState(health); err != nil { 3542 t.Fatalf("err: %s", err) 3543 } 3544 3545 // Should not have restored the status due to expiration 3546 if health.Status != api.HealthCritical { 3547 t.Fatalf("bad: %#v", health) 3548 } 3549 if health.Output != "" { 3550 t.Fatalf("bad: %#v", health) 3551 } 3552 3553 // Should have purged the state 3554 file := filepath.Join(a.Config.DataDir, checksDir, stringHash("check1")) 3555 if _, err := os.Stat(file); !os.IsNotExist(err) { 3556 t.Fatalf("should have purged state") 3557 } 3558 3559 // Set a TTL which will not expire before we check it 3560 check.TTL = time.Minute 3561 err = a.persistCheckState(check, api.HealthPassing, "yup") 3562 if err != nil { 3563 t.Fatalf("err: %s", err) 3564 } 3565 3566 // Try to load 3567 if err := a.loadCheckState(health); err != nil { 3568 t.Fatalf("err: %s", err) 3569 } 3570 3571 // Should have restored 3572 if health.Status != api.HealthPassing { 3573 t.Fatalf("bad: %#v", health) 3574 } 3575 if health.Output != "yup" { 3576 t.Fatalf("bad: %#v", health) 3577 } 3578} 3579 3580func TestAgent_purgeCheckState(t *testing.T) { 3581 if testing.Short() { 3582 t.Skip("too slow for testing.Short") 3583 } 3584 3585 t.Parallel() 3586 a := NewTestAgent(t, "") 3587 defer a.Shutdown() 3588 3589 cid := structs.NewCheckID("check1", nil) 3590 // No error if the state does not exist 3591 if err := a.purgeCheckState(cid); err != nil { 3592 t.Fatalf("err: %s", err) 3593 } 3594 3595 // Persist some state to the data dir 3596 check := &checks.CheckTTL{ 3597 CheckID: cid, 3598 TTL: time.Minute, 3599 } 3600 err := a.persistCheckState(check, api.HealthPassing, "yup") 3601 if err != nil { 3602 t.Fatalf("err: %s", err) 3603 } 3604 3605 // Purge the check state 3606 if err := a.purgeCheckState(cid); err != nil { 3607 t.Fatalf("err: %s", err) 3608 } 3609 3610 // Removed the file 3611 file := filepath.Join(a.Config.DataDir, checkStateDir, cid.StringHash()) 3612 if _, err := os.Stat(file); !os.IsNotExist(err) { 3613 t.Fatalf("should have removed file") 3614 } 3615} 3616 3617func TestAgent_GetCoordinate(t *testing.T) { 3618 if testing.Short() { 3619 t.Skip("too slow for testing.Short") 3620 } 3621 3622 a := NewTestAgent(t, ``) 3623 defer a.Shutdown() 3624 3625 coords, err := a.GetLANCoordinate() 3626 require.NoError(t, err) 3627 expected := lib.CoordinateSet{ 3628 "": &coordinate.Coordinate{ 3629 Error: 1.5, 3630 Height: 1e-05, 3631 Vec: []float64{0, 0, 0, 0, 0, 0, 0, 0}, 3632 }, 3633 } 3634 require.Equal(t, expected, coords) 3635} 3636 3637func TestAgent_reloadWatches(t *testing.T) { 3638 if testing.Short() { 3639 t.Skip("too slow for testing.Short") 3640 } 3641 3642 t.Parallel() 3643 a := NewTestAgent(t, "") 3644 defer a.Shutdown() 3645 3646 // Normal watch with http addr set, should succeed 3647 newConf := *a.config 3648 newConf.Watches = []map[string]interface{}{ 3649 { 3650 "type": "key", 3651 "key": "asdf", 3652 "args": []interface{}{"ls"}, 3653 }, 3654 } 3655 if err := a.reloadWatches(&newConf); err != nil { 3656 t.Fatalf("bad: %s", err) 3657 } 3658 3659 // Should fail to reload with connect watches 3660 newConf.Watches = []map[string]interface{}{ 3661 { 3662 "type": "connect_roots", 3663 "key": "asdf", 3664 "args": []interface{}{"ls"}, 3665 }, 3666 } 3667 if err := a.reloadWatches(&newConf); err == nil || !strings.Contains(err.Error(), "not allowed in agent config") { 3668 t.Fatalf("bad: %s", err) 3669 } 3670 3671 // Should still succeed with only HTTPS addresses 3672 newConf.HTTPSAddrs = newConf.HTTPAddrs 3673 newConf.HTTPAddrs = make([]net.Addr, 0) 3674 newConf.Watches = []map[string]interface{}{ 3675 { 3676 "type": "key", 3677 "key": "asdf", 3678 "args": []interface{}{"ls"}, 3679 }, 3680 } 3681 if err := a.reloadWatches(&newConf); err != nil { 3682 t.Fatalf("bad: %s", err) 3683 } 3684 3685 // Should fail to reload with no http or https addrs 3686 newConf.HTTPSAddrs = make([]net.Addr, 0) 3687 newConf.Watches = []map[string]interface{}{ 3688 { 3689 "type": "key", 3690 "key": "asdf", 3691 "args": []interface{}{"ls"}, 3692 }, 3693 } 3694 if err := a.reloadWatches(&newConf); err == nil || !strings.Contains(err.Error(), "watch plans require an HTTP or HTTPS endpoint") { 3695 t.Fatalf("bad: %s", err) 3696 } 3697} 3698 3699func TestAgent_reloadWatchesHTTPS(t *testing.T) { 3700 if testing.Short() { 3701 t.Skip("too slow for testing.Short") 3702 } 3703 3704 t.Parallel() 3705 a := TestAgent{UseTLS: true} 3706 if err := a.Start(t); err != nil { 3707 t.Fatal(err) 3708 } 3709 defer a.Shutdown() 3710 3711 // Normal watch with http addr set, should succeed 3712 newConf := *a.config 3713 newConf.Watches = []map[string]interface{}{ 3714 { 3715 "type": "key", 3716 "key": "asdf", 3717 "args": []interface{}{"ls"}, 3718 }, 3719 } 3720 if err := a.reloadWatches(&newConf); err != nil { 3721 t.Fatalf("bad: %s", err) 3722 } 3723} 3724 3725func TestAgent_SecurityChecks(t *testing.T) { 3726 if testing.Short() { 3727 t.Skip("too slow for testing.Short") 3728 } 3729 3730 t.Parallel() 3731 hcl := ` 3732 enable_script_checks = true 3733 ` 3734 a := &TestAgent{Name: t.Name(), HCL: hcl} 3735 defer a.Shutdown() 3736 3737 data := make([]byte, 0, 8192) 3738 bytesBuffer := bytes.NewBuffer(data) 3739 a.LogOutput = bytesBuffer 3740 assert.NoError(t, a.Start(t)) 3741 assert.Contains(t, bytesBuffer.String(), "using enable-script-checks without ACLs and without allow_write_http_from is DANGEROUS") 3742} 3743 3744func TestAgent_ReloadConfigOutgoingRPCConfig(t *testing.T) { 3745 if testing.Short() { 3746 t.Skip("too slow for testing.Short") 3747 } 3748 3749 t.Parallel() 3750 dataDir := testutil.TempDir(t, "agent") // we manage the data dir 3751 hcl := ` 3752 data_dir = "` + dataDir + `" 3753 verify_outgoing = true 3754 ca_file = "../test/ca/root.cer" 3755 cert_file = "../test/key/ourdomain.cer" 3756 key_file = "../test/key/ourdomain.key" 3757 verify_server_hostname = false 3758 ` 3759 a := NewTestAgent(t, hcl) 3760 defer a.Shutdown() 3761 tlsConf := a.tlsConfigurator.OutgoingRPCConfig() 3762 require.True(t, tlsConf.InsecureSkipVerify) 3763 require.Len(t, tlsConf.ClientCAs.Subjects(), 1) 3764 require.Len(t, tlsConf.RootCAs.Subjects(), 1) 3765 3766 hcl = ` 3767 data_dir = "` + dataDir + `" 3768 verify_outgoing = true 3769 ca_path = "../test/ca_path" 3770 cert_file = "../test/key/ourdomain.cer" 3771 key_file = "../test/key/ourdomain.key" 3772 verify_server_hostname = true 3773 ` 3774 c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl}) 3775 require.NoError(t, a.reloadConfigInternal(c)) 3776 tlsConf = a.tlsConfigurator.OutgoingRPCConfig() 3777 require.False(t, tlsConf.InsecureSkipVerify) 3778 require.Len(t, tlsConf.RootCAs.Subjects(), 2) 3779 require.Len(t, tlsConf.ClientCAs.Subjects(), 2) 3780} 3781 3782func TestAgent_ReloadConfigAndKeepChecksStatus(t *testing.T) { 3783 if testing.Short() { 3784 t.Skip("too slow for testing.Short") 3785 } 3786 3787 t.Run("normal", func(t *testing.T) { 3788 t.Parallel() 3789 testAgent_ReloadConfigAndKeepChecksStatus(t, "enable_central_service_config = false") 3790 }) 3791 t.Run("service manager", func(t *testing.T) { 3792 t.Parallel() 3793 testAgent_ReloadConfigAndKeepChecksStatus(t, "enable_central_service_config = true") 3794 }) 3795} 3796 3797func testAgent_ReloadConfigAndKeepChecksStatus(t *testing.T, extraHCL string) { 3798 dataDir := testutil.TempDir(t, "agent") // we manage the data dir 3799 hcl := `data_dir = "` + dataDir + `" 3800 enable_local_script_checks=true 3801 services=[{ 3802 name="webserver1", 3803 check{id="check1", ttl="30s"} 3804 }] ` + extraHCL 3805 a := NewTestAgent(t, hcl) 3806 defer a.Shutdown() 3807 3808 require.NoError(t, a.updateTTLCheck(structs.NewCheckID("check1", nil), api.HealthPassing, "testing agent reload")) 3809 3810 // Make sure check is passing before we reload. 3811 gotChecks := a.State.Checks(nil) 3812 require.Equal(t, 1, len(gotChecks), "Should have a check registered, but had %#v", gotChecks) 3813 for id, check := range gotChecks { 3814 require.Equal(t, "passing", check.Status, "check %q is wrong", id) 3815 } 3816 3817 c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl}) 3818 require.NoError(t, a.reloadConfigInternal(c)) 3819 3820 // After reload, should be passing directly (no critical state) 3821 for id, check := range a.State.Checks(nil) { 3822 require.Equal(t, "passing", check.Status, "check %q is wrong", id) 3823 } 3824} 3825 3826func TestAgent_ReloadConfigIncomingRPCConfig(t *testing.T) { 3827 if testing.Short() { 3828 t.Skip("too slow for testing.Short") 3829 } 3830 3831 t.Parallel() 3832 dataDir := testutil.TempDir(t, "agent") // we manage the data dir 3833 hcl := ` 3834 data_dir = "` + dataDir + `" 3835 verify_outgoing = true 3836 ca_file = "../test/ca/root.cer" 3837 cert_file = "../test/key/ourdomain.cer" 3838 key_file = "../test/key/ourdomain.key" 3839 verify_server_hostname = false 3840 ` 3841 a := NewTestAgent(t, hcl) 3842 defer a.Shutdown() 3843 tlsConf := a.tlsConfigurator.IncomingRPCConfig() 3844 require.NotNil(t, tlsConf.GetConfigForClient) 3845 tlsConf, err := tlsConf.GetConfigForClient(nil) 3846 require.NoError(t, err) 3847 require.NotNil(t, tlsConf) 3848 require.True(t, tlsConf.InsecureSkipVerify) 3849 require.Len(t, tlsConf.ClientCAs.Subjects(), 1) 3850 require.Len(t, tlsConf.RootCAs.Subjects(), 1) 3851 3852 hcl = ` 3853 data_dir = "` + dataDir + `" 3854 verify_outgoing = true 3855 ca_path = "../test/ca_path" 3856 cert_file = "../test/key/ourdomain.cer" 3857 key_file = "../test/key/ourdomain.key" 3858 verify_server_hostname = true 3859 ` 3860 c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl}) 3861 require.NoError(t, a.reloadConfigInternal(c)) 3862 tlsConf, err = tlsConf.GetConfigForClient(nil) 3863 require.NoError(t, err) 3864 require.False(t, tlsConf.InsecureSkipVerify) 3865 require.Len(t, tlsConf.ClientCAs.Subjects(), 2) 3866 require.Len(t, tlsConf.RootCAs.Subjects(), 2) 3867} 3868 3869func TestAgent_ReloadConfigTLSConfigFailure(t *testing.T) { 3870 if testing.Short() { 3871 t.Skip("too slow for testing.Short") 3872 } 3873 3874 t.Parallel() 3875 dataDir := testutil.TempDir(t, "agent") // we manage the data dir 3876 hcl := ` 3877 data_dir = "` + dataDir + `" 3878 verify_outgoing = true 3879 ca_file = "../test/ca/root.cer" 3880 cert_file = "../test/key/ourdomain.cer" 3881 key_file = "../test/key/ourdomain.key" 3882 verify_server_hostname = false 3883 ` 3884 a := NewTestAgent(t, hcl) 3885 defer a.Shutdown() 3886 tlsConf := a.tlsConfigurator.IncomingRPCConfig() 3887 3888 hcl = ` 3889 data_dir = "` + dataDir + `" 3890 verify_incoming = true 3891 ` 3892 c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl}) 3893 require.Error(t, a.reloadConfigInternal(c)) 3894 tlsConf, err := tlsConf.GetConfigForClient(nil) 3895 require.NoError(t, err) 3896 require.Equal(t, tls.NoClientCert, tlsConf.ClientAuth) 3897 require.Len(t, tlsConf.ClientCAs.Subjects(), 1) 3898 require.Len(t, tlsConf.RootCAs.Subjects(), 1) 3899} 3900 3901func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) { 3902 if testing.Short() { 3903 t.Skip("too slow for testing.Short") 3904 } 3905 3906 t.Parallel() 3907 dataDir := testutil.TempDir(t, "agent") // we manage the data dir 3908 hcl := ` 3909 data_dir = "` + dataDir + `" 3910 verify_incoming = true 3911 ca_file = "../test/ca/root.cer" 3912 cert_file = "../test/key/ourdomain.cer" 3913 key_file = "../test/key/ourdomain.key" 3914 auto_encrypt { allow_tls = true } 3915 ` 3916 a := NewTestAgent(t, hcl) 3917 defer a.Shutdown() 3918 require.True(t, a.consulConfig().AutoEncryptAllowTLS) 3919} 3920 3921func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) { 3922 if testing.Short() { 3923 t.Skip("too slow for testing.Short") 3924 } 3925 3926 t.Parallel() 3927 hcl := ` 3928 raft_trailing_logs = 812345 3929 ` 3930 a := NewTestAgent(t, hcl) 3931 defer a.Shutdown() 3932 require.Equal(t, uint64(812345), a.consulConfig().RaftConfig.TrailingLogs) 3933} 3934 3935func TestAgent_grpcInjectAddr(t *testing.T) { 3936 tt := []struct { 3937 name string 3938 grpc string 3939 ip string 3940 port int 3941 want string 3942 }{ 3943 { 3944 name: "localhost web svc", 3945 grpc: "localhost:8080/web", 3946 ip: "192.168.0.0", 3947 port: 9090, 3948 want: "192.168.0.0:9090/web", 3949 }, 3950 { 3951 name: "localhost no svc", 3952 grpc: "localhost:8080", 3953 ip: "192.168.0.0", 3954 port: 9090, 3955 want: "192.168.0.0:9090", 3956 }, 3957 { 3958 name: "ipv4 web svc", 3959 grpc: "127.0.0.1:8080/web", 3960 ip: "192.168.0.0", 3961 port: 9090, 3962 want: "192.168.0.0:9090/web", 3963 }, 3964 { 3965 name: "ipv4 no svc", 3966 grpc: "127.0.0.1:8080", 3967 ip: "192.168.0.0", 3968 port: 9090, 3969 want: "192.168.0.0:9090", 3970 }, 3971 { 3972 name: "ipv6 no svc", 3973 grpc: "2001:db8:1f70::999:de8:7648:6e8:5000", 3974 ip: "192.168.0.0", 3975 port: 9090, 3976 want: "192.168.0.0:9090", 3977 }, 3978 { 3979 name: "ipv6 web svc", 3980 grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/web", 3981 ip: "192.168.0.0", 3982 port: 9090, 3983 want: "192.168.0.0:9090/web", 3984 }, 3985 { 3986 name: "zone ipv6 web svc", 3987 grpc: "::FFFF:C0A8:1%1:5000/web", 3988 ip: "192.168.0.0", 3989 port: 9090, 3990 want: "192.168.0.0:9090/web", 3991 }, 3992 { 3993 name: "ipv6 literal web svc", 3994 grpc: "::FFFF:192.168.0.1:5000/web", 3995 ip: "192.168.0.0", 3996 port: 9090, 3997 want: "192.168.0.0:9090/web", 3998 }, 3999 { 4000 name: "ipv6 injected into ipv6 url", 4001 grpc: "2001:db8:1f70::999:de8:7648:6e8:5000", 4002 ip: "::FFFF:C0A8:1", 4003 port: 9090, 4004 want: "::FFFF:C0A8:1:9090", 4005 }, 4006 { 4007 name: "ipv6 injected into ipv6 url with svc", 4008 grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/web", 4009 ip: "::FFFF:C0A8:1", 4010 port: 9090, 4011 want: "::FFFF:C0A8:1:9090/web", 4012 }, 4013 { 4014 name: "ipv6 injected into ipv6 url with special", 4015 grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/service-$name:with@special:Chars", 4016 ip: "::FFFF:C0A8:1", 4017 port: 9090, 4018 want: "::FFFF:C0A8:1:9090/service-$name:with@special:Chars", 4019 }, 4020 } 4021 for _, tt := range tt { 4022 t.Run(tt.name, func(t *testing.T) { 4023 got := grpcInjectAddr(tt.grpc, tt.ip, tt.port) 4024 if got != tt.want { 4025 t.Errorf("httpInjectAddr() got = %v, want %v", got, tt.want) 4026 } 4027 }) 4028 } 4029} 4030 4031func TestAgent_httpInjectAddr(t *testing.T) { 4032 tt := []struct { 4033 name string 4034 url string 4035 ip string 4036 port int 4037 want string 4038 }{ 4039 { 4040 name: "localhost health", 4041 url: "http://localhost:8080/health", 4042 ip: "192.168.0.0", 4043 port: 9090, 4044 want: "http://192.168.0.0:9090/health", 4045 }, 4046 { 4047 name: "https localhost health", 4048 url: "https://localhost:8080/health", 4049 ip: "192.168.0.0", 4050 port: 9090, 4051 want: "https://192.168.0.0:9090/health", 4052 }, 4053 { 4054 name: "https ipv4 health", 4055 url: "https://127.0.0.1:8080/health", 4056 ip: "192.168.0.0", 4057 port: 9090, 4058 want: "https://192.168.0.0:9090/health", 4059 }, 4060 { 4061 name: "https ipv4 without path", 4062 url: "https://127.0.0.1:8080", 4063 ip: "192.168.0.0", 4064 port: 9090, 4065 want: "https://192.168.0.0:9090", 4066 }, 4067 { 4068 name: "https ipv6 health", 4069 url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000/health", 4070 ip: "192.168.0.0", 4071 port: 9090, 4072 want: "https://192.168.0.0:9090/health", 4073 }, 4074 { 4075 name: "https ipv6 with zone", 4076 url: "https://[::FFFF:C0A8:1%1]:5000/health", 4077 ip: "192.168.0.0", 4078 port: 9090, 4079 want: "https://192.168.0.0:9090/health", 4080 }, 4081 { 4082 name: "https ipv6 literal", 4083 url: "https://[::FFFF:192.168.0.1]:5000/health", 4084 ip: "192.168.0.0", 4085 port: 9090, 4086 want: "https://192.168.0.0:9090/health", 4087 }, 4088 { 4089 name: "https ipv6 without path", 4090 url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000", 4091 ip: "192.168.0.0", 4092 port: 9090, 4093 want: "https://192.168.0.0:9090", 4094 }, 4095 { 4096 name: "ipv6 injected into ipv6 url", 4097 url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000", 4098 ip: "::FFFF:C0A8:1", 4099 port: 9090, 4100 want: "https://[::FFFF:C0A8:1]:9090", 4101 }, 4102 { 4103 name: "ipv6 with brackets injected into ipv6 url", 4104 url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000", 4105 ip: "[::FFFF:C0A8:1]", 4106 port: 9090, 4107 want: "https://[::FFFF:C0A8:1]:9090", 4108 }, 4109 { 4110 name: "short domain health", 4111 url: "http://i.co:8080/health", 4112 ip: "192.168.0.0", 4113 port: 9090, 4114 want: "http://192.168.0.0:9090/health", 4115 }, 4116 { 4117 name: "nested url in query", 4118 url: "http://my.corp.com:8080/health?from=http://google.com:8080", 4119 ip: "192.168.0.0", 4120 port: 9090, 4121 want: "http://192.168.0.0:9090/health?from=http://google.com:8080", 4122 }, 4123 } 4124 for _, tt := range tt { 4125 t.Run(tt.name, func(t *testing.T) { 4126 got := httpInjectAddr(tt.url, tt.ip, tt.port) 4127 if got != tt.want { 4128 t.Errorf("httpInjectAddr() got = %v, want %v", got, tt.want) 4129 } 4130 }) 4131 } 4132} 4133 4134func TestDefaultIfEmpty(t *testing.T) { 4135 require.Equal(t, "", defaultIfEmpty("", "")) 4136 require.Equal(t, "foo", defaultIfEmpty("", "foo")) 4137 require.Equal(t, "bar", defaultIfEmpty("bar", "foo")) 4138 require.Equal(t, "bar", defaultIfEmpty("bar", "")) 4139} 4140 4141func TestConfigSourceFromName(t *testing.T) { 4142 cases := []struct { 4143 in string 4144 expect configSource 4145 bad bool 4146 }{ 4147 {in: "local", expect: ConfigSourceLocal}, 4148 {in: "remote", expect: ConfigSourceRemote}, 4149 {in: "", expect: ConfigSourceLocal}, 4150 {in: "LOCAL", bad: true}, 4151 {in: "REMOTE", bad: true}, 4152 {in: "garbage", bad: true}, 4153 {in: " ", bad: true}, 4154 } 4155 4156 for _, tc := range cases { 4157 tc := tc 4158 t.Run(tc.in, func(t *testing.T) { 4159 got, ok := ConfigSourceFromName(tc.in) 4160 if tc.bad { 4161 require.False(t, ok) 4162 require.Empty(t, got) 4163 } else { 4164 require.True(t, ok) 4165 require.Equal(t, tc.expect, got) 4166 } 4167 }) 4168 } 4169} 4170 4171func TestAgent_RerouteExistingHTTPChecks(t *testing.T) { 4172 if testing.Short() { 4173 t.Skip("too slow for testing.Short") 4174 } 4175 4176 t.Parallel() 4177 4178 a := NewTestAgent(t, "") 4179 defer a.Shutdown() 4180 4181 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 4182 4183 // Register a service without a ProxyAddr 4184 svc := &structs.NodeService{ 4185 ID: "web", 4186 Service: "web", 4187 Address: "localhost", 4188 Port: 8080, 4189 } 4190 chks := []*structs.CheckType{ 4191 { 4192 CheckID: "http", 4193 HTTP: "http://localhost:8080/mypath?query", 4194 Interval: 20 * time.Millisecond, 4195 TLSSkipVerify: true, 4196 }, 4197 { 4198 CheckID: "grpc", 4199 GRPC: "localhost:8080/myservice", 4200 Interval: 20 * time.Millisecond, 4201 TLSSkipVerify: true, 4202 }, 4203 } 4204 if err := a.addServiceFromSource(svc, chks, false, "", ConfigSourceLocal); err != nil { 4205 t.Fatalf("failed to add svc: %v", err) 4206 } 4207 4208 // Register a proxy and expose HTTP checks. 4209 // This should trigger setting ProxyHTTP and ProxyGRPC in the checks. 4210 proxy := &structs.NodeService{ 4211 Kind: "connect-proxy", 4212 ID: "web-proxy", 4213 Service: "web-proxy", 4214 Address: "localhost", 4215 Port: 21500, 4216 Proxy: structs.ConnectProxyConfig{ 4217 DestinationServiceName: "web", 4218 DestinationServiceID: "web", 4219 LocalServiceAddress: "localhost", 4220 LocalServicePort: 8080, 4221 MeshGateway: structs.MeshGatewayConfig{}, 4222 Expose: structs.ExposeConfig{ 4223 Checks: true, 4224 }, 4225 }, 4226 } 4227 if err := a.addServiceFromSource(proxy, nil, false, "", ConfigSourceLocal); err != nil { 4228 t.Fatalf("failed to add svc: %v", err) 4229 } 4230 4231 retry.Run(t, func(r *retry.R) { 4232 chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil)) 4233 require.Equal(r, chks[0].ProxyHTTP, "http://localhost:21500/mypath?query") 4234 }) 4235 4236 retry.Run(t, func(r *retry.R) { 4237 hc := a.State.Check(structs.NewCheckID("http", nil)) 4238 require.Equal(r, hc.ExposedPort, 21500) 4239 }) 4240 4241 retry.Run(t, func(r *retry.R) { 4242 chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil)) 4243 4244 // GRPC check will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks. 4245 // Note that this relies on listener ports auto-incrementing in a.listenerPortLocked. 4246 require.Equal(r, chks[1].ProxyGRPC, "localhost:21501/myservice") 4247 }) 4248 4249 retry.Run(t, func(r *retry.R) { 4250 hc := a.State.Check(structs.NewCheckID("grpc", nil)) 4251 require.Equal(r, hc.ExposedPort, 21501) 4252 }) 4253 4254 // Re-register a proxy and disable exposing HTTP checks. 4255 // This should trigger resetting ProxyHTTP and ProxyGRPC to empty strings 4256 // and reset saved exposed ports in the agent's state. 4257 proxy = &structs.NodeService{ 4258 Kind: "connect-proxy", 4259 ID: "web-proxy", 4260 Service: "web-proxy", 4261 Address: "localhost", 4262 Port: 21500, 4263 Proxy: structs.ConnectProxyConfig{ 4264 DestinationServiceName: "web", 4265 DestinationServiceID: "web", 4266 LocalServiceAddress: "localhost", 4267 LocalServicePort: 8080, 4268 MeshGateway: structs.MeshGatewayConfig{}, 4269 Expose: structs.ExposeConfig{ 4270 Checks: false, 4271 }, 4272 }, 4273 } 4274 if err := a.addServiceFromSource(proxy, nil, false, "", ConfigSourceLocal); err != nil { 4275 t.Fatalf("failed to add svc: %v", err) 4276 } 4277 4278 retry.Run(t, func(r *retry.R) { 4279 chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil)) 4280 require.Empty(r, chks[0].ProxyHTTP, "ProxyHTTP addr was not reset") 4281 }) 4282 4283 retry.Run(t, func(r *retry.R) { 4284 hc := a.State.Check(structs.NewCheckID("http", nil)) 4285 require.Equal(r, hc.ExposedPort, 0) 4286 }) 4287 4288 retry.Run(t, func(r *retry.R) { 4289 chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil)) 4290 4291 // Will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks. 4292 require.Empty(r, chks[1].ProxyGRPC, "ProxyGRPC addr was not reset") 4293 }) 4294 4295 retry.Run(t, func(r *retry.R) { 4296 hc := a.State.Check(structs.NewCheckID("grpc", nil)) 4297 require.Equal(r, hc.ExposedPort, 0) 4298 }) 4299} 4300 4301func TestAgent_RerouteNewHTTPChecks(t *testing.T) { 4302 if testing.Short() { 4303 t.Skip("too slow for testing.Short") 4304 } 4305 4306 t.Parallel() 4307 4308 a := NewTestAgent(t, "") 4309 defer a.Shutdown() 4310 4311 testrpc.WaitForTestAgent(t, a.RPC, "dc1") 4312 4313 // Register a service without a ProxyAddr 4314 svc := &structs.NodeService{ 4315 ID: "web", 4316 Service: "web", 4317 Address: "localhost", 4318 Port: 8080, 4319 } 4320 if err := a.addServiceFromSource(svc, nil, false, "", ConfigSourceLocal); err != nil { 4321 t.Fatalf("failed to add svc: %v", err) 4322 } 4323 4324 // Register a proxy and expose HTTP checks 4325 proxy := &structs.NodeService{ 4326 Kind: "connect-proxy", 4327 ID: "web-proxy", 4328 Service: "web-proxy", 4329 Address: "localhost", 4330 Port: 21500, 4331 Proxy: structs.ConnectProxyConfig{ 4332 DestinationServiceName: "web", 4333 DestinationServiceID: "web", 4334 LocalServiceAddress: "localhost", 4335 LocalServicePort: 8080, 4336 MeshGateway: structs.MeshGatewayConfig{}, 4337 Expose: structs.ExposeConfig{ 4338 Checks: true, 4339 }, 4340 }, 4341 } 4342 if err := a.addServiceFromSource(proxy, nil, false, "", ConfigSourceLocal); err != nil { 4343 t.Fatalf("failed to add svc: %v", err) 4344 } 4345 4346 checks := []*structs.HealthCheck{ 4347 { 4348 CheckID: "http", 4349 Name: "http", 4350 ServiceID: "web", 4351 Status: api.HealthCritical, 4352 }, 4353 { 4354 CheckID: "grpc", 4355 Name: "grpc", 4356 ServiceID: "web", 4357 Status: api.HealthCritical, 4358 }, 4359 } 4360 chkTypes := []*structs.CheckType{ 4361 { 4362 CheckID: "http", 4363 HTTP: "http://localhost:8080/mypath?query", 4364 Interval: 20 * time.Millisecond, 4365 TLSSkipVerify: true, 4366 }, 4367 { 4368 CheckID: "grpc", 4369 GRPC: "localhost:8080/myservice", 4370 Interval: 20 * time.Millisecond, 4371 TLSSkipVerify: true, 4372 }, 4373 } 4374 4375 // ProxyGRPC and ProxyHTTP should be set when creating check 4376 // since proxy.expose.checks is enabled on the proxy 4377 if err := a.AddCheck(checks[0], chkTypes[0], false, "", ConfigSourceLocal); err != nil { 4378 t.Fatalf("failed to add check: %v", err) 4379 } 4380 if err := a.AddCheck(checks[1], chkTypes[1], false, "", ConfigSourceLocal); err != nil { 4381 t.Fatalf("failed to add check: %v", err) 4382 } 4383 4384 retry.Run(t, func(r *retry.R) { 4385 chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil)) 4386 require.Equal(r, chks[0].ProxyHTTP, "http://localhost:21500/mypath?query") 4387 }) 4388 4389 retry.Run(t, func(r *retry.R) { 4390 hc := a.State.Check(structs.NewCheckID("http", nil)) 4391 require.Equal(r, hc.ExposedPort, 21500) 4392 }) 4393 4394 retry.Run(t, func(r *retry.R) { 4395 chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil)) 4396 4397 // GRPC check will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks. 4398 require.Equal(r, chks[1].ProxyGRPC, "localhost:21501/myservice") 4399 }) 4400 4401 retry.Run(t, func(r *retry.R) { 4402 hc := a.State.Check(structs.NewCheckID("grpc", nil)) 4403 require.Equal(r, hc.ExposedPort, 21501) 4404 }) 4405} 4406 4407func TestAgentCache_serviceInConfigFile_initialFetchErrors_Issue6521(t *testing.T) { 4408 if testing.Short() { 4409 t.Skip("too slow for testing.Short") 4410 } 4411 4412 t.Parallel() 4413 4414 // Ensure that initial failures to fetch the discovery chain via the agent 4415 // cache using the notify API for a service with no config entries 4416 // correctly recovers when those RPCs resume working. The key here is that 4417 // the lack of config entries guarantees that the RPC will come back with a 4418 // synthetic index of 1. 4419 // 4420 // The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the 4421 // index for the next query from 0 to 1 for all queries, when it should 4422 // have not done so for queries that errored. 4423 4424 a1 := StartTestAgent(t, TestAgent{Name: "Agent1"}) 4425 defer a1.Shutdown() 4426 testrpc.WaitForLeader(t, a1.RPC, "dc1") 4427 4428 a2 := StartTestAgent(t, TestAgent{Name: "Agent2", HCL: ` 4429 server = false 4430 bootstrap = false 4431services { 4432 name = "echo-client" 4433 port = 8080 4434 connect { 4435 sidecar_service { 4436 proxy { 4437 upstreams { 4438 destination_name = "echo" 4439 local_bind_port = 9191 4440 } 4441 } 4442 } 4443 } 4444} 4445 4446services { 4447 name = "echo" 4448 port = 9090 4449 connect { 4450 sidecar_service {} 4451 } 4452} 4453 `}) 4454 defer a2.Shutdown() 4455 4456 // Starting a client agent disconnected from a server with services. 4457 ctx, cancel := context.WithCancel(context.Background()) 4458 defer cancel() 4459 4460 ch := make(chan cache.UpdateEvent, 1) 4461 require.NoError(t, a2.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{ 4462 Datacenter: "dc1", 4463 Name: "echo", 4464 EvaluateInDatacenter: "dc1", 4465 EvaluateInNamespace: "default", 4466 }, "foo", ch)) 4467 4468 { // The first event is an error because we are not joined yet. 4469 evt := <-ch 4470 require.Equal(t, "foo", evt.CorrelationID) 4471 require.Nil(t, evt.Result) 4472 require.Error(t, evt.Err) 4473 require.Equal(t, evt.Err, structs.ErrNoServers) 4474 } 4475 4476 t.Logf("joining client to server") 4477 4478 // Now connect to server 4479 _, err := a1.JoinLAN([]string{ 4480 fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN), 4481 }) 4482 require.NoError(t, err) 4483 4484 t.Logf("joined client to server") 4485 4486 deadlineCh := time.After(10 * time.Second) 4487 start := time.Now() 4488LOOP: 4489 for { 4490 select { 4491 case evt := <-ch: 4492 // We may receive several notifications of an error until we get the 4493 // first successful reply. 4494 require.Equal(t, "foo", evt.CorrelationID) 4495 if evt.Err != nil { 4496 break LOOP 4497 } 4498 require.NoError(t, evt.Err) 4499 require.NotNil(t, evt.Result) 4500 t.Logf("took %s to get first success", time.Since(start)) 4501 case <-deadlineCh: 4502 t.Fatal("did not get notified successfully") 4503 } 4504 } 4505} 4506 4507// This is a mirror of a similar test in agent/consul/server_test.go 4508// 4509// TODO(rb): implement something similar to this as a full containerized test suite with proper 4510// isolation so requests can't "cheat" and bypass the mesh gateways 4511func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) { 4512 if testing.Short() { 4513 t.Skip("too slow for testing.Short") 4514 } 4515 4516 t.Parallel() 4517 4518 gwPort := freeport.MustTake(1) 4519 defer freeport.Return(gwPort) 4520 gwAddr := ipaddr.FormatAddressPort("127.0.0.1", gwPort[0]) 4521 4522 // Due to some ordering, we'll have to manually configure these ports in 4523 // advance. 4524 secondaryRPCPorts := freeport.MustTake(2) 4525 defer freeport.Return(secondaryRPCPorts) 4526 4527 a1 := StartTestAgent(t, TestAgent{Name: "bob", HCL: ` 4528 domain = "consul" 4529 node_name = "bob" 4530 datacenter = "dc1" 4531 primary_datacenter = "dc1" 4532 # tls 4533 ca_file = "../test/hostname/CertAuth.crt" 4534 cert_file = "../test/hostname/Bob.crt" 4535 key_file = "../test/hostname/Bob.key" 4536 verify_incoming = true 4537 verify_outgoing = true 4538 verify_server_hostname = true 4539 # wanfed 4540 connect { 4541 enabled = true 4542 enable_mesh_gateway_wan_federation = true 4543 } 4544 `}) 4545 defer a1.Shutdown() 4546 testrpc.WaitForTestAgent(t, a1.RPC, "dc1") 4547 4548 // We'll use the same gateway for all datacenters since it doesn't care. 4549 var ( 4550 rpcAddr1 = ipaddr.FormatAddressPort("127.0.0.1", a1.Config.ServerPort) 4551 rpcAddr2 = ipaddr.FormatAddressPort("127.0.0.1", secondaryRPCPorts[0]) 4552 rpcAddr3 = ipaddr.FormatAddressPort("127.0.0.1", secondaryRPCPorts[1]) 4553 ) 4554 var p tcpproxy.Proxy 4555 p.AddSNIRoute(gwAddr, "bob.server.dc1.consul", tcpproxy.To(rpcAddr1)) 4556 p.AddSNIRoute(gwAddr, "server.dc1.consul", tcpproxy.To(rpcAddr1)) 4557 p.AddSNIRoute(gwAddr, "betty.server.dc2.consul", tcpproxy.To(rpcAddr2)) 4558 p.AddSNIRoute(gwAddr, "server.dc2.consul", tcpproxy.To(rpcAddr2)) 4559 p.AddSNIRoute(gwAddr, "bonnie.server.dc3.consul", tcpproxy.To(rpcAddr3)) 4560 p.AddSNIRoute(gwAddr, "server.dc3.consul", tcpproxy.To(rpcAddr3)) 4561 p.AddStopACMESearch(gwAddr) 4562 require.NoError(t, p.Start()) 4563 defer func() { 4564 p.Close() 4565 p.Wait() 4566 }() 4567 4568 t.Logf("routing %s => %s", "{bob.,}server.dc1.consul", rpcAddr1) 4569 t.Logf("routing %s => %s", "{betty.,}server.dc2.consul", rpcAddr2) 4570 t.Logf("routing %s => %s", "{bonnie.,}server.dc3.consul", rpcAddr3) 4571 4572 // Register this into the agent in dc1. 4573 { 4574 args := &structs.ServiceDefinition{ 4575 Kind: structs.ServiceKindMeshGateway, 4576 ID: "mesh-gateway", 4577 Name: "mesh-gateway", 4578 Meta: map[string]string{structs.MetaWANFederationKey: "1"}, 4579 Port: gwPort[0], 4580 } 4581 req, err := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args)) 4582 require.NoError(t, err) 4583 4584 obj, err := a1.srv.AgentRegisterService(nil, req) 4585 require.NoError(t, err) 4586 require.Nil(t, obj) 4587 } 4588 4589 waitForFederationState := func(t *testing.T, a *TestAgent, dc string) { 4590 retry.Run(t, func(r *retry.R) { 4591 req, err := http.NewRequest("GET", "/v1/internal/federation-state/"+dc, nil) 4592 require.NoError(r, err) 4593 4594 resp := httptest.NewRecorder() 4595 obj, err := a.srv.FederationStateGet(resp, req) 4596 require.NoError(r, err) 4597 require.NotNil(r, obj) 4598 4599 out, ok := obj.(structs.FederationStateResponse) 4600 require.True(r, ok) 4601 require.NotNil(r, out.State) 4602 require.Len(r, out.State.MeshGateways, 1) 4603 }) 4604 } 4605 4606 // Wait until at least catalog AE and federation state AE fire. 4607 waitForFederationState(t, a1, "dc1") 4608 retry.Run(t, func(r *retry.R) { 4609 require.NotEmpty(r, a1.PickRandomMeshGatewaySuitableForDialing("dc1")) 4610 }) 4611 4612 a2 := StartTestAgent(t, TestAgent{Name: "betty", HCL: ` 4613 domain = "consul" 4614 node_name = "betty" 4615 datacenter = "dc2" 4616 primary_datacenter = "dc1" 4617 # tls 4618 ca_file = "../test/hostname/CertAuth.crt" 4619 cert_file = "../test/hostname/Betty.crt" 4620 key_file = "../test/hostname/Betty.key" 4621 verify_incoming = true 4622 verify_outgoing = true 4623 verify_server_hostname = true 4624 ports { 4625 server = ` + strconv.Itoa(secondaryRPCPorts[0]) + ` 4626 } 4627 # wanfed 4628 primary_gateways = ["` + gwAddr + `"] 4629 retry_interval_wan = "250ms" 4630 connect { 4631 enabled = true 4632 enable_mesh_gateway_wan_federation = true 4633 } 4634 `}) 4635 defer a2.Shutdown() 4636 testrpc.WaitForTestAgent(t, a2.RPC, "dc2") 4637 4638 a3 := StartTestAgent(t, TestAgent{Name: "bonnie", HCL: ` 4639 domain = "consul" 4640 node_name = "bonnie" 4641 datacenter = "dc3" 4642 primary_datacenter = "dc1" 4643 # tls 4644 ca_file = "../test/hostname/CertAuth.crt" 4645 cert_file = "../test/hostname/Bonnie.crt" 4646 key_file = "../test/hostname/Bonnie.key" 4647 verify_incoming = true 4648 verify_outgoing = true 4649 verify_server_hostname = true 4650 ports { 4651 server = ` + strconv.Itoa(secondaryRPCPorts[1]) + ` 4652 } 4653 # wanfed 4654 primary_gateways = ["` + gwAddr + `"] 4655 retry_interval_wan = "250ms" 4656 connect { 4657 enabled = true 4658 enable_mesh_gateway_wan_federation = true 4659 } 4660 `}) 4661 defer a3.Shutdown() 4662 testrpc.WaitForTestAgent(t, a3.RPC, "dc3") 4663 4664 // The primary_gateways config setting should cause automatic mesh join. 4665 // Assert that the secondaries have joined the primary. 4666 findPrimary := func(r *retry.R, a *TestAgent) *serf.Member { 4667 var primary *serf.Member 4668 for _, m := range a.WANMembers() { 4669 if m.Tags["dc"] == "dc1" { 4670 require.Nil(r, primary, "already found one node in dc1") 4671 primary = &m 4672 } 4673 } 4674 require.NotNil(r, primary) 4675 return primary 4676 } 4677 retry.Run(t, func(r *retry.R) { 4678 p2, p3 := findPrimary(r, a2), findPrimary(r, a3) 4679 require.Equal(r, "bob.dc1", p2.Name) 4680 require.Equal(r, "bob.dc1", p3.Name) 4681 }) 4682 4683 testrpc.WaitForLeader(t, a2.RPC, "dc2") 4684 testrpc.WaitForLeader(t, a3.RPC, "dc3") 4685 4686 // Now we can register this into the catalog in dc2 and dc3. 4687 { 4688 args := &structs.ServiceDefinition{ 4689 Kind: structs.ServiceKindMeshGateway, 4690 ID: "mesh-gateway", 4691 Name: "mesh-gateway", 4692 Meta: map[string]string{structs.MetaWANFederationKey: "1"}, 4693 Port: gwPort[0], 4694 } 4695 req, err := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args)) 4696 require.NoError(t, err) 4697 4698 obj, err := a2.srv.AgentRegisterService(nil, req) 4699 require.NoError(t, err) 4700 require.Nil(t, obj) 4701 } 4702 { 4703 args := &structs.ServiceDefinition{ 4704 Kind: structs.ServiceKindMeshGateway, 4705 ID: "mesh-gateway", 4706 Name: "mesh-gateway", 4707 Meta: map[string]string{structs.MetaWANFederationKey: "1"}, 4708 Port: gwPort[0], 4709 } 4710 req, err := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args)) 4711 require.NoError(t, err) 4712 4713 obj, err := a3.srv.AgentRegisterService(nil, req) 4714 require.NoError(t, err) 4715 require.Nil(t, obj) 4716 } 4717 4718 // Wait until federation state replication functions 4719 waitForFederationState(t, a1, "dc1") 4720 waitForFederationState(t, a1, "dc2") 4721 waitForFederationState(t, a1, "dc3") 4722 4723 waitForFederationState(t, a2, "dc1") 4724 waitForFederationState(t, a2, "dc2") 4725 waitForFederationState(t, a2, "dc3") 4726 4727 waitForFederationState(t, a3, "dc1") 4728 waitForFederationState(t, a3, "dc2") 4729 waitForFederationState(t, a3, "dc3") 4730 4731 retry.Run(t, func(r *retry.R) { 4732 require.NotEmpty(r, a1.PickRandomMeshGatewaySuitableForDialing("dc1")) 4733 require.NotEmpty(r, a1.PickRandomMeshGatewaySuitableForDialing("dc2")) 4734 require.NotEmpty(r, a1.PickRandomMeshGatewaySuitableForDialing("dc3")) 4735 4736 require.NotEmpty(r, a2.PickRandomMeshGatewaySuitableForDialing("dc1")) 4737 require.NotEmpty(r, a2.PickRandomMeshGatewaySuitableForDialing("dc2")) 4738 require.NotEmpty(r, a2.PickRandomMeshGatewaySuitableForDialing("dc3")) 4739 4740 require.NotEmpty(r, a3.PickRandomMeshGatewaySuitableForDialing("dc1")) 4741 require.NotEmpty(r, a3.PickRandomMeshGatewaySuitableForDialing("dc2")) 4742 require.NotEmpty(r, a3.PickRandomMeshGatewaySuitableForDialing("dc3")) 4743 }) 4744 4745 retry.Run(t, func(r *retry.R) { 4746 if got, want := len(a1.WANMembers()), 3; got != want { 4747 r.Fatalf("got %d WAN members want at least %d", got, want) 4748 } 4749 if got, want := len(a2.WANMembers()), 3; got != want { 4750 r.Fatalf("got %d WAN members want at least %d", got, want) 4751 } 4752 if got, want := len(a3.WANMembers()), 3; got != want { 4753 r.Fatalf("got %d WAN members want at least %d", got, want) 4754 } 4755 }) 4756 4757 // Ensure we can do some trivial RPC in all directions. 4758 // 4759 // NOTE: we explicitly make streaming and non-streaming assertions here to 4760 // verify both rpc and grpc codepaths. 4761 agents := map[string]*TestAgent{"dc1": a1, "dc2": a2, "dc3": a3} 4762 names := map[string]string{"dc1": "bob", "dc2": "betty", "dc3": "bonnie"} 4763 for _, srcDC := range []string{"dc1", "dc2", "dc3"} { 4764 a := agents[srcDC] 4765 for _, dstDC := range []string{"dc1", "dc2", "dc3"} { 4766 if srcDC == dstDC { 4767 continue 4768 } 4769 t.Run(srcDC+" to "+dstDC, func(t *testing.T) { 4770 t.Run("normal-rpc", func(t *testing.T) { 4771 req, err := http.NewRequest("GET", "/v1/catalog/nodes?dc="+dstDC, nil) 4772 require.NoError(t, err) 4773 4774 resp := httptest.NewRecorder() 4775 obj, err := a.srv.CatalogNodes(resp, req) 4776 require.NoError(t, err) 4777 require.NotNil(t, obj) 4778 4779 nodes, ok := obj.(structs.Nodes) 4780 require.True(t, ok) 4781 require.Len(t, nodes, 1) 4782 node := nodes[0] 4783 require.Equal(t, dstDC, node.Datacenter) 4784 require.Equal(t, names[dstDC], node.Node) 4785 }) 4786 t.Run("streaming-grpc", func(t *testing.T) { 4787 req, err := http.NewRequest("GET", "/v1/health/service/consul?cached&dc="+dstDC, nil) 4788 require.NoError(t, err) 4789 4790 resp := httptest.NewRecorder() 4791 obj, err := a.srv.HealthServiceNodes(resp, req) 4792 require.NoError(t, err) 4793 require.NotNil(t, obj) 4794 4795 csns, ok := obj.(structs.CheckServiceNodes) 4796 require.True(t, ok) 4797 require.Len(t, csns, 1) 4798 4799 csn := csns[0] 4800 require.Equal(t, dstDC, csn.Node.Datacenter) 4801 require.Equal(t, names[dstDC], csn.Node.Node) 4802 }) 4803 }) 4804 } 4805 } 4806} 4807 4808func TestAutoConfig_Integration(t *testing.T) { 4809 if testing.Short() { 4810 t.Skip("too slow for testing.Short") 4811 } 4812 4813 // eventually this test should really live with integration tests 4814 // the goal here is to have one test server and another test client 4815 // spin up both agents and allow the server to authorize the auto config 4816 // request and then see the client joined. Finally we force a CA roots 4817 // update and wait to see that the agents TLS certificate gets updated. 4818 4819 cfgDir := testutil.TempDir(t, "auto-config") 4820 4821 // write some test TLS certificates out to the cfg dir 4822 cert, key, cacert, err := testTLSCertificates("server.dc1.consul") 4823 require.NoError(t, err) 4824 4825 certFile := filepath.Join(cfgDir, "cert.pem") 4826 caFile := filepath.Join(cfgDir, "cacert.pem") 4827 keyFile := filepath.Join(cfgDir, "key.pem") 4828 4829 require.NoError(t, ioutil.WriteFile(certFile, []byte(cert), 0600)) 4830 require.NoError(t, ioutil.WriteFile(caFile, []byte(cacert), 0600)) 4831 require.NoError(t, ioutil.WriteFile(keyFile, []byte(key), 0600)) 4832 4833 // generate a gossip key 4834 gossipKey := make([]byte, 32) 4835 n, err := rand.Read(gossipKey) 4836 require.NoError(t, err) 4837 require.Equal(t, 32, n) 4838 gossipKeyEncoded := base64.StdEncoding.EncodeToString(gossipKey) 4839 4840 // generate the JWT signing keys 4841 pub, priv, err := oidcauthtest.GenerateKey() 4842 require.NoError(t, err) 4843 4844 hclConfig := TestACLConfigWithParams(nil) + ` 4845 encrypt = "` + gossipKeyEncoded + `" 4846 encrypt_verify_incoming = true 4847 encrypt_verify_outgoing = true 4848 verify_incoming = true 4849 verify_outgoing = true 4850 verify_server_hostname = true 4851 ca_file = "` + caFile + `" 4852 cert_file = "` + certFile + `" 4853 key_file = "` + keyFile + `" 4854 connect { enabled = true } 4855 auto_config { 4856 authorization { 4857 enabled = true 4858 static { 4859 claim_mappings = { 4860 consul_node_name = "node" 4861 } 4862 claim_assertions = [ 4863 "value.node == \"${node}\"" 4864 ] 4865 bound_issuer = "consul" 4866 bound_audiences = [ 4867 "consul" 4868 ] 4869 jwt_validation_pub_keys = ["` + strings.ReplaceAll(pub, "\n", "\\n") + `"] 4870 } 4871 } 4872 } 4873 ` 4874 4875 srv := StartTestAgent(t, TestAgent{Name: "TestAgent-Server", HCL: hclConfig}) 4876 defer srv.Shutdown() 4877 4878 testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultMasterToken)) 4879 4880 // sign a JWT token 4881 now := time.Now() 4882 token, err := oidcauthtest.SignJWT(priv, jwt.Claims{ 4883 Subject: "consul", 4884 Issuer: "consul", 4885 Audience: jwt.Audience{"consul"}, 4886 NotBefore: jwt.NewNumericDate(now.Add(-1 * time.Second)), 4887 Expiry: jwt.NewNumericDate(now.Add(5 * time.Minute)), 4888 }, map[string]interface{}{ 4889 "consul_node_name": "test-client", 4890 }) 4891 require.NoError(t, err) 4892 4893 client := StartTestAgent(t, TestAgent{Name: "test-client", 4894 Overrides: ` 4895 connect { 4896 test_ca_leaf_root_change_spread = "1ns" 4897 } 4898 `, 4899 HCL: ` 4900 bootstrap = false 4901 server = false 4902 ca_file = "` + caFile + `" 4903 verify_outgoing = true 4904 verify_server_hostname = true 4905 node_name = "test-client" 4906 ports { 4907 server = ` + strconv.Itoa(srv.Config.RPCBindAddr.Port) + ` 4908 } 4909 auto_config { 4910 enabled = true 4911 intro_token = "` + token + `" 4912 server_addresses = ["` + srv.Config.RPCBindAddr.String() + `"] 4913 }`, 4914 }) 4915 4916 defer client.Shutdown() 4917 4918 retry.Run(t, func(r *retry.R) { 4919 require.NotNil(r, client.Agent.tlsConfigurator.Cert()) 4920 }) 4921 4922 // when this is successful we managed to get the gossip key and serf addresses to bind to 4923 // and then connect. Additionally we would have to have certificates or else the 4924 // verify_incoming config on the server would not let it work. 4925 testrpc.WaitForTestAgent(t, client.RPC, "dc1", testrpc.WithToken(TestDefaultMasterToken)) 4926 4927 // grab the existing cert 4928 cert1 := client.Agent.tlsConfigurator.Cert() 4929 require.NotNil(t, cert1) 4930 4931 // force a roots rotation by updating the CA config 4932 t.Logf("Forcing roots rotation on the server") 4933 ca := connect.TestCA(t, nil) 4934 req := &structs.CARequest{ 4935 Datacenter: "dc1", 4936 WriteRequest: structs.WriteRequest{Token: TestDefaultMasterToken}, 4937 Config: &structs.CAConfiguration{ 4938 Provider: "consul", 4939 Config: map[string]interface{}{ 4940 "LeafCertTTL": "1h", 4941 "PrivateKey": ca.SigningKey, 4942 "RootCert": ca.RootCert, 4943 "IntermediateCertTTL": "3h", 4944 }, 4945 }, 4946 } 4947 var reply interface{} 4948 require.NoError(t, srv.RPC("ConnectCA.ConfigurationSet", &req, &reply)) 4949 4950 // ensure that a new cert gets generated and pushed into the TLS configurator 4951 retry.Run(t, func(r *retry.R) { 4952 require.NotEqual(r, cert1, client.Agent.tlsConfigurator.Cert()) 4953 4954 // check that the on disk certs match expectations 4955 data, err := ioutil.ReadFile(filepath.Join(client.DataDir, "auto-config.json")) 4956 require.NoError(r, err) 4957 rdr := strings.NewReader(string(data)) 4958 4959 var resp pbautoconf.AutoConfigResponse 4960 pbUnmarshaler := &jsonpb.Unmarshaler{ 4961 AllowUnknownFields: false, 4962 } 4963 require.NoError(r, pbUnmarshaler.Unmarshal(rdr, &resp), "data: %s", data) 4964 4965 actual, err := tls.X509KeyPair([]byte(resp.Certificate.CertPEM), []byte(resp.Certificate.PrivateKeyPEM)) 4966 require.NoError(r, err) 4967 require.Equal(r, client.Agent.tlsConfigurator.Cert(), &actual) 4968 }) 4969 4970 // spot check that we now have an ACL token 4971 require.NotEmpty(t, client.tokens.AgentToken()) 4972} 4973 4974func TestAgent_AutoEncrypt(t *testing.T) { 4975 if testing.Short() { 4976 t.Skip("too slow for testing.Short") 4977 } 4978 4979 // eventually this test should really live with integration tests 4980 // the goal here is to have one test server and another test client 4981 // spin up both agents and allow the server to authorize the auto encrypt 4982 // request and then see the client get a TLS certificate 4983 cfgDir := testutil.TempDir(t, "auto-encrypt") 4984 4985 // write some test TLS certificates out to the cfg dir 4986 cert, key, cacert, err := testTLSCertificates("server.dc1.consul") 4987 require.NoError(t, err) 4988 4989 certFile := filepath.Join(cfgDir, "cert.pem") 4990 caFile := filepath.Join(cfgDir, "cacert.pem") 4991 keyFile := filepath.Join(cfgDir, "key.pem") 4992 4993 require.NoError(t, ioutil.WriteFile(certFile, []byte(cert), 0600)) 4994 require.NoError(t, ioutil.WriteFile(caFile, []byte(cacert), 0600)) 4995 require.NoError(t, ioutil.WriteFile(keyFile, []byte(key), 0600)) 4996 4997 hclConfig := TestACLConfigWithParams(nil) + ` 4998 verify_incoming = true 4999 verify_outgoing = true 5000 verify_server_hostname = true 5001 ca_file = "` + caFile + `" 5002 cert_file = "` + certFile + `" 5003 key_file = "` + keyFile + `" 5004 connect { enabled = true } 5005 auto_encrypt { allow_tls = true } 5006 ` 5007 5008 srv := StartTestAgent(t, TestAgent{Name: "test-server", HCL: hclConfig}) 5009 defer srv.Shutdown() 5010 5011 testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultMasterToken)) 5012 5013 client := StartTestAgent(t, TestAgent{Name: "test-client", HCL: TestACLConfigWithParams(nil) + ` 5014 bootstrap = false 5015 server = false 5016 ca_file = "` + caFile + `" 5017 verify_outgoing = true 5018 verify_server_hostname = true 5019 node_name = "test-client" 5020 auto_encrypt { 5021 tls = true 5022 } 5023 ports { 5024 server = ` + strconv.Itoa(srv.Config.RPCBindAddr.Port) + ` 5025 } 5026 retry_join = ["` + srv.Config.SerfBindAddrLAN.String() + `"]`, 5027 UseTLS: true, 5028 }) 5029 5030 defer client.Shutdown() 5031 5032 // when this is successful we managed to get a TLS certificate and are using it for 5033 // encrypted RPC connections. 5034 testrpc.WaitForTestAgent(t, client.RPC, "dc1", testrpc.WithToken(TestDefaultMasterToken)) 5035 5036 // now we need to validate that our certificate has the correct CN 5037 aeCert := client.tlsConfigurator.Cert() 5038 require.NotNil(t, aeCert) 5039 5040 id := connect.SpiffeIDAgent{ 5041 Host: connect.TestClusterID + ".consul", 5042 Datacenter: "dc1", 5043 Agent: "test-client", 5044 } 5045 expectedCN := connect.AgentCN("test-client", connect.TestClusterID) 5046 x509Cert, err := x509.ParseCertificate(aeCert.Certificate[0]) 5047 require.NoError(t, err) 5048 require.Equal(t, expectedCN, x509Cert.Subject.CommonName) 5049 require.Len(t, x509Cert.URIs, 1) 5050 require.Equal(t, id.URI(), x509Cert.URIs[0]) 5051} 5052 5053func TestSharedRPCRouter(t *testing.T) { 5054 if testing.Short() { 5055 t.Skip("too slow for testing.Short") 5056 } 5057 5058 // this test runs both a server and client and ensures that the shared 5059 // router is being used. It would be possible for the Client and Server 5060 // types to create and use their own routers and for RPCs such as the 5061 // ones used in WaitForTestAgent to succeed. However accessing the 5062 // router stored on the agent ensures that Serf information from the 5063 // Client/Server types are being set in the same shared rpc router. 5064 5065 srv := NewTestAgent(t, "") 5066 defer srv.Shutdown() 5067 5068 testrpc.WaitForTestAgent(t, srv.RPC, "dc1") 5069 5070 mgr, server := srv.Agent.baseDeps.Router.FindLANRoute() 5071 require.NotNil(t, mgr) 5072 require.NotNil(t, server) 5073 5074 client := NewTestAgent(t, ` 5075 server = false 5076 bootstrap = false 5077 retry_join = ["`+srv.Config.SerfBindAddrLAN.String()+`"] 5078 `) 5079 5080 testrpc.WaitForTestAgent(t, client.RPC, "dc1") 5081 5082 mgr, server = client.Agent.baseDeps.Router.FindLANRoute() 5083 require.NotNil(t, mgr) 5084 require.NotNil(t, server) 5085} 5086 5087func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) { 5088 if testing.Short() { 5089 t.Skip("too slow for testing.Short") 5090 } 5091 5092 ports, err := freeport.Take(2) 5093 require.NoError(t, err) 5094 t.Cleanup(func() { freeport.Return(ports) }) 5095 5096 caConfig := tlsutil.Config{} 5097 tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil)) 5098 require.NoError(t, err) 5099 bd := BaseDeps{ 5100 Deps: consul.Deps{ 5101 Logger: hclog.NewInterceptLogger(nil), 5102 Tokens: new(token.Store), 5103 TLSConfigurator: tlsConf, 5104 GRPCConnPool: &fakeGRPCConnPool{}, 5105 }, 5106 RuntimeConfig: &config.RuntimeConfig{ 5107 HTTPAddrs: []net.Addr{ 5108 &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: ports[0]}, 5109 &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: ports[1]}, 5110 }, 5111 }, 5112 Cache: cache.New(cache.Options{}), 5113 } 5114 5115 bd, err = initEnterpriseBaseDeps(bd, nil) 5116 require.NoError(t, err) 5117 5118 agent, err := New(bd) 5119 require.NoError(t, err) 5120 5121 agent.startLicenseManager(testutil.TestContext(t)) 5122 5123 srvs, err := agent.listenHTTP() 5124 require.NoError(t, err) 5125 defer func() { 5126 ctx := context.Background() 5127 for _, srv := range srvs { 5128 srv.Shutdown(ctx) 5129 } 5130 }() 5131 5132 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5133 t.Cleanup(cancel) 5134 5135 g := new(errgroup.Group) 5136 for _, s := range srvs { 5137 g.Go(s.Run) 5138 } 5139 5140 require.Len(t, srvs, 2) 5141 require.Len(t, uniqueAddrs(srvs), 2) 5142 5143 client := &http.Client{} 5144 for _, s := range srvs { 5145 u := url.URL{Scheme: s.Protocol, Host: s.Addr.String()} 5146 req, err := http.NewRequest(http.MethodGet, u.String(), nil) 5147 require.NoError(t, err) 5148 5149 resp, err := client.Do(req.WithContext(ctx)) 5150 require.NoError(t, err) 5151 require.Equal(t, 200, resp.StatusCode) 5152 } 5153} 5154 5155func uniqueAddrs(srvs []apiServer) map[string]struct{} { 5156 result := make(map[string]struct{}, len(srvs)) 5157 for _, s := range srvs { 5158 result[s.Addr.String()] = struct{}{} 5159 } 5160 return result 5161} 5162