1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package basic // import "go.opentelemetry.io/otel/sdk/metric/processor/basic"
16
17import (
18	"errors"
19	"fmt"
20	"sync"
21	"time"
22
23	"go.opentelemetry.io/otel/attribute"
24	"go.opentelemetry.io/otel/metric"
25	export "go.opentelemetry.io/otel/sdk/export/metric"
26	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
27	"go.opentelemetry.io/otel/sdk/resource"
28)
29
30type (
	// Processor is a basic metric processor.  It collects
	// Accumulations from one or more Accumulators, maintains delta or
	// cumulative state as required by the configured
	// ExportKindSelector, and exposes the result as an
	// export.CheckpointSet.
	Processor struct {
32		export.ExportKindSelector
33		export.AggregatorSelector
34
35		state
36	}
37
38	stateKey struct {
39		// TODO: This code is organized to support multiple
40		// accumulators which could theoretically produce the
41		// data for the same instrument with the same
42		// resources, and this code has logic to combine data
43		// properly from multiple accumulators.  However, the
44		// use of *metric.Descriptor in the stateKey makes
45		// such combination impossible, because each
46		// accumulator allocates its own instruments.  This
47		// can be fixed by using the instrument name and kind
48		// instead of the descriptor pointer.  See
49		// https://github.com/open-telemetry/opentelemetry-go/issues/862.
50		descriptor *metric.Descriptor
51		distinct   attribute.Distinct
52		resource   attribute.Distinct
53	}
54
55	stateValue struct {
56		// labels corresponds to the stateKey.distinct field.
57		labels *attribute.Set
58
59		// resource corresponds to the stateKey.resource field.
60		resource *resource.Resource
61
		// updated records the collection sequence number at which this
		// value last had Process() called by an accumulator.
64		updated int64
65
66		// stateful indicates that a cumulative aggregation is
67		// being maintained, taken from the process start time.
68		stateful bool
69
70		// currentOwned indicates that "current" was allocated
71		// by the processor in order to merge results from
72		// multiple Accumulators during a single collection
73		// round, which may happen either because:
74		// (1) multiple Accumulators output the same Accumulation.
75		// (2) one Accumulator is configured with dimensionality reduction.
76		currentOwned bool
77
78		// current refers to the output from a single Accumulator
79		// (if !currentOwned) or it refers to an Aggregator
80		// owned by the processor used to accumulate multiple
81		// values in a single collection round.
82		current export.Aggregator
83
84		// delta, if non-nil, refers to an Aggregator owned by
85		// the processor used to compute deltas between
86		// precomputed sums.
87		delta export.Aggregator
88
89		// cumulative, if non-nil, refers to an Aggregator owned
90		// by the processor used to store the last cumulative
91		// value.
92		cumulative export.Aggregator
93	}
94
95	state struct {
96		config Config
97
98		// RWMutex implements locking for the `CheckpointSet` interface.
99		sync.RWMutex
100		values map[stateKey]*stateValue
101
102		// Note: the timestamp logic currently assumes all
103		// exports are deltas.
104
105		processStart  time.Time
106		intervalStart time.Time
107		intervalEnd   time.Time
108
		// startedCollection and finishedCollection count the
		// number of StartCollection() and FinishCollection()
		// calls, used to ensure that the sequence of starts
		// and finishes is correctly balanced.
113
114		startedCollection  int64
115		finishedCollection int64
116	}
117)
118
119var _ export.Processor = &Processor{}
120var _ export.Checkpointer = &Processor{}
121var _ export.CheckpointSet = &state{}

// ErrInconsistentState is returned when the sequence of StartCollection()
// and FinishCollection() calls is incorrectly balanced.
var ErrInconsistentState = fmt.Errorf("inconsistent processor state")

// ErrInvalidExportKind is returned when an unrecognized ExportKind is
// encountered while exporting.
var ErrInvalidExportKind = fmt.Errorf("invalid export kind")
124
125// New returns a basic Processor that is also a Checkpointer using the provided
126// AggregatorSelector to select Aggregators.  The ExportKindSelector
127// is consulted to determine the kind(s) of exporter that will consume
128// data, so that this Processor can prepare to compute Delta or
129// Cumulative Aggregations as needed.
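//
// A minimal construction sketch (assuming the selector/simple package
// and the WithMemory Option; any AggregatorSelector and
// ExportKindSelector may be used):
//
//	proc := basic.New(
//		simple.NewWithInexpensiveDistribution(),
//		export.CumulativeExportKindSelector(),
//		basic.WithMemory(true),
//	)
//
// where simple refers to go.opentelemetry.io/otel/sdk/metric/selector/simple.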
130func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector, opts ...Option) *Processor {
131	now := time.Now()
132	p := &Processor{
133		AggregatorSelector: aselector,
134		ExportKindSelector: eselector,
135		state: state{
136			values:        map[stateKey]*stateValue{},
137			processStart:  now,
138			intervalStart: now,
139		},
140	}
141	for _, opt := range opts {
142		opt.ApplyProcessor(&p.config)
143	}
144	return p
145}
146
147// Process implements export.Processor.
148func (b *Processor) Process(accum export.Accumulation) error {
149	if b.startedCollection != b.finishedCollection+1 {
150		return ErrInconsistentState
151	}
152	desc := accum.Descriptor()
153	key := stateKey{
154		descriptor: desc,
155		distinct:   accum.Labels().Equivalent(),
156		resource:   accum.Resource().Equivalent(),
157	}
158	agg := accum.Aggregator()
159
160	// Check if there is an existing value.
161	value, ok := b.state.values[key]
162	if !ok {
163		stateful := b.ExportKindFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.InstrumentKind())
164
165		newValue := &stateValue{
166			labels:   accum.Labels(),
167			resource: accum.Resource(),
168			updated:  b.state.finishedCollection,
169			stateful: stateful,
170			current:  agg,
171		}
172		if stateful {
173			if desc.InstrumentKind().PrecomputedSum() {
174				// If we know we need to compute deltas, allocate two aggregators.
175				b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
176			} else {
177				// In this case we are certain not to need a delta, only allocate
178				// a cumulative aggregator.
179				b.AggregatorFor(desc, &newValue.cumulative)
180			}
181		}
182		b.state.values[key] = newValue
183		return nil
184	}
185
186	// Advance the update sequence number.
187	sameCollection := b.state.finishedCollection == value.updated
188	value.updated = b.state.finishedCollection
189
190	// At this point in the code, we have located an existing
191	// value for some stateKey.  This can be because:
192	//
193	// (a) stateful aggregation is being used, the entry was
194	// entered during a prior collection, and this is the first
195	// time processing an accumulation for this stateKey in the
196	// current collection.  Since this is the first time
197	// processing an accumulation for this stateKey during this
198	// collection, we don't know yet whether there are multiple
199	// accumulators at work.  If there are multiple accumulators,
200	// they'll hit case (b) the second time through.
201	//
202	// (b) multiple accumulators are being used, whether stateful
203	// or not.
204	//
205	// Case (a) occurs when the instrument and the exporter
206	// require memory to work correctly, either because the
207	// instrument reports a PrecomputedSum to a DeltaExporter or
208	// the reverse, a non-PrecomputedSum instrument with a
209	// CumulativeExporter.  This logic is encapsulated in
210	// ExportKind.MemoryRequired(InstrumentKind).
211	//
212	// Case (b) occurs when the variable `sameCollection` is true,
213	// indicating that the stateKey for Accumulation has already
214	// been seen in the same collection.  When this happens, it
215	// implies that multiple Accumulators are being used, or that
216	// a single Accumulator has been configured with a label key
217	// filter.
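	//
	// As an illustration of case (b): if a single Accumulator is
	// configured with a label filter that keeps only `k1`, the label
	// sets {k1=v1, k2=v2} and {k1=v1, k2=v3} both reduce to {k1=v1},
	// map to the same stateKey, and are merged below within one
	// collection.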
218
219	if !sameCollection {
220		if !value.currentOwned {
221			// This is the first Accumulation we've seen
222			// for this stateKey during this collection.
223			// Just keep a reference to the Accumulator's
224			// Aggregator.  All the other cases copy
225			// Aggregator state.
226			value.current = agg
227			return nil
228		}
229		return agg.SynchronizedMove(value.current, desc)
230	}
231
232	// If the current is not owned, take ownership of a copy
233	// before merging below.
234	if !value.currentOwned {
235		tmp := value.current
236		b.AggregatorSelector.AggregatorFor(desc, &value.current)
237		value.currentOwned = true
238		if err := tmp.SynchronizedMove(value.current, desc); err != nil {
239			return err
240		}
241	}
242
243	// Combine this Accumulation with the prior Accumulation.
244	return value.current.Merge(agg, desc)
245}
246
247// CheckpointSet returns the associated CheckpointSet.  Use the
248// CheckpointSet Locker interface to synchronize access to this
249// object.  The CheckpointSet.ForEach() method cannot be called
250// concurrently with Process().
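//
// A usage sketch (proc and exporter are assumed to be a *Processor and
// an exporter implementing export.ExportKindSelector, respectively):
//
//	ckpt := proc.CheckpointSet()
//	ckpt.RLock()
//	defer ckpt.RUnlock()
//	if err := ckpt.ForEach(exporter, func(rec export.Record) error {
//		// consume rec
//		return nil
//	}); err != nil {
//		// handle err
//	}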
251func (b *Processor) CheckpointSet() export.CheckpointSet {
252	return &b.state
253}
254
// StartCollection signals to the Processor that one or more
// Accumulators will begin calling Process() during collection.
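//
// A single collection round follows this sequence (an illustrative
// sketch; accum stands for any export.Accumulation produced by an
// Accumulator):
//
//	proc.StartCollection()
//	_ = proc.Process(accum) // repeated, possibly from several Accumulators
//	_ = proc.FinishCollection()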
257func (b *Processor) StartCollection() {
258	if b.startedCollection != 0 {
259		b.intervalStart = b.intervalEnd
260	}
261	b.startedCollection++
262}
263
264// FinishCollection signals to the Processor that a complete
265// collection has finished and that ForEach will be called to access
266// the CheckpointSet.
267func (b *Processor) FinishCollection() error {
268	b.intervalEnd = time.Now()
269	if b.startedCollection != b.finishedCollection+1 {
270		return ErrInconsistentState
271	}
272	defer func() { b.finishedCollection++ }()
273
274	for key, value := range b.values {
275		mkind := key.descriptor.InstrumentKind()
276		stale := value.updated != b.finishedCollection
277		stateless := !value.stateful
278
279		// The following branch updates stateful aggregators.  Skip
280		// these updates if the aggregator is not stateful or if the
281		// aggregator is stale.
282		if stale || stateless {
			// If this processor is not configured to retain
			// memory, stale, stateless entries can be removed:
			// they were not updated over the previous full
			// collection interval.
287			if stale && stateless && !b.config.Memory {
288				delete(b.values, key)
289			}
290			continue
291		}
292
293		// Update Aggregator state to support exporting either a
294		// delta or a cumulative aggregation.
295		var err error
296		if mkind.PrecomputedSum() {
297			if currentSubtractor, ok := value.current.(export.Subtractor); ok {
298				// This line is equivalent to:
				// value.delta = value.current - value.cumulative
300				err = currentSubtractor.Subtract(value.cumulative, value.delta, key.descriptor)
301
302				if err == nil {
303					err = value.current.SynchronizedMove(value.cumulative, key.descriptor)
304				}
305			} else {
306				err = aggregation.ErrNoSubtraction
307			}
308		} else {
309			// This line is equivalent to:
			// value.cumulative = value.cumulative + value.current
311			err = value.cumulative.Merge(value.current, key.descriptor)
312		}
313		if err != nil {
314			return err
315		}
316	}
317	return nil
318}
319
320// ForEach iterates through the CheckpointSet, passing an
321// export.Record with the appropriate Cumulative or Delta aggregation
322// to an exporter.
323func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error {
324	if b.startedCollection != b.finishedCollection {
325		return ErrInconsistentState
326	}
327	for key, value := range b.values {
328		mkind := key.descriptor.InstrumentKind()
329
330		var agg aggregation.Aggregation
331		var start time.Time
332
333		// If the processor does not have Config.Memory and it was not updated
334		// in the prior round, do not visit this value.
335		if !b.config.Memory && value.updated != (b.finishedCollection-1) {
336			continue
337		}
338
339		ekind := exporter.ExportKindFor(key.descriptor, value.current.Aggregation().Kind())
340		switch ekind {
341		case export.CumulativeExportKind:
342			// If stateful, the sum has been computed.  If stateless, the
343			// input was already cumulative.  Either way, use the checkpointed
344			// value:
345			if value.stateful {
346				agg = value.cumulative.Aggregation()
347			} else {
348				agg = value.current.Aggregation()
349			}
350			start = b.processStart
351
352		case export.DeltaExportKind:
353			// Precomputed sums are a special case.
354			if mkind.PrecomputedSum() {
355				agg = value.delta.Aggregation()
356			} else {
357				agg = value.current.Aggregation()
358			}
359			start = b.intervalStart
360
361		default:
362			return fmt.Errorf("%v: %w", ekind, ErrInvalidExportKind)
363		}
364
365		if err := f(export.NewRecord(
366			key.descriptor,
367			value.labels,
368			value.resource,
369			agg,
370			start,
371			b.intervalEnd,
372		)); err != nil && !errors.Is(err, aggregation.ErrNoData) {
373			return err
374		}
375	}
376	return nil
377}
378