1// Copyright 2015 Prometheus Team
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package notify
15
16import (
17	"context"
18	"fmt"
19	"sort"
20	"sync"
21	"time"
22
23	"github.com/cenkalti/backoff/v4"
24	"github.com/cespare/xxhash/v2"
25	"github.com/go-kit/log"
26	"github.com/go-kit/log/level"
27	"github.com/pkg/errors"
28	"github.com/prometheus/client_golang/prometheus"
29	"github.com/prometheus/common/model"
30
31	"github.com/prometheus/alertmanager/inhibit"
32	"github.com/prometheus/alertmanager/nflog"
33	"github.com/prometheus/alertmanager/nflog/nflogpb"
34	"github.com/prometheus/alertmanager/silence"
35	"github.com/prometheus/alertmanager/timeinterval"
36	"github.com/prometheus/alertmanager/types"
37)
38
// ResolvedSender is implemented by anything that can report whether
// notifications for resolved alerts should be sent.
type ResolvedSender interface {
	// SendResolved returns true if resolved notifications should be sent.
	SendResolved() bool
}
43
// Peer represents the cluster node from which we are sending the notification.
type Peer interface {
	// WaitReady waits until the node's silences and notifications have settled
	// before attempting to send a notification.
	WaitReady(context.Context) error
}
49
// MinTimeout (10 seconds) is the minimum timeout that is set for the context
// of a call to a notification pipeline.
// NOTE(review): the constant is not referenced in the visible part of this
// file; presumably it is applied by the callers of the pipeline — confirm.
const MinTimeout = 10 * time.Second
53
// Notifier notifies about alerts under constraints of the given context.
type Notifier interface {
	// Notify sends a notification for the given alerts. It returns an error
	// if unsuccessful, and the boolean flag tells whether that error is
	// recoverable. The flag drives the retry logic: RetryStage gives up as
	// soon as an error is returned together with a false flag.
	Notify(context.Context, ...*types.Alert) (bool, error)
}
60
61// Integration wraps a notifier and its configuration to be uniquely identified
62// by name and index from its origin in the configuration.
63type Integration struct {
64	notifier Notifier
65	rs       ResolvedSender
66	name     string
67	idx      int
68}
69
70// NewIntegration returns a new integration.
71func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int) Integration {
72	return Integration{
73		notifier: notifier,
74		rs:       rs,
75		name:     name,
76		idx:      idx,
77	}
78}
79
80// Notify implements the Notifier interface.
81func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
82	return i.notifier.Notify(ctx, alerts...)
83}
84
85// SendResolved implements the ResolvedSender interface.
86func (i *Integration) SendResolved() bool {
87	return i.rs.SendResolved()
88}
89
90// Name returns the name of the integration.
91func (i *Integration) Name() string {
92	return i.name
93}
94
95// Index returns the index of the integration.
96func (i *Integration) Index() int {
97	return i.idx
98}
99
100// String implements the Stringer interface.
101func (i *Integration) String() string {
102	return fmt.Sprintf("%s[%d]", i.name, i.idx)
103}
104
// notifyKey defines a custom type with which a context is populated to
// avoid accidental collisions.
type notifyKey int

