1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package service handles the command-line, configuration, and runs the 16// OpenTelemetry Collector. 17package service 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "os" 25 "os/signal" 26 "runtime" 27 "syscall" 28 29 "github.com/spf13/cobra" 30 "go.opentelemetry.io/contrib/zpages" 31 "go.opentelemetry.io/otel" 32 sdktrace "go.opentelemetry.io/otel/sdk/trace" 33 "go.uber.org/zap" 34 35 "go.opentelemetry.io/collector/component" 36 "go.opentelemetry.io/collector/config/configcheck" 37 "go.opentelemetry.io/collector/config/configloader" 38 "go.opentelemetry.io/collector/config/configtelemetry" 39 "go.opentelemetry.io/collector/config/experimental/configsource" 40 "go.opentelemetry.io/collector/consumer/consumererror" 41 "go.opentelemetry.io/collector/extension/ballastextension" 42 "go.opentelemetry.io/collector/internal/collector/telemetry" 43 "go.opentelemetry.io/collector/service/internal" 44 "go.opentelemetry.io/collector/service/internal/builder" 45 "go.opentelemetry.io/collector/service/parserprovider" 46) 47 48// State defines Collector's state. 49type State int 50 51const ( 52 Starting State = iota 53 Running 54 Closing 55 Closed 56) 57 58// (Internal note) Collector Lifecycle: 59// - New constructs a new Collector. 60// - Run starts the collector and calls (*Collector).execute. 61// - execute calls setupConfigurationComponents to handle configuration. 62// If configuration parser fails, collector's config can be reloaded. 63// Collector can be shutdown if parser gets a shutdown error. 64// - execute runs runAndWaitForShutdownEvent and waits for a shutdown event. 65// SIGINT and SIGTERM, errors, and (*Collector).Shutdown can trigger the shutdown events. 66// - Upon shutdown, pipelines are notified, then pipelines and extensions are shut down. 67// - Users can call (*Collector).Shutdown anytime to shutdown the collector. 68 69// Collector represents a server providing the OpenTelemetry Collector service. 70type Collector struct { 71 info component.BuildInfo 72 rootCmd *cobra.Command 73 logger *zap.Logger 74 75 tracerProvider *sdktrace.TracerProvider 76 zPagesSpanProcessor *zpages.SpanProcessor 77 78 service *service 79 stateChannel chan State 80 81 factories component.Factories 82 83 parserProvider parserprovider.ParserProvider 84 85 // shutdownChan is used to terminate the collector. 86 shutdownChan chan struct{} 87 88 // signalsChannel is used to receive termination signals from the OS. 89 signalsChannel chan os.Signal 90 91 allowGracefulShutodwn bool 92 93 // asyncErrorChannel is used to signal a fatal error from any component. 94 asyncErrorChannel chan error 95} 96 97// New creates and returns a new instance of Collector. 98func New(set CollectorSettings) (*Collector, error) { 99 if err := configcheck.ValidateConfigFromFactories(set.Factories); err != nil { 100 return nil, err 101 } 102 103 col := &Collector{ 104 info: set.BuildInfo, 105 factories: set.Factories, 106 stateChannel: make(chan State, Closed+1), 107 // We use a negative in the settings not to break the existing 108 // behavior. Internally, allowGracefulShutodwn is more readable. 109 allowGracefulShutodwn: !set.DisableGracefulShutdown, 110 } 111 112 rootCmd := &cobra.Command{ 113 Use: set.BuildInfo.Command, 114 Version: set.BuildInfo.Version, 115 RunE: func(cmd *cobra.Command, args []string) error { 116 var err error 117 if col.logger, err = newLogger(set.LoggingOptions); err != nil { 118 return fmt.Errorf("failed to get logger: %w", err) 119 } 120 121 col.zPagesSpanProcessor = zpages.NewSpanProcessor() 122 col.tracerProvider = sdktrace.NewTracerProvider( 123 sdktrace.WithSampler(internal.AlwaysRecord()), 124 sdktrace.WithSpanProcessor(col.zPagesSpanProcessor)) 125 126 // Set the constructed tracer provider as Global, in case any component uses the 127 // global TracerProvider. 128 otel.SetTracerProvider(col.tracerProvider) 129 130 return col.execute(cmd.Context()) 131 }, 132 } 133 134 // TODO: coalesce this code and expose this information to other components. 135 flagSet := new(flag.FlagSet) 136 addFlagsFns := []func(*flag.FlagSet){ 137 configtelemetry.Flags, 138 parserprovider.Flags, 139 telemetry.Flags, 140 builder.Flags, 141 loggerFlags, 142 } 143 for _, addFlags := range addFlagsFns { 144 addFlags(flagSet) 145 } 146 rootCmd.Flags().AddGoFlagSet(flagSet) 147 col.rootCmd = rootCmd 148 149 parserProvider := set.ParserProvider 150 if parserProvider == nil { 151 // use default provider. 152 parserProvider = parserprovider.Default() 153 } 154 col.parserProvider = parserProvider 155 156 return col, nil 157} 158 159// Run starts the collector according to the command and configuration 160// given by the user, and waits for it to complete. 161// Consecutive calls to Run are not allowed, Run shouldn't be called 162// once a collector is shut down. 163func (col *Collector) Run() error { 164 // From this point on do not show usage in case of error. 165 col.rootCmd.SilenceUsage = true 166 167 return col.rootCmd.Execute() 168} 169 170// GetStateChannel returns state channel of the collector server. 171func (col *Collector) GetStateChannel() chan State { 172 return col.stateChannel 173} 174 175// Command returns Collector's root command. 176func (col *Collector) Command() *cobra.Command { 177 return col.rootCmd 178} 179 180// GetLogger returns logger used by the Collector. 181// The logger is initialized after collector server start. 182func (col *Collector) GetLogger() *zap.Logger { 183 return col.logger 184} 185 186// Shutdown shuts down the collector server. 187func (col *Collector) Shutdown() { 188 // TODO: Implement a proper shutdown with graceful draining of the pipeline. 189 // See https://github.com/open-telemetry/opentelemetry-collector/issues/483. 190 defer func() { 191 if r := recover(); r != nil { 192 col.logger.Info("shutdownChan already closed") 193 } 194 }() 195 close(col.shutdownChan) 196} 197 198func (col *Collector) setupTelemetry(ballastSizeBytes uint64) error { 199 col.logger.Info("Setting up own telemetry...") 200 201 err := collectorTelemetry.init(col.asyncErrorChannel, ballastSizeBytes, col.logger) 202 if err != nil { 203 return fmt.Errorf("failed to initialize telemetry: %w", err) 204 } 205 206 return nil 207} 208 209// runAndWaitForShutdownEvent waits for one of the shutdown events that can happen. 210func (col *Collector) runAndWaitForShutdownEvent() { 211 col.logger.Info("Everything is ready. Begin running and processing data.") 212 213 col.signalsChannel = make(chan os.Signal, 1) 214 // Only notify with SIGTERM and SIGINT if graceful shutdown is enabled. 215 if col.allowGracefulShutodwn { 216 signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) 217 } 218 219 col.shutdownChan = make(chan struct{}) 220 col.stateChannel <- Running 221 select { 222 case err := <-col.asyncErrorChannel: 223 col.logger.Error("Asynchronous error received, terminating process", zap.Error(err)) 224 case s := <-col.signalsChannel: 225 col.logger.Info("Received signal from OS", zap.String("signal", s.String())) 226 case <-col.shutdownChan: 227 col.logger.Info("Received shutdown request") 228 } 229 col.stateChannel <- Closing 230} 231 232// setupConfigurationComponents loads the config and starts the components. If all the steps succeeds it 233// sets the col.service with the service currently running. 234func (col *Collector) setupConfigurationComponents(ctx context.Context) error { 235 col.logger.Info("Loading configuration...") 236 237 cp, err := col.parserProvider.Get() 238 if err != nil { 239 return fmt.Errorf("cannot load configuration's parser: %w", err) 240 } 241 242 cfg, err := configloader.Load(cp, col.factories) 243 if err != nil { 244 return fmt.Errorf("cannot load configuration: %w", err) 245 } 246 247 if err = cfg.Validate(); err != nil { 248 return fmt.Errorf("invalid configuration: %w", err) 249 } 250 251 col.logger.Info("Applying configuration...") 252 253 service, err := newService(&svcSettings{ 254 BuildInfo: col.info, 255 Factories: col.factories, 256 Config: cfg, 257 Logger: col.logger, 258 TracerProvider: col.tracerProvider, 259 ZPagesSpanProcessor: col.zPagesSpanProcessor, 260 AsyncErrorChannel: col.asyncErrorChannel, 261 }) 262 if err != nil { 263 return err 264 } 265 266 err = service.Start(ctx) 267 if err != nil { 268 return err 269 } 270 271 col.service = service 272 273 // If provider is watchable start a goroutine watching for updates. 274 if watchable, ok := col.parserProvider.(parserprovider.Watchable); ok { 275 go func() { 276 err := watchable.WatchForUpdate() 277 switch { 278 // TODO: Move configsource.ErrSessionClosed to providerparser package to avoid depending on configsource. 279 case errors.Is(err, configsource.ErrSessionClosed): 280 // This is the case of shutdown of the whole collector server, nothing to do. 281 col.logger.Info("Config WatchForUpdate closed", zap.Error(err)) 282 return 283 default: 284 col.logger.Warn("Config WatchForUpdated exited", zap.Error(err)) 285 if err := col.reloadService(context.Background()); err != nil { 286 col.asyncErrorChannel <- err 287 } 288 } 289 }() 290 } 291 292 return nil 293} 294 295func (col *Collector) execute(ctx context.Context) error { 296 col.logger.Info("Starting "+col.info.Command+"...", 297 zap.String("Version", col.info.Version), 298 zap.Int("NumCPU", runtime.NumCPU()), 299 ) 300 col.stateChannel <- Starting 301 302 // Add `mem-ballast-size-mib` warning message if it is still enabled 303 // TODO: will remove all `mem-ballast-size-mib` footprints after some baking time. 304 if builder.MemBallastSize() > 0 { 305 col.logger.Warn("`mem-ballast-size-mib` command line option has been deprecated. Please use `ballast extension` instead!") 306 } 307 308 col.asyncErrorChannel = make(chan error) 309 310 err := col.setupConfigurationComponents(ctx) 311 if err != nil { 312 return err 313 } 314 315 // Get ballastSizeBytes if ballast extension is enabled 316 ballastSizeBytes := col.getBallastSize() 317 318 // Setup Telemetry. 319 err = col.setupTelemetry(ballastSizeBytes) 320 if err != nil { 321 return err 322 } 323 324 // Everything is ready, now run until an event requiring shutdown happens. 325 col.runAndWaitForShutdownEvent() 326 327 // Accumulate errors and proceed with shutting down remaining components. 328 var errs []error 329 330 // Begin shutdown sequence. 331 col.logger.Info("Starting shutdown...") 332 333 if closable, ok := col.parserProvider.(parserprovider.Closeable); ok { 334 if err := closable.Close(ctx); err != nil { 335 errs = append(errs, fmt.Errorf("failed to close config: %w", err)) 336 } 337 } 338 339 if col.service != nil { 340 if err := col.service.Shutdown(ctx); err != nil { 341 errs = append(errs, fmt.Errorf("failed to shutdown service: %w", err)) 342 } 343 } 344 345 if err := collectorTelemetry.shutdown(); err != nil { 346 errs = append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err)) 347 } 348 349 col.logger.Info("Shutdown complete.") 350 col.stateChannel <- Closed 351 close(col.stateChannel) 352 353 return consumererror.Combine(errs) 354} 355 356// reloadService shutdowns the current col.service and setups a new one according 357// to the latest configuration. It requires that col.parserProvider and col.factories 358// are properly populated to finish successfully. 359func (col *Collector) reloadService(ctx context.Context) error { 360 if closeable, ok := col.parserProvider.(parserprovider.Closeable); ok { 361 if err := closeable.Close(ctx); err != nil { 362 return fmt.Errorf("failed close current config provider: %w", err) 363 } 364 } 365 366 if col.service != nil { 367 retiringService := col.service 368 col.service = nil 369 if err := retiringService.Shutdown(ctx); err != nil { 370 return fmt.Errorf("failed to shutdown the retiring config: %w", err) 371 } 372 } 373 374 if err := col.setupConfigurationComponents(ctx); err != nil { 375 return fmt.Errorf("failed to setup configuration components: %w", err) 376 } 377 378 return nil 379} 380 381func (col *Collector) getBallastSize() uint64 { 382 var ballastSize uint64 383 extensions := col.service.GetExtensions() 384 for _, extension := range extensions { 385 if ext, ok := extension.(*ballastextension.MemoryBallast); ok { 386 ballastSize = ext.GetBallastSize() 387 break 388 } 389 } 390 return ballastSize 391} 392