1package metrics
2
3import (
4	"math"
5	"math/rand"
6	"sort"
7	"sync"
8	"time"
9)
10
// rescaleThreshold is how far past the landmark time t0 an ExpDecaySample sets
// t1; the first update later than t1 rescales the stored priorities against a
// new landmark.
const rescaleThreshold = time.Hour
12
// Sample maintains a statistically-significant selection of values from
// a stream.
type Sample interface {
	// Clear discards every value recorded so far.
	Clear()
	// Count returns the number of values recorded, which may exceed Size.
	Count() int64
	// Max returns the largest value currently held.
	Max() int64
	// Mean returns the arithmetic mean of the values currently held.
	Mean() float64
	// Min returns the smallest value currently held.
	Min() int64
	// Percentile returns the value at the given percentile, expressed as a
	// fraction (0.5 is the median).
	Percentile(float64) float64
	// Percentiles returns one score per requested percentile, in the same
	// order as the argument.
	Percentiles([]float64) []float64
	// Size returns the number of values currently held.
	Size() int
	// Snapshot returns a read-only copy of the sample.
	Snapshot() Sample
	// StdDev returns the standard deviation of the values currently held.
	StdDev() float64
	// Sum returns the sum of the values currently held.
	Sum() int64
	// Update records a new value.
	Update(int64)
	// Values returns a copy of the values currently held.
	Values() []int64
	// Variance returns the variance of the values currently held.
	Variance() float64
}
31
// ExpDecaySample is an exponentially-decaying sample using a forward-decaying
// priority reservoir.  See Cormode et al's "Forward Decay: A Practical Time
// Decay Model for Streaming Systems".
//
// <http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf>
//
// Fields:
//   - alpha scales the elapsed seconds in the exponent of each new priority.
//   - count is the number of updates since construction or the last Clear.
//   - mutex is held by every method that touches count, t0, t1 or values.
//   - reservoirSize is the heap size at which update evicts before inserting.
//   - t0 is the landmark time priorities are measured from; t1 is the time
//     after which update rescales priorities and moves the landmark.
//   - values is the min-heap of retained samples, ordered by priority.
type ExpDecaySample struct {
	alpha         float64
	count         int64
	mutex         sync.Mutex
	reservoirSize int
	t0, t1        time.Time
	values        *expDecaySampleHeap
}
45
46// NewExpDecaySample constructs a new exponentially-decaying sample with the
47// given reservoir size and alpha.
48func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
49	if UseNilMetrics {
50		return NilSample{}
51	}
52	s := &ExpDecaySample{
53		alpha:         alpha,
54		reservoirSize: reservoirSize,
55		t0:            time.Now(),
56		values:        newExpDecaySampleHeap(reservoirSize),
57	}
58	s.t1 = s.t0.Add(rescaleThreshold)
59	return s
60}
61
62// Clear clears all samples.
63func (s *ExpDecaySample) Clear() {
64	s.mutex.Lock()
65	defer s.mutex.Unlock()
66	s.count = 0
67	s.t0 = time.Now()
68	s.t1 = s.t0.Add(rescaleThreshold)
69	s.values.Clear()
70}
71
72// Count returns the number of samples recorded, which may exceed the
73// reservoir size.
74func (s *ExpDecaySample) Count() int64 {
75	s.mutex.Lock()
76	defer s.mutex.Unlock()
77	return s.count
78}
79
80// Max returns the maximum value in the sample, which may not be the maximum
81// value ever to be part of the sample.
82func (s *ExpDecaySample) Max() int64 {
83	return SampleMax(s.Values())
84}
85
86// Mean returns the mean of the values in the sample.
87func (s *ExpDecaySample) Mean() float64 {
88	return SampleMean(s.Values())
89}
90
91// Min returns the minimum value in the sample, which may not be the minimum
92// value ever to be part of the sample.
93func (s *ExpDecaySample) Min() int64 {
94	return SampleMin(s.Values())
95}
96
97// Percentile returns an arbitrary percentile of values in the sample.
98func (s *ExpDecaySample) Percentile(p float64) float64 {
99	return SamplePercentile(s.Values(), p)
100}
101
102// Percentiles returns a slice of arbitrary percentiles of values in the
103// sample.
104func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
105	return SamplePercentiles(s.Values(), ps)
106}
107
108// Size returns the size of the sample, which is at most the reservoir size.
109func (s *ExpDecaySample) Size() int {
110	s.mutex.Lock()
111	defer s.mutex.Unlock()
112	return s.values.Size()
113}
114
115// Snapshot returns a read-only copy of the sample.
116func (s *ExpDecaySample) Snapshot() Sample {
117	s.mutex.Lock()
118	defer s.mutex.Unlock()
119	vals := s.values.Values()
120	values := make([]int64, len(vals))
121	for i, v := range vals {
122		values[i] = v.v
123	}
124	return &SampleSnapshot{
125		count:  s.count,
126		values: values,
127	}
128}
129
130// StdDev returns the standard deviation of the values in the sample.
131func (s *ExpDecaySample) StdDev() float64 {
132	return SampleStdDev(s.Values())
133}
134
135// Sum returns the sum of the values in the sample.
136func (s *ExpDecaySample) Sum() int64 {
137	return SampleSum(s.Values())
138}
139
140// Update samples a new value.
141func (s *ExpDecaySample) Update(v int64) {
142	s.update(time.Now(), v)
143}
144
145// Values returns a copy of the values in the sample.
146func (s *ExpDecaySample) Values() []int64 {
147	s.mutex.Lock()
148	defer s.mutex.Unlock()
149	vals := s.values.Values()
150	values := make([]int64, len(vals))
151	for i, v := range vals {
152		values[i] = v.v
153	}
154	return values
155}
156
157// Variance returns the variance of the values in the sample.
158func (s *ExpDecaySample) Variance() float64 {
159	return SampleVariance(s.Values())
160}
161
162// update samples a new value at a particular timestamp.  This is a method all
163// its own to facilitate testing.
164func (s *ExpDecaySample) update(t time.Time, v int64) {
165	s.mutex.Lock()
166	defer s.mutex.Unlock()
167	s.count++
168	if s.values.Size() == s.reservoirSize {
169		s.values.Pop()
170	}
171	s.values.Push(expDecaySample{
172		k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
173		v: v,
174	})
175	if t.After(s.t1) {
176		values := s.values.Values()
177		t0 := s.t0
178		s.values.Clear()
179		s.t0 = t
180		s.t1 = s.t0.Add(rescaleThreshold)
181		for _, v := range values {
182			v.k = v.k * math.Exp(-s.alpha*s.t0.Sub(t0).Seconds())
183			s.values.Push(v)
184		}
185	}
186}
187
188// NilSample is a no-op Sample.
189type NilSample struct{}
190
191// Clear is a no-op.
192func (NilSample) Clear() {}
193
194// Count is a no-op.
195func (NilSample) Count() int64 { return 0 }
196
197// Max is a no-op.
198func (NilSample) Max() int64 { return 0 }
199
200// Mean is a no-op.
201func (NilSample) Mean() float64 { return 0.0 }
202
203// Min is a no-op.
204func (NilSample) Min() int64 { return 0 }
205
206// Percentile is a no-op.
207func (NilSample) Percentile(p float64) float64 { return 0.0 }
208
209// Percentiles is a no-op.
210func (NilSample) Percentiles(ps []float64) []float64 {
211	return make([]float64, len(ps))
212}
213
214// Size is a no-op.
215func (NilSample) Size() int { return 0 }
216
217// Sample is a no-op.
218func (NilSample) Snapshot() Sample { return NilSample{} }
219
220// StdDev is a no-op.
221func (NilSample) StdDev() float64 { return 0.0 }
222
223// Sum is a no-op.
224func (NilSample) Sum() int64 { return 0 }
225
226// Update is a no-op.
227func (NilSample) Update(v int64) {}
228
229// Values is a no-op.
230func (NilSample) Values() []int64 { return []int64{} }
231
232// Variance is a no-op.
233func (NilSample) Variance() float64 { return 0.0 }
234
// SampleMax returns the largest element of values, or 0 when values is empty.
func SampleMax(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	// Start below every representable value so the first element always wins.
	largest := int64(math.MinInt64)
	for i := range values {
		if values[i] > largest {
			largest = values[i]
		}
	}
	return largest
}
248
249// SampleMean returns the mean value of the slice of int64.
250func SampleMean(values []int64) float64 {
251	if 0 == len(values) {
252		return 0.0
253	}
254	return float64(SampleSum(values)) / float64(len(values))
255}
256
// SampleMin returns the smallest element of values, or 0 when values is empty.
func SampleMin(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	// Start above every representable value so the first element always wins.
	smallest := int64(math.MaxInt64)
	for i := range values {
		if values[i] < smallest {
			smallest = values[i]
		}
	}
	return smallest
}
270
271// SamplePercentiles returns an arbitrary percentile of the slice of int64.
272func SamplePercentile(values int64Slice, p float64) float64 {
273	return SamplePercentiles(values, []float64{p})[0]
274}
275
276// SamplePercentiles returns a slice of arbitrary percentiles of the slice of
277// int64.
278func SamplePercentiles(values int64Slice, ps []float64) []float64 {
279	scores := make([]float64, len(ps))
280	size := len(values)
281	if size > 0 {
282		sort.Sort(values)
283		for i, p := range ps {
284			pos := p * float64(size+1)
285			if pos < 1.0 {
286				scores[i] = float64(values[0])
287			} else if pos >= float64(size) {
288				scores[i] = float64(values[size-1])
289			} else {
290				lower := float64(values[int(pos)-1])
291				upper := float64(values[int(pos)])
292				scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
293			}
294		}
295	}
296	return scores
297}
298
299// SampleSnapshot is a read-only copy of another Sample.
300type SampleSnapshot struct {
301	count  int64
302	values []int64
303}
304
305func NewSampleSnapshot(count int64, values []int64) *SampleSnapshot {
306	return &SampleSnapshot{
307		count:  count,
308		values: values,
309	}
310}
311
312// Clear panics.
313func (*SampleSnapshot) Clear() {
314	panic("Clear called on a SampleSnapshot")
315}
316
317// Count returns the count of inputs at the time the snapshot was taken.
318func (s *SampleSnapshot) Count() int64 { return s.count }
319
320// Max returns the maximal value at the time the snapshot was taken.
321func (s *SampleSnapshot) Max() int64 { return SampleMax(s.values) }
322
323// Mean returns the mean value at the time the snapshot was taken.
324func (s *SampleSnapshot) Mean() float64 { return SampleMean(s.values) }
325
326// Min returns the minimal value at the time the snapshot was taken.
327func (s *SampleSnapshot) Min() int64 { return SampleMin(s.values) }
328
329// Percentile returns an arbitrary percentile of values at the time the
330// snapshot was taken.
331func (s *SampleSnapshot) Percentile(p float64) float64 {
332	return SamplePercentile(s.values, p)
333}
334
335// Percentiles returns a slice of arbitrary percentiles of values at the time
336// the snapshot was taken.
337func (s *SampleSnapshot) Percentiles(ps []float64) []float64 {
338	return SamplePercentiles(s.values, ps)
339}
340
341// Size returns the size of the sample at the time the snapshot was taken.
342func (s *SampleSnapshot) Size() int { return len(s.values) }
343
344// Snapshot returns the snapshot.
345func (s *SampleSnapshot) Snapshot() Sample { return s }
346
347// StdDev returns the standard deviation of values at the time the snapshot was
348// taken.
349func (s *SampleSnapshot) StdDev() float64 { return SampleStdDev(s.values) }
350
351// Sum returns the sum of values at the time the snapshot was taken.
352func (s *SampleSnapshot) Sum() int64 { return SampleSum(s.values) }
353
354// Update panics.
355func (*SampleSnapshot) Update(int64) {
356	panic("Update called on a SampleSnapshot")
357}
358
359// Values returns a copy of the values in the sample.
360func (s *SampleSnapshot) Values() []int64 {
361	values := make([]int64, len(s.values))
362	copy(values, s.values)
363	return values
364}
365
366// Variance returns the variance of values at the time the snapshot was taken.
367func (s *SampleSnapshot) Variance() float64 { return SampleVariance(s.values) }
368
369// SampleStdDev returns the standard deviation of the slice of int64.
370func SampleStdDev(values []int64) float64 {
371	return math.Sqrt(SampleVariance(values))
372}
373
// SampleSum returns the sum of values as an int64; an empty or nil slice
// sums to 0.
func SampleSum(values []int64) int64 {
	total := int64(0)
	for i := 0; i < len(values); i++ {
		total += values[i]
	}
	return total
}
382
383// SampleVariance returns the variance of the slice of int64.
384func SampleVariance(values []int64) float64 {
385	if 0 == len(values) {
386		return 0.0
387	}
388	m := SampleMean(values)
389	var sum float64
390	for _, v := range values {
391		d := float64(v) - m
392		sum += d * d
393	}
394	return sum / float64(len(values))
395}
396
// UniformSample is a uniform sample using Vitter's Algorithm R.
//
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
//
// count is the number of updates since construction or the last Clear,
// reservoirSize caps how many values are retained, values is the reservoir
// itself, and mutex is held by every method that reads or writes them.
type UniformSample struct {
	count         int64
	mutex         sync.Mutex
	reservoirSize int
	values        []int64
}
406
407// NewUniformSample constructs a new uniform sample with the given reservoir
408// size.
409func NewUniformSample(reservoirSize int) Sample {
410	if UseNilMetrics {
411		return NilSample{}
412	}
413	return &UniformSample{
414		reservoirSize: reservoirSize,
415		values:        make([]int64, 0, reservoirSize),
416	}
417}
418
419// Clear clears all samples.
420func (s *UniformSample) Clear() {
421	s.mutex.Lock()
422	defer s.mutex.Unlock()
423	s.count = 0
424	s.values = make([]int64, 0, s.reservoirSize)
425}
426
427// Count returns the number of samples recorded, which may exceed the
428// reservoir size.
429func (s *UniformSample) Count() int64 {
430	s.mutex.Lock()
431	defer s.mutex.Unlock()
432	return s.count
433}
434
435// Max returns the maximum value in the sample, which may not be the maximum
436// value ever to be part of the sample.
437func (s *UniformSample) Max() int64 {
438	s.mutex.Lock()
439	defer s.mutex.Unlock()
440	return SampleMax(s.values)
441}
442
443// Mean returns the mean of the values in the sample.
444func (s *UniformSample) Mean() float64 {
445	s.mutex.Lock()
446	defer s.mutex.Unlock()
447	return SampleMean(s.values)
448}
449
450// Min returns the minimum value in the sample, which may not be the minimum
451// value ever to be part of the sample.
452func (s *UniformSample) Min() int64 {
453	s.mutex.Lock()
454	defer s.mutex.Unlock()
455	return SampleMin(s.values)
456}
457
458// Percentile returns an arbitrary percentile of values in the sample.
459func (s *UniformSample) Percentile(p float64) float64 {
460	s.mutex.Lock()
461	defer s.mutex.Unlock()
462	return SamplePercentile(s.values, p)
463}
464
465// Percentiles returns a slice of arbitrary percentiles of values in the
466// sample.
467func (s *UniformSample) Percentiles(ps []float64) []float64 {
468	s.mutex.Lock()
469	defer s.mutex.Unlock()
470	return SamplePercentiles(s.values, ps)
471}
472
473// Size returns the size of the sample, which is at most the reservoir size.
474func (s *UniformSample) Size() int {
475	s.mutex.Lock()
476	defer s.mutex.Unlock()
477	return len(s.values)
478}
479
480// Snapshot returns a read-only copy of the sample.
481func (s *UniformSample) Snapshot() Sample {
482	s.mutex.Lock()
483	defer s.mutex.Unlock()
484	values := make([]int64, len(s.values))
485	copy(values, s.values)
486	return &SampleSnapshot{
487		count:  s.count,
488		values: values,
489	}
490}
491
492// StdDev returns the standard deviation of the values in the sample.
493func (s *UniformSample) StdDev() float64 {
494	s.mutex.Lock()
495	defer s.mutex.Unlock()
496	return SampleStdDev(s.values)
497}
498
499// Sum returns the sum of the values in the sample.
500func (s *UniformSample) Sum() int64 {
501	s.mutex.Lock()
502	defer s.mutex.Unlock()
503	return SampleSum(s.values)
504}
505
506// Update samples a new value.
507func (s *UniformSample) Update(v int64) {
508	s.mutex.Lock()
509	defer s.mutex.Unlock()
510	s.count++
511	if len(s.values) < s.reservoirSize {
512		s.values = append(s.values, v)
513	} else {
514		r := rand.Int63n(s.count)
515		if r < int64(len(s.values)) {
516			s.values[int(r)] = v
517		}
518	}
519}
520
521// Values returns a copy of the values in the sample.
522func (s *UniformSample) Values() []int64 {
523	s.mutex.Lock()
524	defer s.mutex.Unlock()
525	values := make([]int64, len(s.values))
526	copy(values, s.values)
527	return values
528}
529
530// Variance returns the variance of the values in the sample.
531func (s *UniformSample) Variance() float64 {
532	s.mutex.Lock()
533	defer s.mutex.Unlock()
534	return SampleVariance(s.values)
535}
536
// expDecaySample represents an individual sample in a heap: k is the priority
// the heap orders by and v is the recorded value.
type expDecaySample struct {
	k float64
	v int64
}

