package prometheus

import (
	"errors"
	"fmt"
	"math"
	"time"

	"github.com/gogo/protobuf/types"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/services/storage"
	"github.com/influxdata/influxdb/storage/reads/datatypes"
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/prometheus/prompb"
)

const (
	// measurementName is the default name used if no Prometheus name can be found on write
	measurementName = "prom_metric_not_specified"

	// fieldName is the field all Prometheus values get written to
	fieldName = "value"

	// fieldTagKey is the tag key that all field names use in the new storage processor
	fieldTagKey = "_field"

	// prometheusNameTag is the tag key that Prometheus uses for metric names
	prometheusNameTag = "__name__"

	// measurementTagKey is the tag key that all measurement names use in the new storage processor
	measurementTagKey = "_measurement"
)

// A DroppedValuesError is returned when the Prometheus write request contains
// unsupported float64 values.
type DroppedValuesError struct {
	nan  uint64
	ninf uint64
	inf  uint64
}

// Error returns a descriptive error of the values dropped.
func (e DroppedValuesError) Error() string {
	return fmt.Sprintf("dropped unsupported Prometheus values: [NaN = %d, +Inf = %d, -Inf = %d]", e.nan, e.inf, e.ninf)
}

// WriteRequestToPoints converts a Prometheus remote write request of time series and their
// samples into points that can be written into InfluxDB.
func WriteRequestToPoints(req *prompb.WriteRequest) ([]models.Point, error) {
	var maxPoints int
	for _, ts := range req.Timeseries {
		maxPoints += len(ts.Samples)
	}
	points := make([]models.Point, 0, maxPoints)

	// Track any dropped values.
	var nan, inf, ninf uint64

	for _, ts := range req.Timeseries {
		measurement := measurementName

		tags := make(map[string]string, len(ts.Labels))
		for _, l := range ts.Labels {
			tags[l.Name] = l.Value
			if l.Name == prometheusNameTag {
				measurement = l.Value
			}
		}

		for _, s := range ts.Samples {
			// Skip values InfluxDB cannot store; they are reported via DroppedValuesError.
			if v := s.Value; math.IsNaN(v) {
				nan++
				continue
			} else if math.IsInf(v, -1) {
				ninf++
				continue
			} else if math.IsInf(v, 1) {
				inf++
				continue
			}

			// convert and append
			t := time.Unix(0, s.Timestamp*int64(time.Millisecond))
			fields := map[string]interface{}{fieldName: s.Value}
			p, err := models.NewPoint(measurement, models.NewTags(tags), fields, t)
			if err != nil {
				return nil, err
			}
			points = append(points, p)
		}
	}

	if nan+inf+ninf > 0 {
		return points, DroppedValuesError{nan: nan, inf: inf, ninf: ninf}
	}
	return points, nil
}
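
// handleRemoteWrite is a hypothetical sketch, not part of the original API, showing one
// way a caller might use WriteRequestToPoints: a DroppedValuesError still returns the
// successfully converted points, so it can be treated as a partial success rather than a
// hard failure. The function name and behavior are assumptions for illustration only.
func handleRemoteWrite(req *prompb.WriteRequest) ([]models.Point, error) {
	points, err := WriteRequestToPoints(req)
	if err != nil {
		if _, ok := err.(DroppedValuesError); ok {
			// NaN and +/-Inf samples were skipped; the remaining points are still usable.
			return points, nil
		}
		return nil, err
	}
	return points, nil
}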

// ReadRequestToInfluxStorageRequest converts a Prometheus remote read request into one using the
// new storage API that IFQL uses.
func ReadRequestToInfluxStorageRequest(req *prompb.ReadRequest, db, rp string) (*datatypes.ReadFilterRequest, error) {
	if len(req.Queries) != 1 {
		return nil, errors.New("Prometheus read endpoint currently only supports one query at a time")
	}
	q := req.Queries[0]

	src, err := types.MarshalAny(&storage.ReadSource{Database: db, RetentionPolicy: rp})
	if err != nil {
		return nil, err
	}

	sreq := &datatypes.ReadFilterRequest{
		ReadSource: src,
		Range: datatypes.TimestampRange{
			Start: time.Unix(0, q.StartTimestampMs*int64(time.Millisecond)).UnixNano(),
			End:   time.Unix(0, q.EndTimestampMs*int64(time.Millisecond)).UnixNano(),
		},
	}

	pred, err := predicateFromMatchers(q.Matchers)
	if err != nil {
		return nil, err
	}

	sreq.Predicate = pred
	return sreq, nil
}
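
// exampleReadRequest is a hypothetical sketch, not part of the original file, showing the
// shape of request ReadRequestToInfluxStorageRequest expects: exactly one query, with
// millisecond timestamps that are converted to nanosecond Unix times, and matchers that
// become the storage predicate. The literal values are made up for illustration.
func exampleReadRequest(db, rp string) (*datatypes.ReadFilterRequest, error) {
	req := &prompb.ReadRequest{
		Queries: []*prompb.Query{{
			StartTimestampMs: 1598486400000, // 2020-08-27T00:00:00Z
			EndTimestampMs:   1598490000000, // 2020-08-27T01:00:00Z
			Matchers: []*prompb.LabelMatcher{
				{Type: prompb.LabelMatcher_EQ, Name: prometheusNameTag, Value: "http_requests_total"},
			},
		}},
	}
	return ReadRequestToInfluxStorageRequest(req, db, rp)
}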

// RemoveInfluxSystemTags removes the Influx-internal tags (_measurement and _field).
func RemoveInfluxSystemTags(tags models.Tags) models.Tags {
	var t models.Tags
	for _, tt := range tags {
		if string(tt.Key) == measurementTagKey || string(tt.Key) == fieldTagKey {
			continue
		}
		t = append(t, tt)
	}

	return t
}

// predicateFromMatchers takes Prometheus label matchers and converts them to a storage
// predicate that matches the schema the data is written in, which assumes a single field
// named value.
func predicateFromMatchers(matchers []*prompb.LabelMatcher) (*datatypes.Predicate, error) {
	left, err := nodeFromMatchers(matchers)
	if err != nil {
		return nil, err
	}
	right := fieldNode()

	return &datatypes.Predicate{
		Root: &datatypes.Node{
			NodeType: datatypes.NodeTypeLogicalExpression,
			Value:    &datatypes.Node_Logical_{Logical: datatypes.LogicalAnd},
			Children: []*datatypes.Node{left, right},
		},
	}, nil
}
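
// examplePredicate is a hypothetical sketch, not part of the original file, showing the
// predicate built for a selector like http_requests_total{job="api"}: the label matchers
// are ANDed together by nodeFromMatchers, the __name__ matcher is rewritten to the
// _measurement tag, and fieldNode ANDs in the implicit _field == "value" comparison.
func examplePredicate() (*datatypes.Predicate, error) {
	matchers := []*prompb.LabelMatcher{
		{Type: prompb.LabelMatcher_EQ, Name: prometheusNameTag, Value: "http_requests_total"},
		{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "api"},
	}
	// Roughly: (_measurement == "http_requests_total" AND job == "api") AND _field == "value"
	return predicateFromMatchers(matchers)
}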

// fieldNode returns a datatypes.Node that matches fieldTagKey == fieldName, which is how
// Prometheus data is fed into the system.
func fieldNode() *datatypes.Node {
	children := []*datatypes.Node{
		&datatypes.Node{
			NodeType: datatypes.NodeTypeTagRef,
			Value: &datatypes.Node_TagRefValue{
				TagRefValue: fieldTagKey,
			},
		},
		&datatypes.Node{
			NodeType: datatypes.NodeTypeLiteral,
			Value: &datatypes.Node_StringValue{
				StringValue: fieldName,
			},
		},
	}

	return &datatypes.Node{
		NodeType: datatypes.NodeTypeComparisonExpression,
		Value:    &datatypes.Node_Comparison_{Comparison: datatypes.ComparisonEqual},
		Children: children,
	}
}

// nodeFromMatchers recursively ANDs the given matchers together into a tree of
// comparison nodes.
func nodeFromMatchers(matchers []*prompb.LabelMatcher) (*datatypes.Node, error) {
	if len(matchers) == 0 {
		return nil, errors.New("expected matcher")
	} else if len(matchers) == 1 {
		return nodeFromMatcher(matchers[0])
	}

	left, err := nodeFromMatcher(matchers[0])
	if err != nil {
		return nil, err
	}

	right, err := nodeFromMatchers(matchers[1:])
	if err != nil {
		return nil, err
	}

	children := []*datatypes.Node{left, right}
	return &datatypes.Node{
		NodeType: datatypes.NodeTypeLogicalExpression,
		Value:    &datatypes.Node_Logical_{Logical: datatypes.LogicalAnd},
		Children: children,
	}, nil
}

// nodeFromMatcher converts a single Prometheus label matcher into a storage comparison
// node, mapping the __name__ label to the _measurement tag and anchoring regex values
// the same way PromQL does.
func nodeFromMatcher(m *prompb.LabelMatcher) (*datatypes.Node, error) {
	var op datatypes.Node_Comparison
	switch m.Type {
	case prompb.LabelMatcher_EQ:
		op = datatypes.ComparisonEqual
	case prompb.LabelMatcher_NEQ:
		op = datatypes.ComparisonNotEqual
	case prompb.LabelMatcher_RE:
		op = datatypes.ComparisonRegex
	case prompb.LabelMatcher_NRE:
		op = datatypes.ComparisonNotRegex
	default:
		return nil, fmt.Errorf("unknown match type %v", m.Type)
	}

	name := m.Name
	if m.Name == prometheusNameTag {
		name = measurementTagKey
	}

	left := &datatypes.Node{
		NodeType: datatypes.NodeTypeTagRef,
		Value: &datatypes.Node_TagRefValue{
			TagRefValue: name,
		},
	}

	var right *datatypes.Node

	if op == datatypes.ComparisonRegex || op == datatypes.ComparisonNotRegex {
		right = &datatypes.Node{
			NodeType: datatypes.NodeTypeLiteral,
			Value: &datatypes.Node_RegexValue{
				// To comply with PromQL, see
				// https://github.com/prometheus/prometheus/blob/daf382e4a9f5ca380b2b662c8e60755a56675f14/pkg/labels/regexp.go#L30
				RegexValue: "^(?:" + m.Value + ")$",
			},
		}
	} else {
		right = &datatypes.Node{
			NodeType: datatypes.NodeTypeLiteral,
			Value: &datatypes.Node_StringValue{
				StringValue: m.Value,
			},
		}
	}

	children := []*datatypes.Node{left, right}
	return &datatypes.Node{
		NodeType: datatypes.NodeTypeComparisonExpression,
		Value:    &datatypes.Node_Comparison_{Comparison: op},
		Children: children,
	}, nil
}

// ModelTagsToLabelPairs converts models.Tags to a slice of Prometheus label pairs.
func ModelTagsToLabelPairs(tags models.Tags) []prompb.Label {
	pairs := make([]prompb.Label, 0, len(tags))
	for _, t := range tags {
		if string(t.Value) == "" {
			// In Prometheus an empty label value is equivalent to an absent label,
			// so empty tag values are skipped.
			continue
		}
		pairs = append(pairs, prompb.Label{
			Name:  string(t.Key),
			Value: string(t.Value),
		})
	}
	return pairs
}
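
// exampleSeriesLabels is a hypothetical sketch, not part of the original file, showing
// how RemoveInfluxSystemTags and ModelTagsToLabelPairs might be combined when turning a
// stored series back into Prometheus labels: the internal _measurement and _field tags
// are dropped, and the remaining tags (including __name__, which was written as a tag)
// become label pairs.
func exampleSeriesLabels(tags models.Tags) []prompb.Label {
	return ModelTagsToLabelPairs(RemoveInfluxSystemTags(tags))
}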

// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs.
func TagsToLabelPairs(tags map[string]string) []*prompb.Label {
	pairs := make([]*prompb.Label, 0, len(tags))
	for k, v := range tags {
		if v == "" {
			// If we select metrics with different sets of label names,
			// InfluxDB returns *all* possible tag names on all returned
			// series, with empty tag values on series where they don't
			// apply. In Prometheus, an empty label value is equivalent
			// to a non-existent label, so we just skip empty ones here
			// to make the result correct.
			continue
		}
		pairs = append(pairs, &prompb.Label{
			Name:  k,
			Value: v,
		})
	}
	return pairs
}

// PrometheusToStatistics converts a Prometheus metric family (from Registry.Gather)
// to a []models.Statistic for /debug/vars.
// This code is strongly inspired by the telegraf prometheus plugin.
func PrometheusToStatistics(family []*dto.MetricFamily, tags map[string]string) []models.Statistic {
	statistics := []models.Statistic{}
	for _, mf := range family {
		for _, m := range mf.Metric {
			// Merge the base tags with the metric's own labels.
			newTags := make(map[string]string, len(tags)+len(m.Label))
			for key, value := range tags {
				newTags[key] = value
			}
			for _, lp := range m.Label {
				newTags[lp.GetName()] = lp.GetValue()
			}

			// reading fields
			var fields map[string]interface{}
			if mf.GetType() == dto.MetricType_SUMMARY {
				// summary metric
				fields = makeQuantiles(m)
				fields["count"] = float64(m.GetSummary().GetSampleCount())
				fields["sum"] = float64(m.GetSummary().GetSampleSum())
			} else if mf.GetType() == dto.MetricType_HISTOGRAM {
				// histogram metric
				fields = makeBuckets(m)
				fields["count"] = float64(m.GetHistogram().GetSampleCount())
				fields["sum"] = float64(m.GetHistogram().GetSampleSum())
			} else {
				// standard metric
				fields = getNameAndValue(m)
			}

			statistics = append(statistics, models.Statistic{
				Name:   *mf.Name,
				Tags:   newTags,
				Values: fields,
			})
		}
	}
	return statistics
}
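
// examplePrometheusToStatistics is a hypothetical sketch, not part of the original file,
// showing the shape of input PrometheusToStatistics consumes: a gauge family as produced
// by a Prometheus registry's Gather call. The metric name and value are made up.
func examplePrometheusToStatistics() []models.Statistic {
	name := "go_goroutines"
	typ := dto.MetricType_GAUGE
	value := 42.0
	family := []*dto.MetricFamily{{
		Name:   &name,
		Type:   &typ,
		Metric: []*dto.Metric{{Gauge: &dto.Gauge{Value: &value}}},
	}}
	// Produces one statistic named "go_goroutines" with a "gauge" field of 42.
	return PrometheusToStatistics(family, map[string]string{"hostname": "localhost"})
}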

// makeQuantiles extracts the quantile values from a summary metric, skipping NaN values.
func makeQuantiles(m *dto.Metric) map[string]interface{} {
	fields := make(map[string]interface{})
	for _, q := range m.GetSummary().Quantile {
		if !math.IsNaN(q.GetValue()) {
			fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
		}
	}
	return fields
}

// makeBuckets extracts the cumulative bucket counts from a histogram metric, keyed by upper bound.
func makeBuckets(m *dto.Metric) map[string]interface{} {
	fields := make(map[string]interface{})
	for _, b := range m.GetHistogram().Bucket {
		fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
	}
	return fields
}

// getNameAndValue extracts the value from a gauge, counter, or untyped metric, skipping NaN values.
func getNameAndValue(m *dto.Metric) map[string]interface{} {
	fields := make(map[string]interface{})
	if m.Gauge != nil {
		if !math.IsNaN(m.GetGauge().GetValue()) {
			fields["gauge"] = float64(m.GetGauge().GetValue())
		}
	} else if m.Counter != nil {
		if !math.IsNaN(m.GetCounter().GetValue()) {
			fields["counter"] = float64(m.GetCounter().GetValue())
		}
	} else if m.Untyped != nil {
		if !math.IsNaN(m.GetUntyped().GetValue()) {
			fields["value"] = float64(m.GetUntyped().GetValue())
		}
	}
	return fields
}