1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package basic_test 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "testing" 22 "time" 23 24 "github.com/stretchr/testify/require" 25 26 "go.opentelemetry.io/otel/attribute" 27 ottest "go.opentelemetry.io/otel/internal/internaltest" 28 "go.opentelemetry.io/otel/metric" 29 "go.opentelemetry.io/otel/metric/sdkapi" 30 export "go.opentelemetry.io/otel/sdk/export/metric" 31 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 32 "go.opentelemetry.io/otel/sdk/instrumentation" 33 controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" 34 "go.opentelemetry.io/otel/sdk/metric/controller/controllertest" 35 processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" 36 "go.opentelemetry.io/otel/sdk/metric/processor/processortest" 37 "go.opentelemetry.io/otel/sdk/resource" 38) 39 40const envVar = "OTEL_RESOURCE_ATTRIBUTES" 41 42func getMap(t *testing.T, cont *controller.Controller) map[string]float64 { 43 out := processortest.NewOutput(attribute.DefaultEncoder()) 44 45 require.NoError(t, cont.ForEach( 46 func(_ instrumentation.Library, reader export.Reader) error { 47 return reader.ForEach( 48 aggregation.CumulativeTemporalitySelector(), 49 func(record export.Record) error { 50 return out.AddRecord(record) 51 }, 52 ) 53 })) 54 return out.Map() 55} 56 57type testContextKey string 58 59func testContext() context.Context { 60 ctx := context.Background() 61 return context.WithValue(ctx, testContextKey("A"), "B") 62} 63 64func checkTestContext(t *testing.T, ctx context.Context) { 65 require.Equal(t, "B", ctx.Value(testContextKey("A"))) 66} 67 68func TestControllerUsesResource(t *testing.T) { 69 const envVal = "T=U,key=value" 70 store, err := ottest.SetEnvVariables(map[string]string{ 71 envVar: envVal, 72 }) 73 74 require.NoError(t, err) 75 defer func() { require.NoError(t, store.Restore()) }() 76 77 cases := []struct { 78 name string 79 options []controller.Option 80 wanted string 81 }{ 82 { 83 name: "explicitly empty resource", 84 options: []controller.Option{controller.WithResource(resource.Empty())}, 85 wanted: envVal, 86 }, 87 { 88 name: "uses default if no resource option", 89 options: nil, 90 wanted: resource.Default().Encoded(attribute.DefaultEncoder()), 91 }, 92 { 93 name: "explicit resource", 94 options: []controller.Option{controller.WithResource(resource.NewSchemaless(attribute.String("R", "S")))}, 95 wanted: "R=S," + envVal, 96 }, 97 { 98 name: "multi resource", 99 options: []controller.Option{ 100 controller.WithResource(resource.NewSchemaless(attribute.String("R", "WRONG"))), 101 controller.WithResource(resource.NewSchemaless(attribute.String("R", "S"))), 102 controller.WithResource(resource.NewSchemaless(attribute.String("W", "X"))), 103 controller.WithResource(resource.NewSchemaless(attribute.String("T", "V"))), 104 }, 105 wanted: "R=S,T=V,W=X,key=value", 106 }, 107 { 108 name: "user override environment", 109 options: []controller.Option{ 110 controller.WithResource(resource.NewSchemaless(attribute.String("T", "V"))), 111 controller.WithResource(resource.NewSchemaless(attribute.String("key", "I win"))), 112 }, 113 wanted: "T=V,key=I win", 114 }, 115 } 116 for _, c := range cases { 117 t.Run(fmt.Sprintf("case-%s", c.name), func(t *testing.T) { 118 sel := aggregation.CumulativeTemporalitySelector() 119 exp := processortest.New(sel, attribute.DefaultEncoder()) 120 cont := controller.New( 121 processor.NewFactory( 122 processortest.AggregatorSelector(), 123 exp, 124 ), 125 append(c.options, controller.WithExporter(exp))..., 126 ) 127 ctx := context.Background() 128 require.NoError(t, cont.Start(ctx)) 129 130 ctr := metric.Must(cont.Meter("named")).NewFloat64Counter("calls.sum") 131 ctr.Add(context.Background(), 1.) 132 133 // Collect once 134 require.NoError(t, cont.Stop(ctx)) 135 136 expect := map[string]float64{ 137 "calls.sum//" + c.wanted: 1., 138 } 139 require.EqualValues(t, expect, exp.Values()) 140 }) 141 } 142} 143 144func TestStartNoExporter(t *testing.T) { 145 cont := controller.New( 146 processor.NewFactory( 147 processortest.AggregatorSelector(), 148 aggregation.CumulativeTemporalitySelector(), 149 ), 150 controller.WithCollectPeriod(time.Second), 151 controller.WithResource(resource.Empty()), 152 ) 153 mock := controllertest.NewMockClock() 154 cont.SetClock(mock) 155 156 calls := int64(0) 157 158 _ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("calls.lastvalue", 159 func(ctx context.Context, result metric.Int64ObserverResult) { 160 calls++ 161 checkTestContext(t, ctx) 162 result.Observe(calls, attribute.String("A", "B")) 163 }, 164 ) 165 166 // Collect() has not been called. The controller is unstarted. 167 expect := map[string]float64{} 168 169 // The time advances, but doesn't change the result (not collected). 170 require.EqualValues(t, expect, getMap(t, cont)) 171 mock.Add(time.Second) 172 require.EqualValues(t, expect, getMap(t, cont)) 173 mock.Add(time.Second) 174 175 expect = map[string]float64{ 176 "calls.lastvalue/A=B/": 1, 177 } 178 179 // Collect once 180 ctx := testContext() 181 182 require.NoError(t, cont.Collect(ctx)) 183 184 require.EqualValues(t, expect, getMap(t, cont)) 185 mock.Add(time.Second) 186 require.EqualValues(t, expect, getMap(t, cont)) 187 mock.Add(time.Second) 188 189 // Again 190 expect = map[string]float64{ 191 "calls.lastvalue/A=B/": 2, 192 } 193 194 require.NoError(t, cont.Collect(ctx)) 195 196 require.EqualValues(t, expect, getMap(t, cont)) 197 mock.Add(time.Second) 198 require.EqualValues(t, expect, getMap(t, cont)) 199 200 // Start the controller 201 require.NoError(t, cont.Start(ctx)) 202 203 for i := 1; i <= 3; i++ { 204 expect = map[string]float64{ 205 "calls.lastvalue/A=B/": 2 + float64(i), 206 } 207 208 mock.Add(time.Second) 209 require.EqualValues(t, expect, getMap(t, cont)) 210 } 211} 212 213func TestObserverCanceled(t *testing.T) { 214 cont := controller.New( 215 processor.NewFactory( 216 processortest.AggregatorSelector(), 217 aggregation.CumulativeTemporalitySelector(), 218 ), 219 controller.WithCollectPeriod(0), 220 controller.WithCollectTimeout(time.Millisecond), 221 controller.WithResource(resource.Empty()), 222 ) 223 224 calls := int64(0) 225 226 _ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("done.lastvalue", 227 func(ctx context.Context, result metric.Int64ObserverResult) { 228 <-ctx.Done() 229 calls++ 230 result.Observe(calls) 231 }, 232 ) 233 // This relies on the context timing out 234 err := cont.Collect(context.Background()) 235 require.Error(t, err) 236 require.True(t, errors.Is(err, context.DeadlineExceeded)) 237 238 expect := map[string]float64{ 239 "done.lastvalue//": 1, 240 } 241 242 require.EqualValues(t, expect, getMap(t, cont)) 243} 244 245func TestObserverContext(t *testing.T) { 246 cont := controller.New( 247 processor.NewFactory( 248 processortest.AggregatorSelector(), 249 aggregation.CumulativeTemporalitySelector(), 250 ), 251 controller.WithCollectTimeout(0), 252 controller.WithResource(resource.Empty()), 253 ) 254 255 _ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("done.lastvalue", 256 func(ctx context.Context, result metric.Int64ObserverResult) { 257 time.Sleep(10 * time.Millisecond) 258 checkTestContext(t, ctx) 259 result.Observe(1) 260 }, 261 ) 262 ctx := testContext() 263 264 require.NoError(t, cont.Collect(ctx)) 265 266 expect := map[string]float64{ 267 "done.lastvalue//": 1, 268 } 269 270 require.EqualValues(t, expect, getMap(t, cont)) 271} 272 273type blockingExporter struct { 274 calls int 275 exporter *processortest.Exporter 276} 277 278func newBlockingExporter() *blockingExporter { 279 return &blockingExporter{ 280 exporter: processortest.New( 281 aggregation.CumulativeTemporalitySelector(), 282 attribute.DefaultEncoder(), 283 ), 284 } 285} 286 287func (b *blockingExporter) Export(ctx context.Context, res *resource.Resource, output export.InstrumentationLibraryReader) error { 288 var err error 289 _ = b.exporter.Export(ctx, res, output) 290 if b.calls == 0 { 291 // timeout once 292 <-ctx.Done() 293 err = ctx.Err() 294 } 295 b.calls++ 296 return err 297} 298 299func (*blockingExporter) TemporalityFor(*sdkapi.Descriptor, aggregation.Kind) aggregation.Temporality { 300 return aggregation.CumulativeTemporality 301} 302 303func TestExportTimeout(t *testing.T) { 304 exporter := newBlockingExporter() 305 cont := controller.New( 306 processor.NewFactory( 307 processortest.AggregatorSelector(), 308 aggregation.CumulativeTemporalitySelector(), 309 ), 310 controller.WithCollectPeriod(time.Second), 311 controller.WithPushTimeout(time.Millisecond), 312 controller.WithExporter(exporter), 313 controller.WithResource(resource.Empty()), 314 ) 315 mock := controllertest.NewMockClock() 316 cont.SetClock(mock) 317 318 calls := int64(0) 319 _ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("one.lastvalue", 320 func(ctx context.Context, result metric.Int64ObserverResult) { 321 calls++ 322 result.Observe(calls) 323 }, 324 ) 325 326 require.NoError(t, cont.Start(context.Background())) 327 328 // Initial empty state 329 expect := map[string]float64{} 330 require.EqualValues(t, expect, exporter.exporter.Values()) 331 332 // Collect after 1s, timeout 333 mock.Add(time.Second) 334 335 err := testHandler.Flush() 336 require.Error(t, err) 337 require.True(t, errors.Is(err, context.DeadlineExceeded)) 338 339 expect = map[string]float64{ 340 "one.lastvalue//": 1, 341 } 342 require.EqualValues(t, expect, exporter.exporter.Values()) 343 344 // Collect again 345 mock.Add(time.Second) 346 expect = map[string]float64{ 347 "one.lastvalue//": 2, 348 } 349 require.EqualValues(t, expect, exporter.exporter.Values()) 350 351 err = testHandler.Flush() 352 require.NoError(t, err) 353} 354 355func TestCollectAfterStopThenStartAgain(t *testing.T) { 356 exp := processortest.New( 357 aggregation.CumulativeTemporalitySelector(), 358 attribute.DefaultEncoder(), 359 ) 360 cont := controller.New( 361 processor.NewFactory( 362 processortest.AggregatorSelector(), 363 exp, 364 ), 365 controller.WithCollectPeriod(time.Second), 366 controller.WithExporter(exp), 367 controller.WithResource(resource.Empty()), 368 ) 369 mock := controllertest.NewMockClock() 370 cont.SetClock(mock) 371 372 calls := 0 373 _ = metric.Must(cont.Meter("named")).NewInt64CounterObserver("one.lastvalue", 374 func(ctx context.Context, result metric.Int64ObserverResult) { 375 calls++ 376 result.Observe(int64(calls)) 377 }, 378 ) 379 380 // No collections happen (because mock clock does not advance): 381 require.NoError(t, cont.Start(context.Background())) 382 require.True(t, cont.IsRunning()) 383 384 // There's one collection run by Stop(): 385 require.NoError(t, cont.Stop(context.Background())) 386 387 require.EqualValues(t, map[string]float64{ 388 "one.lastvalue//": 1, 389 }, exp.Values()) 390 require.NoError(t, testHandler.Flush()) 391 392 // Manual collect after Stop still works, subject to 393 // CollectPeriod. 394 require.NoError(t, cont.Collect(context.Background())) 395 require.EqualValues(t, map[string]float64{ 396 "one.lastvalue//": 2, 397 }, getMap(t, cont)) 398 399 require.NoError(t, testHandler.Flush()) 400 require.False(t, cont.IsRunning()) 401 402 // Start again, see that collection proceeds. However, 403 // explicit collection should still fail. 404 require.NoError(t, cont.Start(context.Background())) 405 require.True(t, cont.IsRunning()) 406 err := cont.Collect(context.Background()) 407 require.Error(t, err) 408 require.Equal(t, controller.ErrControllerStarted, err) 409 410 require.NoError(t, cont.Stop(context.Background())) 411 require.EqualValues(t, map[string]float64{ 412 "one.lastvalue//": 3, 413 }, exp.Values()) 414 require.False(t, cont.IsRunning()) 415 416 // Time has not advanced yet. Now let the ticker perform 417 // collection: 418 require.NoError(t, cont.Start(context.Background())) 419 mock.Add(time.Second) 420 require.EqualValues(t, map[string]float64{ 421 "one.lastvalue//": 4, 422 }, exp.Values()) 423 424 mock.Add(time.Second) 425 require.EqualValues(t, map[string]float64{ 426 "one.lastvalue//": 5, 427 }, exp.Values()) 428 require.NoError(t, cont.Stop(context.Background())) 429 require.EqualValues(t, map[string]float64{ 430 "one.lastvalue//": 6, 431 }, exp.Values()) 432} 433 434func TestRegistryFunction(t *testing.T) { 435 exp := processortest.New( 436 aggregation.CumulativeTemporalitySelector(), 437 attribute.DefaultEncoder(), 438 ) 439 cont := controller.New( 440 processor.NewFactory( 441 processortest.AggregatorSelector(), 442 exp, 443 ), 444 controller.WithCollectPeriod(time.Second), 445 controller.WithExporter(exp), 446 controller.WithResource(resource.Empty()), 447 ) 448 449 m1 := cont.Meter("test") 450 m2 := cont.Meter("test") 451 452 require.NotNil(t, m1) 453 require.Equal(t, m1, m2) 454 455 c1, err := m1.NewInt64Counter("counter.sum") 456 require.NoError(t, err) 457 458 c2, err := m1.NewInt64Counter("counter.sum") 459 require.NoError(t, err) 460 461 require.Equal(t, c1, c2) 462 463 ctx := context.Background() 464 465 require.NoError(t, cont.Start(ctx)) 466 467 c1.Add(ctx, 10) 468 c2.Add(ctx, 10) 469 470 require.NoError(t, cont.Stop(ctx)) 471 472 require.EqualValues(t, map[string]float64{ 473 "counter.sum//": 20, 474 }, exp.Values()) 475} 476