1import {
2  CoreApp,
3  DataQueryRequest,
4  DataSourceApi,
5  PanelData,
6  rangeUtil,
7  ScopedVars,
8  QueryRunnerOptions,
9  QueryRunner as QueryRunnerSrv,
10  LoadingState,
11  DataSourceRef,
12} from '@grafana/data';
13import { getTemplateSrv } from '@grafana/runtime';
14import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
15import { cloneDeep } from 'lodash';
16import { from, Observable, ReplaySubject, Unsubscribable } from 'rxjs';
17import { first } from 'rxjs/operators';
18import { getNextRequestId } from './PanelQueryRunner';
19import { setStructureRevision } from './processing/revision';
20import { preProcessPanelData, runRequest } from './runRequest';
21
/**
 * Runs data queries against a datasource and publishes the processed results
 * as a stream of PanelData.
 *
 * Only one query is tracked at a time: calling `run()` again tears down whatever
 * the previous call started before issuing the new request.
 */
export class QueryRunner implements QueryRunnerSrv {
  private subject: ReplaySubject<PanelData>;
  /**
   * Handle for the work started by the latest `run()` call. It first refers to
   * the pending datasource lookup and is replaced by the request subscription
   * once the datasource has been resolved.
   */
  private subscription?: Unsubscribable;
  private lastResult?: PanelData;

  constructor() {
    // Buffer of one so that late subscribers immediately receive the latest result.
    this.subject = new ReplaySubject(1);
  }

  /**
   * @returns an observable of the results published by this runner.
   */
  get(): Observable<PanelData> {
    return this.subject.asObservable();
  }

  /**
   * Builds a DataQueryRequest from the given options, resolves the datasource
   * and executes the request. Results are published through `get()`.
   *
   * Any lookup or request still in flight from a previous call is stopped first.
   */
  run(options: QueryRunnerOptions): void {
    const {
      queries,
      timezone,
      datasource,
      panelId,
      app,
      dashboardId,
      timeRange,
      timeInfo,
      cacheTimeout,
      maxDataPoints,
      scopedVars,
      minInterval,
    } = options;

    if (this.subscription) {
      this.subscription.unsubscribe();
    }

    const request: DataQueryRequest = {
      app: app ?? CoreApp.Unknown,
      requestId: getNextRequestId(),
      timezone,
      panelId,
      dashboardId,
      range: timeRange,
      timeInfo,
      // interval and intervalMs are filled in once the datasource is known
      interval: '',
      intervalMs: 0,
      // Deep copy so that the caller's query objects are never mutated below
      targets: cloneDeep(queries),
      maxDataPoints: maxDataPoints,
      scopedVars: scopedVars ?? {},
      cacheTimeout,
      startTime: Date.now(),
    };

    // Add deprecated property
    (request as any).rangeRaw = timeRange.raw;

    // The datasource lookup is asynchronous (getDataSource returns a promise), so
    // the lookup subscription is stored right away. Without this, cancel(),
    // destroy() or another run() issued before the lookup settles would have
    // nothing to unsubscribe, and the pending lookup would still start a request
    // that nobody tracks anymore.
    this.subscription = from(getDataSource(datasource, request.scopedVars))
      .pipe(first())
      .subscribe({
        next: (ds) => {
          // Attach the datasource ref to each query that does not specify one
          request.targets = request.targets.map((query) => {
            if (!query.datasource) {
              query.datasource = ds.getRef();
            }
            return query;
          });

          // An explicit minInterval (after variable interpolation) takes
          // precedence over the datasource's own interval.
          const lowerIntervalLimit = minInterval
            ? getTemplateSrv().replace(minInterval, request.scopedVars)
            : ds.interval;
          const norm = rangeUtil.calculateInterval(timeRange, maxDataPoints, lowerIntervalLimit);

          // make shallow copy of scoped vars,
          // and add built in variables interval and interval_ms
          request.scopedVars = Object.assign({}, request.scopedVars, {
            __interval: { text: norm.interval, value: norm.interval },
            __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
          });

          request.interval = norm.interval;
          request.intervalMs = norm.intervalMs;

          // From here on the tracked subscription is the running request itself.
          this.subscription = runRequest(ds, request).subscribe({
            next: (data) => {
              const results = preProcessPanelData(data, this.lastResult);
              this.lastResult = setStructureRevision(results, this.lastResult);
              // Store preprocessed query results for applying overrides later on in the pipeline
              this.subject.next(this.lastResult);
            },
          });
        },
        error: (error) => console.error('PanelQueryRunner Error', error),
      });
  }

  /**
   * Stops the pending datasource lookup or running request, if any. When the
   * last published result was still in the loading state, it is re-published
   * with the done state.
   */
  cancel(): void {
    if (!this.subscription) {
      return;
    }

    this.subscription.unsubscribe();

    // If we have an old result with loading state, send it with done state
    if (this.lastResult && this.lastResult.state === LoadingState.Loading) {
      this.subject.next({
        ...this.lastResult,
        state: LoadingState.Done,
      });
    }
  }

  /**
   * Completes the result stream and stops the pending datasource lookup or
   * running request, if any.
   */
  destroy(): void {
    // Tell anyone listening that we are done
    if (this.subject) {
      this.subject.complete();
    }

    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}
142
/**
 * Resolves the datasource to query.
 *
 * A value that already exposes a truthy `query` member is treated as a
 * datasource instance and returned as-is; anything else (including `null`) is
 * handed to the datasource service together with the scoped variables.
 *
 * @param datasource - datasource instance, datasource reference, or null
 * @param scopedVars - variables passed along to the datasource service lookup
 * @returns the datasource instance to run the request against
 */
async function getDataSource(
  datasource: DataSourceRef | DataSourceApi | null,
  scopedVars: ScopedVars
): Promise<DataSourceApi> {
  const alreadyInstance = Boolean(datasource && (datasource as any).query);

  if (!alreadyInstance) {
    const resolved = await getDatasourceSrv().get(datasource, scopedVars);
    return resolved;
  }

  return datasource as DataSourceApi;
}
152