1package metrics 2 3import ( 4 "math" 5 "math/rand" 6 "sort" 7 "sync" 8 "time" 9) 10 11const rescaleThreshold = time.Hour 12 13// Samples maintain a statistically-significant selection of values from 14// a stream. 15type Sample interface { 16 Clear() 17 Count() int64 18 Max() int64 19 Mean() float64 20 Min() int64 21 Percentile(float64) float64 22 Percentiles([]float64) []float64 23 Size() int 24 Snapshot() Sample 25 StdDev() float64 26 Sum() int64 27 Update(int64) 28 Values() []int64 29 Variance() float64 30} 31 32// ExpDecaySample is an exponentially-decaying sample using a forward-decaying 33// priority reservoir. See Cormode et al's "Forward Decay: A Practical Time 34// Decay Model for Streaming Systems". 35// 36// <http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf> 37type ExpDecaySample struct { 38 alpha float64 39 count int64 40 mutex sync.Mutex 41 reservoirSize int 42 t0, t1 time.Time 43 values *expDecaySampleHeap 44} 45 46// NewExpDecaySample constructs a new exponentially-decaying sample with the 47// given reservoir size and alpha. 48func NewExpDecaySample(reservoirSize int, alpha float64) Sample { 49 if UseNilMetrics { 50 return NilSample{} 51 } 52 s := &ExpDecaySample{ 53 alpha: alpha, 54 reservoirSize: reservoirSize, 55 t0: time.Now(), 56 values: newExpDecaySampleHeap(reservoirSize), 57 } 58 s.t1 = s.t0.Add(rescaleThreshold) 59 return s 60} 61 62// Clear clears all samples. 63func (s *ExpDecaySample) Clear() { 64 s.mutex.Lock() 65 defer s.mutex.Unlock() 66 s.count = 0 67 s.t0 = time.Now() 68 s.t1 = s.t0.Add(rescaleThreshold) 69 s.values.Clear() 70} 71 72// Count returns the number of samples recorded, which may exceed the 73// reservoir size. 74func (s *ExpDecaySample) Count() int64 { 75 s.mutex.Lock() 76 defer s.mutex.Unlock() 77 return s.count 78} 79 80// Max returns the maximum value in the sample, which may not be the maximum 81// value ever to be part of the sample. 
82func (s *ExpDecaySample) Max() int64 { 83 return SampleMax(s.Values()) 84} 85 86// Mean returns the mean of the values in the sample. 87func (s *ExpDecaySample) Mean() float64 { 88 return SampleMean(s.Values()) 89} 90 91// Min returns the minimum value in the sample, which may not be the minimum 92// value ever to be part of the sample. 93func (s *ExpDecaySample) Min() int64 { 94 return SampleMin(s.Values()) 95} 96 97// Percentile returns an arbitrary percentile of values in the sample. 98func (s *ExpDecaySample) Percentile(p float64) float64 { 99 return SamplePercentile(s.Values(), p) 100} 101 102// Percentiles returns a slice of arbitrary percentiles of values in the 103// sample. 104func (s *ExpDecaySample) Percentiles(ps []float64) []float64 { 105 return SamplePercentiles(s.Values(), ps) 106} 107 108// Size returns the size of the sample, which is at most the reservoir size. 109func (s *ExpDecaySample) Size() int { 110 s.mutex.Lock() 111 defer s.mutex.Unlock() 112 return s.values.Size() 113} 114 115// Snapshot returns a read-only copy of the sample. 116func (s *ExpDecaySample) Snapshot() Sample { 117 s.mutex.Lock() 118 defer s.mutex.Unlock() 119 vals := s.values.Values() 120 values := make([]int64, len(vals)) 121 for i, v := range vals { 122 values[i] = v.v 123 } 124 return &SampleSnapshot{ 125 count: s.count, 126 values: values, 127 } 128} 129 130// StdDev returns the standard deviation of the values in the sample. 131func (s *ExpDecaySample) StdDev() float64 { 132 return SampleStdDev(s.Values()) 133} 134 135// Sum returns the sum of the values in the sample. 136func (s *ExpDecaySample) Sum() int64 { 137 return SampleSum(s.Values()) 138} 139 140// Update samples a new value. 141func (s *ExpDecaySample) Update(v int64) { 142 s.update(time.Now(), v) 143} 144 145// Values returns a copy of the values in the sample. 
146func (s *ExpDecaySample) Values() []int64 { 147 s.mutex.Lock() 148 defer s.mutex.Unlock() 149 vals := s.values.Values() 150 values := make([]int64, len(vals)) 151 for i, v := range vals { 152 values[i] = v.v 153 } 154 return values 155} 156 157// Variance returns the variance of the values in the sample. 158func (s *ExpDecaySample) Variance() float64 { 159 return SampleVariance(s.Values()) 160} 161 162// update samples a new value at a particular timestamp. This is a method all 163// its own to facilitate testing. 164func (s *ExpDecaySample) update(t time.Time, v int64) { 165 s.mutex.Lock() 166 defer s.mutex.Unlock() 167 s.count++ 168 if s.values.Size() == s.reservoirSize { 169 s.values.Pop() 170 } 171 s.values.Push(expDecaySample{ 172 k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(), 173 v: v, 174 }) 175 if t.After(s.t1) { 176 values := s.values.Values() 177 t0 := s.t0 178 s.values.Clear() 179 s.t0 = t 180 s.t1 = s.t0.Add(rescaleThreshold) 181 for _, v := range values { 182 v.k = v.k * math.Exp(-s.alpha*s.t0.Sub(t0).Seconds()) 183 s.values.Push(v) 184 } 185 } 186} 187 188// NilSample is a no-op Sample. 189type NilSample struct{} 190 191// Clear is a no-op. 192func (NilSample) Clear() {} 193 194// Count is a no-op. 195func (NilSample) Count() int64 { return 0 } 196 197// Max is a no-op. 198func (NilSample) Max() int64 { return 0 } 199 200// Mean is a no-op. 201func (NilSample) Mean() float64 { return 0.0 } 202 203// Min is a no-op. 204func (NilSample) Min() int64 { return 0 } 205 206// Percentile is a no-op. 207func (NilSample) Percentile(p float64) float64 { return 0.0 } 208 209// Percentiles is a no-op. 210func (NilSample) Percentiles(ps []float64) []float64 { 211 return make([]float64, len(ps)) 212} 213 214// Size is a no-op. 215func (NilSample) Size() int { return 0 } 216 217// Sample is a no-op. 218func (NilSample) Snapshot() Sample { return NilSample{} } 219 220// StdDev is a no-op. 
221func (NilSample) StdDev() float64 { return 0.0 } 222 223// Sum is a no-op. 224func (NilSample) Sum() int64 { return 0 } 225 226// Update is a no-op. 227func (NilSample) Update(v int64) {} 228 229// Values is a no-op. 230func (NilSample) Values() []int64 { return []int64{} } 231 232// Variance is a no-op. 233func (NilSample) Variance() float64 { return 0.0 } 234 235// SampleMax returns the maximum value of the slice of int64. 236func SampleMax(values []int64) int64 { 237 if 0 == len(values) { 238 return 0 239 } 240 var max int64 = math.MinInt64 241 for _, v := range values { 242 if max < v { 243 max = v 244 } 245 } 246 return max 247} 248 249// SampleMean returns the mean value of the slice of int64. 250func SampleMean(values []int64) float64 { 251 if 0 == len(values) { 252 return 0.0 253 } 254 return float64(SampleSum(values)) / float64(len(values)) 255} 256 257// SampleMin returns the minimum value of the slice of int64. 258func SampleMin(values []int64) int64 { 259 if 0 == len(values) { 260 return 0 261 } 262 var min int64 = math.MaxInt64 263 for _, v := range values { 264 if min > v { 265 min = v 266 } 267 } 268 return min 269} 270 271// SamplePercentiles returns an arbitrary percentile of the slice of int64. 272func SamplePercentile(values int64Slice, p float64) float64 { 273 return SamplePercentiles(values, []float64{p})[0] 274} 275 276// SamplePercentiles returns a slice of arbitrary percentiles of the slice of 277// int64. 
278func SamplePercentiles(values int64Slice, ps []float64) []float64 { 279 scores := make([]float64, len(ps)) 280 size := len(values) 281 if size > 0 { 282 sort.Sort(values) 283 for i, p := range ps { 284 pos := p * float64(size+1) 285 if pos < 1.0 { 286 scores[i] = float64(values[0]) 287 } else if pos >= float64(size) { 288 scores[i] = float64(values[size-1]) 289 } else { 290 lower := float64(values[int(pos)-1]) 291 upper := float64(values[int(pos)]) 292 scores[i] = lower + (pos-math.Floor(pos))*(upper-lower) 293 } 294 } 295 } 296 return scores 297} 298 299// SampleSnapshot is a read-only copy of another Sample. 300type SampleSnapshot struct { 301 count int64 302 values []int64 303} 304 305func NewSampleSnapshot(count int64, values []int64) *SampleSnapshot { 306 return &SampleSnapshot{ 307 count: count, 308 values: values, 309 } 310} 311 312// Clear panics. 313func (*SampleSnapshot) Clear() { 314 panic("Clear called on a SampleSnapshot") 315} 316 317// Count returns the count of inputs at the time the snapshot was taken. 318func (s *SampleSnapshot) Count() int64 { return s.count } 319 320// Max returns the maximal value at the time the snapshot was taken. 321func (s *SampleSnapshot) Max() int64 { return SampleMax(s.values) } 322 323// Mean returns the mean value at the time the snapshot was taken. 324func (s *SampleSnapshot) Mean() float64 { return SampleMean(s.values) } 325 326// Min returns the minimal value at the time the snapshot was taken. 327func (s *SampleSnapshot) Min() int64 { return SampleMin(s.values) } 328 329// Percentile returns an arbitrary percentile of values at the time the 330// snapshot was taken. 331func (s *SampleSnapshot) Percentile(p float64) float64 { 332 return SamplePercentile(s.values, p) 333} 334 335// Percentiles returns a slice of arbitrary percentiles of values at the time 336// the snapshot was taken. 
337func (s *SampleSnapshot) Percentiles(ps []float64) []float64 { 338 return SamplePercentiles(s.values, ps) 339} 340 341// Size returns the size of the sample at the time the snapshot was taken. 342func (s *SampleSnapshot) Size() int { return len(s.values) } 343 344// Snapshot returns the snapshot. 345func (s *SampleSnapshot) Snapshot() Sample { return s } 346 347// StdDev returns the standard deviation of values at the time the snapshot was 348// taken. 349func (s *SampleSnapshot) StdDev() float64 { return SampleStdDev(s.values) } 350 351// Sum returns the sum of values at the time the snapshot was taken. 352func (s *SampleSnapshot) Sum() int64 { return SampleSum(s.values) } 353 354// Update panics. 355func (*SampleSnapshot) Update(int64) { 356 panic("Update called on a SampleSnapshot") 357} 358 359// Values returns a copy of the values in the sample. 360func (s *SampleSnapshot) Values() []int64 { 361 values := make([]int64, len(s.values)) 362 copy(values, s.values) 363 return values 364} 365 366// Variance returns the variance of values at the time the snapshot was taken. 367func (s *SampleSnapshot) Variance() float64 { return SampleVariance(s.values) } 368 369// SampleStdDev returns the standard deviation of the slice of int64. 370func SampleStdDev(values []int64) float64 { 371 return math.Sqrt(SampleVariance(values)) 372} 373 374// SampleSum returns the sum of the slice of int64. 375func SampleSum(values []int64) int64 { 376 var sum int64 377 for _, v := range values { 378 sum += v 379 } 380 return sum 381} 382 383// SampleVariance returns the variance of the slice of int64. 384func SampleVariance(values []int64) float64 { 385 if 0 == len(values) { 386 return 0.0 387 } 388 m := SampleMean(values) 389 var sum float64 390 for _, v := range values { 391 d := float64(v) - m 392 sum += d * d 393 } 394 return sum / float64(len(values)) 395} 396 397// A uniform sample using Vitter's Algorithm R. 
398// 399// <http://www.cs.umd.edu/~samir/498/vitter.pdf> 400type UniformSample struct { 401 count int64 402 mutex sync.Mutex 403 reservoirSize int 404 values []int64 405} 406 407// NewUniformSample constructs a new uniform sample with the given reservoir 408// size. 409func NewUniformSample(reservoirSize int) Sample { 410 if UseNilMetrics { 411 return NilSample{} 412 } 413 return &UniformSample{ 414 reservoirSize: reservoirSize, 415 values: make([]int64, 0, reservoirSize), 416 } 417} 418 419// Clear clears all samples. 420func (s *UniformSample) Clear() { 421 s.mutex.Lock() 422 defer s.mutex.Unlock() 423 s.count = 0 424 s.values = make([]int64, 0, s.reservoirSize) 425} 426 427// Count returns the number of samples recorded, which may exceed the 428// reservoir size. 429func (s *UniformSample) Count() int64 { 430 s.mutex.Lock() 431 defer s.mutex.Unlock() 432 return s.count 433} 434 435// Max returns the maximum value in the sample, which may not be the maximum 436// value ever to be part of the sample. 437func (s *UniformSample) Max() int64 { 438 s.mutex.Lock() 439 defer s.mutex.Unlock() 440 return SampleMax(s.values) 441} 442 443// Mean returns the mean of the values in the sample. 444func (s *UniformSample) Mean() float64 { 445 s.mutex.Lock() 446 defer s.mutex.Unlock() 447 return SampleMean(s.values) 448} 449 450// Min returns the minimum value in the sample, which may not be the minimum 451// value ever to be part of the sample. 452func (s *UniformSample) Min() int64 { 453 s.mutex.Lock() 454 defer s.mutex.Unlock() 455 return SampleMin(s.values) 456} 457 458// Percentile returns an arbitrary percentile of values in the sample. 459func (s *UniformSample) Percentile(p float64) float64 { 460 s.mutex.Lock() 461 defer s.mutex.Unlock() 462 return SamplePercentile(s.values, p) 463} 464 465// Percentiles returns a slice of arbitrary percentiles of values in the 466// sample. 
467func (s *UniformSample) Percentiles(ps []float64) []float64 { 468 s.mutex.Lock() 469 defer s.mutex.Unlock() 470 return SamplePercentiles(s.values, ps) 471} 472 473// Size returns the size of the sample, which is at most the reservoir size. 474func (s *UniformSample) Size() int { 475 s.mutex.Lock() 476 defer s.mutex.Unlock() 477 return len(s.values) 478} 479 480// Snapshot returns a read-only copy of the sample. 481func (s *UniformSample) Snapshot() Sample { 482 s.mutex.Lock() 483 defer s.mutex.Unlock() 484 values := make([]int64, len(s.values)) 485 copy(values, s.values) 486 return &SampleSnapshot{ 487 count: s.count, 488 values: values, 489 } 490} 491 492// StdDev returns the standard deviation of the values in the sample. 493func (s *UniformSample) StdDev() float64 { 494 s.mutex.Lock() 495 defer s.mutex.Unlock() 496 return SampleStdDev(s.values) 497} 498 499// Sum returns the sum of the values in the sample. 500func (s *UniformSample) Sum() int64 { 501 s.mutex.Lock() 502 defer s.mutex.Unlock() 503 return SampleSum(s.values) 504} 505 506// Update samples a new value. 507func (s *UniformSample) Update(v int64) { 508 s.mutex.Lock() 509 defer s.mutex.Unlock() 510 s.count++ 511 if len(s.values) < s.reservoirSize { 512 s.values = append(s.values, v) 513 } else { 514 r := rand.Int63n(s.count) 515 if r < int64(len(s.values)) { 516 s.values[int(r)] = v 517 } 518 } 519} 520 521// Values returns a copy of the values in the sample. 522func (s *UniformSample) Values() []int64 { 523 s.mutex.Lock() 524 defer s.mutex.Unlock() 525 values := make([]int64, len(s.values)) 526 copy(values, s.values) 527 return values 528} 529 530// Variance returns the variance of the values in the sample. 531func (s *UniformSample) Variance() float64 { 532 s.mutex.Lock() 533 defer s.mutex.Unlock() 534 return SampleVariance(s.values) 535} 536 537// expDecaySample represents an individual sample in a heap. 
type expDecaySample struct {
	k float64 // priority; the heap orders samples by ascending k
	v int64   // the sampled value
}

