1package metrics
2
3import (
4	"bytes"
5	"fmt"
6	"math"
7	"net/url"
8	"strings"
9	"sync"
10	"time"
11)
12
// spaceReplacer rewrites every space as an underscore. flattenKey and
// flattenKeyLabels run metric keys through it.
var spaceReplacer = strings.NewReplacer(" ", "_")
14
// InmemSink provides a MetricSink that does in-memory aggregation
// without sending metrics over a network. It can be embedded within
// an application to provide profiling information.
type InmemSink struct {
	// interval is the length of each aggregation interval.
	interval time.Duration

	// retain is the total span of time for which intervals are kept.
	retain time.Duration

	// maxIntervals is the maximum length of intervals.
	// It is retain / interval.
	maxIntervals int

	// intervals is a slice of the retained intervals, oldest first; new
	// intervals are appended at the end and old ones are dropped from
	// the front. All access is guarded by intervalLock.
	intervals    []*IntervalMetrics
	intervalLock sync.RWMutex

	// rateDenom is the interval length expressed in seconds. It is the
	// divisor passed to Ingest when counters and samples compute Rate.
	rateDenom float64
}
35
// IntervalMetrics stores the aggregated metrics
// for a specific interval.
type IntervalMetrics struct {
	// The embedded lock guards the maps below. The sink's write methods
	// hold Lock while updating them; Data holds RLock while copying the
	// newest interval.
	sync.RWMutex

	// Interval is the start time of the interval.
	Interval time.Time

	// Gauges maps the flattened key to the last set value.
	Gauges map[string]GaugeValue

	// Points maps the flattened key to the list of values emitted
	// through EmitKey, in emission order.
	Points map[string][]float32

	// Counters maps the flattened key to a SampledValue whose
	// AggregateSample accumulates the counter increments.
	Counters map[string]SampledValue

	// Samples maps the flattened key to a SampledValue whose
	// AggregateSample holds the rolled up view of the samples.
	Samples map[string]SampledValue
}
59
60// NewIntervalMetrics creates a new IntervalMetrics for a given interval
61func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
62	return &IntervalMetrics{
63		Interval: intv,
64		Gauges:   make(map[string]GaugeValue),
65		Points:   make(map[string][]float32),
66		Counters: make(map[string]SampledValue),
67		Samples:  make(map[string]SampledValue),
68	}
69}
70
// AggregateSample is used to hold aggregate metrics
// about a sample.
type AggregateSample struct {
	Count       int       // The count of emitted pairs
	Rate        float64   // The values rate per time unit (usually 1 second)
	Sum         float64   // The sum of values
	SumSq       float64   `json:"-"` // The sum of squared values
	Min         float64   // Minimum value
	Max         float64   // Maximum value
	LastUpdated time.Time `json:"-"` // When value was last updated
}

// Stddev returns the sample standard deviation (n-1 denominator) of the
// ingested values, derived from Count, Sum and SumSq. It returns 0 when
// fewer than two values have been ingested.
func (a *AggregateSample) Stddev() float64 {
	// Work in float64 so that Count*(Count-1) cannot overflow int.
	n := float64(a.Count)
	div := n * (n - 1)
	if div == 0 {
		return 0
	}

	num := n*a.SumSq - math.Pow(a.Sum, 2)
	if num < 0 {
		// Mathematically num is never negative, but floating-point
		// cancellation can push it just below zero for (nearly)
		// identical values. Report "no spread" instead of letting
		// math.Sqrt produce NaN.
		return 0
	}
	return math.Sqrt(num / div)
}

// Mean returns the arithmetic mean of the ingested values, or 0 when
// nothing has been ingested yet.
func (a *AggregateSample) Mean() float64 {
	if a.Count == 0 {
		return 0
	}
	return a.Sum / float64(a.Count)
}

// Ingest folds the value v into the sample: it updates Count, Sum, SumSq,
// Min and Max, recomputes Rate as Sum / rateDenom, and stamps LastUpdated
// with the current time.
func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
	a.Count++
	a.Sum += v
	a.SumSq += v * v

	// The first value seeds both extremes; the zero values of Min and
	// Max are not meaningful bounds.
	if a.Count == 1 || v < a.Min {
		a.Min = v
	}
	if a.Count == 1 || v > a.Max {
		a.Max = v
	}

	a.Rate = a.Sum / rateDenom
	a.LastUpdated = time.Now()
}

