// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package expfmt

import (
	"fmt"
	"io"
	"math"
	"mime"
	"net/http"

	dto "github.com/prometheus/client_model/go"

	"github.com/matttproud/golang_protobuf_extensions/pbutil"
	"github.com/prometheus/common/model"
)

// Decoder types decode an input stream into metric families.
type Decoder interface {
	// Decode stores the next metric family read from the input in the
	// supplied message. The text decoder in this file reports io.EOF once
	// the input yields no further families.
	Decode(*dto.MetricFamily) error
}

// DecodeOptions contains options used by the Decoder and in sample extraction.
type DecodeOptions struct {
	// Timestamp is added to each value from the stream that has no explicit
	// timestamp set.
	Timestamp model.Time
}

// ResponseFormat extracts the correct format from an HTTP response header.
// It inspects the content type header: the protobuf media type maps to
// FmtProtoDelim and "text/plain" maps to FmtText, provided that any "proto",
// "encoding" or "version" parameter present carries the expected value.
// If no matching format can be found FmtUnknown is returned.
42func ResponseFormat(h http.Header) Format { 43 ct := h.Get(hdrContentType) 44 45 mediatype, params, err := mime.ParseMediaType(ct) 46 if err != nil { 47 return FmtUnknown 48 } 49 50 const textType = "text/plain" 51 52 switch mediatype { 53 case ProtoType: 54 if p, ok := params["proto"]; ok && p != ProtoProtocol { 55 return FmtUnknown 56 } 57 if e, ok := params["encoding"]; ok && e != "delimited" { 58 return FmtUnknown 59 } 60 return FmtProtoDelim 61 62 case textType: 63 if v, ok := params["version"]; ok && v != TextVersion { 64 return FmtUnknown 65 } 66 return FmtText 67 } 68 69 return FmtUnknown 70} 71 72// NewDecoder returns a new decoder based on the given input format. 73// If the input format does not imply otherwise, a text format decoder is returned. 74func NewDecoder(r io.Reader, format Format) Decoder { 75 switch format { 76 case FmtProtoDelim: 77 return &protoDecoder{r: r} 78 } 79 return &textDecoder{r: r} 80} 81 82// protoDecoder implements the Decoder interface for protocol buffers. 83type protoDecoder struct { 84 r io.Reader 85} 86 87// Decode implements the Decoder interface. 88func (d *protoDecoder) Decode(v *dto.MetricFamily) error { 89 _, err := pbutil.ReadDelimited(d.r, v) 90 if err != nil { 91 return err 92 } 93 if !model.IsValidMetricName(model.LabelValue(v.GetName())) { 94 return fmt.Errorf("invalid metric name %q", v.GetName()) 95 } 96 for _, m := range v.GetMetric() { 97 if m == nil { 98 continue 99 } 100 for _, l := range m.GetLabel() { 101 if l == nil { 102 continue 103 } 104 if !model.LabelValue(l.GetValue()).IsValid() { 105 return fmt.Errorf("invalid label value %q", l.GetValue()) 106 } 107 if !model.LabelName(l.GetName()).IsValid() { 108 return fmt.Errorf("invalid label name %q", l.GetName()) 109 } 110 } 111 } 112 return nil 113} 114 115// textDecoder implements the Decoder interface for the text protocol. 
116type textDecoder struct { 117 r io.Reader 118 p TextParser 119 fams []*dto.MetricFamily 120} 121 122// Decode implements the Decoder interface. 123func (d *textDecoder) Decode(v *dto.MetricFamily) error { 124 // TODO(fabxc): Wrap this as a line reader to make streaming safer. 125 if len(d.fams) == 0 { 126 // No cached metric families, read everything and parse metrics. 127 fams, err := d.p.TextToMetricFamilies(d.r) 128 if err != nil { 129 return err 130 } 131 if len(fams) == 0 { 132 return io.EOF 133 } 134 d.fams = make([]*dto.MetricFamily, 0, len(fams)) 135 for _, f := range fams { 136 d.fams = append(d.fams, f) 137 } 138 } 139 140 *v = *d.fams[0] 141 d.fams = d.fams[1:] 142 143 return nil 144} 145 146// SampleDecoder wraps a Decoder to extract samples from the metric families 147// decoded by the wrapped Decoder. 148type SampleDecoder struct { 149 Dec Decoder 150 Opts *DecodeOptions 151 152 f dto.MetricFamily 153} 154 155// Decode calls the Decode method of the wrapped Decoder and then extracts the 156// samples from the decoded MetricFamily into the provided model.Vector. 157func (sd *SampleDecoder) Decode(s *model.Vector) error { 158 err := sd.Dec.Decode(&sd.f) 159 if err != nil { 160 return err 161 } 162 *s, err = extractSamples(&sd.f, sd.Opts) 163 return err 164} 165 166// ExtractSamples builds a slice of samples from the provided metric 167// families. If an error occurrs during sample extraction, it continues to 168// extract from the remaining metric families. The returned error is the last 169// error that has occurred. 170func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) { 171 var ( 172 all model.Vector 173 lastErr error 174 ) 175 for _, f := range fams { 176 some, err := extractSamples(f, o) 177 if err != nil { 178 lastErr = err 179 continue 180 } 181 all = append(all, some...) 
182 } 183 return all, lastErr 184} 185 186func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) { 187 switch f.GetType() { 188 case dto.MetricType_COUNTER: 189 return extractCounter(o, f), nil 190 case dto.MetricType_GAUGE: 191 return extractGauge(o, f), nil 192 case dto.MetricType_SUMMARY: 193 return extractSummary(o, f), nil 194 case dto.MetricType_UNTYPED: 195 return extractUntyped(o, f), nil 196 case dto.MetricType_HISTOGRAM: 197 return extractHistogram(o, f), nil 198 } 199 return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType()) 200} 201 202func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector { 203 samples := make(model.Vector, 0, len(f.Metric)) 204 205 for _, m := range f.Metric { 206 if m.Counter == nil { 207 continue 208 } 209 210 lset := make(model.LabelSet, len(m.Label)+1) 211 for _, p := range m.Label { 212 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 213 } 214 lset[model.MetricNameLabel] = model.LabelValue(f.GetName()) 215 216 smpl := &model.Sample{ 217 Metric: model.Metric(lset), 218 Value: model.SampleValue(m.Counter.GetValue()), 219 } 220 221 if m.TimestampMs != nil { 222 smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000) 223 } else { 224 smpl.Timestamp = o.Timestamp 225 } 226 227 samples = append(samples, smpl) 228 } 229 230 return samples 231} 232 233func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector { 234 samples := make(model.Vector, 0, len(f.Metric)) 235 236 for _, m := range f.Metric { 237 if m.Gauge == nil { 238 continue 239 } 240 241 lset := make(model.LabelSet, len(m.Label)+1) 242 for _, p := range m.Label { 243 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 244 } 245 lset[model.MetricNameLabel] = model.LabelValue(f.GetName()) 246 247 smpl := &model.Sample{ 248 Metric: model.Metric(lset), 249 Value: model.SampleValue(m.Gauge.GetValue()), 250 } 251 252 if m.TimestampMs != nil { 253 
smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000) 254 } else { 255 smpl.Timestamp = o.Timestamp 256 } 257 258 samples = append(samples, smpl) 259 } 260 261 return samples 262} 263 264func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector { 265 samples := make(model.Vector, 0, len(f.Metric)) 266 267 for _, m := range f.Metric { 268 if m.Untyped == nil { 269 continue 270 } 271 272 lset := make(model.LabelSet, len(m.Label)+1) 273 for _, p := range m.Label { 274 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 275 } 276 lset[model.MetricNameLabel] = model.LabelValue(f.GetName()) 277 278 smpl := &model.Sample{ 279 Metric: model.Metric(lset), 280 Value: model.SampleValue(m.Untyped.GetValue()), 281 } 282 283 if m.TimestampMs != nil { 284 smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000) 285 } else { 286 smpl.Timestamp = o.Timestamp 287 } 288 289 samples = append(samples, smpl) 290 } 291 292 return samples 293} 294 295func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector { 296 samples := make(model.Vector, 0, len(f.Metric)) 297 298 for _, m := range f.Metric { 299 if m.Summary == nil { 300 continue 301 } 302 303 timestamp := o.Timestamp 304 if m.TimestampMs != nil { 305 timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000) 306 } 307 308 for _, q := range m.Summary.Quantile { 309 lset := make(model.LabelSet, len(m.Label)+2) 310 for _, p := range m.Label { 311 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 312 } 313 // BUG(matt): Update other names to "quantile". 
314 lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile())) 315 lset[model.MetricNameLabel] = model.LabelValue(f.GetName()) 316 317 samples = append(samples, &model.Sample{ 318 Metric: model.Metric(lset), 319 Value: model.SampleValue(q.GetValue()), 320 Timestamp: timestamp, 321 }) 322 } 323 324 lset := make(model.LabelSet, len(m.Label)+1) 325 for _, p := range m.Label { 326 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 327 } 328 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum") 329 330 samples = append(samples, &model.Sample{ 331 Metric: model.Metric(lset), 332 Value: model.SampleValue(m.Summary.GetSampleSum()), 333 Timestamp: timestamp, 334 }) 335 336 lset = make(model.LabelSet, len(m.Label)+1) 337 for _, p := range m.Label { 338 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 339 } 340 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count") 341 342 samples = append(samples, &model.Sample{ 343 Metric: model.Metric(lset), 344 Value: model.SampleValue(m.Summary.GetSampleCount()), 345 Timestamp: timestamp, 346 }) 347 } 348 349 return samples 350} 351 352func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector { 353 samples := make(model.Vector, 0, len(f.Metric)) 354 355 for _, m := range f.Metric { 356 if m.Histogram == nil { 357 continue 358 } 359 360 timestamp := o.Timestamp 361 if m.TimestampMs != nil { 362 timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000) 363 } 364 365 infSeen := false 366 367 for _, q := range m.Histogram.Bucket { 368 lset := make(model.LabelSet, len(m.Label)+2) 369 for _, p := range m.Label { 370 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 371 } 372 lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound())) 373 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket") 374 375 if math.IsInf(q.GetUpperBound(), +1) { 376 infSeen = true 377 } 
378 379 samples = append(samples, &model.Sample{ 380 Metric: model.Metric(lset), 381 Value: model.SampleValue(q.GetCumulativeCount()), 382 Timestamp: timestamp, 383 }) 384 } 385 386 lset := make(model.LabelSet, len(m.Label)+1) 387 for _, p := range m.Label { 388 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 389 } 390 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum") 391 392 samples = append(samples, &model.Sample{ 393 Metric: model.Metric(lset), 394 Value: model.SampleValue(m.Histogram.GetSampleSum()), 395 Timestamp: timestamp, 396 }) 397 398 lset = make(model.LabelSet, len(m.Label)+1) 399 for _, p := range m.Label { 400 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 401 } 402 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count") 403 404 count := &model.Sample{ 405 Metric: model.Metric(lset), 406 Value: model.SampleValue(m.Histogram.GetSampleCount()), 407 Timestamp: timestamp, 408 } 409 samples = append(samples, count) 410 411 if !infSeen { 412 // Append an infinity bucket sample. 413 lset := make(model.LabelSet, len(m.Label)+2) 414 for _, p := range m.Label { 415 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) 416 } 417 lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf") 418 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket") 419 420 samples = append(samples, &model.Sample{ 421 Metric: model.Metric(lset), 422 Value: count.Value, 423 Timestamp: timestamp, 424 }) 425 } 426 } 427 428 return samples 429} 430