// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Package reloader contains helpers to trigger reloads of Prometheus instances
// on configuration changes and to substitute environment variables in config files.
//
// The Reloader type is useful when you want to:
//
//   - Watch for changes to a certain file (`cfgFile`).
//   - Optionally, specify a different output file for the watched `cfgFile` (`cfgOutputFile`).
//     This will also try to decompress the `cfgFile` if needed and substitute ALL the environment
//     variables using the Kubernetes substitution format (`$(var)`).
//   - Watch for changes in certain directories (`ruleDirs`).
//
// Once any of those change, the Prometheus instance at the given `reloadURL` will be notified,
// causing Prometheus to reload its configuration and rules.
//
// Example of creating a reloader:
//
//	u, _ := url.Parse("http://localhost:9090")
//	rl := reloader.New(
//		nil,
//		reloader.ReloadURLFromBase(u),
//		"/path/to/cfg",
//		"/path/to/cfg.out",
//		[]string{"/path/to/dirs"},
//	)
//
// The reload URL can be generated with the ReloadURLFromBase() function.
// It appends the default reload path to the given URL:
//
//	u, _ := url.Parse("http://localhost:9090")
//	reloader.ReloadURLFromBase(u) // It will return "http://localhost:9090/-/reload"
//
// Start watching for changes; watching stops when the context gets canceled:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	go func() {
//		if err := rl.Watch(ctx); err != nil {
//			log.Fatal(err)
//		}
//	}()
//	// ...
//	cancel()
//
// Besides reacting to file watch events, the reloader periodically (every watch interval)
// re-hashes the given config file and dirs and compares the result with the last one,
// even when no change was reported, and triggers a reload only if the hash differs.
46// 47// A basic example of configuration template with environment variables: 48// 49// global: 50// external_labels: 51// replica: '$(HOSTNAME)' 52package reloader 53 54import ( 55 "bytes" 56 "compress/gzip" 57 "context" 58 "crypto/sha256" 59 "hash" 60 "io" 61 "io/ioutil" 62 "net/http" 63 "net/url" 64 "os" 65 "path" 66 "path/filepath" 67 "regexp" 68 "strings" 69 "time" 70 71 "github.com/fsnotify/fsnotify" 72 "github.com/go-kit/kit/log" 73 "github.com/go-kit/kit/log/level" 74 "github.com/pkg/errors" 75 "github.com/thanos-io/thanos/pkg/runutil" 76) 77 78// Reloader can watch config files and trigger reloads of a Prometheus server. 79// It optionally substitutes environment variables in the configuration. 80// Referenced environment variables must be of the form `$(var)` (not `$var` or `${var}`). 81type Reloader struct { 82 logger log.Logger 83 reloadURL *url.URL 84 cfgFile string 85 cfgOutputFile string 86 ruleDirs []string 87 watchInterval time.Duration 88 retryInterval time.Duration 89 90 lastCfgHash []byte 91 lastRuleHash []byte 92} 93 94var firstGzipBytes = []byte{0x1f, 0x8b, 0x08} 95 96// New creates a new reloader that watches the given config file and rule directory 97// and triggers a Prometheus reload upon changes. 98// If cfgOutputFile is not empty the config file will be decompressed if needed, environment variables 99// will be substituted and the output written into the given path. Prometheus should then use 100// cfgOutputFile as its config file path. 101func New(logger log.Logger, reloadURL *url.URL, cfgFile string, cfgOutputFile string, ruleDirs []string) *Reloader { 102 if logger == nil { 103 logger = log.NewNopLogger() 104 } 105 return &Reloader{ 106 logger: logger, 107 reloadURL: reloadURL, 108 cfgFile: cfgFile, 109 cfgOutputFile: cfgOutputFile, 110 ruleDirs: ruleDirs, 111 watchInterval: 3 * time.Minute, 112 retryInterval: 5 * time.Second, 113 } 114} 115 116// We cannot detect everything via watch. 
Watch interval controls how often we re-read given dirs non-recursively. 117func (r *Reloader) WithWatchInterval(duration time.Duration) { 118 r.watchInterval = duration 119} 120 121// Watch starts to watch periodically the config file and rules and process them until the context 122// gets canceled. Config file gets env expanded if cfgOutputFile is specified and reload is trigger if 123// config or rules changed. 124// Watch watchers periodically based on r.watchInterval. 125// For config file it watches it directly as well via fsnotify. 126// It watches rule dirs as well, but lot's of edge cases are missing, so rely on interval mostly. 127func (r *Reloader) Watch(ctx context.Context) error { 128 watcher, err := fsnotify.NewWatcher() 129 if err != nil { 130 return errors.Wrap(err, "create watcher") 131 } 132 defer runutil.CloseWithLogOnErr(r.logger, watcher, "config watcher close") 133 134 watchables := map[string]struct{}{} 135 if r.cfgFile != "" { 136 watchables[filepath.Dir(r.cfgFile)] = struct{}{} 137 if err := watcher.Add(r.cfgFile); err != nil { 138 return errors.Wrapf(err, "add config file %s to watcher", r.cfgFile) 139 } 140 141 if err := r.apply(ctx); err != nil { 142 return err 143 } 144 } 145 146 // Watch rule dirs in best effort manner. 147 for _, ruleDir := range r.ruleDirs { 148 watchables[filepath.Dir(ruleDir)] = struct{}{} 149 if err := watcher.Add(ruleDir); err != nil { 150 return errors.Wrapf(err, "add rule dir %s to watcher", ruleDir) 151 } 152 } 153 154 tick := time.NewTicker(r.watchInterval) 155 defer tick.Stop() 156 157 level.Info(r.logger).Log( 158 "msg", "started watching config file and non-recursively rule dirs for changes", 159 "cfg", r.cfgFile, 160 "out", r.cfgOutputFile, 161 "dirs", strings.Join(r.ruleDirs, ",")) 162 163 for { 164 select { 165 case <-ctx.Done(): 166 return nil 167 case <-tick.C: 168 case event := <-watcher.Events: 169 // TODO(bwplotka): Add metric if we are not cycling CPU here too much. 
170 if _, ok := watchables[filepath.Dir(event.Name)]; !ok { 171 continue 172 } 173 case err := <-watcher.Errors: 174 level.Error(r.logger).Log("msg", "watch error", "err", err) 175 continue 176 } 177 178 if err := r.apply(ctx); err != nil { 179 // Critical error. 180 return err 181 } 182 } 183} 184 185// apply triggers Prometheus reload if rules or config changed. If cfgOutputFile is set, we also 186// expand env vars into config file before reloading. 187// Reload is retried in retryInterval until watchInterval. 188func (r *Reloader) apply(ctx context.Context) error { 189 var ( 190 cfgHash []byte 191 ruleHash []byte 192 ) 193 if r.cfgFile != "" { 194 h := sha256.New() 195 if err := hashFile(h, r.cfgFile); err != nil { 196 return errors.Wrap(err, "hash file") 197 } 198 cfgHash = h.Sum(nil) 199 if r.cfgOutputFile != "" { 200 b, err := ioutil.ReadFile(r.cfgFile) 201 if err != nil { 202 return errors.Wrap(err, "read file") 203 } 204 205 // Detect and extract gzipped file. 206 if bytes.Equal(b[0:3], firstGzipBytes) { 207 zr, err := gzip.NewReader(bytes.NewReader(b)) 208 if err != nil { 209 return errors.Wrap(err, "create gzip reader") 210 } 211 defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close") 212 213 b, err = ioutil.ReadAll(zr) 214 if err != nil { 215 return errors.Wrap(err, "read compressed config file") 216 } 217 } 218 219 b, err = expandEnv(b) 220 if err != nil { 221 return errors.Wrap(err, "expand environment variables") 222 } 223 224 tmpFile := r.cfgOutputFile + ".tmp" 225 defer func() { 226 _ = os.Remove(tmpFile) 227 }() 228 if err := ioutil.WriteFile(tmpFile, b, 0666); err != nil { 229 return errors.Wrap(err, "write file") 230 } 231 if err := os.Rename(tmpFile, r.cfgOutputFile); err != nil { 232 return errors.Wrap(err, "rename file") 233 } 234 } 235 } 236 237 h := sha256.New() 238 for _, ruleDir := range r.ruleDirs { 239 walkDir, err := filepath.EvalSymlinks(ruleDir) 240 if err != nil { 241 return errors.Wrap(err, "ruleDir symlink eval") 242 } 
243 err = filepath.Walk(walkDir, func(path string, f os.FileInfo, err error) error { 244 if err != nil { 245 return err 246 } 247 248 // filepath.Walk uses Lstat to retriev os.FileInfo. Lstat does not 249 // follow symlinks. Make sure to follow a symlink before checking 250 // if it is a directory. 251 targetFile, err := os.Stat(path) 252 if err != nil { 253 return err 254 } 255 256 if targetFile.IsDir() { 257 return nil 258 } 259 260 if err := hashFile(h, path); err != nil { 261 return err 262 } 263 return nil 264 }) 265 if err != nil { 266 return errors.Wrap(err, "build hash") 267 } 268 } 269 if len(r.ruleDirs) > 0 { 270 ruleHash = h.Sum(nil) 271 } 272 273 if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastRuleHash, ruleHash) { 274 // Nothing to do. 275 return nil 276 } 277 278 // Retry trigger reload until it succeeded or next tick is near. 279 retryCtx, cancel := context.WithTimeout(ctx, r.watchInterval) 280 defer cancel() 281 282 if err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error { 283 if err := r.triggerReload(ctx); err != nil { 284 return errors.Wrap(err, "trigger reload") 285 } 286 287 r.lastCfgHash = cfgHash 288 r.lastRuleHash = ruleHash 289 level.Info(r.logger).Log( 290 "msg", "Prometheus reload triggered", 291 "cfg_in", r.cfgFile, 292 "cfg_out", r.cfgOutputFile, 293 "rule_dirs", strings.Join(r.ruleDirs, ", ")) 294 return nil 295 }); err != nil { 296 level.Error(r.logger).Log("msg", "Failed to trigger reload. 
Retrying.", "err", err) 297 } 298 299 return nil 300} 301 302func hashFile(h hash.Hash, fn string) error { 303 f, err := os.Open(fn) 304 if err != nil { 305 return err 306 } 307 defer f.Close() 308 309 if _, err := h.Write([]byte{'\xff'}); err != nil { 310 return err 311 } 312 if _, err := h.Write([]byte(fn)); err != nil { 313 return err 314 } 315 if _, err := h.Write([]byte{'\xff'}); err != nil { 316 return err 317 } 318 319 if _, err := io.Copy(h, f); err != nil { 320 return err 321 } 322 return nil 323} 324 325func (r *Reloader) triggerReload(ctx context.Context) error { 326 req, err := http.NewRequest("POST", r.reloadURL.String(), nil) 327 if err != nil { 328 return errors.Wrap(err, "create request") 329 } 330 req = req.WithContext(ctx) 331 332 resp, err := http.DefaultClient.Do(req) 333 if err != nil { 334 return errors.Wrap(err, "reload request failed") 335 } 336 defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body") 337 338 if resp.StatusCode != 200 { 339 return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status) 340 } 341 return nil 342} 343 344// ReloadURLFromBase returns the standard Prometheus reload URL from its base URL. 345func ReloadURLFromBase(u *url.URL) *url.URL { 346 r := *u 347 r.Path = path.Join(r.Path, "/-/reload") 348 return &r 349} 350 351var envRe = regexp.MustCompile(`\$\(([a-zA-Z_0-9]+)\)`) 352 353func expandEnv(b []byte) (r []byte, err error) { 354 r = envRe.ReplaceAllFunc(b, func(n []byte) []byte { 355 if err != nil { 356 return nil 357 } 358 n = n[2 : len(n)-1] 359 360 v, ok := os.LookupEnv(string(n)) 361 if !ok { 362 err = errors.Errorf("found reference to unset environment variable %q", n) 363 return nil 364 } 365 return []byte(v) 366 }) 367 return r, err 368} 369