1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//go:generate stringer -type=ExportKind
16
17package metric // import "go.opentelemetry.io/otel/sdk/export/metric"
18
19import (
20	"context"
21	"sync"
22	"time"
23
24	"go.opentelemetry.io/otel/attribute"
25	"go.opentelemetry.io/otel/metric"
26	"go.opentelemetry.io/otel/metric/number"
27	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
28	"go.opentelemetry.io/otel/sdk/resource"
29)
30
// Processor is responsible for deciding which kind of aggregation to
// use (via AggregatorSelector), gathering exported results from the
// SDK during collection, and deciding over which dimensions to group
// the exported data.
//
// The SDK supports binding only one of these interfaces, as it has
// the sole responsibility of determining which Aggregator to use for
// each record.
//
// The embedded AggregatorSelector interface is called (concurrently)
// in instrumentation context to select the appropriate Aggregator for
// an instrument.
//
// The `Process` method is called during collection in a
// single-threaded context from the SDK, after the aggregator is
// checkpointed, allowing the processor to build the set of metrics
// currently being exported.
type Processor interface {
	// AggregatorSelector is responsible for selecting the
	// concrete type of Aggregator used for a metric in the SDK.
	//
	// This may be a static decision based on fields of the
	// Descriptor, or it could use an external configuration
	// source to customize the treatment of each metric
	// instrument.
	//
	// The result from AggregatorSelector.AggregatorFor should be
	// the same type for a given Descriptor or else nil.  The same
	// type should be returned for a given descriptor, because
	// Aggregators only know how to Merge with their own type.  If
	// the result is nil, the metric instrument will be disabled.
	//
	// Note that the SDK only calls AggregatorFor when new records
	// require an Aggregator. This does not provide a way to
	// disable metrics with active records.
	AggregatorSelector

	// Process is called by the SDK once per internal record,
	// passing the export Accumulation (a Descriptor, the corresponding
	// Labels, and the checkpointed Aggregator).  This call has no
	// Context argument because it is expected to perform only
	// computation.  An SDK is not expected to call exporters from
	// within Process; use a controller for that (see
	// ./controllers/{pull,push}).
	Process(accum Accumulation) error
}
77
// AggregatorSelector supports selecting the kind of Aggregator to
// use at runtime for a specific metric instrument.
type AggregatorSelector interface {
	// AggregatorFor allocates a variable number of aggregators of
	// a kind suitable for the requested export.  Results are
	// delivered through the variadic `...*Aggregator` pointers
	// rather than a return value, which lets an implementation
	// satisfy several pointers with a single allocation.
	//
	// When the call returns without initializing a given
	// *Aggregator to a non-nil value, the metric instrument is
	// explicitly disabled.
	//
	// This must return a consistent type to avoid confusion in
	// later stages of the metrics export process, i.e., when
	// Merging or Checkpointing aggregators for a specific
	// instrument.
	//
	// Note: This is context-free because the aggregator should
	// not relate to the incoming context.  This call should not
	// block.
	AggregatorFor(descriptor *metric.Descriptor, aggregator ...*Aggregator)
}
100
// Checkpointer is the interface used by a Controller to coordinate
// the Processor with Accumulator(s) and Exporter(s).  The
// StartCollection() and FinishCollection() methods start and finish a
// collection interval.  Controllers call the Accumulator(s) during
// collection to process Accumulations.
type Checkpointer interface {
	// Processor processes metric data for export.  Calls to the
	// Process method are bracketed by StartCollection and
	// FinishCollection calls.  The embedded AggregatorSelector
	// can be called at any time.
	Processor

	// CheckpointSet returns the current data set.  This may be
	// called before and after collection.  The
	// implementation is required to return the same value
	// throughout its lifetime, since CheckpointSet exposes a
	// sync.Locker interface.  The caller is responsible for
	// locking the CheckpointSet before initiating collection.
	CheckpointSet() CheckpointSet

	// StartCollection begins a collection interval.
	StartCollection()

	// FinishCollection ends a collection interval and reports
	// any error encountered while doing so.
	FinishCollection() error
}
127
// Aggregator implements a specific aggregation behavior, e.g., a
// behavior to track a sequence of updates to an instrument.  Sum-only
// instruments commonly use a simple Sum aggregator, but for the
// distribution instruments (ValueRecorder, ValueObserver) there are a
// number of possible aggregators with different cost and accuracy
// tradeoffs.
//
// Note that any Aggregator may be attached to any instrument--this is
// the result of the OpenTelemetry API/SDK separation.  It is possible
// to attach a Sum aggregator to a ValueRecorder instrument or a
// MinMaxSumCount aggregator to a Counter instrument.
type Aggregator interface {
	// Aggregation returns an Aggregation interface to access the
	// current state of this Aggregator.  The caller is
	// responsible for synchronization and must not call any of
	// the other methods in this interface concurrently while
	// using the Aggregation.
	Aggregation() aggregation.Aggregation

	// Update receives a new measured value and incorporates it
	// into the aggregation.  Update() calls may be called
	// concurrently.
	//
	// Descriptor.NumberKind() should be consulted to determine
	// whether the provided number is an int64 or float64.
	//
	// The Context argument comes from user-level code and could be
	// inspected for a `correlation.Map` or `trace.SpanContext`.
	Update(ctx context.Context, number number.Number, descriptor *metric.Descriptor) error

	// SynchronizedMove is called during collection to finish one
	// period of aggregation by atomically saving the
	// currently-updating state into the argument Aggregator AND
	// resetting the current value to the zero state.
	//
	// SynchronizedMove() is called concurrently with Update().  These
	// two methods must be synchronized with respect to each
	// other, for correctness.
	//
	// After saving a synchronized copy, the Aggregator can be converted
	// into one or more of the interfaces in the `aggregation` sub-package,
	// according to the kind of Aggregator that was selected.
	//
	// This method will return an InconsistentAggregatorError if
	// this Aggregator cannot be copied into the destination due
	// to an incompatible type.
	//
	// This call has no Context argument because it is expected to
	// perform only computation.
	//
	// When called with a nil `destination`, this Aggregator is reset
	// and the current value is discarded.
	SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error

	// Merge combines the checkpointed state from the argument
	// Aggregator into this Aggregator.  Merge is not synchronized
	// with respect to Update or SynchronizedMove.
	//
	// The owner of an Aggregator being merged is responsible for
	// synchronization of both Aggregator states.
	Merge(aggregator Aggregator, descriptor *metric.Descriptor) error
}
190
// Subtractor is an optional interface implemented by some
// Aggregators.  An Aggregator must support `Subtract()` in order to
// be configured for a Precomputed-Sum instrument (SumObserver,
// UpDownSumObserver) using a DeltaExporter.
type Subtractor interface {
	// Subtract subtracts the `operand` from this Aggregator and
	// outputs the value in `result`.  The receiver is the
	// minuend; a non-nil error reports that the subtraction
	// could not be performed.
	Subtract(operand, result Aggregator, descriptor *metric.Descriptor) error
}
200
// Exporter handles presentation of the checkpoint of aggregate
// metrics.  This is the final stage of a metrics export pipeline,
// where metric data are formatted for a specific system.
type Exporter interface {
	// Export is called immediately after completing a collection
	// pass in the SDK.
	//
	// The Context comes from the controller that initiated
	// collection.
	//
	// The CheckpointSet interface refers to the Processor that just
	// completed collection.
	Export(ctx context.Context, checkpointSet CheckpointSet) error

	// ExportKindSelector is an interface used by the Processor
	// in deciding whether to compute Delta or Cumulative
	// Aggregations when passing Records to this Exporter.
	// Embedding it means every Exporter also provides
	// ExportKindFor.
	ExportKindSelector
}
220
// ExportKindSelector is a sub-interface of Exporter used to indicate
// whether the Processor should compute Delta or Cumulative
// Aggregations.
type ExportKindSelector interface {
	// ExportKindFor should return the correct ExportKind that
	// should be used when exporting data for the given metric
	// instrument (descriptor) and Aggregator kind
	// (aggregatorKind).
	ExportKindFor(descriptor *metric.Descriptor, aggregatorKind aggregation.Kind) ExportKind
}
230
// CheckpointSet allows a controller to access a complete checkpoint of
// aggregated metrics from the Processor.  This is passed to the
// Exporter which may then use ForEach to iterate over the collection
// of aggregated metrics.
type CheckpointSet interface {
	// ForEach iterates over aggregated checkpoints for all
	// metrics that were updated during the last collection
	// period.  Each aggregated checkpoint is passed to the
	// function parameter as a Record, and that function may
	// return an error.
	//
	// The ExportKindSelector argument is used to determine
	// whether the Record is computed using Delta or Cumulative
	// aggregation.
	//
	// ForEach tolerates ErrNoData silently, as this is
	// expected from the Meter implementation. Any other kind
	// of error will immediately halt ForEach and return
	// the error to the caller.
	ForEach(kindSelector ExportKindSelector, recordFunc func(Record) error) error

	// Locker supports locking the checkpoint set.  Collection
	// into the checkpoint set cannot take place (in case of a
	// stateful processor) while it is locked.
	//
	// The Processor attached to the Accumulator MUST be called
	// with the lock held.
	sync.Locker

	// RLock acquires a read lock corresponding to this Locker.
	RLock()
	// RUnlock releases a read lock corresponding to this Locker.
	RUnlock()
}
264
// Metadata contains the common elements for exported metric data that
// are shared by the Accumulator->Processor and Processor->Exporter
// steps.
type Metadata struct {
	// descriptor identifies the metric instrument; see Descriptor().
	descriptor *metric.Descriptor
	// labels is the label set for the data; see Labels().
	labels *attribute.Set
	// resource is the associated resource; see Resource().
	resource *resource.Resource
}
273
// Accumulation contains the exported data for a single metric instrument
// and label set, as prepared by an Accumulator for the Processor.
type Accumulation struct {
	// Metadata carries the descriptor, labels, and resource.
	Metadata
	// aggregator is the checkpointed Aggregator; see Aggregator().
	aggregator Aggregator
}
280
// Record contains the exported data for a single metric instrument
// and label set, as prepared by the Processor for the Exporter.
// This includes the effective start and end time for the aggregation.
type Record struct {
	// Metadata carries the descriptor, labels, and resource.
	Metadata
	// aggregation is the exported aggregation; see Aggregation().
	aggregation aggregation.Aggregation
	// start is the beginning of the covered interval; see StartTime().
	start time.Time
	// end is the end of the covered interval; see EndTime().
	end time.Time
}
290
291// Descriptor describes the metric instrument being exported.
292func (m Metadata) Descriptor() *metric.Descriptor {
293	return m.descriptor
294}
295
296// Labels describes the labels associated with the instrument and the
297// aggregated data.
298func (m Metadata) Labels() *attribute.Set {
299	return m.labels
300}
301
302// Resource contains common attributes that apply to this metric event.
303func (m Metadata) Resource() *resource.Resource {
304	return m.resource
305}
306
307// NewAccumulation allows Accumulator implementations to construct new
308// Accumulations to send to Processors. The Descriptor, Labels, Resource,
309// and Aggregator represent aggregate metric events received over a single
310// collection period.
311func NewAccumulation(descriptor *metric.Descriptor, labels *attribute.Set, resource *resource.Resource, aggregator Aggregator) Accumulation {
312	return Accumulation{
313		Metadata: Metadata{
314			descriptor: descriptor,
315			labels:     labels,
316			resource:   resource,
317		},
318		aggregator: aggregator,
319	}
320}
321
322// Aggregator returns the checkpointed aggregator. It is safe to
323// access the checkpointed state without locking.
324func (r Accumulation) Aggregator() Aggregator {
325	return r.aggregator
326}
327
328// NewRecord allows Processor implementations to construct export
329// records.  The Descriptor, Labels, and Aggregator represent
330// aggregate metric events received over a single collection period.
331func NewRecord(descriptor *metric.Descriptor, labels *attribute.Set, resource *resource.Resource, aggregation aggregation.Aggregation, start, end time.Time) Record {
332	return Record{
333		Metadata: Metadata{
334			descriptor: descriptor,
335			labels:     labels,
336			resource:   resource,
337		},
338		aggregation: aggregation,
339		start:       start,
340		end:         end,
341	}
342}
343
344// Aggregation returns the aggregation, an interface to the record and
345// its aggregator, dependent on the kind of both the input and exporter.
346func (r Record) Aggregation() aggregation.Aggregation {
347	return r.aggregation
348}
349
350// StartTime is the start time of the interval covered by this aggregation.
351func (r Record) StartTime() time.Time {
352	return r.start
353}
354
355// EndTime is the end time of the interval covered by this aggregation.
356func (r Record) EndTime() time.Time {
357	return r.end
358}
359
// ExportKind indicates the kind of data exported by an exporter.
// Each kind occupies its own bit, so the values may be OR-d together
// when multiple exporters are in use.
type ExportKind int

