1import { Observable, of, OperatorFunction, ReplaySubject, Unsubscribable } from 'rxjs';
2import { catchError, map, share } from 'rxjs/operators';
3import { v4 as uuidv4 } from 'uuid';
4import {
5  dataFrameFromJSON,
6  DataFrameJSON,
7  getDefaultTimeRange,
8  LoadingState,
9  PanelData,
10  rangeUtil,
11  TimeRange,
12  withLoadingIndicator,
13} from '@grafana/data';
14import { FetchResponse, getDataSourceSrv, toDataQueryError } from '@grafana/runtime';
15import { BackendSrv, getBackendSrv } from 'app/core/services/backend_srv';
16import { preProcessPanelData } from 'app/features/query/state/runRequest';
17import { AlertQuery } from 'app/types/unified-alerting-dto';
18import { getTimeRangeForExpression } from '../utils/timeRange';
19import { isExpressionQuery } from 'app/features/expressions/guards';
20import { setStructureRevision } from 'app/features/query/state/processing/revision';
21import { cancelNetworkRequestsOnUnsubscribe } from 'app/features/query/state/processing/canceler';
22
23export interface AlertingQueryResult {
24  frames: DataFrameJSON[];
25}
26
27export interface AlertingQueryResponse {
28  results: Record<string, AlertingQueryResult>;
29}
30export class AlertingQueryRunner {
31  private subject: ReplaySubject<Record<string, PanelData>>;
32  private subscription?: Unsubscribable;
33  private lastResult: Record<string, PanelData>;
34
35  constructor(private backendSrv = getBackendSrv(), private dataSourceSrv = getDataSourceSrv()) {
36    this.subject = new ReplaySubject(1);
37    this.lastResult = {};
38  }
39
40  get(): Observable<Record<string, PanelData>> {
41    return this.subject.asObservable();
42  }
43
44  async run(queries: AlertQuery[]) {
45    if (queries.length === 0) {
46      const empty = initialState(queries, LoadingState.Done);
47      return this.subject.next(empty);
48    }
49
50    // do not execute if one more of the queries are not runnable,
51    // for example not completely configured
52    for (const query of queries) {
53      if (!isExpressionQuery(query.model)) {
54        const ds = await this.dataSourceSrv.get(query.datasourceUid);
55        if (ds.filterQuery && !ds.filterQuery(query.model)) {
56          const empty = initialState(queries, LoadingState.Done);
57          return this.subject.next(empty);
58        }
59      }
60    }
61
62    this.subscription = runRequest(this.backendSrv, queries).subscribe({
63      next: (dataPerQuery) => {
64        const nextResult = applyChange(dataPerQuery, (refId, data) => {
65          const previous = this.lastResult[refId];
66          const preProcessed = preProcessPanelData(data, previous);
67          return setStructureRevision(preProcessed, previous);
68        });
69
70        this.lastResult = nextResult;
71        this.subject.next(this.lastResult);
72      },
73
74      error: (error: Error) => {
75        this.lastResult = mapErrorToPanelData(this.lastResult, error);
76        this.subject.next(this.lastResult);
77      },
78    });
79  }
80
81  cancel() {
82    if (!this.subscription) {
83      return;
84    }
85    this.subscription.unsubscribe();
86
87    let requestIsRunning = false;
88
89    const nextResult = applyChange(this.lastResult, (refId, data) => {
90      if (data.state === LoadingState.Loading) {
91        requestIsRunning = true;
92      }
93
94      return {
95        ...data,
96        state: LoadingState.Done,
97      };
98    });
99
100    if (requestIsRunning) {
101      this.subject.next(nextResult);
102    }
103  }
104
105  destroy() {
106    if (this.subject) {
107      this.subject.complete();
108    }
109
110    this.cancel();
111  }
112}
113
114const runRequest = (backendSrv: BackendSrv, queries: AlertQuery[]): Observable<Record<string, PanelData>> => {
115  const initial = initialState(queries, LoadingState.Loading);
116  const request = {
117    data: { data: queries },
118    url: '/api/v1/eval',
119    method: 'POST',
120    requestId: uuidv4(),
121  };
122
123  return withLoadingIndicator({
124    whileLoading: initial,
125    source: backendSrv.fetch<AlertingQueryResponse>(request).pipe(
126      mapToPanelData(initial),
127      catchError((error) => of(mapErrorToPanelData(initial, error))),
128      cancelNetworkRequestsOnUnsubscribe(backendSrv, request.requestId),
129      share()
130    ),
131  });
132};
133
134const initialState = (queries: AlertQuery[], state: LoadingState): Record<string, PanelData> => {
135  return queries.reduce((dataByQuery: Record<string, PanelData>, query) => {
136    dataByQuery[query.refId] = {
137      state,
138      series: [],
139      timeRange: getTimeRange(query, queries),
140    };
141
142    return dataByQuery;
143  }, {});
144};
145
146const getTimeRange = (query: AlertQuery, queries: AlertQuery[]): TimeRange => {
147  if (isExpressionQuery(query.model)) {
148    const relative = getTimeRangeForExpression(query.model, queries);
149    return rangeUtil.relativeToTimeRange(relative);
150  }
151
152  if (!query.relativeTimeRange) {
153    console.warn(`Query with refId: ${query.refId} did not have any relative time range, using default.`);
154    return getDefaultTimeRange();
155  }
156
157  return rangeUtil.relativeToTimeRange(query.relativeTimeRange);
158};
159
160const mapToPanelData = (
161  dataByQuery: Record<string, PanelData>
162): OperatorFunction<FetchResponse<AlertingQueryResponse>, Record<string, PanelData>> => {
163  return map((response) => {
164    const { data } = response;
165    const results: Record<string, PanelData> = {};
166
167    for (const [refId, result] of Object.entries(data.results)) {
168      results[refId] = {
169        timeRange: dataByQuery[refId].timeRange,
170        state: LoadingState.Done,
171        series: result.frames.map(dataFrameFromJSON),
172      };
173    }
174
175    return results;
176  });
177};
178
179const mapErrorToPanelData = (lastResult: Record<string, PanelData>, error: Error): Record<string, PanelData> => {
180  const queryError = toDataQueryError(error);
181
182  return applyChange(lastResult, (refId, data) => {
183    return {
184      ...data,
185      state: LoadingState.Error,
186      error: queryError,
187    };
188  });
189};
190
191const applyChange = (
192  initial: Record<string, PanelData>,
193  change: (refId: string, data: PanelData) => PanelData
194): Record<string, PanelData> => {
195  const nextResult: Record<string, PanelData> = {};
196
197  for (const [refId, data] of Object.entries(initial)) {
198    nextResult[refId] = change(refId, data);
199  }
200
201  return nextResult;
202};
203