1package compactor 2 3import ( 4 "context" 5 "errors" 6 "flag" 7 "path/filepath" 8 "reflect" 9 "sync" 10 "time" 11 12 util_log "github.com/cortexproject/cortex/pkg/util/log" 13 "github.com/go-kit/kit/log/level" 14 "github.com/grafana/dskit/services" 15 "github.com/prometheus/client_golang/prometheus" 16 "github.com/prometheus/common/model" 17 18 loki_storage "github.com/grafana/loki/pkg/storage" 19 "github.com/grafana/loki/pkg/storage/chunk/local" 20 "github.com/grafana/loki/pkg/storage/chunk/objectclient" 21 "github.com/grafana/loki/pkg/storage/chunk/storage" 22 chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" 23 "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" 24 "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" 25 shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage" 26 shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" 27) 28 29type Config struct { 30 WorkingDirectory string `yaml:"working_directory"` 31 SharedStoreType string `yaml:"shared_store"` 32 SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` 33 CompactionInterval time.Duration `yaml:"compaction_interval"` 34 RetentionEnabled bool `yaml:"retention_enabled"` 35 RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"` 36 RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"` 37 DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` 38 MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` 39} 40 41// RegisterFlags registers flags. 42func (cfg *Config) RegisterFlags(f *flag.FlagSet) { 43 f.StringVar(&cfg.WorkingDirectory, "boltdb.shipper.compactor.working-directory", "", "Directory where files can be downloaded for compaction.") 44 f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem") 45 f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.") 46 f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.") 47 f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.") 48 f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.") 49 f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") 50 f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") 51 f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") 52} 53 54func (cfg *Config) IsDefaults() bool { 55 cpy := &Config{} 56 cpy.RegisterFlags(flag.NewFlagSet("defaults", flag.ContinueOnError)) 57 return reflect.DeepEqual(cfg, cpy) 58} 59 60func (cfg *Config) Validate() error { 61 if cfg.MaxCompactionParallelism < 1 { 62 return errors.New("max compaction parallelism must be >= 1") 63 } 64 return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) 65} 66 67type Compactor struct { 68 services.Service 69 70 cfg Config 71 indexStorageClient shipper_storage.Client 72 tableMarker retention.TableMarker 73 sweeper *retention.Sweeper 74 deleteRequestsStore deletion.DeleteRequestsStore 75 DeleteRequestsHandler *deletion.DeleteRequestHandler 76 deleteRequestsManager *deletion.DeleteRequestsManager 77 expirationChecker retention.ExpirationChecker 78 metrics *metrics 79} 80 81func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) { 82 if cfg.IsDefaults() { 83 return nil, errors.New("Must specify compactor config") 84 } 85 86 compactor := &Compactor{ 87 cfg: cfg, 88 } 89 90 if err := compactor.init(storageConfig, schemaConfig, limits, r); err != nil { 91 return nil, err 92 } 93 94 compactor.Service = services.NewBasicService(nil, compactor.loop, nil) 95 return compactor, nil 96} 97 98func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) error { 99 objectClient, err := storage.NewObjectClient(c.cfg.SharedStoreType, storageConfig) 100 if err != nil { 101 return err 102 } 103 104 err = chunk_util.EnsureDirectory(c.cfg.WorkingDirectory) 105 if err != nil { 106 return err 107 } 108 c.indexStorageClient = shipper_storage.NewIndexStorageClient(objectClient, c.cfg.SharedStoreKeyPrefix) 109 c.metrics = newMetrics(r) 110 111 if c.cfg.RetentionEnabled { 112 var encoder objectclient.KeyEncoder 113 if _, ok := objectClient.(*local.FSObjectClient); ok { 114 encoder = objectclient.Base64Encoder 115 } 116 117 chunkClient := objectclient.NewClient(objectClient, encoder) 118 119 retentionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "retention") 120 c.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r) 121 if err != nil { 122 return err 123 } 124 125 deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") 126 127 c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) 128 if err != nil { 129 return err 130 } 131 132 c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r) 133 c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r) 134 135 c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager) 136 137 c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r) 138 if err != nil { 139 return err 140 } 141 } 142 143 return nil 144} 145 146func (c *Compactor) loop(ctx context.Context) error { 147 if c.cfg.RetentionEnabled { 148 defer c.deleteRequestsStore.Stop() 149 defer c.deleteRequestsManager.Stop() 150 } 151 152 runCompaction := func() { 153 err := c.RunCompaction(ctx) 154 if err != nil { 155 level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err) 156 } 157 } 158 var wg sync.WaitGroup 159 wg.Add(1) 160 go func() { 161 defer wg.Done() 162 runCompaction() 163 164 ticker := time.NewTicker(c.cfg.CompactionInterval) 165 defer ticker.Stop() 166 167 for { 168 select { 169 case <-ticker.C: 170 runCompaction() 171 case <-ctx.Done(): 172 return 173 } 174 } 175 }() 176 if c.cfg.RetentionEnabled { 177 wg.Add(1) 178 go func() { 179 // starts the chunk sweeper 180 defer func() { 181 c.sweeper.Stop() 182 wg.Done() 183 }() 184 c.sweeper.Start() 185 <-ctx.Done() 186 }() 187 } 188 189 wg.Wait() 190 return nil 191} 192 193func (c *Compactor) CompactTable(ctx context.Context, tableName string) error { 194 table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker) 195 if err != nil { 196 level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err) 197 return err 198 } 199 200 interval := retention.ExtractIntervalFromTableName(tableName) 201 intervalHasExpiredChunks := false 202 if c.cfg.RetentionEnabled { 203 intervalHasExpiredChunks = c.expirationChecker.IntervalHasExpiredChunks(interval) 204 } 205 206 err = table.compact(intervalHasExpiredChunks) 207 if err != nil { 208 level.Error(util_log.Logger).Log("msg", "failed to compact files", "table", tableName, "err", err) 209 return err 210 } 211 return nil 212} 213 214func (c *Compactor) RunCompaction(ctx context.Context) error { 215 status := statusSuccess 216 start := time.Now() 217 218 if c.cfg.RetentionEnabled { 219 c.expirationChecker.MarkPhaseStarted() 220 } 221 222 defer func() { 223 c.metrics.compactTablesOperationTotal.WithLabelValues(status).Inc() 224 if status == statusSuccess { 225 c.metrics.compactTablesOperationDurationSeconds.Set(time.Since(start).Seconds()) 226 c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime() 227 } 228 229 if c.cfg.RetentionEnabled { 230 if status == statusSuccess { 231 c.expirationChecker.MarkPhaseFinished() 232 } else { 233 c.expirationChecker.MarkPhaseFailed() 234 } 235 } 236 }() 237 238 tables, err := c.indexStorageClient.ListTables(ctx) 239 if err != nil { 240 status = statusFailure 241 return err 242 } 243 244 compactTablesChan := make(chan string) 245 errChan := make(chan error) 246 247 for i := 0; i < c.cfg.MaxCompactionParallelism; i++ { 248 go func() { 249 var err error 250 defer func() { 251 errChan <- err 252 }() 253 254 for { 255 select { 256 case tableName, ok := <-compactTablesChan: 257 if !ok { 258 return 259 } 260 261 level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName) 262 err = c.CompactTable(ctx, tableName) 263 if err != nil { 264 return 265 } 266 level.Info(util_log.Logger).Log("msg", "finished compacting table", "table-name", tableName) 267 case <-ctx.Done(): 268 return 269 } 270 } 271 }() 272 } 273 274 go func() { 275 for _, tableName := range tables { 276 if tableName == deletion.DeleteRequestsTableName { 277 // we do not want to compact or apply retention on delete requests table 278 continue 279 } 280 281 select { 282 case compactTablesChan <- tableName: 283 case <-ctx.Done(): 284 return 285 } 286 } 287 288 close(compactTablesChan) 289 }() 290 291 var firstErr error 292 // read all the errors 293 for i := 0; i < c.cfg.MaxCompactionParallelism; i++ { 294 err := <-errChan 295 if err != nil && firstErr == nil { 296 status = statusFailure 297 firstErr = err 298 } 299 } 300 301 return firstErr 302} 303 304type expirationChecker struct { 305 retentionExpiryChecker retention.ExpirationChecker 306 deletionExpiryChecker retention.ExpirationChecker 307} 308 309func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retention.ExpirationChecker) retention.ExpirationChecker { 310 return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker} 311} 312 313func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []model.Interval) { 314 if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired { 315 return expired, nonDeletedIntervals 316 } 317 318 return e.deletionExpiryChecker.Expired(ref, now) 319} 320 321func (e *expirationChecker) MarkPhaseStarted() { 322 e.retentionExpiryChecker.MarkPhaseStarted() 323 e.deletionExpiryChecker.MarkPhaseStarted() 324} 325 326func (e *expirationChecker) MarkPhaseFailed() { 327 e.retentionExpiryChecker.MarkPhaseFailed() 328 e.deletionExpiryChecker.MarkPhaseFailed() 329} 330 331func (e *expirationChecker) MarkPhaseFinished() { 332 e.retentionExpiryChecker.MarkPhaseFinished() 333 e.deletionExpiryChecker.MarkPhaseFinished() 334} 335 336func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool { 337 return e.retentionExpiryChecker.IntervalHasExpiredChunks(interval) || e.deletionExpiryChecker.IntervalHasExpiredChunks(interval) 338} 339 340func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool { 341 return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now) 342} 343