1package memmetrics
2
3import (
4	"fmt"
5	"time"
6
7	"github.com/mailgun/timetools"
8)
9
10type rcOptSetter func(*RollingCounter) error
11
12// CounterClock defines a counter clock
13func CounterClock(c timetools.TimeProvider) rcOptSetter {
14	return func(r *RollingCounter) error {
15		r.clock = c
16		return nil
17	}
18}
19
20// RollingCounter Calculates in memory failure rate of an endpoint using rolling window of a predefined size
21type RollingCounter struct {
22	clock          timetools.TimeProvider
23	resolution     time.Duration
24	values         []int
25	countedBuckets int // how many samples in different buckets have we collected so far
26	lastBucket     int // last recorded bucket
27	lastUpdated    time.Time
28}
29
30// NewCounter creates a counter with fixed amount of buckets that are rotated every resolution period.
31// E.g. 10 buckets with 1 second means that every new second the bucket is refreshed, so it maintains 10 second rolling window.
32// By default creates a bucket with 10 buckets and 1 second resolution
33func NewCounter(buckets int, resolution time.Duration, options ...rcOptSetter) (*RollingCounter, error) {
34	if buckets <= 0 {
35		return nil, fmt.Errorf("Buckets should be >= 0")
36	}
37	if resolution < time.Second {
38		return nil, fmt.Errorf("Resolution should be larger than a second")
39	}
40
41	rc := &RollingCounter{
42		lastBucket: -1,
43		resolution: resolution,
44
45		values: make([]int, buckets),
46	}
47
48	for _, o := range options {
49		if err := o(rc); err != nil {
50			return nil, err
51		}
52	}
53
54	if rc.clock == nil {
55		rc.clock = &timetools.RealTime{}
56	}
57
58	return rc, nil
59}
60
61// Append append a counter
62func (c *RollingCounter) Append(o *RollingCounter) error {
63	c.Inc(int(o.Count()))
64	return nil
65}
66
67// Clone clone a counter
68func (c *RollingCounter) Clone() *RollingCounter {
69	c.cleanup()
70	other := &RollingCounter{
71		resolution:  c.resolution,
72		values:      make([]int, len(c.values)),
73		clock:       c.clock,
74		lastBucket:  c.lastBucket,
75		lastUpdated: c.lastUpdated,
76	}
77	copy(other.values, c.values)
78	return other
79}
80
81// Reset reset a counter
82func (c *RollingCounter) Reset() {
83	c.lastBucket = -1
84	c.countedBuckets = 0
85	c.lastUpdated = time.Time{}
86	for i := range c.values {
87		c.values[i] = 0
88	}
89}
90
91// CountedBuckets gets counted buckets
92func (c *RollingCounter) CountedBuckets() int {
93	return c.countedBuckets
94}
95
96// Count counts
97func (c *RollingCounter) Count() int64 {
98	c.cleanup()
99	return c.sum()
100}
101
102// Resolution gets resolution
103func (c *RollingCounter) Resolution() time.Duration {
104	return c.resolution
105}
106
107// Buckets gets buckets
108func (c *RollingCounter) Buckets() int {
109	return len(c.values)
110}
111
112// WindowSize gets windows size
113func (c *RollingCounter) WindowSize() time.Duration {
114	return time.Duration(len(c.values)) * c.resolution
115}
116
117// Inc increment counter
118func (c *RollingCounter) Inc(v int) {
119	c.cleanup()
120	c.incBucketValue(v)
121}
122
123func (c *RollingCounter) incBucketValue(v int) {
124	now := c.clock.UtcNow()
125	bucket := c.getBucket(now)
126	c.values[bucket] += v
127	c.lastUpdated = now
128	// Update usage stats if we haven't collected enough data
129	if c.countedBuckets < len(c.values) {
130		// Only update if we have advanced to the next bucket and not incremented the value
131		// in the current bucket.
132		if c.lastBucket != bucket {
133			c.lastBucket = bucket
134			c.countedBuckets++
135		}
136	}
137}
138
139// Returns the number in the moving window bucket that this slot occupies
140func (c *RollingCounter) getBucket(t time.Time) int {
141	return int(t.Truncate(c.resolution).Unix() % int64(len(c.values)))
142}
143
144// Reset buckets that were not updated
145func (c *RollingCounter) cleanup() {
146	now := c.clock.UtcNow()
147	for i := 0; i < len(c.values); i++ {
148		now = now.Add(time.Duration(-1*i) * c.resolution)
149		if now.Truncate(c.resolution).After(c.lastUpdated.Truncate(c.resolution)) {
150			c.values[c.getBucket(now)] = 0
151		} else {
152			break
153		}
154	}
155}
156
157func (c *RollingCounter) sum() int64 {
158	out := int64(0)
159	for _, v := range c.values {
160		out += int64(v)
161	}
162	return out
163}
164