// newExpDecaySampleHeap returns an empty heap whose backing slice is
// preallocated for reservoirSize samples.
func newExpDecaySampleHeap(reservoirSize int) *expDecaySampleHeap {
	return &expDecaySampleHeap{make([]expDecaySample, 0, reservoirSize)}
}

// expDecaySampleHeap is a min-heap of expDecaySamples ordered by k.
// The internal implementation is copied from the standard library's container/heap
type expDecaySampleHeap struct {
	s []expDecaySample
}

// Clear empties the heap but keeps the backing array for reuse.
func (h *expDecaySampleHeap) Clear() {
	h.s = h.s[:0]
}

// Push adds s to the heap.  append is used instead of reslicing so that
// pushing past the preallocated capacity grows the slice rather than
// panicking with a slice-bounds error.
func (h *expDecaySampleHeap) Push(s expDecaySample) {
	h.s = append(h.s, s)
	h.up(len(h.s) - 1)
}

// Pop removes and returns the sample with the smallest k.  It panics when the
// heap is empty.
func (h *expDecaySampleHeap) Pop() expDecaySample {
	n := len(h.s) - 1
	// Move the minimum to the end, restore heap order over the first n
	// elements, then cut the minimum off.
	h.s[0], h.s[n] = h.s[n], h.s[0]
	h.down(0, n)

	s := h.s[n]
	h.s = h.s[:n]
	return s
}

