import {
  CoreApp,
  DataQueryRequest,
  DataSourceApi,
  PanelData,
  rangeUtil,
  ScopedVars,
  QueryRunnerOptions,
  QueryRunner as QueryRunnerSrv,
  LoadingState,
  DataSourceRef,
} from '@grafana/data';
import { getTemplateSrv } from '@grafana/runtime';
import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
import { cloneDeep } from 'lodash';
import { from, Observable, ReplaySubject, Unsubscribable } from 'rxjs';
import { first } from 'rxjs/operators';
import { getNextRequestId } from './PanelQueryRunner';
import { setStructureRevision } from './processing/revision';
import { preProcessPanelData, runRequest } from './runRequest';

/**
 * Runs data queries against a datasource and publishes the processed results
 * on a single observable stream.
 *
 * Results are pushed through a `ReplaySubject` created with a buffer size of 1.
 * Each call to {@link QueryRunner.run} supersedes the previous one: both the
 * pending datasource lookup and the running query of the earlier call are
 * unsubscribed before the new request starts.
 */
export class QueryRunner implements QueryRunnerSrv {
  private subject: ReplaySubject<PanelData>;
  /** Subscription to the currently running `runRequest` stream, if any. */
  private subscription?: Unsubscribable;
  /**
   * Subscription to the asynchronous datasource lookup started by `run()`.
   * Tracked separately so that a lookup which resolves late cannot start a
   * query after it has been superseded, cancelled or destroyed.
   */
  private datasourceSubscription?: Unsubscribable;
  private lastResult?: PanelData;

  constructor() {
    this.subject = new ReplaySubject(1);
  }

  /**
   * @returns An observable view of the result stream fed by `run()` and `cancel()`.
   */
  get(): Observable<PanelData> {
    return this.subject.asObservable();
  }

  /**
   * Builds a `DataQueryRequest` from `options`, resolves the datasource and
   * executes the request, publishing every processed result to the stream
   * returned by `get()`.
   *
   * Any work still in flight from a previous call is dropped first. Failures
   * while resolving the datasource are logged to the console and are not
   * forwarded to the result stream.
   *
   * @param options - Queries, time range and related settings for the request.
   */
  run(options: QueryRunnerOptions): void {
    const {
      queries,
      timezone,
      datasource,
      panelId,
      app,
      dashboardId,
      timeRange,
      timeInfo,
      cacheTimeout,
      maxDataPoints,
      scopedVars,
      minInterval,
    } = options;

    // Drop a datasource lookup that has not resolved yet. Without this, a
    // stale lookup could resolve after this call and replace
    // `this.subscription` with a query that can no longer be cancelled.
    if (this.datasourceSubscription) {
      this.datasourceSubscription.unsubscribe();
    }

    if (this.subscription) {
      this.subscription.unsubscribe();
    }

    const request: DataQueryRequest = {
      app: app ?? CoreApp.Unknown,
      requestId: getNextRequestId(),
      timezone,
      panelId,
      dashboardId,
      range: timeRange,
      timeInfo,
      // interval and intervalMs are filled in once the datasource is known
      interval: '',
      intervalMs: 0,
      // deep copy so that attaching datasource refs below does not touch the caller's queries
      targets: cloneDeep(queries),
      maxDataPoints: maxDataPoints,
      scopedVars: scopedVars || {},
      cacheTimeout,
      startTime: Date.now(),
    };

    // Add deprecated property
    (request as any).rangeRaw = timeRange.raw;

    this.datasourceSubscription = from(getDataSource(datasource, request.scopedVars))
      .pipe(first())
      .subscribe({
        next: (ds) => {
          // Attach the datasource ref to each query that does not specify one
          request.targets = request.targets.map((query) => {
            if (!query.datasource) {
              query.datasource = ds.getRef();
            }
            return query;
          });

          // An explicit minInterval (after variable interpolation) takes
          // precedence over the datasource's own interval setting.
          const lowerIntervalLimit = minInterval
            ? getTemplateSrv().replace(minInterval, request.scopedVars)
            : ds.interval;
          const norm = rangeUtil.calculateInterval(timeRange, maxDataPoints, lowerIntervalLimit);

          // make shallow copy of scoped vars,
          // and add built in variables interval and interval_ms
          request.scopedVars = Object.assign({}, request.scopedVars, {
            __interval: { text: norm.interval, value: norm.interval },
            __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
          });

          request.interval = norm.interval;
          request.intervalMs = norm.intervalMs;

          this.subscription = runRequest(ds, request).subscribe({
            next: (data) => {
              const results = preProcessPanelData(data, this.lastResult);
              // Store preprocessed query results for applying overrides later on in the pipeline
              this.lastResult = setStructureRevision(results, this.lastResult);
              this.subject.next(this.lastResult);
            },
          });
        },
        error: (error) => console.error('PanelQueryRunner Error', error),
      });
  }

  /**
   * Stops the pending datasource lookup and the running query, if any.
   *
   * When a query subscription exists and the last published result is still
   * in `Loading` state, that result is re-published with state `Done`.
   */
  cancel(): void {
    // A lookup may still be pending; stop it so it cannot start a query after cancel.
    if (this.datasourceSubscription) {
      this.datasourceSubscription.unsubscribe();
    }

    if (!this.subscription) {
      return;
    }

    this.subscription.unsubscribe();

    // If we have an old result with loading state, send it with done state
    if (this.lastResult && this.lastResult.state === LoadingState.Loading) {
      this.subject.next({
        ...this.lastResult,
        state: LoadingState.Done,
      });
    }
  }

  /**
   * Completes the result stream and releases the subscriptions held by this runner.
   */
  destroy(): void {
    // Tell anyone listening that we are done
    if (this.subject) {
      this.subject.complete();
    }

    // Prevent a late datasource resolution from starting a query after destroy
    if (this.datasourceSubscription) {
      this.datasourceSubscription.unsubscribe();
    }

    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}

/**
 * Resolves the datasource instance to run a request against.
 *
 * @param datasource - Either an object that already exposes a `query` member
 *   (returned unchanged) or a value handed to the datasource service for lookup.
 * @param scopedVars - Passed through to the datasource service lookup.
 * @returns The given instance, or the result of `getDatasourceSrv().get(...)`.
 */
async function getDataSource(
  datasource: DataSourceRef | DataSourceApi | null,
  scopedVars: ScopedVars
): Promise<DataSourceApi> {
  // A truthy `query` member is used as the marker for "already an instance"
  if (datasource && (datasource as any).query) {
    return datasource as DataSourceApi;
  }
  return await getDatasourceSrv().get(datasource, scopedVars);
}