1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package query
5
6import (
7	"context"
8	"sort"
9	"strings"
10
11	"github.com/go-kit/kit/log"
12	"github.com/pkg/errors"
13	"github.com/prometheus/prometheus/pkg/labels"
14	"github.com/prometheus/prometheus/storage"
15	"github.com/thanos-io/thanos/pkg/store/storepb"
16	"github.com/thanos-io/thanos/pkg/tracing"
17)
18
// QueryableCreator returns implementation of storage.Queryable that fetches data from the proxy store API endpoints.
// If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default.
// When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifying
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds); it is sent
// as the `MaxResolutionWindow` of StoreAPI Series requests.
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
// skipChunks is forwarded as the `SkipChunks` option of StoreAPI Series requests.
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable
26
27// NewQueryableCreator creates QueryableCreator.
28func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator {
29	return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
30		return &queryable{
31			logger:              logger,
32			replicaLabels:       replicaLabels,
33			proxy:               proxy,
34			deduplicate:         deduplicate,
35			maxResolutionMillis: maxResolutionMillis,
36			partialResponse:     partialResponse,
37			skipChunks:          skipChunks,
38		}
39	}
40}
41
// queryable is the storage.Queryable produced by a QueryableCreator. It only
// holds configuration; each call to Querier turns it into a fresh querier.
type queryable struct {
	logger log.Logger
	// replicaLabels is passed to newQuerier, which converts it into a set.
	replicaLabels []string
	// proxy is the StoreServer every querier sends its requests to.
	proxy storepb.StoreServer
	// deduplicate enables deduplication along replicaLabels.
	deduplicate bool
	// maxResolutionMillis is forwarded as MaxResolutionWindow of Series requests.
	maxResolutionMillis int64
	// partialResponse is inverted into PartialResponseDisabled on StoreAPI requests.
	partialResponse bool
	// skipChunks is forwarded as SkipChunks of Series requests.
	skipChunks bool
}
51
52// Querier returns a new storage querier against the underlying proxy store API.
53func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
54	return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.skipChunks), nil
55}
56
// querier implements storage.Querier on top of a proxy StoreServer.
// Instances are built by newQuerier.
type querier struct {
	// ctx is a cancellable child of the context given to newQuerier; every
	// request this querier makes derives its span context from it.
	ctx    context.Context
	logger log.Logger
	// cancel cancels ctx; it is invoked by Close.
	cancel func()
	// mint and maxt are the querier's time range. They are used as the default
	// Select range and as the bounds of the returned series sets.
	mint, maxt int64
	// replicaLabels is the set of label names treated as replica labels.
	replicaLabels       map[string]struct{}
	proxy               storepb.StoreServer
	deduplicate         bool
	maxResolutionMillis int64
	partialResponse     bool
	skipChunks          bool
}
69
70// newQuerier creates implementation of storage.Querier that fetches data from the proxy
71// store API endpoints.
72func newQuerier(
73	ctx context.Context,
74	logger log.Logger,
75	mint, maxt int64,
76	replicaLabels []string,
77	proxy storepb.StoreServer,
78	deduplicate bool,
79	maxResolutionMillis int64,
80	partialResponse bool,
81	skipChunks bool,
82) *querier {
83	if logger == nil {
84		logger = log.NewNopLogger()
85	}
86	ctx, cancel := context.WithCancel(ctx)
87
88	rl := make(map[string]struct{})
89	for _, replicaLabel := range replicaLabels {
90		rl[replicaLabel] = struct{}{}
91	}
92	return &querier{
93		ctx:                 ctx,
94		logger:              logger,
95		cancel:              cancel,
96		mint:                mint,
97		maxt:                maxt,
98		replicaLabels:       rl,
99		proxy:               proxy,
100		deduplicate:         deduplicate,
101		maxResolutionMillis: maxResolutionMillis,
102		partialResponse:     partialResponse,
103		skipChunks:          skipChunks,
104	}
105}
106
107func (q *querier) isDedupEnabled() bool {
108	return q.deduplicate && len(q.replicaLabels) > 0
109}
110
// seriesServer is an in-process storepb.Store_SeriesServer that buffers every
// response sent to it, so a Series call can be consumed synchronously.
type seriesServer struct {
	// Embedded only so the type satisfies storepb.Store_SeriesServer. It is never
	// set, so calling any interface method not overridden below would invoke a
	// method on a nil interface value.
	storepb.Store_SeriesServer
	// ctx is returned by Context.
	ctx context.Context

	// seriesSet accumulates the series received via Send, in arrival order.
	seriesSet []storepb.Series
	// warnings accumulates the warning strings received via Send.
	warnings []string
}
119
120func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
121	if r.GetWarning() != "" {
122		s.warnings = append(s.warnings, r.GetWarning())
123		return nil
124	}
125
126	if r.GetSeries() == nil {
127		return errors.New("no seriesSet")
128	}
129	s.seriesSet = append(s.seriesSet, *r.GetSeries())
130	return nil
131}
132
// Context returns the context the server was constructed with. Select sets it
// to the tracing span's context.
func (s *seriesServer) Context() context.Context {
	return s.ctx
}
136
// resAggr identifies which aggregate the resulting series set should read. It
// is returned by aggrsFromFunc alongside the storepb aggregates requested from
// the StoreAPI, and is handed to promSeriesSet.
// NOTE(review): how promSeriesSet interprets each value is defined outside this view.
type resAggr int

