1import {
2  DataQuery,
3  DataQueryRequest,
4  DataQueryResponse,
5  DataSourceApi,
6  DataSourceInstanceSettings,
7  LoadingState,
8} from '@grafana/data';
9import { getDataSourceSrv, toDataQueryError } from '@grafana/runtime';
10import { cloneDeep, groupBy } from 'lodash';
11import { forkJoin, from, Observable, of, OperatorFunction } from 'rxjs';
12import { catchError, map, mergeAll, mergeMap, reduce, toArray } from 'rxjs/operators';
13
14export const MIXED_DATASOURCE_NAME = '-- Mixed --';
15
16export interface BatchedQueries {
17  datasource: Promise<DataSourceApi>;
18  targets: DataQuery[];
19}
20
21export class MixedDatasource extends DataSourceApi<DataQuery> {
22  constructor(instanceSettings: DataSourceInstanceSettings) {
23    super(instanceSettings);
24  }
25
26  query(request: DataQueryRequest<DataQuery>): Observable<DataQueryResponse> {
27    // Remove any invalid queries
28    const queries = request.targets.filter((t) => {
29      return t.datasource?.uid !== MIXED_DATASOURCE_NAME;
30    });
31
32    if (!queries.length) {
33      return of({ data: [] } as DataQueryResponse); // nothing
34    }
35
36    // Build groups of queries to run in parallel
37    const sets: { [key: string]: DataQuery[] } = groupBy(queries, 'datasource.uid');
38    const mixed: BatchedQueries[] = [];
39
40    for (const key in sets) {
41      const targets = sets[key];
42
43      mixed.push({
44        datasource: getDataSourceSrv().get(targets[0].datasource, request.scopedVars),
45        targets,
46      });
47    }
48
49    // Missing UIDs?
50    if (!mixed.length) {
51      return of({ data: [] } as DataQueryResponse); // nothing
52    }
53
54    return this.batchQueries(mixed, request);
55  }
56
57  batchQueries(mixed: BatchedQueries[], request: DataQueryRequest<DataQuery>): Observable<DataQueryResponse> {
58    const runningQueries = mixed.filter(this.isQueryable).map((query, i) =>
59      from(query.datasource).pipe(
60        mergeMap((api: DataSourceApi) => {
61          const dsRequest = cloneDeep(request);
62          dsRequest.requestId = `mixed-${i}-${dsRequest.requestId || ''}`;
63          dsRequest.targets = query.targets;
64
65          return from(api.query(dsRequest)).pipe(
66            map((response) => {
67              return {
68                ...response,
69                data: response.data || [],
70                state: LoadingState.Loading,
71                key: `mixed-${i}-${response.key || ''}`,
72              } as DataQueryResponse;
73            }),
74            toArray(),
75            catchError((err) => {
76              err = toDataQueryError(err);
77              err.message = `${api.name}: ${err.message}`;
78
79              return of<DataQueryResponse[]>([
80                {
81                  data: [],
82                  state: LoadingState.Error,
83                  error: err,
84                  key: `mixed-${i}-${dsRequest.requestId || ''}`,
85                },
86              ]);
87            })
88          );
89        })
90      )
91    );
92
93    return forkJoin(runningQueries).pipe(flattenResponses(), map(this.finalizeResponses), mergeAll());
94  }
95
96  testDatasource() {
97    return Promise.resolve({});
98  }
99
100  private isQueryable(query: BatchedQueries): boolean {
101    return query && Array.isArray(query.targets) && query.targets.length > 0;
102  }
103
104  private finalizeResponses(responses: DataQueryResponse[]): DataQueryResponse[] {
105    const { length } = responses;
106
107    if (length === 0) {
108      return responses;
109    }
110
111    const error = responses.find((response) => response.state === LoadingState.Error);
112    if (error) {
113      responses.push(error); // adds the first found error entry so error shows up in the panel
114    } else {
115      responses[length - 1].state = LoadingState.Done;
116    }
117
118    return responses;
119  }
120}
121
122function flattenResponses(): OperatorFunction<DataQueryResponse[][], DataQueryResponse[]> {
123  return reduce((all: DataQueryResponse[], current) => {
124    return current.reduce((innerAll, innerCurrent) => {
125      innerAll.push.apply(innerAll, innerCurrent);
126      return innerAll;
127    }, all);
128  }, []);
129}
130