// newExpDecaySampleHeap returns an empty heap whose backing array is
// pre-sized for reservoirSize samples.
func newExpDecaySampleHeap(reservoirSize int) *expDecaySampleHeap {
	return &expDecaySampleHeap{make([]expDecaySample, 0, reservoirSize)}
}

// expDecaySampleHeap is a min-heap of expDecaySamples keyed on k.
// The internal implementation is copied from the standard library's container/heap
type expDecaySampleHeap struct {
	s []expDecaySample
}

// Clear empties the heap while keeping its backing array for reuse.
func (h *expDecaySampleHeap) Clear() {
	h.s = h.s[:0]
}

// Push adds s to the heap.
func (h *expDecaySampleHeap) Push(s expDecaySample) {
	n := len(h.s)
	// append writes in place while capacity allows (same as reslicing), but
	// grows the backing array instead of panicking if a caller ever pushes
	// past the initial capacity.
	h.s = append(h.s, s)
	h.up(n)
}

// Pop removes and returns the sample with the smallest k. It panics if the
// heap is empty.
func (h *expDecaySampleHeap) Pop() expDecaySample {
	n := len(h.s) - 1
	h.s[0], h.s[n] = h.s[n], h.s[0]
	h.down(0, n)
	popped := h.s[n]
	h.s = h.s[:n]
	return popped
}

// Size returns the number of samples in the heap.
func (h *expDecaySampleHeap) Size() int {
	return len(h.s)
}

// Values returns the heap's backing slice in heap (not sorted) order. It is
// not a copy: later Push/Pop/Clear calls may overwrite its elements.
func (h *expDecaySampleHeap) Values() []expDecaySample {
	return h.s
}

// up moves the element at index j towards the root until its parent's k is
// no greater than its own.
func (h *expDecaySampleHeap) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		if i == j || !(h.s[j].k < h.s[i].k) {
			break
		}
		h.s[i], h.s[j] = h.s[j], h.s[i]
		j = i
	}
}

// down moves the element at index i towards the leaves, considering only
// the first n elements, until neither child has a smaller k.
func (h *expDecaySampleHeap) down(i, n int) {
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && !(h.s[j1].k < h.s[j2].k) {
			j = j2 // = 2*i + 2  // right child
		}
		if !(h.s[j].k < h.s[i].k) {
			break
		}
		h.s[i], h.s[j] = h.s[j], h.s[i]
		i = j
	}
}

// int64Slice implements sort.Interface, ordering int64 values ascending.
type int64Slice []int64

func (p int64Slice) Len() int           { return len(p) }
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }