1// Copyright 2014 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package prometheus
15
16import (
17	"fmt"
18	"hash/fnv"
19	"math"
20	"sort"
21	"sync"
22	"time"
23
24	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/beorn7/perks/quantile"
25	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/golang/protobuf/proto"
26	dto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_model/go"
27)
28
// quantileLabel is the label name under which a summary exports the rank of
// each of its quantiles. Because of that, summaries reject it as a variable
// or constant label name.
const (
	quantileLabel = "quantile"
)
32
33// A Summary captures individual observations from an event or sample stream and
34// summarizes them in a manner similar to traditional summary statistics: 1. sum
35// of observations, 2. observation count, 3. rank estimations.
36//
37// A typical use-case is the observation of request latencies. By default, a
38// Summary provides the median, the 90th and the 99th percentile of the latency
39// as rank estimations.
40//
41// Note that the rank estimations cannot be aggregated in a meaningful way with
42// the Prometheus query language (i.e. you cannot average or add them). If you
43// need aggregatable quantiles (e.g. you want the 99th percentile latency of all
44// queries served across all instances of a service), consider the Histogram
45// metric type. See the Prometheus documentation for more details.
46//
47// To create Summary instances, use NewSummary.
48type Summary interface {
49	Metric
50	Collector
51
52	// Observe adds a single observation to the summary.
53	Observe(float64)
54}
55
56var (
57	// DefObjectives are the default Summary quantile values.
58	DefObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
59
60	errQuantileLabelNotAllowed = fmt.Errorf(
61		"%q is not allowed as label name in summaries", quantileLabel,
62	)
63)
64
65// Default values for SummaryOpts.
66const (
67	// DefMaxAge is the default duration for which observations stay
68	// relevant.
69	DefMaxAge time.Duration = 10 * time.Minute
70	// DefAgeBuckets is the default number of buckets used to calculate the
71	// age of observations.
72	DefAgeBuckets = 5
73	// DefBufCap is the standard buffer size for collecting Summary observations.
74	DefBufCap = 500
75)
76
77// SummaryOpts bundles the options for creating a Summary metric. It is
78// mandatory to set Name and Help to a non-empty string. All other fields are
79// optional and can safely be left at their zero value.
80type SummaryOpts struct {
81	// Namespace, Subsystem, and Name are components of the fully-qualified
82	// name of the Summary (created by joining these components with
83	// "_"). Only Name is mandatory, the others merely help structuring the
84	// name. Note that the fully-qualified name of the Summary must be a
85	// valid Prometheus metric name.
86	Namespace string
87	Subsystem string
88	Name      string
89
90	// Help provides information about this Summary. Mandatory!
91	//
92	// Metrics with the same fully-qualified name must have the same Help
93	// string.
94	Help string
95
96	// ConstLabels are used to attach fixed labels to this
97	// Summary. Summaries with the same fully-qualified name must have the
98	// same label names in their ConstLabels.
99	//
100	// Note that in most cases, labels have a value that varies during the
101	// lifetime of a process. Those labels are usually managed with a
102	// SummaryVec. ConstLabels serve only special purposes. One is for the
103	// special case where the value of a label does not change during the
104	// lifetime of a process, e.g. if the revision of the running binary is
105	// put into a label. Another, more advanced purpose is if more than one
106	// Collector needs to collect Summaries with the same fully-qualified
107	// name. In that case, those Summaries must differ in the values of
108	// their ConstLabels. See the Collector examples.
109	//
110	// If the value of a label never changes (not even between binaries),
111	// that label most likely should not be a label at all (but part of the
112	// metric name).
113	ConstLabels Labels
114
115	// Objectives defines the quantile rank estimates with their respective
116	// absolute error. If Objectives[q] = e, then the value reported
117	// for q will be the φ-quantile value for some φ between q-e and q+e.
118	// The default value is DefObjectives.
119	Objectives map[float64]float64
120
121	// MaxAge defines the duration for which an observation stays relevant
122	// for the summary. Must be positive. The default value is DefMaxAge.
123	MaxAge time.Duration
124
125	// AgeBuckets is the number of buckets used to exclude observations that
126	// are older than MaxAge from the summary. A higher number has a
127	// resource penalty, so only increase it if the higher resolution is
128	// really required. For very high observation rates, you might want to
129	// reduce the number of age buckets. With only one age bucket, you will
130	// effectively see a complete reset of the summary each time MaxAge has
131	// passed. The default value is DefAgeBuckets.
132	AgeBuckets uint32
133
134	// BufCap defines the default sample stream buffer size.  The default
135	// value of DefBufCap should suffice for most uses. If there is a need
136	// to increase the value, a multiple of 500 is recommended (because that
137	// is the internal buffer size of the underlying package
138	// "github.com/bmizerany/perks/quantile").
139	BufCap uint32
140}
141
// TODO: The sliding-window decay algorithm has a major problem. The Merge
// method of perks/quantile does not actually work as advertised, and it might
// be unfixable, as the underlying algorithm is apparently not capable of
// merging summaries in the first place. To avoid using Merge, we currently
// add observations to _each_ age bucket, i.e. the effort to add a sample is
// essentially multiplied by the number of age buckets. When rotating age
// buckets, we empty the previous head stream. At scrape time, we simply take
// the quantiles from the head stream (no merging required). Result: more
// effort at observation time and less effort at scrape time. That is exactly
// the opposite of what we try to accomplish, but at least the results are
// correct.
//
// The previous, quite elegant contraption that merged the age buckets
// efficiently at scrape time (see the code up to commit
// 6b9530d72ea715f0ba612c0120e6e09fbf1d49d0) can't be used anymore.
156
157// NewSummary creates a new Summary based on the provided SummaryOpts.
158func NewSummary(opts SummaryOpts) Summary {
159	return newSummary(
160		NewDesc(
161			BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
162			opts.Help,
163			nil,
164			opts.ConstLabels,
165		),
166		opts,
167	)
168}
169
// newSummary is the common constructor behind NewSummary and the child
// factory installed by NewSummaryVec. It checks desc against labelValues,
// replaces zero-valued options with their defaults (DefObjectives,
// DefMaxAge, DefAgeBuckets, DefBufCap), and returns a self-collecting
// summary.
//
// It panics in any of these cases:
//   - the number of labelValues differs from the number of variable labels
//     in desc;
//   - "quantile" is used as a variable or constant label name;
//   - opts.MaxAge is negative;
//   - opts.MaxAge is too short to be split into opts.AgeBuckets non-zero
//     time slices.
func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
	if len(desc.variableLabels) != len(labelValues) {
		panic(errInconsistentCardinality)
	}

	// The quantile label name is reserved for the summary's own output.
	for _, n := range desc.variableLabels {
		if n == quantileLabel {
			panic(errQuantileLabelNotAllowed)
		}
	}
	for _, lp := range desc.constLabelPairs {
		if lp.GetName() == quantileLabel {
			panic(errQuantileLabelNotAllowed)
		}
	}

	if len(opts.Objectives) == 0 {
		opts.Objectives = DefObjectives
	}

	if opts.MaxAge < 0 {
		panic(fmt.Errorf("illegal max age MaxAge=%v", opts.MaxAge))
	}
	if opts.MaxAge == 0 {
		opts.MaxAge = DefMaxAge
	}

	if opts.AgeBuckets == 0 {
		opts.AgeBuckets = DefAgeBuckets
	}

	if opts.BufCap == 0 {
		opts.BufCap = DefBufCap
	}

	// Each age bucket spans MaxAge/AgeBuckets. The integer division
	// truncates to zero when MaxAge is shorter than AgeBuckets nanoseconds.
	// A zero step would make swapBufs advance hotBufExpTime by nothing and
	// spin forever while holding both mutexes, so reject it up front.
	streamDuration := opts.MaxAge / time.Duration(opts.AgeBuckets)
	if streamDuration <= 0 {
		panic(fmt.Errorf(
			"illegal max age MaxAge=%v for AgeBuckets=%d: MaxAge must be at least AgeBuckets nanoseconds",
			opts.MaxAge, opts.AgeBuckets,
		))
	}

	s := &summary{
		desc: desc,

		objectives:       opts.Objectives,
		sortedObjectives: make([]float64, 0, len(opts.Objectives)),

		labelPairs: makeLabelPairs(desc, labelValues),

		hotBuf:         make([]float64, 0, opts.BufCap),
		coldBuf:        make([]float64, 0, opts.BufCap),
		streamDuration: streamDuration,
	}
	// The head stream and the hot buffer start with the same expiry.
	// maybeRotateStreams advances the head until both expiries are equal
	// again, which relies on them starting aligned and moving in
	// streamDuration steps.
	s.headStreamExpTime = time.Now().Add(s.streamDuration)
	s.hotBufExpTime = s.headStreamExpTime

	for i := uint32(0); i < opts.AgeBuckets; i++ {
		s.streams = append(s.streams, s.newStream())
	}
	s.headStream = s.streams[0]

	// Write reports quantiles in ascending rank order.
	for qu := range s.objectives {
		s.sortedObjectives = append(s.sortedObjectives, qu)
	}
	sort.Float64s(s.sortedObjectives)

	s.Init(s) // Init self-collection.
	return s
}
233
234type summary struct {
235	SelfCollector
236
237	bufMtx sync.Mutex // Protects hotBuf and hotBufExpTime.
238	mtx    sync.Mutex // Protects every other moving part.
239	// Lock bufMtx before mtx if both are needed.
240
241	desc *Desc
242
243	objectives       map[float64]float64
244	sortedObjectives []float64
245
246	labelPairs []*dto.LabelPair
247
248	sum float64
249	cnt uint64
250
251	hotBuf, coldBuf []float64
252
253	streams                          []*quantile.Stream
254	streamDuration                   time.Duration
255	headStream                       *quantile.Stream
256	headStreamIdx                    int
257	headStreamExpTime, hotBufExpTime time.Time
258}
259
260func (s *summary) Desc() *Desc {
261	return s.desc
262}
263
// Observe implements Summary. The value is appended to the hot buffer while
// only bufMtx is held. The buffer is handed to asyncFlush before appending
// if its time slice has already expired, and after appending if the append
// filled it to capacity.
func (s *summary) Observe(v float64) {
	s.bufMtx.Lock()
	defer s.bufMtx.Unlock()

	ts := time.Now()
	if expired := ts.After(s.hotBufExpTime); expired {
		s.asyncFlush(ts)
	}

	s.hotBuf = append(s.hotBuf, v)
	if full := len(s.hotBuf) == cap(s.hotBuf); full {
		s.asyncFlush(ts)
	}
}
277
278func (s *summary) Write(out *dto.Metric) error {
279	sum := &dto.Summary{}
280	qs := make([]*dto.Quantile, 0, len(s.objectives))
281
282	s.bufMtx.Lock()
283	s.mtx.Lock()
284	// Swap bufs even if hotBuf is empty to set new hotBufExpTime.
285	s.swapBufs(time.Now())
286	s.bufMtx.Unlock()
287
288	s.flushColdBuf()
289	sum.SampleCount = proto.Uint64(s.cnt)
290	sum.SampleSum = proto.Float64(s.sum)
291
292	for _, rank := range s.sortedObjectives {
293		var q float64
294		if s.headStream.Count() == 0 {
295			q = math.NaN()
296		} else {
297			q = s.headStream.Query(rank)
298		}
299		qs = append(qs, &dto.Quantile{
300			Quantile: proto.Float64(rank),
301			Value:    proto.Float64(q),
302		})
303	}
304
305	s.mtx.Unlock()
306
307	if len(qs) > 0 {
308		sort.Sort(quantSort(qs))
309	}
310	sum.Quantile = qs
311
312	out.Summary = sum
313	out.Label = s.labelPairs
314	return nil
315}
316
317func (s *summary) newStream() *quantile.Stream {
318	return quantile.NewTargeted(s.objectives)
319}
320
// asyncFlush hands the current hot buffer over for processing. The caller
// must hold bufMtx.
//
// mtx is locked here but is unlocked only by the spawned goroutine, after
// flushColdBuf has finished. The Observe call that triggered the flush can
// therefore return immediately, while any other user of mtx waits until
// the flush is complete.
func (s *summary) asyncFlush(now time.Time) {
	s.mtx.Lock()
	s.swapBufs(now)

	flushAndRelease := func() {
		s.flushColdBuf()
		s.mtx.Unlock()
	}
	go flushAndRelease()
}
334
335// rotateStreams needs mtx AND bufMtx locked.
336func (s *summary) maybeRotateStreams() {
337	for !s.hotBufExpTime.Equal(s.headStreamExpTime) {
338		s.headStream.Reset()
339		s.headStreamIdx++
340		if s.headStreamIdx >= len(s.streams) {
341			s.headStreamIdx = 0
342		}
343		s.headStream = s.streams[s.headStreamIdx]
344		s.headStreamExpTime = s.headStreamExpTime.Add(s.streamDuration)
345	}
346}
347
348// flushColdBuf needs mtx locked.
349func (s *summary) flushColdBuf() {
350	for _, v := range s.coldBuf {
351		for _, stream := range s.streams {
352			stream.Insert(v)
353		}
354		s.cnt++
355		s.sum += v
356	}
357	s.coldBuf = s.coldBuf[0:0]
358	s.maybeRotateStreams()
359}
360
// swapBufs exchanges hotBuf and coldBuf, so the collected observations
// become the cold batch and the emptied buffer becomes the new hot one. It
// then advances hotBufExpTime in streamDuration steps until it is no longer
// before now.
//
// The caller must hold both mtx and bufMtx. swapBufs panics if coldBuf still
// contains observations.
func (s *summary) swapBufs(now time.Time) {
	if len(s.coldBuf) > 0 {
		panic("coldBuf is not empty")
	}
	s.coldBuf, s.hotBuf = s.hotBuf, s.coldBuf

	for s.hotBufExpTime.Before(now) {
		s.hotBufExpTime = s.hotBufExpTime.Add(s.streamDuration)
	}
}
372
373type quantSort []*dto.Quantile
374
375func (s quantSort) Len() int {
376	return len(s)
377}
378
379func (s quantSort) Swap(i, j int) {
380	s[i], s[j] = s[j], s[i]
381}
382
383func (s quantSort) Less(i, j int) bool {
384	return s[i].GetQuantile() < s[j].GetQuantile()
385}
386
387// SummaryVec is a Collector that bundles a set of Summaries that all share the
388// same Desc, but have different values for their variable labels. This is used
389// if you want to count the same thing partitioned by various dimensions
390// (e.g. HTTP request latencies, partitioned by status code and method). Create
391// instances with NewSummaryVec.
392type SummaryVec struct {
393	MetricVec
394}
395
// NewSummaryVec creates a new SummaryVec based on the provided SummaryOpts,
// partitioned by the given label names. At least one label name must be
// provided. Children are created lazily, each through newSummary with the
// same desc and opts.
func NewSummaryVec(opts SummaryOpts, labelNames []string) *SummaryVec {
	fqName := BuildFQName(opts.Namespace, opts.Subsystem, opts.Name)
	desc := NewDesc(fqName, opts.Help, labelNames, opts.ConstLabels)

	newChild := func(lvs ...string) Metric {
		return newSummary(desc, opts, lvs...)
	}

	return &SummaryVec{
		MetricVec: MetricVec{
			children:  map[uint64]Metric{},
			desc:      desc,
			hash:      fnv.New64a(),
			newMetric: newChild,
		},
	}
}
417
418// GetMetricWithLabelValues replaces the method of the same name in
419// MetricVec. The difference is that this method returns a Summary and not a
420// Metric so that no type conversion is required.
421func (m *SummaryVec) GetMetricWithLabelValues(lvs ...string) (Summary, error) {
422	metric, err := m.MetricVec.GetMetricWithLabelValues(lvs...)
423	if metric != nil {
424		return metric.(Summary), err
425	}
426	return nil, err
427}
428
429// GetMetricWith replaces the method of the same name in MetricVec. The
430// difference is that this method returns a Summary and not a Metric so that no
431// type conversion is required.
432func (m *SummaryVec) GetMetricWith(labels Labels) (Summary, error) {
433	metric, err := m.MetricVec.GetMetricWith(labels)
434	if metric != nil {
435		return metric.(Summary), err
436	}
437	return nil, err
438}
439
440// WithLabelValues works as GetMetricWithLabelValues, but panics where
441// GetMetricWithLabelValues would have returned an error. By not returning an
442// error, WithLabelValues allows shortcuts like
443//     myVec.WithLabelValues("404", "GET").Observe(42.21)
444func (m *SummaryVec) WithLabelValues(lvs ...string) Summary {
445	return m.MetricVec.WithLabelValues(lvs...).(Summary)
446}
447
448// With works as GetMetricWith, but panics where GetMetricWithLabels would have
449// returned an error. By not returning an error, With allows shortcuts like
450//     myVec.With(Labels{"code": "404", "method": "GET"}).Observe(42.21)
451func (m *SummaryVec) With(labels Labels) Summary {
452	return m.MetricVec.With(labels).(Summary)
453}
454
455type constSummary struct {
456	desc       *Desc
457	count      uint64
458	sum        float64
459	quantiles  map[float64]float64
460	labelPairs []*dto.LabelPair
461}
462
463func (s *constSummary) Desc() *Desc {
464	return s.desc
465}
466
// Write implements Metric. It fills out with the fixed count and sum, the
// quantiles sorted by ascending rank, and the precomputed label pairs.
func (s *constSummary) Write(out *dto.Metric) error {
	qs := make([]*dto.Quantile, 0, len(s.quantiles))
	for rank, value := range s.quantiles {
		qs = append(qs, &dto.Quantile{
			Quantile: proto.Float64(rank),
			Value:    proto.Float64(value),
		})
	}
	// Map iteration order is random; sort for a stable output order.
	if len(qs) > 0 {
		sort.Sort(quantSort(qs))
	}

	out.Summary = &dto.Summary{
		SampleCount: proto.Uint64(s.count),
		SampleSum:   proto.Float64(s.sum),
		Quantile:    qs,
	}
	out.Label = s.labelPairs

	return nil
}
491
492// NewConstSummary returns a metric representing a Prometheus summary with fixed
493// values for the count, sum, and quantiles. As those parameters cannot be
494// changed, the returned value does not implement the Summary interface (but
495// only the Metric interface). Users of this package will not have much use for
496// it in regular operations. However, when implementing custom Collectors, it is
497// useful as a throw-away metric that is generated on the fly to send it to
498// Prometheus in the Collect method.
499//
500// quantiles maps ranks to quantile values. For example, a median latency of
501// 0.23s and a 99th percentile latency of 0.56s would be expressed as:
502//     map[float64]float64{0.5: 0.23, 0.99: 0.56}
503//
504// NewConstSummary returns an error if the length of labelValues is not
505// consistent with the variable labels in Desc.
506func NewConstSummary(
507	desc *Desc,
508	count uint64,
509	sum float64,
510	quantiles map[float64]float64,
511	labelValues ...string,
512) (Metric, error) {
513	if len(desc.variableLabels) != len(labelValues) {
514		return nil, errInconsistentCardinality
515	}
516	return &constSummary{
517		desc:       desc,
518		count:      count,
519		sum:        sum,
520		quantiles:  quantiles,
521		labelPairs: makeLabelPairs(desc, labelValues),
522	}, nil
523}
524
525// MustNewConstSummary is a version of NewConstSummary that panics where
526// NewConstMetric would have returned an error.
527func MustNewConstSummary(
528	desc *Desc,
529	count uint64,
530	sum float64,
531	quantiles map[float64]float64,
532	labelValues ...string,
533) Metric {
534	m, err := NewConstSummary(desc, count, sum, quantiles, labelValues...)
535	if err != nil {
536		panic(err)
537	}
538	return m
539}
540