// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheus

import (
	"fmt"
	"hash/fnv"
	"math"
	"sort"
	"sync"
	"time"

	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/beorn7/perks/quantile"
	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/golang/protobuf/proto"
	dto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_model/go"
)

// quantileLabel is used for the label that defines the quantile in a
// summary. Because the name is reserved for that purpose, newSummary panics
// if a variable label or a constant label of the Desc uses it.
const quantileLabel = "quantile"

// A Summary captures individual observations from an event or sample stream and
// summarizes them in a manner similar to traditional summary statistics: 1. sum
// of observations, 2. observation count, 3. rank estimations.
//
// A typical use-case is the observation of request latencies. By default, a
// Summary provides the median, the 90th and the 99th percentile of the latency
// as rank estimations.
//
// Note that the rank estimations cannot be aggregated in a meaningful way with
// the Prometheus query language (i.e. you cannot average or add them). If you
// need aggregatable quantiles (e.g. you want the 99th percentile latency of all
// queries served across all instances of a service), consider the Histogram
// metric type. See the Prometheus documentation for more details.
//
// To create Summary instances, use NewSummary.
type Summary interface {
	Metric
	Collector

	// Observe adds a single observation to the summary.
	Observe(float64)
}

var (
	// DefObjectives are the default Summary quantile values. Each key is a
	// quantile rank and each value the absolute error tolerated for that
	// rank (see SummaryOpts.Objectives).
	DefObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}

	// errQuantileLabelNotAllowed is the panic value used by newSummary
	// when a label is named like the reserved quantileLabel.
	errQuantileLabelNotAllowed = fmt.Errorf(
		"%q is not allowed as label name in summaries", quantileLabel,
	)
)

// Default values for SummaryOpts.
const (
	// DefMaxAge is the default duration for which observations stay
	// relevant.
	DefMaxAge time.Duration = 10 * time.Minute
	// DefAgeBuckets is the default number of buckets used to calculate the
	// age of observations.
	DefAgeBuckets = 5
	// DefBufCap is the standard buffer size for collecting Summary observations.
	DefBufCap = 500
)

// SummaryOpts bundles the options for creating a Summary metric. It is
// mandatory to set Name and Help to a non-empty string. All other fields are
// optional and can safely be left at their zero value.
type SummaryOpts struct {
	// Namespace, Subsystem, and Name are components of the fully-qualified
	// name of the Summary (created by joining these components with
	// "_"). Only Name is mandatory, the others merely help structuring the
	// name. Note that the fully-qualified name of the Summary must be a
	// valid Prometheus metric name.
	Namespace string
	Subsystem string
	Name      string

	// Help provides information about this Summary. Mandatory!
	//
	// Metrics with the same fully-qualified name must have the same Help
	// string.
	Help string

	// ConstLabels are used to attach fixed labels to this
	// Summary. Summaries with the same fully-qualified name must have the
	// same label names in their ConstLabels.
	//
	// Note that in most cases, labels have a value that varies during the
	// lifetime of a process. Those labels are usually managed with a
	// SummaryVec. ConstLabels serve only special purposes. One is for the
	// special case where the value of a label does not change during the
	// lifetime of a process, e.g. if the revision of the running binary is
	// put into a label. Another, more advanced purpose is if more than one
	// Collector needs to collect Summaries with the same fully-qualified
	// name. In that case, those Summaries must differ in the values of
	// their ConstLabels. See the Collector examples.
	//
	// If the value of a label never changes (not even between binaries),
	// that label most likely should not be a label at all (but part of the
	// metric name).
	ConstLabels Labels

	// Objectives defines the quantile rank estimates with their respective
	// absolute error. If Objectives[q] = e, then the value reported
	// for q will be the φ-quantile value for some φ between q-e and q+e.
	// The default value is DefObjectives; it is used whenever the map is
	// nil or empty.
	Objectives map[float64]float64

	// MaxAge defines the duration for which an observation stays relevant
	// for the summary. Must not be negative (a negative value makes the
	// constructor panic). The zero value selects the default, DefMaxAge.
	MaxAge time.Duration

	// AgeBuckets is the number of buckets used to exclude observations that
	// are older than MaxAge from the summary. A higher number has a
	// resource penalty, so only increase it if the higher resolution is
	// really required. For very high observation rates, you might want to
	// reduce the number of age buckets. With only one age bucket, you will
	// effectively see a complete reset of the summary each time MaxAge has
	// passed. The default value is DefAgeBuckets.
	AgeBuckets uint32

	// BufCap defines the default sample stream buffer size. The default
	// value of DefBufCap should suffice for most uses. If there is a need
	// to increase the value, a multiple of 500 is recommended (because that
	// is the internal buffer size of the underlying package
	// "github.com/beorn7/perks/quantile").
	BufCap uint32
}

