// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheus // import "go.opentelemetry.io/otel/exporters/metric/prometheus"

import (
	"context"
	"fmt"
	"net/http"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/label"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/number"
	export "go.opentelemetry.io/otel/sdk/export/metric"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
	"go.opentelemetry.io/otel/sdk/metric/controller/pull"
	"go.opentelemetry.io/otel/sdk/metric/processor/basic"
	"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

// Exporter supports Prometheus pulls.  It does not implement the
// sdk/export/metric.Exporter interface; instead it creates a pull
// controller and reads the latest checkpointed data on scrape.
type Exporter struct {
	handler http.Handler

	registerer prometheus.Registerer
	gatherer   prometheus.Gatherer

	// lock protects access to the controller. The controller
	// exposes its own lock, but using a dedicated lock in this
	// struct allows the exporter to potentially support multiple
	// controllers (e.g., with different resources).
	lock       sync.RWMutex
	controller *pull.Controller

	defaultSummaryQuantiles    []float64
	defaultHistogramBoundaries []float64
}

var _ http.Handler = &Exporter{}

// Config contains options for the Prometheus exporter.
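//
// A minimal sketch of a custom configuration; the quantile and boundary
// values below are illustrative, not recommendations:
//
// 	cfg := prometheus.Config{
// 		DefaultSummaryQuantiles:    []float64{0.5, 0.9, 0.99},
// 		DefaultHistogramBoundaries: []float64{.005, .01, .05, .1, .5, 1, 5, 10},
// 	}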
type Config struct {
	// Registry is the prometheus registry that will be used as the default Registerer and
	// Gatherer if these are not specified.
	//
	// If not set, a new empty Registry is created.
	Registry *prometheus.Registry

	// Registerer is the prometheus registerer to register
	// metrics with.
	//
	// If not specified, the Registry is used as the default.
	Registerer prometheus.Registerer

	// Gatherer is the prometheus gatherer to gather
	// metrics with.
	//
	// If not specified, the Registry is used as the default.
	Gatherer prometheus.Gatherer

	// DefaultSummaryQuantiles is the default set of summary quantiles
	// to use. Use nil to specify the system-default summary quantiles.
	DefaultSummaryQuantiles []float64

	// DefaultHistogramBoundaries defines the default histogram bucket
	// boundaries.
	DefaultHistogramBoundaries []float64
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// using the recommended selector and standard processor.  See the pull.Option values
// for the available controller configuration.
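//
// A minimal usage sketch (error handling elided):
//
// 	exporter, err := prometheus.NewExportPipeline(prometheus.Config{})
// 	if err != nil {
// 		// ...
// 	}
// 	http.Handle("/metrics", exporter)
// 	otel.SetMeterProvider(exporter.MeterProvider())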
func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error) {
	if config.Registry == nil {
		config.Registry = prometheus.NewRegistry()
	}

	if config.Registerer == nil {
		config.Registerer = config.Registry
	}

	if config.Gatherer == nil {
		config.Gatherer = config.Registry
	}

	e := &Exporter{
		handler:                    promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}),
		registerer:                 config.Registerer,
		gatherer:                   config.Gatherer,
		defaultSummaryQuantiles:    config.DefaultSummaryQuantiles,
		defaultHistogramBoundaries: config.DefaultHistogramBoundaries,
	}

	c := &collector{
		exp: e,
	}
	e.SetController(config, options...)
	if err := config.Registerer.Register(c); err != nil {
		return nil, fmt.Errorf("cannot register the collector: %w", err)
	}

	return e, nil
}

// InstallNewPipeline instantiates a NewExportPipeline and registers its
// MeterProvider globally.  Typically called as:
//
// 	exporter, err := prometheus.InstallNewPipeline(prometheus.Config{...})
// 	if err != nil {
// 		// ...
// 	}
// 	http.HandleFunc("/metrics", exporter.ServeHTTP)
func InstallNewPipeline(config Config, options ...pull.Option) (*Exporter, error) {
	exp, err := NewExportPipeline(config, options...)
	if err != nil {
		return nil, err
	}
	otel.SetMeterProvider(exp.MeterProvider())
	return exp, nil
}

// SetController sets up a standard *pull.Controller as the metric provider
// for this exporter.
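//
// A sketch of supplying controller options (this assumes the pull
// package's WithCachePeriod option; adjust to the options available in
// your SDK version):
//
// 	exporter.SetController(prometheus.Config{}, pull.WithCachePeriod(0))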
func (e *Exporter) SetController(config Config, options ...pull.Option) {
	e.lock.Lock()
	defer e.lock.Unlock()

	e.controller = pull.New(
		basic.New(
			simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
			e,
			basic.WithMemory(true),
		),
		options...,
	)
}

// MeterProvider returns the MeterProvider of this exporter.
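//
// A sketch of recording through the returned provider (instrument and
// label names are illustrative):
//
// 	meter := exporter.MeterProvider().Meter("example")
// 	counter := metric.Must(meter).NewInt64Counter("requests_total")
// 	counter.Add(ctx, 1, label.String("path", "/"))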
func (e *Exporter) MeterProvider() metric.MeterProvider {
	return e.controller.MeterProvider()
}

// Controller returns the controller object that coordinates collection for the SDK.
func (e *Exporter) Controller() *pull.Controller {
	e.lock.RLock()
	defer e.lock.RUnlock()
	return e.controller
}

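// ExportKindFor implements export.ExportKindSelector.  This exporter
// reports cumulative values for every descriptor and aggregation kind.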
func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) export.ExportKind {
	// NOTE: Summary values should use Delta aggregation, then be
	// combined into a sliding window, see the TODO below.
	// NOTE: Prometheus also supports a "GaugeDelta" exposition format,
	// which is expressed as a delta histogram.  It is not yet clear
	// whether this should be the default behavior for
	// ValueRecorder/ValueObserver.
	return export.CumulativeExportKindSelector().ExportKindFor(desc, kind)
}

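// ServeHTTP implements http.Handler, serving the Prometheus exposition
// format for the metrics gathered by this exporter's Gatherer.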
func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	e.handler.ServeHTTP(w, r)
}

// collector implements the prometheus.Collector interface.
type collector struct {
	exp *Exporter
}

var _ prometheus.Collector = (*collector)(nil)

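// Describe implements prometheus.Collector, sending a *prometheus.Desc
// for each metric record known to the exporter's controller.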
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
	c.exp.lock.RLock()
	defer c.exp.lock.RUnlock()

	_ = c.exp.Controller().ForEach(c.exp, func(record export.Record) error {
		var labelKeys []string
		mergeLabels(record, &labelKeys, nil)
		ch <- c.toDesc(record, labelKeys)
		return nil
	})
}

// Collect exports the last calculated CheckpointSet.
//
// Collect is invoked whenever the prometheus.Gatherer is invoked,
// for example when Prometheus scrapes the HTTP endpoint.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
	c.exp.lock.RLock()
	defer c.exp.lock.RUnlock()

	ctrl := c.exp.Controller()
	if err := ctrl.Collect(context.Background()); err != nil {
		otel.Handle(err)
	}

	err := ctrl.ForEach(c.exp, func(record export.Record) error {
		agg := record.Aggregation()
		numberKind := record.Descriptor().NumberKind()
		instrumentKind := record.Descriptor().InstrumentKind()

		var labelKeys, labels []string
		mergeLabels(record, &labelKeys, &labels)

		desc := c.toDesc(record, labelKeys)

		if hist, ok := agg.(aggregation.Histogram); ok {
			if err := c.exportHistogram(ch, hist, numberKind, desc, labels); err != nil {
				return fmt.Errorf("exporting histogram: %w", err)
			}
		} else if dist, ok := agg.(aggregation.Distribution); ok {
			// TODO: summary values are never reset.  As measurements are
			//  recorded, new records have progressively less impact on the
			//  summary.  We should implement a solution similar to the
			//  Prometheus clients, e.g. using a rolling window for summaries.
			//
			//  References:
			//  https://www.robustperception.io/how-does-a-prometheus-summary-work
			//  https://github.com/prometheus/client_golang/blob/fa4aa9000d2863904891d193dea354d23f3d712a/prometheus/summary.go#L135
			if err := c.exportSummary(ch, dist, numberKind, desc, labels); err != nil {
				return fmt.Errorf("exporting summary: %w", err)
			}
		} else if sum, ok := agg.(aggregation.Sum); ok && instrumentKind.Monotonic() {
			if err := c.exportMonotonicCounter(ch, sum, numberKind, desc, labels); err != nil {
				return fmt.Errorf("exporting monotonic counter: %w", err)
			}
		} else if sum, ok := agg.(aggregation.Sum); ok && !instrumentKind.Monotonic() {
			if err := c.exportNonMonotonicCounter(ch, sum, numberKind, desc, labels); err != nil {
				return fmt.Errorf("exporting non-monotonic counter: %w", err)
			}
		} else if lastValue, ok := agg.(aggregation.LastValue); ok {
			if err := c.exportLastValue(ch, lastValue, numberKind, desc, labels); err != nil {
				return fmt.Errorf("exporting last value: %w", err)
			}
		}
		return nil
	})
	if err != nil {
		otel.Handle(err)
	}
}

func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregation.LastValue, kind number.Kind, desc *prometheus.Desc, labels []string) error {
	lv, _, err := lvagg.LastValue()
	if err != nil {
		return fmt.Errorf("error retrieving last value: %w", err)
	}

	m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, lv.CoerceToFloat64(kind), labels...)
	if err != nil {
		return fmt.Errorf("error creating constant metric: %w", err)
	}

	ch <- m
	return nil
}

func (c *collector) exportNonMonotonicCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind number.Kind, desc *prometheus.Desc, labels []string) error {
	v, err := sum.Sum()
	if err != nil {
		return fmt.Errorf("error retrieving counter: %w", err)
	}

	m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, v.CoerceToFloat64(kind), labels...)
	if err != nil {
		return fmt.Errorf("error creating constant metric: %w", err)
	}

	ch <- m
	return nil
}

func (c *collector) exportMonotonicCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind number.Kind, desc *prometheus.Desc, labels []string) error {
	v, err := sum.Sum()
	if err != nil {
		return fmt.Errorf("error retrieving counter: %w", err)
	}

	m, err := prometheus.NewConstMetric(desc, prometheus.CounterValue, v.CoerceToFloat64(kind), labels...)
	if err != nil {
		return fmt.Errorf("error creating constant metric: %w", err)
	}

	ch <- m
	return nil
}

func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregation.Distribution, kind number.Kind, desc *prometheus.Desc, labels []string) error {
	count, err := dist.Count()
	if err != nil {
		return fmt.Errorf("error retrieving count: %w", err)
	}

	var sum number.Number
	sum, err = dist.Sum()
	if err != nil {
		return fmt.Errorf("error retrieving distribution sum: %w", err)
	}

	quantiles := make(map[float64]float64)
	for _, quantile := range c.exp.defaultSummaryQuantiles {
		q, _ := dist.Quantile(quantile)
		quantiles[quantile] = q.CoerceToFloat64(kind)
	}

	m, err := prometheus.NewConstSummary(desc, uint64(count), sum.CoerceToFloat64(kind), quantiles, labels...)
	if err != nil {
		return fmt.Errorf("error creating constant summary: %w", err)
	}

	ch <- m
	return nil
}

func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregation.Histogram, kind number.Kind, desc *prometheus.Desc, labels []string) error {
	buckets, err := hist.Histogram()
	if err != nil {
		return fmt.Errorf("error retrieving histogram: %w", err)
	}
	sum, err := hist.Sum()
	if err != nil {
		return fmt.Errorf("error retrieving sum: %w", err)
	}

	var totalCount uint64
	// counts maps from the bucket upper-bound to the cumulative count.
	// The bucket with upper-bound +inf is not included.
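	// For example, boundaries [10, 100] with per-bucket counts [1, 2, 3]
	// would produce counts {10: 1, 100: 3} and, after adding the +inf
	// bucket below, a totalCount of 6.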
	counts := make(map[float64]uint64, len(buckets.Boundaries))
	for i := range buckets.Boundaries {
		boundary := buckets.Boundaries[i]
		totalCount += uint64(buckets.Counts[i])
		counts[boundary] = totalCount
	}
	// Include the +inf bucket in the total count.
	totalCount += uint64(buckets.Counts[len(buckets.Counts)-1])

	m, err := prometheus.NewConstHistogram(desc, totalCount, sum.CoerceToFloat64(kind), counts, labels...)
	if err != nil {
		return fmt.Errorf("error creating constant histogram: %w", err)
	}

	ch <- m
	return nil
}

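// toDesc converts a record into a prometheus.Desc, using the sanitized
// instrument name and description together with the given label keys.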
func (c *collector) toDesc(record export.Record, labelKeys []string) *prometheus.Desc {
	desc := record.Descriptor()
	return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labelKeys, nil)
}

// mergeLabels merges the export.Record's labels and resource labels into
// a single set, giving precedence to the record's labels in case of
// duplicate keys.  The merged keys and/or values are written to the
// provided slices; either argument may be nil to avoid allocating an
// unnecessary slice.
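//
// For example, record labels {http.method: GET} merged with resource
// labels {host.name: a, http.method: POST} yield the sanitized keys
// host_name and http_method with values a and GET, respectively.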
func mergeLabels(record export.Record, keys, values *[]string) {
	if keys != nil {
		*keys = make([]string, 0, record.Labels().Len()+record.Resource().Len())
	}
	if values != nil {
		*values = make([]string, 0, record.Labels().Len()+record.Resource().Len())
	}

	// Duplicate keys are resolved by taking the record label value over
	// the resource value.
	mi := label.NewMergeIterator(record.Labels(), record.Resource().LabelSet())
	for mi.Next() {
		label := mi.Label()
		if keys != nil {
			*keys = append(*keys, sanitize(string(label.Key)))
		}
		if values != nil {
			*values = append(*values, label.Value.Emit())
		}
	}
}