1package metrics 2 3import ( 4 "fmt" 5 "math" 6 "strings" 7 "sync" 8 "time" 9) 10 11// InmemSink provides a MetricSink that does in-memory aggregation 12// without sending metrics over a network. It can be embedded within 13// an application to provide profiling information. 14type InmemSink struct { 15 // How long is each aggregation interval 16 interval time.Duration 17 18 // Retain controls how many metrics interval we keep 19 retain time.Duration 20 21 // maxIntervals is the maximum length of intervals. 22 // It is retain / interval. 23 maxIntervals int 24 25 // intervals is a slice of the retained intervals 26 intervals []*IntervalMetrics 27 intervalLock sync.RWMutex 28} 29 30// IntervalMetrics stores the aggregated metrics 31// for a specific interval 32type IntervalMetrics struct { 33 sync.RWMutex 34 35 // The start time of the interval 36 Interval time.Time 37 38 // Gauges maps the key to the last set value 39 Gauges map[string]float32 40 41 // Points maps the string to the list of emitted values 42 // from EmitKey 43 Points map[string][]float32 44 45 // Counters maps the string key to a sum of the counter 46 // values 47 Counters map[string]*AggregateSample 48 49 // Samples maps the key to an AggregateSample, 50 // which has the rolled up view of a sample 51 Samples map[string]*AggregateSample 52} 53 54// NewIntervalMetrics creates a new IntervalMetrics for a given interval 55func NewIntervalMetrics(intv time.Time) *IntervalMetrics { 56 return &IntervalMetrics{ 57 Interval: intv, 58 Gauges: make(map[string]float32), 59 Points: make(map[string][]float32), 60 Counters: make(map[string]*AggregateSample), 61 Samples: make(map[string]*AggregateSample), 62 } 63} 64 65// AggregateSample is used to hold aggregate metrics 66// about a sample 67type AggregateSample struct { 68 Count int // The count of emitted pairs 69 Sum float64 // The sum of values 70 SumSq float64 // The sum of squared values 71 Min float64 // Minimum value 72 Max float64 // Maximum value 73} 74 75// Computes a Stddev of the values 76func (a *AggregateSample) Stddev() float64 { 77 num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2) 78 div := float64(a.Count * (a.Count - 1)) 79 if div == 0 { 80 return 0 81 } 82 return math.Sqrt(num / div) 83} 84 85// Computes a mean of the values 86func (a *AggregateSample) Mean() float64 { 87 if a.Count == 0 { 88 return 0 89 } 90 return a.Sum / float64(a.Count) 91} 92 93// Ingest is used to update a sample 94func (a *AggregateSample) Ingest(v float64) { 95 a.Count++ 96 a.Sum += v 97 a.SumSq += (v * v) 98 if v < a.Min || a.Count == 1 { 99 a.Min = v 100 } 101 if v > a.Max || a.Count == 1 { 102 a.Max = v 103 } 104} 105 106func (a *AggregateSample) String() string { 107 if a.Count == 0 { 108 return "Count: 0" 109 } else if a.Stddev() == 0 { 110 return fmt.Sprintf("Count: %d Sum: %0.3f", a.Count, a.Sum) 111 } else { 112 return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f", 113 a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum) 114 } 115} 116 117// NewInmemSink is used to construct a new in-memory sink. 118// Uses an aggregation interval and maximum retention period. 119func NewInmemSink(interval, retain time.Duration) *InmemSink { 120 i := &InmemSink{ 121 interval: interval, 122 retain: retain, 123 maxIntervals: int(retain / interval), 124 } 125 i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals) 126 return i 127} 128 129func (i *InmemSink) SetGauge(key []string, val float32) { 130 k := i.flattenKey(key) 131 intv := i.getInterval() 132 133 intv.Lock() 134 defer intv.Unlock() 135 intv.Gauges[k] = val 136} 137 138func (i *InmemSink) EmitKey(key []string, val float32) { 139 k := i.flattenKey(key) 140 intv := i.getInterval() 141 142 intv.Lock() 143 defer intv.Unlock() 144 vals := intv.Points[k] 145 intv.Points[k] = append(vals, val) 146} 147 148func (i *InmemSink) IncrCounter(key []string, val float32) { 149 k := i.flattenKey(key) 150 intv := i.getInterval() 151 152 intv.Lock() 153 defer intv.Unlock() 154 155 agg := intv.Counters[k] 156 if agg == nil { 157 agg = &AggregateSample{} 158 intv.Counters[k] = agg 159 } 160 agg.Ingest(float64(val)) 161} 162 163func (i *InmemSink) AddSample(key []string, val float32) { 164 k := i.flattenKey(key) 165 intv := i.getInterval() 166 167 intv.Lock() 168 defer intv.Unlock() 169 170 agg := intv.Samples[k] 171 if agg == nil { 172 agg = &AggregateSample{} 173 intv.Samples[k] = agg 174 } 175 agg.Ingest(float64(val)) 176} 177 178// Data is used to retrieve all the aggregated metrics 179// Intervals may be in use, and a read lock should be acquired 180func (i *InmemSink) Data() []*IntervalMetrics { 181 // Get the current interval, forces creation 182 i.getInterval() 183 184 i.intervalLock.RLock() 185 defer i.intervalLock.RUnlock() 186 187 intervals := make([]*IntervalMetrics, len(i.intervals)) 188 copy(intervals, i.intervals) 189 return intervals 190} 191 192func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics { 193 i.intervalLock.RLock() 194 defer i.intervalLock.RUnlock() 195 196 n := len(i.intervals) 197 if n > 0 && i.intervals[n-1].Interval == intv { 198 return i.intervals[n-1] 199 } 200 return nil 201} 202 203func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics { 204 i.intervalLock.Lock() 205 defer i.intervalLock.Unlock() 206 207 // Check for an existing interval 208 n := len(i.intervals) 209 if n > 0 && i.intervals[n-1].Interval == intv { 210 return i.intervals[n-1] 211 } 212 213 // Add the current interval 214 current := NewIntervalMetrics(intv) 215 i.intervals = append(i.intervals, current) 216 n++ 217 218 // Truncate the intervals if they are too long 219 if n >= i.maxIntervals { 220 copy(i.intervals[0:], i.intervals[n-i.maxIntervals:]) 221 i.intervals = i.intervals[:i.maxIntervals] 222 } 223 return current 224} 225 226// getInterval returns the current interval to write to 227func (i *InmemSink) getInterval() *IntervalMetrics { 228 intv := time.Now().Truncate(i.interval) 229 if m := i.getExistingInterval(intv); m != nil { 230 return m 231 } 232 return i.createInterval(intv) 233} 234 235// Flattens the key for formatting, removes spaces 236func (i *InmemSink) flattenKey(parts []string) string { 237 joined := strings.Join(parts, ".") 238 return strings.Replace(joined, " ", "_", -1) 239} 240