1package retention 2 3import ( 4 "errors" 5 "fmt" 6 "strconv" 7 "strings" 8 "time" 9 10 "github.com/prometheus/common/model" 11 12 util_log "github.com/cortexproject/cortex/pkg/util/log" 13 "github.com/go-kit/kit/log/level" 14 15 "github.com/grafana/loki/pkg/storage" 16 "github.com/grafana/loki/pkg/storage/chunk" 17 "github.com/grafana/loki/pkg/storage/stores/shipper" 18) 19 20const ( 21 chunkTimeRangeKeyV3 = '3' 22 seriesRangeKeyV1 = '7' 23 labelSeriesRangeKeyV1 = '8' 24) 25 26var ErrInvalidIndexKey = errors.New("invalid index key") 27 28type InvalidIndexKeyError struct { 29 HashKey string 30 RangeKey string 31} 32 33func newInvalidIndexKeyError(h, r []byte) InvalidIndexKeyError { 34 return InvalidIndexKeyError{ 35 HashKey: string(h), 36 RangeKey: string(r), 37 } 38} 39 40func (e InvalidIndexKeyError) Error() string { 41 return fmt.Sprintf("%s: hash_key:%s range_key:%s", ErrInvalidIndexKey, e.HashKey, e.RangeKey) 42} 43 44func (e InvalidIndexKeyError) Is(target error) bool { 45 return target == ErrInvalidIndexKey 46} 47 48type ChunkRef struct { 49 UserID []byte 50 SeriesID []byte 51 ChunkID []byte 52 From model.Time 53 Through model.Time 54} 55 56func (c ChunkRef) String() string { 57 return fmt.Sprintf("UserID: %s , SeriesID: %s , Time: [%s,%s]", c.UserID, c.SeriesID, c.From, c.Through) 58} 59 60func parseChunkRef(hashKey, rangeKey []byte) (ChunkRef, bool, error) { 61 componentsRef := getComponents() 62 defer putComponents(componentsRef) 63 components := componentsRef.components 64 65 components = decodeRangeKey(rangeKey, components) 66 if len(components) == 0 { 67 return ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey) 68 } 69 70 keyType := components[len(components)-1] 71 if len(keyType) == 0 || keyType[0] != chunkTimeRangeKeyV3 { 72 return ChunkRef{}, false, nil 73 } 74 chunkID := components[len(components)-2] 75 76 userID, hexFrom, hexThrough, ok := parseChunkID(chunkID) 77 if !ok { 78 return ChunkRef{}, false, newInvalidIndexKeyError(hashKey, 
rangeKey) 79 } 80 from, err := strconv.ParseInt(unsafeGetString(hexFrom), 16, 64) 81 if err != nil { 82 return ChunkRef{}, false, err 83 } 84 through, err := strconv.ParseInt(unsafeGetString(hexThrough), 16, 64) 85 if err != nil { 86 return ChunkRef{}, false, err 87 } 88 89 return ChunkRef{ 90 UserID: userID, 91 SeriesID: seriesFromHash(hashKey), 92 From: model.Time(from), 93 Through: model.Time(through), 94 ChunkID: chunkID, 95 }, true, nil 96} 97 98func parseChunkID(chunkID []byte) (userID []byte, hexFrom, hexThrough []byte, valid bool) { 99 var ( 100 j, i int 101 hex []byte 102 ) 103 104 for j < len(chunkID) { 105 if chunkID[j] != '/' { 106 j++ 107 continue 108 } 109 userID = chunkID[:j] 110 hex = chunkID[j+1:] 111 break 112 } 113 if len(userID) == 0 { 114 return nil, nil, nil, false 115 } 116 _, i = readOneHexPart(hex) 117 if i == 0 { 118 return nil, nil, nil, false 119 } 120 hex = hex[i+1:] 121 hexFrom, i = readOneHexPart(hex) 122 if i == 0 { 123 return nil, nil, nil, false 124 } 125 hex = hex[i+1:] 126 hexThrough, i = readOneHexPart(hex) 127 if i == 0 { 128 return nil, nil, nil, false 129 } 130 return userID, hexFrom, hexThrough, true 131} 132 133func readOneHexPart(hex []byte) (part []byte, i int) { 134 for i < len(hex) { 135 if hex[i] != ':' { 136 i++ 137 continue 138 } 139 return hex[:i], i 140 } 141 return nil, 0 142} 143 144func parseLabelIndexSeriesID(hashKey, rangeKey []byte) ([]byte, bool, error) { 145 componentsRef := getComponents() 146 defer putComponents(componentsRef) 147 components := componentsRef.components 148 var seriesID []byte 149 components = decodeRangeKey(rangeKey, components) 150 if len(components) < 4 { 151 return nil, false, newInvalidIndexKeyError(hashKey, rangeKey) 152 } 153 keyType := components[len(components)-1] 154 if len(keyType) == 0 { 155 return nil, false, nil 156 } 157 switch keyType[0] { 158 case labelSeriesRangeKeyV1: 159 seriesID = components[1] 160 case seriesRangeKeyV1: 161 seriesID = components[0] 162 default: 163 
return nil, false, nil 164 } 165 return seriesID, true, nil 166} 167 168type LabelSeriesRangeKey struct { 169 SeriesID []byte 170 UserID []byte 171 Name []byte 172} 173 174func (l LabelSeriesRangeKey) String() string { 175 return fmt.Sprintf("%s:%s:%s", l.SeriesID, l.UserID, l.Name) 176} 177 178func parseLabelSeriesRangeKey(hashKey, rangeKey []byte) (LabelSeriesRangeKey, bool, error) { 179 rangeComponentsRef := getComponents() 180 defer putComponents(rangeComponentsRef) 181 rangeComponents := rangeComponentsRef.components 182 hashComponentsRef := getComponents() 183 defer putComponents(hashComponentsRef) 184 hashComponents := hashComponentsRef.components 185 186 rangeComponents = decodeRangeKey(rangeKey, rangeComponents) 187 if len(rangeComponents) < 4 { 188 return LabelSeriesRangeKey{}, false, newInvalidIndexKeyError(hashKey, rangeKey) 189 } 190 keyType := rangeComponents[len(rangeComponents)-1] 191 if len(keyType) == 0 || keyType[0] != labelSeriesRangeKeyV1 { 192 return LabelSeriesRangeKey{}, false, nil 193 } 194 hashComponents = splitBytesBy(hashKey, ':', hashComponents) 195 // > v10 HashValue: fmt.Sprintf("%02d:%s:%s:%s", shard, bucket.hashKey , metricName, v.Name), 196 // < v10 HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, v.Name), 197 198 if len(hashComponents) < 4 { 199 return LabelSeriesRangeKey{}, false, newInvalidIndexKeyError(hashKey, rangeKey) 200 } 201 return LabelSeriesRangeKey{ 202 SeriesID: rangeComponents[1], 203 Name: hashComponents[len(hashComponents)-1], 204 UserID: hashComponents[len(hashComponents)-4], 205 }, true, nil 206} 207 208func validatePeriods(config storage.SchemaConfig) error { 209 for _, schema := range config.Configs { 210 if schema.IndexType != shipper.BoltDBShipperType { 211 level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("custom retention is not supported for store %s, no retention will be applied for schema entry with start date %s", schema.IndexType, schema.From)) 212 continue 213 } 214 if 
schema.IndexTables.Period != 24*time.Hour { 215 return fmt.Errorf("schema period must be daily, was: %s", schema.IndexTables.Period) 216 } 217 } 218 return nil 219} 220 221func schemaPeriodForTable(config storage.SchemaConfig, tableName string) (chunk.PeriodConfig, bool) { 222 // first round removes configs that does not have the prefix. 223 candidates := []chunk.PeriodConfig{} 224 for _, schema := range config.Configs { 225 if strings.HasPrefix(tableName, schema.IndexTables.Prefix) { 226 candidates = append(candidates, schema) 227 } 228 } 229 // WARN we assume period is always daily. This is only true for boltdb-shipper. 230 var ( 231 matched chunk.PeriodConfig 232 found bool 233 ) 234 for _, schema := range candidates { 235 periodIndex, err := strconv.ParseInt(strings.TrimPrefix(tableName, schema.IndexTables.Prefix), 10, 64) 236 if err != nil { 237 continue 238 } 239 periodSec := int64(schema.IndexTables.Period / time.Second) 240 tableTs := model.TimeFromUnix(periodIndex * periodSec) 241 if tableTs.After(schema.From.Time) || tableTs == schema.From.Time { 242 matched = schema 243 found = true 244 } 245 } 246 247 return matched, found 248} 249 250func seriesFromHash(h []byte) (seriesID []byte) { 251 var index int 252 for i := range h { 253 if h[i] == ':' { 254 index++ 255 } 256 if index == 2 { 257 seriesID = h[i+1:] 258 return 259 } 260 } 261 return 262} 263 264// decodeKey decodes hash and range value from a boltdb key. 
func decodeKey(k []byte) (hashValue, rangeValue []byte) {
	for pos, b := range k {
		if b != 0 {
			continue
		}
		// Everything before the first zero byte is the hash value, everything
		// after it is the range value.
		return k[:pos], k[pos+1:]
	}
	return nil, nil
}

// splitBytesBy splits value on every occurrence of by, like bytes.Split, but
// appends the pieces into components (truncated to length zero first) so the
// caller's backing array is reused when it has enough capacity. The piece
// after the last separator is always included, so an empty value yields one
// empty component. Pieces are sub-slices of value.
func splitBytesBy(value []byte, by byte, components [][]byte) [][]byte {
	out := components[:0]
	start := 0
	for pos := 0; pos < len(value); pos++ {
		if value[pos] == by {
			out = append(out, value[start:pos])
			start = pos + 1
		}
	}
	return append(out, value[start:])
}

// decodeRangeKey splits a range key into its zero-byte-terminated components,
// appending them into components (truncated to length zero first) to reuse
// its backing array. Unlike splitBytesBy, bytes after the final zero byte are
// not returned, since every component is expected to be zero-terminated.
// Components are sub-slices of value.
func decodeRangeKey(value []byte, components [][]byte) [][]byte {
	out := components[:0]
	start := 0
	for pos, b := range value {
		if b == 0 {
			out = append(out, value[start:pos])
			start = pos + 1
		}
	}
	return out
}