1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aggregatortest // import "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" 16 17import ( 18 "context" 19 "errors" 20 "math/rand" 21 "os" 22 "sort" 23 "testing" 24 "unsafe" 25 26 "github.com/stretchr/testify/require" 27 28 ottest "go.opentelemetry.io/otel/internal/internaltest" 29 "go.opentelemetry.io/otel/metric/metrictest" 30 "go.opentelemetry.io/otel/metric/number" 31 "go.opentelemetry.io/otel/metric/sdkapi" 32 export "go.opentelemetry.io/otel/sdk/export/metric" 33 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 34 "go.opentelemetry.io/otel/sdk/metric/aggregator" 35) 36 37const Magnitude = 1000 38 39type Profile struct { 40 NumberKind number.Kind 41 Random func(sign int) number.Number 42} 43 44type NoopAggregator struct{} 45type NoopAggregation struct{} 46 47var _ export.Aggregator = NoopAggregator{} 48var _ aggregation.Aggregation = NoopAggregation{} 49 50func newProfiles() []Profile { 51 rnd := rand.New(rand.NewSource(rand.Int63())) 52 return []Profile{ 53 { 54 NumberKind: number.Int64Kind, 55 Random: func(sign int) number.Number { 56 return number.NewInt64Number(int64(sign) * int64(rnd.Intn(Magnitude+1))) 57 }, 58 }, 59 { 60 NumberKind: number.Float64Kind, 61 Random: func(sign int) number.Number { 62 return number.NewFloat64Number(float64(sign) * rnd.Float64() * Magnitude) 63 }, 64 }, 65 } 66} 67 68func NewAggregatorTest(mkind sdkapi.InstrumentKind, nkind number.Kind) *sdkapi.Descriptor { 69 desc := metrictest.NewDescriptor("test.name", mkind, nkind) 70 return &desc 71} 72 73func RunProfiles(t *testing.T, f func(*testing.T, Profile)) { 74 for _, profile := range newProfiles() { 75 t.Run(profile.NumberKind.String(), func(t *testing.T) { 76 f(t, profile) 77 }) 78 } 79} 80 81// TestMain ensures local struct alignment prior to running tests. 82func TestMain(m *testing.M) { 83 fields := []ottest.FieldOffset{ 84 { 85 Name: "Numbers.numbers", 86 Offset: unsafe.Offsetof(Numbers{}.numbers), 87 }, 88 } 89 if !ottest.Aligned8Byte(fields, os.Stderr) { 90 os.Exit(1) 91 } 92 93 os.Exit(m.Run()) 94} 95 96type Numbers struct { 97 // numbers has to be aligned for 64-bit atomic operations. 98 numbers []number.Number 99 kind number.Kind 100} 101 102func NewNumbers(kind number.Kind) Numbers { 103 return Numbers{ 104 kind: kind, 105 } 106} 107 108func (n *Numbers) Append(v number.Number) { 109 n.numbers = append(n.numbers, v) 110} 111 112func (n *Numbers) Sort() { 113 sort.Sort(n) 114} 115 116func (n *Numbers) Less(i, j int) bool { 117 return n.numbers[i].CompareNumber(n.kind, n.numbers[j]) < 0 118} 119 120func (n *Numbers) Len() int { 121 return len(n.numbers) 122} 123 124func (n *Numbers) Swap(i, j int) { 125 n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i] 126} 127 128func (n *Numbers) Sum() number.Number { 129 var sum number.Number 130 for _, num := range n.numbers { 131 sum.AddNumber(n.kind, num) 132 } 133 return sum 134} 135 136func (n *Numbers) Count() uint64 { 137 return uint64(len(n.numbers)) 138} 139 140func (n *Numbers) Min() number.Number { 141 return n.numbers[0] 142} 143 144func (n *Numbers) Max() number.Number { 145 return n.numbers[len(n.numbers)-1] 146} 147 148func (n *Numbers) Points() []number.Number { 149 return n.numbers 150} 151 152// CheckedUpdate performs the same range test the SDK does on behalf of the aggregator. 153func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) { 154 ctx := context.Background() 155 156 // Note: Aggregator tests are written assuming that the SDK 157 // has performed the RangeTest. Therefore we skip errors that 158 // would have been detected by the RangeTest. 159 err := aggregator.RangeTest(number, descriptor) 160 if err != nil { 161 return 162 } 163 164 if err := agg.Update(ctx, number, descriptor); err != nil { 165 t.Error("Unexpected Update failure", err) 166 } 167} 168 169func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *sdkapi.Descriptor) { 170 if err := aggInto.Merge(aggFrom, descriptor); err != nil { 171 t.Error("Unexpected Merge failure", err) 172 } 173} 174 175func (NoopAggregation) Kind() aggregation.Kind { 176 return aggregation.Kind("Noop") 177} 178 179func (NoopAggregator) Aggregation() aggregation.Aggregation { 180 return NoopAggregation{} 181} 182 183func (NoopAggregator) Update(context.Context, number.Number, *sdkapi.Descriptor) error { 184 return nil 185} 186 187func (NoopAggregator) SynchronizedMove(export.Aggregator, *sdkapi.Descriptor) error { 188 return nil 189} 190 191func (NoopAggregator) Merge(export.Aggregator, *sdkapi.Descriptor) error { 192 return nil 193} 194 195func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) export.Aggregator) { 196 t.Run("reset on nil", func(t *testing.T) { 197 // Ensures that SynchronizedMove(nil, descriptor) discards and 198 // resets the aggregator. 199 RunProfiles(t, func(t *testing.T, profile Profile) { 200 descriptor := NewAggregatorTest( 201 mkind, 202 profile.NumberKind, 203 ) 204 agg := nf(descriptor) 205 206 for i := 0; i < 10; i++ { 207 x1 := profile.Random(+1) 208 CheckedUpdate(t, agg, x1, descriptor) 209 } 210 211 require.NoError(t, agg.SynchronizedMove(nil, descriptor)) 212 213 if count, ok := agg.(aggregation.Count); ok { 214 c, err := count.Count() 215 require.Equal(t, uint64(0), c) 216 require.NoError(t, err) 217 } 218 219 if sum, ok := agg.(aggregation.Sum); ok { 220 s, err := sum.Sum() 221 require.Equal(t, number.Number(0), s) 222 require.NoError(t, err) 223 } 224 225 if lv, ok := agg.(aggregation.LastValue); ok { 226 v, _, err := lv.LastValue() 227 require.Equal(t, number.Number(0), v) 228 require.Error(t, err) 229 require.True(t, errors.Is(err, aggregation.ErrNoData)) 230 } 231 }) 232 }) 233 234 t.Run("no reset on incorrect type", func(t *testing.T) { 235 // Ensures that SynchronizedMove(wrong_type, descriptor) does not 236 // reset the aggregator. 237 RunProfiles(t, func(t *testing.T, profile Profile) { 238 descriptor := NewAggregatorTest( 239 mkind, 240 profile.NumberKind, 241 ) 242 agg := nf(descriptor) 243 244 var input number.Number 245 const inval = 100 246 if profile.NumberKind == number.Int64Kind { 247 input = number.NewInt64Number(inval) 248 } else { 249 input = number.NewFloat64Number(inval) 250 } 251 252 CheckedUpdate(t, agg, input, descriptor) 253 254 err := agg.SynchronizedMove(NoopAggregator{}, descriptor) 255 require.Error(t, err) 256 require.True(t, errors.Is(err, aggregation.ErrInconsistentType)) 257 258 // Test that the aggregator was not reset 259 260 if count, ok := agg.(aggregation.Count); ok { 261 c, err := count.Count() 262 require.Equal(t, uint64(1), c) 263 require.NoError(t, err) 264 } 265 266 if sum, ok := agg.(aggregation.Sum); ok { 267 s, err := sum.Sum() 268 require.Equal(t, input, s) 269 require.NoError(t, err) 270 } 271 272 if lv, ok := agg.(aggregation.LastValue); ok { 273 v, _, err := lv.LastValue() 274 require.Equal(t, input, v) 275 require.NoError(t, err) 276 } 277 278 }) 279 }) 280 281} 282