1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package basic_test
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"strings"
22	"testing"
23	"time"
24
25	"github.com/stretchr/testify/require"
26
27	"go.opentelemetry.io/otel/attribute"
28	"go.opentelemetry.io/otel/metric"
29	"go.opentelemetry.io/otel/metric/metrictest"
30	"go.opentelemetry.io/otel/metric/number"
31	"go.opentelemetry.io/otel/metric/sdkapi"
32	export "go.opentelemetry.io/otel/sdk/export/metric"
33	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
34	"go.opentelemetry.io/otel/sdk/instrumentation"
35	sdk "go.opentelemetry.io/otel/sdk/metric"
36	"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
37	"go.opentelemetry.io/otel/sdk/metric/processor/basic"
38	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
39	processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
40	"go.opentelemetry.io/otel/sdk/resource"
41)
42
43func requireNotAfter(t *testing.T, t1, t2 time.Time) {
44	require.False(t, t1.After(t2), "expected %v ≤ %v", t1, t2)
45}
46
47// TestProcessor tests all the non-error paths in this package.
48func TestProcessor(t *testing.T) {
49	type exportCase struct {
50		kind aggregation.Temporality
51	}
52	type instrumentCase struct {
53		kind sdkapi.InstrumentKind
54	}
55	type numberCase struct {
56		kind number.Kind
57	}
58	type aggregatorCase struct {
59		kind aggregation.Kind
60	}
61
62	for _, tc := range []exportCase{
63		{kind: aggregation.CumulativeTemporality},
64		{kind: aggregation.DeltaTemporality},
65	} {
66		t.Run(tc.kind.String(), func(t *testing.T) {
67			for _, ic := range []instrumentCase{
68				{kind: sdkapi.CounterInstrumentKind},
69				{kind: sdkapi.UpDownCounterInstrumentKind},
70				{kind: sdkapi.HistogramInstrumentKind},
71				{kind: sdkapi.CounterObserverInstrumentKind},
72				{kind: sdkapi.UpDownCounterObserverInstrumentKind},
73				{kind: sdkapi.GaugeObserverInstrumentKind},
74			} {
75				t.Run(ic.kind.String(), func(t *testing.T) {
76					for _, nc := range []numberCase{
77						{kind: number.Int64Kind},
78						{kind: number.Float64Kind},
79					} {
80						t.Run(nc.kind.String(), func(t *testing.T) {
81							for _, ac := range []aggregatorCase{
82								{kind: aggregation.SumKind},
83								{kind: aggregation.MinMaxSumCountKind},
84								{kind: aggregation.HistogramKind},
85								{kind: aggregation.LastValueKind},
86								{kind: aggregation.ExactKind},
87							} {
88								t.Run(ac.kind.String(), func(t *testing.T) {
89									testProcessor(
90										t,
91										tc.kind,
92										ic.kind,
93										nc.kind,
94										ac.kind,
95									)
96								})
97							}
98						})
99					}
100				})
101			}
102		})
103	}
104}
105
106func asNumber(nkind number.Kind, value int64) number.Number {
107	if nkind == number.Int64Kind {
108		return number.NewInt64Number(value)
109	}
110	return number.NewFloat64Number(float64(value))
111}
112
113func updateFor(t *testing.T, desc *sdkapi.Descriptor, selector export.AggregatorSelector, value int64, labs ...attribute.KeyValue) export.Accumulation {
114	ls := attribute.NewSet(labs...)
115	var agg export.Aggregator
116	selector.AggregatorFor(desc, &agg)
117	require.NoError(t, agg.Update(context.Background(), asNumber(desc.NumberKind(), value), desc))
118
119	return export.NewAccumulation(desc, &ls, agg)
120}
121
122func testProcessor(
123	t *testing.T,
124	aggTemp aggregation.Temporality,
125	mkind sdkapi.InstrumentKind,
126	nkind number.Kind,
127	akind aggregation.Kind,
128) {
129	// Note: this selector uses the instrument name to dictate
130	// aggregation kind.
131	selector := processorTest.AggregatorSelector()
132
133	labs1 := []attribute.KeyValue{attribute.String("L1", "V")}
134	labs2 := []attribute.KeyValue{attribute.String("L2", "V")}
135
136	testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) {
137		processor := basic.New(selector, aggregation.ConstantTemporalitySelector(aggTemp), basic.WithMemory(hasMemory))
138
139		instSuffix := fmt.Sprint(".", strings.ToLower(akind.String()))
140
141		desc1 := metrictest.NewDescriptor(fmt.Sprint("inst1", instSuffix), mkind, nkind)
142		desc2 := metrictest.NewDescriptor(fmt.Sprint("inst2", instSuffix), mkind, nkind)
143
144		for nc := 0; nc < nCheckpoint; nc++ {
145
146			// The input is 10 per update, scaled by
147			// the number of checkpoints for
148			// cumulative instruments:
149			input := int64(10)
150			cumulativeMultiplier := int64(nc + 1)
151			if mkind.PrecomputedSum() {
152				input *= cumulativeMultiplier
153			}
154
155			processor.StartCollection()
156
157			for na := 0; na < nAccum; na++ {
158				_ = processor.Process(updateFor(t, &desc1, selector, input, labs1...))
159				_ = processor.Process(updateFor(t, &desc2, selector, input, labs2...))
160			}
161
162			err := processor.FinishCollection()
163			if err == aggregation.ErrNoSubtraction {
164				var subr export.Aggregator
165				selector.AggregatorFor(&desc1, &subr)
166				_, canSub := subr.(export.Subtractor)
167
168				// Allow unsupported subraction case only when it is called for.
169				require.True(t, mkind.PrecomputedSum() && aggTemp == aggregation.DeltaTemporality && !canSub)
170				return
171			} else if err != nil {
172				t.Fatal("unexpected FinishCollection error: ", err)
173			}
174
175			if nc < nCheckpoint-1 {
176				continue
177			}
178
179			reader := processor.Reader()
180
181			for _, repetitionAfterEmptyInterval := range []bool{false, true} {
182				if repetitionAfterEmptyInterval {
183					// We're repeating the test after another
184					// interval with no updates.
185					processor.StartCollection()
186					if err := processor.FinishCollection(); err != nil {
187						t.Fatal("unexpected collection error: ", err)
188					}
189				}
190
191				// Test the final checkpoint state.
192				records1 := processorTest.NewOutput(attribute.DefaultEncoder())
193				err = reader.ForEach(aggregation.ConstantTemporalitySelector(aggTemp), records1.AddRecord)
194
195				// Test for an allowed error:
196				if err != nil && err != aggregation.ErrNoSubtraction {
197					t.Fatal("unexpected checkpoint error: ", err)
198				}
199				var multiplier int64
200
201				if mkind.Asynchronous() {
202					// Asynchronous tests accumulate results multiply by the
203					// number of Accumulators, unless LastValue aggregation.
204					// If a precomputed sum, we expect cumulative inputs.
205					if mkind.PrecomputedSum() {
206						if aggTemp == aggregation.DeltaTemporality && akind != aggregation.LastValueKind {
207							multiplier = int64(nAccum)
208						} else if akind == aggregation.LastValueKind {
209							multiplier = cumulativeMultiplier
210						} else {
211							multiplier = cumulativeMultiplier * int64(nAccum)
212						}
213					} else {
214						if aggTemp == aggregation.CumulativeTemporality && akind != aggregation.LastValueKind {
215							multiplier = cumulativeMultiplier * int64(nAccum)
216						} else if akind == aggregation.LastValueKind {
217							multiplier = 1
218						} else {
219							multiplier = int64(nAccum)
220						}
221					}
222				} else {
223					// Synchronous accumulate results from multiple accumulators,
224					// use that number as the baseline multiplier.
225					multiplier = int64(nAccum)
226					if aggTemp == aggregation.CumulativeTemporality {
227						// If a cumulative exporter, include prior checkpoints.
228						multiplier *= cumulativeMultiplier
229					}
230					if akind == aggregation.LastValueKind {
231						// If a last-value aggregator, set multiplier to 1.0.
232						multiplier = 1
233					}
234				}
235
236				exp := map[string]float64{}
237				if hasMemory || !repetitionAfterEmptyInterval {
238					exp = map[string]float64{
239						fmt.Sprintf("inst1%s/L1=V/", instSuffix): float64(multiplier * 10), // labels1
240						fmt.Sprintf("inst2%s/L2=V/", instSuffix): float64(multiplier * 10), // labels2
241					}
242				}
243
244				require.EqualValues(t, exp, records1.Map(), "with repetition=%v", repetitionAfterEmptyInterval)
245			}
246		}
247	}
248
249	for _, hasMem := range []bool{false, true} {
250		t.Run(fmt.Sprintf("HasMemory=%v", hasMem), func(t *testing.T) {
251			// For 1 to 3 checkpoints:
252			for nAccum := 1; nAccum <= 3; nAccum++ {
253				t.Run(fmt.Sprintf("NumAccum=%d", nAccum), func(t *testing.T) {
254					// For 1 to 3 accumulators:
255					for nCheckpoint := 1; nCheckpoint <= 3; nCheckpoint++ {
256						t.Run(fmt.Sprintf("NumCkpt=%d", nCheckpoint), func(t *testing.T) {
257							testBody(t, hasMem, nAccum, nCheckpoint)
258						})
259					}
260				})
261			}
262		})
263	}
264}
265
266type bogusExporter struct{}
267
268func (bogusExporter) TemporalityFor(*sdkapi.Descriptor, aggregation.Kind) aggregation.Temporality {
269	return 100
270}
271
272func (bogusExporter) Export(context.Context, export.Reader) error {
273	panic("Not called")
274}
275
276func TestBasicInconsistent(t *testing.T) {
277	// Test double-start
278	b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
279
280	b.StartCollection()
281	b.StartCollection()
282	require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
283
284	// Test finish without start
285	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
286
287	require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
288
289	// Test no finish
290	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
291
292	b.StartCollection()
293	require.Equal(
294		t,
295		basic.ErrInconsistentState,
296		b.ForEach(
297			aggregation.StatelessTemporalitySelector(),
298			func(export.Record) error { return nil },
299		),
300	)
301
302	// Test no start
303	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
304
305	desc := metrictest.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind)
306	accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{})
307	require.Equal(t, basic.ErrInconsistentState, b.Process(accum))
308
309	// Test invalid kind:
310	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
311	b.StartCollection()
312	require.NoError(t, b.Process(accum))
313	require.NoError(t, b.FinishCollection())
314
315	err := b.ForEach(
316		bogusExporter{},
317		func(export.Record) error { return nil },
318	)
319	require.True(t, errors.Is(err, basic.ErrInvalidTemporality))
320
321}
322
323func TestBasicTimestamps(t *testing.T) {
324	beforeNew := time.Now()
325	time.Sleep(time.Nanosecond)
326	b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
327	time.Sleep(time.Nanosecond)
328	afterNew := time.Now()
329
330	desc := metrictest.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind)
331	accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{})
332
333	b.StartCollection()
334	_ = b.Process(accum)
335	require.NoError(t, b.FinishCollection())
336
337	var start1, end1 time.Time
338
339	require.NoError(t, b.ForEach(aggregation.StatelessTemporalitySelector(), func(rec export.Record) error {
340		start1 = rec.StartTime()
341		end1 = rec.EndTime()
342		return nil
343	}))
344
345	// The first start time is set in the constructor.
346	requireNotAfter(t, beforeNew, start1)
347	requireNotAfter(t, start1, afterNew)
348
349	for i := 0; i < 2; i++ {
350		b.StartCollection()
351		require.NoError(t, b.Process(accum))
352		require.NoError(t, b.FinishCollection())
353
354		var start2, end2 time.Time
355
356		require.NoError(t, b.ForEach(aggregation.StatelessTemporalitySelector(), func(rec export.Record) error {
357			start2 = rec.StartTime()
358			end2 = rec.EndTime()
359			return nil
360		}))
361
362		// Subsequent intervals have their start and end aligned.
363		require.Equal(t, start2, end1)
364		requireNotAfter(t, start1, end1)
365		requireNotAfter(t, start2, end2)
366
367		start1 = start2
368		end1 = end2
369	}
370}
371
372func TestStatefulNoMemoryCumulative(t *testing.T) {
373	aggTempSel := aggregation.CumulativeTemporalitySelector()
374
375	desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterInstrumentKind, number.Int64Kind)
376	selector := processorTest.AggregatorSelector()
377
378	processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
379	reader := processor.Reader()
380
381	for i := 1; i < 3; i++ {
382		// Empty interval
383		processor.StartCollection()
384		require.NoError(t, processor.FinishCollection())
385
386		// Verify zero elements
387		records := processorTest.NewOutput(attribute.DefaultEncoder())
388		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
389		require.EqualValues(t, map[string]float64{}, records.Map())
390
391		// Add 10
392		processor.StartCollection()
393		_ = processor.Process(updateFor(t, &desc, selector, 10, attribute.String("A", "B")))
394		require.NoError(t, processor.FinishCollection())
395
396		// Verify one element
397		records = processorTest.NewOutput(attribute.DefaultEncoder())
398		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
399		require.EqualValues(t, map[string]float64{
400			"inst.sum/A=B/": float64(i * 10),
401		}, records.Map())
402	}
403}
404
405func TestStatefulNoMemoryDelta(t *testing.T) {
406	aggTempSel := aggregation.DeltaTemporalitySelector()
407
408	desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
409	selector := processorTest.AggregatorSelector()
410
411	processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
412	reader := processor.Reader()
413
414	for i := 1; i < 3; i++ {
415		// Empty interval
416		processor.StartCollection()
417		require.NoError(t, processor.FinishCollection())
418
419		// Verify zero elements
420		records := processorTest.NewOutput(attribute.DefaultEncoder())
421		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
422		require.EqualValues(t, map[string]float64{}, records.Map())
423
424		// Add 10
425		processor.StartCollection()
426		_ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B")))
427		require.NoError(t, processor.FinishCollection())
428
429		// Verify one element
430		records = processorTest.NewOutput(attribute.DefaultEncoder())
431		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
432		require.EqualValues(t, map[string]float64{
433			"inst.sum/A=B/": 10,
434		}, records.Map())
435	}
436}
437
438func TestMultiObserverSum(t *testing.T) {
439	for _, aggTempSel := range []aggregation.TemporalitySelector{
440		aggregation.CumulativeTemporalitySelector(),
441		aggregation.DeltaTemporalitySelector(),
442	} {
443
444		desc := metrictest.NewDescriptor("observe.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
445		selector := processorTest.AggregatorSelector()
446
447		processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
448		reader := processor.Reader()
449
450		for i := 1; i < 3; i++ {
451			// Add i*10*3 times
452			processor.StartCollection()
453			_ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B")))
454			_ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B")))
455			_ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B")))
456			require.NoError(t, processor.FinishCollection())
457
458			// Multiplier is 1 for deltas, otherwise i.
459			multiplier := i
460			if aggTempSel.TemporalityFor(&desc, aggregation.SumKind) == aggregation.DeltaTemporality {
461				multiplier = 1
462			}
463
464			// Verify one element
465			records := processorTest.NewOutput(attribute.DefaultEncoder())
466			require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
467			require.EqualValues(t, map[string]float64{
468				"observe.sum/A=B/": float64(3 * 10 * multiplier),
469			}, records.Map())
470		}
471	}
472}
473
474func TestCounterObserverEndToEnd(t *testing.T) {
475	ctx := context.Background()
476	eselector := aggregation.CumulativeTemporalitySelector()
477	proc := basic.New(
478		processorTest.AggregatorSelector(),
479		eselector,
480	)
481	accum := sdk.NewAccumulator(proc)
482	meter := metric.WrapMeterImpl(accum)
483
484	var calls int64
485	metric.Must(meter).NewInt64CounterObserver("observer.sum",
486		func(_ context.Context, result metric.Int64ObserverResult) {
487			calls++
488			result.Observe(calls)
489		},
490	)
491	reader := proc.Reader()
492
493	var startTime [3]time.Time
494	var endTime [3]time.Time
495
496	for i := range startTime {
497		data := proc.Reader()
498		data.Lock()
499		proc.StartCollection()
500		accum.Collect(ctx)
501		require.NoError(t, proc.FinishCollection())
502
503		exporter := processortest.New(eselector, attribute.DefaultEncoder())
504		require.NoError(t, exporter.Export(ctx, resource.Empty(), processortest.OneInstrumentationLibraryReader(
505			instrumentation.Library{
506				Name: "test",
507			}, reader)))
508
509		require.EqualValues(t, map[string]float64{
510			"observer.sum//": float64(i + 1),
511		}, exporter.Values())
512
513		var record export.Record
514		require.NoError(t, data.ForEach(eselector, func(r export.Record) error {
515			record = r
516			return nil
517		}))
518
519		startTime[i] = record.StartTime()
520		endTime[i] = record.EndTime()
521		data.Unlock()
522	}
523
524	require.Equal(t, startTime[0], startTime[1])
525	require.Equal(t, startTime[0], startTime[2])
526	requireNotAfter(t, endTime[0], endTime[1])
527	requireNotAfter(t, endTime[1], endTime[2])
528}
529