1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package service handles the command-line, configuration, and runs the
16// OpenTelemetry Collector.
17package service
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"os"
25	"os/signal"
26	"runtime"
27	"syscall"
28
29	"github.com/spf13/cobra"
30	"go.opentelemetry.io/contrib/zpages"
31	"go.opentelemetry.io/otel"
32	sdktrace "go.opentelemetry.io/otel/sdk/trace"
33	"go.uber.org/zap"
34
35	"go.opentelemetry.io/collector/component"
36	"go.opentelemetry.io/collector/config/configcheck"
37	"go.opentelemetry.io/collector/config/configloader"
38	"go.opentelemetry.io/collector/config/configtelemetry"
39	"go.opentelemetry.io/collector/config/experimental/configsource"
40	"go.opentelemetry.io/collector/consumer/consumererror"
41	"go.opentelemetry.io/collector/extension/ballastextension"
42	"go.opentelemetry.io/collector/internal/collector/telemetry"
43	"go.opentelemetry.io/collector/service/internal"
44	"go.opentelemetry.io/collector/service/internal/builder"
45	"go.opentelemetry.io/collector/service/parserprovider"
46)
47
// State enumerates the lifecycle phases a Collector publishes on its state
// channel, listed in the order in which they are reported.
type State int

// Lifecycle states. Their values are consecutive starting at zero and Closed
// is the largest one, which is why the state channel can be sized Closed+1.
const (
	// Starting is reported when execution begins, before the configuration is loaded.
	Starting State = 0
	// Running is reported once everything is set up and the collector waits for a shutdown event.
	Running State = 1
	// Closing is reported after a shutdown event has been received.
	Closing State = 2
	// Closed is reported after the shutdown sequence has finished.
	Closed State = 3
)
57
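// (Internal note) Collector lifecycle:
// - New constructs a new Collector.
// - Run executes the root command, which sets up logging and tracing and then
//   calls (*Collector).execute.
// - execute calls setupConfigurationComponents to load the configuration and
//   start the service. If the config provider is watchable, a goroutine waits
//   for updates and reloads the service when one arrives; a failed reload is
//   reported as an asynchronous error, which shuts the collector down. If the
//   watch session is closed (the collector is shutting down), nothing is done.
// - execute then runs runAndWaitForShutdownEvent, which waits for a shutdown event.
//   SIGINT and SIGTERM (only when graceful shutdown is enabled), asynchronous
//   errors, and (*Collector).Shutdown can trigger the shutdown event.
// - Upon shutdown, the config provider is closed (when closeable), then the
//   service is shut down (pipelines are notified, then pipelines and extensions
//   are shut down), and finally the collector's own telemetry.
// - Users can call (*Collector).Shutdown to shut down a running collector; a
//   call made before the collector is running is not honored.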
68
// Collector represents a server providing the OpenTelemetry Collector service.
// It is created by New and driven by Run; its lifecycle transitions are
// published on the channel returned by GetStateChannel.
type Collector struct {
	// info is the build information from CollectorSettings; it supplies the
	// command name and version.
	info    component.BuildInfo
	// rootCmd is the cobra command that Run executes.
	rootCmd *cobra.Command
	// logger is created when rootCmd runs; it is nil before Run.
	logger  *zap.Logger

	// tracerProvider and zPagesSpanProcessor are created when rootCmd runs.
	// The processor is registered with the provider, which is also installed
	// as the global otel TracerProvider; both are handed to the service.
	tracerProvider      *sdktrace.TracerProvider
	zPagesSpanProcessor *zpages.SpanProcessor

	// service is the currently running service. It is set only after a
	// successful start and is reset to nil while a reload is in progress.
	service      *service
	// stateChannel carries lifecycle State values. It is buffered with one
	// slot per State and is closed after Closed has been sent.
	stateChannel chan State

	// factories are the component factories from CollectorSettings.
	factories component.Factories

	// parserProvider supplies the configuration; parserprovider.Default() is
	// used when CollectorSettings does not provide one.
	parserProvider parserprovider.ParserProvider

	// shutdownChan is used to terminate the collector. It is created by
	// runAndWaitForShutdownEvent and closed by Shutdown.
	shutdownChan chan struct{}

	// signalsChannel is used to receive termination signals from the OS.
	// SIGINT and SIGTERM are only relayed to it when graceful shutdown is
	// allowed.
	signalsChannel chan os.Signal

	// allowGracefulShutodwn is the negation of
	// CollectorSettings.DisableGracefulShutdown.
	// NOTE(review): the field name misspells "Shutdown"; renaming it requires
	// updating every use in the package.
	allowGracefulShutodwn bool

	// asyncErrorChannel is used to signal a fatal error from any component.
	// It is created (unbuffered) by execute; a value received on it triggers
	// shutdown.
	asyncErrorChannel chan error
}
96
97// New creates and returns a new instance of Collector.
98func New(set CollectorSettings) (*Collector, error) {
99	if err := configcheck.ValidateConfigFromFactories(set.Factories); err != nil {
100		return nil, err
101	}
102
103	col := &Collector{
104		info:         set.BuildInfo,
105		factories:    set.Factories,
106		stateChannel: make(chan State, Closed+1),
107		// We use a negative in the settings not to break the existing
108		// behavior. Internally, allowGracefulShutodwn is more readable.
109		allowGracefulShutodwn: !set.DisableGracefulShutdown,
110	}
111
112	rootCmd := &cobra.Command{
113		Use:     set.BuildInfo.Command,
114		Version: set.BuildInfo.Version,
115		RunE: func(cmd *cobra.Command, args []string) error {
116			var err error
117			if col.logger, err = newLogger(set.LoggingOptions); err != nil {
118				return fmt.Errorf("failed to get logger: %w", err)
119			}
120
121			col.zPagesSpanProcessor = zpages.NewSpanProcessor()
122			col.tracerProvider = sdktrace.NewTracerProvider(
123				sdktrace.WithSampler(internal.AlwaysRecord()),
124				sdktrace.WithSpanProcessor(col.zPagesSpanProcessor))
125
126			// Set the constructed tracer provider as Global, in case any component uses the
127			// global TracerProvider.
128			otel.SetTracerProvider(col.tracerProvider)
129
130			return col.execute(cmd.Context())
131		},
132	}
133
134	// TODO: coalesce this code and expose this information to other components.
135	flagSet := new(flag.FlagSet)
136	addFlagsFns := []func(*flag.FlagSet){
137		configtelemetry.Flags,
138		parserprovider.Flags,
139		telemetry.Flags,
140		builder.Flags,
141		loggerFlags,
142	}
143	for _, addFlags := range addFlagsFns {
144		addFlags(flagSet)
145	}
146	rootCmd.Flags().AddGoFlagSet(flagSet)
147	col.rootCmd = rootCmd
148
149	parserProvider := set.ParserProvider
150	if parserProvider == nil {
151		// use default provider.
152		parserProvider = parserprovider.Default()
153	}
154	col.parserProvider = parserProvider
155
156	return col, nil
157}
158
159// Run starts the collector according to the command and configuration
160// given by the user, and waits for it to complete.
161// Consecutive calls to Run are not allowed, Run shouldn't be called
162// once a collector is shut down.
163func (col *Collector) Run() error {
164	// From this point on do not show usage in case of error.
165	col.rootCmd.SilenceUsage = true
166
167	return col.rootCmd.Execute()
168}
169
170// GetStateChannel returns state channel of the collector server.
171func (col *Collector) GetStateChannel() chan State {
172	return col.stateChannel
173}
174
175// Command returns Collector's root command.
176func (col *Collector) Command() *cobra.Command {
177	return col.rootCmd
178}
179
180// GetLogger returns logger used by the Collector.
181// The logger is initialized after collector server start.
182func (col *Collector) GetLogger() *zap.Logger {
183	return col.logger
184}
185
186// Shutdown shuts down the collector server.
187func (col *Collector) Shutdown() {
188	// TODO: Implement a proper shutdown with graceful draining of the pipeline.
189	// See https://github.com/open-telemetry/opentelemetry-collector/issues/483.
190	defer func() {
191		if r := recover(); r != nil {
192			col.logger.Info("shutdownChan already closed")
193		}
194	}()
195	close(col.shutdownChan)
196}
197
198func (col *Collector) setupTelemetry(ballastSizeBytes uint64) error {
199	col.logger.Info("Setting up own telemetry...")
200
201	err := collectorTelemetry.init(col.asyncErrorChannel, ballastSizeBytes, col.logger)
202	if err != nil {
203		return fmt.Errorf("failed to initialize telemetry: %w", err)
204	}
205
206	return nil
207}
208
209// runAndWaitForShutdownEvent waits for one of the shutdown events that can happen.
210func (col *Collector) runAndWaitForShutdownEvent() {
211	col.logger.Info("Everything is ready. Begin running and processing data.")
212
213	col.signalsChannel = make(chan os.Signal, 1)
214	// Only notify with SIGTERM and SIGINT if graceful shutdown is enabled.
215	if col.allowGracefulShutodwn {
216		signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM)
217	}
218
219	col.shutdownChan = make(chan struct{})
220	col.stateChannel <- Running
221	select {
222	case err := <-col.asyncErrorChannel:
223		col.logger.Error("Asynchronous error received, terminating process", zap.Error(err))
224	case s := <-col.signalsChannel:
225		col.logger.Info("Received signal from OS", zap.String("signal", s.String()))
226	case <-col.shutdownChan:
227		col.logger.Info("Received shutdown request")
228	}
229	col.stateChannel <- Closing
230}
231
232// setupConfigurationComponents loads the config and starts the components. If all the steps succeeds it
233// sets the col.service with the service currently running.
234func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
235	col.logger.Info("Loading configuration...")
236
237	cp, err := col.parserProvider.Get()
238	if err != nil {
239		return fmt.Errorf("cannot load configuration's parser: %w", err)
240	}
241
242	cfg, err := configloader.Load(cp, col.factories)
243	if err != nil {
244		return fmt.Errorf("cannot load configuration: %w", err)
245	}
246
247	if err = cfg.Validate(); err != nil {
248		return fmt.Errorf("invalid configuration: %w", err)
249	}
250
251	col.logger.Info("Applying configuration...")
252
253	service, err := newService(&svcSettings{
254		BuildInfo:           col.info,
255		Factories:           col.factories,
256		Config:              cfg,
257		Logger:              col.logger,
258		TracerProvider:      col.tracerProvider,
259		ZPagesSpanProcessor: col.zPagesSpanProcessor,
260		AsyncErrorChannel:   col.asyncErrorChannel,
261	})
262	if err != nil {
263		return err
264	}
265
266	err = service.Start(ctx)
267	if err != nil {
268		return err
269	}
270
271	col.service = service
272
273	// If provider is watchable start a goroutine watching for updates.
274	if watchable, ok := col.parserProvider.(parserprovider.Watchable); ok {
275		go func() {
276			err := watchable.WatchForUpdate()
277			switch {
278			// TODO: Move configsource.ErrSessionClosed to providerparser package to avoid depending on configsource.
279			case errors.Is(err, configsource.ErrSessionClosed):
280				// This is the case of shutdown of the whole collector server, nothing to do.
281				col.logger.Info("Config WatchForUpdate closed", zap.Error(err))
282				return
283			default:
284				col.logger.Warn("Config WatchForUpdated exited", zap.Error(err))
285				if err := col.reloadService(context.Background()); err != nil {
286					col.asyncErrorChannel <- err
287				}
288			}
289		}()
290	}
291
292	return nil
293}
294
295func (col *Collector) execute(ctx context.Context) error {
296	col.logger.Info("Starting "+col.info.Command+"...",
297		zap.String("Version", col.info.Version),
298		zap.Int("NumCPU", runtime.NumCPU()),
299	)
300	col.stateChannel <- Starting
301
302	//  Add `mem-ballast-size-mib` warning message if it is still enabled
303	//  TODO: will remove all `mem-ballast-size-mib` footprints after some baking time.
304	if builder.MemBallastSize() > 0 {
305		col.logger.Warn("`mem-ballast-size-mib` command line option has been deprecated. Please use `ballast extension` instead!")
306	}
307
308	col.asyncErrorChannel = make(chan error)
309
310	err := col.setupConfigurationComponents(ctx)
311	if err != nil {
312		return err
313	}
314
315	// Get ballastSizeBytes if ballast extension is enabled
316	ballastSizeBytes := col.getBallastSize()
317
318	// Setup Telemetry.
319	err = col.setupTelemetry(ballastSizeBytes)
320	if err != nil {
321		return err
322	}
323
324	// Everything is ready, now run until an event requiring shutdown happens.
325	col.runAndWaitForShutdownEvent()
326
327	// Accumulate errors and proceed with shutting down remaining components.
328	var errs []error
329
330	// Begin shutdown sequence.
331	col.logger.Info("Starting shutdown...")
332
333	if closable, ok := col.parserProvider.(parserprovider.Closeable); ok {
334		if err := closable.Close(ctx); err != nil {
335			errs = append(errs, fmt.Errorf("failed to close config: %w", err))
336		}
337	}
338
339	if col.service != nil {
340		if err := col.service.Shutdown(ctx); err != nil {
341			errs = append(errs, fmt.Errorf("failed to shutdown service: %w", err))
342		}
343	}
344
345	if err := collectorTelemetry.shutdown(); err != nil {
346		errs = append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err))
347	}
348
349	col.logger.Info("Shutdown complete.")
350	col.stateChannel <- Closed
351	close(col.stateChannel)
352
353	return consumererror.Combine(errs)
354}
355
356// reloadService shutdowns the current col.service and setups a new one according
357// to the latest configuration. It requires that col.parserProvider and col.factories
358// are properly populated to finish successfully.
359func (col *Collector) reloadService(ctx context.Context) error {
360	if closeable, ok := col.parserProvider.(parserprovider.Closeable); ok {
361		if err := closeable.Close(ctx); err != nil {
362			return fmt.Errorf("failed close current config provider: %w", err)
363		}
364	}
365
366	if col.service != nil {
367		retiringService := col.service
368		col.service = nil
369		if err := retiringService.Shutdown(ctx); err != nil {
370			return fmt.Errorf("failed to shutdown the retiring config: %w", err)
371		}
372	}
373
374	if err := col.setupConfigurationComponents(ctx); err != nil {
375		return fmt.Errorf("failed to setup configuration components: %w", err)
376	}
377
378	return nil
379}
380
381func (col *Collector) getBallastSize() uint64 {
382	var ballastSize uint64
383	extensions := col.service.GetExtensions()
384	for _, extension := range extensions {
385		if ext, ok := extension.(*ballastextension.MemoryBallast); ok {
386			ballastSize = ext.GetBallastSize()
387			break
388		}
389	}
390	return ballastSize
391}
392