1// Copyright 2015 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package consul 15 16import ( 17 "context" 18 "net/http" 19 "net/http/httptest" 20 "net/url" 21 "testing" 22 "time" 23 24 "github.com/go-kit/log" 25 "github.com/prometheus/common/config" 26 "github.com/prometheus/common/model" 27 "github.com/stretchr/testify/require" 28 "go.uber.org/goleak" 29 "gopkg.in/yaml.v2" 30 31 "github.com/prometheus/prometheus/discovery/targetgroup" 32) 33 34func TestMain(m *testing.M) { 35 goleak.VerifyTestMain(m) 36} 37 38func TestConfiguredService(t *testing.T) { 39 conf := &SDConfig{ 40 Services: []string{"configuredServiceName"}} 41 consulDiscovery, err := NewDiscovery(conf, nil) 42 43 if err != nil { 44 t.Errorf("Unexpected error when initializing discovery %v", err) 45 } 46 if !consulDiscovery.shouldWatch("configuredServiceName", []string{""}) { 47 t.Errorf("Expected service %s to be watched", "configuredServiceName") 48 } 49 if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) { 50 t.Errorf("Expected service %s to not be watched", "nonConfiguredServiceName") 51 } 52} 53 54func TestConfiguredServiceWithTag(t *testing.T) { 55 conf := &SDConfig{ 56 Services: []string{"configuredServiceName"}, 57 ServiceTags: []string{"http"}, 58 } 59 consulDiscovery, err := NewDiscovery(conf, nil) 60 61 if err != nil { 62 t.Errorf("Unexpected error when initializing discovery %v", err) 63 } 64 if consulDiscovery.shouldWatch("configuredServiceName", []string{""}) { 65 t.Errorf("Expected service %s to not be watched without tag", "configuredServiceName") 66 } 67 if !consulDiscovery.shouldWatch("configuredServiceName", []string{"http"}) { 68 t.Errorf("Expected service %s to be watched with tag %s", "configuredServiceName", "http") 69 } 70 if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) { 71 t.Errorf("Expected service %s to not be watched without tag", "nonConfiguredServiceName") 72 } 73 if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{"http"}) { 74 t.Errorf("Expected service %s to not be watched with tag %s", "nonConfiguredServiceName", "http") 75 } 76} 77 78func TestConfiguredServiceWithTags(t *testing.T) { 79 type testcase struct { 80 // What we've configured to watch. 81 conf *SDConfig 82 // The service we're checking if we should watch or not. 83 serviceName string 84 serviceTags []string 85 shouldWatch bool 86 } 87 88 cases := []testcase{ 89 { 90 conf: &SDConfig{ 91 Services: []string{"configuredServiceName"}, 92 ServiceTags: []string{"http", "v1"}, 93 }, 94 serviceName: "configuredServiceName", 95 serviceTags: []string{""}, 96 shouldWatch: false, 97 }, 98 { 99 conf: &SDConfig{ 100 Services: []string{"configuredServiceName"}, 101 ServiceTags: []string{"http", "v1"}, 102 }, 103 serviceName: "configuredServiceName", 104 serviceTags: []string{"http", "v1"}, 105 shouldWatch: true, 106 }, 107 { 108 conf: &SDConfig{ 109 Services: []string{"configuredServiceName"}, 110 ServiceTags: []string{"http", "v1"}, 111 }, 112 serviceName: "nonConfiguredServiceName", 113 serviceTags: []string{""}, 114 shouldWatch: false, 115 }, 116 { 117 conf: &SDConfig{ 118 Services: []string{"configuredServiceName"}, 119 ServiceTags: []string{"http", "v1"}, 120 }, 121 serviceName: "nonConfiguredServiceName", 122 serviceTags: []string{"http, v1"}, 123 shouldWatch: false, 124 }, 125 { 126 conf: &SDConfig{ 127 Services: []string{"configuredServiceName"}, 128 ServiceTags: []string{"http", "v1"}, 129 }, 130 serviceName: "configuredServiceName", 131 serviceTags: []string{"http", "v1", "foo"}, 132 shouldWatch: true, 133 }, 134 { 135 conf: &SDConfig{ 136 Services: []string{"configuredServiceName"}, 137 ServiceTags: []string{"http", "v1", "foo"}, 138 }, 139 serviceName: "configuredServiceName", 140 serviceTags: []string{"http", "v1", "foo"}, 141 shouldWatch: true, 142 }, 143 { 144 conf: &SDConfig{ 145 Services: []string{"configuredServiceName"}, 146 ServiceTags: []string{"http", "v1"}, 147 }, 148 serviceName: "configuredServiceName", 149 serviceTags: []string{"http", "v1", "v1"}, 150 shouldWatch: true, 151 }, 152 } 153 154 for _, tc := range cases { 155 consulDiscovery, err := NewDiscovery(tc.conf, nil) 156 157 if err != nil { 158 t.Errorf("Unexpected error when initializing discovery %v", err) 159 } 160 ret := consulDiscovery.shouldWatch(tc.serviceName, tc.serviceTags) 161 if ret != tc.shouldWatch { 162 t.Errorf("Expected should watch? %t, got %t. Watched service and tags: %s %+v, input was %s %+v", tc.shouldWatch, ret, tc.conf.Services, tc.conf.ServiceTags, tc.serviceName, tc.serviceTags) 163 } 164 165 } 166} 167 168func TestNonConfiguredService(t *testing.T) { 169 conf := &SDConfig{} 170 consulDiscovery, err := NewDiscovery(conf, nil) 171 172 if err != nil { 173 t.Errorf("Unexpected error when initializing discovery %v", err) 174 } 175 if !consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) { 176 t.Errorf("Expected service %s to be watched", "nonConfiguredServiceName") 177 } 178} 179 180const ( 181 AgentAnswer = `{"Config": {"Datacenter": "test-dc"}}` 182 ServiceTestAnswer = ` 183[{ 184 "Node": { 185 "ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e", 186 "Node": "node1", 187 "Address": "1.1.1.1", 188 "Datacenter": "test-dc", 189 "TaggedAddresses": { 190 "lan": "192.168.10.10", 191 "wan": "10.0.10.10" 192 }, 193 "Meta": {"rack_name": "2304"}, 194 "CreateIndex": 1, 195 "ModifyIndex": 1 196 }, 197 "Service": { 198 "ID": "test", 199 "Service": "test", 200 "Tags": ["tag1"], 201 "Address": "", 202 "Meta": {"version":"1.0.0","environment":"staging"}, 203 "Port": 3341, 204 "Weights": { 205 "Passing": 1, 206 "Warning": 1 207 }, 208 "EnableTagOverride": false, 209 "ProxyDestination": "", 210 "Proxy": {}, 211 "Connect": {}, 212 "CreateIndex": 1, 213 "ModifyIndex": 1 214 }, 215 "Checks": [{ 216 "Node": "node1", 217 "CheckID": "serfHealth", 218 "Name": "Serf Health Status", 219 "Status": "passing" 220 }] 221}]` 222 223 ServicesTestAnswer = `{"test": ["tag1"], "other": ["tag2"]}` 224) 225 226func newServer(t *testing.T) (*httptest.Server, *SDConfig) { 227 // github.com/hashicorp/consul/testutil/ would be nice but it needs a local consul binary. 228 stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 229 response := "" 230 switch r.URL.String() { 231 case "/v1/agent/self": 232 response = AgentAnswer 233 case "/v1/health/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=120000ms": 234 response = ServiceTestAnswer 235 case "/v1/health/service/test?wait=120000ms": 236 response = ServiceTestAnswer 237 case "/v1/health/service/other?wait=120000ms": 238 response = `[]` 239 case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=120000ms": 240 response = ServicesTestAnswer 241 case "/v1/catalog/services?wait=120000ms": 242 response = ServicesTestAnswer 243 case "/v1/catalog/services?index=1&node-meta=rack_name%3A2304&stale=&wait=120000ms": 244 time.Sleep(5 * time.Second) 245 response = ServicesTestAnswer 246 case "/v1/catalog/services?index=1&wait=120000ms": 247 time.Sleep(5 * time.Second) 248 response = ServicesTestAnswer 249 default: 250 t.Errorf("Unhandled consul call: %s", r.URL) 251 } 252 w.Header().Add("X-Consul-Index", "1") 253 w.Write([]byte(response)) 254 })) 255 stuburl, err := url.Parse(stub.URL) 256 require.NoError(t, err) 257 258 config := &SDConfig{ 259 Server: stuburl.Host, 260 Token: "fake-token", 261 RefreshInterval: model.Duration(1 * time.Second), 262 } 263 return stub, config 264} 265 266func newDiscovery(t *testing.T, config *SDConfig) *Discovery { 267 logger := log.NewNopLogger() 268 d, err := NewDiscovery(config, logger) 269 require.NoError(t, err) 270 return d 271} 272 273func checkOneTarget(t *testing.T, tg []*targetgroup.Group) { 274 require.Equal(t, 1, len(tg)) 275 target := tg[0] 276 require.Equal(t, "test-dc", string(target.Labels["__meta_consul_dc"])) 277 require.Equal(t, target.Source, string(target.Labels["__meta_consul_service"])) 278 if target.Source == "test" { 279 // test service should have one node. 280 require.Greater(t, len(target.Targets), 0, "Test service should have one node") 281 } 282} 283 284// Watch all the services in the catalog. 285func TestAllServices(t *testing.T) { 286 stub, config := newServer(t) 287 defer stub.Close() 288 289 d := newDiscovery(t, config) 290 291 ctx, cancel := context.WithCancel(context.Background()) 292 ch := make(chan []*targetgroup.Group) 293 go func() { 294 d.Run(ctx, ch) 295 close(ch) 296 }() 297 checkOneTarget(t, <-ch) 298 checkOneTarget(t, <-ch) 299 cancel() 300 <-ch 301} 302 303// targetgroup with no targets is emitted if no services were discovered. 304func TestNoTargets(t *testing.T) { 305 stub, config := newServer(t) 306 defer stub.Close() 307 config.ServiceTags = []string{"missing"} 308 309 d := newDiscovery(t, config) 310 311 ctx, cancel := context.WithCancel(context.Background()) 312 ch := make(chan []*targetgroup.Group) 313 go d.Run(ctx, ch) 314 315 targets := (<-ch)[0].Targets 316 require.Equal(t, 0, len(targets)) 317 cancel() 318} 319 320// Watch only the test service. 321func TestOneService(t *testing.T) { 322 stub, config := newServer(t) 323 defer stub.Close() 324 325 config.Services = []string{"test"} 326 d := newDiscovery(t, config) 327 328 ctx, cancel := context.WithCancel(context.Background()) 329 ch := make(chan []*targetgroup.Group) 330 go d.Run(ctx, ch) 331 checkOneTarget(t, <-ch) 332 cancel() 333} 334 335// Watch the test service with a specific tag and node-meta. 336func TestAllOptions(t *testing.T) { 337 stub, config := newServer(t) 338 defer stub.Close() 339 340 config.Services = []string{"test"} 341 config.NodeMeta = map[string]string{"rack_name": "2304"} 342 config.ServiceTags = []string{"tag1"} 343 config.AllowStale = true 344 config.Token = "fake-token" 345 346 d := newDiscovery(t, config) 347 348 ctx, cancel := context.WithCancel(context.Background()) 349 ch := make(chan []*targetgroup.Group) 350 go func() { 351 d.Run(ctx, ch) 352 close(ch) 353 }() 354 checkOneTarget(t, <-ch) 355 cancel() 356 <-ch 357} 358 359func TestGetDatacenterShouldReturnError(t *testing.T) { 360 for _, tc := range []struct { 361 handler func(http.ResponseWriter, *http.Request) 362 errMessage string 363 }{ 364 { 365 // Define a handler that will return status 500. 366 handler: func(w http.ResponseWriter, r *http.Request) { 367 w.WriteHeader(500) 368 }, 369 errMessage: "Unexpected response code: 500 ()", 370 }, 371 { 372 // Define a handler that will return incorrect response. 373 handler: func(w http.ResponseWriter, r *http.Request) { 374 w.Write([]byte(`{"Config": {"Not-Datacenter": "test-dc"}}`)) 375 }, 376 errMessage: "invalid value '<nil>' for Config.Datacenter", 377 }, 378 } { 379 stub := httptest.NewServer(http.HandlerFunc(tc.handler)) 380 stuburl, err := url.Parse(stub.URL) 381 require.NoError(t, err) 382 383 config := &SDConfig{ 384 Server: stuburl.Host, 385 Token: "fake-token", 386 RefreshInterval: model.Duration(1 * time.Second), 387 } 388 defer stub.Close() 389 d := newDiscovery(t, config) 390 391 // Should be empty if not initialized. 392 require.Equal(t, "", d.clientDatacenter) 393 394 err = d.getDatacenter() 395 396 // An error should be returned. 397 require.Equal(t, tc.errMessage, err.Error()) 398 // Should still be empty. 399 require.Equal(t, "", d.clientDatacenter) 400 } 401} 402 403func TestUnmarshalConfig(t *testing.T) { 404 unmarshal := func(d []byte) func(interface{}) error { 405 return func(o interface{}) error { 406 return yaml.Unmarshal(d, o) 407 } 408 } 409 410 goodConfig := DefaultSDConfig 411 goodConfig.Username = "123" 412 goodConfig.Password = "1234" 413 goodConfig.HTTPClientConfig = config.HTTPClientConfig{ 414 BasicAuth: &config.BasicAuth{ 415 Username: "123", 416 Password: "1234", 417 }, 418 FollowRedirects: true, 419 } 420 421 cases := []struct { 422 name string 423 config string 424 expected SDConfig 425 errMessage string 426 }{ 427 { 428 name: "good", 429 config: ` 430server: localhost:8500 431username: 123 432password: 1234 433`, 434 expected: goodConfig, 435 }, 436 { 437 name: "username and password and basic auth configured", 438 config: ` 439server: localhost:8500 440username: 123 441password: 1234 442basic_auth: 443 username: 12345 444 password: 123456 445`, 446 errMessage: "at most one of consul SD configuration username and password and basic auth can be configured", 447 }, 448 { 449 name: "token and authorization configured", 450 config: ` 451server: localhost:8500 452token: 1234567 453authorization: 454 credentials: 12345678 455`, 456 errMessage: "at most one of consul SD token, authorization, or oauth2 can be configured", 457 }, 458 { 459 name: "token and oauth2 configured", 460 config: ` 461server: localhost:8500 462token: 1234567 463oauth2: 464 client_id: 10 465 client_secret: 11 466 token_url: http://example.com 467`, 468 errMessage: "at most one of consul SD token, authorization, or oauth2 can be configured", 469 }, 470 } 471 472 for _, test := range cases { 473 t.Run(test.name, func(t *testing.T) { 474 var config SDConfig 475 err := config.UnmarshalYAML(unmarshal([]byte(test.config))) 476 if err != nil { 477 require.Equalf(t, err.Error(), test.errMessage, "Expected error '%s', got '%v'", test.errMessage, err) 478 return 479 } 480 if test.errMessage != "" { 481 t.Errorf("Expected error %s, got none", test.errMessage) 482 return 483 } 484 485 require.Equal(t, config, test.expected) 486 }) 487 } 488} 489