1import {
2  DataSourceApi,
3  DataQueryRequest,
4  DataQueryResponse,
5  DataSourceInstanceSettings,
6  DataQuery,
7  DataSourceJsonData,
8  ScopedVars,
9  makeClassES5Compatible,
10  DataFrame,
11  parseLiveChannelAddress,
12  StreamingFrameOptions,
13  StreamingFrameAction,
14  getDataSourceRef,
15  DataSourceRef,
16} from '@grafana/data';
17import { merge, Observable, of } from 'rxjs';
18import { catchError, switchMap } from 'rxjs/operators';
19import { getBackendSrv, getDataSourceSrv, getGrafanaLiveSrv } from '../services';
20import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse';
21
/**
 * Data source reference that marks a query as an expression query.
 *
 * Both `type` and `uid` carry the reserved `__expr__` value. The object is frozen, so the
 * single shared instance cannot be mutated by code that receives it.
 *
 * @internal
 */
export const ExpressionDatasourceRef = Object.freeze({
  type: '__expr__',
  uid: '__expr__',
});
29
/**
 * Tells whether a data source reference points at the expression data source.
 *
 * A plain string is compared directly; for a reference object only its `type` is inspected
 * (a reference that carries nothing but a `uid` is never treated as an expression).
 *
 * @param ref - data source reference, a bare identifier string, or nothing at all
 * @returns `true` when the reference resolves to `ExpressionDatasourceRef.type` or the legacy `'-100'`
 *
 * @internal
 */
export function isExpressionReference(ref?: DataSourceRef | string | null): boolean {
  if (!ref) {
    return false;
  }

  const candidate = typeof ref === 'string' ? ref : ref.type;
  if (candidate === ExpressionDatasourceRef.type) {
    return true;
  }

  // -100 was a legacy accident that should be removed
  return candidate === '-100';
}
40
/**
 * Error thrown by `testDatasource` when a health check does not report an OK status.
 *
 * Besides the regular error message it carries the `details` payload of the health check
 * result, so callers can show additional information.
 */
class HealthCheckError extends Error {
  constructor(message: string, public details: HealthCheckResultDetails) {
    super(message);
    // Set explicitly so the error can be told apart from a plain `Error` by name.
    this.name = 'HealthCheckError';
  }
}
50
/**
 * Describes the current health status of a data source plugin.
 *
 * The string values are the ones carried in the `status` field of a health check result.
 *
 * @public
 */
export enum HealthStatus {
  // NOTE(review): presumably reported when the health could not be determined -- confirm against the backend.
  Unknown = 'UNKNOWN',
  // The only status that `testDatasource` in this file treats as success.
  OK = 'OK',
  Error = 'ERROR',
}
61
/**
 * Describes the details in the payload returned when checking the health of a data source
 * plugin. The value is a free-form key/value record, or `undefined` when no details were provided.
 *
 * - If the `message` key exists, it is displayed in the error message in DataSourceSettingsPage.
 * - If the `verboseMessage` key exists, it is displayed in the expandable details of the error
 *   message in DataSourceSettingsPage.
 *
 * @public
 */
export type HealthCheckResultDetails = Record<string, any> | undefined;
72
/**
 * Describes the payload returned when checking the health of a data source
 * plugin.
 *
 * @public
 */
export interface HealthCheckResult {
  /** Overall outcome of the health check. */
  status: HealthStatus;
  /** Human-readable summary of the outcome. */
  message: string;
  /** Optional free-form extra information; `undefined` when none was provided. */
  details: HealthCheckResultDetails;
}
84
/**
 * Extend this class to implement a data source plugin that is depending on the Grafana
 * backend API.
 *
 * Queries are posted to `/api/ds/query`; resource requests and health checks are sent to
 * `/api/datasources/<id>/resources/<path>` and `/api/datasources/<id>/health`.
 *
 * @public
 */
class DataSourceWithBackend<
  TQuery extends DataQuery = DataQuery,
  TOptions extends DataSourceJsonData = DataSourceJsonData
