// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package basic_test

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/metrictest"
	"go.opentelemetry.io/otel/metric/number"
	"go.opentelemetry.io/otel/metric/sdkapi"
	export "go.opentelemetry.io/otel/sdk/export/metric"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
	"go.opentelemetry.io/otel/sdk/instrumentation"
	sdk "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
	"go.opentelemetry.io/otel/sdk/metric/processor/basic"
	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
	processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
	"go.opentelemetry.io/otel/sdk/resource"
)

// requireNotAfter fails the test unless t1 <= t2.
func requireNotAfter(t *testing.T, t1, t2 time.Time) {
	require.False(t, t1.After(t2), "expected %v ≤ %v", t1, t2)
}

// TestProcessor tests all the non-error paths in this package.
// It iterates the full cross product of export temporality,
// instrument kind, number kind, and aggregation kind, running
// testProcessor for each combination.
func TestProcessor(t *testing.T) {
	type exportCase struct {
		kind aggregation.Temporality
	}
	type instrumentCase struct {
		kind sdkapi.InstrumentKind
	}
	type numberCase struct {
		kind number.Kind
	}
	type aggregatorCase struct {
		kind aggregation.Kind
	}

	for _, tc := range []exportCase{
		{kind: aggregation.CumulativeTemporality},
		{kind: aggregation.DeltaTemporality},
	} {
		t.Run(tc.kind.String(), func(t *testing.T) {
			for _, ic := range []instrumentCase{
				{kind: sdkapi.CounterInstrumentKind},
				{kind: sdkapi.UpDownCounterInstrumentKind},
				{kind: sdkapi.HistogramInstrumentKind},
				{kind: sdkapi.CounterObserverInstrumentKind},
				{kind: sdkapi.UpDownCounterObserverInstrumentKind},
				{kind: sdkapi.GaugeObserverInstrumentKind},
			} {
				t.Run(ic.kind.String(), func(t *testing.T) {
					for _, nc := range []numberCase{
						{kind: number.Int64Kind},
						{kind: number.Float64Kind},
					} {
						t.Run(nc.kind.String(), func(t *testing.T) {
							for _, ac := range []aggregatorCase{
								{kind: aggregation.SumKind},
								{kind: aggregation.MinMaxSumCountKind},
								{kind: aggregation.HistogramKind},
								{kind: aggregation.LastValueKind},
								{kind: aggregation.ExactKind},
							} {
								t.Run(ac.kind.String(), func(t *testing.T) {
									testProcessor(
										t,
										tc.kind,
										ic.kind,
										nc.kind,
										ac.kind,
									)
								})
							}
						})
					}
				})
			}
		})
	}
}

// asNumber converts an int64 test value into a number.Number of the
// requested kind (the value is widened to float64 for Float64Kind).
func asNumber(nkind number.Kind, value int64) number.Number {
	if nkind == number.Int64Kind {
		return number.NewInt64Number(value)
	}
	return number.NewFloat64Number(float64(value))
}

// updateFor builds an export.Accumulation for desc carrying a single
// Update of `value` under the label set `labs`, using an aggregator
// chosen by `selector`.
func updateFor(t *testing.T, desc *sdkapi.Descriptor, selector export.AggregatorSelector, value int64, labs ...attribute.KeyValue) export.Accumulation {
	ls := attribute.NewSet(labs...)
	var agg export.Aggregator
	selector.AggregatorFor(desc, &agg)
	require.NoError(t, agg.Update(context.Background(), asNumber(desc.NumberKind(), value), desc))

	return export.NewAccumulation(desc, &ls, agg)
}

// testProcessor runs the basic processor through 1-3 accumulators and
// 1-3 collection intervals (with and without memory) and checks that
// the reader output matches the expected multiplier for the given
// temporality / instrument / number / aggregation combination.
func testProcessor(
	t *testing.T,
	aggTemp aggregation.Temporality,
	mkind sdkapi.InstrumentKind,
	nkind number.Kind,
	akind aggregation.Kind,
) {
	// Note: this selector uses the instrument name to dictate
	// aggregation kind.
	selector := processorTest.AggregatorSelector()

	labs1 := []attribute.KeyValue{attribute.String("L1", "V")}
	labs2 := []attribute.KeyValue{attribute.String("L2", "V")}

	testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) {
		processor := basic.New(selector, aggregation.ConstantTemporalitySelector(aggTemp), basic.WithMemory(hasMemory))

		instSuffix := fmt.Sprint(".", strings.ToLower(akind.String()))

		desc1 := metrictest.NewDescriptor(fmt.Sprint("inst1", instSuffix), mkind, nkind)
		desc2 := metrictest.NewDescriptor(fmt.Sprint("inst2", instSuffix), mkind, nkind)

		for nc := 0; nc < nCheckpoint; nc++ {

			// The input is 10 per update, scaled by
			// the number of checkpoints for
			// cumulative instruments:
			input := int64(10)
			cumulativeMultiplier := int64(nc + 1)
			if mkind.PrecomputedSum() {
				input *= cumulativeMultiplier
			}

			processor.StartCollection()

			for na := 0; na < nAccum; na++ {
				_ = processor.Process(updateFor(t, &desc1, selector, input, labs1...))
				_ = processor.Process(updateFor(t, &desc2, selector, input, labs2...))
			}

			err := processor.FinishCollection()
			if err == aggregation.ErrNoSubtraction {
				var subr export.Aggregator
				selector.AggregatorFor(&desc1, &subr)
				_, canSub := subr.(export.Subtractor)

				// Allow the unsupported subtraction case only when it
				// is called for: a precomputed sum exported as deltas
				// requires an aggregator that can subtract.
				require.True(t, mkind.PrecomputedSum() && aggTemp == aggregation.DeltaTemporality && !canSub)
				return
			} else if err != nil {
				t.Fatal("unexpected FinishCollection error: ", err)
			}

			if nc < nCheckpoint-1 {
				continue
			}

			reader := processor.Reader()

			for _, repetitionAfterEmptyInterval := range []bool{false, true} {
				if repetitionAfterEmptyInterval {
					// We're repeating the test after another
					// interval with no updates.
					processor.StartCollection()
					if err := processor.FinishCollection(); err != nil {
						t.Fatal("unexpected collection error: ", err)
					}
				}

				// Test the final checkpoint state.
				records1 := processorTest.NewOutput(attribute.DefaultEncoder())
				err = reader.ForEach(aggregation.ConstantTemporalitySelector(aggTemp), records1.AddRecord)

				// Test for an allowed error:
				if err != nil && err != aggregation.ErrNoSubtraction {
					t.Fatal("unexpected checkpoint error: ", err)
				}
				var multiplier int64

				if mkind.Asynchronous() {
					// Asynchronous tests accumulate results multiplied by
					// the number of Accumulators, unless LastValue
					// aggregation. If a precomputed sum, we expect
					// cumulative inputs.
					if mkind.PrecomputedSum() {
						if aggTemp == aggregation.DeltaTemporality && akind != aggregation.LastValueKind {
							multiplier = int64(nAccum)
						} else if akind == aggregation.LastValueKind {
							multiplier = cumulativeMultiplier
						} else {
							multiplier = cumulativeMultiplier * int64(nAccum)
						}
					} else {
						if aggTemp == aggregation.CumulativeTemporality && akind != aggregation.LastValueKind {
							multiplier = cumulativeMultiplier * int64(nAccum)
						} else if akind == aggregation.LastValueKind {
							multiplier = 1
						} else {
							multiplier = int64(nAccum)
						}
					}
				} else {
					// Synchronous instruments accumulate results from
					// multiple accumulators; use that number as the
					// baseline multiplier.
					multiplier = int64(nAccum)
					if aggTemp == aggregation.CumulativeTemporality {
						// If a cumulative exporter, include prior checkpoints.
						multiplier *= cumulativeMultiplier
					}
					if akind == aggregation.LastValueKind {
						// If a last-value aggregator, set multiplier to 1.0.
						multiplier = 1
					}
				}

				exp := map[string]float64{}
				if hasMemory || !repetitionAfterEmptyInterval {
					exp = map[string]float64{
						fmt.Sprintf("inst1%s/L1=V/", instSuffix): float64(multiplier * 10), // labels1
						fmt.Sprintf("inst2%s/L2=V/", instSuffix): float64(multiplier * 10), // labels2
					}
				}

				require.EqualValues(t, exp, records1.Map(), "with repetition=%v", repetitionAfterEmptyInterval)
			}
		}
	}

	for _, hasMem := range []bool{false, true} {
		t.Run(fmt.Sprintf("HasMemory=%v", hasMem), func(t *testing.T) {
			// For 1 to 3 accumulators:
			for nAccum := 1; nAccum <= 3; nAccum++ {
				t.Run(fmt.Sprintf("NumAccum=%d", nAccum), func(t *testing.T) {
					// For 1 to 3 checkpoints:
					for nCheckpoint := 1; nCheckpoint <= 3; nCheckpoint++ {
						t.Run(fmt.Sprintf("NumCkpt=%d", nCheckpoint), func(t *testing.T) {
							testBody(t, hasMem, nAccum, nCheckpoint)
						})
					}
				})
			}
		})
	}
}

// bogusExporter is a TemporalitySelector/Exporter that reports an
// invalid Temporality, used to exercise the ErrInvalidTemporality path.
type bogusExporter struct{}

// TemporalityFor deliberately returns an out-of-range Temporality value.
func (bogusExporter) TemporalityFor(*sdkapi.Descriptor, aggregation.Kind) aggregation.Temporality {
	return 100
}

// Export is never reached in these tests.
func (bogusExporter) Export(context.Context, export.Reader) error {
	panic("Not called")
}

// TestBasicInconsistent verifies that out-of-order Start/Finish/Process
// calls and an invalid temporality each produce the expected errors.
func TestBasicInconsistent(t *testing.T) {
	// Test double-start
	b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())

	b.StartCollection()
	b.StartCollection()
	require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())

	// Test finish without start
	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())

	require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())

	// Test no finish
	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())

	b.StartCollection()
	require.Equal(
		t,
		basic.ErrInconsistentState,
		b.ForEach(
			aggregation.StatelessTemporalitySelector(),
			func(export.Record) error { return nil },
		),
	)

	// Test no start
	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())

	desc := metrictest.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind)
	accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{})
	require.Equal(t, basic.ErrInconsistentState, b.Process(accum))

	// Test invalid kind:
	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
	b.StartCollection()
	require.NoError(t, b.Process(accum))
	require.NoError(t, b.FinishCollection())

	err := b.ForEach(
		bogusExporter{},
		func(export.Record) error { return nil },
	)
	require.True(t, errors.Is(err, basic.ErrInvalidTemporality))
}

// TestBasicTimestamps verifies that the first interval starts at
// construction time and that subsequent intervals are aligned
// end-to-start with no gaps.
func TestBasicTimestamps(t *testing.T) {
	beforeNew := time.Now()
	time.Sleep(time.Nanosecond)
	b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
	time.Sleep(time.Nanosecond)
	afterNew := time.Now()

	desc := metrictest.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind)
	accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{})

	b.StartCollection()
	_ = b.Process(accum)
	require.NoError(t, b.FinishCollection())

	var start1, end1 time.Time

	require.NoError(t, b.ForEach(aggregation.StatelessTemporalitySelector(), func(rec export.Record) error {
		start1 = rec.StartTime()
		end1 = rec.EndTime()
		return nil
	}))

	// The first start time is set in the constructor.
	requireNotAfter(t, beforeNew, start1)
	requireNotAfter(t, start1, afterNew)

	for i := 0; i < 2; i++ {
		b.StartCollection()
		require.NoError(t, b.Process(accum))
		require.NoError(t, b.FinishCollection())

		var start2, end2 time.Time

		require.NoError(t, b.ForEach(aggregation.StatelessTemporalitySelector(), func(rec export.Record) error {
			start2 = rec.StartTime()
			end2 = rec.EndTime()
			return nil
		}))

		// Subsequent intervals have their start and end aligned.
		require.Equal(t, start2, end1)
		requireNotAfter(t, start1, end1)
		requireNotAfter(t, start2, end2)

		start1 = start2
		end1 = end2
	}
}

// TestStatefulNoMemoryCumulative verifies that, without memory, a
// cumulative-temporality sum still accumulates across intervals while
// empty intervals report no records.
func TestStatefulNoMemoryCumulative(t *testing.T) {
	aggTempSel := aggregation.CumulativeTemporalitySelector()

	desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterInstrumentKind, number.Int64Kind)
	selector := processorTest.AggregatorSelector()

	processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
	reader := processor.Reader()

	for i := 1; i < 3; i++ {
		// Empty interval
		processor.StartCollection()
		require.NoError(t, processor.FinishCollection())

		// Verify zero elements
		records := processorTest.NewOutput(attribute.DefaultEncoder())
		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
		require.EqualValues(t, map[string]float64{}, records.Map())

		// Add 10
		processor.StartCollection()
		_ = processor.Process(updateFor(t, &desc, selector, 10, attribute.String("A", "B")))
		require.NoError(t, processor.FinishCollection())

		// Verify one element
		records = processorTest.NewOutput(attribute.DefaultEncoder())
		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
		require.EqualValues(t, map[string]float64{
			"inst.sum/A=B/": float64(i * 10),
		}, records.Map())
	}
}

