1// Copyright 2015 Prometheus Team
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package notify
15
16import (
17	"context"
18	"fmt"
19	"sort"
20	"sync"
21	"time"
22
23	"github.com/cenkalti/backoff"
24	"github.com/cespare/xxhash"
25	"github.com/go-kit/kit/log"
26	"github.com/go-kit/kit/log/level"
27	"github.com/prometheus/client_golang/prometheus"
28	"github.com/prometheus/common/model"
29
30	"github.com/prometheus/alertmanager/cluster"
31	"github.com/prometheus/alertmanager/inhibit"
32	"github.com/prometheus/alertmanager/nflog"
33	"github.com/prometheus/alertmanager/nflog/nflogpb"
34	"github.com/prometheus/alertmanager/silence"
35	"github.com/prometheus/alertmanager/types"
36)
37
// ResolvedSender reports whether notifications should be sent for resolved
// alerts.
type ResolvedSender interface {
	// SendResolved returns true if resolved notifications should be sent.
	SendResolved() bool
}
42
// MinTimeout is the minimum timeout (10 seconds) that is set for the context
// of a call to a notification pipeline.
const MinTimeout = 10 * time.Second
46
// Notifier notifies about alerts under constraints of the given context.
type Notifier interface {
	// Notify sends a notification for the given alerts. It returns an error
	// if unsuccessful and a flag telling whether the error is recoverable.
	// RetryStage uses the flag to decide whether to retry or to give up.
	Notify(context.Context, ...*types.Alert) (bool, error)
}
53
54// Integration wraps a notifier and its configuration to be uniquely identified
55// by name and index from its origin in the configuration.
56type Integration struct {
57	notifier Notifier
58	rs       ResolvedSender
59	name     string
60	idx      int
61}
62
63// NewIntegration returns a new integration.
64func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int) Integration {
65	return Integration{
66		notifier: notifier,
67		rs:       rs,
68		name:     name,
69		idx:      idx,
70	}
71}
72
73// Notify implements the Notifier interface.
74func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
75	return i.notifier.Notify(ctx, alerts...)
76}
77
78// SendResolved implements the ResolvedSender interface.
79func (i *Integration) SendResolved() bool {
80	return i.rs.SendResolved()
81}
82
83// Name returns the name of the integration.
84func (i *Integration) Name() string {
85	return i.name
86}
87
88// Index returns the index of the integration.
89func (i *Integration) Index() int {
90	return i.idx
91}
92
// notifyKey defines a custom type with which a context is populated to
// avoid accidental collisions.
type notifyKey int

