1// Copyright 2017, OpenCensus Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16package view
17
18import (
19	"math"
20	"time"
21
22	"go.opencensus.io/metric/metricdata"
23)
24
25// AggregationData represents an aggregated value from a collection.
26// They are reported on the view data during exporting.
27// Mosts users won't directly access aggregration data.
28type AggregationData interface {
29	isAggregationData() bool
30	addSample(v float64, attachments map[string]interface{}, t time.Time)
31	clone() AggregationData
32	equal(other AggregationData) bool
33	toPoint(t metricdata.Type, time time.Time) metricdata.Point
34}
35
// epsilon is the tolerance used by the equal methods in this file when they
// compare floating-point aggregates. It is compared against the squared
// difference of the two values, so values are treated as equal when they
// differ by less than roughly 3.2e-5 (the square root of 1e-9).
const epsilon = 1e-9
37
// CountData is the aggregated data for the Count aggregation.
// A count aggregation processes data and counts the recordings.
//
// Most users won't directly access count data.
type CountData struct {
	// Value is the number of samples recorded so far.
	Value int64
}

// isAggregationData marks CountData as an AggregationData implementation;
// it always reports true.
func (a *CountData) isAggregationData() bool {
	return true
}

// addSample records one more observation. Only the occurrence is counted:
// the sample value, its attachments and its timestamp are all ignored.
func (a *CountData) addSample(float64, map[string]interface{}, time.Time) {
	a.Value++
}
51
52func (a *CountData) clone() AggregationData {
53	return &CountData{Value: a.Value}
54}
55
56func (a *CountData) equal(other AggregationData) bool {
57	a2, ok := other.(*CountData)
58	if !ok {
59		return false
60	}
61
62	return a.Value == a2.Value
63}
64
65func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
66	switch metricType {
67	case metricdata.TypeCumulativeInt64:
68		return metricdata.NewInt64Point(t, a.Value)
69	default:
70		panic("unsupported metricdata.Type")
71	}
72}
73
// SumData is the aggregated data for the Sum aggregation.
// A sum aggregation processes data and sums up the recordings.
//
// Most users won't directly access sum data.
type SumData struct {
	// Value is the running total of every sample recorded so far.
	Value float64
}

// isAggregationData marks SumData as an AggregationData implementation;
// it always reports true.
func (a *SumData) isAggregationData() bool {
	return true
}

