1// Copyright 2018 Prometheus Team
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package test
15
16import (
17	"bytes"
18	"context"
19	"fmt"
20	"io/ioutil"
21	"net"
22	"os"
23	"os/exec"
24	"path/filepath"
25	"sync"
26	"syscall"
27	"testing"
28	"time"
29
30	apiclient "github.com/prometheus/alertmanager/api/v2/client"
31	"github.com/prometheus/alertmanager/api/v2/client/alert"
32	"github.com/prometheus/alertmanager/api/v2/client/general"
33	"github.com/prometheus/alertmanager/api/v2/client/silence"
34	"github.com/prometheus/alertmanager/api/v2/models"
35
36	httptransport "github.com/go-openapi/runtime/client"
37	"github.com/go-openapi/strfmt"
38)
39
40// AcceptanceTest provides declarative definition of given inputs and expected
41// output of an Alertmanager setup.
42type AcceptanceTest struct {
43	*testing.T
44
45	opts *AcceptanceOpts
46
47	amc        *AlertmanagerCluster
48	collectors []*Collector
49
50	actions map[float64][]func()
51}
52
// AcceptanceOpts holds the configuration parameters of an acceptance test.
type AcceptanceOpts struct {
	// RoutePrefix is passed to Alertmanager as --web.route-prefix and is
	// prepended to the API v2 client base path.
	RoutePrefix string

	// Tolerance is consumed outside this file; presumably the timing slack
	// allowed when matching notifications — TODO confirm.
	Tolerance time.Duration

	// baseTime is the absolute instant relative test times are measured
	// from; NewAcceptanceTest sets it.
	baseTime time.Time
}
59
60func (opts *AcceptanceOpts) alertString(a *models.GettableAlert) string {
61	if a.EndsAt == nil || time.Time(*a.EndsAt).IsZero() {
62		return fmt.Sprintf("%v[%v:]", a, opts.relativeTime(time.Time(*a.StartsAt)))
63	}
64	return fmt.Sprintf("%v[%v:%v]", a, opts.relativeTime(time.Time(*a.StartsAt)), opts.relativeTime(time.Time(*a.EndsAt)))
65}
66
67// expandTime returns the absolute time for the relative time
68// calculated from the test's base time.
69func (opts *AcceptanceOpts) expandTime(rel float64) time.Time {
70	return opts.baseTime.Add(time.Duration(rel * float64(time.Second)))
71}
72
73// expandTime returns the relative time for the given time
74// calculated from the test's base time.
75func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 {
76	return float64(act.Sub(opts.baseTime)) / float64(time.Second)
77}
78
79// NewAcceptanceTest returns a new acceptance test with the base time
80// set to the current time.
81func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
82	test := &AcceptanceTest{
83		T:       t,
84		opts:    opts,
85		actions: map[float64][]func(){},
86	}
87	// TODO: Should this really be set during creation time? Why not do this
88	// during Run() time, maybe there is something else long happening between
89	// creation and running.
90	opts.baseTime = time.Now()
91
92	return test
93}
94
// freeAddress asks the OS for an unused IPv4 TCP port on localhost and
// returns it as "host:port". The listener is closed before returning, so the
// port is only likely, not guaranteed, to still be free when an Alertmanager
// binds it later. Any failure panics.
func freeAddress() string {
	ln, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		panic(err)
	}

	addr := ln.Addr().String()
	if err := ln.Close(); err != nil {
		panic(err)
	}
	return addr
}
111
112// Do sets the given function to be executed at the given time.
113func (t *AcceptanceTest) Do(at float64, f func()) {
114	t.actions[at] = append(t.actions[at], f)
115}
116
117// AlertmanagerCluster returns a new AlertmanagerCluster that allows starting a
118// cluster of Alertmanager instances on random ports.
119func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
120	amc := AlertmanagerCluster{}
121
122	for i := 0; i < size; i++ {
123		am := &Alertmanager{
124			t:    t,
125			opts: t.opts,
126		}
127
128		dir, err := ioutil.TempDir("", "am_test")
129		if err != nil {
130			t.Fatal(err)
131		}
132		am.dir = dir
133
134		cf, err := os.Create(filepath.Join(dir, "config.yml"))
135		if err != nil {
136			t.Fatal(err)
137		}
138		am.confFile = cf
139		am.UpdateConfig(conf)
140
141		am.apiAddr = freeAddress()
142		am.clusterAddr = freeAddress()
143
144		transport := httptransport.New(am.apiAddr, t.opts.RoutePrefix+"/api/v2/", nil)
145		am.clientV2 = apiclient.New(transport, strfmt.Default)
146
147		amc.ams = append(amc.ams, am)
148	}
149
150	t.amc = &amc
151
152	return &amc
153}
154
155// Collector returns a new collector bound to the test instance.
156func (t *AcceptanceTest) Collector(name string) *Collector {
157	co := &Collector{
158		t:         t.T,
159		name:      name,
160		opts:      t.opts,
161		collected: map[float64][]models.GettableAlerts{},
162		expected:  map[Interval][]models.GettableAlerts{},
163	}
164	t.collectors = append(t.collectors, co)
165
166	return co
167}
168
169// Run starts all Alertmanagers and runs queries against them. It then checks
170// whether all expected notifications have arrived at the expected receiver.
171func (t *AcceptanceTest) Run() {
172	errc := make(chan error)
173
174	for _, am := range t.amc.ams {
175		am.errc = errc
176		defer func(am *Alertmanager) {
177			am.Terminate()
178			am.cleanup()
179			t.Logf("stdout:\n%v", am.cmd.Stdout)
180			t.Logf("stderr:\n%v", am.cmd.Stderr)
181		}(am)
182	}
183
184	err := t.amc.Start()
185	if err != nil {
186		t.T.Fatal(err)
187	}
188
189	go t.runActions()
190
191	var latest float64
192	for _, coll := range t.collectors {
193		if l := coll.latest(); l > latest {
194			latest = l
195		}
196	}
197
198	deadline := t.opts.expandTime(latest)
199
200	select {
201	case <-time.After(time.Until(deadline)):
202		// continue
203	case err := <-errc:
204		t.Error(err)
205	}
206}
207
208// runActions performs the stored actions at the defined times.
209func (t *AcceptanceTest) runActions() {
210	var wg sync.WaitGroup
211
212	for at, fs := range t.actions {
213		ts := t.opts.expandTime(at)
214		wg.Add(len(fs))
215
216		for _, f := range fs {
217			go func(f func()) {
218				time.Sleep(time.Until(ts))
219				f()
220				wg.Done()
221			}(f)
222		}
223	}
224
225	wg.Wait()
226}
227
// buffer is a bytes.Buffer guarded by a mutex, so that it can serve as the
// stdout/stderr sink of a child process while the test reads it concurrently.
type buffer struct {
	b   bytes.Buffer
	mtx sync.Mutex
}

