1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package basic // import "go.opentelemetry.io/otel/sdk/metric/processor/basic" 16 17import ( 18 "errors" 19 "fmt" 20 "sync" 21 "time" 22 23 "go.opentelemetry.io/otel/api/metric" 24 "go.opentelemetry.io/otel/label" 25 export "go.opentelemetry.io/otel/sdk/export/metric" 26 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 27 "go.opentelemetry.io/otel/sdk/resource" 28) 29 30type ( 31 Processor struct { 32 export.ExportKindSelector 33 export.AggregatorSelector 34 35 state 36 } 37 38 stateKey struct { 39 // TODO: This code is organized to support multiple 40 // accumulators which could theoretically produce the 41 // data for the same instrument with the same 42 // resources, and this code has logic to combine data 43 // properly from multiple accumulators. However, the 44 // use of *metric.Descriptor in the stateKey makes 45 // such combination impossible, because each 46 // accumulator allocates its own instruments. This 47 // can be fixed by using the instrument name and kind 48 // instead of the descriptor pointer. See 49 // https://github.com/open-telemetry/opentelemetry-go/issues/862. 50 descriptor *metric.Descriptor 51 distinct label.Distinct 52 resource label.Distinct 53 } 54 55 stateValue struct { 56 // labels corresponds to the stateKey.distinct field. 57 labels *label.Set 58 59 // resource corresponds to the stateKey.resource field. 60 resource *resource.Resource 61 62 // updated indicates the last sequence number when this value had 63 // Process() called by an accumulator. 64 updated int64 65 66 // stateful indicates that a cumulative aggregation is 67 // being maintained, taken from the process start time. 68 stateful bool 69 70 // currentOwned indicates that "current" was allocated 71 // by the processor in order to merge results from 72 // multiple Accumulators during a single collection 73 // round, which may happen either because: 74 // (1) multiple Accumulators output the same Accumulation. 75 // (2) one Accumulator is configured with dimensionality reduction. 76 currentOwned bool 77 78 // current refers to the output from a single Accumulator 79 // (if !currentOwned) or it refers to an Aggregator 80 // owned by the processor used to accumulate multiple 81 // values in a single collection round. 82 current export.Aggregator 83 84 // delta, if non-nil, refers to an Aggregator owned by 85 // the processor used to compute deltas between 86 // precomputed sums. 87 delta export.Aggregator 88 89 // cumulative, if non-nil, refers to an Aggregator owned 90 // by the processor used to store the last cumulative 91 // value. 92 cumulative export.Aggregator 93 } 94 95 state struct { 96 config Config 97 98 // RWMutex implements locking for the `CheckpointSet` interface. 99 sync.RWMutex 100 values map[stateKey]*stateValue 101 102 // Note: the timestamp logic currently assumes all 103 // exports are deltas. 104 105 processStart time.Time 106 intervalStart time.Time 107 intervalEnd time.Time 108 109 // startedCollection and finishedCollection are the 110 // number of StartCollection() and FinishCollection() 111 // calls, used to ensure that the sequence of starts 112 // and finishes are correctly balanced. 113 114 startedCollection int64 115 finishedCollection int64 116 } 117) 118 119var _ export.Processor = &Processor{} 120var _ export.Checkpointer = &Processor{} 121var _ export.CheckpointSet = &state{} 122var ErrInconsistentState = fmt.Errorf("inconsistent processor state") 123var ErrInvalidExporterKind = fmt.Errorf("invalid exporter kind") 124 125// New returns a basic Processor that is also a Checkpointer using the provided 126// AggregatorSelector to select Aggregators. The ExportKindSelector 127// is consulted to determine the kind(s) of exporter that will consume 128// data, so that this Processor can prepare to compute Delta or 129// Cumulative Aggregations as needed. 130func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector, opts ...Option) *Processor { 131 now := time.Now() 132 p := &Processor{ 133 AggregatorSelector: aselector, 134 ExportKindSelector: eselector, 135 state: state{ 136 values: map[stateKey]*stateValue{}, 137 processStart: now, 138 intervalStart: now, 139 }, 140 } 141 for _, opt := range opts { 142 opt.ApplyProcessor(&p.config) 143 } 144 return p 145} 146 147// Process implements export.Processor. 148func (b *Processor) Process(accum export.Accumulation) error { 149 if b.startedCollection != b.finishedCollection+1 { 150 return ErrInconsistentState 151 } 152 desc := accum.Descriptor() 153 key := stateKey{ 154 descriptor: desc, 155 distinct: accum.Labels().Equivalent(), 156 resource: accum.Resource().Equivalent(), 157 } 158 agg := accum.Aggregator() 159 160 // Check if there is an existing value. 161 value, ok := b.state.values[key] 162 if !ok { 163 stateful := b.ExportKindFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.MetricKind()) 164 165 newValue := &stateValue{ 166 labels: accum.Labels(), 167 resource: accum.Resource(), 168 updated: b.state.finishedCollection, 169 stateful: stateful, 170 current: agg, 171 } 172 if stateful { 173 if desc.MetricKind().PrecomputedSum() { 174 // If we know we need to compute deltas, allocate two aggregators. 175 b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta) 176 } else { 177 // In this case we are certain not to need a delta, only allocate 178 // a cumulative aggregator. 179 b.AggregatorFor(desc, &newValue.cumulative) 180 } 181 } 182 b.state.values[key] = newValue 183 return nil 184 } 185 186 // Advance the update sequence number. 187 sameCollection := b.state.finishedCollection == value.updated 188 value.updated = b.state.finishedCollection 189 190 // At this point in the code, we have located an existing 191 // value for some stateKey. This can be because: 192 // 193 // (a) stateful aggregation is being used, the entry was 194 // entered during a prior collection, and this is the first 195 // time processing an accumulation for this stateKey in the 196 // current collection. Since this is the first time 197 // processing an accumulation for this stateKey during this 198 // collection, we don't know yet whether there are multiple 199 // accumulators at work. If there are multiple accumulators, 200 // they'll hit case (b) the second time through. 201 // 202 // (b) multiple accumulators are being used, whether stateful 203 // or not. 204 // 205 // Case (a) occurs when the instrument and the exporter 206 // require memory to work correctly, either because the 207 // instrument reports a PrecomputedSum to a DeltaExporter or 208 // the reverse, a non-PrecomputedSum instrument with a 209 // CumulativeExporter. This logic is encapsulated in 210 // ExportKind.MemoryRequired(MetricKind). 211 // 212 // Case (b) occurs when the variable `sameCollection` is true, 213 // indicating that the stateKey for Accumulation has already 214 // been seen in the same collection. When this happens, it 215 // implies that multiple Accumulators are being used, or that 216 // a single Accumulator has been configured with a label key 217 // filter. 218 219 if !sameCollection { 220 if !value.currentOwned { 221 // This is the first Accumulation we've seen 222 // for this stateKey during this collection. 223 // Just keep a reference to the Accumulator's 224 // Aggregator. All the other cases copy 225 // Aggregator state. 226 value.current = agg 227 return nil 228 } 229 return agg.SynchronizedMove(value.current, desc) 230 } 231 232 // If the current is not owned, take ownership of a copy 233 // before merging below. 234 if !value.currentOwned { 235 tmp := value.current 236 b.AggregatorSelector.AggregatorFor(desc, &value.current) 237 value.currentOwned = true 238 if err := tmp.SynchronizedMove(value.current, desc); err != nil { 239 return err 240 } 241 } 242 243 // Combine this Accumulation with the prior Accumulation. 244 return value.current.Merge(agg, desc) 245} 246 247// CheckpointSet returns the associated CheckpointSet. Use the 248// CheckpointSet Locker interface to synchronize access to this 249// object. The CheckpointSet.ForEach() method cannot be called 250// concurrently with Process(). 251func (b *Processor) CheckpointSet() export.CheckpointSet { 252 return &b.state 253} 254 255// StartCollection signals to the Processor one or more Accumulators 256// will begin calling Process() calls during collection. 257func (b *Processor) StartCollection() { 258 if b.startedCollection != 0 { 259 b.intervalStart = b.intervalEnd 260 } 261 b.startedCollection++ 262} 263 264// FinishCollection signals to the Processor that a complete 265// collection has finished and that ForEach will be called to access 266// the CheckpointSet. 267func (b *Processor) FinishCollection() error { 268 b.intervalEnd = time.Now() 269 if b.startedCollection != b.finishedCollection+1 { 270 return ErrInconsistentState 271 } 272 defer func() { b.finishedCollection++ }() 273 274 for key, value := range b.values { 275 mkind := key.descriptor.MetricKind() 276 stale := value.updated != b.finishedCollection 277 stateless := !value.stateful 278 279 // The following branch updates stateful aggregators. Skip 280 // these updates if the aggregator is not stateful or if the 281 // aggregator is stale. 282 if stale || stateless { 283 // If this processor does not require memeory, 284 // stale, stateless entries can be removed. 285 // This implies that they were not updated 286 // over the previous full collection interval. 287 if stale && stateless && !b.config.Memory { 288 delete(b.values, key) 289 } 290 continue 291 } 292 293 // Update Aggregator state to support exporting either a 294 // delta or a cumulative aggregation. 295 var err error 296 if mkind.PrecomputedSum() { 297 if currentSubtractor, ok := value.current.(export.Subtractor); ok { 298 // This line is equivalent to: 299 // value.delta = currentSubtractor - value.cumulative 300 err = currentSubtractor.Subtract(value.cumulative, value.delta, key.descriptor) 301 302 if err == nil { 303 err = value.current.SynchronizedMove(value.cumulative, key.descriptor) 304 } 305 } else { 306 err = aggregation.ErrNoSubtraction 307 } 308 } else { 309 // This line is equivalent to: 310 // value.cumulative = value.cumulative + value.delta 311 err = value.cumulative.Merge(value.current, key.descriptor) 312 } 313 if err != nil { 314 return err 315 } 316 } 317 return nil 318} 319 320// ForEach iterates through the CheckpointSet, passing an 321// export.Record with the appropriate Cumulative or Delta aggregation 322// to an exporter. 323func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error { 324 if b.startedCollection != b.finishedCollection { 325 return ErrInconsistentState 326 } 327 for key, value := range b.values { 328 mkind := key.descriptor.MetricKind() 329 330 var agg aggregation.Aggregation 331 var start time.Time 332 333 // If the processor does not have Config.Memory and it was not updated 334 // in the prior round, do not visit this value. 335 if !b.config.Memory && value.updated != (b.finishedCollection-1) { 336 continue 337 } 338 339 ekind := exporter.ExportKindFor(key.descriptor, value.current.Aggregation().Kind()) 340 switch ekind { 341 case export.PassThroughExporter: 342 // No state is required, pass through the checkpointed value. 343 agg = value.current.Aggregation() 344 345 if mkind.PrecomputedSum() { 346 start = b.processStart 347 } else { 348 start = b.intervalStart 349 } 350 351 case export.CumulativeExporter: 352 // If stateful, the sum has been computed. If stateless, the 353 // input was already cumulative. Either way, use the checkpointed 354 // value: 355 if value.stateful { 356 agg = value.cumulative.Aggregation() 357 } else { 358 agg = value.current.Aggregation() 359 } 360 start = b.processStart 361 362 case export.DeltaExporter: 363 // Precomputed sums are a special case. 364 if mkind.PrecomputedSum() { 365 agg = value.delta.Aggregation() 366 } else { 367 agg = value.current.Aggregation() 368 } 369 start = b.intervalStart 370 371 default: 372 return fmt.Errorf("%v: %w", ekind, ErrInvalidExporterKind) 373 } 374 375 if err := f(export.NewRecord( 376 key.descriptor, 377 value.labels, 378 value.resource, 379 agg, 380 start, 381 b.intervalEnd, 382 )); err != nil && !errors.Is(err, aggregation.ErrNoData) { 383 return err 384 } 385 } 386 return nil 387} 388