import { Observable, of, OperatorFunction, ReplaySubject, Unsubscribable } from 'rxjs';
import { catchError, map, share } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import {
  dataFrameFromJSON,
  DataFrameJSON,
  getDefaultTimeRange,
  LoadingState,
  PanelData,
  rangeUtil,
  TimeRange,
  withLoadingIndicator,
} from '@grafana/data';
import { FetchResponse, getDataSourceSrv, toDataQueryError } from '@grafana/runtime';
import { BackendSrv, getBackendSrv } from 'app/core/services/backend_srv';
import { preProcessPanelData } from 'app/features/query/state/runRequest';
import { AlertQuery } from 'app/types/unified-alerting-dto';
import { getTimeRangeForExpression } from '../utils/timeRange';
import { isExpressionQuery } from 'app/features/expressions/guards';
import { setStructureRevision } from 'app/features/query/state/processing/revision';
import { cancelNetworkRequestsOnUnsubscribe } from 'app/features/query/state/processing/canceler';

/** Result of a single evaluated query: its data frames in JSON form. */
export interface AlertingQueryResult {
  frames: DataFrameJSON[];
}

/** Body of the eval endpoint response; `results` is keyed by query `refId`. */
export interface AlertingQueryResponse {
  results: Record<string, AlertingQueryResult>;
}

/**
 * Evaluates alert queries through the backend and publishes the outcome as a
 * map of `refId` to `PanelData`.
 *
 * Results are pushed through a `ReplaySubject` with a buffer of one, so a
 * subscriber to `get()` immediately receives the most recently published map.
 */
export class AlertingQueryRunner {
  private subject: ReplaySubject<Record<string, PanelData>>;
  private subscription?: Unsubscribable;
  private lastResult: Record<string, PanelData>;

  constructor(private backendSrv = getBackendSrv(), private dataSourceSrv = getDataSourceSrv()) {
    this.subject = new ReplaySubject(1);
    this.lastResult = {};
  }

  /** Returns the stream of per-`refId` results published by this runner. */
  get(): Observable<Record<string, PanelData>> {
    return this.subject.asObservable();
  }

  /**
   * Evaluates the given queries and publishes the results on the stream.
   *
   * When there are no queries, or a data source rejects one of them through
   * its `filterQuery` hook, no request is sent and a `Done` state without
   * series is published instead.
   *
   * @param queries - The queries (data and expression queries) to evaluate.
   */
  async run(queries: AlertQuery[]): Promise<void> {
    if (queries.length === 0) {
      const empty = initialState(queries, LoadingState.Done);
      return this.subject.next(empty);
    }

    // Do not execute if one or more of the queries are not runnable,
    // for example not completely configured.
    for (const query of queries) {
      if (!isExpressionQuery(query.model)) {
        const ds = await this.dataSourceSrv.get(query.datasourceUid);
        if (ds.filterQuery && !ds.filterQuery(query.model)) {
          const empty = initialState(queries, LoadingState.Done);
          return this.subject.next(empty);
        }
      }
    }

    // A previous run may still be in flight. Its subscription is about to be
    // replaced, after which cancel() could no longer reach it and its late
    // response could overwrite the results of this run, so stop it first.
    if (this.subscription) {
      this.subscription.unsubscribe();
    }

    this.subscription = runRequest(this.backendSrv, queries).subscribe({
      next: (dataPerQuery) => {
        // Process each entry against the previous result for the same refId.
        const nextResult = applyChange(dataPerQuery, (refId, data) => {
          const previous = this.lastResult[refId];
          const preProcessed = preProcessPanelData(data, previous);
          return setStructureRevision(preProcessed, previous);
        });

        this.lastResult = nextResult;
        this.subject.next(this.lastResult);
      },

      error: (error: Error) => {
        this.lastResult = mapErrorToPanelData(this.lastResult, error);
        this.subject.next(this.lastResult);
      },
    });
  }

  /**
   * Stops the current request, if any.
   *
   * If any entry of the last result was still `Loading`, a copy with every
   * entry marked `Done` is published. `lastResult` itself is left untouched.
   */
  cancel() {
    if (!this.subscription) {
      return;
    }
    this.subscription.unsubscribe();

    let requestIsRunning = false;

    const nextResult = applyChange(this.lastResult, (refId, data) => {
      if (data.state === LoadingState.Loading) {
        requestIsRunning = true;
      }

      return {
        ...data,
        state: LoadingState.Done,
      };
    });

    if (requestIsRunning) {
      this.subject.next(nextResult);
    }
  }