// Write appends p to the buffer while holding the lock.
func (buf *buffer) Write(p []byte) (int, error) {
	buf.mtx.Lock()
	defer buf.mtx.Unlock()

	n, err := buf.b.Write(p)
	return n, err
}

// String returns a snapshot of everything written so far.
func (buf *buffer) String() string {
	buf.mtx.Lock()
	defer buf.mtx.Unlock()

	snapshot := buf.b.String()
	return snapshot
}
244
245// Alertmanager encapsulates an Alertmanager process and allows
246// declaring alerts being pushed to it at fixed points in time.
247type Alertmanager struct {
248	t    *AcceptanceTest
249	opts *AcceptanceOpts
250
251	apiAddr     string
252	clusterAddr string
253	clientV2    *apiclient.Alertmanager
254	cmd         *exec.Cmd
255	confFile    *os.File
256	dir         string
257
258	errc chan<- error
259}
260
261// AlertmanagerCluster represents a group of Alertmanager instances
262// acting as a cluster.
263type AlertmanagerCluster struct {
264	ams []*Alertmanager
265}
266
267// Start the Alertmanager cluster and wait until it is ready to receive.
268func (amc *AlertmanagerCluster) Start() error {
269	var peerFlags []string
270	for _, am := range amc.ams {
271		peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr)
272	}
273
274	for _, am := range amc.ams {
275		err := am.Start(peerFlags)
276		if err != nil {
277			return fmt.Errorf("starting alertmanager cluster: %v", err.Error())
278		}
279	}
280
281	for _, am := range amc.ams {
282		err := am.WaitForCluster(len(amc.ams))
283		if err != nil {
284			return fmt.Errorf("starting alertmanager cluster: %v", err.Error())
285		}
286	}
287
288	return nil
289}
290
291// Members returns the underlying slice of cluster members.
292func (amc *AlertmanagerCluster) Members() []*Alertmanager {
293	return amc.ams
294}
295
296// Start the alertmanager and wait until it is ready to receive.
297func (am *Alertmanager) Start(additionalArg []string) error {
298	am.t.Helper()
299	args := []string{
300		"--config.file", am.confFile.Name(),
301		"--log.level", "debug",
302		"--web.listen-address", am.apiAddr,
303		"--storage.path", am.dir,
304		"--cluster.listen-address", am.clusterAddr,
305		"--cluster.settle-timeout", "0s",
306	}
307	if am.opts.RoutePrefix != "" {
308		args = append(args, "--web.route-prefix", am.opts.RoutePrefix)
309	}
310	args = append(args, additionalArg...)
311
312	cmd := exec.Command("../../../alertmanager", args...)
313
314	if am.cmd == nil {
315		var outb, errb buffer
316		cmd.Stdout = &outb
317		cmd.Stderr = &errb
318	} else {
319		cmd.Stdout = am.cmd.Stdout
320		cmd.Stderr = am.cmd.Stderr
321	}
322	am.cmd = cmd
323
324	if err := am.cmd.Start(); err != nil {
325		return fmt.Errorf("starting alertmanager failed: %s", err)
326	}
327
328	go func() {
329		if err := am.cmd.Wait(); err != nil {
330			am.errc <- err
331		}
332	}()
333
334	time.Sleep(50 * time.Millisecond)
335	for i := 0; i < 10; i++ {
336		_, err := am.clientV2.General.GetStatus(nil)
337		if err == nil {
338			return nil
339		}
340		time.Sleep(500 * time.Millisecond)
341	}
342	return fmt.Errorf("starting alertmanager failed: timeout")
343}
344
345// WaitForCluster waits for the Alertmanager instance to join a cluster with the
346// given size.
347func (am *Alertmanager) WaitForCluster(size int) error {
348	params := general.NewGetStatusParams()
349	params.WithContext(context.Background())
350	var status *general.GetStatusOK
351
352	// Poll for 2s
353	for i := 0; i < 20; i++ {
354		var err error
355		status, err = am.clientV2.General.GetStatus(params)
356		if err != nil {
357			return err
358		}
359
360		if len(status.Payload.Cluster.Peers) == size {
361			return nil
362		}
363		time.Sleep(100 * time.Millisecond)
364	}
365
366	return fmt.Errorf(
367		"failed to wait for Alertmanager instance %q to join cluster: expected %v peers, but got %v",
368		am.clusterAddr,
369		size,
370		len(status.Payload.Cluster.Peers),
371	)
372}
373
374// Terminate kills the underlying Alertmanager cluster processes and removes intermediate
375// data.
376func (amc *AlertmanagerCluster) Terminate() {
377	for _, am := range amc.ams {
378		am.Terminate()
379	}
380}
381
382// Terminate kills the underlying Alertmanager process and remove intermediate
383// data.
384func (am *Alertmanager) Terminate() {
385	am.t.Helper()
386	if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil {
387		am.t.Fatalf("Error sending SIGTERM to Alertmanager process: %v", err)
388	}
389}
390
391// Reload sends the reloading signal to the Alertmanager instances.
392func (amc *AlertmanagerCluster) Reload() {
393	for _, am := range amc.ams {
394		am.Reload()
395	}
396}
397
398// Reload sends the reloading signal to the Alertmanager process.
399func (am *Alertmanager) Reload() {
400	am.t.Helper()
401	if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGHUP); err != nil {
402		am.t.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err)
403	}
404}
405
406func (am *Alertmanager) cleanup() {
407	am.t.Helper()
408	if err := os.RemoveAll(am.confFile.Name()); err != nil {
409		am.t.Errorf("Error removing test config file %q: %v", am.confFile.Name(), err)
410	}
411}
412
413// Push declares alerts that are to be pushed to the Alertmanager
414// servers at a relative point in time.
415func (amc *AlertmanagerCluster) Push(at float64, alerts ...*TestAlert) {
416	for _, am := range amc.ams {
417		am.Push(at, alerts...)
418	}
419}
420
421// Push declares alerts that are to be pushed to the Alertmanager
422// server at a relative point in time.
423func (am *Alertmanager) Push(at float64, alerts ...*TestAlert) {
424	var cas models.PostableAlerts
425	for i := range alerts {
426		a := alerts[i].nativeAlert(am.opts)
427		alert := &models.PostableAlert{
428			Alert: models.Alert{
429				Labels:       a.Labels,
430				GeneratorURL: a.GeneratorURL,
431			},
432			Annotations: a.Annotations,
433		}
434		if a.StartsAt != nil {
435			alert.StartsAt = *a.StartsAt
436		}
437		if a.EndsAt != nil {
438			alert.EndsAt = *a.EndsAt
439		}
440		cas = append(cas, alert)
441	}
442
443	am.t.Do(at, func() {
444		params := alert.PostAlertsParams{}
445		params.WithContext(context.Background()).WithAlerts(cas)
446
447		_, err := am.clientV2.Alert.PostAlerts(&params)
448		if err != nil {
449			am.t.Errorf("Error pushing %v: %v", cas, err)
450		}
451	})
452}
453
454// SetSilence updates or creates the given Silence.
455func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) {
456	for _, am := range amc.ams {
457		am.SetSilence(at, sil)
458	}
459}
460
461// SetSilence updates or creates the given Silence.
462func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
463	am.t.Do(at, func() {
464		resp, err := am.clientV2.Silence.PostSilences(
465			silence.NewPostSilencesParams().WithSilence(
466				&models.PostableSilence{
467					Silence: *sil.nativeSilence(am.opts),
468				},
469			),
470		)
471		if err != nil {
472			am.t.Errorf("Error setting silence %v: %s", sil, err)
473			return
474		}
475		sil.SetID(resp.Payload.SilenceID)
476	})
477}
478
479// DelSilence deletes the silence with the sid at the given time.
480func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) {
481	for _, am := range amc.ams {
482		am.DelSilence(at, sil)
483	}
484}
485
486// DelSilence deletes the silence with the sid at the given time.
487func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
488	am.t.Do(at, func() {
489		_, err := am.clientV2.Silence.DeleteSilence(
490			silence.NewDeleteSilenceParams().WithSilenceID(strfmt.UUID(sil.ID())),
491		)
492		if err != nil {
493			am.t.Errorf("Error deleting silence %v: %s", sil, err)
494		}
495	})
496}
497
498// UpdateConfig rewrites the configuration file for the Alertmanager cluster. It
499// does not initiate config reloading.
500func (amc *AlertmanagerCluster) UpdateConfig(conf string) {
501	for _, am := range amc.ams {
502		am.UpdateConfig(conf)
503	}
504}
505
506// UpdateConfig rewrites the configuration file for the Alertmanager. It does not
507// initiate config reloading.
508func (am *Alertmanager) UpdateConfig(conf string) {
509	if _, err := am.confFile.WriteString(conf); err != nil {
510		am.t.Fatal(err)
511		return
512	}
513	if err := am.confFile.Sync(); err != nil {
514		am.t.Fatal(err)
515		return
516	}
517}
518
519// Client returns a client to interact with the API v2 endpoint.
520func (am *Alertmanager) Client() *apiclient.Alertmanager {
521	return am.clientV2
522}
523