// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Package alert contains logic to send alert notifications to Alertmanager clusters.
package alert

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"path"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/go-openapi/strfmt"
	"github.com/pkg/errors"
	"github.com/prometheus/alertmanager/api/v2/models"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/pkg/labels"
	"go.uber.org/atomic"

	"github.com/thanos-io/thanos/pkg/runutil"
	"github.com/thanos-io/thanos/pkg/tracing"
)

const (
	defaultAlertmanagerPort = 9093
	contentTypeJSON         = "application/json"
)

// Alert is a generic representation of an alert in the Prometheus eco-system.
type Alert struct {
	// Label value pairs for purpose of aggregation, matching, and disposition
	// dispatching. This must minimally include an "alertname" label.
	Labels labels.Labels `json:"labels"`

	// Extra key/value information which does not define alert identity.
	Annotations labels.Labels `json:"annotations"`

	// The known time range for this alert. Start and end time are both optional.
	StartsAt     time.Time `json:"startsAt,omitempty"`
	EndsAt       time.Time `json:"endsAt,omitempty"`
	GeneratorURL string    `json:"generatorURL,omitempty"`
}

// Name returns the name of the alert. It is equivalent to the "alertname" label.
func (a *Alert) Name() string {
	return a.Labels.Get(labels.AlertName)
}

// Hash returns a hash over the alert. It is equivalent to the alert labels hash.
func (a *Alert) Hash() uint64 {
	return a.Labels.Hash()
}

func (a *Alert) String() string {
	s := fmt.Sprintf("%s[%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7])
	if a.Resolved() {
		return s + "[resolved]"
	}
	return s + "[active]"
}

// Resolved returns true iff the activity interval ended in the past.
func (a *Alert) Resolved() bool {
	return a.ResolvedAt(time.Now())
}

// ResolvedAt returns true iff the activity interval ended before
// the given timestamp.
func (a *Alert) ResolvedAt(ts time.Time) bool {
	if a.EndsAt.IsZero() {
		return false
	}
	return !a.EndsAt.After(ts)
}
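
// For example, an alert whose EndsAt lies in the past reports itself as
// resolved (the alert name and timestamps below are illustrative only):
//
//	a := &Alert{
//		Labels: labels.FromStrings("alertname", "HighLatency"),
//		EndsAt: time.Now().Add(-time.Minute),
//	}
//	_ = a.Resolved() // true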

// Queue is a queue of alert notifications waiting to be sent. The queue is consumed in batches
// and entries are dropped at the front if it runs full.
type Queue struct {
	logger          log.Logger
	maxBatchSize    int
	capacity        int
	toAddLset       labels.Labels
	toExcludeLabels labels.Labels

	mtx   sync.Mutex
	queue []*Alert
	morec chan struct{}

	pushed  prometheus.Counter
	popped  prometheus.Counter
	dropped prometheus.Counter
}

func relabelLabels(lset labels.Labels, excludeLset []string) (toAdd labels.Labels, toExclude labels.Labels) {
	for _, ln := range excludeLset {
		toExclude = append(toExclude, labels.Label{Name: ln})
	}

	for _, l := range lset {
		// Skip labels that are listed for exclusion.
		if toExclude.Has(l.Name) {
			continue
		}
		toAdd = append(toAdd, labels.Label{
			Name:  l.Name,
			Value: l.Value,
		})
	}
	return toAdd, toExclude
}
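
// For example (label names and values are illustrative only):
//
//	toAdd, toExclude := relabelLabels(
//		labels.FromStrings("region", "eu", "replica", "r1"),
//		[]string{"replica"},
//	)
//	// toAdd:     {region="eu"}  (attached to every queued alert)
//	// toExclude: {replica=""}   (name-only; matching alert labels are dropped)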

// NewQueue returns a new queue. The given label set is attached to all alerts pushed to the queue.
// The given exclude label names are dropped from every alert, including any matching external labels.
func NewQueue(logger log.Logger, reg prometheus.Registerer, capacity, maxBatchSize int, externalLset labels.Labels, excludeLabels []string) *Queue {
	toAdd, toExclude := relabelLabels(externalLset, excludeLabels)

	if logger == nil {
		logger = log.NewNopLogger()
	}
	q := &Queue{
		logger:          logger,
		capacity:        capacity,
		morec:           make(chan struct{}, 1),
		maxBatchSize:    maxBatchSize,
		toAddLset:       toAdd,
		toExcludeLabels: toExclude,

		dropped: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_alert_queue_alerts_dropped_total",
			Help: "Total number of alerts that were dropped from the queue.",
		}),
		pushed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_alert_queue_alerts_pushed_total",
			Help: "Total number of alerts pushed to the queue.",
		}),
		popped: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_alert_queue_alerts_popped_total",
			Help: "Total number of alerts popped from the queue.",
		}),
	}
	_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "thanos_alert_queue_capacity",
		Help: "Capacity of the alert queue.",
	}, func() float64 {
		return float64(q.Cap())
	})
	_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "thanos_alert_queue_length",
		Help: "Length of the alert queue.",
	}, func() float64 {
		return float64(q.Len())
	})
	return q
}
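
// A minimal usage sketch (the registry, logger, sizes, and label values below
// are assumptions for illustration, not defaults of this package):
//
//	q := NewQueue(log.NewNopLogger(), prometheus.NewRegistry(), 10000, 100,
//		labels.FromStrings("replica", "a"), []string{"replica"})
//	q.Push([]*Alert{{Labels: labels.FromStrings("alertname", "Example")}})
//	batch := q.Pop(make(chan struct{})) // Blocks until alerts are available or the channel is closed.
//	_ = batch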

// Len returns the current length of the queue.
func (q *Queue) Len() int {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	return len(q.queue)
}

// Cap returns the fixed capacity of the queue.
func (q *Queue) Cap() int {
	return q.capacity
}

// Pop takes a batch of alerts from the front of the queue. The batch size is limited
// by the queue's maxBatchSize limit.
// It blocks until elements are available or a termination signal is sent on termc.
func (q *Queue) Pop(termc <-chan struct{}) []*Alert {
	select {
	case <-termc:
		return nil
	case <-q.morec:
	}

	q.mtx.Lock()
	defer q.mtx.Unlock()

	as := make([]*Alert, q.maxBatchSize)
	n := copy(as, q.queue)
	q.queue = q.queue[n:]

	q.popped.Add(float64(n))

	if len(q.queue) > 0 {
		select {
		case q.morec <- struct{}{}:
		default:
		}
	}
	return as[:n]
}
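
// A typical consumer drains the queue in a loop; doneCh, ctx, and sender here
// are hypothetical caller-owned values, not part of this package:
//
//	for {
//		batch := q.Pop(doneCh)
//		if batch == nil { // Pop returns nil once doneCh is closed.
//			return
//		}
//		sender.Send(ctx, batch)
//	}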

// Push adds a list of alerts to the queue.
func (q *Queue) Push(alerts []*Alert) {
	if len(alerts) == 0 {
		return
	}

	q.mtx.Lock()
	defer q.mtx.Unlock()

	q.pushed.Add(float64(len(alerts)))

	// Attach external labels and drop excluded labels before sending.
	// TODO(bwplotka): Use proper relabelling with https://github.com/thanos-io/thanos/issues/660.
	for _, a := range alerts {
		lb := labels.NewBuilder(labels.Labels{})
		for _, l := range a.Labels {
			if q.toExcludeLabels.Has(l.Name) {
				continue
			}
			lb.Set(l.Name, l.Value)
		}
		for _, l := range q.toAddLset {
			lb.Set(l.Name, l.Value)
		}
		a.Labels = lb.Labels()
	}

	// Queue capacity should be significantly larger than a single alert
	// batch could be.
	if d := len(alerts) - q.capacity; d > 0 {
		alerts = alerts[d:]

		level.Warn(q.logger).Log(
			"msg", "Alert batch larger than queue capacity, dropping alerts",
			"numDropped", d)
		q.dropped.Add(float64(d))
	}

	// If the queue is full, remove the oldest alerts in favor
	// of newer ones.
	if d := (len(q.queue) + len(alerts)) - q.capacity; d > 0 {
		q.queue = q.queue[d:]

		level.Warn(q.logger).Log(
			"msg", "Alert notification queue full, dropping alerts",
			"numDropped", d)
		q.dropped.Add(float64(d))
	}

	q.queue = append(q.queue, alerts...)

	select {
	case q.morec <- struct{}{}:
	default:
	}
}

// Sender sends notifications to a dynamic set of alertmanagers.
type Sender struct {
	logger        log.Logger
	alertmanagers []*Alertmanager
	versions      []APIVersion

	sent    *prometheus.CounterVec
	errs    *prometheus.CounterVec
	dropped prometheus.Counter
	latency *prometheus.HistogramVec
}

// NewSender returns a new sender. On each call to Send the entire alert batch is sent
// to each of the given Alertmanager clients.
func NewSender(
	logger log.Logger,
	reg prometheus.Registerer,
	alertmanagers []*Alertmanager,
) *Sender {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	var (
		versions       []APIVersion
		versionPresent = map[APIVersion]struct{}{}
	)
	// Collect the distinct API versions used by the given clients.
	for _, am := range alertmanagers {
		if _, found := versionPresent[am.version]; found {
			continue
		}
		versions = append(versions, am.version)
		versionPresent[am.version] = struct{}{}
	}
	s := &Sender{
		logger:        logger,
		alertmanagers: alertmanagers,
		versions:      versions,

		sent: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "thanos_alert_sender_alerts_sent_total",
			Help: "Total number of alerts sent by alertmanager.",
		}, []string{"alertmanager"}),

		errs: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "thanos_alert_sender_errors_total",
			Help: "Total number of errors while sending alerts to alertmanager.",
		}, []string{"alertmanager"}),

		dropped: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_alert_sender_alerts_dropped_total",
			Help: "Total number of alerts dropped when sending to all alertmanagers failed.",
		}),

		latency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name: "thanos_alert_sender_latency_seconds",
			Help: "Latency for sending alert notifications (not including dropped notifications).",
		}, []string{"alertmanager"}),
	}
	return s
}

func toAPILabels(labels labels.Labels) models.LabelSet {
	apiLabels := make(models.LabelSet, len(labels))
	for _, label := range labels {
		apiLabels[label.Name] = label.Value
	}

	return apiLabels
}

// Send an alert batch to all given Alertmanager clients.
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
	if len(alerts) == 0 {
		return
	}

	payload := make(map[APIVersion][]byte)
	for _, version := range s.versions {
		var (
			b   []byte
			err error
		)
		switch version {
		case APIv1:
			if b, err = json.Marshal(alerts); err != nil {
				level.Warn(s.logger).Log("msg", "encoding alerts for v1 API failed", "err", err)
				return
			}
		case APIv2:
			apiAlerts := make(models.PostableAlerts, 0, len(alerts))
			for _, a := range alerts {
				apiAlerts = append(apiAlerts, &models.PostableAlert{
					Annotations: toAPILabels(a.Annotations),
					EndsAt:      strfmt.DateTime(a.EndsAt),
					StartsAt:    strfmt.DateTime(a.StartsAt),
					Alert: models.Alert{
						GeneratorURL: strfmt.URI(a.GeneratorURL),
						Labels:       toAPILabels(a.Labels),
					},
				})
			}
			if b, err = json.Marshal(apiAlerts); err != nil {
				level.Warn(s.logger).Log("msg", "encoding alerts for v2 API failed", "err", err)
				return
			}
		}
		payload[version] = b
	}

	var (
		wg         sync.WaitGroup
		numSuccess atomic.Uint64
	)
	for _, am := range s.alertmanagers {
		for _, u := range am.dispatcher.Endpoints() {
			wg.Add(1)
			go func(am *Alertmanager, u url.URL) {
				defer wg.Done()

				level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts))
				start := time.Now()
				u.Path = path.Join(u.Path, fmt.Sprintf("/api/%s/alerts", string(am.version)))

				tracing.DoInSpan(ctx, "post_alerts HTTP[client]", func(ctx context.Context) {
					if err := am.postAlerts(ctx, u, bytes.NewReader(payload[am.version])); err != nil {
						level.Warn(s.logger).Log(
							"msg", "sending alerts failed",
							"alertmanager", u.Host,
							"alerts", string(payload[am.version]),
							"err", err,
						)
						s.errs.WithLabelValues(u.Host).Inc()
						return
					}
					s.latency.WithLabelValues(u.Host).Observe(time.Since(start).Seconds())
					s.sent.WithLabelValues(u.Host).Add(float64(len(alerts)))

					numSuccess.Inc()
				})
			}(am, *u)
		}
	}
	wg.Wait()

	if numSuccess.Load() > 0 {
		return
	}

	s.dropped.Add(float64(len(alerts)))
	level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "numAlerts", len(alerts))
}

// Dispatcher knows the set of Alertmanager endpoints to send to and executes HTTP requests against them.
type Dispatcher interface {
	// Endpoints returns the list of endpoint URLs the dispatcher knows about.
	Endpoints() []*url.URL
	// Do sends an HTTP request and returns a response.
	Do(*http.Request) (*http.Response, error)
}
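
// A minimal Dispatcher implementation sketch (hypothetical, not part of this
// package), backed by a fixed endpoint list and a plain *http.Client:
//
//	type staticDispatcher struct {
//		client    *http.Client
//		endpoints []*url.URL
//	}
//
//	func (d staticDispatcher) Endpoints() []*url.URL { return d.endpoints }
//
//	func (d staticDispatcher) Do(r *http.Request) (*http.Response, error) { return d.client.Do(r) }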

// Alertmanager is an HTTP client that can send alerts to a cluster of Alertmanager endpoints.
type Alertmanager struct {
	logger     log.Logger
	dispatcher Dispatcher
	timeout    time.Duration
	version    APIVersion
}

// NewAlertmanager returns a new Alertmanager client.
func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Duration, version APIVersion) *Alertmanager {
	if logger == nil {
		logger = log.NewNopLogger()
	}

	return &Alertmanager{
		logger:     logger,
		dispatcher: dispatcher,
		timeout:    timeout,
		version:    version,
	}
}
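
// A wiring sketch tying the pieces together; it assumes the hypothetical
// staticDispatcher above, plus caller-owned logger, ctx, q, and doneCh values
// and an Alertmanager reachable at the example URL:
//
//	u, _ := url.Parse("http://alertmanager.example.com:9093")
//	am := NewAlertmanager(logger, staticDispatcher{client: http.DefaultClient, endpoints: []*url.URL{u}},
//		10*time.Second, APIv2)
//	sender := NewSender(logger, prometheus.NewRegistry(), []*Alertmanager{am})
//	sender.Send(ctx, q.Pop(doneCh))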

func (a *Alertmanager) postAlerts(ctx context.Context, u url.URL, r io.Reader) error {
	req, err := http.NewRequest("POST", u.String(), r)
	if err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(ctx, a.timeout)
	defer cancel()
	req = req.WithContext(ctx)
	req.Header.Set("Content-Type", contentTypeJSON)

	resp, err := a.dispatcher.Do(req)
	if err != nil {
		return errors.Wrapf(err, "send request to %q", u.String())
	}
	defer runutil.ExhaustCloseWithLogOnErr(a.logger, resp.Body, "send one alert")

	if resp.StatusCode/100 != 2 {
		return errors.Errorf("bad response status %v from %q", resp.Status, u.String())
	}
	return nil
}