1package retention 2 3import ( 4 "fmt" 5 6 "github.com/prometheus/common/model" 7 "github.com/prometheus/prometheus/pkg/labels" 8 "go.etcd.io/bbolt" 9 10 "github.com/grafana/loki/pkg/storage/chunk" 11) 12 13var ( 14 _ ChunkEntryIterator = &chunkIndexIterator{} 15 _ SeriesCleaner = &seriesCleaner{} 16) 17 18type ChunkEntry struct { 19 ChunkRef 20 Labels labels.Labels 21} 22 23type ChunkEntryIterator interface { 24 Next() bool 25 Entry() ChunkEntry 26 // Delete deletes the current entry. 27 Delete() error 28 Err() error 29} 30 31type chunkIndexIterator struct { 32 cursor *bbolt.Cursor 33 current ChunkEntry 34 first bool 35 err error 36 37 labelsMapper *seriesLabelsMapper 38} 39 40func newChunkIndexIterator(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*chunkIndexIterator, error) { 41 labelsMapper, err := newSeriesLabelsMapper(bucket, config) 42 if err != nil { 43 return nil, err 44 } 45 return &chunkIndexIterator{ 46 cursor: bucket.Cursor(), 47 first: true, 48 labelsMapper: labelsMapper, 49 current: ChunkEntry{}, 50 }, nil 51} 52 53func (b *chunkIndexIterator) Err() error { 54 return b.err 55} 56 57func (b *chunkIndexIterator) Entry() ChunkEntry { 58 return b.current 59} 60 61func (b *chunkIndexIterator) Delete() error { 62 return b.cursor.Delete() 63} 64 65func (b *chunkIndexIterator) Next() bool { 66 var key []byte 67 if b.first { 68 key, _ = b.cursor.First() 69 b.first = false 70 } else { 71 key, _ = b.cursor.Next() 72 } 73 for key != nil { 74 ref, ok, err := parseChunkRef(decodeKey(key)) 75 if err != nil { 76 b.err = err 77 return false 78 } 79 // skips anything else than chunk index entries. 80 if !ok { 81 key, _ = b.cursor.Next() 82 continue 83 } 84 b.current.ChunkRef = ref 85 b.current.Labels = b.labelsMapper.Get(ref.SeriesID, ref.UserID) 86 return true 87 } 88 return false 89} 90 91type SeriesCleaner interface { 92 Cleanup(userID []byte, lbls labels.Labels) error 93} 94 95type seriesCleaner struct { 96 tableInterval model.Interval 97 shards map[uint32]string 98 bucket *bbolt.Bucket 99 config chunk.PeriodConfig 100 schema chunk.SeriesStoreSchema 101 102 buf []byte 103} 104 105func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig, tableName string) *seriesCleaner { 106 baseSchema, _ := config.CreateSchema() 107 schema := baseSchema.(chunk.SeriesStoreSchema) 108 var shards map[uint32]string 109 110 if config.RowShards != 0 { 111 shards = map[uint32]string{} 112 for s := uint32(0); s <= config.RowShards; s++ { 113 shards[s] = fmt.Sprintf("%02d", s) 114 } 115 } 116 117 return &seriesCleaner{ 118 tableInterval: ExtractIntervalFromTableName(tableName), 119 schema: schema, 120 bucket: bucket, 121 buf: make([]byte, 0, 1024), 122 config: config, 123 shards: shards, 124 } 125} 126 127func (s *seriesCleaner) Cleanup(userID []byte, lbls labels.Labels) error { 128 _, indexEntries, err := s.schema.GetCacheKeysAndLabelWriteEntries(s.tableInterval.Start, s.tableInterval.End, string(userID), logMetricName, lbls, "") 129 if err != nil { 130 return err 131 } 132 133 for i := range indexEntries { 134 for _, indexEntry := range indexEntries[i] { 135 key := make([]byte, 0, len(indexEntry.HashValue)+len(separator)+len(indexEntry.RangeValue)) 136 key = append(key, []byte(indexEntry.HashValue)...) 137 key = append(key, []byte(separator)...) 138 key = append(key, indexEntry.RangeValue...) 139 140 err := s.bucket.Delete(key) 141 if err != nil { 142 return err 143 } 144 } 145 } 146 147 return nil 148} 149