// This directory was copied and adapted from https://github.com/grafana/agent/tree/main/pkg/metrics.
// We cannot vendor the agent in since the agent vendors loki in, which would cause a cyclic dependency.
// NOTE: many changes have been made to the original code for our use-case.
package instance

import (
	"bytes"
	"context"
	"errors"
	"flag"
	"fmt"
	"math"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/oklog/run"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
	"gopkg.in/yaml.v2"

	"github.com/grafana/loki/pkg/ruler/storage/util"
	"github.com/grafana/loki/pkg/ruler/storage/wal"
	"github.com/grafana/loki/pkg/util/build"
)

func init() {
	remote.UserAgent = fmt.Sprintf("LokiRulerWAL/%s", build.Version)
}

var (
	remoteWriteMetricName = "queue_highest_sent_timestamp_seconds"
)

// Default configuration values
var (
	DefaultConfig = Config{
		Dir:                 "wal",
		TruncateFrequency:   60 * time.Minute,
		MinAge:              5 * time.Minute,
		MaxAge:              4 * time.Hour,
		RemoteFlushDeadline: 1 * time.Minute,
	}
)

// Config configures an individual WAL instance. It carries its own set of
// remote_write rules and WAL settings.
type Config struct {
	Tenant      string
	Name        string
	RemoteWrite []*config.RemoteWriteConfig

	Dir string `yaml:"dir"`

	// How frequently the WAL should be truncated.
	TruncateFrequency time.Duration `yaml:"truncate_frequency,omitempty"`

	// Minimum and maximum time that series should exist in the WAL for.
	MinAge time.Duration `yaml:"min_age,omitempty"`
	MaxAge time.Duration `yaml:"max_age,omitempty"`

	RemoteFlushDeadline time.Duration `yaml:"remote_flush_deadline,omitempty"`
}

// UnmarshalYAML implements yaml.Unmarshaler.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	*c = DefaultConfig

	type plain Config
	return unmarshal((*plain)(c))
}

// MarshalYAML implements yaml.Marshaler.
func (c Config) MarshalYAML() (interface{}, error) {
	// We want users to be able to marshal instance.Configs directly without
	// *needing* to call instance.MarshalConfig, so we call it internally
	// here and return a map.
	bb, err := MarshalConfig(&c, false)
	if err != nil {
		return nil, err
	}

	// Use a yaml.MapSlice rather than a map[string]interface{} so the
	// order of keys is retained compared to just calling MarshalConfig.
	var m yaml.MapSlice
	if err := yaml.Unmarshal(bb, &m); err != nil {
		return nil, err
	}
	return m, nil
}
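// For illustration only, the YAML keys defined by the tags above are sketched
// below with the DefaultConfig values. Note that the duration fields are plain
// time.Duration values, so whether strings like "1h0m0s" parse depends on the
// unmarshaling code wrapping this type; the values here are purely
// illustrative:
//
//	dir: wal
//	truncate_frequency: 1h0m0s
//	min_age: 5m0s
//	max_age: 4h0m0s
//	remote_flush_deadline: 1m0s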
// ApplyDefaults applies default values to all fields that have not been set
// and validates the config.
func (c *Config) ApplyDefaults() error {
	switch {
	case c.Name == "":
		return errors.New("missing instance name")
	case c.TruncateFrequency <= 0:
		return errors.New("truncate_frequency must be greater than 0s")
	case c.RemoteFlushDeadline <= 0:
		return errors.New("remote_flush_deadline must be greater than 0s")
	case c.MinAge > c.MaxAge:
		return errors.New("min_age must be less than or equal to max_age")
	}

	for _, cfg := range c.RemoteWrite {
		if cfg == nil {
			return fmt.Errorf("empty or null remote write config section")
		}
	}
	return nil
}

// Clone makes a deep copy of the config.
func (c *Config) Clone() (Config, error) {
	bb, err := MarshalConfig(c, false)
	if err != nil {
		return Config{}, err
	}
	cp, err := UnmarshalConfig(bytes.NewReader(bb))
	if err != nil {
		return Config{}, err
	}

	// Some tests will trip up on this; the marshal/unmarshal cycle might set
	// an empty slice to nil. Set it back to an empty slice if we detect this
	// happening.
	if cp.RemoteWrite == nil && c.RemoteWrite != nil {
		cp.RemoteWrite = []*config.RemoteWriteConfig{}
	}

	return *cp, nil
}

// RegisterFlags registers the WAL flags for the ruler storage config.
func (c *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&c.Dir, "ruler.wal.dir", DefaultConfig.Dir, "Directory to store the WAL and/or recover from WAL.")
	f.DurationVar(&c.TruncateFrequency, "ruler.wal.truncate-frequency", DefaultConfig.TruncateFrequency, "How often to run the WAL truncation.")
	f.DurationVar(&c.MinAge, "ruler.wal.min-age", DefaultConfig.MinAge, "Minimum age that samples must exist in the WAL before being truncated.")
	f.DurationVar(&c.MaxAge, "ruler.wal.max-age", DefaultConfig.MaxAge, "Maximum age that samples may exist in the WAL before being forcibly removed.")
}

type walStorageFactory func(reg prometheus.Registerer) (walStorage, error)

// Instance is an individual metrics collector and remote_writer.
type Instance struct {
	initialized bool

	// All fields in the following block may be accessed and modified by
	// concurrently running goroutines.
	//
	// Note that all Prometheus components listed here may be nil at any
	// given time; methods reading them should take care to do nil checks.
	mut         sync.Mutex
	cfg         Config
	wal         walStorage
	remoteStore *remote.Storage
	storage     storage.Storage

	logger log.Logger

	reg    prometheus.Registerer
	newWal walStorageFactory

	vc *MetricValueCollector

	tenant string
}
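// A minimal lifecycle sketch for an Instance (illustration only; it assumes a
// wal.Metrics value and a remote_write config are already at hand, and the
// names metrics and logger are hypothetical):
//
//	cfg := DefaultConfig
//	cfg.Name = "tenant-1"
//	cfg.Tenant = "tenant-1"
//
//	inst, err := New(prometheus.DefaultRegisterer, cfg, metrics, logger)
//	if err != nil {
//		// handle the error
//	}
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go func() {
//		// Run blocks until ctx is cancelled or a component fails.
//		_ = inst.Run(ctx)
//	}()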
// New creates a new Instance with a directory for storing the WAL. The
// instance will not start until Run is called on it.
func New(reg prometheus.Registerer, cfg Config, metrics *wal.Metrics, logger log.Logger) (*Instance, error) {
	logger = log.With(logger, "instance", cfg.Name)

	instWALDir := filepath.Join(cfg.Dir, cfg.Tenant)

	newWal := func(reg prometheus.Registerer) (walStorage, error) {
		return wal.NewStorage(logger, metrics, reg, instWALDir)
	}

	return newInstance(cfg, reg, logger, newWal, cfg.Tenant)
}

func newInstance(cfg Config, reg prometheus.Registerer, logger log.Logger, newWal walStorageFactory, tenant string) (*Instance, error) {
	vc := NewMetricValueCollector(prometheus.DefaultGatherer, remoteWriteMetricName)

	i := &Instance{
		cfg:    cfg,
		logger: logger,
		vc:     vc,

		reg:    reg,
		newWal: newWal,

		tenant: tenant,
	}

	return i, nil
}

func (i *Instance) Storage() storage.Storage {
	return i.storage
}

// Run starts the instance, initializing Prometheus components, and will
// continue to run until an error happens during execution or the provided
// context is cancelled.
//
// Run may be called again after exiting, as components will be reinitialized
// each time Run is called.
func (i *Instance) Run(ctx context.Context) error {
	// i.cfg may change at any point in the middle of this method, but not in
	// a way that affects any of the code below; rather than grabbing a mutex
	// every time we want to read the config, we'll simplify the access and
	// just grab a copy now.
	i.mut.Lock()
	cfg := i.cfg
	i.mut.Unlock()

	level.Debug(i.logger).Log("msg", "initializing instance", "name", cfg.Name)

	// trackingReg wraps the registerer for the instance to make sure that if
	// Run exits, any metrics Prometheus registers are removed and can be
	// re-registered if Run is called again.
	trackingReg := util.WrapWithUnregisterer(i.reg)
	defer trackingReg.UnregisterAll()

	if err := i.initialize(ctx, trackingReg, &cfg); err != nil {
		level.Error(i.logger).Log("msg", "failed to initialize instance", "err", err)
		return fmt.Errorf("failed to initialize instance: %w", err)
	}

	// The actors defined here are defined in the order we want them to shut
	// down. Primarily, we want to ensure that the following shutdown order is
	// maintained:
	//
	//	1. The scrape manager stops
	//	2. WAL storage is closed
	//	3. Remote write storage is closed
	//
	// This is done to allow the instance to write stale markers for all
	// active series.
	rg := runGroupWithContext(ctx)

	{
		// Truncation loop
		ctx, contextCancel := context.WithCancel(context.Background())
		defer contextCancel()
		rg.Add(
			func() error {
				i.truncateLoop(ctx, i.wal, &cfg)
				level.Info(i.logger).Log("msg", "truncation loop stopped")
				return nil
			},
			func(err error) {
				level.Info(i.logger).Log("msg", "stopping truncation loop...")
				contextCancel()
			},
		)
	}

	level.Debug(i.logger).Log("msg", "running instance", "name", cfg.Name)
	err := rg.Run()
	if err != nil {
		level.Error(i.logger).Log("msg", "agent instance stopped with error", "err", err)
	}
	return err
}
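// Once Run has initialized the components, samples can be written through the
// instance's Appender (defined below). A sketch, illustration only: it assumes
// the storage.Appender interface of the Prometheus version vendored here
// (Append/Commit/Rollback), and the names inst and lbls are hypothetical:
//
//	app := inst.Appender(context.Background())
//	lbls := labels.FromStrings("__name__", "rule_result", "tenant", "tenant-1")
//	if _, err := app.Append(0, lbls, timestamp.FromTime(time.Now()), 1.0); err != nil {
//		_ = app.Rollback()
//		return err
//	}
//	if err := app.Commit(); err != nil {
//		return err
//	}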
type noopScrapeManager struct{}

func (n noopScrapeManager) Get() (*scrape.Manager, error) {
	return nil, nil
}

// initialize sets up the various Prometheus components with their initial
// settings. initialize will be called each time the Instance is run.
// Prometheus components cannot be reused after they are stopped, so we need
// to recreate them on each run.
func (i *Instance) initialize(_ context.Context, reg prometheus.Registerer, cfg *Config) error {
	// Explicitly reset this in case this function is called multiple times.
	i.initialized = false

	i.mut.Lock()
	defer i.mut.Unlock()

	var err error

	i.wal, err = i.newWal(reg)
	if err != nil {
		return fmt.Errorf("error creating WAL: %w", err)
	}

	// Set up the remote storage.
	remoteLogger := log.With(i.logger, "component", "remote")
	i.remoteStore = remote.NewStorage(remoteLogger, reg, i.wal.StartTime, i.wal.Directory(), cfg.RemoteFlushDeadline, noopScrapeManager{})
	err = i.remoteStore.ApplyConfig(&config.Config{
		RemoteWriteConfigs: cfg.RemoteWrite,
	})
	if err != nil {
		return fmt.Errorf("failed applying config to remote storage: %w", err)
	}

	i.storage = storage.NewFanout(i.logger, i.wal, i.remoteStore)
	i.initialized = true

	return nil
}

// Update accepts a new Config for the Instance and will dynamically update
// any running Prometheus components with the new values from Config. Update
// will return an ErrInvalidUpdate if the Update could not be applied.
func (i *Instance) Update(c Config) (err error) {
	i.mut.Lock()
	defer i.mut.Unlock()

	// It's only (currently) valid to update the remote_write configs, so if
	// any other field has changed here, return an error.
	switch {
	// This first case should never happen in practice but it's included here
	// for completeness' sake.
	case i.cfg.Name != c.Name:
		err = errImmutableField{Field: "name"}
	case i.cfg.TruncateFrequency != c.TruncateFrequency:
		err = errImmutableField{Field: "truncate_frequency"}
	case i.cfg.RemoteFlushDeadline != c.RemoteFlushDeadline:
		err = errImmutableField{Field: "remote_flush_deadline"}
	}
	if err != nil {
		return ErrInvalidUpdate{Inner: err}
	}

	// Check to see if the components exist yet.
	if i.remoteStore == nil {
		return ErrInvalidUpdate{
			Inner: fmt.Errorf("cannot dynamically update because instance is not running"),
		}
	}

	// NOTE(rfratto): Prometheus applies configs in a specific order to ensure
	// flow from service discovery down to the WAL continues working properly.
	//
	// Keep the following order below:
	//
	//	1. Local config
	//	2. Remote Store

	originalConfig := i.cfg
	defer func() {
		if err != nil {
			i.cfg = originalConfig
		}
	}()
	i.cfg = c

	err = i.remoteStore.ApplyConfig(&config.Config{
		RemoteWriteConfigs: c.RemoteWrite,
	})
	if err != nil {
		return fmt.Errorf("error applying new remote_write configs: %w", err)
	}

	return nil
}

// Ready indicates if the instance is ready for processing.
func (i *Instance) Ready() bool {
	return i.initialized
}
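// A sketch of a dynamic update (illustration only; the names inst, runningCfg
// and newRemoteWrite are hypothetical). Only the remote_write section may
// change between the running config and the new one:
//
//	cfg := runningCfg
//	cfg.RemoteWrite = newRemoteWrite
//	if err := inst.Update(cfg); err != nil {
//		// An ErrInvalidUpdate means an immutable field changed or the
//		// instance is not running; the old config remains in effect.
//	}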
// StorageDirectory returns the directory where this Instance is writing
// series and samples to for the WAL.
func (i *Instance) StorageDirectory() string {
	return i.wal.Directory()
}

// Appender returns a storage.Appender from the instance's WAL.
func (i *Instance) Appender(ctx context.Context) storage.Appender {
	return i.wal.Appender(ctx)
}

// Stop stops the WAL.
func (i *Instance) Stop() error {
	level.Info(i.logger).Log("msg", "stopping WAL instance", "user", i.Tenant())

	// Close the WAL first to prevent any further appends.
	if err := i.wal.Close(); err != nil {
		level.Error(i.logger).Log("msg", "error stopping WAL instance", "user", i.Tenant(), "err", err)
		return err
	}

	if err := i.remoteStore.Close(); err != nil {
		level.Error(i.logger).Log("msg", "error stopping remote storage instance", "user", i.Tenant(), "err", err)
		return err
	}

	return nil
}

// Tenant returns the tenant name of the instance.
func (i *Instance) Tenant() string {
	return i.tenant
}

func (i *Instance) truncateLoop(ctx context.Context, wal walStorage, cfg *Config) {
	// Track the last timestamp we truncated for to prevent segments from
	// getting deleted until at least some new data has been sent.
	var lastTs int64 = math.MinInt64

	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(cfg.TruncateFrequency):
			// The timestamp ts is used to determine which series are not
			// receiving samples and may be deleted from the WAL. Their most
			// recent append timestamp is compared to ts, and if that
			// timestamp is older than ts, they are considered inactive and
			// may be deleted.
			//
			// Subtracting a duration from ts delays when a series will be
			// considered inactive and scheduled for deletion.
			ts := i.getRemoteWriteTimestamp() - i.cfg.MinAge.Milliseconds()
			if ts < 0 {
				ts = 0
			}

			// Network issues can prevent the result of
			// getRemoteWriteTimestamp from changing. We don't want data in
			// the WAL to grow forever, so we set a cap on the maximum age
			// data can be. If our ts is older than this cutoff point, we'll
			// shift it forward to start deleting very stale data.
			if maxTS := timestamp.FromTime(time.Now().Add(-i.cfg.MaxAge)); ts < maxTS {
				ts = maxTS
			}

			if ts == lastTs {
				level.Debug(i.logger).Log("msg", "not truncating the WAL, remote_write timestamp is unchanged", "ts", ts)
				continue
			}
			lastTs = ts

			level.Debug(i.logger).Log("msg", "truncating the WAL", "ts", ts)
			err := wal.Truncate(ts)
			if err != nil {
				// The only issue here is larger disk usage and a greater
				// replay time, so we'll only log this as a warning.
				level.Warn(i.logger).Log("msg", "could not truncate WAL", "err", err)
			}
		}
	}
}
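// A worked example of the cutoff arithmetic above (illustrative numbers):
// with MinAge=5m and MaxAge=4h, if the highest sent remote_write timestamp is
// 12:00:00, the candidate cutoff is ts = 12:00:00 - 5m = 11:55:00. If
// remote_write has stalled and the current time is 16:30:00, the floor
// maxTS = 16:30:00 - 4h = 12:30:00 is later than ts, so the cutoff is shifted
// forward to 12:30:00 and older data is truncated even though it may not have
// been sent.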
// getRemoteWriteTimestamp looks up the last successful remote write
// timestamp. This is passed to wal.Storage for its truncation. If no remote
// write sections are configured, getRemoteWriteTimestamp returns the current
// time.
func (i *Instance) getRemoteWriteTimestamp() int64 {
	i.mut.Lock()
	defer i.mut.Unlock()

	if len(i.cfg.RemoteWrite) == 0 {
		return timestamp.FromTime(time.Now())
	}

	lbls := make([]string, len(i.cfg.RemoteWrite))
	for idx := 0; idx < len(lbls); idx++ {
		lbls[idx] = i.cfg.RemoteWrite[idx].Name
	}

	vals, err := i.vc.GetValues("remote_name", lbls...)
	if err != nil {
		level.Error(i.logger).Log("msg", "could not get remote write timestamps", "err", err)
		return 0
	}
	if len(vals) == 0 {
		return 0
	}

	// We use the lowest value from the metric since we don't want to delete
	// any segments from the WAL until they've been written by all of the
	// remote_write configurations.
	ts := int64(math.MaxInt64)
	for _, val := range vals {
		ival := int64(val)
		if ival < ts {
			ts = ival
		}
	}

	// Convert to millisecond precision, which is what the WAL uses.
	return ts * 1000
}

// walStorage is an interface satisfied by wal.Storage, and created for
// testing.
type walStorage interface {
	// walStorage implements Queryable/ChunkQueryable for compatibility, but
	// it is unused.
	storage.Queryable
	storage.ChunkQueryable

	Directory() string

	StartTime() (int64, error)
	WriteStalenessMarkers(remoteTsFunc func() int64) error
	Appender(context.Context) storage.Appender
	Truncate(mint int64) error

	Close() error
}

// MetricValueCollector wraps around a Gatherer and provides utilities for
// pulling metric values from a given metric name and label matchers.
//
// This is used by the agent instances to find the most recent timestamp
// successfully remote_written to for the purposes of safely truncating the
// WAL.
//
// MetricValueCollector is only intended for use with Gauges and Counters.
type MetricValueCollector struct {
	g     prometheus.Gatherer
	match string
}

// NewMetricValueCollector creates a new MetricValueCollector.
func NewMetricValueCollector(g prometheus.Gatherer, match string) *MetricValueCollector {
	return &MetricValueCollector{
		g:     g,
		match: match,
	}
}

// GetValues looks through all the tracked metrics and returns all values for
// metrics that match some key/value pair.
func (vc *MetricValueCollector) GetValues(label string, labelValues ...string) ([]float64, error) {
	vals := []float64{}

	families, err := vc.g.Gather()
	if err != nil {
		return nil, err
	}

	for _, family := range families {
		if !strings.Contains(family.GetName(), vc.match) {
			continue
		}

		for _, m := range family.GetMetric() {
			matches := false
			for _, l := range m.GetLabel() {
				if l.GetName() != label {
					continue
				}

				v := l.GetValue()
				for _, match := range labelValues {
					if match == v {
						matches = true
						break
					}
				}
				break
			}
			if !matches {
				continue
			}

			var value float64
			if m.Gauge != nil {
				value = m.Gauge.GetValue()
			} else if m.Counter != nil {
				value = m.Counter.GetValue()
			} else if m.Untyped != nil {
				value = m.Untyped.GetValue()
			} else {
				return nil, errors.New("tracking unexpected metric type")
			}

			vals = append(vals, value)
		}
	}

	return vals, nil
}
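// A usage sketch for MetricValueCollector (illustration only; "tenant-a" and
// "tenant-b" are hypothetical remote_write names): collect the
// highest-sent-timestamp gauge for a set of remote_write queues.
//
//	vc := NewMetricValueCollector(prometheus.DefaultGatherer, "queue_highest_sent_timestamp_seconds")
//	vals, err := vc.GetValues("remote_name", "tenant-a", "tenant-b")
//	if err != nil {
//		// handle the error
//	}
//	// vals holds one float64 (in seconds) per matching metric.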
type runGroupContext struct {
	cancel context.CancelFunc

	g *run.Group
}

// runGroupWithContext creates a new run.Group that will be stopped if the
// context gets canceled, in addition to the normal behavior of stopping when
// any of the actors stop.
func runGroupWithContext(ctx context.Context) *runGroupContext {
	ctx, cancel := context.WithCancel(ctx)

	var g run.Group
	g.Add(func() error {
		<-ctx.Done()
		return nil
	}, func(_ error) {
		cancel()
	})

	return &runGroupContext{cancel: cancel, g: &g}
}

func (rg *runGroupContext) Add(execute func() error, interrupt func(error)) {
	rg.g.Add(execute, interrupt)
}

func (rg *runGroupContext) Run() error   { return rg.g.Run() }
func (rg *runGroupContext) Stop(_ error) { rg.cancel() }
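// A usage sketch for runGroupWithContext (illustration only): register an
// actor whose execute function blocks until its interrupt function is called,
// then run the group until the context is cancelled or any actor exits.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//
//	rg := runGroupWithContext(ctx)
//	done := make(chan struct{})
//	rg.Add(
//		func() error { <-done; return nil },
//		func(error) { close(done) },
//	)
//	err := rg.Run() // returns once ctx is cancelled or an actor exits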