1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package metric_test 16 17import ( 18 "context" 19 "fmt" 20 "math" 21 "sync" 22 "testing" 23 24 "github.com/stretchr/testify/require" 25 26 "go.opentelemetry.io/otel" 27 "go.opentelemetry.io/otel/attribute" 28 "go.opentelemetry.io/otel/metric" 29 "go.opentelemetry.io/otel/metric/number" 30 export "go.opentelemetry.io/otel/sdk/export/metric" 31 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 32 metricsdk "go.opentelemetry.io/otel/sdk/metric" 33 "go.opentelemetry.io/otel/sdk/metric/processor/processortest" 34 "go.opentelemetry.io/otel/sdk/resource" 35) 36 37var Must = metric.Must 38var testResource = resource.NewWithAttributes(attribute.String("R", "V")) 39 40type handler struct { 41 sync.Mutex 42 err error 43} 44 45func (h *handler) Handle(err error) { 46 h.Lock() 47 h.err = err 48 h.Unlock() 49} 50 51func (h *handler) Reset() { 52 h.Lock() 53 h.err = nil 54 h.Unlock() 55} 56 57func (h *handler) Flush() error { 58 h.Lock() 59 err := h.err 60 h.err = nil 61 h.Unlock() 62 return err 63} 64 65var testHandler *handler 66 67func init() { 68 testHandler = new(handler) 69 otel.SetErrorHandler(testHandler) 70} 71 72// correctnessProcessor could be replaced with processortest.Processor 73// with a non-default aggregator selector. TODO(#872) use the 74// processortest code here. 75type correctnessProcessor struct { 76 t *testing.T 77 *testSelector 78 79 accumulations []export.Accumulation 80} 81 82type testSelector struct { 83 selector export.AggregatorSelector 84 newAggCount int 85} 86 87func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) { 88 ts.newAggCount += len(aggPtrs) 89 processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...) 90} 91 92func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessProcessor) { 93 testHandler.Reset() 94 processor := &correctnessProcessor{ 95 t: t, 96 testSelector: &testSelector{selector: processortest.AggregatorSelector()}, 97 } 98 accum := metricsdk.NewAccumulator( 99 processor, 100 testResource, 101 ) 102 meter := metric.WrapMeterImpl(accum, "test") 103 return meter, accum, processor 104} 105 106func (ci *correctnessProcessor) Process(accumulation export.Accumulation) error { 107 ci.accumulations = append(ci.accumulations, accumulation) 108 return nil 109} 110 111func TestInputRangeCounter(t *testing.T) { 112 ctx := context.Background() 113 meter, sdk, processor := newSDK(t) 114 115 counter := Must(meter).NewInt64Counter("name.sum") 116 117 counter.Add(ctx, -1) 118 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 119 120 checkpointed := sdk.Collect(ctx) 121 require.Equal(t, 0, checkpointed) 122 123 processor.accumulations = nil 124 counter.Add(ctx, 1) 125 checkpointed = sdk.Collect(ctx) 126 sum, err := processor.accumulations[0].Aggregator().(aggregation.Sum).Sum() 127 require.Equal(t, int64(1), sum.AsInt64()) 128 require.Equal(t, 1, checkpointed) 129 require.Nil(t, err) 130 require.Nil(t, testHandler.Flush()) 131} 132 133func TestInputRangeUpDownCounter(t *testing.T) { 134 ctx := context.Background() 135 meter, sdk, processor := newSDK(t) 136 137 counter := Must(meter).NewInt64UpDownCounter("name.sum") 138 139 counter.Add(ctx, -1) 140 counter.Add(ctx, -1) 141 counter.Add(ctx, 2) 142 counter.Add(ctx, 1) 143 144 checkpointed := sdk.Collect(ctx) 145 sum, err := processor.accumulations[0].Aggregator().(aggregation.Sum).Sum() 146 require.Equal(t, int64(1), sum.AsInt64()) 147 require.Equal(t, 1, checkpointed) 148 require.Nil(t, err) 149 require.Nil(t, testHandler.Flush()) 150} 151 152func TestInputRangeValueRecorder(t *testing.T) { 153 ctx := context.Background() 154 meter, sdk, processor := newSDK(t) 155 156 valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") 157 158 valuerecorder.Record(ctx, math.NaN()) 159 require.Equal(t, aggregation.ErrNaNInput, testHandler.Flush()) 160 161 checkpointed := sdk.Collect(ctx) 162 require.Equal(t, 0, checkpointed) 163 164 valuerecorder.Record(ctx, 1) 165 valuerecorder.Record(ctx, 2) 166 167 processor.accumulations = nil 168 checkpointed = sdk.Collect(ctx) 169 170 count, err := processor.accumulations[0].Aggregator().(aggregation.Count).Count() 171 require.Equal(t, uint64(2), count) 172 require.Equal(t, 1, checkpointed) 173 require.Nil(t, testHandler.Flush()) 174 require.Nil(t, err) 175} 176 177func TestDisabledInstrument(t *testing.T) { 178 ctx := context.Background() 179 meter, sdk, processor := newSDK(t) 180 181 valuerecorder := Must(meter).NewFloat64ValueRecorder("name.disabled") 182 183 valuerecorder.Record(ctx, -1) 184 checkpointed := sdk.Collect(ctx) 185 186 require.Equal(t, 0, checkpointed) 187 require.Equal(t, 0, len(processor.accumulations)) 188} 189 190func TestRecordNaN(t *testing.T) { 191 ctx := context.Background() 192 meter, _, _ := newSDK(t) 193 194 c := Must(meter).NewFloat64Counter("name.sum") 195 196 require.Nil(t, testHandler.Flush()) 197 c.Add(ctx, math.NaN()) 198 require.Error(t, testHandler.Flush()) 199} 200 201func TestSDKLabelsDeduplication(t *testing.T) { 202 ctx := context.Background() 203 meter, sdk, processor := newSDK(t) 204 205 counter := Must(meter).NewInt64Counter("name.sum") 206 207 const ( 208 maxKeys = 21 209 keySets = 2 210 repeats = 3 211 ) 212 var keysA []attribute.Key 213 var keysB []attribute.Key 214 215 for i := 0; i < maxKeys; i++ { 216 keysA = append(keysA, attribute.Key(fmt.Sprintf("A%03d", i))) 217 keysB = append(keysB, attribute.Key(fmt.Sprintf("B%03d", i))) 218 } 219 220 var allExpect [][]attribute.KeyValue 221 for numKeys := 0; numKeys < maxKeys; numKeys++ { 222 223 var kvsA []attribute.KeyValue 224 var kvsB []attribute.KeyValue 225 for r := 0; r < repeats; r++ { 226 for i := 0; i < numKeys; i++ { 227 kvsA = append(kvsA, keysA[i].Int(r)) 228 kvsB = append(kvsB, keysB[i].Int(r)) 229 } 230 } 231 232 var expectA []attribute.KeyValue 233 var expectB []attribute.KeyValue 234 for i := 0; i < numKeys; i++ { 235 expectA = append(expectA, keysA[i].Int(repeats-1)) 236 expectB = append(expectB, keysB[i].Int(repeats-1)) 237 } 238 239 counter.Add(ctx, 1, kvsA...) 240 counter.Add(ctx, 1, kvsA...) 241 allExpect = append(allExpect, expectA) 242 243 if numKeys != 0 { 244 // In this case A and B sets are the same. 245 counter.Add(ctx, 1, kvsB...) 246 counter.Add(ctx, 1, kvsB...) 247 allExpect = append(allExpect, expectB) 248 } 249 250 } 251 252 sdk.Collect(ctx) 253 254 var actual [][]attribute.KeyValue 255 for _, rec := range processor.accumulations { 256 sum, _ := rec.Aggregator().(aggregation.Sum).Sum() 257 require.Equal(t, sum, number.NewInt64Number(2)) 258 259 kvs := rec.Labels().ToSlice() 260 actual = append(actual, kvs) 261 } 262 263 require.ElementsMatch(t, allExpect, actual) 264} 265 266func newSetIter(kvs ...attribute.KeyValue) attribute.Iterator { 267 labels := attribute.NewSet(kvs...) 268 return labels.Iter() 269} 270 271func TestDefaultLabelEncoder(t *testing.T) { 272 encoder := attribute.DefaultEncoder() 273 274 encoded := encoder.Encode(newSetIter(attribute.String("A", "B"), attribute.String("C", "D"))) 275 require.Equal(t, `A=B,C=D`, encoded) 276 277 encoded = encoder.Encode(newSetIter(attribute.String("A", "B,c=d"), attribute.String(`C\`, "D"))) 278 require.Equal(t, `A=B\,c\=d,C\\=D`, encoded) 279 280 encoded = encoder.Encode(newSetIter(attribute.String(`\`, `=`), attribute.String(`,`, `\`))) 281 require.Equal(t, `\,=\\,\\=\=`, encoded) 282 283 // Note: the label encoder does not sort or de-dup values, 284 // that is done in Labels(...). 285 encoded = encoder.Encode(newSetIter( 286 attribute.Int("I", 1), 287 attribute.Int64("I64", 1), 288 attribute.Float64("F64", 1), 289 attribute.Float64("F64", 1), 290 attribute.String("S", "1"), 291 attribute.Bool("B", true), 292 )) 293 require.Equal(t, "B=true,F64=1,I=1,I64=1,S=1", encoded) 294} 295 296func TestObserverCollection(t *testing.T) { 297 ctx := context.Background() 298 meter, sdk, processor := newSDK(t) 299 mult := 1 300 301 _ = Must(meter).NewFloat64ValueObserver("float.valueobserver.lastvalue", func(_ context.Context, result metric.Float64ObserverResult) { 302 result.Observe(float64(mult), attribute.String("A", "B")) 303 // last value wins 304 result.Observe(float64(-mult), attribute.String("A", "B")) 305 result.Observe(float64(-mult), attribute.String("C", "D")) 306 }) 307 _ = Must(meter).NewInt64ValueObserver("int.valueobserver.lastvalue", func(_ context.Context, result metric.Int64ObserverResult) { 308 result.Observe(int64(-mult), attribute.String("A", "B")) 309 result.Observe(int64(mult)) 310 // last value wins 311 result.Observe(int64(mult), attribute.String("A", "B")) 312 result.Observe(int64(mult)) 313 }) 314 315 _ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { 316 result.Observe(float64(mult), attribute.String("A", "B")) 317 result.Observe(float64(2*mult), attribute.String("A", "B")) 318 result.Observe(float64(mult), attribute.String("C", "D")) 319 }) 320 _ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { 321 result.Observe(int64(2*mult), attribute.String("A", "B")) 322 result.Observe(int64(mult)) 323 // last value wins 324 result.Observe(int64(mult), attribute.String("A", "B")) 325 result.Observe(int64(mult)) 326 }) 327 328 _ = Must(meter).NewFloat64UpDownSumObserver("float.updownsumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { 329 result.Observe(float64(mult), attribute.String("A", "B")) 330 result.Observe(float64(-2*mult), attribute.String("A", "B")) 331 result.Observe(float64(mult), attribute.String("C", "D")) 332 }) 333 _ = Must(meter).NewInt64UpDownSumObserver("int.updownsumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { 334 result.Observe(int64(2*mult), attribute.String("A", "B")) 335 result.Observe(int64(mult)) 336 // last value wins 337 result.Observe(int64(mult), attribute.String("A", "B")) 338 result.Observe(int64(-mult)) 339 }) 340 341 _ = Must(meter).NewInt64ValueObserver("empty.valueobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { 342 }) 343 344 for mult = 0; mult < 3; mult++ { 345 processor.accumulations = nil 346 347 collected := sdk.Collect(ctx) 348 require.Equal(t, collected, len(processor.accumulations)) 349 350 out := processortest.NewOutput(attribute.DefaultEncoder()) 351 for _, rec := range processor.accumulations { 352 require.NoError(t, out.AddAccumulation(rec)) 353 } 354 mult := float64(mult) 355 require.EqualValues(t, map[string]float64{ 356 "float.valueobserver.lastvalue/A=B/R=V": -mult, 357 "float.valueobserver.lastvalue/C=D/R=V": -mult, 358 "int.valueobserver.lastvalue//R=V": mult, 359 "int.valueobserver.lastvalue/A=B/R=V": mult, 360 361 "float.sumobserver.sum/A=B/R=V": 2 * mult, 362 "float.sumobserver.sum/C=D/R=V": mult, 363 "int.sumobserver.sum//R=V": mult, 364 "int.sumobserver.sum/A=B/R=V": mult, 365 366 "float.updownsumobserver.sum/A=B/R=V": -2 * mult, 367 "float.updownsumobserver.sum/C=D/R=V": mult, 368 "int.updownsumobserver.sum//R=V": -mult, 369 "int.updownsumobserver.sum/A=B/R=V": mult, 370 }, out.Map()) 371 } 372} 373 374func TestSumObserverInputRange(t *testing.T) { 375 ctx := context.Background() 376 meter, sdk, processor := newSDK(t) 377 378 // TODO: these tests are testing for negative values, not for _descending values_. Fix. 379 _ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { 380 result.Observe(-2, attribute.String("A", "B")) 381 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 382 result.Observe(-1, attribute.String("C", "D")) 383 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 384 }) 385 _ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { 386 result.Observe(-1, attribute.String("A", "B")) 387 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 388 result.Observe(-1) 389 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 390 }) 391 392 collected := sdk.Collect(ctx) 393 394 require.Equal(t, 0, collected) 395 require.Equal(t, 0, len(processor.accumulations)) 396 397 // check that the error condition was reset 398 require.NoError(t, testHandler.Flush()) 399} 400 401func TestObserverBatch(t *testing.T) { 402 ctx := context.Background() 403 meter, sdk, processor := newSDK(t) 404 405 var floatValueObs metric.Float64ValueObserver 406 var intValueObs metric.Int64ValueObserver 407 var floatSumObs metric.Float64SumObserver 408 var intSumObs metric.Int64SumObserver 409 var floatUpDownSumObs metric.Float64UpDownSumObserver 410 var intUpDownSumObs metric.Int64UpDownSumObserver 411 412 var batch = Must(meter).NewBatchObserver( 413 func(_ context.Context, result metric.BatchObserverResult) { 414 result.Observe( 415 []attribute.KeyValue{ 416 attribute.String("A", "B"), 417 }, 418 floatValueObs.Observation(1), 419 floatValueObs.Observation(-1), 420 intValueObs.Observation(-1), 421 intValueObs.Observation(1), 422 floatSumObs.Observation(1000), 423 intSumObs.Observation(100), 424 floatUpDownSumObs.Observation(-1000), 425 intUpDownSumObs.Observation(-100), 426 ) 427 result.Observe( 428 []attribute.KeyValue{ 429 attribute.String("C", "D"), 430 }, 431 floatValueObs.Observation(-1), 432 floatSumObs.Observation(-1), 433 floatUpDownSumObs.Observation(-1), 434 ) 435 result.Observe( 436 nil, 437 intValueObs.Observation(1), 438 intValueObs.Observation(1), 439 intSumObs.Observation(10), 440 floatSumObs.Observation(1.1), 441 intUpDownSumObs.Observation(10), 442 ) 443 }) 444 floatValueObs = batch.NewFloat64ValueObserver("float.valueobserver.lastvalue") 445 intValueObs = batch.NewInt64ValueObserver("int.valueobserver.lastvalue") 446 floatSumObs = batch.NewFloat64SumObserver("float.sumobserver.sum") 447 intSumObs = batch.NewInt64SumObserver("int.sumobserver.sum") 448 floatUpDownSumObs = batch.NewFloat64UpDownSumObserver("float.updownsumobserver.sum") 449 intUpDownSumObs = batch.NewInt64UpDownSumObserver("int.updownsumobserver.sum") 450 451 collected := sdk.Collect(ctx) 452 453 require.Equal(t, collected, len(processor.accumulations)) 454 455 out := processortest.NewOutput(attribute.DefaultEncoder()) 456 for _, rec := range processor.accumulations { 457 require.NoError(t, out.AddAccumulation(rec)) 458 } 459 require.EqualValues(t, map[string]float64{ 460 "float.sumobserver.sum//R=V": 1.1, 461 "float.sumobserver.sum/A=B/R=V": 1000, 462 "int.sumobserver.sum//R=V": 10, 463 "int.sumobserver.sum/A=B/R=V": 100, 464 465 "int.updownsumobserver.sum/A=B/R=V": -100, 466 "float.updownsumobserver.sum/A=B/R=V": -1000, 467 "int.updownsumobserver.sum//R=V": 10, 468 "float.updownsumobserver.sum/C=D/R=V": -1, 469 470 "float.valueobserver.lastvalue/A=B/R=V": -1, 471 "float.valueobserver.lastvalue/C=D/R=V": -1, 472 "int.valueobserver.lastvalue//R=V": 1, 473 "int.valueobserver.lastvalue/A=B/R=V": 1, 474 }, out.Map()) 475} 476 477func TestRecordBatch(t *testing.T) { 478 ctx := context.Background() 479 meter, sdk, processor := newSDK(t) 480 481 counter1 := Must(meter).NewInt64Counter("int64.sum") 482 counter2 := Must(meter).NewFloat64Counter("float64.sum") 483 valuerecorder1 := Must(meter).NewInt64ValueRecorder("int64.exact") 484 valuerecorder2 := Must(meter).NewFloat64ValueRecorder("float64.exact") 485 486 sdk.RecordBatch( 487 ctx, 488 []attribute.KeyValue{ 489 attribute.String("A", "B"), 490 attribute.String("C", "D"), 491 }, 492 counter1.Measurement(1), 493 counter2.Measurement(2), 494 valuerecorder1.Measurement(3), 495 valuerecorder2.Measurement(4), 496 ) 497 498 sdk.Collect(ctx) 499 500 out := processortest.NewOutput(attribute.DefaultEncoder()) 501 for _, rec := range processor.accumulations { 502 require.NoError(t, out.AddAccumulation(rec)) 503 } 504 require.EqualValues(t, map[string]float64{ 505 "int64.sum/A=B,C=D/R=V": 1, 506 "float64.sum/A=B,C=D/R=V": 2, 507 "int64.exact/A=B,C=D/R=V": 3, 508 "float64.exact/A=B,C=D/R=V": 4, 509 }, out.Map()) 510} 511 512// TestRecordPersistence ensures that a direct-called instrument that 513// is repeatedly used each interval results in a persistent record, so 514// that its encoded labels will be cached across collection intervals. 515func TestRecordPersistence(t *testing.T) { 516 ctx := context.Background() 517 meter, sdk, processor := newSDK(t) 518 519 c := Must(meter).NewFloat64Counter("name.sum") 520 b := c.Bind(attribute.String("bound", "true")) 521 uk := attribute.String("bound", "false") 522 523 for i := 0; i < 100; i++ { 524 c.Add(ctx, 1, uk) 525 b.Add(ctx, 1) 526 sdk.Collect(ctx) 527 } 528 529 require.Equal(t, 4, processor.newAggCount) 530} 531 532func TestIncorrectInstruments(t *testing.T) { 533 // The Batch observe/record APIs are susceptible to 534 // uninitialized instruments. 535 var counter metric.Int64Counter 536 var observer metric.Int64ValueObserver 537 538 ctx := context.Background() 539 meter, sdk, _ := newSDK(t) 540 541 // Now try with uninitialized instruments. 542 meter.RecordBatch(ctx, nil, counter.Measurement(1)) 543 meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) { 544 result.Observe(nil, observer.Observation(1)) 545 }) 546 547 collected := sdk.Collect(ctx) 548 require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush()) 549 require.Equal(t, 0, collected) 550 551 // Now try with instruments from another SDK. 552 var noopMeter metric.Meter 553 counter = metric.Must(noopMeter).NewInt64Counter("name.sum") 554 observer = metric.Must(noopMeter).NewBatchObserver( 555 func(context.Context, metric.BatchObserverResult) {}, 556 ).NewInt64ValueObserver("observer") 557 558 meter.RecordBatch(ctx, nil, counter.Measurement(1)) 559 meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) { 560 result.Observe(nil, observer.Observation(1)) 561 }) 562 563 collected = sdk.Collect(ctx) 564 require.Equal(t, 0, collected) 565 require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush()) 566} 567 568func TestSyncInAsync(t *testing.T) { 569 ctx := context.Background() 570 meter, sdk, processor := newSDK(t) 571 572 counter := Must(meter).NewFloat64Counter("counter.sum") 573 _ = Must(meter).NewInt64ValueObserver("observer.lastvalue", 574 func(ctx context.Context, result metric.Int64ObserverResult) { 575 result.Observe(10) 576 counter.Add(ctx, 100) 577 }, 578 ) 579 580 sdk.Collect(ctx) 581 582 out := processortest.NewOutput(attribute.DefaultEncoder()) 583 for _, rec := range processor.accumulations { 584 require.NoError(t, out.AddAccumulation(rec)) 585 } 586 require.EqualValues(t, map[string]float64{ 587 "counter.sum//R=V": 100, 588 "observer.lastvalue//R=V": 10, 589 }, out.Map()) 590} 591