import { mergeMap, throttleTime } from 'rxjs/operators';
import { identity, Observable, of, SubscriptionLike, Unsubscribable } from 'rxjs';
import {
  AbsoluteTimeRange,
  DataQuery,
  DataQueryErrorType,
  DataQueryResponse,
  DataSourceApi,
  hasLogsVolumeSupport,
  LoadingState,
  PanelData,
  PanelEvents,
  QueryFixAction,
  toLegacyResponseData,
} from '@grafana/data';

import {
  buildQueryTransaction,
  ensureQueries,
  generateEmptyQuery,
  generateNewKeyAndAddRefIdIfMissing,
  getQueryKeys,
  hasNonEmptyQuery,
  stopQueryState,
  updateHistory,
} from 'app/core/utils/explore';
import { addToRichHistory } from 'app/core/utils/richHistory';
import { ExploreItemState, ExplorePanelData, ThunkResult } from 'app/types';
import { ExploreId, QueryOptions } from 'app/types/explore';
import { getTimeZone } from 'app/features/profile/state/selectors';
import { getShiftedTimeRange } from 'app/core/utils/timePicker';
import { notifyApp } from '../../../core/actions';
import { runRequest } from '../../query/state/runRequest';
import { decorateData } from '../utils/decorators';
import { createErrorNotification } from '../../../core/copy/appNotification';
import { localStorageFullAction, richHistoryLimitExceededAction, richHistoryUpdatedAction, stateSave } from './main';
import { AnyAction, createAction, PayloadAction } from '@reduxjs/toolkit';
import { updateTime } from './time';
import { historyUpdatedAction } from './history';
import { createCacheKey, getResultsFromCache } from './utils';
import deepEqual from 'fast-deep-equal';

//
// Actions and Payloads
//

/**
 * Adds a query row after the row with the given index.
 *
 * The reducer inserts a shallow copy of `query` at position `index + 1` and
 * recomputes `queryKeys` from the resulting list.
 */
export interface AddQueryRowPayload {
  // Explore pane whose query list is modified.
  exploreId: ExploreId;
  // Index of the existing row that the new row is inserted after.
  index: number;
  // Query to insert; addQueryRow() builds it with generateEmptyQuery().
  query: DataQuery;
}
export const addQueryRowAction = createAction<AddQueryRowPayload>('explore/addQueryRow');

/**
 * Query change handler for the query row with the given index.
* The payload carries the complete list of queries for the pane: the reducer stores `queries` as-is and,
 * unlike the other query-list actions in this file, does not recompute `queryKeys`.
 */
export interface ChangeQueriesPayload {
  exploreId: ExploreId;
  // Full replacement list of queries for the pane.
  queries: DataQuery[];
}
export const changeQueriesAction = createAction<ChangeQueriesPayload>('explore/changeQueries');

/**
 * Cancel running queries.
 *
 * The reducer stops the stored `querySubscription` via stopQueryState() and sets `loading` to false.
 */
export interface CancelQueriesPayload {
  exploreId: ExploreId;
}
export const cancelQueriesAction = createAction<CancelQueriesPayload>('explore/cancelQueries');

/**
 * Dispatched by importQueries() with the queries to use after a datasource change.
 * The reducer replaces `queries` and recomputes `queryKeys`.
 */
export interface QueriesImportedPayload {
  exploreId: ExploreId;
  queries: DataQuery[];
}
export const queriesImportedAction = createAction<QueriesImportedPayload>('explore/queriesImported');

/**
 * Action to modify a query given a datasource-specific modifier action.
 * @param exploreId Explore area
 * @param modification Action object with a type, e.g., ADD_FILTER
 * @param index Optional query row index. If omitted, the modification is applied to all query rows.
 * @param modifier Function that executes the modification, typically `datasourceInstance.modifyQueries`.
 */
export interface ModifyQueriesPayload {
  exploreId: ExploreId;
  modification: QueryFixAction;
  index?: number;
  // Called by the reducer with a shallow copy of each affected query.
  modifier: (query: DataQuery, modification: QueryFixAction) => DataQuery;
}
export const modifyQueriesAction = createAction<ModifyQueriesPayload>('explore/modifyQueries');