// Context keys used by the With* setters and the matching extractor
// functions in this file.
const (
	// keyReceiverName holds the receiver name (string).
	keyReceiverName notifyKey = iota
	// keyRepeatInterval holds the repeat interval (time.Duration).
	keyRepeatInterval
	// keyGroupLabels holds the grouping labels (model.LabelSet).
	keyGroupLabels
	// keyGroupKey holds the group key (string).
	keyGroupKey
	// keyFiringAlerts holds the firing alerts ([]uint64).
	keyFiringAlerts
	// keyResolvedAlerts holds the resolved alerts ([]uint64).
	keyResolvedAlerts
	// keyNow holds the "now" timestamp (time.Time).
	keyNow
	// keyMuteTimeIntervals holds the mute time names ([]string).
	keyMuteTimeIntervals
)
119
120// WithReceiverName populates a context with a receiver name.
121func WithReceiverName(ctx context.Context, rcv string) context.Context {
122	return context.WithValue(ctx, keyReceiverName, rcv)
123}
124
125// WithGroupKey populates a context with a group key.
126func WithGroupKey(ctx context.Context, s string) context.Context {
127	return context.WithValue(ctx, keyGroupKey, s)
128}
129
130// WithFiringAlerts populates a context with a slice of firing alerts.
131func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context {
132	return context.WithValue(ctx, keyFiringAlerts, alerts)
133}
134
135// WithResolvedAlerts populates a context with a slice of resolved alerts.
136func WithResolvedAlerts(ctx context.Context, alerts []uint64) context.Context {
137	return context.WithValue(ctx, keyResolvedAlerts, alerts)
138}
139
140// WithGroupLabels populates a context with grouping labels.
141func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context {
142	return context.WithValue(ctx, keyGroupLabels, lset)
143}
144
145// WithNow populates a context with a now timestamp.
146func WithNow(ctx context.Context, t time.Time) context.Context {
147	return context.WithValue(ctx, keyNow, t)
148}
149
150// WithRepeatInterval populates a context with a repeat interval.
151func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
152	return context.WithValue(ctx, keyRepeatInterval, t)
153}
154
155// WithMuteTimeIntervals populates a context with a slice of mute time names.
156func WithMuteTimeIntervals(ctx context.Context, mt []string) context.Context {
157	return context.WithValue(ctx, keyMuteTimeIntervals, mt)
158}
159
160// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
161// second argument is false.
162func RepeatInterval(ctx context.Context) (time.Duration, bool) {
163	v, ok := ctx.Value(keyRepeatInterval).(time.Duration)
164	return v, ok
165}
166
167// ReceiverName extracts a receiver name from the context. Iff none exists, the
168// second argument is false.
169func ReceiverName(ctx context.Context) (string, bool) {
170	v, ok := ctx.Value(keyReceiverName).(string)
171	return v, ok
172}
173
174// GroupKey extracts a group key from the context. Iff none exists, the
175// second argument is false.
176func GroupKey(ctx context.Context) (string, bool) {
177	v, ok := ctx.Value(keyGroupKey).(string)
178	return v, ok
179}
180
181// GroupLabels extracts grouping label set from the context. Iff none exists, the
182// second argument is false.
183func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
184	v, ok := ctx.Value(keyGroupLabels).(model.LabelSet)
185	return v, ok
186}
187
188// Now extracts a now timestamp from the context. Iff none exists, the
189// second argument is false.
190func Now(ctx context.Context) (time.Time, bool) {
191	v, ok := ctx.Value(keyNow).(time.Time)
192	return v, ok
193}
194
195// FiringAlerts extracts a slice of firing alerts from the context.
196// Iff none exists, the second argument is false.
197func FiringAlerts(ctx context.Context) ([]uint64, bool) {
198	v, ok := ctx.Value(keyFiringAlerts).([]uint64)
199	return v, ok
200}
201
202// ResolvedAlerts extracts a slice of firing alerts from the context.
203// Iff none exists, the second argument is false.
204func ResolvedAlerts(ctx context.Context) ([]uint64, bool) {
205	v, ok := ctx.Value(keyResolvedAlerts).([]uint64)
206	return v, ok
207}
208
209// MuteTimeIntervalNames extracts a slice of mute time names from the context. Iff none exists, the
210// second argument is false.
211func MuteTimeIntervalNames(ctx context.Context) ([]string, bool) {
212	v, ok := ctx.Value(keyMuteTimeIntervals).([]string)
213	return v, ok
214}
215
// A Stage processes alerts under the constraints of the given context.
type Stage interface {
	// Exec runs the stage on the given alerts. It returns the (possibly
	// updated) context, the alerts to hand to the next stage, and an error.
	// MultiStage stops executing further stages when a stage returns an
	// error or no alerts.
	Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
220
221// StageFunc wraps a function to represent a Stage.
222type StageFunc func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
223
224// Exec implements Stage interface.
225func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
226	return f(ctx, l, alerts...)
227}
228
// NotificationLog is the notification log used by the stages in this file:
// DedupStage queries it and SetNotifiesStage writes to it.
type NotificationLog interface {
	// Log is called by SetNotifiesStage with the receiver, the group key and
	// the firing and resolved alert hashes that DedupStage stored in the
	// context.
	Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
	// Query is called by DedupStage with a group key and a receiver
	// parameter; DedupStage tolerates nflog.ErrNotFound and treats more
	// than one returned entry as an error.
	Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}
