1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package aggregatortest // import "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
16
17import (
18	"context"
19	"errors"
20	"math/rand"
21	"os"
22	"sort"
23	"testing"
24	"unsafe"
25
26	"github.com/stretchr/testify/require"
27
28	ottest "go.opentelemetry.io/otel/internal/internaltest"
29	"go.opentelemetry.io/otel/metric"
30	"go.opentelemetry.io/otel/metric/number"
31	export "go.opentelemetry.io/otel/sdk/export/metric"
32	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
33	"go.opentelemetry.io/otel/sdk/metric/aggregator"
34)
35
36const Magnitude = 1000
37
38type Profile struct {
39	NumberKind number.Kind
40	Random     func(sign int) number.Number
41}
42
43type NoopAggregator struct{}
44type NoopAggregation struct{}
45
46var _ export.Aggregator = NoopAggregator{}
47var _ aggregation.Aggregation = NoopAggregation{}
48
49func newProfiles() []Profile {
50	rnd := rand.New(rand.NewSource(rand.Int63()))
51	return []Profile{
52		{
53			NumberKind: number.Int64Kind,
54			Random: func(sign int) number.Number {
55				return number.NewInt64Number(int64(sign) * int64(rnd.Intn(Magnitude+1)))
56			},
57		},
58		{
59			NumberKind: number.Float64Kind,
60			Random: func(sign int) number.Number {
61				return number.NewFloat64Number(float64(sign) * rnd.Float64() * Magnitude)
62			},
63		},
64	}
65}
66
67func NewAggregatorTest(mkind metric.InstrumentKind, nkind number.Kind) *metric.Descriptor {
68	desc := metric.NewDescriptor("test.name", mkind, nkind)
69	return &desc
70}
71
72func RunProfiles(t *testing.T, f func(*testing.T, Profile)) {
73	for _, profile := range newProfiles() {
74		t.Run(profile.NumberKind.String(), func(t *testing.T) {
75			f(t, profile)
76		})
77	}
78}
79
80// Ensure local struct alignment prior to running tests.
81func TestMain(m *testing.M) {
82	fields := []ottest.FieldOffset{
83		{
84			Name:   "Numbers.numbers",
85			Offset: unsafe.Offsetof(Numbers{}.numbers),
86		},
87	}
88	if !ottest.Aligned8Byte(fields, os.Stderr) {
89		os.Exit(1)
90	}
91
92	os.Exit(m.Run())
93}
94
95type Numbers struct {
96	// numbers has to be aligned for 64-bit atomic operations.
97	numbers []number.Number
98	kind    number.Kind
99}
100
101func NewNumbers(kind number.Kind) Numbers {
102	return Numbers{
103		kind: kind,
104	}
105}
106
107func (n *Numbers) Append(v number.Number) {
108	n.numbers = append(n.numbers, v)
109}
110
111func (n *Numbers) Sort() {
112	sort.Sort(n)
113}
114
115func (n *Numbers) Less(i, j int) bool {
116	return n.numbers[i].CompareNumber(n.kind, n.numbers[j]) < 0
117}
118
119func (n *Numbers) Len() int {
120	return len(n.numbers)
121}
122
123func (n *Numbers) Swap(i, j int) {
124	n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i]
125}
126
127func (n *Numbers) Sum() number.Number {
128	var sum number.Number
129	for _, num := range n.numbers {
130		sum.AddNumber(n.kind, num)
131	}
132	return sum
133}
134
135func (n *Numbers) Count() uint64 {
136	return uint64(len(n.numbers))
137}
138
139func (n *Numbers) Min() number.Number {
140	return n.numbers[0]
141}
142
143func (n *Numbers) Max() number.Number {
144	return n.numbers[len(n.numbers)-1]
145}
146
147func (n *Numbers) Points() []number.Number {
148	return n.numbers
149}
150
151// Performs the same range test the SDK does on behalf of the aggregator.
152func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *metric.Descriptor) {
153	ctx := context.Background()
154
155	// Note: Aggregator tests are written assuming that the SDK
156	// has performed the RangeTest. Therefore we skip errors that
157	// would have been detected by the RangeTest.
158	err := aggregator.RangeTest(number, descriptor)
159	if err != nil {
160		return
161	}
162
163	if err := agg.Update(ctx, number, descriptor); err != nil {
164		t.Error("Unexpected Update failure", err)
165	}
166}
167
168func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *metric.Descriptor) {
169	if err := aggInto.Merge(aggFrom, descriptor); err != nil {
170		t.Error("Unexpected Merge failure", err)
171	}
172}
173
174func (NoopAggregation) Kind() aggregation.Kind {
175	return aggregation.Kind("Noop")
176}
177
178func (NoopAggregator) Aggregation() aggregation.Aggregation {
179	return NoopAggregation{}
180}
181
182func (NoopAggregator) Update(context.Context, number.Number, *metric.Descriptor) error {
183	return nil
184}
185
186func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error {
187	return nil
188}
189
190func (NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error {
191	return nil
192}
193
194func SynchronizedMoveResetTest(t *testing.T, mkind metric.InstrumentKind, nf func(*metric.Descriptor) export.Aggregator) {
195	t.Run("reset on nil", func(t *testing.T) {
196		// Ensures that SynchronizedMove(nil, descriptor) discards and
197		// resets the aggregator.
198		RunProfiles(t, func(t *testing.T, profile Profile) {
199			descriptor := NewAggregatorTest(
200				mkind,
201				profile.NumberKind,
202			)
203			agg := nf(descriptor)
204
205			for i := 0; i < 10; i++ {
206				x1 := profile.Random(+1)
207				CheckedUpdate(t, agg, x1, descriptor)
208			}
209
210			require.NoError(t, agg.SynchronizedMove(nil, descriptor))
211
212			if count, ok := agg.(aggregation.Count); ok {
213				c, err := count.Count()
214				require.Equal(t, uint64(0), c)
215				require.NoError(t, err)
216			}
217
218			if sum, ok := agg.(aggregation.Sum); ok {
219				s, err := sum.Sum()
220				require.Equal(t, number.Number(0), s)
221				require.NoError(t, err)
222			}
223
224			if lv, ok := agg.(aggregation.LastValue); ok {
225				v, _, err := lv.LastValue()
226				require.Equal(t, number.Number(0), v)
227				require.Error(t, err)
228				require.True(t, errors.Is(err, aggregation.ErrNoData))
229			}
230		})
231	})
232
233	t.Run("no reset on incorrect type", func(t *testing.T) {
234		// Ensures that SynchronizedMove(wrong_type, descriptor) does not
235		// reset the aggregator.
236		RunProfiles(t, func(t *testing.T, profile Profile) {
237			descriptor := NewAggregatorTest(
238				mkind,
239				profile.NumberKind,
240			)
241			agg := nf(descriptor)
242
243			var input number.Number
244			const inval = 100
245			if profile.NumberKind == number.Int64Kind {
246				input = number.NewInt64Number(inval)
247			} else {
248				input = number.NewFloat64Number(inval)
249			}
250
251			CheckedUpdate(t, agg, input, descriptor)
252
253			err := agg.SynchronizedMove(NoopAggregator{}, descriptor)
254			require.Error(t, err)
255			require.True(t, errors.Is(err, aggregation.ErrInconsistentType))
256
257			// Test that the aggregator was not reset
258
259			if count, ok := agg.(aggregation.Count); ok {
260				c, err := count.Count()
261				require.Equal(t, uint64(1), c)
262				require.NoError(t, err)
263			}
264
265			if sum, ok := agg.(aggregation.Sum); ok {
266				s, err := sum.Sum()
267				require.Equal(t, input, s)
268				require.NoError(t, err)
269			}
270
271			if lv, ok := agg.(aggregation.LastValue); ok {
272				v, _, err := lv.LastValue()
273				require.Equal(t, input, v)
274				require.NoError(t, err)
275			}
276
277		})
278	})
279
280}
281