/**
 * Stores the subscription of the most recently started query run on the pane
 * (dispatched at the end of runQueries()).
 */
export interface QueryStoreSubscriptionPayload {
  exploreId: ExploreId;
  querySubscription: Unsubscribable;
}

export const queryStoreSubscriptionAction = createAction<QueryStoreSubscriptionPayload>(
  'explore/queryStoreSubscription'
);

export interface StoreLogsVolumeDataProvider {
  exploreId: ExploreId;
  // Leave undefined to clear the stored provider (as cancelQueries() does).
  logsVolumeDataProvider?: Observable<DataQueryResponse>;
}

/**
 * Stores available logs volume provider after running the query. Used internally by runQueries().
*/
export const storeLogsVolumeDataProviderAction = createAction<StoreLogsVolumeDataProvider>(
  'explore/storeLogsVolumeDataProviderAction'
);

/**
 * Clears the logs volume data of the pane (the reducer sets `logsVolumeData` to undefined).
 */
export const cleanLogsVolumeAction = createAction<{ exploreId: ExploreId }>('explore/cleanLogsVolumeAction');

export interface StoreLogsVolumeDataSubscriptionPayload {
  exploreId: ExploreId;
  logsVolumeDataSubscription?: SubscriptionLike;
}

/**
 * Stores current logs volume subscription for given explore pane.
 * Not exported: only dispatched from loadLogsVolumeData() in this module.
 */
const storeLogsVolumeDataSubscriptionAction = createAction<StoreLogsVolumeDataSubscriptionPayload>(
  'explore/storeLogsVolumeDataSubscriptionAction'
);

/**
 * Stores data returned by the provider. Used internally by loadLogsVolumeData().
 */
const updateLogsVolumeDataAction = createAction<{
  exploreId: ExploreId;
  logsVolumeData: DataQueryResponse;
}>('explore/updateLogsVolumeDataAction');

/**
 * Payload for a single emission of the query stream; the reducer hands it to processQueryResponse().
 */
export interface QueryEndedPayload {
  exploreId: ExploreId;
  response: ExplorePanelData;
}
export const queryStreamUpdatedAction = createAction<QueryEndedPayload>('explore/queryStreamUpdated');

/**
 * Reset queries to the given queries. Any modifications will be discarded.
 * Use this action for clicks on query examples. Triggers a query run.
*/
export interface SetQueriesPayload {
  exploreId: ExploreId;
  queries: DataQuery[];
}
export const setQueriesAction = createAction<SetQueriesPayload>('explore/setQueries');

/**
 * Sets the loading state of the pane's query response. The reducer derives `loading`
 * from it: true for `Loading` and `Streaming`, false otherwise.
 */
export interface ChangeLoadingStatePayload {
  exploreId: ExploreId;
  loadingState: LoadingState;
}
// NOTE(review): unlike every other action type in this file, this one has no 'explore/' prefix.
// Left untouched because the type string is visible at runtime - confirm before renaming.
export const changeLoadingStateAction = createAction<ChangeLoadingStatePayload>('changeLoadingState');

/**
 * Sets the `isPaused` flag of the pane.
 */
export interface SetPausedStatePayload {
  exploreId: ExploreId;
  isPaused: boolean;
}
export const setPausedStateAction = createAction<SetPausedStatePayload>('explore/setPausedState');

/**
 * Start a scan for more results.
 * The payload only identifies the pane; the reducer sets `scanning` to true.
 * @param exploreId Explore area
 */
export interface ScanStartPayload {
  exploreId: ExploreId;
}
export const scanStartAction = createAction<ScanStartPayload>('explore/scanStart');

/**
 * Stop any scanning for more results.
 * The reducer sets `scanning` to false and clears `scanRange`.
 */
export interface ScanStopPayload {
  exploreId: ExploreId;
}
export const scanStopAction = createAction<ScanStopPayload>('explore/scanStop');