233
// Metrics groups the Prometheus collectors of the notification pipeline.
// All of them are labelled by "integration" (see NewMetrics).
type Metrics struct {
	// numNotifications counts attempted notifications
	// (alertmanager_notifications_total).
	numNotifications *prometheus.CounterVec
	// numTotalFailedNotifications counts failed notifications
	// (alertmanager_notifications_failed_total).
	numTotalFailedNotifications *prometheus.CounterVec
	// numNotificationRequestsTotal counts attempted notification requests
	// (alertmanager_notification_requests_total).
	numNotificationRequestsTotal *prometheus.CounterVec
	// numNotificationRequestsFailedTotal counts failed notification requests
	// (alertmanager_notification_requests_failed_total).
	numNotificationRequestsFailedTotal *prometheus.CounterVec
	// notificationLatencySeconds observes the latency of notifications in
	// seconds (alertmanager_notification_latency_seconds).
	notificationLatencySeconds *prometheus.HistogramVec
}
241
242func NewMetrics(r prometheus.Registerer) *Metrics {
243	m := &Metrics{
244		numNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
245			Namespace: "alertmanager",
246			Name:      "notifications_total",
247			Help:      "The total number of attempted notifications.",
248		}, []string{"integration"}),
249		numTotalFailedNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
250			Namespace: "alertmanager",
251			Name:      "notifications_failed_total",
252			Help:      "The total number of failed notifications.",
253		}, []string{"integration"}),
254		numNotificationRequestsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
255			Namespace: "alertmanager",
256			Name:      "notification_requests_total",
257			Help:      "The total number of attempted notification requests.",
258		}, []string{"integration"}),
259		numNotificationRequestsFailedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
260			Namespace: "alertmanager",
261			Name:      "notification_requests_failed_total",
262			Help:      "The total number of failed notification requests.",
263		}, []string{"integration"}),
264		notificationLatencySeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
265			Namespace: "alertmanager",
266			Name:      "notification_latency_seconds",
267			Help:      "The latency of notifications in seconds.",
268			Buckets:   []float64{1, 5, 10, 15, 20},
269		}, []string{"integration"}),
270	}
271	for _, integration := range []string{
272		"email",
273		"pagerduty",
274		"wechat",
275		"pushover",
276		"slack",
277		"opsgenie",
278		"webhook",
279		"victorops",
280		"sns",
281	} {
282		m.numNotifications.WithLabelValues(integration)
283		m.numTotalFailedNotifications.WithLabelValues(integration)
284		m.numNotificationRequestsTotal.WithLabelValues(integration)
285		m.numNotificationRequestsFailedTotal.WithLabelValues(integration)
286		m.notificationLatencySeconds.WithLabelValues(integration)
287	}
288	r.MustRegister(
289		m.numNotifications, m.numTotalFailedNotifications,
290		m.numNotificationRequestsTotal, m.numNotificationRequestsFailedTotal,
291		m.notificationLatencySeconds,
292	)
293	return m
294}
295
296type PipelineBuilder struct {
297	metrics *Metrics
298}
299
300func NewPipelineBuilder(r prometheus.Registerer) *PipelineBuilder {
301	return &PipelineBuilder{
302		metrics: NewMetrics(r),
303	}
304}
305
306// New returns a map of receivers to Stages.
307func (pb *PipelineBuilder) New(
308	receivers map[string][]Integration,
309	wait func() time.Duration,
310	inhibitor *inhibit.Inhibitor,
311	silencer *silence.Silencer,
312	muteTimes map[string][]timeinterval.TimeInterval,
313	notificationLog NotificationLog,
314	peer Peer,
315) RoutingStage {
316	rs := make(RoutingStage, len(receivers))
317
318	ms := NewGossipSettleStage(peer)
319	is := NewMuteStage(inhibitor)
320	ss := NewMuteStage(silencer)
321	tms := NewTimeMuteStage(muteTimes)
322
323	for name := range receivers {
324		st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
325		rs[name] = MultiStage{ms, is, tms, ss, st}
326	}
327	return rs
328}
329
330// createReceiverStage creates a pipeline of stages for a receiver.
331func createReceiverStage(
332	name string,
333	integrations []Integration,
334	wait func() time.Duration,
335	notificationLog NotificationLog,
336	metrics *Metrics,
337) Stage {
338	var fs FanoutStage
339	for i := range integrations {
340		recv := &nflogpb.Receiver{
341			GroupName:   name,
342			Integration: integrations[i].Name(),
343			Idx:         uint32(integrations[i].Index()),
344		}
345		var s MultiStage
346		s = append(s, NewWaitStage(wait))
347		s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
348		s = append(s, NewRetryStage(integrations[i], name, metrics))
349		s = append(s, NewSetNotifiesStage(notificationLog, recv))
350
351		fs = append(fs, s)
352	}
353	return fs
354}
355
356// RoutingStage executes the inner stages based on the receiver specified in
357// the context.
358type RoutingStage map[string]Stage
359
360// Exec implements the Stage interface.
361func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
362	receiver, ok := ReceiverName(ctx)
363	if !ok {
364		return ctx, nil, errors.New("receiver missing")
365	}
366
367	s, ok := rs[receiver]
368	if !ok {
369		return ctx, nil, errors.New("stage for receiver missing")
370	}
371
372	return s.Exec(ctx, l, alerts...)
373}
374
375// A MultiStage executes a series of stages sequentially.
376type MultiStage []Stage
377
378// Exec implements the Stage interface.
379func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
380	var err error
381	for _, s := range ms {
382		if len(alerts) == 0 {
383			return ctx, nil, nil
384		}
385
386		ctx, alerts, err = s.Exec(ctx, l, alerts...)
387		if err != nil {
388			return ctx, nil, err
389		}
390	}
391	return ctx, alerts, nil
392}
393
394// FanoutStage executes its stages concurrently
395type FanoutStage []Stage
396
397// Exec attempts to execute all stages concurrently and discards the results.
398// It returns its input alerts and a types.MultiError if one or more stages fail.
399func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
400	var (
401		wg sync.WaitGroup
402		me types.MultiError
403	)
404	wg.Add(len(fs))
405
406	for _, s := range fs {
407		go func(s Stage) {
408			if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
409				me.Add(err)
410			}
411			wg.Done()
412		}(s)
413	}
414	wg.Wait()
415
416	if me.Len() > 0 {
417		return ctx, alerts, &me
418	}
419	return ctx, alerts, nil
420}
421
422// GossipSettleStage waits until the Gossip has settled to forward alerts.
423type GossipSettleStage struct {
424	peer Peer
425}
426
427// NewGossipSettleStage returns a new GossipSettleStage.
428func NewGossipSettleStage(p Peer) *GossipSettleStage {
429	return &GossipSettleStage{peer: p}
430}
431
432func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
433	if n.peer != nil {
434		if err := n.peer.WaitReady(ctx); err != nil {
435			return ctx, nil, err
436		}
437	}
438	return ctx, alerts, nil
439}
440
441// MuteStage filters alerts through a Muter.
442type MuteStage struct {
443	muter types.Muter
444}
445
446// NewMuteStage return a new MuteStage.
447func NewMuteStage(m types.Muter) *MuteStage {
448	return &MuteStage{muter: m}
449}
450
451// Exec implements the Stage interface.
452func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
453	var filtered []*types.Alert
454	for _, a := range alerts {
455		// TODO(fabxc): increment total alerts counter.
456		// Do not send the alert if muted.
457		if !n.muter.Mutes(a.Labels) {
458			filtered = append(filtered, a)
459		}
460		// TODO(fabxc): increment muted alerts counter if muted.
461	}
462	return ctx, filtered, nil
463}
464
// WaitStage delays the pipeline for the duration returned by its wait
// function, or until the context is done.
type WaitStage struct {
	wait func() time.Duration
}

