1package ingester
2
3import (
4	"flag"
5	"sync"
6	"time"
7
8	util_log "github.com/cortexproject/cortex/pkg/util/log"
9	"github.com/go-kit/kit/log/level"
10	"github.com/pkg/errors"
11	"github.com/prometheus/client_golang/prometheus"
12	"github.com/prometheus/prometheus/tsdb/wal"
13
14	"github.com/grafana/loki/pkg/logproto"
15	"github.com/grafana/loki/pkg/util/flagext"
16)
17
18var (
19	// shared pool for WALRecords and []logproto.Entries
20	recordPool = newRecordPool()
21)
22
23const walSegmentSize = wal.DefaultSegmentSize * 4
// defaultCeiling is the value RegisterFlags installs as the default
// ReplayMemoryCeiling: 4GB (4 * 2^30).
const defaultCeiling = 4 * (1 << 30)
25
26type WALConfig struct {
27	Enabled             bool             `yaml:"enabled"`
28	Dir                 string           `yaml:"dir"`
29	CheckpointDuration  time.Duration    `yaml:"checkpoint_duration"`
30	FlushOnShutdown     bool             `yaml:"flush_on_shutdown"`
31	ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"`
32}
33
34func (cfg *WALConfig) Validate() error {
35	if cfg.Enabled && cfg.CheckpointDuration < 1 {
36		return errors.Errorf("invalid checkpoint duration: %v", cfg.CheckpointDuration)
37	}
38	return nil
39}
40
41// RegisterFlags adds the flags required to config this to the given FlagSet
42func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
43	f.StringVar(&cfg.Dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
44	f.BoolVar(&cfg.Enabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
45	f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 5*time.Minute, "Interval at which checkpoints should be created.")
46	f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown.")
47
48	// Need to set default here
49	cfg.ReplayMemoryCeiling = flagext.ByteSize(defaultCeiling)
50	f.Var(&cfg.ReplayMemoryCeiling, "ingester.wal-replay-memory-ceiling", "How much memory the WAL may use during replay before it needs to flush chunks to storage, i.e. 10GB. We suggest setting this to a high percentage (~75%) of available memory.")
51}
52
53// WAL interface allows us to have a no-op WAL when the WAL is disabled.
54type WAL interface {
55	Start()
56	// Log marshalls the records and writes it into the WAL.
57	Log(*WALRecord) error
58	// Stop stops all the WAL operations.
59	Stop() error
60}
61
62type noopWAL struct{}
63
64func (noopWAL) Start()               {}
65func (noopWAL) Log(*WALRecord) error { return nil }
66func (noopWAL) Stop() error          { return nil }
67
68type walWrapper struct {
69	cfg        WALConfig
70	wal        *wal.WAL
71	metrics    *ingesterMetrics
72	seriesIter SeriesIter
73
74	wait sync.WaitGroup
75	quit chan struct{}
76}
77
78// newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL.
79func newWAL(cfg WALConfig, registerer prometheus.Registerer, metrics *ingesterMetrics, seriesIter SeriesIter) (WAL, error) {
80	if !cfg.Enabled {
81		return noopWAL{}, nil
82	}
83
84	tsdbWAL, err := wal.NewSize(util_log.Logger, registerer, cfg.Dir, walSegmentSize, false)
85	if err != nil {
86		return nil, err
87	}
88
89	w := &walWrapper{
90		cfg:        cfg,
91		quit:       make(chan struct{}),
92		wal:        tsdbWAL,
93		metrics:    metrics,
94		seriesIter: seriesIter,
95	}
96
97	return w, nil
98}
99
100func (w *walWrapper) Start() {
101	w.wait.Add(1)
102	go w.run()
103}
104
105func (w *walWrapper) Log(record *WALRecord) error {
106	if record == nil || (len(record.Series) == 0 && len(record.RefEntries) == 0) {
107		return nil
108	}
109	select {
110	case <-w.quit:
111		return nil
112	default:
113		buf := recordPool.GetBytes()[:0]
114		defer func() {
115			recordPool.PutBytes(buf)
116		}()
117
118		// Always write series then entries.
119		if len(record.Series) > 0 {
120			buf = record.encodeSeries(buf)
121			if err := w.wal.Log(buf); err != nil {
122				return err
123			}
124			w.metrics.walRecordsLogged.Inc()
125			w.metrics.walLoggedBytesTotal.Add(float64(len(buf)))
126			buf = buf[:0]
127		}
128		if len(record.RefEntries) > 0 {
129			buf = record.encodeEntries(CurrentEntriesRec, buf)
130			if err := w.wal.Log(buf); err != nil {
131				return err
132			}
133			w.metrics.walRecordsLogged.Inc()
134			w.metrics.walLoggedBytesTotal.Add(float64(len(buf)))
135		}
136		return nil
137	}
138}
139
140func (w *walWrapper) Stop() error {
141	close(w.quit)
142	w.wait.Wait()
143	err := w.wal.Close()
144	level.Info(util_log.Logger).Log("msg", "stopped", "component", "wal")
145	return err
146}
147
148func (w *walWrapper) checkpointWriter() *WALCheckpointWriter {
149	return &WALCheckpointWriter{
150		metrics:    w.metrics,
151		segmentWAL: w.wal,
152	}
153}
154
155func (w *walWrapper) run() {
156	level.Info(util_log.Logger).Log("msg", "started", "component", "wal")
157	defer w.wait.Done()
158
159	checkpointer := NewCheckpointer(
160		w.cfg.CheckpointDuration,
161		w.seriesIter,
162		w.checkpointWriter(),
163		w.metrics,
164		w.quit,
165	)
166	checkpointer.Run()
167
168}
169
// resettingPool groups the three sync.Pools used by the WAL code: one for
// records, one for entry slices and one for byte buffers.
type resettingPool struct {
	// rPool holds *WALRecord values.
	rPool *sync.Pool
	// ePool holds []logproto.Entry slices.
	ePool *sync.Pool
	// bPool holds []byte buffers.
	bPool *sync.Pool
}
175
176func (p *resettingPool) GetRecord() *WALRecord {
177	rec := p.rPool.Get().(*WALRecord)
178	rec.Reset()
179	return rec
180}
181
182func (p *resettingPool) PutRecord(r *WALRecord) {
183	p.rPool.Put(r)
184}
185
186func (p *resettingPool) GetEntries() []logproto.Entry {
187	return p.ePool.Get().([]logproto.Entry)
188}
189
190func (p *resettingPool) PutEntries(es []logproto.Entry) {
191	p.ePool.Put(es[:0]) // nolint:staticcheck
192}
193
194func (p *resettingPool) GetBytes() []byte {
195	return p.bPool.Get().([]byte)
196}
197
198func (p *resettingPool) PutBytes(b []byte) {
199	p.bPool.Put(b[:0]) // nolint:staticcheck
200}
201
202func newRecordPool() *resettingPool {
203	return &resettingPool{
204		rPool: &sync.Pool{
205			New: func() interface{} {
206				return &WALRecord{}
207			},
208		},
209		ePool: &sync.Pool{
210			New: func() interface{} {
211				return make([]logproto.Entry, 0, 512)
212			},
213		},
214		bPool: &sync.Pool{
215			New: func() interface{} {
216				return make([]byte, 0, 1<<10) // 1kb
217			},
218		},
219	}
220}
221