/**
 * Adds query results to cache.
 * This is currently used to cache last 5 query results for log queries run from logs navigation (pagination).
 */
export interface AddResultsToCachePayload {
  exploreId: ExploreId;
  // Key built with createCacheKey() from the pane's absolute time range.
  cacheKey: string;
  queryResponse: PanelData;
}
export const addResultsToCacheAction = createAction<AddResultsToCachePayload>('explore/addResultsToCache');

/**
 * Clears cache.
 */
export interface ClearCachePayload {
  exploreId: ExploreId;
}
export const clearCacheAction = createAction<ClearCachePayload>('explore/clearCache');

//
// Action creators
//

/**
 * Adds a query row after the row with the given index.
*/
export function addQueryRow(exploreId: ExploreId, index: number): ThunkResult<void> {
  return (dispatch, getState) => {
    // The empty query is generated against the pane's current queries.
    const { queries: currentQueries } = getState().explore[exploreId]!;
    const emptyQuery = generateEmptyQuery(currentQueries, index);
    dispatch(addQueryRowAction({ exploreId, index, query: emptyQuery }));
  };
}

/**
 * Cancels the running queries of a pane: stops scanning, cancels the query run,
 * drops the logs volume provider and persists the state.
 */
export function cancelQueries(exploreId: ExploreId): ThunkResult<void> {
  return (dispatch, getState) => {
    dispatch(scanStopAction({ exploreId }));
    dispatch(cancelQueriesAction({ exploreId }));
    dispatch(storeLogsVolumeDataProviderAction({ exploreId, logsVolumeDataProvider: undefined }));

    // Logs volume data that has not reached the Done state is incomplete and is discarded.
    const logsVolumeState = getState().explore[exploreId]!.logsVolumeData?.state;
    const isLogsVolumeIncomplete = logsVolumeState !== LoadingState.Done;
    if (isLogsVolumeIncomplete) {
      dispatch(cleanLogsVolumeAction({ exploreId }));
    }

    dispatch(stateSave());
  };
}

/**
 * Import queries from previous datasource if possible eg Loki and Prometheus have similar query language so the
 * labels part can be reused to get similar data.
* @param exploreId Explore pane receiving the queries
 * @param queries Queries written for the source datasource
 * @param sourceDataSource Previously selected datasource; when missing the queries are passed through unchanged
 * @param targetDataSource Newly selected datasource
 */
export const importQueries = (
  exploreId: ExploreId,
  queries: DataQuery[],
  sourceDataSource: DataSourceApi | undefined | null,
  targetDataSource: DataSourceApi
): ThunkResult<void> => {
  return async (dispatch) => {
    // Without a source datasource explore is not initialized yet: hand the queries over untouched.
    if (!sourceDataSource) {
      dispatch(queriesImportedAction({ exploreId, queries }));
      return;
    }

    const isSameDatasourceType = sourceDataSource.meta?.id === targetDataSource.meta?.id;

    let candidates: DataQuery[];
    if (isSameDatasourceType) {
      // Same plugin type: reuse the queries, minus their `datasource` property, so that they
      // do not keep pointing at the old datasource instance.
      candidates = queries.map((original) => {
        const { datasource, ...withoutDatasource } = original;
        return withoutDatasource;
      });
    } else if (targetDataSource.importQueries) {
      // The target datasource ships its own importer.
      candidates = await targetDataSource.importQueries(queries, sourceDataSource);
    } else {
      // Nothing can be carried over: start from blank queries.
      candidates = ensureQueries();
    }

    dispatch(queriesImportedAction({ exploreId, queries: ensureQueries(candidates) }));
  };
};

/**
 * Action to modify a query given a datasource-specific modifier action.
 * @param exploreId Explore area
 * @param modification Action object with a type, e.g., ADD_FILTER
 * @param index Optional query row index. If omitted, the modification is applied to all query rows.
 * @param modifier Function that executes the modification, typically `datasourceInstance.modifyQueries`.
*/
export function modifyQueries(
  exploreId: ExploreId,
  modification: QueryFixAction,
  modifier: any,
  index?: number
): ThunkResult<void> {
  return (dispatch) => {
    dispatch(modifyQueriesAction({ exploreId, modification, index, modifier }));
    // The modification can opt out of the automatic re-run via `preventSubmit`.
    if (!modification.preventSubmit) {
      dispatch(runQueries(exploreId));
    }
  };
}

