1package retention
2
3import (
4	"errors"
5	"fmt"
6	"strconv"
7	"strings"
8	"time"
9
10	"github.com/prometheus/common/model"
11
12	util_log "github.com/cortexproject/cortex/pkg/util/log"
13	"github.com/go-kit/kit/log/level"
14
15	"github.com/grafana/loki/pkg/storage"
16	"github.com/grafana/loki/pkg/storage/chunk"
17	"github.com/grafana/loki/pkg/storage/stores/shipper"
18)
19
// Key-type markers. The parsers in this file compare the first byte of the
// last component of a decoded range key against these values to decide what
// kind of index entry they are looking at.
const (
	// chunkTimeRangeKeyV3 marks an entry that parseChunkRef treats as a chunk reference.
	chunkTimeRangeKeyV3   = '3'
	// seriesRangeKeyV1 marks an entry whose series ID is the first range key component.
	seriesRangeKeyV1      = '7'
	// labelSeriesRangeKeyV1 marks an entry whose series ID is the second range key component.
	labelSeriesRangeKeyV1 = '8'
)
25
// ErrInvalidIndexKey is the sentinel that every InvalidIndexKeyError matches
// when checked with errors.Is.
var ErrInvalidIndexKey = errors.New("invalid index key")

// InvalidIndexKeyError reports an index entry whose key could not be parsed.
// It keeps string copies of both halves of the offending key.
type InvalidIndexKeyError struct {
	HashKey  string
	RangeKey string
}

// newInvalidIndexKeyError builds an InvalidIndexKeyError from the raw hash
// key h and range key r. The bytes are copied into strings, so the error does
// not alias the caller's buffers.
func newInvalidIndexKeyError(h, r []byte) InvalidIndexKeyError {
	var keyErr InvalidIndexKeyError
	keyErr.HashKey = string(h)
	keyErr.RangeKey = string(r)
	return keyErr
}

// Error implements the error interface. The message starts with the sentinel
// text followed by both key halves.
func (e InvalidIndexKeyError) Error() string {
	const layout = "%s: hash_key:%s range_key:%s"
	return fmt.Sprintf(layout, ErrInvalidIndexKey, e.HashKey, e.RangeKey)
}

