1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package processortest // import "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
16
17import (
18	"context"
19	"fmt"
20	"strings"
21	"sync"
22	"time"
23
24	"go.opentelemetry.io/otel/attribute"
25	"go.opentelemetry.io/otel/metric"
26	"go.opentelemetry.io/otel/metric/number"
27	export "go.opentelemetry.io/otel/sdk/export/metric"
28	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
29	"go.opentelemetry.io/otel/sdk/metric/aggregator/exact"
30	"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
31	"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
32	"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
33	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
34	"go.opentelemetry.io/otel/sdk/resource"
35)
36
37type (
38	// mapKey is the unique key for a metric, consisting of its
39	// unique descriptor, distinct labels, and distinct resource
40	// attributes.
41	mapKey struct {
42		desc     *metric.Descriptor
43		labels   attribute.Distinct
44		resource attribute.Distinct
45	}
46
47	// mapValue is value stored in a processor used to produce a
48	// CheckpointSet.
49	mapValue struct {
50		labels     *attribute.Set
51		resource   *resource.Resource
52		aggregator export.Aggregator
53	}
54
55	// Output implements export.CheckpointSet.
56	Output struct {
57		m            map[mapKey]mapValue
58		labelEncoder attribute.Encoder
59		sync.RWMutex
60	}
61
62	// testAggregatorSelector returns aggregators consistent with
63	// the test variables below, needed for testing stateful
64	// processors, which clone Aggregators using AggregatorFor(desc).
65	testAggregatorSelector struct{}
66
67	// testCheckpointer is a export.Checkpointer.
68	testCheckpointer struct {
69		started  int
70		finished int
71		*Processor
72	}
73
74	// Processor is a testing implementation of export.Processor that
75	// assembles its results as a map[string]float64.
76	Processor struct {
77		export.AggregatorSelector
78		output *Output
79	}
80
81	// Exporter is a testing implementation of export.Exporter that
82	// assembles its results as a map[string]float64.
83	Exporter struct {
84		export.ExportKindSelector
85		output      *Output
86		exportCount int
87
88		// InjectErr supports returning conditional errors from
89		// the Export() routine.  This must be set before the
90		// Exporter is first used.
91		InjectErr func(export.Record) error
92	}
93)
94
95// NewProcessor returns a new testing Processor implementation.
96// Verify expected outputs using Values(), e.g.:
97//
98//     require.EqualValues(t, map[string]float64{
99//         "counter.sum/A=1,B=2/R=V": 100,
100//     }, processor.Values())
101//
102// Where in the example A=1,B=2 is the encoded labels and R=V is the
103// encoded resource value.
104func NewProcessor(selector export.AggregatorSelector, encoder attribute.Encoder) *Processor {
105	return &Processor{
106		AggregatorSelector: selector,
107		output:             NewOutput(encoder),
108	}
109}
110
111// Process implements export.Processor.
112func (p *Processor) Process(accum export.Accumulation) error {
113	return p.output.AddAccumulation(accum)
114}
115
116// Values returns the mapping from label set to point values for the
117// accumulations that were processed.  Point values are chosen as
118// either the Sum or the LastValue, whichever is implemented.  (All
119// the built-in Aggregators implement one of these interfaces.)
120func (p *Processor) Values() map[string]float64 {
121	return p.output.Map()
122}
123
124// Checkpointer returns a checkpointer that computes a single
125// interval.
126func Checkpointer(p *Processor) export.Checkpointer {
127	return &testCheckpointer{
128		Processor: p,
129	}
130}
131
132// StartCollection implements export.Checkpointer.
133func (c *testCheckpointer) StartCollection() {
134	if c.started != c.finished {
135		panic(fmt.Sprintf("collection was already started: %d != %d", c.started, c.finished))
136	}
137
138	c.started++
139}
140
141// FinishCollection implements export.Checkpointer.
142func (c *testCheckpointer) FinishCollection() error {
143	if c.started-1 != c.finished {
144		return fmt.Errorf("collection was not started: %d != %d", c.started, c.finished)
145	}
146
147	c.finished++
148	return nil
149}
150
151// CheckpointSet implements export.Checkpointer.
152func (c *testCheckpointer) CheckpointSet() export.CheckpointSet {
153	return c.Processor.output
154}
155
156// AggregatorSelector returns a policy that is consistent with the
157// test descriptors above.  I.e., it returns sum.New() for counter
158// instruments and lastvalue.New() for lastValue instruments.
159func AggregatorSelector() export.AggregatorSelector {
160	return testAggregatorSelector{}
161}
162
163// AggregatorFor implements export.AggregatorSelector.
164func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
165
166	switch {
167	case strings.HasSuffix(desc.Name(), ".disabled"):
168		for i := range aggPtrs {
169			*aggPtrs[i] = nil
170		}
171	case strings.HasSuffix(desc.Name(), ".sum"):
172		aggs := sum.New(len(aggPtrs))
173		for i := range aggPtrs {
174			*aggPtrs[i] = &aggs[i]
175		}
176	case strings.HasSuffix(desc.Name(), ".minmaxsumcount"):
177		aggs := minmaxsumcount.New(len(aggPtrs), desc)
178		for i := range aggPtrs {
179			*aggPtrs[i] = &aggs[i]
180		}
181	case strings.HasSuffix(desc.Name(), ".lastvalue"):
182		aggs := lastvalue.New(len(aggPtrs))
183		for i := range aggPtrs {
184			*aggPtrs[i] = &aggs[i]
185		}
186	case strings.HasSuffix(desc.Name(), ".histogram"):
187		aggs := histogram.New(len(aggPtrs), desc)
188		for i := range aggPtrs {
189			*aggPtrs[i] = &aggs[i]
190		}
191	case strings.HasSuffix(desc.Name(), ".exact"):
192		aggs := exact.New(len(aggPtrs))
193		for i := range aggPtrs {
194			*aggPtrs[i] = &aggs[i]
195		}
196	default:
197		panic(fmt.Sprint("Invalid instrument name for test AggregatorSelector: ", desc.Name()))
198	}
199}
200
201// NewOutput is a helper for testing an expected set of Accumulations
202// (from an Accumulator) or an expected set of Records (from a
203// Processor).  If testing with an Accumulator, it may be simpler to
204// use the test Processor in this package.
205func NewOutput(labelEncoder attribute.Encoder) *Output {
206	return &Output{
207		m:            make(map[mapKey]mapValue),
208		labelEncoder: labelEncoder,
209	}
210}
211
212// ForEach implements export.CheckpointSet.
213func (o *Output) ForEach(_ export.ExportKindSelector, ff func(export.Record) error) error {
214	for key, value := range o.m {
215		if err := ff(export.NewRecord(
216			key.desc,
217			value.labels,
218			value.resource,
219			value.aggregator.Aggregation(),
220			time.Time{},
221			time.Time{},
222		)); err != nil {
223			return err
224		}
225	}
226	return nil
227}
228
229// AddRecord adds a string representation of the exported metric data
230// to a map for use in testing.  The value taken from the record is
231// either the Sum() or the LastValue() of its Aggregation(), whichever
232// is defined.  Record timestamps are ignored.
233func (o *Output) AddRecord(rec export.Record) error {
234	key := mapKey{
235		desc:     rec.Descriptor(),
236		labels:   rec.Labels().Equivalent(),
237		resource: rec.Resource().Equivalent(),
238	}
239	if _, ok := o.m[key]; !ok {
240		var agg export.Aggregator
241		testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg)
242		o.m[key] = mapValue{
243			aggregator: agg,
244			labels:     rec.Labels(),
245			resource:   rec.Resource(),
246		}
247	}
248	return o.m[key].aggregator.Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor())
249}
250
251// Map returns the calculated values for test validation from a set of
252// Accumulations or a set of Records.  When mapping records or
253// accumulations into floating point values, the Sum() or LastValue()
254// is chosen, whichever is implemented by the underlying Aggregator.
255func (o *Output) Map() map[string]float64 {
256	r := make(map[string]float64)
257	err := o.ForEach(export.StatelessExportKindSelector(), func(record export.Record) error {
258		for key, entry := range o.m {
259			encoded := entry.labels.Encoded(o.labelEncoder)
260			rencoded := entry.resource.Encoded(o.labelEncoder)
261			value := 0.0
262			if s, ok := entry.aggregator.(aggregation.Sum); ok {
263				sum, _ := s.Sum()
264				value = sum.CoerceToFloat64(key.desc.NumberKind())
265			} else if l, ok := entry.aggregator.(aggregation.LastValue); ok {
266				last, _, _ := l.LastValue()
267				value = last.CoerceToFloat64(key.desc.NumberKind())
268			} else if l, ok := entry.aggregator.(aggregation.Points); ok {
269				pts, _ := l.Points()
270				var sum number.Number
271				for _, s := range pts {
272					sum.AddNumber(key.desc.NumberKind(), s.Number)
273				}
274				value = sum.CoerceToFloat64(key.desc.NumberKind())
275			} else {
276				panic(fmt.Sprintf("Unhandled aggregator type: %T", entry.aggregator))
277			}
278			name := fmt.Sprint(key.desc.Name(), "/", encoded, "/", rencoded)
279			r[name] = value
280		}
281		return nil
282	})
283	if err != nil {
284		panic(fmt.Sprint("Unexpected processor error: ", err))
285	}
286	return r
287}
288
289// Reset restores the Output to its initial state, with no accumulated
290// metric data.
291func (o *Output) Reset() {
292	o.m = map[mapKey]mapValue{}
293}
294
295// AddAccumulation adds a string representation of the exported metric
296// data to a map for use in testing.  The value taken from the
297// accumulation is either the Sum() or the LastValue() of its
298// Aggregator().Aggregation(), whichever is defined.
299func (o *Output) AddAccumulation(acc export.Accumulation) error {
300	return o.AddRecord(
301		export.NewRecord(
302			acc.Descriptor(),
303			acc.Labels(),
304			acc.Resource(),
305			acc.Aggregator().Aggregation(),
306			time.Time{},
307			time.Time{},
308		),
309	)
310}
311
312// NewExporter returns a new testing Exporter implementation.
313// Verify exporter outputs using Values(), e.g.,:
314//
315//     require.EqualValues(t, map[string]float64{
316//         "counter.sum/A=1,B=2/R=V": 100,
317//     }, exporter.Values())
318//
319// Where in the example A=1,B=2 is the encoded labels and R=V is the
320// encoded resource value.
321func NewExporter(selector export.ExportKindSelector, encoder attribute.Encoder) *Exporter {
322	return &Exporter{
323		ExportKindSelector: selector,
324		output:             NewOutput(encoder),
325	}
326}
327
328func (e *Exporter) Export(_ context.Context, ckpt export.CheckpointSet) error {
329	e.output.Lock()
330	defer e.output.Unlock()
331	e.exportCount++
332	return ckpt.ForEach(e.ExportKindSelector, func(r export.Record) error {
333		if e.InjectErr != nil {
334			if err := e.InjectErr(r); err != nil {
335				return err
336			}
337		}
338		return e.output.AddRecord(r)
339	})
340}
341
342// Values returns the mapping from label set to point values for the
343// accumulations that were processed.  Point values are chosen as
344// either the Sum or the LastValue, whichever is implemented.  (All
345// the built-in Aggregators implement one of these interfaces.)
346func (e *Exporter) Values() map[string]float64 {
347	e.output.Lock()
348	defer e.output.Unlock()
349	return e.output.Map()
350}
351
352// ExportCount returns the number of times Export() has been called
353// since the last Reset().
354func (e *Exporter) ExportCount() int {
355	e.output.Lock()
356	defer e.output.Unlock()
357	return e.exportCount
358}
359
360// Reset sets the exporter's output to the initial, empty state and
361// resets the export count to zero.
362func (e *Exporter) Reset() {
363	e.output.Lock()
364	defer e.output.Unlock()
365	e.output.Reset()
366	e.exportCount = 0
367}
368