/**
 * Main action to run queries and dispatches sub-actions based on which result viewers are active
 *
 * Flow, as implemented below:
 * 1. Updates the pane's time and, unless `options.preserveCache` is exactly `true`, clears the result cache.
 * 2. If the cache holds a value for the current absolute range, that value is decorated and
 *    dispatched instead of querying the datasource.
 * 3. Otherwise a request is built and run; each emission is decorated and dispatched, history and
 *    URL state are saved once on the first non-loading, error-free emission, and the logs volume
 *    provider is (re)configured.
 * 4. The resulting subscription is stored with queryStoreSubscriptionAction.
 *
 * The two early `return`s (no non-empty query / no datasource instance) leave before step 4.
 *
 * @param exploreId Explore pane to run the queries for
 * @param options `replaceUrl` is forwarded to stateSave() as `replace`; `preserveCache` keeps the result cache
 */
export const runQueries = (
  exploreId: ExploreId,
  options?: { replaceUrl?: boolean; preserveCache?: boolean }
): ThunkResult<void> => {
  return (dispatch, getState) => {
    dispatch(updateTime({ exploreId }));

    // We always want to clear cache unless we explicitly pass preserveCache parameter
    const preserveCache = options?.preserveCache === true;
    if (!preserveCache) {
      dispatch(clearCache(exploreId));
    }

    // Snapshot of the pane state, read after the time update and the optional cache clear above.
    const { richHistory } = getState().explore;
    const exploreItemState = getState().explore[exploreId]!;
    const {
      datasourceInstance,
      containerWidth,
      isLive: live,
      range,
      scanning,
      queryResponse,
      querySubscription,
      history,
      refreshInterval,
      absoluteRange,
      cache,
      logsVolumeDataProvider,
    } = exploreItemState;
    let newQuerySub;

    // Queries without an explicit datasource get a reference to the pane's datasource instance.
    const queries = exploreItemState.queries.map((query) => ({
      ...query,
      datasource: query.datasource || datasourceInstance?.getRef(),
    }));

    const cachedValue = getResultsFromCache(cache, absoluteRange);

    // If we have results saved in cache, we are going to use those results instead of running queries
    // NOTE(review): stopQueryState() is only called in the else branch, so a previous
    // querySubscription is not stopped on this path - confirm that this is intended.
    if (cachedValue) {
      newQuerySub = of(cachedValue)
        .pipe(
          mergeMap((data: PanelData) =>
            decorateData(data, queryResponse, absoluteRange, refreshInterval, queries, !!logsVolumeDataProvider)
          )
        )
        .subscribe((data) => {
          if (!data.error) {
            dispatch(stateSave());
          }

          dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
        });

      // If we don't have results saved in cache, run new queries
    } else {
      if (!hasNonEmptyQuery(queries)) {
        dispatch(stateSave({ replace: options?.replaceUrl })); // Remember to save to state and update location
        return;
      }

      if (!datasourceInstance) {
        return;
      }

      // Some datasource's query builders allow per-query interval limits,
      // but we're using the datasource interval limit for now
      const minInterval = datasourceInstance?.interval;

      // Stop the previous run before starting a new one.
      stopQueryState(querySubscription);

      const datasourceId = datasourceInstance?.meta.id;

      const queryOptions: QueryOptions = {
        minInterval,
        // maxDataPoints is used in:
        // Loki - used for logs streaming for buffer size, with undefined it falls back to datasource config if it supports that.
        // Elastic - limits the number of datapoints for the counts query and for logs it has hardcoded limit.
        // Influx - used to correctly display logs in graph
        // TODO:unification
        // maxDataPoints: mode === ExploreMode.Logs && datasourceId === 'loki' ? undefined : containerWidth,
        maxDataPoints: containerWidth,
        liveStreaming: live,
      };

      const datasourceName = datasourceInstance.name;
      const timeZone = getTimeZone(getState().user);
      const transaction = buildQueryTransaction(exploreId, queries, queryOptions, range, scanning, timeZone);

      // Guards the history / URL side effects so that they run once per runQueries() call.
      let querySaved = false;
      dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Loading }));

      newQuerySub = runRequest(datasourceInstance, transaction.request)
        .pipe(
          // Simple throttle for live tailing, in case of > 1000 rows per interval we spend about 200ms on processing and
          // rendering. In case this is optimized this can be tweaked, but also it should be only as fast as user
          // actually can see what is happening.
          live ? throttleTime(500) : identity,
          mergeMap((data: PanelData) =>
            decorateData(
              data,
              queryResponse,
              absoluteRange,
              refreshInterval,
              queries,
              // Read from the store on every emission: the provider is stored further below,
              // after this pipeline has been set up.
              !!getState().explore[exploreId]!.logsVolumeDataProvider
            )
          )
        )
        .subscribe(
          (data) => {
            if (data.state !== LoadingState.Loading && !data.error && !querySaved) {
              // Side-effect: Saving history in localstorage
              const nextHistory = updateHistory(history, datasourceId, queries);
              const { richHistory: nextRichHistory, localStorageFull, limitExceeded } = addToRichHistory(
                richHistory || [],
                datasourceId,
                datasourceName,
                queries,
                false,
                '',
                '',
                !getState().explore.localStorageFull,
                !getState().explore.richHistoryLimitExceededWarningShown
              );
              dispatch(historyUpdatedAction({ exploreId, history: nextHistory }));
              dispatch(richHistoryUpdatedAction({ richHistory: nextRichHistory }));
              if (localStorageFull) {
                dispatch(localStorageFullAction());
              }
              if (limitExceeded) {
                dispatch(richHistoryLimitExceededAction());
              }

              // We save queries to the URL here so that only successfully run queries change the URL.
              dispatch(stateSave({ replace: options?.replaceUrl }));
              querySaved = true;
            }

            dispatch(queryStreamUpdatedAction({ exploreId, response: data }));

            // Keep scanning for results if this was the last scanning transaction
            if (getState().explore[exploreId]!.scanning) {
              if (data.state === LoadingState.Done && data.series.length === 0) {
                // No series came back: shift the range by -1 and run again.
                // This `range` shadows the one destructured from the pane state above.
                const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
                dispatch(updateTime({ exploreId, absoluteRange: range }));
                dispatch(runQueries(exploreId));
              } else {
                // We can stop scanning if we have a result
                dispatch(scanStopAction({ exploreId }));
              }
            }
          },
          (error) => {
            dispatch(notifyApp(createErrorNotification('Query processing error', error)));
            dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Error }));
            console.error(error);
          }
        );

      if (live) {
        // Live runs get no logs volume: drop the provider and any data already loaded.
        dispatch(
          storeLogsVolumeDataProviderAction({
            exploreId,
            logsVolumeDataProvider: undefined,
          })
        );
        dispatch(cleanLogsVolumeAction({ exploreId }));
      } else if (hasLogsVolumeSupport(datasourceInstance)) {
        // Shadows the `logsVolumeDataProvider` destructured from the pane state above.
        const logsVolumeDataProvider = datasourceInstance.getLogsVolumeDataProvider(transaction.request);
        dispatch(
          storeLogsVolumeDataProviderAction({
            exploreId,
            logsVolumeDataProvider,
          })
        );
        // Re-read from the store; this `absoluteRange` shadows the snapshot taken above.
        const { logsVolumeData, absoluteRange } = getState().explore[exploreId]!;
        if (!canReuseLogsVolumeData(logsVolumeData, queries, absoluteRange)) {
          dispatch(cleanLogsVolumeAction({ exploreId }));
          dispatch(loadLogsVolumeData(exploreId));
        }
      } else {
        dispatch(
          storeLogsVolumeDataProviderAction({
            exploreId,
            logsVolumeDataProvider: undefined,
          })
        );
      }
    }

    dispatch(queryStoreSubscriptionAction({ exploreId, querySubscription: newQuerySub }));
  };
};

