1package retention
2
3import (
4	"fmt"
5
6	"github.com/prometheus/common/model"
7	"github.com/prometheus/prometheus/pkg/labels"
8	"go.etcd.io/bbolt"
9
10	"github.com/grafana/loki/pkg/storage/chunk"
11)
12
13var (
14	_ ChunkEntryIterator = &chunkIndexIterator{}
15	_ SeriesCleaner      = &seriesCleaner{}
16)
17
// ChunkEntry is a single chunk index entry: a chunk reference together with
// the label set of the series the chunk belongs to.
type ChunkEntry struct {
	ChunkRef
	Labels labels.Labels
}
22
// ChunkEntryIterator iterates over the chunk index entries of a table and
// allows deleting the entry it is currently positioned on.
type ChunkEntryIterator interface {
	// Next advances the iterator. It returns false when the iterator is
	// exhausted or an error occurred; check Err after a false return.
	Next() bool
	// Entry returns the current entry. Valid only after Next returned true.
	Entry() ChunkEntry
	// Delete deletes the current entry.
	Delete() error
	// Err returns the first error encountered while iterating, if any.
	Err() error
}
30
// chunkIndexIterator walks a bbolt index bucket and yields its chunk index
// entries, resolving each entry's series labels through labelsMapper.
type chunkIndexIterator struct {
	cursor  *bbolt.Cursor // cursor over the table's index bucket
	current ChunkEntry    // entry exposed by Entry()
	first   bool          // true until the first Next() positions the cursor
	err     error         // sticky error exposed by Err()

	// labelsMapper maps (seriesID, userID) to the series' label set.
	labelsMapper *seriesLabelsMapper
}
39
40func newChunkIndexIterator(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*chunkIndexIterator, error) {
41	labelsMapper, err := newSeriesLabelsMapper(bucket, config)
42	if err != nil {
43		return nil, err
44	}
45	return &chunkIndexIterator{
46		cursor:       bucket.Cursor(),
47		first:        true,
48		labelsMapper: labelsMapper,
49		current:      ChunkEntry{},
50	}, nil
51}
52
// Err returns the first error encountered while iterating, if any. It should
// be checked once Next returns false.
func (b *chunkIndexIterator) Err() error {
	return b.err
}
56
// Entry returns the entry the iterator is currently positioned on. It is only
// valid after a call to Next that returned true.
func (b *chunkIndexIterator) Entry() ChunkEntry {
	return b.current
}
60
// Delete removes the index entry the cursor is currently positioned on from
// the underlying bucket.
func (b *chunkIndexIterator) Delete() error {
	return b.cursor.Delete()
}
64
65func (b *chunkIndexIterator) Next() bool {
66	var key []byte
67	if b.first {
68		key, _ = b.cursor.First()
69		b.first = false
70	} else {
71		key, _ = b.cursor.Next()
72	}
73	for key != nil {
74		ref, ok, err := parseChunkRef(decodeKey(key))
75		if err != nil {
76			b.err = err
77			return false
78		}
79		// skips anything else than chunk index entries.
80		if !ok {
81			key, _ = b.cursor.Next()
82			continue
83		}
84		b.current.ChunkRef = ref
85		b.current.Labels = b.labelsMapper.Get(ref.SeriesID, ref.UserID)
86		return true
87	}
88	return false
89}
90
// SeriesCleaner deletes series-level index entries from a table.
type SeriesCleaner interface {
	// Cleanup removes the label index entries for the given user and series.
	Cleanup(userID []byte, lbls labels.Labels) error
}
94
// seriesCleaner removes a series' label index entries from a single table's
// bbolt bucket.
type seriesCleaner struct {
	tableInterval model.Interval    // time range covered by the table (from its name)
	shards        map[uint32]string // shard id -> zero-padded shard label; nil when row sharding is off
	bucket        *bbolt.Bucket
	config        chunk.PeriodConfig
	schema        chunk.SeriesStoreSchema

	// buf is a reusable scratch buffer for building index keys.
	// NOTE(review): allocated in newSeriesCleaner but not used by the code
	// visible in this chunk — confirm it is used elsewhere in the file.
	buf []byte
}
104
105func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig, tableName string) *seriesCleaner {
106	baseSchema, _ := config.CreateSchema()
107	schema := baseSchema.(chunk.SeriesStoreSchema)
108	var shards map[uint32]string
109
110	if config.RowShards != 0 {
111		shards = map[uint32]string{}
112		for s := uint32(0); s <= config.RowShards; s++ {
113			shards[s] = fmt.Sprintf("%02d", s)
114		}
115	}
116
117	return &seriesCleaner{
118		tableInterval: ExtractIntervalFromTableName(tableName),
119		schema:        schema,
120		bucket:        bucket,
121		buf:           make([]byte, 0, 1024),
122		config:        config,
123		shards:        shards,
124	}
125}
126
127func (s *seriesCleaner) Cleanup(userID []byte, lbls labels.Labels) error {
128	_, indexEntries, err := s.schema.GetCacheKeysAndLabelWriteEntries(s.tableInterval.Start, s.tableInterval.End, string(userID), logMetricName, lbls, "")
129	if err != nil {
130		return err
131	}
132
133	for i := range indexEntries {
134		for _, indexEntry := range indexEntries[i] {
135			key := make([]byte, 0, len(indexEntry.HashValue)+len(separator)+len(indexEntry.RangeValue))
136			key = append(key, []byte(indexEntry.HashValue)...)
137			key = append(key, []byte(separator)...)
138			key = append(key, indexEntry.RangeValue...)
139
140			err := s.bucket.Delete(key)
141			if err != nil {
142				return err
143			}
144		}
145	}
146
147	return nil
148}
149