1package querier
2
3import (
4	"context"
5
6	"github.com/prometheus/common/model"
7	"github.com/prometheus/prometheus/pkg/labels"
8	"github.com/prometheus/prometheus/storage"
9	"github.com/prometheus/prometheus/tsdb/chunkenc"
10
11	"github.com/cortexproject/cortex/pkg/chunk"
12	"github.com/cortexproject/cortex/pkg/ingester/client"
13	"github.com/cortexproject/cortex/pkg/querier/chunkstore"
14	seriesset "github.com/cortexproject/cortex/pkg/querier/series"
15	"github.com/cortexproject/cortex/pkg/tenant"
16)
17
// chunkIteratorFunc builds a chunkenc.Iterator over a set of chunks.
// The from and through arguments are the time bounds handed to the
// iterator; chunkSeries.Iterator supplies its own mint/maxt for them.
type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator
19
20func newChunkStoreQueryable(store chunkstore.ChunkStore, chunkIteratorFunc chunkIteratorFunc) storage.Queryable {
21	return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
22		return &chunkStoreQuerier{
23			store:             store,
24			chunkIteratorFunc: chunkIteratorFunc,
25			ctx:               ctx,
26			mint:              mint,
27			maxt:              maxt,
28		}, nil
29	})
30}
31
32type chunkStoreQuerier struct {
33	store             chunkstore.ChunkStore
34	chunkIteratorFunc chunkIteratorFunc
35	ctx               context.Context
36	mint, maxt        int64
37}
38
39// Select implements storage.Querier interface.
40// The bool passed is ignored because the series is always sorted.
41func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
42	userID, err := tenant.TenantID(q.ctx)
43	if err != nil {
44		return storage.ErrSeriesSet(err)
45	}
46
47	minT, maxT := q.mint, q.maxt
48	if sp != nil {
49		minT, maxT = sp.Start, sp.End
50	}
51
52	// We will hit this for /series lookup when -querier.query-store-for-labels-enabled is set.
53	// If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded.
54	// That flag is only to be set with blocks storage engine, and this is a protective measure.
55	if sp != nil && sp.Func == "series" {
56		return storage.EmptySeriesSet()
57	}
58
59	chunks, err := q.store.Get(q.ctx, userID, model.Time(minT), model.Time(maxT), matchers...)
60	if err != nil {
61		return storage.ErrSeriesSet(err)
62	}
63
64	return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc)
65}
66
67// Series in the returned set are sorted alphabetically by labels.
68func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
69	chunksBySeries := map[string][]chunk.Chunk{}
70	for _, c := range chunks {
71		key := client.LabelsToKeyString(c.Metric)
72		chunksBySeries[key] = append(chunksBySeries[key], c)
73	}
74
75	series := make([]storage.Series, 0, len(chunksBySeries))
76	for i := range chunksBySeries {
77		series = append(series, &chunkSeries{
78			labels:            chunksBySeries[i][0].Metric,
79			chunks:            chunksBySeries[i],
80			chunkIteratorFunc: iteratorFunc,
81			mint:              mint,
82			maxt:              maxt,
83		})
84	}
85
86	return seriesset.NewConcreteSeriesSet(series)
87}
88
// LabelValues is a stub: it ignores the label name and the matchers and
// always returns no values, no warnings and a nil error.
func (q *chunkStoreQuerier) LabelValues(name string, labels ...*labels.Matcher) ([]string, storage.Warnings, error) {
	return nil, nil, nil
}
92
// LabelNames is a stub: it ignores the matchers and always returns no names,
// no warnings and a nil error.
func (q *chunkStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	return nil, nil, nil
}
96
// Close does nothing and always returns nil.
func (q *chunkStoreQuerier) Close() error {
	return nil
}
100
101// Implements SeriesWithChunks
102type chunkSeries struct {
103	labels            labels.Labels
104	chunks            []chunk.Chunk
105	chunkIteratorFunc chunkIteratorFunc
106	mint, maxt        int64
107}
108
// Labels returns the label set stored on the series.
func (s *chunkSeries) Labels() labels.Labels {
	return s.labels
}
112
113// Iterator returns a new iterator of the data of the series.
114func (s *chunkSeries) Iterator() chunkenc.Iterator {
115	return s.chunkIteratorFunc(s.chunks, model.Time(s.mint), model.Time(s.maxt))
116}
117
// Chunks implements SeriesWithChunks interface.
// It returns the series' own chunk slice, not a copy.
func (s *chunkSeries) Chunks() []chunk.Chunk {
	return s.chunks
}
122