1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package queryfrontend
5
6import (
7	"context"
8
9	"github.com/cortexproject/cortex/pkg/querier/queryrange"
10	"github.com/prometheus/client_golang/prometheus"
11	"github.com/prometheus/client_golang/prometheus/promauto"
12	"github.com/thanos-io/thanos/pkg/compact/downsample"
13)
14
15// DownsampledMiddleware creates a new Middleware that requests downsampled data
16// should response to original request with auto max_source_resolution not contain data points.
17func DownsampledMiddleware(merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware {
18	return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
19		return downsampled{
20			next:   next,
21			merger: merger,
22			additionalQueriesCount: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
23				Namespace: "thanos",
24				Name:      "frontend_downsampled_extra_queries_total",
25				Help:      "Total number of additional queries for downsampled data",
26			}),
27		}
28	})
29}
30
31type downsampled struct {
32	next   queryrange.Handler
33	merger queryrange.Merger
34
35	// Metrics.
36	additionalQueriesCount prometheus.Counter
37}
38
39var resolutions = []int64{downsample.ResLevel1, downsample.ResLevel2}
40
41func (d downsampled) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
42	tqrr, ok := req.(*ThanosQueryRangeRequest)
43	if !ok || !tqrr.AutoDownsampling {
44		return d.next.Do(ctx, req)
45	}
46
47	var (
48		resps = make([]queryrange.Response, 0)
49		resp  queryrange.Response
50		err   error
51		i     int
52	)
53
54forLoop:
55	for i < len(resolutions) {
56		if i > 0 {
57			d.additionalQueriesCount.Inc()
58		}
59		r := *tqrr
60		resp, err = d.next.Do(ctx, &r)
61		if err != nil {
62			return nil, err
63		}
64		resps = append(resps, resp)
65		// Set MaxSourceResolution for next request, if any.
66		for i < len(resolutions) {
67			if tqrr.MaxSourceResolution < resolutions[i] {
68				tqrr.AutoDownsampling = false
69				tqrr.MaxSourceResolution = resolutions[i]
70				break
71			}
72			i++
73		}
74		m := minResponseTime(resp)
75		switch m {
76		case tqrr.Start: // Response not impacted by retention policy.
77			break forLoop
78		case -1: // Empty response, retry with higher MaxSourceResolution.
79			continue
80		default: // Data partially present, query for empty part with higher MaxSourceResolution.
81			tqrr.End = m - tqrr.Step
82		}
83		if tqrr.Start > tqrr.End {
84			break forLoop
85		}
86	}
87	response, err := d.merger.MergeResponse(resps...)
88	if err != nil {
89		return nil, err
90	}
91	return response, nil
92}
93
94// minResponseTime returns earliest timestamp in r.Data.Result.
95// -1 is returned if r contains no data points.
96// Each SampleStream within r.Data.Result must be sorted by timestamp.
97func minResponseTime(r queryrange.Response) int64 {
98	var res = r.(*queryrange.PrometheusResponse).Data.Result
99	if len(res) == 0 || len(res[0].Samples) == 0 {
100		return -1
101	}
102	var minTs = res[0].Samples[0].TimestampMs
103	for _, sampleStream := range res[1:] {
104		if ts := sampleStream.Samples[0].TimestampMs; ts < minTs {
105			minTs = ts
106		}
107	}
108	return minTs
109}
110