1package retention
2
3import (
4	"github.com/prometheus/prometheus/pkg/labels"
5	"go.etcd.io/bbolt"
6
7	"github.com/grafana/loki/pkg/storage/chunk"
8)
9
10type userSeries struct {
11	key         []byte
12	seriesIDLen int
13}
14
15func newUserSeries(seriesID []byte, userID []byte) userSeries {
16	key := make([]byte, 0, len(seriesID)+len(userID))
17	key = append(key, seriesID...)
18	key = append(key, userID...)
19	return userSeries{
20		key:         key,
21		seriesIDLen: len(seriesID),
22	}
23}
24
25func (us userSeries) Key() string {
26	return unsafeGetString(us.key)
27}
28
29func (us userSeries) SeriesID() []byte {
30	return us.key[:us.seriesIDLen]
31}
32
33func (us userSeries) UserID() []byte {
34	return us.key[us.seriesIDLen:]
35}
36
37func (us *userSeries) Reset(seriesID []byte, userID []byte) {
38	if us.key == nil {
39		us.key = make([]byte, 0, len(seriesID)+len(userID))
40	}
41	us.key = us.key[:0]
42	us.key = append(us.key, seriesID...)
43	us.key = append(us.key, userID...)
44	us.seriesIDLen = len(seriesID)
45}
46
47type userSeriesInfo struct {
48	userSeries
49	isDeleted bool
50	lbls      labels.Labels
51}
52
53type userSeriesMap map[string]userSeriesInfo
54
55func newUserSeriesMap() userSeriesMap {
56	return make(userSeriesMap)
57}
58
59func (u userSeriesMap) Add(seriesID []byte, userID []byte, lbls labels.Labels) {
60	us := newUserSeries(seriesID, userID)
61	if _, ok := u[us.Key()]; ok {
62		return
63	}
64
65	u[us.Key()] = userSeriesInfo{
66		userSeries: us,
67		isDeleted:  true,
68		lbls:       lbls,
69	}
70}
71
72// MarkSeriesNotDeleted is used to mark series not deleted when it still has some chunks left in the store
73func (u userSeriesMap) MarkSeriesNotDeleted(seriesID []byte, userID []byte) {
74	us := newUserSeries(seriesID, userID)
75	usi := u[us.Key()]
76	usi.isDeleted = false
77	u[us.Key()] = usi
78}
79
80func (u userSeriesMap) ForEach(callback func(info userSeriesInfo) error) error {
81	for _, v := range u {
82		if err := callback(v); err != nil {
83			return err
84		}
85	}
86	return nil
87}
88
89type seriesLabels struct {
90	userSeries
91	lbs labels.Labels
92}
93
94type seriesLabelsMapper struct {
95	cursor *bbolt.Cursor
96	config chunk.PeriodConfig
97
98	bufKey  userSeries
99	mapping map[string]*seriesLabels
100}
101
102func newSeriesLabelsMapper(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*seriesLabelsMapper, error) {
103	sm := &seriesLabelsMapper{
104		cursor:  bucket.Cursor(),
105		mapping: map[string]*seriesLabels{},
106		config:  config,
107		bufKey:  newUserSeries(nil, nil),
108	}
109	if err := sm.build(); err != nil {
110		return nil, err
111	}
112	return sm, nil
113}
114
115func (sm *seriesLabelsMapper) Get(seriesID []byte, userID []byte) labels.Labels {
116	sm.bufKey.Reset(seriesID, userID)
117	lbs, ok := sm.mapping[sm.bufKey.Key()]
118	if ok {
119		return lbs.lbs
120	}
121	return labels.Labels{}
122}
123
124func (sm *seriesLabelsMapper) build() error {
125Outer:
126	for k, v := sm.cursor.First(); k != nil; k, v = sm.cursor.Next() {
127		ref, ok, err := parseLabelSeriesRangeKey(decodeKey(k))
128		if err != nil {
129			return err
130		}
131		if !ok {
132			continue
133		}
134		sm.bufKey.Reset(ref.SeriesID, ref.UserID)
135		lbs, ok := sm.mapping[sm.bufKey.Key()]
136		if !ok {
137			k := newUserSeries(ref.SeriesID, ref.UserID)
138			lbs = &seriesLabels{
139				userSeries: k,
140				lbs:        make(labels.Labels, 0, 15),
141			}
142			sm.mapping[k.Key()] = lbs
143		}
144		// add the labels if it doesn't exist.
145		for _, l := range lbs.lbs {
146			if l.Name == unsafeGetString(ref.Name) {
147				continue Outer
148			}
149		}
150		lbs.lbs = append(lbs.lbs, labels.Label{Name: string(ref.Name), Value: string(v)})
151	}
152	return nil
153}
154