1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package processortest // import "go.opentelemetry.io/otel/sdk/metric/processor/processortest" 16 17import ( 18 "context" 19 "fmt" 20 "strings" 21 "sync" 22 "time" 23 24 "go.opentelemetry.io/otel/attribute" 25 "go.opentelemetry.io/otel/metric" 26 "go.opentelemetry.io/otel/metric/number" 27 export "go.opentelemetry.io/otel/sdk/export/metric" 28 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 29 "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" 30 "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" 31 "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" 32 "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" 33 "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" 34 "go.opentelemetry.io/otel/sdk/resource" 35) 36 37type ( 38 // mapKey is the unique key for a metric, consisting of its 39 // unique descriptor, distinct labels, and distinct resource 40 // attributes. 41 mapKey struct { 42 desc *metric.Descriptor 43 labels attribute.Distinct 44 resource attribute.Distinct 45 } 46 47 // mapValue is value stored in a processor used to produce a 48 // CheckpointSet. 49 mapValue struct { 50 labels *attribute.Set 51 resource *resource.Resource 52 aggregator export.Aggregator 53 } 54 55 // Output implements export.CheckpointSet. 56 Output struct { 57 m map[mapKey]mapValue 58 labelEncoder attribute.Encoder 59 sync.RWMutex 60 } 61 62 // testAggregatorSelector returns aggregators consistent with 63 // the test variables below, needed for testing stateful 64 // processors, which clone Aggregators using AggregatorFor(desc). 65 testAggregatorSelector struct{} 66 67 // testCheckpointer is a export.Checkpointer. 68 testCheckpointer struct { 69 started int 70 finished int 71 *Processor 72 } 73 74 // Processor is a testing implementation of export.Processor that 75 // assembles its results as a map[string]float64. 76 Processor struct { 77 export.AggregatorSelector 78 output *Output 79 } 80 81 // Exporter is a testing implementation of export.Exporter that 82 // assembles its results as a map[string]float64. 83 Exporter struct { 84 export.ExportKindSelector 85 output *Output 86 exportCount int 87 88 // InjectErr supports returning conditional errors from 89 // the Export() routine. This must be set before the 90 // Exporter is first used. 91 InjectErr func(export.Record) error 92 } 93) 94 95// NewProcessor returns a new testing Processor implementation. 96// Verify expected outputs using Values(), e.g.: 97// 98// require.EqualValues(t, map[string]float64{ 99// "counter.sum/A=1,B=2/R=V": 100, 100// }, processor.Values()) 101// 102// Where in the example A=1,B=2 is the encoded labels and R=V is the 103// encoded resource value. 104func NewProcessor(selector export.AggregatorSelector, encoder attribute.Encoder) *Processor { 105 return &Processor{ 106 AggregatorSelector: selector, 107 output: NewOutput(encoder), 108 } 109} 110 111// Process implements export.Processor. 112func (p *Processor) Process(accum export.Accumulation) error { 113 return p.output.AddAccumulation(accum) 114} 115 116// Values returns the mapping from label set to point values for the 117// accumulations that were processed. Point values are chosen as 118// either the Sum or the LastValue, whichever is implemented. (All 119// the built-in Aggregators implement one of these interfaces.) 120func (p *Processor) Values() map[string]float64 { 121 return p.output.Map() 122} 123 124// Checkpointer returns a checkpointer that computes a single 125// interval. 126func Checkpointer(p *Processor) export.Checkpointer { 127 return &testCheckpointer{ 128 Processor: p, 129 } 130} 131 132// StartCollection implements export.Checkpointer. 133func (c *testCheckpointer) StartCollection() { 134 if c.started != c.finished { 135 panic(fmt.Sprintf("collection was already started: %d != %d", c.started, c.finished)) 136 } 137 138 c.started++ 139} 140 141// FinishCollection implements export.Checkpointer. 142func (c *testCheckpointer) FinishCollection() error { 143 if c.started-1 != c.finished { 144 return fmt.Errorf("collection was not started: %d != %d", c.started, c.finished) 145 } 146 147 c.finished++ 148 return nil 149} 150 151// CheckpointSet implements export.Checkpointer. 152func (c *testCheckpointer) CheckpointSet() export.CheckpointSet { 153 return c.Processor.output 154} 155 156// AggregatorSelector returns a policy that is consistent with the 157// test descriptors above. I.e., it returns sum.New() for counter 158// instruments and lastvalue.New() for lastValue instruments. 159func AggregatorSelector() export.AggregatorSelector { 160 return testAggregatorSelector{} 161} 162 163// AggregatorFor implements export.AggregatorSelector. 164func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) { 165 166 switch { 167 case strings.HasSuffix(desc.Name(), ".disabled"): 168 for i := range aggPtrs { 169 *aggPtrs[i] = nil 170 } 171 case strings.HasSuffix(desc.Name(), ".sum"): 172 aggs := sum.New(len(aggPtrs)) 173 for i := range aggPtrs { 174 *aggPtrs[i] = &aggs[i] 175 } 176 case strings.HasSuffix(desc.Name(), ".minmaxsumcount"): 177 aggs := minmaxsumcount.New(len(aggPtrs), desc) 178 for i := range aggPtrs { 179 *aggPtrs[i] = &aggs[i] 180 } 181 case strings.HasSuffix(desc.Name(), ".lastvalue"): 182 aggs := lastvalue.New(len(aggPtrs)) 183 for i := range aggPtrs { 184 *aggPtrs[i] = &aggs[i] 185 } 186 case strings.HasSuffix(desc.Name(), ".histogram"): 187 aggs := histogram.New(len(aggPtrs), desc) 188 for i := range aggPtrs { 189 *aggPtrs[i] = &aggs[i] 190 } 191 case strings.HasSuffix(desc.Name(), ".exact"): 192 aggs := exact.New(len(aggPtrs)) 193 for i := range aggPtrs { 194 *aggPtrs[i] = &aggs[i] 195 } 196 default: 197 panic(fmt.Sprint("Invalid instrument name for test AggregatorSelector: ", desc.Name())) 198 } 199} 200 201// NewOutput is a helper for testing an expected set of Accumulations 202// (from an Accumulator) or an expected set of Records (from a 203// Processor). If testing with an Accumulator, it may be simpler to 204// use the test Processor in this package. 205func NewOutput(labelEncoder attribute.Encoder) *Output { 206 return &Output{ 207 m: make(map[mapKey]mapValue), 208 labelEncoder: labelEncoder, 209 } 210} 211 212// ForEach implements export.CheckpointSet. 213func (o *Output) ForEach(_ export.ExportKindSelector, ff func(export.Record) error) error { 214 for key, value := range o.m { 215 if err := ff(export.NewRecord( 216 key.desc, 217 value.labels, 218 value.resource, 219 value.aggregator.Aggregation(), 220 time.Time{}, 221 time.Time{}, 222 )); err != nil { 223 return err 224 } 225 } 226 return nil 227} 228 229// AddRecord adds a string representation of the exported metric data 230// to a map for use in testing. The value taken from the record is 231// either the Sum() or the LastValue() of its Aggregation(), whichever 232// is defined. Record timestamps are ignored. 233func (o *Output) AddRecord(rec export.Record) error { 234 key := mapKey{ 235 desc: rec.Descriptor(), 236 labels: rec.Labels().Equivalent(), 237 resource: rec.Resource().Equivalent(), 238 } 239 if _, ok := o.m[key]; !ok { 240 var agg export.Aggregator 241 testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg) 242 o.m[key] = mapValue{ 243 aggregator: agg, 244 labels: rec.Labels(), 245 resource: rec.Resource(), 246 } 247 } 248 return o.m[key].aggregator.Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor()) 249} 250 251// Map returns the calculated values for test validation from a set of 252// Accumulations or a set of Records. When mapping records or 253// accumulations into floating point values, the Sum() or LastValue() 254// is chosen, whichever is implemented by the underlying Aggregator. 255func (o *Output) Map() map[string]float64 { 256 r := make(map[string]float64) 257 err := o.ForEach(export.StatelessExportKindSelector(), func(record export.Record) error { 258 for key, entry := range o.m { 259 encoded := entry.labels.Encoded(o.labelEncoder) 260 rencoded := entry.resource.Encoded(o.labelEncoder) 261 value := 0.0 262 if s, ok := entry.aggregator.(aggregation.Sum); ok { 263 sum, _ := s.Sum() 264 value = sum.CoerceToFloat64(key.desc.NumberKind()) 265 } else if l, ok := entry.aggregator.(aggregation.LastValue); ok { 266 last, _, _ := l.LastValue() 267 value = last.CoerceToFloat64(key.desc.NumberKind()) 268 } else if l, ok := entry.aggregator.(aggregation.Points); ok { 269 pts, _ := l.Points() 270 var sum number.Number 271 for _, s := range pts { 272 sum.AddNumber(key.desc.NumberKind(), s.Number) 273 } 274 value = sum.CoerceToFloat64(key.desc.NumberKind()) 275 } else { 276 panic(fmt.Sprintf("Unhandled aggregator type: %T", entry.aggregator)) 277 } 278 name := fmt.Sprint(key.desc.Name(), "/", encoded, "/", rencoded) 279 r[name] = value 280 } 281 return nil 282 }) 283 if err != nil { 284 panic(fmt.Sprint("Unexpected processor error: ", err)) 285 } 286 return r 287} 288 289// Reset restores the Output to its initial state, with no accumulated 290// metric data. 291func (o *Output) Reset() { 292 o.m = map[mapKey]mapValue{} 293} 294 295// AddAccumulation adds a string representation of the exported metric 296// data to a map for use in testing. The value taken from the 297// accumulation is either the Sum() or the LastValue() of its 298// Aggregator().Aggregation(), whichever is defined. 299func (o *Output) AddAccumulation(acc export.Accumulation) error { 300 return o.AddRecord( 301 export.NewRecord( 302 acc.Descriptor(), 303 acc.Labels(), 304 acc.Resource(), 305 acc.Aggregator().Aggregation(), 306 time.Time{}, 307 time.Time{}, 308 ), 309 ) 310} 311 312// NewExporter returns a new testing Exporter implementation. 313// Verify exporter outputs using Values(), e.g.,: 314// 315// require.EqualValues(t, map[string]float64{ 316// "counter.sum/A=1,B=2/R=V": 100, 317// }, exporter.Values()) 318// 319// Where in the example A=1,B=2 is the encoded labels and R=V is the 320// encoded resource value. 321func NewExporter(selector export.ExportKindSelector, encoder attribute.Encoder) *Exporter { 322 return &Exporter{ 323 ExportKindSelector: selector, 324 output: NewOutput(encoder), 325 } 326} 327 328func (e *Exporter) Export(_ context.Context, ckpt export.CheckpointSet) error { 329 e.output.Lock() 330 defer e.output.Unlock() 331 e.exportCount++ 332 return ckpt.ForEach(e.ExportKindSelector, func(r export.Record) error { 333 if e.InjectErr != nil { 334 if err := e.InjectErr(r); err != nil { 335 return err 336 } 337 } 338 return e.output.AddRecord(r) 339 }) 340} 341 342// Values returns the mapping from label set to point values for the 343// accumulations that were processed. Point values are chosen as 344// either the Sum or the LastValue, whichever is implemented. (All 345// the built-in Aggregators implement one of these interfaces.) 346func (e *Exporter) Values() map[string]float64 { 347 e.output.Lock() 348 defer e.output.Unlock() 349 return e.output.Map() 350} 351 352// ExportCount returns the number of times Export() has been called 353// since the last Reset(). 354func (e *Exporter) ExportCount() int { 355 e.output.Lock() 356 defer e.output.Unlock() 357 return e.exportCount 358} 359 360// Reset sets the exporter's output to the initial, empty state and 361// resets the export count to zero. 362func (e *Exporter) Reset() { 363 e.output.Lock() 364 defer e.output.Unlock() 365 e.output.Reset() 366 e.exportCount = 0 367} 368