1package compactor
2
3import (
4	"context"
5	"errors"
6	"flag"
7	"path/filepath"
8	"reflect"
9	"sync"
10	"time"
11
12	util_log "github.com/cortexproject/cortex/pkg/util/log"
13	"github.com/go-kit/kit/log/level"
14	"github.com/grafana/dskit/services"
15	"github.com/prometheus/client_golang/prometheus"
16	"github.com/prometheus/common/model"
17
18	loki_storage "github.com/grafana/loki/pkg/storage"
19	"github.com/grafana/loki/pkg/storage/chunk/local"
20	"github.com/grafana/loki/pkg/storage/chunk/objectclient"
21	"github.com/grafana/loki/pkg/storage/chunk/storage"
22	chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
23	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
24	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
25	shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage"
26	shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
27)
28
// Config holds the configuration for the boltdb-shipper compactor.
// All fields are registered as CLI flags in RegisterFlags and sanity
// checked by Validate.
type Config struct {
	// WorkingDirectory is the local directory where index files are downloaded for compaction.
	WorkingDirectory          string        `yaml:"working_directory"`
	// SharedStoreType selects the object store backend (gcs, s3, azure, swift, filesystem).
	SharedStoreType           string        `yaml:"shared_store"`
	// SharedStoreKeyPrefix is the object-key prefix under which index tables live; must end with '/'.
	SharedStoreKeyPrefix      string        `yaml:"shared_store_key_prefix"`
	// CompactionInterval is how often the compaction loop re-runs.
	CompactionInterval        time.Duration `yaml:"compaction_interval"`
	// RetentionEnabled turns on (experimental) per-stream/per-tenant retention and delete handling.
	RetentionEnabled          bool          `yaml:"retention_enabled"`
	// RetentionDeleteDelay is how long marked chunks are kept before being fully deleted.
	RetentionDeleteDelay      time.Duration `yaml:"retention_delete_delay"`
	// RetentionDeleteWorkCount is the number of workers deleting chunks during retention.
	RetentionDeleteWorkCount  int           `yaml:"retention_delete_worker_count"`
	// DeleteRequestCancelPeriod is how long a delete request stays cancellable before it is processed.
	DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
	// MaxCompactionParallelism is the number of tables compacted concurrently; must be >= 1.
	MaxCompactionParallelism  int           `yaml:"max_compaction_parallelism"`
}
40
41// RegisterFlags registers flags.
42func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
43	f.StringVar(&cfg.WorkingDirectory, "boltdb.shipper.compactor.working-directory", "", "Directory where files can be downloaded for compaction.")
44	f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
45	f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
46	f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
47	f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
48	f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
49	f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
50	f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
51	f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
52}
53
54func (cfg *Config) IsDefaults() bool {
55	cpy := &Config{}
56	cpy.RegisterFlags(flag.NewFlagSet("defaults", flag.ContinueOnError))
57	return reflect.DeepEqual(cfg, cpy)
58}
59
60func (cfg *Config) Validate() error {
61	if cfg.MaxCompactionParallelism < 1 {
62		return errors.New("max compaction parallelism must be >= 1")
63	}
64	return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
65}
66
// Compactor is a service that periodically compacts boltdb-shipper index
// tables in the shared store and, when retention is enabled, also applies
// retention and processes log deletion requests.
type Compactor struct {
	services.Service

	cfg Config
	// indexStorageClient reads/writes index tables in the shared object store.
	indexStorageClient shipper_storage.Client
	// tableMarker marks chunks for deletion while compacting a table (retention only).
	tableMarker retention.TableMarker
	// sweeper deletes chunks previously marked for deletion (retention only).
	sweeper *retention.Sweeper
	// deleteRequestsStore persists user delete requests (retention only).
	deleteRequestsStore deletion.DeleteRequestsStore
	// DeleteRequestsHandler exposes the delete-request API; exported so it can
	// be wired into HTTP routes by the caller (retention only).
	DeleteRequestsHandler *deletion.DeleteRequestHandler
	// deleteRequestsManager turns delete requests into expiration decisions (retention only).
	deleteRequestsManager *deletion.DeleteRequestsManager
	// expirationChecker combines retention- and deletion-based expiry (retention only).
	expirationChecker retention.ExpirationChecker
	metrics           *metrics
}
80
81func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) {
82	if cfg.IsDefaults() {
83		return nil, errors.New("Must specify compactor config")
84	}
85
86	compactor := &Compactor{
87		cfg: cfg,
88	}
89
90	if err := compactor.init(storageConfig, schemaConfig, limits, r); err != nil {
91		return nil, err
92	}
93
94	compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
95	return compactor, nil
96}
97
// init builds the compactor's dependencies: the shared-store client, the
// working directory, metrics, and — when retention is enabled — the chunk
// sweeper, delete-request store/handler/manager, expiration checker and
// table marker. It mutates c in place and returns the first error hit.
func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) error {
	objectClient, err := storage.NewObjectClient(c.cfg.SharedStoreType, storageConfig)
	if err != nil {
		return err
	}

	err = chunk_util.EnsureDirectory(c.cfg.WorkingDirectory)
	if err != nil {
		return err
	}
	c.indexStorageClient = shipper_storage.NewIndexStorageClient(objectClient, c.cfg.SharedStoreKeyPrefix)
	c.metrics = newMetrics(r)

	if c.cfg.RetentionEnabled {
		// Filesystem object stores get base64-encoded keys; other backends use raw keys.
		var encoder objectclient.KeyEncoder
		if _, ok := objectClient.(*local.FSObjectClient); ok {
			encoder = objectclient.Base64Encoder
		}

		chunkClient := objectclient.NewClient(objectClient, encoder)

		// Sweeper and marker share the same retention working directory.
		retentionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "retention")
		c.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r)
		if err != nil {
			return err
		}

		deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

		c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
		if err != nil {
			return err
		}

		c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
		c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)

		// Expiry combines per-tenant retention limits with user delete requests.
		c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)

		c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r)
		if err != nil {
			return err
		}
	}

	return nil
}
145
// loop is the service run function. It runs one compaction immediately, then
// re-runs on every CompactionInterval tick until ctx is cancelled. When
// retention is enabled it also runs the chunk sweeper for the lifetime of the
// loop, and stops the delete-request store/manager on the way out.
func (c *Compactor) loop(ctx context.Context) error {
	if c.cfg.RetentionEnabled {
		// Deferred so they stop only after both goroutines below have exited.
		defer c.deleteRequestsStore.Stop()
		defer c.deleteRequestsManager.Stop()
	}

	runCompaction := func() {
		err := c.RunCompaction(ctx)
		if err != nil {
			level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
		}
	}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Run once up front so a restart doesn't wait a full interval.
		runCompaction()

		ticker := time.NewTicker(c.cfg.CompactionInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				runCompaction()
			case <-ctx.Done():
				return
			}
		}
	}()
	if c.cfg.RetentionEnabled {
		wg.Add(1)
		go func() {
			// starts the chunk sweeper
			defer func() {
				c.sweeper.Stop()
				wg.Done()
			}()
			c.sweeper.Start()
			// Sweeper runs in the background; block here until shutdown.
			<-ctx.Done()
		}()
	}

	wg.Wait()
	return nil
}
192
193func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
194	table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker)
195	if err != nil {
196		level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err)
197		return err
198	}
199
200	interval := retention.ExtractIntervalFromTableName(tableName)
201	intervalHasExpiredChunks := false
202	if c.cfg.RetentionEnabled {
203		intervalHasExpiredChunks = c.expirationChecker.IntervalHasExpiredChunks(interval)
204	}
205
206	err = table.compact(intervalHasExpiredChunks)
207	if err != nil {
208		level.Error(util_log.Logger).Log("msg", "failed to compact files", "table", tableName, "err", err)
209		return err
210	}
211	return nil
212}
213
214func (c *Compactor) RunCompaction(ctx context.Context) error {
215	status := statusSuccess
216	start := time.Now()
217
218	if c.cfg.RetentionEnabled {
219		c.expirationChecker.MarkPhaseStarted()
220	}
221
222	defer func() {
223		c.metrics.compactTablesOperationTotal.WithLabelValues(status).Inc()
224		if status == statusSuccess {
225			c.metrics.compactTablesOperationDurationSeconds.Set(time.Since(start).Seconds())
226			c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
227		}
228
229		if c.cfg.RetentionEnabled {
230			if status == statusSuccess {
231				c.expirationChecker.MarkPhaseFinished()
232			} else {
233				c.expirationChecker.MarkPhaseFailed()
234			}
235		}
236	}()
237
238	tables, err := c.indexStorageClient.ListTables(ctx)
239	if err != nil {
240		status = statusFailure
241		return err
242	}
243
244	compactTablesChan := make(chan string)
245	errChan := make(chan error)
246
247	for i := 0; i < c.cfg.MaxCompactionParallelism; i++ {
248		go func() {
249			var err error
250			defer func() {
251				errChan <- err
252			}()
253
254			for {
255				select {
256				case tableName, ok := <-compactTablesChan:
257					if !ok {
258						return
259					}
260
261					level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName)
262					err = c.CompactTable(ctx, tableName)
263					if err != nil {
264						return
265					}
266					level.Info(util_log.Logger).Log("msg", "finished compacting table", "table-name", tableName)
267				case <-ctx.Done():
268					return
269				}
270			}
271		}()
272	}
273
274	go func() {
275		for _, tableName := range tables {
276			if tableName == deletion.DeleteRequestsTableName {
277				// we do not want to compact or apply retention on delete requests table
278				continue
279			}
280
281			select {
282			case compactTablesChan <- tableName:
283			case <-ctx.Done():
284				return
285			}
286		}
287
288		close(compactTablesChan)
289	}()
290
291	var firstErr error
292	// read all the errors
293	for i := 0; i < c.cfg.MaxCompactionParallelism; i++ {
294		err := <-errChan
295		if err != nil && firstErr == nil {
296			status = statusFailure
297			firstErr = err
298		}
299	}
300
301	return firstErr
302}
303
// expirationChecker composes two retention.ExpirationCheckers — one driven by
// per-tenant retention limits, one driven by user delete requests — and
// reports a chunk as expired when either does.
type expirationChecker struct {
	retentionExpiryChecker retention.ExpirationChecker
	deletionExpiryChecker  retention.ExpirationChecker
}
308
309func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retention.ExpirationChecker) retention.ExpirationChecker {
310	return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker}
311}
312
313func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []model.Interval) {
314	if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired {
315		return expired, nonDeletedIntervals
316	}
317
318	return e.deletionExpiryChecker.Expired(ref, now)
319}
320
321func (e *expirationChecker) MarkPhaseStarted() {
322	e.retentionExpiryChecker.MarkPhaseStarted()
323	e.deletionExpiryChecker.MarkPhaseStarted()
324}
325
326func (e *expirationChecker) MarkPhaseFailed() {
327	e.retentionExpiryChecker.MarkPhaseFailed()
328	e.deletionExpiryChecker.MarkPhaseFailed()
329}
330
331func (e *expirationChecker) MarkPhaseFinished() {
332	e.retentionExpiryChecker.MarkPhaseFinished()
333	e.deletionExpiryChecker.MarkPhaseFinished()
334}
335
336func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool {
337	return e.retentionExpiryChecker.IntervalHasExpiredChunks(interval) || e.deletionExpiryChecker.IntervalHasExpiredChunks(interval)
338}
339
340func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool {
341	return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now)
342}
343