// Context keys used by the With* setters and their matching accessors in
// this file.
const (
	// keyReceiverName stores the receiver name (string).
	keyReceiverName notifyKey = iota
	// keyRepeatInterval stores the repeat interval (time.Duration).
	keyRepeatInterval
	// keyGroupLabels stores the grouping labels (model.LabelSet).
	keyGroupLabels
	// keyGroupKey stores the group key (string).
	keyGroupKey
	// keyFiringAlerts stores the firing alerts ([]uint64).
	keyFiringAlerts
	// keyResolvedAlerts stores the resolved alerts ([]uint64).
	keyResolvedAlerts
	// keyNow stores the now timestamp (time.Time).
	keyNow
)
106
// WithReceiverName returns a context derived from ctx that carries the
// receiver name rcv. It is read back with ReceiverName.
func WithReceiverName(ctx context.Context, rcv string) context.Context {
	return context.WithValue(ctx, keyReceiverName, rcv)
}
111
// WithGroupKey returns a context derived from ctx that carries the group key
// s. It is read back with GroupKey.
func WithGroupKey(ctx context.Context, s string) context.Context {
	return context.WithValue(ctx, keyGroupKey, s)
}
116
// WithFiringAlerts returns a context derived from ctx that carries a slice of
// firing alerts. It is read back with FiringAlerts. In this file DedupStage
// stores the hashes of the firing alerts it was given.
func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context {
	return context.WithValue(ctx, keyFiringAlerts, alerts)
}
121
// WithResolvedAlerts returns a context derived from ctx that carries a slice
// of resolved alerts. It is read back with ResolvedAlerts. In this file
// DedupStage stores the hashes of the resolved alerts it was given.
func WithResolvedAlerts(ctx context.Context, alerts []uint64) context.Context {
	return context.WithValue(ctx, keyResolvedAlerts, alerts)
}
126
// WithGroupLabels returns a context derived from ctx that carries the
// grouping labels lset. It is read back with GroupLabels.
func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context {
	return context.WithValue(ctx, keyGroupLabels, lset)
}
131
// WithNow returns a context derived from ctx that carries the now timestamp
// t. It is read back with Now.
func WithNow(ctx context.Context, t time.Time) context.Context {
	return context.WithValue(ctx, keyNow, t)
}
136
// WithRepeatInterval returns a context derived from ctx that carries the
// repeat interval t. It is read back with RepeatInterval.
func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
	return context.WithValue(ctx, keyRepeatInterval, t)
}
141
142// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
143// second argument is false.
144func RepeatInterval(ctx context.Context) (time.Duration, bool) {
145	v, ok := ctx.Value(keyRepeatInterval).(time.Duration)
146	return v, ok
147}
148
149// ReceiverName extracts a receiver name from the context. Iff none exists, the
150// second argument is false.
151func ReceiverName(ctx context.Context) (string, bool) {
152	v, ok := ctx.Value(keyReceiverName).(string)
153	return v, ok
154}
155
156// GroupKey extracts a group key from the context. Iff none exists, the
157// second argument is false.
158func GroupKey(ctx context.Context) (string, bool) {
159	v, ok := ctx.Value(keyGroupKey).(string)
160	return v, ok
161}
162
163// GroupLabels extracts grouping label set from the context. Iff none exists, the
164// second argument is false.
165func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
166	v, ok := ctx.Value(keyGroupLabels).(model.LabelSet)
167	return v, ok
168}
169
170// Now extracts a now timestamp from the context. Iff none exists, the
171// second argument is false.
172func Now(ctx context.Context) (time.Time, bool) {
173	v, ok := ctx.Value(keyNow).(time.Time)
174	return v, ok
175}
176
177// FiringAlerts extracts a slice of firing alerts from the context.
178// Iff none exists, the second argument is false.
179func FiringAlerts(ctx context.Context) ([]uint64, bool) {
180	v, ok := ctx.Value(keyFiringAlerts).([]uint64)
181	return v, ok
182}
183
184// ResolvedAlerts extracts a slice of firing alerts from the context.
185// Iff none exists, the second argument is false.
186func ResolvedAlerts(ctx context.Context) ([]uint64, bool) {
187	v, ok := ctx.Value(keyResolvedAlerts).([]uint64)
188	return v, ok
189}
190
// A Stage processes alerts under the constraints of the given context.
type Stage interface {
	// Exec runs the stage on alerts, logging through l. It returns the
	// context and the alerts to hand to a following stage (MultiStage chains
	// stages this way), or an error.
	Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
195
// StageFunc wraps a function to represent a Stage, so that a plain function
// with the Exec signature can be used wherever a Stage is expected.
type StageFunc func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)

// Exec implements the Stage interface by calling f with the given arguments
// and returning its results unchanged.
func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	return f(ctx, l, alerts...)
}
203
// NotificationLog is the notification log as used by the stages in this
// file: SetNotifiesStage writes to it with Log and DedupStage reads from it
// with Query.
type NotificationLog interface {
	// Log is called with the receiver, the group key and the firing and
	// resolved alerts taken from the context (hashes set by DedupStage).
	Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
	// Query returns the entries selected by params. DedupStage treats an
	// nflog.ErrNotFound error as "no entry".
	Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}
