1// Copyright 2017, OpenCensus Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14// 15 16package view 17 18import ( 19 "math" 20 "time" 21 22 "go.opencensus.io/metric/metricdata" 23) 24 25// AggregationData represents an aggregated value from a collection. 26// They are reported on the view data during exporting. 27// Mosts users won't directly access aggregration data. 28type AggregationData interface { 29 isAggregationData() bool 30 addSample(v float64, attachments map[string]interface{}, t time.Time) 31 clone() AggregationData 32 equal(other AggregationData) bool 33 toPoint(t metricdata.Type, time time.Time) metricdata.Point 34} 35 36const epsilon = 1e-9 37 38// CountData is the aggregated data for the Count aggregation. 39// A count aggregation processes data and counts the recordings. 40// 41// Most users won't directly access count data. 42type CountData struct { 43 Value int64 44} 45 46func (a *CountData) isAggregationData() bool { return true } 47 48func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) { 49 a.Value = a.Value + 1 50} 51 52func (a *CountData) clone() AggregationData { 53 return &CountData{Value: a.Value} 54} 55 56func (a *CountData) equal(other AggregationData) bool { 57 a2, ok := other.(*CountData) 58 if !ok { 59 return false 60 } 61 62 return a.Value == a2.Value 63} 64 65func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { 66 switch metricType { 67 case metricdata.TypeCumulativeInt64: 68 return metricdata.NewInt64Point(t, a.Value) 69 default: 70 panic("unsupported metricdata.Type") 71 } 72} 73 74// SumData is the aggregated data for the Sum aggregation. 75// A sum aggregation processes data and sums up the recordings. 76// 77// Most users won't directly access sum data. 78type SumData struct { 79 Value float64 80} 81 82func (a *SumData) isAggregationData() bool { return true } 83 84func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) { 85 a.Value += v 86} 87 88func (a *SumData) clone() AggregationData { 89 return &SumData{Value: a.Value} 90} 91 92func (a *SumData) equal(other AggregationData) bool { 93 a2, ok := other.(*SumData) 94 if !ok { 95 return false 96 } 97 return math.Pow(a.Value-a2.Value, 2) < epsilon 98} 99 100func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { 101 switch metricType { 102 case metricdata.TypeCumulativeInt64: 103 return metricdata.NewInt64Point(t, int64(a.Value)) 104 case metricdata.TypeCumulativeFloat64: 105 return metricdata.NewFloat64Point(t, a.Value) 106 default: 107 panic("unsupported metricdata.Type") 108 } 109} 110 111// DistributionData is the aggregated data for the 112// Distribution aggregation. 113// 114// Most users won't directly access distribution data. 115// 116// For a distribution with N bounds, the associated DistributionData will have 117// N+1 buckets. 118type DistributionData struct { 119 Count int64 // number of data points aggregated 120 Min float64 // minimum value in the distribution 121 Max float64 // max value in the distribution 122 Mean float64 // mean of the distribution 123 SumOfSquaredDev float64 // sum of the squared deviation from the mean 124 CountPerBucket []int64 // number of occurrences per bucket 125 // ExemplarsPerBucket is slice the same length as CountPerBucket containing 126 // an exemplar for the associated bucket, or nil. 127 ExemplarsPerBucket []*metricdata.Exemplar 128 bounds []float64 // histogram distribution of the values 129} 130 131func newDistributionData(agg *Aggregation) *DistributionData { 132 bucketCount := len(agg.Buckets) + 1 133 return &DistributionData{ 134 CountPerBucket: make([]int64, bucketCount), 135 ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount), 136 bounds: agg.Buckets, 137 Min: math.MaxFloat64, 138 Max: math.SmallestNonzeroFloat64, 139 } 140} 141 142// Sum returns the sum of all samples collected. 143func (a *DistributionData) Sum() float64 { return a.Mean * float64(a.Count) } 144 145func (a *DistributionData) variance() float64 { 146 if a.Count <= 1 { 147 return 0 148 } 149 return a.SumOfSquaredDev / float64(a.Count-1) 150} 151 152func (a *DistributionData) isAggregationData() bool { return true } 153 154// TODO(songy23): support exemplar attachments. 155func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) { 156 if v < a.Min { 157 a.Min = v 158 } 159 if v > a.Max { 160 a.Max = v 161 } 162 a.Count++ 163 a.addToBucket(v, attachments, t) 164 165 if a.Count == 1 { 166 a.Mean = v 167 return 168 } 169 170 oldMean := a.Mean 171 a.Mean = a.Mean + (v-a.Mean)/float64(a.Count) 172 a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean) 173} 174 175func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) { 176 var count *int64 177 var i int 178 var b float64 179 for i, b = range a.bounds { 180 if v < b { 181 count = &a.CountPerBucket[i] 182 break 183 } 184 } 185 if count == nil { // Last bucket. 186 i = len(a.bounds) 187 count = &a.CountPerBucket[i] 188 } 189 *count++ 190 if exemplar := getExemplar(v, attachments, t); exemplar != nil { 191 a.ExemplarsPerBucket[i] = exemplar 192 } 193} 194 195func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar { 196 if len(attachments) == 0 { 197 return nil 198 } 199 return &metricdata.Exemplar{ 200 Value: v, 201 Timestamp: t, 202 Attachments: attachments, 203 } 204} 205 206func (a *DistributionData) clone() AggregationData { 207 c := *a 208 c.CountPerBucket = append([]int64(nil), a.CountPerBucket...) 209 c.ExemplarsPerBucket = append([]*metricdata.Exemplar(nil), a.ExemplarsPerBucket...) 210 return &c 211} 212 213func (a *DistributionData) equal(other AggregationData) bool { 214 a2, ok := other.(*DistributionData) 215 if !ok { 216 return false 217 } 218 if a2 == nil { 219 return false 220 } 221 if len(a.CountPerBucket) != len(a2.CountPerBucket) { 222 return false 223 } 224 for i := range a.CountPerBucket { 225 if a.CountPerBucket[i] != a2.CountPerBucket[i] { 226 return false 227 } 228 } 229 return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon 230} 231 232func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { 233 switch metricType { 234 case metricdata.TypeCumulativeDistribution: 235 buckets := []metricdata.Bucket{} 236 for i := 0; i < len(a.CountPerBucket); i++ { 237 buckets = append(buckets, metricdata.Bucket{ 238 Count: a.CountPerBucket[i], 239 Exemplar: a.ExemplarsPerBucket[i], 240 }) 241 } 242 bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds} 243 244 val := &metricdata.Distribution{ 245 Count: a.Count, 246 Sum: a.Sum(), 247 SumOfSquaredDeviation: a.SumOfSquaredDev, 248 BucketOptions: bucketOptions, 249 Buckets: buckets, 250 } 251 return metricdata.NewDistributionPoint(t, val) 252 253 default: 254 // TODO: [rghetia] when we have a use case for TypeGaugeDistribution. 255 panic("unsupported metricdata.Type") 256 } 257} 258 259// LastValueData returns the last value recorded for LastValue aggregation. 260type LastValueData struct { 261 Value float64 262} 263 264func (l *LastValueData) isAggregationData() bool { 265 return true 266} 267 268func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) { 269 l.Value = v 270} 271 272func (l *LastValueData) clone() AggregationData { 273 return &LastValueData{l.Value} 274} 275 276func (l *LastValueData) equal(other AggregationData) bool { 277 a2, ok := other.(*LastValueData) 278 if !ok { 279 return false 280 } 281 return l.Value == a2.Value 282} 283 284func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { 285 switch metricType { 286 case metricdata.TypeGaugeInt64: 287 return metricdata.NewInt64Point(t, int64(l.Value)) 288 case metricdata.TypeGaugeFloat64: 289 return metricdata.NewFloat64Point(t, l.Value) 290 default: 291 panic("unsupported metricdata.Type") 292 } 293} 294