1package expr 2 3import ( 4 "context" 5 6 utilctx "github.com/go-graphite/carbonapi/util/ctx" 7 8 "github.com/ansel1/merry" 9 "github.com/go-graphite/carbonapi/cmd/carbonapi/config" 10 _ "github.com/go-graphite/carbonapi/expr/functions" 11 "github.com/go-graphite/carbonapi/expr/helper" 12 "github.com/go-graphite/carbonapi/expr/metadata" 13 "github.com/go-graphite/carbonapi/expr/types" 14 "github.com/go-graphite/carbonapi/pkg/parser" 15 pb "github.com/go-graphite/protocol/carbonapi_v3_pb" 16) 17 18type evaluator struct{} 19 20// FetchAndEvalExp fetch data and evalualtes expressions 21func (eval evaluator) FetchAndEvalExp(ctx context.Context, exp parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) { 22 config.Config.Limiter.Enter() 23 defer config.Config.Limiter.Leave() 24 25 multiFetchRequest := pb.MultiFetchRequest{} 26 metricRequestCache := make(map[string]parser.MetricRequest) 27 maxDataPoints := utilctx.GetMaxDatapoints(ctx) 28 // values related to this particular `target=` 29 targetValues := make(map[parser.MetricRequest][]*types.MetricData) 30 31 for _, m := range exp.Metrics() { 32 fetchRequest := pb.FetchRequest{ 33 Name: m.Metric, 34 PathExpression: m.Metric, 35 StartTime: m.From + from, 36 StopTime: m.Until + until, 37 MaxDataPoints: maxDataPoints, 38 } 39 metricRequest := parser.MetricRequest{ 40 Metric: fetchRequest.PathExpression, 41 From: fetchRequest.StartTime, 42 Until: fetchRequest.StopTime, 43 } 44 45 // avoid multiple requests in a function, E.g divideSeries(a.b, a.b) 46 if cachedMetricRequest, ok := metricRequestCache[m.Metric]; ok && 47 cachedMetricRequest.From == metricRequest.From && 48 cachedMetricRequest.Until == metricRequest.Until { 49 continue 50 } 51 52 // avoid multiple requests in a http request, E.g render?target=a.b&target=a.b 53 if _, ok := values[metricRequest]; ok { 54 targetValues[metricRequest] = nil 55 continue 56 } 57 58 metricRequestCache[m.Metric] = metricRequest 59 targetValues[metricRequest] = nil 60 multiFetchRequest.Metrics = append(multiFetchRequest.Metrics, fetchRequest) 61 } 62 63 if len(multiFetchRequest.Metrics) > 0 { 64 metrics, _, err := config.Config.ZipperInstance.Render(ctx, multiFetchRequest) 65 // If we had only partial result, we want to do our best to actually do our job 66 if err != nil && merry.HTTPCode(err) >= 400 && exp.Target() != "fallbackSeries" { 67 return nil, err 68 } 69 for _, metric := range metrics { 70 metricRequest := metricRequestCache[metric.PathExpression] 71 if metric.RequestStartTime != 0 && metric.RequestStopTime != 0 { 72 metricRequest.From = metric.RequestStartTime 73 metricRequest.Until = metric.RequestStopTime 74 } 75 data, ok := values[metricRequest] 76 if !ok { 77 data = make([]*types.MetricData, 0, 1) 78 } 79 values[metricRequest] = append(data, metric) 80 } 81 } 82 83 for m := range targetValues { 84 targetValues[m] = values[m] 85 } 86 87 if config.Config.ZipperInstance.ScaleToCommonStep() { 88 targetValues = helper.ScaleValuesToCommonStep(targetValues) 89 } 90 91 return eval.Eval(ctx, exp, from, until, targetValues) 92} 93 94// Eval evalualtes expressions 95func (eval evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (results []*types.MetricData, err error) { 96 rewritten, targets, err := RewriteExpr(ctx, exp, from, until, values) 97 if err != nil { 98 return nil, err 99 } 100 if rewritten { 101 for _, target := range targets { 102 exp, _, err = parser.ParseExpr(target) 103 if err != nil { 104 return nil, err 105 } 106 result, err := eval.FetchAndEvalExp(ctx, exp, from, until, values) 107 if err != nil { 108 return nil, err 109 } 110 results = append(results, result...) 111 } 112 return results, nil 113 } 114 return EvalExpr(ctx, exp, from, until, values) 115} 116 117var _evaluator = evaluator{} 118 119func init() { 120 helper.SetEvaluator(_evaluator) 121 metadata.SetEvaluator(_evaluator) 122} 123 124// FetchAndEvalExp fetch data and evalualtes expressions 125func FetchAndEvalExp(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) { 126 return _evaluator.FetchAndEvalExp(ctx, e, from, until, values) 127} 128 129// Eval is the main expression evaluator 130func EvalExpr(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) { 131 if e.IsName() { 132 return values[parser.MetricRequest{Metric: e.Target(), From: from, Until: until}], nil 133 } else if e.IsConst() { 134 p := types.MetricData{FetchResponse: pb.FetchResponse{Name: e.Target(), Values: []float64{e.FloatValue()}}} 135 return []*types.MetricData{&p}, nil 136 } 137 // evaluate the function 138 139 // all functions have arguments -- check we do too 140 if len(e.Args()) == 0 { 141 return nil, parser.ErrMissingArgument 142 } 143 144 metadata.FunctionMD.RLock() 145 f, ok := metadata.FunctionMD.Functions[e.Target()] 146 metadata.FunctionMD.RUnlock() 147 if ok { 148 v, err := f.Do(ctx, e, from, until, values) 149 if err != nil { 150 err = merry.WithMessagef(err, "function=%s", e.Target()) 151 } 152 return v, err 153 } 154 155 return nil, helper.ErrUnknownFunction(e.Target()) 156} 157 158// RewriteExpr expands targets that use applyByNode into a new list of targets. 159// eg: 160// applyByNode(foo*, 1, "%") -> (true, ["foo1", "foo2"], nil) 161// sumSeries(foo) -> (false, nil, nil) 162// Assumes that applyByNode only appears as the outermost function. 163func RewriteExpr(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (bool, []string, error) { 164 if e.IsFunc() { 165 metadata.FunctionMD.RLock() 166 f, ok := metadata.FunctionMD.RewriteFunctions[e.Target()] 167 metadata.FunctionMD.RUnlock() 168 if ok { 169 return f.Do(ctx, e, from, until, values) 170 } 171 } 172 return false, nil, nil 173} 174