1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package configsource
16
17import (
18	"bytes"
19	"context"
20	"errors"
21	"fmt"
22	"net/url"
23	"os"
24	"strings"
25	"sync"
26
27	"gopkg.in/yaml.v2"
28
29	"go.opentelemetry.io/collector/config/configparser"
30	"go.opentelemetry.io/collector/config/experimental/configsource"
31	"go.opentelemetry.io/collector/consumer/consumererror"
32)
33
const (
	// expandPrefixChar is the char used to prefix strings that can be expanded,
	// either environment variables or config sources. When it is doubled ("$$")
	// expandString treats it as an escape that produces a single literal '$'.
	expandPrefixChar = '$'
	// configSourceNameDelimChar is the char used to terminate the name of config source
	// when it is used to retrieve values to inject in the configuration, i.e. it
	// separates the config source name from the selector ("<cfgSrcName>:<selector>").
	configSourceNameDelimChar = ':'
)
42
// private error types to help with testability
type (
	// errUnknownConfigSource wraps the error produced by newErrUnknownConfigSource,
	// which is returned when a referenced config source name is not present in the
	// Manager's configSources map.
	errUnknownConfigSource struct{ error }
)
47
48// Manager is used to inject data from config sources into a configuration and also
49// to monitor for updates on the items injected into the configuration.  All methods
50// of a Manager must be called only once and have an expected sequence:
51//
52// 1. NewManager to create a new instance;
53// 2. Resolve to inject the data from config sources into a configuration;
54// 3. WatchForUpdate in a goroutine to wait for configuration updates;
55// 4. WaitForWatcher to wait until the watchers are in place;
56// 5. Close to close the instance;
57//
58// The current syntax to reference a config source in a YAML is provisional. Currently
59// single-line:
60//
61//    param_to_be_retrieved: $<cfgSrcName>:<selector>[?<params_url_query_format>]
62//
63// bracketed single-line:
64//
65//    param_to_be_retrieved: ${<cfgSrcName>:<selector>[?<params_url_query_format>]}
66//
67// and multi-line are supported:
68//
69//    param_to_be_retrieved: |
70//      $<cfgSrcName>: <selector>
71//      [<params_multi_line_YAML>]
72//
73// The <cfgSrcName> is a name string used to identify the config source instance to be used
74// to retrieve the value.
75//
76// The <selector> is the mandatory parameter required when retrieving data from a config source.
77//
78// Not all config sources need the optional parameters, they are used to provide extra control when
79// retrieving and preparing the data to be injected into the configuration.
80//
81// For single-line format <params_url_query_format> uses the same syntax as URL query parameters.
82// Hypothetical example in a YAML file:
83//
84// component:
85//   config_field: $file:/etc/secret.bin?binary=true
86//
87// For multi-line format <params_multi_line_YAML> uses syntax as a YAML inside YAML. Possible usage
88// example in a YAML file:
89//
90// component:
91//   config_field: |
92//     $yamltemplate: /etc/log_template.yaml
93//     logs_path: /var/logs/
94//     timeout: 10s
95//
// Not all config sources need these optional parameters, they are used to provide extra control when
// retrieving and preparing the data to be injected into the configuration.
98//
// Assuming a config source named "env" that retrieves environment variables and one named "file" that
// retrieves contents from individual files, here are some examples:
101//
102//    component:
103//      # Retrieves the value of the environment variable LOGS_DIR.
104//      logs_dir: $env:LOGS_DIR
105//
106//      # Retrieves the value from the file /etc/secret.bin and injects its contents as a []byte.
107//      bytes_from_file: $file:/etc/secret.bin?binary=true
108//
109//      # Retrieves the value from the file /etc/text.txt and injects its contents as a string.
110//      # Hypothetically the "file" config source by default tries to inject the file contents
111//      # as a string if params doesn't specify that "binary" is true.
112//      text_from_file: $file:/etc/text.txt
113//
114// Bracketed single-line should be used when concatenating a suffix to the value retrieved by
115// the config source. Example:
116//
117//    component:
118//      # Retrieves the value of the environment variable LOGS_DIR and appends /component.log to it.
119//      log_file_fullname: ${env:LOGS_DIR}/component.log
120//
121// Environment variables are expanded before passed to the config source when used in the selector or
122// the optional parameters. Example:
123//
124//    component:
125//      # Retrieves the value from the file text.txt located on the path specified by the environment
126//      # variable DATA_PATH. The name of the environment variable is the string after the delimiter
127//      # until the first character different than '_' and non-alpha-numeric.
128//      text_from_file: $file:$DATA_PATH/text.txt
129//
130// Since environment variables and config sources both use the '$', with or without brackets, as a prefix
131// for their expansion it is necessary to have a way to distinguish between them. For the non-bracketed
132// syntax the code will peek at the first character other than alpha-numeric and '_' after the '$'. If
133// that character is a ':' it will treat it as a config source and as environment variable otherwise.
134// For example:
135//
136//    component:
//      field_0: $PATH:/etc/logs # Injects the data from a config source named "PATH" using the selector "/etc/logs".
138//      field_1: $PATH/etc/logs  # Expands the environment variable "PATH" and adds the suffix "/etc/logs" to it.
139//
// So if you need to include an environment variable followed by ':' the bracketed syntax must be used instead:
141//
142//    component:
143//      field_0: ${PATH}:/etc/logs # Expands the environment variable "PATH" and adds the suffix ":/etc/logs" to it.
144//
// For the bracketed syntax the presence of ':' inside the brackets indicates that the code will treat the bracketed
// contents as a config source. For example:
147//
148//    component:
//      field_0: ${file:/var/secret.txt} # Injects the data from a config source named "file" using the selector "/var/secret.txt".
150//      field_1: ${file}:/var/secret.txt # Expands the environment variable "file" and adds the suffix ":/var/secret.txt" to it.
151//
152// If the character following the '$' is in the set {'*', '#', '$', '@', '!', '?', '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
153// the code will consider it to be the name of an environment variable to expand, or config source if followed by ':'. Do not use any of these
154// characters as the first char on the name of a config source or an environment variable (even if allowed by the system) to avoid unexpected
155// results.
type Manager struct {
	// configSources is map from ConfigSource names (as defined in the configuration)
	// and the respective instances. NewManager does not populate it; a name that is
	// not present in the map makes the expansion fail with errUnknownConfigSource.
	configSources map[string]configsource.ConfigSource
	// sessions track all the Session objects used to retrieve values to be injected
	// into the configuration. It is keyed by config source name and a Session is
	// created the first time a config source is referenced.
	sessions map[string]configsource.Session
	// watchers keeps track of all WatchForUpdate functions for retrieved values.
	// One entry is appended for every value retrieved from a config source.
	watchers []func() error
	// watchersWG is used to ensure that Close waits for all WatchForUpdate calls
	// to complete, i.e. for every goroutine launched by Manager.WatchForUpdate.
	watchersWG sync.WaitGroup
	// watchingCh is used to notify users of the Manager that the WatchForUpdate function
	// is ready and waiting for notifications. It is closed by WatchForUpdate and
	// WaitForWatcher blocks on it.
	watchingCh chan struct{}
	// closeCh is used to notify the Manager WatchForUpdate function that the manager
	// is being closed. It is closed by Close.
	closeCh chan struct{}
}
175
176// NewManager creates a new instance of a Manager to be used to inject data from
177// ConfigSource objects into a configuration and watch for updates on the injected
178// data.
179func NewManager(_ *configparser.Parser) (*Manager, error) {
180	// TODO: Config sources should be extracted for the config itself, need Factories for that.
181
182	return &Manager{
183		// TODO: Temporarily tests should set their config sources per their needs.
184		sessions:   make(map[string]configsource.Session),
185		watchingCh: make(chan struct{}),
186		closeCh:    make(chan struct{}),
187	}, nil
188}
189
190// Resolve inspects the given config.Parser and resolves all config sources referenced
191// in the configuration, returning a config.Parser fully resolved. This must be called only
192// once per lifetime of a Manager object.
193func (m *Manager) Resolve(ctx context.Context, parser *configparser.Parser) (*configparser.Parser, error) {
194	res := configparser.NewParser()
195	allKeys := parser.AllKeys()
196	for _, k := range allKeys {
197		value, err := m.expandStringValues(ctx, parser.Get(k))
198		if err != nil {
199			// Call RetrieveEnd for all sessions used so far but don't record any errors.
200			_ = m.retrieveEndAllSessions(ctx)
201			return nil, err
202		}
203		res.Set(k, value)
204	}
205
206	if errs := m.retrieveEndAllSessions(ctx); len(errs) > 0 {
207		return nil, consumererror.Combine(errs)
208	}
209
210	return res, nil
211}
212
213// WatchForUpdate must watch for updates on any of the values retrieved from config sources
214// and injected into the configuration. Typically this method is launched in a goroutine, the
215// method WaitForWatcher blocks until the WatchForUpdate goroutine is running and ready.
216func (m *Manager) WatchForUpdate() error {
217	// Use a channel to capture the first error returned by any watcher and another one
218	// to ensure completion of any remaining watcher also trying to report an error.
219	errChannel := make(chan error, 1)
220	doneCh := make(chan struct{})
221	defer close(doneCh)
222
223	for _, watcher := range m.watchers {
224		m.watchersWG.Add(1)
225		watcherFn := watcher
226		go func() {
227			defer m.watchersWG.Done()
228
229			err := watcherFn()
230			switch {
231			case errors.Is(err, configsource.ErrWatcherNotSupported):
232				// The watcher for the retrieved value is not supported, nothing to
233				// do, just exit from the goroutine.
234				return
235			case errors.Is(err, configsource.ErrSessionClosed):
236				// The Session from which this watcher was retrieved is being closed.
237				// There is no error to report, just exit from the goroutine.
238				return
239			default:
240				select {
241				case errChannel <- err:
242					// Try to report any other error.
243				case <-doneCh:
244					// There was either one error published or the watcher was closed.
245					// This channel was closed and any goroutines waiting on these
246					// should simply close.
247				}
248			}
249		}()
250	}
251
252	// All goroutines were created, they may not be running yet, but the manager WatchForUpdate
253	// is only waiting for any of the watchers to terminate.
254	close(m.watchingCh)
255
256	select {
257	case err := <-errChannel:
258		// Return the first error that reaches the channel and ignore any other error.
259		return err
260	case <-m.closeCh:
261		// This covers the case that all watchers returned ErrWatcherNotSupported.
262		return configsource.ErrSessionClosed
263	}
264}
265
// WaitForWatcher blocks until the watchers used by WatchForUpdate are all ready.
// This is used to ensure that the watchers are in place before proceeding.
//
// It returns once WatchForUpdate has closed watchingCh, which happens after the
// goroutines of all watchers were launched (they may not be running yet).
func (m *Manager) WaitForWatcher() {
	// A receive on a closed channel returns immediately, so this unblocks as soon
	// as WatchForUpdate closes the channel.
	<-m.watchingCh
}
271
272// Close terminates the WatchForUpdate function and closes all Session objects used
273// in the configuration. It should be called
274func (m *Manager) Close(ctx context.Context) error {
275	var errs []error
276	for _, session := range m.sessions {
277		if err := session.Close(ctx); err != nil {
278			errs = append(errs, err)
279		}
280	}
281
282	close(m.closeCh)
283	m.watchersWG.Wait()
284
285	return consumererror.Combine(errs)
286}
287
288func (m *Manager) retrieveEndAllSessions(ctx context.Context) []error {
289	var errs []error
290	for _, session := range m.sessions {
291		if err := session.RetrieveEnd(ctx); err != nil {
292			errs = append(errs, err)
293		}
294	}
295	return errs
296}
297
298func (m *Manager) expandStringValues(ctx context.Context, value interface{}) (interface{}, error) {
299	switch v := value.(type) {
300	case string:
301		return m.expandString(ctx, v)
302	case []interface{}:
303		nslice := make([]interface{}, 0, len(v))
304		for _, vint := range v {
305			value, err := m.expandStringValues(ctx, vint)
306			if err != nil {
307				return nil, err
308			}
309			nslice = append(nslice, value)
310		}
311		return nslice, nil
312	case map[string]interface{}:
313		nmap := make(map[interface{}]interface{}, len(v))
314		for k, vint := range v {
315			value, err := m.expandStringValues(ctx, vint)
316			if err != nil {
317				return nil, err
318			}
319			nmap[k] = value
320		}
321		return nmap, nil
322	case map[interface{}]interface{}:
323		nmap := make(map[interface{}]interface{}, len(v))
324		for k, vint := range v {
325			value, err := m.expandStringValues(ctx, vint)
326			if err != nil {
327				return nil, err
328			}
329			nmap[k] = value
330		}
331		return nmap, nil
332	default:
333		return v, nil
334	}
335}
336
337// expandConfigSource retrieve data from the specified config source and injects them into
338// the configuration. The Manager tracks sessions and watcher objects as needed.
339func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.ConfigSource, s string) (interface{}, error) {
340	cfgSrcName, selector, params, err := parseCfgSrc(s)
341	if err != nil {
342		return nil, err
343	}
344
345	session, ok := m.sessions[cfgSrcName]
346	if !ok {
347		session, err = cfgSrc.NewSession(ctx)
348		if err != nil {
349			return nil, fmt.Errorf("failed to create session for config source %q: %w", cfgSrcName, err)
350		}
351		m.sessions[cfgSrcName] = session
352	}
353
354	retrieved, err := session.Retrieve(ctx, selector, params)
355	if err != nil {
356		return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err)
357	}
358
359	m.watchers = append(m.watchers, retrieved.WatchForUpdate)
360
361	return retrieved.Value(), nil
362}
363
364// expandString expands environment variables and config sources that are specified on the string.
365func (m *Manager) expandString(ctx context.Context, s string) (interface{}, error) {
366	// Code based on os.Expand function. All delimiters that are checked against are
367	// ASCII so bytes are fine for this operation.
368	var buf []byte
369
370	// Using i, j, and w variables to keep correspondence with os.Expand code.
371	// i tracks the index in s from which a slice to be appended to buf should start.
372	// j tracks the char being currently checked and also the end of the slice to be appended to buf.
373	// w tracks the number of characters being consumed after a prefix identifying env vars or config sources.
374	i := 0
375	for j := 0; j < len(s); j++ {
376		if s[j] == expandPrefixChar && j+1 < len(s) {
377			if buf == nil {
378				// Assuming that the length of the string will double after expansion of env vars and config sources.
379				buf = make([]byte, 0, 2*len(s))
380			}
381
382			// Append everything consumed up to the prefix char (but not including the prefix char) to the result.
383			buf = append(buf, s[i:j]...)
384
385			var expandableContent, cfgSrcName string
386			w := 0 // number of bytes consumed on this pass
387
388			switch {
389			case s[j+1] == expandPrefixChar:
390				// Escaping the prefix so $$ becomes a single $ without attempting
391				// to treat the string after it as a config source or env var.
392				expandableContent = string(expandPrefixChar)
393				w = 1 // consumed a single char
394
395			case s[j+1] == '{':
396				// Bracketed usage, consume everything until first '}' exactly as os.Expand.
397				expandableContent, w = getShellName(s[j+1:])
398				// Allow for some spaces.
399				expandableContent = strings.Trim(expandableContent, " ")
400				if len(expandableContent) > 1 && strings.Contains(expandableContent, string(configSourceNameDelimChar)) {
401					// Bracket expandableContent contains ':' treating it as a config source.
402					cfgSrcName, _ = getShellName(expandableContent)
403				}
404
405			default:
406				// Non-bracketed usage, ie.: found the prefix char, it can be either a config
407				// source or an environment variable.
408				var name string
409				name, w = getShellName(s[j+1:])
410				expandableContent = name // Assume for now that it is an env var.
411
412				// Peek next char after name, if it is a config source name delimiter treat the remaining of the
413				// string as a config source.
414				if j+w+1 < len(s) && s[j+w+1] == configSourceNameDelimChar {
415					// This is a config source, since it is not delimited it will consume until end of the string.
416					cfgSrcName = name
417					expandableContent = s[j+1:]
418					w = len(expandableContent) // Set consumed bytes to the length of expandableContent
419				}
420			}
421
422			switch {
423			case cfgSrcName == "":
424				// Not a config source, expand as os.ExpandEnv
425				buf = osExpandEnv(buf, expandableContent, w)
426
427			default:
428				// A config source, retrieve and apply results.
429				retrieved, err := m.retrieveConfigSourceData(ctx, cfgSrcName, expandableContent)
430				if err != nil {
431					return nil, err
432				}
433
434				consumedAll := j+w+1 == len(s)
435				if consumedAll && len(buf) == 0 {
436					// This is the only expandableContent on the string, config
437					// source is free to return interface{}.
438					return retrieved, nil
439				}
440
441				// Either there was a prefix already or there are still
442				// characters to be processed.
443				buf = append(buf, fmt.Sprintf("%v", retrieved)...)
444			}
445
446			j += w    // move the index of the char being checked (j) by the number of characters consumed (w) on this iteration.
447			i = j + 1 // update start index (i) of next slice of bytes to be copied.
448		}
449	}
450
451	if buf == nil {
452		// No changes to original string, just return it.
453		return s, nil
454	}
455
456	// Return whatever was accumulated on the buffer plus the remaining of the original string.
457	return string(buf) + s[i:], nil
458}
459
460func (m *Manager) retrieveConfigSourceData(ctx context.Context, name, cfgSrcInvoke string) (interface{}, error) {
461	cfgSrc, ok := m.configSources[name]
462	if !ok {
463		return nil, newErrUnknownConfigSource(name)
464	}
465
466	// Expand any env vars on the selector and parameters. Nested config source usage
467	// is not supported.
468	cfgSrcInvoke = expandEnvVars(cfgSrcInvoke)
469	retrieved, err := m.expandConfigSource(ctx, cfgSrc, cfgSrcInvoke)
470	if err != nil {
471		return nil, err
472	}
473
474	return retrieved, nil
475}
476
477func newErrUnknownConfigSource(cfgSrcName string) error {
478	return &errUnknownConfigSource{
479		fmt.Errorf(`config source %q not found if this was intended to be an environment variable use "${%s}" instead"`, cfgSrcName, cfgSrcName),
480	}
481}
482
483// parseCfgSrc extracts the reference to a config source from a string value.
484// The caller should check for error explicitly since it is possible for the
485// other values to have been partially set.
486func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err error) {
487	parts := strings.SplitN(s, string(configSourceNameDelimChar), 2)
488	if len(parts) != 2 {
489		err = fmt.Errorf("invalid config source syntax at %q, it must have at least the config source name and a selector", s)
490		return
491	}
492	cfgSrcName = strings.Trim(parts[0], " ")
493
494	// Separate multi-line and single line case.
495	afterCfgSrcName := parts[1]
496	switch {
497	case strings.Contains(afterCfgSrcName, "\n"):
498		// Multi-line, until the first \n it is the selector, everything after as YAML.
499		parts = strings.SplitN(afterCfgSrcName, "\n", 2)
500		selector = strings.Trim(parts[0], " ")
501
502		if len(parts) > 1 && len(parts[1]) > 0 {
503			var cp *configparser.Parser
504			cp, err = configparser.NewParserFromBuffer(bytes.NewReader([]byte(parts[1])))
505			if err != nil {
506				return
507			}
508			params = cp.ToStringMap()
509		}
510
511	default:
512		// Single line, and parameters as URL query.
513		const selectorDelim string = "?"
514		parts = strings.SplitN(parts[1], selectorDelim, 2)
515		selector = strings.Trim(parts[0], " ")
516
517		if len(parts) == 2 {
518			paramsPart := parts[1]
519			params, err = parseParamsAsURLQuery(paramsPart)
520			if err != nil {
521				err = fmt.Errorf("invalid parameters syntax at %q: %w", s, err)
522				return
523			}
524		}
525	}
526
527	return cfgSrcName, selector, params, err
528}
529
530func parseParamsAsURLQuery(s string) (interface{}, error) {
531	values, err := url.ParseQuery(s)
532	if err != nil {
533		return nil, err
534	}
535
536	// Transform single array values in scalars.
537	params := make(map[string]interface{})
538	for k, v := range values {
539		switch len(v) {
540		case 0:
541			params[k] = nil
542		case 1:
543			var iface interface{}
544			if err = yaml.Unmarshal([]byte(v[0]), &iface); err != nil {
545				return nil, err
546			}
547			params[k] = iface
548		default:
549			// It is a slice add element by element
550			elemSlice := make([]interface{}, 0, len(v))
551			for _, elem := range v {
552				var iface interface{}
553				if err = yaml.Unmarshal([]byte(elem), &iface); err != nil {
554					return nil, err
555				}
556				elemSlice = append(elemSlice, iface)
557			}
558			params[k] = elemSlice
559		}
560	}
561	return params, err
562}
563
// expandEnvVars expands the environment variables referenced in s using the same
// syntax used by config.Parser, i.e. the one of os.Expand plus the "$$" escape:
//
//   - $FOO is replaced by the value of the env var FOO
//   - $$FOO is replaced by $FOO
//   - $$$FOO is replaced by $ followed by the value of the env var FOO
func expandEnvVars(s string) string {
	lookup := func(name string) string {
		if name != "$" {
			return os.Getenv(name)
		}
		// os.Expand reports "$$" as a variable named "$", map it back to a
		// literal dollar sign to allow escaping the substitution.
		return "$"
	}

	return os.Expand(s, lookup)
}
578
579// osExpandEnv replicate the internal behavior of os.ExpandEnv when handling env
580// vars updating the buffer accordingly.
581func osExpandEnv(buf []byte, name string, w int) []byte {
582	switch {
583	case name == "" && w > 0:
584		// Encountered invalid syntax; eat the
585		// characters.
586	case name == "" || name == "$":
587		// Valid syntax, but $ was not followed by a
588		// name. Leave the dollar character untouched.
589		buf = append(buf, expandPrefixChar)
590	default:
591		buf = append(buf, os.Getenv(name)...)
592	}
593
594	return buf
595}
596
// Below are helper functions used by os.Expand, taken from the Go standard library sources (os package, env.go).
598
// isShellSpecialVar reports whether the character identifies a special
// shell variable such as $*. The special characters are the decimal digits
// and '*', '#', '$', '@', '!', '?' and '-'.
func isShellSpecialVar(c uint8) bool {
	if '0' <= c && c <= '9' {
		return true
	}
	return c == '*' || c == '#' || c == '$' || c == '@' || c == '!' || c == '?' || c == '-'
}
608
// isAlphaNum reports whether the byte is an ASCII letter, number, or underscore
func isAlphaNum(c uint8) bool {
	switch {
	case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
		return true
	}
	return c == '_'
}
613
614// getShellName returns the name that begins the string and the number of bytes
615// consumed to extract it. If the name is enclosed in {}, it's part of a ${}
616// expansion and two more bytes are needed than the length of the name.
617func getShellName(s string) (string, int) {
618	switch {
619	case s[0] == '{':
620		if len(s) > 2 && isShellSpecialVar(s[1]) && s[2] == '}' {
621			return s[1:2], 3
622		}
623		// Scan to closing brace
624		for i := 1; i < len(s); i++ {
625			if s[i] == '}' {
626				if i == 1 {
627					// Bad syntax; eat "${}"
628					return "", 2
629				}
630				return s[1:i], i + 1
631			}
632		}
633		// Bad syntax; eat "${"
634		return "", 1
635	case isShellSpecialVar(s[0]):
636		return s[0:1], 1
637	}
638	// Scan alphanumerics.
639	var i int
640	for i = 0; i < len(s) && isAlphaNum(s[i]); i++ {
641	}
642	return s[:i], i
643}
644