1package metrics 2 3import ( 4 "bytes" 5 "fmt" 6 "math" 7 "net/url" 8 "strings" 9 "sync" 10 "time" 11) 12 13var spaceReplacer = strings.NewReplacer(" ", "_") 14 15// InmemSink provides a MetricSink that does in-memory aggregation 16// without sending metrics over a network. It can be embedded within 17// an application to provide profiling information. 18type InmemSink struct { 19 // How long is each aggregation interval 20 interval time.Duration 21 22 // Retain controls how many metrics interval we keep 23 retain time.Duration 24 25 // maxIntervals is the maximum length of intervals. 26 // It is retain / interval. 27 maxIntervals int 28 29 // intervals is a slice of the retained intervals 30 intervals []*IntervalMetrics 31 intervalLock sync.RWMutex 32 33 rateDenom float64 34} 35 36// IntervalMetrics stores the aggregated metrics 37// for a specific interval 38type IntervalMetrics struct { 39 sync.RWMutex 40 41 // The start time of the interval 42 Interval time.Time 43 44 // Gauges maps the key to the last set value 45 Gauges map[string]GaugeValue 46 47 // Points maps the string to the list of emitted values 48 // from EmitKey 49 Points map[string][]float32 50 51 // Counters maps the string key to a sum of the counter 52 // values 53 Counters map[string]SampledValue 54 55 // Samples maps the key to an AggregateSample, 56 // which has the rolled up view of a sample 57 Samples map[string]SampledValue 58} 59 60// NewIntervalMetrics creates a new IntervalMetrics for a given interval 61func NewIntervalMetrics(intv time.Time) *IntervalMetrics { 62 return &IntervalMetrics{ 63 Interval: intv, 64 Gauges: make(map[string]GaugeValue), 65 Points: make(map[string][]float32), 66 Counters: make(map[string]SampledValue), 67 Samples: make(map[string]SampledValue), 68 } 69} 70 71// AggregateSample is used to hold aggregate metrics 72// about a sample 73type AggregateSample struct { 74 Count int // The count of emitted pairs 75 Rate float64 // The values rate per time unit (usually 1 second) 76 Sum float64 // The sum of values 77 SumSq float64 `json:"-"` // The sum of squared values 78 Min float64 // Minimum value 79 Max float64 // Maximum value 80 LastUpdated time.Time `json:"-"` // When value was last updated 81} 82 83// Computes a Stddev of the values 84func (a *AggregateSample) Stddev() float64 { 85 num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2) 86 div := float64(a.Count * (a.Count - 1)) 87 if div == 0 { 88 return 0 89 } 90 return math.Sqrt(num / div) 91} 92 93// Computes a mean of the values 94func (a *AggregateSample) Mean() float64 { 95 if a.Count == 0 { 96 return 0 97 } 98 return a.Sum / float64(a.Count) 99} 100 101// Ingest is used to update a sample 102func (a *AggregateSample) Ingest(v float64, rateDenom float64) { 103 a.Count++ 104 a.Sum += v 105 a.SumSq += (v * v) 106 if v < a.Min || a.Count == 1 { 107 a.Min = v 108 } 109 if v > a.Max || a.Count == 1 { 110 a.Max = v 111 } 112 a.Rate = float64(a.Sum) / rateDenom 113 a.LastUpdated = time.Now() 114} 115 116func (a *AggregateSample) String() string { 117 if a.Count == 0 { 118 return "Count: 0" 119 } else if a.Stddev() == 0 { 120 return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated) 121 } else { 122 return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s", 123 a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated) 124 } 125} 126 127// NewInmemSinkFromURL creates an InmemSink from a URL. It is used 128// (and tested) from NewMetricSinkFromURL. 129func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) { 130 params := u.Query() 131 132 interval, err := time.ParseDuration(params.Get("interval")) 133 if err != nil { 134 return nil, fmt.Errorf("Bad 'interval' param: %s", err) 135 } 136 137 retain, err := time.ParseDuration(params.Get("retain")) 138 if err != nil { 139 return nil, fmt.Errorf("Bad 'retain' param: %s", err) 140 } 141 142 return NewInmemSink(interval, retain), nil 143} 144 145// NewInmemSink is used to construct a new in-memory sink. 146// Uses an aggregation interval and maximum retention period. 147func NewInmemSink(interval, retain time.Duration) *InmemSink { 148 rateTimeUnit := time.Second 149 i := &InmemSink{ 150 interval: interval, 151 retain: retain, 152 maxIntervals: int(retain / interval), 153 rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()), 154 } 155 i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals) 156 return i 157} 158 159func (i *InmemSink) SetGauge(key []string, val float32) { 160 i.SetGaugeWithLabels(key, val, nil) 161} 162 163func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { 164 k, name := i.flattenKeyLabels(key, labels) 165 intv := i.getInterval() 166 167 intv.Lock() 168 defer intv.Unlock() 169 intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels} 170} 171 172func (i *InmemSink) EmitKey(key []string, val float32) { 173 k := i.flattenKey(key) 174 intv := i.getInterval() 175 176 intv.Lock() 177 defer intv.Unlock() 178 vals := intv.Points[k] 179 intv.Points[k] = append(vals, val) 180} 181 182func (i *InmemSink) IncrCounter(key []string, val float32) { 183 i.IncrCounterWithLabels(key, val, nil) 184} 185 186func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { 187 k, name := i.flattenKeyLabels(key, labels) 188 intv := i.getInterval() 189 190 intv.Lock() 191 defer intv.Unlock() 192 193 agg, ok := intv.Counters[k] 194 if !ok { 195 agg = SampledValue{ 196 Name: name, 197 AggregateSample: &AggregateSample{}, 198 Labels: labels, 199 } 200 intv.Counters[k] = agg 201 } 202 agg.Ingest(float64(val), i.rateDenom) 203} 204 205func (i *InmemSink) AddSample(key []string, val float32) { 206 i.AddSampleWithLabels(key, val, nil) 207} 208 209func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) { 210 k, name := i.flattenKeyLabels(key, labels) 211 intv := i.getInterval() 212 213 intv.Lock() 214 defer intv.Unlock() 215 216 agg, ok := intv.Samples[k] 217 if !ok { 218 agg = SampledValue{ 219 Name: name, 220 AggregateSample: &AggregateSample{}, 221 Labels: labels, 222 } 223 intv.Samples[k] = agg 224 } 225 agg.Ingest(float64(val), i.rateDenom) 226} 227 228// Data is used to retrieve all the aggregated metrics 229// Intervals may be in use, and a read lock should be acquired 230func (i *InmemSink) Data() []*IntervalMetrics { 231 // Get the current interval, forces creation 232 i.getInterval() 233 234 i.intervalLock.RLock() 235 defer i.intervalLock.RUnlock() 236 237 n := len(i.intervals) 238 intervals := make([]*IntervalMetrics, n) 239 240 copy(intervals[:n-1], i.intervals[:n-1]) 241 current := i.intervals[n-1] 242 243 // make its own copy for current interval 244 intervals[n-1] = &IntervalMetrics{} 245 copyCurrent := intervals[n-1] 246 current.RLock() 247 *copyCurrent = *current 248 249 copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges)) 250 for k, v := range current.Gauges { 251 copyCurrent.Gauges[k] = v 252 } 253 // saved values will be not change, just copy its link 254 copyCurrent.Points = make(map[string][]float32, len(current.Points)) 255 for k, v := range current.Points { 256 copyCurrent.Points[k] = v 257 } 258 copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters)) 259 for k, v := range current.Counters { 260 copyCurrent.Counters[k] = v.deepCopy() 261 } 262 copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples)) 263 for k, v := range current.Samples { 264 copyCurrent.Samples[k] = v.deepCopy() 265 } 266 current.RUnlock() 267 268 return intervals 269} 270 271func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics { 272 i.intervalLock.RLock() 273 defer i.intervalLock.RUnlock() 274 275 n := len(i.intervals) 276 if n > 0 && i.intervals[n-1].Interval == intv { 277 return i.intervals[n-1] 278 } 279 return nil 280} 281 282func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics { 283 i.intervalLock.Lock() 284 defer i.intervalLock.Unlock() 285 286 // Check for an existing interval 287 n := len(i.intervals) 288 if n > 0 && i.intervals[n-1].Interval == intv { 289 return i.intervals[n-1] 290 } 291 292 // Add the current interval 293 current := NewIntervalMetrics(intv) 294 i.intervals = append(i.intervals, current) 295 n++ 296 297 // Truncate the intervals if they are too long 298 if n >= i.maxIntervals { 299 copy(i.intervals[0:], i.intervals[n-i.maxIntervals:]) 300 i.intervals = i.intervals[:i.maxIntervals] 301 } 302 return current 303} 304 305// getInterval returns the current interval to write to 306func (i *InmemSink) getInterval() *IntervalMetrics { 307 intv := time.Now().Truncate(i.interval) 308 if m := i.getExistingInterval(intv); m != nil { 309 return m 310 } 311 return i.createInterval(intv) 312} 313 314// Flattens the key for formatting, removes spaces 315func (i *InmemSink) flattenKey(parts []string) string { 316 buf := &bytes.Buffer{} 317 318 joined := strings.Join(parts, ".") 319 320 spaceReplacer.WriteString(buf, joined) 321 322 return buf.String() 323} 324 325// Flattens the key for formatting along with its labels, removes spaces 326func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) { 327 key := i.flattenKey(parts) 328 buf := bytes.NewBufferString(key) 329 330 for _, label := range labels { 331 spaceReplacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value)) 332 } 333 334 return buf.String(), key 335} 336