1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package ddsketch // import "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
15
16import (
17	"context"
18	"math"
19	"sync"
20
21	sdk "github.com/DataDog/sketches-go/ddsketch"
22
23	"go.opentelemetry.io/otel/metric"
24	"go.opentelemetry.io/otel/metric/number"
25	export "go.opentelemetry.io/otel/sdk/export/metric"
26	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
27	"go.opentelemetry.io/otel/sdk/metric/aggregator"
28)
29
30// Config is an alias for the underlying DDSketch config object.
31type Config = sdk.Config
32
33// Aggregator aggregates events into a distribution.
34type Aggregator struct {
35	lock   sync.Mutex
36	cfg    *Config
37	kind   number.Kind
38	sketch *sdk.DDSketch
39}
40
41var _ export.Aggregator = &Aggregator{}
42var _ aggregation.MinMaxSumCount = &Aggregator{}
43var _ aggregation.Distribution = &Aggregator{}
44
45// New returns a new DDSketch aggregator.
46func New(cnt int, desc *metric.Descriptor, cfg *Config) []Aggregator {
47	if cfg == nil {
48		cfg = NewDefaultConfig()
49	}
50	aggs := make([]Aggregator, cnt)
51	for i := range aggs {
52		aggs[i] = Aggregator{
53			cfg:    cfg,
54			kind:   desc.NumberKind(),
55			sketch: sdk.NewDDSketch(cfg),
56		}
57	}
58	return aggs
59}
60
61// Aggregation returns an interface for reading the state of this aggregator.
62func (c *Aggregator) Aggregation() aggregation.Aggregation {
63	return c
64}
65
66// Kind returns aggregation.SketchKind.
67func (c *Aggregator) Kind() aggregation.Kind {
68	return aggregation.SketchKind
69}
70
71// NewDefaultConfig returns a new, default DDSketch config.
72func NewDefaultConfig() *Config {
73	return sdk.NewDefaultConfig()
74}
75
76// Sum returns the sum of values in the checkpoint.
77func (c *Aggregator) Sum() (number.Number, error) {
78	return c.toNumber(c.sketch.Sum()), nil
79}
80
81// Count returns the number of values in the checkpoint.
82func (c *Aggregator) Count() (int64, error) {
83	return c.sketch.Count(), nil
84}
85
86// Max returns the maximum value in the checkpoint.
87func (c *Aggregator) Max() (number.Number, error) {
88	return c.Quantile(1)
89}
90
91// Min returns the minimum value in the checkpoint.
92func (c *Aggregator) Min() (number.Number, error) {
93	return c.Quantile(0)
94}
95
96// Quantile returns the estimated quantile of data in the checkpoint.
97// It is an error if `q` is less than 0 or greated than 1.
98func (c *Aggregator) Quantile(q float64) (number.Number, error) {
99	if c.sketch.Count() == 0 {
100		return 0, aggregation.ErrNoData
101	}
102	f := c.sketch.Quantile(q)
103	if math.IsNaN(f) {
104		return 0, aggregation.ErrInvalidQuantile
105	}
106	return c.toNumber(f), nil
107}
108
109func (c *Aggregator) toNumber(f float64) number.Number {
110	if c.kind == number.Float64Kind {
111		return number.NewFloat64Number(f)
112	}
113	return number.NewInt64Number(int64(f))
114}
115
116// SynchronizedMove saves the current state into oa and resets the current state to
117// a new sketch, taking a lock to prevent concurrent Update() calls.
118func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
119	o, _ := oa.(*Aggregator)
120	if o == nil {
121		return aggregator.NewInconsistentAggregatorError(c, oa)
122	}
123	replace := sdk.NewDDSketch(c.cfg)
124
125	c.lock.Lock()
126	o.sketch, c.sketch = c.sketch, replace
127	c.lock.Unlock()
128
129	return nil
130}
131
132// Update adds the recorded measurement to the current data set.
133// Update takes a lock to prevent concurrent Update() and SynchronizedMove()
134// calls.
135func (c *Aggregator) Update(_ context.Context, number number.Number, desc *metric.Descriptor) error {
136	c.lock.Lock()
137	defer c.lock.Unlock()
138	c.sketch.Add(number.CoerceToFloat64(desc.NumberKind()))
139	return nil
140}
141
142// Merge combines two sketches into one.
143func (c *Aggregator) Merge(oa export.Aggregator, d *metric.Descriptor) error {
144	o, _ := oa.(*Aggregator)
145	if o == nil {
146		return aggregator.NewInconsistentAggregatorError(c, oa)
147	}
148
149	c.sketch.Merge(o.sketch)
150	return nil
151}
152