import { merge, Observable, of, Subject, throwError, Unsubscribable } from 'rxjs';
import { catchError, filter, finalize, mergeMap, take, takeUntil } from 'rxjs/operators';
import {
  CoreApp,
  DataQuery,
  DataQueryRequest,
  DataSourceApi,
  getDefaultTimeRange,
  LoadingState,
  PanelData,
  ScopedVars,
} from '@grafana/data';

import { VariableIdentifier } from '../state/types';
import { getVariable } from '../state/selectors';
import { QueryVariableModel, VariableRefresh } from '../types';
import { StoreState, ThunkDispatch } from '../../../types';
import { dispatch, getState } from '../../../store/store';
import { getTemplatedRegex } from '../utils';
import { v4 as uuidv4 } from 'uuid';
import { getTimeSrv } from '../../dashboard/services/TimeSrv';
import { QueryRunners } from './queryRunners';
import { runRequest } from '../../query/state/runRequest';
import { toMetricFindValues, updateOptionsState, validateVariableSelection } from './operators';

/** Arguments describing a single "update the options of this variable" request. */
interface UpdateOptionsArgs {
  identifier: VariableIdentifier;
  datasource: DataSourceApi;
  searchFilter?: string;
}

/** A progress/result event published for a queued request. */
export interface UpdateOptionsResults {
  state: LoadingState;
  identifier: VariableIdentifier;
  error?: any;
  /** Set to true on the event emitted when an in-flight request is cancelled or superseded. */
  cancelled?: boolean;
}

/** Collaborators used by the runner; injectable so they can be replaced (e.g. in tests). */
interface VariableQueryRunnerArgs {
  dispatch: ThunkDispatch;
  getState: () => StoreState;
  getVariable: typeof getVariable;
  getTemplatedRegex: typeof getTemplatedRegex;
  getTimeSrv: typeof getTimeSrv;
  queryRunners: QueryRunners;
  runRequest: typeof runRequest;
}

/**
 * Runs query-variable option updates and publishes their progress.
 *
 * Requests are pushed with {@link queueRequest}; every request is started immediately and its
 * state transitions (Loading, Error, Done) are published on a shared results stream that callers
 * observe through {@link getResponse}. A request is stopped early when {@link cancelRequest} is
 * called for, or a new request is queued for, an identifier with the same `id`.
 */
export class VariableQueryRunner {
  private readonly updateOptionsRequests: Subject<UpdateOptionsArgs>;
  private readonly updateOptionsResults: Subject<UpdateOptionsResults>;
  private readonly cancelRequests: Subject<{ identifier: VariableIdentifier }>;
  private readonly subscription: Unsubscribable;

  constructor(
    private readonly dependencies: VariableQueryRunnerArgs = {
      dispatch,
      getState,
      getVariable,
      getTemplatedRegex,
      getTimeSrv,
      queryRunners: new QueryRunners(),
      runRequest,
    }
  ) {
    this.updateOptionsRequests = new Subject<UpdateOptionsArgs>();
    this.updateOptionsResults = new Subject<UpdateOptionsResults>();
    this.cancelRequests = new Subject<{ identifier: VariableIdentifier }>();
    this.onNewRequest = this.onNewRequest.bind(this);
    this.subscription = this.updateOptionsRequests.subscribe(this.onNewRequest);
  }

  /** Queues a request; it is handed to the request handler through the internal request subject. */
  queueRequest(args: UpdateOptionsArgs): void {
    this.updateOptionsRequests.next(args);
  }

  /**
   * Returns the stream of result events for one identifier.
   *
   * Events are matched by reference equality, so pass the same identifier object that is
   * (or will be) given to {@link queueRequest}.
   */
  getResponse(identifier: VariableIdentifier): Observable<UpdateOptionsResults> {
    return this.updateOptionsResults.asObservable().pipe(filter((result) => result.identifier === identifier));
  }

  /** Signals in-flight requests whose identifier has the same `id` to stop. */
  cancelRequest(identifier: VariableIdentifier): void {
    this.cancelRequests.next({ identifier });
  }

  /** Stops handling newly queued requests. Pipelines that are already running are not unsubscribed here. */
  destroy(): void {
    this.subscription.unsubscribe();
  }

