1import { merge, Observable, of, Subject, throwError, Unsubscribable } from 'rxjs';
2import { catchError, filter, finalize, mergeMap, take, takeUntil } from 'rxjs/operators';
3import {
4  CoreApp,
5  DataQuery,
6  DataQueryRequest,
7  DataSourceApi,
8  getDefaultTimeRange,
9  LoadingState,
10  PanelData,
11  ScopedVars,
12} from '@grafana/data';
13
14import { VariableIdentifier } from '../state/types';
15import { getVariable } from '../state/selectors';
16import { QueryVariableModel, VariableRefresh } from '../types';
17import { StoreState, ThunkDispatch } from '../../../types';
18import { dispatch, getState } from '../../../store/store';
19import { getTemplatedRegex } from '../utils';
20import { v4 as uuidv4 } from 'uuid';
21import { getTimeSrv } from '../../dashboard/services/TimeSrv';
22import { QueryRunners } from './queryRunners';
23import { runRequest } from '../../query/state/runRequest';
24import { toMetricFindValues, updateOptionsState, validateVariableSelection } from './operators';
25
/**
 * Describes one request to refresh the options of a query variable.
 */
interface UpdateOptionsArgs {
  /** Data source handed to the query runner lookup and to the runner itself. */
  datasource: DataSourceApi;
  /** Identifies the variable whose options should be refreshed. */
  identifier: VariableIdentifier;
  /** Optional search text; when truthy it is exposed to the query as the `searchFilter` scoped var. */
  searchFilter?: string;
}
31
/**
 * Progress notification published while the options of a variable are being updated.
 */
export interface UpdateOptionsResults {
  /** Identifier of the variable this notification belongs to. */
  identifier: VariableIdentifier;
  /** Stage the update has reached. */
  state: LoadingState;
  /** Set to `true` on the notification sent when a running update is abandoned. */
  cancelled?: boolean;
  /** The failure that accompanies a `LoadingState.Error` notification. */
  error?: any;
}
38
/**
 * Collaborators used by `VariableQueryRunner`, gathered in one object so they can be
 * replaced when constructing a runner.
 */
interface VariableQueryRunnerArgs {
  // Redux store access
  dispatch: ThunkDispatch;
  getState: () => StoreState;

  // Variable helpers
  getVariable: typeof getVariable;
  getTemplatedRegex: typeof getTemplatedRegex;

  // Query execution
  queryRunners: QueryRunners;
  runRequest: typeof runRequest;
  getTimeSrv: typeof getTimeSrv;
}
48
/**
 * Runs the queries that refresh query-variable options and publishes progress notifications.
 *
 * Requests are pushed with `queueRequest`; notifications for a request are observed through
 * `getResponse`. A running request is abandoned when `cancelRequest` is called for the same
 * variable id, or when another request for the same variable id is queued.
 */
export class VariableQueryRunner {
  /** Incoming update requests; every emission is handled by `onNewRequest`. */
  private readonly updateOptionsRequests: Subject<UpdateOptionsArgs>;
  /** Outgoing progress notifications for all variables. */
  private readonly updateOptionsResults: Subject<UpdateOptionsResults>;
  /** Explicit cancellation signals. */
  private readonly cancelRequests: Subject<{ identifier: VariableIdentifier }>;
  /** Subscription that connects `updateOptionsRequests` to `onNewRequest`. */
  private readonly subscription: Unsubscribable;

  /**
   * @param dependencies - Collaborators used by the runner; defaults to the application-wide
   *   implementations and can be replaced with substitutes.
   */
  constructor(
    private dependencies: VariableQueryRunnerArgs = {
      dispatch,
      getState,
      getVariable,
      getTemplatedRegex,
      getTimeSrv,
      queryRunners: new QueryRunners(),
      runRequest,
    }
  ) {
    this.updateOptionsRequests = new Subject<UpdateOptionsArgs>();
    this.updateOptionsResults = new Subject<UpdateOptionsResults>();
    this.cancelRequests = new Subject<{ identifier: VariableIdentifier }>();
    // Bind once so the method can be passed directly as the subscriber callback.
    this.onNewRequest = this.onNewRequest.bind(this);
    this.subscription = this.updateOptionsRequests.subscribe(this.onNewRequest);
  }

  /**
   * Queues a request to refresh the options of a variable.
   *
   * @param args - The variable identifier, data source and optional search filter.
   */
  queueRequest(args: UpdateOptionsArgs): void {
    this.updateOptionsRequests.next(args);
  }

  /**
   * Returns the stream of notifications for one identifier.
   *
   * Notifications are matched by object identity (`===`), not by id, so pass the very same
   * identifier object that is given to `queueRequest`.
   *
   * @param identifier - Identifier object whose notifications should be observed.
   * @returns An observable of the notifications published for that identifier object.
   */
  getResponse(identifier: VariableIdentifier): Observable<UpdateOptionsResults> {
    return this.updateOptionsResults.asObservable().pipe(filter((result) => result.identifier === identifier));
  }

  /**
   * Signals that any running request for the variable with the same id should be abandoned.
   *
   * @param identifier - Identifier of the variable whose request should be cancelled.
   */
  cancelRequest(identifier: VariableIdentifier): void {
    this.cancelRequests.next({ identifier });
  }

