1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package expfmt
15
16import (
17	"fmt"
18	"io"
19	"math"
20	"mime"
21	"net/http"
22
23	dto "github.com/prometheus/client_model/go"
24
25	"github.com/matttproud/golang_protobuf_extensions/pbutil"
26	"github.com/prometheus/common/model"
27)
28
// Decoder types decode an input stream into metric families.
type Decoder interface {
	// Decode fills the provided MetricFamily with the next metric family
	// taken from the input. A non-nil error means no family was delivered
	// by this call.
	Decode(*dto.MetricFamily) error
}
33
// DecodeOptions contains options used by the Decoder and in sample extraction.
type DecodeOptions struct {
	// Timestamp is added to each value from the stream that has no explicit
	// timestamp set. The sample extraction functions in this file use it for
	// every metric whose TimestampMs field is nil.
	Timestamp model.Time
}
39
40// ResponseFormat extracts the correct format from a HTTP response header.
41// If no matching format can be found FormatUnknown is returned.
42func ResponseFormat(h http.Header) Format {
43	ct := h.Get(hdrContentType)
44
45	mediatype, params, err := mime.ParseMediaType(ct)
46	if err != nil {
47		return FmtUnknown
48	}
49
50	const textType = "text/plain"
51
52	switch mediatype {
53	case ProtoType:
54		if p, ok := params["proto"]; ok && p != ProtoProtocol {
55			return FmtUnknown
56		}
57		if e, ok := params["encoding"]; ok && e != "delimited" {
58			return FmtUnknown
59		}
60		return FmtProtoDelim
61
62	case textType:
63		if v, ok := params["version"]; ok && v != TextVersion {
64			return FmtUnknown
65		}
66		return FmtText
67	}
68
69	return FmtUnknown
70}
71
72// NewDecoder returns a new decoder based on the given input format.
73// If the input format does not imply otherwise, a text format decoder is returned.
74func NewDecoder(r io.Reader, format Format) Decoder {
75	switch format {
76	case FmtProtoDelim:
77		return &protoDecoder{r: r}
78	}
79	return &textDecoder{r: r}
80}
81
// protoDecoder implements the Decoder interface for protocol buffers.
type protoDecoder struct {
	// r is the stream from which delimited MetricFamily messages are read.
	r io.Reader
}
86
87// Decode implements the Decoder interface.
88func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
89	_, err := pbutil.ReadDelimited(d.r, v)
90	if err != nil {
91		return err
92	}
93	if !model.IsValidMetricName(model.LabelValue(v.GetName())) {
94		return fmt.Errorf("invalid metric name %q", v.GetName())
95	}
96	for _, m := range v.GetMetric() {
97		if m == nil {
98			continue
99		}
100		for _, l := range m.GetLabel() {
101			if l == nil {
102				continue
103			}
104			if !model.LabelValue(l.GetValue()).IsValid() {
105				return fmt.Errorf("invalid label value %q", l.GetValue())
106			}
107			if !model.LabelName(l.GetName()).IsValid() {
108				return fmt.Errorf("invalid label name %q", l.GetName())
109			}
110		}
111	}
112	return nil
113}
114
// textDecoder implements the Decoder interface for the text protocol.
type textDecoder struct {
	// r is the stream holding the text exposition data.
	r io.Reader
	// p is the parser used to turn the text read from r into metric families.
	p TextParser
	// fams holds parsed metric families that have not yet been handed out
	// by Decode.
	fams []*dto.MetricFamily
}
121
122// Decode implements the Decoder interface.
123func (d *textDecoder) Decode(v *dto.MetricFamily) error {
124	// TODO(fabxc): Wrap this as a line reader to make streaming safer.
125	if len(d.fams) == 0 {
126		// No cached metric families, read everything and parse metrics.
127		fams, err := d.p.TextToMetricFamilies(d.r)
128		if err != nil {
129			return err
130		}
131		if len(fams) == 0 {
132			return io.EOF
133		}
134		d.fams = make([]*dto.MetricFamily, 0, len(fams))
135		for _, f := range fams {
136			d.fams = append(d.fams, f)
137		}
138	}
139
140	*v = *d.fams[0]
141	d.fams = d.fams[1:]
142
143	return nil
144}
145
// SampleDecoder wraps a Decoder to extract samples from the metric families
// decoded by the wrapped Decoder.
type SampleDecoder struct {
	// Dec is the wrapped Decoder that supplies the metric families.
	Dec Decoder
	// Opts is passed to sample extraction for every decoded family.
	Opts *DecodeOptions

	// f is the MetricFamily that Dec decodes into; the same value is reused
	// on every call to Decode.
	f dto.MetricFamily
}
154
155// Decode calls the Decode method of the wrapped Decoder and then extracts the
156// samples from the decoded MetricFamily into the provided model.Vector.
157func (sd *SampleDecoder) Decode(s *model.Vector) error {
158	err := sd.Dec.Decode(&sd.f)
159	if err != nil {
160		return err
161	}
162	*s, err = extractSamples(&sd.f, sd.Opts)
163	return err
164}
165
166// ExtractSamples builds a slice of samples from the provided metric
167// families. If an error occurrs during sample extraction, it continues to
168// extract from the remaining metric families. The returned error is the last
169// error that has occurred.
170func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) {
171	var (
172		all     model.Vector
173		lastErr error
174	)
175	for _, f := range fams {
176		some, err := extractSamples(f, o)
177		if err != nil {
178			lastErr = err
179			continue
180		}
181		all = append(all, some...)
182	}
183	return all, lastErr
184}
185
186func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) {
187	switch f.GetType() {
188	case dto.MetricType_COUNTER:
189		return extractCounter(o, f), nil
190	case dto.MetricType_GAUGE:
191		return extractGauge(o, f), nil
192	case dto.MetricType_SUMMARY:
193		return extractSummary(o, f), nil
194	case dto.MetricType_UNTYPED:
195		return extractUntyped(o, f), nil
196	case dto.MetricType_HISTOGRAM:
197		return extractHistogram(o, f), nil
198	}
199	return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType())
200}
201
202func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
203	samples := make(model.Vector, 0, len(f.Metric))
204
205	for _, m := range f.Metric {
206		if m.Counter == nil {
207			continue
208		}
209
210		lset := make(model.LabelSet, len(m.Label)+1)
211		for _, p := range m.Label {
212			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
213		}
214		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
215
216		smpl := &model.Sample{
217			Metric: model.Metric(lset),
218			Value:  model.SampleValue(m.Counter.GetValue()),
219		}
220
221		if m.TimestampMs != nil {
222			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
223		} else {
224			smpl.Timestamp = o.Timestamp
225		}
226
227		samples = append(samples, smpl)
228	}
229
230	return samples
231}
232
233func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
234	samples := make(model.Vector, 0, len(f.Metric))
235
236	for _, m := range f.Metric {
237		if m.Gauge == nil {
238			continue
239		}
240
241		lset := make(model.LabelSet, len(m.Label)+1)
242		for _, p := range m.Label {
243			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
244		}
245		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
246
247		smpl := &model.Sample{
248			Metric: model.Metric(lset),
249			Value:  model.SampleValue(m.Gauge.GetValue()),
250		}
251
252		if m.TimestampMs != nil {
253			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
254		} else {
255			smpl.Timestamp = o.Timestamp
256		}
257
258		samples = append(samples, smpl)
259	}
260
261	return samples
262}
263
264func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
265	samples := make(model.Vector, 0, len(f.Metric))
266
267	for _, m := range f.Metric {
268		if m.Untyped == nil {
269			continue
270		}
271
272		lset := make(model.LabelSet, len(m.Label)+1)
273		for _, p := range m.Label {
274			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
275		}
276		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
277
278		smpl := &model.Sample{
279			Metric: model.Metric(lset),
280			Value:  model.SampleValue(m.Untyped.GetValue()),
281		}
282
283		if m.TimestampMs != nil {
284			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
285		} else {
286			smpl.Timestamp = o.Timestamp
287		}
288
289		samples = append(samples, smpl)
290	}
291
292	return samples
293}
294
295func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
296	samples := make(model.Vector, 0, len(f.Metric))
297
298	for _, m := range f.Metric {
299		if m.Summary == nil {
300			continue
301		}
302
303		timestamp := o.Timestamp
304		if m.TimestampMs != nil {
305			timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
306		}
307
308		for _, q := range m.Summary.Quantile {
309			lset := make(model.LabelSet, len(m.Label)+2)
310			for _, p := range m.Label {
311				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
312			}
313			// BUG(matt): Update other names to "quantile".
314			lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
315			lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
316
317			samples = append(samples, &model.Sample{
318				Metric:    model.Metric(lset),
319				Value:     model.SampleValue(q.GetValue()),
320				Timestamp: timestamp,
321			})
322		}
323
324		lset := make(model.LabelSet, len(m.Label)+1)
325		for _, p := range m.Label {
326			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
327		}
328		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
329
330		samples = append(samples, &model.Sample{
331			Metric:    model.Metric(lset),
332			Value:     model.SampleValue(m.Summary.GetSampleSum()),
333			Timestamp: timestamp,
334		})
335
336		lset = make(model.LabelSet, len(m.Label)+1)
337		for _, p := range m.Label {
338			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
339		}
340		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
341
342		samples = append(samples, &model.Sample{
343			Metric:    model.Metric(lset),
344			Value:     model.SampleValue(m.Summary.GetSampleCount()),
345			Timestamp: timestamp,
346		})
347	}
348
349	return samples
350}
351
352func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
353	samples := make(model.Vector, 0, len(f.Metric))
354
355	for _, m := range f.Metric {
356		if m.Histogram == nil {
357			continue
358		}
359
360		timestamp := o.Timestamp
361		if m.TimestampMs != nil {
362			timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
363		}
364
365		infSeen := false
366
367		for _, q := range m.Histogram.Bucket {
368			lset := make(model.LabelSet, len(m.Label)+2)
369			for _, p := range m.Label {
370				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
371			}
372			lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
373			lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
374
375			if math.IsInf(q.GetUpperBound(), +1) {
376				infSeen = true
377			}
378
379			samples = append(samples, &model.Sample{
380				Metric:    model.Metric(lset),
381				Value:     model.SampleValue(q.GetCumulativeCount()),
382				Timestamp: timestamp,
383			})
384		}
385
386		lset := make(model.LabelSet, len(m.Label)+1)
387		for _, p := range m.Label {
388			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
389		}
390		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
391
392		samples = append(samples, &model.Sample{
393			Metric:    model.Metric(lset),
394			Value:     model.SampleValue(m.Histogram.GetSampleSum()),
395			Timestamp: timestamp,
396		})
397
398		lset = make(model.LabelSet, len(m.Label)+1)
399		for _, p := range m.Label {
400			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
401		}
402		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
403
404		count := &model.Sample{
405			Metric:    model.Metric(lset),
406			Value:     model.SampleValue(m.Histogram.GetSampleCount()),
407			Timestamp: timestamp,
408		}
409		samples = append(samples, count)
410
411		if !infSeen {
412			// Append an infinity bucket sample.
413			lset := make(model.LabelSet, len(m.Label)+2)
414			for _, p := range m.Label {
415				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
416			}
417			lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
418			lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
419
420			samples = append(samples, &model.Sample{
421				Metric:    model.Metric(lset),
422				Value:     count.Value,
423				Timestamp: timestamp,
424			})
425		}
426	}
427
428	return samples
429}
430