// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notifier

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"net/url"
	"testing"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/alertmanager/api/v2/models"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"
	yaml "gopkg.in/yaml.v2"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/relabel"
)

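// TestPostPath checks that postPath joins an Alertmanager URL prefix with the
// v1 alerts endpoint, normalizing missing, duplicate, and trailing slashes.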
func TestPostPath(t *testing.T) {
	var cases = []struct {
		in, out string
	}{
		{
			in:  "",
			out: "/api/v1/alerts",
		},
		{
			in:  "/",
			out: "/api/v1/alerts",
		},
		{
			in:  "/prefix",
			out: "/prefix/api/v1/alerts",
		},
		{
			in:  "/prefix//",
			out: "/prefix/api/v1/alerts",
		},
		{
			in:  "prefix//",
			out: "/prefix/api/v1/alerts",
		},
	}
	for _, c := range cases {
		require.Equal(t, c.out, postPath(c.in, config.AlertmanagerAPIVersionV1))
	}
}

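// TestHandlerNextBatch fills the queue with a bit more than two full batches
// and checks that nextBatch drains it in maxBatchSize chunks until it is empty.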
func TestHandlerNextBatch(t *testing.T) {
	h := NewManager(&Options{}, nil)

	for i := range make([]struct{}, 2*maxBatchSize+1) {
		h.queue = append(h.queue, &Alert{
			Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
		})
	}

	expected := append([]*Alert{}, h.queue...)

	require.NoError(t, alertsEqual(expected[0:maxBatchSize], h.nextBatch()))
	require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], h.nextBatch()))
	require.NoError(t, alertsEqual(expected[2*maxBatchSize:], h.nextBatch()))
	require.Equal(t, 0, len(h.queue), "Expected queue to be empty but got %d alerts", len(h.queue))
}

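// alertsEqual returns an error if the two alert slices differ in length or in
// the labels of any alert at the same index.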
func alertsEqual(a, b []*Alert) error {
	if len(a) != len(b) {
		return errors.Errorf("length mismatch: %v != %v", a, b)
	}
	for i, alert := range a {
		if !labels.Equal(alert.Labels, b[i].Labels) {
			return errors.Errorf("label mismatch at index %d: %s != %s", i, alert.Labels, b[i].Labels)
		}
	}
	return nil
}

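// TestHandlerSendAll runs two mock Alertmanagers, one behind basic auth, and
// checks that sendAll posts the queued alerts to both, returning false only
// once every Alertmanager rejects the request.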
func TestHandlerSendAll(t *testing.T) {
	var (
		errc             = make(chan error, 1)
		expected         = make([]*Alert, 0, maxBatchSize)
		status1, status2 atomic.Int32
	)
	status1.Store(int32(http.StatusOK))
	status2.Store(int32(http.StatusOK))

	newHTTPServer := func(u, p string, status *atomic.Int32) *httptest.Server {
		return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			var err error
			defer func() {
				if err == nil {
					return
				}
				select {
				case errc <- err:
				default:
				}
			}()
			user, pass, _ := r.BasicAuth()
			if user != u || pass != p {
				err = errors.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}

			b, err := ioutil.ReadAll(r.Body)
			if err != nil {
				err = errors.Errorf("error reading body: %v", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}

			var alerts []*Alert
			err = json.Unmarshal(b, &alerts)
			if err == nil {
				err = alertsEqual(expected, alerts)
			}
			w.WriteHeader(int(status.Load()))
		}))
	}
	server1 := newHTTPServer("prometheus", "testing_password", &status1)
	server2 := newHTTPServer("", "", &status2)
	defer server1.Close()
	defer server2.Close()

	h := NewManager(&Options{}, nil)

	authClient, _ := config_util.NewClientFromConfig(
		config_util.HTTPClientConfig{
			BasicAuth: &config_util.BasicAuth{
				Username: "prometheus",
				Password: "testing_password",
			},
		}, "auth_alertmanager")

	h.alertmanagers = make(map[string]*alertmanagerSet)

	am1Cfg := config.DefaultAlertmanagerConfig
	am1Cfg.Timeout = model.Duration(time.Second)

	am2Cfg := config.DefaultAlertmanagerConfig
	am2Cfg.Timeout = model.Duration(time.Second)

	h.alertmanagers["1"] = &alertmanagerSet{
		ams: []alertmanager{
			alertmanagerMock{
				urlf: func() string { return server1.URL },
			},
		},
		cfg:    &am1Cfg,
		client: authClient,
	}

	h.alertmanagers["2"] = &alertmanagerSet{
		ams: []alertmanager{
			alertmanagerMock{
				urlf: func() string { return server2.URL },
			},
		},
		cfg: &am2Cfg,
	}

	for i := range make([]struct{}, maxBatchSize) {
		h.queue = append(h.queue, &Alert{
			Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
		})
		expected = append(expected, &Alert{
			Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
		})
	}

	checkNoErr := func() {
		t.Helper()
		select {
		case err := <-errc:
			require.NoError(t, err)
		default:
		}
	}

	require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
	checkNoErr()

	status1.Store(int32(http.StatusNotFound))
	require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
	checkNoErr()

	status2.Store(int32(http.StatusInternalServerError))
	require.False(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly")
	checkNoErr()
}

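// TestCustomDo checks that a custom Do function passed via Options is used by
// sendOne and receives the expected URL and request body.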
func TestCustomDo(t *testing.T) {
	const testURL = "http://testurl.com/"
	const testBody = "testbody"

	var received bool
	h := NewManager(&Options{
		Do: func(_ context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
			received = true
			body, err := ioutil.ReadAll(req.Body)
			require.NoError(t, err)
			require.Equal(t, testBody, string(body))
			require.Equal(t, testURL, req.URL.String())

			return &http.Response{
				Body: ioutil.NopCloser(bytes.NewBuffer(nil)),
			}, nil
		},
	}, nil)

	h.sendOne(context.Background(), nil, testURL, []byte(testBody))

	require.True(t, received, "Expected to receive an alert, but didn't")
}

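// TestExternalLabels checks that external labels are attached to queued alerts
// and that relabeling is applied on top of them.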
func TestExternalLabels(t *testing.T) {
	h := NewManager(&Options{
		QueueCapacity:  3 * maxBatchSize,
		ExternalLabels: labels.Labels{{Name: "a", Value: "b"}},
		RelabelConfigs: []*relabel.Config{
			{
				SourceLabels: model.LabelNames{"alertname"},
				TargetLabel:  "a",
				Action:       "replace",
				Regex:        relabel.MustNewRegexp("externalrelabelthis"),
				Replacement:  "c",
			},
		},
	}, nil)

	// This alert should get the external label attached.
	h.Send(&Alert{
		Labels: labels.FromStrings("alertname", "test"),
	})

	// This alert should get the external label attached, but then set to "c"
	// through relabelling.
	h.Send(&Alert{
		Labels: labels.FromStrings("alertname", "externalrelabelthis"),
	})

	expected := []*Alert{
		{Labels: labels.FromStrings("alertname", "test", "a", "b")},
		{Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")},
	}

	require.NoError(t, alertsEqual(expected, h.queue))
}

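// TestHandlerRelabel checks that alert relabeling rules can drop and rewrite
// queued alerts.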
func TestHandlerRelabel(t *testing.T) {
	h := NewManager(&Options{
		QueueCapacity: 3 * maxBatchSize,
		RelabelConfigs: []*relabel.Config{
			{
				SourceLabels: model.LabelNames{"alertname"},
				Action:       "drop",
				Regex:        relabel.MustNewRegexp("drop"),
			},
			{
				SourceLabels: model.LabelNames{"alertname"},
				TargetLabel:  "alertname",
				Action:       "replace",
				Regex:        relabel.MustNewRegexp("rename"),
				Replacement:  "renamed",
			},
		},
	}, nil)

	// This alert should be dropped due to the configuration.
	h.Send(&Alert{
		Labels: labels.FromStrings("alertname", "drop"),
	})

	// This alert should have its alertname replaced due to the configuration.
	h.Send(&Alert{
		Labels: labels.FromStrings("alertname", "rename"),
	})

	expected := []*Alert{
		{Labels: labels.FromStrings("alertname", "renamed")},
	}

	require.NoError(t, alertsEqual(expected, h.queue))
}

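// TestHandlerQueuing exercises the queue limits of the manager: when more
// alerts are sent than the queue can hold, the oldest batches are dropped
// from the front and the remaining ones are delivered in order.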
func TestHandlerQueuing(t *testing.T) {
	var (
		expectedc = make(chan []*Alert)
		called    = make(chan struct{})
		done      = make(chan struct{})
		errc      = make(chan error, 1)
	)

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Notify the test function that we have received something.
		select {
		case called <- struct{}{}:
		case <-done:
			return
		}

		// Wait for the test function to unblock us.
		select {
		case expected := <-expectedc:
			var alerts []*Alert

			b, err := ioutil.ReadAll(r.Body)
			if err != nil {
				panic(err)
			}

			err = json.Unmarshal(b, &alerts)
			if err == nil {
				err = alertsEqual(expected, alerts)
			}
			select {
			case errc <- err:
			default:
			}
		case <-done:
		}
	}))
	defer func() {
		close(done)
		server.Close()
	}()

	h := NewManager(
		&Options{
			QueueCapacity: 3 * maxBatchSize,
		},
		nil,
	)

	h.alertmanagers = make(map[string]*alertmanagerSet)

	am1Cfg := config.DefaultAlertmanagerConfig
	am1Cfg.Timeout = model.Duration(time.Second)

	h.alertmanagers["1"] = &alertmanagerSet{
		ams: []alertmanager{
			alertmanagerMock{
				urlf: func() string { return server.URL },
			},
		},
		cfg: &am1Cfg,
	}
	go h.Run(nil)
	defer h.Stop()

	var alerts []*Alert
	for i := range make([]struct{}, 20*maxBatchSize) {
		alerts = append(alerts, &Alert{
			Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
		})
	}

	assertAlerts := func(expected []*Alert) {
		t.Helper()
		for {
			select {
			case <-called:
				expectedc <- expected
			case err := <-errc:
				require.NoError(t, err)
				return
			case <-time.After(5 * time.Second):
				require.FailNow(t, "Alerts were not pushed.")
			}
		}
	}

	// If the batch is larger than the queue capacity, it should be truncated
	// from the front.
	h.Send(alerts[:4*maxBatchSize]...)
	for i := 1; i < 4; i++ {
		assertAlerts(alerts[i*maxBatchSize : (i+1)*maxBatchSize])
	}

	// Send one batch, wait for it to arrive and block the server so the queue fills up.
	h.Send(alerts[:maxBatchSize]...)
	<-called

	// Send several batches while the server is still blocked so the queue
	// fills up to its maximum capacity (3*maxBatchSize). Then check that the
	// queue is truncated from the front.
	h.Send(alerts[1*maxBatchSize : 2*maxBatchSize]...) // This batch should be dropped.
	h.Send(alerts[2*maxBatchSize : 3*maxBatchSize]...)
	h.Send(alerts[3*maxBatchSize : 4*maxBatchSize]...)

	// Send the batch that causes the first one to be dropped.
	h.Send(alerts[4*maxBatchSize : 5*maxBatchSize]...)

	// Unblock the server.
	expectedc <- alerts[:maxBatchSize]
	select {
	case err := <-errc:
		require.NoError(t, err)
	case <-time.After(5 * time.Second):
		require.FailNow(t, "Alerts were not pushed.")
	}

	// Verify that we receive the last 3 batches.
	for i := 2; i < 5; i++ {
		assertAlerts(alerts[i*maxBatchSize : (i+1)*maxBatchSize])
	}
}

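// alertmanagerMock is a test double for the alertmanager interface that serves
// a URL from a configurable function.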
type alertmanagerMock struct {
	urlf func() string
}

func (a alertmanagerMock) url() *url.URL {
	u, err := url.Parse(a.urlf())
	if err != nil {
		panic(err)
	}
	return u
}

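// TestLabelSetNotReused ensures that alertmanagerFromGroup does not mutate the
// target group passed to it.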
func TestLabelSetNotReused(t *testing.T) {
	tg := makeInputTargetGroup()
	_, _, err := alertmanagerFromGroup(tg, &config.AlertmanagerConfig{})

	require.NoError(t, err)

	// Ensure the input target group was not modified during alertmanager extraction.
	require.Equal(t, tg, makeInputTargetGroup())
}

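// TestReload applies a minimal alerting configuration, reloads the discovered
// target groups, and checks the resulting Alertmanager URL.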
func TestReload(t *testing.T) {
	var tests = []struct {
		in  *targetgroup.Group
		out string
	}{
		{
			in: &targetgroup.Group{
				Targets: []model.LabelSet{
					{
						"__address__": "alertmanager:9093",
					},
				},
			},
			out: "http://alertmanager:9093/api/v2/alerts",
		},
	}

	n := NewManager(&Options{}, nil)

	cfg := &config.Config{}
	s := `
alerting:
  alertmanagers:
  - static_configs:
`
	err := yaml.UnmarshalStrict([]byte(s), cfg)
	require.NoError(t, err, "Unable to load YAML config.")
	require.Equal(t, 1, len(cfg.AlertingConfig.AlertmanagerConfigs))

	err = n.ApplyConfig(cfg)
	require.NoError(t, err, "Error applying the config.")

	tgs := make(map[string][]*targetgroup.Group)
	for _, tt := range tests {
		for k := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
			tgs[k] = []*targetgroup.Group{
				tt.in,
			}
			break
		}
		n.reload(tgs)
		res := n.Alertmanagers()[0].String()

		require.Equal(t, tt.out, res)
	}
}

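// TestDroppedAlertmanagers checks that targets removed by relabeling are
// reported by DroppedAlertmanagers with their original address.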
func TestDroppedAlertmanagers(t *testing.T) {
	var tests = []struct {
		in  *targetgroup.Group
		out string
	}{
		{
			in: &targetgroup.Group{
				Targets: []model.LabelSet{
					{
						"__address__": "alertmanager:9093",
					},
				},
			},
			out: "http://alertmanager:9093/api/v2/alerts",
		},
	}

	n := NewManager(&Options{}, nil)

	cfg := &config.Config{}
	s := `
alerting:
  alertmanagers:
  - static_configs:
    relabel_configs:
      - source_labels: ['__address__']
        regex: 'alertmanager:9093'
        action: drop
`
	err := yaml.UnmarshalStrict([]byte(s), cfg)
	require.NoError(t, err, "Unable to load YAML config.")
	require.Equal(t, 1, len(cfg.AlertingConfig.AlertmanagerConfigs))

	err = n.ApplyConfig(cfg)
	require.NoError(t, err, "Error applying the config.")

	tgs := make(map[string][]*targetgroup.Group)
	for _, tt := range tests {
		for k := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
			tgs[k] = []*targetgroup.Group{
				tt.in,
			}
			break
		}

		n.reload(tgs)
		res := n.DroppedAlertmanagers()[0].String()

		require.Equal(t, tt.out, res)
	}
}

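// makeInputTargetGroup returns a fixed target group used as input in the tests above.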
func makeInputTargetGroup() *targetgroup.Group {
	return &targetgroup.Group{
		Targets: []model.LabelSet{
			{
				model.AddressLabel:            model.LabelValue("1.1.1.1:9090"),
				model.LabelName("notcommon1"): model.LabelValue("label"),
			},
		},
		Labels: model.LabelSet{
			model.LabelName("common"): model.LabelValue("label"),
		},
		Source: "testsource",
	}
}

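// TestLabelsToOpenAPILabelSet checks the conversion from labels.Labels to the
// OpenAPI LabelSet model.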
func TestLabelsToOpenAPILabelSet(t *testing.T) {
	require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.Labels{{Name: "aaa", Value: "111"}, {Name: "bbb", Value: "222"}}))
}