1// Copyright 2013 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package notifier 15 16import ( 17 "bytes" 18 "context" 19 "encoding/json" 20 "fmt" 21 "io/ioutil" 22 "net/http" 23 "net/http/httptest" 24 "net/url" 25 "testing" 26 "time" 27 28 "github.com/pkg/errors" 29 "github.com/prometheus/alertmanager/api/v2/models" 30 config_util "github.com/prometheus/common/config" 31 "github.com/prometheus/common/model" 32 "github.com/stretchr/testify/require" 33 "go.uber.org/atomic" 34 yaml "gopkg.in/yaml.v2" 35 36 "github.com/prometheus/prometheus/config" 37 "github.com/prometheus/prometheus/discovery/targetgroup" 38 "github.com/prometheus/prometheus/pkg/labels" 39 "github.com/prometheus/prometheus/pkg/relabel" 40) 41 42func TestPostPath(t *testing.T) { 43 var cases = []struct { 44 in, out string 45 }{ 46 { 47 in: "", 48 out: "/api/v1/alerts", 49 }, 50 { 51 in: "/", 52 out: "/api/v1/alerts", 53 }, 54 { 55 in: "/prefix", 56 out: "/prefix/api/v1/alerts", 57 }, 58 { 59 in: "/prefix//", 60 out: "/prefix/api/v1/alerts", 61 }, 62 { 63 in: "prefix//", 64 out: "/prefix/api/v1/alerts", 65 }, 66 } 67 for _, c := range cases { 68 require.Equal(t, c.out, postPath(c.in, config.AlertmanagerAPIVersionV1)) 69 } 70} 71 72func TestHandlerNextBatch(t *testing.T) { 73 h := NewManager(&Options{}, nil) 74 75 for i := range make([]struct{}, 2*maxBatchSize+1) { 76 h.queue = append(h.queue, &Alert{ 77 Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), 78 }) 79 } 80 81 expected := append([]*Alert{}, h.queue...) 82 83 require.NoError(t, alertsEqual(expected[0:maxBatchSize], h.nextBatch())) 84 require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], h.nextBatch())) 85 require.NoError(t, alertsEqual(expected[2*maxBatchSize:], h.nextBatch())) 86 require.Equal(t, 0, len(h.queue), "Expected queue to be empty but got %d alerts", len(h.queue)) 87} 88 89func alertsEqual(a, b []*Alert) error { 90 if len(a) != len(b) { 91 return errors.Errorf("length mismatch: %v != %v", a, b) 92 } 93 for i, alert := range a { 94 if !labels.Equal(alert.Labels, b[i].Labels) { 95 return errors.Errorf("label mismatch at index %d: %s != %s", i, alert.Labels, b[i].Labels) 96 } 97 } 98 return nil 99} 100 101func TestHandlerSendAll(t *testing.T) { 102 var ( 103 errc = make(chan error, 1) 104 expected = make([]*Alert, 0, maxBatchSize) 105 status1, status2 atomic.Int32 106 ) 107 status1.Store(int32(http.StatusOK)) 108 status2.Store(int32(http.StatusOK)) 109 110 newHTTPServer := func(u, p string, status *atomic.Int32) *httptest.Server { 111 return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 112 var err error 113 defer func() { 114 if err == nil { 115 return 116 } 117 select { 118 case errc <- err: 119 default: 120 } 121 }() 122 user, pass, _ := r.BasicAuth() 123 if user != u || pass != p { 124 err = errors.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p) 125 w.WriteHeader(http.StatusInternalServerError) 126 return 127 } 128 129 b, err := ioutil.ReadAll(r.Body) 130 if err != nil { 131 err = errors.Errorf("error reading body: %v", err) 132 w.WriteHeader(http.StatusInternalServerError) 133 return 134 } 135 136 var alerts []*Alert 137 err = json.Unmarshal(b, &alerts) 138 if err == nil { 139 err = alertsEqual(expected, alerts) 140 } 141 w.WriteHeader(int(status.Load())) 142 })) 143 } 144 server1 := newHTTPServer("prometheus", "testing_password", &status1) 145 server2 := newHTTPServer("", "", &status2) 146 defer server1.Close() 147 defer server2.Close() 148 149 h := NewManager(&Options{}, nil) 150 151 authClient, _ := config_util.NewClientFromConfig( 152 config_util.HTTPClientConfig{ 153 BasicAuth: &config_util.BasicAuth{ 154 Username: "prometheus", 155 Password: "testing_password", 156 }, 157 }, "auth_alertmanager") 158 159 h.alertmanagers = make(map[string]*alertmanagerSet) 160 161 am1Cfg := config.DefaultAlertmanagerConfig 162 am1Cfg.Timeout = model.Duration(time.Second) 163 164 am2Cfg := config.DefaultAlertmanagerConfig 165 am2Cfg.Timeout = model.Duration(time.Second) 166 167 h.alertmanagers["1"] = &alertmanagerSet{ 168 ams: []alertmanager{ 169 alertmanagerMock{ 170 urlf: func() string { return server1.URL }, 171 }, 172 }, 173 cfg: &am1Cfg, 174 client: authClient, 175 } 176 177 h.alertmanagers["2"] = &alertmanagerSet{ 178 ams: []alertmanager{ 179 alertmanagerMock{ 180 urlf: func() string { return server2.URL }, 181 }, 182 }, 183 cfg: &am2Cfg, 184 } 185 186 for i := range make([]struct{}, maxBatchSize) { 187 h.queue = append(h.queue, &Alert{ 188 Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), 189 }) 190 expected = append(expected, &Alert{ 191 Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), 192 }) 193 } 194 195 checkNoErr := func() { 196 t.Helper() 197 select { 198 case err := <-errc: 199 require.NoError(t, err) 200 default: 201 } 202 } 203 204 require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") 205 checkNoErr() 206 207 status1.Store(int32(http.StatusNotFound)) 208 require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") 209 checkNoErr() 210 211 status2.Store(int32(http.StatusInternalServerError)) 212 require.False(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") 213 checkNoErr() 214} 215 216func TestCustomDo(t *testing.T) { 217 const testURL = "http://testurl.com/" 218 const testBody = "testbody" 219 220 var received bool 221 h := NewManager(&Options{ 222 Do: func(_ context.Context, client *http.Client, req *http.Request) (*http.Response, error) { 223 received = true 224 body, err := ioutil.ReadAll(req.Body) 225 226 require.NoError(t, err) 227 228 require.Equal(t, testBody, string(body)) 229 230 require.Equal(t, testURL, req.URL.String()) 231 232 return &http.Response{ 233 Body: ioutil.NopCloser(bytes.NewBuffer(nil)), 234 }, nil 235 }, 236 }, nil) 237 238 h.sendOne(context.Background(), nil, testURL, []byte(testBody)) 239 240 require.True(t, received, "Expected to receive an alert, but didn't") 241} 242 243func TestExternalLabels(t *testing.T) { 244 h := NewManager(&Options{ 245 QueueCapacity: 3 * maxBatchSize, 246 ExternalLabels: labels.Labels{{Name: "a", Value: "b"}}, 247 RelabelConfigs: []*relabel.Config{ 248 { 249 SourceLabels: model.LabelNames{"alertname"}, 250 TargetLabel: "a", 251 Action: "replace", 252 Regex: relabel.MustNewRegexp("externalrelabelthis"), 253 Replacement: "c", 254 }, 255 }, 256 }, nil) 257 258 // This alert should get the external label attached. 259 h.Send(&Alert{ 260 Labels: labels.FromStrings("alertname", "test"), 261 }) 262 263 // This alert should get the external label attached, but then set to "c" 264 // through relabelling. 265 h.Send(&Alert{ 266 Labels: labels.FromStrings("alertname", "externalrelabelthis"), 267 }) 268 269 expected := []*Alert{ 270 {Labels: labels.FromStrings("alertname", "test", "a", "b")}, 271 {Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")}, 272 } 273 274 require.NoError(t, alertsEqual(expected, h.queue)) 275} 276 277func TestHandlerRelabel(t *testing.T) { 278 h := NewManager(&Options{ 279 QueueCapacity: 3 * maxBatchSize, 280 RelabelConfigs: []*relabel.Config{ 281 { 282 SourceLabels: model.LabelNames{"alertname"}, 283 Action: "drop", 284 Regex: relabel.MustNewRegexp("drop"), 285 }, 286 { 287 SourceLabels: model.LabelNames{"alertname"}, 288 TargetLabel: "alertname", 289 Action: "replace", 290 Regex: relabel.MustNewRegexp("rename"), 291 Replacement: "renamed", 292 }, 293 }, 294 }, nil) 295 296 // This alert should be dropped due to the configuration 297 h.Send(&Alert{ 298 Labels: labels.FromStrings("alertname", "drop"), 299 }) 300 301 // This alert should be replaced due to the configuration 302 h.Send(&Alert{ 303 Labels: labels.FromStrings("alertname", "rename"), 304 }) 305 306 expected := []*Alert{ 307 {Labels: labels.FromStrings("alertname", "renamed")}, 308 } 309 310 require.NoError(t, alertsEqual(expected, h.queue)) 311} 312 313func TestHandlerQueuing(t *testing.T) { 314 var ( 315 expectedc = make(chan []*Alert) 316 called = make(chan struct{}) 317 done = make(chan struct{}) 318 errc = make(chan error, 1) 319 ) 320 321 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 322 // Notify the test function that we have received something. 323 select { 324 case called <- struct{}{}: 325 case <-done: 326 return 327 } 328 329 // Wait for the test function to unblock us. 330 select { 331 case expected := <-expectedc: 332 var alerts []*Alert 333 334 b, err := ioutil.ReadAll(r.Body) 335 if err != nil { 336 panic(err) 337 } 338 339 err = json.Unmarshal(b, &alerts) 340 if err == nil { 341 err = alertsEqual(expected, alerts) 342 } 343 select { 344 case errc <- err: 345 default: 346 } 347 case <-done: 348 } 349 })) 350 defer func() { 351 close(done) 352 server.Close() 353 }() 354 355 h := NewManager( 356 &Options{ 357 QueueCapacity: 3 * maxBatchSize, 358 }, 359 nil, 360 ) 361 362 h.alertmanagers = make(map[string]*alertmanagerSet) 363 364 am1Cfg := config.DefaultAlertmanagerConfig 365 am1Cfg.Timeout = model.Duration(time.Second) 366 367 h.alertmanagers["1"] = &alertmanagerSet{ 368 ams: []alertmanager{ 369 alertmanagerMock{ 370 urlf: func() string { return server.URL }, 371 }, 372 }, 373 cfg: &am1Cfg, 374 } 375 go h.Run(nil) 376 defer h.Stop() 377 378 var alerts []*Alert 379 for i := range make([]struct{}, 20*maxBatchSize) { 380 alerts = append(alerts, &Alert{ 381 Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), 382 }) 383 } 384 385 assertAlerts := func(expected []*Alert) { 386 t.Helper() 387 for { 388 select { 389 case <-called: 390 expectedc <- expected 391 case err := <-errc: 392 require.NoError(t, err) 393 return 394 case <-time.After(5 * time.Second): 395 require.FailNow(t, "Alerts were not pushed.") 396 } 397 } 398 } 399 400 // If the batch is larger than the queue capacity, it should be truncated 401 // from the front. 402 h.Send(alerts[:4*maxBatchSize]...) 403 for i := 1; i < 4; i++ { 404 assertAlerts(alerts[i*maxBatchSize : (i+1)*maxBatchSize]) 405 } 406 407 // Send one batch, wait for it to arrive and block the server so the queue fills up. 408 h.Send(alerts[:maxBatchSize]...) 409 <-called 410 411 // Send several batches while the server is still blocked so the queue 412 // fills up to its maximum capacity (3*maxBatchSize). Then check that the 413 // queue is truncated in the front. 414 h.Send(alerts[1*maxBatchSize : 2*maxBatchSize]...) // this batch should be dropped. 415 h.Send(alerts[2*maxBatchSize : 3*maxBatchSize]...) 416 h.Send(alerts[3*maxBatchSize : 4*maxBatchSize]...) 417 418 // Send the batch that drops the first one. 419 h.Send(alerts[4*maxBatchSize : 5*maxBatchSize]...) 420 421 // Unblock the server. 422 expectedc <- alerts[:maxBatchSize] 423 select { 424 case err := <-errc: 425 require.NoError(t, err) 426 case <-time.After(5 * time.Second): 427 require.FailNow(t, "Alerts were not pushed.") 428 } 429 430 // Verify that we receive the last 3 batches. 431 for i := 2; i < 5; i++ { 432 assertAlerts(alerts[i*maxBatchSize : (i+1)*maxBatchSize]) 433 } 434} 435 436type alertmanagerMock struct { 437 urlf func() string 438} 439 440func (a alertmanagerMock) url() *url.URL { 441 u, err := url.Parse(a.urlf()) 442 if err != nil { 443 panic(err) 444 } 445 return u 446} 447 448func TestLabelSetNotReused(t *testing.T) { 449 tg := makeInputTargetGroup() 450 _, _, err := alertmanagerFromGroup(tg, &config.AlertmanagerConfig{}) 451 452 require.NoError(t, err) 453 454 // Target modified during alertmanager extraction 455 require.Equal(t, tg, makeInputTargetGroup()) 456} 457 458func TestReload(t *testing.T) { 459 var tests = []struct { 460 in *targetgroup.Group 461 out string 462 }{ 463 { 464 in: &targetgroup.Group{ 465 Targets: []model.LabelSet{ 466 { 467 "__address__": "alertmanager:9093", 468 }, 469 }, 470 }, 471 out: "http://alertmanager:9093/api/v2/alerts", 472 }, 473 } 474 475 n := NewManager(&Options{}, nil) 476 477 cfg := &config.Config{} 478 s := ` 479alerting: 480 alertmanagers: 481 - static_configs: 482` 483 err := yaml.UnmarshalStrict([]byte(s), cfg) 484 require.NoError(t, err, "Unable to load YAML config.") 485 require.Equal(t, 1, len(cfg.AlertingConfig.AlertmanagerConfigs)) 486 487 err = n.ApplyConfig(cfg) 488 require.NoError(t, err, "Error applying the config.") 489 490 tgs := make(map[string][]*targetgroup.Group) 491 for _, tt := range tests { 492 for k := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() { 493 tgs[k] = []*targetgroup.Group{ 494 tt.in, 495 } 496 break 497 } 498 n.reload(tgs) 499 res := n.Alertmanagers()[0].String() 500 501 require.Equal(t, tt.out, res) 502 } 503 504} 505 506func TestDroppedAlertmanagers(t *testing.T) { 507 var tests = []struct { 508 in *targetgroup.Group 509 out string 510 }{ 511 { 512 in: &targetgroup.Group{ 513 Targets: []model.LabelSet{ 514 { 515 "__address__": "alertmanager:9093", 516 }, 517 }, 518 }, 519 out: "http://alertmanager:9093/api/v2/alerts", 520 }, 521 } 522 523 n := NewManager(&Options{}, nil) 524 525 cfg := &config.Config{} 526 s := ` 527alerting: 528 alertmanagers: 529 - static_configs: 530 relabel_configs: 531 - source_labels: ['__address__'] 532 regex: 'alertmanager:9093' 533 action: drop 534` 535 err := yaml.UnmarshalStrict([]byte(s), cfg) 536 require.NoError(t, err, "Unable to load YAML config.") 537 require.Equal(t, 1, len(cfg.AlertingConfig.AlertmanagerConfigs)) 538 539 err = n.ApplyConfig(cfg) 540 require.NoError(t, err, "Error applying the config.") 541 542 tgs := make(map[string][]*targetgroup.Group) 543 for _, tt := range tests { 544 for k := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() { 545 tgs[k] = []*targetgroup.Group{ 546 tt.in, 547 } 548 break 549 } 550 551 n.reload(tgs) 552 res := n.DroppedAlertmanagers()[0].String() 553 554 require.Equal(t, res, tt.out) 555 } 556} 557 558func makeInputTargetGroup() *targetgroup.Group { 559 return &targetgroup.Group{ 560 Targets: []model.LabelSet{ 561 { 562 model.AddressLabel: model.LabelValue("1.1.1.1:9090"), 563 model.LabelName("notcommon1"): model.LabelValue("label"), 564 }, 565 }, 566 Labels: model.LabelSet{ 567 model.LabelName("common"): model.LabelValue("label"), 568 }, 569 Source: "testsource", 570 } 571} 572 573func TestLabelsToOpenAPILabelSet(t *testing.T) { 574 require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.Labels{{Name: "aaa", Value: "111"}, {Name: "bbb", Value: "222"}})) 575} 576