package state

import (
	"context"
	"fmt"
	"net/url"
	"strconv"
	"time"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/services/annotations"
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/store"
	"github.com/grafana/grafana/pkg/services/sqlstore"
)

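// ResendDelay is the default minimum interval between re-sends of
// notifications for an alert that is still firing.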
var ResendDelay = 30 * time.Second

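// Manager tracks the current state of every alert instance in an in-memory
// cache, which it can warm from the instance store on startup and prune as
// rule evaluations come and go.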
type Manager struct {
	log     log.Logger
	metrics *metrics.State

	cache       *cache
	quit        chan struct{}
	ResendDelay time.Duration

	ruleStore     store.RuleStore
	instanceStore store.InstanceStore
}

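// NewManager returns a state Manager and starts a background goroutine that
// records cache metrics until Close is called.
//
// A minimal usage sketch, assuming the logger, metrics, external URL, and
// stores are already wired up:
//
//	mgr := state.NewManager(logger, stateMetrics, externalURL, ruleStore, instanceStore)
//	defer mgr.Close()
//	mgr.Warm()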
func NewManager(logger log.Logger, metrics *metrics.State, externalURL *url.URL, ruleStore store.RuleStore, instanceStore store.InstanceStore) *Manager {
	manager := &Manager{
		cache:         newCache(logger, metrics, externalURL),
		quit:          make(chan struct{}),
		ResendDelay:   ResendDelay, // TODO: make this configurable
		log:           logger,
		metrics:       metrics,
		ruleStore:     ruleStore,
		instanceStore: instanceStore,
	}
	go manager.recordMetrics()
	return manager
}

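// Close stops the background metrics-recording goroutine.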
func (st *Manager) Close() {
	st.quit <- struct{}{}
}

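// Warm resets the state cache and rehydrates it from the alert instances
// persisted in the database, so that state survives a restart.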
func (st *Manager) Warm() {
	st.log.Info("warming cache for startup")
	st.ResetCache()

	orgIds, err := st.instanceStore.FetchOrgIds()
	if err != nil {
		st.log.Error("unable to fetch orgIds", "error", err.Error())
	}

	var states []*State
	for _, orgId := range orgIds {
		// Get the alert rules for this org.
		ruleCmd := ngModels.ListAlertRulesQuery{
			OrgID: orgId,
		}
		if err := st.ruleStore.GetOrgAlertRules(&ruleCmd); err != nil {
			st.log.Error("unable to fetch alert rules", "error", err.Error())
		}

		ruleByUID := make(map[string]*ngModels.AlertRule, len(ruleCmd.Result))
		for _, rule := range ruleCmd.Result {
			ruleByUID[rule.UID] = rule
		}

		// Get the persisted alert instances for this org.
		cmd := ngModels.ListAlertInstancesQuery{
			RuleOrgID: orgId,
		}
		if err := st.instanceStore.ListAlertInstances(&cmd); err != nil {
			st.log.Error("unable to fetch previous state", "error", err.Error())
		}

		// Rebuild in-memory state for every instance whose rule still exists.
		for _, entry := range cmd.Result {
			ruleForEntry, ok := ruleByUID[entry.RuleUID]
			if !ok {
				st.log.Error("rule not found for instance, ignoring", "rule", entry.RuleUID)
				continue
			}

			lbs := map[string]string(entry.Labels)
			cacheId, err := entry.Labels.StringKey()
			if err != nil {
				st.log.Error("error getting cacheId for entry", "error", err.Error())
			}
			stateForEntry := &State{
				AlertRuleUID:       entry.RuleUID,
				OrgID:              entry.RuleOrgID,
				CacheId:            cacheId,
				Labels:             lbs,
				State:              translateInstanceState(entry.CurrentState),
				Results:            []Evaluation{},
				StartsAt:           entry.CurrentStateSince,
				EndsAt:             entry.CurrentStateEnd,
				LastEvaluationTime: entry.LastEvalTime,
				Annotations:        ruleForEntry.Annotations,
			}
			states = append(states, stateForEntry)
		}
	}

	for _, s := range states {
		st.set(s)
	}
}

func (st *Manager) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) *State {
	return st.cache.getOrCreate(alertRule, result)
}

func (st *Manager) set(entry *State) {
	st.cache.set(entry)
}

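// Get returns the cached State for the given org, rule UID, and state ID.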
func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) (*State, error) {
	return st.cache.get(orgID, alertRuleUID, stateId)
}

// ResetCache is used to ensure a clean cache on startup.
func (st *Manager) ResetCache() {
	st.cache.reset()
}

// RemoveByRuleUID deletes all entries in the state manager that match the given rule UID.
func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
	st.cache.removeByRuleUID(orgID, ruleUID)
}

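// ProcessEvalResults updates the cached state for each evaluation result of
// the given rule, removes states that have gone stale, and returns the
// updated states.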
func (st *Manager) ProcessEvalResults(ctx context.Context, alertRule *ngModels.AlertRule, results eval.Results) []*State {
	st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
	var states []*State
	processedResults := make(map[string]*State, len(results))
	for _, result := range results {
		s := st.setNextState(ctx, alertRule, result)
		states = append(states, s)
		processedResults[s.CacheId] = s
	}
	st.staleResultsHandler(alertRule, processedResults)
	return states
}

// setNextState computes and stores the next state for the rule based on the
// evaluation result, returning the updated state.
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result) *State {
	currentState := st.getOrCreate(alertRule, result)

	currentState.LastEvaluationTime = result.EvaluatedAt
	currentState.EvaluationDuration = result.EvaluationDuration
	currentState.Results = append(currentState.Results, Evaluation{
		EvaluationTime:   result.EvaluatedAt,
		EvaluationState:  result.State,
		EvaluationString: result.EvaluationString,
		Values:           NewEvaluationValues(result.Values),
	})
	currentState.TrimResults(alertRule)
	oldState := currentState.State

	st.log.Debug("setting alert state", "uid", alertRule.UID)
	switch result.State {
	case eval.Normal:
		currentState.resultNormal(alertRule, result)
	case eval.Alerting:
		currentState.resultAlerting(alertRule, result)
	case eval.Error:
		currentState.resultError(alertRule, result)
	case eval.NoData:
		currentState.resultNoData(alertRule, result)
	case eval.Pending: // we do not emit results with this state
	}

	// Set Resolved property so the scheduler knows to send a postable alert
	// to Alertmanager.
	currentState.Resolved = oldState == eval.Alerting && currentState.State == eval.Normal

	st.set(currentState)
	if oldState != currentState.State {
		go st.createAlertAnnotation(ctx, currentState.State, alertRule, result, oldState)
	}
	return currentState
}

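// GetAll returns all cached states for the given org.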
func (st *Manager) GetAll(orgID int64) []*State {
	return st.cache.getAll(orgID)
}

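// GetStatesForRuleUID returns all cached states for the given alert rule.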
func (st *Manager) GetStatesForRuleUID(orgID int64, alertRuleUID string) []*State {
	return st.cache.getStatesForRuleUID(orgID, alertRuleUID)
}

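// recordMetrics records state cache metrics on a fixed ticker until the
// manager is closed.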
func (st *Manager) recordMetrics() {
	// TODO: parameterize?
	// Setting to a reasonable default scrape interval for Prometheus.
	dur := 15 * time.Second
	ticker := time.NewTicker(dur)
	for {
		select {
		case <-ticker.C:
			st.log.Debug("recording state cache metrics", "now", time.Now())
			st.cache.recordMetrics()
		case <-st.quit:
			st.log.Debug("stopping state cache metrics recording", "now", time.Now())
			ticker.Stop()
			return
		}
	}
}

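// Put writes the given states directly into the state cache.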
func (st *Manager) Put(states []*State) {
	for _, s := range states {
		st.set(s)
	}
}

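// translateInstanceState maps a persisted instance state onto an evaluation
// state; anything other than firing or normal is treated as an error state.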
func translateInstanceState(state ngModels.InstanceStateType) eval.State {
	switch state {
	case ngModels.InstanceStateFiring:
		return eval.Alerting
	case ngModels.InstanceStateNormal:
		return eval.Normal
	default:
		return eval.Error
	}
}

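// createAlertAnnotation saves a state-change annotation; when the rule is
// linked to a dashboard panel, the annotation is attached to that panel.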
func (st *Manager) createAlertAnnotation(ctx context.Context, newState eval.State, alertRule *ngModels.AlertRule, result eval.Result, oldState eval.State) {
	st.log.Debug("alert state changed, creating annotation", "alertRuleUID", alertRule.UID, "newState", newState.String(), "oldState", oldState.String())

	annotationText := fmt.Sprintf("%s {%s} - %s", alertRule.Title, result.Instance.String(), newState.String())

	item := &annotations.Item{
		AlertId:   alertRule.ID,
		OrgId:     alertRule.OrgID,
		PrevState: oldState.String(),
		NewState:  newState.String(),
		Text:      annotationText,
		Epoch:     result.EvaluatedAt.UnixNano() / int64(time.Millisecond),
	}

	dashUid, ok := alertRule.Annotations[ngModels.DashboardUIDAnnotation]
	if ok {
		panelAnno := alertRule.Annotations[ngModels.PanelIDAnnotation]

		panelId, err := strconv.ParseInt(panelAnno, 10, 64)
		if err != nil {
			st.log.Error("error parsing panelID for alert annotation", "panelID", panelAnno, "alertRuleUID", alertRule.UID, "error", err.Error())
			return
		}

		query := &models.GetDashboardQuery{
			Uid:   dashUid,
			OrgId: alertRule.OrgID,
		}

		err = sqlstore.GetDashboard(ctx, query)
		if err != nil {
			st.log.Error("error getting dashboard for alert annotation", "dashboardUID", dashUid, "alertRuleUID", alertRule.UID, "error", err.Error())
			return
		}

		item.PanelId = panelId
		item.DashboardId = query.Result.Id
	}

	annotationRepo := annotations.GetRepository()
	if err := annotationRepo.Save(item); err != nil {
		st.log.Error("error saving alert annotation", "alertRuleUID", alertRule.UID, "error", err.Error())
		return
	}
}

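// staleResultsHandler removes states that the rule's latest evaluation no
// longer produces, both from the cache and from the database.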
func (st *Manager) staleResultsHandler(alertRule *ngModels.AlertRule, states map[string]*State) {
	allStates := st.GetStatesForRuleUID(alertRule.OrgID, alertRule.UID)
	for _, s := range allStates {
		_, ok := states[s.CacheId]
		if !ok && isItStale(s.LastEvaluationTime, alertRule.IntervalSeconds) {
			st.log.Debug("removing stale state entry", "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
			st.cache.deleteEntry(s.OrgID, s.AlertRuleUID, s.CacheId)
			ilbs := ngModels.InstanceLabels(s.Labels)
			_, labelsHash, err := ilbs.StringAndHash()
			if err != nil {
				st.log.Error("unable to get labelsHash", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID)
			}

			if err = st.instanceStore.DeleteAlertInstance(s.OrgID, s.AlertRuleUID, labelsHash); err != nil {
				st.log.Error("unable to delete stale instance from database", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
			}
		}
	}
}

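// isItStale reports whether a state has gone more than two evaluation
// intervals without being updated; e.g. with a 60s interval, a state is
// stale once its last evaluation is more than 120s in the past.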
func isItStale(lastEval time.Time, intervalSeconds int64) bool {
	return lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).Before(time.Now())
}