1package expr
2
3import (
4	"context"
5
6	utilctx "github.com/go-graphite/carbonapi/util/ctx"
7
8	"github.com/ansel1/merry"
9	"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
10	_ "github.com/go-graphite/carbonapi/expr/functions"
11	"github.com/go-graphite/carbonapi/expr/helper"
12	"github.com/go-graphite/carbonapi/expr/metadata"
13	"github.com/go-graphite/carbonapi/expr/types"
14	"github.com/go-graphite/carbonapi/pkg/parser"
15	pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
16)
17
// evaluator is a stateless type that carries the FetchAndEvalExp and Eval
// methods; a single instance of it is handed to the helper and metadata
// packages in init.
type evaluator struct{}
19
20// FetchAndEvalExp fetch data and evalualtes expressions
21func (eval evaluator) FetchAndEvalExp(ctx context.Context, exp parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
22	config.Config.Limiter.Enter()
23	defer config.Config.Limiter.Leave()
24
25	multiFetchRequest := pb.MultiFetchRequest{}
26	metricRequestCache := make(map[string]parser.MetricRequest)
27	maxDataPoints := utilctx.GetMaxDatapoints(ctx)
28	// values related to this particular `target=`
29	targetValues := make(map[parser.MetricRequest][]*types.MetricData)
30
31	for _, m := range exp.Metrics() {
32		fetchRequest := pb.FetchRequest{
33			Name:           m.Metric,
34			PathExpression: m.Metric,
35			StartTime:      m.From + from,
36			StopTime:       m.Until + until,
37			MaxDataPoints:  maxDataPoints,
38		}
39		metricRequest := parser.MetricRequest{
40			Metric: fetchRequest.PathExpression,
41			From:   fetchRequest.StartTime,
42			Until:  fetchRequest.StopTime,
43		}
44
45		// avoid multiple requests in a function, E.g divideSeries(a.b, a.b)
46		if cachedMetricRequest, ok := metricRequestCache[m.Metric]; ok &&
47			cachedMetricRequest.From == metricRequest.From &&
48			cachedMetricRequest.Until == metricRequest.Until {
49			continue
50		}
51
52		// avoid multiple requests in a http request, E.g render?target=a.b&target=a.b
53		if _, ok := values[metricRequest]; ok {
54			targetValues[metricRequest] = nil
55			continue
56		}
57
58		metricRequestCache[m.Metric] = metricRequest
59		targetValues[metricRequest] = nil
60		multiFetchRequest.Metrics = append(multiFetchRequest.Metrics, fetchRequest)
61	}
62
63	if len(multiFetchRequest.Metrics) > 0 {
64		metrics, _, err := config.Config.ZipperInstance.Render(ctx, multiFetchRequest)
65		// If we had only partial result, we want to do our best to actually do our job
66		if err != nil && merry.HTTPCode(err) >= 400 && exp.Target() != "fallbackSeries" {
67			return nil, err
68		}
69		for _, metric := range metrics {
70			metricRequest := metricRequestCache[metric.PathExpression]
71			if metric.RequestStartTime != 0 && metric.RequestStopTime != 0 {
72				metricRequest.From = metric.RequestStartTime
73				metricRequest.Until = metric.RequestStopTime
74			}
75			data, ok := values[metricRequest]
76			if !ok {
77				data = make([]*types.MetricData, 0, 1)
78			}
79			values[metricRequest] = append(data, metric)
80		}
81	}
82
83	for m := range targetValues {
84		targetValues[m] = values[m]
85	}
86
87	if config.Config.ZipperInstance.ScaleToCommonStep() {
88		targetValues = helper.ScaleValuesToCommonStep(targetValues)
89	}
90
91	return eval.Eval(ctx, exp, from, until, targetValues)
92}
93
94// Eval evalualtes expressions
95func (eval evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (results []*types.MetricData, err error) {
96	rewritten, targets, err := RewriteExpr(ctx, exp, from, until, values)
97	if err != nil {
98		return nil, err
99	}
100	if rewritten {
101		for _, target := range targets {
102			exp, _, err = parser.ParseExpr(target)
103			if err != nil {
104				return nil, err
105			}
106			result, err := eval.FetchAndEvalExp(ctx, exp, from, until, values)
107			if err != nil {
108				return nil, err
109			}
110			results = append(results, result...)
111		}
112		return results, nil
113	}
114	return EvalExpr(ctx, exp, from, until, values)
115}
116
117var _evaluator = evaluator{}
118
// init hands the package-wide evaluator to the helper and metadata packages
// through their SetEvaluator functions.
func init() {
	helper.SetEvaluator(_evaluator)
	metadata.SetEvaluator(_evaluator)
}
123
// FetchAndEvalExp fetches data and evaluates the expression e. It is a
// package-level convenience wrapper that delegates to the FetchAndEvalExp
// method of the package-wide evaluator.
func FetchAndEvalExp(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
	return _evaluator.FetchAndEvalExp(ctx, e, from, until, values)
}
128
129// Eval is the main expression evaluator
130func EvalExpr(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
131	if e.IsName() {
132		return values[parser.MetricRequest{Metric: e.Target(), From: from, Until: until}], nil
133	} else if e.IsConst() {
134		p := types.MetricData{FetchResponse: pb.FetchResponse{Name: e.Target(), Values: []float64{e.FloatValue()}}}
135		return []*types.MetricData{&p}, nil
136	}
137	// evaluate the function
138
139	// all functions have arguments -- check we do too
140	if len(e.Args()) == 0 {
141		return nil, parser.ErrMissingArgument
142	}
143
144	metadata.FunctionMD.RLock()
145	f, ok := metadata.FunctionMD.Functions[e.Target()]
146	metadata.FunctionMD.RUnlock()
147	if ok {
148		v, err := f.Do(ctx, e, from, until, values)
149		if err != nil {
150			err = merry.WithMessagef(err, "function=%s", e.Target())
151		}
152		return v, err
153	}
154
155	return nil, helper.ErrUnknownFunction(e.Target())
156}
157
158// RewriteExpr expands targets that use applyByNode into a new list of targets.
159// eg:
160// applyByNode(foo*, 1, "%") -> (true, ["foo1", "foo2"], nil)
161// sumSeries(foo) -> (false, nil, nil)
162// Assumes that applyByNode only appears as the outermost function.
163func RewriteExpr(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (bool, []string, error) {
164	if e.IsFunc() {
165		metadata.FunctionMD.RLock()
166		f, ok := metadata.FunctionMD.RewriteFunctions[e.Target()]
167		metadata.FunctionMD.RUnlock()
168		if ok {
169			return f.Do(ctx, e, from, until, values)
170		}
171	}
172	return false, nil, nil
173}
174