1package retention 2 3import ( 4 "github.com/prometheus/prometheus/pkg/labels" 5 "go.etcd.io/bbolt" 6 7 "github.com/grafana/loki/pkg/storage/chunk" 8) 9 10type userSeries struct { 11 key []byte 12 seriesIDLen int 13} 14 15func newUserSeries(seriesID []byte, userID []byte) userSeries { 16 key := make([]byte, 0, len(seriesID)+len(userID)) 17 key = append(key, seriesID...) 18 key = append(key, userID...) 19 return userSeries{ 20 key: key, 21 seriesIDLen: len(seriesID), 22 } 23} 24 25func (us userSeries) Key() string { 26 return unsafeGetString(us.key) 27} 28 29func (us userSeries) SeriesID() []byte { 30 return us.key[:us.seriesIDLen] 31} 32 33func (us userSeries) UserID() []byte { 34 return us.key[us.seriesIDLen:] 35} 36 37func (us *userSeries) Reset(seriesID []byte, userID []byte) { 38 if us.key == nil { 39 us.key = make([]byte, 0, len(seriesID)+len(userID)) 40 } 41 us.key = us.key[:0] 42 us.key = append(us.key, seriesID...) 43 us.key = append(us.key, userID...) 44 us.seriesIDLen = len(seriesID) 45} 46 47type userSeriesInfo struct { 48 userSeries 49 isDeleted bool 50 lbls labels.Labels 51} 52 53type userSeriesMap map[string]userSeriesInfo 54 55func newUserSeriesMap() userSeriesMap { 56 return make(userSeriesMap) 57} 58 59func (u userSeriesMap) Add(seriesID []byte, userID []byte, lbls labels.Labels) { 60 us := newUserSeries(seriesID, userID) 61 if _, ok := u[us.Key()]; ok { 62 return 63 } 64 65 u[us.Key()] = userSeriesInfo{ 66 userSeries: us, 67 isDeleted: true, 68 lbls: lbls, 69 } 70} 71 72// MarkSeriesNotDeleted is used to mark series not deleted when it still has some chunks left in the store 73func (u userSeriesMap) MarkSeriesNotDeleted(seriesID []byte, userID []byte) { 74 us := newUserSeries(seriesID, userID) 75 usi := u[us.Key()] 76 usi.isDeleted = false 77 u[us.Key()] = usi 78} 79 80func (u userSeriesMap) ForEach(callback func(info userSeriesInfo) error) error { 81 for _, v := range u { 82 if err := callback(v); err != nil { 83 return err 84 } 85 } 86 return nil 87} 88 89type seriesLabels struct { 90 userSeries 91 lbs labels.Labels 92} 93 94type seriesLabelsMapper struct { 95 cursor *bbolt.Cursor 96 config chunk.PeriodConfig 97 98 bufKey userSeries 99 mapping map[string]*seriesLabels 100} 101 102func newSeriesLabelsMapper(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*seriesLabelsMapper, error) { 103 sm := &seriesLabelsMapper{ 104 cursor: bucket.Cursor(), 105 mapping: map[string]*seriesLabels{}, 106 config: config, 107 bufKey: newUserSeries(nil, nil), 108 } 109 if err := sm.build(); err != nil { 110 return nil, err 111 } 112 return sm, nil 113} 114 115func (sm *seriesLabelsMapper) Get(seriesID []byte, userID []byte) labels.Labels { 116 sm.bufKey.Reset(seriesID, userID) 117 lbs, ok := sm.mapping[sm.bufKey.Key()] 118 if ok { 119 return lbs.lbs 120 } 121 return labels.Labels{} 122} 123 124func (sm *seriesLabelsMapper) build() error { 125Outer: 126 for k, v := sm.cursor.First(); k != nil; k, v = sm.cursor.Next() { 127 ref, ok, err := parseLabelSeriesRangeKey(decodeKey(k)) 128 if err != nil { 129 return err 130 } 131 if !ok { 132 continue 133 } 134 sm.bufKey.Reset(ref.SeriesID, ref.UserID) 135 lbs, ok := sm.mapping[sm.bufKey.Key()] 136 if !ok { 137 k := newUserSeries(ref.SeriesID, ref.UserID) 138 lbs = &seriesLabels{ 139 userSeries: k, 140 lbs: make(labels.Labels, 0, 15), 141 } 142 sm.mapping[k.Key()] = lbs 143 } 144 // add the labels if it doesn't exist. 145 for _, l := range lbs.lbs { 146 if l.Name == unsafeGetString(ref.Name) { 147 continue Outer 148 } 149 } 150 lbs.lbs = append(lbs.lbs, labels.Label{Name: string(ref.Name), Value: string(v)}) 151 } 152 return nil 153} 154