// NewWaitStage returns a WaitStage that uses the given function to decide how
// long to wait.
func NewWaitStage(wait func() time.Duration) *WaitStage {
	stage := new(WaitStage)
	stage.wait = wait
	return stage
}
477
478// Exec implements the Stage interface.
479func (ws *WaitStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
480	select {
481	case <-time.After(ws.wait()):
482	case <-ctx.Done():
483		return ctx, nil, ctx.Err()
484	}
485	return ctx, alerts, nil
486}
487
488// DedupStage filters alerts.
489// Filtering happens based on a notification log.
490type DedupStage struct {
491	rs    ResolvedSender
492	nflog NotificationLog
493	recv  *nflogpb.Receiver
494
495	now  func() time.Time
496	hash func(*types.Alert) uint64
497}
498
499// NewDedupStage wraps a DedupStage that runs against the given notification log.
500func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
501	return &DedupStage{
502		rs:    rs,
503		nflog: l,
504		recv:  recv,
505		now:   utcNow,
506		hash:  hashAlert,
507	}
508}
509
// utcNow returns the current time in UTC.
func utcNow() time.Time {
	now := time.Now()
	return now.UTC()
}
513
// hashBuffer wraps a byte slice in a struct so that a pointer, rather than
// the slice itself, can be stored in a sync.Pool.
type hashBuffer struct {
	buf []byte
}

