1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package push // import "go.opentelemetry.io/otel/sdk/metric/controller/push" 16 17import ( 18 "context" 19 "sync" 20 "time" 21 22 "go.opentelemetry.io/otel/api/global" 23 "go.opentelemetry.io/otel/api/metric" 24 "go.opentelemetry.io/otel/api/metric/registry" 25 export "go.opentelemetry.io/otel/sdk/export/metric" 26 sdk "go.opentelemetry.io/otel/sdk/metric" 27 controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" 28) 29 30// DefaultPushPeriod is the default time interval between pushes. 31const DefaultPushPeriod = 10 * time.Second 32 33// Controller organizes a periodic push of metric data. 34type Controller struct { 35 lock sync.Mutex 36 accumulator *sdk.Accumulator 37 provider *registry.Provider 38 checkpointer export.Checkpointer 39 exporter export.Exporter 40 wg sync.WaitGroup 41 ch chan struct{} 42 period time.Duration 43 timeout time.Duration 44 clock controllerTime.Clock 45 ticker controllerTime.Ticker 46} 47 48// New constructs a Controller, an implementation of metric.Provider, 49// using the provided checkpointer, exporter, and options to configure 50// an SDK with periodic collection. 51func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Option) *Controller { 52 c := &Config{ 53 Period: DefaultPushPeriod, 54 } 55 for _, opt := range opts { 56 opt.Apply(c) 57 } 58 if c.Timeout == 0 { 59 c.Timeout = c.Period 60 } 61 62 impl := sdk.NewAccumulator( 63 checkpointer, 64 sdk.WithResource(c.Resource), 65 ) 66 return &Controller{ 67 provider: registry.NewProvider(impl), 68 accumulator: impl, 69 checkpointer: checkpointer, 70 exporter: exporter, 71 ch: make(chan struct{}), 72 period: c.Period, 73 timeout: c.Timeout, 74 clock: controllerTime.RealClock{}, 75 } 76} 77 78// SetClock supports setting a mock clock for testing. This must be 79// called before Start(). 80func (c *Controller) SetClock(clock controllerTime.Clock) { 81 c.lock.Lock() 82 defer c.lock.Unlock() 83 c.clock = clock 84} 85 86// Provider returns a metric.Provider instance for this controller. 87func (c *Controller) Provider() metric.Provider { 88 return c.provider 89} 90 91// Start begins a ticker that periodically collects and exports 92// metrics with the configured interval. 93func (c *Controller) Start() { 94 c.lock.Lock() 95 defer c.lock.Unlock() 96 97 if c.ticker != nil { 98 return 99 } 100 101 c.ticker = c.clock.Ticker(c.period) 102 c.wg.Add(1) 103 go c.run(c.ch) 104} 105 106// Stop waits for the background goroutine to return and then collects 107// and exports metrics one last time before returning. 108func (c *Controller) Stop() { 109 c.lock.Lock() 110 defer c.lock.Unlock() 111 112 if c.ch == nil { 113 return 114 } 115 116 close(c.ch) 117 c.ch = nil 118 c.wg.Wait() 119 c.ticker.Stop() 120 121 c.tick() 122} 123 124func (c *Controller) run(ch chan struct{}) { 125 for { 126 select { 127 case <-ch: 128 c.wg.Done() 129 return 130 case <-c.ticker.C(): 131 c.tick() 132 } 133 } 134} 135 136func (c *Controller) tick() { 137 ctx, cancel := context.WithTimeout(context.Background(), c.timeout) 138 defer cancel() 139 140 ckpt := c.checkpointer.CheckpointSet() 141 ckpt.Lock() 142 defer ckpt.Unlock() 143 144 c.checkpointer.StartCollection() 145 c.accumulator.Collect(ctx) 146 if err := c.checkpointer.FinishCollection(); err != nil { 147 global.Handle(err) 148 } 149 150 if err := c.exporter.Export(ctx, ckpt); err != nil { 151 global.Handle(err) 152 } 153} 154