  /**
   * Stops handling newly queued requests by unsubscribing from the request stream.
   */
  destroy(): void {
    this.subscription.unsubscribe();
  }

  /**
   * Handles one queued request: publishes `Loading`, runs the query, updates the variable's
   * options and validates its selection, then publishes `Error` (on failure) and `Done`.
   *
   * Failures raised synchronously while setting up the query are published as `Error`.
   *
   * @param args - The request to process.
   */
  private onNewRequest(args: UpdateOptionsArgs): void {
    const { datasource, identifier, searchFilter } = args;
    try {
      const {
        dispatch,
        runRequest,
        getTemplatedRegex: getTemplatedRegexFunc,
        getVariable,
        queryRunners,
        getTimeSrv,
        getState,
      } = this.dependencies;

      // Remember which transaction was active when the request started.
      const beforeUid = getState().templating.transaction.uid;

      this.updateOptionsResults.next({ identifier, state: LoadingState.Loading });

      const variable = getVariable<QueryVariableModel>(identifier.id, getState());
      const timeSrv = getTimeSrv();
      const runnerArgs = { variable, datasource, searchFilter, timeSrv, runRequest };
      const runner = queryRunners.getRunnerForDatasource(datasource);
      const target = runner.getTarget({ datasource, variable });
      const request = this.getRequest(variable, args, target);

      runner
        .runRequest(runnerArgs, request)
        .pipe(
          filter(() => {
            // Let's check if we started another batch during the execution of the observable. If so we just want to abort the rest.
            const afterUid = getState().templating.transaction.uid;

            return beforeUid === afterUid;
          }),
          filter((data) => data.state === LoadingState.Done || data.state === LoadingState.Error), // we only care about done or error for now
          take(1), // take the first result, using first caused a bug where it in some situations throw an uncaught error because of no results had been received yet
          mergeMap((data: PanelData) => {
            if (data.state === LoadingState.Error) {
              // Turn an error result into an error notification so it reaches catchError below.
              return throwError(() => data.error);
            }

            return of(data);
          }),
          toMetricFindValues(),
          updateOptionsState({ variable, dispatch, getTemplatedRegexFunc }),
          validateVariableSelection({ variable, dispatch, searchFilter }),
          takeUntil(
            // Stop as soon as a new request or an explicit cancellation arrives for the same variable id.
            merge(this.updateOptionsRequests, this.cancelRequests).pipe(
              filter((args) => {
                let cancelRequest = false;

                if (args.identifier.id === identifier.id) {
                  cancelRequest = true;
                  // Tell listeners of this request that it has been abandoned.
                  this.updateOptionsResults.next({ identifier, state: LoadingState.Loading, cancelled: cancelRequest });
                }

                return cancelRequest;
              })
            )
          ),
          catchError((error) => {
            // The error may be null/undefined (e.g. an error result that carries no error object);
            // guard the property access so the Error notification below is always published.
            if (error?.cancelled) {
              return of({});
            }

            this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
            // Re-raise so the stream terminates with the error; listeners have already been notified.
            return throwError(() => error);
          }),
          finalize(() => {
            // Published whenever the pipeline ends: completion, error or cancellation.
            this.updateOptionsResults.next({ identifier, state: LoadingState.Done });
          })
        )
        .subscribe();
    } catch (error) {
      this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
    }
  }

  /**
   * Builds the data query request for a variable query.
   *
   * @param variable - The variable being refreshed; its current text/value become the
   *   `variable` scoped var.
   * @param args - The queued request; a truthy `searchFilter` becomes the `searchFilter` scoped var.
   * @param target - The query to send as the request's only target.
   * @returns A request that uses the time service's current range when the variable refreshes
   *   on time range change, and the default time range otherwise.
   */
  private getRequest(variable: QueryVariableModel, args: UpdateOptionsArgs, target: DataQuery): DataQueryRequest {
    const { searchFilter } = args;
    const variableAsVars = { variable: { text: variable.current.text, value: variable.current.value } };
    const searchFilterScope = { searchFilter: { text: searchFilter, value: searchFilter } };
    const searchFilterAsVars = searchFilter ? searchFilterScope : {};
    // Spread order matters: the `variable` entry is applied last.
    const scopedVars = { ...searchFilterAsVars, ...variableAsVars } as ScopedVars;
    const range =
      variable.refresh === VariableRefresh.onTimeRangeChanged
        ? this.dependencies.getTimeSrv().timeRange()
        : getDefaultTimeRange();

    const request: DataQueryRequest = {
      app: CoreApp.Dashboard,
      requestId: uuidv4(),
      timezone: '',
      range,
      interval: '',
      intervalMs: 0,
      targets: [target],
      scopedVars,
      startTime: Date.now(),
    };

    return request;
  }
}
192
/** Module-wide runner instance; remains unassigned until `setVariableQueryRunner` is called. */
let activeRunner: VariableQueryRunner;

/**
 * Registers the runner that `getVariableQueryRunner` hands out.
 *
 * @param runner - The instance to share; replaces any previously registered one.
 */
export function setVariableQueryRunner(runner: VariableQueryRunner): void {
  activeRunner = runner;
}

/**
 * Returns the most recently registered runner.
 *
 * @returns The shared instance, or `undefined` at runtime if none has been registered yet.
 */
export function getVariableQueryRunner(): VariableQueryRunner {
  return activeRunner;
}
202