// hashBuffers is a pool of scratch buffers. A fresh buffer is empty and has
// a capacity of 1024 bytes.
var hashBuffers = sync.Pool{
	New: func() interface{} {
		scratch := make([]byte, 0, 1024)
		return &hashBuffer{buf: scratch}
	},
}
522
523func hashAlert(a *types.Alert) uint64 {
524	const sep = '\xff'
525
526	hb := hashBuffers.Get().(*hashBuffer)
527	defer hashBuffers.Put(hb)
528	b := hb.buf[:0]
529
530	names := make(model.LabelNames, 0, len(a.Labels))
531
532	for ln := range a.Labels {
533		names = append(names, ln)
534	}
535	sort.Sort(names)
536
537	for _, ln := range names {
538		b = append(b, string(ln)...)
539		b = append(b, sep)
540		b = append(b, string(a.Labels[ln])...)
541		b = append(b, sep)
542	}
543
544	hash := xxhash.Sum64(b)
545
546	return hash
547}
548
549func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
550	// If we haven't notified about the alert group before, notify right away
551	// unless we only have resolved alerts.
552	if entry == nil {
553		return len(firing) > 0
554	}
555
556	if !entry.IsFiringSubset(firing) {
557		return true
558	}
559
560	// Notify about all alerts being resolved.
561	// This is done irrespective of the send_resolved flag to make sure that
562	// the firing alerts are cleared from the notification log.
563	if len(firing) == 0 {
564		// If the current alert group and last notification contain no firing
565		// alert, it means that some alerts have been fired and resolved during the
566		// last interval. In this case, there is no need to notify the receiver
567		// since it doesn't know about them.
568		return len(entry.FiringAlerts) > 0
569	}
570
571	if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
572		return true
573	}
574
575	// Nothing changed, only notify if the repeat interval has passed.
576	return entry.Timestamp.Before(n.now().Add(-repeat))
577}
578
579// Exec implements the Stage interface.
580func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
581	gkey, ok := GroupKey(ctx)
582	if !ok {
583		return ctx, nil, errors.New("group key missing")
584	}
585
586	repeatInterval, ok := RepeatInterval(ctx)
587	if !ok {
588		return ctx, nil, errors.New("repeat interval missing")
589	}
590
591	firingSet := map[uint64]struct{}{}
592	resolvedSet := map[uint64]struct{}{}
593	firing := []uint64{}
594	resolved := []uint64{}
595
596	var hash uint64
597	for _, a := range alerts {
598		hash = n.hash(a)
599		if a.Resolved() {
600			resolved = append(resolved, hash)
601			resolvedSet[hash] = struct{}{}
602		} else {
603			firing = append(firing, hash)
604			firingSet[hash] = struct{}{}
605		}
606	}
607
608	ctx = WithFiringAlerts(ctx, firing)
609	ctx = WithResolvedAlerts(ctx, resolved)
610
611	entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
612	if err != nil && err != nflog.ErrNotFound {
613		return ctx, nil, err
614	}
615
616	var entry *nflogpb.Entry
617	switch len(entries) {
618	case 0:
619	case 1:
620		entry = entries[0]
621	default:
622		return ctx, nil, errors.Errorf("unexpected entry result size %d", len(entries))
623	}
624
625	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
626		return ctx, alerts, nil
627	}
628	return ctx, nil, nil
629}
630
631// RetryStage notifies via passed integration with exponential backoff until it
632// succeeds. It aborts if the context is canceled or timed out.
633type RetryStage struct {
634	integration Integration
635	groupName   string
636	metrics     *Metrics
637}
638
639// NewRetryStage returns a new instance of a RetryStage.
640func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage {
641	return &RetryStage{
642		integration: i,
643		groupName:   groupName,
644		metrics:     metrics,
645	}
646}
647
648func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
649	r.metrics.numNotifications.WithLabelValues(r.integration.Name()).Inc()
650	ctx, alerts, err := r.exec(ctx, l, alerts...)
651	if err != nil {
652		r.metrics.numTotalFailedNotifications.WithLabelValues(r.integration.Name()).Inc()
653	}
654	return ctx, alerts, err
655}
656
657func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
658	var sent []*types.Alert
659
660	// If we shouldn't send notifications for resolved alerts, but there are only
661	// resolved alerts, report them all as successfully notified (we still want the
662	// notification log to log them for the next run of DedupStage).
663	if !r.integration.SendResolved() {
664		firing, ok := FiringAlerts(ctx)
665		if !ok {
666			return ctx, nil, errors.New("firing alerts missing")
667		}
668		if len(firing) == 0 {
669			return ctx, alerts, nil
670		}
671		for _, a := range alerts {
672			if a.Status() != model.AlertResolved {
673				sent = append(sent, a)
674			}
675		}
676	} else {
677		sent = alerts
678	}
679
680	b := backoff.NewExponentialBackOff()
681	b.MaxElapsedTime = 0 // Always retry.
682
683	tick := backoff.NewTicker(b)
684	defer tick.Stop()
685
686	var (
687		i    = 0
688		iErr error
689	)
690	l = log.With(l, "receiver", r.groupName, "integration", r.integration.String())
691
692	for {
693		i++
694		// Always check the context first to not notify again.
695		select {
696		case <-ctx.Done():
697			if iErr == nil {
698				iErr = ctx.Err()
699			}
700
701			return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
702		default:
703		}
704
705		select {
706		case <-tick.C:
707			now := time.Now()
708			retry, err := r.integration.Notify(ctx, sent...)
709			r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds())
710			r.metrics.numNotificationRequestsTotal.WithLabelValues(r.integration.Name()).Inc()
711			if err != nil {
712				r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.integration.Name()).Inc()
713				if !retry {
714					return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i)
715				}
716				if ctx.Err() == nil && (iErr == nil || err.Error() != iErr.Error()) {
717					// Log the error if the context isn't done and the error isn't the same as before.
718					level.Warn(l).Log("msg", "Notify attempt failed, will retry later", "attempts", i, "err", err)
719				}
720
721				// Save this error to be able to return the last seen error by an
722				// integration upon context timeout.
723				iErr = err
724			} else {
725				lvl := level.Debug(l)
726				if i > 1 {
727					lvl = level.Info(l)
728				}
729				lvl.Log("msg", "Notify success", "attempts", i)
730				return ctx, alerts, nil
731			}
732		case <-ctx.Done():
733			if iErr == nil {
734				iErr = ctx.Err()
735			}
736
737			return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
738		}
739	}
740}
741
742// SetNotifiesStage sets the notification information about passed alerts. The
743// passed alerts should have already been sent to the receivers.
744type SetNotifiesStage struct {
745	nflog NotificationLog
746	recv  *nflogpb.Receiver
747}
748
749// NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
750func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage {
751	return &SetNotifiesStage{
752		nflog: l,
753		recv:  recv,
754	}
755}
756
757// Exec implements the Stage interface.
758func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
759	gkey, ok := GroupKey(ctx)
760	if !ok {
761		return ctx, nil, errors.New("group key missing")
762	}
763
764	firing, ok := FiringAlerts(ctx)
765	if !ok {
766		return ctx, nil, errors.New("firing alerts missing")
767	}
768
769	resolved, ok := ResolvedAlerts(ctx)
770	if !ok {
771		return ctx, nil, errors.New("resolved alerts missing")
772	}
773
774	return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
775}
776
777type TimeMuteStage struct {
778	muteTimes map[string][]timeinterval.TimeInterval
779}
780
781func NewTimeMuteStage(mt map[string][]timeinterval.TimeInterval) *TimeMuteStage {
782	return &TimeMuteStage{mt}
783}
784
785// Exec implements the stage interface for TimeMuteStage.
786// TimeMuteStage is responsible for muting alerts whose route is not in an active time.
787func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
788	muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx)
789	if !ok {
790		return ctx, alerts, nil
791	}
792	now, ok := Now(ctx)
793	if !ok {
794		return ctx, alerts, errors.New("missing now timestamp")
795	}
796
797	muted := false
798Loop:
799	for _, mtName := range muteTimeIntervalNames {
800		mt, ok := tms.muteTimes[mtName]
801		if !ok {
802			return ctx, alerts, errors.Errorf("mute time %s doesn't exist in config", mtName)
803		}
804		for _, ti := range mt {
805			if ti.ContainsTime(now.UTC()) {
806				muted = true
807				break Loop
808			}
809		}
810	}
811	// If the current time is inside a mute time, all alerts are removed from the pipeline.
812	if muted {
813		level.Debug(l).Log("msg", "Notifications not sent, route is within mute time")
814		return ctx, nil, nil
815	}
816	return ctx, alerts, nil
817}
818