1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package query 5 6import ( 7 "context" 8 "sort" 9 "strings" 10 11 "github.com/go-kit/kit/log" 12 "github.com/pkg/errors" 13 "github.com/prometheus/prometheus/pkg/labels" 14 "github.com/prometheus/prometheus/storage" 15 "github.com/thanos-io/thanos/pkg/store/storepb" 16 "github.com/thanos-io/thanos/pkg/tracing" 17) 18 19// QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. 20// If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default. 21// When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifying 22// replicaLabels at query time. 23// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). 24// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy. 25type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable 26 27// NewQueryableCreator creates QueryableCreator. 28func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator { 29 return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { 30 return &queryable{ 31 logger: logger, 32 replicaLabels: replicaLabels, 33 proxy: proxy, 34 deduplicate: deduplicate, 35 maxResolutionMillis: maxResolutionMillis, 36 partialResponse: partialResponse, 37 skipChunks: skipChunks, 38 } 39 } 40} 41 42type queryable struct { 43 logger log.Logger 44 replicaLabels []string 45 proxy storepb.StoreServer 46 deduplicate bool 47 maxResolutionMillis int64 48 partialResponse bool 49 skipChunks bool 50} 51 52// Querier returns a new storage querier against the underlying proxy store API. 53func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { 54 return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.skipChunks), nil 55} 56 57type querier struct { 58 ctx context.Context 59 logger log.Logger 60 cancel func() 61 mint, maxt int64 62 replicaLabels map[string]struct{} 63 proxy storepb.StoreServer 64 deduplicate bool 65 maxResolutionMillis int64 66 partialResponse bool 67 skipChunks bool 68} 69 70// newQuerier creates implementation of storage.Querier that fetches data from the proxy 71// store API endpoints. 72func newQuerier( 73 ctx context.Context, 74 logger log.Logger, 75 mint, maxt int64, 76 replicaLabels []string, 77 proxy storepb.StoreServer, 78 deduplicate bool, 79 maxResolutionMillis int64, 80 partialResponse bool, 81 skipChunks bool, 82) *querier { 83 if logger == nil { 84 logger = log.NewNopLogger() 85 } 86 ctx, cancel := context.WithCancel(ctx) 87 88 rl := make(map[string]struct{}) 89 for _, replicaLabel := range replicaLabels { 90 rl[replicaLabel] = struct{}{} 91 } 92 return &querier{ 93 ctx: ctx, 94 logger: logger, 95 cancel: cancel, 96 mint: mint, 97 maxt: maxt, 98 replicaLabels: rl, 99 proxy: proxy, 100 deduplicate: deduplicate, 101 maxResolutionMillis: maxResolutionMillis, 102 partialResponse: partialResponse, 103 skipChunks: skipChunks, 104 } 105} 106 107func (q *querier) isDedupEnabled() bool { 108 return q.deduplicate && len(q.replicaLabels) > 0 109} 110 111type seriesServer struct { 112 // This field just exist to pseudo-implement the unused methods of the interface. 113 storepb.Store_SeriesServer 114 ctx context.Context 115 116 seriesSet []storepb.Series 117 warnings []string 118} 119 120func (s *seriesServer) Send(r *storepb.SeriesResponse) error { 121 if r.GetWarning() != "" { 122 s.warnings = append(s.warnings, r.GetWarning()) 123 return nil 124 } 125 126 if r.GetSeries() == nil { 127 return errors.New("no seriesSet") 128 } 129 s.seriesSet = append(s.seriesSet, *r.GetSeries()) 130 return nil 131} 132 133func (s *seriesServer) Context() context.Context { 134 return s.ctx 135} 136 137type resAggr int 138 139const ( 140 resAggrAvg resAggr = iota 141 resAggrCount 142 resAggrSum 143 resAggrMin 144 resAggrMax 145 resAggrCounter 146) 147 148// aggrsFromFunc infers aggregates of the underlying data based on the wrapping 149// function of a series selection. 150func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) { 151 if f == "min" || strings.HasPrefix(f, "min_") { 152 return []storepb.Aggr{storepb.Aggr_MIN}, resAggrMin 153 } 154 if f == "max" || strings.HasPrefix(f, "max_") { 155 return []storepb.Aggr{storepb.Aggr_MAX}, resAggrMax 156 } 157 if f == "count" || strings.HasPrefix(f, "count_") { 158 return []storepb.Aggr{storepb.Aggr_COUNT}, resAggrCount 159 } 160 // f == "sum" falls through here since we want the actual samples. 161 if strings.HasPrefix(f, "sum_") { 162 return []storepb.Aggr{storepb.Aggr_SUM}, resAggrSum 163 } 164 if f == "increase" || f == "rate" { 165 return []storepb.Aggr{storepb.Aggr_COUNTER}, resAggrCounter 166 } 167 // In the default case, we retrieve count and sum to compute an average. 168 return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg 169} 170 171func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { 172 span, ctx := tracing.StartSpan(q.ctx, "querier_select") 173 defer span.Finish() 174 175 sms, err := translateMatchers(ms...) 176 if err != nil { 177 return nil, nil, errors.Wrap(err, "convert matchers") 178 } 179 180 if params == nil { 181 params = &storage.SelectParams{ 182 Start: q.mint, 183 End: q.maxt, 184 } 185 } 186 queryAggrs, resAggr := aggrsFromFunc(params.Func) 187 188 resp := &seriesServer{ctx: ctx} 189 if err := q.proxy.Series(&storepb.SeriesRequest{ 190 MinTime: params.Start, 191 MaxTime: params.End, 192 Matchers: sms, 193 MaxResolutionWindow: q.maxResolutionMillis, 194 Aggregates: queryAggrs, 195 PartialResponseDisabled: !q.partialResponse, 196 SkipChunks: q.skipChunks, 197 }, resp); err != nil { 198 return nil, nil, errors.Wrap(err, "proxy Series()") 199 } 200 201 var warns storage.Warnings 202 for _, w := range resp.warnings { 203 warns = append(warns, errors.New(w)) 204 } 205 206 if !q.isDedupEnabled() { 207 // Return data without any deduplication. 208 return &promSeriesSet{ 209 mint: q.mint, 210 maxt: q.maxt, 211 set: newStoreSeriesSet(resp.seriesSet), 212 aggr: resAggr, 213 }, warns, nil 214 } 215 216 // TODO(fabxc): this could potentially pushed further down into the store API 217 // to make true streaming possible. 218 sortDedupLabels(resp.seriesSet, q.replicaLabels) 219 220 set := &promSeriesSet{ 221 mint: q.mint, 222 maxt: q.maxt, 223 set: newStoreSeriesSet(resp.seriesSet), 224 aggr: resAggr, 225 } 226 227 // The merged series set assembles all potentially-overlapping time ranges 228 // of the same series into a single one. The series are ordered so that equal series 229 // from different replicas are sequential. We can now deduplicate those. 230 return newDedupSeriesSet(set, q.replicaLabels), warns, nil 231} 232 233// sortDedupLabels re-sorts the set so that the same series with different replica 234// labels are coming right after each other. 235func sortDedupLabels(set []storepb.Series, replicaLabels map[string]struct{}) { 236 for _, s := range set { 237 // Move the replica labels to the very end. 238 sort.Slice(s.Labels, func(i, j int) bool { 239 if _, ok := replicaLabels[s.Labels[i].Name]; ok { 240 return false 241 } 242 if _, ok := replicaLabels[s.Labels[j].Name]; ok { 243 return true 244 } 245 return s.Labels[i].Name < s.Labels[j].Name 246 }) 247 } 248 // With the re-ordered label sets, re-sorting all series aligns the same series 249 // from different replicas sequentially. 250 sort.Slice(set, func(i, j int) bool { 251 return storepb.CompareLabels(set[i].Labels, set[j].Labels) < 0 252 }) 253} 254 255// LabelValues returns all potential values for a label name. 256func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) { 257 span, ctx := tracing.StartSpan(q.ctx, "querier_label_values") 258 defer span.Finish() 259 260 resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse}) 261 if err != nil { 262 return nil, nil, errors.Wrap(err, "proxy LabelValues()") 263 } 264 265 var warns storage.Warnings 266 for _, w := range resp.Warnings { 267 warns = append(warns, errors.New(w)) 268 } 269 270 return resp.Values, warns, nil 271} 272 273// LabelNames returns all the unique label names present in the block in sorted order. 274func (q *querier) LabelNames() ([]string, storage.Warnings, error) { 275 span, ctx := tracing.StartSpan(q.ctx, "querier_label_names") 276 defer span.Finish() 277 278 resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse}) 279 if err != nil { 280 return nil, nil, errors.Wrap(err, "proxy LabelNames()") 281 } 282 283 var warns storage.Warnings 284 for _, w := range resp.Warnings { 285 warns = append(warns, errors.New(w)) 286 } 287 288 return resp.Names, warns, nil 289} 290 291func (q *querier) Close() error { 292 q.cancel() 293 return nil 294} 295