1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package configloader
16
17import (
18	"errors"
19	"fmt"
20	"os"
21	"reflect"
22
23	"github.com/spf13/cast"
24	"github.com/spf13/viper"
25
26	"go.opentelemetry.io/collector/component"
27	"go.opentelemetry.io/collector/config"
28	"go.opentelemetry.io/collector/config/configparser"
29)
30
// configErrorCode identifies which kind of failure a configError reports.
// The codes are not part of Load()'s public API; they exist only so that unit
// tests can assert which failure occurred.
type configErrorCode int

const (
	// The zero value is deliberately left unused so that real codes start at 1.
	_ configErrorCode = iota

	errInvalidTypeAndNameKey           // a key could not be parsed into a component ID
	errUnknownType                     // no factory / unsupported data type for the parsed type
	errDuplicateName                   // two keys resolved to the same component ID
	errUnmarshalTopLevelStructureError // a section or component config failed to unmarshal
)

// configError is the structured error produced for the failures enumerated by
// configErrorCode. It pairs a human readable message with its internal code.
type configError struct {
	// msg is the human readable text returned by Error.
	msg string

	// code is the internal error code, used by tests.
	code configErrorCode
}

// Compile-time check that *configError satisfies the error interface.
var _ error = (*configError)(nil)

// Error implements the error interface by returning the stored message.
func (e *configError) Error() string { return e.msg }
56
// Names of the YAML configuration sections read by this package. All of them
// are top-level keys except pipelinesKeyName, which lives under "service".
const (
	extensionsKeyName = "extensions" // the extensions section
	receiversKeyName  = "receivers"  // the receivers section
	exportersKeyName  = "exporters"  // the exporters section
	processorsKeyName = "processors" // the processors section
	pipelinesKeyName  = "pipelines"  // the pipelines section (under "service")
)
74
// configSettings mirrors the top-level layout of the configuration. Load
// unmarshals into it (with UnmarshalExact) to validate the top-level sections.
// Each component map is keyed by the raw component key and holds that
// component's raw settings.
type configSettings struct {
	Receivers  map[string]map[string]interface{} `mapstructure:"receivers"`
	Processors map[string]map[string]interface{} `mapstructure:"processors"`
	Exporters  map[string]map[string]interface{} `mapstructure:"exporters"`
	Extensions map[string]map[string]interface{} `mapstructure:"extensions"`
	Service    serviceSettings                   `mapstructure:"service"`
}

// serviceSettings is the raw form of the "service" section: the list of
// extension keys to enable and the pipelines keyed by their raw pipeline key.
type serviceSettings struct {
	Extensions []string                    `mapstructure:"extensions"`
	Pipelines  map[string]pipelineSettings `mapstructure:"pipelines"`
}