const (
	// resAggrAvg is paired with a COUNT+SUM request; presumably avg = sum/count.
	resAggrAvg resAggr = iota
	// resAggrCount is paired with a COUNT request.
	resAggrCount
	// resAggrSum is paired with a SUM request.
	resAggrSum
	// resAggrMin is paired with a MIN request.
	resAggrMin
	// resAggrMax is paired with a MAX request.
	resAggrMax
	// resAggrCounter is paired with a COUNTER request (used for rate/increase).
	resAggrCounter
)
147
148// aggrsFromFunc infers aggregates of the underlying data based on the wrapping
149// function of a series selection.
150func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) {
151	if f == "min" || strings.HasPrefix(f, "min_") {
152		return []storepb.Aggr{storepb.Aggr_MIN}, resAggrMin
153	}
154	if f == "max" || strings.HasPrefix(f, "max_") {
155		return []storepb.Aggr{storepb.Aggr_MAX}, resAggrMax
156	}
157	if f == "count" || strings.HasPrefix(f, "count_") {
158		return []storepb.Aggr{storepb.Aggr_COUNT}, resAggrCount
159	}
160	// f == "sum" falls through here since we want the actual samples.
161	if strings.HasPrefix(f, "sum_") {
162		return []storepb.Aggr{storepb.Aggr_SUM}, resAggrSum
163	}
164	if f == "increase" || f == "rate" {
165		return []storepb.Aggr{storepb.Aggr_COUNTER}, resAggrCounter
166	}
167	// In the default case, we retrieve count and sum to compute an average.
168	return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg
169}
170
171func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
172	span, ctx := tracing.StartSpan(q.ctx, "querier_select")
173	defer span.Finish()
174
175	sms, err := translateMatchers(ms...)
176	if err != nil {
177		return nil, nil, errors.Wrap(err, "convert matchers")
178	}
179
180	if params == nil {
181		params = &storage.SelectParams{
182			Start: q.mint,
183			End:   q.maxt,
184		}
185	}
186	queryAggrs, resAggr := aggrsFromFunc(params.Func)
187
188	resp := &seriesServer{ctx: ctx}
189	if err := q.proxy.Series(&storepb.SeriesRequest{
190		MinTime:                 params.Start,
191		MaxTime:                 params.End,
192		Matchers:                sms,
193		MaxResolutionWindow:     q.maxResolutionMillis,
194		Aggregates:              queryAggrs,
195		PartialResponseDisabled: !q.partialResponse,
196		SkipChunks:              q.skipChunks,
197	}, resp); err != nil {
198		return nil, nil, errors.Wrap(err, "proxy Series()")
199	}
200
201	var warns storage.Warnings
202	for _, w := range resp.warnings {
203		warns = append(warns, errors.New(w))
204	}
205
206	if !q.isDedupEnabled() {
207		// Return data without any deduplication.
208		return &promSeriesSet{
209			mint: q.mint,
210			maxt: q.maxt,
211			set:  newStoreSeriesSet(resp.seriesSet),
212			aggr: resAggr,
213		}, warns, nil
214	}
215
216	// TODO(fabxc): this could potentially pushed further down into the store API
217	// to make true streaming possible.
218	sortDedupLabels(resp.seriesSet, q.replicaLabels)
219
220	set := &promSeriesSet{
221		mint: q.mint,
222		maxt: q.maxt,
223		set:  newStoreSeriesSet(resp.seriesSet),
224		aggr: resAggr,
225	}
226
227	// The merged series set assembles all potentially-overlapping time ranges
228	// of the same series into a single one. The series are ordered so that equal series
229	// from different replicas are sequential. We can now deduplicate those.
230	return newDedupSeriesSet(set, q.replicaLabels), warns, nil
231}
232
233// sortDedupLabels re-sorts the set so that the same series with different replica
234// labels are coming right after each other.
235func sortDedupLabels(set []storepb.Series, replicaLabels map[string]struct{}) {
236	for _, s := range set {
237		// Move the replica labels to the very end.
238		sort.Slice(s.Labels, func(i, j int) bool {
239			if _, ok := replicaLabels[s.Labels[i].Name]; ok {
240				return false
241			}
242			if _, ok := replicaLabels[s.Labels[j].Name]; ok {
243				return true
244			}
245			return s.Labels[i].Name < s.Labels[j].Name
246		})
247	}
248	// With the re-ordered label sets, re-sorting all series aligns the same series
249	// from different replicas sequentially.
250	sort.Slice(set, func(i, j int) bool {
251		return storepb.CompareLabels(set[i].Labels, set[j].Labels) < 0
252	})
253}
254
255// LabelValues returns all potential values for a label name.
256func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
257	span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
258	defer span.Finish()
259
260	resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
261	if err != nil {
262		return nil, nil, errors.Wrap(err, "proxy LabelValues()")
263	}
264
265	var warns storage.Warnings
266	for _, w := range resp.Warnings {
267		warns = append(warns, errors.New(w))
268	}
269
270	return resp.Values, warns, nil
271}
272
273// LabelNames returns all the unique label names present in the block in sorted order.
274func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
275	span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
276	defer span.Finish()
277
278	resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
279	if err != nil {
280		return nil, nil, errors.Wrap(err, "proxy LabelNames()")
281	}
282
283	var warns storage.Warnings
284	for _, w := range resp.Warnings {
285		warns = append(warns, errors.New(w))
286	}
287
288	return resp.Names, warns, nil
289}
290
// Close cancels the querier's context, which newQuerier derived from the
// caller's context. It always returns nil.
func (q *querier) Close() error {
	q.cancel()
	return nil
}
295