// TestStatefulNoMemoryDelta verifies that, without memory, a
// precomputed-sum observer exported with delta temporality reports only
// the per-interval change, while empty intervals report no records.
func TestStatefulNoMemoryDelta(t *testing.T) {
	aggTempSel := aggregation.DeltaTemporalitySelector()

	desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
	selector := processorTest.AggregatorSelector()

	processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
	reader := processor.Reader()

	for i := 1; i < 3; i++ {
		// Empty interval
		processor.StartCollection()
		require.NoError(t, processor.FinishCollection())

		// Verify zero elements
		records := processorTest.NewOutput(attribute.DefaultEncoder())
		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
		require.EqualValues(t, map[string]float64{}, records.Map())

		// Add 10
		processor.StartCollection()
		_ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B")))
		require.NoError(t, processor.FinishCollection())

		// Verify one element
		records = processorTest.NewOutput(attribute.DefaultEncoder())
		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
		require.EqualValues(t, map[string]float64{
			"inst.sum/A=B/": 10,
		}, records.Map())
	}
}

// TestMultiObserverSum verifies that multiple observations of the same
// observer instrument in one interval are summed, under both
// cumulative and delta temporality.
func TestMultiObserverSum(t *testing.T) {
	for _, aggTempSel := range []aggregation.TemporalitySelector{
		aggregation.CumulativeTemporalitySelector(),
		aggregation.DeltaTemporalitySelector(),
	} {

		desc := metrictest.NewDescriptor("observe.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
		selector := processorTest.AggregatorSelector()

		processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
		reader := processor.Reader()

		for i := 1; i < 3; i++ {
			// Add i*10*3 times
			processor.StartCollection()
			_ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B")))
			_ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B")))
			_ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B")))
			require.NoError(t, processor.FinishCollection())

			// Multiplier is 1 for deltas, otherwise i.
			multiplier := i
			if aggTempSel.TemporalityFor(&desc, aggregation.SumKind) == aggregation.DeltaTemporality {
				multiplier = 1
			}

			// Verify one element
			records := processorTest.NewOutput(attribute.DefaultEncoder())
			require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
			require.EqualValues(t, map[string]float64{
				"observe.sum/A=B/": float64(3 * 10 * multiplier),
			}, records.Map())
		}
	}
}

// TestCounterObserverEndToEnd runs a counter observer through the full
// Accumulator → Processor → Exporter pipeline and checks that the
// cumulative records share one start time with monotonic end times.
func TestCounterObserverEndToEnd(t *testing.T) {
	ctx := context.Background()
	eselector := aggregation.CumulativeTemporalitySelector()
	proc := basic.New(
		processorTest.AggregatorSelector(),
		eselector,
	)
	accum := sdk.NewAccumulator(proc)
	meter := metric.WrapMeterImpl(accum)

	var calls int64
	metric.Must(meter).NewInt64CounterObserver("observer.sum",
		func(_ context.Context, result metric.Int64ObserverResult) {
			calls++
			result.Observe(calls)
		},
	)
	reader := proc.Reader()

	var startTime [3]time.Time
	var endTime [3]time.Time

	for i := range startTime {
		data := proc.Reader()
		data.Lock()
		proc.StartCollection()
		accum.Collect(ctx)
		require.NoError(t, proc.FinishCollection())

		exporter := processortest.New(eselector, attribute.DefaultEncoder())
		require.NoError(t, exporter.Export(ctx, resource.Empty(), processortest.OneInstrumentationLibraryReader(
			instrumentation.Library{
				Name: "test",
			}, reader)))

		require.EqualValues(t, map[string]float64{
			"observer.sum//": float64(i + 1),
		}, exporter.Values())

		var record export.Record
		require.NoError(t, data.ForEach(eselector, func(r export.Record) error {
			record = r
			return nil
		}))

		startTime[i] = record.StartTime()
		endTime[i] = record.EndTime()
		data.Unlock()
	}

	// Cumulative records keep the original start time; end times advance.
	require.Equal(t, startTime[0], startTime[1])
	require.Equal(t, startTime[0], startTime[2])
	requireNotAfter(t, endTime[0], endTime[1])
	requireNotAfter(t, endTime[1], endTime[2])
}