// This directory was copied and adapted from https://github.com/grafana/agent/tree/main/pkg/metrics.
// We cannot vendor the agent in since the agent vendors loki in, which would cause a cyclic dependency.
// NOTE: many changes have been made to the original code for our use-case.
package instance

import (
	"bytes"
	"context"
	"errors"
	"flag"
	"fmt"
	"math"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/oklog/run"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
	"gopkg.in/yaml.v2"

	"github.com/grafana/loki/pkg/ruler/storage/util"
	"github.com/grafana/loki/pkg/ruler/storage/wal"
	"github.com/grafana/loki/pkg/util/build"
)

func init() {
	remote.UserAgent = fmt.Sprintf("LokiRulerWAL/%s", build.Version)
}

var (
	remoteWriteMetricName = "queue_highest_sent_timestamp_seconds"
)

// Default configuration values
var (
	DefaultConfig = Config{
		Dir:                 "wal",
		TruncateFrequency:   60 * time.Minute,
		MinAge:              5 * time.Minute,
		MaxAge:              4 * time.Hour,
		RemoteFlushDeadline: 1 * time.Minute,
	}
)
// Config configures an individual WAL instance. It has its own set of
// remote_write rules.
type Config struct {
	Tenant      string
	Name        string
	RemoteWrite []*config.RemoteWriteConfig

	Dir string `yaml:"dir"`

	// How frequently the WAL should be truncated.
	TruncateFrequency time.Duration `yaml:"truncate_frequency,omitempty"`
	// Minimum and maximum time that series should exist in the WAL for.
	MinAge time.Duration `yaml:"min_age,omitempty"`
	MaxAge time.Duration `yaml:"max_age,omitempty"`

	RemoteFlushDeadline time.Duration `yaml:"remote_flush_deadline,omitempty"`
}
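
// An illustrative YAML snippet for the tagged fields above, shown with the
// DefaultConfig values (hypothetical; any field may be omitted to keep its
// default):
//
//	dir: wal
//	truncate_frequency: 1h
//	min_age: 5m
//	max_age: 4h
//	remote_flush_deadline: 1m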

// UnmarshalYAML implements yaml.Unmarshaler.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	*c = DefaultConfig

	type plain Config
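	// plain has the same fields as Config but none of its methods, so
	// unmarshaling into it does not recurse back into UnmarshalYAML.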
	return unmarshal((*plain)(c))
}

// MarshalYAML implements yaml.Marshaler.
func (c Config) MarshalYAML() (interface{}, error) {
	// We want users to be able to marshal instance.Configs directly without
	// *needing* to call instance.MarshalConfig, so we call it internally
	// here and return a map.
	bb, err := MarshalConfig(&c, false)
	if err != nil {
		return nil, err
	}

	// Use a yaml.MapSlice rather than a map[string]interface{} so
	// order of keys is retained compared to just calling MarshalConfig.
	var m yaml.MapSlice
	if err := yaml.Unmarshal(bb, &m); err != nil {
		return nil, err
	}
	return m, nil
}

// ApplyDefaults applies defaults to all values that have not been set and
// validates the config.
func (c *Config) ApplyDefaults() error {
	switch {
	case c.Name == "":
		return errors.New("missing instance name")
	case c.TruncateFrequency <= 0:
		return errors.New("wal_truncate_frequency must be greater than 0s")
	case c.RemoteFlushDeadline <= 0:
		return errors.New("remote_flush_deadline must be greater than 0s")
	case c.MinAge > c.MaxAge:
		return errors.New("min_wal_time must be less than max_wal_time")
	}

	for _, cfg := range c.RemoteWrite {
		if cfg == nil {
			return fmt.Errorf("empty or null remote write config section")
		}
	}
	return nil
}

// Clone makes a deep copy of the config.
func (c *Config) Clone() (Config, error) {
	bb, err := MarshalConfig(c, false)
	if err != nil {
		return Config{}, err
	}
	cp, err := UnmarshalConfig(bytes.NewReader(bb))
	if err != nil {
		return Config{}, err
	}

	// Some tests will trip up on this; the marshal/unmarshal cycle might set
	// an empty slice to nil. Set it back to an empty slice if we detect this
	// happening.
	if cp.RemoteWrite == nil && c.RemoteWrite != nil {
		cp.RemoteWrite = []*config.RemoteWriteConfig{}
	}

	return *cp, nil
}

// RegisterFlags registers the WAL flags for this config with the provided FlagSet.
func (c *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&c.Dir, "ruler.wal.dir", DefaultConfig.Dir, "Directory to store the WAL and/or recover from WAL.")
	f.DurationVar(&c.TruncateFrequency, "ruler.wal.truncate-frequency", DefaultConfig.TruncateFrequency, "How often to run the WAL truncation.")
	f.DurationVar(&c.MinAge, "ruler.wal.min-age", DefaultConfig.MinAge, "Minimum age that samples must exist in the WAL before being truncated.")
	f.DurationVar(&c.MaxAge, "ruler.wal.max-age", DefaultConfig.MaxAge, "Maximum age that samples must exist in the WAL before being truncated.")
}
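
// For illustration, the flags registered above could be set like so
// (hypothetical values):
//
//	-ruler.wal.dir=/data/ruler-wal
//	-ruler.wal.truncate-frequency=30m
//	-ruler.wal.min-age=5m
//	-ruler.wal.max-age=4h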

type walStorageFactory func(reg prometheus.Registerer) (walStorage, error)

// Instance is an individual metrics collector and remote_writer.
type Instance struct {
	initialized bool

	// All fields in the following block may be accessed and modified by
	// concurrently running goroutines.
	//
	// Note that all Prometheus components listed here may be nil at any
	// given time; methods reading them should take care to do nil checks.
	mut         sync.Mutex
	cfg         Config
	wal         walStorage
	remoteStore *remote.Storage
	storage     storage.Storage

	logger log.Logger

	reg    prometheus.Registerer
	newWal walStorageFactory

	vc     *MetricValueCollector
	tenant string
}

// New creates a new Instance with a directory for storing the WAL. The instance
// will not start until Run is called on the instance.
func New(reg prometheus.Registerer, cfg Config, metrics *wal.Metrics, logger log.Logger) (*Instance, error) {
	logger = log.With(logger, "instance", cfg.Name)

	instWALDir := filepath.Join(cfg.Dir, cfg.Tenant)

	newWal := func(reg prometheus.Registerer) (walStorage, error) {
		return wal.NewStorage(logger, metrics, reg, instWALDir)
	}

	return newInstance(cfg, reg, logger, newWal, cfg.Tenant)
}
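
// A minimal usage sketch (hypothetical; error handling elided, and it assumes
// a *wal.Metrics and logger are already constructed):
//
//	inst, _ := New(prometheus.DefaultRegisterer, cfg, metrics, logger)
//	go func() { _ = inst.Run(ctx) }()
//	// ... append samples via inst.Appender(ctx); call inst.Stop() on shutdown.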

func newInstance(cfg Config, reg prometheus.Registerer, logger log.Logger, newWal walStorageFactory, tenant string) (*Instance, error) {
	vc := NewMetricValueCollector(prometheus.DefaultGatherer, remoteWriteMetricName)

	i := &Instance{
		cfg:    cfg,
		logger: logger,
		vc:     vc,

		reg:    reg,
		newWal: newWal,

		tenant: tenant,
	}

	return i, nil
}

// Storage returns the instance's storage, which fans out to the WAL and
// remote-write storage once the instance has been initialized.
func (i *Instance) Storage() storage.Storage {
	return i.storage
}

// Run starts the instance, initializing Prometheus components, and will
// continue to run until an error happens during execution or the provided
// context is cancelled.
//
// Run may be re-called after exiting, as components will be reinitialized each
// time Run is called.
func (i *Instance) Run(ctx context.Context) error {
	// i.cfg may change at any point in the middle of this method but not in a way
	// that affects any of the code below; rather than grabbing a mutex every time
	// we want to read the config, we'll simplify the access and just grab a copy
	// now.
	i.mut.Lock()
	cfg := i.cfg
	i.mut.Unlock()

	level.Debug(i.logger).Log("msg", "initializing instance", "name", cfg.Name)

	// trackingReg wraps the registerer for the instance to make sure that if Run
	// exits, any metrics Prometheus registers are removed and can be
	// re-registered if Run is called again.
	trackingReg := util.WrapWithUnregisterer(i.reg)
	defer trackingReg.UnregisterAll()

	if err := i.initialize(ctx, trackingReg, &cfg); err != nil {
		level.Error(i.logger).Log("msg", "failed to initialize instance", "err", err)
		return fmt.Errorf("failed to initialize instance: %w", err)
	}

	// The actors defined here are defined in the order we want them to shut down.
	// Primarily, we want to ensure that the following shutdown order is
	// maintained:
	//    1. The scrape manager stops
	//    2. WAL storage is closed
	//    3. Remote write storage is closed
	// This is done to allow the instance to write stale markers for all active
	// series.
	rg := runGroupWithContext(ctx)

	{
		// Truncation loop
		ctx, contextCancel := context.WithCancel(context.Background())
		defer contextCancel()
		rg.Add(
			func() error {
				i.truncateLoop(ctx, i.wal, &cfg)
				level.Info(i.logger).Log("msg", "truncation loop stopped")
				return nil
			},
			func(err error) {
				level.Info(i.logger).Log("msg", "stopping truncation loop...")
				contextCancel()
			},
		)
	}

	level.Debug(i.logger).Log("msg", "running instance", "name", cfg.Name)
	err := rg.Run()
	if err != nil {
		level.Error(i.logger).Log("msg", "agent instance stopped with error", "err", err)
	}
	return err
}

type noopScrapeManager struct{}

func (n noopScrapeManager) Get() (*scrape.Manager, error) {
	return nil, nil
}

// initialize sets up the various Prometheus components with their initial
// settings. initialize will be called each time the Instance is run. Prometheus
// components cannot be reused after they are stopped so we need to recreate them
// each run.
func (i *Instance) initialize(_ context.Context, reg prometheus.Registerer, cfg *Config) error {
	// explicitly set this in case this function is called multiple times
	i.initialized = false

	i.mut.Lock()
	defer i.mut.Unlock()

	var err error

	i.wal, err = i.newWal(reg)
	if err != nil {
		return fmt.Errorf("error creating WAL: %w", err)
	}

	// Setup the remote storage
	remoteLogger := log.With(i.logger, "component", "remote")
	i.remoteStore = remote.NewStorage(remoteLogger, reg, i.wal.StartTime, i.wal.Directory(), cfg.RemoteFlushDeadline, noopScrapeManager{})
	err = i.remoteStore.ApplyConfig(&config.Config{
		RemoteWriteConfigs: cfg.RemoteWrite,
	})
	if err != nil {
		return fmt.Errorf("failed applying config to remote storage: %w", err)
	}

	i.storage = storage.NewFanout(i.logger, i.wal, i.remoteStore)
	i.initialized = true

	return nil
}

// Update accepts a new Config for the Instance and will dynamically update any
// running Prometheus components with the new values from Config. Update will
// return an ErrInvalidUpdate if the Update could not be applied.
func (i *Instance) Update(c Config) (err error) {
	i.mut.Lock()
	defer i.mut.Unlock()

	// It's only (currently) valid to update remote_write, so if any other
	// field has changed here, return an error.
	switch {
	// This first case should never happen in practice, but it's included here
	// for completeness' sake.
	case i.cfg.Name != c.Name:
		err = errImmutableField{Field: "name"}
	case i.cfg.TruncateFrequency != c.TruncateFrequency:
		err = errImmutableField{Field: "wal_truncate_frequency"}
	case i.cfg.RemoteFlushDeadline != c.RemoteFlushDeadline:
		err = errImmutableField{Field: "remote_flush_deadline"}
	}
	if err != nil {
		return ErrInvalidUpdate{Inner: err}
	}

	// Check to see if the components exist yet.
	if i.remoteStore == nil {
		return ErrInvalidUpdate{
			Inner: fmt.Errorf("cannot dynamically update because instance is not running"),
		}
	}

	// NOTE(rfratto): Prometheus applies configs in a specific order to ensure
	// flow from service discovery down to the WAL continues working properly.
	//
	// Keep the following order below:
	//
	// 1. Local config
	// 2. Remote Store
	// 3. Scrape Manager
	// 4. Discovery Manager

	originalConfig := i.cfg
	defer func() {
		if err != nil {
			i.cfg = originalConfig
		}
	}()
	i.cfg = c

	err = i.remoteStore.ApplyConfig(&config.Config{
		RemoteWriteConfigs: c.RemoteWrite,
	})
	if err != nil {
		return fmt.Errorf("error applying new remote_write configs: %w", err)
	}

	return nil
}

// Ready indicates if the instance is ready for processing.
func (i *Instance) Ready() bool {
	return i.initialized
}

// StorageDirectory returns the directory where this Instance is writing series
// and samples to for the WAL.
func (i *Instance) StorageDirectory() string {
	return i.wal.Directory()
}

// Appender returns a storage.Appender from the instance's WAL
func (i *Instance) Appender(ctx context.Context) storage.Appender {
	return i.wal.Appender(ctx)
}

// Stop stops the WAL
func (i *Instance) Stop() error {
	level.Info(i.logger).Log("msg", "stopping WAL instance", "user", i.Tenant())

	// close WAL first to prevent any further appends
	if err := i.wal.Close(); err != nil {
		level.Error(i.logger).Log("msg", "error stopping WAL instance", "user", i.Tenant(), "err", err)
		return err
	}

	if err := i.remoteStore.Close(); err != nil {
		level.Error(i.logger).Log("msg", "error stopping remote storage instance", "user", i.Tenant(), "err", err)
		return err
	}

	return nil
}

// Tenant returns the tenant name of the instance
func (i *Instance) Tenant() string {
	return i.tenant
}

func (i *Instance) truncateLoop(ctx context.Context, wal walStorage, cfg *Config) {
	// Track the last timestamp we truncated for to prevent segments from getting
	// deleted until at least some new data has been sent.
	var lastTs int64 = math.MinInt64

	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(cfg.TruncateFrequency):
			// The timestamp ts is used to determine which series are not receiving
			// samples and may be deleted from the WAL. Their most recent append
			// timestamp is compared to ts, and if that timestamp is older than ts,
			// they are considered inactive and may be deleted.
			//
			// Subtracting a duration from ts will delay when it will be considered
			// inactive and scheduled for deletion.
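			//
			// For example (hypothetical numbers): with the default MinAge of 5m and a
			// last remote_write timestamp of 12:00:00, ts becomes 11:55:00, so only
			// series with no appends since before 11:55:00 are candidates for removal.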
			ts := i.getRemoteWriteTimestamp() - i.cfg.MinAge.Milliseconds()
			if ts < 0 {
				ts = 0
			}

			// Network issues can prevent the result of getRemoteWriteTimestamp from
			// changing. We don't want data in the WAL to grow forever, so we set a cap
			// on the maximum age data can be. If our ts is older than this cutoff point,
			// we'll shift it forward to start deleting very stale data.
			if maxTS := timestamp.FromTime(time.Now().Add(-i.cfg.MaxAge)); ts < maxTS {
				ts = maxTS
			}

			if ts == lastTs {
				level.Debug(i.logger).Log("msg", "not truncating the WAL, remote_write timestamp is unchanged", "ts", ts)
				continue
			}
			lastTs = ts

			level.Debug(i.logger).Log("msg", "truncating the WAL", "ts", ts)
			err := wal.Truncate(ts)
			if err != nil {
				// The only issue here is larger disk usage and a greater replay time,
				// so we'll only log this as a warning.
				level.Warn(i.logger).Log("msg", "could not truncate WAL", "err", err)
			}
		}
	}
}

// getRemoteWriteTimestamp looks up the last successful remote write timestamp.
// This is passed to wal.Storage for its truncation. If no remote write sections
// are configured, getRemoteWriteTimestamp returns the current time.
func (i *Instance) getRemoteWriteTimestamp() int64 {
	i.mut.Lock()
	defer i.mut.Unlock()

	if len(i.cfg.RemoteWrite) == 0 {
		return timestamp.FromTime(time.Now())
	}

	lbls := make([]string, len(i.cfg.RemoteWrite))
	for idx := 0; idx < len(lbls); idx++ {
		lbls[idx] = i.cfg.RemoteWrite[idx].Name
	}

	vals, err := i.vc.GetValues("remote_name", lbls...)
	if err != nil {
		level.Error(i.logger).Log("msg", "could not get remote write timestamps", "err", err)
		return 0
	}
	if len(vals) == 0 {
		return 0
	}

	// We use the lowest value from the metric since we don't want to delete any
	// segments from the WAL until they've been written by all of the remote_write
	// configurations.
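	//
	// For example (hypothetical values): if one remote reports 1700000100 and a
	// slower one reports 1700000000, the lower value 1700000000 is used so the
	// slower remote's unsent samples are not truncated away.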
	ts := int64(math.MaxInt64)
	for _, val := range vals {
		ival := int64(val)
		if ival < ts {
			ts = ival
		}
	}

	// The metric reports seconds; convert to the millisecond precision used by
	// the WAL.
	return ts * 1000
}

// walStorage is an interface satisfied by wal.Storage, and created for testing.
type walStorage interface {
	// walStorage implements Queryable/ChunkQueryable for compatibility, but is unused.
	storage.Queryable
	storage.ChunkQueryable

	Directory() string

	StartTime() (int64, error)
	WriteStalenessMarkers(remoteTsFunc func() int64) error
	Appender(context.Context) storage.Appender
	Truncate(mint int64) error

	Close() error
}

// MetricValueCollector wraps around a Gatherer and provides utilities for
// pulling metric values from a given metric name and label matchers.
//
// This is used by the agent instances to find the most recent timestamp
// successfully remote_written to for purposes of safely truncating the WAL.
//
// MetricValueCollector is only intended for use with Gauges and Counters.
type MetricValueCollector struct {
	g     prometheus.Gatherer
	match string
}

// NewMetricValueCollector creates a new MetricValueCollector.
func NewMetricValueCollector(g prometheus.Gatherer, match string) *MetricValueCollector {
	return &MetricValueCollector{
		g:     g,
		match: match,
	}
}
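
// A brief usage sketch, mirroring how getRemoteWriteTimestamp uses this above
// ("tenant-a" is a hypothetical remote_write name):
//
//	vc := NewMetricValueCollector(prometheus.DefaultGatherer, remoteWriteMetricName)
//	vals, err := vc.GetValues("remote_name", "tenant-a")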

// GetValues looks through all the tracked metrics and returns all values
// for metrics that match some key value pair.
func (vc *MetricValueCollector) GetValues(label string, labelValues ...string) ([]float64, error) {
	vals := []float64{}

	families, err := vc.g.Gather()
	if err != nil {
		return nil, err
	}

	for _, family := range families {
		if !strings.Contains(family.GetName(), vc.match) {
			continue
		}

		for _, m := range family.GetMetric() {
			matches := false
			for _, l := range m.GetLabel() {
				if l.GetName() != label {
					continue
				}

				v := l.GetValue()
				for _, match := range labelValues {
					if match == v {
						matches = true
						break
					}
				}
				break
			}
			if !matches {
				continue
			}

			var value float64
			if m.Gauge != nil {
				value = m.Gauge.GetValue()
			} else if m.Counter != nil {
				value = m.Counter.GetValue()
			} else if m.Untyped != nil {
				value = m.Untyped.GetValue()
			} else {
				return nil, errors.New("tracking unexpected metric type")
			}

			vals = append(vals, value)
		}
	}

	return vals, nil
}

type runGroupContext struct {
	cancel context.CancelFunc

	g *run.Group
}

// runGroupWithContext creates a new run.Group that will be stopped if the
// context gets canceled in addition to the normal behavior of stopping
// when any of the actors stop.
func runGroupWithContext(ctx context.Context) *runGroupContext {
	ctx, cancel := context.WithCancel(ctx)

	var g run.Group
	g.Add(func() error {
		<-ctx.Done()
		return nil
	}, func(_ error) {
		cancel()
	})

	return &runGroupContext{cancel: cancel, g: &g}
}

func (rg *runGroupContext) Add(execute func() error, interrupt func(error)) {
	rg.g.Add(execute, interrupt)
}

func (rg *runGroupContext) Run() error   { return rg.g.Run() }
func (rg *runGroupContext) Stop(_ error) { rg.cancel() }
618