1package metrics 2 3import ( 4 "bytes" 5 "fmt" 6 "math" 7 "net/url" 8 "strings" 9 "sync" 10 "time" 11) 12 13// InmemSink provides a MetricSink that does in-memory aggregation 14// without sending metrics over a network. It can be embedded within 15// an application to provide profiling information. 16type InmemSink struct { 17 // How long is each aggregation interval 18 interval time.Duration 19 20 // Retain controls how many metrics interval we keep 21 retain time.Duration 22 23 // maxIntervals is the maximum length of intervals. 24 // It is retain / interval. 25 maxIntervals int 26 27 // intervals is a slice of the retained intervals 28 intervals []*IntervalMetrics 29 intervalLock sync.RWMutex 30 31 rateDenom float64 32} 33 34// IntervalMetrics stores the aggregated metrics 35// for a specific interval 36type IntervalMetrics struct { 37 sync.RWMutex 38 39 // The start time of the interval 40 Interval time.Time 41 42 // Gauges maps the key to the last set value 43 Gauges map[string]GaugeValue 44 45 // Points maps the string to the list of emitted values 46 // from EmitKey 47 Points map[string][]float32 48 49 // Counters maps the string key to a sum of the counter 50 // values 51 Counters map[string]SampledValue 52 53 // Samples maps the key to an AggregateSample, 54 // which has the rolled up view of a sample 55 Samples map[string]SampledValue 56} 57 58// NewIntervalMetrics creates a new IntervalMetrics for a given interval 59func NewIntervalMetrics(intv time.Time) *IntervalMetrics { 60 return &IntervalMetrics{ 61 Interval: intv, 62 Gauges: make(map[string]GaugeValue), 63 Points: make(map[string][]float32), 64 Counters: make(map[string]SampledValue), 65 Samples: make(map[string]SampledValue), 66 } 67} 68 69// AggregateSample is used to hold aggregate metrics 70// about a sample 71type AggregateSample struct { 72 Count int // The count of emitted pairs 73 Rate float64 // The values rate per time unit (usually 1 second) 74 Sum float64 // The sum of values 75 SumSq float64 `json:"-"` // The sum of squared values 76 Min float64 // Minimum value 77 Max float64 // Maximum value 78 LastUpdated time.Time `json:"-"` // When value was last updated 79} 80 81// Computes a Stddev of the values 82func (a *AggregateSample) Stddev() float64 { 83 num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2) 84 div := float64(a.Count * (a.Count - 1)) 85 if div == 0 { 86 return 0 87 } 88 return math.Sqrt(num / div) 89} 90 91// Computes a mean of the values 92func (a *AggregateSample) Mean() float64 { 93 if a.Count == 0 { 94 return 0 95 } 96 return a.Sum / float64(a.Count) 97} 98 99// Ingest is used to update a sample 100func (a *AggregateSample) Ingest(v float64, rateDenom float64) { 101 a.Count++ 102 a.Sum += v 103 a.SumSq += (v * v) 104 if v < a.Min || a.Count == 1 { 105 a.Min = v 106 } 107 if v > a.Max || a.Count == 1 { 108 a.Max = v 109 } 110 a.Rate = float64(a.Sum) / rateDenom 111 a.LastUpdated = time.Now() 112} 113 114func (a *AggregateSample) String() string { 115 if a.Count == 0 { 116 return "Count: 0" 117 } else if a.Stddev() == 0 { 118 return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated) 119 } else { 120 return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s", 121 a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated) 122 } 123} 124 125// NewInmemSinkFromURL creates an InmemSink from a URL. It is used 126// (and tested) from NewMetricSinkFromURL. 127func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) { 128 params := u.Query() 129 130 interval, err := time.ParseDuration(params.Get("interval")) 131 if err != nil { 132 return nil, fmt.Errorf("Bad 'interval' param: %s", err) 133 } 134 135 retain, err := time.ParseDuration(params.Get("retain")) 136 if err != nil { 137 return nil, fmt.Errorf("Bad 'retain' param: %s", err) 138 } 139 140 return NewInmemSink(interval, retain), nil 141} 142 143// NewInmemSink is used to construct a new in-memory sink. 144// Uses an aggregation interval and maximum retention period. 145func NewInmemSink(interval, retain time.Duration) *InmemSink { 146 rateTimeUnit := time.Second 147 i := &InmemSink{ 148 interval: interval, 149 retain: retain, 150 maxIntervals: int(retain / interval), 151 rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()), 152 } 153 i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals) 154 return i 155} 156 157func (i *InmemSink) SetGauge(key []string, val float32) { 158 i.SetGaugeWithLabels(key, val, nil) 159} 160 161func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { 162 k, name := i.flattenKeyLabels(key, labels) 163 intv := i.getInterval() 164 165 intv.Lock() 166 defer intv.Unlock() 167 intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels} 168} 169 170func (i *InmemSink) EmitKey(key []string, val float32) { 171 k := i.flattenKey(key) 172 intv := i.getInterval() 173 174 intv.Lock() 175 defer intv.Unlock() 176 vals := intv.Points[k] 177 intv.Points[k] = append(vals, val) 178} 179 180func (i *InmemSink) IncrCounter(key []string, val float32) { 181 i.IncrCounterWithLabels(key, val, nil) 182} 183 184func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { 185 k, name := i.flattenKeyLabels(key, labels) 186 intv := i.getInterval() 187 188 intv.Lock() 189 defer intv.Unlock() 190 191 agg, ok := intv.Counters[k] 192 if !ok { 193 agg = SampledValue{ 194 Name: name, 195 AggregateSample: &AggregateSample{}, 196 Labels: labels, 197 } 198 intv.Counters[k] = agg 199 } 200 agg.Ingest(float64(val), i.rateDenom) 201} 202 203func (i *InmemSink) AddSample(key []string, val float32) { 204 i.AddSampleWithLabels(key, val, nil) 205} 206 207func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) { 208 k, name := i.flattenKeyLabels(key, labels) 209 intv := i.getInterval() 210 211 intv.Lock() 212 defer intv.Unlock() 213 214 agg, ok := intv.Samples[k] 215 if !ok { 216 agg = SampledValue{ 217 Name: name, 218 AggregateSample: &AggregateSample{}, 219 Labels: labels, 220 } 221 intv.Samples[k] = agg 222 } 223 agg.Ingest(float64(val), i.rateDenom) 224} 225 226// Data is used to retrieve all the aggregated metrics 227// Intervals may be in use, and a read lock should be acquired 228func (i *InmemSink) Data() []*IntervalMetrics { 229 // Get the current interval, forces creation 230 i.getInterval() 231 232 i.intervalLock.RLock() 233 defer i.intervalLock.RUnlock() 234 235 n := len(i.intervals) 236 intervals := make([]*IntervalMetrics, n) 237 238 copy(intervals[:n-1], i.intervals[:n-1]) 239 current := i.intervals[n-1] 240 241 // make its own copy for current interval 242 intervals[n-1] = &IntervalMetrics{} 243 copyCurrent := intervals[n-1] 244 current.RLock() 245 *copyCurrent = *current 246 247 copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges)) 248 for k, v := range current.Gauges { 249 copyCurrent.Gauges[k] = v 250 } 251 // saved values will be not change, just copy its link 252 copyCurrent.Points = make(map[string][]float32, len(current.Points)) 253 for k, v := range current.Points { 254 copyCurrent.Points[k] = v 255 } 256 copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters)) 257 for k, v := range current.Counters { 258 copyCurrent.Counters[k] = v 259 } 260 copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples)) 261 for k, v := range current.Samples { 262 copyCurrent.Samples[k] = v 263 } 264 current.RUnlock() 265 266 return intervals 267} 268 269func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics { 270 i.intervalLock.RLock() 271 defer i.intervalLock.RUnlock() 272 273 n := len(i.intervals) 274 if n > 0 && i.intervals[n-1].Interval == intv { 275 return i.intervals[n-1] 276 } 277 return nil 278} 279 280func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics { 281 i.intervalLock.Lock() 282 defer i.intervalLock.Unlock() 283 284 // Check for an existing interval 285 n := len(i.intervals) 286 if n > 0 && i.intervals[n-1].Interval == intv { 287 return i.intervals[n-1] 288 } 289 290 // Add the current interval 291 current := NewIntervalMetrics(intv) 292 i.intervals = append(i.intervals, current) 293 n++ 294 295 // Truncate the intervals if they are too long 296 if n >= i.maxIntervals { 297 copy(i.intervals[0:], i.intervals[n-i.maxIntervals:]) 298 i.intervals = i.intervals[:i.maxIntervals] 299 } 300 return current 301} 302 303// getInterval returns the current interval to write to 304func (i *InmemSink) getInterval() *IntervalMetrics { 305 intv := time.Now().Truncate(i.interval) 306 if m := i.getExistingInterval(intv); m != nil { 307 return m 308 } 309 return i.createInterval(intv) 310} 311 312// Flattens the key for formatting, removes spaces 313func (i *InmemSink) flattenKey(parts []string) string { 314 buf := &bytes.Buffer{} 315 replacer := strings.NewReplacer(" ", "_") 316 317 if len(parts) > 0 { 318 replacer.WriteString(buf, parts[0]) 319 } 320 for _, part := range parts[1:] { 321 replacer.WriteString(buf, ".") 322 replacer.WriteString(buf, part) 323 } 324 325 return buf.String() 326} 327 328// Flattens the key for formatting along with its labels, removes spaces 329func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) { 330 buf := &bytes.Buffer{} 331 replacer := strings.NewReplacer(" ", "_") 332 333 if len(parts) > 0 { 334 replacer.WriteString(buf, parts[0]) 335 } 336 for _, part := range parts[1:] { 337 replacer.WriteString(buf, ".") 338 replacer.WriteString(buf, part) 339 } 340 341 key := buf.String() 342 343 for _, label := range labels { 344 replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value)) 345 } 346 347 return buf.String(), key 348} 349