208
// metrics bundles the collectors created by newMetrics. Every vector is
// labeled by integration name:
//   - numNotifications counts attempted notifications,
//   - numFailedNotifications counts failed notifications,
//   - notificationLatencySeconds observes the notification latency in seconds.
type metrics struct {
	numNotifications           *prometheus.CounterVec
	numFailedNotifications     *prometheus.CounterVec
	notificationLatencySeconds *prometheus.HistogramVec
}
214
215func newMetrics(r prometheus.Registerer) *metrics {
216	m := &metrics{
217		numNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
218			Namespace: "alertmanager",
219			Name:      "notifications_total",
220			Help:      "The total number of attempted notifications.",
221		}, []string{"integration"}),
222		numFailedNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
223			Namespace: "alertmanager",
224			Name:      "notifications_failed_total",
225			Help:      "The total number of failed notifications.",
226		}, []string{"integration"}),
227		notificationLatencySeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
228			Namespace: "alertmanager",
229			Name:      "notification_latency_seconds",
230			Help:      "The latency of notifications in seconds.",
231			Buckets:   []float64{1, 5, 10, 15, 20},
232		}, []string{"integration"}),
233	}
234	for _, integration := range []string{
235		"email",
236		"hipchat",
237		"pagerduty",
238		"wechat",
239		"pushover",
240		"slack",
241		"opsgenie",
242		"webhook",
243		"victorops",
244	} {
245		m.numNotifications.WithLabelValues(integration)
246		m.numFailedNotifications.WithLabelValues(integration)
247		m.notificationLatencySeconds.WithLabelValues(integration)
248	}
249	r.MustRegister(m.numNotifications, m.numFailedNotifications, m.notificationLatencySeconds)
250	return m
251}
252
// PipelineBuilder builds notification pipelines. It holds the metrics that
// are shared by the pipelines it creates.
type PipelineBuilder struct {
	metrics *metrics
}

