1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package main 5 6import ( 7 "context" 8 "math" 9 "net/http" 10 "net/url" 11 "sync" 12 "time" 13 14 "github.com/go-kit/kit/log" 15 "github.com/go-kit/kit/log/level" 16 grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" 17 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags" 18 "github.com/oklog/run" 19 "github.com/opentracing/opentracing-go" 20 "github.com/pkg/errors" 21 "github.com/prometheus/client_golang/prometheus" 22 "github.com/prometheus/client_golang/prometheus/promauto" 23 "github.com/prometheus/common/model" 24 "github.com/prometheus/prometheus/pkg/labels" 25 "github.com/thanos-io/thanos/pkg/block/metadata" 26 "github.com/thanos-io/thanos/pkg/component" 27 "github.com/thanos-io/thanos/pkg/exemplars" 28 "github.com/thanos-io/thanos/pkg/extflag" 29 "github.com/thanos-io/thanos/pkg/exthttp" 30 "github.com/thanos-io/thanos/pkg/extkingpin" 31 "github.com/thanos-io/thanos/pkg/extprom" 32 thanoshttp "github.com/thanos-io/thanos/pkg/http" 33 "github.com/thanos-io/thanos/pkg/logging" 34 meta "github.com/thanos-io/thanos/pkg/metadata" 35 thanosmodel "github.com/thanos-io/thanos/pkg/model" 36 "github.com/thanos-io/thanos/pkg/objstore/client" 37 "github.com/thanos-io/thanos/pkg/prober" 38 "github.com/thanos-io/thanos/pkg/promclient" 39 "github.com/thanos-io/thanos/pkg/reloader" 40 "github.com/thanos-io/thanos/pkg/rules" 41 "github.com/thanos-io/thanos/pkg/runutil" 42 grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" 43 httpserver "github.com/thanos-io/thanos/pkg/server/http" 44 "github.com/thanos-io/thanos/pkg/shipper" 45 "github.com/thanos-io/thanos/pkg/store" 46 "github.com/thanos-io/thanos/pkg/targets" 47 "github.com/thanos-io/thanos/pkg/tls" 48 "github.com/thanos-io/thanos/pkg/tracing" 49) 50 51func registerSidecar(app *extkingpin.App) { 52 cmd := app.Command(component.Sidecar.String(), "Sidecar for Prometheus server.") 53 conf := &sidecarConfig{} 54 conf.registerFlag(cmd) 55 cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { 56 tagOpts, grpcLogOpts, err := logging.ParsegRPCOptions("", conf.reqLogConfig) 57 if err != nil { 58 return errors.Wrap(err, "error while parsing config for request logging") 59 } 60 61 rl := reloader.New(log.With(logger, "component", "reloader"), 62 extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg), 63 &reloader.Options{ 64 ReloadURL: reloader.ReloadURLFromBase(conf.prometheus.url), 65 CfgFile: conf.reloader.confFile, 66 CfgOutputFile: conf.reloader.envVarConfFile, 67 WatchedDirs: conf.reloader.ruleDirectories, 68 WatchInterval: conf.reloader.watchInterval, 69 RetryInterval: conf.reloader.retryInterval, 70 }) 71 72 return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts) 73 }) 74} 75 76func runSidecar( 77 g *run.Group, 78 logger log.Logger, 79 reg *prometheus.Registry, 80 tracer opentracing.Tracer, 81 reloader *reloader.Reloader, 82 comp component.Component, 83 conf sidecarConfig, 84 grpcLogOpts []grpc_logging.Option, 85 tagOpts []tags.Option, 86) error { 87 var m = &promMetadata{ 88 promURL: conf.prometheus.url, 89 90 // Start out with the full time range. The shipper will constrain it later. 91 // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. 92 mint: conf.limitMinTime.PrometheusTimestamp(), 93 maxt: math.MaxInt64, 94 95 limitMinTime: conf.limitMinTime, 96 client: promclient.NewWithTracingClient(logger, "thanos-sidecar"), 97 } 98 99 confContentYaml, err := conf.objStore.Content() 100 if err != nil { 101 return errors.Wrap(err, "getting object store config") 102 } 103 104 var uploads = true 105 if len(confContentYaml) == 0 { 106 level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") 107 uploads = false 108 } 109 110 grpcProbe := prober.NewGRPC() 111 httpProbe := prober.NewHTTP() 112 statusProber := prober.Combine( 113 httpProbe, 114 grpcProbe, 115 prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), 116 ) 117 118 srv := httpserver.New(logger, reg, comp, httpProbe, 119 httpserver.WithListen(conf.http.bindAddress), 120 httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)), 121 ) 122 123 g.Add(func() error { 124 statusProber.Healthy() 125 126 return srv.ListenAndServe() 127 }, func(err error) { 128 statusProber.NotReady(err) 129 defer statusProber.NotHealthy(err) 130 131 srv.Shutdown(err) 132 }) 133 134 // Setup all the concurrent groups. 135 { 136 promUp := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ 137 Name: "thanos_sidecar_prometheus_up", 138 Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.", 139 }) 140 lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ 141 Name: "thanos_sidecar_last_heartbeat_success_time_seconds", 142 Help: "Timestamp of the last successful heartbeat in seconds.", 143 }) 144 145 ctx, cancel := context.WithCancel(context.Background()) 146 g.Add(func() error { 147 // Only check Prometheus's flags when upload is enabled. 148 if uploads { 149 // Check prometheus's flags to ensure sane sidecar flags. 150 if err := validatePrometheus(ctx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil { 151 return errors.Wrap(err, "validate Prometheus flags") 152 } 153 } 154 155 // Blocking query of external labels before joining as a Source Peer into gossip. 156 // We retry infinitely until we reach and fetch labels from our Prometheus. 157 err := runutil.Retry(2*time.Second, ctx.Done(), func() error { 158 if err := m.UpdateLabels(ctx); err != nil { 159 level.Warn(logger).Log( 160 "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", 161 "err", err, 162 ) 163 promUp.Set(0) 164 statusProber.NotReady(err) 165 return err 166 } 167 168 level.Info(logger).Log( 169 "msg", "successfully loaded prometheus external labels", 170 "external_labels", m.Labels().String(), 171 ) 172 promUp.Set(1) 173 statusProber.Ready() 174 lastHeartbeat.SetToCurrentTime() 175 return nil 176 }) 177 if err != nil { 178 return errors.Wrap(err, "initial external labels query") 179 } 180 181 if len(m.Labels()) == 0 { 182 return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.") 183 } 184 185 // Periodically query the Prometheus config. We use this as a heartbeat as well as for updating 186 // the external labels we apply. 187 return runutil.Repeat(30*time.Second, ctx.Done(), func() error { 188 iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second) 189 defer iterCancel() 190 191 if err := m.UpdateLabels(iterCtx); err != nil { 192 level.Warn(logger).Log("msg", "heartbeat failed", "err", err) 193 promUp.Set(0) 194 } else { 195 promUp.Set(1) 196 lastHeartbeat.SetToCurrentTime() 197 } 198 199 return nil 200 }) 201 }, func(error) { 202 cancel() 203 }) 204 } 205 { 206 ctx, cancel := context.WithCancel(context.Background()) 207 g.Add(func() error { 208 return reloader.Watch(ctx) 209 }, func(error) { 210 cancel() 211 }) 212 } 213 214 { 215 t := exthttp.NewTransport() 216 t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost 217 t.MaxIdleConns = conf.connection.maxIdleConns 218 c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent) 219 220 promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps) 221 if err != nil { 222 return errors.Wrap(err, "create Prometheus store") 223 } 224 225 tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), 226 conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA) 227 if err != nil { 228 return errors.Wrap(err, "setup gRPC server") 229 } 230 231 s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, 232 grpcserver.WithServer(store.RegisterStoreServer(promStore)), 233 grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))), 234 grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))), 235 grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))), 236 grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))), 237 grpcserver.WithListen(conf.grpc.bindAddress), 238 grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)), 239 grpcserver.WithTLSConfig(tlsCfg), 240 ) 241 g.Add(func() error { 242 statusProber.Ready() 243 return s.ListenAndServe() 244 }, func(err error) { 245 statusProber.NotReady(err) 246 s.Shutdown(err) 247 }) 248 } 249 250 if uploads { 251 // The background shipper continuously scans the data directory and uploads 252 // new blocks to Google Cloud Storage or an S3-compatible storage service. 253 bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String()) 254 if err != nil { 255 return err 256 } 257 258 // Ensure we close up everything properly. 259 defer func() { 260 if err != nil { 261 runutil.CloseWithLogOnErr(logger, bkt, "bucket client") 262 } 263 }() 264 265 if err := promclient.IsWALDirAccessible(conf.tsdb.path); err != nil { 266 level.Error(logger).Log("err", err) 267 } 268 269 ctx, cancel := context.WithCancel(context.Background()) 270 g.Add(func() error { 271 defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") 272 273 promReadyTimeout := conf.prometheus.readyTimeout 274 extLabelsCtx, cancel := context.WithTimeout(ctx, promReadyTimeout) 275 defer cancel() 276 277 if err := runutil.Retry(2*time.Second, extLabelsCtx.Done(), func() error { 278 if len(m.Labels()) == 0 { 279 return errors.New("not uploading as no external labels are configured yet - is Prometheus healthy/reachable?") 280 } 281 return nil 282 }); err != nil { 283 return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout) 284 } 285 286 s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, 287 conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) 288 289 return runutil.Repeat(30*time.Second, ctx.Done(), func() error { 290 if uploaded, err := s.Sync(ctx); err != nil { 291 level.Warn(logger).Log("err", err, "uploaded", uploaded) 292 } 293 294 minTime, _, err := s.Timestamps() 295 if err != nil { 296 level.Warn(logger).Log("msg", "reading timestamps failed", "err", err) 297 return nil 298 } 299 m.UpdateTimestamps(minTime, math.MaxInt64) 300 return nil 301 }) 302 }, func(error) { 303 cancel() 304 }) 305 } 306 307 level.Info(logger).Log("msg", "starting sidecar") 308 return nil 309} 310 311func validatePrometheus(ctx context.Context, client *promclient.Client, logger log.Logger, ignoreBlockSize bool, m *promMetadata) error { 312 var ( 313 flagErr error 314 flags promclient.Flags 315 ) 316 317 if err := runutil.Retry(2*time.Second, ctx.Done(), func() error { 318 if flags, flagErr = client.ConfiguredFlags(ctx, m.promURL); flagErr != nil && flagErr != promclient.ErrFlagEndpointNotFound { 319 level.Warn(logger).Log("msg", "failed to get Prometheus flags. Is Prometheus running? Retrying", "err", flagErr) 320 return errors.Wrapf(flagErr, "fetch Prometheus flags") 321 } 322 return nil 323 }); err != nil { 324 return errors.Wrapf(err, "fetch Prometheus flags") 325 } 326 327 if flagErr != nil { 328 level.Warn(logger).Log("msg", "failed to check Prometheus flags, due to potentially older Prometheus. No extra validation is done.", "err", flagErr) 329 return nil 330 } 331 332 // Check if compaction is disabled. 333 if flags.TSDBMinTime != flags.TSDBMaxTime { 334 if !ignoreBlockSize { 335 return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+ 336 "Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration)", flags.TSDBMaxTime, flags.TSDBMinTime) 337 } 338 level.Warn(logger).Log("msg", "flag to ignore Prometheus min/max block duration flags differing is being used. If the upload of a 2h block fails and a Prometheus compaction happens that block may be missing from your Thanos bucket storage.") 339 } 340 // Check if block time is 2h. 341 if flags.TSDBMinTime != model.Duration(2*time.Hour) { 342 level.Warn(logger).Log("msg", "found that TSDB block time is not 2h. Only 2h block time is recommended.", "block-time", flags.TSDBMinTime) 343 } 344 345 return nil 346} 347 348type promMetadata struct { 349 promURL *url.URL 350 351 mtx sync.Mutex 352 mint int64 353 maxt int64 354 labels labels.Labels 355 356 limitMinTime thanosmodel.TimeOrDurationValue 357 358 client *promclient.Client 359} 360 361func (s *promMetadata) UpdateLabels(ctx context.Context) error { 362 elset, err := s.client.ExternalLabels(ctx, s.promURL) 363 if err != nil { 364 return err 365 } 366 367 s.mtx.Lock() 368 defer s.mtx.Unlock() 369 370 s.labels = elset 371 return nil 372} 373 374func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) { 375 s.mtx.Lock() 376 defer s.mtx.Unlock() 377 378 if mint < s.limitMinTime.PrometheusTimestamp() { 379 mint = s.limitMinTime.PrometheusTimestamp() 380 } 381 382 s.mint = mint 383 s.maxt = maxt 384} 385 386func (s *promMetadata) Labels() labels.Labels { 387 s.mtx.Lock() 388 defer s.mtx.Unlock() 389 390 return s.labels 391} 392 393func (s *promMetadata) Timestamps() (mint int64, maxt int64) { 394 s.mtx.Lock() 395 defer s.mtx.Unlock() 396 397 return s.mint, s.maxt 398} 399 400type sidecarConfig struct { 401 http httpConfig 402 grpc grpcConfig 403 prometheus prometheusConfig 404 connection connConfig 405 tsdb tsdbConfig 406 reloader reloaderConfig 407 reqLogConfig *extflag.PathOrContent 408 objStore extflag.PathOrContent 409 shipper shipperConfig 410 limitMinTime thanosmodel.TimeOrDurationValue 411} 412 413func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { 414 sc.http.registerFlag(cmd) 415 sc.grpc.registerFlag(cmd) 416 sc.prometheus.registerFlag(cmd) 417 sc.connection.registerFlag(cmd) 418 sc.tsdb.registerFlag(cmd) 419 sc.reloader.registerFlag(cmd) 420 sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) 421 sc.objStore = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) 422 sc.shipper.registerFlag(cmd) 423 cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). 424 Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime) 425} 426