// Is reports whether target is ErrInvalidIndexKey, which lets
// errors.Is(err, ErrInvalidIndexKey) succeed for any InvalidIndexKeyError.
func (e InvalidIndexKeyError) Is(target error) bool {
	return ErrInvalidIndexKey == target
}
47
48type ChunkRef struct {
49	UserID   []byte
50	SeriesID []byte
51	ChunkID  []byte
52	From     model.Time
53	Through  model.Time
54}
55
56func (c ChunkRef) String() string {
57	return fmt.Sprintf("UserID: %s , SeriesID: %s , Time: [%s,%s]", c.UserID, c.SeriesID, c.From, c.Through)
58}
59
60func parseChunkRef(hashKey, rangeKey []byte) (ChunkRef, bool, error) {
61	componentsRef := getComponents()
62	defer putComponents(componentsRef)
63	components := componentsRef.components
64
65	components = decodeRangeKey(rangeKey, components)
66	if len(components) == 0 {
67		return ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey)
68	}
69
70	keyType := components[len(components)-1]
71	if len(keyType) == 0 || keyType[0] != chunkTimeRangeKeyV3 {
72		return ChunkRef{}, false, nil
73	}
74	chunkID := components[len(components)-2]
75
76	userID, hexFrom, hexThrough, ok := parseChunkID(chunkID)
77	if !ok {
78		return ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey)
79	}
80	from, err := strconv.ParseInt(unsafeGetString(hexFrom), 16, 64)
81	if err != nil {
82		return ChunkRef{}, false, err
83	}
84	through, err := strconv.ParseInt(unsafeGetString(hexThrough), 16, 64)
85	if err != nil {
86		return ChunkRef{}, false, err
87	}
88
89	return ChunkRef{
90		UserID:   userID,
91		SeriesID: seriesFromHash(hashKey),
92		From:     model.Time(from),
93		Through:  model.Time(through),
94		ChunkID:  chunkID,
95	}, true, nil
96}
97
98func parseChunkID(chunkID []byte) (userID []byte, hexFrom, hexThrough []byte, valid bool) {
99	var (
100		j, i int
101		hex  []byte
102	)
103
104	for j < len(chunkID) {
105		if chunkID[j] != '/' {
106			j++
107			continue
108		}
109		userID = chunkID[:j]
110		hex = chunkID[j+1:]
111		break
112	}
113	if len(userID) == 0 {
114		return nil, nil, nil, false
115	}
116	_, i = readOneHexPart(hex)
117	if i == 0 {
118		return nil, nil, nil, false
119	}
120	hex = hex[i+1:]
121	hexFrom, i = readOneHexPart(hex)
122	if i == 0 {
123		return nil, nil, nil, false
124	}
125	hex = hex[i+1:]
126	hexThrough, i = readOneHexPart(hex)
127	if i == 0 {
128		return nil, nil, nil, false
129	}
130	return userID, hexFrom, hexThrough, true
131}
132
// readOneHexPart returns the bytes of hex that precede the first ':' together
// with the index of that ':'. When hex contains no ':' it returns (nil, 0).
// A leading ':' also yields index 0, so callers treat 0 as "no usable part".
func readOneHexPart(hex []byte) (part []byte, i int) {
	for pos, b := range hex {
		if b == ':' {
			return hex[:pos], pos
		}
	}
	return nil, 0
}
143
144func parseLabelIndexSeriesID(hashKey, rangeKey []byte) ([]byte, bool, error) {
145	componentsRef := getComponents()
146	defer putComponents(componentsRef)
147	components := componentsRef.components
148	var seriesID []byte
149	components = decodeRangeKey(rangeKey, components)
150	if len(components) < 4 {
151		return nil, false, newInvalidIndexKeyError(hashKey, rangeKey)
152	}
153	keyType := components[len(components)-1]
154	if len(keyType) == 0 {
155		return nil, false, nil
156	}
157	switch keyType[0] {
158	case labelSeriesRangeKeyV1:
159		seriesID = components[1]
160	case seriesRangeKeyV1:
161		seriesID = components[0]
162	default:
163		return nil, false, nil
164	}
165	return seriesID, true, nil
166}
167
// LabelSeriesRangeKey holds the parts of a label-series index entry that
// parseLabelSeriesRangeKey extracts: the series ID from the range key, and
// the user ID and label name from the hash key.
type LabelSeriesRangeKey struct {
	SeriesID []byte
	UserID   []byte
	Name     []byte
}