// NewPipelineBuilder returns a PipelineBuilder whose metrics are created and
// registered with r (see newMetrics).
func NewPipelineBuilder(r prometheus.Registerer) *PipelineBuilder {
	return &PipelineBuilder{
		metrics: newMetrics(r),
	}
}
262
263// New returns a map of receivers to Stages.
264func (pb *PipelineBuilder) New(
265	receivers map[string][]Integration,
266	wait func() time.Duration,
267	inhibitor *inhibit.Inhibitor,
268	silencer *silence.Silencer,
269	notificationLog NotificationLog,
270	peer *cluster.Peer,
271) RoutingStage {
272	rs := make(RoutingStage, len(receivers))
273
274	ms := NewGossipSettleStage(peer)
275	is := NewMuteStage(inhibitor)
276	ss := NewMuteStage(silencer)
277
278	for name := range receivers {
279		st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
280		rs[name] = MultiStage{ms, is, ss, st}
281	}
282	return rs
283}
284
285// createReceiverStage creates a pipeline of stages for a receiver.
286func createReceiverStage(
287	name string,
288	integrations []Integration,
289	wait func() time.Duration,
290	notificationLog NotificationLog,
291	metrics *metrics,
292) Stage {
293	var fs FanoutStage
294	for i := range integrations {
295		recv := &nflogpb.Receiver{
296			GroupName:   name,
297			Integration: integrations[i].Name(),
298			Idx:         uint32(integrations[i].Index()),
299		}
300		var s MultiStage
301		s = append(s, NewWaitStage(wait))
302		s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
303		s = append(s, NewRetryStage(integrations[i], name, metrics))
304		s = append(s, NewSetNotifiesStage(notificationLog, recv))
305
306		fs = append(fs, s)
307	}
308	return fs
309}
310
311// RoutingStage executes the inner stages based on the receiver specified in
312// the context.
313type RoutingStage map[string]Stage
314
315// Exec implements the Stage interface.
316func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
317	receiver, ok := ReceiverName(ctx)
318	if !ok {
319		return ctx, nil, fmt.Errorf("receiver missing")
320	}
321
322	s, ok := rs[receiver]
323	if !ok {
324		return ctx, nil, fmt.Errorf("stage for receiver missing")
325	}
326
327	return s.Exec(ctx, l, alerts...)
328}
329
330// A MultiStage executes a series of stages sequentially.
331type MultiStage []Stage
332
333// Exec implements the Stage interface.
334func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
335	var err error
336	for _, s := range ms {
337		if len(alerts) == 0 {
338			return ctx, nil, nil
339		}
340
341		ctx, alerts, err = s.Exec(ctx, l, alerts...)
342		if err != nil {
343			return ctx, nil, err
344		}
345	}
346	return ctx, alerts, nil
347}
348
349// FanoutStage executes its stages concurrently
350type FanoutStage []Stage
351
352// Exec attempts to execute all stages concurrently and discards the results.
353// It returns its input alerts and a types.MultiError if one or more stages fail.
354func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
355	var (
356		wg sync.WaitGroup
357		me types.MultiError
358	)
359	wg.Add(len(fs))
360
361	for _, s := range fs {
362		go func(s Stage) {
363			if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
364				me.Add(err)
365				lvl := level.Error(l)
366				if ctx.Err() == context.Canceled {
367					// It is expected for the context to be canceled on
368					// configuration reload or shutdown. In this case, the
369					// message should only be logged at the debug level.
370					lvl = level.Debug(l)
371				}
372				lvl.Log("msg", "Error on notify", "err", err, "context_err", ctx.Err())
373			}
374			wg.Done()
375		}(s)
376	}
377	wg.Wait()
378
379	if me.Len() > 0 {
380		return ctx, alerts, &me
381	}
382	return ctx, alerts, nil
383}
384
385// GossipSettleStage waits until the Gossip has settled to forward alerts.
386type GossipSettleStage struct {
387	peer *cluster.Peer
388}
389
390// NewGossipSettleStage returns a new GossipSettleStage.
391func NewGossipSettleStage(p *cluster.Peer) *GossipSettleStage {
392	return &GossipSettleStage{peer: p}
393}
394
395func (n *GossipSettleStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
396	if n.peer != nil {
397		n.peer.WaitReady()
398	}
399	return ctx, alerts, nil
400}
401
402// MuteStage filters alerts through a Muter.
403type MuteStage struct {
404	muter types.Muter
405}
406
407// NewMuteStage return a new MuteStage.
408func NewMuteStage(m types.Muter) *MuteStage {
409	return &MuteStage{muter: m}
410}
411
412// Exec implements the Stage interface.
413func (n *MuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
414	var filtered []*types.Alert
415	for _, a := range alerts {
416		// TODO(fabxc): increment total alerts counter.
417		// Do not send the alert if muted.
418		if !n.muter.Mutes(a.Labels) {
419			filtered = append(filtered, a)
420		}
421		// TODO(fabxc): increment muted alerts counter if muted.
422	}
423	return ctx, filtered, nil
424}
425
426// WaitStage waits for a certain amount of time before continuing or until the
427// context is done.
428type WaitStage struct {
429	wait func() time.Duration
430}
431
432// NewWaitStage returns a new WaitStage.
433func NewWaitStage(wait func() time.Duration) *WaitStage {
434	return &WaitStage{
435		wait: wait,
436	}
437}
438
439// Exec implements the Stage interface.
440func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
441	select {
442	case <-time.After(ws.wait()):
443	case <-ctx.Done():
444		return ctx, nil, ctx.Err()
445	}
446	return ctx, alerts, nil
447}
448
// DedupStage filters alerts.
// Filtering happens based on a notification log.
//
// rs tells whether resolved alerts are notified about, nflog is the log
// queried for the last entry and recv is the receiver used in that query.
// now and hash supply the current time and the alert hash; NewDedupStage
// sets them to utcNow and hashAlert.
type DedupStage struct {
	rs    ResolvedSender
	nflog NotificationLog
	recv  *nflogpb.Receiver

	now  func() time.Time
	hash func(*types.Alert) uint64
}

