// Copyright 2015 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notify

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/cespare/xxhash"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"

	"github.com/prometheus/alertmanager/cluster"
	"github.com/prometheus/alertmanager/inhibit"
	"github.com/prometheus/alertmanager/nflog"
	"github.com/prometheus/alertmanager/nflog/nflogpb"
	"github.com/prometheus/alertmanager/silence"
	"github.com/prometheus/alertmanager/types"
)

// ResolvedSender returns true if resolved notifications should be sent.
//
// Within this file it is consulted by DedupStage (a change in the set of
// resolved alerts only triggers a notification when SendResolved is true)
// and by RetryStage (resolved alerts are stripped from the notification
// when SendResolved is false).
type ResolvedSender interface {
	SendResolved() bool
}

// MinTimeout is the minimum timeout that is set for the context of a call
// to a notification pipeline.
// NOTE(review): not applied by any code visible here; presumably enforced
// by the caller that creates the pipeline context — confirm.
const MinTimeout = 10 * time.Second

// Notifier notifies about alerts under constraints of the given context. It
// returns an error if unsuccessful and a flag whether the error is
// recoverable. This information is useful for a retry logic.
//
// RetryStage keeps retrying a failed Notify only while the returned flag is
// true; a non-nil error with a false flag aborts the retry loop.
type Notifier interface {
	Notify(context.Context, ...*types.Alert) (bool, error)
}