/**
 * Checks if after changing the time range the existing data can be used to show logs volume.
* It can happen if queries are the same and new time range is within existing data time range.
 *
 * @param logsVolumeData Logs volume response currently held for the pane, if any
 * @param queries Queries about to be run
 * @param selectedTimeRange Newly selected absolute time range
 * @returns true only when the first data entry was produced by deep-equal queries and its
 *          recorded absolute range fully contains `selectedTimeRange`
 */
function canReuseLogsVolumeData(
  logsVolumeData: DataQueryResponse | undefined,
  queries: DataQuery[],
  selectedTimeRange: AbsoluteTimeRange
): boolean {
  const firstEntry = logsVolumeData?.data[0];
  if (!firstEntry) {
    return false;
  }

  // The queries and the range the data was loaded for are read from the entry's custom meta.
  const customMeta = firstEntry.meta?.custom;
  if (!deepEqual(customMeta?.targets, queries)) {
    return false;
  }

  const loadedRange = customMeta?.absoluteRange;
  return !!loadedRange && loadedRange.from <= selectedTimeRange.from && selectedTimeRange.to <= loadedRange.to;
}

/**
 * Replaces the pane's queries with `rawQueries` and runs them.
 * Each raw query is passed through generateNewKeyAndAddRefIdIfMissing() first.
 */