// NewDedupStage wraps a DedupStage that runs against the given notification
// log for receiver recv, using utcNow as clock and hashAlert as alert hash.
func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
	return &DedupStage{
		rs:    rs,
		nflog: l,
		recv:  recv,
		now:   utcNow,
		hash:  hashAlert,
	}
}
470
// utcNow returns the current time in UTC.
func utcNow() time.Time {
	now := time.Now()
	return now.UTC()
}
474
// hashBuffers pools the scratch byte slices used for hashing alerts. An
// empty pool hands out a fresh zero-length slice with capacity 1024.
var hashBuffers = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, 1024)
	},
}

// getHashBuffer takes a buffer from the pool.
func getHashBuffer() []byte {
	return hashBuffers.Get().([]byte)
}

// putHashBuffer truncates b to length zero, keeping its capacity, and hands
// it back to the pool.
func putHashBuffer(b []byte) {
	//lint:ignore SA6002 relax staticcheck verification.
	hashBuffers.Put(b[:0])
}
490
491func hashAlert(a *types.Alert) uint64 {
492	const sep = '\xff'
493
494	b := getHashBuffer()
495	defer putHashBuffer(b)
496
497	names := make(model.LabelNames, 0, len(a.Labels))
498
499	for ln := range a.Labels {
500		names = append(names, ln)
501	}
502	sort.Sort(names)
503
504	for _, ln := range names {
505		b = append(b, string(ln)...)
506		b = append(b, sep)
507		b = append(b, string(a.Labels[ln])...)
508		b = append(b, sep)
509	}
510
511	hash := xxhash.Sum64(b)
512
513	return hash
514}
515
516func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
517	// If we haven't notified about the alert group before, notify right away
518	// unless we only have resolved alerts.
519	if entry == nil {
520		return len(firing) > 0
521	}
522
523	if !entry.IsFiringSubset(firing) {
524		return true
525	}
526
527	// Notify about all alerts being resolved.
528	// This is done irrespective of the send_resolved flag to make sure that
529	// the firing alerts are cleared from the notification log.
530	if len(firing) == 0 {
531		// If the current alert group and last notification contain no firing
532		// alert, it means that some alerts have been fired and resolved during the
533		// last interval. In this case, there is no need to notify the receiver
534		// since it doesn't know about them.
535		return len(entry.FiringAlerts) > 0
536	}
537
538	if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
539		return true
540	}
541
542	// Nothing changed, only notify if the repeat interval has passed.
543	return entry.Timestamp.Before(n.now().Add(-repeat))
544}
545
546// Exec implements the Stage interface.
547func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
548	gkey, ok := GroupKey(ctx)
549	if !ok {
550		return ctx, nil, fmt.Errorf("group key missing")
551	}
552
553	repeatInterval, ok := RepeatInterval(ctx)
554	if !ok {
555		return ctx, nil, fmt.Errorf("repeat interval missing")
556	}
557
558	firingSet := map[uint64]struct{}{}
559	resolvedSet := map[uint64]struct{}{}
560	firing := []uint64{}
561	resolved := []uint64{}
562
563	var hash uint64
564	for _, a := range alerts {
565		hash = n.hash(a)
566		if a.Resolved() {
567			resolved = append(resolved, hash)
568			resolvedSet[hash] = struct{}{}
569		} else {
570			firing = append(firing, hash)
571			firingSet[hash] = struct{}{}
572		}
573	}
574
575	ctx = WithFiringAlerts(ctx, firing)
576	ctx = WithResolvedAlerts(ctx, resolved)
577
578	entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
579	if err != nil && err != nflog.ErrNotFound {
580		return ctx, nil, err
581	}
582
583	var entry *nflogpb.Entry
584	switch len(entries) {
585	case 0:
586	case 1:
587		entry = entries[0]
588	default:
589		return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
590	}
591
592	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
593		return ctx, alerts, nil
594	}
595	return ctx, nil, nil
596}
597
// RetryStage notifies via passed integration with exponential backoff until it
// succeeds. It aborts if the context is canceled or timed out.
//
// integration is the integration notified and used as the metrics label,
// groupName is the receiver name reported in log messages and metrics holds
// the collectors updated on every attempt.
type RetryStage struct {
	integration Integration
	groupName   string
	metrics     *metrics
}

