1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14package ddsketch // import "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" 15 16import ( 17 "context" 18 "math" 19 "sync" 20 21 sdk "github.com/DataDog/sketches-go/ddsketch" 22 23 "go.opentelemetry.io/otel/metric" 24 "go.opentelemetry.io/otel/metric/number" 25 export "go.opentelemetry.io/otel/sdk/export/metric" 26 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 27 "go.opentelemetry.io/otel/sdk/metric/aggregator" 28) 29 30// Config is an alias for the underlying DDSketch config object. 31type Config = sdk.Config 32 33// Aggregator aggregates events into a distribution. 34type Aggregator struct { 35 lock sync.Mutex 36 cfg *Config 37 kind number.Kind 38 sketch *sdk.DDSketch 39} 40 41var _ export.Aggregator = &Aggregator{} 42var _ aggregation.MinMaxSumCount = &Aggregator{} 43var _ aggregation.Distribution = &Aggregator{} 44 45// New returns a new DDSketch aggregator. 46func New(cnt int, desc *metric.Descriptor, cfg *Config) []Aggregator { 47 if cfg == nil { 48 cfg = NewDefaultConfig() 49 } 50 aggs := make([]Aggregator, cnt) 51 for i := range aggs { 52 aggs[i] = Aggregator{ 53 cfg: cfg, 54 kind: desc.NumberKind(), 55 sketch: sdk.NewDDSketch(cfg), 56 } 57 } 58 return aggs 59} 60 61// Aggregation returns an interface for reading the state of this aggregator. 62func (c *Aggregator) Aggregation() aggregation.Aggregation { 63 return c 64} 65 66// Kind returns aggregation.SketchKind. 67func (c *Aggregator) Kind() aggregation.Kind { 68 return aggregation.SketchKind 69} 70 71// NewDefaultConfig returns a new, default DDSketch config. 72func NewDefaultConfig() *Config { 73 return sdk.NewDefaultConfig() 74} 75 76// Sum returns the sum of values in the checkpoint. 77func (c *Aggregator) Sum() (number.Number, error) { 78 return c.toNumber(c.sketch.Sum()), nil 79} 80 81// Count returns the number of values in the checkpoint. 82func (c *Aggregator) Count() (int64, error) { 83 return c.sketch.Count(), nil 84} 85 86// Max returns the maximum value in the checkpoint. 87func (c *Aggregator) Max() (number.Number, error) { 88 return c.Quantile(1) 89} 90 91// Min returns the minimum value in the checkpoint. 92func (c *Aggregator) Min() (number.Number, error) { 93 return c.Quantile(0) 94} 95 96// Quantile returns the estimated quantile of data in the checkpoint. 97// It is an error if `q` is less than 0 or greated than 1. 98func (c *Aggregator) Quantile(q float64) (number.Number, error) { 99 if c.sketch.Count() == 0 { 100 return 0, aggregation.ErrNoData 101 } 102 f := c.sketch.Quantile(q) 103 if math.IsNaN(f) { 104 return 0, aggregation.ErrInvalidQuantile 105 } 106 return c.toNumber(f), nil 107} 108 109func (c *Aggregator) toNumber(f float64) number.Number { 110 if c.kind == number.Float64Kind { 111 return number.NewFloat64Number(f) 112 } 113 return number.NewInt64Number(int64(f)) 114} 115 116// SynchronizedMove saves the current state into oa and resets the current state to 117// a new sketch, taking a lock to prevent concurrent Update() calls. 118func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error { 119 o, _ := oa.(*Aggregator) 120 if o == nil { 121 return aggregator.NewInconsistentAggregatorError(c, oa) 122 } 123 replace := sdk.NewDDSketch(c.cfg) 124 125 c.lock.Lock() 126 o.sketch, c.sketch = c.sketch, replace 127 c.lock.Unlock() 128 129 return nil 130} 131 132// Update adds the recorded measurement to the current data set. 133// Update takes a lock to prevent concurrent Update() and SynchronizedMove() 134// calls. 135func (c *Aggregator) Update(_ context.Context, number number.Number, desc *metric.Descriptor) error { 136 c.lock.Lock() 137 defer c.lock.Unlock() 138 c.sketch.Add(number.CoerceToFloat64(desc.NumberKind())) 139 return nil 140} 141 142// Merge combines two sketches into one. 143func (c *Aggregator) Merge(oa export.Aggregator, d *metric.Descriptor) error { 144 o, _ := oa.(*Aggregator) 145 if o == nil { 146 return aggregator.NewInconsistentAggregatorError(c, oa) 147 } 148 149 c.sketch.Merge(o.sketch) 150 return nil 151} 152