package querier

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/flagext"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
	"github.com/thanos-io/thanos/pkg/strutil"
	"golang.org/x/sync/errgroup"

	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/cortexproject/cortex/pkg/chunk/purger"
	"github.com/cortexproject/cortex/pkg/querier/batch"
	"github.com/cortexproject/cortex/pkg/querier/chunkstore"
	"github.com/cortexproject/cortex/pkg/querier/iterators"
	"github.com/cortexproject/cortex/pkg/querier/lazyquery"
	"github.com/cortexproject/cortex/pkg/querier/series"
	"github.com/cortexproject/cortex/pkg/tenant"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/limiter"
	"github.com/cortexproject/cortex/pkg/util/spanlogger"
	"github.com/cortexproject/cortex/pkg/util/validation"
)

// Config contains the configuration required to create a querier.
type Config struct {
	// MaxConcurrent is the maximum number of concurrent queries. It is only
	// enforced when ActiveQueryTrackerDir is non-empty (see below).
	MaxConcurrent int `yaml:"max_concurrent"`
	// Timeout is the timeout for a query.
	Timeout time.Duration `yaml:"timeout"`
	// Iterators selects iterator-based query execution, as opposed to fully
	// materialising the series in memory.
	Iterators bool `yaml:"iterators"`
	// BatchIterators selects batch-iterator-based query execution. When set,
	// it wins over Iterators.
	BatchIterators bool `yaml:"batch_iterators"`
	// IngesterStreaming makes the querier use streaming RPCs to query ingesters.
	IngesterStreaming bool `yaml:"ingester_streaming"`
	// MaxSamples is the maximum number of samples a single query can load into memory.
	MaxSamples int `yaml:"max_samples"`
	// QueryIngestersWithin is the maximum lookback beyond which queries are not
	// sent to ingesters. 0 means all queries are sent to ingesters.
	QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
	// QueryStoreForLabels makes the series, label values and label names APIs
	// query the long-term store as well.
	QueryStoreForLabels bool `yaml:"query_store_for_labels_enabled"`
	// AtModifierEnabled enables the @ modifier in PromQL.
	AtModifierEnabled bool `yaml:"at_modifier_enabled"`

	// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
	QueryStoreAfter time.Duration `yaml:"query_store_after"`
	// MaxQueryIntoFuture is the maximum duration into the future that can be
	// queried. 0 disables the limit.
	MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future"`

	// The default evaluation interval for the promql engine.
	// Needs to be configured for subqueries to work as it is the default
	// step if not specified.
	DefaultEvaluationInterval time.Duration `yaml:"default_evaluation_interval"`

	// Directory for ActiveQueryTracker. If empty, ActiveQueryTracker will be disabled and MaxConcurrent will not be applied (!).
	// ActiveQueryTracker logs queries that were active during the last crash, but logs them on the next startup.
	// However, we need to use active query tracker, otherwise we cannot limit Max Concurrent queries in the PromQL
	// engine.
	ActiveQueryTrackerDir string `yaml:"active_query_tracker_dir"`
	// LookbackDelta determines the time since the last sample after which a time
	// series is considered stale.
	LookbackDelta time.Duration `yaml:"lookback_delta"`

	// Blocks storage only.
	// StoreGatewayAddresses is a comma separated list of store-gateway addresses.
	StoreGatewayAddresses string `yaml:"store_gateway_addresses"`
	// StoreGatewayClient configures the client used to talk to store-gateways.
	StoreGatewayClient ClientConfig `yaml:"store_gateway_client"`

	// SecondStoreEngine is the second store engine to use for querying.
	// Empty means disabled.
	SecondStoreEngine string `yaml:"second_store_engine"`
	// UseSecondStoreBeforeTime, if specified, restricts the second store to
	// queries before this timestamp.
	UseSecondStoreBeforeTime flagext.Time `yaml:"use_second_store_before_time"`

	// ShuffleShardingIngestersLookbackPeriod is the lookback period used to select
	// ingesters on the read path when shuffle-sharding is in use. 0 disables
	// ingesters shuffle sharding on the read path.
	ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"`
}

