1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package configsource 16 17import ( 18 "bytes" 19 "context" 20 "errors" 21 "fmt" 22 "net/url" 23 "os" 24 "strings" 25 "sync" 26 27 "gopkg.in/yaml.v2" 28 29 "go.opentelemetry.io/collector/config/configparser" 30 "go.opentelemetry.io/collector/config/experimental/configsource" 31 "go.opentelemetry.io/collector/consumer/consumererror" 32) 33 34const ( 35 // expandPrefixChar is the char used to prefix strings that can be expanded, 36 // either environment variables or config sources. 37 expandPrefixChar = '$' 38 // configSourceNameDelimChar is the char used to terminate the name of config source 39 // when it is used to retrieve values to inject in the configuration. 40 configSourceNameDelimChar = ':' 41) 42 43// private error types to help with testability 44type ( 45 errUnknownConfigSource struct{ error } 46) 47 48// Manager is used to inject data from config sources into a configuration and also 49// to monitor for updates on the items injected into the configuration. All methods 50// of a Manager must be called only once and have an expected sequence: 51// 52// 1. NewManager to create a new instance; 53// 2. Resolve to inject the data from config sources into a configuration; 54// 3. WatchForUpdate in a goroutine to wait for configuration updates; 55// 4. WaitForWatcher to wait until the watchers are in place; 56// 5. 
Close to close the instance; 57// 58// The current syntax to reference a config source in a YAML is provisional. Currently 59// single-line: 60// 61// param_to_be_retrieved: $<cfgSrcName>:<selector>[?<params_url_query_format>] 62// 63// bracketed single-line: 64// 65// param_to_be_retrieved: ${<cfgSrcName>:<selector>[?<params_url_query_format>]} 66// 67// and multi-line are supported: 68// 69// param_to_be_retrieved: | 70// $<cfgSrcName>: <selector> 71// [<params_multi_line_YAML>] 72// 73// The <cfgSrcName> is a name string used to identify the config source instance to be used 74// to retrieve the value. 75// 76// The <selector> is the mandatory parameter required when retrieving data from a config source. 77// 78// Not all config sources need the optional parameters, they are used to provide extra control when 79// retrieving and preparing the data to be injected into the configuration. 80// 81// For single-line format <params_url_query_format> uses the same syntax as URL query parameters. 82// Hypothetical example in a YAML file: 83// 84// component: 85// config_field: $file:/etc/secret.bin?binary=true 86// 87// For multi-line format <params_multi_line_YAML> uses syntax as a YAML inside YAML. Possible usage 88// example in a YAML file: 89// 90// component: 91// config_field: | 92// $yamltemplate: /etc/log_template.yaml 93// logs_path: /var/logs/ 94// timeout: 10s 95// 96// Not all config sources need these optional parameters, they are used to provide extra control when 97// retrieving and data to be injected into the configuration. 98// 99// Assuming a config source named "env" that retrieve environment variables and one named "file" that 100// retrieves contents from individual files, here are some examples: 101// 102// component: 103// # Retrieves the value of the environment variable LOGS_DIR. 104// logs_dir: $env:LOGS_DIR 105// 106// # Retrieves the value from the file /etc/secret.bin and injects its contents as a []byte. 
107// bytes_from_file: $file:/etc/secret.bin?binary=true 108// 109// # Retrieves the value from the file /etc/text.txt and injects its contents as a string. 110// # Hypothetically the "file" config source by default tries to inject the file contents 111// # as a string if params doesn't specify that "binary" is true. 112// text_from_file: $file:/etc/text.txt 113// 114// Bracketed single-line should be used when concatenating a suffix to the value retrieved by 115// the config source. Example: 116// 117// component: 118// # Retrieves the value of the environment variable LOGS_DIR and appends /component.log to it. 119// log_file_fullname: ${env:LOGS_DIR}/component.log 120// 121// Environment variables are expanded before passed to the config source when used in the selector or 122// the optional parameters. Example: 123// 124// component: 125// # Retrieves the value from the file text.txt located on the path specified by the environment 126// # variable DATA_PATH. The name of the environment variable is the string after the delimiter 127// # until the first character different than '_' and non-alpha-numeric. 128// text_from_file: $file:$DATA_PATH/text.txt 129// 130// Since environment variables and config sources both use the '$', with or without brackets, as a prefix 131// for their expansion it is necessary to have a way to distinguish between them. For the non-bracketed 132// syntax the code will peek at the first character other than alpha-numeric and '_' after the '$'. If 133// that character is a ':' it will treat it as a config source and as environment variable otherwise. 134// For example: 135// 136// component: 137// field_0: $PATH:/etc/logs # Injects the data from a config sourced named "PATH" using the selector "/etc/logs". 138// field_1: $PATH/etc/logs # Expands the environment variable "PATH" and adds the suffix "/etc/logs" to it. 
139// 140// So if you need to include an environment followed by ':' the bracketed syntax must be used instead: 141// 142// component: 143// field_0: ${PATH}:/etc/logs # Expands the environment variable "PATH" and adds the suffix ":/etc/logs" to it. 144// 145// For the bracketed syntax the presence of ':' inside the brackets indicates that code will treat the bracketed 146// contents as a config source. For example: 147// 148// component: 149// field_0: ${file:/var/secret.txt} # Injects the data from a config sourced named "file" using the selector "/var/secret.txt". 150// field_1: ${file}:/var/secret.txt # Expands the environment variable "file" and adds the suffix ":/var/secret.txt" to it. 151// 152// If the character following the '$' is in the set {'*', '#', '$', '@', '!', '?', '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'} 153// the code will consider it to be the name of an environment variable to expand, or config source if followed by ':'. Do not use any of these 154// characters as the first char on the name of a config source or an environment variable (even if allowed by the system) to avoid unexpected 155// results. 156type Manager struct { 157 // configSources is map from ConfigSource names (as defined in the configuration) 158 // and the respective instances. 159 configSources map[string]configsource.ConfigSource 160 // sessions track all the Session objects used to retrieve values to be injected 161 // into the configuration. 162 sessions map[string]configsource.Session 163 // watchers keeps track of all WatchForUpdate functions for retrieved values. 164 watchers []func() error 165 // watchersWG is used to ensure that Close waits for all WatchForUpdate calls 166 // to complete. 167 watchersWG sync.WaitGroup 168 // watchingCh is used to notify users of the Manager that the WatchForUpdate function 169 // is ready and waiting for notifications. 
170 watchingCh chan struct{} 171 // closeCh is used to notify the Manager WatchForUpdate function that the manager 172 // is being closed. 173 closeCh chan struct{} 174} 175 176// NewManager creates a new instance of a Manager to be used to inject data from 177// ConfigSource objects into a configuration and watch for updates on the injected 178// data. 179func NewManager(_ *configparser.Parser) (*Manager, error) { 180 // TODO: Config sources should be extracted for the config itself, need Factories for that. 181 182 return &Manager{ 183 // TODO: Temporarily tests should set their config sources per their needs. 184 sessions: make(map[string]configsource.Session), 185 watchingCh: make(chan struct{}), 186 closeCh: make(chan struct{}), 187 }, nil 188} 189 190// Resolve inspects the given config.Parser and resolves all config sources referenced 191// in the configuration, returning a config.Parser fully resolved. This must be called only 192// once per lifetime of a Manager object. 193func (m *Manager) Resolve(ctx context.Context, parser *configparser.Parser) (*configparser.Parser, error) { 194 res := configparser.NewParser() 195 allKeys := parser.AllKeys() 196 for _, k := range allKeys { 197 value, err := m.expandStringValues(ctx, parser.Get(k)) 198 if err != nil { 199 // Call RetrieveEnd for all sessions used so far but don't record any errors. 200 _ = m.retrieveEndAllSessions(ctx) 201 return nil, err 202 } 203 res.Set(k, value) 204 } 205 206 if errs := m.retrieveEndAllSessions(ctx); len(errs) > 0 { 207 return nil, consumererror.Combine(errs) 208 } 209 210 return res, nil 211} 212 213// WatchForUpdate must watch for updates on any of the values retrieved from config sources 214// and injected into the configuration. Typically this method is launched in a goroutine, the 215// method WaitForWatcher blocks until the WatchForUpdate goroutine is running and ready. 
216func (m *Manager) WatchForUpdate() error { 217 // Use a channel to capture the first error returned by any watcher and another one 218 // to ensure completion of any remaining watcher also trying to report an error. 219 errChannel := make(chan error, 1) 220 doneCh := make(chan struct{}) 221 defer close(doneCh) 222 223 for _, watcher := range m.watchers { 224 m.watchersWG.Add(1) 225 watcherFn := watcher 226 go func() { 227 defer m.watchersWG.Done() 228 229 err := watcherFn() 230 switch { 231 case errors.Is(err, configsource.ErrWatcherNotSupported): 232 // The watcher for the retrieved value is not supported, nothing to 233 // do, just exit from the goroutine. 234 return 235 case errors.Is(err, configsource.ErrSessionClosed): 236 // The Session from which this watcher was retrieved is being closed. 237 // There is no error to report, just exit from the goroutine. 238 return 239 default: 240 select { 241 case errChannel <- err: 242 // Try to report any other error. 243 case <-doneCh: 244 // There was either one error published or the watcher was closed. 245 // This channel was closed and any goroutines waiting on these 246 // should simply close. 247 } 248 } 249 }() 250 } 251 252 // All goroutines were created, they may not be running yet, but the manager WatchForUpdate 253 // is only waiting for any of the watchers to terminate. 254 close(m.watchingCh) 255 256 select { 257 case err := <-errChannel: 258 // Return the first error that reaches the channel and ignore any other error. 259 return err 260 case <-m.closeCh: 261 // This covers the case that all watchers returned ErrWatcherNotSupported. 262 return configsource.ErrSessionClosed 263 } 264} 265 266// WaitForWatcher blocks until the watchers used by WatchForUpdate are all ready. 267// This is used to ensure that the watchers are in place before proceeding. 
268func (m *Manager) WaitForWatcher() { 269 <-m.watchingCh 270} 271 272// Close terminates the WatchForUpdate function and closes all Session objects used 273// in the configuration. It should be called 274func (m *Manager) Close(ctx context.Context) error { 275 var errs []error 276 for _, session := range m.sessions { 277 if err := session.Close(ctx); err != nil { 278 errs = append(errs, err) 279 } 280 } 281 282 close(m.closeCh) 283 m.watchersWG.Wait() 284 285 return consumererror.Combine(errs) 286} 287 288func (m *Manager) retrieveEndAllSessions(ctx context.Context) []error { 289 var errs []error 290 for _, session := range m.sessions { 291 if err := session.RetrieveEnd(ctx); err != nil { 292 errs = append(errs, err) 293 } 294 } 295 return errs 296} 297 298func (m *Manager) expandStringValues(ctx context.Context, value interface{}) (interface{}, error) { 299 switch v := value.(type) { 300 case string: 301 return m.expandString(ctx, v) 302 case []interface{}: 303 nslice := make([]interface{}, 0, len(v)) 304 for _, vint := range v { 305 value, err := m.expandStringValues(ctx, vint) 306 if err != nil { 307 return nil, err 308 } 309 nslice = append(nslice, value) 310 } 311 return nslice, nil 312 case map[string]interface{}: 313 nmap := make(map[interface{}]interface{}, len(v)) 314 for k, vint := range v { 315 value, err := m.expandStringValues(ctx, vint) 316 if err != nil { 317 return nil, err 318 } 319 nmap[k] = value 320 } 321 return nmap, nil 322 case map[interface{}]interface{}: 323 nmap := make(map[interface{}]interface{}, len(v)) 324 for k, vint := range v { 325 value, err := m.expandStringValues(ctx, vint) 326 if err != nil { 327 return nil, err 328 } 329 nmap[k] = value 330 } 331 return nmap, nil 332 default: 333 return v, nil 334 } 335} 336 337// expandConfigSource retrieve data from the specified config source and injects them into 338// the configuration. The Manager tracks sessions and watcher objects as needed. 
339func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.ConfigSource, s string) (interface{}, error) { 340 cfgSrcName, selector, params, err := parseCfgSrc(s) 341 if err != nil { 342 return nil, err 343 } 344 345 session, ok := m.sessions[cfgSrcName] 346 if !ok { 347 session, err = cfgSrc.NewSession(ctx) 348 if err != nil { 349 return nil, fmt.Errorf("failed to create session for config source %q: %w", cfgSrcName, err) 350 } 351 m.sessions[cfgSrcName] = session 352 } 353 354 retrieved, err := session.Retrieve(ctx, selector, params) 355 if err != nil { 356 return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err) 357 } 358 359 m.watchers = append(m.watchers, retrieved.WatchForUpdate) 360 361 return retrieved.Value(), nil 362} 363 364// expandString expands environment variables and config sources that are specified on the string. 365func (m *Manager) expandString(ctx context.Context, s string) (interface{}, error) { 366 // Code based on os.Expand function. All delimiters that are checked against are 367 // ASCII so bytes are fine for this operation. 368 var buf []byte 369 370 // Using i, j, and w variables to keep correspondence with os.Expand code. 371 // i tracks the index in s from which a slice to be appended to buf should start. 372 // j tracks the char being currently checked and also the end of the slice to be appended to buf. 373 // w tracks the number of characters being consumed after a prefix identifying env vars or config sources. 374 i := 0 375 for j := 0; j < len(s); j++ { 376 if s[j] == expandPrefixChar && j+1 < len(s) { 377 if buf == nil { 378 // Assuming that the length of the string will double after expansion of env vars and config sources. 379 buf = make([]byte, 0, 2*len(s)) 380 } 381 382 // Append everything consumed up to the prefix char (but not including the prefix char) to the result. 383 buf = append(buf, s[i:j]...) 
384 385 var expandableContent, cfgSrcName string 386 w := 0 // number of bytes consumed on this pass 387 388 switch { 389 case s[j+1] == expandPrefixChar: 390 // Escaping the prefix so $$ becomes a single $ without attempting 391 // to treat the string after it as a config source or env var. 392 expandableContent = string(expandPrefixChar) 393 w = 1 // consumed a single char 394 395 case s[j+1] == '{': 396 // Bracketed usage, consume everything until first '}' exactly as os.Expand. 397 expandableContent, w = getShellName(s[j+1:]) 398 // Allow for some spaces. 399 expandableContent = strings.Trim(expandableContent, " ") 400 if len(expandableContent) > 1 && strings.Contains(expandableContent, string(configSourceNameDelimChar)) { 401 // Bracket expandableContent contains ':' treating it as a config source. 402 cfgSrcName, _ = getShellName(expandableContent) 403 } 404 405 default: 406 // Non-bracketed usage, ie.: found the prefix char, it can be either a config 407 // source or an environment variable. 408 var name string 409 name, w = getShellName(s[j+1:]) 410 expandableContent = name // Assume for now that it is an env var. 411 412 // Peek next char after name, if it is a config source name delimiter treat the remaining of the 413 // string as a config source. 414 if j+w+1 < len(s) && s[j+w+1] == configSourceNameDelimChar { 415 // This is a config source, since it is not delimited it will consume until end of the string. 416 cfgSrcName = name 417 expandableContent = s[j+1:] 418 w = len(expandableContent) // Set consumed bytes to the length of expandableContent 419 } 420 } 421 422 switch { 423 case cfgSrcName == "": 424 // Not a config source, expand as os.ExpandEnv 425 buf = osExpandEnv(buf, expandableContent, w) 426 427 default: 428 // A config source, retrieve and apply results. 
429 retrieved, err := m.retrieveConfigSourceData(ctx, cfgSrcName, expandableContent) 430 if err != nil { 431 return nil, err 432 } 433 434 consumedAll := j+w+1 == len(s) 435 if consumedAll && len(buf) == 0 { 436 // This is the only expandableContent on the string, config 437 // source is free to return interface{}. 438 return retrieved, nil 439 } 440 441 // Either there was a prefix already or there are still 442 // characters to be processed. 443 buf = append(buf, fmt.Sprintf("%v", retrieved)...) 444 } 445 446 j += w // move the index of the char being checked (j) by the number of characters consumed (w) on this iteration. 447 i = j + 1 // update start index (i) of next slice of bytes to be copied. 448 } 449 } 450 451 if buf == nil { 452 // No changes to original string, just return it. 453 return s, nil 454 } 455 456 // Return whatever was accumulated on the buffer plus the remaining of the original string. 457 return string(buf) + s[i:], nil 458} 459 460func (m *Manager) retrieveConfigSourceData(ctx context.Context, name, cfgSrcInvoke string) (interface{}, error) { 461 cfgSrc, ok := m.configSources[name] 462 if !ok { 463 return nil, newErrUnknownConfigSource(name) 464 } 465 466 // Expand any env vars on the selector and parameters. Nested config source usage 467 // is not supported. 468 cfgSrcInvoke = expandEnvVars(cfgSrcInvoke) 469 retrieved, err := m.expandConfigSource(ctx, cfgSrc, cfgSrcInvoke) 470 if err != nil { 471 return nil, err 472 } 473 474 return retrieved, nil 475} 476 477func newErrUnknownConfigSource(cfgSrcName string) error { 478 return &errUnknownConfigSource{ 479 fmt.Errorf(`config source %q not found if this was intended to be an environment variable use "${%s}" instead"`, cfgSrcName, cfgSrcName), 480 } 481} 482 483// parseCfgSrc extracts the reference to a config source from a string value. 484// The caller should check for error explicitly since it is possible for the 485// other values to have been partially set. 
486func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err error) { 487 parts := strings.SplitN(s, string(configSourceNameDelimChar), 2) 488 if len(parts) != 2 { 489 err = fmt.Errorf("invalid config source syntax at %q, it must have at least the config source name and a selector", s) 490 return 491 } 492 cfgSrcName = strings.Trim(parts[0], " ") 493 494 // Separate multi-line and single line case. 495 afterCfgSrcName := parts[1] 496 switch { 497 case strings.Contains(afterCfgSrcName, "\n"): 498 // Multi-line, until the first \n it is the selector, everything after as YAML. 499 parts = strings.SplitN(afterCfgSrcName, "\n", 2) 500 selector = strings.Trim(parts[0], " ") 501 502 if len(parts) > 1 && len(parts[1]) > 0 { 503 var cp *configparser.Parser 504 cp, err = configparser.NewParserFromBuffer(bytes.NewReader([]byte(parts[1]))) 505 if err != nil { 506 return 507 } 508 params = cp.ToStringMap() 509 } 510 511 default: 512 // Single line, and parameters as URL query. 513 const selectorDelim string = "?" 514 parts = strings.SplitN(parts[1], selectorDelim, 2) 515 selector = strings.Trim(parts[0], " ") 516 517 if len(parts) == 2 { 518 paramsPart := parts[1] 519 params, err = parseParamsAsURLQuery(paramsPart) 520 if err != nil { 521 err = fmt.Errorf("invalid parameters syntax at %q: %w", s, err) 522 return 523 } 524 } 525 } 526 527 return cfgSrcName, selector, params, err 528} 529 530func parseParamsAsURLQuery(s string) (interface{}, error) { 531 values, err := url.ParseQuery(s) 532 if err != nil { 533 return nil, err 534 } 535 536 // Transform single array values in scalars. 
537 params := make(map[string]interface{}) 538 for k, v := range values { 539 switch len(v) { 540 case 0: 541 params[k] = nil 542 case 1: 543 var iface interface{} 544 if err = yaml.Unmarshal([]byte(v[0]), &iface); err != nil { 545 return nil, err 546 } 547 params[k] = iface 548 default: 549 // It is a slice add element by element 550 elemSlice := make([]interface{}, 0, len(v)) 551 for _, elem := range v { 552 var iface interface{} 553 if err = yaml.Unmarshal([]byte(elem), &iface); err != nil { 554 return nil, err 555 } 556 elemSlice = append(elemSlice, iface) 557 } 558 params[k] = elemSlice 559 } 560 } 561 return params, err 562} 563 564// expandEnvVars is used to expand environment variables with the same syntax used 565// by config.Parser. 566func expandEnvVars(s string) string { 567 return os.Expand(s, func(str string) string { 568 // This allows escaping environment variable substitution via $$, e.g. 569 // - $FOO will be substituted with env var FOO 570 // - $$FOO will be replaced with $FOO 571 // - $$$FOO will be replaced with $ + substituted env var FOO 572 if str == "$" { 573 return "$" 574 } 575 return os.Getenv(str) 576 }) 577} 578 579// osExpandEnv replicate the internal behavior of os.ExpandEnv when handling env 580// vars updating the buffer accordingly. 581func osExpandEnv(buf []byte, name string, w int) []byte { 582 switch { 583 case name == "" && w > 0: 584 // Encountered invalid syntax; eat the 585 // characters. 586 case name == "" || name == "$": 587 // Valid syntax, but $ was not followed by a 588 // name. Leave the dollar character untouched. 589 buf = append(buf, expandPrefixChar) 590 default: 591 buf = append(buf, os.Getenv(name)...) 592 } 593 594 return buf 595} 596 597// Below are helper functions used by os.Expand, copied without changes from original sources (env.go). 598 599// isShellSpecialVar reports whether the character identifies a special 600// shell variable such as $*. 
func isShellSpecialVar(c uint8) bool {
	switch c {
	case '*', '#', '$', '@', '!', '?', '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
		return true
	}
	return false
}

// isAlphaNum reports whether the byte is an ASCII letter, number, or underscore.
func isAlphaNum(c uint8) bool {
	return c == '_' || '0' <= c && c <= '9' || 'a' <= c && c <= 'z' || 'A' <= c && c <= 'Z'
}

// getShellName returns the name that begins the string and the number of bytes
// consumed to extract it. If the name is enclosed in {}, it's part of a ${}
// expansion and two more bytes are needed than the length of the name.
//
// s must be non-empty: s[0] is read unconditionally. An empty name together with a
// non-zero byte count signals bad brace syntax whose bytes should be dropped; an
// empty name with a count of 0 means no name starts s.
func getShellName(s string) (string, int) {
	switch {
	case s[0] == '{':
		// "{X}" where X is a single special character, e.g. ${*}.
		if len(s) > 2 && isShellSpecialVar(s[1]) && s[2] == '}' {
			return s[1:2], 3
		}
		// Scan to closing brace
		for i := 1; i < len(s); i++ {
			if s[i] == '}' {
				if i == 1 {
					// Bad syntax; eat "${}"
					return "", 2
				}
				return s[1:i], i + 1
			}
		}
		// Bad syntax; eat "${"
		return "", 1
	case isShellSpecialVar(s[0]):
		return s[0:1], 1
	}
	// Scan alphanumerics.
	var i int
	for i = 0; i < len(s) && isAlphaNum(s[i]); i++ {
	}
	return s[:i], i
}