// Size returns the number of samples in the heap.
func (h *expDecaySampleHeap) Size() int {
	return len(h.s)
}

// Values returns the heap's backing slice in heap order, without copying.
// Callers share storage with the heap.
func (h *expDecaySampleHeap) Values() []expDecaySample {
	return h.s
}

// up sifts the element at index j towards the root until its parent's k is
// not greater than its own.
func (h *expDecaySampleHeap) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		// At the root (j == 0) integer division makes i == j, ending the loop.
		if i == j || !(h.s[j].k < h.s[i].k) {
			break
		}
		h.s[i], h.s[j] = h.s[j], h.s[i]
		j = i
	}
}

// down sifts the element at index i towards the leaves, considering only the
// first n elements, until neither child has a smaller k.
func (h *expDecaySampleHeap) down(i, n int) {
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && !(h.s[j1].k < h.s[j2].k) {
			j = j2 // = 2*i + 2  // right child
		}
		if !(h.s[j].k < h.s[i].k) {
			break
		}
		h.s[i], h.s[j] = h.s[j], h.s[i]
		i = j
	}
}
611
// int64Slice adapts []int64 to sort.Interface, ordering elements ascending.
type int64Slice []int64

// Len returns the number of elements.
func (s int64Slice) Len() int {
	return len(s)
}

// Less reports whether the element at i is strictly smaller than the one at j.
func (s int64Slice) Less(i, j int) bool {
	return s[j] > s[i]
}

// Swap exchanges the elements at i and j.
func (s int64Slice) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}
617