1package memmetrics 2 3import ( 4 "fmt" 5 "time" 6 7 "github.com/mailgun/timetools" 8) 9 10type rcOptSetter func(*RollingCounter) error 11 12// CounterClock defines a counter clock 13func CounterClock(c timetools.TimeProvider) rcOptSetter { 14 return func(r *RollingCounter) error { 15 r.clock = c 16 return nil 17 } 18} 19 20// RollingCounter Calculates in memory failure rate of an endpoint using rolling window of a predefined size 21type RollingCounter struct { 22 clock timetools.TimeProvider 23 resolution time.Duration 24 values []int 25 countedBuckets int // how many samples in different buckets have we collected so far 26 lastBucket int // last recorded bucket 27 lastUpdated time.Time 28} 29 30// NewCounter creates a counter with fixed amount of buckets that are rotated every resolution period. 31// E.g. 10 buckets with 1 second means that every new second the bucket is refreshed, so it maintains 10 second rolling window. 32// By default creates a bucket with 10 buckets and 1 second resolution 33func NewCounter(buckets int, resolution time.Duration, options ...rcOptSetter) (*RollingCounter, error) { 34 if buckets <= 0 { 35 return nil, fmt.Errorf("Buckets should be >= 0") 36 } 37 if resolution < time.Second { 38 return nil, fmt.Errorf("Resolution should be larger than a second") 39 } 40 41 rc := &RollingCounter{ 42 lastBucket: -1, 43 resolution: resolution, 44 45 values: make([]int, buckets), 46 } 47 48 for _, o := range options { 49 if err := o(rc); err != nil { 50 return nil, err 51 } 52 } 53 54 if rc.clock == nil { 55 rc.clock = &timetools.RealTime{} 56 } 57 58 return rc, nil 59} 60 61// Append append a counter 62func (c *RollingCounter) Append(o *RollingCounter) error { 63 c.Inc(int(o.Count())) 64 return nil 65} 66 67// Clone clone a counter 68func (c *RollingCounter) Clone() *RollingCounter { 69 c.cleanup() 70 other := &RollingCounter{ 71 resolution: c.resolution, 72 values: make([]int, len(c.values)), 73 clock: c.clock, 74 lastBucket: c.lastBucket, 75 lastUpdated: c.lastUpdated, 76 } 77 copy(other.values, c.values) 78 return other 79} 80 81// Reset reset a counter 82func (c *RollingCounter) Reset() { 83 c.lastBucket = -1 84 c.countedBuckets = 0 85 c.lastUpdated = time.Time{} 86 for i := range c.values { 87 c.values[i] = 0 88 } 89} 90 91// CountedBuckets gets counted buckets 92func (c *RollingCounter) CountedBuckets() int { 93 return c.countedBuckets 94} 95 96// Count counts 97func (c *RollingCounter) Count() int64 { 98 c.cleanup() 99 return c.sum() 100} 101 102// Resolution gets resolution 103func (c *RollingCounter) Resolution() time.Duration { 104 return c.resolution 105} 106 107// Buckets gets buckets 108func (c *RollingCounter) Buckets() int { 109 return len(c.values) 110} 111 112// WindowSize gets windows size 113func (c *RollingCounter) WindowSize() time.Duration { 114 return time.Duration(len(c.values)) * c.resolution 115} 116 117// Inc increment counter 118func (c *RollingCounter) Inc(v int) { 119 c.cleanup() 120 c.incBucketValue(v) 121} 122 123func (c *RollingCounter) incBucketValue(v int) { 124 now := c.clock.UtcNow() 125 bucket := c.getBucket(now) 126 c.values[bucket] += v 127 c.lastUpdated = now 128 // Update usage stats if we haven't collected enough data 129 if c.countedBuckets < len(c.values) { 130 // Only update if we have advanced to the next bucket and not incremented the value 131 // in the current bucket. 132 if c.lastBucket != bucket { 133 c.lastBucket = bucket 134 c.countedBuckets++ 135 } 136 } 137} 138 139// Returns the number in the moving window bucket that this slot occupies 140func (c *RollingCounter) getBucket(t time.Time) int { 141 return int(t.Truncate(c.resolution).Unix() % int64(len(c.values))) 142} 143 144// Reset buckets that were not updated 145func (c *RollingCounter) cleanup() { 146 now := c.clock.UtcNow() 147 for i := 0; i < len(c.values); i++ { 148 now = now.Add(time.Duration(-1*i) * c.resolution) 149 if now.Truncate(c.resolution).After(c.lastUpdated.Truncate(c.resolution)) { 150 c.values[c.getBucket(now)] = 0 151 } else { 152 break 153 } 154 } 155} 156 157func (c *RollingCounter) sum() int64 { 158 out := int64(0) 159 for _, v := range c.values { 160 out += int64(v) 161 } 162 return out 163} 164