export function setQueries(exploreId: ExploreId, rawQueries: DataQuery[]): ThunkResult<void> {
  return (dispatch, getState) => {
    const { queries: existingQueries } = getState().explore[exploreId]!;
    // Inject react keys (and missing refIds) into the query objects
    const keyedQueries = rawQueries.map((rawQuery, position) =>
      generateNewKeyAndAddRefIdIfMissing(rawQuery, existingQueries, position)
    );

    dispatch(setQueriesAction({ exploreId, queries: keyedQueries }));
    dispatch(runQueries(exploreId));
  };
}

/**
 * Start a scan for more results using the given scanner.
* Marks the pane as scanning, shifts its time range by -1 and runs the queries for the shifted range.
 * @param exploreId Explore area
 */
export function scanStart(exploreId: ExploreId): ThunkResult<void> {
  return (dispatch, getState) => {
    // Flag the pane as scanning first
    dispatch(scanStartAction({ exploreId }));

    // Shift the current range, display it and query it
    const currentRange = getState().explore[exploreId]!.range;
    const shiftedRange = getShiftedTimeRange(-1, currentRange);
    dispatch(updateTime({ exploreId, absoluteRange: shiftedRange }));
    dispatch(runQueries(exploreId));
  };
}

/**
 * Adds the pane's current query response to the result cache, keyed by its absolute time range.
 * Nothing is cached unless the response has reached the Done state.
 */
export function addResultsToCache(exploreId: ExploreId): ThunkResult<void> {
  return (dispatch, getState) => {
    const { queryResponse, absoluteRange } = getState().explore[exploreId]!;
    const cacheKey = createCacheKey(absoluteRange);

    // Only complete results are worth caching
    if (queryResponse.state !== LoadingState.Done) {
      return;
    }
    dispatch(addResultsToCacheAction({ exploreId, cacheKey, queryResponse }));
  };
}

/**
 * Empties the result cache of the pane.
 */
export function clearCache(exploreId: ExploreId): ThunkResult<void> {
  return (dispatch) => {
    dispatch(clearCacheAction({ exploreId }));
  };
}

/**
 * Initializes loading logs volume data and stores emitted value.
*/
export function loadLogsVolumeData(exploreId: ExploreId): ThunkResult<void> {
  return (dispatch, getState) => {
    const { logsVolumeDataProvider } = getState().explore[exploreId]!;
    if (logsVolumeDataProvider) {
      // Every value emitted by the provider replaces the stored logs volume data.
      const logsVolumeDataSubscription = logsVolumeDataProvider.subscribe({
        next: (logsVolumeData: DataQueryResponse) => {
          dispatch(updateLogsVolumeDataAction({ exploreId, logsVolumeData }));
        },
      });
      // Keep the subscription in state so the reducer can unsubscribe when the provider changes.
      dispatch(storeLogsVolumeDataSubscriptionAction({ exploreId, logsVolumeDataSubscription }));
    }
  };
}

