1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4// Package reloader contains helpers to trigger reloads of Prometheus instances
5// on configuration changes and to substitute environment variables in config files.
6//
7// Reloader type is useful when you want to:
8//
// 	* Watch for changes to a certain file, e.g. (`cfgFile`).
// 	* Optionally, specify a different output file for the watched `cfgFile` (`cfgOutputFile`).
// 	This will also try to decompress the `cfgFile` if needed and substitute ALL the envvars using the Kubernetes substitution format: (`$(var)`)
// 	* Watch for changes to certain directories (`ruleDirs`).
13//
// Once either of those two changes, Prometheus on the given `reloadURL` will be notified, causing Prometheus to reload its configuration and rules.
15//
// An example of creating a reloader:
17//
18// 	u, _ := url.Parse("http://localhost:9090")
19// 	rl := reloader.New(
20// 		nil,
21// 		reloader.ReloadURLFromBase(u),
22// 		"/path/to/cfg",
23// 		"/path/to/cfg.out",
24// 		[]string{"/path/to/dirs"},
25// 	)
26//
27// The url of reloads can be generated with function ReloadURLFromBase().
28// It will append the default path of reload into the given url:
29//
30// 	u, _ := url.Parse("http://localhost:9090")
31// 	reloader.ReloadURLFromBase(u) // It will return "http://localhost:9090/-/reload"
32//
// Start watching for changes; Watch blocks until the context gets canceled:
34//
35// 	ctx, cancel := context.WithCancel(context.Background())
36// 	go func() {
37// 		if err := rl.Watch(ctx); err != nil {
38// 			log.Fatal(err)
39// 		}
40// 	}()
41// 	// ...
42// 	cancel()
43//
// By default, reloader also periodically re-computes the hash of the given config file and rule dirs
// and compares it with the last result, even if no file system change was reported.
46//
47// A basic example of configuration template with environment variables:
48//
49//   global:
50//     external_labels:
51//       replica: '$(HOSTNAME)'
52package reloader
53
54import (
55	"bytes"
56	"compress/gzip"
57	"context"
58	"crypto/sha256"
59	"hash"
60	"io"
61	"io/ioutil"
62	"net/http"
63	"net/url"
64	"os"
65	"path"
66	"path/filepath"
67	"regexp"
68	"strings"
69	"time"
70
71	"github.com/fsnotify/fsnotify"
72	"github.com/go-kit/kit/log"
73	"github.com/go-kit/kit/log/level"
74	"github.com/pkg/errors"
75	"github.com/thanos-io/thanos/pkg/runutil"
76)
77
// Reloader can watch config files and trigger reloads of a Prometheus server.
// It optionally substitutes environment variables in the configuration.
// Referenced environment variables must be of the form `$(var)` (not `$var` or `${var}`).
type Reloader struct {
	logger        log.Logger    // Receives the reloader's log output.
	reloadURL     *url.URL      // Target of the reload POST request.
	cfgFile       string        // Watched input config file; empty disables config handling.
	cfgOutputFile string        // Where the processed config is written; empty disables it.
	ruleDirs      []string      // Directories whose files are hashed to detect rule changes.
	watchInterval time.Duration // Period of the re-check ticker; also bounds reload retries.
	retryInterval time.Duration // Interval passed to the retry helper for reload attempts.

	lastCfgHash  []byte // Config hash recorded at the last successful reload.
	lastRuleHash []byte // Rule dirs hash recorded at the last successful reload.
}
93
// firstGzipBytes is the byte prefix used to detect a gzip-compressed config
// file (the two gzip ID bytes followed by the deflate compression method byte).
var firstGzipBytes = []byte{0x1f, 0x8b, 0x08}
95
96// New creates a new reloader that watches the given config file and rule directory
97// and triggers a Prometheus reload upon changes.
98// If cfgOutputFile is not empty the config file will be decompressed if needed, environment variables
99// will be substituted and the output written into the given path. Prometheus should then use
100// cfgOutputFile as its config file path.
101func New(logger log.Logger, reloadURL *url.URL, cfgFile string, cfgOutputFile string, ruleDirs []string) *Reloader {
102	if logger == nil {
103		logger = log.NewNopLogger()
104	}
105	return &Reloader{
106		logger:        logger,
107		reloadURL:     reloadURL,
108		cfgFile:       cfgFile,
109		cfgOutputFile: cfgOutputFile,
110		ruleDirs:      ruleDirs,
111		watchInterval: 3 * time.Minute,
112		retryInterval: 5 * time.Second,
113	}
114}
115
// WithWatchInterval overrides how often Watch re-checks the config file and
// rule dirs regardless of file system notifications. The periodic check exists
// because we cannot detect everything via watch.
// Watch creates its ticker from this value when it starts, so call this before Watch.
func (r *Reloader) WithWatchInterval(duration time.Duration) {
	r.watchInterval = duration
}
120
121// Watch starts to watch periodically the config file and rules and process them until the context
122// gets canceled. Config file gets env expanded if cfgOutputFile is specified and reload is trigger if
123// config or rules changed.
124// Watch watchers periodically based on r.watchInterval.
125// For config file it watches it directly as well via fsnotify.
126// It watches rule dirs as well, but lot's of edge cases are missing, so rely on interval mostly.
127func (r *Reloader) Watch(ctx context.Context) error {
128	watcher, err := fsnotify.NewWatcher()
129	if err != nil {
130		return errors.Wrap(err, "create watcher")
131	}
132	defer runutil.CloseWithLogOnErr(r.logger, watcher, "config watcher close")
133
134	watchables := map[string]struct{}{}
135	if r.cfgFile != "" {
136		watchables[filepath.Dir(r.cfgFile)] = struct{}{}
137		if err := watcher.Add(r.cfgFile); err != nil {
138			return errors.Wrapf(err, "add config file %s to watcher", r.cfgFile)
139		}
140
141		if err := r.apply(ctx); err != nil {
142			return err
143		}
144	}
145
146	// Watch rule dirs in best effort manner.
147	for _, ruleDir := range r.ruleDirs {
148		watchables[filepath.Dir(ruleDir)] = struct{}{}
149		if err := watcher.Add(ruleDir); err != nil {
150			return errors.Wrapf(err, "add rule dir %s to watcher", ruleDir)
151		}
152	}
153
154	tick := time.NewTicker(r.watchInterval)
155	defer tick.Stop()
156
157	level.Info(r.logger).Log(
158		"msg", "started watching config file and non-recursively rule dirs for changes",
159		"cfg", r.cfgFile,
160		"out", r.cfgOutputFile,
161		"dirs", strings.Join(r.ruleDirs, ","))
162
163	for {
164		select {
165		case <-ctx.Done():
166			return nil
167		case <-tick.C:
168		case event := <-watcher.Events:
169			// TODO(bwplotka): Add metric if we are not cycling CPU here too much.
170			if _, ok := watchables[filepath.Dir(event.Name)]; !ok {
171				continue
172			}
173		case err := <-watcher.Errors:
174			level.Error(r.logger).Log("msg", "watch error", "err", err)
175			continue
176		}
177
178		if err := r.apply(ctx); err != nil {
179			// Critical error.
180			return err
181		}
182	}
183}
184
185// apply triggers Prometheus reload if rules or config changed. If cfgOutputFile is set, we also
186// expand env vars into config file before reloading.
187// Reload is retried in retryInterval until watchInterval.
188func (r *Reloader) apply(ctx context.Context) error {
189	var (
190		cfgHash  []byte
191		ruleHash []byte
192	)
193	if r.cfgFile != "" {
194		h := sha256.New()
195		if err := hashFile(h, r.cfgFile); err != nil {
196			return errors.Wrap(err, "hash file")
197		}
198		cfgHash = h.Sum(nil)
199		if r.cfgOutputFile != "" {
200			b, err := ioutil.ReadFile(r.cfgFile)
201			if err != nil {
202				return errors.Wrap(err, "read file")
203			}
204
205			// Detect and extract gzipped file.
206			if bytes.Equal(b[0:3], firstGzipBytes) {
207				zr, err := gzip.NewReader(bytes.NewReader(b))
208				if err != nil {
209					return errors.Wrap(err, "create gzip reader")
210				}
211				defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close")
212
213				b, err = ioutil.ReadAll(zr)
214				if err != nil {
215					return errors.Wrap(err, "read compressed config file")
216				}
217			}
218
219			b, err = expandEnv(b)
220			if err != nil {
221				return errors.Wrap(err, "expand environment variables")
222			}
223
224			tmpFile := r.cfgOutputFile + ".tmp"
225			defer func() {
226				_ = os.Remove(tmpFile)
227			}()
228			if err := ioutil.WriteFile(tmpFile, b, 0666); err != nil {
229				return errors.Wrap(err, "write file")
230			}
231			if err := os.Rename(tmpFile, r.cfgOutputFile); err != nil {
232				return errors.Wrap(err, "rename file")
233			}
234		}
235	}
236
237	h := sha256.New()
238	for _, ruleDir := range r.ruleDirs {
239		walkDir, err := filepath.EvalSymlinks(ruleDir)
240		if err != nil {
241			return errors.Wrap(err, "ruleDir symlink eval")
242		}
243		err = filepath.Walk(walkDir, func(path string, f os.FileInfo, err error) error {
244			if err != nil {
245				return err
246			}
247
248			// filepath.Walk uses Lstat to retriev os.FileInfo. Lstat does not
249			// follow symlinks. Make sure to follow a symlink before checking
250			// if it is a directory.
251			targetFile, err := os.Stat(path)
252			if err != nil {
253				return err
254			}
255
256			if targetFile.IsDir() {
257				return nil
258			}
259
260			if err := hashFile(h, path); err != nil {
261				return err
262			}
263			return nil
264		})
265		if err != nil {
266			return errors.Wrap(err, "build hash")
267		}
268	}
269	if len(r.ruleDirs) > 0 {
270		ruleHash = h.Sum(nil)
271	}
272
273	if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastRuleHash, ruleHash) {
274		// Nothing to do.
275		return nil
276	}
277
278	// Retry trigger reload until it succeeded or next tick is near.
279	retryCtx, cancel := context.WithTimeout(ctx, r.watchInterval)
280	defer cancel()
281
282	if err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error {
283		if err := r.triggerReload(ctx); err != nil {
284			return errors.Wrap(err, "trigger reload")
285		}
286
287		r.lastCfgHash = cfgHash
288		r.lastRuleHash = ruleHash
289		level.Info(r.logger).Log(
290			"msg", "Prometheus reload triggered",
291			"cfg_in", r.cfgFile,
292			"cfg_out", r.cfgOutputFile,
293			"rule_dirs", strings.Join(r.ruleDirs, ", "))
294		return nil
295	}); err != nil {
296		level.Error(r.logger).Log("msg", "Failed to trigger reload. Retrying.", "err", err)
297	}
298
299	return nil
300}
301
// hashFile writes the file name fn, framed by a 0xff byte on each side, followed by
// the contents of the file into h. Framing the name keeps the name and the contents
// of consecutive files separated in the hashed stream.
// It returns any error from opening or reading the file or from writing to h.
func hashFile(h hash.Hash, fn string) error {
	src, err := os.Open(fn)
	if err != nil {
		return err
	}
	defer src.Close()

	// Header: 0xff, file name, 0xff.
	for _, chunk := range [][]byte{{'\xff'}, []byte(fn), {'\xff'}} {
		if _, err := h.Write(chunk); err != nil {
			return err
		}
	}

	// Body: the raw file contents.
	_, err = io.Copy(h, src)
	return err
}
324
325func (r *Reloader) triggerReload(ctx context.Context) error {
326	req, err := http.NewRequest("POST", r.reloadURL.String(), nil)
327	if err != nil {
328		return errors.Wrap(err, "create request")
329	}
330	req = req.WithContext(ctx)
331
332	resp, err := http.DefaultClient.Do(req)
333	if err != nil {
334		return errors.Wrap(err, "reload request failed")
335	}
336	defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")
337
338	if resp.StatusCode != 200 {
339		return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status)
340	}
341	return nil
342}
343
// ReloadURLFromBase returns the standard Prometheus reload URL for the given base URL
// by joining "/-/reload" onto the base path. The result is a copy; u is not modified.
func ReloadURLFromBase(u *url.URL) *url.URL {
	reload := new(url.URL)
	*reload = *u
	reload.Path = path.Join(u.Path, "/-/reload")
	return reload
}
350
351var envRe = regexp.MustCompile(`\$\(([a-zA-Z_0-9]+)\)`)
352
353func expandEnv(b []byte) (r []byte, err error) {
354	r = envRe.ReplaceAllFunc(b, func(n []byte) []byte {
355		if err != nil {
356			return nil
357		}
358		n = n[2 : len(n)-1]
359
360		v, ok := os.LookupEnv(string(n))
361		if !ok {
362			err = errors.Errorf("found reference to unset environment variable %q", n)
363			return nil
364		}
365		return []byte(v)
366	})
367	return r, err
368}
369