1package reduce
2
import (
	"context"
	"fmt"
	"strings"

	"github.com/go-graphite/carbonapi/expr/helper"
	"github.com/go-graphite/carbonapi/expr/interfaces"
	"github.com/go-graphite/carbonapi/expr/types"
	"github.com/go-graphite/carbonapi/pkg/parser"
)
13
// reduce is the implementation registered by New under the names
// "reduceSeries" and "reduce". It carries no state of its own beyond the
// embedded interfaces.FunctionBase.
// NOTE(review): the Evaluator used by Do is presumably supplied by
// FunctionBase — confirm against the interfaces package.
type reduce struct {
	interfaces.FunctionBase
}
17
18func GetOrder() interfaces.Order {
19	return interfaces.Any
20}
21
22func New(configFile string) []interfaces.FunctionMetadata {
23	res := make([]interfaces.FunctionMetadata, 0)
24	f := &reduce{}
25	functions := []string{"reduceSeries", "reduce"}
26	for _, n := range functions {
27		res = append(res, interfaces.FunctionMetadata{Name: n, F: f})
28	}
29	return res
30}
31
32func (f *reduce) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
33	const matchersStartIndex = 3
34
35	if len(e.Args()) < matchersStartIndex+1 {
36		return nil, parser.ErrMissingArgument
37	}
38
39	seriesList, err := helper.GetSeriesArg(e.Args()[0], from, until, values)
40	if err != nil {
41		return nil, err
42	}
43
44	reduceFunction, err := e.GetStringArg(1)
45	if err != nil {
46		return nil, err
47	}
48
49	reduceNode, err := e.GetIntArg(2)
50	if err != nil {
51		return nil, err
52	}
53
54	argsCount := len(e.Args())
55	matchersCount := argsCount - matchersStartIndex
56	reduceMatchers := make([]string, matchersCount)
57	for i := matchersStartIndex; i < argsCount; i++ {
58		reduceMatcher, err := e.GetStringArg(i)
59		if err != nil {
60			return nil, err
61		}
62
63		reduceMatchers[i-matchersStartIndex] = reduceMatcher
64	}
65
66	var results []*types.MetricData
67
68	reduceGroups := make(map[string]map[string]*types.MetricData)
69	reducedValues := values
70	var aliasNames []string
71
72	for _, series := range seriesList {
73		metric := helper.ExtractMetric(series.Name)
74		nodes := strings.Split(metric, ".")
75		reduceNodeKey := nodes[reduceNode]
76		nodes[reduceNode] = "reduce." + reduceFunction
77		aliasName := strings.Join(nodes, ".")
78		_, exist := reduceGroups[aliasName]
79		if !exist {
80			reduceGroups[aliasName] = make(map[string]*types.MetricData)
81			aliasNames = append(aliasNames, aliasName)
82		}
83
84		reduceGroups[aliasName][reduceNodeKey] = series
85		valueKey := parser.MetricRequest{series.Name, from, until}
86		reducedValues[valueKey] = append(reducedValues[valueKey], series)
87	}
88AliasLoop:
89	for _, aliasName := range aliasNames {
90
91		reducedNodes := make([]parser.Expr, len(reduceMatchers))
92		for i, reduceMatcher := range reduceMatchers {
93			matched, ok := reduceGroups[aliasName][reduceMatcher]
94			if !ok {
95				continue AliasLoop
96			}
97			reducedNodes[i] = parser.NewTargetExpr(matched.Name)
98		}
99
100		result, err := f.Evaluator.Eval(ctx, parser.NewExprTyped("alias", []parser.Expr{
101			parser.NewExprTyped(reduceFunction, reducedNodes),
102			parser.NewValueExpr(aliasName),
103		}), from, until, reducedValues)
104
105		if err != nil {
106			return nil, err
107		}
108
109		results = append(results, result...)
110	}
111
112	return results, nil
113}
114
115// Description is auto-generated description, based on output of https://github.com/graphite-project/graphite-web
116func (f *reduce) Description() map[string]types.FunctionDescription {
117	return map[string]types.FunctionDescription{
118		"reduceSeries": {
119			Description: "Short form: ``reduce()``\n\nTakes a list of seriesLists and reduces it to a list of series by means of the reduceFunction.\n\nReduction is performed by matching the reduceNode in each series against the list of\nreduceMatchers. Then each series is passed to the reduceFunction as arguments in the order\ngiven by reduceMatchers. The reduceFunction should yield a single series.\n\nThe resulting list of series are aliased so that they can easily be nested in other functions.\n\n**Example**: Map/Reduce asPercent(bytes_used,total_bytes) for each server\n\nAssume that metrics in the form below exist:\n\n.. code-block:: none\n\n     servers.server1.disk.bytes_used\n     servers.server1.disk.total_bytes\n     servers.server2.disk.bytes_used\n     servers.server2.disk.total_bytes\n     servers.server3.disk.bytes_used\n     servers.server3.disk.total_bytes\n     ...\n     servers.serverN.disk.bytes_used\n     servers.serverN.disk.total_bytes\n\nTo get the percentage of disk used for each server:\n\n.. code-block:: none\n\n    reduceSeries(mapSeries(servers.*.disk.*,1),\"asPercent\",3,\"bytes_used\",\"total_bytes\") =>\n\n      alias(asPercent(servers.server1.disk.bytes_used,servers.server1.disk.total_bytes),\"servers.server1.disk.reduce.asPercent\"),\n      alias(asPercent(servers.server2.disk.bytes_used,servers.server2.disk.total_bytes),\"servers.server2.disk.reduce.asPercent\"),\n      alias(asPercent(servers.server3.disk.bytes_used,servers.server3.disk.total_bytes),\"servers.server3.disk.reduce.asPercent\"),\n      ...\n      alias(asPercent(servers.serverN.disk.bytes_used,servers.serverN.disk.total_bytes),\"servers.serverN.disk.reduce.asPercent\")\n\nIn other words, we will get back the following metrics::\n\n    servers.server1.disk.reduce.asPercent\n    servers.server2.disk.reduce.asPercent\n    servers.server3.disk.reduce.asPercent\n    ...\n    servers.serverN.disk.reduce.asPercent\n\n.. seealso:: :py:func:`mapSeries`",
120			Function:    "reduceSeries(seriesLists, reduceFunction, reduceNode, *reduceMatchers)",
121			Group:       "Combine",
122			Module:      "graphite.render.functions",
123			Name:        "reduceSeries",
124			Params: []types.FunctionParam{
125				{
126					Name:     "seriesLists",
127					Required: true,
128					Type:     types.SeriesLists,
129				},
130				{
131					Name:     "reduceFunction",
132					Required: true,
133					Type:     types.String,
134				},
135				{
136					Name:     "reduceNode",
137					Required: true,
138					Type:     types.Node,
139				},
140				{
141					Multiple: true,
142					Name:     "reduceMatchers",
143					Required: true,
144					Type:     types.String,
145				},
146			},
147		},
148		"reduce": {
149			Description: "Short form: ``reduce()``\n\nTakes a list of seriesLists and reduces it to a list of series by means of the reduceFunction.\n\nReduction is performed by matching the reduceNode in each series against the list of\nreduceMatchers. Then each series is passed to the reduceFunction as arguments in the order\ngiven by reduceMatchers. The reduceFunction should yield a single series.\n\nThe resulting list of series are aliased so that they can easily be nested in other functions.\n\n**Example**: Map/Reduce asPercent(bytes_used,total_bytes) for each server\n\nAssume that metrics in the form below exist:\n\n.. code-block:: none\n\n     servers.server1.disk.bytes_used\n     servers.server1.disk.total_bytes\n     servers.server2.disk.bytes_used\n     servers.server2.disk.total_bytes\n     servers.server3.disk.bytes_used\n     servers.server3.disk.total_bytes\n     ...\n     servers.serverN.disk.bytes_used\n     servers.serverN.disk.total_bytes\n\nTo get the percentage of disk used for each server:\n\n.. code-block:: none\n\n    reduceSeries(mapSeries(servers.*.disk.*,1),\"asPercent\",3,\"bytes_used\",\"total_bytes\") =>\n\n      alias(asPercent(servers.server1.disk.bytes_used,servers.server1.disk.total_bytes),\"servers.server1.disk.reduce.asPercent\"),\n      alias(asPercent(servers.server2.disk.bytes_used,servers.server2.disk.total_bytes),\"servers.server2.disk.reduce.asPercent\"),\n      alias(asPercent(servers.server3.disk.bytes_used,servers.server3.disk.total_bytes),\"servers.server3.disk.reduce.asPercent\"),\n      ...\n      alias(asPercent(servers.serverN.disk.bytes_used,servers.serverN.disk.total_bytes),\"servers.serverN.disk.reduce.asPercent\")\n\nIn other words, we will get back the following metrics::\n\n    servers.server1.disk.reduce.asPercent\n    servers.server2.disk.reduce.asPercent\n    servers.server3.disk.reduce.asPercent\n    ...\n    servers.serverN.disk.reduce.asPercent\n\n.. seealso:: :py:func:`mapSeries`",
150			Function:    "reduce(seriesLists, reduceFunction, reduceNode, *reduceMatchers)",
151			Group:       "Combine",
152			Module:      "graphite.render.functions",
153			Name:        "reduce",
154			Params: []types.FunctionParam{
155				{
156					Name:     "seriesLists",
157					Required: true,
158					Type:     types.SeriesLists,
159				},
160				{
161					Name:     "reduceFunction",
162					Required: true,
163					Type:     types.String,
164				},
165				{
166					Name:     "reduceNode",
167					Required: true,
168					Type:     types.Node,
169				},
170				{
171					Multiple: true,
172					Name:     "reduceMatchers",
173					Required: true,
174					Type:     types.String,
175				},
176			},
177		},
178	}
179}
180