1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package prometheus // import "go.opentelemetry.io/otel/exporters/metric/prometheus" 16 17import ( 18 "context" 19 "fmt" 20 "net/http" 21 "sync" 22 23 "github.com/prometheus/client_golang/prometheus" 24 "github.com/prometheus/client_golang/prometheus/promhttp" 25 26 "go.opentelemetry.io/otel" 27 "go.opentelemetry.io/otel/label" 28 "go.opentelemetry.io/otel/metric" 29 "go.opentelemetry.io/otel/metric/number" 30 export "go.opentelemetry.io/otel/sdk/export/metric" 31 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 32 "go.opentelemetry.io/otel/sdk/metric/controller/pull" 33 "go.opentelemetry.io/otel/sdk/metric/processor/basic" 34 "go.opentelemetry.io/otel/sdk/metric/selector/simple" 35) 36 37// Exporter supports Prometheus pulls. It does not implement the 38// sdk/export/metric.Exporter interface--instead it creates a pull 39// controller and reads the latest checkpointed data on-scrape. 40type Exporter struct { 41 handler http.Handler 42 43 registerer prometheus.Registerer 44 gatherer prometheus.Gatherer 45 46 // lock protects access to the controller. The controller 47 // exposes its own lock, but using a dedicated lock in this 48 // struct allows the exporter to potentially support multiple 49 // controllers (e.g., with different resources). 50 lock sync.RWMutex 51 controller *pull.Controller 52 53 defaultSummaryQuantiles []float64 54 defaultHistogramBoundaries []float64 55} 56 57var _ http.Handler = &Exporter{} 58 59// Config is a set of configs for the tally reporter. 60type Config struct { 61 // Registry is the prometheus registry that will be used as the default Registerer and 62 // Gatherer if these are not specified. 63 // 64 // If not set a new empty Registry is created. 65 Registry *prometheus.Registry 66 67 // Registerer is the prometheus registerer to register 68 // metrics with. 69 // 70 // If not specified the Registry will be used as default. 71 Registerer prometheus.Registerer 72 73 // Gatherer is the prometheus gatherer to gather 74 // metrics with. 75 // 76 // If not specified the Registry will be used as default. 77 Gatherer prometheus.Gatherer 78 79 // DefaultSummaryQuantiles is the default summary quantiles 80 // to use. Use nil to specify the system-default summary quantiles. 81 DefaultSummaryQuantiles []float64 82 83 // DefaultHistogramBoundaries defines the default histogram bucket 84 // boundaries. 85 DefaultHistogramBoundaries []float64 86} 87 88// NewExportPipeline sets up a complete export pipeline with the recommended setup, 89// using the recommended selector and standard processor. See the pull.Options. 90func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error) { 91 if config.Registry == nil { 92 config.Registry = prometheus.NewRegistry() 93 } 94 95 if config.Registerer == nil { 96 config.Registerer = config.Registry 97 } 98 99 if config.Gatherer == nil { 100 config.Gatherer = config.Registry 101 } 102 103 e := &Exporter{ 104 handler: promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}), 105 registerer: config.Registerer, 106 gatherer: config.Gatherer, 107 defaultSummaryQuantiles: config.DefaultSummaryQuantiles, 108 defaultHistogramBoundaries: config.DefaultHistogramBoundaries, 109 } 110 111 c := &collector{ 112 exp: e, 113 } 114 e.SetController(config, options...) 115 if err := config.Registerer.Register(c); err != nil { 116 return nil, fmt.Errorf("cannot register the collector: %w", err) 117 } 118 119 return e, nil 120} 121 122// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. 123// Typically called as: 124// 125// hf, err := prometheus.InstallNewPipeline(prometheus.Config{...}) 126// 127// if err != nil { 128// ... 129// } 130// http.HandleFunc("/metrics", hf) 131// defer pipeline.Stop() 132// ... Done 133func InstallNewPipeline(config Config, options ...pull.Option) (*Exporter, error) { 134 exp, err := NewExportPipeline(config, options...) 135 if err != nil { 136 return nil, err 137 } 138 otel.SetMeterProvider(exp.MeterProvider()) 139 return exp, nil 140} 141 142// SetController sets up a standard *pull.Controller as the metric provider 143// for this exporter. 144func (e *Exporter) SetController(config Config, options ...pull.Option) { 145 e.lock.Lock() 146 defer e.lock.Unlock() 147 148 e.controller = pull.New( 149 basic.New( 150 simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries), 151 e, 152 basic.WithMemory(true), 153 ), 154 options..., 155 ) 156} 157 158// MeterProvider returns the MeterProvider of this exporter. 159func (e *Exporter) MeterProvider() metric.MeterProvider { 160 return e.controller.MeterProvider() 161} 162 163// Controller returns the controller object that coordinates collection for the SDK. 164func (e *Exporter) Controller() *pull.Controller { 165 e.lock.RLock() 166 defer e.lock.RUnlock() 167 return e.controller 168} 169 170func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) export.ExportKind { 171 // NOTE: Summary values should use Delta aggregation, then be 172 // combined into a sliding window, see the TODO below. 173 // NOTE: Prometheus also supports a "GaugeDelta" exposition format, 174 // which is expressed as a delta histogram. Need to understand if this 175 // should be a default behavior for ValueRecorder/ValueObserver. 176 return export.CumulativeExportKindSelector().ExportKindFor(desc, kind) 177} 178 179func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { 180 e.handler.ServeHTTP(w, r) 181} 182 183// collector implements prometheus.Collector interface. 184type collector struct { 185 exp *Exporter 186} 187 188var _ prometheus.Collector = (*collector)(nil) 189 190func (c *collector) Describe(ch chan<- *prometheus.Desc) { 191 c.exp.lock.RLock() 192 defer c.exp.lock.RUnlock() 193 194 _ = c.exp.Controller().ForEach(c.exp, func(record export.Record) error { 195 var labelKeys []string 196 mergeLabels(record, &labelKeys, nil) 197 ch <- c.toDesc(record, labelKeys) 198 return nil 199 }) 200} 201 202// Collect exports the last calculated CheckpointSet. 203// 204// Collect is invoked whenever prometheus.Gatherer is also invoked. 205// For example, when the HTTP endpoint is invoked by Prometheus. 206func (c *collector) Collect(ch chan<- prometheus.Metric) { 207 c.exp.lock.RLock() 208 defer c.exp.lock.RUnlock() 209 210 ctrl := c.exp.Controller() 211 if err := ctrl.Collect(context.Background()); err != nil { 212 otel.Handle(err) 213 } 214 215 err := ctrl.ForEach(c.exp, func(record export.Record) error { 216 agg := record.Aggregation() 217 numberKind := record.Descriptor().NumberKind() 218 instrumentKind := record.Descriptor().InstrumentKind() 219 220 var labelKeys, labels []string 221 mergeLabels(record, &labelKeys, &labels) 222 223 desc := c.toDesc(record, labelKeys) 224 225 if hist, ok := agg.(aggregation.Histogram); ok { 226 if err := c.exportHistogram(ch, hist, numberKind, desc, labels); err != nil { 227 return fmt.Errorf("exporting histogram: %w", err) 228 } 229 } else if dist, ok := agg.(aggregation.Distribution); ok { 230 // TODO: summaries values are never being resetted. 231 // As measurements are recorded, new records starts to have less impact on these summaries. 232 // We should implement an solution that is similar to the Prometheus Clients 233 // using a rolling window for summaries could be a solution. 234 // 235 // References: 236 // https://www.robustperception.io/how-does-a-prometheus-summary-work 237 // https://github.com/prometheus/client_golang/blob/fa4aa9000d2863904891d193dea354d23f3d712a/prometheus/summary.go#L135 238 if err := c.exportSummary(ch, dist, numberKind, desc, labels); err != nil { 239 return fmt.Errorf("exporting summary: %w", err) 240 } 241 } else if sum, ok := agg.(aggregation.Sum); ok && instrumentKind.Monotonic() { 242 if err := c.exportMonotonicCounter(ch, sum, numberKind, desc, labels); err != nil { 243 return fmt.Errorf("exporting monotonic counter: %w", err) 244 } 245 } else if sum, ok := agg.(aggregation.Sum); ok && !instrumentKind.Monotonic() { 246 if err := c.exportNonMonotonicCounter(ch, sum, numberKind, desc, labels); err != nil { 247 return fmt.Errorf("exporting non monotonic counter: %w", err) 248 } 249 } else if lastValue, ok := agg.(aggregation.LastValue); ok { 250 if err := c.exportLastValue(ch, lastValue, numberKind, desc, labels); err != nil { 251 return fmt.Errorf("exporting last value: %w", err) 252 } 253 } 254 return nil 255 }) 256 if err != nil { 257 otel.Handle(err) 258 } 259} 260 261func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregation.LastValue, kind number.Kind, desc *prometheus.Desc, labels []string) error { 262 lv, _, err := lvagg.LastValue() 263 if err != nil { 264 return fmt.Errorf("error retrieving last value: %w", err) 265 } 266 267 m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, lv.CoerceToFloat64(kind), labels...) 268 if err != nil { 269 return fmt.Errorf("error creating constant metric: %w", err) 270 } 271 272 ch <- m 273 return nil 274} 275 276func (c *collector) exportNonMonotonicCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind number.Kind, desc *prometheus.Desc, labels []string) error { 277 v, err := sum.Sum() 278 if err != nil { 279 return fmt.Errorf("error retrieving counter: %w", err) 280 } 281 282 m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, v.CoerceToFloat64(kind), labels...) 283 if err != nil { 284 return fmt.Errorf("error creating constant metric: %w", err) 285 } 286 287 ch <- m 288 return nil 289} 290 291func (c *collector) exportMonotonicCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind number.Kind, desc *prometheus.Desc, labels []string) error { 292 v, err := sum.Sum() 293 if err != nil { 294 return fmt.Errorf("error retrieving counter: %w", err) 295 } 296 297 m, err := prometheus.NewConstMetric(desc, prometheus.CounterValue, v.CoerceToFloat64(kind), labels...) 298 if err != nil { 299 return fmt.Errorf("error creating constant metric: %w", err) 300 } 301 302 ch <- m 303 return nil 304} 305 306func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregation.Distribution, kind number.Kind, desc *prometheus.Desc, labels []string) error { 307 count, err := dist.Count() 308 if err != nil { 309 return fmt.Errorf("error retrieving count: %w", err) 310 } 311 312 var sum number.Number 313 sum, err = dist.Sum() 314 if err != nil { 315 return fmt.Errorf("error retrieving distribution sum: %w", err) 316 } 317 318 quantiles := make(map[float64]float64) 319 for _, quantile := range c.exp.defaultSummaryQuantiles { 320 q, _ := dist.Quantile(quantile) 321 quantiles[quantile] = q.CoerceToFloat64(kind) 322 } 323 324 m, err := prometheus.NewConstSummary(desc, uint64(count), sum.CoerceToFloat64(kind), quantiles, labels...) 325 if err != nil { 326 return fmt.Errorf("error creating constant summary: %w", err) 327 } 328 329 ch <- m 330 return nil 331} 332 333func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregation.Histogram, kind number.Kind, desc *prometheus.Desc, labels []string) error { 334 buckets, err := hist.Histogram() 335 if err != nil { 336 return fmt.Errorf("error retrieving histogram: %w", err) 337 } 338 sum, err := hist.Sum() 339 if err != nil { 340 return fmt.Errorf("error retrieving sum: %w", err) 341 } 342 343 var totalCount uint64 344 // counts maps from the bucket upper-bound to the cumulative count. 345 // The bucket with upper-bound +inf is not included. 346 counts := make(map[float64]uint64, len(buckets.Boundaries)) 347 for i := range buckets.Boundaries { 348 boundary := buckets.Boundaries[i] 349 totalCount += uint64(buckets.Counts[i]) 350 counts[boundary] = totalCount 351 } 352 // Include the +inf bucket in the total count. 353 totalCount += uint64(buckets.Counts[len(buckets.Counts)-1]) 354 355 m, err := prometheus.NewConstHistogram(desc, totalCount, sum.CoerceToFloat64(kind), counts, labels...) 356 if err != nil { 357 return fmt.Errorf("error creating constant histogram: %w", err) 358 } 359 360 ch <- m 361 return nil 362} 363 364func (c *collector) toDesc(record export.Record, labelKeys []string) *prometheus.Desc { 365 desc := record.Descriptor() 366 return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labelKeys, nil) 367} 368 369// mergeLabels merges the export.Record's labels and resources into a 370// single set, giving precedence to the record's labels in case of 371// duplicate keys. This outputs one or both of the keys and the 372// values as a slice, and either argument may be nil to avoid 373// allocating an unnecessary slice. 374func mergeLabels(record export.Record, keys, values *[]string) { 375 if keys != nil { 376 *keys = make([]string, 0, record.Labels().Len()+record.Resource().Len()) 377 } 378 if values != nil { 379 *values = make([]string, 0, record.Labels().Len()+record.Resource().Len()) 380 } 381 382 // Duplicate keys are resolved by taking the record label value over 383 // the resource value. 384 mi := label.NewMergeIterator(record.Labels(), record.Resource().LabelSet()) 385 for mi.Next() { 386 label := mi.Label() 387 if keys != nil { 388 *keys = append(*keys, sanitize(string(label.Key))) 389 } 390 if values != nil { 391 *values = append(*values, label.Value.Emit()) 392 } 393 } 394} 395