1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package configloader 16 17import ( 18 "errors" 19 "fmt" 20 "os" 21 "reflect" 22 23 "github.com/spf13/cast" 24 "github.com/spf13/viper" 25 26 "go.opentelemetry.io/collector/component" 27 "go.opentelemetry.io/collector/config" 28 "go.opentelemetry.io/collector/config/configparser" 29) 30 31// These are errors that can be returned by Load(). Note that error codes are not part 32// of Load()'s public API, they are for internal unit testing only. 33type configErrorCode int 34 35const ( 36 // Skip 0, start errors codes from 1. 37 _ configErrorCode = iota 38 39 errInvalidTypeAndNameKey 40 errUnknownType 41 errDuplicateName 42 errUnmarshalTopLevelStructureError 43) 44 45type configError struct { 46 // Human readable error message. 47 msg string 48 49 // Internal error code. 50 code configErrorCode 51} 52 53func (e *configError) Error() string { 54 return e.msg 55} 56 57// YAML top-level configuration keys. 58const ( 59 // extensionsKeyName is the configuration key name for extensions section. 60 extensionsKeyName = "extensions" 61 62 // receiversKeyName is the configuration key name for receivers section. 63 receiversKeyName = "receivers" 64 65 // exportersKeyName is the configuration key name for exporters section. 66 exportersKeyName = "exporters" 67 68 // processorsKeyName is the configuration key name for processors section. 69 processorsKeyName = "processors" 70 71 // pipelinesKeyName is the configuration key name for pipelines section. 72 pipelinesKeyName = "pipelines" 73) 74 75type configSettings struct { 76 Receivers map[string]map[string]interface{} `mapstructure:"receivers"` 77 Processors map[string]map[string]interface{} `mapstructure:"processors"` 78 Exporters map[string]map[string]interface{} `mapstructure:"exporters"` 79 Extensions map[string]map[string]interface{} `mapstructure:"extensions"` 80 Service serviceSettings `mapstructure:"service"` 81} 82 83type serviceSettings struct { 84 Extensions []string `mapstructure:"extensions"` 85 Pipelines map[string]pipelineSettings `mapstructure:"pipelines"` 86} 87 88type pipelineSettings struct { 89 Receivers []string `mapstructure:"receivers"` 90 Processors []string `mapstructure:"processors"` 91 Exporters []string `mapstructure:"exporters"` 92} 93 94// Load loads a Config from Parser. 95// After loading the config, `Validate()` must be called to validate. 96func Load(v *configparser.Parser, factories component.Factories) (*config.Config, error) { 97 var cfg config.Config 98 99 // Load the config. 100 101 // Struct to validate top level sections. 102 var rawCfg configSettings 103 if err := v.UnmarshalExact(&rawCfg); err != nil { 104 return nil, &configError{ 105 code: errUnmarshalTopLevelStructureError, 106 msg: fmt.Sprintf("error reading top level configuration sections: %s", err.Error()), 107 } 108 } 109 110 // In the following section use v.GetStringMap(xyzKeyName) instead of rawCfg.Xyz, because 111 // UnmarshalExact will not unmarshal entries in the map[string]interface{} with nil values. 112 // GetStringMap does the correct thing. 113 114 // Start with the service extensions. 115 116 extensions, err := loadExtensions(cast.ToStringMap(v.Get(extensionsKeyName)), factories.Extensions) 117 if err != nil { 118 return nil, err 119 } 120 cfg.Extensions = extensions 121 122 // Load data components (receivers, exporters, and processors). 123 124 receivers, err := loadReceivers(cast.ToStringMap(v.Get(receiversKeyName)), factories.Receivers) 125 if err != nil { 126 return nil, err 127 } 128 cfg.Receivers = receivers 129 130 exporters, err := loadExporters(cast.ToStringMap(v.Get(exportersKeyName)), factories.Exporters) 131 if err != nil { 132 return nil, err 133 } 134 cfg.Exporters = exporters 135 136 processors, err := loadProcessors(cast.ToStringMap(v.Get(processorsKeyName)), factories.Processors) 137 if err != nil { 138 return nil, err 139 } 140 cfg.Processors = processors 141 142 // Load the service and its data pipelines. 143 service, err := loadService(rawCfg.Service) 144 if err != nil { 145 return nil, err 146 } 147 cfg.Service = service 148 149 return &cfg, nil 150} 151 152func errorInvalidTypeAndNameKey(component, key string, err error) error { 153 return &configError{ 154 code: errInvalidTypeAndNameKey, 155 msg: fmt.Sprintf("invalid %s type and name key %q: %v", component, key, err), 156 } 157} 158 159func errorUnknownType(component string, id config.ComponentID) error { 160 return &configError{ 161 code: errUnknownType, 162 msg: fmt.Sprintf("unknown %s type %q for %v", component, id.Type(), id), 163 } 164} 165 166func errorUnmarshalError(component string, id config.ComponentID, err error) error { 167 return &configError{ 168 code: errUnmarshalTopLevelStructureError, 169 msg: fmt.Sprintf("error reading %s configuration for %v: %v", component, id, err), 170 } 171} 172 173func errorDuplicateName(component string, id config.ComponentID) error { 174 return &configError{ 175 code: errDuplicateName, 176 msg: fmt.Sprintf("duplicate %s name %v", component, id), 177 } 178} 179 180func loadExtensions(exts map[string]interface{}, factories map[config.Type]component.ExtensionFactory) (config.Extensions, error) { 181 // Prepare resulting map. 182 extensions := make(config.Extensions) 183 184 // Iterate over extensions and create a config for each. 185 for key, value := range exts { 186 componentConfig := configparser.NewParserFromStringMap(cast.ToStringMap(value)) 187 expandEnvConfig(componentConfig) 188 189 // Decode the key into type and fullName components. 190 id, err := config.NewIDFromString(key) 191 if err != nil { 192 return nil, errorInvalidTypeAndNameKey(extensionsKeyName, key, err) 193 } 194 195 // Find extension factory based on "type" that we read from config source. 196 factory := factories[id.Type()] 197 if factory == nil { 198 return nil, errorUnknownType(extensionsKeyName, id) 199 } 200 201 // Create the default config for this extension. 202 extensionCfg := factory.CreateDefaultConfig() 203 extensionCfg.SetIDName(id.Name()) 204 expandEnvLoadedConfig(extensionCfg) 205 206 // Now that the default config struct is created we can Unmarshal into it 207 // and it will apply user-defined config on top of the default. 208 unm := unmarshaler(factory) 209 if err := unm(componentConfig, extensionCfg); err != nil { 210 return nil, errorUnmarshalError(extensionsKeyName, id, err) 211 } 212 213 if extensions[id] != nil { 214 return nil, errorDuplicateName(extensionsKeyName, id) 215 } 216 217 extensions[id] = extensionCfg 218 } 219 220 return extensions, nil 221} 222 223func loadService(rawService serviceSettings) (config.Service, error) { 224 var ret config.Service 225 ret.Extensions = make([]config.ComponentID, 0, len(rawService.Extensions)) 226 for _, extIDStr := range rawService.Extensions { 227 id, err := config.NewIDFromString(extIDStr) 228 if err != nil { 229 return ret, err 230 } 231 ret.Extensions = append(ret.Extensions, id) 232 } 233 234 // Process the pipelines first so in case of error on them it can be properly 235 // reported. 236 pipelines, err := loadPipelines(rawService.Pipelines) 237 ret.Pipelines = pipelines 238 239 return ret, err 240} 241 242// LoadReceiver loads a receiver config from componentConfig using the provided factories. 243func LoadReceiver(componentConfig *configparser.Parser, id config.ComponentID, factory component.ReceiverFactory) (config.Receiver, error) { 244 // Create the default config for this receiver. 245 receiverCfg := factory.CreateDefaultConfig() 246 receiverCfg.SetIDName(id.Name()) 247 expandEnvLoadedConfig(receiverCfg) 248 249 // Now that the default config struct is created we can Unmarshal into it 250 // and it will apply user-defined config on top of the default. 251 unm := unmarshaler(factory) 252 if err := unm(componentConfig, receiverCfg); err != nil { 253 return nil, errorUnmarshalError(receiversKeyName, id, err) 254 } 255 256 return receiverCfg, nil 257} 258 259func loadReceivers(recvs map[string]interface{}, factories map[config.Type]component.ReceiverFactory) (config.Receivers, error) { 260 // Prepare resulting map. 261 receivers := make(config.Receivers) 262 263 // Iterate over input map and create a config for each. 264 for key, value := range recvs { 265 componentConfig := configparser.NewParserFromStringMap(cast.ToStringMap(value)) 266 expandEnvConfig(componentConfig) 267 268 // Decode the key into type and fullName components. 269 id, err := config.NewIDFromString(key) 270 if err != nil { 271 return nil, errorInvalidTypeAndNameKey(receiversKeyName, key, err) 272 } 273 274 // Find receiver factory based on "type" that we read from config source. 275 factory := factories[id.Type()] 276 if factory == nil { 277 return nil, errorUnknownType(receiversKeyName, id) 278 } 279 280 receiverCfg, err := LoadReceiver(componentConfig, id, factory) 281 282 if err != nil { 283 // LoadReceiver already wraps the error. 284 return nil, err 285 } 286 287 if receivers[id] != nil { 288 return nil, errorDuplicateName(receiversKeyName, id) 289 } 290 receivers[id] = receiverCfg 291 } 292 293 return receivers, nil 294} 295 296func loadExporters(exps map[string]interface{}, factories map[config.Type]component.ExporterFactory) (config.Exporters, error) { 297 // Prepare resulting map. 298 exporters := make(config.Exporters) 299 300 // Iterate over Exporters and create a config for each. 301 for key, value := range exps { 302 componentConfig := configparser.NewParserFromStringMap(cast.ToStringMap(value)) 303 expandEnvConfig(componentConfig) 304 305 // Decode the key into type and fullName components. 306 id, err := config.NewIDFromString(key) 307 if err != nil { 308 return nil, errorInvalidTypeAndNameKey(exportersKeyName, key, err) 309 } 310 311 // Find exporter factory based on "type" that we read from config source. 312 factory := factories[id.Type()] 313 if factory == nil { 314 return nil, errorUnknownType(exportersKeyName, id) 315 } 316 317 // Create the default config for this exporter. 318 exporterCfg := factory.CreateDefaultConfig() 319 exporterCfg.SetIDName(id.Name()) 320 expandEnvLoadedConfig(exporterCfg) 321 322 // Now that the default config struct is created we can Unmarshal into it 323 // and it will apply user-defined config on top of the default. 324 unm := unmarshaler(factory) 325 if err := unm(componentConfig, exporterCfg); err != nil { 326 return nil, errorUnmarshalError(exportersKeyName, id, err) 327 } 328 329 if exporters[id] != nil { 330 return nil, errorDuplicateName(exportersKeyName, id) 331 } 332 333 exporters[id] = exporterCfg 334 } 335 336 return exporters, nil 337} 338 339func loadProcessors(procs map[string]interface{}, factories map[config.Type]component.ProcessorFactory) (config.Processors, error) { 340 // Prepare resulting map. 341 processors := make(config.Processors) 342 343 // Iterate over processors and create a config for each. 344 for key, value := range procs { 345 componentConfig := configparser.NewParserFromStringMap(cast.ToStringMap(value)) 346 expandEnvConfig(componentConfig) 347 348 // Decode the key into type and fullName components. 349 id, err := config.NewIDFromString(key) 350 if err != nil { 351 return nil, errorInvalidTypeAndNameKey(processorsKeyName, key, err) 352 } 353 354 // Find processor factory based on "type" that we read from config source. 355 factory := factories[id.Type()] 356 if factory == nil { 357 return nil, errorUnknownType(processorsKeyName, id) 358 } 359 360 // Create the default config for this processor. 361 processorCfg := factory.CreateDefaultConfig() 362 processorCfg.SetIDName(id.Name()) 363 expandEnvLoadedConfig(processorCfg) 364 365 // Now that the default config struct is created we can Unmarshal into it 366 // and it will apply user-defined config on top of the default. 367 unm := unmarshaler(factory) 368 if err := unm(componentConfig, processorCfg); err != nil { 369 return nil, errorUnmarshalError(processorsKeyName, id, err) 370 } 371 372 if processors[id] != nil { 373 return nil, errorDuplicateName(processorsKeyName, id) 374 } 375 376 processors[id] = processorCfg 377 } 378 379 return processors, nil 380} 381 382func loadPipelines(pipelinesConfig map[string]pipelineSettings) (config.Pipelines, error) { 383 // Prepare resulting map. 384 pipelines := make(config.Pipelines) 385 386 // Iterate over input map and create a config for each. 387 for key, rawPipeline := range pipelinesConfig { 388 // Decode the key into type and name components. 389 id, err := config.NewIDFromString(key) 390 if err != nil { 391 return nil, errorInvalidTypeAndNameKey(pipelinesKeyName, key, err) 392 } 393 fullName := id.String() 394 395 // Create the config for this pipeline. 396 var pipelineCfg config.Pipeline 397 398 // Set the type. 399 pipelineCfg.InputType = config.DataType(id.Type()) 400 switch pipelineCfg.InputType { 401 case config.TracesDataType: 402 case config.MetricsDataType: 403 case config.LogsDataType: 404 default: 405 return nil, errorUnknownType(pipelinesKeyName, id) 406 } 407 408 pipelineCfg.Name = fullName 409 if pipelineCfg.Receivers, err = parseIDNames(id, receiversKeyName, rawPipeline.Receivers); err != nil { 410 return nil, err 411 } 412 if pipelineCfg.Processors, err = parseIDNames(id, processorsKeyName, rawPipeline.Processors); err != nil { 413 return nil, err 414 } 415 if pipelineCfg.Exporters, err = parseIDNames(id, exportersKeyName, rawPipeline.Exporters); err != nil { 416 return nil, err 417 } 418 419 if pipelines[fullName] != nil { 420 return nil, errorDuplicateName(pipelinesKeyName, id) 421 } 422 423 pipelines[fullName] = &pipelineCfg 424 } 425 426 return pipelines, nil 427} 428 429func parseIDNames(pipelineID config.ComponentID, componentType string, names []string) ([]config.ComponentID, error) { 430 var ret []config.ComponentID 431 for _, idProcStr := range names { 432 idRecv, err := config.NewIDFromString(idProcStr) 433 if err != nil { 434 return nil, fmt.Errorf("pipelines: config for %v contains invalid %s name %s : %w", pipelineID, componentType, idProcStr, err) 435 } 436 ret = append(ret, idRecv) 437 } 438 return ret, nil 439} 440 441// expandEnvConfig updates a configparser.Parser with expanded values for all the values (simple, list or map value). 442// It does not expand the keys. 443func expandEnvConfig(v *configparser.Parser) { 444 for _, k := range v.AllKeys() { 445 v.Set(k, expandStringValues(v.Get(k))) 446 } 447} 448 449func expandStringValues(value interface{}) interface{} { 450 switch v := value.(type) { 451 default: 452 return v 453 case string: 454 return expandEnv(v) 455 case []interface{}: 456 nslice := make([]interface{}, 0, len(v)) 457 for _, vint := range v { 458 nslice = append(nslice, expandStringValues(vint)) 459 } 460 return nslice 461 case map[string]interface{}: 462 nmap := make(map[interface{}]interface{}, len(v)) 463 for k, vint := range v { 464 nmap[k] = expandStringValues(vint) 465 } 466 return nmap 467 case map[interface{}]interface{}: 468 nmap := make(map[interface{}]interface{}, len(v)) 469 for k, vint := range v { 470 nmap[k] = expandStringValues(vint) 471 } 472 return nmap 473 } 474} 475 476// expandEnvLoadedConfig is a utility function that goes recursively through a config object 477// and tries to expand environment variables in its string fields. 478func expandEnvLoadedConfig(s interface{}) { 479 expandEnvLoadedConfigPointer(s) 480} 481 482func expandEnvLoadedConfigPointer(s interface{}) { 483 // Check that the value given is indeed a pointer, otherwise safely stop the search here 484 value := reflect.ValueOf(s) 485 if value.Kind() != reflect.Ptr { 486 return 487 } 488 // Run expandLoadedConfigValue on the value behind the pointer. 489 expandEnvLoadedConfigValue(value.Elem()) 490} 491 492func expandEnvLoadedConfigValue(value reflect.Value) { 493 // The value given is a string, we expand it (if allowed). 494 if value.Kind() == reflect.String && value.CanSet() { 495 value.SetString(expandEnv(value.String())) 496 } 497 // The value given is a struct, we go through its fields. 498 if value.Kind() == reflect.Struct { 499 for i := 0; i < value.NumField(); i++ { 500 // Returns the content of the field. 501 field := value.Field(i) 502 503 // Only try to modify a field if it can be modified (eg. skip unexported private fields). 504 if field.CanSet() { 505 switch field.Kind() { 506 case reflect.String: 507 // The current field is a string, expand env variables in the string. 508 field.SetString(expandEnv(field.String())) 509 case reflect.Ptr: 510 // The current field is a pointer, run the expansion function on the pointer. 511 expandEnvLoadedConfigPointer(field.Interface()) 512 case reflect.Struct: 513 // The current field is a nested struct, go through the nested struct 514 expandEnvLoadedConfigValue(field) 515 } 516 } 517 } 518 } 519} 520 521func expandEnv(s string) string { 522 return os.Expand(s, func(str string) string { 523 // This allows escaping environment variable substitution via $$, e.g. 524 // - $FOO will be substituted with env var FOO 525 // - $$FOO will be replaced with $FOO 526 // - $$$FOO will be replaced with $ + substituted env var FOO 527 if str == "$" { 528 return "$" 529 } 530 return os.Getenv(str) 531 }) 532} 533 534// deprecatedUnmarshaler interface is a deprecated optional interface that if implemented by a Factory, 535// the configuration loading system will use to unmarshal the config. 536// Implement config.CustomUnmarshable on the configuration struct instead. 537type deprecatedUnmarshaler interface { 538 // Unmarshal is a function that un-marshals a viper data into a config struct in a custom way. 539 // componentViperSection *viper.Viper 540 // The config for this specific component. May be nil or empty if no config available. 541 // intoCfg interface{} 542 // An empty interface wrapping a pointer to the config struct to unmarshal into. 543 Unmarshal(componentViperSection *viper.Viper, intoCfg interface{}) error 544} 545 546func unmarshal(componentSection *configparser.Parser, intoCfg interface{}) error { 547 if cu, ok := intoCfg.(config.CustomUnmarshable); ok { 548 return cu.Unmarshal(componentSection) 549 } 550 551 return componentSection.UnmarshalExact(intoCfg) 552} 553 554// unmarshaler returns an unmarshaling function. It should be removed when deprecatedUnmarshaler is removed. 555func unmarshaler(factory component.Factory) func(componentViperSection *configparser.Parser, intoCfg interface{}) error { 556 if _, ok := factory.(deprecatedUnmarshaler); ok { 557 return func(componentParser *configparser.Parser, intoCfg interface{}) error { 558 return errors.New("deprecated way to specify custom unmarshaler no longer supported") 559 } 560 } 561 return unmarshal 562} 563