1package metrics
2
3import (
4	"bytes"
5	"fmt"
6	"math"
7	"net/url"
8	"strings"
9	"sync"
10	"time"
11)
12
13// InmemSink provides a MetricSink that does in-memory aggregation
14// without sending metrics over a network. It can be embedded within
15// an application to provide profiling information.
16type InmemSink struct {
17	// How long is each aggregation interval
18	interval time.Duration
19
20	// Retain controls how many metrics interval we keep
21	retain time.Duration
22
23	// maxIntervals is the maximum length of intervals.
24	// It is retain / interval.
25	maxIntervals int
26
27	// intervals is a slice of the retained intervals
28	intervals    []*IntervalMetrics
29	intervalLock sync.RWMutex
30
31	rateDenom float64
32}
33
34// IntervalMetrics stores the aggregated metrics
35// for a specific interval
36type IntervalMetrics struct {
37	sync.RWMutex
38
39	// The start time of the interval
40	Interval time.Time
41
42	// Gauges maps the key to the last set value
43	Gauges map[string]GaugeValue
44
45	// Points maps the string to the list of emitted values
46	// from EmitKey
47	Points map[string][]float32
48
49	// Counters maps the string key to a sum of the counter
50	// values
51	Counters map[string]SampledValue
52
53	// Samples maps the key to an AggregateSample,
54	// which has the rolled up view of a sample
55	Samples map[string]SampledValue
56}
57
58// NewIntervalMetrics creates a new IntervalMetrics for a given interval
59func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
60	return &IntervalMetrics{
61		Interval: intv,
62		Gauges:   make(map[string]GaugeValue),
63		Points:   make(map[string][]float32),
64		Counters: make(map[string]SampledValue),
65		Samples:  make(map[string]SampledValue),
66	}
67}
68
69// AggregateSample is used to hold aggregate metrics
70// about a sample
71type AggregateSample struct {
72	Count       int       // The count of emitted pairs
73	Rate        float64   // The values rate per time unit (usually 1 second)
74	Sum         float64   // The sum of values
75	SumSq       float64   `json:"-"` // The sum of squared values
76	Min         float64   // Minimum value
77	Max         float64   // Maximum value
78	LastUpdated time.Time `json:"-"` // When value was last updated
79}
80
81// Computes a Stddev of the values
82func (a *AggregateSample) Stddev() float64 {
83	num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2)
84	div := float64(a.Count * (a.Count - 1))
85	if div == 0 {
86		return 0
87	}
88	return math.Sqrt(num / div)
89}
90
91// Computes a mean of the values
92func (a *AggregateSample) Mean() float64 {
93	if a.Count == 0 {
94		return 0
95	}
96	return a.Sum / float64(a.Count)
97}
98
99// Ingest is used to update a sample
100func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
101	a.Count++
102	a.Sum += v
103	a.SumSq += (v * v)
104	if v < a.Min || a.Count == 1 {
105		a.Min = v
106	}
107	if v > a.Max || a.Count == 1 {
108		a.Max = v
109	}
110	a.Rate = float64(a.Sum) / rateDenom
111	a.LastUpdated = time.Now()
112}
113
114func (a *AggregateSample) String() string {
115	if a.Count == 0 {
116		return "Count: 0"
117	} else if a.Stddev() == 0 {
118		return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated)
119	} else {
120		return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s",
121			a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated)
122	}
123}
124
125// NewInmemSinkFromURL creates an InmemSink from a URL. It is used
126// (and tested) from NewMetricSinkFromURL.
127func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
128	params := u.Query()
129
130	interval, err := time.ParseDuration(params.Get("interval"))
131	if err != nil {
132		return nil, fmt.Errorf("Bad 'interval' param: %s", err)
133	}
134
135	retain, err := time.ParseDuration(params.Get("retain"))
136	if err != nil {
137		return nil, fmt.Errorf("Bad 'retain' param: %s", err)
138	}
139
140	return NewInmemSink(interval, retain), nil
141}
142
143// NewInmemSink is used to construct a new in-memory sink.
144// Uses an aggregation interval and maximum retention period.
145func NewInmemSink(interval, retain time.Duration) *InmemSink {
146	rateTimeUnit := time.Second
147	i := &InmemSink{
148		interval:     interval,
149		retain:       retain,
150		maxIntervals: int(retain / interval),
151		rateDenom:    float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
152	}
153	i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
154	return i
155}
156
157func (i *InmemSink) SetGauge(key []string, val float32) {
158	i.SetGaugeWithLabels(key, val, nil)
159}
160
161func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
162	k, name := i.flattenKeyLabels(key, labels)
163	intv := i.getInterval()
164
165	intv.Lock()
166	defer intv.Unlock()
167	intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
168}
169
170func (i *InmemSink) EmitKey(key []string, val float32) {
171	k := i.flattenKey(key)
172	intv := i.getInterval()
173
174	intv.Lock()
175	defer intv.Unlock()
176	vals := intv.Points[k]
177	intv.Points[k] = append(vals, val)
178}
179
180func (i *InmemSink) IncrCounter(key []string, val float32) {
181	i.IncrCounterWithLabels(key, val, nil)
182}
183
184func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
185	k, name := i.flattenKeyLabels(key, labels)
186	intv := i.getInterval()
187
188	intv.Lock()
189	defer intv.Unlock()
190
191	agg, ok := intv.Counters[k]
192	if !ok {
193		agg = SampledValue{
194			Name:            name,
195			AggregateSample: &AggregateSample{},
196			Labels:          labels,
197		}
198		intv.Counters[k] = agg
199	}
200	agg.Ingest(float64(val), i.rateDenom)
201}
202
203func (i *InmemSink) AddSample(key []string, val float32) {
204	i.AddSampleWithLabels(key, val, nil)
205}
206
207func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
208	k, name := i.flattenKeyLabels(key, labels)
209	intv := i.getInterval()
210
211	intv.Lock()
212	defer intv.Unlock()
213
214	agg, ok := intv.Samples[k]
215	if !ok {
216		agg = SampledValue{
217			Name:            name,
218			AggregateSample: &AggregateSample{},
219			Labels:          labels,
220		}
221		intv.Samples[k] = agg
222	}
223	agg.Ingest(float64(val), i.rateDenom)
224}
225
226// Data is used to retrieve all the aggregated metrics
227// Intervals may be in use, and a read lock should be acquired
228func (i *InmemSink) Data() []*IntervalMetrics {
229	// Get the current interval, forces creation
230	i.getInterval()
231
232	i.intervalLock.RLock()
233	defer i.intervalLock.RUnlock()
234
235	n := len(i.intervals)
236	intervals := make([]*IntervalMetrics, n)
237
238	copy(intervals[:n-1], i.intervals[:n-1])
239	current := i.intervals[n-1]
240
241	// make its own copy for current interval
242	intervals[n-1] = &IntervalMetrics{}
243	copyCurrent := intervals[n-1]
244	current.RLock()
245	*copyCurrent = *current
246
247	copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
248	for k, v := range current.Gauges {
249		copyCurrent.Gauges[k] = v
250	}
251	// saved values will be not change, just copy its link
252	copyCurrent.Points = make(map[string][]float32, len(current.Points))
253	for k, v := range current.Points {
254		copyCurrent.Points[k] = v
255	}
256	copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters))
257	for k, v := range current.Counters {
258		copyCurrent.Counters[k] = v
259	}
260	copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples))
261	for k, v := range current.Samples {
262		copyCurrent.Samples[k] = v
263	}
264	current.RUnlock()
265
266	return intervals
267}
268
269func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
270	i.intervalLock.RLock()
271	defer i.intervalLock.RUnlock()
272
273	n := len(i.intervals)
274	if n > 0 && i.intervals[n-1].Interval == intv {
275		return i.intervals[n-1]
276	}
277	return nil
278}
279
280func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
281	i.intervalLock.Lock()
282	defer i.intervalLock.Unlock()
283
284	// Check for an existing interval
285	n := len(i.intervals)
286	if n > 0 && i.intervals[n-1].Interval == intv {
287		return i.intervals[n-1]
288	}
289
290	// Add the current interval
291	current := NewIntervalMetrics(intv)
292	i.intervals = append(i.intervals, current)
293	n++
294
295	// Truncate the intervals if they are too long
296	if n >= i.maxIntervals {
297		copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
298		i.intervals = i.intervals[:i.maxIntervals]
299	}
300	return current
301}
302
303// getInterval returns the current interval to write to
304func (i *InmemSink) getInterval() *IntervalMetrics {
305	intv := time.Now().Truncate(i.interval)
306	if m := i.getExistingInterval(intv); m != nil {
307		return m
308	}
309	return i.createInterval(intv)
310}
311
312// Flattens the key for formatting, removes spaces
313func (i *InmemSink) flattenKey(parts []string) string {
314	buf := &bytes.Buffer{}
315	replacer := strings.NewReplacer(" ", "_")
316
317	if len(parts) > 0 {
318		replacer.WriteString(buf, parts[0])
319	}
320	for _, part := range parts[1:] {
321		replacer.WriteString(buf, ".")
322		replacer.WriteString(buf, part)
323	}
324
325	return buf.String()
326}
327
328// Flattens the key for formatting along with its labels, removes spaces
329func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
330	buf := &bytes.Buffer{}
331	replacer := strings.NewReplacer(" ", "_")
332
333	if len(parts) > 0 {
334		replacer.WriteString(buf, parts[0])
335	}
336	for _, part := range parts[1:] {
337		replacer.WriteString(buf, ".")
338		replacer.WriteString(buf, part)
339	}
340
341	key := buf.String()
342
343	for _, label := range labels {
344		replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
345	}
346
347	return buf.String(), key
348}
349