1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package aggregatortest // import "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
16
17import (
18	"context"
19	"errors"
20	"math/rand"
21	"os"
22	"sort"
23	"testing"
24	"unsafe"
25
26	"github.com/stretchr/testify/require"
27
28	ottest "go.opentelemetry.io/otel/internal/internaltest"
29	"go.opentelemetry.io/otel/metric/metrictest"
30	"go.opentelemetry.io/otel/metric/number"
31	"go.opentelemetry.io/otel/metric/sdkapi"
32	export "go.opentelemetry.io/otel/sdk/export/metric"
33	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
34	"go.opentelemetry.io/otel/sdk/metric/aggregator"
35)
36
37const Magnitude = 1000
38
39type Profile struct {
40	NumberKind number.Kind
41	Random     func(sign int) number.Number
42}
43
44type NoopAggregator struct{}
45type NoopAggregation struct{}
46
47var _ export.Aggregator = NoopAggregator{}
48var _ aggregation.Aggregation = NoopAggregation{}
49
50func newProfiles() []Profile {
51	rnd := rand.New(rand.NewSource(rand.Int63()))
52	return []Profile{
53		{
54			NumberKind: number.Int64Kind,
55			Random: func(sign int) number.Number {
56				return number.NewInt64Number(int64(sign) * int64(rnd.Intn(Magnitude+1)))
57			},
58		},
59		{
60			NumberKind: number.Float64Kind,
61			Random: func(sign int) number.Number {
62				return number.NewFloat64Number(float64(sign) * rnd.Float64() * Magnitude)
63			},
64		},
65	}
66}
67
68func NewAggregatorTest(mkind sdkapi.InstrumentKind, nkind number.Kind) *sdkapi.Descriptor {
69	desc := metrictest.NewDescriptor("test.name", mkind, nkind)
70	return &desc
71}
72
73func RunProfiles(t *testing.T, f func(*testing.T, Profile)) {
74	for _, profile := range newProfiles() {
75		t.Run(profile.NumberKind.String(), func(t *testing.T) {
76			f(t, profile)
77		})
78	}
79}
80
81// TestMain ensures local struct alignment prior to running tests.
82func TestMain(m *testing.M) {
83	fields := []ottest.FieldOffset{
84		{
85			Name:   "Numbers.numbers",
86			Offset: unsafe.Offsetof(Numbers{}.numbers),
87		},
88	}
89	if !ottest.Aligned8Byte(fields, os.Stderr) {
90		os.Exit(1)
91	}
92
93	os.Exit(m.Run())
94}
95
96type Numbers struct {
97	// numbers has to be aligned for 64-bit atomic operations.
98	numbers []number.Number
99	kind    number.Kind
100}
101
102func NewNumbers(kind number.Kind) Numbers {
103	return Numbers{
104		kind: kind,
105	}
106}
107
108func (n *Numbers) Append(v number.Number) {
109	n.numbers = append(n.numbers, v)
110}
111
112func (n *Numbers) Sort() {
113	sort.Sort(n)
114}
115
116func (n *Numbers) Less(i, j int) bool {
117	return n.numbers[i].CompareNumber(n.kind, n.numbers[j]) < 0
118}
119
120func (n *Numbers) Len() int {
121	return len(n.numbers)
122}
123
124func (n *Numbers) Swap(i, j int) {
125	n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i]
126}
127
128func (n *Numbers) Sum() number.Number {
129	var sum number.Number
130	for _, num := range n.numbers {
131		sum.AddNumber(n.kind, num)
132	}
133	return sum
134}
135
136func (n *Numbers) Count() uint64 {
137	return uint64(len(n.numbers))
138}
139
140func (n *Numbers) Min() number.Number {
141	return n.numbers[0]
142}
143
144func (n *Numbers) Max() number.Number {
145	return n.numbers[len(n.numbers)-1]
146}
147
148func (n *Numbers) Points() []number.Number {
149	return n.numbers
150}
151
152// CheckedUpdate performs the same range test the SDK does on behalf of the aggregator.
153func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) {
154	ctx := context.Background()
155
156	// Note: Aggregator tests are written assuming that the SDK
157	// has performed the RangeTest. Therefore we skip errors that
158	// would have been detected by the RangeTest.
159	err := aggregator.RangeTest(number, descriptor)
160	if err != nil {
161		return
162	}
163
164	if err := agg.Update(ctx, number, descriptor); err != nil {
165		t.Error("Unexpected Update failure", err)
166	}
167}
168
169func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *sdkapi.Descriptor) {
170	if err := aggInto.Merge(aggFrom, descriptor); err != nil {
171		t.Error("Unexpected Merge failure", err)
172	}
173}
174
175func (NoopAggregation) Kind() aggregation.Kind {
176	return aggregation.Kind("Noop")
177}
178
179func (NoopAggregator) Aggregation() aggregation.Aggregation {
180	return NoopAggregation{}
181}
182
183func (NoopAggregator) Update(context.Context, number.Number, *sdkapi.Descriptor) error {
184	return nil
185}
186
187func (NoopAggregator) SynchronizedMove(export.Aggregator, *sdkapi.Descriptor) error {
188	return nil
189}
190
191func (NoopAggregator) Merge(export.Aggregator, *sdkapi.Descriptor) error {
192	return nil
193}
194
195func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) export.Aggregator) {
196	t.Run("reset on nil", func(t *testing.T) {
197		// Ensures that SynchronizedMove(nil, descriptor) discards and
198		// resets the aggregator.
199		RunProfiles(t, func(t *testing.T, profile Profile) {
200			descriptor := NewAggregatorTest(
201				mkind,
202				profile.NumberKind,
203			)
204			agg := nf(descriptor)
205
206			for i := 0; i < 10; i++ {
207				x1 := profile.Random(+1)
208				CheckedUpdate(t, agg, x1, descriptor)
209			}
210
211			require.NoError(t, agg.SynchronizedMove(nil, descriptor))
212
213			if count, ok := agg.(aggregation.Count); ok {
214				c, err := count.Count()
215				require.Equal(t, uint64(0), c)
216				require.NoError(t, err)
217			}
218
219			if sum, ok := agg.(aggregation.Sum); ok {
220				s, err := sum.Sum()
221				require.Equal(t, number.Number(0), s)
222				require.NoError(t, err)
223			}
224
225			if lv, ok := agg.(aggregation.LastValue); ok {
226				v, _, err := lv.LastValue()
227				require.Equal(t, number.Number(0), v)
228				require.Error(t, err)
229				require.True(t, errors.Is(err, aggregation.ErrNoData))
230			}
231		})
232	})
233
234	t.Run("no reset on incorrect type", func(t *testing.T) {
235		// Ensures that SynchronizedMove(wrong_type, descriptor) does not
236		// reset the aggregator.
237		RunProfiles(t, func(t *testing.T, profile Profile) {
238			descriptor := NewAggregatorTest(
239				mkind,
240				profile.NumberKind,
241			)
242			agg := nf(descriptor)
243
244			var input number.Number
245			const inval = 100
246			if profile.NumberKind == number.Int64Kind {
247				input = number.NewInt64Number(inval)
248			} else {
249				input = number.NewFloat64Number(inval)
250			}
251
252			CheckedUpdate(t, agg, input, descriptor)
253
254			err := agg.SynchronizedMove(NoopAggregator{}, descriptor)
255			require.Error(t, err)
256			require.True(t, errors.Is(err, aggregation.ErrInconsistentType))
257
258			// Test that the aggregator was not reset
259
260			if count, ok := agg.(aggregation.Count); ok {
261				c, err := count.Count()
262				require.Equal(t, uint64(1), c)
263				require.NoError(t, err)
264			}
265
266			if sum, ok := agg.(aggregation.Sum); ok {
267				s, err := sum.Sum()
268				require.Equal(t, input, s)
269				require.NoError(t, err)
270			}
271
272			if lv, ok := agg.(aggregation.LastValue); ok {
273				v, _, err := lv.LastValue()
274				require.Equal(t, input, v)
275				require.NoError(t, err)
276			}
277
278		})
279	})
280
281}
282