> extends DataSourceApi<TQuery, TOptions> {
  constructor(instanceSettings: DataSourceInstanceSettings<TOptions>) {
    super(instanceSettings);
  }

  /**
   * Ideally final -- any other implementation may not work as expected
   *
   * Builds one backend query per target (after the optional `filterQuery` hook), posts them
   * in a single request and converts the raw response into a `DataQueryResponse`. When any
   * returned frame references a live channel, the result also follows that stream.
   *
   * @param request - the query request; `targets`, `scopedVars`, `range`, `intervalMs`,
   *   `maxDataPoints` and `requestId` are read from it
   * @returns an observable of responses; request failures are converted into a response via
   *   `toDataQueryResponse` instead of erroring the observable
   * @throws Error synchronously when a target names a data source that cannot be resolved
   */
  query(request: DataQueryRequest<TQuery>): Observable<DataQueryResponse> {
    const { intervalMs, maxDataPoints, range, requestId } = request;
    let targets = request.targets;

    if (this.filterQuery) {
      targets = targets.filter((q) => this.filterQuery!(q));
    }

    const queries = targets.map((q) => {
      let datasource = this.getRef();
      let datasourceId = this.id;

      // Expression queries are passed through untouched (no template variables applied),
      // only their datasource is normalized to the shared expression reference.
      if (isExpressionReference(q.datasource)) {
        return {
          ...q,
          datasource: ExpressionDatasourceRef,
        };
      }

      // A target may point at a different data source than this instance.
      if (q.datasource) {
        const ds = getDataSourceSrv().getInstanceSettings(q.datasource, request.scopedVars);

        if (!ds) {
          throw new Error(`Unknown Datasource: ${JSON.stringify(q.datasource)}`);
        }

        datasource = ds.rawRef ?? getDataSourceRef(ds);
        datasourceId = ds.id;
      }

      return {
        ...this.applyTemplateVariables(q, request.scopedVars),
        datasource,
        datasourceId, // deprecated!
        intervalMs,
        maxDataPoints,
      };
    });

    // Return early if no queries exist
    if (!queries.length) {
      return of({ data: [] });
    }

    const body: any = { queries };

    if (range) {
      body.range = range;
      // Epoch values are sent as strings alongside the full range object.
      body.from = range.from.valueOf().toString();
      body.to = range.to.valueOf().toString();
    }

    return getBackendSrv()
      .fetch<BackendDataSourceResponse>({
        url: '/api/ds/query',
        method: 'POST',
        data: body,
        requestId,
      })
      .pipe(
        switchMap((raw) => {
          const rsp = toDataQueryResponse(raw, queries as DataQuery[]);
          // Check if any response should subscribe to a live stream
          if (rsp.data?.some((f: DataFrame) => f.meta?.channel)) {
            return toStreamingDataResponse(rsp, request, this.streamOptionsProvider);
          }
          return of(rsp);
        }),
        catchError((err) => {
          return of(toDataQueryResponse(err));
        })
      );
  }

  /**
   * Apply template variables for explore
   *
   * @param queries - queries to interpolate
   * @param scopedVars - variables forwarded to `applyTemplateVariables` for every query
   * @returns a new array holding the result of `applyTemplateVariables` for each query
   */
  interpolateVariablesInQueries(queries: TQuery[], scopedVars: ScopedVars | {}): TQuery[] {
    return queries.map((q) => this.applyTemplateVariables(q, scopedVars) as TQuery);
  }

  /**
   * Override to apply template variables.  The result is usually also `TQuery`, but sometimes this can
   * be used to modify the query structure before sending to the backend.
   *
   * NOTE: if you do modify the structure or use template variables, alerting queries may not work
   * as expected
   *
   * The default implementation returns the query unchanged.
   *
   * @virtual
   */
  applyTemplateVariables(query: TQuery, scopedVars: ScopedVars): Record<string, any> {
    return query;
  }

  /**
   * Optionally override the streaming behavior
   */
  streamOptionsProvider: StreamOptionsProvider<TQuery> = standardStreamOptionsProvider;

  /**
   * Make a GET request to the datasource resource path
   *
   * @param path - resource path appended to `/api/datasources/<id>/resources/`
   * @param params - optional parameters handed to the backend service `get` call
   */
  async getResource(path: string, params?: any): Promise<any> {
    return getBackendSrv().get(`/api/datasources/${this.id}/resources/${path}`, params);
  }

  /**
   * Send a POST request to the datasource resource path
   *
   * @param path - resource path appended to `/api/datasources/<id>/resources/`
   * @param body - payload; it is shallow-copied with object spread, so a missing body is sent as `{}`
   */
  async postResource(path: string, body?: any): Promise<any> {
    return getBackendSrv().post(`/api/datasources/${this.id}/resources/${path}`, { ...body });
  }

  /**
   * Run the datasource healthcheck
   *
   * Never rejects: a failed request resolves to the payload attached to the error, or, when
   * the error carries no object payload, to a synthesized `ERROR` result.
   */
  async callHealthCheck(): Promise<HealthCheckResult> {
    return getBackendSrv()
      .request({ method: 'GET', url: `/api/datasources/${this.id}/health`, showErrorAlert: false })
      .then((v) => {
        return v as HealthCheckResult;
      })
      .catch((err) => {
        // A failed request normally still carries the health payload in `err.data`.
        if (err?.data !== null && typeof err?.data === 'object') {
          return err.data as HealthCheckResult;
        }

        // No usable payload: return a well-formed result so that callers reading
        // `status`/`message` do not crash on `undefined`.
        // NOTE(review): assumes the rejection may expose `statusText`/`message` -- confirm against BackendSrv.
        const fallback: HealthCheckResult = {
          status: HealthStatus.Error,
          message: err?.statusText || err?.message || 'Health check failed',
          details: undefined,
        };
        return fallback;
      });
  }

  /**
   * Checks the plugin health
   * see public/app/features/datasources/state/actions.ts for what needs to be returned here
   *
   * @returns `{ status: 'success', message }` when the health check reports `HealthStatus.OK`
   * @throws HealthCheckError carrying the result's message and details for any other status
   */
  async testDatasource(): Promise<any> {
    const res = await this.callHealthCheck();

    if (res.status === HealthStatus.OK) {
      return {
        status: 'success',
        message: res.message,
      };
    }

    throw new HealthCheckError(res.message, res.details);
  }
}
246
/**
 * Turns a query response into an observable that also follows the live channels referenced
 * by its frames.
 *
 * Frames whose `meta.channel` parses to a live channel address are replaced by a data stream
 * from the Grafana Live service; all remaining frames are emitted once as a static response
 * that keeps the other properties of `rsp`.
 *
 * @param rsp - the response whose frames are inspected
 * @param req - the originating request, handed to `getter`
 * @param getter - produces the streaming buffer options for a given frame
 * @returns `of(rsp)` when the live service is unavailable or there is nothing to stream and no
 *   static frame; the single resulting observable when there is exactly one; otherwise a merge
 *   of all of them
 *
 * @internal exported for tests
 */
