1package testutil 2 3// TestServer is a test helper. It uses a fork/exec model to create 4// a test Consul server instance in the background and initialize it 5// with some data and/or services. The test server can then be used 6// to run a unit test, and offers an easy API to tear itself down 7// when the test has completed. The only prerequisite is to have a consul 8// binary available on the $PATH. 9// 10// This package does not use Consul's official API client. This is 11// because we use TestServer to test the API client, which would 12// otherwise cause an import cycle. 13 14import ( 15 "context" 16 "encoding/json" 17 "fmt" 18 "io" 19 "io/ioutil" 20 "net" 21 "net/http" 22 "os" 23 "os/exec" 24 "path/filepath" 25 "runtime" 26 "strconv" 27 "strings" 28 "syscall" 29 "testing" 30 "time" 31 32 "github.com/hashicorp/consul/sdk/freeport" 33 "github.com/hashicorp/consul/sdk/testutil/retry" 34 "github.com/hashicorp/go-cleanhttp" 35 "github.com/hashicorp/go-uuid" 36 "github.com/pkg/errors" 37) 38 39// TestPerformanceConfig configures the performance parameters. 40type TestPerformanceConfig struct { 41 RaftMultiplier uint `json:"raft_multiplier,omitempty"` 42} 43 44// TestPortConfig configures the various ports used for services 45// provided by the Consul server. 46type TestPortConfig struct { 47 DNS int `json:"dns,omitempty"` 48 HTTP int `json:"http,omitempty"` 49 HTTPS int `json:"https,omitempty"` 50 SerfLan int `json:"serf_lan,omitempty"` 51 SerfWan int `json:"serf_wan,omitempty"` 52 Server int `json:"server,omitempty"` 53 ProxyMinPort int `json:"proxy_min_port,omitempty"` 54 ProxyMaxPort int `json:"proxy_max_port,omitempty"` 55} 56 57// TestAddressConfig contains the bind addresses for various 58// components of the Consul server. 59type TestAddressConfig struct { 60 HTTP string `json:"http,omitempty"` 61} 62 63// TestNetworkSegment contains the configuration for a network segment. 64type TestNetworkSegment struct { 65 Name string `json:"name"` 66 Bind string `json:"bind"` 67 Port int `json:"port"` 68 Advertise string `json:"advertise"` 69} 70 71// TestServerConfig is the main server configuration struct. 72type TestServerConfig struct { 73 NodeName string `json:"node_name"` 74 NodeID string `json:"node_id"` 75 NodeMeta map[string]string `json:"node_meta,omitempty"` 76 Performance *TestPerformanceConfig `json:"performance,omitempty"` 77 Bootstrap bool `json:"bootstrap,omitempty"` 78 Server bool `json:"server,omitempty"` 79 DataDir string `json:"data_dir,omitempty"` 80 Datacenter string `json:"datacenter,omitempty"` 81 Segments []TestNetworkSegment `json:"segments"` 82 DisableCheckpoint bool `json:"disable_update_check"` 83 LogLevel string `json:"log_level,omitempty"` 84 Bind string `json:"bind_addr,omitempty"` 85 Addresses *TestAddressConfig `json:"addresses,omitempty"` 86 Ports *TestPortConfig `json:"ports,omitempty"` 87 RaftProtocol int `json:"raft_protocol,omitempty"` 88 ACLMasterToken string `json:"acl_master_token,omitempty"` 89 ACLDatacenter string `json:"acl_datacenter,omitempty"` 90 PrimaryDatacenter string `json:"primary_datacenter,omitempty"` 91 ACLDefaultPolicy string `json:"acl_default_policy,omitempty"` 92 ACL TestACLs `json:"acl,omitempty"` 93 Encrypt string `json:"encrypt,omitempty"` 94 CAFile string `json:"ca_file,omitempty"` 95 CertFile string `json:"cert_file,omitempty"` 96 KeyFile string `json:"key_file,omitempty"` 97 VerifyIncoming bool `json:"verify_incoming,omitempty"` 98 VerifyIncomingRPC bool `json:"verify_incoming_rpc,omitempty"` 99 VerifyIncomingHTTPS bool `json:"verify_incoming_https,omitempty"` 100 VerifyOutgoing bool `json:"verify_outgoing,omitempty"` 101 EnableScriptChecks bool `json:"enable_script_checks,omitempty"` 102 Connect map[string]interface{} `json:"connect,omitempty"` 103 EnableDebug bool `json:"enable_debug,omitempty"` 104 ReadyTimeout time.Duration `json:"-"` 105 Stdout io.Writer `json:"-"` 106 Stderr io.Writer `json:"-"` 107 Args []string `json:"-"` 108 ReturnPorts func() `json:"-"` 109} 110 111type TestACLs struct { 112 Enabled bool `json:"enabled,omitempty"` 113 TokenReplication bool `json:"enable_token_replication,omitempty"` 114 PolicyTTL string `json:"policy_ttl,omitempty"` 115 TokenTTL string `json:"token_ttl,omitempty"` 116 DownPolicy string `json:"down_policy,omitempty"` 117 DefaultPolicy string `json:"default_policy,omitempty"` 118 EnableKeyListPolicy bool `json:"enable_key_list_policy,omitempty"` 119 Tokens TestTokens `json:"tokens,omitempty"` 120 DisabledTTL string `json:"disabled_ttl,omitempty"` 121} 122 123type TestTokens struct { 124 Master string `json:"master,omitempty"` 125 Replication string `json:"replication,omitempty"` 126 AgentMaster string `json:"agent_master,omitempty"` 127 Default string `json:"default,omitempty"` 128 Agent string `json:"agent,omitempty"` 129} 130 131// ServerConfigCallback is a function interface which can be 132// passed to NewTestServerConfig to modify the server config. 133type ServerConfigCallback func(c *TestServerConfig) 134 135// defaultServerConfig returns a new TestServerConfig struct 136// with all of the listen ports incremented by one. 137func defaultServerConfig(t TestingTB) *TestServerConfig { 138 nodeID, err := uuid.GenerateUUID() 139 if err != nil { 140 panic(err) 141 } 142 143 ports := freeport.MustTake(6) 144 logBuffer := NewLogBuffer(t) 145 146 return &TestServerConfig{ 147 NodeName: "node-" + nodeID, 148 NodeID: nodeID, 149 DisableCheckpoint: true, 150 Performance: &TestPerformanceConfig{ 151 RaftMultiplier: 1, 152 }, 153 Bootstrap: true, 154 Server: true, 155 LogLevel: "debug", 156 Bind: "127.0.0.1", 157 Addresses: &TestAddressConfig{}, 158 Ports: &TestPortConfig{ 159 DNS: ports[0], 160 HTTP: ports[1], 161 HTTPS: ports[2], 162 SerfLan: ports[3], 163 SerfWan: ports[4], 164 Server: ports[5], 165 }, 166 ReadyTimeout: 10 * time.Second, 167 Connect: map[string]interface{}{ 168 "enabled": true, 169 "ca_config": map[string]interface{}{ 170 // const TestClusterID causes import cycle so hard code it here. 171 "cluster_id": "11111111-2222-3333-4444-555555555555", 172 }, 173 }, 174 ReturnPorts: func() { 175 freeport.Return(ports) 176 }, 177 Stdout: logBuffer, 178 Stderr: logBuffer, 179 } 180} 181 182// TestService is used to serialize a service definition. 183type TestService struct { 184 ID string `json:",omitempty"` 185 Name string `json:",omitempty"` 186 Tags []string `json:",omitempty"` 187 Address string `json:",omitempty"` 188 Port int `json:",omitempty"` 189} 190 191// TestCheck is used to serialize a check definition. 192type TestCheck struct { 193 ID string `json:",omitempty"` 194 Name string `json:",omitempty"` 195 ServiceID string `json:",omitempty"` 196 TTL string `json:",omitempty"` 197} 198 199// TestKVResponse is what we use to decode KV data. 200type TestKVResponse struct { 201 Value string 202} 203 204// TestServer is the main server wrapper struct. 205type TestServer struct { 206 cmd *exec.Cmd 207 Config *TestServerConfig 208 209 HTTPAddr string 210 HTTPSAddr string 211 LANAddr string 212 WANAddr string 213 214 HTTPClient *http.Client 215 216 tmpdir string 217} 218 219// NewTestServerConfigT creates a new TestServer, and makes a call to an optional 220// callback function to modify the configuration. If there is an error 221// configuring or starting the server, the server will NOT be running when the 222// function returns (thus you do not need to stop it). 223func NewTestServerConfigT(t TestingTB, cb ServerConfigCallback) (*TestServer, error) { 224 path, err := exec.LookPath("consul") 225 if err != nil || path == "" { 226 return nil, fmt.Errorf("consul not found on $PATH - download and install " + 227 "consul or skip this test") 228 } 229 230 prefix := "consul" 231 if t != nil { 232 // Use test name for tmpdir if available 233 prefix = strings.Replace(t.Name(), "/", "_", -1) 234 } 235 tmpdir, err := ioutil.TempDir("", prefix) 236 if err != nil { 237 return nil, errors.Wrap(err, "failed to create tempdir") 238 } 239 240 cfg := defaultServerConfig(t) 241 cfg.DataDir = filepath.Join(tmpdir, "data") 242 if cb != nil { 243 cb(cfg) 244 } 245 246 b, err := json.Marshal(cfg) 247 if err != nil { 248 cfg.ReturnPorts() 249 os.RemoveAll(tmpdir) 250 return nil, errors.Wrap(err, "failed marshaling json") 251 } 252 253 t.Logf("CONFIG JSON: %s", string(b)) 254 configFile := filepath.Join(tmpdir, "config.json") 255 if err := ioutil.WriteFile(configFile, b, 0644); err != nil { 256 cfg.ReturnPorts() 257 os.RemoveAll(tmpdir) 258 return nil, errors.Wrap(err, "failed writing config content") 259 } 260 261 // Start the server 262 args := []string{"agent", "-config-file", configFile} 263 args = append(args, cfg.Args...) 264 cmd := exec.Command("consul", args...) 265 cmd.Stdout = cfg.Stdout 266 cmd.Stderr = cfg.Stderr 267 if err := cmd.Start(); err != nil { 268 cfg.ReturnPorts() 269 os.RemoveAll(tmpdir) 270 return nil, errors.Wrap(err, "failed starting command") 271 } 272 273 httpAddr := fmt.Sprintf("127.0.0.1:%d", cfg.Ports.HTTP) 274 client := cleanhttp.DefaultClient() 275 if strings.HasPrefix(cfg.Addresses.HTTP, "unix://") { 276 httpAddr = cfg.Addresses.HTTP 277 tr := cleanhttp.DefaultTransport() 278 tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) { 279 return net.Dial("unix", httpAddr[len("unix://"):]) 280 } 281 client = &http.Client{Transport: tr} 282 } 283 284 server := &TestServer{ 285 Config: cfg, 286 cmd: cmd, 287 288 HTTPAddr: httpAddr, 289 HTTPSAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.HTTPS), 290 LANAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.SerfLan), 291 WANAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.SerfWan), 292 293 HTTPClient: client, 294 295 tmpdir: tmpdir, 296 } 297 298 // Wait for the server to be ready 299 if err := server.waitForAPI(); err != nil { 300 if err := server.Stop(); err != nil { 301 t.Logf("server stop failed with: %v", err) 302 } 303 return nil, err 304 } 305 306 return server, nil 307} 308 309// Stop stops the test Consul server, and removes the Consul data 310// directory once we are done. 311func (s *TestServer) Stop() error { 312 defer s.Config.ReturnPorts() 313 defer os.RemoveAll(s.tmpdir) 314 315 // There was no process 316 if s.cmd == nil { 317 return nil 318 } 319 320 if s.cmd.Process != nil { 321 if runtime.GOOS == "windows" { 322 if err := s.cmd.Process.Kill(); err != nil { 323 return errors.Wrap(err, "failed to kill consul server") 324 } 325 } else { // interrupt is not supported in windows 326 if err := s.cmd.Process.Signal(os.Interrupt); err != nil { 327 return errors.Wrap(err, "failed to kill consul server") 328 } 329 } 330 } 331 332 waitDone := make(chan error) 333 go func() { 334 waitDone <- s.cmd.Wait() 335 close(waitDone) 336 }() 337 338 // wait for the process to exit to be sure that the data dir can be 339 // deleted on all platforms. 340 select { 341 case err := <-waitDone: 342 return err 343 case <-time.After(10 * time.Second): 344 s.cmd.Process.Signal(syscall.SIGABRT) 345 s.cmd.Wait() 346 return fmt.Errorf("timeout waiting for server to stop gracefully") 347 } 348} 349 350// waitForAPI waits for the /status/leader HTTP endpoint to start 351// responding. This is an indication that the agent has started, 352// but will likely return before a leader is elected. 353// Note: We do not check for a successful response status because 354// we want this function to return without error even when 355// there's no leader elected. 356func (s *TestServer) waitForAPI() error { 357 var failed bool 358 359 // This retry replicates the logic of retry.Run to allow for nested retries. 360 // By returning an error we can wrap TestServer creation with retry.Run 361 // in makeClientWithConfig. 362 timer := retry.TwoSeconds() 363 deadline := time.Now().Add(timer.Timeout) 364 for !time.Now().After(deadline) { 365 time.Sleep(timer.Wait) 366 367 url := s.url("/v1/status/leader") 368 resp, err := s.masterGet(url) 369 if err != nil { 370 failed = true 371 continue 372 } 373 resp.Body.Close() 374 375 failed = false 376 } 377 if failed { 378 return fmt.Errorf("api unavailable") 379 } 380 return nil 381} 382 383// waitForLeader waits for the Consul server's HTTP API to become 384// available, and then waits for a known leader and an index of 385// 2 or more to be observed to confirm leader election is done. 386func (s *TestServer) WaitForLeader(t testing.TB) { 387 retry.Run(t, func(r *retry.R) { 388 // Query the API and check the status code. 389 url := s.url("/v1/catalog/nodes") 390 resp, err := s.masterGet(url) 391 if err != nil { 392 r.Fatalf("failed http get '%s': %v", url, err) 393 } 394 defer resp.Body.Close() 395 if err := s.requireOK(resp); err != nil { 396 r.Fatalf("failed OK response: %v", err) 397 } 398 399 // Ensure we have a leader and a node registration. 400 if leader := resp.Header.Get("X-Consul-KnownLeader"); leader != "true" { 401 r.Fatalf("Consul leader status: %#v", leader) 402 } 403 index, err := strconv.ParseInt(resp.Header.Get("X-Consul-Index"), 10, 64) 404 if err != nil { 405 r.Fatalf("bad consul index: %v", err) 406 } 407 if index < 2 { 408 r.Fatal("consul index should be at least 2") 409 } 410 }) 411} 412 413// WaitForActiveCARoot waits until the server can return a Connect CA meaning 414// connect has completed bootstrapping and is ready to use. 415func (s *TestServer) WaitForActiveCARoot(t testing.TB) { 416 // don't need to fully decode the response 417 type rootsResponse struct { 418 ActiveRootID string 419 TrustDomain string 420 Roots []interface{} 421 } 422 423 retry.Run(t, func(r *retry.R) { 424 // Query the API and check the status code. 425 url := s.url("/v1/agent/connect/ca/roots") 426 resp, err := s.masterGet(url) 427 if err != nil { 428 r.Fatalf("failed http get '%s': %v", url, err) 429 } 430 defer resp.Body.Close() 431 // Roots will return an error status until it's been bootstrapped. We could 432 // parse the body and sanity check but that causes either import cycles 433 // since this is used in both `api` and consul test or duplication. The 200 434 // is all we really need to wait for. 435 if err := s.requireOK(resp); err != nil { 436 r.Fatalf("failed OK response: %v", err) 437 } 438 439 var roots rootsResponse 440 441 dec := json.NewDecoder(resp.Body) 442 if err := dec.Decode(&roots); err != nil { 443 r.Fatal(err) 444 } 445 446 if roots.ActiveRootID == "" || len(roots.Roots) < 1 { 447 r.Fatalf("/v1/agent/connect/ca/roots returned 200 but without roots: %+v", roots) 448 } 449 }) 450} 451 452// WaitForServiceIntentions waits until the server can accept config entry 453// kinds of service-intentions meaning any migration bootstrapping from pre-1.9 454// intentions has completed. 455func (s *TestServer) WaitForServiceIntentions(t testing.TB) { 456 const fakeConfigName = "Sa4ohw5raith4si0Ohwuqu3lowiethoh" 457 retry.Run(t, func(r *retry.R) { 458 // Try to delete a non-existent service-intentions config entry. The 459 // preflightCheck call in agent/consul/config_endpoint.go will fail if 460 // we aren't ready yet, vs just doing no work instead. 461 url := s.url("/v1/config/service-intentions/" + fakeConfigName) 462 resp, err := s.masterDelete(url) 463 if err != nil { 464 r.Fatalf("failed http get '%s': %v", url, err) 465 } 466 defer resp.Body.Close() 467 if err := s.requireOK(resp); err != nil { 468 r.Fatalf("failed OK response: %v", err) 469 } 470 }) 471} 472 473// WaitForSerfCheck ensures we have a node with serfHealth check registered 474// Behavior mirrors testrpc.WaitForTestAgent but avoids the dependency cycle in api pkg 475func (s *TestServer) WaitForSerfCheck(t testing.TB) { 476 retry.Run(t, func(r *retry.R) { 477 // Query the API and check the status code. 478 url := s.url("/v1/catalog/nodes?index=0") 479 resp, err := s.masterGet(url) 480 if err != nil { 481 r.Fatalf("failed http get: %v", err) 482 } 483 defer resp.Body.Close() 484 if err := s.requireOK(resp); err != nil { 485 r.Fatalf("failed OK response: %v", err) 486 } 487 488 // Watch for the anti-entropy sync to finish. 489 var payload []map[string]interface{} 490 dec := json.NewDecoder(resp.Body) 491 if err := dec.Decode(&payload); err != nil { 492 r.Fatal(err) 493 } 494 if len(payload) < 1 { 495 r.Fatal("No nodes") 496 } 497 498 // Ensure the serfHealth check is registered 499 url = s.url(fmt.Sprintf("/v1/health/node/%s", payload[0]["Node"])) 500 resp, err = s.masterGet(url) 501 if err != nil { 502 r.Fatalf("failed http get: %v", err) 503 } 504 defer resp.Body.Close() 505 if err := s.requireOK(resp); err != nil { 506 r.Fatalf("failed OK response: %v", err) 507 } 508 dec = json.NewDecoder(resp.Body) 509 if err = dec.Decode(&payload); err != nil { 510 r.Fatal(err) 511 } 512 513 var found bool 514 for _, check := range payload { 515 if check["CheckID"].(string) == "serfHealth" { 516 found = true 517 break 518 } 519 } 520 if !found { 521 r.Fatal("missing serfHealth registration") 522 } 523 }) 524} 525 526func (s *TestServer) masterGet(url string) (*http.Response, error) { 527 req, err := http.NewRequest("GET", url, nil) 528 if err != nil { 529 return nil, err 530 } 531 if s.Config.ACL.Tokens.Master != "" { 532 req.Header.Set("x-consul-token", s.Config.ACL.Tokens.Master) 533 } 534 return s.HTTPClient.Do(req) 535} 536 537func (s *TestServer) masterDelete(url string) (*http.Response, error) { 538 req, err := http.NewRequest("DELETE", url, nil) 539 if err != nil { 540 return nil, err 541 } 542 if s.Config.ACL.Tokens.Master != "" { 543 req.Header.Set("x-consul-token", s.Config.ACL.Tokens.Master) 544 } 545 return s.HTTPClient.Do(req) 546} 547