package schedule

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"net/url"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/grafana/grafana/pkg/expr"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/annotations"
	apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/notifier"
	"github.com/grafana/grafana/pkg/services/ngalert/sender"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
	"github.com/grafana/grafana/pkg/services/ngalert/store"
	"github.com/grafana/grafana/pkg/services/secrets/fakes"
	secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager"
	"github.com/grafana/grafana/pkg/setting"
)

func TestSendingToExternalAlertmanager(t *testing.T) {
	fakeAM := NewFakeExternalAlertmanager(t)
	defer fakeAM.Close()
	fakeRuleStore := newFakeRuleStore(t)
	fakeInstanceStore := &FakeInstanceStore{}
	fakeAdminConfigStore := newFakeAdminConfigStore(t)

	// create alert rule with one second interval
	alertRule := CreateTestAlertRule(t, fakeRuleStore, 1, 1, eval.Alerting)

	// First, let's create an admin configuration that holds an alertmanager.
	adminConfig := &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{fakeAM.server.URL}}
	cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)

	// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
	// when the first alert triggers.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 1, len(sched.senders))
	require.Equal(t, 1, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Then, ensure we've discovered the Alertmanager.
	require.Eventually(t, func() bool {
		return len(sched.AlertmanagersFor(1)) == 1 && len(sched.DroppedAlertmanagersFor(1)) == 0
	}, 10*time.Second, 200*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(func() {
		cancel()
	})
	go func() {
		err := sched.Run(ctx)
		require.NoError(t, err)
	}()
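
	// Note: the scheduler is driven entirely by the mocked clock. Its base
	// interval is one second (see setupScheduler) and the rule above evaluates
	// every second, so advancing the clock by two seconds guarantees at least
	// one evaluation.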

	// With everything up and running, let's advance the time to make sure we get at least one alert iteration.
	mockedClock.Add(2 * time.Second)

	// Eventually, our Alertmanager should have received at least one alert.
	require.Eventually(t, func() bool {
		return fakeAM.AlertsCount() >= 1 && fakeAM.AlertNamesCompare([]string{alertRule.Title})
	}, 10*time.Second, 200*time.Millisecond)

	// Now, let's remove the Alertmanager from the admin configuration.
	adminConfig.Alertmanagers = []string{}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	// Again, make sure we sync and verify the senders.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 0, len(sched.senders))
	require.Equal(t, 0, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Then, ensure we've dropped the Alertmanager.
	require.Eventually(t, func() bool {
		return len(sched.AlertmanagersFor(1)) == 0 && len(sched.DroppedAlertmanagersFor(1)) == 0
	}, 10*time.Second, 200*time.Millisecond)
}

func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
	fakeAM := NewFakeExternalAlertmanager(t)
	defer fakeAM.Close()
	fakeRuleStore := newFakeRuleStore(t)
	fakeInstanceStore := &FakeInstanceStore{}
	fakeAdminConfigStore := newFakeAdminConfigStore(t)

	// First, let's create an admin configuration that holds an alertmanager.
	adminConfig := &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{fakeAM.server.URL}}
	cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)

	// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
	// when the first alert triggers.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 1, len(sched.senders))
	require.Equal(t, 1, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Then, ensure we've discovered the Alertmanager.
	require.Eventuallyf(t, func() bool {
		return len(sched.AlertmanagersFor(1)) == 1 && len(sched.DroppedAlertmanagersFor(1)) == 0
	}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 1 was never discovered")

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(func() {
		cancel()
	})
	go func() {
		err := sched.Run(ctx)
		require.NoError(t, err)
	}()

	// 1. Now, let's assume a new org comes along.
	adminConfig2 := &models.AdminConfiguration{OrgID: 2, Alertmanagers: []string{fakeAM.server.URL}}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	// If we sync again, new senders must have spawned.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 2, len(sched.senders))
	require.Equal(t, 2, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Then, ensure we've discovered the Alertmanager for the new organization.
	require.Eventuallyf(t, func() bool {
		return len(sched.AlertmanagersFor(2)) == 1 && len(sched.DroppedAlertmanagersFor(2)) == 0
	}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 2 was never discovered")

	// With everything up and running, let's advance the time to make sure we get at least one alert iteration.
	mockedClock.Add(10 * time.Second)

	// TODO(gotjosh): Disabling this assertion as for some reason even after advancing the clock the alert is not being delivered.
	// The check prior to this assertion should ensure that the sender is up and running before the notification is sent.
	// However, sometimes that does not happen.

	// Create two alert rules with one second interval.
	// alertRuleOrgOne := CreateTestAlertRule(t, fakeRuleStore, 1, 1)
	// alertRuleOrgTwo := CreateTestAlertRule(t, fakeRuleStore, 1, 2)

	// Eventually, our Alertmanager should have received at least two alerts.
	// var count int
	// require.Eventuallyf(t, func() bool {
	// 	count = fakeAM.AlertsCount()
	// 	return count == 2 && fakeAM.AlertNamesCompare([]string{alertRuleOrgOne.Title, alertRuleOrgTwo.Title})
	// }, 20*time.Second, 200*time.Millisecond, "Alertmanager never received an '%s' from org 1 or '%s' from org 2, the alert count was: %d", alertRuleOrgOne.Title, alertRuleOrgTwo.Title, count)

	// 2. Next, let's modify the configuration of an organization by adding an extra alertmanager.
	fakeAM2 := NewFakeExternalAlertmanager(t)
	adminConfig2 = &models.AdminConfiguration{OrgID: 2, Alertmanagers: []string{fakeAM.server.URL, fakeAM2.server.URL}}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	// Before we sync, let's grab the existing hash of this particular org.
	sched.sendersMtx.Lock()
	currentHash := sched.sendersCfgHash[2]
	sched.sendersMtx.Unlock()

	// Now, sync again.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())

	// The hash for org two should not be the same and we should still have two senders.
	sched.sendersMtx.Lock()
	require.NotEqual(t, sched.sendersCfgHash[2], currentHash)
	require.Equal(t, 2, len(sched.senders))
	require.Equal(t, 2, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Wait for the discovery of the new Alertmanager for orgID = 2.
	require.Eventuallyf(t, func() bool {
		return len(sched.AlertmanagersFor(2)) == 2 && len(sched.DroppedAlertmanagersFor(2)) == 0
	}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 2 was never re-discovered after fix")

	// 3. Now, let's provide a configuration that fails for OrgID = 1.
	adminConfig2 = &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{"123://invalid.org"}}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	// Before we sync, let's get the current config hash.
	sched.sendersMtx.Lock()
	currentHash = sched.sendersCfgHash[1]
	sched.sendersMtx.Unlock()

	// Now, sync again.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())

	// The old configuration should still be running.
	sched.sendersMtx.Lock()
	require.Equal(t, sched.sendersCfgHash[1], currentHash)
	sched.sendersMtx.Unlock()
	require.Equal(t, 1, len(sched.AlertmanagersFor(1)))

	// If we fix it, the new configuration should be applied.
	adminConfig2 = &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{"notarealalertmanager:3030"}}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.NotEqual(t, sched.sendersCfgHash[1], currentHash)
	sched.sendersMtx.Unlock()
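
	// The scheduler keeps a hash of each org's sender configuration in
	// sendersCfgHash: an unchanged hash after a sync means the configuration
	// was not re-applied (as with the invalid URL above), while a changed
	// hash means a new configuration took effect.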

	// Finally, remove everything.
	require.NoError(t, fakeAdminConfigStore.DeleteAdminConfiguration(1))
	require.NoError(t, fakeAdminConfigStore.DeleteAdminConfiguration(2))
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 0, len(sched.senders))
	require.Equal(t, 0, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	require.Eventuallyf(t, func() bool {
		NoAlertmanagerOrgOne := len(sched.AlertmanagersFor(1)) == 0 && len(sched.DroppedAlertmanagersFor(1)) == 0
		NoAlertmanagerOrgTwo := len(sched.AlertmanagersFor(2)) == 0 && len(sched.DroppedAlertmanagersFor(2)) == 0

		return NoAlertmanagerOrgOne && NoAlertmanagerOrgTwo
	}, 10*time.Second, 200*time.Millisecond, "Alertmanagers for orgs 1 and 2 were never removed")
}

func TestSchedule_ruleRoutine(t *testing.T) {
	createSchedule := func(
		evalAppliedChan chan time.Time,
	) (*schedule, *fakeRuleStore, *FakeInstanceStore, *fakeAdminConfigStore, prometheus.Gatherer) {
		ruleStore := newFakeRuleStore(t)
		instanceStore := &FakeInstanceStore{}
		adminConfigStore := newFakeAdminConfigStore(t)

		registry := prometheus.NewPedanticRegistry()
		sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, registry)
		sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
			evalAppliedChan <- t
		}
		return sch, ruleStore, instanceStore, adminConfigStore, registry
	}

	// The normal states do not include NoData and Error because it is currently not possible to test them in any sensible way.
	normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
	randomNormalState := func() eval.State {
		// pick only supported cases
		return normalStates[rand.Intn(len(normalStates))]
	}

	for _, evalState := range normalStates {
		// TODO rewrite when we are able to mock/fake state manager
		t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
			evalChan := make(chan *evalContext)
			evalAppliedChan := make(chan time.Time)
			sch, ruleStore, instanceStore, _, reg := createSchedule(evalAppliedChan)

			rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState)

			go func() {
				ctx, cancel := context.WithCancel(context.Background())
				t.Cleanup(cancel)
				_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
			}()

			expectedTime := time.UnixMicro(rand.Int63())

			evalChan <- &evalContext{
				now:     expectedTime,
				version: rule.Version,
			}

			actualTime := waitForTimeChannel(t, evalAppliedChan)
			require.Equal(t, expectedTime, actualTime)

			t.Run("it should get the rule from the database when run for the first time", func(t *testing.T) {
				queries := make([]models.GetAlertRuleByUIDQuery, 0)
				for _, op := range ruleStore.recordedOps {
					switch q := op.(type) {
					case models.GetAlertRuleByUIDQuery:
						queries = append(queries, q)
					}
				}
				require.NotEmptyf(t, queries, "Expected a %T request to rule store but nothing was recorded", models.GetAlertRuleByUIDQuery{})
				require.Len(t, queries, 1, "Expected exactly one request of %T but got %d", models.GetAlertRuleByUIDQuery{}, len(queries))
				require.Equal(t, rule.UID, queries[0].UID)
				require.Equal(t, rule.OrgID, queries[0].OrgID)
			})
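
			// Note for the next two subtests: a rule created for eval.Pending
			// combines an always-firing expression with a long "for" duration
			// (see CreateTestAlertRule), so the recorded evaluation result is
			// Alerting while the stored alert instance state stays Pending.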
			t.Run("it should process evaluation results via state manager", func(t *testing.T) {
				// TODO rewrite when we are able to mock/fake state manager
				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
				require.Len(t, states, 1)
				s := states[0]
				t.Logf("State: %v", s)
				require.Equal(t, rule.UID, s.AlertRuleUID)
				require.Len(t, s.Results, 1)
				var expectedStatus = evalState
				if evalState == eval.Pending {
					expectedStatus = eval.Alerting
				}
				require.Equal(t, expectedStatus.String(), s.Results[0].EvaluationState.String())
				require.Equal(t, expectedTime, s.Results[0].EvaluationTime)
			})
			t.Run("it should save alert instances to storage", func(t *testing.T) {
				// TODO rewrite when we are able to mock/fake state manager
				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
				require.Len(t, states, 1)
				s := states[0]

				var cmd *models.SaveAlertInstanceCommand
				for _, op := range instanceStore.recordedOps {
					switch q := op.(type) {
					case models.SaveAlertInstanceCommand:
						cmd = &q
					}
					if cmd != nil {
						break
					}
				}

				require.NotNil(t, cmd)
				t.Logf("Saved alert instance: %v", cmd)
				require.Equal(t, rule.OrgID, cmd.RuleOrgID)
				require.Equal(t, expectedTime, cmd.LastEvalTime)
				require.Equal(t, rule.UID, cmd.RuleUID)
				require.Equal(t, evalState.String(), string(cmd.State))
				require.Equal(t, s.Labels, data.Labels(cmd.Labels))
			})
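
			// The subtest below gathers metrics from the pedantic registry and
			// compares them, via testutil.GatherAndCompare, against the expected
			// Prometheus exposition-format text, restricted to the named metric
			// families.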
			t.Run("it reports metrics", func(t *testing.T) {
				// The duration metric has zero values because the mocked clock does not advance.
				expectedMetric := fmt.Sprintf(
					`# HELP grafana_alerting_rule_evaluation_duration_seconds The duration for a rule to execute.
# TYPE grafana_alerting_rule_evaluation_duration_seconds summary
grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.5"} 0
grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.9"} 0
grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.99"} 0
grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 0
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
`, rule.OrgID)

				err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total")
				require.NoError(t, err)
			})
		})
	}

	t.Run("should exit", func(t *testing.T) {
		t.Run("when context is cancelled", func(t *testing.T) {
			stoppedChan := make(chan error)
			sch, _, _, _, _ := createSchedule(make(chan time.Time))

			ctx, cancel := context.WithCancel(context.Background())
			go func() {
				err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext))
				stoppedChan <- err
			}()

			cancel()
			err := waitForErrChannel(t, stoppedChan)
			require.NoError(t, err)
		})
	})

	t.Run("should fetch rule from database only if new version is greater than current", func(t *testing.T) {
		evalChan := make(chan *evalContext)
		evalAppliedChan := make(chan time.Time)

		sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)

		rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())

		go func() {
			ctx, cancel := context.WithCancel(context.Background())
			t.Cleanup(cancel)
			_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
		}()

		expectedTime := time.UnixMicro(rand.Int63())
		evalChan <- &evalContext{
			now:     expectedTime,
			version: rule.Version,
		}

		actualTime := waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		// Now update the rule
		newRule := *rule
		newRule.Version++
		ruleStore.putRule(&newRule)

		// and call with the new version
		expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
		evalChan <- &evalContext{
			now:     expectedTime,
			version: newRule.Version,
		}

		actualTime = waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		queries := make([]models.GetAlertRuleByUIDQuery, 0)
		for _, op := range ruleStore.recordedOps {
			switch q := op.(type) {
			case models.GetAlertRuleByUIDQuery:
				queries = append(queries, q)
			}
		}
		require.Len(t, queries, 2, "Expected exactly two requests of %T", models.GetAlertRuleByUIDQuery{})
		require.Equal(t, rule.UID, queries[0].UID)
		require.Equal(t, rule.OrgID, queries[0].OrgID)
		require.Equal(t, rule.UID, queries[1].UID)
		require.Equal(t, rule.OrgID, queries[1].OrgID)
	})
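
	// The evalContext carries the rule version the scheduler last saw: the
	// subtest above and the one below pin down that ruleRoutine refetches the
	// rule from the store only when the incoming version is strictly greater
	// than the cached one; equal or older versions reuse the cached rule.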

	t.Run("should not fetch rule if version is equal or less than current", func(t *testing.T) {
		evalChan := make(chan *evalContext)
		evalAppliedChan := make(chan time.Time)

		sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)

		rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())

		go func() {
			ctx, cancel := context.WithCancel(context.Background())
			t.Cleanup(cancel)
			_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
		}()

		expectedTime := time.UnixMicro(rand.Int63())
		evalChan <- &evalContext{
			now:     expectedTime,
			version: rule.Version,
		}

		actualTime := waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		// try again with the same version
		expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
		evalChan <- &evalContext{
			now:     expectedTime,
			version: rule.Version,
		}
		actualTime = waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		// and then with an older version
		expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
		evalChan <- &evalContext{
			now:     expectedTime,
			version: rule.Version - 1,
		}
		actualTime = waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		queries := make([]models.GetAlertRuleByUIDQuery, 0)
		for _, op := range ruleStore.recordedOps {
			switch q := op.(type) {
			case models.GetAlertRuleByUIDQuery:
				queries = append(queries, q)
			}
		}
		require.Len(t, queries, 1, "Expected exactly one request of %T", models.GetAlertRuleByUIDQuery{})
	})

	t.Run("when evaluation fails", func(t *testing.T) {
		t.Run("it should increase failure counter", func(t *testing.T) {
			t.Skip()
			// TODO implement a check for the counter
		})
		t.Run("it should retry up to configured times", func(t *testing.T) {
			// TODO figure out how to simulate a failure
			t.Skip()
		})
	})

	t.Run("when there are alerts that should be firing", func(t *testing.T) {
		t.Run("it should send to local alertmanager if configured for organization", func(t *testing.T) {
			// TODO figure out how to simulate the multiorg alertmanager
			t.Skip()
		})
		t.Run("it should send to external alertmanager if configured for organization", func(t *testing.T) {
			fakeAM := NewFakeExternalAlertmanager(t)
			defer fakeAM.Close()

			orgID := rand.Int63()
			s, err := sender.New(nil)
			require.NoError(t, err)
			adminConfig := &models.AdminConfiguration{OrgID: orgID, Alertmanagers: []string{fakeAM.server.URL}}
			err = s.ApplyConfig(adminConfig)
			require.NoError(t, err)
			s.Run()
			defer s.Stop()

			require.Eventuallyf(t, func() bool {
				return len(s.Alertmanagers()) == 1
			}, 20*time.Second, 200*time.Millisecond, "external Alertmanager was not discovered.")

			evalChan := make(chan *evalContext)
			evalAppliedChan := make(chan time.Time)

			sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
			sch.senders[orgID] = s
			// eval.Alerting makes the state manager create notifications for the Alertmanagers.
			rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting)

			go func() {
				ctx, cancel := context.WithCancel(context.Background())
				t.Cleanup(cancel)
				_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
			}()

			evalChan <- &evalContext{
				now:     time.Now(),
				version: rule.Version,
			}
			waitForTimeChannel(t, evalAppliedChan)

			var count int
			require.Eventuallyf(t, func() bool {
				count = fakeAM.AlertsCount()
				return count == 1 && fakeAM.AlertNamesCompare([]string{rule.Title})
			}, 20*time.Second, 200*time.Millisecond, "Alertmanager never received an '%s', received alerts count: %d", rule.Title, count)
		})
	})

	t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) {
		// TODO needs some mocking/stubbing for the Alertmanager and the Sender to make sure they are not called.
		t.Skip()
	})
}
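
// The subtests below verify alertRuleInfo's channel semantics: eval delivers
// an evalContext on evalCh and returns true, returns false if the rule's
// context is cancelled first, and stop cancels that context and is safe to
// call multiple times and concurrently.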
func TestSchedule_alertRuleInfo(t *testing.T) {
	t.Run("when rule evaluation is not stopped", func(t *testing.T) {
		t.Run("eval should send to evalCh", func(t *testing.T) {
			r := newAlertRuleInfo(context.Background())
			expected := time.Now()
			resultCh := make(chan bool)
			version := rand.Int63()
			go func() {
				resultCh <- r.eval(expected, version)
			}()
			select {
			case ctx := <-r.evalCh:
				require.Equal(t, version, ctx.version)
				require.Equal(t, expected, ctx.now)
				require.True(t, <-resultCh)
			case <-time.After(5 * time.Second):
				t.Fatal("No message was received on eval channel")
			}
		})
		t.Run("eval should exit when context is cancelled", func(t *testing.T) {
			r := newAlertRuleInfo(context.Background())
			resultCh := make(chan bool)
			go func() {
				resultCh <- r.eval(time.Now(), rand.Int63())
			}()
			runtime.Gosched()
			r.stop()
			select {
			case result := <-resultCh:
				require.False(t, result)
			case <-time.After(5 * time.Second):
				t.Fatal("No result was received on the result channel")
			}
		})
	})
	t.Run("when rule evaluation is stopped", func(t *testing.T) {
		t.Run("eval should do nothing", func(t *testing.T) {
			r := newAlertRuleInfo(context.Background())
			r.stop()
			require.False(t, r.eval(time.Now(), rand.Int63()))
		})
		t.Run("stop should do nothing", func(t *testing.T) {
			r := newAlertRuleInfo(context.Background())
			r.stop()
			r.stop()
		})
	})
	t.Run("should be thread-safe", func(t *testing.T) {
		r := newAlertRuleInfo(context.Background())
		wg := sync.WaitGroup{}
		go func() {
			for {
				select {
				case <-r.evalCh:
					time.Sleep(time.Millisecond)
				case <-r.ctx.Done():
					return
				}
			}
		}()

		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func() {
				for i := 0; i < 20; i++ {
					// Allow stop only in the second half of the iterations, so
					// eval is also exercised against an already stopped rule.
					max := 2
					if i <= 10 {
						max = 1
					}
					switch rand.Intn(max) + 1 {
					case 1:
						r.eval(time.Now(), rand.Int63())
					case 2:
						r.stop()
					}
				}
				wg.Done()
			}()
		}

		wg.Wait()
	})
}
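
// setupScheduler wires a scheduler to a mocked clock with a one second base
// interval. AdminConfigPollInterval is deliberately long so that the
// background poller never fires during a test; tests trigger configuration
// syncs explicitly through SyncAndApplyConfigFromDatabase instead.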
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock) {
	t.Helper()

	fakeAnnoRepo := NewFakeAnnotationsRepo()
	annotations.SetRepository(fakeAnnoRepo)
	mockedClock := clock.NewMock()
	logger := log.New("ngalert schedule test")
	if registry == nil {
		registry = prometheus.NewPedanticRegistry()
	}
	m := metrics.NewNGAlert(registry)
	secretsService := secretsManager.SetupTestService(t, fakes.NewFakeSecretsStore())
	decryptFn := secretsService.GetDecryptedValue
	moa, err := notifier.NewMultiOrgAlertmanager(&setting.Cfg{}, &notifier.FakeConfigStore{}, &notifier.FakeOrgStore{}, &notifier.FakeKVStore{}, decryptFn, nil, log.New("testlogger"))
	require.NoError(t, err)

	schedCfg := SchedulerCfg{
		C:                       mockedClock,
		BaseInterval:            time.Second,
		MaxAttempts:             1,
		Evaluator:               eval.Evaluator{Cfg: &setting.Cfg{ExpressionsEnabled: true}, Log: logger},
		RuleStore:               rs,
		InstanceStore:           is,
		AdminConfigStore:        acs,
		MultiOrgNotifier:        moa,
		Logger:                  logger,
		Metrics:                 m.GetSchedulerMetrics(),
		AdminConfigPollInterval: 10 * time.Minute, // do not poll in unit tests.
	}
	st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is)
	appUrl := &url.URL{
		Scheme: "http",
		Host:   "localhost",
	}
	return NewScheduler(schedCfg, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil), appUrl, st), mockedClock
}

// CreateTestAlertRule creates a dummy alert rule to be used by the tests.
func CreateTestAlertRule(t *testing.T, dbstore *fakeRuleStore, intervalSeconds int64, orgID int64, evalResult eval.State) *models.AlertRule {
	t.Helper()
	records := make([]interface{}, len(dbstore.recordedOps))
	copy(records, dbstore.recordedOps)
	defer func() {
		// Restore the store's recorded ops to erase the queries made by this helper.
		dbstore.recordedOps = records
	}()
	d := rand.Intn(1000)
	ruleGroup := fmt.Sprintf("ruleGroup-%d", d)

	var expression string
	var forDuration time.Duration
	switch evalResult {
	case eval.Normal:
		expression = `{
			"datasourceUid": "-100",
			"type": "math",
			"expression": "2 + 1 < 1"
		}`
	case eval.Pending, eval.Alerting:
		expression = `{
			"datasourceUid": "-100",
			"type": "math",
			"expression": "2 + 2 > 1"
		}`
		if evalResult == eval.Pending {
			forDuration = 100 * time.Second
		}
	case eval.Error:
		expression = `{
			"datasourceUid": "-100",
			"type": "math",
			"expression": "$A"
		}`
	case eval.NoData:
		// TODO Implement support for NoData
		require.Fail(t, "Alert rule with desired evaluation result NoData is not supported yet")
	}

	err := dbstore.UpdateRuleGroup(store.UpdateRuleGroupCmd{
		OrgID:        orgID,
		NamespaceUID: "namespace",
		RuleGroupConfig: apimodels.PostableRuleGroupConfig{
			Name:     ruleGroup,
			Interval: model.Duration(time.Duration(intervalSeconds) * time.Second),
			Rules: []apimodels.PostableExtendedRuleNode{
				{
					ApiRuleNode: &apimodels.ApiRuleNode{
						Annotations: map[string]string{"testAnnoKey": "testAnnoValue"},
						For:         model.Duration(forDuration),
					},
					GrafanaManagedAlert: &apimodels.PostableGrafanaRule{
						Title:     fmt.Sprintf("an alert definition %d", d),
						Condition: "A",
						Data: []models.AlertQuery{
							{
								DatasourceUID: "-100",
								Model:         json.RawMessage(expression),
								RelativeTimeRange: models.RelativeTimeRange{
									From: models.Duration(5 * time.Hour),
									To:   models.Duration(3 * time.Hour),
								},
								RefID: "A",
							},
						},
					},
				},
			},
		},
	})
	require.NoError(t, err)

	q := models.ListRuleGroupAlertRulesQuery{
		OrgID:        orgID,
		NamespaceUID: "namespace",
		RuleGroup:    ruleGroup,
	}
	err = dbstore.GetRuleGroupAlertRules(&q)
	require.NoError(t, err)
	require.NotEmpty(t, q.Result)

	rule := q.Result[0]
	t.Logf("alert definition: %v with interval: %d created", rule.GetKey(), rule.IntervalSeconds)
	return rule
}