1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package metric_test
16
17import (
18	"context"
19	"fmt"
20	"math"
21	"sync"
22	"testing"
23
24	"github.com/stretchr/testify/require"
25
26	"go.opentelemetry.io/otel"
27	"go.opentelemetry.io/otel/label"
28	"go.opentelemetry.io/otel/metric"
29	"go.opentelemetry.io/otel/metric/number"
30	export "go.opentelemetry.io/otel/sdk/export/metric"
31	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
32	metricsdk "go.opentelemetry.io/otel/sdk/metric"
33	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
34	"go.opentelemetry.io/otel/sdk/resource"
35)
36
37var Must = metric.Must
38var testResource = resource.NewWithAttributes(label.String("R", "V"))
39
40type handler struct {
41	sync.Mutex
42	err error
43}
44
45func (h *handler) Handle(err error) {
46	h.Lock()
47	h.err = err
48	h.Unlock()
49}
50
51func (h *handler) Reset() {
52	h.Lock()
53	h.err = nil
54	h.Unlock()
55}
56
57func (h *handler) Flush() error {
58	h.Lock()
59	err := h.err
60	h.err = nil
61	h.Unlock()
62	return err
63}
64
65var testHandler *handler
66
67func init() {
68	testHandler = new(handler)
69	otel.SetErrorHandler(testHandler)
70}
71
72// correctnessProcessor could be replaced with processortest.Processor
73// with a non-default aggregator selector.  TODO(#872) use the
74// processortest code here.
75type correctnessProcessor struct {
76	t *testing.T
77	*testSelector
78
79	accumulations []export.Accumulation
80}
81
82type testSelector struct {
83	selector    export.AggregatorSelector
84	newAggCount int
85}
86
87func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
88	ts.newAggCount += len(aggPtrs)
89	processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...)
90}
91
92func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessProcessor) {
93	testHandler.Reset()
94	processor := &correctnessProcessor{
95		t:            t,
96		testSelector: &testSelector{selector: processortest.AggregatorSelector()},
97	}
98	accum := metricsdk.NewAccumulator(
99		processor,
100		testResource,
101	)
102	meter := metric.WrapMeterImpl(accum, "test")
103	return meter, accum, processor
104}
105
106func (ci *correctnessProcessor) Process(accumulation export.Accumulation) error {
107	ci.accumulations = append(ci.accumulations, accumulation)
108	return nil
109}
110
111func TestInputRangeCounter(t *testing.T) {
112	ctx := context.Background()
113	meter, sdk, processor := newSDK(t)
114
115	counter := Must(meter).NewInt64Counter("name.sum")
116
117	counter.Add(ctx, -1)
118	require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
119
120	checkpointed := sdk.Collect(ctx)
121	require.Equal(t, 0, checkpointed)
122
123	processor.accumulations = nil
124	counter.Add(ctx, 1)
125	checkpointed = sdk.Collect(ctx)
126	sum, err := processor.accumulations[0].Aggregator().(aggregation.Sum).Sum()
127	require.Equal(t, int64(1), sum.AsInt64())
128	require.Equal(t, 1, checkpointed)
129	require.Nil(t, err)
130	require.Nil(t, testHandler.Flush())
131}
132
133func TestInputRangeUpDownCounter(t *testing.T) {
134	ctx := context.Background()
135	meter, sdk, processor := newSDK(t)
136
137	counter := Must(meter).NewInt64UpDownCounter("name.sum")
138
139	counter.Add(ctx, -1)
140	counter.Add(ctx, -1)
141	counter.Add(ctx, 2)
142	counter.Add(ctx, 1)
143
144	checkpointed := sdk.Collect(ctx)
145	sum, err := processor.accumulations[0].Aggregator().(aggregation.Sum).Sum()
146	require.Equal(t, int64(1), sum.AsInt64())
147	require.Equal(t, 1, checkpointed)
148	require.Nil(t, err)
149	require.Nil(t, testHandler.Flush())
150}
151
152func TestInputRangeValueRecorder(t *testing.T) {
153	ctx := context.Background()
154	meter, sdk, processor := newSDK(t)
155
156	valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact")
157
158	valuerecorder.Record(ctx, math.NaN())
159	require.Equal(t, aggregation.ErrNaNInput, testHandler.Flush())
160
161	checkpointed := sdk.Collect(ctx)
162	require.Equal(t, 0, checkpointed)
163
164	valuerecorder.Record(ctx, 1)
165	valuerecorder.Record(ctx, 2)
166
167	processor.accumulations = nil
168	checkpointed = sdk.Collect(ctx)
169
170	count, err := processor.accumulations[0].Aggregator().(aggregation.Distribution).Count()
171	require.Equal(t, int64(2), count)
172	require.Equal(t, 1, checkpointed)
173	require.Nil(t, testHandler.Flush())
174	require.Nil(t, err)
175}
176
177func TestDisabledInstrument(t *testing.T) {
178	ctx := context.Background()
179	meter, sdk, processor := newSDK(t)
180
181	valuerecorder := Must(meter).NewFloat64ValueRecorder("name.disabled")
182
183	valuerecorder.Record(ctx, -1)
184	checkpointed := sdk.Collect(ctx)
185
186	require.Equal(t, 0, checkpointed)
187	require.Equal(t, 0, len(processor.accumulations))
188}
189
190func TestRecordNaN(t *testing.T) {
191	ctx := context.Background()
192	meter, _, _ := newSDK(t)
193
194	c := Must(meter).NewFloat64Counter("name.sum")
195
196	require.Nil(t, testHandler.Flush())
197	c.Add(ctx, math.NaN())
198	require.Error(t, testHandler.Flush())
199}
200
201func TestSDKLabelsDeduplication(t *testing.T) {
202	ctx := context.Background()
203	meter, sdk, processor := newSDK(t)
204
205	counter := Must(meter).NewInt64Counter("name.sum")
206
207	const (
208		maxKeys = 21
209		keySets = 2
210		repeats = 3
211	)
212	var keysA []label.Key
213	var keysB []label.Key
214
215	for i := 0; i < maxKeys; i++ {
216		keysA = append(keysA, label.Key(fmt.Sprintf("A%03d", i)))
217		keysB = append(keysB, label.Key(fmt.Sprintf("B%03d", i)))
218	}
219
220	var allExpect [][]label.KeyValue
221	for numKeys := 0; numKeys < maxKeys; numKeys++ {
222
223		var kvsA []label.KeyValue
224		var kvsB []label.KeyValue
225		for r := 0; r < repeats; r++ {
226			for i := 0; i < numKeys; i++ {
227				kvsA = append(kvsA, keysA[i].Int(r))
228				kvsB = append(kvsB, keysB[i].Int(r))
229			}
230		}
231
232		var expectA []label.KeyValue
233		var expectB []label.KeyValue
234		for i := 0; i < numKeys; i++ {
235			expectA = append(expectA, keysA[i].Int(repeats-1))
236			expectB = append(expectB, keysB[i].Int(repeats-1))
237		}
238
239		counter.Add(ctx, 1, kvsA...)
240		counter.Add(ctx, 1, kvsA...)
241		allExpect = append(allExpect, expectA)
242
243		if numKeys != 0 {
244			// In this case A and B sets are the same.
245			counter.Add(ctx, 1, kvsB...)
246			counter.Add(ctx, 1, kvsB...)
247			allExpect = append(allExpect, expectB)
248		}
249
250	}
251
252	sdk.Collect(ctx)
253
254	var actual [][]label.KeyValue
255	for _, rec := range processor.accumulations {
256		sum, _ := rec.Aggregator().(aggregation.Sum).Sum()
257		require.Equal(t, sum, number.NewInt64Number(2))
258
259		kvs := rec.Labels().ToSlice()
260		actual = append(actual, kvs)
261	}
262
263	require.ElementsMatch(t, allExpect, actual)
264}
265
266func newSetIter(kvs ...label.KeyValue) label.Iterator {
267	labels := label.NewSet(kvs...)
268	return labels.Iter()
269}
270
271func TestDefaultLabelEncoder(t *testing.T) {
272	encoder := label.DefaultEncoder()
273
274	encoded := encoder.Encode(newSetIter(label.String("A", "B"), label.String("C", "D")))
275	require.Equal(t, `A=B,C=D`, encoded)
276
277	encoded = encoder.Encode(newSetIter(label.String("A", "B,c=d"), label.String(`C\`, "D")))
278	require.Equal(t, `A=B\,c\=d,C\\=D`, encoded)
279
280	encoded = encoder.Encode(newSetIter(label.String(`\`, `=`), label.String(`,`, `\`)))
281	require.Equal(t, `\,=\\,\\=\=`, encoded)
282
283	// Note: the label encoder does not sort or de-dup values,
284	// that is done in Labels(...).
285	encoded = encoder.Encode(newSetIter(
286		label.Int("I", 1),
287		label.Uint("U", 1),
288		label.Int32("I32", 1),
289		label.Uint32("U32", 1),
290		label.Int64("I64", 1),
291		label.Uint64("U64", 1),
292		label.Float64("F64", 1),
293		label.Float64("F64", 1),
294		label.String("S", "1"),
295		label.Bool("B", true),
296	))
297	require.Equal(t, "B=true,F64=1,I=1,I32=1,I64=1,S=1,U=1,U32=1,U64=1", encoded)
298}
299
300func TestObserverCollection(t *testing.T) {
301	ctx := context.Background()
302	meter, sdk, processor := newSDK(t)
303
304	_ = Must(meter).NewFloat64ValueObserver("float.valueobserver.lastvalue", func(_ context.Context, result metric.Float64ObserverResult) {
305		result.Observe(1, label.String("A", "B"))
306		// last value wins
307		result.Observe(-1, label.String("A", "B"))
308		result.Observe(-1, label.String("C", "D"))
309	})
310	_ = Must(meter).NewInt64ValueObserver("int.valueobserver.lastvalue", func(_ context.Context, result metric.Int64ObserverResult) {
311		result.Observe(-1, label.String("A", "B"))
312		result.Observe(1)
313		// last value wins
314		result.Observe(1, label.String("A", "B"))
315		result.Observe(1)
316	})
317
318	_ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) {
319		result.Observe(1, label.String("A", "B"))
320		result.Observe(2, label.String("A", "B"))
321		result.Observe(1, label.String("C", "D"))
322	})
323	_ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) {
324		result.Observe(2, label.String("A", "B"))
325		result.Observe(1)
326		// last value wins
327		result.Observe(1, label.String("A", "B"))
328		result.Observe(1)
329	})
330
331	_ = Must(meter).NewFloat64UpDownSumObserver("float.updownsumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) {
332		result.Observe(1, label.String("A", "B"))
333		result.Observe(-2, label.String("A", "B"))
334		result.Observe(1, label.String("C", "D"))
335	})
336	_ = Must(meter).NewInt64UpDownSumObserver("int.updownsumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) {
337		result.Observe(2, label.String("A", "B"))
338		result.Observe(1)
339		// last value wins
340		result.Observe(1, label.String("A", "B"))
341		result.Observe(-1)
342	})
343
344	_ = Must(meter).NewInt64ValueObserver("empty.valueobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) {
345	})
346
347	collected := sdk.Collect(ctx)
348
349	require.Equal(t, collected, len(processor.accumulations))
350
351	out := processortest.NewOutput(label.DefaultEncoder())
352	for _, rec := range processor.accumulations {
353		require.NoError(t, out.AddAccumulation(rec))
354	}
355	require.EqualValues(t, map[string]float64{
356		"float.valueobserver.lastvalue/A=B/R=V": -1,
357		"float.valueobserver.lastvalue/C=D/R=V": -1,
358		"int.valueobserver.lastvalue//R=V":      1,
359		"int.valueobserver.lastvalue/A=B/R=V":   1,
360
361		"float.sumobserver.sum/A=B/R=V": 2,
362		"float.sumobserver.sum/C=D/R=V": 1,
363		"int.sumobserver.sum//R=V":      1,
364		"int.sumobserver.sum/A=B/R=V":   1,
365
366		"float.updownsumobserver.sum/A=B/R=V": -2,
367		"float.updownsumobserver.sum/C=D/R=V": 1,
368		"int.updownsumobserver.sum//R=V":      -1,
369		"int.updownsumobserver.sum/A=B/R=V":   1,
370	}, out.Map())
371}
372
373func TestSumObserverInputRange(t *testing.T) {
374	ctx := context.Background()
375	meter, sdk, processor := newSDK(t)
376
377	// TODO: these tests are testing for negative values, not for _descending values_. Fix.
378	_ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) {
379		result.Observe(-2, label.String("A", "B"))
380		require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
381		result.Observe(-1, label.String("C", "D"))
382		require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
383	})
384	_ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) {
385		result.Observe(-1, label.String("A", "B"))
386		require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
387		result.Observe(-1)
388		require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
389	})
390
391	collected := sdk.Collect(ctx)
392
393	require.Equal(t, 0, collected)
394	require.Equal(t, 0, len(processor.accumulations))
395
396	// check that the error condition was reset
397	require.NoError(t, testHandler.Flush())
398}
399
400func TestObserverBatch(t *testing.T) {
401	ctx := context.Background()
402	meter, sdk, processor := newSDK(t)
403
404	var floatValueObs metric.Float64ValueObserver
405	var intValueObs metric.Int64ValueObserver
406	var floatSumObs metric.Float64SumObserver
407	var intSumObs metric.Int64SumObserver
408	var floatUpDownSumObs metric.Float64UpDownSumObserver
409	var intUpDownSumObs metric.Int64UpDownSumObserver
410
411	var batch = Must(meter).NewBatchObserver(
412		func(_ context.Context, result metric.BatchObserverResult) {
413			result.Observe(
414				[]label.KeyValue{
415					label.String("A", "B"),
416				},
417				floatValueObs.Observation(1),
418				floatValueObs.Observation(-1),
419				intValueObs.Observation(-1),
420				intValueObs.Observation(1),
421				floatSumObs.Observation(1000),
422				intSumObs.Observation(100),
423				floatUpDownSumObs.Observation(-1000),
424				intUpDownSumObs.Observation(-100),
425			)
426			result.Observe(
427				[]label.KeyValue{
428					label.String("C", "D"),
429				},
430				floatValueObs.Observation(-1),
431				floatSumObs.Observation(-1),
432				floatUpDownSumObs.Observation(-1),
433			)
434			result.Observe(
435				nil,
436				intValueObs.Observation(1),
437				intValueObs.Observation(1),
438				intSumObs.Observation(10),
439				floatSumObs.Observation(1.1),
440				intUpDownSumObs.Observation(10),
441			)
442		})
443	floatValueObs = batch.NewFloat64ValueObserver("float.valueobserver.lastvalue")
444	intValueObs = batch.NewInt64ValueObserver("int.valueobserver.lastvalue")
445	floatSumObs = batch.NewFloat64SumObserver("float.sumobserver.sum")
446	intSumObs = batch.NewInt64SumObserver("int.sumobserver.sum")
447	floatUpDownSumObs = batch.NewFloat64UpDownSumObserver("float.updownsumobserver.sum")
448	intUpDownSumObs = batch.NewInt64UpDownSumObserver("int.updownsumobserver.sum")
449
450	collected := sdk.Collect(ctx)
451
452	require.Equal(t, collected, len(processor.accumulations))
453
454	out := processortest.NewOutput(label.DefaultEncoder())
455	for _, rec := range processor.accumulations {
456		require.NoError(t, out.AddAccumulation(rec))
457	}
458	require.EqualValues(t, map[string]float64{
459		"float.sumobserver.sum//R=V":    1.1,
460		"float.sumobserver.sum/A=B/R=V": 1000,
461		"int.sumobserver.sum//R=V":      10,
462		"int.sumobserver.sum/A=B/R=V":   100,
463
464		"int.updownsumobserver.sum/A=B/R=V":   -100,
465		"float.updownsumobserver.sum/A=B/R=V": -1000,
466		"int.updownsumobserver.sum//R=V":      10,
467		"float.updownsumobserver.sum/C=D/R=V": -1,
468
469		"float.valueobserver.lastvalue/A=B/R=V": -1,
470		"float.valueobserver.lastvalue/C=D/R=V": -1,
471		"int.valueobserver.lastvalue//R=V":      1,
472		"int.valueobserver.lastvalue/A=B/R=V":   1,
473	}, out.Map())
474}
475
476func TestRecordBatch(t *testing.T) {
477	ctx := context.Background()
478	meter, sdk, processor := newSDK(t)
479
480	counter1 := Must(meter).NewInt64Counter("int64.sum")
481	counter2 := Must(meter).NewFloat64Counter("float64.sum")
482	valuerecorder1 := Must(meter).NewInt64ValueRecorder("int64.exact")
483	valuerecorder2 := Must(meter).NewFloat64ValueRecorder("float64.exact")
484
485	sdk.RecordBatch(
486		ctx,
487		[]label.KeyValue{
488			label.String("A", "B"),
489			label.String("C", "D"),
490		},
491		counter1.Measurement(1),
492		counter2.Measurement(2),
493		valuerecorder1.Measurement(3),
494		valuerecorder2.Measurement(4),
495	)
496
497	sdk.Collect(ctx)
498
499	out := processortest.NewOutput(label.DefaultEncoder())
500	for _, rec := range processor.accumulations {
501		require.NoError(t, out.AddAccumulation(rec))
502	}
503	require.EqualValues(t, map[string]float64{
504		"int64.sum/A=B,C=D/R=V":     1,
505		"float64.sum/A=B,C=D/R=V":   2,
506		"int64.exact/A=B,C=D/R=V":   3,
507		"float64.exact/A=B,C=D/R=V": 4,
508	}, out.Map())
509}
510
511// TestRecordPersistence ensures that a direct-called instrument that
512// is repeatedly used each interval results in a persistent record, so
513// that its encoded labels will be cached across collection intervals.
514func TestRecordPersistence(t *testing.T) {
515	ctx := context.Background()
516	meter, sdk, processor := newSDK(t)
517
518	c := Must(meter).NewFloat64Counter("name.sum")
519	b := c.Bind(label.String("bound", "true"))
520	uk := label.String("bound", "false")
521
522	for i := 0; i < 100; i++ {
523		c.Add(ctx, 1, uk)
524		b.Add(ctx, 1)
525		sdk.Collect(ctx)
526	}
527
528	require.Equal(t, 4, processor.newAggCount)
529}
530
531func TestIncorrectInstruments(t *testing.T) {
532	// The Batch observe/record APIs are susceptible to
533	// uninitialized instruments.
534	var counter metric.Int64Counter
535	var observer metric.Int64ValueObserver
536
537	ctx := context.Background()
538	meter, sdk, _ := newSDK(t)
539
540	// Now try with uninitialized instruments.
541	meter.RecordBatch(ctx, nil, counter.Measurement(1))
542	meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) {
543		result.Observe(nil, observer.Observation(1))
544	})
545
546	collected := sdk.Collect(ctx)
547	require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush())
548	require.Equal(t, 0, collected)
549
550	// Now try with instruments from another SDK.
551	var noopMeter metric.Meter
552	counter = metric.Must(noopMeter).NewInt64Counter("name.sum")
553	observer = metric.Must(noopMeter).NewBatchObserver(
554		func(context.Context, metric.BatchObserverResult) {},
555	).NewInt64ValueObserver("observer")
556
557	meter.RecordBatch(ctx, nil, counter.Measurement(1))
558	meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) {
559		result.Observe(nil, observer.Observation(1))
560	})
561
562	collected = sdk.Collect(ctx)
563	require.Equal(t, 0, collected)
564	require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush())
565}
566
567func TestSyncInAsync(t *testing.T) {
568	ctx := context.Background()
569	meter, sdk, processor := newSDK(t)
570
571	counter := Must(meter).NewFloat64Counter("counter.sum")
572	_ = Must(meter).NewInt64ValueObserver("observer.lastvalue",
573		func(ctx context.Context, result metric.Int64ObserverResult) {
574			result.Observe(10)
575			counter.Add(ctx, 100)
576		},
577	)
578
579	sdk.Collect(ctx)
580
581	out := processortest.NewOutput(label.DefaultEncoder())
582	for _, rec := range processor.accumulations {
583		require.NoError(t, out.AddAccumulation(rec))
584	}
585	require.EqualValues(t, map[string]float64{
586		"counter.sum//R=V":        100,
587		"observer.lastvalue//R=V": 10,
588	}, out.Map())
589}
590