// addSample adds v to the running total. The attachments and the timestamp
// are ignored.
func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
	a.Value = a.Value + v
}
87
88func (a *SumData) clone() AggregationData {
89	return &SumData{Value: a.Value}
90}
91
92func (a *SumData) equal(other AggregationData) bool {
93	a2, ok := other.(*SumData)
94	if !ok {
95		return false
96	}
97	return math.Pow(a.Value-a2.Value, 2) < epsilon
98}
99
100func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
101	switch metricType {
102	case metricdata.TypeCumulativeInt64:
103		return metricdata.NewInt64Point(t, int64(a.Value))
104	case metricdata.TypeCumulativeFloat64:
105		return metricdata.NewFloat64Point(t, a.Value)
106	default:
107		panic("unsupported metricdata.Type")
108	}
109}
110
111// DistributionData is the aggregated data for the
112// Distribution aggregation.
113//
114// Most users won't directly access distribution data.
115//
116// For a distribution with N bounds, the associated DistributionData will have
117// N+1 buckets.
118type DistributionData struct {
119	Count           int64   // number of data points aggregated
120	Min             float64 // minimum value in the distribution
121	Max             float64 // max value in the distribution
122	Mean            float64 // mean of the distribution
123	SumOfSquaredDev float64 // sum of the squared deviation from the mean
124	CountPerBucket  []int64 // number of occurrences per bucket
125	// ExemplarsPerBucket is slice the same length as CountPerBucket containing
126	// an exemplar for the associated bucket, or nil.
127	ExemplarsPerBucket []*metricdata.Exemplar
128	bounds             []float64 // histogram distribution of the values
129}
130
131func newDistributionData(agg *Aggregation) *DistributionData {
132	bucketCount := len(agg.Buckets) + 1
133	return &DistributionData{
134		CountPerBucket:     make([]int64, bucketCount),
135		ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
136		bounds:             agg.Buckets,
137		Min:                math.MaxFloat64,
138		Max:                math.SmallestNonzeroFloat64,
139	}
140}
141
142// Sum returns the sum of all samples collected.
143func (a *DistributionData) Sum() float64 { return a.Mean * float64(a.Count) }
144
145func (a *DistributionData) variance() float64 {
146	if a.Count <= 1 {
147		return 0
148	}
149	return a.SumOfSquaredDev / float64(a.Count-1)
150}
151
152func (a *DistributionData) isAggregationData() bool { return true }
153
154// TODO(songy23): support exemplar attachments.
155func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
156	if v < a.Min {
157		a.Min = v
158	}
159	if v > a.Max {
160		a.Max = v
161	}
162	a.Count++
163	a.addToBucket(v, attachments, t)
164
165	if a.Count == 1 {
166		a.Mean = v
167		return
168	}
169
170	oldMean := a.Mean
171	a.Mean = a.Mean + (v-a.Mean)/float64(a.Count)
172	a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
173}
174
175func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
176	var count *int64
177	var i int
178	var b float64
179	for i, b = range a.bounds {
180		if v < b {
181			count = &a.CountPerBucket[i]
182			break
183		}
184	}
185	if count == nil { // Last bucket.
186		i = len(a.bounds)
187		count = &a.CountPerBucket[i]
188	}
189	*count++
190	if exemplar := getExemplar(v, attachments, t); exemplar != nil {
191		a.ExemplarsPerBucket[i] = exemplar
192	}
193}
194
195func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
196	if len(attachments) == 0 {
197		return nil
198	}
199	return &metricdata.Exemplar{
200		Value:       v,
201		Timestamp:   t,
202		Attachments: attachments,
203	}
204}
205
206func (a *DistributionData) clone() AggregationData {
207	c := *a
208	c.CountPerBucket = append([]int64(nil), a.CountPerBucket...)
209	c.ExemplarsPerBucket = append([]*metricdata.Exemplar(nil), a.ExemplarsPerBucket...)
210	return &c
211}
212
213func (a *DistributionData) equal(other AggregationData) bool {
214	a2, ok := other.(*DistributionData)
215	if !ok {
216		return false
217	}
218	if a2 == nil {
219		return false
220	}
221	if len(a.CountPerBucket) != len(a2.CountPerBucket) {
222		return false
223	}
224	for i := range a.CountPerBucket {
225		if a.CountPerBucket[i] != a2.CountPerBucket[i] {
226			return false
227		}
228	}
229	return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
230}
231
232func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
233	switch metricType {
234	case metricdata.TypeCumulativeDistribution:
235		buckets := []metricdata.Bucket{}
236		for i := 0; i < len(a.CountPerBucket); i++ {
237			buckets = append(buckets, metricdata.Bucket{
238				Count:    a.CountPerBucket[i],
239				Exemplar: a.ExemplarsPerBucket[i],
240			})
241		}
242		bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds}
243
244		val := &metricdata.Distribution{
245			Count:                 a.Count,
246			Sum:                   a.Sum(),
247			SumOfSquaredDeviation: a.SumOfSquaredDev,
248			BucketOptions:         bucketOptions,
249			Buckets:               buckets,
250		}
251		return metricdata.NewDistributionPoint(t, val)
252
253	default:
254		// TODO: [rghetia] when we have a use case for TypeGaugeDistribution.
255		panic("unsupported metricdata.Type")
256	}
257}
258
// LastValueData is the aggregated data for the LastValue aggregation.
// It keeps only the most recently recorded value.
type LastValueData struct {
	// Value is the most recently recorded sample.
	Value float64
}

// isAggregationData marks LastValueData as an AggregationData
// implementation; it always reports true.
func (l *LastValueData) isAggregationData() bool { return true }

// addSample replaces the stored value with v. The attachments and the
// timestamp are ignored.
func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
	l.Value = v
}
271
272func (l *LastValueData) clone() AggregationData {
273	return &LastValueData{l.Value}
274}
275
276func (l *LastValueData) equal(other AggregationData) bool {
277	a2, ok := other.(*LastValueData)
278	if !ok {
279		return false
280	}
281	return l.Value == a2.Value
282}
283
284func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
285	switch metricType {
286	case metricdata.TypeGaugeInt64:
287		return metricdata.NewInt64Point(t, int64(l.Value))
288	case metricdata.TypeGaugeFloat64:
289		return metricdata.NewFloat64Point(t, l.Value)
290	default:
291		panic("unsupported metricdata.Type")
292	}
293}
294