import {
  DataSourceApi,
  DataQueryRequest,
  DataQueryResponse,
  DataSourceInstanceSettings,
  DataQuery,
  DataSourceJsonData,
  ScopedVars,
  makeClassES5Compatible,
  DataFrame,
  parseLiveChannelAddress,
  StreamingFrameOptions,
  StreamingFrameAction,
  getDataSourceRef,
  DataSourceRef,
} from '@grafana/data';
import { merge, Observable, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
import { getBackendSrv, getDataSourceSrv, getGrafanaLiveSrv } from '../services';
import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse';

/**
 * Frozen datasource reference used to tag expression queries.
 *
 * @internal
 */
export const ExpressionDatasourceRef = Object.freeze({
  type: '__expr__',
  uid: '__expr__',
});

/**
 * Returns true when `ref` identifies the expression datasource.
 *
 * A string reference is compared directly; an object reference is compared by its `type`.
 * Empty references (`undefined`, `null`, `''`) are never expression references.
 *
 * @internal
 */
export function isExpressionReference(ref?: DataSourceRef | string | null): boolean {
  if (!ref) {
    return false;
  }
  const v = typeof ref === 'string' ? ref : ref.type;
  return v === ExpressionDatasourceRef.type || v === '-100'; // -100 was a legacy accident that should be removed
}

/**
 * Error thrown by `testDatasource` when the health check does not report `OK`.
 * Carries the `details` payload of the health check result.
 */
class HealthCheckError extends Error {
  details: HealthCheckResultDetails;

  constructor(message: string, details: HealthCheckResultDetails) {
    super(message);
    this.details = details;
    this.name = 'HealthCheckError';
  }
}

/**
 * Describes the current health status of a data source plugin.
 *
 * @public
 */
export enum HealthStatus {
  Unknown = 'UNKNOWN',
  OK = 'OK',
  Error = 'ERROR',
}

/**
 * Describes the details in the payload returned when checking the health of a data source
 * plugin.
 *
 * If the 'message' key exists, this will be displayed in the error message in DataSourceSettingsPage
 * If the 'verboseMessage' key exists, this will be displayed in the expandable details in the error message in DataSourceSettingsPage
 *
 * @public
 */
export type HealthCheckResultDetails = Record<string, any> | undefined;

/**
 * Describes the payload returned when checking the health of a data source
 * plugin.
 *
 * @public
 */
export interface HealthCheckResult {
  status: HealthStatus;
  message: string;
  details: HealthCheckResultDetails;
}

/**
 * Extend this class to implement a data source plugin that is depending on the Grafana
 * backend API.
 *
 * @public
 */
class DataSourceWithBackend<
  TQuery extends DataQuery = DataQuery,
  TOptions extends DataSourceJsonData = DataSourceJsonData
> extends DataSourceApi<TQuery, TOptions> {
  constructor(instanceSettings: DataSourceInstanceSettings<TOptions>) {
    super(instanceSettings);
  }

  /**
   * Ideally final -- any other implementation may not work as expected
   *
   * Builds the request body from the request targets, POSTs it to `/api/ds/query` and converts
   * the raw response into a `DataQueryResponse`. Frames that advertise a live channel in their
   * metadata are turned into streaming responses.
   *
   * @throws Error synchronously when a target references a datasource that cannot be resolved.
   */
  query(request: DataQueryRequest<TQuery>): Observable<DataQueryResponse> {
    const { intervalMs, maxDataPoints, range, requestId } = request;
    let targets = request.targets;

    if (this.filterQuery) {
      targets = targets.filter((q) => this.filterQuery!(q));
    }

    const queries = targets.map((q) => {
      let datasource = this.getRef();
      let datasourceId = this.id;

      // Expression queries are forwarded untouched, only tagged with the expression reference
      if (isExpressionReference(q.datasource)) {
        return {
          ...q,
          datasource: ExpressionDatasourceRef,
        };
      }

      // A target may point at a datasource other than this one
      if (q.datasource) {
        const ds = getDataSourceSrv().getInstanceSettings(q.datasource, request.scopedVars);

        if (!ds) {
          throw new Error(`Unknown Datasource: ${JSON.stringify(q.datasource)}`);
        }

        datasource = ds.rawRef ?? getDataSourceRef(ds);
        datasourceId = ds.id;
      }

      return {
        ...this.applyTemplateVariables(q, request.scopedVars),
        datasource,
        datasourceId, // deprecated!
        intervalMs,
        maxDataPoints,
      };
    });

    // Return early if no queries exist
    if (!queries.length) {
      return of({ data: [] });
    }

    const body: any = { queries };

    if (range) {
      body.range = range;
      body.from = range.from.valueOf().toString();
      body.to = range.to.valueOf().toString();
    }

    return getBackendSrv()
      .fetch<BackendDataSourceResponse>({
        url: '/api/ds/query',
        method: 'POST',
        data: body,
        requestId,
      })
      .pipe(
        switchMap((raw) => {
          const rsp = toDataQueryResponse(raw, queries as DataQuery[]);
          // Check if any response should subscribe to a live stream
          if (rsp.data?.some((f: DataFrame) => Boolean(f.meta?.channel))) {
            return toStreamingDataResponse(rsp, request, this.streamOptionsProvider);
          }
          return of(rsp);
        }),
        catchError((err) => {
          // Request failures are emitted as a converted response instead of erroring the stream
          return of(toDataQueryResponse(err));
        })
      );
  }

  /**
   * Apply template variables for explore
   */
  interpolateVariablesInQueries(queries: TQuery[], scopedVars: ScopedVars | {}): TQuery[] {
    return queries.map((q) => this.applyTemplateVariables(q, scopedVars) as TQuery);
  }

  /**
   * Override to apply template variables.  The result is usually also `TQuery`, but sometimes this can
   * be used to modify the query structure before sending to the backend.
   *
   * NOTE: if you do modify the structure or use template variables, alerting queries may not work
   * as expected
   *
   * The default implementation returns the query unchanged.
   *
   * @virtual
   */
  applyTemplateVariables(query: TQuery, scopedVars: ScopedVars): Record<string, any> {
    return query;
  }

  /**
   * Optionally override the streaming behavior
   */
  streamOptionsProvider: StreamOptionsProvider<TQuery> = standardStreamOptionsProvider;

  /**
   * Make a GET request to the datasource resource path
   */
  async getResource(path: string, params?: any): Promise<any> {
    return getBackendSrv().get(`/api/datasources/${this.id}/resources/${path}`, params);
  }

  /**
   * Send a POST request to the datasource resource path
   *
   * The body is shallow-copied before being handed to the backend service.
   */
  async postResource(path: string, body?: any): Promise<any> {
    return getBackendSrv().post(`/api/datasources/${this.id}/resources/${path}`, { ...body });
  }

  /**
   * Run the datasource healthcheck
   *
   * Never rejects: a failed request resolves to the result object carried in the error's `data`
   * payload. When the failure carries no such object, an `ERROR` result is synthesised from the
   * error itself so that callers always receive a usable result.
   */
  async callHealthCheck(): Promise<HealthCheckResult> {
    return getBackendSrv()
      .request({ method: 'GET', url: `/api/datasources/${this.id}/health`, showErrorAlert: false })
      .then((v) => {
        return v as HealthCheckResult;
      })
      .catch((err) => {
        if (err?.data && typeof err.data === 'object') {
          return err.data as HealthCheckResult;
        }

        // No structured payload: report the failure rather than resolving with `undefined`
        const fallback: HealthCheckResult = {
          status: HealthStatus.Error,
          message: err?.message ?? err?.statusText ?? 'Health check failed',
          details: undefined,
        };
        return fallback;
      });
  }

  /**
   * Checks the plugin health
   * see public/app/features/datasources/state/actions.ts for what needs to be returned here
   *
   * @returns `{ status: 'success', message }` when the health check reports `OK`.
   * @throws HealthCheckError with the result's message and details for any other status.
   */
  async testDatasource(): Promise<any> {
    const res = await this.callHealthCheck();

    if (res.status === HealthStatus.OK) {
      return {
        status: 'success',
        message: res.message,
      };
    }

    throw new HealthCheckError(res.message, res.details);
  }
}

/**
 * Splits a response into static frames and live streams.
 *
 * Every frame whose `meta.channel` parses to a live channel address is replaced by a data stream
 * from the live service, buffered with the options produced by `getter`. The remaining frames are
 * emitted once as a single static response. When the live service is not available the response
 * is returned unchanged.
 *
 * @internal exported for tests
 */
export function toStreamingDataResponse<TQuery extends DataQuery = DataQuery>(
  rsp: DataQueryResponse,
  req: DataQueryRequest<TQuery>,
  getter: (req: DataQueryRequest<TQuery>, frame: DataFrame) => StreamingFrameOptions
): Observable<DataQueryResponse> {
  const live = getGrafanaLiveSrv();
  if (!live) {
    return of(rsp); // add warning?
  }

  const staticdata: DataFrame[] = [];
  const streams: Array<Observable<DataQueryResponse>> = [];
  for (const f of rsp.data) {
    const addr = parseLiveChannelAddress(f.meta?.channel);
    if (addr) {
      const frame = f as DataFrame;
      streams.push(
        live.getDataStream({
          addr,
          buffer: getter(req, frame),
          frame,
        })
      );
    } else {
      staticdata.push(f);
    }
  }
  if (staticdata.length) {
    streams.push(of({ ...rsp, data: staticdata }));
  }
  if (streams.length === 1) {
    return streams[0]; // avoid merge wrapper
  }
  return merge(...streams);
}

/**
 * This allows data sources to customize the streaming connection query
 *
 * @public
 */
export type StreamOptionsProvider<TQuery extends DataQuery = DataQuery> = (
  request: DataQueryRequest<TQuery>,
  frame: DataFrame
) => StreamingFrameOptions;

/**
 * Default stream options: append incoming values, keeping at most `request.maxDataPoints`
 * entries (500 when it is not set).
 *
 * @public
 */
export const standardStreamOptionsProvider: StreamOptionsProvider = (request: DataQueryRequest, frame: DataFrame) => {
  const buffer: StreamingFrameOptions = {
    maxLength: request.maxDataPoints ?? 500,
    action: StreamingFrameAction.Append,
  };

  // For recent queries, clamp to the current time range
  if (request.rangeRaw?.to === 'now') {
    buffer.maxDelta = request.range.to.valueOf() - request.range.from.valueOf();
  }
  return buffer;
};

//@ts-ignore
DataSourceWithBackend = makeClassES5Compatible(DataSourceWithBackend);

export { DataSourceWithBackend };