// Copyright 2015 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notify

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/cespare/xxhash/v2"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"

	"github.com/prometheus/alertmanager/inhibit"
	"github.com/prometheus/alertmanager/nflog"
	"github.com/prometheus/alertmanager/nflog/nflogpb"
	"github.com/prometheus/alertmanager/silence"
	"github.com/prometheus/alertmanager/timeinterval"
	"github.com/prometheus/alertmanager/types"
)

// ResolvedSender returns true if resolved notifications should be sent.
type ResolvedSender interface {
	// SendResolved reports whether notifications for resolved alerts
	// should be sent.
	SendResolved() bool
}

// Peer represents the cluster node from where we are sending the notification.
type Peer interface {
	// WaitReady waits until the node silences and notifications have settled before attempting to send a notification.
	WaitReady(context.Context) error
}

// MinTimeout is the minimum timeout that is set for the context of a call
// to a notification pipeline.
const MinTimeout = 10 * time.Second

// Notifier notifies about alerts under constraints of the given context. It
// returns an error if unsuccessful and a flag whether the error is
// recoverable.
This information is useful for a retry logic. 57type Notifier interface { 58 Notify(context.Context, ...*types.Alert) (bool, error) 59} 60 61// Integration wraps a notifier and its configuration to be uniquely identified 62// by name and index from its origin in the configuration. 63type Integration struct { 64 notifier Notifier 65 rs ResolvedSender 66 name string 67 idx int 68} 69 70// NewIntegration returns a new integration. 71func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int) Integration { 72 return Integration{ 73 notifier: notifier, 74 rs: rs, 75 name: name, 76 idx: idx, 77 } 78} 79 80// Notify implements the Notifier interface. 81func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) { 82 return i.notifier.Notify(ctx, alerts...) 83} 84 85// SendResolved implements the ResolvedSender interface. 86func (i *Integration) SendResolved() bool { 87 return i.rs.SendResolved() 88} 89 90// Name returns the name of the integration. 91func (i *Integration) Name() string { 92 return i.name 93} 94 95// Index returns the index of the integration. 96func (i *Integration) Index() int { 97 return i.idx 98} 99 100// String implements the Stringer interface. 101func (i *Integration) String() string { 102 return fmt.Sprintf("%s[%d]", i.name, i.idx) 103} 104 105// notifyKey defines a custom type with which a context is populated to 106// avoid accidental collisions. 107type notifyKey int 108 109const ( 110 keyReceiverName notifyKey = iota 111 keyRepeatInterval 112 keyGroupLabels 113 keyGroupKey 114 keyFiringAlerts 115 keyResolvedAlerts 116 keyNow 117 keyMuteTimeIntervals 118) 119 120// WithReceiverName populates a context with a receiver name. 121func WithReceiverName(ctx context.Context, rcv string) context.Context { 122 return context.WithValue(ctx, keyReceiverName, rcv) 123} 124 125// WithGroupKey populates a context with a group key. 
126func WithGroupKey(ctx context.Context, s string) context.Context { 127 return context.WithValue(ctx, keyGroupKey, s) 128} 129 130// WithFiringAlerts populates a context with a slice of firing alerts. 131func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context { 132 return context.WithValue(ctx, keyFiringAlerts, alerts) 133} 134 135// WithResolvedAlerts populates a context with a slice of resolved alerts. 136func WithResolvedAlerts(ctx context.Context, alerts []uint64) context.Context { 137 return context.WithValue(ctx, keyResolvedAlerts, alerts) 138} 139 140// WithGroupLabels populates a context with grouping labels. 141func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context { 142 return context.WithValue(ctx, keyGroupLabels, lset) 143} 144 145// WithNow populates a context with a now timestamp. 146func WithNow(ctx context.Context, t time.Time) context.Context { 147 return context.WithValue(ctx, keyNow, t) 148} 149 150// WithRepeatInterval populates a context with a repeat interval. 151func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context { 152 return context.WithValue(ctx, keyRepeatInterval, t) 153} 154 155// WithMuteTimeIntervals populates a context with a slice of mute time names. 156func WithMuteTimeIntervals(ctx context.Context, mt []string) context.Context { 157 return context.WithValue(ctx, keyMuteTimeIntervals, mt) 158} 159 160// RepeatInterval extracts a repeat interval from the context. Iff none exists, the 161// second argument is false. 162func RepeatInterval(ctx context.Context) (time.Duration, bool) { 163 v, ok := ctx.Value(keyRepeatInterval).(time.Duration) 164 return v, ok 165} 166 167// ReceiverName extracts a receiver name from the context. Iff none exists, the 168// second argument is false. 169func ReceiverName(ctx context.Context) (string, bool) { 170 v, ok := ctx.Value(keyReceiverName).(string) 171 return v, ok 172} 173 174// GroupKey extracts a group key from the context. 
Iff none exists, the 175// second argument is false. 176func GroupKey(ctx context.Context) (string, bool) { 177 v, ok := ctx.Value(keyGroupKey).(string) 178 return v, ok 179} 180 181// GroupLabels extracts grouping label set from the context. Iff none exists, the 182// second argument is false. 183func GroupLabels(ctx context.Context) (model.LabelSet, bool) { 184 v, ok := ctx.Value(keyGroupLabels).(model.LabelSet) 185 return v, ok 186} 187 188// Now extracts a now timestamp from the context. Iff none exists, the 189// second argument is false. 190func Now(ctx context.Context) (time.Time, bool) { 191 v, ok := ctx.Value(keyNow).(time.Time) 192 return v, ok 193} 194 195// FiringAlerts extracts a slice of firing alerts from the context. 196// Iff none exists, the second argument is false. 197func FiringAlerts(ctx context.Context) ([]uint64, bool) { 198 v, ok := ctx.Value(keyFiringAlerts).([]uint64) 199 return v, ok 200} 201 202// ResolvedAlerts extracts a slice of firing alerts from the context. 203// Iff none exists, the second argument is false. 204func ResolvedAlerts(ctx context.Context) ([]uint64, bool) { 205 v, ok := ctx.Value(keyResolvedAlerts).([]uint64) 206 return v, ok 207} 208 209// MuteTimeIntervalNames extracts a slice of mute time names from the context. Iff none exists, the 210// second argument is false. 211func MuteTimeIntervalNames(ctx context.Context) ([]string, bool) { 212 v, ok := ctx.Value(keyMuteTimeIntervals).([]string) 213 return v, ok 214} 215 216// A Stage processes alerts under the constraints of the given context. 217type Stage interface { 218 Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) 219} 220 221// StageFunc wraps a function to represent a Stage. 222type StageFunc func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) 223 224// Exec implements Stage interface. 
225func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 226 return f(ctx, l, alerts...) 227} 228 229type NotificationLog interface { 230 Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error 231 Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error) 232} 233 234type Metrics struct { 235 numNotifications *prometheus.CounterVec 236 numTotalFailedNotifications *prometheus.CounterVec 237 numNotificationRequestsTotal *prometheus.CounterVec 238 numNotificationRequestsFailedTotal *prometheus.CounterVec 239 notificationLatencySeconds *prometheus.HistogramVec 240} 241 242func NewMetrics(r prometheus.Registerer) *Metrics { 243 m := &Metrics{ 244 numNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{ 245 Namespace: "alertmanager", 246 Name: "notifications_total", 247 Help: "The total number of attempted notifications.", 248 }, []string{"integration"}), 249 numTotalFailedNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{ 250 Namespace: "alertmanager", 251 Name: "notifications_failed_total", 252 Help: "The total number of failed notifications.", 253 }, []string{"integration"}), 254 numNotificationRequestsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ 255 Namespace: "alertmanager", 256 Name: "notification_requests_total", 257 Help: "The total number of attempted notification requests.", 258 }, []string{"integration"}), 259 numNotificationRequestsFailedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ 260 Namespace: "alertmanager", 261 Name: "notification_requests_failed_total", 262 Help: "The total number of failed notification requests.", 263 }, []string{"integration"}), 264 notificationLatencySeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{ 265 Namespace: "alertmanager", 266 Name: "notification_latency_seconds", 267 Help: "The latency of notifications in seconds.", 268 Buckets: []float64{1, 5, 10, 15, 20}, 269 }, 
[]string{"integration"}), 270 } 271 for _, integration := range []string{ 272 "email", 273 "pagerduty", 274 "wechat", 275 "pushover", 276 "slack", 277 "opsgenie", 278 "webhook", 279 "victorops", 280 "sns", 281 } { 282 m.numNotifications.WithLabelValues(integration) 283 m.numTotalFailedNotifications.WithLabelValues(integration) 284 m.numNotificationRequestsTotal.WithLabelValues(integration) 285 m.numNotificationRequestsFailedTotal.WithLabelValues(integration) 286 m.notificationLatencySeconds.WithLabelValues(integration) 287 } 288 r.MustRegister( 289 m.numNotifications, m.numTotalFailedNotifications, 290 m.numNotificationRequestsTotal, m.numNotificationRequestsFailedTotal, 291 m.notificationLatencySeconds, 292 ) 293 return m 294} 295 296type PipelineBuilder struct { 297 metrics *Metrics 298} 299 300func NewPipelineBuilder(r prometheus.Registerer) *PipelineBuilder { 301 return &PipelineBuilder{ 302 metrics: NewMetrics(r), 303 } 304} 305 306// New returns a map of receivers to Stages. 307func (pb *PipelineBuilder) New( 308 receivers map[string][]Integration, 309 wait func() time.Duration, 310 inhibitor *inhibit.Inhibitor, 311 silencer *silence.Silencer, 312 muteTimes map[string][]timeinterval.TimeInterval, 313 notificationLog NotificationLog, 314 peer Peer, 315) RoutingStage { 316 rs := make(RoutingStage, len(receivers)) 317 318 ms := NewGossipSettleStage(peer) 319 is := NewMuteStage(inhibitor) 320 ss := NewMuteStage(silencer) 321 tms := NewTimeMuteStage(muteTimes) 322 323 for name := range receivers { 324 st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) 325 rs[name] = MultiStage{ms, is, tms, ss, st} 326 } 327 return rs 328} 329 330// createReceiverStage creates a pipeline of stages for a receiver. 
331func createReceiverStage( 332 name string, 333 integrations []Integration, 334 wait func() time.Duration, 335 notificationLog NotificationLog, 336 metrics *Metrics, 337) Stage { 338 var fs FanoutStage 339 for i := range integrations { 340 recv := &nflogpb.Receiver{ 341 GroupName: name, 342 Integration: integrations[i].Name(), 343 Idx: uint32(integrations[i].Index()), 344 } 345 var s MultiStage 346 s = append(s, NewWaitStage(wait)) 347 s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) 348 s = append(s, NewRetryStage(integrations[i], name, metrics)) 349 s = append(s, NewSetNotifiesStage(notificationLog, recv)) 350 351 fs = append(fs, s) 352 } 353 return fs 354} 355 356// RoutingStage executes the inner stages based on the receiver specified in 357// the context. 358type RoutingStage map[string]Stage 359 360// Exec implements the Stage interface. 361func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 362 receiver, ok := ReceiverName(ctx) 363 if !ok { 364 return ctx, nil, errors.New("receiver missing") 365 } 366 367 s, ok := rs[receiver] 368 if !ok { 369 return ctx, nil, errors.New("stage for receiver missing") 370 } 371 372 return s.Exec(ctx, l, alerts...) 373} 374 375// A MultiStage executes a series of stages sequentially. 376type MultiStage []Stage 377 378// Exec implements the Stage interface. 379func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 380 var err error 381 for _, s := range ms { 382 if len(alerts) == 0 { 383 return ctx, nil, nil 384 } 385 386 ctx, alerts, err = s.Exec(ctx, l, alerts...) 387 if err != nil { 388 return ctx, nil, err 389 } 390 } 391 return ctx, alerts, nil 392} 393 394// FanoutStage executes its stages concurrently 395type FanoutStage []Stage 396 397// Exec attempts to execute all stages concurrently and discards the results. 
398// It returns its input alerts and a types.MultiError if one or more stages fail. 399func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 400 var ( 401 wg sync.WaitGroup 402 me types.MultiError 403 ) 404 wg.Add(len(fs)) 405 406 for _, s := range fs { 407 go func(s Stage) { 408 if _, _, err := s.Exec(ctx, l, alerts...); err != nil { 409 me.Add(err) 410 } 411 wg.Done() 412 }(s) 413 } 414 wg.Wait() 415 416 if me.Len() > 0 { 417 return ctx, alerts, &me 418 } 419 return ctx, alerts, nil 420} 421 422// GossipSettleStage waits until the Gossip has settled to forward alerts. 423type GossipSettleStage struct { 424 peer Peer 425} 426 427// NewGossipSettleStage returns a new GossipSettleStage. 428func NewGossipSettleStage(p Peer) *GossipSettleStage { 429 return &GossipSettleStage{peer: p} 430} 431 432func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 433 if n.peer != nil { 434 if err := n.peer.WaitReady(ctx); err != nil { 435 return ctx, nil, err 436 } 437 } 438 return ctx, alerts, nil 439} 440 441// MuteStage filters alerts through a Muter. 442type MuteStage struct { 443 muter types.Muter 444} 445 446// NewMuteStage return a new MuteStage. 447func NewMuteStage(m types.Muter) *MuteStage { 448 return &MuteStage{muter: m} 449} 450 451// Exec implements the Stage interface. 452func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 453 var filtered []*types.Alert 454 for _, a := range alerts { 455 // TODO(fabxc): increment total alerts counter. 456 // Do not send the alert if muted. 457 if !n.muter.Mutes(a.Labels) { 458 filtered = append(filtered, a) 459 } 460 // TODO(fabxc): increment muted alerts counter if muted. 
461 } 462 return ctx, filtered, nil 463} 464 465// WaitStage waits for a certain amount of time before continuing or until the 466// context is done. 467type WaitStage struct { 468 wait func() time.Duration 469} 470 471// NewWaitStage returns a new WaitStage. 472func NewWaitStage(wait func() time.Duration) *WaitStage { 473 return &WaitStage{ 474 wait: wait, 475 } 476} 477 478// Exec implements the Stage interface. 479func (ws *WaitStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 480 select { 481 case <-time.After(ws.wait()): 482 case <-ctx.Done(): 483 return ctx, nil, ctx.Err() 484 } 485 return ctx, alerts, nil 486} 487 488// DedupStage filters alerts. 489// Filtering happens based on a notification log. 490type DedupStage struct { 491 rs ResolvedSender 492 nflog NotificationLog 493 recv *nflogpb.Receiver 494 495 now func() time.Time 496 hash func(*types.Alert) uint64 497} 498 499// NewDedupStage wraps a DedupStage that runs against the given notification log. 500func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage { 501 return &DedupStage{ 502 rs: rs, 503 nflog: l, 504 recv: recv, 505 now: utcNow, 506 hash: hashAlert, 507 } 508} 509 510func utcNow() time.Time { 511 return time.Now().UTC() 512} 513 514// Wrap a slice in a struct so we can store a pointer in sync.Pool 515type hashBuffer struct { 516 buf []byte 517} 518 519var hashBuffers = sync.Pool{ 520 New: func() interface{} { return &hashBuffer{buf: make([]byte, 0, 1024)} }, 521} 522 523func hashAlert(a *types.Alert) uint64 { 524 const sep = '\xff' 525 526 hb := hashBuffers.Get().(*hashBuffer) 527 defer hashBuffers.Put(hb) 528 b := hb.buf[:0] 529 530 names := make(model.LabelNames, 0, len(a.Labels)) 531 532 for ln := range a.Labels { 533 names = append(names, ln) 534 } 535 sort.Sort(names) 536 537 for _, ln := range names { 538 b = append(b, string(ln)...) 
539 b = append(b, sep) 540 b = append(b, string(a.Labels[ln])...) 541 b = append(b, sep) 542 } 543 544 hash := xxhash.Sum64(b) 545 546 return hash 547} 548 549func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool { 550 // If we haven't notified about the alert group before, notify right away 551 // unless we only have resolved alerts. 552 if entry == nil { 553 return len(firing) > 0 554 } 555 556 if !entry.IsFiringSubset(firing) { 557 return true 558 } 559 560 // Notify about all alerts being resolved. 561 // This is done irrespective of the send_resolved flag to make sure that 562 // the firing alerts are cleared from the notification log. 563 if len(firing) == 0 { 564 // If the current alert group and last notification contain no firing 565 // alert, it means that some alerts have been fired and resolved during the 566 // last interval. In this case, there is no need to notify the receiver 567 // since it doesn't know about them. 568 return len(entry.FiringAlerts) > 0 569 } 570 571 if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) { 572 return true 573 } 574 575 // Nothing changed, only notify if the repeat interval has passed. 576 return entry.Timestamp.Before(n.now().Add(-repeat)) 577} 578 579// Exec implements the Stage interface. 
580func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 581 gkey, ok := GroupKey(ctx) 582 if !ok { 583 return ctx, nil, errors.New("group key missing") 584 } 585 586 repeatInterval, ok := RepeatInterval(ctx) 587 if !ok { 588 return ctx, nil, errors.New("repeat interval missing") 589 } 590 591 firingSet := map[uint64]struct{}{} 592 resolvedSet := map[uint64]struct{}{} 593 firing := []uint64{} 594 resolved := []uint64{} 595 596 var hash uint64 597 for _, a := range alerts { 598 hash = n.hash(a) 599 if a.Resolved() { 600 resolved = append(resolved, hash) 601 resolvedSet[hash] = struct{}{} 602 } else { 603 firing = append(firing, hash) 604 firingSet[hash] = struct{}{} 605 } 606 } 607 608 ctx = WithFiringAlerts(ctx, firing) 609 ctx = WithResolvedAlerts(ctx, resolved) 610 611 entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv)) 612 if err != nil && err != nflog.ErrNotFound { 613 return ctx, nil, err 614 } 615 616 var entry *nflogpb.Entry 617 switch len(entries) { 618 case 0: 619 case 1: 620 entry = entries[0] 621 default: 622 return ctx, nil, errors.Errorf("unexpected entry result size %d", len(entries)) 623 } 624 625 if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) { 626 return ctx, alerts, nil 627 } 628 return ctx, nil, nil 629} 630 631// RetryStage notifies via passed integration with exponential backoff until it 632// succeeds. It aborts if the context is canceled or timed out. 633type RetryStage struct { 634 integration Integration 635 groupName string 636 metrics *Metrics 637} 638 639// NewRetryStage returns a new instance of a RetryStage. 
640func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage { 641 return &RetryStage{ 642 integration: i, 643 groupName: groupName, 644 metrics: metrics, 645 } 646} 647 648func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 649 r.metrics.numNotifications.WithLabelValues(r.integration.Name()).Inc() 650 ctx, alerts, err := r.exec(ctx, l, alerts...) 651 if err != nil { 652 r.metrics.numTotalFailedNotifications.WithLabelValues(r.integration.Name()).Inc() 653 } 654 return ctx, alerts, err 655} 656 657func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 658 var sent []*types.Alert 659 660 // If we shouldn't send notifications for resolved alerts, but there are only 661 // resolved alerts, report them all as successfully notified (we still want the 662 // notification log to log them for the next run of DedupStage). 663 if !r.integration.SendResolved() { 664 firing, ok := FiringAlerts(ctx) 665 if !ok { 666 return ctx, nil, errors.New("firing alerts missing") 667 } 668 if len(firing) == 0 { 669 return ctx, alerts, nil 670 } 671 for _, a := range alerts { 672 if a.Status() != model.AlertResolved { 673 sent = append(sent, a) 674 } 675 } 676 } else { 677 sent = alerts 678 } 679 680 b := backoff.NewExponentialBackOff() 681 b.MaxElapsedTime = 0 // Always retry. 682 683 tick := backoff.NewTicker(b) 684 defer tick.Stop() 685 686 var ( 687 i = 0 688 iErr error 689 ) 690 l = log.With(l, "receiver", r.groupName, "integration", r.integration.String()) 691 692 for { 693 i++ 694 // Always check the context first to not notify again. 
695 select { 696 case <-ctx.Done(): 697 if iErr == nil { 698 iErr = ctx.Err() 699 } 700 701 return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) 702 default: 703 } 704 705 select { 706 case <-tick.C: 707 now := time.Now() 708 retry, err := r.integration.Notify(ctx, sent...) 709 r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds()) 710 r.metrics.numNotificationRequestsTotal.WithLabelValues(r.integration.Name()).Inc() 711 if err != nil { 712 r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.integration.Name()).Inc() 713 if !retry { 714 return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i) 715 } 716 if ctx.Err() == nil && (iErr == nil || err.Error() != iErr.Error()) { 717 // Log the error if the context isn't done and the error isn't the same as before. 718 level.Warn(l).Log("msg", "Notify attempt failed, will retry later", "attempts", i, "err", err) 719 } 720 721 // Save this error to be able to return the last seen error by an 722 // integration upon context timeout. 723 iErr = err 724 } else { 725 lvl := level.Debug(l) 726 if i > 1 { 727 lvl = level.Info(l) 728 } 729 lvl.Log("msg", "Notify success", "attempts", i) 730 return ctx, alerts, nil 731 } 732 case <-ctx.Done(): 733 if iErr == nil { 734 iErr = ctx.Err() 735 } 736 737 return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) 738 } 739 } 740} 741 742// SetNotifiesStage sets the notification information about passed alerts. The 743// passed alerts should have already been sent to the receivers. 744type SetNotifiesStage struct { 745 nflog NotificationLog 746 recv *nflogpb.Receiver 747} 748 749// NewSetNotifiesStage returns a new instance of a SetNotifiesStage. 
750func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage { 751 return &SetNotifiesStage{ 752 nflog: l, 753 recv: recv, 754 } 755} 756 757// Exec implements the Stage interface. 758func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 759 gkey, ok := GroupKey(ctx) 760 if !ok { 761 return ctx, nil, errors.New("group key missing") 762 } 763 764 firing, ok := FiringAlerts(ctx) 765 if !ok { 766 return ctx, nil, errors.New("firing alerts missing") 767 } 768 769 resolved, ok := ResolvedAlerts(ctx) 770 if !ok { 771 return ctx, nil, errors.New("resolved alerts missing") 772 } 773 774 return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved) 775} 776 777type TimeMuteStage struct { 778 muteTimes map[string][]timeinterval.TimeInterval 779} 780 781func NewTimeMuteStage(mt map[string][]timeinterval.TimeInterval) *TimeMuteStage { 782 return &TimeMuteStage{mt} 783} 784 785// Exec implements the stage interface for TimeMuteStage. 786// TimeMuteStage is responsible for muting alerts whose route is not in an active time. 787func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 788 muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx) 789 if !ok { 790 return ctx, alerts, nil 791 } 792 now, ok := Now(ctx) 793 if !ok { 794 return ctx, alerts, errors.New("missing now timestamp") 795 } 796 797 muted := false 798Loop: 799 for _, mtName := range muteTimeIntervalNames { 800 mt, ok := tms.muteTimes[mtName] 801 if !ok { 802 return ctx, alerts, errors.Errorf("mute time %s doesn't exist in config", mtName) 803 } 804 for _, ti := range mt { 805 if ti.ContainsTime(now.UTC()) { 806 muted = true 807 break Loop 808 } 809 } 810 } 811 // If the current time is inside a mute time, all alerts are removed from the pipeline. 
812 if muted { 813 level.Debug(l).Log("msg", "Notifications not sent, route is within mute time") 814 return ctx, nil, nil 815 } 816 return ctx, alerts, nil 817} 818