package querier

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/flagext"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
	"github.com/thanos-io/thanos/pkg/strutil"
	"golang.org/x/sync/errgroup"

	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/cortexproject/cortex/pkg/chunk/purger"
	"github.com/cortexproject/cortex/pkg/querier/batch"
	"github.com/cortexproject/cortex/pkg/querier/chunkstore"
	"github.com/cortexproject/cortex/pkg/querier/iterators"
	"github.com/cortexproject/cortex/pkg/querier/lazyquery"
	"github.com/cortexproject/cortex/pkg/querier/series"
	"github.com/cortexproject/cortex/pkg/tenant"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/limiter"
	"github.com/cortexproject/cortex/pkg/util/spanlogger"
	"github.com/cortexproject/cortex/pkg/util/validation"
)

// Config contains the configuration required to create a querier.
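//
// A minimal YAML sketch (hypothetical values; field names come from the struct
// tags below, and Validate requires query_store_after to be smaller than
// query_ingesters_within when both are set):
//
//	max_concurrent: 20
//	timeout: 2m
//	query_ingesters_within: 12h
//	query_store_after: 10h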
type Config struct {
	MaxConcurrent        int           `yaml:"max_concurrent"`
	Timeout              time.Duration `yaml:"timeout"`
	Iterators            bool          `yaml:"iterators"`
	BatchIterators       bool          `yaml:"batch_iterators"`
	IngesterStreaming    bool          `yaml:"ingester_streaming"`
	MaxSamples           int           `yaml:"max_samples"`
	QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
	QueryStoreForLabels  bool          `yaml:"query_store_for_labels_enabled"`
	AtModifierEnabled    bool          `yaml:"at_modifier_enabled"`

	// QueryStoreAfter is the time after which queries should also be sent to the store, not just the ingesters.
	QueryStoreAfter    time.Duration `yaml:"query_store_after"`
	MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future"`

	// The default evaluation interval for the promql engine.
	// Needs to be configured for subqueries to work as it is the default
	// step if not specified.
	DefaultEvaluationInterval time.Duration `yaml:"default_evaluation_interval"`

	// Directory for ActiveQueryTracker. If empty, ActiveQueryTracker will be disabled and MaxConcurrent will not be applied (!).
	// ActiveQueryTracker records active queries so that, after a crash, they can be logged on the next startup.
	// The tracker is also required to limit the maximum number of concurrent queries in the PromQL engine.
	ActiveQueryTrackerDir string `yaml:"active_query_tracker_dir"`
	// LookbackDelta determines the time since the last sample after which a time
	// series is considered stale.
	LookbackDelta time.Duration `yaml:"lookback_delta"`

	// Blocks storage only.
	StoreGatewayAddresses string       `yaml:"store_gateway_addresses"`
	StoreGatewayClient    ClientConfig `yaml:"store_gateway_client"`

	SecondStoreEngine        string       `yaml:"second_store_engine"`
	UseSecondStoreBeforeTime flagext.Time `yaml:"use_second_store_before_time"`

	ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"`
}

var (
	errBadLookbackConfigs                             = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent")
	errShuffleShardingLookbackLessThanQueryStoreAfter = errors.New("the shuffle-sharding lookback period should be greater than or equal to the configured 'query store after'")
	errEmptyTimeRange                                 = errors.New("empty time range")
)

// RegisterFlags adds the flags required to configure this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.StoreGatewayClient.RegisterFlagsWithPrefix("querier.store-gateway-client", f)
	f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
	f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
	f.BoolVar(&cfg.Iterators, "querier.iterators", false, "Use iterators to execute query, as opposed to fully materialising the series in memory.")
	f.BoolVar(&cfg.BatchIterators, "querier.batch-iterators", true, "Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedence over the -querier.iterators flag.")
	f.BoolVar(&cfg.IngesterStreaming, "querier.ingester-streaming", true, "Use streaming RPCs to query ingester.")
	f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
	f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingesters. 0 means all queries are sent to ingesters.")
	f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Query long-term store for series, label values and label names APIs. Works only with blocks engine.")
	f.BoolVar(&cfg.AtModifierEnabled, "querier.at-modifier-enabled", false, "Enable the @ modifier in PromQL.")
	f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
	f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
	f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
	f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.")
	f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).")
	f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
	f.StringVar(&cfg.SecondStoreEngine, "querier.second-store-engine", "", "Second store engine to use for querying. Empty = disabled.")
	f.Var(&cfg.UseSecondStoreBeforeTime, "querier.use-second-store-before-time", "If specified, second store is only used for queries before this timestamp. Default value 0 means secondary store is always queried.")
	f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater than or equal to the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
}

// Validate validates the config.
func (cfg *Config) Validate() error {
	// Ensure the config won't create a situation where no queriers are returned.
	if cfg.QueryIngestersWithin != 0 && cfg.QueryStoreAfter != 0 {
		if cfg.QueryStoreAfter >= cfg.QueryIngestersWithin {
			return errBadLookbackConfigs
		}
	}

	if cfg.ShuffleShardingIngestersLookbackPeriod > 0 {
		if cfg.ShuffleShardingIngestersLookbackPeriod < cfg.QueryStoreAfter {
			return errShuffleShardingLookbackLessThanQueryStoreAfter
		}
	}

	return nil
}

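// GetStoreGatewayAddresses returns the comma-separated -querier.store-gateway-addresses
// flag value split into a list, or nil if the flag is unset.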
func (cfg *Config) GetStoreGatewayAddresses() []string {
	if cfg.StoreGatewayAddresses == "" {
		return nil
	}

	return strings.Split(cfg.StoreGatewayAddresses, ",")
}

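// getChunksIteratorFunction picks the chunk iterator implementation based on
// the config: batch iterators take precedence over plain iterators, and
// mergeChunks is the fallback.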
func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
	if cfg.BatchIterators {
		return batch.NewChunkMergeIterator
	} else if cfg.Iterators {
		return iterators.NewChunkMergeIterator
	}
	return mergeChunks
}

// NewChunkStoreQueryable returns the storage.Queryable implementation against the chunks store.
func NewChunkStoreQueryable(cfg Config, chunkStore chunkstore.ChunkStore) storage.Queryable {
	return newChunkStoreQueryable(chunkStore, getChunksIteratorFunction(cfg))
}

// New builds a queryable and promql engine.
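// The returned SampleAndChunkQueryable merges the distributor queryable and the
// given stores behind a lazy querier; its ChunkQuerier side is a stub (see
// NewSampleAndChunkQueryable below).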
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader *purger.TombstonesLoader, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, *promql.Engine) {
	iteratorFunc := getChunksIteratorFunction(cfg)

	distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, iteratorFunc, cfg.QueryIngestersWithin)

	ns := make([]QueryableWithFilter, len(stores))
	for ix, s := range stores {
		ns[ix] = storeQueryable{
			QueryableWithFilter: s,
			QueryStoreAfter:     cfg.QueryStoreAfter,
		}
	}
	queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits, tombstonesLoader)
	exemplarQueryable := newDistributorExemplarQueryable(distributor)

	lazyQueryable := storage.QueryableFunc(func(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) {
		querier, err := queryable.Querier(ctx, mint, maxt)
		if err != nil {
			return nil, err
		}
		return lazyquery.NewLazyQuerier(querier), nil
	})

	engine := promql.NewEngine(promql.EngineOpts{
		Logger:             logger,
		Reg:                reg,
		ActiveQueryTracker: createActiveQueryTracker(cfg, logger),
		MaxSamples:         cfg.MaxSamples,
		Timeout:            cfg.Timeout,
		LookbackDelta:      cfg.LookbackDelta,
		EnableAtModifier:   cfg.AtModifierEnabled,
		NoStepSubqueryIntervalFn: func(int64) int64 {
			return cfg.DefaultEvaluationInterval.Milliseconds()
		},
	})
	return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, engine
}

// NewSampleAndChunkQueryable creates a SampleAndChunkQueryable from a
// Queryable, with a ChunkQueryable stub that errors once it gets called.
func NewSampleAndChunkQueryable(q storage.Queryable) storage.SampleAndChunkQueryable {
	return &sampleAndChunkQueryable{q}
}

type sampleAndChunkQueryable struct {
	storage.Queryable
}

func (q *sampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
	return nil, errors.New("ChunkQuerier not implemented")
}

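// createActiveQueryTracker returns a tracker writing to the configured
// directory, or nil (disabling both the tracker and the PromQL concurrency
// limit) when no directory is set.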
func createActiveQueryTracker(cfg Config, logger log.Logger) *promql.ActiveQueryTracker {
	dir := cfg.ActiveQueryTrackerDir

	if dir != "" {
		return promql.NewActiveQueryTracker(dir, cfg.MaxConcurrent, logger)
	}

	return nil
}

// QueryableWithFilter extends Queryable interface with `UseQueryable` filtering function.
type QueryableWithFilter interface {
	storage.Queryable

	// UseQueryable returns true if this queryable should be used to satisfy the query for given time range.
	// Query min and max time are in milliseconds since epoch.
	UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool
}
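
// A minimal sketch of a custom filter (hypothetical; not used by Cortex):
// a queryable that only serves queries ending within the last hour.
//
//	type recentOnly struct{ storage.Queryable }
//
//	func (recentOnly) UseQueryable(now time.Time, _, queryMaxT int64) bool {
//		return queryMaxT > util.TimeToMillis(now.Add(-time.Hour))
//	}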

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides, tombstonesLoader *purger.TombstonesLoader) storage.Queryable {
	return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
		now := time.Now()

		userID, err := tenant.TenantID(ctx)
		if err != nil {
			return nil, err
		}

		ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID)))

		mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture)
		if err == errEmptyTimeRange {
			return storage.NoopQuerier(), nil
		} else if err != nil {
			return nil, err
		}

		q := querier{
			ctx:                 ctx,
			mint:                mint,
			maxt:                maxt,
			chunkIterFn:         chunkIterFn,
			tombstonesLoader:    tombstonesLoader,
			limits:              limits,
			maxQueryIntoFuture:  cfg.MaxQueryIntoFuture,
			queryStoreForLabels: cfg.QueryStoreForLabels,
		}

		dqr, err := distributor.Querier(ctx, mint, maxt)
		if err != nil {
			return nil, err
		}

		q.metadataQuerier = dqr

		if distributor.UseQueryable(now, mint, maxt) {
			q.queriers = append(q.queriers, dqr)
		}

		for _, s := range stores {
			if !s.UseQueryable(now, mint, maxt) {
				continue
			}

			cqr, err := s.Querier(ctx, mint, maxt)
			if err != nil {
				return nil, err
			}

			q.queriers = append(q.queriers, cqr)
		}

		return q, nil
	})
}

type querier struct {
	// used for labels and metadata queries
	metadataQuerier storage.Querier

	// used for selecting series
	queriers []storage.Querier

	chunkIterFn chunkIteratorFunc
	ctx         context.Context
	mint, maxt  int64

	tombstonesLoader    *purger.TombstonesLoader
	limits              *validation.Overrides
	maxQueryIntoFuture  time.Duration
	queryStoreForLabels bool
}

// Select implements the storage.Querier interface.
// The bool passed is ignored because the returned series are always sorted.
func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	log, ctx := spanlogger.New(q.ctx, "querier.Select")
	defer log.Span.Finish()

	if sp != nil {
		level.Debug(log).Log("start", util.TimeFromMillis(sp.Start).UTC().String(), "end", util.TimeFromMillis(sp.End).UTC().String(), "step", sp.Step, "matchers", matchers)
	}

	if sp == nil {
		// If SelectHints is nil, rely on the querier's mint/maxt to scope the time range of the Select.
		sp = &storage.SelectHints{Start: q.mint, End: q.maxt}
	} else if sp.Func == "series" && !q.queryStoreForLabels {
		// Else, if the querier receives a 'series' query, it means only metadata is needed.
		// We expect the metadataQuerier to handle it.
		// Also, in recent versions of Prometheus, the hint is passed in but with Func set to "series".
		// See: https://github.com/prometheus/prometheus/pull/8050

		// In this case, the query time range has already been validated when the querier was
		// created.
		return q.metadataQuerier.Select(true, sp, matchers...)
	}

	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return storage.ErrSeriesSet(err)
	}

	// Validate query time range. Even if the time range has already been validated when we created
	// the querier, we need to check it again here because the time range specified in hints may be
	// different.
	startMs, endMs, err := validateQueryTimeRange(ctx, userID, sp.Start, sp.End, q.limits, q.maxQueryIntoFuture)
	if err == errEmptyTimeRange {
		return storage.NoopSeriesSet()
	} else if err != nil {
		return storage.ErrSeriesSet(err)
	}

	// The time range may have been manipulated during the validation,
	// so we make sure changes are reflected back to hints.
	sp.Start = startMs
	sp.End = endMs

	startTime := model.Time(startMs)
	endTime := model.Time(endMs)

	// Validate query time range. This validation should be done only for instant / range queries and
	// NOT for metadata queries (series, labels) because the query-frontend doesn't support splitting
	// of such queries.
	if maxQueryLength := q.limits.MaxQueryLength(userID); maxQueryLength > 0 && endTime.Sub(startTime) > maxQueryLength {
		limitErr := validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, endTime.Sub(startTime), maxQueryLength))
		return storage.ErrSeriesSet(limitErr)
	}

	tombstones, err := q.tombstonesLoader.GetPendingTombstonesForInterval(userID, startTime, endTime)
	if err != nil {
		return storage.ErrSeriesSet(err)
	}

	if len(q.queriers) == 1 {
		seriesSet := q.queriers[0].Select(true, sp, matchers...)

		if tombstones.Len() != 0 {
			seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime})
		}

		return seriesSet
	}

	sets := make(chan storage.SeriesSet, len(q.queriers))
	for _, querier := range q.queriers {
		go func(querier storage.Querier) {
			sets <- querier.Select(true, sp, matchers...)
		}(querier)
	}

	var result []storage.SeriesSet
	for range q.queriers {
		select {
		case set := <-sets:
			result = append(result, set)
		case <-ctx.Done():
			return storage.ErrSeriesSet(ctx.Err())
		}
	}

	// We now have all the sets from the different sources (chunks from the store, chunks from the
	// ingesters, time series from the store and time series from the ingesters).
	// mergeSeriesSets will return a sorted set.
	seriesSet := q.mergeSeriesSets(result)

	if tombstones.Len() != 0 {
		seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime})
	}
	return seriesSet
}

// LabelValues implements storage.Querier.
func (q querier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	if !q.queryStoreForLabels {
		return q.metadataQuerier.LabelValues(name, matchers...)
	}

	if len(q.queriers) == 1 {
		return q.queriers[0].LabelValues(name, matchers...)
	}

	var (
		g, _     = errgroup.WithContext(q.ctx)
		sets     = [][]string{}
		warnings = storage.Warnings(nil)

		resMtx sync.Mutex
	)

	for _, querier := range q.queriers {
		// Need to reassign as the original variable will change and can't be relied on in a goroutine.
		querier := querier
		g.Go(func() error {
			// NB: Values are sorted in Cortex already.
			myValues, myWarnings, err := querier.LabelValues(name, matchers...)
			if err != nil {
				return err
			}

			resMtx.Lock()
			sets = append(sets, myValues)
			warnings = append(warnings, myWarnings...)
			resMtx.Unlock()

			return nil
		})
	}

	err := g.Wait()
	if err != nil {
		return nil, nil, err
	}

	return strutil.MergeSlices(sets...), warnings, nil
}

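// LabelNames implements storage.Querier. Like LabelValues, it fans out to all
// queriers when store-querying for labels is enabled, and merges the sorted
// per-querier results.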
func (q querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	if !q.queryStoreForLabels {
		return q.metadataQuerier.LabelNames(matchers...)
	}

	if len(q.queriers) == 1 {
		return q.queriers[0].LabelNames(matchers...)
	}

	var (
		g, _     = errgroup.WithContext(q.ctx)
		sets     = [][]string{}
		warnings = storage.Warnings(nil)

		resMtx sync.Mutex
	)

	for _, querier := range q.queriers {
		// Need to reassign as the original variable will change and can't be relied on in a goroutine.
		querier := querier
		g.Go(func() error {
			// NB: Names are sorted in Cortex already.
			myNames, myWarnings, err := querier.LabelNames(matchers...)
			if err != nil {
				return err
			}

			resMtx.Lock()
			sets = append(sets, myNames)
			warnings = append(warnings, myWarnings...)
			resMtx.Unlock()

			return nil
		})
	}

	err := g.Wait()
	if err != nil {
		return nil, nil, err
	}

	return strutil.MergeSlices(sets...), warnings, nil
}

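// Close implements storage.Querier. It is a no-op.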
func (querier) Close() error {
	return nil
}

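// mergeSeriesSets merges the given sets into a single sorted storage.SeriesSet,
// combining all chunk-backed series into one partitioned set first.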
func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
	// Here we deal with the sets that are based on chunks, and build a single set from them.
	// The remaining sets are merged with the chunks-based one using storage.NewMergeSeriesSet.

	otherSets := []storage.SeriesSet(nil)
	chunks := []chunk.Chunk(nil)

	for _, set := range sets {
		nonChunkSeries := []storage.Series(nil)

		// A SeriesSet may have some series backed by chunks, and some not.
		for set.Next() {
			s := set.At()

			if sc, ok := s.(SeriesWithChunks); ok {
				chunks = append(chunks, sc.Chunks()...)
			} else {
				nonChunkSeries = append(nonChunkSeries, s)
			}
		}

		if err := set.Err(); err != nil {
			otherSets = append(otherSets, storage.ErrSeriesSet(err))
		} else if len(nonChunkSeries) > 0 {
			otherSets = append(otherSets, &sliceSeriesSet{series: nonChunkSeries, ix: -1})
		}
	}

	if len(chunks) == 0 {
		return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
	}

	// partitionChunks returns set with sorted series, so it can be used by NewMergeSeriesSet
	chunksSet := partitionChunks(chunks, q.mint, q.maxt, q.chunkIterFn)

	if len(otherSets) == 0 {
		return chunksSet
	}

	otherSets = append(otherSets, chunksSet)
	return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
}

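// sliceSeriesSet implements storage.SeriesSet over an in-memory slice.
// ix starts at -1, so the set is positioned before the first series until
// Next is called.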
type sliceSeriesSet struct {
	series []storage.Series
	ix     int
}

func (s *sliceSeriesSet) Next() bool {
	s.ix++
	return s.ix < len(s.series)
}

func (s *sliceSeriesSet) At() storage.Series {
	if s.ix < 0 || s.ix >= len(s.series) {
		return nil
	}
	return s.series[s.ix]
}

func (s *sliceSeriesSet) Err() error {
	return nil
}

func (s *sliceSeriesSet) Warnings() storage.Warnings {
	return nil
}

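// storeQueryable wraps a QueryableWithFilter so that the store is skipped
// entirely for queries that only touch data newer than QueryStoreAfter.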
type storeQueryable struct {
	QueryableWithFilter
	QueryStoreAfter time.Duration
}

func (s storeQueryable) UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool {
	// Include this store only if mint is within QueryStoreAfter w.r.t. the current time.
	if s.QueryStoreAfter != 0 && queryMinT > util.TimeToMillis(now.Add(-s.QueryStoreAfter)) {
		return false
	}
	return s.QueryableWithFilter.UseQueryable(now, queryMinT, queryMaxT)
}

type alwaysTrueFilterQueryable struct {
	storage.Queryable
}

func (alwaysTrueFilterQueryable) UseQueryable(_ time.Time, _, _ int64) bool {
	return true
}

// UseAlwaysQueryable wraps a storage.Queryable into a QueryableWithFilter that performs no query filtering.
func UseAlwaysQueryable(q storage.Queryable) QueryableWithFilter {
	return alwaysTrueFilterQueryable{Queryable: q}
}

type useBeforeTimestampQueryable struct {
	storage.Queryable
	ts int64 // Timestamp in milliseconds.
}

func (u useBeforeTimestampQueryable) UseQueryable(_ time.Time, queryMinT, _ int64) bool {
	if u.ts == 0 {
		return true
	}
	return queryMinT < u.ts
}

// UseBeforeTimestampQueryable returns a QueryableWithFilter that is used only if the query
// starts before the given timestamp. If the timestamp is zero (ts.IsZero()), the queryable is always used.
func UseBeforeTimestampQueryable(queryable storage.Queryable, ts time.Time) QueryableWithFilter {
	t := int64(0)
	if !ts.IsZero() {
		t = util.TimeToMillis(ts)
	}
	return useBeforeTimestampQueryable{
		Queryable: queryable,
		ts:        t,
	}
}

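// validateQueryTimeRange clamps the query end to 'now + maxQueryIntoFuture' and the
// query start to 'now - max query lookback', returning errEmptyTimeRange if the
// clamped range is empty, and the (possibly adjusted) start/end in milliseconds
// otherwise.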
func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs int64, limits *validation.Overrides, maxQueryIntoFuture time.Duration) (int64, int64, error) {
	now := model.Now()
	startTime := model.Time(startMs)
	endTime := model.Time(endMs)

	// Clamp time range based on max query into future.
	if maxQueryIntoFuture > 0 && endTime.After(now.Add(maxQueryIntoFuture)) {
		origEndTime := endTime
		endTime = now.Add(maxQueryIntoFuture)

		// Make sure to log it in traces to ease debugging.
		level.Debug(spanlogger.FromContext(ctx)).Log(
			"msg", "the end time of the query has been manipulated because of the 'max query into future' setting",
			"original", util.FormatTimeModel(origEndTime),
			"updated", util.FormatTimeModel(endTime))

		if endTime.Before(startTime) {
			return 0, 0, errEmptyTimeRange
		}
	}

	// Clamp the time range based on the max query lookback.
	if maxQueryLookback := limits.MaxQueryLookback(userID); maxQueryLookback > 0 && startTime.Before(now.Add(-maxQueryLookback)) {
		origStartTime := startTime
		startTime = now.Add(-maxQueryLookback)

		// Make sure to log it in traces to ease debugging.
		level.Debug(spanlogger.FromContext(ctx)).Log(
			"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
			"original", util.FormatTimeModel(origStartTime),
			"updated", util.FormatTimeModel(startTime))

		if endTime.Before(startTime) {
			return 0, 0, errEmptyTimeRange
		}
	}

	return int64(startTime), int64(endTime), nil
}