// Integration wraps a notifier and its configuration to be uniquely identified
// by name and index from its origin in the configuration.
56type Integration struct { 57 notifier Notifier 58 rs ResolvedSender 59 name string 60 idx int 61} 62 63// NewIntegration returns a new integration. 64func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int) Integration { 65 return Integration{ 66 notifier: notifier, 67 rs: rs, 68 name: name, 69 idx: idx, 70 } 71} 72 73// Notify implements the Notifier interface. 74func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) { 75 return i.notifier.Notify(ctx, alerts...) 76} 77 78// SendResolved implements the ResolvedSender interface. 79func (i *Integration) SendResolved() bool { 80 return i.rs.SendResolved() 81} 82 83// Name returns the name of the integration. 84func (i *Integration) Name() string { 85 return i.name 86} 87 88// Index returns the index of the integration. 89func (i *Integration) Index() int { 90 return i.idx 91} 92 93// notifyKey defines a custom type with which a context is populated to 94// avoid accidental collisions. 95type notifyKey int 96 97const ( 98 keyReceiverName notifyKey = iota 99 keyRepeatInterval 100 keyGroupLabels 101 keyGroupKey 102 keyFiringAlerts 103 keyResolvedAlerts 104 keyNow 105) 106 107// WithReceiverName populates a context with a receiver name. 108func WithReceiverName(ctx context.Context, rcv string) context.Context { 109 return context.WithValue(ctx, keyReceiverName, rcv) 110} 111 112// WithGroupKey populates a context with a group key. 113func WithGroupKey(ctx context.Context, s string) context.Context { 114 return context.WithValue(ctx, keyGroupKey, s) 115} 116 117// WithFiringAlerts populates a context with a slice of firing alerts. 118func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context { 119 return context.WithValue(ctx, keyFiringAlerts, alerts) 120} 121 122// WithResolvedAlerts populates a context with a slice of resolved alerts. 
123func WithResolvedAlerts(ctx context.Context, alerts []uint64) context.Context { 124 return context.WithValue(ctx, keyResolvedAlerts, alerts) 125} 126 127// WithGroupLabels populates a context with grouping labels. 128func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context { 129 return context.WithValue(ctx, keyGroupLabels, lset) 130} 131 132// WithNow populates a context with a now timestamp. 133func WithNow(ctx context.Context, t time.Time) context.Context { 134 return context.WithValue(ctx, keyNow, t) 135} 136 137// WithRepeatInterval populates a context with a repeat interval. 138func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context { 139 return context.WithValue(ctx, keyRepeatInterval, t) 140} 141 142// RepeatInterval extracts a repeat interval from the context. Iff none exists, the 143// second argument is false. 144func RepeatInterval(ctx context.Context) (time.Duration, bool) { 145 v, ok := ctx.Value(keyRepeatInterval).(time.Duration) 146 return v, ok 147} 148 149// ReceiverName extracts a receiver name from the context. Iff none exists, the 150// second argument is false. 151func ReceiverName(ctx context.Context) (string, bool) { 152 v, ok := ctx.Value(keyReceiverName).(string) 153 return v, ok 154} 155 156// GroupKey extracts a group key from the context. Iff none exists, the 157// second argument is false. 158func GroupKey(ctx context.Context) (string, bool) { 159 v, ok := ctx.Value(keyGroupKey).(string) 160 return v, ok 161} 162 163// GroupLabels extracts grouping label set from the context. Iff none exists, the 164// second argument is false. 165func GroupLabels(ctx context.Context) (model.LabelSet, bool) { 166 v, ok := ctx.Value(keyGroupLabels).(model.LabelSet) 167 return v, ok 168} 169 170// Now extracts a now timestamp from the context. Iff none exists, the 171// second argument is false. 
172func Now(ctx context.Context) (time.Time, bool) { 173 v, ok := ctx.Value(keyNow).(time.Time) 174 return v, ok 175} 176 177// FiringAlerts extracts a slice of firing alerts from the context. 178// Iff none exists, the second argument is false. 179func FiringAlerts(ctx context.Context) ([]uint64, bool) { 180 v, ok := ctx.Value(keyFiringAlerts).([]uint64) 181 return v, ok 182} 183 184// ResolvedAlerts extracts a slice of firing alerts from the context. 185// Iff none exists, the second argument is false. 186func ResolvedAlerts(ctx context.Context) ([]uint64, bool) { 187 v, ok := ctx.Value(keyResolvedAlerts).([]uint64) 188 return v, ok 189} 190 191// A Stage processes alerts under the constraints of the given context. 192type Stage interface { 193 Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) 194} 195 196// StageFunc wraps a function to represent a Stage. 197type StageFunc func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) 198 199// Exec implements Stage interface. 200func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 201 return f(ctx, l, alerts...) 
202} 203 204type NotificationLog interface { 205 Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error 206 Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error) 207} 208 209type metrics struct { 210 numNotifications *prometheus.CounterVec 211 numFailedNotifications *prometheus.CounterVec 212 notificationLatencySeconds *prometheus.HistogramVec 213} 214 215func newMetrics(r prometheus.Registerer) *metrics { 216 m := &metrics{ 217 numNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{ 218 Namespace: "alertmanager", 219 Name: "notifications_total", 220 Help: "The total number of attempted notifications.", 221 }, []string{"integration"}), 222 numFailedNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{ 223 Namespace: "alertmanager", 224 Name: "notifications_failed_total", 225 Help: "The total number of failed notifications.", 226 }, []string{"integration"}), 227 notificationLatencySeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{ 228 Namespace: "alertmanager", 229 Name: "notification_latency_seconds", 230 Help: "The latency of notifications in seconds.", 231 Buckets: []float64{1, 5, 10, 15, 20}, 232 }, []string{"integration"}), 233 } 234 for _, integration := range []string{ 235 "email", 236 "hipchat", 237 "pagerduty", 238 "wechat", 239 "pushover", 240 "slack", 241 "opsgenie", 242 "webhook", 243 "victorops", 244 } { 245 m.numNotifications.WithLabelValues(integration) 246 m.numFailedNotifications.WithLabelValues(integration) 247 m.notificationLatencySeconds.WithLabelValues(integration) 248 } 249 r.MustRegister(m.numNotifications, m.numFailedNotifications, m.notificationLatencySeconds) 250 return m 251} 252 253type PipelineBuilder struct { 254 metrics *metrics 255} 256 257func NewPipelineBuilder(r prometheus.Registerer) *PipelineBuilder { 258 return &PipelineBuilder{ 259 metrics: newMetrics(r), 260 } 261} 262 263// New returns a map of receivers to Stages. 
264func (pb *PipelineBuilder) New( 265 receivers map[string][]Integration, 266 wait func() time.Duration, 267 inhibitor *inhibit.Inhibitor, 268 silencer *silence.Silencer, 269 notificationLog NotificationLog, 270 peer *cluster.Peer, 271) RoutingStage { 272 rs := make(RoutingStage, len(receivers)) 273 274 ms := NewGossipSettleStage(peer) 275 is := NewMuteStage(inhibitor) 276 ss := NewMuteStage(silencer) 277 278 for name := range receivers { 279 st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) 280 rs[name] = MultiStage{ms, is, ss, st} 281 } 282 return rs 283} 284 285// createReceiverStage creates a pipeline of stages for a receiver. 286func createReceiverStage( 287 name string, 288 integrations []Integration, 289 wait func() time.Duration, 290 notificationLog NotificationLog, 291 metrics *metrics, 292) Stage { 293 var fs FanoutStage 294 for i := range integrations { 295 recv := &nflogpb.Receiver{ 296 GroupName: name, 297 Integration: integrations[i].Name(), 298 Idx: uint32(integrations[i].Index()), 299 } 300 var s MultiStage 301 s = append(s, NewWaitStage(wait)) 302 s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) 303 s = append(s, NewRetryStage(integrations[i], name, metrics)) 304 s = append(s, NewSetNotifiesStage(notificationLog, recv)) 305 306 fs = append(fs, s) 307 } 308 return fs 309} 310 311// RoutingStage executes the inner stages based on the receiver specified in 312// the context. 313type RoutingStage map[string]Stage 314 315// Exec implements the Stage interface. 316func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 317 receiver, ok := ReceiverName(ctx) 318 if !ok { 319 return ctx, nil, fmt.Errorf("receiver missing") 320 } 321 322 s, ok := rs[receiver] 323 if !ok { 324 return ctx, nil, fmt.Errorf("stage for receiver missing") 325 } 326 327 return s.Exec(ctx, l, alerts...) 
328} 329 330// A MultiStage executes a series of stages sequentially. 331type MultiStage []Stage 332 333// Exec implements the Stage interface. 334func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 335 var err error 336 for _, s := range ms { 337 if len(alerts) == 0 { 338 return ctx, nil, nil 339 } 340 341 ctx, alerts, err = s.Exec(ctx, l, alerts...) 342 if err != nil { 343 return ctx, nil, err 344 } 345 } 346 return ctx, alerts, nil 347} 348 349// FanoutStage executes its stages concurrently 350type FanoutStage []Stage 351 352// Exec attempts to execute all stages concurrently and discards the results. 353// It returns its input alerts and a types.MultiError if one or more stages fail. 354func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 355 var ( 356 wg sync.WaitGroup 357 me types.MultiError 358 ) 359 wg.Add(len(fs)) 360 361 for _, s := range fs { 362 go func(s Stage) { 363 if _, _, err := s.Exec(ctx, l, alerts...); err != nil { 364 me.Add(err) 365 lvl := level.Error(l) 366 if ctx.Err() == context.Canceled { 367 // It is expected for the context to be canceled on 368 // configuration reload or shutdown. In this case, the 369 // message should only be logged at the debug level. 370 lvl = level.Debug(l) 371 } 372 lvl.Log("msg", "Error on notify", "err", err, "context_err", ctx.Err()) 373 } 374 wg.Done() 375 }(s) 376 } 377 wg.Wait() 378 379 if me.Len() > 0 { 380 return ctx, alerts, &me 381 } 382 return ctx, alerts, nil 383} 384 385// GossipSettleStage waits until the Gossip has settled to forward alerts. 386type GossipSettleStage struct { 387 peer *cluster.Peer 388} 389 390// NewGossipSettleStage returns a new GossipSettleStage. 
391func NewGossipSettleStage(p *cluster.Peer) *GossipSettleStage { 392 return &GossipSettleStage{peer: p} 393} 394 395func (n *GossipSettleStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 396 if n.peer != nil { 397 n.peer.WaitReady() 398 } 399 return ctx, alerts, nil 400} 401 402// MuteStage filters alerts through a Muter. 403type MuteStage struct { 404 muter types.Muter 405} 406 407// NewMuteStage return a new MuteStage. 408func NewMuteStage(m types.Muter) *MuteStage { 409 return &MuteStage{muter: m} 410} 411 412// Exec implements the Stage interface. 413func (n *MuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 414 var filtered []*types.Alert 415 for _, a := range alerts { 416 // TODO(fabxc): increment total alerts counter. 417 // Do not send the alert if muted. 418 if !n.muter.Mutes(a.Labels) { 419 filtered = append(filtered, a) 420 } 421 // TODO(fabxc): increment muted alerts counter if muted. 422 } 423 return ctx, filtered, nil 424} 425 426// WaitStage waits for a certain amount of time before continuing or until the 427// context is done. 428type WaitStage struct { 429 wait func() time.Duration 430} 431 432// NewWaitStage returns a new WaitStage. 433func NewWaitStage(wait func() time.Duration) *WaitStage { 434 return &WaitStage{ 435 wait: wait, 436 } 437} 438 439// Exec implements the Stage interface. 440func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 441 select { 442 case <-time.After(ws.wait()): 443 case <-ctx.Done(): 444 return ctx, nil, ctx.Err() 445 } 446 return ctx, alerts, nil 447} 448 449// DedupStage filters alerts. 450// Filtering happens based on a notification log. 
451type DedupStage struct { 452 rs ResolvedSender 453 nflog NotificationLog 454 recv *nflogpb.Receiver 455 456 now func() time.Time 457 hash func(*types.Alert) uint64 458} 459 460// NewDedupStage wraps a DedupStage that runs against the given notification log. 461func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage { 462 return &DedupStage{ 463 rs: rs, 464 nflog: l, 465 recv: recv, 466 now: utcNow, 467 hash: hashAlert, 468 } 469} 470 471func utcNow() time.Time { 472 return time.Now().UTC() 473} 474 475var hashBuffers = sync.Pool{} 476 477func getHashBuffer() []byte { 478 b := hashBuffers.Get() 479 if b == nil { 480 return make([]byte, 0, 1024) 481 } 482 return b.([]byte) 483} 484 485func putHashBuffer(b []byte) { 486 b = b[:0] 487 //lint:ignore SA6002 relax staticcheck verification. 488 hashBuffers.Put(b) 489} 490 491func hashAlert(a *types.Alert) uint64 { 492 const sep = '\xff' 493 494 b := getHashBuffer() 495 defer putHashBuffer(b) 496 497 names := make(model.LabelNames, 0, len(a.Labels)) 498 499 for ln := range a.Labels { 500 names = append(names, ln) 501 } 502 sort.Sort(names) 503 504 for _, ln := range names { 505 b = append(b, string(ln)...) 506 b = append(b, sep) 507 b = append(b, string(a.Labels[ln])...) 508 b = append(b, sep) 509 } 510 511 hash := xxhash.Sum64(b) 512 513 return hash 514} 515 516func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool { 517 // If we haven't notified about the alert group before, notify right away 518 // unless we only have resolved alerts. 519 if entry == nil { 520 return len(firing) > 0 521 } 522 523 if !entry.IsFiringSubset(firing) { 524 return true 525 } 526 527 // Notify about all alerts being resolved. 528 // This is done irrespective of the send_resolved flag to make sure that 529 // the firing alerts are cleared from the notification log. 
530 if len(firing) == 0 { 531 // If the current alert group and last notification contain no firing 532 // alert, it means that some alerts have been fired and resolved during the 533 // last interval. In this case, there is no need to notify the receiver 534 // since it doesn't know about them. 535 return len(entry.FiringAlerts) > 0 536 } 537 538 if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) { 539 return true 540 } 541 542 // Nothing changed, only notify if the repeat interval has passed. 543 return entry.Timestamp.Before(n.now().Add(-repeat)) 544} 545 546// Exec implements the Stage interface. 547func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 548 gkey, ok := GroupKey(ctx) 549 if !ok { 550 return ctx, nil, fmt.Errorf("group key missing") 551 } 552 553 repeatInterval, ok := RepeatInterval(ctx) 554 if !ok { 555 return ctx, nil, fmt.Errorf("repeat interval missing") 556 } 557 558 firingSet := map[uint64]struct{}{} 559 resolvedSet := map[uint64]struct{}{} 560 firing := []uint64{} 561 resolved := []uint64{} 562 563 var hash uint64 564 for _, a := range alerts { 565 hash = n.hash(a) 566 if a.Resolved() { 567 resolved = append(resolved, hash) 568 resolvedSet[hash] = struct{}{} 569 } else { 570 firing = append(firing, hash) 571 firingSet[hash] = struct{}{} 572 } 573 } 574 575 ctx = WithFiringAlerts(ctx, firing) 576 ctx = WithResolvedAlerts(ctx, resolved) 577 578 entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv)) 579 if err != nil && err != nflog.ErrNotFound { 580 return ctx, nil, err 581 } 582 583 var entry *nflogpb.Entry 584 switch len(entries) { 585 case 0: 586 case 1: 587 entry = entries[0] 588 default: 589 return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries)) 590 } 591 592 if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) { 593 return ctx, alerts, nil 594 } 595 return ctx, nil, nil 596} 597 598// RetryStage 
notifies via passed integration with exponential backoff until it 599// succeeds. It aborts if the context is canceled or timed out. 600type RetryStage struct { 601 integration Integration 602 groupName string 603 metrics *metrics 604} 605 606// NewRetryStage returns a new instance of a RetryStage. 607func NewRetryStage(i Integration, groupName string, metrics *metrics) *RetryStage { 608 return &RetryStage{ 609 integration: i, 610 groupName: groupName, 611 metrics: metrics, 612 } 613} 614 615// Exec implements the Stage interface. 616func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 617 var sent []*types.Alert 618 619 // If we shouldn't send notifications for resolved alerts, but there are only 620 // resolved alerts, report them all as successfully notified (we still want the 621 // notification log to log them for the next run of DedupStage). 622 if !r.integration.SendResolved() { 623 firing, ok := FiringAlerts(ctx) 624 if !ok { 625 return ctx, nil, fmt.Errorf("firing alerts missing") 626 } 627 if len(firing) == 0 { 628 return ctx, alerts, nil 629 } 630 for _, a := range alerts { 631 if a.Status() != model.AlertResolved { 632 sent = append(sent, a) 633 } 634 } 635 } else { 636 sent = alerts 637 } 638 639 var ( 640 i = 0 641 b = backoff.NewExponentialBackOff() 642 tick = backoff.NewTicker(b) 643 iErr error 644 ) 645 defer tick.Stop() 646 647 for { 648 i++ 649 // Always check the context first to not notify again. 650 select { 651 case <-ctx.Done(): 652 if iErr != nil { 653 return ctx, nil, iErr 654 } 655 656 return ctx, nil, ctx.Err() 657 default: 658 } 659 660 select { 661 case <-tick.C: 662 now := time.Now() 663 retry, err := r.integration.Notify(ctx, sent...) 
664 r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds()) 665 r.metrics.numNotifications.WithLabelValues(r.integration.Name()).Inc() 666 if err != nil { 667 r.metrics.numFailedNotifications.WithLabelValues(r.integration.Name()).Inc() 668 level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.Name(), "receiver", r.groupName, "err", err) 669 if !retry { 670 return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.Name(), err) 671 } 672 673 // Save this error to be able to return the last seen error by an 674 // integration upon context timeout. 675 iErr = err 676 } else { 677 return ctx, alerts, nil 678 } 679 case <-ctx.Done(): 680 if iErr != nil { 681 return ctx, nil, iErr 682 } 683 684 return ctx, nil, ctx.Err() 685 } 686 } 687} 688 689// SetNotifiesStage sets the notification information about passed alerts. The 690// passed alerts should have already been sent to the receivers. 691type SetNotifiesStage struct { 692 nflog NotificationLog 693 recv *nflogpb.Receiver 694} 695 696// NewSetNotifiesStage returns a new instance of a SetNotifiesStage. 697func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage { 698 return &SetNotifiesStage{ 699 nflog: l, 700 recv: recv, 701 } 702} 703 704// Exec implements the Stage interface. 705func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { 706 gkey, ok := GroupKey(ctx) 707 if !ok { 708 return ctx, nil, fmt.Errorf("group key missing") 709 } 710 711 firing, ok := FiringAlerts(ctx) 712 if !ok { 713 return ctx, nil, fmt.Errorf("firing alerts missing") 714 } 715 716 resolved, ok := ResolvedAlerts(ctx) 717 if !ok { 718 return ctx, nil, fmt.Errorf("resolved alerts missing") 719 } 720 721 return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved) 722} 723