// pipelineSettings is the raw form of one pipeline: the component keys it
// references, in the order they were written.
type pipelineSettings struct {
	Receivers  []string `mapstructure:"receivers"`
	Processors []string `mapstructure:"processors"`
	Exporters  []string `mapstructure:"exporters"`
}
93
94// Load loads a Config from Parser.
95// After loading the config, `Validate()` must be called to validate.
96func Load(v *configparser.Parser, factories component.Factories) (*config.Config, error) {
97	var cfg config.Config
98
99	// Load the config.
100
101	// Struct to validate top level sections.
102	var rawCfg configSettings
103	if err := v.UnmarshalExact(&rawCfg); err != nil {
104		return nil, &configError{
105			code: errUnmarshalTopLevelStructureError,
106			msg:  fmt.Sprintf("error reading top level configuration sections: %s", err.Error()),
107		}
108	}
109
110	// In the following section use v.GetStringMap(xyzKeyName) instead of rawCfg.Xyz, because
111	// UnmarshalExact will not unmarshal entries in the map[string]interface{} with nil values.
112	// GetStringMap does the correct thing.
113
114	// Start with the service extensions.
115
116	extensions, err := loadExtensions(cast.ToStringMap(v.Get(extensionsKeyName)), factories.Extensions)
117	if err != nil {
118		return nil, err
119	}
120	cfg.Extensions = extensions
121
122	// Load data components (receivers, exporters, and processors).
123
124	receivers, err := loadReceivers(cast.ToStringMap(v.Get(receiversKeyName)), factories.Receivers)
125	if err != nil {
126		return nil, err
127	}
128	cfg.Receivers = receivers
129
130	exporters, err := loadExporters(cast.ToStringMap(v.Get(exportersKeyName)), factories.Exporters)
131	if err != nil {
132		return nil, err
133	}
134	cfg.Exporters = exporters
135
136	processors, err := loadProcessors(cast.ToStringMap(v.Get(processorsKeyName)), factories.Processors)
137	if err != nil {
138		return nil, err
139	}
140	cfg.Processors = processors
141
142	// Load the service and its data pipelines.
143	service, err := loadService(rawCfg.Service)
144	if err != nil {
145		return nil, err
146	}
147	cfg.Service = service
148
149	return &cfg, nil
150}
151
152func errorInvalidTypeAndNameKey(component, key string, err error) error {
153	return &configError{
154		code: errInvalidTypeAndNameKey,
155		msg:  fmt.Sprintf("invalid %s type and name key %q: %v", component, key, err),
156	}
157}
158
159func errorUnknownType(component string, id config.ComponentID) error {
160	return &configError{
161		code: errUnknownType,
162		msg:  fmt.Sprintf("unknown %s type %q for %v", component, id.Type(), id),
163	}
164}
165
166func errorUnmarshalError(component string, id config.ComponentID, err error) error {
167	return &configError{
168		code: errUnmarshalTopLevelStructureError,
169		msg:  fmt.Sprintf("error reading %s configuration for %v: %v", component, id, err),
170	}
171}
172
173func errorDuplicateName(component string, id config.ComponentID) error {
174	return &configError{
175		code: errDuplicateName,
176		msg:  fmt.Sprintf("duplicate %s name %v", component, id),
177	}
178}
179
180func loadExtensions(exts map[string]interface{}, factories map[config.Type]component.ExtensionFactory) (config.Extensions, error) {
181	// Prepare resulting map.
182	extensions := make(config.Extensions)
183
184	// Iterate over extensions and create a config for each.
185	for key, value := range exts {
186		componentConfig := configparser.NewParserFromStringMap(cast.ToStringMap(value))
187		expandEnvConfig(componentConfig)
188
189		// Decode the key into type and fullName components.
190		id, err := config.NewIDFromString(key)
191		if err != nil {
192			return nil, errorInvalidTypeAndNameKey(extensionsKeyName, key, err)
193		}
194
195		// Find extension factory based on "type" that we read from config source.
196		factory := factories[id.Type()]
197		if factory == nil {
198			return nil, errorUnknownType(extensionsKeyName, id)
199		}
200
201		// Create the default config for this extension.
202		extensionCfg := factory.CreateDefaultConfig()
203		extensionCfg.SetIDName(id.Name())
204		expandEnvLoadedConfig(extensionCfg)
205
206		// Now that the default config struct is created we can Unmarshal into it
207		// and it will apply user-defined config on top of the default.
208		unm := unmarshaler(factory)
209		if err := unm(componentConfig, extensionCfg); err != nil {
210			return nil, errorUnmarshalError(extensionsKeyName, id, err)
211		}
212
213		if extensions[id] != nil {
214			return nil, errorDuplicateName(extensionsKeyName, id)
215		}
216
217		extensions[id] = extensionCfg
218	}
219
220	return extensions, nil
221}
222
223func loadService(rawService serviceSettings) (config.Service, error) {
224	var ret config.Service
225	ret.Extensions = make([]config.ComponentID, 0, len(rawService.Extensions))
226	for _, extIDStr := range rawService.Extensions {
227		id, err := config.NewIDFromString(extIDStr)
228		if err != nil {
229			return ret, err
230		}
231		ret.Extensions = append(ret.Extensions, id)
232	}
233
234	// Process the pipelines first so in case of error on them it can be properly
235	// reported.
236	pipelines, err := loadPipelines(rawService.Pipelines)
237	ret.Pipelines = pipelines
238
239	return ret, err
240}
241
242// LoadReceiver loads a receiver config from componentConfig using the provided factories.
243func LoadReceiver(componentConfig *configparser.Parser, id config.ComponentID, factory component.ReceiverFactory) (config.Receiver, error) {
244	// Create the default config for this receiver.
245	receiverCfg := factory.CreateDefaultConfig()
246	receiverCfg.SetIDName(id.Name())
247	expandEnvLoadedConfig(receiverCfg)
248
249	// Now that the default config struct is created we can Unmarshal into it
250	// and it will apply user-defined config on top of the default.
251	unm := unmarshaler(factory)
252	if err := unm(componentConfig, receiverCfg); err != nil {
253		return nil, errorUnmarshalError(receiversKeyName, id, err)
254	}
255
256	return receiverCfg, nil
257}
258
259func loadReceivers(recvs map[string]interface{}, factories map[config.Type]component.ReceiverFactory) (config.Receivers, error) {
260	// Prepare resulting map.
261	receivers := make(config.Receivers)
262
263	// Iterate over input map and create a config for each.
264	for key, value := range recvs {
265		componentConfig := configparser.NewParserFromStringMap(cast.ToStringMap(value))
266		expandEnvConfig(componentConfig)
267
268		// Decode the key into type and fullName components.
269		id, err := config.NewIDFromString(key)
270		if err != nil {
271			return nil, errorInvalidTypeAndNameKey(receiversKeyName, key, err)
272		}
273
274		// Find receiver factory based on "type" that we read from config source.
275		factory := factories[id.Type()]
276		if factory == nil {
277			return nil, errorUnknownType(receiversKeyName, id)
278		}
279
280		receiverCfg, err := LoadReceiver(componentConfig, id, factory)
281
282		if err != nil {
283			// LoadReceiver already wraps the error.
284			return nil, err
285		}
286
287		if receivers[id] != nil {
288			return nil, errorDuplicateName(receiversKeyName, id)
289		}
290		receivers[id] = receiverCfg
291	}
292
293	return receivers, nil
294}
295
296func loadExporters(exps map[string]interface{}, factories map[config.Type]component.ExporterFactory) (config.Exporters, error) {
297	// Prepare resulting map.
298	exporters := make(config.Exporters)
299
300	// Iterate over Exporters and create a config for each.
301	for key, value := range exps {
302		componentConfig := configparser.NewParserFromStringMap(cast.ToStringMap(value))
303		expandEnvConfig(componentConfig)
304
305		// Decode the key into type and fullName components.
306		id, err := config.NewIDFromString(key)
307		if err != nil {
308			return nil, errorInvalidTypeAndNameKey(exportersKeyName, key, err)
309		}
310
311		// Find exporter factory based on "type" that we read from config source.
312		factory := factories[id.Type()]
313		if factory == nil {
314			return nil, errorUnknownType(exportersKeyName, id)
315		}
316
317		// Create the default config for this exporter.
318		exporterCfg := factory.CreateDefaultConfig()
319		exporterCfg.SetIDName(id.Name())
320		expandEnvLoadedConfig(exporterCfg)
321
322		// Now that the default config struct is created we can Unmarshal into it
323		// and it will apply user-defined config on top of the default.
324		unm := unmarshaler(factory)
325		if err := unm(componentConfig, exporterCfg); err != nil {
326			return nil, errorUnmarshalError(exportersKeyName, id, err)
327		}
328
329		if exporters[id] != nil {
330			return nil, errorDuplicateName(exportersKeyName, id)
331		}
332
333		exporters[id] = exporterCfg
334	}
335
336	return exporters, nil
337}
338
339func loadProcessors(procs map[string]interface{}, factories map[config.Type]component.ProcessorFactory) (config.Processors, error) {
340	// Prepare resulting map.
341	processors := make(config.Processors)
342
343	// Iterate over processors and create a config for each.
344	for key, value := range procs {
345		componentConfig := configparser.NewParserFromStringMap(cast.ToStringMap(value))
346		expandEnvConfig(componentConfig)
347
348		// Decode the key into type and fullName components.
349		id, err := config.NewIDFromString(key)
350		if err != nil {
351			return nil, errorInvalidTypeAndNameKey(processorsKeyName, key, err)
352		}
353
354		// Find processor factory based on "type" that we read from config source.
355		factory := factories[id.Type()]
356		if factory == nil {
357			return nil, errorUnknownType(processorsKeyName, id)
358		}
359
360		// Create the default config for this processor.
361		processorCfg := factory.CreateDefaultConfig()
362		processorCfg.SetIDName(id.Name())
363		expandEnvLoadedConfig(processorCfg)
364
365		// Now that the default config struct is created we can Unmarshal into it
366		// and it will apply user-defined config on top of the default.
367		unm := unmarshaler(factory)
368		if err := unm(componentConfig, processorCfg); err != nil {
369			return nil, errorUnmarshalError(processorsKeyName, id, err)
370		}
371
372		if processors[id] != nil {
373			return nil, errorDuplicateName(processorsKeyName, id)
374		}
375
376		processors[id] = processorCfg
377	}
378
379	return processors, nil
380}
381
382func loadPipelines(pipelinesConfig map[string]pipelineSettings) (config.Pipelines, error) {
383	// Prepare resulting map.
384	pipelines := make(config.Pipelines)
385
386	// Iterate over input map and create a config for each.
387	for key, rawPipeline := range pipelinesConfig {
388		// Decode the key into type and name components.
389		id, err := config.NewIDFromString(key)
390		if err != nil {
391			return nil, errorInvalidTypeAndNameKey(pipelinesKeyName, key, err)
392		}
393		fullName := id.String()
394
395		// Create the config for this pipeline.
396		var pipelineCfg config.Pipeline
397
398		// Set the type.
399		pipelineCfg.InputType = config.DataType(id.Type())
400		switch pipelineCfg.InputType {
401		case config.TracesDataType:
402		case config.MetricsDataType:
403		case config.LogsDataType:
404		default:
405			return nil, errorUnknownType(pipelinesKeyName, id)
406		}
407
408		pipelineCfg.Name = fullName
409		if pipelineCfg.Receivers, err = parseIDNames(id, receiversKeyName, rawPipeline.Receivers); err != nil {
410			return nil, err
411		}
412		if pipelineCfg.Processors, err = parseIDNames(id, processorsKeyName, rawPipeline.Processors); err != nil {
413			return nil, err
414		}
415		if pipelineCfg.Exporters, err = parseIDNames(id, exportersKeyName, rawPipeline.Exporters); err != nil {
416			return nil, err
417		}
418
419		if pipelines[fullName] != nil {
420			return nil, errorDuplicateName(pipelinesKeyName, id)
421		}
422
423		pipelines[fullName] = &pipelineCfg
424	}
425
426	return pipelines, nil
427}
428
429func parseIDNames(pipelineID config.ComponentID, componentType string, names []string) ([]config.ComponentID, error) {
430	var ret []config.ComponentID
431	for _, idProcStr := range names {
432		idRecv, err := config.NewIDFromString(idProcStr)
433		if err != nil {
434			return nil, fmt.Errorf("pipelines: config for %v contains invalid %s name %s : %w", pipelineID, componentType, idProcStr, err)
435		}
436		ret = append(ret, idRecv)
437	}
438	return ret, nil
439}
440
441// expandEnvConfig updates a configparser.Parser with expanded values for all the values (simple, list or map value).
442// It does not expand the keys.
443func expandEnvConfig(v *configparser.Parser) {
444	for _, k := range v.AllKeys() {
445		v.Set(k, expandStringValues(v.Get(k)))
446	}
447}
448
// expandStringValues returns a copy of value in which environment variables in
// every string leaf have been expanded with expandEnv. []interface{} values and
// maps are traversed recursively and rebuilt; any other value is returned as is.
// Map keys are never expanded.
//
// The map's key type is preserved: a map[string]interface{} comes back as a
// map[string]interface{} and a map[interface{}]interface{} as a
// map[interface{}]interface{}. Changing the dynamic type here would break
// consumers that type-assert on the decoded configuration values.
func expandStringValues(value interface{}) interface{} {
	switch v := value.(type) {
	case string:
		return expandEnv(v)
	case []interface{}:
		nslice := make([]interface{}, 0, len(v))
		for _, elem := range v {
			nslice = append(nslice, expandStringValues(elem))
		}
		return nslice
	case map[string]interface{}:
		nmap := make(map[string]interface{}, len(v))
		for k, elem := range v {
			nmap[k] = expandStringValues(elem)
		}
		return nmap
	case map[interface{}]interface{}:
		nmap := make(map[interface{}]interface{}, len(v))
		for k, elem := range v {
			nmap[k] = expandStringValues(elem)
		}
		return nmap
	default:
		return v
	}
}

