1package logql
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"time"
8
9	"github.com/cortexproject/cortex/pkg/querier/astmapper"
10	util_log "github.com/cortexproject/cortex/pkg/util/log"
11	"github.com/go-kit/kit/log/level"
12	"github.com/prometheus/prometheus/promql"
13
14	"github.com/grafana/loki/pkg/iter"
15	"github.com/grafana/loki/pkg/logqlmodel"
16	"github.com/grafana/loki/pkg/logqlmodel/stats"
17	"github.com/grafana/loki/pkg/util"
18)
19
20/*
21This includes a bunch of tooling for parallelization improvements based on backend shard factors.
22In schemas 10+ a shard factor (default 16) is introduced in the index store,
23calculated by hashing the label set of a log stream. This allows us to perform certain optimizations
24that fall under the umbrella of query remapping and querying shards individually.
25For instance, `{app="foo"} |= "bar"` can be executed on each shard independently, then reaggregated.
There is also a class of optimizations that can be performed by altering a query into a functionally equivalent,
27but more parallelizable form. For instance, an average can be remapped into a sum/count,
28which can then take advantage of our sharded execution model.
29*/
30
// ShardedEngine is an Engine implementation that can split queries into more parallelizable forms via
// querying the underlying backend shards individually and reaggregating them.
//
// Fields:
//   - timeout is taken from the engine options at construction time and handed to every query built by Query.
//   - downstreamable supplies the Downstreamer that backs the evaluator of every query built by Query.
//   - limits is handed unchanged to every query built by Query.
//   - metrics is stored by NewShardedEngine; it is not read by the methods defined in this file.
type ShardedEngine struct {
	timeout        time.Duration
	downstreamable Downstreamable
	limits         Limits
	metrics        *ShardingMetrics
}
39
40// NewShardedEngine constructs a *ShardedEngine
41func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics, limits Limits) *ShardedEngine {
42	opts.applyDefault()
43	return &ShardedEngine{
44		timeout:        opts.Timeout,
45		downstreamable: downstreamable,
46		metrics:        metrics,
47		limits:         limits,
48	}
49}
50
51// Query constructs a Query
52func (ng *ShardedEngine) Query(p Params, mapped Expr) Query {
53	return &query{
54		timeout:   ng.timeout,
55		params:    p,
56		evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer()),
57		parse: func(_ context.Context, _ string) (Expr, error) {
58			return mapped, nil
59		},
60		limits: ng.limits,
61	}
62}
63
// DownstreamSampleExpr is a SampleExpr which signals downstream computation.
// shard is optional: when it is nil, DownstreamEvaluator.StepEvaluator dispatches
// the embedded SampleExpr without any shard annotation.
type DownstreamSampleExpr struct {
	shard *astmapper.ShardAnnotation
	SampleExpr
}
69
70func (d DownstreamSampleExpr) String() string {
71	return fmt.Sprintf("downstream<%s, shard=%s>", d.SampleExpr.String(), d.shard)
72}
73
// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation.
// shard is optional: when it is nil, DownstreamEvaluator.Iterator dispatches
// the embedded LogSelectorExpr without any shard annotation.
type DownstreamLogSelectorExpr struct {
	shard *astmapper.ShardAnnotation
	LogSelectorExpr
}
79
80func (d DownstreamLogSelectorExpr) String() string {
81	return fmt.Sprintf("downstream<%s, shard=%s>", d.LogSelectorExpr.String(), d.shard)
82}
83
84func (d DownstreamSampleExpr) Walk(f WalkFn) { f(d) }
85
// ConcatSampleExpr is an expr for concatenating multiple SampleExpr.
// It forms a singly linked list: each node carries one DownstreamSampleExpr and
// next points at the following node, or is nil on the last node.
// Contract: The embedded SampleExprs within a linked list of ConcatSampleExprs must be of the
// same structure. This makes special implementations of SampleExpr.Associative() unnecessary.
type ConcatSampleExpr struct {
	DownstreamSampleExpr
	next *ConcatSampleExpr
}
93
94func (c ConcatSampleExpr) String() string {
95	if c.next == nil {
96		return c.DownstreamSampleExpr.String()
97	}
98
99	return fmt.Sprintf("%s ++ %s", c.DownstreamSampleExpr.String(), c.next.String())
100}
101
102func (c ConcatSampleExpr) Walk(f WalkFn) {
103	f(c)
104	f(c.next)
105}
106
// ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr.
// It forms a singly linked list: each node carries one DownstreamLogSelectorExpr and
// next points at the following node, or is nil on the last node.
type ConcatLogSelectorExpr struct {
	DownstreamLogSelectorExpr
	next *ConcatLogSelectorExpr
}
112
113func (c ConcatLogSelectorExpr) String() string {
114	if c.next == nil {
115		return c.DownstreamLogSelectorExpr.String()
116	}
117
118	return fmt.Sprintf("%s ++ %s", c.DownstreamLogSelectorExpr.String(), c.next.String())
119}
120
// Shards is a list of shard annotations. It can be turned into its string
// encoded form with Encode and rebuilt from that form with ParseShards.
type Shards []astmapper.ShardAnnotation
122
123func (xs Shards) Encode() (encoded []string) {
124	for _, shard := range xs {
125		encoded = append(encoded, shard.String())
126	}
127
128	return encoded
129}
130
131// ParseShards parses a list of string encoded shards
132func ParseShards(strs []string) (Shards, error) {
133	if len(strs) == 0 {
134		return nil, nil
135	}
136	shards := make([]astmapper.ShardAnnotation, 0, len(strs))
137
138	for _, str := range strs {
139		shard, err := astmapper.ParseShard(str)
140		if err != nil {
141			return nil, err
142		}
143		shards = append(shards, shard)
144	}
145	return shards, nil
146}
147
// Downstreamable is implemented by anything that can hand out a Downstreamer.
// ShardedEngine.Query calls Downstreamer once for each query it constructs.
type Downstreamable interface {
	Downstreamer() Downstreamer
}
151
// DownstreamQuery is a single unit of work handed to a Downstreamer.
// Expr is the expression to execute, Params the query parameters, and Shards
// the shard annotations attached to the query. The evaluator in this file
// leaves Shards empty when the originating expression carries no shard.
type DownstreamQuery struct {
	Expr   Expr
	Params Params
	Shards Shards
}
157
// Downstreamer is an interface for deferring responsibility for query execution.
// It is decoupled from but consumed by a DownstreamEvaluator to dispatch ASTs.
// The evaluator in this file pairs results with queries by position, so
// implementations are expected to return one result per query, in order.
type Downstreamer interface {
	Downstream(context.Context, []DownstreamQuery) ([]logqlmodel.Result, error)
}
163
// DownstreamEvaluator is an evaluator which handles shard aware AST nodes.
// The embedded Downstreamer executes the downstream queries; defaultEvaluator
// handles every sample expression that is not a downstream or concat node.
type DownstreamEvaluator struct {
	Downstreamer
	defaultEvaluator Evaluator
}
169
170// Downstream runs queries and collects stats from the embedded Downstreamer
171func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []DownstreamQuery) ([]logqlmodel.Result, error) {
172	results, err := ev.Downstreamer.Downstream(ctx, queries)
173	if err != nil {
174		return nil, err
175	}
176
177	for _, res := range results {
178		if err := stats.JoinResults(ctx, res.Statistics); err != nil {
179			level.Warn(util_log.Logger).Log("msg", "unable to merge downstream results", "err", err)
180		}
181	}
182
183	return results, nil
184}
185
// errorQuerier is a stateless querier stub whose SelectLogs and SelectSamples
// always fail. NewDownstreamEvaluator uses it to construct the default evaluator.
type errorQuerier struct{}
187
188func (errorQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) {
189	return nil, errors.New("Unimplemented")
190}
191
192func (errorQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) {
193	return nil, errors.New("Unimplemented")
194}
195
196func NewDownstreamEvaluator(downstreamer Downstreamer) *DownstreamEvaluator {
197	return &DownstreamEvaluator{
198		Downstreamer:     downstreamer,
199		defaultEvaluator: NewDefaultEvaluator(&errorQuerier{}, 0),
200	}
201}
202
203// Evaluator returns a StepEvaluator for a given SampleExpr
204func (ev *DownstreamEvaluator) StepEvaluator(
205	ctx context.Context,
206	nextEv SampleEvaluator,
207	expr SampleExpr,
208	params Params,
209) (StepEvaluator, error) {
210	switch e := expr.(type) {
211
212	case DownstreamSampleExpr:
213		// downstream to a querier
214		var shards []astmapper.ShardAnnotation
215		if e.shard != nil {
216			shards = append(shards, *e.shard)
217		}
218		results, err := ev.Downstream(ctx, []DownstreamQuery{{
219			Expr:   e.SampleExpr,
220			Params: params,
221			Shards: shards,
222		}})
223		if err != nil {
224			return nil, err
225		}
226		return ResultStepEvaluator(results[0], params)
227
228	case *ConcatSampleExpr:
229		cur := e
230		var queries []DownstreamQuery
231		for cur != nil {
232			qry := DownstreamQuery{
233				Expr:   cur.DownstreamSampleExpr.SampleExpr,
234				Params: params,
235			}
236			if shard := cur.DownstreamSampleExpr.shard; shard != nil {
237				qry.Shards = Shards{*shard}
238			}
239			queries = append(queries, qry)
240			cur = cur.next
241		}
242
243		results, err := ev.Downstream(ctx, queries)
244		if err != nil {
245			return nil, err
246		}
247
248		xs := make([]StepEvaluator, 0, len(queries))
249		for i, res := range results {
250			stepper, err := ResultStepEvaluator(res, params)
251			if err != nil {
252				level.Warn(util_log.Logger).Log(
253					"msg", "could not extract StepEvaluator",
254					"err", err,
255					"expr", queries[i].Expr.String(),
256				)
257				return nil, err
258			}
259			xs = append(xs, stepper)
260		}
261
262		return ConcatEvaluator(xs)
263
264	default:
265		return ev.defaultEvaluator.StepEvaluator(ctx, nextEv, e, params)
266	}
267}
268
269// Iterator returns the iter.EntryIterator for a given LogSelectorExpr
270func (ev *DownstreamEvaluator) Iterator(
271	ctx context.Context,
272	expr LogSelectorExpr,
273	params Params,
274) (iter.EntryIterator, error) {
275	switch e := expr.(type) {
276	case DownstreamLogSelectorExpr:
277		// downstream to a querier
278		var shards Shards
279		if e.shard != nil {
280			shards = append(shards, *e.shard)
281		}
282		results, err := ev.Downstream(ctx, []DownstreamQuery{{
283			Expr:   e.LogSelectorExpr,
284			Params: params,
285			Shards: shards,
286		}})
287		if err != nil {
288			return nil, err
289		}
290		return ResultIterator(results[0], params)
291
292	case *ConcatLogSelectorExpr:
293		cur := e
294		var queries []DownstreamQuery
295		for cur != nil {
296			qry := DownstreamQuery{
297				Expr:   cur.DownstreamLogSelectorExpr.LogSelectorExpr,
298				Params: params,
299			}
300			if shard := cur.DownstreamLogSelectorExpr.shard; shard != nil {
301				qry.Shards = Shards{*shard}
302			}
303			queries = append(queries, qry)
304			cur = cur.next
305		}
306
307		results, err := ev.Downstream(ctx, queries)
308		if err != nil {
309			return nil, err
310		}
311
312		xs := make([]iter.EntryIterator, 0, len(queries))
313		for i, res := range results {
314			iter, err := ResultIterator(res, params)
315			if err != nil {
316				level.Warn(util_log.Logger).Log(
317					"msg", "could not extract Iterator",
318					"err", err,
319					"expr", queries[i].Expr.String(),
320				)
321			}
322			xs = append(xs, iter)
323		}
324
325		return iter.NewHeapIterator(ctx, xs, params.Direction()), nil
326
327	default:
328		return nil, EvaluatorUnsupportedType(expr, ev)
329	}
330}
331
332// ConcatEvaluator joins multiple StepEvaluators.
333// Contract: They must be of identical start, end, and step values.
334func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) {
335	return newStepEvaluator(
336		func() (ok bool, ts int64, vec promql.Vector) {
337			var cur promql.Vector
338			for _, eval := range evaluators {
339				ok, ts, cur = eval.Next()
340				vec = append(vec, cur...)
341			}
342			return ok, ts, vec
343		},
344		func() (lastErr error) {
345			for _, eval := range evaluators {
346				if err := eval.Close(); err != nil {
347					lastErr = err
348				}
349			}
350			return lastErr
351		},
352		func() error {
353			var errs []error
354			for _, eval := range evaluators {
355				if err := eval.Error(); err != nil {
356					errs = append(errs, err)
357				}
358			}
359			switch len(errs) {
360			case 0:
361				return nil
362			case 1:
363				return errs[0]
364			default:
365				return util.MultiError(errs)
366			}
367		},
368	)
369}
370
371// ResultStepEvaluator coerces a downstream vector or matrix into a StepEvaluator
372func ResultStepEvaluator(res logqlmodel.Result, params Params) (StepEvaluator, error) {
373	var (
374		start = params.Start()
375		end   = params.End()
376		step  = params.Step()
377	)
378
379	switch data := res.Data.(type) {
380	case promql.Vector:
381		var exhausted bool
382		return newStepEvaluator(func() (bool, int64, promql.Vector) {
383			if !exhausted {
384				exhausted = true
385				return true, start.UnixNano() / int64(time.Millisecond), data
386			}
387			return false, 0, nil
388		}, nil, nil)
389	case promql.Matrix:
390		return NewMatrixStepper(start, end, step, data), nil
391	default:
392		return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
393	}
394}
395
396// ResultIterator coerces a downstream streams result into an iter.EntryIterator
397func ResultIterator(res logqlmodel.Result, params Params) (iter.EntryIterator, error) {
398	streams, ok := res.Data.(logqlmodel.Streams)
399	if !ok {
400		return nil, fmt.Errorf("unexpected type (%s) for ResultIterator; expected %s", res.Data.Type(), logqlmodel.ValueTypeStreams)
401	}
402	return iter.NewStreamsIterator(context.Background(), streams, params.Direction()), nil
403}
404