1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aggregatortest // import "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" 16 17import ( 18 "context" 19 "errors" 20 "math/rand" 21 "os" 22 "sort" 23 "testing" 24 "unsafe" 25 26 "github.com/stretchr/testify/require" 27 28 ottest "go.opentelemetry.io/otel/internal/internaltest" 29 "go.opentelemetry.io/otel/metric" 30 "go.opentelemetry.io/otel/metric/number" 31 export "go.opentelemetry.io/otel/sdk/export/metric" 32 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 33 "go.opentelemetry.io/otel/sdk/metric/aggregator" 34) 35 36const Magnitude = 1000 37 38type Profile struct { 39 NumberKind number.Kind 40 Random func(sign int) number.Number 41} 42 43type NoopAggregator struct{} 44type NoopAggregation struct{} 45 46var _ export.Aggregator = NoopAggregator{} 47var _ aggregation.Aggregation = NoopAggregation{} 48 49func newProfiles() []Profile { 50 rnd := rand.New(rand.NewSource(rand.Int63())) 51 return []Profile{ 52 { 53 NumberKind: number.Int64Kind, 54 Random: func(sign int) number.Number { 55 return number.NewInt64Number(int64(sign) * int64(rnd.Intn(Magnitude+1))) 56 }, 57 }, 58 { 59 NumberKind: number.Float64Kind, 60 Random: func(sign int) number.Number { 61 return number.NewFloat64Number(float64(sign) * rnd.Float64() * Magnitude) 62 }, 63 }, 64 } 65} 66 67func NewAggregatorTest(mkind metric.InstrumentKind, nkind number.Kind) *metric.Descriptor { 68 desc := metric.NewDescriptor("test.name", mkind, nkind) 69 return &desc 70} 71 72func RunProfiles(t *testing.T, f func(*testing.T, Profile)) { 73 for _, profile := range newProfiles() { 74 t.Run(profile.NumberKind.String(), func(t *testing.T) { 75 f(t, profile) 76 }) 77 } 78} 79 80// Ensure local struct alignment prior to running tests. 81func TestMain(m *testing.M) { 82 fields := []ottest.FieldOffset{ 83 { 84 Name: "Numbers.numbers", 85 Offset: unsafe.Offsetof(Numbers{}.numbers), 86 }, 87 } 88 if !ottest.Aligned8Byte(fields, os.Stderr) { 89 os.Exit(1) 90 } 91 92 os.Exit(m.Run()) 93} 94 95type Numbers struct { 96 // numbers has to be aligned for 64-bit atomic operations. 97 numbers []number.Number 98 kind number.Kind 99} 100 101func NewNumbers(kind number.Kind) Numbers { 102 return Numbers{ 103 kind: kind, 104 } 105} 106 107func (n *Numbers) Append(v number.Number) { 108 n.numbers = append(n.numbers, v) 109} 110 111func (n *Numbers) Sort() { 112 sort.Sort(n) 113} 114 115func (n *Numbers) Less(i, j int) bool { 116 return n.numbers[i].CompareNumber(n.kind, n.numbers[j]) < 0 117} 118 119func (n *Numbers) Len() int { 120 return len(n.numbers) 121} 122 123func (n *Numbers) Swap(i, j int) { 124 n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i] 125} 126 127func (n *Numbers) Sum() number.Number { 128 var sum number.Number 129 for _, num := range n.numbers { 130 sum.AddNumber(n.kind, num) 131 } 132 return sum 133} 134 135func (n *Numbers) Count() uint64 { 136 return uint64(len(n.numbers)) 137} 138 139func (n *Numbers) Min() number.Number { 140 return n.numbers[0] 141} 142 143func (n *Numbers) Max() number.Number { 144 return n.numbers[len(n.numbers)-1] 145} 146 147func (n *Numbers) Points() []number.Number { 148 return n.numbers 149} 150 151// Performs the same range test the SDK does on behalf of the aggregator. 152func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *metric.Descriptor) { 153 ctx := context.Background() 154 155 // Note: Aggregator tests are written assuming that the SDK 156 // has performed the RangeTest. Therefore we skip errors that 157 // would have been detected by the RangeTest. 158 err := aggregator.RangeTest(number, descriptor) 159 if err != nil { 160 return 161 } 162 163 if err := agg.Update(ctx, number, descriptor); err != nil { 164 t.Error("Unexpected Update failure", err) 165 } 166} 167 168func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *metric.Descriptor) { 169 if err := aggInto.Merge(aggFrom, descriptor); err != nil { 170 t.Error("Unexpected Merge failure", err) 171 } 172} 173 174func (NoopAggregation) Kind() aggregation.Kind { 175 return aggregation.Kind("Noop") 176} 177 178func (NoopAggregator) Aggregation() aggregation.Aggregation { 179 return NoopAggregation{} 180} 181 182func (NoopAggregator) Update(context.Context, number.Number, *metric.Descriptor) error { 183 return nil 184} 185 186func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error { 187 return nil 188} 189 190func (NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error { 191 return nil 192} 193 194func SynchronizedMoveResetTest(t *testing.T, mkind metric.InstrumentKind, nf func(*metric.Descriptor) export.Aggregator) { 195 t.Run("reset on nil", func(t *testing.T) { 196 // Ensures that SynchronizedMove(nil, descriptor) discards and 197 // resets the aggregator. 198 RunProfiles(t, func(t *testing.T, profile Profile) { 199 descriptor := NewAggregatorTest( 200 mkind, 201 profile.NumberKind, 202 ) 203 agg := nf(descriptor) 204 205 for i := 0; i < 10; i++ { 206 x1 := profile.Random(+1) 207 CheckedUpdate(t, agg, x1, descriptor) 208 } 209 210 require.NoError(t, agg.SynchronizedMove(nil, descriptor)) 211 212 if count, ok := agg.(aggregation.Count); ok { 213 c, err := count.Count() 214 require.Equal(t, uint64(0), c) 215 require.NoError(t, err) 216 } 217 218 if sum, ok := agg.(aggregation.Sum); ok { 219 s, err := sum.Sum() 220 require.Equal(t, number.Number(0), s) 221 require.NoError(t, err) 222 } 223 224 if lv, ok := agg.(aggregation.LastValue); ok { 225 v, _, err := lv.LastValue() 226 require.Equal(t, number.Number(0), v) 227 require.Error(t, err) 228 require.True(t, errors.Is(err, aggregation.ErrNoData)) 229 } 230 }) 231 }) 232 233 t.Run("no reset on incorrect type", func(t *testing.T) { 234 // Ensures that SynchronizedMove(wrong_type, descriptor) does not 235 // reset the aggregator. 236 RunProfiles(t, func(t *testing.T, profile Profile) { 237 descriptor := NewAggregatorTest( 238 mkind, 239 profile.NumberKind, 240 ) 241 agg := nf(descriptor) 242 243 var input number.Number 244 const inval = 100 245 if profile.NumberKind == number.Int64Kind { 246 input = number.NewInt64Number(inval) 247 } else { 248 input = number.NewFloat64Number(inval) 249 } 250 251 CheckedUpdate(t, agg, input, descriptor) 252 253 err := agg.SynchronizedMove(NoopAggregator{}, descriptor) 254 require.Error(t, err) 255 require.True(t, errors.Is(err, aggregation.ErrInconsistentType)) 256 257 // Test that the aggregator was not reset 258 259 if count, ok := agg.(aggregation.Count); ok { 260 c, err := count.Count() 261 require.Equal(t, uint64(1), c) 262 require.NoError(t, err) 263 } 264 265 if sum, ok := agg.(aggregation.Sum); ok { 266 s, err := sum.Sum() 267 require.Equal(t, input, s) 268 require.NoError(t, err) 269 } 270 271 if lv, ok := agg.(aggregation.LastValue); ok { 272 v, _, err := lv.LastValue() 273 require.Equal(t, input, v) 274 require.NoError(t, err) 275 } 276 277 }) 278 }) 279 280} 281