1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package basic_test
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"runtime"
22	"sync"
23	"testing"
24	"time"
25
26	"github.com/stretchr/testify/require"
27
28	"go.opentelemetry.io/otel"
29	"go.opentelemetry.io/otel/attribute"
30	"go.opentelemetry.io/otel/metric"
31	export "go.opentelemetry.io/otel/sdk/export/metric"
32	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
33	controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
34	"go.opentelemetry.io/otel/sdk/metric/controller/controllertest"
35	processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
36	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
37	"go.opentelemetry.io/otel/sdk/resource"
38)
39
// testResource is the resource passed to controllers in these tests via
// controller.WithResource. It carries the single string attribute R=V.
var testResource = resource.NewWithAttributes(attribute.String("R", "V"))
41
// handler is an error handler for tests that remembers the most recent
// error passed to Handle. The embedded mutex guards err.
type handler struct {
	sync.Mutex
	err error
}

// Handle stores err, replacing whatever error was stored before
// (including replacing it with nil).
func (h *handler) Handle(err error) {
	h.Lock()
	defer h.Unlock()

	h.err = err
}

// Flush returns the stored error and clears it, so that an immediately
// following Flush returns nil unless Handle was called in between.
func (h *handler) Flush() error {
	h.Lock()
	defer h.Unlock()

	pending := h.err
	h.err = nil
	return pending
}
60
61var testHandler *handler
62
63func init() {
64	testHandler = new(handler)
65	otel.SetErrorHandler(testHandler)
66}
67
68func newExporter() *processortest.Exporter {
69	return processortest.NewExporter(
70		export.StatelessExportKindSelector(),
71		attribute.DefaultEncoder(),
72	)
73}
74
75func newCheckpointer() export.Checkpointer {
76	return processortest.Checkpointer(
77		processortest.NewProcessor(
78			processortest.AggregatorSelector(),
79			attribute.DefaultEncoder(),
80		),
81	)
82}
83
84func TestPushDoubleStop(t *testing.T) {
85	ctx := context.Background()
86	exporter := newExporter()
87	checkpointer := newCheckpointer()
88	p := controller.New(checkpointer, controller.WithExporter(exporter))
89	require.NoError(t, p.Start(ctx))
90	require.NoError(t, p.Stop(ctx))
91	require.NoError(t, p.Stop(ctx))
92}
93
94func TestPushDoubleStart(t *testing.T) {
95	ctx := context.Background()
96	exporter := newExporter()
97	checkpointer := newCheckpointer()
98	p := controller.New(checkpointer, controller.WithExporter(exporter))
99	require.NoError(t, p.Start(ctx))
100	err := p.Start(ctx)
101	require.Error(t, err)
102	require.True(t, errors.Is(err, controller.ErrControllerStarted))
103	require.NoError(t, p.Stop(ctx))
104}
105
106func TestPushTicker(t *testing.T) {
107	exporter := newExporter()
108	checkpointer := newCheckpointer()
109	p := controller.New(
110		checkpointer,
111		controller.WithExporter(exporter),
112		controller.WithCollectPeriod(time.Second),
113		controller.WithResource(testResource),
114	)
115	meter := p.MeterProvider().Meter("name")
116
117	mock := controllertest.NewMockClock()
118	p.SetClock(mock)
119
120	ctx := context.Background()
121
122	counter := metric.Must(meter).NewInt64Counter("counter.sum")
123
124	require.NoError(t, p.Start(ctx))
125
126	counter.Add(ctx, 3)
127
128	require.EqualValues(t, map[string]float64{}, exporter.Values())
129
130	mock.Add(time.Second)
131	runtime.Gosched()
132
133	require.EqualValues(t, map[string]float64{
134		"counter.sum//R=V": 3,
135	}, exporter.Values())
136
137	require.Equal(t, 1, exporter.ExportCount())
138	exporter.Reset()
139
140	counter.Add(ctx, 7)
141
142	mock.Add(time.Second)
143	runtime.Gosched()
144
145	require.EqualValues(t, map[string]float64{
146		"counter.sum//R=V": 10,
147	}, exporter.Values())
148
149	require.Equal(t, 1, exporter.ExportCount())
150	exporter.Reset()
151
152	require.NoError(t, p.Stop(ctx))
153}
154
155func TestPushExportError(t *testing.T) {
156	injector := func(name string, e error) func(r export.Record) error {
157		return func(r export.Record) error {
158			if r.Descriptor().Name() == name {
159				return e
160			}
161			return nil
162		}
163	}
164	var errAggregator = fmt.Errorf("unexpected error")
165	var tests = []struct {
166		name          string
167		injectedError error
168		expected      map[string]float64
169		expectedError error
170	}{
171		{"errNone", nil, map[string]float64{
172			"counter1.sum/X=Y/R=V": 3,
173			"counter2.sum//R=V":    5,
174		}, nil},
175		{"errNoData", aggregation.ErrNoData, map[string]float64{
176			"counter2.sum//R=V": 5,
177		}, nil},
178		{"errUnexpected", errAggregator, map[string]float64{}, errAggregator},
179	}
180	for _, tt := range tests {
181		t.Run(tt.name, func(t *testing.T) {
182			exporter := newExporter()
183			exporter.InjectErr = injector("counter1.sum", tt.injectedError)
184
185			// This test validates the error handling
186			// behavior of the basic Processor is honored
187			// by the push processor.
188			checkpointer := processor.New(processortest.AggregatorSelector(), exporter)
189			p := controller.New(
190				checkpointer,
191				controller.WithExporter(exporter),
192				controller.WithCollectPeriod(time.Second),
193				controller.WithResource(testResource),
194			)
195
196			mock := controllertest.NewMockClock()
197			p.SetClock(mock)
198
199			ctx := context.Background()
200
201			meter := p.MeterProvider().Meter("name")
202			counter1 := metric.Must(meter).NewInt64Counter("counter1.sum")
203			counter2 := metric.Must(meter).NewInt64Counter("counter2.sum")
204
205			require.NoError(t, p.Start(ctx))
206			runtime.Gosched()
207
208			counter1.Add(ctx, 3, attribute.String("X", "Y"))
209			counter2.Add(ctx, 5)
210
211			require.Equal(t, 0, exporter.ExportCount())
212			require.Nil(t, testHandler.Flush())
213
214			mock.Add(time.Second)
215			runtime.Gosched()
216
217			require.Equal(t, 1, exporter.ExportCount())
218			if tt.expectedError == nil {
219				require.EqualValues(t, tt.expected, exporter.Values())
220				require.NoError(t, testHandler.Flush())
221			} else {
222				err := testHandler.Flush()
223				require.Error(t, err)
224				require.Equal(t, tt.expectedError, err)
225			}
226
227			require.NoError(t, p.Stop(ctx))
228		})
229	}
230}
231