//
// Reducer
//

// Redux Toolkit uses ImmerJs as part of their solution to ensure that state objects are not mutated.
// ImmerJs has an autoFreeze option that freezes objects from change which means this reducer can't be migrated to createSlice
// because the state would become frozen and during run time we would get errors because flot (Graph lib) would try to mutate
// the frozen state.
// https://github.com/reduxjs/redux-toolkit/issues/242
/**
 * Reducer for the query-related part of a single Explore pane.
 *
 * Each branch matches one action creator from this file and returns a new state object;
 * unmatched actions return `state` unchanged. Two branches also have side effects on objects
 * held in state: `cancelQueriesAction` stops the stored query subscription and
 * `storeLogsVolumeDataProviderAction` unsubscribes the stored logs volume subscription.
 *
 * @param state Current state of the pane
 * @param action Dispatched action
 * @returns The next state of the pane
 */
export const queryReducer = (state: ExploreItemState, action: AnyAction): ExploreItemState => {
  if (addQueryRowAction.match(action)) {
    const { queries } = state;
    const { index, query } = action.payload;

    // Add to queries, which will cause a new row to be rendered
    const nextQueries = [...queries.slice(0, index + 1), { ...query }, ...queries.slice(index + 1)];

    return {
      ...state,
      queries: nextQueries,
      queryKeys: getQueryKeys(nextQueries, state.datasourceInstance),
    };
  }

  if (changeQueriesAction.match(action)) {
    const { queries } = action.payload;

    return {
      ...state,
      queries,
    };
  }

  if (cancelQueriesAction.match(action)) {
    stopQueryState(state.querySubscription);

    return {
      ...state,
      loading: false,
    };
  }

  if (modifyQueriesAction.match(action)) {
    const { queries } = state;
    const { modification, index, modifier } = action.payload;
    let nextQueries: DataQuery[];
    if (index === undefined) {
      // Modify all queries
      nextQueries = queries.map((query, i) => {
        const nextQuery = modifier({ ...query }, modification);
        return generateNewKeyAndAddRefIdIfMissing(nextQuery, queries, i);
      });
    } else {
      // Modify query only at index
      nextQueries = queries.map((query, i) => {
        if (i === index) {
          const nextQuery = modifier({ ...query }, modification);
          return generateNewKeyAndAddRefIdIfMissing(nextQuery, queries, i);
        }

        return query;
      });
    }
    return {
      ...state,
      queries: nextQueries,
      queryKeys: getQueryKeys(nextQueries, state.datasourceInstance),
    };
  }

  if (setQueriesAction.match(action)) {
    const { queries } = action.payload;
    return {
      ...state,
      queries: queries.slice(),
      queryKeys: getQueryKeys(queries, state.datasourceInstance),
    };
  }

  // This is the only handler for queriesImportedAction. An identical second branch used to
  // follow the queryStreamUpdatedAction branch; it could never be reached and was removed.
  if (queriesImportedAction.match(action)) {
    const { queries } = action.payload;
    return {
      ...state,
      queries,
      queryKeys: getQueryKeys(queries, state.datasourceInstance),
    };
  }

  if (queryStoreSubscriptionAction.match(action)) {
    const { querySubscription } = action.payload;
    return {
      ...state,
      querySubscription,
    };
  }

  if (storeLogsVolumeDataProviderAction.match(action)) {
    const { logsVolumeDataProvider } = action.payload;
    // A new (or cleared) provider invalidates the subscription to the previous one.
    if (state.logsVolumeDataSubscription) {
      state.logsVolumeDataSubscription.unsubscribe();
    }
    return {
      ...state,
      logsVolumeDataProvider,
      logsVolumeDataSubscription: undefined,
    };
  }

  if (cleanLogsVolumeAction.match(action)) {
    return {
      ...state,
      logsVolumeData: undefined,
    };
  }

  if (storeLogsVolumeDataSubscriptionAction.match(action)) {
    const { logsVolumeDataSubscription } = action.payload;
    return {
      ...state,
      logsVolumeDataSubscription,
    };
  }

  if (updateLogsVolumeDataAction.match(action)) {
    const { logsVolumeData } = action.payload;

    return {
      ...state,
      logsVolumeData,
    };
  }

  if (queryStreamUpdatedAction.match(action)) {
    return processQueryResponse(state, action);
  }

  if (changeLoadingStateAction.match(action)) {
    const { loadingState } = action.payload;
    return {
      ...state,
      queryResponse: {
        ...state.queryResponse,
        state: loadingState,
      },
      loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
    };
  }

  if (setPausedStateAction.match(action)) {
    const { isPaused } = action.payload;
    return {
      ...state,
      isPaused: isPaused,
    };
  }

  if (scanStartAction.match(action)) {
    return { ...state, scanning: true };
  }

  if (scanStopAction.match(action)) {
    return {
      ...state,
      scanning: false,
      scanRange: undefined,
    };
  }

  if (addResultsToCacheAction.match(action)) {
    const CACHE_LIMIT = 5;
    const { cache } = state;
    const { queryResponse, cacheKey } = action.payload;

    let newCache = [...cache];
    const isDuplicateKey = newCache.some((c) => c.key === cacheKey);

    // New entries go to the front; the list is then cut to CACHE_LIMIT, dropping the oldest.
    if (!isDuplicateKey) {
      const newCacheItem = { key: cacheKey, value: queryResponse };
      newCache = [newCacheItem, ...newCache].slice(0, CACHE_LIMIT);
    }

    return {
      ...state,
      cache: newCache,
    };
  }

  if (clearCacheAction.match(action)) {
    return {
      ...state,
      cache: [],
    };
  }

  return state;
};