const (
	// CumulativeExportKind (bit value 1) indicates that an Exporter
	// expects a Cumulative Aggregation.
	CumulativeExportKind ExportKind = 1 << iota

	// DeltaExportKind (bit value 2) indicates that an Exporter
	// expects a Delta Aggregation.
	DeltaExportKind
)

// Includes reports whether `kind` has at least one bit in common
// with `has`, i.e. whether it includes that specific kind of
// exporter.
func (kind ExportKind) Includes(has ExportKind) bool {
	overlap := kind & has
	return overlap != 0
}
379
380// MemoryRequired returns whether an exporter of this kind requires
381// memory to export correctly.
382func (kind ExportKind) MemoryRequired(mkind metric.InstrumentKind) bool {
383	switch mkind {
384	case metric.ValueRecorderInstrumentKind, metric.ValueObserverInstrumentKind,
385		metric.CounterInstrumentKind, metric.UpDownCounterInstrumentKind:
386		// Delta-oriented instruments:
387		return kind.Includes(CumulativeExportKind)
388
389	case metric.SumObserverInstrumentKind, metric.UpDownSumObserverInstrumentKind:
390		// Cumulative-oriented instruments:
391		return kind.Includes(DeltaExportKind)
392	}
393	// Something unexpected is happening--we could panic.  This
394	// will become an error when the exporter tries to access a
395	// checkpoint, presumably, so let it be.
396	return false
397}
398
399type (
400	constantExportKindSelector  ExportKind
401	statelessExportKindSelector struct{}
402)
403
404var (
405	_ ExportKindSelector = constantExportKindSelector(0)
406	_ ExportKindSelector = statelessExportKindSelector{}
407)
408
409// ConstantExportKindSelector returns an ExportKindSelector that returns
410// a constant ExportKind, one that is either always cumulative or always delta.
411func ConstantExportKindSelector(kind ExportKind) ExportKindSelector {
412	return constantExportKindSelector(kind)
413}
414
415// CumulativeExportKindSelector returns an ExportKindSelector that
416// always returns CumulativeExportKind.
417func CumulativeExportKindSelector() ExportKindSelector {
418	return ConstantExportKindSelector(CumulativeExportKind)
419}
420
421// DeltaExportKindSelector returns an ExportKindSelector that
422// always returns DeltaExportKind.
423func DeltaExportKindSelector() ExportKindSelector {
424	return ConstantExportKindSelector(DeltaExportKind)
425}
426
427// StatelessExportKindSelector returns an ExportKindSelector that
428// always returns the ExportKind that avoids long-term memory
429// requirements.
430func StatelessExportKindSelector() ExportKindSelector {
431	return statelessExportKindSelector{}
432}
433
434// ExportKindFor implements ExportKindSelector.
435func (c constantExportKindSelector) ExportKindFor(_ *metric.Descriptor, _ aggregation.Kind) ExportKind {
436	return ExportKind(c)
437}
438
439// ExportKindFor implements ExportKindSelector.
440func (s statelessExportKindSelector) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) ExportKind {
441	if kind == aggregation.SumKind && desc.InstrumentKind().PrecomputedSum() {
442		return CumulativeExportKind
443	}
444	return DeltaExportKind
445}
446