export function toStreamingDataResponse<TQuery extends DataQuery = DataQuery>(
  rsp: DataQueryResponse,
  req: DataQueryRequest<TQuery>,
  getter: (req: DataQueryRequest<TQuery>, frame: DataFrame) => StreamingFrameOptions
): Observable<DataQueryResponse> {
  const live = getGrafanaLiveSrv();
  if (!live) {
    return of(rsp); // add warning?
  }

  const staticdata: DataFrame[] = [];
  const streams: Array<Observable<DataQueryResponse>> = [];
  // Tolerate a response without a data array instead of throwing while iterating.
  for (const f of rsp.data ?? []) {
    const addr = parseLiveChannelAddress(f.meta?.channel);
    if (addr) {
      const frame = f as DataFrame;
      streams.push(
        live.getDataStream({
          addr,
          buffer: getter(req, frame),
          frame,
        })
      );
    } else {
      staticdata.push(f);
    }
  }
  if (staticdata.length) {
    streams.push(of({ ...rsp, data: staticdata }));
  }
  if (!streams.length) {
    // Merging zero sources would complete without emitting and silently drop the response.
    return of(rsp);
  }
  if (streams.length === 1) {
    return streams[0]; // avoid merge wrapper
  }
  return merge(...streams);
}
285
/**
 * This allows data sources to customize the streaming connection query
 *
 * The provider receives the originating query request and the frame that references a
 * live channel, and returns the buffer options to use for that frame's stream.
 *
 * @public
 */
export type StreamOptionsProvider<TQuery extends DataQuery = DataQuery> = (
  request: DataQueryRequest<TQuery>,
  frame: DataFrame
) => StreamingFrameOptions;
295
/**
 * Default stream options: incoming values are appended and the buffer holds at most
 * `request.maxDataPoints` entries (500 when the request does not specify a value).
 *
 * When the raw time range ends at `'now'`, the buffer is additionally limited to the width
 * of the requested time range.
 *
 * @public
 */
export const standardStreamOptionsProvider: StreamOptionsProvider = (request: DataQueryRequest, frame: DataFrame) => {
  const options: StreamingFrameOptions = {
    maxLength: request.maxDataPoints ?? 500,
    action: StreamingFrameAction.Append,
  };

  const endsAtNow = request.rangeRaw?.to === 'now';
  if (!endsAtNow) {
    return options;
  }

  // For recent queries, clamp to the current time range
  const window = request.range;
  options.maxDelta = window.to.valueOf() - window.from.valueOf();
  return options;
};
311
// Replace the class binding with the result of `makeClassES5Compatible` before exporting it.
// NOTE(review): presumably this lets ES5-style (function/prototype) plugins extend the class --
// confirm against the helper in @grafana/data.
// The ts-ignore below is needed because TypeScript does not allow assigning to a class declaration.
//@ts-ignore
DataSourceWithBackend = makeClassES5Compatible(DataSourceWithBackend);

export { DataSourceWithBackend };
316