  /**
   * Starts the query for one request and publishes its state transitions on the results stream.
   *
   * Emits Loading first, then Error if the query fails, and always Done when the pipeline ends
   * (completion, error or cancellation). Synchronous failures while setting the query up are
   * published as Error.
   */
  private onNewRequest(args: UpdateOptionsArgs): void {
    const { datasource, identifier, searchFilter } = args;
    try {
      const {
        dispatch,
        runRequest,
        getTemplatedRegex: getTemplatedRegexFunc,
        getVariable,
        queryRunners,
        getTimeSrv,
        getState,
      } = this.dependencies;

      // Remember which templating transaction was active when the request started.
      const beforeUid = getState().templating.transaction.uid;

      this.updateOptionsResults.next({ identifier, state: LoadingState.Loading });

      const variable = getVariable<QueryVariableModel>(identifier.id, getState());
      const timeSrv = getTimeSrv();
      const runnerArgs = { variable, datasource, searchFilter, timeSrv, runRequest };
      const runner = queryRunners.getRunnerForDatasource(datasource);
      const target = runner.getTarget({ datasource, variable });
      const request = this.getRequest(variable, args, target);

      runner
        .runRequest(runnerArgs, request)
        .pipe(
          filter(() => {
            // Check whether another batch was started while the observable was executing.
            // If so, drop the remaining emissions.
            const afterUid = getState().templating.transaction.uid;

            return beforeUid === afterUid;
          }),
          // We only care about done or error for now.
          filter((data) => data.state === LoadingState.Done || data.state === LoadingState.Error),
          // Take the first result. Using first() caused a bug where, in some situations, it threw an
          // uncaught error because no results had been received yet.
          take(1),
          mergeMap((data: PanelData) => {
            if (data.state === LoadingState.Error) {
              return throwError(() => data.error);
            }

            return of(data);
          }),
          toMetricFindValues(),
          updateOptionsState({ variable, dispatch, getTemplatedRegexFunc }),
          validateVariableSelection({ variable, dispatch, searchFilter }),
          takeUntil(
            // Stop when a new request or an explicit cancel arrives for the same variable id.
            merge(this.updateOptionsRequests, this.cancelRequests).pipe(
              filter((incoming) => {
                let cancelRequest = false;

                if (incoming.identifier.id === identifier.id) {
                  cancelRequest = true;
                  this.updateOptionsResults.next({ identifier, state: LoadingState.Loading, cancelled: cancelRequest });
                }

                return cancelRequest;
              })
            )
          ),
          catchError((error) => {
            // `error` is whatever was thrown upstream; it is undefined when the runner reported
            // LoadingState.Error without attaching an error object. Guard the property access so
            // that case still publishes the Error state instead of throwing a TypeError here.
            if (error?.cancelled) {
              return of({});
            }

            this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
            // NOTE(review): rethrown although subscribe() below has no error handler — confirm
            // that surfacing it as an unhandled subscriber error is intended.
            return throwError(() => error);
          }),
          finalize(() => {
            this.updateOptionsResults.next({ identifier, state: LoadingState.Done });
          })
        )
        .subscribe();
    } catch (error) {
      this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
    }
  }

  /**
   * Builds the data query request for a variable.
   *
   * The scoped vars contain the variable's current text/value and, when a search filter is given,
   * a `searchFilter` entry. The dashboard time range is used only for variables that refresh on
   * time range change; all others get the default time range.
   */
  private getRequest(variable: QueryVariableModel, args: UpdateOptionsArgs, target: DataQuery): DataQueryRequest {
    const { searchFilter } = args;
    const variableAsVars = { variable: { text: variable.current.text, value: variable.current.value } };
    const searchFilterScope = { searchFilter: { text: searchFilter, value: searchFilter } };
    const searchFilterAsVars = searchFilter ? searchFilterScope : {};
    const scopedVars = { ...searchFilterAsVars, ...variableAsVars } as ScopedVars;
    const range =
      variable.refresh === VariableRefresh.onTimeRangeChanged
        ? this.dependencies.getTimeSrv().timeRange()
        : getDefaultTimeRange();

    const request: DataQueryRequest = {
      app: CoreApp.Dashboard,
      requestId: uuidv4(),
      timezone: '',
      range,
      interval: '',
      intervalMs: 0,
      targets: [target],
      scopedVars,
      startTime: Date.now(),
    };

    return request;
  }
}

// Module-wide runner instance; stays unassigned until setVariableQueryRunner is called.
let singleton: VariableQueryRunner;

/** Registers the runner instance returned by {@link getVariableQueryRunner}. */
export function setVariableQueryRunner(runner: VariableQueryRunner): void {
  singleton = runner;
}

/** Returns the runner most recently registered with {@link setVariableQueryRunner}. */
export function getVariableQueryRunner(): VariableQueryRunner {
  return singleton;
}