// expandEnvLoadedConfig expands environment variables, in place, in the string
// fields of the config object s. s is expected to be a pointer; any other value
// is silently ignored. Exported string fields are expanded, and exported
// pointer and struct fields are followed recursively. Unexported fields and
// fields of any other kind (slices, maps, interfaces, ...) are left untouched.
func expandEnvLoadedConfig(s interface{}) {
	expandEnvLoadedConfigPointer(s)
}

// expandEnvLoadedConfigPointer expands the value s points to. Non-pointer inputs
// are ignored. For a nil pointer Elem() is the invalid reflect.Value, which
// expandEnvLoadedConfigValue leaves alone, so nil is a no-op.
func expandEnvLoadedConfigPointer(s interface{}) {
	value := reflect.ValueOf(s)
	if value.Kind() != reflect.Ptr {
		return
	}
	expandEnvLoadedConfigValue(value.Elem())
}

// expandEnvLoadedConfigValue expands a settable string value, or walks the
// settable fields of a struct value (strings and nested structs are handled
// here, pointers via expandEnvLoadedConfigPointer). Other kinds are ignored.
func expandEnvLoadedConfigValue(value reflect.Value) {
	switch value.Kind() {
	case reflect.String:
		if value.CanSet() {
			value.SetString(expandEnv(value.String()))
		}
	case reflect.Struct:
		for i := 0; i < value.NumField(); i++ {
			field := value.Field(i)
			// CanSet is false for unexported fields, which must not be modified.
			if !field.CanSet() {
				continue
			}
			switch field.Kind() {
			case reflect.String, reflect.Struct:
				expandEnvLoadedConfigValue(field)
			case reflect.Ptr:
				expandEnvLoadedConfigPointer(field.Interface())
			}
		}
	}
}

