// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
	"bytes"
	"context"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"syscall"
	"testing"
	"time"

	apiclient "github.com/prometheus/alertmanager/api/v2/client"
	"github.com/prometheus/alertmanager/api/v2/client/alert"
	"github.com/prometheus/alertmanager/api/v2/client/general"
	"github.com/prometheus/alertmanager/api/v2/client/silence"
	"github.com/prometheus/alertmanager/api/v2/models"

	httptransport "github.com/go-openapi/runtime/client"
	"github.com/go-openapi/strfmt"
)

// AcceptanceTest provides declarative definition of given inputs and expected
// output of an Alertmanager setup.
type AcceptanceTest struct {
	*testing.T

	// opts is shared with every Alertmanager and Collector created through
	// this test.
	opts *AcceptanceOpts

	// amc is the cluster created by AlertmanagerCluster and started by Run.
	amc *AlertmanagerCluster
	// collectors holds every collector created through Collector.
	collectors []*Collector

	// actions maps a relative time in seconds (see expandTime) to the
	// functions registered for that time via Do.
	actions map[float64][]func()
}

// AcceptanceOpts defines configuration parameters for an acceptance test.
54type AcceptanceOpts struct { 55 RoutePrefix string 56 Tolerance time.Duration 57 baseTime time.Time 58} 59 60func (opts *AcceptanceOpts) alertString(a *models.GettableAlert) string { 61 if a.EndsAt == nil || time.Time(*a.EndsAt).IsZero() { 62 return fmt.Sprintf("%v[%v:]", a, opts.relativeTime(time.Time(*a.StartsAt))) 63 } 64 return fmt.Sprintf("%v[%v:%v]", a, opts.relativeTime(time.Time(*a.StartsAt)), opts.relativeTime(time.Time(*a.EndsAt))) 65} 66 67// expandTime returns the absolute time for the relative time 68// calculated from the test's base time. 69func (opts *AcceptanceOpts) expandTime(rel float64) time.Time { 70 return opts.baseTime.Add(time.Duration(rel * float64(time.Second))) 71} 72 73// expandTime returns the relative time for the given time 74// calculated from the test's base time. 75func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 { 76 return float64(act.Sub(opts.baseTime)) / float64(time.Second) 77} 78 79// NewAcceptanceTest returns a new acceptance test with the base time 80// set to the current time. 81func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest { 82 test := &AcceptanceTest{ 83 T: t, 84 opts: opts, 85 actions: map[float64][]func(){}, 86 } 87 // TODO: Should this really be set during creation time? Why not do this 88 // during Run() time, maybe there is something else long happening between 89 // creation and running. 90 opts.baseTime = time.Now() 91 92 return test 93} 94 95// freeAddress returns a new listen address not currently in use. 96func freeAddress() string { 97 // Let the OS allocate a free address, close it and hope 98 // it is still free when starting Alertmanager. 99 l, err := net.Listen("tcp4", "localhost:0") 100 if err != nil { 101 panic(err) 102 } 103 defer func() { 104 if err := l.Close(); err != nil { 105 panic(err) 106 } 107 }() 108 109 return l.Addr().String() 110} 111 112// Do sets the given function to be executed at the given time. 
113func (t *AcceptanceTest) Do(at float64, f func()) { 114 t.actions[at] = append(t.actions[at], f) 115} 116 117// AlertmanagerCluster returns a new AlertmanagerCluster that allows starting a 118// cluster of Alertmanager instances on random ports. 119func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster { 120 amc := AlertmanagerCluster{} 121 122 for i := 0; i < size; i++ { 123 am := &Alertmanager{ 124 t: t, 125 opts: t.opts, 126 } 127 128 dir, err := ioutil.TempDir("", "am_test") 129 if err != nil { 130 t.Fatal(err) 131 } 132 am.dir = dir 133 134 cf, err := os.Create(filepath.Join(dir, "config.yml")) 135 if err != nil { 136 t.Fatal(err) 137 } 138 am.confFile = cf 139 am.UpdateConfig(conf) 140 141 am.apiAddr = freeAddress() 142 am.clusterAddr = freeAddress() 143 144 transport := httptransport.New(am.apiAddr, t.opts.RoutePrefix+"/api/v2/", nil) 145 am.clientV2 = apiclient.New(transport, strfmt.Default) 146 147 amc.ams = append(amc.ams, am) 148 } 149 150 t.amc = &amc 151 152 return &amc 153} 154 155// Collector returns a new collector bound to the test instance. 156func (t *AcceptanceTest) Collector(name string) *Collector { 157 co := &Collector{ 158 t: t.T, 159 name: name, 160 opts: t.opts, 161 collected: map[float64][]models.GettableAlerts{}, 162 expected: map[Interval][]models.GettableAlerts{}, 163 } 164 t.collectors = append(t.collectors, co) 165 166 return co 167} 168 169// Run starts all Alertmanagers and runs queries against them. It then checks 170// whether all expected notifications have arrived at the expected receiver. 
171func (t *AcceptanceTest) Run() { 172 errc := make(chan error) 173 174 for _, am := range t.amc.ams { 175 am.errc = errc 176 defer func(am *Alertmanager) { 177 am.Terminate() 178 am.cleanup() 179 t.Logf("stdout:\n%v", am.cmd.Stdout) 180 t.Logf("stderr:\n%v", am.cmd.Stderr) 181 }(am) 182 } 183 184 err := t.amc.Start() 185 if err != nil { 186 t.T.Fatal(err) 187 } 188 189 go t.runActions() 190 191 var latest float64 192 for _, coll := range t.collectors { 193 if l := coll.latest(); l > latest { 194 latest = l 195 } 196 } 197 198 deadline := t.opts.expandTime(latest) 199 200 select { 201 case <-time.After(time.Until(deadline)): 202 // continue 203 case err := <-errc: 204 t.Error(err) 205 } 206} 207 208// runActions performs the stored actions at the defined times. 209func (t *AcceptanceTest) runActions() { 210 var wg sync.WaitGroup 211 212 for at, fs := range t.actions { 213 ts := t.opts.expandTime(at) 214 wg.Add(len(fs)) 215 216 for _, f := range fs { 217 go func(f func()) { 218 time.Sleep(time.Until(ts)) 219 f() 220 wg.Done() 221 }(f) 222 } 223 } 224 225 wg.Wait() 226} 227 228type buffer struct { 229 b bytes.Buffer 230 mtx sync.Mutex 231} 232 233func (b *buffer) Write(p []byte) (int, error) { 234 b.mtx.Lock() 235 defer b.mtx.Unlock() 236 return b.b.Write(p) 237} 238 239func (b *buffer) String() string { 240 b.mtx.Lock() 241 defer b.mtx.Unlock() 242 return b.b.String() 243} 244 245// Alertmanager encapsulates an Alertmanager process and allows 246// declaring alerts being pushed to it at fixed points in time. 247type Alertmanager struct { 248 t *AcceptanceTest 249 opts *AcceptanceOpts 250 251 apiAddr string 252 clusterAddr string 253 clientV2 *apiclient.Alertmanager 254 cmd *exec.Cmd 255 confFile *os.File 256 dir string 257 258 errc chan<- error 259} 260 261// AlertmanagerCluster represents a group of Alertmanager instances 262// acting as a cluster. 
263type AlertmanagerCluster struct { 264 ams []*Alertmanager 265} 266 267// Start the Alertmanager cluster and wait until it is ready to receive. 268func (amc *AlertmanagerCluster) Start() error { 269 var peerFlags []string 270 for _, am := range amc.ams { 271 peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr) 272 } 273 274 for _, am := range amc.ams { 275 err := am.Start(peerFlags) 276 if err != nil { 277 return fmt.Errorf("starting alertmanager cluster: %v", err.Error()) 278 } 279 } 280 281 for _, am := range amc.ams { 282 err := am.WaitForCluster(len(amc.ams)) 283 if err != nil { 284 return fmt.Errorf("starting alertmanager cluster: %v", err.Error()) 285 } 286 } 287 288 return nil 289} 290 291// Members returns the underlying slice of cluster members. 292func (amc *AlertmanagerCluster) Members() []*Alertmanager { 293 return amc.ams 294} 295 296// Start the alertmanager and wait until it is ready to receive. 297func (am *Alertmanager) Start(additionalArg []string) error { 298 am.t.Helper() 299 args := []string{ 300 "--config.file", am.confFile.Name(), 301 "--log.level", "debug", 302 "--web.listen-address", am.apiAddr, 303 "--storage.path", am.dir, 304 "--cluster.listen-address", am.clusterAddr, 305 "--cluster.settle-timeout", "0s", 306 } 307 if am.opts.RoutePrefix != "" { 308 args = append(args, "--web.route-prefix", am.opts.RoutePrefix) 309 } 310 args = append(args, additionalArg...) 311 312 cmd := exec.Command("../../../alertmanager", args...) 
313 314 if am.cmd == nil { 315 var outb, errb buffer 316 cmd.Stdout = &outb 317 cmd.Stderr = &errb 318 } else { 319 cmd.Stdout = am.cmd.Stdout 320 cmd.Stderr = am.cmd.Stderr 321 } 322 am.cmd = cmd 323 324 if err := am.cmd.Start(); err != nil { 325 return fmt.Errorf("starting alertmanager failed: %s", err) 326 } 327 328 go func() { 329 if err := am.cmd.Wait(); err != nil { 330 am.errc <- err 331 } 332 }() 333 334 time.Sleep(50 * time.Millisecond) 335 for i := 0; i < 10; i++ { 336 _, err := am.clientV2.General.GetStatus(nil) 337 if err == nil { 338 return nil 339 } 340 time.Sleep(500 * time.Millisecond) 341 } 342 return fmt.Errorf("starting alertmanager failed: timeout") 343} 344 345// WaitForCluster waits for the Alertmanager instance to join a cluster with the 346// given size. 347func (am *Alertmanager) WaitForCluster(size int) error { 348 params := general.NewGetStatusParams() 349 params.WithContext(context.Background()) 350 var status *general.GetStatusOK 351 352 // Poll for 2s 353 for i := 0; i < 20; i++ { 354 var err error 355 status, err = am.clientV2.General.GetStatus(params) 356 if err != nil { 357 return err 358 } 359 360 if len(status.Payload.Cluster.Peers) == size { 361 return nil 362 } 363 time.Sleep(100 * time.Millisecond) 364 } 365 366 return fmt.Errorf( 367 "failed to wait for Alertmanager instance %q to join cluster: expected %v peers, but got %v", 368 am.clusterAddr, 369 size, 370 len(status.Payload.Cluster.Peers), 371 ) 372} 373 374// Terminate kills the underlying Alertmanager cluster processes and removes intermediate 375// data. 376func (amc *AlertmanagerCluster) Terminate() { 377 for _, am := range amc.ams { 378 am.Terminate() 379 } 380} 381 382// Terminate kills the underlying Alertmanager process and remove intermediate 383// data. 
384func (am *Alertmanager) Terminate() { 385 am.t.Helper() 386 if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil { 387 am.t.Fatalf("Error sending SIGTERM to Alertmanager process: %v", err) 388 } 389} 390 391// Reload sends the reloading signal to the Alertmanager instances. 392func (amc *AlertmanagerCluster) Reload() { 393 for _, am := range amc.ams { 394 am.Reload() 395 } 396} 397 398// Reload sends the reloading signal to the Alertmanager process. 399func (am *Alertmanager) Reload() { 400 am.t.Helper() 401 if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGHUP); err != nil { 402 am.t.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err) 403 } 404} 405 406func (am *Alertmanager) cleanup() { 407 am.t.Helper() 408 if err := os.RemoveAll(am.confFile.Name()); err != nil { 409 am.t.Errorf("Error removing test config file %q: %v", am.confFile.Name(), err) 410 } 411} 412 413// Push declares alerts that are to be pushed to the Alertmanager 414// servers at a relative point in time. 415func (amc *AlertmanagerCluster) Push(at float64, alerts ...*TestAlert) { 416 for _, am := range amc.ams { 417 am.Push(at, alerts...) 418 } 419} 420 421// Push declares alerts that are to be pushed to the Alertmanager 422// server at a relative point in time. 
423func (am *Alertmanager) Push(at float64, alerts ...*TestAlert) { 424 var cas models.PostableAlerts 425 for i := range alerts { 426 a := alerts[i].nativeAlert(am.opts) 427 alert := &models.PostableAlert{ 428 Alert: models.Alert{ 429 Labels: a.Labels, 430 GeneratorURL: a.GeneratorURL, 431 }, 432 Annotations: a.Annotations, 433 } 434 if a.StartsAt != nil { 435 alert.StartsAt = *a.StartsAt 436 } 437 if a.EndsAt != nil { 438 alert.EndsAt = *a.EndsAt 439 } 440 cas = append(cas, alert) 441 } 442 443 am.t.Do(at, func() { 444 params := alert.PostAlertsParams{} 445 params.WithContext(context.Background()).WithAlerts(cas) 446 447 _, err := am.clientV2.Alert.PostAlerts(¶ms) 448 if err != nil { 449 am.t.Errorf("Error pushing %v: %v", cas, err) 450 } 451 }) 452} 453 454// SetSilence updates or creates the given Silence. 455func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) { 456 for _, am := range amc.ams { 457 am.SetSilence(at, sil) 458 } 459} 460 461// SetSilence updates or creates the given Silence. 462func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) { 463 am.t.Do(at, func() { 464 resp, err := am.clientV2.Silence.PostSilences( 465 silence.NewPostSilencesParams().WithSilence( 466 &models.PostableSilence{ 467 Silence: *sil.nativeSilence(am.opts), 468 }, 469 ), 470 ) 471 if err != nil { 472 am.t.Errorf("Error setting silence %v: %s", sil, err) 473 return 474 } 475 sil.SetID(resp.Payload.SilenceID) 476 }) 477} 478 479// DelSilence deletes the silence with the sid at the given time. 480func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) { 481 for _, am := range amc.ams { 482 am.DelSilence(at, sil) 483 } 484} 485 486// DelSilence deletes the silence with the sid at the given time. 
487func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) { 488 am.t.Do(at, func() { 489 _, err := am.clientV2.Silence.DeleteSilence( 490 silence.NewDeleteSilenceParams().WithSilenceID(strfmt.UUID(sil.ID())), 491 ) 492 if err != nil { 493 am.t.Errorf("Error deleting silence %v: %s", sil, err) 494 } 495 }) 496} 497 498// UpdateConfig rewrites the configuration file for the Alertmanager cluster. It 499// does not initiate config reloading. 500func (amc *AlertmanagerCluster) UpdateConfig(conf string) { 501 for _, am := range amc.ams { 502 am.UpdateConfig(conf) 503 } 504} 505 506// UpdateConfig rewrites the configuration file for the Alertmanager. It does not 507// initiate config reloading. 508func (am *Alertmanager) UpdateConfig(conf string) { 509 if _, err := am.confFile.WriteString(conf); err != nil { 510 am.t.Fatal(err) 511 return 512 } 513 if err := am.confFile.Sync(); err != nil { 514 am.t.Fatal(err) 515 return 516 } 517} 518 519// Client returns a client to interact with the API v2 endpoint. 520func (am *Alertmanager) Client() *apiclient.Alertmanager { 521 return am.clientV2 522} 523