var (
	// errBadLookbackConfigs is returned by Config.Validate when
	// query_store_after >= query_ingesters_within (both non-zero).
	errBadLookbackConfigs = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent")
	// errShuffleShardingLookbackLessThanQueryStoreAfter is returned by Config.Validate
	// when the shuffle-sharding lookback period is shorter than query_store_after.
	errShuffleShardingLookbackLessThanQueryStoreAfter = errors.New("the shuffle-sharding lookback period should be greater or equal than the configured 'query store after'")
	// errEmptyTimeRange signals that clamping a query time range left nothing to query.
	errEmptyTimeRange = errors.New("empty time range")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
84func (cfg *Config) RegisterFlags(f *flag.FlagSet) { 85 cfg.StoreGatewayClient.RegisterFlagsWithPrefix("querier.store-gateway-client", f) 86 f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.") 87 f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.") 88 f.BoolVar(&cfg.Iterators, "querier.iterators", false, "Use iterators to execute query, as opposed to fully materialising the series in memory.") 89 f.BoolVar(&cfg.BatchIterators, "querier.batch-iterators", true, "Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedent over the -querier.iterators flag.") 90 f.BoolVar(&cfg.IngesterStreaming, "querier.ingester-streaming", true, "Use streaming RPCs to query ingester.") 91 f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.") 92 f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") 93 f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Query long-term store for series, label values and label names APIs. Works only with blocks engine.") 94 f.BoolVar(&cfg.AtModifierEnabled, "querier.at-modifier-enabled", false, "Enable the @ modifier in PromQL.") 95 f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") 96 f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") 97 f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. 
When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") 98 f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.") 99 f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).") 100 f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") 101 f.StringVar(&cfg.SecondStoreEngine, "querier.second-store-engine", "", "Second store engine to use for querying. Empty = disabled.") 102 f.Var(&cfg.UseSecondStoreBeforeTime, "querier.use-second-store-before-time", "If specified, second store is only used for queries before this timestamp. Default value 0 means secondary store is always queried.") 103 f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. 
If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") 104} 105 106// Validate the config 107func (cfg *Config) Validate() error { 108 // Ensure the config wont create a situation where no queriers are returned. 109 if cfg.QueryIngestersWithin != 0 && cfg.QueryStoreAfter != 0 { 110 if cfg.QueryStoreAfter >= cfg.QueryIngestersWithin { 111 return errBadLookbackConfigs 112 } 113 } 114 115 if cfg.ShuffleShardingIngestersLookbackPeriod > 0 { 116 if cfg.ShuffleShardingIngestersLookbackPeriod < cfg.QueryStoreAfter { 117 return errShuffleShardingLookbackLessThanQueryStoreAfter 118 } 119 } 120 121 return nil 122} 123 124func (cfg *Config) GetStoreGatewayAddresses() []string { 125 if cfg.StoreGatewayAddresses == "" { 126 return nil 127 } 128 129 return strings.Split(cfg.StoreGatewayAddresses, ",") 130} 131 132func getChunksIteratorFunction(cfg Config) chunkIteratorFunc { 133 if cfg.BatchIterators { 134 return batch.NewChunkMergeIterator 135 } else if cfg.Iterators { 136 return iterators.NewChunkMergeIterator 137 } 138 return mergeChunks 139} 140 141// NewChunkStoreQueryable returns the storage.Queryable implementation against the chunks store. 142func NewChunkStoreQueryable(cfg Config, chunkStore chunkstore.ChunkStore) storage.Queryable { 143 return newChunkStoreQueryable(chunkStore, getChunksIteratorFunction(cfg)) 144} 145 146// New builds a queryable and promql engine. 
147func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader *purger.TombstonesLoader, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, *promql.Engine) { 148 iteratorFunc := getChunksIteratorFunction(cfg) 149 150 distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, iteratorFunc, cfg.QueryIngestersWithin) 151 152 ns := make([]QueryableWithFilter, len(stores)) 153 for ix, s := range stores { 154 ns[ix] = storeQueryable{ 155 QueryableWithFilter: s, 156 QueryStoreAfter: cfg.QueryStoreAfter, 157 } 158 } 159 queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits, tombstonesLoader) 160 exemplarQueryable := newDistributorExemplarQueryable(distributor) 161 162 lazyQueryable := storage.QueryableFunc(func(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) { 163 querier, err := queryable.Querier(ctx, mint, maxt) 164 if err != nil { 165 return nil, err 166 } 167 return lazyquery.NewLazyQuerier(querier), nil 168 }) 169 170 engine := promql.NewEngine(promql.EngineOpts{ 171 Logger: logger, 172 Reg: reg, 173 ActiveQueryTracker: createActiveQueryTracker(cfg, logger), 174 MaxSamples: cfg.MaxSamples, 175 Timeout: cfg.Timeout, 176 LookbackDelta: cfg.LookbackDelta, 177 EnableAtModifier: cfg.AtModifierEnabled, 178 NoStepSubqueryIntervalFn: func(int64) int64 { 179 return cfg.DefaultEvaluationInterval.Milliseconds() 180 }, 181 }) 182 return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, engine 183} 184 185// NewSampleAndChunkQueryable creates a SampleAndChunkQueryable from a 186// Queryable with a ChunkQueryable stub, that errors once it get's called. 
187func NewSampleAndChunkQueryable(q storage.Queryable) storage.SampleAndChunkQueryable { 188 return &sampleAndChunkQueryable{q} 189} 190 191type sampleAndChunkQueryable struct { 192 storage.Queryable 193} 194 195func (q *sampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { 196 return nil, errors.New("ChunkQuerier not implemented") 197} 198 199func createActiveQueryTracker(cfg Config, logger log.Logger) *promql.ActiveQueryTracker { 200 dir := cfg.ActiveQueryTrackerDir 201 202 if dir != "" { 203 return promql.NewActiveQueryTracker(dir, cfg.MaxConcurrent, logger) 204 } 205 206 return nil 207} 208 209// QueryableWithFilter extends Queryable interface with `UseQueryable` filtering function. 210type QueryableWithFilter interface { 211 storage.Queryable 212 213 // UseQueryable returns true if this queryable should be used to satisfy the query for given time range. 214 // Query min and max time are in milliseconds since epoch. 215 UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool 216} 217 218// NewQueryable creates a new Queryable for cortex. 
219func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides, tombstonesLoader *purger.TombstonesLoader) storage.Queryable { 220 return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { 221 now := time.Now() 222 223 userID, err := tenant.TenantID(ctx) 224 if err != nil { 225 return nil, err 226 } 227 228 ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID))) 229 230 mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) 231 if err == errEmptyTimeRange { 232 return storage.NoopQuerier(), nil 233 } else if err != nil { 234 return nil, err 235 } 236 237 q := querier{ 238 ctx: ctx, 239 mint: mint, 240 maxt: maxt, 241 chunkIterFn: chunkIterFn, 242 tombstonesLoader: tombstonesLoader, 243 limits: limits, 244 maxQueryIntoFuture: cfg.MaxQueryIntoFuture, 245 queryStoreForLabels: cfg.QueryStoreForLabels, 246 } 247 248 dqr, err := distributor.Querier(ctx, mint, maxt) 249 if err != nil { 250 return nil, err 251 } 252 253 q.metadataQuerier = dqr 254 255 if distributor.UseQueryable(now, mint, maxt) { 256 q.queriers = append(q.queriers, dqr) 257 } 258 259 for _, s := range stores { 260 if !s.UseQueryable(now, mint, maxt) { 261 continue 262 } 263 264 cqr, err := s.Querier(ctx, mint, maxt) 265 if err != nil { 266 return nil, err 267 } 268 269 q.queriers = append(q.queriers, cqr) 270 } 271 272 return q, nil 273 }) 274} 275 276type querier struct { 277 // used for labels and metadata queries 278 metadataQuerier storage.Querier 279 280 // used for selecting series 281 queriers []storage.Querier 282 283 chunkIterFn chunkIteratorFunc 284 ctx context.Context 285 mint, maxt int64 286 287 tombstonesLoader *purger.TombstonesLoader 288 limits *validation.Overrides 289 
maxQueryIntoFuture time.Duration 290 queryStoreForLabels bool 291} 292 293// Select implements storage.Querier interface. 294// The bool passed is ignored because the series is always sorted. 295func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { 296 log, ctx := spanlogger.New(q.ctx, "querier.Select") 297 defer log.Span.Finish() 298 299 if sp != nil { 300 level.Debug(log).Log("start", util.TimeFromMillis(sp.Start).UTC().String(), "end", util.TimeFromMillis(sp.End).UTC().String(), "step", sp.Step, "matchers", matchers) 301 } 302 303 if sp == nil { 304 // if SelectHints is null, rely on minT, maxT of querier to scope in range for Select stmt 305 sp = &storage.SelectHints{Start: q.mint, End: q.maxt} 306 } else if sp.Func == "series" && !q.queryStoreForLabels { 307 // Else if the querier receives a 'series' query, it means only metadata is needed. 308 // Here we expect that metadataQuerier querier will handle that. 309 // Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series". 310 // See: https://github.com/prometheus/prometheus/pull/8050 311 312 // In this case, the query time range has already been validated when the querier has been 313 // created. 314 return q.metadataQuerier.Select(true, sp, matchers...) 315 } 316 317 userID, err := tenant.TenantID(ctx) 318 if err != nil { 319 return storage.ErrSeriesSet(err) 320 } 321 322 // Validate query time range. Even if the time range has already been validated when we created 323 // the querier, we need to check it again here because the time range specified in hints may be 324 // different. 
325 startMs, endMs, err := validateQueryTimeRange(ctx, userID, sp.Start, sp.End, q.limits, q.maxQueryIntoFuture) 326 if err == errEmptyTimeRange { 327 return storage.NoopSeriesSet() 328 } else if err != nil { 329 return storage.ErrSeriesSet(err) 330 } 331 332 // The time range may have been manipulated during the validation, 333 // so we make sure changes are reflected back to hints. 334 sp.Start = startMs 335 sp.End = endMs 336 337 startTime := model.Time(startMs) 338 endTime := model.Time(endMs) 339 340 // Validate query time range. This validation should be done only for instant / range queries and 341 // NOT for metadata queries (series, labels) because the query-frontend doesn't support splitting 342 // of such queries. 343 if maxQueryLength := q.limits.MaxQueryLength(userID); maxQueryLength > 0 && endTime.Sub(startTime) > maxQueryLength { 344 limitErr := validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, endTime.Sub(startTime), maxQueryLength)) 345 return storage.ErrSeriesSet(limitErr) 346 } 347 348 tombstones, err := q.tombstonesLoader.GetPendingTombstonesForInterval(userID, startTime, endTime) 349 if err != nil { 350 return storage.ErrSeriesSet(err) 351 } 352 353 if len(q.queriers) == 1 { 354 seriesSet := q.queriers[0].Select(true, sp, matchers...) 355 356 if tombstones.Len() != 0 { 357 seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime}) 358 } 359 360 return seriesSet 361 } 362 363 sets := make(chan storage.SeriesSet, len(q.queriers)) 364 for _, querier := range q.queriers { 365 go func(querier storage.Querier) { 366 sets <- querier.Select(true, sp, matchers...) 
367 }(querier) 368 } 369 370 var result []storage.SeriesSet 371 for range q.queriers { 372 select { 373 case set := <-sets: 374 result = append(result, set) 375 case <-ctx.Done(): 376 return storage.ErrSeriesSet(ctx.Err()) 377 } 378 } 379 380 // we have all the sets from different sources (chunk from store, chunks from ingesters, 381 // time series from store and time series from ingesters). 382 // mergeSeriesSets will return sorted set. 383 seriesSet := q.mergeSeriesSets(result) 384 385 if tombstones.Len() != 0 { 386 seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime}) 387 } 388 return seriesSet 389} 390 391// LabelsValue implements storage.Querier. 392func (q querier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { 393 if !q.queryStoreForLabels { 394 return q.metadataQuerier.LabelValues(name, matchers...) 395 } 396 397 if len(q.queriers) == 1 { 398 return q.queriers[0].LabelValues(name, matchers...) 399 } 400 401 var ( 402 g, _ = errgroup.WithContext(q.ctx) 403 sets = [][]string{} 404 warnings = storage.Warnings(nil) 405 406 resMtx sync.Mutex 407 ) 408 409 for _, querier := range q.queriers { 410 // Need to reassign as the original variable will change and can't be relied on in a goroutine. 411 querier := querier 412 g.Go(func() error { 413 // NB: Values are sorted in Cortex already. 414 myValues, myWarnings, err := querier.LabelValues(name, matchers...) 415 if err != nil { 416 return err 417 } 418 419 resMtx.Lock() 420 sets = append(sets, myValues) 421 warnings = append(warnings, myWarnings...) 
422 resMtx.Unlock() 423 424 return nil 425 }) 426 } 427 428 err := g.Wait() 429 if err != nil { 430 return nil, nil, err 431 } 432 433 return strutil.MergeSlices(sets...), warnings, nil 434} 435 436func (q querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { 437 if !q.queryStoreForLabels { 438 return q.metadataQuerier.LabelNames(matchers...) 439 } 440 441 if len(q.queriers) == 1 { 442 return q.queriers[0].LabelNames(matchers...) 443 } 444 445 var ( 446 g, _ = errgroup.WithContext(q.ctx) 447 sets = [][]string{} 448 warnings = storage.Warnings(nil) 449 450 resMtx sync.Mutex 451 ) 452 453 for _, querier := range q.queriers { 454 // Need to reassign as the original variable will change and can't be relied on in a goroutine. 455 querier := querier 456 g.Go(func() error { 457 // NB: Names are sorted in Cortex already. 458 myNames, myWarnings, err := querier.LabelNames(matchers...) 459 if err != nil { 460 return err 461 } 462 463 resMtx.Lock() 464 sets = append(sets, myNames) 465 warnings = append(warnings, myWarnings...) 466 resMtx.Unlock() 467 468 return nil 469 }) 470 } 471 472 err := g.Wait() 473 if err != nil { 474 return nil, nil, err 475 } 476 477 return strutil.MergeSlices(sets...), warnings, nil 478} 479 480func (querier) Close() error { 481 return nil 482} 483 484func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet { 485 // Here we deal with sets that are based on chunks and build single set from them. 486 // Remaining sets are merged with chunks-based one using storage.NewMergeSeriesSet 487 488 otherSets := []storage.SeriesSet(nil) 489 chunks := []chunk.Chunk(nil) 490 491 for _, set := range sets { 492 nonChunkSeries := []storage.Series(nil) 493 494 // SeriesSet may have some series backed up by chunks, and some not. 495 for set.Next() { 496 s := set.At() 497 498 if sc, ok := s.(SeriesWithChunks); ok { 499 chunks = append(chunks, sc.Chunks()...) 
500 } else { 501 nonChunkSeries = append(nonChunkSeries, s) 502 } 503 } 504 505 if err := set.Err(); err != nil { 506 otherSets = append(otherSets, storage.ErrSeriesSet(err)) 507 } else if len(nonChunkSeries) > 0 { 508 otherSets = append(otherSets, &sliceSeriesSet{series: nonChunkSeries, ix: -1}) 509 } 510 } 511 512 if len(chunks) == 0 { 513 return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge) 514 } 515 516 // partitionChunks returns set with sorted series, so it can be used by NewMergeSeriesSet 517 chunksSet := partitionChunks(chunks, q.mint, q.maxt, q.chunkIterFn) 518 519 if len(otherSets) == 0 { 520 return chunksSet 521 } 522 523 otherSets = append(otherSets, chunksSet) 524 return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge) 525} 526 527type sliceSeriesSet struct { 528 series []storage.Series 529 ix int 530} 531 532func (s *sliceSeriesSet) Next() bool { 533 s.ix++ 534 return s.ix < len(s.series) 535} 536 537func (s *sliceSeriesSet) At() storage.Series { 538 if s.ix < 0 || s.ix >= len(s.series) { 539 return nil 540 } 541 return s.series[s.ix] 542} 543 544func (s *sliceSeriesSet) Err() error { 545 return nil 546} 547 548func (s *sliceSeriesSet) Warnings() storage.Warnings { 549 return nil 550} 551 552type storeQueryable struct { 553 QueryableWithFilter 554 QueryStoreAfter time.Duration 555} 556 557func (s storeQueryable) UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool { 558 // Include this store only if mint is within QueryStoreAfter w.r.t current time. 559 if s.QueryStoreAfter != 0 && queryMinT > util.TimeToMillis(now.Add(-s.QueryStoreAfter)) { 560 return false 561 } 562 return s.QueryableWithFilter.UseQueryable(now, queryMinT, queryMaxT) 563} 564 565type alwaysTrueFilterQueryable struct { 566 storage.Queryable 567} 568 569func (alwaysTrueFilterQueryable) UseQueryable(_ time.Time, _, _ int64) bool { 570 return true 571} 572 573// Wraps storage.Queryable into QueryableWithFilter, with no query filtering. 
574func UseAlwaysQueryable(q storage.Queryable) QueryableWithFilter { 575 return alwaysTrueFilterQueryable{Queryable: q} 576} 577 578type useBeforeTimestampQueryable struct { 579 storage.Queryable 580 ts int64 // Timestamp in milliseconds 581} 582 583func (u useBeforeTimestampQueryable) UseQueryable(_ time.Time, queryMinT, _ int64) bool { 584 if u.ts == 0 { 585 return true 586 } 587 return queryMinT < u.ts 588} 589 590// Returns QueryableWithFilter, that is used only if query starts before given timestamp. 591// If timestamp is zero (time.IsZero), queryable is always used. 592func UseBeforeTimestampQueryable(queryable storage.Queryable, ts time.Time) QueryableWithFilter { 593 t := int64(0) 594 if !ts.IsZero() { 595 t = util.TimeToMillis(ts) 596 } 597 return useBeforeTimestampQueryable{ 598 Queryable: queryable, 599 ts: t, 600 } 601} 602 603func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs int64, limits *validation.Overrides, maxQueryIntoFuture time.Duration) (int64, int64, error) { 604 now := model.Now() 605 startTime := model.Time(startMs) 606 endTime := model.Time(endMs) 607 608 // Clamp time range based on max query into future. 609 if maxQueryIntoFuture > 0 && endTime.After(now.Add(maxQueryIntoFuture)) { 610 origEndTime := endTime 611 endTime = now.Add(maxQueryIntoFuture) 612 613 // Make sure to log it in traces to ease debugging. 614 level.Debug(spanlogger.FromContext(ctx)).Log( 615 "msg", "the end time of the query has been manipulated because of the 'max query into future' setting", 616 "original", util.FormatTimeModel(origEndTime), 617 "updated", util.FormatTimeModel(endTime)) 618 619 if endTime.Before(startTime) { 620 return 0, 0, errEmptyTimeRange 621 } 622 } 623 624 // Clamp the time range based on the max query lookback. 
625 if maxQueryLookback := limits.MaxQueryLookback(userID); maxQueryLookback > 0 && startTime.Before(now.Add(-maxQueryLookback)) { 626 origStartTime := startTime 627 startTime = now.Add(-maxQueryLookback) 628 629 // Make sure to log it in traces to ease debugging. 630 level.Debug(spanlogger.FromContext(ctx)).Log( 631 "msg", "the start time of the query has been manipulated because of the 'max query lookback' setting", 632 "original", util.FormatTimeModel(origStartTime), 633 "updated", util.FormatTimeModel(startTime)) 634 635 if endTime.Before(startTime) { 636 return 0, 0, errEmptyTimeRange 637 } 638 } 639 640 return int64(startTime), int64(endTime), nil 641} 642