1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package push // import "go.opentelemetry.io/otel/sdk/metric/controller/push"
16
17import (
18	"context"
19	"sync"
20	"time"
21
22	"go.opentelemetry.io/otel/api/global"
23	"go.opentelemetry.io/otel/api/metric"
24	"go.opentelemetry.io/otel/api/metric/registry"
25	export "go.opentelemetry.io/otel/sdk/export/metric"
26	sdk "go.opentelemetry.io/otel/sdk/metric"
27	controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
28)
29
// DefaultPushPeriod is the default time interval between pushes. New
// seeds its Config with this period before applying any options.
const DefaultPushPeriod = 10 * time.Second
32
// Controller organizes a periodic push of metric data: once started, a
// background goroutine collects from the accumulator into the
// checkpointer and hands the resulting checkpoint set to the exporter
// each time the ticker fires.
type Controller struct {
	lock         sync.Mutex            // held by SetClock, Start and Stop
	accumulator  *sdk.Accumulator      // collected from on every tick
	provider     *registry.Provider    // returned by Provider()
	checkpointer export.Checkpointer   // supplies the checkpoint set that is exported
	exporter     export.Exporter       // receives the checkpoint set on every tick
	wg           sync.WaitGroup        // tracks the goroutine launched by Start
	ch           chan struct{}         // closed by Stop to end the goroutine; nil once stopped
	period       time.Duration         // interval passed to the clock's Ticker
	timeout      time.Duration         // deadline applied to each collect-and-export cycle
	clock        controllerTime.Clock  // creates the ticker; replaceable via SetClock
	ticker       controllerTime.Ticker // created by Start; nil until then
}
47
48// New constructs a Controller, an implementation of metric.Provider,
49// using the provided checkpointer, exporter, and options to configure
50// an SDK with periodic collection.
51func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Option) *Controller {
52	c := &Config{
53		Period: DefaultPushPeriod,
54	}
55	for _, opt := range opts {
56		opt.Apply(c)
57	}
58	if c.Timeout == 0 {
59		c.Timeout = c.Period
60	}
61
62	impl := sdk.NewAccumulator(
63		checkpointer,
64		sdk.WithResource(c.Resource),
65	)
66	return &Controller{
67		provider:     registry.NewProvider(impl),
68		accumulator:  impl,
69		checkpointer: checkpointer,
70		exporter:     exporter,
71		ch:           make(chan struct{}),
72		period:       c.Period,
73		timeout:      c.Timeout,
74		clock:        controllerTime.RealClock{},
75	}
76}
77
78// SetClock supports setting a mock clock for testing.  This must be
79// called before Start().
80func (c *Controller) SetClock(clock controllerTime.Clock) {
81	c.lock.Lock()
82	defer c.lock.Unlock()
83	c.clock = clock
84}
85
// Provider returns a metric.Provider instance for this controller. It
// is the registry provider that New created around the controller's
// accumulator.
func (c *Controller) Provider() metric.Provider {
	return c.provider
}
90
91// Start begins a ticker that periodically collects and exports
92// metrics with the configured interval.
93func (c *Controller) Start() {
94	c.lock.Lock()
95	defer c.lock.Unlock()
96
97	if c.ticker != nil {
98		return
99	}
100
101	c.ticker = c.clock.Ticker(c.period)
102	c.wg.Add(1)
103	go c.run(c.ch)
104}
105
106// Stop waits for the background goroutine to return and then collects
107// and exports metrics one last time before returning.
108func (c *Controller) Stop() {
109	c.lock.Lock()
110	defer c.lock.Unlock()
111
112	if c.ch == nil {
113		return
114	}
115
116	close(c.ch)
117	c.ch = nil
118	c.wg.Wait()
119	c.ticker.Stop()
120
121	c.tick()
122}
123
124func (c *Controller) run(ch chan struct{}) {
125	for {
126		select {
127		case <-ch:
128			c.wg.Done()
129			return
130		case <-c.ticker.C():
131			c.tick()
132		}
133	}
134}
135
136func (c *Controller) tick() {
137	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
138	defer cancel()
139
140	ckpt := c.checkpointer.CheckpointSet()
141	ckpt.Lock()
142	defer ckpt.Unlock()
143
144	c.checkpointer.StartCollection()
145	c.accumulator.Collect(ctx)
146	if err := c.checkpointer.FinishCollection(); err != nil {
147		global.Handle(err)
148	}
149
150	if err := c.exporter.Export(ctx, ckpt); err != nil {
151		global.Handle(err)
152	}
153}
154