  /**
   * Completes the result stream and stops the current request.
   *
   * The subject is completed before cancel() runs, so the state cancel() may
   * publish is not delivered to subscribers.
   */
  destroy() {
    if (this.subject) {
      this.subject.complete();
    }

    this.cancel();
  }
}

/**
 * Posts the queries to `/api/v1/eval` and maps the response to `PanelData`
 * per `refId`.
 *
 * The `Loading` state for every query is handed to `withLoadingIndicator` as
 * the value to use while the request is pending. Request errors are converted
 * into an `Error` state for every query instead of failing the stream.
 */
const runRequest = (backendSrv: BackendSrv, queries: AlertQuery[]): Observable<Record<string, PanelData>> => {
  const initial = initialState(queries, LoadingState.Loading);
  const request = {
    data: { data: queries },
    url: '/api/v1/eval',
    method: 'POST',
    // Unique id, also passed to the canceler below.
    requestId: uuidv4(),
  };

  return withLoadingIndicator({
    whileLoading: initial,
    source: backendSrv.fetch<AlertingQueryResponse>(request).pipe(
      mapToPanelData(initial),
      catchError((error) => of(mapErrorToPanelData(initial, error))),
      cancelNetworkRequestsOnUnsubscribe(backendSrv, request.requestId),
      // Multicast so several subscribers to `source` share a single fetch.
      share()
    ),
  });
};
/**
 * Builds a result map with one entry per query, keyed by `refId`. Every entry
 * carries the given loading state, no series, and the time range resolved for
 * that query.
 */
const initialState = (queries: AlertQuery[], state: LoadingState): Record<string, PanelData> => {
  const dataByRefId: Record<string, PanelData> = {};

  for (const query of queries) {
    const timeRange = getTimeRange(query, queries);
    dataByRefId[query.refId] = { state, series: [], timeRange };
  }

  return dataByRefId;
};

/**
 * Resolves the absolute time range for a query.
 *
 * Expression queries get their range from `getTimeRangeForExpression`; other
 * queries use their own relative time range, falling back to the default
 * range (with a console warning) when none is set.
 */
const getTimeRange = (query: AlertQuery, queries: AlertQuery[]): TimeRange => {
  const { model } = query;

  if (isExpressionQuery(model)) {
    return rangeUtil.relativeToTimeRange(getTimeRangeForExpression(model, queries));
  }

  if (query.relativeTimeRange) {
    return rangeUtil.relativeToTimeRange(query.relativeTimeRange);
  }

  console.warn(`Query with refId: ${query.refId} did not have any relative time range, using default.`);
  return getDefaultTimeRange();
};

/**
 * Creates an operator that turns the eval response into `PanelData` per
 * `refId`: state `Done`, frames parsed from JSON, and the time range taken
 * from the matching entry in `dataByQuery`.
 */
const mapToPanelData = (
  dataByQuery: Record<string, PanelData>
): OperatorFunction<FetchResponse<AlertingQueryResponse>, Record<string, PanelData>> => {
  return map((response) =>
    Object.entries(response.data.results).reduce((panelDataByRefId: Record<string, PanelData>, [refId, result]) => {
      // Read the time range first, matching the original evaluation order.
      const { timeRange } = dataByQuery[refId];

      panelDataByRefId[refId] = {
        timeRange,
        state: LoadingState.Done,
        series: result.frames.map(dataFrameFromJSON),
      };

      return panelDataByRefId;
    }, {})
  );
};

/**
 * Returns a copy of `lastResult` in which every entry is in the `Error` state
 * and carries the given error converted to a data query error.
 */
const mapErrorToPanelData = (lastResult: Record<string, PanelData>, error: Error): Record<string, PanelData> => {
  const queryError = toDataQueryError(error);

  return applyChange(lastResult, (_refId, panelData) => ({
    ...panelData,
    state: LoadingState.Error,
    error: queryError,
  }));
};

/**
 * Produces a new map by running `change` on every entry of `initial`. The
 * input map itself is not modified by this function.
 */
const applyChange = (
  initial: Record<string, PanelData>,
  change: (refId: string, data: PanelData) => PanelData
): Record<string, PanelData> => {
  return Object.entries(initial).reduce((changed: Record<string, PanelData>, [refId, data]) => {
    changed[refId] = change(refId, data);
    return changed;
  }, {});
};