1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15//go:generate stringer -type=ExportKind 16 17package metric // import "go.opentelemetry.io/otel/sdk/export/metric" 18 19import ( 20 "context" 21 "sync" 22 "time" 23 24 "go.opentelemetry.io/otel/attribute" 25 "go.opentelemetry.io/otel/metric" 26 "go.opentelemetry.io/otel/metric/number" 27 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 28 "go.opentelemetry.io/otel/sdk/resource" 29) 30 31// Processor is responsible for deciding which kind of aggregation to 32// use (via AggregatorSelector), gathering exported results from the 33// SDK during collection, and deciding over which dimensions to group 34// the exported data. 35// 36// The SDK supports binding only one of these interfaces, as it has 37// the sole responsibility of determining which Aggregator to use for 38// each record. 39// 40// The embedded AggregatorSelector interface is called (concurrently) 41// in instrumentation context to select the appropriate Aggregator for 42// an instrument. 43// 44// The `Process` method is called during collection in a 45// single-threaded context from the SDK, after the aggregator is 46// checkpointed, allowing the processor to build the set of metrics 47// currently being exported. 48type Processor interface { 49 // AggregatorSelector is responsible for selecting the 50 // concrete type of Aggregator used for a metric in the SDK. 51 // 52 // This may be a static decision based on fields of the 53 // Descriptor, or it could use an external configuration 54 // source to customize the treatment of each metric 55 // instrument. 56 // 57 // The result from AggregatorSelector.AggregatorFor should be 58 // the same type for a given Descriptor or else nil. The same 59 // type should be returned for a given descriptor, because 60 // Aggregators only know how to Merge with their own type. If 61 // the result is nil, the metric instrument will be disabled. 62 // 63 // Note that the SDK only calls AggregatorFor when new records 64 // require an Aggregator. This does not provide a way to 65 // disable metrics with active records. 66 AggregatorSelector 67 68 // Process is called by the SDK once per internal record, 69 // passing the export Accumulation (a Descriptor, the corresponding 70 // Labels, and the checkpointed Aggregator). This call has no 71 // Context argument because it is expected to perform only 72 // computation. An SDK is not expected to call exporters from 73 // with Process, use a controller for that (see 74 // ./controllers/{pull,push}. 75 Process(accum Accumulation) error 76} 77 78// AggregatorSelector supports selecting the kind of Aggregator to 79// use at runtime for a specific metric instrument. 80type AggregatorSelector interface { 81 // AggregatorFor allocates a variable number of aggregators of 82 // a kind suitable for the requested export. This method 83 // initializes a `...*Aggregator`, to support making a single 84 // allocation. 85 // 86 // When the call returns without initializing the *Aggregator 87 // to a non-nil value, the metric instrument is explicitly 88 // disabled. 89 // 90 // This must return a consistent type to avoid confusion in 91 // later stages of the metrics export process, i.e., when 92 // Merging or Checkpointing aggregators for a specific 93 // instrument. 94 // 95 // Note: This is context-free because the aggregator should 96 // not relate to the incoming context. This call should not 97 // block. 98 AggregatorFor(descriptor *metric.Descriptor, aggregator ...*Aggregator) 99} 100 101// Checkpointer is the interface used by a Controller to coordinate 102// the Processor with Accumulator(s) and Exporter(s). The 103// StartCollection() and FinishCollection() methods start and finish a 104// collection interval. Controllers call the Accumulator(s) during 105// collection to process Accumulations. 106type Checkpointer interface { 107 // Processor processes metric data for export. The Process 108 // method is bracketed by StartCollection and FinishCollection 109 // calls. The embedded AggregatorSelector can be called at 110 // any time. 111 Processor 112 113 // CheckpointSet returns the current data set. This may be 114 // called before and after collection. The 115 // implementation is required to return the same value 116 // throughout its lifetime, since CheckpointSet exposes a 117 // sync.Locker interface. The caller is responsible for 118 // locking the CheckpointSet before initiating collection. 119 CheckpointSet() CheckpointSet 120 121 // StartCollection begins a collection interval. 122 StartCollection() 123 124 // FinishCollection ends a collection interval. 125 FinishCollection() error 126} 127 128// Aggregator implements a specific aggregation behavior, e.g., a 129// behavior to track a sequence of updates to an instrument. Sum-only 130// instruments commonly use a simple Sum aggregator, but for the 131// distribution instruments (ValueRecorder, ValueObserver) there are a 132// number of possible aggregators with different cost and accuracy 133// tradeoffs. 134// 135// Note that any Aggregator may be attached to any instrument--this is 136// the result of the OpenTelemetry API/SDK separation. It is possible 137// to attach a Sum aggregator to a ValueRecorder instrument or a 138// MinMaxSumCount aggregator to a Counter instrument. 139type Aggregator interface { 140 // Aggregation returns an Aggregation interface to access the 141 // current state of this Aggregator. The caller is 142 // responsible for synchronization and must not call any the 143 // other methods in this interface concurrently while using 144 // the Aggregation. 145 Aggregation() aggregation.Aggregation 146 147 // Update receives a new measured value and incorporates it 148 // into the aggregation. Update() calls may be called 149 // concurrently. 150 // 151 // Descriptor.NumberKind() should be consulted to determine 152 // whether the provided number is an int64 or float64. 153 // 154 // The Context argument comes from user-level code and could be 155 // inspected for a `correlation.Map` or `trace.SpanContext`. 156 Update(ctx context.Context, number number.Number, descriptor *metric.Descriptor) error 157 158 // SynchronizedMove is called during collection to finish one 159 // period of aggregation by atomically saving the 160 // currently-updating state into the argument Aggregator AND 161 // resetting the current value to the zero state. 162 // 163 // SynchronizedMove() is called concurrently with Update(). These 164 // two methods must be synchronized with respect to each 165 // other, for correctness. 166 // 167 // After saving a synchronized copy, the Aggregator can be converted 168 // into one or more of the interfaces in the `aggregation` sub-package, 169 // according to kind of Aggregator that was selected. 170 // 171 // This method will return an InconsistentAggregatorError if 172 // this Aggregator cannot be copied into the destination due 173 // to an incompatible type. 174 // 175 // This call has no Context argument because it is expected to 176 // perform only computation. 177 // 178 // When called with a nil `destination`, this Aggregator is reset 179 // and the current value is discarded. 180 SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error 181 182 // Merge combines the checkpointed state from the argument 183 // Aggregator into this Aggregator. Merge is not synchronized 184 // with respect to Update or SynchronizedMove. 185 // 186 // The owner of an Aggregator being merged is responsible for 187 // synchronization of both Aggregator states. 188 Merge(aggregator Aggregator, descriptor *metric.Descriptor) error 189} 190 191// Subtractor is an optional interface implemented by some 192// Aggregators. An Aggregator must support `Subtract()` in order to 193// be configured for a Precomputed-Sum instrument (SumObserver, 194// UpDownSumObserver) using a DeltaExporter. 195type Subtractor interface { 196 // Subtract subtracts the `operand` from this Aggregator and 197 // outputs the value in `result`. 198 Subtract(operand, result Aggregator, descriptor *metric.Descriptor) error 199} 200 201// Exporter handles presentation of the checkpoint of aggregate 202// metrics. This is the final stage of a metrics export pipeline, 203// where metric data are formatted for a specific system. 204type Exporter interface { 205 // Export is called immediately after completing a collection 206 // pass in the SDK. 207 // 208 // The Context comes from the controller that initiated 209 // collection. 210 // 211 // The CheckpointSet interface refers to the Processor that just 212 // completed collection. 213 Export(ctx context.Context, checkpointSet CheckpointSet) error 214 215 // ExportKindSelector is an interface used by the Processor 216 // in deciding whether to compute Delta or Cumulative 217 // Aggregations when passing Records to this Exporter. 218 ExportKindSelector 219} 220 221// ExportKindSelector is a sub-interface of Exporter used to indicate 222// whether the Processor should compute Delta or Cumulative 223// Aggregations. 224type ExportKindSelector interface { 225 // ExportKindFor should return the correct ExportKind that 226 // should be used when exporting data for the given metric 227 // instrument and Aggregator kind. 228 ExportKindFor(descriptor *metric.Descriptor, aggregatorKind aggregation.Kind) ExportKind 229} 230 231// CheckpointSet allows a controller to access a complete checkpoint of 232// aggregated metrics from the Processor. This is passed to the 233// Exporter which may then use ForEach to iterate over the collection 234// of aggregated metrics. 235type CheckpointSet interface { 236 // ForEach iterates over aggregated checkpoints for all 237 // metrics that were updated during the last collection 238 // period. Each aggregated checkpoint returned by the 239 // function parameter may return an error. 240 // 241 // The ExportKindSelector argument is used to determine 242 // whether the Record is computed using Delta or Cumulative 243 // aggregation. 244 // 245 // ForEach tolerates ErrNoData silently, as this is 246 // expected from the Meter implementation. Any other kind 247 // of error will immediately halt ForEach and return 248 // the error to the caller. 249 ForEach(kindSelector ExportKindSelector, recordFunc func(Record) error) error 250 251 // Locker supports locking the checkpoint set. Collection 252 // into the checkpoint set cannot take place (in case of a 253 // stateful processor) while it is locked. 254 // 255 // The Processor attached to the Accumulator MUST be called 256 // with the lock held. 257 sync.Locker 258 259 // RLock acquires a read lock corresponding to this Locker. 260 RLock() 261 // RUnlock releases a read lock corresponding to this Locker. 262 RUnlock() 263} 264 265// Metadata contains the common elements for exported metric data that 266// are shared by the Accumulator->Processor and Processor->Exporter 267// steps. 268type Metadata struct { 269 descriptor *metric.Descriptor 270 labels *attribute.Set 271 resource *resource.Resource 272} 273 274// Accumulation contains the exported data for a single metric instrument 275// and label set, as prepared by an Accumulator for the Processor. 276type Accumulation struct { 277 Metadata 278 aggregator Aggregator 279} 280 281// Record contains the exported data for a single metric instrument 282// and label set, as prepared by the Processor for the Exporter. 283// This includes the effective start and end time for the aggregation. 284type Record struct { 285 Metadata 286 aggregation aggregation.Aggregation 287 start time.Time 288 end time.Time 289} 290 291// Descriptor describes the metric instrument being exported. 292func (m Metadata) Descriptor() *metric.Descriptor { 293 return m.descriptor 294} 295 296// Labels describes the labels associated with the instrument and the 297// aggregated data. 298func (m Metadata) Labels() *attribute.Set { 299 return m.labels 300} 301 302// Resource contains common attributes that apply to this metric event. 303func (m Metadata) Resource() *resource.Resource { 304 return m.resource 305} 306 307// NewAccumulation allows Accumulator implementations to construct new 308// Accumulations to send to Processors. The Descriptor, Labels, Resource, 309// and Aggregator represent aggregate metric events received over a single 310// collection period. 311func NewAccumulation(descriptor *metric.Descriptor, labels *attribute.Set, resource *resource.Resource, aggregator Aggregator) Accumulation { 312 return Accumulation{ 313 Metadata: Metadata{ 314 descriptor: descriptor, 315 labels: labels, 316 resource: resource, 317 }, 318 aggregator: aggregator, 319 } 320} 321 322// Aggregator returns the checkpointed aggregator. It is safe to 323// access the checkpointed state without locking. 324func (r Accumulation) Aggregator() Aggregator { 325 return r.aggregator 326} 327 328// NewRecord allows Processor implementations to construct export 329// records. The Descriptor, Labels, and Aggregator represent 330// aggregate metric events received over a single collection period. 331func NewRecord(descriptor *metric.Descriptor, labels *attribute.Set, resource *resource.Resource, aggregation aggregation.Aggregation, start, end time.Time) Record { 332 return Record{ 333 Metadata: Metadata{ 334 descriptor: descriptor, 335 labels: labels, 336 resource: resource, 337 }, 338 aggregation: aggregation, 339 start: start, 340 end: end, 341 } 342} 343 344// Aggregation returns the aggregation, an interface to the record and 345// its aggregator, dependent on the kind of both the input and exporter. 346func (r Record) Aggregation() aggregation.Aggregation { 347 return r.aggregation 348} 349 350// StartTime is the start time of the interval covered by this aggregation. 351func (r Record) StartTime() time.Time { 352 return r.start 353} 354 355// EndTime is the end time of the interval covered by this aggregation. 356func (r Record) EndTime() time.Time { 357 return r.end 358} 359 360// ExportKind indicates the kind of data exported by an exporter. 361// These bits may be OR-d together when multiple exporters are in use. 362type ExportKind int 363 364const ( 365 // CumulativeExportKind indicates that an Exporter expects a 366 // Cumulative Aggregation. 367 CumulativeExportKind ExportKind = 1 368 369 // DeltaExportKind indicates that an Exporter expects a 370 // Delta Aggregation. 371 DeltaExportKind ExportKind = 2 372) 373 374// Includes tests whether `kind` includes a specific kind of 375// exporter. 376func (kind ExportKind) Includes(has ExportKind) bool { 377 return kind&has != 0 378} 379 380// MemoryRequired returns whether an exporter of this kind requires 381// memory to export correctly. 382func (kind ExportKind) MemoryRequired(mkind metric.InstrumentKind) bool { 383 switch mkind { 384 case metric.ValueRecorderInstrumentKind, metric.ValueObserverInstrumentKind, 385 metric.CounterInstrumentKind, metric.UpDownCounterInstrumentKind: 386 // Delta-oriented instruments: 387 return kind.Includes(CumulativeExportKind) 388 389 case metric.SumObserverInstrumentKind, metric.UpDownSumObserverInstrumentKind: 390 // Cumulative-oriented instruments: 391 return kind.Includes(DeltaExportKind) 392 } 393 // Something unexpected is happening--we could panic. This 394 // will become an error when the exporter tries to access a 395 // checkpoint, presumably, so let it be. 396 return false 397} 398 399type ( 400 constantExportKindSelector ExportKind 401 statelessExportKindSelector struct{} 402) 403 404var ( 405 _ ExportKindSelector = constantExportKindSelector(0) 406 _ ExportKindSelector = statelessExportKindSelector{} 407) 408 409// ConstantExportKindSelector returns an ExportKindSelector that returns 410// a constant ExportKind, one that is either always cumulative or always delta. 411func ConstantExportKindSelector(kind ExportKind) ExportKindSelector { 412 return constantExportKindSelector(kind) 413} 414 415// CumulativeExportKindSelector returns an ExportKindSelector that 416// always returns CumulativeExportKind. 417func CumulativeExportKindSelector() ExportKindSelector { 418 return ConstantExportKindSelector(CumulativeExportKind) 419} 420 421// DeltaExportKindSelector returns an ExportKindSelector that 422// always returns DeltaExportKind. 423func DeltaExportKindSelector() ExportKindSelector { 424 return ConstantExportKindSelector(DeltaExportKind) 425} 426 427// StatelessExportKindSelector returns an ExportKindSelector that 428// always returns the ExportKind that avoids long-term memory 429// requirements. 430func StatelessExportKindSelector() ExportKindSelector { 431 return statelessExportKindSelector{} 432} 433 434// ExportKindFor implements ExportKindSelector. 435func (c constantExportKindSelector) ExportKindFor(_ *metric.Descriptor, _ aggregation.Kind) ExportKind { 436 return ExportKind(c) 437} 438 439// ExportKindFor implements ExportKindSelector. 440func (s statelessExportKindSelector) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) ExportKind { 441 if kind == aggregation.SumKind && desc.InstrumentKind().PrecomputedSum() { 442 return CumulativeExportKind 443 } 444 return DeltaExportKind 445} 446