// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Package alert contains logic to send alert notifications to Alertmanager clusters.
package alert

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"path"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/go-openapi/strfmt"
	"github.com/pkg/errors"
	"github.com/prometheus/alertmanager/api/v2/models"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/pkg/labels"
	"go.uber.org/atomic"

	"github.com/thanos-io/thanos/pkg/runutil"
	"github.com/thanos-io/thanos/pkg/tracing"
)

const (
	defaultAlertmanagerPort = 9093
	contentTypeJSON         = "application/json"
)

// Alert is a generic representation of an alert in the Prometheus eco-system.
type Alert struct {
	// Label value pairs for purpose of aggregation, matching, and disposition
	// dispatching. This must minimally include an "alertname" label.
	Labels labels.Labels `json:"labels"`

	// Extra key/value information which does not define alert identity.
	Annotations labels.Labels `json:"annotations"`

	// The known time range for this alert. Start and end time are both optional.
	StartsAt     time.Time `json:"startsAt,omitempty"`
	EndsAt       time.Time `json:"endsAt,omitempty"`
	GeneratorURL string    `json:"generatorURL,omitempty"`
}

// Name returns the name of the alert. It is equivalent to the "alertname" label.
func (a *Alert) Name() string {
	return a.Labels.Get(labels.AlertName)
}

// Hash returns a hash over the alert. It is equivalent to the alert labels hash.
func (a *Alert) Hash() uint64 {
	return a.Labels.Hash()
}

func (a *Alert) String() string {
	s := fmt.Sprintf("%s[%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7])
	if a.Resolved() {
		return s + "[resolved]"
	}
	return s + "[active]"
}

// Resolved returns true iff the activity interval ended in the past.
func (a *Alert) Resolved() bool {
	return a.ResolvedAt(time.Now())
}

// ResolvedAt returns true iff the activity interval ended before
// the given timestamp.
func (a *Alert) ResolvedAt(ts time.Time) bool {
	if a.EndsAt.IsZero() {
		return false
	}
	return !a.EndsAt.After(ts)
}

// Queue is a queue of alert notifications waiting to be sent. The queue is consumed in batches
// and entries are dropped at the front if it runs full.
type Queue struct {
	logger          log.Logger
	maxBatchSize    int
	capacity        int
	toAddLset       labels.Labels
	toExcludeLabels labels.Labels

	mtx   sync.Mutex
	queue []*Alert
	morec chan struct{}

	pushed  prometheus.Counter
	popped  prometheus.Counter
	dropped prometheus.Counter
}

func relabelLabels(lset labels.Labels, excludeLset []string) (toAdd labels.Labels, toExclude labels.Labels) {
	for _, ln := range excludeLset {
		toExclude = append(toExclude, labels.Label{Name: ln})
	}

	for _, l := range lset {
		// Exclude labels to add straight away.
		if toExclude.Has(l.Name) {
			continue
		}
		toAdd = append(toAdd, labels.Label{
			Name:  l.Name,
			Value: l.Value,
		})
	}
	return toAdd, toExclude
}

// NewQueue returns a new queue. The given label set is attached to all alerts pushed to the queue.
// The given exclude label set tells what label names to drop, including external labels.
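// For example (illustrative values), with externalLset {cluster="eu", replica="a"}
// and excludeLabels ["replica"], every alert pushed to the queue has any "replica"
// label removed and cluster="eu" attached before it is handed to the sender.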
func NewQueue(logger log.Logger, reg prometheus.Registerer, capacity, maxBatchSize int, externalLset labels.Labels, excludeLabels []string) *Queue {
	toAdd, toExclude := relabelLabels(externalLset, excludeLabels)

	if logger == nil {
		logger = log.NewNopLogger()
	}
	q := &Queue{
		logger:          logger,
		capacity:        capacity,
		morec:           make(chan struct{}, 1),
		maxBatchSize:    maxBatchSize,
		toAddLset:       toAdd,
		toExcludeLabels: toExclude,

		dropped: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_alert_queue_alerts_dropped_total",
			Help: "Total number of alerts that were dropped from the queue.",
		}),
		pushed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_alert_queue_alerts_pushed_total",
			Help: "Total number of alerts pushed to the queue.",
		}),
		popped: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_alert_queue_alerts_popped_total",
			Help: "Total number of alerts popped from the queue.",
		}),
	}
	_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "thanos_alert_queue_capacity",
		Help: "Capacity of the alert queue.",
	}, func() float64 {
		return float64(q.Cap())
	})
	_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "thanos_alert_queue_length",
		Help: "Length of the alert queue.",
	}, func() float64 {
		return float64(q.Len())
	})
	return q
}

// Len returns the current length of the queue.
func (q *Queue) Len() int {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	return len(q.queue)
}

// Cap returns the fixed capacity of the queue.
func (q *Queue) Cap() int {
	return q.capacity
}

// Pop takes a batch of alerts from the front of the queue. The batch size is limited
// according to the queue's maxBatchSize limit.
// It blocks until elements are available or a termination signal is sent on termc.
func (q *Queue) Pop(termc <-chan struct{}) []*Alert {
	select {
	case <-termc:
		return nil
	case <-q.morec:
	}

	q.mtx.Lock()
	defer q.mtx.Unlock()

	as := make([]*Alert, q.maxBatchSize)
	n := copy(as, q.queue)
	q.queue = q.queue[n:]

	q.popped.Add(float64(n))

	if len(q.queue) > 0 {
		select {
		case q.morec <- struct{}{}:
		default:
		}
	}
	return as[:n]
}

// Push adds a list of alerts to the queue.
func (q *Queue) Push(alerts []*Alert) {
	if len(alerts) == 0 {
		return
	}

	q.mtx.Lock()
	defer q.mtx.Unlock()

	q.pushed.Add(float64(len(alerts)))

	// Attach external labels and drop excluded labels before sending.
	// TODO(bwplotka): Use proper relabelling with https://github.com/thanos-io/thanos/issues/660.
	for _, a := range alerts {
		lb := labels.NewBuilder(labels.Labels{})
		for _, l := range a.Labels {
			if q.toExcludeLabels.Has(l.Name) {
				continue
			}
			lb.Set(l.Name, l.Value)
		}
		for _, l := range q.toAddLset {
			lb.Set(l.Name, l.Value)
		}
		a.Labels = lb.Labels()
	}

	// Queue capacity should be significantly larger than a single alert
	// batch could be.
	if d := len(alerts) - q.capacity; d > 0 {
		alerts = alerts[d:]

		level.Warn(q.logger).Log(
			"msg", "Alert batch larger than queue capacity, dropping alerts",
			"numDropped", d)
		q.dropped.Add(float64(d))
	}

	// If the queue is full, remove the oldest alerts in favor
	// of newer ones.
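	// For example (illustrative numbers), with capacity 10, 8 alerts already queued,
	// and 4 incoming, d is (8+4)-10 = 2, so the 2 oldest queued alerts are removed.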
	if d := (len(q.queue) + len(alerts)) - q.capacity; d > 0 {
		q.queue = q.queue[d:]

		level.Warn(q.logger).Log(
			"msg", "Alert notification queue full, dropping alerts",
			"numDropped", d)
		q.dropped.Add(float64(d))
	}

	q.queue = append(q.queue, alerts...)

	select {
	case q.morec <- struct{}{}:
	default:
	}
}

// Sender sends notifications to a dynamic set of alertmanagers.
type Sender struct {
	logger        log.Logger
	alertmanagers []*Alertmanager
	versions      []APIVersion

	sent    *prometheus.CounterVec
	errs    *prometheus.CounterVec
	dropped prometheus.Counter
	latency *prometheus.HistogramVec
}

// NewSender returns a new sender. On each call to Send the entire alert batch is sent
// to each of the given Alertmanager clients.
func NewSender(
	logger log.Logger,
	reg prometheus.Registerer,
	alertmanagers []*Alertmanager,
) *Sender {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	var (
		versions       []APIVersion
		versionPresent = map[APIVersion]struct{}{}
	)
	for _, am := range alertmanagers {
		if _, found := versionPresent[am.version]; found {
			continue
		}
		versions = append(versions, am.version)
		versionPresent[am.version] = struct{}{}
	}
	s := &Sender{
		logger:        logger,
		alertmanagers: alertmanagers,
		versions:      versions,

		sent: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "thanos_alert_sender_alerts_sent_total",
			Help: "Total number of alerts sent by alertmanager.",
		}, []string{"alertmanager"}),

		errs: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "thanos_alert_sender_errors_total",
			Help: "Total number of errors while sending alerts to alertmanager.",
		}, []string{"alertmanager"}),

		dropped: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_alert_sender_alerts_dropped_total",
			Help: "Total number of alerts dropped because all sends to alertmanagers failed.",
		}),

		latency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name: "thanos_alert_sender_latency_seconds",
			Help: "Latency for sending alert notifications (not including dropped notifications).",
		}, []string{"alertmanager"}),
	}
	return s
}

func toAPILabels(labels labels.Labels) models.LabelSet {
	apiLabels := make(models.LabelSet, len(labels))
	for _, label := range labels {
		apiLabels[label.Name] = label.Value
	}

	return apiLabels
}

// Send an alert batch to all given Alertmanager clients.
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
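// The batch is marshaled once per API version in use, then posted concurrently to
// every endpoint of every Alertmanager client; alerts are counted as dropped only
// when no endpoint accepts the batch.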
func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
	if len(alerts) == 0 {
		return
	}

	payload := make(map[APIVersion][]byte)
	for _, version := range s.versions {
		var (
			b   []byte
			err error
		)
		switch version {
		case APIv1:
			if b, err = json.Marshal(alerts); err != nil {
				level.Warn(s.logger).Log("msg", "encoding alerts for v1 API failed", "err", err)
				return
			}
		case APIv2:
			apiAlerts := make(models.PostableAlerts, 0, len(alerts))
			for _, a := range alerts {
				apiAlerts = append(apiAlerts, &models.PostableAlert{
					Annotations: toAPILabels(a.Annotations),
					EndsAt:      strfmt.DateTime(a.EndsAt),
					StartsAt:    strfmt.DateTime(a.StartsAt),
					Alert: models.Alert{
						GeneratorURL: strfmt.URI(a.GeneratorURL),
						Labels:       toAPILabels(a.Labels),
					},
				})
			}
			if b, err = json.Marshal(apiAlerts); err != nil {
				level.Warn(s.logger).Log("msg", "encoding alerts for v2 API failed", "err", err)
				return
			}
		}
		payload[version] = b
	}

	var (
		wg         sync.WaitGroup
		numSuccess atomic.Uint64
	)
	for _, am := range s.alertmanagers {
		for _, u := range am.dispatcher.Endpoints() {
			wg.Add(1)
			go func(am *Alertmanager, u url.URL) {
				defer wg.Done()

				level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts))
				start := time.Now()
				u.Path = path.Join(u.Path, fmt.Sprintf("/api/%s/alerts", string(am.version)))

				tracing.DoInSpan(ctx, "post_alerts HTTP[client]", func(ctx context.Context) {
					if err := am.postAlerts(ctx, u, bytes.NewReader(payload[am.version])); err != nil {
						level.Warn(s.logger).Log(
							"msg", "sending alerts failed",
							"alertmanager", u.Host,
							"alerts", string(payload[am.version]),
							"err", err,
						)
						s.errs.WithLabelValues(u.Host).Inc()
						return
					}
					s.latency.WithLabelValues(u.Host).Observe(time.Since(start).Seconds())
					s.sent.WithLabelValues(u.Host).Add(float64(len(alerts)))

					numSuccess.Inc()
				})
			}(am, *u)
		}
	}
	wg.Wait()

	if numSuccess.Load() > 0 {
		return
	}

	s.dropped.Add(float64(len(alerts)))
	level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "numAlerts", len(alerts))
}

// Dispatcher dispatches HTTP requests to a set of endpoints.
type Dispatcher interface {
	// Endpoints returns the list of endpoint URLs the dispatcher knows about.
	Endpoints() []*url.URL
	// Do sends an HTTP request and returns a response.
	Do(*http.Request) (*http.Response, error)
}

// Alertmanager is an HTTP client that can send alerts to a cluster of Alertmanager endpoints.
type Alertmanager struct {
	logger     log.Logger
	dispatcher Dispatcher
	timeout    time.Duration
	version    APIVersion
}

// NewAlertmanager returns a new Alertmanager client.
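// The timeout bounds each individual request issued by the client; a nil logger is
// replaced with a no-op logger.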
func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Duration, version APIVersion) *Alertmanager {
	if logger == nil {
		logger = log.NewNopLogger()
	}

	return &Alertmanager{
		logger:     logger,
		dispatcher: dispatcher,
		timeout:    timeout,
		version:    version,
	}
}

func (a *Alertmanager) postAlerts(ctx context.Context, u url.URL, r io.Reader) error {
	req, err := http.NewRequest("POST", u.String(), r)
	if err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(ctx, a.timeout)
	defer cancel()
	req = req.WithContext(ctx)
	req.Header.Set("Content-Type", contentTypeJSON)

	resp, err := a.dispatcher.Do(req)
	if err != nil {
		return errors.Wrapf(err, "send request to %q", u.String())
	}
	defer runutil.ExhaustCloseWithLogOnErr(a.logger, resp.Body, "send one alert")

	if resp.StatusCode/100 != 2 {
		return errors.Errorf("bad response status %v from %q", resp.Status, u.String())
	}
	return nil
}
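// A minimal wiring sketch for callers of this package (illustrative only; logger,
// reg, externalLabels, dispatcher, termc, and ctx are assumed to be provided by
// the caller, and the capacity, batch size, and timeout values are arbitrary):
//
//	q := alert.NewQueue(logger, reg, 10000, 100, externalLabels, nil)
//	am := alert.NewAlertmanager(logger, dispatcher, 10*time.Second, alert.APIv2)
//	sender := alert.NewSender(logger, reg, []*alert.Alertmanager{am})
//	for {
//		batch := q.Pop(termc)
//		if batch == nil {
//			return // termc was closed; stop sending.
//		}
//		sender.Send(ctx, batch)
//	}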