// NewRetryStage returns a new instance of a RetryStage for integration i of
// the receiver called groupName.
func NewRetryStage(i Integration, groupName string, metrics *metrics) *RetryStage {
	return &RetryStage{
		integration: i,
		groupName:   groupName,
		metrics:     metrics,
	}
}
614
615// Exec implements the Stage interface.
616func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
617	var sent []*types.Alert
618
619	// If we shouldn't send notifications for resolved alerts, but there are only
620	// resolved alerts, report them all as successfully notified (we still want the
621	// notification log to log them for the next run of DedupStage).
622	if !r.integration.SendResolved() {
623		firing, ok := FiringAlerts(ctx)
624		if !ok {
625			return ctx, nil, fmt.Errorf("firing alerts missing")
626		}
627		if len(firing) == 0 {
628			return ctx, alerts, nil
629		}
630		for _, a := range alerts {
631			if a.Status() != model.AlertResolved {
632				sent = append(sent, a)
633			}
634		}
635	} else {
636		sent = alerts
637	}
638
639	var (
640		i    = 0
641		b    = backoff.NewExponentialBackOff()
642		tick = backoff.NewTicker(b)
643		iErr error
644	)
645	defer tick.Stop()
646
647	for {
648		i++
649		// Always check the context first to not notify again.
650		select {
651		case <-ctx.Done():
652			if iErr != nil {
653				return ctx, nil, iErr
654			}
655
656			return ctx, nil, ctx.Err()
657		default:
658		}
659
660		select {
661		case <-tick.C:
662			now := time.Now()
663			retry, err := r.integration.Notify(ctx, sent...)
664			r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds())
665			r.metrics.numNotifications.WithLabelValues(r.integration.Name()).Inc()
666			if err != nil {
667				r.metrics.numFailedNotifications.WithLabelValues(r.integration.Name()).Inc()
668				level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.Name(), "receiver", r.groupName, "err", err)
669				if !retry {
670					return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.Name(), err)
671				}
672
673				// Save this error to be able to return the last seen error by an
674				// integration upon context timeout.
675				iErr = err
676			} else {
677				return ctx, alerts, nil
678			}
679		case <-ctx.Done():
680			if iErr != nil {
681				return ctx, nil, iErr
682			}
683
684			return ctx, nil, ctx.Err()
685		}
686	}
687}
688
689// SetNotifiesStage sets the notification information about passed alerts. The
690// passed alerts should have already been sent to the receivers.
691type SetNotifiesStage struct {
692	nflog NotificationLog
693	recv  *nflogpb.Receiver
694}
695
696// NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
697func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage {
698	return &SetNotifiesStage{
699		nflog: l,
700		recv:  recv,
701	}
702}
703
704// Exec implements the Stage interface.
705func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
706	gkey, ok := GroupKey(ctx)
707	if !ok {
708		return ctx, nil, fmt.Errorf("group key missing")
709	}
710
711	firing, ok := FiringAlerts(ctx)
712	if !ok {
713		return ctx, nil, fmt.Errorf("firing alerts missing")
714	}
715
716	resolved, ok := ResolvedAlerts(ctx)
717	if !ok {
718		return ctx, nil, fmt.Errorf("resolved alerts missing")
719	}
720
721	return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
722}
723