1package metrics
2
3import (
4	"fmt"
5	"math"
6	"strings"
7	"sync"
8	"time"
9)
10
11// InmemSink provides a MetricSink that does in-memory aggregation
12// without sending metrics over a network. It can be embedded within
13// an application to provide profiling information.
14type InmemSink struct {
15	// How long is each aggregation interval
16	interval time.Duration
17
18	// Retain controls how many metrics interval we keep
19	retain time.Duration
20
21	// maxIntervals is the maximum length of intervals.
22	// It is retain / interval.
23	maxIntervals int
24
25	// intervals is a slice of the retained intervals
26	intervals    []*IntervalMetrics
27	intervalLock sync.RWMutex
28}
29
30// IntervalMetrics stores the aggregated metrics
31// for a specific interval
32type IntervalMetrics struct {
33	sync.RWMutex
34
35	// The start time of the interval
36	Interval time.Time
37
38	// Gauges maps the key to the last set value
39	Gauges map[string]float32
40
41	// Points maps the string to the list of emitted values
42	// from EmitKey
43	Points map[string][]float32
44
45	// Counters maps the string key to a sum of the counter
46	// values
47	Counters map[string]*AggregateSample
48
49	// Samples maps the key to an AggregateSample,
50	// which has the rolled up view of a sample
51	Samples map[string]*AggregateSample
52}
53
54// NewIntervalMetrics creates a new IntervalMetrics for a given interval
55func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
56	return &IntervalMetrics{
57		Interval: intv,
58		Gauges:   make(map[string]float32),
59		Points:   make(map[string][]float32),
60		Counters: make(map[string]*AggregateSample),
61		Samples:  make(map[string]*AggregateSample),
62	}
63}
64
65// AggregateSample is used to hold aggregate metrics
66// about a sample
67type AggregateSample struct {
68	Count int     // The count of emitted pairs
69	Sum   float64 // The sum of values
70	SumSq float64 // The sum of squared values
71	Min   float64 // Minimum value
72	Max   float64 // Maximum value
73}
74
75// Computes a Stddev of the values
76func (a *AggregateSample) Stddev() float64 {
77	num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2)
78	div := float64(a.Count * (a.Count - 1))
79	if div == 0 {
80		return 0
81	}
82	return math.Sqrt(num / div)
83}
84
85// Computes a mean of the values
86func (a *AggregateSample) Mean() float64 {
87	if a.Count == 0 {
88		return 0
89	}
90	return a.Sum / float64(a.Count)
91}
92
93// Ingest is used to update a sample
94func (a *AggregateSample) Ingest(v float64) {
95	a.Count++
96	a.Sum += v
97	a.SumSq += (v * v)
98	if v < a.Min || a.Count == 1 {
99		a.Min = v
100	}
101	if v > a.Max || a.Count == 1 {
102		a.Max = v
103	}
104}
105
106func (a *AggregateSample) String() string {
107	if a.Count == 0 {
108		return "Count: 0"
109	} else if a.Stddev() == 0 {
110		return fmt.Sprintf("Count: %d Sum: %0.3f", a.Count, a.Sum)
111	} else {
112		return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f",
113			a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum)
114	}
115}
116
117// NewInmemSink is used to construct a new in-memory sink.
118// Uses an aggregation interval and maximum retention period.
119func NewInmemSink(interval, retain time.Duration) *InmemSink {
120	i := &InmemSink{
121		interval:     interval,
122		retain:       retain,
123		maxIntervals: int(retain / interval),
124	}
125	i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
126	return i
127}
128
129func (i *InmemSink) SetGauge(key []string, val float32) {
130	k := i.flattenKey(key)
131	intv := i.getInterval()
132
133	intv.Lock()
134	defer intv.Unlock()
135	intv.Gauges[k] = val
136}
137
138func (i *InmemSink) EmitKey(key []string, val float32) {
139	k := i.flattenKey(key)
140	intv := i.getInterval()
141
142	intv.Lock()
143	defer intv.Unlock()
144	vals := intv.Points[k]
145	intv.Points[k] = append(vals, val)
146}
147
148func (i *InmemSink) IncrCounter(key []string, val float32) {
149	k := i.flattenKey(key)
150	intv := i.getInterval()
151
152	intv.Lock()
153	defer intv.Unlock()
154
155	agg := intv.Counters[k]
156	if agg == nil {
157		agg = &AggregateSample{}
158		intv.Counters[k] = agg
159	}
160	agg.Ingest(float64(val))
161}
162
163func (i *InmemSink) AddSample(key []string, val float32) {
164	k := i.flattenKey(key)
165	intv := i.getInterval()
166
167	intv.Lock()
168	defer intv.Unlock()
169
170	agg := intv.Samples[k]
171	if agg == nil {
172		agg = &AggregateSample{}
173		intv.Samples[k] = agg
174	}
175	agg.Ingest(float64(val))
176}
177
178// Data is used to retrieve all the aggregated metrics
179// Intervals may be in use, and a read lock should be acquired
180func (i *InmemSink) Data() []*IntervalMetrics {
181	// Get the current interval, forces creation
182	i.getInterval()
183
184	i.intervalLock.RLock()
185	defer i.intervalLock.RUnlock()
186
187	intervals := make([]*IntervalMetrics, len(i.intervals))
188	copy(intervals, i.intervals)
189	return intervals
190}
191
192func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
193	i.intervalLock.RLock()
194	defer i.intervalLock.RUnlock()
195
196	n := len(i.intervals)
197	if n > 0 && i.intervals[n-1].Interval == intv {
198		return i.intervals[n-1]
199	}
200	return nil
201}
202
203func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
204	i.intervalLock.Lock()
205	defer i.intervalLock.Unlock()
206
207	// Check for an existing interval
208	n := len(i.intervals)
209	if n > 0 && i.intervals[n-1].Interval == intv {
210		return i.intervals[n-1]
211	}
212
213	// Add the current interval
214	current := NewIntervalMetrics(intv)
215	i.intervals = append(i.intervals, current)
216	n++
217
218	// Truncate the intervals if they are too long
219	if n >= i.maxIntervals {
220		copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
221		i.intervals = i.intervals[:i.maxIntervals]
222	}
223	return current
224}
225
226// getInterval returns the current interval to write to
227func (i *InmemSink) getInterval() *IntervalMetrics {
228	intv := time.Now().Truncate(i.interval)
229	if m := i.getExistingInterval(intv); m != nil {
230		return m
231	}
232	return i.createInterval(intv)
233}
234
235// Flattens the key for formatting, removes spaces
236func (i *InmemSink) flattenKey(parts []string) string {
237	joined := strings.Join(parts, ".")
238	return strings.Replace(joined, " ", "_", -1)
239}
240