1import { mergeMap, throttleTime } from 'rxjs/operators';
2import { identity, Observable, of, SubscriptionLike, Unsubscribable } from 'rxjs';
3import {
4  AbsoluteTimeRange,
5  DataQuery,
6  DataQueryErrorType,
7  DataQueryResponse,
8  DataSourceApi,
9  hasLogsVolumeSupport,
10  LoadingState,
11  PanelData,
12  PanelEvents,
13  QueryFixAction,
14  toLegacyResponseData,
15} from '@grafana/data';
16
17import {
18  buildQueryTransaction,
19  ensureQueries,
20  generateEmptyQuery,
21  generateNewKeyAndAddRefIdIfMissing,
22  getQueryKeys,
23  hasNonEmptyQuery,
24  stopQueryState,
25  updateHistory,
26} from 'app/core/utils/explore';
27import { addToRichHistory } from 'app/core/utils/richHistory';
28import { ExploreItemState, ExplorePanelData, ThunkResult } from 'app/types';
29import { ExploreId, QueryOptions } from 'app/types/explore';
30import { getTimeZone } from 'app/features/profile/state/selectors';
31import { getShiftedTimeRange } from 'app/core/utils/timePicker';
32import { notifyApp } from '../../../core/actions';
33import { runRequest } from '../../query/state/runRequest';
34import { decorateData } from '../utils/decorators';
35import { createErrorNotification } from '../../../core/copy/appNotification';
36import { localStorageFullAction, richHistoryLimitExceededAction, richHistoryUpdatedAction, stateSave } from './main';
37import { AnyAction, createAction, PayloadAction } from '@reduxjs/toolkit';
38import { updateTime } from './time';
39import { historyUpdatedAction } from './history';
40import { createCacheKey, getResultsFromCache } from './utils';
41import deepEqual from 'fast-deep-equal';
42
43//
44// Actions and Payloads
45//
46
47/**
48 * Adds a query row after the row with the given index.
49 */
50export interface AddQueryRowPayload {
51  exploreId: ExploreId;
52  index: number;
53  query: DataQuery;
54}
55export const addQueryRowAction = createAction<AddQueryRowPayload>('explore/addQueryRow');
56
57/**
58 * Query change handler for the query row with the given index.
59 * If `override` is reset the query modifications and run the queries. Use this to set queries via a link.
60 */
61export interface ChangeQueriesPayload {
62  exploreId: ExploreId;
63  queries: DataQuery[];
64}
65export const changeQueriesAction = createAction<ChangeQueriesPayload>('explore/changeQueries');
66
67/**
68 * Cancel running queries.
69 */
70export interface CancelQueriesPayload {
71  exploreId: ExploreId;
72}
73export const cancelQueriesAction = createAction<CancelQueriesPayload>('explore/cancelQueries');
74
75export interface QueriesImportedPayload {
76  exploreId: ExploreId;
77  queries: DataQuery[];
78}
79export const queriesImportedAction = createAction<QueriesImportedPayload>('explore/queriesImported');
80
81/**
82 * Action to modify a query given a datasource-specific modifier action.
83 * @param exploreId Explore area
84 * @param modification Action object with a type, e.g., ADD_FILTER
85 * @param index Optional query row index. If omitted, the modification is applied to all query rows.
86 * @param modifier Function that executes the modification, typically `datasourceInstance.modifyQueries`.
87 */
88export interface ModifyQueriesPayload {
89  exploreId: ExploreId;
90  modification: QueryFixAction;
91  index?: number;
92  modifier: (query: DataQuery, modification: QueryFixAction) => DataQuery;
93}
94export const modifyQueriesAction = createAction<ModifyQueriesPayload>('explore/modifyQueries');
95
96export interface QueryStoreSubscriptionPayload {
97  exploreId: ExploreId;
98  querySubscription: Unsubscribable;
99}
100
101export const queryStoreSubscriptionAction = createAction<QueryStoreSubscriptionPayload>(
102  'explore/queryStoreSubscription'
103);
104
105export interface StoreLogsVolumeDataProvider {
106  exploreId: ExploreId;
107  logsVolumeDataProvider?: Observable<DataQueryResponse>;
108}
109
110/**
111 * Stores available logs volume provider after running the query. Used internally by runQueries().
112 */
113export const storeLogsVolumeDataProviderAction = createAction<StoreLogsVolumeDataProvider>(
114  'explore/storeLogsVolumeDataProviderAction'
115);
116
117export const cleanLogsVolumeAction = createAction<{ exploreId: ExploreId }>('explore/cleanLogsVolumeAction');
118
119export interface StoreLogsVolumeDataSubscriptionPayload {
120  exploreId: ExploreId;
121  logsVolumeDataSubscription?: SubscriptionLike;
122}
123
124/**
125 * Stores current logs volume subscription for given explore pane.
126 */
127const storeLogsVolumeDataSubscriptionAction = createAction<StoreLogsVolumeDataSubscriptionPayload>(
128  'explore/storeLogsVolumeDataSubscriptionAction'
129);
130
131/**
132 * Stores data returned by the provider. Used internally by loadLogsVolumeData().
133 */
134const updateLogsVolumeDataAction = createAction<{
135  exploreId: ExploreId;
136  logsVolumeData: DataQueryResponse;
137}>('explore/updateLogsVolumeDataAction');
138
139export interface QueryEndedPayload {
140  exploreId: ExploreId;
141  response: ExplorePanelData;
142}
143export const queryStreamUpdatedAction = createAction<QueryEndedPayload>('explore/queryStreamUpdated');
144
145/**
146 * Reset queries to the given queries. Any modifications will be discarded.
147 * Use this action for clicks on query examples. Triggers a query run.
148 */
149export interface SetQueriesPayload {
150  exploreId: ExploreId;
151  queries: DataQuery[];
152}
153export const setQueriesAction = createAction<SetQueriesPayload>('explore/setQueries');
154
155export interface ChangeLoadingStatePayload {
156  exploreId: ExploreId;
157  loadingState: LoadingState;
158}
159export const changeLoadingStateAction = createAction<ChangeLoadingStatePayload>('changeLoadingState');
160
161export interface SetPausedStatePayload {
162  exploreId: ExploreId;
163  isPaused: boolean;
164}
165export const setPausedStateAction = createAction<SetPausedStatePayload>('explore/setPausedState');
166
167/**
168 * Start a scan for more results using the given scanner.
169 * @param exploreId Explore area
170 * @param scanner Function that a) returns a new time range and b) triggers a query run for the new range
171 */
172export interface ScanStartPayload {
173  exploreId: ExploreId;
174}
175export const scanStartAction = createAction<ScanStartPayload>('explore/scanStart');
176
177/**
178 * Stop any scanning for more results.
179 */
180export interface ScanStopPayload {
181  exploreId: ExploreId;
182}
183export const scanStopAction = createAction<ScanStopPayload>('explore/scanStop');
184
185/**
186 * Adds query results to cache.
187 * This is currently used to cache last 5 query results for log queries run from logs navigation (pagination).
188 */
189export interface AddResultsToCachePayload {
190  exploreId: ExploreId;
191  cacheKey: string;
192  queryResponse: PanelData;
193}
194export const addResultsToCacheAction = createAction<AddResultsToCachePayload>('explore/addResultsToCache');
195
196/**
197 *  Clears cache.
198 */
199export interface ClearCachePayload {
200  exploreId: ExploreId;
201}
202export const clearCacheAction = createAction<ClearCachePayload>('explore/clearCache');
203
204//
205// Action creators
206//
207
208/**
209 * Adds a query row after the row with the given index.
210 */
211export function addQueryRow(exploreId: ExploreId, index: number): ThunkResult<void> {
212  return (dispatch, getState) => {
213    const queries = getState().explore[exploreId]!.queries;
214    const query = generateEmptyQuery(queries, index);
215
216    dispatch(addQueryRowAction({ exploreId, index, query }));
217  };
218}
219
220/**
221 * Cancel running queries
222 */
223export function cancelQueries(exploreId: ExploreId): ThunkResult<void> {
224  return (dispatch, getState) => {
225    dispatch(scanStopAction({ exploreId }));
226    dispatch(cancelQueriesAction({ exploreId }));
227    dispatch(
228      storeLogsVolumeDataProviderAction({
229        exploreId,
230        logsVolumeDataProvider: undefined,
231      })
232    );
233    // clear any incomplete data
234    if (getState().explore[exploreId]!.logsVolumeData?.state !== LoadingState.Done) {
235      dispatch(cleanLogsVolumeAction({ exploreId }));
236    }
237    dispatch(stateSave());
238  };
239}
240
241/**
242 * Import queries from previous datasource if possible eg Loki and Prometheus have similar query language so the
243 * labels part can be reused to get similar data.
244 * @param exploreId
245 * @param queries
246 * @param sourceDataSource
247 * @param targetDataSource
248 */
249export const importQueries = (
250  exploreId: ExploreId,
251  queries: DataQuery[],
252  sourceDataSource: DataSourceApi | undefined | null,
253  targetDataSource: DataSourceApi
254): ThunkResult<void> => {
255  return async (dispatch) => {
256    if (!sourceDataSource) {
257      // explore not initialized
258      dispatch(queriesImportedAction({ exploreId, queries }));
259      return;
260    }
261
262    let importedQueries = queries;
263    // Check if queries can be imported from previously selected datasource
264    if (sourceDataSource.meta?.id === targetDataSource.meta?.id) {
265      // Keep same queries if same type of datasource, but delete datasource query property to prevent mismatch of new and old data source instance
266      importedQueries = queries.map(({ datasource, ...query }) => query);
267    } else if (targetDataSource.importQueries) {
268      // Datasource-specific importers
269      importedQueries = await targetDataSource.importQueries(queries, sourceDataSource);
270    } else {
271      // Default is blank queries
272      importedQueries = ensureQueries();
273    }
274
275    const nextQueries = ensureQueries(importedQueries);
276
277    dispatch(queriesImportedAction({ exploreId, queries: nextQueries }));
278  };
279};
280
281/**
282 * Action to modify a query given a datasource-specific modifier action.
283 * @param exploreId Explore area
284 * @param modification Action object with a type, e.g., ADD_FILTER
285 * @param index Optional query row index. If omitted, the modification is applied to all query rows.
286 * @param modifier Function that executes the modification, typically `datasourceInstance.modifyQueries`.
287 */
288export function modifyQueries(
289  exploreId: ExploreId,
290  modification: QueryFixAction,
291  modifier: any,
292  index?: number
293): ThunkResult<void> {
294  return (dispatch) => {
295    dispatch(modifyQueriesAction({ exploreId, modification, index, modifier }));
296    if (!modification.preventSubmit) {
297      dispatch(runQueries(exploreId));
298    }
299  };
300}
301
302/**
303 * Main action to run queries and dispatches sub-actions based on which result viewers are active
304 */
305export const runQueries = (
306  exploreId: ExploreId,
307  options?: { replaceUrl?: boolean; preserveCache?: boolean }
308): ThunkResult<void> => {
309  return (dispatch, getState) => {
310    dispatch(updateTime({ exploreId }));
311
312    // We always want to clear cache unless we explicitly pass preserveCache parameter
313    const preserveCache = options?.preserveCache === true;
314    if (!preserveCache) {
315      dispatch(clearCache(exploreId));
316    }
317
318    const { richHistory } = getState().explore;
319    const exploreItemState = getState().explore[exploreId]!;
320    const {
321      datasourceInstance,
322      containerWidth,
323      isLive: live,
324      range,
325      scanning,
326      queryResponse,
327      querySubscription,
328      history,
329      refreshInterval,
330      absoluteRange,
331      cache,
332      logsVolumeDataProvider,
333    } = exploreItemState;
334    let newQuerySub;
335
336    const queries = exploreItemState.queries.map((query) => ({
337      ...query,
338      datasource: query.datasource || datasourceInstance?.getRef(),
339    }));
340
341    const cachedValue = getResultsFromCache(cache, absoluteRange);
342
343    // If we have results saved in cache, we are going to use those results instead of running queries
344    if (cachedValue) {
345      newQuerySub = of(cachedValue)
346        .pipe(
347          mergeMap((data: PanelData) =>
348            decorateData(data, queryResponse, absoluteRange, refreshInterval, queries, !!logsVolumeDataProvider)
349          )
350        )
351        .subscribe((data) => {
352          if (!data.error) {
353            dispatch(stateSave());
354          }
355
356          dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
357        });
358
359      // If we don't have results saved in cache, run new queries
360    } else {
361      if (!hasNonEmptyQuery(queries)) {
362        dispatch(stateSave({ replace: options?.replaceUrl })); // Remember to save to state and update location
363        return;
364      }
365
366      if (!datasourceInstance) {
367        return;
368      }
369
370      // Some datasource's query builders allow per-query interval limits,
371      // but we're using the datasource interval limit for now
372      const minInterval = datasourceInstance?.interval;
373
374      stopQueryState(querySubscription);
375
376      const datasourceId = datasourceInstance?.meta.id;
377
378      const queryOptions: QueryOptions = {
379        minInterval,
380        // maxDataPoints is used in:
381        // Loki - used for logs streaming for buffer size, with undefined it falls back to datasource config if it supports that.
382        // Elastic - limits the number of datapoints for the counts query and for logs it has hardcoded limit.
383        // Influx - used to correctly display logs in graph
384        // TODO:unification
385        // maxDataPoints: mode === ExploreMode.Logs && datasourceId === 'loki' ? undefined : containerWidth,
386        maxDataPoints: containerWidth,
387        liveStreaming: live,
388      };
389
390      const datasourceName = datasourceInstance.name;
391      const timeZone = getTimeZone(getState().user);
392      const transaction = buildQueryTransaction(exploreId, queries, queryOptions, range, scanning, timeZone);
393
394      let querySaved = false;
395      dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Loading }));
396
397      newQuerySub = runRequest(datasourceInstance, transaction.request)
398        .pipe(
399          // Simple throttle for live tailing, in case of > 1000 rows per interval we spend about 200ms on processing and
400          // rendering. In case this is optimized this can be tweaked, but also it should be only as fast as user
401          // actually can see what is happening.
402          live ? throttleTime(500) : identity,
403          mergeMap((data: PanelData) =>
404            decorateData(
405              data,
406              queryResponse,
407              absoluteRange,
408              refreshInterval,
409              queries,
410              !!getState().explore[exploreId]!.logsVolumeDataProvider
411            )
412          )
413        )
414        .subscribe(
415          (data) => {
416            if (data.state !== LoadingState.Loading && !data.error && !querySaved) {
417              // Side-effect: Saving history in localstorage
418              const nextHistory = updateHistory(history, datasourceId, queries);
419              const { richHistory: nextRichHistory, localStorageFull, limitExceeded } = addToRichHistory(
420                richHistory || [],
421                datasourceId,
422                datasourceName,
423                queries,
424                false,
425                '',
426                '',
427                !getState().explore.localStorageFull,
428                !getState().explore.richHistoryLimitExceededWarningShown
429              );
430              dispatch(historyUpdatedAction({ exploreId, history: nextHistory }));
431              dispatch(richHistoryUpdatedAction({ richHistory: nextRichHistory }));
432              if (localStorageFull) {
433                dispatch(localStorageFullAction());
434              }
435              if (limitExceeded) {
436                dispatch(richHistoryLimitExceededAction());
437              }
438
439              // We save queries to the URL here so that only successfully run queries change the URL.
440              dispatch(stateSave({ replace: options?.replaceUrl }));
441              querySaved = true;
442            }
443
444            dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
445
446            // Keep scanning for results if this was the last scanning transaction
447            if (getState().explore[exploreId]!.scanning) {
448              if (data.state === LoadingState.Done && data.series.length === 0) {
449                const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
450                dispatch(updateTime({ exploreId, absoluteRange: range }));
451                dispatch(runQueries(exploreId));
452              } else {
453                // We can stop scanning if we have a result
454                dispatch(scanStopAction({ exploreId }));
455              }
456            }
457          },
458          (error) => {
459            dispatch(notifyApp(createErrorNotification('Query processing error', error)));
460            dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Error }));
461            console.error(error);
462          }
463        );
464
465      if (live) {
466        dispatch(
467          storeLogsVolumeDataProviderAction({
468            exploreId,
469            logsVolumeDataProvider: undefined,
470          })
471        );
472        dispatch(cleanLogsVolumeAction({ exploreId }));
473      } else if (hasLogsVolumeSupport(datasourceInstance)) {
474        const logsVolumeDataProvider = datasourceInstance.getLogsVolumeDataProvider(transaction.request);
475        dispatch(
476          storeLogsVolumeDataProviderAction({
477            exploreId,
478            logsVolumeDataProvider,
479          })
480        );
481        const { logsVolumeData, absoluteRange } = getState().explore[exploreId]!;
482        if (!canReuseLogsVolumeData(logsVolumeData, queries, absoluteRange)) {
483          dispatch(cleanLogsVolumeAction({ exploreId }));
484          dispatch(loadLogsVolumeData(exploreId));
485        }
486      } else {
487        dispatch(
488          storeLogsVolumeDataProviderAction({
489            exploreId,
490            logsVolumeDataProvider: undefined,
491          })
492        );
493      }
494    }
495
496    dispatch(queryStoreSubscriptionAction({ exploreId, querySubscription: newQuerySub }));
497  };
498};
499
500/**
501 * Checks if after changing the time range the existing data can be used to show logs volume.
502 * It can happen if queries are the same and new time range is within existing data time range.
503 */
504function canReuseLogsVolumeData(
505  logsVolumeData: DataQueryResponse | undefined,
506  queries: DataQuery[],
507  selectedTimeRange: AbsoluteTimeRange
508): boolean {
509  if (logsVolumeData && logsVolumeData.data[0]) {
510    // check if queries are the same
511    if (!deepEqual(logsVolumeData.data[0].meta?.custom?.targets, queries)) {
512      return false;
513    }
514    const dataRange = logsVolumeData && logsVolumeData.data[0] && logsVolumeData.data[0].meta?.custom?.absoluteRange;
515    // if selected range is within loaded logs volume
516    if (dataRange && dataRange.from <= selectedTimeRange.from && selectedTimeRange.to <= dataRange.to) {
517      return true;
518    }
519  }
520  return false;
521}
522
523/**
524 * Reset queries to the given queries. Any modifications will be discarded.
525 * Use this action for clicks on query examples. Triggers a query run.
526 */
527export function setQueries(exploreId: ExploreId, rawQueries: DataQuery[]): ThunkResult<void> {
528  return (dispatch, getState) => {
529    // Inject react keys into query objects
530    const queries = getState().explore[exploreId]!.queries;
531    const nextQueries = rawQueries.map((query, index) => generateNewKeyAndAddRefIdIfMissing(query, queries, index));
532    dispatch(setQueriesAction({ exploreId, queries: nextQueries }));
533    dispatch(runQueries(exploreId));
534  };
535}
536
537/**
538 * Start a scan for more results using the given scanner.
539 * @param exploreId Explore area
540 * @param scanner Function that a) returns a new time range and b) triggers a query run for the new range
541 */
542export function scanStart(exploreId: ExploreId): ThunkResult<void> {
543  return (dispatch, getState) => {
544    // Register the scanner
545    dispatch(scanStartAction({ exploreId }));
546    // Scanning must trigger query run, and return the new range
547    const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
548    // Set the new range to be displayed
549    dispatch(updateTime({ exploreId, absoluteRange: range }));
550    dispatch(runQueries(exploreId));
551  };
552}
553
554export function addResultsToCache(exploreId: ExploreId): ThunkResult<void> {
555  return (dispatch, getState) => {
556    const queryResponse = getState().explore[exploreId]!.queryResponse;
557    const absoluteRange = getState().explore[exploreId]!.absoluteRange;
558    const cacheKey = createCacheKey(absoluteRange);
559
560    // Save results to cache only when all results recived and loading is done
561    if (queryResponse.state === LoadingState.Done) {
562      dispatch(addResultsToCacheAction({ exploreId, cacheKey, queryResponse }));
563    }
564  };
565}
566
567export function clearCache(exploreId: ExploreId): ThunkResult<void> {
568  return (dispatch, getState) => {
569    dispatch(clearCacheAction({ exploreId }));
570  };
571}
572
573/**
574 * Initializes loading logs volume data and stores emitted value.
575 */
576export function loadLogsVolumeData(exploreId: ExploreId): ThunkResult<void> {
577  return (dispatch, getState) => {
578    const { logsVolumeDataProvider } = getState().explore[exploreId]!;
579    if (logsVolumeDataProvider) {
580      const logsVolumeDataSubscription = logsVolumeDataProvider.subscribe({
581        next: (logsVolumeData: DataQueryResponse) => {
582          dispatch(updateLogsVolumeDataAction({ exploreId, logsVolumeData }));
583        },
584      });
585      dispatch(storeLogsVolumeDataSubscriptionAction({ exploreId, logsVolumeDataSubscription }));
586    }
587  };
588}
589
590//
591// Reducer
592//
593
594// Redux Toolkit uses ImmerJs as part of their solution to ensure that state objects are not mutated.
595// ImmerJs has an autoFreeze option that freezes objects from change which means this reducer can't be migrated to createSlice
596// because the state would become frozen and during run time we would get errors because flot (Graph lib) would try to mutate
597// the frozen state.
598// https://github.com/reduxjs/redux-toolkit/issues/242
599export const queryReducer = (state: ExploreItemState, action: AnyAction): ExploreItemState => {
600  if (addQueryRowAction.match(action)) {
601    const { queries } = state;
602    const { index, query } = action.payload;
603
604    // Add to queries, which will cause a new row to be rendered
605    const nextQueries = [...queries.slice(0, index + 1), { ...query }, ...queries.slice(index + 1)];
606
607    return {
608      ...state,
609      queries: nextQueries,
610      queryKeys: getQueryKeys(nextQueries, state.datasourceInstance),
611    };
612  }
613
614  if (changeQueriesAction.match(action)) {
615    const { queries } = action.payload;
616
617    return {
618      ...state,
619      queries,
620    };
621  }
622
623  if (cancelQueriesAction.match(action)) {
624    stopQueryState(state.querySubscription);
625
626    return {
627      ...state,
628      loading: false,
629    };
630  }
631
632  if (modifyQueriesAction.match(action)) {
633    const { queries } = state;
634    const { modification, index, modifier } = action.payload;
635    let nextQueries: DataQuery[];
636    if (index === undefined) {
637      // Modify all queries
638      nextQueries = queries.map((query, i) => {
639        const nextQuery = modifier({ ...query }, modification);
640        return generateNewKeyAndAddRefIdIfMissing(nextQuery, queries, i);
641      });
642    } else {
643      // Modify query only at index
644      nextQueries = queries.map((query, i) => {
645        if (i === index) {
646          const nextQuery = modifier({ ...query }, modification);
647          return generateNewKeyAndAddRefIdIfMissing(nextQuery, queries, i);
648        }
649
650        return query;
651      });
652    }
653    return {
654      ...state,
655      queries: nextQueries,
656      queryKeys: getQueryKeys(nextQueries, state.datasourceInstance),
657    };
658  }
659
660  if (setQueriesAction.match(action)) {
661    const { queries } = action.payload;
662    return {
663      ...state,
664      queries: queries.slice(),
665      queryKeys: getQueryKeys(queries, state.datasourceInstance),
666    };
667  }
668
669  if (queriesImportedAction.match(action)) {
670    const { queries } = action.payload;
671    return {
672      ...state,
673      queries,
674      queryKeys: getQueryKeys(queries, state.datasourceInstance),
675    };
676  }
677
678  if (queryStoreSubscriptionAction.match(action)) {
679    const { querySubscription } = action.payload;
680    return {
681      ...state,
682      querySubscription,
683    };
684  }
685
686  if (storeLogsVolumeDataProviderAction.match(action)) {
687    let { logsVolumeDataProvider } = action.payload;
688    if (state.logsVolumeDataSubscription) {
689      state.logsVolumeDataSubscription.unsubscribe();
690    }
691    return {
692      ...state,
693      logsVolumeDataProvider,
694      logsVolumeDataSubscription: undefined,
695    };
696  }
697
698  if (cleanLogsVolumeAction.match(action)) {
699    return {
700      ...state,
701      logsVolumeData: undefined,
702    };
703  }
704
705  if (storeLogsVolumeDataSubscriptionAction.match(action)) {
706    const { logsVolumeDataSubscription } = action.payload;
707    return {
708      ...state,
709      logsVolumeDataSubscription,
710    };
711  }
712
713  if (updateLogsVolumeDataAction.match(action)) {
714    let { logsVolumeData } = action.payload;
715
716    return {
717      ...state,
718      logsVolumeData,
719    };
720  }
721
722  if (queryStreamUpdatedAction.match(action)) {
723    return processQueryResponse(state, action);
724  }
725
726  if (queriesImportedAction.match(action)) {
727    const { queries } = action.payload;
728    return {
729      ...state,
730      queries,
731      queryKeys: getQueryKeys(queries, state.datasourceInstance),
732    };
733  }
734
735  if (changeLoadingStateAction.match(action)) {
736    const { loadingState } = action.payload;
737    return {
738      ...state,
739      queryResponse: {
740        ...state.queryResponse,
741        state: loadingState,
742      },
743      loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
744    };
745  }
746
747  if (setPausedStateAction.match(action)) {
748    const { isPaused } = action.payload;
749    return {
750      ...state,
751      isPaused: isPaused,
752    };
753  }
754
755  if (scanStartAction.match(action)) {
756    return { ...state, scanning: true };
757  }
758
759  if (scanStopAction.match(action)) {
760    return {
761      ...state,
762      scanning: false,
763      scanRange: undefined,
764    };
765  }
766
767  if (addResultsToCacheAction.match(action)) {
768    const CACHE_LIMIT = 5;
769    const { cache } = state;
770    const { queryResponse, cacheKey } = action.payload;
771
772    let newCache = [...cache];
773    const isDuplicateKey = newCache.some((c) => c.key === cacheKey);
774
775    if (!isDuplicateKey) {
776      const newCacheItem = { key: cacheKey, value: queryResponse };
777      newCache = [newCacheItem, ...newCache].slice(0, CACHE_LIMIT);
778    }
779
780    return {
781      ...state,
782      cache: newCache,
783    };
784  }
785
786  if (clearCacheAction.match(action)) {
787    return {
788      ...state,
789      cache: [],
790    };
791  }
792
793  return state;
794};
795
796export const processQueryResponse = (
797  state: ExploreItemState,
798  action: PayloadAction<QueryEndedPayload>
799): ExploreItemState => {
800  const { response } = action.payload;
801  const {
802    request,
803    state: loadingState,
804    series,
805    error,
806    graphResult,
807    logsResult,
808    tableResult,
809    traceFrames,
810    nodeGraphFrames,
811  } = response;
812
813  if (error) {
814    if (error.type === DataQueryErrorType.Timeout) {
815      return {
816        ...state,
817        queryResponse: response,
818        loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
819      };
820    } else if (error.type === DataQueryErrorType.Cancelled) {
821      return state;
822    }
823
824    // Send error to Angular editors
825    if (state.datasourceInstance?.components?.QueryCtrl) {
826      state.eventBridge.emit(PanelEvents.dataError, error);
827    }
828  }
829
830  if (!request) {
831    return { ...state };
832  }
833
834  // Send legacy data to Angular editors
835  if (state.datasourceInstance?.components?.QueryCtrl) {
836    const legacy = series.map((v) => toLegacyResponseData(v));
837    state.eventBridge.emit(PanelEvents.dataReceived, legacy);
838  }
839
840  return {
841    ...state,
842    queryResponse: response,
843    graphResult,
844    tableResult,
845    logsResult,
846    loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
847    showLogs: !!logsResult,
848    showMetrics: !!graphResult,
849    showTable: !!tableResult,
850    showTrace: !!traceFrames.length,
851    showNodeGraph: !!nodeGraphFrames.length,
852  };
853};
854