// TODO: The sliding-window decay algorithm has a serious flaw. The Merge
// method of perks/quantile is actually not working as advertised - and it might
// be unfixable, as the underlying algorithm is apparently not capable of
// merging summaries in the first place. To avoid using Merge, we are currently
// adding observations to _each_ age bucket, i.e. the effort to add a sample is
// essentially multiplied by the number of age buckets. When rotating age
// buckets, we empty the previous head stream. On scrape time, we simply take
// the quantiles from the head stream (no merging required). Result: More effort
// on observation time, less effort on scrape time, which is exactly the
// opposite of what we try to accomplish, but at least the results are correct.
//
// The quite elegant previous contraption to merge the age buckets efficiently
// on scrape time (see code up to commit
// 6b9530d72ea715f0ba612c0120e6e09fbf1d49d0) can't be used anymore.

// NewSummary creates a new Summary based on the provided SummaryOpts.
158func NewSummary(opts SummaryOpts) Summary { 159 return newSummary( 160 NewDesc( 161 BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), 162 opts.Help, 163 nil, 164 opts.ConstLabels, 165 ), 166 opts, 167 ) 168} 169 170func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary { 171 if len(desc.variableLabels) != len(labelValues) { 172 panic(errInconsistentCardinality) 173 } 174 175 for _, n := range desc.variableLabels { 176 if n == quantileLabel { 177 panic(errQuantileLabelNotAllowed) 178 } 179 } 180 for _, lp := range desc.constLabelPairs { 181 if lp.GetName() == quantileLabel { 182 panic(errQuantileLabelNotAllowed) 183 } 184 } 185 186 if len(opts.Objectives) == 0 { 187 opts.Objectives = DefObjectives 188 } 189 190 if opts.MaxAge < 0 { 191 panic(fmt.Errorf("illegal max age MaxAge=%v", opts.MaxAge)) 192 } 193 if opts.MaxAge == 0 { 194 opts.MaxAge = DefMaxAge 195 } 196 197 if opts.AgeBuckets == 0 { 198 opts.AgeBuckets = DefAgeBuckets 199 } 200 201 if opts.BufCap == 0 { 202 opts.BufCap = DefBufCap 203 } 204 205 s := &summary{ 206 desc: desc, 207 208 objectives: opts.Objectives, 209 sortedObjectives: make([]float64, 0, len(opts.Objectives)), 210 211 labelPairs: makeLabelPairs(desc, labelValues), 212 213 hotBuf: make([]float64, 0, opts.BufCap), 214 coldBuf: make([]float64, 0, opts.BufCap), 215 streamDuration: opts.MaxAge / time.Duration(opts.AgeBuckets), 216 } 217 s.headStreamExpTime = time.Now().Add(s.streamDuration) 218 s.hotBufExpTime = s.headStreamExpTime 219 220 for i := uint32(0); i < opts.AgeBuckets; i++ { 221 s.streams = append(s.streams, s.newStream()) 222 } 223 s.headStream = s.streams[0] 224 225 for qu := range s.objectives { 226 s.sortedObjectives = append(s.sortedObjectives, qu) 227 } 228 sort.Float64s(s.sortedObjectives) 229 230 s.Init(s) // Init self-collection. 231 return s 232} 233 234type summary struct { 235 SelfCollector 236 237 bufMtx sync.Mutex // Protects hotBuf and hotBufExpTime. 
238 mtx sync.Mutex // Protects every other moving part. 239 // Lock bufMtx before mtx if both are needed. 240 241 desc *Desc 242 243 objectives map[float64]float64 244 sortedObjectives []float64 245 246 labelPairs []*dto.LabelPair 247 248 sum float64 249 cnt uint64 250 251 hotBuf, coldBuf []float64 252 253 streams []*quantile.Stream 254 streamDuration time.Duration 255 headStream *quantile.Stream 256 headStreamIdx int 257 headStreamExpTime, hotBufExpTime time.Time 258} 259 260func (s *summary) Desc() *Desc { 261 return s.desc 262} 263 264func (s *summary) Observe(v float64) { 265 s.bufMtx.Lock() 266 defer s.bufMtx.Unlock() 267 268 now := time.Now() 269 if now.After(s.hotBufExpTime) { 270 s.asyncFlush(now) 271 } 272 s.hotBuf = append(s.hotBuf, v) 273 if len(s.hotBuf) == cap(s.hotBuf) { 274 s.asyncFlush(now) 275 } 276} 277 278func (s *summary) Write(out *dto.Metric) error { 279 sum := &dto.Summary{} 280 qs := make([]*dto.Quantile, 0, len(s.objectives)) 281 282 s.bufMtx.Lock() 283 s.mtx.Lock() 284 // Swap bufs even if hotBuf is empty to set new hotBufExpTime. 285 s.swapBufs(time.Now()) 286 s.bufMtx.Unlock() 287 288 s.flushColdBuf() 289 sum.SampleCount = proto.Uint64(s.cnt) 290 sum.SampleSum = proto.Float64(s.sum) 291 292 for _, rank := range s.sortedObjectives { 293 var q float64 294 if s.headStream.Count() == 0 { 295 q = math.NaN() 296 } else { 297 q = s.headStream.Query(rank) 298 } 299 qs = append(qs, &dto.Quantile{ 300 Quantile: proto.Float64(rank), 301 Value: proto.Float64(q), 302 }) 303 } 304 305 s.mtx.Unlock() 306 307 if len(qs) > 0 { 308 sort.Sort(quantSort(qs)) 309 } 310 sum.Quantile = qs 311 312 out.Summary = sum 313 out.Label = s.labelPairs 314 return nil 315} 316 317func (s *summary) newStream() *quantile.Stream { 318 return quantile.NewTargeted(s.objectives) 319} 320 321// asyncFlush needs bufMtx locked. 
322func (s *summary) asyncFlush(now time.Time) { 323 s.mtx.Lock() 324 s.swapBufs(now) 325 326 // Unblock the original goroutine that was responsible for the mutation 327 // that triggered the compaction. But hold onto the global non-buffer 328 // state mutex until the operation finishes. 329 go func() { 330 s.flushColdBuf() 331 s.mtx.Unlock() 332 }() 333} 334 335// rotateStreams needs mtx AND bufMtx locked. 336func (s *summary) maybeRotateStreams() { 337 for !s.hotBufExpTime.Equal(s.headStreamExpTime) { 338 s.headStream.Reset() 339 s.headStreamIdx++ 340 if s.headStreamIdx >= len(s.streams) { 341 s.headStreamIdx = 0 342 } 343 s.headStream = s.streams[s.headStreamIdx] 344 s.headStreamExpTime = s.headStreamExpTime.Add(s.streamDuration) 345 } 346} 347 348// flushColdBuf needs mtx locked. 349func (s *summary) flushColdBuf() { 350 for _, v := range s.coldBuf { 351 for _, stream := range s.streams { 352 stream.Insert(v) 353 } 354 s.cnt++ 355 s.sum += v 356 } 357 s.coldBuf = s.coldBuf[0:0] 358 s.maybeRotateStreams() 359} 360 361// swapBufs needs mtx AND bufMtx locked, coldBuf must be empty. 362func (s *summary) swapBufs(now time.Time) { 363 if len(s.coldBuf) != 0 { 364 panic("coldBuf is not empty") 365 } 366 s.hotBuf, s.coldBuf = s.coldBuf, s.hotBuf 367 // hotBuf is now empty and gets new expiration set. 368 for now.After(s.hotBufExpTime) { 369 s.hotBufExpTime = s.hotBufExpTime.Add(s.streamDuration) 370 } 371} 372 373type quantSort []*dto.Quantile 374 375func (s quantSort) Len() int { 376 return len(s) 377} 378 379func (s quantSort) Swap(i, j int) { 380 s[i], s[j] = s[j], s[i] 381} 382 383func (s quantSort) Less(i, j int) bool { 384 return s[i].GetQuantile() < s[j].GetQuantile() 385} 386 387// SummaryVec is a Collector that bundles a set of Summaries that all share the 388// same Desc, but have different values for their variable labels. This is used 389// if you want to count the same thing partitioned by various dimensions 390// (e.g. 
HTTP request latencies, partitioned by status code and method). Create 391// instances with NewSummaryVec. 392type SummaryVec struct { 393 MetricVec 394} 395 396// NewSummaryVec creates a new SummaryVec based on the provided SummaryOpts and 397// partitioned by the given label names. At least one label name must be 398// provided. 399func NewSummaryVec(opts SummaryOpts, labelNames []string) *SummaryVec { 400 desc := NewDesc( 401 BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), 402 opts.Help, 403 labelNames, 404 opts.ConstLabels, 405 ) 406 return &SummaryVec{ 407 MetricVec: MetricVec{ 408 children: map[uint64]Metric{}, 409 desc: desc, 410 hash: fnv.New64a(), 411 newMetric: func(lvs ...string) Metric { 412 return newSummary(desc, opts, lvs...) 413 }, 414 }, 415 } 416} 417 418// GetMetricWithLabelValues replaces the method of the same name in 419// MetricVec. The difference is that this method returns a Summary and not a 420// Metric so that no type conversion is required. 421func (m *SummaryVec) GetMetricWithLabelValues(lvs ...string) (Summary, error) { 422 metric, err := m.MetricVec.GetMetricWithLabelValues(lvs...) 423 if metric != nil { 424 return metric.(Summary), err 425 } 426 return nil, err 427} 428 429// GetMetricWith replaces the method of the same name in MetricVec. The 430// difference is that this method returns a Summary and not a Metric so that no 431// type conversion is required. 432func (m *SummaryVec) GetMetricWith(labels Labels) (Summary, error) { 433 metric, err := m.MetricVec.GetMetricWith(labels) 434 if metric != nil { 435 return metric.(Summary), err 436 } 437 return nil, err 438} 439 440// WithLabelValues works as GetMetricWithLabelValues, but panics where 441// GetMetricWithLabelValues would have returned an error. 
By not returning an 442// error, WithLabelValues allows shortcuts like 443// myVec.WithLabelValues("404", "GET").Observe(42.21) 444func (m *SummaryVec) WithLabelValues(lvs ...string) Summary { 445 return m.MetricVec.WithLabelValues(lvs...).(Summary) 446} 447 448// With works as GetMetricWith, but panics where GetMetricWithLabels would have 449// returned an error. By not returning an error, With allows shortcuts like 450// myVec.With(Labels{"code": "404", "method": "GET"}).Observe(42.21) 451func (m *SummaryVec) With(labels Labels) Summary { 452 return m.MetricVec.With(labels).(Summary) 453} 454 455type constSummary struct { 456 desc *Desc 457 count uint64 458 sum float64 459 quantiles map[float64]float64 460 labelPairs []*dto.LabelPair 461} 462 463func (s *constSummary) Desc() *Desc { 464 return s.desc 465} 466 467func (s *constSummary) Write(out *dto.Metric) error { 468 sum := &dto.Summary{} 469 qs := make([]*dto.Quantile, 0, len(s.quantiles)) 470 471 sum.SampleCount = proto.Uint64(s.count) 472 sum.SampleSum = proto.Float64(s.sum) 473 474 for rank, q := range s.quantiles { 475 qs = append(qs, &dto.Quantile{ 476 Quantile: proto.Float64(rank), 477 Value: proto.Float64(q), 478 }) 479 } 480 481 if len(qs) > 0 { 482 sort.Sort(quantSort(qs)) 483 } 484 sum.Quantile = qs 485 486 out.Summary = sum 487 out.Label = s.labelPairs 488 489 return nil 490} 491 492// NewConstSummary returns a metric representing a Prometheus summary with fixed 493// values for the count, sum, and quantiles. As those parameters cannot be 494// changed, the returned value does not implement the Summary interface (but 495// only the Metric interface). Users of this package will not have much use for 496// it in regular operations. However, when implementing custom Collectors, it is 497// useful as a throw-away metric that is generated on the fly to send it to 498// Prometheus in the Collect method. 499// 500// quantiles maps ranks to quantile values. 
For example, a median latency of 501// 0.23s and a 99th percentile latency of 0.56s would be expressed as: 502// map[float64]float64{0.5: 0.23, 0.99: 0.56} 503// 504// NewConstSummary returns an error if the length of labelValues is not 505// consistent with the variable labels in Desc. 506func NewConstSummary( 507 desc *Desc, 508 count uint64, 509 sum float64, 510 quantiles map[float64]float64, 511 labelValues ...string, 512) (Metric, error) { 513 if len(desc.variableLabels) != len(labelValues) { 514 return nil, errInconsistentCardinality 515 } 516 return &constSummary{ 517 desc: desc, 518 count: count, 519 sum: sum, 520 quantiles: quantiles, 521 labelPairs: makeLabelPairs(desc, labelValues), 522 }, nil 523} 524 525// MustNewConstSummary is a version of NewConstSummary that panics where 526// NewConstMetric would have returned an error. 527func MustNewConstSummary( 528 desc *Desc, 529 count uint64, 530 sum float64, 531 quantiles map[float64]float64, 532 labelValues ...string, 533) Metric { 534 m, err := NewConstSummary(desc, count, sum, quantiles, labelValues...) 535 if err != nil { 536 panic(err) 537 } 538 return m 539} 540