// expandEnv expands $VAR and ${VAR} references in s using the process
// environment (unset variables become the empty string).
func expandEnv(s string) string {
	return os.Expand(s, func(str string) string {
		// This allows escaping environment variable substitution via $$, e.g.
		// - $FOO will be substituted with env var FOO
		// - $$FOO will be replaced with $FOO
		// - $$$FOO will be replaced with $ + substituted env var FOO
		if str == "$" {
			return "$"
		}
		return os.Getenv(str)
	})
}
533
534// deprecatedUnmarshaler interface is a deprecated optional interface that if implemented by a Factory,
535// the configuration loading system will use to unmarshal the config.
536// Implement config.CustomUnmarshable on the configuration struct instead.
537type deprecatedUnmarshaler interface {
538	// Unmarshal is a function that un-marshals a viper data into a config struct in a custom way.
539	// componentViperSection *viper.Viper
540	//   The config for this specific component. May be nil or empty if no config available.
541	// intoCfg interface{}
542	//   An empty interface wrapping a pointer to the config struct to unmarshal into.
543	Unmarshal(componentViperSection *viper.Viper, intoCfg interface{}) error
544}
545
546func unmarshal(componentSection *configparser.Parser, intoCfg interface{}) error {
547	if cu, ok := intoCfg.(config.CustomUnmarshable); ok {
548		return cu.Unmarshal(componentSection)
549	}
550
551	return componentSection.UnmarshalExact(intoCfg)
552}
553
554// unmarshaler returns an unmarshaling function. It should be removed when deprecatedUnmarshaler is removed.
555func unmarshaler(factory component.Factory) func(componentViperSection *configparser.Parser, intoCfg interface{}) error {
556	if _, ok := factory.(deprecatedUnmarshaler); ok {
557		return func(componentParser *configparser.Parser, intoCfg interface{}) error {
558			return errors.New("deprecated way to specify custom unmarshaler no longer supported")
559		}
560	}
561	return unmarshal
562}
563