package state

import (
	"context"
	"fmt"
	"net/url"
	"strconv"
	"time"

	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/services/annotations"
	"github.com/grafana/grafana/pkg/services/sqlstore"

	"github.com/grafana/grafana/pkg/infra/log"

	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/store"
)

// ResendDelay is the default minimum interval between re-sends of a firing alert.
var ResendDelay = 30 * time.Second

// Manager tracks the evaluation state of alert instances, backed by an
// in-memory cache that is persisted to and warmed from the instance store.
type Manager struct {
	log     log.Logger
	metrics *metrics.State

	cache       *cache
	quit        chan struct{}
	ResendDelay time.Duration

	ruleStore     store.RuleStore
	instanceStore store.InstanceStore
}

func NewManager(logger log.Logger, metrics *metrics.State, externalURL *url.URL, ruleStore store.RuleStore, instanceStore store.InstanceStore) *Manager {
	manager := &Manager{
		cache:         newCache(logger, metrics, externalURL),
		quit:          make(chan struct{}),
		ResendDelay:   ResendDelay, // TODO: make this configurable
		log:           logger,
		metrics:       metrics,
		ruleStore:     ruleStore,
		instanceStore: instanceStore,
	}
	go manager.recordMetrics()
	return manager
}

// Close stops the background metrics-recording goroutine.
func (st *Manager) Close() {
	st.quit <- struct{}{}
}

// Warm hydrates the state cache from the rules and alert instances persisted
// in the database, so that alert state survives a restart.
func (st *Manager) Warm() {
	st.log.Info("warming cache for startup")
	st.ResetCache()

	orgIds, err := st.instanceStore.FetchOrgIds()
	if err != nil {
		st.log.Error("unable to fetch orgIds", "msg", err.Error())
	}

	var states []*State
	for _, orgId := range orgIds {
		// Get Rules
		ruleCmd := ngModels.ListAlertRulesQuery{
			OrgID: orgId,
		}
		if err := st.ruleStore.GetOrgAlertRules(&ruleCmd); err != nil {
			st.log.Error("unable to fetch previous state", "msg", err.Error())
		}

		ruleByUID := make(map[string]*ngModels.AlertRule, len(ruleCmd.Result))
		for _, rule := range ruleCmd.Result {
			ruleByUID[rule.UID] = rule
		}

		// Get Instances
		cmd := ngModels.ListAlertInstancesQuery{
			RuleOrgID: orgId,
		}
		if err := st.instanceStore.ListAlertInstances(&cmd); err != nil {
			st.log.Error("unable to fetch previous state", "msg", err.Error())
		}

		for _, entry := range cmd.Result {
			ruleForEntry, ok := ruleByUID[entry.RuleUID]
			if !ok {
				st.log.Error("rule not found for instance, ignoring", "rule", entry.RuleUID)
				continue
			}

			lbs := map[string]string(entry.Labels)
			cacheId, err := entry.Labels.StringKey()
			if err != nil {
				st.log.Error("error getting cacheId for entry", "msg", err.Error())
			}
			stateForEntry := &State{
				AlertRuleUID:       entry.RuleUID,
				OrgID:              entry.RuleOrgID,
				CacheId:            cacheId,
				Labels:             lbs,
				State:              translateInstanceState(entry.CurrentState),
				Results:            []Evaluation{},
				StartsAt:           entry.CurrentStateSince,
				EndsAt:             entry.CurrentStateEnd,
				LastEvaluationTime: entry.LastEvalTime,
				Annotations:        ruleForEntry.Annotations,
			}
			states = append(states, stateForEntry)
		}
	}

	for _, s := range states {
		st.set(s)
	}
}

func (st *Manager) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) *State {
	return st.cache.getOrCreate(alertRule, result)
}

func (st *Manager) set(entry *State) {
	st.cache.set(entry)
}

// Get returns the cached state for a single alert instance.
func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) (*State, error) {
	return st.cache.get(orgID, alertRuleUID, stateId)
}
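
// Illustrative sketch only (not part of this file's code): the cache is keyed
// orgID -> alertRuleUID -> cacheId, where cacheId is the string key derived
// from the instance's label set (see Labels.StringKey in Warm above). A
// hypothetical lookup, with placeholder IDs, might look like:
//
//	s, err := mgr.Get(1, "ruleUID", cacheId)
//	if err != nil {
//		// no state is cached for this instance
//	}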

// ResetCache is used to ensure a clean cache on startup.
func (st *Manager) ResetCache() {
	st.cache.reset()
}

// RemoveByRuleUID deletes all entries in the state manager that match the given rule UID.
func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
	st.cache.removeByRuleUID(orgID, ruleUID)
}

// ProcessEvalResults updates the state for each evaluation result of the
// given alert rule, then prunes any cached entries that have gone stale.
func (st *Manager) ProcessEvalResults(ctx context.Context, alertRule *ngModels.AlertRule, results eval.Results) []*State {
	st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
	var states []*State
	processedResults := make(map[string]*State, len(results))
	for _, result := range results {
		s := st.setNextState(ctx, alertRule, result)
		states = append(states, s)
		processedResults[s.CacheId] = s
	}
	st.staleResultsHandler(alertRule, processedResults)
	return states
}

// setNextState sets the current state based on the evaluation result.
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result) *State {
	currentState := st.getOrCreate(alertRule, result)

	currentState.LastEvaluationTime = result.EvaluatedAt
	currentState.EvaluationDuration = result.EvaluationDuration
	currentState.Results = append(currentState.Results, Evaluation{
		EvaluationTime:   result.EvaluatedAt,
		EvaluationState:  result.State,
		EvaluationString: result.EvaluationString,
		Values:           NewEvaluationValues(result.Values),
	})
	currentState.TrimResults(alertRule)
	oldState := currentState.State

	st.log.Debug("setting alert state", "uid", alertRule.UID)
	switch result.State {
	case eval.Normal:
		currentState.resultNormal(alertRule, result)
	case eval.Alerting:
		currentState.resultAlerting(alertRule, result)
	case eval.Error:
		currentState.resultError(alertRule, result)
	case eval.NoData:
		currentState.resultNoData(alertRule, result)
	case eval.Pending: // we do not emit results with this state
	}

	// Set the Resolved property so the scheduler knows to send a postable alert
	// to Alertmanager.
	currentState.Resolved = oldState == eval.Alerting && currentState.State == eval.Normal

	st.set(currentState)
	if oldState != currentState.State {
		go st.createAlertAnnotation(ctx, currentState.State, alertRule, result, oldState)
	}
	return currentState
}

// GetAll returns all cached states for the given organization.
func (st *Manager) GetAll(orgID int64) []*State {
	return st.cache.getAll(orgID)
}

// GetStatesForRuleUID returns all cached states for the given alert rule.
func (st *Manager) GetStatesForRuleUID(orgID int64, alertRuleUID string) []*State {
	return st.cache.getStatesForRuleUID(orgID, alertRuleUID)
}
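
// Caller-side sketch (hedged; rule and results are hypothetical placeholders):
// a rule that evaluated Alerting on one tick and Normal on the next comes out
// of setNextState with oldState == eval.Alerting and State == eval.Normal, so
// Resolved is true and an annotation is written asynchronously:
//
//	states := mgr.ProcessEvalResults(ctx, rule, results)
//	for _, s := range states {
//		if s.Resolved {
//			// the scheduler should post a "resolved" alert to Alertmanager
//		}
//	}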

// recordMetrics periodically records state cache metrics until Close is called.
func (st *Manager) recordMetrics() {
	// TODO: parameterize?
	// Setting to a reasonable default scrape interval for Prometheus.
	dur := 15 * time.Second
	ticker := time.NewTicker(dur)
	for {
		select {
		case <-ticker.C:
			st.log.Debug("recording state cache metrics", "now", time.Now())
			st.cache.recordMetrics()
		case <-st.quit:
			st.log.Debug("stopping state cache metrics recording", "now", time.Now())
			ticker.Stop()
			return
		}
	}
}

// Put stores the given states directly in the cache.
func (st *Manager) Put(states []*State) {
	for _, s := range states {
		st.set(s)
	}
}

func translateInstanceState(state ngModels.InstanceStateType) eval.State {
	switch state {
	case ngModels.InstanceStateFiring:
		return eval.Alerting
	case ngModels.InstanceStateNormal:
		return eval.Normal
	default:
		return eval.Error
	}
}

func (st *Manager) createAlertAnnotation(ctx context.Context, new eval.State, alertRule *ngModels.AlertRule, result eval.Result, oldState eval.State) {
	st.log.Debug("alert state changed creating annotation", "alertRuleUID", alertRule.UID, "newState", new.String(), "oldState", oldState.String())

	annotationText := fmt.Sprintf("%s {%s} - %s", alertRule.Title, result.Instance.String(), new.String())

	item := &annotations.Item{
		AlertId:   alertRule.ID,
		OrgId:     alertRule.OrgID,
		PrevState: oldState.String(),
		NewState:  new.String(),
		Text:      annotationText,
		Epoch:     result.EvaluatedAt.UnixNano() / int64(time.Millisecond),
	}

	dashUid, ok := alertRule.Annotations[ngModels.DashboardUIDAnnotation]
	if ok {
		panelUid := alertRule.Annotations[ngModels.PanelIDAnnotation]

		panelId, err := strconv.ParseInt(panelUid, 10, 64)
		if err != nil {
			st.log.Error("error parsing panelUID for alert annotation", "panelUID", panelUid, "alertRuleUID", alertRule.UID, "error", err.Error())
			return
		}

		query := &models.GetDashboardQuery{
			Uid:   dashUid,
			OrgId: alertRule.OrgID,
		}

		err = sqlstore.GetDashboard(ctx, query)
		if err != nil {
			st.log.Error("error getting dashboard for alert annotation", "dashboardUID", dashUid, "alertRuleUID", alertRule.UID, "error", err.Error())
			return
		}

		item.PanelId = panelId
		item.DashboardId = query.Result.Id
	}

	annotationRepo := annotations.GetRepository()
	if err := annotationRepo.Save(item); err != nil {
		st.log.Error("error saving alert annotation", "alertRuleUID", alertRule.UID, "error", err.Error())
		return
	}
}

// staleResultsHandler removes from the cache, and deletes from the database,
// any state for this rule that was not refreshed by the current evaluation
// and has not been evaluated for at least two rule intervals.
func (st *Manager) staleResultsHandler(alertRule *ngModels.AlertRule, states map[string]*State) {
	allStates := st.GetStatesForRuleUID(alertRule.OrgID, alertRule.UID)
	for _, s := range allStates {
		_, ok := states[s.CacheId]
		if !ok && isItStale(s.LastEvaluationTime, alertRule.IntervalSeconds) {
			st.log.Debug("removing stale state entry", "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
			st.cache.deleteEntry(s.OrgID, s.AlertRuleUID, s.CacheId)
			ilbs := ngModels.InstanceLabels(s.Labels)
			_, labelsHash, err := ilbs.StringAndHash()
			if err != nil {
				st.log.Error("unable to get labelsHash", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID)
			}

			if err = st.instanceStore.DeleteAlertInstance(s.OrgID, s.AlertRuleUID, labelsHash); err != nil {
				st.log.Error("unable to delete stale instance from database", "error", err.Error(), "orgID", s.OrGID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
			}
		}
	}
}
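
// Worked example (illustrative): an entry is stale once it has gone unevaluated
// for more than two full rule intervals. With IntervalSeconds = 60:
//
//	isItStale(time.Now().Add(-3*time.Minute), 60)  // true:  180s  > 120s
//	isItStale(time.Now().Add(-90*time.Second), 60) // false:  90s <= 120s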
func isItStale(lastEval time.Time, intervalSeconds int64) bool {
	return lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).Before(time.Now())
}
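
// Lifecycle sketch (hedged; the scheduler wires this up elsewhere in the
// ngalert package, and m, appURL, ruleStore, instanceStore, rule, and results
// are hypothetical placeholders):
//
//	mgr := NewManager(log.New("ngalert.state.manager"), m, appURL, ruleStore, instanceStore)
//	mgr.Warm()                                           // hydrate cache from the database
//	states := mgr.ProcessEvalResults(ctx, rule, results) // on each evaluation tick
//	mgr.Close()                                          // stop the metrics goroutine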