1import {
2  addQueryRowAction,
3  addResultsToCache,
4  cancelQueries,
5  cancelQueriesAction,
6  cleanLogsVolumeAction,
7  clearCache,
8  importQueries,
9  queryReducer,
10  runQueries,
11  scanStartAction,
12  scanStopAction,
13  storeLogsVolumeDataProviderAction,
14} from './query';
15import { ExploreId, ExploreItemState, StoreState, ThunkDispatch } from 'app/types';
16import { interval, Observable, of } from 'rxjs';
17import {
18  ArrayVector,
19  DataFrame,
20  DataQuery,
21  DataQueryResponse,
22  DataSourceApi,
23  DataSourceJsonData,
24  DefaultTimeZone,
25  LoadingState,
26  MutableDataFrame,
27  PanelData,
28  RawTimeRange,
29  toUtc,
30} from '@grafana/data';
31import { thunkTester } from 'test/core/thunk/thunkTester';
32import { makeExplorePaneState } from './utils';
33import { reducerTester } from '../../../../test/core/redux/reducerTester';
34import { configureStore } from '../../../store/configureStore';
35import { setTimeSrv } from '../../dashboard/services/TimeSrv';
36import Mock = jest.Mock;
37
38const t = toUtc();
39const testRange = {
40  from: t,
41  to: t,
42  raw: {
43    from: t,
44    to: t,
45  },
46};
47const defaultInitialState = {
48  user: {
49    orgId: '1',
50    timeZone: DefaultTimeZone,
51  },
52  explore: {
53    [ExploreId.left]: {
54      datasourceInstance: {
55        query: jest.fn(),
56        getRef: jest.fn(),
57        meta: {
58          id: 'something',
59        },
60      },
61      initialized: true,
62      containerWidth: 1920,
63      eventBridge: { emit: () => {} } as any,
64      queries: [{ expr: 'test' }] as any[],
65      range: testRange,
66      refreshInterval: {
67        label: 'Off',
68        value: 0,
69      },
70      cache: [],
71    },
72  },
73};
74
75function setupQueryResponse(state: StoreState) {
76  (state.explore[ExploreId.left].datasourceInstance?.query as Mock).mockReturnValueOnce(
77    of({
78      error: { message: 'test error' },
79      data: [
80        new MutableDataFrame({
81          fields: [{ name: 'test', values: new ArrayVector() }],
82          meta: {
83            preferredVisualisationType: 'graph',
84          },
85        }),
86      ],
87    } as DataQueryResponse)
88  );
89}
90
91describe('runQueries', () => {
92  it('should pass dataFrames to state even if there is error in response', async () => {
93    setTimeSrv({
94      init() {},
95    } as any);
96    const { dispatch, getState }: { dispatch: ThunkDispatch; getState: () => StoreState } = configureStore({
97      ...(defaultInitialState as any),
98    });
99    setupQueryResponse(getState());
100    await dispatch(runQueries(ExploreId.left));
101    expect(getState().explore[ExploreId.left].showMetrics).toBeTruthy();
102    expect(getState().explore[ExploreId.left].graphResult).toBeDefined();
103  });
104});
105
106describe('running queries', () => {
107  it('should cancel running query when cancelQueries is dispatched', async () => {
108    const unsubscribable = interval(1000);
109    unsubscribable.subscribe();
110    const exploreId = ExploreId.left;
111    const initialState = {
112      explore: {
113        [exploreId]: {
114          datasourceInstance: { name: 'testDs' },
115          initialized: true,
116          loading: true,
117          querySubscription: unsubscribable,
118          queries: ['A'],
119          range: testRange,
120        },
121      },
122
123      user: {
124        orgId: 'A',
125      },
126    };
127
128    const dispatchedActions = await thunkTester(initialState)
129      .givenThunk(cancelQueries)
130      .whenThunkIsDispatched(exploreId);
131
132    expect(dispatchedActions).toEqual([
133      scanStopAction({ exploreId }),
134      cancelQueriesAction({ exploreId }),
135      storeLogsVolumeDataProviderAction({ exploreId, logsVolumeDataProvider: undefined }),
136      cleanLogsVolumeAction({ exploreId }),
137    ]);
138  });
139});
140
141describe('importing queries', () => {
142  describe('when importing queries between the same type of data source', () => {
143    it('remove datasource property from all of the queries', async () => {
144      const { dispatch, getState }: { dispatch: ThunkDispatch; getState: () => StoreState } = configureStore({
145        ...(defaultInitialState as any),
146        explore: {
147          [ExploreId.left]: {
148            ...defaultInitialState.explore[ExploreId.left],
149            datasourceInstance: { name: 'testDs', type: 'postgres' },
150          },
151        },
152      });
153
154      await dispatch(
155        importQueries(
156          ExploreId.left,
157          [
158            { datasource: { type: 'postgresql' }, refId: 'refId_A' },
159            { datasource: { type: 'postgresql' }, refId: 'refId_B' },
160          ],
161          { name: 'Postgres1', type: 'postgres' } as DataSourceApi<DataQuery, DataSourceJsonData, {}>,
162          { name: 'Postgres2', type: 'postgres' } as DataSourceApi<DataQuery, DataSourceJsonData, {}>
163        )
164      );
165
166      expect(getState().explore[ExploreId.left].queries[0]).toHaveProperty('refId', 'refId_A');
167      expect(getState().explore[ExploreId.left].queries[1]).toHaveProperty('refId', 'refId_B');
168      expect(getState().explore[ExploreId.left].queries[0]).not.toHaveProperty('datasource');
169      expect(getState().explore[ExploreId.left].queries[1]).not.toHaveProperty('datasource');
170    });
171  });
172});
173
174describe('reducer', () => {
175  describe('scanning', () => {
176    it('should start scanning', () => {
177      const initialState: ExploreItemState = {
178        ...makeExplorePaneState(),
179        scanning: false,
180      };
181
182      reducerTester<ExploreItemState>()
183        .givenReducer(queryReducer, initialState)
184        .whenActionIsDispatched(scanStartAction({ exploreId: ExploreId.left }))
185        .thenStateShouldEqual({
186          ...initialState,
187          scanning: true,
188        });
189    });
190    it('should stop scanning', () => {
191      const initialState = {
192        ...makeExplorePaneState(),
193        scanning: true,
194        scanRange: {} as RawTimeRange,
195      };
196
197      reducerTester<ExploreItemState>()
198        .givenReducer(queryReducer, initialState)
199        .whenActionIsDispatched(scanStopAction({ exploreId: ExploreId.left }))
200        .thenStateShouldEqual({
201          ...initialState,
202          scanning: false,
203          scanRange: undefined,
204        });
205    });
206  });
207
208  describe('query rows', () => {
209    it('adds a new query row', () => {
210      reducerTester<ExploreItemState>()
211        .givenReducer(queryReducer, ({
212          queries: [],
213        } as unknown) as ExploreItemState)
214        .whenActionIsDispatched(
215          addQueryRowAction({
216            exploreId: ExploreId.left,
217            query: { refId: 'A', key: 'mockKey' },
218            index: 0,
219          })
220        )
221        .thenStateShouldEqual(({
222          queries: [{ refId: 'A', key: 'mockKey' }],
223          queryKeys: ['mockKey-0'],
224        } as unknown) as ExploreItemState);
225    });
226  });
227
228  describe('caching', () => {
229    it('should add response to cache', async () => {
230      const { dispatch, getState }: { dispatch: ThunkDispatch; getState: () => StoreState } = configureStore({
231        ...(defaultInitialState as any),
232        explore: {
233          [ExploreId.left]: {
234            ...defaultInitialState.explore[ExploreId.left],
235            queryResponse: {
236              series: [{ name: 'test name' }] as DataFrame[],
237              state: LoadingState.Done,
238            } as PanelData,
239            absoluteRange: { from: 1621348027000, to: 1621348050000 },
240          },
241        },
242      });
243
244      await dispatch(addResultsToCache(ExploreId.left));
245
246      expect(getState().explore[ExploreId.left].cache).toEqual([
247        { key: 'from=1621348027000&to=1621348050000', value: { series: [{ name: 'test name' }], state: 'Done' } },
248      ]);
249    });
250
251    it('should not add response to cache if response is still loading', async () => {
252      const { dispatch, getState }: { dispatch: ThunkDispatch; getState: () => StoreState } = configureStore({
253        ...(defaultInitialState as any),
254        explore: {
255          [ExploreId.left]: {
256            ...defaultInitialState.explore[ExploreId.left],
257            queryResponse: { series: [{ name: 'test name' }] as DataFrame[], state: LoadingState.Loading } as PanelData,
258            absoluteRange: { from: 1621348027000, to: 1621348050000 },
259          },
260        },
261      });
262
263      await dispatch(addResultsToCache(ExploreId.left));
264
265      expect(getState().explore[ExploreId.left].cache).toEqual([]);
266    });
267
268    it('should not add duplicate response to cache', async () => {
269      const { dispatch, getState }: { dispatch: ThunkDispatch; getState: () => StoreState } = configureStore({
270        ...(defaultInitialState as any),
271        explore: {
272          [ExploreId.left]: {
273            ...defaultInitialState.explore[ExploreId.left],
274            queryResponse: {
275              series: [{ name: 'test name' }] as DataFrame[],
276              state: LoadingState.Done,
277            } as PanelData,
278            absoluteRange: { from: 1621348027000, to: 1621348050000 },
279            cache: [
280              {
281                key: 'from=1621348027000&to=1621348050000',
282                value: { series: [{ name: 'old test name' }], state: LoadingState.Done },
283              },
284            ],
285          },
286        },
287      });
288
289      await dispatch(addResultsToCache(ExploreId.left));
290
291      expect(getState().explore[ExploreId.left].cache).toHaveLength(1);
292      expect(getState().explore[ExploreId.left].cache).toEqual([
293        { key: 'from=1621348027000&to=1621348050000', value: { series: [{ name: 'old test name' }], state: 'Done' } },
294      ]);
295    });
296
297    it('should clear cache', async () => {
298      const { dispatch, getState }: { dispatch: ThunkDispatch; getState: () => StoreState } = configureStore({
299        ...(defaultInitialState as any),
300        explore: {
301          [ExploreId.left]: {
302            ...defaultInitialState.explore[ExploreId.left],
303            cache: [
304              {
305                key: 'from=1621348027000&to=1621348050000',
306                value: { series: [{ name: 'old test name' }], state: 'Done' },
307              },
308            ],
309          },
310        },
311      });
312
313      await dispatch(clearCache(ExploreId.left));
314
315      expect(getState().explore[ExploreId.left].cache).toEqual([]);
316    });
317  });
318
319  describe('log volume', () => {
320    let dispatch: ThunkDispatch,
321      getState: () => StoreState,
322      unsubscribes: Function[],
323      mockLogsVolumeDataProvider: () => Observable<DataQueryResponse>;
324
325    beforeEach(() => {
326      unsubscribes = [];
327      mockLogsVolumeDataProvider = () => {
328        return ({
329          subscribe: () => {
330            const unsubscribe = jest.fn();
331            unsubscribes.push(unsubscribe);
332            return {
333              unsubscribe,
334            };
335          },
336        } as unknown) as Observable<DataQueryResponse>;
337      };
338
339      const store: { dispatch: ThunkDispatch; getState: () => StoreState } = configureStore({
340        ...(defaultInitialState as any),
341        explore: {
342          [ExploreId.left]: {
343            ...defaultInitialState.explore[ExploreId.left],
344            datasourceInstance: {
345              query: jest.fn(),
346              getRef: jest.fn(),
347              meta: {
348                id: 'something',
349              },
350              getLogsVolumeDataProvider: () => {
351                return mockLogsVolumeDataProvider();
352              },
353            },
354          },
355        },
356      });
357
358      dispatch = store.dispatch;
359      getState = store.getState;
360
361      setupQueryResponse(getState());
362    });
363
364    it('should cancel any unfinished logs volume queries when a new query is run', async () => {
365      await dispatch(runQueries(ExploreId.left));
366      // first query is run automatically
367      // loading in progress - one subscription created, not cleaned up yet
368      expect(unsubscribes).toHaveLength(1);
369      expect(unsubscribes[0]).not.toBeCalled();
370
371      setupQueryResponse(getState());
372      await dispatch(runQueries(ExploreId.left));
373      // a new query is run while log volume query is not resolve yet...
374      expect(unsubscribes[0]).toBeCalled();
375      // first subscription is cleaned up, a new subscription is created automatically
376      expect(unsubscribes).toHaveLength(2);
377      expect(unsubscribes[1]).not.toBeCalled();
378    });
379
380    it('should cancel log volume query when the main query is canceled', async () => {
381      await dispatch(runQueries(ExploreId.left));
382      expect(unsubscribes).toHaveLength(1);
383      expect(unsubscribes[0]).not.toBeCalled();
384
385      await dispatch(cancelQueries(ExploreId.left));
386      expect(unsubscribes).toHaveLength(1);
387      expect(unsubscribes[0]).toBeCalled();
388
389      expect(getState().explore[ExploreId.left].logsVolumeData).toBeUndefined();
390      expect(getState().explore[ExploreId.left].logsVolumeDataProvider).toBeUndefined();
391    });
392
393    it('should load logs volume after running the query', async () => {
394      await dispatch(runQueries(ExploreId.left));
395      expect(unsubscribes).toHaveLength(1);
396    });
397
398    it('should clean any incomplete log volume data when main query is canceled', async () => {
399      mockLogsVolumeDataProvider = () => {
400        return of({ state: LoadingState.Loading, error: undefined, data: [] });
401      };
402      await dispatch(runQueries(ExploreId.left));
403
404      expect(getState().explore[ExploreId.left].logsVolumeData).toBeDefined();
405      expect(getState().explore[ExploreId.left].logsVolumeData!.state).toBe(LoadingState.Loading);
406      expect(getState().explore[ExploreId.left].logsVolumeDataProvider).toBeDefined();
407
408      await dispatch(cancelQueries(ExploreId.left));
409      expect(getState().explore[ExploreId.left].logsVolumeData).toBeUndefined();
410      expect(getState().explore[ExploreId.left].logsVolumeDataProvider).toBeUndefined();
411    });
412
413    it('keeps complete log volume data when main query is canceled', async () => {
414      mockLogsVolumeDataProvider = () => {
415        return of(
416          { state: LoadingState.Loading, error: undefined, data: [] },
417          { state: LoadingState.Done, error: undefined, data: [{}] }
418        );
419      };
420      await dispatch(runQueries(ExploreId.left));
421
422      expect(getState().explore[ExploreId.left].logsVolumeData).toBeDefined();
423      expect(getState().explore[ExploreId.left].logsVolumeData!.state).toBe(LoadingState.Done);
424      expect(getState().explore[ExploreId.left].logsVolumeDataProvider).toBeDefined();
425
426      await dispatch(cancelQueries(ExploreId.left));
427      expect(getState().explore[ExploreId.left].logsVolumeData).toBeDefined();
428      expect(getState().explore[ExploreId.left].logsVolumeData!.state).toBe(LoadingState.Done);
429      expect(getState().explore[ExploreId.left].logsVolumeDataProvider).toBeUndefined();
430    });
431  });
432});
433