1import { 2 DataQuery, 3 DataQueryRequest, 4 DataQueryResponse, 5 DataSourceApi, 6 DataSourceInstanceSettings, 7 LoadingState, 8} from '@grafana/data'; 9import { getDataSourceSrv, toDataQueryError } from '@grafana/runtime'; 10import { cloneDeep, groupBy } from 'lodash'; 11import { forkJoin, from, Observable, of, OperatorFunction } from 'rxjs'; 12import { catchError, map, mergeAll, mergeMap, reduce, toArray } from 'rxjs/operators'; 13 14export const MIXED_DATASOURCE_NAME = '-- Mixed --'; 15 16export interface BatchedQueries { 17 datasource: Promise<DataSourceApi>; 18 targets: DataQuery[]; 19} 20 21export class MixedDatasource extends DataSourceApi<DataQuery> { 22 constructor(instanceSettings: DataSourceInstanceSettings) { 23 super(instanceSettings); 24 } 25 26 query(request: DataQueryRequest<DataQuery>): Observable<DataQueryResponse> { 27 // Remove any invalid queries 28 const queries = request.targets.filter((t) => { 29 return t.datasource?.uid !== MIXED_DATASOURCE_NAME; 30 }); 31 32 if (!queries.length) { 33 return of({ data: [] } as DataQueryResponse); // nothing 34 } 35 36 // Build groups of queries to run in parallel 37 const sets: { [key: string]: DataQuery[] } = groupBy(queries, 'datasource.uid'); 38 const mixed: BatchedQueries[] = []; 39 40 for (const key in sets) { 41 const targets = sets[key]; 42 43 mixed.push({ 44 datasource: getDataSourceSrv().get(targets[0].datasource, request.scopedVars), 45 targets, 46 }); 47 } 48 49 // Missing UIDs? 50 if (!mixed.length) { 51 return of({ data: [] } as DataQueryResponse); // nothing 52 } 53 54 return this.batchQueries(mixed, request); 55 } 56 57 batchQueries(mixed: BatchedQueries[], request: DataQueryRequest<DataQuery>): Observable<DataQueryResponse> { 58 const runningQueries = mixed.filter(this.isQueryable).map((query, i) => 59 from(query.datasource).pipe( 60 mergeMap((api: DataSourceApi) => { 61 const dsRequest = cloneDeep(request); 62 dsRequest.requestId = `mixed-${i}-${dsRequest.requestId || ''}`; 63 dsRequest.targets = query.targets; 64 65 return from(api.query(dsRequest)).pipe( 66 map((response) => { 67 return { 68 ...response, 69 data: response.data || [], 70 state: LoadingState.Loading, 71 key: `mixed-${i}-${response.key || ''}`, 72 } as DataQueryResponse; 73 }), 74 toArray(), 75 catchError((err) => { 76 err = toDataQueryError(err); 77 err.message = `${api.name}: ${err.message}`; 78 79 return of<DataQueryResponse[]>([ 80 { 81 data: [], 82 state: LoadingState.Error, 83 error: err, 84 key: `mixed-${i}-${dsRequest.requestId || ''}`, 85 }, 86 ]); 87 }) 88 ); 89 }) 90 ) 91 ); 92 93 return forkJoin(runningQueries).pipe(flattenResponses(), map(this.finalizeResponses), mergeAll()); 94 } 95 96 testDatasource() { 97 return Promise.resolve({}); 98 } 99 100 private isQueryable(query: BatchedQueries): boolean { 101 return query && Array.isArray(query.targets) && query.targets.length > 0; 102 } 103 104 private finalizeResponses(responses: DataQueryResponse[]): DataQueryResponse[] { 105 const { length } = responses; 106 107 if (length === 0) { 108 return responses; 109 } 110 111 const error = responses.find((response) => response.state === LoadingState.Error); 112 if (error) { 113 responses.push(error); // adds the first found error entry so error shows up in the panel 114 } else { 115 responses[length - 1].state = LoadingState.Done; 116 } 117 118 return responses; 119 } 120} 121 122function flattenResponses(): OperatorFunction<DataQueryResponse[][], DataQueryResponse[]> { 123 return reduce((all: DataQueryResponse[], current) => { 124 return current.reduce((innerAll, innerCurrent) => { 125 innerAll.push.apply(innerAll, innerCurrent); 126 return innerAll; 127 }, all); 128 }, []); 129} 130