1package logql 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/cortexproject/cortex/pkg/querier/astmapper" 10 util_log "github.com/cortexproject/cortex/pkg/util/log" 11 "github.com/go-kit/kit/log/level" 12 "github.com/prometheus/prometheus/promql" 13 14 "github.com/grafana/loki/pkg/iter" 15 "github.com/grafana/loki/pkg/logqlmodel" 16 "github.com/grafana/loki/pkg/logqlmodel/stats" 17 "github.com/grafana/loki/pkg/util" 18) 19 20/* 21This includes a bunch of tooling for parallelization improvements based on backend shard factors. 22In schemas 10+ a shard factor (default 16) is introduced in the index store, 23calculated by hashing the label set of a log stream. This allows us to perform certain optimizations 24that fall under the umbrella of query remapping and querying shards individually. 25For instance, `{app="foo"} |= "bar"` can be executed on each shard independently, then reaggregated. 26There are also a class of optimizations that can be performed by altering a query into a functionally equivalent, 27but more parallelizable form. For instance, an average can be remapped into a sum/count, 28which can then take advantage of our sharded execution model. 29*/ 30 31// ShardedEngine is an Engine implementation that can split queries into more parallelizable forms via 32// querying the underlying backend shards individually and reaggregating them. 33type ShardedEngine struct { 34 timeout time.Duration 35 downstreamable Downstreamable 36 limits Limits 37 metrics *ShardingMetrics 38} 39 40// NewShardedEngine constructs a *ShardedEngine 41func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics, limits Limits) *ShardedEngine { 42 opts.applyDefault() 43 return &ShardedEngine{ 44 timeout: opts.Timeout, 45 downstreamable: downstreamable, 46 metrics: metrics, 47 limits: limits, 48 } 49} 50 51// Query constructs a Query 52func (ng *ShardedEngine) Query(p Params, mapped Expr) Query { 53 return &query{ 54 timeout: ng.timeout, 55 params: p, 56 evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer()), 57 parse: func(_ context.Context, _ string) (Expr, error) { 58 return mapped, nil 59 }, 60 limits: ng.limits, 61 } 62} 63 64// DownstreamSampleExpr is a SampleExpr which signals downstream computation 65type DownstreamSampleExpr struct { 66 shard *astmapper.ShardAnnotation 67 SampleExpr 68} 69 70func (d DownstreamSampleExpr) String() string { 71 return fmt.Sprintf("downstream<%s, shard=%s>", d.SampleExpr.String(), d.shard) 72} 73 74// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation 75type DownstreamLogSelectorExpr struct { 76 shard *astmapper.ShardAnnotation 77 LogSelectorExpr 78} 79 80func (d DownstreamLogSelectorExpr) String() string { 81 return fmt.Sprintf("downstream<%s, shard=%s>", d.LogSelectorExpr.String(), d.shard) 82} 83 84func (d DownstreamSampleExpr) Walk(f WalkFn) { f(d) } 85 86// ConcatSampleExpr is an expr for concatenating multiple SampleExpr 87// Contract: The embedded SampleExprs within a linked list of ConcatSampleExprs must be of the 88// same structure. This makes special implementations of SampleExpr.Associative() unnecessary. 89type ConcatSampleExpr struct { 90 DownstreamSampleExpr 91 next *ConcatSampleExpr 92} 93 94func (c ConcatSampleExpr) String() string { 95 if c.next == nil { 96 return c.DownstreamSampleExpr.String() 97 } 98 99 return fmt.Sprintf("%s ++ %s", c.DownstreamSampleExpr.String(), c.next.String()) 100} 101 102func (c ConcatSampleExpr) Walk(f WalkFn) { 103 f(c) 104 f(c.next) 105} 106 107// ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr 108type ConcatLogSelectorExpr struct { 109 DownstreamLogSelectorExpr 110 next *ConcatLogSelectorExpr 111} 112 113func (c ConcatLogSelectorExpr) String() string { 114 if c.next == nil { 115 return c.DownstreamLogSelectorExpr.String() 116 } 117 118 return fmt.Sprintf("%s ++ %s", c.DownstreamLogSelectorExpr.String(), c.next.String()) 119} 120 121type Shards []astmapper.ShardAnnotation 122 123func (xs Shards) Encode() (encoded []string) { 124 for _, shard := range xs { 125 encoded = append(encoded, shard.String()) 126 } 127 128 return encoded 129} 130 131// ParseShards parses a list of string encoded shards 132func ParseShards(strs []string) (Shards, error) { 133 if len(strs) == 0 { 134 return nil, nil 135 } 136 shards := make([]astmapper.ShardAnnotation, 0, len(strs)) 137 138 for _, str := range strs { 139 shard, err := astmapper.ParseShard(str) 140 if err != nil { 141 return nil, err 142 } 143 shards = append(shards, shard) 144 } 145 return shards, nil 146} 147 148type Downstreamable interface { 149 Downstreamer() Downstreamer 150} 151 152type DownstreamQuery struct { 153 Expr Expr 154 Params Params 155 Shards Shards 156} 157 158// Downstreamer is an interface for deferring responsibility for query execution. 159// It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs. 160type Downstreamer interface { 161 Downstream(context.Context, []DownstreamQuery) ([]logqlmodel.Result, error) 162} 163 164// DownstreamEvaluator is an evaluator which handles shard aware AST nodes 165type DownstreamEvaluator struct { 166 Downstreamer 167 defaultEvaluator Evaluator 168} 169 170// Downstream runs queries and collects stats from the embedded Downstreamer 171func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []DownstreamQuery) ([]logqlmodel.Result, error) { 172 results, err := ev.Downstreamer.Downstream(ctx, queries) 173 if err != nil { 174 return nil, err 175 } 176 177 for _, res := range results { 178 if err := stats.JoinResults(ctx, res.Statistics); err != nil { 179 level.Warn(util_log.Logger).Log("msg", "unable to merge downstream results", "err", err) 180 } 181 } 182 183 return results, nil 184} 185 186type errorQuerier struct{} 187 188func (errorQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) { 189 return nil, errors.New("Unimplemented") 190} 191 192func (errorQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) { 193 return nil, errors.New("Unimplemented") 194} 195 196func NewDownstreamEvaluator(downstreamer Downstreamer) *DownstreamEvaluator { 197 return &DownstreamEvaluator{ 198 Downstreamer: downstreamer, 199 defaultEvaluator: NewDefaultEvaluator(&errorQuerier{}, 0), 200 } 201} 202 203// Evaluator returns a StepEvaluator for a given SampleExpr 204func (ev *DownstreamEvaluator) StepEvaluator( 205 ctx context.Context, 206 nextEv SampleEvaluator, 207 expr SampleExpr, 208 params Params, 209) (StepEvaluator, error) { 210 switch e := expr.(type) { 211 212 case DownstreamSampleExpr: 213 // downstream to a querier 214 var shards []astmapper.ShardAnnotation 215 if e.shard != nil { 216 shards = append(shards, *e.shard) 217 } 218 results, err := ev.Downstream(ctx, []DownstreamQuery{{ 219 Expr: e.SampleExpr, 220 Params: params, 221 Shards: shards, 222 }}) 223 if err != nil { 224 return nil, err 225 } 226 return ResultStepEvaluator(results[0], params) 227 228 case *ConcatSampleExpr: 229 cur := e 230 var queries []DownstreamQuery 231 for cur != nil { 232 qry := DownstreamQuery{ 233 Expr: cur.DownstreamSampleExpr.SampleExpr, 234 Params: params, 235 } 236 if shard := cur.DownstreamSampleExpr.shard; shard != nil { 237 qry.Shards = Shards{*shard} 238 } 239 queries = append(queries, qry) 240 cur = cur.next 241 } 242 243 results, err := ev.Downstream(ctx, queries) 244 if err != nil { 245 return nil, err 246 } 247 248 xs := make([]StepEvaluator, 0, len(queries)) 249 for i, res := range results { 250 stepper, err := ResultStepEvaluator(res, params) 251 if err != nil { 252 level.Warn(util_log.Logger).Log( 253 "msg", "could not extract StepEvaluator", 254 "err", err, 255 "expr", queries[i].Expr.String(), 256 ) 257 return nil, err 258 } 259 xs = append(xs, stepper) 260 } 261 262 return ConcatEvaluator(xs) 263 264 default: 265 return ev.defaultEvaluator.StepEvaluator(ctx, nextEv, e, params) 266 } 267} 268 269// Iterator returns the iter.EntryIterator for a given LogSelectorExpr 270func (ev *DownstreamEvaluator) Iterator( 271 ctx context.Context, 272 expr LogSelectorExpr, 273 params Params, 274) (iter.EntryIterator, error) { 275 switch e := expr.(type) { 276 case DownstreamLogSelectorExpr: 277 // downstream to a querier 278 var shards Shards 279 if e.shard != nil { 280 shards = append(shards, *e.shard) 281 } 282 results, err := ev.Downstream(ctx, []DownstreamQuery{{ 283 Expr: e.LogSelectorExpr, 284 Params: params, 285 Shards: shards, 286 }}) 287 if err != nil { 288 return nil, err 289 } 290 return ResultIterator(results[0], params) 291 292 case *ConcatLogSelectorExpr: 293 cur := e 294 var queries []DownstreamQuery 295 for cur != nil { 296 qry := DownstreamQuery{ 297 Expr: cur.DownstreamLogSelectorExpr.LogSelectorExpr, 298 Params: params, 299 } 300 if shard := cur.DownstreamLogSelectorExpr.shard; shard != nil { 301 qry.Shards = Shards{*shard} 302 } 303 queries = append(queries, qry) 304 cur = cur.next 305 } 306 307 results, err := ev.Downstream(ctx, queries) 308 if err != nil { 309 return nil, err 310 } 311 312 xs := make([]iter.EntryIterator, 0, len(queries)) 313 for i, res := range results { 314 iter, err := ResultIterator(res, params) 315 if err != nil { 316 level.Warn(util_log.Logger).Log( 317 "msg", "could not extract Iterator", 318 "err", err, 319 "expr", queries[i].Expr.String(), 320 ) 321 } 322 xs = append(xs, iter) 323 } 324 325 return iter.NewHeapIterator(ctx, xs, params.Direction()), nil 326 327 default: 328 return nil, EvaluatorUnsupportedType(expr, ev) 329 } 330} 331 332// ConcatEvaluator joins multiple StepEvaluators. 333// Contract: They must be of identical start, end, and step values. 334func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { 335 return newStepEvaluator( 336 func() (ok bool, ts int64, vec promql.Vector) { 337 var cur promql.Vector 338 for _, eval := range evaluators { 339 ok, ts, cur = eval.Next() 340 vec = append(vec, cur...) 341 } 342 return ok, ts, vec 343 }, 344 func() (lastErr error) { 345 for _, eval := range evaluators { 346 if err := eval.Close(); err != nil { 347 lastErr = err 348 } 349 } 350 return lastErr 351 }, 352 func() error { 353 var errs []error 354 for _, eval := range evaluators { 355 if err := eval.Error(); err != nil { 356 errs = append(errs, err) 357 } 358 } 359 switch len(errs) { 360 case 0: 361 return nil 362 case 1: 363 return errs[0] 364 default: 365 return util.MultiError(errs) 366 } 367 }, 368 ) 369} 370 371// ResultStepEvaluator coerces a downstream vector or matrix into a StepEvaluator 372func ResultStepEvaluator(res logqlmodel.Result, params Params) (StepEvaluator, error) { 373 var ( 374 start = params.Start() 375 end = params.End() 376 step = params.Step() 377 ) 378 379 switch data := res.Data.(type) { 380 case promql.Vector: 381 var exhausted bool 382 return newStepEvaluator(func() (bool, int64, promql.Vector) { 383 if !exhausted { 384 exhausted = true 385 return true, start.UnixNano() / int64(time.Millisecond), data 386 } 387 return false, 0, nil 388 }, nil, nil) 389 case promql.Matrix: 390 return NewMatrixStepper(start, end, step, data), nil 391 default: 392 return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type()) 393 } 394} 395 396// ResultIterator coerces a downstream streams result into an iter.EntryIterator 397func ResultIterator(res logqlmodel.Result, params Params) (iter.EntryIterator, error) { 398 streams, ok := res.Data.(logqlmodel.Streams) 399 if !ok { 400 return nil, fmt.Errorf("unexpected type (%s) for ResultIterator; expected %s", res.Data.Type(), logqlmodel.ValueTypeStreams) 401 } 402 return iter.NewStreamsIterator(context.Background(), streams, params.Direction()), nil 403} 404