package schedule

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"net/url"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/grafana/grafana/pkg/expr"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/annotations"
	apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/notifier"
	"github.com/grafana/grafana/pkg/services/ngalert/sender"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
	"github.com/grafana/grafana/pkg/services/ngalert/store"
	"github.com/grafana/grafana/pkg/services/secrets/fakes"
	secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager"
	"github.com/grafana/grafana/pkg/setting"
)

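// TestSendingToExternalAlertmanager verifies that the scheduler spawns a sender for an org with an
// external Alertmanager configured, forwards firing alerts to it, and tears the sender down again
// once the Alertmanager is removed from the admin configuration.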
func TestSendingToExternalAlertmanager(t *testing.T) {
	fakeAM := NewFakeExternalAlertmanager(t)
	defer fakeAM.Close()
	fakeRuleStore := newFakeRuleStore(t)
	fakeInstanceStore := &FakeInstanceStore{}
	fakeAdminConfigStore := newFakeAdminConfigStore(t)

	// Create an alert rule with a one-second interval.
	alertRule := CreateTestAlertRule(t, fakeRuleStore, 1, 1, eval.Alerting)

	// First, let's create an admin configuration that holds an alertmanager.
	adminConfig := &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{fakeAM.server.URL}}
	cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)

	// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
	// when the first alert triggers.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 1, len(sched.senders))
	require.Equal(t, 1, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Then, ensure we've discovered the Alertmanager.
	require.Eventually(t, func() bool {
		return len(sched.AlertmanagersFor(1)) == 1 && len(sched.DroppedAlertmanagersFor(1)) == 0
	}, 10*time.Second, 200*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(func() {
		cancel()
	})
	go func() {
		err := sched.Run(ctx)
		require.NoError(t, err)
	}()

	// With everything up and running, let's advance the time to make sure we get at least one alert iteration.
	mockedClock.Add(2 * time.Second)

	// Eventually, our Alertmanager should have received at least one alert.
	require.Eventually(t, func() bool {
		return fakeAM.AlertsCount() >= 1 && fakeAM.AlertNamesCompare([]string{alertRule.Title})
	}, 10*time.Second, 200*time.Millisecond)

	// Now, let's remove the Alertmanager from the admin configuration.
	adminConfig.Alertmanagers = []string{}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	// Again, make sure we sync and verify the senders.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 0, len(sched.senders))
	require.Equal(t, 0, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Then, ensure we've dropped the Alertmanager.
	require.Eventually(t, func() bool {
		return len(sched.AlertmanagersFor(1)) == 0 && len(sched.DroppedAlertmanagersFor(1)) == 0
	}, 10*time.Second, 200*time.Millisecond)
}

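// TestSendingToExternalAlertmanager_WithMultipleOrgs verifies the sender lifecycle across organizations:
// new senders are spawned when another org adds an external Alertmanager, configuration changes are
// re-applied (while invalid configurations keep the previous one running), and all senders are removed
// once the admin configurations are deleted.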
func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
	fakeAM := NewFakeExternalAlertmanager(t)
	defer fakeAM.Close()
	fakeRuleStore := newFakeRuleStore(t)
	fakeInstanceStore := &FakeInstanceStore{}
	fakeAdminConfigStore := newFakeAdminConfigStore(t)

	// First, let's create an admin configuration that holds an alertmanager.
	adminConfig := &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{fakeAM.server.URL}}
	cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)

	// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
	// when the first alert triggers.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 1, len(sched.senders))
	require.Equal(t, 1, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Then, ensure we've discovered the Alertmanager.
	require.Eventuallyf(t, func() bool {
		return len(sched.AlertmanagersFor(1)) == 1 && len(sched.DroppedAlertmanagersFor(1)) == 0
	}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 1 was never discovered")

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(func() {
		cancel()
	})
	go func() {
		err := sched.Run(ctx)
		require.NoError(t, err)
	}()

	// 1. Now, let's assume a new org comes along.
	adminConfig2 := &models.AdminConfiguration{OrgID: 2, Alertmanagers: []string{fakeAM.server.URL}}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	// If we sync again, new senders must have spawned.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 2, len(sched.senders))
	require.Equal(t, 2, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Then, ensure we've discovered the Alertmanager for the new organization.
	require.Eventuallyf(t, func() bool {
		return len(sched.AlertmanagersFor(2)) == 1 && len(sched.DroppedAlertmanagersFor(2)) == 0
	}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 2 was never discovered")

	// With everything up and running, let's advance the time to make sure we get at least one alert iteration.
	mockedClock.Add(10 * time.Second)

	// TODO(gotjosh): Disabling this assertion because, for some reason, the alert is not delivered even after advancing the clock.
	// The check prior to this assertion should guarantee that the sender is up and running before the notification is sent;
	// however, sometimes that is not the case.

	// Create two alert rules with one second interval.
	// alertRuleOrgOne := CreateTestAlertRule(t, fakeRuleStore, 1, 1)
	// alertRuleOrgTwo := CreateTestAlertRule(t, fakeRuleStore, 1, 2)
	// Eventually, our Alertmanager should have received at least two alerts.
	// var count int
	// require.Eventuallyf(t, func() bool {
	//	count := fakeAM.AlertsCount()
	//	return count == 2 && fakeAM.AlertNamesCompare([]string{alertRuleOrgOne.Title, alertRuleOrgTwo.Title})
	// }, 20*time.Second, 200*time.Millisecond, "Alertmanager never received an '%s' from org 1 or '%s' from org 2, the alert count was: %d", alertRuleOrgOne.Title, alertRuleOrgTwo.Title, count)

	// 2. Next, let's modify the configuration of an organization by adding an extra alertmanager.
	fakeAM2 := NewFakeExternalAlertmanager(t)
	adminConfig2 = &models.AdminConfiguration{OrgID: 2, Alertmanagers: []string{fakeAM.server.URL, fakeAM2.server.URL}}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	// Before we sync, let's grab the existing hash of this particular org.
	sched.sendersMtx.Lock()
	currentHash := sched.sendersCfgHash[2]
	sched.sendersMtx.Unlock()

	// Now, sync again.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())

	// The hash for org two should not be the same and we should still have two senders.
	sched.sendersMtx.Lock()
	require.NotEqual(t, sched.sendersCfgHash[2], currentHash)
	require.Equal(t, 2, len(sched.senders))
	require.Equal(t, 2, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	// Wait for the discovery of the new Alertmanager for orgID = 2.
	require.Eventuallyf(t, func() bool {
		return len(sched.AlertmanagersFor(2)) == 2 && len(sched.DroppedAlertmanagersFor(2)) == 0
	}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 2 was never re-discovered after fix")

	// 3. Now, let's provide a configuration that fails for OrgID = 1.
	adminConfig2 = &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{"123://invalid.org"}}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

	// Before we sync, let's get the current config hash.
	sched.sendersMtx.Lock()
	currentHash = sched.sendersCfgHash[1]
	sched.sendersMtx.Unlock()

	// Now, sync again.
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())

	// The old configuration should still be running.
	sched.sendersMtx.Lock()
	require.Equal(t, sched.sendersCfgHash[1], currentHash)
	sched.sendersMtx.Unlock()
	require.Equal(t, 1, len(sched.AlertmanagersFor(1)))

	// If we fix it, the new configuration should be applied.
	adminConfig2 = &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{"notarealalertmanager:3030"}}
	cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
	require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.NotEqual(t, sched.sendersCfgHash[1], currentHash)
	sched.sendersMtx.Unlock()

	// Finally, remove everything.
	require.NoError(t, fakeAdminConfigStore.DeleteAdminConfiguration(1))
	require.NoError(t, fakeAdminConfigStore.DeleteAdminConfiguration(2))
	require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
	sched.sendersMtx.Lock()
	require.Equal(t, 0, len(sched.senders))
	require.Equal(t, 0, len(sched.sendersCfgHash))
	sched.sendersMtx.Unlock()

	require.Eventuallyf(t, func() bool {
		NoAlertmanagerOrgOne := len(sched.AlertmanagersFor(1)) == 0 && len(sched.DroppedAlertmanagersFor(1)) == 0
		NoAlertmanagerOrgTwo := len(sched.AlertmanagersFor(2)) == 0 && len(sched.DroppedAlertmanagersFor(2)) == 0

		return NoAlertmanagerOrgOne && NoAlertmanagerOrgTwo
	}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 1 and 2 were never removed")
}

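// TestSchedule_ruleRoutine exercises the per-rule evaluation goroutine: fetching the rule from the
// store, pushing results through the state manager, persisting alert instances, reporting metrics,
// honoring rule version changes, and notifying an external Alertmanager for firing alerts.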
func TestSchedule_ruleRoutine(t *testing.T) {
	createSchedule := func(
		evalAppliedChan chan time.Time,
	) (*schedule, *fakeRuleStore, *FakeInstanceStore, *fakeAdminConfigStore, prometheus.Gatherer) {
		ruleStore := newFakeRuleStore(t)
		instanceStore := &FakeInstanceStore{}
		adminConfigStore := newFakeAdminConfigStore(t)

		registry := prometheus.NewPedanticRegistry()
		sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, registry)
		sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
			evalAppliedChan <- t
		}
		return sch, ruleStore, instanceStore, adminConfigStore, registry
	}

	// Normal states do not include NoData and Error because it is currently not possible to construct a sensible test for them.
	normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
	randomNormalState := func() eval.State {
		// pick only supported cases
		return normalStates[rand.Intn(len(normalStates))]
	}

	for _, evalState := range normalStates {
		// TODO rewrite when we are able to mock/fake state manager
		t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
			evalChan := make(chan *evalContext)
			evalAppliedChan := make(chan time.Time)
			sch, ruleStore, instanceStore, _, reg := createSchedule(evalAppliedChan)

			rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState)

			go func() {
				ctx, cancel := context.WithCancel(context.Background())
				t.Cleanup(cancel)
				_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
			}()

			expectedTime := time.UnixMicro(rand.Int63())

			evalChan <- &evalContext{
				now:     expectedTime,
				version: rule.Version,
			}

			actualTime := waitForTimeChannel(t, evalAppliedChan)
			require.Equal(t, expectedTime, actualTime)

			t.Run("it should get the rule from the database when run for the first time", func(t *testing.T) {
				queries := make([]models.GetAlertRuleByUIDQuery, 0)
				for _, op := range ruleStore.recordedOps {
					switch q := op.(type) {
					case models.GetAlertRuleByUIDQuery:
						queries = append(queries, q)
					}
				}
				require.NotEmptyf(t, queries, "Expected a %T request to rule store but nothing was recorded", models.GetAlertRuleByUIDQuery{})
				require.Len(t, queries, 1, "Expected exactly one request of %T but got %d", models.GetAlertRuleByUIDQuery{}, len(queries))
				require.Equal(t, rule.UID, queries[0].UID)
				require.Equal(t, rule.OrgID, queries[0].OrgID)
			})
			t.Run("it should process evaluation results via state manager", func(t *testing.T) {
				// TODO rewrite when we are able to mock/fake state manager
				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
				require.Len(t, states, 1)
				s := states[0]
				t.Logf("State: %v", s)
				require.Equal(t, rule.UID, s.AlertRuleUID)
				require.Len(t, s.Results, 1)
				var expectedStatus = evalState
				if evalState == eval.Pending {
					expectedStatus = eval.Alerting
				}
				require.Equal(t, expectedStatus.String(), s.Results[0].EvaluationState.String())
				require.Equal(t, expectedTime, s.Results[0].EvaluationTime)
			})
			t.Run("it should save alert instances to storage", func(t *testing.T) {
				// TODO rewrite when we are able to mock/fake state manager
				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
				require.Len(t, states, 1)
				s := states[0]

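				// Find the first SaveAlertInstanceCommand recorded by the fake instance store.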
				var cmd *models.SaveAlertInstanceCommand
				for _, op := range instanceStore.recordedOps {
					switch q := op.(type) {
					case models.SaveAlertInstanceCommand:
						cmd = &q
					}
					if cmd != nil {
						break
					}
				}

				require.NotNil(t, cmd)
				t.Logf("Saved alert instance: %v", cmd)
				require.Equal(t, rule.OrgID, cmd.RuleOrgID)
				require.Equal(t, expectedTime, cmd.LastEvalTime)
				require.Equal(t, rule.UID, cmd.RuleUID)
				require.Equal(t, evalState.String(), string(cmd.State))
				require.Equal(t, s.Labels, data.Labels(cmd.Labels))
			})
			t.Run("it reports metrics", func(t *testing.T) {
				// The duration metric reports 0 because the mocked clock does not advance.
				expectedMetric := fmt.Sprintf(
					`# HELP grafana_alerting_rule_evaluation_duration_seconds The duration for a rule to execute.
        	            	# TYPE grafana_alerting_rule_evaluation_duration_seconds summary
        	            	grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.5"} 0
        	            	grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.9"} 0
        	            	grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.99"} 0
        	            	grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
        	            	grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
							# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
        	            	# TYPE grafana_alerting_rule_evaluation_failures_total counter
        	            	grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 0
        	            	# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
        	            	# TYPE grafana_alerting_rule_evaluations_total counter
        	            	grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
				`, rule.OrgID)

				err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total")
				require.NoError(t, err)
			})
		})
	}

	t.Run("should exit", func(t *testing.T) {
		t.Run("when context is cancelled", func(t *testing.T) {
			stoppedChan := make(chan error)
			sch, _, _, _, _ := createSchedule(make(chan time.Time))

			ctx, cancel := context.WithCancel(context.Background())
			go func() {
				err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext))
				stoppedChan <- err
			}()

			cancel()
			err := waitForErrChannel(t, stoppedChan)
			require.NoError(t, err)
		})
	})

	t.Run("should fetch rule from database only if new version is greater than current", func(t *testing.T) {
		evalChan := make(chan *evalContext)
		evalAppliedChan := make(chan time.Time)

		sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)

		rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())

		go func() {
			ctx, cancel := context.WithCancel(context.Background())
			t.Cleanup(cancel)
			_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
		}()

		expectedTime := time.UnixMicro(rand.Int63())
		evalChan <- &evalContext{
			now:     expectedTime,
			version: rule.Version,
		}

		actualTime := waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		// Now update the rule
		newRule := *rule
		newRule.Version++
		ruleStore.putRule(&newRule)

		// and call with new version
		expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
		evalChan <- &evalContext{
			now:     expectedTime,
			version: newRule.Version,
		}

		actualTime = waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		queries := make([]models.GetAlertRuleByUIDQuery, 0)
		for _, op := range ruleStore.recordedOps {
			switch q := op.(type) {
			case models.GetAlertRuleByUIDQuery:
				queries = append(queries, q)
			}
		}
		require.Len(t, queries, 2, "Expected exactly two requests of %T", models.GetAlertRuleByUIDQuery{})
		require.Equal(t, rule.UID, queries[0].UID)
		require.Equal(t, rule.OrgID, queries[0].OrgID)
		require.Equal(t, rule.UID, queries[1].UID)
		require.Equal(t, rule.OrgID, queries[1].OrgID)
	})

	t.Run("should not fetch rule if version is equal or less than current", func(t *testing.T) {
		evalChan := make(chan *evalContext)
		evalAppliedChan := make(chan time.Time)

		sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)

		rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())

		go func() {
			ctx, cancel := context.WithCancel(context.Background())
			t.Cleanup(cancel)
			_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
		}()

		expectedTime := time.UnixMicro(rand.Int63())
		evalChan <- &evalContext{
			now:     expectedTime,
			version: rule.Version,
		}

		actualTime := waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		// try again with the same version
		expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
		evalChan <- &evalContext{
			now:     expectedTime,
			version: rule.Version,
		}
		actualTime = waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
		evalChan <- &evalContext{
			now:     expectedTime,
			version: rule.Version - 1,
		}
		actualTime = waitForTimeChannel(t, evalAppliedChan)
		require.Equal(t, expectedTime, actualTime)

		queries := make([]models.GetAlertRuleByUIDQuery, 0)
		for _, op := range ruleStore.recordedOps {
			switch q := op.(type) {
			case models.GetAlertRuleByUIDQuery:
				queries = append(queries, q)
			}
		}
		require.Len(t, queries, 1, "Expected exactly one request of %T", models.GetAlertRuleByUIDQuery{})
	})

	t.Run("when evaluation fails", func(t *testing.T) {
		t.Run("it should increase failure counter", func(t *testing.T) {
			t.Skip()
			// TODO implement check for counter
		})
		t.Run("it should retry up to configured times", func(t *testing.T) {
			// TODO figure out how to simulate failure
			t.Skip()
		})
	})

	t.Run("when there are alerts that should be firing", func(t *testing.T) {
		t.Run("it should send to local alertmanager if configured for organization", func(t *testing.T) {
			// TODO figure out how to simulate multiorg alertmanager
			t.Skip()
		})
		t.Run("it should send to external alertmanager if configured for organization", func(t *testing.T) {
			fakeAM := NewFakeExternalAlertmanager(t)
			defer fakeAM.Close()

			orgID := rand.Int63()
			s, err := sender.New(nil)
			require.NoError(t, err)
			adminConfig := &models.AdminConfiguration{OrgID: orgID, Alertmanagers: []string{fakeAM.server.URL}}
			err = s.ApplyConfig(adminConfig)
			require.NoError(t, err)
			s.Run()
			defer s.Stop()

			require.Eventuallyf(t, func() bool {
				return len(s.Alertmanagers()) == 1
			}, 20*time.Second, 200*time.Millisecond, "external Alertmanager was not discovered.")

			evalChan := make(chan *evalContext)
			evalAppliedChan := make(chan time.Time)

			sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
			sch.senders[orgID] = s
			// eval.Alerting makes the state manager create notifications for the Alertmanagers.
			rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting)

			go func() {
				ctx, cancel := context.WithCancel(context.Background())
				t.Cleanup(cancel)
				_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
			}()

			evalChan <- &evalContext{
				now:     time.Now(),
				version: rule.Version,
			}
			waitForTimeChannel(t, evalAppliedChan)

			var count int
			require.Eventuallyf(t, func() bool {
				count = fakeAM.AlertsCount()
				return count == 1 && fakeAM.AlertNamesCompare([]string{rule.Title})
			}, 20*time.Second, 200*time.Millisecond, "Alertmanager never received an '%s', received alerts count: %d", rule.Title, count)
		})
	})

	t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) {
		// TODO needs some mocking/stubbing for Alertmanager and Sender to make sure it was not called
		t.Skip()
	})
}

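// TestSchedule_alertRuleInfo covers the alertRuleInfo helper: eval delivers evaluation requests on
// the channel while the rule is running, returns false once the rule is stopped, and both eval and
// stop are safe to call concurrently.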
func TestSchedule_alertRuleInfo(t *testing.T) {
	t.Run("when rule evaluation is not stopped", func(t *testing.T) {
		t.Run("eval should send to evalCh", func(t *testing.T) {
			r := newAlertRuleInfo(context.Background())
			expected := time.Now()
			resultCh := make(chan bool)
			version := rand.Int63()
			go func() {
				resultCh <- r.eval(expected, version)
			}()
			select {
			case ctx := <-r.evalCh:
				require.Equal(t, version, ctx.version)
				require.Equal(t, expected, ctx.now)
				require.True(t, <-resultCh)
			case <-time.After(5 * time.Second):
				t.Fatal("No message was received on eval channel")
			}
		})
		t.Run("eval should exit when context is cancelled", func(t *testing.T) {
			r := newAlertRuleInfo(context.Background())
			resultCh := make(chan bool)
			go func() {
				resultCh <- r.eval(time.Now(), rand.Int63())
			}()
			runtime.Gosched()
			r.stop()
			select {
			case result := <-resultCh:
				require.False(t, result)
			case <-time.After(5 * time.Second):
				t.Fatal("No message was received on eval channel")
			}
		})
	})
	t.Run("when rule evaluation is stopped", func(t *testing.T) {
		t.Run("eval should do nothing", func(t *testing.T) {
			r := newAlertRuleInfo(context.Background())
			r.stop()
			require.False(t, r.eval(time.Now(), rand.Int63()))
		})
		t.Run("stop should do nothing", func(t *testing.T) {
			r := newAlertRuleInfo(context.Background())
			r.stop()
			r.stop()
		})
	})
	t.Run("should be thread-safe", func(t *testing.T) {
		r := newAlertRuleInfo(context.Background())
		wg := sync.WaitGroup{}
		go func() {
			for {
				select {
				case <-r.evalCh:
					time.Sleep(time.Millisecond)
				case <-r.ctx.Done():
					return
				}
			}
		}()

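		// Spawn workers that hammer the rule info concurrently: the first half of each worker's
		// iterations only calls eval, the second half randomly mixes eval and stop.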
		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func() {
				for i := 0; i < 20; i++ {
					max := 2
					if i <= 10 {
						max = 1
					}
					switch rand.Intn(max) + 1 {
					case 1:
						r.eval(time.Now(), rand.Int63())
					case 2:
						r.stop()
					}
				}
				wg.Done()
			}()
		}

		wg.Wait()
	})
}

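// setupScheduler builds a schedule backed by the given fake stores and a mocked clock. When the
// registry argument is nil, a pedantic Prometheus registry is created for the scheduler metrics.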
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock) {
	t.Helper()

	fakeAnnoRepo := NewFakeAnnotationsRepo()
	annotations.SetRepository(fakeAnnoRepo)
	mockedClock := clock.NewMock()
	logger := log.New("ngalert schedule test")
	if registry == nil {
		registry = prometheus.NewPedanticRegistry()
	}
	m := metrics.NewNGAlert(registry)
	secretsService := secretsManager.SetupTestService(t, fakes.NewFakeSecretsStore())
	decryptFn := secretsService.GetDecryptedValue
	moa, err := notifier.NewMultiOrgAlertmanager(&setting.Cfg{}, &notifier.FakeConfigStore{}, &notifier.FakeOrgStore{}, &notifier.FakeKVStore{}, decryptFn, nil, log.New("testlogger"))
	require.NoError(t, err)

	schedCfg := SchedulerCfg{
		C:                       mockedClock,
		BaseInterval:            time.Second,
		MaxAttempts:             1,
		Evaluator:               eval.Evaluator{Cfg: &setting.Cfg{ExpressionsEnabled: true}, Log: logger},
		RuleStore:               rs,
		InstanceStore:           is,
		AdminConfigStore:        acs,
		MultiOrgNotifier:        moa,
		Logger:                  logger,
		Metrics:                 m.GetSchedulerMetrics(),
		AdminConfigPollInterval: 10 * time.Minute, // do not poll in unit tests.
	}
	st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is)
	appUrl := &url.URL{
		Scheme: "http",
		Host:   "localhost",
	}
	return NewScheduler(schedCfg, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil), appUrl, st), mockedClock
}

// CreateTestAlertRule creates a dummy alert rule to be used by the tests.
func CreateTestAlertRule(t *testing.T, dbstore *fakeRuleStore, intervalSeconds int64, orgID int64, evalResult eval.State) *models.AlertRule {
	t.Helper()
	records := make([]interface{}, len(dbstore.recordedOps))
	copy(records, dbstore.recordedOps)
	defer func() {
		// erase queries that were made by the testing suite
		dbstore.recordedOps = records
	}()
	d := rand.Intn(1000)
	ruleGroup := fmt.Sprintf("ruleGroup-%d", d)

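	// Pick a query expression whose evaluation yields the desired state; Pending is produced by
	// combining an alerting expression with a long "for" duration.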
	var expression string
	var forDuration time.Duration
	switch evalResult {
	case eval.Normal:
		expression = `{
			"datasourceUid": "-100",
			"type":"math",
			"expression":"2 + 1 < 1"
		}`
	case eval.Pending, eval.Alerting:
		expression = `{
			"datasourceUid": "-100",
			"type":"math",
			"expression":"2 + 2 > 1"
		}`
		if evalResult == eval.Pending {
			forDuration = 100 * time.Second
		}
	case eval.Error:
		expression = `{
			"datasourceUid": "-100",
			"type":"math",
			"expression":"$A"
		}`
	case eval.NoData:
		// TODO Implement support for NoData
		require.Fail(t, "Alert rule with desired evaluation result NoData is not supported yet")
	}

	err := dbstore.UpdateRuleGroup(store.UpdateRuleGroupCmd{
		OrgID:        orgID,
		NamespaceUID: "namespace",
		RuleGroupConfig: apimodels.PostableRuleGroupConfig{
			Name:     ruleGroup,
			Interval: model.Duration(time.Duration(intervalSeconds) * time.Second),
			Rules: []apimodels.PostableExtendedRuleNode{
				{
					ApiRuleNode: &apimodels.ApiRuleNode{
						Annotations: map[string]string{"testAnnoKey": "testAnnoValue"},
						For:         model.Duration(forDuration),
					},
					GrafanaManagedAlert: &apimodels.PostableGrafanaRule{
						Title:     fmt.Sprintf("an alert definition %d", d),
						Condition: "A",
						Data: []models.AlertQuery{
							{
								DatasourceUID: "-100",
								Model:         json.RawMessage(expression),
								RelativeTimeRange: models.RelativeTimeRange{
									From: models.Duration(5 * time.Hour),
									To:   models.Duration(3 * time.Hour),
								},
								RefID: "A",
							},
						},
					},
				},
			},
		},
	})
	require.NoError(t, err)

	q := models.ListRuleGroupAlertRulesQuery{
		OrgID:        orgID,
		NamespaceUID: "namespace",
		RuleGroup:    ruleGroup,
	}
	err = dbstore.GetRuleGroupAlertRules(&q)
	require.NoError(t, err)
	require.NotEmpty(t, q.Result)

	rule := q.Result[0]
	t.Logf("alert definition: %v with interval: %d created", rule.GetKey(), rule.IntervalSeconds)
	return rule
}