1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package reducer // import "go.opentelemetry.io/otel/sdk/metric/processor/reducer" 16 17import ( 18 "go.opentelemetry.io/otel/attribute" 19 "go.opentelemetry.io/otel/metric" 20 export "go.opentelemetry.io/otel/sdk/export/metric" 21) 22 23type ( 24 // Processor implements "dimensionality reduction" by 25 // filtering keys from export label sets. 26 Processor struct { 27 export.Checkpointer 28 filterSelector LabelFilterSelector 29 } 30 31 // LabelFilterSelector is the interface used to configure a 32 // specific Filter to an instrument. 33 LabelFilterSelector interface { 34 LabelFilterFor(descriptor *metric.Descriptor) attribute.Filter 35 } 36) 37 38var _ export.Processor = &Processor{} 39var _ export.Checkpointer = &Processor{} 40 41// New returns a dimensionality-reducing Processor that passes data to 42// the next stage in an export pipeline. 43func New(filterSelector LabelFilterSelector, ckpter export.Checkpointer) *Processor { 44 return &Processor{ 45 Checkpointer: ckpter, 46 filterSelector: filterSelector, 47 } 48} 49 50// Process implements export.Processor. 51func (p *Processor) Process(accum export.Accumulation) error { 52 // Note: the removed labels are returned and ignored here. 53 // Conceivably these inputs could be useful to a sampler. 54 reduced, _ := accum.Labels().Filter( 55 p.filterSelector.LabelFilterFor( 56 accum.Descriptor(), 57 ), 58 ) 59 return p.Checkpointer.Process( 60 export.NewAccumulation( 61 accum.Descriptor(), 62 &reduced, 63 accum.Resource(), 64 accum.Aggregator(), 65 ), 66 ) 67} 68