1package ingester 2 3import ( 4 "flag" 5 "sync" 6 "time" 7 8 util_log "github.com/cortexproject/cortex/pkg/util/log" 9 "github.com/go-kit/kit/log/level" 10 "github.com/pkg/errors" 11 "github.com/prometheus/client_golang/prometheus" 12 "github.com/prometheus/prometheus/tsdb/wal" 13 14 "github.com/grafana/loki/pkg/logproto" 15 "github.com/grafana/loki/pkg/util/flagext" 16) 17 18var ( 19 // shared pool for WALRecords and []logproto.Entries 20 recordPool = newRecordPool() 21) 22 23const walSegmentSize = wal.DefaultSegmentSize * 4 24const defaultCeiling = 4 << 30 // 4GB 25 26type WALConfig struct { 27 Enabled bool `yaml:"enabled"` 28 Dir string `yaml:"dir"` 29 CheckpointDuration time.Duration `yaml:"checkpoint_duration"` 30 FlushOnShutdown bool `yaml:"flush_on_shutdown"` 31 ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"` 32} 33 34func (cfg *WALConfig) Validate() error { 35 if cfg.Enabled && cfg.CheckpointDuration < 1 { 36 return errors.Errorf("invalid checkpoint duration: %v", cfg.CheckpointDuration) 37 } 38 return nil 39} 40 41// RegisterFlags adds the flags required to config this to the given FlagSet 42func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { 43 f.StringVar(&cfg.Dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.") 44 f.BoolVar(&cfg.Enabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.") 45 f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 5*time.Minute, "Interval at which checkpoints should be created.") 46 f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown.") 47 48 // Need to set default here 49 cfg.ReplayMemoryCeiling = flagext.ByteSize(defaultCeiling) 50 f.Var(&cfg.ReplayMemoryCeiling, "ingester.wal-replay-memory-ceiling", "How much memory the WAL may use during replay before it needs to flush chunks to storage, i.e. 10GB. We suggest setting this to a high percentage (~75%) of available memory.") 51} 52 53// WAL interface allows us to have a no-op WAL when the WAL is disabled. 54type WAL interface { 55 Start() 56 // Log marshalls the records and writes it into the WAL. 57 Log(*WALRecord) error 58 // Stop stops all the WAL operations. 59 Stop() error 60} 61 62type noopWAL struct{} 63 64func (noopWAL) Start() {} 65func (noopWAL) Log(*WALRecord) error { return nil } 66func (noopWAL) Stop() error { return nil } 67 68type walWrapper struct { 69 cfg WALConfig 70 wal *wal.WAL 71 metrics *ingesterMetrics 72 seriesIter SeriesIter 73 74 wait sync.WaitGroup 75 quit chan struct{} 76} 77 78// newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL. 79func newWAL(cfg WALConfig, registerer prometheus.Registerer, metrics *ingesterMetrics, seriesIter SeriesIter) (WAL, error) { 80 if !cfg.Enabled { 81 return noopWAL{}, nil 82 } 83 84 tsdbWAL, err := wal.NewSize(util_log.Logger, registerer, cfg.Dir, walSegmentSize, false) 85 if err != nil { 86 return nil, err 87 } 88 89 w := &walWrapper{ 90 cfg: cfg, 91 quit: make(chan struct{}), 92 wal: tsdbWAL, 93 metrics: metrics, 94 seriesIter: seriesIter, 95 } 96 97 return w, nil 98} 99 100func (w *walWrapper) Start() { 101 w.wait.Add(1) 102 go w.run() 103} 104 105func (w *walWrapper) Log(record *WALRecord) error { 106 if record == nil || (len(record.Series) == 0 && len(record.RefEntries) == 0) { 107 return nil 108 } 109 select { 110 case <-w.quit: 111 return nil 112 default: 113 buf := recordPool.GetBytes()[:0] 114 defer func() { 115 recordPool.PutBytes(buf) 116 }() 117 118 // Always write series then entries. 119 if len(record.Series) > 0 { 120 buf = record.encodeSeries(buf) 121 if err := w.wal.Log(buf); err != nil { 122 return err 123 } 124 w.metrics.walRecordsLogged.Inc() 125 w.metrics.walLoggedBytesTotal.Add(float64(len(buf))) 126 buf = buf[:0] 127 } 128 if len(record.RefEntries) > 0 { 129 buf = record.encodeEntries(CurrentEntriesRec, buf) 130 if err := w.wal.Log(buf); err != nil { 131 return err 132 } 133 w.metrics.walRecordsLogged.Inc() 134 w.metrics.walLoggedBytesTotal.Add(float64(len(buf))) 135 } 136 return nil 137 } 138} 139 140func (w *walWrapper) Stop() error { 141 close(w.quit) 142 w.wait.Wait() 143 err := w.wal.Close() 144 level.Info(util_log.Logger).Log("msg", "stopped", "component", "wal") 145 return err 146} 147 148func (w *walWrapper) checkpointWriter() *WALCheckpointWriter { 149 return &WALCheckpointWriter{ 150 metrics: w.metrics, 151 segmentWAL: w.wal, 152 } 153} 154 155func (w *walWrapper) run() { 156 level.Info(util_log.Logger).Log("msg", "started", "component", "wal") 157 defer w.wait.Done() 158 159 checkpointer := NewCheckpointer( 160 w.cfg.CheckpointDuration, 161 w.seriesIter, 162 w.checkpointWriter(), 163 w.metrics, 164 w.quit, 165 ) 166 checkpointer.Run() 167 168} 169 170type resettingPool struct { 171 rPool *sync.Pool // records 172 ePool *sync.Pool // entries 173 bPool *sync.Pool // bytes 174} 175 176func (p *resettingPool) GetRecord() *WALRecord { 177 rec := p.rPool.Get().(*WALRecord) 178 rec.Reset() 179 return rec 180} 181 182func (p *resettingPool) PutRecord(r *WALRecord) { 183 p.rPool.Put(r) 184} 185 186func (p *resettingPool) GetEntries() []logproto.Entry { 187 return p.ePool.Get().([]logproto.Entry) 188} 189 190func (p *resettingPool) PutEntries(es []logproto.Entry) { 191 p.ePool.Put(es[:0]) // nolint:staticcheck 192} 193 194func (p *resettingPool) GetBytes() []byte { 195 return p.bPool.Get().([]byte) 196} 197 198func (p *resettingPool) PutBytes(b []byte) { 199 p.bPool.Put(b[:0]) // nolint:staticcheck 200} 201 202func newRecordPool() *resettingPool { 203 return &resettingPool{ 204 rPool: &sync.Pool{ 205 New: func() interface{} { 206 return &WALRecord{} 207 }, 208 }, 209 ePool: &sync.Pool{ 210 New: func() interface{} { 211 return make([]logproto.Entry, 0, 512) 212 }, 213 }, 214 bPool: &sync.Pool{ 215 New: func() interface{} { 216 return make([]byte, 0, 1<<10) // 1kb 217 }, 218 }, 219 } 220} 221