1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package main
5
6import (
7	"context"
8	"math"
9	"net/http"
10	"net/url"
11	"sync"
12	"time"
13
14	"github.com/go-kit/kit/log"
15	"github.com/go-kit/kit/log/level"
16	grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
17	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
18	"github.com/oklog/run"
19	"github.com/opentracing/opentracing-go"
20	"github.com/pkg/errors"
21	"github.com/prometheus/client_golang/prometheus"
22	"github.com/prometheus/client_golang/prometheus/promauto"
23	"github.com/prometheus/common/model"
24	"github.com/prometheus/prometheus/pkg/labels"
25	"github.com/thanos-io/thanos/pkg/block/metadata"
26	"github.com/thanos-io/thanos/pkg/component"
27	"github.com/thanos-io/thanos/pkg/exemplars"
28	"github.com/thanos-io/thanos/pkg/extflag"
29	"github.com/thanos-io/thanos/pkg/exthttp"
30	"github.com/thanos-io/thanos/pkg/extkingpin"
31	"github.com/thanos-io/thanos/pkg/extprom"
32	thanoshttp "github.com/thanos-io/thanos/pkg/http"
33	"github.com/thanos-io/thanos/pkg/logging"
34	meta "github.com/thanos-io/thanos/pkg/metadata"
35	thanosmodel "github.com/thanos-io/thanos/pkg/model"
36	"github.com/thanos-io/thanos/pkg/objstore/client"
37	"github.com/thanos-io/thanos/pkg/prober"
38	"github.com/thanos-io/thanos/pkg/promclient"
39	"github.com/thanos-io/thanos/pkg/reloader"
40	"github.com/thanos-io/thanos/pkg/rules"
41	"github.com/thanos-io/thanos/pkg/runutil"
42	grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
43	httpserver "github.com/thanos-io/thanos/pkg/server/http"
44	"github.com/thanos-io/thanos/pkg/shipper"
45	"github.com/thanos-io/thanos/pkg/store"
46	"github.com/thanos-io/thanos/pkg/targets"
47	"github.com/thanos-io/thanos/pkg/tls"
48	"github.com/thanos-io/thanos/pkg/tracing"
49)
50
51func registerSidecar(app *extkingpin.App) {
52	cmd := app.Command(component.Sidecar.String(), "Sidecar for Prometheus server.")
53	conf := &sidecarConfig{}
54	conf.registerFlag(cmd)
55	cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
56		tagOpts, grpcLogOpts, err := logging.ParsegRPCOptions("", conf.reqLogConfig)
57		if err != nil {
58			return errors.Wrap(err, "error while parsing config for request logging")
59		}
60
61		rl := reloader.New(log.With(logger, "component", "reloader"),
62			extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg),
63			&reloader.Options{
64				ReloadURL:     reloader.ReloadURLFromBase(conf.prometheus.url),
65				CfgFile:       conf.reloader.confFile,
66				CfgOutputFile: conf.reloader.envVarConfFile,
67				WatchedDirs:   conf.reloader.ruleDirectories,
68				WatchInterval: conf.reloader.watchInterval,
69				RetryInterval: conf.reloader.retryInterval,
70			})
71
72		return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts)
73	})
74}
75
76func runSidecar(
77	g *run.Group,
78	logger log.Logger,
79	reg *prometheus.Registry,
80	tracer opentracing.Tracer,
81	reloader *reloader.Reloader,
82	comp component.Component,
83	conf sidecarConfig,
84	grpcLogOpts []grpc_logging.Option,
85	tagOpts []tags.Option,
86) error {
87	var m = &promMetadata{
88		promURL: conf.prometheus.url,
89
90		// Start out with the full time range. The shipper will constrain it later.
91		// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
92		mint: conf.limitMinTime.PrometheusTimestamp(),
93		maxt: math.MaxInt64,
94
95		limitMinTime: conf.limitMinTime,
96		client:       promclient.NewWithTracingClient(logger, "thanos-sidecar"),
97	}
98
99	confContentYaml, err := conf.objStore.Content()
100	if err != nil {
101		return errors.Wrap(err, "getting object store config")
102	}
103
104	var uploads = true
105	if len(confContentYaml) == 0 {
106		level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
107		uploads = false
108	}
109
110	grpcProbe := prober.NewGRPC()
111	httpProbe := prober.NewHTTP()
112	statusProber := prober.Combine(
113		httpProbe,
114		grpcProbe,
115		prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
116	)
117
118	srv := httpserver.New(logger, reg, comp, httpProbe,
119		httpserver.WithListen(conf.http.bindAddress),
120		httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
121	)
122
123	g.Add(func() error {
124		statusProber.Healthy()
125
126		return srv.ListenAndServe()
127	}, func(err error) {
128		statusProber.NotReady(err)
129		defer statusProber.NotHealthy(err)
130
131		srv.Shutdown(err)
132	})
133
134	// Setup all the concurrent groups.
135	{
136		promUp := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
137			Name: "thanos_sidecar_prometheus_up",
138			Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.",
139		})
140		lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
141			Name: "thanos_sidecar_last_heartbeat_success_time_seconds",
142			Help: "Timestamp of the last successful heartbeat in seconds.",
143		})
144
145		ctx, cancel := context.WithCancel(context.Background())
146		g.Add(func() error {
147			// Only check Prometheus's flags when upload is enabled.
148			if uploads {
149				// Check prometheus's flags to ensure sane sidecar flags.
150				if err := validatePrometheus(ctx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
151					return errors.Wrap(err, "validate Prometheus flags")
152				}
153			}
154
155			// Blocking query of external labels before joining as a Source Peer into gossip.
156			// We retry infinitely until we reach and fetch labels from our Prometheus.
157			err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
158				if err := m.UpdateLabels(ctx); err != nil {
159					level.Warn(logger).Log(
160						"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
161						"err", err,
162					)
163					promUp.Set(0)
164					statusProber.NotReady(err)
165					return err
166				}
167
168				level.Info(logger).Log(
169					"msg", "successfully loaded prometheus external labels",
170					"external_labels", m.Labels().String(),
171				)
172				promUp.Set(1)
173				statusProber.Ready()
174				lastHeartbeat.SetToCurrentTime()
175				return nil
176			})
177			if err != nil {
178				return errors.Wrap(err, "initial external labels query")
179			}
180
181			if len(m.Labels()) == 0 {
182				return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
183			}
184
185			// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
186			// the external labels we apply.
187			return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
188				iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
189				defer iterCancel()
190
191				if err := m.UpdateLabels(iterCtx); err != nil {
192					level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
193					promUp.Set(0)
194				} else {
195					promUp.Set(1)
196					lastHeartbeat.SetToCurrentTime()
197				}
198
199				return nil
200			})
201		}, func(error) {
202			cancel()
203		})
204	}
205	{
206		ctx, cancel := context.WithCancel(context.Background())
207		g.Add(func() error {
208			return reloader.Watch(ctx)
209		}, func(error) {
210			cancel()
211		})
212	}
213
214	{
215		t := exthttp.NewTransport()
216		t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost
217		t.MaxIdleConns = conf.connection.maxIdleConns
218		c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)
219
220		promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
221		if err != nil {
222			return errors.Wrap(err, "create Prometheus store")
223		}
224
225		tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"),
226			conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
227		if err != nil {
228			return errors.Wrap(err, "setup gRPC server")
229		}
230
231		s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
232			grpcserver.WithServer(store.RegisterStoreServer(promStore)),
233			grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
234			grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
235			grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
236			grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))),
237			grpcserver.WithListen(conf.grpc.bindAddress),
238			grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
239			grpcserver.WithTLSConfig(tlsCfg),
240		)
241		g.Add(func() error {
242			statusProber.Ready()
243			return s.ListenAndServe()
244		}, func(err error) {
245			statusProber.NotReady(err)
246			s.Shutdown(err)
247		})
248	}
249
250	if uploads {
251		// The background shipper continuously scans the data directory and uploads
252		// new blocks to Google Cloud Storage or an S3-compatible storage service.
253		bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())
254		if err != nil {
255			return err
256		}
257
258		// Ensure we close up everything properly.
259		defer func() {
260			if err != nil {
261				runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
262			}
263		}()
264
265		if err := promclient.IsWALDirAccessible(conf.tsdb.path); err != nil {
266			level.Error(logger).Log("err", err)
267		}
268
269		ctx, cancel := context.WithCancel(context.Background())
270		g.Add(func() error {
271			defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
272
273			promReadyTimeout := conf.prometheus.readyTimeout
274			extLabelsCtx, cancel := context.WithTimeout(ctx, promReadyTimeout)
275			defer cancel()
276
277			if err := runutil.Retry(2*time.Second, extLabelsCtx.Done(), func() error {
278				if len(m.Labels()) == 0 {
279					return errors.New("not uploading as no external labels are configured yet - is Prometheus healthy/reachable?")
280				}
281				return nil
282			}); err != nil {
283				return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
284			}
285
286			s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
287				conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
288
289			return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
290				if uploaded, err := s.Sync(ctx); err != nil {
291					level.Warn(logger).Log("err", err, "uploaded", uploaded)
292				}
293
294				minTime, _, err := s.Timestamps()
295				if err != nil {
296					level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
297					return nil
298				}
299				m.UpdateTimestamps(minTime, math.MaxInt64)
300				return nil
301			})
302		}, func(error) {
303			cancel()
304		})
305	}
306
307	level.Info(logger).Log("msg", "starting sidecar")
308	return nil
309}
310
311func validatePrometheus(ctx context.Context, client *promclient.Client, logger log.Logger, ignoreBlockSize bool, m *promMetadata) error {
312	var (
313		flagErr error
314		flags   promclient.Flags
315	)
316
317	if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
318		if flags, flagErr = client.ConfiguredFlags(ctx, m.promURL); flagErr != nil && flagErr != promclient.ErrFlagEndpointNotFound {
319			level.Warn(logger).Log("msg", "failed to get Prometheus flags. Is Prometheus running? Retrying", "err", flagErr)
320			return errors.Wrapf(flagErr, "fetch Prometheus flags")
321		}
322		return nil
323	}); err != nil {
324		return errors.Wrapf(err, "fetch Prometheus flags")
325	}
326
327	if flagErr != nil {
328		level.Warn(logger).Log("msg", "failed to check Prometheus flags, due to potentially older Prometheus. No extra validation is done.", "err", flagErr)
329		return nil
330	}
331
332	// Check if compaction is disabled.
333	if flags.TSDBMinTime != flags.TSDBMaxTime {
334		if !ignoreBlockSize {
335			return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
336				"Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration)", flags.TSDBMaxTime, flags.TSDBMinTime)
337		}
338		level.Warn(logger).Log("msg", "flag to ignore Prometheus min/max block duration flags differing is being used. If the upload of a 2h block fails and a Prometheus compaction happens that block may be missing from your Thanos bucket storage.")
339	}
340	// Check if block time is 2h.
341	if flags.TSDBMinTime != model.Duration(2*time.Hour) {
342		level.Warn(logger).Log("msg", "found that TSDB block time is not 2h. Only 2h block time is recommended.", "block-time", flags.TSDBMinTime)
343	}
344
345	return nil
346}
347
// promMetadata caches what the sidecar knows about its Prometheus peer: the
// external labels fetched from it and the [mint, maxt] time range handed to the
// Prometheus store. All accessors lock mtx.
type promMetadata struct {
	// promURL is the base URL of the Prometheus server queried by client.
	promURL *url.URL

	// mtx guards mint, maxt and labels.
	mtx    sync.Mutex
	mint   int64
	maxt   int64
	labels labels.Labels

	// limitMinTime is the lower bound UpdateTimestamps applies to mint.
	limitMinTime thanosmodel.TimeOrDurationValue

	// client is used to fetch external labels (and, in validatePrometheus, flags).
	client *promclient.Client
}
360
361func (s *promMetadata) UpdateLabels(ctx context.Context) error {
362	elset, err := s.client.ExternalLabels(ctx, s.promURL)
363	if err != nil {
364		return err
365	}
366
367	s.mtx.Lock()
368	defer s.mtx.Unlock()
369
370	s.labels = elset
371	return nil
372}
373
374func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) {
375	s.mtx.Lock()
376	defer s.mtx.Unlock()
377
378	if mint < s.limitMinTime.PrometheusTimestamp() {
379		mint = s.limitMinTime.PrometheusTimestamp()
380	}
381
382	s.mint = mint
383	s.maxt = maxt
384}
385
386func (s *promMetadata) Labels() labels.Labels {
387	s.mtx.Lock()
388	defer s.mtx.Unlock()
389
390	return s.labels
391}
392
393func (s *promMetadata) Timestamps() (mint int64, maxt int64) {
394	s.mtx.Lock()
395	defer s.mtx.Unlock()
396
397	return s.mint, s.maxt
398}
399
// sidecarConfig gathers every flag-backed setting of the sidecar command. It is
// populated by registerFlag and passed by value to runSidecar.
//
// objStore holds the object store configuration; when its content is empty,
// runSidecar disables uploads. reqLogConfig is parsed into the gRPC
// request-logging options. limitMinTime is the start of the time range the
// sidecar serves.
type sidecarConfig struct {
	http         httpConfig
	grpc         grpcConfig
	prometheus   prometheusConfig
	connection   connConfig
	tsdb         tsdbConfig
	reloader     reloaderConfig
	reqLogConfig *extflag.PathOrContent
	objStore     extflag.PathOrContent
	shipper      shipperConfig
	limitMinTime thanosmodel.TimeOrDurationValue
}
412
413func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
414	sc.http.registerFlag(cmd)
415	sc.grpc.registerFlag(cmd)
416	sc.prometheus.registerFlag(cmd)
417	sc.connection.registerFlag(cmd)
418	sc.tsdb.registerFlag(cmd)
419	sc.reloader.registerFlag(cmd)
420	sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
421	sc.objStore = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
422	sc.shipper.registerFlag(cmd)
423	cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
424		Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
425}
426