1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package metric_test 16 17import ( 18 "context" 19 "fmt" 20 "math" 21 "sync" 22 "testing" 23 24 "github.com/stretchr/testify/require" 25 26 "go.opentelemetry.io/otel" 27 "go.opentelemetry.io/otel/label" 28 "go.opentelemetry.io/otel/metric" 29 "go.opentelemetry.io/otel/metric/number" 30 export "go.opentelemetry.io/otel/sdk/export/metric" 31 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 32 metricsdk "go.opentelemetry.io/otel/sdk/metric" 33 "go.opentelemetry.io/otel/sdk/metric/processor/processortest" 34 "go.opentelemetry.io/otel/sdk/resource" 35) 36 37var Must = metric.Must 38var testResource = resource.NewWithAttributes(label.String("R", "V")) 39 40type handler struct { 41 sync.Mutex 42 err error 43} 44 45func (h *handler) Handle(err error) { 46 h.Lock() 47 h.err = err 48 h.Unlock() 49} 50 51func (h *handler) Reset() { 52 h.Lock() 53 h.err = nil 54 h.Unlock() 55} 56 57func (h *handler) Flush() error { 58 h.Lock() 59 err := h.err 60 h.err = nil 61 h.Unlock() 62 return err 63} 64 65var testHandler *handler 66 67func init() { 68 testHandler = new(handler) 69 otel.SetErrorHandler(testHandler) 70} 71 72// correctnessProcessor could be replaced with processortest.Processor 73// with a non-default aggregator selector. TODO(#872) use the 74// processortest code here. 75type correctnessProcessor struct { 76 t *testing.T 77 *testSelector 78 79 accumulations []export.Accumulation 80} 81 82type testSelector struct { 83 selector export.AggregatorSelector 84 newAggCount int 85} 86 87func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) { 88 ts.newAggCount += len(aggPtrs) 89 processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...) 90} 91 92func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessProcessor) { 93 testHandler.Reset() 94 processor := &correctnessProcessor{ 95 t: t, 96 testSelector: &testSelector{selector: processortest.AggregatorSelector()}, 97 } 98 accum := metricsdk.NewAccumulator( 99 processor, 100 testResource, 101 ) 102 meter := metric.WrapMeterImpl(accum, "test") 103 return meter, accum, processor 104} 105 106func (ci *correctnessProcessor) Process(accumulation export.Accumulation) error { 107 ci.accumulations = append(ci.accumulations, accumulation) 108 return nil 109} 110 111func TestInputRangeCounter(t *testing.T) { 112 ctx := context.Background() 113 meter, sdk, processor := newSDK(t) 114 115 counter := Must(meter).NewInt64Counter("name.sum") 116 117 counter.Add(ctx, -1) 118 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 119 120 checkpointed := sdk.Collect(ctx) 121 require.Equal(t, 0, checkpointed) 122 123 processor.accumulations = nil 124 counter.Add(ctx, 1) 125 checkpointed = sdk.Collect(ctx) 126 sum, err := processor.accumulations[0].Aggregator().(aggregation.Sum).Sum() 127 require.Equal(t, int64(1), sum.AsInt64()) 128 require.Equal(t, 1, checkpointed) 129 require.Nil(t, err) 130 require.Nil(t, testHandler.Flush()) 131} 132 133func TestInputRangeUpDownCounter(t *testing.T) { 134 ctx := context.Background() 135 meter, sdk, processor := newSDK(t) 136 137 counter := Must(meter).NewInt64UpDownCounter("name.sum") 138 139 counter.Add(ctx, -1) 140 counter.Add(ctx, -1) 141 counter.Add(ctx, 2) 142 counter.Add(ctx, 1) 143 144 checkpointed := sdk.Collect(ctx) 145 sum, err := processor.accumulations[0].Aggregator().(aggregation.Sum).Sum() 146 require.Equal(t, int64(1), sum.AsInt64()) 147 require.Equal(t, 1, checkpointed) 148 require.Nil(t, err) 149 require.Nil(t, testHandler.Flush()) 150} 151 152func TestInputRangeValueRecorder(t *testing.T) { 153 ctx := context.Background() 154 meter, sdk, processor := newSDK(t) 155 156 valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") 157 158 valuerecorder.Record(ctx, math.NaN()) 159 require.Equal(t, aggregation.ErrNaNInput, testHandler.Flush()) 160 161 checkpointed := sdk.Collect(ctx) 162 require.Equal(t, 0, checkpointed) 163 164 valuerecorder.Record(ctx, 1) 165 valuerecorder.Record(ctx, 2) 166 167 processor.accumulations = nil 168 checkpointed = sdk.Collect(ctx) 169 170 count, err := processor.accumulations[0].Aggregator().(aggregation.Distribution).Count() 171 require.Equal(t, int64(2), count) 172 require.Equal(t, 1, checkpointed) 173 require.Nil(t, testHandler.Flush()) 174 require.Nil(t, err) 175} 176 177func TestDisabledInstrument(t *testing.T) { 178 ctx := context.Background() 179 meter, sdk, processor := newSDK(t) 180 181 valuerecorder := Must(meter).NewFloat64ValueRecorder("name.disabled") 182 183 valuerecorder.Record(ctx, -1) 184 checkpointed := sdk.Collect(ctx) 185 186 require.Equal(t, 0, checkpointed) 187 require.Equal(t, 0, len(processor.accumulations)) 188} 189 190func TestRecordNaN(t *testing.T) { 191 ctx := context.Background() 192 meter, _, _ := newSDK(t) 193 194 c := Must(meter).NewFloat64Counter("name.sum") 195 196 require.Nil(t, testHandler.Flush()) 197 c.Add(ctx, math.NaN()) 198 require.Error(t, testHandler.Flush()) 199} 200 201func TestSDKLabelsDeduplication(t *testing.T) { 202 ctx := context.Background() 203 meter, sdk, processor := newSDK(t) 204 205 counter := Must(meter).NewInt64Counter("name.sum") 206 207 const ( 208 maxKeys = 21 209 keySets = 2 210 repeats = 3 211 ) 212 var keysA []label.Key 213 var keysB []label.Key 214 215 for i := 0; i < maxKeys; i++ { 216 keysA = append(keysA, label.Key(fmt.Sprintf("A%03d", i))) 217 keysB = append(keysB, label.Key(fmt.Sprintf("B%03d", i))) 218 } 219 220 var allExpect [][]label.KeyValue 221 for numKeys := 0; numKeys < maxKeys; numKeys++ { 222 223 var kvsA []label.KeyValue 224 var kvsB []label.KeyValue 225 for r := 0; r < repeats; r++ { 226 for i := 0; i < numKeys; i++ { 227 kvsA = append(kvsA, keysA[i].Int(r)) 228 kvsB = append(kvsB, keysB[i].Int(r)) 229 } 230 } 231 232 var expectA []label.KeyValue 233 var expectB []label.KeyValue 234 for i := 0; i < numKeys; i++ { 235 expectA = append(expectA, keysA[i].Int(repeats-1)) 236 expectB = append(expectB, keysB[i].Int(repeats-1)) 237 } 238 239 counter.Add(ctx, 1, kvsA...) 240 counter.Add(ctx, 1, kvsA...) 241 allExpect = append(allExpect, expectA) 242 243 if numKeys != 0 { 244 // In this case A and B sets are the same. 245 counter.Add(ctx, 1, kvsB...) 246 counter.Add(ctx, 1, kvsB...) 247 allExpect = append(allExpect, expectB) 248 } 249 250 } 251 252 sdk.Collect(ctx) 253 254 var actual [][]label.KeyValue 255 for _, rec := range processor.accumulations { 256 sum, _ := rec.Aggregator().(aggregation.Sum).Sum() 257 require.Equal(t, sum, number.NewInt64Number(2)) 258 259 kvs := rec.Labels().ToSlice() 260 actual = append(actual, kvs) 261 } 262 263 require.ElementsMatch(t, allExpect, actual) 264} 265 266func newSetIter(kvs ...label.KeyValue) label.Iterator { 267 labels := label.NewSet(kvs...) 268 return labels.Iter() 269} 270 271func TestDefaultLabelEncoder(t *testing.T) { 272 encoder := label.DefaultEncoder() 273 274 encoded := encoder.Encode(newSetIter(label.String("A", "B"), label.String("C", "D"))) 275 require.Equal(t, `A=B,C=D`, encoded) 276 277 encoded = encoder.Encode(newSetIter(label.String("A", "B,c=d"), label.String(`C\`, "D"))) 278 require.Equal(t, `A=B\,c\=d,C\\=D`, encoded) 279 280 encoded = encoder.Encode(newSetIter(label.String(`\`, `=`), label.String(`,`, `\`))) 281 require.Equal(t, `\,=\\,\\=\=`, encoded) 282 283 // Note: the label encoder does not sort or de-dup values, 284 // that is done in Labels(...). 285 encoded = encoder.Encode(newSetIter( 286 label.Int("I", 1), 287 label.Uint("U", 1), 288 label.Int32("I32", 1), 289 label.Uint32("U32", 1), 290 label.Int64("I64", 1), 291 label.Uint64("U64", 1), 292 label.Float64("F64", 1), 293 label.Float64("F64", 1), 294 label.String("S", "1"), 295 label.Bool("B", true), 296 )) 297 require.Equal(t, "B=true,F64=1,I=1,I32=1,I64=1,S=1,U=1,U32=1,U64=1", encoded) 298} 299 300func TestObserverCollection(t *testing.T) { 301 ctx := context.Background() 302 meter, sdk, processor := newSDK(t) 303 304 _ = Must(meter).NewFloat64ValueObserver("float.valueobserver.lastvalue", func(_ context.Context, result metric.Float64ObserverResult) { 305 result.Observe(1, label.String("A", "B")) 306 // last value wins 307 result.Observe(-1, label.String("A", "B")) 308 result.Observe(-1, label.String("C", "D")) 309 }) 310 _ = Must(meter).NewInt64ValueObserver("int.valueobserver.lastvalue", func(_ context.Context, result metric.Int64ObserverResult) { 311 result.Observe(-1, label.String("A", "B")) 312 result.Observe(1) 313 // last value wins 314 result.Observe(1, label.String("A", "B")) 315 result.Observe(1) 316 }) 317 318 _ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { 319 result.Observe(1, label.String("A", "B")) 320 result.Observe(2, label.String("A", "B")) 321 result.Observe(1, label.String("C", "D")) 322 }) 323 _ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { 324 result.Observe(2, label.String("A", "B")) 325 result.Observe(1) 326 // last value wins 327 result.Observe(1, label.String("A", "B")) 328 result.Observe(1) 329 }) 330 331 _ = Must(meter).NewFloat64UpDownSumObserver("float.updownsumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { 332 result.Observe(1, label.String("A", "B")) 333 result.Observe(-2, label.String("A", "B")) 334 result.Observe(1, label.String("C", "D")) 335 }) 336 _ = Must(meter).NewInt64UpDownSumObserver("int.updownsumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { 337 result.Observe(2, label.String("A", "B")) 338 result.Observe(1) 339 // last value wins 340 result.Observe(1, label.String("A", "B")) 341 result.Observe(-1) 342 }) 343 344 _ = Must(meter).NewInt64ValueObserver("empty.valueobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { 345 }) 346 347 collected := sdk.Collect(ctx) 348 349 require.Equal(t, collected, len(processor.accumulations)) 350 351 out := processortest.NewOutput(label.DefaultEncoder()) 352 for _, rec := range processor.accumulations { 353 require.NoError(t, out.AddAccumulation(rec)) 354 } 355 require.EqualValues(t, map[string]float64{ 356 "float.valueobserver.lastvalue/A=B/R=V": -1, 357 "float.valueobserver.lastvalue/C=D/R=V": -1, 358 "int.valueobserver.lastvalue//R=V": 1, 359 "int.valueobserver.lastvalue/A=B/R=V": 1, 360 361 "float.sumobserver.sum/A=B/R=V": 2, 362 "float.sumobserver.sum/C=D/R=V": 1, 363 "int.sumobserver.sum//R=V": 1, 364 "int.sumobserver.sum/A=B/R=V": 1, 365 366 "float.updownsumobserver.sum/A=B/R=V": -2, 367 "float.updownsumobserver.sum/C=D/R=V": 1, 368 "int.updownsumobserver.sum//R=V": -1, 369 "int.updownsumobserver.sum/A=B/R=V": 1, 370 }, out.Map()) 371} 372 373func TestSumObserverInputRange(t *testing.T) { 374 ctx := context.Background() 375 meter, sdk, processor := newSDK(t) 376 377 // TODO: these tests are testing for negative values, not for _descending values_. Fix. 378 _ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { 379 result.Observe(-2, label.String("A", "B")) 380 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 381 result.Observe(-1, label.String("C", "D")) 382 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 383 }) 384 _ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { 385 result.Observe(-1, label.String("A", "B")) 386 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 387 result.Observe(-1) 388 require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush()) 389 }) 390 391 collected := sdk.Collect(ctx) 392 393 require.Equal(t, 0, collected) 394 require.Equal(t, 0, len(processor.accumulations)) 395 396 // check that the error condition was reset 397 require.NoError(t, testHandler.Flush()) 398} 399 400func TestObserverBatch(t *testing.T) { 401 ctx := context.Background() 402 meter, sdk, processor := newSDK(t) 403 404 var floatValueObs metric.Float64ValueObserver 405 var intValueObs metric.Int64ValueObserver 406 var floatSumObs metric.Float64SumObserver 407 var intSumObs metric.Int64SumObserver 408 var floatUpDownSumObs metric.Float64UpDownSumObserver 409 var intUpDownSumObs metric.Int64UpDownSumObserver 410 411 var batch = Must(meter).NewBatchObserver( 412 func(_ context.Context, result metric.BatchObserverResult) { 413 result.Observe( 414 []label.KeyValue{ 415 label.String("A", "B"), 416 }, 417 floatValueObs.Observation(1), 418 floatValueObs.Observation(-1), 419 intValueObs.Observation(-1), 420 intValueObs.Observation(1), 421 floatSumObs.Observation(1000), 422 intSumObs.Observation(100), 423 floatUpDownSumObs.Observation(-1000), 424 intUpDownSumObs.Observation(-100), 425 ) 426 result.Observe( 427 []label.KeyValue{ 428 label.String("C", "D"), 429 }, 430 floatValueObs.Observation(-1), 431 floatSumObs.Observation(-1), 432 floatUpDownSumObs.Observation(-1), 433 ) 434 result.Observe( 435 nil, 436 intValueObs.Observation(1), 437 intValueObs.Observation(1), 438 intSumObs.Observation(10), 439 floatSumObs.Observation(1.1), 440 intUpDownSumObs.Observation(10), 441 ) 442 }) 443 floatValueObs = batch.NewFloat64ValueObserver("float.valueobserver.lastvalue") 444 intValueObs = batch.NewInt64ValueObserver("int.valueobserver.lastvalue") 445 floatSumObs = batch.NewFloat64SumObserver("float.sumobserver.sum") 446 intSumObs = batch.NewInt64SumObserver("int.sumobserver.sum") 447 floatUpDownSumObs = batch.NewFloat64UpDownSumObserver("float.updownsumobserver.sum") 448 intUpDownSumObs = batch.NewInt64UpDownSumObserver("int.updownsumobserver.sum") 449 450 collected := sdk.Collect(ctx) 451 452 require.Equal(t, collected, len(processor.accumulations)) 453 454 out := processortest.NewOutput(label.DefaultEncoder()) 455 for _, rec := range processor.accumulations { 456 require.NoError(t, out.AddAccumulation(rec)) 457 } 458 require.EqualValues(t, map[string]float64{ 459 "float.sumobserver.sum//R=V": 1.1, 460 "float.sumobserver.sum/A=B/R=V": 1000, 461 "int.sumobserver.sum//R=V": 10, 462 "int.sumobserver.sum/A=B/R=V": 100, 463 464 "int.updownsumobserver.sum/A=B/R=V": -100, 465 "float.updownsumobserver.sum/A=B/R=V": -1000, 466 "int.updownsumobserver.sum//R=V": 10, 467 "float.updownsumobserver.sum/C=D/R=V": -1, 468 469 "float.valueobserver.lastvalue/A=B/R=V": -1, 470 "float.valueobserver.lastvalue/C=D/R=V": -1, 471 "int.valueobserver.lastvalue//R=V": 1, 472 "int.valueobserver.lastvalue/A=B/R=V": 1, 473 }, out.Map()) 474} 475 476func TestRecordBatch(t *testing.T) { 477 ctx := context.Background() 478 meter, sdk, processor := newSDK(t) 479 480 counter1 := Must(meter).NewInt64Counter("int64.sum") 481 counter2 := Must(meter).NewFloat64Counter("float64.sum") 482 valuerecorder1 := Must(meter).NewInt64ValueRecorder("int64.exact") 483 valuerecorder2 := Must(meter).NewFloat64ValueRecorder("float64.exact") 484 485 sdk.RecordBatch( 486 ctx, 487 []label.KeyValue{ 488 label.String("A", "B"), 489 label.String("C", "D"), 490 }, 491 counter1.Measurement(1), 492 counter2.Measurement(2), 493 valuerecorder1.Measurement(3), 494 valuerecorder2.Measurement(4), 495 ) 496 497 sdk.Collect(ctx) 498 499 out := processortest.NewOutput(label.DefaultEncoder()) 500 for _, rec := range processor.accumulations { 501 require.NoError(t, out.AddAccumulation(rec)) 502 } 503 require.EqualValues(t, map[string]float64{ 504 "int64.sum/A=B,C=D/R=V": 1, 505 "float64.sum/A=B,C=D/R=V": 2, 506 "int64.exact/A=B,C=D/R=V": 3, 507 "float64.exact/A=B,C=D/R=V": 4, 508 }, out.Map()) 509} 510 511// TestRecordPersistence ensures that a direct-called instrument that 512// is repeatedly used each interval results in a persistent record, so 513// that its encoded labels will be cached across collection intervals. 514func TestRecordPersistence(t *testing.T) { 515 ctx := context.Background() 516 meter, sdk, processor := newSDK(t) 517 518 c := Must(meter).NewFloat64Counter("name.sum") 519 b := c.Bind(label.String("bound", "true")) 520 uk := label.String("bound", "false") 521 522 for i := 0; i < 100; i++ { 523 c.Add(ctx, 1, uk) 524 b.Add(ctx, 1) 525 sdk.Collect(ctx) 526 } 527 528 require.Equal(t, 4, processor.newAggCount) 529} 530 531func TestIncorrectInstruments(t *testing.T) { 532 // The Batch observe/record APIs are susceptible to 533 // uninitialized instruments. 534 var counter metric.Int64Counter 535 var observer metric.Int64ValueObserver 536 537 ctx := context.Background() 538 meter, sdk, _ := newSDK(t) 539 540 // Now try with uninitialized instruments. 541 meter.RecordBatch(ctx, nil, counter.Measurement(1)) 542 meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) { 543 result.Observe(nil, observer.Observation(1)) 544 }) 545 546 collected := sdk.Collect(ctx) 547 require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush()) 548 require.Equal(t, 0, collected) 549 550 // Now try with instruments from another SDK. 551 var noopMeter metric.Meter 552 counter = metric.Must(noopMeter).NewInt64Counter("name.sum") 553 observer = metric.Must(noopMeter).NewBatchObserver( 554 func(context.Context, metric.BatchObserverResult) {}, 555 ).NewInt64ValueObserver("observer") 556 557 meter.RecordBatch(ctx, nil, counter.Measurement(1)) 558 meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) { 559 result.Observe(nil, observer.Observation(1)) 560 }) 561 562 collected = sdk.Collect(ctx) 563 require.Equal(t, 0, collected) 564 require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush()) 565} 566 567func TestSyncInAsync(t *testing.T) { 568 ctx := context.Background() 569 meter, sdk, processor := newSDK(t) 570 571 counter := Must(meter).NewFloat64Counter("counter.sum") 572 _ = Must(meter).NewInt64ValueObserver("observer.lastvalue", 573 func(ctx context.Context, result metric.Int64ObserverResult) { 574 result.Observe(10) 575 counter.Add(ctx, 100) 576 }, 577 ) 578 579 sdk.Collect(ctx) 580 581 out := processortest.NewOutput(label.DefaultEncoder()) 582 for _, rec := range processor.accumulations { 583 require.NoError(t, out.AddAccumulation(rec)) 584 } 585 require.EqualValues(t, map[string]float64{ 586 "counter.sum//R=V": 100, 587 "observer.lastvalue//R=V": 10, 588 }, out.Map()) 589} 590