// String joins series ID, user ID and label name with ':' separators.
func (l LabelSeriesRangeKey) String() string {
	const layout = "%s:%s:%s"
	return fmt.Sprintf(layout, l.SeriesID, l.UserID, l.Name)
}
177
178func parseLabelSeriesRangeKey(hashKey, rangeKey []byte) (LabelSeriesRangeKey, bool, error) {
179	rangeComponentsRef := getComponents()
180	defer putComponents(rangeComponentsRef)
181	rangeComponents := rangeComponentsRef.components
182	hashComponentsRef := getComponents()
183	defer putComponents(hashComponentsRef)
184	hashComponents := hashComponentsRef.components
185
186	rangeComponents = decodeRangeKey(rangeKey, rangeComponents)
187	if len(rangeComponents) < 4 {
188		return LabelSeriesRangeKey{}, false, newInvalidIndexKeyError(hashKey, rangeKey)
189	}
190	keyType := rangeComponents[len(rangeComponents)-1]
191	if len(keyType) == 0 || keyType[0] != labelSeriesRangeKeyV1 {
192		return LabelSeriesRangeKey{}, false, nil
193	}
194	hashComponents = splitBytesBy(hashKey, ':', hashComponents)
195	// 	> v10		HashValue:  fmt.Sprintf("%02d:%s:%s:%s", shard, bucket.hashKey , metricName, v.Name),
196	// < v10		HashValue:  fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, v.Name),
197
198	if len(hashComponents) < 4 {
199		return LabelSeriesRangeKey{}, false, newInvalidIndexKeyError(hashKey, rangeKey)
200	}
201	return LabelSeriesRangeKey{
202		SeriesID: rangeComponents[1],
203		Name:     hashComponents[len(hashComponents)-1],
204		UserID:   hashComponents[len(hashComponents)-4],
205	}, true, nil
206}
207
208func validatePeriods(config storage.SchemaConfig) error {
209	for _, schema := range config.Configs {
210		if schema.IndexType != shipper.BoltDBShipperType {
211			level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("custom retention is not supported for store %s, no retention will be applied for schema entry with start date %s", schema.IndexType, schema.From))
212			continue
213		}
214		if schema.IndexTables.Period != 24*time.Hour {
215			return fmt.Errorf("schema period must be daily, was: %s", schema.IndexTables.Period)
216		}
217	}
218	return nil
219}
220
221func schemaPeriodForTable(config storage.SchemaConfig, tableName string) (chunk.PeriodConfig, bool) {
222	// first round removes configs that does not have the prefix.
223	candidates := []chunk.PeriodConfig{}
224	for _, schema := range config.Configs {
225		if strings.HasPrefix(tableName, schema.IndexTables.Prefix) {
226			candidates = append(candidates, schema)
227		}
228	}
229	// WARN we  assume period is always daily. This is only true for boltdb-shipper.
230	var (
231		matched chunk.PeriodConfig
232		found   bool
233	)
234	for _, schema := range candidates {
235		periodIndex, err := strconv.ParseInt(strings.TrimPrefix(tableName, schema.IndexTables.Prefix), 10, 64)
236		if err != nil {
237			continue
238		}
239		periodSec := int64(schema.IndexTables.Period / time.Second)
240		tableTs := model.TimeFromUnix(periodIndex * periodSec)
241		if tableTs.After(schema.From.Time) || tableTs == schema.From.Time {
242			matched = schema
243			found = true
244		}
245	}
246
247	return matched, found
248}
249
// seriesFromHash returns everything in h after its second ':'. The result is
// a sub-slice of h, or nil when h contains fewer than two ':' bytes.
func seriesFromHash(h []byte) (seriesID []byte) {
	colons := 0
	for pos, b := range h {
		if b != ':' {
			continue
		}
		colons++
		if colons == 2 {
			return h[pos+1:]
		}
	}
	return nil
}
263
264// decodeKey decodes hash and range value from a boltdb key.
// decodeKey splits a boltdb key into its hash value and range value.
// The key layout is hashValue, a zero byte, then rangeValue; the split happens
// at the first zero byte. Both results are sub-slices of k, and both are nil
// when k contains no zero byte.
func decodeKey(k []byte) (hashValue, rangeValue []byte) {
	for pos, b := range k {
		if b == 0 {
			return k[:pos], k[pos+1:]
		}
	}
	return nil, nil
}
276
// splitBytesBy splits value at every occurrence of by and returns the pieces,
// which are sub-slices of value. The backing storage of components is reused
// (its previous contents are discarded), so callers can pass a pooled buffer.
// The result always has one more element than there are separators; an empty
// value yields a single empty piece.
func splitBytesBy(value []byte, by byte, components [][]byte) [][]byte {
	components = components[:0]
	start := 0
	for pos, b := range value {
		if b == by {
			components = append(components, value[start:pos])
			start = pos + 1
		}
	}
	return append(components, value[start:])
}
292
// decodeRangeKey splits value into its zero-terminated components and returns
// them as sub-slices of value, reusing the backing storage of components
// (its previous contents are discarded). Only pieces followed by a zero byte
// are returned; any bytes after the last zero byte are dropped.
func decodeRangeKey(value []byte, components [][]byte) [][]byte {
	components = components[:0]
	start := 0
	for pos, b := range value {
		if b == 0 {
			components = append(components, value[start:pos])
			start = pos + 1
		}
	}
	return components
}
307