// String renders the sample for display. An empty sample prints only its
// count; a sample with zero standard deviation prints count and sum;
// otherwise min, mean, max and standard deviation are included as well.
func (a *AggregateSample) String() string {
	if a.Count == 0 {
		return "Count: 0"
	}

	sd := a.Stddev()
	if sd == 0 {
		return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated)
	}
	return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s",
		a.Count, a.Min, a.Mean(), a.Max, sd, a.Sum, a.LastUpdated)
}
126
127// NewInmemSinkFromURL creates an InmemSink from a URL. It is used
128// (and tested) from NewMetricSinkFromURL.
129func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
130	params := u.Query()
131
132	interval, err := time.ParseDuration(params.Get("interval"))
133	if err != nil {
134		return nil, fmt.Errorf("Bad 'interval' param: %s", err)
135	}
136
137	retain, err := time.ParseDuration(params.Get("retain"))
138	if err != nil {
139		return nil, fmt.Errorf("Bad 'retain' param: %s", err)
140	}
141
142	return NewInmemSink(interval, retain), nil
143}
144
145// NewInmemSink is used to construct a new in-memory sink.
146// Uses an aggregation interval and maximum retention period.
147func NewInmemSink(interval, retain time.Duration) *InmemSink {
148	rateTimeUnit := time.Second
149	i := &InmemSink{
150		interval:     interval,
151		retain:       retain,
152		maxIntervals: int(retain / interval),
153		rateDenom:    float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
154	}
155	i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
156	return i
157}
158
// SetGauge records val as the gauge value for key with no labels. It
// delegates to SetGaugeWithLabels with nil labels.
func (i *InmemSink) SetGauge(key []string, val float32) {
	i.SetGaugeWithLabels(key, val, nil)
}
162
163func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
164	k, name := i.flattenKeyLabels(key, labels)
165	intv := i.getInterval()
166
167	intv.Lock()
168	defer intv.Unlock()
169	intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
170}
171
172func (i *InmemSink) EmitKey(key []string, val float32) {
173	k := i.flattenKey(key)
174	intv := i.getInterval()
175
176	intv.Lock()
177	defer intv.Unlock()
178	vals := intv.Points[k]
179	intv.Points[k] = append(vals, val)
180}
181
// IncrCounter adds val to the counter for key with no labels. It
// delegates to IncrCounterWithLabels with nil labels.
func (i *InmemSink) IncrCounter(key []string, val float32) {
	i.IncrCounterWithLabels(key, val, nil)
}
185
186func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
187	k, name := i.flattenKeyLabels(key, labels)
188	intv := i.getInterval()
189
190	intv.Lock()
191	defer intv.Unlock()
192
193	agg, ok := intv.Counters[k]
194	if !ok {
195		agg = SampledValue{
196			Name:            name,
197			AggregateSample: &AggregateSample{},
198			Labels:          labels,
199		}
200		intv.Counters[k] = agg
201	}
202	agg.Ingest(float64(val), i.rateDenom)
203}
204
// AddSample records val as a sample for key with no labels. It
// delegates to AddSampleWithLabels with nil labels.
func (i *InmemSink) AddSample(key []string, val float32) {
	i.AddSampleWithLabels(key, val, nil)
}
208
209func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
210	k, name := i.flattenKeyLabels(key, labels)
211	intv := i.getInterval()
212
213	intv.Lock()
214	defer intv.Unlock()
215
216	agg, ok := intv.Samples[k]
217	if !ok {
218		agg = SampledValue{
219			Name:            name,
220			AggregateSample: &AggregateSample{},
221			Labels:          labels,
222		}
223		intv.Samples[k] = agg
224	}
225	agg.Ingest(float64(val), i.rateDenom)
226}
227
228// Data is used to retrieve all the aggregated metrics
229// Intervals may be in use, and a read lock should be acquired
230func (i *InmemSink) Data() []*IntervalMetrics {
231	// Get the current interval, forces creation
232	i.getInterval()
233
234	i.intervalLock.RLock()
235	defer i.intervalLock.RUnlock()
236
237	n := len(i.intervals)
238	intervals := make([]*IntervalMetrics, n)
239
240	copy(intervals[:n-1], i.intervals[:n-1])
241	current := i.intervals[n-1]
242
243	// make its own copy for current interval
244	intervals[n-1] = &IntervalMetrics{}
245	copyCurrent := intervals[n-1]
246	current.RLock()
247	*copyCurrent = *current
248
249	copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
250	for k, v := range current.Gauges {
251		copyCurrent.Gauges[k] = v
252	}
253	// saved values will be not change, just copy its link
254	copyCurrent.Points = make(map[string][]float32, len(current.Points))
255	for k, v := range current.Points {
256		copyCurrent.Points[k] = v
257	}
258	copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters))
259	for k, v := range current.Counters {
260		copyCurrent.Counters[k] = v.deepCopy()
261	}
262	copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples))
263	for k, v := range current.Samples {
264		copyCurrent.Samples[k] = v.deepCopy()
265	}
266	current.RUnlock()
267
268	return intervals
269}
270
271func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
272	i.intervalLock.RLock()
273	defer i.intervalLock.RUnlock()
274
275	n := len(i.intervals)
276	if n > 0 && i.intervals[n-1].Interval == intv {
277		return i.intervals[n-1]
278	}
279	return nil
280}
281
282func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
283	i.intervalLock.Lock()
284	defer i.intervalLock.Unlock()
285
286	// Check for an existing interval
287	n := len(i.intervals)
288	if n > 0 && i.intervals[n-1].Interval == intv {
289		return i.intervals[n-1]
290	}
291
292	// Add the current interval
293	current := NewIntervalMetrics(intv)
294	i.intervals = append(i.intervals, current)
295	n++
296
297	// Truncate the intervals if they are too long
298	if n >= i.maxIntervals {
299		copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
300		i.intervals = i.intervals[:i.maxIntervals]
301	}
302	return current
303}
304
305// getInterval returns the current interval to write to
306func (i *InmemSink) getInterval() *IntervalMetrics {
307	intv := time.Now().Truncate(i.interval)
308	if m := i.getExistingInterval(intv); m != nil {
309		return m
310	}
311	return i.createInterval(intv)
312}
313
314// Flattens the key for formatting, removes spaces
315func (i *InmemSink) flattenKey(parts []string) string {
316	buf := &bytes.Buffer{}
317
318	joined := strings.Join(parts, ".")
319
320	spaceReplacer.WriteString(buf, joined)
321
322	return buf.String()
323}
324
325// Flattens the key for formatting along with its labels, removes spaces
326func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
327	key := i.flattenKey(parts)
328	buf := bytes.NewBufferString(key)
329
330	for _, label := range labels {
331		spaceReplacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
332	}
333
334	return buf.String(), key
335}
336