1package querier 2 3import ( 4 "context" 5 6 "github.com/prometheus/common/model" 7 "github.com/prometheus/prometheus/pkg/labels" 8 "github.com/prometheus/prometheus/storage" 9 "github.com/prometheus/prometheus/tsdb/chunkenc" 10 11 "github.com/cortexproject/cortex/pkg/chunk" 12 "github.com/cortexproject/cortex/pkg/ingester/client" 13 "github.com/cortexproject/cortex/pkg/querier/chunkstore" 14 seriesset "github.com/cortexproject/cortex/pkg/querier/series" 15 "github.com/cortexproject/cortex/pkg/tenant" 16) 17 18type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator 19 20func newChunkStoreQueryable(store chunkstore.ChunkStore, chunkIteratorFunc chunkIteratorFunc) storage.Queryable { 21 return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { 22 return &chunkStoreQuerier{ 23 store: store, 24 chunkIteratorFunc: chunkIteratorFunc, 25 ctx: ctx, 26 mint: mint, 27 maxt: maxt, 28 }, nil 29 }) 30} 31 32type chunkStoreQuerier struct { 33 store chunkstore.ChunkStore 34 chunkIteratorFunc chunkIteratorFunc 35 ctx context.Context 36 mint, maxt int64 37} 38 39// Select implements storage.Querier interface. 40// The bool passed is ignored because the series is always sorted. 41func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { 42 userID, err := tenant.TenantID(q.ctx) 43 if err != nil { 44 return storage.ErrSeriesSet(err) 45 } 46 47 minT, maxT := q.mint, q.maxt 48 if sp != nil { 49 minT, maxT = sp.Start, sp.End 50 } 51 52 // We will hit this for /series lookup when -querier.query-store-for-labels-enabled is set. 53 // If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded. 54 // That flag is only to be set with blocks storage engine, and this is a protective measure. 55 if sp != nil && sp.Func == "series" { 56 return storage.EmptySeriesSet() 57 } 58 59 chunks, err := q.store.Get(q.ctx, userID, model.Time(minT), model.Time(maxT), matchers...) 60 if err != nil { 61 return storage.ErrSeriesSet(err) 62 } 63 64 return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc) 65} 66 67// Series in the returned set are sorted alphabetically by labels. 68func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet { 69 chunksBySeries := map[string][]chunk.Chunk{} 70 for _, c := range chunks { 71 key := client.LabelsToKeyString(c.Metric) 72 chunksBySeries[key] = append(chunksBySeries[key], c) 73 } 74 75 series := make([]storage.Series, 0, len(chunksBySeries)) 76 for i := range chunksBySeries { 77 series = append(series, &chunkSeries{ 78 labels: chunksBySeries[i][0].Metric, 79 chunks: chunksBySeries[i], 80 chunkIteratorFunc: iteratorFunc, 81 mint: mint, 82 maxt: maxt, 83 }) 84 } 85 86 return seriesset.NewConcreteSeriesSet(series) 87} 88 89func (q *chunkStoreQuerier) LabelValues(name string, labels ...*labels.Matcher) ([]string, storage.Warnings, error) { 90 return nil, nil, nil 91} 92 93func (q *chunkStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { 94 return nil, nil, nil 95} 96 97func (q *chunkStoreQuerier) Close() error { 98 return nil 99} 100 101// Implements SeriesWithChunks 102type chunkSeries struct { 103 labels labels.Labels 104 chunks []chunk.Chunk 105 chunkIteratorFunc chunkIteratorFunc 106 mint, maxt int64 107} 108 109func (s *chunkSeries) Labels() labels.Labels { 110 return s.labels 111} 112 113// Iterator returns a new iterator of the data of the series. 114func (s *chunkSeries) Iterator() chunkenc.Iterator { 115 return s.chunkIteratorFunc(s.chunks, model.Time(s.mint), model.Time(s.maxt)) 116} 117 118// Chunks implements SeriesWithChunks interface. 119func (s *chunkSeries) Chunks() []chunk.Chunk { 120 return s.chunks 121} 122