1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package basic_test
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"testing"
22	"time"
23
24	"github.com/stretchr/testify/require"
25
26	"go.opentelemetry.io/otel/attribute"
27	ottest "go.opentelemetry.io/otel/internal/internaltest"
28	"go.opentelemetry.io/otel/metric"
29	"go.opentelemetry.io/otel/metric/sdkapi"
30	export "go.opentelemetry.io/otel/sdk/export/metric"
31	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
32	"go.opentelemetry.io/otel/sdk/instrumentation"
33	controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
34	"go.opentelemetry.io/otel/sdk/metric/controller/controllertest"
35	processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
36	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
37	"go.opentelemetry.io/otel/sdk/resource"
38)
39
40const envVar = "OTEL_RESOURCE_ATTRIBUTES"
41
42func getMap(t *testing.T, cont *controller.Controller) map[string]float64 {
43	out := processortest.NewOutput(attribute.DefaultEncoder())
44
45	require.NoError(t, cont.ForEach(
46		func(_ instrumentation.Library, reader export.Reader) error {
47			return reader.ForEach(
48				aggregation.CumulativeTemporalitySelector(),
49				func(record export.Record) error {
50					return out.AddRecord(record)
51				},
52			)
53		}))
54	return out.Map()
55}
56
57type testContextKey string
58
59func testContext() context.Context {
60	ctx := context.Background()
61	return context.WithValue(ctx, testContextKey("A"), "B")
62}
63
64func checkTestContext(t *testing.T, ctx context.Context) {
65	require.Equal(t, "B", ctx.Value(testContextKey("A")))
66}
67
68func TestControllerUsesResource(t *testing.T) {
69	const envVal = "T=U,key=value"
70	store, err := ottest.SetEnvVariables(map[string]string{
71		envVar: envVal,
72	})
73
74	require.NoError(t, err)
75	defer func() { require.NoError(t, store.Restore()) }()
76
77	cases := []struct {
78		name    string
79		options []controller.Option
80		wanted  string
81	}{
82		{
83			name:    "explicitly empty resource",
84			options: []controller.Option{controller.WithResource(resource.Empty())},
85			wanted:  envVal,
86		},
87		{
88			name:    "uses default if no resource option",
89			options: nil,
90			wanted:  resource.Default().Encoded(attribute.DefaultEncoder()),
91		},
92		{
93			name:    "explicit resource",
94			options: []controller.Option{controller.WithResource(resource.NewSchemaless(attribute.String("R", "S")))},
95			wanted:  "R=S," + envVal,
96		},
97		{
98			name: "multi resource",
99			options: []controller.Option{
100				controller.WithResource(resource.NewSchemaless(attribute.String("R", "WRONG"))),
101				controller.WithResource(resource.NewSchemaless(attribute.String("R", "S"))),
102				controller.WithResource(resource.NewSchemaless(attribute.String("W", "X"))),
103				controller.WithResource(resource.NewSchemaless(attribute.String("T", "V"))),
104			},
105			wanted: "R=S,T=V,W=X,key=value",
106		},
107		{
108			name: "user override environment",
109			options: []controller.Option{
110				controller.WithResource(resource.NewSchemaless(attribute.String("T", "V"))),
111				controller.WithResource(resource.NewSchemaless(attribute.String("key", "I win"))),
112			},
113			wanted: "T=V,key=I win",
114		},
115	}
116	for _, c := range cases {
117		t.Run(fmt.Sprintf("case-%s", c.name), func(t *testing.T) {
118			sel := aggregation.CumulativeTemporalitySelector()
119			exp := processortest.New(sel, attribute.DefaultEncoder())
120			cont := controller.New(
121				processor.NewFactory(
122					processortest.AggregatorSelector(),
123					exp,
124				),
125				append(c.options, controller.WithExporter(exp))...,
126			)
127			ctx := context.Background()
128			require.NoError(t, cont.Start(ctx))
129
130			ctr := metric.Must(cont.Meter("named")).NewFloat64Counter("calls.sum")
131			ctr.Add(context.Background(), 1.)
132
133			// Collect once
134			require.NoError(t, cont.Stop(ctx))
135
136			expect := map[string]float64{
137				"calls.sum//" + c.wanted: 1.,
138			}
139			require.EqualValues(t, expect, exp.Values())
140		})
141	}
142}
143
144func TestStartNoExporter(t *testing.T) {
145	cont := controller.New(
146		processor.NewFactory(
147			processortest.AggregatorSelector(),
148			aggregation.CumulativeTemporalitySelector(),
149		),
150		controller.WithCollectPeriod(time.Second),
151		controller.WithResource(resource.Empty()),
152	)
153	mock := controllertest.NewMockClock()
154	cont.SetClock(mock)
155
156	calls := int64(0)
157
158	_ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("calls.lastvalue",
159		func(ctx context.Context, result metric.Int64ObserverResult) {
160			calls++
161			checkTestContext(t, ctx)
162			result.Observe(calls, attribute.String("A", "B"))
163		},
164	)
165
166	// Collect() has not been called.  The controller is unstarted.
167	expect := map[string]float64{}
168
169	// The time advances, but doesn't change the result (not collected).
170	require.EqualValues(t, expect, getMap(t, cont))
171	mock.Add(time.Second)
172	require.EqualValues(t, expect, getMap(t, cont))
173	mock.Add(time.Second)
174
175	expect = map[string]float64{
176		"calls.lastvalue/A=B/": 1,
177	}
178
179	// Collect once
180	ctx := testContext()
181
182	require.NoError(t, cont.Collect(ctx))
183
184	require.EqualValues(t, expect, getMap(t, cont))
185	mock.Add(time.Second)
186	require.EqualValues(t, expect, getMap(t, cont))
187	mock.Add(time.Second)
188
189	// Again
190	expect = map[string]float64{
191		"calls.lastvalue/A=B/": 2,
192	}
193
194	require.NoError(t, cont.Collect(ctx))
195
196	require.EqualValues(t, expect, getMap(t, cont))
197	mock.Add(time.Second)
198	require.EqualValues(t, expect, getMap(t, cont))
199
200	// Start the controller
201	require.NoError(t, cont.Start(ctx))
202
203	for i := 1; i <= 3; i++ {
204		expect = map[string]float64{
205			"calls.lastvalue/A=B/": 2 + float64(i),
206		}
207
208		mock.Add(time.Second)
209		require.EqualValues(t, expect, getMap(t, cont))
210	}
211}
212
213func TestObserverCanceled(t *testing.T) {
214	cont := controller.New(
215		processor.NewFactory(
216			processortest.AggregatorSelector(),
217			aggregation.CumulativeTemporalitySelector(),
218		),
219		controller.WithCollectPeriod(0),
220		controller.WithCollectTimeout(time.Millisecond),
221		controller.WithResource(resource.Empty()),
222	)
223
224	calls := int64(0)
225
226	_ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("done.lastvalue",
227		func(ctx context.Context, result metric.Int64ObserverResult) {
228			<-ctx.Done()
229			calls++
230			result.Observe(calls)
231		},
232	)
233	// This relies on the context timing out
234	err := cont.Collect(context.Background())
235	require.Error(t, err)
236	require.True(t, errors.Is(err, context.DeadlineExceeded))
237
238	expect := map[string]float64{
239		"done.lastvalue//": 1,
240	}
241
242	require.EqualValues(t, expect, getMap(t, cont))
243}
244
245func TestObserverContext(t *testing.T) {
246	cont := controller.New(
247		processor.NewFactory(
248			processortest.AggregatorSelector(),
249			aggregation.CumulativeTemporalitySelector(),
250		),
251		controller.WithCollectTimeout(0),
252		controller.WithResource(resource.Empty()),
253	)
254
255	_ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("done.lastvalue",
256		func(ctx context.Context, result metric.Int64ObserverResult) {
257			time.Sleep(10 * time.Millisecond)
258			checkTestContext(t, ctx)
259			result.Observe(1)
260		},
261	)
262	ctx := testContext()
263
264	require.NoError(t, cont.Collect(ctx))
265
266	expect := map[string]float64{
267		"done.lastvalue//": 1,
268	}
269
270	require.EqualValues(t, expect, getMap(t, cont))
271}
272
273type blockingExporter struct {
274	calls    int
275	exporter *processortest.Exporter
276}
277
278func newBlockingExporter() *blockingExporter {
279	return &blockingExporter{
280		exporter: processortest.New(
281			aggregation.CumulativeTemporalitySelector(),
282			attribute.DefaultEncoder(),
283		),
284	}
285}
286
287func (b *blockingExporter) Export(ctx context.Context, res *resource.Resource, output export.InstrumentationLibraryReader) error {
288	var err error
289	_ = b.exporter.Export(ctx, res, output)
290	if b.calls == 0 {
291		// timeout once
292		<-ctx.Done()
293		err = ctx.Err()
294	}
295	b.calls++
296	return err
297}
298
299func (*blockingExporter) TemporalityFor(*sdkapi.Descriptor, aggregation.Kind) aggregation.Temporality {
300	return aggregation.CumulativeTemporality
301}
302
303func TestExportTimeout(t *testing.T) {
304	exporter := newBlockingExporter()
305	cont := controller.New(
306		processor.NewFactory(
307			processortest.AggregatorSelector(),
308			aggregation.CumulativeTemporalitySelector(),
309		),
310		controller.WithCollectPeriod(time.Second),
311		controller.WithPushTimeout(time.Millisecond),
312		controller.WithExporter(exporter),
313		controller.WithResource(resource.Empty()),
314	)
315	mock := controllertest.NewMockClock()
316	cont.SetClock(mock)
317
318	calls := int64(0)
319	_ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("one.lastvalue",
320		func(ctx context.Context, result metric.Int64ObserverResult) {
321			calls++
322			result.Observe(calls)
323		},
324	)
325
326	require.NoError(t, cont.Start(context.Background()))
327
328	// Initial empty state
329	expect := map[string]float64{}
330	require.EqualValues(t, expect, exporter.exporter.Values())
331
332	// Collect after 1s, timeout
333	mock.Add(time.Second)
334
335	err := testHandler.Flush()
336	require.Error(t, err)
337	require.True(t, errors.Is(err, context.DeadlineExceeded))
338
339	expect = map[string]float64{
340		"one.lastvalue//": 1,
341	}
342	require.EqualValues(t, expect, exporter.exporter.Values())
343
344	// Collect again
345	mock.Add(time.Second)
346	expect = map[string]float64{
347		"one.lastvalue//": 2,
348	}
349	require.EqualValues(t, expect, exporter.exporter.Values())
350
351	err = testHandler.Flush()
352	require.NoError(t, err)
353}
354
355func TestCollectAfterStopThenStartAgain(t *testing.T) {
356	exp := processortest.New(
357		aggregation.CumulativeTemporalitySelector(),
358		attribute.DefaultEncoder(),
359	)
360	cont := controller.New(
361		processor.NewFactory(
362			processortest.AggregatorSelector(),
363			exp,
364		),
365		controller.WithCollectPeriod(time.Second),
366		controller.WithExporter(exp),
367		controller.WithResource(resource.Empty()),
368	)
369	mock := controllertest.NewMockClock()
370	cont.SetClock(mock)
371
372	calls := 0
373	_ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("one.lastvalue",
374		func(ctx context.Context, result metric.Int64ObserverResult) {
375			calls++
376			result.Observe(int64(calls))
377		},
378	)
379
380	// No collections happen (because mock clock does not advance):
381	require.NoError(t, cont.Start(context.Background()))
382	require.True(t, cont.IsRunning())
383
384	// There's one collection run by Stop():
385	require.NoError(t, cont.Stop(context.Background()))
386
387	require.EqualValues(t, map[string]float64{
388		"one.lastvalue//": 1,
389	}, exp.Values())
390	require.NoError(t, testHandler.Flush())
391
392	// Manual collect after Stop still works, subject to
393	// CollectPeriod.
394	require.NoError(t, cont.Collect(context.Background()))
395	require.EqualValues(t, map[string]float64{
396		"one.lastvalue//": 2,
397	}, getMap(t, cont))
398
399	require.NoError(t, testHandler.Flush())
400	require.False(t, cont.IsRunning())
401
402	// Start again, see that collection proceeds.  However,
403	// explicit collection should still fail.
404	require.NoError(t, cont.Start(context.Background()))
405	require.True(t, cont.IsRunning())
406	err := cont.Collect(context.Background())
407	require.Error(t, err)
408	require.Equal(t, controller.ErrControllerStarted, err)
409
410	require.NoError(t, cont.Stop(context.Background()))
411	require.EqualValues(t, map[string]float64{
412		"one.lastvalue//": 3,
413	}, exp.Values())
414	require.False(t, cont.IsRunning())
415
416	// Time has not advanced yet. Now let the ticker perform
417	// collection:
418	require.NoError(t, cont.Start(context.Background()))
419	mock.Add(time.Second)
420	require.EqualValues(t, map[string]float64{
421		"one.lastvalue//": 4,
422	}, exp.Values())
423
424	mock.Add(time.Second)
425	require.EqualValues(t, map[string]float64{
426		"one.lastvalue//": 5,
427	}, exp.Values())
428	require.NoError(t, cont.Stop(context.Background()))
429	require.EqualValues(t, map[string]float64{
430		"one.lastvalue//": 6,
431	}, exp.Values())
432}
433
434func TestRegistryFunction(t *testing.T) {
435	exp := processortest.New(
436		aggregation.CumulativeTemporalitySelector(),
437		attribute.DefaultEncoder(),
438	)
439	cont := controller.New(
440		processor.NewFactory(
441			processortest.AggregatorSelector(),
442			exp,
443		),
444		controller.WithCollectPeriod(time.Second),
445		controller.WithExporter(exp),
446		controller.WithResource(resource.Empty()),
447	)
448
449	m1 := cont.Meter("test")
450	m2 := cont.Meter("test")
451
452	require.NotNil(t, m1)
453	require.Equal(t, m1, m2)
454
455	c1, err := m1.NewInt64Counter("counter.sum")
456	require.NoError(t, err)
457
458	c2, err := m1.NewInt64Counter("counter.sum")
459	require.NoError(t, err)
460
461	require.Equal(t, c1, c2)
462
463	ctx := context.Background()
464
465	require.NoError(t, cont.Start(ctx))
466
467	c1.Add(ctx, 10)
468	c2.Add(ctx, 10)
469
470	require.NoError(t, cont.Stop(ctx))
471
472	require.EqualValues(t, map[string]float64{
473		"counter.sum//": 20,
474	}, exp.Values())
475}
476