/**
 * Applies one emission of the query stream to the pane state.
 *
 * - Timeout errors store the response and the derived `loading` flag only.
 * - Cancelled errors leave the state untouched.
 * - Any other error is emitted on the event bridge when the datasource has an Angular
 *   `QueryCtrl`; processing then continues.
 * - A response without `request` returns a shallow copy of the state.
 * - Otherwise the response and its per-viewer results are stored and the `show*` flags are
 *   derived from which results are present.
 *
 * @param state Current state of the pane
 * @param action queryStreamUpdatedAction carrying the decorated response
 * @returns The next state of the pane
 */
export const processQueryResponse = (
  state: ExploreItemState,
  action: PayloadAction<QueryEndedPayload>
): ExploreItemState => {
  const { response } = action.payload;
  const {
    request,
    state: loadingState,
    series,
    error,
    graphResult,
    logsResult,
    tableResult,
    traceFrames,
    nodeGraphFrames,
  } = response;

  if (error) {
    if (error.type === DataQueryErrorType.Timeout) {
      return {
        ...state,
        queryResponse: response,
        loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
      };
    } else if (error.type === DataQueryErrorType.Cancelled) {
      return state;
    }

    // Send error to Angular editors
    if (state.datasourceInstance?.components?.QueryCtrl) {
      state.eventBridge.emit(PanelEvents.dataError, error);
    }
  }

  if (!request) {
    return { ...state };
  }

  // Send legacy data to Angular editors
  if (state.datasourceInstance?.components?.QueryCtrl) {
    const legacy = series.map((v) => toLegacyResponseData(v));
    state.eventBridge.emit(PanelEvents.dataReceived, legacy);
  }

  return {
    ...state,
    queryResponse: response,
    graphResult,
    tableResult,
    logsResult,
    loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
    showLogs: !!logsResult,
    showMetrics: !!graphResult,
    showTable: !!tableResult,
    showTrace: !!traceFrames.length,
    showNodeGraph: !!nodeGraphFrames.length,
  };
};