1package retention 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "time" 9 10 util_log "github.com/cortexproject/cortex/pkg/util/log" 11 "github.com/go-kit/kit/log/level" 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/prometheus/common/model" 14 "go.etcd.io/bbolt" 15 16 "github.com/grafana/loki/pkg/chunkenc" 17 "github.com/grafana/loki/pkg/storage" 18 "github.com/grafana/loki/pkg/storage/chunk" 19) 20 21var ( 22 bucketName = []byte("index") 23 chunkBucket = []byte("chunks") 24) 25 26const ( 27 logMetricName = "logs" 28 markersFolder = "markers" 29 separator = "\000" 30) 31 32type TableMarker interface { 33 // MarkForDelete marks chunks to delete for a given table and returns if it's empty and how many marks were created. 34 MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) 35} 36 37type Marker struct { 38 workingDirectory string 39 config storage.SchemaConfig 40 expiration ExpirationChecker 41 markerMetrics *markerMetrics 42 chunkClient chunk.Client 43} 44 45func NewMarker(workingDirectory string, config storage.SchemaConfig, expiration ExpirationChecker, chunkClient chunk.Client, r prometheus.Registerer) (*Marker, error) { 46 if err := validatePeriods(config); err != nil { 47 return nil, err 48 } 49 metrics := newMarkerMetrics(r) 50 return &Marker{ 51 workingDirectory: workingDirectory, 52 config: config, 53 expiration: expiration, 54 markerMetrics: metrics, 55 chunkClient: chunkClient, 56 }, nil 57} 58 59// MarkForDelete marks all chunks expired for a given table. 60func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { 61 start := time.Now() 62 status := statusSuccess 63 defer func() { 64 t.markerMetrics.tableProcessedDurationSeconds.WithLabelValues(tableName, status).Observe(time.Since(start).Seconds()) 65 level.Debug(util_log.Logger).Log("msg", "finished to process table", "table", tableName, "duration", time.Since(start)) 66 }() 67 level.Debug(util_log.Logger).Log("msg", "starting to process table", "table", tableName) 68 69 empty, markCount, err := t.markTable(ctx, tableName, db) 70 if err != nil { 71 status = statusFailure 72 return false, 0, err 73 } 74 return empty, markCount, nil 75} 76 77func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { 78 schemaCfg, ok := schemaPeriodForTable(t.config, tableName) 79 if !ok { 80 return false, 0, fmt.Errorf("could not find schema for table: %s", tableName) 81 } 82 83 markerWriter, err := NewMarkerStorageWriter(t.workingDirectory) 84 if err != nil { 85 return false, 0, fmt.Errorf("failed to create marker writer: %w", err) 86 } 87 88 var empty bool 89 err = db.Update(func(tx *bbolt.Tx) error { 90 bucket := tx.Bucket(bucketName) 91 if bucket == nil { 92 return nil 93 } 94 95 chunkIt, err := newChunkIndexIterator(bucket, schemaCfg) 96 if err != nil { 97 return fmt.Errorf("failed to create chunk index iterator: %w", err) 98 } 99 if ctx.Err() != nil { 100 return ctx.Err() 101 } 102 chunkRewriter, err := newChunkRewriter(t.chunkClient, schemaCfg, tableName, bucket) 103 if err != nil { 104 return err 105 } 106 107 empty, err = markforDelete(ctx, tableName, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg, tableName), t.expiration, chunkRewriter) 108 if err != nil { 109 return err 110 } 111 t.markerMetrics.tableMarksCreatedTotal.WithLabelValues(tableName).Add(float64(markerWriter.Count())) 112 if err := markerWriter.Close(); err != nil { 113 return fmt.Errorf("failed to close marker writer: %w", err) 114 } 115 return nil 116 }) 117 if err != nil { 118 return false, 0, err 119 } 120 if empty { 121 t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionDeleted).Inc() 122 return empty, markerWriter.Count(), nil 123 } 124 if markerWriter.Count() == 0 { 125 t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionNone).Inc() 126 return empty, markerWriter.Count(), nil 127 } 128 t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionModified).Inc() 129 return empty, markerWriter.Count(), nil 130} 131 132func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, error) { 133 seriesMap := newUserSeriesMap() 134 // tableInterval holds the interval for which the table is expected to have the chunks indexed 135 tableInterval := ExtractIntervalFromTableName(tableName) 136 empty := true 137 now := model.Now() 138 139 for chunkIt.Next() { 140 if chunkIt.Err() != nil { 141 return false, chunkIt.Err() 142 } 143 c := chunkIt.Entry() 144 seriesMap.Add(c.SeriesID, c.UserID, c.Labels) 145 146 // see if the chunk is deleted completely or partially 147 if expired, nonDeletedIntervals := expiration.Expired(c, now); expired { 148 if len(nonDeletedIntervals) > 0 { 149 wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervals) 150 if err != nil { 151 return false, err 152 } 153 154 if wroteChunks { 155 // we have re-written chunk to the storage so the table won't be empty and the series are still being referred. 156 empty = false 157 seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID) 158 } 159 } 160 161 if err := chunkIt.Delete(); err != nil { 162 return false, err 163 } 164 165 // Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in. 166 // For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then 167 // the retention would fail because it would fail to find it in the storage. 168 if len(nonDeletedIntervals) == 0 || c.Through <= tableInterval.End { 169 if err := marker.Put(c.ChunkID); err != nil { 170 return false, err 171 } 172 } 173 continue 174 } 175 176 // The chunk is not deleted, now see if we can drop its index entry based on end time from tableInterval. 177 // If chunk end time is after the end time of tableInterval, it means the chunk would also be indexed in the next table. 178 // We would now check if the end time of the tableInterval is out of retention period so that 179 // we can drop the chunk entry from this table without removing the chunk from the store. 180 if c.Through.After(tableInterval.End) { 181 if expiration.DropFromIndex(c, tableInterval.End, now) { 182 if err := chunkIt.Delete(); err != nil { 183 return false, err 184 } 185 continue 186 } 187 } 188 189 empty = false 190 seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID) 191 } 192 if empty { 193 return true, nil 194 } 195 if ctx.Err() != nil { 196 return false, ctx.Err() 197 } 198 199 return false, seriesMap.ForEach(func(info userSeriesInfo) error { 200 if !info.isDeleted { 201 return nil 202 } 203 204 return seriesCleaner.Cleanup(info.UserID(), info.lbls) 205 }) 206} 207 208type ChunkClient interface { 209 DeleteChunk(ctx context.Context, userID, chunkID string) error 210 IsChunkNotFoundErr(err error) bool 211} 212 213type Sweeper struct { 214 markerProcessor MarkerProcessor 215 chunkClient ChunkClient 216 sweeperMetrics *sweeperMetrics 217} 218 219func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount int, minAgeDelete time.Duration, r prometheus.Registerer) (*Sweeper, error) { 220 m := newSweeperMetrics(r) 221 p, err := newMarkerStorageReader(workingDir, deleteWorkerCount, minAgeDelete, m) 222 if err != nil { 223 return nil, err 224 } 225 return &Sweeper{ 226 markerProcessor: p, 227 chunkClient: deleteClient, 228 sweeperMetrics: m, 229 }, nil 230} 231 232func (s *Sweeper) Start() { 233 s.markerProcessor.Start(func(ctx context.Context, chunkId []byte) error { 234 status := statusSuccess 235 start := time.Now() 236 defer func() { 237 s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds()) 238 }() 239 chunkIDString := unsafeGetString(chunkId) 240 userID, err := getUserIDFromChunkID(chunkId) 241 if err != nil { 242 return err 243 } 244 245 err = s.chunkClient.DeleteChunk(ctx, unsafeGetString(userID), chunkIDString) 246 if s.chunkClient.IsChunkNotFoundErr(err) { 247 status = statusNotFound 248 level.Debug(util_log.Logger).Log("msg", "delete on not found chunk", "chunkID", chunkIDString) 249 return nil 250 } 251 if err != nil { 252 level.Error(util_log.Logger).Log("msg", "error deleting chunk", "chunkID", chunkIDString, "err", err) 253 status = statusFailure 254 } 255 return err 256 }) 257} 258 259func getUserIDFromChunkID(chunkID []byte) ([]byte, error) { 260 idx := bytes.IndexByte(chunkID, '/') 261 if idx <= 0 { 262 return nil, fmt.Errorf("invalid chunk ID %q", chunkID) 263 } 264 265 return chunkID[:idx], nil 266} 267 268func (s *Sweeper) Stop() { 269 s.markerProcessor.Stop() 270} 271 272type chunkRewriter struct { 273 chunkClient chunk.Client 274 tableName string 275 bucket *bbolt.Bucket 276 277 seriesStoreSchema chunk.SeriesStoreSchema 278} 279 280func newChunkRewriter(chunkClient chunk.Client, schemaCfg chunk.PeriodConfig, 281 tableName string, bucket *bbolt.Bucket) (*chunkRewriter, error) { 282 schema, err := schemaCfg.CreateSchema() 283 if err != nil { 284 return nil, err 285 } 286 287 seriesStoreSchema, ok := schema.(chunk.SeriesStoreSchema) 288 if !ok { 289 return nil, errors.New("invalid schema") 290 } 291 292 return &chunkRewriter{ 293 chunkClient: chunkClient, 294 tableName: tableName, 295 bucket: bucket, 296 seriesStoreSchema: seriesStoreSchema, 297 }, nil 298} 299 300func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, intervals []model.Interval) (bool, error) { 301 userID := unsafeGetString(ce.UserID) 302 chunkID := unsafeGetString(ce.ChunkID) 303 304 chk, err := chunk.ParseExternalKey(userID, chunkID) 305 if err != nil { 306 return false, err 307 } 308 309 chks, err := c.chunkClient.GetChunks(ctx, []chunk.Chunk{chk}) 310 if err != nil { 311 return false, err 312 } 313 314 if len(chks) != 1 { 315 return false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", chunkID, len(chks)) 316 } 317 318 wroteChunks := false 319 320 for _, interval := range intervals { 321 newChunkData, err := chks[0].Data.Rebound(interval.Start, interval.End) 322 if err != nil { 323 return false, err 324 } 325 326 facade, ok := newChunkData.(*chunkenc.Facade) 327 if !ok { 328 return false, errors.New("invalid chunk type") 329 } 330 331 newChunk := chunk.NewChunk( 332 userID, chks[0].Fingerprint, chks[0].Metric, 333 facade, 334 interval.Start, 335 interval.End, 336 ) 337 338 err = newChunk.Encode() 339 if err != nil { 340 return false, err 341 } 342 343 entries, err := c.seriesStoreSchema.GetChunkWriteEntries(interval.Start, interval.End, userID, "logs", newChunk.Metric, newChunk.ExternalKey()) 344 if err != nil { 345 return false, err 346 } 347 348 uploadChunk := false 349 350 for _, entry := range entries { 351 // write an entry only if it belongs to this table 352 if entry.TableName == c.tableName { 353 key := entry.HashValue + separator + string(entry.RangeValue) 354 if err := c.bucket.Put([]byte(key), nil); err != nil { 355 return false, err 356 } 357 uploadChunk = true 358 } 359 } 360 361 // upload chunk only if an entry was written 362 if uploadChunk { 363 err = c.chunkClient.PutChunks(ctx, []chunk.Chunk{newChunk}) 364 if err != nil { 365 return false, err 366 } 367 wroteChunks = true 368 } 369 } 370 371 return wroteChunks, nil 372} 373